mjsax commented on a change in pull request #11945:
URL: https://github.com/apache/kafka/pull/11945#discussion_r839183920
##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
##########
@@ -60,6 +63,42 @@ public void setIfUnset(final SerdeGetter getter) {
         }
     }
 
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        this.upgradeFromV0 = upgradeFromV0(configs);
+    }
+
+    private static boolean upgradeFromV0(final Map<String, ?> configs) {
+        final Object upgradeFrom = configs.get(StreamsConfig.UPGRADE_FROM_CONFIG);
+        if (!(upgradeFrom instanceof String)) {

Review comment:
As above.

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -636,7 +684,10 @@
         "Accepted values are \"" + UPGRADE_FROM_0100 + "\", \"" + UPGRADE_FROM_0101 + "\", \"" +
         UPGRADE_FROM_0102 + "\", \"" + UPGRADE_FROM_0110 + "\", \"" + UPGRADE_FROM_10 + "\", \"" +
         UPGRADE_FROM_11 + "\", \"" + UPGRADE_FROM_20 + "\", \"" + UPGRADE_FROM_21 + "\", \"" +
-        UPGRADE_FROM_22 + "\", \"" + UPGRADE_FROM_23 + "\" (for upgrading from the corresponding old version).";

Review comment:
> When upgrading from 2.4 to a newer version it is not required to specify this config.

Seems this needs an update?

##########
File path: docs/streams/upgrade-guide.html
##########
@@ -34,9 +34,9 @@ <h1>Upgrade Guide and API Changes</h1>
     </div>
 
     <p>
-        Upgrading from any older version to {{fullDotVersion}} is possible: if upgrading from 2.3 or below, you will need to do two rolling bounces, where during the first rolling bounce phase you set the config <code>upgrade.from="older version"</code>
-        (possible values are <code>"0.10.0" - "2.3"</code>) and during the second you remove it. This is required to safely upgrade to the new cooperative rebalancing protocol of the embedded consumer. Note that you will remain using the old eager
-        rebalancing protocol if you skip or delay the second rolling bounce, but you can safely switch over to cooperative at any time once the entire group is on 2.4+ by removing the config value and bouncing. For more details please refer to
+        Upgrading from any older version to {{fullDotVersion}} is possible: if upgrading from 3.3 or below, you will need to do two rolling bounces, where during the first rolling bounce phase you set the config <code>upgrade.from="older version"</code>

Review comment:
> upgrading from 3.3

Should be `3.2` -- the fix should go into 3.3, right?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
##########
@@ -92,6 +103,32 @@ public void setIfUnset(final SerdeGetter getter) {
             return buf.array();
         }
 
+        private byte[] serializeV1(final String topic, final SubscriptionResponseWrapper<V> data) {
+            final byte[] serializedData = data.getForeignValue() == null ? null : serializer.serialize(topic, data.getForeignValue());
+            final int serializedDataLength = serializedData == null ? 0 : serializedData.length;
+            final long[] originalHash = data.getOriginalValueHash();
+            final int hashLength = originalHash == null ? 0 : 2 * Long.BYTES;
+            final int primaryPartitionLength = Integer.BYTES;
+            final int dataLength = 1 + hashLength + serializedDataLength + primaryPartitionLength;
+
+            final ByteBuffer buf = ByteBuffer.allocate(dataLength);
+
+            if (originalHash != null) {
+                buf.put(data.getVersion());
+            } else {
+                buf.put((byte) (data.getVersion() | (byte) 0x80));
+            }
+            buf.putInt(data.getPrimaryPartition());

Review comment:
> This means that we can't just put data.getVersion() into the buffer, but have to hardcode it to 0

Yes, but this seems ok?
I.e., we would have a `serializeV0Internal(final short version)` that is called as (something like):
```
public byte[] serializeV0() {
    return serializeV0Internal(0).getBytes();
}

public byte[] serializeV1() {
    final ByteStream out = serializeV0Internal(0);
    // add v1 stuff to `out`
    return out.getBytes();
}
```

> Making this logic generic enough to handle v0, upgrade-from, and v1 will be cumbersome.

Why? I understand that putting variable length at the end could be simpler. I guess I am just used to how it's usually done in Kafka. And personally, when I read code, it's easier to keep a mental model of the different versions in the bytes if older versions are always a prefix of newer versions.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
##########
@@ -62,6 +71,42 @@ public void setIfUnset(final SerdeGetter getter) {
         }
     }
 
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        this.upgradeFromV0 = upgradeFromV0(configs);
+    }
+
+    private static boolean upgradeFromV0(final Map<String, ?> configs) {
+        final Object upgradeFrom = configs.get(StreamsConfig.UPGRADE_FROM_CONFIG);
+        if (!(upgradeFrom instanceof String)) {

Review comment:
We don't need this check. `StreamsConfig` ensures that it will be a `String`.

Cf `StreamsConfig`:
```
.define(UPGRADE_FROM_CONFIG,
        ConfigDef.Type.STRING,
```
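
To make the "older versions are a prefix of newer versions" suggestion from the `serializeV1` comment a bit more concrete, here is a rough, self-contained sketch. It is not the PR's code: the class name `PrefixLayoutSketch`, the helper `writeCommon`, and `readPrimaryPartition` are made up for illustration, and it assumes the v1 path writes version `1` into the first byte (the snippet in the comment passes `0` in both calls). The only thing it is meant to show is that v1 can reuse the v0 writer and append its new field at the end.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration only; names and layout are assumptions, not the PR's implementation.
final class PrefixLayoutSketch {
    // High bit of the first byte marks "no hash present", mirroring the 0x80 flag in the diff.
    private static final byte NULL_HASH_FLAG = (byte) 0x80;

    // Writes the fields shared by both versions: [version|flag][hash (2 longs)?][value?].
    // `extraCapacity` reserves room for fields that a newer version appends afterwards.
    private static ByteBuffer writeCommon(final byte version,
                                          final long[] hash,
                                          final byte[] value,
                                          final int extraCapacity) {
        final int hashLength = hash == null ? 0 : 2 * Long.BYTES;
        final int valueLength = value == null ? 0 : value.length;
        final ByteBuffer buf = ByteBuffer.allocate(1 + hashLength + valueLength + extraCapacity);
        buf.put(hash == null ? (byte) (version | NULL_HASH_FLAG) : version);
        if (hash != null) {
            buf.putLong(hash[0]);
            buf.putLong(hash[1]);
        }
        if (value != null) {
            buf.put(value);
        }
        return buf;
    }

    static byte[] serializeV0(final long[] hash, final byte[] value) {
        return writeCommon((byte) 0, hash, value, 0).array();
    }

    static byte[] serializeV1(final long[] hash, final byte[] value, final int primaryPartition) {
        // Same layout as v0, then the v1-only field appended at the end.
        final ByteBuffer buf = writeCommon((byte) 1, hash, value, Integer.BYTES);
        buf.putInt(primaryPartition);
        return buf.array();
    }

    // The appended field has a fixed size, so a v1 reader can find it from the end
    // and treat everything between the header/hash and the last 4 bytes as the value.
    static int readPrimaryPartition(final byte[] v1Bytes) {
        return ByteBuffer.wrap(v1Bytes).getInt(v1Bytes.length - Integer.BYTES);
    }
}
```

With this shape, everything after the first byte of a v0 payload reappears unchanged in the v1 payload, and the upgrade-from path would simply call `serializeV0` regardless of the wrapper's own version.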