October 20, 2018

Spark Streaming + Kafka Integration

There are two approaches for Spark Streaming & Kafka integration, one with receivers and another is direct approach( without receivers) .

Receiver-based Approach




This approach uses a Receiver to receive the data. The Receiver is implemented using the Kafka consumer API. The data received by Receiver is stored in Spark executors and then jobs launched by Spark streaming processes the data.



Pros:
This approach update offsets in Zookeeper, hence Zookeeper-based Kafka monitoring tools can be used.

Cons:
1. Under failure data loss is possible. To avoid data loss, you have to additionally enable Write Ahead Logs( WAL) in Spark Streaming ( Spark 1.2), it adds to additional replication of data.
2. Does not increase parallelism of Spark in the processing of data in case of single receiver and multiple Topic partitions in Kafka, no one-to-one mapping between Kafka and RDD partitions.
3. Uses Kafka's API to store consumed offsets in Zookeeper and replicates data which have small chance some records may get consumed twice under some failures.

Direct Approach (No Receivers)

This Receiver-less  approach periodically queries Kafka for latest offset in Topic + Partition when Spark Streaming job is launched.


Pros:
1.Ensure stronger end-to-end guarantees
2. Simplified Parallelism: There is a one-to-one mapping between Kafka and RDD partitions, which is easier to understand and tune.No need to create multiple input Kafka streams and union them
3.  As long as you have sufficient Kafka retention, messages can be recovered from Kafka and no need of replication of data using Receiver.

4. Offsets are tracked by Spark Streaming within its checkpoints and each record is received by Spark Streaming effectively exactly once despite failures.

Cons:
In absence of Zookeeper, Zookeeper-based Kafka monitoring tools not available.


No comments:

Creating DataFrames from CSV in Apache Spark

 from pyspark.sql import SparkSession spark = SparkSession.builder.appName("CSV Example").getOrCreate() sc = spark.sparkContext Sp...