有什么异常信息吗？请看一下 JobManager/TaskManager 日志里是否有报错，以及 checkpoint 是否成功完成（流式写入 Hive 时，文件只有在 checkpoint 成功后才会提交）。




在 2020-07-29 14:07:26,"kcz" <573693...@qq.com> 写道:
>已确认数据源有数据，全部代码如下，但 Hive 表中始终没有数据。
>
>package com.hive;
>
>import org.apache.flink.runtime.state.filesystem.FsStateBackend;
>import org.apache.flink.streaming.api.CheckpointingMode;
>import org.apache.flink.streaming.api.TimeCharacteristic;
>import 
>org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
>import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>import org.apache.flink.table.api.EnvironmentSettings;
>import org.apache.flink.table.api.SqlDialect;
>import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>import org.apache.flink.table.catalog.hive.HiveCatalog;
>
>import java.time.Duration;
>
>/**
> * Streams JSON records from the Kafka topic {@code sendMessage} into the
> * partitioned Parquet Hive table {@code situation.fs_table}, using a
> * {@link HiveCatalog} as the session catalog.
> */
>public class HiveTest {
>
>    /**
>     * Checkpoint directory for the {@link FsStateBackend}.
>     * NOTE(review): "hdfs_path" looks like a redacted placeholder. Confirm it points at a
>     * writable directory (e.g. an hdfs:// URI). If checkpoints fail, the Hive sink never
>     * commits any files.
>     */
>    private static final String CHECKPOINT_PATH = "hdfs_path";
>
>    /** Interval between checkpoints; the streaming Hive sink commits files on each one. */
>    private static final long CHECKPOINT_INTERVAL_MS = Duration.ofSeconds(20).toMillis();
>
>    /**
>     * Creates the Kafka source table and the partitioned Hive sink table in the
>     * {@code myhive} catalog, then runs a continuous INSERT from source to sink.
>     *
>     * @param args unused
>     * @throws Exception if the INSERT job cannot be submitted or fails while running
>     */
>    public static void main(String[] args) throws Exception {
>        System.setProperty("HADOOP_USER_NAME", "work");
>
>        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>        env.setParallelism(1);
>        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
>
>        // The streaming Hive/filesystem sink only finalizes files when a checkpoint completes,
>        // so checkpointing must be on. Keep every checkpoint setting on the environment itself
>        // instead of splitting them between the environment and the TableConfig.
>        env.enableCheckpointing(CHECKPOINT_INTERVAL_MS, CheckpointingMode.EXACTLY_ONCE);
>        // Allow only one checkpoint in flight at a time.
>        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>        env.setStateBackend(new FsStateBackend(CHECKPOINT_PATH));
>
>        EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
>                .useBlinkPlanner()
>                .inStreamingMode()
>                .build();
>        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, tableEnvSettings);
>
>        String name            = "myhive";
>        String defaultDatabase = "situation";
>        String hiveConfDir     = "/load/data/hive/hive-conf"; // a local path
>        String version         = "1.2.1";
>
>        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
>        tableEnv.registerCatalog(name, hive);
>        // Make the HiveCatalog the current catalog of the session.
>        tableEnv.useCatalog(name);
>
>        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS situation");
>        tableEnv.executeSql("DROP TABLE IF EXISTS situation.source_table");
>
>        // Kafka source (legacy connector properties, default SQL dialect).
>        // NOTE(review): with 'latest-offset' only messages produced AFTER the job starts are
>        // read; records already in the topic are skipped.
>        tableEnv.executeSql("CREATE TABLE situation.source_table (\n" +
>                "\thost STRING,\n" +
>                "\turl STRING,\n" +
>                "\tpublic_date STRING\n" +
>                ") WITH (\n" +
>                "\t'connector.type' = 'kafka',\n" +
>                "\t'connector.version' = 'universal',\n" +
>                "\t'connector.startup-mode' = 'latest-offset',\n" +
>                "\t'connector.topic' = 'sendMessage',\n" +
>                "\t'connector.properties.group.id' = 'domain_testGroup',\n" +
>                "\t'connector.properties.zookeeper.connect' = '127.0.0.1:2181',\n" +
>                "\t'connector.properties.bootstrap.servers' = '127.0.0.1:9092',\n" +
>                "\t'update-mode' = 'append',\n" +
>                "\t'format.type' = 'json',\n" +
>                "\t'format.derive-schema' = 'true'\n" +
>                ")");
>
>        // The Hive sink table must be created with the Hive dialect.
>        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
>        tableEnv.executeSql("DROP TABLE IF EXISTS situation.fs_table");
>
>        // Partitions are committed (metastore + _SUCCESS file) by processing time, 1 minute
>        // after creation. The timestamp-pattern is only used by the 'partition-time' trigger.
>        String hiveSql = "\n" +
>                "  CREATE TABLE situation.fs_table (\n" +
>                " \n" +
>                "    host STRING,\n" +
>                "    url STRING,\n" +
>                "    public_date STRING\n" +
>                "  \n" +
>                "  ) PARTITIONED BY (\n" +
>                "    ts_date STRING,\n" +
>                "    ts_hour STRING,\n" +
>                "    ts_minute STRING\n" +
>                "  ) STORED AS PARQUET\n" +
>                "  TBLPROPERTIES (\n" +
>                "    'sink.partition-commit.trigger' = 'process time',\n" +
>                "    'sink.partition-commit.delay' = '1 min',\n" +
>                "    'sink.partition-commit.policy.kind' = 'metastore,success-file',\n" +
>                "    'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:$ts_minute:00'\n" +
>                "  )\n" +
>                "  ";
>        tableEnv.executeSql(hiveSql);
>
>        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
>
>        // NOTE(review): DATE_FORMAT on a STRING column presumably expects public_date in
>        // 'yyyy-MM-dd HH:mm:ss' form. Confirm against the Kafka payload, otherwise the
>        // partition columns may come out NULL.
>        String insertSql = "INSERT INTO  situation.fs_table SELECT host, url,public_date,"
>                + " DATE_FORMAT(public_date,'yyyy-MM-dd')"
>                + " ,DATE_FORMAT(public_date,'HH')"
>                + " ,DATE_FORMAT(public_date,'mm')"
>                + "  FROM situation.source_table";
>
>        // executeSql() only submits the INSERT job and returns at once. When main() runs
>        // locally (e.g. from an IDE) and returns right away, the embedded cluster may shut
>        // down before the first checkpoint commits any files, so block on the job here.
>        tableEnv.executeSql(insertSql)
>                .getJobClient()
>                .orElseThrow(() -> new IllegalStateException("INSERT job was not submitted"))
>                .getJobExecutionResult(Thread.currentThread().getContextClassLoader())
>                .get();
>    }
>}

回复