Table of Contents

  1. Prerequisites
  2. Introduction
  3. Using HDFS
  4. Hive
  5. Pig
  6. Streaming
  7. Java
  8. Sqoop
  9. RHadoop
  10. Spark
  11. mrjob
  12. Viewing Job Status
  13. Tips and Tricks
  14. Technical Introduction

Prerequisites

You must have an account on Flux to use the Fladoop cluster. Please read Getting Started with Flux if you are unfamiliar with Flux.

You should also have a basic knowledge of standard Unix/Linux tools, including ssh, mv, cp, etc.


Introduction

Hadoop is an open-source data processing framework. Traditional HPC methods usually assume CPU-bound workloads; Hadoop assumes IO-bound workloads. It pairs a distributed filesystem with a distributed programming model that focuses on high IO throughput and relatively cheap per-record CPU work, so that large numbers of simple operations can be performed as quickly as possible. For processing very large amounts of data, this approach is often faster than traditional HPC applications.

Fladoop is the Flux Hadoop stack, consisting of:

  • Hadoop 2
  • Pig
  • Hive
  • Sqoop 1
  • RHadoop
  • Spark

Fladoop refers to the whole stack; 'Hadoop' refers specifically to the Hadoop framework itself. All the client commands can be run from the main flux-login nodes, which you can log in to by running

ssh $uniqname@flux-login.engin.umich.edu

A typical workflow looks something like this, where the tool used can be Hadoop or any of its related tools:

  1. Copy data from scratch into HDFS
  2. Run job using your tool of choice
  3. Copy job results into scratch
  4. View results
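As a concrete sketch of that workflow, the commands below walk through one pass of it. The queue name, jar file, class name, and paths are placeholders, not things that already exist on the cluster:

# 1. Copy data from scratch into HDFS
hdfs dfs -copyFromLocal /scratch/path/to/input input

# 2. Run a job using your tool of choice (here, a hypothetical Java job)
yarn jar MyJob.jar MyMainClass -Dmapreduce.job.queuename=$queue_name input output

# 3. Copy the job results back into scratch
hdfs dfs -copyToLocal output /scratch/path/to/output

# 4. View the results (part-r-00000 is the conventional name of the first reducer's output file)
less /scratch/path/to/output/part-r-00000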


Using HDFS

The following command is used to transfer files into HDFS:

hdfs dfs -copyFromLocal path/to/local/file path/to/hdfs/file

The following command is used to transfer files out of HDFS:

hdfs dfs -copyToLocal path/to/hdfs/file path/to/local/file

The hdfs dfs tool also supports many of the traditional Unix file operations, such as mv, ls, cp, chown, chmod, etc.
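For example (all paths shown are placeholders):

hdfs dfs -ls                            # List your HDFS home directory
hdfs dfs -mkdir input                   # Create a directory
hdfs dfs -mv input/a.txt input/b.txt    # Rename a file
hdfs dfs -rm -r output                  # Remove a directory and its contents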

To transfer files between different locations on HDFS or between different Hadoop clusters, you can use the distcp tool.

# Copy from another cluster's HDFS into Fladoop HDFS.
# The -D option sets the queue name; the first path is the source, the second the destination.
hadoop distcp -Dmapreduce.job.queuename=$queue \
    hdfs://namenode.example.com:port/path/to/file \
    hdfs://fladoop/user/$uniqname/path/to/file

# It's also possible to transfer from scratch by using a file:// source path.
hadoop distcp -Dmapreduce.job.queuename=$queue \
    file:///scratch/path/to/file \
    hdfs://fladoop/user/$uniqname/path/to/file

To transfer files from scratch into HDFS, ensure that the permissions are loose enough on your scratch directory. For example, all directories should have permissions similar to 755, while files should have permissions similar to 744 or 644.
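For example, you could open up a scratch directory and a file in it like this (the paths are placeholders for your own scratch space):

chmod 755 /scratch/path/to/directory            # rwxr-xr-x
chmod 644 /scratch/path/to/directory/data.txt   # rw-r--r--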


Hive

Hive is a SQL-to-MapReduce compiler: it transforms SQL queries into jobs that can run on a Hadoop cluster, and runs them. It can run scripts or start an interactive session in which you query your data using SQL syntax. It does not fully conform to the SQL standard, but most common statements are implemented, so for those familiar with SQL, Hive is a fantastic way to analyze large datasets. Under the hood, Hive parses the SQL, translates it into MapReduce jobs written in Java (Hadoop's native language), compiles them, and runs them with Hadoop, so there is very little extra overhead. This makes Hive great for batch analysis (performing a query over all the data in a table).

Following is a brief Hive tutorial. It uses data from the Google Ngrams Dataset; you can download one or all of the sets and load them into Hive to follow along, or use your own data. If you're unfamiliar with SQL, just know that the semicolons terminate statements and matter to how your input is parsed: leaving one out will usually cause a parse error rather than anything destructive.

First, in order to initialize Hive, run:

hive -e "SHOW TABLES"

This command initializes the metastore, a fancy name for the place where Hive stores table schemas and related information. Hive will not work without it; the metastore is stored in your home directory on Flux, so do not remove the .hive_metastore directory. SHOW TABLES is a SQL statement that lists the tables you currently have, and the -e flag tells Hive to run the command you pass it instead of starting an interactive session.

Next, let's start up an interactive Hive session. You'll need to let Hive know which queue you would like to run jobs on:

hive --hiveconf mapreduce.job.queuename=$queuename

Pass your actual queue name in place of $queuename. Next, let's create a table to hold the ngrams data:

CREATE TABLE ngrams
(ngram STRING, year INT, match_count BIGINT, volume_count BIGINT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';

What we did here is create a table with four columns. Each column has a name (ngram, match_count, etc.) and a type (STRING, BIGINT, etc.). Since the ngrams data is stored as tab-delimited columns, it's easy to assign a schema to it. Next, we will load the data into the table we just created:

LOAD DATA INPATH 'path/to/data/file' OVERWRITE INTO TABLE ngrams;

This command loads the data in path/to/data/file into the ngrams table. You should replace path/to/data/file with the actual path to your data in HDFS. Now that we actually have some data, let's find out how common the most frequent ngram is:

SELECT MAX(match_count) FROM ngrams;

This query returns the maximum number of matches of a single word in a single year.

Finally, to end the interactive session, merely type:

QUIT;
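If you'd rather not work interactively, you can also put your queries into a file and run them as a script. A minimal sketch, assuming a hypothetical script named my_queries.hql:

hive --hiveconf mapreduce.job.queuename=$queuename -f my_queries.hql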


Java

Java jobs are submitted using the yarn jar command. The general form is

yarn jar path/to/jarfile MainClass <java args> <MainClass args>

An example of how this might look is

yarn jar MyJob.jar MyMainClass -Dmapreduce.job.queuename=default \
    path/to/input/directory path/to/output/directory [extra args...]

Assuming I've compiled and packaged my .java files into MyJob.jar, this command runs MyMainClass and supplies some options to it. The first option

-Dmapreduce.job.queuename=default

sets the name of the queue you're submitting to, and without this option, your job will not run. You should have received the name of the queue that you can submit jobs to when you applied for a Flux account listing Hadoop as a research need. The other two command line arguments

path/to/input/directory
path/to/output/directory

are typical of Hadoop jobs, and specify the input and output directories. Please note that the output directory must not already exist when you submit your job; if it does, the job will fail.

[extra args...]

Additionally, some jobs require extra arguments. Traditionally, these come after the input and output directories.
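If you're starting from .java source files, a minimal sketch of compiling, packaging, and submitting looks like the following; the class, jar, and directory names are hypothetical:

# Compile against the Hadoop libraries and package the classes into a jar
mkdir -p classes
javac -classpath "$(hadoop classpath)" -d classes MyMainClass.java
jar cf MyJob.jar -C classes .

# Submit the job to your queue
yarn jar MyJob.jar MyMainClass -Dmapreduce.job.queuename=$queue_name \
    path/to/input/directory path/to/output/directory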


Sqoop

Sqoop is a tool for importing data from an RDBMS into HDFS. This is fantastic if you have a database on which you would like to run complex queries that MySQL or Postgres cannot handle in a reasonable amount of time.

Running Sqoop requires that you have a database to pull data from, with permissions set so that both flux-login and all the Fladoop nodes have access. I've found it's best to just grant all permissions to a temporary account not bound to a particular hostname, then revoke the permissions after you're done importing. In the commands below, replace $db_user with the database account username, $db_addr with the address of the database server, $db_name with the name of the database, and $table_name with the name of the table you want to import.

# For PostgreSQL
sqoop import -Dmapreduce.job.queuename=$queue_name --username $db_user \
--connect jdbc:postgresql://$db_addr/$db_name --table $table_name -m 1

# For MySQL
sqoop import -Dmapreduce.job.queuename=$queue_name --username $db_user \
--connect jdbc:mysql://$db_addr/$db_name --table $table_name -m 1

If you'd like to import straight into Hive, append --hive-import to the command. Additionally, if the account used to log in to the SQL server requires a password, add -P to enter the password interactively.
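Putting those together, a MySQL import straight into Hive that prompts for a password might look like this (a sketch using the same placeholders as above):

sqoop import -Dmapreduce.job.queuename=$queue_name --username $db_user -P \
    --connect jdbc:mysql://$db_addr/$db_name --table $table_name -m 1 \
    --hive-import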


RHadoop

R is a statistical analysis scripting language. RHadoop provides two libraries that ease the use of R to run Hadoop jobs:

  • rhdfs
  • rmr2

The RHadoop project provides some documentation, but briefly: rhdfs is used to manipulate the filesystem and rmr2 is used to submit MapReduce jobs.

Using rhdfs is simple; arguably simpler than using the equivalent hdfs dfs commands. Some sample commands are below:

# Load module and start R
module load RHadoop
R

# Initialize the HDFS library
library(rhdfs)
hdfs.init()

# Run some commands
hdfs.ls('.') # ls ~
hdfs.rm('tmp') # rm ~/tmp
hdfs.mkdir('input') # mkdir ~/input

All paths, if not preceded with a /, are assumed to be relative to your home directory, which is /user/$uniqname.

There is little documentation on rmr2; below is a sample job to demonstrate use of the main function mapreduce():

# Load module and start R
module load RHadoop
R

# Load the MapReduce library
library(rmr2)

# Set the queue; replace 'staff' below with your own queue name
rmr.options(
    backend.parameters = list(
        hadoop = list(
            D = "mapreduce.job.queuename=staff"
        )
    )
)

# Sample job
small.ints = to.dfs(1:1000)
mapreduce(
    input = small.ints,
    map = function(k, v) cbind(v, v^2)
)

# Exit R
q()

Your data must be in HDFS for Hadoop to process it; you cannot load local data and just process it with Hadoop.


Spark

Check out the separate Spark documentation.


mrjob

mrjob is a library written by Yelp to make the transition from testing locally to running on a cluster seamless. To use mrjob, load the python-hadoop/2.7 module:

module load python-hadoop/2.7

To configure mrjob with the queue that you would like to submit jobs to, open ~/.mrjob.conf and add the following:

runners:
    hadoop:
        jobconf:
            mapreduce.job.queuename: "queue_name"

Replace queue_name with the name of your queue, and make sure it stays surrounded by quotation marks. Now you can submit mrjob scripts by running

python2.7 job.py <arguments> -r hadoop

You must use python2.7 as the name of the Python interpreter.
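As an example, running a hypothetical word_count.py script against input already stored in HDFS and saving the streamed results locally might look like:

python2.7 word_count.py -r hadoop hdfs:///user/$uniqname/input > counts.txt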

Viewing Job Status

To view the status of your job(s), you can run:

yarn application -list
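Once you have an application ID from that list, you can also check on a single job or, after it finishes, pull its logs; the application ID below is only an illustrative placeholder:

yarn application -status application_1400000000000_0001
yarn logs -applicationId application_1400000000000_0001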

To view a web-based version of that information, you can set up an SSH tunnel to the scheduler by running:

ssh -L 8088:fladoop-rm01:8088 -N $uniq@flux-xfer.engin.umich.edu

where $uniq is your uniqname. Point a browser at 127.0.0.1:8088 and you can see various scheduler statistics.


Tips and Tricks

There are a couple main tips to remember to avoid common mistakes:

  • Submit jobs to the correct queue
  • Use command aliases and/or scripts to reduce typing errors
  • Use the right tool for the job

Submitting a job to the wrong queue is not a terrible problem, just a waste of half a minute, though it can also be a little confusing. It's easy to avoid by using command aliases or scripting. Letting Hadoop know which queue you're using takes a lot of characters:

-Dmapreduce.job.queuename=

In addition, many of the Hadoop filesystem commands are long:

  • ls -> hdfs dfs -ls
  • mkdir -> hdfs dfs -mkdir

One way to shorten this up is to put aliases in your bashrc. The bashrc is a file located at ~/.bashrc that changes how your shell works. For example, my bashrc contains the following lines:

alias hls='hdfs dfs -ls'
alias hcat='hdfs dfs -cat'
alias hget='hdfs dfs -copyToLocal'
alias hput='hdfs dfs -copyFromLocal'
alias hrm='hdfs dfs -rm -r'
function hsub() {
    jarfile=${1}
    queue=${2}
    shift
    shift
    yarn jar ${jarfile} -Dmapreduce.job.queuename=${queue} "$@"
}

For example, now I can type hls and it does the same thing as typing:

hdfs dfs -ls

I can also type:

hsub path/to/jar my_queue <job args...>

and it is equivalent to:

yarn jar path/to/jar -Dmapreduce.job.queuename=my_queue <job args...>

All these aliases both speed up my work and reduce errors.



Technical Hadoop Introduction

A Hadoop cluster contains multiple types of nodes that all do different things:

  • Namenode: Master filesystem node
  • ResourceManager: Master scheduling node
  • JournalNode: High-availability service node
  • Datanode: Worker filesystem node
  • NodeManager: Worker scheduling node

The Fladoop cluster contains two namenodes running in High Availability (HA) mode and a single ResourceManager. These three nodes are also JournalNodes, which support the HA Namenodes. The Fladoop cluster contains plenty of worker nodes, each of which is both a Datanode and a NodeManager.

HDFS

HDFS is the first of the two main components that make up Hadoop. The goal of HDFS is to provide a high-performance, fault-tolerant filesystem. Under the hood, HDFS stores files in 128MB blocks on the worker nodes, while the Namenode keeps track of which blocks belong to which file. The Namenode treats the pool of hard drives across all the worker nodes as essentially one massive disk and tries to spread the load around equally. In order to provide fast IO, a typical worker node contains as many hard drives as CPU cores.

Although HDFS does some crazy stuff under the hood, it mimics a traditional Linux filesystem as much as possible. Each user in HDFS has a directory at /user/$uniqname owned by them with 750 (rwxr-x---) permissions.
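If you're curious how one of your own files is split into blocks, you can ask HDFS directly; a sketch, where the path is a placeholder for a file you have access to:

hdfs fsck /user/$uniqname/path/to/file -files -blocks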

MapReduce

The second component of Hadoop, MapReduce, is a parallel programming paradigm that Google popularized by using it to crunch the massive amounts of data they accumulate on a daily basis. They viewed each computation they wanted to run over their data as essentially two operations:

  • Map: Transform a line into key/value pair(s)
  • Reduce: Combine duplicate keys until you have a non-repeating key/value list

For example, let's say you want to process the entire Android operating system source tree and come up with counts for various common variable names like 'tmp', 'i', and others. You could come up with a workflow that:

  1. Downloads all the Android source code and concatenates it into a single file
  2. Pushes this massive file into HDFS
  3. Fires off a job that looks for the various words
  4. Checks out the results

The keys, in this instance, would be the words you're looking for, and the value would be that word's count for the line; since there could be multiple words of interest per line, each line could potentially generate multiple key/value pairs. This is the map stage. The reduce stage takes all the generated key/value pairs and adds up the values that share a key. The end result is that you can count everything up much faster than you could on your local machine.

Word counting is the "Hello World" of Hadoop applications, and many jobs that are run are pretty similar to it.
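To make the map/shuffle/reduce split concrete, here is a purely local sketch of word counting with standard Unix tools; it is not how you would run the job on Fladoop, just an illustration of the paradigm (input.txt is any local text file):

# map:     emit one word per line
# shuffle: sort brings identical words next to each other
# reduce:  uniq -c adds up the occurrences of each word
cat input.txt | tr -s '[:space:]' '\n' | sort | uniq -c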

Queues

Fladoop uses a scheduler that assigns a certain amount of resources to each queue; specifically, we use the CapacityScheduler. Each queue gets a guaranteed minimum amount of resources as well as a maximum amount of resources it may use. When the cluster is under-utilized, a queue may borrow unused capacity beyond its guaranteed share, up to its maximum.
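Depending on the Hadoop version, you can also ask the scheduler about your queue's configured and current capacity from the command line; a sketch, with the queue name as a placeholder:

yarn queue -status $queue_name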
