使用Hive做数据分析

在大规模推广streaming方式的数据分析后,我们发现这个模式虽然入门成本低,但是执行效率也一样低。
每一个map task都要在TaskTracker上启动两个进程,一个java和一个perl/bash/python。
输入输出都多复制一次。

经过了一系列调研后,我们开始将部分streaming任务改写为Hive。

Hive是什么?

  1. Hive是单机运行的SQL解析引擎,本身并不运行在Hadoop上。
  2. SQL经过Hive解析为MapReduce任务,在Hadoop上运行。
  3. 使用Hive可以降低沟通成本,因为SQL语法的普及度较高。
  4. Hive翻译的任务效率不错,但是依然不如优化过的纯MapReduce任务。

数据准备

原始日志文件是这样的:

1323431269786 202911262 RE_223500512 AT_BLOG_788514510 REPLY BLOG_788514510_202911262
分别对应的字段是 <时间> <操作人> [[说明] [说明]……] <操作> <实体>
上面的例子对应的含义是:
  • <时间>: 1323431269786
  • <操作人>: 202911262
  • [说明]: RE_223500512
  • [说明]: AT_BLOG_788514510
  • <操作>: REPLY
  • <实体>: BLOG_788514510_202911262

扩展Hive的Deserializer

要用SQL分析数据,Hive必须知道如何切分整行的日志。Hive提供了一个接口,留给我们扩展自己的序列化和反序列化方法。

import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.io.Writable;

public class RawActionDeserializer implements Deserializer {

  @Override
  public Object deserialize(Writable obj) throws SerDeException {
    // TODO Auto-generated method stub
    return null;
  }

  @Override
  public ObjectInspector getObjectInspector() throws SerDeException {
    // TODO Auto-generated method stub
    return null;
  }

  @Override
  public void initialize(Configuration conf, Properties props)
      throws SerDeException {
    // TODO Auto-generated method stub
   
  }

}

三个函数作用分别是:

  • initialize:在启动时调用,根据运行时参数调整行为或者分配资源。
  • getObjectInspector:返回字段定义名称和类型。
  • deserialize:对每一行数据进行反序列化,返回结果。

定义表结构

在我们这个例子中,字段是固定的含义,不需要在initialize方法配置运行期参数。我们把字段的定义写成static,如下。

private static List<String> structFieldNames = new ArrayList<String>();

  private static List<ObjectInspector> structFieldObjectInspectors = new ArrayList<ObjectInspector>();
  static {
    structFieldNames.add("time");
    structFieldObjectInspectors.add(ObjectInspectorFactory
        .getReflectionObjectInspector(Long.TYPE, ObjectInspectorOptions.JAVA));

    structFieldNames.add("id");
    structFieldObjectInspectors.add(ObjectInspectorFactory
        .getReflectionObjectInspector(
            java.lang.Integer.TYPE, ObjectInspectorOptions.JAVA));

    structFieldNames.add("adv");
    structFieldObjectInspectors.add(ObjectInspectorFactory
        .getStandardListObjectInspector(
            ObjectInspectorFactory.getReflectionObjectInspector(
                String.class, ObjectInspectorOptions.JAVA)));

    structFieldNames.add("verb");
    structFieldObjectInspectors
        .add(ObjectInspectorFactory.getReflectionObjectInspector(
            String.class, ObjectInspectorOptions.JAVA));

    structFieldNames.add("obj");
    structFieldObjectInspectors
        .add(ObjectInspectorFactory.getReflectionObjectInspector(
            String.class, ObjectInspectorOptions.JAVA));
  }

  @Override
  public ObjectInspector getObjectInspector() throws SerDeException {
    return ObjectInspectorFactory.getStandardStructObjectInspector(
        structFieldNames, structFieldObjectInspectors);
  }

定义解析函数

为了能够让Java MapReduce任务复用代码,我们在外部实现了一个与Hive无关的类,这里不再贴代码。这个类定义了与日志字段相同的成员变量,并且提供一个static的valueOf方法用于从字符串构造自己。

@Override
public Object deserialize(Writable blob) throws SerDeException {
  if (blob instanceof Text) {
    String line = ((Text) blob).toString();
    RawAction act = RawAction.valueOf(line);
    List<Object> result = new ArrayList<Object>();
    if (act == null)
      return null;
    result.add(act.getTime());
    result.add(act.getUserId());
    result.add(act.getAdv());
    result.add(act.getVerb());
    result.add(act.getObj());
    return result;
  }
  return null;
}

建表

把上面程序编译并传到hive部署目录后,进入hive:

$ ./hive –auxpath /home/bochun.bai/dp-base-1.0-SNAPSHOT.jar
hive> CREATE TABLE ac_raw ROW FORMAT SERDE ‘com.renren.dp.hive.RawActionDeserializer’;    
OK
Time taken: 0.117 seconds
hive> DESC ac_raw;
OK
time    bigint  FROM deserializer
id      int     FROM deserializer
adv     array<string>   FROM deserializer
verb    string  FROM deserializer
obj     string  FROM deserializer
Time taken: 0.145 seconds
hive> LOAD DATA INPATH ‘/user/bochun.bai/hivedemo/raw_action’ OVERWRITE INTO TABLE ac_raw;  
Loading DATA TO TABLE DEFAULT.ac_raw
Deleted hdfs://NAMENODE/user/bochun.bai/warehouse/ac_raw
OK
Time taken: 0.173 seconds
hive> SELECT count(1) FROM ac_raw;
显示很多MapReduce进度之后……
OK
332
Time taken: 15.404 seconds
hive> SELECT count(1) AS cnt, verb FROM ac_raw GROUP BY verb;
显示很多MapReduce进度之后……
OK
4       ADD_FOOTPRINT
1       REPLY
24      SHARE_BLOG
299     VISIT
4       add_like
Time taken: 15.242 seconds

技术和故障

这是写给内部同事的日志,也有部分概念有通用意义,就不加密了。

做技术开发的人知道,只要写代码就一定会出错,叫bug。
有的错误在上线之前没有检查出来,直到被用户使用了我们才知道。这种bug就是故障。
出现故障是再正常不过的事了,我认为处理过程,可以让故障成为宝贵的财富。

我设计的故障处理流程,分为三个阶段:反馈、处理、反思。
1 反馈阶段
说实话,大部分的故障是类似的。反馈阶段就是要把各种用户描述归一成为同一个技术问题。
这个阶段具体还要分成“用户反馈”,“已沟通”,“技术已确认”,三个状态。分别由“客服”和“技术经理”操作。
2 处理阶段
这个阶段就是写代码的阶段,把造成故障的bug修复。技术都懂得怎么做。
这个阶段具体还要分成“已分派”和“线上已修复”两个状态。这两个状态的执行者分别是“技术经理”和“测试经理”。
换句话说,一线的工程师编码的工作,是在“已分派”状态进行的。然后交给OP和QA。
3 反思阶段
这个阶段是最核心,同时是最欠缺的阶段。
这个阶段具体分成“已确认解决方案”和“已完成解决方案”两个状态。
所谓“解决方案”,一定是切中要害的解决,并且可以保证类似问题可以避免再次发生。
“解决方案”和“修复BUG”的区别在于有没有反思发生的根源。

整个流程的设计,核心目的是自我进化。只要持续犯错,再避免重复犯错,最终一定是伟大的团队。