October 20, 2018

Spark Streaming + Kafka Integration

There are two approaches to integrating Spark Streaming with Kafka: one uses receivers, and the other is the direct approach (without receivers).

Receiver-based Approach




This approach uses a Receiver to receive the data. The Receiver is implemented using the Kafka consumer API. The data received by the Receiver is stored in Spark executors, and jobs launched by Spark Streaming then process the data.



Pros:
This approach updates offsets in Zookeeper, so Zookeeper-based Kafka monitoring tools can be used.

Cons:
1. Data loss is possible under failures. To avoid it, you have to additionally enable Write Ahead Logs (WAL) in Spark Streaming (available since Spark 1.2), which adds another replicated copy of the data.
2. A single receiver does not increase Spark's processing parallelism even when the Kafka topic has multiple partitions, because there is no one-to-one mapping between Kafka partitions and RDD partitions.
3. Uses Kafka's API to store consumed offsets in Zookeeper. Because the data replicated by Spark Streaming and the offsets stored in Zookeeper can become inconsistent, there is a small chance that some records are consumed twice under some failures.

Direct Approach (No Receivers)

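This receiver-less approach periodically queries Kafka for the latest offsets in each topic + partition and uses them to define the offset range to process in each batch. When the Spark Streaming jobs that process the data are launched, those offset ranges are read from Kafka.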


Pros:
1. Ensures stronger end-to-end guarantees.
2. Simplified parallelism: there is a one-to-one mapping between Kafka and RDD partitions, which is easier to understand and tune. There is no need to create multiple input Kafka streams and union them.
3. As long as you have sufficient Kafka retention, messages can be recovered from Kafka, so the data does not need to be replicated as in the receiver-based approach.
4. Offsets are tracked by Spark Streaming within its checkpoints, so each record is received by Spark Streaming effectively exactly once despite failures.

Cons:
Because this approach does not update offsets in Zookeeper, Zookeeper-based Kafka monitoring tools will not show progress.
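
To make the one-to-one mapping concrete, here is a minimal sketch in plain Python of how a batch could be planned from offsets. It does not call Spark or Kafka. The names plan_batch, committed, and latest are hypothetical, and the OffsetRange tuple here is only an illustration, not Spark's own class.

```python
from typing import Dict, List, NamedTuple, Tuple

TopicPartition = Tuple[str, int]  # (topic name, partition id)


class OffsetRange(NamedTuple):
    """Illustrative record: one range per Kafka partition, one RDD partition each."""
    topic: str
    partition: int
    from_offset: int   # inclusive
    until_offset: int  # exclusive


def plan_batch(committed: Dict[TopicPartition, int],
               latest: Dict[TopicPartition, int]) -> List[OffsetRange]:
    """Build one offset range per Kafka partition for the next batch.

    `committed` holds the offsets already processed (what a checkpoint would
    store); `latest` holds the newest offsets reported by Kafka. Partitions
    missing from `committed` are assumed to start at offset 0.
    """
    ranges = []
    for (topic, partition), until in sorted(latest.items()):
        start = committed.get((topic, partition), 0)
        ranges.append(OffsetRange(topic, partition, start, until))
    return ranges


committed = {("events", 0): 100, ("events", 1): 250}
latest = {("events", 0): 130, ("events", 1): 250, ("events", 2): 40}

batch = plan_batch(committed, latest)
for r in batch:
    print(r.topic, r.partition, r.from_offset, r.until_offset)
```

Each returned range corresponds to exactly one Kafka partition, which is why the number of RDD partitions in a batch follows the number of Kafka partitions. After a failure, replaying the same ranges from Kafka yields the same records, provided Kafka still retains them.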


October 18, 2018

Safe Mode in Hadoop

Safe mode in Hadoop is a maintenance state of the NameNode during which the NameNode doesn't allow any changes to the file system. An HDFS cluster is in safe mode during startup because the cluster needs to validate all the blocks and their locations.


Common errors that occur when Hadoop is in safe mode:

Error when reading/writing HDFS data
Cannot create directory

e.g.
# ./bin/hdfs  dfs -mkdir /usr/local/hdfsdata
mkdir: Cannot create directory /usr/local/hdfsdata. Name node is in safe mode.

If required, HDFS can be placed in safe mode explicitly for administration activities with the dfsadmin utility, using the bin/hadoop dfsadmin -safemode command (for example, hdfs dfsadmin -safemode enter). To check the current state:

# hdfs dfsadmin -safemode get
Safe mode is ON

Once the blocks have been validated, safe mode is disabled. It can also be turned off manually:
# hdfs  dfsadmin -safemode leave
Safe mode is OFF
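
For scripts that must not write while the NameNode is in safe mode, the check above can be wrapped in Python. This is a minimal sketch: parse_safemode and namenode_in_safemode are hypothetical helper names, the parser only looks at the "Safe mode is ON" / "Safe mode is OFF" prefix shown in the transcripts above, and it assumes the hdfs command is on the PATH.

```python
import subprocess


def parse_safemode(output: str) -> bool:
    """Return True if the dfsadmin output reports safe mode ON, False if OFF."""
    text = output.strip()
    if text.startswith("Safe mode is ON"):
        return True
    if text.startswith("Safe mode is OFF"):
        return False
    raise ValueError(f"unexpected dfsadmin output: {output!r}")


def namenode_in_safemode() -> bool:
    """Run `hdfs dfsadmin -safemode get` and interpret its output."""
    result = subprocess.run(
        ["hdfs", "dfsadmin", "-safemode", "get"],
        capture_output=True, text=True, check=True,
    )
    return parse_safemode(result.stdout)


# The parser can be exercised without a cluster:
print(parse_safemode("Safe mode is ON\n"))
print(parse_safemode("Safe mode is OFF\n"))
```

A job could call namenode_in_safemode() before attempting a mkdir or put and wait or abort instead of failing with the "Name node is in safe mode" error.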

HDFS Cheat Sheet

Commonly used & most useful  HDFS Commands



1. Create a directory in HDFS at given path(s).

# cd $HADOOP_HOME

# hdfs  dfs -mkdir /usr/local/hadoopdata



2. Upload and download a file in HDFS

   Upload:

    hdfs  dfs -put  /usr/local/localdata/names.csv  /usr/local/hadoopdata/names.csv

   Download:

    hdfs  dfs -get  /usr/local/hadoopdata/names.csv  /usr/local/localdata/hnames.csv



3.  List the contents of a directory.

 hdfs  dfs -ls     /usr/local/hadoopdata



4. Copy a file between the local file system and HDFS

These work like the put and get commands, except that the source of copyFromLocal and the destination of copyToLocal are restricted to a local file reference.

hdfs  dfs -copyFromLocal  /usr/local/localdata/names.csv  /usr/local/hadoopdata/hnames.csv

hdfs  dfs -copyToLocal    /usr/local/hadoopdata/hnames.csv  /usr/local/localdata/fromhdnames2.csv

5. See contents of a file



 hdfs  dfs -cat    /usr/local/hadoopdata/hnames.csv/names.csv

6. Display last few lines of a file.

hdfs  dfs -tail    /usr/local/hadoopdata/hnames.csv/names.csv

7. Display the aggregate length of a file.

hdfs  dfs -du    /usr/local/hadoopdata/hnames.csv/names.csv


8. Run some of the examples provided:

hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.6.jar wordcount  /usr/local/hdfsdata/names/names.csv  /usr/local/hdfsdata/names/namescount.csv

9. Examine the output files

 hdfs dfs -ls /usr/local/hdfsdata/names/namescount.csv

 hdfs dfs -cat /usr/local/hdfsdata/names/namescount.csv/part-r-00000
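
To know what to expect in part-r-00000, it can help to see what wordcount computes. The following is a local sketch in plain Python, not the Hadoop implementation. It assumes whitespace-separated tokens and tab-separated output sorted by word, and the function name word_count is hypothetical.

```python
from collections import Counter
from typing import Iterable, List


def word_count(lines: Iterable[str]) -> List[str]:
    """Count whitespace-separated tokens and return 'word<TAB>count' rows sorted by word."""
    counts = Counter()
    for line in lines:
        counts.update(line.split())  # "map": emit each token; "reduce": sum per token
    return [f"{word}\t{count}" for word, count in sorted(counts.items())]


text = ["to be or not", "to be"]
for row in word_count(text):
    print(row)
```

Each output row has the word in the first column and its count in the second, which is the layout the later sort and head steps rely on.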

10. Example: Top 5 words in a text file

   10.1 Copy local file to Hadoop
    hdfs dfs -copyFromLocal  /usr/local/localdata/data.txt  /usr/local/hdfsdata/names/data.txt

   10.2 Run wordcount to produce result
   hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.6.jar wordcount  /usr/local/hdfsdata/names/data.txt  /usr/local/hdfsdata/names/datacount

   10.3 Use sort and head for the top 5 (sort numerically on the count column)
    hdfs dfs -cat  /usr/local/hdfsdata/names/datacount/part-r-00000 | sort -k2,2nr | head -5

   10.4 Save the top 3 to a local file
    hdfs dfs -cat  /usr/local/hdfsdata/names/datacount/part* | sort -k2,2nr | head -3 > /usr/local/localdata/top3.txt
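
The same ranking can be done in Python when the result needs further processing. This is a minimal sketch, assuming each input line is "word<TAB>count" as produced by wordcount. The function name top_words is hypothetical, and ties are broken alphabetically, which the shell pipeline does not guarantee.

```python
from typing import Iterable, List, Tuple


def top_words(lines: Iterable[str], n: int) -> List[Tuple[str, int]]:
    """Return the n (word, count) pairs with the highest counts."""
    pairs = []
    for line in lines:
        line = line.rstrip("\n")
        if not line:
            continue  # skip blank lines
        word, _, count = line.rpartition("\t")
        pairs.append((word, int(count)))  # compare counts as numbers, not strings
    # Highest count first; alphabetical order among equal counts
    pairs.sort(key=lambda wc: (-wc[1], wc[0]))
    return pairs[:n]


sample = ["apple\t3", "banana\t12", "cherry\t7", "date\t12"]
print(top_words(sample, 3))
```

Because the function accepts any iterable of lines, sys.stdin can be passed in when the script is fed from hdfs dfs -cat through a pipe.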

Creating DataFrames from CSV in Apache Spark

 from pyspark.sql import SparkSession
 spark = SparkSession.builder.appName("CSV Example").getOrCreate()
 sc = spark.sparkContext
 Sp...