JAlcocerTech E-books

Data Quality & Validation

For industries like healthcare and finance, data lineage and quality are essential. This chapter covers modern tools and practices for ensuring data quality throughout your pipelines.

The Modern Data Lakehouse Architecture

The medallion architecture with AI integration:

flowchart LR
    %% --- Styles ---
    classDef bronze fill:#EFEBE9,stroke:#8D6E63,stroke-width:2px,color:#3E2723;
    classDef silver fill:#ECEFF1,stroke:#78909C,stroke-width:2px,color:#263238;
    classDef gold fill:#FFFDE7,stroke:#FBC02D,stroke-width:2px,color:#F57F17;
    classDef ai fill:#F3E5F5,stroke:#8E24AA,stroke-width:2px,stroke-dasharray: 5 5,color:#4A148C;
    classDef source fill:#fff,stroke:#333,stroke-width:1px;

    %% --- Sources ---
    subgraph Sources [Data Sources]
        direction TB
        Logs[Logs / IoT]:::source
        DB[Databases]:::source
        APIs[External APIs]:::source
    end

    %% --- The Lakehouse (Medallion) ---
    subgraph Lakehouse [The Data Lakehouse]
        direction LR
        
        Bronze[("BRONZE<br/>(Raw Ingestion)<br/>As-is Dump")]:::bronze
        Silver[("SILVER<br/>(Refined)<br/>Cleaned & Enriched")]:::silver
        Gold[("GOLD<br/>(Curated)<br/>Business Aggregates")]:::gold
    end

    %% --- AI Integration ---
    subgraph AI_Lab [AI & Machine Learning]
        direction TB
        Training(Model Training):::ai
        Inference(AI Agents / RAG):::ai
        Predictions(Predictions / Tags):::ai
    end

    %% --- Consumers ---
    BI[BI Dashboards<br/>& Reports]:::source

    %% --- The Flow ---
    Sources --> Bronze
    Bronze -- "ETL / Cleaning" --> Silver
    Silver -- "Aggregation" --> Gold
    Gold --> BI

    %% --- Where AI Plugs In ---
    Silver -.->|"Feeds Data"| Training
    Gold -.->|"Context for RAG"| Inference
    Training --> Predictions
    Inference --> Predictions
    Predictions -.->|"Enrichment"| Silver
    Predictions -.->|"New Insights"| Gold

Key Layers:

  • Bronze: Raw data ingestion (as-is dump)
  • Silver: Cleaned and enriched data
  • Gold: Business-ready aggregates
  • AI Layer: Training, inference, and predictions
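
The layer hops above can be sketched with plain pandas; the table, column names, and cleaning rules below are invented for illustration, not taken from any real pipeline.

```python
import pandas as pd

# Bronze: raw ingestion, kept as-is (note the duplicate row and the bad value)
bronze = pd.DataFrame({
    "order_id": [1, 2, 2, 3],
    "amount": ["10.5", "20.0", "20.0", "bad"],
    "region": ["us", "eu", "eu", "us"],
})

# Silver: cleaned and enriched -- deduplicate, coerce types, drop unparseable rows
silver = bronze.drop_duplicates()
silver = silver.assign(amount=pd.to_numeric(silver["amount"], errors="coerce"))
silver = silver.dropna(subset=["amount"])

# Gold: business-ready aggregate -- revenue per region
gold = silver.groupby("region", as_index=False)["amount"].sum()
print(gold)
```

In a real lakehouse each step would read from and write to its own storage layer; the point here is only that each layer is a stricter, more business-shaped view of the one before it.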

Data Management Concepts

Understanding the relationship between profiling, modeling, and validation:

graph LR
    A[Data Profiling: Understand As-Is] --> B(Data Modeling: Design To-Be);
    B --> C{Prepare Design Documentation};
    C --> D["Data Model (Entities, Attributes, Relationships)"];
    C --> E["Data Lineage (Sources, Transformations, Destinations)"];
    C --> F[Data Profiling Summary & Quality Rules];

Data Profiling

Purpose: Examining data to understand its characteristics and quality.

Focus:

  • Analyzing data distributions, frequencies, patterns
  • Identifying data quality issues
  • Understanding data relationships

Examples:

  • Percentage of missing values per column
  • Unique values and frequencies
  • Min/max/average for numerical columns
  • Common text patterns
  • Column correlations

When to Use:

  • During data exploration
  • Before data integration/migration
  • To assess data quality
  • When investigating anomalies
  • Understanding new data sources

Tools:

  • ydata-profiling (formerly pandas-profiling)
  • Great Expectations
  • Apache Griffin
  • Deequ (AWS)
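
Before reaching for a dedicated tool, the profiling checks listed above can be approximated with pandas alone; the DataFrame here is invented for illustration.

```python
import pandas as pd

df = pd.DataFrame({
    "age": [25, 40, None, 33],
    "city": ["NY", "NY", "LA", None],
})

# Percentage of missing values per column
missing_pct = df.isna().mean() * 100

# Unique values and their frequencies (including missing)
city_counts = df["city"].value_counts(dropna=False)

# Min/max/average for a numerical column
age_stats = df["age"].agg(["min", "max", "mean"])

print(missing_pct)
print(city_counts)
print(age_stats)
```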

Data Validation

Purpose: Ensuring data meets specific criteria or constraints.

Focus:

  • Verifying data conforms to expected formats
  • Enforcing data integrity
  • Preventing incorrect data entry

Examples:

  • Valid date formats
  • Age within reasonable range
  • Valid product IDs
  • Email address format
  • Required fields populated

When to Use:

  • During data entry
  • In data pipelines
  • Before database loading
  • API data reception

Tools:

  • Great Expectations
  • Pydantic
  • Cerberus
  • JSON Schema
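
For reference, the example rules above can be expressed with nothing but the standard library; the field names and thresholds are illustrative, and a production service would more likely use one of the tools listed.

```python
import re
from datetime import datetime

EMAIL_RE = re.compile(r"^[\w\.-]+@[\w\.-]+\.\w+$")

def validate_record(record: dict) -> list[str]:
    """Return a list of validation errors (empty list means valid)."""
    errors = []
    # Required fields populated
    for field in ("email", "age", "signup_date"):
        if record.get(field) in (None, ""):
            errors.append(f"{field} is required")
    # Email address format
    if record.get("email") and not EMAIL_RE.match(record["email"]):
        errors.append("invalid email format")
    # Age within a reasonable range
    age = record.get("age")
    if isinstance(age, int) and not 0 <= age <= 150:
        errors.append("age out of range")
    # Valid date format (ISO 8601)
    try:
        datetime.fromisoformat(record.get("signup_date", ""))
    except (TypeError, ValueError):
        errors.append("invalid date format")
    return errors

print(validate_record({"email": "a@b.com", "age": 30, "signup_date": "2024-01-01"}))
print(validate_record({"email": "nope", "age": 200, "signup_date": "bad"}))
```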

Data Modeling

Purpose: Designing the structure and relationships of data.

Focus:

  • Entity-relationship design
  • Schema definition
  • Normalization/denormalization
  • Performance optimization

Key Concepts:

  • Entities and attributes
  • Primary/foreign keys
  • Relationships (1:1, 1:N, N:M)
  • Dimensional modeling (star/snowflake schemas)
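
A star schema from the list above can be sketched in pandas as one fact table joined to its dimensions; all table and column names below are invented.

```python
import pandas as pd

# Dimension table: one row per product (primary key: product_id)
dim_product = pd.DataFrame({
    "product_id": [1, 2],
    "category": ["books", "games"],
})

# Fact table: one row per sale, with a foreign key into the dimension (1:N)
fact_sales = pd.DataFrame({
    "sale_id": [10, 11, 12],
    "product_id": [1, 1, 2],
    "amount": [9.99, 14.99, 59.99],
})

# A typical star-schema query: join fact to dimension, then aggregate
report = (
    fact_sales.merge(dim_product, on="product_id")
    .groupby("category", as_index=False)["amount"].sum()
)
print(report)
```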

Data Validation Tools

Pydantic

Pydantic is a data validation library using Python type annotations.

Key Features:

  • Type validation at runtime
  • Data serialization/deserialization
  • Settings management
  • Custom validation
  • Clear error messages

Use Cases:

  • API request/response validation
  • Configuration management
  • Data parsing

Example:

from pydantic import BaseModel, EmailStr, ValidationError, field_validator
from datetime import datetime

class User(BaseModel):
    id: int
    name: str
    email: EmailStr
    age: int
    created_at: datetime

    # Pydantic v2 syntax; in v1, use @validator('age') without @classmethod
    @field_validator('age')
    @classmethod
    def age_must_be_reasonable(cls, v):
        if v < 0 or v > 150:
            raise ValueError('Age must be between 0 and 150')
        return v

# Valid data
user = User(
    id=1,
    name="John Doe",
    email="john@example.com",
    age=30,
    created_at=datetime.now()
)

# Invalid data raises ValidationError
try:
    invalid_user = User(
        id=2,
        name="Jane",
        email="not-an-email",
        age=200,
        created_at="not-a-date"
    )
except ValidationError as e:
    print(f"Validation error: {e}")

Installation:

pip install pydantic[email]

Great Expectations

Great Expectations is a data testing and validation framework.

Key Features:

  • Define expectations about data
  • Execute validation suites
  • Generate data documentation
  • Integration with data pipelines
  • Profiling capabilities

Use Cases:

  • Data quality monitoring
  • Pipeline testing
  • Data documentation
  • Regression testing

Example:

import great_expectations as gx

# Create a Data Context
# (the "fluent" API below follows Great Expectations 0.16-0.18;
#  entry points were renamed in the 1.x releases)
context = gx.get_context()

# Connect to data
validator = context.sources.pandas_default.read_csv(
    "data.csv"
)

# Define expectations
validator.expect_column_values_to_not_be_null("customer_id")
validator.expect_column_values_to_be_between("age", min_value=0, max_value=120)
validator.expect_column_values_to_be_in_set("status", ["active", "inactive", "pending"])
validator.expect_column_values_to_match_regex("email", r"^[\w\.-]+@[\w\.-]+\.\w+$")

# Save expectation suite
validator.save_expectation_suite(discard_failed_expectations=False)

# Run validation
checkpoint_result = context.run_checkpoint(
    checkpoint_name="my_checkpoint",
    validations=[{"batch_request": validator.active_batch_request}]
)

# Check results
if checkpoint_result.success:
    print("All expectations passed!")
else:
    print("Some expectations failed.")

Installation:

pip install great-expectations

Common Expectations:

  Expectation                            Purpose
  expect_column_to_exist                 Column exists in dataset
  expect_column_values_to_not_be_null    No null values
  expect_column_values_to_be_unique      All values unique
  expect_column_values_to_be_between     Values in range
  expect_column_values_to_be_in_set      Values from allowed set
  expect_column_mean_to_be_between       Mean in range
  expect_table_row_count_to_be_between   Row count in range

Pydantic vs Great Expectations

  Feature       Pydantic           Great Expectations
  Level         Application        Data pipeline
  Focus         Data structures    Data quality
  Use case      API validation     Pipeline testing
  Scope         Python objects     DataFrames/databases
  Integration   Code-level         Pipeline-level

When to Use Both:

  1. Pydantic: Validate API input
  2. Great Expectations: Validate processed data in pipeline
  3. Together: Comprehensive validation strategy
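
A minimal sketch of the two levels working together, assuming Pydantic is installed; the `Order` model and the records are invented, and the DataFrame assertions stand in for a Great Expectations suite.

```python
import pandas as pd
from pydantic import BaseModel, ValidationError

# 1. Pydantic guards individual records at the application boundary
class Order(BaseModel):
    order_id: int
    amount: float

raw_records = [
    {"order_id": 1, "amount": "10.5"},  # coercible -> accepted
    {"order_id": "x", "amount": 5.0},   # bad id -> rejected
]

valid_rows = []
for rec in raw_records:
    try:
        order = Order(**rec)
        valid_rows.append({"order_id": order.order_id, "amount": order.amount})
    except ValidationError:
        pass  # in a real pipeline: log or dead-letter the record

# 2. Pipeline-level checks on the resulting DataFrame
#    (stand-ins for a Great Expectations suite)
df = pd.DataFrame(valid_rows)
assert df["order_id"].notna().all()
assert (df["amount"] > 0).all()
print(df)
```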

Apache Iceberg & Nessie

Modern data lakehouse tools for managing table formats and data versioning.

Apache Iceberg

What is Iceberg?

Apache Iceberg is an open table format for huge analytic datasets.

Key Features:

  • ACID transactions
  • Schema evolution
  • Hidden partitioning
  • Time travel
  • Snapshot isolation

Benefits:

  • Reliable data updates
  • Fast query performance
  • Efficient metadata handling
  • Multi-engine support (Spark, Flink, Trino)

Example Architecture:

flowchart LR
    %% Styles
    classDef storage fill:#cfd8dc,stroke:#37474F,stroke-width:2px;
    classDef format fill:#b2dfdb,stroke:#00695c,stroke-width:2px;
    
    S3[("S3 / MinIO<br/>(Object Storage)")]:::storage
    Iceberg("Apache Iceberg<br/>(Table Format / ACID)"):::format
    
    S3 --- Iceberg

Nessie

What is Nessie?

Nessie is a Git-like version control system for data lakes.

Key Features:

  • Branch and merge data
  • Time travel across tables
  • Catalog management
  • Multi-table transactions

Benefits:

  • Data versioning
  • Reproducible analytics
  • Safe experimentation
  • Rollback capabilities

Integration:

flowchart TB
    Nessie[Nessie Catalog]
    Iceberg[Iceberg Tables]
    S3[Object Storage]
    
    Nessie -->|Manages| Iceberg
    Iceberg -->|Stored in| S3

Complete Architecture

flowchart LR
    %% Styles
    classDef bronze fill:#EFEBE9,stroke:#8D6E63,stroke-width:3px;
    classDef silver fill:#ECEFF1,stroke:#78909C,stroke-width:3px;
    classDef gold fill:#FFFDE7,stroke:#FBC02D,stroke-width:3px;
    classDef storage fill:#cfd8dc,stroke:#37474F,stroke-width:2px;
    classDef format fill:#b2dfdb,stroke:#00695c,stroke-width:2px;
    classDef quality fill:#ffecb3,stroke:#ff6f00,stroke-width:2px,stroke-dasharray: 5 5;
    classDef code fill:#e1bee7,stroke:#4a148c,stroke-width:1px,stroke-dasharray: 2 2;

    %% Physical Foundation
    subgraph PhysicalLayer [Physical Foundation]
        S3[("S3 / MinIO")]:::storage
    end

    %% Table Management
    subgraph ManagementLayer [Table Management]
        Iceberg("Apache Iceberg"):::format
        Nessie("Nessie Catalog"):::format
        Iceberg -.->|Managed by| Nessie
        S3 --- Iceberg
    end

    %% Logical Flow
    subgraph Lakehouse [Logical Data Flow]
        Source[Raw Sources]
        
        subgraph Ingest [Ingestion]
            Pydantic1[("Pydantic")]:::code
        end
        
        Bronze[("BRONZE")]:::bronze
        
        subgraph Process1 [Cleaning]
            GX1[("Great Expectations")]:::quality
        end
        
        Silver[("SILVER")]:::silver
        
        subgraph Process2 [Aggregation]
            GX2[("Great Expectations")]:::quality
        end
        
        Gold[("GOLD")]:::gold
        
        Source --> Pydantic1 --> Bronze
        Bronze --> GX1 --> Silver
        Silver --> GX2 --> Gold
    end

    Bronze -.-> Iceberg
    Silver -.-> Iceberg
    Gold -.-> Iceberg

AI-Assisted Analytics

Modern AI tools to accelerate data analytics work.

Code Assistants

1. GitHub Copilot

  • AI pair programmer
  • Context-aware suggestions
  • Multi-language support
  • IDE integration

2. Gemini Code Assist (Google)

  • Google’s AI coding assistant
  • Available in VS Code
  • Context-aware completions

Installation:

code --install-extension Google.geminicodeassist

3. Open-Source Alternatives

Windsurf (formerly Codeium):

  • Free AI code completion
  • Multi-language support
  • Privacy-focused

Tabby:

  • Self-hosted coding assistant
  • Open-source
  • Customizable models

Bito AI:

  • Code explanation
  • Test generation
  • Documentation

AI-Enhanced IDEs

1. Cursor

  • AI-first code editor
  • Use your own API keys (OpenAI/Anthropic/Azure)
  • Context-aware editing

2. Zed

  • High-performance editor
  • Built-in AI features
  • Collaborative editing

3. Rivet

  • Visual AI workflow builder
  • Integrate with Ollama/Claude/GPT-4
  • Node-based interface

Python AI Libraries

PandasAI:

from pandasai import SmartDataframe
import pandas as pd

df = pd.read_csv("sales.csv")
sdf = SmartDataframe(df)

# Ask questions in natural language
response = sdf.chat("What are the top 5 products by revenue?")
print(response)

Installation:

pip install pandasai

Sketch:

import pandas as pd
import sketch

df = pd.read_csv("data.csv")

# Ask questions about your data
df.sketch.ask("What is the correlation between age and income?")
df.sketch.howto("create a new column with age groups")

Installation:

pip install sketch

Workflow Orchestration

Apache Airflow

What is Airflow?

A platform to programmatically author, schedule, and monitor workflows.

Key Features:

  • DAG-based workflows
  • Rich UI for monitoring
  • Extensive integrations
  • Scalable architecture

Use Cases:

  • ETL pipelines
  • Data warehouse loading
  • ML model training
  • Report generation

Example DAG:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def extract():
    print("Extracting data...")

def transform():
    print("Transforming data...")

def load():
    print("Loading data...")

with DAG(
    'etl_pipeline',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',  # 'schedule_interval' on Airflow < 2.4
    catchup=False       # don't backfill every past interval
) as dag:
    
    extract_task = PythonOperator(
        task_id='extract',
        python_callable=extract
    )
    
    transform_task = PythonOperator(
        task_id='transform',
        python_callable=transform
    )
    
    load_task = PythonOperator(
        task_id='load',
        python_callable=load
    )
    
    extract_task >> transform_task >> load_task

Airflow vs Jenkins

  Feature      Airflow                  Jenkins
  Purpose      Workflow orchestration   CI/CD automation
  Best for     Data pipelines           Software builds
  Scheduling   Built-in                 Plugin-based
  UI           Rich, data-focused       Build-focused
  Use case     ETL, ML workflows        Code deployment

Alternatives

Prefect:

  • Modern Python workflow engine
  • Dynamic workflows
  • Better error handling
  • Hybrid execution model

Mage:

  • Modern data pipeline tool
  • Notebook-based development
  • Built-in data quality checks
  • Easy deployment

Best Practices

Data Quality Strategy

1. Define Quality Metrics:

  • Completeness
  • Accuracy
  • Consistency
  • Timeliness
  • Validity
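
Each dimension can be reduced to a number you can track over time; a pandas sketch with invented columns:

```python
import pandas as pd

df = pd.DataFrame({
    "customer_id": [1, 2, 3, None],
    "email": ["a@b.com", "bad", "c@d.com", "e@f.com"],
})

# Completeness: share of non-null values per column
completeness = df.notna().mean()

# Validity: share of emails matching a simple pattern
validity = df["email"].str.match(r"^[\w\.-]+@[\w\.-]+\.\w+$").mean()

print(completeness)
print(f"email validity: {validity:.0%}")
```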

2. Implement Validation Gates:

  • Bronze → Silver: Schema validation, null checks
  • Silver → Gold: Business rule validation
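
A Bronze → Silver gate can be as simple as a function that refuses to promote bad data; the expected schema below is invented for illustration.

```python
import pandas as pd

EXPECTED_COLUMNS = {"customer_id", "age"}

def bronze_to_silver_gate(df: pd.DataFrame) -> pd.DataFrame:
    """Schema validation + null checks before promoting data to Silver."""
    missing = EXPECTED_COLUMNS - set(df.columns)
    if missing:
        raise ValueError(f"schema check failed, missing columns: {missing}")
    if df["customer_id"].isna().any():
        raise ValueError("null check failed: customer_id contains nulls")
    return df

bronze = pd.DataFrame({"customer_id": [1, 2], "age": [30, 41]})
silver = bronze_to_silver_gate(bronze)  # passes both checks
print(len(silver))
```

Failing the gate raises, which stops the pipeline run instead of silently promoting bad data downstream.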

3. Monitor Continuously:

  • Track data quality metrics
  • Alert on failures
  • Generate quality reports

4. Document Expectations:

  • Clear data contracts
  • Expected schemas
  • Business rules

Validation Pipeline

# Example validation pipeline (illustrative sketch)
import great_expectations as gx
from pydantic import BaseModel

# 1. Schema validation with Pydantic
class DataSchema(BaseModel):
    customer_id: int
    email: str
    age: int

# 2. Data quality with Great Expectations
context = gx.get_context()
validator = context.get_validator(
    batch_request=batch_request,  # assumes a batch request built earlier
    expectation_suite_name="my_suite"
)

# 3. Execute validations
results = validator.validate()

# 4. Handle failures
if not results.success:
    # Log failures, send alerts, stop the pipeline
    raise ValueError("Data quality check failed")

Conclusion

Modern data quality management requires:

1. Profiling: Understand your data
2. Validation: Enforce quality rules
3. Versioning: Track changes (Iceberg + Nessie)
4. Automation: Use orchestration tools
5. AI Assistance: Accelerate development

Key Takeaways:

  • Use Pydantic for application-level validation
  • Use Great Expectations for pipeline-level validation
  • Implement medallion architecture (Bronze/Silver/Gold)
  • Version your data with Iceberg and Nessie
  • Leverage AI tools to accelerate analytics work

Next Steps:

  1. Profile your existing data
  2. Define quality expectations
  3. Implement validation gates
  4. Set up orchestration
  5. Monitor and improve