cadonna commented on code in PR #15044:
URL: https://github.com/apache/kafka/pull/15044#discussion_r1431220982


##########
streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java:
##########
@@ -789,6 +806,158 @@ public void 
shouldWriteLatestOffsetsToCheckpointOnShutdown() throws Exception {
         verifyOffsetsAreInCheckpoint(1);
     }
 
+    @Test
+    public void 
shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoringStateUpdaterEnabled()
 throws Exception {
+        shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring(true);
+    }
+
+    @Test
+    public void 
shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoringStateUpdaterDisabled()
 throws Exception {
+        if (!processingThreadsEnabled) {
+            
shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring(false);
+        }
+    }
+
+    @SuppressWarnings("deprecation")
+    private void 
shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring(final boolean 
stateUpdaterEnabled) throws Exception {
+        if (!eosConfig.equals(StreamsConfig.EXACTLY_ONCE) && 
!eosConfig.equals(StreamsConfig.EXACTLY_ONCE_V2)) {
+            return;
+        }
+        final Properties streamsConfiguration = new Properties();
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, 
applicationId);
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.Integer().getClass());
+        
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
eosConfig);
+        streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory(applicationId).getPath());
+        streamsConfiguration.put(InternalConfig.PROCESSING_THREADS_ENABLED, 
processingThreadsEnabled);
+        streamsConfiguration.put(InternalConfig.STATE_UPDATER_ENABLED, 
stateUpdaterEnabled);
+        final String stateStoreName = "stateStore";
+
+        purgeLocalStreamsState(streamsConfiguration);
+
+        final int startKey = 1;
+        final int endKey = 30001;
+        final int valueSize = 1000;
+        final StringBuilder value1 = new StringBuilder(valueSize);
+        for (int i = 0; i < valueSize; ++i) {
+            value1.append("A");
+        }
+        final String valueStr1 = value1.toString();
+        final List<KeyValue<Integer, String>> recordBatch1 = 
IntStream.range(startKey, endKey).mapToObj(i -> KeyValue.pair(i, 
valueStr1)).collect(Collectors.toList());
+        
IntegrationTestUtils.produceKeyValuesSynchronously(MULTI_PARTITION_INPUT_TOPIC,
+            recordBatch1,
+            TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+                IntegerSerializer.class,
+                StringSerializer.class),
+            CLUSTER.time);
+
+        final StoreBuilder<KeyValueStore<Integer, String>> stateStore = 
Stores.keyValueStoreBuilder(
+            Stores.persistentKeyValueStore(stateStoreName),
+            Serdes.Integer(),
+            Serdes.String()).withCachingEnabled();
+
+        final CountDownLatch latch = new CountDownLatch(1);
+        final AtomicBoolean throwException = new AtomicBoolean(false);
+        final TaskId task00 = new TaskId(0, 0);
+        final AtomicLong restoredOffsetsForPartition0 = new AtomicLong(0);
+        final Topology topology = new Topology();
+        topology
+            .addSource("source", MULTI_PARTITION_INPUT_TOPIC)
+            .addProcessor("processor", () -> new Processor<Integer, String, 
Integer, String>() {
+                KeyValueStore<Integer, String> stateStore;
+                
org.apache.kafka.streams.processor.api.ProcessorContext<Integer, String> 
context;
+
+                @Override
+                public void init(final 
org.apache.kafka.streams.processor.api.ProcessorContext<Integer, String> 
context) {
+                    Processor.super.init(context);
+                    this.context = context;
+                    stateStore = context.getStateStore(stateStoreName);
+                }
+
+                @Override
+                public void process(final Record<Integer, String> record) {
+                    context.recordMetadata().ifPresent(recordMetadata -> {
+                        if (recordMetadata.partition() == 0) {
+                            if (throwException.compareAndSet(true, false)) {
+                                throw new 
TaskCorruptedException(Collections.singleton(task00));
+                            }
+                            stateStore.put(record.key(), record.value());
+                        } else {
+                            stateStore.put(record.key(), record.value());
+                            if (restoredOffsetsForPartition0.get() > 0) {
+                                latch.countDown();
+                            }
+                        }
+                    });
+                }
+
+                @Override
+                public void close() {
+                    Processor.super.close();
+                }
+            }, "source")
+            .addStateStore(stateStore, "processor")
+            .addSink("sink", MULTI_PARTITION_OUTPUT_TOPIC, "processor");
+
+        final KafkaStreams kafkaStreams = new KafkaStreams(topology, 
streamsConfiguration);
+        kafkaStreams.setGlobalStateRestoreListener(new StateRestoreListener() {
+            @Override
+            public void onRestoreStart(final TopicPartition topicPartition,
+                                       final String storeName,
+                                       final long startingOffset,
+                                       final long endingOffset) {
+                if (topicPartition.partition() == 0) {
+                    System.out.println("Restore listener - Starting offset: " 
+ startingOffset);
+                }
+            }
+            @Override
+            public void onBatchRestored(final TopicPartition topicPartition,
+                                        final String storeName,
+                                        final long batchEndOffset,
+                                        final long numRestored) {
+                if (topicPartition.partition() == 0) {
+                    System.out.println("Restore listener - Batch end offset: " 
+ batchEndOffset);
+                    restoredOffsetsForPartition0.set(batchEndOffset);
+                }
+            }
+            @Override
+            public void onRestoreEnd(final TopicPartition topicPartition,
+                                     final String storeName,
+                                     final long totalRestored) {
+                if (topicPartition.partition() == 0) {
+                    System.out.println("Restore listener - Total restored: " + 
totalRestored);
+                }
+            }
+        });
+        
startApplicationAndWaitUntilRunning(Collections.singletonList(kafkaStreams), 
Duration.ofSeconds(60));
+        ensureCommittedRecordsInTopic(
+            applicationId + "-" + stateStoreName + "-changelog",
+            2000,
+            StringDeserializer.class,
+            StringDeserializer.class
+        );
+        throwException.set(true);

Review Comment:
   Why is that a risk?
   If you are referring to the `else`-branch in the processor, I changed that 
in the last commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to