This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 8907ef8ea71 IGNITE-27431 SQL Calcite: Optimize scans with filter - 
Fixes #12600.
8907ef8ea71 is described below

commit 8907ef8ea7100aaf65ada9c7e38d92c8e302adca
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Fri Jan 23 09:49:43 2026 +0300

    IGNITE-27431 SQL Calcite: Optimize scans with filter - Fixes #12600.
    
    Signed-off-by: Aleksey Plekhanov <[email protected]>
---
 .../calcite/exec/AbstractCacheColumnsScan.java     |  32 +++++-
 .../query/calcite/exec/AbstractCacheScan.java      |   4 +-
 .../processors/query/calcite/exec/IndexScan.java   |  64 +++++++-----
 .../query/calcite/exec/LogicalRelImplementor.java  |  75 ++++++++++---
 .../query/calcite/exec/SystemViewScan.java         |  12 ++-
 .../query/calcite/exec/TableRowIterable.java       |  43 ++++++++
 .../processors/query/calcite/exec/TableScan.java   |  29 ++++--
 .../query/calcite/exec/rel/ScanNode.java           |  36 +++++--
 .../query/calcite/exec/rel/ScanTableRowNode.java   | 116 +++++++++++++++++++++
 .../rel/ProjectableFilterableTableScan.java        |  24 +++++
 .../calcite/schema/CacheTableDescriptorImpl.java   |  37 +++----
 .../schema/SystemViewTableDescriptorImpl.java      |  31 +++---
 .../query/calcite/schema/TableDescriptor.java      |  15 ++-
 .../calcite/exec/LogicalRelImplementorTest.java    |  82 +++++++++++++++
 .../exec/rel/ScanTableRowExecutionTest.java        | 104 ++++++++++++++++++
 .../query/calcite/planner/TestTable.java           |   9 +-
 .../ignite/testsuites/ExecutionTestSuite.java      |   4 +-
 17 files changed, 601 insertions(+), 116 deletions(-)

diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheColumnsScan.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheColumnsScan.java
index c5966bf9f54..95a0f5aa47b 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheColumnsScan.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheColumnsScan.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.query.calcite.exec;
 
+import java.util.Iterator;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.util.ImmutableBitSet;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
@@ -24,7 +25,8 @@ import 
org.apache.ignite.internal.processors.query.calcite.schema.CacheTableDesc
 import org.jetbrains.annotations.Nullable;
 
 /** */
-public abstract class AbstractCacheColumnsScan<Row> extends 
AbstractCacheScan<Row> {
+public abstract class AbstractCacheColumnsScan<TableRow, Row> extends 
AbstractCacheScan<Row>
+    implements TableRowIterable<TableRow, Row> {
     /** */
     protected final CacheTableDescriptor desc;
 
@@ -34,8 +36,8 @@ public abstract class AbstractCacheColumnsScan<Row> extends 
AbstractCacheScan<Ro
     /** */
     protected final RelDataType rowType;
 
-    /** Participating columns. */
-    protected final ImmutableBitSet requiredColumns;
+    /** Output node row fields to input table row columns mapping. */
+    protected final int[] fieldColMapping;
 
     /** */
     AbstractCacheColumnsScan(
@@ -47,9 +49,31 @@ public abstract class AbstractCacheColumnsScan<Row> extends 
AbstractCacheScan<Ro
         super(ectx, desc.cacheContext(), parts);
 
         this.desc = desc;
-        this.requiredColumns = requiredColumns;
 
         rowType = desc.rowType(ectx.getTypeFactory(), requiredColumns);
         factory = ectx.rowHandler().factory(ectx.getTypeFactory(), rowType);
+
+        ImmutableBitSet reqCols = requiredColumns == null
+            ? ImmutableBitSet.range(0, rowType.getFieldCount())
+            : requiredColumns;
+
+        fieldColMapping = reqCols.toArray();
     }
+
+    /** {@inheritDoc} */
+    @Override public final Iterator<TableRow> tableRowIterator() {
+        reserve();
+
+        try {
+            return createTableRowIterator();
+        }
+        catch (Exception e) {
+            release();
+
+            throw e;
+        }
+    }
+
+    /** Table row iterator.*/
+    protected abstract Iterator<TableRow> createTableRowIterator();
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheScan.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheScan.java
index bc48d3fc5d4..e2327a2b4fc 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheScan.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheScan.java
@@ -110,7 +110,7 @@ public abstract class AbstractCacheScan<Row> implements 
Iterable<Row>, AutoClose
     }
 
     /** */
-    private synchronized void reserve() {
+    protected synchronized void reserve() {
         if (reservation != null)
             return;
 
@@ -161,7 +161,7 @@ public abstract class AbstractCacheScan<Row> implements 
Iterable<Row>, AutoClose
     }
 
     /** */
-    private synchronized void release() {
+    protected synchronized void release() {
         if (reservation != null)
             reservation.release();
 
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
index 32c051666a1..38034bc93ca 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
@@ -56,7 +56,7 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Scan on index.
  */
-public class IndexScan<Row> extends AbstractCacheColumnsScan<Row> {
+public class IndexScan<Row> extends AbstractCacheColumnsScan<IndexRow, Row> {
     /** */
     private final GridKernalContext kctx;
 
@@ -109,7 +109,7 @@ public class IndexScan<Row> extends 
AbstractCacheColumnsScan<Row> {
         for (int i = 0; i < srcRowType.getFieldCount(); i++)
             fieldsStoreTypes[i] = 
typeFactory.getResultClass(srcRowType.getFieldList().get(i).getType());
 
-        fieldIdxMapping = 
fieldToInlinedKeysMapping(srcRowType.getFieldCount());
+        fieldIdxMapping = fieldToInlinedKeysMapping();
 
         if (!F.isEmpty(ectx.getQryTxEntries())) {
             InlineIndexRowHandler rowHnd = idx.segment(0).rowHandler();
@@ -131,7 +131,7 @@ public class IndexScan<Row> extends 
AbstractCacheColumnsScan<Row> {
      * @return Mapping from target row fields to inlined index keys, or {@code 
null} if inlined index keys
      * should not be used.
      */
-    private int[] fieldToInlinedKeysMapping(int srcFieldsCnt) {
+    private int[] fieldToInlinedKeysMapping() {
         List<InlineIndexKeyType> inlinedKeys = 
idx.segment(0).rowHandler().inlineIndexKeyTypes();
 
         // Since inline scan doesn't check expire time, allow it only if 
expired entries are eagerly removed.
@@ -141,7 +141,7 @@ public class IndexScan<Row> extends 
AbstractCacheColumnsScan<Row> {
         // Even if we need some subset of inlined keys we are required to the 
read full inlined row, since this row
         // is also participated in comparison with other rows when cursor 
processing the next index page.
         if (inlinedKeys.size() < 
idx.segment(0).rowHandler().indexKeyDefinitions().size() ||
-            inlinedKeys.size() < (requiredColumns == null ? srcFieldsCnt : 
requiredColumns.cardinality()))
+            inlinedKeys.size() < fieldColMapping.length)
             return null;
 
         for (InlineIndexKeyType keyType : inlinedKeys) {
@@ -153,14 +153,11 @@ public class IndexScan<Row> extends 
AbstractCacheColumnsScan<Row> {
                 return null;
         }
 
-        ImmutableBitSet reqCols = requiredColumns == null ? 
ImmutableBitSet.range(0, srcFieldsCnt) :
-            requiredColumns;
-
         int[] fieldIdxMapping = new int[rowType.getFieldCount()];
 
-        for (int i = 0, j = reqCols.nextSetBit(0); j != -1; j = 
reqCols.nextSetBit(j + 1), i++) {
-            // j = source field index, i = target field index.
-            int keyIdx = idxFieldMapping.indexOf(j);
+        for (int i = 0; i < fieldColMapping.length; i++) {
+            // i = output row field index, fieldColMapping[i] = table row 
column index.
+            int keyIdx = idxFieldMapping.indexOf(fieldColMapping[i]);
 
             if (keyIdx >= 0 && keyIdx < inlinedKeys.size())
                 fieldIdxMapping[i] = keyIdx;
@@ -173,6 +170,11 @@ public class IndexScan<Row> extends 
AbstractCacheColumnsScan<Row> {
 
     /** {@inheritDoc} */
     @Override protected Iterator<Row> createIterator() {
+        return F.iterator(createTableRowIterator(), this::indexRow2Row, true);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Iterator<IndexRow> createTableRowIterator() {
         RangeIterable<IndexRow> ranges0 = ranges == null ? null : new 
TransformRangeIterable<>(ranges, this::row2indexRow);
 
         TreeIndex<IndexRow> treeIdx = treeIndex();
@@ -180,7 +182,20 @@ public class IndexScan<Row> extends 
AbstractCacheColumnsScan<Row> {
         if (!txChanges.changedKeysEmpty())
             treeIdx = new TxAwareTreeIndexWrapper(treeIdx);
 
-        return F.iterator(new TreeIndexIterable<>(treeIdx, ranges0), 
this::indexRow2Row, true);
+        return new TreeIndexIterable<>(treeIdx, ranges0).iterator();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Row enrichRow(IndexRow idxRow, Row row, int[] 
fieldColMapping) {
+        try {
+            if (idxRow.indexPlainRow())
+                return inlineIndexRow2Row(idxRow, row, fieldColMapping);
+            else
+                return desc.toRow(ectx, idxRow.cacheDataRow(), row, 
fieldColMapping);
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
     }
 
     /** */
@@ -221,28 +236,23 @@ public class IndexScan<Row> extends 
AbstractCacheColumnsScan<Row> {
     }
 
     /** From IndexRow to Row convertor. */
-    protected Row indexRow2Row(IndexRow row) {
-        try {
-            if (row.indexPlainRow())
-                return inlineIndexRow2Row(row);
-            else
-                return desc.toRow(ectx, row.cacheDataRow(), factory, 
requiredColumns);
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteException(e);
-        }
+    protected Row indexRow2Row(IndexRow idxRow) {
+        Row row = factory.create();
+
+        return enrichRow(idxRow, row, fieldColMapping);
     }
 
     /** */
-    private Row inlineIndexRow2Row(IndexRow row) {
+    private Row inlineIndexRow2Row(IndexRow idxRow, Row row, int[] 
fieldColMapping) {
         RowHandler<Row> hnd = ectx.rowHandler();
 
-        Row res = factory.create();
-
-        for (int i = 0; i < fieldIdxMapping.length; i++)
-            hnd.set(i, res, TypeUtils.toInternal(ectx, 
row.key(fieldIdxMapping[i]).key()));
+        for (int i = 0; i < fieldColMapping.length; i++) {
+            // Skip not required fields.
+            if (fieldColMapping[i] >= 0)
+                hnd.set(i, row, TypeUtils.toInternal(ectx, 
idxRow.key(fieldIdxMapping[i]).key()));
+        }
 
-        return res;
+        return row;
     }
 
     /** Query context. */
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
index e1687bcbdc6..d816ed65510 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
@@ -64,6 +64,7 @@ import 
org.apache.ignite.internal.processors.query.calcite.exec.rel.Outbox;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.rel.ProjectNode;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.ScanNode;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.rel.ScanStorageNode;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.rel.ScanTableRowNode;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.rel.SortAggregateNode;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.SortNode;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.rel.TableSpoolNode;
@@ -116,6 +117,7 @@ import 
org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactor
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.processors.query.calcite.util.RexUtils;
 import org.apache.ignite.internal.util.typedef.F;
+import org.jetbrains.annotations.Nullable;
 
 import static org.apache.calcite.rel.RelDistribution.Type.HASH_DISTRIBUTED;
 import static 
org.apache.ignite.internal.processors.query.calcite.util.TypeUtils.combinedRowType;
@@ -368,10 +370,10 @@ public class LogicalRelImplementor<Row> implements 
IgniteRelVisitor<Node<Row>> {
         ImmutableBitSet requiredColumns = rel.requiredColumns();
         List<SearchBounds> searchBounds = rel.searchBounds();
 
-        RelDataType rowType = rel.getDataSourceRowType();
+        RelDataType inputRowType = rel.getDataSourceRowType();
 
-        Predicate<Row> filters = condition == null ? null : 
expressionFactory.predicate(condition, rowType);
-        Function<Row, Row> prj = projects == null ? null : 
expressionFactory.project(projects, rowType);
+        Predicate<Row> filters = condition == null ? null : 
expressionFactory.predicate(condition, inputRowType);
+        Function<Row, Row> prj = projects == null ? null : 
expressionFactory.project(projects, inputRowType);
         RangeIterable<Row> ranges = searchBounds == null ? null :
             expressionFactory.ranges(searchBounds, rel.collation(), 
tbl.getRowType(typeFactory));
 
@@ -382,7 +384,8 @@ public class LogicalRelImplementor<Row> implements 
IgniteRelVisitor<Node<Row>> {
         if (idx != null && !tbl.isIndexRebuildInProgress()) {
             Iterable<Row> rowsIter = idx.scan(ctx, grp, ranges, 
requiredColumns);
 
-            return new ScanStorageNode<>(tbl.name() + '.' + idx.name(), ctx, 
rowType, rowsIter, filters, prj);
+            return createStorageScan(tbl.name() + '.' + idx.name(), 
rel.getRowType(), inputRowType,
+                rowsIter, filters, prj, requiredColumns, 
rel.conditionColumns());
         }
         else {
             // Index was invalidated after planning, workaround through 
table-scan -> sort -> index spool.
@@ -402,12 +405,10 @@ public class LogicalRelImplementor<Row> implements 
IgniteRelVisitor<Node<Row>> {
                 requiredColumns
             );
 
-            // If there are projects in the scan node - after the scan we 
already have target row type.
-            if (!spoolNodeRequired && projects != null)
-                rowType = rel.getRowType();
+            RelDataType rowType = projNodeRequired ? rel.getRowType() : 
inputRowType;
 
-            Node<Row> node = new ScanStorageNode<>(tbl.name(), ctx, rowType, 
rowsIter, filterHasCorrelation ? null : filters,
-                projNodeRequired ? null : prj);
+            Node<Row> node = createStorageScan(tbl.name(), rowType, 
inputRowType, rowsIter,
+                filterHasCorrelation ? null : filters, projNodeRequired ? null 
: prj, requiredColumns, rel.conditionColumns());
 
             RelCollation collation = rel.collation();
 
@@ -438,7 +439,7 @@ public class LogicalRelImplementor<Row> implements 
IgniteRelVisitor<Node<Row>> {
                         remappedSearchBounds.add(searchBounds.get(i));
 
                     // Collation and row type are already remapped taking into 
account requiredColumns.
-                    ranges = expressionFactory.ranges(remappedSearchBounds, 
collation, rowType);
+                    ranges = expressionFactory.ranges(remappedSearchBounds, 
collation, inputRowType);
                 }
 
                 IndexSpoolNode<Row> spoolNode = IndexSpoolNode.createTreeSpool(
@@ -547,10 +548,10 @@ public class LogicalRelImplementor<Row> implements 
IgniteRelVisitor<Node<Row>> {
 
         IgniteTable tbl = rel.getTable().unwrap(IgniteTable.class);
 
-        RelDataType rowType = rel.getDataSourceRowType();
+        RelDataType inputRowType = rel.getDataSourceRowType();
 
-        Predicate<Row> filters = condition == null ? null : 
expressionFactory.predicate(condition, rowType);
-        Function<Row, Row> prj = projects == null ? null : 
expressionFactory.project(projects, rowType);
+        Predicate<Row> filters = condition == null ? null : 
expressionFactory.predicate(condition, inputRowType);
+        Function<Row, Row> prj = projects == null ? null : 
expressionFactory.project(projects, inputRowType);
 
         ColocationGroup grp = ctx.group(rel.sourceId());
 
@@ -559,12 +560,14 @@ public class LogicalRelImplementor<Row> implements 
IgniteRelVisitor<Node<Row>> {
         if (idx != null && !tbl.isIndexRebuildInProgress()) {
             Iterable<Row> rowsIter = idx.scan(ctx, grp, null, requiredColumns);
 
-            return new ScanStorageNode<>(tbl.name() + '.' + idx.name(), ctx, 
rowType, rowsIter, filters, prj);
+            return createStorageScan(tbl.name() + '.' + idx.name(), 
rel.getRowType(), inputRowType,
+                rowsIter, filters, prj, requiredColumns, 
rel.conditionColumns());
         }
         else {
             Iterable<Row> rowsIter = tbl.scan(ctx, grp, requiredColumns);
 
-            return new ScanStorageNode<>(tbl.name(), ctx, rowType, rowsIter, 
filters, prj);
+            return createStorageScan(tbl.name(), rel.getRowType(), 
inputRowType, rowsIter, filters, prj,
+                requiredColumns, rel.conditionColumns());
         }
     }
 
@@ -942,4 +945,46 @@ public class LogicalRelImplementor<Row> implements 
IgniteRelVisitor<Node<Row>> {
     public <T extends Node<Row>> T go(IgniteRel rel) {
         return (T)visit(rel);
     }
+
+    /** */
+    private ScanStorageNode<Row> createStorageScan(
+        String storageName,
+        RelDataType outputRowType,
+        RelDataType inputRowType,
+        Iterable<Row> rowsIter,
+        @Nullable Predicate<Row> filter,
+        @Nullable Function<Row, Row> rowTransformer,
+        @Nullable ImmutableBitSet requiredColumns,
+        @Nullable ImmutableBitSet filterColumns
+    ) {
+        int fieldsCnt = inputRowType.getFieldCount();
+
+        if (filter == null || filterColumns == null || 
filterColumns.cardinality() == fieldsCnt
+            || !(rowsIter instanceof TableRowIterable))
+            return new ScanStorageNode<>(storageName, ctx, outputRowType, 
rowsIter, filter, rowTransformer);
+
+        ImmutableBitSet reqCols = requiredColumns == null ? 
ImmutableBitSet.range(0, fieldsCnt) : requiredColumns;
+
+        int[] filterColMapping = reqCols.toArray();
+        int[] otherColMapping = filterColMapping.clone();
+
+        for (int i = 0; i < filterColMapping.length; i++) {
+            if (filterColumns.get(i))
+                otherColMapping[i] = -1;
+            else
+                filterColMapping[i] = -1;
+        }
+
+        return new ScanTableRowNode<>(
+            storageName,
+            ctx,
+            outputRowType,
+            inputRowType,
+            (TableRowIterable<Object, Row>)rowsIter,
+            filter,
+            rowTransformer,
+            filterColMapping,
+            otherColMapping
+        );
+    }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/SystemViewScan.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/SystemViewScan.java
index 76f71f50837..897b908b376 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/SystemViewScan.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/SystemViewScan.java
@@ -47,8 +47,8 @@ public class SystemViewScan<Row, ViewRow> implements 
Iterable<Row> {
     /** */
     private final RangeIterable<Row> ranges;
 
-    /** Participating colunms. */
-    private final ImmutableBitSet requiredColumns;
+    /** Row field to view column mapping. */
+    protected final int[] fieldColMapping;
 
     /** System view field names (for filtering). */
     private final String[] filterableFieldNames;
@@ -66,7 +66,6 @@ public class SystemViewScan<Row, ViewRow> implements 
Iterable<Row> {
         this.ectx = ectx;
         this.desc = desc;
         this.ranges = ranges;
-        this.requiredColumns = requiredColumns;
 
         RelDataType rowType = desc.rowType(ectx.getTypeFactory(), 
requiredColumns);
 
@@ -83,6 +82,11 @@ public class SystemViewScan<Row, ViewRow> implements 
Iterable<Row> {
                 }
             }
         }
+
+        ImmutableBitSet reqCols = requiredColumns == null ? 
ImmutableBitSet.range(0, rowType.getFieldCount())
+            : requiredColumns;
+
+        fieldColMapping = reqCols.toArray();
     }
 
     /** {@inheritDoc} */
@@ -123,6 +127,6 @@ public class SystemViewScan<Row, ViewRow> implements 
Iterable<Row> {
         else
             viewIter = view.iterator();
 
-        return F.iterator(viewIter, row -> desc.toRow(ectx, row, factory, 
requiredColumns), true);
+        return F.iterator(viewIter, row -> desc.toRow(ectx, row, 
factory.create(), fieldColMapping), true);
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableRowIterable.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableRowIterable.java
new file mode 100644
index 00000000000..3523419c204
--- /dev/null
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableRowIterable.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.util.Iterator;
+
+/**
+ * Interface to iterate over raw table data and form relational node rows from 
table row columns.
+ *
+ * @param <TableRow> Raw table row type.
+ * @param <Row> Relational node row type.
+ */
+public interface TableRowIterable<TableRow, Row> extends Iterable<Row> {
+    /**
+     * @return Table row iterator.
+     */
+    public Iterator<TableRow> tableRowIterator();
+
+    /**
+     * Enriches {@code nodeRow} with columns from {@code tableRow}.
+     *
+     * @param tableRow Table row.
+     * @param nodeRow Relational node row.
+     * @param fieldColMapping Mapping from node row fields to table row 
columns. If column is not requried
+     *        corresponding field has -1 mapped value.
+     * @return Enriched relational node row.
+     */
+    public Row enrichRow(TableRow tableRow, Row nodeRow, int[] 
fieldColMapping);
+}
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java
index 46c7cde4e67..f43c7de4e5a 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java
@@ -27,6 +27,7 @@ import java.util.Queue;
 import java.util.function.Function;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
@@ -40,7 +41,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
 
 /** */
-public class TableScan<Row> extends AbstractCacheColumnsScan<Row> {
+public class TableScan<Row> extends AbstractCacheColumnsScan<CacheDataRow, 
Row> {
     /** */
     protected volatile List<GridDhtLocalPartition> reservedParts;
 
@@ -56,9 +57,25 @@ public class TableScan<Row> extends 
AbstractCacheColumnsScan<Row> {
 
     /** {@inheritDoc} */
     @Override protected Iterator<Row> createIterator() {
+        return F.iterator((Iterator<CacheDataRow>)new IteratorImpl(),
+            row -> enrichRow(row, factory.create(), fieldColMapping), true);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Iterator<CacheDataRow> createTableRowIterator() {
         return new IteratorImpl();
     }
 
+    /** */
+    @Override public Row enrichRow(CacheDataRow cacheDataRow, Row row, int[] 
fieldColMapping) {
+        try {
+            return desc.toRow(ectx, cacheDataRow, row, fieldColMapping);
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
     /** {@inheritDoc} */
     @Override protected void processReservedTopology(GridDhtPartitionTopology 
top) {
         List<GridDhtLocalPartition> reservedParts = new 
ArrayList<>(parts.cardinality());
@@ -72,7 +89,7 @@ public class TableScan<Row> extends 
AbstractCacheColumnsScan<Row> {
     /**
      * Table scan iterator.
      */
-    private class IteratorImpl extends GridIteratorAdapter<Row> {
+    private class IteratorImpl extends GridIteratorAdapter<CacheDataRow> {
         /** */
         private final Queue<GridDhtLocalPartition> parts;
 
@@ -86,7 +103,7 @@ public class TableScan<Row> extends 
AbstractCacheColumnsScan<Row> {
         private Iterator<CacheDataRow> txIter = Collections.emptyIterator();
 
         /** */
-        private Row next;
+        private CacheDataRow next;
 
         /** */
         private IteratorImpl() {
@@ -114,13 +131,13 @@ public class TableScan<Row> extends 
AbstractCacheColumnsScan<Row> {
         }
 
         /** {@inheritDoc} */
-        @Override public Row nextX() throws IgniteCheckedException {
+        @Override public CacheDataRow nextX() throws IgniteCheckedException {
             advance();
 
             if (next == null)
                 throw new NoSuchElementException();
 
-            Row next = this.next;
+            CacheDataRow next = this.next;
 
             this.next = null;
 
@@ -170,7 +187,7 @@ public class TableScan<Row> extends 
AbstractCacheColumnsScan<Row> {
                     if (!desc.match(row))
                         continue;
 
-                    next = desc.toRow(ectx, row, factory, requiredColumns);
+                    next = row;
 
                     break;
                 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
index ca6469b09e6..80201b26466 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
@@ -34,13 +34,13 @@ public class ScanNode<Row> extends AbstractNode<Row> 
implements SingleNode<Row>
     private final Iterable<Row> src;
 
     /** */
-    @Nullable private final Predicate<Row> filter;
+    @Nullable protected final Predicate<Row> filter;
 
     /** */
-    @Nullable private final Function<Row, Row> rowTransformer;
+    @Nullable protected final Function<Row, Row> rowTransformer;
 
     /** */
-    private Iterator<Row> it;
+    protected Iterator<?> it;
 
     /** */
     private int requested;
@@ -146,25 +146,43 @@ public class ScanNode<Row> extends AbstractNode<Row> 
implements SingleNode<Row>
         }
     }
 
+    /**
+     * @return Next row, or {@code null} if row was filtered out.
+     */
+    protected @Nullable Row processNextRow() {
+        Row r = (Row)it.next();
+
+        if (filter == null || filter.test(r)) {
+            if (rowTransformer != null)
+                r = rowTransformer.apply(r);
+
+            return r;
+        }
+
+        return null;
+    }
+
+    /** */
+    protected Iterator<?> sourceIterator() {
+        return src.iterator();
+    }
+
     /**
      * @return Count of processed rows.
      */
     protected int processNextBatch() throws Exception {
         if (it == null)
-            it = src.iterator();
+            it = sourceIterator();
 
         int processed = 0;
         while (requested > 0 && it.hasNext()) {
             checkState();
 
-            Row r = it.next();
+            Row r = processNextRow();
 
-            if (filter == null || filter.test(r)) {
+            if (r != null) {
                 requested--;
 
-                if (rowTransformer != null)
-                    r = rowTransformer.apply(r);
-
                 downstream().push(r);
             }
 
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanTableRowNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanTableRowNode.java
new file mode 100644
index 00000000000..0405292f32f
--- /dev/null
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanTableRowNode.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.Iterator;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import org.apache.calcite.rel.type.RelDataType;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.TableRowIterable;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Scan table rows node. With ability to request part of table row columns for 
filtering
+ * ({@code ScanStorageNode} can only request full set of requred table row 
columns).
+ */
+public class ScanTableRowNode<TableRow, Row> extends ScanStorageNode<Row> {
+    /** */
+    private final TableRowIterable<TableRow, Row> src;
+
+    /** */
+    private final int[] filterColMapping;
+
+    /** */
+    private final int[] otherColMapping;
+
+    /** */
+    private final RowHandler.RowFactory<Row> factory;
+
+    /** */
+    private Row curRow;
+
+    /**
+     * @param storageName Storage (index or table) name.
+     * @param ctx Execution context.
+     * @param outputRowType Output row type.
+     * @param inputRowType Input row type.
+     * @param src Source.
+     * @param filter Row filter.
+     * @param rowTransformer Row transformer (projection).
+     * @param filterColMapping Fields to columns mapping for fields used in 
filter.
+     * @param otherColMapping Fields to columns mapping for other fields.
+     */
+    @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
+    public ScanTableRowNode(
+        String storageName,
+        ExecutionContext<Row> ctx,
+        RelDataType outputRowType,
+        RelDataType inputRowType,
+        TableRowIterable<TableRow, Row> src,
+        Predicate<Row> filter,
+        @Nullable Function<Row, Row> rowTransformer,
+        int[] filterColMapping,
+        int[] otherColMapping
+    ) {
+        super(storageName, ctx, outputRowType, src, filter, rowTransformer);
+
+        assert filter != null;
+
+        factory = ctx.rowHandler().factory(ctx.getTypeFactory(), inputRowType);
+
+        this.src = src;
+        this.filterColMapping = filterColMapping;
+        this.otherColMapping = otherColMapping;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Iterator<?> sourceIterator() {
+        return src.tableRowIterator();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected @Nullable Row processNextRow() {
+        Row row = curRow == null ? factory.create() : curRow;
+
+        TableRow tableRow = (TableRow)it.next();
+
+        src.enrichRow(tableRow, row, filterColMapping);
+
+        if (filter.test(row)) {
+            src.enrichRow(tableRow, row, otherColMapping);
+
+            if (rowTransformer != null)
+                row = rowTransformer.apply(row);
+
+            curRow = null;
+
+            return row;
+        }
+
+        return null;
+    }
+
+    /** */
+    @Override public void closeInternal() {
+        super.closeInternal();
+
+        curRow = null;
+    }
+}
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/ProjectableFilterableTableScan.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/ProjectableFilterableTableScan.java
index 208e8b70e86..9c41fdd9e3d 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/ProjectableFilterableTableScan.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/ProjectableFilterableTableScan.java
@@ -63,6 +63,9 @@ public abstract class ProjectableFilterableTableScan extends 
TableScan {
     /** Participating columns. */
     protected final ImmutableBitSet requiredColumns;
 
+    /** Columns used by condition. */
+    protected ImmutableBitSet conditionColumns;
+
     /** Required columns from table row type (No need to be serialized, for 
caching only). */
     protected RelDataType dataSourceRowType;
 
@@ -208,4 +211,25 @@ public abstract class ProjectableFilterableTableScan 
extends TableScan {
 
         return new RelColumnOrigin(getTable(), originColIdx, false);
     }
+
+    /** */
+    public @Nullable ImmutableBitSet conditionColumns() {
+        if (condition == null)
+            return null;
+
+        if (conditionColumns == null) {
+            ImmutableBitSet.Builder builder = ImmutableBitSet.builder();
+
+            new RexShuttle() {
+                @Override public RexNode visitLocalRef(RexLocalRef inputRef) {
+                    builder.set(inputRef.getIndex());
+                    return inputRef;
+                }
+            }.apply(condition);
+
+            conditionColumns = builder.build();
+        }
+
+        return conditionColumns;
+    }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableDescriptorImpl.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableDescriptorImpl.java
index 22f6c2dcb8f..2cd164bc815 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableDescriptorImpl.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableDescriptorImpl.java
@@ -70,7 +70,6 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
 
 import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonList;
@@ -251,36 +250,28 @@ public class CacheTableDescriptorImpl extends 
NullInitializerExpressionFactory
     /** {@inheritDoc} */
     @Override public <Row> Row toRow(
         ExecutionContext<Row> ectx,
-        CacheDataRow row,
-        RowHandler.RowFactory<Row> factory,
-        @Nullable ImmutableBitSet requiredColumns
+        CacheDataRow tableRow,
+        Row row,
+        int[] fieldColMapping
     ) throws IgniteCheckedException {
-        RowHandler<Row> hnd = factory.handler();
+        RowHandler<Row> hnd = ectx.rowHandler();
 
-        assert hnd == ectx.rowHandler();
+        assert hnd.columnCount(row) == fieldColMapping.length :
+            "Unexpected row column count: " + hnd.columnCount(row) + " 
expected: " + fieldColMapping.length;
 
-        Row res = factory.create();
+        for (int i = 0; i < fieldColMapping.length; i++) {
+            int colIdx = fieldColMapping[i];
 
-        assert hnd.columnCount(res) == (requiredColumns == null ? 
descriptors.length : requiredColumns.cardinality());
+            // Skip not required fields.
+            if (colIdx < 0)
+                continue;
 
-        if (requiredColumns == null) {
-            for (int i = 0; i < descriptors.length; i++) {
-                CacheColumnDescriptor desc = descriptors[i];
+            CacheColumnDescriptor desc = descriptors[colIdx];
 
-                hnd.set(i, res, TypeUtils.toInternal(ectx,
-                    desc.value(ectx, cacheContext(), row), 
desc.storageType()));
-            }
-        }
-        else {
-            for (int i = 0, j = requiredColumns.nextSetBit(0); j != -1; j = 
requiredColumns.nextSetBit(j + 1), i++) {
-                CacheColumnDescriptor desc = descriptors[j];
-
-                hnd.set(i, res, TypeUtils.toInternal(ectx,
-                    desc.value(ectx, cacheContext(), row), 
desc.storageType()));
-            }
+            hnd.set(i, row, TypeUtils.toInternal(ectx, desc.value(ectx, 
cacheContext(), tableRow), desc.storageType()));
         }
 
-        return res;
+        return row;
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewTableDescriptorImpl.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewTableDescriptorImpl.java
index 80d7f95b337..ab89ce303fb 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewTableDescriptorImpl.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewTableDescriptorImpl.java
@@ -40,7 +40,6 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.spi.systemview.view.SystemView;
 import org.apache.ignite.spi.systemview.view.SystemViewRowAttributeWalker;
-import org.jetbrains.annotations.Nullable;
 
 import static org.apache.calcite.rel.type.RelDataType.PRECISION_NOT_SPECIFIED;
 import static org.apache.calcite.rel.type.RelDataType.SCALE_NOT_SPECIFIED;
@@ -107,24 +106,28 @@ public class SystemViewTableDescriptorImpl<ViewRow> 
extends NullInitializerExpre
     /** {@inheritDoc} */
     @Override public <Row> Row toRow(
         ExecutionContext<Row> ectx,
-        ViewRow row,
-        RowHandler.RowFactory<Row> factory,
-        @Nullable ImmutableBitSet requiredColumns
+        ViewRow viewRow,
+        Row row,
+        int[] fieldColMapping
     ) {
-        RowHandler<Row> hnd = factory.handler();
+        RowHandler<Row> hnd = ectx.rowHandler();
 
-        assert hnd == ectx.rowHandler();
+        assert hnd.columnCount(row) == fieldColMapping.length :
+            "Unexpected row column count: " + hnd.columnCount(row) + " 
expected: " + fieldColMapping.length;
 
-        Row res = factory.create();
-
-        assert hnd.columnCount(res) == (requiredColumns == null ? 
descriptors.length : requiredColumns.cardinality());
-
-        sysView.walker().visitAll(row, new 
SystemViewRowAttributeWalker.AttributeWithValueVisitor() {
+        sysView.walker().visitAll(viewRow, new 
SystemViewRowAttributeWalker.AttributeWithValueVisitor() {
             private int colIdx;
 
             @Override public <T> void accept(int idx, String name, Class<T> 
clazz, T val) {
-                if (requiredColumns == null || requiredColumns.get(idx))
-                    hnd.set(colIdx++, res, TypeUtils.toInternal(ectx, val, 
descriptors[idx].storageType()));
+                // Assume fieldColMapping derived from required columns and 
sorted.
+                // Assume 'accept' is called with increasing 'idx'.
+
+                // Skip not required fields.
+                while (colIdx < fieldColMapping.length && 
fieldColMapping[colIdx] < 0)
+                    colIdx++;
+
+                if (colIdx < fieldColMapping.length && fieldColMapping[colIdx] 
== idx)
+                    hnd.set(colIdx++, row, TypeUtils.toInternal(ectx, val, 
descriptors[idx].storageType()));
             }
 
             @Override public void acceptBoolean(int idx, String name, boolean 
val) {
@@ -160,7 +163,7 @@ public class SystemViewTableDescriptorImpl<ViewRow> extends 
NullInitializerExpre
             }
         });
 
-        return res;
+        return row;
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptor.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptor.java
index af3b72a382b..4c2b82f548a 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptor.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptor.java
@@ -26,12 +26,10 @@ import 
org.apache.calcite.sql2rel.InitializerExpressionFactory;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.ignite.IgniteCheckedException;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
-import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
 import 
org.apache.ignite.internal.processors.query.calcite.prepare.MappingQueryContext;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
 import 
org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
-import org.jetbrains.annotations.Nullable;
 
 /**
  *
@@ -94,19 +92,20 @@ public interface TableDescriptor<TableRow> extends 
RelProtoDataType, Initializer
     boolean isUpdateAllowed(RelOptTable tbl, int colIdx);
 
     /**
-     * Converts a cache row to relational node row.
+     * Converts a table row to relational node row.
      *
      * @param ectx Execution context.
-     * @param row Cache row.
-     * @param requiredColumns Participating columns.
+     * @param tableRow Table row.
+     * @param row Relational node row.
+     * @param fieldColMapping Mapping from row fields to table columns.
      * @return Relational node row.
      * @throws IgniteCheckedException If failed.
      */
     <Row> Row toRow(
         ExecutionContext<Row> ectx,
-        TableRow row,
-        RowHandler.RowFactory<Row> factory,
-        @Nullable ImmutableBitSet requiredColumns
+        TableRow tableRow,
+        Row row,
+        int[] fieldColMapping
     ) throws IgniteCheckedException;
 
     /**
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementorTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementorTest.java
index 1afaf700552..6e4419be92c 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementorTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementorTest.java
@@ -19,6 +19,7 @@ package 
org.apache.ignite.internal.processors.query.calcite.exec;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.UUID;
 import java.util.function.Predicate;
@@ -37,11 +38,13 @@ import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.mapping.Mappings;
 import org.apache.ignite.internal.processors.query.QueryUtils;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.exp.RangeIterable;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.rel.CollectNode;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.rel.IndexSpoolNode;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.Node;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.rel.ProjectNode;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.ScanNode;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.rel.ScanTableRowNode;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.SortNode;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpIoTracker;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpMemoryTracker;
@@ -51,6 +54,7 @@ import 
org.apache.ignite.internal.processors.query.calcite.prepare.BaseQueryCont
 import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexBound;
 import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexCount;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
+import 
org.apache.ignite.internal.processors.query.calcite.schema.CacheIndexImpl;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
 import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
 import 
org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
@@ -59,6 +63,7 @@ import 
org.apache.ignite.internal.processors.query.calcite.util.RexUtils;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
 import org.junit.Test;
 
 import static org.apache.calcite.tools.Frameworks.createRootSchema;
@@ -375,6 +380,53 @@ public class LogicalRelImplementorTest extends 
GridCommonAbstractTest {
         checkNodesChain(relImplementor, scan, isProj, isSpool, isSort, 
isScanNoFilterNoProject);
     }
 
+    /** */
+    @Test
+    public void testScanTableRow() {
+        RelCollation idxCollation = 
TraitUtils.createCollation(Collections.singletonList(2));
+        tbl.addIndex(new ScannableTestIndex(idxCollation, 
QueryUtils.PRIMARY_KEY_INDEX, tbl));
+
+        RelDataType rowType = tbl.getRowType(tf);
+        RelDataType sqlTypeInt = rowType.getFieldList().get(2).getType();
+        RelDataType sqlTypeVarchar = rowType.getFieldList().get(3).getType();
+
+        List<RexNode> project = 
F.asList(rexBuilder.makeLocalRef(sqlTypeVarchar, 1));
+
+        RexNode filterOneField = rexBuilder.makeCall(
+            SqlStdOperatorTable.EQUALS,
+            rexBuilder.makeLocalRef(sqlTypeInt, 0),
+            rexBuilder.makeLiteral(1, sqlTypeInt)
+        );
+
+        RexNode filterTwoFields = rexBuilder.makeCall(
+            SqlStdOperatorTable.EQUALS,
+            rexBuilder.makeCast(sqlTypeVarchar, 
rexBuilder.makeLocalRef(sqlTypeInt, 0)),
+            rexBuilder.makeLocalRef(sqlTypeVarchar, 1)
+        );
+
+        ImmutableBitSet requiredColumns = ImmutableBitSet.of(2, 3);
+
+        IgniteIndexScan scan = new IgniteIndexScan(
+            cluster,
+            cluster.traitSet(),
+            qctx.catalogReader().getTable(F.asList("PUBLIC", "TBL")),
+            QueryUtils.PRIMARY_KEY_INDEX,
+            project,
+            filterOneField,
+            RexUtils.buildSortedSearchBounds(cluster, idxCollation, 
filterOneField, rowType, requiredColumns),
+            requiredColumns,
+            idxCollation
+        );
+
+        // Not all fields participating in filter, it worth to create 
ScanTableRowNode.
+        checkNodesChain(relImplementor, scan, node -> node instanceof 
ScanTableRowNode);
+
+        scan = createScan(scan, idxCollation, project, filterTwoFields, 
requiredColumns);
+
+        // All fields participating in filter, regular ScanStorageNode should 
be created.
+        checkNodesChain(relImplementor, scan, node -> !(node instanceof 
ScanTableRowNode));
+    }
+
     /** */
     private IgniteIndexScan createScan(
         IgniteIndexScan templateScan,
@@ -445,4 +497,34 @@ public class LogicalRelImplementorTest extends 
GridCommonAbstractTest {
             return Collections.emptyList();
         }
     }
+
+    /** */
+    private static class ScannableTestIndex extends CacheIndexImpl {
+        /** */
+        public ScannableTestIndex(RelCollation collation, String name, 
TestTable tbl) {
+            super(collation, name, null, tbl);
+        }
+
+        /** {@inheritDoc} */
+        @Override public <Row> Iterable<Row> scan(
+            ExecutionContext<Row> execCtx,
+            ColocationGroup grp,
+            RangeIterable<Row> ranges,
+            @Nullable ImmutableBitSet requiredColumns
+        ) {
+            return new TableRowIterable<>() {
+                @Override public Iterator<Row> iterator() {
+                    return Collections.emptyIterator();
+                }
+
+                @Override public Iterator<Object> tableRowIterator() {
+                    return Collections.emptyIterator();
+                }
+
+                @Override public Row enrichRow(Object o, Row nodeRow, int[] 
fieldColMapping) {
+                    return null;
+                }
+            };
+        }
+    }
 }
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanTableRowExecutionTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanTableRowExecutionTest.java
new file mode 100644
index 00000000000..f88e46d1a3c
--- /dev/null
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanTableRowExecutionTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+import org.apache.calcite.rel.type.RelDataType;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.TableRowIterable;
+import 
org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.junit.Test;
+
+/**
+ * Execution test for ScanTableRowNode.
+ */
+public class ScanTableRowExecutionTest extends AbstractExecutionTest {
+    /** */
+    @Test
+    public void testScanTableRow() {
+        ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), 
UUID.randomUUID(), 0);
+        IgniteTypeFactory tf = ctx.getTypeFactory();
+        RelDataType rowType = TypeUtils.createRowType(tf, int.class, 
String.class, Boolean.class);
+
+        List<Object[]> data = F.asList(
+            new Object[] {0, "0", true},
+            new Object[] {1, "1", false},
+            new Object[] {2, "2", true},
+            new Object[] {3, "3", false},
+            new Object[] {4, "4", true}
+        );
+
+        TableRowIterable<Object[], Object[]> it = new TableRowIterable<>() {
+            @Override public Iterator<Object[]> tableRowIterator() {
+                return data.iterator();
+            }
+
+            @Override public Object[] enrichRow(Object[] tableRow, Object[] 
nodeRow, int[] fieldColMapping) {
+                for (int i = 0; i < fieldColMapping.length; i++) {
+                    if (fieldColMapping[i] >= 0)
+                        nodeRow[i] = tableRow[fieldColMapping[i]];
+                }
+
+                // Due to test invariant we can't return row enriched with 
FALSE in the field 2.
+                assertNotSame(Boolean.FALSE, nodeRow[2]);
+
+                return nodeRow;
+            }
+
+            @Override public Iterator<Object[]> iterator() {
+                throw new AssertionError("Unexpected call");
+            }
+        };
+
+        ScanTableRowNode<Object[], Object[]> scan = new ScanTableRowNode<>(
+            "test",
+            ctx,
+            rowType, // Output row type.
+            rowType, // Input row type.
+            it, // Iterator.
+            r -> ((int)r[0] & 1) == 0, // Filter.
+            r -> new Object[] {((int)r[0]) * 2, r[1], r[2]}, // Project.
+            new int[] {0, -1, -1}, // Filter columns mapping.
+            new int[] {-1, 1, 2} // Other columns mapping.
+        );
+
+        RootNode<Object[]> root = new RootNode<>(ctx, rowType);
+        root.register(scan);
+
+        List<Object[]> res = new ArrayList<>();
+
+        while (root.hasNext())
+            res.add(root.next());
+
+        List<Object[]> exp = F.asList(
+            new Object[] {0, "0", true},
+            new Object[] {4, "2", true},
+            new Object[] {8, "4", true}
+        );
+
+        assertEquals(exp.size(), res.size());
+
+        for (int i = 0; i < exp.size(); i++)
+            assertEqualsArraysAware(exp.get(i), res.get(i));
+    }
+}
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TestTable.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TestTable.java
index 744c351ddf6..a4e5ab611e8 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TestTable.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TestTable.java
@@ -61,7 +61,6 @@ import 
org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
 import org.apache.ignite.internal.processors.query.QueryUtils;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
-import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
 import 
org.apache.ignite.internal.processors.query.calcite.prepare.MappingQueryContext;
 import 
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalTableScan;
@@ -411,8 +410,12 @@ public class TestTable implements IgniteCacheTable, 
Wrapper {
         }
 
         /** {@inheritDoc} */
-        @Override public <Row> Row toRow(ExecutionContext<Row> ectx, 
CacheDataRow row, RowHandler.RowFactory<Row> factory,
-            @Nullable ImmutableBitSet requiredColumns) {
+        @Override public <Row> Row toRow(
+            ExecutionContext<Row> ectx,
+            CacheDataRow cacheDataRow,
+            Row outputRow,
+            int[] fieldColMapping
+        ) {
             throw new AssertionError();
         }
 
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java
 
b/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java
index bd6f6eb2258..c8fbb9d90b1 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java
@@ -29,6 +29,7 @@ import 
org.apache.ignite.internal.processors.query.calcite.exec.rel.LimitExecuti
 import 
org.apache.ignite.internal.processors.query.calcite.exec.rel.MergeJoinExecutionTest;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.rel.MinusExecutionTest;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.rel.NestedLoopJoinExecutionTest;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.rel.ScanTableRowExecutionTest;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.rel.SortAggregateExecutionTest;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.rel.SortedIndexSpoolExecutionTest;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.rel.TableSpoolExecutionTest;
@@ -38,7 +39,7 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
 
 /**
- * Calcite tests.
+ * Calcite execution tests.
  */
 @RunWith(Suite.class)
 @Suite.SuiteClasses({
@@ -59,6 +60,7 @@ import org.junit.runners.Suite;
     LimitExecutionTest.class,
     TimeCalculationExecutionTest.class,
     UncollectExecutionTest.class,
+    ScanTableRowExecutionTest.class,
 })
 public class ExecutionTestSuite {
 }

Reply via email to