hi,请问这个问题最后是怎么解决的?数据能滚动写入 Hive 了吗?我这边开启了 checkpoint 之后,Hive 里也还是没有数据。
李佳宸 <lijiachen...@gmail.com> 于2020年7月16日周四 下午10:39写道: > 好的,谢谢~~~ > > JasonLee <17610775...@163.com> 于2020年7月16日周四 下午8:22写道: > > > hi > > 需要开启checkpoint > > > > > > | | > > JasonLee > > | > > | > > 邮箱:17610775...@163.com > > | > > > > Signature is customized by Netease Mail Master > > > > 在2020年07月16日 18:03,李佳宸 写道: > > 想请教下大家 hive streaming write需要有哪些配置,不知道为什么我的作业能够跑起来,但是没有数据写入hive。 > > 批量的hive写入,流环境的读取是正常的。 > > > > 附代码,很简短: > > > > public class KafkaToHiveStreaming { > > public static void main(String[] arg) throws Exception{ > > StreamExecutionEnvironment bsEnv = > > StreamExecutionEnvironment.getExecutionEnvironment(); > > EnvironmentSettings bsSettings = > > > > > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); > > StreamTableEnvironment bsTableEnv = > > StreamTableEnvironment.create(bsEnv, bsSettings); > > String name = "myhive"; > > String defaultDatabase = "default"; > > String hiveConfDir = > > "/Users/uzi/Downloads/Hadoop/apache-hive-3.1.2-bin/conf/"; // a local > > path > > String version = "3.1.2"; > > > > HiveCatalog hive = new HiveCatalog(name, defaultDatabase, > > hiveConfDir, version); > > bsTableEnv.registerCatalog("myhive", hive); > > bsTableEnv.useCatalog("myhive"); > > bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); > > bsTableEnv.executeSql("CREATE TABLE topic_products (" + > > " id BIGINT ," + > > " order_id STRING," + > > " amount DECIMAL(10, 2)," + > > " create_time TIMESTAMP " + > > ") WITH (" + > > " 'connector' = 'kafka'," + > > " 'topic' = 'order.test'," + > > " 'properties.bootstrap.servers' = 'localhost:9092'," + > > " 'properties.group.id' = 'testGroup'," + > > " 'scan.startup.mode' = 'earliest-offset', " + > > " 'format' = 'json' " + > > ")"); > > bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); > > > > bsTableEnv.executeSql("CREATE TABLE hive_sink_table_streaming (" + > > " id BIGINT ," + > > " order_id STRING," + > > " amount DECIMAL(10, 2)" + > > " )"); > > 
bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); > > bsTableEnv.executeSql("CREATE TABLE print_table WITH > > ('connector' = 'print')" + > > "LIKE INSERT INTO hive_sink_table_streaming (EXCLUDING > > ALL)"); > > > > bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); > > bsTableEnv.executeSql("INSERT INTO hive_sink_table_streaming > SELECT > > " + > > "id, " + > > "order_id, " + > > "amount " + > > "FROM topic_products"); > > > > Table table1 = bsTableEnv.from("hive_sink_table_streaming"); > > table1.executeInsert("print_table"); > > } > > } > > >