lucasbru commented on code in PR #15044:
URL: https://github.com/apache/kafka/pull/15044#discussion_r1431209859


##########
streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java:
##########
@@ -789,6 +806,158 @@ public void shouldWriteLatestOffsetsToCheckpointOnShutdown() throws Exception {
         verifyOffsetsAreInCheckpoint(1);
     }
 
+    @Test
+    public void shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoringStateUpdaterEnabled() throws Exception {
+        shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring(true);
+    }
+
+    @Test
+    public void shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoringStateUpdaterDisabled() throws Exception {
+        if (!processingThreadsEnabled) {
+            shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring(false);
+        }
+    }
+
+    @SuppressWarnings("deprecation")
+    private void shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring(final boolean stateUpdaterEnabled) throws Exception {
+        if (!eosConfig.equals(StreamsConfig.EXACTLY_ONCE) && !eosConfig.equals(StreamsConfig.EXACTLY_ONCE_V2)) {
+            return;
+        }
+        final Properties streamsConfiguration = new Properties();
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig);
+        streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(applicationId).getPath());
+        streamsConfiguration.put(InternalConfig.PROCESSING_THREADS_ENABLED, processingThreadsEnabled);
+        streamsConfiguration.put(InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled);
+        final String stateStoreName = "stateStore";
+
+        purgeLocalStreamsState(streamsConfiguration);
+
+        final int startKey = 1;
+        final int endKey = 30001;
+        final int valueSize = 1000;
+        final StringBuilder value1 = new StringBuilder(valueSize);
+        for (int i = 0; i < valueSize; ++i) {
+            value1.append("A");
+        }
+        final String valueStr1 = value1.toString();
+        final List<KeyValue<Integer, String>> recordBatch1 = IntStream.range(startKey, endKey).mapToObj(i -> KeyValue.pair(i, valueStr1)).collect(Collectors.toList());
+        IntegrationTestUtils.produceKeyValuesSynchronously(MULTI_PARTITION_INPUT_TOPIC,
+            recordBatch1,
+            TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+                IntegerSerializer.class,
+                StringSerializer.class),
+            CLUSTER.time);
+
+        final StoreBuilder<KeyValueStore<Integer, String>> stateStore = Stores.keyValueStoreBuilder(
+            Stores.persistentKeyValueStore(stateStoreName),
+            Serdes.Integer(),
+            Serdes.String()).withCachingEnabled();
+
+        final CountDownLatch latch = new CountDownLatch(1);
+        final AtomicBoolean throwException = new AtomicBoolean(false);
+        final TaskId task00 = new TaskId(0, 0);
+        final AtomicLong restoredOffsetsForPartition0 = new AtomicLong(0);
+        final Topology topology = new Topology();
+        topology
+            .addSource("source", MULTI_PARTITION_INPUT_TOPIC)
+            .addProcessor("processor", () -> new Processor<Integer, String, Integer, String>() {
+                KeyValueStore<Integer, String> stateStore;
+                org.apache.kafka.streams.processor.api.ProcessorContext<Integer, String> context;
+
+                @Override
+                public void init(final org.apache.kafka.streams.processor.api.ProcessorContext<Integer, String> context) {
+                    Processor.super.init(context);
+                    this.context = context;
+                    stateStore = context.getStateStore(stateStoreName);
+                }
+
+                @Override
+                public void process(final Record<Integer, String> record) {
+                    context.recordMetadata().ifPresent(recordMetadata -> {
+                        if (recordMetadata.partition() == 0) {
+                            if (throwException.compareAndSet(true, false)) {
+                                throw new TaskCorruptedException(Collections.singleton(task00));
+                            }
+                            stateStore.put(record.key(), record.value());
+                        } else {
+                            stateStore.put(record.key(), record.value());
+                            if (restoredOffsetsForPartition0.get() > 0) {
+                                latch.countDown();
+                            }
+                        }
+                    });
+                }
+
+                @Override
+                public void close() {
+                    Processor.super.close();
+                }
+            }, "source")
+            .addStateStore(stateStore, "processor")
+            .addSink("sink", MULTI_PARTITION_OUTPUT_TOPIC, "processor");
+
+        final KafkaStreams kafkaStreams = new KafkaStreams(topology, streamsConfiguration);
+        kafkaStreams.setGlobalStateRestoreListener(new StateRestoreListener() {
+            @Override
+            public void onRestoreStart(final TopicPartition topicPartition,
+                                       final String storeName,
+                                       final long startingOffset,
+                                       final long endingOffset) {
+                if (topicPartition.partition() == 0) {
+                    System.out.println("Restore listener - Starting offset: " + startingOffset);
+                }
+            }
+            @Override
+            public void onBatchRestored(final TopicPartition topicPartition,
+                                        final String storeName,
+                                        final long batchEndOffset,
+                                        final long numRestored) {
+                if (topicPartition.partition() == 0) {
+                    System.out.println("Restore listener - Batch end offset: " + batchEndOffset);
+                    restoredOffsetsForPartition0.set(batchEndOffset);
+                }
+            }
+            @Override
+            public void onRestoreEnd(final TopicPartition topicPartition,
+                                     final String storeName,
+                                     final long totalRestored) {
+                if (topicPartition.partition() == 0) {
+                    System.out.println("Restore listener - Total restored: " + totalRestored);
+                }
+            }
+        });
+        startApplicationAndWaitUntilRunning(Collections.singletonList(kafkaStreams), Duration.ofSeconds(60));
+        ensureCommittedRecordsInTopic(
+            applicationId + "-" + stateStoreName + "-changelog",
+            2000,
+            StringDeserializer.class,
+            StringDeserializer.class
+        );
+        throwException.set(true);

Review Comment:
   Is there a risk here that I may process all 30k keys before I manage to throw the exception?
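
   One way to surface that race, sketched here against the quoted test code rather than taken from the PR, is to wait after arming the flag until the processor has consumed it: `compareAndSet(true, false)` in `process()` flips the flag back to `false` exactly when the `TaskCorruptedException` is thrown. `waitForCondition` is Kafka's own `org.apache.kafka.test.TestUtils` helper.

   ```java
   // Sketch, not the PR's code: arm the flag, then require that process()
   // actually consumed it. If all 30k keys were processed before this point,
   // the condition never becomes true and the wait fails the test explicitly
   // instead of letting it pass without exercising the restore path.
   throwException.set(true);
   TestUtils.waitForCondition(
       () -> !throwException.get(),
       60_000L,
       "TaskCorruptedException was never thrown; input may have been fully processed before the flag was armed."
   );
   ```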



##########
streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java:
##########
@@ -789,6 +806,158 @@ public void shouldWriteLatestOffsetsToCheckpointOnShutdown() throws Exception {
         verifyOffsetsAreInCheckpoint(1);
     }
 
+    @Test
+    public void shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoringStateUpdaterEnabled() throws Exception {
+        shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring(true);
+    }
+
+    @Test
+    public void shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoringStateUpdaterDisabled() throws Exception {
+        if (!processingThreadsEnabled) {
+            shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring(false);
+        }
+    }
+
+    @SuppressWarnings("deprecation")
+    private void shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring(final boolean stateUpdaterEnabled) throws Exception {
+        if (!eosConfig.equals(StreamsConfig.EXACTLY_ONCE) && !eosConfig.equals(StreamsConfig.EXACTLY_ONCE_V2)) {
+            return;
+        }
+        final Properties streamsConfiguration = new Properties();
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig);
+        streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(applicationId).getPath());
+        streamsConfiguration.put(InternalConfig.PROCESSING_THREADS_ENABLED, processingThreadsEnabled);
+        streamsConfiguration.put(InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled);
+        final String stateStoreName = "stateStore";
+
+        purgeLocalStreamsState(streamsConfiguration);
+
+        final int startKey = 1;
+        final int endKey = 30001;
+        final int valueSize = 1000;
+        final StringBuilder value1 = new StringBuilder(valueSize);
+        for (int i = 0; i < valueSize; ++i) {
+            value1.append("A");
+        }
+        final String valueStr1 = value1.toString();
+        final List<KeyValue<Integer, String>> recordBatch1 = IntStream.range(startKey, endKey).mapToObj(i -> KeyValue.pair(i, valueStr1)).collect(Collectors.toList());
+        IntegrationTestUtils.produceKeyValuesSynchronously(MULTI_PARTITION_INPUT_TOPIC,
+            recordBatch1,
+            TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+                IntegerSerializer.class,
+                StringSerializer.class),
+            CLUSTER.time);
+
+        final StoreBuilder<KeyValueStore<Integer, String>> stateStore = Stores.keyValueStoreBuilder(
+            Stores.persistentKeyValueStore(stateStoreName),
+            Serdes.Integer(),
+            Serdes.String()).withCachingEnabled();
+
+        final CountDownLatch latch = new CountDownLatch(1);
+        final AtomicBoolean throwException = new AtomicBoolean(false);
+        final TaskId task00 = new TaskId(0, 0);
+        final AtomicLong restoredOffsetsForPartition0 = new AtomicLong(0);
+        final Topology topology = new Topology();
+        topology
+            .addSource("source", MULTI_PARTITION_INPUT_TOPIC)
+            .addProcessor("processor", () -> new Processor<Integer, String, Integer, String>() {
+                KeyValueStore<Integer, String> stateStore;
+                org.apache.kafka.streams.processor.api.ProcessorContext<Integer, String> context;
+
+                @Override
+                public void init(final org.apache.kafka.streams.processor.api.ProcessorContext<Integer, String> context) {
+                    Processor.super.init(context);
+                    this.context = context;
+                    stateStore = context.getStateStore(stateStoreName);
+                }
+
+                @Override
+                public void process(final Record<Integer, String> record) {
+                    context.recordMetadata().ifPresent(recordMetadata -> {
+                        if (recordMetadata.partition() == 0) {
+                            if (throwException.compareAndSet(true, false)) {
+                                throw new TaskCorruptedException(Collections.singleton(task00));
+                            }
+                            stateStore.put(record.key(), record.value());
+                        } else {
+                            stateStore.put(record.key(), record.value());
+                            if (restoredOffsetsForPartition0.get() > 0) {
+                                latch.countDown();
+                            }
+                        }
+                    });
+                }
+
+                @Override
+                public void close() {
+                    Processor.super.close();
+                }
+            }, "source")
+            .addStateStore(stateStore, "processor")
+            .addSink("sink", MULTI_PARTITION_OUTPUT_TOPIC, "processor");
+
+        final KafkaStreams kafkaStreams = new KafkaStreams(topology, streamsConfiguration);
+        kafkaStreams.setGlobalStateRestoreListener(new StateRestoreListener() {
+            @Override
+            public void onRestoreStart(final TopicPartition topicPartition,
+                                       final String storeName,
+                                       final long startingOffset,
+                                       final long endingOffset) {
+                if (topicPartition.partition() == 0) {
+                    System.out.println("Restore listener - Starting offset: " + startingOffset);

Review Comment:
   Do we really want to println this?
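
   If the restore progress output is worth keeping at all, a minimal alternative is a class-level slf4j logger, as used elsewhere in the Streams test suite. A sketch only; the `LOG` field name is an assumption, not something from this PR:

   ```java
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;

   // Hypothetical replacement for the System.out.println calls: restore
   // progress then follows the test logging configuration instead of stdout.
   private static final Logger LOG = LoggerFactory.getLogger(EosIntegrationTest.class);

   // e.g. in onRestoreStart():
   //     LOG.debug("Restore listener - Starting offset: {}", startingOffset);
   ```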



##########
streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java:
##########
@@ -789,6 +806,158 @@ public void shouldWriteLatestOffsetsToCheckpointOnShutdown() throws Exception {
         verifyOffsetsAreInCheckpoint(1);
     }
 
+    @Test
+    public void shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoringStateUpdaterEnabled() throws Exception {
+        shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring(true);
+    }
+
+    @Test
+    public void shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoringStateUpdaterDisabled() throws Exception {
+        if (!processingThreadsEnabled) {
+            shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring(false);
+        }
+    }
+
+    @SuppressWarnings("deprecation")
+    private void shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring(final boolean stateUpdaterEnabled) throws Exception {
+        if (!eosConfig.equals(StreamsConfig.EXACTLY_ONCE) && !eosConfig.equals(StreamsConfig.EXACTLY_ONCE_V2)) {
+            return;
+        }
+        final Properties streamsConfiguration = new Properties();
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig);
+        streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(applicationId).getPath());
+        streamsConfiguration.put(InternalConfig.PROCESSING_THREADS_ENABLED, processingThreadsEnabled);
+        streamsConfiguration.put(InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled);
+        final String stateStoreName = "stateStore";
+
+        purgeLocalStreamsState(streamsConfiguration);
+
+        final int startKey = 1;
+        final int endKey = 30001;
+        final int valueSize = 1000;
+        final StringBuilder value1 = new StringBuilder(valueSize);
+        for (int i = 0; i < valueSize; ++i) {
+            value1.append("A");
+        }
+        final String valueStr1 = value1.toString();
+        final List<KeyValue<Integer, String>> recordBatch1 = IntStream.range(startKey, endKey).mapToObj(i -> KeyValue.pair(i, valueStr1)).collect(Collectors.toList());
+        IntegrationTestUtils.produceKeyValuesSynchronously(MULTI_PARTITION_INPUT_TOPIC,
+            recordBatch1,
+            TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+                IntegerSerializer.class,
+                StringSerializer.class),
+            CLUSTER.time);
+
+        final StoreBuilder<KeyValueStore<Integer, String>> stateStore = Stores.keyValueStoreBuilder(
+            Stores.persistentKeyValueStore(stateStoreName),
+            Serdes.Integer(),
+            Serdes.String()).withCachingEnabled();
+
+        final CountDownLatch latch = new CountDownLatch(1);
+        final AtomicBoolean throwException = new AtomicBoolean(false);
+        final TaskId task00 = new TaskId(0, 0);
+        final AtomicLong restoredOffsetsForPartition0 = new AtomicLong(0);
+        final Topology topology = new Topology();
+        topology
+            .addSource("source", MULTI_PARTITION_INPUT_TOPIC)
+            .addProcessor("processor", () -> new Processor<Integer, String, Integer, String>() {
+                KeyValueStore<Integer, String> stateStore;
+                org.apache.kafka.streams.processor.api.ProcessorContext<Integer, String> context;
+
+                @Override
+                public void init(final org.apache.kafka.streams.processor.api.ProcessorContext<Integer, String> context) {
+                    Processor.super.init(context);
+                    this.context = context;
+                    stateStore = context.getStateStore(stateStoreName);
+                }
+
+                @Override
+                public void process(final Record<Integer, String> record) {
+                    context.recordMetadata().ifPresent(recordMetadata -> {
+                        if (recordMetadata.partition() == 0) {
+                            if (throwException.compareAndSet(true, false)) {
+                                throw new TaskCorruptedException(Collections.singleton(task00));
+                            }
+                            stateStore.put(record.key(), record.value());
+                        } else {
+                            stateStore.put(record.key(), record.value());
+                            if (restoredOffsetsForPartition0.get() > 0) {
+                                latch.countDown();
+                            }
+                        }
+                    });
+                }
+
+                @Override
+                public void close() {
+                    Processor.super.close();
+                }
+            }, "source")
+            .addStateStore(stateStore, "processor")
+            .addSink("sink", MULTI_PARTITION_OUTPUT_TOPIC, "processor");
+
+        final KafkaStreams kafkaStreams = new KafkaStreams(topology, streamsConfiguration);
+        kafkaStreams.setGlobalStateRestoreListener(new StateRestoreListener() {
+            @Override
+            public void onRestoreStart(final TopicPartition topicPartition,
+                                       final String storeName,
+                                       final long startingOffset,
+                                       final long endingOffset) {
+                if (topicPartition.partition() == 0) {
+                    System.out.println("Restore listener - Starting offset: " + startingOffset);
+                }
+            }
+            @Override
+            public void onBatchRestored(final TopicPartition topicPartition,
+                                        final String storeName,
+                                        final long batchEndOffset,
+                                        final long numRestored) {
+                if (topicPartition.partition() == 0) {
+                    System.out.println("Restore listener - Batch end offset: " + batchEndOffset);

Review Comment:
   Do we really want to println this?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:
##########
@@ -377,6 +377,11 @@ public void closeDirty() {
             streamsProducer.abortTransaction();
         }
 
+        close();
+    }
+
+    private void close() {
+        offsets.clear();

Review Comment:
   I like having common logic for close clean / close dirty. Should we also move `removeAllProducedSensors` here?
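
   A sketch of that consolidation, with the surrounding method bodies elided and assumed rather than taken from the PR: both shutdown paths would funnel into the shared private `close()`, which then owns `offsets.clear()` and `removeAllProducedSensors()`.

   ```java
   // Illustrative shape only; the elided bodies are assumptions, not the PR's code.
   public void closeClean() {
       // ... clean-shutdown logic (flush/commit handling elided) ...
       close();
   }

   public void closeDirty() {
       // ... dirty-shutdown logic, e.g. aborting the EOS transaction ...
       close();
   }

   private void close() {
       offsets.clear();
       removeAllProducedSensors();  // shared by both the clean and dirty paths
   }
   ```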



##########
streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java:
##########
@@ -789,6 +806,158 @@ public void shouldWriteLatestOffsetsToCheckpointOnShutdown() throws Exception {
         verifyOffsetsAreInCheckpoint(1);
     }
 
+    @Test
+    public void shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoringStateUpdaterEnabled() throws Exception {
+        shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring(true);
+    }
+
+    @Test
+    public void shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoringStateUpdaterDisabled() throws Exception {
+        if (!processingThreadsEnabled) {
+            shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring(false);
+        }
+    }
+
+    @SuppressWarnings("deprecation")
+    private void shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring(final boolean stateUpdaterEnabled) throws Exception {
+        if (!eosConfig.equals(StreamsConfig.EXACTLY_ONCE) && !eosConfig.equals(StreamsConfig.EXACTLY_ONCE_V2)) {
+            return;
+        }
+        final Properties streamsConfiguration = new Properties();
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig);
+        streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(applicationId).getPath());
+        streamsConfiguration.put(InternalConfig.PROCESSING_THREADS_ENABLED, processingThreadsEnabled);
+        streamsConfiguration.put(InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled);
+        final String stateStoreName = "stateStore";
+
+        purgeLocalStreamsState(streamsConfiguration);
+
+        final int startKey = 1;
+        final int endKey = 30001;
+        final int valueSize = 1000;
+        final StringBuilder value1 = new StringBuilder(valueSize);
+        for (int i = 0; i < valueSize; ++i) {
+            value1.append("A");
+        }
+        final String valueStr1 = value1.toString();
+        final List<KeyValue<Integer, String>> recordBatch1 = IntStream.range(startKey, endKey).mapToObj(i -> KeyValue.pair(i, valueStr1)).collect(Collectors.toList());
+        IntegrationTestUtils.produceKeyValuesSynchronously(MULTI_PARTITION_INPUT_TOPIC,
+            recordBatch1,
+            TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+                IntegerSerializer.class,
+                StringSerializer.class),
+            CLUSTER.time);
+
+        final StoreBuilder<KeyValueStore<Integer, String>> stateStore = Stores.keyValueStoreBuilder(
+            Stores.persistentKeyValueStore(stateStoreName),
+            Serdes.Integer(),
+            Serdes.String()).withCachingEnabled();
+
+        final CountDownLatch latch = new CountDownLatch(1);
+        final AtomicBoolean throwException = new AtomicBoolean(false);
+        final TaskId task00 = new TaskId(0, 0);
+        final AtomicLong restoredOffsetsForPartition0 = new AtomicLong(0);
+        final Topology topology = new Topology();
+        topology
+            .addSource("source", MULTI_PARTITION_INPUT_TOPIC)
+            .addProcessor("processor", () -> new Processor<Integer, String, Integer, String>() {
+                KeyValueStore<Integer, String> stateStore;
+                org.apache.kafka.streams.processor.api.ProcessorContext<Integer, String> context;
+
+                @Override
+                public void init(final org.apache.kafka.streams.processor.api.ProcessorContext<Integer, String> context) {
+                    Processor.super.init(context);
+                    this.context = context;
+                    stateStore = context.getStateStore(stateStoreName);
+                }
+
+                @Override
+                public void process(final Record<Integer, String> record) {
+                    context.recordMetadata().ifPresent(recordMetadata -> {
+                        if (recordMetadata.partition() == 0) {
+                            if (throwException.compareAndSet(true, false)) {
+                                throw new TaskCorruptedException(Collections.singleton(task00));
+                            }
+                            stateStore.put(record.key(), record.value());
+                        } else {
+                            stateStore.put(record.key(), record.value());
+                            if (restoredOffsetsForPartition0.get() > 0) {
+                                latch.countDown();
+                            }
+                        }
+                    });
+                }
+
+                @Override
+                public void close() {
+                    Processor.super.close();
+                }
+            }, "source")
+            .addStateStore(stateStore, "processor")
+            .addSink("sink", MULTI_PARTITION_OUTPUT_TOPIC, "processor");
+
+        final KafkaStreams kafkaStreams = new KafkaStreams(topology, streamsConfiguration);
+        kafkaStreams.setGlobalStateRestoreListener(new StateRestoreListener() {
+            @Override
+            public void onRestoreStart(final TopicPartition topicPartition,
+                                       final String storeName,
+                                       final long startingOffset,
+                                       final long endingOffset) {
+                if (topicPartition.partition() == 0) {
+                    System.out.println("Restore listener - Starting offset: " + startingOffset);
+                }
+            }
+            @Override
+            public void onBatchRestored(final TopicPartition topicPartition,
+                                        final String storeName,
+                                        final long batchEndOffset,
+                                        final long numRestored) {
+                if (topicPartition.partition() == 0) {
+                    System.out.println("Restore listener - Batch end offset: " + batchEndOffset);
+                    restoredOffsetsForPartition0.set(batchEndOffset);
+                }
+            }
+            @Override
+            public void onRestoreEnd(final TopicPartition topicPartition,
+                                     final String storeName,
+                                     final long totalRestored) {
+                if (topicPartition.partition() == 0) {
+                    System.out.println("Restore listener - Total restored: " + totalRestored);

Review Comment:
   Do we really want to println this?



##########
streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java:
##########
@@ -789,6 +806,158 @@ public void shouldWriteLatestOffsetsToCheckpointOnShutdown() throws Exception {
         verifyOffsetsAreInCheckpoint(1);
     }
 
+    @Test
+    public void shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoringStateUpdaterEnabled() throws Exception {
+        shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring(true);
+    }
+
+    @Test
+    public void shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoringStateUpdaterDisabled() throws Exception {
+        if (!processingThreadsEnabled) {
+            shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring(false);
+        }
+    }
+
+    @SuppressWarnings("deprecation")
+    private void shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring(final boolean stateUpdaterEnabled) throws Exception {
+        if (!eosConfig.equals(StreamsConfig.EXACTLY_ONCE) && !eosConfig.equals(StreamsConfig.EXACTLY_ONCE_V2)) {
+            return;
+        }
+        final Properties streamsConfiguration = new Properties();
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig);
+        streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(applicationId).getPath());
+        streamsConfiguration.put(InternalConfig.PROCESSING_THREADS_ENABLED, processingThreadsEnabled);
+        streamsConfiguration.put(InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled);
+        final String stateStoreName = "stateStore";
+
+        purgeLocalStreamsState(streamsConfiguration);
+
+        final int startKey = 1;
+        final int endKey = 30001;
+        final int valueSize = 1000;
+        final StringBuilder value1 = new StringBuilder(valueSize);
+        for (int i = 0; i < valueSize; ++i) {

Review Comment:
   Is the size of the value essential to the test? In other words, "why?"
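
   If the size does matter — presumably the 1000-byte values make the changelog (~30 MB for 30k keys) large enough that partition 0 is still restoring when the clean close happens — a named constant would answer that "why?" in the code itself. A hypothetical rewrite of the loop above; `String.join` over `Collections.nCopies` stays Java-8 compatible:

   ```java
   // Hypothetical, not the PR's code: the stated intent is a guess that large
   // values slow restoration down enough to close the task mid-restore.
   private static final int VALUE_SIZE_BYTES = 1000;

   final String valueStr1 = String.join("", Collections.nCopies(VALUE_SIZE_BYTES, "A"));
   ```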



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
