Brock Palen | brockp@umich.edu |
Seth Meyer | smeyer@umich.edu |
Apache Hadoop is a framework for distributed processing of large data sets across clusters of computers using simple programming models. It is designed to scale up from single servers to thousands of machines by co-locating data and computation on each machine.
Flux Hadoop is the name of the trial ARC-TS Hadoop cluster.
# List files in your HDFS home directory
hdfs dfs -ls
hdfs dfs -ls /user/brockp
# Make a directory
hdfs dfs -mkdir directory
# Get the full list of HDFS options
hdfs dfs -help
# Get help with a single HDFS option (leave off the leading -)
hdfs dfs -help ls
Create a directory in HDFS called arcdata
and check its contents.
hdfs dfs -mkdir arcdata
hdfs dfs -ls arcdata
# Get the sample data used later in the presentation
git clone https://bitbucket.org/umarcts/umarcts.bitbucket.org.git
cd umarcts.bitbucket.org/presentations
# Upload sample data for later use: one copy as ngrams.data for the
# Pig/streaming/Spark examples, one as ngrams.hive for the Hive LOAD DATA step
hdfs dfs -mkdir -p warehouse/ngrams
hdfs dfs -put hadoop/ngrams.data .
hdfs dfs -put hadoop/ngrams.data ngrams.hive
# Download data from HDFS to the current local directory
hdfs dfs -get ngrams.hive .
# Start the interactive hive shell with the default queue
hive
# Run a SQL script with Hive
hive -f path/to/script.sql
# Start the hive shell with a specific queue name
hive --hiveconf mapreduce.job.queuename=training
-- Create the ngrams table (here named brockp_ngrams). For example, the word 'snake' appeared
-- 6449 times in 1755 volumes in the year 1943. This is represented by:
--
-- snake 1943 6449 1755
CREATE EXTERNAL TABLE brockp_ngrams
(ngram STRING,
year INT,
matches BIGINT,
volumes BIGINT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION '/user/brockp/warehouse/ngrams';
-- Load the data from HDFS into the Hive table. LOAD DATA INPATH moves
-- ngrams.hive into the table's LOCATION directory so Hive knows where to find it.
LOAD DATA INPATH 'ngrams.hive' OVERWRITE INTO TABLE brockp_ngrams;
-- Execute a query that finds all words that were only in a single volume in
-- any given year.
SELECT ngram, COUNT(ngram)
FROM brockp_ngrams
WHERE volumes = 1
GROUP BY ngram
ORDER BY ngram;
snake 22
snaketail 10
# Open the pig shell on flux-login
pig
# Open the pig shell for local development
pig -x local
# Run the included warez/ngrams.pig script
cd hadoop-pres
# Upload the data (skip this if ngrams.data is already in your HDFS home directory)
hdfs dfs -put ngrams.data ngrams.data
pig -f warez/ngrams.pig
# Specify queue name
pig -Dmapreduce.job.queuename=training
-- Load the data.
data = LOAD 'ngrams.data' AS (ngram:chararray, year:int, matches:long,
volumes:long);
-- Running 'describe data' gives:
-- data: {ngram: chararray, year: int, matches: long, volumes: long}
filtered = FILTER data BY volumes == 1;
-- filtered: {ngram: chararray, year: int, matches: long, volumes: long}
grouped = GROUP filtered BY ngram;
-- grouped: {group: chararray, filtered: {(ngram: chararray, year: int,
-- matches: long, volumes: long)}}
-- After the filter, volumes is always 1, so SUM(volumes) is simply the number
-- of matching rows (years) for each ngram
answer = FOREACH grouped GENERATE group, SUM($1.volumes);
-- answer: {group: chararray, long}
ordered = ORDER answer BY $0 ASC;
-- ordered: {group: chararray, long}
DUMP ordered;
(snake,22)
(snaketail,10)
#!/usr/bin/env python3
# Mapper: emit "ngram<TAB>1" for every row whose volume count is exactly 1
import fileinput

for line in fileinput.input():
    split = line.split("\t")
    if int(split[3]) == 1:
        print('\t'.join([split[0], '1']))
#!/usr/bin/env python3
# Reducer: sum the counts emitted by the mapper; the key (field 0) is the ngram
import fileinput

counts = dict()
for line in fileinput.input():
    tmp = line.split('\t')
    if tmp[0] not in counts:
        counts[tmp[0]] = 0
    counts[tmp[0]] = counts[tmp[0]] + int(tmp[1])

for ngram in sorted(counts.keys()):
    print('\t'.join([ngram, str(counts[ngram])]))
# Run Python on a Hadoop cluster
yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-Dmapreduce.job.queuename=training \
-input ngrams.data -output ngrams-output \
-mapper warez/map.py -reducer warez/reduce.py \
-file warez/map.py -file warez/reduce.py
# Look at the output from the job
hdfs dfs -cat ngrams-output/part*
# Run locally for testing
cat ngrams.data | python3 warez/map.py | python3 warez/reduce.py
snake 22
snaketail 10
# Open the interactive PySpark shell
pyspark --master yarn-client
# Open the interactive PySpark shell locally
pyspark --master local
# Submit a PySpark job to the cluster
spark-submit --master yarn-client \
--num-executors 2 \
--executor-memory 3g \
warez/ngrams-spark.py
from pyspark.conf import SparkConf
from pyspark.context import SparkContext

conf = SparkConf().setAppName('NGrams')
sc = SparkContext(conf=conf)

# Load the raw text and split each line into its tab-separated columns
ngrams = sc.textFile("ngrams.data")
columns = ngrams.map(lambda line: line.split("\t"))

# Keep only rows that appeared in exactly one volume, then count rows per ngram
filtered = columns.filter(lambda arr: int(arr[3]) == 1)
reducable = filtered.map(lambda arr: (arr[0], 1))
answer = reducable.reduceByKey(lambda a, b: a + b)

for k, v in answer.collect():
    print('\t'.join([k, str(v)]))
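The spark.sql queries below assume a SparkSession named spark with Hive support
enabled, so that the Hive ngrams table is visible. Recent pyspark shells create
spark for you; a minimal sketch of building one by hand (names are illustrative):
from pyspark.sql import SparkSession

# Sketch (assumption): a Hive-enabled SparkSession so spark.sql() can see the
# Hive 'ngrams' table; the pyspark shell normally provides this as 'spark'
spark = SparkSession.builder \
    .appName('NGrams-SQL') \
    .enableHiveSupport() \
    .getOrCreate()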
ans = spark.sql("SELECT ngram, COUNT(ngram) FROM ngrams \
WHERE volumes=1 GROUP BY ngram ORDER BY ngram")
ans.show()
# ans is a DataFrame, so we can mix SQL with the DataFrame API
ans.count()
ans.printSchema()
ans.filter(ans['ngram'] == 'snake').show()
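The same single-volume query can also be written entirely with the DataFrame API;
a sketch, again assuming the Hive table is visible as ngrams:
# Sketch (assumption): the query via the DataFrame API, reading the Hive table
# through spark.table() instead of spark.sql()
ngrams_df = spark.table('ngrams')
ans_df = (ngrams_df
          .filter(ngrams_df['volumes'] == 1)
          .groupBy('ngram')
          .count()
          .orderBy('ngram'))
ans_df.show()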
You can test both Pig and Spark locally if you have them installed. If you are a Docker user, there is also a Dockerfile here that provides a pre-built environment for testing both.
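For example, a minimal local Spark test might look like the sketch below; the
local path to ngrams.data is hypothetical.
from pyspark import SparkConf, SparkContext

# Sketch (assumption): run Spark in local mode and read ngrams.data from the
# local filesystem instead of HDFS
conf = SparkConf().setMaster('local[*]').setAppName('NGrams-local-test')
sc = SparkContext(conf=conf)
lines = sc.textFile('file:///tmp/ngrams.data')
print(lines.count())  # quick sanity check that the data loads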
-- Start hive, drop any old ngrams table, and recreate it pointing at the
-- full-size copy already on the cluster. Note that no data is loaded: because
-- the table is EXTERNAL, data already at the LOCATION is left in place.
-- There are also ways to make queries on these data even faster; contact us for help.
DROP TABLE ngrams;
CREATE EXTERNAL TABLE ngrams
( ngram STRING,
year INT,
count BIGINT,
volumes BIGINT )
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION '/var/ngrams';
-- Larger data spans more HDFS blocks, and each block becomes an input partition
-- Parallelism is limited by the number of these partitions
-- 23 GB of ngrams data -> 396 partitions
-- 1,201,784,959 rows
SELECT year, COUNT(ngram)
FROM ngrams
WHERE volumes = 1
GROUP BY year;
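The same full-size table can also be queried from PySpark through the Hive
metastore; a sketch, assuming the Hive-enabled SparkSession shown earlier:
# Sketch (assumption): run the full-size query from PySpark; parallelism is
# still bounded by the table's HDFS block count
by_year = spark.sql(
    "SELECT year, COUNT(ngram) AS n FROM ngrams WHERE volumes = 1 GROUP BY year")
by_year.show()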