ableegoldman commented on a change in pull request #8248:
URL: https://github.com/apache/kafka/pull/8248#discussion_r431481812



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##########
@@ -811,17 +812,41 @@ private void prepareChangelogs(final Set<ChangelogMetadata> newPartitionsToResto
         }
     }
 
+    private RuntimeException invokeOnRestoreEnd(final TopicPartition partition,
+                                                final ChangelogMetadata changelogMetadata) {
+        // only trigger the store's specific listener to make sure we disable bulk loading before transition to standby
+        final StateStoreMetadata storeMetadata = changelogMetadata.storeMetadata;
+        final StateRestoreCallback restoreCallback = storeMetadata.restoreCallback();
+        final String storeName = storeMetadata.store().name();
+        if (restoreCallback instanceof StateRestoreListener) {
+            try {
+                ((StateRestoreListener) restoreCallback).onRestoreEnd(partition, storeName, changelogMetadata.totalRestored);
+            } catch (final RuntimeException e) {
+                return e;
+            }
+        }
+        return null;
+    }
+
     @Override
-    public void remove(final Collection<TopicPartition> revokedChangelogs) {
-        // Only changelogs that are initialized that been added to the restore consumer's assignment
+    public void unregister(final Collection<TopicPartition> revokedChangelogs,
+                           final boolean triggerOnRestoreEnd) {
+        final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
+
+        // Only changelogs that are initialized have been added to the restore consumer's assignment
         final List<TopicPartition> revokedInitializedChangelogs = new ArrayList<>();
 
         for (final TopicPartition partition : revokedChangelogs) {
             final ChangelogMetadata changelogMetadata = changelogs.remove(partition);
             if (changelogMetadata != null) {
-                if (changelogMetadata.state() != ChangelogState.REGISTERED) {
+                if (triggerOnRestoreEnd && changelogMetadata.state().equals(ChangelogState.RESTORING)) {

Review comment:
       I assume you're referring to the user-registered global listener? I was 
trying to imagine what users might actually use it for, and figured a major 
reason is to alert when a store is ready for IQ. Obviously, the store is not 
ready for IQ in this case. I assume the worst that could happen is that they 
get an exception saying the store isn't ready if they do try to query it, but 
it still seems likely to cause confusion. 
   
   If you're also wondering about the seemingly arbitrary distinction between 
the store-specific listener and the global one, it seems like the intent of 
the store-specific listener is to take action on a particular store as it 
transitions into and out of restoring. The store-specific listener has a 
reference to the actual store and can, for example, toggle bulk loading on it.
   
   But IIUC the global listener does not have a reference to any actual stores, 
and thus its purpose seems to be alerting on the completion of restoration 
rather than taking some action on the store as restoration begins/ends. 
   
   Restoration completing is not the same as restoration ending, but 
unfortunately we just have the one callback (`onRestoreEnd`) for both.
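
   To make the distinction concrete, here is a minimal, self-contained sketch 
of what I have in mind. It does not use the real Kafka Streams types: 
`RestoreEndListener`, `SketchStore`, `StoreSpecificListener`, 
`GlobalAlertingListener` and `endRestoration` are all hypothetical names made 
up for illustration, and the single `restorationCompleted` flag is my 
assumption about how the two cases could be told apart.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: hypothetical stand-ins, NOT the Kafka Streams API.
class RestoreListenerSketch {

    // Hypothetical single callback, mirroring the "one callback" problem.
    interface RestoreEndListener {
        void onRestoreEnd(String storeName, long totalRestored);
    }

    // Hypothetical store with a bulk-loading toggle (on while restoring).
    static final class SketchStore {
        final String name;
        boolean bulkLoading = true;

        SketchStore(final String name) {
            this.name = name;
        }
    }

    // Store-specific listener: holds a reference to its store and acts on it.
    static final class StoreSpecificListener implements RestoreEndListener {
        private final SketchStore store;

        StoreSpecificListener(final SketchStore store) {
            this.store = store;
        }

        @Override
        public void onRestoreEnd(final String storeName, final long totalRestored) {
            // Restoration ended (completed or not), so leave bulk-loading mode.
            store.bulkLoading = false;
        }
    }

    // Global listener: sees only names and counts, so all it can do is alert.
    static final class GlobalAlertingListener implements RestoreEndListener {
        final List<String> alerts = new ArrayList<>();

        @Override
        public void onRestoreEnd(final String storeName, final long totalRestored) {
            alerts.add(storeName + " restored " + totalRestored + " records");
        }
    }

    // Always notify the store-specific listener when restoration ends, but
    // notify the global listener only if restoration actually completed.
    static void endRestoration(final SketchStore store,
                               final RestoreEndListener storeListener,
                               final RestoreEndListener globalListener,
                               final boolean restorationCompleted,
                               final long totalRestored) {
        storeListener.onRestoreEnd(store.name, totalRestored);
        if (restorationCompleted) {
            globalListener.onRestoreEnd(store.name, totalRestored);
        }
    }
}
```

   With this shape, a task that is moved to standby mid-restore would call 
`endRestoration(..., false, ...)`: bulk loading gets switched off on the 
store, but a user alerting on "store ready for IQ" is not misled.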




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

