[jira] [Commented] (FLINK-20947) Unable to trigger Hive partition commit when use table sql to write data from Kafka into a Hive table

2021-01-22 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17270227#comment-17270227
 ] 

Jark Wu commented on FLINK-20947:
-

Hi [~weijiaxu], we have fixed this problem in the master branch. Could you 
check whether it resolves your problem when you have time?

> Unable to trigger Hive partition commit when use table sql to write data from 
> Kafka into a Hive table
> -
>
> Key: FLINK-20947
> URL: https://issues.apache.org/jira/browse/FLINK-20947
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.0
>Reporter: Weijia Xu
>Assignee: Shengkai Fang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.13.0, 1.12.2
>
>
> I use Table SQL to create a stream with a Kafka source and write the data 
> from Kafka into a Hive partitioned table.
> The following SQL creates the Kafka table:
> {code:java}
> tableEnv.executeSql(
>     "CREATE TABLE stream_tmp.kafka_tmp (" +
>     "  `messageId` STRING, `message_type` STRING, `payload` STRING, `timestamp` BIGINT," +
>     "  procTime AS PROCTIME()," +
>     "  eventTime AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp` / 1000, 'yyyy-MM-dd HH:mm:ss'))," +
>     "  WATERMARK FOR eventTime AS eventTime - INTERVAL '15' SECOND )" +
>     "  WITH ('connector' = 'kafka'," +
>     "    'topic' = 'XXX-topic'," +
>     "    'properties.bootstrap.servers' = 'kafka-server:9092'," +
>     "    'properties.group.id' = 'XXX-group_id'," +
>     "    'scan.startup.mode' = 'latest-offset'," +
>     "    'format' = 'json'," +
>     "    'json.fail-on-missing-field' = 'false'," +
>     "    'json.ignore-parse-errors' = 'true' )");
> {code}
>   
> And the following SQL creates the Hive table:
> {code:java}
> tableEnv.executeSql(
>     "CREATE TABLE hive_tmp.kafka_hive_tmp (" +
>     "  `message_id` STRING, `message_type` STRING, `payload` STRING, `event_ts` BIGINT )" +
>     "  PARTITIONED BY (ts_date STRING, ts_hour STRING, ts_minute STRING)" +
>     "  STORED AS PARQUET TBLPROPERTIES (" +
>     "    'sink.partition-commit.trigger' = 'partition-time'," +
>     "    'sink.partition-commit.delay' = '1 min'," +
>     "    'sink.partition-commit.policy.kind' = 'metastore,success-file'," +
>     "    'sink.partition-commit.success-file.name' = '_SUCCESS'," +
>     "    'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:$ts_minute:00')");
> {code}
>  
> The Kafka topic used above has multiple partitions; some of them receive 
> data while others do not.
> I noticed that the config "_table.exec.source.idle-timeout_" is meant to 
> handle "idle" source partitions, but even with this config set, the Hive 
> partition commit is still not triggered (that is, the "_SUCCESS" file is 
> not created for the partition).
> After analyzing this issue, I found the root cause: the watermark for the 
> "idle" partition does not advance, which prevents the Hive partition from 
> being committed.
>  
>  
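To see why one silent partition blocks the commit: a multi-partition source typically takes the minimum of its partitions' watermarks as the overall watermark. A minimal sketch of that behaviour, with and without idleness handling (illustrative code, not Flink internals):

```java
import java.util.Arrays;

// Toy model of why the watermark stalls. The source's overall watermark is
// the minimum over its partitions' watermarks; Long.MIN_VALUE stands for a
// partition that has never produced data. Illustrative, not Flink code.
public class WatermarkCombiner {

    // Without idleness handling: one silent partition pins the overall
    // watermark at Long.MIN_VALUE forever.
    static long combine(long[] partitionWatermarks) {
        return Arrays.stream(partitionWatermarks).min().orElse(Long.MIN_VALUE);
    }

    // With idleness handling (what table.exec.source.idle-timeout should
    // enable): idle partitions are excluded, so the overall watermark can
    // advance on the active partitions alone.
    static long combineWithIdleness(long[] partitionWatermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < partitionWatermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, partitionWatermarks[i]);
            }
        }
        return anyActive ? min : Long.MIN_VALUE;
    }
}
```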



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-20947) Unable to trigger Hive partition commit when use table sql to write data from Kafka into a Hive table

2021-01-22 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17270226#comment-17270226
 ] 

Jark Wu commented on FLINK-20947:
-

Fixed in 
 - master: 90e680c9c579b2ffa1ca93ad7dfbaa5502dd8701



[jira] [Commented] (FLINK-20947) Unable to trigger Hive partition commit when use table sql to write data from Kafka into a Hive table

2021-01-16 Thread Shengkai Fang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17266708#comment-17266708
 ] 

Shengkai Fang commented on FLINK-20947:
---

Sorry for the late response. I think the test works like the one for the rule 
`PushPartitionIntoTableSourceScanRule`. I will fix it soon.



[jira] [Commented] (FLINK-20947) Unable to trigger Hive partition commit when use table sql to write data from Kafka into a Hive table

2021-01-13 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17264104#comment-17264104
 ] 

Jark Wu commented on FLINK-20947:
-

1. I find that we don't have a **plan test** for watermark pushdown with idle 
timeout. We need to add one to {{PushWatermarkIntoTableSourceScanRuleTest}}.
2. We can add an IT case in {{KafkaTableITCase}}:
   - prepare 2 partitions, each with several messages. The last event time in 
the first partition is {{2021-01-13 20:00:00}}, and the last event time in the 
second partition is {{2021-01-13 19:30:00}}.
   - run a window query on the source with a 1-hour tumbling window.
   - the query should produce output for the window [19:00, 20:00), which 
means the watermark has reached 20:00.
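The expected firing condition in that IT case can be sketched as follows (illustrative code, not the actual {{KafkaTableITCase}}; it only models when a 1-hour tumbling event-time window fires relative to the watermark):

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Sketch of the IT-case expectation: once the slower partition is marked
// idle after the timeout, the watermark follows the faster partition and
// reaches 20:00, so the [19:00, 20:00) window fires. Illustrative only.
public class WindowFiringSketch {

    static long millis(int hour, int minute) {
        return LocalDateTime.of(2021, 1, 13, hour, minute)
                .toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // A tumbling window [start, start + size) fires when watermark >= start + size.
    static boolean fires(long windowStart, long windowSize, long watermark) {
        return watermark >= windowStart + windowSize;
    }

    public static void main(String[] args) {
        long oneHour = 60 * 60 * 1000L;
        long windowStart = millis(19, 0);

        // Idle-aware watermark reaches 20:00 (first partition): window fires.
        System.out.println(fires(windowStart, oneHour, millis(20, 0))); // true

        // Watermark held back at 19:30 by the slower partition: no output.
        System.out.println(fires(windowStart, oneHour, millis(19, 30))); // false
    }
}
```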



[jira] [Commented] (FLINK-20947) Unable to trigger Hive partition commit when use table sql to write data from Kafka into a Hive table

2021-01-13 Thread Weijia Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17264020#comment-17264020
 ] 

Weijia Xu commented on FLINK-20947:
---

My pleasure, [~jark].

I think fixing this issue is easy; what's more important, as you said, is 
setting up tests for these scenarios.

Could I be involved in the discussion on how to set up the tests, so I can 
learn more about this?

 



[jira] [Commented] (FLINK-20947) Unable to trigger Hive partition commit when use table sql to write data from Kafka into a Hive table

2021-01-12 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17263907#comment-17263907
 ] 

Jark Wu commented on FLINK-20947:
-

Good catch [~weijiaxu]! I think you are right. 




[jira] [Commented] (FLINK-20947) Unable to trigger Hive partition commit when use table sql to write data from Kafka into a Hive table

2021-01-12 Thread Weijia Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17263895#comment-17263895
 ] 

Weijia Xu commented on FLINK-20947:
---

I think the root cause is that the watermark for the "idle" partition does not 
advance. This happens because, when "_table.exec.source.idle-timeout_" is set 
to a valid value, the watermark strategy should be enriched with idleness 
detection, but (after checking the source code) it actually is not.

Source code class: _PushWatermarkIntoTableSourceScanRuleBase.java_
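The idleness detection that should be wired in can be sketched as a simple timeout check (illustrative code, not the actual logic in _PushWatermarkIntoTableSourceScanRuleBase_):

```java
// Sketch of idleness detection: a partition counts as idle once no event
// has arrived for at least the configured timeout. Names are illustrative,
// not the actual Flink planner-rule code.
public class IdleTimeoutSketch {

    static boolean isIdle(long nowMs, long lastEventMs, long idleTimeoutMs) {
        return nowMs - lastEventMs >= idleTimeoutMs;
    }
}
```

Only when a partition is classified as idle by such a check may the combined watermark ignore it and keep advancing.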

 



[jira] [Commented] (FLINK-20947) Unable to trigger Hive partition commit when use table sql to write data from Kafka into a Hive table

2021-01-12 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17263885#comment-17263885
 ] 

Jark Wu commented on FLINK-20947:
-

cc [~fsk119], could you help check whether the idle timeout works for the 
pushed-down watermark?

[~weijiaxu], could you share the EXPLAIN result of your query?
