VACUUM vs CDF in Microsoft Fabric: How to Protect Incremental Loads from Silent Data Loss
VACUUM keeps Delta tables healthy, but it can also break Change Data Feed replay windows. This walkthrough shows exactly when incremental pipelines fail and how to design a safe Fabric pattern with guardrails and fallback recovery.
If you are already using CDF-based incremental processing, VACUUM is the command that can quietly invalidate your assumptions.
The previous post covered the happy path: CDF enabled, commit-version watermarking, deterministic MERGE, and reliable incremental loads. This sequel covers the production reality: retention pressure, storage cleanup, and the exact moment your pipeline can no longer replay changes.
In plain terms:
- CDF gives you a change window.
- VACUUM shrinks the historical data footprint.
- If your consumer lag exceeds your retained history window, incremental replay breaks.
This post walks through:
- What VACUUM actually removes (and what it does not).
- How that removal impacts CDF readers.
- A step-by-step failure timeline.
- A safe incremental design for Microsoft Fabric that accounts for VACUUM.
- Recovery playbooks when you are already past the safe window.
#The Production Problem
Most teams discover this in the worst possible way:
- A Silver incremental job is paused for a few days (incident, deployment freeze, permission issue, or upstream outage).
- Bronze maintenance continues, including `VACUUM`.
- The job resumes and asks CDF for changes from an old watermark.
- The read fails because the historical files needed for that range are gone.
Symptoms are usually one of these:
- CDF read error for the requested `startingVersion`.
- Replay gap where expected updates/deletes do not appear.
- SLA miss because the only remaining option is snapshot rebuild.
Why naive fixes fail:
- Retrying does not restore vacuumed files.
- Advancing watermark manually skips required changes.
- Re-running from old version keeps failing until you rebuild state.
Operationally, this is not just a data engineering nuisance. It can produce stale facts, incorrect dimensional state, and audit exposure when deletes are not propagated.
#What VACUUM Actually Does in Delta
VACUUM removes old data files that are no longer needed by the active table state and are older than your retention threshold.
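That matters because a CDF read for a version range needs two things: the commits in the transaction log (`_delta_log`) and the data and `_change_data` files those commits reference. VACUUM deletes files, not log entries; log cleanup is governed separately by `delta.logRetentionDuration` (30 days by default). The log can therefore still list a version whose files are already gone.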
A typical command looks like this:
```sql
-- Keep 7 days of old files, remove older unreferenced files
VACUUM bronze.orders RETAIN 168 HOURS;
```
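A toy model helps pin down the eligibility rule. The sketch below is plain Python, not Delta's implementation: each hypothetical `FileRecord` carries a `referenced` flag (still part of the current table version) and a `removed_at` time (when a later commit stopped referencing it, or when it was written, for `_change_data` files, which are never part of the table state). Delta derives the equivalent information from the transaction log and the file listing.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional


@dataclass
class FileRecord:
    path: str
    referenced: bool                # still part of the current table version?
    removed_at: Optional[datetime]  # when it left the table state (write time for _change_data files)


def vacuum_candidates(files: List[FileRecord], now: datetime, retain_hours: int) -> List[str]:
    """Toy rule: deletable if no longer referenced AND it left the table
    state before the retention threshold (now - retain_hours)."""
    threshold = now - timedelta(hours=retain_hours)
    return [
        f.path
        for f in files
        if not f.referenced and f.removed_at is not None and f.removed_at < threshold
    ]


now = datetime(2024, 1, 8)
files = [
    FileRecord("part-0001.parquet", True, None),                         # live data file
    FileRecord("part-0002.parquet", False, now - timedelta(hours=200)),  # rewritten 200h ago
    FileRecord("_change_data/cdc-0003.parquet", False, now - timedelta(hours=190)),
    FileRecord("part-0004.parquet", False, now - timedelta(hours=24)),   # rewritten yesterday
]
print(vacuum_candidates(files, now, retain_hours=168))
# ['part-0002.parquet', '_change_data/cdc-0003.parquet']
```

Lowering `retain_hours` to 12 would also return `part-0004.parquet`, which is exactly the kind of file a lagging CDF consumer may still need.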
What teams get wrong:
- They treat CDF as permanent event storage. It is not.
- They set aggressive VACUUM retention without checking consumer lag.
- They assume daily pipelines can always catch up after multi-day downtime.
Think of CDF as a replay buffer, not an immutable event bus.
#Step-by-Step Timeline: How Incremental Replay Breaks
Use this simplified timeline:
- Day 0: Watermark for `bronze.orders` is version `1200`.
- Day 1-3: Source emits updates and deletes. Bronze reaches version `1350`.
- Day 1-3: Silver job is down and processes nothing.
- Day 4: Maintenance job runs `VACUUM ... RETAIN 72 HOURS`.
- Day 5: Silver resumes and tries `startingVersion = 1201`.
Result: fail.
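Why: VACUUM deleted unreferenced files written by the earliest versions in that range, including `_change_data` files from Day 1 commits that had aged past the 72-hour threshold. Those versions may still appear in the transaction log, but the files needed to reconstruct their change events are gone.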
Your incremental contract just changed from:
- "Process all changes since last watermark"
to:
- "Process all changes since last watermark, but only if watermark is still within retained replay window."
That second clause is where most designs are weak.
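The timeline can be replayed as a small simulation. It uses a deliberately conservative simplification (an assumption, not Delta's exact rule): a version is treated as replayable only if it was committed at or after the VACUUM threshold, `vacuum time - RETAIN`. Commit times are spread evenly over Days 1-3 and the Day 4 maintenance run is placed at 12:00, both purely for illustration.

```python
from datetime import datetime, timedelta


def earliest_replayable_version(commits, vacuum_at, retain_hours):
    """Conservative model: versions committed before (vacuum_at - retain_hours)
    may have lost their files to VACUUM, so only later versions are trusted."""
    threshold = vacuum_at - timedelta(hours=retain_hours)
    safe = [version for version, committed_at in commits if committed_at >= threshold]
    return min(safe) if safe else None


def can_replay_from(watermark, commits, vacuum_at, retain_hours):
    """The incremental contract with its second clause made explicit."""
    earliest = earliest_replayable_version(commits, vacuum_at, retain_hours)
    return earliest is not None and watermark + 1 >= earliest


# Day 0 = midnight; versions 1201..1350 committed evenly over Days 1-3 (illustrative).
day0 = datetime(2024, 1, 1)
span = timedelta(days=3)
commits = [(1200 + i, day0 + timedelta(days=1) + span * (i - 1) / 150) for i in range(1, 151)]
vacuum_at = day0 + timedelta(days=4, hours=12)  # assumed Day 4 maintenance window

print(earliest_replayable_version(commits, vacuum_at, retain_hours=72))  # 1226
print(can_replay_from(1200, commits, vacuum_at, retain_hours=72))        # False: window lost
print(can_replay_from(1200, commits, vacuum_at, retain_hours=168))       # True
```

Only versions committed in the 72 hours before the maintenance run are trusted, so a watermark of `1200` falls outside the window while a watermark of `1230` would still be safe.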
#Architecture Pattern: VACUUM-Aware Incremental Processing
The safe design has three explicit controls:
- Consumer lag monitoring in commit-version and wall-clock time.
- Retention-aware precheck before CDF read.
- Fallback path when replay window is already lost.
Reference flow:
```text
[Bronze Delta + CDF] --> [Precheck: watermark vs replay window] --> [Incremental CDF Read]
         |                                  |                                  |
         |                                  | fail                             | success
         |                                  v                                  v
         |                    [Fallback Snapshot Reconcile]            [MERGE to Silver]
         |                                  |                                  |
         +------------------> [Update Watermark + Alerting] <------------------+
```
Design rationale:
- You do not run CDF read blindly.
- You do not let job logic decide policy only after failure.
- You codify when incremental is safe and when controlled rebuild is required.
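The three controls can be expressed as one explicit routing function. This is a plain-Python sketch: the route names and the `fallback_mode` values mirror this article's control table, while `choose_route` itself is hypothetical glue you would call from a notebook or pipeline activity.

```python
from typing import Optional


def choose_route(lag_ok: bool, read_failure: Optional[str], fallback_mode: str) -> str:
    """Decide the next step for one pipeline run.

    lag_ok        -- precheck result (watermark still inside the replay window)
    read_failure  -- None after a successful CDF read, else a failure class
    fallback_mode -- policy from control metadata: SNAPSHOT_REBUILD or FAIL_FAST
    """
    window_lost = (not lag_ok) or read_failure == "CDF_WINDOW_LOST"
    if window_lost:
        # Policy, not improvisation: the control table decides what happens next.
        return "SNAPSHOT_REBUILD" if fallback_mode == "SNAPSHOT_REBUILD" else "FAIL_FAST"
    if read_failure is not None:
        # Unknown failures never trigger a rebuild; page the owner instead.
        return "FAIL_FAST"
    return "MERGE_INCREMENTAL"


print(choose_route(True, None, "SNAPSHOT_REBUILD"))               # MERGE_INCREMENTAL
print(choose_route(True, "CDF_WINDOW_LOST", "SNAPSHOT_REBUILD"))  # SNAPSHOT_REBUILD
```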
#Implementation in Fabric: End-to-End Guardrails
#1. Control table for watermarks and policy
Keep the watermark and the policy together so orchestration and notebook logic share one source of truth.
```sql
CREATE TABLE IF NOT EXISTS control.cdf_pipeline_state (
  source_table STRING,
  target_table STRING,
  last_commit_version BIGINT,
  last_success_ts TIMESTAMP,
  max_allowed_lag_hours INT,
  fallback_mode STRING, -- e.g. SNAPSHOT_REBUILD, FAIL_FAST
  updated_at TIMESTAMP
) USING DELTA;
```
Why this helps:
- `last_commit_version` tracks the replay point.
- `max_allowed_lag_hours` makes risk tolerance explicit per pipeline.
- `fallback_mode` avoids ad-hoc, manual decisions during incidents.
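If orchestration code also needs the policy, a typed mirror of one control row keeps notebooks and pipelines consistent. A minimal sketch: `PipelineState` is a hypothetical name, and restricting `fallback_mode` to the two example values from the table comment is an assumption you may want to relax.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

ALLOWED_FALLBACK_MODES = {"SNAPSHOT_REBUILD", "FAIL_FAST"}  # assumption: the two modes from the table comment


@dataclass(frozen=True)
class PipelineState:
    """Hypothetical in-memory mirror of one control.cdf_pipeline_state row."""
    source_table: str
    target_table: str
    last_commit_version: Optional[int]
    last_success_ts: Optional[datetime]
    max_allowed_lag_hours: int
    fallback_mode: str

    def __post_init__(self) -> None:
        # Reject unknown policy values up front instead of discovering them mid-incident.
        if self.fallback_mode not in ALLOWED_FALLBACK_MODES:
            raise ValueError(f"Unknown fallback_mode: {self.fallback_mode}")
        if self.max_allowed_lag_hours <= 0:
            raise ValueError("max_allowed_lag_hours must be positive")

    @property
    def start_version(self) -> int:
        # No watermark yet means "replay from version 0", matching the precheck's -1 default.
        base = self.last_commit_version if self.last_commit_version is not None else -1
        return base + 1


state = PipelineState("bronze.orders", "silver.orders", 1200, None, 48, "SNAPSHOT_REBUILD")
print(state.start_version)  # 1201
```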
#2. Precheck notebook cell before reading CDF
This is the gate that prevents undefined behavior.
```python
from pyspark.sql import functions as F

source_table = "bronze.orders"
state_table = "control.cdf_pipeline_state"

state = (
    spark.table(state_table)
    .filter(F.col("source_table") == source_table)
    .orderBy(F.col("updated_at").desc())
    .limit(1)
)
row = state.first()
if row is None:
    raise ValueError(f"Missing pipeline state for {source_table}")

last_commit_version = int(row["last_commit_version"]) if row["last_commit_version"] is not None else -1
max_allowed_lag_hours = int(row["max_allowed_lag_hours"])
fallback_mode = row["fallback_mode"]
start_version = last_commit_version + 1

# Read the table history once and reuse it for both lag measures.
history = spark.sql(f"DESCRIBE HISTORY {source_table}").select("version", "timestamp")

# Commit version now: used to estimate replay distance and detect abnormal lag growth.
current_version = history.agg(F.max("version").alias("v")).first()["v"]
version_lag = current_version - last_commit_version

# Wall-clock lag of the consumer: age of the oldest commit it still has to replay.
# (The age of the latest source commit says nothing about how far behind we are.)
lag_hours = None
if start_version <= current_version:
    oldest_needed = (
        history.filter(F.col("version") == start_version)
        .select(
            ((F.unix_timestamp(F.current_timestamp()) - F.unix_timestamp("timestamp")) / 3600).alias("lag_hours")
        )
        .first()
    )
    if oldest_needed is None:
        # The version has already aged out of the table history: replay is impossible.
        raise RuntimeError(f"Version {start_version} of {source_table} is no longer in the table history")
    lag_hours = oldest_needed["lag_hours"]

if lag_hours is not None and lag_hours > max_allowed_lag_hours:
    raise RuntimeError(
        f"Pipeline lag {lag_hours:.2f}h ({version_lag} versions) exceeded threshold "
        f"{max_allowed_lag_hours}h for {source_table}"
    )
```
Note:
- This check does not magically guarantee replay availability, but it catches dangerous lag early and creates a policy boundary.
- You still need controlled exception handling around the CDF read, because a retention violation only surfaces definitively when the read actually executes.
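The precheck arithmetic can be unit-tested without a Spark session. This sketch assumes the `DESCRIBE HISTORY` output has already been collected into `(version, timestamp)` pairs; the function name `replay_lag` and its return shape are illustrative.

```python
from datetime import datetime, timezone
from typing import List, Optional, Tuple


def replay_lag(history: List[Tuple[int, datetime]], last_commit_version: int,
               now: datetime) -> Tuple[int, Optional[float]]:
    """Return (version_lag, hours since the oldest commit still to replay).

    Hours is None when there is nothing new to read. Raises if the oldest
    needed version has already dropped out of the table history.
    """
    versions = dict(history)
    current_version = max(versions)
    start_version = last_commit_version + 1
    version_lag = current_version - last_commit_version
    if start_version > current_version:
        return version_lag, None
    if start_version not in versions:
        raise RuntimeError(f"Version {start_version} is no longer in the table history")
    lag_hours = (now - versions[start_version]).total_seconds() / 3600
    return version_lag, lag_hours


utc = timezone.utc
history = [
    (1200, datetime(2024, 1, 1, tzinfo=utc)),
    (1201, datetime(2024, 1, 2, tzinfo=utc)),
    (1202, datetime(2024, 1, 3, tzinfo=utc)),
]
version_lag, lag_hours = replay_lag(history, last_commit_version=1200, now=datetime(2024, 1, 4, tzinfo=utc))
print(version_lag, lag_hours)  # 2 48.0
```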
#3. CDF read with explicit failure classification
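```python
def read_cdf_or_classify(source_table: str, start_version: int):
    """Return (changes_df, None) on success, or (None, failure_type) on failure."""
    try:
        df = (
            spark.read.format("delta")
            .option("readChangeFeed", "true")
            .option("startingVersion", start_version)
            .table(source_table)
        )
        # Spark is lazy: a vacuumed file only fails the read when data is actually scanned.
        # Materialize the change set here so that failure is raised inside this try block.
        df = df.persist()
        df.count()
        return df, None
    except Exception as ex:
        # AnalysisException is raised while planning; missing files tend to surface as
        # execution errors (Spark/Py4J exceptions), so catch both and classify by message.
        message = str(ex).lower()
        # Heuristic tokens: verify them against the error text your Fabric runtime emits.
        retention_signals = [
            "change data feed",
            "startingversion",
            "no longer present",
            "vacuum",
            "retention",
        ]
        if any(token in message for token in retention_signals):
            return None, "CDF_WINDOW_LOST"
        return None, "UNKNOWN_READ_FAILURE"


changes_df, failure_type = read_cdf_or_classify(source_table, start_version)
```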
Why classify:
- `CDF_WINDOW_LOST` should trigger deterministic fallback.
- Unknown failures should fail fast and page the owner.
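Because the classification is a string heuristic, it is worth pinning down with tests against messages captured from your own runtime. A minimal, Spark-free sketch of the same token check; the sample messages are made up for illustration, not verbatim Delta errors.

```python
RETENTION_SIGNALS = ("change data feed", "startingversion", "no longer present", "vacuum", "retention")


def classify_read_failure(message: str) -> str:
    """Same token heuristic as read_cdf_or_classify, without Spark."""
    lowered = message.lower()
    if any(token in lowered for token in RETENTION_SIGNALS):
        return "CDF_WINDOW_LOST"
    return "UNKNOWN_READ_FAILURE"


# Made-up messages for illustration; replace them with real ones from your runtime.
print(classify_read_failure("File referenced in the log was deleted by VACUUM"))  # CDF_WINDOW_LOST
print(classify_read_failure("User is not authorized to read the workspace"))      # UNKNOWN_READ_FAILURE
```

Tests like these also expose over-broad tokens: any message that merely mentions `startingVersion` is classified as a lost window, which may or may not be what you want.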
#4. Deterministic MERGE logic for updates/deletes
```python
from delta.tables import DeltaTable
from pyspark.sql import functions as F
from pyspark.sql.window import Window

target_table = "silver.orders"
pk = "order_id"

# Keep only the latest event per key across the whole batch, so each key is mutated
# at most once per MERGE. update_preimage rows are dropped first: they share
# _commit_version with their update_postimage, and if the window picked the
# pre-image, no MERGE clause below would match and the update would be lost.
changes_latest = (
    changes_df
    .filter(F.col("_change_type") != "update_preimage")
    .withColumn(
        "rn",
        F.row_number().over(
            Window.partitionBy(pk).orderBy(F.col("_commit_version").desc(), F.col("_commit_timestamp").desc())
        ),
    )
    .filter("rn = 1")
    .drop("rn")
)

delta_tgt = DeltaTable.forName(spark, target_table)

(
    delta_tgt.alias("t")
    .merge(changes_latest.alias("s"), f"t.{pk} = s.{pk}")
    .whenMatchedUpdate(
        condition="s._change_type IN ('update_postimage', 'insert')",
        set={
            "order_status": "s.order_status",
            "order_amount": "s.order_amount",
            "updated_at": "s._commit_timestamp",
            "_is_deleted": "false",
        },
    )
    .whenMatchedUpdate(
        condition="s._change_type = 'delete'",
        set={
            "_is_deleted": "true",
            "updated_at": "s._commit_timestamp",
        },
    )
    .whenNotMatchedInsert(
        condition="s._change_type IN ('insert', 'update_postimage')",
        values={
            "order_id": "s.order_id",
            "order_status": "s.order_status",
            "order_amount": "s.order_amount",
            "updated_at": "s._commit_timestamp",
            "_is_deleted": "false",
        },
    )
    .execute()
)

# Expose the applied change set to the watermark update in step 5.
changes_latest.createOrReplaceTempView("tmp_changes_applied")
```
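The dedup-then-merge semantics can be checked on plain dictionaries before touching Delta. This toy re-implementation (not `DeltaTable.merge`) assumes the same `order_id`, `order_status`, and `order_amount` columns as the example above, with integer timestamps for brevity; it shows why `update_preimage` rows must be dropped and how deletes become soft deletes.

```python
from typing import Dict, List


def latest_per_key(changes: List[dict], pk: str = "order_id") -> Dict[object, dict]:
    """Keep the newest non-preimage event per key, ordered by commit version."""
    latest: Dict[object, dict] = {}
    for c in changes:
        if c["_change_type"] == "update_preimage":
            continue  # pre-images describe the old row; never apply them
        current = latest.get(c[pk])
        if current is None or c["_commit_version"] > current["_commit_version"]:
            latest[c[pk]] = c
    return latest


def apply_changes(target: Dict[object, dict], changes: List[dict], pk: str = "order_id") -> None:
    """Mirror the MERGE clauses: upsert inserts/post-images, soft-delete matched deletes."""
    for key, c in latest_per_key(changes, pk).items():
        if c["_change_type"] in ("insert", "update_postimage"):
            target[key] = {"order_status": c["order_status"], "order_amount": c["order_amount"],
                           "updated_at": c["_commit_timestamp"], "_is_deleted": False}
        elif c["_change_type"] == "delete" and key in target:
            target[key]["_is_deleted"] = True
            target[key]["updated_at"] = c["_commit_timestamp"]


def ev(order_id, change_type, version, ts, status="NEW", amount=0.0):
    return {"order_id": order_id, "_change_type": change_type, "_commit_version": version,
            "_commit_timestamp": ts, "order_status": status, "order_amount": amount}


target = {
    1: {"order_status": "NEW", "order_amount": 10.0, "updated_at": 0, "_is_deleted": False},
    3: {"order_status": "PAID", "order_amount": 5.0, "updated_at": 0, "_is_deleted": False},
}
changes = [
    ev(1, "update_preimage", 1201, 100, "NEW", 10.0),
    ev(1, "update_postimage", 1201, 100, "PAID", 10.0),
    ev(2, "insert", 1202, 110, "NEW", 25.0),
    ev(2, "delete", 1203, 120, "NEW", 25.0),
    ev(3, "delete", 1204, 130, "PAID", 5.0),
]
apply_changes(target, changes)
print(target[1]["order_status"], 2 in target, target[3]["_is_deleted"])  # PAID False True
```

Order 2 never reaches the target: its insert and delete land in the same batch, the latest event is the delete, and the insert clause does not accept deletes. That mirrors what the Delta MERGE above would do.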
#5. Update watermark only after successful merge
```sql
MERGE INTO control.cdf_pipeline_state t
USING (
  SELECT
    'bronze.orders' AS source_table,
    'silver.orders' AS target_table,
    MAX(_commit_version) AS last_commit_version,
    CURRENT_TIMESTAMP() AS last_success_ts,
    48 AS max_allowed_lag_hours,
    'SNAPSHOT_REBUILD' AS fallback_mode,
    CURRENT_TIMESTAMP() AS updated_at
  FROM tmp_changes_applied
) s
ON t.source_table = s.source_table AND t.target_table = s.target_table
WHEN MATCHED THEN UPDATE SET
  -- An empty batch yields NULL here; keep the previous watermark instead of resetting it.
  t.last_commit_version = COALESCE(s.last_commit_version, t.last_commit_version),
  t.last_success_ts = s.last_success_ts,
  t.max_allowed_lag_hours = s.max_allowed_lag_hours,
  t.fallback_mode = s.fallback_mode,
  t.updated_at = s.updated_at
WHEN NOT MATCHED THEN INSERT *;
```
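The ordering rule (merge first, watermark second) and the empty-batch case can be captured in a small helper. Sketch only: `run_batch`, `apply_batch`, and the dict-based state are hypothetical stand-ins for the MERGE and the control table.

```python
from typing import Callable, List, Optional


def run_batch(state: dict, change_versions: List[int], apply_batch: Callable[[], object]) -> Optional[int]:
    """Apply one batch, then advance the watermark; return the watermark afterwards.

    If apply_batch raises, the watermark is untouched and the next run replays
    the same range. An empty batch keeps the old watermark rather than writing
    NULL, which would silently restart replay from version 0.
    """
    apply_batch()  # stands in for the MERGE into Silver; exceptions propagate
    if change_versions:
        state["last_commit_version"] = max(change_versions)
    return state["last_commit_version"]


state = {"last_commit_version": 1200}
print(run_batch(state, [1201, 1350, 1299], lambda: None))  # 1350
print(run_batch(state, [], lambda: None))                  # 1350
```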
#Safe Fallback When VACUUM Already Removed Replay History
When CDF_WINDOW_LOST is detected, do not improvise. Execute a defined rebuild path.
Recommended sequence:
- Stop downstream Gold loads that depend on this target.
- Rebuild target from current source snapshot (full reconcile).
- Stamp a new watermark at current table version.
- Restart incremental from next version.
- Emit incident event with root cause `WATERMARK_OUTSIDE_RETENTION`.
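The recommended sequence maps naturally onto a fixed runbook function. In this sketch every step is a hypothetical zero-argument callable supplied by the caller (for example, wrappers around notebook or pipeline activities; the incident step would carry the `WATERMARK_OUTSIDE_RETENTION` root cause), so the order lives in one place and can be tested with stubs.

```python
from typing import Callable, Dict, List

# Hypothetical step names, one per item in the recommended sequence.
RUNBOOK_ORDER = ["pause_gold", "rebuild_target", "stamp_watermark", "restart_incremental", "emit_incident"]


def run_fallback(steps: Dict[str, Callable[[], object]]) -> List[str]:
    """Run the rebuild runbook in a fixed order; return the names of executed steps."""
    missing = [name for name in RUNBOOK_ORDER if name not in steps]
    if missing:
        raise ValueError(f"Runbook is missing steps: {missing}")
    executed = []
    for name in RUNBOOK_ORDER:
        # Any exception stops the runbook: later steps must not run after a failed rebuild.
        steps[name]()
        executed.append(name)
    return executed
```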
Snapshot reconcile example:
```python
from pyspark.sql import functions as F

# Fallback path: rebuild Silver from a pinned Bronze snapshot.
# Capture the version first and read exactly that version, so commits that land
# during the rebuild are picked up by the next incremental run instead of being skipped.
current_version = (
    spark.sql("DESCRIBE HISTORY bronze.orders")
    .selectExpr("max(version) as v")
    .first()["v"]
)
snapshot_df = spark.sql(f"SELECT * FROM bronze.orders VERSION AS OF {current_version}")

# Apply the same conforming logic as the incremental path: the columns the MERGE maintains.
reconciled_df = (
    snapshot_df
    .select("order_id", "order_status", "order_amount")
    .withColumn("updated_at", F.current_timestamp())
    .withColumn("_is_deleted", F.lit(False))
)

(
    reconciled_df.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("silver.orders")
)

spark.sql(f"""
MERGE INTO control.cdf_pipeline_state t
USING (
  SELECT
    'bronze.orders' AS source_table,
    'silver.orders' AS target_table,
    {current_version} AS last_commit_version,
    CURRENT_TIMESTAMP() AS last_success_ts,
    48 AS max_allowed_lag_hours,
    'SNAPSHOT_REBUILD' AS fallback_mode,
    CURRENT_TIMESTAMP() AS updated_at
) s
ON t.source_table = s.source_table AND t.target_table = s.target_table
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
```
This is expensive, but correct. Correctness beats partial replay when history is missing.
#Failure Modes You Should Expect
- Retention too aggressive for operational reality.
  - Symptom: recurring CDF start-version errors after weekend outages.
  - Root cause: `VACUUM RETAIN` window is shorter than actual worst-case pipeline downtime.
  - Mitigation: increase retention and enforce lag SLO alerts.
- High commit velocity with infrequent consumers.
  - Symptom: consumer appears only slightly late in hours, but massively behind in commit distance.
  - Root cause: bursty upstream writes compress practical replay window.
  - Mitigation: monitor both time lag and version lag; run incremental more frequently.
- Concurrent pipeline ownership gaps.
  - Symptom: platform team runs storage optimization while data product team owns consumers.
  - Root cause: no shared retention contract or change-management gate.
  - Mitigation: codify retention policy per domain and require pre-flight checks before changing VACUUM schedule.
#Performance and Cost Trade-offs
- Longer retention means higher storage cost, but lower probability of forced rebuilds.
- Shorter retention saves storage, but increases risk of expensive snapshot reconciles and SLA breaches.
- Frequent incremental runs reduce replay risk and lower per-run shuffle, often reducing total compute cost despite more job triggers.
A practical rule: storage is usually cheaper than emergency recomputation plus trust erosion. Set retention based on worst-case operational downtime, not best-case pipeline cadence.
#Practical Retention Sizing Formula
Use this baseline:
$$ \text{required retention (h)} = \text{max expected downtime (h)} + \text{recovery buffer (h)} $$
Example:
- Max outage you can realistically see: 36h
- Recovery buffer (deployment delays, queue backlog, approvals): 24h
- Minimum retention target: 60h
Round up to operationally friendly values (for example 72h or 168h) and revisit quarterly.
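The sizing formula and the round-up step fit in a few lines. The list of friendly values is an illustrative assumption; beyond it, the sketch rounds up to whole days.

```python
import math

FRIENDLY_STEPS_HOURS = (72, 168, 336, 720)  # illustrative "operationally friendly" values


def required_retention_hours(max_downtime_h: float, recovery_buffer_h: float) -> float:
    """Baseline formula: worst-case downtime plus recovery buffer."""
    return max_downtime_h + recovery_buffer_h


def friendly_retention_hours(required_h: float) -> int:
    """Round up to the next friendly value, or to whole days beyond the list."""
    for step in FRIENDLY_STEPS_HOURS:
        if required_h <= step:
            return step
    return math.ceil(required_h / 24) * 24


required = required_retention_hours(36, 24)
print(required, friendly_retention_hours(required))  # 60 72
```

One practical caveat: in open-source Delta Lake, `VACUUM` refuses a retention shorter than the table's `delta.deletedFileRetentionDuration` (7 days by default) unless `spark.databricks.delta.retentionDurationCheck.enabled` is set to `false`, so landing on 72h is a deliberate override of that safety check.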
#Implementation Checklist
- Define `max_allowed_lag_hours` per incremental pipeline in control metadata.
- Add precheck gate before every CDF read.
- Classify CDF read failures and route `CDF_WINDOW_LOST` to fallback.
- Update watermark only after successful target merge.
- Add alerting on lag threshold breaches and fallback activation.
- Align VACUUM retention with real downtime + buffer, not optimistic schedules.
- Document ownership: who can change retention and who approves it.
If you already built CDF incremental pipelines, this is the missing reliability layer. CDF gives efficient change replay, but only retention-aware operations make that replay dependable in production. This is exactly the class of platform hardening DataDevOpsLab helps teams implement before the 3am pager proves the gap.