Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]

2023-12-06 Thread via GitHub


ableegoldman commented on PR #14735:
URL: https://github.com/apache/kafka/pull/14735#issuecomment-1842502256

   Test failures are unrelated, looks good to merge


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]

2023-12-06 Thread via GitHub


ableegoldman merged PR #14735:
URL: https://github.com/apache/kafka/pull/14735


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]

2023-12-04 Thread via GitHub


ableegoldman commented on code in PR #14735:
URL: https://github.com/apache/kafka/pull/14735#discussion_r1414691022


##
streams/src/main/java/org/apache/kafka/streams/processor/StandbyUpdateListener.java:
##
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.common.TopicPartition;
+
+public interface StandbyUpdateListener {
+
+enum SuspendReason {
+MIGRATED,
+PROMOTED
+}
+
+/**
+ * A callback that will be invoked after registering the changelogs for each state store in a standby
+ * task. It is guaranteed to always be invoked before any records are loaded into the standby store.
+ *
+ * @param topicPartition the changelog TopicPartition for this standby task
+ * @param storeName the name of the store being loaded
+ * @param startingOffset the offset from which the standby task begins consuming from the changelog
+ */
+void onUpdateStart(final TopicPartition topicPartition,
+   final String storeName,
+   final long startingOffset);
+
+/**
+ * Method called after loading a batch of records. In this case the maximum size of the batch is whatever
+ * the value of the MAX_POLL_RECORDS is set to.
+ *
+ * This method is called after loading each batch and it is advised to keep processing to a minimum.
+ * Any heavy processing will block the state updater thread and slow down the rate of standby task
+ * loading. Therefore, if you need to do any extended processing or connect to an external service,
+ * consider doing so asynchronously.
+ *
+ * @param topicPartition the TopicPartition containing the values to restore
+ * @param storeName the name of the store undergoing restoration
+ * @param batchEndOffset batchEndOffset the changelog end offset (inclusive) of the batch that was just loaded
+ * @param batchSize the total number of records in the batch that was just loaded
+ * @param currentEndOffset the current end offset of the changelog topic partition.
+ */
+void onBatchLoaded(final TopicPartition topicPartition,
+   final String storeName,
+   final TaskId taskId,
+   final long batchEndOffset,
+   final long batchSize,
+   final long currentEndOffset);
+
+/**
+ * This method is called when the corresponding standby task stops updating, for the provided reason.
+ *
+ * If the task was {@code MIGRATED} to another instance, this callback will be invoked after this
+ * state store (and the task itself) are closed (in which case the data will be cleaned up after
+ * state.cleanup.delay.ms).
+ * If the task was {@code PROMOTED} to an active task, the state store will not be closed, and the
+ * callback will be invoked after unregistering it as a standby task but before re-registering it as an active task
+ * and beginning restoration. In other words, this will always called before the corresponding
+ * {@link StateRestoreListener#onRestoreStart} call is made.
+ *
+ * @param topicPartition the TopicPartition containing the values to restore
+ * @param storeName the name of the store undergoing restoration

Review Comment:
   ```suggestion
* @param topicPartition the changelog TopicPartition for this standby task
* @param storeName the name of the store being loaded
   ```
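To make the three callbacks above concrete, here is a minimal sketch of a listener that tracks approximate standby lag per store. It is written against a hypothetical, simplified copy of the interface (`SimpleStandbyListener`: `TopicPartition` becomes a `String` and the `TaskId` parameter is dropped) so that it compiles without Kafka. The argument order of `onUpdateSuspended` follows the `TaskManager` call quoted in this thread, and the lag formula assumes `batchEndOffset` is inclusive (as the Javadoc says) while `currentEndOffset` is exclusive; the latter is an assumption.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for StandbyUpdateListener so this compiles without Kafka:
// TopicPartition is replaced by a String and the TaskId parameter is dropped.
interface SimpleStandbyListener {
    enum SuspendReason { MIGRATED, PROMOTED }

    void onUpdateStart(String changelogPartition, String storeName, long startingOffset);

    void onBatchLoaded(String changelogPartition, String storeName,
                       long batchEndOffset, long batchSize, long currentEndOffset);

    void onUpdateSuspended(String changelogPartition, String storeName,
                           long storeOffset, long currentEndOffset, SuspendReason reason);
}

// Tracks an approximate per-store lag with O(1) work per callback.
class LagTrackingStandbyListener implements SimpleStandbyListener {
    // Written by the updating thread, possibly read by another (e.g. a metrics reporter).
    private final Map<String, Long> lagByStore = new ConcurrentHashMap<>();

    @Override
    public void onUpdateStart(String changelogPartition, String storeName, long startingOffset) {
        // No end offset is known yet, so no lag is recorded until the first batch.
        lagByStore.remove(storeName);
    }

    @Override
    public void onBatchLoaded(String changelogPartition, String storeName,
                              long batchEndOffset, long batchSize, long currentEndOffset) {
        // Assumption: batchEndOffset is inclusive and currentEndOffset is exclusive,
        // so the next record to load is batchEndOffset + 1.
        long lag = Math.max(0L, currentEndOffset - (batchEndOffset + 1));
        lagByStore.put(storeName, lag);
    }

    @Override
    public void onUpdateSuspended(String changelogPartition, String storeName,
                                  long storeOffset, long currentEndOffset, SuspendReason reason) {
        // Once the standby stops updating (migrated or promoted), its lag is no longer meaningful.
        lagByStore.remove(storeName);
    }

    // Returns null if no batch has been loaded since the last start or suspension.
    Long lagFor(String storeName) {
        return lagByStore.get(storeName);
    }
}
```

Each callback does a single map update, in line with the Javadoc's advice to keep `onBatchLoaded` cheap because it runs on the state updater thread.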



##
streams/src/main/java/org/apache/kafka/streams/processor/StandbyUpdateListener.java:
##
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable 

Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]

2023-12-01 Thread via GitHub


ableegoldman commented on code in PR #14735:
URL: https://github.com/apache/kafka/pull/14735#discussion_r1412648726


##
streams/src/main/java/org/apache/kafka/streams/processor/StandbyUpdateListener.java:
##
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.common.TopicPartition;
+
+public interface StandbyUpdateListener {
+
+enum SuspendReason {
+MIGRATED,
+PROMOTED
+}
+
+/**
+ * Method called upon the creation of the Standby Task.
+ *
+ * @param topicPartition   the TopicPartition of the Standby Task.
+ * @param storeName        the name of the store being watched by this Standby Task.
+ * @param startingOffset   the offset from which the Standby Task starts watching.
+ */
+void onUpdateStart(final TopicPartition topicPartition,
+   final String storeName,
+   final long startingOffset);
+
+/**
+ * Method called after restoring a batch of records. In this case the maximum size of the batch is whatever

Review Comment:
   ```suggestion
* Method called after loading a batch of records. In this case the maximum size of the batch is whatever
   ```



##
streams/src/main/java/org/apache/kafka/streams/processor/StandbyUpdateListener.java:
##
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.common.TopicPartition;
+
+public interface StandbyUpdateListener {
+
+enum SuspendReason {
+MIGRATED,
+PROMOTED
+}
+
+/**
+ * Method called upon the creation of the Standby Task.
+ *
+ * @param topicPartition   the TopicPartition of the Standby Task.
+ * @param storeName        the name of the store being watched by this Standby Task.

Review Comment:
   ditto here, copy this to the other two callbacks as well
   ```suggestion
* @param storeName the name of the store being loaded
   ```



##
streams/src/main/java/org/apache/kafka/streams/processor/StandbyUpdateListener.java:
##
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.common.TopicPartition;
+
+public interface StandbyUpdateListener {
+
+enum SuspendReason {
+MIGRATED,
+PROMOTED
+}
+
+/**
+ * Method called upon the creation of the Standby Task.
+ *
+ * @param topicPartition   the TopicPartition of the Standby Task.
+ * @param storeNamethe name of the store being watched by this 

Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]

2023-12-01 Thread via GitHub


ableegoldman commented on code in PR #14735:
URL: https://github.com/apache/kafka/pull/14735#discussion_r1412638885


##
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##
@@ -580,6 +582,23 @@ public void setGlobalStateRestoreListener(final StateRestoreListener globalState
 }
 }
 
+/**
+ * Set the listener which is triggered whenever a standby task is updated
+ *
+ * @param standbyListener The listener triggered when a standby task is updated.
+ * @throws IllegalStateException if this {@code KafkaStreams} instance has already been started.
+ */
+public void setStandbyUpdateListener(final StandbyUpdateListener standbyListener) {

Review Comment:
   Agreed!
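The Javadoc for `setStandbyUpdateListener` says the setter throws `IllegalStateException` once the instance has been started. A minimal, hypothetical sketch of that contract follows; the `ListenerRegistry` class and its two-state lifecycle are assumptions for illustration, not `KafkaStreams` internals.

```java
// Hypothetical sketch: a listener may only be registered before start(),
// matching the @throws contract in the Javadoc above. Not KafkaStreams' actual code.
class ListenerRegistry<L> {
    private enum State { CREATED, RUNNING }

    private final Object stateLock = new Object();
    private State state = State.CREATED;
    private L standbyListener;

    void setStandbyUpdateListener(final L standbyListener) {
        synchronized (stateLock) {
            // Check and assignment happen under one lock, so start() cannot slip in between.
            if (state != State.CREATED) {
                throw new IllegalStateException(
                    "Can only set the standby update listener before start(); current state is " + state);
            }
            this.standbyListener = standbyListener;
        }
    }

    void start() {
        synchronized (stateLock) {
            state = State.RUNNING;
        }
    }

    L standbyListener() {
        synchronized (stateLock) {
            return standbyListener;
        }
    }
}
```

Rejecting late registration keeps the listener fixed for the lifetime of the running threads, so they never observe it changing mid-update.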



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]

2023-12-01 Thread via GitHub


eduwercamacaro commented on code in PR #14735:
URL: https://github.com/apache/kafka/pull/14735#discussion_r1412148951


##
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##
@@ -580,6 +582,23 @@ public void setGlobalStateRestoreListener(final StateRestoreListener globalState
 }
 }
 
+/**
+ * Set the listener which is triggered whenever a standby task is updated
+ *
+ * @param standbyListener The listener triggered when a standby task is updated.
+ * @throws IllegalStateException if this {@code KafkaStreams} instance has already been started.
+ */
+public void setStandbyUpdateListener(final StandbyUpdateListener standbyListener) {

Review Comment:
   IMO, in the context of this method, `standbyListener` makes more sense than `userStandbyListener`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]

2023-12-01 Thread via GitHub


eduwercamacaro commented on code in PR #14735:
URL: https://github.com/apache/kafka/pull/14735#discussion_r1412102373


##
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##
@@ -175,13 +176,15 @@ public class KafkaStreams implements AutoCloseable {
 private final KafkaClientSupplier clientSupplier;
 protected final TopologyMetadata topologyMetadata;
 private final QueryableStoreProvider queryableStoreProvider;
+private final StandbyUpdateListener delegatingStandbyUpdateListener;
 
 GlobalStreamThread globalStreamThread;
 private KafkaStreams.StateListener stateListener;
 private StateRestoreListener globalStateRestoreListener;
 private boolean oldHandler;
 private BiConsumer streamsUncaughtExceptionHandler;
 private final Object changeThreadCount = new Object();
+private StandbyUpdateListener globalStandbyListener;

Review Comment:
   I agree. Your proposal makes things easier to understand. Thanks! :) 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]

2023-11-30 Thread via GitHub


ableegoldman commented on code in PR #14735:
URL: https://github.com/apache/kafka/pull/14735#discussion_r1411413767


##
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##
@@ -175,13 +176,15 @@ public class KafkaStreams implements AutoCloseable {
 private final KafkaClientSupplier clientSupplier;
 protected final TopologyMetadata topologyMetadata;
 private final QueryableStoreProvider queryableStoreProvider;
+private final StandbyUpdateListener delegatingStandbyUpdateListener;
 
 GlobalStreamThread globalStreamThread;
 private KafkaStreams.StateListener stateListener;
 private StateRestoreListener globalStateRestoreListener;
 private boolean oldHandler;
 private BiConsumer streamsUncaughtExceptionHandler;
 private final Object changeThreadCount = new Object();
+private StandbyUpdateListener globalStandbyListener;

Review Comment:
   I know you're just following the established pattern here, but it always felt weird to me that we have two different top-level fields for the restore listener. I think it makes more sense for the "global" standby listener to just be completely encapsulated by the DelegatingStandbyUpdateListener class, and then make that class static.
   
   Imo we should also name it `userStandbyListener` rather than `globalStandbyListener` to avoid confusion with global state stores. I decided to just whip up a quick PR with all these fixes for the existing restore listener, so you can just take a look at https://github.com/apache/kafka/pull/14886 to see exactly what I'm suggesting we do for the standby listener.
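One way to read the suggestion above (the user's listener lives only inside a static `DelegatingStandbyUpdateListener`, in a field named `userStandbyListener`) is sketched below. This is a hypothetical illustration with a single simplified callback (`BatchLoadedCallback`, `StreamsClientSketch` are invented names); PR #14886 shows what was actually proposed for the restore listener.

```java
// Simplified, hypothetical callback type; the real interface has three callbacks.
interface BatchLoadedCallback {
    void onBatchLoaded(String storeName, long batchEndOffset, long currentEndOffset);
}

class StreamsClientSketch {
    // Static nested class: it needs no reference to the enclosing client, and the
    // user-supplied listener is a field of this class rather than of the client.
    static final class DelegatingStandbyUpdateListener implements BatchLoadedCallback {
        private volatile BatchLoadedCallback userStandbyListener;

        void setUserStandbyListener(final BatchLoadedCallback listener) {
            this.userStandbyListener = listener;
        }

        @Override
        public void onBatchLoaded(final String storeName, final long batchEndOffset, final long currentEndOffset) {
            final BatchLoadedCallback listener = userStandbyListener; // read the volatile field once
            if (listener != null) { // no-op until the user registers a listener
                listener.onBatchLoaded(storeName, batchEndOffset, currentEndOffset);
            }
        }
    }

    // The only listener field on the client; internal components always receive this delegate.
    private final DelegatingStandbyUpdateListener delegatingStandbyUpdateListener =
        new DelegatingStandbyUpdateListener();

    void setStandbyUpdateListener(final BatchLoadedCallback standbyListener) {
        delegatingStandbyUpdateListener.setUserStandbyListener(standbyListener);
    }

    BatchLoadedCallback listenerForInternals() {
        return delegatingStandbyUpdateListener;
    }
}
```

Because internal components only ever see the delegate, they need no null checks of their own, and the client carries one listener field instead of two.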



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##
@@ -672,6 +678,8 @@ private int restoreChangelog(final Task task, final ChangelogMetadata changelogM
 } catch (final Exception e) {
 throw new StreamsException("State restore listener failed on batch restored", e);
 }
+} else if (changelogMetadata.stateManager.taskType() == TaskType.STANDBY) {
+standbyUpdateListener.onBatchLoaded(partition, storeName, stateManager.taskId(), currentOffset, numRecords, storeMetadata.endOffset());

Review Comment:
   We should probably put this in a try block like the restore listener does right above this:
   ```suggestion
   try {
   standbyUpdateListener.onBatchLoaded(partition, storeName, stateManager.taskId(), currentOffset, numRecords, storeMetadata.endOffset());
   } catch (final Exception e) {
   throw new StreamsException("Standby updater listener failed on batch loaded", e);
   }
   ```
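The try/catch suggested here (and again for `#onUpdateStart` below) follows one pattern: run the user callback and rethrow any failure wrapped with a message naming the callback. A minimal sketch of that pattern as a reusable helper; `ListenerCallbackException` is a hypothetical stand-in for `StreamsException` so the sketch compiles without Kafka, and `StandbyListenerCalls` is illustrative, not part of the PR.

```java
// Hypothetical stand-in for org.apache.kafka.streams.errors.StreamsException.
class ListenerCallbackException extends RuntimeException {
    ListenerCallbackException(final String message, final Throwable cause) {
        super(message, cause);
    }
}

final class StandbyListenerCalls {
    private StandbyListenerCalls() { }

    // Invokes a user-supplied callback; any exception it throws is wrapped so the failure
    // is clearly attributed to the listener, mirroring the restore-listener handling.
    static void invoke(final Runnable callback, final String callbackDescription) {
        try {
            callback.run();
        } catch (final Exception e) {
            throw new ListenerCallbackException(
                "Standby update listener failed on " + callbackDescription, e);
        }
    }
}
```

A call site would then read, for example, `StandbyListenerCalls.invoke(() -> listener.onBatchLoaded(...), "batch loaded")`, keeping the original exception as the cause.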



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##
@@ -1012,6 +1017,8 @@ private void prepareChangelogs(final Map tasks,
 // no records to restore; in this case we just initialize the sensor to zero
 final long recordsToRestore = Math.max(changelogMetadata.restoreEndOffset - startOffset, 0L);
 task.recordRestoration(time, recordsToRestore, true);
+}  else if (changelogMetadata.stateManager.taskType() == TaskType.STANDBY) {
+standbyUpdateListener.onUpdateStart(partition, storeName, startOffset);

Review Comment:
   Same here: we should wrap it in a try-catch that mirrors the `#onRestoreStart` block above:
   ```suggestion
   try {
   standbyUpdateListener.onUpdateStart(partition, storeName, startOffset);
   } catch (final Exception e) {
   throw new StreamsException("Standby update listener failed on update start", e);
   }
   ```



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##
@@ -980,25 +988,22 @@ private void prepareChangelogs(final Map tasks,
 restoreConsumer.seekToBeginning(newPartitionsWithoutStartOffset);
 }
 
-// do not trigger restore listener if we are processing standby tasks
 for (final ChangelogMetadata changelogMetadata : newPartitionsToRestore) {
-if (changelogMetadata.stateManager.taskType() == Task.TaskType.ACTIVE) {
-final StateStoreMetadata storeMetadata = changelogMetadata.storeMetadata;
-final TopicPartition partition = storeMetadata.changelogPartition();
-final String storeName = storeMetadata.store().name();
-
-long startOffset = 0L;
-try {
-startOffset = restoreConsumer.position(partition);
-} catch (final TimeoutException swallow) {
-// if we cannot find the starting position at the beginning, just use the default 0L
-} catch (final KafkaException e) {
-

Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]

2023-11-29 Thread via GitHub


eduwercamacaro commented on code in PR #14735:
URL: https://github.com/apache/kafka/pull/14735#discussion_r1409188652


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -701,7 +706,15 @@ private StandbyTask convertActiveToStandby(final StreamTask activeTask, final Se
 }
 
 private StreamTask convertStandbyToActive(final StandbyTask standbyTask, final Set partitions) {
-return activeTaskCreator.createActiveTaskFromStandby(standbyTask, partitions, mainConsumer);
+final StreamTask streamTask = activeTaskCreator.createActiveTaskFromStandby(standbyTask, partitions, mainConsumer);
+final ProcessorStateManager stateManager = standbyTask.stateManager();
+for (final TopicPartition partition : partitions) {
+final ProcessorStateManager.StateStoreMetadata storeMetadata = stateManager.storeMetadata(partition);
+if (storeMetadata != null && storeMetadata.endOffset() != null) {
+standbyTaskUpdateListener.onUpdateSuspended(partition, storeMetadata.store().name(), storeMetadata.offset(), storeMetadata.endOffset(), StandbyUpdateListener.SuspendReason.PROMOTED);

Review Comment:
   I found the `RestoreIntegrationTest#shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore` test very useful for understanding how all this works. Thanks for the suggestion.
   Question: Is there a similar test for migrated standby tasks?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]

2023-11-29 Thread via GitHub


eduwercamacaro commented on code in PR #14735:
URL: https://github.com/apache/kafka/pull/14735#discussion_r1409158503


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -701,7 +706,15 @@ private StandbyTask convertActiveToStandby(final StreamTask activeTask, final Se
 }
 
 private StreamTask convertStandbyToActive(final StandbyTask standbyTask, final Set partitions) {
-return activeTaskCreator.createActiveTaskFromStandby(standbyTask, partitions, mainConsumer);
+final StreamTask streamTask = activeTaskCreator.createActiveTaskFromStandby(standbyTask, partitions, mainConsumer);
+final ProcessorStateManager stateManager = standbyTask.stateManager();
+for (final TopicPartition partition : partitions) {
+final ProcessorStateManager.StateStoreMetadata storeMetadata = stateManager.storeMetadata(partition);
+if (storeMetadata != null && storeMetadata.endOffset() != null) {
+standbyTaskUpdateListener.onUpdateSuspended(partition, storeMetadata.store().name(), storeMetadata.offset(), storeMetadata.endOffset(), StandbyUpdateListener.SuspendReason.PROMOTED);

Review Comment:
   I made some adjustments so that we can tell when a `Task` has been `Migrated` or `Promoted` by looking at its `ChangelogState` and `Task` states. Basically, it works like this:
   
   Task State: `Running` and ChangelogState: `Restoring` => Promoted
   Task State: `Suspended` and ChangelogState: `Restoring` => Migrated
   
   This allows the `StoreChangelogReader` to infer the cause of the suspension. This seems to hold only for the state updater; for some reason, when the state updater is disabled, the Task State is always `Suspended` when `#unregister` is called.
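The two rules above amount to a small lookup table. A minimal sketch of it, using hypothetical mirror enums (`TaskStateSketch`, `ChangelogStateSketch`, `SuspendReasonSketch`) rather than the internal Kafka Streams types, and returning "unknown" for every combination the comment does not cover:

```java
import java.util.Optional;

// Hypothetical mirrors of the states named in the comment above; not the internal Kafka Streams types.
enum TaskStateSketch { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }
enum ChangelogStateSketch { REGISTERED, RESTORING, COMPLETED }
enum SuspendReasonSketch { MIGRATED, PROMOTED }

final class SuspendReasonInference {
    private SuspendReasonInference() { }

    // Encodes the two rules from the comment:
    //   task RUNNING   + changelog RESTORING -> PROMOTED
    //   task SUSPENDED + changelog RESTORING -> MIGRATED
    // Every other combination is reported as unknown rather than guessed.
    static Optional<SuspendReasonSketch> infer(final TaskStateSketch taskState,
                                               final ChangelogStateSketch changelogState) {
        if (changelogState != ChangelogStateSketch.RESTORING) {
            return Optional.empty();
        }
        switch (taskState) {
            case RUNNING:
                return Optional.of(SuspendReasonSketch.PROMOTED);
            case SUSPENDED:
                return Optional.of(SuspendReasonSketch.MIGRATED);
            default:
                return Optional.empty();
        }
    }
}
```

As the comment notes, this mapping only holds with the state updater enabled: with it disabled the task is always `Suspended` at `#unregister`, so a table like this would report every promotion as `MIGRATED`.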



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]

2023-11-29 Thread via GitHub


eduwercamacaro commented on code in PR #14735:
URL: https://github.com/apache/kafka/pull/14735#discussion_r1409158503


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -701,7 +706,15 @@ private StandbyTask convertActiveToStandby(final 
StreamTask activeTask, final Se
 }
 
 private StreamTask convertStandbyToActive(final StandbyTask standbyTask, 
final Set partitions) {
-return activeTaskCreator.createActiveTaskFromStandby(standbyTask, 
partitions, mainConsumer);
+final StreamTask streamTask = 
activeTaskCreator.createActiveTaskFromStandby(standbyTask, partitions, 
mainConsumer);
+final ProcessorStateManager stateManager = standbyTask.stateManager();
+for (final TopicPartition partition : partitions) {
+final ProcessorStateManager.StateStoreMetadata storeMetadata = 
stateManager.storeMetadata(partition);
+if (storeMetadata != null && storeMetadata.endOffset() != null) {
+standbyTaskUpdateListener.onUpdateSuspended(partition, 
storeMetadata.store().name(), storeMetadata.offset(), 
storeMetadata.endOffset(), StandbyUpdateListener.SuspendReason.PROMOTED);

Review Comment:
   I made some adjustments so that we can tell when a 'Task' has been 
`Migrated` or `Promoted` by looking at its 'ChangelogState' and 'Task' states. 
Basically, in this way:
   
   Task State: `Running` and ChangelogState: `Restoring` => Promoted
   Task State: `Suspended` and ChangelogState: `Restoring` => Migrated
   
   This allows the 'StoreChangelogReader' to infer the cause of the suspension. 
This seems to be true only for state updater, for some reason, when the state 
updater is disabled, Task State is always `Suspended` when `#unregister` is 
called.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]

2023-11-29 Thread via GitHub


eduwercamacaro commented on code in PR #14735:
URL: https://github.com/apache/kafka/pull/14735#discussion_r1409158503


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -701,7 +706,15 @@ private StandbyTask convertActiveToStandby(final 
StreamTask activeTask, final Se
 }
 
 private StreamTask convertStandbyToActive(final StandbyTask standbyTask, 
final Set partitions) {
-return activeTaskCreator.createActiveTaskFromStandby(standbyTask, 
partitions, mainConsumer);
+final StreamTask streamTask = 
activeTaskCreator.createActiveTaskFromStandby(standbyTask, partitions, 
mainConsumer);
+final ProcessorStateManager stateManager = standbyTask.stateManager();
+for (final TopicPartition partition : partitions) {
+final ProcessorStateManager.StateStoreMetadata storeMetadata = 
stateManager.storeMetadata(partition);
+if (storeMetadata != null && storeMetadata.endOffset() != null) {
+standbyTaskUpdateListener.onUpdateSuspended(partition, 
storeMetadata.store().name(), storeMetadata.offset(), 
storeMetadata.endOffset(), StandbyUpdateListener.SuspendReason.PROMOTED);

Review Comment:
   I made some adjustments so that we can tell when a 'Task' has been 
`Migrated` or `Promoted` by looking at its 'ChangelogState' and 'Task' states. 
Basically, in this way:
   
   Task State: `Running` and ChangelogState: `Restoring` => Promoted
   Task State: `Suspended` and ChangelogState: `Restoring` => Migrated
   
   This allows the 'StoreChangelogReader' to infer the cause of the suspension. 
This seems to be true only for state updater, for some reason, when the state 
updater is disabled, Task State is always `Suspended` when `#unregister` gets 
called.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]

2023-11-29 Thread via GitHub


eduwercamacaro commented on code in PR #14735:
URL: https://github.com/apache/kafka/pull/14735#discussion_r1409143192


##
streams/src/main/java/org/apache/kafka/streams/processor/StandbyUpdateListener.java:
##
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.common.TopicPartition;
+
+public interface StandbyUpdateListener {
+
+enum SuspendReason {
+MIGRATED,
+PROMOTED
+}
+
+/**
+ * Method called upon the creation of the Standby Task.
+ *
+ * @param topicPartition   the TopicPartition of the Standby Task.
+ * @param storeName        the name of the store being watched by this Standby Task.
+ * @param startingOffset   the offset from which the Standby Task starts watching.
+ * @param currentEndOffset the current latest offset on the associated changelog partition.
+ */
+void onUpdateStart(final TopicPartition topicPartition,
+   final String storeName,
+   final long startingOffset,
+   final long currentEndOffset);
+
+/**
+ * Method called after restoring a batch of records.  In this case the maximum size of the batch is whatever
+ * the value of the MAX_POLL_RECORDS is set to.
+ *

Review Comment:
   I'm not sure if I follow this correctly, but I do have a line break between the paragraphs. Can you clarify this, please? :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]

2023-11-16 Thread via GitHub


ableegoldman commented on code in PR #14735:
URL: https://github.com/apache/kafka/pull/14735#discussion_r1396625434


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -701,7 +706,15 @@ private StandbyTask convertActiveToStandby(final StreamTask activeTask, final Se
 }
 
 private StreamTask convertStandbyToActive(final StandbyTask standbyTask, final Set partitions) {
-return activeTaskCreator.createActiveTaskFromStandby(standbyTask, partitions, mainConsumer);
+final StreamTask streamTask = activeTaskCreator.createActiveTaskFromStandby(standbyTask, partitions, mainConsumer);
+final ProcessorStateManager stateManager = standbyTask.stateManager();
+for (final TopicPartition partition : partitions) {
+final ProcessorStateManager.StateStoreMetadata storeMetadata = stateManager.storeMetadata(partition);
+if (storeMetadata != null && storeMetadata.endOffset() != null) {
+standbyTaskUpdateListener.onUpdateSuspended(partition, storeMetadata.store().name(), storeMetadata.offset(), storeMetadata.endOffset(), StandbyUpdateListener.SuspendReason.PROMOTED);

Review Comment:
   I think that makes sense -- it's certainly worth a shot! You can always try it out and see if it works.
   
   On that note, there is an existing integration test that covers task recycling which you might find useful. You can just add a standby listener to that instead of creating new integration tests from scratch. Check out `RestoreIntegrationTest#shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore`
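When adding a standby listener to an existing test, the listener usually just records what it sees so the test can assert on it afterwards. A minimal sketch, assuming simplified callbacks (a real test would implement `StandbyUpdateListener` itself and register it via `setStandbyUpdateListener`); `RecordingStandbyListener` is a hypothetical name:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical test helper with simplified callbacks: it records each event as a
// string so a test can assert on what happened and in which order.
class RecordingStandbyListener {
    // Callbacks arrive on Streams' own threads while assertions run on the test thread,
    // so the event list is synchronized.
    private final List<String> events = Collections.synchronizedList(new ArrayList<>());

    void onUpdateStart(final String storeName, final long startingOffset) {
        events.add("start:" + storeName + "@" + startingOffset);
    }

    void onBatchLoaded(final String storeName, final long batchEndOffset) {
        events.add("batch:" + storeName + "@" + batchEndOffset);
    }

    void onUpdateSuspended(final String storeName, final String reason) {
        events.add("suspended:" + storeName + ":" + reason);
    }

    // Returns a snapshot; iterating a synchronizedList requires holding its lock.
    List<String> events() {
        synchronized (events) {
            return new ArrayList<>(events);
        }
    }
}
```

In a promotion test one would then expect a `suspended:<store>:PROMOTED` entry for each recycled store.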




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]

2023-11-16 Thread via GitHub


ableegoldman commented on code in PR #14735:
URL: https://github.com/apache/kafka/pull/14735#discussion_r1396623185


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##
@@ -1012,6 +1022,8 @@ private void prepareChangelogs(final Map tasks,
 // no records to restore; in this case we just initialize the sensor to zero
 final long recordsToRestore = Math.max(changelogMetadata.restoreEndOffset - startOffset, 0L);
 task.recordRestoration(time, recordsToRestore, true);
+}  else if (changelogMetadata.stateManager.taskType() == TaskType.STANDBY && storeMetadata.endOffset() != null) {

Review Comment:
   That's a really good point: most (close to all) of the time we won't actually know the `endOffset` when `#onUpdateStart` is invoked. I think it's totally reasonable to remove this parameter from that callback. For standby updates, it's probably much more important/useful to track the end offset as it changes over time anyway. That differs from `#onRestoreStart`: during restoration the end offset never advances, but it's nice to know it up front to compute how much there is to restore.
   
   So I'm in agreement with 2); you can just send a quick update to the KIP thread with a brief explanation. That said, I think we should still try to guarantee that the listener callbacks are always invoked, even if we don't know the end offset. In particular `#onUpdateSuspended`, which I imagine might be used to clean up resources (such as metrics) created during `#onUpdateStart`. One could argue that `#onBatchLoaded` is only ever called after a successful poll, so the end offset should always be known there. Still, for the sake of consistency, I think we should treat both callbacks the same and invoke them whether or not `endOffset` is known. Also, the end offset is ultimately just one piece of information and not the sole source of value, so it feels weird to skip the listener altogether just because a single field is missing.
   
   So overall, I actually think we should do both of @coltmcnealy-lh's suggestions, 1 and 2. As for whether to use `-1`, `null`, or `Optional.empty()`, I'll leave that up to the KIP authors. Since you should probably update the mailing list thread with this change anyway, if you don't have a strong preference yourself, you can also just put the options out there and see what everyone agrees on.
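A minimal sketch of "always invoke the callback, even without an end offset", assuming a hypothetical simplified callback that takes `java.util.OptionalLong` (the primitive counterpart of the `Optional.empty()` option mentioned above; `-1` or `null` would work the same way). Store offsets and end offsets are plain maps here, not the real `ProcessorStateManager` metadata.

```java
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical sketch: notify the listener for every standby store on
// suspension, even when the changelog end offset has not been fetched yet.
class AlwaysNotifySketch {

    // Simplified callback; an empty OptionalLong stands for "end offset unknown".
    interface SuspendCallback {
        void onUpdateSuspended(String storeName, long storeOffset, OptionalLong currentEndOffset);
    }

    // storeOffsets: store name -> last applied offset
    // endOffsets:   store name -> known end offset (absent if never polled)
    static void suspendAll(Map<String, Long> storeOffsets,
                           Map<String, Long> endOffsets,
                           SuspendCallback callback) {
        for (Map.Entry<String, Long> e : storeOffsets.entrySet()) {
            Long end = endOffsets.get(e.getKey());
            OptionalLong endOffset = end == null ? OptionalLong.empty() : OptionalLong.of(end);
            // Invoke unconditionally instead of skipping when the end offset is
            // unknown, so cleanup done in the callback (e.g. metrics) always runs.
            callback.onUpdateSuspended(e.getKey(), e.getValue(), endOffset);
        }
    }
}
```

With this shape, a store whose changelog was never polled still gets `onUpdateSuspended`, just with an empty end offset.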



Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]

2023-11-15 Thread via GitHub


eduwercamacaro commented on code in PR #14735:
URL: https://github.com/apache/kafka/pull/14735#discussion_r1394297063


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -701,7 +706,15 @@ private StandbyTask convertActiveToStandby(final StreamTask activeTask, final Se
 }
 
 private StreamTask convertStandbyToActive(final StandbyTask standbyTask, final Set<TopicPartition> partitions) {
-return activeTaskCreator.createActiveTaskFromStandby(standbyTask, partitions, mainConsumer);
+final StreamTask streamTask = activeTaskCreator.createActiveTaskFromStandby(standbyTask, partitions, mainConsumer);
+final ProcessorStateManager stateManager = standbyTask.stateManager();
+for (final TopicPartition partition : partitions) {
+final ProcessorStateManager.StateStoreMetadata storeMetadata = stateManager.storeMetadata(partition);
+if (storeMetadata != null && storeMetadata.endOffset() != null) {
+standbyTaskUpdateListener.onUpdateSuspended(partition, storeMetadata.store().name(), storeMetadata.offset(), storeMetadata.endOffset(), StandbyUpdateListener.SuspendReason.PROMOTED);

Review Comment:
   I have a theory: we can figure out the `SuspendReason` for the `#unregister` method using `ChangelogState`.
   
   When the `#unregister` method is called, `StoreChangelogReader` can consider a standby task to be `MIGRATED` when the task's `ChangelogState` is `RESTORING`, and `PROMOTED` when it is `REGISTERED`. Since the reason can be inferred this way, we don't need to add a new argument to the `#unregister` method.
   
   This is based on what I've discovered from the code; I may be missing something, so I should probably double-check this.
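The proposed inference rule, sketched with hypothetical local enums. `ChangelogState` here lists `REGISTERED` and `RESTORING` from the comment plus `COMPLETED`, which is an assumption about the real enum; the theory doesn't say what that state should map to, so the sketch throws instead of guessing.

```java
// Hypothetical sketch of the proposed rule; these enums are local stand-ins,
// not the real StoreChangelogReader types.
class SuspendReasonInferenceSketch {

    enum ChangelogState { REGISTERED, RESTORING, COMPLETED }

    enum SuspendReason { MIGRATED, PROMOTED }

    // Per the theory: a standby changelog still RESTORING when unregistered
    // was migrated away; one that is only REGISTERED was promoted.
    static SuspendReason inferReason(ChangelogState state) {
        switch (state) {
            case RESTORING:
                return SuspendReason.MIGRATED;
            case REGISTERED:
                return SuspendReason.PROMOTED;
            default:
                // The theory says nothing about other states; fail loudly.
                throw new IllegalStateException("Cannot infer suspend reason from state " + state);
        }
    }
}
```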



Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]

2023-11-14 Thread via GitHub


eduwercamacaro commented on code in PR #14735:
URL: https://github.com/apache/kafka/pull/14735#discussion_r1393218354


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -701,7 +706,15 @@ private StandbyTask convertActiveToStandby(final StreamTask activeTask, final Se
 }
 
 private StreamTask convertStandbyToActive(final StandbyTask standbyTask, final Set<TopicPartition> partitions) {
-return activeTaskCreator.createActiveTaskFromStandby(standbyTask, partitions, mainConsumer);
+final StreamTask streamTask = activeTaskCreator.createActiveTaskFromStandby(standbyTask, partitions, mainConsumer);
+final ProcessorStateManager stateManager = standbyTask.stateManager();
+for (final TopicPartition partition : partitions) {
+final ProcessorStateManager.StateStoreMetadata storeMetadata = stateManager.storeMetadata(partition);
+if (storeMetadata != null && storeMetadata.endOffset() != null) {
+standbyTaskUpdateListener.onUpdateSuspended(partition, storeMetadata.store().name(), storeMetadata.offset(), storeMetadata.endOffset(), StandbyUpdateListener.SuspendReason.PROMOTED);

Review Comment:
   Thanks! I agree. I'm going to implement this in the `StoreChangelogReader` by adding a new argument to the `#unregister` method, and also cover the `MIGRATED` case.
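A sketch of the approach agreed on here, where the caller passes the reason to `#unregister` explicitly. `ChangelogReader`, `SuspendCallback`, and the `unregister(Collection<String>, SuspendReason)` signature are hypothetical simplifications for illustration, not the real `StoreChangelogReader` API.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, heavily simplified changelog reader: the caller supplies the
// SuspendReason instead of the reader inferring it from changelog state.
class ExplicitReasonUnregisterSketch {

    enum SuspendReason { MIGRATED, PROMOTED }

    interface SuspendCallback {
        void onUpdateSuspended(String partition, long storeOffset, SuspendReason reason);
    }

    static final class ChangelogReader {
        // Registered standby changelog partition -> last applied offset.
        private final Map<String, Long> standbyChangelogs = new HashMap<>();
        private final SuspendCallback callback;

        ChangelogReader(SuspendCallback callback) {
            this.callback = callback;
        }

        void registerStandby(String partition, long offset) {
            standbyChangelogs.put(partition, offset);
        }

        // The new argument: e.g. PROMOTED when a standby is recycled into an
        // active task, MIGRATED when it is moved off this instance.
        void unregister(Collection<String> partitions, SuspendReason reason) {
            for (String partition : partitions) {
                Long offset = standbyChangelogs.remove(partition);
                if (offset != null) {
                    callback.onUpdateSuspended(partition, offset, reason);
                }
            }
        }

        int registeredCount() {
            return standbyChangelogs.size();
        }
    }
}
```

The caller presumably already knows whether it is recycling a standby or handing it off, so passing that along avoids reconstructing it from the changelog state.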





Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]

2023-11-14 Thread via GitHub


coltmcnealy-lh commented on code in PR #14735:
URL: https://github.com/apache/kafka/pull/14735#discussion_r1392849939


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##
@@ -1012,6 +1022,8 @@ private void prepareChangelogs(final Map tasks,
 // no records to restore; in this case we just initialize the sensor to zero
 final long recordsToRestore = Math.max(changelogMetadata.restoreEndOffset - startOffset, 0L);
 task.recordRestoration(time, recordsToRestore, true);
+}  else if (changelogMetadata.stateManager.taskType() == TaskType.STANDBY && storeMetadata.endOffset() != null) {

Review Comment:
   I think `endOffset == null` can happen when the consumer hasn't yet called `poll()` for a given partition, which means that, honestly, I think it will be null in most cases.
   
   So this is actually a bit of a dilemma. Since in most cases we won't know the end offset until we have made our first call to `poll()` (and then `onBatchLoaded()`), the way the code is currently written makes me think we will almost never call `onUpdateStart()`, which kind of defeats the purpose of the `onUpdateStart()` callback.
   
   I see two options here:
   1. Pass in some sentinel value when we don't know the `endOffset` upon initialization of the standby task. The sentinel value could be `-1`, `null`, or `Optional.empty()`... apparently my team thinks I am a really crappy programmer, so I don't have the right to opine on which one :joy: 
   2. Remove the `endOffset` parameter from the `onUpdateStart()` method signature. This might require changing the KIP, but I don't think it would need a new vote.
   
   Personally, I prefer option 2). In the most common case we won't have the end offset, so I wouldn't want to "mislead" someone reading the javadoc into thinking they might get information that we probably don't have.
   
   And in practice, `onUpdateStart()` is normally followed by `onBatchLoaded()` just a few hundred milliseconds later, and `onBatchLoaded()` will contain the `endOffset`.
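A sketch of how a listener might look under option 2, assuming hypothetical simplified callbacks (`StandbyCallbacks`, store names instead of partitions): `onUpdateStart()` carries no end offset, and the listener only computes lag once `onBatchLoaded()` reports one. The `-1` "unknown" marker and the `currentEndOffset - batchEndOffset` lag formula are illustrative choices, not part of the KIP.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of option 2: onUpdateStart has no end offset, and a
// listener derives standby lag only once onBatchLoaded reports one.
class LagTrackingSketch {

    interface StandbyCallbacks {
        void onUpdateStart(String storeName, long startingOffset);

        void onBatchLoaded(String storeName, long batchEndOffset, long currentEndOffset);
    }

    static final class LagTracker implements StandbyCallbacks {
        private final Map<String, Long> lagByStore = new HashMap<>();

        @Override
        public void onUpdateStart(String storeName, long startingOffset) {
            // The end offset is usually unknown here (no poll() yet), so just
            // start tracking the store; -1 means "lag not known yet".
            lagByStore.put(storeName, -1L);
        }

        @Override
        public void onBatchLoaded(String storeName, long batchEndOffset, long currentEndOffset) {
            // Approximate lag: how far the applied batch trails the end offset.
            lagByStore.put(storeName, Math.max(currentEndOffset - batchEndOffset, 0L));
        }

        long lag(String storeName) {
            return lagByStore.getOrDefault(storeName, -1L);
        }
    }
}
```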



Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]

2023-11-13 Thread via GitHub


ableegoldman commented on code in PR #14735:
URL: https://github.com/apache/kafka/pull/14735#discussion_r1391891296


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -70,6 +71,7 @@
 import static org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2;
 import static org.apache.kafka.streams.processor.internals.StateManagerUtil.parseTaskDirectoryName;
 
+@SuppressWarnings("ClassFanOutComplexity")

Review Comment:
   Well, actually, after reading through the whole PR, I do think it's unnecessary complexity for the `TaskManager` after all -- see [this comment](https://github.com/apache/kafka/pull/14735#discussion_r1391855192). But that's more because of the complexity of determining when to invoke `#onUpdateSuspended`, and because it's simpler to keep everything in the `StoreChangelogReader` class. Hopefully that all makes sense to you too.



Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]

2023-11-13 Thread via GitHub


eduwercamacaro commented on code in PR #14735:
URL: https://github.com/apache/kafka/pull/14735#discussion_r1391877914


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -70,6 +71,7 @@
 import static org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2;
 import static org.apache.kafka.streams.processor.internals.StateManagerUtil.parseTaskDirectoryName;
 
+@SuppressWarnings("ClassFanOutComplexity")

Review Comment:
   Oh, I see. I was thinking that this might add unnecessary complexity to the `TaskManager` class, and that we could instead pass the standby callback to the `ActiveTaskCreator`. But I'm not sure the standby callback API fits with the terminology used in the `ActiveTaskCreator` class.




