mjsax commented on a change in pull request #11945:
URL: https://github.com/apache/kafka/pull/11945#discussion_r839183920



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
##########
@@ -60,6 +63,42 @@ public void setIfUnset(final SerdeGetter getter) {
             }
         }
 
+        @Override
+        public void configure(final Map<String, ?> configs, final boolean 
isKey) {
+            this.upgradeFromV0 = upgradeFromV0(configs);
+        }
+
+        private static boolean upgradeFromV0(final Map<String, ?> configs) {
+            final Object upgradeFrom = 
configs.get(StreamsConfig.UPGRADE_FROM_CONFIG);
+            if (!(upgradeFrom instanceof String)) {

Review comment:
       As above.

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -636,7 +684,10 @@
         "Accepted values are \"" + UPGRADE_FROM_0100 + "\", \"" + 
UPGRADE_FROM_0101 + "\", \"" +
         UPGRADE_FROM_0102 + "\", \"" + UPGRADE_FROM_0110 + "\", \"" + 
UPGRADE_FROM_10 + "\", \"" +
         UPGRADE_FROM_11 + "\", \"" + UPGRADE_FROM_20 + "\", \"" + 
UPGRADE_FROM_21 + "\", \"" +
-        UPGRADE_FROM_22 + "\", \"" + UPGRADE_FROM_23 + "\" (for upgrading from 
the corresponding old version).";

Review comment:
       > When upgrading from 2.4 to a newer version it is not required to 
specify this config.
   
   Seems this needs an update?

##########
File path: docs/streams/upgrade-guide.html
##########
@@ -34,9 +34,9 @@ <h1>Upgrade Guide and API Changes</h1>
     </div>
 
     <p>
-        Upgrading from any older version to {{fullDotVersion}} is possible: if 
upgrading from 2.3 or below, you will need to do two rolling bounces, where 
during the first rolling bounce phase you set the config 
<code>upgrade.from="older version"</code>
-        (possible values are <code>"0.10.0" - "2.3"</code>) and during the 
second you remove it. This is required to safely upgrade to the new cooperative 
rebalancing protocol of the embedded consumer. Note that you will remain using 
the old eager
-        rebalancing protocol if you skip or delay the second rolling bounce, 
but you can safely switch over to cooperative at any time once the entire group 
is on 2.4+ by removing the config value and bouncing. For more details please 
refer to
+        Upgrading from any older version to {{fullDotVersion}} is possible: if 
upgrading from 3.3 or below, you will need to do two rolling bounces, where 
during the first rolling bounce phase you set the config 
<code>upgrade.from="older version"</code>

Review comment:
       > upgrading from 3.3
   
   Should be `3.2` -- the fix should go into 3.3, right?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
##########
@@ -92,6 +103,32 @@ public void setIfUnset(final SerdeGetter getter) {
             return buf.array();
         }
 
+        private byte[] serializeV1(final String topic, final 
SubscriptionResponseWrapper<V> data) {
+            final byte[] serializedData = data.getForeignValue() == null ? 
null : serializer.serialize(topic, data.getForeignValue());
+            final int serializedDataLength = serializedData == null ? 0 : 
serializedData.length;
+            final long[] originalHash = data.getOriginalValueHash();
+            final int hashLength = originalHash == null ? 0 : 2 * Long.BYTES;
+            final int primaryPartitionLength = Integer.BYTES;
+            final int dataLength = 1 + hashLength + serializedDataLength + 
primaryPartitionLength;
+
+            final ByteBuffer buf = ByteBuffer.allocate(dataLength);
+
+            if (originalHash != null) {
+                buf.put(data.getVersion());
+            } else {
+                buf.put((byte) (data.getVersion() | (byte) 0x80));
+            }
+            buf.putInt(data.getPrimaryPartition());

Review comment:
       >  This means that we can't just put data.getVersion() into the buffer, 
but have to hardcode it to 0
   
   Yes, but this seems ok? Ie, we would have a `serializeV0Internal(final short 
version)` that is called as (something like):
   ```
   public byte[] serializeV0() {
     return serializeV0Internal(0).getBytes();
   }
   
   public byte[] serializeV1() {
     final ByteStream out = serializeV0Internal(0);
     
     // add v1 stuff to `out`
     
     return out.getBytes();
   }
   ```
   
   > Making this logic generic enough to handle v0, upgrade-from, and v1 will 
be cumbersome.
   
   Why?
   
   I understand that putting variable length at the end could be simpler. I 
guess I am just used to how it's usually done if Kafka. And personally, if I 
read code it's easier to have a mental model of the different versions in the 
bytes, if older versions are always a prefix of newer versions?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
##########
@@ -62,6 +71,42 @@ public void setIfUnset(final SerdeGetter getter) {
             }
         }
 
+        @Override
+        public void configure(final Map<String, ?> configs, final boolean 
isKey) {
+            this.upgradeFromV0 = upgradeFromV0(configs);
+        }
+
+        private static boolean upgradeFromV0(final Map<String, ?> configs) {
+            final Object upgradeFrom = 
configs.get(StreamsConfig.UPGRADE_FROM_CONFIG);
+            if (!(upgradeFrom instanceof String)) {

Review comment:
       We don't need this check. `StreamsConfig` ensures that it will be a 
`String`. Cf `StreamConfig`:
   ```
   .define(UPGRADE_FROM_CONFIG,
           ConfigDef.Type.STRING,
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to