`
scholers
  • 浏览: 615063 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

HIVE开发那些事儿

 
阅读更多
HIVE是什么
来自度娘百科的解释:hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供完整的sql查询功能,可以将sql语句转换为MapReduce任务进行运行。 其优点是学习成本低,可以通过类SQL语句快速实现简单的MapReduce统计,不必开发专门的MapReduce应用,十分适合数据仓库的统计分析。
简单来说,就是用QL这种语言的方式来完成MapReduce程序的工作,由于大多数程序员都会写基于关系型数据库的SQL,那么HIVE的出现也就将Hadoop上的开发降低了很多门槛。

Hive常见的参数
io.sort.mb设置数据缓冲区的小大

HIVE基础语句
创建表
Hive的表,与普通关系型数据库,如mysql在表上有很大的区别,所有hive的表都是一个文件,它是基于Hadoop的文件系统来做的。
hive总体来说可以总结为三种不同类型的表。
1.     普通表
普通表的创建,如上所说,不讲了。其中,一个表,就对应一个表名对应的文件。

2.     外部表
EXTERNAL 关键字可以让用户创建一个外部表,在建表的同时指定一个指向实际数据的路径(LOCATION),Hive 创建内部表时,会将数据移动到数据仓库指向的路径;若创建外部表,仅记录数据所在的路径,不对数据的位置做任何改变。在删除表的时候,内部表的元数据和数据会被一起删除,而外部表只删除元数据,不删除数据。具体sql如下:
CREATE EXTERNAL TABLE test_1(id INT, name STRING, city STRING) SORTED BY TEXTFILE ROW FORMAT DELIMITED‘\t’ LOCATION ‘hdfs://../../..’


3.     分区表
有分区的表可以在创建的时候使用 PARTITIONED BY 语句。一个表可以拥有一个或者多个分区,每一个分区单独存在一个目录下。而且,表和分区都可以对某个列进行 CLUSTERED BY 操作,将若干个列放入一个桶(bucket)中。也可以利用SORT BY 对数据进行排序。这样可以为特定应用提高性能。具体SQL如下:
CREATE TABLE test_1(id INT, name STRING, city STRING) PARTITIONED BY (pt STRING) SORTED BY TEXTFILE ROW FORMAT DELIMITED‘\t’


Hive的排序,因为底层实现的关系,比较不同于普通排序,这里先不讲。桶的概念,主要是为性能考虑,可以理解为对分区内列,进行再次划分,提高性能。在底层,一个桶其实是一个文件。如果桶划分过多,会导致文件数量暴增,一旦达到系统文件数量的上限,就杯具了。哪种是最优数量,这个哥也不知道。

分区表实际是一个文件夹,表名即文件夹名。每个分区,实际是表名这个文件夹下面的不同文件。分区可以根据时间、地点等等进行划分。比如,每天一个分区,等于每天存每天的数据;或者每个城市,存放每个城市的数据。每次查询数据的时候,只要写下类似 where pt=2010_08_23这样的条件即可查询指定时间得数据。

总体而言,普通表,类似mysql的表结构,外部表的意义更多是指数据的路径映射。分区表,是最难以理解,也是最hive最大的优势。之后会专门针对分区表进行讲解。
建表语句如下:
hive> CREATE TABLE pokes (foo INT, bar STRING);

实际示例
创建一个表
CREATE TABLE u_data (
userid INT,
movieid INT,
rating INT,
unixtime STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE;
其中,hive支持的字段类型,并不多,可以简单的理解为数字类型和字符串类型,详细列表如下:
TINYINT
SMALLINT
INT
BIGINT
BOOLEAN
FLOAT
DOUBLE
STRING


创建表并创建索引字段ds
hive> CREATE TABLE invites (foo INT, bar STRING) PARTITIONED BY (ds STRING);

显示所有表
hive> SHOW TABLES;

按正条件(正则表达式)显示表,
hive> SHOW TABLES '.*s';

表添加一列
hive> ALTER TABLE pokes ADD COLUMNS (new_col INT);

添加一列并增加列字段注释
hive> ALTER TABLE invites ADD COLUMNS (new_col2 INT COMMENT 'a comment');

更改表名
hive> ALTER TABLE events RENAME TO 3koobecaf;

删除列
hive> DROP TABLE pokes;

元数据存储
Hive不支持一条一条的用insert语句进行插入操作,也不支持update的操作。数据是以load的方式,加载到建立好的表中。数据一旦导入,则不可修改。要么drop掉整个表,要么建立新的表,导入新的数据。
将文件中的数据加载到表中
hive> LOAD DATA LOCAL INPATH './examples/files/kv1.txt' OVERWRITE INTO TABLE pokes;

加载本地数据,同时给定分区信息
hive> LOAD DATA LOCAL INPATH './examples/files/kv2.txt' OVERWRITE INTO TABLE invites PARTITION (ds='2008-08-15');

加载DFS数据 ,同时给定分区信息
hive> LOAD DATA INPATH '/user/myname/kv2.txt' OVERWRITE INTO TABLE invites PARTITION (ds='2008-08-15');
The above command will load data from an HDFS file/directory to the table. Note that loading data from HDFS will result in moving the file/directory. As a result, the operation is almost instantaneous.
INSERT+SELECT 
Standard syntax:
INSERT OVERWRITE TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...)] select_statement1 FROM from_statement

Hive extension (multiple inserts):
FROM from_statement
INSERT OVERWRITE TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...)] select_statement1
[INSERT OVERWRITE TABLE tablename2 [PARTITION ...] select_statement2] ...

Hive extension (dynamic partition inserts):
INSERT OVERWRITE TABLE tablename PARTITION (partcol1[=val1], partcol2[=val2] ...) select_statement FROM from_statement

这个的用法,和上面两种直接操作file的方式,截然不同。排开分区


SQL 操作
按先件查询
hive> SELECT a.foo FROM invites a WHERE a.ds='<DATE>';
将查询数据输出至目录
hive> INSERT OVERWRITE DIRECTORY '/tmp/hdfs_out' SELECT a.* FROM invites a WHERE a.ds='<DATE>';
将查询结果输出至本地目录
hive> INSERT OVERWRITE LOCAL DIRECTORY '/tmp/local_out' SELECT a.* FROM pokes a;
选择所有列到本地目录
hive> INSERT OVERWRITE TABLE events SELECT a.* FROM profiles a;
hive> INSERT OVERWRITE TABLE events SELECT a.* FROM profiles a WHERE a.key < 100;
hive> INSERT OVERWRITE LOCAL DIRECTORY '/tmp/reg_3' SELECT a.* FROM events a;
hive> INSERT OVERWRITE DIRECTORY '/tmp/reg_4' select a.invites, a.pokes FROM profiles a;
hive> INSERT OVERWRITE DIRECTORY '/tmp/reg_5' SELECT COUNT(1) FROM invites a WHERE a.ds='<DATE>';
hive> INSERT OVERWRITE DIRECTORY '/tmp/reg_5' SELECT a.foo, a.bar FROM invites a;
hive> INSERT OVERWRITE LOCAL DIRECTORY '/tmp/sum' SELECT SUM(a.pc) FROM pc1 a;
将一个表的统计结果插入另一个表中
hive> FROM invites a INSERT OVERWRITE TABLE events SELECT a.bar, count(1) WHERE a.foo > 0 GROUP BY a.bar;
hive> INSERT OVERWRITE TABLE events SELECT a.bar, count(1) FROM invites a WHERE a.foo > 0 GROUP BY a.bar;
JOIN
hive> FROM pokes t1 JOIN invites t2 ON (t1.bar = t2.bar) INSERT OVERWRITE TABLE events SELECT t1.bar, t1.foo, t2.foo;
将多表数据插入到同一表中
FROM src
INSERT OVERWRITE TABLE dest1 SELECT src.* WHERE src.key < 100
INSERT OVERWRITE TABLE dest2 SELECT src.key, src.value WHERE src.key >= 100 and src.key < 200
INSERT OVERWRITE TABLE dest3 PARTITION(ds='2008-04-08', hr='12') SELECT src.key WHERE src.key >= 200 and src.key < 300
INSERT OVERWRITE LOCAL DIRECTORY '/tmp/dest4.out' SELECT src.value WHERE src.key >= 300;
将文件流直接插入文件
hive> FROM invites a INSERT OVERWRITE TABLE events SELECT TRANSFORM(a.foo, a.bar) AS (oof, rab) USING '/bin/cat' WHERE a.ds > '2008-08-09';
This streams the data in the map phase through the script /bin/cat (like hadoop streaming). Similarly - streaming can be used on the reduce side (please see the Hive Tutorial or examples)

下载示例数据文件,并解压缩
wget http://www.grouplens.org/system/files/ml-data.tar__0.gz
tar xvzf ml-data.tar__0.gz
加载数据到表中
LOAD DATA LOCAL INPATH 'ml-data/u.data'
OVERWRITE INTO TABLE u_data;

统计数据总量
SELECT COUNT(1) FROM u_data;

现在做一些复杂的数据分析
创建一个 weekday_mapper.py: 文件,作为数据按周进行分割
import sys
import datetime

for line in sys.stdin:
line = line.strip()
userid, movieid, rating, unixtime = line.split('\t')
生成数据的周信息
weekday = datetime.datetime.fromtimestamp(float(unixtime)).isoweekday()
print '\t'.join([userid, movieid, rating, str(weekday)])
使用映射脚本
//创建表,按分割符分割行中的字段值
CREATE TABLE u_data_new (
userid INT,
movieid INT,
rating INT,
weekday INT)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t';
//将python文件加载到系统
add FILE weekday_mapper.py;
将数据按周进行分割
INSERT OVERWRITE TABLE u_data_new
SELECT
TRANSFORM (userid, movieid, rating, unixtime)
USING 'python weekday_mapper.py'
AS (userid, movieid, rating, weekday)
FROM u_data;

SELECT weekday, COUNT(1)
FROM u_data_new
GROUP BY weekday;
处理Apache Weblog 数据
将WEB日志先用正则表达式进行组合,再按需要的条件进行组合输入到表中
add jar ../build/contrib/hive_contrib.jar;

CREATE TABLE apachelog (
host STRING,
identity STRING,
user STRING,
time STRING,
request STRING,
status STRING,
size STRING,
referer STRING,
agent STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
"input.regex" = "([^ ]*) ([^ ]*) ([^ ]*) (-|\\[[^\\]]*\\]) ([^ \"]*|\"[^\"]*\") (-|[0-9]*) (-|[0-9]*)(?: ([^ \"]*|\"[^\"]*\") ([^ \"]*|\"[^\"]*\"))?",
"output.format.string" = "%1$s %2$s %3$s %4$s %5$s %6$s %7$s %8$s %9$s"
)
STORED AS TEXTFILE;

JOIN
HIVE中是等值JOIN的,也支持左右连接;
在使用写有 Join 操作的查询语句时有一条原则:应该将条目少的表/子查询放在 Join 操作符的左边。
原因是在 Join 操作的 Reduce 阶段,位于 Join 操作符左边的表的内容会被加载进内存,将条目少的表放在左边,可以有效减少发生 OOM 错误的几率。

内连接(JOIN ON)与LEFT SEMI JOIN的陷阱
大家可以看如下的HIVE SQL:
写法一:
select
           a.bucket_id,
        a.search_type,
            a.level1,
        a.name1,
        a.level2,
        a.name2,
        cast((a.alipay_fee) as double) as zhuliu_alipay,
        cast(0 as double) as total_alipay
        from test_data_fdi_search_zhuliu_alipay_cocerage_bucket_1 a
     left semi join
     test_data_fdi_dim_main_auc b
     on (a.level2 = b.cat_id2
         and a.brand_id = b.brand_id
       and b.cat_id2 > 0
         and b.brand_id > 0
         and b.max_price = 0
     )

3121 条写法二:
select
           a.bucket_id,
        a.search_type,
            a.level1,
        a.name1,
        a.level2,
        a.name2,
        cast((a.alipay_fee) as double) as zhuliu_alipay,
        cast(0 as double) as total_alipay
        from test_data_fdi_search_zhuliu_alipay_cocerage_bucket_1 a
     join   test_data_fdi_dim_main_auc b
     on (a.level2 = b.cat_id2
         and a.brand_id = b.brand_id)
  where  b.cat_id2 > 0
         and b.brand_id > 0
         and b.max_price = 0


这两种写法带来的值居然不是相等的,我一直以为理解这两种方式的写法是一样的,
但是统计的结果却是不一样的。
经过一层一层的查找,发现是由于子表(test_data_fdi_dim_main_auc)中存在重复的数据,当使用JOIN ON的时候,A,B表会关联出两条记录,应为ON上的条件符合;
而是用LEFT SEMI JOIN 当A表中的记录,在B表上产生符合条件之后就返回,不会再继续查找B表记录了,所以如果B表有重复,也不会产生重复的多条记录。
大多数情况下JOIN ON 和left semi on是对等的,但是在上述情况下会出现重复记录。


Map 端部分聚合:
并不是所有的聚合操作都需要在 Reduce 端完成,很多聚合操作都可以先在 Map 端进行部分聚合,最后在 Reduce 端得出最终结果。
基于 Hash
参数包括:
hive.map.aggr = true 是否在 Map 端进行聚合,默认为 True
hive.groupby.mapaggr.checkinterval = 100000
在 Map 端进行聚合操作的条目数目,有数据倾斜的时候进行负载均衡
hive.groupby.skewindata = false
当选项设定为 true,生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出结果集合会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。

JOIN中where的过滤
on里面才能起到表的过滤,放在where里面起不到提前过滤的情况;

不等值的计算
HIVE中不等值的计算,采用mapjoin;根据mapjoin的计算原理,MAPJION会把小表全部读入内存中,在map阶段直接拿另外一个表的数据和内存中表数据做匹配。这种情况下即使笛卡尔积也不会对任务运行速度造成太大的效率影响。

场景:要统计搜索关键词中包含的品牌词的搜索次数:
业务上来说,品牌词不会是全词匹配,而是存在包含关系,即包含品牌词的就算,比如说”阿迪达斯”是一个品牌,但是用户搜索”阿迪达斯球鞋”,我们也会认为它是一个品牌词。
脚本一:
select  
    a.rewq as brand_name,
    count(1) as pv
from
    s_dw_sh_mall_pv  a join 
    (select distinct brandname from weique_brand) b on a.rewq = b.brandname
where
    a.ds>=20121001 and a.ds<='20121120'
    and a.page_type in(1, 2)
    and a.stats like '%querytype:4%'
    group by a.rewq


脚本二:
set hive.mapred.mode=nonstrict;
select  /*+ MAPJOIN(b) */
    a.rewq as brand_name,
    count(1) as pv
from
    s_dw_sh_mall_pv  a join 
    (select distinct brandname from weique_brand) b
where
    a.rewq like b.brandname
    and a.ds>=20121001 and a.ds<='20121120'
    and a.page_type in(1, 2)
    and a.stats like '%querytype:4%'
    group by a.rewq

解读:由于脚本二中的品牌词有包含关系,是属于不等值的计算,那么采用mapjoin;
DISTINCT
使用ALL和DISTINCT选项区分对重复记录的处理。默认是ALL,表示查询所有记录。DISTINCT表示去掉重复的记录。
hive> SELECT col1, col2 FROM t1 1 3 1 3 1 4 2 5 hive> SELECT DISTINCT col1, col2 FROM t1 1 3 1 4 2 5 hive> SELECT DISTINCT col1 FROM t1 1 2


减少JOB数量,提高运行速度
有如下案例,计算二级类目+品牌+价格区间主流商品成交,其HIVE SQL如下:
脚本一:
select
   a.bc_type bc_type,
    a.level1 catid,
     a.name1 catname,
     a.level2 catid2,
     a.name2 catname2,
     cast(a.alipay_fee as double) as zhuliu_alipay,
     cast(0 as double) as total_alipay
     from r_brand_gmv_alipay a
     join test_data_fdi_dim_zhuliu_temp b
     on a.level2 = b.cat_id2
     and a.brand_id = b.brand_id
     where  a.pt='20120920000000'
     and a.bc_type = 'b'
     and a.bid >= b.min_price and a.bid <= b.max_price
     and b.cat_id2 > 0
     and b.brand_id >0
     and b.max_price > 0

脚本二:
select 
   a.bc_type bc_type,
    a.level1 catid,
     a.name1 catname,
     a.level2 catid2,
     a.name2 catname2,
     cast(a.alipay_fee as double) as zhuliu_alipay,
     cast(0 as double) as total_alipay 
     from r_brand_gmv_alipay a
     join test_data_fdi_dim_zhuliu_temp b
     on a.level2 = b.cat_id2
     and a.brand_id = b.brand_id
     where  a.pt='20120920000000' 
     --and a.bc_type = 'b'
     and a.bid >= b.min_price and a.bid <= b.max_price
     and b.cat_id2 > 0
     and b.brand_id >0
     and b.max_price > 0
     group by a.bc_type,a.level1,a.name1,a.level2,a.name2

这两个的区别是什么?前面一种没有用GROUP BY,这样前面一种写法只有一个JOB数量,后面一种写法会增加一个JOB数量;
Ambiguous column reference property_id字段名称有重复的
脚本一:
        
cast(category_level as string) as property_id,
          cast(property_id as string) as property_id,
         cast(0 as string)   as  ipv,
         cast(0 as string)   as  ipvuv,
UNION ALL
         cast(category_level as string) as category_id,
          cast(property_id as string) as property_id,
         cast(0 as string)   as  ipv,
         cast(0 as string)   as  ipvuv,

是可以的
但是脚本二:
       
 cast(category_level as string) as property_id,
          cast(property_id as string) as property_id,
         cast(0 as string)   as  ipv,
         cast(0 as string)   as  ipvuv,
UNION ALL
         cast(category_level as string) ,
          cast(property_id as string) ,
         cast(0 as string)   as  ipv,
         cast(0 as string)   as  ipvuv,

HIVE会报错说字段不匹配

UNION ALL 不能嵌套
当你看到如下错误,
Hive history file=/tmp/weique.lqf/hive_job_log_weique.lqf_201209041729_460174_1017312942.txt
FAILED: Hive Internal Error: java.lang.NullPointerException(null)
java.lang.NullPointerException
at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.generateErrorMessage(SemanticAnalyzer.java:435)
at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.genUnionPlan(SemanticAnalyzer.java:5802)
at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.genPlan(SemanticAnalyzer.java:6163)
at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.genPlan(SemanticAnalyzer.java:6160)
at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.genPlan(SemanticAnalyzer.java:6178)
at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeInternal(SemanticAnalyzer.java:6953)
at org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.analyze(BaseSemanticAnalyzer.java:137)
at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:294)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:392)
at org.apache.hadoop.hive.cli.CliDriver.processCmd(CliDriver.java:177)
at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:257)
at org.apache.hadoop.hive.cli.CliDriver.main(CliDriver.java:389)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.util.RunJar.main(RunJar.java:165)
at org.apache.hadoop.mapred.JobShell.run(JobShell.java:54)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:79)
at org.apache.hadoop.mapred.JobShell.main(JobShell.java:68)
over

说明在UNION ALL中存在了多层嵌套,最好的方式是将多层嵌套修改为临时表的方式,即在脚本中创建多个临时表,使用完之后删除掉就OK了;

Null与空值
hive里面null也是一个字符串,这点和JAVA中是有区别的,JAVA中的null是不占用内存空间的,在特定的情况下,hive中的null是有意义的,其长度为4;这样往往给统计的时候会带来困扰,如果你需要将null和‘’等价,那么你需要在你修改你的表,用如下的命令:
ALTER TABLE test_data_fdi_search_query_cat_qp_temp_1 SET SERDEPROPERTIES ('serialization.null.format'='');
null和‘’等价

NULL的计算陷阱
在进行统计计算的时候,是否有遇到过需要将多个字段进行汇总的呢?如果其中有字段有可能为NULL,那么计算的最终结果会则呢样,我们来看下面的例子:
sum(t.shop_gmvcount + t.GMVCOUNT_NEW + t.auc_shop_gmvcount +  t.spu_gmv_cnt) gmv_cnt,
这样的统计结果,当t.t.shop_gmvcount为NULL时,即使后面的t.GMVCOUNT_NEW 不为null,那么总计的结果这个计算仍然是NULL;
修改的方法是:采用sum(coalesce(t.shop_gmvcount,cast(0 as bigint)) + coalesce(t.GMVCOUNT_NEW,cast(0 as bigint))
这样的方式,coalesce函数类似于ORACLE数据库里面的nvl


left semi jioin的使用
LEFT SEMI JOIN 是 IN/EXISTS 子查询的一种更高效的实现。Hive 当前没有实现 IN/EXISTS 子查询,所以你可以用 LEFT SEMI JOIN 重写你的子查询语句。LEFT SEMI JOIN 的限制是, JOIN 子句中右边的表只能在 ON 子句中设置过滤条件,在 WHERE 子句、SELECT 子句或其他地方过滤都不行。

怎样写exist in子句?
Hive不支持where子句中的子查询,SQL常用的exist in子句需要改写。这一改写相对简单。考虑以下SQL查询语句:
SELECT a.key, a.value FROM a WHERE a.key in (SELECT b.key FROM B);

可以改写为
SELECT a.key, a.value FROM a LEFT OUTER JOIN b ON (a.key = b.key) WHERE b.key <> 
NULL;
一个更高效的实现是利用left semi join改写为:
SELECT a.key, a.val FROM a LEFT SEMI JOIN b on (a.key = b.key);
left semi join是0.5.0以上版本的特性。
分享到:
评论
2 楼 di1984HIT 2014-06-13  
写的不错啊。~
1 楼 dacoolbaby 2013-04-27  
请问楼主能对Hive现有的表做partition操作吗?
还是说需要重新建个分区表这样。。

相关推荐

Global site tag (gtag.js) - Google Analytics