lihaosky commented on code in PR #13851:
URL: https://github.com/apache/kafka/pull/13851#discussion_r1244450143


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##########
@@ -409,6 +411,43 @@ private String getBrokerSideConfigValue(final Config 
brokerSideTopicConfig,
         return brokerSideConfigEntry.value();
     }
 
+    public Map<String, List<TopicPartitionInfo>> getTopicPartitionInfo(final 
Set<String> topics) {
+        log.debug("Starting to describe topics {} in partition assignor.", 
topics);
+
+        long currentWallClockMs = time.milliseconds();
+        final long deadlineMs = currentWallClockMs + retryTimeoutMs;
+
+        Set<String> topicsToDescribe = new HashSet<>(topics);
+        final Map<String, List<TopicPartitionInfo>> topicPartitionInfo = new 
HashMap<>();
+
+        while (!topicsToDescribe.isEmpty()) {
+            final Map<String, List<TopicPartitionInfo>> existed = 
getTopicPartitionInfo(topicsToDescribe, null);
+            topicPartitionInfo.putAll(existed);
+            topicsToDescribe.removeAll(topicPartitionInfo.keySet());
+            if (!topicsToDescribe.isEmpty()) {
+                currentWallClockMs = time.milliseconds();
+
+                if (currentWallClockMs >= deadlineMs) {
+                    final String timeoutError = String.format(
+                        "Could not create topics within %d milliseconds. " +
+                            "This can happen if the Kafka cluster is 
temporarily not available.",
+                        retryTimeoutMs);
+                    log.error(timeoutError);

Review Comment:
   Is this mimicking the `makeReady` function's logs?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##########
@@ -409,6 +411,43 @@ private String getBrokerSideConfigValue(final Config 
brokerSideTopicConfig,
         return brokerSideConfigEntry.value();
     }
 
+    public Map<String, List<TopicPartitionInfo>> getTopicPartitionInfo(final 
Set<String> topics) {
+        log.debug("Starting to describe topics {} in partition assignor.", 
topics);
+
+        long currentWallClockMs = time.milliseconds();
+        final long deadlineMs = currentWallClockMs + retryTimeoutMs;
+
+        Set<String> topicsToDescribe = new HashSet<>(topics);
+        final Map<String, List<TopicPartitionInfo>> topicPartitionInfo = new 
HashMap<>();
+
+        while (!topicsToDescribe.isEmpty()) {
+            final Map<String, List<TopicPartitionInfo>> existed = 
getTopicPartitionInfo(topicsToDescribe, null);
+            topicPartitionInfo.putAll(existed);
+            topicsToDescribe.removeAll(topicPartitionInfo.keySet());
+            if (!topicsToDescribe.isEmpty()) {
+                currentWallClockMs = time.milliseconds();
+
+                if (currentWallClockMs >= deadlineMs) {
+                    final String timeoutError = String.format(
+                        "Could not create topics within %d milliseconds. " +
+                            "This can happen if the Kafka cluster is 
temporarily not available.",
+                        retryTimeoutMs);
+                    log.error(timeoutError);
+                    throw new TimeoutException(timeoutError);
+                }
+                log.info(

Review Comment:
   Is this mimicking the `makeReady` function's logs?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RackAwareTaskAssignor.java:
##########
@@ -0,0 +1,164 @@
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
+import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RackAwareTaskAssignor {
+    private static final Logger log = 
LoggerFactory.getLogger(RackAwareTaskAssignor.class);
+
+    private final Cluster fullMetadata;
+    private final Map<TaskId, Set<TopicPartition>> partitionsForTask;
+    private final Map<UUID, Map<String, Optional<String>>> processRacks;
+    private final AssignmentConfigs assignmentConfigs;
+    private final Map<TopicPartition, Set<String>> racksForPartition;
+    private final InternalTopicManager internalTopicManager;
+    private Boolean canEnableForActive;
+
+    public RackAwareTaskAssignor(final Cluster fullMetadata,
+                                 final Map<TaskId, Set<TopicPartition>> 
partitionsForTask,
+                                 final Map<Subtopology, Set<TaskId>> 
tasksForTopicGroup,
+                                 final Map<UUID, Map<String, 
Optional<String>>> processRacks,
+                                 final InternalTopicManager 
internalTopicManager,
+                                 final AssignmentConfigs assignmentConfigs) {
+        this.fullMetadata = fullMetadata;
+        this.partitionsForTask = partitionsForTask;
+        this.processRacks = processRacks;
+        this.internalTopicManager = internalTopicManager;
+        this.assignmentConfigs = assignmentConfigs;
+        this.racksForPartition = new HashMap<>();
+    }
+
+  public synchronized boolean canEnableForActive() {
+      if (canEnableForActive != null) {
+          return canEnableForActive;
+      }
+
+      /*
+      TODO: enable this after we add the config
+      if 
(StreamsConfig.RACK_AWARE_ASSSIGNMENT_STRATEGY_NONE.equals(assignmentConfigs.rackAwareAssignmentStrategy))
 {
+          canEnableForActive = false;
+          return false;
+      }
+       */
+
+      if (!validateClientRack()) {
+          canEnableForActive = false;
+          return false;
+      }
+
+      canEnableForActive = validateTopicPartitionRack();
+      return canEnableForActive;
+  }
+
+  public boolean canEnableForStandby() {
+      // TODO
+      return false;
+  }
+
+  private boolean validateTopicPartitionRack() {
+      // Make sure rackId exist for all TopicPartitions needed

Review Comment:
   The reason I want to enforce that racks exist for all needed topic partitions 
before enabling the assignor is to avoid too many assignment changes if some 
racks appear or disappear later somehow.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RackAwareTaskAssignor.java:
##########
@@ -0,0 +1,164 @@
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
+import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RackAwareTaskAssignor {
+    private static final Logger log = 
LoggerFactory.getLogger(RackAwareTaskAssignor.class);
+
+    private final Cluster fullMetadata;
+    private final Map<TaskId, Set<TopicPartition>> partitionsForTask;
+    private final Map<UUID, Map<String, Optional<String>>> processRacks;
+    private final AssignmentConfigs assignmentConfigs;
+    private final Map<TopicPartition, Set<String>> racksForPartition;
+    private final InternalTopicManager internalTopicManager;
+    private Boolean canEnableForActive;
+
+    public RackAwareTaskAssignor(final Cluster fullMetadata,
+                                 final Map<TaskId, Set<TopicPartition>> 
partitionsForTask,
+                                 final Map<Subtopology, Set<TaskId>> 
tasksForTopicGroup,
+                                 final Map<UUID, Map<String, 
Optional<String>>> processRacks,
+                                 final InternalTopicManager 
internalTopicManager,
+                                 final AssignmentConfigs assignmentConfigs) {
+        this.fullMetadata = fullMetadata;
+        this.partitionsForTask = partitionsForTask;
+        this.processRacks = processRacks;
+        this.internalTopicManager = internalTopicManager;
+        this.assignmentConfigs = assignmentConfigs;
+        this.racksForPartition = new HashMap<>();
+    }
+
+  public synchronized boolean canEnableForActive() {
+      if (canEnableForActive != null) {
+          return canEnableForActive;
+      }
+
+      /*
+      TODO: enable this after we add the config
+      if 
(StreamsConfig.RACK_AWARE_ASSSIGNMENT_STRATEGY_NONE.equals(assignmentConfigs.rackAwareAssignmentStrategy))
 {
+          canEnableForActive = false;
+          return false;
+      }
+       */
+
+      if (!validateClientRack()) {
+          canEnableForActive = false;
+          return false;
+      }
+
+      canEnableForActive = validateTopicPartitionRack();
+      return canEnableForActive;
+  }
+
+  public boolean canEnableForStandby() {
+      // TODO
+      return false;
+  }
+
+  private boolean validateTopicPartitionRack() {
+      // Make sure rackId exist for all TopicPartitions needed
+      final Set<String> topicsToDescribe = new HashSet<>();
+      for (final Set<TopicPartition> topicPartitions : 
partitionsForTask.values()) {
+          for (TopicPartition topicPartition : topicPartitions) {
+              final PartitionInfo partitionInfo = 
fullMetadata.partition(topicPartition);
+              if (partitionInfo == null) {
+                  log.error("TopicPartition {} doesn't exist in cluster", 
topicPartition);
+                  return false;
+              }
+              final Node[] replica = partitionInfo.replicas();
+              if (replica == null || replica.length == 0) {
+                  topicsToDescribe.add(topicPartition.topic());
+                  continue;
+              }
+              for (final Node node : replica) {
+                  if (node.hasRack()) {
+                      racksForPartition.computeIfAbsent(topicPartition, k -> 
new HashSet<>()).add(node.rack());
+                  }
+              }
+          }
+      }
+
+      if (!topicsToDescribe.isEmpty()) {
+          log.info("Fetching PartitionInfo for topics {}", topicsToDescribe);

Review Comment:
   Sure



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java:
##########
@@ -0,0 +1,264 @@
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.RackAwareTaskAssignor;
+import 
org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
+import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.apache.kafka.test.MockClientSupplier;
+import org.apache.kafka.test.MockInternalTopicManager;
+import org.junit.Test;
+
+public class RackAwareTaskAssignorTest {
+    private final static String TOPIC0 = "topic0";
+    private final static String TOPIC1 = "topic1";
+    private static final String USER_END_POINT = "localhost:8080";
+    private static final String APPLICATION_ID = 
"stream-partition-assignor-test";
+
+    private final Node node0 = new Node(0, "node0", 1, "rack1");
+    private final Node node1 = new Node(1, "node1", 1, "rack2");
+    private final Node node2 = new Node(2, "node2", 1, "rack3");
+    private final Node[] replicas = new Node[] {node0, node1, node2};
+
+    private final PartitionInfo partitionInfo00 = new PartitionInfo(TOPIC0, 0, 
node0, replicas, replicas);
+    private final PartitionInfo partitionInfo01 = new PartitionInfo(TOPIC0, 1, 
node0, replicas, replicas);
+
+    private final TopicPartition tp00 = new TopicPartition(TOPIC0, 0);
+    private final TopicPartition tp01 = new TopicPartition(TOPIC0, 1);
+    private final TopicPartition tp10 = new TopicPartition(TOPIC1, 0);
+
+    private final UUID process0UUID  = UUID.randomUUID();
+    private final UUID process1UUID = UUID.randomUUID();
+
+    private final Subtopology subtopology1 = new Subtopology(1, "topology1");
+
+    private final MockTime time = new MockTime();
+    private final StreamsConfig streamsConfig = new 
StreamsConfig(configProps());
+    private final MockClientSupplier mockClientSupplier = new 
MockClientSupplier();
+
+    private final MockInternalTopicManager mockInternalTopicManager = new 
MockInternalTopicManager(
+            time,
+            streamsConfig,
+            mockClientSupplier.restoreConsumer,
+            false
+    );
+
+    private Map<String, Object> configProps() {
+        final Map<String, Object> configurationMap = new HashMap<>();
+        configurationMap.put(StreamsConfig.APPLICATION_ID_CONFIG, 
APPLICATION_ID);
+        configurationMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
USER_END_POINT);
+
+        final ReferenceContainer referenceContainer = new ReferenceContainer();
+        /*
+        referenceContainer.mainConsumer = consumer;
+        referenceContainer.adminClient = adminClient;
+        referenceContainer.taskManager = taskManager;
+        referenceContainer.streamsMetadataState = streamsMetadataState;
+        referenceContainer.time = time;
+        */
+        
configurationMap.put(InternalConfig.REFERENCE_CONTAINER_PARTITION_ASSIGNOR, 
referenceContainer);
+        return configurationMap;
+    }
+
+    @Test
+    public void disableActiveSinceMissingClusterInfo() {
+        final Cluster metadata = new Cluster(
+            "cluster",
+            new HashSet<>(Arrays.asList(node0, node1, node2)),
+            new HashSet<>(Arrays.asList(partitionInfo00, partitionInfo01)),
+            Collections.emptySet(),
+            Collections.emptySet()
+        );
+
+        final Map<UUID, Map<String, Optional<String>>> processRacks = new 
HashMap<>();
+
+        processRacks.put(process0UUID , Collections.singletonMap("consumer1", 
Optional.of("rack1")));
+
+        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
+            metadata,
+            Collections.singletonMap(new TaskId(1, 1), new 
HashSet<>(Arrays.asList(tp00, tp10))),
+            Collections.singletonMap(subtopology1, Collections.singleton(new 
TaskId(1, 1))),
+            processRacks,
+            mockInternalTopicManager,
+            new 
AssignorConfiguration(streamsConfig.originals()).assignmentConfigs()
+        );
+
+        // False since tp10 is missing in cluster metadata
+        assertFalse(assignor.canEnableForActive());
+    }
+
+    @Test
+    public void disableActiveSinceRackMissingInClient() {
+        final Cluster metadata = new Cluster(
+            "cluster",
+            new HashSet<>(Arrays.asList(node0, node1, node2)),
+            new HashSet<>(Arrays.asList(partitionInfo00, partitionInfo01)),
+            Collections.emptySet(),
+            Collections.emptySet()
+        );
+
+        final Map<UUID, Map<String, Optional<String>>> processRacks = new 
HashMap<>();
+
+        // Missing rackId config in client
+        processRacks.put(process0UUID , Collections.singletonMap("consumer1", 
Optional.empty()));
+
+        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
+            metadata,
+            Collections.singletonMap(new TaskId(1, 1), 
Collections.singleton(tp00)),
+            Collections.singletonMap(subtopology1, Collections.singleton(new 
TaskId(1, 1))),
+            processRacks,
+            mockInternalTopicManager,
+            new 
AssignorConfiguration(streamsConfig.originals()).assignmentConfigs()
+        );
+
+        // False since process1 doesn't have rackId
+        assertFalse(assignor.canEnableForActive());
+    }
+
+    @Test
+    public void disableActiveSinceRackDiffersInSameProcess() {

Review Comment:
   I'm ok deleting this. Was trying to get test coverage.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java:
##########
@@ -0,0 +1,264 @@
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.RackAwareTaskAssignor;
+import 
org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
+import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.apache.kafka.test.MockClientSupplier;
+import org.apache.kafka.test.MockInternalTopicManager;
+import org.junit.Test;
+
+public class RackAwareTaskAssignorTest {
+    private final static String TOPIC0 = "topic0";
+    private final static String TOPIC1 = "topic1";
+    private static final String USER_END_POINT = "localhost:8080";
+    private static final String APPLICATION_ID = 
"stream-partition-assignor-test";
+
+    private final Node node0 = new Node(0, "node0", 1, "rack1");
+    private final Node node1 = new Node(1, "node1", 1, "rack2");
+    private final Node node2 = new Node(2, "node2", 1, "rack3");
+    private final Node[] replicas = new Node[] {node0, node1, node2};
+
+    private final PartitionInfo partitionInfo00 = new PartitionInfo(TOPIC0, 0, 
node0, replicas, replicas);
+    private final PartitionInfo partitionInfo01 = new PartitionInfo(TOPIC0, 1, 
node0, replicas, replicas);
+
+    private final TopicPartition tp00 = new TopicPartition(TOPIC0, 0);
+    private final TopicPartition tp01 = new TopicPartition(TOPIC0, 1);
+    private final TopicPartition tp10 = new TopicPartition(TOPIC1, 0);
+
+    private final UUID process0UUID  = UUID.randomUUID();
+    private final UUID process1UUID = UUID.randomUUID();
+
+    private final Subtopology subtopology1 = new Subtopology(1, "topology1");
+
+    private final MockTime time = new MockTime();
+    private final StreamsConfig streamsConfig = new 
StreamsConfig(configProps());
+    private final MockClientSupplier mockClientSupplier = new 
MockClientSupplier();
+
+    private final MockInternalTopicManager mockInternalTopicManager = new 
MockInternalTopicManager(
+            time,
+            streamsConfig,
+            mockClientSupplier.restoreConsumer,
+            false
+    );
+
+    private Map<String, Object> configProps() {
+        final Map<String, Object> configurationMap = new HashMap<>();
+        configurationMap.put(StreamsConfig.APPLICATION_ID_CONFIG, 
APPLICATION_ID);
+        configurationMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
USER_END_POINT);
+
+        final ReferenceContainer referenceContainer = new ReferenceContainer();
+        /*
+        referenceContainer.mainConsumer = consumer;
+        referenceContainer.adminClient = adminClient;
+        referenceContainer.taskManager = taskManager;
+        referenceContainer.streamsMetadataState = streamsMetadataState;
+        referenceContainer.time = time;
+        */
+        
configurationMap.put(InternalConfig.REFERENCE_CONTAINER_PARTITION_ASSIGNOR, 
referenceContainer);
+        return configurationMap;
+    }
+
+    @Test
+    public void disableActiveSinceMissingClusterInfo() {
+        final Cluster metadata = new Cluster(
+            "cluster",
+            new HashSet<>(Arrays.asList(node0, node1, node2)),
+            new HashSet<>(Arrays.asList(partitionInfo00, partitionInfo01)),
+            Collections.emptySet(),
+            Collections.emptySet()
+        );
+
+        final Map<UUID, Map<String, Optional<String>>> processRacks = new 
HashMap<>();
+
+        processRacks.put(process0UUID , Collections.singletonMap("consumer1", 
Optional.of("rack1")));
+
+        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
+            metadata,
+            Collections.singletonMap(new TaskId(1, 1), new 
HashSet<>(Arrays.asList(tp00, tp10))),
+            Collections.singletonMap(subtopology1, Collections.singleton(new 
TaskId(1, 1))),
+            processRacks,
+            mockInternalTopicManager,
+            new 
AssignorConfiguration(streamsConfig.originals()).assignmentConfigs()
+        );
+
+        // False since tp10 is missing in cluster metadata
+        assertFalse(assignor.canEnableForActive());
+    }
+
+    @Test
+    public void disableActiveSinceRackMissingInClient() {
+        final Cluster metadata = new Cluster(
+            "cluster",
+            new HashSet<>(Arrays.asList(node0, node1, node2)),
+            new HashSet<>(Arrays.asList(partitionInfo00, partitionInfo01)),
+            Collections.emptySet(),
+            Collections.emptySet()
+        );
+
+        final Map<UUID, Map<String, Optional<String>>> processRacks = new 
HashMap<>();
+
+        // Missing rackId config in client
+        processRacks.put(process0UUID , Collections.singletonMap("consumer1", 
Optional.empty()));
+
+        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
+            metadata,
+            Collections.singletonMap(new TaskId(1, 1), 
Collections.singleton(tp00)),

Review Comment:
   This is just to simplify to one task



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RackAwareTaskAssignor.java:
##########
@@ -0,0 +1,164 @@
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
+import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RackAwareTaskAssignor {
+    private static final Logger log = 
LoggerFactory.getLogger(RackAwareTaskAssignor.class);
+
+    private final Cluster fullMetadata;
+    private final Map<TaskId, Set<TopicPartition>> partitionsForTask;
+    private final Map<UUID, Map<String, Optional<String>>> processRacks;
+    private final AssignmentConfigs assignmentConfigs;
+    private final Map<TopicPartition, Set<String>> racksForPartition;
+    private final InternalTopicManager internalTopicManager;
+    private Boolean canEnableForActive;

Review Comment:
   What you said makes sense. I was also on the fence about this.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RackAwareTaskAssignor.java:
##########
@@ -0,0 +1,164 @@
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
+import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RackAwareTaskAssignor {
+    private static final Logger log = 
LoggerFactory.getLogger(RackAwareTaskAssignor.class);
+
+    private final Cluster fullMetadata;
+    private final Map<TaskId, Set<TopicPartition>> partitionsForTask;
+    private final Map<UUID, Map<String, Optional<String>>> processRacks;
+    private final AssignmentConfigs assignmentConfigs;
+    private final Map<TopicPartition, Set<String>> racksForPartition;
+    private final InternalTopicManager internalTopicManager;
+    private Boolean canEnableForActive;
+
+    public RackAwareTaskAssignor(final Cluster fullMetadata,
+                                 final Map<TaskId, Set<TopicPartition>> 
partitionsForTask,
+                                 final Map<Subtopology, Set<TaskId>> 
tasksForTopicGroup,
+                                 final Map<UUID, Map<String, 
Optional<String>>> processRacks,
+                                 final InternalTopicManager 
internalTopicManager,
+                                 final AssignmentConfigs assignmentConfigs) {
+        this.fullMetadata = fullMetadata;
+        this.partitionsForTask = partitionsForTask;
+        this.processRacks = processRacks;
+        this.internalTopicManager = internalTopicManager;
+        this.assignmentConfigs = assignmentConfigs;
+        this.racksForPartition = new HashMap<>();
+    }
+
+  public synchronized boolean canEnableForActive() {
+      if (canEnableForActive != null) {
+          return canEnableForActive;
+      }
+
+      /*
+      TODO: enable this after we add the config
+      if 
(StreamsConfig.RACK_AWARE_ASSSIGNMENT_STRATEGY_NONE.equals(assignmentConfigs.rackAwareAssignmentStrategy))
 {
+          canEnableForActive = false;
+          return false;
+      }
+       */
+
+      if (!validateClientRack()) {
+          canEnableForActive = false;
+          return false;
+      }
+
+      canEnableForActive = validateTopicPartitionRack();
+      return canEnableForActive;
+  }
+
+  public boolean canEnableForStandby() {
+      // TODO
+      return false;
+  }
+
+  private boolean validateTopicPartitionRack() {
+      // Make sure rackId exist for all TopicPartitions needed
+      final Set<String> topicsToDescribe = new HashSet<>();
+      for (final Set<TopicPartition> topicPartitions : 
partitionsForTask.values()) {

Review Comment:
   Yes



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RackAwareTaskAssignor.java:
##########
@@ -0,0 +1,164 @@
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
+import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RackAwareTaskAssignor {
+    private static final Logger log = 
LoggerFactory.getLogger(RackAwareTaskAssignor.class);
+
+    private final Cluster fullMetadata;
+    private final Map<TaskId, Set<TopicPartition>> partitionsForTask;
+    private final Map<UUID, Map<String, Optional<String>>> processRacks;
+    private final AssignmentConfigs assignmentConfigs;
+    private final Map<TopicPartition, Set<String>> racksForPartition;
+    private final InternalTopicManager internalTopicManager;
+    private Boolean canEnableForActive;
+
+    public RackAwareTaskAssignor(final Cluster fullMetadata,
+                                 final Map<TaskId, Set<TopicPartition>> 
partitionsForTask,
+                                 final Map<Subtopology, Set<TaskId>> 
tasksForTopicGroup,
+                                 final Map<UUID, Map<String, 
Optional<String>>> processRacks,
+                                 final InternalTopicManager 
internalTopicManager,
+                                 final AssignmentConfigs assignmentConfigs) {
+        this.fullMetadata = fullMetadata;
+        this.partitionsForTask = partitionsForTask;
+        this.processRacks = processRacks;
+        this.internalTopicManager = internalTopicManager;
+        this.assignmentConfigs = assignmentConfigs;
+        this.racksForPartition = new HashMap<>();
+    }
+
+  public synchronized boolean canEnableForActive() {
+      if (canEnableForActive != null) {
+          return canEnableForActive;
+      }
+
+      /*
+      TODO: enable this after we add the config
+      if 
(StreamsConfig.RACK_AWARE_ASSSIGNMENT_STRATEGY_NONE.equals(assignmentConfigs.rackAwareAssignmentStrategy))
 {
+          canEnableForActive = false;
+          return false;
+      }
+       */
+
+      if (!validateClientRack()) {
+          canEnableForActive = false;
+          return false;
+      }
+
+      canEnableForActive = validateTopicPartitionRack();
+      return canEnableForActive;
+  }
+
+  public boolean canEnableForStandby() {
+      // TODO
+      return false;
+  }
+
+  private boolean validateTopicPartitionRack() {
+      // Make sure rackId exist for all TopicPartitions needed
+      final Set<String> topicsToDescribe = new HashSet<>();
+      for (final Set<TopicPartition> topicPartitions : 
partitionsForTask.values()) {
+          for (TopicPartition topicPartition : topicPartitions) {
+              final PartitionInfo partitionInfo = 
fullMetadata.partition(topicPartition);
+              if (partitionInfo == null) {
+                  log.error("TopicPartition {} doesn't exist in cluster", 
topicPartition);
+                  return false;
+              }
+              final Node[] replica = partitionInfo.replicas();
+              if (replica == null || replica.length == 0) {
+                  topicsToDescribe.add(topicPartition.topic());

Review Comment:
   If an external topic doesn't have node information, we will also try to 
describe it



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RackAwareTaskAssignor.java:
##########
@@ -0,0 +1,164 @@
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
+import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RackAwareTaskAssignor {
+    private static final Logger log = 
LoggerFactory.getLogger(RackAwareTaskAssignor.class);
+
+    private final Cluster fullMetadata;
+    private final Map<TaskId, Set<TopicPartition>> partitionsForTask;
+    private final Map<UUID, Map<String, Optional<String>>> processRacks;
+    private final AssignmentConfigs assignmentConfigs;
+    private final Map<TopicPartition, Set<String>> racksForPartition;
+    private final InternalTopicManager internalTopicManager;
+    private Boolean canEnableForActive;
+
+    public RackAwareTaskAssignor(final Cluster fullMetadata,
+                                 final Map<TaskId, Set<TopicPartition>> 
partitionsForTask,
+                                 final Map<Subtopology, Set<TaskId>> 
tasksForTopicGroup,
+                                 final Map<UUID, Map<String, 
Optional<String>>> processRacks,
+                                 final InternalTopicManager 
internalTopicManager,
+                                 final AssignmentConfigs assignmentConfigs) {
+        this.fullMetadata = fullMetadata;
+        this.partitionsForTask = partitionsForTask;
+        this.processRacks = processRacks;
+        this.internalTopicManager = internalTopicManager;
+        this.assignmentConfigs = assignmentConfigs;
+        this.racksForPartition = new HashMap<>();
+    }
+
+  public synchronized boolean canEnableForActive() {
+      if (canEnableForActive != null) {
+          return canEnableForActive;
+      }
+
+      /*
+      TODO: enable this after we add the config
+      if 
(StreamsConfig.RACK_AWARE_ASSSIGNMENT_STRATEGY_NONE.equals(assignmentConfigs.rackAwareAssignmentStrategy))
 {
+          canEnableForActive = false;
+          return false;
+      }
+       */
+
+      if (!validateClientRack()) {
+          canEnableForActive = false;
+          return false;
+      }
+
+      canEnableForActive = validateTopicPartitionRack();
+      return canEnableForActive;
+  }
+
+  public boolean canEnableForStandby() {
+      // TODO
+      return false;
+  }
+
+  private boolean validateTopicPartitionRack() {
+      // Make sure rackId exist for all TopicPartitions needed
+      final Set<String> topicsToDescribe = new HashSet<>();
+      for (final Set<TopicPartition> topicPartitions : 
partitionsForTask.values()) {
+          for (TopicPartition topicPartition : topicPartitions) {
+              final PartitionInfo partitionInfo = 
fullMetadata.partition(topicPartition);
+              if (partitionInfo == null) {
+                  log.error("TopicPartition {} doesn't exist in cluster", 
topicPartition);
+                  return false;
+              }
+              final Node[] replica = partitionInfo.replicas();
+              if (replica == null || replica.length == 0) {

Review Comment:
   Yes. It's 0 for repartition topics we create: 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java#L99



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RackAwareTaskAssignor.java:
##########
@@ -0,0 +1,164 @@
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
+import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RackAwareTaskAssignor {
+    private static final Logger log = 
LoggerFactory.getLogger(RackAwareTaskAssignor.class);
+
+    private final Cluster fullMetadata;
+    private final Map<TaskId, Set<TopicPartition>> partitionsForTask;
+    private final Map<UUID, Map<String, Optional<String>>> processRacks;
+    private final AssignmentConfigs assignmentConfigs;
+    private final Map<TopicPartition, Set<String>> racksForPartition;
+    private final InternalTopicManager internalTopicManager;
+    private Boolean canEnableForActive;
+
+    public RackAwareTaskAssignor(final Cluster fullMetadata,
+                                 final Map<TaskId, Set<TopicPartition>> 
partitionsForTask,
+                                 final Map<Subtopology, Set<TaskId>> 
tasksForTopicGroup,
+                                 final Map<UUID, Map<String, 
Optional<String>>> processRacks,
+                                 final InternalTopicManager 
internalTopicManager,
+                                 final AssignmentConfigs assignmentConfigs) {
+        this.fullMetadata = fullMetadata;
+        this.partitionsForTask = partitionsForTask;
+        this.processRacks = processRacks;
+        this.internalTopicManager = internalTopicManager;
+        this.assignmentConfigs = assignmentConfigs;
+        this.racksForPartition = new HashMap<>();
+    }
+
+  public synchronized boolean canEnableForActive() {
+      if (canEnableForActive != null) {
+          return canEnableForActive;
+      }
+
+      /*
+      TODO: enable this after we add the config
+      if 
(StreamsConfig.RACK_AWARE_ASSSIGNMENT_STRATEGY_NONE.equals(assignmentConfigs.rackAwareAssignmentStrategy))
 {
+          canEnableForActive = false;
+          return false;
+      }
+       */
+
+      if (!validateClientRack()) {
+          canEnableForActive = false;
+          return false;
+      }
+
+      canEnableForActive = validateTopicPartitionRack();
+      return canEnableForActive;
+  }
+
+  public boolean canEnableForStandby() {
+      // TODO
+      return false;
+  }
+
+  private boolean validateTopicPartitionRack() {
+      // Make sure rackId exist for all TopicPartitions needed
+      final Set<String> topicsToDescribe = new HashSet<>();
+      for (final Set<TopicPartition> topicPartitions : 
partitionsForTask.values()) {
+          for (TopicPartition topicPartition : topicPartitions) {
+              final PartitionInfo partitionInfo = 
fullMetadata.partition(topicPartition);
+              if (partitionInfo == null) {
+                  log.error("TopicPartition {} doesn't exist in cluster", 
topicPartition);
+                  return false;
+              }
+              final Node[] replica = partitionInfo.replicas();
+              if (replica == null || replica.length == 0) {
+                  topicsToDescribe.add(topicPartition.topic());
+                  continue;
+              }
+              for (final Node node : replica) {
+                  if (node.hasRack()) {
+                      racksForPartition.computeIfAbsent(topicPartition, k -> 
new HashSet<>()).add(node.rack());

Review Comment:
   Good catch



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java:
##########
@@ -305,6 +327,31 @@ public void 
shouldThrowTimeoutExceptionIfTopicExistsDuringSetup() {
         );
     }
 
+    @Test
+    public void 
shouldThrowTimeoutExceptionIfGetPartitionInfoHasTopicDescriptionTimeout() {
+        mockAdminClient.timeoutNextRequest(1);
+
+        final InternalTopicManager internalTopicManager =
+                new InternalTopicManager(time, mockAdminClient, new 
StreamsConfig(config));
+        try {
+            final Set<String> topic1set = new 
HashSet<>(Collections.singletonList(topic1));
+            internalTopicManager.getTopicPartitionInfo(topic1set, null);
+
+        } catch (final TimeoutException expected) {
+            assertEquals(TimeoutException.class, 
expected.getCause().getClass());
+        }
+
+        mockAdminClient.timeoutNextRequest(1);

Review Comment:
   This is actually testing `getTopicPartitionInfo(final Set<String> topics, 
final Set<String> tempUnknownTopics)`



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RackAwareTaskAssignor.java:
##########
@@ -0,0 +1,164 @@
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
+import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RackAwareTaskAssignor {
+    private static final Logger log = 
LoggerFactory.getLogger(RackAwareTaskAssignor.class);
+
+    private final Cluster fullMetadata;
+    private final Map<TaskId, Set<TopicPartition>> partitionsForTask;
+    private final Map<UUID, Map<String, Optional<String>>> processRacks;
+    private final AssignmentConfigs assignmentConfigs;
+    private final Map<TopicPartition, Set<String>> racksForPartition;
+    private final InternalTopicManager internalTopicManager;
+    private Boolean canEnableForActive;
+
+    public RackAwareTaskAssignor(final Cluster fullMetadata,
+                                 final Map<TaskId, Set<TopicPartition>> 
partitionsForTask,
+                                 final Map<Subtopology, Set<TaskId>> 
tasksForTopicGroup,
+                                 final Map<UUID, Map<String, 
Optional<String>>> processRacks,
+                                 final InternalTopicManager 
internalTopicManager,
+                                 final AssignmentConfigs assignmentConfigs) {
+        this.fullMetadata = fullMetadata;
+        this.partitionsForTask = partitionsForTask;
+        this.processRacks = processRacks;
+        this.internalTopicManager = internalTopicManager;
+        this.assignmentConfigs = assignmentConfigs;
+        this.racksForPartition = new HashMap<>();
+    }
+
+  public synchronized boolean canEnableForActive() {
+      if (canEnableForActive != null) {
+          return canEnableForActive;
+      }
+
+      /*
+      TODO: enable this after we add the config
+      if 
(StreamsConfig.RACK_AWARE_ASSSIGNMENT_STRATEGY_NONE.equals(assignmentConfigs.rackAwareAssignmentStrategy))
 {
+          canEnableForActive = false;
+          return false;
+      }
+       */
+
+      if (!validateClientRack()) {
+          canEnableForActive = false;
+          return false;
+      }
+
+      canEnableForActive = validateTopicPartitionRack();
+      return canEnableForActive;
+  }
+
+  public boolean canEnableForStandby() {
+      // TODO
+      return false;
+  }
+
+  private boolean validateTopicPartitionRack() {
+      // Make sure rackId exist for all TopicPartitions needed
+      final Set<String> topicsToDescribe = new HashSet<>();
+      for (final Set<TopicPartition> topicPartitions : 
partitionsForTask.values()) {
+          for (TopicPartition topicPartition : topicPartitions) {
+              final PartitionInfo partitionInfo = 
fullMetadata.partition(topicPartition);
+              if (partitionInfo == null) {
+                  log.error("TopicPartition {} doesn't exist in cluster", 
topicPartition);
+                  return false;
+              }
+              final Node[] replica = partitionInfo.replicas();
+              if (replica == null || replica.length == 0) {
+                  topicsToDescribe.add(topicPartition.topic());
+                  continue;
+              }
+              for (final Node node : replica) {
+                  if (node.hasRack()) {
+                      racksForPartition.computeIfAbsent(topicPartition, k -> 
new HashSet<>()).add(node.rack());
+                  }
+              }
+          }
+      }
+
+      if (!topicsToDescribe.isEmpty()) {
+          log.info("Fetching PartitionInfo for topics {}", topicsToDescribe);
+          try {
+              final Map<String, List<TopicPartitionInfo>> topicPartitionInfo = 
internalTopicManager.getTopicPartitionInfo(topicsToDescribe);
+              if (topicsToDescribe.size() > topicPartitionInfo.size()) {
+                  topicsToDescribe.removeAll(topicPartitionInfo.keySet());

Review Comment:
   To get the ones which failed to be described; they are used again in the log
message on the next line.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java:
##########
@@ -279,6 +279,28 @@ public CreateTopicsResult createTopics(final 
Collection<NewTopic> newTopics,
                 + " You can change the replication.factor config or upgrade 
your brokers to version 2.4 or newer to avoid this error."));
     }
 
+    @Test
+    public void shouldThrowTimeoutExceptionInGetPartitionInfo() {
+        setupTopicInMockAdminClient(topic1, Collections.emptyMap());
+        final MockTime time = new MockTime(
+            (Integer) 
config.get(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG))
 / 15

Review Comment:
   I think this is `autoTickMs`; if it is set to 0, time won't increase?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java:
##########
@@ -0,0 +1,264 @@
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.RackAwareTaskAssignor;
+import 
org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
+import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.apache.kafka.test.MockClientSupplier;
+import org.apache.kafka.test.MockInternalTopicManager;
+import org.junit.Test;
+
+public class RackAwareTaskAssignorTest {
+    private final static String TOPIC0 = "topic0";
+    private final static String TOPIC1 = "topic1";
+    private static final String USER_END_POINT = "localhost:8080";
+    private static final String APPLICATION_ID = 
"stream-partition-assignor-test";
+
+    private final Node node0 = new Node(0, "node0", 1, "rack1");
+    private final Node node1 = new Node(1, "node1", 1, "rack2");
+    private final Node node2 = new Node(2, "node2", 1, "rack3");
+    private final Node[] replicas = new Node[] {node0, node1, node2};
+
+    private final PartitionInfo partitionInfo00 = new PartitionInfo(TOPIC0, 0, 
node0, replicas, replicas);
+    private final PartitionInfo partitionInfo01 = new PartitionInfo(TOPIC0, 1, 
node0, replicas, replicas);
+
+    private final TopicPartition tp00 = new TopicPartition(TOPIC0, 0);
+    private final TopicPartition tp01 = new TopicPartition(TOPIC0, 1);
+    private final TopicPartition tp10 = new TopicPartition(TOPIC1, 0);
+
+    private final UUID process0UUID  = UUID.randomUUID();
+    private final UUID process1UUID = UUID.randomUUID();
+
+    private final Subtopology subtopology1 = new Subtopology(1, "topology1");
+
+    private final MockTime time = new MockTime();
+    private final StreamsConfig streamsConfig = new 
StreamsConfig(configProps());
+    private final MockClientSupplier mockClientSupplier = new 
MockClientSupplier();
+
+    private final MockInternalTopicManager mockInternalTopicManager = new 
MockInternalTopicManager(
+            time,
+            streamsConfig,
+            mockClientSupplier.restoreConsumer,
+            false
+    );
+
+    private Map<String, Object> configProps() {
+        final Map<String, Object> configurationMap = new HashMap<>();
+        configurationMap.put(StreamsConfig.APPLICATION_ID_CONFIG, 
APPLICATION_ID);
+        configurationMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
USER_END_POINT);
+
+        final ReferenceContainer referenceContainer = new ReferenceContainer();
+        /*

Review Comment:
   This might be needed later, after more of the implementation is done...



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java:
##########
@@ -0,0 +1,264 @@
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.RackAwareTaskAssignor;
+import 
org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
+import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.apache.kafka.test.MockClientSupplier;
+import org.apache.kafka.test.MockInternalTopicManager;
+import org.junit.Test;
+
+public class RackAwareTaskAssignorTest {
+    private final static String TOPIC0 = "topic0";
+    private final static String TOPIC1 = "topic1";
+    private static final String USER_END_POINT = "localhost:8080";
+    private static final String APPLICATION_ID = 
"stream-partition-assignor-test";
+
+    private final Node node0 = new Node(0, "node0", 1, "rack1");
+    private final Node node1 = new Node(1, "node1", 1, "rack2");
+    private final Node node2 = new Node(2, "node2", 1, "rack3");
+    private final Node[] replicas = new Node[] {node0, node1, node2};
+
+    private final PartitionInfo partitionInfo00 = new PartitionInfo(TOPIC0, 0, 
node0, replicas, replicas);
+    private final PartitionInfo partitionInfo01 = new PartitionInfo(TOPIC0, 1, 
node0, replicas, replicas);
+
+    private final TopicPartition tp00 = new TopicPartition(TOPIC0, 0);
+    private final TopicPartition tp01 = new TopicPartition(TOPIC0, 1);
+    private final TopicPartition tp10 = new TopicPartition(TOPIC1, 0);
+
+    private final UUID process0UUID  = UUID.randomUUID();
+    private final UUID process1UUID = UUID.randomUUID();
+
+    private final Subtopology subtopology1 = new Subtopology(1, "topology1");
+
+    private final MockTime time = new MockTime();
+    private final StreamsConfig streamsConfig = new 
StreamsConfig(configProps());
+    private final MockClientSupplier mockClientSupplier = new 
MockClientSupplier();
+
+    private final MockInternalTopicManager mockInternalTopicManager = new 
MockInternalTopicManager(
+            time,
+            streamsConfig,
+            mockClientSupplier.restoreConsumer,
+            false
+    );
+
+    private Map<String, Object> configProps() {
+        final Map<String, Object> configurationMap = new HashMap<>();
+        configurationMap.put(StreamsConfig.APPLICATION_ID_CONFIG, 
APPLICATION_ID);
+        configurationMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
USER_END_POINT);
+
+        final ReferenceContainer referenceContainer = new ReferenceContainer();
+        /*
+        referenceContainer.mainConsumer = consumer;
+        referenceContainer.adminClient = adminClient;
+        referenceContainer.taskManager = taskManager;
+        referenceContainer.streamsMetadataState = streamsMetadataState;
+        referenceContainer.time = time;
+        */
+        
configurationMap.put(InternalConfig.REFERENCE_CONTAINER_PARTITION_ASSIGNOR, 
referenceContainer);
+        return configurationMap;
+    }
+
+    @Test
+    public void disableActiveSinceMissingClusterInfo() {
+        final Cluster metadata = new Cluster(
+            "cluster",
+            new HashSet<>(Arrays.asList(node0, node1, node2)),
+            new HashSet<>(Arrays.asList(partitionInfo00, partitionInfo01)),
+            Collections.emptySet(),
+            Collections.emptySet()
+        );
+
+        final Map<UUID, Map<String, Optional<String>>> processRacks = new 
HashMap<>();
+
+        processRacks.put(process0UUID , Collections.singletonMap("consumer1", 
Optional.of("rack1")));
+
+        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
+            metadata,
+            Collections.singletonMap(new TaskId(1, 1), new 
HashSet<>(Arrays.asList(tp00, tp10))),

Review Comment:
   For simplicity, this is just 1 task.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java:
##########
@@ -0,0 +1,264 @@
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.RackAwareTaskAssignor;
+import 
org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
+import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.apache.kafka.test.MockClientSupplier;
+import org.apache.kafka.test.MockInternalTopicManager;
+import org.junit.Test;
+
+public class RackAwareTaskAssignorTest {
+    private final static String TOPIC0 = "topic0";
+    private final static String TOPIC1 = "topic1";
+    private static final String USER_END_POINT = "localhost:8080";
+    private static final String APPLICATION_ID = 
"stream-partition-assignor-test";
+
+    private final Node node0 = new Node(0, "node0", 1, "rack1");
+    private final Node node1 = new Node(1, "node1", 1, "rack2");
+    private final Node node2 = new Node(2, "node2", 1, "rack3");
+    private final Node[] replicas = new Node[] {node0, node1, node2};
+
+    private final PartitionInfo partitionInfo00 = new PartitionInfo(TOPIC0, 0, 
node0, replicas, replicas);
+    private final PartitionInfo partitionInfo01 = new PartitionInfo(TOPIC0, 1, 
node0, replicas, replicas);
+
+    private final TopicPartition tp00 = new TopicPartition(TOPIC0, 0);
+    private final TopicPartition tp01 = new TopicPartition(TOPIC0, 1);
+    private final TopicPartition tp10 = new TopicPartition(TOPIC1, 0);
+
+    private final UUID process0UUID  = UUID.randomUUID();
+    private final UUID process1UUID = UUID.randomUUID();
+
+    private final Subtopology subtopology1 = new Subtopology(1, "topology1");
+
+    private final MockTime time = new MockTime();
+    private final StreamsConfig streamsConfig = new 
StreamsConfig(configProps());
+    private final MockClientSupplier mockClientSupplier = new 
MockClientSupplier();
+
+    private final MockInternalTopicManager mockInternalTopicManager = new 
MockInternalTopicManager(
+            time,
+            streamsConfig,
+            mockClientSupplier.restoreConsumer,
+            false
+    );
+
+    private Map<String, Object> configProps() {
+        final Map<String, Object> configurationMap = new HashMap<>();
+        configurationMap.put(StreamsConfig.APPLICATION_ID_CONFIG, 
APPLICATION_ID);
+        configurationMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
USER_END_POINT);
+
+        final ReferenceContainer referenceContainer = new ReferenceContainer();
+        /*
+        referenceContainer.mainConsumer = consumer;
+        referenceContainer.adminClient = adminClient;
+        referenceContainer.taskManager = taskManager;
+        referenceContainer.streamsMetadataState = streamsMetadataState;
+        referenceContainer.time = time;
+        */
+        
configurationMap.put(InternalConfig.REFERENCE_CONTAINER_PARTITION_ASSIGNOR, 
referenceContainer);
+        return configurationMap;
+    }
+
+    @Test
+    public void disableActiveSinceMissingClusterInfo() {
+        final Cluster metadata = new Cluster(
+            "cluster",
+            new HashSet<>(Arrays.asList(node0, node1, node2)),
+            new HashSet<>(Arrays.asList(partitionInfo00, partitionInfo01)),
+            Collections.emptySet(),
+            Collections.emptySet()
+        );
+
+        final Map<UUID, Map<String, Optional<String>>> processRacks = new 
HashMap<>();
+
+        processRacks.put(process0UUID , Collections.singletonMap("consumer1", 
Optional.of("rack1")));
+
+        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
+            metadata,
+            Collections.singletonMap(new TaskId(1, 1), new 
HashSet<>(Arrays.asList(tp00, tp10))),
+            Collections.singletonMap(subtopology1, Collections.singleton(new 
TaskId(1, 1))),
+            processRacks,
+            mockInternalTopicManager,
+            new 
AssignorConfiguration(streamsConfig.originals()).assignmentConfigs()
+        );
+
+        // False since tp10 is missing in cluster metadata
+        assertFalse(assignor.canEnableForActive());
+    }
+
+    @Test
+    public void disableActiveSinceRackMissingInClient() {
+        final Cluster metadata = new Cluster(
+            "cluster",
+            new HashSet<>(Arrays.asList(node0, node1, node2)),
+            new HashSet<>(Arrays.asList(partitionInfo00, partitionInfo01)),
+            Collections.emptySet(),
+            Collections.emptySet()
+        );
+
+        final Map<UUID, Map<String, Optional<String>>> processRacks = new 
HashMap<>();
+
+        // Missing rackId config in client
+        processRacks.put(process0UUID , Collections.singletonMap("consumer1", 
Optional.empty()));
+
+        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
+            metadata,
+            Collections.singletonMap(new TaskId(1, 1), 
Collections.singleton(tp00)),
+            Collections.singletonMap(subtopology1, Collections.singleton(new 
TaskId(1, 1))),
+            processRacks,
+            mockInternalTopicManager,
+            new 
AssignorConfiguration(streamsConfig.originals()).assignmentConfigs()
+        );
+
+        // False since process1 doesn't have rackId
+        assertFalse(assignor.canEnableForActive());
+    }
+
+    @Test
+    public void disableActiveSinceRackDiffersInSameProcess() {
+        final Cluster metadata = new Cluster(
+            "cluster",
+            new HashSet<>(Arrays.asList(node0, node1, node2)),
+            new HashSet<>(Arrays.asList(partitionInfo00, partitionInfo01)),
+            Collections.emptySet(),
+            Collections.emptySet()
+        );
+
+        final Map<UUID, Map<String, Optional<String>>> processRacks = new 
HashMap<>();
+
+        // Different rackId for same process
+        processRacks.computeIfAbsent(process0UUID , k -> new 
HashMap<>()).put("consumer1", Optional.of("rack1"));
+        processRacks.computeIfAbsent(process0UUID , k -> new 
HashMap<>()).put("consumer2", Optional.of("rack2"));
+
+        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
+            metadata,
+            Collections.singletonMap(new TaskId(1, 1), 
Collections.singleton(tp00)),
+            Collections.singletonMap(subtopology1, Collections.singleton(new 
TaskId(1, 1))),
+            processRacks,
+            mockInternalTopicManager,
+            new 
AssignorConfiguration(streamsConfig.originals()).assignmentConfigs()
+        );
+
+        assertFalse(assignor.canEnableForActive());
+    }
+
+    @Test
+    public void enableRackAwareAssignorForActiveWithoutDescribingTopics() {
+        final Cluster metadata = new Cluster(
+            "cluster",
+            new HashSet<>(Arrays.asList(node0, node1, node2)),
+            new HashSet<>(Arrays.asList(partitionInfo00, partitionInfo01)),
+            Collections.emptySet(),
+            Collections.emptySet()
+        );
+
+        final Map<UUID, Map<String, Optional<String>>> processRacks = new 
HashMap<>();
+
+        processRacks.computeIfAbsent(process0UUID , k -> new 
HashMap<>()).put("consumer1", Optional.of("rack1"));
+
+        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
+            metadata,
+            Collections.singletonMap(new TaskId(1, 1), 
Collections.singleton(tp00)),
+            Collections.singletonMap(subtopology1, Collections.singleton(new 
TaskId(1, 1))),
+            processRacks,
+            mockInternalTopicManager,
+            new 
AssignorConfiguration(streamsConfig.originals()).assignmentConfigs()
+        );
+
+        // tp00 has rackInfo in cluster metadata
+        assertTrue(assignor.canEnableForActive());
+    }
+
+    @Test
+    public void enableRackAwareAssignorForActiveWithDescribingTopics() {
+        final PartitionInfo noNodeInfo = new PartitionInfo(TOPIC0, 0, null, 
new Node[0], new Node[0]);
+
+        final Cluster metadata = new Cluster(
+            "cluster",
+            new HashSet<>(Arrays.asList(node0, node1, node2, Node.noNode())), 
// mockClientSupplier.setCluster requires noNode

Review Comment:
   
https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java#L56
   
   has `cluster.nodeById(-1)`



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java:
##########
@@ -0,0 +1,264 @@
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.RackAwareTaskAssignor;
+import 
org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
+import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.apache.kafka.test.MockClientSupplier;
+import org.apache.kafka.test.MockInternalTopicManager;
+import org.junit.Test;
+
+public class RackAwareTaskAssignorTest {
+    private final static String TOPIC0 = "topic0";
+    private final static String TOPIC1 = "topic1";
+    private static final String USER_END_POINT = "localhost:8080";
+    private static final String APPLICATION_ID = 
"stream-partition-assignor-test";
+
+    private final Node node0 = new Node(0, "node0", 1, "rack1");
+    private final Node node1 = new Node(1, "node1", 1, "rack2");
+    private final Node node2 = new Node(2, "node2", 1, "rack3");
+    private final Node[] replicas = new Node[] {node0, node1, node2};
+
+    private final PartitionInfo partitionInfo00 = new PartitionInfo(TOPIC0, 0, 
node0, replicas, replicas);
+    private final PartitionInfo partitionInfo01 = new PartitionInfo(TOPIC0, 1, 
node0, replicas, replicas);
+
+    private final TopicPartition tp00 = new TopicPartition(TOPIC0, 0);
+    private final TopicPartition tp01 = new TopicPartition(TOPIC0, 1);
+    private final TopicPartition tp10 = new TopicPartition(TOPIC1, 0);
+
+    private final UUID process0UUID  = UUID.randomUUID();
+    private final UUID process1UUID = UUID.randomUUID();
+
+    private final Subtopology subtopology1 = new Subtopology(1, "topology1");
+
+    private final MockTime time = new MockTime();
+    private final StreamsConfig streamsConfig = new 
StreamsConfig(configProps());
+    private final MockClientSupplier mockClientSupplier = new 
MockClientSupplier();
+
+    private final MockInternalTopicManager mockInternalTopicManager = new 
MockInternalTopicManager(
+            time,
+            streamsConfig,
+            mockClientSupplier.restoreConsumer,
+            false
+    );
+
+    private Map<String, Object> configProps() {
+        final Map<String, Object> configurationMap = new HashMap<>();
+        configurationMap.put(StreamsConfig.APPLICATION_ID_CONFIG, 
APPLICATION_ID);
+        configurationMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
USER_END_POINT);
+
+        final ReferenceContainer referenceContainer = new ReferenceContainer();
+        /*
+        referenceContainer.mainConsumer = consumer;
+        referenceContainer.adminClient = adminClient;
+        referenceContainer.taskManager = taskManager;
+        referenceContainer.streamsMetadataState = streamsMetadataState;
+        referenceContainer.time = time;
+        */
+        
configurationMap.put(InternalConfig.REFERENCE_CONTAINER_PARTITION_ASSIGNOR, 
referenceContainer);
+        return configurationMap;
+    }
+
+    @Test
+    public void disableActiveSinceMissingClusterInfo() {
+        final Cluster metadata = new Cluster(
+            "cluster",
+            new HashSet<>(Arrays.asList(node0, node1, node2)),
+            new HashSet<>(Arrays.asList(partitionInfo00, partitionInfo01)),
+            Collections.emptySet(),
+            Collections.emptySet()
+        );
+
+        final Map<UUID, Map<String, Optional<String>>> processRacks = new 
HashMap<>();
+
+        processRacks.put(process0UUID , Collections.singletonMap("consumer1", 
Optional.of("rack1")));
+
+        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
+            metadata,
+            Collections.singletonMap(new TaskId(1, 1), new 
HashSet<>(Arrays.asList(tp00, tp10))),
+            Collections.singletonMap(subtopology1, Collections.singleton(new 
TaskId(1, 1))),
+            processRacks,
+            mockInternalTopicManager,
+            new 
AssignorConfiguration(streamsConfig.originals()).assignmentConfigs()
+        );
+
+        // False since tp10 is missing in cluster metadata
+        assertFalse(assignor.canEnableForActive());
+    }
+
+    @Test
+    public void disableActiveSinceRackMissingInClient() {
+        final Cluster metadata = new Cluster(
+            "cluster",
+            new HashSet<>(Arrays.asList(node0, node1, node2)),
+            new HashSet<>(Arrays.asList(partitionInfo00, partitionInfo01)),

Review Comment:
   We don't need to. The test is mainly testing 
   `processRacks.put(process0UUID , Collections.singletonMap("consumer1", 
Optional.empty()));`



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java:
##########
@@ -0,0 +1,264 @@
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.RackAwareTaskAssignor;
+import 
org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
+import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.apache.kafka.test.MockClientSupplier;
+import org.apache.kafka.test.MockInternalTopicManager;
+import org.junit.Test;
+
+public class RackAwareTaskAssignorTest {
+    private final static String TOPIC0 = "topic0";
+    private final static String TOPIC1 = "topic1";
+    private static final String USER_END_POINT = "localhost:8080";
+    private static final String APPLICATION_ID = 
"stream-partition-assignor-test";
+
+    private final Node node0 = new Node(0, "node0", 1, "rack1");
+    private final Node node1 = new Node(1, "node1", 1, "rack2");
+    private final Node node2 = new Node(2, "node2", 1, "rack3");
+    private final Node[] replicas = new Node[] {node0, node1, node2};
+
+    private final PartitionInfo partitionInfo00 = new PartitionInfo(TOPIC0, 0, 
node0, replicas, replicas);
+    private final PartitionInfo partitionInfo01 = new PartitionInfo(TOPIC0, 1, 
node0, replicas, replicas);
+
+    private final TopicPartition tp00 = new TopicPartition(TOPIC0, 0);
+    private final TopicPartition tp01 = new TopicPartition(TOPIC0, 1);
+    private final TopicPartition tp10 = new TopicPartition(TOPIC1, 0);
+
+    private final UUID process0UUID  = UUID.randomUUID();
+    private final UUID process1UUID = UUID.randomUUID();
+
+    private final Subtopology subtopology1 = new Subtopology(1, "topology1");
+
+    private final MockTime time = new MockTime();
+    private final StreamsConfig streamsConfig = new 
StreamsConfig(configProps());
+    private final MockClientSupplier mockClientSupplier = new 
MockClientSupplier();
+
+    private final MockInternalTopicManager mockInternalTopicManager = new 
MockInternalTopicManager(
+            time,
+            streamsConfig,
+            mockClientSupplier.restoreConsumer,
+            false
+    );
+
+    private Map<String, Object> configProps() {
+        final Map<String, Object> configurationMap = new HashMap<>();
+        configurationMap.put(StreamsConfig.APPLICATION_ID_CONFIG, 
APPLICATION_ID);
+        configurationMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
USER_END_POINT);
+
+        final ReferenceContainer referenceContainer = new ReferenceContainer();
+        /*
+        referenceContainer.mainConsumer = consumer;
+        referenceContainer.adminClient = adminClient;
+        referenceContainer.taskManager = taskManager;
+        referenceContainer.streamsMetadataState = streamsMetadataState;
+        referenceContainer.time = time;
+        */
+        
configurationMap.put(InternalConfig.REFERENCE_CONTAINER_PARTITION_ASSIGNOR, 
referenceContainer);
+        return configurationMap;
+    }
+
+    @Test
+    public void disableActiveSinceMissingClusterInfo() {
+        final Cluster metadata = new Cluster(
+            "cluster",
+            new HashSet<>(Arrays.asList(node0, node1, node2)),
+            new HashSet<>(Arrays.asList(partitionInfo00, partitionInfo01)),
+            Collections.emptySet(),
+            Collections.emptySet()
+        );
+
+        final Map<UUID, Map<String, Optional<String>>> processRacks = new 
HashMap<>();
+
+        processRacks.put(process0UUID , Collections.singletonMap("consumer1", 
Optional.of("rack1")));
+
+        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
+            metadata,
+            Collections.singletonMap(new TaskId(1, 1), new 
HashSet<>(Arrays.asList(tp00, tp10))),
+            Collections.singletonMap(subtopology1, Collections.singleton(new 
TaskId(1, 1))),
+            processRacks,
+            mockInternalTopicManager,
+            new 
AssignorConfiguration(streamsConfig.originals()).assignmentConfigs()
+        );
+
+        // False since tp10 is missing in cluster metadata
+        assertFalse(assignor.canEnableForActive());

Review Comment:
   Ideally yes. But this might be hard to verify... Maybe we could make
`validateTopicPartitionRack` public and verify it returns false? Or we could extract
   ```
                   final PartitionInfo partitionInfo = 
fullMetadata.partition(topicPartition);
                   if (partitionInfo == null) {
                       log.error("TopicPartition {} doesn't exist in cluster", 
topicPartition);
                       return false;
                   }
   ```
   to a public function and verify that it returns false?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to