This is an automated email from the ASF dual-hosted git repository. timoninmaxim pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new a225705fc5f IGNITE-16619 IndexQuery should support limit (#10767) a225705fc5f is described below commit a225705fc5f487c2170dc584913f769f1b80f34b Author: yurinaryshkin <135707807+yurinarysh...@users.noreply.github.com> AuthorDate: Thu Jun 15 13:05:20 2023 +0300 IGNITE-16619 IndexQuery should support limit (#10767) --- .../org/apache/ignite/cache/query/IndexQuery.java | 26 +++ .../client/thin/ProtocolBitmaskFeature.java | 5 +- .../internal/client/thin/TcpClientCache.java | 7 + .../processors/cache/IgniteCacheProxyImpl.java | 3 + .../platform/client/ClientBitmaskFeature.java | 5 +- .../platform/client/ClientMessageParser.java | 2 +- .../client/cache/ClientCacheIndexQueryRequest.java | 15 +- .../ignite/cache/query/IndexQueryLimitTest.java | 233 +++++++++++++++++++++ .../ignite/cache/query/IndexQueryTestSuite.java | 3 +- .../cache/query/ThinClientIndexQueryTest.java | 65 ++++++ 10 files changed, 359 insertions(+), 5 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/IndexQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/IndexQuery.java index 0e7f2a1abc9..8c61ccc1936 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/query/IndexQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/IndexQuery.java @@ -50,6 +50,9 @@ public final class IndexQuery<K, V> extends Query<Cache.Entry<K, V>> { /** Index name. */ private final @Nullable String idxName; + /** Limit */ + private int limit; + /** Index query criteria. */ private @Nullable List<IndexQueryCriterion> criteria; @@ -152,6 +155,29 @@ public final class IndexQuery<K, V> extends Query<Cache.Entry<K, V>> { return idxName; } + /** + * Gets limit to response records count. + * + * @return Limit value. + */ + public int getLimit() { + return limit; + } + + /** + * Sets limit to response records count. + * + * @param limit POsitive limit to set. + * @return {@code this} For chaining. + */ + public IndexQuery<K, V> setLimit(int limit) { + A.ensure(limit > 0, "Limit must be positive."); + + this.limit = limit; + + return this; + } + /** * Sets remote cache entries filter. * diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java index 3a586ec4952..751f66dbc27 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java @@ -71,7 +71,10 @@ public enum ProtocolBitmaskFeature { ALL_AFFINITY_MAPPINGS(13), /** IndexQuery. */ - INDEX_QUERY(14); + INDEX_QUERY(14), + + /** IndexQuery limit. */ + INDEX_QUERY_LIMIT(15); /** */ private static final EnumSet<ProtocolBitmaskFeature> ALL_FEATURES_AS_ENUM_SET = diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java index 5edf586652b..130447a2309 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java @@ -1016,6 +1016,13 @@ public class TcpClientCache<K, V> implements ClientCache<K, V> { w.writeBoolean(qry.isLocal()); w.writeInt(qry.getPartition() == null ? -1 : qry.getPartition()); + if (!payloadCh.clientChannel().protocolCtx().isFeatureSupported(ProtocolBitmaskFeature.INDEX_QUERY_LIMIT)) { + if (qry.getLimit() > 0) + throw new ClientFeatureNotSupportedByServerException(ProtocolBitmaskFeature.INDEX_QUERY_LIMIT); + } + else + w.writeInt(qry.getLimit()); + w.writeString(qry.getValueType()); w.writeString(qry.getIndexName()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java index 274f476042d..8d9f23f9a00 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java @@ -558,6 +558,9 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< if (grp != null) qry.projection(grp); + if (q.getLimit() > 0) + qry.limit(q.getLimit()); + fut = ctx.kernalContext().query().executeQuery(GridCacheQueryType.INDEX, q.getValueType(), ctx, new IgniteOutClosureX<CacheQueryFuture<Map.Entry<K, V>>>() { @Override public CacheQueryFuture<Map.Entry<K, V>> applyx() { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java index ae09009be03..e858bdbb8db 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java @@ -69,7 +69,10 @@ public enum ClientBitmaskFeature implements ThinProtocolFeature { ALL_AFFINITY_MAPPINGS(13), /** IndexQuery. */ - INDEX_QUERY(14); + INDEX_QUERY(14), + + /** IndexQuery limit. */ + INDEX_QUERY_LIMIT(15); /** */ private static final EnumSet<ClientBitmaskFeature> ALL_FEATURES_AS_ENUM_SET = diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java index 1ac13755489..dbd62bc0e28 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java @@ -585,7 +585,7 @@ public class ClientMessageParser implements ClientListenerMessageParser { return new ClientCacheQueryContinuousRequest(reader); case OP_QUERY_INDEX: - return new ClientCacheIndexQueryRequest(reader); + return new ClientCacheIndexQueryRequest(reader, protocolCtx); case OP_TX_START: return new ClientTxStartRequest(reader); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheIndexQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheIndexQueryRequest.java index 51d72478fbf..8c282658521 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheIndexQueryRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheIndexQueryRequest.java @@ -29,7 +29,9 @@ import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.internal.binary.BinaryRawReaderEx; import org.apache.ignite.internal.cache.query.InIndexQueryCriterion; import org.apache.ignite.internal.cache.query.RangeIndexQueryCriterion; +import org.apache.ignite.internal.processors.platform.client.ClientBitmaskFeature; import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext; +import org.apache.ignite.internal.processors.platform.client.ClientProtocolContext; import org.apache.ignite.internal.processors.platform.client.ClientResponse; import static org.apache.ignite.internal.binary.GridBinaryMarshaller.ARR_LIST; @@ -47,8 +49,12 @@ public class ClientCacheIndexQueryRequest extends ClientCacheQueryRequest { /** * @param reader Reader. + * @param protocolCtx */ - public ClientCacheIndexQueryRequest(BinaryRawReaderEx reader) { + public ClientCacheIndexQueryRequest( + BinaryRawReaderEx reader, + ClientProtocolContext protocolCtx + ) { super(reader); pageSize = reader.readInt(); @@ -57,6 +63,10 @@ public class ClientCacheIndexQueryRequest extends ClientCacheQueryRequest { int part = reader.readInt(); + int limit = 0; + if (protocolCtx.isFeatureSupported(ClientBitmaskFeature.INDEX_QUERY_LIMIT)) + limit = reader.readInt(); + String valType = reader.readString(); String idxName = reader.readString(); @@ -89,6 +99,9 @@ public class ClientCacheIndexQueryRequest extends ClientCacheQueryRequest { if (filterObj != null) qry.setFilter(((BinaryObject)filterObj).deserialize()); + + if (limit > 0) + qry.setLimit(limit); } /** */ diff --git a/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryLimitTest.java b/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryLimitTest.java new file mode 100644 index 00000000000..5c69e523356 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryLimitTest.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.query; + +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import javax.cache.Cache; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.cache.query.annotations.QuerySqlField; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.processors.cache.query.QueryCursorEx; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.cache.query.IndexQueryCriteriaBuilder.lt; + +/** */ +public class IndexQueryLimitTest extends GridCommonAbstractTest { + /** */ + private static final String CACHE = "TEST_CACHE"; + + /** */ + private static final String IDX = "PERSON_ID_IDX"; + + /** */ + private static final int CNT = 10_000; + + /** */ + private Ignite crd; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + crd = startGrids(4); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + CacheConfiguration<Long, Person> ccfg = new CacheConfiguration<Long, Person>() + .setName(CACHE) + .setIndexedTypes(Long.class, Person.class) + .setAtomicityMode(TRANSACTIONAL) + .setCacheMode(REPLICATED); + + cfg.setCacheConfiguration(ccfg); + + return cfg; + } + + /** */ + @Test + public void testRangeQueriesWithoutDuplicates() throws Exception { + checkRangeQueries(1); + } + + /** */ + @Test + public void testRangeQueriesWithDuplicates() throws Exception { + checkRangeQueries(10); + } + + /** */ + @Test + public void testSetLimit() { + GridTestUtils.assertThrows(log, () -> new IndexQuery<>(Person.class, IDX).setLimit(0), + IllegalArgumentException.class, "Limit must be positive."); + + int limit = 1 + new Random().nextInt(1000); + + GridTestUtils.assertThrows(log, () -> new IndexQuery<>(Person.class, IDX).setLimit(0 - limit), + IllegalArgumentException.class, "Limit must be positive."); + + IndexQuery<Long, Person> qry = new IndexQuery<>(Person.class, IDX); + + qry.setLimit(limit); + + assertEquals(limit, qry.getLimit()); + } + + /** */ + private void checkRangeQueries(int duplicates) throws Exception { + // Add data + insertData(duplicates); + + // All + checkLimit(null, 0, CNT, duplicates); + + int pivot = new Random().nextInt(CNT); + + // Lt. + checkLimit(lt("id", pivot), 0, pivot, duplicates); + } + + /** */ + private void checkLimit(IndexQueryCriterion criterion, int left, int right, int duplicates) throws Exception { + int rows = right - left; + int limit = new Random().nextInt(rows) + 1; + + // limit < rows + checkLimit(criterion, limit, left, left + limit, duplicates); + + // limit >= rows + if (rows > 1) { + limit = new Random().nextInt(CNT + 2 - rows) + rows; + + checkLimit(criterion, limit, left, right, duplicates); + } + } + + /** */ + private void checkLimit(IndexQueryCriterion criterion, int limit, int left, int right, int duplicates) throws Exception { + IndexQuery<Long, Person> qry = new IndexQuery<>(Person.class, IDX); + + if (criterion != null) + qry.setCriteria(criterion); + + qry.setLimit(limit); + + QueryCursor<Cache.Entry<Long, Person>> cursor = crd.cache(CACHE).query(qry); + + int expSize = (right - left) * duplicates; + + if (limit > 0 && limit < expSize) + expSize = limit; + + Set<Long> expKeys = new HashSet<>(expSize); + List<Integer> expOrderedValues = new LinkedList<>(); + + loop: for (int i = left; i != right; i++) { + for (int j = 0; j < duplicates; j++) { + expOrderedValues.add(i); + + expKeys.add((long)CNT * j + i); + if (expOrderedValues.size() >= limit) + break loop; + } + } + + AtomicInteger actSize = new AtomicInteger(); + ((QueryCursorEx<Cache.Entry<Long, Person>>)cursor).getAll(entry -> { + assertEquals(expOrderedValues.remove(0), (Integer)entry.getValue().id); + + assertTrue(expKeys.remove(entry.getKey())); + + int persId = entry.getKey().intValue() % CNT; + + assertEquals(new Person(persId), entry.getValue()); + + actSize.incrementAndGet(); + }); + + assertEquals(expSize, actSize.get()); + + assertTrue(expKeys.isEmpty()); + } + + /** */ + private void insertData(int duplicates) { + try (IgniteDataStreamer<Long, Person> streamer = crd.dataStreamer(CACHE)) { + for (int persId = 0; persId < CNT; persId++) { + // Create duplicates of data. + for (int i = 0; i < duplicates; i++) + streamer.addData((long)CNT * i + persId, new Person(persId)); + } + } + } + + /** */ + private static class Person { + /** */ + @QuerySqlField(index = true) + final int id; + + /** */ + Person(int id) { + this.id = id; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "Person[id=" + id + "]"; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + Person person = (Person)o; + + return Objects.equals(id, person.id); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return id; + } + } +} diff --git a/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryTestSuite.java index 3d408c06d2f..8bb75faddad 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryTestSuite.java @@ -46,7 +46,8 @@ import org.junit.runners.Suite; ThinClientIndexQueryTest.class, RepeatedFieldIndexQueryTest.class, IndexQueryInCriterionTest.class, - IndexQueryInCriterionDescTest.class + IndexQueryInCriterionDescTest.class, + IndexQueryLimitTest.class }) public class IndexQueryTestSuite { } diff --git a/modules/indexing/src/test/java/org/apache/ignite/cache/query/ThinClientIndexQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/cache/query/ThinClientIndexQueryTest.java index 4c4a8ce0951..08cc8b76477 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/cache/query/ThinClientIndexQueryTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/cache/query/ThinClientIndexQueryTest.java @@ -17,7 +17,9 @@ package org.apache.ignite.cache.query; +import java.lang.reflect.Field; import java.util.Collections; +import java.util.EnumSet; import java.util.List; import java.util.Random; import java.util.function.Consumer; @@ -31,11 +33,13 @@ import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.client.ClientCache; import org.apache.ignite.client.ClientException; +import org.apache.ignite.client.ClientFeatureNotSupportedByServerException; import org.apache.ignite.client.IgniteClient; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.ClientConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.client.thin.ProtocolBitmaskFeature; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.F; @@ -344,6 +348,26 @@ public class ThinClientIndexQueryTest extends GridCommonAbstractTest { .setCriteria(crit); assertClientQuery(cache, left, right, idxQry); + + if (left < right) { + Random r = new Random(); + + int limit = 1 + r.nextInt(right - left); + + idxQry = new IndexQuery<Integer, Person>(Person.class, idxName) + .setCriteria(crit) + .setLimit(limit); + + assertClientQuery(cache, left, left + limit, idxQry); + + limit = right - left + r.nextInt(right - left); + + idxQry = new IndexQuery<Integer, Person>(Person.class, idxName) + .setCriteria(crit) + .setLimit(limit); + + assertClientQuery(cache, left, right, idxQry); + } } /** */ @@ -369,6 +393,47 @@ public class ThinClientIndexQueryTest extends GridCommonAbstractTest { } } + /** */ + @Test + public void testIndexQueryLimitOnOlderProtocolVersion() throws Exception { + // Exclude INDEX_QUERY_LIMIT from protocol. + Class<?> clazz = Class.forName("org.apache.ignite.internal.client.thin.ProtocolBitmaskFeature"); + + Field field = clazz.getDeclaredField("ALL_FEATURES_AS_ENUM_SET"); + + field.setAccessible(true); + + EnumSet<ProtocolBitmaskFeature> allFeaturesEnumSet = (EnumSet<ProtocolBitmaskFeature>)field.get(null); + + allFeaturesEnumSet.remove(ProtocolBitmaskFeature.INDEX_QUERY_LIMIT); + + try { + withClientCache((cache) -> { + // No limit. + IndexQuery<Integer, Person> idxQry = new IndexQuery<>(Person.class, IDX_FLD1); + + assertClientQuery(cache, NULLS_CNT, CNT, idxQry); + + // With limit. + IndexQuery<Integer, Person> idxQryWithLImit = new IndexQuery<Integer, Person>(Person.class, IDX_FLD1) + .setLimit(10); + + GridTestUtils.assertThrowsAnyCause( + log, + () -> { + cache.query(idxQryWithLImit).getAll(); + return null; + }, + ClientFeatureNotSupportedByServerException.class, + "Feature INDEX_QUERY_LIMIT is not supported by the server"); + }); + } + finally { + //revert the features set + allFeaturesEnumSet.add(ProtocolBitmaskFeature.INDEX_QUERY_LIMIT); + } + } + /** */ private void withClientCache(Consumer<ClientCache<Integer, Person>> consumer) { ClientConfiguration clnCfg = new ClientConfiguration()