[jira] [Commented] (FLINK-20947) Unable to trigger Hive partition commit when use table sql to write data from Kafka into a Hive table
[ https://issues.apache.org/jira/browse/FLINK-20947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17270227#comment-17270227 ]

Jark Wu commented on FLINK-20947:
---------------------------------

Hi [~weijiaxu], we have fixed this problem in the master branch. Could you check whether it resolves your problem when you have time?

> Unable to trigger Hive partition commit when use table sql to write data from Kafka into a Hive table
> -----------------------------------------------------------------------------------------------------
>
>              Key: FLINK-20947
>              URL: https://issues.apache.org/jira/browse/FLINK-20947
>          Project: Flink
>       Issue Type: Bug
>       Components: Table SQL / Planner
> Affects Versions: 1.12.0
>         Reporter: Weijia Xu
>         Assignee: Shengkai Fang
>         Priority: Major
>           Labels: pull-request-available
>          Fix For: 1.13.0, 1.12.2
>
> I use Table SQL to create a stream with a Kafka source and write data from Kafka into a partitioned Hive table.
>
> The following SQL creates the Kafka table:
> {code:java}
> tableEnv.executeSql(
>     "CREATE TABLE stream_tmp.kafka_tmp (`messageId` STRING, `message_type` STRING, `payload` STRING, `timestamp` BIGINT, " +
>     "  procTime AS PROCTIME()," +
>     "  eventTime AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp` / 1000, 'yyyy-MM-dd HH:mm:ss'))," +
>     "  WATERMARK FOR eventTime AS eventTime - INTERVAL '15' SECOND )" +
>     " WITH ('connector' = 'kafka'," +
>     "  'topic' = 'XXX-topic'," +
>     "  'properties.bootstrap.servers' = 'kafka-server:9092'," +
>     "  'properties.group.id' = 'XXX-group_id'," +
>     "  'scan.startup.mode' = 'latest-offset'," +
>     "  'format' = 'json'," +
>     "  'json.fail-on-missing-field' = 'false'," +
>     "  'json.ignore-parse-errors' = 'true' )");
> {code}
>
> And the following SQL creates the Hive table:
> {code:java}
> tableEnv.executeSql(
>     "CREATE TABLE hive_tmp.kafka_hive_tmp (`message_id` STRING, `message_type` STRING, `payload` STRING, `event_ts` BIGINT ) " +
>     " PARTITIONED BY (ts_date STRING, ts_hour STRING, ts_minute STRING)" +
>     " STORED AS PARQUET TBLPROPERTIES (" +
>     "  'sink.partition-commit.trigger' = 'partition-time'," +
>     "  'sink.partition-commit.delay' = '1 min'," +
>     "  'sink.partition-commit.policy.kind' = 'metastore,success-file'," +
>     "  'sink.partition-commit.success-file.name' = '_SUCCESS'," +
>     "  'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:$ts_minute:00')");
> {code}
>
> The Kafka topic used above has multiple partitions; some partitions receive data while others do not.
> I noticed there is a config "_table.exec.source.idle-timeout_" that should handle such "idle" source partitions, but even with this config set, the Hive partition commit is still not triggered (that is, the "_SUCCESS" file is not created for the partition).
> After analyzing this issue, I found the root cause: the watermark for the "idle" partition does not advance, which prevents the Hive partition from being committed.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
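For reference, the option discussed in the report is set on the table environment roughly like this (a fragment, assuming the same {{tableEnv}} as in the snippets above; the 30 s timeout value is illustrative). Note that this issue is precisely that the setting was not applied to the pushed-down watermark strategy before the fix.

{code:java}
// Mark a source partition as idle if it produces no records for 30 s,
// so that it no longer holds back the global watermark.
tableEnv.getConfig().getConfiguration()
        .setString("table.exec.source.idle-timeout", "30 s");
{code}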
[ https://issues.apache.org/jira/browse/FLINK-20947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17270226#comment-17270226 ]

Jark Wu commented on FLINK-20947:
---------------------------------

Fixed in
- master: 90e680c9c579b2ffa1ca93ad7dfbaa5502dd8701
[ https://issues.apache.org/jira/browse/FLINK-20947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17266708#comment-17266708 ]

Shengkai Fang commented on FLINK-20947:
---------------------------------------

Sorry for the late response. I think the test works like the rule {{PushPartitionIntoTableSourceScanRule}}. I will fix it soon.
[ https://issues.apache.org/jira/browse/FLINK-20947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17264104#comment-17264104 ]

Jark Wu commented on FLINK-20947:
---------------------------------

1. I find that we don't have a watermark-pushdown-with-idle-timeout *plan test*. We need to add one to {{PushWatermarkIntoTableSourceScanRuleTest}}.
2. We can add an IT case in {{KafkaTableITCase}}:
- prepare 2 partitions, each with several messages. The last event time in the first partition is {{2021-01-13 20:00:00}}; the last event time in the second partition is {{2021-01-13 19:30:00}}.
- run a window query on the source with a 1-hour tumbling window.
- the query should produce output for the window [19:00, 20:00), which means the watermark has reached 20:00.
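The IT case Jark sketches could use a query along these lines (illustrative only; {{kafka_src}} and {{eventTime}} are placeholder names for the test's source table and its event-time attribute):

{code:sql}
-- Tumbling 1-hour window on the event-time attribute. With the idle-timeout
-- working, the [19:00, 20:00) window should fire once the first partition's
-- watermark passes 20:00, even though the second partition stops at 19:30.
SELECT
  TUMBLE_START(eventTime, INTERVAL '1' HOUR) AS window_start,
  TUMBLE_END(eventTime, INTERVAL '1' HOUR)   AS window_end,
  COUNT(*) AS cnt
FROM kafka_src
GROUP BY TUMBLE(eventTime, INTERVAL '1' HOUR)
{code}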
[ https://issues.apache.org/jira/browse/FLINK-20947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17264020#comment-17264020 ]

Weijia Xu commented on FLINK-20947:
-----------------------------------

My pleasure, [~jark]. I think fixing this issue is easy; what matters more, as you said, is setting up the tests for these scenarios. Could I be involved in the discussion on how to set up the tests, so I can learn more about this?
[ https://issues.apache.org/jira/browse/FLINK-20947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17263907#comment-17263907 ]

Jark Wu commented on FLINK-20947:
---------------------------------

Good catch [~weijiaxu]! I think you are right.
[ https://issues.apache.org/jira/browse/FLINK-20947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17263895#comment-17263895 ]

Weijia Xu commented on FLINK-20947:
-----------------------------------

I think the root cause is that the watermark for the "idle" partition does not advance. This happens because, when "_table.exec.source.idle-timeout_" is set to a valid value, the watermark strategy should be enriched with idleness detection, but (after checking the source code) it actually is not.

Source code class: _PushWatermarkIntoTableSourceScanRuleBase.java_
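The mechanics [~weijiaxu] describes can be illustrated with a small self-contained sketch (plain Java, no Flink dependency; the class, the {{IDLE}} sentinel, and the values are all hypothetical): a downstream operator's watermark is the minimum over its inputs' watermarks, so one partition that never emits holds everything back unless it is marked idle and excluded from the minimum.

{code:java}
import java.util.Arrays;

public class WatermarkSketch {

    // Sentinel for a partition that the idle-timeout has marked idle.
    static final long IDLE = Long.MIN_VALUE;

    // Combined watermark = minimum over all partitions that are NOT idle.
    static long combinedWatermark(long[] perPartitionWatermarks) {
        return Arrays.stream(perPartitionWatermarks)
                .filter(w -> w != IDLE)
                .min()
                .orElse(IDLE);
    }

    public static void main(String[] args) {
        // Partition 0 has advanced to t=1000; partition 1 never received data.
        System.out.println(combinedWatermark(new long[]{1000L, 0L}));   // 0: stuck
        System.out.println(combinedWatermark(new long[]{1000L, IDLE})); // 1000: advances
    }
}
{code}

Once the empty partition is excluded, the combined watermark can reach the partition-commit trigger time, which is exactly what does not happen when the idleness-enriched strategy is dropped during watermark pushdown.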
[ https://issues.apache.org/jira/browse/FLINK-20947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17263885#comment-17263885 ]

Jark Wu commented on FLINK-20947:
---------------------------------

cc [~fsk119], could you help check whether the idle timeout works for the pushed-down watermark?

[~weijiaxu], could you share the explain result of your query?
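The explain result Jark asks for can be produced with {{TableEnvironment#explainSql}} (a sketch; the statement is a placeholder for the actual insert query):

{code:java}
// Print the plan so the watermark assigner (and whether idleness was
// applied after watermark pushdown) can be inspected.
System.out.println(
    tableEnv.explainSql(
        "INSERT INTO hive_tmp.kafka_hive_tmp SELECT /* ... */ FROM stream_tmp.kafka_tmp"));
{code}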