想请教一下大家:Hive streaming write 需要哪些配置?不知道为什么我的作业能够正常跑起来,但是没有数据写入 Hive。批量模式下的 Hive 写入,以及流环境下的读取,都是正常的。
附代码,很简短: public class KafkaToHiveStreaming { public static void main(String[] arg) throws Exception{ StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); String name = "myhive"; String defaultDatabase = "default"; String hiveConfDir = "/Users/uzi/Downloads/Hadoop/apache-hive-3.1.2-bin/conf/"; // a local path String version = "3.1.2"; HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version); bsTableEnv.registerCatalog("myhive", hive); bsTableEnv.useCatalog("myhive"); bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); bsTableEnv.executeSql("CREATE TABLE topic_products (" + " id BIGINT ," + " order_id STRING," + " amount DECIMAL(10, 2)," + " create_time TIMESTAMP " + ") WITH (" + " 'connector' = 'kafka'," + " 'topic' = 'order.test'," + " 'properties.bootstrap.servers' = 'localhost:9092'," + " 'properties.group.id' = 'testGroup'," + " 'scan.startup.mode' = 'earliest-offset', " + " 'format' = 'json' " + ")"); bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); bsTableEnv.executeSql("CREATE TABLE hive_sink_table_streaming (" + " id BIGINT ," + " order_id STRING," + " amount DECIMAL(10, 2)" + " )"); bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); bsTableEnv.executeSql("CREATE TABLE print_table WITH ('connector' = 'print')" + "LIKE INSERT INTO hive_sink_table_streaming (EXCLUDING ALL)"); bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); bsTableEnv.executeSql("INSERT INTO hive_sink_table_streaming SELECT " + "id, " + "order_id, " + "amount " + "FROM topic_products"); Table table1 = bsTableEnv.from("hive_sink_table_streaming"); table1.executeInsert("print_table"); } }