cadonna commented on a change in pull request #10840:
URL: https://github.com/apache/kafka/pull/10840#discussion_r656890122



##########
File path: streams/src/main/java/org/apache/kafka/streams/TaskMetadata.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.processor.TaskId;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+
+/**
+ * Represents the state of a single task running within a {@link KafkaStreams} application.
+ */
+public interface TaskMetadata {
+
+    /**
+     * @return the basic task metadata such as subtopology and partition id
+     */
+    TaskId taskId();
+
+    /**
+     * This function will return a set of the current TopicPartitions
+     */
+    Set<TopicPartition> topicPartitions();

Review comment:
       Could you please use javadoc mark-up like `@return` and `@param` for the 
docs? Here and for the other methods.
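
For illustration, a minimal sketch of the requested javadoc style. It assumes a simplified, hypothetical interface: `TaskInfoSketch` and `TaskInfoSketchDemo` are not part of Kafka, and plain strings stand in for `TopicPartition`.

```java
import java.util.Collections;
import java.util.Set;

/**
 * Hypothetical stand-in for a task metadata interface, used only to show the javadoc style.
 */
interface TaskInfoSketch {

    /**
     * Source topic partitions of the task.
     *
     * @return the set of topic partitions currently assigned to the task
     */
    Set<String> topicPartitions();
}

class TaskInfoSketchDemo {

    /**
     * Creates a sketch instance backed by the given set.
     *
     * @param partitions the partitions to expose
     * @return a sketch instance that exposes a read-only view of {@code partitions}
     */
    static TaskInfoSketch of(final Set<String> partitions) {
        final Set<String> view = Collections.unmodifiableSet(partitions);
        return () -> view;
    }

    public static void main(final String[] args) {
        System.out.println(of(Collections.singleton("topic-0")).topicPartitions());
    }
}
```

The description sentence says what the method is about, while `@return` and `@param` carry the contract, so the generated docs render both in their usual sections.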

##########
File path: streams/src/main/java/org/apache/kafka/streams/ThreadMetadata.java
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import java.util.Set;
+
+/**
+ * Represents the state of a single thread running within a {@link KafkaStreams} application.
+ */
+public interface ThreadMetadata {
+
+
+    /**
+     * @return the state of the Thread
+     */
+    String threadState();
+
+    /**
+     * @return the name of the Thread
+     */
+    String threadName();
+
+    /**
+     * This function will return the set of the {@link TaskMetadata} for the current active tasks
+     */
+    Set<TaskMetadata> activeTasks();

Review comment:
       Could you use javadoc mark-up for the docs? Here and for the other 
methods.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ThreadMetadataImpl.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.TaskMetadata;
+import org.apache.kafka.streams.ThreadMetadata;
+
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Represents the state of a single thread running within a {@link KafkaStreams} application.
+ */
+public class ThreadMetadataImpl implements ThreadMetadata {
+
+    private final String threadName;
+
+    private final String threadState;
+
+    private final Set<TaskMetadata> activeTasks;
+
+    private final Set<TaskMetadata> standbyTasks;
+
+    private final String mainConsumerClientId;
+
+    private final String restoreConsumerClientId;
+
+    private final Set<String> producerClientIds;
+
+    // the admin client should be shared among all threads, so the client id should be the same;
+    // we keep it at the thread-level for user's convenience and possible extensions in the future
+    private final String adminClientId;
+
+    public ThreadMetadataImpl(final String threadName,
+                              final String threadState,
+                              final String mainConsumerClientId,
+                              final String restoreConsumerClientId,
+                              final Set<String> producerClientIds,
+                              final String adminClientId,
+                              final Set<TaskMetadata> activeTasks,
+                              final Set<TaskMetadata> standbyTasks) {
+        this.mainConsumerClientId = mainConsumerClientId;
+        this.restoreConsumerClientId = restoreConsumerClientId;
+        this.producerClientIds = producerClientIds;
+        this.adminClientId = adminClientId;
+        this.threadName = threadName;
+        this.threadState = threadState;
+        this.activeTasks = Collections.unmodifiableSet(activeTasks);
+        this.standbyTasks = Collections.unmodifiableSet(standbyTasks);
+    }
+
+
+    public String threadState() {
+        return threadState;
+    }
+
+    public String threadName() {
+        return threadName;
+    }
+
+
+    public Set<TaskMetadata> activeTasks() {
+        return activeTasks;
+    }
+
+    public Set<TaskMetadata> standbyTasks() {
+        return standbyTasks;
+    }
+
+    public String consumerClientId() {
+        return mainConsumerClientId;
+    }
+
+    public String restoreConsumerClientId() {
+        return restoreConsumerClientId;
+    }
+
+    public Set<String> producerClientIds() {
+        return producerClientIds;
+    }
+
+    public String adminClientId() {
+        return adminClientId;
+    }
+
+    @Override
+    public boolean equals(final Object o) {

Review comment:
       Could you add unit tests for `equals()` and `hashCode()`? Since we guarantee
a behavior on the interface, it makes sense to verify that the implementation
behaves correctly.
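
A minimal sketch of what such checks could cover, assuming a hypothetical, simplified class (`ThreadInfoSketch` is not the PR's `ThreadMetadataImpl`, and it carries only three of its fields). Plain Java methods are used here; in the PR these would presumably be JUnit test methods.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical, simplified stand-in for a thread metadata implementation.
final class ThreadInfoSketch {
    private final String threadName;
    private final String threadState;
    private final Set<String> producerClientIds;

    ThreadInfoSketch(final String threadName,
                     final String threadState,
                     final Set<String> producerClientIds) {
        this.threadName = threadName;
        this.threadState = threadState;
        this.producerClientIds = Collections.unmodifiableSet(producerClientIds);
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        final ThreadInfoSketch that = (ThreadInfoSketch) o;
        return threadName.equals(that.threadName)
            && threadState.equals(that.threadState)
            && producerClientIds.equals(that.producerClientIds);
    }

    @Override
    public int hashCode() {
        return Objects.hash(threadName, threadState, producerClientIds);
    }
}

// Plain-Java checks that mirror what unit tests would assert.
class ThreadInfoSketchChecks {

    static ThreadInfoSketch create(final String name, final String... producerIds) {
        return new ThreadInfoSketch(name, "RUNNING", new HashSet<>(Arrays.asList(producerIds)));
    }

    // Two instances built from equal values must be equal and share a hash code.
    static boolean equalInstancesAreEqualWithSameHash() {
        final ThreadInfoSketch a = create("thread-1", "producer-1");
        final ThreadInfoSketch b = create("thread-1", "producer-1");
        return a.equals(b) && b.equals(a) && a.hashCode() == b.hashCode();
    }

    // Changing any single field, or comparing with null, must break equality.
    static boolean differingFieldMakesInstancesUnequal() {
        final ThreadInfoSketch a = create("thread-1", "producer-1");
        return !a.equals(create("thread-2", "producer-1"))
            && !a.equals(create("thread-1", "producer-2"))
            && !a.equals(null);
    }

    public static void main(final String[] args) {
        System.out.println(equalInstancesAreEqualWithSameHash());
        System.out.println(differingFieldMakesInstancesUnequal());
    }
}
```

Each field that takes part in `equals()` gets one "differs only in this field" case, which catches a field that was forgotten in either method.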

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsMetadata.java
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.state.HostInfo;
+
+import java.util.Set;
+
+public interface StreamsMetadata {
+
+    /**
+     * The value of {@link StreamsConfig#APPLICATION_SERVER_CONFIG} configured for the streams
+     * instance, which is typically host/port
+     *
+     * @return {@link HostInfo} corresponding to the streams instance
+     */
+    HostInfo hostInfo();
+
+    /**
+     * State stores owned by the instance as an active replica
+     *
+     * @return set of active state store names
+     */
+    Set<String> stateStoreNames();
+
+    /**
+     * Topic partitions consumed by the instance as an active replica
+     *
+     * @return set of active topic partitions
+     */
+    Set<TopicPartition> topicPartitions();
+
+    /**
+     * (Source) Topic partitions for which the instance acts as standby.
+     *
+     * @return set of standby topic partitions
+     */
+    Set<TopicPartition> standbyTopicPartitions();
+
+    /**
+     * State stores owned by the instance as a standby replica
+     *
+     * @return set of standby state store names
+     */
+    Set<String> standbyStateStoreNames();
+
+    /**
+     * This method is equivalent to call {@code StreamsMetadata.hostInfo().host();}
+     */
+    String host();

Review comment:
       Could you describe the return value with `@return`? Please also check
whether the other methods use appropriate javadoc mark-up.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskMetadataImpl.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.TaskMetadata;
+import org.apache.kafka.streams.processor.TaskId;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+public class TaskMetadataImpl implements TaskMetadata {
+
+    private final TaskId taskId;
+
+    private final Set<TopicPartition> topicPartitions;
+
+    private final Map<TopicPartition, Long> committedOffsets;
+
+    private final Map<TopicPartition, Long> endOffsets;
+
+    private final Optional<Long> timeCurrentIdlingStarted;
+
+    public TaskMetadataImpl(final TaskId taskId,
+                            final Set<TopicPartition> topicPartitions,
+                            final Map<TopicPartition, Long> committedOffsets,
+                            final Map<TopicPartition, Long> endOffsets,
+                            final Optional<Long> timeCurrentIdlingStarted) {
+        this.taskId = taskId;
+        this.topicPartitions = topicPartitions;
+        this.committedOffsets = committedOffsets;
+        this.endOffsets = endOffsets;
+        this.timeCurrentIdlingStarted = timeCurrentIdlingStarted;
+    }
+
+    @Override
+    public TaskId taskId() {
+        return taskId;
+    }
+
+    @Override
+    public Set<TopicPartition> topicPartitions() {
+        return topicPartitions;
+    }
+
+    @Override
+    public Map<TopicPartition, Long> committedOffsets() {
+        return committedOffsets;
+    }
+
+    @Override
+    public Map<TopicPartition, Long> endOffsets() {
+        return endOffsets;
+    }
+
+    @Override
+    public Optional<Long> timeCurrentIdlingStarted() {
+        return timeCurrentIdlingStarted;
+    }
+
+    @Override
+    public boolean equals(final Object o) {

Review comment:
       Could you add unit tests for `equals()` and `hashCode()`? Since we
guarantee a behavior on the interface, it makes sense to verify that the
implementation behaves correctly.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/TaskMetadata.java
##########
@@ -27,10 +27,12 @@
 
 /**
  * Represents the state of a single task running within a {@link KafkaStreams} application.
+ * @deprecated since 3.0, not intended for public use, use {@link org.apache.kafka.streams.TaskMetadata} instead.
  */
+@Deprecated

Review comment:
       nit: If we deprecate the class, we do not need to deprecate the 
constructor anymore, right?

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1478,8 +1499,36 @@ public void cleanUp() {
      * @param storeName the {@code storeName} to find metadata for
      * @return {@link StreamsMetadata} for each {@code KafkaStreams} instances with the provide {@code storeName} of
      * this application
+     * @deprecated since 3.0.0 use {@link KafkaStreams#allMetadataForGivenStore} instead
      */
-    public Collection<StreamsMetadata> allMetadataForStore(final String storeName) {
+    @Deprecated
+    public Collection<org.apache.kafka.streams.state.StreamsMetadata> allMetadataForStore(final String storeName) {
+        validateIsRunningOrRebalancing();
+        return streamsMetadataState.getAllMetadataForStore(storeName).stream().map(streamsMetadata ->
+                new org.apache.kafka.streams.state.StreamsMetadata(streamsMetadata.hostInfo(),
+                        streamsMetadata.stateStoreNames(),
+                        streamsMetadata.topicPartitions(),
+                        streamsMetadata.standbyStateStoreNames(),
+                        streamsMetadata.standbyTopicPartitions()))
+                .collect(Collectors.toSet());
+    }
+
+    /**
+     * Find all currently running {@code KafkaStreams} instances (potentially remotely) that
+     * <ul>
+     *   <li>use the same {@link StreamsConfig#APPLICATION_ID_CONFIG application ID} as this instance (i.e., all
+     *       instances that belong to the same Kafka Streams application)</li>
+     *   <li>and that contain a {@link StateStore} with the given {@code storeName}</li>
+     * </ul>
+     * and return {@link StreamsMetadata} for each discovered instance.
+     * <p>
+     * Note: this is a point in time view and it may change due to partition reassignment.
+     *
+     * @param storeName the {@code storeName} to find metadata for
+     * @return {@link StreamsMetadata} for each {@code KafkaStreams} instances with the provide {@code storeName} of
+     * this application
+     */
+    public Collection<StreamsMetadata> allMetadataForGivenStore(final String storeName) {

Review comment:
       I am +1 for `streamsMetadataForStore`.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StreamsMetadataImpl.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsMetadata;
+import org.apache.kafka.streams.state.HostInfo;
+
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Represents the state of an instance (process) in a {@link KafkaStreams} application.
+ * It contains the user supplied {@link HostInfo} that can be used by developers to build
+ * APIs and services to connect to other instances, the Set of state stores available on
+ * the instance and the Set of {@link TopicPartition}s available on the instance.
+ * NOTE: This is a point in time view. It may change when rebalances happen.
+ */
+public class StreamsMetadataImpl implements StreamsMetadata {
+    /**
+     * Sentinel to indicate that the StreamsMetadata is currently unavailable. This can occur during rebalance
+     * operations.
+     */
+    public final static StreamsMetadataImpl NOT_AVAILABLE = new StreamsMetadataImpl(HostInfo.unavailable(),
+                                                                            Collections.emptySet(),
+                                                                            Collections.emptySet(),
+                                                                            Collections.emptySet(),
+                                                                            Collections.emptySet());
+
+    private final HostInfo hostInfo;
+
+    private final Set<String> stateStoreNames;
+
+    private final Set<TopicPartition> topicPartitions;
+
+    private final Set<String> standbyStateStoreNames;
+
+    private final Set<TopicPartition> standbyTopicPartitions;
+
+    public StreamsMetadataImpl(final HostInfo hostInfo,
+                               final Set<String> stateStoreNames,
+                               final Set<TopicPartition> topicPartitions,
+                               final Set<String> standbyStateStoreNames,
+                               final Set<TopicPartition> standbyTopicPartitions) {
+
+        this.hostInfo = hostInfo;
+        this.stateStoreNames = stateStoreNames;
+        this.topicPartitions = topicPartitions;
+        this.standbyTopicPartitions = standbyTopicPartitions;
+        this.standbyStateStoreNames = standbyStateStoreNames;
+    }
+
+    /**
+     * The value of {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_SERVER_CONFIG} configured for the streams
+     * instance, which is typically host/port
+     *
+     * @return {@link HostInfo} corresponding to the streams instance
+     */
+    @Override
+    public HostInfo hostInfo() {
+        return hostInfo;
+    }
+
+    /**
+     * State stores owned by the instance as an active replica
+     *
+     * @return set of active state store names
+     */
+    @Override
+    public Set<String> stateStoreNames() {
+        return Collections.unmodifiableSet(stateStoreNames);
+    }
+
+    /**
+     * Topic partitions consumed by the instance as an active replica
+     *
+     * @return set of active topic partitions
+     */
+    @Override
+    public Set<TopicPartition> topicPartitions() {
+        return Collections.unmodifiableSet(topicPartitions);
+    }
+
+    /**
+     * (Source) Topic partitions for which the instance acts as standby.
+     *
+     * @return set of standby topic partitions
+     */
+    @Override
+    public Set<TopicPartition> standbyTopicPartitions() {
+        return Collections.unmodifiableSet(standbyTopicPartitions);
+    }
+
+    /**
+     * State stores owned by the instance as a standby replica
+     *
+     * @return set of standby state store names
+     */
+    @Override
+    public Set<String> standbyStateStoreNames() {
+        return Collections.unmodifiableSet(standbyStateStoreNames);
+    }
+
+    @Override
+    public String host() {
+        return hostInfo.host();
+    }
+
+    @SuppressWarnings("unused")
+    @Override
+    public int port() {
+        return hostInfo.port();
+    }
+
+    @Override
+    public boolean equals(final Object o) {

Review comment:
       Could you add unit tests for `equals()` and `hashCode()`? Since we guarantee
a behavior on the interface, it makes sense to verify that the implementation
behaves correctly.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskMetadataImpl.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.TaskMetadata;
+import org.apache.kafka.streams.processor.TaskId;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+public class TaskMetadataImpl implements TaskMetadata {
+
+    private final TaskId taskId;
+
+    private final Set<TopicPartition> topicPartitions;
+
+    private final Map<TopicPartition, Long> committedOffsets;
+
+    private final Map<TopicPartition, Long> endOffsets;
+
+    private final Optional<Long> timeCurrentIdlingStarted;
+
+    public TaskMetadataImpl(final TaskId taskId,
+                            final Set<TopicPartition> topicPartitions,
+                            final Map<TopicPartition, Long> committedOffsets,
+                            final Map<TopicPartition, Long> endOffsets,
+                            final Optional<Long> timeCurrentIdlingStarted) {
+        this.taskId = taskId;
+        this.topicPartitions = topicPartitions;
+        this.committedOffsets = committedOffsets;
+        this.endOffsets = endOffsets;

Review comment:
       Since this class is read only, it makes sense to use 
`Collections.unmodifiable*()` methods here to avoid modification of the 
contents of the member fields.
   Unit tests as in `StreamsMetadataTest` would also be good. 
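
A minimal sketch of the suggestion, assuming a hypothetical class (`TaskInfoImplSketch` is not the PR's `TaskMetadataImpl`; strings stand in for `TopicPartition`, and only two of the fields are shown). The collections are wrapped once in the constructor, so neither callers nor the class itself can modify them through the fields.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified stand-in for a read-only task metadata implementation.
final class TaskInfoImplSketch {
    private final Set<String> topicPartitions;
    private final Map<String, Long> committedOffsets;

    TaskInfoImplSketch(final Set<String> topicPartitions,
                       final Map<String, Long> committedOffsets) {
        // Wrap once here instead of on every getter call.
        this.topicPartitions = Collections.unmodifiableSet(topicPartitions);
        this.committedOffsets = Collections.unmodifiableMap(committedOffsets);
    }

    Set<String> topicPartitions() {
        return topicPartitions;
    }

    Map<String, Long> committedOffsets() {
        return committedOffsets;
    }
}

class TaskInfoImplSketchDemo {

    static TaskInfoImplSketch sample() {
        final Set<String> partitions = new HashSet<>();
        partitions.add("topic-0");
        final Map<String, Long> offsets = new HashMap<>();
        offsets.put("topic-0", 42L);
        return new TaskInfoImplSketch(partitions, offsets);
    }

    // Returns true if writing through the getter is rejected.
    static boolean rejectsModification() {
        try {
            sample().committedOffsets().put("topic-1", 7L);
            return false;
        } catch (final UnsupportedOperationException expected) {
            return true;
        }
    }

    public static void main(final String[] args) {
        System.out.println(rejectsModification());
    }
}
```

Note that `Collections.unmodifiable*()` returns a view: if the caller keeps a reference to the original collection and changes it later, the change is still visible through the wrapper. Preventing that would need a defensive copy, which goes beyond what the comment asks for.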

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ThreadMetadataImpl.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.TaskMetadata;
+import org.apache.kafka.streams.ThreadMetadata;
+
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Represents the state of a single thread running within a {@link KafkaStreams} application.
+ */
+public class ThreadMetadataImpl implements ThreadMetadata {
+
+    private final String threadName;
+
+    private final String threadState;
+
+    private final Set<TaskMetadata> activeTasks;
+
+    private final Set<TaskMetadata> standbyTasks;
+
+    private final String mainConsumerClientId;
+
+    private final String restoreConsumerClientId;
+
+    private final Set<String> producerClientIds;
+
+    // the admin client should be shared among all threads, so the client id should be the same;
+    // we keep it at the thread-level for user's convenience and possible extensions in the future
+    private final String adminClientId;
+
+    public ThreadMetadataImpl(final String threadName,
+                              final String threadState,
+                              final String mainConsumerClientId,
+                              final String restoreConsumerClientId,
+                              final Set<String> producerClientIds,
+                              final String adminClientId,
+                              final Set<TaskMetadata> activeTasks,
+                              final Set<TaskMetadata> standbyTasks) {
+        this.mainConsumerClientId = mainConsumerClientId;
+        this.restoreConsumerClientId = restoreConsumerClientId;
+        this.producerClientIds = producerClientIds;

Review comment:
       I think you should also use `Collections.unmodifiableSet()` here.
   Unit tests that verify the immutability, as in `StreamsMetadataTest`, would
also be great!
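
One way such an immutability check could look, as a rough sketch. The helper below is hypothetical and does not reproduce `StreamsMetadataTest`; it only relies on the documented behavior that unmodifiable collections throw `UnsupportedOperationException` on write attempts.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper; in a real test this would be a JUnit assertion such as assertThrows.
class ImmutabilityCheckSketch {

    // Returns true if the set refuses to accept a new element.
    static boolean isUnmodifiable(final Set<String> set) {
        try {
            if (set.add("probe")) {
                // The add succeeded, so the set is modifiable; undo the probe.
                set.remove("probe");
            }
            return false;
        } catch (final UnsupportedOperationException expected) {
            return true;
        }
    }

    public static void main(final String[] args) {
        final Set<String> producerClientIds = new HashSet<>();
        producerClientIds.add("producer-1");

        System.out.println(isUnmodifiable(producerClientIds));
        System.out.println(isUnmodifiable(Collections.unmodifiableSet(producerClientIds)));
    }
}
```

Applied to the getters of the implementation class, a check like this fails as soon as one of the sets is handed out without being wrapped.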

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
##########
@@ -1972,12 +1971,6 @@ public void shouldAlwaysUpdateTasksMetadataAfterChangingState() {
         assertEquals(StreamThread.State.RUNNING.name(), metadata.threadState());
     }
 
-    private void assertThreadMetadataHasEmptyTasksWithState(final ThreadMetadata metadata, final StreamThread.State state) {

Review comment:
       Thank you for the clean up!

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StreamsMetadataImpl.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsMetadata;
+import org.apache.kafka.streams.state.HostInfo;
+
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Represents the state of an instance (process) in a {@link KafkaStreams} application.
+ * It contains the user supplied {@link HostInfo} that can be used by developers to build
+ * APIs and services to connect to other instances, the Set of state stores available on
+ * the instance and the Set of {@link TopicPartition}s available on the instance.
+ * NOTE: This is a point in time view. It may change when rebalances happen.
+ */
+public class StreamsMetadataImpl implements StreamsMetadata {
+    /**
+     * Sentinel to indicate that the StreamsMetadata is currently unavailable. This can occur during rebalance
+     * operations.
+     */
+    public final static StreamsMetadataImpl NOT_AVAILABLE = new StreamsMetadataImpl(HostInfo.unavailable(),
+                                                                            Collections.emptySet(),
+                                                                            Collections.emptySet(),
+                                                                            Collections.emptySet(),
+                                                                            Collections.emptySet());
+
+    private final HostInfo hostInfo;
+
+    private final Set<String> stateStoreNames;
+
+    private final Set<TopicPartition> topicPartitions;
+
+    private final Set<String> standbyStateStoreNames;
+
+    private final Set<TopicPartition> standbyTopicPartitions;
+
+    public StreamsMetadataImpl(final HostInfo hostInfo,
+                               final Set<String> stateStoreNames,
+                               final Set<TopicPartition> topicPartitions,
+                               final Set<String> standbyStateStoreNames,
+                               final Set<TopicPartition> standbyTopicPartitions) {
+
+        this.hostInfo = hostInfo;
+        this.stateStoreNames = stateStoreNames;
+        this.topicPartitions = topicPartitions;
+        this.standbyTopicPartitions = standbyTopicPartitions;
+        this.standbyStateStoreNames = standbyStateStoreNames;
+    }
+
+    /**
+     * The value of {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_SERVER_CONFIG} configured for the streams
+     * instance, which is typically host/port
+     *
+     * @return {@link HostInfo} corresponding to the streams instance
+     */
+    @Override
+    public HostInfo hostInfo() {
+        return hostInfo;
+    }
+
+    /**
+     * State stores owned by the instance as an active replica
+     *
+     * @return set of active state store names
+     */
+    @Override
+    public Set<String> stateStoreNames() {
+        return Collections.unmodifiableSet(stateStoreNames);

Review comment:
       I think it would be better to make the collections immutable in the 
constructor since they should also not be modified within this class (for now). 

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1558,12 +1607,45 @@ private void processStreamThread(final Consumer<StreamThread> consumer) {
         for (final StreamThread thread : copy) consumer.accept(thread);
     }
 
+    /**
+     * Returns runtime information about the local threads of this {@link KafkaStreams} instance.
+     *
+     * @return the set of {@link org.apache.kafka.streams.processor.ThreadMetadata}.
+     * @deprecated since 3.0 use {@link #threadsMetadata()}
+     */
+    @Deprecated
+    @SuppressWarnings("deprecation")
+    public Set<org.apache.kafka.streams.processor.ThreadMetadata> localThreadsMetadata() {
+        return threadsMetadata().stream().map(threadMetadata -> new org.apache.kafka.streams.processor.ThreadMetadata(
+                threadMetadata.threadName(),
+                threadMetadata.threadState(),
+                threadMetadata.consumerClientId(),
+                threadMetadata.restoreConsumerClientId(),
+                threadMetadata.producerClientIds(),
+                threadMetadata.adminClientId(),
+                threadMetadata.activeTasks().stream().map(taskMetadata -> new org.apache.kafka.streams.processor.TaskMetadata(
+                        taskMetadata.taskId().toString(),
+                        taskMetadata.topicPartitions(),
+                        taskMetadata.committedOffsets(),
+                        taskMetadata.endOffsets(),
+                        taskMetadata.timeCurrentIdlingStarted())
+                ).collect(Collectors.toSet()),
+                threadMetadata.standbyTasks().stream().map(taskMetadata -> new org.apache.kafka.streams.processor.TaskMetadata(
+                        taskMetadata.taskId().toString(),
+                        taskMetadata.topicPartitions(),
+                        taskMetadata.committedOffsets(),
+                        taskMetadata.endOffsets(),
+                        taskMetadata.timeCurrentIdlingStarted())
+                ).collect(Collectors.toSet())))
+                .collect(Collectors.toSet());
+    }
+
     /**
      * Returns runtime information about the local threads of this {@link 
KafkaStreams} instance.
      *
      * @return the set of {@link ThreadMetadata}.
      */
-    public Set<ThreadMetadata> localThreadsMetadata() {
+    public Set<ThreadMetadata> threadsMetadata() {

Review comment:
       nit: I think `localThreadMetadata` does not sound consistent since 
metadata of multiple stream threads is returned. What about 
`metadataForLocalThreads()`?

##########
File path: docs/streams/upgrade-guide.html
##########
@@ -121,10 +121,23 @@ <h3><a id="streams_api_changes_300" 
href="#streams_api_changes_300">Streams API
     <p>
         The public <code>topicGroupId</code> and <code>partition</code> fields 
on TaskId have been deprecated and replaced with getters. Please migrate to 
using the new <code>TaskId.subtopology()</code>
         (which replaces <code>topicGroupId</code>) and 
<code>TaskId.partition()</code> APIs instead. Also, the 
<code>TaskId#readFrom</code> and <code>TaskId#writeTo</code> methods have been 
deprecated
-        and will be removed, as they were never intended for public use. 
Finally, we have deprecated the <code>TaskMetadata.taskId()</code> method as 
well as the <code>TaskMetadata</code> constructor.
-        These have been replaced with APIs that better represent the task id 
as an actual <code>TaskId</code> object instead of a String. Please migrate to 
the new <code>TaskMetadata#getTaskId</code>
-        method. See <a 
href="https://cwiki.apache.org/confluence/x/vYTOCg">KIP-740</a> for more 
details.
+        and will be removed, as they were never intended for public use. We 
have also deprecated the 
<code>org.apache.kafka.streams.processsor.TaskMetadata</code> class and 
introduced a new interface
+        <code>org.apache.kafka.streams.TaskMetadata</code> to be used instead. 
This change was introduced to better reflect the fact that 
<code>TaskMetadata</code> was not meant to be instantiated outside
+        of Kafka codebase.
+        Please note that the new <code>TaskMetadata</code> offers APIs that 
better represent the task id as an actual <code>TaskId</code> object instead of 
a String. Please migrate to the new
+        <code>org.apache.kafka.streams.TaskMetadata</code> which offers these 
better methods, for example, by using the new 
<code>ThreadMetadata#getActiveTasks</code> and 
<code>ThreadMetadata#getStandbyTasks</code>.

Review comment:
       ```suggestion
           <code>org.apache.kafka.streams.TaskMetadata</code> which offers 
these better methods, for example, by using the new 
<code>ThreadMetadata#activeTasks</code> and 
<code>ThreadMetadata#standbyTasks</code>.
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1458,8 +1457,30 @@ public void cleanUp() {
      * Note: this is a point in time view and it may change due to partition 
reassignment.
      *
      * @return {@link StreamsMetadata} for each {@code KafkaStreams} instances 
of this application
+     * @deprecated since 3.0.0 use {@link KafkaStreams#allRunningMetadata}
      */
-    public Collection<StreamsMetadata> allMetadata() {
+    @Deprecated
+    public Collection<org.apache.kafka.streams.state.StreamsMetadata> 
allMetadata() {
+        validateIsRunningOrRebalancing();
+        return 
streamsMetadataState.getAllMetadata().stream().map(streamsMetadata ->
+                new 
org.apache.kafka.streams.state.StreamsMetadata(streamsMetadata.hostInfo(),
+                        streamsMetadata.stateStoreNames(),
+                        streamsMetadata.topicPartitions(),
+                        streamsMetadata.standbyStateStoreNames(),
+                        streamsMetadata.standbyTopicPartitions()))
+                .collect(Collectors.toSet());
+    }
+
+    /**
+     * Find all currently running {@code KafkaStreams} instances (potentially 
remotely) that use the same
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG application ID} as this 
instance (i.e., all instances that belong to
+     * the same Kafka Streams application) and return {@link StreamsMetadata} 
for each discovered instance.
+     * <p>
+     * Note: this is a point in time view and it may change due to partition 
reassignment.
+     *
+     * @return {@link StreamsMetadata} for each {@code KafkaStreams} instances 
of this application
+     */
+    public Collection<StreamsMetadata> allRunningMetadata() {

Review comment:
       What about `metadataForAllStreamsClients()`? I think this makes it more 
explicit.





