JAlcocerTech E-books

PySpark: Big Data Analytics with Python

If you have been using SQL and Pandas for a while, PySpark is your next step to leverage distributed computing for Big Data Analytics.


Why PySpark?

PySpark is the Python API for Apache Spark.

While Pandas is excellent for tabular data on a single machine, PySpark excels when data doesn’t fit into memory or when computation benefits from massive parallelization across a cluster.

Choosing Between PySpark and Pandas

| Feature | Pandas | PySpark |
|---|---|---|
| Data Size | Small to Medium (fits in RAM) | Big Data (Distributed) |
| Computing | Single-node | Multi-node (Cluster) |
| Execution | Eager (Immediate) | Lazy (Optimized) |
| Complexity | High for complex math | High for parallel scaling |

Core PySpark Concepts

1. Creating DataFrames

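from pyspark.sql import SparkSession

# Create (or reuse) a Spark session; notebooks often provide one as `spark`
spark = SparkSession.builder.appName("CreateDataFrames").getOrCreate()

data = [("Jerez", "Yosua", "Dr.Yosu"),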
        ("London", "John", "CEO"),
        ("Roberta", "Storm", "Manager")]

schema = ["Region", "Nick", "Job"]
df = spark.createDataFrame(data=data, schema=schema)

df.printSchema()
df.show(truncate=False)

2. SparkSQL

If you are comfortable with SQL, you can use it directly within Spark. SQL queries and DataFrame transformations compile to the same kind of logical plan, which the Catalyst optimizer rewrites before execution.

df.createOrReplaceTempView("people")
result = spark.sql("SELECT * FROM people WHERE Job = 'CEO'")
result.show()

3. Distributed Connectivity (e.g., Trino)

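PySpark can connect to external distributed engines like Trino (formerly PrestoSQL) via JDBC. The Trino JDBC driver JAR must be available to Spark (for example, through the spark.jars configuration).

# If the URL omits catalog/schema (jdbc:trino://host:port/catalog/schema), use catalog.schema.table in dbtable
df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:trino://trino-server:port") \
    .option("driver", "io.trino.jdbc.TrinoDriver") \
    .option("user", "your_user") \
    .option("dbtable", "your_table_name") \
    .load()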

Modern Data Lakes: Apache Iceberg

Apache Iceberg is an open table format for huge analytic datasets. It enables features like Time Travel and Atomic Commits on data lakes (S3, HDFS).

Time Travel Features

Time travel allows you to query the state of a table as it existed at a specific point in time or by a specific Snapshot ID.

  • By Timestamp (milliseconds since the Unix epoch):
df = spark.read \
    .option("as-of-timestamp", "499162860000") \
    .format("iceberg") \
    .load("path/to/table")
  • By Snapshot ID (a plain Python int; the "L" suffix is Scala/Java syntax):
df = spark.read \
    .option("snapshot-id", 10963874102873) \
    .format("iceberg") \
    .load("path/to/table")
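The as-of-timestamp value is milliseconds since the Unix epoch (499162860000 corresponds to 1985-10-26 08:21:00 UTC). If you start from a Python datetime, a small conversion helper avoids off-by-1000 mistakes. This is a sketch: to_epoch_millis and read_iceberg_as_of are hypothetical helper names, and actually reading requires a Spark session configured with the Iceberg runtime.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(dt: datetime) -> int:
    """Convert a timezone-aware datetime to milliseconds since the Unix epoch."""
    if dt.tzinfo is None:
        raise ValueError("use a timezone-aware datetime to avoid local-time ambiguity")
    # Floor-dividing two timedeltas yields an exact int (no float rounding)
    return (dt - EPOCH) // timedelta(milliseconds=1)


def read_iceberg_as_of(spark, path: str, dt: datetime):
    """Hypothetical helper: read an Iceberg table as it existed at `dt`."""
    return (spark.read
            .option("as-of-timestamp", str(to_epoch_millis(dt)))
            .format("iceberg")
            .load(path))
```

Requiring a timezone-aware datetime is a deliberate choice: a naive datetime would silently be interpreted in whatever local time zone the driver runs in.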

Big Data Architecture Tools

  • Trino: A distributed SQL query engine that can query data from multiple sources (Hive, Kafka, MongoDB) in a single query.
  • Hadoop (HDFS/YARN): The foundational framework for distributed storage and resource negotiation.
  • DBT (Data Build Tool): The “T” in ELT. It focuses on SQL-first transformations with software engineering best practices.
  • DataHub: A metadata platform for data discovery and observability.
  • Open Data Hub: An open-source platform for managing the AI/ML lifecycle.

FAQ: Deep Dive into Spark

[!IMPORTANT] Why is PySpark “Lazy”? Spark uses Lazy Evaluation. It doesn’t execute transformations (like filter or map) immediately. Instead, it builds a “Logical Plan.” Execution only triggers when an Action (like show, count, or write) is called. This allows the optimizer to combine steps and minimize data movement.

What is Data Skew?

Data skew is an uneven distribution of data across partitions. It often happens after a join or groupBy.

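  • Impact: The few tasks holding the heavy keys can take 10x longer than the rest, and the whole stage waits for them.
  • Measure: Compare row counts per key or per partition; a largest-to-average partition ratio far above 1 signals skew. The skewness coefficient of the key-frequency distribution is another way to quantify the asymmetry.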

Understanding Partitions

Partitions are the atomic units of parallelism in Spark: each partition is processed by one task.

  • Hash Partitioning: Uses a hash of the key to distribute rows.
  • Range Partitioning: Distributes data based on sorted value ranges.
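  • Grouping vs. Partitioning: A groupBy shuffles rows by the grouping key, so all data for a given key ends up in the same partition before aggregation. Repartitioning by that key up front (df.repartition("key")) can let later operations on the same key reuse that layout instead of shuffling again.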

Data Redistribution

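Data redistribution is the process of moving data between nodes, clusters, or storage systems to spread the load or improve availability.

This is often necessary when scaling out or performing disaster recovery. Inside a single Spark job, repartition() and coalesce() redistribute rows across partitions.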


DRY Principle: Don’t Repeat Yourself

DRY stands for “Don’t Repeat Yourself” - a fundamental software engineering principle that applies powerfully to PySpark development.

Why DRY Matters in PySpark

Benefits:

  • Maintainability: Change logic once, not in 10 places
  • Readability: Clear, reusable functions are self-documenting
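  • Performance: An optimization made once (for example, replacing a Python UDF with a built-in function) benefits every pipeline that calls the shared function; Catalyst itself optimizes the plan identically whether or not the logic lives in a function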
  • Testing: Easier to test isolated functions
  • Debugging: Fix bugs in one place

Common Anti-Patterns (What NOT to Do)

❌ Repeating Transformations:

# BAD: Repeating the same transformation logic
df1 = raw_df.filter(col("status") == "active") \
           .withColumn("full_name", concat(col("first_name"), lit(" "), col("last_name"))) \
           .withColumn("year", year(col("created_at")))

df2 = another_df.filter(col("status") == "active") \
                .withColumn("full_name", concat(col("first_name"), lit(" "), col("last_name"))) \
                .withColumn("year", year(col("created_at")))

✅ DRY Approach:

from pyspark.sql.functions import col, concat, lit, year

# GOOD: Reusable transformation function
def standardize_user_data(df):
    """Apply standard transformations to user data"""
    return df.filter(col("status") == "active") \
             .withColumn("full_name", concat(col("first_name"), lit(" "), col("last_name"))) \
             .withColumn("year", year(col("created_at")))

df1 = standardize_user_data(raw_df)
df2 = standardize_user_data(another_df)

DRY Best Practices

1. Extract Common Transformations into Functions

from pyspark.sql.functions import col, when, trim, upper

def clean_string_column(df, column_name):
    """Standardize string column: trim, uppercase, handle nulls"""
    return df.withColumn(
        column_name,
        when(col(column_name).isNull(), "UNKNOWN")
        .otherwise(upper(trim(col(column_name))))
    )

# Use it multiple times
df = clean_string_column(df, "country")
df = clean_string_column(df, "region")
df = clean_string_column(df, "city")

2. Create Reusable Column Expressions

from pyspark.sql.functions import col, when

# Define once, use many times
def value_tier(amount_col):
    """Reusable expression that classifies an amount as High, Medium, or Low"""
    return when(col(amount_col) > 1000, "High") \
           .when(col(amount_col) > 500, "Medium") \
           .otherwise("Low")

# Apply to multiple columns
df = df.withColumn("sales_tier", value_tier("sales_amount")) \
       .withColumn("revenue_tier", value_tier("revenue_amount"))

3. Build Transformation Pipelines

def apply_pipeline(df, transformations):
    """Apply a list of transformation functions"""
    for transform in transformations:
        df = transform(df)
    return df

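# Define transformations
from pyspark.sql.functions import col, month, when, year

def add_timestamp_features(df):
    return df.withColumn("year", year(col("created_at"))) \
             .withColumn("month", month(col("created_at")))

def add_derived_metrics(df):
    # Guard against division by zero (an error when ANSI mode is enabled)
    return df.withColumn("profit", col("revenue") - col("cost")) \
             .withColumn("margin", when(col("revenue") != 0, col("profit") / col("revenue")))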

# Apply pipeline
pipeline = [
    standardize_user_data,
    add_timestamp_features,
    add_derived_metrics
]

result_df = apply_pipeline(raw_df, pipeline)

4. Parameterize Common Patterns

def aggregate_by_dimensions(df, group_cols, agg_col, agg_func="sum"):
    """Generic aggregation function"""
    from pyspark.sql import functions as F

    # Use the F. namespace so Python's built-in sum/max/min are not shadowed
    agg_functions = {
        "sum": F.sum,
        "avg": F.avg,
        "count": F.count,
        "max": F.max,
        "min": F.min
    }
    if agg_func not in agg_functions:
        raise ValueError(f"Unsupported agg_func: {agg_func}")

    return df.groupBy(group_cols) \
             .agg(agg_functions[agg_func](F.col(agg_col)).alias(f"{agg_col}_{agg_func}"))

# Reuse with different parameters
sales_by_region = aggregate_by_dimensions(df, ["region"], "sales", "sum")
avg_revenue_by_product = aggregate_by_dimensions(df, ["product"], "revenue", "avg")

5. Configuration-Driven Transformations

from pyspark.sql.functions import col

# Define transformation config once
COLUMN_MAPPINGS = {
    "old_customer_id": "customer_id",
    "old_product_name": "product_name",
    "old_order_date": "order_date"
}

DATA_TYPES = {
    "customer_id": "int",
    "order_amount": "double",
    "order_date": "timestamp"
}

def apply_schema_standardization(df, mappings, types):
    """Apply column renaming and type casting"""
    # Rename columns
    for old_name, new_name in mappings.items():
        if old_name in df.columns:
            df = df.withColumnRenamed(old_name, new_name)
    
    # Cast types
    for col_name, col_type in types.items():
        if col_name in df.columns:
            df = df.withColumn(col_name, col(col_name).cast(col_type))
    
    return df

# Reuse across different datasets
df1 = apply_schema_standardization(raw_df1, COLUMN_MAPPINGS, DATA_TYPES)
df2 = apply_schema_standardization(raw_df2, COLUMN_MAPPINGS, DATA_TYPES)

Real-World Example: Data Quality Checks

❌ Without DRY:

# Checking nulls for each column separately
null_count_col1 = df.filter(col("column1").isNull()).count()
null_count_col2 = df.filter(col("column2").isNull()).count()
null_count_col3 = df.filter(col("column3").isNull()).count()

✅ With DRY:

def check_data_quality(df, columns_to_check):
    """Generate data quality report for specified columns"""
    from pyspark.sql.functions import col, sum as spark_sum, when
    
    quality_checks = []
    for col_name in columns_to_check:
        quality_checks.append(
            spark_sum(when(col(col_name).isNull(), 1).otherwise(0)).alias(f"{col_name}_nulls")
        )
    
    return df.agg(*quality_checks)

# Use it
quality_report = check_data_quality(df, ["column1", "column2", "column3"])
quality_report.show()

When to Break DRY

Sometimes repetition is acceptable:

  1. One-off transformations: If logic is truly unique and won’t be reused
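  2. Performance-critical paths: When a generic helper forces extra work (such as a Python UDF or an additional pass over the data) that a specialized inline version avoids. Helpers that only build Column expressions run once on the driver and add no per-row cost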
  3. Readability: If abstraction makes code harder to understand

Rule of Thumb: If you copy-paste code more than twice, refactor it into a function.


Sample PySpark queries

Interesting queries to get started with PySpark.

  1. How to detect nulls in a PySpark DataFrame
  2. How to query Kafka topics
  3. How to group by a Spark DataFrame
  4. How to join Spark DataFrames
# Create a Spark DataFrame

df = spark.createDataFrame(
    data=[("1", "2019-06-24 12:01:19.000")],
    schema=["id", "input_timestamp"])

df.show(5, truncate=False)

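# Optional: convert an epoch-milliseconds column "ts" to a timestamp (f = pyspark.sql.functions)
# .withColumn("ts", f.expr("to_timestamp(CAST(ts / 1000 AS INT))"))

# Join and filter: keep qoe ids with no matching mac in nmd (qoe and nmd are existing DataFrames)
from pyspark.sql.functions import col

unmatched = qoe.join(nmd, qoe.id == nmd.mac, "left") \
    .select("id", "mac") \
    .filter(col("mac").isNull())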

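How to aggregate a Spark DataFrame and display the top rows as a Pandas table:

from pyspark.sql.functions import count, countDistinct, date_format

# Serial numbers per line status for one modem model (cm_stats_renamed is an existing DataFrame)
# .style.hide(axis="index") needs pandas >= 1.4; hide_index() was removed in pandas 2.0
cm_stats_renamed.filter(cm_stats_renamed.ModelName == "CH7465LG") \
    .groupBy("ModelName", "MTA_LineStatus") \
    .agg(count("SerialNumber").alias("SerialNumberCount")) \
    .orderBy("SerialNumberCount", ascending=False) \
    .limit(30).toPandas().style.hide(axis="index")
# Without Pandas, end the chain with .show(30, truncate=False) instead

# Distinct nodes per day for one location (nr is an existing DataFrame)
nr.filter(nr.locationId == "something") \
    .withColumn("year_month_day", date_format(nr["createdAt"], "yyyy-MM-dd")) \
    .groupBy("year_month_day") \
    .agg(countDistinct("nodeId").alias("Node_Distinct_Counts")) \
    .orderBy("Node_Distinct_Counts", ascending=False) \
    .limit(30).toPandas().style.hide(axis="index")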
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, split


def process_cable_modem_data(country_code):
    """Show cable modems whose node_id prefix does not match their site_Id."""
    # Initialize (or reuse) the Spark session
    spark = SparkSession.builder.appName("CableModemDataProcessing").getOrCreate()

    # Load the Delta table for the country code (requires the delta-spark package)
    path = f"hdfs://123.45.67.89:9820/delta/refined_tables/{country_code}/dimensions/your_dimension_table/"
    dim_cable_modem_df = spark.read.format("delta").load(path)

    # Add node_id_prefix: the text before the first "." (split() takes a regex, so escape the dot)
    dim_cable_modem_df = dim_cable_modem_df.withColumn(
        "node_id_prefix", split(col("node_id"), r"\.")[0])

    # Compare node_id_prefix and site_Id in a new column 'is_same' (nulls count as False)
    dim_cable_modem_df = dim_cable_modem_df.withColumn(
        "is_same", when(col("node_id_prefix") == col("site_Id"), True).otherwise(False))

    # Keep only the rows where 'is_same' is False
    filtered_df = dim_cable_modem_df.filter(~col("is_same"))

    # Show the result
    filtered_df.select("node_id", "node_id_prefix", "site_Id", "is_same").distinct().show(5, truncate=False)
    return filtered_df


# Example usage
process_cable_modem_data("CH")