Re: [PR] [FLINK-33545][Connectors/Kafka] KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching [flink-connector-kafka]

2024-07-10 Thread via GitHub


AHeise commented on code in PR #70:
URL: 
https://github.com/apache/flink-connector-kafka/pull/70#discussion_r1671715837


##
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java:
##
@@ -72,12 +74,17 @@ private static Properties withTransactionalId(
 return props;
 }
 
+public long getPendingRecordsCount() {
+return pendingRecords.get();
+}
+
 @Override
 public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
 if (inTransaction) {
 hasRecordsInTransaction = true;
 }
-return super.send(record, callback);
+pendingRecords.incrementAndGet();
+return super.send(record, new TrackingCallback(callback));

Review Comment:
   I can't quite follow. I was proposing to use
   
   `return super.send(record, callbackCache.computeIfAbsent(callback, TrackingCallback::new));`
   
   So we have 3 cases:
   - New callback: wrap it in `TrackingCallback` and cache it.
   - Existing callback (the common case): retrieve the existing wrapper and reuse it.
   - An existing `TrackingCallback` is evicted from the cache when the cache is full.
   
   In all cases, both the TrackingCallback and the original callback will be invoked. The only difference to the code without the cache is that we avoid creating extra TrackingCallback instances around the same original callback.
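   The three cases above can be sketched with plain JDK classes. This is a hedged illustration, not the PR's actual code: `Callback` here is a local stand-in for `org.apache.kafka.clients.producer.Callback`, and an access-ordered `LinkedHashMap` stands in for the commons-collections `LRUMap` mentioned in the thread.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Sketch only: Callback stands in for org.apache.kafka.clients.producer.Callback.
public class CallbackCacheSketch {
    public interface Callback {
        void onCompletion(Object metadata, Exception exception);
    }

    private final AtomicLong pendingRecords = new AtomicLong();

    // Bounded cache (capacity 3, matching the suggested LRUMap(3)). Evicting a
    // wrapper is harmless: any wrapper already handed to send() stays reachable
    // from the producer until its record completes, so its decrement still runs.
    private final Map<Callback, Callback> callbackCache =
            new LinkedHashMap<Callback, Callback>(4, 0.75f, true) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<Callback, Callback> eldest) {
                    return size() > 3;
                }
            };

    /** What send() would do: count the record and reuse one wrapper per distinct callback. */
    public Callback wrapForSend(Callback callback) {
        pendingRecords.incrementAndGet();
        return callbackCache.computeIfAbsent(callback, original -> (metadata, exception) -> {
            // Every completed record decrements, no matter which wrapper delivered it.
            pendingRecords.decrementAndGet();
            original.onCompletion(metadata, exception);
        });
    }

    public long getPendingRecordsCount() {
        return pendingRecords.get();
    }
}
```

   Because `computeIfAbsent` returns the same wrapper for repeated sends with the same callback, no extra instances accumulate, while the shared counter stays correct across all three cases.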



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33545][Connectors/Kafka] KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching [flink-connector-kafka]

2024-07-09 Thread via GitHub


hhktseng commented on code in PR #70:
URL: 
https://github.com/apache/flink-connector-kafka/pull/70#discussion_r1670903610


##
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java:
##
@@ -86,6 +93,11 @@ public void flush() {
 if (inTransaction) {
 flushNewPartitions();
 }
+final long pendingRecordsCount = pendingRecords.get();
+if (pendingRecordsCount != 0) {
+throw new IllegalStateException(
+"Pending record count must be zero at this point: " + pendingRecordsCount);

Review Comment:
   I think having a reference to the reported issue in the message might allow us to better track the potential problem.






Re: [PR] [FLINK-33545][Connectors/Kafka] KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching [flink-connector-kafka]

2024-07-09 Thread via GitHub


hhktseng commented on code in PR #70:
URL: 
https://github.com/apache/flink-connector-kafka/pull/70#discussion_r1670902618


##
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java:
##
@@ -72,12 +74,17 @@ private static Properties withTransactionalId(
 return props;
 }
 
+public long getPendingRecordsCount() {
+return pendingRecords.get();
+}
+
 @Override
 public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
 if (inTransaction) {
 hasRecordsInTransaction = true;
 }
-return super.send(record, callback);
+pendingRecords.incrementAndGet();
+return super.send(record, new TrackingCallback(callback));

Review Comment:
   Just want to check: are we proposing to put instances of TrackingCallback into the rotating cache? Wouldn't that cause a previous callback that might not yet have been invoked to be ignored?






Re: [PR] [FLINK-33545][Connectors/Kafka] KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching [flink-connector-kafka]

2024-07-09 Thread via GitHub


AHeise commented on code in PR #70:
URL: 
https://github.com/apache/flink-connector-kafka/pull/70#discussion_r1669998602


##
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java:
##
@@ -86,6 +93,11 @@ public void flush() {
 if (inTransaction) {
 flushNewPartitions();
 }
+final long pendingRecordsCount = pendingRecords.get();
+if (pendingRecordsCount != 0) {
+throw new IllegalStateException(
+"Pending record count must be zero at this point: " + pendingRecordsCount);

Review Comment:
   I'd improve the error message as follows:
   
   `Some records have not been fully persisted in Kafka. As a precaution, Flink will restart to resume from the previous checkpoint. Please report this issue with logs on https://issues.apache.org/jira/browse/FLINK-33545`.



##
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java:
##
@@ -72,12 +74,17 @@ private static Properties withTransactionalId(
 return props;
 }
 
+public long getPendingRecordsCount() {
+return pendingRecords.get();
+}
+
 @Override
 public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
 if (inTransaction) {
 hasRecordsInTransaction = true;
 }
-return super.send(record, callback);
+pendingRecords.incrementAndGet();
+return super.send(record, new TrackingCallback(callback));

Review Comment:
   This change creates a new callback with every `send`. Since the callback passed in our codebase is mostly constant, we should add a simple cache like `new LRUMap(3);`. The number is somewhat arbitrary and 1 should already work. The most important part is that it shouldn't grow unbounded, or we get the next memory leak if I overlooked a dynamic usage ;).
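   The bounded-cache behavior suggested here can be illustrated with a plain-JDK access-ordered `LinkedHashMap` (a stand-in for the commons-collections `LRUMap(3)`; names are illustrative only):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class BoundedCacheDemo {
    // Access-ordered map that evicts the least-recently-used entry
    // once more than 3 entries are present, so it can never grow unbounded.
    public static Map<String, String> newLruCache() {
        return new LinkedHashMap<String, String>(4, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                return size() > 3;
            }
        };
    }
}
```

   With a fixed capacity, even an overlooked dynamic callback usage only churns cache entries instead of leaking memory.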






Re: [PR] [FLINK-33545][Connectors/Kafka] KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching [flink-connector-kafka]

2024-07-08 Thread via GitHub


AHeise commented on PR #70:
URL: 
https://github.com/apache/flink-connector-kafka/pull/70#issuecomment-2213583717

   Please check my comment here. 
https://issues.apache.org/jira/browse/FLINK-33545?focusedCommentId=17863737&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17863737
 
   If everyone is sure that the current fix is addressing the actual issue, 
please go ahead.





Re: [PR] [FLINK-33545][Connectors/Kafka] KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching [flink-connector-kafka]

2024-07-04 Thread via GitHub


tweise commented on PR #70:
URL: 
https://github.com/apache/flink-connector-kafka/pull/70#issuecomment-2209349636

   @hhktseng thanks for working on this. Can you please address the review 
comments?





Re: [PR] [FLINK-33545][Connectors/Kafka] KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching [flink-connector-kafka]

2024-07-04 Thread via GitHub


tweise commented on code in PR #70:
URL: 
https://github.com/apache/flink-connector-kafka/pull/70#discussion_r1665918661


##
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java:
##
@@ -396,8 +408,27 @@ public String toString() {
 + transactionalId
 + "', inTransaction="
 + inTransaction
++ ", pendingRecords="
++ pendingRecords.get()
 + ", closed="
 + closed
 + '}';
 }
+
+public class TrackingCallback implements Callback {
+
+private final Callback actualCallback;
+
+public TrackingCallback(final Callback actualCallback) {
+this.actualCallback = actualCallback;
+}
+
+@Override
+public void onCompletion(final RecordMetadata recordMetadata, final Exception e) {
+pendingRecords.decrementAndGet();

Review Comment:
   Since the completion already happened, we should probably decrement as soon as possible?
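   The decrement-first ordering suggested here can be sketched as follows (a hedged illustration with a local `Callback` stand-in, not the actual Kafka interface): decrementing before delegating keeps the counter accurate even if the wrapped callback throws.

```java
import java.util.concurrent.atomic.AtomicLong;

public class DecrementFirstSketch {
    public interface Callback {
        void onCompletion(Object metadata, Exception exception);
    }

    private final AtomicLong pendingRecords = new AtomicLong();

    public Callback track(Callback actual) {
        pendingRecords.incrementAndGet();
        return (metadata, exception) -> {
            // Decrement first: by the time the producer invokes the callback the
            // record's fate is already decided, and an exception thrown by the
            // delegated callback can no longer corrupt the pending count.
            pendingRecords.decrementAndGet();
            if (actual != null) {
                actual.onCompletion(metadata, exception);
            }
        };
    }

    public long getPendingRecordsCount() {
        return pendingRecords.get();
    }
}
```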






Re: [PR] [FLINK-33545][Connectors/Kafka] KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching [flink-connector-kafka]

2024-04-08 Thread via GitHub


mas-chen commented on PR #70:
URL: 
https://github.com/apache/flink-connector-kafka/pull/70#issuecomment-2043431836

   @hhktseng were you able to test that this change mitigates your original 
issue? Is there a way to repro in the tests?





Re: [PR] [FLINK-33545][Connectors/Kafka] KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching [flink-connector-kafka]

2024-04-08 Thread via GitHub


mas-chen commented on code in PR #70:
URL: 
https://github.com/apache/flink-connector-kafka/pull/70#discussion_r1556268256


##
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java:
##
@@ -86,6 +93,11 @@ public void flush() {
 if (inTransaction) {
 flushNewPartitions();
 }
+final long pendingRecordsCount = pendingRecords.get();
+if (pendingRecordsCount != 0) {
+throw new IllegalStateException(
+"Pending record count must be zero at this point: " + pendingRecordsCount);

Review Comment:
   nit: how about a message like this:
   
   `n pending records after flush. There must be no pending records left.`






Re: [PR] [FLINK-33545][Connectors/Kafka] KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching [flink-connector-kafka]

2024-04-08 Thread via GitHub


mas-chen commented on code in PR #70:
URL: 
https://github.com/apache/flink-connector-kafka/pull/70#discussion_r1556263478


##
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java:
##
@@ -396,8 +408,27 @@ public String toString() {
 + transactionalId
 + "', inTransaction="
 + inTransaction
++ ", pendingRecords="
++ pendingRecords.get()
 + ", closed="
 + closed
 + '}';
 }
+
+public class TrackingCallback implements Callback {
+
+private final Callback actualCallback;
+
+public TrackingCallback(final Callback actualCallback) {
+this.actualCallback = actualCallback;
+}
+
+@Override
+public void onCompletion(final RecordMetadata recordMetadata, final Exception e) {
+pendingRecords.decrementAndGet();

Review Comment:
   Do we want to decrement after the callback is completed? What's the best approach semantically?






Re: [PR] [FLINK-33545][Connectors/Kafka] KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching [flink-connector-kafka]

2024-04-08 Thread via GitHub


mas-chen commented on code in PR #70:
URL: 
https://github.com/apache/flink-connector-kafka/pull/70#discussion_r1556262703


##
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java:
##
@@ -396,8 +408,27 @@ public String toString() {
 + transactionalId
 + "', inTransaction="
 + inTransaction
++ ", pendingRecords="
++ pendingRecords.get()
 + ", closed="
 + closed
 + '}';
 }
+
+public class TrackingCallback implements Callback {

Review Comment:
   nit: private






Re: [PR] [FLINK-33545][Connectors/Kafka] KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching [flink-connector-kafka]

2024-03-18 Thread via GitHub


hhktseng commented on code in PR #70:
URL: 
https://github.com/apache/flink-connector-kafka/pull/70#discussion_r1529087212


##
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java:
##
@@ -74,6 +76,10 @@ private static Properties withTransactionalId(
 
 @Override
 public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
+if (flushGated) {

Review Comment:
   reworked to follow 
[flink-connector-kafka/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java](https://github.com/apache/flink-connector-kafka/blob/15f2662eccf461d9d539ed87a78c9851cd17fa43/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java#L1107)






Re: [PR] [FLINK-33545][Connectors/Kafka] KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching [flink-connector-kafka]

2024-02-15 Thread via GitHub


mas-chen commented on code in PR #70:
URL: 
https://github.com/apache/flink-connector-kafka/pull/70#discussion_r1491960996


##
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java:
##
@@ -74,6 +76,10 @@ private static Properties withTransactionalId(
 
 @Override
 public Future send(ProducerRecord record, Callback 
callback) {
+if (flushGated) {

Review Comment:
   I don't think this helps because during a SinkWriter#flush(), a 
SinkWriter#write() cannot be invoked. So this boolean condition never occurs. 
For your issue, we are looking to replicate this behavior 
https://github.com/apache/flink-connector-kafka/blob/15f2662eccf461d9d539ed87a78c9851cd17fa43/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java#L1107
 to ensure that all records are flushed before completing the checkpoint, even 
in the case of Kafka client timeouts/retries.






Re: [PR] [FLINK-33545][Connectors/Kafka] KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching [flink-connector-kafka]

2024-01-23 Thread via GitHub


hhktseng commented on PR #70:
URL: 
https://github.com/apache/flink-connector-kafka/pull/70#issuecomment-1906723207

   Created a patch and applied it after syncing to the latest commit, then replaced the forked branch with the latest sync plus the patch.





Re: [PR] [FLINK-33545][Connectors/Kafka] KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching [flink-connector-kafka]

2024-01-19 Thread via GitHub


MartijnVisser commented on PR #70:
URL: 
https://github.com/apache/flink-connector-kafka/pull/70#issuecomment-1899932551

   > @MartijnVisser can you point me to which commit to rebase onto?
   
   @hhktseng On the latest changes from `main` please





Re: [PR] [FLINK-33545][Connectors/Kafka] KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching [flink-connector-kafka]

2024-01-18 Thread via GitHub


hhktseng commented on PR #70:
URL: 
https://github.com/apache/flink-connector-kafka/pull/70#issuecomment-1899929011

   > @hhktseng Can you rebase your PR?
   
   @MartijnVisser can you point me to which commit to rebase onto?
   
   thanks





Re: [PR] [FLINK-33545][Connectors/Kafka] KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching [flink-connector-kafka]

2024-01-17 Thread via GitHub


MartijnVisser commented on PR #70:
URL: 
https://github.com/apache/flink-connector-kafka/pull/70#issuecomment-1895500575

   @hhktseng Can you rebase your PR?





Re: [PR] [FLINK-33545][Connectors/Kafka] KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching [flink-connector-kafka]

2023-11-30 Thread via GitHub


boring-cyborg[bot] commented on PR #70:
URL: 
https://github.com/apache/flink-connector-kafka/pull/70#issuecomment-1834763669

   Thanks for opening this pull request! Please check out our contributing 
guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)
   

