Implementing Nested JSON/GeoJSON Flattening in Standardized ETL Pipelines Jump to heading

Geospatial data ingestion routinely encounters deeply nested payloads from municipal APIs, federal open-data portals, and third-party survey platforms. Normalizing these hierarchical structures into flat, relational, or columnar formats is a mandatory prerequisite for spatial indexing, downstream analysis, and compliance auditing. Nested JSON/GeoJSON Flattening operates as a discrete, high-precision transformation stage within broader Automated Attribute Transformation & ETL Workflows, requiring deterministic mapping logic, strict schema enforcement, and predictable fallback routing. This guide details the production implementation of a flattening module, emphasizing configuration-as-code, compliance alignment, and resource-efficient execution.

Configuration-Driven Architecture & Mapping Manifests Jump to heading

Hardcoded parsing routines introduce maintenance debt and schema drift vulnerabilities. The flattening stage must be driven by version-controlled YAML manifests that externalize all transformation logic. Each mapping rule defines a source dot-notation path (relative to properties), a target column identifier, a delimiter convention, an expected data type, and explicit mandatory/optional status.

yaml
# mapping_manifest.yaml
schema_version: "1.0"
delimiter: "__"
rules:
  - source_path: "admin.zoning.code"
    target_column: "zoning_code"
    type: "string"
    mandatory: true
    fallback: null
  - source_path: "survey.responses"
    target_column: "survey_responses"
    type: "json_string"
    mandatory: false
    fallback: "[]"
    array_policy: "serialize"
  - source_path: "metadata.last_updated"
    target_column: "record_timestamp"
    type: "iso8601"
    mandatory: true
    fallback: "1970-01-01T00:00:00Z"

Note that source_path is relative to the properties object — do not include properties. as a prefix. Pipeline initialization must validate the manifest against a central JSON Schema before execution. Invalid path definitions, circular references, or conflicting target names trigger immediate pipeline halts with structured diagnostic output. This declarative architecture aligns directly with established Field Renaming & Type Coercion Rules, ensuring that attribute transformations remain auditable, reproducible, and environment-agnostic.

Deterministic Flattening Logic & GeoJSON Preservation Jump to heading

GeoJSON introduces rigid structural constraints that generic JSON flattening routines frequently violate. Per RFC 7946, top-level reserved keys (type, bbox, geometry, id) must remain intact. The implementation must isolate these keys and restrict recursive traversal exclusively to the properties dictionary. Note that RFC 7946 deprecated the top-level crs property; treat any crs key as informational only and do not recurse into it.

python
# geojson_flattener.py
import json
from typing import Any, Dict, List

RESERVED_KEYS = {"type", "bbox", "geometry", "id", "crs"}

def flatten_properties(properties: Dict[str, Any], delimiter: str = "__") -> Dict[str, Any]:
    """Flatten a nested properties dict using an iterative stack (avoids recursion limits)."""
    flat: Dict[str, Any] = {}
    stack = [(properties, "")]

    while stack:
        current, prefix = stack.pop()
        if isinstance(current, dict):
            for key, val in current.items():
                new_key = f"{prefix}{delimiter}{key}" if prefix else key
                if isinstance(val, (dict, list)):
                    stack.append((val, new_key))
                else:
                    flat[new_key] = val
        # Lists are stored as-is; callers handle array_policy

    return flat

def flatten_geojson(
    feature: Dict[str, Any],
    rules: List[Dict[str, Any]],
    delimiter: str = "__",
) -> Dict[str, Any]:
    """Flatten a single GeoJSON feature according to manifest rules.

    source_path in each rule is dot-notation relative to 'properties';
    after flattening, dots become the configured delimiter.
    """
    output = {k: v for k, v in feature.items() if k in RESERVED_KEYS}
    properties = feature.get("properties") or {}
    flat_props = flatten_properties(properties, delimiter)

    # Apply manifest rules
    for rule in rules:
        target = rule["target_column"]
        # Convert dot-notation path to flattened key
        flat_key = rule["source_path"].replace(".", delimiter)
        value = flat_props.get(flat_key)
        fallback = rule.get("fallback")
        mandatory = rule.get("mandatory", False)

        if value is None:
            if mandatory:
                raise ValueError(f"Mandatory field missing: {rule['source_path']}")
            output[target] = fallback
            continue

        if rule.get("array_policy") == "serialize" and isinstance(value, list):
            output[target] = json.dumps(value, separators=(",", ":"))
        else:
            output[target] = value

    return output

For complex municipal datasets containing nested administrative boundaries, multi-level zoning attributes, or survey arrays, deterministic traversal prevents structural corruption. Detailed strategies for handling irregular geometries and preserving spatial topology during flattening are covered in Flattening deeply nested GeoJSON feature collections safely.

Schema Enforcement & Compliance Routing Jump to heading

Strict compliance alignment requires explicit handling of mandatory versus optional fields at runtime. Mandatory fields trigger pipeline failures or quarantine routing when absent or malformed. Optional fields default to configurable fallback values without interrupting execution.

Field Attribute Behavior Compliance Action
mandatory: true Must resolve to non-null value Halt & quarantine on missing/invalid
mandatory: false May be absent or null Apply fallback or leave null
type: iso8601 Validates datetime format Reject non-conforming strings
type: json_string Serializes nested structures Compact serialization for archival

This routing model integrates seamlessly with Batch Schema Processing Pipelines, enabling parallel validation across feature collections while maintaining strict audit trails for regulatory reporting.

Error Handling & Resource Optimization Jump to heading

Production ETL systems must handle malformed payloads, network timeouts, and memory constraints gracefully. Implement exponential backoff for transient API failures and enforce strict memory boundaries when processing large feature collections.

python
# retry_handler.py
import time
from functools import wraps
from typing import Callable, Any, Tuple, Type

def retry_with_backoff(
    max_retries: int = 3,
    base_delay: float = 1.0,
    exceptions: Tuple[Type[Exception], ...] = (ConnectionError, TimeoutError),
) -> Callable:
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            delay = base_delay
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    if attempt == max_retries - 1:
                        raise
                    time.sleep(delay)
                    delay *= 2
        return wrapper
    return decorator

For datasets exceeding available RAM, stream features incrementally using generator-based parsers and write flattened rows directly to Parquet or PostGIS via batch inserts. Use ijson for streaming JSON parsing to avoid materializing the full feature collection in memory.

CI/CD Validation Pipeline Jump to heading

Configuration manifests and flattening logic must be validated before deployment. A minimal GitHub Actions workflow ensures schema compliance, unit test coverage, and linting standards.

yaml
# .github/workflows/validate-flattener.yml
name: Validate Flattening Pipeline
on: [push, pull_request]
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.10"
      - name: Install dependencies
        run: pip install pyyaml jsonschema pytest
      - name: Validate YAML Manifest
        run: |
          python -c "
          import yaml, jsonschema
          with open('mapping_manifest.yaml') as f:
              manifest = yaml.safe_load(f)
          schema = {
              'type': 'object',
              'required': ['schema_version', 'rules'],
              'properties': {
                  'rules': {'type': 'array', 'minItems': 1}
              }
          }
          jsonschema.validate(manifest, schema)
          print('Manifest valid.')
          "
      - name: Run Unit Tests
        run: pytest tests/ --tb=short

This pipeline enforces version control discipline, prevents schema drift, and guarantees that all transformation logic remains deterministic across staging and production environments.