[ https://issues.apache.org/jira/browse/KAFKA-6054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16386548#comment-16386548 ]
ASF GitHub Bot commented on KAFKA-6054: --------------------------------------- mjsax closed pull request #4630: KAFKA-6054: Code cleanup to prepare the actual fix for an upgrade path URL: https://github.com/apache/kafka/pull/4630 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 6b3626101bd..47becfc239b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -226,11 +226,11 @@ public static final String DEFAULT_KEY_SERDE_CLASS_CONFIG = "default.key.serde"; private static final String DEFAULT_KEY_SERDE_CLASS_DOC = " Default serializer / deserializer class for key that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface."; - /** {@code default timestamp.extractor} */ + /** {@code default.timestamp.extractor} */ public static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "default.timestamp.extractor"; private static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC = "Default timestamp extractor class that implements the <code>org.apache.kafka.streams.processor.TimestampExtractor</code> interface."; - /** {@code default value.serde} */ + /** {@code default.value.serde} */ public static final String DEFAULT_VALUE_SERDE_CLASS_CONFIG = "default.value.serde"; private static final String DEFAULT_VALUE_SERDE_CLASS_DOC = "Default serializer / deserializer class for value that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface."; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 9aa0e94c8c1..71a84b2ca73 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -66,7 +66,8 @@ public final TaskId taskId; public final TopicPartition partition; - AssignedPartition(final TaskId taskId, final TopicPartition partition) { + AssignedPartition(final TaskId taskId, + final TopicPartition partition) { this.taskId = taskId; this.partition = partition; } @@ -77,11 +78,11 @@ public int compareTo(final AssignedPartition that) { } @Override - public boolean equals(Object o) { + public boolean equals(final Object o) { if (!(o instanceof AssignedPartition)) { return false; } - AssignedPartition other = (AssignedPartition) o; + final AssignedPartition other = (AssignedPartition) o; return compareTo(other) == 0; } @@ -104,8 +105,9 @@ public int hashCode() { final String host = getHost(endPoint); final Integer port = getPort(endPoint); - if (host == null || port == null) + if (host == null || port == null) { throw new ConfigException(String.format("Error parsing host address %s. 
Expected format host:port.", endPoint)); + } hostInfo = new HostInfo(host, port); } else { @@ -119,10 +121,11 @@ public int hashCode() { state = new ClientState(); } - void addConsumer(final String consumerMemberId, final SubscriptionInfo info) { + void addConsumer(final String consumerMemberId, + final SubscriptionInfo info) { consumers.add(consumerMemberId); - state.addPreviousActiveTasks(info.prevTasks); - state.addPreviousStandbyTasks(info.standbyTasks); + state.addPreviousActiveTasks(info.prevTasks()); + state.addPreviousStandbyTasks(info.standbyTasks()); state.incrementCapacity(); } @@ -157,8 +160,9 @@ public String toString() { private static final Comparator<TopicPartition> PARTITION_COMPARATOR = new Comparator<TopicPartition>() { @Override - public int compare(TopicPartition p1, TopicPartition p2) { - int result = p1.topic().compareTo(p2.topic()); + public int compare(final TopicPartition p1, + final TopicPartition p2) { + final int result = p1.topic().compareTo(p2.topic()); if (result != 0) { return result; @@ -194,15 +198,15 @@ public void configure(final Map<String, ?> configs) { final Object o = configs.get(StreamsConfig.InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR); if (o == null) { - KafkaException ex = new KafkaException("TaskManager is not specified"); - log.error(ex.getMessage(), ex); - throw ex; + final KafkaException fatalException = new KafkaException("TaskManager is not specified"); + log.error(fatalException.getMessage(), fatalException); + throw fatalException; } if (!(o instanceof TaskManager)) { - KafkaException ex = new KafkaException(String.format("%s is not an instance of %s", o.getClass().getName(), TaskManager.class.getName())); - log.error(ex.getMessage(), ex); - throw ex; + final KafkaException fatalException = new KafkaException(String.format("%s is not an instance of %s", o.getClass().getName(), TaskManager.class.getName())); + log.error(fatalException.getMessage(), fatalException); + throw fatalException; } taskManager = 
(TaskManager) o; @@ -214,14 +218,14 @@ public void configure(final Map<String, ?> configs) { final String userEndPoint = streamsConfig.getString(StreamsConfig.APPLICATION_SERVER_CONFIG); if (userEndPoint != null && !userEndPoint.isEmpty()) { try { - String host = getHost(userEndPoint); - Integer port = getPort(userEndPoint); + final String host = getHost(userEndPoint); + final Integer port = getPort(userEndPoint); if (host == null || port == null) throw new ConfigException(String.format("%s Config %s isn't in the correct format. Expected a host:port pair" + " but received %s", logPrefix, StreamsConfig.APPLICATION_SERVER_CONFIG, userEndPoint)); - } catch (NumberFormatException nfe) { + } catch (final NumberFormatException nfe) { throw new ConfigException(String.format("%s Invalid port supplied in %s for config %s", logPrefix, userEndPoint, StreamsConfig.APPLICATION_SERVER_CONFIG)); } @@ -240,7 +244,7 @@ public String name() { } @Override - public Subscription subscription(Set<String> topics) { + public Subscription subscription(final Set<String> topics) { // Adds the following information to subscription // 1. Client UUID (a unique id assigned to an instance of KafkaStreams) // 2. Task ids of previously running tasks @@ -249,7 +253,11 @@ public Subscription subscription(Set<String> topics) { final Set<TaskId> previousActiveTasks = taskManager.prevActiveTaskIds(); final Set<TaskId> standbyTasks = taskManager.cachedTasksIds(); standbyTasks.removeAll(previousActiveTasks); - final SubscriptionInfo data = new SubscriptionInfo(taskManager.processId(), previousActiveTasks, standbyTasks, this.userEndPoint); + final SubscriptionInfo data = new SubscriptionInfo( + taskManager.processId(), + previousActiveTasks, + standbyTasks, + this.userEndPoint); taskManager.updateSubscriptionsFromMetadata(topics); @@ -277,22 +285,32 @@ public Subscription subscription(Set<String> topics) { * 3. within each client, tasks are assigned to consumer clients in round-robin manner. 
*/ @Override - public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) { + public Map<String, Assignment> assign(final Cluster metadata, + final Map<String, Subscription> subscriptions) { // construct the client metadata from the decoded subscription info - Map<UUID, ClientMetadata> clientsMetadata = new HashMap<>(); - - for (Map.Entry<String, Subscription> entry : subscriptions.entrySet()) { - String consumerId = entry.getKey(); - Subscription subscription = entry.getValue(); - - SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData()); + final Map<UUID, ClientMetadata> clientsMetadata = new HashMap<>(); + + int minUserMetadataVersion = SubscriptionInfo.LATEST_SUPPORTED_VERSION; + for (final Map.Entry<String, Subscription> entry : subscriptions.entrySet()) { + final String consumerId = entry.getKey(); + final Subscription subscription = entry.getValue(); + + final SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData()); + final int usedVersion = info.version(); + if (usedVersion > SubscriptionInfo.LATEST_SUPPORTED_VERSION) { + throw new IllegalStateException("Unknown metadata version: " + usedVersion + + "; latest supported version: " + SubscriptionInfo.LATEST_SUPPORTED_VERSION); + } + if (usedVersion < minUserMetadataVersion) { + minUserMetadataVersion = usedVersion; + } // create the new client metadata if necessary - ClientMetadata clientMetadata = clientsMetadata.get(info.processId); + ClientMetadata clientMetadata = clientsMetadata.get(info.processId()); if (clientMetadata == null) { - clientMetadata = new ClientMetadata(info.userEndPoint); - clientsMetadata.put(info.processId, clientMetadata); + clientMetadata = new ClientMetadata(info.userEndPoint()); + clientsMetadata.put(info.processId(), clientMetadata); } // add the consumer to the client @@ -309,8 +327,8 @@ public Subscription subscription(Set<String> topics) { final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = 
taskManager.builder().topicGroups(); final Map<String, InternalTopicMetadata> repartitionTopicMetadata = new HashMap<>(); - for (InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) { - for (InternalTopicConfig topic: topicsInfo.repartitionSourceTopics.values()) { + for (final InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) { + for (final InternalTopicConfig topic: topicsInfo.repartitionSourceTopics.values()) { repartitionTopicMetadata.put(topic.name(), new InternalTopicMetadata(topic)); } } @@ -319,13 +337,13 @@ public Subscription subscription(Set<String> topics) { do { numPartitionsNeeded = false; - for (InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) { - for (String topicName : topicsInfo.repartitionSourceTopics.keySet()) { + for (final InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) { + for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) { int numPartitions = repartitionTopicMetadata.get(topicName).numPartitions; // try set the number of partitions for this repartition topic if it is not set yet if (numPartitions == UNKNOWN) { - for (InternalTopologyBuilder.TopicsInfo otherTopicsInfo : topicGroups.values()) { + for (final InternalTopologyBuilder.TopicsInfo otherTopicsInfo : topicGroups.values()) { final Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics; if (otherSinkTopics.contains(topicName)) { @@ -375,7 +393,7 @@ public Subscription subscription(Set<String> topics) { // augment the metadata with the newly computed number of partitions for all the // repartition source topics final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions = new HashMap<>(); - for (Map.Entry<String, InternalTopicMetadata> entry : repartitionTopicMetadata.entrySet()) { + for (final Map.Entry<String, InternalTopicMetadata> entry : repartitionTopicMetadata.entrySet()) { final String topic = entry.getKey(); final int numPartitions = 
entry.getValue().numPartitions; @@ -395,7 +413,7 @@ public Subscription subscription(Set<String> topics) { // get the tasks as partition groups from the partition grouper final Set<String> allSourceTopics = new HashSet<>(); final Map<Integer, Set<String>> sourceTopicsByGroup = new HashMap<>(); - for (Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) { + for (final Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) { allSourceTopics.addAll(entry.getValue().sourceTopics); sourceTopicsByGroup.put(entry.getKey(), entry.getValue().sourceTopics); } @@ -405,9 +423,9 @@ public Subscription subscription(Set<String> topics) { // check if all partitions are assigned, and there are no duplicates of partitions in multiple tasks final Set<TopicPartition> allAssignedPartitions = new HashSet<>(); final Map<Integer, Set<TaskId>> tasksByTopicGroup = new HashMap<>(); - for (Map.Entry<TaskId, Set<TopicPartition>> entry : partitionsForTask.entrySet()) { + for (final Map.Entry<TaskId, Set<TopicPartition>> entry : partitionsForTask.entrySet()) { final Set<TopicPartition> partitions = entry.getValue(); - for (TopicPartition partition : partitions) { + for (final TopicPartition partition : partitions) { if (allAssignedPartitions.contains(partition)) { log.warn("Partition {} is assigned to more than one tasks: {}", partition, partitionsForTask); } @@ -422,10 +440,10 @@ public Subscription subscription(Set<String> topics) { } ids.add(id); } - for (String topic : allSourceTopics) { + for (final String topic : allSourceTopics) { final List<PartitionInfo> partitionInfoList = fullMetadata.partitionsForTopic(topic); if (!partitionInfoList.isEmpty()) { - for (PartitionInfo partitionInfo : partitionInfoList) { + for (final PartitionInfo partitionInfo : partitionInfoList) { final TopicPartition partition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition()); if (!allAssignedPartitions.contains(partition)) { 
log.warn("Partition {} is not assigned to any tasks: {}", partition, partitionsForTask); @@ -438,15 +456,15 @@ public Subscription subscription(Set<String> topics) { // add tasks to state change log topic subscribers final Map<String, InternalTopicMetadata> changelogTopicMetadata = new HashMap<>(); - for (Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) { + for (final Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) { final int topicGroupId = entry.getKey(); final Map<String, InternalTopicConfig> stateChangelogTopics = entry.getValue().stateChangelogTopics; - for (InternalTopicConfig topicConfig : stateChangelogTopics.values()) { + for (final InternalTopicConfig topicConfig : stateChangelogTopics.values()) { // the expected number of partitions is the max value of TaskId.partition + 1 int numPartitions = UNKNOWN; if (tasksByTopicGroup.get(topicGroupId) != null) { - for (TaskId task : tasksByTopicGroup.get(topicGroupId)) { + for (final TaskId task : tasksByTopicGroup.get(topicGroupId)) { if (numPartitions < task.partition + 1) numPartitions = task.partition + 1; } @@ -468,7 +486,7 @@ public Subscription subscription(Set<String> topics) { // assign tasks to clients final Map<UUID, ClientState> states = new HashMap<>(); - for (Map.Entry<UUID, ClientMetadata> entry : clientsMetadata.entrySet()) { + for (final Map.Entry<UUID, ClientMetadata> entry : clientsMetadata.entrySet()) { states.put(entry.getKey(), entry.getValue().state); } @@ -484,25 +502,27 @@ public Subscription subscription(Set<String> topics) { // construct the global partition assignment per host map final Map<HostInfo, Set<TopicPartition>> partitionsByHostState = new HashMap<>(); - for (Map.Entry<UUID, ClientMetadata> entry : clientsMetadata.entrySet()) { - final HostInfo hostInfo = entry.getValue().hostInfo; + if (minUserMetadataVersion == 2) { + for (final Map.Entry<UUID, ClientMetadata> entry : clientsMetadata.entrySet()) { + 
final HostInfo hostInfo = entry.getValue().hostInfo; - if (hostInfo != null) { - final Set<TopicPartition> topicPartitions = new HashSet<>(); - final ClientState state = entry.getValue().state; + if (hostInfo != null) { + final Set<TopicPartition> topicPartitions = new HashSet<>(); + final ClientState state = entry.getValue().state; - for (final TaskId id : state.activeTasks()) { - topicPartitions.addAll(partitionsForTask.get(id)); - } + for (final TaskId id : state.activeTasks()) { + topicPartitions.addAll(partitionsForTask.get(id)); + } - partitionsByHostState.put(hostInfo, topicPartitions); + partitionsByHostState.put(hostInfo, topicPartitions); + } } } taskManager.setPartitionsByHostState(partitionsByHostState); // within the client, distribute tasks to its owned consumers final Map<String, Assignment> assignment = new HashMap<>(); - for (Map.Entry<UUID, ClientMetadata> entry : clientsMetadata.entrySet()) { + for (final Map.Entry<UUID, ClientMetadata> entry : clientsMetadata.entrySet()) { final Set<String> consumers = entry.getValue().consumers; final ClientState state = entry.getValue().state; @@ -511,7 +531,7 @@ public Subscription subscription(Set<String> topics) { int consumerTaskIndex = 0; - for (String consumer : consumers) { + for (final String consumer : consumers) { final Map<TaskId, Set<TopicPartition>> standby = new HashMap<>(); final ArrayList<AssignedPartition> assignedPartitions = new ArrayList<>(); @@ -540,13 +560,15 @@ public Subscription subscription(Set<String> topics) { Collections.sort(assignedPartitions); final List<TaskId> active = new ArrayList<>(); final List<TopicPartition> activePartitions = new ArrayList<>(); - for (AssignedPartition partition : assignedPartitions) { + for (final AssignedPartition partition : assignedPartitions) { active.add(partition.taskId); activePartitions.add(partition.partition); } // finally, encode the assignment before sending back to coordinator - assignment.put(consumer, new Assignment(activePartitions, new 
AssignmentInfo(active, standby, partitionsByHostState).encode())); + assignment.put(consumer, new Assignment( + activePartitions, + new AssignmentInfo(minUserMetadataVersion, active, standby, partitionsByHostState).encode())); } } @@ -577,26 +599,54 @@ public Subscription subscription(Set<String> topics) { * @throws TaskAssignmentException if there is no task id for one of the partitions specified */ @Override - public void onAssignment(Assignment assignment) { - List<TopicPartition> partitions = new ArrayList<>(assignment.partitions()); + public void onAssignment(final Assignment assignment) { + final List<TopicPartition> partitions = new ArrayList<>(assignment.partitions()); Collections.sort(partitions, PARTITION_COMPARATOR); - AssignmentInfo info = AssignmentInfo.decode(assignment.userData()); + final AssignmentInfo info = AssignmentInfo.decode(assignment.userData()); + final int usedVersion = info.version(); - Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>(); + // version 1 field + final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>(); + // version 2 fields + final Map<TopicPartition, PartitionInfo> topicToPartitionInfo = new HashMap<>(); + final Map<HostInfo, Set<TopicPartition>> partitionsByHost; + + switch (usedVersion) { + case 1: + processVersionOneAssignment(info, partitions, activeTasks); + partitionsByHost = Collections.emptyMap(); + break; + case 2: + processVersionTwoAssignment(info, partitions, activeTasks, topicToPartitionInfo); + partitionsByHost = info.partitionsByHost(); + break; + default: + throw new IllegalStateException("Unknown metadata version: " + usedVersion + + "; latest supported version: " + AssignmentInfo.LATEST_SUPPORTED_VERSION); + } + taskManager.setClusterMetadata(Cluster.empty().withPartitions(topicToPartitionInfo)); + taskManager.setPartitionsByHostState(partitionsByHost); + taskManager.setAssignmentMetadata(activeTasks, info.standbyTasks()); + 
taskManager.updateSubscriptionsFromAssignment(partitions); + } + + private void processVersionOneAssignment(final AssignmentInfo info, + final List<TopicPartition> partitions, + final Map<TaskId, Set<TopicPartition>> activeTasks) { // the number of assigned partitions should be the same as number of active tasks, which // could be duplicated if one task has more than one assigned partitions - if (partitions.size() != info.activeTasks.size()) { + if (partitions.size() != info.activeTasks().size()) { throw new TaskAssignmentException( - String.format("%sNumber of assigned partitions %d is not equal to the number of active taskIds %d" + - ", assignmentInfo=%s", logPrefix, partitions.size(), info.activeTasks.size(), info.toString()) + String.format("%sNumber of assigned partitions %d is not equal to the number of active taskIds %d" + + ", assignmentInfo=%s", logPrefix, partitions.size(), info.activeTasks().size(), info.toString()) ); } for (int i = 0; i < partitions.size(); i++) { - TopicPartition partition = partitions.get(i); - TaskId id = info.activeTasks.get(i); + final TopicPartition partition = partitions.get(i); + final TaskId id = info.activeTasks().get(i); Set<TopicPartition> assignedPartitions = activeTasks.get(id); if (assignedPartitions == null) { @@ -605,23 +655,23 @@ public void onAssignment(Assignment assignment) { } assignedPartitions.add(partition); } + } - final Map<TopicPartition, PartitionInfo> topicToPartitionInfo = new HashMap<>(); - for (Set<TopicPartition> value : info.partitionsByHost.values()) { - for (TopicPartition topicPartition : value) { - topicToPartitionInfo.put(topicPartition, new PartitionInfo(topicPartition.topic(), - topicPartition.partition(), - null, - new Node[0], - new Node[0])); + private void processVersionTwoAssignment(final AssignmentInfo info, + final List<TopicPartition> partitions, + final Map<TaskId, Set<TopicPartition>> activeTasks, + final Map<TopicPartition, PartitionInfo> topicToPartitionInfo) { + 
processVersionOneAssignment(info, partitions, activeTasks); + + // process partitions by host + final Map<HostInfo, Set<TopicPartition>> partitionsByHost = info.partitionsByHost(); + for (final Set<TopicPartition> value : partitionsByHost.values()) { + for (final TopicPartition topicPartition : value) { + topicToPartitionInfo.put( + topicPartition, + new PartitionInfo(topicPartition.topic(), topicPartition.partition(), null, new Node[0], new Node[0])); } } - - taskManager.setClusterMetadata(Cluster.empty().withPartitions(topicToPartitionInfo)); - taskManager.setPartitionsByHostState(info.partitionsByHost); - taskManager.setAssignmentMetadata(activeTasks, info.standbyTasks); - - taskManager.updateSubscriptionsFromAssignment(partitions); } /** @@ -658,10 +708,10 @@ private void prepareTopic(final Map<String, InternalTopicMetadata> topicPartitio log.debug("Completed validating internal topics in partition assignor."); } - private void ensureCopartitioning(Collection<Set<String>> copartitionGroups, - Map<String, InternalTopicMetadata> allRepartitionTopicsNumPartitions, - Cluster metadata) { - for (Set<String> copartitionGroup : copartitionGroups) { + private void ensureCopartitioning(final Collection<Set<String>> copartitionGroups, + final Map<String, InternalTopicMetadata> allRepartitionTopicsNumPartitions, + final Cluster metadata) { + for (final Set<String> copartitionGroup : copartitionGroups) { copartitionedTopicsValidator.validate(copartitionGroup, allRepartitionTopicsNumPartitions, metadata); } } @@ -677,7 +727,7 @@ private void ensureCopartitioning(Collection<Set<String>> copartitionGroups, private final Set<String> updatedTopicSubscriptions = new HashSet<>(); - public void updateTopics(Collection<String> topicNames) { + public void updateTopics(final Collection<String> topicNames) { updatedTopicSubscriptions.clear(); updatedTopicSubscriptions.addAll(topicNames); } @@ -735,7 +785,7 @@ void validate(final Set<String> copartitionGroup, // if all topics for this 
co-partition group is repartition topics, // then set the number of partitions to be the maximum of the number of partitions. if (numPartitions == UNKNOWN) { - for (Map.Entry<String, InternalTopicMetadata> entry: allRepartitionTopicsNumPartitions.entrySet()) { + for (final Map.Entry<String, InternalTopicMetadata> entry: allRepartitionTopicsNumPartitions.entrySet()) { if (copartitionGroup.contains(entry.getKey())) { final int partitions = entry.getValue().numPartitions; if (partitions > numPartitions) { @@ -745,7 +795,7 @@ void validate(final Set<String> copartitionGroup, } } // enforce co-partitioning restrictions to repartition topics by updating their number of partitions - for (Map.Entry<String, InternalTopicMetadata> entry : allRepartitionTopicsNumPartitions.entrySet()) { + for (final Map.Entry<String, InternalTopicMetadata> entry : allRepartitionTopicsNumPartitions.entrySet()) { if (copartitionGroup.contains(entry.getKey())) { entry.getValue().numPartitions = numPartitions; } @@ -755,7 +805,7 @@ void validate(final Set<String> copartitionGroup, } // following functions are for test only - void setInternalTopicManager(InternalTopicManager internalTopicManager) { + void setInternalTopicManager(final InternalTopicManager internalTopicManager) { this.internalTopicManager = internalTopicManager; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java index 8607472c281..c8df7498755 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java @@ -39,76 +39,123 @@ public class AssignmentInfo { private static final Logger log = LoggerFactory.getLogger(AssignmentInfo.class); - /** - * A new field was added, partitionsByHost. 
CURRENT_VERSION - * is required so we can decode the previous version. For example, this may occur - * during a rolling upgrade - */ - private static final int CURRENT_VERSION = 2; - public final int version; - public final List<TaskId> activeTasks; // each element corresponds to a partition - public final Map<TaskId, Set<TopicPartition>> standbyTasks; - public final Map<HostInfo, Set<TopicPartition>> partitionsByHost; - public AssignmentInfo(List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks, - Map<HostInfo, Set<TopicPartition>> hostState) { - this(CURRENT_VERSION, activeTasks, standbyTasks, hostState); + public static final int LATEST_SUPPORTED_VERSION = 2; + + private final int usedVersion; + private List<TaskId> activeTasks; + private Map<TaskId, Set<TopicPartition>> standbyTasks; + private Map<HostInfo, Set<TopicPartition>> partitionsByHost; + + private AssignmentInfo(final int version) { + this.usedVersion = version; + } + + public AssignmentInfo(final List<TaskId> activeTasks, + final Map<TaskId, Set<TopicPartition>> standbyTasks, + final Map<HostInfo, Set<TopicPartition>> hostState) { + this(LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, hostState); } - protected AssignmentInfo(int version, List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks, - Map<HostInfo, Set<TopicPartition>> hostState) { - this.version = version; + public AssignmentInfo(final int version, + final List<TaskId> activeTasks, + final Map<TaskId, Set<TopicPartition>> standbyTasks, + final Map<HostInfo, Set<TopicPartition>> hostState) { + this.usedVersion = version; this.activeTasks = activeTasks; this.standbyTasks = standbyTasks; this.partitionsByHost = hostState; } + public int version() { + return usedVersion; + } + + public List<TaskId> activeTasks() { + return activeTasks; + } + + public Map<TaskId, Set<TopicPartition>> standbyTasks() { + return standbyTasks; + } + + public Map<HostInfo, Set<TopicPartition>> partitionsByHost() { + return 
partitionsByHost; + } + /** * @throws TaskAssignmentException if method fails to encode the data, e.g., if there is an * IO exception during encoding */ public ByteBuffer encode() { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - DataOutputStream out = new DataOutputStream(baos); - - try { - // Encode version - out.writeInt(version); - // Encode active tasks - out.writeInt(activeTasks.size()); - for (TaskId id : activeTasks) { - id.writeTo(out); - } - // Encode standby tasks - out.writeInt(standbyTasks.size()); - for (Map.Entry<TaskId, Set<TopicPartition>> entry : standbyTasks.entrySet()) { - TaskId id = entry.getKey(); - id.writeTo(out); - - Set<TopicPartition> partitions = entry.getValue(); - writeTopicPartitions(out, partitions); - } - out.writeInt(partitionsByHost.size()); - for (Map.Entry<HostInfo, Set<TopicPartition>> entry : partitionsByHost - .entrySet()) { - final HostInfo hostInfo = entry.getKey(); - out.writeUTF(hostInfo.host()); - out.writeInt(hostInfo.port()); - writeTopicPartitions(out, entry.getValue()); + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + try (final DataOutputStream out = new DataOutputStream(baos)) { + switch (usedVersion) { + case 1: + encodeVersionOne(out); + break; + case 2: + encodeVersionTwo(out); + break; + default: + throw new IllegalStateException("Unknown metadata version: " + usedVersion + + "; latest supported version: " + LATEST_SUPPORTED_VERSION); } out.flush(); out.close(); return ByteBuffer.wrap(baos.toByteArray()); - } catch (IOException ex) { + } catch (final IOException ex) { throw new TaskAssignmentException("Failed to encode AssignmentInfo", ex); } } - private void writeTopicPartitions(DataOutputStream out, Set<TopicPartition> partitions) throws IOException { + private void encodeVersionOne(final DataOutputStream out) throws IOException { + out.writeInt(1); // version + encodeActiveAndStandbyTaskAssignment(out); + } + + private void encodeActiveAndStandbyTaskAssignment(final 
DataOutputStream out) throws IOException { + // encode active tasks + out.writeInt(activeTasks.size()); + for (final TaskId id : activeTasks) { + id.writeTo(out); + } + + // encode standby tasks + out.writeInt(standbyTasks.size()); + for (final Map.Entry<TaskId, Set<TopicPartition>> entry : standbyTasks.entrySet()) { + final TaskId id = entry.getKey(); + id.writeTo(out); + + final Set<TopicPartition> partitions = entry.getValue(); + writeTopicPartitions(out, partitions); + } + } + + private void encodeVersionTwo(final DataOutputStream out) throws IOException { + out.writeInt(2); // version + encodeActiveAndStandbyTaskAssignment(out); + encodePartitionsByHost(out); + } + + private void encodePartitionsByHost(final DataOutputStream out) throws IOException { + // encode partitions by host + out.writeInt(partitionsByHost.size()); + for (final Map.Entry<HostInfo, Set<TopicPartition>> entry : partitionsByHost.entrySet()) { + final HostInfo hostInfo = entry.getKey(); + out.writeUTF(hostInfo.host()); + out.writeInt(hostInfo.port()); + writeTopicPartitions(out, entry.getValue()); + } + } + + private void writeTopicPartitions(final DataOutputStream out, + final Set<TopicPartition> partitions) throws IOException { out.writeInt(partitions.size()); - for (TopicPartition partition : partitions) { + for (final TopicPartition partition : partitions) { out.writeUTF(partition.topic()); out.writeInt(partition.partition()); } @@ -117,52 +164,69 @@ private void writeTopicPartitions(DataOutputStream out, Set<TopicPartition> part /** * @throws TaskAssignmentException if method fails to decode the data or if the data version is unknown */ - public static AssignmentInfo decode(ByteBuffer data) { + public static AssignmentInfo decode(final ByteBuffer data) { // ensure we are at the beginning of the ByteBuffer data.rewind(); - try (DataInputStream in = new DataInputStream(new ByteBufferInputStream(data))) { - // Decode version - int version = in.readInt(); - if (version < 0 || version > 
CURRENT_VERSION) { - TaskAssignmentException ex = new TaskAssignmentException("Unknown assignment data version: " + version); - log.error(ex.getMessage(), ex); - throw ex; - } + try (final DataInputStream in = new DataInputStream(new ByteBufferInputStream(data))) { + // decode used version + final int usedVersion = in.readInt(); + final AssignmentInfo assignmentInfo = new AssignmentInfo(usedVersion); - // Decode active tasks - int count = in.readInt(); - List<TaskId> activeTasks = new ArrayList<>(count); - for (int i = 0; i < count; i++) { - activeTasks.add(TaskId.readFrom(in)); - } - // Decode standby tasks - count = in.readInt(); - Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>(count); - for (int i = 0; i < count; i++) { - TaskId id = TaskId.readFrom(in); - standbyTasks.put(id, readTopicPartitions(in)); + switch (usedVersion) { + case 1: + decodeVersionOneData(assignmentInfo, in); + break; + case 2: + decodeVersionTwoData(assignmentInfo, in); + break; + default: + TaskAssignmentException fatalException = new TaskAssignmentException("Unable to decode subscription data: " + + "used version: " + usedVersion + "; latest supported version: " + LATEST_SUPPORTED_VERSION); + log.error(fatalException.getMessage(), fatalException); + throw fatalException; } - Map<HostInfo, Set<TopicPartition>> hostStateToTopicPartitions = new HashMap<>(); - if (version == CURRENT_VERSION) { - int numEntries = in.readInt(); - for (int i = 0; i < numEntries; i++) { - HostInfo hostInfo = new HostInfo(in.readUTF(), in.readInt()); - hostStateToTopicPartitions.put(hostInfo, readTopicPartitions(in)); - } - } + return assignmentInfo; + } catch (final IOException ex) { + throw new TaskAssignmentException("Failed to decode AssignmentInfo", ex); + } + } + + private static void decodeVersionOneData(final AssignmentInfo assignmentInfo, + final DataInputStream in) throws IOException { + // decode active tasks + int count = in.readInt(); + assignmentInfo.activeTasks = new 
ArrayList<>(count); + for (int i = 0; i < count; i++) { + assignmentInfo.activeTasks.add(TaskId.readFrom(in)); + } - return new AssignmentInfo(activeTasks, standbyTasks, hostStateToTopicPartitions); + // decode standby tasks + count = in.readInt(); + assignmentInfo.standbyTasks = new HashMap<>(count); + for (int i = 0; i < count; i++) { + TaskId id = TaskId.readFrom(in); + assignmentInfo.standbyTasks.put(id, readTopicPartitions(in)); + } + } - } catch (IOException ex) { - throw new TaskAssignmentException("Failed to decode AssignmentInfo", ex); + private static void decodeVersionTwoData(final AssignmentInfo assignmentInfo, + final DataInputStream in) throws IOException { + decodeVersionOneData(assignmentInfo, in); + + // decode partitions by host + assignmentInfo.partitionsByHost = new HashMap<>(); + final int numEntries = in.readInt(); + for (int i = 0; i < numEntries; i++) { + final HostInfo hostInfo = new HostInfo(in.readUTF(), in.readInt()); + assignmentInfo.partitionsByHost.put(hostInfo, readTopicPartitions(in)); } } - private static Set<TopicPartition> readTopicPartitions(DataInputStream in) throws IOException { - int numPartitions = in.readInt(); - Set<TopicPartition> partitions = new HashSet<>(numPartitions); + private static Set<TopicPartition> readTopicPartitions(final DataInputStream in) throws IOException { + final int numPartitions = in.readInt(); + final Set<TopicPartition> partitions = new HashSet<>(numPartitions); for (int j = 0; j < numPartitions; j++) { partitions.add(new TopicPartition(in.readUTF(), in.readInt())); } @@ -171,14 +235,14 @@ public static AssignmentInfo decode(ByteBuffer data) { @Override public int hashCode() { - return version ^ activeTasks.hashCode() ^ standbyTasks.hashCode() ^ partitionsByHost.hashCode(); + return usedVersion ^ activeTasks.hashCode() ^ standbyTasks.hashCode() ^ partitionsByHost.hashCode(); } @Override - public boolean equals(Object o) { + public boolean equals(final Object o) { if (o instanceof AssignmentInfo) { 
- AssignmentInfo other = (AssignmentInfo) o; - return this.version == other.version && + final AssignmentInfo other = (AssignmentInfo) o; + return this.usedVersion == other.usedVersion && this.activeTasks.equals(other.activeTasks) && this.standbyTasks.equals(other.standbyTasks) && this.partitionsByHost.equals(other.partitionsByHost); @@ -189,7 +253,7 @@ public boolean equals(Object o) { @Override public String toString() { - return "[version=" + version + ", active tasks=" + activeTasks.size() + ", standby tasks=" + standbyTasks.size() + "]"; + return "[version=" + usedVersion + ", active tasks=" + activeTasks.size() + ", standby tasks=" + standbyTasks.size() + "]"; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java index f583dbafc94..7fee90b5402 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java @@ -31,42 +31,96 @@ private static final Logger log = LoggerFactory.getLogger(SubscriptionInfo.class); - private static final int CURRENT_VERSION = 2; + public static final int LATEST_SUPPORTED_VERSION = 2; - public final int version; - public final UUID processId; - public final Set<TaskId> prevTasks; - public final Set<TaskId> standbyTasks; - public final String userEndPoint; + private final int usedVersion; + private UUID processId; + private Set<TaskId> prevTasks; + private Set<TaskId> standbyTasks; + private String userEndPoint; - public SubscriptionInfo(UUID processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks, String userEndPoint) { - this(CURRENT_VERSION, processId, prevTasks, standbyTasks, userEndPoint); + private SubscriptionInfo(final int version) { + this.usedVersion = version; } - private SubscriptionInfo(int version, UUID 
processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks, String userEndPoint) { - this.version = version; + public SubscriptionInfo(final UUID processId, + final Set<TaskId> prevTasks, + final Set<TaskId> standbyTasks, + final String userEndPoint) { + this(LATEST_SUPPORTED_VERSION, processId, prevTasks, standbyTasks, userEndPoint); + } + + public SubscriptionInfo(final int version, + final UUID processId, + final Set<TaskId> prevTasks, + final Set<TaskId> standbyTasks, + final String userEndPoint) { + this.usedVersion = version; this.processId = processId; this.prevTasks = prevTasks; this.standbyTasks = standbyTasks; this.userEndPoint = userEndPoint; } + public int version() { + return usedVersion; + } + + public UUID processId() { + return processId; + } + + public Set<TaskId> prevTasks() { + return prevTasks; + } + + public Set<TaskId> standbyTasks() { + return standbyTasks; + } + + public String userEndPoint() { + return userEndPoint; + } + /** * @throws TaskAssignmentException if method fails to encode the data */ public ByteBuffer encode() { - byte[] endPointBytes; - if (userEndPoint == null) { - endPointBytes = new byte[0]; - } else { - endPointBytes = userEndPoint.getBytes(Charset.forName("UTF-8")); + final ByteBuffer buf; + + switch (usedVersion) { + case 1: + buf = encodeVersionOne(); + break; + case 2: + buf = encodeVersionTwo(prepareUserEndPoint()); + break; + default: + throw new IllegalStateException("Unknown metadata version: " + usedVersion + + "; latest supported version: " + LATEST_SUPPORTED_VERSION); } - ByteBuffer buf = ByteBuffer.allocate(4 /* version */ + 16 /* process id */ + 4 + - prevTasks.size() * 8 + 4 + standbyTasks.size() * 8 - + 4 /* length of bytes */ + endPointBytes.length - ); - // version - buf.putInt(version); + + buf.rewind(); + return buf; + } + + private ByteBuffer encodeVersionOne() { + final ByteBuffer buf = ByteBuffer.allocate(getVersionOneByteLength()); + + buf.putInt(1); // version + encodeVersionOneData(buf); + + return 
buf; + } + + private int getVersionOneByteLength() { + return 4 + // version + 16 + // client ID + 4 + prevTasks.size() * 8 + // length + prev tasks + 4 + standbyTasks.size() * 8; // length + standby tasks + } + + private void encodeVersionOneData(final ByteBuffer buf) { // encode client UUID buf.putLong(processId.getMostSignificantBits()); buf.putLong(processId.getLeastSignificantBits()); @@ -80,60 +134,104 @@ public ByteBuffer encode() { for (TaskId id : standbyTasks) { id.writeTo(buf); } - buf.putInt(endPointBytes.length); - buf.put(endPointBytes); - buf.rewind(); + } + + private byte[] prepareUserEndPoint() { + if (userEndPoint == null) { + return new byte[0]; + } else { + return userEndPoint.getBytes(Charset.forName("UTF-8")); + } + } + + private ByteBuffer encodeVersionTwo(final byte[] endPointBytes) { + final ByteBuffer buf = ByteBuffer.allocate(getVersionTwoByteLength(endPointBytes)); + + buf.putInt(2); // version + encodeVersionTwoData(buf, endPointBytes); + return buf; } + private int getVersionTwoByteLength(final byte[] endPointBytes) { + return getVersionOneByteLength() + + 4 + endPointBytes.length; // length + userEndPoint + } + + private void encodeVersionTwoData(final ByteBuffer buf, + final byte[] endPointBytes) { + encodeVersionOneData(buf); + if (endPointBytes != null) { + buf.putInt(endPointBytes.length); + buf.put(endPointBytes); + } + } + /** * @throws TaskAssignmentException if method fails to decode the data */ - public static SubscriptionInfo decode(ByteBuffer data) { + public static SubscriptionInfo decode(final ByteBuffer data) { // ensure we are at the beginning of the ByteBuffer data.rewind(); - // Decode version - int version = data.getInt(); - if (version == CURRENT_VERSION || version == 1) { - // Decode client UUID - UUID processId = new UUID(data.getLong(), data.getLong()); - // Decode previously active tasks - Set<TaskId> prevTasks = new HashSet<>(); - int numPrevs = data.getInt(); - for (int i = 0; i < numPrevs; i++) { - TaskId id 
= TaskId.readFrom(data); - prevTasks.add(id); - } - // Decode previously cached tasks - Set<TaskId> standbyTasks = new HashSet<>(); - int numCached = data.getInt(); - for (int i = 0; i < numCached; i++) { - standbyTasks.add(TaskId.readFrom(data)); - } - - String userEndPoint = null; - if (version == CURRENT_VERSION) { - int bytesLength = data.getInt(); - if (bytesLength != 0) { - byte[] bytes = new byte[bytesLength]; - data.get(bytes); - userEndPoint = new String(bytes, Charset.forName("UTF-8")); - } - - } - return new SubscriptionInfo(version, processId, prevTasks, standbyTasks, userEndPoint); + // decode used version + final int usedVersion = data.getInt(); + final SubscriptionInfo subscriptionInfo = new SubscriptionInfo(usedVersion); + + switch (usedVersion) { + case 1: + decodeVersionOneData(subscriptionInfo, data); + break; + case 2: + decodeVersionTwoData(subscriptionInfo, data); + break; + default: + TaskAssignmentException fatalException = new TaskAssignmentException("Unable to decode subscription data: " + + "used version: " + usedVersion + "; latest supported version: " + LATEST_SUPPORTED_VERSION); + log.error(fatalException.getMessage(), fatalException); + throw fatalException; + } + + return subscriptionInfo; + } - } else { - TaskAssignmentException ex = new TaskAssignmentException("unable to decode subscription data: version=" + version); - log.error(ex.getMessage(), ex); - throw ex; + private static void decodeVersionOneData(final SubscriptionInfo subscriptionInfo, + final ByteBuffer data) { + // decode client UUID + subscriptionInfo.processId = new UUID(data.getLong(), data.getLong()); + + // decode previously active tasks + final int numPrevs = data.getInt(); + subscriptionInfo.prevTasks = new HashSet<>(); + for (int i = 0; i < numPrevs; i++) { + TaskId id = TaskId.readFrom(data); + subscriptionInfo.prevTasks.add(id); + } + + // decode previously cached tasks + final int numCached = data.getInt(); + subscriptionInfo.standbyTasks = new HashSet<>(); + 
for (int i = 0; i < numCached; i++) { + subscriptionInfo.standbyTasks.add(TaskId.readFrom(data)); + } + } + + private static void decodeVersionTwoData(final SubscriptionInfo subscriptionInfo, + final ByteBuffer data) { + decodeVersionOneData(subscriptionInfo, data); + + // decode user end point (can be null) + int bytesLength = data.getInt(); + if (bytesLength != 0) { + final byte[] bytes = new byte[bytesLength]; + data.get(bytes); + subscriptionInfo.userEndPoint = new String(bytes, Charset.forName("UTF-8")); } } @Override public int hashCode() { - int hashCode = version ^ processId.hashCode() ^ prevTasks.hashCode() ^ standbyTasks.hashCode(); + final int hashCode = usedVersion ^ processId.hashCode() ^ prevTasks.hashCode() ^ standbyTasks.hashCode(); if (userEndPoint == null) { return hashCode; } @@ -141,10 +239,10 @@ public int hashCode() { } @Override - public boolean equals(Object o) { + public boolean equals(final Object o) { if (o instanceof SubscriptionInfo) { - SubscriptionInfo other = (SubscriptionInfo) o; - return this.version == other.version && + final SubscriptionInfo other = (SubscriptionInfo) o; + return this.usedVersion == other.usedVersion && this.processId.equals(other.processId) && this.prevTasks.equals(other.prevTasks) && this.standbyTasks.equals(other.standbyTasks) && diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java index 8b4e8957ed5..bb06c72d080 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java @@ -376,7 +376,6 @@ public boolean conditionMet() { } } - @Test public void queryOnRebalance() throws InterruptedException { final int numThreads = STREAM_TWO_PARTITIONS; diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java index bf3f1d1ac5e..b0c0d68287b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java @@ -239,17 +239,17 @@ public void testAssignBasic() throws Exception { // the first consumer AssignmentInfo info10 = checkAssignment(allTopics, assignments.get("consumer10")); - allActiveTasks.addAll(info10.activeTasks); + allActiveTasks.addAll(info10.activeTasks()); // the second consumer AssignmentInfo info11 = checkAssignment(allTopics, assignments.get("consumer11")); - allActiveTasks.addAll(info11.activeTasks); + allActiveTasks.addAll(info11.activeTasks()); assertEquals(Utils.mkSet(task0, task1), allActiveTasks); // the third consumer AssignmentInfo info20 = checkAssignment(allTopics, assignments.get("consumer20")); - allActiveTasks.addAll(info20.activeTasks); + allActiveTasks.addAll(info20.activeTasks()); assertEquals(3, allActiveTasks.size()); assertEquals(allTasks, new HashSet<>(allActiveTasks)); @@ -317,13 +317,13 @@ public void shouldAssignEvenlyAcrossConsumersOneClientMultipleThreads() throws E final AssignmentInfo info10 = AssignmentInfo.decode(assignments.get("consumer10").userData()); final List<TaskId> expectedInfo10TaskIds = Arrays.asList(taskIdA1, taskIdA3, taskIdB1, taskIdB3); - assertEquals(expectedInfo10TaskIds, info10.activeTasks); + assertEquals(expectedInfo10TaskIds, info10.activeTasks()); // the second consumer final AssignmentInfo info11 = AssignmentInfo.decode(assignments.get("consumer11").userData()); final List<TaskId> expectedInfo11TaskIds = Arrays.asList(taskIdA0, taskIdA2, taskIdB0, taskIdB2); - assertEquals(expectedInfo11TaskIds, info11.activeTasks); + 
assertEquals(expectedInfo11TaskIds, info11.activeTasks()); } @Test @@ -354,7 +354,7 @@ public void testAssignWithPartialTopology() throws Exception { // check assignment info Set<TaskId> allActiveTasks = new HashSet<>(); AssignmentInfo info10 = checkAssignment(Utils.mkSet("topic1"), assignments.get("consumer10")); - allActiveTasks.addAll(info10.activeTasks); + allActiveTasks.addAll(info10.activeTasks()); assertEquals(3, allActiveTasks.size()); assertEquals(allTasks, new HashSet<>(allActiveTasks)); @@ -394,7 +394,7 @@ public void testAssignEmptyMetadata() throws Exception { // check assignment info Set<TaskId> allActiveTasks = new HashSet<>(); AssignmentInfo info10 = checkAssignment(Collections.<String>emptySet(), assignments.get("consumer10")); - allActiveTasks.addAll(info10.activeTasks); + allActiveTasks.addAll(info10.activeTasks()); assertEquals(0, allActiveTasks.size()); assertEquals(Collections.<TaskId>emptySet(), new HashSet<>(allActiveTasks)); @@ -407,7 +407,7 @@ public void testAssignEmptyMetadata() throws Exception { // the first consumer info10 = checkAssignment(allTopics, assignments.get("consumer10")); - allActiveTasks.addAll(info10.activeTasks); + allActiveTasks.addAll(info10.activeTasks()); assertEquals(3, allActiveTasks.size()); assertEquals(allTasks, new HashSet<>(allActiveTasks)); @@ -455,15 +455,15 @@ public void testAssignWithNewTasks() throws Exception { AssignmentInfo info; info = AssignmentInfo.decode(assignments.get("consumer10").userData()); - allActiveTasks.addAll(info.activeTasks); + allActiveTasks.addAll(info.activeTasks()); allPartitions.addAll(assignments.get("consumer10").partitions()); info = AssignmentInfo.decode(assignments.get("consumer11").userData()); - allActiveTasks.addAll(info.activeTasks); + allActiveTasks.addAll(info.activeTasks()); allPartitions.addAll(assignments.get("consumer11").partitions()); info = AssignmentInfo.decode(assignments.get("consumer20").userData()); - allActiveTasks.addAll(info.activeTasks); + 
allActiveTasks.addAll(info.activeTasks()); allPartitions.addAll(assignments.get("consumer20").partitions()); assertEquals(allTasks, allActiveTasks); @@ -524,14 +524,14 @@ public void testAssignWithStates() throws Exception { AssignmentInfo info11 = AssignmentInfo.decode(assignments.get("consumer11").userData()); AssignmentInfo info20 = AssignmentInfo.decode(assignments.get("consumer20").userData()); - assertEquals(2, info10.activeTasks.size()); - assertEquals(2, info11.activeTasks.size()); - assertEquals(2, info20.activeTasks.size()); + assertEquals(2, info10.activeTasks().size()); + assertEquals(2, info11.activeTasks().size()); + assertEquals(2, info20.activeTasks().size()); Set<TaskId> allTasks = new HashSet<>(); - allTasks.addAll(info10.activeTasks); - allTasks.addAll(info11.activeTasks); - allTasks.addAll(info20.activeTasks); + allTasks.addAll(info10.activeTasks()); + allTasks.addAll(info11.activeTasks()); + allTasks.addAll(info20.activeTasks()); assertEquals(new HashSet<>(tasks), allTasks); // check tasks for state topics @@ -603,15 +603,15 @@ public void testAssignWithStandbyReplicas() throws Exception { // the first consumer AssignmentInfo info10 = checkAssignment(allTopics, assignments.get("consumer10")); - allActiveTasks.addAll(info10.activeTasks); - allStandbyTasks.addAll(info10.standbyTasks.keySet()); + allActiveTasks.addAll(info10.activeTasks()); + allStandbyTasks.addAll(info10.standbyTasks().keySet()); // the second consumer AssignmentInfo info11 = checkAssignment(allTopics, assignments.get("consumer11")); - allActiveTasks.addAll(info11.activeTasks); - allStandbyTasks.addAll(info11.standbyTasks.keySet()); + allActiveTasks.addAll(info11.activeTasks()); + allStandbyTasks.addAll(info11.standbyTasks().keySet()); - assertNotEquals("same processId has same set of standby tasks", info11.standbyTasks.keySet(), info10.standbyTasks.keySet()); + assertNotEquals("same processId has same set of standby tasks", info11.standbyTasks().keySet(), 
info10.standbyTasks().keySet()); // check active tasks assigned to the first client assertEquals(Utils.mkSet(task0, task1), new HashSet<>(allActiveTasks)); @@ -619,8 +619,8 @@ public void testAssignWithStandbyReplicas() throws Exception { // the third consumer AssignmentInfo info20 = checkAssignment(allTopics, assignments.get("consumer20")); - allActiveTasks.addAll(info20.activeTasks); - allStandbyTasks.addAll(info20.standbyTasks.keySet()); + allActiveTasks.addAll(info20.activeTasks()); + allStandbyTasks.addAll(info20.standbyTasks().keySet()); // all task ids are in the active tasks and also in the standby tasks @@ -847,7 +847,7 @@ public void shouldAddUserDefinedEndPointToSubscription() throws Exception { configurePartitionAssignor(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, (Object) userEndPoint)); final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("input")); final SubscriptionInfo subscriptionInfo = SubscriptionInfo.decode(subscription.userData()); - assertEquals("localhost:8080", subscriptionInfo.userEndPoint); + assertEquals("localhost:8080", subscriptionInfo.userEndPoint()); } @Test @@ -874,7 +874,7 @@ public void shouldMapUserEndPointToTopicPartitions() throws Exception { final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions); final PartitionAssignor.Assignment consumerAssignment = assignments.get("consumer1"); final AssignmentInfo assignmentInfo = AssignmentInfo.decode(consumerAssignment.userData()); - final Set<TopicPartition> topicPartitions = assignmentInfo.partitionsByHost.get(new HostInfo("localhost", 8080)); + final Set<TopicPartition> topicPartitions = assignmentInfo.partitionsByHost().get(new HostInfo("localhost", 8080)); assertEquals(Utils.mkSet(new TopicPartition("topic1", 0), new TopicPartition("topic1", 1), new TopicPartition("topic1", 2)), topicPartitions); @@ -1072,8 +1072,8 @@ public void 
shouldNotAddStandbyTaskPartitionsToPartitionsForHost() throws Except final Map<String, PartitionAssignor.Assignment> assign = partitionAssignor.assign(metadata, subscriptions); final PartitionAssignor.Assignment consumer1Assignment = assign.get("consumer1"); final AssignmentInfo assignmentInfo = AssignmentInfo.decode(consumer1Assignment.userData()); - final Set<TopicPartition> consumer1partitions = assignmentInfo.partitionsByHost.get(new HostInfo("localhost", 8080)); - final Set<TopicPartition> consumer2Partitions = assignmentInfo.partitionsByHost.get(new HostInfo("other", 9090)); + final Set<TopicPartition> consumer1partitions = assignmentInfo.partitionsByHost().get(new HostInfo("localhost", 8080)); + final Set<TopicPartition> consumer2Partitions = assignmentInfo.partitionsByHost().get(new HostInfo("other", 9090)); final HashSet<TopicPartition> allAssignedPartitions = new HashSet<>(consumer1partitions); allAssignedPartitions.addAll(consumer2Partitions); assertThat(consumer1partitions, not(allPartitions)); @@ -1095,6 +1095,37 @@ public void shouldThrowKafkaExceptionIfStreamThreadConfigIsNotThreadDataProvider partitionAssignor.configure(config); } + @Test + public void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions() throws Exception { + final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>(); + final Set<TaskId> emptyTasks = Collections.emptySet(); + subscriptions.put( + "consumer1", + new PartitionAssignor.Subscription( + Collections.singletonList("topic1"), + new SubscriptionInfo(1, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode() + ) + ); + subscriptions.put( + "consumer2", + new PartitionAssignor.Subscription( + Collections.singletonList("topic1"), + new SubscriptionInfo(2, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode() + ) + ); + + mockTaskManager(Collections.<TaskId>emptySet(), + Collections.<TaskId>emptySet(), + UUID.randomUUID(), + new InternalTopologyBuilder()); + 
partitionAssignor.configure(configProps()); + final Map<String, PartitionAssignor.Assignment> assignment = partitionAssignor.assign(metadata, subscriptions); + + assertThat(assignment.size(), equalTo(2)); + assertThat(AssignmentInfo.decode(assignment.get("consumer1").userData()).version(), equalTo(1)); + assertThat(AssignmentInfo.decode(assignment.get("consumer2").userData()).version(), equalTo(1)); + } + private PartitionAssignor.Assignment createAssignment(final Map<HostInfo, Set<TopicPartition>> firstHostState) { final AssignmentInfo info = new AssignmentInfo(Collections.<TaskId>emptyList(), Collections.<TaskId, Set<TopicPartition>>emptyMap(), @@ -1111,7 +1142,7 @@ private AssignmentInfo checkAssignment(Set<String> expectedTopics, PartitionAssi AssignmentInfo info = AssignmentInfo.decode(assignment.userData()); // check if the number of assigned partitions == the size of active task id list - assertEquals(assignment.partitions().size(), info.activeTasks.size()); + assertEquals(assignment.partitions().size(), info.activeTasks().size()); // check if active tasks are consistent List<TaskId> activeTasks = new ArrayList<>(); @@ -1121,14 +1152,14 @@ private AssignmentInfo checkAssignment(Set<String> expectedTopics, PartitionAssi activeTasks.add(new TaskId(0, partition.partition())); activeTopics.add(partition.topic()); } - assertEquals(activeTasks, info.activeTasks); + assertEquals(activeTasks, info.activeTasks()); // check if active partitions cover all topics assertEquals(expectedTopics, activeTopics); // check if standby tasks are consistent Set<String> standbyTopics = new HashSet<>(); - for (Map.Entry<TaskId, Set<TopicPartition>> entry : info.standbyTasks.entrySet()) { + for (Map.Entry<TaskId, Set<TopicPartition>> entry : info.standbyTasks().entrySet()) { TaskId id = entry.getKey(); Set<TopicPartition> partitions = entry.getValue(); for (TopicPartition partition : partitions) { @@ -1139,7 +1170,7 @@ private AssignmentInfo checkAssignment(Set<String> 
expectedTopics, PartitionAssi } } - if (info.standbyTasks.size() > 0) { + if (info.standbyTasks().size() > 0) { // check if standby partitions cover all topics assertEquals(expectedTopics, standbyTopics); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java index ec94ad81acd..726a5623cd5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java @@ -33,6 +33,7 @@ import java.util.Set; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; public class AssignmentInfoTest { @@ -61,10 +62,10 @@ public void shouldDecodePreviousVersion() throws IOException { standbyTasks.put(new TaskId(2, 0), Utils.mkSet(new TopicPartition("t3", 0), new TopicPartition("t3", 0))); final AssignmentInfo oldVersion = new AssignmentInfo(1, activeTasks, standbyTasks, null); final AssignmentInfo decoded = AssignmentInfo.decode(encodeV1(oldVersion)); - assertEquals(oldVersion.activeTasks, decoded.activeTasks); - assertEquals(oldVersion.standbyTasks, decoded.standbyTasks); - assertEquals(0, decoded.partitionsByHost.size()); // should be empty as wasn't in V1 - assertEquals(2, decoded.version); // automatically upgraded to v2 on decode; + assertEquals(oldVersion.activeTasks(), decoded.activeTasks()); + assertEquals(oldVersion.standbyTasks(), decoded.standbyTasks()); + assertNull(decoded.partitionsByHost()); // should be null as wasn't in V1 + assertEquals(1, decoded.version()); } @@ -76,15 +77,15 @@ private ByteBuffer encodeV1(AssignmentInfo oldVersion) throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream out = new DataOutputStream(baos); // Encode version - out.writeInt(oldVersion.version); + 
out.writeInt(oldVersion.version()); // Encode active tasks - out.writeInt(oldVersion.activeTasks.size()); - for (TaskId id : oldVersion.activeTasks) { + out.writeInt(oldVersion.activeTasks().size()); + for (TaskId id : oldVersion.activeTasks()) { id.writeTo(out); } // Encode standby tasks - out.writeInt(oldVersion.standbyTasks.size()); - for (Map.Entry<TaskId, Set<TopicPartition>> entry : oldVersion.standbyTasks.entrySet()) { + out.writeInt(oldVersion.standbyTasks().size()); + for (Map.Entry<TaskId, Set<TopicPartition>> entry : oldVersion.standbyTasks().entrySet()) { TaskId id = entry.getKey(); id.writeTo(out); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java index 9c011bb0cae..633285a2b4d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java @@ -65,14 +65,12 @@ public void shouldBeBackwardCompatible() { final ByteBuffer v1Encoding = encodePreviousVersion(processId, activeTasks, standbyTasks); final SubscriptionInfo decode = SubscriptionInfo.decode(v1Encoding); - assertEquals(activeTasks, decode.prevTasks); - assertEquals(standbyTasks, decode.standbyTasks); - assertEquals(processId, decode.processId); - assertNull(decode.userEndPoint); - + assertEquals(activeTasks, decode.prevTasks()); + assertEquals(standbyTasks, decode.standbyTasks()); + assertEquals(processId, decode.processId()); + assertNull(decode.userEndPoint()); } - /** * This is a clone of what the V1 encoding did. The encode method has changed for V2 * so it is impossible to test compatibility without having this ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when > upgrading from 0.10.0.0 to 0.10.2.1 > ----------------------------------------------------------------------------------------------------------------- > > Key: KAFKA-6054 > URL: https://issues.apache.org/jira/browse/KAFKA-6054 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 0.10.2.1 > Reporter: James Cheng > Assignee: Matthias J. Sax > Priority: Major > Labels: needs-kip > > We upgraded an app from kafka-streams 0.10.0.0 to 0.10.2.1. We did a rolling > upgrade of the app, so that at one point, there were both 0.10.0.0-based > instances and 0.10.2.1-based instances running. > We observed the following stack trace: > {code} > 2017-10-11 07:02:19.964 [StreamThread-3] ERROR o.a.k.s.p.i.a.SubscriptionInfo > - > unable to decode subscription data: version=2 > org.apache.kafka.streams.errors.TaskAssignmentException: unable to decode > subscription data: version=2 > at > org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.decode(SubscriptionInfo.java:113) > at > org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:235) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:260) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:404) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$900(AbstractCoordinator.java:81) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:358) > at >
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:340) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218) > > {code} > I spoke with 
[~mjsax] and he said this is a known issue that happens when you > have both 0.10.0.0 instances and 0.10.2.1 instances running at the same time, > because the internal version number of the protocol changed when adding > Interactive Queries. Matthias asked me to file this JIRA> -- This message was sent by Atlassian JIRA (v7.6.3#76005)