想请教下大家:Hive streaming write 需要有哪些配置?不知道为什么我的作业能够跑起来,但是没有数据写入 Hive。
批量的hive写入,流环境的读取是正常的。

附代码,很简短:

public class KafkaToHiveStreaming {
    public static void main(String[] arg) throws Exception{
        StreamExecutionEnvironment bsEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment bsTableEnv =
StreamTableEnvironment.create(bsEnv, bsSettings);
        String name            = "myhive";
        String defaultDatabase = "default";
        String hiveConfDir     =
"/Users/uzi/Downloads/Hadoop/apache-hive-3.1.2-bin/conf/"; // a local
path
        String version         = "3.1.2";

        HiveCatalog hive = new HiveCatalog(name, defaultDatabase,
hiveConfDir, version);
        bsTableEnv.registerCatalog("myhive", hive);
        bsTableEnv.useCatalog("myhive");
        bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        bsTableEnv.executeSql("CREATE TABLE topic_products (" +
                "  id BIGINT ," +
                "  order_id STRING," +
                "  amount DECIMAL(10, 2)," +
                "  create_time TIMESTAMP " +
                ") WITH (" +
                " 'connector' = 'kafka'," +
                " 'topic' = 'order.test'," +
                " 'properties.bootstrap.servers' = 'localhost:9092'," +
                " 'properties.group.id' = 'testGroup'," +
                " 'scan.startup.mode' = 'earliest-offset', " +
                " 'format' = 'json'  " +
                ")");
        bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

        bsTableEnv.executeSql("CREATE TABLE hive_sink_table_streaming (" +
                "  id BIGINT ," +
                "  order_id STRING," +
                "  amount DECIMAL(10, 2)" +
                "  )");
        bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        bsTableEnv.executeSql("CREATE TABLE print_table WITH
('connector' = 'print')" +
                "LIKE INSERT INTO hive_sink_table_streaming (EXCLUDING ALL)");

        bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        bsTableEnv.executeSql("INSERT INTO hive_sink_table_streaming SELECT " +
                "id, " +
                "order_id, " +
                "amount " +
                "FROM topic_products");

        Table table1 = bsTableEnv.from("hive_sink_table_streaming");
        table1.executeInsert("print_table");
    }
}

回复