vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1254543662
##########
connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java:
##########
@@ -192,6 +209,236 @@ public void testCancelAfterAwaitFlush() throws Exception {
         flushFuture.get(1000, TimeUnit.MILLISECONDS);
     }
 
+    @Test
+    public void testFlushFailureWhenWriteToSecondaryStoreFailsForTombstoneOffsets() throws InterruptedException, TimeoutException, ExecutionException {
+
+        KafkaOffsetBackingStore connectorStore = setupOffsetBackingStoreWithProducer("topic1", false);
+        KafkaOffsetBackingStore workerStore = setupOffsetBackingStoreWithProducer("topic2", true);
+
+        extractKeyValue(OFFSET_KEY, OFFSET_KEY_SERIALIZED, null, null);
+
+        ConnectorOffsetBackingStore offsetBackingStore = ConnectorOffsetBackingStore.withConnectorAndWorkerStores(
+                () -> LoggingContext.forConnector("source-connector"),
+                workerStore,
+                connectorStore,
+                "offsets-topic",
+                mock(TopicAdmin.class));
+
+        OffsetStorageWriter offsetStorageWriter = new OffsetStorageWriter(offsetBackingStore, NAMESPACE, keyConverter, valueConverter);
+
+        offsetStorageWriter.offset(OFFSET_KEY, OFFSET_VALUE);
+        assertTrue(offsetStorageWriter.beginFlush(1000L, TimeUnit.MILLISECONDS));
+        Future<Void> flushFuture = offsetStorageWriter.doFlush((error, result) -> {
+            assertEquals(PRODUCE_EXCEPTION, error);
+            assertNull(result);
+        });
+        assertThrows(ExecutionException.class, () -> flushFuture.get(1000L, TimeUnit.MILLISECONDS));
+    }
+
+    @Test
+    public void testFlushSuccessWhenWritesSucceedToBothPrimaryAndSecondaryStoresForTombstoneOffsets() throws InterruptedException, TimeoutException, ExecutionException {
+
+        KafkaOffsetBackingStore connectorStore = setupOffsetBackingStoreWithProducer("topic1", false);
+        KafkaOffsetBackingStore workerStore = setupOffsetBackingStoreWithProducer("topic2", false);
+
+        extractKeyValue(OFFSET_KEY, OFFSET_KEY_SERIALIZED, null, null);
+
+        ConnectorOffsetBackingStore offsetBackingStore = ConnectorOffsetBackingStore.withConnectorAndWorkerStores(
+                () -> LoggingContext.forConnector("source-connector"),
+                workerStore,
+                connectorStore,
+                "offsets-topic",
+                mock(TopicAdmin.class));
+
+        OffsetStorageWriter offsetStorageWriter = new OffsetStorageWriter(offsetBackingStore, NAMESPACE, keyConverter, valueConverter);
+
+        offsetStorageWriter.offset(OFFSET_KEY, OFFSET_VALUE);
+        assertTrue(offsetStorageWriter.beginFlush(1000L, TimeUnit.MILLISECONDS));
+        offsetStorageWriter.doFlush((error, result) -> {
+            assertNull(error);
+            assertNull(result);
+        }).get(1000L, TimeUnit.MILLISECONDS);
+    }
+
+    @Test
+    public void testFlushSuccessWhenWriteToSecondaryStoreFailsForNonTombstoneOffsets() throws InterruptedException, TimeoutException, ExecutionException {
+
+        KafkaOffsetBackingStore connectorStore = setupOffsetBackingStoreWithProducer("topic1", false);
+        KafkaOffsetBackingStore workerStore = setupOffsetBackingStoreWithProducer("topic2", true);
+
+        extractKeyValue(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED);
+
+        ConnectorOffsetBackingStore offsetBackingStore = ConnectorOffsetBackingStore.withConnectorAndWorkerStores(
+                () -> LoggingContext.forConnector("source-connector"),
+                workerStore,
+                connectorStore,
+                "offsets-topic",
+                mock(TopicAdmin.class));
+
+        OffsetStorageWriter offsetStorageWriter = new OffsetStorageWriter(offsetBackingStore, NAMESPACE, keyConverter, valueConverter);
+
+        offsetStorageWriter.offset(OFFSET_KEY, OFFSET_VALUE);
+        assertTrue(offsetStorageWriter.beginFlush(1000L, TimeUnit.MILLISECONDS));
+        offsetStorageWriter.doFlush((error, result) -> {
+            assertNull(error);
+            assertNull(result);
+        }).get(1000L, TimeUnit.MILLISECONDS);
+    }
+
+    @Test
+    public void testFlushSuccessWhenWritesToPrimaryAndSecondaryStoreSucceeds() throws InterruptedException, TimeoutException, ExecutionException {
+
+        KafkaOffsetBackingStore connectorStore = setupOffsetBackingStoreWithProducer("topic1", false);
+        KafkaOffsetBackingStore workerStore = setupOffsetBackingStoreWithProducer("topic2", false);
+
+        extractKeyValue(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED);
+
+        ConnectorOffsetBackingStore offsetBackingStore = ConnectorOffsetBackingStore.withConnectorAndWorkerStores(
+                () -> LoggingContext.forConnector("source-connector"),
+                workerStore,
+                connectorStore,
+                "offsets-topic",
+                mock(TopicAdmin.class));
+
+        OffsetStorageWriter offsetStorageWriter = new OffsetStorageWriter(offsetBackingStore, NAMESPACE, keyConverter, valueConverter);
+
+        offsetStorageWriter.offset(OFFSET_KEY, OFFSET_VALUE);
+        assertTrue(offsetStorageWriter.beginFlush(1000L, TimeUnit.MILLISECONDS));
+        offsetStorageWriter.doFlush((error, result) -> {
+            assertNull(error);
+            assertNull(result);
+        }).get(1000L, TimeUnit.MILLISECONDS);
+    }
+
+    @Test
+    public void testFlushFailureWhenWritesToPrimaryStoreFailsAndSecondarySucceeds() throws InterruptedException, TimeoutException {
+
+        KafkaOffsetBackingStore connectorStore = setupOffsetBackingStoreWithProducer("topic1", true);
+        KafkaOffsetBackingStore workerStore = setupOffsetBackingStoreWithProducer("topic2", false);
+
+        extractKeyValue(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED);
+
+        ConnectorOffsetBackingStore offsetBackingStore = ConnectorOffsetBackingStore.withConnectorAndWorkerStores(
+                () -> LoggingContext.forConnector("source-connector"),
+                workerStore,
+                connectorStore,
+                "offsets-topic",
+                mock(TopicAdmin.class));
+
+        OffsetStorageWriter offsetStorageWriter = new OffsetStorageWriter(offsetBackingStore, NAMESPACE, keyConverter, valueConverter);
+
+        offsetStorageWriter.offset(OFFSET_KEY, OFFSET_VALUE);
+        assertTrue(offsetStorageWriter.beginFlush(1000L, TimeUnit.MILLISECONDS));
+        Future<Void> flushFuture = offsetStorageWriter.doFlush((error, result) -> {
+            assertEquals(PRODUCE_EXCEPTION, error);
+            assertNull(result);
+        });
+        assertThrows(ExecutionException.class, () -> flushFuture.get(1000L, TimeUnit.MILLISECONDS));
+    }
+
+    @Test
+    public void testFlushFailureWhenWritesToPrimaryStoreFailsAndSecondarySucceedsForTombstoneRecords() throws InterruptedException, TimeoutException {
+
+        KafkaOffsetBackingStore connectorStore = setupOffsetBackingStoreWithProducer("topic1", true);
+        KafkaOffsetBackingStore workerStore = setupOffsetBackingStoreWithProducer("topic2", false);
+
+        extractKeyValue(OFFSET_KEY, OFFSET_KEY_SERIALIZED, null, null);
+
+        ConnectorOffsetBackingStore offsetBackingStore = ConnectorOffsetBackingStore.withConnectorAndWorkerStores(
+                () -> LoggingContext.forConnector("source-connector"),
+                workerStore,
+                connectorStore,
+                "offsets-topic",
+                mock(TopicAdmin.class));
+
+        OffsetStorageWriter offsetStorageWriter = new OffsetStorageWriter(offsetBackingStore, NAMESPACE, keyConverter, valueConverter);
+
+        offsetStorageWriter.offset(OFFSET_KEY, OFFSET_VALUE);
+        assertTrue(offsetStorageWriter.beginFlush(1000L, TimeUnit.MILLISECONDS));
+        Future<Void> flushFuture = offsetStorageWriter.doFlush((error, result) -> {
+            assertEquals(PRODUCE_EXCEPTION, error);
+            assertNull(result);
+        });
+        assertThrows(ExecutionException.class, () -> flushFuture.get(1000L, TimeUnit.MILLISECONDS));
+    }
+
+    @Test
+    public void testFlushSuccessWhenWritesToPrimaryStoreSucceedsWithNoSecondaryStore() throws InterruptedException, TimeoutException, ExecutionException {
+
+        KafkaOffsetBackingStore connectorStore = setupOffsetBackingStoreWithProducer("topic1", false);
+
+        extractKeyValue(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED);
+
+        ConnectorOffsetBackingStore offsetBackingStore = ConnectorOffsetBackingStore.withOnlyConnectorStore(
+                () -> LoggingContext.forConnector("source-connector"),
+                connectorStore,
+                "offsets-topic",
+                mock(TopicAdmin.class));
+
+        OffsetStorageWriter offsetStorageWriter = new OffsetStorageWriter(offsetBackingStore, NAMESPACE, keyConverter, valueConverter);
+
+        offsetStorageWriter.offset(OFFSET_KEY, OFFSET_VALUE);
+        assertTrue(offsetStorageWriter.beginFlush(1000L, TimeUnit.MILLISECONDS));
+        offsetStorageWriter.doFlush((error, result) -> {
+            assertNull(error);
+            assertNull(result);
+        }).get(1000L, TimeUnit.MILLISECONDS);
+    }
+
+    @Test
+    public void testFlushFailureWhenWritesToPrimaryStoreFailsWithNoSecondaryStore() throws InterruptedException, TimeoutException {
+
+        KafkaOffsetBackingStore connectorStore = setupOffsetBackingStoreWithProducer("topic1", true);
+
+        extractKeyValue(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED);
+
+        ConnectorOffsetBackingStore offsetBackingStore = ConnectorOffsetBackingStore.withOnlyConnectorStore(
+                () -> LoggingContext.forConnector("source-connector"),
+                connectorStore,
+                "offsets-topic",
+                mock(TopicAdmin.class));
+
+        OffsetStorageWriter offsetStorageWriter = new OffsetStorageWriter(offsetBackingStore, NAMESPACE, keyConverter, valueConverter);
+
+        offsetStorageWriter.offset(OFFSET_KEY, OFFSET_VALUE);
+        assertTrue(offsetStorageWriter.beginFlush(1000L, TimeUnit.MILLISECONDS));
+        Future<Void> flushFuture = offsetStorageWriter.doFlush((error, result) -> {
+            assertEquals(PRODUCE_EXCEPTION, error);
+            assertNull(result);
+        });
+        assertThrows(ExecutionException.class, () -> flushFuture.get(1000L, TimeUnit.MILLISECONDS));
+    }
+
+    @SuppressWarnings("unchecked")
+    private KafkaOffsetBackingStore setupOffsetBackingStoreWithProducer(String topic, boolean throwingProducer) {
+        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(() -> mock(TopicAdmin.class), () -> "connect", mock(Converter.class));
+        KafkaBasedLog<byte[], byte[]> kafkaBasedLog = new KafkaBasedLog<byte[], byte[]>(topic, new HashMap<>(), new HashMap<>(),
+                () -> mock(TopicAdmin.class), mock(Callback.class), new MockTime(), null);
+        Whitebox.setInternalState(kafkaBasedLog, "producer", Optional.of(createProducer(throwingProducer)));

Review Comment:
   Yeah that worked. Thanks 👍

##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##########
@@ -279,8 +284,59 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callback) {
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
 
+        boolean containsTombstones = values.containsValue(null);
+
+        // If there are tombstone offsets, then the failure to write to secondary store will
+        // not be ignored. Also, for tombstone records, we first write to secondary store and
+        // then to primary stores.
+        if (secondaryStore != null && containsTombstones) {
+            AtomicReference<Throwable> secondaryStoreTombstoneWriteError = new AtomicReference<>();
+            Future<Void> secondaryWriteFuture = secondaryStore.set(values, (secondaryWriteError, ignored) -> {
+                try (LoggingContext context = loggingContext()) {
+                    if (secondaryWriteError != null) {
+                        log.warn("Failed to write offsets with tombstone records to secondary backing store", secondaryWriteError);
+                        secondaryStoreTombstoneWriteError.compareAndSet(null, secondaryWriteError);
+                    } else {
+                        log.debug("Successfully flushed tombstone(s)-containing offsets to secondary backing store");
+                    }
+                }
+            });
+            try {
+                // For EOS, there is no timeout for offset commit and it is allowed to take as much time as needed for
+                // commits. We still need to wait because we want to fail the offset commit for cases when
+                // tombstone records fail to be written to the secondary store. Note that while commitTransaction
+                // already waits for all records to be sent and ack'ed, in this case we do need to add an explicit
+                // blocking call. In case EOS is disabled, we wait for the same duration as `offset.commit.timeout.ms`
+                // and throw that exception which would allow the offset commit to fail.
+                if (exactlyOnce) {
+                    secondaryWriteFuture.get();
+                } else {
+                    secondaryWriteFuture.get(offsetFlushTimeoutMs, TimeUnit.MILLISECONDS);
+                }
+            } catch (InterruptedException e) {
+                log.warn("{} Flush of tombstone(s)-containing offsets to secondary store interrupted, cancelling", this);
+                secondaryStoreTombstoneWriteError.compareAndSet(null, e);
+            } catch (ExecutionException e) {
+                log.error("{} Flush of tombstone(s)-containing offsets to secondary store threw an unexpected exception: ", this, e);
+                secondaryStoreTombstoneWriteError.compareAndSet(null, e);

Review Comment:
   Done.
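
A note on the test setup above: the `createProducer` helper that `setupOffsetBackingStoreWithProducer` calls falls outside the quoted hunk. Below is a minimal sketch of one way it could look, assuming a Mockito mock of `KafkaProducer` whose `send` completes its callback with the test class's `PRODUCE_EXCEPTION` constant when `throwingProducer` is true (Mockito's `mock`, `doAnswer`, and `any` statically imported; `PRODUCE_EXCEPTION` assumed to be an `Exception`). This is a hypothetical illustration, not the PR's actual helper:

    @SuppressWarnings("unchecked")
    private KafkaProducer<byte[], byte[]> createProducer(boolean throwingProducer) {
        // Hypothetical sketch; the PR's real helper is not shown in the hunk.
        KafkaProducer<byte[], byte[]> producer = mock(KafkaProducer.class);
        doAnswer(invocation -> {
            // Fully qualified to avoid the clash with org.apache.kafka.connect.util.Callback,
            // which the surrounding test file also uses.
            org.apache.kafka.clients.producer.Callback producerCallback = invocation.getArgument(1);
            // Complete each send exceptionally or successfully, matching the throwing
            // vs. healthy producer cases the tests exercise.
            producerCallback.onCompletion(null, throwingProducer ? PRODUCE_EXCEPTION : null);
            return null;
        }).when(producer).send(any(ProducerRecord.class), any(org.apache.kafka.clients.producer.Callback.class));
        return producer;
    }

With a helper of this shape, `Whitebox.setInternalState(kafkaBasedLog, "producer", Optional.of(createProducer(true)))` makes every offset write fail its producer callback, which is what the failure-path tests above depend on.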
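
The second hunk ends inside the catch blocks, so for orientation, here is a heavily simplified sketch of the ordering contract the inline comment describes: offsets containing tombstones are written to the secondary store first, and only if that write succeeds does the primary-store write proceed. Everything after the `get()` calls, including the `primaryStore.set(...)` call and the `ConnectException`, is an assumption about the unquoted remainder, not the PR's code:

    // Tombstone path: secondary store first, and its failure is fatal to the flush.
    if (secondaryStore != null && containsTombstones) {
        Future<Void> secondaryWriteFuture = secondaryStore.set(values, (error, ignored) -> { /* logging as above */ });
        try {
            if (exactlyOnce) {
                secondaryWriteFuture.get();  // EOS offset commits have no timeout
            } else {
                secondaryWriteFuture.get(offsetFlushTimeoutMs, TimeUnit.MILLISECONDS);
            }
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            secondaryStoreTombstoneWriteError.compareAndSet(null, e);
        }
        Throwable tombstoneError = secondaryStoreTombstoneWriteError.get();
        if (tombstoneError != null) {
            // Fail the offset commit rather than silently dropping tombstones from
            // the secondary store (assumed handling).
            callback.onCompletion(tombstoneError, null);
            throw new ConnectException("Failed to write tombstone offsets to secondary store", tombstoneError);
        }
    }
    // Non-tombstone offsets: a secondary-store failure is only logged, and the
    // primary-store write alone decides the flush result.
    return primaryStore.set(values, callback);

This matches the behavior the new tests pin down: a secondary-store failure fails `doFlush` only when the batch contains tombstones (compare testFlushFailureWhenWriteToSecondaryStoreFailsForTombstoneOffsets with testFlushSuccessWhenWriteToSecondaryStoreFailsForNonTombstoneOffsets).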