October 20, 2018

Spark Streaming + Kafka Integration

There are two approaches to integrating Spark Streaming with Kafka: a receiver-based approach and a direct approach (without receivers).

Receiver-based Approach




This approach uses a Receiver to receive the data. The Receiver is implemented using the Kafka high-level consumer API. The data received by the Receiver is stored in Spark executors, and the jobs launched by Spark Streaming then process that data.



Pros:
This approach updates consumed offsets in ZooKeeper, so ZooKeeper-based Kafka monitoring tools can be used.

Cons:
1. Data loss is possible under failure. To avoid it, you additionally have to enable Write Ahead Logs (WAL) in Spark Streaming (introduced in Spark 1.2), which adds further replication of the data.
2. It does not increase the parallelism of Spark's processing: with a single receiver and multiple topic partitions in Kafka, there is no one-to-one mapping between Kafka partitions and RDD partitions.
3. Because offsets are tracked in ZooKeeper separately from the data reliably received by Spark, there is a small chance that some records are consumed twice under certain failures.
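For illustration, a minimal PySpark sketch of the receiver-based approach (the ZooKeeper address, consumer group and topic name are placeholder values, and the spark-streaming-kafka-0-8 artifact is assumed to be on the classpath):

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="KafkaReceiverExample")
ssc = StreamingContext(sc, 10)   # 10-second batches

# Receiver-based stream; consumed offsets are tracked in ZooKeeper.
# To reduce data loss on failure, also set spark.streaming.receiver.writeAheadLog.enable=true
stream = KafkaUtils.createStream(ssc, "localhost:2181", "test-group", {"test-topic": 1})
stream.map(lambda kv: kv[1]).pprint()   # messages arrive as (key, value) pairs

ssc.start()
ssc.awaitTermination()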

Direct Approach (No Receivers)

Instead of using a receiver, this approach periodically queries Kafka for the latest offsets in each topic and partition and accordingly defines the offset ranges to process in each batch.


Pros:
1. Ensures stronger end-to-end guarantees.
2. Simplified parallelism: there is a one-to-one mapping between Kafka partitions and RDD partitions, which is easier to understand and tune. There is no need to create multiple input Kafka streams and union them.
3. Efficiency: as long as you have sufficient Kafka retention, messages can be recovered from Kafka, so there is no need to replicate the data via a receiver.
4. Exactly-once semantics: offsets are tracked by Spark Streaming within its checkpoints, so each record is received by Spark Streaming effectively exactly once despite failures.

Cons:
Because this approach does not update offsets in ZooKeeper, ZooKeeper-based Kafka monitoring tools will not show the progress of the streaming job. (You can, however, access the offsets processed in each batch and update ZooKeeper yourself.)
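Likewise, a minimal PySpark sketch of the direct approach (broker address, topic name and checkpoint path are illustrative; same spark-streaming-kafka-0-8 package assumption):

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="KafkaDirectExample")
ssc = StreamingContext(sc, 10)
ssc.checkpoint("hdfs:///tmp/kafka-direct-checkpoint")   # offsets are tracked via checkpoints

# Direct stream: no receiver, one RDD partition per Kafka partition.
stream = KafkaUtils.createDirectStream(ssc, ["test-topic"],
                                       {"metadata.broker.list": "localhost:9092"})
stream.map(lambda kv: kv[1]).pprint()

ssc.start()
ssc.awaitTermination()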


October 18, 2018

Safe Mode in Hadoop

Safe mode in Hadoop is a maintenance state of the NameNode during which the NameNode does not allow any changes to the file system. An HDFS cluster is in safe mode during startup because the cluster needs to validate all the blocks and their locations.


Common errors that occur when Hadoop is in safe mode:

Error when reading/writing HDFS data
Cannot create directory

e.g.
# ./bin/hdfs  dfs -mkdir /usr/local/hdfsdata
mkdir: Cannot create directory /usr/local/hdfsdata. Name node is in safe mode.

If required, HDFS can be placed in safe mode explicitly for administration activities using the dfsadmin utility, i.e. the hdfs dfsadmin -safemode command.

# hdfs dfsadmin -safemode get
Safe mode is ON

Once the blocks are validated, safe mode can be turned off:
# hdfs  dfsadmin -safemode leave
Safe mode is OFF
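If needed before maintenance, safe mode can also be entered manually (standard dfsadmin usage, not shown in the session above):

# hdfs dfsadmin -safemode enter
Safe mode is ON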

HDFS Cheat Sheet

Commonly used & most useful  HDFS Commands



1. Create a directory in HDFS at given path(s).

# cd $HADOOP_HOME

# hdfs  dfs -mkdir /usr/local/hadoopdata



2. Upload and download a file in HDFS

   Upload:

hdfs dfs -put /usr/local/localdata/names.csv /usr/local/hadoopdata/names.csv

 
Download:

hdfs dfs -get /usr/local/hadoopdata/names.csv /usr/local/localdata/hnames.csv



3.  List the contents of a directory.

 hdfs  dfs -ls     /usr/local/hadoopdata



4. Copy a file between the local file system and HDFS

copyFromLocal works like put, except that the source is restricted to a local file reference; copyToLocal works like get, except that the destination is restricted to a local file reference.

hdfs  dfs -copyFromLocal  /usr/local/localdata/names.csv  /usr/local/hadoopdata/hnames.csv

hdfs  dfs -copyToLocal    /usr/local/hadoopdata/hnames.csv  /usr/local/localdata/fromhdnames2.csv

5. See contents of a file



hdfs dfs -cat /usr/local/hadoopdata/hnames.csv

6. Display last few lines of a file.

hdfs dfs -tail /usr/local/hadoopdata/hnames.csv

7. Display the aggregate length of a file.

hdfs dfs -du /usr/local/hadoopdata/hnames.csv


8. Run some of the examples provided:

hadoop jar hadoop-mapreduce-examples-2.7.6.jar wordcount  /usr/local/hdfsdata/names/names.csv  /usr/local/hdfsdata/names/namescount.csv

9. Examine the output files

 hdfs dfs -ls /usr/local/hdfsdata/names/namescount.csv

 hdfs dfs -cat /usr/local/hdfsdata/names/namescount.csv/part-r-00000

10. Example: Top 5 words in a text file

   10.1 Copy local file to Hadoop
    hdfs dfs -copyFromLocal  /usr/local/localdata/data.txt  /usr/local/hdfsdata/names/data.txt

   10.2 Run wordcount to produce result
   hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.6.jar wordcount  /usr/local/hdfsdata/names/data.txt  /usr/local/hdfsdata/names/datacount

  10.3 Use sort and head for top 5
hdfs dfs -cat /usr/local/hdfsdata/names/datacount/part-r-00000 | sort -k2,2 -nr | head -5

  10.4 Write the top 3 to a local file

    hdfs dfs -cat /usr/local/hdfsdata/names/datacount/part* | sort -k2,2 -nr | head -3 > /usr/local/localdata/top3.txt

September 30, 2018

Install Hadoop on CentOS on Amazon AWS EC2 Instance



Install and configure Hadoop on CentOS on an AWS EC2 instance in 8 steps:

Step 1: Set up a CentOS AMI on an Amazon AWS EC2 instance
Choose a CentOS AMI when launching the EC2 instance.

Step 2:  Connect to CentOS AWS instance
ssh -i ~/.ssh/filename.pem centos@awsinstanceip

Step 3: Switch to the root user (super admin)
sudo su -   

Step 4: Update all available packages from the repository
yum update

Step 5: Install the JDK
yum install java-1.8.0-openjdk
Check the Java version and installation path:
java -version
whereis java
sudo alternatives --config java

Step 6: Install Hadoop
Install wget to be able to download software:
yum -y install wget
cd /usr/local
wget http://apache.javapipe.com/hadoop/common/hadoop-2.7.6/hadoop-2.7.6.tar.gz
tar -zxvf hadoop-2.7.6.tar.gz
Hadoop home: /usr/local/hadoop-2.7.6
Java home: /usr/lib/jvm/jre-1.8.0-openjdk

Step 7: Configure Hadoop
Set JAVA_HOME and HADOOP_HOME in the /root/.bashrc file by copying in the following content.

7.1 Open the file: vi /root/.bashrc
7.2 Copy in the following content:

export HADOOP_HOME=/usr/local/hadoop-2.7.6
export JAVA_HOME=/usr/lib/jvm/jre-1.8.0-openjdk
# Convenience aliases and a helper for viewing lzo-compressed files in HDFS
unalias fs &> /dev/null
alias fs="hadoop fs"
unalias hls &> /dev/null
alias hls="fs -ls"
lzohead () {
    hadoop fs -cat $1 | lzop -dc | head -1000 | less
}
export PATH=$PATH:$HADOOP_HOME/bin

7.3 Log out and back in (or source /root/.bashrc) and check the Java and Hadoop locations

echo $JAVA_HOME
echo $HADOOP_HOME


7.4 Create temp directory for Hadoop Data storage
mkdir -p /tmp/hadoop/data

7.5 Set JAVA_HOME in /usr/local/hadoop-2.7.6/etc/hadoop/hadoop-env.sh

7.6 Configure etc/hadoop/core-site.xml:

<configuration>
  <property>
    <name>hadoop.tmp.dir</name>
    <value>/tmp/hadoop/data</value>
    <description>Location for HDFS.</description>
  </property>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://localhost:54310</value>
    <description>The name of the default file system. A URI whose
    scheme and authority determine the FileSystem implementation.</description>
  </property>
</configuration>


7.7 Configure etc/hadoop/mapred-site.xml with the following content (JobTracker configuration):

<configuration>
  <property>
    <name>mapred.job.tracker</name>
    <value>localhost:54311</value>
    <description>The host and port that the MapReduce job tracker runs at.</description>
  </property>
</configuration>



7.8 Configure etc/hadoop/hdfs-site.xml (replication factor for HDFS blocks):

<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
    <description>Default number of block replications.</description>
  </property>
</configuration>

Step 8: Start Hadoop

8.1 Format the Hadoop filesystem, which is implemented on top of the local filesystems of your cluster. You need to do this the first time you set up a Hadoop installation:

./bin/hdfs namenode -format

8.2 Start your Hadoop single-node cluster:
./sbin/start-dfs.sh
./sbin/start-yarn.sh
8.3 jps (Java Virtual Machine Process Status Tool)
jps is a command used to check all the Hadoop daemons running on the machine, such as the NameNode, DataNode, ResourceManager and NodeManager. If jps is not available, install it via ant:
sudo yum install ant

jps output

1600 ResourceManager
1703 NodeManager
1288 DataNode
1449 SecondaryNameNode
2331 Jps
1164 NameNode


Apache Spark:

Download & Install  Spark

wget http://d3kbcqa49mib13.cloudfront.net/spark-2.0.0-bin-hadoop2.7.tgz
tar xf spark-2.0.0-bin-hadoop2.7.tgz

mkdir /usr/local/spark
cp -r spark-2.0.0-bin-hadoop2.7/* /usr/local/spark
Add the following to ~/.bash_profile, then reload it:

export SPARK_EXAMPLES_JAR=/usr/local/spark/examples/jars/spark-examples_2.11-2.0.0.jar
export PATH=$PATH:$HOME/bin:/usr/local/spark/bin

source ~/.bash_profile

Start Pyspark session  
./bin/pyspark
Python 2.7.5 (default, Jul 13 2018, 13:06:57)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-28)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
18/09/30 17:07:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.0.0
      /_/

Using Python version 2.7.5 (default, Jul 13 2018 13:06:57)
SparkSession available as 'spark'.
>>>

The text from the input file is tokenized into words to form key-value pairs, one for every word in the input file. The key is the word and the value is 1.
For instance, consider the sentence "Hello World". The PySpark WordCount example splits the string into individual tokens, i.e. words. In this case the sentence is split into 2 tokens (one for each word), each with the value 1.
(Hello,1)
(World,1)
file.txt contains “Hello World”


Test  Pyspark code
  
PySpark Code:

lines = sc.textFile("file.txt")


sorted(lines.flatMap(lambda line: line.split())
            .map(lambda w: (w, 1))
            .reduceByKey(lambda v1, v2: v1 + v2)
            .collect())


Output:
[(u'Hello', 1), (u'World', 1)]
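As a small follow-up sketch (not part of the original example), the top-N most frequent words can be taken directly from the same RDD with takeOrdered, mirroring the sort | head trick from the HDFS cheat sheet above:

counts = (lines.flatMap(lambda line: line.split())
               .map(lambda w: (w, 1))
               .reduceByKey(lambda v1, v2: v1 + v2))
print(counts.takeOrdered(5, key=lambda kv: -kv[1]))   # 5 most frequent words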







Creating DataFrames from CSV in Apache Spark

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("CSV Example").getOrCreate()
sc = spark.sparkContext
Sp...
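The preview above is cut off; as a hedged sketch of the idea (the file path and options are illustrative, not taken from the full post):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("CSV Example").getOrCreate()

# Read a CSV file into a DataFrame; header and schema inference are optional.
df = spark.read.csv("/usr/local/localdata/names.csv", header=True, inferSchema=True)
df.printSchema()
df.show(5)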