cadonna commented on code in PR #15044:
URL: https://github.com/apache/kafka/pull/15044#discussion_r1431243577

##########
streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java:
##########
@@ -789,6 +806,158 @@ public void shouldWriteLatestOffsetsToCheckpointOnShutdown() throws Exception {
         verifyOffsetsAreInCheckpoint(1);
     }
 
+    @Test
+    public void shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoringStateUpdaterEnabled() throws Exception {
+        shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring(true);
+    }
+
+    @Test
+    public void shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoringStateUpdaterDisabled() throws Exception {
+        if (!processingThreadsEnabled) {
+            shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring(false);
+        }
+    }
+
+    @SuppressWarnings("deprecation")
+    private void shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring(final boolean stateUpdaterEnabled) throws Exception {
+        if (!eosConfig.equals(StreamsConfig.EXACTLY_ONCE) && !eosConfig.equals(StreamsConfig.EXACTLY_ONCE_V2)) {
+            return;
+        }
+        final Properties streamsConfiguration = new Properties();
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig);
+        streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(applicationId).getPath());
+        streamsConfiguration.put(InternalConfig.PROCESSING_THREADS_ENABLED, processingThreadsEnabled);
+        streamsConfiguration.put(InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled);
+        final String stateStoreName = "stateStore";
+
+        purgeLocalStreamsState(streamsConfiguration);
+
+        final int startKey = 1;
+        final int endKey = 30001;
+        final int valueSize = 1000;
+        final StringBuilder value1 = new StringBuilder(valueSize);
+        for (int i = 0; i < valueSize; ++i) {
+            value1.append("A");
+        }
+        final String valueStr1 = value1.toString();
+        final List<KeyValue<Integer, String>> recordBatch1 = IntStream.range(startKey, endKey).mapToObj(i -> KeyValue.pair(i, valueStr1)).collect(Collectors.toList());
+        IntegrationTestUtils.produceKeyValuesSynchronously(MULTI_PARTITION_INPUT_TOPIC,
+            recordBatch1,
+            TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+                IntegerSerializer.class,
+                StringSerializer.class),
+            CLUSTER.time);
+
+        final StoreBuilder<KeyValueStore<Integer, String>> stateStore = Stores.keyValueStoreBuilder(
+            Stores.persistentKeyValueStore(stateStoreName),
+            Serdes.Integer(),
+            Serdes.String()).withCachingEnabled();
+
+        final CountDownLatch latch = new CountDownLatch(1);
+        final AtomicBoolean throwException = new AtomicBoolean(false);
+        final TaskId task00 = new TaskId(0, 0);
+        final AtomicLong restoredOffsetsForPartition0 = new AtomicLong(0);
+        final Topology topology = new Topology();
+        topology
+            .addSource("source", MULTI_PARTITION_INPUT_TOPIC)
+            .addProcessor("processor", () -> new Processor<Integer, String, Integer, String>() {
+                KeyValueStore<Integer, String> stateStore;
+                org.apache.kafka.streams.processor.api.ProcessorContext<Integer, String> context;
+
+                @Override
+                public void init(final org.apache.kafka.streams.processor.api.ProcessorContext<Integer, String> context) {
+                    Processor.super.init(context);
+                    this.context = context;
+                    stateStore = context.getStateStore(stateStoreName);
+                }
+
+                @Override
+                public void process(final Record<Integer, String> record) {
+                    context.recordMetadata().ifPresent(recordMetadata -> {
+                        if (recordMetadata.partition() == 0) {
+                            if (throwException.compareAndSet(true, false)) {
+                                throw new TaskCorruptedException(Collections.singleton(task00));
+                            }
+                            stateStore.put(record.key(), record.value());
+                        } else {
+                            stateStore.put(record.key(), record.value());
+                            if (restoredOffsetsForPartition0.get() > 0) {
+                                latch.countDown();
+                            }
+                        }
+                    });
+                }
+
+                @Override
+                public void close() {
+                    Processor.super.close();
+                }
+            }, "source")
+            .addStateStore(stateStore, "processor")
+            .addSink("sink", MULTI_PARTITION_OUTPUT_TOPIC, "processor");
+
+        final KafkaStreams kafkaStreams = new KafkaStreams(topology, streamsConfiguration);
+        kafkaStreams.setGlobalStateRestoreListener(new StateRestoreListener() {
+            @Override
+            public void onRestoreStart(final TopicPartition topicPartition,
+                                       final String storeName,
+                                       final long startingOffset,
+                                       final long endingOffset) {
+                if (topicPartition.partition() == 0) {
+                    System.out.println("Restore listener - Starting offset: " + startingOffset);
+                }
+            }
+            @Override
+            public void onBatchRestored(final TopicPartition topicPartition,
+                                        final String storeName,
+                                        final long batchEndOffset,
+                                        final long numRestored) {
+                if (topicPartition.partition() == 0) {
+                    System.out.println("Restore listener - Batch end offset: " + batchEndOffset);
+                    restoredOffsetsForPartition0.set(batchEndOffset);
+                }
+            }
+            @Override
+            public void onRestoreEnd(final TopicPartition topicPartition,
+                                     final String storeName,
+                                     final long totalRestored) {
+                if (topicPartition.partition() == 0) {
+                    System.out.println("Restore listener - Total restored: " + totalRestored);
+                }
+            }
+        });
+        startApplicationAndWaitUntilRunning(Collections.singletonList(kafkaStreams), Duration.ofSeconds(60));
+        ensureCommittedRecordsInTopic(
+            applicationId + "-" + stateStoreName + "-changelog",
+            2000,
+            StringDeserializer.class,
+            StringDeserializer.class
+        );
+        throwException.set(true);

Review Comment:
   OK, I see that there is a chance that the exception is never thrown in the `if`-branch of the processor, because all records for partition 0 may already have been processed by the time `throwException` is set to `true`. I added a second batch of records that is produced to the input topic after ensuring there are committed records in the changelog topic.
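A minimal sketch of what producing that second batch could look like, reusing only the helpers already visible in the test above (the key range starting at `endKey` and the reuse of `valueStr1` are illustrative assumptions, not the values from the actual change):

```java
// Hypothetical second input batch, mirroring recordBatch1 above. It is
// produced after ensureCommittedRecordsInTopic(...), so that the processor
// still receives partition-0 records once throwException is true.
final List<KeyValue<Integer, String>> recordBatch2 =
    IntStream.range(endKey, endKey + 10000)           // assumed key range
        .mapToObj(i -> KeyValue.pair(i, valueStr1))   // reuses the 1 kB value
        .collect(Collectors.toList());
IntegrationTestUtils.produceKeyValuesSynchronously(MULTI_PARTITION_INPUT_TOPIC,
    recordBatch2,
    TestUtils.producerConfig(CLUSTER.bootstrapServers(),
        IntegerSerializer.class,
        StringSerializer.class),
    CLUSTER.time);
```

Producing the batch synchronously before setting `throwException` keeps unprocessed partition-0 records in flight, so the `compareAndSet()` in the `if`-branch is reliably reached.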