korlov42 commented on code in PR #7471:
URL: https://github.com/apache/ignite-3/pull/7471#discussion_r2736454619
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SharedState.java:
##########
@@ -17,41 +17,57 @@
package org.apache.ignite.internal.sql.engine.exec;
-import static org.apache.ignite.internal.sql.engine.util.Commons.checkRange;
-
-import java.io.Serializable;
-import org.apache.ignite.internal.sql.engine.util.Commons;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMaps;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import org.jetbrains.annotations.Nullable;
/**
* This class represents the volatile state that may be propagated from parent
to its children
* during rewind.
*/
-public class SharedState implements Serializable {
- private static final long serialVersionUID = 42L;
+public class SharedState {
+ private final Long2ObjectMap<Object> correlations;
+
+ public SharedState() {
+ this(new Long2ObjectOpenHashMap<>());
+ }
- private Object[] correlations = new Object[16];
+ SharedState(Long2ObjectMap<Object> correlations) {
+ this.correlations = correlations;
+ }
/**
* Gets correlated value.
*
- * @param id Correlation ID.
+ * @param corrId Correlation ID.
+ * @param fieldIndex Field index.
* @return Correlated value.
*/
- public Object correlatedVariable(int id) {
- checkRange(correlations, id);
+ public @Nullable Object correlatedVariable(int corrId, int fieldIndex) {
+ long key = packToLong(corrId, fieldIndex);
- return correlations[id];
+ return correlations.get(key);
Review Comment:
I think, attempt to read non-existing correlate should result in exception,
otherwise some subtle bugs may sneak in, and this will be hard to spot and
debug
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SharedState.java:
##########
@@ -17,41 +17,57 @@
package org.apache.ignite.internal.sql.engine.exec;
-import static org.apache.ignite.internal.sql.engine.util.Commons.checkRange;
-
-import java.io.Serializable;
-import org.apache.ignite.internal.sql.engine.util.Commons;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMaps;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import org.jetbrains.annotations.Nullable;
/**
* This class represents the volatile state that may be propagated from parent
to its children
* during rewind.
*/
-public class SharedState implements Serializable {
- private static final long serialVersionUID = 42L;
+public class SharedState {
+ private final Long2ObjectMap<Object> correlations;
+
+ public SharedState() {
+ this(new Long2ObjectOpenHashMap<>());
+ }
- private Object[] correlations = new Object[16];
+ SharedState(Long2ObjectMap<Object> correlations) {
+ this.correlations = correlations;
+ }
/**
* Gets correlated value.
*
- * @param id Correlation ID.
+ * @param corrId Correlation ID.
+ * @param fieldIndex Field index.
* @return Correlated value.
*/
- public Object correlatedVariable(int id) {
- checkRange(correlations, id);
+ public @Nullable Object correlatedVariable(int corrId, int fieldIndex) {
Review Comment:
I would just changed `correlatedVariable(int id)` to
`correlatedVariable(long id)`. Before, it was single integer identifier. It was
responsibility of a caller to 1) choose unique identifier and 2) make sure both
sides (writer and reader) use the same identifier. This patch doesn't change
this concept much, the only difference is now correlation variable is a scalar
rather an entire row.
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/CorrelatedNestedLoopJoinNode.java:
##########
@@ -485,7 +491,14 @@ private Node<RowT> rightSource() {
private void prepareCorrelations() {
for (int i = 0; i < correlationIds.size(); i++) {
RowT row = i < leftInBuf.size() ? leftInBuf.get(i) :
first(leftInBuf);
- context().correlatedVariable(row, correlationIds.get(i).getId());
+ int corrId = correlationIds.get(i).getId();
Review Comment:
we need to cover this with execution test. I see there are changes in
`CorrelatedNestedLoopJoinExecutionTest`, but, unfortunately, this test verify
whatever but CNLJ. The core of CNLJ execution is correlation setting and right
input rewinding.
Now we also whole cover this trimming
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/expressions/SqlExpressionFactoryAdapter.java:
##########
@@ -303,7 +303,7 @@ public RowFactoryFactory<RowT> rowFactoryFactory() {
}
@Override
- public @Nullable RowT correlatedVariable(int id) {
+ public @Nullable Object correlatedVariable(int corrId, int fieldIndex)
{
return null;
Review Comment:
let's also throw exception here (this is not related to your patch)
##########
modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/SharedStateMessageConverterTest.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.Period;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.schema.SchemaTestUtils;
+import org.apache.ignite.internal.sql.engine.message.SharedStateMessage;
+import org.apache.ignite.internal.sql.engine.util.TypeUtils;
+import org.apache.ignite.internal.type.NativeType;
+import org.apache.ignite.sql.ColumnType;
+import org.hamcrest.Matchers;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Tests for class {@link SharedStateMessageConverter}.
+ */
+public class SharedStateMessageConverterTest {
+ private Random rnd;
+
+ /**
+ * Initialization.
+ */
+ @BeforeEach
+ public void initRandom() {
+ long seed = System.currentTimeMillis();
+
+ Loggers.forClass(SharedStateMessageConverterTest.class).info("Using
seed: " + seed + "L; //");
Review Comment:
why don't you create static instance of the logger?
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteCorrelatedNestedLoopJoin.java:
##########
@@ -51,21 +53,26 @@
public class IgniteCorrelatedNestedLoopJoin extends AbstractIgniteJoin {
private static final String REL_TYPE_NAME = "CorrelatedNestedLoopJoin";
+ private final ImmutableBitSet correlationColumns;
Review Comment:
new attribute should be reflected in `explain` as well (don't mess with with
`explainTerms`). Also, we have to make sure that it is properly initialized.
Let's add planner tests for this
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]