Options options = null;
    try {
OptionParser optionParser = new OptionParser(args);
options = optionParser.getOptions();
} catch (Exception e) {
e.printStackTrace();
return;
}
    
    String name            = options.getName();
    String defaultDatabase = options.getDatabase();//"dc_yunpingtai";
    String hiveConfDir     =  options.getHiveConfDir(); 
//"/Users/zhongbaoluo/Applications/app/apache-hive-3.1.2/conf"; // a local path
    String version         = "3.1.2";
    String sql = options.getSql();
    HiveUtils.hiveConfDir(hiveConfDir);
    
    
    HiveConf hiveConf = new HiveConf();
    hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS,"thrift://dcmaster01:9083");
    
hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE,"/user/hive/warehouse");//hdfs://datacloud-hadoop-cluster
    hiveConf.setVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM, "dcmaster02:2181");
    hiveConf.setVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT, "2181");
    hiveConf.setVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, "10000");
    hiveConf.set("hive.server2.zookeeper.namespace", "hiveserver2");
    hiveConf.set("hive.server2.zookeeper.publish.configs", "true");
    hiveConf.set("hive.server2.support.dynamic.service.discovery", "true");
    hiveConf.set("hive.metastore.warehouse.dir", "/user/hive/warehouse");
    
    try {
    EnvironmentSettings settings = 
EnvironmentSettings.newInstance().inBatchMode().build();
    System.out.println("settings  创建完成");
        TableEnvironment tableEnv = TableEnvironment.create(settings);
        System.out.println("tableEnv  创建完成");
        
        MyHiveCatalog hive = new MyHiveCatalog(name, defaultDatabase, 
hiveConf,version);
        tableEnv.registerCatalog(name, hive);
        System.out.println("hive  创建完成");


        // set the HiveCatalog as the current catalog of the session
        tableEnv.useCatalog(name);
        tableEnv.useDatabase(defaultDatabase);
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tableEnv.executeSql("show tables").print();
        System.out.println("sql:"+sql);
        //tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        
        tableEnv.executeSql("DROP TABLE print_table");
        tableEnv.executeSql("CREATE TABLE print_table(f0 BIGINT) WITH 
('connector' = 'print')");
//        tableEnv.executeSql("CREATE TABLE print_table_mysql (\n" + 
//        "f0 BIGINT\n" + 
//        ") WITH ('connector' = 'jdbc',\n" + 
//        "'url' = 
'jdbc:mysql://192.168.50.120:3306/datacloud?useUnicode=true&characterEncoding=utf-8&autoReconnect=true&useTimezone=true',\n"
 + 
//        "'table-name' = 't_dc_test',\n" + 
//        "'username' = 'dcuser',\n" + 
//        "'password' = 'datacloud37')");
        tableEnv.executeSql(sql);// 这里的 sql = INSERT INTO print_table select 
count(1) from t_mpos_integral_sign_water
        //Table result = tableEnv.sqlQuery(sql);//"select count(1) from 
t_mpos_integral_sign_water"
        System.out.println("tableResult  创建完成");
        //result.execute().print();
} catch (Exception e) {
e.printStackTrace();
}




MyHiveCatalog 类的代码:


public class MyHiveCatalog extends HiveCatalog{
private static final Logger LOG = LoggerFactory.getLogger(MyHiveCatalog.class);
public MyHiveCatalog(String catalogName, @Nullable String defaultDatabase, 
@Nullable HiveConf hiveConf, String hiveVersion) {
this(catalogName,defaultDatabase == null ? DEFAULT_DB : 
defaultDatabase,createHiveConf(hiveConf),hiveVersion,false);
}
protected MyHiveCatalog(String catalogName, String defaultDatabase, HiveConf 
hiveConf, String hiveVersion,
boolean allowEmbedded) {
super(catalogName, defaultDatabase, hiveConf, hiveVersion, allowEmbedded);
// TODO Auto-generated constructor stub
}
private static HiveConf createHiveConf(@Nullable HiveConf hiveConf) {
//LOG.info("Setting hive conf dir as {}", hiveConfDir);


// try {
// HiveConf.setHiveSiteLocation(
// hiveConfDir == null ?
// null : Paths.get(hiveConfDir, "hive-site.xml").toUri().toURL());
// } catch (MalformedURLException e) {
// throw new CatalogException(
// String.format("Failed to get hive-site.xml from %s", hiveConfDir), e);
// }


// create HiveConf from hadoop configuration
Configuration hadoopConf = HadoopUtils.getHadoopConfiguration(new 
org.apache.flink.configuration.Configuration());


// Add mapred-site.xml. We need to read configurations like compression codec.
for (String possibleHadoopConfPath : HadoopUtils.possibleHadoopConfPaths(new 
org.apache.flink.configuration.Configuration())) {
File mapredSite = new File(new File(possibleHadoopConfPath), "mapred-site.xml");
if (mapredSite.exists()) {
hadoopConf.addResource(new Path(mapredSite.getAbsolutePath()));
break;
}
}
HiveConf conf = new HiveConf(hadoopConf, HiveConf.class);
conf.addResource(hiveConf);
return conf;
}


}


**********************************************************************
Thanks & Best Regards!


杉欣集团-技术研究院  云平台
钟保罗


上海浦东新区东方路3261号振华广场B座23楼(杉欣集团)
email: zhongbao...@shxgroup.net
手机: 18157855633
 





 原始邮件 
发件人: taochanglian<taochangl...@163.com>
收件人: user-zh<user-zh@flink.apache.org>; zhongbaoluo<zhongbao...@shxgroup.net>
发送时间: 2020年9月8日(周二) 16:51
主题: Re: flink 1.11.1 版本执行HiveCatalog遇到问题质询


贴一下代码

在 2020/9/8 14:09, zhongbaoluo 写道:

据插入数据执行失败,也没有找到异常。 yarn

Reply via email to