hi,我这面请一个一个kafka到hive的程序,但程序无法运行,请问什么原因:

异常:
The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: No operators defined in streaming topology. Cannot
generate StreamGraph.
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
at
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: java.lang.IllegalStateException: No operators defined in
streaming topology. Cannot generate StreamGraph.
at
org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
at
org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
at
com.akulaku.data.flink.StreamingWriteToHive.main(StreamingWriteToHive.java:80)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
... 11 more
代码:

 StreamExecutionEnvironment environment =
StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings =
EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamTableEnvironment tableEnv =
StreamTableEnvironment.create(environment, settings);

        
environment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        environment.setStateBackend(new MemoryStateBackend());
        environment.getCheckpointConfig().setCheckpointInterval(5000);

        String name = "myhive";
        String defaultDatabase = "tmp";
        String hiveConfDir = "/etc/alternatives/hive-conf/";
        String version = "1.1.0";

        HiveCatalog hive = new HiveCatalog(name, defaultDatabase,
hiveConfDir, version);
        tableEnv.registerCatalog("myhive", hive);
        tableEnv.useCatalog("myhive");

        tableEnv.executeSql("CREATE TABLE tmp.user_behavior (\n" +
                "  user_id BIGINT,\n" +
                "  item_id STRING,\n" +
                "  behavior STRING,\n" +
                "  ts AS PROCTIME()\n" +
                ") WITH (\n" +
                " 'connector' = 'kafka-0.11',\n" +
                " 'topic' = 'user_behavior',\n" +
                " 'properties.bootstrap.servers' = 'localhost:9092',\n" +
                " 'properties.group.id' = 'testGroup',\n" +
                " 'scan.startup.mode' = 'earliest-offset',\n" +
                " 'format' = 'json',\n" +
                " 'json.fail-on-missing-field' = 'false',\n" +
                " 'json.ignore-parse-errors' = 'true'\n" +
                ")");

//        tableEnv.executeSql("CREATE TABLE print_table (\n" +
//                " user_id BIGINT,\n" +
//                " item_id STRING,\n" +
//                " behavior STRING,\n" +
//                " tsdata STRING\n" +
//                ") WITH (\n" +
//                " 'connector' = 'print'\n" +
//                ")");
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tableEnv.executeSql("CREATE TABLE tmp.streamhivetest (\n" +
                " user_id BIGINT,\n" +
                " item_id STRING,\n" +
                " behavior STRING,\n" +
                " tsdata STRING\n" +
                ") STORED AS parquet TBLPROPERTIES (\n" +
                " 'sink.rolling-policy.file-size' = '12MB',\n" +
                " 'sink.rolling-policy.rollover-interval' = '1 min',\n" +
                " 'sink.rolling-policy.check-interval' = '1 min',\n" +
                " 'execution.checkpointing.interval' = 'true'\n" +
                ")");

        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tableEnv.executeSql("insert into streamhivetest select
user_id,item_id,behavior,DATE_FORMAT(ts, 'yyyy-MM-dd') as tsdata from
user_behavior");

        tableEnv.execute("stream-write-hive");

回复