Hi team,
I wrote a simple SQL job that selects data from Kafka. The results print
in the IDE, but when I submit the job to a standalone cluster from the CLI,
no results are shown. I am sure the job is running fine on the cluster: the
debug log shows that the Kafka consumer is fetching data from Kafka. I also
enabled debug logging in the CLI and don't see anything obviously relevant
there.
Here is the job code snippet:
public static void main(String[] args) throws Exception {
    StreamTableEnvironment tableEnv = StreamTableEnvironment
        .create(StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1));
    // args[0] is the path to the SQL file
    String sqls = new String(Files.readAllBytes(Paths.get(args[0])));
    // splitIgnoreQuota is a helper (not shown) that splits the file on ';'
    splitIgnoreQuota(sqls, ';').forEach(sql -> {
        TableResult tableResult = tableEnv.executeSql(sql);
        tableResult.print();
    });
}
It simply parses a SQL file and executes the statements one by one.
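The splitIgnoreQuota helper is not shown in the snippet. A minimal sketch of what such a helper might look like, assuming it only needs to split on the delimiter when it appears outside single-quoted literals and backtick-quoted identifiers (the class name SqlSplitter and these quoting rules are illustrative assumptions, and SQL comments are not handled):

```java
import java.util.ArrayList;
import java.util.List;

public class SqlSplitter {

    // Hypothetical stand-in for the helper used in main(): splits text on
    // the delimiter, skipping delimiters inside '...' or `...`.
    public static List<String> splitIgnoreQuota(String text, char delimiter) {
        List<String> statements = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inSingleQuote = false;
        boolean inBacktick = false;

        for (int i = 0; i < text.length(); i++) {
            char c = text.charAt(i);
            // Track whether we are inside a quoted region.
            if (c == '\'' && !inBacktick) {
                inSingleQuote = !inSingleQuote;
            } else if (c == '`' && !inSingleQuote) {
                inBacktick = !inBacktick;
            }
            if (c == delimiter && !inSingleQuote && !inBacktick) {
                // End of a statement: keep it unless it is blank.
                addIfNotBlank(statements, current);
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        // The last statement may have no trailing delimiter.
        addIfNotBlank(statements, current);
        return statements;
    }

    private static void addIfNotBlank(List<String> out, StringBuilder sb) {
        String statement = sb.toString().trim();
        if (!statement.isEmpty()) {
            out.add(statement);
        }
    }
}
```

With a helper like this, a file containing a CREATE TABLE statement followed by a SELECT yields two strings, which are then passed to executeSql in order.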
Here are the SQL statements:
CREATE TABLE t1 (
    `f1` STRING,
    `f2` STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'topic',
    'properties.group.id' = 'test1',
    'properties.max.partition.fetch.bytes' = '16384',
    'properties.enable.auto.commit' = 'false',
    'properties.bootstrap.servers' = 'kafka:9092',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);
SELECT * FROM t1
Below is the result I got from the IDE:
| +I | b8f5 | abcd |
| +I | b8f5 | abcd |
And this is the result from the CLI:
bin/flink run -m localhost:8081 -c kafka.sample.flink.SQLSample \
    ~/workspaces/kafka-sample/target/kafka-sample-0.1.2-jar-with-dependencies.jar \
    /sample.sql
+--------+
| result |
+--------+
| OK |
+--------+
1 row in set
Job has been submitted with JobID ace45d2ff850675243e2663d3bf11701
+----+--------------------------------+--------------------------------+
| op | uuid | ots |
+----+--------------------------------+--------------------------------+
--
Regards,
Tao