image for Hvorfor PySpark MERGE ikke håndterer SCD2 (og hvordan fikse dette i Delta Lake)

Hvorfor PySpark MERGE ikke håndterer SCD2 (og hvordan fikse dette i Delta Lake)

Når man jobber med data som endrer seg over tid, er det helt naturlig å ville beholde historikken. Dette er kjernen i det som kalles Slowly Changing Dimensions type 2 (SCD2), som er en måte å sørge for at vi alltid kan se hvordan en rad så ut før, samtidig som den oppdateres med ny informasjon.

I Delta Lake og PySpark skulle man jo tro at dette var et løst problem. Det virker så grunnleggende at man nesten forventer en ferdig funksjon for det, for eksempel:

“Bare kall merge_scd2() og Spark fikser resten.”

Men nei.

Det viser seg at PySpark kun støtter én handling per match i en MERGE. Du kan enten oppdatere en rad, eller sette inn en ny, men ikke begge deler for samme match. Og det er akkurat det man trenger i en klassisk SCD2-operasjon.

Problemet

I en SCD2-tabell ønsker man å lukke den gamle raden (ved å sette valid_to til en dato), og samtidig legge inn en ny versjon av raden med oppdaterte verdier og valid_to = NULL.

Med DeltaTable.merge() i PySpark burde dette vært enkelt. Helst noe sånn som dette:

(
deltaTable.alias("t")
.merge(source.alias("s"), "t.id = s.id")
.whenMatchedUpdate(set={"valid_to": current_timestamp()})
.whenMatchedInsert(values={
"id": "s.id",
"value": "s.value",
"valid_from": current_timestamp(),
"valid_to": None
})
.whenNotMatchedInsertAll()
.execute()
)

... men dette går dessverre ikke. PySpark tillater nemlig kun én whenMatched-handling per merge, så du kan ikke både oppdatere og sette inn for samme rad. Prøver du, får du en feilmelding. Du kan heller ikke kjøre to merge-operasjoner etter hverandre uten å risikere datainkonsistens, siden de ikke garanteres å kjøre som én transaksjon.

Etter å ha prøvd meg fram med PySpark-API-et og noen kreative workarounds, landet jeg på en løsning som er både robust og gjenbrukbar: en to-trinnsmetode som bruker SQL under panseret.

Løsningen

Jeg skrev en metode som først lukker eksisterende rader der data har endret seg, og deretter setter inn nye versjoner. Metoden kan brukes mot en hvilken som helst tabell ved å angi nøkler, kolonnenavn og schema.

def upsert_scd2_table(
spark_session: SparkSession,
df_source: DataFrame,
composite_keys: list[str],
comparable_column_name: str,
dynamic_column_names: list[str],
valid_from_column: str,
valid_to_column: str,
target_catalog: str,
target_schema: str,
target_table: str,
schema: StructType,
) -> None:
"""Upserts data into a Slowly Changing Dimension (SCD) Type 2 table based on composite keys and a comparable column.
Parameters:
spark_session (SparkSession): The Spark session for accessing the Delta table.
df_source (DataFrame): The input DataFrame containing the data to be upserted.
composite_keys (list[str]): A list of column names to be used as composite keys for matching records.
comparable_column_name (str): The name of the column used to determine which records are newer.
dynamic_column_names(list[str]): A list of column names to be used for determining changes in the record.
valid_to_column (str): The name of the column representing the start of the validity period for a record.
valid_from_column (str): The name of the column representing the end of the validity period for a record.
target_catalog (str): The name of the target catalog to be created.
target_schema (str): The name of the target schema to be created.
target_table (str): The name of the target table to be created.
"""
target = f"{target_catalog}.{target_schema}.{target_table}"
match_condition = " AND ".join(
[f"target.{key} <=> source.{key}" for key in composite_keys]
)
change_condition = " OR ".join(
[f"target.{col} <> source.{col}" for col in dynamic_column_names]
)
dynamic_columns = [
col for col in schema.fieldNames()
if col not in [valid_from_column, valid_to_column]
]
columns = ", ".join(dynamic_columns)
column_values = ", ".join([f"source.{col}" for col in dynamic_columns])
df_source.createOrReplaceTempView("source_view")
spark_session.sql(f"""
MERGE INTO {target} AS target
USING source_view AS source
ON {match_condition}
WHEN MATCHED AND target.{valid_to_column} IS NULL AND ({change_condition}) THEN
UPDATE SET target.{valid_to_column} = source.{comparable_column_name}
WHEN NOT MATCHED THEN
INSERT ({columns}, {valid_from_column}, {valid_to_column})
VALUES ({column_values}, TO_DATE('1900-01-01'), NULL)
""")
spark_session.sql(f"""
INSERT INTO {target}({columns}, {valid_from_column}, {valid_to_column})
SELECT {column_values}, source.{comparable_column_name}, NULL
FROM source_view AS source
JOIN {target} AS target
ON {match_condition}
WHERE target.{valid_to_column} = source.{comparable_column_name} AND ({change_condition})
""")

Denne løsningen fungerer fordi begge trinnene kjøres som én transaksjon i Delta Lake. Delta håndterer committen som én helhet, slik at du slipper risiko for halvveis oppdateringer. Resultatet er en enkel, forutsigbar og fleksibel implementasjon av SCD2 uten å måtte skrive skreddersydd SQL for hver tabell. Bare å kopiere uten skam dersom dette er noe du også trenger!

Denne metoden har fungert stabilt i produksjon, og gir meg kontroll over hele SCD2-flyten uten å ofre lesbarhet. Noen ganger må man bare preppe sine egne løyper.

Ønsker du å utveksle erfaringer?

Vi digger å dele erfaringer og lære nye triks! Hvis du har ideer, spørsmål eller bare vil slå av en prat, kast oss gjerne en snøball på hei@snokam.no.