Hadoop

Hadoop是一个生态圈,分布式的基础架构,主要解决海量数据的存储和分析计算问题。

1

Hadoop的优势

  • 高可靠性: Hadoop底层维护多个数据副本,所以即使Hadoop某个计算元素
    或存储出现故障,也不会导致数据的丢失。

  • 高扩展性:在集群间分配任务数据,可方便的扩展数以干计的节点。

  • 高效性:在MapReduce的思想下, Hadoop是并行工作的,以加快任务处理的速度。

  • 高容错性:能够自动将失败的任务重新分配。

1. Hadoop的组成

1.1 Hadoop1与Hadoop2的区别

Hadoop2增加了yarn来实现资源的调度,cpu的调度和内存、磁盘的管理,从而使得MapReduce“专心”地进行计算任务。

img

1.2 HDFS架构

三部分组成:目录 + 数据 + 目录备份

  • NameNode:存储的是元数据文件目录名和结构、属性,每个分布式存储文件的块列表和块所在的DataNode等。
  • DataNode:真正存储数据的地方,存储文件块数据,以及块数据的校验和。
  • Secondary NameNode:监控HDFS状态,定时每隔断时间获取HDFS元数据的快照。

1.3 Yarn架构

作为调度的功能组件,ResourceManager主要是接受请求,分发调度给后续的NodeManager。ApplicationMaster的作用是负责数据的切分,管理单个任务,为程序申请资源分配给内部任务。

引用尚硅谷的图片:

img

1.4 MapReduce架构

MapReduce将计算过程分为两个阶段:Map和Reduce,接受过程是个并行的过程。

  • Map阶段并行处理输入数据
  • Reduce阶段对Map结果进行汇总

img

2. Hadoop目录结构

重要目录

(1)bin目录:存放对Hadoop相关服务(HDFS,YARN)进行操作的脚本

(2)etc目录:Hadoop的配置文件目录,存放Hadoop的配置文件

(3)lib目录:存放Hadoop的本地库(对数据进行压缩解压缩功能)

(4)sbin目录:存放启动或停止Hadoop相关服务的脚本

例如 sbin/hdfs.sh sbin/yarn.sh

(5)share目录:存放Hadoop的依赖jar包、文档、和官方案例

3. Hadoop运行模式

3.1 本地运行模式

官方为我们提供了几个案例,都在share里面 share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar

例如Grep和WordCount案例,使用里面 hadoop-mapreduce-examples-2.7.2.jar 进行wordcount任务计算,计算wcinput的文件中单词个数,输出到输出文件wcoutput中

yangyifan@hadoop101 hadoop-2.7.2$ bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar wordcount wcinput wcoutput

2 (1)

3

3.2 伪分布式运行模式

在单台机器中,进行伪分布式的集群操作。

1)配置集群的配置文件,HDFS与Yarn,core-site.xml 与 hdfs-site.xml、yarn-site.xml

2)在单台机子上启动NameNode、DataNode、才能再启动ResourceManager与NodeManager

img

3)利用MapReduce执行WordCount案例

查看MapReduce程序进程

img

img

3.3 完全分布式运行模式

我将启动两台机器进行完全分布式的运行模式,两台机器要关闭防火墙和设置一些ip和主机信息。

1 (1)

2 (1)

3

由于jdk与hadoop等文件与配置挨个分发过于麻烦,所以采用编写集群分发脚本xsync来进行集群的分发。

下面是脚本代码,#5 循环 是指分发到hadoop102和hadoop103处(两台分布式机器上)

#!/bin/bash
#1 获取输入参数个数,如果没有参数,直接退出
pcount=$#
if((pcount==0)); then
echo no args;
exit;
fi

#2 获取文件名称
p1=$1
fname=`basename $p1`
echo fname=$fname

#3 获取上级目录到绝对路径
pdir=`cd -P $(dirname $p1); pwd`
echo pdir=$pdir

#4 获取当前用户名称
user=`whoami`

#5 循环
for((host=103; host<104; host++)); do
echo ------------------- hadoop$host --------------
rsync -rvl $pdir/$fname $user@hadoop$host:$pdir
done
  • 集群配置

4

  • 单点启动

sbin/hadoop-daemon.sh start namenode

sbin/yarn-daemon.sh start resourcemanager

  • 集群启动

先要配置slaves

yangyifan@hadoop102 hadoop$ vim slaves

在该文件中增加如下内容:hadoop102 和 hadoop103,文件中添加的内容结尾不允许有空格,文件中不允许有空行。

集群群启命令为sbin/start-dfs.sh``sbin/start-yarn.sh

5

yarn集群要在ResourceManager处启动,例如我这台就是hadoop103机器

6

HDFS

1. HDFS文件块大小

HDFS的文件在物理上是分块存储,每个块大小可以通过配置参数来进行默认是128m

总结: HDFS块的大小设置主要取决于磁盘传输速率。

img

大文件进行分块存储:

img

面试题:为什么块的大小不能设置太小,也不能设置太大?
(1) HDFS的块设置太小,会增加寻址时间,程序一直在找块的开始位置;
(2)如果块设置的太大,从磁盘传输数据的时间会明显大于定位这个块开始位置所需的时间。导致程序在处理这块数据时,会非常慢。

2. Shell操作

语法:bin/hadoop fs或者是bin/hdfs dfs + 具体命令,dfs是fs的实现类。

例如:[yangyifan@hadoop102 hadoop-2.7.2]$ hadoop fs -ls /

重要的操作-setrep:设置HDFS中文件的副本数量 [yangyifan@hadoop102 hadoop-2.7.2]$ hadoop fs -setrep 10 /user/yangyifan/yangyifan.txt 这里设置的副本数只是记录在NameNode的元数据中,真实的副本数量,还得看DataNode的数量。因为目前只有2台设备,最多也就2个副本,只有节点数的增加到10台时,副本数才能达到10。

  • 一些特殊的操作

-moveFromLocal:从本地剪切粘贴到HDFS [yangyifan@hadoop102 hadoop-2.7.2]$ hadoop fs -moveFromLocal ./yyf.txt /user/yangyifan

-copyFromLocal从本地文件系统中拷贝文件到HDFS路径去 [yangyifan@hadoop102 hadoop-2.7.2]$ hadoop fs -copyFromLocal README.txt /

-copyToLocal:从HDFS拷贝到本地 [yangyifan@hadoop102 hadoop-2.7.2]$ hadoop fs -copyToLocal /user/yangyifan/yangyifan.txt ./

-cp:从HDFS的一个路径拷贝到HDFS的另一个路径 [yangyifan@hadoop102 hadoop-2.7.2]$ hadoop fs -cp /user/yangyifan/yangyifan.txt /yiweicheng.txt

-mv:在HDFS目录中移动文件 [yangyifan@hadoop102 hadoop-2.7.2]$ hadoop fs -mv /zhuge.txt /user/yangyifan/

-get:等同于copyToLocal,就是从HDFS下载文件到本地 [yangyifan@hadoop102 hadoop-2.7.2]$ hadoop fs -get /user/yangyifan/yangyifan.txt ./

-getmerge:合并下载多个文件,比如HDFS的目录 /user/yangyifan/test下有多个文件:log.1, log.2,log.3,… [yangyifan@hadoop102 hadoop-2.7.2]$ hadoop fs -getmerge /user/yangyifan/test/* ./yihao.txt

-put:等同于copyFromLocal [yangyifan@hadoop102 hadoop-2.7.2]$ hadoop fs -put ./yihao.txt /user/yangyifan/test/

3. IDEA客户端操作

先将hadoop-2.7.2下载到windows路径下,配置HADOOP_HOME环境变量,和Path环境变量。

image-20210521130441804

image-20210521130455547

导入hadoop的Maven依赖,并添加日志配置文件,在项目的src/main/resources目录下,新建一个文件,命名为“log4j.properties”,在文件中填入

img

log4j.rootLogger=INFO, stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender

log4j.appender.stdout.layout=org.apache.log4j.PatternLayout

log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n

log4j.appender.logfile=org.apache.log4j.FileAppender

log4j.appender.logfile.File=target/spring.log

log4j.appender.logfile.layout=org.apache.log4j.PatternLayout

log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

3.1 API文件操作

(1)新建目录

public static void main(String[] args) throws Exception {
// 1 获取文件系统
Configuration configuration = new Configuration();
// 配置在集群上运行
// configuration.set("fs.defaultFS", "hdfs://hadoop102:9000");
// FileSystem fs = FileSystem.get(configuration);
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:9000"), configuration, "yangyifan");
// 2 创建目录
fs.mkdirs(new Path("/1108/daxian/banzhang"));
// 3 关闭资源
fs.close();
}

(2)文件上传

// API测试
// (1)文件上传
fs.copyFromLocalFile(new Path("g:/data.txt"), new Path("/"));

(3)文件下载

// 2 执行下载操作
// boolean delSrc 指是否将原文件删除
// Path src 指要下载的文件路径
// Path dst 指将文件下载到的路径
// boolean useRawLocalFileSystem 是否开启文件校验
fs.copyToLocalFile(false, new Path("/banzhang.txt"), new Path("g:/banhua.txt"), true);

(4)文件删除

fs.delete(new Path("/0508/"), true);

(5)文件名更改

fs.rename(new Path("/banzhang.txt"), new Path("/banhua.txt"));

(6)查看文件名称、权限、长度、块信息

RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
while(listFiles.hasNext()){
LocatedFileStatus status = listFiles.next();
// 文件名称
System.out.println(status.getPath().getName());
// 长度
System.out.println(status.getLen());
// 权限
System.out.println(status.getPermission());
// 分组
System.out.println(status.getGroup());

// 获取存储的块信息
BlockLocation[] blockLocations = status.getBlockLocations();
for (BlockLocation blockLocation : blockLocations) {
// 获取块存储的主机节点
String[] hosts = blockLocation.getHosts();
for (String host : hosts) {
System.out.println(host);
}
}
System.out.println("---------------------");
}

(7)文件和文件夹判断

FileStatus[] listStatus = fs.listStatus(new Path("/"));	
for (FileStatus fileStatus : listStatus) {
// 如果是文件
if (fileStatus.isFile()) {
System.out.println("f:"+fileStatus.getPath().getName());
} else {
System.out.println("d:"+fileStatus.getPath().getName());
}
}

3.2 I/O流操作

上面我们学的API操作HDFS系统都是框架封装好的。那么如果我们想自己实现上述API的操作该怎么实现呢?

我们可以采用IO流的方式实现数据的上传和下载。具体不再赘述,可以网上查看文档。

4. NN和SNN

4.1 NN和SNN的工作机制

NN中的元数据需要存放在内存中来保证效率,但如果只存在内存中,一旦断电,元数据丢失,整个集群就无法工作了。

因此磁盘中有一个备份元数据的FsImage。这样又会带来新的问题,当在内存中的元数据更新时,如果同时更新FsImage,就会导致效率过低,但如果不更新,就会发生一致性问题,一旦NameNode节点断电,就会产生数据丢失。因此,引入Edits文件(只进行追加操作,效率很高)。每当元数据有更新或者添加元数据时,修改内存中的元数据并追加到Edits中。这样,一旦NameNode节点断电,可以通过FsImage和Edits的合并,合成元数据。

需要定期进行FsImage和Edits的合并,引入一个新的节点SecondaryNamenode,专门用于FsImage和Edits的合并。

NN和2NN工作机制,引用尚硅谷的图:

1

NameNode:第一次启动NameNode格式化后,创建Fsimage和Edits文件。如果不是第一次启动,直接加载编辑日志和镜像文件到内存。

SecondaryNameNode:询问NameNode是否需要CheckPoint(定时时间到、Edit中的数据满了)请求执行CheckPoint。

然后将编辑日志和镜像文件拷贝到Secondary NameNode。SNN将其加载到内存,并合并。生成新的镜像文件fsimage.chkpoint,拷贝到NameNode。NameNode将其重新命名成fsimage。

拓展:工作机制详解

NameNode启动时,先滚动Edits并生成一个空的edits.inprogress,然后加载Edits和Fsimage到内存中,此时NameNode内存就持有最新的元数据信息。Client开始对NameNode发送元数据的增删改的请求,这些请求的操作首先会被记录到edits.inprogress中(查询元数据的操作不会被记录在Edits中,因为查询操作不会更改元数据信息),如果此时NameNode挂掉,重启后会从Edits中读取元数据的信息。然后,NameNode会在内存中执行元数据的增删改的操作。

由于Edits中记录的操作会越来越多,Edits文件会越来越大,导致NameNode在启动加载Edits时会很慢,所以需要对Edits和Fsimage进行合并(所谓合并,就是将Edits和Fsimage加载到内存中,照着Edits中的操作一步步执行,最终形成新的Fsimage)。SecondaryNameNode的作用就是帮助NameNode进行Edits和Fsimage的合并工作。

SecondaryNameNode首先会询问NameNode是否需要CheckPoint(触发CheckPoint需要满足两个条件中的任意一个,定时时间到和Edits中数据写满了)。直接带回NameNode是否检查结果。SecondaryNameNode执行CheckPoint操作,首先会让NameNode滚动Edits并生成一个空的edits.inprogress,滚动Edits的目的是给Edits打个标记,以后所有新的操作都写入edits.inprogress,其他未合并的Edits和Fsimage会拷贝到SecondaryNameNode的本地,然后将拷贝的Edits和Fsimage加载到内存中进行合并,生成fsimage.chkpoint,然后将fsimage.chkpoint拷贝给NameNode,重命名为Fsimage后替换掉原来的Fsimage。NameNode在启动时就只需要加载之前未合并的Edits和Fsimage即可,因为合并过的Edits中的元数据信息已经被记录在Fsimage中。

4.2 Fsimage和Edits解析

NameNode被格式化之后,将在hadoop下data/tmp/dfs/name/current目录中产生如下文件
img
(1) Fsimage文件: HDFS文件系统元数据的一个永久性的检查点,其中包含HDFS文件系统的所有目
录和文件inode的序列化信息。
(2) Edits文件: 存放HDFS文件系统的所有更新操作的路径,文件系统客户端执行的所有写操作首先
会被记录到Edits文件中。
(3) seen_ txid文件保存的是一个数字, 就是最后一个edits_的数字,是当前最新的编辑日志。
(4)每次NameNode启动的时候都会将Fsimage文件读入内存 ,加载Edits里面的更新操作,保证内存
中的元数据信息是最新的、同步的。

注意:NameNode如何确定下次开机启动的时候合并哪些Edits?根据seen_txtid来获取最新的edit编辑文件进行合并

查看文件:oiv查看Fsimage文件和oev查看Edits文件

hdfs oiv -p 文件类型 -i镜像文件 -o 转换后文件输出路径
hdfs oev -p 文件类型 -i编辑日志 -o 转换后文件输出路径

例如:hdfs oev -p XML -i edits_0000000000000000012-0000000000000000013 -o /opt/module/hadoop-2.7.2/edits.xml将显示的xml文件内容拷贝到IDEA中创建的xml文件中

4.3 CheckPoint时间设置

(1)通常情况下,SecondaryNameNode定时每隔一小时执行一次。

在hdfs-default.xml中:

<property>
<name>dfs.namenode.checkpoint.period</name>
<value>3600</value>
</property>

(2)一分钟检查一次操作次数,当操作次数达到1百万时,SecondaryNameNode执行一次。

<property>
<name>dfs.namenode.checkpoint.txns</name>
<value>1000000</value>
<description>操作动作次数</description>
</property>
<property>
<name>dfs.namenode.checkpoint.check.period</name>
<value>60</value>
<description> 1分钟检查一次操作次数</description>
</property >

4.4 NameNode故障处理

方法一:将SecondaryNameNode中数据拷贝到NameNode存储数据的目录;

  1. kill -9 NameNode进程

  2. 删除NameNode存储的数据(/opt/module/hadoop-2.7.2/data/tmp/dfs/name)

[yangyifan@hadoop101 hadoop-2.7.2]$ rm -rf /opt/module/hadoop-2.7.2/data/tmp/dfs/name/*

  1. 拷贝SecondaryNameNode中数据到原NameNode存储数据目录

[yangyifan@hadoop101 dfs]$ scp -r yangyifan@hadoop103:/opt/module/hadoop-2.7.2/data/tmp/dfs/namesecondary/* ./name/

  1. 重新启动NameNode

[yangyifan@hadoop101 hadoop-2.7.2]$ sbin/hadoop-daemon.sh start namenode

方法二:使用-importCheckpoint 选项启动 NameNode守护进程,从而将SecondaryNameNode中数据拷贝到NameNode目录中。

  1. 修改hdfs-site.xml中的
<property>
<name>dfs.namenode.checkpoint.period</name>
<value>120</value>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>/opt/module/hadoop-2.7.2/data/tmp/dfs/name</value>
</property>
  1. kill -9 NameNode进程

  2. 删除NameNode存储的数据(/opt/module/hadoop-2.7.2/data/tmp/dfs/name)

[yangyifan@hadoop101 hadoop-2.7.2]$ rm -rf /opt/module/hadoop-2.7.2/data/tmp/dfs/name/*

  1. 如果SecondaryNameNode不和NameNode在一个主机节点上,需要将SecondaryNameNode存储数据的目录拷贝到NameNode存储数据的平级目录并删除in_use.lock文件

[yangyifan@hadoop101 dfs]$ scp -r yangyifan@hadoop103:/opt/module/hadoop-2.7.2/data/tmp/dfs/namesecondary ./

[yangyifan@hadoop101 namesecondary]$ rm -rf in_use.lock

[yangyifan@hadoop101 dfs]$ pwd

/opt/module/hadoop-2.7.2/data/tmp/dfs

[yangyifan@hadoop101 dfs]$ ls

data name namesecondary

  1. 导入检查点数据(等待一会ctrl+c结束掉)

[yangyifan@hadoop101 hadoop-2.7.2]$ bin/hdfs namenode -importCheckpoint

  1. 启动NameNode

[yangyifan@hadoop101 hadoop-2.7.2]$ sbin/hadoop-daemon.sh start namenode

4.6 NameNode高可用配置

NameNode的本地目录可以配置成多个,且每个目录存放内容相同,增加了可靠性。

当进行增删改查操作时,其中的镜像文件和编辑日志都一样。

具体配置如下

​ (1)在hdfs-site.xml文件中增加如下内容

注意:先删除配置文件中先前配置的 -importCheckpoint 配置

<property>
<name>dfs.namenode.name.dir</name>
<value>file:///${hadoop.tmp.dir}/dfs/name1,file:///${hadoop.tmp.dir}/dfs/name2</value>
</property>

(2)停止集群,删除data和logs中所有数据。

[yangyifan@hadoop102 hadoop-2.7.2]$ rm -rf data/ logs/

[yangyifan@hadoop103 hadoop-2.7.2]$ rm -rf data/ logs/

[yangyifan@hadoop104 hadoop-2.7.2]$ rm -rf data/ logs/

(3)格式化集群并启动。

[yangyifan@hadoop102 hadoop-2.7.2]$ bin/hdfs namenode –format

[yangyifan@hadoop102 hadoop-2.7.2]$ sbin/start-dfs.sh

(4)查看结果

[yangyifan@hadoop102 dfs]$ ll

总用量 12

drwx——. 3 yangyifan yangyifan 4096 12月 11 08:03 data

drwxrwxr-x. 3 yangyifan yangyifan 4096 12月 11 08:03 name1

drwxrwxr-x. 3 yangyifan yangyifan 4096 12月 11 08:03 name2

5. DataNode

5.1 DataNode工作机制

image-20210531191537568

1)一个数据块Block在DataNode上以文件形式存储在磁盘上,包括两个文件,一个是数据本身,一个是元数据包括数据块的长度,块数据的校验和,以及时间戳。

2)DataNode启动后向NameNode注册,通过后,周期性(1小时)的向NameNode上报所有的块信息。

3)心跳是每3秒一次,心跳返回结果带有NameNode给该DataNode的命令如复制块数据到另一台机器,或删除某个数据块。如果超过10分钟没有收到某个DataNode的心跳,则认为该节点不可用。

4)集群运行中可以安全加入和退出一些机器。

注意:最好提前设置白名单。

5.2 DataNode损坏

(1)数据完整性

DataNode节点上的数据损坏了,却没有发现,是否也很危险,那么如何解决呢?

如下是DataNode节点保证数据完整性的方法。

1)当DataNode读取Block的时候,它会计算CheckSum校验和。

2)如果计算后的CheckSum,与Block创建时值不一样,说明Block已经损坏。

3)Client读取其他DataNode上的Block。

4)DataNode在其文件创建后周期验证CheckSum

(2)掉线时限参数设置

DataNode进程掉线或者故障,超过超时时间判断为死亡

image-20210531192259406

需要注意的是hdfs-site.xml 配置文件中的heartbeat.recheck.interval的单位为毫秒,dfs.heartbeat.interval的单位为秒。

<property>
<name>dfs.namenode.heartbeat.recheck-interval</name>
<value>300000</value>
</property>
<property>
<name>dfs.heartbeat.interval</name>
<value>3</value>
</property>

5.3 服役新DataNode(白名单)

我们需要在原有集群基础上动态添加新的数据节点。

  1. 环境准备

​ (1)在hadoop103主机上再克隆一台hadoop104主机

​ (2)修改IP地址和主机名称

​ (3)删除原来HDFS文件系统留存的文件(/opt/module/hadoop-2.7.2/data和log)

​ (4)source一下配置文件 [yangyifan@hadoop104 hadoop-2.7.2]$ source /etc/profile

  1. 添加到白名单

(1)在NameNode的/opt/module/hadoop-2.7.2/etc/hadoop目录下创建dfs.hosts文件

添加如下hadoop104

(2)在NameNode的hdfs-site.xml配置文件中增加dfs.hosts属性,开启白名单

<property>
<name>dfs.hosts</name>
<value>/opt/module/hadoop-2.7.2/etc/hadoop/dfs.hosts</value>
</property>

(3)配置文件分发

xsync hdfs-site.xml

(4)刷新NameNode

hdfs dfsadmin -refreshNodes

(5)更新ResourceManager节点

yarn rmadmin -refreshNodes

注意:如果数据不均衡,可以用命令实现集群的再平衡

[yangyifan@hadoop101 sbin]$ ./start-balancer.sh

  1. 直接启动DataNode,即可关联到集群

[yangyifan@hadoop104 hadoop-2.7.2]$ sbin/hadoop-daemon.sh start datanode

[yangyifan@hadoop104 hadoop-2.7.2]$ sbin/yarn-daemon.sh start nodemanager

image-20210531193215264

5.4 黑名单退役

在黑名单上面的主机都会被强制退出。如果退役的小于设置的副本数,则不允许退役。

  1. NameNode的/opt/module/hadoop-2.7.2/etc/hadoop目录下创建dfs.hosts.exclude文件

添加如下主机名称(要退役的节点)

hadoop103

  1. 在NameNode的hdfs-site.xml配置文件中增加dfs.hosts.exclude属性
<property>
<name>dfs.hosts.exclude</name>
<value>/opt/module/hadoop-2.7.2/etc/hadoop/dfs.hosts.exclude</value>
</property>
  1. 刷新NameNode、刷新ResourceManager

hdfs dfsadmin -refreshNodes

yarn rmadmin -refreshNodes

  1. 检查Web浏览器,退役节点的状态为decommission in progress(退役中),说明数据节点正在复制块到其他节点,如图:

image-20210531193444561

  1. 等待退役节点状态为decommissioned(所有块已经复制完成),停止该节点及节点资源管理器。注意:如果副本数是3,服役的节点小于等于3,是不能退役成功的,需要修改副本数后才能退役。

sbin/hadoop-daemon.sh stop datanode

sbin/yarn-daemon.sh stop nodemanager

  1. 如果数据不均衡,可以用命令实现集群的再平衡

sbin/start-balancer.sh

注意:不允许白名单和黑名单中同时出现同一个主机名称。

5.5 Datanode高可用

  1. DataNode也可以配置成多个目录,每个目录存储的数据不一样。即:数据不是副本

  2. 具体配置如下 hdfs-site.xml

<property>
<name>dfs.datanode.data.dir</name>
<value>file:///${hadoop.tmp.dir}/dfs/data1,file:///${hadoop.tmp.dir}/dfs/data2</value>
</property>

MapReduce

MapReduce是一个分布式运算程序的编程框架,核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。

1. MapReduce工作流程

一个完整的MapReduce程序在分布式运行时有三类实例进程:

1

1) MrAppMaster:负责整个程序的过程调度及状态协调。

2) MapTask:负责Map阶段的整个数据处理流程。

3) ReduceTask:负责Reduce阶段的整个数据处理流程。

下面的流程是整个MapReduce最全工作流程,但是Shuffle过程只是从第7步开始到第16步结束,具体Shuffle过程详解,如下:

1)MapTask收集我们的map()方法输出的kv对,放到内存缓冲区中

2)从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件

3)多个溢出文件会被合并成大的溢出文件

4)在溢出过程及合并的过程中,都要调用Partitioner进行分区和针对key进行排序

5)ReduceTask根据自己的分区号,去各个MapTask机器上取相应的结果分区数据

6)ReduceTask会取到同一个分区的来自不同MapTask的结果文件,ReduceTask会将这些文件再进行合并(归并排序)

7)合并成大文件后,Shuffle的过程也就结束了,后面进入ReduceTask的逻辑运算过程(从文件中取出一个一个的键值对Group,调用用户自定义的reduce()方法)

6

7

2. Mapper、Reducer和Driver编程

用户编写的程序分成三个部分:Mapper、Reducer和Driver。

  • Mapper阶段

    (1)用户自定义的Mapper要继承自己的父类
    (2) Mapper的输入数据是KV对的形式(KV的类型可自定义)
    (3) Mapper中的业务逻辑写在map()方法中
    (4) Mapper的输出数据是KV对的形式(KV的类型可自定义)
    (5) map()方法(MapTak进程) 对每一个<K,V>调用一次

  • Reducer阶段
    (1)用户自定义的Reducer要继承自己的父类
    (2) Reducer的输入数据类型对应Mapper的输出数据类型,也是KV
    (3) Reducer的业务逻辑写在reduce()方法中
    (4) ReduceTask进程对每一组相同k的<k,v>组调用一次reduce()方法

如何认为相同?默认是相同对象,后续可以进行分组排序,指定按照某一个或者多个字段相同则是同一个key。

  • Driver阶段
    相当于YARN集群的客户端,用于提交我们整个程序到YARN集群,提交的是
    封装了MapReduce程序相关运行参数的job对象

3. 序列化

Hadoop不使用Java的序列化,因为它是一个重量级序列化框架(Serializable) ,一个对象被序列化后,会附带很多额外的信息(各种校验信息,Header, 继承体系等), 不便于在网络中高效传输。

Hadoop使用自己的一套序列化机制 (Writable)
Hadoop序列化特点:(1)高效使用存储空间。(2)读写数据的额外开销小。

2

自定义bean实现序列化,必须要实现序列化接口(Writable)

(1)必须实现Writable接口

(2)反序列化时,需要反射调用空参构造函数,所以必须有空参构造

public FlowBean() {   
super();
}

(3)重写序列化和反序列化方法,注意反序列化的顺序和序列化的顺序完全一致

@Override  public void write(DataOutput  out) throws IOException {    
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}

@Override public void readFields(DataInput in) throws IOException {
upFlow = in.readLong();
downFlow = in.readLong();
sumFlow = in.readLong();
}

(4)要想把结果显示在文件中,需要重写toString(),可用”\t”分开,方便后续用。

(5)如果需要将自定义的bean放在key中传输,则还需要实现Comparable接口因为MapReduce框中的Shuffle过程要求对key必须能排序。

@Override  public int compareTo(FlowBean o)  {    
// 倒序排列,从大到小
return this.sumFlow > o.getSumFlow() ? -1 : 1;
}

4. InputFormat数据输入

数据块:Block是HDFS物理上把数据分成一块一块。

数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。

3

5. FileInputFormat实现类

5.1 FileInputFormat切片机制

切片大小默认是Block大小,不考虑数据集整体,切时只针对本文件进行切分。

4

在运行MapReduce程序时,输入的文件格式包括:基于行的日志文件、二进制格式文件、数据库表等。那么,针对不同的数据类型, MapReduce是如何读取这些数据的呢?

FileInputFormat常见的接口实现类包括:TextInputFormat、KeyValueTextInputFormat、NLineInputFormnat、 CombineTextInputFormat和自定义InputFormat等。

5.2 TextInputFormat切片实现

TextInputFormat是默认的FileInputFormat实现类。

按行读取每条记录。键是存储该行在整个文件中的,key:起始字节偏移量,LongWritable类型。值是这行的内容 value:Text类型,不包括任何行终止符( 换行符和回车符),

示例:4条文本记录。
Rich learning form
Inte lligent learning engine
Lear ning more C onvenient
From the real demand for more close to the enter prise
每条记录表示为以下键/值对:
(0, Rich learning form)
(19, Intelligent learning engine)
(47, Le arning more convenient)
(72 , From the real demand for more close to the ente rprise)

5.3 CombineTextInputFormat切片实现

框架默认的TextInputFormat切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个MapTask,这样如果有大量小文件,就会产生大量的MapTask,处理效率极其低下。CombineTextInputFormat它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个MapTask处理。

虚拟存储切片最大值设置(在Driver里面)

// 如果不设置InputFormat,它默认用的是TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);

//虚拟存储切片最大值设置20m
CombineTextInputFormat.setMaxInputSplitSize(job, 20971520);

注意:虚拟存储切片最大值设置最好根据实际的小文件大小情况来设置具体的值。

  • 切片机制

生成切片过程包括:虚拟存储过程和切片过程二部分。

当剩余数据大小超过设置的最大值且不大于最大值2倍,此时将文件均分成2个虚拟存储块(防止出现太小切片)。

5

6. OutputFormat接口实现类

OutputFormat是MapReduce输出的基类,所有实现MapReduce输出都实现了OutputFormat接口。下面我们介绍几种常见的OutputFormat实现类。

  1. 文本输出TextOutputFormat
    默认的输出格式是TextOutputFormat,它把每条记录写为文本行。它的键和值可以是任意类型,因为TextOutputF ormat调用toString0方法把它们转换为字符串。

  2. SequenceFileOutputFormat将SequenceFileOutputFormat输出作为后续MapReduce任务的输入,这便是一种好的输出

格式,因为它的格式紧凑,很容易被压缩。
3. 自定义OutputFormat
根据用户需求,自定义实现输出。

// 要将自定义的输出格式组件设置到job中
job.setOutputFormatClass(FilterOutputFormat.class);

7. Shuffle机制

Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle。

1

7.1 Partition分区

要求将统计结果按照条件输出到不同文件中(分区)。比如:将统计结果按照手机归属地不同省份输出到不同文件中(分区)

默认Partitioner分区

public class HashPartioner<KV> extends Partitioner<K, V> {
public int getPartition(K key, V value,int numReduceTasks) {
return (key.hashCode () & Integer.MAX_VALUE) % numReduceTasks ;
}

默认分区是根据key的hashCode对ReduceTasks个数取模得到的。用户没法控制哪个key存储到哪个分区。

自定义Partition后,要根自定Partition的逻讲设置对应数量的ReduceTask。job. setNumRednceTasks(xxx);

注意:如果ReduceTask默认1个,则不管多少个分区,都还是产生一个结果文件

3

7.2 WritableComparable排序

排序是MapReduce框架中最重要的操作之一。
MapTask和ReduceTask均会对数据按照key进行排序。该操作属于Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。而默认排序是按照字典顺序排序且实现该排序的方法是快速排序。

  • 排序种类:

(1) 部分排序
MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序。

(2) 全排序
最终输出结果只有一个文件, 且文件内部有序。实现方式是只设置一个ReduceTask。但该方法在
处理大型文件时效率极低,因为一台机器处理所有文件,完全丧失了MapReduce所提供的并行架构。

(3) GroupingComparato分组排序
在Reduce端对key进行分组。应用于:在接收的key为bean对象时,想让一个或几个字段相同(全部
字段比较不相同)的key 进入到同一个reduce方法时,可以采用分组排序。

(4) 二次排序
在自定义排序过程中,如果compareTo中的判断条件为两个即为1二次排序。

  1. MapTask排序

对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值80%后,再对缓冲区中的数据进行一次快速排序,并溢写到磁盘上,然后会对磁盘上所有文件进行归并排序。

  1. ReduceTask排序

对于ReduceTask,它从每个MapTask上远程拷贝copy相应的数据文件

(1)如果文件大小超过一定阈值,则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件

(2)如果内存中文件大小或者数目超过一定阈值,则进行一次合并后combiner将数据溢写到磁盘上。当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。

  • 案例实战

增加自定义分区,分区按照省份手机号设置,文件中按照总流量内部降序排序。

img

(1)增加自定义分区类

public class ProvincePartitioner extends Partitioner<FlowBean, Text> {
@Override
public int getPartition(FlowBean key, Text value, int numPartitions) {
// 1 获取手机号码前三位
String preNum = value.toString().substring(0, 3);
int partition = 4;
// 2 根据手机号归属地设置分区
if ("136".equals(preNum)) {
partition = 0;
}else if ("137".equals(preNum)) {
partition = 1;
}else if ("138".equals(preNum)) {
partition = 2;
}else if ("139".equals(preNum)) {
partition = 3;
}
return partition;
}
}

(2)在驱动类中添加分区类

// 加载自定义分区类
job.setPartitionerClass(ProvincePartitioner.class);
// 设置Reducetask个数
job.setNumReduceTasks(5);

(3)排序

@Override
public int compareTo(FlowBean o) {
int result;
// 按照总流量大小,倒序排列
if (sumFlow > bean.getSumFlow()) {
result = -1;
}else if (sumFlow < bean.getSumFlow()) {
result = 1;
}else {
result = 0;
}
return result;
}

(4)Mapper

就简单的获取然后拼接<对象,总流量>就行

public class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean, Text>{
FlowBean bean = new FlowBean();
Text v = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1 获取一行
String line = value.toString();
// 2 截取
String[] fields = line.split("\t");
// 3 封装对象
String phoneNbr = fields[0];
long upFlow = Long.parseLong(fields[1]);
long downFlow = Long.parseLong(fields[2]);

bean.set(upFlow, downFlow);
v.set(phoneNbr);
// 4 输出
context.write(bean, v);
}
}

(5)Reduce

排序和分区后的数据直接输出就行。

public class FlowCountSortReducer extends Reducer<FlowBean, Text, Text, FlowBean>{
@Override
protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
// 循环输出,避免总流量相同情况
for (Text text : values) {
context.write(text, key);
}
}
}

8. Combiner合并

(1)Combiner和Reducer的区别在于运行的位置:

Combiner是在每一个MapTask所在的节点运行,

Reducer是接收全局所有Mapper的输出结果;

(2)Combiner的意义就是对每一个MapTask的输出进行局部汇总,以减小网络传输量。

例如:4

(3)Combiner能够应用的前提是不能影响最终的业务逻辑,而且,Combiner的输出kv应该跟Reducer的输入kv类型要对应起来。

  • 实战

直接将Reduce作为Combiner合并组件即可,避免再写一个。

public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
IntWritable v = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
// 1 汇总
int sum = 0;
for(IntWritable value :values){
sum += value.get();
}
v.set(sum);
// 2 写出
context.write(key, v);
}
}

将WordcountReducer作为Combiner在WordcountDriver驱动类中指定

// 指定需要使用Combiner,以及用哪个类作为Combiner的逻辑
job.setCombinerClass(WordcountReducer.class);

5

9. GroupingComparator分组排序

对要输入到Reduce阶段的数据,根据某一个或几个字段进行分组,再输入到Reduce中。

例如:现在要输出每个订单中最贵的。6

(1)利用“订单id和成交金额”作为key,可以将Map阶段读取到的所有订单数据按照id升序排序,如果id相同再按照金额降序排序,发送到Reduce。

(2)在Reduce端利用groupingComparator将订单id相同的kv聚合成组,然后取第一个即是该订单中最贵商品。

7

//map阶段排序二次排序
@Override
public int compareTo(OrderBean o) {
int result;
if (order_id > o.getOrder_id()) {
result = 1;
} else if (order_id < o.getOrder_id()) {
result = -1;
} else {
// 价格倒序排序
result = price > o.getPrice() ? -1 : 1;
}
return result;
}

//分组排序,订单id相同则认为一组
public class OrderGroupingComparator extends WritableComparator {
protected OrderGroupingComparator() {
super(OrderBean.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
OrderBean aBean = (OrderBean) a;
OrderBean bBean = (OrderBean) b;

int result;
if (aBean.getOrder_id() > bBean.getOrder_id()) {
result = 1;
} else if (aBean.getOrder_id() < bBean.getOrder_id()) {
result = -1;
} else {
result = 0;
}

return result;
}
}

// Reduce
public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {
@Override
protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
context.write(key, NullWritable.get());
}
}

// 8 设置reduce端的分组
job.setGroupingComparatorClass(OrderGroupingComparator.class);

10. MapTask与ReduceTask工作机制( 补充)

8

​ (1)Read阶段:MapTask通过用户编写的RecordReader,从输入InputSplit中解析出一个个key/value。

​ (2)Map阶段:该节点主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value。

​ (3)Collect收集阶段:在用户编写map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的key/value分区(调用Partitioner),并写入一个环形内存缓冲区中。

​ (4)Spill阶段:即“溢写”,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。

​ 溢写阶段详情:

​ 步骤1:利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号Partition进行排序,然后按照key进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照key有序。

​ 步骤2:按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件output/spillN.out(N表示当前溢写次数)中。如果用户设置了Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作。

​ 步骤3:将分区数据的元信息写到内存索引数据结构SpillRecord中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过1MB,则将内存索引写到文件output/spillN.out.index中。

​ (5)Combine阶段:当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。

​ 当所有数据处理完后,MapTask会将所有临时文件合并成一个大文件,并保存到文件output/file.out中,同时生成相应的索引文件output/file.out.index。

​ 在进行文件合并过程中,MapTask以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并io.sort.factor(默认10)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。

​ 让每个MapTask最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。

9

​ (1)Copy阶段:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。

​ (2)Merge阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。

​ (3)Sort阶段:按照MapReduce语义,用户编写reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。

​ (4)Reduce阶段:reduce()函数将计算结果写到HDFS上。

2.设置ReduceTask并行度(个数)

​ ReduceTask的并行度同样影响整个Job的执行并发度和执行效率,但与MapTask的并发数由切片数决定不同,ReduceTask数量的决定是可以直接手动设置:

// 默认值是1,手动设置为4
job.setNumReduceTasks(4);

Yarn

Yarn是一个资源调度平台,负责为运算程序提供服务器运算资源,相当于一个分布式的操作系统平台,而MapReduce等运算程序则相当于运行于操作系统之上的应用程序。

1. Yarn基本架构

​ YARN主要由ResourceManager、NodeManager、ApplicationMaster和Container等组件构成,如图4-23所示。

1

2. Yarn工作机制(MR作业提交过程)

2

​ (1)MR程序提交Job到客户端所在的节点。

(2)YarnRunner向ResourceManager申请一个Application。(一个Job就是一个应用)

​ (3)RM将该应用程序的资源路径hdfs返回给YarnRunner。

​ (4)该程序将运行所需资源提交到HDFS上。

(5)程序资源提交完毕后,申请运行mrAppMaster。

(6)RM将用户的请求初始化成一个Task。

(7)其中一个NodeManager领取到Task任务。

​ (8)该NodeManager创建容器Container,并产生MRAppmaster。

(9)Container从HDFS上拷贝资源到本地。

(10)MRAppmaster向RM 申请运行MapTask资源。

​ (11)RM将运行MapTask任务分配给另外两个NodeManager,另两个NodeManager分别领取任务并创建容器。

​ (12)MR向两个接收到任务的NodeManager发送程序启动脚本,这两个NodeManager分别启动MapTask,MapTask对数据分区排序。

(13)MrAppMaster等待所有MapTask运行完毕后,向RM申请容器,运行ReduceTask。

​ (14)ReduceTask向MapTask获取相应分区的数据。

​ (15)程序运行完毕后,MR会向RM申请注销自己。

3. 资源调度器——FIFO、Capacity Scheduler容量 和Fair Scheduler公平

Hadoop作业调度器主要有三种:FIFO、Capacity Scheduler容量 和Fair Scheduler公平。

Hadoop2.7.2默认的资源调度器是Capacity Scheduler。

  • FIFO

1

  • Capacity Scheduler

2

  • Fair Scheduler

3