[jira] [Updated] (FLINK-35641) ParquetSchemaConverter should correctly handle field optionality

2024-06-18 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-35641:

Labels: patch-available  (was: )

> ParquetSchemaConverter should correctly handle field optionality
> 
>
> Key: FLINK-35641
> URL: https://issues.apache.org/jira/browse/FLINK-35641
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: Alex Sorokoumov
>Assignee: Alex Sorokoumov
>Priority: Major
>  Labels: patch-available
>
> At the moment, 
> [ParquetSchemaConverter|https://github.com/apache/flink/blob/99d6fd3c68f46daf0397a35566414e1d19774c3d/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java#L64]
>  marks all fields as optional. This is not correct in general, and it is 
> especially problematic for maps. For example, 
> [parquet-tools|https://pypi.org/project/parquet-tools/] breaks on the Parquet 
> file produced by 
> [ParquetRowDataWriterTest#complexTypeTest|https://github.com/apache/flink/blob/99d6fd3c68f46daf0397a35566414e1d19774c3d/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriterTest.java#L140-L151]:
> {noformat}
> parquet-tools inspect 
> /var/folders/sc/k3hr87fj4x169rdq9n107whwgp/T/junit14646865447948471989/3b328592-7315-48c6-8fa9-38da4048fb4e
> Traceback (most recent call last):
>   File "/Users/asorokoumov/.pyenv/versions/3.12.3/bin/parquet-tools", line 8, 
> in <module>
> sys.exit(main())
>  ^^
>   File 
> "/Users/asorokoumov/.pyenv/versions/3.12.3/lib/python3.12/site-packages/parquet_tools/cli.py",
>  line 26, in main
> args.handler(args)
>   File 
> "/Users/asorokoumov/.pyenv/versions/3.12.3/lib/python3.12/site-packages/parquet_tools/commands/inspect.py",
>  line 55, in _cli
> _execute_simple(
>   File 
> "/Users/asorokoumov/.pyenv/versions/3.12.3/lib/python3.12/site-packages/parquet_tools/commands/inspect.py",
>  line 63, in _execute_simple
> pq_file: pq.ParquetFile = pq.ParquetFile(filename)
>   
>   File 
> "/Users/asorokoumov/.pyenv/versions/3.12.3/lib/python3.12/site-packages/pyarrow/parquet/core.py",
>  line 317, in __init__
> self.reader.open(
>   File "pyarrow/_parquet.pyx", line 1492, in 
> pyarrow._parquet.ParquetReader.open
>   File "pyarrow/error.pxi", line 91, in pyarrow.lib.check_status
> pyarrow.lib.ArrowInvalid: Map keys must be annotated as required.
> {noformat}
> [The correct thing to 
> do|https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps] 
> is to mark nullable fields as optional and non-nullable fields as required; 
> in particular, map keys must always be required.
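
For illustration, a minimal sketch of what a spec-compliant conversion could 
look like, built with parquet-mr's org.apache.parquet.schema.Types builder. 
This is not the actual patch; the class and method names below are illustrative 
only, and the example hard-codes a MAP<STRING, BIGINT> for brevity:

{code:java}
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;

import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;

public final class MapSchemaSketch {

    // Repetition should follow nullability instead of always being OPTIONAL.
    static Type.Repetition repetitionFor(LogicalType flinkType) {
        return flinkType.isNullable() ? Type.Repetition.OPTIONAL : Type.Repetition.REQUIRED;
    }

    // Spec-compliant MAP<STRING, BIGINT>: the repeated key_value group's "key" field
    // must always be REQUIRED, while the map field itself and the "value" field
    // follow the nullability of the corresponding Flink types.
    static GroupType stringToLongMap(MapType mapType, String fieldName) {
        Type key = Types.required(PrimitiveTypeName.BINARY)
                .as(LogicalTypeAnnotation.stringType())
                .named("key");
        Type value = Types.primitive(PrimitiveTypeName.INT64, repetitionFor(mapType.getValueType()))
                .named("value");
        GroupType keyValue = Types.repeatedGroup().addFields(key, value).named("key_value");
        return Types.buildGroup(repetitionFor(mapType))
                .as(LogicalTypeAnnotation.mapType())
                .addFields(keyValue)
                .named(fieldName);
    }
}
{code}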



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-35641) ParquetSchemaConverter should correctly handle field optionality

2024-06-18 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai reassigned FLINK-35641:
---

Assignee: Alex Sorokoumov

> ParquetSchemaConverter should correctly handle field optionality
> 
>
> Key: FLINK-35641
> URL: https://issues.apache.org/jira/browse/FLINK-35641
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: Alex Sorokoumov
>Assignee: Alex Sorokoumov
>Priority: Major
>
> At the moment, 
> [ParquetSchemaConverter|https://github.com/apache/flink/blob/99d6fd3c68f46daf0397a35566414e1d19774c3d/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java#L64]
>  marks all fields as optional. This is not correct in general and especially 
> when it comes to handling maps. For example, 
> [parquet-tools|https://pypi.org/project/parquet-tools/] breaks on the Parquet 
> file produced by 
> [ParquetRowDataWriterTest#complexTypeTest|https://github.com/apache/flink/blob/99d6fd3c68f46daf0397a35566414e1d19774c3d/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriterTest.java#L140-L151]:
> {noformat}
> parquet-tools inspect 
> /var/folders/sc/k3hr87fj4x169rdq9n107whwgp/T/junit14646865447948471989/3b328592-7315-48c6-8fa9-38da4048fb4e
> Traceback (most recent call last):
>   File "/Users/asorokoumov/.pyenv/versions/3.12.3/bin/parquet-tools", line 8, 
> in <module>
> sys.exit(main())
>  ^^
>   File 
> "/Users/asorokoumov/.pyenv/versions/3.12.3/lib/python3.12/site-packages/parquet_tools/cli.py",
>  line 26, in main
> args.handler(args)
>   File 
> "/Users/asorokoumov/.pyenv/versions/3.12.3/lib/python3.12/site-packages/parquet_tools/commands/inspect.py",
>  line 55, in _cli
> _execute_simple(
>   File 
> "/Users/asorokoumov/.pyenv/versions/3.12.3/lib/python3.12/site-packages/parquet_tools/commands/inspect.py",
>  line 63, in _execute_simple
> pq_file: pq.ParquetFile = pq.ParquetFile(filename)
>   
>   File 
> "/Users/asorokoumov/.pyenv/versions/3.12.3/lib/python3.12/site-packages/pyarrow/parquet/core.py",
>  line 317, in __init__
> self.reader.open(
>   File "pyarrow/_parquet.pyx", line 1492, in 
> pyarrow._parquet.ParquetReader.open
>   File "pyarrow/error.pxi", line 91, in pyarrow.lib.check_status
> pyarrow.lib.ArrowInvalid: Map keys must be annotated as required.
> {noformat}
> [The correct thing to 
> do|https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps] 
> is to mark nullable fields as optional, otherwise required.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-26409) Remove meaningless Kafka connector test case KafkaConsumerTestBase.runBrokerFailureTest

2023-11-24 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-26409:

Fix Version/s: kafka-3.0.3
   (was: kafka-3.0.2)

> Remove meaningless Kafka connector test case 
> KafkaConsumerTestBase.runBrokerFailureTest
> ---
>
> Key: FLINK-26409
> URL: https://issues.apache.org/jira/browse/FLINK-26409
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.0.0
>Reporter: Qingsheng Ren
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Major
> Fix For: kafka-3.1.0, kafka-3.0.3
>
>
> {{KafkaConsumerTestBase#runBrokerFailureTest}} actually validates the 
> functionality of the Kafka broker and consumer (changing the partition leader 
> when the ex-leader is down and letting the consumer switch to the new one 
> without exceptions) rather than KafkaSource / FlinkKafkaConsumer, so we could 
> consider removing this case.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30400) Stop bundling connector-base in externalized connectors

2023-11-24 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30400?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-30400:

Fix Version/s: kafka-3.0.2

> Stop bundling connector-base in externalized connectors
> ---
>
> Key: FLINK-30400
> URL: https://issues.apache.org/jira/browse/FLINK-30400
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Common
>Reporter: Chesnay Schepler
>Assignee: Hang Ruan
>Priority: Major
>  Labels: pull-request-available
> Fix For: elasticsearch-3.1.0, aws-connector-4.2.0, kafka-3.1.0, 
> rabbitmq-3.0.2, kafka-3.0.2
>
>
> Check that none of the externalized connectors bundle connector-base; if any 
> do, remove the bundling and schedule a new minor release.
> Bundling this module is highly problematic w.r.t. binary compatibility, since 
> bundled classes may rely on internal APIs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30400) Stop bundling connector-base in externalized connectors

2023-11-24 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789514#comment-17789514
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-30400:
-

Fixed for apache/flink-connector-kafka:v3.0 as well:
44e49e777f78bd3d13843f0711f617d373e4

> Stop bundling connector-base in externalized connectors
> ---
>
> Key: FLINK-30400
> URL: https://issues.apache.org/jira/browse/FLINK-30400
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Common
>Reporter: Chesnay Schepler
>Assignee: Hang Ruan
>Priority: Major
>  Labels: pull-request-available
> Fix For: elasticsearch-3.1.0, aws-connector-4.2.0, kafka-3.1.0, 
> rabbitmq-3.0.2
>
>
> Check that none of the externalized connectors bundle connector-base; if so 
> remove the bundling and schedule a new minor release.
> Bundling this module is highly problematic w.r.t. binary compatibility, since 
> bundled classes may rely on internal APIs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33545) KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching

2023-11-16 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17786873#comment-17786873
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-33545:
-

https://issues.apache.org/jira/browse/FLINK-33293 related issue

> KafkaSink implementation can cause dataloss during broker issue when not 
> using EXACTLY_ONCE if there's any batching
> ---
>
> Key: FLINK-33545
> URL: https://issues.apache.org/jira/browse/FLINK-33545
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Kevin Tseng
>Assignee: Kevin Tseng
>Priority: Major
>
> In the current implementation of KafkaSource and KafkaSink, some assumptions 
> were made:
>  # KafkaSource completely relies on checkpoints to manage and track its offsets 
> in the *KafkaSourceReader* class
>  # KafkaSink, in the *KafkaWriter* class, only performs a catch-up flush when 
> *DeliveryGuarantee.EXACTLY_ONCE* is specified.
> KafkaSource assumes that checkpoints are properly fenced and that everything it 
> has read up until the checkpoint was initiated will be processed or recorded by 
> downstream operators, including two-phase committers such as *KafkaSink*.
> *KafkaSink* goes by the model of:
>  
> {code:java}
> flush -> prepareCommit -> commit{code}
>  
> In a scenario where:
>  * KafkaSource ingested records #1 to #100
>  * KafkaSink only had a chance to send records #1 to #96
>  * with a batching interval of 5ms
> when the checkpoint is initiated, flush will only confirm the sending of 
> records #1 to #96.
> This allows the checkpoint to proceed since there is no error, and records #97 
> to #100 will be batched after the first flush.
> Now, if the broker goes down or has an issue that prevents the internal 
> KafkaProducer from sending out the batched records, and the producer is stuck 
> in a constant retry cycle (the default value of KafkaProducer retries is 
> Integer.MAX_VALUE), *WriterCallback* error handling will never be triggered 
> until the next checkpoint flush.
> This can be tested by creating a faulty Kafka cluster and running the following 
> code:
> {code:java}
> Properties props = new Properties(); 
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
> props.put(ProducerConfig.CLIENT_ID_CONFIG, "example-producer");
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName()); 
> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.ACKS_CONFIG, "all"); 
> final KafkaProducer<String, String> producer = new KafkaProducer<>(props);
> try {
> for (int i = 0; i < 10; i++) {
> System.out.printf("sending record #%d\n", i);
> String data = UUID.randomUUID().toString();
> final ProducerRecord<String, String> record = new 
> ProducerRecord<>(TOPIC, Integer.toString(i), data);
> producer.send(record, new CB(Integer.toString(i), data));
> Thread.sleep(1); //sleep for 10 seconds
> }
> } catch (Exception e) {
> e.printStackTrace();
> } finally {
> System.out.println("flushing");
> producer.flush();
> System.out.println("closing");
> producer.close();
> }{code}
> Once the callback returns due to a network timeout, it will cause Flink to 
> restart from the previously saved checkpoint (which recorded reading up to 
> record #100), but the KafkaWriter never sent records #97 to #100.
> This will result in data loss of records #97 to #100.
> Because the KafkaWriter only catches errors *after* the callback, if the 
> callback is never invoked (due to a broker issue) right after the first flush 
> has taken place, those records are effectively gone unless someone decides to 
> go back and look for them.
> This behavior should be OK if the user has set {*}DeliveryGuarantee.NONE{*}, 
> but it is not expected for {*}DeliveryGuarantee.AT_LEAST_ONCE{*}.
> The process diverges in the case of {*}EXACTLY_ONCE{*}.
> prepareCommit will produce a list of KafkaCommittable objects corresponding to 
> the transactional KafkaProducers to be committed, and a catch-up flush will 
> take place during the *commit* step. Whether this was intentional or not, 
> because flush is a blocking call, this second flush at the end of the 
> EXACTLY_ONCE path actually ensures that everything fenced in the current 
> checkpoint is sent to Kafka, or fails the checkpoint if not successful.
>  
> Due to the above finding, I'm recommending one of the following fixes:
>  # perform a second flush for AT_LEAST_ONCE
>  # or move the flush to the end of the KafkaSink process.
> I'm leaning towards the 2nd option as it 

[jira] [Commented] (FLINK-33293) Data loss with Kafka Sink

2023-11-16 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17786872#comment-17786872
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-33293:
-

We have a new ticket whose description seems to break my understanding / 
assumption of how flushing has been working in the KafkaWriter: 
https://issues.apache.org/jira/browse/FLINK-33545

If what that ticket describes is indeed true, that would also answer the 
mystery here ...
i.e. in at-least-once mode, flushing the producer doesn't actually flush 
everything and ensure all records are successfully written before the 
checkpoint barrier is processed.

> Data loss with Kafka Sink
> -
>
> Key: FLINK-33293
> URL: https://issues.apache.org/jira/browse/FLINK-33293
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.16.1, 1.16.2, 1.17.1
>Reporter: Jasmin Redzepovic
>Priority: Major
> Attachments: job.log, job_1_16_2_run1.log, job_1_16_2_run2.log, 
> job_1_17_1_run1.log, job_1_17_1_run2.log
>
>
> More info in Slack discussion: 
> [https://www.linen.dev/s/apache-flink/t/15851877/hi-all-it-s-me-again-based-on-https-apache-flink-slack-com-a#e102fa98-dcd7-40e8-a2c4-f7b4a83234e1]
>  
> *TLDR:*
> (in Flink version 1.15 I was unable to reproduce the issue, but in 1.16 and 
> 1.17 I can reproduce it)
> I have created a sink topic with 8 partitions, a replication factor of 3, and 
> a minimum in-sync replicas of 2. The consumer properties are set to their 
> default values.
> For the producer, I changed the delivery.timeout.ms and 
> request.timeout.ms properties, setting them to *5000ms* and *4000ms* 
> respectively. (acks is set to -1 by default, which is equal to _all_, I 
> guess.)
> KafkaSink is configured with an AT_LEAST_ONCE delivery guarantee. The job 
> parallelism is set to 1 and the checkpointing interval is set to 2000ms. I 
> started a Flink Job and monitored its logs. Additionally, I was consuming the 
> __consumer_offsets topic in parallel to track when offsets are committed for 
> my consumer group.
> The problematic part occurs during checkpoint 5. Its duration was 5009ms, 
> which exceeds the delivery timeout for Kafka (5000ms). Although it was marked 
> as completed, I believe that the output buffer of KafkaSink was not fully 
> acknowledged by Kafka. As a result, Flink proceeded to trigger checkpoint 6 
> but immediately encountered a Kafka {_}TimeoutException: Expiring N 
> records{_}.
> I suspect that this exception originated from checkpoint 5 and that 
> checkpoint 5 should not have been considered successful. The job then failed 
> but recovered from checkpoint 5. Some time after checkpoint 7, consumer 
> offsets were committed to Kafka, and this process repeated once more at 
> checkpoint 9.
> Since the offsets of checkpoint 5 were committed to Kafka, but the output 
> buffer was only partially delivered, there has been data loss. I confirmed 
> this when sinking the topic to the database.
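
For reference, a sketch of how the setup described above maps onto the KafkaSink 
builder API. This is an assumption about the reporter's job, not the actual 
code; the bootstrap servers, topic name, and serialization schema below are 
placeholders:

{code:java}
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Flink33293SetupSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);          // job parallelism 1, as in the report
        env.enableCheckpointing(2000);  // 2000ms checkpointing interval, as in the report

        // Producer overrides from the report; acks stays at the default (-1 / all).
        Properties producerProps = new Properties();
        producerProps.setProperty("delivery.timeout.ms", "5000");
        producerProps.setProperty("request.timeout.ms", "4000");

        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("broker:9092")   // placeholder
                .setKafkaProducerConfig(producerProps)
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("sink-topic")  // placeholder: 8 partitions, RF 3, min ISR 2
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                .build();
        // A source and env.execute() would follow in the actual job.
    }
}
{code}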



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-33545) KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching

2023-11-16 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17786863#comment-17786863
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-33545 at 11/16/23 5:12 PM:
---

{code:java}
In a scenario that:
 * KafkaSource ingested records #1 to #100
 * KafkaSink only had chance to send records #1 to #96
 * with a batching interval of 5ms

when checkpoint has been initiated, flush will only confirm the sending of 
record #1 to #96.{code}
This is where I'm a bit lost. A flush() call on a producer blocks until all 
records are acked, and after the blocking flush returns, we check whether any 
of the flushed records resulted in an error, in which case we fail the job and 
do not complete the checkpoint. See that code here:

[https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L201-L208]

So, how could the KafkaWriter only flush records #1 to #96 and already proceed 
to process the checkpoint barrier? This can only ever happen with unaligned 
checkpointing, and even in that case, records #97 to #100 will be part of the 
channel state that will be replayed upon recovery.


was (Author: tzulitai):
{code}

In a scenario that:
 * KafkaSource ingested records #1 to #100
 * KafkaSink only had chance to send records #1 to #96
 * with a batching interval of 5ms

{code}

this is where I'm a bit lost. A flush() call on a producer is blocking until 
all records are acked + after the blocking flush returns, we check if any of 
the records that were flushed resulted in an error in which case we fail the 
job and not complete the checkpoint. See that code here:

[https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L201-L208]

So, how could the KafkaWriter only flush records #1 to #96 and already proceed 
to process the checkpoint barrier?

> KafkaSink implementation can cause dataloss during broker issue when not 
> using EXACTLY_ONCE if there's any batching
> ---
>
> Key: FLINK-33545
> URL: https://issues.apache.org/jira/browse/FLINK-33545
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Kevin Tseng
>Assignee: Kevin Tseng
>Priority: Major
>
> In the current implementation of KafkaSource and KafkaSink there are some 
> assumption that were made:
>  # KafkaSource completely relies on Checkpoint to manage and track its offset 
> in *KafkaSourceReader* class
>  # KafkaSink in *KafkaWriter* class only performs catch-flush when 
> *DeliveryGuarantee.EXACTLY_ONCE* is specified.
> KafkaSource is assuming that checkpoint should be properly fenced and 
> everything it had read up-til checkpoint being initiated will be processed or 
> recorded by operators downstream, including the TwoPhaseCommiter such as 
> *KafkaSink*
> *KafkaSink* goes by the model of:
>  
> {code:java}
> flush -> prepareCommit -> commit{code}
>  
> In a scenario that:
>  * KafkaSource ingested records #1 to #100
>  * KafkaSink only had chance to send records #1 to #96
>  * with a batching interval of 5ms
> when checkpoint has been initiated, flush will only confirm the sending of 
> record #1 to #96.
> This allows checkpoint to proceed as there's no error, and record #97 to 100 
> will be batched after first flush.
> Now, if broker goes down / has issue that caused the internal KafkaProducer 
> to not be able to send out the record after a batch, and is on a constant 
> retry-cycle (default value of KafkaProducer retries is Integer.MAX_VALUE), 
> *WriterCallback* error handling will never be triggered until the next 
> checkpoint flush.
> This can be tested by creating a faulty Kafka cluster and run the following 
> code:
> {code:java}
> Properties props = new Properties(); 
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
> props.put(ProducerConfig.CLIENT_ID_CONFIG, "example-producer");
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName()); 
> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.ACKS_CONFIG, "all"); 
> final KafkaProducer producer = new KafkaProducer<>(props);
> try {
> for (int i = 0; i < 10; i++) {
> System.out.printf("sending record #%d\n", i);
> String data = UUID.randomUUID().toString();
> final ProducerRecord record = new 
> ProducerRecord<>(TOPIC, 

[jira] [Commented] (FLINK-33545) KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching

2023-11-16 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17786863#comment-17786863
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-33545:
-

{code}

In a scenario that:
 * KafkaSource ingested records #1 to #100
 * KafkaSink only had chance to send records #1 to #96
 * with a batching interval of 5ms

{code}

This is where I'm a bit lost. A flush() call on a producer blocks until all 
records are acked, and after the blocking flush returns, we check whether any 
of the flushed records resulted in an error, in which case we fail the job and 
do not complete the checkpoint. See that code here:

[https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L201-L208]

So, how could the KafkaWriter only flush records #1 to #96 and already proceed 
to process the checkpoint barrier?
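
To spell out the semantics I'm describing, here is a simplified sketch of the 
flush-then-check contract at the pre-commit barrier. This is not the actual 
KafkaWriter code; the class, field, and method names are illustrative:

{code:java}
import java.io.IOException;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;

// Simplified sketch of the expected at-least-once flush contract (illustrative only).
final class FlushOnCheckpointSketch {

    private volatile Exception asyncSendError; // set by the send() callback on failure

    // Callback to pass to producer.send(record, callback) for every outgoing record.
    Callback trackingCallback() {
        return (metadata, exception) -> {
            if (exception != null) {
                asyncSendError = exception;
            }
        };
    }

    // Called when the checkpoint barrier arrives, before the checkpoint may complete.
    void prepareCheckpoint(Producer<byte[], byte[]> producer) throws IOException {
        // flush() blocks until every buffered record is either acked or has failed ...
        producer.flush();
        // ... so any send failure must be visible here and must fail the checkpoint
        // instead of letting the barrier proceed.
        if (asyncSendError != null) {
            throw new IOException("Failing checkpoint: a record could not be sent", asyncSendError);
        }
    }
}
{code}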

> KafkaSink implementation can cause dataloss during broker issue when not 
> using EXACTLY_ONCE if there's any batching
> ---
>
> Key: FLINK-33545
> URL: https://issues.apache.org/jira/browse/FLINK-33545
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Kevin Tseng
>Assignee: Kevin Tseng
>Priority: Major
>
> In the current implementation of KafkaSource and KafkaSink there are some 
> assumption that were made:
>  # KafkaSource completely relies on Checkpoint to manage and track its offset 
> in *KafkaSourceReader* class
>  # KafkaSink in *KafkaWriter* class only performs catch-flush when 
> *DeliveryGuarantee.EXACTLY_ONCE* is specified.
> KafkaSource is assuming that checkpoint should be properly fenced and 
> everything it had read up-til checkpoint being initiated will be processed or 
> recorded by operators downstream, including the TwoPhaseCommiter such as 
> *KafkaSink*
> *KafkaSink* goes by the model of:
>  
> {code:java}
> flush -> prepareCommit -> commit{code}
>  
> In a scenario that:
>  * KafkaSource ingested records #1 to #100
>  * KafkaSink only had chance to send records #1 to #96
>  * with a batching interval of 5ms
> when checkpoint has been initiated, flush will only confirm the sending of 
> record #1 to #96.
> This allows checkpoint to proceed as there's no error, and record #97 to 100 
> will be batched after first flush.
> Now, if broker goes down / has issue that caused the internal KafkaProducer 
> to not be able to send out the record after a batch, and is on a constant 
> retry-cycle (default value of KafkaProducer retries is Integer.MAX_VALUE), 
> *WriterCallback* error handling will never be triggered until the next 
> checkpoint flush.
> This can be tested by creating a faulty Kafka cluster and run the following 
> code:
> {code:java}
> Properties props = new Properties(); 
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
> props.put(ProducerConfig.CLIENT_ID_CONFIG, "example-producer");
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName()); 
> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.ACKS_CONFIG, "all"); 
> final KafkaProducer producer = new KafkaProducer<>(props);
> try {
> for (int i = 0; i < 10; i++) {
> System.out.printf("sending record #%d\n", i);
> String data = UUID.randomUUID().toString();
> final ProducerRecord record = new 
> ProducerRecord<>(TOPIC, Integer.toString(i), data);
> producer.send(record, new CB(Integer.toString(i), data));
> Thread.sleep(1); //sleep for 10 seconds
> }
> } catch (Exception e) {
> e.printStackTrace();
> } finally {
> System.out.println("flushing");
> producer.flush();
> System.out.println("closing");
> producer.close();
> }{code}
> Once callback returns due to network timeout, it will cause Flink to restart 
> from previously saved checkpoint (which recorded reading up to record #100), 
> but KafkaWriter never sent record #97 to #100.
> This will result in dataloss of record #97 to #100
> Because KafkaWriter only catches error *after* callback, if callback is never 
> invoked (due to broker issue) right after the first flush has taken place, 
> those records are effectively gone unless someone decided to go back and look 
> for it.
> This behavior should be ok if user has set {*}DeliveryGuarantee.NONE{*}, but 
> is not expected for {*}DeliveryGuarantee.AT_LEAST_ONCE{*}.
> There is a divergence of the process in the event of {*}EXACTLY_ONCE{*}.
> 

[jira] [Commented] (FLINK-33545) KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching

2023-11-16 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17786860#comment-17786860
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-33545:
-

hey [~aeolus811tw], thanks for the detailed writeup.

I'm still trying to understand the need for the second flush, but wanted to 
give a quick comment on:

> prepareCommit will produce a list of KafkaCommittable that corresponds to 
>Transactional KafkaProducer to be committed. And a catch up flush will take 
>place during *commit* step. Whether this was intentional or not, due to the 
>fact that flush is a blocking call, the second flush for EXACTLY_ONCE at the 
>end of EXACTLY_ONCE actually ensured everything fenced in the current 
>checkpoint will be sent to Kafka, or fail the checkpoint if not successful.

the {{producer.commitTransaction()}} call does a flush first before sending out 
the request to commit the Kafka transaction. The assumption in the 
{{KafkaWriter}} code has been that by the time {{commitTransaction()}} is 
called, all pending records are already flushed, so the internal flush done at 
commit time should have been a no-op.

If I am understanding you correctly, this ticket suggests that the second flush 
is actually not a no-op?
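
For illustration, the ordering under discussion maps roughly onto the raw 
producer API as follows. This is a sketch of the expected semantics, not the 
KafkaWriter/KafkaCommitter code:

{code:java}
import org.apache.kafka.clients.producer.Producer;

final class TwoPhaseOrderingSketch {
    // Sketch of the EXACTLY_ONCE ordering discussed above (raw producer API, not Flink code).
    // If the prepareCommit-time flush really leaves nothing pending, the flush that
    // commitTransaction() performs internally should be a no-op.
    static void prepareThenCommit(Producer<byte[], byte[]> transactionalProducer) {
        transactionalProducer.flush();              // prepareCommit: push out all pending records
        transactionalProducer.commitTransaction();  // commit: flushes internally, then commits the txn
    }
}
{code}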

> KafkaSink implementation can cause dataloss during broker issue when not 
> using EXACTLY_ONCE if there's any batching
> ---
>
> Key: FLINK-33545
> URL: https://issues.apache.org/jira/browse/FLINK-33545
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Kevin Tseng
>Assignee: Kevin Tseng
>Priority: Major
>
> In the current implementation of KafkaSource and KafkaSink there are some 
> assumption that were made:
>  # KafkaSource completely relies on Checkpoint to manage and track its offset 
> in *KafkaSourceReader* class
>  # KafkaSink in *KafkaWriter* class only performs catch-flush when 
> *DeliveryGuarantee.EXACTLY_ONCE* is specified.
> KafkaSource is assuming that checkpoint should be properly fenced and 
> everything it had read up-til checkpoint being initiated will be processed or 
> recorded by operators downstream, including the TwoPhaseCommiter such as 
> *KafkaSink*
> *KafkaSink* goes by the model of:
>  
> {code:java}
> flush -> prepareCommit -> commit{code}
>  
> In a scenario that:
>  * KafkaSource ingested records #1 to #100
>  * KafkaSink only had chance to send records #1 to #96
>  * with a batching interval of 5ms
> when checkpoint has been initiated, flush will only confirm the sending of 
> record #1 to #96.
> This allows checkpoint to proceed as there's no error, and record #97 to 100 
> will be batched after first flush.
> Now, if broker goes down / has issue that caused the internal KafkaProducer 
> to not be able to send out the record after a batch, and is on a constant 
> retry-cycle (default value of KafkaProducer retries is Integer.MAX_VALUE), 
> *WriterCallback* error handling will never be triggered until the next 
> checkpoint flush.
> This can be tested by creating a faulty Kafka cluster and run the following 
> code:
> {code:java}
> Properties props = new Properties(); 
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
> props.put(ProducerConfig.CLIENT_ID_CONFIG, "example-producer");
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName()); 
> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.ACKS_CONFIG, "all"); 
> final KafkaProducer producer = new KafkaProducer<>(props);
> try {
> for (int i = 0; i < 10; i++) {
> System.out.printf("sending record #%d\n", i);
> String data = UUID.randomUUID().toString();
> final ProducerRecord record = new 
> ProducerRecord<>(TOPIC, Integer.toString(i), data);
> producer.send(record, new CB(Integer.toString(i), data));
> Thread.sleep(1); //sleep for 10 seconds
> }
> } catch (Exception e) {
> e.printStackTrace();
> } finally {
> System.out.println("flushing");
> producer.flush();
> System.out.println("closing");
> producer.close();
> }{code}
> Once callback returns due to network timeout, it will cause Flink to restart 
> from previously saved checkpoint (which recorded reading up to record #100), 
> but KafkaWriter never sent record #97 to #100.
> This will result in dataloss of record #97 to #100
> Because KafkaWriter only catches error *after* callback, if callback is never 
> invoked (due to broker issue) right after the first 

[jira] [Assigned] (FLINK-33545) KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching

2023-11-16 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai reassigned FLINK-33545:
---

Assignee: Tzu-Li (Gordon) Tai

> KafkaSink implementation can cause dataloss during broker issue when not 
> using EXACTLY_ONCE if there's any batching
> ---
>
> Key: FLINK-33545
> URL: https://issues.apache.org/jira/browse/FLINK-33545
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Kevin Tseng
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Major
>
> In the current implementation of KafkaSource and KafkaSink there are some 
> assumption that were made:
>  # KafkaSource completely relies on Checkpoint to manage and track its offset 
> in *KafkaSourceReader* class
>  # KafkaSink in *KafkaWriter* class only performs catch-flush when 
> *DeliveryGuarantee.EXACTLY_ONCE* is specified.
> KafkaSource is assuming that checkpoint should be properly fenced and 
> everything it had read up-til checkpoint being initiated will be processed or 
> recorded by operators downstream, including the TwoPhaseCommiter such as 
> *KafkaSink*
> *KafkaSink* goes by the model of:
>  
> {code:java}
> flush -> prepareCommit -> commit{code}
>  
> In a scenario that:
>  * KafkaSource ingested records #1 to #100
>  * KafkaSink only had chance to send records #1 to #96
>  * with a batching interval of 5ms
> when checkpoint has been initiated, flush will only confirm the sending of 
> record #1 to #96.
> This allows checkpoint to proceed as there's no error, and record #97 to 100 
> will be batched after first flush.
> Now, if broker goes down / has issue that caused the internal KafkaProducer 
> to not be able to send out the record after a batch, and is on a constant 
> retry-cycle (default value of KafkaProducer retries is Integer.MAX_VALUE), 
> *WriterCallback* error handling will never be triggered until the next 
> checkpoint flush.
> This can be tested by creating a faulty Kafka cluster and run the following 
> code:
> {code:java}
> Properties props = new Properties(); 
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
> props.put(ProducerConfig.CLIENT_ID_CONFIG, "example-producer");
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName()); 
> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.ACKS_CONFIG, "all"); 
> final KafkaProducer producer = new KafkaProducer<>(props);
> try {
> for (int i = 0; i < 10; i++) {
> System.out.printf("sending record #%d\n", i);
> String data = UUID.randomUUID().toString();
> final ProducerRecord record = new 
> ProducerRecord<>(TOPIC, Integer.toString(i), data);
> producer.send(record, new CB(Integer.toString(i), data));
> Thread.sleep(1); //sleep for 10 seconds
> }
> } catch (Exception e) {
> e.printStackTrace();
> } finally {
> System.out.println("flushing");
> producer.flush();
> System.out.println("closing");
> producer.close();
> }{code}
> Once callback returns due to network timeout, it will cause Flink to restart 
> from previously saved checkpoint (which recorded reading up to record #100), 
> but KafkaWriter never sent record #97 to #100.
> This will result in dataloss of record #97 to #100
> Because KafkaWriter only catches error *after* callback, if callback is never 
> invoked (due to broker issue) right after the first flush has taken place, 
> those records are effectively gone unless someone decided to go back and look 
> for it.
> This behavior should be ok if user has set {*}DeliveryGuarantee.NONE{*}, but 
> is not expected for {*}DeliveryGuarantee.AT_LEAST_ONCE{*}.
> There is a divergence of the process in the event of {*}EXACTLY_ONCE{*}.
> prepareCommit will produce a list of KafkaCommittable that corresponds to 
> Transactional KafkaProducer to be committed. And a catch up flush will take 
> place during *commit* step. Whether this was intentional or not, due to the 
> fact that flush is a blocking call, the second flush for EXACTLY_ONCE at the 
> end of EXACTLY_ONCE actually ensured everything fenced in the current 
> checkpoint will be sent to Kafka, or fail the checkpoint if not successful.
>  
> Due the above finding, I'm recommending one of the following fixes:
>  # need to perform second flush for AT_LEAST_ONCE
>  # or move flush to the end of the KafkaSink process.
> I'm leaning towards 2nd option as it does not make sense to flush then do 
> checkpoint, it 

[jira] [Assigned] (FLINK-33545) KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching

2023-11-16 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai reassigned FLINK-33545:
---

Assignee: Kevin Tseng  (was: Tzu-Li (Gordon) Tai)

> KafkaSink implementation can cause dataloss during broker issue when not 
> using EXACTLY_ONCE if there's any batching
> ---
>
> Key: FLINK-33545
> URL: https://issues.apache.org/jira/browse/FLINK-33545
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Kevin Tseng
>Assignee: Kevin Tseng
>Priority: Major
>
> In the current implementation of KafkaSource and KafkaSink there are some 
> assumption that were made:
>  # KafkaSource completely relies on Checkpoint to manage and track its offset 
> in *KafkaSourceReader* class
>  # KafkaSink in *KafkaWriter* class only performs catch-flush when 
> *DeliveryGuarantee.EXACTLY_ONCE* is specified.
> KafkaSource is assuming that checkpoint should be properly fenced and 
> everything it had read up-til checkpoint being initiated will be processed or 
> recorded by operators downstream, including the TwoPhaseCommiter such as 
> *KafkaSink*
> *KafkaSink* goes by the model of:
>  
> {code:java}
> flush -> prepareCommit -> commit{code}
>  
> In a scenario that:
>  * KafkaSource ingested records #1 to #100
>  * KafkaSink only had chance to send records #1 to #96
>  * with a batching interval of 5ms
> when checkpoint has been initiated, flush will only confirm the sending of 
> record #1 to #96.
> This allows checkpoint to proceed as there's no error, and record #97 to 100 
> will be batched after first flush.
> Now, if broker goes down / has issue that caused the internal KafkaProducer 
> to not be able to send out the record after a batch, and is on a constant 
> retry-cycle (default value of KafkaProducer retries is Integer.MAX_VALUE), 
> *WriterCallback* error handling will never be triggered until the next 
> checkpoint flush.
> This can be tested by creating a faulty Kafka cluster and run the following 
> code:
> {code:java}
> Properties props = new Properties(); 
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
> props.put(ProducerConfig.CLIENT_ID_CONFIG, "example-producer");
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName()); 
> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.ACKS_CONFIG, "all"); 
> final KafkaProducer producer = new KafkaProducer<>(props);
> try {
> for (int i = 0; i < 10; i++) {
> System.out.printf("sending record #%d\n", i);
> String data = UUID.randomUUID().toString();
> final ProducerRecord record = new 
> ProducerRecord<>(TOPIC, Integer.toString(i), data);
> producer.send(record, new CB(Integer.toString(i), data));
> Thread.sleep(1); //sleep for 10 seconds
> }
> } catch (Exception e) {
> e.printStackTrace();
> } finally {
> System.out.println("flushing");
> producer.flush();
> System.out.println("closing");
> producer.close();
> }{code}
> Once callback returns due to network timeout, it will cause Flink to restart 
> from previously saved checkpoint (which recorded reading up to record #100), 
> but KafkaWriter never sent record #97 to #100.
> This will result in dataloss of record #97 to #100
> Because KafkaWriter only catches error *after* callback, if callback is never 
> invoked (due to broker issue) right after the first flush has taken place, 
> those records are effectively gone unless someone decided to go back and look 
> for it.
> This behavior should be ok if user has set {*}DeliveryGuarantee.NONE{*}, but 
> is not expected for {*}DeliveryGuarantee.AT_LEAST_ONCE{*}.
> There is a divergence of the process in the event of {*}EXACTLY_ONCE{*}.
> prepareCommit will produce a list of KafkaCommittable that corresponds to 
> Transactional KafkaProducer to be committed. And a catch up flush will take 
> place during *commit* step. Whether this was intentional or not, due to the 
> fact that flush is a blocking call, the second flush for EXACTLY_ONCE at the 
> end of EXACTLY_ONCE actually ensured everything fenced in the current 
> checkpoint will be sent to Kafka, or fail the checkpoint if not successful.
>  
> Due the above finding, I'm recommending one of the following fixes:
>  # need to perform second flush for AT_LEAST_ONCE
>  # or move flush to the end of the KafkaSink process.
> I'm leaning towards 2nd option as it does not make sense to flush then do 
> 

[jira] [Commented] (FLINK-33497) Update the Kafka connector to support DISTRIBUTED BY clause

2023-11-09 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17784479#comment-17784479
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-33497:
-

[~twalthr] It makes sense to integrate the Kafka connector with this as soon as 
the clause is ready. Thanks for the ping, I'll assign this to myself.

> Update the Kafka connector to support DISTRIBUTED BY clause
> ---
>
> Key: FLINK-33497
> URL: https://issues.apache.org/jira/browse/FLINK-33497
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka
>Reporter: Timo Walther
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Major
>
> The Kafka connector can be one of the first connectors supporting the 
> DISTRIBUTED BY clause. The clause can be translated into 'key.fields' and 
> 'properties.num.partitions' in the WITH clause.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-33497) Update the Kafka connector to support DISTRIBUTED BY clause

2023-11-09 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai reassigned FLINK-33497:
---

Assignee: Tzu-Li (Gordon) Tai

> Update the Kafka connector to support DISTRIBUTED BY clause
> ---
>
> Key: FLINK-33497
> URL: https://issues.apache.org/jira/browse/FLINK-33497
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka
>Reporter: Timo Walther
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Major
>
> The Kafka connector can be one of the first connectors supporting the 
> DISTRIBUTED BY clause. The clause can be translated into 'key.fields' and 
> 'properties.num.partitions' in the WITH clause.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-26409) Remove meaningless Kafka connector test case KafkaConsumerTestBase.runBrokerFailureTest

2023-10-25 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-26409:

Fix Version/s: kafka-3.0.2
   (was: kafka-3.0.1)

> Remove meaningless Kafka connector test case 
> KafkaConsumerTestBase.runBrokerFailureTest
> ---
>
> Key: FLINK-26409
> URL: https://issues.apache.org/jira/browse/FLINK-26409
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.0.0
>Reporter: Qingsheng Ren
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Major
> Fix For: kafka-3.1.0, kafka-3.0.2
>
>
> {{KafkaConsumerTestBase#runBrokerFailureTest}} is actually validating the 
> functionality of Kafka broker and consumer (change partition leader when the 
> ex-leader is down and let consumer switch to the new one without exception) 
> instead of KafkaSource / FlinkKafkaConsumer, so we could consider removing 
> this case.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28303) Kafka SQL Connector loses data when restoring from a savepoint with a topic with empty partitions

2023-10-23 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28303?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-28303:

Affects Version/s: 1.17.1
   1.16.2
   kafka-3.0.0
   1.15.4

> Kafka SQL Connector loses data when restoring from a savepoint with a topic 
> with empty partitions
> -
>
> Key: FLINK-28303
> URL: https://issues.apache.org/jira/browse/FLINK-28303
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.14.4, 1.15.4, kafka-3.0.0, 1.16.2, 1.17.1
>Reporter: Robert Metzger
>Assignee: tanjialiang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: kafka-3.0.1, kafka-3.1.0
>
>
> Steps to reproduce:
> - Set up a Kafka topic with 10 partitions
> - produce records 0-9 into the topic
> - take a savepoint and stop the job
> - produce records 10-19 into the topic
> - restore the job from the savepoint.
> The job will be missing usually 2-4 records from 10-19.
> My assumption is that if a partition never had data (which is likely with 10 
> partitions and 10 records), the savepoint will only contain offsets for 
> partitions with data. 
> While the job was offline (and we've written records 10-19 into the topic), 
> all partitions got filled. Now, when the job comes online again, the source 
> will use the "latest" offset for those partitions, skipping some data.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-28303) Kafka SQL Connector loses data when restoring from a savepoint with a topic with empty partitions

2023-10-23 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28303?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai closed FLINK-28303.
---
Resolution: Fixed

> Kafka SQL Connector loses data when restoring from a savepoint with a topic 
> with empty partitions
> -
>
> Key: FLINK-28303
> URL: https://issues.apache.org/jira/browse/FLINK-28303
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.14.4
>Reporter: Robert Metzger
>Assignee: tanjialiang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: kafka-3.0.1, kafka-3.1.0
>
>
> Steps to reproduce:
> - Set up a Kafka topic with 10 partitions
> - produce records 0-9 into the topic
> - take a savepoint and stop the job
> - produce records 10-19 into the topic
> - restore the job from the savepoint.
> The job will be missing usually 2-4 records from 10-19.
> My assumption is that if a partition never had data (which is likely with 10 
> partitions and 10 records), the savepoint will only contain offsets for 
> partitions with data. 
> While the job was offline (and we've written record 10-19 into the topic), 
> all partitions got filled. Now, when Kafka comes online again, it will use 
> the "latest" offset for those partitions, skipping some data.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28303) Kafka SQL Connector loses data when restoring from a savepoint with a topic with empty partitions

2023-10-23 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17778783#comment-17778783
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-28303:
-

Merged.

apache/flink-connector-kafka:main - 54e3b70deb349538edba1ec2b051ed9d9f79b563
apache/flink-connector-kafka:v3.0 538e9c10463dbdf0942c8858678e98bf3522d566

> Kafka SQL Connector loses data when restoring from a savepoint with a topic 
> with empty partitions
> -
>
> Key: FLINK-28303
> URL: https://issues.apache.org/jira/browse/FLINK-28303
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.14.4
>Reporter: Robert Metzger
>Assignee: tanjialiang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: kafka-3.0.1, kafka-3.1.0
>
>
> Steps to reproduce:
> - Set up a Kafka topic with 10 partitions
> - produce records 0-9 into the topic
> - take a savepoint and stop the job
> - produce records 10-19 into the topic
> - restore the job from the savepoint.
> The job will be missing usually 2-4 records from 10-19.
> My assumption is that if a partition never had data (which is likely with 10 
> partitions and 10 records), the savepoint will only contain offsets for 
> partitions with data. 
> While the job was offline (and we've written record 10-19 into the topic), 
> all partitions got filled. Now, when Kafka comes online again, it will use 
> the "latest" offset for those partitions, skipping some data.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-26409) Remove meaningless Kafka connector test case KafkaConsumerTestBase.runBrokerFailureTest

2023-10-17 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-26409:

Affects Version/s: kafka-3.0.0

> Remove meaningless Kafka connector test case 
> KafkaConsumerTestBase.runBrokerFailureTest
> ---
>
> Key: FLINK-26409
> URL: https://issues.apache.org/jira/browse/FLINK-26409
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.0.0
>Reporter: Qingsheng Ren
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Major
> Fix For: kafka-3.0.1, kafka-3.1.0
>
>
> {{KafkaConsumerTestBase#runBrokerFailureTest}} is actually validating the 
> functionality of Kafka broker and consumer (change partition leader when the 
> ex-leader is down and let consumer switch to the new one without exception) 
> instead of KafkaSource / FlinkKafkaConsumer, so we could consider removing 
> this case.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-26409) Remove meaningless Kafka connector test case KafkaConsumerTestBase.runBrokerFailureTest

2023-10-17 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-26409:

Fix Version/s: kafka-3.0.1
   kafka-3.1.0

> Remove meaningless Kafka connector test case 
> KafkaConsumerTestBase.runBrokerFailureTest
> ---
>
> Key: FLINK-26409
> URL: https://issues.apache.org/jira/browse/FLINK-26409
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Reporter: Qingsheng Ren
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Major
> Fix For: kafka-3.0.1, kafka-3.1.0
>
>
> {{KafkaConsumerTestBase#runBrokerFailureTest}} is actually validating the 
> functionality of Kafka broker and consumer (change partition leader when the 
> ex-leader is down and let consumer switch to the new one without exception) 
> instead of KafkaSource / FlinkKafkaConsumer, so we could consider removing 
> this case.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-26409) Remove meaningless Kafka connector test case KafkaConsumerTestBase.runBrokerFailureTest

2023-10-17 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17776482#comment-17776482
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-26409:
-

Perhaps the only value that test gives us is "tests as proof/documentation" 
that the Kafka source connector can continue to run as-is in case of broker 
failures in a multi-broker setup. However, this does seem to be completely 
irrelevant to the connector code.

> Remove meaningless Kafka connector test case 
> KafkaConsumerTestBase.runBrokerFailureTest
> ---
>
> Key: FLINK-26409
> URL: https://issues.apache.org/jira/browse/FLINK-26409
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Reporter: Qingsheng Ren
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Major
>
> {{KafkaConsumerTestBase#runBrokerFailureTest}} is actually validating the 
> functionality of Kafka broker and consumer (change partition leader when the 
> ex-leader is down and let consumer switch to the new one without exception) 
> instead of KafkaSource / FlinkKafkaConsumer, so we could consider removing 
> this case.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-26409) Remove meaningless Kafka connector test case KafkaConsumerTestBase.runBrokerFailureTest

2023-10-17 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai reassigned FLINK-26409:
---

Assignee: Tzu-Li (Gordon) Tai

> Remove meaningless Kafka connector test case 
> KafkaConsumerTestBase.runBrokerFailureTest
> ---
>
> Key: FLINK-26409
> URL: https://issues.apache.org/jira/browse/FLINK-26409
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Reporter: Qingsheng Ren
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Major
>
> {{KafkaConsumerTestBase#runBrokerFailureTest}} is actually validating the 
> functionality of Kafka broker and consumer (change partition leader when the 
> ex-leader is down and let consumer switch to the new one without exception) 
> instead of KafkaSource / FlinkKafkaConsumer, so we could consider removing 
> this case.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-26409) Remove meaningless Kafka connector test case KafkaConsumerTestBase.runBrokerFailureTest

2023-10-17 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai reassigned FLINK-26409:
---

Assignee: (was: Tzu-Li (Gordon) Tai)

> Remove meaningless Kafka connector test case 
> KafkaConsumerTestBase.runBrokerFailureTest
> ---
>
> Key: FLINK-26409
> URL: https://issues.apache.org/jira/browse/FLINK-26409
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Reporter: Qingsheng Ren
>Priority: Major
>
> {{KafkaConsumerTestBase#runBrokerFailureTest}} is actually validating the 
> functionality of Kafka broker and consumer (change partition leader when the 
> ex-leader is down and let consumer switch to the new one without exception) 
> instead of KafkaSource / FlinkKafkaConsumer, so we could consider removing 
> this case.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-26409) Remove meaningless Kafka connector test case KafkaConsumerTestBase.runBrokerFailureTest

2023-10-17 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17776480#comment-17776480
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-26409:
-

This test has indeed historically been quite flaky.
If I remember correctly, the test was modified to test against multi-broker 
clusters due to it being flaky in the past, but apparently that didn't fully 
resolve things.

A few data points:
 * I double-checked whether the KafkaSource does anything special with the Kafka 
consumer in order for it to handle partition leader switches, and it doesn't - 
leader switching is an internal implementation detail of the Kafka consumer 
client.
 * We already have an e2e exactly-once test with artificial failures within a 
Flink job.

All in all, it sounds like a good call to remove this flaky test as it's not 
really adding much value for test coverage.

> Remove meaningless Kafka connector test case 
> KafkaConsumerTestBase.runBrokerFailureTest
> ---
>
> Key: FLINK-26409
> URL: https://issues.apache.org/jira/browse/FLINK-26409
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Reporter: Qingsheng Ren
>Priority: Major
>
> {{KafkaConsumerTestBase#runBrokerFailureTest}} is actually validating the 
> functionality of Kafka broker and consumer (change partition leader when the 
> ex-leader is down and let consumer switch to the new one without exception) 
> instead of KafkaSource / FlinkKafkaConsumer, so we could consider removing 
> this case.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-26409) Remove meaningless Kafka connector test case KafkaConsumerTestBase.runBrokerFailureTest

2023-10-17 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai reassigned FLINK-26409:
---

Assignee: Tzu-Li (Gordon) Tai

> Remove meaningless Kafka connector test case 
> KafkaConsumerTestBase.runBrokerFailureTest
> ---
>
> Key: FLINK-26409
> URL: https://issues.apache.org/jira/browse/FLINK-26409
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Reporter: Qingsheng Ren
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Major
>
> {{KafkaConsumerTestBase#runBrokerFailureTest}} is actually validating the 
> functionality of Kafka broker and consumer (change partition leader when the 
> ex-leader is down and let consumer switch to the new one without exception) 
> instead of KafkaSource / FlinkKafkaConsumer, so we could consider removing 
> this case.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-33293) Data loss with Kafka Sink

2023-10-17 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17776361#comment-17776361
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-33293 at 10/17/23 7:30 PM:
---

[~jredzepovic] was this actually reproduced with versions 1.16.2 and 1.17.1? I 
see the ticket tagged with these versions, but in the linked Slack thread it 
seems like it was only reproduced with version 1.16.1.

I'm asking because, at a first glance, checkpoints incorrectly succeeding in 
the case of flush failures seems to be caused by FLINK-31305, which is fixed 
via Flink Kafka connector versions 1.16.2, 1.17.1, and 3.0.0-1.17 (the 
externalized Kafka connector).

Prior to FLINK-31305, the KafkaSink was not correctly checking that the flush 
was fully successful before acknowledging checkpoint complete.

If you did not test against those versions, could you try that and report back?


was (Author: tzulitai):
[~jredzepovic] was this actually reproduced with versions 1.16.2 and 1.17.1? I 
see the ticket tagged with these versions, but in the linked Slack thread it 
seems like it was only reproduced with version 1.16.1.

I'm asking because, at a first glance, checkpoints incorrectly succeeding in 
the case of flush failures seems to be caused by FLINK-31305, which is fixed 
via Flink Kafka connector versions 1.16.2, 1.17.1, and 3.0.0-1.17 (the 
externalized Kafka connector).

If you did not test against those versions, could you try that and report back?

> Data loss with Kafka Sink
> -
>
> Key: FLINK-33293
> URL: https://issues.apache.org/jira/browse/FLINK-33293
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.16.1, 1.16.2, 1.17.1
>Reporter: Jasmin Redzepovic
>Priority: Major
> Attachments: job.log
>
>
> More info in Slack discussion: 
> [https://www.linen.dev/s/apache-flink/t/15851877/hi-all-it-s-me-again-based-on-https-apache-flink-slack-com-a#e102fa98-dcd7-40e8-a2c4-f7b4a83234e1]
>  
> *TLDR:*
> (in Flink version 1.15 I was unable to reproduce the issue, but in 1.16 and 
> 1.17 I can reproduce it)
> I have created a sink topic with 8 partitions, a replication factor of 3, and 
> a minimum in-sync replicas of 2. The consumer properties are set to their 
> default values.
> For the producer, I made changes to the delivery.timeout.ms and 
> request.timeout.ms properties, setting them to *5000ms* and *4000ms* 
> respectively. (acks are set to -1 by default, which is equal to _all_, I 
> guess)
> KafkaSink is configured with an AT_LEAST_ONCE delivery guarantee. The job 
> parallelism is set to 1 and the checkpointing interval is set to 2000ms. I 
> started a Flink Job and monitored its logs. Additionally, I was consuming the 
> __consumer_offsets topic in parallel to track when offsets are committed for 
> my consumer group.
> The problematic part occurs during checkpoint 5. Its duration was 5009ms, 
> which exceeds the delivery timeout for Kafka (5000ms). Although it was marked 
> as completed, I believe that the output buffer of KafkaSink was not fully 
> acknowledged by Kafka. As a result, Flink proceeded to trigger checkpoint 6 
> but immediately encountered a Kafka {_}TimeoutException: Expiring N 
> records{_}.
> I suspect that this exception originated from checkpoint 5 and that 
> checkpoint 5 should not have been considered successful. The job then failed 
> but recovered from checkpoint 5. Some time after checkpoint 7, consumer 
> offsets were committed to Kafka, and this process repeated once more at 
> checkpoint 9.
> Since the offsets of checkpoint 5 were committed to Kafka, but the output 
> buffer was only partially delivered, there has been data loss. I confirmed 
> this when sinking the topic to the database.
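
For context, a minimal sketch of the sink setup described in the report above 
could look roughly as follows. Bootstrap servers, topic name and the 
serialization schema are placeholders; this only illustrates the reported 
configuration (AT_LEAST_ONCE delivery, 2s checkpoints, tightened producer 
timeouts) and is not the reporter's actual job:

{code:java}
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AtLeastOnceKafkaSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(2000); // 2s checkpoint interval, as in the report

        // Producer timeouts from the report; close to (or below) the checkpoint duration.
        Properties producerProps = new Properties();
        producerProps.setProperty("delivery.timeout.ms", "5000");
        producerProps.setProperty("request.timeout.ms", "4000");

        KafkaSink<String> sink =
                KafkaSink.<String>builder()
                        .setBootstrapServers("localhost:9092") // placeholder
                        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                        .setKafkaProducerConfig(producerProps)
                        .setRecordSerializer(
                                KafkaRecordSerializationSchema.builder()
                                        .setTopic("sink-topic") // placeholder
                                        .setValueSerializationSchema(new SimpleStringSchema())
                                        .build())
                        .build();

        env.fromElements("a", "b", "c").sinkTo(sink);
        env.execute("at-least-once kafka sink sketch");
    }
}
{code}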



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-33293) Data loss with Kafka Sink

2023-10-17 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17776361#comment-17776361
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-33293 at 10/17/23 7:29 PM:
---

[~jredzepovic] was this actually reproduced with versions 1.16.2 and 1.17.1? I 
see the ticket tagged with these versions, but in the linked Slack thread it 
seems like it was only reproduced with version 1.16.1.

I'm asking because, at a first glance, checkpoints incorrectly succeeding in 
the case of flush failures seems to be caused by FLINK-31305, which is fixed 
via Flink Kafka connector versions 1.16.2, 1.17.1, and 3.0.0-1.17 (the 
externalized Kafka connector).

If you did not test against those versions, could you try that and report back?


was (Author: tzulitai):
[~jredzepovic] was this actually reproduced with versions 1.16.2 and 1.17.1?

I'm asking because, at a first glance, checkpoints incorrectly succeeding in 
the case of flush failures seems to be caused by FLINK-31305, which is fixed 
via Flink Kafka connector versions 1.16.2, 1.17.1, and 3.0.0-1.17 (the 
externalized Kafka connector).

If you did not test against those versions, could you try that and report back?

> Data loss with Kafka Sink
> -
>
> Key: FLINK-33293
> URL: https://issues.apache.org/jira/browse/FLINK-33293
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.16.1, 1.16.2, 1.17.1
>Reporter: Jasmin Redzepovic
>Priority: Major
> Attachments: job.log
>
>
> More info in Slack discussion: 
> [https://www.linen.dev/s/apache-flink/t/15851877/hi-all-it-s-me-again-based-on-https-apache-flink-slack-com-a#e102fa98-dcd7-40e8-a2c4-f7b4a83234e1]
>  
> *TLDR:*
> (in Flink version 1.15 I was unable to reproduce the issue, but in 1.16 and 
> 1.17 I can reproduce it)
> I have created a sink topic with 8 partitions, a replication factor of 3, and 
> a minimum in-sync replicas of 2. The consumer properties are set to their 
> default values.
> For the producer, I made changes to the delivery.timeout.ms and 
> request.timeout.ms properties, setting them to *5000ms* and *4000ms* 
> respectively. (acks are set to -1 by default, which is equal to _all_, I 
> guess)
> KafkaSink is configured with an AT_LEAST_ONCE delivery guarantee. The job 
> parallelism is set to 1 and the checkpointing interval is set to 2000ms. I 
> started a Flink Job and monitored its logs. Additionally, I was consuming the 
> __consumer_offsets topic in parallel to track when offsets are committed for 
> my consumer group.
> The problematic part occurs during checkpoint 5. Its duration was 5009ms, 
> which exceeds the delivery timeout for Kafka (5000ms). Although it was marked 
> as completed, I believe that the output buffer of KafkaSink was not fully 
> acknowledged by Kafka. As a result, Flink proceeded to trigger checkpoint 6 
> but immediately encountered a Kafka {_}TimeoutException: Expiring N 
> records{_}.
> I suspect that this exception originated from checkpoint 5 and that 
> checkpoint 5 should not have been considered successful. The job then failed 
> but recovered from checkpoint 5. Some time after checkpoint 7, consumer 
> offsets were committed to Kafka, and this process repeated once more at 
> checkpoint 9.
> Since the offsets of checkpoint 5 were committed to Kafka, but the output 
> buffer was only partially delivered, there has been data loss. I confirmed 
> this when sinking the topic to the database.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33293) Data loss with Kafka Sink

2023-10-17 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17776361#comment-17776361
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-33293:
-

[~jredzepovic] was this actually reproduced with versions 1.16.2 and 1.17.1?

I'm asking because, at a first glance, checkpoints incorrectly succeeding in 
the case of flush failures seems to be caused by FLINK-31305, which is fixed 
via Flink Kafka connector versions 1.16.2, 1.17.1, and 3.0.0-1.17 (the 
externalized Kafka connector).

If you did not test against those versions, could you try that and report back?

> Data loss with Kafka Sink
> -
>
> Key: FLINK-33293
> URL: https://issues.apache.org/jira/browse/FLINK-33293
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.16.1, 1.16.2, 1.17.1
>Reporter: Jasmin Redzepovic
>Priority: Major
> Attachments: job.log
>
>
> More info in Slack discussion: 
> [https://www.linen.dev/s/apache-flink/t/15851877/hi-all-it-s-me-again-based-on-https-apache-flink-slack-com-a#e102fa98-dcd7-40e8-a2c4-f7b4a83234e1]
>  
> *TLDR:*
> (in Flink version 1.15 I was unable to reproduce the issue, but in 1.16 and 
> 1.17 I can reproduce it)
> I have created a sink topic with 8 partitions, a replication factor of 3, and 
> a minimum in-sync replicas of 2. The consumer properties are set to their 
> default values.
> For the producer, I made changes to the delivery.timeout.ms and 
> request.timeout.ms properties, setting them to *5000ms* and *4000ms* 
> respectively. (acks are set to -1 by default, which is equal to _all_, I 
> guess)
> KafkaSink is configured with an AT_LEAST_ONCE delivery guarantee. The job 
> parallelism is set to 1 and the checkpointing interval is set to 2000ms. I 
> started a Flink Job and monitored its logs. Additionally, I was consuming the 
> __consumer_offsets topic in parallel to track when offsets are committed for 
> my consumer group.
> The problematic part occurs during checkpoint 5. Its duration was 5009ms, 
> which exceeds the delivery timeout for Kafka (5000ms). Although it was marked 
> as completed, I believe that the output buffer of KafkaSink was not fully 
> acknowledged by Kafka. As a result, Flink proceeded to trigger checkpoint 6 
> but immediately encountered a Kafka {_}TimeoutException: Expiring N 
> records{_}.
> I suspect that this exception originated from checkpoint 5 and that 
> checkpoint 5 should not have been considered successful. The job then failed 
> but recovered from checkpoint 5. Some time after checkpoint 7, consumer 
> offsets were committed to Kafka, and this process repeated once more at 
> checkpoint 9.
> Since the offsets of checkpoint 5 were committed to Kafka, but the output 
> buffer was only partially delivered, there has been data loss. I confirmed 
> this when sinking the topic to the database.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33191) Kafka Connector should directly depend on 3rd-party libs instead of flink-shaded repo

2023-10-11 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33191?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17774126#comment-17774126
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-33191:
-

main: bd260f14dd280f464e5dcea76b3735782312b4ae

v3.0: 58e500378193b76fe2262818de3703208a028728

> Kafka Connector should directly depend on 3rd-party libs instead of 
> flink-shaded repo
> -
>
> Key: FLINK-33191
> URL: https://issues.apache.org/jira/browse/FLINK-33191
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.0.0
>Reporter: Jing Ge
>Assignee: Martijn Visser
>Priority: Major
>  Labels: pull-request-available
> Fix For: kafka-3.0.1, kafka-3.1.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-33191) Kafka Connector should directly depend on 3rd-party libs instead of flink-shaded repo

2023-10-11 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33191?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai closed FLINK-33191.
---
Resolution: Fixed

> Kafka Connector should directly depend on 3rd-party libs instead of 
> flink-shaded repo
> -
>
> Key: FLINK-33191
> URL: https://issues.apache.org/jira/browse/FLINK-33191
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.0.0
>Reporter: Jing Ge
>Assignee: Martijn Visser
>Priority: Major
>  Labels: pull-request-available
> Fix For: kafka-3.0.1, kafka-3.1.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-33231) Memory leak in KafkaSourceReader if no data in consumed topic

2023-10-10 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai reassigned FLINK-33231:
---

Assignee: Tzu-Li (Gordon) Tai

> Memory leak in KafkaSourceReader if no data in consumed topic
> -
>
> Key: FLINK-33231
> URL: https://issues.apache.org/jira/browse/FLINK-33231
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.17.1
>Reporter: Lauri Suurväli
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: kafka-3.0.1, kafka-3.1.0
>
> Attachments: Screenshot 2023-10-09 at 13.02.34.png, Screenshot 
> 2023-10-09 at 14.13.43.png, Screenshot 2023-10-10 at 12.49.37.png
>
>
> *Problem description*
> Our Flink streaming job TaskManager heap gets full when the job has nothing 
> to consume and process.
> It's a simple streaming job with KafkaSource -> ProcessFunction -> KafkaSink. 
> When there are no messages in the source topic the TaskManager heap usage 
> starts increasing until the job exits after receiving a SIGTERM signal. We 
> are running the job on AWS EMR with YARN.
> The problems with the TaskManager heap usage do not occur when there is data 
> to process. It's also worth noting that sending a single message to the 
> source topic of a streaming job that has been sitting idle and suffers from 
> the memory leak will cause the heap to be cleared. However it does not 
> resolve the problem since the heap usage will start increasing immediately 
> after processing the message.
> !Screenshot 2023-10-10 at 12.49.37.png!
> TaskManager heap used percentage is calculated by 
>  
> {code:java}
> flink.taskmanager.Status.JVM.Memory.Heap.Used * 100 / 
> flink.taskmanager.Status.JVM.Memory.Heap.Max{code}
>  
>  
>  I was able to take heap dumps of the TaskManager processes during a high 
> heap usage percentage. Heap dump analysis detected 912,355 instances of 
> java.util.HashMap empty collections retaining >= 43,793,040 bytes.
> !Screenshot 2023-10-09 at 14.13.43.png!
> The retained heap seemed to be located at:
>  
> {code:java}
> org.apache.flink.connector.kafka.source.reader.KafkaSourceReader#offsetsToCommit{code}
>  
> !Screenshot 2023-10-09 at 13.02.34.png!
>  
> *Possible hints:*
> An empty HashMap is added during the snapshotState method to offsetsToCommit 
> map if it does not already exist for the given checkpoint. [KafkaSourceReader 
> line 
> 107|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L107]
>  
> {code:java}
> Map<TopicPartition, OffsetAndMetadata> offsetsMap =
> offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>()); 
> {code}
>  
> If the startingOffset for the given split is >= 0 then a new entry would be 
> added to the map from the previous step. [KafkaSourceReader line 
> 113|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L113]
> {code:java}
> if (split.getStartingOffset() >= 0) {
> offsetsMap.put(
> split.getTopicPartition(),
> new OffsetAndMetadata(split.getStartingOffset()));
> }{code}
> If the starting offset is smaller than 0 then this would leave the offsetMap 
> created in step 1 empty. We can see from the logs that the startingOffset is 
> -3 when the splits are added to the reader.
>  
> {code:java}
> Adding split(s) to reader: [[Partition: source-events-20, StartingOffset: 1, 
> StoppingOffset: -9223372036854775808], [Partition: source-events-44, 
> StartingOffset: -3, StoppingOffset: -9223372036854775808], [Partition: 
> source-events-12, StartingOffset: -3, StoppingOffset: -9223372036854775808], 
> [Partition: source-events-36, StartingOffset: 1, StoppingOffset: 
> -9223372036854775808], [Partition: source-events-4, StartingOffset: -3, 
> StoppingOffset: -9223372036854775808], [Partition: source-events-28, 
> StartingOffset: -3, StoppingOffset: -9223372036854775808]]{code}
>  
>  
> The offsetsToCommit map is cleaned from entries once they have been committed 
> to Kafka which happens during the callback function that is passed to the 
> KafkaSourceFetcherManager.commitOffsets method in 
> KafkaSourceReader.notifyCheckpointComplete method.
> However if the committedPartitions is empty for the given checkpoint, then 
> the KafkaSourceFetcherManager.commitOffsets method returns.  
> [KafkaSourceFetcherManager line 
> 

[jira] [Updated] (FLINK-33191) Kafka Connector should directly depend on 3rd-party libs instead of flink-shaded repo

2023-10-10 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33191?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-33191:

Affects Version/s: kafka-3.0.0
   (was: 1.18.0)

> Kafka Connector should directly depend on 3rd-party libs instead of 
> flink-shaded repo
> -
>
> Key: FLINK-33191
> URL: https://issues.apache.org/jira/browse/FLINK-33191
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.0.0
>Reporter: Jing Ge
>Assignee: Martijn Visser
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33191) Kafka Connector should directly depend on 3rd-party libs instead of flink-shaded repo

2023-10-10 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33191?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-33191:

Fix Version/s: kafka-3.0.1
   kafka-3.1.0

> Kafka Connector should directly depend on 3rd-party libs instead of 
> flink-shaded repo
> -
>
> Key: FLINK-33191
> URL: https://issues.apache.org/jira/browse/FLINK-33191
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.0.0
>Reporter: Jing Ge
>Assignee: Martijn Visser
>Priority: Major
>  Labels: pull-request-available
> Fix For: kafka-3.0.1, kafka-3.1.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33231) Memory leak in KafkaSourceReader if no data in consumed topic

2023-10-10 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773790#comment-17773790
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-33231:
-

[~lauri.suurvali] I think that would work, but the issue is that in the 
callback, on success we log that a commit was successful and also bump the 
source reader metrics, which can be confusing if no offsets were actually 
committed. Moreover, with that approach we would be relying on internal 
details of the Kafka client that are hard to cover with tests (i.e. things might 
silently change such that a remote request is issued even if the provided 
offsets are empty, which is not ideal).

So, I think we can be a bit cleaner by short-cutting the 
{{notifyCheckpointComplete}} method such that if the offsets for a checkpoint 
are empty, we don't even attempt to use the fetcher manager to commit 
offsets.
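
A rough, self-contained sketch of that short-circuit idea (hypothetical names 
and simplified types, not the actual {{KafkaSourceReader}} code) could look 
like this:

{code:java}
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;

/**
 * Toy model of the proposed fix: if a checkpoint has no offsets to commit,
 * skip the commit path entirely and just prune the bookkeeping map so the
 * empty per-checkpoint entries cannot accumulate.
 */
public class OffsetCommitBookkeepingSketch {

    // checkpointId -> (partition -> offset); stands in for offsetsToCommit.
    private final SortedMap<Long, Map<String, Long>> offsetsToCommit =
            new ConcurrentSkipListMap<>();

    public void snapshotState(long checkpointId, Map<String, Long> currentOffsets) {
        // Mirrors the reported behaviour: an entry is created even if it stays empty.
        offsetsToCommit.computeIfAbsent(checkpointId, id -> new TreeMap<>())
                .putAll(currentOffsets);
    }

    public void notifyCheckpointComplete(long checkpointId) {
        Map<String, Long> toCommit = offsetsToCommit.get(checkpointId);
        if (toCommit == null || toCommit.isEmpty()) {
            // Short-circuit: nothing to hand to the fetcher manager, but still
            // prune entries up to this checkpoint so the map cannot grow forever.
            offsetsToCommit.headMap(checkpointId + 1).clear();
            return;
        }
        // ... hand toCommit to the fetcher manager / Kafka client here ...
        offsetsToCommit.headMap(checkpointId + 1).clear();
    }
}
{code}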

> Memory leak in KafkaSourceReader if no data in consumed topic
> -
>
> Key: FLINK-33231
> URL: https://issues.apache.org/jira/browse/FLINK-33231
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.17.1
>Reporter: Lauri Suurväli
>Priority: Blocker
> Fix For: kafka-3.0.1, kafka-3.1.0
>
> Attachments: Screenshot 2023-10-09 at 13.02.34.png, Screenshot 
> 2023-10-09 at 14.13.43.png, Screenshot 2023-10-10 at 12.49.37.png
>
>
> *Problem description*
> Our Flink streaming job TaskManager heap gets full when the job has nothing 
> to consume and process.
> It's a simple streaming job with KafkaSource -> ProcessFunction -> KafkaSink. 
> When there are no messages in the source topic the TaskManager heap usage 
> starts increasing until the job exits after receiving a SIGTERM signal. We 
> are running the job on AWS EMR with YARN.
> The problems with the TaskManager heap usage do not occur when there is data 
> to process. It's also worth noting that sending a single message to the 
> source topic of a streaming job that has been sitting idle and suffers from 
> the memory leak will cause the heap to be cleared. However it does not 
> resolve the problem since the heap usage will start increasing immediately 
> after processing the message.
> !Screenshot 2023-10-10 at 12.49.37.png!
> TaskManager heap used percentage is calculated by 
>  
> {code:java}
> flink.taskmanager.Status.JVM.Memory.Heap.Used * 100 / 
> flink.taskmanager.Status.JVM.Memory.Heap.Max{code}
>  
>  
>  I was able to take heap dumps of the TaskManager processes during a high 
> heap usage percentage. Heap dump analysis detected 912,355 instances of 
> java.util.HashMap empty collections retaining >= 43,793,040 bytes.
> !Screenshot 2023-10-09 at 14.13.43.png!
> The retained heap seemed to be located at:
>  
> {code:java}
> org.apache.flink.connector.kafka.source.reader.KafkaSourceReader#offsetsToCommit{code}
>  
> !Screenshot 2023-10-09 at 13.02.34.png!
>  
> *Possible hints:*
> An empty HashMap is added during the snapshotState method to offsetsToCommit 
> map if it does not already exist for the given checkpoint. [KafkaSourceReader 
> line 
> 107|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L107]
>  
> {code:java}
> Map<TopicPartition, OffsetAndMetadata> offsetsMap =
> offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>()); 
> {code}
>  
> If the startingOffset for the given split is >= 0 then a new entry would be 
> added to the map from the previous step. [KafkaSourceReader line 
> 113|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L113]
> {code:java}
> if (split.getStartingOffset() >= 0) {
> offsetsMap.put(
> split.getTopicPartition(),
> new OffsetAndMetadata(split.getStartingOffset()));
> }{code}
> If the starting offset is smaller than 0 then this would leave the offsetMap 
> created in step 1 empty. We can see from the logs that the startingOffset is 
> -3 when the splits are added to the reader.
>  
> {code:java}
> Adding split(s) to reader: [[Partition: source-events-20, StartingOffset: 1, 
> StoppingOffset: -9223372036854775808], [Partition: source-events-44, 
> StartingOffset: -3, StoppingOffset: -9223372036854775808], [Partition: 
> source-events-12, StartingOffset: -3, StoppingOffset: -9223372036854775808], 
> [Partition: source-events-36, StartingOffset: 1, StoppingOffset: 
> -9223372036854775808], [Partition: source-events-4, StartingOffset: -3, 
> StoppingOffset: -9223372036854775808], [Partition: source-events-28, 
> StartingOffset: -3, StoppingOffset: 

[jira] [Commented] (FLINK-33231) Memory leak in KafkaSourceReader if no data in consumed topic

2023-10-10 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773739#comment-17773739
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-33231:
-

Making this a blocker for the upcoming Kafka connector releases.

> Memory leak in KafkaSourceReader if no data in consumed topic
> -
>
> Key: FLINK-33231
> URL: https://issues.apache.org/jira/browse/FLINK-33231
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.17.1
>Reporter: Lauri Suurväli
>Priority: Blocker
> Fix For: kafka-3.0.1, kafka-3.1.0
>
> Attachments: Screenshot 2023-10-09 at 13.02.34.png, Screenshot 
> 2023-10-09 at 14.13.43.png, Screenshot 2023-10-10 at 12.49.37.png
>
>
> *Problem description*
> Our Flink streaming job TaskManager heap gets full when the job has nothing 
> to consume and process.
> It's a simple streaming job with KafkaSource -> ProcessFunction -> KafkaSink. 
> When there are no messages in the source topic the TaskManager heap usage 
> starts increasing until the job exits after receiving a SIGTERM signal. We 
> are running the job on AWS EMR with YARN.
> The problems with the TaskManager heap usage do not occur when there is data 
> to process. It's also worth noting that sending a single message to the 
> source topic of a streaming job that has been sitting idle and suffers from 
> the memory leak will cause the heap to be cleared. However it does not 
> resolve the problem since the heap usage will start increasing immediately 
> after processing the message.
> !Screenshot 2023-10-10 at 12.49.37.png!
> TaskManager heap used percentage is calculated by 
>  
> {code:java}
> flink.taskmanager.Status.JVM.Memory.Heap.Used * 100 / 
> flink.taskmanager.Status.JVM.Memory.Heap.Max{code}
>  
>  
>  I was able to take heap dumps of the TaskManager processes during a high 
> heap usage percentage. Heap dump analysis detected 912,355 instances of 
> java.util.HashMap empty collections retaining >= 43,793,040 bytes.
> !Screenshot 2023-10-09 at 14.13.43.png!
> The retained heap seemed to be located at:
>  
> {code:java}
> org.apache.flink.connector.kafka.source.reader.KafkaSourceReader#offsetsToCommit{code}
>  
> !Screenshot 2023-10-09 at 13.02.34.png!
>  
> *Possible hints:*
> An empty HashMap is added during the snapshotState method to offsetsToCommit 
> map if it does not already exist for the given checkpoint. [KafkaSourceReader 
> line 
> 107|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L107]
>  
> {code:java}
> Map<TopicPartition, OffsetAndMetadata> offsetsMap =
> offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>()); 
> {code}
>  
> If the startingOffset for the given split is >= 0 then a new entry would be 
> added to the map from the previous step. [KafkaSourceReader line 
> 113|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L113]
> {code:java}
> if (split.getStartingOffset() >= 0) {
> offsetsMap.put(
> split.getTopicPartition(),
> new OffsetAndMetadata(split.getStartingOffset()));
> }{code}
> If the starting offset is smaller than 0 then this would leave the offsetMap 
> created in step 1 empty. We can see from the logs that the startingOffset is 
> -3 when the splits are added to the reader.
>  
> {code:java}
> Adding split(s) to reader: [[Partition: source-events-20, StartingOffset: 1, 
> StoppingOffset: -9223372036854775808], [Partition: source-events-44, 
> StartingOffset: -3, StoppingOffset: -9223372036854775808], [Partition: 
> source-events-12, StartingOffset: -3, StoppingOffset: -9223372036854775808], 
> [Partition: source-events-36, StartingOffset: 1, StoppingOffset: 
> -9223372036854775808], [Partition: source-events-4, StartingOffset: -3, 
> StoppingOffset: -9223372036854775808], [Partition: source-events-28, 
> StartingOffset: -3, StoppingOffset: -9223372036854775808]]{code}
>  
>  
> The offsetsToCommit map is cleaned from entries once they have been committed 
> to Kafka which happens during the callback function that is passed to the 
> KafkaSourceFetcherManager.commitOffsets method in 
> KafkaSourceReader.notifyCheckpointComplete method.
> However if the committedPartitions is empty for the given checkpoint, then 
> the KafkaSourceFetcherManager.commitOffsets method returns.  
> [KafkaSourceFetcherManager line 
> 

[jira] [Updated] (FLINK-33231) Memory leak in KafkaSourceReader if no data in consumed topic

2023-10-10 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-33231:

Fix Version/s: kafka-3.0.1
   kafka-3.1.0

> Memory leak in KafkaSourceReader if no data in consumed topic
> -
>
> Key: FLINK-33231
> URL: https://issues.apache.org/jira/browse/FLINK-33231
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.17.1
>Reporter: Lauri Suurväli
>Priority: Blocker
> Fix For: kafka-3.0.1, kafka-3.1.0
>
> Attachments: Screenshot 2023-10-09 at 13.02.34.png, Screenshot 
> 2023-10-09 at 14.13.43.png, Screenshot 2023-10-10 at 12.49.37.png
>
>
> *Problem description*
> Our Flink streaming job TaskManager heap gets full when the job has nothing 
> to consume and process.
> It's a simple streaming job with KafkaSource -> ProcessFunction -> KafkaSink. 
> When there are no messages in the source topic the TaskManager heap usage 
> starts increasing until the job exits after receiving a SIGTERM signal. We 
> are running the job on AWS EMR with YARN.
> The problems with the TaskManager heap usage do not occur when there is data 
> to process. It's also worth noting that sending a single message to the 
> source topic of a streaming job that has been sitting idle and suffers from 
> the memory leak will cause the heap to be cleared. However it does not 
> resolve the problem since the heap usage will start increasing immediately 
> after processing the message.
> !Screenshot 2023-10-10 at 12.49.37.png!
> TaskManager heap used percentage is calculated by 
>  
> {code:java}
> flink.taskmanager.Status.JVM.Memory.Heap.Used * 100 / 
> flink.taskmanager.Status.JVM.Memory.Heap.Max{code}
>  
>  
>  I was able to take heap dumps of the TaskManager processes during a high 
> heap usage percentage. Heap dump analysis detected 912,355 instances of 
> java.util.HashMap empty collections retaining >= 43,793,040 bytes.
> !Screenshot 2023-10-09 at 14.13.43.png!
> The retained heap seemed to be located at:
>  
> {code:java}
> org.apache.flink.connector.kafka.source.reader.KafkaSourceReader#offsetsToCommit{code}
>  
> !Screenshot 2023-10-09 at 13.02.34.png!
>  
> *Possible hints:*
> An empty HashMap is added during the snapshotState method to offsetsToCommit 
> map if it does not already exist for the given checkpoint. [KafkaSourceReader 
> line 
> 107|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L107]
>  
> {code:java}
> Map<TopicPartition, OffsetAndMetadata> offsetsMap =
> offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>()); 
> {code}
>  
> If the startingOffset for the given split is >= 0 then a new entry would be 
> added to the map from the previous step. [KafkaSourceReader line 
> 113|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L113]
> {code:java}
> if (split.getStartingOffset() >= 0) {
> offsetsMap.put(
> split.getTopicPartition(),
> new OffsetAndMetadata(split.getStartingOffset()));
> }{code}
> If the starting offset is smaller than 0 then this would leave the offsetMap 
> created in step 1 empty. We can see from the logs that the startingOffset is 
> -3 when the splits are added to the reader.
>  
> {code:java}
> Adding split(s) to reader: [[Partition: source-events-20, StartingOffset: 1, 
> StoppingOffset: -9223372036854775808], [Partition: source-events-44, 
> StartingOffset: -3, StoppingOffset: -9223372036854775808], [Partition: 
> source-events-12, StartingOffset: -3, StoppingOffset: -9223372036854775808], 
> [Partition: source-events-36, StartingOffset: 1, StoppingOffset: 
> -9223372036854775808], [Partition: source-events-4, StartingOffset: -3, 
> StoppingOffset: -9223372036854775808], [Partition: source-events-28, 
> StartingOffset: -3, StoppingOffset: -9223372036854775808]]{code}
>  
>  
> The offsetsToCommit map is cleaned from entries once they have been committed 
> to Kafka which happens during the callback function that is passed to the 
> KafkaSourceFetcherManager.commitOffsets method in 
> KafkaSourceReader.notifyCheckpointComplete method.
> However if the committedPartitions is empty for the given checkpoint, then 
> the KafkaSourceFetcherManager.commitOffsets method returns.  
> [KafkaSourceFetcherManager line 
> 78|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/fetcher/KafkaSourceFetcherManager.java#L78]
> 

[jira] [Updated] (FLINK-33231) Memory leak in KafkaSourceReader if no data in consumed topic

2023-10-10 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-33231:

Priority: Blocker  (was: Major)

> Memory leak in KafkaSourceReader if no data in consumed topic
> -
>
> Key: FLINK-33231
> URL: https://issues.apache.org/jira/browse/FLINK-33231
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.17.1
>Reporter: Lauri Suurväli
>Priority: Blocker
> Attachments: Screenshot 2023-10-09 at 13.02.34.png, Screenshot 
> 2023-10-09 at 14.13.43.png, Screenshot 2023-10-10 at 12.49.37.png
>
>
> *Problem description*
> Our Flink streaming job TaskManager heap gets full when the job has nothing 
> to consume and process.
> It's a simple streaming job with KafkaSource -> ProcessFunction -> KafkaSink. 
> When there are no messages in the source topic the TaskManager heap usage 
> starts increasing until the job exits after receiving a SIGTERM signal. We 
> are running the job on AWS EMR with YARN.
> The problems with the TaskManager heap usage do not occur when there is data 
> to process. It's also worth noting that sending a single message to the 
> source topic of a streaming job that has been sitting idle and suffers from 
> the memory leak will cause the heap to be cleared. However it does not 
> resolve the problem since the heap usage will start increasing immediately 
> after processing the message.
> !Screenshot 2023-10-10 at 12.49.37.png!
> TaskManager heap used percentage is calculated by 
>  
> {code:java}
> flink.taskmanager.Status.JVM.Memory.Heap.Used * 100 / 
> flink.taskmanager.Status.JVM.Memory.Heap.Max{code}
>  
>  
>  I was able to take heap dumps of the TaskManager processes during a high 
> heap usage percentage. Heap dump analysis detected 912,355 instances of 
> java.util.HashMap empty collections retaining >= 43,793,040 bytes.
> !Screenshot 2023-10-09 at 14.13.43.png!
> The retained heap seemed to be located at:
>  
> {code:java}
> org.apache.flink.connector.kafka.source.reader.KafkaSourceReader#offsetsToCommit{code}
>  
> !Screenshot 2023-10-09 at 13.02.34.png!
>  
> *Possible hints:*
> An empty HashMap is added during the snapshotState method to offsetsToCommit 
> map if it does not already exist for the given checkpoint. [KafkaSourceReader 
> line 
> 107|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L107]
>  
> {code:java}
> Map<TopicPartition, OffsetAndMetadata> offsetsMap =
> offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>()); 
> {code}
>  
> If the startingOffset for the given split is >= 0 then a new entry would be 
> added to the map from the previous step. [KafkaSourceReader line 
> 113|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L113]
> {code:java}
> if (split.getStartingOffset() >= 0) {
> offsetsMap.put(
> split.getTopicPartition(),
> new OffsetAndMetadata(split.getStartingOffset()));
> }{code}
> If the starting offset is smaller than 0 then this would leave the offsetMap 
> created in step 1 empty. We can see from the logs that the startingOffset is 
> -3 when the splits are added to the reader.
>  
> {code:java}
> Adding split(s) to reader: [[Partition: source-events-20, StartingOffset: 1, 
> StoppingOffset: -9223372036854775808], [Partition: source-events-44, 
> StartingOffset: -3, StoppingOffset: -9223372036854775808], [Partition: 
> source-events-12, StartingOffset: -3, StoppingOffset: -9223372036854775808], 
> [Partition: source-events-36, StartingOffset: 1, StoppingOffset: 
> -9223372036854775808], [Partition: source-events-4, StartingOffset: -3, 
> StoppingOffset: -9223372036854775808], [Partition: source-events-28, 
> StartingOffset: -3, StoppingOffset: -9223372036854775808]]{code}
>  
>  
> The offsetsToCommit map is cleaned from entries once they have been committed 
> to Kafka which happens during the callback function that is passed to the 
> KafkaSourceFetcherManager.commitOffsets method in 
> KafkaSourceReader.notifyCheckpointComplete method.
> However if the committedPartitions is empty for the given checkpoint, then 
> the KafkaSourceFetcherManager.commitOffsets method returns.  
> [KafkaSourceFetcherManager line 
> 78|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/fetcher/KafkaSourceFetcherManager.java#L78]
> {code:java}
> if (offsetsToCommit.isEmpty()) {
> return;
> } {code}
> We can 

[jira] [Commented] (FLINK-33231) Memory leak in KafkaSourceReader if no data in consumed topic

2023-10-10 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773738#comment-17773738
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-33231:
-

[~lauri.suurvali] great debugging!

I think the fix is basically: in KafkaSourceFetcherManager#commitOffsets, if 
the provided offsetsToCommitMap is empty, the callback (which contains the 
logic for truncating the map) should still be invoked. Currently, it just 
returns without calling the callback at all. Code link: 
https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/fetcher/KafkaSourceFetcherManager.java#L78-L80
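
A tiny, self-contained illustration of this idea (hypothetical names, not the 
real {{KafkaSourceFetcherManager}} code): even when there is nothing to 
commit, the completion callback is still invoked so the caller can prune its 
per-checkpoint bookkeeping.

{code:java}
import java.util.Collections;
import java.util.Map;
import java.util.function.BiConsumer;

public class CommitOffsetsCallbackSketch {

    static void commitOffsets(
            Map<String, Long> offsetsToCommit,
            BiConsumer<Map<String, Long>, Throwable> callback) {
        if (offsetsToCommit.isEmpty()) {
            // Previously a plain `return;` here meant the callback never ran,
            // so the caller's empty per-checkpoint entries were never cleaned up.
            callback.accept(offsetsToCommit, null);
            return;
        }
        // ... issue the real commit to Kafka and invoke `callback` on completion ...
        callback.accept(offsetsToCommit, null);
    }

    public static void main(String[] args) {
        commitOffsets(
                Collections.emptyMap(),
                (offsets, error) -> System.out.println("cleanup ran for " + offsets.size() + " offsets"));
    }
}
{code}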

> Memory leak in KafkaSourceReader if no data in consumed topic
> -
>
> Key: FLINK-33231
> URL: https://issues.apache.org/jira/browse/FLINK-33231
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.17.1
>Reporter: Lauri Suurväli
>Priority: Major
> Attachments: Screenshot 2023-10-09 at 13.02.34.png, Screenshot 
> 2023-10-09 at 14.13.43.png, Screenshot 2023-10-10 at 12.49.37.png
>
>
> *Problem description*
> Our Flink streaming job TaskManager heap gets full when the job has nothing 
> to consume and process.
> It's a simple streaming job with KafkaSource -> ProcessFunction -> KafkaSink. 
> When there are no messages in the source topic the TaskManager heap usage 
> starts increasing until the job exits after receiving a SIGTERM signal. We 
> are running the job on AWS EMR with YARN.
> The problems with the TaskManager heap usage do not occur when there is data 
> to process. It's also worth noting that sending a single message to the 
> source topic of a streaming job that has been sitting idle and suffers from 
> the memory leak will cause the heap to be cleared. However it does not 
> resolve the problem since the heap usage will start increasing immediately 
> after processing the message.
> !Screenshot 2023-10-10 at 12.49.37.png!
> TaskManager heap used percentage is calculated by 
>  
> {code:java}
> flink.taskmanager.Status.JVM.Memory.Heap.Used * 100 / 
> flink.taskmanager.Status.JVM.Memory.Heap.Max{code}
>  
>  
>  I was able to take heap dumps of the TaskManager processes during a high 
> heap usage percentage. Heap dump analysis detected 912,355 instances of 
> java.util.HashMap empty collections retaining >= 43,793,040 bytes.
> !Screenshot 2023-10-09 at 14.13.43.png!
> The retained heap seemed to be located at:
>  
> {code:java}
> org.apache.flink.connector.kafka.source.reader.KafkaSourceReader#offsetsToCommit{code}
>  
> !Screenshot 2023-10-09 at 13.02.34.png!
>  
> *Possible hints:*
> An empty HashMap is added during the snapshotState method to offsetsToCommit 
> map if it does not already exist for the given checkpoint. [KafkaSourceReader 
> line 
> 107|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L107]
>  
> {code:java}
> Map<TopicPartition, OffsetAndMetadata> offsetsMap =
> offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>()); 
> {code}
>  
> If the startingOffset for the given split is >= 0 then a new entry would be 
> added to the map from the previous step. [KafkaSourceReader line 
> 113|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L113]
> {code:java}
> if (split.getStartingOffset() >= 0) {
> offsetsMap.put(
> split.getTopicPartition(),
> new OffsetAndMetadata(split.getStartingOffset()));
> }{code}
> If the starting offset is smaller than 0 then this would leave the offsetMap 
> created in step 1 empty. We can see from the logs that the startingOffset is 
> -3 when the splits are added to the reader.
>  
> {code:java}
> Adding split(s) to reader: [[Partition: source-events-20, StartingOffset: 1, 
> StoppingOffset: -9223372036854775808], [Partition: source-events-44, 
> StartingOffset: -3, StoppingOffset: -9223372036854775808], [Partition: 
> source-events-12, StartingOffset: -3, StoppingOffset: -9223372036854775808], 
> [Partition: source-events-36, StartingOffset: 1, StoppingOffset: 
> -9223372036854775808], [Partition: source-events-4, StartingOffset: -3, 
> StoppingOffset: -9223372036854775808], [Partition: source-events-28, 
> StartingOffset: -3, StoppingOffset: -9223372036854775808]]{code}
>  
>  
> The offsetsToCommit map is cleaned from entries once they have been committed 
> to Kafka which happens during the callback function that is passed to the 
> KafkaSourceFetcherManager.commitOffsets method in 
> 

[jira] [Commented] (FLINK-33104) Nightly run for Flink Kafka connector fails due to architecture tests failing

2023-10-05 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17772385#comment-17772385
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-33104:
-

[~echauchot] this seems to be the conclusion, correct?: 
[https://lists.apache.org/thread/pr0g812olzpgz21d9oodhc46db9jpxo3]

If that's the case, I'll do the same for the Kafka connector as well then. i.e.
 * for now only run archunit tests when building against 1.17.1
 * once 1.18 drops, update the violation store and switch to run archunit tests 
only against 1.18.0

> Nightly run for Flink Kafka connector fails due to architecture tests failing
> -
>
> Key: FLINK-33104
> URL: https://issues.apache.org/jira/browse/FLINK-33104
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.1.0
>Reporter: Martijn Visser
>Priority: Blocker
>
> {code:java}
> 2023-09-17T00:29:07.1675694Z [WARNING] Tests run: 18, Failures: 0, Errors: 0, 
> Skipped: 9, Time elapsed: 308.532 s - in 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerMigrationTest
> 2023-09-17T00:29:07.5171608Z [INFO] 
> 2023-09-17T00:29:07.5172360Z [INFO] Results:
> 2023-09-17T00:29:07.5172773Z [INFO] 
> 2023-09-17T00:29:07.5173139Z [ERROR] Failures: 
> 2023-09-17T00:29:07.5174181Z [ERROR]   Architecture Violation [Priority: 
> MEDIUM] - Rule 'ITCASE tests should use a MiniCluster resource or extension' 
> was violated (13 times):
> 2023-09-17T00:29:07.5176050Z 
> org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducerITCase does 
> not satisfy: only one of the following predicates match:
> 2023-09-17T00:29:07.5177452Z * reside in a package 
> 'org.apache.flink.runtime.*' and contain any fields that are static, final, 
> and of type InternalMiniClusterExtension and annotated with @RegisterExtension
> 2023-09-17T00:29:07.5179831Z * reside outside of package 
> 'org.apache.flink.runtime.*' and contain any fields that are static, final, 
> and of type MiniClusterExtension and annotated with @RegisterExtension or are 
> , and of type MiniClusterTestEnvironment and annotated with @TestEnv
> 2023-09-17T00:29:07.5181277Z * reside in a package 
> 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class 
> InternalMiniClusterExtension
> 2023-09-17T00:29:07.5182154Z * reside outside of package 
> 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class 
> MiniClusterExtension
> 2023-09-17T00:29:07.5182951Z  or contain any fields that are public, static, 
> and of type MiniClusterWithClientResource and final and annotated with 
> @ClassRule or contain any fields that is of type 
> MiniClusterWithClientResource and public and final and not static and 
> annotated with @Rule
> 2023-09-17T00:29:07.5183906Z 
> org.apache.flink.connector.kafka.sink.KafkaSinkITCase does not satisfy: only 
> one of the following predicates match:
> 2023-09-17T00:29:07.5184769Z * reside in a package 
> 'org.apache.flink.runtime.*' and contain any fields that are static, final, 
> and of type InternalMiniClusterExtension and annotated with @RegisterExtension
> 2023-09-17T00:29:07.5185812Z * reside outside of package 
> 'org.apache.flink.runtime.*' and contain any fields that are static, final, 
> and of type MiniClusterExtension and annotated with @RegisterExtension or are 
> , and of type MiniClusterTestEnvironment and annotated with @TestEnv
> 2023-09-17T00:29:07.5186880Z * reside in a package 
> 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class 
> InternalMiniClusterExtension
> 2023-09-17T00:29:07.5187929Z * reside outside of package 
> 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class 
> MiniClusterExtension
> 2023-09-17T00:29:07.5189073Z  or contain any fields that are public, static, 
> and of type MiniClusterWithClientResource and final and annotated with 
> @ClassRule or contain any fields that is of type 
> MiniClusterWithClientResource and public and final and not static and 
> annotated with @Rule
> 2023-09-17T00:29:07.5190076Z 
> org.apache.flink.connector.kafka.sink.KafkaTransactionLogITCase does not 
> satisfy: only one of the following predicates match:
> 2023-09-17T00:29:07.5190946Z * reside in a package 
> 'org.apache.flink.runtime.*' and contain any fields that are static, final, 
> and of type InternalMiniClusterExtension and annotated with @RegisterExtension
> 2023-09-17T00:29:07.5191983Z * reside outside of package 
> 'org.apache.flink.runtime.*' and contain any fields that are static, final, 
> and of type MiniClusterExtension and annotated with @RegisterExtension or are 
> , and of type MiniClusterTestEnvironment and annotated with @TestEnv
> 2023-09-17T00:29:07.5192845Z * reside in a package 
> 

[jira] [Comment Edited] (FLINK-33104) Nightly run for Flink Kafka connector fails due to architecture tests failing

2023-10-05 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17772385#comment-17772385
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-33104 at 10/5/23 9:10 PM:
--

[~echauchot] this seems to be the conclusion, correct?: 
[https://lists.apache.org/thread/pr0g812olzpgz21d9oodhc46db9jpxo3]

If that's the case, I'll do the same for the Kafka connector as well then. i.e.
 * for now only run archunit tests when building against 1.17.1
 * once 1.18 drops, update the violation store and switch to run archunit tests 
only when building against 1.18.0


was (Author: tzulitai):
[~echauchot] this seems to be the conclusion, correct?: 
[https://lists.apache.org/thread/pr0g812olzpgz21d9oodhc46db9jpxo3]

If that's the case, I'll do the same for the Kafka connector as well then. i.e.
 * for now only run archunit tests when building against 1.17.1
 * once 1.18 drops, update the violation store and switch to run archunit tests 
only against 1.18.0

> Nightly run for Flink Kafka connector fails due to architecture tests failing
> -
>
> Key: FLINK-33104
> URL: https://issues.apache.org/jira/browse/FLINK-33104
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.1.0
>Reporter: Martijn Visser
>Priority: Blocker
>
> {code:java}
> 2023-09-17T00:29:07.1675694Z [WARNING] Tests run: 18, Failures: 0, Errors: 0, 
> Skipped: 9, Time elapsed: 308.532 s - in 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerMigrationTest
> 2023-09-17T00:29:07.5171608Z [INFO] 
> 2023-09-17T00:29:07.5172360Z [INFO] Results:
> 2023-09-17T00:29:07.5172773Z [INFO] 
> 2023-09-17T00:29:07.5173139Z [ERROR] Failures: 
> 2023-09-17T00:29:07.5174181Z [ERROR]   Architecture Violation [Priority: 
> MEDIUM] - Rule 'ITCASE tests should use a MiniCluster resource or extension' 
> was violated (13 times):
> 2023-09-17T00:29:07.5176050Z 
> org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducerITCase does 
> not satisfy: only one of the following predicates match:
> 2023-09-17T00:29:07.5177452Z * reside in a package 
> 'org.apache.flink.runtime.*' and contain any fields that are static, final, 
> and of type InternalMiniClusterExtension and annotated with @RegisterExtension
> 2023-09-17T00:29:07.5179831Z * reside outside of package 
> 'org.apache.flink.runtime.*' and contain any fields that are static, final, 
> and of type MiniClusterExtension and annotated with @RegisterExtension or are 
> , and of type MiniClusterTestEnvironment and annotated with @TestEnv
> 2023-09-17T00:29:07.5181277Z * reside in a package 
> 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class 
> InternalMiniClusterExtension
> 2023-09-17T00:29:07.5182154Z * reside outside of package 
> 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class 
> MiniClusterExtension
> 2023-09-17T00:29:07.5182951Z  or contain any fields that are public, static, 
> and of type MiniClusterWithClientResource and final and annotated with 
> @ClassRule or contain any fields that is of type 
> MiniClusterWithClientResource and public and final and not static and 
> annotated with @Rule
> 2023-09-17T00:29:07.5183906Z 
> org.apache.flink.connector.kafka.sink.KafkaSinkITCase does not satisfy: only 
> one of the following predicates match:
> 2023-09-17T00:29:07.5184769Z * reside in a package 
> 'org.apache.flink.runtime.*' and contain any fields that are static, final, 
> and of type InternalMiniClusterExtension and annotated with @RegisterExtension
> 2023-09-17T00:29:07.5185812Z * reside outside of package 
> 'org.apache.flink.runtime.*' and contain any fields that are static, final, 
> and of type MiniClusterExtension and annotated with @RegisterExtension or are 
> , and of type MiniClusterTestEnvironment and annotated with @TestEnv
> 2023-09-17T00:29:07.5186880Z * reside in a package 
> 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class 
> InternalMiniClusterExtension
> 2023-09-17T00:29:07.5187929Z * reside outside of package 
> 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class 
> MiniClusterExtension
> 2023-09-17T00:29:07.5189073Z  or contain any fields that are public, static, 
> and of type MiniClusterWithClientResource and final and annotated with 
> @ClassRule or contain any fields that is of type 
> MiniClusterWithClientResource and public and final and not static and 
> annotated with @Rule
> 2023-09-17T00:29:07.5190076Z 
> org.apache.flink.connector.kafka.sink.KafkaTransactionLogITCase does not 
> satisfy: only one of the following predicates match:
> 2023-09-17T00:29:07.5190946Z * reside in a package 
> 'org.apache.flink.runtime.*' and contain any fields that are 

[jira] [Commented] (FLINK-28303) Kafka SQL Connector loses data when restoring from a savepoint with a topic with empty partitions

2023-10-04 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17772065#comment-17772065
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-28303:
-

Making this a blocker as it's confirmed to be a real data loss issue with the 
newer {{KafkaSource}}.

> Kafka SQL Connector loses data when restoring from a savepoint with a topic 
> with empty partitions
> -
>
> Key: FLINK-28303
> URL: https://issues.apache.org/jira/browse/FLINK-28303
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.14.4
>Reporter: Robert Metzger
>Assignee: tanjialiang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: kafka-3.0.1, kafka-3.1.0
>
>
> Steps to reproduce:
> - Set up a Kafka topic with 10 partitions
> - produce records 0-9 into the topic
> - take a savepoint and stop the job
> - produce records 10-19 into the topic
> - restore the job from the savepoint.
> The job will be missing usually 2-4 records from 10-19.
> My assumption is that if a partition never had data (which is likely with 10 
> partitions and 10 records), the savepoint will only contain offsets for 
> partitions with data. 
> While the job was offline (and we've written record 10-19 into the topic), 
> all partitions got filled. Now, when Kafka comes online again, it will use 
> the "latest" offset for those partitions, skipping some data.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-28303) Kafka SQL Connector loses data when restoring from a savepoint with a topic with empty partitions

2023-10-04 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28303?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai reassigned FLINK-28303:
---

Assignee: tanjialiang

> Kafka SQL Connector loses data when restoring from a savepoint with a topic 
> with empty partitions
> -
>
> Key: FLINK-28303
> URL: https://issues.apache.org/jira/browse/FLINK-28303
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.14.4
>Reporter: Robert Metzger
>Assignee: tanjialiang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: kafka-3.0.1, kafka-3.1.0
>
>
> Steps to reproduce:
> - Set up a Kafka topic with 10 partitions
> - produce records 0-9 into the topic
> - take a savepoint and stop the job
> - produce records 10-19 into the topic
> - restore the job from the savepoint.
> The job will usually be missing 2-4 of the records 10-19.
> My assumption is that if a partition never had data (which is likely with 10 
> partitions and 10 records), the savepoint will only contain offsets for 
> partitions with data. 
> While the job was offline (and we've written records 10-19 into the topic), 
> all partitions got filled. Now, when the Kafka source comes online again, it 
> will use the "latest" offset for those partitions, skipping some data.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28303) Kafka SQL Connector loses data when restoring from a savepoint with a topic with empty partitions

2023-10-04 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28303?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-28303:

Priority: Blocker  (was: Major)

> Kafka SQL Connector loses data when restoring from a savepoint with a topic 
> with empty partitions
> -
>
> Key: FLINK-28303
> URL: https://issues.apache.org/jira/browse/FLINK-28303
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.14.4
>Reporter: Robert Metzger
>Priority: Blocker
>  Labels: pull-request-available
>
> Steps to reproduce:
> - Set up a Kafka topic with 10 partitions
> - produce records 0-9 into the topic
> - take a savepoint and stop the job
> - produce records 10-19 into the topic
> - restore the job from the savepoint.
> The job will usually be missing 2-4 of the records 10-19.
> My assumption is that if a partition never had data (which is likely with 10 
> partitions and 10 records), the savepoint will only contain offsets for 
> partitions with data. 
> While the job was offline (and we've written records 10-19 into the topic), 
> all partitions got filled. Now, when the Kafka source comes online again, it 
> will use the "latest" offset for those partitions, skipping some data.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28303) Kafka SQL Connector loses data when restoring from a savepoint with a topic with empty partitions

2023-10-04 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28303?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-28303:

Fix Version/s: kafka-3.0.1
   kafka-3.1.0

> Kafka SQL Connector loses data when restoring from a savepoint with a topic 
> with empty partitions
> -
>
> Key: FLINK-28303
> URL: https://issues.apache.org/jira/browse/FLINK-28303
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.14.4
>Reporter: Robert Metzger
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: kafka-3.0.1, kafka-3.1.0
>
>
> Steps to reproduce:
> - Set up a Kafka topic with 10 partitions
> - produce records 0-9 into the topic
> - take a savepoint and stop the job
> - produce records 10-19 into the topic
> - restore the job from the savepoint.
> The job will usually be missing 2-4 of the records 10-19.
> My assumption is that if a partition never had data (which is likely with 10 
> partitions and 10 records), the savepoint will only contain offsets for 
> partitions with data. 
> While the job was offline (and we've written records 10-19 into the topic), 
> all partitions got filled. Now, when the Kafka source comes online again, it 
> will use the "latest" offset for those partitions, skipping some data.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-29398) Utilize Rack Awareness in Flink Consumer

2023-09-29 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai closed FLINK-29398.
---
Resolution: Fixed

> Utilize Rack Awareness in Flink Consumer
> 
>
> Key: FLINK-29398
> URL: https://issues.apache.org/jira/browse/FLINK-29398
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Reporter: Jeremy DeGroot
>Assignee: Jeremy DeGroot
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: kafka-3.1.0
>
>
> [KIP-36|https://cwiki.apache.org/confluence/display/KAFKA/KIP-36+Rack+aware+replica+assignment]
>  was implemented some time ago in Kafka. This allows brokers and consumers to 
> communicate about the rack (or AWS Availability Zone) they're located in. 
> Reading from a local broker can reduce bandwidth costs and improve latency 
> for consumers.
> Flink Kafka consumers currently cannot easily use rack awareness if they're 
> deployed across multiple racks or availability zones, because they have no 
> control over which rack the Task Manager they are assigned to runs in. 
> This improvement proposes that a Kafka consumer can be configured with a 
> callback or Future that runs when the consumer is set up on the Task Manager 
> and sets the appropriate rack value at runtime if one is provided. 
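
For context, a minimal sketch of what is possible today with a static rack id 
(all connection details are placeholders): the {{client.rack}} property is 
passed straight through to the underlying Kafka consumer. The limitation this 
ticket addresses is that such a value is fixed when the job graph is built, 
rather than resolved on the Task Manager that ends up running the consumer.

{code:java}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

public class StaticRackAwareSource {

    /** Builds a KafkaSource that advertises a fixed rack id to the brokers. */
    static KafkaSource<String> buildSource(String rackId) {
        return KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")   // placeholder
                .setTopics("test-topic")                 // placeholder
                .setGroupId("rack-aware-group")          // placeholder
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // Forwarded to the Kafka consumer config; lets brokers serve
                // fetches from a replica in the same rack where supported.
                .setProperty("client.rack", rackId)
                .build();
    }

    public static void main(String[] args) {
        // The rack id is resolved where the job graph is assembled, which is
        // exactly the limitation described above: it may not match the rack
        // of the Task Manager that eventually runs the source.
        KafkaSource<String> source =
                buildSource(System.getenv().getOrDefault("RACK_ID", "use1-az1"));
        System.out.println("Configured source: " + source);
    }
}
{code}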



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29398) Utilize Rack Awareness in Flink Consumer

2023-09-29 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-29398:

Fix Version/s: kafka-3.1.0

> Utilize Rack Awareness in Flink Consumer
> 
>
> Key: FLINK-29398
> URL: https://issues.apache.org/jira/browse/FLINK-29398
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Reporter: Jeremy DeGroot
>Assignee: Jeremy DeGroot
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: kafka-3.1.0
>
>
> [KIP-36|https://cwiki.apache.org/confluence/display/KAFKA/KIP-36+Rack+aware+replica+assignment]
>  was implemented some time ago in Kafka. This allows brokers and consumers to 
> communicate about the rack (or AWS Availability Zone) they're located in. 
> Reading from a local broker can reduce bandwidth costs and improve latency 
> for consumers.
> Flink Kafka consumers currently cannot easily use rack awareness if they're 
> deployed across multiple racks or availability zones, because they have no 
> control over which rack the Task Manager they are assigned to runs in. 
> This improvement proposes that a Kafka consumer can be configured with a 
> callback or Future that runs when the consumer is set up on the Task Manager 
> and sets the appropriate rack value at runtime if one is provided. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29398) Utilize Rack Awareness in Flink Consumer

2023-09-29 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17770582#comment-17770582
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-29398:
-

Thanks for driving this to the finish line [~jeremy.degroot].

Merged to apache/flink-connector-kafka:main with 
d89a082180232bb79e3c764228c4e7dbb9eb6b8b

> Utilize Rack Awareness in Flink Consumer
> 
>
> Key: FLINK-29398
> URL: https://issues.apache.org/jira/browse/FLINK-29398
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Reporter: Jeremy DeGroot
>Assignee: Jeremy DeGroot
>Priority: Major
>  Labels: pull-request-available, stale-assigned
>
> [KIP-36|https://cwiki.apache.org/confluence/display/KAFKA/KIP-36+Rack+aware+replica+assignment]
>  was implemented some time ago in Kafka. This allows brokers and consumers to 
> communicate about the rack (or AWS Availability Zone) they're located in. 
> Reading from a local broker can reduce bandwidth costs and improve latency 
> for consumers.
> Flink Kafka consumers currently cannot easily use rack awareness if they're 
> deployed across multiple racks or availability zones, because they have no 
> control over which rack the Task Manager they are assigned to runs in. 
> This improvement proposes that a Kafka consumer can be configured with a 
> callback or Future that runs when the consumer is set up on the Task Manager 
> and sets the appropriate rack value at runtime if one is provided. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30238) Unified Sink committer does not clean up state on final savepoint

2023-09-29 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17770492#comment-17770492
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-30238:
-

[~martijnvisser] I've replied to your SinkFunction deprecation / SinkV2 public 
thread on the ML, and touched on this issue as an attempt to revive the 
discussion. Let's continue the discussion there and let me know what you think.

> Unified Sink committer does not clean up state on final savepoint
> -
>
> Key: FLINK-30238
> URL: https://issues.apache.org/jira/browse/FLINK-30238
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.17.0, 1.15.3, 1.16.1
>Reporter: Fabian Paul
>Priority: Critical
> Attachments: Screenshot 2023-03-09 at 1.47.11 PM.png, image (8).png
>
>
> During stop-with-savepoint the committer only commits the pending 
> committables on notifyCheckpointComplete.
> This has several downsides.
>  * The last committableSummary has checkpoint id LONG.MAX and is never 
> cleared from the state, which means that stop-with-savepoint does not work 
> when the pipeline recovers from a savepoint. 
>  * While the committables are committed during stop-with-savepoint, they are 
> not forwarded to the post-commit topology, potentially losing data and 
> preventing open transactions from being closed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-32417) DynamicKafkaSource User Documentation

2023-09-19 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai reassigned FLINK-32417:
---

Assignee: Mason Chen

> DynamicKafkaSource User Documentation
> -
>
> Key: FLINK-32417
> URL: https://issues.apache.org/jira/browse/FLINK-32417
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka
>Reporter: Mason Chen
>Assignee: Mason Chen
>Priority: Major
> Fix For: kafka-3.1.0
>
>
> Add user documentation for DynamicKafkaSource



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-32416) Initial DynamicKafkaSource Implementation

2023-09-19 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32416?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai reassigned FLINK-32416:
---

Assignee: Mason Chen

> Initial DynamicKafkaSource Implementation 
> --
>
> Key: FLINK-32416
> URL: https://issues.apache.org/jira/browse/FLINK-32416
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka
>Reporter: Mason Chen
>Assignee: Mason Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: kafka-3.1.0
>
>
> Implementation that supports unbounded and bounded modes. With a default 
> implementation of KafkaMetadataService



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28758) FlinkKafkaConsumer fails to stop with savepoint

2023-09-15 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-28758:

Fix Version/s: kafka-3.0.1
   kafka-3.1.0

> FlinkKafkaConsumer fails to stop with savepoint 
> 
>
> Key: FLINK-28758
> URL: https://issues.apache.org/jira/browse/FLINK-28758
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.15.0, 1.17.0, 1.16.1
> Environment: Flink version: 1.15.0
> Deploy mode: K8s application mode. A local mini cluster also has this 
> problem.
> Kafka connector: uses the legacy Kafka SourceFunction, not the new API.
>Reporter: hjw
>Assignee: Piotr Nowojski
>Priority: Critical
>  Labels: pull-request-available
> Fix For: kafka-3.0.1, kafka-3.1.0
>
> Attachments: image-2022-10-13-19-47-56-635.png
>
>
> I post a stop-with-savepoint request to the Flink job through the REST API.
> An error happens when the Kafka connector closes.
> The job then enters the restarting state.
> Triggering a savepoint alone (without stopping the job) succeeds.
> {code:java}
> 13:33:42.857 [Kafka Fetcher for Source: nlp-kafka-source -> nlp-clean 
> (1/1)#0] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer 
> clientId=consumer-hjw-3, groupId=hjw] Kafka consumer has been closed
> 13:33:42.857 [Kafka Fetcher for Source: cpp-kafka-source -> cpp-clean 
> (1/1)#0] INFO org.apache.kafka.common.utils.AppInfoParser - App info 
> kafka.consumer for consumer-hjw-4 unregistered
> 13:33:42.857 [Kafka Fetcher for Source: cpp-kafka-source -> cpp-clean 
> (1/1)#0] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer 
> clientId=consumer-hjw-4, groupId=hjw] Kafka consumer has been closed
> 13:33:42.860 [Source: nlp-kafka-source -> nlp-clean (1/1)#0] DEBUG 
> org.apache.flink.streaming.runtime.tasks.StreamTask - Cleanup StreamTask 
> (operators closed: false, cancelled: false)
> 13:33:42.860 [jobmanager-io-thread-4] INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline 
> checkpoint 5 by task eeefcb27475446241861ad8db3f33144 of job 
> d6fed247feab1c0bcc1b0dcc2cfb4736 at 79edfa88-ccc3-4140-b0e1-7ce8a7f8669f @ 
> 127.0.0.1 (dataPort=-1).
> org.apache.flink.util.SerializedThrowable: Task name with subtask : Source: 
> nlp-kafka-source -> nlp-clean (1/1)#0 Failure reason: Task has failed.
>  at 
> org.apache.flink.runtime.taskmanager.Task.declineCheckpoint(Task.java:1388)
>  at 
> org.apache.flink.runtime.taskmanager.Task.lambda$triggerCheckpointBarrier$3(Task.java:1331)
>  at 
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
>  at 
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>  at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>  at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:343)
> Caused by: org.apache.flink.util.SerializedThrowable: 
> org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException
>  at 
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>  at 
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>  at 
> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943)
>  at 
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
>  ... 3 common frames omitted
> Caused by: org.apache.flink.util.SerializedThrowable: null
>  at 
> org.apache.flink.streaming.connectors.kafka.internals.Handover.close(Handover.java:177)
>  at 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.cancel(KafkaFetcher.java:164)
>  at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:945)
>  at 
> org.apache.flink.streaming.api.operators.StreamSource.stop(StreamSource.java:128)
>  at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.stopOperatorForStopWithSavepoint(SourceStreamTask.java:305)
>  at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.lambda$triggerStopWithSavepointAsync$1(SourceStreamTask.java:285)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
>  at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>  at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
>  at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
>  

[jira] [Comment Edited] (FLINK-33017) Nightly run for Flink Kafka connector fails

2023-09-12 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17764321#comment-17764321
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-33017 at 9/12/23 5:55 PM:
--

Let's only resolve this ticket once we have a green light on the nightly build:

Manually triggered one here -
[https://github.com/apache/flink-connector-kafka/actions/runs/6163109229]


was (Author: tzulitai):
Let's only resolve this ticket once we have a green light on the nightly build:
https://github.com/apache/flink-connector-kafka/actions/runs/6163109229

> Nightly run for Flink Kafka connector fails
> ---
>
> Key: FLINK-33017
> URL: https://issues.apache.org/jira/browse/FLINK-33017
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.1.0
>Reporter: Martijn Visser
>Assignee: Alex Sorokoumov
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: kafka-3.0.1, kafka-3.1.0
>
>
> https://github.com/apache/flink-connector-kafka/actions/runs/6061283403/job/16446313350#step:13:54462
> {code:java}
> 2023-09-03T00:29:28.8942615Z [ERROR] Errors: 
> 2023-09-03T00:29:28.8942799Z [ERROR] 
> FlinkKafkaConsumerBaseMigrationTest.testRestore
> 2023-09-03T00:29:28.8943079Z [ERROR]   Run 1: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8943342Z [ERROR]   Run 2: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8943604Z [ERROR]   Run 3: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8943903Z [ERROR]   Run 4: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8944164Z [ERROR]   Run 5: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8944419Z [ERROR]   Run 6: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8944714Z [ERROR]   Run 7: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8944970Z [ERROR]   Run 8: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8945221Z [ERROR]   Run 9: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8945294Z [INFO] 
> 2023-09-03T00:29:28.8945577Z [ERROR] 
> FlinkKafkaConsumerBaseMigrationTest.testRestoreFromEmptyStateNoPartitions
> 2023-09-03T00:29:28.8945769Z [ERROR]   Run 1: 
> org/apache/flink/shaded/guava31/com/google/common/collect/ImmutableList
> 2023-09-03T00:29:28.8946019Z [ERROR]   Run 2: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8946266Z [ERROR]   Run 3: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8946525Z [ERROR]   Run 4: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8946778Z [ERROR]   Run 5: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8947027Z [ERROR]   Run 6: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8947269Z [ERROR]   Run 7: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8947516Z [ERROR]   Run 8: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8947765Z [ERROR]   Run 9: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8947834Z [INFO] 
> 2023-09-03T00:29:28.8948117Z [ERROR] 
> FlinkKafkaConsumerBaseMigrationTest.testRestoreFromEmptyStateWithPartitions
> 2023-09-03T00:29:28.8948407Z [ERROR]   Run 1: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8948660Z [ERROR]   Run 2: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8948949Z [ERROR]   Run 3: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8949192Z [ERROR]   Run 4: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8949433Z [ERROR]   Run 5: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils

[jira] [Comment Edited] (FLINK-33017) Nightly run for Flink Kafka connector fails

2023-09-12 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17764321#comment-17764321
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-33017 at 9/12/23 5:55 PM:
--

Let's only resolve this ticket once we have a green light on the nightly build:
https://github.com/apache/flink-connector-kafka/actions/runs/6163109229


was (Author: tzulitai):
Let's only resolve this ticket once we have a green light on the nightly builds.

> Nightly run for Flink Kafka connector fails
> ---
>
> Key: FLINK-33017
> URL: https://issues.apache.org/jira/browse/FLINK-33017
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.1.0
>Reporter: Martijn Visser
>Assignee: Alex Sorokoumov
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: kafka-3.0.1, kafka-3.1.0
>
>
> https://github.com/apache/flink-connector-kafka/actions/runs/6061283403/job/16446313350#step:13:54462
> {code:java}
> 2023-09-03T00:29:28.8942615Z [ERROR] Errors: 
> 2023-09-03T00:29:28.8942799Z [ERROR] 
> FlinkKafkaConsumerBaseMigrationTest.testRestore
> 2023-09-03T00:29:28.8943079Z [ERROR]   Run 1: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8943342Z [ERROR]   Run 2: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8943604Z [ERROR]   Run 3: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8943903Z [ERROR]   Run 4: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8944164Z [ERROR]   Run 5: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8944419Z [ERROR]   Run 6: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8944714Z [ERROR]   Run 7: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8944970Z [ERROR]   Run 8: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8945221Z [ERROR]   Run 9: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8945294Z [INFO] 
> 2023-09-03T00:29:28.8945577Z [ERROR] 
> FlinkKafkaConsumerBaseMigrationTest.testRestoreFromEmptyStateNoPartitions
> 2023-09-03T00:29:28.8945769Z [ERROR]   Run 1: 
> org/apache/flink/shaded/guava31/com/google/common/collect/ImmutableList
> 2023-09-03T00:29:28.8946019Z [ERROR]   Run 2: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8946266Z [ERROR]   Run 3: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8946525Z [ERROR]   Run 4: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8946778Z [ERROR]   Run 5: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8947027Z [ERROR]   Run 6: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8947269Z [ERROR]   Run 7: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8947516Z [ERROR]   Run 8: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8947765Z [ERROR]   Run 9: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8947834Z [INFO] 
> 2023-09-03T00:29:28.8948117Z [ERROR] 
> FlinkKafkaConsumerBaseMigrationTest.testRestoreFromEmptyStateWithPartitions
> 2023-09-03T00:29:28.8948407Z [ERROR]   Run 1: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8948660Z [ERROR]   Run 2: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8948949Z [ERROR]   Run 3: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8949192Z [ERROR]   Run 4: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8949433Z [ERROR]   Run 5: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8949673Z [ERROR]   Run 6: Could not initialize class 
> 

[jira] [Commented] (FLINK-33017) Nightly run for Flink Kafka connector fails

2023-09-12 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17764321#comment-17764321
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-33017:
-

Let's only resolve this ticket once we have a green light on the nightly builds.

> Nightly run for Flink Kafka connector fails
> ---
>
> Key: FLINK-33017
> URL: https://issues.apache.org/jira/browse/FLINK-33017
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.1.0
>Reporter: Martijn Visser
>Assignee: Alex Sorokoumov
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: kafka-3.0.1, kafka-3.1.0
>
>
> https://github.com/apache/flink-connector-kafka/actions/runs/6061283403/job/16446313350#step:13:54462
> {code:java}
> 2023-09-03T00:29:28.8942615Z [ERROR] Errors: 
> 2023-09-03T00:29:28.8942799Z [ERROR] 
> FlinkKafkaConsumerBaseMigrationTest.testRestore
> 2023-09-03T00:29:28.8943079Z [ERROR]   Run 1: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8943342Z [ERROR]   Run 2: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8943604Z [ERROR]   Run 3: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8943903Z [ERROR]   Run 4: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8944164Z [ERROR]   Run 5: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8944419Z [ERROR]   Run 6: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8944714Z [ERROR]   Run 7: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8944970Z [ERROR]   Run 8: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8945221Z [ERROR]   Run 9: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8945294Z [INFO] 
> 2023-09-03T00:29:28.8945577Z [ERROR] 
> FlinkKafkaConsumerBaseMigrationTest.testRestoreFromEmptyStateNoPartitions
> 2023-09-03T00:29:28.8945769Z [ERROR]   Run 1: 
> org/apache/flink/shaded/guava31/com/google/common/collect/ImmutableList
> 2023-09-03T00:29:28.8946019Z [ERROR]   Run 2: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8946266Z [ERROR]   Run 3: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8946525Z [ERROR]   Run 4: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8946778Z [ERROR]   Run 5: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8947027Z [ERROR]   Run 6: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8947269Z [ERROR]   Run 7: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8947516Z [ERROR]   Run 8: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8947765Z [ERROR]   Run 9: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8947834Z [INFO] 
> 2023-09-03T00:29:28.8948117Z [ERROR] 
> FlinkKafkaConsumerBaseMigrationTest.testRestoreFromEmptyStateWithPartitions
> 2023-09-03T00:29:28.8948407Z [ERROR]   Run 1: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8948660Z [ERROR]   Run 2: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8948949Z [ERROR]   Run 3: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8949192Z [ERROR]   Run 4: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8949433Z [ERROR]   Run 5: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8949673Z [ERROR]   Run 6: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8949913Z [ERROR]   Run 7: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8950155Z [ERROR]   Run 8: 

[jira] [Commented] (FLINK-33017) Nightly run for Flink Kafka connector fails

2023-09-12 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17764320#comment-17764320
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-33017:
-

Fixed via:
apache/flink-connector-kafka:main - 818d1fdedaad63631eab5d44ec90c748cfcf299f
apache/flink-connector-kafka:v3.0 - a81cbeb62b1f12a3f80ff6f2380047a2d7400194

> Nightly run for Flink Kafka connector fails
> ---
>
> Key: FLINK-33017
> URL: https://issues.apache.org/jira/browse/FLINK-33017
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.1.0
>Reporter: Martijn Visser
>Assignee: Alex Sorokoumov
>Priority: Blocker
>  Labels: pull-request-available
>
> https://github.com/apache/flink-connector-kafka/actions/runs/6061283403/job/16446313350#step:13:54462
> {code:java}
> 2023-09-03T00:29:28.8942615Z [ERROR] Errors: 
> 2023-09-03T00:29:28.8942799Z [ERROR] 
> FlinkKafkaConsumerBaseMigrationTest.testRestore
> 2023-09-03T00:29:28.8943079Z [ERROR]   Run 1: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8943342Z [ERROR]   Run 2: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8943604Z [ERROR]   Run 3: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8943903Z [ERROR]   Run 4: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8944164Z [ERROR]   Run 5: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8944419Z [ERROR]   Run 6: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8944714Z [ERROR]   Run 7: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8944970Z [ERROR]   Run 8: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8945221Z [ERROR]   Run 9: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8945294Z [INFO] 
> 2023-09-03T00:29:28.8945577Z [ERROR] 
> FlinkKafkaConsumerBaseMigrationTest.testRestoreFromEmptyStateNoPartitions
> 2023-09-03T00:29:28.8945769Z [ERROR]   Run 1: 
> org/apache/flink/shaded/guava31/com/google/common/collect/ImmutableList
> 2023-09-03T00:29:28.8946019Z [ERROR]   Run 2: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8946266Z [ERROR]   Run 3: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8946525Z [ERROR]   Run 4: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8946778Z [ERROR]   Run 5: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8947027Z [ERROR]   Run 6: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8947269Z [ERROR]   Run 7: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8947516Z [ERROR]   Run 8: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8947765Z [ERROR]   Run 9: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8947834Z [INFO] 
> 2023-09-03T00:29:28.8948117Z [ERROR] 
> FlinkKafkaConsumerBaseMigrationTest.testRestoreFromEmptyStateWithPartitions
> 2023-09-03T00:29:28.8948407Z [ERROR]   Run 1: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8948660Z [ERROR]   Run 2: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8948949Z [ERROR]   Run 3: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8949192Z [ERROR]   Run 4: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8949433Z [ERROR]   Run 5: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8949673Z [ERROR]   Run 6: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8949913Z [ERROR]   Run 7: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 

[jira] [Updated] (FLINK-33017) Nightly run for Flink Kafka connector fails

2023-09-12 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-33017:

Fix Version/s: kafka-3.0.1
   kafka-3.1.0

> Nightly run for Flink Kafka connector fails
> ---
>
> Key: FLINK-33017
> URL: https://issues.apache.org/jira/browse/FLINK-33017
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.1.0
>Reporter: Martijn Visser
>Assignee: Alex Sorokoumov
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: kafka-3.0.1, kafka-3.1.0
>
>
> https://github.com/apache/flink-connector-kafka/actions/runs/6061283403/job/16446313350#step:13:54462
> {code:java}
> 2023-09-03T00:29:28.8942615Z [ERROR] Errors: 
> 2023-09-03T00:29:28.8942799Z [ERROR] 
> FlinkKafkaConsumerBaseMigrationTest.testRestore
> 2023-09-03T00:29:28.8943079Z [ERROR]   Run 1: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8943342Z [ERROR]   Run 2: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8943604Z [ERROR]   Run 3: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8943903Z [ERROR]   Run 4: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8944164Z [ERROR]   Run 5: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8944419Z [ERROR]   Run 6: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8944714Z [ERROR]   Run 7: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8944970Z [ERROR]   Run 8: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8945221Z [ERROR]   Run 9: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8945294Z [INFO] 
> 2023-09-03T00:29:28.8945577Z [ERROR] 
> FlinkKafkaConsumerBaseMigrationTest.testRestoreFromEmptyStateNoPartitions
> 2023-09-03T00:29:28.8945769Z [ERROR]   Run 1: 
> org/apache/flink/shaded/guava31/com/google/common/collect/ImmutableList
> 2023-09-03T00:29:28.8946019Z [ERROR]   Run 2: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8946266Z [ERROR]   Run 3: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8946525Z [ERROR]   Run 4: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8946778Z [ERROR]   Run 5: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8947027Z [ERROR]   Run 6: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8947269Z [ERROR]   Run 7: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8947516Z [ERROR]   Run 8: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8947765Z [ERROR]   Run 9: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8947834Z [INFO] 
> 2023-09-03T00:29:28.8948117Z [ERROR] 
> FlinkKafkaConsumerBaseMigrationTest.testRestoreFromEmptyStateWithPartitions
> 2023-09-03T00:29:28.8948407Z [ERROR]   Run 1: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8948660Z [ERROR]   Run 2: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8948949Z [ERROR]   Run 3: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8949192Z [ERROR]   Run 4: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8949433Z [ERROR]   Run 5: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8949673Z [ERROR]   Run 6: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8949913Z [ERROR]   Run 7: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8950155Z [ERROR]   Run 8: Could not initialize class 
> 

[jira] [Assigned] (FLINK-33017) Nightly run for Flink Kafka connector fails

2023-09-12 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai reassigned FLINK-33017:
---

Assignee: Alex Sorokoumov

> Nightly run for Flink Kafka connector fails
> ---
>
> Key: FLINK-33017
> URL: https://issues.apache.org/jira/browse/FLINK-33017
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.1.0
>Reporter: Martijn Visser
>Assignee: Alex Sorokoumov
>Priority: Blocker
>  Labels: pull-request-available
>
> https://github.com/apache/flink-connector-kafka/actions/runs/6061283403/job/16446313350#step:13:54462
> {code:java}
> 2023-09-03T00:29:28.8942615Z [ERROR] Errors: 
> 2023-09-03T00:29:28.8942799Z [ERROR] 
> FlinkKafkaConsumerBaseMigrationTest.testRestore
> 2023-09-03T00:29:28.8943079Z [ERROR]   Run 1: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8943342Z [ERROR]   Run 2: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8943604Z [ERROR]   Run 3: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8943903Z [ERROR]   Run 4: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8944164Z [ERROR]   Run 5: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8944419Z [ERROR]   Run 6: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8944714Z [ERROR]   Run 7: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8944970Z [ERROR]   Run 8: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8945221Z [ERROR]   Run 9: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8945294Z [INFO] 
> 2023-09-03T00:29:28.8945577Z [ERROR] 
> FlinkKafkaConsumerBaseMigrationTest.testRestoreFromEmptyStateNoPartitions
> 2023-09-03T00:29:28.8945769Z [ERROR]   Run 1: 
> org/apache/flink/shaded/guava31/com/google/common/collect/ImmutableList
> 2023-09-03T00:29:28.8946019Z [ERROR]   Run 2: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8946266Z [ERROR]   Run 3: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8946525Z [ERROR]   Run 4: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8946778Z [ERROR]   Run 5: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8947027Z [ERROR]   Run 6: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8947269Z [ERROR]   Run 7: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8947516Z [ERROR]   Run 8: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8947765Z [ERROR]   Run 9: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8947834Z [INFO] 
> 2023-09-03T00:29:28.8948117Z [ERROR] 
> FlinkKafkaConsumerBaseMigrationTest.testRestoreFromEmptyStateWithPartitions
> 2023-09-03T00:29:28.8948407Z [ERROR]   Run 1: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8948660Z [ERROR]   Run 2: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8948949Z [ERROR]   Run 3: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8949192Z [ERROR]   Run 4: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8949433Z [ERROR]   Run 5: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8949673Z [ERROR]   Run 6: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8949913Z [ERROR]   Run 7: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8950155Z [ERROR]   Run 8: Could not initialize class 
> org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils
> 2023-09-03T00:29:28.8950518Z [ERROR]   

[jira] [Comment Edited] (FLINK-30238) Unified Sink committer does not clean up state on final savepoint

2023-08-31 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17761051#comment-17761051
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-30238 at 8/31/23 5:40 PM:
--

Hi all, I'd like to move this ticket forward and make a call on whether things 
are working as expected or there's an actual bug.

First of all, let's narrow down the scope of this ticket to address only the 
following:
{{stop-with-savepoint}} stores a last CommittableSummary with a LONG_MAX 
checkpoint ID that is persisted in savepoints, which prevents a future 
{{stop-with-savepoint}} from succeeding after restore, because 
CommittableSummaries should always have different checkpoint IDs.

The separate issue with post-commit topologies not receiving committed 
committables can be addressed separately.

Summarizing the facts we have so far, looking at the comments above and 
discussing this offline with [~martijnvisser] [~Ge]:
 * If a LONG_MAX checkpoint ID special marker is propagated, it means 
{{endOfInput}} was called and it is expected behavior that the job will not 
function properly if restored from the generated savepoint. {{endOfInput}} 
should only ever be called when 1) reaching bounded end of input, or 2) 
stop-with-savepoint with the --drain option.
 * The reason why the Kafka ITCase referenced by [~fpaul] has {{endOfInput}} 
being called is because of the setup {{{}advanceTimestamp = true{}}}, which 
translates equivalently to stop-with-savepoint with --drain.

With the above, can we conclude that:
 * {{stop-with-savepoint}} without draining works as expected, such that 
{{endOfInput}} is not being called and the LONG_MAX CommittableSummary is not 
being sent and stored as committables state.
 * {{stop-with-savepoint}} _with draining_ is also working as expected - jobs 
restored from these savepoints are not expected to function properly in the 
first place.

 

I'm looking to verify that {{stop-with-savepoint}} without draining indeed 
doesn't call the {{endOfInput}} method - if that's the case, then I think we 
can consider this case closed, and the only fix we need is to adjust the Kafka 
ITCase.

In parallel, I just want to jot these thoughts down to see if I'm missing 
anything obvious cc [~fpaul] [~gaoyunhaii].
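
To make the problematic interaction concrete, here is a deliberately simplified 
toy model in plain Java (these are not the real Flink sink classes), under the 
assumption stated above that committable summaries kept in state must carry 
distinct checkpoint ids and that a drained stop marks its summary with 
Long.MAX_VALUE:

{code:java}
import java.util.HashSet;
import java.util.Set;

/**
 * Toy model only: the committer keeps one committable summary per checkpoint
 * id and requires the ids to be distinct.
 */
public class CommitterStateModel {
    // End-of-input marker used for drained stop-with-savepoint / bounded input.
    static final long END_OF_INPUT = Long.MAX_VALUE;

    private final Set<Long> summaryCheckpointIds = new HashSet<>();

    void addSummary(long checkpointId) {
        if (!summaryCheckpointIds.add(checkpointId)) {
            throw new IllegalStateException(
                    "Duplicate committable summary for checkpoint id " + checkpointId);
        }
    }

    public static void main(String[] args) {
        CommitterStateModel state = new CommitterStateModel();

        state.addSummary(42L);           // regular checkpoint
        state.addSummary(END_OF_INPUT);  // drained stop-with-savepoint: endOfInput()

        // The END_OF_INPUT summary is never cleared from state. After restoring
        // from that savepoint, a second drained stop emits the same marker
        // again and the distinct-id invariant is violated.
        state.addSummary(END_OF_INPUT);  // throws IllegalStateException
    }
}
{code}

If the non-drained path indeed never emits the Long.MAX_VALUE summary, the 
failing step in this toy model simply never happens, which matches the 
"working as expected" reading above.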


was (Author: tzulitai):
Hi all, I'd like to move this ticket forward and make a call on whether things 
are working as expected or there's an actual bug.

First of all, let's narrow down the scope of this ticket to address only the 
following:
{{stop-with-savepoint}} stores a last CommittableSummary with a LONG_MAX 
checkpoint ID that is persisted in savepoints, which prevents a future 
{{stop-with-savepoint}} from succeeding after restore, because 
CommittableSummaries should always have different checkpoint IDs.

The separate issue with post-commit topologies not receiving committed 
committables can be addressed separately.

Summarizing the facts we have so far:
 * If a LONG_MAX checkpoint ID special marker is propagated, it means 
{{endOfInput}} was called and it is expected behavior that the job will not 
function properly if restored from the generated savepoint. {{endOfInput}} 
should only ever be called when 1) reaching bounded end of input, or 2) 
stop-with-savepoint with the --drain option.
 * The reason why the Kafka ITCase referenced by [~fpaul] has {{endOfInput}} 
being called is because of the setup {{{}advanceTimestamp = true{}}}, which 
translates equivalently to stop-with-savepoint with --drain.

With the above, can we conclude that:
 * {{stop-with-savepoint}} without draining works as expected, such that 
{{endOfInput}} is not being called and the LONG_MAX CommittableSummary is not 
being sent and stored as committables state.
 * {{stop-with-savepoint}} _with draining_ is also working as expected - jobs 
restored from these savepoints are not expected to function properly in the 
first place.

 

I'm looking to verify that {{stop-with-savepoint}} without draining indeed 
doesn't call the {{endOfInput}} method - if that's the case, then I think we 
can consider this case closed, and the only fix we need is to adjust the Kafka 
ITCase.

In parallel, I just want to jot these thoughts down to see if I'm missing 
anything obvious cc [~fpaul] [~gaoyunhaii].

> Unified Sink committer does not clean up state on final savepoint
> -
>
> Key: FLINK-30238
> URL: https://issues.apache.org/jira/browse/FLINK-30238
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.17.0, 1.15.3, 1.16.1
>Reporter: Fabian Paul
>Priority: Critical
> Attachments: Screenshot 2023-03-09 at 1.47.11 PM.png, image (8).png
>
>
> During stop-with-savepoint the committer only commits the pending 
> committables on notifyCheckpointComplete.
> 

[jira] [Comment Edited] (FLINK-30238) Unified Sink committer does not clean up state on final savepoint

2023-08-31 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17761051#comment-17761051
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-30238 at 8/31/23 5:39 PM:
--

Hi all, I'd like to move this ticket forward and make a call on whether things 
are working as expected or there's an actual bug.

First of all, let's narrow down the scope of this ticket to address only the 
following:
{{stop-with-savepoint}} stores a last CommittableSummary with a LONG_MAX 
checkpoint ID that is persisted in savepoints, which prevents a future 
{{stop-with-savepoint}} from succeeding after restore, because 
CommittableSummaries should always have different checkpoint IDs.

The separate issue with post-commit topologies not receiving committed 
committables can be addressed separately.

Summarizing the facts we have so far:
 * If a LONG_MAX checkpoint ID special marker is propagated, it means 
{{endOfInput}} was called and it is expected behavior that the job will not 
function properly if restored from the generated savepoint. {{endOfInput}} 
should only ever be called when 1) reaching bounded end of input, or 2) 
stop-with-savepoint with the --drain option.
 * The reason why the Kafka ITCase referenced by [~fpaul] has {{endOfInput}} 
being called is because of the setup {{{}advanceTimestamp = true{}}}, which 
translates equivalently to stop-with-savepoint with --drain.

With the above, can we conclude that:
 * {{stop-with-savepoint}} without draining works as expected, such that 
{{endOfInput}} is not being called and the LONG_MAX CommittableSummary is not 
being sent and stored as committables state.
 * {{stop-with-savepoint}} _with draining_ is also working as expected - jobs 
restored from these savepoints are not expected to function properly in the 
first place.

 

I'm looking to verify that {{stop-with-savepoint}} without draining indeed 
doesn't call the {{endOfInput}} method - if that's the case, then I think we 
can consider this case closed, and the only fix we need is to adjust the Kafka 
ITCase.

In parallel, I just want to jot these thoughts down to see if I'm missing 
anything obvious cc [~fpaul] [~gaoyunhaii].


was (Author: tzulitai):
Hi all, I'd like to move this ticket forward and make a call on whether things 
are working as expected or there's an actual bug.

First of all, let's narrow down the scope of this ticket to address only the 
following:
{{stop-with-savepoint}} stores a last CommittableSummary with a LONG_MAX 
checkpoint ID that is persisted in savepoints, which prevents a future 
{{stop-with-savepoint}} from succeeding after restore, because 
CommittableSummaries should always have different checkpoint IDs.

The separate issue with post-commit topologies not receiving committed 
committables can be addressed separately.

Summarizing the facts we have so far:
 * If a LONG_MAX checkpoint ID special marker is propagated, it means 
{{endOfInput}} was called and it is expected behavior that the job will not 
function properly if restored from the generated savepoint. {{endOfInput}} 
should only ever be called when 1) reaching bounded end of input, or 2) 
stop-with-savepoint with the --drain option.
 * The reason why the Kafka ITCase referenced by [~fpaul] has {{endOfInput}} 
being called is because of the setup {{{}advanceTimestamp = true{}}}, which 
translates equivalently to stop-with-savepoint with --drain.


With the above, can we conclude that:
 * {{stop-with-savepoint}} without draining works as expected, such that 
{{endOfInput}} is not being called and the LONG_MAX CommittableSummary is not 
being sent and stored as committables state.
 * {{stop-with-savepoint}} _with draining_ is also working as expected - jobs 
restored from these savepoints are not expected to function properly in the 
first place.

 

I'm looking to verify that {{stop-with-savepoint}} without draining indeed 
doesn't call the {{endOfInput}} method - if that's the case, then I think we 
can consider this case closed. In parallel, I just want to jot these thoughts 
down to see if I'm missing anything obvious cc [~fpaul] [~gaoyunhaii].

> Unified Sink committer does not clean up state on final savepoint
> -
>
> Key: FLINK-30238
> URL: https://issues.apache.org/jira/browse/FLINK-30238
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.17.0, 1.15.3, 1.16.1
>Reporter: Fabian Paul
>Priority: Critical
> Attachments: Screenshot 2023-03-09 at 1.47.11 PM.png, image (8).png
>
>
> During stop-with-savepoint the committer only commits the pending 
> committables on notifyCheckpointComplete.
> This has several downsides.
>  * Last committableSummary has checkpoint id LONG.MAX and is never cleared 
> from the state leading to that stop-with-savepoint does 

[jira] [Commented] (FLINK-30238) Unified Sink committer does not clean up state on final savepoint

2023-08-31 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17761051#comment-17761051
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-30238:
-

Hi all, I'd like to move this ticket forward and make a call on whether things 
are working as expected or there's an actual bug.

First of all, let's narrow down the scope of this ticket to address only the 
following:
{{stop-with-savepoint}} stores a last CommittableSummary with a LONG_MAX 
checkpoint ID that is persisted in savepoints, which prevents a future 
{{stop-with-savepoint}} from succeeding after restore, because 
CommittableSummaries should always have different checkpoint IDs.

The separate issue with post-commit topologies not receiving committed 
committables can be addressed separately.

Summarizing the facts we have so far:
 * If a LONG_MAX checkpoint ID special marker is propagated, it means 
{{endOfInput}} was called and it is expected behavior that the job will not 
function properly if restored from the generated savepoint. {{endOfInput}} 
should only ever be called when 1) reaching bounded end of input, or 2) 
stop-with-savepoint with the --drain option.
 * The reason why the Kafka ITCase referenced by [~fpaul] has {{endOfInput}} 
being called is because of the setup {{{}advanceTimestamp = true{}}}, which 
translates equivalently to stop-with-savepoint with --drain.


With the above, can we conclude that:
 * {{stop-with-savepoint}} without draining works as expected, such that 
{{endOfInput}} is not being called and the LONG_MAX CommittableSummary is not 
being sent and stored as committables state.
 * {{stop-with-savepoint}} _with draining_ is also working as expected - jobs 
restored from these savepoints are not expected to function properly in the 
first place.

 

I'm looking to verify that {{stop-with-savepoint}} without draining indeed 
doesn't call the {{endOfInput}} method - if that's the case, then I think we 
can consider this case closed. In parallel, I just want to jot these thoughts 
down to see if I'm missing anything obvious. cc [~fpaul] [~gaoyunhaii]
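
To make the above concrete, here is a minimal sketch (plain Java; the class and 
method names are assumptions for illustration, not Flink's actual committer API) 
of how restored committable state carrying the LONG_MAX end-of-input marker 
could be recognized:
{code:java}
// Hypothetical sketch, assuming Long.MAX_VALUE is the end-of-input marker as
// discussed above. None of these names exist in Flink itself.
public class EndOfInputMarkerCheck {

    /** Checkpoint id written by the sink after endOfInput() was called. */
    static final long END_OF_INPUT_CHECKPOINT_ID = Long.MAX_VALUE;

    /** True if the restored summary was produced by a drained job. */
    static boolean producedAfterEndOfInput(long restoredCheckpointId) {
        return restoredCheckpointId == END_OF_INPUT_CHECKPOINT_ID;
    }

    public static void main(String[] args) {
        // e.g. committable state written by stop-with-savepoint --drain
        long restoredId = Long.MAX_VALUE;
        if (producedAfterEndOfInput(restoredId)) {
            System.out.println(
                    "Savepoint was drained (endOfInput was called); a job "
                            + "restored from it is not expected to keep functioning.");
        }
    }
}
{code}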

> Unified Sink committer does not clean up state on final savepoint
> -
>
> Key: FLINK-30238
> URL: https://issues.apache.org/jira/browse/FLINK-30238
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.17.0, 1.15.3, 1.16.1
>Reporter: Fabian Paul
>Priority: Critical
> Attachments: Screenshot 2023-03-09 at 1.47.11 PM.png, image (8).png
>
>
> During stop-with-savepoint the committer only commits the pending 
> committables on notifyCheckpointComplete.
> This has several downsides.
>  * Last committableSummary has checkpoint id LONG.MAX and is never cleared 
> from the state, leading to stop-with-savepoint not working when the 
> pipeline recovers from a savepoint 
>  * While the committables are committed during stop-with-savepoint, they are 
> not forwarded to the post-commit topology, potentially losing data and 
> preventing open transactions from being closed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-31408) Add EXACTLY_ONCE support to upsert-kafka

2023-07-12 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai closed FLINK-31408.
---
Resolution: Fixed

> Add EXACTLY_ONCE support to upsert-kafka
> 
>
> Key: FLINK-31408
> URL: https://issues.apache.org/jira/browse/FLINK-31408
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Kafka
>Reporter: Alex Sorokoumov
>Assignee: Alex Sorokoumov
>Priority: Major
>  Labels: pull-request-available
> Fix For: kafka-3.1.0
>
>
> {{upsert-kafka}} connector should support optional {{EXACTLY_ONCE}} delivery 
> semantics.
> [upsert-kafka 
> docs|https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/upsert-kafka/#consistency-guarantees]
>  suggest that the connector handles duplicate records from 
> {{{}AT_LEAST_ONCE{}}}. However, at least 2 reasons exist to configure the 
> connector with {{{}EXACTLY_ONCE{}}}.
> First, there might be other non-Flink topic consumers that would rather not 
> have duplicated records.
> Second, multiple {{upsert-kafka}} producers might cause keys to roll back to 
> previous values. Consider a scenario with 2 producing jobs A and B, writing 
> to the same topic with {{AT_LEAST_ONCE}} and a consuming job reading from the 
> topic. Both producers write unique, monotonically increasing sequences to the 
> same key. Job A writes {{x=a1,a2,a3,a4,a5…}} Job B writes 
> {{{}x=b1,b2,b3,b4,b5,...{}}}. With this setup, we can have the following 
> sequence:
>  # Job A produces x=a5.
>  # Job B produces x=b5.
>  # Job A produces the duplicate write x=a5.
> The consuming job would observe {{x}} going to {{{}a5{}}}, then to 
> {{{}b5{}}}, then back to {{{}a5{}}}. {{EXACTLY_ONCE}} would prevent this 
> behavior.
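
For illustration, the interleaving above can be simulated with a minimal, 
self-contained sketch (plain Java rather than connector code; all names are 
made up for this example):
{code:java}
import java.util.ArrayList;
import java.util.List;

public class DuplicateWriteRollback {
    public static void main(String[] args) {
        // Writes for key x in arrival order at the consumer: a5 from job A,
        // b5 from job B, then a duplicate a5 redelivered by job A (AT_LEAST_ONCE).
        List<String> arrivals = List.of("a5", "b5", "a5");

        String currentValue = null;
        List<String> observed = new ArrayList<>();
        for (String value : arrivals) {
            currentValue = value;        // upsert semantics: latest write wins
            observed.add(currentValue);  // value a downstream consumer sees over time
        }
        // Prints [a5, b5, a5]: the key appears to roll back from b5 to a5,
        // which EXACTLY_ONCE delivery would prevent.
        System.out.println(observed);
    }
}
{code}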



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31408) Add EXACTLY_ONCE support to upsert-kafka

2023-07-12 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-31408:

Fix Version/s: kafka-3.1.0

> Add EXACTLY_ONCE support to upsert-kafka
> 
>
> Key: FLINK-31408
> URL: https://issues.apache.org/jira/browse/FLINK-31408
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Kafka
>Reporter: Alex Sorokoumov
>Assignee: Alex Sorokoumov
>Priority: Major
>  Labels: pull-request-available
> Fix For: kafka-3.1.0
>
>
> {{upsert-kafka}} connector should support optional {{EXACTLY_ONCE}} delivery 
> semantics.
> [upsert-kafka 
> docs|https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/upsert-kafka/#consistency-guarantees]
>  suggest that the connector handles duplicate records from 
> {{{}AT_LEAST_ONCE{}}}. However, at least 2 reasons exist to configure the 
> connector with {{{}EXACTLY_ONCE{}}}.
> First, there might be other non-Flink topic consumers that would rather not 
> have duplicated records.
> Second, multiple {{upsert-kafka}} producers might cause keys to roll back to 
> previous values. Consider a scenario with 2 producing jobs A and B, writing 
> to the same topic with {{AT_LEAST_ONCE}} and a consuming job reading from the 
> topic. Both producers write unique, monotonically increasing sequences to the 
> same key. Job A writes {{x=a1,a2,a3,a4,a5…}} Job B writes 
> {{{}x=b1,b2,b3,b4,b5,...{}}}. With this setup, we can have the following 
> sequence:
>  # Job A produces x=a5.
>  # Job B produces x=b5.
>  # Job A produces the duplicate write x=a5.
> The consuming job would observe {{x}} going to {{{}a5{}}}, then to 
> {{{}b5{}}}, then back to {{{}a5{}}}. {{EXACTLY_ONCE}} would prevent this 
> behavior.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31408) Add EXACTLY_ONCE support to upsert-kafka

2023-07-12 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17742414#comment-17742414
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-31408:
-

Merged to flink-connector-kafka via:

main - 9109722c0cf27299b60232c64ae29e00c62934a7

> Add EXACTLY_ONCE support to upsert-kafka
> 
>
> Key: FLINK-31408
> URL: https://issues.apache.org/jira/browse/FLINK-31408
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Kafka
>Reporter: Alex Sorokoumov
>Assignee: Alex Sorokoumov
>Priority: Major
>  Labels: pull-request-available
>
> {{upsert-kafka}} connector should support optional {{EXACTLY_ONCE}} delivery 
> semantics.
> [upsert-kafka 
> docs|https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/upsert-kafka/#consistency-guarantees]
>  suggest that the connector handles duplicate records from 
> {{{}AT_LEAST_ONCE{}}}. However, at least 2 reasons exist to configure the 
> connector with {{{}EXACTLY_ONCE{}}}.
> First, there might be other non-Flink topic consumers that would rather not 
> have duplicated records.
> Second, multiple {{upsert-kafka}} producers might cause keys to roll back to 
> previous values. Consider a scenario with 2 producing jobs A and B, writing 
> to the same topic with {{AT_LEAST_ONCE}} and a consuming job reading from the 
> topic. Both producers write unique, monotonically increasing sequences to the 
> same key. Job A writes {{x=a1,a2,a3,a4,a5…}} Job B writes 
> {{{}x=b1,b2,b3,b4,b5,...{}}}. With this setup, we can have the following 
> sequence:
>  # Job A produces x=a5.
>  # Job B produces x=b5.
>  # Job A produces the duplicate write x=a5.
> The consuming job would observe {{x}} going to {{{}a5{}}}, then to 
> {{{}b5{}}}, then back to {{{}a5{}}}. {{EXACTLY_ONCE}} would prevent this 
> behavior.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32453) flink-connector-kafka does not build against Flink 1.18-SNAPSHOT

2023-07-12 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-32453:

Affects Version/s: kafka-3.0.0
   (was: 1.18.0)

> flink-connector-kafka does not build against Flink 1.18-SNAPSHOT
> 
>
> Key: FLINK-32453
> URL: https://issues.apache.org/jira/browse/FLINK-32453
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> There are a few breaking changes in test utility code that prevent 
> {{apache/flink-connector-kafka}} from building against Flink 1.18-SNAPSHOT. 
> This umbrella ticket captures all breaking changes, and should only be closed 
> once we make things build again.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-32455) Breaking change in TypeSerializerUpgradeTestBase prevents flink-connector-kafka from building against 1.18-SNAPSHOT

2023-07-12 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai closed FLINK-32455.
---
Resolution: Fixed

> Breaking change in TypeSerializerUpgradeTestBase prevents 
> flink-connector-kafka from building against 1.18-SNAPSHOT
> ---
>
> Key: FLINK-32455
> URL: https://issues.apache.org/jira/browse/FLINK-32455
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka, Test Infrastructure
>Affects Versions: kafka-3.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: kafka-3.0.1, kafka-3.1.0
>
>
> FLINK-27518 introduced a breaking signature change to the abstract class 
> {{TypeSerializerUpgradeTestBase}}, specifically the abstract 
> {{createTestSpecifications}} method signature was changed. This breaks 
> downstream test code in externalized connector repos, e.g. 
> flink-connector-kafka's {{KafkaSerializerUpgradeTest}}
> Moreover, {{flink-migration-test-utils}} needs to be transitively pulled in by 
> downstream test code that depends on flink-core test-jar.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-32453) flink-connector-kafka does not build against Flink 1.18-SNAPSHOT

2023-07-12 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai closed FLINK-32453.
---
Resolution: Fixed

> flink-connector-kafka does not build against Flink 1.18-SNAPSHOT
> 
>
> Key: FLINK-32453
> URL: https://issues.apache.org/jira/browse/FLINK-32453
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: kafka-3.0.1, kafka-3.1.0
>
>
> There are a few breaking changes in test utility code that prevent 
> {{apache/flink-connector-kafka}} from building against Flink 1.18-SNAPSHOT. 
> This umbrella ticket captures all breaking changes, and should only be closed 
> once we make things build again.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32453) flink-connector-kafka does not build against Flink 1.18-SNAPSHOT

2023-07-12 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-32453:

Fix Version/s: kafka-3.0.1
   kafka-3.1.0
   (was: 1.18.0)

> flink-connector-kafka does not build against Flink 1.18-SNAPSHOT
> 
>
> Key: FLINK-32453
> URL: https://issues.apache.org/jira/browse/FLINK-32453
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: kafka-3.0.1, kafka-3.1.0
>
>
> There are a few breaking changes in test utility code that prevent 
> {{apache/flink-connector-kafka}} from building against Flink 1.18-SNAPSHOT. 
> This umbrella ticket captures all breaking changes, and should only be closed 
> once we make things build again.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32455) Breaking change in TypeSerializerUpgradeTestBase prevents flink-connector-kafka from building against 1.18-SNAPSHOT

2023-07-12 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-32455:

Fix Version/s: kafka-3.0.1
   kafka-3.1.0

> Breaking change in TypeSerializerUpgradeTestBase prevents 
> flink-connector-kafka from building against 1.18-SNAPSHOT
> ---
>
> Key: FLINK-32455
> URL: https://issues.apache.org/jira/browse/FLINK-32455
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka, Test Infrastructure
>Affects Versions: kafka-3.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: kafka-3.0.1, kafka-3.1.0
>
>
> FLINK-27518 introduced a breaking signature change to the abstract class 
> {{TypeSerializerUpgradeTestBase}}, specifically the abstract 
> {{createTestSpecifications}} method signature was changed. This breaks 
> downstream test code in externalized connector repos, e.g. 
> flink-connector-kafka's {{KafkaSerializerUpgradeTest}}
> Moreover, {{flink-migration-test-utils}} needs to be transitively pulled in by 
> downstream test code that depends on flink-core test-jar.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32455) Breaking change in TypeSerializerUpgradeTestBase prevents flink-connector-kafka from building against 1.18-SNAPSHOT

2023-07-12 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-32455:

Affects Version/s: kafka-3.0.0
   (was: 1.18.0)

> Breaking change in TypeSerializerUpgradeTestBase prevents 
> flink-connector-kafka from building against 1.18-SNAPSHOT
> ---
>
> Key: FLINK-32455
> URL: https://issues.apache.org/jira/browse/FLINK-32455
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka, Test Infrastructure
>Affects Versions: kafka-3.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
>
> FLINK-27518 introduced a breaking signature change to the abstract class 
> {{TypeSerializerUpgradeTestBase}}, specifically the abstract 
> {{createTestSpecifications}} method signature was changed. This breaks 
> downstream test code in externalized connector repos, e.g. 
> flink-connector-kafka's {{KafkaSerializerUpgradeTest}}
> Moreover, {{flink-migration-test-utils}} needs to be transitively pulled in by 
> downstream test code that depends on flink-core test-jar.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32455) Breaking change in TypeSerializerUpgradeTestBase prevents flink-connector-kafka from building against 1.18-SNAPSHOT

2023-07-12 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17742413#comment-17742413
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-32455:
-

Merged to flink-connector-kafka via:

main - 21d3b10e4d66aca82d70a05a4ab24fb5cf2db348

v3.0 - 59ac7389fde0200dbd6b928cf1e9fcb8c29b6350

> Breaking change in TypeSerializerUpgradeTestBase prevents 
> flink-connector-kafka from building against 1.18-SNAPSHOT
> ---
>
> Key: FLINK-32455
> URL: https://issues.apache.org/jira/browse/FLINK-32455
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka, Test Infrastructure
>Affects Versions: 1.18.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.18.0
>
>
> FLINK-27518 introduced a breaking signature change to the abstract class 
> {{TypeSerializerUpgradeTestBase}}, specifically the abstract 
> {{createTestSpecifications}} method signature was changed. This breaks 
> downstream test code in externalized connector repos, e.g. 
> flink-connector-kafka's {{KafkaSerializerUpgradeTest}}
> Moreover, {{flink-migration-test-utils}} needs to be transitively pulled in by 
> downstream test code that depends on flink-core test-jar.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32455) Breaking change in TypeSerializerUpgradeTestBase prevents flink-connector-kafka from building against 1.18-SNAPSHOT

2023-07-12 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-32455:

Fix Version/s: (was: 1.18.0)

> Breaking change in TypeSerializerUpgradeTestBase prevents 
> flink-connector-kafka from building against 1.18-SNAPSHOT
> ---
>
> Key: FLINK-32455
> URL: https://issues.apache.org/jira/browse/FLINK-32455
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka, Test Infrastructure
>Affects Versions: 1.18.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
>
> FLINK-27518 introduced a breaking signature change to the abstract class 
> {{TypeSerializerUpgradeTestBase}}, specifically the abstract 
> {{createTestSpecifications}} method signature was changed. This breaks 
> downstream test code in externalized connector repos, e.g. 
> flink-connector-kafka's {{KafkaSerializerUpgradeTest}}
> Moreover, {{flink-migration-test-utils}} needs to be transitively pulled in by 
> downstream test code that depends on flink-core test-jar.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32455) Breaking change in TypeSerializerUpgradeTestBase prevents flink-connector-kafka from building against 1.18-SNAPSHOT

2023-07-11 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17741999#comment-17741999
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-32455:
-

[~renqs] I'm merging my hotfix PR now which should unblock things for the time 
being. The PR has already been reviewed and approved by [~gaoyunhaii].

> Breaking change in TypeSerializerUpgradeTestBase prevents 
> flink-connector-kafka from building against 1.18-SNAPSHOT
> ---
>
> Key: FLINK-32455
> URL: https://issues.apache.org/jira/browse/FLINK-32455
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka, Test Infrastructure
>Affects Versions: 1.18.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.18.0
>
>
> FLINK-27518 introduced a breaking signature change to the abstract class 
> {{TypeSerializerUpgradeTestBase}}, specifically the abstract 
> {{createTestSpecifications}} method signature was changed. This breaks 
> downstream test code in externalized connector repos, e.g. 
> flink-connector-kafka's {{KafkaSerializerUpgradeTest}}
> Moreover, {{flink-migration-test-utils}} needs to be transitively pulled in by 
> downstream test code that depends on flink-core test-jar.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32455) Breaking change in TypeSerializerUpgradeTestBase prevents flink-connector-kafka from building against 1.18-SNAPSHOT

2023-07-11 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17741997#comment-17741997
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-32455:
-

[~gaoyunhaii] that plan sounds good to me. Are you planning to do steps 1 and 2 
already for 1.18? If so, I can probably revert the "quick" fix PR I did in the 
Flink Kafka connector repo.

> Breaking change in TypeSerializerUpgradeTestBase prevents 
> flink-connector-kafka from building against 1.18-SNAPSHOT
> ---
>
> Key: FLINK-32455
> URL: https://issues.apache.org/jira/browse/FLINK-32455
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka, Test Infrastructure
>Affects Versions: 1.18.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.18.0
>
>
> FLINK-27518 introduced a breaking signature change to the abstract class 
> {{TypeSerializerUpgradeTestBase}}, specifically the abstract 
> {{createTestSpecifications}} method signature was changed. This breaks 
> downstream test code in externalized connector repos, e.g. 
> flink-connector-kafka's {{KafkaSerializerUpgradeTest}}
> Moreover, {{flink-migration-test-utils}} needs to be transitively pulled in by 
> downstream test code that depends on flink-core test-jar.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32455) Breaking change in TypeSerializerUpgradeTestBase prevents flink-connector-kafka from building against 1.18-SNAPSHOT

2023-07-03 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17739797#comment-17739797
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-32455:
-

FYI preview PR for the fast workaround fix: 
https://github.com/apache/flink-connector-kafka/pull/39

> Breaking change in TypeSerializerUpgradeTestBase prevents 
> flink-connector-kafka from building against 1.18-SNAPSHOT
> ---
>
> Key: FLINK-32455
> URL: https://issues.apache.org/jira/browse/FLINK-32455
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka, Test Infrastructure
>Affects Versions: 1.18.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.18.0
>
>
> FLINK-27518 introduced a breaking signature change to the abstract class 
> {{TypeSerializerUpgradeTestBase}}, specifically the abstract 
> {{createTestSpecifications}} method signature was changed. This breaks 
> downstream test code in externalized connector repos, e.g. 
> flink-connector-kafka's {{KafkaSerializerUpgradeTest}}
> Moreover, {{flink-migration-test-utils}} needs to be transitively pulled in by 
> downstream test code that depends on flink-core test-jar.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-32462) Kafka shouldn't rely on Flink-Shaded

2023-07-03 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai closed FLINK-32462.
---
Resolution: Fixed

> Kafka shouldn't rely on Flink-Shaded
> 
>
> Key: FLINK-32462
> URL: https://issues.apache.org/jira/browse/FLINK-32462
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Kafka
>Reporter: Martijn Visser
>Assignee: Martijn Visser
>Priority: Major
>  Labels: pull-request-available
> Fix For: kafka-3.0.1, kafka-3.1.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32462) Kafka shouldn't rely on Flink-Shaded

2023-07-03 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-32462:

Fix Version/s: kafka-3.0.1
   kafka-3.1.0

> Kafka shouldn't rely on Flink-Shaded
> 
>
> Key: FLINK-32462
> URL: https://issues.apache.org/jira/browse/FLINK-32462
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Kafka
>Reporter: Martijn Visser
>Priority: Major
>  Labels: pull-request-available
> Fix For: kafka-3.0.1, kafka-3.1.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-32462) Kafka shouldn't rely on Flink-Shaded

2023-07-03 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai reassigned FLINK-32462:
---

Assignee: Martijn Visser

> Kafka shouldn't rely on Flink-Shaded
> 
>
> Key: FLINK-32462
> URL: https://issues.apache.org/jira/browse/FLINK-32462
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Kafka
>Reporter: Martijn Visser
>Assignee: Martijn Visser
>Priority: Major
>  Labels: pull-request-available
> Fix For: kafka-3.0.1, kafka-3.1.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32462) Kafka shouldn't rely on Flink-Shaded

2023-07-03 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32462?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17739795#comment-17739795
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-32462:
-

Merged to {{{}apache/flink-connector-kafka{}}}:

{{{}main{}}}: ac07d6d233351a55d0edcd9ff0c43c791e7ce002
{{{}v3.0{}}}: 6778824f43d8b1d777fce9679f868c1ce3412e85

> Kafka shouldn't rely on Flink-Shaded
> 
>
> Key: FLINK-32462
> URL: https://issues.apache.org/jira/browse/FLINK-32462
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Kafka
>Reporter: Martijn Visser
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-32455) Breaking change in TypeSerializerUpgradeTestBase prevents flink-connector-kafka from building against 1.18-SNAPSHOT

2023-07-03 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17739787#comment-17739787
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-32455 at 7/4/23 4:36 AM:
-

One (quicker) fix for now might be to make a copy of the original 
{{TypeSerializerUpgradeTestBase}} to the Kafka connector, and only afterwards 
introduce a proper public-facing test utility in {{apache/flink}} and only then 
migrate the Kafka connector test code to that.


was (Author: tzulitai):
One (quick) fix for now might be to make a copy of the original 
{{TypeSerializerUpgradeTestBase}} to the Kafka connector, and only afterwards 
introduce a proper public-facing test utility in {{apache/flink}} and only then 
migrate the Kafka connector test code to that.

> Breaking change in TypeSerializerUpgradeTestBase prevents 
> flink-connector-kafka from building against 1.18-SNAPSHOT
> ---
>
> Key: FLINK-32455
> URL: https://issues.apache.org/jira/browse/FLINK-32455
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka, Test Infrastructure
>Affects Versions: 1.18.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.18.0
>
>
> FLINK-27518 introduced a breaking signature change to the abstract class 
> {{TypeSerializerUpgradeTestBase}}, specifically the abstract 
> {{createTestSpecifications}} method signature was changed. This breaks 
> downstream test code in externalized connector repos, e.g. 
> flink-connector-kafka's {{KafkaSerializerUpgradeTest}}
> Moreover, {{flink-migration-test-utils}} needs to be transitively pulled in by 
> downstream test code that depends on flink-core test-jar.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32455) Breaking change in TypeSerializerUpgradeTestBase prevents flink-connector-kafka from building against 1.18-SNAPSHOT

2023-07-03 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17739787#comment-17739787
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-32455:
-

One (quick) fix for now might be to make a copy of the original 
{{TypeSerializerUpgradeTestBase}} to the Kafka connector, and only afterwards 
introduce a proper public-facing test utility in {{apache/flink}} and only then 
migrate the Kafka connector test code to that.

> Breaking change in TypeSerializerUpgradeTestBase prevents 
> flink-connector-kafka from building against 1.18-SNAPSHOT
> ---
>
> Key: FLINK-32455
> URL: https://issues.apache.org/jira/browse/FLINK-32455
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka, Test Infrastructure
>Affects Versions: 1.18.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.18.0
>
>
> FLINK-27518 introduced a breaking signature change to the abstract class 
> {{TypeSerializerUpgradeTestBase}}, specifically the abstract 
> {{createTestSpecifications}} method signature was changed. This breaks 
> downstream test code in externalized connector repos, e.g. 
> flink-connector-kafka's {{KafkaSerializerUpgradeTest}}
> Moreover, {{flink-migration-test-utils}} needs to be transitively pulled in by 
> downstream test code that depends on flink-core test-jar.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-32455) Breaking change in TypeSerializerUpgradeTestBase prevents flink-connector-kafka from building against 1.18-SNAPSHOT

2023-07-03 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17739785#comment-17739785
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-32455 at 7/4/23 4:14 AM:
-

[~gaoyunhaii] sorry for the slow reply - just finished other stuff and circling 
back to this.

> refactor the existing migration tests to the new framework. 

The main challenge here is that external connector repos generally need to 
build against the latest 2 major versions, i.e. for now that would be 1.17.x and 
the upcoming 1.18.x version.

Therefore, we can't simply refactor the migration tests in the Kafka connector 
to use the new {{MigrationTest}} abstraction, otherwise the code won't be able 
to simultaneously build against older versions.

Specifically, the situation is that:
 * The Kafka connector code has a test ({{{}KafkaSerializerUpgradeTest{}}}) 
that depends on {{{}TypeSerializerUpgradeTestBase{}}}, which with FLINK-27518 
now implements the new {{{}MigrationTest{}}}.
 * After implementing {{{}MigrationTest{}}}, {{TypeSerializerUpgradeTestBase}} 
has a breaking change across versions 1.17.x and 1.18.x.
 * Therefore, it is not possible to update the test code to simultaneously 
build against both versions.

To move forward, I think we need to:
 # We might need to revert the changes to middle-layer test util abstractions 
like {{TypeSerializerUpgradeTestBase}} that externalized connector code might 
be depending on.
 # At the same time, formally introduce a replacement for them in the 
{{flink-migration-test-utils}} package, and mark the original 
{{TypeSerializerUpgradeTestBase}} as deprecated.
 # Gradually migrate the test code in the Kafka connector to use 
{{flink-migration-test-utils}} after a few connector releases.

TLDR we need to handle the changes in {{TypeSerializerUpgradeTestBase}} as if 
it's for public use and consider the same compatibility concerns.

What do you think [~gaoyunhaii] [~mapohl]?


was (Author: tzulitai):
[~gaoyunhaii] sorry for the slow reply - just finished other stuff and circling 
back to this.

> refactor the existing migration tests to the new framework. 

The main challenge here is that external connector repos generally need to 
build against the latest 2 major versions, i.e. for now that would be 1.17.x and 
the upcoming 1.18.x version.

Therefore, we can't simply refactor the migration tests in the Kafka connector 
to use the new {{MigrationTest}} abstraction, otherwise the code won't be able 
to simultaneously build against older versions.

Specifically, the situation is that:
 * The Kafka connector code has a test ({{{}KafkaSerializerUpgradeTest{}}}) 
that depends on {{{}TypeSerializerUpgradeTestBase{}}}, which with FLINK-27518 
now implements the new {{{}MigrationTest{}}}.
 * After implementing {{{}MigrationTest{}}}, {{TypeSerializerUpgradeTestBase}} 
has a breaking change across versions 1.17.x and 1.18.x.
 * Therefore, it is not possible to update the test code to simultaneously 
build against both versions.

To move forward, I think we need to:
 # We might need to revert the changes to middle-layer test util abstractions 
like {{TypeSerializerUpgradeTestBase}} that externalized connector code might 
be depending on.
 # At the same time, formally introduce a replacement for them in the 
{{flink-migration-test-utils}} package, and mark the original 
{{TypeSerializerUpgradeTestBase}} as deprecated.
 # Gradually migrate the test code in the Kafka connector to use 
{{flink-migration-test-utils}} after a few connector releases.

What do you think [~gaoyunhaii]?

> Breaking change in TypeSerializerUpgradeTestBase prevents 
> flink-connector-kafka from building against 1.18-SNAPSHOT
> ---
>
> Key: FLINK-32455
> URL: https://issues.apache.org/jira/browse/FLINK-32455
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka, Test Infrastructure
>Affects Versions: 1.18.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.18.0
>
>
> FLINK-27518 introduced a breaking signature change to the abstract class 
> {{TypeSerializerUpgradeTestBase}}, specifically the abstract 
> {{createTestSpecifications}} method signature was changed. This breaks 
> downstream test code in externalized connector repos, e.g. 
> flink-connector-kafka's {{KafkaSerializerUpgradeTest}}
> Moreover, {{flink-migration-test-utils}} needs to be transitively pulled in by 
> downstream test code that depends on flink-core test-jar.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-32455) Breaking change in TypeSerializerUpgradeTestBase prevents flink-connector-kafka from building against 1.18-SNAPSHOT

2023-07-03 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17739785#comment-17739785
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-32455 at 7/4/23 4:13 AM:
-

[~gaoyunhaii] sorry for the slow reply - just finished other stuff and circling 
back to this.

> refactor the existing migration tests to the new framework. 

The main challenge here is that external connector repos generally need to 
build against the latest 2 major versions, i.e. for now that would be 1.17.x and 
the upcoming 1.18.x version.

Therefore, we can't simply refactor the migration tests in the Kafka connector 
to use the new {{MigrationTest}} abstraction, otherwise the code won't be able 
to simultaneously build against older versions.

Specifically, the situation is that:
 * The Kafka connector code has a test ({{{}KafkaSerializerUpgradeTest{}}}) 
that depends on {{{}TypeSerializerUpgradeTestBase{}}}, which with FLINK-27518 
now implements the new {{{}MigrationTest{}}}.
 * After implementing {{{}MigrationTest{}}}, {{TypeSerializerUpgradeTestBase}} 
has a breaking change across versions 1.17.x and 1.18.x.
 * Therefore, it is not possible to update the test code to simultaneously 
build against both versions.

To move forward, I think we need to:
 # We might need to revert the changes to middle-layer test util abstractions 
like {{TypeSerializerUpgradeTestBase}} that externalized connector code might 
be depending on.
 # At the same time, formally introduce a replacement for them in the 
{{flink-migration-test-utils}} package, and mark the original 
{{TypeSerializerUpgradeTestBase}} as deprecated.
 # Gradually migrate the test code in the Kafka connector to use 
{{flink-migration-test-utils}} after a few connector releases.

What do you think [~gaoyunhaii]?


was (Author: tzulitai):
[~gaoyunhaii] sorry for the slow reply - just finished other stuff and circling 
back to this.

> refactor the existing migration tests to the new framework. 

The main challenge here is that external connector repos generally need to 
build against the latest 2 major versions, i.e. for now that would be 1.17.x and 
the upcoming 1.18.x version.

Therefore, we can't simply refactor the migration tests in the Kafka connector 
to use the new {{MigrationTest}} abstraction, otherwise the code won't be able 
to simultaneously build against older versions.

Specifically, the situation is that:
 * The Kafka connector code has a test ({{{}KafkaSerializerUpgradeTest{}}}) 
that depends on {{{}TypeSerializerUpgradeTestBase{}}}, which with FLINK-27518 
now implements the new {{{}MigrationTest{}}}.
 * After implementing {{{}MigrationTest{}}}, {{TypeSerializerUpgradeTestBase}} 
has a breaking change across versions 1.17.x and 1.18.x.
 * Therefore, it is not possible to update the test code to simultaneously 
build against both versions.

To move forward, I think we need to:
 # We might need to revert the changes to middle-layer test util abstractions 
like {{TypeSerializerUpgradeTestBase}} that externalized connector code might 
be depending on.
 # At the same time, formally introduce a replacement for them in the 
{{flink-migration-test-utils}} package, and mark the original 
{{TypeSerializerUpgradeTestBase}} as deprecated.
 # Gradually migrate the test code in the Kafka connector to use 
{{flink-migration-test-utils}} after a few connector releases.

What do you think?

> Breaking change in TypeSerializerUpgradeTestBase prevents 
> flink-connector-kafka from building against 1.18-SNAPSHOT
> ---
>
> Key: FLINK-32455
> URL: https://issues.apache.org/jira/browse/FLINK-32455
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka, Test Infrastructure
>Affects Versions: 1.18.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.18.0
>
>
> FLINK-27518 introduced a breaking signature change to the abstract class 
> {{TypeSerializerUpgradeTestBase}}, specifically the abstract 
> {{createTestSpecifications}} method signature was changed. This breaks 
> downstream test code in externalized connector repos, e.g. 
> flink-connector-kafka's {{KafkaSerializerUpgradeTest}}
> Moreover, {{flink-migration-test-utils}} needs to be transitively pulled in by 
> downstream test code that depends on flink-core test-jar.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32455) Breaking change in TypeSerializerUpgradeTestBase prevents flink-connector-kafka from building against 1.18-SNAPSHOT

2023-07-03 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17739785#comment-17739785
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-32455:
-

[~gaoyunhaii] sorry for the slow reply - just finished other stuff and circling 
back to this.

> refactor the existing migration tests to the new framework. 

The main challenge here is that external connector repos generally need to 
build against the latest 2 major versions, i.e. for now that would be 1.17.x and 
the upcoming 1.18.x version.

Therefore, we can't simply refactor the migration tests in the Kafka connector 
to use the new {{MigrationTest}} abstraction, otherwise the code won't be able 
to simultaneously build against older versions.

Specifically, the situation is that:
 * The Kafka connector code has a test ({{{}KafkaSerializerUpgradeTest{}}}) 
that depends on {{{}TypeSerializerUpgradeTestBase{}}}, which with FLINK-27518 
now implements the new {{{}MigrationTest{}}}.
 * After implementing {{{}MigrationTest{}}}, {{TypeSerializerUpgradeTestBase}} 
has a breaking change across versions 1.17.x and 1.18.x.
 * Therefore, it is not possible to update the test code to simultaneously 
build against both versions.

To move forward, I think we need to:
 # We might need to revert the changes to middle-layer test util abstractions 
like {{TypeSerializerUpgradeTestBase}} that externalized connector code might 
be depending on.
 # At the same time, formally introduce a replacement for them in the 
{{flink-migration-test-utils}} package, and mark the original 
{{TypeSerializerUpgradeTestBase}} as deprecated.
 # Gradually migrate the test code in the Kafka connector to use 
{{flink-migration-test-utils}} after a few connector releases.

What do you think?
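
As a rough sketch of why such a signature change is source-incompatible for code 
that has to compile against two Flink versions at once (the classes below are 
simplified stand-ins, not the real {{TypeSerializerUpgradeTestBase}}, and the 
method shapes are assumptions for illustration):
{code:java}
import java.util.Collection;
import java.util.List;

// Stand-in for the assumed 1.17.x shape of the shared test base.
abstract class UpgradeTestBaseV17<T> {
    abstract Collection<T> createTestSpecifications() throws Exception;
}

// Stand-in for the assumed 1.18.x shape: the abstract method's signature
// changed, so overrides written against the old signature no longer compile.
abstract class UpgradeTestBaseV18<T> {
    abstract Collection<T> createTestSpecifications(String flinkVersion) throws Exception;
}

// A connector test can only override one of the two signatures, so a single
// source tree cannot build against both versions at the same time.
class ConnectorSerializerUpgradeTest extends UpgradeTestBaseV17<String> {
    @Override
    Collection<String> createTestSpecifications() {
        return List.of("example-spec");
    }
}
{code}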

> Breaking change in TypeSerializerUpgradeTestBase prevents 
> flink-connector-kafka from building against 1.18-SNAPSHOT
> ---
>
> Key: FLINK-32455
> URL: https://issues.apache.org/jira/browse/FLINK-32455
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka, Test Infrastructure
>Affects Versions: 1.18.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.18.0
>
>
> FLINK-27518 introduced a breaking signature change to the abstract class 
> {{TypeSerializerUpgradeTestBase}}, specifically the abstract 
> {{createTestSpecifications}} method signature was changed. This breaks 
> downstream test code in externalized connector repos, e.g. 
> flink-connector-kafka's {{KafkaSerializerUpgradeTest}}
> Moreover, {{flink-migration-test-utils}} needs to be transitively pulled in by 
> downstream test code that depends on flink-core test-jar.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32455) Breaking change in TypeSerializerUpgradeTestBase prevents flink-connector-kafka from building against 1.18-SNAPSHOT

2023-06-27 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17737935#comment-17737935
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-32455:
-

cc [~gaoyunhaii] 

> Breaking change in TypeSerializerUpgradeTestBase prevents 
> flink-connector-kafka from building against 1.18-SNAPSHOT
> ---
>
> Key: FLINK-32455
> URL: https://issues.apache.org/jira/browse/FLINK-32455
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka, Test Infrastructure
>Affects Versions: 1.18.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.18.0
>
>
> FLINK-27518 introduced a breaking signature change to the abstract class 
> {{TypeSerializerUpgradeTestBase}}, specifically the abstract 
> {{createTestSpecifications}} method signature was changed. This breaks 
> downstream test code in externalized connector repos, e.g. 
> flink-connector-kafka's {{KafkaSerializerUpgradeTest}}
> Moreover, {{flink-migration-test-utils}} needs to be transitively pulled in by 
> downstream test code that depends on flink-core test-jar.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32455) Breaking change in TypeSerializerUpgradeTestBase prevents flink-connector-kafka from building against 1.18-SNAPSHOT

2023-06-27 Thread Tzu-Li (Gordon) Tai (Jira)
Tzu-Li (Gordon) Tai created FLINK-32455:
---

 Summary: Breaking change in TypeSerializerUpgradeTestBase prevents 
flink-connector-kafka from building against 1.18-SNAPSHOT
 Key: FLINK-32455
 URL: https://issues.apache.org/jira/browse/FLINK-32455
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Kafka, Test Infrastructure
Affects Versions: 1.18.0
Reporter: Tzu-Li (Gordon) Tai
Assignee: Tzu-Li (Gordon) Tai
 Fix For: 1.18.0


FLINK-27518 introduced a breaking signature change to the abstract class 
{{TypeSerializerUpgradeTestBase}}, specifically the abstract 
{{createTestSpecifications}} method signature was changed. This breaks 
downstream test code in externalized connector repos, e.g. 
flink-connector-kafka's {{KafkaSerializerUpgradeTest}}

Moreover, {{flink-migration-test-utils}} needs to be transitively pulled in by 
downstream test code that depends on flink-core test-jar.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32453) flink-connector-kafka does not build against Flink 1.18-SNAPSHOT

2023-06-27 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17737933#comment-17737933
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-32453:
-

cc 1.18.0 release managers [~knaufk] [~martijnvisser] [~renqs] 

> flink-connector-kafka does not build against Flink 1.18-SNAPSHOT
> 
>
> Key: FLINK-32453
> URL: https://issues.apache.org/jira/browse/FLINK-32453
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.18.0
>
>
> There are a few breaking changes in test utility code that prevent 
> {{apache/flink-connector-kafka}} from building against Flink 1.18-SNAPSHOT. 
> This umbrella ticket captures all breaking changes, and should only be closed 
> once we make things build again.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31731) No suitable constructor found for DebeziumAvroSerializationSchema

2023-06-27 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31731?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-31731:

Parent: FLINK-32453
Issue Type: Sub-task  (was: Bug)

> No suitable constructor found for DebeziumAvroSerializationSchema
> -
>
> Key: FLINK-31731
> URL: https://issues.apache.org/jira/browse/FLINK-31731
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0, kafka-4.0.0
>Reporter: Martijn Visser
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.18.0
>
>
> {code:java}
> Error:  Failed to execute goal 
> org.apache.maven.plugins:maven-compiler-plugin:3.8.0:testCompile 
> (default-testCompile) on project flink-connector-kafka: Compilation failure
> Error:  
> /home/runner/work/flink-connector-kafka/flink-connector-kafka/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java:[939,16]
>  no suitable constructor found for 
> DebeziumAvroSerializationSchema(org.apache.flink.table.types.logical.RowType,java.lang.String,java.lang.String,)
> Error:  constructor 
> org.apache.flink.formats.avro.registry.confluent.debezium.DebeziumAvroSerializationSchema.DebeziumAvroSerializationSchema(org.apache.flink.table.types.logical.RowType,java.lang.String,java.lang.String,java.lang.String,java.util.Map)
>  is not applicable
> Error:(actual and formal argument lists differ in length)
> Error:  constructor 
> org.apache.flink.formats.avro.registry.confluent.debezium.DebeziumAvroSerializationSchema.DebeziumAvroSerializationSchema(org.apache.flink.formats.avro.AvroRowDataSerializationSchema)
>  is not applicable
> Error:(actual and formal argument lists differ in length)
> Error:  -> [Help 1]
> Error:  
> Error:  To see the full stack trace of the errors, re-run Maven with the -e 
> switch.
> Error:  Re-run Maven using the -X switch to enable full debug logging.
> Error:  
> Error:  For more information about the errors and possible solutions, please 
> read the following articles:
> Error:  [Help 1] 
> http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
> Error:  
> Error:  After correcting the problems, you can resume the build with the 
> command
> Error:mvn  -rf :flink-connector-kafka
> Error: Process completed with exit code 1.
> {code}
> https://github.com/apache/flink-connector-kafka/actions/runs/4610715024/jobs/8149513647#step:13:153



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32453) flink-connector-kafka does not build against Flink 1.18-SNAPSHOT

2023-06-27 Thread Tzu-Li (Gordon) Tai (Jira)
Tzu-Li (Gordon) Tai created FLINK-32453:
---

 Summary: flink-connector-kafka does not build against Flink 
1.18-SNAPSHOT
 Key: FLINK-32453
 URL: https://issues.apache.org/jira/browse/FLINK-32453
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.18.0
Reporter: Tzu-Li (Gordon) Tai
Assignee: Tzu-Li (Gordon) Tai
 Fix For: 1.18.0


There are a few breaking changes in test utility code that prevent 
{{apache/flink-connector-kafka}} from building against Flink 1.18-SNAPSHOT. 
This umbrella ticket captures all breaking changes, and should only be closed 
once we make things build again.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30859) Remove flink-connector-kafka from master branch

2023-06-27 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17737472#comment-17737472
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-30859:
-

Merged via apache/flink:149a5e34c1ed8d8943c901a98c65c70693915811

> Remove flink-connector-kafka from master branch
> ---
>
> Key: FLINK-30859
> URL: https://issues.apache.org/jira/browse/FLINK-30859
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Mason Chen
>Assignee: Mason Chen
>Priority: Major
>  Labels: pull-request-available
>
> Remove flink-connector-kafka from master branch since the repo has now been 
> externalized and 1.17 commits have been sync'ed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-30859) Remove flink-connector-kafka from master branch

2023-06-27 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai closed FLINK-30859.
---
Fix Version/s: 1.18.0
   Resolution: Fixed

> Remove flink-connector-kafka from master branch
> ---
>
> Key: FLINK-30859
> URL: https://issues.apache.org/jira/browse/FLINK-30859
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Mason Chen
>Assignee: Mason Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> Remove flink-connector-kafka from master branch since the repo has now been 
> externalized and 1.17 commits have been sync'ed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-31731) No suitable constructor found for DebeziumAvroSerializationSchema

2023-06-22 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31731?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai closed FLINK-31731.
---
Fix Version/s: 1.18.0
   Resolution: Fixed

> No suitable constructor found for DebeziumAvroSerializationSchema
> -
>
> Key: FLINK-31731
> URL: https://issues.apache.org/jira/browse/FLINK-31731
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0, kafka-4.0.0
>Reporter: Martijn Visser
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.18.0
>
>
> {code:java}
> Error:  Failed to execute goal 
> org.apache.maven.plugins:maven-compiler-plugin:3.8.0:testCompile 
> (default-testCompile) on project flink-connector-kafka: Compilation failure
> Error:  
> /home/runner/work/flink-connector-kafka/flink-connector-kafka/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java:[939,16]
>  no suitable constructor found for 
> DebeziumAvroSerializationSchema(org.apache.flink.table.types.logical.RowType,java.lang.String,java.lang.String,)
> Error:  constructor 
> org.apache.flink.formats.avro.registry.confluent.debezium.DebeziumAvroSerializationSchema.DebeziumAvroSerializationSchema(org.apache.flink.table.types.logical.RowType,java.lang.String,java.lang.String,java.lang.String,java.util.Map)
>  is not applicable
> Error:(actual and formal argument lists differ in length)
> Error:  constructor 
> org.apache.flink.formats.avro.registry.confluent.debezium.DebeziumAvroSerializationSchema.DebeziumAvroSerializationSchema(org.apache.flink.formats.avro.AvroRowDataSerializationSchema)
>  is not applicable
> Error:(actual and formal argument lists differ in length)
> Error:  -> [Help 1]
> Error:  
> Error:  To see the full stack trace of the errors, re-run Maven with the -e 
> switch.
> Error:  Re-run Maven using the -X switch to enable full debug logging.
> Error:  
> Error:  For more information about the errors and possible solutions, please 
> read the following articles:
> Error:  [Help 1] 
> http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
> Error:  
> Error:  After correcting the problems, you can resume the build with the 
> command
> Error:mvn  -rf :flink-connector-kafka
> Error: Process completed with exit code 1.
> {code}
> https://github.com/apache/flink-connector-kafka/actions/runs/4610715024/jobs/8149513647#step:13:153



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31731) No suitable constructor found for DebeziumAvroSerializationSchema

2023-06-22 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17736016#comment-17736016
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-31731:
-

Merged for apache/flink via 274aa0debffaa57926c474f11e36be753b49cbc5

> No suitable constructor found for DebeziumAvroSerializationSchema
> -
>
> Key: FLINK-31731
> URL: https://issues.apache.org/jira/browse/FLINK-31731
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0, kafka-4.0.0
>Reporter: Martijn Visser
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
>  Labels: pull-request-available, test-stability
>
> {code:java}
> Error:  Failed to execute goal 
> org.apache.maven.plugins:maven-compiler-plugin:3.8.0:testCompile 
> (default-testCompile) on project flink-connector-kafka: Compilation failure
> Error:  
> /home/runner/work/flink-connector-kafka/flink-connector-kafka/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java:[939,16]
>  no suitable constructor found for 
> DebeziumAvroSerializationSchema(org.apache.flink.table.types.logical.RowType,java.lang.String,java.lang.String,)
> Error:  constructor 
> org.apache.flink.formats.avro.registry.confluent.debezium.DebeziumAvroSerializationSchema.DebeziumAvroSerializationSchema(org.apache.flink.table.types.logical.RowType,java.lang.String,java.lang.String,java.lang.String,java.util.Map)
>  is not applicable
> Error:(actual and formal argument lists differ in length)
> Error:  constructor 
> org.apache.flink.formats.avro.registry.confluent.debezium.DebeziumAvroSerializationSchema.DebeziumAvroSerializationSchema(org.apache.flink.formats.avro.AvroRowDataSerializationSchema)
>  is not applicable
> Error:(actual and formal argument lists differ in length)
> Error:  -> [Help 1]
> Error:  
> Error:  To see the full stack trace of the errors, re-run Maven with the -e 
> switch.
> Error:  Re-run Maven using the -X switch to enable full debug logging.
> Error:  
> Error:  For more information about the errors and possible solutions, please 
> read the following articles:
> Error:  [Help 1] 
> http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
> Error:  
> Error:  After correcting the problems, you can resume the build with the 
> command
> Error:mvn  -rf :flink-connector-kafka
> Error: Process completed with exit code 1.
> {code}
> https://github.com/apache/flink-connector-kafka/actions/runs/4610715024/jobs/8149513647#step:13:153



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31762) Subscribe to multiple Kafka topics may cause partition assignment skew

2023-05-30 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31762?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17727590#comment-17727590
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-31762:
-

Thanks for opening the ticket [~liam8]. I think your analysis is correct, and 
the distribution will indeed be sub-optimal when multiple topics are being 
read, especially if these topics have varying numbers of partitions (which is 
likely the case).

A quick intuition is to first sort the partitions and then round-robin assign 
them starting from subtask 0. Whenever we discover new partitions (if 
continuous partition discovery is enabled), we continue assigning subtasks 
starting from the last assignment.

We should double check if this would break anything in terms of state restores 
though. With the new {{KafkaSource}} on top of source V2, it shouldn't break 
anything as partition state restore is decoupled from the discovery and 
assignments which happen on the split enumerator.
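
For illustration, here is a minimal sketch of what such a globally sorted 
round-robin assignment could look like. This is not the actual 
{{KafkaSourceEnumerator}} API; the class and method names are hypothetical.
{code:java}
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;

/** Hypothetical sketch: sort all discovered partitions, then assign them round-robin. */
public class RoundRobinTopicPartitionAssigner {

    /**
     * @param partitions all discovered topic-partitions, across all subscribed topics
     * @param numReaders the source parallelism
     * @param startReader the reader index to continue from (0 on the first discovery)
     * @return a map from reader index to its assigned partitions
     */
    public static Map<Integer, List<TopicPartition>> assign(
            List<TopicPartition> partitions, int numReaders, int startReader) {
        List<TopicPartition> sorted = new ArrayList<>(partitions);
        // Sort by (topic, partition) so the assignment is deterministic across discovery runs.
        sorted.sort(
                Comparator.comparing(TopicPartition::topic)
                        .thenComparingInt(TopicPartition::partition));

        Map<Integer, List<TopicPartition>> assignment = new HashMap<>();
        int reader = startReader;
        for (TopicPartition tp : sorted) {
            assignment.computeIfAbsent(reader, k -> new ArrayList<>()).add(tp);
            reader = (reader + 1) % numReaders;
        }
        return assignment;
    }
}
{code}
With the example from the description (two topics with four partitions each and 
a parallelism of eight), this yields exactly one partition per subtask.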

> Subscribe to multiple Kafka topics may cause partition assignment skew
> --
>
> Key: FLINK-31762
> URL: https://issues.apache.org/jira/browse/FLINK-31762
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.13.0, 1.18.0
>Reporter: Liam
>Priority: Major
> Attachments: image-2023-04-11-08-00-16-054.png, 
> image-2023-04-11-08-12-24-115.png
>
>
> To simplify the demonstration, let us assume that there are two topics, and 
> each topic has four partitions. We have set the parallelism to eight to 
> consume these two topics. However, the current partition assignment method 
> may lead to some subtasks being assigned two partitions while others are left 
> with none.
> !image-2023-04-11-08-00-16-054.png|width=500,height=143!
> In my case, the situation is even worse as I have ten topics, each with 100 
> partitions. If I set the parallelism to 1000, some slots may be assigned 
> seven partitions while others remain unassigned.
> To address this issue, I propose a new partition assignment solution. In this 
> approach, round-robin assignment takes place across all topics, not just 
> within a single topic.
> For example, the ideal assignment for the case mentioned above is presented 
> below:
>  
> !https://imgr.whimsical.com/object/A4jSJwgQNrc5mgpGddhghq|width=513,height=134!
> This new solution can also handle cases where each topic has more partitions.
> !image-2023-04-11-08-12-24-115.png|width=444,height=127!
> Let us work together to reach a consensus on this proposal. Thank you!
>  
> FYI: this is how partitions are currently assigned:
> {code:java}
> public class KafkaTopicPartitionAssigner {
>
>     public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
>         return assign(partition.getTopic(), partition.getPartition(), numParallelSubtasks);
>     }
>
>     public static int assign(String topic, int partition, int numParallelSubtasks) {
>         int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
>         // here, the assumption is that the id of Kafka partitions are always ascending
>         // starting from 0, and therefore can be used directly as the offset clockwise from the
>         // start index
>         return (startIndex + partition) % numParallelSubtasks;
>     }
> }
> {code}
> For the Kafka source, it's implemented in the KafkaSourceEnumerator as below:
> {code:java}
>     static int getSplitOwner(TopicPartition tp, int numReaders) {
>         int startIndex = ((tp.topic().hashCode() * 31) & 0x7FFFFFFF) % numReaders;
>         // here, the assumption is that the id of Kafka partitions are always ascending
>         // starting from 0, and therefore can be used directly as the offset clockwise from the
>         // start index
>         return (startIndex + tp.partition()) % numReaders;
>     }
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-32038) OffsetCommitMode.Kafka_periodic with checkpointing enabled

2023-05-30 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17727581#comment-17727581
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-32038 at 5/30/23 3:26 PM:
--

Hi [~pritam.agarwala], it does look like the behavior was actually altered / 
broken when the Kafka source connector was reimplemented on top of the new 
Source V2 interface.

In other words, the desired behavior that you are describing is exactly how 
things worked in the old {{FlinkKafkaConsumer}}. Things changed such that 
offset committing is only ever done when checkpointing is enabled in the new 
{{KafkaSource}}.

Since your ticket description doesn't explicitly say: could you clarify which 
version of the Kafka source connector you are using, {{FlinkKafkaConsumer}} or 
{{KafkaSource}}?

If it's the latter, I do think there's a case for fixing this, as the current 
behavior conflicts with what the documentation describes. Since it seems to be 
a bug, I wouldn't categorize this as "changing default behavior". Moreover, not 
adding this behavior back to {{KafkaSource}} would arguably break existing 
tooling / integrations for users who want to migrate from 
{{FlinkKafkaConsumer}} to {{KafkaSource}}. cc [~martijnvisser]
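
As a concrete illustration of the behavior being discussed, the following 
minimal sketch resolves the commit mode for the reporter's scenario using the 
old connector's {{OffsetCommitModes}} (quoted in the description below). The 
wrapper class is purely illustrative and assumes the old connector's classes 
are on the classpath.
{code:java}
import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitModes;

/** Illustrative only: shows how the old connector resolves the offset commit mode. */
public class OffsetCommitModeExample {

    public static void main(String[] args) {
        // Scenario from this ticket: enable.auto.commit=true,
        // setCommitOffsetsOnCheckpoints(false), checkpointing enabled.
        OffsetCommitMode mode =
                OffsetCommitModes.fromConfiguration(
                        /* enableAutoCommit= */ true,
                        /* enableCommitOnCheckpoint= */ false,
                        /* enableCheckpointing= */ true);

        // Prints DISABLED: once checkpointing is enabled, enable.auto.commit is
        // ignored entirely, which is exactly the behavior being questioned here.
        System.out.println(mode);
    }
}
{code}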


was (Author: tzulitai):
Hi [~pritam.agarwala], it does look like the behavior was actually altered / 
broken when the Kafka source connector was reimplemented on top of the new 
Source V2 interface.

In other words, the desired behavior that you are describing is exactly how 
things worked in the old {{FlinkKafkaConsumer}}. Things changed such that 
offset committing is only ever done when checkpointing is enabled in the new 
{{KafkaSource}}.

Since your ticket description doesn't explicitly say: could you clarify which 
version of the Kafka source connector you are using, {{FlinkKafkaConsumer}} or 
{{KafkaSource}}?

If it's the latter, I do think there's a case for fixing this, as the current 
behavior conflicts with what the documentation describes. I wouldn't 
categorize this as "changing default behavior".

> OffsetCommitMode.Kafka_periodic with checkpointing enabled 
> ---
>
> Key: FLINK-32038
> URL: https://issues.apache.org/jira/browse/FLINK-32038
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka, Runtime / Checkpointing
>Affects Versions: 1.14.6
>Reporter: Pritam Agarwala
>Priority: Major
>
> I need to get the Kafka lag to prepare a graph, and it depends on the Kafka 
> committed offsets. Flink updates the offsets only after checkpointing to keep 
> them consistent.
> Default behaviour as per the docs:
> If checkpointing is enabled, but {{consumer.setCommitOffsetsOnCheckpoints}} is 
> set to false, then offsets will not be committed at all, even if 
> {{enable.auto.commit}} is set to true.
> So, when {{consumer.setCommitOffsetsOnCheckpoints}} is set to false, *shouldn't 
> it fall back on {{enable.auto.commit}} to commit offsets regularly, since in 
> any case Flink doesn't use the consumer's committed offsets for recovery?*
>  
> OffsetCommitModes class :
>   
> {code:java}
> public class OffsetCommitModes {
>
>     /**
>      * Determine the offset commit mode using several configuration values.
>      *
>      * @param enableAutoCommit whether or not auto committing is enabled in the provided Kafka
>      *     properties.
>      * @param enableCommitOnCheckpoint whether or not committing on checkpoints is enabled.
>      * @param enableCheckpointing whether or not checkpoint is enabled for the consumer.
>      * @return the offset commit mode to use, based on the configuration values.
>      */
>     public static OffsetCommitMode fromConfiguration(
>             boolean enableAutoCommit,
>             boolean enableCommitOnCheckpoint,
>             boolean enableCheckpointing) {
>         if (enableCheckpointing) {
>             // if checkpointing is enabled, the mode depends only on whether committing on
>             // checkpoints is enabled
>             return (enableCommitOnCheckpoint)
>                     ? OffsetCommitMode.ON_CHECKPOINTS
>                     : OffsetCommitMode.DISABLED;
>         } else {
>             // else, the mode depends only on whether auto committing is enabled in the provided
>             // Kafka properties
>             return (enableAutoCommit)
>                     ? OffsetCommitMode.KAFKA_PERIODIC
>                     : OffsetCommitMode.DISABLED;
>         }
>     }
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32038) OffsetCommitMode.Kafka_periodic with checkpointing enabled

2023-05-30 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17727581#comment-17727581
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-32038:
-

Hi [~pritam.agarwala], it does look like the behavior was actually altered / 
broken when the Kafka source connector was reimplemented on top of the new 
Source V2 interface.

In other words, the desired behavior that you are describing is exactly how 
things worked in the old {{FlinkKafkaConsumer}}. Things changed such that 
offset committing is only ever done when checkpointing is enabled in the new 
{{KafkaSource}}.

Since your ticket description doesn't explicitly say: could you clarify which 
version of the Kafka source connector you are using, {{FlinkKafkaConsumer}} or 
{{KafkaSource}}?

If it's the latter, I do think there's a case for fixing this, as the current 
behavior conflicts with what the documentation describes. I wouldn't 
categorize this as "changing default behavior".

> OffsetCommitMode.Kafka_periodic with checkpointing enabled 
> ---
>
> Key: FLINK-32038
> URL: https://issues.apache.org/jira/browse/FLINK-32038
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka, Runtime / Checkpointing
>Affects Versions: 1.14.6
>Reporter: Pritam Agarwala
>Priority: Major
>
> I need to get the Kafka lag to prepare a graph, and it depends on the Kafka 
> committed offsets. Flink updates the offsets only after checkpointing to keep 
> them consistent.
> Default behaviour as per the docs:
> If checkpointing is enabled, but {{consumer.setCommitOffsetsOnCheckpoints}} is 
> set to false, then offsets will not be committed at all, even if 
> {{enable.auto.commit}} is set to true.
> So, when {{consumer.setCommitOffsetsOnCheckpoints}} is set to false, *shouldn't 
> it fall back on {{enable.auto.commit}} to commit offsets regularly, since in 
> any case Flink doesn't use the consumer's committed offsets for recovery?*
>  
> OffsetCommitModes class :
>   
> {code:java}
> public class OffsetCommitModes {
>
>     /**
>      * Determine the offset commit mode using several configuration values.
>      *
>      * @param enableAutoCommit whether or not auto committing is enabled in the provided Kafka
>      *     properties.
>      * @param enableCommitOnCheckpoint whether or not committing on checkpoints is enabled.
>      * @param enableCheckpointing whether or not checkpoint is enabled for the consumer.
>      * @return the offset commit mode to use, based on the configuration values.
>      */
>     public static OffsetCommitMode fromConfiguration(
>             boolean enableAutoCommit,
>             boolean enableCommitOnCheckpoint,
>             boolean enableCheckpointing) {
>         if (enableCheckpointing) {
>             // if checkpointing is enabled, the mode depends only on whether committing on
>             // checkpoints is enabled
>             return (enableCommitOnCheckpoint)
>                     ? OffsetCommitMode.ON_CHECKPOINTS
>                     : OffsetCommitMode.DISABLED;
>         } else {
>             // else, the mode depends only on whether auto committing is enabled in the provided
>             // Kafka properties
>             return (enableAutoCommit)
>                     ? OffsetCommitMode.KAFKA_PERIODIC
>                     : OffsetCommitMode.DISABLED;
>         }
>     }
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-32196) kafka sink under EO sometimes is unable to recover from a checkpoint

2023-05-25 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726426#comment-17726426
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-32196 at 5/26/23 12:06 AM:
---

[~sharonxr55] a few things to clarify first:
 # When a KafkaSink subtask restores, there are some transactions that need to 
be committed (i.e. the ones that are recorded in the Flink checkpoint), and
 # All other transactions are considered "lingering" and should be aborted 
(which is done by the loop you referenced).
 # Only after the above 2 steps complete is the subtask initialization 
considered complete.

So:

> A healthy transaction goes from INITIALIZING to READY to 
> COMMITTING_TRANSACTION to READY in the log

I believe these transactions are the ones from step 1, which is expected.

> I've found that the transaction thread never progresses beyond "Transition 
> from state INITIALIZING to READY"

These are the ones to abort in step 2. Initializing the transaction 
automatically aborts it, as I mentioned in earlier comments. So I believe this 
is also expected.

 

What is NOT expected, though, is the number of {{kafka-producer-network-thread}} 
threads being spawned, one per TID to abort in step 2. Thanks for sharing the 
logs, by the way; they were helpful in figuring out what was going on!

Kafka's producer only spawns a single {{kafka-producer-network-thread}} per 
instance, and the abort loop for lingering transactions always tries to reuse 
the same producer instance without creating new ones, so I would expect to see 
only a single {{kafka-producer-network-thread}} throughout the whole loop. This 
doesn't seem to be the case. From the naming of these threads, it seems like a 
new {{kafka-producer-network-thread}} is spawned for every TID that the 
KafkaSink is trying to abort.

This is hinted by the naming of the threads (see the last portion of each 
thread name, which is strictly incrementing; those are the TIDs of the 
transactions the KafkaSink is trying to abort):
{code:java}
"kafka-producer-network-thread | producer-tx-account-7a604e01-CONNECTION-75373861-0-1708579"
"kafka-producer-network-thread | producer-tx-account-7a604e01-CONNECTION-75373861-0-1708580"
"kafka-producer-network-thread | producer-tx-account-7a604e01-CONNECTION-75373861-0-1708581"
"kafka-producer-network-thread | producer-tx-account-7a604e01-CONNECTION-75373861-0-1708582"
{code}
The only way I can see this happening is if the loop is creating a new producer 
instance per attempted TID, but that doesn't make sense given the code. It 
could be something wrong with how the KafkaSink uses Java reflection to reset 
the TID on the reused producer, but I'll need to spend some time looking into 
this a bit deeper.


was (Author: tzulitai):
[~sharonxr55] a few things to clarify first:
 # When a KafkaSink subtask restores, there are some transactions that need to 
be committed (i.e. the ones that are recorded in the Flink checkpoint), and
 # All other transactions are considered "lingering" and should be aborted 
(which is done by the loop you referenced).
 # Only after the above 2 steps complete is the subtask initialization 
considered complete.

So:

> A healthy transaction goes from INITIALIZING to READY to 
> COMMITTING_TRANSACTION to READY in the log

I believe these transactions are the ones from step 1, which is expected.

> I've found that the transaction thread never progresses beyond "Transition 
> from state INITIALIZING to READY"

These are the ones to abort in step 2. Initializing the transaction 
automatically aborts it, as I mentioned in earlier comments. So I believe this 
is also expected.

 

What is NOT expected, though, is the number of {{kafka-producer-network-thread}} 
threads being spawned, one per TID to abort in step 2. Thanks for sharing the 
logs, by the way; they were helpful in figuring out what was going on!

Kafka's producer only spawns a single {{kafka-producer-network-thread}} per 
instance, and the abort loop for lingering transactions always tries to reuse 
the same producer instance without creating new ones, so I would expect to see 
only a single {{kafka-producer-network-thread}} throughout the whole loop. This 
doesn't seem to be the case. From the naming of these threads, it seems like a 
new {{kafka-producer-network-thread}} is spawned for every TID that the 
KafkaSink is trying to abort.

This is hinted by the naming of the threads (see the last portion of each 
thread name, which is strictly incrementing; those are the TIDs of the 
transactions the KafkaSink is trying to abort):
{code}
"kafka-producer-network-thread | producer-tx-account-7a604e01-CONNECTION-75373861-0-1708579"
"kafka-producer-network-thread | producer-tx-account-7a604e01-CONNECTION-75373861-0-1708580"
"kafka-producer-network-thread | producer-tx-account-7a604e01-CONNECTION-75373861-0-1708581"
"kafka-producer-network-thread | producer-tx-account-7a604e01-CONNECTION-75373861-0-1708582"
{code}

[jira] [Commented] (FLINK-32196) kafka sink under EO sometimes is unable to recover from a checkpoint

2023-05-25 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726426#comment-17726426
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-32196:
-

[~sharonxr55] a few things to clarify first:
 # When a KafkaSink subtask restores, there are some transactions that need to 
be committed (i.e. the ones that are recorded in the Flink checkpoint), and
 # All other transactions are considered "lingering" and should be aborted 
(which is done by the loop you referenced).
 # Only after the above 2 steps complete is the subtask initialization 
considered complete.

So:

> A healthy transaction goes from INITIALIZING to READY to 
> COMMITTING_TRANSACTION to READY in the log

I believe these transactions are the ones from step 1, which is expected.

> I've found that the transaction thread never progresses beyond "Transition 
> from state INITIALIZING to READY"

These are the ones to abort in step 2. Initializing the transaction 
automatically aborts it, as I mentioned in earlier comments. So I believe this 
is also expected.

 

What is NOT expected, though, is the number of {{kafka-producer-network-thread}} 
threads being spawned, one per TID to abort in step 2. Thanks for sharing the 
logs, by the way; they were helpful in figuring out what was going on!

Kafka's producer only spawns a single {{kafka-producer-network-thread}} per 
instance, and the abort loop for lingering transactions always tries to reuse 
the same producer instance without creating new ones, so I would expect to see 
only a single {{kafka-producer-network-thread}} throughout the whole loop. This 
doesn't seem to be the case. From the naming of these threads, it seems like a 
new {{kafka-producer-network-thread}} is spawned for every TID that the 
KafkaSink is trying to abort.

This is hinted by the naming of the threads (see the last portion of each 
thread name, which is strictly incrementing; those are the TIDs of the 
transactions the KafkaSink is trying to abort):
{code}
"kafka-producer-network-thread | producer-tx-account-7a604e01-CONNECTION-75373861-0-1708579"
"kafka-producer-network-thread | producer-tx-account-7a604e01-CONNECTION-75373861-0-1708580"
"kafka-producer-network-thread | producer-tx-account-7a604e01-CONNECTION-75373861-0-1708581"
"kafka-producer-network-thread | producer-tx-account-7a604e01-CONNECTION-75373861-0-1708582"
{code}

The only way I can see this happening is if the loop is creating a new producer 
instance per attempted TID, but that doesn't make sense given the code. It 
could be something funny with how the KafkaSink uses Java reflection to reset 
the TID on the reused producer, but I'll need to spend some time looking into 
this a bit deeper.
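
To illustrate why these thread names point at new producer instances being 
created (rather than one instance being reused), here is a small standalone 
sketch against the plain Kafka producer API. The broker address and 
transactional id prefix are placeholders, and this is not the KafkaSink's 
actual abort loop.
{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;

/** Sketch: every KafkaProducer instance starts its own "kafka-producer-network-thread". */
public class ProducerThreadLeakSketch {

    public static void main(String[] args) {
        for (int tid = 0; tid < 3; tid++) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-prefix-0-" + tid); // placeholder

            // Each constructor call spawns a new I/O thread named
            // "kafka-producer-network-thread | <client.id>". If a fresh producer is
            // created per TID and never closed, those threads pile up, matching the
            // thread dump described above. Reusing (and eventually closing) a single
            // producer instance avoids this.
            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
            producer.close();
        }
    }
}
{code}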

> kafka sink under EO sometimes is unable to recover from a checkpoint
> 
>
> Key: FLINK-32196
> URL: https://issues.apache.org/jira/browse/FLINK-32196
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.6.4, 1.15.4
>Reporter: Sharon Xie
>Priority: Major
> Attachments: healthy_kafka_producer_thread.csv, 
> kafka_producer_network_thread_log.csv, kafka_sink_oom_logs.csv
>
>
> We are seeing an issue where a Flink job using the Kafka sink under EO is 
> unable to recover from a checkpoint. The sink task is stuck in the 
> `INITIALIZING` state and eventually runs OOM. The cause of the OOM is a Kafka 
> producer thread leak.
> Here is our best *hypothesis* for the issue.
> In `KafkaWriter` under the EO semantic, the writer intends to abort lingering 
> transactions upon recovery: 
> [https://github.com/apache/flink/blob/release-1.15/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L175-L179]
> However, the actual implementation in the `TransactionAborter` doesn't abort 
> those transactions: 
> [https://github.com/apache/flink/blob/release-1.15/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TransactionAborter.java#L97-L124]
> Specifically, `producer.abortTransaction()` is never called in that function; 
> instead it calls `producer.flush()`.
> Also, the function is in a for loop that only breaks when `producer.getEpoch() 
> == 0`, which is why we are seeing a producer thread leak as the recovery gets 
> stuck in this loop.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32196) KafkaWriter recovery doesn't abort lingering transactions under the EO semantic

2023-05-25 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726390#comment-17726390
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-32196:
-

In terms of the lingering transactions you are observing, a few questions:
 # Are you actually observing lingering transactions not being aborted in 
Kafka? Or was that speculation based on not seeing an {{abortTransaction()}} 
call in the code?
 # If there are actually lingering transactions in Kafka after restore, do they 
get timed out by Kafka after {{transaction.timeout.ms}}? Or do they linger 
beyond the timeout threshold?

> KafkaWriter recovery doesn't abort lingering transactions under the EO 
> semantic
> ---
>
> Key: FLINK-32196
> URL: https://issues.apache.org/jira/browse/FLINK-32196
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.6.4, 1.15.4
>Reporter: Sharon Xie
>Priority: Major
>
> We are seeing an issue where a Flink job using the Kafka sink under EO is 
> unable to recover from a checkpoint. The sink task is stuck in the 
> `INITIALIZING` state and eventually runs OOM. The cause of the OOM is a Kafka 
> producer thread leak.
> Here is our best hypothesis for the issue.
> In `KafkaWriter` under the EO semantic, the writer intends to abort lingering 
> transactions upon recovery: 
> [https://github.com/apache/flink/blob/release-1.15/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L175-L179]
> However, the actual implementation in the `TransactionAborter` doesn't abort 
> those transactions: 
> [https://github.com/apache/flink/blob/release-1.15/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TransactionAborter.java#L97-L124]
> Specifically, `producer.abortTransaction()` is never called in that function; 
> instead it calls `producer.flush()`.
> Also, the function is in a for loop that only breaks when `producer.getEpoch() 
> == 0`, which is why we are seeing a producer thread leak as the recovery gets 
> stuck in this loop.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-32196) KafkaWriter recovery doesn't abort lingering transactions under the EO semantic

2023-05-25 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726385#comment-17726385
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-32196 at 5/25/23 8:56 PM:
--

Hi [~sharonxr55], the {{abortTransactionOfSubtask}} you posted aborts 
transactions by relying on the fact that when you call {{initTransactions()}}, 
Kafka automatically aborts any old ongoing transaction under the same 
{{transactional.id}}.

Could you elaborate further on the producer leak? As far as I can tell, the 
loop reuses the same producer instance; on every loop iteration, the same 
producer instance is reset with a new {{transactional.id}} and 
{{initTransactions()}} is called on it to abort the transaction. So there 
doesn't seem to be an issue with runaway producer instances, unless I'm 
misunderstanding something here.
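
For reference, here is a minimal standalone sketch of that Kafka behavior, 
outside of Flink. The broker address, topic, and {{transactional.id}} are 
placeholders; the point is that re-initializing a producer with the same 
{{transactional.id}} fences the previous producer and aborts its unfinished 
transaction, without any explicit {{abortTransaction()}} call.
{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

/** Sketch: initTransactions() under the same transactional.id aborts a dangling transaction. */
public class InitTransactionsAbortSketch {

    private static KafkaProducer<String, String> newProducer(String transactionalId) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
        return new KafkaProducer<>(props);
    }

    public static void main(String[] args) {
        String tid = "example-tid"; // placeholder transactional.id

        // First producer starts a transaction but never finishes it.
        KafkaProducer<String, String> first = newProducer(tid);
        first.initTransactions();
        first.beginTransaction();
        first.send(new ProducerRecord<>("example-topic", "key", "value"));
        // No commitTransaction() / abortTransaction(): the transaction is left dangling.

        // A second producer with the SAME transactional.id calls initTransactions():
        // the broker bumps the producer epoch, fences the first producer, and aborts
        // the dangling transaction.
        KafkaProducer<String, String> second = newProducer(tid);
        second.initTransactions();

        second.close();
        first.close();
    }
}
{code}
This is the same mechanism the abort loop relies on, just shown in isolation.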


was (Author: tzulitai):
Hi [~sharonxr55], the {{abortTransactionOfSubtask}} you posted aborts 
transactions by relying on the fact that when you call {{initTransactions()}}, 
Kafka automatically aborts any old ongoing transaction under the same 
{{transactional.id}}.

Could you elaborate further on the producer leak? As far as I can tell, the 
loop reuses the same producer instance; on every loop iteration, the same 
producer instance is reset with a new {{transactional.id}} and 
{{initTransactions()}} is called on it to abort the transaction.

> KafkaWriter recovery doesn't abort lingering transactions under the EO 
> semantic
> ---
>
> Key: FLINK-32196
> URL: https://issues.apache.org/jira/browse/FLINK-32196
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.6.4, 1.15.4
>Reporter: Sharon Xie
>Priority: Major
>
> We are seeing an issue where a Flink job using the Kafka sink under EO is 
> unable to recover from a checkpoint. The sink task is stuck in the 
> `INITIALIZING` state and eventually runs OOM. The cause of the OOM is a Kafka 
> producer thread leak.
> Here is our best hypothesis for the issue.
> In `KafkaWriter` under the EO semantic, the writer intends to abort lingering 
> transactions upon recovery: 
> [https://github.com/apache/flink/blob/release-1.15/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L175-L179]
> However, the actual implementation in the `TransactionAborter` doesn't abort 
> those transactions: 
> [https://github.com/apache/flink/blob/release-1.15/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TransactionAborter.java#L97-L124]
> Specifically, `producer.abortTransaction()` is never called in that function; 
> instead it calls `producer.flush()`.
> Also, the function is in a for loop that only breaks when `producer.getEpoch() 
> == 0`, which is why we are seeing a producer thread leak as the recovery gets 
> stuck in this loop.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32196) KafkaWriter recovery doesn't abort lingering transactions under the EO semantic

2023-05-25 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726385#comment-17726385
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-32196:
-

Hi [~sharonxr55], the {{abortTransactionOfSubtask}} you posted aborts 
transactions by relying on the fact that when you call {{initTransactions()}}, 
Kafka automatically aborts any old ongoing transaction under the same 
{{transactional.id}}.

Could you elaborate further on the producer leak? As far as I can tell, the 
loop reuses the same producer instance; on every loop iteration, the same 
producer instance is reset with a new {{transactional.id}} and 
{{initTransactions()}} is called on it to abort the transaction.

> KafkaWriter recovery doesn't abort lingering transactions under the EO 
> semantic
> ---
>
> Key: FLINK-32196
> URL: https://issues.apache.org/jira/browse/FLINK-32196
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.6.4, 1.15.4
>Reporter: Sharon Xie
>Priority: Major
>
> We are seeing an issue where a Flink job using the Kafka sink under EO is 
> unable to recover from a checkpoint. The sink task is stuck in the 
> `INITIALIZING` state and eventually runs OOM. The cause of the OOM is a Kafka 
> producer thread leak.
> Here is our best hypothesis for the issue.
> In `KafkaWriter` under the EO semantic, the writer intends to abort lingering 
> transactions upon recovery: 
> [https://github.com/apache/flink/blob/release-1.15/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L175-L179]
> However, the actual implementation in the `TransactionAborter` doesn't abort 
> those transactions: 
> [https://github.com/apache/flink/blob/release-1.15/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TransactionAborter.java#L97-L124]
> Specifically, `producer.abortTransaction()` is never called in that function; 
> instead it calls `producer.flush()`.
> Also, the function is in a for loop that only breaks when `producer.getEpoch() 
> == 0`, which is why we are seeing a producer thread leak as the recovery gets 
> stuck in this loop.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

