# Data Quality & Validation

For industries like healthcare and finance, data lineage and quality are essential. This chapter covers modern tools and practices for ensuring data quality throughout your pipelines.
## The Modern Data Lakehouse Architecture

The medallion architecture with AI integration:

```mermaid
flowchart LR
    %% --- Styles ---
    classDef bronze fill:#EFEBE9,stroke:#8D6E63,stroke-width:2px,color:#3E2723;
    classDef silver fill:#ECEFF1,stroke:#78909C,stroke-width:2px,color:#263238;
    classDef gold fill:#FFFDE7,stroke:#FBC02D,stroke-width:2px,color:#F57F17;
    classDef ai fill:#F3E5F5,stroke:#8E24AA,stroke-width:2px,stroke-dasharray: 5 5,color:#4A148C;
    classDef source fill:#fff,stroke:#333,stroke-width:1px;

    %% --- Sources ---
    subgraph Sources [Data Sources]
        direction TB
        Logs[Logs / IoT]:::source
        DB[Databases]:::source
        APIs[External APIs]:::source
    end

    %% --- The Lakehouse (Medallion) ---
    subgraph Lakehouse [The Data Lakehouse]
        direction LR
        Bronze[("BRONZE<br/>(Raw Ingestion)<br/>As-is Dump")]:::bronze
        Silver[("SILVER<br/>(Refined)<br/>Cleaned & Enriched")]:::silver
        Gold[("GOLD<br/>(Curated)<br/>Business Aggregates")]:::gold
    end

    %% --- AI Integration ---
    subgraph AI_Lab [AI & Machine Learning]
        direction TB
        Training(Model Training):::ai
        Inference(AI Agents / RAG):::ai
        Predictions(Predictions / Tags):::ai
    end

    %% --- Consumers ---
    BI["BI Dashboards<br/>& Reports"]:::source

    %% --- The Flow ---
    Sources --> Bronze
    Bronze -- "ETL / Cleaning" --> Silver
    Silver -- "Aggregation" --> Gold
    Gold --> BI

    %% --- Where AI Plugs In ---
    Silver -.->|"Feeds Data"| Training
    Gold -.->|"Context for RAG"| Inference
    Training --> Predictions
    Inference --> Predictions
    Predictions -.->|"Enrichment"| Silver
    Predictions -.->|"New Insights"| Gold
```
Key Layers:
- Bronze: Raw data ingestion (as-is dump)
- Silver: Cleaned and enriched data
- Gold: Business-ready aggregates
- AI Layer: Training, inference, and predictions
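The Bronze → Silver → Gold transitions can be sketched with plain pandas on a toy orders table (hypothetical data; in practice each layer would be a table in the lakehouse):

```python
import pandas as pd

# Bronze: raw ingestion, kept as-is (duplicates and bad records included)
bronze = pd.DataFrame({
    "order_id": [1, 2, 2, 3],
    "amount": ["10.5", "20.0", "20.0", "oops"],
    "region": ["EU", "US", "US", "EU"],
})

# Silver: cleaned and typed -- drop duplicates, coerce types, drop bad rows
silver = bronze.drop_duplicates(subset="order_id").copy()
silver["amount"] = pd.to_numeric(silver["amount"], errors="coerce")
silver = silver.dropna(subset=["amount"])

# Gold: business-ready aggregate -- revenue per region
gold = silver.groupby("region", as_index=False)["amount"].sum()
print(gold)
```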
## Data Management Concepts

Understanding the relationship between profiling, modeling, and validation:

```mermaid
graph LR
    A["Data Profiling: Understand As-Is"] --> B("Data Modeling: Design To-Be");
    B --> C{Prepare Design Documentation};
    C --> D["Data Model (Entities, Attributes, Relationships)"];
    C --> E["Data Lineage (Sources, Transformations, Destinations)"];
    C --> F["Data Profiling Summary & Quality Rules"];
```
### Data Profiling
Purpose: Examining data to understand its characteristics and quality.
Focus:
- Analyzing data distributions, frequencies, patterns
- Identifying data quality issues
- Understanding data relationships
Examples:
- Percentage of missing values per column
- Unique values and frequencies
- Min/max/average for numerical columns
- Common text patterns
- Column correlations
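These profiling metrics can be computed directly with pandas; a minimal sketch on a toy frame:

```python
import pandas as pd

df = pd.DataFrame({
    "age": [25, 30, None, 45],
    "city": ["Berlin", "Berlin", "Paris", None],
})

# Percentage of missing values per column
missing_pct = df.isna().mean() * 100

# Unique values and their frequencies
city_counts = df["city"].value_counts(dropna=True)

# Min/max/average for a numerical column
age_stats = df["age"].agg(["min", "max", "mean"])

print(missing_pct)
print(city_counts)
print(age_stats)
```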
When to Use:
- During data exploration
- Before data integration/migration
- To assess data quality
- When investigating anomalies
- Understanding new data sources
Tools:
- ydata-profiling (formerly pandas-profiling)
- Great Expectations
- Apache Griffin
- Deequ (AWS)
### Data Validation
Purpose: Ensuring data meets specific criteria or constraints.
Focus:
- Verifying data conforms to expected formats
- Enforcing data integrity
- Preventing incorrect data entry
Examples:
- Valid date formats
- Age within reasonable range
- Valid product IDs
- Email address format
- Required fields populated
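Checks like these can be written with nothing but the standard library; a minimal sketch (the email regex is a deliberate simplification, not full RFC 5322):

```python
import re
from datetime import datetime


def is_valid_email(value: str) -> bool:
    # Simple pattern check: something@domain.tld
    return re.fullmatch(r"[\w\.-]+@[\w\.-]+\.\w+", value) is not None


def is_valid_date(value: str, fmt: str = "%Y-%m-%d") -> bool:
    # strptime raises ValueError on anything that doesn't parse
    try:
        datetime.strptime(value, fmt)
        return True
    except ValueError:
        return False


def is_reasonable_age(value: int) -> bool:
    return 0 <= value <= 120


print(is_valid_email("john@example.com"))  # True
print(is_valid_date("2024-13-01"))         # False: month 13
print(is_reasonable_age(200))              # False
```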
When to Use:
- During data entry
- In data pipelines
- Before database loading
- API data reception
Tools:
- Great Expectations
- Pydantic
- Cerberus
- JSON Schema
### Data Modeling
Purpose: Designing the structure and relationships of data.
Focus:
- Entity-relationship design
- Schema definition
- Normalization/denormalization
- Performance optimization
Key Concepts:
- Entities and attributes
- Primary/foreign keys
- Relationships (1:1, 1:N, N:M)
- Dimensional modeling (star/snowflake schemas)
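A star schema in miniature, sketched with pandas (hypothetical fact and dimension tables; in a warehouse these would be SQL tables):

```python
import pandas as pd

# Dimension table: one row per product, keyed by product_id
dim_product = pd.DataFrame({
    "product_id": [1, 2],
    "product_name": ["Widget", "Gadget"],
    "category": ["Hardware", "Hardware"],
})

# Fact table: one row per sale, referencing the dimension by foreign key
fact_sales = pd.DataFrame({
    "sale_id": [10, 11, 12],
    "product_id": [1, 1, 2],  # foreign key into dim_product
    "amount": [9.99, 9.99, 24.50],
})

# A typical star-schema query: join fact to dimension, aggregate by attribute
report = (
    fact_sales.merge(dim_product, on="product_id")
    .groupby("product_name", as_index=False)["amount"].sum()
)
print(report)
```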
## Data Validation Tools

### Pydantic

Pydantic is a data validation library that uses Python type annotations.
Key Features:
- Type validation at runtime
- Data serialization/deserialization
- Settings management
- Custom validation
- Clear error messages
Use Cases:
- API request/response validation
- Configuration management
- Data parsing
Example:

```python
from datetime import datetime

from pydantic import BaseModel, EmailStr, ValidationError, field_validator


class User(BaseModel):
    id: int
    name: str
    email: EmailStr
    age: int
    created_at: datetime

    # Pydantic v2 API; on v1 use @validator instead of @field_validator
    @field_validator("age")
    @classmethod
    def age_must_be_reasonable(cls, v: int) -> int:
        if v < 0 or v > 150:
            raise ValueError("Age must be between 0 and 150")
        return v


# Valid data
user = User(
    id=1,
    name="John Doe",
    email="john@example.com",
    age=30,
    created_at=datetime.now(),
)

# Invalid data raises ValidationError
try:
    invalid_user = User(
        id=2,
        name="Jane",
        email="not-an-email",
        age=200,
        created_at="not-a-date",
    )
except ValidationError as e:
    print(f"Validation error: {e}")
```

Installation:

```bash
pip install "pydantic[email]"
```
### Great Expectations

Great Expectations is a data testing and validation framework.
Key Features:
- Define expectations about data
- Execute validation suites
- Generate data documentation
- Integration with data pipelines
- Profiling capabilities
Use Cases:
- Data quality monitoring
- Pipeline testing
- Data documentation
- Regression testing
Example:

```python
import great_expectations as gx

# Create a Data Context
context = gx.get_context()

# Connect to data (GX 0.16+ fluent API; GX 1.x renamed several of these calls)
validator = context.sources.pandas_default.read_csv("data.csv")

# Define expectations
validator.expect_column_values_to_not_be_null("customer_id")
validator.expect_column_values_to_be_between("age", min_value=0, max_value=120)
validator.expect_column_values_to_be_in_set("status", ["active", "inactive", "pending"])
validator.expect_column_values_to_match_regex("email", r"^[\w\.-]+@[\w\.-]+\.\w+$")

# Save the expectation suite
validator.save_expectation_suite(discard_failed_expectations=False)

# Create a checkpoint for the validator's batch, then run it
checkpoint = context.add_or_update_checkpoint(
    name="my_checkpoint",
    validator=validator,
)
checkpoint_result = checkpoint.run()

# Check results
if checkpoint_result.success:
    print("All expectations passed!")
else:
    print("Some expectations failed.")
```

Installation:

```bash
pip install great-expectations
```
Common Expectations:

| Expectation | Purpose |
|---|---|
| `expect_column_to_exist` | Column exists in dataset |
| `expect_column_values_to_not_be_null` | No null values |
| `expect_column_values_to_be_unique` | All values unique |
| `expect_column_values_to_be_between` | Values in range |
| `expect_column_values_to_be_in_set` | Values from allowed set |
| `expect_column_mean_to_be_between` | Mean in range |
| `expect_table_row_count_to_be_between` | Row count in range |
### Pydantic vs Great Expectations
| Feature | Pydantic | Great Expectations |
|---|---|---|
| Level | Application | Data Pipeline |
| Focus | Data structures | Data quality |
| Use Case | API validation | Pipeline testing |
| Scope | Python objects | DataFrames/Databases |
| Integration | Code-level | Pipeline-level |
When to Use Both:
- Pydantic: Validate API input
- Great Expectations: Validate processed data in pipeline
- Together: Comprehensive validation strategy
## Apache Iceberg & Nessie

Modern data lakehouse tools for managing table formats and data versioning.

### Apache Iceberg

What is Iceberg?
Apache Iceberg is an open table format for huge analytic datasets.
Key Features:
- ACID transactions
- Schema evolution
- Hidden partitioning
- Time travel
- Snapshot isolation
Benefits:
- Reliable data updates
- Fast query performance
- Efficient metadata handling
- Multi-engine support (Spark, Flink, Trino)
Example Architecture:

```mermaid
flowchart LR
    %% Styles
    classDef storage fill:#cfd8dc,stroke:#37474F,stroke-width:2px;
    classDef format fill:#b2dfdb,stroke:#00695c,stroke-width:2px;

    S3[("S3 / MinIO<br/>(Object Storage)")]:::storage
    Iceberg("Apache Iceberg<br/>(Table Format / ACID)"):::format

    S3 --- Iceberg
```
### Nessie

What is Nessie?
Nessie is a Git-like version control system for data lakes.
Key Features:
- Branch and merge data
- Time travel across tables
- Catalog management
- Multi-table transactions
Benefits:
- Data versioning
- Reproducible analytics
- Safe experimentation
- Rollback capabilities
Integration:

```mermaid
flowchart TB
    Nessie[Nessie Catalog]
    Iceberg[Iceberg Tables]
    S3[Object Storage]

    Nessie -->|Manages| Iceberg
    Iceberg -->|Stored in| S3
```
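In Spark, wiring catalog, table format, and storage together is mostly configuration; a sketch along the lines of the Nessie and Iceberg documentation, with the URI and warehouse path as placeholders:

```properties
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.sql.catalog.nessie=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.nessie.catalog-impl=org.apache.iceberg.nessie.NessieCatalog
spark.sql.catalog.nessie.uri=http://localhost:19120/api/v1
spark.sql.catalog.nessie.ref=main
spark.sql.catalog.nessie.warehouse=s3a://warehouse/
```

The `ref` setting is what makes Nessie Git-like: pointing it at a branch name other than `main` lets a job read and write an isolated version of the same tables.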
### Complete Architecture

```mermaid
flowchart LR
    %% Styles
    classDef bronze fill:#EFEBE9,stroke:#8D6E63,stroke-width:3px;
    classDef silver fill:#ECEFF1,stroke:#78909C,stroke-width:3px;
    classDef gold fill:#FFFDE7,stroke:#FBC02D,stroke-width:3px;
    classDef storage fill:#cfd8dc,stroke:#37474F,stroke-width:2px;
    classDef format fill:#b2dfdb,stroke:#00695c,stroke-width:2px;
    classDef quality fill:#ffecb3,stroke:#ff6f00,stroke-width:2px,stroke-dasharray: 5 5;
    classDef code fill:#e1bee7,stroke:#4a148c,stroke-width:1px,stroke-dasharray: 2 2;

    %% Physical Foundation
    subgraph PhysicalLayer [Physical Foundation]
        S3[("S3 / MinIO")]:::storage
    end

    %% Table Management
    subgraph ManagementLayer [Table Management]
        Iceberg("Apache Iceberg"):::format
        Nessie("Nessie Catalog"):::format
        Iceberg -.->|Managed by| Nessie
        S3 --- Iceberg
    end

    %% Logical Flow
    subgraph Lakehouse [Logical Data Flow]
        Source[Raw Sources]
        subgraph Ingest [Ingestion]
            Pydantic1[("Pydantic")]:::code
        end
        Bronze[("BRONZE")]:::bronze
        subgraph Process1 [Cleaning]
            GX1[("Great Expectations")]:::quality
        end
        Silver[("SILVER")]:::silver
        subgraph Process2 [Aggregation]
            GX2[("Great Expectations")]:::quality
        end
        Gold[("GOLD")]:::gold

        Source --> Pydantic1 --> Bronze
        Bronze --> GX1 --> Silver
        Silver --> GX2 --> Gold
    end

    Bronze -.-> Iceberg
    Silver -.-> Iceberg
    Gold -.-> Iceberg
```
## AI-Assisted Analytics
Modern AI tools to accelerate data analytics work.
### Code Assistants
1. GitHub Copilot
- AI pair programmer
- Context-aware suggestions
- Multi-language support
- IDE integration
2. Gemini Code Assist (Google)
- Google’s AI coding assistant
- Available in VS Code
- Context-aware completions
Installation:

```bash
code --install-extension Google.geminicodeassist
```
3. Open-Source Alternatives
Windsurf (formerly Codeium):
- Free AI code completion
- Multi-language support
- Privacy-focused
Tabby:
- Self-hosted coding assistant
- Open-source
- Customizable models
Bito AI:
- Code explanation
- Test generation
- Documentation
### AI-Enhanced IDEs
1. Cursor
- AI-first code editor
- Use your own API keys (OpenAI/Anthropic/Azure)
- Context-aware editing
2. Zed
- High-performance editor
- Built-in AI features
- Collaborative editing
3. Rivet
- Visual AI workflow builder
- Integrate with Ollama/Claude/GPT-4
- Node-based interface
### Python AI Libraries

PandasAI:

```python
import pandas as pd
from pandasai import SmartDataframe

df = pd.read_csv("sales.csv")
sdf = SmartDataframe(df)

# Ask questions in natural language (requires an LLM backend configured,
# e.g. an OpenAI API key)
response = sdf.chat("What are the top 5 products by revenue?")
print(response)
```

Installation:

```bash
pip install pandasai
```
Sketch:

```python
import pandas as pd
import sketch

df = pd.read_csv("data.csv")

# Ask questions about your data
df.sketch.ask("What is the correlation between age and income?")
df.sketch.howto("create a new column with age groups")
```

Installation:

```bash
pip install sketch
```
## Workflow Orchestration

### Apache Airflow

What is Airflow?
A platform to programmatically author, schedule, and monitor workflows.
Key Features:
- DAG-based workflows
- Rich UI for monitoring
- Extensive integrations
- Scalable architecture
Use Cases:
- ETL pipelines
- Data warehouse loading
- ML model training
- Report generation
Example DAG:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def extract():
    print("Extracting data...")


def transform():
    print("Transforming data...")


def load():
    print("Loading data...")


with DAG(
    "etl_pipeline",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",  # Airflow 2.4+; older versions use schedule_interval
    catchup=False,      # don't backfill runs since start_date
) as dag:
    extract_task = PythonOperator(task_id="extract", python_callable=extract)
    transform_task = PythonOperator(task_id="transform", python_callable=transform)
    load_task = PythonOperator(task_id="load", python_callable=load)

    extract_task >> transform_task >> load_task
```
### Airflow vs Jenkins
| Feature | Airflow | Jenkins |
|---|---|---|
| Purpose | Workflow orchestration | CI/CD automation |
| Best For | Data pipelines | Software builds |
| Scheduling | Built-in | Plugin-based |
| UI | Rich, data-focused | Build-focused |
| Use Case | ETL, ML workflows | Code deployment |
### Alternatives
Prefect:
- Modern Python workflow engine
- Dynamic workflows
- Better error handling
- Hybrid execution model
Mage:
- Modern data pipeline tool
- Notebook-based development
- Built-in data quality checks
- Easy deployment
## Best Practices

### Data Quality Strategy
1. Define Quality Metrics:
- Completeness
- Accuracy
- Consistency
- Timeliness
- Validity
2. Implement Validation Gates:
- Bronze → Silver: Schema validation, null checks
- Silver → Gold: Business rule validation
3. Monitor Continuously:
- Track data quality metrics
- Alert on failures
- Generate quality reports
4. Document Expectations:
- Clear data contracts
- Expected schemas
- Business rules
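Completeness and validity from step 1 can be measured with plain pandas; a minimal sketch over a hypothetical customer table:

```python
import pandas as pd

df = pd.DataFrame({
    "customer_id": [1, 2, 3, 4],
    "email": ["a@x.com", None, "bad-email", "d@y.org"],
})

# Completeness: share of non-null values per column
completeness = df.notna().mean()

# Validity: share of emails matching a simple pattern (nulls count as invalid)
valid_email = df["email"].str.fullmatch(r"[\w\.-]+@[\w\.-]+\.\w+", na=False)
validity = valid_email.mean()

print(f"email completeness: {completeness['email']:.0%}")
print(f"email validity:     {validity:.0%}")
```

In practice these numbers would be tracked over time and wired to the alerting described in step 3.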
### Validation Pipeline

```python
# Example validation pipeline (illustrative sketch -- assumes a batch_request
# and an expectation suite named "my_suite" were created beforehand)
import great_expectations as gx
from pydantic import BaseModel


# 1. Schema validation with Pydantic
class DataSchema(BaseModel):
    customer_id: int
    email: str
    age: int


# 2. Data quality with Great Expectations
context = gx.get_context()
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name="my_suite",
)

# 3. Execute validations
results = validator.validate()

# 4. Handle failures
if not results.success:
    # Log failures, send alerts, stop the pipeline
    raise ValueError("Data quality check failed")
```
## Conclusion

Modern data quality management requires:

1. Profiling: Understand your data
2. Validation: Enforce quality rules
3. Versioning: Track changes (Iceberg + Nessie)
4. Automation: Use orchestration tools
5. AI Assistance: Accelerate development
Key Takeaways:
- Use Pydantic for application-level validation
- Use Great Expectations for pipeline-level validation
- Implement medallion architecture (Bronze/Silver/Gold)
- Version your data with Iceberg and Nessie
- Leverage AI tools to accelerate analytics work
Next Steps:
- Profile your existing data
- Define quality expectations
- Implement validation gates
- Set up orchestration
- Monitor and improve