Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]
ableegoldman commented on PR #14735: URL: https://github.com/apache/kafka/pull/14735#issuecomment-1842502256 Test failures are unrelated, looks good to merge -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]
ableegoldman merged PR #14735: URL: https://github.com/apache/kafka/pull/14735
Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]
ableegoldman commented on code in PR #14735: URL: https://github.com/apache/kafka/pull/14735#discussion_r1414691022 ## streams/src/main/java/org/apache/kafka/streams/processor/StandbyUpdateListener.java: ## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor; + +import org.apache.kafka.common.TopicPartition; + +public interface StandbyUpdateListener { + +enum SuspendReason { +MIGRATED, +PROMOTED +} + +/** + * A callback that will be invoked after registering the changelogs for each state store in a standby + * task. It is guaranteed to always be invoked before any records are loaded into the standby store. + * + * @param topicPartition the changelog TopicPartition for this standby task + * @param storeName the name of the store being loaded + * @param startingOffset the offset from which the standby task begins consuming from the changelog + */ +void onUpdateStart(final TopicPartition topicPartition, + final String storeName, + final long startingOffset); + +/** + * Method called after loading a batch of records. In this case the maximum size of the batch is whatever + * the value of the MAX_POLL_RECORDS is set to. 
+ * + * This method is called after loading each batch and it is advised to keep processing to a minimum. + * Any heavy processing will block the state updater thread and slow down the rate of standby task + * loading. Therefore, if you need to do any extended processing or connect to an external service, + * consider doing so asynchronously. + * + * @param topicPartition the TopicPartition containing the values to restore + * @param storeName the name of the store undergoing restoration + * @param batchEndOffset batchEndOffset the changelog end offset (inclusive) of the batch that was just loaded + * @param batchSize the total number of records in the batch that was just loaded + * @param currentEndOffset the current end offset of the changelog topic partition. + */ +void onBatchLoaded(final TopicPartition topicPartition, + final String storeName, + final TaskId taskId, + final long batchEndOffset, + final long batchSize, + final long currentEndOffset); + +/** + * This method is called when the corresponding standby task stops updating, for the provided reason. + * + * If the task was {@code MIGRATED} to another instance, this callback will be invoked after this + * state store (and the task itself) are closed (in which case the data will be cleaned up after + * state.cleanup.delay.ms). + * If the task was {@code PROMOTED} to an active task, the state store will not be closed, and the + * callback will be invoked after unregistering it as a standby task but before re-registering it as an active task + * and beginning restoration. In other words, this will always called before the corresponding + * {@link StateRestoreListener#onRestoreStart} call is made. 
+ * + * @param topicPartition the TopicPartition containing the values to restore + * @param storeName the name of the store undergoing restoration Review Comment: ```suggestion * @param topicPartition the changelog TopicPartition for this standby task * @param storeName the name of the store being loaded ``` ## streams/src/main/java/org/apache/kafka/streams/processor/StandbyUpdateListener.java: ## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable
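The Javadoc quoted above advises keeping `onBatchLoaded` cheap and moving extended processing off the state updater thread. A minimal sketch of that advice follows. It assumes a simplified stand-in interface (`BatchLoadedCallback`, with `String` in place of Kafka's `TopicPartition` and `TaskId`) so that it compiles without Kafka on the classpath; `AsyncLagTracker` and its approximate lag calculation are hypothetical and not part of the PR.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the onBatchLoaded callback quoted above; the real
// interface takes Kafka's TopicPartition and TaskId rather than Strings.
interface BatchLoadedCallback {
    void onBatchLoaded(String topicPartition, String storeName, String taskId,
                       long batchEndOffset, long batchSize, long currentEndOffset);
}

// Hands each batch notification to a worker thread, so the calling thread
// only pays for enqueueing a task.
class AsyncLagTracker implements BatchLoadedCallback, AutoCloseable {
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private final AtomicLong lastLag = new AtomicLong(-1L);

    @Override
    public void onBatchLoaded(String topicPartition, String storeName, String taskId,
                              long batchEndOffset, long batchSize, long currentEndOffset) {
        // Any slow work (here just an approximate lag calculation) runs on the worker.
        worker.submit(() -> lastLag.set(Math.max(currentEndOffset - batchEndOffset, 0L)));
    }

    // Most recently recorded lag, or -1 if no batch has been processed yet.
    long lastLag() {
        return lastLag.get();
    }

    @Override
    public void close() {
        worker.shutdown();
        try {
            // Wait briefly so queued notifications are processed before returning.
            worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

A single-threaded executor keeps notifications in order, which matters if the recorded value should reflect the latest batch.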
Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]
ableegoldman commented on code in PR #14735: URL: https://github.com/apache/kafka/pull/14735#discussion_r1412648726 ## streams/src/main/java/org/apache/kafka/streams/processor/StandbyUpdateListener.java: ## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor; + +import org.apache.kafka.common.TopicPartition; + +public interface StandbyUpdateListener { + +enum SuspendReason { +MIGRATED, +PROMOTED +} + +/** + * Method called upon the creation of the Standby Task. + * + * @param topicPartition the TopicPartition of the Standby Task. + * @param storeNamethe name of the store being watched by this Standby Task. + * @param startingOffset the offset from which the Standby Task starts watching. + */ +void onUpdateStart(final TopicPartition topicPartition, + final String storeName, + final long startingOffset); + +/** + * Method called after restoring a batch of records. In this case the maximum size of the batch is whatever Review Comment: ```suggestion * Method called after loading a batch of records. 
In this case the maximum size of the batch is whatever ``` ## streams/src/main/java/org/apache/kafka/streams/processor/StandbyUpdateListener.java: ## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor; + +import org.apache.kafka.common.TopicPartition; + +public interface StandbyUpdateListener { + +enum SuspendReason { +MIGRATED, +PROMOTED +} + +/** + * Method called upon the creation of the Standby Task. + * + * @param topicPartition the TopicPartition of the Standby Task. + * @param storeNamethe name of the store being watched by this Standby Task. Review Comment: ditto here, copy this to the other two callbacks as well ```suggestion * @param storeName the name of the store being loaded ``` ## streams/src/main/java/org/apache/kafka/streams/processor/StandbyUpdateListener.java: ## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor; + +import org.apache.kafka.common.TopicPartition; + +public interface StandbyUpdateListener { + +enum SuspendReason { +MIGRATED, +PROMOTED +} + +/** + * Method called upon the creation of the Standby Task. + * + * @param topicPartition the TopicPartition of the Standby Task. + * @param storeNamethe name of the store being watched by this
Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]
ableegoldman commented on code in PR #14735: URL: https://github.com/apache/kafka/pull/14735#discussion_r1412638885 ## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ## @@ -580,6 +582,23 @@ public void setGlobalStateRestoreListener(final StateRestoreListener globalState } } +/** + * Set the listener which is triggered whenever a standby task is updated + * + * @param standbyListener The listener triggered when a standby task is updated. + * @throws IllegalStateException if this {@code KafkaStreams} instance has already been started. + */ +public void setStandbyUpdateListener(final StandbyUpdateListener standbyListener) { Review Comment: Agreed!
Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]
eduwercamacaro commented on code in PR #14735: URL: https://github.com/apache/kafka/pull/14735#discussion_r1412148951 ## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ## @@ -580,6 +582,23 @@ public void setGlobalStateRestoreListener(final StateRestoreListener globalState } } +/** + * Set the listener which is triggered whenever a standby task is updated + * + * @param standbyListener The listener triggered when a standby task is updated. + * @throws IllegalStateException if this {@code KafkaStreams} instance has already been started. + */ +public void setStandbyUpdateListener(final StandbyUpdateListener standbyListener) { Review Comment: IMO, in the context of this method, `standbyListener` makes more sense than `userStandbyListener`.
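The Javadoc for `setStandbyUpdateListener` states that the setter throws `IllegalStateException` once the instance has been started. The sketch below illustrates that contract only. `StartableClient` is a hypothetical stand-in, not the `KafkaStreams` implementation, and it uses `Runnable` as a placeholder for the listener type.

```java
// Hypothetical stand-in illustrating a "set the listener before start()" contract.
class StartableClient {
    private Runnable standbyListener; // placeholder for a StandbyUpdateListener
    private boolean started;

    // Accepts the listener only while the client has not been started.
    synchronized void setStandbyUpdateListener(final Runnable standbyListener) {
        if (started) {
            throw new IllegalStateException("Can only set the standby update listener before start()");
        }
        this.standbyListener = standbyListener;
    }

    synchronized void start() {
        started = true;
    }

    synchronized boolean hasListener() {
        return standbyListener != null;
    }
}
```

Callers would therefore register the listener right after constructing the client and before calling `start()`.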
Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]
eduwercamacaro commented on code in PR #14735: URL: https://github.com/apache/kafka/pull/14735#discussion_r1412102373 ## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ## @@ -175,13 +176,15 @@ public class KafkaStreams implements AutoCloseable { private final KafkaClientSupplier clientSupplier; protected final TopologyMetadata topologyMetadata; private final QueryableStoreProvider queryableStoreProvider; +private final StandbyUpdateListener delegatingStandbyUpdateListener; GlobalStreamThread globalStreamThread; private KafkaStreams.StateListener stateListener; private StateRestoreListener globalStateRestoreListener; private boolean oldHandler; private BiConsumer streamsUncaughtExceptionHandler; private final Object changeThreadCount = new Object(); +private StandbyUpdateListener globalStandbyListener; Review Comment: I agree. Your proposal makes things easier to understand. Thanks! :)
Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]
ableegoldman commented on code in PR #14735: URL: https://github.com/apache/kafka/pull/14735#discussion_r1411413767 ## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ## @@ -175,13 +176,15 @@ public class KafkaStreams implements AutoCloseable { private final KafkaClientSupplier clientSupplier; protected final TopologyMetadata topologyMetadata; private final QueryableStoreProvider queryableStoreProvider; +private final StandbyUpdateListener delegatingStandbyUpdateListener; GlobalStreamThread globalStreamThread; private KafkaStreams.StateListener stateListener; private StateRestoreListener globalStateRestoreListener; private boolean oldHandler; private BiConsumer streamsUncaughtExceptionHandler; private final Object changeThreadCount = new Object(); +private StandbyUpdateListener globalStandbyListener; Review Comment: I know you're just following the established pattern here, but it always felt weird to me that we have two different top-level fields for the restore listener. I think it makes more sense for the "global" standby listener to just be completely encapsulated by the DelegatingStandbyUpdateListener class, and then make that class static. Imo we should also name it `userStandbyListener` rather than `globalStandbyListener` to avoid confusion with global state stores. 
I decided to just whip up a quick PR with all these fixes for the existing restore listener, so you can just take a look at https://github.com/apache/kafka/pull/14886 to see exactly what I'm suggesting we do for the standby listener ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java: ## @@ -672,6 +678,8 @@ private int restoreChangelog(final Task task, final ChangelogMetadata changelogM } catch (final Exception e) { throw new StreamsException("State restore listener failed on batch restored", e); } +} else if (changelogMetadata.stateManager.taskType() == TaskType.STANDBY) { +standbyUpdateListener.onBatchLoaded(partition, storeName, stateManager.taskId(), currentOffset, numRecords, storeMetadata.endOffset()); Review Comment: We should probably put this in a try block like the restore listener does right above this: ```suggestion try { standbyUpdateListener.onBatchLoaded(partition, storeName, stateManager.taskId(), currentOffset, numRecords, storeMetadata.endOffset()); } catch (final Exception e) { throw new StreamsException("Standby updater listener failed on batch loaded", e); } ``` ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java: ## @@ -1012,6 +1017,8 @@ private void prepareChangelogs(final Map tasks, // no records to restore; in this case we just initialize the sensor to zero final long recordsToRestore = Math.max(changelogMetadata.restoreEndOffset - startOffset, 0L); task.recordRestoration(time, recordsToRestore, true); +} else if (changelogMetadata.stateManager.taskType() == TaskType.STANDBY) { +standbyUpdateListener.onUpdateStart(partition, storeName, startOffset); Review Comment: Same here, we should wrap in try-catch that mirrors the #onRestoreStart block above: ```suggestion try { standbyUpdateListener.onUpdateStart(partition, storeName, startOffset); } catch (final Exception e) { throw new StreamsException("Standby update listener failed on update start", e); } ``` 
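The two suggestions above follow the same pattern: call the user-supplied callback inside a `try` block and rethrow any failure with a message that names the callback. A self-contained sketch of that pattern is below. `ListenerFailedException` is a hypothetical stand-in for Kafka's `StreamsException`, and `ListenerInvoker` is an illustrative helper that does not exist in the PR.

```java
// Hypothetical stand-in for Kafka's StreamsException (an unchecked exception).
class ListenerFailedException extends RuntimeException {
    ListenerFailedException(final String message, final Throwable cause) {
        super(message, cause);
    }
}

class ListenerInvoker {
    // Runs a user callback and wraps any exception it throws, keeping the
    // original exception as the cause so the user's stack trace is not lost.
    static void invoke(final String callbackName, final Runnable callback) {
        try {
            callback.run();
        } catch (final Exception e) {
            throw new ListenerFailedException(
                "Standby update listener failed on " + callbackName, e);
        }
    }
}
```

Wrapping at the call site means a faulty listener surfaces with context about which callback failed, rather than as a bare exception from deep inside the changelog reader.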
## streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java: ## @@ -980,25 +988,22 @@ private void prepareChangelogs(final Map tasks, restoreConsumer.seekToBeginning(newPartitionsWithoutStartOffset); } -// do not trigger restore listener if we are processing standby tasks for (final ChangelogMetadata changelogMetadata : newPartitionsToRestore) { -if (changelogMetadata.stateManager.taskType() == Task.TaskType.ACTIVE) { -final StateStoreMetadata storeMetadata = changelogMetadata.storeMetadata; -final TopicPartition partition = storeMetadata.changelogPartition(); -final String storeName = storeMetadata.store().name(); - -long startOffset = 0L; -try { -startOffset = restoreConsumer.position(partition); -} catch (final TimeoutException swallow) { -// if we cannot find the starting position at the beginning, just use the default 0L -} catch (final KafkaException e) { -
Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]
eduwercamacaro commented on code in PR #14735: URL: https://github.com/apache/kafka/pull/14735#discussion_r1409188652 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -701,7 +706,15 @@ private StandbyTask convertActiveToStandby(final StreamTask activeTask, final Se } private StreamTask convertStandbyToActive(final StandbyTask standbyTask, final Set partitions) { -return activeTaskCreator.createActiveTaskFromStandby(standbyTask, partitions, mainConsumer); +final StreamTask streamTask = activeTaskCreator.createActiveTaskFromStandby(standbyTask, partitions, mainConsumer); +final ProcessorStateManager stateManager = standbyTask.stateManager(); +for (final TopicPartition partition : partitions) { +final ProcessorStateManager.StateStoreMetadata storeMetadata = stateManager.storeMetadata(partition); +if (storeMetadata != null && storeMetadata.endOffset() != null) { +standbyTaskUpdateListener.onUpdateSuspended(partition, storeMetadata.store().name(), storeMetadata.offset(), storeMetadata.endOffset(), StandbyUpdateListener.SuspendReason.PROMOTED); Review Comment: I found the `RestoreIntegrationTest#shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore` test very useful for understanding how all this works; thanks for the suggestion. Question: is there a similar test for migrated standby tasks?
Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]
eduwercamacaro commented on code in PR #14735: URL: https://github.com/apache/kafka/pull/14735#discussion_r1409158503 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -701,7 +706,15 @@ private StandbyTask convertActiveToStandby(final StreamTask activeTask, final Se } private StreamTask convertStandbyToActive(final StandbyTask standbyTask, final Set partitions) { -return activeTaskCreator.createActiveTaskFromStandby(standbyTask, partitions, mainConsumer); +final StreamTask streamTask = activeTaskCreator.createActiveTaskFromStandby(standbyTask, partitions, mainConsumer); +final ProcessorStateManager stateManager = standbyTask.stateManager(); +for (final TopicPartition partition : partitions) { +final ProcessorStateManager.StateStoreMetadata storeMetadata = stateManager.storeMetadata(partition); +if (storeMetadata != null && storeMetadata.endOffset() != null) { +standbyTaskUpdateListener.onUpdateSuspended(partition, storeMetadata.store().name(), storeMetadata.offset(), storeMetadata.endOffset(), StandbyUpdateListener.SuspendReason.PROMOTED); Review Comment: I made some adjustments so that we can tell whether a `Task` has been `Migrated` or `Promoted` by looking at its `ChangelogState` and `Task` states, as follows: Task State `Running` and ChangelogState `Restoring` => Promoted; Task State `Suspended` and ChangelogState `Restoring` => Migrated. This allows the `StoreChangelogReader` to infer the cause of the suspension. This seems to hold only when the state updater is enabled: for some reason, when the state updater is disabled, the Task State is always `Suspended` when `#unregister` is called.
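The mapping described in this comment can be written down as a small lookup. The sketch below is illustrative only: `TaskState` and `ChangelogState` are simplified, hypothetical enums (the real Kafka Streams types have more values), `SuspendReasonInference` is not a class in the PR, and the result is left empty for combinations the comment does not cover.

```java
import java.util.Optional;

// Simplified, hypothetical stand-ins for the states mentioned in the comment.
enum TaskState { RUNNING, SUSPENDED }
enum ChangelogState { REGISTERED, RESTORING }

// Mirrors the SuspendReason enum from the StandbyUpdateListener diff.
enum SuspendReason { MIGRATED, PROMOTED }

class SuspendReasonInference {
    // Returns the inferred reason, or empty when the combination is not one
    // of the two cases described in the comment.
    static Optional<SuspendReason> infer(final TaskState taskState,
                                         final ChangelogState changelogState) {
        if (changelogState != ChangelogState.RESTORING) {
            return Optional.empty();
        }
        switch (taskState) {
            case RUNNING:
                return Optional.of(SuspendReason.PROMOTED);
            case SUSPENDED:
                return Optional.of(SuspendReason.MIGRATED);
            default:
                return Optional.empty();
        }
    }
}
```

As the comment notes, this inference is reported to hold only with the state updater enabled; with it disabled the task state is always `Suspended` at `#unregister`, so the two cases cannot be told apart this way.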
Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]
eduwercamacaro commented on code in PR #14735: URL: https://github.com/apache/kafka/pull/14735#discussion_r1409143192 ## streams/src/main/java/org/apache/kafka/streams/processor/StandbyUpdateListener.java: ## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor; + +import org.apache.kafka.common.TopicPartition; + +public interface StandbyUpdateListener { + +enum SuspendReason { +MIGRATED, +PROMOTED +} + +/** + * Method called upon the creation of the Standby Task. + * + * @param topicPartition the TopicPartition of the Standby Task. + * @param storeNamethe name of the store being watched by this Standby Task. + * @param startingOffset the offset from which the Standby Task starts watching. + * @param currentEndOffset the current latest offset on the associated changelog partition. + */ +void onUpdateStart(final TopicPartition topicPartition, + final String storeName, + final long startingOffset, + final long currentEndOffset); + +/** + * Method called after restoring a batch of records. In this case the maximum size of the batch is whatever + * the value of the MAX_POLL_RECORDS is set to. 
+ * Review Comment: I'm not sure I follow this correctly, but I do have a line break between the paragraphs. Can you clarify this, please? :)
Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]
ableegoldman commented on code in PR #14735: URL: https://github.com/apache/kafka/pull/14735#discussion_r1396625434 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -701,7 +706,15 @@ private StandbyTask convertActiveToStandby(final StreamTask activeTask, final Se } private StreamTask convertStandbyToActive(final StandbyTask standbyTask, final Set partitions) { -return activeTaskCreator.createActiveTaskFromStandby(standbyTask, partitions, mainConsumer); +final StreamTask streamTask = activeTaskCreator.createActiveTaskFromStandby(standbyTask, partitions, mainConsumer); +final ProcessorStateManager stateManager = standbyTask.stateManager(); +for (final TopicPartition partition : partitions) { +final ProcessorStateManager.StateStoreMetadata storeMetadata = stateManager.storeMetadata(partition); +if (storeMetadata != null && storeMetadata.endOffset() != null) { +standbyTaskUpdateListener.onUpdateSuspended(partition, storeMetadata.store().name(), storeMetadata.offset(), storeMetadata.endOffset(), StandbyUpdateListener.SuspendReason.PROMOTED); Review Comment: I think that makes sense -- it's certainly worth a shot! You can always try it out and see if it works. On that note, there is an existing integration test that covers task recycling, which you might find useful. You can just add a standby listener to that instead of creating new integration tests from scratch.
Check out `RestoreIntegrationTest#shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore`
Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]
ableegoldman commented on code in PR #14735: URL: https://github.com/apache/kafka/pull/14735#discussion_r1396623185 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java: ## @@ -1012,6 +1022,8 @@ private void prepareChangelogs(final Map tasks, // no records to restore; in this case we just initialize the sensor to zero final long recordsToRestore = Math.max(changelogMetadata.restoreEndOffset - startOffset, 0L); task.recordRestoration(time, recordsToRestore, true); +} else if (changelogMetadata.stateManager.taskType() == TaskType.STANDBY && storeMetadata.endOffset() != null) { Review Comment: That's a really good point: most (close to all) of the time we won't actually know the `endOffset` when `#onUpdateStart` is invoked. I think it's totally reasonable to remove this parameter from that callback. It's probably far more useful to track the end offset as it changes over the course of the standby updates anyway, compared to `#onRestoreStart`, where the end offset never advances during restoration but it's nice to know it up front to compute how much there is to restore. So I'm in agreement with 2); you can just send a quick update to the KIP thread with a brief explanation. That said, I think we should still try to guarantee that the listener callbacks are always invoked, even if we don't know the end offset. In particular `#onUpdateSuspended`, which I imagine might be used to clean up resources (such as metrics) created during `#onUpdateStart`. One could argue that `#onBatchLoaded` is only ever called after a successful poll, so the end offset should always be known in that case; but for the sake of consistency, I think we should treat both callbacks the same and invoke them whether or not `endOffset` is known. Also, the end offset is ultimately just one piece of information and not the sole source of value, so it feels weird to skip over the listener altogether just because a single field is missing.
So I guess overall, I actually think we should do both of @coltmcnealy-lh's suggestions, 1 and 2. As for whether to use `-1`, `null`, or `Optional.empty()`, I'll leave that up to the KIP authors. Of course, since you should probably update the mailing list thread with this change as well, if you don't have a strong preference yourself, you can also just put the options out there and see what everyone agrees on.
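The cleanup concern in this comment can be illustrated with a minimal sketch. Everything below is hypothetical: the class, its simplified method signatures (store name only, no `TopicPartition`), and the `-1` sentinel are assumptions made for illustration and are not the actual `StandbyUpdateListener` API. The thread explicitly leaves the choice between `-1`, `null`, and `Optional.empty()` open.

```java
// Hypothetical listener for illustration only; names and signatures are
// simplified stand-ins, not the real StandbyUpdateListener interface.
final class MetricsCleanupListenerSketch {
    // Sentinel picked purely for this sketch; the thread does not settle on one.
    static final long UNKNOWN_END_OFFSET = -1L;

    // Stands in for per-store resources (for example metrics) created on start.
    private final java.util.Set<String> registeredStores = new java.util.HashSet<>();

    void onUpdateStart(final String storeName, final long startingOffset) {
        // Resource is created when the standby store starts updating.
        registeredStores.add(storeName);
    }

    void onUpdateSuspended(final String storeName,
                           final long storeOffset,
                           final long currentEndOffset) {
        // Cleanup does not depend on the end offset, so it runs even when
        // currentEndOffset is UNKNOWN_END_OFFSET.
        registeredStores.remove(storeName);
    }

    int registeredCount() {
        return registeredStores.size();
    }
}
```

If the caller skipped `onUpdateSuspended` whenever the end offset was unknown, the entry added in `onUpdateStart` would never be removed, which is the kind of leak the comment is trying to rule out.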
Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]
eduwercamacaro commented on code in PR #14735: URL: https://github.com/apache/kafka/pull/14735#discussion_r1394297063 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -701,7 +706,15 @@ private StandbyTask convertActiveToStandby(final StreamTask activeTask, final Se } private StreamTask convertStandbyToActive(final StandbyTask standbyTask, final Set partitions) { -return activeTaskCreator.createActiveTaskFromStandby(standbyTask, partitions, mainConsumer); +final StreamTask streamTask = activeTaskCreator.createActiveTaskFromStandby(standbyTask, partitions, mainConsumer); +final ProcessorStateManager stateManager = standbyTask.stateManager(); +for (final TopicPartition partition : partitions) { +final ProcessorStateManager.StateStoreMetadata storeMetadata = stateManager.storeMetadata(partition); +if (storeMetadata != null && storeMetadata.endOffset() != null) { +standbyTaskUpdateListener.onUpdateSuspended(partition, storeMetadata.store().name(), storeMetadata.offset(), storeMetadata.endOffset(), StandbyUpdateListener.SuspendReason.PROMOTED); Review Comment: I have a theory. We can figure out the `SuspendReason` in the `#unregister` method using `ChangelogState`. When `#unregister` is called, `StoreChangelogReader` can consider a standby task to be `MIGRATED` when the task's `ChangelogState` is `RESTORING`, and `PROMOTED` when it is `REGISTERED`. Since the reason can be inferred this way, we don't need to add a new argument to the `#unregister` method. This is based on what I found in the code; I may be missing something, so I should double-check this.
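The inference proposed in this comment could be sketched roughly as follows. The enums here are standalone stand-ins that only borrow the names mentioned in the comment (the `COMPLETED` value and the `infer` helper are assumptions), and the mapping itself is the commenter's unverified theory rather than confirmed `StoreChangelogReader` behavior.

```java
// Standalone stand-ins for illustration; not the real Kafka Streams types.
enum ChangelogStateSketch { REGISTERED, RESTORING, COMPLETED }

enum SuspendReasonSketch { MIGRATED, PROMOTED }

final class SuspendReasonInference {
    private SuspendReasonInference() { }

    // Hypothetical helper: maps the changelog state observed at unregister
    // time to a suspend reason, following the theory in the comment.
    static SuspendReasonSketch infer(final ChangelogStateSketch state) {
        switch (state) {
            case RESTORING:
                return SuspendReasonSketch.MIGRATED;
            case REGISTERED:
                return SuspendReasonSketch.PROMOTED;
            default:
                // The comment does not say what other states should map to.
                throw new IllegalStateException("Cannot infer a suspend reason from state " + state);
        }
    }
}
```

The appeal of this approach is that `#unregister` keeps its existing signature; the risk, as the commenter notes, is that the state-to-reason mapping may not hold in every code path.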
Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]
eduwercamacaro commented on code in PR #14735: URL: https://github.com/apache/kafka/pull/14735#discussion_r1393218354 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -701,7 +706,15 @@ private StandbyTask convertActiveToStandby(final StreamTask activeTask, final Se } private StreamTask convertStandbyToActive(final StandbyTask standbyTask, final Set partitions) { -return activeTaskCreator.createActiveTaskFromStandby(standbyTask, partitions, mainConsumer); +final StreamTask streamTask = activeTaskCreator.createActiveTaskFromStandby(standbyTask, partitions, mainConsumer); +final ProcessorStateManager stateManager = standbyTask.stateManager(); +for (final TopicPartition partition : partitions) { +final ProcessorStateManager.StateStoreMetadata storeMetadata = stateManager.storeMetadata(partition); +if (storeMetadata != null && storeMetadata.endOffset() != null) { +standbyTaskUpdateListener.onUpdateSuspended(partition, storeMetadata.store().name(), storeMetadata.offset(), storeMetadata.endOffset(), StandbyUpdateListener.SuspendReason.PROMOTED); Review Comment: Thanks! I agree. I'm going to implement this in `StoreChangelogReader` by adding a new argument to the `#unregister` method, and cover the `MIGRATED` case.
Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]
coltmcnealy-lh commented on code in PR #14735: URL: https://github.com/apache/kafka/pull/14735#discussion_r1392849939 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java: ## @@ -1012,6 +1022,8 @@ private void prepareChangelogs(final Map tasks, // no records to restore; in this case we just initialize the sensor to zero final long recordsToRestore = Math.max(changelogMetadata.restoreEndOffset - startOffset, 0L); task.recordRestoration(time, recordsToRestore, true); +} else if (changelogMetadata.stateManager.taskType() == TaskType.STANDBY && storeMetadata.endOffset() != null) { Review Comment: I think `endOffset == null` could happen when the consumer hasn't yet called `poll()` for a given partition, which means it will be null in most cases, honestly. So this is actually a bit of a dilemma. Since in most cases we won't know the end offset until we have made our first call to `poll()` (and then `onBatchLoaded()`), the way the code is currently written makes me think we will almost never have a call to `onUpdateStart()`, which kind of defeats the purpose of the `onUpdateStart()` callback. I see two options here: 1. Pass in some sentinel value when we don't know the `endOffset` upon initialization of the Standby Task. The sentinel value could be `-1`, `null`, or `Optional.empty()`... apparently my team thinks I am a really crappy programmer, so I don't have the right to opine on which one :joy: 2. Remove the `endOffset` parameter from the `onUpdateStart()` method signature. This might require changing the KIP but I don't think it would take a vote. Personally, I prefer option 2). In the most common case, we won't have the end offset, so I wouldn't want to "mislead" someone reading the javadoc into thinking that they might get some info that we probably don't have. And in practice `onUpdateStart()` is normally followed by `onBatchLoaded()` just a few hundred milliseconds later.
`onBatchLoaded()` will contain the `endOffset`.
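Option 2 can be sketched from the listener's point of view. The interface below is a hypothetical, simplified stand-in (a `String` instead of `TopicPartition`, and assumed parameter names), not the signature from the KIP. It only shows how a listener might cope with learning the end offset in `onBatchLoaded()` rather than in `onUpdateStart()`.

```java
// Hypothetical, simplified listener shape for option 2; the real interface
// takes a TopicPartition and may differ in parameters and their order.
interface StandbyListenerSketch {
    void onUpdateStart(String changelogPartition, String storeName, long startingOffset);

    void onBatchLoaded(String changelogPartition, String storeName,
                       long batchEndOffset, long currentEndOffset);
}

// Tracks lag for a single store; lag is unknown until the first batch arrives.
final class LagTrackingListenerSketch implements StandbyListenerSketch {
    static final long UNKNOWN = -1L;

    private long lastLoadedOffset = UNKNOWN;
    private long lastKnownEndOffset = UNKNOWN;

    @Override
    public void onUpdateStart(final String changelogPartition,
                              final String storeName,
                              final long startingOffset) {
        // No end offset is passed here, so only the starting position is recorded.
        lastLoadedOffset = startingOffset;
    }

    @Override
    public void onBatchLoaded(final String changelogPartition,
                              final String storeName,
                              final long batchEndOffset,
                              final long currentEndOffset) {
        lastLoadedOffset = batchEndOffset;
        lastKnownEndOffset = currentEndOffset;
    }

    // Returns UNKNOWN until onBatchLoaded has supplied an end offset.
    long lag() {
        if (lastKnownEndOffset == UNKNOWN) {
            return UNKNOWN;
        }
        return Math.max(lastKnownEndOffset - lastLoadedOffset, 0L);
    }
}
```

With this shape the javadoc for `onUpdateStart()` never promises an end offset, and a listener that needs one simply waits for the first `onBatchLoaded()` call.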
Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]
ableegoldman commented on code in PR #14735: URL: https://github.com/apache/kafka/pull/14735#discussion_r1391891296 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -70,6 +71,7 @@ import static org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2; import static org.apache.kafka.streams.processor.internals.StateManagerUtil.parseTaskDirectoryName; +@SuppressWarnings("ClassFanOutComplexity") Review Comment: Well, actually, after reading through the whole PR, I do think it's unnecessary complexity for the `TaskManager` after all -- see [this comment](https://github.com/apache/kafka/pull/14735#discussion_r1391855192). But that's more because of the complexity of determining when to invoke `#onUpdateSuspended`, and because it lets us keep everything in the `StoreChangelogReader` class. Hopefully that all makes sense to you too.
Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]
eduwercamacaro commented on code in PR #14735: URL: https://github.com/apache/kafka/pull/14735#discussion_r1391877914 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -70,6 +71,7 @@ import static org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2; import static org.apache.kafka.streams.processor.internals.StateManagerUtil.parseTaskDirectoryName; +@SuppressWarnings("ClassFanOutComplexity") Review Comment: Oh, I see. I was thinking that this might be unnecessary complexity for the `TaskManager` class, and that we could instead pass the standby callback to the `ActiveTaskCreator`. But I'm not sure whether the standby callback API fits the terminology used in the `ActiveTaskCreator` class.