[jira] [Updated] (FLINK-20947) Unable to trigger Hive partition commit when use table sql to write data from Kafka into a Hive table

2021-01-18 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-20947:
---
Labels: pull-request-available  (was: )

> Unable to trigger Hive partition commit when use table sql to write data from 
> Kafka into a Hive table
> -
>
> Key: FLINK-20947
> URL: https://issues.apache.org/jira/browse/FLINK-20947
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.0
>Reporter: Weijia Xu
>Assignee: Shengkai Fang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.13.0, 1.12.2
>
>
> I use Table SQL to create a stream with a Kafka source and write data from Kafka 
> into a Hive partitioned table.
> The following SQL creates the Kafka table:
> {code:java}
> tableEnv.executeSql(
>     "CREATE TABLE stream_tmp.kafka_tmp (" +
>     "  `messageId` STRING," +
>     "  `message_type` STRING," +
>     "  `payload` STRING," +
>     "  `timestamp` BIGINT," +
>     "  procTime AS PROCTIME()," +
>     "  eventTime AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp` / 1000, 'yyyy-MM-dd HH:mm:ss'))," +
>     "  WATERMARK FOR eventTime AS eventTime - INTERVAL '15' SECOND" +
>     ") WITH (" +
>     "  'connector' = 'kafka'," +
>     "  'topic' = 'XXX-topic'," +
>     "  'properties.bootstrap.servers' = 'kafka-server:9092'," +
>     "  'properties.group.id' = 'XXX-group_id'," +
>     "  'scan.startup.mode' = 'latest-offset'," +
>     "  'format' = 'json'," +
>     "  'json.fail-on-missing-field' = 'false'," +
>     "  'json.ignore-parse-errors' = 'true'" +
>     ")"
> );{code}
>   
> And the following SQL creates the Hive table:
> {code:java}
> tableEnv.executeSql(
>     "CREATE TABLE hive_tmp.kafka_hive_tmp (" +
>     "  `message_id` STRING," +
>     "  `message_type` STRING," +
>     "  `payload` STRING," +
>     "  `event_ts` BIGINT" +
>     ") PARTITIONED BY (ts_date STRING, ts_hour STRING, ts_minute STRING)" +
>     " STORED AS PARQUET TBLPROPERTIES (" +
>     "  'sink.partition-commit.trigger' = 'partition-time'," +
>     "  'sink.partition-commit.delay' = '1 min'," +
>     "  'sink.partition-commit.policy.kind' = 'metastore,success-file'," +
>     "  'sink.partition-commit.success-file.name' = '_SUCCESS'," +
>     "  'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:$ts_minute:00'" +
>     ")"
> );
> {code}
>  
> The Kafka topic used above has multiple partitions: some of them receive data, 
> while others receive none.
> I noticed that the config "_table.exec.source.idle-timeout_" is meant to handle 
> "idle" source partitions, but even with this config set, the Hive partition 
> commit is still not triggered (i.e., the "_SUCCESS" file is not created for the 
> partition).
> After analyzing this issue, I found that the root cause is that the watermark 
> for the "idle" partition does not advance, which prevents the Hive partition 
> from being committed.
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-20947) Unable to trigger Hive partition commit when use table sql to write data from Kafka into a Hive table

2021-01-12 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-20947:

Fix Version/s: 1.12.1

> Unable to trigger Hive partition commit when use table sql to write data from 
> Kafka into a Hive table
> -
>
> Key: FLINK-20947
> URL: https://issues.apache.org/jira/browse/FLINK-20947
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.0
>Reporter: Weijia Xu
>Assignee: Shengkai Fang
>Priority: Major
> Fix For: 1.13.0, 1.12.1
>
>





[jira] [Updated] (FLINK-20947) Unable to trigger Hive partition commit when use table sql to write data from Kafka into a Hive table

2021-01-12 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-20947:

Fix Version/s: (was: 1.12.1)
   1.12.2

> Unable to trigger Hive partition commit when use table sql to write data from 
> Kafka into a Hive table
> -
>
> Key: FLINK-20947
> URL: https://issues.apache.org/jira/browse/FLINK-20947
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.0
>Reporter: Weijia Xu
>Assignee: Shengkai Fang
>Priority: Major
> Fix For: 1.13.0, 1.12.2
>
>





[jira] [Updated] (FLINK-20947) Unable to trigger Hive partition commit when use table sql to write data from Kafka into a Hive table

2021-01-12 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-20947:

Fix Version/s: 1.13.0

> Unable to trigger Hive partition commit when use table sql to write data from 
> Kafka into a Hive table
> -
>
> Key: FLINK-20947
> URL: https://issues.apache.org/jira/browse/FLINK-20947
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.0
>Reporter: Weijia Xu
>Priority: Major
> Fix For: 1.13.0
>
>





[jira] [Updated] (FLINK-20947) Unable to trigger Hive partition commit when use table sql to write data from Kafka into a Hive table

2021-01-12 Thread Weijia Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijia Xu updated FLINK-20947:
--
Description: 
I use Table SQL to create a stream with a Kafka source and write data from Kafka 
into a Hive partitioned table.

The following SQL creates the Kafka table:
{code:java}
tableEnv.executeSql(
    "CREATE TABLE stream_tmp.kafka_tmp (" +
    "  `messageId` STRING," +
    "  `message_type` STRING," +
    "  `payload` STRING," +
    "  `timestamp` BIGINT," +
    "  procTime AS PROCTIME()," +
    "  eventTime AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp` / 1000, 'yyyy-MM-dd HH:mm:ss'))," +
    "  WATERMARK FOR eventTime AS eventTime - INTERVAL '15' SECOND" +
    ") WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'XXX-topic'," +
    "  'properties.bootstrap.servers' = 'kafka-server:9092'," +
    "  'properties.group.id' = 'XXX-group_id'," +
    "  'scan.startup.mode' = 'latest-offset'," +
    "  'format' = 'json'," +
    "  'json.fail-on-missing-field' = 'false'," +
    "  'json.ignore-parse-errors' = 'true'" +
    ")"
);{code}
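To make the Kafka DDL's time attributes concrete, here is a minimal plain-Java sketch (no Flink dependency) of what the `eventTime` computed column and the `WATERMARK` clause yield for a single record. It assumes that `timestamp` holds epoch milliseconds and that the session time zone is UTC; the class and method names are hypothetical and are not Flink APIs.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical illustration, not Flink code. Assumes UTC as the session time zone.
final class EventTimeSketch {
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Mirrors "WATERMARK FOR eventTime AS eventTime - INTERVAL '15' SECOND".
    static final Duration MAX_OUT_OF_ORDERNESS = Duration.ofSeconds(15);

    // Mirrors TO_TIMESTAMP(FROM_UNIXTIME(`timestamp` / 1000, 'yyyy-MM-dd HH:mm:ss')):
    // milliseconds are truncated to whole seconds, formatted as text, then parsed back.
    static LocalDateTime eventTime(long timestampMillis) {
        String text = LocalDateTime
                .ofEpochSecond(timestampMillis / 1000, 0, ZoneOffset.UTC)
                .format(FORMAT);
        return LocalDateTime.parse(text, FORMAT);
    }

    // The watermark candidate produced by one record: eventTime minus 15 seconds.
    static LocalDateTime watermarkFor(long timestampMillis) {
        return eventTime(timestampMillis).minus(MAX_OUT_OF_ORDERNESS);
    }

    public static void main(String[] args) {
        long timestampMillis = 1_610_445_625_123L;
        System.out.println(eventTime(timestampMillis));    // 2021-01-12T10:00:25
        System.out.println(watermarkFor(timestampMillis)); // 2021-01-12T10:00:10
    }
}
```

Note that the sub-second part of `timestamp` is lost in this conversion, so every record within the same second maps to the same event time.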
  

And the following SQL creates the Hive table:
{code:java}
tableEnv.executeSql(
    "CREATE TABLE hive_tmp.kafka_hive_tmp (" +
    "  `message_id` STRING," +
    "  `message_type` STRING," +
    "  `payload` STRING," +
    "  `event_ts` BIGINT" +
    ") PARTITIONED BY (ts_date STRING, ts_hour STRING, ts_minute STRING)" +
    " STORED AS PARQUET TBLPROPERTIES (" +
    "  'sink.partition-commit.trigger' = 'partition-time'," +
    "  'sink.partition-commit.delay' = '1 min'," +
    "  'sink.partition-commit.policy.kind' = 'metastore,success-file'," +
    "  'sink.partition-commit.success-file.name' = '_SUCCESS'," +
    "  'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:$ts_minute:00'" +
    ")"
);
{code}
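The `partition-time` trigger compares the watermark against a time extracted from each partition's values. The plain-Java sketch below approximates that check for this table: it fills the `'$ts_date $ts_hour:$ts_minute:00'` pattern and treats the partition as committable once the watermark exceeds partition time plus the 1-minute `sink.partition-commit.delay`. It assumes zero-padded hour and minute values (e.g. `09`), epoch-millisecond watermarks interpreted in UTC, and a strict `>` comparison; all names are hypothetical, not Flink's internal classes.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Map;

// Hypothetical approximation of a partition-time commit check; not Flink code.
final class PartitionCommitSketch {
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // 'partition.time-extractor.timestamp-pattern' from the table properties.
    static final String PATTERN = "$ts_date $ts_hour:$ts_minute:00";

    // 'sink.partition-commit.delay' = '1 min'
    static final Duration COMMIT_DELAY = Duration.ofMinutes(1);

    // Substitutes the partition values into the pattern (literal replacement) and parses it.
    static LocalDateTime partitionTime(Map<String, String> partition) {
        String text = PATTERN
                .replace("$ts_date", partition.get("ts_date"))
                .replace("$ts_hour", partition.get("ts_hour"))
                .replace("$ts_minute", partition.get("ts_minute"));
        return LocalDateTime.parse(text, FORMAT);
    }

    // Committable only after the watermark passes partition time + commit delay.
    // A watermark that never advances (e.g. stuck at Long.MIN_VALUE) commits nothing.
    static boolean committable(Map<String, String> partition, long watermarkMillis) {
        long partitionMillis = partitionTime(partition)
                .toInstant(ZoneOffset.UTC)
                .toEpochMilli();
        return watermarkMillis > partitionMillis + COMMIT_DELAY.toMillis();
    }

    public static void main(String[] args) {
        Map<String, String> partition =
                Map.of("ts_date", "2021-01-12", "ts_hour", "10", "ts_minute", "00");
        System.out.println(partitionTime(partition));                   // 2021-01-12T10:00
        System.out.println(committable(partition, 1_610_445_660_000L)); // false
        System.out.println(committable(partition, 1_610_445_660_001L)); // true
        System.out.println(committable(partition, Long.MIN_VALUE));     // false
    }
}
```

The last call shows why a stalled watermark matters here: no matter how long the job runs, the check never passes, so neither the metastore commit nor the `_SUCCESS` file is produced.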
 

The Kafka topic used above has multiple partitions: some of them receive data, 
while others receive none.

I noticed that the config "_table.exec.source.idle-timeout_" is meant to handle 
"idle" source partitions, but even with this config set, the Hive partition commit 
is still not triggered (i.e., the "_SUCCESS" file is not created for the partition).

After analyzing this issue, I found that the root cause is that the watermark for 
the "idle" partition does not advance, which prevents the Hive partition from being 
committed.
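The root cause described above follows from how a source combines per-partition watermarks: roughly, it forwards the minimum across its partitions, and idleness detection is what lets a silent partition drop out of that minimum. The plain-Java sketch below models this with hypothetical names (it is a simplified model, not Flink's implementation); its timeout parameter loosely plays the role of `table.exec.source.idle-timeout`, with `0` meaning disabled in this sketch. The behavior in this report matches the disabled case: a partition that never receives data pins the combined watermark, so the partition-time trigger never fires.

```java
import java.util.List;

// Simplified, hypothetical model of per-partition watermark combination; not Flink code.
final class IdlePartitionSketch {
    // Sentinel for "no watermark yet", as for a partition that never received data.
    static final long NO_WATERMARK = Long.MIN_VALUE;

    static final class Partition {
        final long watermarkMillis;    // highest watermark seen, or NO_WATERMARK
        final long lastRecordAtMillis; // processing time of the last record (or job start)

        Partition(long watermarkMillis, long lastRecordAtMillis) {
            this.watermarkMillis = watermarkMillis;
            this.lastRecordAtMillis = lastRecordAtMillis;
        }
    }

    // Minimum watermark over non-idle partitions. With idleTimeoutMillis <= 0, idleness
    // detection is disabled and every partition takes part in the minimum.
    // If all partitions are idle, this sketch simply reports NO_WATERMARK.
    static long combinedWatermark(List<Partition> partitions, long nowMillis, long idleTimeoutMillis) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (Partition p : partitions) {
            boolean idle = idleTimeoutMillis > 0
                    && nowMillis - p.lastRecordAtMillis >= idleTimeoutMillis;
            if (!idle) {
                anyActive = true;
                min = Math.min(min, p.watermarkMillis);
            }
        }
        return anyActive ? min : NO_WATERMARK;
    }

    public static void main(String[] args) {
        List<Partition> partitions = List.of(
                new Partition(1_610_445_700_000L, 119_000L), // busy partition
                new Partition(NO_WATERMARK, 0L));            // never received data
        long now = 120_000L;
        System.out.println(combinedWatermark(partitions, now, 0L));      // -9223372036854775808
        System.out.println(combinedWatermark(partitions, now, 30_000L)); // 1610445700000
    }
}
```

In the second call the silent partition has been quiet for longer than the timeout, so it is excluded and the combined watermark can advance; that is the precondition a partition-time commit trigger needs.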

 

 


> Unable to trigger Hive p