Published on

Common PySpark Operations for Data Wrangling

Authors

Introduction

PySpark is a powerful tool for big data processing, but its true power lies in its rich set of APIs for data wrangling—the process of cleaning and transforming data into a usable format. This blog post builds on the fundamentals by showing some of the most common and useful PySpark operations for everyday data engineering tasks.


1. Data Cleaning and Transformation

Data is rarely clean. Missing values, duplicates, and incorrect data types are common issues. PySpark provides easy-to-use functions to tackle these problems.

Handling Missing Values

Missing data (represented as null or None) can be a major issue. You can either drop rows with missing values or fill them in.

Dropping Rows

To remove rows containing null in any column, use dropna(). To be more specific and only drop rows where a null exists in a particular column, use the subset parameter.

from pyspark.sql.functions import col
# Drop rows with any null value
df_cleaned = df.dropna()
# Drop rows where 'price' column is null
df_cleaned = df.dropna(subset=['price'])

Filling Nulls

To replace null values with a specific value, use fillna(). This is great for assigning a default value or a placeholder.

# Fill all nulls with 'Unknown'
df_filled = df.fillna('Unknown')
# Fill nulls in specific columns with a dictionary
df_filled = df.fillna({'category': 'unknown', 'rating': 0})

Renaming and Dropping Columns

Sometimes you need to rename columns for clarity or remove ones you no longer need. The withColumnRenamed() and drop() functions are perfect for this.

# Rename a single column
df_renamed = df.withColumnRenamed('old_name', 'new_name')
# Drop a single column
df_dropped = df.drop('unwanted_column')
# Drop multiple columns
df_dropped = df.drop('unwanted_column_1', 'unwanted_column_2')

2. Joins

Joining is a fundamental operation in data processing that allows you to combine two DataFrames based on a common key. PySpark supports various types of joins.

Example Setup

Let's create two simple DataFrames to demonstrate.

# Sample data
employee_data = [("John", "HR"), ("Jane", "IT"), ("Tom", "Finance")]
dept_data = [("HR", "New York"), ("IT", "San Francisco"), ("Sales", "London")]
# Create DataFrames
employees_df = spark.createDataFrame(employee_data, ["name", "department"])
departments_df = spark.createDataFrame(dept_data, ["dept_name", "location"])
employees_df.show()
# +----+----------+
# |name|department|
# +----+----------+
# |John| HR|
# |Jane| IT|
# | Tom| Finance|
# +----+----------+
departments_df.show()
# +-------------+-------------+
# | dept_name| location|
# +-------------+-------------+
# | HR| New York|
# | IT|San Francisco|
# | Sales| London|
# +-------------+-------------+

Common Join Types

  • Inner Join: Returns only the rows with matching keys in both DataFrames.
  • Left Join: Returns all rows from the left DataFrame and matching rows from the right. null is used for non-matches.
  • Right Join: Returns all rows from the right DataFrame and matching rows from the left.
  • Full Outer Join: Returns all rows from both DataFrames, filling in null where no match exists.
# Inner Join: Finds employees with a matching department
employees_df.join(departments_df, employees_df.department == departments_df.dept_name, "inner").show()
# +----+----------+---------+-------------+
# |name|department|dept_name| location|
# +----+----------+---------+-------------+
# |Jane| IT| IT|San Francisco|
# |John| HR| HR| New York|
# +----+----------+---------+-------------+
# Left Join: Keeps all employees, even those without a matching department
employees_df.join(departments_df, employees_df.department == departments_df.dept_name, "left").show()
# +----+----------+---------+-------------+
# |name|department|dept_name| location|
# +----+----------+---------+-------------+
# |Jane| IT| IT|San Francisco|
# |John| HR| HR| New York|
# | Tom| Finance| null| null|
# +----+----------+---------+-------------+

3. Window Functions

Window functions are a game-changer for data analysis in PySpark. They allow you to perform calculations across a set of rows that are related to the current row, without collapsing the rows into a single result. This is perfect for tasks like calculating running totals, rankings, or moving averages.

Defining a Window

A window is defined using Window.partitionBy() to group the data and orderBy() to sort it within each group.

from pyspark.sql import Window
from pyspark.sql.functions import rank, avg, sum
# Create a sample DataFrame
data = [("A", 1, 100), ("A", 2, 150), ("A", 3, 200),
("B", 1, 50), ("B", 2, 75)]
df = spark.createDataFrame(data, ["category", "month", "sales"])
# Define a window specification: Partition by category, order by month
window_spec = Window.partitionBy("category").orderBy("month")

Common Window Functions

  • Ranking: row_number(), rank(), and dense_rank() assign a rank to each row within a partition.
  • Aggregate: sum(), avg(), max(), and min() can be applied to a window.
# Add a running total of sales for each category
df_running_total = df.withColumn(
'running_total_sales',
sum('sales').over(window_spec)
)
# Show the result
df_running_total.show()
# +--------+-----+-----+-------------------+
# |category|month|sales|running_total_sales|
# +--------+-----+-----+-------------------+
# | A| 1| 100| 100|
# | A| 2| 150| 250|
# | A| 3| 200| 450|
# | B| 1| 50| 50|
# | B| 2| 75| 125|
# +--------+-----+-----+-------------------+

Calculating a Moving Average

You can also specify a frame within a window to calculate a moving average. This example calculates the average of the current row and the two preceding rows.

from pyspark.sql.functions import mean
# Define a window for a 3-row moving average
moving_avg_window = Window.partitionBy("category").orderBy("month").rowsBetween(-2, 0)
df_moving_avg = df.withColumn(
'moving_avg_sales',
mean('sales').over(moving_avg_window)
)
df_moving_avg.show()
# +--------+-----+-----+----------------+
# |category|month|sales|moving_avg_sales|
# +--------+-----+-----+----------------+
# | A| 1| 100| 100.0|
# | A| 2| 150| 125.0|
# | A| 3| 200| 150.0|
# | B| 1| 50| 50.0|
# | B| 2| 75| 62.5|
# +--------+-----+-----+----------------+

4. GroupBy and Aggregations

The groupBy() operation is essential for summarizing data. You can perform powerful aggregations like sum(), count(), avg(), min(), and max() on grouped data.

from pyspark.sql.functions import sum, count
# Group by category and calculate total sales
df.groupBy('category').agg(
sum('sales').alias('total_sales'),
count('month').alias('number_of_months')
).show()
# +--------+-----------+----------------+
# |category|total_sales|number_of_months|
# +--------+-----------+----------------+
# | A| 450| 3|
# | B| 125| 2|
# +--------+-----------+----------------+

Conclusion

Mastering these core PySpark operations is fundamental to becoming a proficient data engineer or data scientist. From the basic yet crucial steps of handling missing data to the advanced power of window functions, these techniques form the building blocks for creating robust and scalable data pipelines. By combining these methods, you can transform messy, raw data into clean, insightful information ready for analysis, machine learning, and informed decision-making. Keep practicing with real-world datasets, and you’ll soon find that PySpark is not just a tool, but a powerful ally in the world of big data.