Note: Hive has since been retired as a development language at the author's company; this is the quick-start handbook that was well received within the team at the time, shared here.
### Hive Introduction
### Hive vs. Hadoop MR Programs
Hive metadata is "data about data": it describes tables, partitions, indexes, and other objects.
Hive metadata lives in an RDBMS; the common deployment modes are an embedded metastore (Derby), a local metastore, and a remote metastore server.
Hive has four data models: databases, tables, partitions, and buckets.
To avoid table-name collisions, Hive uses the database as the top-level namespace; if no database is set, the default database is used.
Hive indexes speed up lookups on a column by materializing a separate index table.
Building a Hive index:
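A minimal sketch against the test_table defined in the DDL section below; 'compact' is one of Hive's built-in index handlers, and field_1_index matches the drop index statement further down:
create index field_1_index on table test_table (field_1)
as 'compact' with deferred rebuild;
alter index field_1_index on test_table rebuild; -- deferred rebuild means the index is empty until this runs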
A Hive view saves a query and can then be used like a table, which reduces the complexity of query statements.
Hive view characteristics: a view is purely logical (nothing is materialized; its query runs whenever the view is referenced), it is read-only, and its schema is fixed when the view is created.
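A minimal sketch (the view name is hypothetical; one_test_table is defined in the DML section below):
create view heavy_rows as
select id, account_id, weight
from one_test_table
where weight >= 0.5;
select count(*) from heavy_rows; -- the view is queried like a table; its defining query runs now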
Hive supports a rich set of data types: primitives such as tinyint, smallint, int, bigint, float, double, boolean, and string, plus the complex types array, map, and struct.
Hive's handling of type mismatches: a value that cannot be read or converted as the declared type generally comes back as NULL rather than raising an error.
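For example, a failed conversion yields NULL instead of an error:
select cast('abc' as int) from test_table limit 1; -- returns NULL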
### Hive DDL
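A table with partitions, buckets, and a complex map<int, array<string>> column: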
create table if not exists test_table (field_1 int, field_2 string, field_3 double, field_4 map<int, array<string>>)
partitioned by (data_date string)
clustered by (field_1) sorted by (field_2) into 4 buckets
row format delimited fields terminated by '\t'
stored as textfile;
alter table test_table rename to new_table;
alter table test_table drop partition (data_date='20121106');
alter table test_table clustered by (field_1) sorted by (field_2) into 8 buckets;
alter table test_table add columns (field_5 float);
drop table test_table;
drop index field_1_index on test_table;
alter table test_table drop partition (data_date='20121106');
show functions [like "str*"];
show databases [like "impl*"];
show tables [like "diradv*"];
show partitions test_table;
describe database default;
describe [extended|formatted] test_table;
describe test_table partition (data_date='20121106');
Hive ships with a rich set of built-in operators and functions:
SHOW FUNCTIONS; -- list every function currently registered in hive
DESCRIBE FUNCTION function_name; -- short description of a function
DESCRIBE FUNCTION EXTENDED function_name; -- detailed description of a function
### Hive DML
#### Hive DML: importing data
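An external table layered over files that already sit at the given HDFS location: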
create external table one_test_table (id string, account_id bigint, weight float)
row format delimited fields terminated by '\t'
location '/group/one_test_table';
from one_test_table
insert overwrite table two_test_table
select id, account_id, weight
where weight >= 0.5;
load data local inpath '/home/work/data/tmp/*' into table one_test_table;
load data inpath '/group/tmp_table_name/*' into table one_test_table;
#### Hive DML: querying data
from one_test_table
insert overwrite table two_test_table
select *
limit 10;
from one_test_table left outer join two_test_table on (one_test_table.id = two_test_table.winfo_id)
select one_test_table.id, one_test_table.account_id, two_test_table.id, two_test_table.account_id
where two_test_table.account_id is null;
from one_test_table
select id, account_id, sum(weight)
group by id, account_id;
from (
from one_test_table
select *
union all
from two_test_table
select *
) union_table
select *
where weight >= 0.5;
from (
from one_test_table
select id, account_id, sum(weight) as sum_weight
group by id, account_id
) sum_table
select *
where sum_weight >= 1;
add file ./top_n_map.py;
add file ./top_n_red.py;
add archive ./python26.tar.gz; -- the python build your scripts need
from (
from log_table
map query, winfo_id, clk
using 'python26.tar.gz/bin/python26.sh top_n_map.py'
as query, winfo_id, clk
cluster by query
) map_table
reduce map_table.query, map_table.winfo_id, map_table.clk
using 'python26.tar.gz/bin/python26.sh top_n_red.py'
as query, winfo_id, clk;
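cluster by query is shorthand for distribute by plus sort by on the same column, so every row sharing a query value reaches the same invocation of the reduce script, in sorted order.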
add file ./page_url_to_id.py;
add file ./my_python_session_cutter.py;
from (
select transform(user_id, page_url, unix_time)
using 'page_url_to_id.py' as (user_id, page_id, unix_time)
from mylog
distribute by user_id sort by user_id, unix_time
) mylog2
select transform(user_id, page_id, unix_time)
using 'my_python_session_cutter.py' as (user_id, session_info);
Like programs written in any other language, Hive scripts need performance tuning; keep the points below in mind.
If performance problems remain after that, analyze the query's execution plan:
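A minimal example; explain extended additionally prints file paths and serde details:
explain extended
select id, account_id, sum(weight)
from one_test_table
group by id, account_id;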
Job settings that can improve performance:
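A few commonly tuned knobs, as an illustrative sketch (which ones help depends on the job):
set hive.exec.parallel=true; -- run independent stages concurrently
set hive.map.aggr=true; -- partial aggregation on the map side
set hive.exec.compress.intermediate=true; -- compress data handed between MR stages
set mapred.reduce.tasks=32; -- size the reduce stage by hand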
HiveQL constructs that can improve performance:
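Project only the columns you need inside each union all branch, instead of select *: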
from (
from one_test_table
select id, account_id, weight
union all
from two_test_table
select id, account_id, weight
) union_table
select *
where weight >= 0.5;
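Use a multi-insert to scan the source table once while writing several outputs: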
from one_test_table
insert overwrite table a_table
select *
where src = 'a'
insert overwrite table b_table
select *
where src = 'b';
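When one side of a join is small, a mapjoin hint keeps the join on the map side and skips the shuffle: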
from user_table join winfo_table on (user_table.user_id = winfo_table.user_id)
insert overwrite table valid_winfo_table
select /*+ mapjoin(user_table) */ winfo_table.winfo_id, winfo_table.bidword;
from user_table join winfo_table on (user_table.user_id = winfo_table.user_id and winfo_table.date = '20121106')
insert overwrite table valid_winfo_table
select /*+ mapjoin(user_table) */ winfo_table.winfo_id, winfo_table.bidword;
from user_table
join winfo_table on (user_table.user_id = winfo_table.user_id and winfo_table.date = '20121106')
join idea_table on (user_table.user_id = idea_table.user_id and idea_table.date = '20121106')
insert overwrite table valid_winfo_idea_table
select winfo_table.winfo_id, idea_table.idea_id;
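Because all the joins here share the same key (user_id), Hive can run them in a single MapReduce job.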
Hive's extension mechanisms mainly include user-defined functions, custom SerDes, and custom map/reduce scripts.
The user-defined function variants are plain UDFs, GenericUDFs, UDAFs, and UDTFs:
package org.apache.hadoop.hive.contrib.udf.example;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDF;

@Description(name = "YourUDFName",
    value = "_FUNC_(InputDataType) - using the input datatype X argument, "
        + "returns YYY.",
    extended = "Example:\n"
        + "  > SELECT _FUNC_(InputDataType) FROM tablename;")
public class YourUDFName extends UDF {
    // state and helper methods go here

    public YourUDFName() {
        // hive instantiates the UDF through its no-arg constructor
    }

    // hive calls evaluate() once per row; InputDataType is a placeholder type
    public String evaluate(InputDataType inputValue) {
        // compute and return the result here
    }
}
Developing UDFs, GenericUDFs, UDAFs, and UDTFs:
public class YourUDFName extends UDF {..}
public class YourGenericUDFName extends GenericUDF {..}
public class YourGenericUDAFName extends AbstractGenericUDAFResolver {..}
public class YourGenericUDTFName extends GenericUDTF {..}
Loading and unloading UDFs:
ADD JAR /full_path_to_jar/YourUDFName.jar;
CREATE TEMPORARY FUNCTION YourUDFName AS 'org.apache.hadoop.hive.contrib.udf.example.YourUDFName'; -- load
DROP TEMPORARY FUNCTION IF EXISTS YourUDFName; -- unload
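A left outer join filtered on is null acts as an anti-join, returning the rows of a that have no match in b: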
select a.key from a left outer join b on a.key = b.key where b.key is null;
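count over a case expression computes several conditional counts in one pass over the table: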
select count(case when type = 1 then 1 end), count(case when type = 2 then 1 end) from one_test_table;
describe extended one_test_table;
-- step 1: the hive part
hive -e "set hive.exec.compress.output = false;
insert overwrite directory '/group/hive/one_test_table/tmp'
select * from one_test_table where pt = '${bizdate}';"
-- step 2: the shell part (turn hive's default \x01 field delimiter into tabs)
hadoop fs -cat /group/hive/one_test_table/tmp/* |
sed -e 's/\x01/\t/g' |
awk '{print $1"\t"$2;}' > ~/one_test_table.txt
To pivot rows into a column (concatenate values from many rows into one), use the group_concat function.
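group_concat comes from MySQL-flavored dialects; in stock Hive the same pivot is usually written with collect_set (or collect_list) plus concat_ws. A minimal sketch over a hypothetical table with id and item_id columns:
select id, concat_ws(',', collect_set(item_id)) as items
from some_table -- hypothetical
group by id;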
To explode a column into rows, use the explode function:
select id, item_id from (
select id, split(get_json_object(features, '$.items'),',') as itemlist
from tmp_table_name
where status = 0
and get_json_object(features, '$.items') is not null
) t
lateral view explode(itemlist) tmp_table_name as item_id;
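lateral view joins each input row with the rows generated by explode, so every element of itemlist becomes its own output row, paired with the original id.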