Category Archives: Hadoop

还要不要做大数据

我5月14日发了一条微博,后面的评论和私聊引起了我很多反思。这条微博是这样的:“我从0.16版本开始用Hadoop到现在已经5年了,一直相信大数据会是未来决胜的关键。但是,这个未来看来还有很远。或者说我们遇到的问题不同,是这些发明大数据的老外难以理解的,我们已经超前了很多。”

大数据这个话题近些年越来越热,我其实觉得它热过了头,所以我这里是想泼冷水的。

历史

估计会看到这个内容的大多数是我以前的同事,你们都知道我进这行不晚,2008年在人人网开始搭Hadoop,用0.16.3,那时候还没有什么人谈大数据,第一本Hadoop的书[1]也是2009年6月才出版的。那时候我们也没有概念要怎么用这个东西,唯一的目的就是改变“打点统计Log”模式,一开始就把生产服务搞死了三次。

那时候的服务叫ActiveLog,每一个PV记录一行,格式跟Apache Combined Log很类似,我们把WebServer的日志集中记录在统一的Server上(是的,比Facebook开源Scribe早半年[2])。为了存储空间的问题,引入了Hadoop,分布的存储在几百台服务器上。也就是这个结构,运行MapReduce占交换机带宽过大就会把生产集群挤死。

我记得最早的一个完整24小时Log文件的日期时2008年3月15日。那时候的日志是196GB/天。

当然后来大数据火了,我们有了更多的内部用户,也有了独立服务器甚至独立的机房,千兆直连核心交换机,到我2012年离职时集群已经有700台的规模了。

反思

饮水思源,这些年大数据概念红火带来了项目的红利,受这个影响我自己职业发展也不错。但是,掩盖不了一个一个具体问题的产生,应用范围一直是我最困扰的难题。

这让我回到源头去重新审视自己设计的系统和整个应用体系,然后我才发了最开始的那条微博。

数据量

多大的数据量敢叫大数据呢?Wikipedia里面有一句话:As of 2012 ranging from a few dozen terabytes to many petabytes of data in a single data set.[3]

2008年人人网的Log数据一个月有6TB,将就着算half dozen吧,偶尔也要算整年的数据。

所以我只敢说我做过Hadoop,实在不敢说成大数据。现在在谈大数据的书和文章,有多少作者是处理过上PB数据的?国内PB级容量的集群又有几个呢?

几百G就用awk吧,几T其实也可以用数据库的。

应用范围

谈到大数据应用就涉及三件事:1) Distributed/Parallel computing. 2) Data mining 3) Business Intelligence

这三个是互相依赖的,直接的需求来自BI,间接的需求来自数据挖掘,实现在Computing上。可是现实的情况给我的切身感受用一句老话来比喻:粗放型经济向集约型经济转型。现在谈集约的下一步绿色经济,还为时尚早。

我们的互联网有几乎取之不尽的用户,打擦边球都能上市的公司,我们真的在乎数据吗?

炒作完大数据概念,真的应用到业务里,产生了利润吗?能挣回成本吗?

我知道国内大多数互联网公司的PM是不用数据做决策的,在谈大数据之前,应该从“小数据”开始。

 

这个切身体会我是到国外工作以后才有的,发那条微博前一周,我转了大概8%的现金到另一家银行开户,第二天,我的客户经理就要约我谈谈“投资需求”。要知道我去招行销金葵花可都没人问原因,销户一个月我的客户经理还打电话跟我说“因为我是金葵花客户,所以邀请办百夫长黑卡”,这是多么大的差距。

但是这还是“小数据”,这些事情还没办法做好,国内的大数据怎么做,做出来给谁看,谁又真的会看?

 

其实我觉得这个问题是无解的,市场决定了这个粗放的大环境,短期内是不会改变的。

现实能做的,不是去贩卖大数据的概念和技术,而是实实在在的让“小数据”先得到应用。
[1] http://wiki.apache.org/hadoop/Books
[2] http://en.wikipedia.org/wiki/Scribe_(log_server)
[3]  http://en.wikipedia.org/wiki/Big_data

使用Hive做数据分析

在大规模推广streaming方式的数据分析后,我们发现这个模式虽然入门成本低,但是执行效率也一样低。
每一个map task都要在TaskTracker上启动两个进程,一个java和一个perl/bash/python。
输入输出都多复制一次。

经过了一系列调研后,我们开始将部分streaming任务改写为Hive。

Hive是什么?

  1. Hive是单机运行的SQL解析引擎,本身并不运行在Hadoop上。
  2. SQL经过Hive解析为MapReduce任务,在Hadoop上运行。
  3. 使用Hive可以降低沟通成本,因为SQL语法的普及度较高。
  4. Hive翻译的任务效率不错,但是依然不如优化过的纯MapReduce任务。

数据准备

原始日志文件是这样的:

1323431269786 202911262 RE_223500512 AT_BLOG_788514510 REPLY BLOG_788514510_202911262
分别对应的字段是 <时间> <操作人> [[说明] [说明]……] <操作> <实体>
上面的例子对应的含义是:
  • <时间>: 1323431269786
  • <操作人>: 202911262
  • [说明]: RE_223500512
  • [说明]: AT_BLOG_788514510
  • <操作>: REPLY
  • <实体>: BLOG_788514510_202911262

扩展Hive的Deserializer

要用SQL分析数据,Hive必须知道如何切分整行的日志。Hive提供了一个接口,留给我们扩展自己的序列化和反序列化方法。

import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.io.Writable;

public class RawActionDeserializer implements Deserializer {

  @Override
  public Object deserialize(Writable obj) throws SerDeException {
    // TODO Auto-generated method stub
    return null;
  }

  @Override
  public ObjectInspector getObjectInspector() throws SerDeException {
    // TODO Auto-generated method stub
    return null;
  }

  @Override
  public void initialize(Configuration conf, Properties props)
      throws SerDeException {
    // TODO Auto-generated method stub
   
  }

}

三个函数作用分别是:

  • initialize:在启动时调用,根据运行时参数调整行为或者分配资源。
  • getObjectInspector:返回字段定义名称和类型。
  • deserialize:对每一行数据进行反序列化,返回结果。

定义表结构

在我们这个例子中,字段是固定的含义,不需要在initialize方法配置运行期参数。我们把字段的定义写成static,如下。

private static List<String> structFieldNames = new ArrayList<String>();

  private static List<ObjectInspector> structFieldObjectInspectors = new ArrayList<ObjectInspector>();
  static {
    structFieldNames.add("time");
    structFieldObjectInspectors.add(ObjectInspectorFactory
        .getReflectionObjectInspector(Long.TYPE, ObjectInspectorOptions.JAVA));

    structFieldNames.add("id");
    structFieldObjectInspectors.add(ObjectInspectorFactory
        .getReflectionObjectInspector(
            java.lang.Integer.TYPE, ObjectInspectorOptions.JAVA));

    structFieldNames.add("adv");
    structFieldObjectInspectors.add(ObjectInspectorFactory
        .getStandardListObjectInspector(
            ObjectInspectorFactory.getReflectionObjectInspector(
                String.class, ObjectInspectorOptions.JAVA)));

    structFieldNames.add("verb");
    structFieldObjectInspectors
        .add(ObjectInspectorFactory.getReflectionObjectInspector(
            String.class, ObjectInspectorOptions.JAVA));

    structFieldNames.add("obj");
    structFieldObjectInspectors
        .add(ObjectInspectorFactory.getReflectionObjectInspector(
            String.class, ObjectInspectorOptions.JAVA));
  }

  @Override
  public ObjectInspector getObjectInspector() throws SerDeException {
    return ObjectInspectorFactory.getStandardStructObjectInspector(
        structFieldNames, structFieldObjectInspectors);
  }

定义解析函数

为了能够让Java MapReduce任务复用代码,我们在外部实现了一个与Hive无关的类,这里不再贴代码。这个类定义了与日志字段相同的成员变量,并且提供一个static的valueOf方法用于从字符串构造自己。

@Override
public Object deserialize(Writable blob) throws SerDeException {
  if (blob instanceof Text) {
    String line = ((Text) blob).toString();
    RawAction act = RawAction.valueOf(line);
    List<Object> result = new ArrayList<Object>();
    if (act == null)
      return null;
    result.add(act.getTime());
    result.add(act.getUserId());
    result.add(act.getAdv());
    result.add(act.getVerb());
    result.add(act.getObj());
    return result;
  }
  return null;
}

建表

把上面程序编译并传到hive部署目录后,进入hive:

$ ./hive –auxpath /home/bochun.bai/dp-base-1.0-SNAPSHOT.jar
hive> CREATE TABLE ac_raw ROW FORMAT SERDE ‘com.renren.dp.hive.RawActionDeserializer’;    
OK
Time taken: 0.117 seconds
hive> DESC ac_raw;
OK
time    bigint  FROM deserializer
id      int     FROM deserializer
adv     array<string>   FROM deserializer
verb    string  FROM deserializer
obj     string  FROM deserializer
Time taken: 0.145 seconds
hive> LOAD DATA INPATH ‘/user/bochun.bai/hivedemo/raw_action’ OVERWRITE INTO TABLE ac_raw;  
Loading DATA TO TABLE DEFAULT.ac_raw
Deleted hdfs://NAMENODE/user/bochun.bai/warehouse/ac_raw
OK
Time taken: 0.173 seconds
hive> SELECT count(1) FROM ac_raw;
显示很多MapReduce进度之后……
OK
332
Time taken: 15.404 seconds
hive> SELECT count(1) AS cnt, verb FROM ac_raw GROUP BY verb;
显示很多MapReduce进度之后……
OK
4       ADD_FOOTPRINT
1       REPLY
24      SHARE_BLOG
299     VISIT
4       add_like
Time taken: 15.242 seconds

[Updated] Hive for Hadoop 0.21.0

终于要升级0.21.0了,目前最难的是Hive-0.7.0与Hadoop-0.21兼容问题。

这里有一个我修改过的Hive,可以在0.21运行:
http://sinofool.com/hive-0.7.0-r1043843.tar.bz2
MD5SUM: 8adb62c176b203b9d3cf5edc5d37b375
代码在:HIVE-1612

P.S. 这个版本有点儿旧,修改的时间是2010年11月10日,比官方的0.7.0落后一些。

UPDATED:

最新的2011年5月20日代码,还是用上面的patch编译的,基本可用。

去掉了HBase的支持,因为HBase本身也还不支持0.21.0。接下来解决HBase的兼容问题,再去jira里面提交。

hive-0.8.0-SNAPSHOT-r1125002-bin.tar.gz
MD5SUM: 7a48b50d375aae5ee69cd42dbd7bdd16

UPDATED:
最新的2011年5月23日代码,同上。
hive-0.8.0-SNAPSHOT-r1127826-bin.tar.gz
MD5SUM: 9dd4cb9d850894353a18df399b8c7b53

Hadoop reduce 慢

又一次从博客流量来源上看到一组有意思的词:hadoop reduce 慢。
我试着搜了一下,结果没找到自己的文章排在哪。

言归正传,慢真是个大问题!

首先是技术问题,也是最容易解决的问题,调参数。
我看到有人在网上问,说WordCount都慢,那就是环境问题了。调HeapSize,调GC参数,调TaskSlot,再不行加加机器。总是能解决的。

其实我更想说非技术问题,很多人误把hadoop妖魔化了,什么都往上套,一定会慢。
所谓快慢是要对比的,要么是跟旧系统比,要么是跟心理预期比。
如果你有旧系统,也是分布式,也是大数据量,写的并不太差,那hadoop是一定慢的。Hadoop能够带来的更多是开发效率的提高。
如果没有旧系统,比心理预期慢,那就必须先拷问一下自己凭什么预期它快。
还有就是,reduce天生就比map慢,这个不能比。

我们遇到过很多挫折:
在reduce的时候做矩阵运算,肯定快不起来。
在reduce输入和map一样的数据量,因为reduce个数少,肯定快不起来。
在map输出某个特殊的key,数据量不平衡,那某个reducer肯定快不起来。
在繁忙的机器上运行,也一定快不起来。

在遇到问题时,我会去调查:
Reduce要从网络读取多少数据。
排序能不能在内存完成。
Reduce有没有占很多内存。

Hadoop现在的名气大,能力相对没有那么多。盲目选择有风险,须谨慎。

Hadoop存在的问题

在睡觉前偶然看看Google Analytics的统计,有几个人是通过搜索引擎搜索“Hadoop存在的问题”来访问了这里。
我也试着搜了一下,我的上一篇写Hadoop的内容在Google出现第10页,baidu第4页。

既然这个问题都让人翻了这么多页来寻找答案,我就开始反问自己了。

Hadoop存在什么问题?

思考了最近两年的过程,我认为Hadoop存在的问题都不是技术问题:

1. API不稳定

Hadoop从0.17开始,到目前我用到0.21,每次升级都让我苦不堪言。
HDFS和MapReduce的向前兼容做的不错,但是过多的外围程序跟进了这个漩涡,例如Hive,HBase。
由于这些外围程序的不跟进,每次升级都会损失一些追随者。

当然0.xx版本,本来就没有承诺API是稳定的。只是至今也都没有1.0的规划,很可能将来要出到0.99。
没有盼头的一直跟随更新,实在是很辛苦。

2. 宣传和普及的不够

在刚接触了Hadoop几个月的时候,有幸与zshao@apache座谈过几个小时。
我想这可能是我能一直坚持着跟进Hadoop开发的一个动力。

宣传的少会让谨慎的人不敢用,普及的多会让激进的人滥用。
我自认为在技术选型上是个保守派,所以关于应用HBase的各种讨论,我都投的反对票。

我认为Hadoop及其衍生品,根源上是批处理系统,高容错,高延时。
这种系统我会用来做异步的计算,但是一定不能染指用户操作行为。

应该加强宣传,让大家知道Hadoop不是万能的,它最擅长的是并行处理。

3. 小众

整个MapReduce这个概念是基于Google的一篇论文,本是解决PB级别搜索索引问题开发的架构。

随着发展,虽然功能已经抽象的不仅限于索引,但是量级的优势还是一直存在。
如果数据量在100TB以下,实在是没有必要用Hadoop。

可是有100TB数据的组织,真的不多。大部分的试用者,没有办法从应用Hadoop的过程里获得边际收益。

中小规模Hadoop集群优化

我们有一个Hadoop集群从上个月开始遇到一系列性能问题,在逐一解决的过程中,积累了以下的优化经验。

1. 网络带宽

Hadoop集群的服务器在规划时就在统一的交换机下,这是在官方文档中建议的部署方式。

但是我们的这台交换机和其他交换机的互联带宽有限,所以在客户端遇到了HDFS访问速度慢的问题。

把操作集群的客户端也联入DataNode的交换机内部,解决了这个问题。

2. 系统参数

对ulimit -c的修改也是官方文档建议的修改,在集群只有10台服务器时,并没有遇到问题。

随着机器增加和任务增加,这个值需要改的更大。

3. 配置文件管理

这个集群用的是Cloudera发行的版本,配置文件默认存在/etc/hadoop/conf位置。这是一个只有root才能修改的位置。

为了修改方便,我把配置文件统一保存在一台机器上,修改后用脚本分发。保证所有服务器都是统一的配置。

4. mapred.tasktracker.map.tasks.maximum

这个参数控制每个TaskTracker同时运行的Map任务数。

以前的设置是和CPU核数相同的,偶尔遇到任务挤占DataNode资源的问题。

现在改成map+reduce+1==num_cpu_cores。

5. 严格控制root权限

Cloudera的发行版会创建一个hadoop用户,各种守护进程都应该以这个用户运行。

曾经有误操作(/usr/lib/hadoop/bin/hadoop datanode &)导致本地的数据目录被root写入新文件,于是正确启动的hadoop用户进程无法读写。

所以现在的集群服务器不提供日常的root权限访问。

6. Java的GC模式

在mapred.child.java.opts和HADOOP_OPTS都增加了-XX:+UseConcMarkSweepGC。

JDK的文档中推荐现代多核处理器系统,采用这种GC方式,可以充分利用CPU的并发能力。

这个改动对性能的积极影响很大。

7. 选择正确的JDK

这个集群有部分服务器的JDK用的是32位版本,不能创建-Xmx4g以上的进程。

统一为x64版本的JDK。

8. mapred.reduce.slowstart.completed.maps

这个参数控制slowstart特性的时机,默认是在5%的map任务完成后,就开始调度reduce进程启动,开始copy过程。

但是我们的机器数量不多,有一次大量的任务堆积在JobTracker里,每个TaskTracker的map和reduce slots都跑满了。

由于map没有足够资源迅速完成,reduce也就无法结束,造成集群的资源互相死锁。

把这个参数改成了0.75,任务堆积的列表从平均10个,变成了3个。

9. mapred.fairscheduler.preemption

这个参数设为了true。以便fairscheduler在用户最小资源不能满足时,kill其他人的任务腾出足够的资源。

集群运行着各种类型的任务,有些map任务需要运行数小时。这个参数会导致这类任务被频繁kill,几乎无法完成。曾经有个任务在7小时内被kill了137次。

可以通过调整fairscheduler的pool配置解决,给这种任务单独配置一个minMap==maxMap的pool。

10. mapred.jobtracker.completeuserjobs.maximum

限制每个用户在JobTracker的内存中保存任务的个数。

因为这个参数过大,我们的JobTracker启动不到24小时就会陷入频繁的FullGC当中。

目前改为5,JT平稳运行一天处理1500个任务,只占用800M内存。

这个参数在>0.21.0已经没有必要设置了,因为0.21版本改造了completeuserjobs的用法,会尽快的写入磁盘,不再内存中长期存在了。

11. mapred.jobtracker.update.faulty.tracker.interval和mapred.jobtracker.max.blacklist.percent

一个写错的任务,会导致一大批TaskTracker进入黑名单,而且要24小时才能恢复。这种状况对中小规模的集群性能影响是非常大的。只能通过手工重启TaskTracker来修复。所以我们就修改了部分JobTracker的代码,暴露了两个参数:

mapred.jobtracker.update.faulty.tracker.interval控制黑名单重置时间,默认是24小时不能改变,我们现在改成了1小时。

mapred.jobtracker.max.blacklist.percent控制进入黑名单TT的比例,我们改成了0.2。

我正在补充这两个参数的TestCase,准备提交到trunk中。

12. 多用hive少用streaming

由于streaming的方便快捷,我们做了很多基于它的开发。但是由于streaming的任务在运行时还要有一个java进程读写stdin/out,有一定的性能开销。

类似的需求最好改用自定义的Deserializer+hive来完成。

Hadoop技术沙龙 感想

昨天参加了由CSDN和Yahoo公司组织的技术沙龙活动,听Milind Bhandarkar分享了《Hadoop应用性能调优案例分析》的经验,整个过程给我很多启发。

按照时间的顺序回忆,首先是出发之前。本来这次会议我们要有三个部门参加,各派出一名工程师。但是到临行前突然有两名工程师遇到紧急的事务,甚至预计要熬夜赶工。最终我只有半小时更换人选。

不要乱了阵脚

错过一次学习的机会只是偶然的,我们被工作控制的情况每个人都遇到过。我认为程序员的主要工作不是完成复杂的功能,反而是化繁为简,用同简单的方法完成复杂的事情,并且千方百计的提高自己的效率。

社区和企业

大概六点半左右我们到了Yahoo的办公室,当时会场还在布置,所以有时间参观了一下这里的休闲区。面积也就只有我们一半,墙面刷成淡黄色,一个水池一台咖啡机一台冰箱两台售货机。别的东西可能因为开会收走了。

这半个小时见到了中文Hadoop社区认识的韩轶平先生,他也是这次沙龙的组织者之一,我们谈论了在社区中的趣事和各自公司遇到的技术和管理问题。在对开源的使用上我很向往Yahoo和Hadoop的共生方式,互相依靠一起发展。

架构师

大概进行到七点半,我对台上这个人做的事非常钦佩。他把自己团队对Hadoop使用的经验变成了工具。这个方面的尝试我们现在做的太少了,经验传承现在是靠文档甚至是作坊式的师徒。作为架构师,把团队的经验固化并传播,我认为应该是最重要的。

后面的讨论大概进行了一个小时深浅不一,提到了HA NameNode的几个实现,HBase等等。会议时间感觉还短,要是再有机会我还要去参加。