Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]
C0urante merged PR #13801: URL: https://github.com/apache/kafka/pull/13801 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]
vamossagar12 commented on PR #13801:
URL: https://github.com/apache/kafka/pull/13801#issuecomment-2092565347

Thanks @C0urante, good catch, yeah those were missing. I have modified some of the tests to cover all 3 types of offset records. I also added another test for the case where the write to the secondary store times out for regular offsets. Let me know if these look OK coverage-wise.
Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]
C0urante commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1583356709

## connect/runtime/src/test/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStoreTest.java: ##
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.LoggingContext;
+import org.apache.kafka.connect.util.TopicAdmin;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.mockito.Mockito.mock;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
+public class ConnectorOffsetBackingStoreTest {
+
+    // Serialized
+    private static final byte[] OFFSET_KEY_SERIALIZED = "key-serialized".getBytes();
+    private static final byte[] OFFSET_VALUE_SERIALIZED = "value-serialized".getBytes();
+
+    private static final KafkaException PRODUCE_EXCEPTION = new KafkaException();
+
+    private final ByteArraySerializer byteArraySerializer = new ByteArraySerializer();
+
+    @Test
+    public void testFlushFailureWhenWriteToSecondaryStoreFailsForTombstoneOffsets() {
+        MockProducer connectorStoreProducer = createMockProducer();
+        MockProducer workerStoreProducer = createMockProducer();
+        KafkaOffsetBackingStore connectorStore = createStore("topic1", connectorStoreProducer);
+        KafkaOffsetBackingStore workerStore = createStore("topic2", workerStoreProducer);
+
+        ConnectorOffsetBackingStore offsetBackingStore = ConnectorOffsetBackingStore.withConnectorAndWorkerStores(
+                () -> LoggingContext.forConnector("source-connector"),
+                workerStore,
+                connectorStore,
+                "offsets-topic",
+                mock(TopicAdmin.class));
+
+        AtomicBoolean callbackInvoked = new AtomicBoolean();
+        AtomicReference callbackResult = new AtomicReference<>();
+        AtomicReference callbackError = new AtomicReference<>();
+
+        Future setFuture = offsetBackingStore.set(getSerialisedOffsets(OFFSET_KEY_SERIALIZED, null), (error, result) -> {
+            callbackInvoked.set(true);
+            callbackResult.set(result);
+            callbackError.set(error);
+        });

Review Comment:
Nit: could it also be helpful to make sure we don't prematurely complete our callbacks?

```suggestion
        });
        assertFalse("Store callback should not be invoked before underlying producer callback", callbackInvoked.get());
```
Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]
vamossagar12 commented on PR #13801:
URL: https://github.com/apache/kafka/pull/13801#issuecomment-2076983151

Thanks Chris! I ran through the scenarios in the test and I can see that it handles the cases correctly. Regarding `cancel`, I don't see the future returned from `set` being cancelled explicitly, so we can live w/o implementations of `cancel` and `isCancelled`. Also, I have now updated the tests so that I control when a record should be returned, throw an error, or time out. `MockProducer` provides some great hooks for this. I added a couple more tests that cover the timeout scenario: `get` times out until all of the underlying futures have completed (with an error or otherwise). Let me know how this is looking now. Thanks!
Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]
C0urante commented on PR #13801:
URL: https://github.com/apache/kafka/pull/13801#issuecomment-2056973237

Thanks Sagar, great catch! I suspected this would be a gnarly one to tackle but it's turning out to be even harder than I thought.

I think there's still an issue with the current state of the PR. It looks like we aren't blocking on the future returned by `setPrimaryThenSecondary`, which means that we may spuriously return early from `get` in the future we're returning from `ConnectorOffsetBackingStore::set` if the write to the primary store hasn't completed yet. I believe this is missed by tests because the producer writes we mock out all take place synchronously; maybe we can use the `MockProducer` more idiomatically to simulate records being ack'd after calls to `MockProducer::send` have returned?

I've sketched a new kind of `Future` implementation that seems to do the trick, though I haven't tested it rigorously:

```java
private class ChainedOffsetWriteFuture implements Future<Void> {

    private final OffsetBackingStore primaryStore;
    private final OffsetBackingStore secondaryStore;
    private final Map<ByteBuffer, ByteBuffer> completeOffsets;
    private final Map<ByteBuffer, ByteBuffer> regularOffsets;
    private final Callback<Void> callback;
    private final AtomicReference<Throwable> writeError;
    private final CountDownLatch completed;

    public ChainedOffsetWriteFuture(
            OffsetBackingStore primaryStore,
            OffsetBackingStore secondaryStore,
            Map<ByteBuffer, ByteBuffer> completeOffsets,
            Map<ByteBuffer, ByteBuffer> regularOffsets,
            Map<ByteBuffer, ByteBuffer> tombstoneOffsets,
            Callback<Void> callback
    ) {
        this.primaryStore = primaryStore;
        this.secondaryStore = secondaryStore;
        this.completeOffsets = completeOffsets;
        this.regularOffsets = regularOffsets;
        this.callback = callback;
        this.writeError = new AtomicReference<>();
        this.completed = new CountDownLatch(1);

        secondaryStore.set(tombstoneOffsets, this::onFirstWrite);
    }

    private void onFirstWrite(Throwable error, Void ignored) {
        if (error != null) {
            log.trace("Skipping offsets write to primary store because secondary tombstone write has failed", error);
            try (LoggingContext context = loggingContext()) {
                callback.onCompletion(error, ignored);
                writeError.compareAndSet(null, error);
                completed.countDown();
            }
            return;
        }
        setPrimaryThenSecondary(primaryStore, secondaryStore, completeOffsets, regularOffsets, this::onSecondWrite);
    }

    private void onSecondWrite(Throwable error, Void ignored) {
        callback.onCompletion(error, ignored);
        writeError.compareAndSet(null, error);
        completed.countDown();
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return false;
    }

    @Override
    public boolean isCancelled() {
        return false;
    }

    @Override
    public boolean isDone() {
        return completed.getCount() == 0;
    }

    @Override
    public Void get() throws InterruptedException, ExecutionException {
        completed.await();
        if (writeError.get() != null) {
            throw new ExecutionException(writeError.get());
        }
        return null;
    }

    @Override
    public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        if (!completed.await(timeout, unit)) {
            throw new TimeoutException("Failed to complete offset write in time");
        }
        if (writeError.get() != null) {
            throw new ExecutionException(writeError.get());
        }
        return null;
    }
}
```

(I've omitted an implementation of `cancel` and `isCancelled` for now since I'm not sure it's really necessary, but LMK if I've missed a case where this would make a difference.)

The new class can be used at the end of `ConnectorOffsetBackingStore::set` like this:

```java
if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
    return new ChainedOffsetWriteFuture(
            primaryStore,
            secondaryStore,
            values,
            regularOffsets,
            tombstoneOffsets,
            callback
    );
} else {
    return setPrimaryThenSecondary(primaryStore, secondaryStore, values, regularOffsets, callback);
}
```
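The "don't complete early" property discussed in this comment can be checked without Kafka at all. Below is a minimal sketch, assuming a hypothetical `ManualStore` stand-in (not a Kafka class) that holds each write's completion callback until the test fires it, which is roughly what deferring acks on a mock producer achieves. The chained write only reports done after the second write has been acknowledged.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

public class ChainedWriteExample {

    // Hypothetical stand-in for an asynchronous store: it keeps the completion
    // callback so that the caller decides when (and how) the write finishes.
    static class ManualStore {
        private Consumer<Throwable> pending;

        void write(Consumer<Throwable> onDone) {
            pending = onDone;
        }

        void complete(Throwable error) {
            Consumer<Throwable> callback = pending;
            pending = null;
            callback.accept(error);
        }
    }

    private final CountDownLatch completed = new CountDownLatch(1);
    private final AtomicReference<Throwable> error = new AtomicReference<>();

    // Starts the first write immediately; the second write only starts once
    // the first one has been acknowledged without an error.
    ChainedWriteExample(ManualStore first, ManualStore second) {
        first.write(firstError -> {
            if (firstError != null) {
                finish(firstError);
                return;
            }
            second.write(this::finish);
        });
    }

    private void finish(Throwable t) {
        error.compareAndSet(null, t);
        completed.countDown();
    }

    boolean isDone() {
        return completed.getCount() == 0;
    }

    Throwable error() {
        return error.get();
    }

    public static void main(String[] args) {
        ManualStore first = new ManualStore();
        ManualStore second = new ManualStore();
        ChainedWriteExample chained = new ChainedWriteExample(first, second);
        System.out.println("done before any ack: " + chained.isDone());
        first.complete(null);
        System.out.println("done after first ack: " + chained.isDone());
        second.complete(null);
        System.out.println("done after second ack: " + chained.isDone());
    }
}
```

A test written this way fails if the returned future is resolved by the first write alone, which is the case that synchronous mocks cannot expose.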
Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]
vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1560724752

## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ##
@@ -279,15 +300,77 @@ public Future set(Map values, Callback callb
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
 
-        return primaryStore.set(values, (primaryWriteError, ignored) -> {
+        Map regularOffsets = new HashMap<>();
+        Map tombstoneOffsets = new HashMap<>();
+        values.forEach((partition, offset) -> {
+            if (offset == null) {
+                tombstoneOffsets.put(partition, null);
+            } else {
+                regularOffsets.put(partition, offset);
+            }
+        });
+
+        if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
+            AtomicReference primaryWriteError = new AtomicReference<>();
+            FutureCallback secondaryWriteCallback = new FutureCallback() {
+                @Override
+                public void onCompletion(Throwable tombstoneWriteError, Void ignored) {
+                    super.onCompletion(tombstoneWriteError, ignored);
+                    if (tombstoneWriteError != null) {
+                        log.trace("Skipping offsets write to primary store because secondary tombstone write has failed", tombstoneWriteError);
+                        try (LoggingContext context = loggingContext()) {
+                            callback.onCompletion(tombstoneWriteError, ignored);
+                        }
+                        return;
+                    }
+                    setPrimaryThenSecondary(primaryStore, secondaryStore, values, regularOffsets, callback, primaryWriteError);
+                }
+
+                @Override
+                public Void get() throws InterruptedException, ExecutionException {
+                    super.get();
+                    if (primaryWriteError.get() != null) {
+                        throw new ExecutionException(primaryWriteError.get());
+                    }
+                    return null;
+                }
+
+                @Override
+                public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+                    super.get(timeout, unit);
+                    if (primaryWriteError.get() != null) {
+                        if (primaryWriteError.get() instanceof TimeoutException) {
+                            throw (TimeoutException) primaryWriteError.get();

Review Comment:
I am slightly on the fence about whether we need to handle this case or not, because in [ConvertingFutureCallback#result](https://github.com/apache/kafka/blob/f895ab5145077c5efa10a4a898628d901b01e2c2/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConvertingFutureCallback.java#L135) I see that any exception other than `CancellationException` is wrapped in `ExecutionException`.
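For reference, the general `Future::get` contract behaves the same way. The sketch below is only an illustration with the JDK's `CompletableFuture` (it does not use Connect's `ConvertingFutureCallback`, and the class and method names are made up for this example): a `TimeoutException` recorded as the failure cause comes out of `get` wrapped in an `ExecutionException`, not as a bare `TimeoutException`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

public class WrappedTimeoutExample {

    // Fails a future with a TimeoutException and reports what get() actually throws.
    static String describeFailure() throws InterruptedException {
        CompletableFuture<Void> future = new CompletableFuture<>();
        future.completeExceptionally(new TimeoutException("simulated write timeout"));
        try {
            future.get();
            return "no exception";
        } catch (ExecutionException e) {
            return "ExecutionException caused by " + e.getCause().getClass().getSimpleName();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // prints "ExecutionException caused by TimeoutException"
        System.out.println(describeFailure());
    }
}
```

Under that contract, a `TimeoutException` thrown directly from `get(timeout, unit)` normally means "the wait itself timed out", so rethrowing a stored write error as a bare `TimeoutException` would blur the two cases.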
Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]
vamossagar12 commented on PR #13801:
URL: https://github.com/apache/kafka/pull/13801#issuecomment-2049281246

@C0urante, I think I figured out the reason for the failure with `testFlushFailureWhenWritesToPrimaryStoreFailsAndSecondarySucceedsForTombstoneRecords`. The problem was that we were returning the callback created [here](https://github.com/apache/kafka/blob/f895ab5145077c5efa10a4a898628d901b01e2c2/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java#L302), but when there is no write error to the secondary store, the [exception](https://github.com/apache/kafka/blob/f895ab5145077c5efa10a4a898628d901b01e2c2/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java#L348) object is null, because of which [get](https://github.com/apache/kafka/blob/f895ab5145077c5efa10a4a898628d901b01e2c2/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java#L402) doesn't return an error. The primary store write error works fine but that's not the future the caller is waiting on.

I have fixed this by creating a new `FutureCallback` object and making that the underlying callback for `SetCallbackFuture`. The important thing is that it overrides the `get()` methods so that the caller waits on this future, and now I can control throwing an exception when the primary store write fails and the secondary store write succeeds. Let me know what you think about this.
Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]
vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1560701187

## checkstyle/suppressions.xml: ##
@@ -166,7 +166,7 @@
 files="(KafkaConfigBackingStore|Values|ConnectMetricsRegistry).java"/>
+ files="(DistributedHerder|AbstractHerder|RestClient|RestServer|JsonConverter|KafkaConfigBackingStore|FileStreamSourceTask|WorkerSourceTask|TopicAdmin|ConnectorOffsetBackingStore).java"/>

Review Comment:
Removed
Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]
vamossagar12 commented on PR #13801:
URL: https://github.com/apache/kafka/pull/13801#issuecomment-2047548232

Chris, I started changing the tests in alignment with the comments (i.e. using AtomicBoolean, AtomicReference and removing the try-catch block). I noticed an interesting issue with the `testFlushFailureWhenWritesToPrimaryStoreFailsAndSecondarySucceedsForTombstoneRecords` test. What's happening is that when we do a `get` on the future returned in this case, it doesn't throw an exception. I debugged it and I think the problem is that, when the primary store fails, we set the callback to error correctly. However, because the secondary store write doesn't fail, when its callback gets invoked from [here](https://github.com/apache/kafka/pull/13801/files#diff-0b612a24267f45b927d37b223af3034feebe4363b23b53f5751f1b29e54e2aa7R331), eventually the callback's `onCompletion` sets it to a non-error from [here](https://github.com/apache/kafka/blob/f895ab5145077c5efa10a4a898628d901b01e2c2/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java#L369-L372). The net effect is that the `.get()` call on the future doesn't return an error, which isn't right.
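The overwrite described above can be reproduced in isolation. This is a minimal sketch with made-up names (it is not the `KafkaOffsetBackingStore` code): when every completion simply overwrites the recorded outcome, a later success (`null` error) hides an earlier failure, whereas recording with `compareAndSet(null, ...)` keeps the first error.

```java
import java.util.concurrent.atomic.AtomicReference;

public class FirstErrorWinsExample {

    // Each completion overwrites the previous outcome, so a later success
    // (null error) erases an earlier failure.
    static Throwable lastWriteWins(Throwable primaryError, Throwable secondaryError) {
        AtomicReference<Throwable> outcome = new AtomicReference<>();
        outcome.set(primaryError);
        outcome.set(secondaryError);
        return outcome.get();
    }

    // An error is only recorded while none has been recorded yet, so the
    // first failure survives any later success.
    static Throwable firstErrorWins(Throwable primaryError, Throwable secondaryError) {
        AtomicReference<Throwable> outcome = new AtomicReference<>();
        outcome.compareAndSet(null, primaryError);
        outcome.compareAndSet(null, secondaryError);
        return outcome.get();
    }

    public static void main(String[] args) {
        Throwable primaryFailure = new RuntimeException("primary write failed");
        System.out.println("last write wins:  " + lastWriteWins(primaryFailure, null));
        System.out.println("first error wins: " + firstErrorWins(primaryFailure, null));
    }
}
```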
Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]
C0urante commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1555264597

## connect/runtime/src/test/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStoreTest.java: ##
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.LoggingContext;
+import org.apache.kafka.connect.util.TopicAdmin;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.mockito.Mockito.mock;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.when;
+
+public class ConnectorOffsetBackingStoreTest {
+
+    private static final String NAMESPACE = "namespace";
+    // Connect format - any types should be accepted here
+    private static final Map OFFSET_KEY = Collections.singletonMap("key", "key");
+    private static final Map OFFSET_VALUE = Collections.singletonMap("key", 12);
+
+    // Serialized
+    private static final byte[] OFFSET_KEY_SERIALIZED = "key-serialized".getBytes();
+    private static final byte[] OFFSET_VALUE_SERIALIZED = "value-serialized".getBytes();
+
+    private static final Exception PRODUCE_EXCEPTION = new KafkaException();
+
+    private final Converter keyConverter = mock(Converter.class);
+    private final Converter valueConverter = mock(Converter.class);

Review Comment:
We tend to use the `@Mock` annotation instead of final fields:

```suggestion
    @Mock
    private Converter keyConverter;
    @Mock
    private Converter valueConverter;
```
Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]
vamossagar12 commented on PR #13801:
URL: https://github.com/apache/kafka/pull/13801#issuecomment-2039360963

Hey Chris, sorry for the long delay on this. I finally got a chance to verify the code that you provided and it makes sense. I agree that so far I was only thinking about either having 2 separate futures such that one waits for the other, or trying to chain futures like CompletableFutures. However, the version you have provided is pretty straightforward and all the new tests passed OOB for me.

Regarding

> I think the only question left is whether out-of-order writes are possible because of how things are chained

I am assuming that, for non-exactly-once source tasks, you are referring to scenarios where offset flushes are triggered and the flush operations finish out of order. I reviewed the code and I can see that this is being checked in `handleFinishWrite`, which does not complete the flush if the flush that just finished isn't the current one. For any other erroneous cases, `cancelFlush` is being invoked (as you mentioned).
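The stale-flush check described above boils down to comparing flush identifiers. The following is a simplified, hypothetical sketch of that idea only; the class and method names are invented for illustration and this is not the `OffsetStorageWriter` implementation. Each flush gets an id, cancelling moves the current id forward, and a completion whose id no longer matches is ignored.

```java
public class FlushTrackerExample {

    private long currentFlushId = 0;
    private int completedFlushes = 0;

    // Starts a new flush and returns the id that its completion must present.
    synchronized long beginFlush() {
        return ++currentFlushId;
    }

    // Abandons the in-flight flush; its id becomes stale.
    synchronized void cancelFlush() {
        currentFlushId++;
    }

    // Completes a flush only if it is still the current one.
    synchronized boolean finishFlush(long flushId) {
        if (flushId != currentFlushId) {
            return false; // late completion of a cancelled or superseded flush
        }
        completedFlushes++;
        currentFlushId++;
        return true;
    }

    synchronized int completedFlushes() {
        return completedFlushes;
    }

    public static void main(String[] args) {
        FlushTrackerExample tracker = new FlushTrackerExample();
        long first = tracker.beginFlush();
        tracker.cancelFlush();
        long second = tracker.beginFlush();
        System.out.println("stale completion accepted: " + tracker.finishFlush(first));
        System.out.println("current completion accepted: " + tracker.finishFlush(second));
    }
}
```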
Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]
vamossagar12 commented on PR #13801:
URL: https://github.com/apache/kafka/pull/13801#issuecomment-1921517994

Thanks @C0urante for this! Let me review this and get back to you.
Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]
C0urante commented on PR #13801:
URL: https://github.com/apache/kafka/pull/13801#issuecomment-1919475280

Thanks for the in-depth analysis! I think part of the problem here stems from trying to make our internal `Future`-based API cooperate with Java's `CompletableFuture` API. I've sketched out something below that doesn't rely on `CompletableFuture` and (I believe) preserves the semantics we want for asynchronicity:

```java
public class ConnectorOffsetBackingStore implements OffsetBackingStore {

    @Override
    public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callback) {
        final OffsetBackingStore primaryStore;
        final OffsetBackingStore secondaryStore;
        if (connectorStore.isPresent()) {
            primaryStore = connectorStore.get();
            secondaryStore = workerStore.orElse(null);
        } else if (workerStore.isPresent()) {
            primaryStore = workerStore.get();
            secondaryStore = null;
        } else {
            // Should never happen since we check for this case in the constructor, but just in case, this should
            // be more informative than the NPE that would otherwise be thrown
            throw new IllegalStateException("At least one non-null offset store must be provided");
        }

        Map<ByteBuffer, ByteBuffer> regularOffsets = new HashMap<>();
        Map<ByteBuffer, ByteBuffer> tombstoneOffsets = new HashMap<>();
        values.forEach((partition, offset) -> {
            if (offset == null) {
                tombstoneOffsets.put(partition, null);
            } else {
                regularOffsets.put(partition, offset);
            }
        });

        if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
            return secondaryStore.set(tombstoneOffsets, (tombstoneWriteError, ignored) -> {
                if (tombstoneWriteError != null) {
                    log.trace("Skipping offsets write to primary store because secondary tombstone write has failed", tombstoneWriteError);
                    try (LoggingContext context = loggingContext()) {
                        callback.onCompletion(tombstoneWriteError, ignored);
                    }
                    return;
                }

                setPrimaryThenSecondary(primaryStore, secondaryStore, values, regularOffsets, callback);
            });
        } else {
            return setPrimaryThenSecondary(primaryStore, secondaryStore, values, regularOffsets, callback);
        }
    }

    private Future<Void> setPrimaryThenSecondary(
            OffsetBackingStore primaryStore,
            OffsetBackingStore secondaryStore,
            Map<ByteBuffer, ByteBuffer> completeOffsets,
            Map<ByteBuffer, ByteBuffer> nonTombstoneOffsets,
            Callback<Void> callback
    ) {
        return primaryStore.set(completeOffsets, (primaryWriteError, ignored) -> {
            if (secondaryStore != null) {
                if (primaryWriteError != null) {
                    log.trace("Skipping offsets write to secondary store because primary write has failed", primaryWriteError);
                } else {
                    try {
                        // Invoke OffsetBackingStore::set but ignore the resulting future; we don't block on writes to this
                        // backing store.
                        secondaryStore.set(nonTombstoneOffsets, (secondaryWriteError, ignored2) -> {
                            try (LoggingContext context = loggingContext()) {
                                if (secondaryWriteError != null) {
                                    log.warn("Failed to write offsets to secondary backing store", secondaryWriteError);
                                } else {
                                    log.debug("Successfully flushed offsets to secondary backing store");
                                }
                            }
                        });
                    } catch (Exception e) {
                        log.warn("Failed to write offsets to secondary backing store", e);
                    }
                }
            }
            try (LoggingContext context = loggingContext()) {
                callback.onCompletion(primaryWriteError, ignored);
            }
        });
    }
}
```

It also obviates the need for the `exactlyOnce` and `offsetFlushTimeoutMs` fields.

If this looks acceptable, I think the only question left is whether out-of-order writes are possible because of how things are chained. I believe this is only a problem for non-exactly-once source tasks (since we only have at most one in-flight offset commit at a time when exactly-once support is enabled), and should be handled gracefully by `OffsetStorageWriter::cancelFlush`, but it'd be nice to have a second pair of eyes to make sure.
Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]
vamossagar12 commented on PR #13801: URL: https://github.com/apache/kafka/pull/13801#issuecomment-1908434176

@C0urante, I did some analysis today on the version that used `CompletableFuture` [here](https://github.com/apache/kafka/blob/062591757a04647eb4837348f59b0e5736b6372f/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java#L312-L330). I wrote a small program to mimic the behaviour, i.e. a method that returns a future with possibly blocking callbacks chained onto it:

```
package org.example;

import java.time.Duration;
import java.util.concurrent.*;

public class CompletableFutureTest {

    private static Future<Void> getFuture() {
        // Start from an already-completed future and chain two blocking callbacks.
        CompletableFuture<Void> overallFuture = CompletableFuture.completedFuture(null);
        overallFuture = overallFuture.thenRun(() -> {
            try {
                System.out.println("Function 1 - Thread::" + Thread.currentThread());
                long initTime = System.currentTimeMillis();
                System.out.println("Function 1 - Initial Time::" + initTime);
                Thread.sleep(Duration.ofSeconds(30).toMillis());
                System.out.println("Function 1 - Time Elapsed::" + (System.currentTimeMillis() - initTime) / 1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
        overallFuture = overallFuture.thenRun(() -> {
            long initTime = System.currentTimeMillis();
            System.out.println("Function 2 - Thread::" + Thread.currentThread());
            System.out.println("Function 2 - Initial Time::" + initTime);
            try {
                Thread.sleep(Duration.ofSeconds(40).toMillis());
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("Function 2 - Time Elapsed::" + (System.currentTimeMillis() - initTime) / 1000);
        });
        return overallFuture;
    }

    public static void main(String[] args) {
        System.out.println("Begin main");
        System.out.println("Main - Thread::" + Thread.currentThread());
        Future<Void> future = getFuture();
        try {
            // Measure how long the caller actually waits on the returned future.
            long initTime = System.currentTimeMillis();
            future.get();
            System.out.println("Main - Time Elapsed::" + (System.currentTimeMillis() - initTime) / 1000);
            System.out.println("End of main");
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        }
    }
}
```

and this is the output I get:

```
Begin main
Main - Thread::Thread[main,5,main]
Function 1 - Thread::Thread[main,5,main]
Function 1 - Initial Time::1706111085236
Function 1 - Time Elapsed::30
Function 2 - Thread::Thread[main,5,main]
Function 2 - Initial Time::170615248
Function 2 - Time Elapsed::40
Main - Time Elapsed::0
End of main
```

So, essentially the callbacks get executed on the calling thread as soon as the `getFuture()` method is invoked, and by the time we get to the `get()` call in main, the future is already resolved and it returns immediately (as you pointed out). I get a very similar result when I change the definition of `overallFuture` to

```
CompletableFuture<Void> overallFuture = CompletableFuture.supplyAsync(() -> null);
```

However, if I change the callback invocations to use `thenRunAsync`, then I get totally different results (which is what we expect):

```
Begin main
Main - Thread::Thread[main,5,main]
Function 1 - Thread::Thread[ForkJoinPool.commonPool-worker-1,5,main]
Function 1 - Initial Time::1706111422731
Function 1 - Time Elapsed::30
Function 2 - Thread::Thread[ForkJoinPool.commonPool-worker-1,5,main]
Function 2 - Initial Time::1706111452740
Function 2 - Time Elapsed::40
Main - Time Elapsed::70
End of main
```

i.e. this time the `getFuture` method returns immediately, the lambdas run on a pool thread, and the `get` call on the returned future blocks until both have finished (70 seconds here). I think the reason is that the non-async methods of `CompletableFuture` run the callback immediately on the invoking thread when the future they are chained onto is already complete. It's only the `xxxAsync` variants that execute on the common fork-join pool (as seen in the thread names printed above) or, if one is supplied, on a separate executor. This behaviour is similar to other reactive libraries like `rx-java` IIRC.

I believe the offset write happens on the same thread which invokes the `set()` method and we would want to continue that behaviour, so I am not sure we would want to use the `xxxAsync` variants. I even tried using `KafkaFutureImpl` which is Kafka's internal implementation for async programming, but I get prett
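For anyone who wants to reproduce the thread behaviour without the 30 and 40 second sleeps, here is a minimal sketch. It is not taken from the PR: the class name, method names, and the `sketch-worker` thread name are made up for illustration, and it passes an explicit executor to `thenRunAsync` (rather than relying on the common pool) only so that the worker thread has a predictable name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

public class ThenRunThreadSketch {

    // Chains a callback with thenRun onto an already-completed future and
    // reports which thread executed it.
    static String threadUsedByThenRun() {
        AtomicReference<String> seen = new AtomicReference<>();
        CompletableFuture<Void> done = CompletableFuture.completedFuture(null);
        done.thenRun(() -> seen.set(Thread.currentThread().getName())).join();
        return seen.get();
    }

    // Same chain, but with thenRunAsync and an explicit single-thread executor
    // whose worker thread carries a recognisable (made-up) name.
    static String threadUsedByThenRunAsync() {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-worker"));
        try {
            AtomicReference<String> seen = new AtomicReference<>();
            CompletableFuture<Void> done = CompletableFuture.completedFuture(null);
            done.thenRunAsync(() -> seen.set(Thread.currentThread().getName()), executor).join();
            return seen.get();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("caller       : " + Thread.currentThread().getName());
        System.out.println("thenRun      : " + threadUsedByThenRun());
        System.out.println("thenRunAsync : " + threadUsedByThenRunAsync());
    }
}
```

The first helper should report the caller's own thread, the second the executor's thread, which matches the two outputs quoted in the comment.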
Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]
vamossagar12 commented on code in PR #13801: URL: https://github.com/apache/kafka/pull/13801#discussion_r1463150319

## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ##
@@ -279,15 +290,54 @@ public Future set(Map values, Callback callb
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
-        return primaryStore.set(values, (primaryWriteError, ignored) -> {
+        List partitionsWithTombstoneOffsets = values.entrySet().stream()
+                .filter(offsetEntry -> offsetEntry.getValue() == null)
+                .map(Map.Entry::getKey).collect(Collectors.toList());
+
+        Map tombstoneOffsets = new HashMap<>();
+        for (ByteBuffer partition : partitionsWithTombstoneOffsets) {
+            tombstoneOffsets.put(partition, null);
+        }
+        Map regularOffsets = values.entrySet().stream()
+                .filter(offsetEntry -> offsetEntry.getValue() != null)
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+        // If the supplied offsets contain tombstone values, then tombstone offsets are extracted first,
+        // written to the secondary store in a synchronous manner and then to the primary store.
+        // This is because, if a tombstone offset is successfully written to the per-connector offsets topic,
+        // but cannot be written to the global offsets topic, then the global offsets topic will still contain that
+        // source offset, but the per-connector topic will not. Due to the fallback-on-global logic used by the worker,
+        // if a task requests offsets for one of the tombstoned partitions, the worker will provide it with the
+        // offsets present in the global offsets topic, instead of indicating to the task that no offsets can be found.
+        CompletableFuture offsetWriteFuture = CompletableFuture.completedFuture(null);
+        if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
+            offsetWriteFuture.thenAccept(v -> {

Review Comment:
~~I have actually removed the chaining using `CompletableFuture` and simplified the logic. I just wait on the secondary store write directly (with or without a timeout) and if the execution fails or the wait itself fails, I update the callback and return the same future (because it's already completed). With this, the primary store write is unchanged (apart from using `regularOffsets` when writing to the secondary store). Let me know what you think about this.~~

I re-read the comments and it looks like with this we are going against the approach you wanted in https://github.com/apache/kafka/pull/13801/#discussion_r1268520271, i.e.

> That said, I don't love how we've made this method synchronously await the write to the secondary store. We should return a Future to the caller that corresponds to all of the offset flushes that we'd need to block on for an offset commit (i.e., the existing flush that we're performing, possibly preceded by a preemptive flush of tombstones to the secondary store).

Let me take a look at this again. Sorry about this, I had forgotten about that comment.
Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]
vamossagar12 commented on code in PR #13801: URL: https://github.com/apache/kafka/pull/13801#discussion_r1463158103

## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ##
@@ -279,15 +290,54 @@ public Future set(Map values, Callback callb
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
-        return primaryStore.set(values, (primaryWriteError, ignored) -> {
+        List partitionsWithTombstoneOffsets = values.entrySet().stream()
+                .filter(offsetEntry -> offsetEntry.getValue() == null)
+                .map(Map.Entry::getKey).collect(Collectors.toList());
+
+        Map tombstoneOffsets = new HashMap<>();
+        for (ByteBuffer partition : partitionsWithTombstoneOffsets) {
+            tombstoneOffsets.put(partition, null);
+        }
+        Map regularOffsets = values.entrySet().stream()
+                .filter(offsetEntry -> offsetEntry.getValue() != null)
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+        // If the supplied offsets contain tombstone values, then tombstone offsets are extracted first,
+        // written to the secondary store in a synchronous manner and then to the primary store.
+        // This is because, if a tombstone offset is successfully written to the per-connector offsets topic,
+        // but cannot be written to the global offsets topic, then the global offsets topic will still contain that
+        // source offset, but the per-connector topic will not. Due to the fallback-on-global logic used by the worker,
+        // if a task requests offsets for one of the tombstoned partitions, the worker will provide it with the
+        // offsets present in the global offsets topic, instead of indicating to the task that no offsets can be found.
+        CompletableFuture offsetWriteFuture = CompletableFuture.completedFuture(null);
+        if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
+            offsetWriteFuture.thenAccept(v -> {
+                Future secondaryWriteFuture = secondaryStore.set(tombstoneOffsets, new FutureCallback<>());
+                try {
+                    if (exactlyOnce) {
+                        secondaryWriteFuture.get();
+                    } else {
+                        secondaryWriteFuture.get(offsetFlushTimeoutMs, TimeUnit.MILLISECONDS);
+                    }
+                } catch (ExecutionException e) {
+                    log.error("{} Flush of tombstone(s) offsets to secondary store threw an unexpected exception: ", this, e.getCause());
+                    callback.onCompletion(e.getCause(), null);
+                } catch (Exception e) {
+                    log.error("{} Got Exception when trying to flush tombstone(s) offsets to secondary store", this, e);
+                    callback.onCompletion(e, null);
+                }
+            });
+        }
+        offsetWriteFuture.thenAccept(v -> primaryStore.set(values, new FutureCallback<>((primaryWriteError, ignored) -> {

Review Comment:
I have not added this test at this point and I can add it if you think it's still needed with the new approach.
Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]
vamossagar12 commented on code in PR #13801: URL: https://github.com/apache/kafka/pull/13801#discussion_r1463165311

## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ##
@@ -279,15 +290,54 @@ public Future set(Map values, Callback callb
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
-        return primaryStore.set(values, (primaryWriteError, ignored) -> {
+        List partitionsWithTombstoneOffsets = values.entrySet().stream()
+                .filter(offsetEntry -> offsetEntry.getValue() == null)
+                .map(Map.Entry::getKey).collect(Collectors.toList());
+
+        Map tombstoneOffsets = new HashMap<>();
+        for (ByteBuffer partition : partitionsWithTombstoneOffsets) {
+            tombstoneOffsets.put(partition, null);
+        }
+        Map regularOffsets = values.entrySet().stream()
+                .filter(offsetEntry -> offsetEntry.getValue() != null)
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+        // If the supplied offsets contain tombstone values, then tombstone offsets are extracted first,
+        // written to the secondary store in a synchronous manner and then to the primary store.
+        // This is because, if a tombstone offset is successfully written to the per-connector offsets topic,
+        // but cannot be written to the global offsets topic, then the global offsets topic will still contain that
+        // source offset, but the per-connector topic will not. Due to the fallback-on-global logic used by the worker,
+        // if a task requests offsets for one of the tombstoned partitions, the worker will provide it with the
+        // offsets present in the global offsets topic, instead of indicating to the task that no offsets can be found.
+        CompletableFuture offsetWriteFuture = CompletableFuture.completedFuture(null);
+        if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
+            offsetWriteFuture.thenAccept(v -> {

Review Comment:
I re-read the comments and it looks like with this we are going against the approach you wanted in this [comment](https://github.com/apache/kafka/pull/13801/#discussion_r1268520271), i.e.

> That said, I don't love how we've made this method synchronously await the write to the secondary store. We should return a Future to the caller that corresponds to all of the offset flushes that we'd need to block on for an offset commit (i.e., the existing flush that we're performing, possibly preceded by a preemptive flush of tombstones to the secondary store).

Let me take a look at this again. Sorry about this, I had forgotten about that comment.
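One possible reading of the quoted request (a single `Future` that covers the tombstone flush followed by the regular flush, without blocking inside `set`) can be sketched with `CompletableFuture.thenCompose`. This is only an illustration under stated assumptions: the helper name `tombstonesThenPrimary` and the use of `CompletableFuture` for both writes are hypothetical, and the real stores in Connect return a plain `Future`, so this is not the PR's implementation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

public class ComposedFlushSketch {

    // Hypothetical helper: returns one future covering both writes. The primary
    // write is only started once the secondary tombstone write has succeeded.
    static CompletableFuture<Void> tombstonesThenPrimary(
            CompletableFuture<Void> secondaryTombstoneWrite,
            Supplier<CompletableFuture<Void>> startPrimaryWrite) {
        return secondaryTombstoneWrite.thenCompose(ignored -> startPrimaryWrite.get());
    }

    public static void main(String[] args) throws InterruptedException {
        // Success path: the caller gets a future back before either write finishes.
        CompletableFuture<Void> secondary = new CompletableFuture<>();
        CompletableFuture<Void> primary = new CompletableFuture<>();
        CompletableFuture<Void> flush = tombstonesThenPrimary(secondary, () -> primary);
        System.out.println("returned without blocking, done = " + flush.isDone());
        secondary.complete(null);
        primary.complete(null);
        System.out.println("after both writes, done = " + flush.isDone());

        // Failure path: a failed tombstone write fails the flush and the
        // primary write is never started.
        CompletableFuture<Void> failing = new CompletableFuture<>();
        AtomicBoolean primaryStarted = new AtomicBoolean(false);
        CompletableFuture<Void> failedFlush = tombstonesThenPrimary(failing, () -> {
            primaryStarted.set(true);
            return CompletableFuture.completedFuture(null);
        });
        failing.completeExceptionally(new IllegalStateException("secondary store unavailable"));
        try {
            failedFlush.get();
        } catch (ExecutionException e) {
            System.out.println("flush failed with: " + e.getCause().getMessage()
                    + ", primary started = " + primaryStarted.get());
        }
    }
}
```

With this shape the caller decides how to wait, e.g. `get()` for exactly-once or `get(timeout, unit)` otherwise, instead of `set` waiting on its behalf.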
Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]
vamossagar12 commented on code in PR #13801: URL: https://github.com/apache/kafka/pull/13801#discussion_r1463158347

## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ##
@@ -299,12 +349,11 @@ public Future set(Map values, Callback callb
                 } catch (Exception e) {
                     log.warn("Failed to write offsets to secondary backing store", e);
                 }
+                callback.onCompletion(null, null);
             }
         }
-        try (LoggingContext context = loggingContext()) {
-            callback.onCompletion(primaryWriteError, ignored);
-        }
-    });

Review Comment:
Added it back.
Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]
vamossagar12 commented on code in PR #13801: URL: https://github.com/apache/kafka/pull/13801#discussion_r1463150319

## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ##
@@ -279,15 +290,54 @@ public Future set(Map values, Callback callb
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
-        return primaryStore.set(values, (primaryWriteError, ignored) -> {
+        List partitionsWithTombstoneOffsets = values.entrySet().stream()
+                .filter(offsetEntry -> offsetEntry.getValue() == null)
+                .map(Map.Entry::getKey).collect(Collectors.toList());
+
+        Map tombstoneOffsets = new HashMap<>();
+        for (ByteBuffer partition : partitionsWithTombstoneOffsets) {
+            tombstoneOffsets.put(partition, null);
+        }
+        Map regularOffsets = values.entrySet().stream()
+                .filter(offsetEntry -> offsetEntry.getValue() != null)
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+        // If the supplied offsets contain tombstone values, then tombstone offsets are extracted first,
+        // written to the secondary store in a synchronous manner and then to the primary store.
+        // This is because, if a tombstone offset is successfully written to the per-connector offsets topic,
+        // but cannot be written to the global offsets topic, then the global offsets topic will still contain that
+        // source offset, but the per-connector topic will not. Due to the fallback-on-global logic used by the worker,
+        // if a task requests offsets for one of the tombstoned partitions, the worker will provide it with the
+        // offsets present in the global offsets topic, instead of indicating to the task that no offsets can be found.
+        CompletableFuture offsetWriteFuture = CompletableFuture.completedFuture(null);
+        if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
+            offsetWriteFuture.thenAccept(v -> {

Review Comment:
I have actually removed the chaining using `CompletableFuture` and simplified the logic. I just wait on the secondary store write directly (with or without a timeout) and if the execution fails or the wait itself fails, I update the callback and return the same future (because it's already completed).

```
if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
    Future secondaryWriteFuture = secondaryStore.set(tombstoneOffsets, (t, r) -> { });
    try {
        if (exactlyOnce) {
            secondaryWriteFuture.get();
        } else {
            secondaryWriteFuture.get(offsetFlushTimeoutMs, TimeUnit.MILLISECONDS);
        }
        log.debug("Successfully flushed tombstone offsets to secondary store");
    } catch (ExecutionException e) {
        log.error("{} Failed to flush tombstone offsets to secondary store", this, e.getCause());
        callback.onCompletion(e.getCause(), null);
        return secondaryWriteFuture;
    } catch (Throwable e) {
        log.error("{} Failed to flush tombstone offsets to secondary store", this, e);
        callback.onCompletion(e, null);
        return secondaryWriteFuture;
    }
}
```

With this, the primary store write is unchanged (apart from using `regularOffsets` when writing to the secondary store). Let me know what you think about this.
Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]
vamossagar12 commented on code in PR #13801: URL: https://github.com/apache/kafka/pull/13801#discussion_r1463144968

## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ##
@@ -279,15 +290,54 @@ public Future set(Map values, Callback callb
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
-        return primaryStore.set(values, (primaryWriteError, ignored) -> {
+        List partitionsWithTombstoneOffsets = values.entrySet().stream()
+                .filter(offsetEntry -> offsetEntry.getValue() == null)
+                .map(Map.Entry::getKey).collect(Collectors.toList());
+
+        Map tombstoneOffsets = new HashMap<>();
+        for (ByteBuffer partition : partitionsWithTombstoneOffsets) {
+            tombstoneOffsets.put(partition, null);
+        }
+        Map regularOffsets = values.entrySet().stream()
+                .filter(offsetEntry -> offsetEntry.getValue() != null)
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+        // If the supplied offsets contain tombstone values, then tombstone offsets are extracted first,
+        // written to the secondary store in a synchronous manner and then to the primary store.
+        // This is because, if a tombstone offset is successfully written to the per-connector offsets topic,
+        // but cannot be written to the global offsets topic, then the global offsets topic will still contain that
+        // source offset, but the per-connector topic will not. Due to the fallback-on-global logic used by the worker,
+        // if a task requests offsets for one of the tombstoned partitions, the worker will provide it with the
+        // offsets present in the global offsets topic, instead of indicating to the task that no offsets can be found.
+        CompletableFuture offsetWriteFuture = CompletableFuture.completedFuture(null);
+        if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
+            offsetWriteFuture.thenAccept(v -> {
+                Future secondaryWriteFuture = secondaryStore.set(tombstoneOffsets, new FutureCallback<>());

Review Comment:
I took the non-fancy route for now, i.e. using an inline no-op `Callback`.
Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]
vamossagar12 commented on code in PR #13801: URL: https://github.com/apache/kafka/pull/13801#discussion_r1463143759 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ## @@ -279,15 +290,54 @@ public Future set(Map values, Callback callb throw new IllegalStateException("At least one non-null offset store must be provided"); } -return primaryStore.set(values, (primaryWriteError, ignored) -> { +List partitionsWithTombstoneOffsets = values.entrySet().stream() +.filter(offsetEntry -> offsetEntry.getValue() == null) +.map(Map.Entry::getKey).collect(Collectors.toList()); + +Map tombstoneOffsets = new HashMap<>(); +for (ByteBuffer partition : partitionsWithTombstoneOffsets) { +tombstoneOffsets.put(partition, null); +} +Map regularOffsets = values.entrySet().stream() +.filter(offsetEntry -> offsetEntry.getValue() != null) +.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); Review Comment: Thanks for this. I updated the logic. ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ## @@ -254,7 +260,12 @@ public Map get(long timeout, TimeUnit unit) throws Inter * write to that store, and the passed-in {@link Callback} is invoked once that write completes. If a worker-global * store is provided, a secondary write is made to that store if the write to the connector-specific store * succeeds. Errors with this secondary write are not reflected in the returned {@link Future} or the passed-in - * {@link Callback}; they are only logged as a warning to users. + * {@link Callback}; they are only logged as a warning to users. The only exception to this rule is when the offsets + * that need to be committed contains tombstone records as well. 
In such cases, a write consisting of only tombstone + * offsets would first happen on the worker-global store and only if it succeeds, would all the offsets be written + * to the connector-specific store and the regular offsets would be written to the worker-global store. Note that + * in this case, failure to write regular offsets to secondary store would still not reflect in the returned + * {@link Future} or the passed-in {@link Callback} Review Comment: Added.
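Editorial sketch of the write ordering described in the Javadoc above. This is a simplified, synchronous model, not the code from the PR: `InMemoryStore`, `set`, and `read` are hypothetical stand-ins that use `String` keys and values instead of `ByteBuffer`, and none of them belong to the Kafka Connect API. It assumes the secondary (worker-global) store is always present.

```java
import java.util.HashMap;
import java.util.Map;

public class TombstoneFirstWriteSketch {

    /** Hypothetical stand-in for an offset store; NOT the Kafka Connect OffsetBackingStore API. */
    public static class InMemoryStore {
        public final Map<String, String> data = new HashMap<>();
        public boolean failWrites = false;

        public void write(Map<String, String> offsets) {
            if (failWrites) {
                throw new IllegalStateException("simulated write failure");
            }
            for (Map.Entry<String, String> e : offsets.entrySet()) {
                if (e.getValue() == null) {
                    data.remove(e.getKey());           // a tombstone deletes the offset
                } else {
                    data.put(e.getKey(), e.getValue());
                }
            }
        }
    }

    /** Three-step write: tombstones to secondary, everything to primary, regular offsets to secondary. */
    public static void set(Map<String, String> values, InMemoryStore primary, InMemoryStore secondary) {
        Map<String, String> tombstones = new HashMap<>();
        Map<String, String> regular = new HashMap<>();
        // A plain loop is used here because Collectors.toMap rejects null values.
        for (Map.Entry<String, String> e : values.entrySet()) {
            (e.getValue() == null ? tombstones : regular).put(e.getKey(), e.getValue());
        }

        // Step 1: tombstones go to the secondary store first; a failure aborts the whole write.
        if (!tombstones.isEmpty()) {
            secondary.write(tombstones);
        }
        // Step 2: all offsets go to the primary store.
        primary.write(values);
        // Step 3: regular offsets go to the secondary store; a failure here is only logged.
        try {
            secondary.write(regular);
        } catch (RuntimeException e) {
            System.err.println("WARN: secondary write of regular offsets failed: " + e.getMessage());
        }
    }

    /** Fallback-on-global read: consult the primary store, then the secondary store. */
    public static String read(String partition, InMemoryStore primary, InMemoryStore secondary) {
        String value = primary.data.get(partition);
        return value != null ? value : secondary.data.get(partition);
    }

    public static void main(String[] args) {
        InMemoryStore primary = new InMemoryStore();
        InMemoryStore secondary = new InMemoryStore();
        primary.data.put("p1", "10");
        secondary.data.put("p1", "10");

        Map<String, String> values = new HashMap<>();
        values.put("p1", null); // tombstone
        values.put("p2", "5");  // regular offset

        set(values, primary, secondary);
        System.out.println("p1 -> " + read("p1", primary, secondary));
        System.out.println("p2 -> " + read("p2", primary, secondary));
    }
}
```

If the tombstone write to the secondary store fails in step 1, nothing reaches the primary store, so the two stores cannot end up in the state where the primary has dropped an offset that the fallback read would still find in the secondary.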
Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]
vamossagar12 commented on PR #13801: URL: https://github.com/apache/kafka/pull/13801#issuecomment-1848322802
> Thanks @vamossagar12, I've given the functional changes another pass. Unless I'm mistaken about the semantics for `CompletableFuture::thenAccept`, there's a pretty serious bug in the current implementation that needs to be fixed before merging, and we should also make sure to add testing coverage to both verify that we've fixed that bug now and won't re-introduce it later.

Thanks @C0urante. When you say "serious bug", I assume you are referring to [this comment](https://github.com/apache/kafka/pull/13801#discussion_r1417751374), i.e., that the outer `get` doesn't block on the lambda's execution? OK, let me verify that behaviour again and get back to you.
Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]
C0urante commented on code in PR #13801: URL: https://github.com/apache/kafka/pull/13801#discussion_r1417751374 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ## @@ -279,15 +290,54 @@ public Future set(Map values, Callback callb throw new IllegalStateException("At least one non-null offset store must be provided"); } -return primaryStore.set(values, (primaryWriteError, ignored) -> { +List partitionsWithTombstoneOffsets = values.entrySet().stream() +.filter(offsetEntry -> offsetEntry.getValue() == null) +.map(Map.Entry::getKey).collect(Collectors.toList()); + +Map tombstoneOffsets = new HashMap<>(); +for (ByteBuffer partition : partitionsWithTombstoneOffsets) { +tombstoneOffsets.put(partition, null); +} +Map regularOffsets = values.entrySet().stream() +.filter(offsetEntry -> offsetEntry.getValue() != null) +.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + +// If the supplied offsets contain tombstone values, then tombstone offsets are extracted first, +// written to the secondary store in a synchronous manner and then to the primary store. +// This is because, if a tombstone offset is successfully written to the per-connector offsets topic, +// but cannot be written to the global offsets topic, then the global offsets topic will still contain that +// source offset, but the per-connector topic will not. Due to the fallback-on-global logic used by the worker, +// if a task requests offsets for one of the tombstoned partitions, the worker will provide it with the +// offsets present in the global offsets topic, instead of indicating to the task that no offsets can be found. 
+CompletableFuture offsetWriteFuture = CompletableFuture.completedFuture(null); +if (secondaryStore != null && !tombstoneOffsets.isEmpty()) { +offsetWriteFuture.thenAccept(v -> { Review Comment: This causes the passed-in lambda to be executed after `offsetWriteFuture` completes, but it doesn't cause `offsetWriteFuture::get` to block on the completion of that lambda, which means that if the caller of `ConnectorOffsetBackingStore::set` waits for the returned `Future` to complete right now, it'll complete immediately even though it's likely no offsets will have been written to either the primary or secondary store. Also, nit: when researching this behavior, I ran into `CompletableFuture::thenRun`, which is a slightly better fit than `CompletableFuture::thenAccept` (since we don't use the return value). ```suggestion offsetWriteFuture = offsetWriteFuture.thenRun(() -> { ```
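Editorial sketch of why the reassignment in the suggestion above matters. The example is standalone and uses only `java.util.concurrent`; the helper names (`chainAndDiscard`, `chainAndKeep`, `outcomeOf`) are hypothetical and do not appear in the PR. Because the source future here is already complete, the callback will typically run on the calling thread, so the example demonstrates the difference through failure propagation rather than timing: the outcome of the dependent action is carried only by the stage that `thenRun` returns, never by the original future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class ThenRunChaining {

    /** Waits on the future; returns null on normal completion, otherwise the failure cause. */
    public static Throwable outcomeOf(CompletableFuture<Void> future) {
        try {
            future.get();
            return null;
        } catch (ExecutionException e) {
            return e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return e;
        }
    }

    /** Drops the stage returned by thenRun, so the caller only ever sees the original future. */
    public static CompletableFuture<Void> chainAndDiscard(Runnable secondaryWrite) {
        CompletableFuture<Void> offsetWriteFuture = CompletableFuture.completedFuture(null);
        offsetWriteFuture.thenRun(secondaryWrite); // returned stage is discarded
        return offsetWriteFuture;
    }

    /** Reassigns to the dependent stage, as in the review suggestion. */
    public static CompletableFuture<Void> chainAndKeep(Runnable secondaryWrite) {
        CompletableFuture<Void> offsetWriteFuture = CompletableFuture.completedFuture(null);
        offsetWriteFuture = offsetWriteFuture.thenRun(secondaryWrite);
        return offsetWriteFuture;
    }

    public static void main(String[] args) {
        Runnable failingWrite = () -> {
            throw new IllegalStateException("simulated secondary store failure");
        };
        System.out.println("discarded stage: " + outcomeOf(chainAndDiscard(failingWrite)));
        System.out.println("kept stage:      " + outcomeOf(chainAndKeep(failingWrite)));
    }
}
```

With the discarded stage, the caller's `get()` returns normally and the simulated failure is lost. With the reassigned stage, the same `get()` surfaces the `IllegalStateException` as the cause of an `ExecutionException`.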
Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]
C0urante commented on code in PR #13801: URL: https://github.com/apache/kafka/pull/13801#discussion_r1417677509 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ## @@ -279,15 +290,54 @@ public Future set(Map values, Callback callb throw new IllegalStateException("At least one non-null offset store must be provided"); } -return primaryStore.set(values, (primaryWriteError, ignored) -> { +List partitionsWithTombstoneOffsets = values.entrySet().stream() +.filter(offsetEntry -> offsetEntry.getValue() == null) +.map(Map.Entry::getKey).collect(Collectors.toList()); + +Map tombstoneOffsets = new HashMap<>(); +for (ByteBuffer partition : partitionsWithTombstoneOffsets) { +tombstoneOffsets.put(partition, null); +} +Map regularOffsets = values.entrySet().stream() +.filter(offsetEntry -> offsetEntry.getValue() != null) +.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + +// If the supplied offsets contain tombstone values, then tombstone offsets are extracted first, +// written to the secondary store in a synchronous manner and then to the primary store. +// This is because, if a tombstone offset is successfully written to the per-connector offsets topic, +// but cannot be written to the global offsets topic, then the global offsets topic will still contain that +// source offset, but the per-connector topic will not. Due to the fallback-on-global logic used by the worker, +// if a task requests offsets for one of the tombstoned partitions, the worker will provide it with the +// offsets present in the global offsets topic, instead of indicating to the task that no offsets can be found. Review Comment: This largely duplicates information present in the Javadoc for the method. 
IMO we can remove this entire comment block and, if we really want to make it easy for maintainers to understand not just the "what" but the "why" for this logic, we can link to https://issues.apache.org/jira/browse/KAFKA-15018 in the Javadoc for the method: ```java * * @see <a href="https://issues.apache.org/jira/browse/KAFKA-15018">KAFKA-15018</a> for context on the three-step * write sequence ``` ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ## @@ -279,15 +290,54 @@ public Future set(Map values, Callback callb throw new IllegalStateException("At least one non-null offset store must be provided"); } -return primaryStore.set(values, (primaryWriteError, ignored) -> { +List partitionsWithTombstoneOffsets = values.entrySet().stream() +.filter(offsetEntry -> offsetEntry.getValue() == null) +.map(Map.Entry::getKey).collect(Collectors.toList()); + +Map tombstoneOffsets = new HashMap<>(); +for (ByteBuffer partition : partitionsWithTombstoneOffsets) { +tombstoneOffsets.put(partition, null); +} +Map regularOffsets = values.entrySet().stream() +.filter(offsetEntry -> offsetEntry.getValue() != null) +.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + +// If the supplied offsets contain tombstone values, then tombstone offsets are extracted first, +// written to the secondary store in a synchronous manner and then to the primary store. +// This is because, if a tombstone offset is successfully written to the per-connector offsets topic, +// but cannot be written to the global offsets topic, then the global offsets topic will still contain that +// source offset, but the per-connector topic will not. Due to the fallback-on-global logic used by the worker, +// if a task requests offsets for one of the tombstoned partitions, the worker will provide it with the +// offsets present in the global offsets topic, instead of indicating to the task that no offsets can be found.
+CompletableFuture offsetWriteFuture = CompletableFuture.completedFuture(null); +if (secondaryStore != null && !tombstoneOffsets.isEmpty()) { +offsetWriteFuture.thenAccept(v -> { +Future secondaryWriteFuture = secondaryStore.set(tombstoneOffsets, new FutureCallback<>()); +try { +if (exactlyOnce) { +secondaryWriteFuture.get(); +} else { +secondaryWriteFuture.get(offsetFlushTimeoutMs, TimeUnit.MILLISECONDS); +} +} catch (ExecutionException e) { +log.error("{} Flush of tombstone(s) offsets to secondary store threw an unexpected exception: ", this, e.getCause());
Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]
C0urante commented on PR #13801: URL: https://github.com/apache/kafka/pull/13801#issuecomment-1842053883 Removed the stale label. @vamossagar12 thanks for your patience on this one, I plan on making another pass this week!
Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]
github-actions[bot] commented on PR #13801: URL: https://github.com/apache/kafka/pull/13801#issuecomment-1842029141 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.