Common PySpark Operations for Data Wrangling
By Kamrul Hasan
Introduction
PySpark is a powerful tool for big data processing, but its true power lies in its rich set of APIs for data wrangling—the process of cleaning and transforming data into a usable format. This blog post builds on the fundamentals by showing some of the most common and useful PySpark operations for everyday data engineering tasks.
1. Data Cleaning and Transformation
Data is rarely clean. Missing values, duplicates, and incorrect data types are common issues. PySpark provides easy-to-use functions to tackle these problems.
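For the duplicate and data-type issues mentioned here, dropDuplicates() and cast() are the usual tools; a minimal sketch (the price column is just an illustration) before turning to missing values:

# Remove exact duplicate rows (or pass a subset of columns to deduplicate on)
df_deduped = df.dropDuplicates()

# Cast a column that was read as a string to the correct numeric type
from pyspark.sql.functions import col
df_typed = df.withColumn('price', col('price').cast('double'))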
Handling Missing Values
Missing data (represented as null or None) can be a major issue. You can either drop rows with missing values or fill them in.
Dropping Rows
To remove rows containing null in any column, use dropna(). To be more specific and only drop rows where a null exists in a particular column, use the subset parameter.
from pyspark.sql.functions import col

# Drop rows with any null value
df_cleaned = df.dropna()

# Drop rows where the 'price' column is null
df_cleaned = df.dropna(subset=['price'])
Filling Nulls
To replace null values with a specific value, use fillna(). This is great for assigning a default value or a placeholder.
# Fill all nulls with 'Unknown'
df_filled = df.fillna('Unknown')

# Fill nulls in specific columns with a dictionary
df_filled = df.fillna({'category': 'unknown', 'rating': 0})
Renaming and Dropping Columns
Sometimes you need to rename columns for clarity or remove ones you no longer need. The withColumnRenamed() and drop() functions are perfect for this.
# Rename a single column
df_renamed = df.withColumnRenamed('old_name', 'new_name')

# Drop a single column
df_dropped = df.drop('unwanted_column')

# Drop multiple columns
df_dropped = df.drop('unwanted_column_1', 'unwanted_column_2')
2. Joins
Joining is a fundamental operation in data processing that allows you to combine two DataFrames based on a common key. PySpark supports various types of joins.
Example Setup
Let's create two simple DataFrames to demonstrate.
# Sample data
employee_data = [("John", "HR"), ("Jane", "IT"), ("Tom", "Finance")]
dept_data = [("HR", "New York"), ("IT", "San Francisco"), ("Sales", "London")]

# Create DataFrames
employees_df = spark.createDataFrame(employee_data, ["name", "department"])
departments_df = spark.createDataFrame(dept_data, ["dept_name", "location"])

employees_df.show()
# +----+----------+
# |name|department|
# +----+----------+
# |John|        HR|
# |Jane|        IT|
# | Tom|   Finance|
# +----+----------+

departments_df.show()
# +---------+-------------+
# |dept_name|     location|
# +---------+-------------+
# |       HR|     New York|
# |       IT|San Francisco|
# |    Sales|       London|
# +---------+-------------+
Common Join Types
- Inner Join: Returns only the rows with matching keys in both DataFrames.
- Left Join: Returns all rows from the left DataFrame and matching rows from the right; null is used where there is no match.
- Right Join: Returns all rows from the right DataFrame and matching rows from the left.
- Full Outer Join: Returns all rows from both DataFrames, filling in null where no match exists.
# Inner Join: Finds employees with a matching department
employees_df.join(departments_df, employees_df.department == departments_df.dept_name, "inner").show()
# +----+----------+---------+-------------+
# |name|department|dept_name|     location|
# +----+----------+---------+-------------+
# |Jane|        IT|       IT|San Francisco|
# |John|        HR|       HR|     New York|
# +----+----------+---------+-------------+

# Left Join: Keeps all employees, even those without a matching department
employees_df.join(departments_df, employees_df.department == departments_df.dept_name, "left").show()
# +----+----------+---------+-------------+
# |name|department|dept_name|     location|
# +----+----------+---------+-------------+
# |Jane|        IT|       IT|San Francisco|
# |John|        HR|       HR|     New York|
# | Tom|   Finance|     null|         null|
# +----+----------+---------+-------------+
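The remaining join types from the list work the same way; here is a quick sketch of the right and full outer joins on the same two DataFrames (output omitted; the semantics are described in the comments):

# Right Join: Keeps all departments, even those without a matching employee
employees_df.join(departments_df, employees_df.department == departments_df.dept_name, "right").show()
# The unmatched 'Sales' department appears with null name and department.

# Full Outer Join: Keeps unmatched rows from both sides
employees_df.join(departments_df, employees_df.department == departments_df.dept_name, "outer").show()
# Both 'Tom' (no matching department) and 'Sales' (no matching employee) appear, padded with nulls.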
3. Window Functions
Window functions are a game-changer for data analysis in PySpark. They allow you to perform calculations across a set of rows that are related to the current row, without collapsing the rows into a single result. This is perfect for tasks like calculating running totals, rankings, or moving averages.
Defining a Window
A window is defined using Window.partitionBy() to group the data and orderBy() to sort it within each group.
from pyspark.sql import Window
from pyspark.sql.functions import rank, avg, sum

# Create a sample DataFrame
data = [("A", 1, 100), ("A", 2, 150), ("A", 3, 200),
        ("B", 1, 50), ("B", 2, 75)]
df = spark.createDataFrame(data, ["category", "month", "sales"])

# Define a window specification: partition by category, order by month
window_spec = Window.partitionBy("category").orderBy("month")
Common Window Functions
- Ranking: row_number(), rank(), and dense_rank() assign a rank to each row within a partition.
- Aggregate: sum(), avg(), max(), and min() can be applied over a window.
# Add a running total of sales for each category
df_running_total = df.withColumn('running_total_sales', sum('sales').over(window_spec))

# Show the result
df_running_total.show()
# +--------+-----+-----+-------------------+
# |category|month|sales|running_total_sales|
# +--------+-----+-----+-------------------+
# |       A|    1|  100|                100|
# |       A|    2|  150|                250|
# |       A|    3|  200|                450|
# |       B|    1|   50|                 50|
# |       B|    2|   75|                125|
# +--------+-----+-----+-------------------+
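The ranking functions from the list above plug into the same window_spec; a minimal sketch (the new column names are chosen for illustration):

from pyspark.sql.functions import row_number, rank, dense_rank

# Assign a rank to each row within its category, ordered by month
df_ranked = (df
    .withColumn('row_number', row_number().over(window_spec))
    .withColumn('rank', rank().over(window_spec))
    .withColumn('dense_rank', dense_rank().over(window_spec)))
df_ranked.show()

With no ties in the month column, all three ranking columns agree; they only differ when the ordering column contains duplicates.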
Calculating a Moving Average
You can also specify a frame within a window to calculate a moving average. This example calculates the average of the current row and the two preceding rows.
from pyspark.sql.functions import mean

# Define a window for a 3-row moving average (current row and the two preceding rows)
moving_avg_window = Window.partitionBy("category").orderBy("month").rowsBetween(-2, 0)

df_moving_avg = df.withColumn('moving_avg_sales', mean('sales').over(moving_avg_window))
df_moving_avg.show()
# +--------+-----+-----+----------------+
# |category|month|sales|moving_avg_sales|
# +--------+-----+-----+----------------+
# |       A|    1|  100|           100.0|
# |       A|    2|  150|           125.0|
# |       A|    3|  200|           150.0|
# |       B|    1|   50|            50.0|
# |       B|    2|   75|            62.5|
# +--------+-----+-----+----------------+
4. GroupBy and Aggregations
The groupBy() operation is essential for summarizing data. You can perform powerful aggregations like sum(), count(), avg(), min(), and max() on grouped data.
from pyspark.sql.functions import sum, count

# Group by category and calculate total sales
df.groupBy('category').agg(
    sum('sales').alias('total_sales'),
    count('month').alias('number_of_months')
).show()
# +--------+-----------+----------------+
# |category|total_sales|number_of_months|
# +--------+-----------+----------------+
# |       A|        450|               3|
# |       B|        125|               2|
# +--------+-----------+----------------+
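The other aggregate functions mentioned above slot into agg() in the same way; a quick sketch on the same df (alias names chosen for illustration):

from pyspark.sql.functions import avg, min, max

# Average, minimum, and maximum sales per category
df.groupBy('category').agg(
    avg('sales').alias('avg_sales'),
    min('sales').alias('min_sales'),
    max('sales').alias('max_sales')
).show()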
Conclusion
Mastering these core PySpark operations is fundamental to becoming a proficient data engineer or data scientist. From the basic yet crucial steps of handling missing data to the advanced power of window functions, these techniques form the building blocks for creating robust and scalable data pipelines. By combining these methods, you can transform messy, raw data into clean, insightful information ready for analysis, machine learning, and informed decision-making. Keep practicing with real-world datasets, and you’ll soon find that PySpark is not just a tool, but a powerful ally in the world of big data.