PySpark RDD Actions with Examples

As we know, Apache Spark is a powerful open-source distributed computing system that provides fast, general-purpose data processing. PySpark, the Python API for Spark, lets developers use Spark's features from the Python programming language.

In today’s article, we will look at PySpark RDD actions and the most useful action methods, with the help of examples.

Prerequisites:

Before starting this article, you should be familiar with the PySpark RDD (Resilient Distributed Dataset). You can check our PySpark RDD tutorial for more information.

PySpark RDD Actions

PySpark RDD actions trigger the execution of computations on the RDDs and return values to the driver program or write data to an external storage system.

Creating a SparkContext

SparkContext is the entry point to any Spark functionality in an application. It represents the connection to a Spark cluster and can be used to create RDDs. To create an RDD, we must have a SparkContext.

Let’s create a SparkContext.

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("ProgrammingFunda.com")
    .master("local[*]")
    .getOrCreate()
)

sc = spark.sparkContext

'sc' is the SparkContext used to create PySpark RDDs throughout this article.

Here are some of the essential RDD actions in PySpark:

count(), first(), collect(), countByKey(), countByValue(), top(), take(), min(), max(), mean(), sum(), reduce(), treeReduce(), fold(), collectAsMap(), foreach(), foreachPartition(), aggregate(), treeAggregate(), takeSample(), countApprox(), countApproxDistinct(), takeOrdered()

Here I have created two datasets, one with key-value pairs and one without, and both will be used throughout this article.

key_value_dataset = [('Math', 40), ('Physics', 71), ('Math', 60), ('English', 80), ('Chemistry', 75), ('English', 65), ('Physics', 62), ('Chemistry', 55), ('Social Science', 75), ('Math', 73)]


without_key_value = [1, 2, 3, 4, 5, 6, 7]

Let’s understand all the above actions one by one with the help of examples.

PySpark RDD count()

The count() action returns the total number of items in the RDD.

key_value_dataset = [
    ("Math", 40),
    ("Physics", 71),
    ("Math", 60),
    ("English", 80),
    ("Chemistry", 75),
    ("English", 65),
    ("Physics", 62),
    ("Chemistry", 55),
    ("Social Science", 75),
    ("Math", 73),
]

rdd = sc.parallelize(key_value_dataset, 3)
print(rdd.count())

The output will be 10

PySpark RDD first()

The first() action method always returns the first item of the RDD.

key_value_dataset = [
    ("Math", 40),
    ("Physics", 71),
    ("Math", 60),
    ("English", 80),
    ("Chemistry", 75),
    ("English", 65),
    ("Physics", 62),
    ("Chemistry", 55),
    ("Social Science", 75),
    ("Math", 73),
]


rdd = sc.parallelize(key_value_dataset, 3)
print(rdd.first())

The output will be:- ('Math', 40)

PySpark RDD collect()

The collect() action is used to retrieve all the elements from the RDD and return them to the driver program as a list.

key_value_dataset = [
    ("Math", 40),
    ("Physics", 71),
    ("Math", 60),
    ("English", 80),
    ("Chemistry", 75),
    ("English", 65),
    ("Physics", 62),
    ("Chemistry", 55),
    ("Social Science", 75),
    ("Math", 73),
]

rdd = sc.parallelize(key_value_dataset, 3)
print(rdd.collect())

Output:

[('Math', 40), ('Physics', 71), ('Math', 60), ('English', 80), ('Chemistry', 75), ('English', 65), ('Physics', 62), ('Chemistry', 55), ('Social Science', 75), ('Math', 73)]

PySpark RDD countByValue()

The countByValue() function returns the number of times each item appears in the RDD, as a dictionary.

key_value_dataset = [
    ("Math", 40),
    ("Physics", 71),
    ("Math", 60),
    ("English", 80),
    ("Chemistry", 75),
    ("English", 65),
    ("Physics", 62),
    ("Chemistry", 55),
    ("Social Science", 75),
    ("Math", 73),
]

rdd = sc.parallelize(key_value_dataset, 3)
print(rdd.countByValue())

Output

defaultdict(<class 'int'>, {('Math', 40): 1, ('Physics', 71): 1, ('Math', 60): 1, ('English', 80): 1, ('Chemistry', 75): 1, ('English', 65): 1, ('Physics', 62): 1, ('Chemistry', 55): 1, ('Social Science', 75): 1, ('Math', 73): 1})
Note:- countByValue() works with both paired and non-paired RDDs.

PySpark RDD countByKey()

This action method is used to count the number of elements per key in the RDD.

key_value_dataset = [
    ("Math", 40),
    ("Physics", 71),
    ("Math", 60),
    ("English", 80),
    ("Chemistry", 75),
    ("English", 65),
    ("Physics", 62),
    ("Chemistry", 55),
    ("Social Science", 75),
    ("Math", 73),
]

rdd = sc.parallelize(key_value_dataset, 3)
print(rdd.countByKey())

Output

defaultdict(<class 'int'>, {'Math': 3, 'Physics': 2, 'English': 2, 'Chemistry': 2, 'Social Science': 1})
Note:- countByKey() works only with paired RDDs.

PySpark RDD top()

The top() function returns the largest n items from the RDD to the driver program, sorted in descending order. For tuples, the comparison is done element by element, so the pairs below are ordered by subject name first.

In this example, I have returned the top 5 elements from the RDD.

key_value_dataset = [
    ("Math", 40),
    ("Physics", 71),
    ("Math", 60),
    ("English", 80),
    ("Chemistry", 75),
    ("English", 65),
    ("Physics", 62),
    ("Chemistry", 55),
    ("Social Science", 75),
    ("Math", 73),
]

rdd = sc.parallelize(key_value_dataset, 3)
print(rdd.top(5))

Output

[('Social Science', 75), ('Physics', 71), ('Physics', 62), ('Math', 73), ('Math', 60)]

PySpark RDD take()

The take() function is used to return the first n items from the RDD.

key_value_dataset = [
    ("Math", 40),
    ("Physics", 71),
    ("Math", 60),
    ("English", 80),
    ("Chemistry", 75),
    ("English", 65),
    ("Physics", 62),
    ("Chemistry", 55),
    ("Social Science", 75),
    ("Math", 73),
]

rdd = sc.parallelize(key_value_dataset, 3)
print(rdd.take(5))

Output

[('Math', 40), ('Physics', 71), ('Math', 60), ('English', 80), ('Chemistry', 75)]

PySpark RDD min()

The min() function returns the minimum value from the RDD.

without_key_value = [1, 2, 3, 4, 5, 6, 7]
rdd = sc.parallelize(without_key_value, 3)
print(rdd.min())

The output will be 1

PySpark RDD mean()

The mean() function returns the mean value of the RDD’s elements.

without_key_value = [1, 2, 3, 4, 5, 6, 7]
rdd = sc.parallelize(without_key_value, 3)
print(rdd.mean())

The output will be 4.0
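The max() and sum() actions from the list above work the same way on a numeric RDD; here is a minimal sketch using the same data.

without_key_value = [1, 2, 3, 4, 5, 6, 7]
rdd = sc.parallelize(without_key_value, 3)
print(rdd.max())  # 7
print(rdd.sum())  # 28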

PySpark RDD reduce()

The reduce() function returns a single value after reducing the items of the RDD with a binary operator.

from operator import add

without_key_value = [1, 2, 3, 4, 5, 6, 7]
rdd = sc.parallelize(without_key_value, 3)
print(rdd.reduce(add))

The output will be 28

PySpark RDD treeReduce()

The treeReduce() function also reduces the items of the RDD with a binary operator, but it combines the partition results in a multi-level tree pattern, which reduces the load on the driver when there are many partitions.

from operator import add
without_key_value = [1, 2, 3, 4, 5, 6, 7]
rdd = sc.parallelize(without_key_value, 3)
print(rdd.treeReduce(add))

The output will be 28
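treeReduce() also accepts an optional depth parameter that controls how many levels the reduction tree uses (the default is 2); a minimal sketch:

from operator import add

without_key_value = [1, 2, 3, 4, 5, 6, 7]
rdd = sc.parallelize(without_key_value, 3)
# depth=2 is the default; a deeper tree can help when there are many partitions.
print(rdd.treeReduce(add, depth=2))  # 28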

PySpark RDD fold()

The fold() action works the same as reduce(), except that it takes an initial "zero value" as its first argument, and that value is used to initialize each partition. The result of fold() matches reduce() unless a non-zero initial value is passed. For example, here I have passed 100, which is added to the sum of all the items of the RDD.

without_key_value = [1, 2, 3, 4, 5, 6, 7]
rdd = sc.parallelize(without_key_value, 3)
print(rdd.fold(100, lambda x, y: x + y))

The output will be 428
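Here is a quick sketch of where 428 comes from: the zero value is applied once at the start of each partition and once more when the partition results are merged, so with 3 partitions the result is 28 + 100 × (3 + 1) = 428.

print(rdd.getNumPartitions())           # 3
print(rdd.fold(0, lambda x, y: x + y))  # 28, the same as reduce() when the zero value is 0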

PySpark RDD collectAsMap()

This method returns the RDD’s key-value pairs as a dictionary. Remember, if duplicate keys exist in the RDD, only one value is kept for each key (which one is not guaranteed), as you can see in the example below.

I have an RDD with duplicate keys, but collectAsMap() returned only a single key-value pair for each key.

key_value_dataset = [
    ("Math", 40),
    ("Physics", 71),
    ("Math", 60),
    ("English", 80),
    ("Chemistry", 75),
    ("English", 65),
    ("Physics", 62),
    ("Chemistry", 55),
    ("Social Science", 75),
    ("Math", 73),
]

rdd = sc.parallelize(key_value_dataset, 3)
print(rdd.collectAsMap())

Output

{'Math': 73, 'Physics': 62, 'English': 65, 'Chemistry': 55, 'Social Science': 75}
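If you need every value per key instead of just one, a common pattern is to group the values before collecting; a minimal sketch (the order of values inside each list may vary):

grouped = rdd.groupByKey().mapValues(list).collectAsMap()
print(grouped)
# e.g. {'Math': [40, 60, 73], 'Physics': [71, 62], 'English': [80, 65], 'Chemistry': [75, 55], 'Social Science': [75]}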

PySpark RDD foreach()

The foreach() action applies a function to each item of the RDD. It takes a function as a parameter and is typically used for side-effecting operations such as writing output to a file or a database, or printing output to the console.

key_value_dataset = [
    ("Math", 40),
    ("Physics", 71),
    ("Math", 60),
    ("English", 80),
    ("Chemistry", 75),
    ("English", 65),
    ("Physics", 62),
    ("Chemistry", 55),
    ("Social Science", 75),
    ("Math", 73),
]


def function_foreach(x):
    # perform any side-effecting operation on x here; this example just prints it
    print("Element is:- ", x)


rdd = sc.parallelize(key_value_dataset, 3)
rdd.foreach(function_foreach)

Output

Element is:-  ('Math', 40)
Element is:-  ('Physics', 71)
Element is:-  ('Math', 60)
Element is:-  ('Physics', 62)
Element is:-  ('Chemistry', 55)
Element is:-  ('Social Science', 75)
Element is:-  ('Math', 73)
Element is:-  ('English', 80)
Element is:-  ('Chemistry', 75)
Element is:-  ('English', 65)

PySpark RDD foreachPartition()

This method is used to apply a function to each partition of the RDD. Like foreach(), the foreachPartition() action is used to perform side-effecting operations, but the function runs once per partition instead of once per element.

def function_foreach(x):
    # x is an iterator over the elements of one partition
    print("Element is:- ", list(x))


without_key_value = [1, 2, 3, 4, 5, 6, 7]
rdd = sc.parallelize(without_key_value, 3)
rdd.foreachPartition(function_foreach)

Output

Element is:-  [1, 2]
Element is:-  [5, 6, 7]
Element is:-  [3, 4]
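A typical use case is setting up an expensive resource, such as a database connection, once per partition instead of once per element. The sketch below uses hypothetical open_connection() and conn.save() placeholders, not a real API:

def save_partition(items):
    conn = open_connection()  # hypothetical: one connection per partition
    for item in items:
        conn.save(item)       # hypothetical: write each element
    conn.close()

# rdd.foreachPartition(save_partition)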

PySpark RDD aggregate()

The aggregate() action aggregates the elements of each partition of the RDD, and then the partition results themselves, using the given combine functions and a neutral "zero value".

without_key_value = [1, 2, 3, 4, 5, 6, 7]
rdd = sc.parallelize(without_key_value, 3)
print(rdd.aggregate(100, lambda x, y: x + y, lambda x, y: x + y))

The output will be 428: the zero value of 100 is applied once in each of the 3 partitions and once more when the partition results are merged, so the result is 28 + 100 × 4 = 428.
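The example above uses the same function for both steps, which hides the difference between them. The sketch below shows why aggregate() takes two functions: the first (seqOp) folds elements into a (sum, count) pair inside each partition, and the second (combOp) merges those pairs across partitions, so the result type can differ from the element type.

without_key_value = [1, 2, 3, 4, 5, 6, 7]
rdd = sc.parallelize(without_key_value, 3)
sum_count = rdd.aggregate(
    (0, 0),
    lambda acc, value: (acc[0] + value, acc[1] + 1),  # seqOp: runs inside each partition
    lambda a, b: (a[0] + b[0], a[1] + b[1]),          # combOp: merges partition results
)
print(sum_count)                    # (28, 7)
print(sum_count[0] / sum_count[1])  # 4.0, the mean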

Conclusion

So, we have successfully covered the PySpark RDD actions with examples. Remember that actions in PySpark always return a value to the driver program; they never return an RDD as output.

You can use any of these RDD actions as per your need. If you found this article helpful, please share it and keep visiting for further PySpark tutorials.

Thanks for your valuable time…
