What is the best way to learn big data technologies?

Answer by Brent Bai:

I have to say, my career of big data start with "small" data.
You need real big data in hand to understand why these technologies designed.
Most of the big data frameworks are slower than centralised solution when it is about hundreds gigabyte.

Big data is an expensive toy.

What is the best way to learn big data technologies?

还要不要做大数据

我5月14日发了一条微博,后面的评论和私聊引起了我很多反思。这条微博是这样的:“我从0.16版本开始用Hadoop到现在已经5年了,一直相信大数据会是未来决胜的关键。但是,这个未来看来还有很远。或者说我们遇到的问题不同,是这些发明大数据的老外难以理解的,我们已经超前了很多。”

大数据这个话题近些年越来越热,我其实觉得它热过了头,所以我这里是想泼冷水的。

历史

估计会看到这个内容的大多数是我以前的同事,你们都知道我进这行不晚,2008年在人人网开始搭Hadoop,用0.16.3,那时候还没有什么人谈大数据,第一本Hadoop的书[1]也是2009年6月才出版的。那时候我们也没有概念要怎么用这个东西,唯一的目的就是改变“打点统计Log”模式,一开始就把生产服务搞死了三次。

那时候的服务叫ActiveLog,每一个PV记录一行,格式跟Apache Combined Log很类似,我们把WebServer的日志集中记录在统一的Server上(是的,比Facebook开源Scribe早半年[2])。为了存储空间的问题,引入了Hadoop,分布的存储在几百台服务器上。也就是这个结构,运行MapReduce占交换机带宽过大就会把生产集群挤死。

我记得最早的一个完整24小时Log文件的日期时2008年3月15日。那时候的日志是196GB/天。

当然后来大数据火了,我们有了更多的内部用户,也有了独立服务器甚至独立的机房,千兆直连核心交换机,到我2012年离职时集群已经有700台的规模了。

反思

饮水思源,这些年大数据概念红火带来了项目的红利,受这个影响我自己职业发展也不错。但是,掩盖不了一个一个具体问题的产生,应用范围一直是我最困扰的难题。

这让我回到源头去重新审视自己设计的系统和整个应用体系,然后我才发了最开始的那条微博。

数据量

多大的数据量敢叫大数据呢?Wikipedia里面有一句话:As of 2012 ranging from a few dozen terabytes to many petabytes of data in a single data set.[3]

2008年人人网的Log数据一个月有6TB,将就着算half dozen吧,偶尔也要算整年的数据。

所以我只敢说我做过Hadoop,实在不敢说成大数据。现在在谈大数据的书和文章,有多少作者是处理过上PB数据的?国内PB级容量的集群又有几个呢?

几百G就用awk吧,几T其实也可以用数据库的。

应用范围

谈到大数据应用就涉及三件事:1) Distributed/Parallel computing. 2) Data mining 3) Business Intelligence

这三个是互相依赖的,直接的需求来自BI,间接的需求来自数据挖掘,实现在Computing上。可是现实的情况给我的切身感受用一句老话来比喻:粗放型经济向集约型经济转型。现在谈集约的下一步绿色经济,还为时尚早。

我们的互联网有几乎取之不尽的用户,打擦边球都能上市的公司,我们真的在乎数据吗?

炒作完大数据概念,真的应用到业务里,产生了利润吗?能挣回成本吗?

我知道国内大多数互联网公司的PM是不用数据做决策的,在谈大数据之前,应该从“小数据”开始。

 

这个切身体会我是到国外工作以后才有的,发那条微博前一周,我转了大概8%的现金到另一家银行开户,第二天,我的客户经理就要约我谈谈“投资需求”。要知道我去招行销金葵花可都没人问原因,销户一个月我的客户经理还打电话跟我说“因为我是金葵花客户,所以邀请办百夫长黑卡”,这是多么大的差距。

但是这还是“小数据”,这些事情还没办法做好,国内的大数据怎么做,做出来给谁看,谁又真的会看?

 

其实我觉得这个问题是无解的,市场决定了这个粗放的大环境,短期内是不会改变的。

现实能做的,不是去贩卖大数据的概念和技术,而是实实在在的让“小数据”先得到应用。
[1] http://wiki.apache.org/hadoop/Books
[2] http://en.wikipedia.org/wiki/Scribe_(log_server)
[3]  http://en.wikipedia.org/wiki/Big_data

使用Hive做数据分析

在大规模推广streaming方式的数据分析后,我们发现这个模式虽然入门成本低,但是执行效率也一样低。
每一个map task都要在TaskTracker上启动两个进程,一个java和一个perl/bash/python。
输入输出都多复制一次。

经过了一系列调研后,我们开始将部分streaming任务改写为Hive。

Hive是什么?

  1. Hive是单机运行的SQL解析引擎,本身并不运行在Hadoop上。
  2. SQL经过Hive解析为MapReduce任务,在Hadoop上运行。
  3. 使用Hive可以降低沟通成本,因为SQL语法的普及度较高。
  4. Hive翻译的任务效率不错,但是依然不如优化过的纯MapReduce任务。

数据准备

原始日志文件是这样的:
1323431269786 202911262 RE_223500512 AT_BLOG_788514510 REPLY BLOG_788514510_202911262

分别对应的字段是 <时间> <操作人> [[说明] [说明]……] <操作> <实体>
上面的例子对应的含义是:
  • <时间>: 1323431269786
  • <操作人>: 202911262
  • [说明]: RE_223500512
  • [说明]: AT_BLOG_788514510
  • <操作>: REPLY
  • <实体>: BLOG_788514510_202911262

扩展Hive的Deserializer

要用SQL分析数据,Hive必须知道如何切分整行的日志。Hive提供了一个接口,留给我们扩展自己的序列化和反序列化方法。

import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.io.Writable;

public class RawActionDeserializer implements Deserializer {

@Override
public Object deserialize(Writable obj) throws SerDeException {
// TODO Auto-generated method stub
return null;
}

@Override
public ObjectInspector getObjectInspector() throws SerDeException {
// TODO Auto-generated method stub
return null;
}

@Override
public void initialize(Configuration conf, Properties props)
throws SerDeException {
// TODO Auto-generated method stub

}

}

三个函数作用分别是:

  • initialize:在启动时调用,根据运行时参数调整行为或者分配资源。
  • getObjectInspector:返回字段定义名称和类型。
  • deserialize:对每一行数据进行反序列化,返回结果。

定义表结构

在我们这个例子中,字段是固定的含义,不需要在initialize方法配置运行期参数。我们把字段的定义写成static,如下。

private static List structFieldNames = new ArrayList();

private static List structFieldObjectInspectors = new ArrayList();
static {
structFieldNames.add("time");
structFieldObjectInspectors.add(ObjectInspectorFactory
.getReflectionObjectInspector(Long.TYPE, ObjectInspectorOptions.JAVA));

structFieldNames.add("id");
structFieldObjectInspectors.add(ObjectInspectorFactory
.getReflectionObjectInspector(
java.lang.Integer.TYPE, ObjectInspectorOptions.JAVA));

structFieldNames.add("adv");
structFieldObjectInspectors.add(ObjectInspectorFactory
.getStandardListObjectInspector(
ObjectInspectorFactory.getReflectionObjectInspector(
String.class, ObjectInspectorOptions.JAVA)));

structFieldNames.add("verb");
structFieldObjectInspectors
.add(ObjectInspectorFactory.getReflectionObjectInspector(
String.class, ObjectInspectorOptions.JAVA));

structFieldNames.add("obj");
structFieldObjectInspectors
.add(ObjectInspectorFactory.getReflectionObjectInspector(
String.class, ObjectInspectorOptions.JAVA));
}

@Override
public ObjectInspector getObjectInspector() throws SerDeException {
return ObjectInspectorFactory.getStandardStructObjectInspector(
structFieldNames, structFieldObjectInspectors);
}

定义解析函数

为了能够让Java MapReduce任务复用代码,我们在外部实现了一个与Hive无关的类,这里不再贴代码。这个类定义了与日志字段相同的成员变量,并且提供一个static的valueOf方法用于从字符串构造自己。

@Override
public Object deserialize(Writable blob) throws SerDeException {
if (blob instanceof Text) {
String line = ((Text) blob).toString();
RawAction act = RawAction.valueOf(line);
List result = new ArrayList();
if (act == null)
return null;
result.add(act.getTime());
result.add(act.getUserId());
result.add(act.getAdv());
result.add(act.getVerb());
result.add(act.getObj());
return result;
}
return null;
}

建表

把上面程序编译并传到hive部署目录后,进入hive:

$ ./hive --auxpath /home/bochun.bai/dp-base-1.0-SNAPSHOT.jar


hive> CREATE TABLE ac_raw ROW FORMAT SERDE 'com.renren.dp.hive.RawActionDeserializer';
OK
Time taken: 0.117 seconds
hive> DESC ac_raw;
OK
time bigint from deserializer
id int from deserializer
adv array from deserializer
verb string from deserializer
obj string from deserializer
Time taken: 0.145 seconds


hive> LOAD DATA INPATH '/user/bochun.bai/hivedemo/raw_action' OVERWRITE INTO TABLE ac_raw;
Loading data to table default.ac_raw
Deleted hdfs://NAMENODE/user/bochun.bai/warehouse/ac_raw
OK
Time taken: 0.173 seconds


hive> SELECT count(1) FROM ac_raw;
…...显示很多MapReduce进度之后......
OK
332
Time taken: 15.404 seconds


hive> SELECT count(1) as cnt, verb FROM ac_raw GROUP BY verb;
…...显示很多MapReduce进度之后......
OK
4 ADD_FOOTPRINT
1 REPLY
24 SHARE_BLOG
299 VISIT
4 add_like
Time taken: 15.242 seconds

[Updated] Hive for Hadoop 0.21.0

终于要升级0.21.0了,目前最难的是Hive-0.7.0与Hadoop-0.21兼容问题。

这里有一个我修改过的Hive,可以在0.21运行:
http://sinofool.com/hive-0.7.0-r1043843.tar.bz2
MD5SUM: 8adb62c176b203b9d3cf5edc5d37b375
代码在:HIVE-1612

P.S. 这个版本有点儿旧,修改的时间是2010年11月10日,比官方的0.7.0落后一些。

UPDATED:

最新的2011年5月20日代码,还是用上面的patch编译的,基本可用。

去掉了HBase的支持,因为HBase本身也还不支持0.21.0。接下来解决HBase的兼容问题,再去jira里面提交。

hive-0.8.0-SNAPSHOT-r1125002-bin.tar.gz
MD5SUM: 7a48b50d375aae5ee69cd42dbd7bdd16

UPDATED:
最新的2011年5月23日代码,同上。
hive-0.8.0-SNAPSHOT-r1127826-bin.tar.gz
MD5SUM: 9dd4cb9d850894353a18df399b8c7b53

Hadoop reduce 慢

又一次从博客流量来源上看到一组有意思的词:hadoop reduce 慢。
我试着搜了一下,结果没找到自己的文章排在哪。

言归正传,慢真是个大问题!

首先是技术问题,也是最容易解决的问题,调参数。
我看到有人在网上问,说WordCount都慢,那就是环境问题了。调HeapSize,调GC参数,调TaskSlot,再不行加加机器。总是能解决的。

其实我更想说非技术问题,很多人误把hadoop妖魔化了,什么都往上套,一定会慢。
所谓快慢是要对比的,要么是跟旧系统比,要么是跟心理预期比。
如果你有旧系统,也是分布式,也是大数据量,写的并不太差,那hadoop是一定慢的。Hadoop能够带来的更多是开发效率的提高。
如果没有旧系统,比心理预期慢,那就必须先拷问一下自己凭什么预期它快。
还有就是,reduce天生就比map慢,这个不能比。

我们遇到过很多挫折:
在reduce的时候做矩阵运算,肯定快不起来。
在reduce输入和map一样的数据量,因为reduce个数少,肯定快不起来。
在map输出某个特殊的key,数据量不平衡,那某个reducer肯定快不起来。
在繁忙的机器上运行,也一定快不起来。

在遇到问题时,我会去调查:
Reduce要从网络读取多少数据。
排序能不能在内存完成。
Reduce有没有占很多内存。

Hadoop现在的名气大,能力相对没有那么多。盲目选择有风险,须谨慎。

Hadoop存在的问题

在睡觉前偶然看看Google Analytics的统计,有几个人是通过搜索引擎搜索“Hadoop存在的问题”来访问了这里。
我也试着搜了一下,我的上一篇写Hadoop的内容在Google出现第10页,baidu第4页。

既然这个问题都让人翻了这么多页来寻找答案,我就开始反问自己了。

Hadoop存在什么问题?

思考了最近两年的过程,我认为Hadoop存在的问题都不是技术问题:

1. API不稳定

Hadoop从0.17开始,到目前我用到0.21,每次升级都让我苦不堪言。
HDFS和MapReduce的向前兼容做的不错,但是过多的外围程序跟进了这个漩涡,例如Hive,HBase。
由于这些外围程序的不跟进,每次升级都会损失一些追随者。

当然0.xx版本,本来就没有承诺API是稳定的。只是至今也都没有1.0的规划,很可能将来要出到0.99。
没有盼头的一直跟随更新,实在是很辛苦。

2. 宣传和普及的不够

在刚接触了Hadoop几个月的时候,有幸与[email protected]座谈过几个小时。
我想这可能是我能一直坚持着跟进Hadoop开发的一个动力。

宣传的少会让谨慎的人不敢用,普及的多会让激进的人滥用。
我自认为在技术选型上是个保守派,所以关于应用HBase的各种讨论,我都投的反对票。

我认为Hadoop及其衍生品,根源上是批处理系统,高容错,高延时。
这种系统我会用来做异步的计算,但是一定不能染指用户操作行为。

应该加强宣传,让大家知道Hadoop不是万能的,它最擅长的是并行处理。

3. 小众

整个MapReduce这个概念是基于Google的一篇论文,本是解决PB级别搜索索引问题开发的架构。

随着发展,虽然功能已经抽象的不仅限于索引,但是量级的优势还是一直存在。
如果数据量在100TB以下,实在是没有必要用Hadoop。

可是有100TB数据的组织,真的不多。大部分的试用者,没有办法从应用Hadoop的过程里获得边际收益。

中小规模Hadoop集群优化

我们有一个Hadoop集群从上个月开始遇到一系列性能问题,在逐一解决的过程中,积累了以下的优化经验。

1. 网络带宽

Hadoop集群的服务器在规划时就在统一的交换机下,这是在官方文档中建议的部署方式。

但是我们的这台交换机和其他交换机的互联带宽有限,所以在客户端遇到了HDFS访问速度慢的问题。

把操作集群的客户端也联入DataNode的交换机内部,解决了这个问题。

2. 系统参数

对ulimit -c的修改也是官方文档建议的修改,在集群只有10台服务器时,并没有遇到问题。

随着机器增加和任务增加,这个值需要改的更大。

3. 配置文件管理

这个集群用的是Cloudera发行的版本,配置文件默认存在/etc/hadoop/conf位置。这是一个只有root才能修改的位置。

为了修改方便,我把配置文件统一保存在一台机器上,修改后用脚本分发。保证所有服务器都是统一的配置。

4. mapred.tasktracker.map.tasks.maximum

这个参数控制每个TaskTracker同时运行的Map任务数。

以前的设置是和CPU核数相同的,偶尔遇到任务挤占DataNode资源的问题。

现在改成map+reduce+1==num_cpu_cores。

5. 严格控制root权限

Cloudera的发行版会创建一个hadoop用户,各种守护进程都应该以这个用户运行。

曾经有误操作(/usr/lib/hadoop/bin/hadoop datanode &)导致本地的数据目录被root写入新文件,于是正确启动的hadoop用户进程无法读写。

所以现在的集群服务器不提供日常的root权限访问。

6. Java的GC模式

在mapred.child.java.opts和HADOOP_OPTS都增加了-XX:+UseConcMarkSweepGC。

JDK的文档中推荐现代多核处理器系统,采用这种GC方式,可以充分利用CPU的并发能力。

这个改动对性能的积极影响很大。

7. 选择正确的JDK

这个集群有部分服务器的JDK用的是32位版本,不能创建-Xmx4g以上的进程。

统一为x64版本的JDK。

8. mapred.reduce.slowstart.completed.maps

这个参数控制slowstart特性的时机,默认是在5%的map任务完成后,就开始调度reduce进程启动,开始copy过程。

但是我们的机器数量不多,有一次大量的任务堆积在JobTracker里,每个TaskTracker的map和reduce slots都跑满了。

由于map没有足够资源迅速完成,reduce也就无法结束,造成集群的资源互相死锁。

把这个参数改成了0.75,任务堆积的列表从平均10个,变成了3个。

9. mapred.fairscheduler.preemption

这个参数设为了true。以便fairscheduler在用户最小资源不能满足时,kill其他人的任务腾出足够的资源。

集群运行着各种类型的任务,有些map任务需要运行数小时。这个参数会导致这类任务被频繁kill,几乎无法完成。曾经有个任务在7小时内被kill了137次。

可以通过调整fairscheduler的pool配置解决,给这种任务单独配置一个minMap==maxMap的pool。

10. mapred.jobtracker.completeuserjobs.maximum

限制每个用户在JobTracker的内存中保存任务的个数。

因为这个参数过大,我们的JobTracker启动不到24小时就会陷入频繁的FullGC当中。

目前改为5,JT平稳运行一天处理1500个任务,只占用800M内存。

这个参数在>0.21.0已经没有必要设置了,因为0.21版本改造了completeuserjobs的用法,会尽快的写入磁盘,不再内存中长期存在了。

11. mapred.jobtracker.update.faulty.tracker.interval和mapred.jobtracker.max.blacklist.percent

一个写错的任务,会导致一大批TaskTracker进入黑名单,而且要24小时才能恢复。这种状况对中小规模的集群性能影响是非常大的。只能通过手工重启TaskTracker来修复。所以我们就修改了部分JobTracker的代码,暴露了两个参数:

mapred.jobtracker.update.faulty.tracker.interval控制黑名单重置时间,默认是24小时不能改变,我们现在改成了1小时。

mapred.jobtracker.max.blacklist.percent控制进入黑名单TT的比例,我们改成了0.2。

我正在补充这两个参数的TestCase,准备提交到trunk中。

12. 多用hive少用streaming

由于streaming的方便快捷,我们做了很多基于它的开发。但是由于streaming的任务在运行时还要有一个java进程读写stdin/out,有一定的性能开销。

类似的需求最好改用自定义的Deserializer+hive来完成。

Hadoop技术沙龙 感想

昨天参加了由CSDN和Yahoo公司组织的技术沙龙活动,听Milind Bhandarkar分享了《Hadoop应用性能调优案例分析》的经验,整个过程给我很多启发。

按照时间的顺序回忆,首先是出发之前。本来这次会议我们要有三个部门参加,各派出一名工程师。但是到临行前突然有两名工程师遇到紧急的事务,甚至预计要熬夜赶工。最终我只有半小时更换人选。

不要乱了阵脚

错过一次学习的机会只是偶然的,我们被工作控制的情况每个人都遇到过。我认为程序员的主要工作不是完成复杂的功能,反而是化繁为简,用同简单的方法完成复杂的事情,并且千方百计的提高自己的效率。

社区和企业

大概六点半左右我们到了Yahoo的办公室,当时会场还在布置,所以有时间参观了一下这里的休闲区。面积也就只有我们一半,墙面刷成淡黄色,一个水池一台咖啡机一台冰箱两台售货机。别的东西可能因为开会收走了。

这半个小时见到了中文Hadoop社区认识的韩轶平先生,他也是这次沙龙的组织者之一,我们谈论了在社区中的趣事和各自公司遇到的技术和管理问题。在对开源的使用上我很向往Yahoo和Hadoop的共生方式,互相依靠一起发展。

架构师

大概进行到七点半,我对台上这个人做的事非常钦佩。他把自己团队对Hadoop使用的经验变成了工具。这个方面的尝试我们现在做的太少了,经验传承现在是靠文档甚至是作坊式的师徒。作为架构师,把团队的经验固化并传播,我认为应该是最重要的。

后面的讨论大概进行了一个小时深浅不一,提到了HA NameNode的几个实现,HBase等等。会议时间感觉还短,要是再有机会我还要去参加。

京ICP备06058813号  京ICP备13006450号