[jira] [Commented] (FLINK-34127) Kafka connector repo runs a duplicate of `IntegrationTests` framework tests

2024-07-08 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17863925#comment-17863925
 ] 

Mason Chen commented on FLINK-34127:


Hi [~arvid], yes feel free to take over. I left it at this PR: 
[https://github.com/apache/flink-connector-kafka/pull/98]

I underestimated the task and eventually got stuck on the Java 21 CI errors, 
which require upgrading Spotless and a few more dependencies from the connector 
utils.

> Kafka connector repo runs a duplicate of `IntegrationTests` framework tests
> ---
>
> Key: FLINK-34127
> URL: https://issues.apache.org/jira/browse/FLINK-34127
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System / CI, Connectors / Kafka
>Affects Versions: kafka-3.0.2
>Reporter: Mason Chen
>Assignee: Mason Chen
>Priority: Major
>  Labels: pull-request-available
>
> I found this behavior when troubleshooting CI flakiness. These 
> integration tests make heavy use of CI resources since they require Kafka, 
> Zookeeper, and Docker containers. We can further stabilize CI by not 
> redundantly running this set of tests.
> `grep -E ".*testIdleReader\[TestEnvironment.*" 14_Compile\ and\ test.txt` 
> returns:
> ```
> 2024-01-17T00:51:05.2943150Z Test 
> org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceITTest.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: 
> [org.apache.flink.connector.kafka.testutils.DynamicKafkaSourceExternalContext@43e9a8a2],
>  Semantic: [EXACTLY_ONCE]] is running.
> 2024-01-17T00:51:07.6922535Z Test 
> org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceITTest.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: 
> [org.apache.flink.connector.kafka.testutils.DynamicKafkaSourceExternalContext@43e9a8a2],
>  Semantic: [EXACTLY_ONCE]] successfully run.
> 2024-01-17T00:56:27.1326332Z Test 
> org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceITTest.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: 
> [org.apache.flink.connector.kafka.testutils.DynamicKafkaSourceExternalContext@2db4a84a],
>  Semantic: [EXACTLY_ONCE]] is running.
> 2024-01-17T00:56:28.4000830Z Test 
> org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceITTest.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: 
> [org.apache.flink.connector.kafka.testutils.DynamicKafkaSourceExternalContext@2db4a84a],
>  Semantic: [EXACTLY_ONCE]] successfully run.
> 2024-01-17T00:56:58.7830792Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-PARTITION], Semantic: 
> [EXACTLY_ONCE]] is running.
> 2024-01-17T00:56:59.0544092Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-PARTITION], Semantic: 
> [EXACTLY_ONCE]] successfully run.
> 2024-01-17T00:56:59.3910987Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-TOPIC], Semantic: 
> [EXACTLY_ONCE]] is running.
> 2024-01-17T00:56:59.6025298Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-TOPIC], Semantic: 
> [EXACTLY_ONCE]] successfully run.
> 2024-01-17T00:57:37.8378640Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-PARTITION], Semantic: 
> [EXACTLY_ONCE]] is running.
> 2024-01-17T00:57:38.0144732Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-PARTITION], Semantic: 
> [EXACTLY_ONCE]] successfully run.
> 2024-01-17T00:57:38.2004796Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-TOPIC], Semantic: 
> [EXACTLY_ONCE]] is running.
> 2024-01-17T00:57:38.4072815Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-TOPIC], Semantic: 
> [EXACTLY_ONCE]] successfully run.
> 2024-01-17T01:06:11.2933375Z Test 
> org.apache.flink.tests.util.kafka.KafkaSourceE2ECase.testIdleReader[TestEnvironment:
>  [FlinkContainers], ExternalContext: [KafkaSource-PARTITION], Semantic: 
> [EXACTLY_ONCE]] is running.
> 

[jira] [Commented] (FLINK-33545) KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching

2024-07-08 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17863924#comment-17863924
 ] 

Mason Chen commented on FLINK-33545:


[~arvid] [~thw] I believe this is solved by 
https://issues.apache.org/jira/browse/FLINK-35749!

> KafkaSink implementation can cause dataloss during broker issue when not 
> using EXACTLY_ONCE if there's any batching
> ---
>
> Key: FLINK-33545
> URL: https://issues.apache.org/jira/browse/FLINK-33545
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Kevin Tseng
>Assignee: Kevin Tseng
>Priority: Major
>  Labels: pull-request-available
>
> In the current implementation of KafkaSource and KafkaSink there are some 
> assumptions that were made:
>  # KafkaSource completely relies on Checkpoint to manage and track its offset 
> in *KafkaSourceReader* class
> KafkaSink in the *KafkaWriter* class only performs a catch-up flush when 
> *DeliveryGuarantee.EXACTLY_ONCE* is specified.
> KafkaSource assumes that checkpoints are properly fenced and that everything 
> it has read up until the checkpoint was initiated will be processed or 
> recorded by downstream operators, including TwoPhaseCommitters such as 
> *KafkaSink*.
> *KafkaSink* goes by the model of:
>  
> {code:java}
> flush -> prepareCommit -> commit{code}
>  
> In a scenario where:
>  * KafkaSource ingested records #1 to #100
>  * KafkaSink only had the chance to send records #1 to #96
>  * the batching interval is 5ms
> when the checkpoint is initiated, flush will only confirm the sending of 
> records #1 to #96.
> This allows the checkpoint to proceed as there's no error, and records #97 to 
> #100 will be batched after the first flush.
> Now, if the broker goes down or has an issue that prevents the internal 
> KafkaProducer from sending out the batched records, and it is in a constant 
> retry cycle (the default value of KafkaProducer retries is Integer.MAX_VALUE), 
> *WriterCallback* error handling will never be triggered until the next 
> checkpoint flush.
> This can be tested by creating a faulty Kafka cluster and running the 
> following code:
> {code:java}
> Properties props = new Properties();
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
> props.put(ProducerConfig.CLIENT_ID_CONFIG, "example-producer");
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>     StringSerializer.class.getName());
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>     StringSerializer.class.getName());
> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
> props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE);
> props.put(ProducerConfig.ACKS_CONFIG, "all");
> final KafkaProducer<String, String> producer = new KafkaProducer<>(props);
> try {
>     for (int i = 0; i < 10; i++) {
>         System.out.printf("sending record #%d\n", i);
>         String data = UUID.randomUUID().toString();
>         final ProducerRecord<String, String> record =
>             new ProducerRecord<>(TOPIC, Integer.toString(i), data);
>         producer.send(record, new CB(Integer.toString(i), data)); // CB is the reporter's callback class (not shown)
>         Thread.sleep(1); // brief pause between sends
>     }
> } catch (Exception e) {
>     e.printStackTrace();
> } finally {
>     System.out.println("flushing");
>     producer.flush();
>     System.out.println("closing");
>     producer.close();
> }{code}
> Once the callback returns due to a network timeout, it will cause Flink to 
> restart from the previously saved checkpoint (which recorded reading up to 
> record #100), but KafkaWriter never sent records #97 to #100.
> This will result in data loss of records #97 to #100.
> Because KafkaWriter only catches errors *after* the callback, if the callback 
> is never invoked (due to a broker issue) after the first flush has taken 
> place, those records are effectively gone unless someone decides to go back 
> and look for them.
> This behavior should be acceptable if the user has set 
> {*}DeliveryGuarantee.NONE{*}, but it is not expected for 
> {*}DeliveryGuarantee.AT_LEAST_ONCE{*}.
> The process diverges in the case of {*}EXACTLY_ONCE{*}.
> prepareCommit produces a list of KafkaCommittable that corresponds to the 
> transactional KafkaProducer to be committed, and a catch-up flush takes place 
> during the *commit* step. Whether this was intentional or not, because flush 
> is a blocking call, this second flush at the end of the EXACTLY_ONCE path 
> actually ensures everything fenced in the current checkpoint is sent to 
> Kafka, or fails the checkpoint if not successful.
>  
> Due to the above finding, I'm recommending one of the following fixes:
>  # need to perform second flush for AT_LEAST_ONCE
>  # or move flush to the end of 
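
For illustration, a minimal sketch of the first suggested fix (an additional catch-up flush for AT_LEAST_ONCE before the checkpoint completes). The class and field names are hypothetical; this is not the actual KafkaWriter code:

{code:java}
import org.apache.kafka.clients.producer.KafkaProducer;

// Hypothetical writer sketch (not the real KafkaWriter): flush again for
// AT_LEAST_ONCE so that records batched after the first flush (e.g. #97-#100)
// are either acknowledged or fail the checkpoint, instead of being lost.
class AtLeastOnceWriterSketch {
    private final KafkaProducer<byte[], byte[]> producer;
    private final boolean atLeastOnce;

    AtLeastOnceWriterSketch(KafkaProducer<byte[], byte[]> producer, boolean atLeastOnce) {
        this.producer = producer;
        this.atLeastOnce = atLeastOnce;
    }

    void prepareCommit() {
        if (atLeastOnce) {
            // Blocking call: waits until all buffered records are sent and acknowledged,
            // so a broker-side failure surfaces here and fails the checkpoint.
            producer.flush();
        }
    }
}
{code}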

[jira] [Commented] (FLINK-34445) Integrate new endpoint with Flink UI metrics section

2024-06-18 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34445?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17856079#comment-17856079
 ] 

Mason Chen commented on FLINK-34445:


Thanks for the update, I'll try to find that out.

Hi [~fanrui] (1.20 release manager), would you know if anyone has bandwidth to 
complete this? My employer makes it difficult to contribute this UI integration 
to OSS, so is it possible someone could take this over? The endpoint is already 
merged to Flink, so we just need the UI integration. I haven't been attending 
the 1.20 release sync, so I'm not sure if anyone can help out.

> Integrate new endpoint with Flink UI metrics section
> 
>
> Key: FLINK-34445
> URL: https://issues.apache.org/jira/browse/FLINK-34445
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Web Frontend
>Reporter: Mason Chen
>Assignee: Ruan Hang
>Priority: Major
> Fix For: 1.20.0
>
>






[jira] [Updated] (FLINK-34064) FLIP-417: Expose JobManagerOperatorMetrics via REST API

2024-06-03 Thread Mason Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34064?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mason Chen updated FLINK-34064:
---
Summary: FLIP-417: Expose JobManagerOperatorMetrics via REST API  (was: 
Expose JobManagerOperatorMetrics via REST API)

> FLIP-417: Expose JobManagerOperatorMetrics via REST API
> ---
>
> Key: FLINK-34064
> URL: https://issues.apache.org/jira/browse/FLINK-34064
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST, Runtime / Web Frontend
>Affects Versions: 1.18.0
>Reporter: Mason Chen
>Assignee: Mason Chen
>Priority: Major
> Fix For: 1.20.0
>
>
> Add a REST API to fetch coordinator metrics.
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-417%3A+Expose+JobManagerOperatorMetrics+via+REST+API]
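
A minimal sketch of how a client could query the new endpoint. The REST path and the placeholder IDs below are assumptions for illustration and should be checked against the final FLIP-417 specification:

{code:java}
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class JobManagerOperatorMetricsClient {
    public static void main(String[] args) throws Exception {
        // Placeholder REST address, job id, and vertex id; the path follows the
        // FLIP-417 proposal of scoping coordinator metrics under a job vertex.
        String jobId = "0000000000000000000000000000aaaa";
        String vertexId = "0000000000000000000000000000bbbb";
        String url = "http://localhost:8081/jobs/" + jobId
                + "/vertices/" + vertexId + "/jm-operator-metrics";
        HttpResponse<String> response = HttpClient.newHttpClient().send(
                HttpRequest.newBuilder(URI.create(url)).GET().build(),
                HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body()); // JSON list of available coordinator metrics
    }
}
{code}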





[jira] [Commented] (FLINK-34445) Integrate new endpoint with Flink UI metrics section

2024-06-03 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34445?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17851748#comment-17851748
 ] 

Mason Chen commented on FLINK-34445:


[~ruanhang1993] do you still have time to work on this?

> Integrate new endpoint with Flink UI metrics section
> 
>
> Key: FLINK-34445
> URL: https://issues.apache.org/jira/browse/FLINK-34445
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Web Frontend
>Reporter: Mason Chen
>Assignee: Hang Ruan
>Priority: Major
> Fix For: 1.20.0
>
>






[jira] [Resolved] (FLINK-35228) DynamicKafkaSource does not read re-added topic for the same cluster

2024-04-29 Thread Mason Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mason Chen resolved FLINK-35228.

Resolution: Fixed

CI passes

> DynamicKafkaSource does not read re-added topic for the same cluster
> 
>
> Key: FLINK-35228
> URL: https://issues.apache.org/jira/browse/FLINK-35228
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.1.0
>Reporter: Ignas Daukšas
>Assignee: Ignas Daukšas
>Priority: Major
>  Labels: pull-request-available
> Fix For: kafka-3.3.0
>
>
> When using DynamicKafkaSource, if a topic is removed from a cluster (that 
> still has other active topics remaining) and then re-added, consumption from 
> that topic does not resume.
> However, if the topic in question is the only topic in that cluster, then 
> everything works as expected - consumption restarts once the cluster-topic 
> pair is re-added.
> Steps to reproduce:
>  # Have a DynamicKafkaSource.
>  # Have KafkaMetadataService report a single cluster with two topics (A and 
> B) for the subscribed stream/streams.
>  # Consume some data, topics A and B are consumed as expected.
>  # Have KafkaMetadataService remove topic A.
>  # Continue consuming data, only topic B consumed as expected.
>  # Have KafkaMetadataService re-add topic A.
>  # Continue consuming data, however only topic B is actually consumed - this 
> is not expected.





[jira] [Updated] (FLINK-35228) DynamicKafkaSource does not read re-added topic for the same cluster

2024-04-29 Thread Mason Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mason Chen updated FLINK-35228:
---
Fix Version/s: kafka-3.3.0

> DynamicKafkaSource does not read re-added topic for the same cluster
> 
>
> Key: FLINK-35228
> URL: https://issues.apache.org/jira/browse/FLINK-35228
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.1.0
>Reporter: Ignas Daukšas
>Assignee: Ignas Daukšas
>Priority: Major
>  Labels: pull-request-available
> Fix For: kafka-3.3.0
>
>
> When using DynamicKafkaSource, if a topic is removed from a cluster (that 
> still has other active topics remaining) and then re-added, consumption from 
> that topic does not resume.
> However, if the topic in question is the only topic in that cluster, then 
> everything works as expected - consumption restarts once the cluster-topic 
> pair is re-added.
> Steps to reproduce:
>  # Have a DynamicKafkaSource.
>  # Have KafkaMetadataService report a single cluster with two topics (A and 
> B) for the subscribed stream/streams.
>  # Consume some data, topics A and B are consumed as expected.
>  # Have KafkaMetadataService remove topic A.
>  # Continue consuming data, only topic B consumed as expected.
>  # Have KafkaMetadataService re-add topic A.
>  # Continue consuming data, however only topic B is actually consumed - this 
> is not expected.





[jira] [Updated] (FLINK-35247) Upgrade spotless apply to `2.41.1` in flink-connector-parent to work with Java 21

2024-04-26 Thread Mason Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mason Chen updated FLINK-35247:
---
Description: 
The Spotless version configured in flink-connector-parent does not work with Java 21.

Issue found here: [https://github.com/apache/flink-connector-kafka/pull/98]

This is already fixed in Spotless: 
[https://github.com/diffplug/spotless/pull/1920]

but it also requires an upgrade to a later `google-java-format`.

  was:
Spotless apply version from flink-connector-parent does not work with Java 21

Tested here: [https://github.com/apache/flink-connector-kafka/pull/98]

This is already fixed by spotless apply: 
https://github.com/diffplug/spotless/pull/1920


> Upgrade spotless apply to `2.41.1` in flink-connector-parent to work with 
> Java 21
> -
>
> Key: FLINK-35247
> URL: https://issues.apache.org/jira/browse/FLINK-35247
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System / CI, Connectors / Common
>Affects Versions: connector-parent-1.1.0
>Reporter: Mason Chen
>Priority: Major
>
> The Spotless version configured in flink-connector-parent does not work with Java 21.
> Issue found here: [https://github.com/apache/flink-connector-kafka/pull/98]
> This is already fixed in Spotless: 
> [https://github.com/diffplug/spotless/pull/1920]
> but it also requires an upgrade to a later `google-java-format`.





[jira] [Created] (FLINK-35247) Upgrade spotless apply to `2.41.1` in flink-connector-parent to work with Java 21

2024-04-26 Thread Mason Chen (Jira)
Mason Chen created FLINK-35247:
--

 Summary: Upgrade spotless apply to `2.41.1` in 
flink-connector-parent to work with Java 21
 Key: FLINK-35247
 URL: https://issues.apache.org/jira/browse/FLINK-35247
 Project: Flink
  Issue Type: Improvement
  Components: Build System / CI, Connectors / Common
Affects Versions: connector-parent-1.1.0
Reporter: Mason Chen


The Spotless version configured in flink-connector-parent does not work with Java 21.

Tested here: [https://github.com/apache/flink-connector-kafka/pull/98]

This is already fixed in Spotless: 
https://github.com/diffplug/spotless/pull/1920





[jira] [Assigned] (FLINK-35228) DynamicKafkaSource does not read re-added topic for the same cluster

2024-04-24 Thread Mason Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mason Chen reassigned FLINK-35228:
--

Assignee: Ignas Daukšas

> DynamicKafkaSource does not read re-added topic for the same cluster
> 
>
> Key: FLINK-35228
> URL: https://issues.apache.org/jira/browse/FLINK-35228
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.1.0
>Reporter: Ignas Daukšas
>Assignee: Ignas Daukšas
>Priority: Major
>  Labels: pull-request-available
>
> When using DynamicKafkaSource, if a topic is removed from a cluster (that 
> still has other active topics remaining) and then re-added, consumption from 
> that topic does not resume.
> However, if the topic in question is the only topic in that cluster, then 
> everything works as expected - consumption restarts once the cluster-topic 
> pair is re-added.
> Steps to reproduce:
>  # Have a DynamicKafkaSource.
>  # Have KafkaMetadataService report a single cluster with two topics (A and 
> B) for the subscribed stream/streams.
>  # Consume some data, topics A and B are consumed as expected.
>  # Have KafkaMetadataService remove topic A.
>  # Continue consuming data, only topic B consumed as expected.
>  # Have KafkaMetadataService re-add topic A.
>  # Continue consuming data, however only topic B is actually consumed - this 
> is not expected.





[jira] [Updated] (FLINK-34064) Expose JobManagerOperatorMetrics via REST API

2024-04-23 Thread Mason Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34064?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mason Chen updated FLINK-34064:
---
Component/s: Runtime / Web Frontend

> Expose JobManagerOperatorMetrics via REST API
> -
>
> Key: FLINK-34064
> URL: https://issues.apache.org/jira/browse/FLINK-34064
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST, Runtime / Web Frontend
>Affects Versions: 1.18.0
>Reporter: Mason Chen
>Assignee: Mason Chen
>Priority: Major
> Fix For: 1.20.0
>
>
> Add a REST API to fetch coordinator metrics.
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-417%3A+Expose+JobManagerOperatorMetrics+via+REST+API]





[jira] [Commented] (FLINK-34445) Integrate new endpoint with Flink UI metrics section

2024-04-23 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34445?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840247#comment-17840247
 ] 

Mason Chen commented on FLINK-34445:


[~ruanhang1993] I reassigned to you. Thank you for volunteering, I will help 
review!

> Integrate new endpoint with Flink UI metrics section
> 
>
> Key: FLINK-34445
> URL: https://issues.apache.org/jira/browse/FLINK-34445
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Web Frontend
>Reporter: Mason Chen
>Assignee: Hang Ruan
>Priority: Major
> Fix For: 1.20.0
>
>






[jira] [Resolved] (FLINK-34444) Add new endpoint handler to flink

2024-04-23 Thread Mason Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34444?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mason Chen resolved FLINK-34444.

Resolution: Fixed

Master CI passes

> Add new endpoint handler to flink
> -
>
> Key: FLINK-34444
> URL: https://issues.apache.org/jira/browse/FLINK-34444
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Metrics, Runtime / REST
>Reporter: Mason Chen
>Assignee: Mason Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> Add new endpoint handler to flink





[jira] [Updated] (FLINK-34444) Add new endpoint handler to flink

2024-04-23 Thread Mason Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34444?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mason Chen updated FLINK-34444:
---
Description: Add new endpoint handler to flink

> Add new endpoint handler to flink
> -
>
> Key: FLINK-34444
> URL: https://issues.apache.org/jira/browse/FLINK-34444
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Metrics, Runtime / REST
>Reporter: Mason Chen
>Assignee: Mason Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> Add new endpoint handler to flink





[jira] [Assigned] (FLINK-34445) Integrate new endpoint with Flink UI metrics section

2024-04-23 Thread Mason Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mason Chen reassigned FLINK-34445:
--

Assignee: Hang Ruan  (was: Mason Chen)

> Integrate new endpoint with Flink UI metrics section
> 
>
> Key: FLINK-34445
> URL: https://issues.apache.org/jira/browse/FLINK-34445
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Web Frontend
>Reporter: Mason Chen
>Assignee: Hang Ruan
>Priority: Major
> Fix For: 1.20.0
>
>






[jira] [Assigned] (FLINK-34444) Add new endpoint handler to flink

2024-04-23 Thread Mason Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34444?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mason Chen reassigned FLINK-34444:
--

Assignee: Mason Chen  (was: Mason Chen)

> Add new endpoint handler to flink
> -
>
> Key: FLINK-34444
> URL: https://issues.apache.org/jira/browse/FLINK-34444
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Metrics, Runtime / REST
>Reporter: Mason Chen
>Assignee: Mason Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>






[jira] [Assigned] (FLINK-34064) Expose JobManagerOperatorMetrics via REST API

2024-04-23 Thread Mason Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34064?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mason Chen reassigned FLINK-34064:
--

Assignee: Mason Chen  (was: Mason Chen)

> Expose JobManagerOperatorMetrics via REST API
> -
>
> Key: FLINK-34064
> URL: https://issues.apache.org/jira/browse/FLINK-34064
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Affects Versions: 1.18.0
>Reporter: Mason Chen
>Assignee: Mason Chen
>Priority: Major
> Fix For: 1.20.0
>
>
> Add a REST API to fetch coordinator metrics.
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-417%3A+Expose+JobManagerOperatorMetrics+via+REST+API]





[jira] [Commented] (FLINK-34127) Kafka connector repo runs a duplicate of `IntegrationTests` framework tests

2024-04-17 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837982#comment-17837982
 ] 

Mason Chen commented on FLINK-34127:


It seems to be a bug in junit. https://github.com/junit-team/junit5/issues/3782

> Kafka connector repo runs a duplicate of `IntegrationTests` framework tests
> ---
>
> Key: FLINK-34127
> URL: https://issues.apache.org/jira/browse/FLINK-34127
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System / CI, Connectors / Kafka
>Affects Versions: kafka-3.0.2
>Reporter: Mason Chen
>Assignee: Mason Chen
>Priority: Major
>
> I found this behavior when troubleshooting CI flakiness. These 
> integration tests make heavy use of CI resources since they require Kafka, 
> Zookeeper, and Docker containers. We can further stabilize CI by not 
> redundantly running this set of tests.
> `grep -E ".*testIdleReader\[TestEnvironment.*" 14_Compile\ and\ test.txt` 
> returns:
> ```
> 2024-01-17T00:51:05.2943150Z Test 
> org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceITTest.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: 
> [org.apache.flink.connector.kafka.testutils.DynamicKafkaSourceExternalContext@43e9a8a2],
>  Semantic: [EXACTLY_ONCE]] is running.
> 2024-01-17T00:51:07.6922535Z Test 
> org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceITTest.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: 
> [org.apache.flink.connector.kafka.testutils.DynamicKafkaSourceExternalContext@43e9a8a2],
>  Semantic: [EXACTLY_ONCE]] successfully run.
> 2024-01-17T00:56:27.1326332Z Test 
> org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceITTest.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: 
> [org.apache.flink.connector.kafka.testutils.DynamicKafkaSourceExternalContext@2db4a84a],
>  Semantic: [EXACTLY_ONCE]] is running.
> 2024-01-17T00:56:28.4000830Z Test 
> org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceITTest.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: 
> [org.apache.flink.connector.kafka.testutils.DynamicKafkaSourceExternalContext@2db4a84a],
>  Semantic: [EXACTLY_ONCE]] successfully run.
> 2024-01-17T00:56:58.7830792Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-PARTITION], Semantic: 
> [EXACTLY_ONCE]] is running.
> 2024-01-17T00:56:59.0544092Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-PARTITION], Semantic: 
> [EXACTLY_ONCE]] successfully run.
> 2024-01-17T00:56:59.3910987Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-TOPIC], Semantic: 
> [EXACTLY_ONCE]] is running.
> 2024-01-17T00:56:59.6025298Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-TOPIC], Semantic: 
> [EXACTLY_ONCE]] successfully run.
> 2024-01-17T00:57:37.8378640Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-PARTITION], Semantic: 
> [EXACTLY_ONCE]] is running.
> 2024-01-17T00:57:38.0144732Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-PARTITION], Semantic: 
> [EXACTLY_ONCE]] successfully run.
> 2024-01-17T00:57:38.2004796Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-TOPIC], Semantic: 
> [EXACTLY_ONCE]] is running.
> 2024-01-17T00:57:38.4072815Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-TOPIC], Semantic: 
> [EXACTLY_ONCE]] successfully run.
> 2024-01-17T01:06:11.2933375Z Test 
> org.apache.flink.tests.util.kafka.KafkaSourceE2ECase.testIdleReader[TestEnvironment:
>  [FlinkContainers], ExternalContext: [KafkaSource-PARTITION], Semantic: 
> [EXACTLY_ONCE]] is running.
> 2024-01-17T01:06:12.1790031Z Test 
> org.apache.flink.tests.util.kafka.KafkaSourceE2ECase.testIdleReader[TestEnvironment:
>  [FlinkContainers], ExternalContext: [KafkaSource-PARTITION], Semantic: 
> [EXACTLY_ONCE]] successfully run.
> 2024-01-17T01:06:12.5703927Z 

[jira] [Assigned] (FLINK-32892) Handle Kafka clients JMX mBean errors by disabling JMX metrics

2024-04-16 Thread Mason Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32892?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mason Chen reassigned FLINK-32892:
--

Assignee: Mason Chen

> Handle Kafka clients JMX mBean errors by disabling JMX metrics
> --
>
> Key: FLINK-32892
> URL: https://issues.apache.org/jira/browse/FLINK-32892
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.17.1
>Reporter: Mason Chen
>Assignee: Mason Chen
>Priority: Major
> Fix For: kafka-4.0.0
>
>
> 3.4.x includes an implementation for KIP-830. 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-830%3A+Allow+disabling+JMX+Reporter]
> Many people on the mailing list have complained about confusing warning logs 
> about mbean conflicts. We should be safe to disable this JMX reporter since 
> Flink has its own metric system and the internal kafka-clients JMX reporter 
> should not be used. 
> This affects Kafka connector release 3.1 and below (for some reason I cannot 
> enter 3.1 in the affects version/s box).
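
For reference, a minimal sketch of what disabling the client-side JMX reporter could look like from the user side (assuming kafka-clients 3.4+, which added the KIP-830 `auto.include.jmx.reporter` setting); whether the connector should do this by default is the subject of this ticket:

{code:java}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;

KafkaSource<String> source =
        KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")
                .setTopics("my-topic")
                .setGroupId("my-group")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // KIP-830 (kafka-clients 3.4+): skip registering the internal JMX reporter,
                // avoiding the mBean conflict warnings; Flink's metric system is used instead.
                .setProperty("auto.include.jmx.reporter", "false")
                .build();
{code}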





[jira] [Assigned] (FLINK-31400) Add autoscaler integration for Iceberg source

2024-04-16 Thread Mason Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31400?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mason Chen reassigned FLINK-31400:
--

Assignee: Mason Chen  (was: Mason Chen)

> Add autoscaler integration for Iceberg source
> -
>
> Key: FLINK-31400
> URL: https://issues.apache.org/jira/browse/FLINK-31400
> Project: Flink
>  Issue Type: New Feature
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Mason Chen
>Priority: Major
>
> A very critical part in the scaling algorithm is setting the source 
> processing rate correctly such that the Flink pipeline can keep up with the 
> ingestion rate. The autoscaler does that by looking at the {{pendingRecords}} 
> Flink source metric. Even if that metric is not available, the source can 
> still be sized according to the busyTimeMsPerSecond metric, but there will be 
> no backlog information available. For Kafka, the autoscaler also determines 
> the number of partitions to avoid scaling higher than the maximum number of 
> partitions.
> In order to support a wider range of use cases, we should investigate an 
> integration with the Iceberg source. As far as I know, it does not expose the 
> pendingRecords metric, nor does the autoscaler know about other constraints, 
> e.g. the maximum number of open files.
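
For context, the autoscaler relies on the standardized FLIP-33 pendingRecords gauge. A minimal sketch of the hook a source reader needs in order to publish it (the backlog estimator below is a hypothetical placeholder, not an existing Iceberg source API):

{code:java}
import org.apache.flink.api.connector.source.SourceReaderContext;

class PendingRecordsRegistration {

    // Called once from the source reader's setup: expose the estimated backlog so
    // the autoscaler can size the source against the ingestion rate.
    static void register(SourceReaderContext readerContext, BacklogEstimator estimator) {
        readerContext.metricGroup().setPendingRecordsGauge(estimator::estimatePendingRecords);
    }

    // Hypothetical estimator, e.g. remaining splits/files multiplied by average rows per file.
    interface BacklogEstimator {
        long estimatePendingRecords();
    }
}
{code}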





[jira] [Commented] (FLINK-35088) watermark alignment maxAllowedWatermarkDrift and updateInterval param need check

2024-04-16 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837569#comment-17837569
 ] 

Mason Chen commented on FLINK-35088:


 

Watermark alignment was moved out of beta in the 1.18.0 release. I'd recommend 
that version or later. 
https://issues.apache.org/jira/browse/FLINK-32705
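
For reference, a minimal sketch of a valid alignment configuration on 1.18+, with a positive drift and a non-zero update interval (the two parameters this report is about); the String event type is just for illustration:

{code:java}
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

WatermarkStrategy<String> strategy =
        WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                // maxAllowedWatermarkDrift must be positive: a negative value makes
                // maxAllowedWatermark fall behind lastEmittedWatermark and blocks the reader.
                // updateInterval must be > 0, otherwise ScheduledThreadPoolExecutor rejects it.
                .withWatermarkAlignment("alignment-group", Duration.ofSeconds(20), Duration.ofSeconds(1));
{code}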

> watermark alignment maxAllowedWatermarkDrift and updateInterval param need 
> check
> 
>
> Key: FLINK-35088
> URL: https://issues.apache.org/jira/browse/FLINK-35088
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core, Runtime / Coordination
>Affects Versions: 1.16.1
>Reporter: elon_X
>Priority: Major
> Attachments: image-2024-04-11-20-12-29-951.png
>
>
> When I use watermark alignment:
> 1. I found that setting maxAllowedWatermarkDrift to a negative number 
> initially led me to believe it could support delaying consumption of the 
> source, so I tried it. The upstream data flow then hung indefinitely.
> Root cause:
> {code:java}
> long maxAllowedWatermark = globalCombinedWatermark.getTimestamp()             
>     + watermarkAlignmentParams.getMaxAllowedWatermarkDrift();  {code}
> If maxAllowedWatermarkDrift is negative, then in SourceOperator we get 
> maxAllowedWatermark < lastEmittedWatermark, so the SourceReader is blocked 
> indefinitely and cannot recover.
> I'm not sure if this is a supported feature of watermark alignment. If it's 
> not, I think an additional parameter validation should be implemented to 
> throw an exception on the client side if the value is negative.
> 2. The updateInterval parameter also lacks validation. If I set it to 0, the 
> task will throw an exception when starting the job manager. The JDK class 
> java.util.concurrent.ScheduledThreadPoolExecutor performs the validation and 
> throws the exception.
> {code:java}
> java.lang.IllegalArgumentException: null
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor.scheduleAtFixedRate(ScheduledThreadPoolExecutor.java:565)
>  ~[?:1.8.0_351]
>   at 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator.(SourceCoordinator.java:191)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider.getCoordinator(SourceCoordinatorProvider.java:92)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.createNewInternalCoordinator(RecreateOnResetOperatorCoordinator.java:333)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.(RecreateOnResetOperatorCoordinator.java:59)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.(RecreateOnResetOperatorCoordinator.java:42)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:201)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:195)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:529)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:494)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:286)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:223)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertex(DefaultExecutionGraph.java:901)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertices(DefaultExecutionGraph.java:891)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:848)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:830)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> 

[jira] [Created] (FLINK-35122) Implement Watermark Alignment for DynamicKafkaSource

2024-04-16 Thread Mason Chen (Jira)
Mason Chen created FLINK-35122:
--

 Summary: Implement Watermark Alignment for DynamicKafkaSource
 Key: FLINK-35122
 URL: https://issues.apache.org/jira/browse/FLINK-35122
 Project: Flink
  Issue Type: Improvement
Affects Versions: kafka-3.1.0
Reporter: Mason Chen
Assignee: Mason Chen


Implement Watermark Alignment for DynamicKafkaSource





[jira] [Updated] (FLINK-35122) Implement Watermark Alignment for DynamicKafkaSource

2024-04-16 Thread Mason Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35122?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mason Chen updated FLINK-35122:
---
Component/s: Connectors / Kafka

> Implement Watermark Alignment for DynamicKafkaSource
> 
>
> Key: FLINK-35122
> URL: https://issues.apache.org/jira/browse/FLINK-35122
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.1.0
>Reporter: Mason Chen
>Assignee: Mason Chen
>Priority: Major
>
> Implement Watermark Alignment for DynamicKafkaSource





[jira] [Assigned] (FLINK-34127) Kafka connector repo runs a duplicate of `IntegrationTests` framework tests

2024-04-15 Thread Mason Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mason Chen reassigned FLINK-34127:
--

Assignee: Mason Chen

> Kafka connector repo runs a duplicate of `IntegrationTests` framework tests
> ---
>
> Key: FLINK-34127
> URL: https://issues.apache.org/jira/browse/FLINK-34127
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System / CI, Connectors / Kafka
>Affects Versions: kafka-3.0.2
>Reporter: Mason Chen
>Assignee: Mason Chen
>Priority: Major
>
> I found this behavior when troubleshooting CI flakiness. These 
> integration tests make heavy use of CI resources since they require Kafka, 
> Zookeeper, and Docker containers. We can further stabilize CI by not 
> redundantly running this set of tests.
> `grep -E ".*testIdleReader\[TestEnvironment.*" 14_Compile\ and\ test.txt` 
> returns:
> ```
> 2024-01-17T00:51:05.2943150Z Test 
> org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceITTest.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: 
> [org.apache.flink.connector.kafka.testutils.DynamicKafkaSourceExternalContext@43e9a8a2],
>  Semantic: [EXACTLY_ONCE]] is running.
> 2024-01-17T00:51:07.6922535Z Test 
> org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceITTest.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: 
> [org.apache.flink.connector.kafka.testutils.DynamicKafkaSourceExternalContext@43e9a8a2],
>  Semantic: [EXACTLY_ONCE]] successfully run.
> 2024-01-17T00:56:27.1326332Z Test 
> org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceITTest.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: 
> [org.apache.flink.connector.kafka.testutils.DynamicKafkaSourceExternalContext@2db4a84a],
>  Semantic: [EXACTLY_ONCE]] is running.
> 2024-01-17T00:56:28.4000830Z Test 
> org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceITTest.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: 
> [org.apache.flink.connector.kafka.testutils.DynamicKafkaSourceExternalContext@2db4a84a],
>  Semantic: [EXACTLY_ONCE]] successfully run.
> 2024-01-17T00:56:58.7830792Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-PARTITION], Semantic: 
> [EXACTLY_ONCE]] is running.
> 2024-01-17T00:56:59.0544092Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-PARTITION], Semantic: 
> [EXACTLY_ONCE]] successfully run.
> 2024-01-17T00:56:59.3910987Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-TOPIC], Semantic: 
> [EXACTLY_ONCE]] is running.
> 2024-01-17T00:56:59.6025298Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-TOPIC], Semantic: 
> [EXACTLY_ONCE]] successfully run.
> 2024-01-17T00:57:37.8378640Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-PARTITION], Semantic: 
> [EXACTLY_ONCE]] is running.
> 2024-01-17T00:57:38.0144732Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-PARTITION], Semantic: 
> [EXACTLY_ONCE]] successfully run.
> 2024-01-17T00:57:38.2004796Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-TOPIC], Semantic: 
> [EXACTLY_ONCE]] is running.
> 2024-01-17T00:57:38.4072815Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-TOPIC], Semantic: 
> [EXACTLY_ONCE]] successfully run.
> 2024-01-17T01:06:11.2933375Z Test 
> org.apache.flink.tests.util.kafka.KafkaSourceE2ECase.testIdleReader[TestEnvironment:
>  [FlinkContainers], ExternalContext: [KafkaSource-PARTITION], Semantic: 
> [EXACTLY_ONCE]] is running.
> 2024-01-17T01:06:12.1790031Z Test 
> org.apache.flink.tests.util.kafka.KafkaSourceE2ECase.testIdleReader[TestEnvironment:
>  [FlinkContainers], ExternalContext: [KafkaSource-PARTITION], Semantic: 
> [EXACTLY_ONCE]] successfully run.
> 2024-01-17T01:06:12.5703927Z Test 
> 

[jira] [Assigned] (FLINK-34320) Flink Kafka connector tests time out

2024-04-15 Thread Mason Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mason Chen reassigned FLINK-34320:
--

Assignee: (was: Mason Chen)

> Flink Kafka connector tests time out
> 
>
> Key: FLINK-34320
> URL: https://issues.apache.org/jira/browse/FLINK-34320
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.1.0
>Reporter: Martijn Visser
>Priority: Critical
>  Labels: test-stability
>
> https://github.com/apache/flink-connector-kafka/actions/runs/7700171105/job/20987805277?pr=83#step:14:61746
> {code:java}
> 2024-01-29T19:45:07.4412975Z 19:45:07,094 [main] INFO  
> org.apache.kafka.common.utils.AppInfoParser  [] - App info 
> kafka.producer for producer-client-id unregistered
> 2024-01-29T19:45:07.4413978Z 19:45:07,097 [main] INFO  
> org.apache.flink.runtime.io.disk.FileChannelManagerImpl  [] - 
> FileChannelManager removed spill file directory 
> /tmp/flink-io-3306202c-1639-4b7b-a54c-381826e3682e
> 2024-01-29T19:45:07.4414533Z 19:45:07,440 [main] INFO  
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerMigrationTest 
> [] - 
> 2024-01-29T19:45:07.4414785Z 
> 
> 2024-01-29T19:45:07.4415494Z Test testRestoreProducer[Migration Savepoint: 
> 1.16](org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerMigrationTest)
>  successfully run.
> 2024-01-29T19:45:07.4415646Z 
> 
> 2024-01-29T19:45:07.4698277Z [WARNING] Tests run: 18, Failures: 0, Errors: 0, 
> Skipped: 9, Time elapsed: 206.197 s - in 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerMigrationTest
> 2024-01-29T20:30:32.8459835Z ##[error]The action has timed out.
> {code}





[jira] [Assigned] (FLINK-34320) Flink Kafka connector tests time out

2024-04-15 Thread Mason Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mason Chen reassigned FLINK-34320:
--

Assignee: Mason Chen

> Flink Kafka connector tests time out
> 
>
> Key: FLINK-34320
> URL: https://issues.apache.org/jira/browse/FLINK-34320
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.1.0
>Reporter: Martijn Visser
>Assignee: Mason Chen
>Priority: Critical
>  Labels: test-stability
>
> https://github.com/apache/flink-connector-kafka/actions/runs/7700171105/job/20987805277?pr=83#step:14:61746
> {code:java}
> 2024-01-29T19:45:07.4412975Z 19:45:07,094 [main] INFO  
> org.apache.kafka.common.utils.AppInfoParser  [] - App info 
> kafka.producer for producer-client-id unregistered
> 2024-01-29T19:45:07.4413978Z 19:45:07,097 [main] INFO  
> org.apache.flink.runtime.io.disk.FileChannelManagerImpl  [] - 
> FileChannelManager removed spill file directory 
> /tmp/flink-io-3306202c-1639-4b7b-a54c-381826e3682e
> 2024-01-29T19:45:07.4414533Z 19:45:07,440 [main] INFO  
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerMigrationTest 
> [] - 
> 2024-01-29T19:45:07.4414785Z 
> 
> 2024-01-29T19:45:07.4415494Z Test testRestoreProducer[Migration Savepoint: 
> 1.16](org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerMigrationTest)
>  successfully run.
> 2024-01-29T19:45:07.4415646Z 
> 
> 2024-01-29T19:45:07.4698277Z [WARNING] Tests run: 18, Failures: 0, Errors: 0, 
> Skipped: 9, Time elapsed: 206.197 s - in 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerMigrationTest
> 2024-01-29T20:30:32.8459835Z ##[error]The action has timed out.
> {code}





[jira] [Commented] (FLINK-34127) Kafka connector repo runs a duplicate of `IntegrationTests` framework tests

2024-04-11 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836382#comment-17836382
 ] 

Mason Chen commented on FLINK-34127:


Sure. Can I assign myself now with committer permissions? :D

> Kafka connector repo runs a duplicate of `IntegrationTests` framework tests
> ---
>
> Key: FLINK-34127
> URL: https://issues.apache.org/jira/browse/FLINK-34127
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System / CI, Connectors / Kafka
>Affects Versions: kafka-3.0.2
>Reporter: Mason Chen
>Priority: Major
>
> I found this behavior when troubleshooting CI flakiness. These 
> integration tests make heavy use of CI resources since they require Kafka, 
> Zookeeper, and Docker containers. We can further stabilize CI by not 
> redundantly running this set of tests.
> `grep -E ".*testIdleReader\[TestEnvironment.*" 14_Compile\ and\ test.txt` 
> returns:
> ```
> 2024-01-17T00:51:05.2943150Z Test 
> org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceITTest.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: 
> [org.apache.flink.connector.kafka.testutils.DynamicKafkaSourceExternalContext@43e9a8a2],
>  Semantic: [EXACTLY_ONCE]] is running.
> 2024-01-17T00:51:07.6922535Z Test 
> org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceITTest.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: 
> [org.apache.flink.connector.kafka.testutils.DynamicKafkaSourceExternalContext@43e9a8a2],
>  Semantic: [EXACTLY_ONCE]] successfully run.
> 2024-01-17T00:56:27.1326332Z Test 
> org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceITTest.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: 
> [org.apache.flink.connector.kafka.testutils.DynamicKafkaSourceExternalContext@2db4a84a],
>  Semantic: [EXACTLY_ONCE]] is running.
> 2024-01-17T00:56:28.4000830Z Test 
> org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceITTest.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: 
> [org.apache.flink.connector.kafka.testutils.DynamicKafkaSourceExternalContext@2db4a84a],
>  Semantic: [EXACTLY_ONCE]] successfully run.
> 2024-01-17T00:56:58.7830792Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-PARTITION], Semantic: 
> [EXACTLY_ONCE]] is running.
> 2024-01-17T00:56:59.0544092Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-PARTITION], Semantic: 
> [EXACTLY_ONCE]] successfully run.
> 2024-01-17T00:56:59.3910987Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-TOPIC], Semantic: 
> [EXACTLY_ONCE]] is running.
> 2024-01-17T00:56:59.6025298Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-TOPIC], Semantic: 
> [EXACTLY_ONCE]] successfully run.
> 2024-01-17T00:57:37.8378640Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-PARTITION], Semantic: 
> [EXACTLY_ONCE]] is running.
> 2024-01-17T00:57:38.0144732Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-PARTITION], Semantic: 
> [EXACTLY_ONCE]] successfully run.
> 2024-01-17T00:57:38.2004796Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-TOPIC], Semantic: 
> [EXACTLY_ONCE]] is running.
> 2024-01-17T00:57:38.4072815Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-TOPIC], Semantic: 
> [EXACTLY_ONCE]] successfully run.
> 2024-01-17T01:06:11.2933375Z Test 
> org.apache.flink.tests.util.kafka.KafkaSourceE2ECase.testIdleReader[TestEnvironment:
>  [FlinkContainers], ExternalContext: [KafkaSource-PARTITION], Semantic: 
> [EXACTLY_ONCE]] is running.
> 2024-01-17T01:06:12.1790031Z Test 
> org.apache.flink.tests.util.kafka.KafkaSourceE2ECase.testIdleReader[TestEnvironment:
>  [FlinkContainers], ExternalContext: [KafkaSource-PARTITION], Semantic: 
> [EXACTLY_ONCE]] successfully run.
> 2024-01-17T01:06:12.5703927Z Test 
> 

[jira] [Commented] (FLINK-34445) Integrate new endpoint with Flink UI metrics section

2024-03-26 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34445?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831170#comment-17831170
 ] 

Mason Chen commented on FLINK-34445:


Anytime after https://issues.apache.org/jira/browse/FLINK-34444 is resolved, 
for which I already have a PR. I guess anytime before the 1.20 Flink release?

> Integrate new endpoint with Flink UI metrics section
> 
>
> Key: FLINK-34445
> URL: https://issues.apache.org/jira/browse/FLINK-34445
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Web Frontend
>Reporter: Mason Chen
>Assignee: Mason Chen
>Priority: Major
> Fix For: 1.20.0
>
>






[jira] [Updated] (FLINK-34444) Add new endpoint handler to flink

2024-03-26 Thread Mason Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34444?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mason Chen updated FLINK-34444:
---
Fix Version/s: 1.20.0

> Add new endpoint handler to flink
> -
>
> Key: FLINK-34444
> URL: https://issues.apache.org/jira/browse/FLINK-34444
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Metrics, Runtime / REST
>Reporter: Mason Chen
>Assignee: Mason Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>






[jira] [Updated] (FLINK-34445) Integrate new endpoint with Flink UI metrics section

2024-03-26 Thread Mason Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mason Chen updated FLINK-34445:
---
Fix Version/s: 1.20.0

> Integrate new endpoint with Flink UI metrics section
> 
>
> Key: FLINK-34445
> URL: https://issues.apache.org/jira/browse/FLINK-34445
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Web Frontend
>Reporter: Mason Chen
>Assignee: Mason Chen
>Priority: Major
> Fix For: 1.20.0
>
>






[jira] [Updated] (FLINK-34064) Expose JobManagerOperatorMetrics via REST API

2024-03-26 Thread Mason Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34064?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mason Chen updated FLINK-34064:
---
Fix Version/s: 1.20.0

> Expose JobManagerOperatorMetrics via REST API
> -
>
> Key: FLINK-34064
> URL: https://issues.apache.org/jira/browse/FLINK-34064
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Affects Versions: 1.18.0
>Reporter: Mason Chen
>Assignee: Mason Chen
>Priority: Major
> Fix For: 1.20.0
>
>
> Add a REST API to fetch coordinator metrics.
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-417%3A+Expose+JobManagerOperatorMetrics+via+REST+API]





[jira] [Commented] (FLINK-34445) Integrate new endpoint with Flink UI metrics section

2024-03-25 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34445?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17830674#comment-17830674
 ] 

Mason Chen commented on FLINK-34445:


[~ruanhang1993] I believe you expressed some interest in this FLIP. 
Unfortunately, my employer makes it difficult for me to contribute this change 
to OSS. Is it possible that someone from your side could work on this? I'd be 
happy to review but need some help with the contribution.

> Integrate new endpoint with Flink UI metrics section
> 
>
> Key: FLINK-34445
> URL: https://issues.apache.org/jira/browse/FLINK-34445
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Web Frontend
>Reporter: Mason Chen
>Assignee: Mason Chen
>Priority: Major
>






[jira] [Commented] (FLINK-34064) Expose JobManagerOperatorMetrics via REST API

2024-02-20 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34064?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17818968#comment-17818968
 ] 

Mason Chen commented on FLINK-34064:


[~mxm] [~fanrui] [~thw] can you assign this ticket to me? Thanks!

> Expose JobManagerOperatorMetrics via REST API
> -
>
> Key: FLINK-34064
> URL: https://issues.apache.org/jira/browse/FLINK-34064
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Affects Versions: 1.18.0
>Reporter: Mason Chen
>Priority: Major
>
> Add a REST API to fetch coordinator metrics.
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-417%3A+Expose+JobManagerOperatorMetrics+via+REST+API]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34445) Integrate new endpoint with Flink UI metrics section

2024-02-14 Thread Mason Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mason Chen updated FLINK-34445:
---
Component/s: Runtime / Web Frontend

> Integrate new endpoint with Flink UI metrics section
> 
>
> Key: FLINK-34445
> URL: https://issues.apache.org/jira/browse/FLINK-34445
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Web Frontend
>Reporter: Mason Chen
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34445) Integrate new endpoint with Flink UI metrics section

2024-02-14 Thread Mason Chen (Jira)
Mason Chen created FLINK-34445:
--

 Summary: Integrate new endpoint with Flink UI metrics section
 Key: FLINK-34445
 URL: https://issues.apache.org/jira/browse/FLINK-34445
 Project: Flink
  Issue Type: Sub-task
Reporter: Mason Chen






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34444) Add new endpoint handler to flink

2024-02-14 Thread Mason Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34444?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mason Chen updated FLINK-34444:
---
Component/s: Runtime / Metrics
 Runtime / REST

> Add new endpoint handler to flink
> -
>
> Key: FLINK-34444
> URL: https://issues.apache.org/jira/browse/FLINK-34444
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Metrics, Runtime / REST
>Reporter: Mason Chen
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34444) Add new endpoint handler to flink

2024-02-14 Thread Mason Chen (Jira)
Mason Chen created FLINK-34444:
--

 Summary: Add new endpoint handler to flink
 Key: FLINK-34444
 URL: https://issues.apache.org/jira/browse/FLINK-34444
 Project: Flink
  Issue Type: Sub-task
Reporter: Mason Chen






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33545) KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching

2024-02-13 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17817136#comment-17817136
 ] 

Mason Chen commented on FLINK-33545:


Hi [~aeolus811tw], have you had time to review the follow-up feedback? As 
mentioned before, there's no guarantee that a second commit would succeed, and it 
could take multiple attempts. The best way to handle this is to throw an 
exception and allow Flink to restart and try again.

> KafkaSink implementation can cause dataloss during broker issue when not 
> using EXACTLY_ONCE if there's any batching
> ---
>
> Key: FLINK-33545
> URL: https://issues.apache.org/jira/browse/FLINK-33545
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Kevin Tseng
>Assignee: Kevin Tseng
>Priority: Major
>  Labels: pull-request-available
>
> In the current implementation of KafkaSource and KafkaSink there are some 
> assumption that were made:
>  # KafkaSource completely relies on Checkpoint to manage and track its offset 
> in *KafkaSourceReader* class
>  # KafkaSink in *KafkaWriter* class only performs catch-flush when 
> *DeliveryGuarantee.EXACTLY_ONCE* is specified.
> KafkaSource is assuming that checkpoint should be properly fenced and 
> everything it had read up-til checkpoint being initiated will be processed or 
> recorded by operators downstream, including the TwoPhaseCommiter such as 
> *KafkaSink*
> *KafkaSink* goes by the model of:
>  
> {code:java}
> flush -> prepareCommit -> commit{code}
>  
> In a scenario that:
>  * KafkaSource ingested records #1 to #100
>  * KafkaSink only had chance to send records #1 to #96
>  * with a batching interval of 5ms
> when checkpoint has been initiated, flush will only confirm the sending of 
> record #1 to #96.
> This allows checkpoint to proceed as there's no error, and record #97 to 100 
> will be batched after first flush.
> Now, if broker goes down / has issue that caused the internal KafkaProducer 
> to not be able to send out the record after a batch, and is on a constant 
> retry-cycle (default value of KafkaProducer retries is Integer.MAX_VALUE), 
> *WriterCallback* error handling will never be triggered until the next 
> checkpoint flush.
> This can be tested by creating a faulty Kafka cluster and run the following 
> code:
> {code:java}
> Properties props = new Properties(); 
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
> props.put(ProducerConfig.CLIENT_ID_CONFIG, "example-producer");
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName()); 
> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.ACKS_CONFIG, "all"); 
> final KafkaProducer producer = new KafkaProducer<>(props);
> try {
> for (int i = 0; i < 10; i++) {
> System.out.printf("sending record #%d\n", i);
> String data = UUID.randomUUID().toString();
> final ProducerRecord record = new 
> ProducerRecord<>(TOPIC, Integer.toString(i), data);
> producer.send(record, new CB(Integer.toString(i), data));
> Thread.sleep(1); //sleep for 10 seconds
> }
> } catch (Exception e) {
> e.printStackTrace();
> } finally {
> System.out.println("flushing");
> producer.flush();
> System.out.println("closing");
> producer.close();
> }{code}
> Once callback returns due to network timeout, it will cause Flink to restart 
> from previously saved checkpoint (which recorded reading up to record #100), 
> but KafkaWriter never sent record #97 to #100.
> This will result in dataloss of record #97 to #100
> Because KafkaWriter only catches error *after* callback, if callback is never 
> invoked (due to broker issue) right after the first flush has taken place, 
> those records are effectively gone unless someone decided to go back and look 
> for it.
> This behavior should be ok if user has set {*}DeliveryGuarantee.NONE{*}, but 
> is not expected for {*}DeliveryGuarantee.AT_LEAST_ONCE{*}.
> There is a divergence of the process in the event of {*}EXACTLY_ONCE{*}.
> prepareCommit will produce a list of KafkaCommittable that corresponds to 
> Transactional KafkaProducer to be committed. And a catch up flush will take 
> place during *commit* step. Whether this was intentional or not, due to the 
> fact that flush is a blocking call, the second flush for EXACTLY_ONCE at the 
> end of EXACTLY_ONCE actually ensured everything fenced in the current 
> checkpoint will be sent to Kafka, or fail the 

[jira] [Created] (FLINK-34327) Use maven wrapper in operator build

2024-01-31 Thread Mason Chen (Jira)
Mason Chen created FLINK-34327:
--

 Summary: Use maven wrapper in operator build
 Key: FLINK-34327
 URL: https://issues.apache.org/jira/browse/FLINK-34327
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Reporter: Mason Chen


Contributors need to switch between Maven versions at times, and mvnw can make 
this easy. For reference, the build was failing with Maven 3.2 but passed when I 
switched manually to Maven 3.9.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33545) KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching

2024-01-29 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17812113#comment-17812113
 ] 

Mason Chen commented on FLINK-33545:


[~yang] yup, that's what I'm proposing. We could track an AtomicInteger and 
decrement it in the writer callback. Then we need to assert that the counter is 
0 (this is what we assume the contract of the KafkaProducer#flush API to be).
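
For illustration, here is a minimal, self-contained sketch of that idea (class and 
method names are hypothetical, not the actual KafkaWriter internals, and an 
AtomicLong stands in for the AtomicInteger mentioned above): increment the counter 
per send, decrement it in the producer callback once the record is acknowledged, 
and check that it is back to 0 after KafkaProducer#flush returns during the 
checkpoint.

{code:java}
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical sketch of the proposed counter check; not the real KafkaWriter code.
public class CountingWriterSketch {

    private final AtomicLong pendingRecords = new AtomicLong();
    private final KafkaProducer<String, String> producer;

    CountingWriterSketch(Properties props) {
        this.producer = new KafkaProducer<>(props);
    }

    void write(String topic, String key, String value) {
        pendingRecords.incrementAndGet();
        producer.send(new ProducerRecord<>(topic, key, value), (metadata, exception) -> {
            // Decrement only when the broker has acknowledged the record.
            if (exception == null) {
                pendingRecords.decrementAndGet();
            }
            // Any exception would be surfaced separately (e.g., recorded and rethrown later).
        });
    }

    void flushForCheckpoint() {
        producer.flush();
        // flush() returning is not enough: also verify that every callback fired successfully.
        long stillPending = pendingRecords.get();
        if (stillPending != 0) {
            // Failing here fails the checkpoint, so Flink restarts and replays from the
            // last successful checkpoint instead of silently dropping records.
            throw new IllegalStateException(
                    "Flush returned with " + stillPending + " unacknowledged records");
        }
    }
}
{code}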

> KafkaSink implementation can cause dataloss during broker issue when not 
> using EXACTLY_ONCE if there's any batching
> ---
>
> Key: FLINK-33545
> URL: https://issues.apache.org/jira/browse/FLINK-33545
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Kevin Tseng
>Assignee: Kevin Tseng
>Priority: Major
>  Labels: pull-request-available
>
> In the current implementation of KafkaSource and KafkaSink there are some 
> assumption that were made:
>  # KafkaSource completely relies on Checkpoint to manage and track its offset 
> in *KafkaSourceReader* class
>  # KafkaSink in *KafkaWriter* class only performs catch-flush when 
> *DeliveryGuarantee.EXACTLY_ONCE* is specified.
> KafkaSource is assuming that checkpoint should be properly fenced and 
> everything it had read up-til checkpoint being initiated will be processed or 
> recorded by operators downstream, including the TwoPhaseCommiter such as 
> *KafkaSink*
> *KafkaSink* goes by the model of:
>  
> {code:java}
> flush -> prepareCommit -> commit{code}
>  
> In a scenario that:
>  * KafkaSource ingested records #1 to #100
>  * KafkaSink only had chance to send records #1 to #96
>  * with a batching interval of 5ms
> when checkpoint has been initiated, flush will only confirm the sending of 
> record #1 to #96.
> This allows checkpoint to proceed as there's no error, and record #97 to 100 
> will be batched after first flush.
> Now, if broker goes down / has issue that caused the internal KafkaProducer 
> to not be able to send out the record after a batch, and is on a constant 
> retry-cycle (default value of KafkaProducer retries is Integer.MAX_VALUE), 
> *WriterCallback* error handling will never be triggered until the next 
> checkpoint flush.
> This can be tested by creating a faulty Kafka cluster and run the following 
> code:
> {code:java}
> Properties props = new Properties(); 
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
> props.put(ProducerConfig.CLIENT_ID_CONFIG, "example-producer");
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName()); 
> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.ACKS_CONFIG, "all"); 
> final KafkaProducer producer = new KafkaProducer<>(props);
> try {
> for (int i = 0; i < 10; i++) {
> System.out.printf("sending record #%d\n", i);
> String data = UUID.randomUUID().toString();
> final ProducerRecord record = new 
> ProducerRecord<>(TOPIC, Integer.toString(i), data);
> producer.send(record, new CB(Integer.toString(i), data));
> Thread.sleep(1); //sleep for 10 seconds
> }
> } catch (Exception e) {
> e.printStackTrace();
> } finally {
> System.out.println("flushing");
> producer.flush();
> System.out.println("closing");
> producer.close();
> }{code}
> Once callback returns due to network timeout, it will cause Flink to restart 
> from previously saved checkpoint (which recorded reading up to record #100), 
> but KafkaWriter never sent record #97 to #100.
> This will result in dataloss of record #97 to #100
> Because KafkaWriter only catches error *after* callback, if callback is never 
> invoked (due to broker issue) right after the first flush has taken place, 
> those records are effectively gone unless someone decided to go back and look 
> for it.
> This behavior should be ok if user has set {*}DeliveryGuarantee.NONE{*}, but 
> is not expected for {*}DeliveryGuarantee.AT_LEAST_ONCE{*}.
> There is a divergence of the process in the event of {*}EXACTLY_ONCE{*}.
> prepareCommit will produce a list of KafkaCommittable that corresponds to 
> Transactional KafkaProducer to be committed. And a catch up flush will take 
> place during *commit* step. Whether this was intentional or not, due to the 
> fact that flush is a blocking call, the second flush for EXACTLY_ONCE at the 
> end of EXACTLY_ONCE actually ensured everything fenced in the current 
> checkpoint will be sent to Kafka, or fail the checkpoint if not successful.
>  
> Due the 

[jira] [Commented] (FLINK-33545) KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching

2024-01-26 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17811156#comment-17811156
 ] 

Mason Chen commented on FLINK-33545:


I do think the issue here is not with the Flink runtime, but it could be that 
the Kafka client flush doesn't flush all records (I haven't personally 
encountered this yet). As explained, even if there were a bug in the Flink 
runtime, the proper fix would need to be in Flink rather than in this connector.

Your proposal is essentially to retry the flush explicitly, but there's no 
telling whether that second flush won't exhibit the same behavior as the first. 
As such, the best the connector can do here is to:
 # maintain a counter of records to be flushed,
 # on completion of the flush during the checkpoint phase, verify that there are no 
exceptions AND no pending records, and
 # if there are lingering records, fail immediately, causing a job restart and 
subsequently the checkpoint to fail.

The best thing we can do here is to fail and replay from the last checkpoint, to 
maintain at-least/exactly-once semantics. This is also in line with how 
FlinkKafkaProducer implemented it. My best guess is that there is some race 
condition during broker network issues. I think we can implement this first and 
add careful logging for when this condition occurs.

WDYT of my proposal? [~aeolus811tw] [~yang] [~tzulitai] [~martijnvisser] 

> KafkaSink implementation can cause dataloss during broker issue when not 
> using EXACTLY_ONCE if there's any batching
> ---
>
> Key: FLINK-33545
> URL: https://issues.apache.org/jira/browse/FLINK-33545
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Kevin Tseng
>Assignee: Kevin Tseng
>Priority: Major
>  Labels: pull-request-available
>
> In the current implementation of KafkaSource and KafkaSink there are some 
> assumption that were made:
>  # KafkaSource completely relies on Checkpoint to manage and track its offset 
> in *KafkaSourceReader* class
>  # KafkaSink in *KafkaWriter* class only performs catch-flush when 
> *DeliveryGuarantee.EXACTLY_ONCE* is specified.
> KafkaSource is assuming that checkpoint should be properly fenced and 
> everything it had read up-til checkpoint being initiated will be processed or 
> recorded by operators downstream, including the TwoPhaseCommiter such as 
> *KafkaSink*
> *KafkaSink* goes by the model of:
>  
> {code:java}
> flush -> prepareCommit -> commit{code}
>  
> In a scenario that:
>  * KafkaSource ingested records #1 to #100
>  * KafkaSink only had chance to send records #1 to #96
>  * with a batching interval of 5ms
> when checkpoint has been initiated, flush will only confirm the sending of 
> record #1 to #96.
> This allows checkpoint to proceed as there's no error, and record #97 to 100 
> will be batched after first flush.
> Now, if broker goes down / has issue that caused the internal KafkaProducer 
> to not be able to send out the record after a batch, and is on a constant 
> retry-cycle (default value of KafkaProducer retries is Integer.MAX_VALUE), 
> *WriterCallback* error handling will never be triggered until the next 
> checkpoint flush.
> This can be tested by creating a faulty Kafka cluster and run the following 
> code:
> {code:java}
> Properties props = new Properties(); 
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
> props.put(ProducerConfig.CLIENT_ID_CONFIG, "example-producer");
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName()); 
> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.ACKS_CONFIG, "all"); 
> final KafkaProducer producer = new KafkaProducer<>(props);
> try {
> for (int i = 0; i < 10; i++) {
> System.out.printf("sending record #%d\n", i);
> String data = UUID.randomUUID().toString();
> final ProducerRecord record = new 
> ProducerRecord<>(TOPIC, Integer.toString(i), data);
> producer.send(record, new CB(Integer.toString(i), data));
> Thread.sleep(1); //sleep for 10 seconds
> }
> } catch (Exception e) {
> e.printStackTrace();
> } finally {
> System.out.println("flushing");
> producer.flush();
> System.out.println("closing");
> producer.close();
> }{code}
> Once callback returns due to network timeout, it will cause Flink to restart 
> from previously saved checkpoint (which recorded reading up to record #100), 
> but KafkaWriter never sent record #97 to #100.
> This 

[jira] [Commented] (FLINK-32514) FLIP-309: Support using larger checkpointing interval when source is processing backlog

2024-01-18 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17808342#comment-17808342
 ] 

Mason Chen commented on FLINK-32514:


Agreed with Martijn. Thanks for catching this!

> FLIP-309: Support using larger checkpointing interval when source is 
> processing backlog
> ---
>
> Key: FLINK-32514
> URL: https://issues.apache.org/jira/browse/FLINK-32514
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Checkpointing
>Reporter: Yunfeng Zhou
>Assignee: Yunfeng Zhou
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> Umbrella issue for 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34127) Kafka connector repo runs a duplicate of `IntegrationTests` framework tests

2024-01-16 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17807559#comment-17807559
 ] 

Mason Chen commented on FLINK-34127:


I found that this is an instance of a 50-minute, timed-out CI run. It involves 
network instability, which causes a KafkaConsumer to infinitely reset the offset:

 

```

2024-01-17T00:56:08.7826779Z 00:56:08,659 [Source Data Fetcher for Source: 
Tested Source (3/5)#0] INFO  
org.apache.kafka.clients.consumer.internals.SubscriptionState [] - [Consumer 
clientId=DynamicKafkaSourceExternalContext-cluster0-2, 
groupId=DynamicKafkaSourceExternalContext] Seeking to earliest offset of 
partition topic1-3301243828175368455-0

```

 

This message is repeated over 10 times throughout the log, and the thread dump 
confirms that this test's thread was stuck waiting for the test to complete.

> Kafka connector repo runs a duplicate of `IntegrationTests` framework tests
> ---
>
> Key: FLINK-34127
> URL: https://issues.apache.org/jira/browse/FLINK-34127
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System / CI, Connectors / Kafka
>Affects Versions: kafka-3.0.2
>Reporter: Mason Chen
>Priority: Major
>
> I found out this behavior when troubleshooting CI flakiness. These 
> integration tests make heavy use of the CI since they require Kafka, 
> Zookeeper, and Docker containers. We can further stablize CI by not 
> redundantly running these set of tests.
> `grep -E ".*testIdleReader\[TestEnvironment.*" 14_Compile\ and\ test.txt` 
> returns:
> ```
> 2024-01-17T00:51:05.2943150Z Test 
> org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceITTest.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: 
> [org.apache.flink.connector.kafka.testutils.DynamicKafkaSourceExternalContext@43e9a8a2],
>  Semantic: [EXACTLY_ONCE]] is running.
> 2024-01-17T00:51:07.6922535Z Test 
> org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceITTest.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: 
> [org.apache.flink.connector.kafka.testutils.DynamicKafkaSourceExternalContext@43e9a8a2],
>  Semantic: [EXACTLY_ONCE]] successfully run.
> 2024-01-17T00:56:27.1326332Z Test 
> org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceITTest.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: 
> [org.apache.flink.connector.kafka.testutils.DynamicKafkaSourceExternalContext@2db4a84a],
>  Semantic: [EXACTLY_ONCE]] is running.
> 2024-01-17T00:56:28.4000830Z Test 
> org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceITTest.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: 
> [org.apache.flink.connector.kafka.testutils.DynamicKafkaSourceExternalContext@2db4a84a],
>  Semantic: [EXACTLY_ONCE]] successfully run.
> 2024-01-17T00:56:58.7830792Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-PARTITION], Semantic: 
> [EXACTLY_ONCE]] is running.
> 2024-01-17T00:56:59.0544092Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-PARTITION], Semantic: 
> [EXACTLY_ONCE]] successfully run.
> 2024-01-17T00:56:59.3910987Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-TOPIC], Semantic: 
> [EXACTLY_ONCE]] is running.
> 2024-01-17T00:56:59.6025298Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-TOPIC], Semantic: 
> [EXACTLY_ONCE]] successfully run.
> 2024-01-17T00:57:37.8378640Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-PARTITION], Semantic: 
> [EXACTLY_ONCE]] is running.
> 2024-01-17T00:57:38.0144732Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-PARTITION], Semantic: 
> [EXACTLY_ONCE]] successfully run.
> 2024-01-17T00:57:38.2004796Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: [KafkaSource-TOPIC], Semantic: 
> [EXACTLY_ONCE]] is running.
> 2024-01-17T00:57:38.4072815Z Test 
> org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
>  [MiniCluster], ExternalContext: 

[jira] [Created] (FLINK-34127) Kafka connector repo runs a duplicate of `IntegrationTests` framework tests

2024-01-16 Thread Mason Chen (Jira)
Mason Chen created FLINK-34127:
--

 Summary: Kafka connector repo runs a duplicate of 
`IntegrationTests` framework tests
 Key: FLINK-34127
 URL: https://issues.apache.org/jira/browse/FLINK-34127
 Project: Flink
  Issue Type: Improvement
  Components: Build System / CI, Connectors / Kafka
Affects Versions: kafka-3.0.2
Reporter: Mason Chen


I found out about this behavior when troubleshooting CI flakiness. These integration 
tests make heavy use of the CI since they require Kafka, Zookeeper, and Docker 
containers. We can further stabilize CI by not redundantly running this set of 
tests.


`grep -E ".*testIdleReader\[TestEnvironment.*" 14_Compile\ and\ test.txt` 
returns:

```

2024-01-17T00:51:05.2943150Z Test 
org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceITTest.IntegrationTests.testIdleReader[TestEnvironment:
 [MiniCluster], ExternalContext: 
[org.apache.flink.connector.kafka.testutils.DynamicKafkaSourceExternalContext@43e9a8a2],
 Semantic: [EXACTLY_ONCE]] is running.
2024-01-17T00:51:07.6922535Z Test 
org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceITTest.IntegrationTests.testIdleReader[TestEnvironment:
 [MiniCluster], ExternalContext: 
[org.apache.flink.connector.kafka.testutils.DynamicKafkaSourceExternalContext@43e9a8a2],
 Semantic: [EXACTLY_ONCE]] successfully run.
2024-01-17T00:56:27.1326332Z Test 
org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceITTest.IntegrationTests.testIdleReader[TestEnvironment:
 [MiniCluster], ExternalContext: 
[org.apache.flink.connector.kafka.testutils.DynamicKafkaSourceExternalContext@2db4a84a],
 Semantic: [EXACTLY_ONCE]] is running.
2024-01-17T00:56:28.4000830Z Test 
org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceITTest.IntegrationTests.testIdleReader[TestEnvironment:
 [MiniCluster], ExternalContext: 
[org.apache.flink.connector.kafka.testutils.DynamicKafkaSourceExternalContext@2db4a84a],
 Semantic: [EXACTLY_ONCE]] successfully run.
2024-01-17T00:56:58.7830792Z Test 
org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
 [MiniCluster], ExternalContext: [KafkaSource-PARTITION], Semantic: 
[EXACTLY_ONCE]] is running.
2024-01-17T00:56:59.0544092Z Test 
org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
 [MiniCluster], ExternalContext: [KafkaSource-PARTITION], Semantic: 
[EXACTLY_ONCE]] successfully run.
2024-01-17T00:56:59.3910987Z Test 
org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
 [MiniCluster], ExternalContext: [KafkaSource-TOPIC], Semantic: [EXACTLY_ONCE]] 
is running.
2024-01-17T00:56:59.6025298Z Test 
org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
 [MiniCluster], ExternalContext: [KafkaSource-TOPIC], Semantic: [EXACTLY_ONCE]] 
successfully run.
2024-01-17T00:57:37.8378640Z Test 
org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
 [MiniCluster], ExternalContext: [KafkaSource-PARTITION], Semantic: 
[EXACTLY_ONCE]] is running.
2024-01-17T00:57:38.0144732Z Test 
org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
 [MiniCluster], ExternalContext: [KafkaSource-PARTITION], Semantic: 
[EXACTLY_ONCE]] successfully run.
2024-01-17T00:57:38.2004796Z Test 
org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
 [MiniCluster], ExternalContext: [KafkaSource-TOPIC], Semantic: [EXACTLY_ONCE]] 
is running.
2024-01-17T00:57:38.4072815Z Test 
org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
 [MiniCluster], ExternalContext: [KafkaSource-TOPIC], Semantic: [EXACTLY_ONCE]] 
successfully run.
2024-01-17T01:06:11.2933375Z Test 
org.apache.flink.tests.util.kafka.KafkaSourceE2ECase.testIdleReader[TestEnvironment:
 [FlinkContainers], ExternalContext: [KafkaSource-PARTITION], Semantic: 
[EXACTLY_ONCE]] is running.
2024-01-17T01:06:12.1790031Z Test 
org.apache.flink.tests.util.kafka.KafkaSourceE2ECase.testIdleReader[TestEnvironment:
 [FlinkContainers], ExternalContext: [KafkaSource-PARTITION], Semantic: 
[EXACTLY_ONCE]] successfully run.
2024-01-17T01:06:12.5703927Z Test 
org.apache.flink.tests.util.kafka.KafkaSourceE2ECase.testIdleReader[TestEnvironment:
 [FlinkContainers], ExternalContext: [KafkaSource-TOPIC], Semantic: 
[EXACTLY_ONCE]] is running.
2024-01-17T01:06:13.3369574Z Test 
org.apache.flink.tests.util.kafka.KafkaSourceE2ECase.testIdleReader[TestEnvironment:
 [FlinkContainers], ExternalContext: [KafkaSource-TOPIC], Semantic: 
[EXACTLY_ONCE]] successfully run.

```



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34064) Expose JobManagerOperatorMetrics via REST API

2024-01-11 Thread Mason Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34064?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mason Chen updated FLINK-34064:
---
Affects Version/s: 1.18.0

> Expose JobManagerOperatorMetrics via REST API
> -
>
> Key: FLINK-34064
> URL: https://issues.apache.org/jira/browse/FLINK-34064
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Affects Versions: 1.18.0
>Reporter: Mason Chen
>Priority: Major
>
> Add a REST API to fetch coordinator metrics.
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-417%3A+Expose+JobManagerOperatorMetrics+via+REST+API]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34064) Expose JobManagerOperatorMetrics via REST API

2024-01-11 Thread Mason Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34064?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mason Chen updated FLINK-34064:
---
Component/s: Runtime / REST

> Expose JobManagerOperatorMetrics via REST API
> -
>
> Key: FLINK-34064
> URL: https://issues.apache.org/jira/browse/FLINK-34064
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Reporter: Mason Chen
>Priority: Major
>
> Add a REST API to fetch coordinator metrics.
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-417%3A+Expose+JobManagerOperatorMetrics+via+REST+API]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34064) Expose JobManagerOperatorMetrics via REST API

2024-01-11 Thread Mason Chen (Jira)
Mason Chen created FLINK-34064:
--

 Summary: Expose JobManagerOperatorMetrics via REST API
 Key: FLINK-34064
 URL: https://issues.apache.org/jira/browse/FLINK-34064
 Project: Flink
  Issue Type: Improvement
Reporter: Mason Chen


Add a REST API to fetch coordinator metrics.

[https://cwiki.apache.org/confluence/display/FLINK/FLIP-417%3A+Expose+JobManagerOperatorMetrics+via+REST+API]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32197) FLIP 246: Dynamic Kafka Source

2024-01-08 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32197?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17804366#comment-17804366
 ] 

Mason Chen commented on FLINK-32197:


Hi [~yunta], the PR for https://issues.apache.org/jira/browse/FLINK-32416 is 
still waiting for more feedback. Are you or your collaborators able to give the 
PR a review? I'll also ask on the mailing list.

> FLIP 246: Dynamic Kafka Source
> --
>
> Key: FLINK-32197
> URL: https://issues.apache.org/jira/browse/FLINK-32197
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.1.0
>Reporter: Mason Chen
>Assignee: Mason Chen
>Priority: Major
> Fix For: kafka-3.1.0
>
>
> This is for introducing a new connector that extends off the current 
> KafkaSource to read multiple Kafka clusters, which can change dynamically.
> For more details, please refer to [FLIP 
> 246|https://cwiki.apache.org/confluence/display/FLINK/FLIP-246%3A+Multi+Cluster+Kafka+Source].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-33545) KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching

2023-12-05 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17793457#comment-17793457
 ] 

Mason Chen edited comment on FLINK-33545 at 12/5/23 11:40 PM:
--

The reason you don't find any synchronization is that the task is single-threaded. 
Even if the issue you describe exists, it would need to be fixed in the Flink 
runtime and not in the Kafka connector, as such a bug would affect all sink 
connectors.

Assuming the Flink runtime is correct, the only way for data loss to occur is if 
there is a case where the Kafka Producer API doesn't throw an exception when the 
broker has not ack'ed the record.

Otherwise, to check whether the Flink runtime is correct, we would need to analyze 
how the mailbox executor behaves.


was (Author: mason6345):
The reason you don't find any synchronization is that the task is single-threaded. 
Even if the issue you describe exists, it would need to be fixed in the Flink 
runtime and not in the Kafka connector, as such a bug would affect all sink 
connectors.

Assuming the Flink runtime is correct, the only way for data loss to occur is if 
there is a case where the Kafka Producer API doesn't throw an exception when the 
broker has not ack'ed the record.

> KafkaSink implementation can cause dataloss during broker issue when not 
> using EXACTLY_ONCE if there's any batching
> ---
>
> Key: FLINK-33545
> URL: https://issues.apache.org/jira/browse/FLINK-33545
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Kevin Tseng
>Assignee: Kevin Tseng
>Priority: Major
>  Labels: pull-request-available
>
> In the current implementation of KafkaSource and KafkaSink there are some 
> assumption that were made:
>  # KafkaSource completely relies on Checkpoint to manage and track its offset 
> in *KafkaSourceReader* class
>  # KafkaSink in *KafkaWriter* class only performs catch-flush when 
> *DeliveryGuarantee.EXACTLY_ONCE* is specified.
> KafkaSource is assuming that checkpoint should be properly fenced and 
> everything it had read up-til checkpoint being initiated will be processed or 
> recorded by operators downstream, including the TwoPhaseCommiter such as 
> *KafkaSink*
> *KafkaSink* goes by the model of:
>  
> {code:java}
> flush -> prepareCommit -> commit{code}
>  
> In a scenario that:
>  * KafkaSource ingested records #1 to #100
>  * KafkaSink only had chance to send records #1 to #96
>  * with a batching interval of 5ms
> when checkpoint has been initiated, flush will only confirm the sending of 
> record #1 to #96.
> This allows checkpoint to proceed as there's no error, and record #97 to 100 
> will be batched after first flush.
> Now, if broker goes down / has issue that caused the internal KafkaProducer 
> to not be able to send out the record after a batch, and is on a constant 
> retry-cycle (default value of KafkaProducer retries is Integer.MAX_VALUE), 
> *WriterCallback* error handling will never be triggered until the next 
> checkpoint flush.
> This can be tested by creating a faulty Kafka cluster and run the following 
> code:
> {code:java}
> Properties props = new Properties(); 
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
> props.put(ProducerConfig.CLIENT_ID_CONFIG, "example-producer");
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName()); 
> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.ACKS_CONFIG, "all"); 
> final KafkaProducer producer = new KafkaProducer<>(props);
> try {
> for (int i = 0; i < 10; i++) {
> System.out.printf("sending record #%d\n", i);
> String data = UUID.randomUUID().toString();
> final ProducerRecord record = new 
> ProducerRecord<>(TOPIC, Integer.toString(i), data);
> producer.send(record, new CB(Integer.toString(i), data));
> Thread.sleep(1); //sleep for 10 seconds
> }
> } catch (Exception e) {
> e.printStackTrace();
> } finally {
> System.out.println("flushing");
> producer.flush();
> System.out.println("closing");
> producer.close();
> }{code}
> Once callback returns due to network timeout, it will cause Flink to restart 
> from previously saved checkpoint (which recorded reading up to record #100), 
> but KafkaWriter never sent record #97 to #100.
> This will result in dataloss of record #97 to #100
> Because KafkaWriter only catches error *after* callback, if callback is never 
> invoked (due to 

[jira] [Commented] (FLINK-33545) KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching

2023-12-05 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17793457#comment-17793457
 ] 

Mason Chen commented on FLINK-33545:


The reason you don't find any synchronization is that the task is single-threaded. 
Even if the issue you describe exists, it would need to be fixed in the Flink 
runtime and not in the Kafka connector, as such a bug would affect all sink 
connectors.

Assuming the Flink runtime is correct, the only way for data loss to occur is if 
there is a case where the Kafka Producer API doesn't throw an exception when the 
broker has not ack'ed the record.

> KafkaSink implementation can cause dataloss during broker issue when not 
> using EXACTLY_ONCE if there's any batching
> ---
>
> Key: FLINK-33545
> URL: https://issues.apache.org/jira/browse/FLINK-33545
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Kevin Tseng
>Assignee: Kevin Tseng
>Priority: Major
>  Labels: pull-request-available
>
> In the current implementation of KafkaSource and KafkaSink there are some 
> assumption that were made:
>  # KafkaSource completely relies on Checkpoint to manage and track its offset 
> in *KafkaSourceReader* class
>  # KafkaSink in *KafkaWriter* class only performs catch-flush when 
> *DeliveryGuarantee.EXACTLY_ONCE* is specified.
> KafkaSource is assuming that checkpoint should be properly fenced and 
> everything it had read up-til checkpoint being initiated will be processed or 
> recorded by operators downstream, including the TwoPhaseCommiter such as 
> *KafkaSink*
> *KafkaSink* goes by the model of:
>  
> {code:java}
> flush -> prepareCommit -> commit{code}
>  
> In a scenario that:
>  * KafkaSource ingested records #1 to #100
>  * KafkaSink only had chance to send records #1 to #96
>  * with a batching interval of 5ms
> when checkpoint has been initiated, flush will only confirm the sending of 
> record #1 to #96.
> This allows checkpoint to proceed as there's no error, and record #97 to 100 
> will be batched after first flush.
> Now, if broker goes down / has issue that caused the internal KafkaProducer 
> to not be able to send out the record after a batch, and is on a constant 
> retry-cycle (default value of KafkaProducer retries is Integer.MAX_VALUE), 
> *WriterCallback* error handling will never be triggered until the next 
> checkpoint flush.
> This can be tested by creating a faulty Kafka cluster and run the following 
> code:
> {code:java}
> Properties props = new Properties(); 
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
> props.put(ProducerConfig.CLIENT_ID_CONFIG, "example-producer");
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName()); 
> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.ACKS_CONFIG, "all"); 
> final KafkaProducer producer = new KafkaProducer<>(props);
> try {
> for (int i = 0; i < 10; i++) {
> System.out.printf("sending record #%d\n", i);
> String data = UUID.randomUUID().toString();
> final ProducerRecord record = new 
> ProducerRecord<>(TOPIC, Integer.toString(i), data);
> producer.send(record, new CB(Integer.toString(i), data));
> Thread.sleep(1); //sleep for 10 seconds
> }
> } catch (Exception e) {
> e.printStackTrace();
> } finally {
> System.out.println("flushing");
> producer.flush();
> System.out.println("closing");
> producer.close();
> }{code}
> Once callback returns due to network timeout, it will cause Flink to restart 
> from previously saved checkpoint (which recorded reading up to record #100), 
> but KafkaWriter never sent record #97 to #100.
> This will result in dataloss of record #97 to #100
> Because KafkaWriter only catches error *after* callback, if callback is never 
> invoked (due to broker issue) right after the first flush has taken place, 
> those records are effectively gone unless someone decided to go back and look 
> for it.
> This behavior should be ok if user has set {*}DeliveryGuarantee.NONE{*}, but 
> is not expected for {*}DeliveryGuarantee.AT_LEAST_ONCE{*}.
> There is a divergence of the process in the event of {*}EXACTLY_ONCE{*}.
> prepareCommit will produce a list of KafkaCommittable that corresponds to 
> Transactional KafkaProducer to be committed. And a catch up flush will take 
> place during *commit* step. Whether this was intentional or not, due to the 
> fact that flush is a blocking 

[jira] [Comment Edited] (FLINK-33545) KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching

2023-11-30 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17791866#comment-17791866
 ] 

Mason Chen edited comment on FLINK-33545 at 12/1/23 1:30 AM:
-

Hi all, I just saw this ticket. I've read through the thread here and I'm still 
confused how this can happen.
{quote}The problem with the first flush is that it happened before actual 
snapshot is tracked, inside `SinkWriteOperator.prepareSnapshotPreBarrier`

If broker is alive and well, the first flush will have no issue, which is what 
we are seeing.

AT_LEAST_ONCE and NONE are currently behaving the same in this regards, since 
they don't generate commitable after this.
{quote}
Are you saying that an exception is not thrown by the KafkaWriter#flush method 
due to the Kafka producer timeout configuration?
{quote}from this point til the actual snapshot being triggered in 
`KafkaSource.snapshotState` (record #1 ~ #100) there seem to be a race 
condition where some record may have been ingested, but wasn't part of the 
first flush (record #1 ~ #96) and have now made it to the producer (#97 ~ #100).
{quote}
The checkpoint barrier would have been between record 96 and 97. So the 
checkpoint afterwards would cover the at-least-once guarantee of 97-100. If 
you are saying that the checkpoint barrier is not respected, then that would be 
a bug in Flink. By any chance, does your KafkaSink serialization schema produce 
multiple records from one input record?

The original description describes a scenario when the flush succeeds after a 
broker becomes healthy again. If the flush request is within checkpoint 
timeout, then this is fine right? If not, I would expect Flink to fail the 
checkpoint as it has exceeded the checkpoint timeout.

It seems that https://issues.apache.org/jira/browse/FLINK-33293 and this one 
suggest that the Flink runtime does not throw an exception on checkpoint timeout 
when the Kafka producer timeout is very close.
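
To make the timeout interplay discussed above concrete, here is a minimal, 
hypothetical configuration sketch (values are illustrative, not recommendations): 
the idea is to keep the producer's total delivery timeout below the checkpoint 
timeout, so a broker outage surfaces as an exception from flush() rather than as 
a silently expiring checkpoint.

{code:java}
import java.time.Duration;
import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.producer.ProducerConfig;

public class TimeoutAlignmentSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(Duration.ofSeconds(60).toMillis());
        // Checkpoints that take longer than this are failed by Flink.
        env.getCheckpointConfig().setCheckpointTimeout(Duration.ofMinutes(5).toMillis());

        Properties producerProps = new Properties();
        // Keep the producer's delivery timeout below the checkpoint timeout so that
        // KafkaProducer#flush reports the failure before the checkpoint itself times out.
        producerProps.setProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "120000");
        producerProps.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");
        producerProps.setProperty(ProducerConfig.ACKS_CONFIG, "all");
        // producerProps would then be passed to the KafkaSink builder
        // (e.g., KafkaSink.builder()...setKafkaProducerConfig(producerProps)).
    }
}
{code}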


was (Author: mason6345):
Hi all, I just saw this ticket. I've read through the thread here and I'm still 
confused how this can happen.
{quote}The problem with the first flush is that it happened before actual 
snapshot is tracked, inside `SinkWriteOperator.prepareSnapshotPreBarrier`

If broker is alive and well, the first flush will have no issue, which is what 
we are seeing.

AT_LEAST_ONCE and NONE are currently behaving the same in this regards, since 
they don't generate commitable after this.
{quote}
Are you saying that an exception is not thrown by the KafkaWriter#flush method 
due to the Kafka producer timeout configuration?
{quote}from this point til the actual snapshot being triggered in 
`KafkaSource.snapshotState` (record #1 ~ #100) there seem to be a race 
condition where some record may have been ingested, but wasn't part of the 
first flush (record #1 ~ #96) and have now made it to the producer (#97 ~ #100).
{quote}
The checkpoint barrier would have been between record 96 and 97. So the 
checkpoint afterwards would cover the at-least-once guarantee of 97-100. If 
you are saying that the checkpoint barrier is not respected, then that would be 
a bug in Flink. By any chance, does your KafkaSink serialization schema produce 
multiple records from one input record?

The original description describes a scenario when the flush succeeds after a 
broker becomes healthy again. If the flush request is within checkpoint 
timeout, then this is fine right? If not, I would expect Flink to fail the 
checkpoint as it has exceeded the checkpoint timeout.

It seems that https://issues.apache.org/jira/browse/FLINK-33293 and this one 
suggest that the Flink API does not throw an exception on checkpoint timeout 
when the Kafka producer timeout is very close.

> KafkaSink implementation can cause dataloss during broker issue when not 
> using EXACTLY_ONCE if there's any batching
> ---
>
> Key: FLINK-33545
> URL: https://issues.apache.org/jira/browse/FLINK-33545
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Kevin Tseng
>Assignee: Kevin Tseng
>Priority: Major
>  Labels: pull-request-available
>
> In the current implementation of KafkaSource and KafkaSink there are some 
> assumption that were made:
>  # KafkaSource completely relies on Checkpoint to manage and track its offset 
> in *KafkaSourceReader* class
>  # KafkaSink in *KafkaWriter* class only performs catch-flush when 
> *DeliveryGuarantee.EXACTLY_ONCE* is specified.
> KafkaSource is assuming that checkpoint should be properly fenced and 
> everything it had read up-til checkpoint being initiated will be processed or 
> recorded by operators downstream, including the TwoPhaseCommiter 

[jira] [Comment Edited] (FLINK-33545) KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching

2023-11-30 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17791866#comment-17791866
 ] 

Mason Chen edited comment on FLINK-33545 at 12/1/23 1:29 AM:
-

Hi all, I just saw this ticket. I've read through the thread here and I'm still 
confused how this can happen.
{quote}The problem with the first flush is that it happened before actual 
snapshot is tracked, inside `SinkWriteOperator.prepareSnapshotPreBarrier`

If broker is alive and well, the first flush will have no issue, which is what 
we are seeing.

AT_LEAST_ONCE and NONE are currently behaving the same in this regards, since 
they don't generate commitable after this.
{quote}
Are you saying that an exception is not thrown by the KafkaWriter#flush method 
due to the Kafka producer timeout configuration?
{quote}from this point til the actual snapshot being triggered in 
`KafkaSource.snapshotState` (record #1 ~ #100) there seem to be a race 
condition where some record may have been ingested, but wasn't part of the 
first flush (record #1 ~ #96) and have now made it to the producer (#97 ~ #100).
{quote}
The checkpoint barrier would have been between record 96 and 97. So the 
checkpoint afterwards would cover the at-least-once guarantee of 97-100. If 
you are saying that the checkpoint barrier is not respected, then that would be 
a bug in Flink. By any chance, does your KafkaSink serialization schema produce 
multiple records from one input record?

The original description describes a scenario when the flush succeeds after a 
broker becomes healthy again. If the flush request is within checkpoint 
timeout, then this is fine right? If not, I would expect Flink to fail the 
checkpoint as it has exceeded the checkpoint timeout.

It seems that https://issues.apache.org/jira/browse/FLINK-33293 and this one 
suggest that the flush API doesn't throw an exception on timeout, in rare cases.


was (Author: mason6345):
Hi all, I just saw this ticket. I've read through the thread here and I'm still 
confused how this can happen.
{quote}The problem with the first flush is that it happened before actual 
snapshot is tracked, inside `SinkWriteOperator.prepareSnapshotPreBarrier`

If broker is alive and well, the first flush will have no issue, which is what 
we are seeing.

AT_LEAST_ONCE and NONE are currently behaving the same in this regards, since 
they don't generate commitable after this.
{quote}
Are you saying that an exception is not thrown by the KafkaWriter#flush method 
due to the Kafka producer timeout configuration?
{quote}from this point til the actual snapshot being triggered in 
`KafkaSource.snapshotState` (record #1 ~ #100) there seem to be a race 
condition where some record may have been ingested, but wasn't part of the 
first flush (record #1 ~ #96) and have now made it to the producer (#97 ~ #100).
{quote}
The checkpoint barrier would have been between record 96 and 97. So the 
checkpoint afterwards would cover the at-least-once guarantee of 97-100. If 
you are saying that the checkpoint barrier is not respected, then that would be 
a bug in Flink. By any chance, does your KafkaSink serialization schema produce 
multiple records from one input record?

The original description describes a scenario when the flush succeeds after a 
broker becomes healthy again. If the flush request is within checkpoint 
timeout, then this is fine right? If not, I would expect Flink to fail the 
checkpoint as it has exceeded the checkpoint timeout.

> KafkaSink implementation can cause dataloss during broker issue when not 
> using EXACTLY_ONCE if there's any batching
> ---
>
> Key: FLINK-33545
> URL: https://issues.apache.org/jira/browse/FLINK-33545
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Kevin Tseng
>Assignee: Kevin Tseng
>Priority: Major
>  Labels: pull-request-available
>
> In the current implementation of KafkaSource and KafkaSink there are some 
> assumption that were made:
>  # KafkaSource completely relies on Checkpoint to manage and track its offset 
> in *KafkaSourceReader* class
>  # KafkaSink in *KafkaWriter* class only performs catch-flush when 
> *DeliveryGuarantee.EXACTLY_ONCE* is specified.
> KafkaSource is assuming that checkpoint should be properly fenced and 
> everything it had read up-til checkpoint being initiated will be processed or 
> recorded by operators downstream, including the TwoPhaseCommiter such as 
> *KafkaSink*
> *KafkaSink* goes by the model of:
>  
> {code:java}
> flush -> prepareCommit -> commit{code}
>  
> In a scenario that:
>  * KafkaSource ingested records #1 to #100
>  * KafkaSink only had chance to send records 

[jira] [Comment Edited] (FLINK-33545) KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching

2023-11-30 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17791866#comment-17791866
 ] 

Mason Chen edited comment on FLINK-33545 at 12/1/23 1:29 AM:
-

Hi all, I just saw this ticket. I've read through the thread here and I'm still 
confused how this can happen.
{quote}The problem with the first flush is that it happened before actual 
snapshot is tracked, inside `SinkWriteOperator.prepareSnapshotPreBarrier`

If broker is alive and well, the first flush will have no issue, which is what 
we are seeing.

AT_LEAST_ONCE and NONE are currently behaving the same in this regards, since 
they don't generate commitable after this.
{quote}
Are you saying that an exception is not thrown by the KafkaWriter#flush method 
due to the Kafka producer timeout configuration?
{quote}from this point til the actual snapshot being triggered in 
`KafkaSource.snapshotState` (record #1 ~ #100) there seem to be a race 
condition where some record may have been ingested, but wasn't part of the 
first flush (record #1 ~ #96) and have now made it to the producer (#97 ~ #100).
{quote}
The checkpoint barrier would have been between record 96 and 97. So the 
checkpoint afterwards would cover the at-least-once guarantee of 97-100. If 
you are saying that the checkpoint barrier is not respected, then that would be 
a bug in Flink. By any chance, does your KafkaSink serialization schema produce 
multiple records from one input record?

The original description describes a scenario when the flush succeeds after a 
broker becomes healthy again. If the flush request is within checkpoint 
timeout, then this is fine right? If not, I would expect Flink to fail the 
checkpoint as it has exceeded the checkpoint timeout.

It seems that https://issues.apache.org/jira/browse/FLINK-33293 and this one 
suggest that the Flink API does not throw an exception on checkpoint timeout 
when the Kafka producer timeout is very close.


was (Author: mason6345):
Hi all, I just saw this ticket. I've read through the thread here and I'm still 
confused how this can happen.
{quote}The problem with the first flush is that it happened before actual 
snapshot is tracked, inside `SinkWriteOperator.prepareSnapshotPreBarrier`

If broker is alive and well, the first flush will have no issue, which is what 
we are seeing.

AT_LEAST_ONCE and NONE are currently behaving the same in this regards, since 
they don't generate commitable after this.
{quote}
Are you saying that an exception is not thrown by the KafkaWriter#flush method 
due to the Kafka producer timeout configuration?
{quote}from this point til the actual snapshot being triggered in 
`KafkaSource.snapshotState` (record #1 ~ #100) there seem to be a race 
condition where some record may have been ingested, but wasn't part of the 
first flush (record #1 ~ #96) and have now made it to the producer (#97 ~ #100).
{quote}
The checkpoint barrier would have been between record 96 and 97. So the 
checkpoint afterwards would cover the at-least-once guarantee of 97-100. If 
you are saying that the checkpoint barrier is not respected, then that would be 
a bug in Flink. By any chance, does your KafkaSink serialization schema produce 
multiple records from one input record?

The original description describes a scenario when the flush succeeds after a 
broker becomes healthy again. If the flush request is within checkpoint 
timeout, then this is fine right? If not, I would expect Flink to fail the 
checkpoint as it has exceeded the checkpoint timeout.

It seems that https://issues.apache.org/jira/browse/FLINK-33293 and this one 
suggest that the flush API doesn't throw an exception on timeout, in rare cases.

> KafkaSink implementation can cause dataloss during broker issue when not 
> using EXACTLY_ONCE if there's any batching
> ---
>
> Key: FLINK-33545
> URL: https://issues.apache.org/jira/browse/FLINK-33545
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Kevin Tseng
>Assignee: Kevin Tseng
>Priority: Major
>  Labels: pull-request-available
>
> In the current implementation of KafkaSource and KafkaSink there are some 
> assumption that were made:
>  # KafkaSource completely relies on Checkpoint to manage and track its offset 
> in *KafkaSourceReader* class
>  # KafkaSink in *KafkaWriter* class only performs catch-flush when 
> *DeliveryGuarantee.EXACTLY_ONCE* is specified.
> KafkaSource is assuming that checkpoint should be properly fenced and 
> everything it had read up-til checkpoint being initiated will be processed or 
> recorded by operators downstream, including the TwoPhaseCommiter such as 
> *KafkaSink*
> *KafkaSink* goes 

[jira] [Comment Edited] (FLINK-33545) KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching

2023-11-30 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17791866#comment-17791866
 ] 

Mason Chen edited comment on FLINK-33545 at 12/1/23 1:24 AM:
-

Hi all, I just saw this ticket. I've read through the thread here and I'm still 
confused how this can happen.
{quote}The problem with the first flush is that it happened before actual 
snapshot is tracked, inside `SinkWriteOperator.prepareSnapshotPreBarrier`

If broker is alive and well, the first flush will have no issue, which is what 
we are seeing.

AT_LEAST_ONCE and NONE are currently behaving the same in this regards, since 
they don't generate commitable after this.
{quote}
Are you saying that an exception is not thrown by the KafkaWriter#flush method 
due to the Kafka producer timeout configuration?
{quote}from this point til the actual snapshot being triggered in 
`KafkaSource.snapshotState` (record #1 ~ #100) there seem to be a race 
condition where some record may have been ingested, but wasn't part of the 
first flush (record #1 ~ #96) and have now made it to the producer (#97 ~ #100).
{quote}
The checkpoint barrier would have been between record 96 and 97, so the 
checkpoint afterwards would cover the at-least-once guarantee for records 97-100. If 
you are saying that the checkpoint barrier is not respected, then that would be 
a bug in Flink. By any chance, does your KafkaSink serialization schema produce 
multiple records from one input record?

The original description describes a scenario in which the flush succeeds after a 
broker becomes healthy again. If the flush request completes within the checkpoint 
timeout, then this is fine, right? If not, I would expect Flink to fail the 
checkpoint as it has exceeded the checkpoint timeout.


was (Author: mason6345):
Hi all, I just saw this ticket. I've read through the thread here and I'm still 
confused about how this can happen.
{quote}The problem with the first flush is that it happened before actual 
snapshot is tracked, inside `SinkWriteOperator.prepareSnapshotPreBarrier`

If broker is alive and well, the first flush will have no issue, which is what 
we are seeing.

AT_LEAST_ONCE and NONE are currently behaving the same in this regards, since 
they don't generate commitable after this.
{quote}
Are you saying that an exception is not thrown by the KafkaWriter#flush method 
due to the Kafka producer timeout configuration?
{quote}from this point til the actual snapshot being triggered in 
`KafkaSource.snapshotState` (record #1 ~ #100) there seem to be a race 
condition where some record may have been ingested, but wasn't part of the 
first flush (record #1 ~ #96) and have now made it to the producer (#97 ~ #100).
{quote}
The checkpoint barrier would have been between record 96 and 97, so the 
checkpoint afterwards would cover the at-least-once guarantee for records 97-100. If 
you are saying that the checkpoint barrier is not respected, then that would be 
a bug in Flink. By any chance, does your serialization schema produce multiple 
records from one input record?

The original description describes a scenario in which the flush succeeds after a 
broker becomes healthy again. If the flush request completes within the checkpoint 
timeout, then this is fine, right? If not, I would expect Flink to fail the 
checkpoint as it has exceeded the checkpoint timeout.

> KafkaSink implementation can cause dataloss during broker issue when not 
> using EXACTLY_ONCE if there's any batching
> ---
>
> Key: FLINK-33545
> URL: https://issues.apache.org/jira/browse/FLINK-33545
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Kevin Tseng
>Assignee: Kevin Tseng
>Priority: Major
>  Labels: pull-request-available
>
> In the current implementation of KafkaSource and KafkaSink there are some 
> assumption that were made:
>  # KafkaSource completely relies on Checkpoint to manage and track its offset 
> in *KafkaSourceReader* class
>  # KafkaSink in *KafkaWriter* class only performs catch-flush when 
> *DeliveryGuarantee.EXACTLY_ONCE* is specified.
> KafkaSource is assuming that checkpoint should be properly fenced and 
> everything it had read up-til checkpoint being initiated will be processed or 
> recorded by operators downstream, including the TwoPhaseCommiter such as 
> *KafkaSink*
> *KafkaSink* goes by the model of:
>  
> {code:java}
> flush -> prepareCommit -> commit{code}
>  
> In a scenario that:
>  * KafkaSource ingested records #1 to #100
>  * KafkaSink only had chance to send records #1 to #96
>  * with a batching interval of 5ms
> when checkpoint has been initiated, flush will only confirm the sending of 
> record #1 to #96.
> This allows 

[jira] [Comment Edited] (FLINK-33545) KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching

2023-11-30 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17791866#comment-17791866
 ] 

Mason Chen edited comment on FLINK-33545 at 12/1/23 1:24 AM:
-

Hi all, I just saw this ticket. I've read through the thread here and I'm still 
confused about how this can happen.
{quote}The problem with the first flush is that it happened before actual 
snapshot is tracked, inside `SinkWriteOperator.prepareSnapshotPreBarrier`

If broker is alive and well, the first flush will have no issue, which is what 
we are seeing.

AT_LEAST_ONCE and NONE are currently behaving the same in this regards, since 
they don't generate commitable after this.
{quote}
Are you saying that an exception is not thrown by the KafkaWriter#flush method 
due to the Kafka producer timeout configuration?
{quote}from this point til the actual snapshot being triggered in 
`KafkaSource.snapshotState` (record #1 ~ #100) there seem to be a race 
condition where some record may have been ingested, but wasn't part of the 
first flush (record #1 ~ #96) and have now made it to the producer (#97 ~ #100).
{quote}
The checkpoint barrier would have been between record 96 and 97, so the 
checkpoint afterwards would cover the at-least-once guarantee for records 97-100. If 
you are saying that the checkpoint barrier is not respected, then that would be 
a bug in Flink. By any chance, does your serialization schema produce multiple 
records from one input record?

The original description describes a scenario in which the flush succeeds after a 
broker becomes healthy again. If the flush request completes within the checkpoint 
timeout, then this is fine, right? If not, I would expect Flink to fail the 
checkpoint as it has exceeded the checkpoint timeout.


was (Author: mason6345):
Hi all, I just saw this ticket. I've read through the thread here and I'm still 
confused about how this can happen.
{quote}The problem with the first flush is that it happened before actual 
snapshot is tracked, inside `SinkWriteOperator.prepareSnapshotPreBarrier`

If broker is alive and well, the first flush will have no issue, which is what 
we are seeing.

AT_LEAST_ONCE and NONE are currently behaving the same in this regards, since 
they don't generate commitable after this.
{quote}
Are you saying that an exception is not thrown by the KafkaWriter#flush method 
due to the Kafka producer timeout configuration?
{quote}from this point til the actual snapshot being triggered in 
`KafkaSource.snapshotState` (record #1 ~ #100) there seem to be a race 
condition where some record may have been ingested, but wasn't part of the 
first flush (record #1 ~ #96) and have now made it to the producer (#97 ~ #100).
{quote}
The checkpoint barrier would have been between record 96 and 97, so the 
checkpoint afterwards would cover the at-least-once guarantee for records 97-100.

The original description describes a scenario in which the flush succeeds after a 
broker becomes healthy again. If the flush request completes within the checkpoint 
timeout, then this is fine, right? If not, I would expect Flink to fail the 
checkpoint as it has exceeded the checkpoint timeout.

> KafkaSink implementation can cause dataloss during broker issue when not 
> using EXACTLY_ONCE if there's any batching
> ---
>
> Key: FLINK-33545
> URL: https://issues.apache.org/jira/browse/FLINK-33545
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Kevin Tseng
>Assignee: Kevin Tseng
>Priority: Major
>  Labels: pull-request-available
>
> In the current implementation of KafkaSource and KafkaSink there are some 
> assumption that were made:
>  # KafkaSource completely relies on Checkpoint to manage and track its offset 
> in *KafkaSourceReader* class
>  # KafkaSink in *KafkaWriter* class only performs catch-flush when 
> *DeliveryGuarantee.EXACTLY_ONCE* is specified.
> KafkaSource is assuming that checkpoint should be properly fenced and 
> everything it had read up-til checkpoint being initiated will be processed or 
> recorded by operators downstream, including the TwoPhaseCommiter such as 
> *KafkaSink*
> *KafkaSink* goes by the model of:
>  
> {code:java}
> flush -> prepareCommit -> commit{code}
>  
> In a scenario that:
>  * KafkaSource ingested records #1 to #100
>  * KafkaSink only had chance to send records #1 to #96
>  * with a batching interval of 5ms
> when checkpoint has been initiated, flush will only confirm the sending of 
> record #1 to #96.
> This allows checkpoint to proceed as there's no error, and record #97 to 100 
> will be batched after first flush.
> Now, if broker goes down / has issue that caused the internal KafkaProducer 
> to not be able to send out the 

[jira] [Commented] (FLINK-33545) KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching

2023-11-30 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17791866#comment-17791866
 ] 

Mason Chen commented on FLINK-33545:


Hi all, I just saw this ticket. I've read through the thread here and I'm still 
confused about how this can happen.
{quote}The problem with the first flush is that it happened before actual 
snapshot is tracked, inside `SinkWriteOperator.prepareSnapshotPreBarrier`

If broker is alive and well, the first flush will have no issue, which is what 
we are seeing.

AT_LEAST_ONCE and NONE are currently behaving the same in this regards, since 
they don't generate commitable after this.
{quote}
Are you saying that an exception is not thrown by the KafkaWriter#flush method 
due to the Kafka producer timeout configuration?
{quote}from this point til the actual snapshot being triggered in 
`KafkaSource.snapshotState` (record #1 ~ #100) there seem to be a race 
condition where some record may have been ingested, but wasn't part of the 
first flush (record #1 ~ #96) and have now made it to the producer (#97 ~ #100).
{quote}
The checkpoint barrier would have been between record 96 and 97, so the 
checkpoint afterwards would cover the at-least-once guarantee for records 97-100.

The original description describes a scenario in which the flush succeeds after a 
broker becomes healthy again. If the flush request completes within the checkpoint 
timeout, then this is fine, right? If not, I would expect Flink to fail the 
checkpoint as it has exceeded the checkpoint timeout.

> KafkaSink implementation can cause dataloss during broker issue when not 
> using EXACTLY_ONCE if there's any batching
> ---
>
> Key: FLINK-33545
> URL: https://issues.apache.org/jira/browse/FLINK-33545
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Kevin Tseng
>Assignee: Kevin Tseng
>Priority: Major
>  Labels: pull-request-available
>
> In the current implementation of KafkaSource and KafkaSink there are some 
> assumption that were made:
>  # KafkaSource completely relies on Checkpoint to manage and track its offset 
> in *KafkaSourceReader* class
>  # KafkaSink in *KafkaWriter* class only performs catch-flush when 
> *DeliveryGuarantee.EXACTLY_ONCE* is specified.
> KafkaSource is assuming that checkpoint should be properly fenced and 
> everything it had read up-til checkpoint being initiated will be processed or 
> recorded by operators downstream, including the TwoPhaseCommiter such as 
> *KafkaSink*
> *KafkaSink* goes by the model of:
>  
> {code:java}
> flush -> prepareCommit -> commit{code}
>  
> In a scenario that:
>  * KafkaSource ingested records #1 to #100
>  * KafkaSink only had chance to send records #1 to #96
>  * with a batching interval of 5ms
> when checkpoint has been initiated, flush will only confirm the sending of 
> record #1 to #96.
> This allows checkpoint to proceed as there's no error, and record #97 to 100 
> will be batched after first flush.
> Now, if broker goes down / has issue that caused the internal KafkaProducer 
> to not be able to send out the record after a batch, and is on a constant 
> retry-cycle (default value of KafkaProducer retries is Integer.MAX_VALUE), 
> *WriterCallback* error handling will never be triggered until the next 
> checkpoint flush.
> This can be tested by creating a faulty Kafka cluster and run the following 
> code:
> {code:java}
> Properties props = new Properties(); 
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
> props.put(ProducerConfig.CLIENT_ID_CONFIG, "example-producer");
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName()); 
> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.ACKS_CONFIG, "all"); 
> final KafkaProducer<String, String> producer = new KafkaProducer<>(props);
> try {
> for (int i = 0; i < 10; i++) {
> System.out.printf("sending record #%d\n", i);
> String data = UUID.randomUUID().toString();
> final ProducerRecord<String, String> record = new 
> ProducerRecord<>(TOPIC, Integer.toString(i), data);
> producer.send(record, new CB(Integer.toString(i), data));
> Thread.sleep(1); //sleep for 10 seconds
> }
> } catch (Exception e) {
> e.printStackTrace();
> } finally {
> System.out.println("flushing");
> producer.flush();
> System.out.println("closing");
> producer.close();
> }{code}
> Once callback returns due to network timeout, it will cause Flink to restart 
> from previously 

[jira] [Commented] (FLINK-33567) Flink documentation should only display connector downloads links when a connector is available

2023-11-17 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17787367#comment-17787367
 ] 

Mason Chen commented on FLINK-33567:


[~martijnvisser] thanks for fixing this!

> Flink documentation should only display connector downloads links when a 
> connector is available
> ---
>
> Key: FLINK-33567
> URL: https://issues.apache.org/jira/browse/FLINK-33567
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.17.0, 1.18.0
>Reporter: Martijn Visser
>Assignee: Martijn Visser
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.16.3, 1.19.0, 1.18.1, 1.17.3
>
>
> We currently have the situation that:
> 1. When visiting the master documentation, a message is correctly displayed 
> that there are only connectors available for stable (= released) versions of 
> Flink
> 2. When visiting the docs for release-1.18 or release-1.17, sometimes 
> download links to non-existing links are displayed, because there's no 
> compatible version of the connector (yet) available. 
> In order to solve this, we should:
> 1. Add a Flink Compatibility collection to the connector repo doc
> 2. Use the compatibility collection in the documentation to display the 
> correct links if a connector version for that Flink version is available, and 
> else display the message that there's no connector release available for that 
> Flink version



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33575) FLIP-394: Add Metrics for Connector Agnostic Autoscaling

2023-11-16 Thread Mason Chen (Jira)
Mason Chen created FLINK-33575:
--

 Summary: FLIP-394: Add Metrics for Connector Agnostic Autoscaling
 Key: FLINK-33575
 URL: https://issues.apache.org/jira/browse/FLINK-33575
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Common, Kubernetes Operator, Runtime / 
Metrics
Reporter: Mason Chen


https://cwiki.apache.org/confluence/display/FLINK/FLIP-394%3A+Add+Metrics+for+Connector+Agnostic+Autoscaling



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33293) Data loss with Kafka Sink

2023-10-18 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17776853#comment-17776853
 ] 

Mason Chen commented on FLINK-33293:


Hi, this case should be handled by FLINK-31305, as [~tzulitai] mentioned. On 
delivery timeout, the flush should fail and the job should restart from the previous 
checkpoint. I have confirmed from your logs that this is what happened (the KafkaSink 
threw an exception, failed the checkpoint, and subsequently restarted the job). 

 

However, there is something suspicious with the restart, unrelated to the 
KafkaSink.

```

21:46:12,793 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Restoring job f543de153e861f822176132218942c8d from Checkpoint 5 @ 1695671165746 for f543de153e861f822176132218942c8d located at .
21:46:12,817 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - No master state to restore

```

Is it possible that the job did not recover from the checkpoint on restart? 
That would violate at-least-once semantics.
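
For anyone trying to reproduce this, a minimal sketch of the setup described in 
the report (made-up broker/topic names; 2000ms checkpoint interval, AT_LEAST_ONCE 
sink, short producer timeouts as in the description) could look roughly like:
{code:java}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AtLeastOnceKafkaSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(2000); // checkpoint interval from the report

        KafkaSink<String> sink =
                KafkaSink.<String>builder()
                        .setBootstrapServers("broker:9092") // hypothetical cluster
                        .setRecordSerializer(
                                KafkaRecordSerializationSchema.builder()
                                        .setTopic("sink-topic") // hypothetical topic
                                        .setValueSerializationSchema(new SimpleStringSchema())
                                        .build())
                        // short delivery/request timeouts as described in the report
                        .setProperty("delivery.timeout.ms", "5000")
                        .setProperty("request.timeout.ms", "4000")
                        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                        .build();

        env.fromSequence(0, 1_000_000)
                .map(String::valueOf)
                .sinkTo(sink);

        env.execute("at-least-once-kafka-sink-sketch");
    }
}
{code}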

> Data loss with Kafka Sink
> -
>
> Key: FLINK-33293
> URL: https://issues.apache.org/jira/browse/FLINK-33293
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.16.1, 1.16.2, 1.17.1
>Reporter: Jasmin Redzepovic
>Priority: Major
> Attachments: job.log
>
>
> More info in Slack discussion: 
> [https://www.linen.dev/s/apache-flink/t/15851877/hi-all-it-s-me-again-based-on-https-apache-flink-slack-com-a#e102fa98-dcd7-40e8-a2c4-f7b4a83234e1]
>  
> *TLDR:*
> (in Flink version 1.15 I was unable to reproduce the issue, but in 1.16 and 
> 1.17 I can reproduce it)
> I have created a sink topic with 8 partitions, a replication factor of 3, and 
> a minimum in-sync replicas of 2. The consumer properties are set to their 
> default values.
> For the producer, I made changes to the delivery.timeout.ms and 
> request.timeout.ms properties, setting them to *5000ms* and *4000ms* 
> respectively. (acks are set to -1 by default, which is equals to _all_ I 
> guess)
> KafkaSink is configured with an AT_LEAST_ONCE delivery guarantee. The job 
> parallelism is set to 1 and the checkpointing interval is set to 2000ms. I 
> started a Flink Job and monitored its logs. Additionally, I was consuming the 
> __consumer_offsets topic in parallel to track when offsets are committed for 
> my consumer group.
> The problematic part occurs during checkpoint 5. Its duration was 5009ms, 
> which exceeds the delivery timeout for Kafka (5000ms). Although it was marked 
> as completed, I believe that the output buffer of KafkaSink was not fully 
> acknowledged by Kafka. As a result, Flink proceeded to trigger checkpoint 6 
> but immediately encountered a Kafka {_}TimeoutException: Expiring N 
> records{_}.
> I suspect that this exception originated from checkpoint 5 and that 
> checkpoint 5 should not have been considered successful. The job then failed 
> but recovered from checkpoint 5. Some time after checkpoint 7, consumer 
> offsets were committed to Kafka, and this process repeated once more at 
> checkpoint 9.
> Since the offsets of checkpoint 5 were committed to Kafka, but the output 
> buffer was only partially delivered, there has been data loss. I confirmed 
> this when sinking the topic to the database.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-33011) Operator deletes HA data unexpectedly

2023-10-06 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17772350#comment-17772350
 ] 

Mason Chen edited comment on FLINK-33011 at 10/6/23 7:07 PM:
-

[~gyfora] -could you backport this to 1.6? We are also hitting this bug in 1.6-

It has already been backported. It would be good to update the ticket fix version. 
Thanks for the fix!


was (Author: mason6345):
[~gyfora] -could you backport this to 1.6? We are also hitting this bug in 1.6-

It has already been backported. I'll update the ticket fix versions. Thanks for 
the fix!

> Operator deletes HA data unexpectedly
> -
>
> Key: FLINK-33011
> URL: https://issues.apache.org/jira/browse/FLINK-33011
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: 1.17.1, kubernetes-operator-1.6.0
> Environment: Flink: 1.17.1
> Flink Kubernetes Operator: 1.6.0
>Reporter: Ruibin Xing
>Assignee: Gyula Fora
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.7.0
>
> Attachments: flink_operator_logs_0831.csv
>
>
> We encountered a problem where the operator unexpectedly deleted HA data.
> The timeline is as follows:
> 12:08 We submitted the first spec, which suspended the job with savepoint 
> upgrade mode.
> 12:08 The job was suspended, while the HA data was preserved, and the log 
> showed the observed job deployment status was MISSING.
> 12:10 We submitted the second spec, which deployed the job with the last 
> state upgrade mode.
> 12:10 Logs showed the operator deleted both the Flink deployment and the HA 
> data again.
> 12:10 The job failed to start because the HA data was missing.
> According to the log, the deletion was triggered by 
> https://github.com/apache/flink-kubernetes-operator/blob/a728ba768e20236184e2b9e9e45163304b8b196c/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java#L168
> I think this would only be triggered if the job deployment status wasn't 
> MISSING. But the log before the deletion showed the observed job status was 
> MISSING at that moment.
> Related logs:
>  
> {code:java}
> 2023-08-30 12:08:48.190 + o.a.f.k.o.s.AbstractFlinkService [INFO 
> ][default/pipeline-pipeline-se-3] Cluster shutdown completed.
> 2023-08-30 12:10:27.010 + o.a.f.k.o.o.d.ApplicationObserver [INFO 
> ][default/pipeline-pipeline-se-3] Observing JobManager deployment. Previous 
> status: MISSING
> 2023-08-30 12:10:27.533 + o.a.f.k.o.l.AuditUtils         [INFO 
> ][default/pipeline-pipeline-se-3] >>> Event  | Info    | SPECCHANGED     | 
> UPGRADE change(s) detected (Diff: FlinkDeploymentSpec[image : 
> docker-registry.randomcompany.com/octopus/pipeline-pipeline-online:0835137c-362
>  -> 
> docker-registry.randomcompany.com/octopus/pipeline-pipeline-online:23db7ae8-365,
>  podTemplate.metadata.labels.app.kubernetes.io~1version : 
> 0835137cd803b7258695eb53a6ec520cb62a48a7 -> 
> 23db7ae84bdab8d91fa527fe2f8f2fce292d0abc, job.state : suspended -> running, 
> job.upgradeMode : last-state -> savepoint, restartNonce : 1545 -> 1547]), 
> starting reconciliation.
> 2023-08-30 12:10:27.679 + o.a.f.k.o.s.NativeFlinkService [INFO 
> ][default/pipeline-pipeline-se-3] Deleting JobManager deployment and HA 
> metadata.
> {code}
> A more complete log file is attached. Thanks.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-33011) Operator deletes HA data unexpectedly

2023-10-06 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17772350#comment-17772350
 ] 

Mason Chen edited comment on FLINK-33011 at 10/6/23 7:06 PM:
-

[~gyfora] -could you backport this to 1.6? We are also hitting this bug in 1.6-

It has already been backported. I'll update the ticket fix versions. Thanks for 
the fix!


was (Author: mason6345):
[~gyfora] could you backport this to 1.6? We are also hitting this bug in 1.6

> Operator deletes HA data unexpectedly
> -
>
> Key: FLINK-33011
> URL: https://issues.apache.org/jira/browse/FLINK-33011
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: 1.17.1, kubernetes-operator-1.6.0
> Environment: Flink: 1.17.1
> Flink Kubernetes Operator: 1.6.0
>Reporter: Ruibin Xing
>Assignee: Gyula Fora
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.7.0
>
> Attachments: flink_operator_logs_0831.csv
>
>
> We encountered a problem where the operator unexpectedly deleted HA data.
> The timeline is as follows:
> 12:08 We submitted the first spec, which suspended the job with savepoint 
> upgrade mode.
> 12:08 The job was suspended, while the HA data was preserved, and the log 
> showed the observed job deployment status was MISSING.
> 12:10 We submitted the second spec, which deployed the job with the last 
> state upgrade mode.
> 12:10 Logs showed the operator deleted both the Flink deployment and the HA 
> data again.
> 12:10 The job failed to start because the HA data was missing.
> According to the log, the deletion was triggered by 
> https://github.com/apache/flink-kubernetes-operator/blob/a728ba768e20236184e2b9e9e45163304b8b196c/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java#L168
> I think this would only be triggered if the job deployment status wasn't 
> MISSING. But the log before the deletion showed the observed job status was 
> MISSING at that moment.
> Related logs:
>  
> {code:java}
> 2023-08-30 12:08:48.190 + o.a.f.k.o.s.AbstractFlinkService [INFO 
> ][default/pipeline-pipeline-se-3] Cluster shutdown completed.
> 2023-08-30 12:10:27.010 + o.a.f.k.o.o.d.ApplicationObserver [INFO 
> ][default/pipeline-pipeline-se-3] Observing JobManager deployment. Previous 
> status: MISSING
> 2023-08-30 12:10:27.533 + o.a.f.k.o.l.AuditUtils         [INFO 
> ][default/pipeline-pipeline-se-3] >>> Event  | Info    | SPECCHANGED     | 
> UPGRADE change(s) detected (Diff: FlinkDeploymentSpec[image : 
> docker-registry.randomcompany.com/octopus/pipeline-pipeline-online:0835137c-362
>  -> 
> docker-registry.randomcompany.com/octopus/pipeline-pipeline-online:23db7ae8-365,
>  podTemplate.metadata.labels.app.kubernetes.io~1version : 
> 0835137cd803b7258695eb53a6ec520cb62a48a7 -> 
> 23db7ae84bdab8d91fa527fe2f8f2fce292d0abc, job.state : suspended -> running, 
> job.upgradeMode : last-state -> savepoint, restartNonce : 1545 -> 1547]), 
> starting reconciliation.
> 2023-08-30 12:10:27.679 + o.a.f.k.o.s.NativeFlinkService [INFO 
> ][default/pipeline-pipeline-se-3] Deleting JobManager deployment and HA 
> metadata.
> {code}
> A more complete log file is attached. Thanks.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33011) Operator deletes HA data unexpectedly

2023-10-05 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17772350#comment-17772350
 ] 

Mason Chen commented on FLINK-33011:


[~gyfora] could you backport this to 1.6? We are also hitting this bug in 1.6

> Operator deletes HA data unexpectedly
> -
>
> Key: FLINK-33011
> URL: https://issues.apache.org/jira/browse/FLINK-33011
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: 1.17.1, kubernetes-operator-1.6.0
> Environment: Flink: 1.17.1
> Flink Kubernetes Operator: 1.6.0
>Reporter: Ruibin Xing
>Assignee: Gyula Fora
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.7.0
>
> Attachments: flink_operator_logs_0831.csv
>
>
> We encountered a problem where the operator unexpectedly deleted HA data.
> The timeline is as follows:
> 12:08 We submitted the first spec, which suspended the job with savepoint 
> upgrade mode.
> 12:08 The job was suspended, while the HA data was preserved, and the log 
> showed the observed job deployment status was MISSING.
> 12:10 We submitted the second spec, which deployed the job with the last 
> state upgrade mode.
> 12:10 Logs showed the operator deleted both the Flink deployment and the HA 
> data again.
> 12:10 The job failed to start because the HA data was missing.
> According to the log, the deletion was triggered by 
> https://github.com/apache/flink-kubernetes-operator/blob/a728ba768e20236184e2b9e9e45163304b8b196c/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java#L168
> I think this would only be triggered if the job deployment status wasn't 
> MISSING. But the log before the deletion showed the observed job status was 
> MISSING at that moment.
> Related logs:
>  
> {code:java}
> 2023-08-30 12:08:48.190 + o.a.f.k.o.s.AbstractFlinkService [INFO 
> ][default/pipeline-pipeline-se-3] Cluster shutdown completed.
> 2023-08-30 12:10:27.010 + o.a.f.k.o.o.d.ApplicationObserver [INFO 
> ][default/pipeline-pipeline-se-3] Observing JobManager deployment. Previous 
> status: MISSING
> 2023-08-30 12:10:27.533 + o.a.f.k.o.l.AuditUtils         [INFO 
> ][default/pipeline-pipeline-se-3] >>> Event  | Info    | SPECCHANGED     | 
> UPGRADE change(s) detected (Diff: FlinkDeploymentSpec[image : 
> docker-registry.randomcompany.com/octopus/pipeline-pipeline-online:0835137c-362
>  -> 
> docker-registry.randomcompany.com/octopus/pipeline-pipeline-online:23db7ae8-365,
>  podTemplate.metadata.labels.app.kubernetes.io~1version : 
> 0835137cd803b7258695eb53a6ec520cb62a48a7 -> 
> 23db7ae84bdab8d91fa527fe2f8f2fce292d0abc, job.state : suspended -> running, 
> job.upgradeMode : last-state -> savepoint, restartNonce : 1545 -> 1547]), 
> starting reconciliation.
> 2023-08-30 12:10:27.679 + o.a.f.k.o.s.NativeFlinkService [INFO 
> ][default/pipeline-pipeline-se-3] Deleting JobManager deployment and HA 
> metadata.
> {code}
> A more complete log file is attached. Thanks.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-33104) Nightly run for Flink Kafka connector fails due to architecture tests failing

2023-09-28 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17770197#comment-17770197
 ] 

Mason Chen edited comment on FLINK-33104 at 9/28/23 6:39 PM:
-

[~martijnvisser] do we need any help with updating the particular unit tests to 
unblock this? I have some bandwidth. It seems the arch test error report is valid 
and affects important tests, so I wouldn't pursue an alternative method of 
ignoring the errors.

cc [~tzulitai] 


was (Author: mason6345):
[~martijnvisser] do we need any help with updating the particular unit tests to 
unblock this? I have some bandwidth. It seems the arch test error report is valid 
and affects important tests, so I wouldn't pursue an alternative method of 
ignoring the errors.

> Nightly run for Flink Kafka connector fails due to architecture tests failing
> -
>
> Key: FLINK-33104
> URL: https://issues.apache.org/jira/browse/FLINK-33104
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.1.0
>Reporter: Martijn Visser
>Priority: Blocker
>
> {code:java}
> 2023-09-17T00:29:07.1675694Z [WARNING] Tests run: 18, Failures: 0, Errors: 0, 
> Skipped: 9, Time elapsed: 308.532 s - in 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerMigrationTest
> 2023-09-17T00:29:07.5171608Z [INFO] 
> 2023-09-17T00:29:07.5172360Z [INFO] Results:
> 2023-09-17T00:29:07.5172773Z [INFO] 
> 2023-09-17T00:29:07.5173139Z [ERROR] Failures: 
> 2023-09-17T00:29:07.5174181Z [ERROR]   Architecture Violation [Priority: 
> MEDIUM] - Rule 'ITCASE tests should use a MiniCluster resource or extension' 
> was violated (13 times):
> 2023-09-17T00:29:07.5176050Z 
> org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducerITCase does 
> not satisfy: only one of the following predicates match:
> 2023-09-17T00:29:07.5177452Z * reside in a package 
> 'org.apache.flink.runtime.*' and contain any fields that are static, final, 
> and of type InternalMiniClusterExtension and annotated with @RegisterExtension
> 2023-09-17T00:29:07.5179831Z * reside outside of package 
> 'org.apache.flink.runtime.*' and contain any fields that are static, final, 
> and of type MiniClusterExtension and annotated with @RegisterExtension or are 
> , and of type MiniClusterTestEnvironment and annotated with @TestEnv
> 2023-09-17T00:29:07.5181277Z * reside in a package 
> 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class 
> InternalMiniClusterExtension
> 2023-09-17T00:29:07.5182154Z * reside outside of package 
> 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class 
> MiniClusterExtension
> 2023-09-17T00:29:07.5182951Z  or contain any fields that are public, static, 
> and of type MiniClusterWithClientResource and final and annotated with 
> @ClassRule or contain any fields that is of type 
> MiniClusterWithClientResource and public and final and not static and 
> annotated with @Rule
> 2023-09-17T00:29:07.5183906Z 
> org.apache.flink.connector.kafka.sink.KafkaSinkITCase does not satisfy: only 
> one of the following predicates match:
> 2023-09-17T00:29:07.5184769Z * reside in a package 
> 'org.apache.flink.runtime.*' and contain any fields that are static, final, 
> and of type InternalMiniClusterExtension and annotated with @RegisterExtension
> 2023-09-17T00:29:07.5185812Z * reside outside of package 
> 'org.apache.flink.runtime.*' and contain any fields that are static, final, 
> and of type MiniClusterExtension and annotated with @RegisterExtension or are 
> , and of type MiniClusterTestEnvironment and annotated with @TestEnv
> 2023-09-17T00:29:07.5186880Z * reside in a package 
> 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class 
> InternalMiniClusterExtension
> 2023-09-17T00:29:07.5187929Z * reside outside of package 
> 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class 
> MiniClusterExtension
> 2023-09-17T00:29:07.5189073Z  or contain any fields that are public, static, 
> and of type MiniClusterWithClientResource and final and annotated with 
> @ClassRule or contain any fields that is of type 
> MiniClusterWithClientResource and public and final and not static and 
> annotated with @Rule
> 2023-09-17T00:29:07.5190076Z 
> org.apache.flink.connector.kafka.sink.KafkaTransactionLogITCase does not 
> satisfy: only one of the following predicates match:
> 2023-09-17T00:29:07.5190946Z * reside in a package 
> 'org.apache.flink.runtime.*' and contain any fields that are static, final, 
> and of type InternalMiniClusterExtension and annotated with @RegisterExtension
> 2023-09-17T00:29:07.5191983Z * reside outside of package 
> 'org.apache.flink.runtime.*' and contain any fields that are static, final, 
> and of type 

[jira] [Commented] (FLINK-33104) Nightly run for Flink Kafka connector fails due to architecture tests failing

2023-09-28 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17770197#comment-17770197
 ] 

Mason Chen commented on FLINK-33104:


[~martijnvisser] do we need any help with updating the particular unit tests to 
unblock this? I have some bandwidth. It seems the arch test error report is valid 
and affects important tests, so I wouldn't pursue an alternative method of 
ignoring the errors.

> Nightly run for Flink Kafka connector fails due to architecture tests failing
> -
>
> Key: FLINK-33104
> URL: https://issues.apache.org/jira/browse/FLINK-33104
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.1.0
>Reporter: Martijn Visser
>Priority: Blocker
>
> {code:java}
> 2023-09-17T00:29:07.1675694Z [WARNING] Tests run: 18, Failures: 0, Errors: 0, 
> Skipped: 9, Time elapsed: 308.532 s - in 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerMigrationTest
> 2023-09-17T00:29:07.5171608Z [INFO] 
> 2023-09-17T00:29:07.5172360Z [INFO] Results:
> 2023-09-17T00:29:07.5172773Z [INFO] 
> 2023-09-17T00:29:07.5173139Z [ERROR] Failures: 
> 2023-09-17T00:29:07.5174181Z [ERROR]   Architecture Violation [Priority: 
> MEDIUM] - Rule 'ITCASE tests should use a MiniCluster resource or extension' 
> was violated (13 times):
> 2023-09-17T00:29:07.5176050Z 
> org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducerITCase does 
> not satisfy: only one of the following predicates match:
> 2023-09-17T00:29:07.5177452Z * reside in a package 
> 'org.apache.flink.runtime.*' and contain any fields that are static, final, 
> and of type InternalMiniClusterExtension and annotated with @RegisterExtension
> 2023-09-17T00:29:07.5179831Z * reside outside of package 
> 'org.apache.flink.runtime.*' and contain any fields that are static, final, 
> and of type MiniClusterExtension and annotated with @RegisterExtension or are 
> , and of type MiniClusterTestEnvironment and annotated with @TestEnv
> 2023-09-17T00:29:07.5181277Z * reside in a package 
> 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class 
> InternalMiniClusterExtension
> 2023-09-17T00:29:07.5182154Z * reside outside of package 
> 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class 
> MiniClusterExtension
> 2023-09-17T00:29:07.5182951Z  or contain any fields that are public, static, 
> and of type MiniClusterWithClientResource and final and annotated with 
> @ClassRule or contain any fields that is of type 
> MiniClusterWithClientResource and public and final and not static and 
> annotated with @Rule
> 2023-09-17T00:29:07.5183906Z 
> org.apache.flink.connector.kafka.sink.KafkaSinkITCase does not satisfy: only 
> one of the following predicates match:
> 2023-09-17T00:29:07.5184769Z * reside in a package 
> 'org.apache.flink.runtime.*' and contain any fields that are static, final, 
> and of type InternalMiniClusterExtension and annotated with @RegisterExtension
> 2023-09-17T00:29:07.5185812Z * reside outside of package 
> 'org.apache.flink.runtime.*' and contain any fields that are static, final, 
> and of type MiniClusterExtension and annotated with @RegisterExtension or are 
> , and of type MiniClusterTestEnvironment and annotated with @TestEnv
> 2023-09-17T00:29:07.5186880Z * reside in a package 
> 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class 
> InternalMiniClusterExtension
> 2023-09-17T00:29:07.5187929Z * reside outside of package 
> 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class 
> MiniClusterExtension
> 2023-09-17T00:29:07.5189073Z  or contain any fields that are public, static, 
> and of type MiniClusterWithClientResource and final and annotated with 
> @ClassRule or contain any fields that is of type 
> MiniClusterWithClientResource and public and final and not static and 
> annotated with @Rule
> 2023-09-17T00:29:07.5190076Z 
> org.apache.flink.connector.kafka.sink.KafkaTransactionLogITCase does not 
> satisfy: only one of the following predicates match:
> 2023-09-17T00:29:07.5190946Z * reside in a package 
> 'org.apache.flink.runtime.*' and contain any fields that are static, final, 
> and of type InternalMiniClusterExtension and annotated with @RegisterExtension
> 2023-09-17T00:29:07.5191983Z * reside outside of package 
> 'org.apache.flink.runtime.*' and contain any fields that are static, final, 
> and of type MiniClusterExtension and annotated with @RegisterExtension or are 
> , and of type MiniClusterTestEnvironment and annotated with @TestEnv
> 2023-09-17T00:29:07.5192845Z * reside in a package 
> 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class 
> InternalMiniClusterExtension
> 2023-09-17T00:29:07.5193532Z * reside outside 

[jira] [Commented] (FLINK-33104) Nightly run for Flink Kafka connector fails due to architecture tests failing

2023-09-20 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17767232#comment-17767232
 ] 

Mason Chen commented on FLINK-33104:


Do we want to ignore the violations for now, or do we want to migrate the tests to 
JUnit 5? I can help do some of the refactoring (rough sketch below).

Moreover, it seems like any ArchUnit rule addition can easily break builds. I 
don't know if there is anything we can do to make this situation more stable, since 
we rely on the SNAPSHOT artifacts.
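
For illustration, a rough sketch (the class name is hypothetical, not taken from 
the connector repo) of the JUnit 5 shape the arch rule accepts for ITCASE tests 
outside org.apache.flink.runtime.*: a static final MiniClusterExtension registered 
with @RegisterExtension:
{code:java}
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.junit5.MiniClusterExtension;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class ExampleKafkaSinkITCase {

    // A static, final MiniClusterExtension annotated with @RegisterExtension is one of
    // the shapes accepted by the "ITCASE tests should use a MiniCluster resource or
    // extension" rule.
    @RegisterExtension
    static final MiniClusterExtension MINI_CLUSTER =
            new MiniClusterExtension(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberTaskManagers(1)
                            .setNumberSlotsPerTaskManager(2)
                            .build());

    @Test
    void testWriteToKafka() {
        // test body would build a job against the mini cluster and the Kafka test container
    }
}
{code}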

> Nightly run for Flink Kafka connector fails due to architecture tests failing
> -
>
> Key: FLINK-33104
> URL: https://issues.apache.org/jira/browse/FLINK-33104
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.1.0
>Reporter: Martijn Visser
>Priority: Blocker
>
> {code:java}
> 2023-09-17T00:29:07.1675694Z [WARNING] Tests run: 18, Failures: 0, Errors: 0, 
> Skipped: 9, Time elapsed: 308.532 s - in 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerMigrationTest
> 2023-09-17T00:29:07.5171608Z [INFO] 
> 2023-09-17T00:29:07.5172360Z [INFO] Results:
> 2023-09-17T00:29:07.5172773Z [INFO] 
> 2023-09-17T00:29:07.5173139Z [ERROR] Failures: 
> 2023-09-17T00:29:07.5174181Z [ERROR]   Architecture Violation [Priority: 
> MEDIUM] - Rule 'ITCASE tests should use a MiniCluster resource or extension' 
> was violated (13 times):
> 2023-09-17T00:29:07.5176050Z 
> org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducerITCase does 
> not satisfy: only one of the following predicates match:
> 2023-09-17T00:29:07.5177452Z * reside in a package 
> 'org.apache.flink.runtime.*' and contain any fields that are static, final, 
> and of type InternalMiniClusterExtension and annotated with @RegisterExtension
> 2023-09-17T00:29:07.5179831Z * reside outside of package 
> 'org.apache.flink.runtime.*' and contain any fields that are static, final, 
> and of type MiniClusterExtension and annotated with @RegisterExtension or are 
> , and of type MiniClusterTestEnvironment and annotated with @TestEnv
> 2023-09-17T00:29:07.5181277Z * reside in a package 
> 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class 
> InternalMiniClusterExtension
> 2023-09-17T00:29:07.5182154Z * reside outside of package 
> 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class 
> MiniClusterExtension
> 2023-09-17T00:29:07.5182951Z  or contain any fields that are public, static, 
> and of type MiniClusterWithClientResource and final and annotated with 
> @ClassRule or contain any fields that is of type 
> MiniClusterWithClientResource and public and final and not static and 
> annotated with @Rule
> 2023-09-17T00:29:07.5183906Z 
> org.apache.flink.connector.kafka.sink.KafkaSinkITCase does not satisfy: only 
> one of the following predicates match:
> 2023-09-17T00:29:07.5184769Z * reside in a package 
> 'org.apache.flink.runtime.*' and contain any fields that are static, final, 
> and of type InternalMiniClusterExtension and annotated with @RegisterExtension
> 2023-09-17T00:29:07.5185812Z * reside outside of package 
> 'org.apache.flink.runtime.*' and contain any fields that are static, final, 
> and of type MiniClusterExtension and annotated with @RegisterExtension or are 
> , and of type MiniClusterTestEnvironment and annotated with @TestEnv
> 2023-09-17T00:29:07.5186880Z * reside in a package 
> 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class 
> InternalMiniClusterExtension
> 2023-09-17T00:29:07.5187929Z * reside outside of package 
> 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class 
> MiniClusterExtension
> 2023-09-17T00:29:07.5189073Z  or contain any fields that are public, static, 
> and of type MiniClusterWithClientResource and final and annotated with 
> @ClassRule or contain any fields that is of type 
> MiniClusterWithClientResource and public and final and not static and 
> annotated with @Rule
> 2023-09-17T00:29:07.5190076Z 
> org.apache.flink.connector.kafka.sink.KafkaTransactionLogITCase does not 
> satisfy: only one of the following predicates match:
> 2023-09-17T00:29:07.5190946Z * reside in a package 
> 'org.apache.flink.runtime.*' and contain any fields that are static, final, 
> and of type InternalMiniClusterExtension and annotated with @RegisterExtension
> 2023-09-17T00:29:07.5191983Z * reside outside of package 
> 'org.apache.flink.runtime.*' and contain any fields that are static, final, 
> and of type MiniClusterExtension and annotated with @RegisterExtension or are 
> , and of type MiniClusterTestEnvironment and annotated with @TestEnv
> 2023-09-17T00:29:07.5192845Z * reside in a package 
> 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class 
> 

[jira] [Updated] (FLINK-32197) FLIP 246: Dynamic Kafka Source

2023-09-19 Thread Mason Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32197?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mason Chen updated FLINK-32197:
---
Affects Version/s: kafka-3.1.0
   (was: kafka-3.0.0)

> FLIP 246: Dynamic Kafka Source
> --
>
> Key: FLINK-32197
> URL: https://issues.apache.org/jira/browse/FLINK-32197
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.1.0
>Reporter: Mason Chen
>Assignee: Mason Chen
>Priority: Major
> Fix For: kafka-3.1.0
>
>
> This is for introducing a new connector that extends off the current 
> KafkaSource to read multiple Kafka clusters, which can change dynamically.
> For more details, please refer to [FLIP 
> 246|https://cwiki.apache.org/confluence/display/FLINK/FLIP-246%3A+Multi+Cluster+Kafka+Source].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32417) DynamicKafkaSource User Documentation

2023-09-19 Thread Mason Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mason Chen updated FLINK-32417:
---
Affects Version/s: kafka-3.1.0

> DynamicKafkaSource User Documentation
> -
>
> Key: FLINK-32417
> URL: https://issues.apache.org/jira/browse/FLINK-32417
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.1.0
>Reporter: Mason Chen
>Assignee: Mason Chen
>Priority: Major
> Fix For: kafka-3.1.0
>
>
> Add user documentation for DynamicKafkaSource



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32416) Initial DynamicKafkaSource Implementation

2023-09-19 Thread Mason Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32416?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mason Chen updated FLINK-32416:
---
Affects Version/s: kafka-3.1.0

> Initial DynamicKafkaSource Implementation 
> --
>
> Key: FLINK-32416
> URL: https://issues.apache.org/jira/browse/FLINK-32416
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.1.0
>Reporter: Mason Chen
>Assignee: Mason Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: kafka-3.1.0
>
>
> Implementation that supports unbounded and bounded modes. With a default 
> implementation of KafkaMetadataService



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32197) FLIP 246: Dynamic Kafka Source

2023-09-19 Thread Mason Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32197?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mason Chen updated FLINK-32197:
---
Affects Version/s: kafka-3.0.0

> FLIP 246: Dynamic Kafka Source
> --
>
> Key: FLINK-32197
> URL: https://issues.apache.org/jira/browse/FLINK-32197
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.0.0
>Reporter: Mason Chen
>Assignee: Mason Chen
>Priority: Major
> Fix For: kafka-3.1.0
>
>
> This is for introducing a new connector that extends off the current 
> KafkaSource to read multiple Kafka clusters, which can change dynamically.
> For more details, please refer to [FLIP 
> 246|https://cwiki.apache.org/confluence/display/FLINK/FLIP-246%3A+Multi+Cluster+Kafka+Source].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32893) Make client.id configurable from KafkaSource

2023-09-19 Thread Mason Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32893?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mason Chen updated FLINK-32893:
---
Affects Version/s: kafka-3.0.0
   (was: 1.17.1)

> Make client.id configurable from KafkaSource
> 
>
> Key: FLINK-32893
> URL: https://issues.apache.org/jira/browse/FLINK-32893
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: kafka-3.0.0
>Reporter: Mason Chen
>Priority: Major
>  Labels: pull-request-available
>
> The client id is not strictly configurable from the KafkaSource because it 
> appends a configurable prefix and subtask information to avoid the mbean 
> conflict exception messages that are internal to the Kafka client.
>  
> However, various users reported that they need use this client.id for Kafka 
> quotas and they need to have control over the client.id to enforce quotas 
> properly.
>  
> Affects Kafka connector 3.1 and below.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32893) Make client.id configurable from KafkaSource

2023-09-19 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17766948#comment-17766948
 ] 

Mason Chen commented on FLINK-32893:


We discussed this offline. It is unclear whether or not this is actually 
required, since Kafka quotas seem to be able to use an opaque principal name as 
per [https://docs.confluent.io/kafka/design/quotas.html]. The client.id 
parameter is optional from the Kafka perspective, and I'm not sure we should 
violate this. 

In addition, it doesn't make sense to use quotas without proper auth, and proper 
auth, as noted in the docs, should be able to identify a client without the 
`client.id` parameter.

I would opt to mark this ticket as "not a problem" unless there are differing 
opinions.
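
For context, a rough sketch of what is configurable today (assuming the 
`client.id.prefix` property on the source builder; broker, topic, and group names 
are made up):
{code:java}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;

// Rough sketch: the prefix is configurable, and the source appends subtask
// information to it so that the JMX mbean names stay unique per reader.
KafkaSource<String> source =
        KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")      // hypothetical cluster
                .setTopics("quota-protected-topic")      // hypothetical topic
                .setGroupId("quota-group")               // hypothetical group
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setProperty("client.id.prefix", "team-a-pipeline")
                .build();
{code}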

> Make client.id configurable from KafkaSource
> 
>
> Key: FLINK-32893
> URL: https://issues.apache.org/jira/browse/FLINK-32893
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.17.1
>Reporter: Mason Chen
>Priority: Major
>  Labels: pull-request-available
>
> The client id is not strictly configurable from the KafkaSource because it 
> appends a configurable prefix and subtask information to avoid the mbean 
> conflict exception messages that are internal to the Kafka client.
>  
> However, various users reported that they need use this client.id for Kafka 
> quotas and they need to have control over the client.id to enforce quotas 
> properly.
>  
> Affects Kafka connector 3.1 and below.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32417) DynamicKafkaSource User Documentation

2023-09-19 Thread Mason Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mason Chen updated FLINK-32417:
---
Fix Version/s: kafka-3.1.0

> DynamicKafkaSource User Documentation
> -
>
> Key: FLINK-32417
> URL: https://issues.apache.org/jira/browse/FLINK-32417
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka
>Reporter: Mason Chen
>Priority: Major
> Fix For: kafka-3.1.0
>
>
> Add user documentation for DynamicKafkaSource



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32197) FLIP 246: Dynamic Kafka Source

2023-09-19 Thread Mason Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32197?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mason Chen updated FLINK-32197:
---
Fix Version/s: kafka-3.1.0

> FLIP 246: Dynamic Kafka Source
> --
>
> Key: FLINK-32197
> URL: https://issues.apache.org/jira/browse/FLINK-32197
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Kafka
>Reporter: Mason Chen
>Assignee: Mason Chen
>Priority: Major
> Fix For: kafka-3.1.0
>
>
> This is for introducing a new connector that extends off the current 
> KafkaSource to read multiple Kafka clusters, which can change dynamically.
> For more details, please refer to [FLIP 
> 246|https://cwiki.apache.org/confluence/display/FLINK/FLIP-246%3A+Multi+Cluster+Kafka+Source].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32197) FLIP 246: Dynamic Kafka Source

2023-09-19 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32197?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17766945#comment-17766945
 ] 

Mason Chen commented on FLINK-32197:


Hi [~yunta], there is a PR out for FLINK-32416. I actually can't assign those 
subtasks to myself since I don't have permissions; otherwise, I would mark them 
as "in progress". 

Currently I am waiting for a review. I talked to [~tzulitai] offline since he 
was interested in this FLIP; he will have more time to look at it after this 
month and, if not, I can start asking some internal folks to review it as well, 
though it would be great to get others to review. 

> FLIP 246: Dynamic Kafka Source
> --
>
> Key: FLINK-32197
> URL: https://issues.apache.org/jira/browse/FLINK-32197
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Kafka
>Reporter: Mason Chen
>Assignee: Mason Chen
>Priority: Major
>
> This is for introducing a new connector that extends off the current 
> KafkaSource to read multiple Kafka clusters, which can change dynamically.
> For more details, please refer to [FLIP 
> 246|https://cwiki.apache.org/confluence/display/FLINK/FLIP-246%3A+Multi+Cluster+Kafka+Source].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32416) Initial DynamicKafkaSource Implementation

2023-09-19 Thread Mason Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32416?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mason Chen updated FLINK-32416:
---
Fix Version/s: kafka-3.1.0

> Initial DynamicKafkaSource Implementation 
> --
>
> Key: FLINK-32416
> URL: https://issues.apache.org/jira/browse/FLINK-32416
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka
>Reporter: Mason Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: kafka-3.1.0
>
>
> Implementation that supports unbounded and bounded modes. With a default 
> implementation of KafkaMetadataService



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32892) Handle Kafka clients JMX mBean errors by disabling JMX metrics

2023-09-19 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17766942#comment-17766942
 ] 

Mason Chen commented on FLINK-32892:


Renamed the title as I just realized 
https://issues.apache.org/jira/browse/FLINK-31599 is completed
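
If it helps reviewers, a rough sketch of what this could look like from a user's 
perspective once kafka-clients 3.4+ is in (the `auto.include.jmx.reporter` flag 
comes from KIP-830; broker, topic, and group names are made up):
{code:java}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;

// Rough sketch: with kafka-clients 3.4+, KIP-830 allows turning off the client-side
// JMX reporter, which avoids the noisy "mbean already registered" warnings. Flink's
// own metric system is unaffected because it does not go through the JMX reporter.
KafkaSource<String> source =
        KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")   // hypothetical cluster
                .setTopics("input-topic")             // hypothetical topic
                .setGroupId("my-group")               // hypothetical group
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setProperty("auto.include.jmx.reporter", "false")
                .build();
{code}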

> Handle Kafka clients JMX mBean errors by disabling JMX metrics
> --
>
> Key: FLINK-32892
> URL: https://issues.apache.org/jira/browse/FLINK-32892
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.17.1
>Reporter: Mason Chen
>Priority: Major
>
> 3.4.x includes an implementation for KIP-830. 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-830%3A+Allow+disabling+JMX+Reporter]
> Many people on the mailing list have complained about confusing warning logs 
> about mbean conflicts. We should be safe to disable this JMX reporter since 
> Flink has its own metric system and the internal kafka-clients JMX reporter 
> should not be used. 
> This affects Kafka connector release 3.1 and below (for some reason I cannot 
> enter 3.1 in the affects version/s box).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32892) Handle Kafka clients JMX mBean errors by disabling JMX metrics

2023-09-19 Thread Mason Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32892?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mason Chen updated FLINK-32892:
---
Summary: Handle Kafka clients JMX mBean errors by disabling JMX metrics  
(was: Upgrade kafka-clients dependency to 3.4.x)

> Handle Kafka clients JMX mBean errors by disabling JMX metrics
> --
>
> Key: FLINK-32892
> URL: https://issues.apache.org/jira/browse/FLINK-32892
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.17.1
>Reporter: Mason Chen
>Priority: Major
>
> 3.4.x includes an implementation for KIP-830. 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-830%3A+Allow+disabling+JMX+Reporter]
> Many people on the mailing list have complained about confusing warning logs 
> about mbean conflicts. We should be safe to disable this JMX reporter since 
> Flink has its own metric system and the internal kafka-clients JMX reporter 
> should not be used. 
> This affects Kafka connector release 3.1 and below (for some reason I cannot 
> enter 3.1 in the affects version/s box).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32892) Handle Kafka clients JMX mBean errors by disabling JMX metrics

2023-09-19 Thread Mason Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32892?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mason Chen updated FLINK-32892:
---
Fix Version/s: kafka-3.1.0

> Handle Kafka clients JMX mBean errors by disabling JMX metrics
> --
>
> Key: FLINK-32892
> URL: https://issues.apache.org/jira/browse/FLINK-32892
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.17.1
>Reporter: Mason Chen
>Priority: Major
> Fix For: kafka-3.1.0
>
>
> 3.4.x includes an implementation for KIP-830. 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-830%3A+Allow+disabling+JMX+Reporter]
> Many people on the mailing list have complained about confusing warning logs 
> about mbean conflicts. We should be safe to disable this JMX reporter since 
> Flink has its own metric system and the internal kafka-clients JMX reporter 
> should not be used. 
> This affects Kafka connector release 3.1 and below (for some reason I cannot 
> enter 3.1 in the affects version/s box).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-22014) Flink JobManager failed to restart after failure in kubernetes HA setup

2023-09-19 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17766891#comment-17766891
 ] 

Mason Chen commented on FLINK-22014:


We are also hitting this issue with Flink 1.16.2

> Flink JobManager failed to restart after failure in kubernetes HA setup
> ---
>
> Key: FLINK-22014
> URL: https://issues.apache.org/jira/browse/FLINK-22014
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Kubernetes
>Affects Versions: 1.11.3, 1.12.2, 1.13.0
>Reporter: Mikalai Lushchytski
>Priority: Major
>  Labels: k8s-ha, pull-request-available
> Attachments: flink-logs.txt.zip, image-2021-04-19-11-17-58-215.png, 
> scalyr-logs (1).txt
>
>
> After the JobManager pod failed and the new one started, it was not able to 
> recover jobs due to the absence of recovery data in storage: the config map 
> pointed at a non-existent file.
>   
>  Due to this, the JobManager pod entered the `CrashLoopBackOff` state and 
> was not able to recover. Each attempt failed with the same error, so the 
> whole cluster became unrecoverable and stopped operating.
>   
>  I had to manually delete the config map and start the jobs again without the 
> savepoint.
>   
>  If I tried to emulate the failure further by deleting the JobManager pod 
> manually, the new pod recovered well every time and the issue was not 
> reproducible artificially anymore.
>   
>  Below is the failure log:
> {code:java}
> 2021-03-26 08:22:57,925 INFO 
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl [] - 
> Starting the SlotManager.
>  2021-03-26 08:22:57,928 INFO 
> org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - 
> Starting DefaultLeaderRetrievalService with KubernetesLeaderRetrievalDriver
> {configMapName='stellar-flink-cluster-dispatcher-leader'}.
>  2021-03-26 08:22:57,931 INFO 
> org.apache.flink.runtime.jobmanager.DefaultJobGraphStore [] - Retrieved job 
> ids [198c46bac791e73ebcc565a550fa4ff6, 344f5ebc1b5c3a566b4b2837813e4940, 
> 96c4603a0822d10884f7fe536703d811, d9ded24224aab7c7041420b3efc1b6ba] from 
> KubernetesStateHandleStore{configMapName='stellar-flink-cluster-dispatcher-leader'}
> 2021-03-26 08:22:57,933 INFO 
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [] 
> - Trying to recover job with job id 198c46bac791e73ebcc565a550fa4ff6.
>  2021-03-26 08:22:58,029 INFO 
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [] 
> - Stopping SessionDispatcherLeaderProcess.
>  2021-03-26 08:28:22,677 INFO 
> org.apache.flink.runtime.jobmanager.DefaultJobGraphStore [] - Stopping 
> DefaultJobGraphStore. 2021-03-26 08:28:22,681 ERROR 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error 
> occurred in the cluster entrypoint. java.util.concurrent.CompletionException: 
> org.apache.flink.util.FlinkRuntimeException: Could not recover job with job 
> id 198c46bac791e73ebcc565a550fa4ff6.
>at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) 
> ~[?:?]
>at java.util.concurrent.CompletableFuture.completeThrowable(Unknown 
> Source) [?:?]
>at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) 
> [?:?]
>at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
>at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
>at java.lang.Thread.run(Unknown Source) [?:?] Caused by: 
> org.apache.flink.util.FlinkRuntimeException: Could not recover job with job 
> id 198c46bac791e73ebcc565a550fa4ff6.
>at 
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJob(SessionDispatcherLeaderProcess.java:144)
>  ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>at 
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJobs(SessionDispatcherLeaderProcess.java:122)
>  ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>at 
> org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198)
>  ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>at 
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJobsIfRunning(SessionDispatcherLeaderProcess.java:113)
>  ~[flink-dist_2.12-1.12.2.jar:1.12.2] ... 4 more 
> Caused by: org.apache.flink.util.FlinkException: Could not retrieve submitted 
> JobGraph from state handle under jobGraph-198c46bac791e73ebcc565a550fa4ff6. 
> This indicates that the retrieved state handle is broken. Try cleaning the 
> state handle store.
>at 
> org.apache.flink.runtime.jobmanager.DefaultJobGraphStore.recoverJobGraph(DefaultJobGraphStore.java:171)

[jira] [Commented] (FLINK-31121) KafkaSink should be able to catch and ignore exp via config on/off

2023-09-05 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17762235#comment-17762235
 ] 

Mason Chen commented on FLINK-31121:


[~martijnvisser] [~zjureel] Hi! I'm going to take a look at this, making error 
handling more generic.

> KafkaSink should be able to catch and ignore exp via config on/off
> --
>
> Key: FLINK-31121
> URL: https://issues.apache.org/jira/browse/FLINK-31121
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.17.0
>Reporter: Jing Ge
>Assignee: Fang Yong
>Priority: Major
>  Labels: pull-request-available, stale-assigned
>
> It is a common requirement for users to catch and ignore exceptions while 
> sinking events to a downstream system like Kafka. It would be convenient for 
> some use cases if the Flink sink could provide built-in functionality and a 
> config option to turn it on and off, especially for cases where data 
> consistency is not very important or the stream contains dirty events. [1][2]
> First of all, consider doing it for KafkaSink. Long term, a common solution 
> that can be used by any connector would be even better.
>  
> [1][https://lists.apache.org/thread/wy31s8wb9qnskq29wn03kp608z4vrwv8]
> [2]https://stackoverflow.com/questions/52308911/how-to-handle-exceptions-in-kafka-sink
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31871) Interpret Flink MemoryUnits according to the actual user input

2023-09-05 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1776#comment-1776
 ] 

Mason Chen commented on FLINK-31871:


It would be nice and intuitive to support what Kubernetes supports (both 
notations e.g. 400 Mi and 400M).

> Interpret Flink MemoryUnits according to the actual user input
> --
>
> Key: FLINK-31871
> URL: https://issues.apache.org/jira/browse/FLINK-31871
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core, Kubernetes Operator
>Reporter: Alexander Fedulov
>Priority: Major
>
> Currently all MemorySize.MemoryUnits are interpreted in "bibyte" notation, 
> regardless of the units that users specify: 
> [https://github.com/apache/flink/blob/release-1.17/flink-core/src/main/java/org/apache/flink/configuration/MemorySize.java#L352-L356]
> (i.e. G = Gi)
> Flink Kubernetes Operator utilizes these units for specifying resources in 
> user-facing CR API (taskManager.resource.memory, jobManager.resource.memory). 
> In other places this CR requires native K8S units specification (i.e. 
> spec.containers[*].ephemeral-storage). 
> There are two issues with this:
>  * users cannot rely on the same units notation (taskManager.resource.memory 
> = 16Gi fails)
>  * taskManager.resource.memory = 16G is not interpreted as other units in the 
> spec (16G is implicitly converted into 16Gi)
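
A quick way to observe the behaviour described above, assuming Flink's org.apache.flink.configuration.MemorySize parser, is the small check below; the printed value reflects the binary interpretation of the "g" suffix.

{code:java}
import org.apache.flink.configuration.MemorySize;

public class MemoryUnitsCheck {
    public static void main(String[] args) {
        // "16g" is parsed with binary semantics, i.e. 16 * 1024^3 bytes (16 Gi),
        // even though the suffix looks like a decimal "G".
        MemorySize sixteenG = MemorySize.parse("16g");
        System.out.println(sixteenG.getBytes()); // 17179869184

        // A Kubernetes-style "16Gi" is not an accepted unit for this parser,
        // which is the first issue described in this ticket.
    }
}
{code}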



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30461) Some rocksdb sst files will remain forever

2023-08-22 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17757746#comment-17757746
 ] 

Mason Chen commented on FLINK-30461:


[~fanrui] Thanks for fixing this! Just curious, how did you figure out that 
some SST files were not being cleaned up? Are there any tricks to discover the 
issue outside of reading the code? I recently hit this issue too, but all I saw 
was continuous growth of the SST sizes in the RocksDB metrics.

> Some rocksdb sst files will remain forever
> --
>
> Key: FLINK-30461
> URL: https://issues.apache.org/jira/browse/FLINK-30461
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.16.0, 1.17.0, 1.15.3
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0, 1.15.4, 1.16.2
>
> Attachments: image-2022-12-20-18-45-32-948.png, 
> image-2022-12-20-18-47-42-385.png, screenshot-1.png
>
>
> In rocksdb incremental checkpoint mode, during file upload, if some files 
> have been uploaded and some files have not been uploaded, the checkpoint is 
> canceled due to checkpoint timeout at this time, and the uploaded files will 
> remain.
>  
> h2. Impact: 
> The shared directory of a flink job has more than 1 million files. It 
> exceeded the hdfs upper limit, causing new files not to be written.
> However only 50k files are available, the other 950k files should be cleaned 
> up.
> !https://user-images.githubusercontent.com/38427477/207588272-dda7ba69-c84c-4372-aeb4-c54657b9b956.png|width=1962,height=364!
> h2. Root cause:
> If an exception is thrown during the checkpoint async phase, flink will clean 
> up metaStateHandle, miscFiles and sstFiles.
> However, the sst files are only added to sstFiles once all of them have been 
> uploaded. If some sst files have been uploaded and others are still being 
> uploaded when the checkpoint is canceled due to a checkpoint timeout, none of 
> the sst files are added to sstFiles, so the already-uploaded sst files remain 
> on HDFS.
> [code 
> link|https://github.com/apache/flink/blob/49146cdec41467445de5fc81f100585142728bdf/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java#L328]
> h2. Solution:
> Use a CloseableRegistry as the tmpResourcesRegistry. If the async phase 
> fails, the tmpResourcesRegistry will clean up these temporary resources.
>  
> POC code:
> [https://github.com/1996fanrui/flink/commit/86a456b2bbdad6c032bf8e0bff71c4824abb3ce1]
>  
>  
> !image-2022-12-20-18-45-32-948.png|width=1114,height=442!
> !image-2022-12-20-18-47-42-385.png|width=1332,height=552!
>  
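
To make the proposed fix easier to follow, below is a minimal sketch of the idea, assuming uploads are tracked in a CloseableRegistry. It is not the actual RocksIncrementalSnapshotStrategy code; the FileUploader interface and helper names are made up for illustration.

{code:java}
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.state.StreamStateHandle;

/** Sketch of the "tmpResourcesRegistry" idea, not the actual Flink implementation. */
final class TmpResourcesSketch {

    static List<StreamStateHandle> uploadAll(
            List<Path> filesToUpload, FileUploader uploadOneFile) throws Exception {

        CloseableRegistry tmpResourcesRegistry = new CloseableRegistry();
        List<StreamStateHandle> uploaded = new ArrayList<>();
        try {
            for (Path file : filesToUpload) {
                StreamStateHandle handle = uploadOneFile.upload(file);
                // Track each uploaded file so it can be discarded if the async
                // phase fails before the checkpoint takes ownership of it.
                tmpResourcesRegistry.registerCloseable(() -> {
                    try {
                        handle.discardState();
                    } catch (Exception e) {
                        throw new IOException(e);
                    }
                });
                uploaded.add(handle);
            }
            return uploaded;
        } catch (Exception e) {
            // Failure: discard everything uploaded so far instead of leaking it.
            tmpResourcesRegistry.close();
            throw e;
        }
    }

    /** Hypothetical stand-in for the real file upload call. */
    interface FileUploader {
        StreamStateHandle upload(Path file) throws Exception;
    }
}
{code}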



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32507) Document KafkaSink SinkWriterMetricGroup metrics

2023-08-17 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17755732#comment-17755732
 ] 

Mason Chen commented on FLINK-32507:


Yes it is, closed and marked as dup

> Document KafkaSink SinkWriterMetricGroup metrics
> 
>
> Key: FLINK-32507
> URL: https://issues.apache.org/jira/browse/FLINK-32507
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Reporter: Mason Chen
>Priority: Major
>
> SinkWriterMetricGroup metrics that KafkaSink implements are not documented



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-32507) Document KafkaSink SinkWriterMetricGroup metrics

2023-08-17 Thread Mason Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mason Chen closed FLINK-32507.
--
Resolution: Duplicate

> Document KafkaSink SinkWriterMetricGroup metrics
> 
>
> Key: FLINK-32507
> URL: https://issues.apache.org/jira/browse/FLINK-32507
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Reporter: Mason Chen
>Priority: Major
>
> SinkWriterMetricGroup metrics that KafkaSink implements are not documented



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32893) Make client.id configurable from KafkaSource

2023-08-17 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17755707#comment-17755707
 ] 

Mason Chen commented on FLINK-32893:


I was talking to [~ryanvanhuuksloot] offline; he's interested in picking this up.

 

> Make client.id configurable from KafkaSource
> 
>
> Key: FLINK-32893
> URL: https://issues.apache.org/jira/browse/FLINK-32893
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.17.1
>Reporter: Mason Chen
>Priority: Major
>
> The client id is not strictly configurable from the KafkaSource because it 
> appends a configurable prefix and subtask information to avoid the mbean 
> conflict exception messages that are internal to the Kafka client.
>  
> However, various users have reported that they need to use this client.id for 
> Kafka quotas, and they need to have control over the client.id to enforce 
> quotas properly.
>  
> Affects Kafka connector 3.1 and below.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32893) Make client.id configurable from KafkaSource

2023-08-17 Thread Mason Chen (Jira)
Mason Chen created FLINK-32893:
--

 Summary: Make client.id configurable from KafkaSource
 Key: FLINK-32893
 URL: https://issues.apache.org/jira/browse/FLINK-32893
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.17.1
Reporter: Mason Chen


The client id is not strictly configurable from the KafkaSource because it 
appends a configurable prefix and subtask information to avoid the mbean 
conflict exception messages that are internal to the Kafka client.

 

However, various users have reported that they need to use this client.id for 
Kafka quotas, and they need to have control over the client.id to enforce 
quotas properly.

 

Affects Kafka connector 3.1 and below.
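
For context, the closest control available today is the client id prefix. The sketch below assumes the `client.id.prefix` property understood by the KafkaSource builder; the bootstrap servers, topic, and prefix are placeholders, and the source still appends subtask information, which is exactly the limitation described above.

{code:java}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;

public class ClientIdPrefixSketch {
    public static void main(String[] args) {
        KafkaSource<String> source =
                KafkaSource.<String>builder()
                        .setBootstrapServers("broker:9092")
                        .setTopics("quota-topic")
                        .setGroupId("billing-group")
                        .setValueOnlyDeserializer(new SimpleStringSchema())
                        // Only the prefix is configurable; subtask info is still appended,
                        // so the resulting client.id is not fully user-controlled.
                        .setProperty("client.id.prefix", "billing-pipeline")
                        .build();
    }
}
{code}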



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32892) Upgrade kafka-clients dependency to 3.4.x

2023-08-17 Thread Mason Chen (Jira)
Mason Chen created FLINK-32892:
--

 Summary: Upgrade kafka-clients dependency to 3.4.x
 Key: FLINK-32892
 URL: https://issues.apache.org/jira/browse/FLINK-32892
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Affects Versions: 1.17.1
Reporter: Mason Chen


3.4.x includes an implementation for KIP-830. 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-830%3A+Allow+disabling+JMX+Reporter]

Many people on the mailing list have complained about confusing warning logs 
about mbean conflicts. We should be safe to disable this JMX reporter since 
Flink has its own metric system and the internal kafka-clients JMX reporter 
should not be used. 

This affects Kafka connector release 3.1 and below (for some reason I cannot 
enter 3.1 in the affects version/s box).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-32828) Kafka partition aware watermark not handled correctly shortly after job start up from checkpoint or savepoint

2023-08-10 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17752901#comment-17752901
 ] 

Mason Chen edited comment on FLINK-32828 at 8/10/23 6:45 PM:
-

From the logs, it looks like you have a keyBy and the watermark is progressing 
because that one active partition is moving data to all other operators. I 
would start by setting and tuning the 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources


was (Author: mason6345):
From the logs, it looks like you have a keyBy and the watermark is progressing 
because that one active partition is moving data to all other operators. I 
would start by increasing the `forBoundedOutOfOrderness` to account for the 
"late" arriving data

> Kafka partition aware watermark not handled correctly shortly after job start 
> up from checkpoint or savepoint
> -
>
> Key: FLINK-32828
> URL: https://issues.apache.org/jira/browse/FLINK-32828
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Connectors / Kafka
>Affects Versions: 1.17.1
> Environment: Affected environments:
>  * Local MiniCluster + Confluent Kafka run in docker
>  ** See attached files
>  * Flink job run in Kubernetes using Flink Operator 1.15 + 3 node Kafka 
> cluster run in Kubernetes cluster
>Reporter: Grzegorz Liter
>Priority: Major
> Attachments: docker-compose.yml, test-job.java
>
>
> When using KafkaSource with partition-aware watermarks, watermarks are being 
> emitted even when only one partition has some events just after job startup 
> from a savepoint/checkpoint. After there are some events on other partitions, 
> the watermark behaviour is correct and the watermark is emitted as the minimum 
> watermark from each partition.
>  
> Steps to reproduce:
>  # Setup a Kafka cluster with a topic that has 2 or more partitions. (see 
> attached docker-compose.yml)
>  ## {{./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic 
> test-2 --partitions 4}}
>  # Create a job that (see attached `test-job.java`):
>  ## uses a KafkaSource with 
> `WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10L)`
>  ## has parallelism lower than number of partitions
>  ## stores checkpoint/savepoint
>  # Start job
>  # Send events only on single partition
>  ## {{./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic 
> test-2 --property "parse.key=true" --property "key.separator=:"}}
>  
> {{14:51:19,883 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:18.849Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:51:32,484 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:31.475Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:51:35,914 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:34.909Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> Expected: Watermark does not progress. Actual: Watermark does not progress.
> 5. Stop the job
> 6. Startup job from last checkpoint/savepoint
> 7. Send events only on single partitions
> {{14:53:41,693 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:40.662Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:53:46,088 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:45.078Z, watermark 
> 2023-08-10T12:53:30.661Z}}
> {{14:53:49,520 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:48.511Z, watermark 
> 2023-08-10T12:53:35.077Z}}
> Expected: Watermark does not progress. {color:#ff}*Actual: Watermark has 
> progressed*{color}
>  
> {color:#172b4d}To add bit more of context:{color}
> {color:#172b4d}8. Send events on other partitions and then send events only 
> on single partitions{color}
> {{{color:#172b4d}14:54:55,112 WARN  com.example.TestJob6$InputSink2           
>         [] - == Received: test-2/0: 2 -> a, timestamp 
> 2023-08-10T12:54:54.104Z, watermark 2023-08-10T12:53:38.510Z
> 14:54:57,673 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/1: 4 -> a, timestamp 2023-08-10T12:54:56.665Z, watermark 
> 2023-08-10T12:53:38.510Z
> 14:54:57,673 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/2: 5 -> a, timestamp 2023-08-10T12:54:57.554Z, watermark 
> 2023-08-10T12:53:38.510Z
> 14:55:12,821 WARN  

[jira] [Comment Edited] (FLINK-32828) Kafka partition aware watermark not handled correctly shortly after job start up from checkpoint or savepoint

2023-08-10 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17752901#comment-17752901
 ] 

Mason Chen edited comment on FLINK-32828 at 8/10/23 6:45 PM:
-

From the logs, it looks like you have a keyBy and the watermark is progressing 
because that one active partition is moving data to all other operators. I 
would start by setting and tuning the 
[https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources]
 to account for the idle partitions


was (Author: mason6345):
From the logs, it looks like you have a keyBy and the watermark is progressing 
because that one active partition is moving data to all other operators. I 
would start by setting and tuning the 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources

> Kafka partition aware watermark not handled correctly shortly after job start 
> up from checkpoint or savepoint
> -
>
> Key: FLINK-32828
> URL: https://issues.apache.org/jira/browse/FLINK-32828
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Connectors / Kafka
>Affects Versions: 1.17.1
> Environment: Affected environments:
>  * Local MiniCluster + Confluent Kafka run in docker
>  ** See attached files
>  * Flink job run in Kubernetes using Flink Operator 1.15 + 3 node Kafka 
> cluster run in Kubernetes cluster
>Reporter: Grzegorz Liter
>Priority: Major
> Attachments: docker-compose.yml, test-job.java
>
>
> When using KafkaSource with partition-aware watermarks, watermarks are being 
> emitted even when only one partition has some events just after job startup 
> from a savepoint/checkpoint. After there are some events on other partitions, 
> the watermark behaviour is correct and the watermark is emitted as the minimum 
> watermark from each partition.
>  
> Steps to reproduce:
>  # Setup a Kafka cluster with a topic that has 2 or more partitions. (see 
> attached docker-compose.yml)
>  ## {{./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic 
> test-2 --partitions 4}}
>  # Create a job that (see attached `test-job.java`):
>  ## uses a KafkaSource with 
> `WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10L)`
>  ## has parallelism lower than number of partitions
>  ## stores checkpoint/savepoint
>  # Start job
>  # Send events only on single partition
>  ## {{./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic 
> test-2 --property "parse.key=true" --property "key.separator=:"}}
>  
> {{14:51:19,883 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:18.849Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:51:32,484 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:31.475Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:51:35,914 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:34.909Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> Expected: Watermark does not progress. Actual: Watermark does not progress.
> 5. Stop the job
> 6. Startup job from last checkpoint/savepoint
> 7. Send events only on single partitions
> {{14:53:41,693 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:40.662Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:53:46,088 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:45.078Z, watermark 
> 2023-08-10T12:53:30.661Z}}
> {{14:53:49,520 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:48.511Z, watermark 
> 2023-08-10T12:53:35.077Z}}
> Expected: Watermark does not progress. {color:#ff}*Actual: Watermark has 
> progressed*{color}
>  
> {color:#172b4d}To add bit more of context:{color}
> {color:#172b4d}8. Send events on other partitions and then send events only 
> on single partitions{color}
> {{{color:#172b4d}14:54:55,112 WARN  com.example.TestJob6$InputSink2           
>         [] - == Received: test-2/0: 2 -> a, timestamp 
> 2023-08-10T12:54:54.104Z, watermark 2023-08-10T12:53:38.510Z
> 14:54:57,673 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/1: 4 -> a, timestamp 2023-08-10T12:54:56.665Z, watermark 
> 2023-08-10T12:53:38.510Z
> 14:54:57,673 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/2: 5 -> a, timestamp 

[jira] [Comment Edited] (FLINK-32828) Kafka partition aware watermark not handled correctly shortly after job start up from checkpoint or savepoint

2023-08-10 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17752901#comment-17752901
 ] 

Mason Chen edited comment on FLINK-32828 at 8/10/23 6:45 PM:
-

From the logs, it looks like you have a keyBy and the watermark is progressing 
because that one active partition is moving data to all other operators. I 
would start by setting and tuning the idleness 
([https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources]) 
to account for the idle partitions
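
For reference, a minimal sketch of that suggestion is below; MyEvent, kafkaSource, and env are placeholders for the job's own event type, source, and execution environment.

{code:java}
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;

// Minimal sketch: bounded-out-of-orderness watermarks plus an idleness timeout,
// so partitions without traffic are marked idle instead of holding the watermark back.
WatermarkStrategy<MyEvent> strategy =
        WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                .withIdleness(Duration.ofSeconds(30));

env.fromSource(kafkaSource, strategy, "kafka-source");
{code}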


was (Author: mason6345):
From the logs, it looks like you have a keyBy and the watermark is progressing 
because that one active partition is moving data to all other operators. I 
would start by setting and tuning the 
[https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources]
 to account for the idle partitions

> Kafka partition aware watermark not handled correctly shortly after job start 
> up from checkpoint or savepoint
> -
>
> Key: FLINK-32828
> URL: https://issues.apache.org/jira/browse/FLINK-32828
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Connectors / Kafka
>Affects Versions: 1.17.1
> Environment: Affected environments:
>  * Local MiniCluster + Confluent Kafka run in docker
>  ** See attached files
>  * Flink job run in Kubernetes using Flink Operator 1.15 + 3 node Kafka 
> cluster run in Kubernetes cluster
>Reporter: Grzegorz Liter
>Priority: Major
> Attachments: docker-compose.yml, test-job.java
>
>
> When using KafkaSource with partition-aware watermarks, watermarks are being 
> emitted even when only one partition has some events just after job startup 
> from a savepoint/checkpoint. After there are some events on other partitions, 
> the watermark behaviour is correct and the watermark is emitted as the minimum 
> watermark from each partition.
>  
> Steps to reproduce:
>  # Setup a Kafka cluster with a topic that has 2 or more partitions. (see 
> attached docker-compose.yml)
>  ## {{./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic 
> test-2 --partitions 4}}
>  # Create a job that (see attached `test-job.java`):
>  ## uses a KafkaSource with 
> `WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10L)`
>  ## has parallelism lower than number of partitions
>  ## stores checkpoint/savepoint
>  # Start job
>  # Send events only on single partition
>  ## {{./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic 
> test-2 --property "parse.key=true" --property "key.separator=:"}}
>  
> {{14:51:19,883 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:18.849Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:51:32,484 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:31.475Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:51:35,914 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:34.909Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> Expected: Watermark does not progress. Actual: Watermark does not progress.
> 5. Stop the job
> 6. Startup job from last checkpoint/savepoint
> 7. Send events only on single partitions
> {{14:53:41,693 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:40.662Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:53:46,088 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:45.078Z, watermark 
> 2023-08-10T12:53:30.661Z}}
> {{14:53:49,520 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:48.511Z, watermark 
> 2023-08-10T12:53:35.077Z}}
> Expected: Watermark does not progress. {color:#ff}*Actual: Watermark has 
> progressed*{color}
>  
> {color:#172b4d}To add bit more of context:{color}
> {color:#172b4d}8. Send events on other partitions and then send events only 
> on single partitions{color}
> {{{color:#172b4d}14:54:55,112 WARN  com.example.TestJob6$InputSink2           
>         [] - == Received: test-2/0: 2 -> a, timestamp 
> 2023-08-10T12:54:54.104Z, watermark 2023-08-10T12:53:38.510Z
> 14:54:57,673 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/1: 4 -> a, timestamp 

[jira] [Commented] (FLINK-32828) Kafka partition aware watermark not handled correctly shortly after job start up from checkpoint or savepoint

2023-08-10 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17752901#comment-17752901
 ] 

Mason Chen commented on FLINK-32828:


From the logs, it looks like you have a keyBy and the watermark is progressing 
because that one active partition is moving data to all other operators. I 
would start by increasing the `forBoundedOutOfOrderness` to account for the 
"late" arriving data

> Kafka partition aware watermark not handled correctly shortly after job start 
> up from checkpoint or savepoint
> -
>
> Key: FLINK-32828
> URL: https://issues.apache.org/jira/browse/FLINK-32828
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Connectors / Kafka
>Affects Versions: 1.17.1
> Environment: Affected environments:
>  * Local MiniCluster + Confluent Kafka run in docker
>  ** See attached files
>  * Flink job run in Kubernetes using Flink Operator 1.15 + 3 node Kafka 
> cluster run in Kubernetes cluster
>Reporter: Grzegorz Liter
>Priority: Major
> Attachments: docker-compose.yml, test-job.java
>
>
> When using KafkaSource with partition-aware watermarks, watermarks are being 
> emitted even when only one partition has some events just after job startup 
> from a savepoint/checkpoint. After there are some events on other partitions, 
> the watermark behaviour is correct and the watermark is emitted as the minimum 
> watermark from each partition.
>  
> Steps to reproduce:
>  # Setup a Kafka cluster with a topic that has 2 or more partitions. (see 
> attached docker-compose.yml)
>  ## {{./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic 
> test-2 --partitions 4}}
>  # Create a job that (see attached `test-job.java`):
>  ## uses a KafkaSource with 
> `WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10L)`
>  ## has parallelism lower than number of partitions
>  ## stores checkpoint/savepoint
>  # Start job
>  # Send events only on single partition
>  ## {{./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic 
> test-2 --property "parse.key=true" --property "key.separator=:"}}
>  
> {{14:51:19,883 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:18.849Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:51:32,484 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:31.475Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:51:35,914 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:34.909Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> Expected: Watermark does not progress. Actual: Watermark does not progress.
> 5. Stop the job
> 6. Startup job from last checkpoint/savepoint
> 7. Send events only on single partitions
> {{14:53:41,693 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:40.662Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:53:46,088 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:45.078Z, watermark 
> 2023-08-10T12:53:30.661Z}}
> {{14:53:49,520 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:48.511Z, watermark 
> 2023-08-10T12:53:35.077Z}}
> Expected: Watermark does not progress. {color:#ff}*Actual: Watermark has 
> progressed*{color}
>  
> {color:#172b4d}To add bit more of context:{color}
> {color:#172b4d}8. Send events on other partitions and then send events only 
> on single partitions{color}
> {{{color:#172b4d}14:54:55,112 WARN  com.example.TestJob6$InputSink2           
>         [] - == Received: test-2/0: 2 -> a, timestamp 
> 2023-08-10T12:54:54.104Z, watermark 2023-08-10T12:53:38.510Z
> 14:54:57,673 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/1: 4 -> a, timestamp 2023-08-10T12:54:56.665Z, watermark 
> 2023-08-10T12:53:38.510Z
> 14:54:57,673 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/2: 5 -> a, timestamp 2023-08-10T12:54:57.554Z, watermark 
> 2023-08-10T12:53:38.510Z
> 14:55:12,821 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:11.814Z, watermark 
> 2023-08-10T12:54:44.103Z
> 14:55:16,099 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:15.091Z, watermark 
> 2023-08-10T12:54:44.103Z
> 14:55:19,122 WARN  com.example.TestJob6$InputSink2                   [] - == 
> 

[jira] [Commented] (FLINK-32822) Add connector option to control whether to enable auto-commit of offsets when checkpoints is enabled

2023-08-09 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17752494#comment-17752494
 ] 

Mason Chen commented on FLINK-32822:


This auto-commit is disabled deliberately, as it would violate the consistency 
guarantees of a checkpoint, and I'm not sure we should introduce this feature. 
If you are monitoring lag from the server side, is it possible for you to 
monitor lag from the client side instead? Flink has a good set of metric 
reporters that integrate with external systems.

Also, this does sound like more of an issue with the checkpoint interval. Is 
there a reason why the interval is so long?

 

> Add connector option to control whether to enable auto-commit of offsets when 
> checkpoints is enabled
> 
>
> Key: FLINK-32822
> URL: https://issues.apache.org/jira/browse/FLINK-32822
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Reporter: Zhanghao Chen
>Priority: Major
>
> When checkpointing is enabled, Flink Kafka connector commits the current 
> consuming offset when checkpoints are *completed* although Kafka source does 
> *NOT* rely on committed offsets for fault tolerance. When the checkpoint 
> interval is long, the lag curve will behave in a zig-zag way: the lag will 
> keep increasing, and suddenly drops on a completed checkpoint. This has led to 
> some confusion for users as in 
> [https://stackoverflow.com/questions/76419633/flink-kafka-source-commit-offset-to-error-offset-suddenly-increase-or-decrease]
>  and may also affect external monitoring for setting up alarms (you'll have 
> to set a high threshold due to the non-realtime commit of offsets) 
> and autoscaling (the algorithm would need to pay extra effort to distinguish 
> whether the backlog is actually growing or just because the checkpoint is not 
> completed yet).
> Therefore, I think it is worthwhile to add an option to enable auto-commit of 
> offsets when checkpointing is enabled. For the DataStream API, it will be adding a 
> configuration method. For Table API, it will be adding a new connector option 
> which wires to the DataStream API configuration underneath.
>  
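
For reference, the sketch below shows the knob that exists today, assuming the `commit.offsets.on.checkpoint` property recognized by the KafkaSource builder (bootstrap servers, topic, and group id are placeholders); the time-based auto-commit proposed in this ticket would be an addition on top of this.

{code:java}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;

public class OffsetCommitSketch {
    public static void main(String[] args) {
        KafkaSource<String> source =
                KafkaSource.<String>builder()
                        .setBootstrapServers("broker:9092")
                        .setTopics("orders")
                        .setGroupId("lag-monitoring-group")
                        .setValueOnlyDeserializer(new SimpleStringSchema())
                        // Current behaviour: offsets are committed only when a checkpoint
                        // completes; set to "false" to disable committing entirely.
                        .setProperty("commit.offsets.on.checkpoint", "true")
                        .build();
    }
}
{code}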



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32507) Document KafkaSink SinkWriterMetricGroup metrics

2023-06-30 Thread Mason Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mason Chen updated FLINK-32507:
---
Issue Type: Improvement  (was: Technical Debt)

> Document KafkaSink SinkWriterMetricGroup metrics
> 
>
> Key: FLINK-32507
> URL: https://issues.apache.org/jira/browse/FLINK-32507
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Reporter: Mason Chen
>Priority: Major
>
> SinkWriterMetricGroup metrics that KafkaSink implements are not documented



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32507) Document KafkaSink SinkWriterMetricGroup metrics

2023-06-30 Thread Mason Chen (Jira)
Mason Chen created FLINK-32507:
--

 Summary: Document KafkaSink SinkWriterMetricGroup metrics
 Key: FLINK-32507
 URL: https://issues.apache.org/jira/browse/FLINK-32507
 Project: Flink
  Issue Type: Technical Debt
  Components: Connectors / Kafka
Reporter: Mason Chen


SinkWriterMetricGroup metrics that KafkaSink implements are not documented



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32496) Sources with idleness and alignment always wait for alignment when part of multiple sources is idle

2023-06-30 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17739189#comment-17739189
 ] 

Mason Chen commented on FLINK-32496:


Hi [~haishui] [~fanrui]! I was looking at the user thread too earlier but got 
distracted by internal issues. I can help review! 

> Sources with idleness and alignment always wait for alignment when part of 
> multiple sources is idle
> ---
>
> Key: FLINK-32496
> URL: https://issues.apache.org/jira/browse/FLINK-32496
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.16.2, 1.17.1
>Reporter: haishui
>Assignee: Rui Fan
>Priority: Major
>
> Sources with idleness and alignment always wait for alignment when part of 
> multiple sources is idle.
> *Root cause:*
> In 
> [SourceOperator|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java],
>  `lastEmittedWatermark` is Long.MAX_VALUE if a source is idle.
> When another source is active, the `currentMaxDesiredWatermark` is less than 
> Long.MAX_VALUE.
> So the `shouldWaitForAlignment` method always returns true for idle sources.
>  
> What's more, a source will become idle if it waits for alignment for a long 
> time, which should also be considered.
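
To make the root cause easier to see, here is a simplified sketch of the condition described above; it is not the actual SourceOperator code, and the method signature is illustrative only.

{code:java}
public class AlignmentConditionSketch {

    // Simplified version of the check: a subtask waits when its last emitted
    // watermark is ahead of the currently desired maximum watermark.
    static boolean shouldWaitForAlignment(long lastEmittedWatermark, long currentMaxDesiredWatermark) {
        return lastEmittedWatermark > currentMaxDesiredWatermark;
    }

    public static void main(String[] args) {
        // An idle source reports Long.MAX_VALUE as its last emitted watermark,
        // so it always appears "ahead" and keeps waiting for alignment.
        System.out.println(shouldWaitForAlignment(Long.MAX_VALUE, 1_700_000_000_000L)); // true
    }
}
{code}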



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32452) Refactor SQL Client E2E Test to Remove Kafka SQL Connector Dependency

2023-06-27 Thread Mason Chen (Jira)
Mason Chen created FLINK-32452:
--

 Summary: Refactor SQL Client E2E Test to Remove Kafka SQL 
Connector Dependency
 Key: FLINK-32452
 URL: https://issues.apache.org/jira/browse/FLINK-32452
 Project: Flink
  Issue Type: Technical Debt
  Components: Table SQL / Client, Tests
Affects Versions: 1.18.0
Reporter: Mason Chen


Since the Kafka connector has been externalized, we should remove dependencies 
on the external Kafka connector. The E2E sql client test can use a different 
connector to exercise this test.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32451) Refactor Confluent Schema Registry E2E Tests to remove Kafka connector dependency

2023-06-27 Thread Mason Chen (Jira)
Mason Chen created FLINK-32451:
--

 Summary: Refactor Confluent Schema Registry E2E Tests to remove 
Kafka connector dependency
 Key: FLINK-32451
 URL: https://issues.apache.org/jira/browse/FLINK-32451
 Project: Flink
  Issue Type: Technical Debt
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Tests
Affects Versions: 1.18.0
Reporter: Mason Chen


Since the Kafka connector has been externalized, we should remove dependencies 
on the external Kafka connector. We can use a different connector to test the 
confluent schema registry format since the format is connector agnostic. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32449) Refactor state machine examples to remove Kafka dependency

2023-06-27 Thread Mason Chen (Jira)
Mason Chen created FLINK-32449:
--

 Summary: Refactor state machine examples to remove Kafka dependency
 Key: FLINK-32449
 URL: https://issues.apache.org/jira/browse/FLINK-32449
 Project: Flink
  Issue Type: Technical Debt
  Components: Documentation
Affects Versions: 1.18.0
Reporter: Mason Chen


Since the Kafka connector has been externalized, we should remove dependencies 
on the external Kafka connector. In this case, we should replace the 
KafkaSource with an example-specific generator source and also delete the 
KafkaEventsGeneratorJob.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32416) Initial DynamicKafkaSource Implementation

2023-06-22 Thread Mason Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32416?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mason Chen updated FLINK-32416:
---
Component/s: Connectors / Kafka
Description: Implementation that supports unbounded and bounded modes. With 
a default implementation of KafkaMetadataService

> Initial DynamicKafkaSource Implementation 
> --
>
> Key: FLINK-32416
> URL: https://issues.apache.org/jira/browse/FLINK-32416
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka
>Reporter: Mason Chen
>Priority: Major
>
> Implementation that supports unbounded and bounded modes. With a default 
> implementation of KafkaMetadataService



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32417) DynamicKafkaSource User Documentation

2023-06-22 Thread Mason Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mason Chen updated FLINK-32417:
---
Description: Add user documentation for DynamicKafkaSource

> DynamicKafkaSource User Documentation
> -
>
> Key: FLINK-32417
> URL: https://issues.apache.org/jira/browse/FLINK-32417
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka
>Reporter: Mason Chen
>Priority: Major
>
> Add user documentation for DynamicKafkaSource



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

