Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]

2024-05-07 Thread via GitHub


C0urante merged PR #13801:
URL: https://github.com/apache/kafka/pull/13801





Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]

2024-05-03 Thread via GitHub


vamossagar12 commented on PR #13801:
URL: https://github.com/apache/kafka/pull/13801#issuecomment-2092565347

   Thanks @C0urante, good catch, those were indeed missing. I have modified some of the tests to cover all three types of offset records, and I added another test for the case where the write to the secondary store times out for regular offsets. Let me know if these look OK coverage-wise.





Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]

2024-04-29 Thread via GitHub


C0urante commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1583356709


##
connect/runtime/src/test/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStoreTest.java:
##
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.LoggingContext;
+import org.apache.kafka.connect.util.TopicAdmin;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.mockito.Mockito.mock;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
+public class ConnectorOffsetBackingStoreTest {
+
+// Serialized
+private static final byte[] OFFSET_KEY_SERIALIZED = 
"key-serialized".getBytes();
+private static final byte[] OFFSET_VALUE_SERIALIZED = 
"value-serialized".getBytes();
+
+private static final KafkaException PRODUCE_EXCEPTION = new 
KafkaException();
+
+private final ByteArraySerializer byteArraySerializer = new 
ByteArraySerializer();
+
+@Test
+public void 
testFlushFailureWhenWriteToSecondaryStoreFailsForTombstoneOffsets() {
+MockProducer connectorStoreProducer = 
createMockProducer();
+MockProducer workerStoreProducer = 
createMockProducer();
+KafkaOffsetBackingStore connectorStore = createStore("topic1", 
connectorStoreProducer);
+KafkaOffsetBackingStore workerStore = createStore("topic2", 
workerStoreProducer);
+
+ConnectorOffsetBackingStore offsetBackingStore = 
ConnectorOffsetBackingStore.withConnectorAndWorkerStores(
+() -> LoggingContext.forConnector("source-connector"),
+workerStore,
+connectorStore,
+"offsets-topic",
+mock(TopicAdmin.class));
+
+AtomicBoolean callbackInvoked = new AtomicBoolean();
+AtomicReference callbackResult = new AtomicReference<>();
+AtomicReference callbackError = new AtomicReference<>();
+
+Future setFuture = 
offsetBackingStore.set(getSerialisedOffsets(OFFSET_KEY_SERIALIZED, null), 
(error, result) -> {
+callbackInvoked.set(true);
+callbackResult.set(result);
+callbackError.set(error);
+});

Review Comment:
   Nit: could it also be helpful to make sure we don't prematurely complete our 
callbacks?
   
   ```suggestion
   });
   
   assertFalse("Store callback should not be invoked before underlying 
producer callback", callbackInvoked.get());
   ```




Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]

2024-04-25 Thread via GitHub


vamossagar12 commented on PR #13801:
URL: https://github.com/apache/kafka/pull/13801#issuecomment-2076983151

   Thanks Chris! I ran through the scenarios in the test and I can see that it handles the cases correctly. Regarding `cancel`, I don't see the future returned from `set` being cancelled explicitly, so we can live without implementations of `cancel` and `isCancelled`.
   Also, I have now updated the tests so that I control when a record should be returned, throw an error, or time out; `MockProducer` provides some great hooks for this. I added a couple more tests which also exercise the timeout scenario: the tests hit a timeout unless all futures return promptly (with an error or otherwise). Let me know how this is looking now. Thanks!
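   
   For reference, a minimal standalone sketch of the kind of `MockProducer` hooks described above (illustrative only; the class name, topic, and record contents are placeholders, not the PR's actual test code):
   
   ```java
   import org.apache.kafka.clients.producer.MockProducer;
   import org.apache.kafka.clients.producer.ProducerRecord;
   import org.apache.kafka.clients.producer.RecordMetadata;
   import org.apache.kafka.common.serialization.ByteArraySerializer;
   
   import java.util.concurrent.Future;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;
   
   public class MockProducerHooksExample {
       public static void main(String[] args) throws Exception {
           // autoComplete=false keeps every send pending until the test explicitly completes or fails it
           MockProducer<byte[], byte[]> producer =
                   new MockProducer<>(false, new ByteArraySerializer(), new ByteArraySerializer());
   
           Future<RecordMetadata> send =
                   producer.send(new ProducerRecord<>("offsets-topic", "key".getBytes(), null));
   
           // Timeout scenario: the record has not been acked yet, so a bounded get() times out
           try {
               send.get(100, TimeUnit.MILLISECONDS);
           } catch (TimeoutException e) {
               System.out.println("send still pending, as expected");
           }
   
           // Success scenario: ack the oldest pending record from the test thread...
           producer.completeNext();
           // ...or fail it instead: producer.errorNext(new org.apache.kafka.common.KafkaException("simulated"));
           send.get();
       }
   }
   ```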





Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]

2024-04-15 Thread via GitHub


C0urante commented on PR #13801:
URL: https://github.com/apache/kafka/pull/13801#issuecomment-2056973237

   Thanks Sagar, great catch! I suspected this would be a gnarly one to tackle 
but it's turning out to be even harder than I thought.
   
   I think there's still an issue with the current state of the PR. It looks 
like we aren't blocking on the future returned by `setPrimaryThenSecondary`, 
which means that we may spuriously return early from `get` in the future we're 
returning from `ConnectorOffsetBackingStore::set` if the write to the primary 
store hasn't completed yet. I believe this is missed by tests because the 
producer writes we mock out all take place synchronously; maybe we can use the 
`MockProducer` more idiomatically to simulate records being ack'd after calls 
to `MockProducer::send` have returned?
   
   I've sketched a new kind of `Future` implementation that seems to do the 
trick, though I haven't tested it rigorously:
   
   ```java
   private class ChainedOffsetWriteFuture implements Future<Void> {
   
       private final OffsetBackingStore primaryStore;
       private final OffsetBackingStore secondaryStore;
       private final Map<ByteBuffer, ByteBuffer> completeOffsets;
       private final Map<ByteBuffer, ByteBuffer> regularOffsets;
       private final Callback<Void> callback;
       private final AtomicReference<Throwable> writeError;
       private final CountDownLatch completed;
   
       public ChainedOffsetWriteFuture(
               OffsetBackingStore primaryStore,
               OffsetBackingStore secondaryStore,
               Map<ByteBuffer, ByteBuffer> completeOffsets,
               Map<ByteBuffer, ByteBuffer> regularOffsets,
               Map<ByteBuffer, ByteBuffer> tombstoneOffsets,
               Callback<Void> callback
       ) {
           this.primaryStore = primaryStore;
           this.secondaryStore = secondaryStore;
           this.completeOffsets = completeOffsets;
           this.regularOffsets = regularOffsets;
           this.callback = callback;
           this.writeError = new AtomicReference<>();
           this.completed = new CountDownLatch(1);
   
           secondaryStore.set(tombstoneOffsets, this::onFirstWrite);
       }
   
       private void onFirstWrite(Throwable error, Void ignored) {
           if (error != null) {
               log.trace("Skipping offsets write to primary store because secondary tombstone write has failed", error);
               try (LoggingContext context = loggingContext()) {
                   callback.onCompletion(error, ignored);
                   writeError.compareAndSet(null, error);
                   completed.countDown();
               }
               return;
           }
           setPrimaryThenSecondary(primaryStore, secondaryStore, completeOffsets, regularOffsets, this::onSecondWrite);
       }
   
       private void onSecondWrite(Throwable error, Void ignored) {
           callback.onCompletion(error, ignored);
           writeError.compareAndSet(null, error);
           completed.countDown();
       }
   
       @Override
       public boolean cancel(boolean mayInterruptIfRunning) {
           return false;
       }
   
       @Override
       public boolean isCancelled() {
           return false;
       }
   
       @Override
       public boolean isDone() {
           return completed.getCount() == 0;
       }
   
       @Override
       public Void get() throws InterruptedException, ExecutionException {
           completed.await();
           if (writeError.get() != null) {
               throw new ExecutionException(writeError.get());
           }
           return null;
       }
   
       @Override
       public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
           if (!completed.await(timeout, unit)) {
               throw new TimeoutException("Failed to complete offset write in time");
           }
           if (writeError.get() != null) {
               throw new ExecutionException(writeError.get());
           }
           return null;
       }
   }
   ```
   
   (I've omitted an implementation of `cancel` and `isCancelled` for now since 
I'm not sure it's really necessary, but LMK if I've missed a case where this 
would make a difference.)
   
   The new class can be used at the end of `ConnectorOffsetBackingStore::set` 
like this:
   
   ```java
   if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
   return new ChainedOffsetWriteFuture(
   primaryStore,
   secondaryStore,
   values,
   regularOffsets,
   tombstoneOffsets,
   callback
   );
   } else {
   return setPrimaryThenSecondary(primaryStore, secondaryStore, values, 
regularOffsets, callback);
   }
   ```



Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]

2024-04-11 Thread via GitHub


vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1560724752


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,15 +300,77 @@ public Future set(Map 
values, Callback callb
 throw new IllegalStateException("At least one non-null offset 
store must be provided");
 }
 
-        return primaryStore.set(values, (primaryWriteError, ignored) -> {
+        Map<ByteBuffer, ByteBuffer> regularOffsets = new HashMap<>();
+        Map<ByteBuffer, ByteBuffer> tombstoneOffsets = new HashMap<>();
+        values.forEach((partition, offset) -> {
+            if (offset == null) {
+                tombstoneOffsets.put(partition, null);
+            } else {
+                regularOffsets.put(partition, offset);
+            }
+        });
+
+        if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
+            AtomicReference<Throwable> primaryWriteError = new AtomicReference<>();
+            FutureCallback<Void> secondaryWriteCallback = new FutureCallback<Void>() {
+                @Override
+                public void onCompletion(Throwable tombstoneWriteError, Void ignored) {
+                    super.onCompletion(tombstoneWriteError, ignored);
+                    if (tombstoneWriteError != null) {
+                        log.trace("Skipping offsets write to primary store because secondary tombstone write has failed", tombstoneWriteError);
+                        try (LoggingContext context = loggingContext()) {
+                            callback.onCompletion(tombstoneWriteError, ignored);
+                        }
+                        return;
+                    }
+                    setPrimaryThenSecondary(primaryStore, secondaryStore, values, regularOffsets, callback, primaryWriteError);
+                }
+
+                @Override
+                public Void get() throws InterruptedException, ExecutionException {
+                    super.get();
+                    if (primaryWriteError.get() != null) {
+                        throw new ExecutionException(primaryWriteError.get());
+                    }
+                    return null;
+                }
+
+                @Override
+                public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+                    super.get(timeout, unit);
+                    if (primaryWriteError.get() != null) {
+                        if (primaryWriteError.get() instanceof TimeoutException) {
+                            throw (TimeoutException) primaryWriteError.get();

Review Comment:
   I am slightly on the fence about whether we need to handle this case, because in [ConvertingFutureCallback#result](https://github.com/apache/kafka/blob/f895ab5145077c5efa10a4a898628d901b01e2c2/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConvertingFutureCallback.java#L135) I see that any exception other than `CancellationException` is wrapped in `ExecutionException`.
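   
   A minimal standalone sketch of that wrapping behaviour (an assumption-level illustration, not code from this PR; it only needs connect-runtime on the classpath, and the exception and message are arbitrary):
   
   ```java
   import org.apache.kafka.connect.util.FutureCallback;
   
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.TimeoutException;
   
   public class FutureCallbackWrappingExample {
       public static void main(String[] args) throws InterruptedException {
           FutureCallback<Void> future = new FutureCallback<>();
   
           // Complete the callback with a (java.util.concurrent) TimeoutException as the error
           future.onCompletion(new TimeoutException("simulated"), null);
   
           try {
               future.get();
           } catch (ExecutionException e) {
               // The TimeoutException comes back wrapped in an ExecutionException, as its cause
               System.out.println(e.getCause());
           }
       }
   }
   ```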






Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]

2024-04-11 Thread via GitHub


vamossagar12 commented on PR #13801:
URL: https://github.com/apache/kafka/pull/13801#issuecomment-2049281246

   @C0urante , I think I figured out the reason for the failure with `testFlushFailureWhenWritesToPrimaryStoreFailsAndSecondarySucceedsForTombstoneRecords`. The problem was that we were returning the callback created [here](https://github.com/apache/kafka/blob/f895ab5145077c5efa10a4a898628d901b01e2c2/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java#L302), but when there is no write error to the secondary store, the [exception](https://github.com/apache/kafka/blob/f895ab5145077c5efa10a4a898628d901b01e2c2/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java#L348) object is null, because of which [get](https://github.com/apache/kafka/blob/f895ab5145077c5efa10a4a898628d901b01e2c2/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java#L402) doesn't return an error. The primary store write error works fine, but that's not the future the caller is waiting on.
   
   I have fixed this by creating a new `FutureCallback` object and making that the underlying callback for `SetCallbackFuture`. The important thing is that it overrides the `get()` methods so that the caller of `set` waits on this future, and now I can control throwing an exception when the primary store write fails and the secondary store write passes. Let me know what you think about this.





Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]

2024-04-11 Thread via GitHub


vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1560701187


##
checkstyle/suppressions.xml:
##
@@ -166,7 +166,7 @@
   
files="(KafkaConfigBackingStore|Values|ConnectMetricsRegistry).java"/>
 
 
+  
files="(DistributedHerder|AbstractHerder|RestClient|RestServer|JsonConverter|KafkaConfigBackingStore|FileStreamSourceTask|WorkerSourceTask|TopicAdmin|ConnectorOffsetBackingStore).java"/>

Review Comment:
   Removed






Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]

2024-04-10 Thread via GitHub


vamossagar12 commented on PR #13801:
URL: https://github.com/apache/kafka/pull/13801#issuecomment-2047548232

   Chris, I started changing the tests in alignment with the comments (i.e. using `AtomicBoolean`/`AtomicReference` and removing the try-catch block). I noticed an interesting issue with the `testFlushFailureWhenWritesToPrimaryStoreFailsAndSecondarySucceedsForTombstoneRecords` test. What's happening is that a `get` on the future returned in this case doesn't throw an exception. I debugged it, and I think the problem is that when the primary store fails, we set the callback to an error correctly; however, because the secondary store write doesn't fail, when its callback gets invoked from [here](https://github.com/apache/kafka/pull/13801/files#diff-0b612a24267f45b927d37b223af3034feebe4363b23b53f5751f1b29e54e2aa7R331), the callback's onCompletion eventually sets it to a non-error from [here](https://github.com/apache/kafka/blob/f895ab5145077c5efa10a4a898628d901b01e2c2/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java#L369-L372). The net effect is that the `.get()` call on the future doesn't return an error, which isn't right.





Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]

2024-04-08 Thread via GitHub


C0urante commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1555264597


##
connect/runtime/src/test/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStoreTest.java:
##
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.LoggingContext;
+import org.apache.kafka.connect.util.TopicAdmin;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.mockito.Mockito.mock;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.when;
+
+public class ConnectorOffsetBackingStoreTest {
+
+private static final String NAMESPACE = "namespace";
+// Connect format - any types should be accepted here
+private static final Map OFFSET_KEY = 
Collections.singletonMap("key", "key");
+private static final Map OFFSET_VALUE = 
Collections.singletonMap("key", 12);
+
+// Serialized
+private static final byte[] OFFSET_KEY_SERIALIZED = 
"key-serialized".getBytes();
+private static final byte[] OFFSET_VALUE_SERIALIZED = 
"value-serialized".getBytes();
+
+private static final Exception PRODUCE_EXCEPTION = new KafkaException();
+
+private final Converter keyConverter = mock(Converter.class);
+private final Converter valueConverter = mock(Converter.class);

Review Comment:
   We tend to use the `@Mock` annotation instead of final fields:
   
   ```suggestion
   @Mock
   private Converter keyConverter;
   @Mock
   private Converter valueConverter;
   ```



##
connect/runtime/src/test/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStoreTest.java:
##
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import 

Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]

2024-04-05 Thread via GitHub


vamossagar12 commented on PR #13801:
URL: https://github.com/apache/kafka/pull/13801#issuecomment-2039360963

   Hey Chris, sorry for the long delay on this. I finally got a chance to verify the code that you provided and it makes sense. I agree that so far I was only thinking about either having two separate futures such that one waits for the other, or trying to chain futures like `CompletableFuture`s. However, the version you have provided is pretty straightforward, and all the new tests passed out of the box for me.
   
   Regarding 
   
   > I think the only question left is whether out-of-order writes are possible because of how things are chained
   
   I am assuming that for non-exactly-once source tasks, you are referring to scenarios where offset flushes are triggered and the flush operations finish out of order. I reviewed the code and I can see that this is checked in `handleFinishWrite`, which does not complete the flush if the flush that just completed isn't the current one. For any other erroneous cases, `cancelFlush` is invoked (as you mentioned).





Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]

2024-02-01 Thread via GitHub


vamossagar12 commented on PR #13801:
URL: https://github.com/apache/kafka/pull/13801#issuecomment-1921517994

   Thanks @C0urante for this! Let me review this and get back to you.





Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]

2024-01-31 Thread via GitHub


C0urante commented on PR #13801:
URL: https://github.com/apache/kafka/pull/13801#issuecomment-1919475280

   Thanks for the in-depth analysis! I think part of the problem here stems from trying to make our internal `Future`-based API cooperate with Java's `CompletableFuture` API. I've sketched out something below that doesn't rely on `CompletableFuture` and (I believe) preserves the semantics we want for asynchronicity:
   
   ```java
   public class ConnectorOffsetBackingStore implements OffsetBackingStore {
   
       @Override
       public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callback) {
           final OffsetBackingStore primaryStore;
           final OffsetBackingStore secondaryStore;
           if (connectorStore.isPresent()) {
               primaryStore = connectorStore.get();
               secondaryStore = workerStore.orElse(null);
           } else if (workerStore.isPresent()) {
               primaryStore = workerStore.get();
               secondaryStore = null;
           } else {
               // Should never happen since we check for this case in the constructor, but just in case, this should
               // be more informative than the NPE that would otherwise be thrown
               throw new IllegalStateException("At least one non-null offset store must be provided");
           }
   
           Map<ByteBuffer, ByteBuffer> regularOffsets = new HashMap<>();
           Map<ByteBuffer, ByteBuffer> tombstoneOffsets = new HashMap<>();
           values.forEach((partition, offset) -> {
               if (offset == null) {
                   tombstoneOffsets.put(partition, null);
               } else {
                   regularOffsets.put(partition, offset);
               }
           });
   
           if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
               return secondaryStore.set(tombstoneOffsets, (tombstoneWriteError, ignored) -> {
                   if (tombstoneWriteError != null) {
                       log.trace("Skipping offsets write to primary store because secondary tombstone write has failed", tombstoneWriteError);
                       try (LoggingContext context = loggingContext()) {
                           callback.onCompletion(tombstoneWriteError, ignored);
                       }
                       return;
                   }
   
                   setPrimaryThenSecondary(primaryStore, secondaryStore, values, regularOffsets, callback);
               });
           } else {
               return setPrimaryThenSecondary(primaryStore, secondaryStore, values, regularOffsets, callback);
           }
       }
   
       private Future<Void> setPrimaryThenSecondary(
               OffsetBackingStore primaryStore,
               OffsetBackingStore secondaryStore,
               Map<ByteBuffer, ByteBuffer> completeOffsets,
               Map<ByteBuffer, ByteBuffer> nonTombstoneOffsets,
               Callback<Void> callback
       ) {
           return primaryStore.set(completeOffsets, (primaryWriteError, ignored) -> {
               if (secondaryStore != null) {
                   if (primaryWriteError != null) {
                       log.trace("Skipping offsets write to secondary store because primary write has failed", primaryWriteError);
                   } else {
                       try {
                           // Invoke OffsetBackingStore::set but ignore the resulting future; we don't block on writes to this
                           // backing store.
                           secondaryStore.set(nonTombstoneOffsets, (secondaryWriteError, ignored2) -> {
                               try (LoggingContext context = loggingContext()) {
                                   if (secondaryWriteError != null) {
                                       log.warn("Failed to write offsets to secondary backing store", secondaryWriteError);
                                   } else {
                                       log.debug("Successfully flushed offsets to secondary backing store");
                                   }
                               }
                           });
                       } catch (Exception e) {
                           log.warn("Failed to write offsets to secondary backing store", e);
                       }
                   }
               }
               try (LoggingContext context = loggingContext()) {
                   callback.onCompletion(primaryWriteError, ignored);
               }
           });
       }
   }
   ```
   
   It also obviates the need for the `exactlyOnce` and `offsetFlushTimeoutMs` 
fields.
   
   If this looks acceptable, I think the only question left is whether 
out-of-order writes are possible because of how things are chained. I believe 
this is only a problem for non-exactly-once source tasks (since we only have at 
most one in-flight offset commit at a time when exactly-once support is 
enabled), and should be handled gracefully by 
`OffsetStorageWriter::cancelFlush`, but it'd be nice to have a second pair of 
eyes to make sure.


-- 
This is an automated 

Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]

2024-01-24 Thread via GitHub


vamossagar12 commented on PR #13801:
URL: https://github.com/apache/kafka/pull/13801#issuecomment-1908434176

   @C0urante , I did some analysis today pertaining to the version that was 
present with `CompletableFuture` 
[here](https://github.com/apache/kafka/blob/062591757a04647eb4837348f59b0e5736b6372f/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java#L312-L330).
   
   I wrote a small program to mimic the behaviour, i.e. returning a future from a method, with possibly blocking callback methods chained onto it:
   
   ```
   package org.example;
   
   import java.time.Duration;
   import java.util.concurrent.*;
   
   public class CompletableFutureTest {
   
   private static Future<Void> getFuture() {
   CompletableFuture<Void> overallFuture = CompletableFuture.completedFuture(null);
   overallFuture = overallFuture.thenRun(() -> {
   try {
   System.out.println("Function 1 - Thread::" + 
Thread.currentThread());
   long initTime = System.currentTimeMillis();
   System.out.println("Function 1 - Initial Time::" + initTime);
   Thread.sleep(Duration.ofSeconds(30).toMillis());
   System.out.println("Function 1 - Time Elapsed::" + 
(System.currentTimeMillis() - initTime) / 1000);
   } catch (InterruptedException e) {
   throw new RuntimeException(e);
   }
   });
   overallFuture = overallFuture.thenRun(() -> {
   long initTime = System.currentTimeMillis();
   System.out.println("Function 2 - Thread::" + 
Thread.currentThread());
   System.out.println("Function 2 - Initial Time::" + initTime);
   try {
   Thread.sleep(Duration.ofSeconds(40).toMillis());
   } catch (InterruptedException e) {
   throw new RuntimeException(e);
   }
   System.out.println("Function 2 - Time Elapsed::" + 
(System.currentTimeMillis() - initTime) / 1000);
   });
   return overallFuture;
   }
   
   public static void main(String[] args) {
   System.out.println("Begin main");
   System.out.println("Main - Thread::" + Thread.currentThread());
   Future<Void> future = getFuture();
   try {
   long initTime = System.currentTimeMillis();
   future.get();
   System.out.println("Main - Time Elapsed::" + 
(System.currentTimeMillis() - initTime) / 1000);
   System.out.println("End of main");
   } catch (InterruptedException e) {
   throw new RuntimeException(e);
   } catch (ExecutionException e) {
   throw new RuntimeException(e.getCause());
   }
   }
   }
   ```
   and this is the output I get
   
   ```
   Begin main
   Main - Thread::Thread[main,5,main]
   Function 1 - Thread::Thread[main,5,main]
   Function 1 - Initial Time::1706111085236
   Function 1 - Time Elapsed::30
   Function 2 - Thread::Thread[main,5,main]
   Function 2 - Initial Time::170615248
   Function 2 - Time Elapsed::40
   Main - Time Elapsed::0
   End of main
   ```
   
   So, essentially the callbacks get executed as soon as the `getFuture()` 
method is invoked and by the time we get to the `get()` call in main, the 
future is already resolved and it returns immediately (as you pointed out).
   
   I get a very similar result when I change the definition of `overallFuture` 
to 
   
   ```
   CompletableFuture<Void> overallFuture = CompletableFuture.supplyAsync(() -> null);
   ```
   
   However, if I change the invocations of callback to use `thenRunAsync`, then 
I get totally different results (which is what we expect):
   
   ```
   Begin main
   Main - Thread::Thread[main,5,main]
   Function 1 - Thread::Thread[ForkJoinPool.commonPool-worker-1,5,main]
   Function 1 - Initial Time::1706111422731
   Function 1 - Time Elapsed::30
   Function 2 - Thread::Thread[ForkJoinPool.commonPool-worker-1,5,main]
   Function 2 - Initial Time::1706111452740
   Function 2 - Time Elapsed::40
   Main - Time Elapsed::70
   End of main
   ```
   
   i.e. this time, the `getFuture` method returns immediately and the lambdas 
get invoked when `get` is invoked on the returned future. I think the reason 
for that is that the methods in `CompletableFuture` are executed immediately 
upon invocation. It's only the `xxxAsync` variants that execute on the common 
fork join pool (can be seen in the above printed thread) or if needed on a 
separate thread pool. This behaviour is similar to other reactive libraries 
like `rx-java` IIRC. I believe the offset write happens on the same thread 
which invokes the `set()` method and we would want to continue that behaviour, 
so I am not sure we would want to use the `xxxAsync` variants. I even tried 
using `KafkaFutureImpl` which is Kafka's internal implementation for async 
programming, but I get 

Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]

2024-01-23 Thread via GitHub


vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1463150319


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,15 +290,54 @@ public Future set(Map 
values, Callback callb
 throw new IllegalStateException("At least one non-null offset 
store must be provided");
 }
 
-return primaryStore.set(values, (primaryWriteError, ignored) -> {
+List partitionsWithTombstoneOffsets = 
values.entrySet().stream()
+.filter(offsetEntry -> offsetEntry.getValue() == null)
+.map(Map.Entry::getKey).collect(Collectors.toList());
+
+Map tombstoneOffsets = new HashMap<>();
+for (ByteBuffer partition : partitionsWithTombstoneOffsets) {
+tombstoneOffsets.put(partition, null);
+}
+Map regularOffsets = values.entrySet().stream()
+.filter(offsetEntry -> offsetEntry.getValue() != null)
+.collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+// If the supplied offsets contain tombstone values, then tombstone 
offsets are extracted first,
+// written to the secondary store in a synchronous manner and then to 
the primary store.
+// This is because, if a tombstone offset is successfully written to 
the per-connector offsets topic,
+// but cannot be written to the global offsets topic, then the global 
offsets topic will still contain that
+// source offset, but the per-connector topic will not. Due to the 
fallback-on-global logic used by the worker,
+// if a task requests offsets for one of the tombstoned partitions, 
the worker will provide it with the
+// offsets present in the global offsets topic, instead of indicating 
to the task that no offsets can be found.
+CompletableFuture offsetWriteFuture = 
CompletableFuture.completedFuture(null);
+if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
+offsetWriteFuture.thenAccept(v -> {

Review Comment:
   ~~I have actually removed the chaining using `CompletableFuture` and simplified the logic. I just wait on the secondary store write directly (with or without a timeout), and if the execution fails or the wait itself fails, I update the callback and return the same exception (because it's already completed). With this, there are no changes (apart from using `regularOffsets` when writing to the secondary store) for the primary store. Let me know what you think about this.~~
   
   I re-read the comments and it looks like with this we are going against the approach you wanted in this https://github.com/apache/kafka/pull/13801/#discussion_r1268520271, i.e.
   
   > That said, I don't love how we've made this method synchronously await the 
write to the secondary store. We should return a Future to the caller that 
corresponds to all of the offset flushes that we'd need to block on for an 
offset commit (i.e., the existing flush that we're performing, possibly 
preceded by a preemptive flush of tombstones to the secondary store).
   
   
   Let me take a look at this again. Sorry about this since I had forgotten 
about this comment.
   






Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]

2024-01-23 Thread via GitHub


vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1463150319


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,15 +290,54 @@ public Future set(Map 
values, Callback callb
 throw new IllegalStateException("At least one non-null offset 
store must be provided");
 }
 
-return primaryStore.set(values, (primaryWriteError, ignored) -> {
+List partitionsWithTombstoneOffsets = 
values.entrySet().stream()
+.filter(offsetEntry -> offsetEntry.getValue() == null)
+.map(Map.Entry::getKey).collect(Collectors.toList());
+
+Map tombstoneOffsets = new HashMap<>();
+for (ByteBuffer partition : partitionsWithTombstoneOffsets) {
+tombstoneOffsets.put(partition, null);
+}
+Map regularOffsets = values.entrySet().stream()
+.filter(offsetEntry -> offsetEntry.getValue() != null)
+.collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+// If the supplied offsets contain tombstone values, then tombstone 
offsets are extracted first,
+// written to the secondary store in a synchronous manner and then to 
the primary store.
+// This is because, if a tombstone offset is successfully written to 
the per-connector offsets topic,
+// but cannot be written to the global offsets topic, then the global 
offsets topic will still contain that
+// source offset, but the per-connector topic will not. Due to the 
fallback-on-global logic used by the worker,
+// if a task requests offsets for one of the tombstoned partitions, 
the worker will provide it with the
+// offsets present in the global offsets topic, instead of indicating 
to the task that no offsets can be found.
+CompletableFuture offsetWriteFuture = 
CompletableFuture.completedFuture(null);
+if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
+offsetWriteFuture.thenAccept(v -> {

Review Comment:
   ~~I have actually removed the chaining using `CompletableFuture` and simplified the logic. I just wait on the secondary store write directly (with or without a timeout), and if the execution fails or the wait itself fails, I update the callback and return the same exception (because it's already completed). With this, there are no changes (apart from using `regularOffsets` when writing to the secondary store) for the primary store. Let me know what you think about this.~~
   
   I re-read the comments and it looks like with this we are going against the approach you wanted in this https://github.com/apache/kafka/pull/13801/#discussion_r1268520271, i.e.
   
   > That said, I don't love how we've made this method synchronously await the write to the secondary store. We should return a Future to the caller that corresponds to all of the offset flushes that we'd need to block on for an offset commit (i.e., the existing flush that we're performing, possibly preceded by a preemptive flush of tombstones to the secondary store).
   
   Let me take a look at this again. Sorry about this since I had forgotten 
about this comment.
   



##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,15 +290,54 @@ public Future set(Map 
values, Callback callb
 throw new IllegalStateException("At least one non-null offset 
store must be provided");
 }
 
-return primaryStore.set(values, (primaryWriteError, ignored) -> {
+List partitionsWithTombstoneOffsets = 
values.entrySet().stream()
+.filter(offsetEntry -> offsetEntry.getValue() == null)
+.map(Map.Entry::getKey).collect(Collectors.toList());
+
+Map tombstoneOffsets = new HashMap<>();
+for (ByteBuffer partition : partitionsWithTombstoneOffsets) {
+tombstoneOffsets.put(partition, null);
+}
+Map regularOffsets = values.entrySet().stream()
+.filter(offsetEntry -> offsetEntry.getValue() != null)
+.collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+// If the supplied offsets contain tombstone values, then tombstone 
offsets are extracted first,
+// written to the secondary store in a synchronous manner and then to 
the primary store.
+// This is because, if a tombstone offset is successfully written to 
the per-connector offsets topic,
+// but cannot be written to the global offsets topic, then the global 
offsets topic will still contain that
+// source offset, but the per-connector topic will not. Due to the 
fallback-on-global logic used by the worker,
+// if a task requests offsets for one of the tombstoned partitions, 
the worker will provide it with the
+// offsets present in the global 

Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]

2024-01-23 Thread via GitHub


vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1463158103


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,15 +290,54 @@ public Future set(Map 
values, Callback callb
 throw new IllegalStateException("At least one non-null offset 
store must be provided");
 }
 
-return primaryStore.set(values, (primaryWriteError, ignored) -> {
+List partitionsWithTombstoneOffsets = 
values.entrySet().stream()
+.filter(offsetEntry -> offsetEntry.getValue() == null)
+.map(Map.Entry::getKey).collect(Collectors.toList());
+
+Map tombstoneOffsets = new HashMap<>();
+for (ByteBuffer partition : partitionsWithTombstoneOffsets) {
+tombstoneOffsets.put(partition, null);
+}
+Map regularOffsets = values.entrySet().stream()
+.filter(offsetEntry -> offsetEntry.getValue() != null)
+.collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+// If the supplied offsets contain tombstone values, then tombstone 
offsets are extracted first,
+// written to the secondary store in a synchronous manner and then to 
the primary store.
+// This is because, if a tombstone offset is successfully written to 
the per-connector offsets topic,
+// but cannot be written to the global offsets topic, then the global 
offsets topic will still contain that
+// source offset, but the per-connector topic will not. Due to the 
fallback-on-global logic used by the worker,
+// if a task requests offsets for one of the tombstoned partitions, 
the worker will provide it with the
+// offsets present in the global offsets topic, instead of indicating 
to the task that no offsets can be found.
+CompletableFuture offsetWriteFuture = 
CompletableFuture.completedFuture(null);
+if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
+offsetWriteFuture.thenAccept(v -> {
+Future secondaryWriteFuture = 
secondaryStore.set(tombstoneOffsets, new FutureCallback<>());
+try {
+if (exactlyOnce) {
+secondaryWriteFuture.get();
+} else {
+secondaryWriteFuture.get(offsetFlushTimeoutMs, 
TimeUnit.MILLISECONDS);
+}
+} catch (ExecutionException e) {
+log.error("{} Flush of tombstone(s) offsets to secondary 
store threw an unexpected exception: ", this, e.getCause());
+callback.onCompletion(e.getCause(), null);
+} catch (Exception e) {
+log.error("{} Got Exception when trying to flush 
tombstone(s) offsets to secondary store", this, e);
+callback.onCompletion(e, null);
+}
+});
+}
+offsetWriteFuture.thenAccept(v -> primaryStore.set(values, new 
FutureCallback<>((primaryWriteError, ignored) -> {

Review Comment:
   I have not added this test at this point and I can add it if you think it's 
still needed with the new approach.






Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]

2024-01-23 Thread via GitHub


vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1463165311


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,15 +290,54 @@ public Future set(Map 
values, Callback callb
 throw new IllegalStateException("At least one non-null offset 
store must be provided");
 }
 
-return primaryStore.set(values, (primaryWriteError, ignored) -> {
+List partitionsWithTombstoneOffsets = 
values.entrySet().stream()
+.filter(offsetEntry -> offsetEntry.getValue() == null)
+.map(Map.Entry::getKey).collect(Collectors.toList());
+
+Map tombstoneOffsets = new HashMap<>();
+for (ByteBuffer partition : partitionsWithTombstoneOffsets) {
+tombstoneOffsets.put(partition, null);
+}
+Map regularOffsets = values.entrySet().stream()
+.filter(offsetEntry -> offsetEntry.getValue() != null)
+.collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+// If the supplied offsets contain tombstone values, then tombstone 
offsets are extracted first,
+// written to the secondary store in a synchronous manner and then to 
the primary store.
+// This is because, if a tombstone offset is successfully written to 
the per-connector offsets topic,
+// but cannot be written to the global offsets topic, then the global 
offsets topic will still contain that
+// source offset, but the per-connector topic will not. Due to the 
fallback-on-global logic used by the worker,
+// if a task requests offsets for one of the tombstoned partitions, 
the worker will provide it with the
+// offsets present in the global offsets topic, instead of indicating 
to the task that no offsets can be found.
+CompletableFuture offsetWriteFuture = 
CompletableFuture.completedFuture(null);
+if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
+offsetWriteFuture.thenAccept(v -> {

Review Comment:
   I re-read the comments and it looks like with this we are going against the approach you wanted in this [comment](https://github.com/apache/kafka/pull/13801/#discussion_r1268520271), i.e.
   
   > That said, I don't love how we've made this method synchronously await the 
write to the secondary store. We should return a Future to the caller that 
corresponds to all of the offset flushes that we'd need to block on for an 
offset commit (i.e., the existing flush that we're performing, possibly 
preceded by a preemptive flush of tombstones to the secondary store).
   
   Let me take a look at this again. Sorry about this since I had forgotten 
about this comment.






Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]

2024-01-23 Thread via GitHub


vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1463165311


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,15 +290,54 @@ public Future set(Map 
values, Callback callb
 throw new IllegalStateException("At least one non-null offset 
store must be provided");
 }
 
-return primaryStore.set(values, (primaryWriteError, ignored) -> {
+List partitionsWithTombstoneOffsets = 
values.entrySet().stream()
+.filter(offsetEntry -> offsetEntry.getValue() == null)
+.map(Map.Entry::getKey).collect(Collectors.toList());
+
+Map tombstoneOffsets = new HashMap<>();
+for (ByteBuffer partition : partitionsWithTombstoneOffsets) {
+tombstoneOffsets.put(partition, null);
+}
+Map regularOffsets = values.entrySet().stream()
+.filter(offsetEntry -> offsetEntry.getValue() != null)
+.collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+// If the supplied offsets contain tombstone values, then tombstone 
offsets are extracted first,
+// written to the secondary store in a synchronous manner and then to 
the primary store.
+// This is because, if a tombstone offset is successfully written to 
the per-connector offsets topic,
+// but cannot be written to the global offsets topic, then the global 
offsets topic will still contain that
+// source offset, but the per-connector topic will not. Due to the 
fallback-on-global logic used by the worker,
+// if a task requests offsets for one of the tombstoned partitions, 
the worker will provide it with the
+// offsets present in the global offsets topic, instead of indicating 
to the task that no offsets can be found.
+CompletableFuture offsetWriteFuture = 
CompletableFuture.completedFuture(null);
+if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
+offsetWriteFuture.thenAccept(v -> {

Review Comment:
   I re-read the comments again and it looks like with this we are going against the approach you wanted in this [comment](https://github.com/apache/kafka/pull/13801/#discussion_r1268520271), i.e.
   
   > That said, I don't love how we've made this method synchronously await the 
write to the secondary store. We should return a Future to the caller that 
corresponds to all of the offset flushes that we'd need to block on for an 
offset commit (i.e., the existing flush that we're performing, possibly 
preceded by a preemptive flush of tombstones to the secondary store).
   
   Let me take a look at this again. Sorry about this since I had forgotten 
about this comment.






Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]

2024-01-23 Thread via GitHub


vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1463165311


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,15 +290,54 @@ public Future set(Map 
values, Callback callb
 throw new IllegalStateException("At least one non-null offset 
store must be provided");
 }
 
-return primaryStore.set(values, (primaryWriteError, ignored) -> {
+List partitionsWithTombstoneOffsets = 
values.entrySet().stream()
+.filter(offsetEntry -> offsetEntry.getValue() == null)
+.map(Map.Entry::getKey).collect(Collectors.toList());
+
+Map tombstoneOffsets = new HashMap<>();
+for (ByteBuffer partition : partitionsWithTombstoneOffsets) {
+tombstoneOffsets.put(partition, null);
+}
+Map regularOffsets = values.entrySet().stream()
+.filter(offsetEntry -> offsetEntry.getValue() != null)
+.collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+// If the supplied offsets contain tombstone values, then tombstone 
offsets are extracted first,
+// written to the secondary store in a synchronous manner and then to 
the primary store.
+// This is because, if a tombstone offset is successfully written to 
the per-connector offsets topic,
+// but cannot be written to the global offsets topic, then the global 
offsets topic will still contain that
+// source offset, but the per-connector topic will not. Due to the 
fallback-on-global logic used by the worker,
+// if a task requests offsets for one of the tombstoned partitions, 
the worker will provide it with the
+// offsets present in the global offsets topic, instead of indicating 
to the task that no offsets can be found.
+CompletableFuture offsetWriteFuture = 
CompletableFuture.completedFuture(null);
+if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
+offsetWriteFuture.thenAccept(v -> {

Review Comment:
   I re-read the comments again and it looks like with this we are going against the approach you wanted in this [comment](https://github.com/apache/kafka/pull/13801/#discussion_r1268520271), i.e.
   
   > That said, I don't love how we've made this method synchronously await the 
write to the secondary store. We should return a Future to the caller that 
corresponds to all of the offset flushes that we'd need to block on for an 
offset commit (i.e., the existing flush that we're performing, possibly 
preceded by a preemptive flush of tombstones to the secondary store).
   
   Let me take a look at this again. Sorry about this but I had forgotten about 
this comment in particular.






Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]

2024-01-23 Thread via GitHub


vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1463158347


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -299,12 +349,11 @@ public Future set(Map 
values, Callback callb
 } catch (Exception e) {
 log.warn("Failed to write offsets to secondary backing 
store", e);
 }
+callback.onCompletion(null, null);
 }
 }
-try (LoggingContext context = loggingContext()) {
-callback.onCompletion(primaryWriteError, ignored);
-}
-});

Review Comment:
   Added it back. 






Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]

2024-01-23 Thread via GitHub


vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1463158103


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,15 +290,54 @@ public Future set(Map 
values, Callback callb
 throw new IllegalStateException("At least one non-null offset 
store must be provided");
 }
 
-return primaryStore.set(values, (primaryWriteError, ignored) -> {
+List partitionsWithTombstoneOffsets = 
values.entrySet().stream()
+.filter(offsetEntry -> offsetEntry.getValue() == null)
+.map(Map.Entry::getKey).collect(Collectors.toList());
+
+Map tombstoneOffsets = new HashMap<>();
+for (ByteBuffer partition : partitionsWithTombstoneOffsets) {
+tombstoneOffsets.put(partition, null);
+}
+Map regularOffsets = values.entrySet().stream()
+.filter(offsetEntry -> offsetEntry.getValue() != null)
+.collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+// If the supplied offsets contain tombstone values, then tombstone 
offsets are extracted first,
+// written to the secondary store in a synchronous manner and then to 
the primary store.
+// This is because, if a tombstone offset is successfully written to 
the per-connector offsets topic,
+// but cannot be written to the global offsets topic, then the global 
offsets topic will still contain that
+// source offset, but the per-connector topic will not. Due to the 
fallback-on-global logic used by the worker,
+// if a task requests offsets for one of the tombstoned partitions, 
the worker will provide it with the
+// offsets present in the global offsets topic, instead of indicating 
to the task that no offsets can be found.
+CompletableFuture offsetWriteFuture = 
CompletableFuture.completedFuture(null);
+if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
+offsetWriteFuture.thenAccept(v -> {
+Future secondaryWriteFuture = 
secondaryStore.set(tombstoneOffsets, new FutureCallback<>());
+try {
+if (exactlyOnce) {
+secondaryWriteFuture.get();
+} else {
+secondaryWriteFuture.get(offsetFlushTimeoutMs, 
TimeUnit.MILLISECONDS);
+}
+} catch (ExecutionException e) {
+log.error("{} Flush of tombstone(s) offsets to secondary 
store threw an unexpected exception: ", this, e.getCause());
+callback.onCompletion(e.getCause(), null);
+} catch (Exception e) {
+log.error("{} Got Exception when trying to flush 
tombstone(s) offsets to secondary store", this, e);
+callback.onCompletion(e, null);
+}
+});
+}
+offsetWriteFuture.thenAccept(v -> primaryStore.set(values, new 
FutureCallback<>((primaryWriteError, ignored) -> {

Review Comment:
   I have not added this test at this point; I can add it if you think it's still needed with the new approach.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]

2024-01-23 Thread via GitHub


vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1463150319


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,15 +290,54 @@ public Future set(Map 
values, Callback callb
 throw new IllegalStateException("At least one non-null offset 
store must be provided");
 }
 
-return primaryStore.set(values, (primaryWriteError, ignored) -> {
+List partitionsWithTombstoneOffsets = 
values.entrySet().stream()
+.filter(offsetEntry -> offsetEntry.getValue() == null)
+.map(Map.Entry::getKey).collect(Collectors.toList());
+
+Map tombstoneOffsets = new HashMap<>();
+for (ByteBuffer partition : partitionsWithTombstoneOffsets) {
+tombstoneOffsets.put(partition, null);
+}
+Map regularOffsets = values.entrySet().stream()
+.filter(offsetEntry -> offsetEntry.getValue() != null)
+.collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+// If the supplied offsets contain tombstone values, then tombstone 
offsets are extracted first,
+// written to the secondary store in a synchronous manner and then to 
the primary store.
+// This is because, if a tombstone offset is successfully written to 
the per-connector offsets topic,
+// but cannot be written to the global offsets topic, then the global 
offsets topic will still contain that
+// source offset, but the per-connector topic will not. Due to the 
fallback-on-global logic used by the worker,
+// if a task requests offsets for one of the tombstoned partitions, 
the worker will provide it with the
+// offsets present in the global offsets topic, instead of indicating 
to the task that no offsets can be found.
+CompletableFuture offsetWriteFuture = 
CompletableFuture.completedFuture(null);
+if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
+offsetWriteFuture.thenAccept(v -> {

Review Comment:
   I have actually removed the chaining using `CompletableFuture` and simplified the logic. I just wait on the secondary store write directly (with or without a timeout), and if the execution fails or the wait itself fails, I complete the callback with the exception and return the same future (since it has already completed).
   
   ```
   if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
       Future<Void> secondaryWriteFuture = secondaryStore.set(tombstoneOffsets, (t, r) -> { });
       try {
           if (exactlyOnce) {
               secondaryWriteFuture.get();
           } else {
               secondaryWriteFuture.get(offsetFlushTimeoutMs, TimeUnit.MILLISECONDS);
           }
           log.debug("Successfully flushed tombstone offsets to secondary store");
       } catch (ExecutionException e) {
           log.error("{} Failed to flush tombstone offsets to secondary store", this, e.getCause());
           callback.onCompletion(e.getCause(), null);
           return secondaryWriteFuture;
       } catch (Throwable e) {
           log.error("{} Failed to flush tombstone offsets to secondary store", this, e);
           callback.onCompletion(e, null);
           return secondaryWriteFuture;
       }
   }
   ```
   
   With this, the flow from the primary store write onwards is unchanged (apart from using `regularOffsets` when writing to the secondary store). Let me know what you think about this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]

2024-01-23 Thread via GitHub


vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1463144968


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,15 +290,54 @@ public Future set(Map 
values, Callback callb
 throw new IllegalStateException("At least one non-null offset 
store must be provided");
 }
 
-return primaryStore.set(values, (primaryWriteError, ignored) -> {
+List partitionsWithTombstoneOffsets = 
values.entrySet().stream()
+.filter(offsetEntry -> offsetEntry.getValue() == null)
+.map(Map.Entry::getKey).collect(Collectors.toList());
+
+Map tombstoneOffsets = new HashMap<>();
+for (ByteBuffer partition : partitionsWithTombstoneOffsets) {
+tombstoneOffsets.put(partition, null);
+}
+Map regularOffsets = values.entrySet().stream()
+.filter(offsetEntry -> offsetEntry.getValue() != null)
+.collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+// If the supplied offsets contain tombstone values, then tombstone 
offsets are extracted first,
+// written to the secondary store in a synchronous manner and then to 
the primary store.
+// This is because, if a tombstone offset is successfully written to 
the per-connector offsets topic,
+// but cannot be written to the global offsets topic, then the global 
offsets topic will still contain that
+// source offset, but the per-connector topic will not. Due to the 
fallback-on-global logic used by the worker,
+// if a task requests offsets for one of the tombstoned partitions, 
the worker will provide it with the
+// offsets present in the global offsets topic, instead of indicating 
to the task that no offsets can be found.
+CompletableFuture offsetWriteFuture = 
CompletableFuture.completedFuture(null);
+if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
+offsetWriteFuture.thenAccept(v -> {
+Future secondaryWriteFuture = 
secondaryStore.set(tombstoneOffsets, new FutureCallback<>());

Review Comment:
   I took the non-fancy route for now, i.e. using an inline no-op `Callback`.
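
   In other words, roughly this shape (sketch only, reusing the names from the PR's `set` method): the lambda deliberately ignores both the error and the result, and failures are surfaced by waiting on the returned future instead.

   ```java
   Future<Void> secondaryWriteFuture = secondaryStore.set(tombstoneOffsets, (error, result) -> { });
   ```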



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]

2024-01-23 Thread via GitHub


vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1463143759


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,15 +290,54 @@ public Future set(Map 
values, Callback callb
 throw new IllegalStateException("At least one non-null offset 
store must be provided");
 }
 
-return primaryStore.set(values, (primaryWriteError, ignored) -> {
+List partitionsWithTombstoneOffsets = 
values.entrySet().stream()
+.filter(offsetEntry -> offsetEntry.getValue() == null)
+.map(Map.Entry::getKey).collect(Collectors.toList());
+
+Map tombstoneOffsets = new HashMap<>();
+for (ByteBuffer partition : partitionsWithTombstoneOffsets) {
+tombstoneOffsets.put(partition, null);
+}
+Map regularOffsets = values.entrySet().stream()
+.filter(offsetEntry -> offsetEntry.getValue() != null)
+.collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));

Review Comment:
   Thanks for this. I updated the logic.
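
   For reference, one single-pass way of doing the split (illustrative only, not the exact code in the PR):

   ```java
   import java.nio.ByteBuffer;
   import java.util.HashMap;
   import java.util.Map;

   public class OffsetSplitSketch {
       // Partition the supplied offsets into tombstones (null values) and regular offsets in one pass.
       static void split(Map<ByteBuffer, ByteBuffer> values,
                         Map<ByteBuffer, ByteBuffer> tombstoneOffsets,
                         Map<ByteBuffer, ByteBuffer> regularOffsets) {
           values.forEach((partition, offset) -> {
               if (offset == null) {
                   tombstoneOffsets.put(partition, null);
               } else {
                   regularOffsets.put(partition, offset);
               }
           });
       }

       public static void main(String[] args) {
           Map<ByteBuffer, ByteBuffer> values = new HashMap<>();
           values.put(ByteBuffer.wrap(new byte[]{1}), null);                          // tombstone
           values.put(ByteBuffer.wrap(new byte[]{2}), ByteBuffer.wrap(new byte[]{3})); // regular
           Map<ByteBuffer, ByteBuffer> tombstones = new HashMap<>();
           Map<ByteBuffer, ByteBuffer> regular = new HashMap<>();
           split(values, tombstones, regular);
           System.out.println("tombstones=" + tombstones.size() + ", regular=" + regular.size());
       }
   }
   ```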



##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -254,7 +260,12 @@ public Map get(long timeout, 
TimeUnit unit) throws Inter
  * write to that store, and the passed-in {@link Callback} is invoked once 
that write completes. If a worker-global
  * store is provided, a secondary write is made to that store if the write 
to the connector-specific store
  * succeeds. Errors with this secondary write are not reflected in the 
returned {@link Future} or the passed-in
- * {@link Callback}; they are only logged as a warning to users.
+ * {@link Callback}; they are only logged as a warning to users. The only 
exception to this rule is when the offsets
+ * that need to be committed contains tombstone records as well. In such 
cases, a write consisting of only tombstone
+ * offsets would first happen on the worker-global store and only if it 
succeeds, would all the offsets be written
+ * to the connector-specific store and the regular offsets would be 
written to the worker-global store. Note that
+ * in this case, failure to write regular offsets to secondary store would 
still not reflect in the returned
+ * {@link Future} or the passed-in {@link Callback}

Review Comment:
   Added.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]

2023-12-09 Thread via GitHub


vamossagar12 commented on PR #13801:
URL: https://github.com/apache/kafka/pull/13801#issuecomment-1848322802

   > Thanks @vamossagar12, I've given the functional changes another pass. 
Unless I'm mistaken about the semantics for `CompletableFuture::thenAccept`, 
there's a pretty serious bug in the current implementation that needs to be 
fixed before merging, and we should also make sure to add testing coverage to 
both verify that we've fixed that bug now and won't re-introduce it later.
   
   Thanks @C0urante. When you say serious bug, I am assuming you are referring to this comment [here](https://github.com/apache/kafka/pull/13801#discussion_r1417751374)? That is, the outer `get` doesn't block on the lambda's execution? Ok, let me verify that behaviour again and get back to you.
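
   A quick standalone way to check it (plain `CompletableFuture`, nothing Connect-specific): waiting on the original future does not wait for the stage attached via `thenAccept`; only the future returned by `thenAccept` does.

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.TimeUnit;

   public class ThenAcceptCheck {
       public static void main(String[] args) throws Exception {
           // Source future completes after a short delay.
           CompletableFuture<Void> source = CompletableFuture.runAsync(() -> sleep(200));

           // The dependent stage sleeps much longer; its completion is tracked by the
           // future RETURNED by thenAccept, not by "source" itself.
           CompletableFuture<Void> dependent = source.thenAccept(v -> sleep(2000));

           long start = System.nanoTime();
           source.get();       // returns after ~200 ms, without waiting for the lambda
           long afterSource = elapsedMs(start);
           dependent.get();    // waits for the extra ~2000 ms of the dependent stage
           long afterDependent = elapsedMs(start);

           System.out.println("source.get() returned after ~" + afterSource + " ms");
           System.out.println("dependent.get() returned after ~" + afterDependent + " ms");
       }

       private static void sleep(long ms) {
           try {
               TimeUnit.MILLISECONDS.sleep(ms);
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
           }
       }

       private static long elapsedMs(long startNanos) {
           return (System.nanoTime() - startNanos) / 1_000_000;
       }
   }
   ```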


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]

2023-12-06 Thread via GitHub


C0urante commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1417751374


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,15 +290,54 @@ public Future set(Map 
values, Callback callb
 throw new IllegalStateException("At least one non-null offset 
store must be provided");
 }
 
-return primaryStore.set(values, (primaryWriteError, ignored) -> {
+List partitionsWithTombstoneOffsets = 
values.entrySet().stream()
+.filter(offsetEntry -> offsetEntry.getValue() == null)
+.map(Map.Entry::getKey).collect(Collectors.toList());
+
+Map tombstoneOffsets = new HashMap<>();
+for (ByteBuffer partition : partitionsWithTombstoneOffsets) {
+tombstoneOffsets.put(partition, null);
+}
+Map regularOffsets = values.entrySet().stream()
+.filter(offsetEntry -> offsetEntry.getValue() != null)
+.collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+// If the supplied offsets contain tombstone values, then tombstone 
offsets are extracted first,
+// written to the secondary store in a synchronous manner and then to 
the primary store.
+// This is because, if a tombstone offset is successfully written to 
the per-connector offsets topic,
+// but cannot be written to the global offsets topic, then the global 
offsets topic will still contain that
+// source offset, but the per-connector topic will not. Due to the 
fallback-on-global logic used by the worker,
+// if a task requests offsets for one of the tombstoned partitions, 
the worker will provide it with the
+// offsets present in the global offsets topic, instead of indicating 
to the task that no offsets can be found.
+CompletableFuture offsetWriteFuture = 
CompletableFuture.completedFuture(null);
+if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
+offsetWriteFuture.thenAccept(v -> {

Review Comment:
   This causes the passed-in lambda to be executed after `offsetWriteFuture` 
completes, but it doesn't cause `offsetWriteFuture::get` to block on the 
completion of that lambda, which means that if the caller of 
`ConnectorOffsetBackingStore::set` waits for the returned `Future` to complete 
right now, it'll complete immediately even though it's likely no offsets will 
have been written to either the primary or secondary store.
   
   Also, nit: when researching this behavior, I ran into 
`CompletableFuture::thenRun`, which is a slightly better fit than 
`CompletableFuture::thenAccept` (since we don't use the return value).
   
   ```suggestion
   offsetWriteFuture = offsetWriteFuture.thenRun(() -> {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]

2023-12-06 Thread via GitHub


C0urante commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1417677509


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,15 +290,54 @@ public Future set(Map 
values, Callback callb
 throw new IllegalStateException("At least one non-null offset 
store must be provided");
 }
 
-return primaryStore.set(values, (primaryWriteError, ignored) -> {
+List partitionsWithTombstoneOffsets = 
values.entrySet().stream()
+.filter(offsetEntry -> offsetEntry.getValue() == null)
+.map(Map.Entry::getKey).collect(Collectors.toList());
+
+Map tombstoneOffsets = new HashMap<>();
+for (ByteBuffer partition : partitionsWithTombstoneOffsets) {
+tombstoneOffsets.put(partition, null);
+}
+Map regularOffsets = values.entrySet().stream()
+.filter(offsetEntry -> offsetEntry.getValue() != null)
+.collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+// If the supplied offsets contain tombstone values, then tombstone 
offsets are extracted first,
+// written to the secondary store in a synchronous manner and then to 
the primary store.
+// This is because, if a tombstone offset is successfully written to 
the per-connector offsets topic,
+// but cannot be written to the global offsets topic, then the global 
offsets topic will still contain that
+// source offset, but the per-connector topic will not. Due to the 
fallback-on-global logic used by the worker,
+// if a task requests offsets for one of the tombstoned partitions, 
the worker will provide it with the
+// offsets present in the global offsets topic, instead of indicating 
to the task that no offsets can be found.

Review Comment:
   This largely duplicates information present in the Javadoc for the method. 
IMO we can remove this entire comment block and, if we really want to make it 
easy for maintainers to understand not just the "what" but the "why" for this 
logic, we can link to https://issues.apache.org/jira/browse/KAFKA-15018 in the 
Javadoc for the method:
   
   ```java
*
* @see <a href="https://issues.apache.org/jira/browse/KAFKA-15018">KAFKA-15018</a> for context on the three-step
* write sequence
   ```



##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,15 +290,54 @@ public Future set(Map 
values, Callback callb
 throw new IllegalStateException("At least one non-null offset 
store must be provided");
 }
 
-return primaryStore.set(values, (primaryWriteError, ignored) -> {
+List partitionsWithTombstoneOffsets = 
values.entrySet().stream()
+.filter(offsetEntry -> offsetEntry.getValue() == null)
+.map(Map.Entry::getKey).collect(Collectors.toList());
+
+Map tombstoneOffsets = new HashMap<>();
+for (ByteBuffer partition : partitionsWithTombstoneOffsets) {
+tombstoneOffsets.put(partition, null);
+}
+Map regularOffsets = values.entrySet().stream()
+.filter(offsetEntry -> offsetEntry.getValue() != null)
+.collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+// If the supplied offsets contain tombstone values, then tombstone 
offsets are extracted first,
+// written to the secondary store in a synchronous manner and then to 
the primary store.
+// This is because, if a tombstone offset is successfully written to 
the per-connector offsets topic,
+// but cannot be written to the global offsets topic, then the global 
offsets topic will still contain that
+// source offset, but the per-connector topic will not. Due to the 
fallback-on-global logic used by the worker,
+// if a task requests offsets for one of the tombstoned partitions, 
the worker will provide it with the
+// offsets present in the global offsets topic, instead of indicating 
to the task that no offsets can be found.
+CompletableFuture offsetWriteFuture = 
CompletableFuture.completedFuture(null);
+if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
+offsetWriteFuture.thenAccept(v -> {
+Future secondaryWriteFuture = 
secondaryStore.set(tombstoneOffsets, new FutureCallback<>());
+try {
+if (exactlyOnce) {
+secondaryWriteFuture.get();
+} else {
+secondaryWriteFuture.get(offsetFlushTimeoutMs, 
TimeUnit.MILLISECONDS);
+}
+} catch (ExecutionException e) {
+log.error("{} Flush of tombstone(s) offsets to secondary 
store threw an unexpected exception: ", this, e.getCause());

Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]

2023-12-05 Thread via GitHub


C0urante commented on PR #13801:
URL: https://github.com/apache/kafka/pull/13801#issuecomment-1842053883

   Removed the stale label.
   
   @vamossagar12 thanks for your patience on this one, I plan on making another 
pass this week!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]

2023-12-05 Thread via GitHub


github-actions[bot] commented on PR #13801:
URL: https://github.com/apache/kafka/pull/13801#issuecomment-1842029141

   This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org