
Data Wrangling with PySpark

  • Posted by: Alier Reng
  • Categories: Blog, Data Science
  • Date: July 14, 2021

Performing Data Wrangling with PySpark

I recently completed PySpark Essentials for Data Scientists (Big Data + Python) by Layla AI on udemy.com, but I never had time to showcase my newly acquired PySpark skills. I got to work today, however, and what follows is a sneak peek into data wrangling with PySpark.

Our objective is to highlight some of the primary data manipulation techniques in PySpark. As usual, we're using our favorite dataset: the South Sudan 2008 Census Data.

We begin by importing pyspark and SparkSession from the pyspark.sql module and then initializing the SparkSession. This article assumes that you have already installed PySpark with the necessary dependencies; otherwise, please see the official installation instructions. We recommend installing PySpark through Anaconda, especially if you are new to Python.
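
For reference, the installation itself is usually a one-liner at the command line; a minimal sketch, assuming a standard Anaconda or pip setup:

conda install -c conda-forge pyspark   # with Anaconda
pip install pyspark                    # or, with plain pip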

Importing the Modules

In [213]:
# Import pyspark and SparkSession
import pyspark 
from pyspark.sql import SparkSession
# Initialize PySpark session
spark = SparkSession.builder.appName("data wrangling").getOrCreate()
# Check how many cores we are working with (uses Spark's internal JVM API)
cores = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()
print("You are working with", cores, "core(s)")
# Display the spark session information
spark
You are working with 1 core(s)
Out[213]:

SparkSession - in-memory

SparkContext

Version: v3.1.1
Master: local[*]
AppName: data wrangling

Importing the Data

Here, we import the dataset by creating the path to the data directory, combining it with the file name, telling PySpark to infer the column data types, and indicating that our dataset contains a header row of column names.

In [214]:
# Import the dataset
path = 'Datasets/'
ss_2008_data_raw = spark.read.csv(path + 'ss_2008_census_data.csv', inferSchema = True, header = True)

We display the first 10 rows of our dataset with the show() method, and we indicate that we want PySpark to show column contents without truncating them.

In [215]:
# Inspect the first 10 rows of the original dataset
ss_2008_data_raw.show(10, truncate = False)
+------+-----------+-----------------+--------+--------------------------+------+--------+-----+-------+------+
|Region|Region Name|Region - RegionId|Variable|Variable Name             |Age   |Age Name|Scale|Units  |2008  |
+------+-----------+-----------------+--------+--------------------------+------+--------+-----+-------+------+
|KN.A2 |Upper Nile |SS-NU            |KN.B2   |Population, Total (Number)|KN.C1 |Total   |units|Persons|964353|
|KN.A2 |Upper Nile |SS-NU            |KN.B2   |Population, Total (Number)|KN.C2 |0 to 4  |units|Persons|150872|
|KN.A2 |Upper Nile |SS-NU            |KN.B2   |Population, Total (Number)|KN.C3 |5 to 9  |units|Persons|151467|
|KN.A2 |Upper Nile |SS-NU            |KN.B2   |Population, Total (Number)|KN.C4 |10 to 14|units|Persons|126140|
|KN.A2 |Upper Nile |SS-NU            |KN.B2   |Population, Total (Number)|KN.C5 |15 to 19|units|Persons|103804|
|KN.A2 |Upper Nile |SS-NU            |KN.B2   |Population, Total (Number)|KN.C6 |20 to 24|units|Persons|82588 |
|KN.A2 |Upper Nile |SS-NU            |KN.B2   |Population, Total (Number)|KN.C7 |25 to 29|units|Persons|76754 |
|KN.A2 |Upper Nile |SS-NU            |KN.B2   |Population, Total (Number)|KN.C8 |30 to 34|units|Persons|63134 |
|KN.A2 |Upper Nile |SS-NU            |KN.B2   |Population, Total (Number)|KN.C9 |35 to 39|units|Persons|56806 |
|KN.A2 |Upper Nile |SS-NU            |KN.B2   |Population, Total (Number)|KN.C10|40 to 44|units|Persons|42139 |
+------+-----------+-----------------+--------+--------------------------+------+--------+-----+-------+------+
only showing top 10 rows

Next, we inspect the schema (the column data types) and see that all the columns are of string type; however, the last column, '2008', which holds the population counts, should be an integer. We'll change it later.

In [216]:
# Inspect column data types
ss_2008_data_raw.printSchema()
root
 |-- Region: string (nullable = true)
 |-- Region Name: string (nullable = true)
 |-- Region - RegionId: string (nullable = true)
 |-- Variable: string (nullable = true)
 |-- Variable Name: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Age Name: string (nullable = true)
 |-- Scale: string (nullable = true)
 |-- Units: string (nullable = true)
 |-- 2008: string (nullable = true)

Selecting the Columns

Below, we select the columns of interest with the select() method. Next, we inspect the updated dataset with the show() and toPandas() methods. The toPandas() method converts the result to a pandas DataFrame on the driver; because it pulls all the data into memory, it's computationally expensive and should be called only on small results (note the limit(5) below).

In [217]:
# Select the columns to focus on
ss_2008_data_cleaned = ss_2008_data_raw.select('Region Name', 'Variable Name', 'Age Name', '2008')
# Inspect the updated dataset with the show() function
ss_2008_data_cleaned.show(5, truncate = False)
+-----------+--------------------------+--------+------+
|Region Name|Variable Name             |Age Name|2008  |
+-----------+--------------------------+--------+------+
|Upper Nile |Population, Total (Number)|Total   |964353|
|Upper Nile |Population, Total (Number)|0 to 4  |150872|
|Upper Nile |Population, Total (Number)|5 to 9  |151467|
|Upper Nile |Population, Total (Number)|10 to 14|126140|
|Upper Nile |Population, Total (Number)|15 to 19|103804|
+-----------+--------------------------+--------+------+
only showing top 5 rows
In [218]:
# Inspect the first 5 rows with the toPandas() method 
ss_2008_data_cleaned.limit(5).toPandas()
Out[218]:
  Region Name               Variable Name  Age Name    2008
0  Upper Nile  Population, Total (Number)     Total  964353
1  Upper Nile  Population, Total (Number)    0 to 4  150872
2  Upper Nile  Population, Total (Number)    5 to 9  151467
3  Upper Nile  Population, Total (Number)  10 to 14  126140
4  Upper Nile  Population, Total (Number)  15 to 19  103804

Changing Column Data Types

To convert data from one type to another, we need the data types from pyspark.sql.types and the SQL functions from pyspark.sql.functions. Here, we import everything with the wildcard, *.
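
As an aside, wildcard imports shadow Python built-ins such as sum() and max(). If you prefer explicit imports, a minimal alternative covering just the functions used in this tutorial might look like this:

# Explicit imports; sum is aliased so Python's built-in sum() stays available
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col, split
from pyspark.sql.functions import sum as spark_sum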

In [219]:
# Import SQL data types and functions
from pyspark.sql.types import *
from pyspark.sql.functions import *
# Change the population column data type to the integer data type 
# we use the backslash to break a long line of code
ss_2008_data_cleaned = ss_2008_data_cleaned.withColumn('2008', \
                                      ss_2008_data_cleaned['2008'].cast(IntegerType()))
# View the Schema
ss_2008_data_cleaned.printSchema()
root
 |-- Region Name: string (nullable = true)
 |-- Variable Name: string (nullable = true)
 |-- Age Name: string (nullable = true)
 |-- 2008: integer (nullable = true)
In [220]:
# Inspect the first 5 rows with the toPandas() method 
ss_2008_data_cleaned.limit(5).toPandas()
Out[220]:
  Region Name               Variable Name  Age Name    2008
0  Upper Nile  Population, Total (Number)     Total  964353
1  Upper Nile  Population, Total (Number)    0 to 4  150872
2  Upper Nile  Population, Total (Number)    5 to 9  151467
3  Upper Nile  Population, Total (Number)  10 to 14  126140
4  Upper Nile  Population, Total (Number)  15 to 19  103804

Renaming Columns with withColumnRenamed()

Here we rename our columns of interest with the withColumnRenamed() method. Note that you need to chain several withColumnRenamed() calls together to rename multiple columns: each call takes the old column name followed by the new one. For example, in the chunk below, Region Name is the old column name, and State is its new name.

In [221]:
# Renaming columns
ss_2008_data_cleaned = ss_2008_data_cleaned.withColumnRenamed('Region Name', 'State')\
.withColumnRenamed('2008','Population').withColumnRenamed('Variable Name', 'Gender')\
.withColumnRenamed('Age Name', 'Age Category')
# View the first 4 rows with the toPandas()
ss_2008_data_cleaned.limit(4).toPandas()
Out[221]:
        State                      Gender Age Category  Population
0  Upper Nile  Population, Total (Number)        Total      964353
1  Upper Nile  Population, Total (Number)       0 to 4      150872
2  Upper Nile  Population, Total (Number)       5 to 9      151467
3  Upper Nile  Population, Total (Number)     10 to 14      126140
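
If you have many columns to rename, chaining gets verbose. An equivalent pattern (a sketch, not the original code, with df standing in for the selected DataFrame) loops over an old-to-new mapping:

# Rename columns by looping over an old-to-new mapping
rename_map = {'Region Name': 'State',
              'Variable Name': 'Gender',
              'Age Name': 'Age Category',
              '2008': 'Population'}
for old_name, new_name in rename_map.items():
    df = df.withColumnRenamed(old_name, new_name)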

Checking Row and Column Counts

We display the number of rows and the number of columns with the print() function.

In [236]:
# Inspect the number of rows and columns
print('Your dataset has', ss_2008_data_cleaned.count(), 'rows and', len(ss_2008_data_cleaned.columns), 'columns.')
Your dataset has 453 rows and 4 columns.

Removing NAs

Here we remove NAs (missing values) with the na.drop() method. However, it is imperative to be careful when dropping NAs, as doing so can discard meaningful rows and skew your data.
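
Before dropping anything, it can help to see how many missing values each column actually holds; a minimal sketch using count() and when():

# Count the nulls in each column (all zeros would make na.drop() a no-op)
from pyspark.sql.functions import col, count, when
ss_2008_data_cleaned.select(
    [count(when(col(c).isNull(), c)).alias(c) for c in ss_2008_data_cleaned.columns]
).show()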

In [223]:
# Drop the rows with nas or missing values
ss_2008_census_df = ss_2008_data_cleaned.na.drop()
ss_2008_census_df.limit(5).toPandas()
Out[223]:
        State                      Gender Age Category  Population
0  Upper Nile  Population, Total (Number)        Total      964353
1  Upper Nile  Population, Total (Number)       0 to 4      150872
2  Upper Nile  Population, Total (Number)       5 to 9      151467
3  Upper Nile  Population, Total (Number)     10 to 14      126140
4  Upper Nile  Population, Total (Number)     15 to 19      103804
In [239]:
# Verify the dimensions of the dataset
print('Your dataset has', ss_2008_census_df.count(), 'rows and', len(ss_2008_census_df.columns), 'columns.')
Your dataset has 300 rows and 4 columns.

Splitting Column Values with split()

Earlier, we saw that the Gender column has three parts: Population, 'Total/Female/Male', and (Number). However, we are only interested in the male and female rows. So, we clean this column by splitting it on spaces with the split() method and retaining the middle portion, index 1. For example, splitting 'Population, Male (Number)' on spaces yields ['Population,', 'Male', '(Number)'], so getItem(1) picks out the gender token.

In [225]:
# Transform the gender column with the split() method
ss_2008_census_df = ss_2008_census_df.withColumn('gender', split(ss_2008_census_df['Gender'], ' ').getItem(1))
# Inspect the first 10 rows
ss_2008_census_df.show(10, truncate=False)
+----------+------+------------+----------+
|State     |gender|Age Category|Population|
+----------+------+------------+----------+
|Upper Nile|Total |Total       |964353    |
|Upper Nile|Total |0 to 4      |150872    |
|Upper Nile|Total |5 to 9      |151467    |
|Upper Nile|Total |10 to 14    |126140    |
|Upper Nile|Total |15 to 19    |103804    |
|Upper Nile|Total |20 to 24    |82588     |
|Upper Nile|Total |25 to 29    |76754     |
|Upper Nile|Total |30 to 34    |63134     |
|Upper Nile|Total |35 to 39    |56806     |
|Upper Nile|Total |40 to 44    |42139     |
+----------+------+------------+----------+
only showing top 10 rows
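
If the 'Population, X (Number)' pattern is guaranteed, regexp_extract() is a one-step alternative to the split() call above; a sketch, shown against a hypothetical df holding the pre-split data:

# Pull the middle token directly with a regular expression
from pyspark.sql.functions import regexp_extract
df = df.withColumn('gender',
                   regexp_extract('Gender', r'Population, (\w+) \(Number\)', 1))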
In [226]:
# Keep only the 'Male' and 'Female' rows by filtering out 'Total' in the gender column
ss_2008_census_df = ss_2008_census_df.filter(ss_2008_census_df['gender'] != "Total")
# Inspect the first 10 rows
ss_2008_census_df.show(10, truncate=False)
+----------+------+------------+----------+
|State     |gender|Age Category|Population|
+----------+------+------------+----------+
|Upper Nile|Male  |Total       |525430    |
|Upper Nile|Male  |0 to 4      |82690     |
|Upper Nile|Male  |5 to 9      |83744     |
|Upper Nile|Male  |10 to 14    |71027     |
|Upper Nile|Male  |15 to 19    |57387     |
|Upper Nile|Male  |20 to 24    |42521     |
|Upper Nile|Male  |25 to 29    |38795     |
|Upper Nile|Male  |30 to 34    |32236     |
|Upper Nile|Male  |35 to 39    |30228     |
|Upper Nile|Male  |40 to 44    |22290     |
+----------+------+------------+----------+
only showing top 10 rows
In [240]:
# Re-inspect the number of rows and the number of columns
print('Your dataset has',  ss_2008_census_df.count(), 'rows and', len(ss_2008_census_df.columns),'columns.')
Your dataset has 300 rows and 4 columns.

Selecting Row Values with filter()

Below, we remove the rows with 'Total' in the Age Category column.

In [228]:
# Modify the Age Category column
ss_2008_census_df_1 = ss_2008_census_df.filter(ss_2008_census_df['Age Category'] != "Total")
ss_2008_census_df_1.show(10, truncate=False)
+----------+------+------------+----------+
|State     |gender|Age Category|Population|
+----------+------+------------+----------+
|Upper Nile|Male  |0 to 4      |82690     |
|Upper Nile|Male  |5 to 9      |83744     |
|Upper Nile|Male  |10 to 14    |71027     |
|Upper Nile|Male  |15 to 19    |57387     |
|Upper Nile|Male  |20 to 24    |42521     |
|Upper Nile|Male  |25 to 29    |38795     |
|Upper Nile|Male  |30 to 34    |32236     |
|Upper Nile|Male  |35 to 39    |30228     |
|Upper Nile|Male  |40 to 44    |22290     |
|Upper Nile|Male  |45 to 49    |18163     |
+----------+------+------------+----------+
only showing top 10 rows
In [242]:
# Re-inspect the number of rows and columns
print('Your dataset has', ss_2008_census_df_1.count(), 'rows and', len(ss_2008_census_df_1.columns), 'columns.') 
Your dataset has 280 rows and 4 columns.

Replacing Column Values

In the chunk below, we regroup the Age Category values to reduce the number of classes. While there are multiple methods for transforming column values, we opted to use PySpark's replace() method.

In [230]:
# Combine Age Category values
ss_2008_census_final_df = ss_2008_census_df_1.replace(['0 to 4',
                                                     '5 to 9',
                                                     '10 to 14',
                                                     '15 to 19',
                                                     '20 to 24',
                                                     '25 to 29',
                                                     '30 to 34',
                                                     '35 to 39',
                                                     '40 to 44',
                                                     '45 to 49',
                                                     '50 to 54',
                                                     '55 to 59',
                                                     '60 to 64',
                                                     '65+'], 
                                                    ['0-9', '0-9',
                                                    '10-19', '10-19',
                                                     '20-29', '20-29',
                                                     '30-39', '30-39',
                                                     '40-49', '40-49',
                                                     '50-59', '50-59',
                                                     '60+', '60+'
                                                    ], 'Age Category')

# Inspect the first 5 rows
ss_2008_census_final_df.show(5, False)
+----------+------+------------+----------+
|State     |gender|Age Category|Population|
+----------+------+------------+----------+
|Upper Nile|Male  |0-9         |82690     |
|Upper Nile|Male  |0-9         |83744     |
|Upper Nile|Male  |10-19       |71027     |
|Upper Nile|Male  |10-19       |57387     |
|Upper Nile|Male  |20-29       |42521     |
+----------+------+------------+----------+
only showing top 5 rows
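
For what it's worth, replace() also accepts a dictionary, which keeps each old value next to its replacement; a sketch of an equivalent call:

# The same regrouping, written as an old-to-new dictionary
age_map = {'0 to 4': '0-9', '5 to 9': '0-9',
           '10 to 14': '10-19', '15 to 19': '10-19',
           '20 to 24': '20-29', '25 to 29': '20-29',
           '30 to 34': '30-39', '35 to 39': '30-39',
           '40 to 44': '40-49', '45 to 49': '40-49',
           '50 to 54': '50-59', '55 to 59': '50-59',
           '60 to 64': '60+', '65+': '60+'}
ss_2008_census_final_df = ss_2008_census_df_1.replace(age_map, subset='Age Category')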

Converting Column Values into a List

In [231]:
ss_2008_census_final_df.select('Age Category').distinct().collect()
Out[231]:
[Row(Age Category='30-39'),
 Row(Age Category='0-9'),
 Row(Age Category='20-29'),
 Row(Age Category='60+'),
 Row(Age Category='10-19'),
 Row(Age Category='40-49'),
 Row(Age Category='50-59')]
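
Note that collect() returns Row objects rather than bare values. To get a plain Python list, one option is a comprehension (age_categories is a hypothetical name):

# Flatten the collected Rows into a plain list of strings
age_categories = [row['Age Category'] for row in
                  ss_2008_census_final_df.select('Age Category').distinct().collect()]
print(age_categories)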
In [243]:
# Re-inspect the number of rows and the number of columns
print('Your dataset has', ss_2008_census_final_df.count(), 'rows and', len(ss_2008_census_final_df.columns), 'columns.')
Your dataset has 280 rows and 4 columns.

Summarizing the Dataset

In the next three chunks, we group the data by various columns, compute the sum of the population column, and then display the results. We will accomplish this using the groupBy(), agg(), and orderBy() methods.

In [233]:
# Compute the state totals
state_totals = ss_2008_census_final_df.groupBy("State")\
.agg(sum("Population").alias('Total Population')).orderBy(col('Total Population').desc())
# Display the results
state_totals.show(truncate = False)
+-----------------------+----------------+
|State                  |Total Population|
+-----------------------+----------------+
|Jonglei                |1358602         |
|Central Equatoria      |1103557         |
|Warrap                 |972928          |
|Upper Nile             |964353          |
|Eastern Equatoria      |906161          |
|Northern Bahr el Ghazal|720898          |
|Lakes                  |695730          |
|Western Equatoria      |619029          |
|Unity                  |585801          |
|Western Bahr el Ghazal |333431          |
+-----------------------+----------------+
In [234]:
# Compute the state totals by gender
state_totals_by_gender = ss_2008_census_final_df.groupBy('State', 'Gender')\
.agg(sum("Population").alias('Population')).orderBy(col('Population').desc())
# Display the results 
state_totals_by_gender.show(truncate = False)
+-----------------------+------+----------+
|State                  |Gender|Population|
+-----------------------+------+----------+
|Jonglei                |Male  |734327    |
|Jonglei                |Female|624275    |
|Central Equatoria      |Male  |581722    |
|Upper Nile             |Male  |525430    |
|Central Equatoria      |Female|521835    |
|Warrap                 |Female|502194    |
|Warrap                 |Male  |470734    |
|Eastern Equatoria      |Male  |465187    |
|Eastern Equatoria      |Female|440974    |
|Upper Nile             |Female|438923    |
|Northern Bahr el Ghazal|Female|372608    |
|Lakes                  |Male  |365880    |
|Northern Bahr el Ghazal|Male  |348290    |
|Lakes                  |Female|329850    |
|Western Equatoria      |Male  |318443    |
|Western Equatoria      |Female|300586    |
|Unity                  |Male  |300247    |
|Unity                  |Female|285554    |
|Western Bahr el Ghazal |Male  |177040    |
|Western Bahr el Ghazal |Female|156391    |
+-----------------------+------+----------+
In [235]:
# Compute the state totals by gender and age category
state_totals_by_gender_n_age = ss_2008_census_final_df.groupBy('State', 'Gender', 'Age Category')\
.agg(sum("Population").alias('Population')).orderBy(col('Population').desc(), col('State'))
# Print the results
state_totals_by_gender_n_age.show(truncate = False)
+-----------------------+------+------------+----------+
|State                  |Gender|Age Category|Population|
+-----------------------+------+------------+----------+
|Jonglei                |Male  |0-9         |236691    |
|Jonglei                |Female|0-9         |185854    |
|Jonglei                |Male  |10-19       |182491    |
|Central Equatoria      |Male  |0-9         |169365    |
|Warrap                 |Male  |0-9         |168004    |
|Upper Nile             |Male  |0-9         |166434    |
|Warrap                 |Female|0-9         |161900    |
|Central Equatoria      |Female|0-9         |155756    |
|Jonglei                |Female|10-19       |143194    |
|Eastern Equatoria      |Male  |0-9         |140458    |
|Central Equatoria      |Male  |10-19       |139570    |
|Upper Nile             |Female|0-9         |135905    |
|Eastern Equatoria      |Male  |10-19       |133946    |
|Northern Bahr el Ghazal|Male  |0-9         |133311    |
|Upper Nile             |Male  |10-19       |128414    |
|Northern Bahr el Ghazal|Female|0-9         |128164    |
|Central Equatoria      |Female|10-19       |127336    |
|Eastern Equatoria      |Female|0-9         |126538    |
|Eastern Equatoria      |Female|10-19       |117104    |
|Jonglei                |Female|20-29       |116624    |
+-----------------------+------+------------+----------+
only showing top 20 rows
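
As a closing variation, groupBy() can also pivot one column's values into separate columns; a sketch (not in the original notebook) that puts the male and female totals side by side, one row per state:

# Pivot gender into columns, one row per state
state_gender_pivot = ss_2008_census_final_df.groupBy('State')\
.pivot('gender', ['Male', 'Female']).sum('Population')
state_gender_pivot.show(truncate = False)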

Closing Remarks

This article has only scratched the surface of data wrangling with PySpark, but we hope it will help you get started with PySpark and data science in general.

With that said, please follow me on Twitter (@tongakuot), LinkedIn (@tongakuot), and GitHub (@alierwai) for more tutorials and articles on data science, Python, R, statistics, mathematics, PySpark, and Shiny.

Acknowledgements

We are grateful to Layla AI and Mike Cohen for their phenomenal courses, PySpark Essentials for Data Scientists and Master Math by Coding in Python, respectively.

Tags: PySpark, Python

Alier Reng

A data science professional, statistician, and educator with over five years of experience in healthcare analytics and more than seven years of university teaching experience. A passionate, results-oriented data scientist with a proven record of completing projects with efficiency and speed.

He holds a B.S. in Neuroscience from the University of Texas, an M.S. in Professional Science with a concentration in Biostatistics from Middle Tennessee State University, and an MBA in Information Systems Management from LeTourneau University.

His areas of interest include data science, statistics, R Shiny, and deep learning.
