[jira] [Commented] (FLINK-30945) FTS does not support multiple writers into the same table and topic

2023-02-17 Thread Xinbin Huang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17690568#comment-17690568
 ] 

Xinbin Huang commented on FLINK-30945:
--

[~lzljs3620320] Could you assign this to me?

> FTS does not support multiple writers into the same table and topic
> ---
>
> Key: FLINK-30945
> URL: https://issues.apache.org/jira/browse/FLINK-30945
> Project: Flink
> Issue Type: Bug
> Components: Table Store
> Reporter: Vicky Papavasileiou
> Priority: Major
>
> When two different streaming jobs INSERT INTO the same table and Kafka 
> topic, the second job never makes progress: its transactions are 
> repeatedly aborted because its producer gets fenced.
> FTS should set a distinct transactionalIdPrefix per job so that 
> transactions of different jobs do not clash.
> {code:java}
> 2023-02-06 17:13:36,088 WARN org.apache.flink.runtime.taskmanager.Task [] - Writer -> Global Committer -> Sink: end (1/1)#0 (8cf4197af9716623c3c19e7fa3d7c071_b5c8d46f3e7b141acf271f12622e752b_0_0) switched from RUNNING to FAILED with failure cause: org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure
>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:323)
>     at org.apache.flink.table.store.connector.sink.StoreWriteOperator.notifyCheckpointComplete(StoreWriteOperator.java:175)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:104)
>     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.notifyCheckpointComplete(RegularOperatorChain.java:145)
>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpoint(SubtaskCheckpointCoordinatorImpl.java:479)
>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:413)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:1412)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$15(StreamTask.java:1353)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$18(StreamTask.java:1392)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:383)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:345)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>     at java.lang.Thread.run(Thread.java:750)
> Caused by: org.apache.flink.table.store.shaded.org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one.
> {code}
> Sample queries:
>  
>  
> {code:sql}
> CREATE CATALOG table_store_catalog WITH (
>     'type' = 'table-store',
>     'warehouse' = 's3://my-bucket/table-store'
> );
> USE CATALOG table_store_catalog;
> SET 'execution.checkpointing.interval' = '10 s';
> CREATE TABLE word_count_kafka (
>     word STRING PRIMARY KEY NOT ENFORCED,
>     cnt BIGINT
> ) WITH (
>     'log.system' = 'kafka',
>     'kafka.bootstrap.servers' = 'broker:9092',
>     'kafka.topic' = 'word_count_log'
> );
> CREATE TEMPORARY TABLE word_table (
>     word STRING
> ) WITH (
>     'connector' = 'datagen',
>     'fields.word.length' = '1'
> );
> {code}
>  
> And the two INSERT jobs:
> {code:sql}
> INSERT INTO word_count_kafka SELECT word, COUNT(*) FROM word_table GROUP BY word;
> {code}
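
The description asks for a transactionalIdPrefix that keeps transactions of different jobs from clashing. As a minimal sketch of that idea, the helper below derives a prefix from the table, the topic, and a per-job name. The class TransactionalIdPrefixes, the method forJob, the "table-store-" naming scheme, and the job names are all hypothetical and are not taken from the FTS code base. In Flink's Kafka connector such a string could be handed to KafkaSinkBuilder#setTransactionalIdPrefix; how FTS would pass it to its own producer is not shown in this thread.

```java
import java.util.Objects;

/** Hypothetical helper for illustration only; not part of Flink Table Store. */
final class TransactionalIdPrefixes {

    private TransactionalIdPrefixes() {}

    /**
     * Builds a prefix that differs between jobs writing to the same table and topic.
     * Assumption: jobName is unique per job and stays the same when that job restarts,
     * so a restarted job reuses its own transactional IDs instead of someone else's.
     */
    static String forJob(String table, String topic, String jobName) {
        Objects.requireNonNull(table, "table");
        Objects.requireNonNull(topic, "topic");
        Objects.requireNonNull(jobName, "jobName");
        return "table-store-" + table + "-" + topic + "-" + jobName;
    }

    public static void main(String[] args) {
        // Two jobs inserting into the same table and topic, as in the report.
        String first = forJob("word_count_kafka", "word_count_log", "insert-job-1");
        String second = forJob("word_count_kafka", "word_count_log", "insert-job-2");
        System.out.println(first);
        System.out.println(second);
        // The prefixes differ, so the producers of the two jobs would not share a transactionalId.
        System.out.println(first.equals(second));
    }
}
```

With a shared prefix, the producer started by the second job registers the same transactionalId as the first and one of them is fenced, which matches the ProducerFencedException in the log above. A per-job component in the prefix is one way to avoid that, assuming the name is stable across restarts of the same job.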



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30945) FTS does not support multiple writers into the same table and topic

2023-02-09 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17686919#comment-17686919
 ] 

Jingsong Lee commented on FLINK-30945:
--

[~vicky_papavas] Thanks for reporting! I think this needs to be improved!



