Hi Dream, 可以详述下你的测试场景吗?
Best, Jingsong On Mon, Jul 20, 2020 at 5:40 PM Dream-底限 <zhan...@akulaku.com> wrote: > hi、 > 请问这个问题最后怎么解决了,数据能滚动写入hive了嘛,我这面开启了checkpoint之后hive也是没数据 > > 李佳宸 <lijiachen...@gmail.com> 于2020年7月16日周四 下午10:39写道: > > > 好的,谢谢~~~ > > > > JasonLee <17610775...@163.com> 于2020年7月16日周四 下午8:22写道: > > > > > hi > > > 需要开启checkpoint > > > > > > > > > | | > > > JasonLee > > > | > > > | > > > 邮箱:17610775...@163.com > > > | > > > > > > Signature is customized by Netease Mail Master > > > > > > 在2020年07月16日 18:03,李佳宸 写道: > > > 想请教下大家 hive streaming write需要有哪些配置,不知道为什么我的作业能够跑起来,但是没有数据写入hive。 > > > 批量的hive写入,流环境的读取是正常的。 > > > > > > 附代码,很简短: > > > > > > public class KafkaToHiveStreaming { > > > public static void main(String[] arg) throws Exception{ > > > StreamExecutionEnvironment bsEnv = > > > StreamExecutionEnvironment.getExecutionEnvironment(); > > > EnvironmentSettings bsSettings = > > > > > > > > > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); > > > StreamTableEnvironment bsTableEnv = > > > StreamTableEnvironment.create(bsEnv, bsSettings); > > > String name = "myhive"; > > > String defaultDatabase = "default"; > > > String hiveConfDir = > > > "/Users/uzi/Downloads/Hadoop/apache-hive-3.1.2-bin/conf/"; // a local > > > path > > > String version = "3.1.2"; > > > > > > HiveCatalog hive = new HiveCatalog(name, defaultDatabase, > > > hiveConfDir, version); > > > bsTableEnv.registerCatalog("myhive", hive); > > > bsTableEnv.useCatalog("myhive"); > > > bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); > > > bsTableEnv.executeSql("CREATE TABLE topic_products (" + > > > " id BIGINT ," + > > > " order_id STRING," + > > > " amount DECIMAL(10, 2)," + > > > " create_time TIMESTAMP " + > > > ") WITH (" + > > > " 'connector' = 'kafka'," + > > > " 'topic' = 'order.test'," + > > > " 'properties.bootstrap.servers' = 'localhost:9092'," + > > > " 'properties.group.id' = 'testGroup'," + > > > " 'scan.startup.mode' = 'earliest-offset', " + > > > " 'format' = 'json' " + > > > ")"); > > > bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); > > > > > > bsTableEnv.executeSql("CREATE TABLE hive_sink_table_streaming > (" + > > > " id BIGINT ," + > > > " order_id STRING," + > > > " amount DECIMAL(10, 2)" + > > > " )"); > > > bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); > > > bsTableEnv.executeSql("CREATE TABLE print_table WITH > > > ('connector' = 'print')" + > > > "LIKE INSERT INTO hive_sink_table_streaming (EXCLUDING > > > ALL)"); > > > > > > bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); > > > bsTableEnv.executeSql("INSERT INTO hive_sink_table_streaming > > SELECT > > > " + > > > "id, " + > > > "order_id, " + > > > "amount " + > > > "FROM topic_products"); > > > > > > Table table1 = bsTableEnv.from("hive_sink_table_streaming"); > > > table1.executeInsert("print_table"); > > > } > > > } > > > > > > -- Best, Jingsong Lee