Tag: Hadoop Hive Streaming Serde Deserializer

  • 使用Hive做数据分析

    在大规模推广streaming方式的数据分析后,我们发现这个模式虽然入门成本低,但是执行效率也一样低。
    每一个map task都要在TaskTracker上启动两个进程,一个java和一个perl/bash/python。
    输入输出都多复制一次。

    经过了一系列调研后,我们开始将部分streaming任务改写为Hive。

    Hive是什么?

    1. Hive是单机运行的SQL解析引擎,本身并不运行在Hadoop上。
    2. SQL经过Hive解析为MapReduce任务,在Hadoop上运行。
    3. 使用Hive可以降低沟通成本,因为SQL语法的普及度较高。
    4. Hive翻译的任务效率不错,但是依然不如优化过的纯MapReduce任务。

    数据准备

    原始日志文件是这样的:
    1323431269786 202911262 RE_223500512 AT_BLOG_788514510 REPLY BLOG_788514510_202911262

    分别对应的字段是 <时间> <操作人> [[说明] [说明]……] <操作> <实体>
    上面的例子对应的含义是:
    • <时间>: 1323431269786
    • <操作人>: 202911262
    • [说明]: RE_223500512
    • [说明]: AT_BLOG_788514510
    • <操作>: REPLY
    • <实体>: BLOG_788514510_202911262

    扩展Hive的Deserializer

    要用SQL分析数据,Hive必须知道如何切分整行的日志。Hive提供了一个接口,留给我们扩展自己的序列化和反序列化方法。

    import java.util.Properties;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.serde2.Deserializer;
    import org.apache.hadoop.hive.serde2.SerDeException;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.io.Writable;

    public class RawActionDeserializer implements Deserializer {

    @Override
    public Object deserialize(Writable obj) throws SerDeException {
    // TODO Auto-generated method stub
    return null;
    }

    @Override
    public ObjectInspector getObjectInspector() throws SerDeException {
    // TODO Auto-generated method stub
    return null;
    }

    @Override
    public void initialize(Configuration conf, Properties props)
    throws SerDeException {
    // TODO Auto-generated method stub

    }

    }

    三个函数作用分别是:

    • initialize:在启动时调用,根据运行时参数调整行为或者分配资源。
    • getObjectInspector:返回字段定义名称和类型。
    • deserialize:对每一行数据进行反序列化,返回结果。

    定义表结构

    在我们这个例子中,字段是固定的含义,不需要在initialize方法配置运行期参数。我们把字段的定义写成static,如下。

    private static List structFieldNames = new ArrayList();

    private static List structFieldObjectInspectors = new ArrayList();
    static {
    structFieldNames.add("time");
    structFieldObjectInspectors.add(ObjectInspectorFactory
    .getReflectionObjectInspector(Long.TYPE, ObjectInspectorOptions.JAVA));

    structFieldNames.add("id");
    structFieldObjectInspectors.add(ObjectInspectorFactory
    .getReflectionObjectInspector(
    java.lang.Integer.TYPE, ObjectInspectorOptions.JAVA));

    structFieldNames.add("adv");
    structFieldObjectInspectors.add(ObjectInspectorFactory
    .getStandardListObjectInspector(
    ObjectInspectorFactory.getReflectionObjectInspector(
    String.class, ObjectInspectorOptions.JAVA)));

    structFieldNames.add("verb");
    structFieldObjectInspectors
    .add(ObjectInspectorFactory.getReflectionObjectInspector(
    String.class, ObjectInspectorOptions.JAVA));

    structFieldNames.add("obj");
    structFieldObjectInspectors
    .add(ObjectInspectorFactory.getReflectionObjectInspector(
    String.class, ObjectInspectorOptions.JAVA));
    }

    @Override
    public ObjectInspector getObjectInspector() throws SerDeException {
    return ObjectInspectorFactory.getStandardStructObjectInspector(
    structFieldNames, structFieldObjectInspectors);
    }

    定义解析函数

    为了能够让Java MapReduce任务复用代码,我们在外部实现了一个与Hive无关的类,这里不再贴代码。这个类定义了与日志字段相同的成员变量,并且提供一个static的valueOf方法用于从字符串构造自己。

    @Override
    public Object deserialize(Writable blob) throws SerDeException {
    if (blob instanceof Text) {
    String line = ((Text) blob).toString();
    RawAction act = RawAction.valueOf(line);
    List result = new ArrayList();
    if (act == null)
    return null;
    result.add(act.getTime());
    result.add(act.getUserId());
    result.add(act.getAdv());
    result.add(act.getVerb());
    result.add(act.getObj());
    return result;
    }
    return null;
    }

    建表

    把上面程序编译并传到hive部署目录后,进入hive:

    $ ./hive --auxpath /home/bochun.bai/dp-base-1.0-SNAPSHOT.jar


    hive> CREATE TABLE ac_raw ROW FORMAT SERDE 'com.renren.dp.hive.RawActionDeserializer';
    OK
    Time taken: 0.117 seconds
    hive> DESC ac_raw;
    OK
    time bigint from deserializer
    id int from deserializer
    adv array from deserializer
    verb string from deserializer
    obj string from deserializer
    Time taken: 0.145 seconds


    hive> LOAD DATA INPATH '/user/bochun.bai/hivedemo/raw_action' OVERWRITE INTO TABLE ac_raw;
    Loading data to table default.ac_raw
    Deleted hdfs://NAMENODE/user/bochun.bai/warehouse/ac_raw
    OK
    Time taken: 0.173 seconds


    hive> SELECT count(1) FROM ac_raw;
    …...显示很多MapReduce进度之后......
    OK
    332
    Time taken: 15.404 seconds


    hive> SELECT count(1) as cnt, verb FROM ac_raw GROUP BY verb;
    …...显示很多MapReduce进度之后......
    OK
    4 ADD_FOOTPRINT
    1 REPLY
    24 SHARE_BLOG
    299 VISIT
    4 add_like
    Time taken: 15.242 seconds