Menu Close

PySpark Normal Built-in Functions

PySpark normal built-in functions

In this PySpark tutorial, You will learn all about PySpark Normal Built-in functions with the help of the proper examples so that you can use all the useful PySpark standard built-in functions in your real-life spark application.

PySpark built-in functions are the pre-defined functions in PySpark that come with PySpark by default and all the built-in functions have been written inside pyspark.sal.functions package, That’s why you have to import built-in functions from pyspark.sql.functions module.

Why do we need to use PySpark’s Built-in functions?

Built-in functions in any programming language like Java, Python, R, C, C++, etc are the most useful functions that come with programming languages by default. Sames as PySpark also comes with numerous built-in functions which easy to work with PySpark.
For example, PySpark provides some string built-in functions which will only be applicable to string kind of value, Same as PySpark provides DateTime functions that will be applicable to date time kind of value.

We don’t have to write a custom function to perform operations on top of RDD and DataFrame, However, PySpark provides the facility to write our own custom functions called UDF which stands for User Defined Functions.
In a later tutorial, we will see how we can use UDF in PySpark, as of now we will only focus on normal PySpark Built-in functions.

Let’s create a simple PySpark DataFrame so that we can use built-in functions on top of that.

Creating PySpark DataFrame

I have written a simple PySpark code to generate a PySpark data frame. You can follow this code base to create DataFrame.

PySpark DataFrame is nothing but it is a distributed collection of data and it same as an SQL table-like structure that contains data in the form of rows and columns and we can perform almost all the functionality of SQL on top of PySpark DataFrame.

from pyspark.sql import SparkSession


# list of tuples
data = [
    ("1", "Vishvajit", "Rao", None, "BCA10"),
    ("2", "Harsh", "Goal", "2021-12-10", "BT101"),
    ("3", "Pankaj", None, "2020-06-29", "BCA10"),
    ("4", "Pranjal", "Rao", None, "BT101"),
    ("5", "Ritika", "Kumari", "2019-11-26", "MT101"),
    ("6", "Diyanshu", "Saini", "2023-01-01", "BCA10"),
    ("7", "Pratiksha", None, "2018-07-10", "MT101"),
    ("8", "Shailja", "Srivastava", None, "BT101"),
]

# columns
column_names = [
    "serial_number",
    "first_name",
    "last_name",
    "date_of_admission",
    "course_code"
]

# creating spark session
spark = SparkSession.builder.appName("www.programmingfunda.com").getOrCreate()

# creating DataFrame
df = spark.createDataFrame(data=data, schema=column_names)

# displaying
df.show(truncate=False)

Output

After executing the above code the output will be like this.

+-------------+----------+----------+-----------------+-----------+
|serial_number|first_name|last_name |date_of_admission|course_code|
+-------------+----------+----------+-----------------+-----------+
|1            |Vishvajit |Rao       |null             |BCA10      |
|2            |Harsh     |Goal      |2021-12-10       |BT101      |
|3            |Pankaj    |null      |2020-06-29       |BCA10      |
|4            |Pranjal   |Rao       |null             |BT101      |
|5            |Ritika    |Kumari    |2019-11-26       |MT101      |
|6            |Diyanshu  |Saini     |2023-01-01       |BCA10      |
|7            |Pratiksha |null      |2018-07-10       |MT101      |
|8            |Shailja   |Srivastava|null             |BT101      |
+-------------+----------+----------+-----------------+-----------+

Let me break down the above code to know what’s going on with this code.

  • First, I imported the SparkSession class from the pyspark.sql package.
  • Second, I have defined a list of tuples and each tuple represents some information about the student like serial_number, first_name, last_name, date_of_admission, and course_code.
  • Third, I have defined a Python list that contained column names (serial_number, first_name, last_name, date_of_admission, course_code).
  • Fourth, I have created a spark session object using SparkSession.builder.appName(“testing”).getOrCreate().
    • SparkSession represents the class name that is defined inside the PySpark SQL package.
    • builder is the attribute of the SparkSession class which has a Builder class to initiate the spark session.
    • appName() is a method that is used to provide the name of the spark application, In my case the application name is www.programmingfunda.com.
    • getOrCreate() is also a method that is used to get the existing spark session or create a new spark session if the spark session is not available.
  • Fifth, I have accessed the createDataFrame() method using spark in order to create a new PySpark DataFrame. I have passed the two arguments in createDataFrame(), First is a list of tuples, second is column names as a list. The createDataFrame() method returns a new DataFrame.
  • Sixth, I have used the DataFrame show() method that is used to show the DataFrame-like table structure.

Now it’s time to explore PySpark Normal Built-in functions with the help of an example.

PySpark Normal Built-in functions

Let’s explore all the PySpark Normal Built-in functions one by one with the help of examples.

PySpark Normal Built-in Functions

col(col)

The col() function takes a parameter that indicates the column name and PySpark DataFrame and it returns the object of PySpark Column class. To use the col() function we will have to import it from pyspark.sql.functions because it has been defined there.

For example, I am about to select only first_name and last_name from PySpark DataFrame using the col() function.

from pyspark.sql.functions import col

# selecting only first_name and last_name from PySpark DataFrame
df = df.select(col('first_name'), col('last_name'))

Output

+----------+----------+
|first_name|last_name |
+----------+----------+
|Vishvajit |Rao       |
|Harsh     |Goal      |
|Pankaj    |null      |
|Pranjal   |Rao       |
|Ritika    |Kumari    |
|Diyanshu  |Saini     |
|Pratiksha |null      |
|Shailja   |Srivastava|
+----------+----------+

column(col)

The column() functions work the same as the col() functions.

For instance, I am about to select the first_name, last_name, and course_code of students.

from pyspark.sql.functions import column

# selecting only first_name and last_name from PySpark DataFrame
df = df.select(column('first_name'), column('last_name'), column('course_code'))

# displaying
df.show(truncate=False)

Output

+----------+----------+-----------+
|first_name|last_name |course_code|
+----------+----------+-----------+
|Vishvajit |Rao       |BCA10      |
|Harsh     |Goal      |BT101      |
|Pankaj    |null      |BCA10      |
|Pranjal   |Rao       |BT101      |
|Ritika    |Kumari    |MT101      |
|Diyanshu  |Saini     |BCA10      |
|Pratiksha |null      |MT101      |
|Shailja   |Srivastava|BT101      |
+----------+----------+-----------+

lit(col)

The lit() function is used to create a column with a literal value. We can pass the constant value as well as the column name inside the lit() function.
Let’s see both of them one by one.

Example: Using lit() function with the literal value

In this example, I have added a column named country which contained India as a value because in my case all the students belong to the country India.

from pyspark.sql.functions import lit

# adding a column country along with India value
df = df.withColumn('country', lit('India'))

# displaying
df.show(truncate=False)

Output

PySpark Normal Built-in Functions

broadcast(df)

The broadcast() function is one of the most useful functions in PySpark in order to perform broadcast joins. Broadcast join is always used when we want to join a large PySpark DataFrame with a small DataFrame. The broadcast() function takes a small data frame as a parameter and broadcasts that data frame to all executors and each executor keeps that data frame in the memory for further Join operations.

Broadcast join is a vast topic in PySpark, You can below links to get a detailed tutorial about broadcast join.

Here, I am giving just ideas for using the broadcast() function.

As we know, In the above created main PySpark DataFrame we have a column course_code which has contained course code and now we want to get the course name of each course code.
To get the course name I created a small PySpark DataFrame which contained only two values course code and their name after that I used the broadest join to broadcast that small DataFrame to all the executors in order to perform broadcast join as you can see below code.

The broadcast() function returns a DataFrame which is ready to perform broadcast join.

Example

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast


# list of tuples
data = [
    ("1", "Vishvajit", "Rao", None, "BCA10"),
    ("2", "Harsh", "Goal", "2021-12-10", "BT101"),
    ("3", "Pankaj", None, "2020-06-29", "BCA10"),
    ("4", "Pranjal", "Rao", None, "BT101"),
    ("5", "Ritika", "Kumari", "2019-11-26", "MT101"),
    ("6", "Diyanshu", "Saini", "2023-01-01", "BCA10"),
    ("7", "Pratiksha", None, "2018-07-10", "MT101"),
    ("8", "Shailja", "Srivastava", None, "BT101"),
]

# column
column_names = [
    "serial_number",
    "first_name",
    "last_name",
    "date_of_admission",
    "course_code",
]

# creating spark session
spark = SparkSession.builder.appName("www.programmingfunda.com").getOrCreate()

# creating large DataFrame
large_dataframe = spark.createDataFrame(data=data, schema=column_names)


# creating small PySpark DataFrame
small_dataframe = spark.createDataFrame(
    data=[("BCA10", "BCA"), ("BT101", "BTech"), ("MT101", "MTech")],
    schema=["code", "course_name"],
)

# creating broadcast variable
broadcast_variable = broadcast(small_dataframe)

# creating final dataframe
final_dataframe = large_dataframe.join(
    broadcast_variable, large_dataframe.course_code == broadcast_variable.code
)

# displaying final DataFrame
final_dataframe.select(
    "serial_number",
    "first_name",
    "last_name",
    "date_of_admission",
    "course_code",
    "course_name",
).show()

After executing the above code the final Output will be like this.

PySpark Normal Built-in Functions

coalesce(*col)

The coalesce() function returns the first column that is not null. It takes more than a column name or literal value as a parameter.
Let’s take an example to understand PySpark coalesce() function.

Example

In this example, I have replaced the None values in the date_of_admission column with the current date.

from pyspark.sql.functions import coalesce, current_date

# using coalesce() function
df = df.withColumn("date_of_admission", coalesce(df.date_of_admission, current_date()))

# displaying DataFrame
df.show()

Output

+-------------+----------+----------+-----------------+-----------+
|serial_number|first_name| last_name|date_of_admission|course_code|
+-------------+----------+----------+-----------------+-----------+
|            1| Vishvajit|       Rao|       2023-05-13|      BCA10|
|            2|     Harsh|      Goal|       2021-12-10|      BT101|
|            3|    Pankaj|      null|       2020-06-29|      BCA10|
|            4|   Pranjal|       Rao|       2023-05-13|      BT101|
|            5|    Ritika|    Kumari|       2019-11-26|      MT101|
|            6|  Diyanshu|     Saini|       2023-01-01|      BCA10|
|            7| Pratiksha|      null|       2018-07-10|      MT101|
|            8|   Shailja|Srivastava|       2023-05-13|      BT101|
+-------------+----------+----------+-----------------+-----------+

isnan(col)

The function isnan() is used to return True if the passed column name contained a nan value otherwise it returns False.
I have replaced some values in first_name with nan value.

Example

from pyspark.sql.functions import isnan 

# using isnan() function
df = df.select(isnan(df.first_name).alias('isnan'))

# displaying DataFrame
df.show()

isnull(col)

The function isnull() is used to return True if the passed column is Null otherwise it returns False.

Example

from pyspark.sql.functions import isnull

# using isnull() function
df = df.select(isnull(df.last_name).alias('isnull'))

# displaying DataFrame
df.show()

Output

+------+
|isnull|
+------+
| false|
| false|
|  true|
| false|
| false|
| false|
|  true|
| false|
+------+

rand(seed)

This function is used to generate a random column with independent and identically distributed. it takes the seed value as a parameter for the random generator.

Example

# using rand() function
df = df.withColumn('rand', rand())

# displaying DataFrame
df.show(4) 

Output

PySpark Normal Built-in Functions
Note:- 4 in show() indicates the number of rows we want to get from PySpark DataFrame.

randn(seed)

This function same as the rand() function. The randn() generates the random values with standard normal distribution.

Example

from pyspark.sql.functions import randn

# using randn() function
df = df.withColumn('randn', randn())

# displaying DataFrame
df.show(4)

Output

PySpark Normal Built-in Functions

when(condition, value)

The when(condition, value) function is one of the useful functions in PySpark DataFrame, If the condition evaluates True it returns the value of the value parameter. Remember, The return type of the when() function, is always PySpark Column instance. If the condition evaluates False the value passed in otherwise() function will return.

For example, I want to add a column course_name which will contain the course name based on the course code. The ‘BCA10‘ means ‘BCA‘, ‘BT101‘ means ‘BTech‘, and ‘MT101‘ means ‘MTech‘.

Example

# using when() function
df = df.select(
    "first_name",
    "last_name",
    "course_code",
    when(col("course_code").startswith("BCA"), "BCA")
    .when(col("course_code").startswith("BT"), "BTech")
    .when(col("course_code").startswith("MT"), "MTech")
    .alias("course_name"),
)

# displaying DataFrame
df.show()

Output

+----------+----------+-----------+-----------+
|first_name| last_name|course_code|course_name|
+----------+----------+-----------+-----------+
| Vishvajit|       Rao|      BCA10|        BCA|
|     Harsh|      Goal|      BT101|      BTech|
|    Pankaj|      null|      BCA10|        BCA|
|   Pranjal|       Rao|      BT101|      BTech|
|    Ritika|    Kumari|      MT101|      MTech|
|  Diyanshu|     Saini|      BCA10|        BCA|
| Pratiksha|      null|      MT101|      MTech|
|   Shailja|Srivastava|      BT101|      BTech|
+----------+----------+-----------+-----------+

greatest(*col)

The greatest() function is used to return the greatest value from a list of passed column names. skipping null values. This function takes at least two arguments. It returns Null if all the parameter is Null.

Example

from pyspark.sql.functions import greatest
# using greatest() function
df = df.select(greatest('serial_number', 'first_name').alias('greatest'))

# displaying DataFrame
df.show(4)

Output

+---------+
| greatest|
+---------+
|Vishvajit|
|    Harsh|
|   Pankaj|
|  Pranjal|
+---------+
only showing top 4 rows

least(*col)

The least() function is used to return the least value from a passed list of column names. It skips the null value. It takes at least two parameters and it returns a null value if all the passed column names are null.

Example

from pyspark.sql.functions least

# using least() function
df = df.select(least('serial_number', 'first_name').alias('least'))

# displaying DataFrame
df.show(4)

Output

+-----+
|least|
+-----+
|    1|
|    2|
|    3|
|    4|
+-----+
only showing top 4 rows

PySpark Useful Tutorials


👉 PySpark Normal Functions Documentation:- Click Here

Summary

So, In this article, we have seen almost all the PySpark normal built-in functions along with examples. These PySpark built-in functions play the most important role in real-life PySpark applications.
I hope the way of explanation of this article was easy and straightforward. You can use any PySpark normal built-in function according to your requirement for example, you can use the when() function to apply any condition on PySpark DataFrame, the col() function to return the Column instance, coalesce() function to return first not null values, etc.

If you found this article helpful, Please share and keep visiting for further PySpark tutorials.

Have a nice day……

PySpark SQL DateTime Functions with Examples
How to Explode Multiple Columns in PySpark DataFrame

Related Posts