This is an automated email from the ASF dual-hosted git repository.

timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new a225705fc5f IGNITE-16619 IndexQuery should support limit (#10767)
a225705fc5f is described below

commit a225705fc5f487c2170dc584913f769f1b80f34b
Author: yurinaryshkin <135707807+yurinarysh...@users.noreply.github.com>
AuthorDate: Thu Jun 15 13:05:20 2023 +0300

    IGNITE-16619 IndexQuery should support limit (#10767)
---
 .../org/apache/ignite/cache/query/IndexQuery.java  |  26 +++
 .../client/thin/ProtocolBitmaskFeature.java        |   5 +-
 .../internal/client/thin/TcpClientCache.java       |   7 +
 .../processors/cache/IgniteCacheProxyImpl.java     |   3 +
 .../platform/client/ClientBitmaskFeature.java      |   5 +-
 .../platform/client/ClientMessageParser.java       |   2 +-
 .../client/cache/ClientCacheIndexQueryRequest.java |  15 +-
 .../ignite/cache/query/IndexQueryLimitTest.java    | 233 +++++++++++++++++++++
 .../ignite/cache/query/IndexQueryTestSuite.java    |   3 +-
 .../cache/query/ThinClientIndexQueryTest.java      |  65 ++++++
 10 files changed, 359 insertions(+), 5 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/query/IndexQuery.java 
b/modules/core/src/main/java/org/apache/ignite/cache/query/IndexQuery.java
index 0e7f2a1abc9..8c61ccc1936 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/IndexQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/IndexQuery.java
@@ -50,6 +50,9 @@ public final class IndexQuery<K, V> extends 
Query<Cache.Entry<K, V>> {
     /** Index name. */
     private final @Nullable String idxName;
 
+    /** Limit on the number of returned records; {@code 0} means no limit is set. */
+    private int limit;
+
     /** Index query criteria. */
     private @Nullable List<IndexQueryCriterion> criteria;
 
@@ -152,6 +155,29 @@ public final class IndexQuery<K, V> extends 
Query<Cache.Entry<K, V>> {
         return idxName;
     }
 
+    /**
+     * Gets the limit on the number of records returned by the query.
+     *
+     * @return Limit value, or {@code 0} if no limit was set.
+     */
+    public int getLimit() {
+        return limit;
+    }
+
+    /**
+     * Sets the limit on the number of records returned by the query.
+     *
+     * @param limit Positive limit to set.
+     * @return {@code this} for chaining.
+     */
+    public IndexQuery<K, V> setLimit(int limit) {
+        A.ensure(limit > 0, "Limit must be positive.");
+
+        this.limit = limit;
+
+        return this;
+    }
+
     /**
      * Sets remote cache entries filter.
      *
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java
index 3a586ec4952..751f66dbc27 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java
@@ -71,7 +71,10 @@ public enum ProtocolBitmaskFeature {
     ALL_AFFINITY_MAPPINGS(13),
 
     /** IndexQuery. */
-    INDEX_QUERY(14);
+    INDEX_QUERY(14),
+
+    /** IndexQuery limit. */
+    INDEX_QUERY_LIMIT(15);
 
     /** */
     private static final EnumSet<ProtocolBitmaskFeature> 
ALL_FEATURES_AS_ENUM_SET =
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
index 5edf586652b..130447a2309 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
@@ -1016,6 +1016,13 @@ public class TcpClientCache<K, V> implements 
ClientCache<K, V> {
                 w.writeBoolean(qry.isLocal());
                 w.writeInt(qry.getPartition() == null ? -1 : 
qry.getPartition());
 
+                if 
(!payloadCh.clientChannel().protocolCtx().isFeatureSupported(ProtocolBitmaskFeature.INDEX_QUERY_LIMIT))
 {
+                    if (qry.getLimit() > 0)
+                        throw new 
ClientFeatureNotSupportedByServerException(ProtocolBitmaskFeature.INDEX_QUERY_LIMIT);
+                }
+                else
+                    w.writeInt(qry.getLimit());
+
                 w.writeString(qry.getValueType());
                 w.writeString(qry.getIndexName());
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
index 274f476042d..8d9f23f9a00 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
@@ -558,6 +558,9 @@ public class IgniteCacheProxyImpl<K, V> extends 
AsyncSupportAdapter<IgniteCache<
             if (grp != null)
                 qry.projection(grp);
 
+            if (q.getLimit() > 0)
+                qry.limit(q.getLimit());
+
             fut = 
ctx.kernalContext().query().executeQuery(GridCacheQueryType.INDEX, 
q.getValueType(), ctx,
                 new IgniteOutClosureX<CacheQueryFuture<Map.Entry<K, V>>>() {
                     @Override public CacheQueryFuture<Map.Entry<K, V>> 
applyx() {
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java
index ae09009be03..e858bdbb8db 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java
@@ -69,7 +69,10 @@ public enum ClientBitmaskFeature implements 
ThinProtocolFeature {
     ALL_AFFINITY_MAPPINGS(13),
 
     /** IndexQuery. */
-    INDEX_QUERY(14);
+    INDEX_QUERY(14),
+
+    /** IndexQuery limit. */
+    INDEX_QUERY_LIMIT(15);
 
     /** */
     private static final EnumSet<ClientBitmaskFeature> 
ALL_FEATURES_AS_ENUM_SET =
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
index 1ac13755489..dbd62bc0e28 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
@@ -585,7 +585,7 @@ public class ClientMessageParser implements 
ClientListenerMessageParser {
                 return new ClientCacheQueryContinuousRequest(reader);
 
             case OP_QUERY_INDEX:
-                return new ClientCacheIndexQueryRequest(reader);
+                return new ClientCacheIndexQueryRequest(reader, protocolCtx);
 
             case OP_TX_START:
                 return new ClientTxStartRequest(reader);
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheIndexQueryRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheIndexQueryRequest.java
index 51d72478fbf..8c282658521 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheIndexQueryRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheIndexQueryRequest.java
@@ -29,7 +29,9 @@ import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.internal.binary.BinaryRawReaderEx;
 import org.apache.ignite.internal.cache.query.InIndexQueryCriterion;
 import org.apache.ignite.internal.cache.query.RangeIndexQueryCriterion;
+import 
org.apache.ignite.internal.processors.platform.client.ClientBitmaskFeature;
 import 
org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
+import 
org.apache.ignite.internal.processors.platform.client.ClientProtocolContext;
 import org.apache.ignite.internal.processors.platform.client.ClientResponse;
 
 import static org.apache.ignite.internal.binary.GridBinaryMarshaller.ARR_LIST;
@@ -47,8 +49,12 @@ public class ClientCacheIndexQueryRequest extends 
ClientCacheQueryRequest {
 
     /**
      * @param reader Reader.
+     * @param protocolCtx Protocol context used to check whether {@code INDEX_QUERY_LIMIT} is supported.
      */
-    public ClientCacheIndexQueryRequest(BinaryRawReaderEx reader) {
+    public ClientCacheIndexQueryRequest(
+        BinaryRawReaderEx reader,
+        ClientProtocolContext protocolCtx
+    ) {
         super(reader);
 
         pageSize = reader.readInt();
@@ -57,6 +63,10 @@ public class ClientCacheIndexQueryRequest extends 
ClientCacheQueryRequest {
 
         int part = reader.readInt();
 
+        int limit = 0;
+        if 
(protocolCtx.isFeatureSupported(ClientBitmaskFeature.INDEX_QUERY_LIMIT))
+            limit = reader.readInt();
+
         String valType = reader.readString();
 
         String idxName = reader.readString();
@@ -89,6 +99,9 @@ public class ClientCacheIndexQueryRequest extends 
ClientCacheQueryRequest {
 
         if (filterObj != null)
             qry.setFilter(((BinaryObject)filterObj).deserialize());
+
+        if (limit > 0)
+            qry.setLimit(limit);
     }
 
     /** */
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryLimitTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryLimitTest.java
new file mode 100644
index 00000000000..5c69e523356
--- /dev/null
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryLimitTest.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.query;
+
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.Cache;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.cache.query.QueryCursorEx;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.query.IndexQueryCriteriaBuilder.lt;
+
+/** */
+public class IndexQueryLimitTest extends GridCommonAbstractTest {
+    /** */
+    private static final String CACHE = "TEST_CACHE";
+
+    /** */
+    private static final String IDX = "PERSON_ID_IDX";
+
+    /** */
+    private static final int CNT = 10_000;
+
+    /** */
+    private Ignite crd;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        crd = startGrids(4);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() {
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        CacheConfiguration<Long, Person> ccfg = new CacheConfiguration<Long, 
Person>()
+                .setName(CACHE)
+                .setIndexedTypes(Long.class, Person.class)
+                .setAtomicityMode(TRANSACTIONAL)
+                .setCacheMode(REPLICATED);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        return cfg;
+    }
+
+    /** */
+    @Test
+    public void testRangeQueriesWithoutDuplicates() throws Exception {
+        checkRangeQueries(1);
+    }
+
+    /** */
+    @Test
+    public void testRangeQueriesWithDuplicates() throws Exception {
+        checkRangeQueries(10);
+    }
+
+    /** */
+    @Test
+    public void testSetLimit() {
+        GridTestUtils.assertThrows(log, () -> new IndexQuery<>(Person.class, 
IDX).setLimit(0),
+            IllegalArgumentException.class, "Limit must be positive.");
+
+        int limit = 1 + new Random().nextInt(1000);
+
+        GridTestUtils.assertThrows(log, () -> new IndexQuery<>(Person.class, 
IDX).setLimit(0 - limit),
+            IllegalArgumentException.class, "Limit must be positive.");
+
+        IndexQuery<Long, Person> qry = new IndexQuery<>(Person.class, IDX);
+
+        qry.setLimit(limit);
+
+        assertEquals(limit, qry.getLimit());
+    }
+
+    /** */
+    private void checkRangeQueries(int duplicates) throws Exception {
+        // Add data
+        insertData(duplicates);
+
+        // All
+        checkLimit(null, 0, CNT, duplicates);
+
+        int pivot = new Random().nextInt(CNT);
+
+        // Lt.
+        checkLimit(lt("id", pivot), 0, pivot, duplicates);
+    }
+
+    /** */
+    private void checkLimit(IndexQueryCriterion criterion, int left, int 
right, int duplicates) throws Exception {
+        int rows = right - left;
+        int limit = new Random().nextInt(rows) + 1;
+
+        // 1 <= limit <= rows
+        checkLimit(criterion, limit, left, left + limit, duplicates);
+
+        // limit >= rows
+        if (rows > 1) {
+            limit = new Random().nextInt(CNT + 2 - rows) + rows;
+
+            checkLimit(criterion, limit, left, right, duplicates);
+        }
+    }
+
+    /** */
+    private void checkLimit(IndexQueryCriterion criterion, int limit, int 
left, int right, int duplicates) throws Exception {
+        IndexQuery<Long, Person> qry = new IndexQuery<>(Person.class, IDX);
+
+        if (criterion != null)
+            qry.setCriteria(criterion);
+
+        qry.setLimit(limit);
+
+        QueryCursor<Cache.Entry<Long, Person>> cursor = 
crd.cache(CACHE).query(qry);
+
+        int expSize = (right - left) * duplicates;
+
+        if (limit > 0 && limit < expSize)
+            expSize = limit;
+
+        Set<Long> expKeys = new HashSet<>(expSize);
+        List<Integer> expOrderedValues = new LinkedList<>();
+
+        loop: for (int i = left; i != right; i++) {
+            for (int j = 0; j < duplicates; j++) {
+                expOrderedValues.add(i);
+
+                expKeys.add((long)CNT * j + i);
+                if (expOrderedValues.size() >= limit)
+                    break loop;
+            }
+        }
+
+        AtomicInteger actSize = new AtomicInteger();
+        ((QueryCursorEx<Cache.Entry<Long, Person>>)cursor).getAll(entry -> {
+            assertEquals(expOrderedValues.remove(0), 
(Integer)entry.getValue().id);
+
+            assertTrue(expKeys.remove(entry.getKey()));
+
+            int persId = entry.getKey().intValue() % CNT;
+
+            assertEquals(new Person(persId), entry.getValue());
+
+            actSize.incrementAndGet();
+        });
+
+        assertEquals(expSize, actSize.get());
+
+        assertTrue(expKeys.isEmpty());
+    }
+
+    /** */
+    private void insertData(int duplicates) {
+        try (IgniteDataStreamer<Long, Person> streamer = 
crd.dataStreamer(CACHE)) {
+            for (int persId = 0; persId < CNT; persId++) {
+                // Create duplicates of data.
+                for (int i = 0; i < duplicates; i++)
+                    streamer.addData((long)CNT * i + persId, new 
Person(persId));
+            }
+        }
+    }
+
+    /** */
+    private static class Person {
+        /** */
+        @QuerySqlField(index = true)
+        final int id;
+
+        /** */
+        Person(int id) {
+            this.id = id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return "Person[id=" + id + "]";
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            Person person = (Person)o;
+
+            return Objects.equals(id, person.id);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return id;
+        }
+    }
+}
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryTestSuite.java
 
b/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryTestSuite.java
index 3d408c06d2f..8bb75faddad 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryTestSuite.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryTestSuite.java
@@ -46,7 +46,8 @@ import org.junit.runners.Suite;
     ThinClientIndexQueryTest.class,
     RepeatedFieldIndexQueryTest.class,
     IndexQueryInCriterionTest.class,
-    IndexQueryInCriterionDescTest.class
+    IndexQueryInCriterionDescTest.class,
+    IndexQueryLimitTest.class
 })
 public class IndexQueryTestSuite {
 }
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/cache/query/ThinClientIndexQueryTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/cache/query/ThinClientIndexQueryTest.java
index 4c4a8ce0951..08cc8b76477 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/cache/query/ThinClientIndexQueryTest.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/cache/query/ThinClientIndexQueryTest.java
@@ -17,7 +17,9 @@
 
 package org.apache.ignite.cache.query;
 
+import java.lang.reflect.Field;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Random;
 import java.util.function.Consumer;
@@ -31,11 +33,13 @@ import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.query.annotations.QuerySqlField;
 import org.apache.ignite.client.ClientCache;
 import org.apache.ignite.client.ClientException;
+import org.apache.ignite.client.ClientFeatureNotSupportedByServerException;
 import org.apache.ignite.client.IgniteClient;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.ClientConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.client.thin.ProtocolBitmaskFeature;
 import 
org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.F;
@@ -344,6 +348,26 @@ public class ThinClientIndexQueryTest extends 
GridCommonAbstractTest {
             .setCriteria(crit);
 
         assertClientQuery(cache, left, right, idxQry);
+
+        if (left < right) {
+            Random r = new Random();
+
+            int limit = 1 + r.nextInt(right - left);
+
+            idxQry = new IndexQuery<Integer, Person>(Person.class, idxName)
+                .setCriteria(crit)
+                .setLimit(limit);
+
+            assertClientQuery(cache, left, left + limit, idxQry);
+
+            limit = right - left + r.nextInt(right - left);
+
+            idxQry = new IndexQuery<Integer, Person>(Person.class, idxName)
+                .setCriteria(crit)
+                .setLimit(limit);
+
+            assertClientQuery(cache, left, right, idxQry);
+        }
     }
 
     /** */
@@ -369,6 +393,47 @@ public class ThinClientIndexQueryTest extends 
GridCommonAbstractTest {
         }
     }
 
+    /** */
+    @Test
+    public void testIndexQueryLimitOnOlderProtocolVersion() throws Exception {
+        // Exclude INDEX_QUERY_LIMIT from protocol.
+        Class<?> clazz = 
Class.forName("org.apache.ignite.internal.client.thin.ProtocolBitmaskFeature");
+
+        Field field = clazz.getDeclaredField("ALL_FEATURES_AS_ENUM_SET");
+
+        field.setAccessible(true);
+
+        EnumSet<ProtocolBitmaskFeature> allFeaturesEnumSet = 
(EnumSet<ProtocolBitmaskFeature>)field.get(null);
+
+        allFeaturesEnumSet.remove(ProtocolBitmaskFeature.INDEX_QUERY_LIMIT);
+
+        try {
+            withClientCache((cache) -> {
+                // No limit.
+                IndexQuery<Integer, Person> idxQry = new 
IndexQuery<>(Person.class, IDX_FLD1);
+
+                assertClientQuery(cache, NULLS_CNT, CNT, idxQry);
+
+                // With limit.
+                IndexQuery<Integer, Person> idxQryWithLImit = new 
IndexQuery<Integer, Person>(Person.class, IDX_FLD1)
+                    .setLimit(10);
+
+                GridTestUtils.assertThrowsAnyCause(
+                    log,
+                    () -> {
+                        cache.query(idxQryWithLImit).getAll();
+                        return null;
+                    },
+                    ClientFeatureNotSupportedByServerException.class,
+                    "Feature INDEX_QUERY_LIMIT is not supported by the 
server");
+            });
+        }
+        finally {
+            // Revert the feature set.
+            allFeaturesEnumSet.add(ProtocolBitmaskFeature.INDEX_QUERY_LIMIT);
+        }
+    }
+
     /** */
     private void withClientCache(Consumer<ClientCache<Integer, Person>> 
consumer) {
         ClientConfiguration clnCfg = new ClientConfiguration()

Reply via email to