Hi Dream,

可以详述下你的测试场景吗?

Best,
Jingsong

On Mon, Jul 20, 2020 at 5:40 PM Dream-底限 <zhan...@akulaku.com> wrote:

> hi、
> 请问这个问题最后怎么解决了,数据能滚动写入hive了嘛,我这面开启了checkpoint之后hive也是没数据
>
> 李佳宸 <lijiachen...@gmail.com> 于2020年7月16日周四 下午10:39写道:
>
> > 好的,谢谢~~~
> >
> > JasonLee <17610775...@163.com> 于2020年7月16日周四 下午8:22写道:
> >
> > > hi
> > > 需要开启checkpoint
> > >
> > >
> > > | |
> > > JasonLee
> > > |
> > > |
> > > 邮箱:17610775...@163.com
> > > |
> > >
> > > Signature is customized by Netease Mail Master
> > >
> > > 在2020年07月16日 18:03,李佳宸 写道:
> > > 想请教下大家 hive streaming write需要有哪些配置,不知道为什么我的作业能够跑起来,但是没有数据写入hive。
> > > 批量的hive写入,流环境的读取是正常的。
> > >
> > > 附代码,很简短:
> > >
> > > public class KafkaToHiveStreaming {
> > >    public static void main(String[] arg) throws Exception{
> > >        StreamExecutionEnvironment bsEnv =
> > > StreamExecutionEnvironment.getExecutionEnvironment();
> > >        EnvironmentSettings bsSettings =
> > >
> > >
> >
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> > >        StreamTableEnvironment bsTableEnv =
> > > StreamTableEnvironment.create(bsEnv, bsSettings);
> > >        String name            = "myhive";
> > >        String defaultDatabase = "default";
> > >        String hiveConfDir     =
> > > "/Users/uzi/Downloads/Hadoop/apache-hive-3.1.2-bin/conf/"; // a local
> > > path
> > >        String version         = "3.1.2";
> > >
> > >        HiveCatalog hive = new HiveCatalog(name, defaultDatabase,
> > > hiveConfDir, version);
> > >        bsTableEnv.registerCatalog("myhive", hive);
> > >        bsTableEnv.useCatalog("myhive");
> > >        bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
> > >        bsTableEnv.executeSql("CREATE TABLE topic_products (" +
> > >                "  id BIGINT ," +
> > >                "  order_id STRING," +
> > >                "  amount DECIMAL(10, 2)," +
> > >                "  create_time TIMESTAMP " +
> > >                ") WITH (" +
> > >                " 'connector' = 'kafka'," +
> > >                " 'topic' = 'order.test'," +
> > >                " 'properties.bootstrap.servers' = 'localhost:9092'," +
> > >                " 'properties.group.id' = 'testGroup'," +
> > >                " 'scan.startup.mode' = 'earliest-offset', " +
> > >                " 'format' = 'json'  " +
> > >                ")");
> > >        bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
> > >
> > >        bsTableEnv.executeSql("CREATE TABLE hive_sink_table_streaming
> (" +
> > >                "  id BIGINT ," +
> > >                "  order_id STRING," +
> > >                "  amount DECIMAL(10, 2)" +
> > >                "  )");
> > >        bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
> > >        bsTableEnv.executeSql("CREATE TABLE print_table WITH
> > > ('connector' = 'print')" +
> > >                "LIKE INSERT INTO hive_sink_table_streaming (EXCLUDING
> > > ALL)");
> > >
> > >        bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
> > >        bsTableEnv.executeSql("INSERT INTO hive_sink_table_streaming
> > SELECT
> > > " +
> > >                "id, " +
> > >                "order_id, " +
> > >                "amount " +
> > >                "FROM topic_products");
> > >
> > >        Table table1 = bsTableEnv.from("hive_sink_table_streaming");
> > >        table1.executeInsert("print_table");
> > >    }
> > > }
> > >
> >
>


-- 
Best, Jingsong Lee

回复