alex-plekhanov commented on code in PR #11438: URL: https://github.com/apache/ignite/pull/11438#discussion_r1756358365
########## modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite3.java: ########## @@ -375,7 +376,9 @@ CacheEventsCdcTest.class, CdcIndexRebuildTest.class, - DumpCacheConfigTest.class + DumpCacheConfigTest.class, + + H2TransactionAwareQueriesEnabledTest.class Review Comment: Let's add a comma at the end of the line. ########## modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java: ########## @@ -3050,24 +3061,35 @@ public List<FieldsQueryCursor<List<?>>> querySqlFields( failOnMultipleStmts ); + QueryContext qryCtx = QueryContext.of( + qry, + cliCtx, + cancel, + qryProps, + userTx == null ? null : userTx.xidVersion() + ); + if (qry instanceof SqlFieldsQueryEx && ((SqlFieldsQueryEx)qry).isBatched()) { res = qryEngine.queryBatched( - QueryContext.of(qry, cliCtx, cancel, qryProps), + qryCtx, schemaName, qry.getSql(), ((SqlFieldsQueryEx)qry).batchedArguments() ); } else { res = qryEngine.query( - QueryContext.of(qry, cliCtx, cancel, qryProps), + qryCtx, schemaName, qry.getSql(), qry.getArgs() != null ? qry.getArgs() : X.EMPTY_OBJECT_ARRAY ); } } else { + if (userTx != null && txAwareQueriesEnabled) + throw new CacheException("SQL aware queries supported only for Calcite query engine"); Review Comment: Suggested message: `SQL aware queries are not supported by Indexing query engine`. Also, I'm not sure about `CacheException`; perhaps `IgniteSQLException` with `SqlStateCode.UNSUPPORTED_OPERATION` would fit better? 
########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactoryImpl.java: ########## @@ -245,11 +246,28 @@ else if (c2 != HIGHEST_VALUE) /** */ @SuppressWarnings("rawtypes") private static int compare(Object o1, Object o2, int nullComparison) { + if (o1 instanceof BinaryObjectImpl) + return compareBinary(o1, o2, nullComparison); + final Comparable c1 = (Comparable)o1; final Comparable c2 = (Comparable)o2; return RelFieldCollation.compare(c1, c2, nullComparison); } + /** */ + private static int compareBinary(Object o1, Object o2, int nullComparison) { + if (o1 == o2) { + return 0; + } + else if (o1 == null) { + return nullComparison; + } + else if (o2 == null) { Review Comment: Redundant braces ########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ListCursor.java: ########## @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.query.calcite.exec; + +import java.util.Comparator; +import java.util.List; +import org.apache.ignite.internal.util.lang.GridCursor; +import org.jetbrains.annotations.Nullable; + +/** + * Cursor to navigate through a sorted list with duplicates. + */ +public class ListCursor<Row> implements GridCursor<Row> { Review Comment: Something like `SortedListRangeCursor`? ########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/FilteredCursor.java: ########## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.exec; + +import java.util.Set; +import java.util.function.Function; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.util.lang.GridCursor; + +/** + * Cursor wrapper that skips all entires that maps to any of {@code skipKeys} key. 
+ * <b>Note, for the performance reasons content of {@code skipKeys} will be changed during iteration.</b> + */ +class FilteredCursor<R> implements GridCursor<R> { Review Comment: "Filtered" usually implies some custom predicate to filter by. Here we have a predefined key predicate, so I think it's worth adding "Key" to the class name. Something like "KeyFilteringCursor". ########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/FilteredCursor.java: ########## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.exec; + +import java.util.Set; +import java.util.function.Function; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.util.lang.GridCursor; + +/** + * Cursor wrapper that skips all entires that maps to any of {@code skipKeys} key. + * <b>Note, for the performance reasons content of {@code skipKeys} will be changed during iteration.</b> + */ +class FilteredCursor<R> implements GridCursor<R> { + /** Sorted cursor. 
*/ Review Comment: It can be unsorted as well ########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactoryImpl.java: ########## @@ -245,11 +246,28 @@ else if (c2 != HIGHEST_VALUE) /** */ @SuppressWarnings("rawtypes") private static int compare(Object o1, Object o2, int nullComparison) { + if (o1 instanceof BinaryObjectImpl) + return compareBinary(o1, o2, nullComparison); + final Comparable c1 = (Comparable)o1; final Comparable c2 = (Comparable)o2; return RelFieldCollation.compare(c1, c2, nullComparison); } + /** */ + private static int compareBinary(Object o1, Object o2, int nullComparison) { + if (o1 == o2) { Review Comment: Redundant braces ########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheIndexImpl.java: ########## @@ -153,60 +164,133 @@ public Index queryIndex() { /** {@inheritDoc} */ @Override public long count(ExecutionContext<?> ectx, ColocationGroup grp, boolean notNull) { + if (idx == null || !grp.nodeIds().contains(ectx.localNodeId())) + return 0L; + + int[] locParts = grp.partitions(ectx.localNodeId()); + + IndexingQueryFilter filter = new IndexingQueryFilterImpl(tbl.descriptor().cacheContext().kernalContext(), + ectx.topologyVersion(), locParts); + + InlineIndex iidx = idx.unwrap(InlineIndex.class); + + TreeRowClosure<IndexRow, IndexRow> rowFilter = countRowFilter(notNull, iidx); + long cnt = 0; - if (idx != null && grp.nodeIds().contains(ectx.localNodeId())) { - IndexingQueryFilter filter = new IndexingQueryFilterImpl(tbl.descriptor().cacheContext().kernalContext(), - ectx.topologyVersion(), grp.partitions(ectx.localNodeId())); + if (!F.isEmpty(ectx.getTxWriteEntries())) { + IgniteBiTuple<Set<KeyCacheObject>, List<CacheDataRow>> txChanges = transactionData( + ectx.getTxWriteEntries(), + iidx.indexDefinition().cacheInfo().cacheId(), + locParts, + Function.identity() + ); - InlineIndex iidx = idx.unwrap(InlineIndex.class); + if 
(!txChanges.get1().isEmpty()) { + // This call will change `txChanges.get1()` content. + // Removing found key from set more efficient so we break some rules here. + rowFilter = transactionAwareCountRowFilter(rowFilter, txChanges.get1()); - BPlusTree.TreeRowClosure<IndexRow, IndexRow> rowFilter = null; + cnt = countTransactionRows(iidx, txChanges.get2()); + } + } - boolean checkExpired = !tbl.descriptor().cacheContext().config().isEagerTtl(); + try { + for (int i = 0; i < iidx.segmentsCount(); ++i) + cnt += iidx.count(i, new IndexQueryContext(filter, rowFilter)); Review Comment: Looks like there was an error, we should reset `skipCheck` for each segment ########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactoryImpl.java: ########## @@ -245,11 +246,28 @@ else if (c2 != HIGHEST_VALUE) /** */ @SuppressWarnings("rawtypes") private static int compare(Object o1, Object o2, int nullComparison) { + if (o1 instanceof BinaryObjectImpl) + return compareBinary(o1, o2, nullComparison); + final Comparable c1 = (Comparable)o1; final Comparable c2 = (Comparable)o2; return RelFieldCollation.compare(c1, c2, nullComparison); } + /** */ + private static int compareBinary(Object o1, Object o2, int nullComparison) { + if (o1 == o2) { + return 0; + } + else if (o1 == null) { Review Comment: Redundant braces ########## modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/inline/SegmentedIndexCursor.java: ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.cache.query.index.sorted.inline; + +import java.util.Comparator; +import java.util.PriorityQueue; +import java.util.Queue; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.cache.query.index.SortOrder; +import org.apache.ignite.internal.cache.query.index.sorted.IndexKeyDefinition; +import org.apache.ignite.internal.cache.query.index.sorted.IndexRow; +import org.apache.ignite.internal.cache.query.index.sorted.IndexRowComparator; +import org.apache.ignite.internal.cache.query.index.sorted.SortedIndexDefinition; +import org.apache.ignite.internal.util.lang.GridCursor; + +/** Single cursor over multiple segments. The next value is chosen with the index row comparator. */ +public class SegmentedIndexCursor implements GridCursor<IndexRow> { Review Comment: Perhaps we should also add `Sorted` to the class name. Up to you. ########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java: ########## @@ -389,10 +453,12 @@ private static class InlineIndexRowFactory implements BPlusTree.TreeRowFactory<I /** */ private InlineIndexRowFactory( InlineIndexKeyType[] keyTypes, - InlineIndexRowHandler idxRowHnd + InlineIndexRowHandler idxRowHnd, + boolean useCacheRow Review Comment: useCacheRow is redundant. 
Instead of this flag null InlineIndexRowFactory can be used ########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ListCursor.java: ########## @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.exec; + +import java.util.Comparator; +import java.util.List; +import org.apache.ignite.internal.util.lang.GridCursor; +import org.jetbrains.annotations.Nullable; + +/** + * Cursor to navigate through a sorted list with duplicates. + */ +public class ListCursor<Row> implements GridCursor<Row> { + /** */ + private final Comparator<Row> comp; + + /** List of rows. */ + private final List<Row> rows; + + /** Upper bound. */ + private final Row upper; + + /** Include upper bound. */ + private final boolean includeUpper; + + /** Current row. */ + private Row row; + + /** Current index of list element. */ + private int idx; + + /** + * @param comp Rows comparator. + * @param rows List of rows. + * @param lower Lower bound. + * @param upper Upper bound. + * @param lowerInclude {@code True} for inclusive lower bound. + * @param upperInclude {@code True} for inclusive upper bound. 
+ */ + public ListCursor( + Comparator<Row> comp, + List<Row> rows, + @Nullable Row lower, + @Nullable Row upper, + boolean lowerInclude, + boolean upperInclude + ) { + this.comp = comp; + this.rows = rows; + this.upper = upper; + this.includeUpper = upperInclude; + + idx = lower == null ? 0 : lowerBound(rows, lower, lowerInclude); + } + + /** + * Searches the lower bound (skipping duplicates) using a binary search. + * + * @param rows List of rows. + * @param bound Lower bound. + * @return Lower bound position in the list. + */ + private int lowerBound(List<Row> rows, Row bound, boolean includeBound) { + int low = 0, high = rows.size() - 1, idx = -1; + + while (low <= high) { + int mid = (high - low) / 2 + low; + int compRes = comp.compare(rows.get(mid), bound); + + if (compRes > 0) + high = mid - 1; + else if (compRes == 0 && includeBound) { + idx = mid; + high = mid - 1; + } + else + low = mid + 1; + } + + return idx == -1 ? low : idx; + } + + /** {@inheritDoc} */ + @Override public boolean next() { + if (idx == rows.size() || (upper != null && -comp.compare(rows.get(idx), upper) < (includeUpper ? 0 : 1))) Review Comment: Why comp.compare condition was reversed? Looks like it works the same way, but a little bit more complicated to understand. ########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java: ########## @@ -507,4 +596,64 @@ protected TreeIndexWrapper(InlineIndex idx) { } } } + + /** + * @param entries Entries changed in transaction. + * @param cacheId Cache id. + * @param parts Partitions set. + * @param mapper Mapper to specific data type. + * @return First, set of object changed in transaction, second, list of transaction data in required format. + * @param <R> Required type. 
+ */ + public static <R> IgniteBiTuple<Set<KeyCacheObject>, List<R>> transactionData( + Collection<IgniteTxEntry> entries, + int cacheId, + int[] parts, + Function<CacheDataRow, R> mapper + ) { + if (F.isEmpty(entries)) + return F.t(Collections.emptySet(), Collections.emptyList()); + + // Expecting parts are sorted or almost sorted and amount of transaction entries are relatively small. + if (parts != null) + Arrays.sort(parts); + + Set<KeyCacheObject> changedKeys = new HashSet<>(entries.size()); + List<R> newAndUpdatedRows = new ArrayList<>(entries.size()); + + for (IgniteTxEntry e : entries) { + int part = e.key().partition(); + + assert part != -1; + + if (e.cacheId() != cacheId) + continue; + + if (parts != null && Arrays.binarySearch(parts, part) < 0) Review Comment: Parts as BitSet? Will be faster than sorting and binary search ########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java: ########## @@ -362,8 +422,12 @@ private synchronized void release() { InlineIndexRowHandler rowHnd = idx.segment(0).rowHandler(); - InlineIndexRowFactory rowFactory = isInlineScan() ? - new InlineIndexRowFactory(rowHnd.inlineIndexKeyTypes().toArray(new InlineIndexKeyType[0]), rowHnd) : null; + InlineIndexRowFactory rowFactory = isInlineScan() Review Comment: isInlineScan() && txChanges != null && !F.isEmpty(txChanges.get1()) ########## modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java: ########## @@ -3201,27 +3223,43 @@ private QueryEngine engineForQuery(SqlClientContext cliCtx, SqlFieldsQuery qry) * Execute query setting busy lock, preserving current cache context and properly handling checked exceptions. * * @param cctx Cache context. + * @param userTx User transaction. * @param supplier Code to be executed. * @return Result. 
 */ - private <T> T executeQuerySafe(@Nullable final GridCacheContext<?, ?> cctx, GridPlainOutClosure<T> supplier) { + private <T> T executeQuerySafe( + @Nullable final GridCacheContext<?, ?> cctx, + @Nullable final GridNearTxLocal userTx, + GridPlainOutClosure<T> supplier + ) { GridCacheContext oldCctx = curCache.get(); curCache.set(cctx); - if (!busyLock.enterBusy()) - throw new IllegalStateException("Failed to execute query (grid is stopping)."); - try { - return supplier.apply(); + if (!busyLock.enterBusy()) + throw new IllegalStateException("Failed to execute query (grid is stopping)."); + + if (userTx != null) + userTx.suspend(); + + try { + return supplier.apply(); + } + catch (IgniteCheckedException e) { + throw new CacheException(e); + } + finally { + curCache.set(oldCctx); + + busyLock.leaveBusy(); + + if (userTx != null) + userTx.resume(); Review Comment: Do we have a guarantee that a DML statement is always executed synchronously, and that there can't be a situation where the tx is resumed in the user's thread while the same tx is still being used by the DML statement after the cursor is created? I see `cur.iterator().hasNext()` in the `executePlan` method; perhaps it makes execution of DML sync (because regular queries are executed async). ########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryTemplate.java: ########## @@ -85,7 +88,19 @@ public ExecutionPlan map(MappingService mappingService, MappingQueryContext ctx) else ex.addSuppressed(e); - fragments = replace(fragments, e.fragment(), new FragmentSplitter(e.node()).go(e.fragment())); + RelNode cutPoint = e.node(); + + // TableModify inside transaction must be executed locally. + boolean forceLocTableModify = Commons.queryTransactionVersion(ctx) != null && cutPoint instanceof IgniteTableModify; + + if (forceLocTableModify) + cutPoint = ((SingleRel)cutPoint).getInput(); // Cuts TableScan instead of TableModification. 
Review Comment: After this split we have: fragment 1 with TableModify and broadcast distribution fragment 2 with sender to broadcast distribution It's not correct, since second fragment will send data to all nodes and there are some hacks required to change TableModify (mapLocalTableModify) ########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java: ########## @@ -507,4 +596,64 @@ protected TreeIndexWrapper(InlineIndex idx) { } } } + + /** + * @param entries Entries changed in transaction. + * @param cacheId Cache id. + * @param parts Partitions set. + * @param mapper Mapper to specific data type. + * @return First, set of object changed in transaction, second, list of transaction data in required format. + * @param <R> Required type. + */ + public static <R> IgniteBiTuple<Set<KeyCacheObject>, List<R>> transactionData( Review Comment: Move method to the ExecutionContext? To avoid access it from another classes. ########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheIndexImpl.java: ########## @@ -153,60 +164,133 @@ public Index queryIndex() { /** {@inheritDoc} */ @Override public long count(ExecutionContext<?> ectx, ColocationGroup grp, boolean notNull) { + if (idx == null || !grp.nodeIds().contains(ectx.localNodeId())) + return 0L; + + int[] locParts = grp.partitions(ectx.localNodeId()); + + IndexingQueryFilter filter = new IndexingQueryFilterImpl(tbl.descriptor().cacheContext().kernalContext(), + ectx.topologyVersion(), locParts); + + InlineIndex iidx = idx.unwrap(InlineIndex.class); + + TreeRowClosure<IndexRow, IndexRow> rowFilter = countRowFilter(notNull, iidx); + long cnt = 0; - if (idx != null && grp.nodeIds().contains(ectx.localNodeId())) { - IndexingQueryFilter filter = new IndexingQueryFilterImpl(tbl.descriptor().cacheContext().kernalContext(), - ectx.topologyVersion(), grp.partitions(ectx.localNodeId())); + if (!F.isEmpty(ectx.getTxWriteEntries())) { + 
IgniteBiTuple<Set<KeyCacheObject>, List<CacheDataRow>> txChanges = transactionData( + ectx.getTxWriteEntries(), + iidx.indexDefinition().cacheInfo().cacheId(), + locParts, + Function.identity() + ); - InlineIndex iidx = idx.unwrap(InlineIndex.class); + if (!txChanges.get1().isEmpty()) { + // This call will change `txChanges.get1()` content. + // Removing found key from set more efficient so we break some rules here. + rowFilter = transactionAwareCountRowFilter(rowFilter, txChanges.get1()); - BPlusTree.TreeRowClosure<IndexRow, IndexRow> rowFilter = null; + cnt = countTransactionRows(iidx, txChanges.get2()); + } + } - boolean checkExpired = !tbl.descriptor().cacheContext().config().isEagerTtl(); + try { + for (int i = 0; i < iidx.segmentsCount(); ++i) + cnt += iidx.count(i, new IndexQueryContext(filter, rowFilter)); - if (notNull) { - boolean nullsFirst = collation.getFieldCollations().get(0).nullDirection == - RelFieldCollation.NullDirection.FIRST; + return cnt; + } + catch (IgniteCheckedException e) { + throw new IgniteException("Unable to count index records.", e); + } + } - BPlusTree.TreeRowClosure<IndexRow, IndexRow> notNullRowFilter = - IndexScan.createNotNullRowFilter(iidx, checkExpired); + /** */ + private @Nullable TreeRowClosure<IndexRow, IndexRow> countRowFilter(boolean notNull, InlineIndex iidx) { + boolean checkExpired = !tbl.descriptor().cacheContext().config().isEagerTtl(); + + if (notNull) { + boolean nullsFirst = collation.getFieldCollations().get(0).nullDirection == RelFieldCollation.NullDirection.FIRST; + + TreeRowClosure<IndexRow, IndexRow> notNullRowFilter = IndexScan.createNotNullRowFilter(iidx, checkExpired); + + return new TreeRowClosure<IndexRow, IndexRow>() { + private boolean skipCheck; + + @Override public boolean apply( + BPlusTree<IndexRow, IndexRow> tree, + BPlusIO<IndexRow> io, + long pageAddr, + int idx + ) throws IgniteCheckedException { + // If we have NULLS-FIRST collation, all values after first not-null value will be not-null, + 
// don't need to check it with notNullRowFilter. + // In case of NULL-LAST collation, all values after first null value will be null, + // don't need to check it too. + if (skipCheck && !checkExpired) + return nullsFirst; + + boolean res = notNullRowFilter.apply(tree, io, pageAddr, idx); + + if (res == nullsFirst) + skipCheck = true; + + return res; + } + + @Override public IndexRow lastRow() { + return (skipCheck && !checkExpired) + ? null + : notNullRowFilter.lastRow(); + } + }; + } + else if (checkExpired) + return IndexScan.createNotExpiredRowFilter(); - AtomicBoolean skipCheck = new AtomicBoolean(); + return null; + } - rowFilter = new BPlusTree.TreeRowClosure<IndexRow, IndexRow>() { - @Override public boolean apply( - BPlusTree<IndexRow, IndexRow> tree, - BPlusIO<IndexRow> io, - long pageAddr, - int idx - ) throws IgniteCheckedException { - // If we have NULLS-FIRST collation, all values after first not-null value will be not-null, - // don't need to check it with notNullRowFilter. - // In case of NULL-LAST collation, all values after first null value will be null, - // don't need to check it too. - if (skipCheck.get() && !checkExpired) - return nullsFirst; + /** */ + private static @NotNull TreeRowClosure<IndexRow, IndexRow> transactionAwareCountRowFilter( + TreeRowClosure<IndexRow, IndexRow> rowFilter, + Set<KeyCacheObject> skipKeys + ) { + return new TreeRowClosure<IndexRow, IndexRow>() { + @Override public boolean apply( + BPlusTree<IndexRow, IndexRow> tree, + BPlusIO<IndexRow> io, + long pageAddr, + int idx + ) throws IgniteCheckedException { + if (rowFilter != null && !rowFilter.apply(tree, io, pageAddr, idx)) + return false; - boolean res = notNullRowFilter.apply(tree, io, pageAddr, idx); + if (skipKeys.isEmpty()) + return true; - if (res == nullsFirst) - skipCheck.set(true); + IndexRow row = rowFilter == null ? 
null : rowFilter.lastRow(); - return res; - } - }; - } - else if (checkExpired) - rowFilter = IndexScan.createNotExpiredRowFilter(); + if (row == null) + row = tree.getRow(io, pageAddr, idx); - try { - for (int i = 0; i < iidx.segmentsCount(); ++i) - cnt += iidx.count(i, new IndexQueryContext(filter, rowFilter)); - } - catch (IgniteCheckedException e) { - throw new IgniteException("Unable to count index records.", e); + return !skipKeys.remove(row.cacheDataRow().key()); } + }; + } + + /** */ + private static long countTransactionRows(InlineIndex iidx, List<CacheDataRow> changedRows) { + InlineIndexRowHandler rowHnd = iidx.segment(0).rowHandler(); + + long cnt = 0; + + for (CacheDataRow txRow : changedRows) { + if (rowHnd.indexKey(0, txRow) == NullIndexKey.INSTANCE) Review Comment: We should skip row if `indexKey(0) == null` only when `notNull` flag is set ########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheIndexImpl.java: ########## @@ -153,60 +164,133 @@ public Index queryIndex() { /** {@inheritDoc} */ @Override public long count(ExecutionContext<?> ectx, ColocationGroup grp, boolean notNull) { + if (idx == null || !grp.nodeIds().contains(ectx.localNodeId())) + return 0L; + + int[] locParts = grp.partitions(ectx.localNodeId()); + + IndexingQueryFilter filter = new IndexingQueryFilterImpl(tbl.descriptor().cacheContext().kernalContext(), + ectx.topologyVersion(), locParts); + + InlineIndex iidx = idx.unwrap(InlineIndex.class); + + TreeRowClosure<IndexRow, IndexRow> rowFilter = countRowFilter(notNull, iidx); + long cnt = 0; - if (idx != null && grp.nodeIds().contains(ectx.localNodeId())) { - IndexingQueryFilter filter = new IndexingQueryFilterImpl(tbl.descriptor().cacheContext().kernalContext(), - ectx.topologyVersion(), grp.partitions(ectx.localNodeId())); + if (!F.isEmpty(ectx.getTxWriteEntries())) { + IgniteBiTuple<Set<KeyCacheObject>, List<CacheDataRow>> txChanges = transactionData( + ectx.getTxWriteEntries(), 
+ iidx.indexDefinition().cacheInfo().cacheId(), + locParts, + Function.identity() + ); - InlineIndex iidx = idx.unwrap(InlineIndex.class); + if (!txChanges.get1().isEmpty()) { + // This call will change `txChanges.get1()` content. + // Removing found key from set more efficient so we break some rules here. + rowFilter = transactionAwareCountRowFilter(rowFilter, txChanges.get1()); - BPlusTree.TreeRowClosure<IndexRow, IndexRow> rowFilter = null; + cnt = countTransactionRows(iidx, txChanges.get2()); + } + } - boolean checkExpired = !tbl.descriptor().cacheContext().config().isEagerTtl(); + try { + for (int i = 0; i < iidx.segmentsCount(); ++i) + cnt += iidx.count(i, new IndexQueryContext(filter, rowFilter)); - if (notNull) { - boolean nullsFirst = collation.getFieldCollations().get(0).nullDirection == - RelFieldCollation.NullDirection.FIRST; + return cnt; + } + catch (IgniteCheckedException e) { + throw new IgniteException("Unable to count index records.", e); + } + } - BPlusTree.TreeRowClosure<IndexRow, IndexRow> notNullRowFilter = - IndexScan.createNotNullRowFilter(iidx, checkExpired); + /** */ + private @Nullable TreeRowClosure<IndexRow, IndexRow> countRowFilter(boolean notNull, InlineIndex iidx) { + boolean checkExpired = !tbl.descriptor().cacheContext().config().isEagerTtl(); + + if (notNull) { + boolean nullsFirst = collation.getFieldCollations().get(0).nullDirection == RelFieldCollation.NullDirection.FIRST; + + TreeRowClosure<IndexRow, IndexRow> notNullRowFilter = IndexScan.createNotNullRowFilter(iidx, checkExpired); + + return new TreeRowClosure<IndexRow, IndexRow>() { + private boolean skipCheck; + + @Override public boolean apply( + BPlusTree<IndexRow, IndexRow> tree, + BPlusIO<IndexRow> io, + long pageAddr, + int idx + ) throws IgniteCheckedException { + // If we have NULLS-FIRST collation, all values after first not-null value will be not-null, + // don't need to check it with notNullRowFilter. 
+ // In case of NULL-LAST collation, all values after first null value will be null, + // don't need to check it too. + if (skipCheck && !checkExpired) + return nullsFirst; + + boolean res = notNullRowFilter.apply(tree, io, pageAddr, idx); + + if (res == nullsFirst) + skipCheck = true; + + return res; + } + + @Override public IndexRow lastRow() { + return (skipCheck && !checkExpired) + ? null + : notNullRowFilter.lastRow(); + } + }; + } + else if (checkExpired) + return IndexScan.createNotExpiredRowFilter(); - AtomicBoolean skipCheck = new AtomicBoolean(); + return null; + } - rowFilter = new BPlusTree.TreeRowClosure<IndexRow, IndexRow>() { - @Override public boolean apply( - BPlusTree<IndexRow, IndexRow> tree, - BPlusIO<IndexRow> io, - long pageAddr, - int idx - ) throws IgniteCheckedException { - // If we have NULLS-FIRST collation, all values after first not-null value will be not-null, - // don't need to check it with notNullRowFilter. - // In case of NULL-LAST collation, all values after first null value will be null, - // don't need to check it too. - if (skipCheck.get() && !checkExpired) - return nullsFirst; + /** */ + private static @NotNull TreeRowClosure<IndexRow, IndexRow> transactionAwareCountRowFilter( Review Comment: Looks overcomplicated without performance benefits. Perhaps we can just disable `index count` optimization for tx aware queries and use regular table scans. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org