[jira] [Comment Edited] (FLINK-20947) Unable to trigger Hive partition commit when using Table SQL to write data from Kafka into a Hive table

2021-01-16 Thread Shengkai Fang (Jira)


[ https://issues.apache.org/jira/browse/FLINK-20947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17266708#comment-17266708 ]

Shengkai Fang edited comment on FLINK-20947 at 1/17/21, 6:50 AM:
-----------------------------------------------------------------

Sorry for the late response. I think the test works like the rule test in `PushPartitionIntoTableSourceScanRule`. I will fix it soon.
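
For context, a rough sketch of the kind of plan check such a test performs, using only public APIs (the table definition, the datagen stand-in source, and the print-based check are illustrative; the actual rule test would use the planner's test harness, as the `PushPartitionIntoTableSourceScanRule` test does):
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class WatermarkPushDownPlanSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Set the option whose effect the fix needs to cover.
        tEnv.getConfig().getConfiguration()
                .setString("table.exec.source.idle-timeout", "1 s");

        // A source with a watermark, so the watermark can be pushed into the scan.
        tEnv.executeSql(
                "CREATE TABLE src (" +
                "  `ts_millis` BIGINT," +
                "  `ts` AS TO_TIMESTAMP(FROM_UNIXTIME(`ts_millis` / 1000))," +
                "  WATERMARK FOR `ts` AS `ts` - INTERVAL '5' SECOND" +
                ") WITH ('connector' = 'datagen')");

        // A real rule test asserts on the optimized plan; printing it here
        // stands in for that assertion.
        System.out.println(tEnv.explainSql("SELECT * FROM src"));
    }
}
{code}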


was (Author: fsk119):
Sorry for the late response. I think the test works like the rule `PushPartitionIntoTableSourceScanRule`. I will fix it soon.

> Unable to trigger Hive partition commit when using Table SQL to write data from Kafka into a Hive table
> -----------------------------------------------------------------------------------------------------
>
> Key: FLINK-20947
> URL: https://issues.apache.org/jira/browse/FLINK-20947
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.12.0
> Reporter: Weijia Xu
> Assignee: Shengkai Fang
> Priority: Major
> Fix For: 1.13.0, 1.12.2
>
>
> I use Table SQL to create a stream with a Kafka source and write the data from Kafka into a Hive partitioned table.
> The following SQL creates the Kafka table:
> {code:java}
> tableEnv.executeSql(
>     "CREATE TABLE stream_tmp.kafka_tmp (`messageId` STRING, `message_type` STRING, `payload` STRING, `timestamp` BIGINT, " +
>     "  procTime AS PROCTIME()," +
>     "  eventTime AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp` / 1000, 'yyyy-MM-dd HH:mm:ss'))," +
>     "  WATERMARK FOR eventTime AS eventTime - INTERVAL '15' SECOND )" +
>     "  WITH ('connector' = 'kafka'," +
>     "   'topic' = 'XXX-topic'," +
>     "   'properties.bootstrap.servers' = 'kafka-server:9092'," +
>     "   'properties.group.id' = 'XXX-group_id'," +
>     "   'scan.startup.mode' = 'latest-offset'," +
>     "   'format' = 'json'," +
>     "   'json.fail-on-missing-field' = 'false'," +
>     "   'json.ignore-parse-errors' = 'true' )");
> {code}
>   
> And the following SQL creates the Hive table:
> {code:java}
> tableEnv.executeSql(
>     "CREATE TABLE hive_tmp.kafka_hive_tmp (`message_id` STRING, `message_type` STRING, `payload` STRING, `event_ts` BIGINT ) " +
>     " PARTITIONED BY (ts_date STRING, ts_hour STRING, ts_minute STRING)" +
>     " STORED AS PARQUET TBLPROPERTIES (" +
>     "  'sink.partition-commit.trigger' = 'partition-time'," +
>     "  'sink.partition-commit.delay' = '1 min'," +
>     "  'sink.partition-commit.policy.kind' = 'metastore,success-file'," +
>     "  'sink.partition-commit.success-file.name' = '_SUCCESS'," +
>     "  'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:$ts_minute:00')");
> {code}
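>  
> The job connects the two tables with an INSERT along the following lines (the exact statement and the DATE_FORMAT-based derivation of the partition columns shown here are illustrative):
> {code:java}
> // Illustrative wiring between the two tables above; deriving the partition
> // values from eventTime is an assumption, not part of this report.
> tableEnv.executeSql(
>     "INSERT INTO hive_tmp.kafka_hive_tmp" +
>     " SELECT `messageId`, `message_type`, `payload`, `timestamp`," +
>     "  DATE_FORMAT(eventTime, 'yyyy-MM-dd')," +
>     "  DATE_FORMAT(eventTime, 'HH')," +
>     "  DATE_FORMAT(eventTime, 'mm')" +
>     " FROM stream_tmp.kafka_tmp");
> {code}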
>  
> The Kafka topic used above has multiple partitions; some of the partitions receive data while others receive none.
> I noticed that the config "_table.exec.source.idle-timeout_" is meant to handle "idle" source partitions, but even after setting this config, the Hive partition commit is still not triggered (that is, the "_SUCCESS" file is not created for the partition).
> After analyzing this issue, I found that the root cause is that the watermark for the "idle" partitions does not advance, which prevents the Hive partitions from being committed.
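>  
> For comparison, at the DataStream level this idle handling corresponds to WatermarkStrategy#withIdleness; a minimal sketch (the element type and durations are illustrative only):
> {code:java}
> import java.time.Duration;
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
>
> // Illustrative only: partitions that emit no data for 30 s are marked idle
> // and stop holding back the overall watermark.
> WatermarkStrategy<String> strategy =
>     WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(15))
>         .withIdleness(Duration.ofSeconds(30));
> {code}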
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-20947) Unable to trigger Hive partition commit when using Table SQL to write data from Kafka into a Hive table

2021-01-12 Thread Jark Wu (Jira)


[ https://issues.apache.org/jira/browse/FLINK-20947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17263907#comment-17263907 ]

Jark Wu edited comment on FLINK-20947 at 1/13/21, 6:20 AM:
-----------------------------------------------------------

Good catch [~weijiaxu]! I think you are right. 

[~fsk119] will help to fix this. We should think about how to add a good test for this; it was not covered by tests when watermark push-down support was added.


was (Author: jark):
Good catch [~weijiaxu]! I think you are right. 





--
This message was sent by Atlassian Jira
(v8.3.4#803005)