This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 8907ef8ea71 IGNITE-27431 SQL Calcite: Optimize scans with filter -
Fixes #12600.
8907ef8ea71 is described below
commit 8907ef8ea7100aaf65ada9c7e38d92c8e302adca
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Fri Jan 23 09:49:43 2026 +0300
IGNITE-27431 SQL Calcite: Optimize scans with filter - Fixes #12600.
Signed-off-by: Aleksey Plekhanov <[email protected]>
---
.../calcite/exec/AbstractCacheColumnsScan.java | 32 +++++-
.../query/calcite/exec/AbstractCacheScan.java | 4 +-
.../processors/query/calcite/exec/IndexScan.java | 64 +++++++-----
.../query/calcite/exec/LogicalRelImplementor.java | 75 ++++++++++---
.../query/calcite/exec/SystemViewScan.java | 12 ++-
.../query/calcite/exec/TableRowIterable.java | 43 ++++++++
.../processors/query/calcite/exec/TableScan.java | 29 ++++--
.../query/calcite/exec/rel/ScanNode.java | 36 +++++--
.../query/calcite/exec/rel/ScanTableRowNode.java | 116 +++++++++++++++++++++
.../rel/ProjectableFilterableTableScan.java | 24 +++++
.../calcite/schema/CacheTableDescriptorImpl.java | 37 +++----
.../schema/SystemViewTableDescriptorImpl.java | 31 +++---
.../query/calcite/schema/TableDescriptor.java | 15 ++-
.../calcite/exec/LogicalRelImplementorTest.java | 82 +++++++++++++++
.../exec/rel/ScanTableRowExecutionTest.java | 104 ++++++++++++++++++
.../query/calcite/planner/TestTable.java | 9 +-
.../ignite/testsuites/ExecutionTestSuite.java | 4 +-
17 files changed, 601 insertions(+), 116 deletions(-)
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheColumnsScan.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheColumnsScan.java
index c5966bf9f54..95a0f5aa47b 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheColumnsScan.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheColumnsScan.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.query.calcite.exec;
+import java.util.Iterator;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.util.ImmutableBitSet;
import
org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
@@ -24,7 +25,8 @@ import
org.apache.ignite.internal.processors.query.calcite.schema.CacheTableDesc
import org.jetbrains.annotations.Nullable;
/** */
-public abstract class AbstractCacheColumnsScan<Row> extends
AbstractCacheScan<Row> {
+public abstract class AbstractCacheColumnsScan<TableRow, Row> extends
AbstractCacheScan<Row>
+ implements TableRowIterable<TableRow, Row> {
/** */
protected final CacheTableDescriptor desc;
@@ -34,8 +36,8 @@ public abstract class AbstractCacheColumnsScan<Row> extends
AbstractCacheScan<Ro
/** */
protected final RelDataType rowType;
- /** Participating columns. */
- protected final ImmutableBitSet requiredColumns;
+ /** Output node row fields to input table row columns mapping. */
+ protected final int[] fieldColMapping;
/** */
AbstractCacheColumnsScan(
@@ -47,9 +49,31 @@ public abstract class AbstractCacheColumnsScan<Row> extends
AbstractCacheScan<Ro
super(ectx, desc.cacheContext(), parts);
this.desc = desc;
- this.requiredColumns = requiredColumns;
rowType = desc.rowType(ectx.getTypeFactory(), requiredColumns);
factory = ectx.rowHandler().factory(ectx.getTypeFactory(), rowType);
+
+ ImmutableBitSet reqCols = requiredColumns == null
+ ? ImmutableBitSet.range(0, rowType.getFieldCount())
+ : requiredColumns;
+
+ fieldColMapping = reqCols.toArray();
}
+
+ /** {@inheritDoc} */
+ @Override public final Iterator<TableRow> tableRowIterator() {
+ reserve();
+
+ try {
+ return createTableRowIterator();
+ }
+ catch (Exception e) {
+ release();
+
+ throw e;
+ }
+ }
+
+ /** Table row iterator.*/
+ protected abstract Iterator<TableRow> createTableRowIterator();
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheScan.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheScan.java
index bc48d3fc5d4..e2327a2b4fc 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheScan.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheScan.java
@@ -110,7 +110,7 @@ public abstract class AbstractCacheScan<Row> implements
Iterable<Row>, AutoClose
}
/** */
- private synchronized void reserve() {
+ protected synchronized void reserve() {
if (reservation != null)
return;
@@ -161,7 +161,7 @@ public abstract class AbstractCacheScan<Row> implements
Iterable<Row>, AutoClose
}
/** */
- private synchronized void release() {
+ protected synchronized void release() {
if (reservation != null)
reservation.release();
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
index 32c051666a1..38034bc93ca 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
@@ -56,7 +56,7 @@ import org.jetbrains.annotations.Nullable;
/**
* Scan on index.
*/
-public class IndexScan<Row> extends AbstractCacheColumnsScan<Row> {
+public class IndexScan<Row> extends AbstractCacheColumnsScan<IndexRow, Row> {
/** */
private final GridKernalContext kctx;
@@ -109,7 +109,7 @@ public class IndexScan<Row> extends
AbstractCacheColumnsScan<Row> {
for (int i = 0; i < srcRowType.getFieldCount(); i++)
fieldsStoreTypes[i] =
typeFactory.getResultClass(srcRowType.getFieldList().get(i).getType());
- fieldIdxMapping =
fieldToInlinedKeysMapping(srcRowType.getFieldCount());
+ fieldIdxMapping = fieldToInlinedKeysMapping();
if (!F.isEmpty(ectx.getQryTxEntries())) {
InlineIndexRowHandler rowHnd = idx.segment(0).rowHandler();
@@ -131,7 +131,7 @@ public class IndexScan<Row> extends
AbstractCacheColumnsScan<Row> {
* @return Mapping from target row fields to inlined index keys, or {@code
null} if inlined index keys
* should not be used.
*/
- private int[] fieldToInlinedKeysMapping(int srcFieldsCnt) {
+ private int[] fieldToInlinedKeysMapping() {
List<InlineIndexKeyType> inlinedKeys =
idx.segment(0).rowHandler().inlineIndexKeyTypes();
// Since inline scan doesn't check expire time, allow it only if
expired entries are eagerly removed.
@@ -141,7 +141,7 @@ public class IndexScan<Row> extends
AbstractCacheColumnsScan<Row> {
// Even if we need some subset of inlined keys we are required to the
read full inlined row, since this row
// is also participated in comparison with other rows when cursor
processing the next index page.
if (inlinedKeys.size() <
idx.segment(0).rowHandler().indexKeyDefinitions().size() ||
- inlinedKeys.size() < (requiredColumns == null ? srcFieldsCnt :
requiredColumns.cardinality()))
+ inlinedKeys.size() < fieldColMapping.length)
return null;
for (InlineIndexKeyType keyType : inlinedKeys) {
@@ -153,14 +153,11 @@ public class IndexScan<Row> extends
AbstractCacheColumnsScan<Row> {
return null;
}
- ImmutableBitSet reqCols = requiredColumns == null ?
ImmutableBitSet.range(0, srcFieldsCnt) :
- requiredColumns;
-
int[] fieldIdxMapping = new int[rowType.getFieldCount()];
- for (int i = 0, j = reqCols.nextSetBit(0); j != -1; j =
reqCols.nextSetBit(j + 1), i++) {
- // j = source field index, i = target field index.
- int keyIdx = idxFieldMapping.indexOf(j);
+ for (int i = 0; i < fieldColMapping.length; i++) {
+ // i = output row field index, fieldColMapping[i] = table row
column index.
+ int keyIdx = idxFieldMapping.indexOf(fieldColMapping[i]);
if (keyIdx >= 0 && keyIdx < inlinedKeys.size())
fieldIdxMapping[i] = keyIdx;
@@ -173,6 +170,11 @@ public class IndexScan<Row> extends
AbstractCacheColumnsScan<Row> {
/** {@inheritDoc} */
@Override protected Iterator<Row> createIterator() {
+ return F.iterator(createTableRowIterator(), this::indexRow2Row, true);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Iterator<IndexRow> createTableRowIterator() {
RangeIterable<IndexRow> ranges0 = ranges == null ? null : new
TransformRangeIterable<>(ranges, this::row2indexRow);
TreeIndex<IndexRow> treeIdx = treeIndex();
@@ -180,7 +182,20 @@ public class IndexScan<Row> extends
AbstractCacheColumnsScan<Row> {
if (!txChanges.changedKeysEmpty())
treeIdx = new TxAwareTreeIndexWrapper(treeIdx);
- return F.iterator(new TreeIndexIterable<>(treeIdx, ranges0),
this::indexRow2Row, true);
+ return new TreeIndexIterable<>(treeIdx, ranges0).iterator();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Row enrichRow(IndexRow idxRow, Row row, int[]
fieldColMapping) {
+ try {
+ if (idxRow.indexPlainRow())
+ return inlineIndexRow2Row(idxRow, row, fieldColMapping);
+ else
+ return desc.toRow(ectx, idxRow.cacheDataRow(), row,
fieldColMapping);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
}
/** */
@@ -221,28 +236,23 @@ public class IndexScan<Row> extends
AbstractCacheColumnsScan<Row> {
}
/** From IndexRow to Row convertor. */
- protected Row indexRow2Row(IndexRow row) {
- try {
- if (row.indexPlainRow())
- return inlineIndexRow2Row(row);
- else
- return desc.toRow(ectx, row.cacheDataRow(), factory,
requiredColumns);
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
- }
+ protected Row indexRow2Row(IndexRow idxRow) {
+ Row row = factory.create();
+
+ return enrichRow(idxRow, row, fieldColMapping);
}
/** */
- private Row inlineIndexRow2Row(IndexRow row) {
+ private Row inlineIndexRow2Row(IndexRow idxRow, Row row, int[]
fieldColMapping) {
RowHandler<Row> hnd = ectx.rowHandler();
- Row res = factory.create();
-
- for (int i = 0; i < fieldIdxMapping.length; i++)
- hnd.set(i, res, TypeUtils.toInternal(ectx,
row.key(fieldIdxMapping[i]).key()));
+ for (int i = 0; i < fieldColMapping.length; i++) {
+ // Skip not required fields.
+ if (fieldColMapping[i] >= 0)
+ hnd.set(i, row, TypeUtils.toInternal(ectx,
idxRow.key(fieldIdxMapping[i]).key()));
+ }
- return res;
+ return row;
}
/** Query context. */
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
index e1687bcbdc6..d816ed65510 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
@@ -64,6 +64,7 @@ import
org.apache.ignite.internal.processors.query.calcite.exec.rel.Outbox;
import
org.apache.ignite.internal.processors.query.calcite.exec.rel.ProjectNode;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.ScanNode;
import
org.apache.ignite.internal.processors.query.calcite.exec.rel.ScanStorageNode;
+import
org.apache.ignite.internal.processors.query.calcite.exec.rel.ScanTableRowNode;
import
org.apache.ignite.internal.processors.query.calcite.exec.rel.SortAggregateNode;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.SortNode;
import
org.apache.ignite.internal.processors.query.calcite.exec.rel.TableSpoolNode;
@@ -116,6 +117,7 @@ import
org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactor
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.processors.query.calcite.util.RexUtils;
import org.apache.ignite.internal.util.typedef.F;
+import org.jetbrains.annotations.Nullable;
import static org.apache.calcite.rel.RelDistribution.Type.HASH_DISTRIBUTED;
import static
org.apache.ignite.internal.processors.query.calcite.util.TypeUtils.combinedRowType;
@@ -368,10 +370,10 @@ public class LogicalRelImplementor<Row> implements
IgniteRelVisitor<Node<Row>> {
ImmutableBitSet requiredColumns = rel.requiredColumns();
List<SearchBounds> searchBounds = rel.searchBounds();
- RelDataType rowType = rel.getDataSourceRowType();
+ RelDataType inputRowType = rel.getDataSourceRowType();
- Predicate<Row> filters = condition == null ? null :
expressionFactory.predicate(condition, rowType);
- Function<Row, Row> prj = projects == null ? null :
expressionFactory.project(projects, rowType);
+ Predicate<Row> filters = condition == null ? null :
expressionFactory.predicate(condition, inputRowType);
+ Function<Row, Row> prj = projects == null ? null :
expressionFactory.project(projects, inputRowType);
RangeIterable<Row> ranges = searchBounds == null ? null :
expressionFactory.ranges(searchBounds, rel.collation(),
tbl.getRowType(typeFactory));
@@ -382,7 +384,8 @@ public class LogicalRelImplementor<Row> implements
IgniteRelVisitor<Node<Row>> {
if (idx != null && !tbl.isIndexRebuildInProgress()) {
Iterable<Row> rowsIter = idx.scan(ctx, grp, ranges,
requiredColumns);
- return new ScanStorageNode<>(tbl.name() + '.' + idx.name(), ctx,
rowType, rowsIter, filters, prj);
+ return createStorageScan(tbl.name() + '.' + idx.name(),
rel.getRowType(), inputRowType,
+ rowsIter, filters, prj, requiredColumns,
rel.conditionColumns());
}
else {
// Index was invalidated after planning, workaround through
table-scan -> sort -> index spool.
@@ -402,12 +405,10 @@ public class LogicalRelImplementor<Row> implements
IgniteRelVisitor<Node<Row>> {
requiredColumns
);
- // If there are projects in the scan node - after the scan we
already have target row type.
- if (!spoolNodeRequired && projects != null)
- rowType = rel.getRowType();
+ RelDataType rowType = projNodeRequired ? rel.getRowType() :
inputRowType;
- Node<Row> node = new ScanStorageNode<>(tbl.name(), ctx, rowType,
rowsIter, filterHasCorrelation ? null : filters,
- projNodeRequired ? null : prj);
+ Node<Row> node = createStorageScan(tbl.name(), rowType,
inputRowType, rowsIter,
+ filterHasCorrelation ? null : filters, projNodeRequired ? null
: prj, requiredColumns, rel.conditionColumns());
RelCollation collation = rel.collation();
@@ -438,7 +439,7 @@ public class LogicalRelImplementor<Row> implements
IgniteRelVisitor<Node<Row>> {
remappedSearchBounds.add(searchBounds.get(i));
// Collation and row type are already remapped taking into
account requiredColumns.
- ranges = expressionFactory.ranges(remappedSearchBounds,
collation, rowType);
+ ranges = expressionFactory.ranges(remappedSearchBounds,
collation, inputRowType);
}
IndexSpoolNode<Row> spoolNode = IndexSpoolNode.createTreeSpool(
@@ -547,10 +548,10 @@ public class LogicalRelImplementor<Row> implements
IgniteRelVisitor<Node<Row>> {
IgniteTable tbl = rel.getTable().unwrap(IgniteTable.class);
- RelDataType rowType = rel.getDataSourceRowType();
+ RelDataType inputRowType = rel.getDataSourceRowType();
- Predicate<Row> filters = condition == null ? null :
expressionFactory.predicate(condition, rowType);
- Function<Row, Row> prj = projects == null ? null :
expressionFactory.project(projects, rowType);
+ Predicate<Row> filters = condition == null ? null :
expressionFactory.predicate(condition, inputRowType);
+ Function<Row, Row> prj = projects == null ? null :
expressionFactory.project(projects, inputRowType);
ColocationGroup grp = ctx.group(rel.sourceId());
@@ -559,12 +560,14 @@ public class LogicalRelImplementor<Row> implements
IgniteRelVisitor<Node<Row>> {
if (idx != null && !tbl.isIndexRebuildInProgress()) {
Iterable<Row> rowsIter = idx.scan(ctx, grp, null, requiredColumns);
- return new ScanStorageNode<>(tbl.name() + '.' + idx.name(), ctx,
rowType, rowsIter, filters, prj);
+ return createStorageScan(tbl.name() + '.' + idx.name(),
rel.getRowType(), inputRowType,
+ rowsIter, filters, prj, requiredColumns,
rel.conditionColumns());
}
else {
Iterable<Row> rowsIter = tbl.scan(ctx, grp, requiredColumns);
- return new ScanStorageNode<>(tbl.name(), ctx, rowType, rowsIter,
filters, prj);
+ return createStorageScan(tbl.name(), rel.getRowType(),
inputRowType, rowsIter, filters, prj,
+ requiredColumns, rel.conditionColumns());
}
}
@@ -942,4 +945,46 @@ public class LogicalRelImplementor<Row> implements
IgniteRelVisitor<Node<Row>> {
public <T extends Node<Row>> T go(IgniteRel rel) {
return (T)visit(rel);
}
+
+ /** */
+ private ScanStorageNode<Row> createStorageScan(
+ String storageName,
+ RelDataType outputRowType,
+ RelDataType inputRowType,
+ Iterable<Row> rowsIter,
+ @Nullable Predicate<Row> filter,
+ @Nullable Function<Row, Row> rowTransformer,
+ @Nullable ImmutableBitSet requiredColumns,
+ @Nullable ImmutableBitSet filterColumns
+ ) {
+ int fieldsCnt = inputRowType.getFieldCount();
+
+ if (filter == null || filterColumns == null ||
filterColumns.cardinality() == fieldsCnt
+ || !(rowsIter instanceof TableRowIterable))
+ return new ScanStorageNode<>(storageName, ctx, outputRowType,
rowsIter, filter, rowTransformer);
+
+ ImmutableBitSet reqCols = requiredColumns == null ?
ImmutableBitSet.range(0, fieldsCnt) : requiredColumns;
+
+ int[] filterColMapping = reqCols.toArray();
+ int[] otherColMapping = filterColMapping.clone();
+
+ for (int i = 0; i < filterColMapping.length; i++) {
+ if (filterColumns.get(i))
+ otherColMapping[i] = -1;
+ else
+ filterColMapping[i] = -1;
+ }
+
+ return new ScanTableRowNode<>(
+ storageName,
+ ctx,
+ outputRowType,
+ inputRowType,
+ (TableRowIterable<Object, Row>)rowsIter,
+ filter,
+ rowTransformer,
+ filterColMapping,
+ otherColMapping
+ );
+ }
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/SystemViewScan.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/SystemViewScan.java
index 76f71f50837..897b908b376 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/SystemViewScan.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/SystemViewScan.java
@@ -47,8 +47,8 @@ public class SystemViewScan<Row, ViewRow> implements
Iterable<Row> {
/** */
private final RangeIterable<Row> ranges;
- /** Participating colunms. */
- private final ImmutableBitSet requiredColumns;
+ /** Row field to view column mapping. */
+ protected final int[] fieldColMapping;
/** System view field names (for filtering). */
private final String[] filterableFieldNames;
@@ -66,7 +66,6 @@ public class SystemViewScan<Row, ViewRow> implements
Iterable<Row> {
this.ectx = ectx;
this.desc = desc;
this.ranges = ranges;
- this.requiredColumns = requiredColumns;
RelDataType rowType = desc.rowType(ectx.getTypeFactory(),
requiredColumns);
@@ -83,6 +82,11 @@ public class SystemViewScan<Row, ViewRow> implements
Iterable<Row> {
}
}
}
+
+ ImmutableBitSet reqCols = requiredColumns == null ?
ImmutableBitSet.range(0, rowType.getFieldCount())
+ : requiredColumns;
+
+ fieldColMapping = reqCols.toArray();
}
/** {@inheritDoc} */
@@ -123,6 +127,6 @@ public class SystemViewScan<Row, ViewRow> implements
Iterable<Row> {
else
viewIter = view.iterator();
- return F.iterator(viewIter, row -> desc.toRow(ectx, row, factory,
requiredColumns), true);
+ return F.iterator(viewIter, row -> desc.toRow(ectx, row,
factory.create(), fieldColMapping), true);
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableRowIterable.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableRowIterable.java
new file mode 100644
index 00000000000..3523419c204
--- /dev/null
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableRowIterable.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.util.Iterator;
+
+/**
+ * Interface to iterate over raw table data and form relational node rows from
table row columns.
+ *
+ * @param <TableRow> Raw table row type.
+ * @param <Row> Relational node row type.
+ */
+public interface TableRowIterable<TableRow, Row> extends Iterable<Row> {
+ /**
+ * @return Table row iterator.
+ */
+ public Iterator<TableRow> tableRowIterator();
+
+ /**
+ * Enriches {@code nodeRow} with columns from {@code tableRow}.
+ *
+ * @param tableRow Table row.
+ * @param nodeRow Relational node row.
+ * @param fieldColMapping Mapping from node row fields to table row
columns. If column is not required
+ * corresponding field has -1 mapped value.
+ * @return Enriched relational node row.
+ */
+ public Row enrichRow(TableRow tableRow, Row nodeRow, int[]
fieldColMapping);
+}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java
index 46c7cde4e67..f43c7de4e5a 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java
@@ -27,6 +27,7 @@ import java.util.Queue;
import java.util.function.Function;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
@@ -40,7 +41,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
/** */
-public class TableScan<Row> extends AbstractCacheColumnsScan<Row> {
+public class TableScan<Row> extends AbstractCacheColumnsScan<CacheDataRow,
Row> {
/** */
protected volatile List<GridDhtLocalPartition> reservedParts;
@@ -56,9 +57,25 @@ public class TableScan<Row> extends
AbstractCacheColumnsScan<Row> {
/** {@inheritDoc} */
@Override protected Iterator<Row> createIterator() {
+ return F.iterator((Iterator<CacheDataRow>)new IteratorImpl(),
+ row -> enrichRow(row, factory.create(), fieldColMapping), true);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Iterator<CacheDataRow> createTableRowIterator() {
return new IteratorImpl();
}
+ /** */
+ @Override public Row enrichRow(CacheDataRow cacheDataRow, Row row, int[]
fieldColMapping) {
+ try {
+ return desc.toRow(ectx, cacheDataRow, row, fieldColMapping);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+
/** {@inheritDoc} */
@Override protected void processReservedTopology(GridDhtPartitionTopology
top) {
List<GridDhtLocalPartition> reservedParts = new
ArrayList<>(parts.cardinality());
@@ -72,7 +89,7 @@ public class TableScan<Row> extends
AbstractCacheColumnsScan<Row> {
/**
* Table scan iterator.
*/
- private class IteratorImpl extends GridIteratorAdapter<Row> {
+ private class IteratorImpl extends GridIteratorAdapter<CacheDataRow> {
/** */
private final Queue<GridDhtLocalPartition> parts;
@@ -86,7 +103,7 @@ public class TableScan<Row> extends
AbstractCacheColumnsScan<Row> {
private Iterator<CacheDataRow> txIter = Collections.emptyIterator();
/** */
- private Row next;
+ private CacheDataRow next;
/** */
private IteratorImpl() {
@@ -114,13 +131,13 @@ public class TableScan<Row> extends
AbstractCacheColumnsScan<Row> {
}
/** {@inheritDoc} */
- @Override public Row nextX() throws IgniteCheckedException {
+ @Override public CacheDataRow nextX() throws IgniteCheckedException {
advance();
if (next == null)
throw new NoSuchElementException();
- Row next = this.next;
+ CacheDataRow next = this.next;
this.next = null;
@@ -170,7 +187,7 @@ public class TableScan<Row> extends
AbstractCacheColumnsScan<Row> {
if (!desc.match(row))
continue;
- next = desc.toRow(ectx, row, factory, requiredColumns);
+ next = row;
break;
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
index ca6469b09e6..80201b26466 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
@@ -34,13 +34,13 @@ public class ScanNode<Row> extends AbstractNode<Row>
implements SingleNode<Row>
private final Iterable<Row> src;
/** */
- @Nullable private final Predicate<Row> filter;
+ @Nullable protected final Predicate<Row> filter;
/** */
- @Nullable private final Function<Row, Row> rowTransformer;
+ @Nullable protected final Function<Row, Row> rowTransformer;
/** */
- private Iterator<Row> it;
+ protected Iterator<?> it;
/** */
private int requested;
@@ -146,25 +146,43 @@ public class ScanNode<Row> extends AbstractNode<Row>
implements SingleNode<Row>
}
}
+ /**
+ * @return Next row, or {@code null} if row was filtered out.
+ */
+ protected @Nullable Row processNextRow() {
+ Row r = (Row)it.next();
+
+ if (filter == null || filter.test(r)) {
+ if (rowTransformer != null)
+ r = rowTransformer.apply(r);
+
+ return r;
+ }
+
+ return null;
+ }
+
+ /** */
+ protected Iterator<?> sourceIterator() {
+ return src.iterator();
+ }
+
/**
* @return Count of processed rows.
*/
protected int processNextBatch() throws Exception {
if (it == null)
- it = src.iterator();
+ it = sourceIterator();
int processed = 0;
while (requested > 0 && it.hasNext()) {
checkState();
- Row r = it.next();
+ Row r = processNextRow();
- if (filter == null || filter.test(r)) {
+ if (r != null) {
requested--;
- if (rowTransformer != null)
- r = rowTransformer.apply(r);
-
downstream().push(r);
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanTableRowNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanTableRowNode.java
new file mode 100644
index 00000000000..0405292f32f
--- /dev/null
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanTableRowNode.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.Iterator;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import org.apache.calcite.rel.type.RelDataType;
+import
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
+import
org.apache.ignite.internal.processors.query.calcite.exec.TableRowIterable;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Scan table rows node. With ability to request part of table row columns for
filtering
+ * ({@code ScanStorageNode} can only request full set of required table row
columns).
+ */
+public class ScanTableRowNode<TableRow, Row> extends ScanStorageNode<Row> {
+ /** */
+ private final TableRowIterable<TableRow, Row> src;
+
+ /** */
+ private final int[] filterColMapping;
+
+ /** */
+ private final int[] otherColMapping;
+
+ /** */
+ private final RowHandler.RowFactory<Row> factory;
+
+ /** */
+ private Row curRow;
+
+ /**
+ * @param storageName Storage (index or table) name.
+ * @param ctx Execution context.
+ * @param outputRowType Output row type.
+ * @param inputRowType Input row type.
+ * @param src Source.
+ * @param filter Row filter.
+ * @param rowTransformer Row transformer (projection).
+ * @param filterColMapping Fields to columns mapping for fields used in
filter.
+ * @param otherColMapping Fields to columns mapping for other fields.
+ */
+ @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
+ public ScanTableRowNode(
+ String storageName,
+ ExecutionContext<Row> ctx,
+ RelDataType outputRowType,
+ RelDataType inputRowType,
+ TableRowIterable<TableRow, Row> src,
+ Predicate<Row> filter,
+ @Nullable Function<Row, Row> rowTransformer,
+ int[] filterColMapping,
+ int[] otherColMapping
+ ) {
+ super(storageName, ctx, outputRowType, src, filter, rowTransformer);
+
+ assert filter != null;
+
+ factory = ctx.rowHandler().factory(ctx.getTypeFactory(), inputRowType);
+
+ this.src = src;
+ this.filterColMapping = filterColMapping;
+ this.otherColMapping = otherColMapping;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Iterator<?> sourceIterator() {
+ return src.tableRowIterator();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected @Nullable Row processNextRow() {
+ Row row = curRow == null ? factory.create() : curRow;
+
+ TableRow tableRow = (TableRow)it.next();
+
+ src.enrichRow(tableRow, row, filterColMapping);
+
+ if (filter.test(row)) {
+ src.enrichRow(tableRow, row, otherColMapping);
+
+ if (rowTransformer != null)
+ row = rowTransformer.apply(row);
+
+ curRow = null;
+
+ return row;
+ }
+
+ return null;
+ }
+
+ /** */
+ @Override public void closeInternal() {
+ super.closeInternal();
+
+ curRow = null;
+ }
+}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/ProjectableFilterableTableScan.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/ProjectableFilterableTableScan.java
index 208e8b70e86..9c41fdd9e3d 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/ProjectableFilterableTableScan.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/ProjectableFilterableTableScan.java
@@ -63,6 +63,9 @@ public abstract class ProjectableFilterableTableScan extends
TableScan {
/** Participating columns. */
protected final ImmutableBitSet requiredColumns;
+ /** Columns used by condition. */
+ protected ImmutableBitSet conditionColumns;
+
/** Required columns from table row type (No need to be serialized, for
caching only). */
protected RelDataType dataSourceRowType;
@@ -208,4 +211,25 @@ public abstract class ProjectableFilterableTableScan
extends TableScan {
return new RelColumnOrigin(getTable(), originColIdx, false);
}
+
+ /** Returns indexes of the local refs used by the condition (collected once and cached), or {@code null} if there is no condition. */
+ public @Nullable ImmutableBitSet conditionColumns() {
+ if (condition == null)
+ return null;
+
+ if (conditionColumns == null) { // Not collected yet.
+ ImmutableBitSet.Builder builder = ImmutableBitSet.builder();
+
+ new RexShuttle() { // Walks the condition and records the index of every RexLocalRef.
+ @Override public RexNode visitLocalRef(RexLocalRef inputRef) {
+ builder.set(inputRef.getIndex());
+ return inputRef;
+ }
+ }.apply(condition);
+
+ conditionColumns = builder.build();
+ }
+
+ return conditionColumns;
+ }
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableDescriptorImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableDescriptorImpl.java
index 22f6c2dcb8f..2cd164bc815 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableDescriptorImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableDescriptorImpl.java
@@ -70,7 +70,6 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
@@ -251,36 +250,28 @@ public class CacheTableDescriptorImpl extends
NullInitializerExpressionFactory
/** {@inheritDoc} */
@Override public <Row> Row toRow(
ExecutionContext<Row> ectx,
- CacheDataRow row,
- RowHandler.RowFactory<Row> factory,
- @Nullable ImmutableBitSet requiredColumns
+ CacheDataRow tableRow,
+ Row row,
+ int[] fieldColMapping
) throws IgniteCheckedException {
- RowHandler<Row> hnd = factory.handler();
+ RowHandler<Row> hnd = ectx.rowHandler();
- assert hnd == ectx.rowHandler();
+ assert hnd.columnCount(row) == fieldColMapping.length :
+ "Unexpected row column count: " + hnd.columnCount(row) + "
expected: " + fieldColMapping.length;
- Row res = factory.create();
+ for (int i = 0; i < fieldColMapping.length; i++) { // i is the field index in the node row.
+ int colIdx = fieldColMapping[i]; // Index into descriptors, or negative if the field is skipped.
- assert hnd.columnCount(res) == (requiredColumns == null ?
descriptors.length : requiredColumns.cardinality());
+ // Skip fields that are not required (negative column index).
+ if (colIdx < 0)
+ continue;
- if (requiredColumns == null) {
- for (int i = 0; i < descriptors.length; i++) {
- CacheColumnDescriptor desc = descriptors[i];
+ CacheColumnDescriptor desc = descriptors[colIdx];
- hnd.set(i, res, TypeUtils.toInternal(ectx,
- desc.value(ectx, cacheContext(), row),
desc.storageType()));
- }
- }
- else {
- for (int i = 0, j = requiredColumns.nextSetBit(0); j != -1; j =
requiredColumns.nextSetBit(j + 1), i++) {
- CacheColumnDescriptor desc = descriptors[j];
-
- hnd.set(i, res, TypeUtils.toInternal(ectx,
- desc.value(ectx, cacheContext(), row),
desc.storageType()));
- }
+ hnd.set(i, row, TypeUtils.toInternal(ectx, desc.value(ectx,
cacheContext(), tableRow), desc.storageType()));
}
- return res;
+ return row;
}
/** {@inheritDoc} */
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewTableDescriptorImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewTableDescriptorImpl.java
index 80d7f95b337..ab89ce303fb 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewTableDescriptorImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewTableDescriptorImpl.java
@@ -40,7 +40,6 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.systemview.view.SystemView;
import org.apache.ignite.spi.systemview.view.SystemViewRowAttributeWalker;
-import org.jetbrains.annotations.Nullable;
import static org.apache.calcite.rel.type.RelDataType.PRECISION_NOT_SPECIFIED;
import static org.apache.calcite.rel.type.RelDataType.SCALE_NOT_SPECIFIED;
@@ -107,24 +106,28 @@ public class SystemViewTableDescriptorImpl<ViewRow>
extends NullInitializerExpre
/** {@inheritDoc} */
@Override public <Row> Row toRow(
ExecutionContext<Row> ectx,
- ViewRow row,
- RowHandler.RowFactory<Row> factory,
- @Nullable ImmutableBitSet requiredColumns
+ ViewRow viewRow,
+ Row row,
+ int[] fieldColMapping
) {
- RowHandler<Row> hnd = factory.handler();
+ RowHandler<Row> hnd = ectx.rowHandler();
- assert hnd == ectx.rowHandler();
+ assert hnd.columnCount(row) == fieldColMapping.length :
+ "Unexpected row column count: " + hnd.columnCount(row) + "
expected: " + fieldColMapping.length;
- Row res = factory.create();
-
- assert hnd.columnCount(res) == (requiredColumns == null ?
descriptors.length : requiredColumns.cardinality());
-
- sysView.walker().visitAll(row, new
SystemViewRowAttributeWalker.AttributeWithValueVisitor() {
+ sysView.walker().visitAll(viewRow, new
SystemViewRowAttributeWalker.AttributeWithValueVisitor() {
private int colIdx;
@Override public <T> void accept(int idx, String name, Class<T>
clazz, T val) {
- if (requiredColumns == null || requiredColumns.get(idx))
- hnd.set(colIdx++, res, TypeUtils.toInternal(ectx, val,
descriptors[idx].storageType()));
+ // Assumes fieldColMapping is derived from the required columns and is
sorted.
+ // Assumes 'accept' is called with increasing 'idx'.
+
+ // Skip fields that are not required (negative mapping entries).
+ while (colIdx < fieldColMapping.length &&
fieldColMapping[colIdx] < 0)
+ colIdx++;
+
+ if (colIdx < fieldColMapping.length && fieldColMapping[colIdx]
== idx)
+ hnd.set(colIdx++, row, TypeUtils.toInternal(ectx, val,
descriptors[idx].storageType()));
}
@Override public void acceptBoolean(int idx, String name, boolean
val) {
@@ -160,7 +163,7 @@ public class SystemViewTableDescriptorImpl<ViewRow> extends
NullInitializerExpre
}
});
- return res;
+ return row;
}
/** {@inheritDoc} */
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptor.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptor.java
index af3b72a382b..4c2b82f548a 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptor.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptor.java
@@ -26,12 +26,10 @@ import
org.apache.calcite.sql2rel.InitializerExpressionFactory;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.ignite.IgniteCheckedException;
import
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
-import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
import
org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
import
org.apache.ignite.internal.processors.query.calcite.prepare.MappingQueryContext;
import
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
import
org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
-import org.jetbrains.annotations.Nullable;
/**
*
@@ -94,19 +92,20 @@ public interface TableDescriptor<TableRow> extends
RelProtoDataType, Initializer
boolean isUpdateAllowed(RelOptTable tbl, int colIdx);
/**
- * Converts a cache row to relational node row.
+ * Converts a table row to a relational node row by filling the mapped fields of the given row.
*
* @param ectx Execution context.
- * @param row Cache row.
- * @param requiredColumns Participating columns.
+ * @param tableRow Table row to read values from.
+ * @param row Relational node row to fill.
+ * @param fieldColMapping Mapping from row fields to table columns; negative entries mark fields to skip.
* @return Relational node row.
* @throws IgniteCheckedException If failed.
*/
<Row> Row toRow(
ExecutionContext<Row> ectx,
- TableRow row,
- RowHandler.RowFactory<Row> factory,
- @Nullable ImmutableBitSet requiredColumns
+ TableRow tableRow,
+ Row row,
+ int[] fieldColMapping
) throws IgniteCheckedException;
/**
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementorTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementorTest.java
index 1afaf700552..6e4419be92c 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementorTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementorTest.java
@@ -19,6 +19,7 @@ package
org.apache.ignite.internal.processors.query.calcite.exec;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.function.Predicate;
@@ -37,11 +38,13 @@ import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.mapping.Mappings;
import org.apache.ignite.internal.processors.query.QueryUtils;
+import
org.apache.ignite.internal.processors.query.calcite.exec.exp.RangeIterable;
import
org.apache.ignite.internal.processors.query.calcite.exec.rel.CollectNode;
import
org.apache.ignite.internal.processors.query.calcite.exec.rel.IndexSpoolNode;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.Node;
import
org.apache.ignite.internal.processors.query.calcite.exec.rel.ProjectNode;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.ScanNode;
+import
org.apache.ignite.internal.processors.query.calcite.exec.rel.ScanTableRowNode;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.SortNode;
import
org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpIoTracker;
import
org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpMemoryTracker;
@@ -51,6 +54,7 @@ import
org.apache.ignite.internal.processors.query.calcite.prepare.BaseQueryCont
import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexBound;
import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexCount;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
+import
org.apache.ignite.internal.processors.query.calcite.schema.CacheIndexImpl;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
import
org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
@@ -59,6 +63,7 @@ import
org.apache.ignite.internal.processors.query.calcite.util.RexUtils;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
import org.junit.Test;
import static org.apache.calcite.tools.Frameworks.createRootSchema;
@@ -375,6 +380,53 @@ public class LogicalRelImplementorTest extends
GridCommonAbstractTest {
checkNodesChain(relImplementor, scan, isProj, isSpool, isSort,
isScanNoFilterNoProject);
}
+ /** Checks that ScanTableRowNode is created only when the filter does not use all required columns. */
+ @Test
+ public void testScanTableRow() {
+ RelCollation idxCollation =
TraitUtils.createCollation(Collections.singletonList(2));
+ tbl.addIndex(new ScannableTestIndex(idxCollation,
QueryUtils.PRIMARY_KEY_INDEX, tbl));
+
+ RelDataType rowType = tbl.getRowType(tf);
+ RelDataType sqlTypeInt = rowType.getFieldList().get(2).getType();
+ RelDataType sqlTypeVarchar = rowType.getFieldList().get(3).getType();
+
+ List<RexNode> project =
F.asList(rexBuilder.makeLocalRef(sqlTypeVarchar, 1));
+
+ RexNode filterOneField = rexBuilder.makeCall( // References local ref 0 only.
+ SqlStdOperatorTable.EQUALS,
+ rexBuilder.makeLocalRef(sqlTypeInt, 0),
+ rexBuilder.makeLiteral(1, sqlTypeInt)
+ );
+
+ RexNode filterTwoFields = rexBuilder.makeCall( // References local refs 0 and 1.
+ SqlStdOperatorTable.EQUALS,
+ rexBuilder.makeCast(sqlTypeVarchar,
rexBuilder.makeLocalRef(sqlTypeInt, 0)),
+ rexBuilder.makeLocalRef(sqlTypeVarchar, 1)
+ );
+
+ ImmutableBitSet requiredColumns = ImmutableBitSet.of(2, 3);
+
+ IgniteIndexScan scan = new IgniteIndexScan(
+ cluster,
+ cluster.traitSet(),
+ qctx.catalogReader().getTable(F.asList("PUBLIC", "TBL")),
+ QueryUtils.PRIMARY_KEY_INDEX,
+ project,
+ filterOneField,
+ RexUtils.buildSortedSearchBounds(cluster, idxCollation,
filterOneField, rowType, requiredColumns),
+ requiredColumns,
+ idxCollation
+ );
+
+ // Not all fields participate in the filter, so it is worth creating a
ScanTableRowNode.
+ checkNodesChain(relImplementor, scan, node -> node instanceof
ScanTableRowNode);
+
+ scan = createScan(scan, idxCollation, project, filterTwoFields,
requiredColumns);
+
+ // All fields participate in the filter, so a regular ScanStorageNode should
be created.
+ checkNodesChain(relImplementor, scan, node -> !(node instanceof
ScanTableRowNode));
+ }
+
/** */
private IgniteIndexScan createScan(
IgniteIndexScan templateScan,
@@ -445,4 +497,34 @@ public class LogicalRelImplementorTest extends
GridCommonAbstractTest {
return Collections.emptyList();
}
}
+
+ /** Test index whose scan returns a TableRowIterable with empty iterators. */
+ private static class ScannableTestIndex extends CacheIndexImpl {
+ /** Creates the index over the given test table, passing {@code null} as the third superclass constructor argument. */
+ public ScannableTestIndex(RelCollation collation, String name,
TestTable tbl) {
+ super(collation, name, null, tbl);
+ }
+
+ /** {@inheritDoc} */
+ @Override public <Row> Iterable<Row> scan(
+ ExecutionContext<Row> execCtx,
+ ColocationGroup grp,
+ RangeIterable<Row> ranges,
+ @Nullable ImmutableBitSet requiredColumns
+ ) {
+ return new TableRowIterable<>() { // All arguments are ignored; the result yields no rows.
+ @Override public Iterator<Row> iterator() {
+ return Collections.emptyIterator();
+ }
+
+ @Override public Iterator<Object> tableRowIterator() {
+ return Collections.emptyIterator();
+ }
+
+ @Override public Row enrichRow(Object o, Row nodeRow, int[]
fieldColMapping) {
+ return null; // Stub: both iterators above are empty, so there is no table row to enrich.
+ }
+ };
+ }
+ }
}
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanTableRowExecutionTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanTableRowExecutionTest.java
new file mode 100644
index 00000000000..f88e46d1a3c
--- /dev/null
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanTableRowExecutionTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+import org.apache.calcite.rel.type.RelDataType;
+import
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import
org.apache.ignite.internal.processors.query.calcite.exec.TableRowIterable;
+import
org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.junit.Test;
+
+/**
+ * Execution test for ScanTableRowNode.
+ */
+public class ScanTableRowExecutionTest extends AbstractExecutionTest {
+ /** Checks that rows are filtered on the filter columns first and that accepted rows are enriched with the other columns and projected. */
+ @Test
+ public void testScanTableRow() {
+ ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()),
UUID.randomUUID(), 0);
+ IgniteTypeFactory tf = ctx.getTypeFactory();
+ RelDataType rowType = TypeUtils.createRowType(tf, int.class,
String.class, Boolean.class);
+
+ List<Object[]> data = F.asList( // Rows with an odd first field carry 'false' in the third field.
+ new Object[] {0, "0", true},
+ new Object[] {1, "1", false},
+ new Object[] {2, "2", true},
+ new Object[] {3, "3", false},
+ new Object[] {4, "4", true}
+ );
+
+ TableRowIterable<Object[], Object[]> it = new TableRowIterable<>() {
+ @Override public Iterator<Object[]> tableRowIterator() {
+ return data.iterator();
+ }
+
+ @Override public Object[] enrichRow(Object[] tableRow, Object[]
nodeRow, int[] fieldColMapping) {
+ for (int i = 0; i < fieldColMapping.length; i++) {
+ if (fieldColMapping[i] >= 0) // Negative entries mark fields to skip.
+ nodeRow[i] = tableRow[fieldColMapping[i]];
+ }
+
+ // Test invariant: a row must never be enriched with
FALSE in the field 2.
+ assertNotSame(Boolean.FALSE, nodeRow[2]);
+
+ return nodeRow;
+ }
+
+ @Override public Iterator<Object[]> iterator() {
+ throw new AssertionError("Unexpected call");
+ }
+ };
+
+ ScanTableRowNode<Object[], Object[]> scan = new ScanTableRowNode<>(
+ "test",
+ ctx,
+ rowType, // Output row type.
+ rowType, // Input row type.
+ it, // Iterator.
+ r -> ((int)r[0] & 1) == 0, // Filter.
+ r -> new Object[] {((int)r[0]) * 2, r[1], r[2]}, // Project.
+ new int[] {0, -1, -1}, // Filter columns mapping.
+ new int[] {-1, 1, 2} // Other columns mapping.
+ );
+
+ RootNode<Object[]> root = new RootNode<>(ctx, rowType);
+ root.register(scan);
+
+ List<Object[]> res = new ArrayList<>();
+
+ while (root.hasNext())
+ res.add(root.next());
+
+ List<Object[]> exp = F.asList( // Rows with an even first field, with that field doubled by the projection.
+ new Object[] {0, "0", true},
+ new Object[] {4, "2", true},
+ new Object[] {8, "4", true}
+ );
+
+ assertEquals(exp.size(), res.size());
+
+ for (int i = 0; i < exp.size(); i++)
+ assertEqualsArraysAware(exp.get(i), res.get(i));
+ }
+}
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TestTable.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TestTable.java
index 744c351ddf6..a4e5ab611e8 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TestTable.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TestTable.java
@@ -61,7 +61,6 @@ import
org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
import org.apache.ignite.internal.processors.query.QueryUtils;
import
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
-import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
import
org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
import
org.apache.ignite.internal.processors.query.calcite.prepare.MappingQueryContext;
import
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalTableScan;
@@ -411,8 +410,12 @@ public class TestTable implements IgniteCacheTable,
Wrapper {
}
/** {@inheritDoc} */
- @Override public <Row> Row toRow(ExecutionContext<Row> ectx,
CacheDataRow row, RowHandler.RowFactory<Row> factory,
- @Nullable ImmutableBitSet requiredColumns) {
+ @Override public <Row> Row toRow(
+ ExecutionContext<Row> ectx,
+ CacheDataRow cacheDataRow,
+ Row outputRow,
+ int[] fieldColMapping
+ ) {
throw new AssertionError(); // Always fails: this test table does not implement row conversion.
}
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java
b/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java
index bd6f6eb2258..c8fbb9d90b1 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java
@@ -29,6 +29,7 @@ import
org.apache.ignite.internal.processors.query.calcite.exec.rel.LimitExecuti
import
org.apache.ignite.internal.processors.query.calcite.exec.rel.MergeJoinExecutionTest;
import
org.apache.ignite.internal.processors.query.calcite.exec.rel.MinusExecutionTest;
import
org.apache.ignite.internal.processors.query.calcite.exec.rel.NestedLoopJoinExecutionTest;
+import
org.apache.ignite.internal.processors.query.calcite.exec.rel.ScanTableRowExecutionTest;
import
org.apache.ignite.internal.processors.query.calcite.exec.rel.SortAggregateExecutionTest;
import
org.apache.ignite.internal.processors.query.calcite.exec.rel.SortedIndexSpoolExecutionTest;
import
org.apache.ignite.internal.processors.query.calcite.exec.rel.TableSpoolExecutionTest;
@@ -38,7 +39,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.Suite;
/**
- * Calcite tests.
+ * Calcite execution tests.
*/
@RunWith(Suite.class)
@Suite.SuiteClasses({
@@ -59,6 +60,7 @@ import org.junit.runners.Suite;
LimitExecutionTest.class,
TimeCalculationExecutionTest.class,
UncollectExecutionTest.class,
+ ScanTableRowExecutionTest.class,
})
public class ExecutionTestSuite {
}