alex-plekhanov commented on code in PR #11438:
URL: https://github.com/apache/ignite/pull/11438#discussion_r1756358365


##########
modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite3.java:
##########
@@ -375,7 +376,9 @@
     CacheEventsCdcTest.class,
     CdcIndexRebuildTest.class,
 
-    DumpCacheConfigTest.class
+    DumpCacheConfigTest.class,
+
+    H2TransactionAwareQueriesEnabledTest.class

Review Comment:
   Let's add a comma at the end of the line.



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java:
##########
@@ -3050,24 +3061,35 @@ public List<FieldsQueryCursor<List<?>>> querySqlFields(
                                 failOnMultipleStmts
                             );
 
+                            QueryContext qryCtx = QueryContext.of(
+                                qry,
+                                cliCtx,
+                                cancel,
+                                qryProps,
+                                userTx == null ? null : userTx.xidVersion()
+                            );
+
                             if (qry instanceof SqlFieldsQueryEx && 
((SqlFieldsQueryEx)qry).isBatched()) {
                                 res = qryEngine.queryBatched(
-                                    QueryContext.of(qry, cliCtx, cancel, 
qryProps),
+                                    qryCtx,
                                     schemaName,
                                     qry.getSql(),
                                     ((SqlFieldsQueryEx)qry).batchedArguments()
                                 );
                             }
                             else {
                                 res = qryEngine.query(
-                                    QueryContext.of(qry, cliCtx, cancel, 
qryProps),
+                                    qryCtx,
                                     schemaName,
                                     qry.getSql(),
                                     qry.getArgs() != null ? qry.getArgs() : 
X.EMPTY_OBJECT_ARRAY
                                 );
                             }
                         }
                         else {
+                            if (userTx != null && txAwareQueriesEnabled)
+                                throw new CacheException("SQL aware queries 
supported only for Calcite query engine");

Review Comment:
   Suggested message: `SQL aware queries are not supported by Indexing query engine`.
   Also, I'm not sure about `CacheException`. Perhaps `IgniteSQLException` with `SqlStateCode.UNSUPPORTED_OPERATION`?



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactoryImpl.java:
##########
@@ -245,11 +246,28 @@ else if (c2 != HIGHEST_VALUE)
     /** */
     @SuppressWarnings("rawtypes")
     private static int compare(Object o1, Object o2, int nullComparison) {
+        if (o1 instanceof BinaryObjectImpl)
+            return compareBinary(o1, o2, nullComparison);
+
         final Comparable c1 = (Comparable)o1;
         final Comparable c2 = (Comparable)o2;
         return RelFieldCollation.compare(c1, c2, nullComparison);
     }
 
+    /** */
+    private static int compareBinary(Object o1, Object o2, int nullComparison) 
{
+        if (o1 == o2) {
+            return 0;
+        }
+        else if (o1 == null) {
+            return nullComparison;
+        }
+        else if (o2 == null) {

Review Comment:
   Redundant braces



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ListCursor.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.util.Comparator;
+import java.util.List;
+import org.apache.ignite.internal.util.lang.GridCursor;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Cursor to navigate through a sorted list with duplicates.
+ */
+public class ListCursor<Row> implements GridCursor<Row> {

Review Comment:
   Something like `SortedListRangeCursor`?



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/FilteredCursor.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.util.lang.GridCursor;
+
+/**
+ * Cursor wrapper that skips all entires that maps to any of {@code skipKeys} 
key.
+ * <b>Note, for the performance reasons content of {@code skipKeys} will be 
changed during iteration.</b>
+ */
+class FilteredCursor<R> implements GridCursor<R> {

Review Comment:
   "Filtered" usually implies some custom filter predicate. Here we have a predefined key predicate, so I think it's worth adding "Key" to the class name. Something like "KeyFilteringCursor".



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/FilteredCursor.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.util.lang.GridCursor;
+
+/**
+ * Cursor wrapper that skips all entires that maps to any of {@code skipKeys} 
key.
+ * <b>Note, for the performance reasons content of {@code skipKeys} will be 
changed during iteration.</b>
+ */
+class FilteredCursor<R> implements GridCursor<R> {
+    /** Sorted cursor. */

Review Comment:
   It can be unsorted as well



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactoryImpl.java:
##########
@@ -245,11 +246,28 @@ else if (c2 != HIGHEST_VALUE)
     /** */
     @SuppressWarnings("rawtypes")
     private static int compare(Object o1, Object o2, int nullComparison) {
+        if (o1 instanceof BinaryObjectImpl)
+            return compareBinary(o1, o2, nullComparison);
+
         final Comparable c1 = (Comparable)o1;
         final Comparable c2 = (Comparable)o2;
         return RelFieldCollation.compare(c1, c2, nullComparison);
     }
 
+    /** */
+    private static int compareBinary(Object o1, Object o2, int nullComparison) 
{
+        if (o1 == o2) {

Review Comment:
   Redundant braces



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheIndexImpl.java:
##########
@@ -153,60 +164,133 @@ public Index queryIndex() {
 
     /** {@inheritDoc} */
     @Override public long count(ExecutionContext<?> ectx, ColocationGroup grp, 
boolean notNull) {
+        if (idx == null || !grp.nodeIds().contains(ectx.localNodeId()))
+            return 0L;
+
+        int[] locParts = grp.partitions(ectx.localNodeId());
+
+        IndexingQueryFilter filter = new 
IndexingQueryFilterImpl(tbl.descriptor().cacheContext().kernalContext(),
+            ectx.topologyVersion(), locParts);
+
+        InlineIndex iidx = idx.unwrap(InlineIndex.class);
+
+        TreeRowClosure<IndexRow, IndexRow> rowFilter = countRowFilter(notNull, 
iidx);
+
         long cnt = 0;
 
-        if (idx != null && grp.nodeIds().contains(ectx.localNodeId())) {
-            IndexingQueryFilter filter = new 
IndexingQueryFilterImpl(tbl.descriptor().cacheContext().kernalContext(),
-                ectx.topologyVersion(), grp.partitions(ectx.localNodeId()));
+        if (!F.isEmpty(ectx.getTxWriteEntries())) {
+            IgniteBiTuple<Set<KeyCacheObject>, List<CacheDataRow>> txChanges = 
transactionData(
+                ectx.getTxWriteEntries(),
+                iidx.indexDefinition().cacheInfo().cacheId(),
+                locParts,
+                Function.identity()
+            );
 
-            InlineIndex iidx = idx.unwrap(InlineIndex.class);
+            if (!txChanges.get1().isEmpty()) {
+                // This call will change `txChanges.get1()` content.
+                // Removing found key from set more efficient so we break some 
rules here.
+                rowFilter = transactionAwareCountRowFilter(rowFilter, 
txChanges.get1());
 
-            BPlusTree.TreeRowClosure<IndexRow, IndexRow> rowFilter = null;
+                cnt = countTransactionRows(iidx, txChanges.get2());
+            }
+        }
 
-            boolean checkExpired = 
!tbl.descriptor().cacheContext().config().isEagerTtl();
+        try {
+            for (int i = 0; i < iidx.segmentsCount(); ++i)
+                cnt += iidx.count(i, new IndexQueryContext(filter, rowFilter));

Review Comment:
   Looks like there was an error here: we should reset `skipCheck` for each segment.
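
   A minimal, self-contained sketch of the problem, assuming NULLS-FIRST ordering inside each segment. `NotNullFilter`, `count`, and the `List<Integer>` segments are hypothetical stand-ins for the row filter and the index segments, not Ignite API:

```java
import java.util.List;
import java.util.function.Predicate;

public class SkipCheckResetSketch {
    /** Hypothetical stateful filter: in a NULLS-FIRST segment it stops checking after the first non-null value. */
    static final class NotNullFilter implements Predicate<Integer> {
        private boolean skipCheck;

        @Override public boolean test(Integer val) {
            if (skipCheck)
                return true;

            boolean res = val != null;

            if (res)
                skipCheck = true;

            return res;
        }

        /** Clears the shortcut so that the next segment is checked from scratch. */
        void reset() {
            skipCheck = false;
        }
    }

    /** Counts non-null values over all segments with a single shared filter instance. */
    static long count(List<List<Integer>> segments, boolean resetPerSegment) {
        NotNullFilter filter = new NotNullFilter();
        long cnt = 0;

        for (List<Integer> seg : segments) {
            if (resetPerSegment)
                filter.reset();

            for (Integer val : seg) {
                if (filter.test(val))
                    cnt++;
            }
        }

        return cnt;
    }
}
```

   Without the reset, the flag set while scanning the first segment makes the filter accept the leading nulls of every later segment, so the count comes out too high.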



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactoryImpl.java:
##########
@@ -245,11 +246,28 @@ else if (c2 != HIGHEST_VALUE)
     /** */
     @SuppressWarnings("rawtypes")
     private static int compare(Object o1, Object o2, int nullComparison) {
+        if (o1 instanceof BinaryObjectImpl)
+            return compareBinary(o1, o2, nullComparison);
+
         final Comparable c1 = (Comparable)o1;
         final Comparable c2 = (Comparable)o2;
         return RelFieldCollation.compare(c1, c2, nullComparison);
     }
 
+    /** */
+    private static int compareBinary(Object o1, Object o2, int nullComparison) 
{
+        if (o1 == o2) {
+            return 0;
+        }
+        else if (o1 == null) {

Review Comment:
   Redundant braces



##########
modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/inline/SegmentedIndexCursor.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cache.query.index.sorted.inline;
+
+import java.util.Comparator;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.cache.query.index.SortOrder;
+import org.apache.ignite.internal.cache.query.index.sorted.IndexKeyDefinition;
+import org.apache.ignite.internal.cache.query.index.sorted.IndexRow;
+import org.apache.ignite.internal.cache.query.index.sorted.IndexRowComparator;
+import 
org.apache.ignite.internal.cache.query.index.sorted.SortedIndexDefinition;
+import org.apache.ignite.internal.util.lang.GridCursor;
+
+/** Single cursor over multiple segments. The next value is chosen with the 
index row comparator. */
+public class SegmentedIndexCursor implements GridCursor<IndexRow> {

Review Comment:
   Perhaps we should also add `Sorted` to the class name. Up to you.



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java:
##########
@@ -389,10 +453,12 @@ private static class InlineIndexRowFactory implements 
BPlusTree.TreeRowFactory<I
         /** */
         private InlineIndexRowFactory(
             InlineIndexKeyType[] keyTypes,
-            InlineIndexRowHandler idxRowHnd
+            InlineIndexRowHandler idxRowHnd,
+            boolean useCacheRow

Review Comment:
   `useCacheRow` is redundant. Instead of this flag, a null `InlineIndexRowFactory` can be used.



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ListCursor.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.util.Comparator;
+import java.util.List;
+import org.apache.ignite.internal.util.lang.GridCursor;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Cursor to navigate through a sorted list with duplicates.
+ */
+public class ListCursor<Row> implements GridCursor<Row> {
+    /** */
+    private final Comparator<Row> comp;
+
+    /** List of rows. */
+    private final List<Row> rows;
+
+    /** Upper bound. */
+    private final Row upper;
+
+    /** Include upper bound. */
+    private final boolean includeUpper;
+
+    /** Current row. */
+    private Row row;
+
+    /** Current index of list element. */
+    private int idx;
+
+    /**
+     * @param comp Rows comparator.
+     * @param rows List of rows.
+     * @param lower Lower bound.
+     * @param upper Upper bound.
+     * @param lowerInclude {@code True} for inclusive lower bound.
+     * @param upperInclude {@code True} for inclusive upper bound.
+     */
+    public ListCursor(
+        Comparator<Row> comp,
+        List<Row> rows,
+        @Nullable Row lower,
+        @Nullable Row upper,
+        boolean lowerInclude,
+        boolean upperInclude
+    ) {
+        this.comp = comp;
+        this.rows = rows;
+        this.upper = upper;
+        this.includeUpper = upperInclude;
+
+        idx = lower == null ? 0 : lowerBound(rows, lower, lowerInclude);
+    }
+
+    /**
+     * Searches the lower bound (skipping duplicates) using a binary search.
+     *
+     * @param rows List of rows.
+     * @param bound Lower bound.
+     * @return Lower bound position in the list.
+     */
+    private int lowerBound(List<Row> rows, Row bound, boolean includeBound) {
+        int low = 0, high = rows.size() - 1, idx = -1;
+
+        while (low <= high) {
+            int mid = (high - low) / 2 + low;
+            int compRes = comp.compare(rows.get(mid), bound);
+
+            if (compRes > 0)
+                high = mid - 1;
+            else if (compRes == 0 && includeBound) {
+                idx = mid;
+                high = mid - 1;
+            }
+            else
+                low = mid + 1;
+        }
+
+        return idx == -1 ? low : idx;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean next() {
+        if (idx == rows.size() || (upper != null && 
-comp.compare(rows.get(idx), upper) < (includeUpper ? 0 : 1)))

Review Comment:
   Why was the `comp.compare` condition reversed? It looks like it works the same way, but it is a little harder to understand.
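
   For comparison, a small sketch of the two forms side by side. The class and method names are hypothetical; only the reversed expression is taken from the diff, and the direct form is one possible way to write the same check:

```java
import java.util.Comparator;

public class UpperBoundCheckSketch {
    /** Reversed form, as in the diff: negates the comparison result. */
    static <T> boolean beyondUpperReversed(Comparator<T> comp, T row, T upper, boolean includeUpper) {
        return -comp.compare(row, upper) < (includeUpper ? 0 : 1);
    }

    /** Direct form: a row is out of range if it is greater than the bound, or equal to an exclusive bound. */
    static <T> boolean beyondUpperDirect(Comparator<T> comp, T row, T upper, boolean includeUpper) {
        int c = comp.compare(row, upper);

        return includeUpper ? c > 0 : c >= 0;
    }
}
```

   Both forms agree for ordinary comparator results; the direct one also avoids negating the result, which matters only if a comparator ever returns `Integer.MIN_VALUE`.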



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java:
##########
@@ -507,4 +596,64 @@ protected TreeIndexWrapper(InlineIndex idx) {
             }
         }
     }
+
+    /**
+     * @param entries Entries changed in transaction.
+     * @param cacheId Cache id.
+     * @param parts Partitions set.
+     * @param mapper Mapper to specific data type.
+     * @return First, set of object changed in transaction, second, list of 
transaction data in required format.
+     * @param <R> Required type.
+     */
+    public static <R> IgniteBiTuple<Set<KeyCacheObject>, List<R>> 
transactionData(
+        Collection<IgniteTxEntry> entries,
+        int cacheId,
+        int[] parts,
+        Function<CacheDataRow, R> mapper
+    ) {
+        if (F.isEmpty(entries))
+            return F.t(Collections.emptySet(), Collections.emptyList());
+
+        // Expecting parts are sorted or almost sorted and amount of 
transaction entries are relatively small.
+        if (parts != null)
+            Arrays.sort(parts);
+
+        Set<KeyCacheObject> changedKeys = new HashSet<>(entries.size());
+        List<R> newAndUpdatedRows = new ArrayList<>(entries.size());
+
+        for (IgniteTxEntry e : entries) {
+            int part = e.key().partition();
+
+            assert part != -1;
+
+            if (e.cacheId() != cacheId)
+                continue;
+
+            if (parts != null && Arrays.binarySearch(parts, part) < 0)

Review Comment:
   Parts as a `BitSet`? It will be faster than sorting and binary search.
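
   A rough sketch of what this could look like, using only `java.util.BitSet`. The helper names `toBitSet` and `accepts` are hypothetical, and a `null` array is assumed to mean "no partition restriction", as in the diff above:

```java
import java.util.BitSet;

public class PartitionBitSetSketch {
    /** Builds a bit set of partition ids; a null input means "no partition restriction". */
    static BitSet toBitSet(int[] parts) {
        if (parts == null)
            return null;

        BitSet res = new BitSet();

        for (int part : parts)
            res.set(part);

        return res;
    }

    /** Returns true if the partition passes the filter. */
    static boolean accepts(BitSet parts, int part) {
        return parts == null || parts.get(part);
    }
}
```

   This leaves the input array untouched (no in-place `Arrays.sort`) and turns each membership check into a single bit lookup.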



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java:
##########
@@ -362,8 +422,12 @@ private synchronized void release() {
 
         InlineIndexRowHandler rowHnd = idx.segment(0).rowHandler();
 
-        InlineIndexRowFactory rowFactory = isInlineScan() ?
-            new InlineIndexRowFactory(rowHnd.inlineIndexKeyTypes().toArray(new 
InlineIndexKeyType[0]), rowHnd) : null;
+        InlineIndexRowFactory rowFactory = isInlineScan()

Review Comment:
   `isInlineScan() && txChanges != null && !F.isEmpty(txChanges.get1())`



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java:
##########
@@ -3201,27 +3223,43 @@ private QueryEngine engineForQuery(SqlClientContext 
cliCtx, SqlFieldsQuery qry)
      * Execute query setting busy lock, preserving current cache context and 
properly handling checked exceptions.
      *
      * @param cctx Cache context.
+     * @param userTx User transaction.
      * @param supplier Code to be executed.
      * @return Result.
      */
-    private <T> T executeQuerySafe(@Nullable final GridCacheContext<?, ?> 
cctx, GridPlainOutClosure<T> supplier) {
+    private <T> T executeQuerySafe(
+        @Nullable final GridCacheContext<?, ?> cctx,
+        @Nullable final GridNearTxLocal userTx,
+        GridPlainOutClosure<T> supplier
+    ) {
         GridCacheContext oldCctx = curCache.get();
 
         curCache.set(cctx);
 
-        if (!busyLock.enterBusy())
-            throw new IllegalStateException("Failed to execute query (grid is 
stopping).");
-
         try {
-            return supplier.apply();
+            if (!busyLock.enterBusy())
+                throw new IllegalStateException("Failed to execute query (grid 
is stopping).");
+
+            if (userTx != null)
+                userTx.suspend();
+
+            try {
+                return supplier.apply();
+            }
+            catch (IgniteCheckedException e) {
+                throw new CacheException(e);
+            }
+            finally {
+                curCache.set(oldCctx);
+
+                busyLock.leaveBusy();
+
+                if (userTx != null)
+                    userTx.resume();

Review Comment:
   Do we have a guarantee that a DML statement is always executed synchronously, and that there can't be a situation where the tx is resumed in the user's thread while it is still being used by the DML statement after the cursor is created? I see `cur.iterator().hasNext()` in the `executePlan` method; perhaps it makes DML execution synchronous (regular queries are executed asynchronously).



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryTemplate.java:
##########
@@ -85,7 +88,19 @@ public ExecutionPlan map(MappingService mappingService, 
MappingQueryContext ctx)
                 else
                     ex.addSuppressed(e);
 
-                fragments = replace(fragments, e.fragment(), new 
FragmentSplitter(e.node()).go(e.fragment()));
+                RelNode cutPoint = e.node();
+
+                // TableModify inside transaction must be executed locally.
+                boolean forceLocTableModify = 
Commons.queryTransactionVersion(ctx) != null && cutPoint instanceof 
IgniteTableModify;
+
+                if (forceLocTableModify)
+                    cutPoint = ((SingleRel)cutPoint).getInput(); // Cuts 
TableScan instead of TableModification.

Review Comment:
   After this split we have:
   - fragment 1 with `TableModify` and broadcast distribution;
   - fragment 2 with a sender to broadcast distribution.

   This is not correct, since the second fragment will send data to all nodes, and some hacks are required to change `TableModify` (`mapLocalTableModify`).
   



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java:
##########
@@ -507,4 +596,64 @@ protected TreeIndexWrapper(InlineIndex idx) {
             }
         }
     }
+
+    /**
+     * @param entries Entries changed in transaction.
+     * @param cacheId Cache id.
+     * @param parts Partitions set.
+     * @param mapper Mapper to specific data type.
+     * @return First, set of object changed in transaction, second, list of 
transaction data in required format.
+     * @param <R> Required type.
+     */
+    public static <R> IgniteBiTuple<Set<KeyCacheObject>, List<R>> 
transactionData(

Review Comment:
   Move the method to `ExecutionContext`, to avoid accessing it from other classes?



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheIndexImpl.java:
##########
@@ -153,60 +164,133 @@ public Index queryIndex() {
 
     /** {@inheritDoc} */
     @Override public long count(ExecutionContext<?> ectx, ColocationGroup grp, 
boolean notNull) {
+        if (idx == null || !grp.nodeIds().contains(ectx.localNodeId()))
+            return 0L;
+
+        int[] locParts = grp.partitions(ectx.localNodeId());
+
+        IndexingQueryFilter filter = new 
IndexingQueryFilterImpl(tbl.descriptor().cacheContext().kernalContext(),
+            ectx.topologyVersion(), locParts);
+
+        InlineIndex iidx = idx.unwrap(InlineIndex.class);
+
+        TreeRowClosure<IndexRow, IndexRow> rowFilter = countRowFilter(notNull, 
iidx);
+
         long cnt = 0;
 
-        if (idx != null && grp.nodeIds().contains(ectx.localNodeId())) {
-            IndexingQueryFilter filter = new 
IndexingQueryFilterImpl(tbl.descriptor().cacheContext().kernalContext(),
-                ectx.topologyVersion(), grp.partitions(ectx.localNodeId()));
+        if (!F.isEmpty(ectx.getTxWriteEntries())) {
+            IgniteBiTuple<Set<KeyCacheObject>, List<CacheDataRow>> txChanges = 
transactionData(
+                ectx.getTxWriteEntries(),
+                iidx.indexDefinition().cacheInfo().cacheId(),
+                locParts,
+                Function.identity()
+            );
 
-            InlineIndex iidx = idx.unwrap(InlineIndex.class);
+            if (!txChanges.get1().isEmpty()) {
+                // This call will change `txChanges.get1()` content.
+                // Removing found key from set more efficient so we break some 
rules here.
+                rowFilter = transactionAwareCountRowFilter(rowFilter, 
txChanges.get1());
 
-            BPlusTree.TreeRowClosure<IndexRow, IndexRow> rowFilter = null;
+                cnt = countTransactionRows(iidx, txChanges.get2());
+            }
+        }
 
-            boolean checkExpired = 
!tbl.descriptor().cacheContext().config().isEagerTtl();
+        try {
+            for (int i = 0; i < iidx.segmentsCount(); ++i)
+                cnt += iidx.count(i, new IndexQueryContext(filter, rowFilter));
 
-            if (notNull) {
-                boolean nullsFirst = 
collation.getFieldCollations().get(0).nullDirection ==
-                    RelFieldCollation.NullDirection.FIRST;
+            return cnt;
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException("Unable to count index records.", e);
+        }
+    }
 
-                BPlusTree.TreeRowClosure<IndexRow, IndexRow> notNullRowFilter =
-                    IndexScan.createNotNullRowFilter(iidx, checkExpired);
+    /** */
+    private @Nullable TreeRowClosure<IndexRow, IndexRow> 
countRowFilter(boolean notNull, InlineIndex iidx) {
+        boolean checkExpired = 
!tbl.descriptor().cacheContext().config().isEagerTtl();
+
+        if (notNull) {
+            boolean nullsFirst = 
collation.getFieldCollations().get(0).nullDirection == 
RelFieldCollation.NullDirection.FIRST;
+
+            TreeRowClosure<IndexRow, IndexRow> notNullRowFilter = 
IndexScan.createNotNullRowFilter(iidx, checkExpired);
+
+            return new TreeRowClosure<IndexRow, IndexRow>() {
+                private boolean skipCheck;
+
+                @Override public boolean apply(
+                    BPlusTree<IndexRow, IndexRow> tree,
+                    BPlusIO<IndexRow> io,
+                    long pageAddr,
+                    int idx
+                ) throws IgniteCheckedException {
+                    // If we have NULLS-FIRST collation, all values after 
first not-null value will be not-null,
+                    // don't need to check it with notNullRowFilter.
+                    // In case of NULL-LAST collation, all values after first 
null value will be null,
+                    // don't need to check it too.
+                    if (skipCheck && !checkExpired)
+                        return nullsFirst;
+
+                    boolean res = notNullRowFilter.apply(tree, io, pageAddr, 
idx);
+
+                    if (res == nullsFirst)
+                        skipCheck = true;
+
+                    return res;
+                }
+
+                @Override public IndexRow lastRow() {
+                    return (skipCheck && !checkExpired)
+                        ? null
+                        : notNullRowFilter.lastRow();
+                }
+            };
+        }
+        else if (checkExpired)
+            return IndexScan.createNotExpiredRowFilter();
 
-                AtomicBoolean skipCheck = new AtomicBoolean();
+        return null;
+    }
 
-                rowFilter = new BPlusTree.TreeRowClosure<IndexRow, IndexRow>() 
{
-                    @Override public boolean apply(
-                        BPlusTree<IndexRow, IndexRow> tree,
-                        BPlusIO<IndexRow> io,
-                        long pageAddr,
-                        int idx
-                    ) throws IgniteCheckedException {
-                        // If we have NULLS-FIRST collation, all values after 
first not-null value will be not-null,
-                        // don't need to check it with notNullRowFilter.
-                        // In case of NULL-LAST collation, all values after 
first null value will be null,
-                        // don't need to check it too.
-                        if (skipCheck.get() && !checkExpired)
-                            return nullsFirst;
+    /** */
+    private static @NotNull TreeRowClosure<IndexRow, IndexRow> 
transactionAwareCountRowFilter(
+        TreeRowClosure<IndexRow, IndexRow> rowFilter,
+        Set<KeyCacheObject> skipKeys
+    ) {
+        return new TreeRowClosure<IndexRow, IndexRow>() {
+            @Override public boolean apply(
+                BPlusTree<IndexRow, IndexRow> tree,
+                BPlusIO<IndexRow> io,
+                long pageAddr,
+                int idx
+            ) throws IgniteCheckedException {
+                if (rowFilter != null && !rowFilter.apply(tree, io, pageAddr, 
idx))
+                    return false;
 
-                        boolean res = notNullRowFilter.apply(tree, io, 
pageAddr, idx);
+                if (skipKeys.isEmpty())
+                    return true;
 
-                        if (res == nullsFirst)
-                            skipCheck.set(true);
+                IndexRow row = rowFilter == null ? null : rowFilter.lastRow();
 
-                        return res;
-                    }
-                };
-            }
-            else if (checkExpired)
-                rowFilter = IndexScan.createNotExpiredRowFilter();
+                if (row == null)
+                    row = tree.getRow(io, pageAddr, idx);
 
-            try {
-                for (int i = 0; i < iidx.segmentsCount(); ++i)
-                    cnt += iidx.count(i, new IndexQueryContext(filter, 
rowFilter));
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteException("Unable to count index records.", e);
+                return !skipKeys.remove(row.cacheDataRow().key());
             }
+        };
+    }
+
+    /** */
+    private static long countTransactionRows(InlineIndex iidx, 
List<CacheDataRow> changedRows) {
+        InlineIndexRowHandler rowHnd = iidx.segment(0).rowHandler();
+
+        long cnt = 0;
+
+        for (CacheDataRow txRow : changedRows) {
+            if (rowHnd.indexKey(0, txRow) == NullIndexKey.INSTANCE)

Review Comment:
   We should skip a row with `indexKey(0) == null` only when the `notNull` flag is set.
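
   A simplified sketch of the intended behaviour. Here `firstKeys` is a hypothetical list holding the first index key of each transaction row, with Java `null` standing in for `NullIndexKey.INSTANCE`; the `notNull` parameter is an assumed addition to the method signature:

```java
import java.util.List;

public class CountTxRowsSketch {
    /** Counts transaction rows; rows with a null first key are excluded only for a not-null count. */
    static long countTransactionRows(List<Integer> firstKeys, boolean notNull) {
        long cnt = 0;

        for (Integer key : firstKeys) {
            if (notNull && key == null)
                continue;

            cnt++;
        }

        return cnt;
    }
}
```

   With `notNull == false`, rows whose first key is null still contribute to the count, which the unconditional check in the diff would miss.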



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheIndexImpl.java:
##########
@@ -153,60 +164,133 @@ public Index queryIndex() {
 
     /** {@inheritDoc} */
     @Override public long count(ExecutionContext<?> ectx, ColocationGroup grp, boolean notNull) {
+        if (idx == null || !grp.nodeIds().contains(ectx.localNodeId()))
+            return 0L;
+
+        int[] locParts = grp.partitions(ectx.localNodeId());
+
+        IndexingQueryFilter filter = new IndexingQueryFilterImpl(tbl.descriptor().cacheContext().kernalContext(),
+            ectx.topologyVersion(), locParts);
+
+        InlineIndex iidx = idx.unwrap(InlineIndex.class);
+
+        TreeRowClosure<IndexRow, IndexRow> rowFilter = countRowFilter(notNull, iidx);
+
         long cnt = 0;
 
-        if (idx != null && grp.nodeIds().contains(ectx.localNodeId())) {
-            IndexingQueryFilter filter = new IndexingQueryFilterImpl(tbl.descriptor().cacheContext().kernalContext(),
-                ectx.topologyVersion(), grp.partitions(ectx.localNodeId()));
+        if (!F.isEmpty(ectx.getTxWriteEntries())) {
+            IgniteBiTuple<Set<KeyCacheObject>, List<CacheDataRow>> txChanges = transactionData(
+                ectx.getTxWriteEntries(),
+                iidx.indexDefinition().cacheInfo().cacheId(),
+                locParts,
+                Function.identity()
+            );
 
-            InlineIndex iidx = idx.unwrap(InlineIndex.class);
+            if (!txChanges.get1().isEmpty()) {
+                // This call will change `txChanges.get1()` content.
+                // Removing found key from set more efficient so we break some rules here.
+                rowFilter = transactionAwareCountRowFilter(rowFilter, txChanges.get1());
 
-            BPlusTree.TreeRowClosure<IndexRow, IndexRow> rowFilter = null;
+                cnt = countTransactionRows(iidx, txChanges.get2());
+            }
+        }
 
-            boolean checkExpired = !tbl.descriptor().cacheContext().config().isEagerTtl();
+        try {
+            for (int i = 0; i < iidx.segmentsCount(); ++i)
+                cnt += iidx.count(i, new IndexQueryContext(filter, rowFilter));
 
-            if (notNull) {
-                boolean nullsFirst = collation.getFieldCollations().get(0).nullDirection ==
-                    RelFieldCollation.NullDirection.FIRST;
+            return cnt;
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException("Unable to count index records.", e);
+        }
+    }
 
-                BPlusTree.TreeRowClosure<IndexRow, IndexRow> notNullRowFilter =
-                    IndexScan.createNotNullRowFilter(iidx, checkExpired);
+    /** */
+    private @Nullable TreeRowClosure<IndexRow, IndexRow> countRowFilter(boolean notNull, InlineIndex iidx) {
+        boolean checkExpired = !tbl.descriptor().cacheContext().config().isEagerTtl();
+
+        if (notNull) {
+            boolean nullsFirst = collation.getFieldCollations().get(0).nullDirection == RelFieldCollation.NullDirection.FIRST;
+
+            TreeRowClosure<IndexRow, IndexRow> notNullRowFilter = IndexScan.createNotNullRowFilter(iidx, checkExpired);
+
+            return new TreeRowClosure<IndexRow, IndexRow>() {
+                private boolean skipCheck;
+
+                @Override public boolean apply(
+                    BPlusTree<IndexRow, IndexRow> tree,
+                    BPlusIO<IndexRow> io,
+                    long pageAddr,
+                    int idx
+                ) throws IgniteCheckedException {
+                    // If we have NULLS-FIRST collation, all values after first not-null value will be not-null,
+                    // don't need to check it with notNullRowFilter.
+                    // In case of NULL-LAST collation, all values after first null value will be null,
+                    // don't need to check it too.
+                    if (skipCheck && !checkExpired)
+                        return nullsFirst;
+
+                    boolean res = notNullRowFilter.apply(tree, io, pageAddr, idx);
+
+                    if (res == nullsFirst)
+                        skipCheck = true;
+
+                    return res;
+                }
+
+                @Override public IndexRow lastRow() {
+                    return (skipCheck && !checkExpired)
+                        ? null
+                        : notNullRowFilter.lastRow();
+                }
+            };
+        }
+        else if (checkExpired)
+            return IndexScan.createNotExpiredRowFilter();
 
-                AtomicBoolean skipCheck = new AtomicBoolean();
+        return null;
+    }
 
-                rowFilter = new BPlusTree.TreeRowClosure<IndexRow, IndexRow>() {
-                    @Override public boolean apply(
-                        BPlusTree<IndexRow, IndexRow> tree,
-                        BPlusIO<IndexRow> io,
-                        long pageAddr,
-                        int idx
-                    ) throws IgniteCheckedException {
-                        // If we have NULLS-FIRST collation, all values after first not-null value will be not-null,
-                        // don't need to check it with notNullRowFilter.
-                        // In case of NULL-LAST collation, all values after first null value will be null,
-                        // don't need to check it too.
-                        if (skipCheck.get() && !checkExpired)
-                            return nullsFirst;
+    /** */
+    private static @NotNull TreeRowClosure<IndexRow, IndexRow> transactionAwareCountRowFilter(

Review Comment:
   This looks overcomplicated and brings no performance benefit. Perhaps we can just
disable the `index count` optimization for tx-aware queries and use regular table
scans.




