cadonna commented on a change in pull request #10840:
URL: https://github.com/apache/kafka/pull/10840#discussion_r657813645



##########
File path: docs/streams/upgrade-guide.html
##########
@@ -121,10 +121,23 @@ <h3><a id="streams_api_changes_300" 
href="#streams_api_changes_300">Streams API
     <p>
         The public <code>topicGroupId</code> and <code>partition</code> fields 
on TaskId have been deprecated and replaced with getters. Please migrate to 
using the new <code>TaskId.subtopology()</code>
         (which replaces <code>topicGroupId</code>) and 
<code>TaskId.partition()</code> APIs instead. Also, the 
<code>TaskId#readFrom</code> and <code>TaskId#writeTo</code> methods have been 
deprecated
-        and will be removed, as they were never intended for public use. 
Finally, we have deprecated the <code>TaskMetadata.taskId()</code> method as 
well as the <code>TaskMetadata</code> constructor.
-        These have been replaced with APIs that better represent the task id 
as an actual <code>TaskId</code> object instead of a String. Please migrate to 
the new <code>TaskMetadata#getTaskId</code>
-        method. See <a 
href="https://cwiki.apache.org/confluence/x/vYTOCg">KIP-740</a> for more 
details.
+        and will be removed, as they were never intended for public use. We 
have also deprecated the 
<code>org.apache.kafka.streams.processor.TaskMetadata</code> class and 
introduced a new interface
+        <code>org.apache.kafka.streams.TaskMetadata</code> to be used instead. 
This change was introduced to better reflect the fact that 
<code>TaskMetadata</code> was not meant to be instantiated outside
+        of the Kafka codebase.
+        Please note that the new <code>TaskMetadata</code> offers APIs that 
better represent the task id as an actual <code>TaskId</code> object instead of 
a String. Please migrate to the new
+        <code>org.apache.kafka.streams.TaskMetadata</code> which offers these 
better methods, for example, by using the new 
<code>ThreadMetadata#activeTasks</code> and 
<code>ThreadMetadata#standbyTasks</code>.
+        <code>org.apache.kafka.streams.processor.ThreadMetadata</code> class 
is also now deprecated and the newly introduced interface 
<code>org.apache.kafka.streams.ThreadMetadata</code> is to be used instead. In 
this new <code>ThreadMetadata</code>
+        interface, any reference to the deprecated <code>TaskMetadata</code> 
is replaced by the new interface.
+        Finally, also 
<code>org.apache.kafka.streams.state.StreamsMetadata</code> has been 
deprecated. Please migrate to the new 
<code>org.apache.kafka.streams.StreamsMetadata</code>.
+        We have deprecated several methods under 
<code>org.apache.kafka.streams.KafkaStreams</code> that returned the 
aforementioned deprecated classes:
     </p>
+    <ul>
+        <li>Users of <code>KafkaStreams#allMetadata</code> are meant to 
migrate to the new <code>KafkaStreams#allRunningMetadata</code>.</li>
+        <li>Users of <code>KafkaStreams#allMetadataForStore(String)</code> are 
meant to migrate to the new 
<code>KafkaStreams#allMetadataForGivenStore(String)</code>.</li>
+        <li>Users of <code>KafkaStreams#localThreadsMetadata</code> are meant 
to migrate to the new <code>KafkaStreams#threadsMetadata</code>.</li>

Review comment:
       You need to adapt this text to the new method names.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskMetadataImplTest.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.TaskMetadata;
+import org.apache.kafka.streams.processor.TaskId;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+
+public class TaskMetadataImplTest {
+
+    public static final TaskId TASK_ID = new TaskId(1, 2);
+    public static final TopicPartition TP_0 = new TopicPartition("t", 0);
+    public static final TopicPartition TP_1 = new TopicPartition("t", 1);
+    public static final Set<TopicPartition> TOPIC_PARTITIONS = mkSet(TP_0, 
TP_1);
+    public static final Map<TopicPartition, Long> COMMITTED_OFFSETS = 
mkMap(mkEntry(TP_1, 1L), mkEntry(TP_1, 2L));
+    public static final Map<TopicPartition, Long> END_OFFSETS = 
mkMap(mkEntry(TP_1, 1L), mkEntry(TP_1, 3L));
+    public static final Optional<Long> TIME_CURRENT_IDLING_STARTED = 
Optional.of(3L);
+
+    private TaskMetadata taskMetadata;
+
+    @Before
+    public void setUp() {
+        taskMetadata = new TaskMetadataImpl(
+                TASK_ID,
+                TOPIC_PARTITIONS,
+                COMMITTED_OFFSETS,
+                END_OFFSETS,
+                TIME_CURRENT_IDLING_STARTED);

Review comment:
       nit:
   ```suggestion
           taskMetadata = new TaskMetadataImpl(
               TASK_ID,
               TOPIC_PARTITIONS,
               COMMITTED_OFFSETS,
               END_OFFSETS,
               TIME_CURRENT_IDLING_STARTED
           );
   ```

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/ThreadMetadataImplTest.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.TaskMetadata;
+import org.apache.kafka.streams.ThreadMetadata;
+import org.apache.kafka.streams.processor.TaskId;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+public class ThreadMetadataImplTest {
+
+    public static final String THREAD_NAME = "thread name";
+    public static final String THREAD_STATE = "thread state";
+    public static final String MAIN_CONSUMER_CLIENT_ID = "main Consumer 
ClientID";
+    public static final String RESTORE_CONSUMER_CLIENT_ID = "restore Consumer 
ClientID";
+    public static final String CLIENT_ID_1 = "client Id 1";
+    public static final String CLIENT_ID_2 = "client Id 2";
+    public static final Set<String> PRODUCER_CLIENT_IDS = mkSet(CLIENT_ID_1, 
CLIENT_ID_2);
+    public static final TaskId TASK_ID_0 = new TaskId(1, 2);
+    public static final TaskId TASK_ID_1 = new TaskId(1, 1);
+    public static final TopicPartition TP_0_0 = new TopicPartition("t", 0);
+    public static final TopicPartition TP_1_0 = new TopicPartition("t", 1);
+    public static final TopicPartition TP_0_1 = new TopicPartition("t", 2);
+    public static final TopicPartition TP_1_1 = new TopicPartition("t", 3);
+    public static final TaskMetadata TM_0 = new TaskMetadataImpl(
+            TASK_ID_0,
+            mkSet(TP_0_0, TP_1_0),
+            mkMap(mkEntry(TP_0_0, 1L), mkEntry(TP_1_0, 2L)),
+            mkMap(mkEntry(TP_0_0, 1L), mkEntry(TP_1_0, 2L)),
+            Optional.of(3L));
+    public static final TaskMetadata TM_1 = new TaskMetadataImpl(
+            TASK_ID_1,
+            mkSet(TP_0_1, TP_1_1),
+            mkMap(mkEntry(TP_0_1, 1L), mkEntry(TP_1_1, 2L)),
+            mkMap(mkEntry(TP_0_1, 1L), mkEntry(TP_1_1, 2L)),
+            Optional.of(3L));
+    public static final Set<TaskMetadata> STANDBY_TASKS = mkSet(TM_0, TM_1);
+    public static final Set<TaskMetadata> ACTIVE_TASKS = mkSet(TM_1);
+    public static final String ADMIN_CLIENT_ID = "admin ClientID";
+
+    private ThreadMetadata threadMetadata;
+
+    @Before
+    public void setUp() {
+        threadMetadata = new ThreadMetadataImpl(
+                THREAD_NAME,
+                THREAD_STATE,
+                MAIN_CONSUMER_CLIENT_ID,
+                RESTORE_CONSUMER_CLIENT_ID,
+                PRODUCER_CLIENT_IDS,
+                ADMIN_CLIENT_ID,
+                ACTIVE_TASKS,
+                STANDBY_TASKS
+                );
+    }
+
+    @Test
+    public void shouldNotAllowModificationOfInternalStateViaGetters() {
+        assertTrue(isUnmodifiable(threadMetadata.producerClientIds()));
+        assertTrue(isUnmodifiable(threadMetadata.activeTasks()));
+        assertTrue(isUnmodifiable(threadMetadata.standbyTasks()));
+    }
+
+    @Test
+    public void shouldFollowHashCodeAndEqualsContract() {

Review comment:
       I prefer separate tests, because when one fails you immediately know 
the cause. But if you would like to keep it as is, I am fine with that.
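
   A minimal sketch of what "separate tests" could look like, written as plain Java so that it runs without a test framework. `HostPort` and `HostPortContractChecks` are hypothetical names used only for illustration and are not part of this PR; in a JUnit test class each check method would be its own `@Test` method.

   ```java
   import java.util.Objects;

   // Hypothetical value class standing in for a metadata implementation (not from the PR).
   final class HostPort {
       private final String host;
       private final int port;

       HostPort(final String host, final int port) {
           this.host = host;
           this.port = port;
       }

       @Override
       public boolean equals(final Object o) {
           if (this == o) {
               return true;
           }
           if (!(o instanceof HostPort)) {
               return false;
           }
           final HostPort other = (HostPort) o;
           return port == other.port && Objects.equals(host, other.host);
       }

       @Override
       public int hashCode() {
           return Objects.hash(host, port);
       }
   }

   // One check per property; a failing check then names the broken property directly.
   final class HostPortContractChecks {
       static boolean shouldBeEqualIfAllFieldsAreEqual() {
           return new HostPort("local", 12).equals(new HostPort("local", 12));
       }

       static boolean shouldHaveSameHashCodeIfEqual() {
           return new HostPort("local", 12).hashCode() == new HostPort("local", 12).hashCode();
       }

       static boolean shouldNotBeEqualIfHostDiffers() {
           return !new HostPort("local", 12).equals(new HostPort("remote", 12));
       }

       static boolean shouldNotBeEqualIfPortDiffers() {
           return !new HostPort("local", 12).equals(new HostPort("local", 13));
       }
   }
   ```

   With one method per property, the name of the failing method already says which field broke the contract.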

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
##########
@@ -229,11 +230,11 @@ private void rebuildMetadata(final Map<HostInfo, 
Set<TopicPartition>> activePart
                                  final Map<HostInfo, Set<TopicPartition>> 
standbyPartitionHostMap) {
         if (activePartitionHostMap.isEmpty() && 
standbyPartitionHostMap.isEmpty()) {
             allMetadata = Collections.emptyList();
-            localMetadata.set(new StreamsMetadata(thisHost,
-                                                  Collections.emptySet(),
-                                                  Collections.emptySet(),
-                                                  Collections.emptySet(),
-                                                  Collections.emptySet()
+            localMetadata.set(new StreamsMetadataImpl(thisHost,
+                    Collections.emptySet(),
+                    Collections.emptySet(),
+                    Collections.emptySet(),
+                    Collections.emptySet()
             ));

Review comment:
       nit: the indentation should be 4 spaces, not 8
   ```suggestion
               localMetadata.set(new StreamsMetadataImpl(
                   thisHost,
                   Collections.emptySet(),
                   Collections.emptySet(),
                   Collections.emptySet(),
                   Collections.emptySet()
               ));
   ```

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/StreamsMetadataImpl.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsMetadata;
+import org.apache.kafka.streams.state.HostInfo;
+
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Represents the state of an instance (process) in a {@link KafkaStreams} 
application.
+ * It contains the user supplied {@link HostInfo} that can be used by 
developers to build
+ * APIs and services to connect to other instances, the Set of state stores 
available on
+ * the instance and the Set of {@link TopicPartition}s available on the 
instance.
+ * NOTE: This is a point in time view. It may change when rebalances happen.
+ */
+public class StreamsMetadataImpl implements StreamsMetadata {
+    /**
+     * Sentinel to indicate that the StreamsMetadata is currently unavailable. 
This can occur during rebalance
+     * operations.
+     */
+    public final static StreamsMetadataImpl NOT_AVAILABLE = new 
StreamsMetadataImpl(HostInfo.unavailable(),
+            Collections.emptySet(),
+            Collections.emptySet(),
+            Collections.emptySet(),
+            Collections.emptySet());

Review comment:
       ```suggestion
       public final static StreamsMetadataImpl NOT_AVAILABLE = new 
StreamsMetadataImpl(
           HostInfo.unavailable(),
           Collections.emptySet(),
           Collections.emptySet(),
           Collections.emptySet(),
           Collections.emptySet()
       );
   ```

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
##########
@@ -258,11 +259,11 @@ private void rebuildMetadata(final Map<HostInfo, 
Set<TopicPartition>> activePart
                     
standbyStoresOnHost.addAll(getStoresOnHost(storeToSourceTopics, 
standbyPartitionsOnHost));
                 }
 
-                final StreamsMetadata metadata = new StreamsMetadata(hostInfo,
-                                                                     
activeStoresOnHost,
-                                                                     
activePartitionsOnHost,
-                                                                     
standbyStoresOnHost,
-                                                                     
standbyPartitionsOnHost);
+                final StreamsMetadata metadata = new 
StreamsMetadataImpl(hostInfo,
+                        activeStoresOnHost,
+                        activePartitionsOnHost,
+                        standbyStoresOnHost,
+                        standbyPartitionsOnHost);

Review comment:
       ```suggestion
                   final StreamsMetadata metadata = new StreamsMetadataImpl(
                       hostInfo,
                       activeStoresOnHost,
                       activePartitionsOnHost,
                       standbyStoresOnHost,
                       standbyPartitionsOnHost
                   );
   ```

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/ThreadMetadataImplTest.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.TaskMetadata;
+import org.apache.kafka.streams.ThreadMetadata;
+import org.apache.kafka.streams.processor.TaskId;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+public class ThreadMetadataImplTest {
+
+    public static final String THREAD_NAME = "thread name";
+    public static final String THREAD_STATE = "thread state";
+    public static final String MAIN_CONSUMER_CLIENT_ID = "main Consumer 
ClientID";
+    public static final String RESTORE_CONSUMER_CLIENT_ID = "restore Consumer 
ClientID";
+    public static final String CLIENT_ID_1 = "client Id 1";
+    public static final String CLIENT_ID_2 = "client Id 2";
+    public static final Set<String> PRODUCER_CLIENT_IDS = mkSet(CLIENT_ID_1, 
CLIENT_ID_2);
+    public static final TaskId TASK_ID_0 = new TaskId(1, 2);
+    public static final TaskId TASK_ID_1 = new TaskId(1, 1);
+    public static final TopicPartition TP_0_0 = new TopicPartition("t", 0);
+    public static final TopicPartition TP_1_0 = new TopicPartition("t", 1);
+    public static final TopicPartition TP_0_1 = new TopicPartition("t", 2);
+    public static final TopicPartition TP_1_1 = new TopicPartition("t", 3);
+    public static final TaskMetadata TM_0 = new TaskMetadataImpl(
+            TASK_ID_0,
+            mkSet(TP_0_0, TP_1_0),
+            mkMap(mkEntry(TP_0_0, 1L), mkEntry(TP_1_0, 2L)),
+            mkMap(mkEntry(TP_0_0, 1L), mkEntry(TP_1_0, 2L)),
+            Optional.of(3L));

Review comment:
       nit:
   ```suggestion
       public static final TaskMetadata TM_0 = new TaskMetadataImpl(
           TASK_ID_0,
           mkSet(TP_0_0, TP_1_0),
           mkMap(mkEntry(TP_0_0, 1L), mkEntry(TP_1_0, 2L)),
           mkMap(mkEntry(TP_0_0, 1L), mkEntry(TP_1_0, 2L)),
           Optional.of(3L)
       );
   ```
   here and below.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/state/StreamsMetadataTest.java
##########
@@ -55,6 +62,63 @@ public void 
shouldNotAllowModificationOfInternalStateViaGetters() {
         assertTrue(isUnmodifiable(streamsMetadata.standbyStateStoreNames()));
     }
 
+    @Test
+    public void shouldFollowHashCodeAndEqualsContract() {

Review comment:
       See my other comment about separate tests.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ThreadMetadataImpl.java
##########
@@ -57,7 +57,11 @@ public ThreadMetadataImpl(final String threadName,
                               final Set<TaskMetadata> standbyTasks) {
         this.mainConsumerClientId = mainConsumerClientId;
         this.restoreConsumerClientId = restoreConsumerClientId;
-        this.producerClientIds = 
Collections.unmodifiableSet(producerClientIds);
+        if (producerClientIds != null) {
+            this.producerClientIds = 
Collections.unmodifiableSet(producerClientIds);
+        } else {
+            this.producerClientIds = Collections.emptySet();
+        }

Review comment:
       I could not reproduce this issue. The set of producer IDs should never 
be `null`. If it is, the test is wrong.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/state/StreamsMetadataTest.java
##########
@@ -18,30 +18,39 @@
 package org.apache.kafka.streams.state;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.StreamsMetadata;
+import org.apache.kafka.streams.state.internals.StreamsMetadataImpl;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.util.Collection;
+import java.util.Set;
 
 import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
 public class StreamsMetadataTest {
 
     private static final HostInfo HOST_INFO = new HostInfo("local", 12);
+    public static final Set<String> STATE_STORE_NAMES = mkSet("store1", 
"store2");
     private static final TopicPartition TP_0 = new TopicPartition("t", 0);
     private static final TopicPartition TP_1 = new TopicPartition("t", 1);
+    public static final Set<TopicPartition> TOPIC_PARTITIONS = mkSet(TP_0, 
TP_1);
+    public static final Set<String> STAND_BY_STORE_NAMES = mkSet("store2");
+    public static final Set<TopicPartition> STANDBY_TOPIC_PARTITIONS = 
mkSet(TP_1);
 
     private StreamsMetadata streamsMetadata;
 
     @Before
     public void setUp() {
-        streamsMetadata = new StreamsMetadata(
-            HOST_INFO,
-            mkSet("store1", "store2"),
-            mkSet(TP_0, TP_1),
-            mkSet("store2"),
-            mkSet(TP_1)
+        streamsMetadata = new StreamsMetadataImpl(
+                HOST_INFO,
+                STATE_STORE_NAMES,
+                TOPIC_PARTITIONS,
+                STAND_BY_STORE_NAMES,
+                STANDBY_TOPIC_PARTITIONS

Review comment:
       nit: The indentation was actually correct before. 

##########
File path: streams/src/main/java/org/apache/kafka/streams/ThreadMetadata.java
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import java.util.Set;
+
+/**
+ * Represents the state of a single thread running within a {@link 
KafkaStreams} application.
+ */
+public interface ThreadMetadata {
+
+
+    /**
+     * @return the state of the Thread
+     */
+    String threadState();
+
+    /**
+     * @return the name of the Thread
+     */
+    String threadName();
+
+    /**
+     * This function will return the set of the {@link TaskMetadata} for the 
current active tasks
+     * @return a set of metadata for the active tasks
+     */
+    Set<TaskMetadata> activeTasks();
+
+    /**
+     * This function will return the set of the {@link TaskMetadata} for the 
current standby tasks
+     * @return a set of metadata for the standby tasks
+     */
+    Set<TaskMetadata> standbyTasks();
+
+    /**
+     * @return the consumer Client Id
+     */
+    String consumerClientId();
+
+    /**
+     * @return the restore consumer Client Id
+     */
+    String restoreConsumerClientId();
+
+    /**
+     * This function will return the set of Client Ids for the producers
+     * @return set of producer Client Ids
+     */
+    Set<String> producerClientIds();
+
+    /**
+     * @return the admin Client Id
+     */
+    String adminClientId();
+
+    /**
+     * Compares the specified object with this ThreadMetadata. Returns {@code 
true} if and only if the specified object is
+     * also a TaskMetadata and both {@code threadName()} are equal, {@code 
threadState()} are equal, {@code activeTasks()} contain the same

Review comment:
       `TaskMetadata` -> `ThreadMetadata`

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ThreadMetadataImpl.java
##########
@@ -57,7 +57,11 @@ public ThreadMetadataImpl(final String threadName,
                               final Set<TaskMetadata> standbyTasks) {
         this.mainConsumerClientId = mainConsumerClientId;
         this.restoreConsumerClientId = restoreConsumerClientId;
-        this.producerClientIds = 
Collections.unmodifiableSet(producerClientIds);
+        if (producerClientIds != null) {
+            this.producerClientIds = 
Collections.unmodifiableSet(producerClientIds);
+        } else {
+            this.producerClientIds = Collections.emptySet();
+        }

Review comment:
       OK, now I see. You meant `StreamThreadTest`. There you need to set up the 
mock for the `TaskManager` so that it returns at least an empty set, with 
`expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet());`.
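
   To illustrate why the fix probably belongs in the test setup rather than in the constructor, here is a small self-contained sketch. `ClientIdSource`, `ProducerIdsHolder`, and `StubDemo` are hypothetical stand-ins (not classes from this PR), and the lambdas play the role of the mock, assuming an unstubbed mock hands back `null`.

   ```java
   import java.util.Collections;
   import java.util.Set;

   // Hypothetical stand-in for the collaborator that supplies the producer client IDs.
   interface ClientIdSource {
       Set<String> producerClientIds();
   }

   // Hypothetical holder that wraps the set without a null guard.
   final class ProducerIdsHolder {
       private final Set<String> producerClientIds;

       ProducerIdsHolder(final ClientIdSource source) {
           // Collections.unmodifiableSet throws NullPointerException when given null.
           this.producerClientIds = Collections.unmodifiableSet(source.producerClientIds());
       }

       Set<String> producerClientIds() {
           return producerClientIds;
       }
   }

   final class StubDemo {
       // A stub that returns null makes construction fail.
       static boolean failsWithNullStub() {
           try {
               new ProducerIdsHolder(() -> null);
               return false;
           } catch (final NullPointerException e) {
               return true;
           }
       }

       // A stub that returns an empty set keeps the production code free of null checks.
       static boolean worksWithEmptySetStub() {
           final ProducerIdsHolder holder = new ProducerIdsHolder(() -> Collections.emptySet());
           return holder.producerClientIds().isEmpty();
       }
   }
   ```

   If the real collaborator never returns `null`, stubbing an empty set in the test keeps the production constructor as it was.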

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskMetadataImplTest.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.TaskMetadata;
+import org.apache.kafka.streams.processor.TaskId;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+
+public class TaskMetadataImplTest {
+
+    public static final TaskId TASK_ID = new TaskId(1, 2);
+    public static final TopicPartition TP_0 = new TopicPartition("t", 0);
+    public static final TopicPartition TP_1 = new TopicPartition("t", 1);
+    public static final Set<TopicPartition> TOPIC_PARTITIONS = mkSet(TP_0, 
TP_1);
+    public static final Map<TopicPartition, Long> COMMITTED_OFFSETS = 
mkMap(mkEntry(TP_1, 1L), mkEntry(TP_1, 2L));
+    public static final Map<TopicPartition, Long> END_OFFSETS = 
mkMap(mkEntry(TP_1, 1L), mkEntry(TP_1, 3L));
+    public static final Optional<Long> TIME_CURRENT_IDLING_STARTED = 
Optional.of(3L);
+
+    private TaskMetadata taskMetadata;
+
+    @Before
+    public void setUp() {
+        taskMetadata = new TaskMetadataImpl(
+                TASK_ID,
+                TOPIC_PARTITIONS,
+                COMMITTED_OFFSETS,
+                END_OFFSETS,
+                TIME_CURRENT_IDLING_STARTED);
+    }
+
+    @Test
+    public void shouldNotAllowModificationOfInternalStateViaGetters() {
+        assertTrue(isUnmodifiable(taskMetadata.topicPartitions()));
+        assertTrue(isUnmodifiable(taskMetadata.committedOffsets()));
+        assertTrue(isUnmodifiable(taskMetadata.endOffsets()));

Review comment:
       In general, for new assertions we usually use
   ```suggestion
           assertThat(isUnmodifiable(taskMetadata.topicPartitions()), is(true));
           assertThat(isUnmodifiable(taskMetadata.committedOffsets()), is(true));
           assertThat(isUnmodifiable(taskMetadata.endOffsets()), is(true));
   ```
   This applies here and in other places of this PR.
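
   For context, the diff does not show the body of `isUnmodifiable`. A plausible shape, given here only as an assumption, is a helper that attempts a mutation and reports whether it was rejected. `UnmodifiableCheck` is a hypothetical wrapper class for this sketch.

   ```java
   import java.util.Collection;

   final class UnmodifiableCheck {
       // Assumed shape of the helper: try to mutate and see whether the collection refuses.
       // Note that clear() really does empty a modifiable collection, and that it does not
       // throw for every empty immutable collection, so pass a non-empty one.
       static boolean isUnmodifiable(final Collection<?> collection) {
           try {
               collection.clear();
               return false;
           } catch (final UnsupportedOperationException e) {
               return true;
           }
       }
   }
   ```

   The boolean result is what the assertions above wrap, so the matcher form only changes how a failure is reported, not what is checked.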




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

