Dear All,

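I have recently been researching some features of Hive on Flink. I have an existing MapReduce job whose statement (see [HQL] below) I am now running through Flink: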

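The logic is simple: full-join two tables (table1 and table2) and write the result into a new table, table3. All three tables have the same schema, with 35 fields each. The join uses id as the key and compares the two sides field by field: if a field's value in table2 is neither null nor an empty string, the table2 value is used; otherwise the table1 value is used. The result is then written to table3.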
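To make the per-field rule concrete, here is a minimal plain-Java sketch of the same semantics. It is only an illustration, not the Flink job: the class name CoalesceJoinSketch, the map-based "tables", and the assumption that each id appears at most once per table are all made up for this example.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical illustration class; not part of the original job.
class CoalesceJoinSketch {

    // Mirrors: if(t2.f is not null and t2.f <> '', t2.f, t1.f)
    static String pick(String t2Value, String t1Value) {
        return (t2Value != null && !t2Value.isEmpty()) ? t2Value : t1Value;
    }

    // Full outer join of two id -> row maps, where a row is an array of field values.
    // Assumption: ids are unique within each table (a SQL join would multiply duplicates).
    static Map<String, String[]> fullJoin(Map<String, String[]> table1,
                                          Map<String, String[]> table2,
                                          int fieldCount) {
        // Collect the ids from both sides, dropping null or empty ids
        // in the same way the two subqueries filter them out.
        TreeSet<String> ids = new TreeSet<>();
        for (String id : table1.keySet()) {
            if (id != null && !id.isEmpty()) ids.add(id);
        }
        for (String id : table2.keySet()) {
            if (id != null && !id.isEmpty()) ids.add(id);
        }

        Map<String, String[]> result = new LinkedHashMap<>();
        for (String id : ids) {
            String[] r1 = table1.get(id); // null when the id exists only in table2
            String[] r2 = table2.get(id); // null when the id exists only in table1
            String[] merged = new String[fieldCount];
            for (int i = 0; i < fieldCount; i++) {
                String v1 = (r1 == null) ? null : r1[i];
                String v2 = (r2 == null) ? null : r2[i];
                merged[i] = pick(v2, v1);
            }
            result.put(id, merged);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String[]> t1 = new HashMap<>();
        t1.put("1", new String[] {"a", "b"});
        t1.put("2", new String[] {"c", "d"});

        Map<String, String[]> t2 = new HashMap<>();
        t2.put("2", new String[] {"", "x"});   // empty string falls back to table1
        t2.put("3", new String[] {"y", null}); // id present only in table2

        fullJoin(t1, t2, 2).forEach(
                (id, row) -> System.out.println(id + " -> " + Arrays.toString(row)));
    }
}
```

In this sketch, a row that exists on only one side keeps that side's values, which is the behavior expected from the full join in the statement.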


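So far the job has been running for 17 hours and has still not finished. Judging by the data flow, it looks nearly done. Is the way I am using this reasonable?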

[HQL]
insert into table3 select
if(t2.id is not null and t2.id <> '', t2.id, t1.id) as id
,if(t2.field2 is not null and t2.field2 <> '', t2.field2, t1.field2) as field2
......
......
......
......
......
......
......
......
,if(t2.field35 is not null and t2.field35 <> '', t2.field35, t1.field35) as field35
from (
select * from table1 where (id is not null and id <> '')
) as t1 full join (
select * from table2 where (id is not null and id <> '')
) as t2 on (t1.id = t2.id)




The code is as follows:



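import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.hadoop.hive.conf.HiveConf;

public class FlinkHiveIntegration1 {

    public static void main(String[] args) throws Exception {

        // Run the Table API in batch mode.
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().inBatchMode().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        String name = "myhive";
        String database = "mydatabase";
        String version = "1.1.0-cdh5.8.3";

        // Point the catalog at the Hive metastore and warehouse directory.
        HiveConf hiveConf = new HiveConf();
        hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://*******:9083");
        hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE,
                "hdfs://nameservice1/user/hive/warehouse");

        // HiveCatalogTest is defined elsewhere (not shown here).
        HiveCatalog hive = new HiveCatalogTest(name, database, hiveConf, version);

        tableEnv.registerCatalog(name, hive);
        tableEnv.useCatalog(name);
        tableEnv.useDatabase(database);

        // Placeholder: the [HQL] statement shown above goes here.
        String HQL = HQL;

        // Set the default parallelism of the job to 8.
        tableEnv.getConfig().addConfiguration(
                new Configuration().set(CoreOptions.DEFAULT_PARALLELISM, 8));

        tableEnv.executeSql(HQL);
    }
}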








<http://apache-flink.147419.n8.nabble.com/file/t1162/hiveonFlink.png> 



-----
Thanks!
Jacob