Copilot commented on code in PR #7468: URL: https://github.com/apache/ignite-3/pull/7468#discussion_r2721095558
########## modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SharedStateMessageConverter.java: ########## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.sql.engine.exec; + +import it.unimi.dsi.fastutil.longs.Long2ObjectMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Map; +import java.util.UUID; +import org.apache.calcite.avatica.util.ByteString; +import org.apache.ignite.internal.network.NetworkMessage; +import org.apache.ignite.internal.sql.engine.message.SharedStateMessage; +import org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup; +import org.apache.ignite.internal.sql.engine.message.SqlQueryMessagesFactory; +import org.apache.ignite.internal.sql.engine.message.field.SingleValueMessage; +import org.apache.ignite.internal.util.IgniteUtils; +import org.jetbrains.annotations.Contract; +import org.jetbrains.annotations.Nullable; + +/** + * Converter between {@link SharedState} and {@link SharedStateMessage}. + */ +public class SharedStateMessageConverter { + /** Message factory. */ + private static final SqlQueryMessagesFactory MESSAGE_FACTORY = new SqlQueryMessagesFactory(); + + @Contract("null -> null; !null -> !null") + static @Nullable SharedStateMessage toMessage(@Nullable SharedState state) { + if (state == null) { + return null; + } + + Long2ObjectMap<Object> correlations = state.correlations(); + Map<Long, NetworkMessage> result = IgniteUtils.newHashMap(correlations.size()); + + for (Long2ObjectMap.Entry<Object> entry : correlations.long2ObjectEntrySet()) { + SingleValueMessage<?> msg = toSingleFieldMessage(entry.getValue()); + + result.put(entry.getLongKey(), msg); + } + + return MESSAGE_FACTORY.sharedStateMessage() + .sharedState(result) + .build(); + } + + @Contract("null -> null; !null -> !null") + static @Nullable SharedState fromMessage(@Nullable SharedStateMessage sharedStateMessage) { + if (sharedStateMessage == null) { + return null; + } + + int size = sharedStateMessage.sharedState().size(); + Long2ObjectMap<Object> correlations = new Long2ObjectOpenHashMap<>(size); + + for (Map.Entry<Long, NetworkMessage> e : sharedStateMessage.sharedState().entrySet()) { + NetworkMessage networkMessage = e.getValue(); + + if (!(networkMessage instanceof SingleValueMessage)) { + throw new IllegalArgumentException("Unexpected message type " + + "[type=" + networkMessage.messageType() + ", class=" + networkMessage.getClass() + ']'); + } + + SingleValueMessage<Object> singleFieldMessage = ((SingleValueMessage<Object>) networkMessage); + + correlations.put(e.getKey().longValue(), extractFieldValue(singleFieldMessage)); + } + + return new SharedState(correlations); + } + + private static @Nullable Object extractFieldValue(SingleValueMessage<Object> msg) { + Object value = msg.field(); + + if (value == null) { + return null; + } + + switch (msg.messageType()) { + case SqlQueryMessageGroup.BYTE_ARRAY_FIELD_MESSAGE: + return new ByteString((byte[]) value); + + case SqlQueryMessageGroup.DECIMAL_FIELD_MESSAGE: + return decimalFromBytes((byte[]) value); + + default: + return value; + } + } + + private static BigDecimal decimalFromBytes(byte[] value) { + ByteBuffer buffer = ByteBuffer.wrap(value).order(ByteOrder.LITTLE_ENDIAN); + + short valScale = buffer.getShort(); + + BigInteger integer = new BigInteger(value, Short.BYTES, value.length - Short.BYTES); + + return new BigDecimal(integer, valScale); + } + + private static byte[] decimalToBytes(BigDecimal value) { + if (value.scale() > Short.MAX_VALUE || value.scale() < Short.MIN_VALUE) { + throw new UnsupportedOperationException("Decimal scale is out of range: " + value.scale()); + } + + byte[] unscaledBytes = value.unscaledValue().toByteArray(); + + ByteBuffer buffer = ByteBuffer.allocate(Short.BYTES + unscaledBytes.length) + .order(ByteOrder.LITTLE_ENDIAN); + + buffer.putShort((short) value.scale()); + buffer.put(unscaledBytes); + + return buffer.array(); + } + + private static SingleValueMessage<?> toSingleFieldMessage(Object value) { + if (value == null) { + return MESSAGE_FACTORY.nullValueMessage().build(); + } + + if (value instanceof Boolean) { + return MESSAGE_FACTORY.booleanValueMessage().field((Boolean) value).build(); + } + if (value instanceof Byte) { + return MESSAGE_FACTORY.byteValueMessage().field((Byte) value).build(); + } + if (value instanceof Short) { + return MESSAGE_FACTORY.shortValueMessage().field((Short) value).build(); + } + if (value instanceof Integer) { + return MESSAGE_FACTORY.intValueMessage().field((Integer) value).build(); + } + if (value instanceof Long) { + return MESSAGE_FACTORY.longValueMessage().field((Long) value).build(); + } + if (value instanceof Float) { + return MESSAGE_FACTORY.floatValueMessage().field((Float) value).build(); + } + if (value instanceof Double) { + return MESSAGE_FACTORY.doublValueMessage().field((Double) value).build(); Review Comment: The factory method name is misspelled. It should be `doubleValueMessage()` instead of `doublValueMessage()` (missing an 'e'). ```suggestion return MESSAGE_FACTORY.doubleValueMessage().field((Double) value).build(); ``` ########## modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/SqlQueryMessageGroup.java: ########## @@ -20,6 +20,18 @@ import static org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup.GROUP_TYPE; import org.apache.ignite.internal.network.annotations.MessageGroup; +import org.apache.ignite.internal.sql.engine.message.field.BooleanValueMessage; +import org.apache.ignite.internal.sql.engine.message.field.ByteArrayValueMessage; +import org.apache.ignite.internal.sql.engine.message.field.ByteValueMessage; +import org.apache.ignite.internal.sql.engine.message.field.DecimalValueMessage; +import org.apache.ignite.internal.sql.engine.message.field.DoublValueMessage; Review Comment: The class name is misspelled. It should be `DoubleValueMessage` instead of `DoublValueMessage` (missing an 'e'). ########## modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/SharedStateMessageConverterTest.java: ########## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.sql.engine.exec; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDate; +import java.time.Period; +import java.util.EnumSet; +import java.util.List; +import java.util.Random; +import java.util.Set; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.schema.SchemaTestUtils; +import org.apache.ignite.internal.sql.engine.message.SharedStateMessage; +import org.apache.ignite.internal.sql.engine.util.TypeUtils; +import org.apache.ignite.internal.type.NativeType; +import org.apache.ignite.sql.ColumnType; +import org.hamcrest.Matchers; +import org.jetbrains.annotations.Nullable; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +/** + * Tests for class {@link SharedStateMessageConverter}. + */ +public class SharedStateMessageConverterTest { + private Random rnd; + + /** + * Initialization. + */ + @BeforeEach + public void initRandom() { + long seed = System.currentTimeMillis(); + + Loggers.forClass(SharedStateMessageConverterTest.class).info("Using seed: " + seed + "L; //"); + + rnd = new Random(seed); + } + + @ParameterizedTest + @MethodSource("allTypes") + void singleValue(NativeType type) { + Object value = SchemaTestUtils.generateRandomValue(rnd, type); + + SharedState state = new SharedState(); + state.correlatedVariable(1, 1, TypeUtils.toInternal(value, type.spec())); + + SharedState converted = doConversions(state); + + assertThat(converted.correlations(), equalTo(state.correlations())); + } + + @Test + void multipleValuesWithAllTypes() { + SharedState state = new SharedState(); + + Set<ColumnType> simpleColumnTypes = EnumSet.complementOf(EnumSet.of(ColumnType.STRUCT, ColumnType.NULL)); + + int fieldIdx = 0; + + for (ColumnType typeSpec : simpleColumnTypes) { + Object value; + + if (typeSpec == ColumnType.PERIOD) { + value = Period.between(LocalDate.of(1, 1, 1), LocalDate.now()); + } else if (typeSpec == ColumnType.DURATION) { + value = Duration.ofMillis(Instant.now().toEpochMilli()); + } else { + value = SchemaTestUtils.generateRandomValue(rnd, SchemaTestUtils.specToType(typeSpec)); + } + + state.correlatedVariable(typeSpec.ordinal(), fieldIdx++, TypeUtils.toInternal(value, typeSpec)); + } + + SharedState converted = doConversions(state); + + assertThat(converted.correlations(), equalTo(state.correlations())); + } + + @Test + void nullValue() { + SharedState state = new SharedState(); + state.correlatedVariable(1, 1, null); + + SharedState converted = doConversions(state); + + assertThat(converted.correlations(), equalTo(state.correlations())); + } + + @Test + void nullState() { + assertThat(doConversions(null), is(Matchers.nullValue())); + } Review Comment: Consider adding a test case for an empty SharedState (non-null state with no correlations) to ensure the converter handles this scenario correctly. This would complement the existing nullState test. ########## modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/DoublValueMessage.java: ########## @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.sql.engine.message.field; + +import org.apache.ignite.internal.network.annotations.Transferable; +import org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup; + +/** + * A message that contains a single {@link Double} field. + */ +@Transferable(SqlQueryMessageGroup.DOUBLE_FIELD_MESSAGE) +public interface DoublValueMessage extends SingleValueMessage<Double> { Review Comment: The class name is misspelled. It should be `DoubleValueMessage` instead of `DoublValueMessage` (missing an 'e'). This spelling error affects the interface name, its usage in imports, and the factory method name. ```suggestion public interface DoubleValueMessage extends SingleValueMessage<Double> { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
