spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r610890414



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyAndJoinSide.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+
+import java.util.Objects;
+
+/**
+ * Combines a key from a {@link KeyValue} with a boolean value referencing if 
the key is
+ * part of the left join (true) or right join (false). This class is only 
useful when a state
+ * store needs to be shared between left and right processors, and each 
processor needs to
+ * access the key of the other processor.
+ */
+public class KeyAndJoinSide<K> {
+    private final K key;
+    private final boolean thisJoin;

Review comment:
       I changed it to leftJoin. But it seems you suggested adding two bool 
variables, one for left and another for right?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyAndJoinSide.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+
+import java.util.Objects;
+
+/**
+ * Combines a key from a {@link KeyValue} with a boolean value referencing if 
the key is
+ * part of the left join (true) or right join (false). This class is only 
useful when a state
+ * store needs to be shared between left and right processors, and each 
processor needs to
+ * access the key of the other processor.
+ */
+public class KeyAndJoinSide<K> {
+    private final K key;
+    private final boolean thisJoin;
+
+    private KeyAndJoinSide(final boolean thisJoin, final K key) {
+        this.key = Objects.requireNonNull(key, "key is null");
+        this.thisJoin = thisJoin;
+    }
+
+    /**
+     * Create a new {@link KeyAndJoinSide} instance if the provide {@code key} 
is not {@code null}.
+     *
+     * @param thisJoin True if the key is part of the left topic (reference as 
thisJoin in {@code KStreamImplJoin})
+     * @param key      the key
+     * @param <K>      the type of the key
+     * @return a new {@link KeyAndJoinSide} instance if the provide {@code 
key} is not {@code null}
+     */
+    public static <K> KeyAndJoinSide<K> make(final boolean thisJoin, final K 
key) {
+        return new KeyAndJoinSide<>(thisJoin, key);
+    }
+
+    public boolean isThisJoin() {
+        return thisJoin;
+    }
+
+    public K getKey() {
+        return key;
+    }
+
+    @Override
+    public String toString() {
+        return "<" + thisJoin + "," + key + ">";

Review comment:
       Done

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyAndJoinSideDeserializer.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Objects;
+
+public class KeyAndJoinSideDeserializer<K> implements 
Deserializer<KeyAndJoinSide<K>> {
+    private final Deserializer<K> keyDeserializer;
+
+    KeyAndJoinSideDeserializer(final Deserializer<K> keyDeserializer) {
+        this.keyDeserializer = Objects.requireNonNull(keyDeserializer, 
"keyDeserializer is null");
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        keyDeserializer.configure(configs, isKey);
+    }
+
+    @Override
+    public KeyAndJoinSide<K> deserialize(final String topic, final byte[] 
data) {
+        final boolean bool = data[0] == 1 ? true : false;
+        final K key = keyDeserializer.deserialize(topic, rawKey(data));
+
+        return KeyAndJoinSide.make(bool, key);
+    }
+
+    static byte[] rawKey(final byte[] data) {
+        final int rawValueLength = data.length - 1;
+
+        return ByteBuffer

Review comment:
       Done

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyAndJoinSideDeserializer.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Objects;
+
+public class KeyAndJoinSideDeserializer<K> implements 
Deserializer<KeyAndJoinSide<K>> {
+    private final Deserializer<K> keyDeserializer;
+
+    KeyAndJoinSideDeserializer(final Deserializer<K> keyDeserializer) {
+        this.keyDeserializer = Objects.requireNonNull(keyDeserializer, 
"keyDeserializer is null");
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        keyDeserializer.configure(configs, isKey);
+    }
+
+    @Override
+    public KeyAndJoinSide<K> deserialize(final String topic, final byte[] 
data) {
+        final boolean bool = data[0] == 1 ? true : false;
+        final K key = keyDeserializer.deserialize(topic, rawKey(data));
+
+        return KeyAndJoinSide.make(bool, key);
+    }
+
+    static byte[] rawKey(final byte[] data) {
+        final int rawValueLength = data.length - 1;

Review comment:
       Done

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/ValueOrOtherValue.java
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.Objects;
+
+/**
+ * This class is used in combination of {@link KeyAndJoinSide}. The {@link 
KeyAndJoinSide} class
+ * combines a key with a boolean value that specifies if the key is found in 
the left side of a
+ * join or on the right side. This {@link ValueOrOtherValue} object contains 
either the V1 value,
+ * which is found in the left topic, or V2 value if it is found in the right 
topic.
+ */
+public class ValueOrOtherValue<V1, V2> {

Review comment:
       Done

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/ValueOrOtherValue.java
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.Objects;
+
+/**
+ * This class is used in combination of {@link KeyAndJoinSide}. The {@link 
KeyAndJoinSide} class
+ * combines a key with a boolean value that specifies if the key is found in 
the left side of a
+ * join or on the right side. This {@link ValueOrOtherValue} object contains 
either the V1 value,
+ * which is found in the left topic, or V2 value if it is found in the right 
topic.
+ */
+public class ValueOrOtherValue<V1, V2> {
+    private final V1 thisValue;
+    private final V2 otherValue;
+
+    private ValueOrOtherValue(final V1 thisValue, final V2 otherValue) {
+        this.thisValue = thisValue;
+        this.otherValue = otherValue;
+    }
+
+    /**
+     * Create a new {@link ValueOrOtherValue} instance with the V1 value as 
{@code thisValue} and
+     * V2 value as null.
+     *
+     * @param thisValue the V1 value
+     * @param <V1>      the type of the value
+     * @return a new {@link ValueOrOtherValue} instance
+     */
+    public static <V1, V2> ValueOrOtherValue<V1, V2> makeValue(final V1 
thisValue) {
+        return new ValueOrOtherValue<>(thisValue, null);
+    }
+
+    /**
+     * Create a new {@link ValueOrOtherValue} instance with the V2 value as 
{@code otherValue} and
+     * V1 value as null.
+     *
+     * @param otherValue the V2 value
+     * @param <V2>       the type of the value
+     * @return a new {@link ValueOrOtherValue} instance
+     */
+    public static <V1, V2> ValueOrOtherValue<V1, V2> makeOtherValue(final V2 
otherValue) {
+        return new ValueOrOtherValue<>(null, otherValue);
+    }
+
+    public V1 getThisValue() {
+        return thisValue;
+    }
+
+    public V2 getOtherValue() {
+        return otherValue;
+    }
+
+    @Override
+    public String toString() {
+        return "<" + thisValue + "," + otherValue + ">";

Review comment:
       Done

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/ValueOrOtherValueDeserializer.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Objects;
+
+public class ValueOrOtherValueDeserializer<V1, V2> implements 
Deserializer<ValueOrOtherValue<V1, V2>> {
+    public final Deserializer<V1> thisDeserializer;
+    public final Deserializer<V2> otherDeserializer;
+
+    public ValueOrOtherValueDeserializer(final Deserializer<V1> 
thisDeserializer, final Deserializer<V2> otherDeserializer) {
+        this.thisDeserializer = Objects.requireNonNull(thisDeserializer);
+        this.otherDeserializer = Objects.requireNonNull(otherDeserializer);
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs,
+                          final boolean isKey) {
+        thisDeserializer.configure(configs, isKey);
+        otherDeserializer.configure(configs, isKey);
+    }
+
+    @Override
+    public ValueOrOtherValue<V1, V2> deserialize(final String topic, final 
byte[] joinedValues) {
+        if (joinedValues == null || joinedValues.length == 0) {
+            return null;
+        }
+
+        final boolean thisJoin = joinedValues[0] == 1 ? true : false;
+        return thisJoin
+            ? ValueOrOtherValue.makeValue(thisDeserializer.deserialize(topic, 
rawValue(joinedValues)))
+            : 
ValueOrOtherValue.makeOtherValue(otherDeserializer.deserialize(topic, 
rawValue(joinedValues)));
+    }
+
+    static byte[] rawValue(final byte[] joinedValues) {
+        final int rawValueLength = joinedValues.length - 1;
+
+        return ByteBuffer

Review comment:
       Done

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/ValueOrOtherValue.java
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.Objects;
+
+/**
+ * This class is used in combination of {@link KeyAndJoinSide}. The {@link 
KeyAndJoinSide} class
+ * combines a key with a boolean value that specifies if the key is found in 
the left side of a
+ * join or on the right side. This {@link ValueOrOtherValue} object contains 
either the V1 value,
+ * which is found in the left topic, or V2 value if it is found in the right 
topic.
+ */
+public class ValueOrOtherValue<V1, V2> {
+    private final V1 thisValue;
+    private final V2 otherValue;
+
+    private ValueOrOtherValue(final V1 thisValue, final V2 otherValue) {
+        this.thisValue = thisValue;

Review comment:
       Done

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/ValueOrOtherValueSerializer.java
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serializer;
+
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+/**
+ * Serializes a {@link ValueOrOtherValue}. The serialized bytes starts with a 
byte that references
+ * to whether the value is V1 or V2.
+ */
+public class ValueOrOtherValueSerializer<V1, V2> implements 
Serializer<ValueOrOtherValue<V1, V2>> {
+    private final Serializer<V1> thisSerializer;
+    private final Serializer<V2> otherSerializer;
+
+    public ValueOrOtherValueSerializer(final Serializer<V1> thisSerializer, 
final Serializer<V2> otherSerializer) {
+        this.thisSerializer = Objects.requireNonNull(thisSerializer);
+        this.otherSerializer = Objects.requireNonNull(otherSerializer);
+    }
+
+    @Override
+    public byte[] serialize(final String topic, final ValueOrOtherValue<V1, 
V2> data) {
+        if (data == null) {
+            return null;
+        }
+
+        final byte[] rawThisValue = (data.getThisValue() != null) ? 
thisSerializer.serialize(topic, data.getThisValue()) : null;

Review comment:
       Done

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -118,20 +132,40 @@
         final ProcessorGraphNode<K1, V2> otherWindowedStreamsNode = new 
ProcessorGraphNode<>(otherWindowStreamProcessorName, 
otherWindowStreamProcessorParams);
         builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode);
 
+        Optional<StoreBuilder<WindowStore<KeyAndJoinSide<K1>, 
ValueOrOtherValue<V1, V2>>>> outerJoinWindowStore = Optional.empty();
+        if (leftOuter || rightOuter) {
+            final String outerJoinSuffix = "-shared-outer-join-store";
+            final String outerJoinStoreGeneratedName = 
builder.newProcessorName(KStreamImpl.OUTERSHARED_NAME);

Review comment:
       Done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to