ableegoldman commented on a change in pull request #9383:
URL: https://github.com/apache/kafka/pull/9383#discussion_r501322300
##########
File path:
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
##########
@@ -483,7 +485,7 @@ public void testEagerSubscription() {
Collections.sort(subscription.topics());
assertEquals(asList("topic1", "topic2"), subscription.topics());
- final SubscriptionInfo info = getInfo(UUID_1, prevTasks, standbyTasks);
Review comment:
Can you add a test that verifies that it goes back and forth between the
two expected values when you call `partitionAssignor.subscriptionUserData`
multiples times (let's say 3) -- also maybe add a verification that the
`uniqueField` has a length of just 1
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -234,15 +236,20 @@ public ByteBuffer subscriptionUserData(final Set<String>
topics) {
// Adds the following information to subscription
// 1. Client UUID (a unique id assigned to an instance of KafkaStreams)
// 2. Map from task id to its overall lag
+ // 3. Unique Field to ensure a rebalance when a thread rejoins by
forcing the user data to be different
handleRebalanceStart(topics);
+ if (usedSubscriptionMetadataVersion >= 8) {
Review comment:
Instead of hardcoded `8` all over, let's just define a constant for this
similar to the `MIN_VERSION_OFFSET_SUM_SUBSCRIPTION ` in SubscriptionInfo
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -234,15 +236,20 @@ public ByteBuffer subscriptionUserData(final Set<String>
topics) {
// Adds the following information to subscription
// 1. Client UUID (a unique id assigned to an instance of KafkaStreams)
// 2. Map from task id to its overall lag
+ // 3. Unique Field to ensure a rebalance when a thread rejoins by
forcing the user data to be different
handleRebalanceStart(topics);
+ if (usedSubscriptionMetadataVersion >= 8) {
Review comment:
If you want, I think it's also fine to just always flip the byte and not
even check against the used subscription version.
##########
File path:
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
##########
@@ -286,23 +296,23 @@ public void
shouldEncodeAndDecodeSmallerLatestSupportedVersion() {
final int latestSupportedVersion = LATEST_SUPPORTED_VERSION - 1;
final SubscriptionInfo info =
- new SubscriptionInfo(usedVersion, latestSupportedVersion, UUID_1,
"localhost:80", TASK_OFFSET_SUMS);
+ new SubscriptionInfo(usedVersion, latestSupportedVersion, UUID_1,
"localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD);
final SubscriptionInfo expectedInfo =
- new SubscriptionInfo(usedVersion, latestSupportedVersion, UUID_1,
"localhost:80", TASK_OFFSET_SUMS);
+ new SubscriptionInfo(usedVersion, latestSupportedVersion, UUID_1,
"localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD);
assertEquals(expectedInfo, SubscriptionInfo.decode(info.encode()));
}
@Test
public void shouldEncodeAndDecodeVersion7() {
Review comment:
Can you add a test like this for the new version 8?
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -212,6 +213,7 @@ public void configure(final Map<String, ?> configs) {
rebalanceProtocol = assignorConfiguration.rebalanceProtocol();
taskAssignorSupplier = assignorConfiguration::taskAssignor;
assignmentListener = assignorConfiguration.assignmentListener();
+ uniqueField = usedSubscriptionMetadataVersion >= 8 ? new byte[1] : new
byte[0];
Review comment:
Also, now that I think about it, the `usedSubscriptionMetadataVersion`
will only ever be >= 8 at this point. It might be set to something lower at
some point later on, but it has to be at least 8 when the assignor is just
being created/configured
##########
File path:
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
##########
@@ -313,7 +323,8 @@ public void
shouldReturnTaskOffsetSumsMapForDecodedSubscription() {
new SubscriptionInfo(MIN_VERSION_OFFSET_SUM_SUBSCRIPTION,
LATEST_SUPPORTED_VERSION, UUID_1,
"localhost:80",
- TASK_OFFSET_SUMS)
+ TASK_OFFSET_SUMS,
+ IGNORED_UNIQUE_FIELD)
.encode());
assertThat(info.taskOffsetSums(), is(TASK_OFFSET_SUMS));
}
Review comment:
I think we should also add a test to make sure that if you pass in a
`uniqueField` to the SubscriptionInfo but the `usedVersion` is less than 8,
that it does not actually encode this field.
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -212,6 +213,7 @@ public void configure(final Map<String, ?> configs) {
rebalanceProtocol = assignorConfiguration.rebalanceProtocol();
taskAssignorSupplier = assignorConfiguration::taskAssignor;
assignmentListener = assignorConfiguration.assignmentListener();
+ uniqueField = usedSubscriptionMetadataVersion >= 8 ? new byte[1] : new
byte[0];
Review comment:
Sounds good. I think it's also fine to just initialize it to `new
byte[1]` regardless of whether the version is high enough to actually need it
or not. It's just a single byte, and simpler is better
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]