Copilot commented on code in PR #7468:
URL: https://github.com/apache/ignite-3/pull/7468#discussion_r2720972193


##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SharedState.java:
##########
@@ -17,41 +17,58 @@
 
 package org.apache.ignite.internal.sql.engine.exec;
 
-import static org.apache.ignite.internal.sql.engine.util.Commons.checkRange;
-
-import java.io.Serializable;
-import org.apache.ignite.internal.sql.engine.util.Commons;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMaps;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * This class represents the volatile state that may be propagated from parent 
to its children
  * during rewind.
  */
-public class SharedState implements Serializable {
-    private static final long serialVersionUID = 42L;
+public class SharedState {
+    private final Long2ObjectMap<Object> correlations;
 
-    private Object[] correlations = new Object[16];
+    public SharedState() {
+        this(new Long2ObjectOpenHashMap<>());
+    }
+
+    SharedState(Long2ObjectMap<Object> correlations) {
+        this.correlations = correlations;
+    }
 
     /**
      * Gets correlated value.
      *
-     * @param id Correlation ID.
+     * @param corrId Correlation ID.
      * @return Correlated value.
      */
-    public Object correlatedVariable(int id) {
-        checkRange(correlations, id);
+    public @Nullable Object correlatedVariable(int corrId, int fieldIndex) {
+        long key = packToLong(corrId, fieldIndex);
 
-        return correlations[id];
+        return correlations.get(key);
     }
 
     /**
      * Sets correlated value.
      *
-     * @param id Correlation ID.
+     * @param corrId Correlation ID.
+     * @param fieldIndex Field index.
      * @param value Correlated value.
      */
-    public void correlatedVariable(int id, Object value) {
-        correlations = Commons.ensureCapacity(correlations, id + 1);
+    public void correlatedVariable(int corrId, int fieldIndex, @Nullable 
Object value) {
+        long key = packToLong(corrId, fieldIndex);
+
+        correlations.put(key, value);
+    }
+
+    Long2ObjectMap<Object> correlations() {
+        return Long2ObjectMaps.unmodifiable(correlations);
+    }
+
+    private static long packToLong(int corrId, int fieldIdx) {
+        assert fieldIdx >= 0 : "fieldIdx=" + fieldIdx;
 
-        correlations[id] = value;
+        return (((corrId & 0xFFFF_FFFFL) << 32 | fieldIdx));

Review Comment:
   The expression in the packToLong method is correct but harder to read than 
it needs to be. The mask `0xFFFF_FFFFL` is applied to `corrId` before shifting 
left by 32 bits. Since `corrId` is already an int (32 bits), the mask does not 
discard any meaningful bits; its only real effect is to promote the operand to 
`long` so that the shift is performed on 64 bits. In addition, the doubled 
outer parentheses do not group anything: `(((corrId & 0xFFFF_FFFFL) << 32 | 
fieldIdx))` is equivalent to `((corrId & 0xFFFF_FFFFL) << 32) | fieldIdx` 
because `<<` binds tighter than `|`. While this works correctly for the 
non-negative `fieldIdx` values guaranteed by the assertion, a negative 
`fieldIdx` would be sign-extended and overwrite the upper 32 bits when 
assertions are disabled. For clarity and robustness, consider making the 
widening explicit and masking the low half instead: 
`((long) corrId << 32) | (fieldIdx & 0xFFFF_FFFFL)`.
   ```suggestion
           return ((long) corrId << 32) | (fieldIdx & 0xFFFF_FFFFL);
   ```



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SharedStateMessageConverter.java:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec;
+
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.calcite.avatica.util.ByteString;
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.sql.engine.message.SharedStateMessage;
+import org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup;
+import org.apache.ignite.internal.sql.engine.message.SqlQueryMessagesFactory;
+import org.apache.ignite.internal.sql.engine.message.field.SingleFieldMessage;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.jetbrains.annotations.Contract;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Converter between {@link SharedState} and {@link SharedStateMessage}.
+ */
+public class SharedStateMessageConverter {
+    /** Message factory. */
+    private static final SqlQueryMessagesFactory MESSAGE_FACTORY = new 
SqlQueryMessagesFactory();
+
+    @Contract("null -> null; !null -> !null")
+    static @Nullable SharedStateMessage toMessage(@Nullable SharedState state) 
{
+        if (state == null) {
+            return null;
+        }
+
+        Long2ObjectMap<Object> correlations = state.correlations();
+        Map<Long, NetworkMessage> result = 
IgniteUtils.newHashMap(correlations.size());
+
+        for (Long2ObjectMap.Entry<Object> entry : 
correlations.long2ObjectEntrySet()) {
+            SingleFieldMessage<?> msg = toSingleFieldMessage(entry.getValue());
+
+            result.put(entry.getLongKey(), msg);
+        }
+
+        return MESSAGE_FACTORY.sharedStateMessage()
+                .sharedState(result)
+                .build();
+    }
+
+    @Contract("null -> null; !null -> !null")
+    static @Nullable SharedState fromMessage(@Nullable SharedStateMessage 
sharedStateMessage) {
+        if (sharedStateMessage == null) {
+            return null;
+        }
+
+        int size = sharedStateMessage.sharedState().size();
+        Long2ObjectMap<Object> correlations = new 
Long2ObjectOpenHashMap<>(size);
+
+        for (Map.Entry<Long, NetworkMessage> e : 
sharedStateMessage.sharedState().entrySet()) {
+            NetworkMessage networkMessage = e.getValue();
+
+            if (!(networkMessage instanceof SingleFieldMessage)) {
+                throw new IllegalArgumentException("Unexpected message type "
+                        + "[type=" + networkMessage.messageType() + ", class=" 
+ networkMessage.getClass());

Review Comment:
   The error message is missing a closing bracket. It should end with `"]"` to 
match the opening bracket in the format string. The current message produces 
output like `"Unexpected message type [type=..., class=..."` instead of 
`"Unexpected message type [type=..., class=...]"`.
   ```suggestion
                           + "[type=" + networkMessage.messageType() + ", 
class=" + networkMessage.getClass() + ']');
   ```



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SharedState.java:
##########
@@ -17,41 +17,58 @@
 
 package org.apache.ignite.internal.sql.engine.exec;
 
-import static org.apache.ignite.internal.sql.engine.util.Commons.checkRange;
-
-import java.io.Serializable;
-import org.apache.ignite.internal.sql.engine.util.Commons;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMaps;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * This class represents the volatile state that may be propagated from parent 
to its children
  * during rewind.
  */
-public class SharedState implements Serializable {
-    private static final long serialVersionUID = 42L;
+public class SharedState {
+    private final Long2ObjectMap<Object> correlations;
 
-    private Object[] correlations = new Object[16];
+    public SharedState() {
+        this(new Long2ObjectOpenHashMap<>());
+    }
+
+    SharedState(Long2ObjectMap<Object> correlations) {
+        this.correlations = correlations;
+    }
 
     /**
      * Gets correlated value.
      *
-     * @param id Correlation ID.
+     * @param corrId Correlation ID.
      * @return Correlated value.
      */
-    public Object correlatedVariable(int id) {
-        checkRange(correlations, id);
+    public @Nullable Object correlatedVariable(int corrId, int fieldIndex) {
+        long key = packToLong(corrId, fieldIndex);
 
-        return correlations[id];
+        return correlations.get(key);
     }

Review Comment:
   The Javadoc is missing the `@param fieldIndex` documentation for the 
parameter that was added to this method. The return description should also be 
updated to clarify that it returns a specific field value from a correlated 
variable, not just a "correlated value".



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SharedStateMessageConverter.java:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec;
+
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.calcite.avatica.util.ByteString;
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.sql.engine.message.SharedStateMessage;
+import org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup;
+import org.apache.ignite.internal.sql.engine.message.SqlQueryMessagesFactory;
+import org.apache.ignite.internal.sql.engine.message.field.SingleFieldMessage;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.jetbrains.annotations.Contract;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Converter between {@link SharedState} and {@link SharedStateMessage}.
+ */
+public class SharedStateMessageConverter {
+    /** Message factory. */
+    private static final SqlQueryMessagesFactory MESSAGE_FACTORY = new 
SqlQueryMessagesFactory();
+
+    @Contract("null -> null; !null -> !null")
+    static @Nullable SharedStateMessage toMessage(@Nullable SharedState state) 
{
+        if (state == null) {
+            return null;
+        }
+
+        Long2ObjectMap<Object> correlations = state.correlations();
+        Map<Long, NetworkMessage> result = 
IgniteUtils.newHashMap(correlations.size());
+
+        for (Long2ObjectMap.Entry<Object> entry : 
correlations.long2ObjectEntrySet()) {
+            SingleFieldMessage<?> msg = toSingleFieldMessage(entry.getValue());
+
+            result.put(entry.getLongKey(), msg);
+        }
+
+        return MESSAGE_FACTORY.sharedStateMessage()
+                .sharedState(result)
+                .build();
+    }
+
+    @Contract("null -> null; !null -> !null")
+    static @Nullable SharedState fromMessage(@Nullable SharedStateMessage 
sharedStateMessage) {
+        if (sharedStateMessage == null) {
+            return null;
+        }
+
+        int size = sharedStateMessage.sharedState().size();
+        Long2ObjectMap<Object> correlations = new 
Long2ObjectOpenHashMap<>(size);
+
+        for (Map.Entry<Long, NetworkMessage> e : 
sharedStateMessage.sharedState().entrySet()) {
+            NetworkMessage networkMessage = e.getValue();
+
+            if (!(networkMessage instanceof SingleFieldMessage)) {
+                throw new IllegalArgumentException("Unexpected message type "
+                        + "[type=" + networkMessage.messageType() + ", class=" 
+ networkMessage.getClass());
+            }
+
+            SingleFieldMessage<Object> msg = ((SingleFieldMessage<Object>) 
e.getValue());

Review Comment:
   The cast at line 82 could use the already checked `networkMessage` variable 
instead of calling `e.getValue()` again. This would make the code clearer and 
avoid the unnecessary duplicate lookup; note that the cast to 
`SingleFieldMessage<Object>` itself remains unchecked. Change line 82 to: 
`SingleFieldMessage<Object> msg = (SingleFieldMessage<Object>) networkMessage;`
   ```suggestion
               SingleFieldMessage<Object> msg = (SingleFieldMessage<Object>) 
networkMessage;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to