02 — Ingest & Transform Data

Official Exam Weight: 30–35%


🗺 Domain Overview

mindmap
  root((Ingest & Transform Data))
    2.1 Loading Patterns
      Full Load
      Incremental Load
      Dimensional Model Prep
      Streaming Load
    2.2 Batch Data
      Choose Data Store
      Choose Transform Tool
      OneLake Shortcuts
      Mirroring
      Pipeline Ingestion
      PySpark Transforms
      SQL Transforms
      KQL Transforms
      Denormalization
      Aggregation
      Duplicates & Missing Data
      Late-Arriving Data
    2.3 Streaming Data
      Streaming Engine Choice
      Native Tables vs Shortcuts
      Query Acceleration
      Eventstreams
      Spark Structured Streaming
      KQL Processing
      Windowing Functions

📥 2.1 Design & Implement Loading Patterns

Full Load vs Incremental Load

flowchart TD
    START([Loading Strategy]) --> Q1{Data volume\nand frequency?}
    Q1 -->|"Small dataset\nor first load"| FULL["📦 Full Load\n(Truncate & Reload)"]
    Q1 -->|"Large dataset\nfrequent updates"| INCR["📈 Incremental Load\n(Only changes)"]
    INCR --> Q2{Change detection\nmethod?}
    Q2 -->|"Timestamp column\n(modified_date)"| WATERMARK["🔖 Watermark / High-Water Mark\n(Track last loaded timestamp)"]
    Q2 -->|"CDC available\n(source supports it)"| CDC["🔄 Change Data Capture\n(Insert/Update/Delete tracking)"]
    Q2 -->|"No reliable\nchange column"| HASH["#️⃣ Hash Comparison\n(Compare row hashes)"]

Comparison Table

| Aspect | Full Load | Incremental Load |
| --- | --- | --- |
| Data transferred | Entire dataset every run | Only new/changed rows |
| Complexity | Simple | More complex (change detection) |
| Duration | Longer (proportional to total size) | Shorter (proportional to changes) |
| Target operation | Truncate + insert | Merge / upsert / append |
| Idempotent | Yes (always same result) | Requires careful design |
| Best for | Small reference tables, initial loads | Large fact tables, frequent updates |
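
When the source has no reliable change column, hash comparison can stand in for change detection. The following is a minimal plain-Python sketch of the idea, not Fabric-specific code; the helper names (`row_hash`, `detect_changes`) and the sample rows are hypothetical.

```python
import hashlib

def row_hash(row, columns):
    """Hash the tracked columns of a row in a fixed column order."""
    joined = "\x1f".join(str(row[c]) for c in columns)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()

def detect_changes(source_rows, target_hashes, key, columns):
    """Split source rows into inserts and updates by comparing hashes.

    target_hashes maps key -> hash stored at the previous load.
    """
    inserts, updates = [], []
    for row in source_rows:
        old = target_hashes.get(row[key])
        if old is None:
            inserts.append(row)          # key never seen before
        elif old != row_hash(row, columns):
            updates.append(row)          # key exists but content changed
    return inserts, updates

cols = ["name", "city"]
source = [
    {"id": 1, "name": "Ada", "city": "London"},
    {"id": 2, "name": "Linus", "city": "Helsinki"},
    {"id": 3, "name": "Grace", "city": "New York"},
]
# Hashes saved by an earlier load: id 1 is unchanged, id 2 had another city.
target_hashes = {
    1: row_hash({"name": "Ada", "city": "London"}, cols),
    2: row_hash({"name": "Linus", "city": "Portland"}, cols),
}
inserts, updates = detect_changes(source, target_hashes, "id", cols)
```

Here `inserts` holds the row with id 3 and `updates` holds the row with id 2. Deletes would need a separate pass over target keys that are missing from the source.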

Exam Caveat ⚠️:

  • Watermark/high-water mark is the simplest incremental pattern — requires a reliable modified_date or version column in the source
  • MERGE (Delta Lake) is the preferred method for upserts in Fabric Lakehouses
  • For CDC from Azure SQL/SQL Server, use Fabric’s Mirroring feature
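
The watermark pattern above can be sketched in plain Python. This is an illustration of the logic only, assuming a hypothetical `modified_date` column and an in-memory dict standing in for the target table; in a Lakehouse the upsert step would typically be a Delta MERGE.

```python
from datetime import datetime

def incremental_load(source_rows, target, last_watermark):
    """Upsert rows modified after last_watermark and return the new watermark."""
    changed = [r for r in source_rows if r["modified_date"] > last_watermark]
    for r in changed:
        target[r["id"]] = r  # upsert by key
    # If nothing changed, the watermark stays where it was.
    return max((r["modified_date"] for r in changed), default=last_watermark)

source = [
    {"id": 1, "amount": 10, "modified_date": datetime(2024, 1, 1)},
    {"id": 2, "amount": 20, "modified_date": datetime(2024, 1, 5)},
    {"id": 3, "amount": 30, "modified_date": datetime(2024, 1, 9)},
]
target = {1: source[0]}            # state after the initial full load
watermark = datetime(2024, 1, 1)   # last loaded timestamp

watermark = incremental_load(source, target, watermark)   # picks up ids 2 and 3
second_run = incremental_load(source, target, watermark)  # finds nothing new
```

The strict `>` comparison is a design choice: it avoids reloading the boundary row, but it can miss rows that share the watermark timestamp and are committed later, so some designs reload a small overlap and rely on the upsert to stay idempotent.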

Preparing Data for a Dimensional Model

graph TD
    RAW["📥 Raw Data\n(Source systems)"] -->|Extract & Clean| STAGING["🧹 Staging / Bronze\n(Raw, as-is)"]
    STAGING -->|Transform & Conform| SILVER["🥈 Silver / Cleaned\n(Deduped, typed, validated)"]
    SILVER -->|Model & Aggregate| GOLD["🥇 Gold / Dimensional Model"]
    GOLD --> DIM["📊 Dimension Tables\n(Customers, Products, Dates)"]
    GOLD --> FACT["📈 Fact Tables\n(Sales, Transactions)"]
    GOLD --> SCD["🔄 Slowly Changing\nDimensions (SCD)"]

Slowly Changing Dimensions (SCD):

| Type | Name | Behaviour | Implementation |
| --- | --- | --- | --- |
| SCD Type 1 | Overwrite | Replace old value with new | Simple UPDATE |
| SCD Type 2 | History | Keep full history with valid_from/valid_to | Add new row, close old row |
| SCD Type 3 | Previous value | Keep current + previous value | Add previous_value column |

Exam Caveat ⚠️: SCD Type 2 is the most common exam scenario — know how to implement it with Delta Lake MERGE operations including valid_from, valid_to, and is_current columns.
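
To make the "add new row, close old row" behaviour concrete, here is a minimal plain-Python sketch of SCD Type 2 logic. It is not the Delta MERGE implementation itself; `apply_scd2` and the sample data are hypothetical, and only the column names `valid_from`, `valid_to`, and `is_current` come from the text above.

```python
from datetime import date

def apply_scd2(dimension, incoming, key, tracked, load_date):
    """Apply SCD Type 2 changes in place to a list of dimension rows."""
    current = {r[key]: r for r in dimension if r["is_current"]}
    for new in incoming:
        old = current.get(new[key])
        if old is not None and all(old[c] == new[c] for c in tracked):
            continue                      # nothing changed, keep the current row
        if old is not None:
            old["valid_to"] = load_date   # close the old version
            old["is_current"] = False
        row = {**new, "valid_from": load_date, "valid_to": None, "is_current": True}
        dimension.append(row)             # open the new version
        current[new[key]] = row
    return dimension

dim = [
    {"customer_id": 1, "city": "Oslo", "valid_from": date(2023, 1, 1),
     "valid_to": None, "is_current": True},
]
incoming = [
    {"customer_id": 1, "city": "Bergen"},  # changed attribute
    {"customer_id": 2, "city": "Lima"},    # new customer
]
apply_scd2(dim, incoming, "customer_id", ["city"], date(2024, 6, 1))
```

After the call, the Oslo row is closed with `valid_to` set to the load date, and two current rows exist (Bergen and Lima). Applying the same `incoming` batch again adds no rows, which is the behaviour a MERGE-based implementation should also preserve.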


Streaming Data Loading Patterns

flowchart TD
    SOURCE["📡 Streaming Source\n(Event Hubs, Kafka, custom app)"] --> ES["⚡ Eventstream\n(Route & transform)"]
    ES -->|Real-time analytics| EH["🏠 Eventhouse\n(KQL Database)"]
    ES -->|Store in lake| LH["🏠 Lakehouse\n(Delta tables)"]
    ES -->|Process with code| NB["📓 Notebook\n(Spark Structured Streaming)"]

🔄 2.2 Ingest & Transform Batch Data

Choosing an Appropriate Data Store

flowchart TD
    START([Where to store data?]) --> Q1{Primary query\nengine?}
    Q1 -->|"T-SQL (read + write)\nStored procs needed"| WH["🏢 Warehouse"]
    Q1 -->|"Spark + SQL\nFlexible, multi-engine"| LH["🏠 Lakehouse"]
    Q1 -->|"KQL\nReal-time / time-series"| EH["⚡ Eventhouse\n(KQL Database)"]
    LH --> Q2{Structured or\nunstructured?}
    Q2 -->|"Structured\n(tables)"| TABLES["📊 Managed Tables\n(Tables/ folder)"]
    Q2 -->|"Unstructured\n(files, images, JSON)"| FILES["📁 Unmanaged Files\n(Files/ folder)"]

Choosing the Right Transform Tool

| Need | Tool | Language |
| --- | --- | --- |
| No-code visual transforms, 150+ connectors | Dataflow Gen2 | M (Power Query) |
| Complex transforms on large data | Notebook | PySpark / Spark SQL |
| Real-time analytics transforms | KQL | Kusto Query Language |
| SQL-based transforms in Warehouse | T-SQL | T-SQL |
| Simple SQL on Lakehouse tables | Notebook (%%sql) | Spark SQL |

OneLake Shortcuts

Shortcuts provide virtual access to data without copying.

flowchart TD
    LH["🏠 Lakehouse"] -->|"Internal shortcut"| OTHER_LH["🏠 Other Lakehouse\n(same or different workspace)"]
    LH -->|"External shortcut"| ADLS["☁️ ADLS Gen2"]
    LH -->|"External shortcut"| S3["☁️ Amazon S3"]
    LH -->|"External shortcut"| GCS["☁️ Google Cloud Storage"]
    LH -->|"External shortcut"| DV["📊 Dataverse"]

| Shortcut Type | Data Copied? | Latency | Security |
| --- | --- | --- | --- |
| Internal (OneLake → OneLake) | No | Low | OneLake permissions |
| External (ADLS Gen2, S3, GCS) | No | Higher (cross-service) | Source + OneLake permissions |
| Dataverse | No | Medium | Dataverse + OneLake permissions |

Exam Caveat ⚠️:

  • Shortcuts appear as regular folders in the Lakehouse — Spark and SQL can query them transparently
  • External shortcuts have higher latency than native OneLake data — consider this for performance-critical queries
  • Shortcut data is not included in OPTIMIZE or VACUUM operations — it’s managed at the source

Mirroring

Mirroring creates a near real-time replica of data from external sources into OneLake.

| Source | Support | Latency |
| --- | --- | --- |
| Azure SQL Database | GA | Near real-time |
| Azure Cosmos DB | GA | Near real-time |
| Snowflake | GA | Near real-time |
| Azure SQL Managed Instance | GA | Near real-time |
| Azure Database for PostgreSQL | GA | Near real-time |
| Azure Database for MySQL | GA | Near real-time |

flowchart LR
    SOURCE["🗄️ Source Database\n(Azure SQL, Cosmos DB,\nSnowflake, etc.)"] -->|"CDC / Change Feed\n(automatic)"| MIRROR["🪞 Mirrored Database\n(OneLake, Delta format)"]
    MIRROR -->|"Query via"| SQL["T-SQL\n(SQL Analytics Endpoint)"]
    MIRROR -->|"Query via"| SPARK["Spark\n(Notebooks)"]

Exam Caveat ⚠️:

  • Mirroring vs Shortcuts: Mirroring copies data into OneLake (near real-time CDC), while shortcuts point to data without copying
  • Mirrored data is stored in Delta format and can be queried by Spark and SQL
  • Mirroring is one-way — changes in OneLake are NOT pushed back to the source

Ingest Data by Using Pipelines

Copy Activity is the primary pipeline activity for data ingestion.

| Feature | Description |
| --- | --- |
| Source connectors | 90+ connectors (databases, files, SaaS apps) |
| Destination | Lakehouse (Tables or Files), Warehouse, KQL Database |
| Format support | Parquet, CSV, JSON, ORC, Avro, Delta |
| Parallelism | Configurable degree of copy parallelism |
| Staging | Optional staging via ADLS Gen2 for large copies |

Transform Data by Using PySpark

Common PySpark Patterns

Read & write Delta tables:

from pyspark.sql.functions import col, year, month

# Read from Lakehouse table
df = spark.read.format("delta").load("Tables/raw_sales")

# Transform: keep positive amounts, derive year/month, drop repeated orders
df_clean = (df
    .filter(col("amount") > 0)
    .withColumn("year", year("order_date"))
    .withColumn("month", month("order_date"))
    .dropDuplicates(["order_id"])
)

# Write to Lakehouse table
df_clean.write.format("delta").mode("overwrite").saveAsTable("cleaned_sales")

Delta MERGE (upsert):

from delta.tables import DeltaTable

# df_new holds the new or changed source rows (assumed to be loaded earlier)
target = DeltaTable.forName(spark, "gold_customers")

(target.alias("t")
    .merge(df_new.alias("s"), "t.customer_id = s.customer_id")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

Partitioning:

# Write partitioned by year and month
# (the partition columns must exist, so use a DataFrame that has them, such as df_clean)
df_clean.write.format("delta") \
    .partitionBy("year", "month") \
    .mode("overwrite") \
    .saveAsTable("partitioned_sales")

Transform Data by Using SQL (T-SQL)

-- Create a transformed table in Warehouse
CREATE TABLE gold.monthly_sales AS
SELECT
    YEAR(order_date)  AS sale_year,
    MONTH(order_date) AS sale_month,
    region,
    SUM(amount)       AS total_amount,
    COUNT(*)          AS order_count
FROM staging.raw_sales
WHERE amount > 0
GROUP BY YEAR(order_date), MONTH(order_date), region;

CTAS (CREATE TABLE AS SELECT) is commonly used in Fabric Warehouses for large transforms.


Transform Data by Using KQL

// Aggregate events by hour in an Eventhouse
SensorReadings
| where Timestamp > ago(7d)
| summarize AvgTemp = avg(Temperature),
            MaxTemp = max(Temperature),
            ReadingCount = count()
  by bin(Timestamp, 1h), DeviceId
| order by Timestamp desc

Denormalize Data

| Pattern | Description | Use Case |
| --- | --- | --- |
| Flatten | Join dimension into fact table | Star schema → flat table for BI |
| Nest | Store related data as struct/array | Semi-structured data in Delta |
| Pre-aggregate | Compute aggregates ahead of time | Dashboard performance |
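
As a small illustration of the Flatten pattern, the sketch below copies dimension attributes onto each fact row, which is what a left join does. It is plain Python with hypothetical names (`flatten`, `sales`, `products`); in Fabric the same step would usually be a join in PySpark or SQL.

```python
def flatten(facts, dims, key, attrs):
    """Left-join selected dimension attributes onto each fact row."""
    lookup = {d[key]: d for d in dims}
    flat = []
    for f in facts:
        d = lookup.get(f[key])
        row = dict(f)
        for a in attrs:
            # A missing dimension row leaves the attribute empty.
            row[a] = d[a] if d is not None else None
        flat.append(row)
    return flat

products = [
    {"product_id": 10, "product_name": "Kettle", "category": "Kitchen"},
    {"product_id": 11, "product_name": "Lamp", "category": "Lighting"},
]
sales = [
    {"order_id": 1, "product_id": 10, "amount": 25.0},
    {"order_id": 2, "product_id": 11, "amount": 40.0},
    {"order_id": 3, "product_id": 99, "amount": 5.0},  # no matching product
]
flat_sales = flatten(sales, products, "product_id", ["product_name", "category"])
```

Every sale keeps its own columns and gains `product_name` and `category`. The unmatched sale keeps `None` for both, mirroring the NULLs a left join would produce.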

Handle Duplicate, Missing, and Late-Arriving Data

flowchart TD
    DATA["📥 Incoming Data"] --> DUP{Duplicates?}
    DUP -->|Yes| DEDUP["dropDuplicates()\nor MERGE with\nDELETE + INSERT"]
    DUP -->|No| MISS{Missing values?}
    MISS -->|Yes| FILL["fillna() / COALESCE()\nDefault values\nDrop if critical"]
    MISS -->|No| LATE{Late-arriving?}
    LATE -->|Yes| MERGE_LATE["MERGE into existing\npartitions\nUpdate aggregates"]
    LATE -->|No| DONE["✅ Clean data"]

Exam Caveat ⚠️:

  • Late-arriving data in streaming scenarios is handled via watermarks in Spark Structured Streaming
  • In batch scenarios, use MERGE to upsert late-arriving records into the correct partition
  • Always design incremental loads to be idempotent — rerunning should produce the same result
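
The deduplicate, fill, and upsert steps can be sketched together in plain Python to show why a keyed upsert is idempotent. The function names, the `order_id` key, and the `"UNKNOWN"` default are hypothetical; PySpark's `dropDuplicates()` and `fillna()` plus a Delta MERGE play these roles in a Lakehouse.

```python
def clean_batch(rows, key, defaults):
    """Drop duplicate keys (keeping the last one seen) and fill missing values."""
    latest = {}
    for r in rows:
        latest[r[key]] = r  # later rows overwrite earlier duplicates
    cleaned = []
    for r in latest.values():
        cleaned.append({k: (defaults.get(k) if v is None else v) for k, v in r.items()})
    return cleaned

def upsert(target, rows, key):
    """Insert new keys and overwrite existing ones."""
    for r in rows:
        target[r[key]] = r
    return target

batch = [
    {"order_id": 1, "amount": 10.0, "region": "EU"},
    {"order_id": 1, "amount": 12.0, "region": "EU"},   # duplicate key
    {"order_id": 2, "amount": 5.0, "region": None},    # missing value
]
cleaned = clean_batch(batch, "order_id", {"region": "UNKNOWN"})

target = {}
first = dict(upsert(target, cleaned, "order_id"))
second = dict(upsert(target, cleaned, "order_id"))  # rerun of the same batch
```

Because rows are written by key rather than appended, `first` and `second` are identical: rerunning the load leaves the target unchanged.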

⚡ 2.3 Ingest & Transform Streaming Data

Choosing an Appropriate Streaming Engine

flowchart TD
    START([Streaming Need]) --> Q1{Analytics style?}
    Q1 -->|"Real-time dashboards\ntime-series, logs"| KQL_ENGINE["⚡ Eventhouse + KQL\n(Best for exploration & alerts)"]
    Q1 -->|"Complex transforms\njoin with batch data"| SSS["📓 Spark Structured Streaming\n(Notebooks)"]
    Q1 -->|"Route events\nno-code transforms"| ES["🔄 Eventstream\n(Event routing & processing)"]

Comparison

| Feature | Eventstream | Spark Structured Streaming | KQL |
| --- | --- | --- | --- |
| Latency | Seconds | Seconds–minutes | Seconds |
| Coding | No-code / low-code | PySpark | KQL queries |
| Best for | Event routing, simple transforms | Complex joins, ML on streams | Real-time analytics, alerts |
| Destination | Eventhouse, Lakehouse, custom | Lakehouse (Delta) | KQL Database |
| Windowing | Basic | Full (tumbling, sliding, session) | Full (tumbling, sliding, hopping) |

Native Tables vs OneLake Shortcuts in Real-Time Intelligence

| Feature | Native KQL Table | OneLake Shortcut |
| --- | --- | --- |
| Data location | Eventhouse (native storage) | OneLake (external reference) |
| Ingestion | Direct ingestion, lowest latency | No ingestion; queried in place |
| Performance | Fastest KQL queries | Slower (cross-service reads) |
| Use case | Hot path: active real-time analytics | Warm/cold path: historical lookups |

Query acceleration for OneLake shortcuts:

| Feature | Standard Shortcut | Query-Accelerated Shortcut |
| --- | --- | --- |
| Caching | None | Automatic caching in Eventhouse |
| Performance | Slower (reads from OneLake) | Faster (reads from cache) |
| Freshness | Real-time | Near real-time (cache refresh interval) |
| Use case | Infrequent queries on cold data | Frequent queries on shortcut data |

Exam Caveat ⚠️: Query acceleration creates a cached copy of shortcut data in the Eventhouse — it improves performance but adds CU consumption and slight data freshness delay.


Process Data by Using Eventstreams

flowchart LR
    SRC["📡 Sources\n• Azure Event Hubs\n• Custom App\n• Azure IoT Hub\n• Kafka"] --> ES["⚡ Eventstream\n(Route + Transform)"]
    ES -->|"Destination"| EH["🏠 Eventhouse"]
    ES -->|"Destination"| LH["🏠 Lakehouse"]
    ES -->|"Destination"| WH["🏢 Warehouse"]
    ES -->|"Destination"| CUSTOM["🔧 Custom Endpoint"]

Eventstream capabilities:

| Feature | Description |
| --- | --- |
| No-code transforms | Filter, manage fields, group by, aggregate |
| Multiple destinations | Route same stream to multiple targets |
| Schema registry | Avro, JSON schema support |
| Error handling | Dead-letter queue for failed events |

Process Data by Using Spark Structured Streaming

# Stream row-level changes from the bronze_events Delta table
# (for example, a table fed by an Eventstream; change data feed must be enabled on it)
df_stream = (spark.readStream
    .format("delta")
    .option("readChangeFeed", "true")
    .table("bronze_events")
)

# Transform
from pyspark.sql.functions import window, avg, count

df_agg = (df_stream
    .withWatermark("event_time", "10 minutes")
    .groupBy(
        window("event_time", "5 minutes"),
        "device_id"
    )
    .agg(
        avg("temperature").alias("avg_temp"),
        count("*").alias("event_count")
    )
)

# Write to Delta table
(df_agg.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "Files/checkpoints/sensor_agg")
    .toTable("silver_sensor_aggregates")
)

Exam Caveat ⚠️:

  • A watermark tells Spark how long to wait for late data and lets it discard old aggregation state; it is required when a streaming aggregation writes in append output mode
  • Checkpoint location is mandatory for fault-tolerant streaming — stores offset/state between runs
  • Use readChangeFeed on Delta tables to read changes as a stream (CDC on Delta)
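
The effect of a watermark on late events can be illustrated with a simplified plain-Python model. This is an assumption-laden sketch: it updates the watermark per event, whereas Spark updates it per micro-batch and applies it to window state; `filter_late` and the sample events are hypothetical.

```python
from datetime import datetime, timedelta

def filter_late(events, delay):
    """Return (accepted, dropped) using a simplified per-event watermark.

    The watermark is the latest event time seen so far minus the allowed delay.
    """
    max_seen = None
    accepted, dropped = [], []
    for e in events:  # events are processed in arrival order
        watermark = max_seen - delay if max_seen is not None else None
        if watermark is not None and e["event_time"] < watermark:
            dropped.append(e)             # too late, older than the watermark
        else:
            accepted.append(e)
            if max_seen is None or e["event_time"] > max_seen:
                max_seen = e["event_time"]
    return accepted, dropped

events = [
    {"id": 1, "event_time": datetime(2024, 1, 1, 12, 0)},
    {"id": 2, "event_time": datetime(2024, 1, 1, 12, 20)},
    {"id": 3, "event_time": datetime(2024, 1, 1, 12, 15)},  # 5 min late: accepted
    {"id": 4, "event_time": datetime(2024, 1, 1, 12, 5)},   # 15 min late: dropped
]
accepted, dropped = filter_late(events, timedelta(minutes=10))
```

With a 10-minute delay, the watermark sits at 12:10 once the 12:20 event has arrived, so the 12:15 event is still accepted and the 12:05 event is dropped.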

Windowing Functions

| Window Type | Description | Use Case |
| --- | --- | --- |
| Tumbling | Fixed-size, non-overlapping, contiguous | “Count events per 5-minute block” |
| Sliding / Hopping | Fixed-size, overlapping by slide interval | “Average over last 10 min, updated every 2 min” |
| Session | Dynamic size, based on activity gaps | “Group user clicks until 30-min inactivity” |

graph LR
    subgraph "Tumbling Window (5 min)"
        T1["[0:00–0:05)"] --> T2["[0:05–0:10)"] --> T3["[0:10–0:15)"]
    end

graph LR
    subgraph "Sliding Window (10 min, slide 5 min)"
        S1["[0:00–0:10)"]
        S2["[0:05–0:15)"]
        S3["[0:10–0:20)"]
    end

KQL windowing example:

// Tumbling window: events per 5-minute block
SensorReadings
| summarize EventCount = count(), AvgTemp = avg(Temperature)
  by bin(Timestamp, 5m), DeviceId

PySpark windowing example:

from pyspark.sql.functions import window

df_windowed = (df_stream
    .groupBy(window("event_time", "5 minutes"), "device_id")
    .count()
)
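
To show how an event timestamp maps to windows, here is a plain-Python sketch of tumbling and sliding window assignment. The helper names are hypothetical, and aligning window starts to the Unix epoch is an assumption chosen to match the window boundaries in the diagrams above.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)

def tumbling_window(ts, size):
    """Return the single [start, end) window of length size that contains ts."""
    start = ts - ((ts - EPOCH) % size)
    return start, start + size

def sliding_windows(ts, size, slide):
    """Return every [start, end) window of length size, stepped by slide, containing ts."""
    start = ts - ((ts - EPOCH) % slide)   # latest window start at or before ts
    windows = []
    while start + size > ts:              # walk back while the window still covers ts
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)

ts = datetime(2024, 1, 1, 0, 7)
tumbling = tumbling_window(ts, timedelta(minutes=5))
sliding = sliding_windows(ts, timedelta(minutes=10), timedelta(minutes=5))
```

An event at 0:07 falls into exactly one 5-minute tumbling window, [0:05–0:10), but into two sliding windows, [0:00–0:10) and [0:05–0:15), which is why sliding aggregates count the same event more than once.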

📊 Quick-Reference Scenario Table

| Scenario | Requirement | Answer |
| --- | --- | --- |
| Copy data from Azure SQL to Lakehouse | Batch ingestion | Pipeline Copy Activity |
| No-code transform CSV → Lakehouse table | Visual ETL | Dataflow Gen2 |
| Complex PySpark join on 100M+ rows | Code-first large-scale | Notebook |
| Access ADLS Gen2 data without copying | Virtual access | OneLake Shortcut |
| Near real-time replica of Azure SQL | Automatic CDC | Mirroring |
| Process IoT events in real-time | Stream routing | Eventstream → Eventhouse |
| 5-minute tumbling window aggregation | Streaming analytics | KQL bin() or Spark window() |
| Upsert changed rows into Delta table | Incremental load | Delta MERGE |
| Track late-arriving streaming events | Late data handling | Watermark in Spark Structured Streaming |
| Query external S3 data from Lakehouse | Cross-cloud access | OneLake Shortcut (S3) |
| SCD Type 2 on customer dimension | History tracking | Delta MERGE with valid_from/valid_to |
| Aggregate real-time sensor data | Time-series analytics | KQL in Eventhouse |
| Speed up queries on shortcut data in RTI | Performance | Query acceleration |