hi,
请问这个问题最后是怎么解决的?数据能滚动写入 hive 了吗?我这边开启了 checkpoint 之后,hive 里也还是没有数据。

李佳宸 <lijiachen...@gmail.com> 于2020年7月16日周四 下午10:39写道:

> 好的,谢谢~~~
>
> JasonLee <17610775...@163.com> 于2020年7月16日周四 下午8:22写道:
>
> > hi
> > 需要开启checkpoint
> >
> >
> > | |
> > JasonLee
> > |
> > |
> > 邮箱:17610775...@163.com
> > |
> >
> > Signature is customized by Netease Mail Master
> >
> > 在2020年07月16日 18:03,李佳宸 写道:
> > 想请教下大家:hive streaming write 需要有哪些配置?不知道为什么我的作业能够跑起来,但是没有数据写入 hive。
> > 批量方式的 hive 写入,以及流环境下的读取,都是正常的。
> >
> > 附代码,很简短:
> >
> > public class KafkaToHiveStreaming {
> >    public static void main(String[] arg) throws Exception{
> >        StreamExecutionEnvironment bsEnv =
> > StreamExecutionEnvironment.getExecutionEnvironment();
> >        EnvironmentSettings bsSettings =
> >
> >
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> >        StreamTableEnvironment bsTableEnv =
> > StreamTableEnvironment.create(bsEnv, bsSettings);
> >        String name            = "myhive";
> >        String defaultDatabase = "default";
> >        String hiveConfDir     =
> > "/Users/uzi/Downloads/Hadoop/apache-hive-3.1.2-bin/conf/"; // a local
> > path
> >        String version         = "3.1.2";
> >
> >        HiveCatalog hive = new HiveCatalog(name, defaultDatabase,
> > hiveConfDir, version);
> >        bsTableEnv.registerCatalog("myhive", hive);
> >        bsTableEnv.useCatalog("myhive");
> >        bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
> >        bsTableEnv.executeSql("CREATE TABLE topic_products (" +
> >                "  id BIGINT ," +
> >                "  order_id STRING," +
> >                "  amount DECIMAL(10, 2)," +
> >                "  create_time TIMESTAMP " +
> >                ") WITH (" +
> >                " 'connector' = 'kafka'," +
> >                " 'topic' = 'order.test'," +
> >                " 'properties.bootstrap.servers' = 'localhost:9092'," +
> >                " 'properties.group.id' = 'testGroup'," +
> >                " 'scan.startup.mode' = 'earliest-offset', " +
> >                " 'format' = 'json'  " +
> >                ")");
> >        bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
> >
> >        bsTableEnv.executeSql("CREATE TABLE hive_sink_table_streaming (" +
> >                "  id BIGINT ," +
> >                "  order_id STRING," +
> >                "  amount DECIMAL(10, 2)" +
> >                "  )");
> >        bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
> >        bsTableEnv.executeSql("CREATE TABLE print_table WITH
> > ('connector' = 'print')" +
> >                "LIKE INSERT INTO hive_sink_table_streaming (EXCLUDING
> > ALL)");
> >
> >        bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
> >        bsTableEnv.executeSql("INSERT INTO hive_sink_table_streaming
> SELECT
> > " +
> >                "id, " +
> >                "order_id, " +
> >                "amount " +
> >                "FROM topic_products");
> >
> >        Table table1 = bsTableEnv.from("hive_sink_table_streaming");
> >        table1.executeInsert("print_table");
> >    }
> > }
> >
>

回复