Dabbling with Spark Essentials Again

A Beginner's Guide to Using Apache Spark 2

Apache Spark stands out as a pivotal tool for navigating the complexities of Big Data analysis. This article embarks on a comprehensive journey through the core of Spark, from unraveling the intricacies of Big Data and its foundational concepts to mastering sophisticated data processing techniques with PySpark. Whether you're a beginner eager to dip your toes into the vast ocean of data analysis or a seasoned analyst aiming to refine your skills with Spark's advanced functionalities, this guide offers a structured pathway to enhance your proficiency in transforming raw data into insightful, actionable knowledge. I delve deep into essential concepts such as lazy evaluation, the explain plan, and the manipulation of Spark RDDs.

SparkContext

SparkContext is the entry point for programming Spark with the RDD interface. It is responsible for managing and distributing data across the Spark cluster and creating RDDs. It can load data from various sources like HDFS (Hadoop Distributed File System), local file systems, and external databases. Through SparkContext, you can set various Spark properties and configurations that control aspects like memory usage, core utilization, and the behavior of Spark's scheduler.

from pyspark import SparkContext
sc = SparkContext(master="local", appName="My App")
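As a minimal sketch of setting such properties (the specific values are illustrative, and only one SparkContext can be active per application), the configuration can also be supplied through a SparkConf object:

from pyspark import SparkConf, SparkContext

# Build the configuration first, then pass it to the context
conf = (SparkConf()
        .setMaster("local[*]")
        .setAppName("My App")
        .set("spark.executor.memory", "2g"))  # example property; tune for your cluster
sc = SparkContext(conf=conf)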

Info

Retrieves the Spark version, the Python version, and the master URL, respectively.

print(sc.version, sc.pythonVer, sc.master)

SparkContext vs SparkSession

While SparkContext is used for accessing Spark features through RDDs, SparkSession provides a single point of entry for DataFrames, SQL, streaming, and Hive functionality, unifying what previously required separate objects such as SQLContext, HiveContext, and StreamingContext. SparkSession was introduced in Spark 2.0 as the new entry point for Spark applications.

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("My App").getOrCreate()

Resilient Distributed Datasets - Under the Hood

In the last article, I briefly explained the lineage of transformations that gives Resilient Distributed Datasets their fault-tolerant nature. Let's look at that in detail now:

Lazy Evaluation

When you apply a transformation like sort to your data in Spark, it's crucial to understand that Spark operates under a principle known as Lazy Evaluation. This means that when you call a transformation, Spark doesn't immediately manipulate your data. Instead, it queues up the series of transformations, building a plan for how it will eventually execute these operations across the cluster when necessary.

Lazy evaluation in Spark ensures that transformations like sort don't trigger any immediate action on the data. The transformation is registered as a part of the execution plan, but Spark waits to execute any operations until an action (such as collect, count, or saveAsTextFile) is called.

This delay in execution allows Spark to map out a comprehensive strategy for distributing the data processing workload, enhancing the system's ability to manage resources and execute tasks in an optimized manner.
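A minimal sketch of this behaviour (the variable names are illustrative): the transformations below return immediately without touching the data, and nothing is computed until the action at the end.

numbers = sc.parallelize(range(1, 1_000_001))

# Transformations: recorded in the plan, but no work is done yet
squared = numbers.map(lambda x: x * x)
evens = squared.filter(lambda x: x % 2 == 0)

# Action: only now does Spark execute the queued transformations
print(evens.count())  # 500000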

Explain Plan

To inspect the execution plan that Spark has prepared, including the sequence of transformations and how they will be applied, you can use the explain method on any DataFrame object. This method reveals the DataFrame's lineage, illustrating how Spark intends to execute the given query. This insight into the execution plan is particularly valuable for optimizing performance and understanding the sequence of operations Spark will undertake to process your data.
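A small sketch, assuming the SparkSession created earlier and an illustrative DataFrame:

df = spark.createDataFrame([(1, "a"), (2, "b"), (3, "c")], ["id", "letter"])
sorted_df = df.sort("id")

# Prints the physical plan Spark will use once an action is eventually called
sorted_df.explain()

# extended=True also shows the parsed, analyzed, and optimized logical plans
sorted_df.explain(extended=True)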

Lineage Graph

Now the question arises: what actually are these "lineages"?

RDD lineage is essentially a record of what operations were performed on the data from the time it was loaded into memory up to the present. It's a directed acyclic graph (DAG) of all the parent RDDs of an RDD. This graph consists of edges and nodes, where the nodes represent the RDDs and the edges represent the transformations that lead from one RDD to the next.
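For RDDs, this lineage can be printed with toDebugString(); a short sketch with illustrative transformations:

rdd = sc.parallelize(range(10))
doubled = rdd.map(lambda x: x * 2)
evens = doubled.filter(lambda x: x % 4 == 0)

# Shows the chain of parent RDDs (the DAG) that Spark would replay to recompute lost partitions
print(evens.toDebugString().decode())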

Creating and Using RDDs

Parallelized collection

Distributes a local Python collection to form an RDD, which is useful for turning existing in-memory data into a distributed dataset.

parallelized_data = sc.parallelize([1, 2, 3, 4, 5])

From external tables

Creates a distributed collection of unstructured data from a text file, with each line of the file becoming an element of the RDD.

rdd_from_text_file = sc.textFile("path/to/textfile")

Partitioning

Partitioning in RDDs refers to the division of the dataset into smaller, logical portions that can be distributed across multiple nodes in a Spark cluster. This concept is fundamental to distributed computing in Spark, enabling parallel processing, which significantly enhances performance for large datasets. More on this topic will be covered in an upcoming article.

rdd_with_partitions = rdd_from_text_file.repartition(10)
print(rdd_with_partitions.getNumPartitions())

repartition() alters the number of partitions in an RDD (here, to 10) to distribute data more evenly across the cluster, and getNumPartitions() confirms the new count.

Converting an RDD to a DataFrame

The conversion of an RDD to a DataFrame in PySpark can be accomplished through multiple methods, involving the definition of the schema in different ways.

  • StructType
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("city", StringType(), True)
])
rdd = sc.parallelize([("Ankit", 30, "Bangalore"), ("Anitha", 25, "Chennai"), ("Aditya", 35, "Mumbai")])
df = spark.createDataFrame(rdd, schema)
  • pyspark.sql.Row
from pyspark.sql import Row
rdd = sc.parallelize([Row(name="Ankit", age=30, city="Bangalore"), Row(name="Anitha", age=25, city="Chennai"), Row(name="Aditya", age=35, city="Mumbai")])
df = spark.createDataFrame(rdd)

Converting CSV to DataFrame

df_from_csv = spark.read.csv("path/to/file.csv", header=True, inferSchema=True)

Directly reads a CSV file into a DataFrame, treating the first row as the header and inferring the schema from the data.

Inspecting Data in DataFrame

df_from_csv.printSchema()
df_from_csv.describe().show()

Displays the schema of the DataFrame and shows summary statistics.

DataFrame Visualization

pandas_df = df_from_csv.toPandas()

Converts a Spark DataFrame to a pandas DataFrame for visualization. Note that this collects all the data to the driver, so it is only practical for datasets that fit in memory.

SQL queries

df_from_csv.createOrReplaceTempView("people")
spark.sql("SELECT * FROM people WHERE age > 30").show()

Executes SQL queries directly on DataFrames that have been registered as a temporary view.

Types of RDDs

RDDs are categorized based on their characteristics, which can include their partitioning strategy or the data types they contain. Understanding these categories is essential for optimizing the performance of Spark applications, as each type has its implications for how data is processed and distributed across the cluster. Here's a deeper look into these categorizations:

Based on Partitioning

ParallelCollection RDDs

These RDDs are created from existing Scala or Python collections. When you parallelize a collection to create an RDD using the parallelize() method, Spark distributes the elements of the collection across the cluster. The distribution is determined by the number of partitions you specify or by the default parallelism level of the cluster. ParallelCollection RDDs are useful for parallelizing small datasets or for testing and prototyping with Spark.
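A short sketch showing how the number of partitions can be controlled at creation time through the numSlices argument:

# Distribute a local collection across 4 partitions
parallel_rdd = sc.parallelize(range(100), numSlices=4)
print(parallel_rdd.getNumPartitions())  # 4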

MapPartitions RDDs

MapPartitions RDDs result from transformations that apply a function to every partition of the parent RDD, rather than to individual elements. Transformations like mapPartitions() and mapPartitionsWithIndex() create MapPartitions RDDs. This approach can be more efficient than applying a function to each element, especially when initializing an expensive resource (like opening a database connection) that is better done once per partition rather than once per element.
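A sketch of this per-partition pattern; the commented-out connection calls are hypothetical stand-ins for any expensive, once-per-partition setup:

def add_one_per_partition(partition):
    # connection = open_connection()   # hypothetical: set up once for the whole partition
    for element in partition:
        yield element + 1
    # connection.close()

rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 3)
result = rdd.mapPartitions(add_one_per_partition).collect()
# Result: [2, 3, 4, 5, 6, 7]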

Shuffled RDDs

Shuffled RDDs are the result of transformations that cause data to be redistributed across the cluster, such as repartition() or by key-based transformations like reduceByKey(), groupBy(), and join(). These operations involve shuffling data across the partitions to regroup it according to the transformation's requirements. Shuffles are expensive operations in terms of network and disk I/O, so understanding when your operations result in Shuffled RDDs can help in optimizing your Spark jobs.
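As a small illustration, key-based transformations such as reduceByKey() accept an optional numPartitions argument that sets how many partitions the resulting Shuffled RDD has:

pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3), ("b", 4)])

# The shuffle regroups records by key into the requested number of partitions
shuffled = pairs.reduceByKey(lambda x, y: x + y, numPartitions=4)
print(shuffled.getNumPartitions())  # 4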

Based on Data Types

RDDs can also be distinguished by the types of data they contain. This classification is more about the content of the RDD rather than its structural characteristics.

Generic RDDs (like Python RDD)

These are RDDs that contain any type of object, such as integers, strings, or custom objects. They are the most flexible type of RDD, as Spark does not impose any constraints on the data type. However, operations on these RDDs may not be as optimized as those on more specific types of RDDs.

Key-Value Pair RDDs (Pair RDDs)

Pair RDDs contain tuples as elements, where each tuple consists of a key and a value. This structure is especially useful for operations that need to process data based on keys, such as aggregating or grouping data by key. I will also go into detail about this specific type of RDD later.

Understanding the different types of RDDs and their characteristics is crucial for writing efficient Spark applications. By choosing the appropriate type of RDD and transformations, developers can optimize data distribution and processing strategies, improving the performance and scalability of their Spark applications.

Transformations on RDDs

Transformations create a new RDD from an existing one. Here are some key transformations:

map()

Applies a function to each element of the RDD, returning a new RDD with the results.

rdd = sc.parallelize([1, 2, 3, 4])
mapped_rdd = rdd.map(lambda x: x * 2)
# Resulting RDD: [2, 4, 6, 8]

filter()

Returns a new RDD containing only the elements that satisfy a condition.

filtered_rdd = rdd.filter(lambda x: x % 2 == 0)
# Resulting RDD: [2, 4]

This operation does not modify the original RDD but creates a new one with the filtered results.

flatMap()

Similar to map, but each input item can be mapped to 0 or more output items.

sentences_rdd = sc.parallelize(["hello world", "how are you"])
flattened_rdd = sentences_rdd.flatMap(lambda x: x.split(" "))
# Resulting RDD: ["hello", "world", "how", "are", "you"]

Actions on RDDs

Actions trigger the execution of the transformations and return results.

collect()

Gathers the entire RDD's data to the driver program. Suitable only for datasets small enough to fit in driver memory.

all_elements = mapped_rdd.collect()
# Result: [2, 4, 6, 8]

take(N)

Returns an array with the first N elements of the RDD.

first_two = mapped_rdd.take(2)
# Result: [2, 4]

first()

Retrieves the first element of the RDD.

first_element = rdd.first()
# Result: 1

count()

Counts the number of elements in the RDD.

total = rdd.count()
# Result: 4

reduce()

Aggregates the elements of the RDD using a function, bringing the result to the driver.

total_sum = rdd.reduce(lambda x, y: x + y)
# Result: 10
sum_mapped = mapped_rdd.reduce(lambda x, y: x + y)
# Result: 20
product = rdd.reduce(lambda x, y: x * y)
# Result: 24

saveAsTextFile()

Saves the RDD to the filesystem as a text file.

rdd.saveAsTextFile("path/to/output")

coalesce()

Reduces the number of partitions in the RDD without a full shuffle. This is particularly useful before actions that benefit from fewer partitions, such as reducing the number of files generated by saveAsTextFile() or preparing data for export to a single file.

coalesced_rdd = rdd.coalesce(1)

# Result: an RDD with the same contents but only 1 partition,
# ready for efficient saving or collecting

Each of these transformations and actions plays a vital role in manipulating and retrieving data from RDDs in Spark, enabling efficient distributed data processing across a cluster.

Key-Value Pair RDDs in Detail

Spark includes dedicated transformations and actions designed for use with Pair RDDs, and I'll cover these in the following discussion.

Creating Pair RDDs

Tuples

You can create Pair RDDs by mapping an existing RDD to a format where each element is a tuple representing a key-value pair.

rdd = sc.parallelize(['apple', 'banana', 'apple', 'orange', 'banana', 'apple'])
pair_rdd = rdd.map(lambda fruit: (fruit, 1))

Regular RDDs

Any RDD whose elements are two-element tuples behaves as a Pair RDD, so one can also be created directly by parallelizing a collection of key-value tuples.

pair_rdd_from_regular = sc.parallelize([("apple", 1), ("banana", 2), ("apple", 3)])

Transformations

Transformations on Pair RDDs allow for sophisticated aggregation and sorting based on keys.

reduceByKey()

Aggregates values for each key using a specified reduce function.

reduced_rdd = pair_rdd.reduceByKey(lambda a, b: a + b)
# Output example: [('banana', 2), ('apple', 3), ('orange', 1)]

sortByKey()

Sorts the RDD by keys.

sorted_rdd = reduced_rdd.sortByKey()
# Output example: [('apple', 3), ('banana', 2), ('orange', 1)]

groupByKey()

Groups values with the same key.

grouped_rdd = pair_rdd.groupByKey().mapValues(list)
# Output example: [('banana', [1, 1]), ('apple', [1, 1, 1]), ('orange', [1])]

This operation groups all the values under each key into a list. It demonstrates the distribution of individual occurrences before aggregation.

join()

Joins two RDDs based on their keys.

price_rdd = sc.parallelize([("apple", 0.5), ("banana", 0.75), ("orange", 0.8)])
joined_rdd = reduced_rdd.join(price_rdd)
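# Output example (order may vary): [('apple', (3, 0.5)), ('banana', (2, 0.75)), ('orange', (1, 0.8))]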

Actions

Actions on Pair RDDs trigger computation and return results to the driver program or store them in external storage.

countByKey()

Counts the number of elements for each key.

counts_by_key = pair_rdd.countByKey()
# Output example: defaultdict(<class 'int'>, {'apple': 3, 'banana': 2, 'orange': 1})

This result is a Python dictionary-like object (specifically a defaultdict with int as the default value type) that shows how many times each fruit appears in the original dataset.

💡
Notice how we got a very similar result from reduceByKey() when using that specific lambda function. Both operations aggregate data based on keys. So, apart from the defaultdict-style result, what is the difference?

reduceByKey(): Performs a shuffle across the nodes to aggregate values by key. It's more flexible, allowing for complex aggregation functions beyond simple counting. It's suitable when you need arbitrary aggregations or when the dataset is not already in the form of (key, 1) pairs.

countByKey(): Provides a more direct approach for counting keys when the RDD is already structured as (key, 1) pairs, making it efficient for simple counting tasks. However, it collects the results to the driver, which could be problematic for very large datasets.

Now you understand which option to use and when.

collectAsMap()

Collects the result as a dictionary to bring back to the driver program.

result_map = reduced_rdd.collectAsMap()
# Output example: {'banana': 2, 'apple': 3, 'orange': 1}

These operations on Pair RDDs illustrate Spark's powerful capabilities for distributed data processing, especially when working with data that naturally fits into key-value pairs.

Conclusion

In conclusion, this article has delved deeply into the foundational aspects of Apache Spark, highlighting the pivotal role of SparkContext as the gateway to leveraging Spark's powerful capabilities. Additionally, we've explored the intricacies of Resilient Distributed Datasets (RDDs), uncovering how they serve as the backbone for distributed data processing within Spark. As we move forward, anticipate further exploration into the realms of Spark optimization and memory management. I'm excited to have you join me on this learning journey!