PySpark DataFrame Tutorial for Beginners

In this tutorial, you will learn about the PySpark DataFrame with the help of multiple examples. The DataFrame is one of the major data structures in PySpark, alongside the RDD, and it stores data in the form of rows and columns.

As a data scientist, data engineer, or data analyst, you are probably familiar with storing files on your local computer, loading those files with Pandas or PySpark, and then performing some operations on that data.

This becomes very tedious when the files are large, holding trillions of records and reaching terabytes or petabytes in size.
In that scenario, it is not practical to load and process such files on a single machine.

To overcome this kind of problem, distributed processing engines like Apache Spark and Hadoop come into the picture. You are probably familiar with the Pandas DataFrame, one of the most widely used data structures for loading and processing datasets, but it has some limitations, which I have described in the PySpark DataFrame vs Pandas DataFrame section.

Before going further, let's take a brief look at Spark.

What is Apache Spark?

Apache Spark is an open-source, unified analytics engine for large-scale data processing. It can load large-scale datasets and run data engineering, machine learning, and data science workloads on a single-node machine or on a cluster. Apache Spark is written in the Scala programming language, but it is also available for other programming languages such as Python, Java, and R.

These are some major features of Apache Spark that make it popular and easy to use.

  • Batch/Streaming Data:- We can perform batch processing or stream processing. In batch processing, data arrives and is processed periodically, whereas in stream processing, data arrives and is processed continuously. We can use our preferred language to process that data.
  • SQL Analytics:- Apache Spark also allows us to run SQL queries, for example to produce reports for dashboards.
  • Machine learning:- Spark provides a module called MLlib for machine learning operations.

Let’s move on to PySpark.

What is PySpark?

PySpark is the Python interface to Apache Spark. It is used like any other Python package, and through the PySpark APIs our application can use the functionality of Apache Spark to load large-scale datasets and perform operations on them.

Note:- An interface is a point where two systems meet to communicate with each other.

What is PySpark DataFrame?

A DataFrame in PySpark is a data structure that stores data in the form of rows and columns. In simple words, a PySpark DataFrame is like a table in a Relational Database Management System (RDBMS). Because of this similarity, we can perform almost all SQL queries on top of a PySpark DataFrame.

A PySpark DataFrame is immutable, which means it can’t be changed: every time a transformation method is applied, a new DataFrame is created. It is also lazily evaluated, and it is built on top of the PySpark RDD (Resilient Distributed Dataset).

Lazy evaluation means that when we apply a transformation to a DataFrame, it does not execute immediately; instead, Spark plans to execute it later.

When an action such as count(), collect(), or show() is called explicitly, the planned transformations are executed and the computed value is returned.
A PySpark DataFrame is distributed across multiple nodes in a cluster. Any operation on a DataFrame is performed in parallel on multiple machines in the cluster.

For example, I have created a simple PySpark DataFrame with five rows and four columns, as you can see below. Later we will see how to perform transformations and actions on a DataFrame.

+----------+---------+-------+--------------------+
|first_name|last_name|country|designation         |
+----------+---------+-------+--------------------+
|Vishvajit |Rao      |India  |Backend Developer   |
|Ajay      |Kumar    |India  |Front-end Developer |
|John      |Doe      |USA    |Data Engineer       |
|Vedika    |Kumari   |India  |Data Analyst        |
|Bharati   |Singh    |India  |Full Stack Developer|
+----------+---------+-------+--------------------+

PySpark DataFrame vs Pandas DataFrame

PySpark DataFrame and Pandas DataFrame are both data structures that store values in the form of rows and columns.

In other words, a DataFrame is a data structure that stores values in a tabular format, just like a table in SQL.

Here, I have described the differences between the PySpark DataFrame and the Pandas DataFrame.

PySpark DataFrame:

  • PySpark DataFrame is a data structure in PySpark that stores data in the form of a table, like a table in a SQL database.
  • PySpark DataFrame supports most SQL queries.
  • PySpark DataFrame runs on multiple nodes in a cluster.
  • It can handle large datasets.
  • PySpark can also be used for Data Science, Machine Learning, and Data Engineering.
  • PySpark DataFrame is immutable, which means it cannot be changed once it has been created.
  • It performs the computation in memory (RAM).
  • PySpark DataFrame is lazy in nature, which means it does not execute immediately after a transformation is applied. It executes when an action is performed.
  • PySpark DataFrame can run operations in parallel, on multiple nodes in a cluster or even on a single node.
  • PySpark DataFrame provides fault tolerance, which is the capability of a system to keep working properly even if one of its sub-components fails.

Pandas DataFrame:

  • Pandas DataFrame is also a data structure that stores data like a table.
  • Pandas DataFrame does not support parallelism.
  • Pandas DataFrame runs on only a single machine, or single node.
  • Pandas DataFrame works slowly on large datasets.
  • DataFrame in Pandas is a mutable data structure, which means we can change it after it has been created.
  • Pandas DataFrame executes operations immediately, unlike PySpark DataFrame.
  • Pandas DataFrame does not support fault tolerance.
  • Pandas DataFrame is also used for Data Engineering, Data Analytics, Data Science, and Machine Learning.
  • It is perfect for small datasets.

How to Create PySpark Dataframe?

We can create a PySpark DataFrame in multiple ways. Here we will look at some of them.
To create a DataFrame in PySpark, follow the steps given below.

Creating Spark Session

A Spark session is the entry point for any PySpark or Spark application, and it allows us to work with PySpark RDDs, DataFrames, and Datasets. The PySpark SQL module has a class called SparkSession, which has a builder attribute that is used to create a Spark session. To create a Spark session, we have to import the SparkSession class from the pyspark.sql module.

The code below shows how I have created a Spark session.

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("testing").getOrCreate()

In the above code, spark is the variable that holds the Spark session, SparkSession.builder is used to start building the session, appName() sets the name of the Spark application, and getOrCreate() returns the existing Spark session or creates a new one if none is available.

Creating DataFrame

Here, I have shown multiple ways to create PySpark DataFrame.

Creating PySpark DataFrame from a list of tuples:

Now we have a Spark session called spark, which has a method called createDataFrame() for creating a brand new DataFrame. The createDataFrame() method accepts different kinds of input data, such as a PySpark RDD, a Pandas DataFrame, a list of PySpark Row objects, or a list of tuples.

from pyspark.sql import SparkSession
data = [
    ('E1001', 'Vishvajit', 'Rao', 20000, 'IT'),
    ('E1002', 'Harsh', 'Kumar', 15000, 'HR'),
    ('E1003', 'Pankaj', 'Singh', 30000, 'IT'),
    ('E1004', 'Pratik', 'Kumar', 18000, 'HR'),
    ('E1005', 'Harshali', 'Kumari', 23000, 'IT'),
    ('E1006', 'Vinay', 'Saini', 13000, 'Account'),
    ('E1007', 'Pratiksha', 'Kaushik', 30000, 'HR'),
    ('E1008', 'Shailja', 'Srivastava', 30000, 'Account'),
    ('E1009', 'Vaibhav', 'Pathak', 32000, 'IT'),
    ('E10010', 'Harnoor', 'Singh', 50000, 'IT'),
    ('E10011', 'Vedika', 'Kumari', 40000, 'Account'),

]

# column
column = ['emp_id', 'first_name', 'last_name', 'salary', 'department']
# creating spark session
spark = SparkSession.builder.appName("testing").getOrCreate()

# creating DataFrame
df = spark.createDataFrame(data, column)
df.show()

When the above code is executed, the output will look like this.

+------+----------+----------+------+----------+
|emp_id|first_name|last_name |salary|department|
+------+----------+----------+------+----------+
|E1001 |Vishvajit |Rao       |20000 |IT        |
|E1002 |Harsh     |Kumar     |15000 |HR        |
|E1003 |Pankaj    |Singh     |30000 |IT        |
|E1004 |Pratik    |Kumar     |18000 |HR        |
|E1005 |Harshali  |Kumari    |23000 |IT        |
|E1006 |Vinay     |Saini     |13000 |Account   |
|E1007 |Pratiksha |Kaushik   |30000 |HR        |
|E1008 |Shailja   |Srivastava|30000 |Account   |
|E1009 |Vaibhav   |Pathak    |32000 |IT        |
|E10010|Harnoor   |Singh     |50000 |IT        |
|E10011|Vedika    |Kumari    |40000 |Account   |
+------+----------+----------+------+----------+

Creating PySpark DataFrame using PySpark Row:

PySpark provides a Row class, which is defined inside the pyspark.sql package. The Row class is used to create a single record of a DataFrame. It takes column names and their values as parameters and returns a Row object.

The createDataFrame() method can also create a DataFrame from Row objects.

Let’s see how we can create a PySpark DataFrame with the help of the PySpark Row class.

from pyspark.sql import SparkSession
from pyspark.sql import Row


student_name = ['John', 'Rambo', 'Kaushik', 'Jatri']
student_age = [23, 25, 24, 30]

spark = SparkSession.builder.appName("testing").getOrCreate()

rows = []

# creating List of Row object
for i in zip(student_name, student_age):
    rows.append(Row(name=i[0], age=i[1]))

# creating dataframe
df = spark.createDataFrame(data=rows)
df.show()

The newly created data frame will look like this.

+-------+---+
|   name|age|
+-------+---+
|   John| 23|
|  Rambo| 25|
|Kaushik| 24|
|  Jatri| 30|
+-------+---+

Creating DataFrame using Pandas DataFrame:

As we know, the Spark session's createDataFrame() method accepts several kinds of input in order to create a new DataFrame, and a Pandas DataFrame is one of them.
Here, I have created a Pandas DataFrame and then created a PySpark DataFrame from it.

from pyspark.sql import SparkSession
from pandas import DataFrame

data = [
    ('E1001', 'Vishvajit', 'Rao', 20000, 'IT'),
    ('E1002', 'Harsh', 'Kumar', 15000, 'HR'),
    ('E1003', 'Pankaj', 'Singh', 30000, 'IT'),
    ('E1004', 'Pratik', 'Kumar', 18000, 'HR'),
    ('E1005', 'Harshali', 'Kumari', 23000, 'IT'),
    ('E1006', 'Vinay', 'Saini', 13000, 'Account'),
    ('E1007', 'Pratiksha', 'Kaushik', 30000, 'HR'),
    ('E1008', 'Shailja', 'Srivastava', 30000, 'Account'),
    ('E1009', 'Vaibhav', 'Pathak', 32000, 'IT'),
    ('E10010', 'Harnoor', 'Singh', 50000, 'IT'),
    ('E10011', 'Vedika', 'Kumari', 40000, 'Account'),

]

# column
column = ['emp_id', 'first_name', 'last_name', 'salary', 'department']

# creating spark session
spark = SparkSession.builder.appName("testing").getOrCreate()

# creating pandas dataframe
pandas_df = DataFrame(data, columns=column)

# creating pyspark dataframe using pandas df
df = spark.createDataFrame(pandas_df)
df.show()

The above code creates a PySpark DataFrame from a Pandas DataFrame. The output looks like this.

+------+----------+----------+------+----------+
|emp_id|first_name|last_name |salary|department|
+------+----------+----------+------+----------+
|E1001 |Vishvajit |Rao       |20000 |IT        |
|E1002 |Harsh     |Kumar     |15000 |HR        |
|E1003 |Pankaj    |Singh     |30000 |IT        |
|E1004 |Pratik    |Kumar     |18000 |HR        |
|E1005 |Harshali  |Kumari    |23000 |IT        |
|E1006 |Vinay     |Saini     |13000 |Account   |
|E1007 |Pratiksha |Kaushik   |30000 |HR        |
|E1008 |Shailja   |Srivastava|30000 |Account   |
|E1009 |Vaibhav   |Pathak    |32000 |IT        |
|E10010|Harnoor   |Singh     |50000 |IT        |
|E10011|Vedika    |Kumari    |40000 |Account   |
+------+----------+----------+------+----------+

Creating PySpark DataFrame from RDD:

RDD (Resilient Distributed Dataset) is also a data structure in PySpark, and we can create a PySpark DataFrame from an RDD.
There are two ways to do so: first, we can pass the RDD to the Spark session's createDataFrame() method, and second, we can use the RDD's toDF() method.

First way:

Here, I have created a PySpark DataFrame with the help of the createDataFrame() method. I have just passed the RDD along with the new column names to the createDataFrame() method.

from pprint import pprint
from pyspark.sql import SparkSession

data = [
    ('Rambo', 1100, 'BCA'),
    ('Shital', 1101, 'BTech'),
    ('Harrry', 1102, 'MCA'),
    ('Pankaj', 1103, 'MTech'),
    ('Jayanti', 1104, 'PHD'),


]
# columns name
column = ['student_name', 'roll_number', 'course']

# creating spark session
spark = SparkSession.builder.appName("testing").getOrCreate()

# creating spark context
sc = spark.sparkContext

# creating rdd
rdd = sc.parallelize(data)
pprint(rdd.collect())

# creating pyspark dataframe using rdd
df = spark.createDataFrame(rdd, column)
df.show()

Second way:

The second way of creating a DataFrame is to use the RDD's toDF() method. The toDF() method takes a schema or a list of column names as a parameter and returns a new DataFrame for the RDD.

rdd.toDF(column).show()

The newly created PySpark DataFrame will be like this.

+------------+-----------+------+
|student_name|roll_number|course|
+------------+-----------+------+
|       Rambo|       1100|   BCA|
|      Shital|       1101| BTech|
|      Harrry|       1102|   MCA|
|      Pankaj|       1103| MTech|
|     Jayanti|       1104|   PHD|
+------------+-----------+------+

Create Empty DataFrame in PySpark

In this section, I am going to create a PySpark DataFrame with no data or records. There are multiple ways to create an empty DataFrame in PySpark. You will see several of them.

Create empty DataFrame with emptyRDD():

The emptyRDD() method of the Spark context creates an empty RDD. After creating an empty RDD, we can pass it to the createDataFrame() method in order to create an empty DataFrame.

from pyspark.sql.types import StructType

# uses the spark session created earlier
columns = StructType([])
rdd = spark.sparkContext.emptyRDD()
df = spark.createDataFrame(rdd, columns)
df.show()

Output

++
||
++
++

Create an empty data frame with an empty list

We can also use the spark session createDataFrame() method along with an empty list in order to create an empty PySpark DataFrame.

from pyspark.sql.types import StructType

# uses the spark session created earlier
columns = StructType([])
df = spark.createDataFrame(data=[], schema=columns)
df.show()

Create empty DataFrame with schema

You can also create an empty PySpark DataFrame with a defined schema.

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, IntegerType, StructField, StructType


columns = StructType(
   [
    StructField("name", StringType(), nullable=True),
    StructField("roll_number", IntegerType(), nullable=True),
    StructField("course", StringType(), nullable=True),
    StructField("marks", IntegerType(), nullable=True),
   ]
    )

# creating spark session
spark = SparkSession.builder.appName("testing").getOrCreate()

df = spark.createDataFrame(data=[], schema=columns)
df.show()

Output

+----+-----------+------+-----+
|name|roll_number|course|marks|
+----+-----------+------+-----+
+----+-----------+------+-----+

So, this is how you can create an empty DataFrame in PySpark.

Creating DataFrame from Data Sources

So far we have seen how to create a PySpark DataFrame from a list of tuples, an RDD, and a Pandas DataFrame, but in real-life projects you will mostly load data from data sources like CSV files, JSON files, TXT files, a data warehouse, or a database management system like MySQL or SQL Server.

You don’t need to import any external libraries to read CSV, JSON, and text files because PySpark supports these file formats by default. Reading from a database such as MySQL goes through JDBC and additionally requires the matching JDBC driver.

Here, I will show you how to load data from CSV, JSON, and TXT files into a PySpark DataFrame with the help of examples.

Loading data from a CSV file:

I have created a CSV file named employee.csv with some records, as you can see below. Now, I am going to load this employee.csv file into a PySpark DataFrame.

name,designation,country
Hayati,Developer,India
Vishvajit,Developer,India
Vaibhav,Tester,India
Shital,HR,USA

PySpark provides a DataFrameReader class, which has various methods for loading data into PySpark, and the csv() method is one of them. This method takes the path of the CSV file as a parameter and loads the CSV data into a PySpark DataFrame.

from pyspark.sql import SparkSession

# creating spark session
spark = SparkSession.builder.appName("testing").getOrCreate()

df = spark.read.option('header', True).csv('employee.csv')
df.show()

After executing the above code, the contents of employee.csv will look like this in the PySpark DataFrame.

+---------+-----------+-------+
|     name|designation|country|
+---------+-----------+-------+
|   Hayati|  Developer|  India|
|Vishvajit|  Developer|  India|
|  Vaibhav|     Tester|  India|
|   Shital|         HR|    USA|
+---------+-----------+-------+

Creating PySpark DataFrame from JSON file:

As developers, we sometimes have to deal with JSON files, and PySpark can load JSON file data into a PySpark DataFrame as well.
I have created a JSON file that contains country names and their capitals, and now I want to load that data into a DataFrame.

{"capital": "New Delhi", "country_name": "India"}
{"capital": "Washington D.C.", "country_name": "United States"}
{"capital": "Ottawa", "country_name": "Canada"}
{"capital": "Beijing", "country_name": "China"}

To load JSON data into a PySpark DataFrame, I have used the json() method. The json() method takes the path of the JSON file as a parameter and returns a new DataFrame.

df = spark.read.json('employee.json')
df.show()

The JSON data will look like this in DataFrame format.

+---------------+-------------+
|        capital| country_name|
+---------------+-------------+
|      New Delhi|        India|
|Washington D.C.|United States|
|         Ottawa|       Canada|
|        Beijing|        China|
+---------------+-------------+

Creating DataFrame from TXT file:

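As with CSV and JSON files, we can also load data from a TXT file. If the text file contains comma-separated data, as employee.txt does here, we can load it with the csv() method. The text() method is also available, but it loads each line of the file into a single string column named value.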

df = spark.read.option('header', True).csv("employee.txt")
df.show()

Output

+---------+-----------+-------+
|     name|designation|country|
+---------+-----------+-------+
|   Hayati|  Developer|  India|
|Vishvajit|  Developer|  India|
|  Vaibhav|     Tester|  India|
|   Shital|         HR|    USA|
+---------+-----------+-------+

Printing Schema of The PySpark DataFrame

Sometimes we want to define the schema, that is, the column names and data types, of a PySpark DataFrame ourselves. We can do so by using StructType.

The code below shows how I have created a column schema for the DataFrame.

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, IntegerType, StructField, StructType

data = [
    ('Rambo', 1100, 'BCA'),
    ('Shital', 1101, 'BTech'),
    ('Harrry', 1102, 'MCA'),
    ('Pankaj', 1103, 'MTech'),
    ('Jayanti', 1104, 'PHD'),


]
columns = StructType(
   [
       StructField("name", StringType(), nullable=True),
       StructField("roll_number", IntegerType(), nullable=True),
       StructField("course", StringType(), nullable=True),
   ]
    )

# creating spark session
spark = SparkSession.builder.appName("testing").getOrCreate()

df = spark.createDataFrame(data, columns)
df.show()

Displaying the schema of the DataFrame:

Sometimes we want to check the schema of a PySpark DataFrame so that we can change it according to our requirements. In that case, we can use the DataFrame method printSchema() to display the schema of the existing DataFrame.
It displays the column names along with their data types, as you can see below.
I have displayed the schema of the DataFrame created above.

# printSchema() prints the schema itself and returns None, so no print() is needed
df.printSchema()

Output

root
 |-- name: string (nullable = true)
 |-- roll_number: integer (nullable = true)
 |-- course: string (nullable = true)

I hope you now understand what a PySpark DataFrame is, as well as the process of creating a new DataFrame in PySpark.


Summary

In this article, we have covered the PySpark DataFrame with the help of examples. If you are planning to learn PySpark or are a beginner PySpark developer, you should know the PySpark DataFrame well because you will use it frequently in real-life projects as a data engineer.

If you have knowledge of SQL queries, it will be easy to learn PySpark because a DataFrame in PySpark is like a table in a relational database, and you can perform almost all SQL operations on top of a PySpark DataFrame.

You can also create a PySpark DataFrame in multiple ways, such as from a Pandas DataFrame, an RDD, a list of tuples, PySpark Rows, or data sources (JSON, CSV, TXT, databases, etc.).

If you found this article helpful, please share and keep visiting for further PySpark tutorials.

Thanks for your valuable time…
