[ 
https://issues.apache.org/jira/browse/FLINK-20947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu reassigned FLINK-20947:
-------------------------------

    Assignee: Shengkai Fang  (was: Weijia Xu)

> Unable to trigger Hive partition commit when use table sql to write data from 
> Kafka into a Hive table
> -----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-20947
>                 URL: https://issues.apache.org/jira/browse/FLINK-20947
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.12.0
>            Reporter: Weijia Xu
>            Assignee: Shengkai Fang
>            Priority: Major
>             Fix For: 1.13.0
>
>
> I use table sql to create stream with kafka source, and write data from Kafka 
> into a Hive partitioned table.
> The following sql to create kafka table:
> {code:java}
> // code placeholder
> tableEnv.executeSql(
>     "CREATE TABLE stream_tmp.kafka_tmp (`messageId` STRING, `message_type` 
> STRING,`payload` STRING,`timestamp` BIGINT, " +
>             "  procTime AS PROCTIME()," +
>             "  eventTime AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp` / 
> 1000,'yyyy-MM-dd HH:mm:ss'))," +
>             "  WATERMARK FOR eventTime AS eventTime - INTERVAL '15' SECOND )" 
> +
>             "  WITH ('connector' = 'kafka'," +
>             " 'topic' = 'XXX-topic'," +
>             " 'properties.bootstrap.servers'='kafka-server:9092'," +
>             " 'properties.group.id' = 'XXX-group_id'," +
>             " 'scan.startup.mode' = 'latest-offset'," +
>             " 'format' = 'json'," +
>             " 'json.fail-on-missing-field' = 'false'," +
>             " 'json.ignore-parse-errors' = 'true' )"
>             );{code}
>   
> And, the following sql to create Hive table:
> {code:java}
> // code placeholder
> tableEnv.executeSql(
>      "CREATE TABLE hive_tmp.kafka_hive_tmp (`message_id` 
> STRING,`message_type` STRING,`payload` STRING,`event_ts` BIGINT ) " +
>              " PARTITIONED BY (ts_date STRING,ts_hour STRING, ts_minute 
> STRING)" +
>              " STORED AS PARQUET TBLPROPERTIES (" +
>              " 'sink.partition-commit.trigger' = 'partition-time'," +
>              " 'sink.partition-commit.delay' = '1 min'," +
>              " 'sink.partition-commit.policy.kind'='metastore,success-file'," 
> +
>              " 'sink.partition-commit.success-file.name'='_SUCCESS'," +
>              " 'partition.time-extractor.timestamp-pattern' = '$ts_date 
> $ts_hour:$ts_minute:00')");
> {code}
>  
> For the kafka topic used above,  which has multi partitions,  with some of 
> the partitions there's data coming in, while other partitions with no data 
> coming.
> I noticed that there's config "_table.exec.source.idle-timeout_" can handle 
> the situation for the "idle" source partition, but event though set this 
> config, it still cannot trigger the Hive partition commit (that means the 
> "_SUCCESS" file will not be created for the partition).
> After do the analysis for this issue, find the root cause is that the 
> watermark for the "idle" partition will not advance, which cause the Hive 
> partition cannot be committed.
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to