[ https://issues.apache.org/jira/browse/FLINK-20947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jark Wu reassigned FLINK-20947: ------------------------------- Assignee: Weijia Xu > Unable to trigger Hive partition commit when use table sql to write data from > Kafka into a Hive table > ----------------------------------------------------------------------------------------------------- > > Key: FLINK-20947 > URL: https://issues.apache.org/jira/browse/FLINK-20947 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner > Affects Versions: 1.12.0 > Reporter: Weijia Xu > Assignee: Weijia Xu > Priority: Major > Fix For: 1.13.0 > > > I use table sql to create stream with kafka source, and write data from Kafka > into a Hive partitioned table. > The following sql to create kafka table: > {code:java} > // code placeholder > tableEnv.executeSql( > "CREATE TABLE stream_tmp.kafka_tmp (`messageId` STRING, `message_type` > STRING,`payload` STRING,`timestamp` BIGINT, " + > " procTime AS PROCTIME()," + > " eventTime AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp` / > 1000,'yyyy-MM-dd HH:mm:ss'))," + > " WATERMARK FOR eventTime AS eventTime - INTERVAL '15' SECOND )" > + > " WITH ('connector' = 'kafka'," + > " 'topic' = 'XXX-topic'," + > " 'properties.bootstrap.servers'='kafka-server:9092'," + > " 'properties.group.id' = 'XXX-group_id'," + > " 'scan.startup.mode' = 'latest-offset'," + > " 'format' = 'json'," + > " 'json.fail-on-missing-field' = 'false'," + > " 'json.ignore-parse-errors' = 'true' )" > );{code} > > And, the following sql to create Hive table: > {code:java} > // code placeholder > tableEnv.executeSql( > "CREATE TABLE hive_tmp.kafka_hive_tmp (`message_id` > STRING,`message_type` STRING,`payload` STRING,`event_ts` BIGINT ) " + > " PARTITIONED BY (ts_date STRING,ts_hour STRING, ts_minute > STRING)" + > " STORED AS PARQUET TBLPROPERTIES (" + > " 'sink.partition-commit.trigger' = 'partition-time'," + > " 'sink.partition-commit.delay' = '1 min'," + > " 'sink.partition-commit.policy.kind'='metastore,success-file'," > + > " 'sink.partition-commit.success-file.name'='_SUCCESS'," + > " 'partition.time-extractor.timestamp-pattern' = '$ts_date > $ts_hour:$ts_minute:00')"); > {code} > > For the kafka topic used above, which has multi partitions, with some of > the partitions there's data coming in, while other partitions with no data > coming. > I noticed that there's config "_table.exec.source.idle-timeout_" can handle > the situation for the "idle" source partition, but event though set this > config, it still cannot trigger the Hive partition commit (that means the > "_SUCCESS" file will not be created for the partition). > After do the analysis for this issue, find the root cause is that the > watermark for the "idle" partition will not advance, which cause the Hive > partition cannot be committed. > > -- This message was sent by Atlassian Jira (v8.3.4#803005)