
MapReduce Streaming

1. streaming



1.1 Introduction

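  Hadoop Streaming lets map or reduce programs written in non-Java languages such as Ruby, Python, PHP, and C++ run on a Hadoop cluster. The only requirement is that the map/reduce programs read their input from standard input (stdin) and write their output to standard output (stdout).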

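  • MapReduce and HDFS are implemented in Java and provide a Java programming interface by default
  • The Streaming framework allows programs written in any language to be used in Hadoop MapReduce
  • Streaming makes it easy to port existing programs to the Hadoop platform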

1.2 How it works




1.3 Advantages

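  • High development efficiency
    • Easy to port to Hadoop: a program only needs to read data from standard input and write data to standard output in a fixed format
    • Existing single-machine programs can be run as distributed jobs on Hadoop with only minor changes
    • Easy to debug on a single machine: **cat input | mapper | sort | reducer > output**

  • Program execution efficiency

    • For CPU-intensive computation, programs written in languages such as C/C++ may run somewhat faster than equivalent Java programs

  • Easier resource control by the platform

    • The Streaming framework can flexibly limit the memory and other resources an application uses, for example through limit settings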
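The single-machine debugging pipeline `cat input | mapper | sort | reducer > output` can also be reproduced inside one Python process, which is handy for unit-testing mapper and reducer logic before submitting a job. This is a minimal sketch assuming a word-count mapper and reducer; the function names `mapper`, `reducer` and `run_pipeline` are hypothetical, and Python's `sorted()` stands in for the shell `sort` (and for Hadoop's shuffle sort).

```python
import io
from itertools import groupby


def mapper(lines):
    """Hypothetical word-count mapper: emit 'word<TAB>1' for every word."""
    for line in lines:
        for word in line.split():
            yield "%s\t%d" % (word, 1)


def reducer(lines):
    """Hypothetical reducer: expects its input lines already sorted by key."""
    pairs = (line.split("\t", 1) for line in lines)
    for word, group in groupby(pairs, key=lambda kv: kv[0]):
        yield "%s\t%d" % (word, sum(int(v) for _, v in group))


def run_pipeline(text):
    """Equivalent of: cat input | mapper | sort | reducer"""
    mapped = list(mapper(io.StringIO(text)))
    return list(reducer(sorted(mapped)))


if __name__ == "__main__":
    for out in run_pipeline("b a\na b b\n"):
        print(out)
```

Sorting whole lines is enough here: all lines for one word share the prefix `word<TAB>`, so they end up adjacent, which is exactly what `groupby` needs.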

1.4 Limitations

  By default, Streaming can only process text data. To process binary data, a better approach is to convert it to text first.


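  • Data is copied and parsed (split) twice, once from the framework into the program through stdin and once back from the program's stdout, which adds some overhead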
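One common way to follow the advice above is to Base64-encode each binary record so that it becomes a single text line with no tabs or newlines in the value. This is a minimal sketch, assuming every record is an independent byte string and using a hypothetical record id as the key; a mapper would call `decode_record` to get the original bytes back before processing them.

```python
import base64


def encode_record(key, payload):
    """Turn a binary payload into one tab-separated text line."""
    # Base64 output uses only A-Z, a-z, 0-9, '+', '/' and '=', so it never
    # contains the tab or newline characters that Streaming treats specially.
    text = base64.b64encode(payload).decode("ascii")
    return "%s\t%s" % (key, text)


def decode_record(line):
    """Inverse of encode_record: recover (key, payload bytes) from a line."""
    key, text = line.rstrip("\n").split("\t", 1)
    return key, base64.b64decode(text)


if __name__ == "__main__":
    line = encode_record("rec-1", b"\x00\x01\t\n\xff")
    print(repr(line))
    print(decode_record(line))
```

The trade-off is size: Base64 makes the data about one third larger, on top of the copying overhead mentioned above.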

1.5 Workflow

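The Hadoop Streaming workflow is roughly as follows:

  • hadoop-streaming.jar registers a new job with the Hadoop cluster, passing in the input path, the output path, and other parameters
  • When a mapper starts, Hadoop Streaming feeds the input file to its stdin line by line
  • The mapper we write reads each line from stdin and processes it
  • The mapper also writes its intermediate results to stdout. In Python, print writes to stdout by default; if you prefer, you can also name the output stream explicitly. Hadoop splits every line written to stdout into a key-value pair, by default at the first '\t'.
  • After the mappers finish, Hadoop automatically partitions, sorts, and groups the intermediate data in preparation for the reduce tasks
  • Hadoop Streaming passes the intermediate results to the reducer line by line
  • The reducer we write reads each line from stdin and processes it
  • After processing, the reducer also writes its results to stdout
  • Hadoop stores the results in the output path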
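The key/value split and the partition, sort and group steps above can be imitated locally to check what each reducer will receive. This is a minimal sketch: `split_kv` follows Streaming's documented default (the key is everything before the first tab, the value is the rest, and a line without a tab becomes a key with an empty value), while `zlib.crc32` modulo the reducer count is only a deterministic stand-in for Hadoop's real HashPartitioner, so the partition numbers will not match a real cluster.

```python
import zlib
from itertools import groupby


def split_kv(line):
    """Split a mapper output line at the first tab (Streaming's default)."""
    key, _, value = line.rstrip("\n").partition("\t")
    return key, value  # no tab -> the whole line is the key, value is ""


def shuffle(map_output, num_reducers):
    """Partition, sort and group mapper output lines, one list per reducer."""
    partitions = [[] for _ in range(num_reducers)]
    for line in map_output:
        key, value = split_kv(line)
        # Stand-in partitioner: the same key always lands in the same partition.
        idx = zlib.crc32(key.encode("utf-8")) % num_reducers
        partitions[idx].append((key, value))
    result = []
    for part in partitions:
        part.sort(key=lambda kv: kv[0])  # each reducer sees keys in sorted order
        groups = [(k, [v for _, v in g])
                  for k, g in groupby(part, key=lambda kv: kv[0])]
        result.append(groups)
    return result


if __name__ == "__main__":
    for i, groups in enumerate(shuffle(["b\t1", "a\t1", "b\t1", "c"], 2)):
        print(i, groups)
```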

1.6 Hadoop command usage

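Parameter | Required/Optional | Description
-input <path> | Required | Input data for the mapper. The files must be uploaded to HDFS manually before the job is submitted.
-output <path> | Required | HDFS path where the reducer output is stored. It must be set in the script and must not already exist; otherwise Hadoop treats it as the output of another job.
-mapper <command or Java class> | Required | The user's mapper program, an executable or a script, e.g. -mapper "python map.py", -mapper "bash map.sh", -mapper "perl map.perl"
-reducer <command or Java class> | Optional | The user's reducer program, e.g. -reducer "python reducer.py", -reducer "bash reducer.sh", -reducer "perl reducer.pl". Omit it if no custom reduce logic is needed; -numReduceTasks 0 skips the reduce phase entirely.
-combiner <command or Java class> | Optional | Combiner program that processes the mapper output, e.g. -combiner "python combiner.py", -combiner "bash combiner.sh"
-partitioner <Java class> | Optional | User-defined partitioner class
-file <local file> | Optional | Ships a small local file (the mapper/reducer program or any other file the program needs) to the compute nodes, e.g. -file map.py -file reduce.py -file white_list. The files are packaged and submitted together with the job, and all distributed files end up in the same job-specific directory on the datanode: jobcache/job_xxx/jar. Hadoop 2.x reports this option as deprecated in favor of the generic -files option.
-cacheFile <HDFS URI> | Optional | Distributes an HDFS file, e.g. -cacheFile hdfs://master:9000/cachefile_dir/white_list. Programs and auxiliary files the job needs are first stored in HDFS; the specified file is copied to the compute nodes and also placed in the job-specific directory jobcache/job_xxx/jar.
-cacheArchive <HDFS URI#linkname> | Optional | Distributes a compressed archive from HDFS, e.g. -cacheArchive "hdfs://master:9000/w.tar.gz#WLDIR"; the archive may contain a directory structure.
-numReduceTasks <number> | Optional | Number of reducers for the job, e.g. -numReduceTasks 2
-inputformat <Java class> | Optional | Custom InputFormat class; the default is TextInputFormat
-outputformat <Java class> | Optional | Custom OutputFormat class; the default is TextOutputFormat
-cmdenv name=value | Optional | Environment variable passed to the streaming command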
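Because a Streaming job is just a long command line assembled from the options in this table, it can help to build it programmatically. This is a minimal sketch that only constructs the argument list and does not run Hadoop; the helper name `build_streaming_args` and the jar path are hypothetical placeholders, and only options listed in the table are used.

```python
import shlex


def build_streaming_args(jar, input_path, output_path, mapper,
                         reducer=None, files=(), num_reduce_tasks=None,
                         cmdenv=None):
    """Build the argument list for `hadoop jar <streaming jar> ...`."""
    args = ["hadoop", "jar", jar,
            "-input", input_path,
            "-output", output_path,   # must not exist before the job runs
            "-mapper", mapper]
    if reducer is not None:
        args += ["-reducer", reducer]
    for name, value in (cmdenv or {}).items():
        args += ["-cmdenv", "%s=%s" % (name, value)]
    for f in files:
        args += ["-file", f]          # local files shipped with the job
    if num_reduce_tasks is not None:
        args += ["-numReduceTasks", str(num_reduce_tasks)]
    return args


if __name__ == "__main__":
    cmd = build_streaming_args(
        "/path/to/hadoop-streaming.jar",  # placeholder, adjust to your install
        "/streaming/", "/streaming_output",
        "python word_count_mapper.py", "python word_count_reducer.py",
        files=["word_count_mapper.py", "word_count_reducer.py"],
        num_reduce_tasks=2)
    print(" ".join(shlex.quote(a) for a in cmd))
```

On a real cluster the list could be passed to `subprocess.run(cmd, check=True)`; keeping it as a list avoids shell-quoting problems with arguments such as `"python map.py"`.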

2. Experiments

2.1 Word count

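  Hadoop is written in Java, so the most direct approach is to implement the Mapper and Reducer in Java, configure a MapReduce job, and submit it to the cluster. However, many developers are not familiar with Java and instead have practical experience with languages such as C++ or scripting languages such as Shell, Python, Ruby, PHP, and Perl. Hadoop Streaming gives these developers a way to process data on a Hadoop cluster: the hadoop-streaming-*.jar tool.

  Hadoop Streaming uses Unix standard input and output as the interface between Hadoop and programs written in other languages, so such a program only needs to read its input from standard input and write its output to standard output. On these streams the key and value are separated by a tab, and Hadoop guarantees that the data arriving on the Reducer's standard input has been sorted by key.

  How do we use Hadoop Streaming? As a starting point, running the examples jar that ships with Hadoop without arguments lists the bundled example programs (the Streaming tool itself is the separate hadoop-streaming-*.jar):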

[root@master ~]# hadoop jar /usr/local/hadoop-2.6.5/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.5.jar
An example program must be given as the first argument.
Valid program names are:
  aggregatewordcount: An Aggregate based map/reduce program that counts the words in the input files.
  aggregatewordhist: An Aggregate based map/reduce program that computes the histogram of the words in the input files.
  bbp: A map/reduce program that uses Bailey-Borwein-Plouffe to compute exact digits of Pi.
  dbcount: An example job that count the pageview counts from a database.
  distbbp: A map/reduce program that uses a BBP-type formula to compute exact bits of Pi.
  grep: A map/reduce program that counts the matches of a regex in the input.
  join: A job that effects a join over sorted, equally partitioned datasets
  multifilewc: A job that counts words from several files.
  pentomino: A map/reduce tile laying program to find solutions to pentomino problems.
  pi: A map/reduce program that estimates Pi using a quasi-Monte Carlo method.
  randomtextwriter: A map/reduce program that writes 10GB of random textual data per node.
  randomwriter: A map/reduce program that writes 10GB of random data per node.
  secondarysort: An example defining a secondary sort to the reduce.
  sort: A map/reduce program that sorts the data written by the random writer.
  sudoku: A sudoku solver.
  teragen: Generate data for the terasort
  terasort: Run the terasort
  teravalidate: Checking results of terasort
  wordcount: A map/reduce program that counts the words in the input files.
  wordmean: A map/reduce program that counts the average length of the words in the input files.
  wordmedian: A map/reduce program that counts the median length of the words in the input files.
  wordstandarddeviation: A map/reduce program that counts the standard deviation of the length of the words in the input files.
From these we pick an example that can also be implemented with Hadoop Streaming: word frequency counting, i.e. WordCount. First, prepare the test dataset:

[root@master ~]# mkdir -p /usr/local/src/tmp/
[root@master ~]# cd /usr/local/src/tmp/
[root@master tmp]# cp /etc/hosts /etc/passwd .
[root@master tmp]# hadoop fs -ls /
Found 8 items
drwxr-xr-x   - root supergroup          0 2019-06-24 11:01 /hbase
-rw-r--r--   3 root supergroup        225 2019-06-23 17:13 /hosts
drwxr-xr-x   - root supergroup          0 2019-06-26 20:22 /inputdata
drwxr-xr-x   - root supergroup          0 2019-06-26 20:21 /outputdata
drwxr-xr-x   - root supergroup          0 2019-06-26 20:26 /outputdata1
drwxr-xr-x   - root supergroup          0 2019-06-26 20:27 /outputdata2
drwx-wx-wx   - root supergroup          0 2019-06-26 20:23 /tmp
drwxr-xr-x   - root supergroup          0 2019-06-27 20:55 /user
[root@master tmp]# hadoop fs -mkdir /streaming
[root@master tmp]# hadoop fs -put /etc/hosts /etc/passwd /streaming
[root@master tmp]# hadoop fs -ls /streaming
Found 2 items
-rw-r--r--   3 root supergroup        225 2019-06-29 15:56 /streaming/hosts
-rw-r--r--   3 root supergroup        997 2019-06-29 15:56 /streaming/passwd
[root@master tmp]# 
Next, we implement the MapReduce job in Python. The Mapper is in word_count_mapper.py, the Reducer in word_count_reducer.py, and run.sh submits the job:
[root@master tmp]# cat word_count_mapper.py 
#!/usr/bin/env python

import sys

for line in sys.stdin:
    line = line.strip()
    # str.split() with no arguments already drops empty strings
    for word in line.split():
        print('%s\t%s' % (word, 1))
[root@master tmp]# cat word_count_reducer.py 
#!/usr/bin/env python

import sys
from operator import itemgetter

wc_dict = {}

for line in sys.stdin:
    line = line.strip()
    try:
        # Mapper output is "word<TAB>count"
        word, count = line.split('\t', 1)
        count = int(count)
    except ValueError:
        # Skip malformed lines
        continue
    wc_dict[word] = wc_dict.get(word, 0) + count

sorted_dict = sorted(wc_dict.items(), key=itemgetter(0))
for word, count in sorted_dict:
    print('%s\t%s' % (word, count))
[root@master tmp]# cat run.sh 
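# STREAM_JAR_PATH points to hadoop-streaming-*.jar (its definition is not shown here)
hadoop jar $STREAM_JAR_PATH \
-input /streaming/ \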
-output /streaming_output \
-mapper word_count_mapper.py \
-reducer word_count_reducer.py \
-numReduceTasks 2 \
-file *.py


[root@master tmp]# cat hosts | python word_count_mapper.py
<IP>   1
localhost   1
localhost.localdomain   1
localhost4  1
localhost4.localdomain4 1
::1 1
localhost   1
localhost.localdomain   1
localhost6  1
localhost6.localdomain6 1
<IP>   1
master  1
<IP>   1
slave1  1
<IP>   1
slave2  1
[root@master tmp]# cat hosts | python word_count_mapper.py |python word_count_reducer.py
<IP>   1
<IP>   1
<IP>   1
<IP>   1
::1 1
localhost   2
localhost.localdomain   2
localhost4  1
localhost4.localdomain4 1
localhost6  1
localhost6.localdomain6 1
master  1
slave1  1
slave2  1
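The local test above works without `sort` because word_count_reducer.py accumulates every word in an in-memory dict. For large inputs, a Streaming reducer usually relies instead on the guarantee that its stdin is sorted by key, and keeps only the current key's total in memory. This is a minimal sketch of that style using `itertools.groupby`; it assumes sorted input (locally: `... | sort | python reducer.py`), and the names `parse`, `reduce_sorted` and `main` are hypothetical.

```python
import sys
from itertools import groupby


def parse(lines):
    """Yield (word, count) pairs, skipping malformed lines."""
    for line in lines:
        word, _, value = line.rstrip("\n").partition("\t")
        try:
            count = int(value)
        except ValueError:
            continue
        yield word, count


def reduce_sorted(lines):
    """Sum counts per word; the input must already be sorted by word."""
    for word, group in groupby(parse(lines), key=lambda wc: wc[0]):
        yield word, sum(count for _, count in group)


def main():
    """Entry point for a reducer script: read sorted lines from stdin."""
    for word, total in reduce_sorted(sys.stdin):
        sys.stdout.write("%s\t%d\n" % (word, total))
```

Saved as a script, it would call `main()` under an `if __name__ == "__main__":` guard. If the input is not sorted, the same word can appear in several groups, which is why the local test of this style of reducer must include `sort`.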


Running the job on Hadoop:
[root@master tmp]# sh run.sh 
19/06/29 16:05:33 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [word_count_mapper.py, word_count_reducer.py, /tmp/hadoop-unjar3187438588282227932/] [] /tmp/streamjob6299520794908170392.jar tmpDir=null
19/06/29 16:05:34 INFO client.RMProxy: Connecting to ResourceManager at master/
19/06/29 16:05:34 INFO client.RMProxy: Connecting to ResourceManager at master/
19/06/29 16:05:35 INFO mapred.FileInputFormat: Total input paths to process : 2
19/06/29 16:05:35 INFO mapreduce.JobSubmitter: number of splits:3
19/06/29 16:05:35 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1561552001692_0003
19/06/29 16:05:35 INFO impl.YarnClientImpl: Submitted application application_1561552001692_0003
19/06/29 16:05:35 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1561552001692_0003/
19/06/29 16:05:35 INFO mapreduce.Job: Running job: job_1561552001692_0003
19/06/29 16:05:45 INFO mapreduce.Job: Job job_1561552001692_0003 running in uber mode : false
19/06/29 16:05:45 INFO mapreduce.Job:  map 0% reduce 0%
19/06/29 16:05:56 INFO mapreduce.Job:  map 100% reduce 0%
19/06/29 16:06:03 INFO mapreduce.Job:  map 100% reduce 50%
19/06/29 16:06:04 INFO mapreduce.Job:  map 100% reduce 100%
19/06/29 16:06:04 INFO mapreduce.Job: Job job_1561552001692_0003 completed successfully
19/06/29 16:06:04 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=1415
        FILE: Number of bytes written=556266
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=1868
        HDFS: Number of bytes written=1271
        HDFS: Number of read operations=15
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=4
    Job Counters 
        Launched map tasks=3
        Launched reduce tasks=2
        Data-local map tasks=3
        Total time spent by all maps in occupied slots (ms)=28325
        Total time spent by all reduces in occupied slots (ms)=9302
        Total time spent by all map tasks (ms)=28325
        Total time spent by all reduce tasks (ms)=9302
        Total vcore-milliseconds taken by all map tasks=28325
        Total vcore-milliseconds taken by all reduce tasks=9302
        Total megabyte-milliseconds taken by all map tasks=29004800
        Total megabyte-milliseconds taken by all reduce tasks=9525248
    Map-Reduce Framework
        Map input records=28
        Map output records=48
        Map output bytes=1307
        Map output materialized bytes=1439
        Input split bytes=260
        Combine input records=0
        Combine output records=0
        Reduce input groups=46
        Reduce shuffle bytes=1439
        Reduce input records=48
        Reduce output records=46
        Spilled Records=96
        Shuffled Maps =6
        Failed Shuffles=0
        Merged Map outputs=6
        GC time elapsed (ms)=1527
        CPU time spent (ms)=11410
        Physical memory (bytes) snapshot=1034854400
        Virtual memory (bytes) snapshot=10570633216
        Total committed heap usage (bytes)=756547584
    Shuffle Errors
    File Input Format Counters 
        Bytes Read=1608
    File Output Format Counters 
        Bytes Written=1271
19/06/29 16:06:04 INFO streaming.StreamJob: Output directory: /streaming_output


[root@master tmp]# hadoop fs -ls /
Found 10 items
drwxr-xr-x   - root supergroup          0 2019-06-24 11:01 /hbase
-rw-r--r--   3 root supergroup        225 2019-06-23 17:13 /hosts
drwxr-xr-x   - root supergroup          0 2019-06-26 20:22 /inputdata
drwxr-xr-x   - root supergroup          0 2019-06-26 20:21 /outputdata
drwxr-xr-x   - root supergroup          0 2019-06-26 20:26 /outputdata1
drwxr-xr-x   - root supergroup          0 2019-06-26 20:27 /outputdata2
drwxr-xr-x   - root supergroup          0 2019-06-29 15:56 /streaming
drwxr-xr-x   - root supergroup          0 2019-06-29 16:06 /streaming_output
drwx-wx-wx   - root supergroup          0 2019-06-26 20:23 /tmp
drwxr-xr-x   - root supergroup          0 2019-06-27 20:55 /user
[root@master tmp]# hadoop fs -ls /streaming_output
Found 3 items
-rw-r--r--   3 root supergroup          0 2019-06-29 16:06 /streaming_output/_SUCCESS
-rw-r--r--   3 root supergroup        737 2019-06-29 16:06 /streaming_output/part-00000
-rw-r--r--   3 root supergroup        534 2019-06-29 16:06 /streaming_output/part-00001

[root@master tmp]# hadoop fs -text /streaming_output/part-00000
<IP>   1
<IP>   1
<IP>   1
::1 1
SSH:/var/empty/sshd:/sbin/nologin   1
bus:/:/sbin/nologin 1
chrony:x:998:996::/var/lib/chrony:/sbin/nologin 1
daemon:x:2:2:daemon:/sbin:/sbin/nologin 1
dbus:x:81:81:System 1
for 1
games:x:12:100:games:/usr/games:/sbin/nologin   1
halt:x:7:0:halt:/sbin:/sbin/halt    1
localhost   2
localhost4  1
localhost6  1
mail:x:8:12:mail:/var/spool/mail:/sbin/nologin  1
message 1
nobody:x:99:99:Nobody:/:/sbin/nologin   1
operator:x:11:0:operator:/root:/sbin/nologin    1
polkitd:x:999:998:User  1
postfix:x:89:89::/var/spool/postfix:/sbin/nologin   1
shutdown:x:6:0:shutdown:/sbin:/sbin/shutdown    1
slave2  1
sshd:x:74:74:Privilege-separated    1
systemd-network:x:192:192:systemd   1
user:/run/saslauthd:/sbin/nologin   1
[root@master tmp]# hadoop fs -text /streaming_output/part-00001
<IP>   1
Management:/:/sbin/nologin  1
Network 1
Server:/var/lib/mysql:/bin/false    1
User:/var/ftp:/sbin/nologin 1
adm:x:3:4:adm:/var/adm:/sbin/nologin    1
bin:x:1:1:bin:/bin:/sbin/nologin    1
ftp:x:14:50:FTP 1
localhost.localdomain   2
localhost4.localdomain4 1
localhost6.localdomain6 1
lp:x:4:7:lp:/var/spool/lpd:/sbin/nologin    1
master  1
mysql:x:27:27:MySQL 1
ntp:x:38:38::/etc/ntp:/sbin/nologin 1
polkitd:/:/sbin/nologin 1
root:x:0:0:root:/root:/bin/bash 1
saslauth:x:997:76:Saslauthd 1
slave1  1
sync:x:5:0:sync:/sbin:/bin/sync 1

2.2 file


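  • If the files to be distributed are local and have no directory structure, use the -file /path/to/FILENAME option

  • The Streaming program can then access the file as ./FILENAME

  • Local executable files other than the designated mapper or reducer program may lose their execute permission after distribution, so run chmod +x ./FILENAME in a wrapper script such as mapper.sh to make them executable, and then set -mapper "mapper.sh".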
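The `chmod +x ./FILENAME` step described above can also be done from Python inside the mapper before the shipped program is launched. This is a minimal sketch, assuming the distributed file sits in the task's working directory under a hypothetical name such as `helper_tool`; `ensure_executable` is equivalent to `chmod a+x` and leaves the other permission bits unchanged.

```python
import os
import stat
import subprocess


def ensure_executable(path):
    """Equivalent of `chmod a+x path`: add execute bits for user, group, other."""
    mode = os.stat(path).st_mode
    os.chmod(path, mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)


def run_shipped_tool(name, *args):
    """Make ./name executable, run it, and return its stdout as text."""
    path = os.path.join(".", name)
    ensure_executable(path)
    result = subprocess.run([path, *args], check=True,
                            stdout=subprocess.PIPE, universal_newlines=True)
    return result.stdout
```

In a mapper this would be called as, for example, `run_shipped_tool("helper_tool")` for a file shipped with `-file helper_tool`; the wrapper-script approach from the text achieves the same thing in shell.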

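  Also take a look at the directory ./mapred/local/taskTracker/root/jobcache/job_201704060437_xxxx. On a YARN cluster like the one used here, the distributed files appear instead under nm-local-dir/usercache/root/ (in its appcache and filecache subdirectories), as the directory listing from slave1 further below shows.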


map.py:

import sys


def read_local_file_func(f):
    # Load the white list (one word per line) into a set
    word_set = set()
    with open(f, 'r') as file_in:
        for line in file_in:
            word = line.strip()
            if word:
                word_set.add(word)
    return word_set


def mapper_func(white_list_fd):
    word_set = read_local_file_func(white_list_fd)

    for line in sys.stdin:
        ss = line.strip().split(' ')
        for s in ss:
            word = s.strip()
            # Emit only words that appear in the white list
            if word != "" and (word in word_set):
                print("%s\t%s" % (word, 1))


if __name__ == "__main__":
    # Usage: python map.py <function name> [args...], e.g. mapper_func white_list
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])
    args = sys.argv[2:]
    func(*args)
red.py:

import sys


def reduer_func():
    # Reducer input is sorted by key, so lines for the same word are adjacent
    current_word = None
    count_pool = []

    for line in sys.stdin:
        word, val = line.strip().split('\t')
        if current_word is None:
            current_word = word
        if current_word != word:
            # Key changed: emit the total for the previous word
            print("%s\t%s" % (current_word, sum(count_pool)))
            current_word = word
            count_pool = []
        count_pool.append(int(val))

    # Emit the last word (nothing is printed if the input was empty)
    if current_word is not None:
        print("%s\t%s" % (current_word, sum(count_pool)))


if __name__ == "__main__":
    # Usage: python red.py <function name> [args...], e.g. reduer_func
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])
    args = sys.argv[2:]
    func(*args)


$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

# Step 1.
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_1 \
    -output $OUTPUT_PATH \
    -mapper "python map.py mapper_func white_list" \
    -reducer "python red.py reduer_func" \
    -jobconf "mapred.reduce.tasks=3" \
    -file ./map.py \
    -file ./red.py \
    -file ./white_list

#$HADOOP_CMD jar $STREAM_JAR_PATH -input $INPUT_FILE_PATH_1 -output $OUTPUT_PATH -mapper "python map.py mapper_func white_list" -reducer "python red.py reduer_func" -jobconf "mapred.reduce.tasks=3" -file ./map.py -file ./red.py -file ./white_list


[root@master mr_file_broadcast]# cat The_Man_of_Property.txt | python map.py mapper_func white_list | sort -k1 | python red.py reduer_func > cmz.out
[root@master mr_file_broadcast]# cat cmz.out 
against 93
recent  2
suitable    2
[root@master mr_file_broadcast]# hadoop fs -ls /
Found 4 items
-rw-r--r--   3 root supergroup    1896621 2019-06-21 13:54 /1.data
drwxr-xr-x   - root supergroup          0 2019-06-21 14:03 /output
-rw-r--r--   3 root supergroup        944 2019-06-20 19:50 /passwd
drwx------   - root supergroup          0 2019-06-21 14:03 /tmp
[root@master mr_file_broadcast]# hadoop fs -put The_Man_of_Property.txt /
[root@master mr_file_broadcast]# hadoop fs -ls /
Found 5 items
-rw-r--r--   3 root supergroup    1896621 2019-06-21 13:54 /1.data
-rw-r--r--   3 root supergroup     632207 2019-06-21 21:11 /The_Man_of_Property.txt
drwxr-xr-x   - root supergroup          0 2019-06-21 14:03 /output
-rw-r--r--   3 root supergroup        944 2019-06-20 19:50 /passwd
drwx------   - root supergroup          0 2019-06-21 14:03 /tmp

[root@master mr_file_broadcast]# bash run.sh 
rmr: DEPRECATED: Please use 'rm -r' instead.
rmr: `/output_file_broadcast': No such file or directory
19/06/21 21:11:55 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
19/06/21 21:11:55 WARN streaming.StreamJob: -jobconf option is deprecated, please use -D instead.
19/06/21 21:11:55 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
packageJobJar: [./map.py, ./red.py, ./white_list, /tmp/hadoop-unjar5356561627923561485/] [] /tmp/streamjob514955250889635986.jar tmpDir=null
19/06/21 21:11:55 INFO client.RMProxy: Connecting to ResourceManager at master/
19/06/21 21:11:55 INFO client.RMProxy: Connecting to ResourceManager at master/
19/06/21 21:11:56 INFO mapred.FileInputFormat: Total input paths to process : 1
19/06/21 21:11:56 INFO mapreduce.JobSubmitter: number of splits:2
19/06/21 21:11:56 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1561122558196_0002
19/06/21 21:11:57 INFO impl.YarnClientImpl: Submitted application application_1561122558196_0002
19/06/21 21:11:57 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1561122558196_0002/
19/06/21 21:11:57 INFO mapreduce.Job: Running job: job_1561122558196_0002
19/06/21 21:12:03 INFO mapreduce.Job: Job job_1561122558196_0002 running in uber mode : false
19/06/21 21:12:03 INFO mapreduce.Job:  map 0% reduce 0%
19/06/21 21:12:09 INFO mapreduce.Job:  map 100% reduce 0%
19/06/21 21:12:17 INFO mapreduce.Job:  map 100% reduce 67%
19/06/21 21:12:18 INFO mapreduce.Job:  map 100% reduce 100%
19/06/21 21:12:18 INFO mapreduce.Job: Job job_1561122558196_0002 completed successfully
19/06/21 21:12:18 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=1182
        FILE: Number of bytes written=556758
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=636491
        HDFS: Number of bytes written=31
        HDFS: Number of read operations=15
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=6
    Job Counters 
        Launched map tasks=2
        Launched reduce tasks=3
        Data-local map tasks=2
        Total time spent by all maps in occupied slots (ms)=8635
        Total time spent by all reduces in occupied slots (ms)=16566
        Total time spent by all map tasks (ms)=8635
        Total time spent by all reduce tasks (ms)=16566
        Total vcore-milliseconds taken by all map tasks=8635
        Total vcore-milliseconds taken by all reduce tasks=16566
        Total megabyte-milliseconds taken by all map tasks=8842240
        Total megabyte-milliseconds taken by all reduce tasks=16963584
    Map-Reduce Framework
        Map input records=2866
        Map output records=97
        Map output bytes=970
        Map output materialized bytes=1200
        Input split bytes=188
        Combine input records=0
        Combine output records=0
        Reduce input groups=3
        Reduce shuffle bytes=1200
        Reduce input records=97
        Reduce output records=3
        Spilled Records=194
        Shuffled Maps =6
        Failed Shuffles=0
        Merged Map outputs=6
        GC time elapsed (ms)=1042
        CPU time spent (ms)=9350
        Physical memory (bytes) snapshot=1057755136
        Virtual memory (bytes) snapshot=10571837440
        Total committed heap usage (bytes)=654311424
    Shuffle Errors
    File Input Format Counters 
        Bytes Read=636303
    File Output Format Counters 
        Bytes Written=31
19/06/21 21:12:18 INFO streaming.StreamJob: Output directory: /output_file_broadcast
[root@master mr_file_broadcast]# hadoop fs -ls /
Found 6 items
-rw-r--r--   3 root supergroup    1896621 2019-06-21 13:54 /1.data
-rw-r--r--   3 root supergroup     632207 2019-06-21 21:11 /The_Man_of_Property.txt
drwxr-xr-x   - root supergroup          0 2019-06-21 14:03 /output
drwxr-xr-x   - root supergroup          0 2019-06-21 21:12 /output_file_broadcast
-rw-r--r--   3 root supergroup        944 2019-06-20 19:50 /passwd
drwx------   - root supergroup          0 2019-06-21 14:03 /tmp
[root@master mr_file_broadcast]# hadoop fs -ls /output_file_broadcast/
Found 4 items
-rw-r--r--   3 root supergroup          0 2019-06-21 21:12 /output_file_broadcast/_SUCCESS
-rw-r--r--   3 root supergroup         22 2019-06-21 21:12 /output_file_broadcast/part-00000
-rw-r--r--   3 root supergroup          9 2019-06-21 21:12 /output_file_broadcast/part-00001
-rw-r--r--   3 root supergroup          0 2019-06-21 21:12 /output_file_broadcast/part-00002
[root@master mr_file_broadcast]# hadoop fs -text /output_file_broadcast/part-00000
against 93
suitable    2
[root@master mr_file_broadcast]# hadoop fs -text /output_file_broadcast/part-00001
recent  2
[root@master mr_file_broadcast]# hadoop fs -text /output_file_broadcast/part-00002


[root@slave1 tmp]# pwd
[root@slave1 tmp]# tree .
└── nm-local-dir
    ├── filecache
    ├── nmPrivate
    │   └── application_1561122558196_0003
    │       ├── container_1561122558196_0003_01_000001
    │       │   ├── container_1561122558196_0003_01_000001.pid
    │       │   ├── container_1561122558196_0003_01_000001.tokens
    │       │   └── launch_container.sh
    │       ├── container_1561122558196_0003_01_000002
    │       │   ├── container_1561122558196_0003_01_000002.pid
    │       │   ├── container_1561122558196_0003_01_000002.tokens
    │       │   └── launch_container.sh
    │       └── container_1561122558196_0003_01_000003
    │           ├── container_1561122558196_0003_01_000003.pid
    │           ├── container_1561122558196_0003_01_000003.tokens
    │           └── launch_container.sh
    └── usercache
        └── root
            ├── appcache
            │   └── application_1561122558196_0003
            │       ├── container_1561122558196_0003_01_000001
            │       │   ├── container_tokens
            │       │   ├── default_container_executor_session.sh
            │       │   ├── default_container_executor.sh
            │       │   ├── job.jar -> /usr/local/hadoop-2.6.5/tmp/nm-local-dir/usercache/root/appcache/application_1561122558196_0003/filecache/11/job.jar
            │       │   ├── jobSubmitDir
            │       │   │   ├── job.split -> /usr/local/hadoop-2.6.5/tmp/nm-local-dir/usercache/root/appcache/application_1561122558196_0003/filecache/12/job.split
            │       │   │   └── job.splitmetainfo -> /usr/local/hadoop-2.6.5/tmp/nm-local-dir/usercache/root/appcache/application_1561122558196_0003/filecache/10/job.splitmetainfo
            │       │   ├── job.xml -> /usr/local/hadoop-2.6.5/tmp/nm-local-dir/usercache/root/appcache/application_1561122558196_0003/filecache/13/job.xml
            │       │   ├── launch_container.sh
            │       │   ├── map.py -> /usr/local/hadoop-2.6.5/tmp/nm-local-dir/usercache/root/filecache/13/map.py
            │       │   ├── red.py -> /usr/local/hadoop-2.6.5/tmp/nm-local-dir/usercache/root/filecache/15/red.py
            │       │   ├── tmp
            │       │   └── white_list -> /usr/local/hadoop-2.6.5/tmp/nm-local-dir/usercache/root/filecache/14/white_list
            │       ├── container_1561122558196_0003_01_000002
            │       │   ├── container_tokens
            │       │   ├── default_container_executor_session.sh
            │       │   ├── default_container_executor.sh
            │       │   ├── job.jar -> /usr/local/hadoop-2.6.5/tmp/nm-local-dir/usercache/root/appcache/application_1561122558196_0003/filecache/11/job.jar
            │       │   ├── job.xml
            │       │   ├── launch_container.sh
            │       │   ├── map.py -> /usr/local/hadoop-2.6.5/tmp/nm-local-dir/usercache/root/filecache/13/map.py
            │       │   ├── red.py -> /usr/local/hadoop-2.6.5/tmp/nm-local-dir/usercache/root/filecache/15/red.py
            │       │   ├── tmp
            │       │   └── white_list -> /usr/local/hadoop-2.6.5/tmp/nm-local-dir/usercache/root/filecache/14/white_list
            │       ├── container_1561122558196_0003_01_000003
            │       │   ├── container_tokens
            │       │   ├── default_container_executor_session.sh
            │       │   ├── default_container_executor.sh
            │       │   ├── job.jar -> /usr/local/hadoop-2.6.5/tmp/nm-local-dir/usercache/root/appcache/application_1561122558196_0003/filecache/11/job.jar
            │       │   ├── job.xml
            │       │   ├── launch_container.sh
            │       │   ├── map.py -> /usr/local/hadoop-2.6.5/tmp/nm-local-dir/usercache/root/filecache/13/map.py
            │       │   ├── red.py -> /usr/local/hadoop-2.6.5/tmp/nm-local-dir/usercache/root/filecache/15/red.py
            │       │   ├── tmp
            │       │   └── white_list -> /usr/local/hadoop-2.6.5/tmp/nm-local-dir/usercache/root/filecache/14/white_list
            │       ├── filecache
            │       │   ├── 10
            │       │   │   └── job.splitmetainfo
            │       │   ├── 11
            │       │   │   └── job.jar
            │       │   │       └── job.jar
            │       │   ├── 12
            │       │   │   └── job.split
            │       │   └── 13
            │       │       └── job.xml
            │       └── work
            └── filecache
                ├── 10
                │   └── map.py
                ├── 11
                │   └── white_list
                ├── 12
                │   └── red.py
                ├── 13
                │   └── map.py
                ├── 14
                │   └── white_list
                └── 15
                    └── red.py

35 directories, 45 files

2.3 cachefile

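  Suppose we want a word count restricted to the words in a white list, and the white-list file (for example a dictionary file) is stored in HDFS but should be treated as a local file on every compute node during the computation. In that case, use the -cacheFile hdfs://host:port/path/to/file#linkname option to cache the file on the compute nodes; the Streaming program then accesses it through ./linkname.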
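The `#linkname` part of the -cacheFile argument is a URI fragment: everything before `#` identifies the HDFS file, and the fragment is the name under which the file appears in the task's working directory. This is a minimal sketch using Python's standard `urllib.parse.urlsplit` to show how the argument used in this section, `hdfs://master:9000/white_list#ABC`, decomposes; the helper name `cache_link` is hypothetical, and it only parses the string without contacting HDFS.

```python
from urllib.parse import urlsplit


def cache_link(uri):
    """Split a -cacheFile argument into (HDFS location, local link path)."""
    parts = urlsplit(uri)
    if not parts.fragment:
        raise ValueError("missing '#linkname' in %r" % uri)
    location = "%s://%s%s" % (parts.scheme, parts.netloc, parts.path)
    return location, "./" + parts.fragment


if __name__ == "__main__":
    print(cache_link("hdfs://master:9000/white_list#ABC"))
    # ('hdfs://master:9000/white_list', './ABC')
```

This is why the mapper below is started as `python map.py mapper_func ABC`: it opens `ABC` in its working directory, which is the link to the cached copy of /white_list.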

[root@master mr_cachefile_broadcast]# pwd
[root@master mr_cachefile_broadcast]# hadoop fs -put white_list /
[root@master mr_cachefile_broadcast]# hadoop fs -ls /
Found 7 items
-rw-r--r--   3 root supergroup    1896621 2019-06-21 13:54 /1.data
-rw-r--r--   3 root supergroup     632207 2019-06-21 21:11 /The_Man_of_Property.txt
drwxr-xr-x   - root supergroup          0 2019-06-21 14:03 /output
drwxr-xr-x   - root supergroup          0 2019-06-21 21:38 /output_file_broadcast
-rw-r--r--   3 root supergroup        944 2019-06-20 19:50 /passwd
drwx------   - root supergroup          0 2019-06-21 14:03 /tmp
-rw-r--r--   3 root supergroup         24 2019-06-21 21:57 /white_list

map.py:

import sys


def read_local_file_func(f):
    # Load the white list (one word per line) into a set
    word_set = set()
    with open(f, 'r') as file_in:
        for line in file_in:
            word = line.strip()
            if word:
                word_set.add(word)
    return word_set


def mapper_func(white_list_fd):
    word_set = read_local_file_func(white_list_fd)

    for line in sys.stdin:
        ss = line.strip().split(' ')
        for s in ss:
            word = s.strip()
            # Emit only words that appear in the white list
            if word != "" and (word in word_set):
                print("%s\t%s" % (word, 1))


if __name__ == "__main__":
    # Usage: python map.py <function name> [args...], e.g. mapper_func ABC
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])
    args = sys.argv[2:]
    func(*args)

red.py:

import sys


def reduer_func():
    # Reducer input is sorted by key, so lines for the same word are adjacent
    current_word = None
    count_pool = []

    for line in sys.stdin:
        word, val = line.strip().split('\t')

        if current_word is None:
            current_word = word

        if current_word != word:
            # Key changed: emit the total for the previous word
            print("%s\t%s" % (current_word, sum(count_pool)))
            current_word = word
            count_pool = []

        count_pool.append(int(val))

    # Emit the last word (nothing is printed if the input was empty)
    if current_word is not None:
        print("%s\t%s" % (current_word, sum(count_pool)))


if __name__ == "__main__":
    # Usage: python red.py <function name> [args...], e.g. reduer_func
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])
    args = sys.argv[2:]
    func(*args)


$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

# Step 1.
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_1 \
    -output $OUTPUT_PATH \
    -mapper "python map.py mapper_func ABC" \
    -reducer "python red.py reduer_func" \
    -jobconf "mapred.reduce.tasks=2" \
    -jobconf  "mapred.job.name=cachefile_demo" \
    -cacheFile "hdfs://master:9000/white_list#ABC" \
    -file "./map.py" \
    -file "./red.py"

    #-cacheFile "$HDFS_FILE_PATH#WH" \


[root@master mr_cachefile_broadcast]# bash run.sh 
rmr: DEPRECATED: Please use 'rm -r' instead.
rmr: `/output_cachefile_broadcast': No such file or directory
19/06/21 22:08:16 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
19/06/21 22:08:16 WARN streaming.StreamJob: -cacheFile option is deprecated, please use -files instead.
19/06/21 22:08:16 WARN streaming.StreamJob: -jobconf option is deprecated, please use -D instead.
19/06/21 22:08:16 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
19/06/21 22:08:16 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
packageJobJar: [./map.py, ./red.py, /tmp/hadoop-unjar8224574872481683209/] [] /tmp/streamjob7921590089314550150.jar tmpDir=null
19/06/21 22:08:17 INFO client.RMProxy: Connecting to ResourceManager at master/
19/06/21 22:08:17 INFO client.RMProxy: Connecting to ResourceManager at master/
19/06/21 22:08:17 INFO mapred.FileInputFormat: Total input paths to process : 1
19/06/21 22:08:17 INFO mapreduce.JobSubmitter: number of splits:2
19/06/21 22:08:18 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1561122558196_0005
19/06/21 22:08:18 INFO impl.YarnClientImpl: Submitted application application_1561122558196_0005
19/06/21 22:08:18 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1561122558196_0005/
19/06/21 22:08:18 INFO mapreduce.Job: Running job: job_1561122558196_0005
19/06/21 22:20:45 INFO mapreduce.Job: Job job_1561122558196_0005 running in uber mode : false
19/06/21 22:20:45 INFO mapreduce.Job:  map 0% reduce 0%
19/06/21 22:20:49 INFO mapreduce.Job:  map 100% reduce 0%
19/06/21 22:20:55 INFO mapreduce.Job:  map 100% reduce 100%
19/06/21 22:20:55 INFO mapreduce.Job: Job job_1561122558196_0005 completed successfully
19/06/21 22:20:56 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=1176
        FILE: Number of bytes written=445494
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=636491
        HDFS: Number of bytes written=31
        HDFS: Number of read operations=12
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=4
    Job Counters 
        Launched map tasks=2
        Launched reduce tasks=2
        Data-local map tasks=2
        Total time spent by all maps in occupied slots (ms)=5072
        Total time spent by all reduces in occupied slots (ms)=6478
        Total time spent by all map tasks (ms)=5072
        Total time spent by all reduce tasks (ms)=6478
        Total vcore-milliseconds taken by all map tasks=5072
        Total vcore-milliseconds taken by all reduce tasks=6478
        Total megabyte-milliseconds taken by all map tasks=5193728
        Total megabyte-milliseconds taken by all reduce tasks=6633472
    Map-Reduce Framework
        Map input records=2866
        Map output records=97
        Map output bytes=970
        Map output materialized bytes=1188
        Input split bytes=188
        Combine input records=0
        Combine output records=0
        Reduce input groups=3
        Reduce shuffle bytes=1188
        Reduce input records=97
        Reduce output records=3
        Spilled Records=194
        Shuffled Maps =4
        Failed Shuffles=0
        Merged Map outputs=4
        GC time elapsed (ms)=268
        CPU time spent (ms)=3520
        Physical memory (bytes) snapshot=880959488
        Virtual memory (bytes) snapshot=8461725696
        Total committed heap usage (bytes)=558891008
    Shuffle Errors
    File Input Format Counters 
        Bytes Read=636303
    File Output Format Counters 
        Bytes Written=31
19/06/21 22:20:56 INFO streaming.StreamJob: Output directory: /output_cachefile_broadcast

2.4 cacheArchive

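  If the files to distribute have a directory structure, pack the whole directory into an archive first, upload it to HDFS, and then distribute it with -cacheArchive hdfs://host:port/path/to/archivefile#linkname.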
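The link name ends up pointing at the unpacked contents of the archive. For that reason the whitelist files should sit at the archive's top level, which is what `tar zcf white_list.tar.gz *` run inside tmp produces.

The sketch below is a local check under that assumption, using only Python's standard `tarfile` module. It packs a directory with no wrapping folder, lists the members, and reads the words back. The helper names and sample words are hypothetical.

```python
import os
import tarfile
import tempfile


def build_flat_archive(src_dir, archive_path):
    """Pack every file in src_dir at the archive root (like `cd src_dir && tar zcf archive *`)."""
    with tarfile.open(archive_path, "w:gz") as tar:
        for name in sorted(os.listdir(src_dir)):
            # arcname=name drops the src_dir prefix, so members sit at the archive root
            tar.add(os.path.join(src_dir, name), arcname=name)


def archive_members(archive_path):
    """Return the sorted member names of a .tar.gz archive."""
    with tarfile.open(archive_path, "r:gz") as tar:
        return sorted(tar.getnames())


def read_whitelist_from_archive(archive_path):
    """Collect the non-empty lines of every regular file in the archive."""
    words = set()
    with tarfile.open(archive_path, "r:gz") as tar:
        for member in tar.getmembers():
            if member.isfile():
                data = tar.extractfile(member).read().decode("utf-8")
                words.update(w.strip() for w in data.splitlines() if w.strip())
    return words


if __name__ == "__main__":
    work = tempfile.mkdtemp()
    src = os.path.join(work, "tmp")
    os.mkdir(src)
    # Hypothetical whitelist contents (the real files' contents are not shown above)
    samples = {"white_list_1": ["hadoop", "java"], "white_list_2": ["spark"]}
    for name, words in samples.items():
        with open(os.path.join(src, name), "w") as f:
            f.write("\n".join(words) + "\n")
    archive = os.path.join(work, "white_list.tar.gz")
    build_flat_archive(src, archive)
    print(archive_members(archive))                      # ['white_list_1', 'white_list_2']
    print(sorted(read_whitelist_from_archive(archive)))  # ['hadoop', 'java', 'spark']
```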

[root@master tp2]# tree .
├── map.py
├── red.py
├── run.sh
└── tmp
    ├── white_list_1
    └── white_list_2

1 directory, 5 files

[root@master tmp]# cat white_list_1
[root@master tmp]# cat white_list_2
[root@master tmp]# tar zcf white_list.tar.gz *
[root@master tmp]# ls
white_list_1  white_list_2  white_list.tar.gz
[root@master tmp]# hadoop fs -put white_list.tar.gz /
[root@master tmp]# hadoop fs -ls /white_list.tar.gz 
-rw-r--r--   3 root supergroup        159 2019-06-29 21:00 /white_list.tar.gz
[root@master tmp]# cd ..
[root@master tp2]# ls
map.py  red.py  run.sh  tmp

import os
import sys

def get_file_handler(f):
    return open(f, 'r')

def get_cachefile_handlers(f):
    # f is the #alias of the unpacked archive, i.e. a local directory
    f_handlers_list = []
    if os.path.isdir(f):
        for fd in os.listdir(f):
            f_handlers_list.append(get_file_handler(os.path.join(f, fd)))
    return f_handlers_list

def read_local_file_func(f):
    # Load every non-empty line of every file in the archive directory
    word_set = set()
    for cachefile in get_cachefile_handlers(f):
        for line in cachefile:
            word = line.strip()
            if word:
                word_set.add(word)
        cachefile.close()
    return word_set

def mapper_func(white_list_fd):
    word_set = read_local_file_func(white_list_fd)

    for line in sys.stdin:
        ss = line.strip().split(' ')
        for s in ss:
            word = s.strip()
            # Emit only whitelisted words
            if word != "" and (word in word_set):
                print("%s\t%s" % (word, 1))

if __name__ == "__main__":
    # Usage: python map.py <function name> [args...]
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])
    args = sys.argv[2:]
    func(*args)

import sys

def reduer_func():
    # Input arrives sorted by word, so identical words are adjacent
    current_word = None
    count_pool = []

    for line in sys.stdin:
        word, val = line.strip().split('\t')

        if current_word is None:
            current_word = word

        if current_word != word:
            print("%s\t%s" % (current_word, sum(count_pool)))
            current_word = word
            count_pool = []

        count_pool.append(int(val))

    # Flush the last word (skip it if the reducer received no input)
    if current_word is not None:
        print("%s\t%s" % (current_word, sum(count_pool)))

if __name__ == "__main__":
    # Usage: python red.py <function name> [args...]
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])
    args = sys.argv[2:]
    func(*args)
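The single-machine debugging trick `cat input | mapper | sort | reducer` can also be reproduced in-process. The sketch below is a minimal Python 3 model that assumes one reducer:

- `mapper` mirrors `mapper_func`.
- `sorted()` stands in for the shuffle/sort phase.
- `reducer` uses `itertools.groupby` instead of the manual `current_word` bookkeeping in `reduer_func`.

The function names and sample input are hypothetical.

```python
from itertools import groupby


def mapper(lines, whitelist):
    """Emit 'word<TAB>1' for every whitelisted word, like mapper_func."""
    for line in lines:
        for word in line.strip().split(" "):
            word = word.strip()
            if word and word in whitelist:
                yield "%s\t%s" % (word, 1)


def reducer(sorted_lines):
    """Sum the counts of adjacent identical words, like reduer_func."""
    pairs = (line.split("\t") for line in sorted_lines)
    for word, group in groupby(pairs, key=lambda kv: kv[0]):
        yield "%s\t%d" % (word, sum(int(count) for _, count in group))


def run_local(lines, whitelist):
    # sorted() plays the role of the shuffle/sort phase with a single reducer
    return list(reducer(sorted(mapper(lines, whitelist))))


if __name__ == "__main__":
    print(run_local(["hadoop java hadoop", "spark hadoop"], {"hadoop", "spark"}))
    # ['hadoop\t3', 'spark\t1']
```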


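# HADOOP_CMD, INPUT_FILE_PATH_1 and OUTPUT_PATH are defined above (values not shown).
# STREAM_JAR_PATH is an assumed name for a variable holding the path of hadoop-streaming.jar.
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

# Step 1.
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_1 \
    -output $OUTPUT_PATH \
    -mapper "python map.py mapper_func WH.gz" \
    -reducer "python red.py reduer_func" \
    -jobconf "mapred.reduce.tasks=2" \
    -jobconf "mapred.job.name=cache_file_archive_demo" \
    -cacheArchive "hdfs://master:9000/w.tar.gz#WH.gz" \
    -file "./map.py" \
    -file "./red.py"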


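-cacheArchive "hdfs://master:9000/w.tar.gz#WH.gz"
-mapper "python map.py mapper_func WH.gz"
1. WH.gz is the name after the #, an alias (link name) for the archive in front of it. In principle you can choose any name, but a descriptive one is best.
2. Hadoop unpacks hdfs://master:9000/w.tar.gz on each compute node and creates a link named WH.gz in the task's working directory that points to the unpacked contents. Passing WH.gz to the mapper therefore gives it a local directory containing the archive's files.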
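The part after `#` is simply the URI fragment. The sketch below illustrates the split with Python's `urllib.parse`; it is not Hadoop's own code, and `split_archive_uri` is a hypothetical helper. Falling back to the archive's file name when no alias is given is an assumption of this sketch.

```python
from urllib.parse import urlparse


def split_archive_uri(uri):
    """Split 'hdfs://host:port/path/archive#linkname' into (archive URI, link name)."""
    parsed = urlparse(uri)
    archive = parsed._replace(fragment="").geturl()
    # Assumption for this sketch: without '#alias', fall back to the archive's file name
    link = parsed.fragment or parsed.path.rsplit("/", 1)[-1]
    return archive, link


if __name__ == "__main__":
    print(split_archive_uri("hdfs://master:9000/w.tar.gz#WH.gz"))
    # ('hdfs://master:9000/w.tar.gz', 'WH.gz')
```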
Run on Hadoop:
[root@master tp2]# sh run.sh 
rmr: DEPRECATED: Please use 'rm -r' instead.
Deleted /mp_output_cachearchive_broadcast
19/06/29 22:29:21 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
19/06/29 22:29:22 WARN streaming.StreamJob: -cacheArchive option is deprecated, please use -archives instead.
19/06/29 22:29:22 WARN streaming.StreamJob: -jobconf option is deprecated, please use -D instead.
19/06/29 22:29:22 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
19/06/29 22:29:22 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
packageJobJar: [./map.py, ./red.py, /tmp/hadoop-unjar4879077363844241273/] [] /tmp/streamjob4932232602831946314.jar tmpDir=null
19/06/29 22:29:22 INFO client.RMProxy: Connecting to ResourceManager at master/
19/06/29 22:29:22 INFO client.RMProxy: Connecting to ResourceManager at master/
19/06/29 22:29:23 INFO mapred.FileInputFormat: Total input paths to process : 1
19/06/29 22:29:23 INFO mapreduce.JobSubmitter: number of splits:2
19/06/29 22:29:23 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1561818301923_0003
19/06/29 22:29:23 INFO impl.YarnClientImpl: Submitted application application_1561818301923_0003
19/06/29 22:29:23 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1561818301923_0003/
19/06/29 22:29:23 INFO mapreduce.Job: Running job: job_1561818301923_0003
19/06/29 22:29:28 INFO mapreduce.Job: Job job_1561818301923_0003 running in uber mode : false
19/06/29 22:29:28 INFO mapreduce.Job:  map 0% reduce 0%
19/06/29 22:29:33 INFO mapreduce.Job:  map 100% reduce 0%
19/06/29 22:29:40 INFO mapreduce.Job:  map 100% reduce 50%
19/06/29 22:29:41 INFO mapreduce.Job:  map 100% reduce 100%
19/06/29 22:29:41 INFO mapreduce.Job: Job job_1561818301923_0003 completed successfully
19/06/29 22:29:41 INFO mapreduce.Job: Counters: 50
    File System Counters
        FILE: Number of bytes read=181
        FILE: Number of bytes written=446268
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=460
        HDFS: Number of bytes written=26
        HDFS: Number of read operations=12
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=4
    Job Counters 
        Killed map tasks=1
        Launched map tasks=2
        Launched reduce tasks=2
        Data-local map tasks=2
        Total time spent by all maps in occupied slots (ms)=5701
        Total time spent by all reduces in occupied slots (ms)=8328
        Total time spent by all map tasks (ms)=5701
        Total time spent by all reduce tasks (ms)=8328
        Total vcore-milliseconds taken by all map tasks=5701
        Total vcore-milliseconds taken by all reduce tasks=8328
        Total megabyte-milliseconds taken by all map tasks=5837824
        Total megabyte-milliseconds taken by all reduce tasks=8527872
    Map-Reduce Framework
        Map input records=4
        Map output records=20
        Map output bytes=129
        Map output materialized bytes=193
        Input split bytes=182
        Combine input records=0
        Combine output records=0
        Reduce input groups=4
        Reduce shuffle bytes=193
        Reduce input records=20
        Reduce output records=4
        Spilled Records=40
        Shuffled Maps =4
        Failed Shuffles=0
        Merged Map outputs=4
        GC time elapsed (ms)=251
        CPU time spent (ms)=6840
        Physical memory (bytes) snapshot=832233472
        Virtual memory (bytes) snapshot=8458309632
        Total committed heap usage (bytes)=553123840
    Shuffle Errors
    File Input Format Counters 
        Bytes Read=278
    File Output Format Counters 
        Bytes Written=26
19/06/29 22:29:41 INFO streaming.StreamJob: Output directory: /mp_output_cachearchive_broadcast
[root@master tp2]# bash run.sh 
rmr: DEPRECATED: Please use 'rm -r' instead.
Deleted /mp_output_cachearchive_broadcast
19/06/29 22:38:16 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
19/06/29 22:38:16 WARN streaming.StreamJob: -cacheArchive option is deprecated, please use -archives instead.
19/06/29 22:38:16 WARN streaming.StreamJob: -jobconf option is deprecated, please use -D instead.
19/06/29 22:38:16 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
19/06/29 22:38:16 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
packageJobJar: [./map.py, ./red.py, /tmp/hadoop-unjar5517120324769277489/] [] /tmp/streamjob7237556585252410281.jar tmpDir=null
19/06/29 22:38:16 INFO client.RMProxy: Connecting to ResourceManager at master/
19/06/29 22:38:17 INFO client.RMProxy: Connecting to ResourceManager at master/
19/06/29 22:38:17 INFO mapred.FileInputFormat: Total input paths to process : 1
19/06/29 22:38:17 INFO mapreduce.JobSubmitter: number of splits:2
19/06/29 22:38:18 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1561818301923_0004
19/06/29 22:38:18 INFO impl.YarnClientImpl: Submitted application application_1561818301923_0004
19/06/29 22:38:18 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1561818301923_0004/
19/06/29 22:38:18 INFO mapreduce.Job: Running job: job_1561818301923_0004
19/06/29 22:38:23 INFO mapreduce.Job: Job job_1561818301923_0004 running in uber mode : false
19/06/29 22:38:23 INFO mapreduce.Job:  map 0% reduce 0%
19/06/29 22:38:33 INFO mapreduce.Job:  map 67% reduce 0%
^C[root@master tp2]# hadoop job -kill job_1561818301923_0004
DEPRECATED: Use of this script to execute mapred command is deprecated.
Instead use the mapred command for it.

19/06/29 22:40:32 INFO client.RMProxy: Connecting to ResourceManager at master/
Killed job job_1561818301923_0004

2.5 compression

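  • When the data volume is large, Hadoop's built-in compression support can compress the data to reduce network bandwidth and storage consumption.
  • The map output, i.e. the intermediate results, can be compressed.
  • The reduce output, i.e. the final output, can be compressed.
  • You can choose whether to compress and which codec to use.
  • Compressing the map output mainly reduces the amount of data sent over the network during the shuffle.
  • Compressing the reduce output mainly reduces the HDFS storage used by the results.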
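To get a feel for why compressing intermediate data pays off for word-count-style jobs, the sketch below gzips a synthetic map output (many repeated `word<TAB>1` lines) with Python's standard `gzip` module and reports the size ratio. The data is made up; on real data the ratio depends on the input.

In the run below, the counters show Map output bytes=852872 against Map output materialized bytes=142941. That gap is consistent with the map output being compressed before the shuffle.

```python
import gzip


def gzip_ratio(data):
    """Return compressed size divided by original size for a bytes object."""
    compressed = gzip.compress(data)
    # gzip is lossless: decompressing must give back the original bytes
    if gzip.decompress(compressed) != data:
        raise ValueError("gzip round trip failed")
    return len(compressed) / len(data)


def fake_map_output(words, repeat):
    """Hypothetical word-count map output: one 'word<TAB>1' line per occurrence."""
    return "".join("%s\t1\n" % w for w in words * repeat).encode("utf-8")


if __name__ == "__main__":
    sample = fake_map_output(["hadoop", "java", "spark"], 10000)
    print("original bytes:", len(sample))
    print("compressed/original: %.4f" % gzip_ratio(sample))
```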
[root@master mr_compression]# ls
map.py  red.py  run_2.sh  run.sh

import os
import sys

def get_file_handler(f):
    return open(f, 'r')

def get_cachefile_handlers(f):
    # f is the #alias of the unpacked archive, i.e. a local directory
    f_handlers_list = []
    if os.path.isdir(f):
        for fd in os.listdir(f):
            f_handlers_list.append(get_file_handler(os.path.join(f, fd)))
    return f_handlers_list

def read_local_file_func(f):
    # Load every non-empty line of every file in the archive directory
    word_set = set()
    for cachefile in get_cachefile_handlers(f):
        for line in cachefile:
            word = line.strip()
            if word:
                word_set.add(word)
        cachefile.close()
    return word_set

def mapper_func(white_list_fd):
    word_set = read_local_file_func(white_list_fd)

    for line in sys.stdin:
        ss = line.strip().split(' ')
        for s in ss:
            word = s.strip()
            # Whitelist filter disabled in this demo: every word is counted
            #if word != "" and (word in word_set):
            if word != "":
                print("%s\t%s" % (word, 1))

if __name__ == "__main__":
    # Usage: python map.py <function name> [args...]
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])
    args = sys.argv[2:]
    func(*args)

import sys

def reduer_func():
    # Input arrives sorted by word, so identical words are adjacent
    current_word = None
    count_pool = []

    for line in sys.stdin:
        word, val = line.strip().split('\t')

        if current_word is None:
            current_word = word

        if current_word != word:
            print("%s\t%s" % (current_word, sum(count_pool)))
            current_word = word
            count_pool = []

        count_pool.append(int(val))

    # Flush the last word (skip it if the reducer received no input)
    if current_word is not None:
        print("%s\t%s" % (current_word, sum(count_pool)))

if __name__ == "__main__":
    # Usage: python red.py <function name> [args...]
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])
    args = sys.argv[2:]
    func(*args)

# HADOOP_CMD, INPUT_FILE_PATH_1 and OUTPUT_PATH are defined above (values not shown).
# STREAM_JAR_PATH is an assumed name for a variable holding the path of hadoop-streaming.jar.
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

# Step 1.
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_1 \
    -output $OUTPUT_PATH \
    -mapper "python map.py mapper_func WH.gz" \
    -reducer "python red.py reduer_func" \
    -jobconf "mapred.reduce.tasks=10" \
    -jobconf "mapred.job.name=cachefile_demo" \
    -jobconf "mapred.compress.map.output=true" \
    -jobconf "mapred.map.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec" \
    -jobconf "mapred.output.compress=true" \
    -jobconf "mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec" \
    -cacheArchive "hdfs://master:9000/w.tar.gz#WH.gz" \
    -file "./map.py" \
    -file "./red.py"

What the options in run.sh do:
  • -jobconf "mapred.reduce.tasks=10": run 10 reducers, so the final output consists of 10 compressed files
  • -jobconf "mapred.job.name=cachefile_demo": the job name shown in the YARN web UI
  • -jobconf "mapred.compress.map.output=true" together with mapred.map.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec: compress the map output with gzip
  • -jobconf "mapred.output.compress=true" together with mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec: compress the final output with gzip
  • -cacheArchive "hdfs://master:9000/w.tar.gz#WH.gz": distribute an archive that already exists on HDFS to every task
  • -file "./map.py" and -file "./red.py": ship the local map and reduce programs to the compute nodes
Run on Hadoop:
[root@master mr_compression]# sh run
sh: run: No such file or directory
[root@master mr_compression]# sh run.sh 
rmr: DEPRECATED: Please use 'rm -r' instead.
rmr: `/output_cachearchive_broadcast': No such file or directory
19/06/29 23:22:04 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
19/06/29 23:22:05 WARN streaming.StreamJob: -cacheArchive option is deprecated, please use -archives instead.
19/06/29 23:22:05 WARN streaming.StreamJob: -jobconf option is deprecated, please use -D instead.
19/06/29 23:22:05 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
19/06/29 23:22:05 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
19/06/29 23:22:05 INFO Configuration.deprecation: mapred.compress.map.output is deprecated. Instead, use mapreduce.map.output.compress
19/06/29 23:22:05 INFO Configuration.deprecation: mapred.map.output.compression.codec is deprecated. Instead, use mapreduce.map.output.compress.codec
19/06/29 23:22:05 INFO Configuration.deprecation: mapred.output.compress is deprecated. Instead, use mapreduce.output.fileoutputformat.compress
19/06/29 23:22:05 INFO Configuration.deprecation: mapred.output.compression.codec is deprecated. Instead, use mapreduce.output.fileoutputformat.compress.codec
packageJobJar: [./map.py, ./red.py, /tmp/hadoop-unjar8973310826152543911/] [] /tmp/streamjob4114282878628426821.jar tmpDir=null
19/06/29 23:22:05 INFO client.RMProxy: Connecting to ResourceManager at master/
19/06/29 23:22:05 INFO client.RMProxy: Connecting to ResourceManager at master/
19/06/29 23:22:06 INFO mapred.FileInputFormat: Total input paths to process : 1
19/06/29 23:22:06 INFO mapreduce.JobSubmitter: number of splits:2
19/06/29 23:22:06 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1561818301923_0005
19/06/29 23:22:07 INFO impl.YarnClientImpl: Submitted application application_1561818301923_0005
19/06/29 23:22:07 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1561818301923_0005/
19/06/29 23:22:07 INFO mapreduce.Job: Running job: job_1561818301923_0005
19/06/29 23:22:12 INFO mapreduce.Job: Job job_1561818301923_0005 running in uber mode : false
19/06/29 23:22:12 INFO mapreduce.Job:  map 0% reduce 0%
19/06/29 23:22:21 INFO mapreduce.Job:  map 100% reduce 0%
19/06/29 23:22:37 INFO mapreduce.Job:  map 100% reduce 10%
19/06/29 23:22:41 INFO mapreduce.Job:  map 100% reduce 17%
19/06/29 23:22:42 INFO mapreduce.Job:  map 100% reduce 27%
19/06/29 23:22:44 INFO mapreduce.Job:  map 100% reduce 47%
19/06/29 23:22:46 INFO mapreduce.Job:  map 100% reduce 70%
19/06/29 23:22:49 INFO mapreduce.Job:  map 100% reduce 90%
19/06/29 23:22:50 INFO mapreduce.Job:  map 100% reduce 100%
19/06/29 23:22:50 INFO mapreduce.Job: Job job_1561818301923_0005 completed successfully
19/06/29 23:22:50 INFO mapreduce.Job: Counters: 50
    File System Counters
        FILE: Number of bytes read=114091
        FILE: Number of bytes written=1594014
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=636491
        HDFS: Number of bytes written=80938
        HDFS: Number of read operations=36
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=20
    Job Counters 
        Killed reduce tasks=1
        Launched map tasks=2
        Launched reduce tasks=11
        Data-local map tasks=2
        Total time spent by all maps in occupied slots (ms)=14495
        Total time spent by all reduces in occupied slots (ms)=200288
        Total time spent by all map tasks (ms)=14495
        Total time spent by all reduce tasks (ms)=200288
        Total vcore-milliseconds taken by all map tasks=14495
        Total vcore-milliseconds taken by all reduce tasks=200288
        Total megabyte-milliseconds taken by all map tasks=14842880
        Total megabyte-milliseconds taken by all reduce tasks=205094912
    Map-Reduce Framework
        Map input records=2866
        Map output records=111783
        Map output bytes=852872
        Map output materialized bytes=142941
        Input split bytes=188
        Combine input records=0
        Combine output records=0
        Reduce input groups=16984
        Reduce shuffle bytes=142941
        Reduce input records=111783
        Reduce output records=16984
        Spilled Records=223566
        Shuffled Maps =20
        Failed Shuffles=0
        Merged Map outputs=20
        GC time elapsed (ms)=5246
        CPU time spent (ms)=50310
        Physical memory (bytes) snapshot=2004160512
        Virtual memory (bytes) snapshot=25431437312
        Total committed heap usage (bytes)=1276641280
    Shuffle Errors
    File Input Format Counters 
        Bytes Read=636303
    File Output Format Counters 
        Bytes Written=80938
19/06/29 23:22:50 INFO streaming.StreamJob: Output directory: /output_cachearchive_broadcast

[root@master mr_compression]# hadoop fs -ls /output_cachearchive_broadcast
Found 11 items
-rw-r--r--   3 root supergroup          0 2019-06-29 23:29 /output_cachearchive_broadcast/_SUCCESS
-rw-r--r--   3 root supergroup       8091 2019-06-29 23:29 /output_cachearchive_broadcast/part-00000.gz
-rw-r--r--   3 root supergroup       7949 2019-06-29 23:29 /output_cachearchive_broadcast/part-00001.gz
-rw-r--r--   3 root supergroup       8067 2019-06-29 23:29 /output_cachearchive_broadcast/part-00002.gz
-rw-r--r--   3 root supergroup       8244 2019-06-29 23:29 /output_cachearchive_broadcast/part-00003.gz
-rw-r--r--   3 root supergroup       7913 2019-06-29 23:29 /output_cachearchive_broadcast/part-00004.gz
-rw-r--r--   3 root supergroup       8147 2019-06-29 23:29 /output_cachearchive_broadcast/part-00005.gz
-rw-r--r--   3 root supergroup       7912 2019-06-29 23:29 /output_cachearchive_broadcast/part-00006.gz
-rw-r--r--   3 root supergroup       8257 2019-06-29 23:29 /output_cachearchive_broadcast/part-00007.gz
-rw-r--r--   3 root supergroup       8349 2019-06-29 23:29 /output_cachearchive_broadcast/part-00008.gz
-rw-r--r--   3 root supergroup       8009 2019-06-29 23:29 /output_cachearchive_broadcast/part-00009.gz
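`hadoop fs -text /output_cachearchive_broadcast/part-00000.gz` decompresses a gzip part file on the fly. If the files are copied locally first (for example with `hadoop fs -get`), a sketch like the following reads all parts in order. The local directory name `./output` and the helper `read_gz_parts` are assumptions.

```python
import glob
import gzip
import os


def read_gz_parts(out_dir):
    """Yield (word, count) pairs from every part-*.gz file in out_dir, in part-number order."""
    # Zero-padded names (part-00000, part-00001, ...) sort correctly as strings;
    # non-matching files such as _SUCCESS are skipped by the glob pattern
    for path in sorted(glob.glob(os.path.join(out_dir, "part-*.gz"))):
        with gzip.open(path, "rt", encoding="utf-8") as f:
            for line in f:
                line = line.rstrip("\n")
                if not line:
                    continue
                word, count = line.split("\t")
                yield word, int(count)


if __name__ == "__main__":
    # Assumes the job output was copied locally first, e.g.
    #   hadoop fs -get /output_cachearchive_broadcast ./output
    for word, count in read_gz_parts("./output"):
        print("%s\t%d" % (word, count))
```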

2.6 Global sort with a single reducer


[root@master mr_allsort_1reduce_python]# ls
a.txt  b.txt  map_sort.py  red_sort.py  run.sh
[root@master mr_allsort_1reduce_python]# cat map_sort.py 

import sys

#base_count = 10000
base_count = 99999

for line in sys.stdin:
    ss = line.strip().split('\t')
    key = ss[0]
    val = ss[1]

    new_key = base_count - int(key)
    #new_key = base_count + int(key)
    print "%s\t%s" % (new_key, val)

[root@master mr_allsort_1reduce_python]# cat red_sort.py 

import sys

#base_value = 10000
base_value = 99999

for line in sys.stdin:
    key, val = line.strip().split('\t')
    #print str(int(key) - base_value) + "\t" + val
    print str(base_value - int(key)) + "\t" + val

[root@master mr_allsort_1reduce_python]# cat run.sh 
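#set -e -x

# HADOOP_CMD and OUTPUT_SORT_PATH are defined here (values not shown).
# STREAM_JAR_PATH, INPUT_FILE_PATH_A and INPUT_FILE_PATH_B are assumed variable names;
# the log below shows the two inputs are hdfs://master:9000/a.txt and hdfs://master:9000/b.txt.
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_SORT_PATH

# Step 3.
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_A \
    -input $INPUT_FILE_PATH_B \
    -output $OUTPUT_SORT_PATH \
    -mapper "python map_sort.py" \
    -reducer "python red_sort.py" \
    -jobconf "mapred.reduce.tasks=1" \
    -file ./map_sort.py \
    -file ./red_sort.py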


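Descending order (as in the scripts above): map_sort.py emits base_count - int(key) with base_count = 99999, and red_sort.py restores the key with print str(base_value - int(key)) + "\t" + val, where base_value = 99999.
Ascending order: use base_count = 10000 and new_key = base_count + int(key) in map_sort.py, and print str(int(key) - base_value) + "\t" + val with base_value = 10000 in red_sort.py.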
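The base offset matters because Streaming compares keys as text, so plain numbers sort as `1, 10, 100, 2, ...`. Adding or subtracting a fixed base gives every key the same number of digits, and text order then matches numeric order.

The sketch below models this in plain Python. It assumes integer keys between 0 and 89999, so that every encoded key keeps five digits. `sorted()` stands in for the framework's sort, and the function names are hypothetical.

```python
DESC_BASE = 99999  # base_count / base_value in the descending scripts above
ASC_BASE = 10000   # base for the ascending variant


def encode(key, descending):
    """Map side: turn a non-negative int key into a 5-digit string."""
    if not 0 <= key <= 89999:
        raise ValueError("key outside the range where the encoding keeps 5 digits")
    return str(DESC_BASE - key) if descending else str(ASC_BASE + key)


def decode(text, descending):
    """Reduce side: recover the original int key."""
    return DESC_BASE - int(text) if descending else int(text) - ASC_BASE


def sort_numeric_keys(keys, descending):
    # sorted() on strings stands in for the framework's text-based key sort
    return [decode(s, descending) for s in sorted(encode(k, descending) for k in keys)]


if __name__ == "__main__":
    keys = [1, 3, 10, 100, 99, 2]
    print(sorted(str(k) for k in keys))               # ['1', '10', '100', '2', '3', '99']
    print(sort_numeric_keys(keys, descending=True))   # [100, 99, 10, 3, 2, 1]
    print(sort_numeric_keys(keys, descending=False))  # [1, 2, 3, 10, 99, 100]
```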
[root@master mr_allsort_1reduce_python]# sh run.sh 
rmr: DEPRECATED: Please use 'rm -r' instead.
rmr: `/output_sort': No such file or directory
19/06/30 00:14:35 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
19/06/30 00:14:36 WARN streaming.StreamJob: -jobconf option is deprecated, please use -D instead.
19/06/30 00:14:36 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
packageJobJar: [./map_sort.py, ./red_sort.py, /tmp/hadoop-unjar5718119915866454712/] [] /tmp/streamjob1423052824935220100.jar tmpDir=null
19/06/30 00:14:36 INFO client.RMProxy: Connecting to ResourceManager at master/
19/06/30 00:14:36 INFO client.RMProxy: Connecting to ResourceManager at master/
19/06/30 00:14:37 INFO mapreduce.JobSubmitter: Cleaning up the staging area /tmp/hadoop-yarn/staging/root/.staging/job_1561818301923_0007
19/06/30 00:14:37 ERROR streaming.StreamJob: Error Launching job : Input path does not exist: hdfs://master:9000/a.txt
Input path does not exist: hdfs://master:9000/b.txt
Streaming Command Failed!
[root@master mr_allsort_1reduce_python]# ls
a.txt  b.txt  map_sort.py  red_sort.py  run.sh
[root@master mr_allsort_1reduce_python]# hadoop fs -put a.txt  b.txt /
[root@master mr_allsort_1reduce_python]# sh run.sh 
rmr: DEPRECATED: Please use 'rm -r' instead.
rmr: `/output_sort': No such file or directory
19/06/30 00:15:03 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
19/06/30 00:15:03 WARN streaming.StreamJob: -jobconf option is deprecated, please use -D instead.
19/06/30 00:15:03 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
packageJobJar: [./map_sort.py, ./red_sort.py, /tmp/hadoop-unjar5542916557294836465/] [] /tmp/streamjob810285156858649674.jar tmpDir=null
19/06/30 00:15:04 INFO client.RMProxy: Connecting to ResourceManager at master/
19/06/30 00:15:04 INFO client.RMProxy: Connecting to ResourceManager at master/
19/06/30 00:15:04 INFO mapred.FileInputFormat: Total input paths to process : 2
19/06/30 00:15:04 INFO mapreduce.JobSubmitter: number of splits:3
19/06/30 00:15:04 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1561818301923_0008
19/06/30 00:15:05 INFO impl.YarnClientImpl: Submitted application application_1561818301923_0008
19/06/30 00:15:05 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1561818301923_0008/
19/06/30 00:15:05 INFO mapreduce.Job: Running job: job_1561818301923_0008
19/06/30 00:15:10 INFO mapreduce.Job: Job job_1561818301923_0008 running in uber mode : false
19/06/30 00:15:10 INFO mapreduce.Job:  map 0% reduce 0%
19/06/30 00:15:17 INFO mapreduce.Job:  map 33% reduce 0%
19/06/30 00:15:18 INFO mapreduce.Job:  map 100% reduce 0%
19/06/30 00:15:22 INFO mapreduce.Job:  map 100% reduce 100%
19/06/30 00:15:22 INFO mapreduce.Job: Job job_1561818301923_0008 completed successfully
19/06/30 00:15:22 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=1419
        FILE: Number of bytes written=445371
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=1173
        HDFS: Number of bytes written=899
        HDFS: Number of read operations=12
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters 
        Launched map tasks=3
        Launched reduce tasks=1
        Data-local map tasks=3
        Total time spent by all maps in occupied slots (ms)=16708
        Total time spent by all reduces in occupied slots (ms)=2409
        Total time spent by all map tasks (ms)=16708
        Total time spent by all reduce tasks (ms)=2409
        Total vcore-milliseconds taken by all map tasks=16708
        Total vcore-milliseconds taken by all reduce tasks=2409
        Total megabyte-milliseconds taken by all map tasks=17108992
        Total megabyte-milliseconds taken by all reduce tasks=2466816
    Map-Reduce Framework
        Map input records=101
        Map output records=101
        Map output bytes=1211
        Map output materialized bytes=1431
        Input split bytes=228
        Combine input records=0
        Combine output records=0
        Reduce input groups=101
        Reduce shuffle bytes=1431
        Reduce input records=101
        Reduce output records=101
        Spilled Records=202
        Shuffled Maps =3
        Failed Shuffles=0
        Merged Map outputs=3
        GC time elapsed (ms)=660
        CPU time spent (ms)=3980
        Physical memory (bytes) snapshot=931897344
        Virtual memory (bytes) snapshot=8446369792
        Total committed heap usage (bytes)=674758656
    Shuffle Errors
    File Input Format Counters 
        Bytes Read=945
    File Output Format Counters 
        Bytes Written=899
19/06/30 00:15:22 INFO streaming.StreamJob: Output directory: /output_sort
[root@master mr_allsort_1reduce_python]# hadoop fs -ls /output_sort
Found 2 items
-rw-r--r--   3 root supergroup          0 2019-06-30 00:15 /output_sort/_SUCCESS
-rw-r--r--   3 root supergroup        899 2019-06-30 00:15 /output_sort/part-00000
[root@master mr_allsort_1reduce_python]# hadoop fs -text /output_sort/part-00000|head
100 java
99  hadoop
98  java
97  hadoop
96  java
95  hadoop
94  java
93  hadoop
92  java
91  hadoop

2.7 Global sort with multiple reducers

[root@master mr_allsort_python]# ls
a.txt  b.txt  map_sort.py  red_sort.py  run.sh
[root@master mr_allsort_python]# cat a.txt 
1   hadoop
3   hadoop
5   hadoop
7   hadoop
9   hadoop
[root@master mr_allsort_python]# cat b.txt 
0   java
2   java
4   java
6   java
8   java
10  java
[root@master mr_allsort_python]# cat map_sort.py 

import sys

base_count = 10000

for line in sys.stdin:
    ss = line.strip().split('\t')
    key = ss[0]
    val = ss[1]

    new_key = base_count + int(key)

    red_idx = 1
    if new_key < (10000 + 10010) / 2:
        red_idx = 0

    print "%s\t%s\t%s" % (red_idx, new_key, val)

[root@master mr_allsort_python]# cat red_sort.py 

import sys

base_count = 10000

for line in sys.stdin:
    idx_id, key, val = line.strip().split('\t')

    new_key = int(key) - base_count
    print '\t'.join([str(new_key), val])

[root@master mr_allsort_python]# cat run.sh 



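# HADOOP_CMD and OUTPUT_SORT_PATH are defined above (values not shown).
# STREAM_JAR_PATH, INPUT_FILE_PATH_A and INPUT_FILE_PATH_B are assumed variable names
# (the inputs are a.txt and b.txt on HDFS).
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_SORT_PATH

# Step 3.
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_A \
    -input $INPUT_FILE_PATH_B \
    -output $OUTPUT_SORT_PATH \
    -mapper "python map_sort.py" \
    -reducer "python red_sort.py" \
    -file ./map_sort.py \
    -file ./red_sort.py \
    -jobconf mapred.reduce.tasks=2 \
    -jobconf stream.num.map.output.key.fields=2 \
    -jobconf num.key.fields.for.partition=1 \
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner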
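Together, these options do three things:

- They split the map output key into two fields (`red_idx` and `new_key`).
- They partition only on the first field.
- They sort on the whole key.

Each reducer therefore receives one contiguous key range in order, and concatenating part-00000 and part-00001 gives a total order.

The sketch below models that idea in plain Python. As a simplification, it routes each record directly by `red_idx`, whereas the real job hashes the `red_idx` field with KeyFieldBasedPartitioner. The function names are hypothetical.

```python
BASE = 10000
SPLIT_POINT = (10000 + 10010) // 2  # 10005, the midpoint used in map_sort.py


def map_record(key, val):
    """Mirror map_sort.py: offset the key and pick a reducer index by range."""
    new_key = BASE + key
    red_idx = 0 if new_key < SPLIT_POINT else 1
    return red_idx, str(new_key), val


def total_sort(records):
    """Return the two reducer outputs; their concatenation is globally sorted."""
    partitions = [[], []]
    for key, val in records:
        red_idx, new_key, v = map_record(key, val)
        # Simplification: route by red_idx directly instead of hashing it
        partitions[red_idx].append((new_key, v))
    # Within a partition keys are sorted as text; red_sort.py strips the offset
    return [[(int(k) - BASE, v) for k, v in sorted(part)] for part in partitions]


if __name__ == "__main__":
    records = [(1, "hadoop"), (3, "hadoop"), (5, "hadoop"), (7, "hadoop"), (9, "hadoop"),
               (0, "java"), (2, "java"), (4, "java"), (6, "java"), (8, "java"), (10, "java")]
    part0, part1 = total_sort(records)
    print(part0)  # [(0, 'java'), (1, 'hadoop'), (2, 'java'), (3, 'hadoop'), (4, 'java')]
    print(part1)  # [(5, 'hadoop'), (6, 'java'), (7, 'hadoop'), (8, 'java'), (9, 'hadoop'), (10, 'java')]
```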
[root@master mr_allsort_python]# sh run.sh 
rmr: DEPRECATED: Please use 'rm -r' instead.
Deleted /cmz_output_sort
19/06/30 12:01:12 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
19/06/30 12:01:12 WARN streaming.StreamJob: -jobconf option is deprecated, please use -D instead.
19/06/30 12:01:12 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
packageJobJar: [./map_sort.py, ./red_sort.py, /tmp/hadoop-unjar2785950129081995646/] [] /tmp/streamjob990761707239727652.jar tmpDir=null
19/06/30 12:01:12 INFO client.RMProxy: Connecting to ResourceManager at master/
19/06/30 12:01:13 INFO client.RMProxy: Connecting to ResourceManager at master/
19/06/30 12:01:13 INFO mapred.FileInputFormat: Total input paths to process : 2
19/06/30 12:01:13 INFO mapreduce.JobSubmitter: number of splits:2
19/06/30 12:01:13 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1561818301923_0015
19/06/30 12:01:13 INFO impl.YarnClientImpl: Submitted application application_1561818301923_0015
19/06/30 12:01:13 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1561818301923_0015/
19/06/30 12:01:13 INFO mapreduce.Job: Running job: job_1561818301923_0015
19/06/30 12:01:17 INFO mapreduce.Job: Job job_1561818301923_0015 running in uber mode : false
19/06/30 12:01:17 INFO mapreduce.Job:  map 0% reduce 0%
19/06/30 12:01:23 INFO mapreduce.Job:  map 100% reduce 0%
19/06/30 12:01:28 INFO mapreduce.Job:  map 100% reduce 100%
19/06/30 12:01:28 INFO mapreduce.Job: Job job_1561818301923_0015 completed successfully
19/06/30 12:01:28 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=187
        FILE: Number of bytes written=444696
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=240
        HDFS: Number of bytes written=88
        HDFS: Number of read operations=12
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=4
    Job Counters 
        Launched map tasks=2
        Launched reduce tasks=2
        Data-local map tasks=2
        Total time spent by all maps in occupied slots (ms)=4630
        Total time spent by all reduces in occupied slots (ms)=5008
        Total time spent by all map tasks (ms)=4630
        Total time spent by all reduce tasks (ms)=5008
        Total vcore-milliseconds taken by all map tasks=4630
        Total vcore-milliseconds taken by all reduce tasks=5008
        Total megabyte-milliseconds taken by all map tasks=4741120
        Total megabyte-milliseconds taken by all reduce tasks=5128192
    Map-Reduce Framework
        Map input records=11
        Map output records=11
        Map output bytes=153
        Map output materialized bytes=199
        Input split bytes=152
        Combine input records=0
        Combine output records=0
        Reduce input groups=11
        Reduce shuffle bytes=199
        Reduce input records=11
        Reduce output records=11
        Spilled Records=22
        Shuffled Maps =4
        Failed Shuffles=0
        Merged Map outputs=4
        GC time elapsed (ms)=207
        CPU time spent (ms)=3010
        Physical memory (bytes) snapshot=834572288
        Virtual memory (bytes) snapshot=8461389824
        Total committed heap usage (bytes)=573571072
    Shuffle Errors
    File Input Format Counters 
        Bytes Read=88
    File Output Format Counters 
        Bytes Written=88
19/06/30 12:01:28 INFO streaming.StreamJob: Output directory: /cmz_output_sort
[root@master mr_allsort_python]# hadoop fs -text /cmz_output_sort/part-00001
5   hadoop
6   java
7   hadoop
8   java
9   hadoop
10  java
[root@master mr_allsort_python]# hadoop fs -text /cmz_output_sort/part-00000
0   java
1   hadoop
2   java
3   hadoop
4   java

2.8 Custom sorting




[root@master mr_allsort_python2]# ls
aaa.txt  map_sort.py  red_sort.py  run.sh
[root@master mr_allsort_python2]# hadoop fs -ls /aaa.txt
-rw-r--r--   3 root supergroup         60 2019-06-30 12:22 /aaa.txt
[root@master mr_allsort_python2]# cat aaa.txt 
[root@master mr_allsort_python2]# cat map_sort.py 
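
import sys

# Read key<TAB>value lines from stdin and write them to stdout unchanged
for line in sys.stdin:
    ss = line.strip().split('\t')
    if len(ss) < 2:
        continue  # skip lines without a tab separator
    key = ss[0]
    val = ss[1]

    print("%s\t%s" % (key, val))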

[root@master mr_allsort_python2]# cat red_sort.py 

import sys

# Identity reducer: echo each (already sorted) input line
for line in sys.stdin:
    print(line.strip())
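map_sort.py and red_sort.py only read stdin and write stdout. That means they can be tested without a cluster through the `cat input | mapper | sort | reducer > output` pipeline from section 1.3. The in-process Python sketch below mirrors that pipeline. `mapper`, `reducer` and `run_local` are hypothetical names, and the sample lines are made up. The mapper mirrors map_sort.py and also skips lines without a tab. Python's `sorted` compares whole lines by code point, so it behaves like `LC_ALL=C sort` rather than Hadoop's key-only sort.

```python
def mapper(lines):
    """Mirror of map_sort.py: emit key<TAB>value for each tab-separated line."""
    for line in lines:
        ss = line.strip().split("\t")
        if len(ss) < 2:
            continue  # skip lines without a tab separator
        yield "%s\t%s" % (ss[0], ss[1])


def reducer(lines):
    """Mirror of red_sort.py: echo each line."""
    for line in lines:
        yield line.strip()


def run_local(lines):
    """Equivalent of: cat input | mapper | sort | reducer"""
    return list(reducer(sorted(mapper(lines))))


if __name__ == "__main__":
    sample = ["b\t2", "a\t1", "c\t3", "no-tab-line"]  # hypothetical input
    for out in run_local(sample):
        print(out)
```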

[root@master mr_allsort_python2]# cat run.sh 



$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_SORT_PATH

# Step 3.
    -input $INPUT_FILE_PATH_A \
    -output $OUTPUT_SORT_PATH \
    -mapper "cat" \
    -reducer "cat" \
    -jobconf stream.num.map.output.key.fields=3 \
    -jobconf stream.map.output.field.separator=. \
    -jobconf mapred.text.key.partitioner.options=-k2,3 \
    -jobconf mapred.reduce.tasks=3

[root@master mr_allsort_python2]# sh run.sh 
rmr: DEPRECATED: Please use 'rm -r' instead.
Deleted /output_sort
19/06/30 12:29:37 WARN streaming.StreamJob: -jobconf option is deprecated, please use -D instead.
packageJobJar: [/tmp/hadoop-unjar1891359708360072922/] [] /tmp/streamjob5747616032102106944.jar tmpDir=null
19/06/30 12:29:37 INFO client.RMProxy: Connecting to ResourceManager at master/
19/06/30 12:29:38 INFO client.RMProxy: Connecting to ResourceManager at master/
19/06/30 12:29:38 INFO mapred.FileInputFormat: Total input paths to process : 1
19/06/30 12:29:38 INFO mapreduce.JobSubmitter: number of splits:2
19/06/30 12:29:38 INFO Configuration.deprecation: mapred.text.key.partitioner.options is deprecated. Instead, use mapreduce.partition.keypartitioner.options
19/06/30 12:29:38 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
19/06/30 12:29:38 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1561818301923_0016
19/06/30 12:29:39 INFO impl.YarnClientImpl: Submitted application application_1561818301923_0016
19/06/30 12:29:39 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1561818301923_0016/
19/06/30 12:29:39 INFO mapreduce.Job: Running job: job_1561818301923_0016
19/06/30 12:29:44 INFO mapreduce.Job: Job job_1561818301923_0016 running in uber mode : false
19/06/30 12:29:44 INFO mapreduce.Job:  map 0% reduce 0%
19/06/30 12:29:48 INFO mapreduce.Job:  map 100% reduce 0%
19/06/30 12:29:53 INFO mapreduce.Job:  map 100% reduce 33%
19/06/30 12:29:54 INFO mapreduce.Job:  map 100% reduce 67%
19/06/30 12:29:55 INFO mapreduce.Job:  map 100% reduce 100%
19/06/30 12:29:55 INFO mapreduce.Job: Job job_1561818301923_0016 completed successfully
19/06/30 12:29:55 INFO mapreduce.Job: Counters: 50
    File System Counters
        FILE: Number of bytes read=92
        FILE: Number of bytes written=548228
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=246
        HDFS: Number of bytes written=60
        HDFS: Number of read operations=15
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=6
    Job Counters 
        Killed reduce tasks=1
        Launched map tasks=2
        Launched reduce tasks=3
        Data-local map tasks=2
        Total time spent by all maps in occupied slots (ms)=4835
        Total time spent by all reduces in occupied slots (ms)=8760
        Total time spent by all map tasks (ms)=4835
        Total time spent by all reduce tasks (ms)=8760
        Total vcore-milliseconds taken by all map tasks=4835
        Total vcore-milliseconds taken by all reduce tasks=8760
        Total megabyte-milliseconds taken by all map tasks=4951040
        Total megabyte-milliseconds taken by all reduce tasks=8970240
    Map-Reduce Framework
        Map input records=7
        Map output records=7
        Map output bytes=60
        Map output materialized bytes=110
        Input split bytes=156
        Combine input records=0
        Combine output records=0
        Reduce input groups=6
        Reduce shuffle bytes=110
        Reduce input records=7
        Reduce output records=7
        Spilled Records=14
        Shuffled Maps =6
        Failed Shuffles=0
        Merged Map outputs=6
        GC time elapsed (ms)=344
        CPU time spent (ms)=4810
        Physical memory (bytes) snapshot=976941056
        Virtual memory (bytes) snapshot=10577739776
        Total committed heap usage (bytes)=656932864
    Shuffle Errors
    File Input Format Counters 
        Bytes Read=90
    File Output Format Counters 
        Bytes Written=60
19/06/30 12:29:55 INFO streaming.StreamJob: Output directory: /output_sort
[root@master mr_allsort_python2]# hadoop fs -ls /output
ls: `/output': No such file or directory
[root@master mr_allsort_python2]# hadoop fs -ls /output_sort
Found 4 items
-rw-r--r--   3 root supergroup          0 2019-06-30 12:29 /output_sort/_SUCCESS
-rw-r--r--   3 root supergroup          8 2019-06-30 12:29 /output_sort/part-00000
-rw-r--r--   3 root supergroup         26 2019-06-30 12:29 /output_sort/part-00001
-rw-r--r--   3 root supergroup         26 2019-06-30 12:29 /output_sort/part-00002
[root@master mr_allsort_python2]# hadoop fs -text /output_sort/part-00000
e.9.4   5
[root@master mr_allsort_python2]# hadoop fs -text /output_sort/part-00001
a.7.2   6
d.1.5   23
e.5.9   22
[root@master mr_allsort_python2]# hadoop fs -text /output_sort/part-00002
e.5.1   45
e.5.1   23
f.8.3   3
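Map output records and Reduce input records are both 7, but Reduce input groups is 6. The two e.5.1 records share one key, so the reducer sees them as a single group. The sketch below checks this, along with the key order inside each part file, against the output printed above. It only re-reads that printed text and does not access HDFS. It assumes the gap between key and value in the output is a tab, which is Hadoop Streaming's default separator.

```python
from itertools import groupby

# Reducer output copied from part-00000..part-00002 above (key<TAB>value).
parts = [
    ["e.9.4\t5"],
    ["a.7.2\t6", "d.1.5\t23", "e.5.9\t22"],
    ["e.5.1\t45", "e.5.1\t23", "f.8.3\t3"],
]


def key_of(line):
    """The key is everything before the first tab."""
    return line.split("\t", 1)[0]


# Keys are sorted within each part file (values of equal keys are not: 45, 23).
for part in parts:
    keys = [key_of(l) for l in part]
    assert keys == sorted(keys)

# Count records and distinct keys, matching the reduce counters above.
records = [l for part in parts for l in part]
groups = {k: len(list(g)) for k, g in groupby(sorted(key_of(l) for l in records))}
print("Reduce input records =", len(records))  # 7
print("Reduce input groups  =", len(groups))   # 6
```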


[root@master mr_allsort_python2]# cat run.sh 



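$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_SORT_PATH  # delete the output directory if it already exists

# Step 3.
# -input $INPUT_FILE_PATH_A: HDFS input path (the data source)
# -output $OUTPUT_SORT_PATH: HDFS output path
# -mapper "cat": pass the input through unchanged
# -reducer "cat": pass the sorted map output through unchanged
# stream.num.map.output.key.fields=3: everything before the 3rd separator is the key, the rest is the value
# stream.map.output.field.separator=.: split map output fields on "."
# mapred.text.key.partitioner.options=-k2,3: partition on key fields 2-3 (e.g. the "5.1" in e.5.1);
#   this option is read by -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner,
#   which splits the key on map.output.key.field.separator
# mapred.reduce.tasks=3: run 3 reducers, i.e. 3 buckets
    -input $INPUT_FILE_PATH_A \
    -output $OUTPUT_SORT_PATH \
    -mapper "cat" \
    -reducer "cat" \
    -jobconf stream.num.map.output.key.fields=3 \
    -jobconf stream.map.output.field.separator=. \
    -jobconf mapred.text.key.partitioner.options=-k2,3 \
    -jobconf mapred.reduce.tasks=3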
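The options in run.sh can be simulated locally to show how a map output line is split into key and value and then assigned to a bucket. This is a rough sketch under stated assumptions. The split rule follows the Hadoop Streaming documentation: the key is the prefix up to the Nth separator, and a line with fewer separators becomes all key. Each input line is assumed to have four dot-separated fields. The aaa.txt contents are not shown above, so the sample lines are hypothetical. `split_key_value` and `partition_by_fields` are illustrative helpers, not Hadoop APIs. The 31-based hash is only Java-style, so the bucket numbers need not match the part-0000N files above.

```python
def split_key_value(line, sep=".", num_key_fields=3):
    """Mimic the Streaming key/value split of one map output line.

    Key = text before the num_key_fields-th separator, value = the rest.
    With fewer separators, the whole line is the key and the value is "".
    """
    parts = line.rstrip("\n").split(sep)
    if len(parts) <= num_key_fields:
        return sep.join(parts), ""
    return sep.join(parts[:num_key_fields]), sep.join(parts[num_key_fields:])


def partition_by_fields(key, start, end, num_reducers, sep="."):
    """Pick a reducer from key fields start..end (1-based, inclusive).

    Assumption: a Java-style 31*h + byte hash kept to 32 bits; Hadoop's
    KeyFieldBasedPartitioner may assign different bucket numbers.
    """
    fields = key.split(sep)[start - 1:end]
    h = 0
    for b in sep.join(fields).encode("utf-8"):
        h = (31 * h + b) & 0xFFFFFFFF  # wrap like a 32-bit Java int
    return (h & 0x7FFFFFFF) % num_reducers


lines = ["e.5.1.45", "e.5.1.23", "e.5.9.22", "a.7.2.6"]  # hypothetical input
buckets = {}
for line in lines:
    key, value = split_key_value(line)
    r = partition_by_fields(key, 2, 3, num_reducers=3)
    buckets.setdefault(r, []).append((key, value))

for r in sorted(buckets):
    # Inside a reducer, records are ordered by key only (stable sort).
    print(r, sorted(buckets[r], key=lambda kv: kv[0]))
```

The design point is that the sort key (e.5.1, the first three fields) and the partition key (5.1, fields 2-3) can differ. Partitioning decides which reducer receives a record, and sorting then orders the records within that reducer.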

2.9 Join

3. MapReduce jobs



