This is an automated email from the ASF dual-hosted git repository. anovikov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 36450ff06d IGNITE-20878 Basic criteria queries for record view (#2882) 36450ff06d is described below commit 36450ff06da48c06567d9e79eaabf9c017a651e9 Author: Andrey Novikov <anovi...@apache.org> AuthorDate: Thu Jan 4 17:00:38 2024 +0700 IGNITE-20878 Basic criteria queries for record view (#2882) --- .../java/org/apache/ignite/lang/AsyncCursor.java | 69 +++++++++++ .../main/java/org/apache/ignite/lang/Cursor.java | 33 +++++ .../main/java/org/apache/ignite/sql/ResultSet.java | 10 +- .../apache/ignite/sql/async/AsyncResultSet.java | 32 ++--- .../java/org/apache/ignite/table/RecordView.java | 3 +- .../org/apache/ignite/table/criteria/Criteria.java | 25 ++++ .../table/criteria/CriteriaQueryOptions.java | 91 ++++++++++++++ .../ignite/table/criteria/CriteriaQuerySource.java | 82 +++++++++++++ .../apache/ignite/table/criteria/package-info.java | 22 ++++ .../client/table/ClientRecordBinaryView.java | 34 ++++++ .../internal/client/table/ClientRecordView.java | 34 ++++++ .../ignite/client/fakes/FakeIgniteTables.java | 3 +- .../internal/table/criteria/CursorAdapter.java | 122 +++++++++++++++++++ .../table/criteria/QueryCriteriaAsyncCursor.java | 77 ++++++++++++ .../testframework/matchers/TupleMatcher.java | 49 ++++++++ .../ignite/internal/index/IndexManagerTest.java | 3 +- .../runner/app/ItIgniteNodeRestartTest.java | 12 +- .../app/client/ItAbstractThinClientTest.java | 10 +- .../org/apache/ignite/internal/app/IgniteImpl.java | 3 +- .../internal/ClusterPerClassIntegrationTest.java | 27 ++++- .../exec/ExecutableTableRegistrySelfTest.java | 6 +- modules/table/build.gradle | 2 + .../rebalance/ItRebalanceDistributedTest.java | 5 +- .../ignite/internal/table/ItColocationTest.java | 3 +- .../ignite/internal/table/ItCriteriaQueryTest.java | 134 +++++++++++++++++++++ .../ignite/internal/table/AbstractTableView.java | 8 +- .../internal/table/KeyValueBinaryViewImpl.java | 6 +- .../ignite/internal/table/KeyValueViewImpl.java | 5 +- .../internal/table/RecordBinaryViewImpl.java | 38 +++++- .../ignite/internal/table/RecordViewImpl.java | 50 +++++++- .../apache/ignite/internal/table/TableImpl.java | 21 ++-- .../internal/table/distributed/TableManager.java | 12 +- .../internal/table/InteropOperationsTest.java | 14 +-- .../table/KeyValueBinaryViewOperationsTest.java | 6 +- .../KeyValueViewOperationsSimpleSchemaTest.java | 17 +-- .../internal/table/KeyValueViewOperationsTest.java | 2 + .../table/RecordBinaryViewOperationsTest.java | 6 +- .../internal/table/RecordViewOperationsTest.java | 4 +- .../internal/table/TableKvOperationsTestBase.java | 4 +- .../table/distributed/TableManagerTest.java | 4 +- .../apache/ignite/distributed/ItTxTestCluster.java | 6 +- 41 files changed, 1013 insertions(+), 81 deletions(-) diff --git a/modules/api/src/main/java/org/apache/ignite/lang/AsyncCursor.java b/modules/api/src/main/java/org/apache/ignite/lang/AsyncCursor.java new file mode 100644 index 0000000000..8a7e273538 --- /dev/null +++ b/modules/api/src/main/java/org/apache/ignite/lang/AsyncCursor.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.lang; + +import java.util.concurrent.CompletableFuture; + +/** + * Provides methods to iterate over operation results and release underlying resources in an asynchronous way. + * + * @param <T> The type of elements returned by this iterator. + */ +public interface AsyncCursor<T> { + /** + * Returns the current page content if the operation returns results. + * + * @return Iterable set of elements. + * @throws IgniteException If no results is returned. + */ + Iterable<T> currentPage(); + + /** + * Returns the current page size if the operation return results. + * + * @return The size of {@link #currentPage()}. + * @throws IgniteException If no results is returned. + */ + int currentPageSize(); + + /** + * Fetches the next page of results asynchronously. + * The current page is changed after the future completion. + * The methods {@link #currentPage()}, {@link #currentPageSize()}, {@link #hasMorePages()} + * use the current page and return consistent results between complete last page future and call {@code fetchNextPage}. + * + * @return A future which will be completed when next page will be fetched and set as the current page. + * The future will return {@code this} for chaining. + * @throws IgniteException If resource is closed or if there are no more results. + */ + CompletableFuture<? extends AsyncCursor<T>> fetchNextPage(); + + /** + * Indicates whether there are more pages of results. + * + * @return {@code True} if there are more pages with results, {@code false} otherwise. + */ + boolean hasMorePages(); + + /** + * Closes this cursor and releases any underlying resources. + * + * @return A future which will be completed when the resources will be actually released. + */ + CompletableFuture<Void> closeAsync(); +} diff --git a/modules/api/src/main/java/org/apache/ignite/lang/Cursor.java b/modules/api/src/main/java/org/apache/ignite/lang/Cursor.java new file mode 100644 index 0000000000..4aa6a8b658 --- /dev/null +++ b/modules/api/src/main/java/org/apache/ignite/lang/Cursor.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.lang; + +import java.util.Iterator; + +/** + * Provides methods to iterate over operation results and release underlying resources. + * + * @param <T> The type of elements returned by this iterator. + */ +public interface Cursor<T> extends Iterator<T>, AutoCloseable { + /** + * Closes this cursor and releases any underlying resources. + */ + @Override + void close(); +} diff --git a/modules/api/src/main/java/org/apache/ignite/sql/ResultSet.java b/modules/api/src/main/java/org/apache/ignite/sql/ResultSet.java index 988efc5b9a..fbb5c409ac 100644 --- a/modules/api/src/main/java/org/apache/ignite/sql/ResultSet.java +++ b/modules/api/src/main/java/org/apache/ignite/sql/ResultSet.java @@ -17,7 +17,7 @@ package org.apache.ignite.sql; -import java.util.Iterator; +import org.apache.ignite.lang.Cursor; import org.apache.ignite.table.mapper.Mapper; import org.apache.ignite.tx.Transaction; import org.jetbrains.annotations.Nullable; @@ -40,7 +40,7 @@ import org.jetbrains.annotations.Nullable; * @see Session#execute(Transaction, String, Object...) * @see Session#execute(Transaction, Mapper, String, Object...) */ -public interface ResultSet<T> extends Iterator<T>, AutoCloseable { +public interface ResultSet<T> extends Cursor<T> { /** * Returns metadata for the results if the result contains rows (if {@link #hasRowSet()} returns {@code true}). * @@ -70,12 +70,12 @@ public interface ResultSet<T> extends Iterator<T>, AutoCloseable { long affectedRows(); /** - * Indicated whether the query that had produced the result was a conditional query. - * E.g., for query "Create table if not exists", the method returns {@code true} if + * Indicates whether the query that had produced the result was a conditional query. + * E.g., for query "Create table if not exists", the method returns {@code true} if * the operation was successful, and {@code false} if the operation was ignored becasue * the table already existed. * - * <p>Note: If the method returns {@code false}, either {@link #affectedRows()} returns + * <p>Note: If the method returns {@code false}, either {@link #affectedRows()} returns * the number the affected rows or {@link #hasRowSet()} returns {@code true}. * * @return {@code True} if the query is conditional, {@code false} otherwise. diff --git a/modules/api/src/main/java/org/apache/ignite/sql/async/AsyncResultSet.java b/modules/api/src/main/java/org/apache/ignite/sql/async/AsyncResultSet.java index 05a37a99ae..4ba91a0822 100644 --- a/modules/api/src/main/java/org/apache/ignite/sql/async/AsyncResultSet.java +++ b/modules/api/src/main/java/org/apache/ignite/sql/async/AsyncResultSet.java @@ -18,10 +18,13 @@ package org.apache.ignite.sql.async; import java.util.concurrent.CompletableFuture; +import org.apache.ignite.lang.AsyncCursor; +import org.apache.ignite.sql.CursorClosedException; import org.apache.ignite.sql.NoRowSetExpectedException; import org.apache.ignite.sql.ResultSet; import org.apache.ignite.sql.ResultSetMetadata; import org.apache.ignite.sql.Session; +import org.apache.ignite.sql.SqlException; import org.apache.ignite.sql.SqlRow; import org.apache.ignite.table.mapper.Mapper; import org.apache.ignite.tx.Transaction; @@ -56,9 +59,9 @@ import org.jetbrains.annotations.Nullable; * @see Session#executeAsync(Transaction, String, Object...) * @see Session#executeAsync(Transaction, Mapper, String, Object...) */ -public interface AsyncResultSet<T> { +public interface AsyncResultSet<T> extends AsyncCursor<T> { /** - * Returns metadata for query results. If the result set contains rows ({@link #hasRowSet()}, returns {@code true}). + * Returns metadata for query results. If the result set contains rows ({@link #hasRowSet()}, returns {@code true}). * If not applicable, returns {@code null}. * * @return ResultSet Metadata. @@ -88,11 +91,11 @@ public interface AsyncResultSet<T> { long affectedRows(); /** - * Indicates whether the query that had produced the result was a conditional query. - * E.g., for query "Create table if not exists", the method returns {@code true} if + * Indicates whether the query that had produced the result was a conditional query. + * E.g., for query "Create table if not exists", the method returns {@code true} if * the operation was successful or {@code false} if the operation was ignored because the table already existed. * - * <p>Note: If the method returns {@code false}, then either {@link #affectedRows()} returns the number of + * <p>Note: If the method returns {@code false}, then either {@link #affectedRows()} returns the number of * affected rows, or {@link #hasRowSet()} returns {@code true}, or the conditional DDL query is not applied. * * @return {@code True} if a conditional query is applied, {@code false} otherwise. @@ -106,6 +109,7 @@ public interface AsyncResultSet<T> { * @return Iterable set of rows. * @throws NoRowSetExpectedException if no row set is returned. */ + @Override Iterable<T> currentPage(); /** @@ -114,31 +118,29 @@ public interface AsyncResultSet<T> { * @return The size of {@link #currentPage()}. * @throws NoRowSetExpectedException if no row set is returned. */ + @Override int currentPageSize(); /** * Fetches the next page of results asynchronously. - * The future that is completed with the same {@code AsyncResultSet} object. * The current page is changed after the future completion. * The methods {@link #currentPage()}, {@link #currentPageSize()}, {@link #hasMorePages()} * use the current page and return consistent results between complete last page future and call {@code fetchNextPage}. * - * @return Operation future. - * @throws NoRowSetExpectedException if no row set is expected as a query result. + * @return A future which will be completed when next page will be fetched and set as the current page. + * The future will return {@code this} for chaining. + * @throws NoRowSetExpectedException If no row set is expected as a query result. + * @throws CursorClosedException If cursor is closed. + * @throws SqlException If there are no more pages. */ + @Override CompletableFuture<? extends AsyncResultSet<T>> fetchNextPage(); - /** - * Indicates whether there are more pages of results. - * - * @return {@code True} if there are more pages with results, {@code false} otherwise. - */ - boolean hasMorePages(); - /** * Invalidates a query result, stops the query, and cleans up query resources. * * @return Operation future. */ + @Override CompletableFuture<Void> closeAsync(); } diff --git a/modules/api/src/main/java/org/apache/ignite/table/RecordView.java b/modules/api/src/main/java/org/apache/ignite/table/RecordView.java index e95fc98b2e..2ab3a35cda 100644 --- a/modules/api/src/main/java/org/apache/ignite/table/RecordView.java +++ b/modules/api/src/main/java/org/apache/ignite/table/RecordView.java @@ -20,6 +20,7 @@ package org.apache.ignite.table; import java.util.Collection; import java.util.List; import java.util.concurrent.CompletableFuture; +import org.apache.ignite.table.criteria.CriteriaQuerySource; import org.apache.ignite.tx.Transaction; import org.jetbrains.annotations.Nullable; @@ -29,7 +30,7 @@ import org.jetbrains.annotations.Nullable; * @param <R> Mapped record type. * @see org.apache.ignite.table.mapper.Mapper */ -public interface RecordView<R> extends DataStreamerTarget<R> { +public interface RecordView<R> extends DataStreamerTarget<R>, CriteriaQuerySource<R> { /** * Gets a record with the same key column values as the given one from a table. * diff --git a/modules/api/src/main/java/org/apache/ignite/table/criteria/Criteria.java b/modules/api/src/main/java/org/apache/ignite/table/criteria/Criteria.java new file mode 100644 index 0000000000..1734c30157 --- /dev/null +++ b/modules/api/src/main/java/org/apache/ignite/table/criteria/Criteria.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.table.criteria; + +/** + * Represents a criteria query predicate. + */ +public interface Criteria { + // No-op. +} diff --git a/modules/api/src/main/java/org/apache/ignite/table/criteria/CriteriaQueryOptions.java b/modules/api/src/main/java/org/apache/ignite/table/criteria/CriteriaQueryOptions.java new file mode 100644 index 0000000000..951f604d5f --- /dev/null +++ b/modules/api/src/main/java/org/apache/ignite/table/criteria/CriteriaQueryOptions.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.table.criteria; + +/** + * Options useful for tuning the criteria query. + * + * @see CriteriaQuerySource + */ +public class CriteriaQueryOptions { + /** Default options. */ + public static final CriteriaQueryOptions DEFAULT = builder().build(); + + /** Maximum number of rows per page. */ + private final int pageSize; + + /** + * Constructor. + * + * @param pageSize Maximum number of rows per page. + */ + private CriteriaQueryOptions(int pageSize) { + this.pageSize = pageSize; + } + + /** + * Creates a new builder. + * + * @return Builder. + */ + public static Builder builder() { + return new Builder(); + } + + /** + * Returns a page size - the maximum number of result rows that can be fetched at a time. + * + * @return Maximum number of rows per page. + */ + public int pageSize() { + return pageSize; + } + + /** + * Builder. + */ + public static class Builder { + /** Maximum number of rows per page. */ + private int pageSize = 1000; + + /** + * Sets a page size - the maximum number of result rows that can be fetched at a time. + * + * @param pageSize Maximum number of rows per page. + * @return {@code this} for chaining. + */ + public Builder pageSize(int pageSize) { + if (pageSize <= 0) { + throw new IllegalArgumentException("Page size must be positive: " + pageSize); + } + + this.pageSize = pageSize; + + return this; + } + + /** + * Builds the options. + * + * @return Criteria query options. + */ + public CriteriaQueryOptions build() { + return new CriteriaQueryOptions(pageSize); + } + } +} diff --git a/modules/api/src/main/java/org/apache/ignite/table/criteria/CriteriaQuerySource.java b/modules/api/src/main/java/org/apache/ignite/table/criteria/CriteriaQuerySource.java new file mode 100644 index 0000000000..bfcf9fd3cc --- /dev/null +++ b/modules/api/src/main/java/org/apache/ignite/table/criteria/CriteriaQuerySource.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.table.criteria; + +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.lang.AsyncCursor; +import org.apache.ignite.lang.Cursor; +import org.apache.ignite.lang.IgniteException; +import org.apache.ignite.tx.Transaction; +import org.jetbrains.annotations.Nullable; + +/** + * Represents an entity that can be used to execute predicate-based criteria queries. + * + * @param <T> The type of elements returned by iterator. + */ +public interface CriteriaQuerySource<T> { + /** + * Executes predicate-based criteria query. + * + * @param tx Transaction to execute the query within or {@code null} to run within implicit transaction. + * @param criteria The predicate to filter entries or {@code null} to return all entries from the underlying table. + * @return Iterator with query results. + * @throws IgniteException If failed. + */ + default Cursor<T> query(@Nullable Transaction tx, @Nullable Criteria criteria) { + return query(tx, criteria, null); + } + + /** + * Executes a predicate-based criteria query. + * + * @param tx Transaction to execute the query within or {@code null} to run within implicit transaction. + * @param criteria The predicate to filter entries or {@code null} to return all entries from the underlying table. + * @param opts Criteria query options or {@code null} to use default. + * @return Iterator with query results. + * @throws IgniteException If failed. + */ + Cursor<T> query(@Nullable Transaction tx, @Nullable Criteria criteria, @Nullable CriteriaQueryOptions opts); + + /** + * Executes a predicate-based criteria query in an asynchronous way. + * + * @param tx Transaction to execute the query within or {@code null} to run within implicit transaction. + * @param criteria The predicate to filter entries or {@code null} to return all entries from the underlying table. + * @return Future that represents the pending completion of the operation. + * @throws IgniteException If failed. + */ + default CompletableFuture<AsyncCursor<T>> queryAsync(@Nullable Transaction tx, @Nullable Criteria criteria) { + return queryAsync(tx, criteria, null); + } + + /** + * Executes a predicate-based criteria query in an asynchronous way. + * + * @param tx Transaction to execute the query within or {@code null} to run within implicit transaction. + * @param criteria The predicate to filter entries or {@code null} to return all entries from the underlying table. + * @param opts Criteria query options or {@code null} to use default. + * @return Future that represents the pending completion of the operation. + * @throws IgniteException If failed. + */ + CompletableFuture<AsyncCursor<T>> queryAsync( + @Nullable Transaction tx, + @Nullable Criteria criteria, + @Nullable CriteriaQueryOptions opts + ); +} diff --git a/modules/api/src/main/java/org/apache/ignite/table/criteria/package-info.java b/modules/api/src/main/java/org/apache/ignite/table/criteria/package-info.java new file mode 100644 index 0000000000..bb857acdf1 --- /dev/null +++ b/modules/api/src/main/java/org/apache/ignite/table/criteria/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Contains API classes for criteria queries. + */ + +package org.apache.ignite.table.criteria; diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordBinaryView.java b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordBinaryView.java index 1a75c59a9f..1646367394 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordBinaryView.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordBinaryView.java @@ -29,10 +29,20 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Flow.Publisher; import org.apache.ignite.client.RetryLimitPolicy; import org.apache.ignite.internal.client.proto.ClientOp; +import org.apache.ignite.internal.client.sql.ClientSessionBuilder; +import org.apache.ignite.internal.client.sql.ClientStatementBuilder; import org.apache.ignite.internal.streamer.StreamerBatchSender; +import org.apache.ignite.internal.table.criteria.CursorAdapter; +import org.apache.ignite.internal.table.criteria.QueryCriteriaAsyncCursor; +import org.apache.ignite.lang.AsyncCursor; +import org.apache.ignite.lang.Cursor; +import org.apache.ignite.sql.Session; +import org.apache.ignite.sql.Statement; import org.apache.ignite.table.DataStreamerOptions; import org.apache.ignite.table.RecordView; import org.apache.ignite.table.Tuple; +import org.apache.ignite.table.criteria.Criteria; +import org.apache.ignite.table.criteria.CriteriaQueryOptions; import org.apache.ignite.tx.Transaction; import org.jetbrains.annotations.Nullable; @@ -379,4 +389,28 @@ public class ClientRecordBinaryView implements RecordView<Tuple> { return ClientDataStreamer.streamData(publisher, opts, batchSender, provider, tbl); } + + /** {@inheritDoc} */ + @Override + public Cursor<Tuple> query(@Nullable Transaction tx, @Nullable Criteria criteria, @Nullable CriteriaQueryOptions opts) { + return new CursorAdapter<>(sync(queryAsync(tx, criteria, opts))); + } + + /** {@inheritDoc} */ + @Override + public CompletableFuture<AsyncCursor<Tuple>> queryAsync( + @Nullable Transaction tx, + @Nullable Criteria criteria, + @Nullable CriteriaQueryOptions opts + ) { + //TODO: implement serialization of criteria to SQL https://issues.apache.org/jira/browse/IGNITE-20879 + var query = "SELECT * FROM " + tbl.name(); + var opts0 = opts == null ? CriteriaQueryOptions.DEFAULT : opts; + + Statement statement = new ClientStatementBuilder().query(query).pageSize(opts0.pageSize()).build(); + Session session = new ClientSessionBuilder(tbl.channel()).build(); + + return session.executeAsync(tx, statement) + .thenApply(resultSet -> new QueryCriteriaAsyncCursor<>(resultSet, session::close)); + } } diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordView.java b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordView.java index 41f89af706..5a2b7eee97 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordView.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordView.java @@ -30,9 +30,19 @@ import java.util.concurrent.Flow.Publisher; import org.apache.ignite.client.RetryLimitPolicy; import org.apache.ignite.internal.client.proto.ClientOp; import org.apache.ignite.internal.client.proto.TuplePart; +import org.apache.ignite.internal.client.sql.ClientSessionBuilder; +import org.apache.ignite.internal.client.sql.ClientStatementBuilder; import org.apache.ignite.internal.streamer.StreamerBatchSender; +import org.apache.ignite.internal.table.criteria.CursorAdapter; +import org.apache.ignite.internal.table.criteria.QueryCriteriaAsyncCursor; +import org.apache.ignite.lang.AsyncCursor; +import org.apache.ignite.lang.Cursor; +import org.apache.ignite.sql.Session; +import org.apache.ignite.sql.Statement; import org.apache.ignite.table.DataStreamerOptions; import org.apache.ignite.table.RecordView; +import org.apache.ignite.table.criteria.Criteria; +import org.apache.ignite.table.criteria.CriteriaQueryOptions; import org.apache.ignite.table.mapper.Mapper; import org.apache.ignite.tx.Transaction; import org.jetbrains.annotations.Nullable; @@ -375,4 +385,28 @@ public class ClientRecordView<R> implements RecordView<R> { return ClientDataStreamer.streamData(publisher, opts, batchSender, provider, tbl); } + + /** {@inheritDoc} */ + @Override + public Cursor<R> query(@Nullable Transaction tx, @Nullable Criteria criteria, @Nullable CriteriaQueryOptions opts) { + return new CursorAdapter<>(sync(queryAsync(tx, criteria, opts))); + } + + /** {@inheritDoc} */ + @Override + public CompletableFuture<AsyncCursor<R>> queryAsync( + @Nullable Transaction tx, + @Nullable Criteria criteria, + @Nullable CriteriaQueryOptions opts + ) { + //TODO: implement serialization of criteria to SQL https://issues.apache.org/jira/browse/IGNITE-20879 + var query = "SELECT * FROM " + tbl.name(); + var opts0 = opts == null ? CriteriaQueryOptions.DEFAULT : opts; + + Statement statement = new ClientStatementBuilder().query(query).pageSize(opts0.pageSize()).build(); + Session session = new ClientSessionBuilder(tbl.channel()).build(); + + return session.executeAsync(tx, ser.mapper(), statement) + .thenApply(resultSet -> new QueryCriteriaAsyncCursor<>(resultSet, session::close)); + } } diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteTables.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteTables.java index 27cc2368f2..87df575c62 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteTables.java +++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteTables.java @@ -213,7 +213,8 @@ public class FakeIgniteTables implements IgniteTablesInternal { public CompletableFuture<Integer> schemaVersionAtNow(int tableId) { return completedFuture(schemaReg.lastKnownSchemaVersion()); } - } + }, + new FakeIgniteSql() ); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/table/criteria/CursorAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/table/criteria/CursorAdapter.java new file mode 100644 index 0000000000..d1684943ba --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/table/criteria/CursorAdapter.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.criteria; + +import static org.apache.ignite.internal.lang.IgniteExceptionMapperUtil.convertToPublicFuture; +import static org.apache.ignite.internal.lang.IgniteExceptionMapperUtil.mapToPublicException; +import static org.apache.ignite.internal.util.ExceptionUtils.sneakyThrow; +import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause; + +import java.util.Iterator; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; +import org.apache.ignite.lang.AsyncCursor; +import org.apache.ignite.lang.Cursor; + +/** + * Synchronous wrapper over {@link AsyncCursor}. + */ +public class CursorAdapter<T> implements Cursor<T> { + /** Wrapped asynchronous cursor. */ + private final AsyncCursor<T> ac; + + /** Iterator. */ + private final Iterator<T> it; + + /** + * Constructor. + * + * @param ac Asynchronous cursor. + */ + public CursorAdapter(AsyncCursor<T> ac) { + assert ac != null; + + this.ac = ac; + this.it = new IteratorImpl<>(ac); + } + + /** {@inheritDoc} */ + @Override + public void close() { + try { + convertToPublicFuture(ac.closeAsync().toCompletableFuture()).join(); + } catch (Throwable e) { + throw sneakyThrow(mapToPublicException(unwrapCause(e))); + } + } + + /** {@inheritDoc} */ + @Override + public boolean hasNext() { + return it.hasNext(); + } + + /** {@inheritDoc} */ + @Override + public T next() { + return it.next(); + } + + private static class IteratorImpl<T> implements Iterator<T> { + private AsyncCursor<T> curRes; + + private CompletionStage<? extends AsyncCursor<T>> nextPageStage; + + private Iterator<T> curPage; + + IteratorImpl(AsyncCursor<T> ars) { + curRes = ars; + + advance(); + } + + @Override + public boolean hasNext() { + if (curPage.hasNext()) { + return true; + } else if (nextPageStage != null) { + try { + curRes = nextPageStage.toCompletableFuture().join(); + } catch (CompletionException ex) { + throw sneakyThrow(unwrapCause(ex)); + } + + advance(); + + return curPage.hasNext(); + } else { + return false; + } + } + + private void advance() { + curPage = curRes.currentPage().iterator(); + + if (curRes.hasMorePages()) { + nextPageStage = curRes.fetchNextPage(); + } else { + nextPageStage = null; + } + } + + @Override + public T next() { + return curPage.next(); + } + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/table/criteria/QueryCriteriaAsyncCursor.java b/modules/core/src/main/java/org/apache/ignite/internal/table/criteria/QueryCriteriaAsyncCursor.java new file mode 100644 index 0000000000..1ee79939e8 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/table/criteria/QueryCriteriaAsyncCursor.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.criteria; + +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.lang.AsyncCursor; +import org.apache.ignite.sql.async.AsyncResultSet; + +/** + * Wrapper over {@link AsyncResultSet} for criteria queries. + */ +public class QueryCriteriaAsyncCursor<T> implements AsyncCursor<T> { + private final AsyncResultSet<T> ars; + + private final Runnable closeRun; + + /** + * Constructor. + * + * @param ars Asynchronous result set. + * @param closeRun Callback to be invoked after result is closed. + */ + public QueryCriteriaAsyncCursor(AsyncResultSet<? extends T> ars, Runnable closeRun) { + this.ars = (AsyncResultSet<T>) ars; + this.closeRun = closeRun; + } + + /** {@inheritDoc} */ + @Override + public Iterable<T> currentPage() { + return ars.currentPage(); + } + + /** {@inheritDoc} */ + @Override + public int currentPageSize() { + return ars.currentPageSize(); + } + + /** {@inheritDoc} */ + @Override + public CompletableFuture<? extends AsyncResultSet<T>> fetchNextPage() { + return ars.fetchNextPage() + .whenComplete((v, t) -> { + if (t == null && !hasMorePages()) { + closeRun.run(); + } + }); + } + + /** {@inheritDoc} */ + @Override + public boolean hasMorePages() { + return ars.hasMorePages(); + } + + /** {@inheritDoc} */ + @Override + public CompletableFuture<Void> closeAsync() { + return ars.closeAsync().thenRun(closeRun); + } +} diff --git a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/TupleMatcher.java b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/TupleMatcher.java new file mode 100644 index 0000000000..9246d5ad5f --- /dev/null +++ b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/TupleMatcher.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.testframework.matchers; + +import org.apache.ignite.table.Tuple; +import org.hamcrest.FeatureMatcher; +import org.hamcrest.Matcher; +import org.jetbrains.annotations.Nullable; + +/** + * Matchers for {@link Tuple}. + */ +public final class TupleMatcher { + private TupleMatcher() { + } + + /** + * Creates a matcher for matching column value. + * + * @param columnName Column name in SQL-parser style notation; e.g., <br> + * "myColumn" - "MYCOLUMN", returns the index of the column ignores case sensitivity, <br> + * "\"MyColumn\"" - "MyColumn", returns the index of the column with respect to case sensitivity. + * @param valueMatcher Matcher for matching column value. + * @return Matcher for matching column value with given value. + */ + public static <T> Matcher<Tuple> tupleValue(String columnName, Matcher<T> valueMatcher) { + return new FeatureMatcher<>(valueMatcher, "A tuple with value", "value") { + @Override + protected @Nullable T featureValueOf(Tuple actual) { + return actual.value(columnName); + } + }; + } +} diff --git a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java index 043b49bb49..5c82a4e9fb 100644 --- a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java +++ b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java @@ -95,6 +95,7 @@ import org.apache.ignite.internal.tx.impl.HeapLockManager; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.vault.VaultManager; import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService; +import org.apache.ignite.sql.IgniteSql; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -357,7 +358,7 @@ public class IndexManagerTest extends BaseIgniteAbstractTest { when(internalTable.tableId()).thenReturn(tableId); when(internalTable.storage()).thenReturn(mvTableStorage); - return spy(new TableImpl(internalTable, new HeapLockManager(), new ConstantSchemaVersions(1))); + return spy(new TableImpl(internalTable, new HeapLockManager(), new ConstantSchemaVersions(1), mock(IgniteSql.class))); } private CompletableFuture<MvTableStorage> getMvTableStorageLatestRevision(int tableId) { diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java index 5fce588f45..51b482171a 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java @@ -121,6 +121,7 @@ import org.apache.ignite.internal.replicator.ReplicaService; import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.schema.SchemaManager; import org.apache.ignite.internal.schema.configuration.GcConfiguration; +import org.apache.ignite.internal.sql.api.IgniteSqlImpl; import org.apache.ignite.internal.sql.engine.SqlQueryProcessor; import org.apache.ignite.internal.storage.DataStorageManager; import org.apache.ignite.internal.storage.DataStorageModule; @@ -138,6 +139,7 @@ import org.apache.ignite.internal.testframework.TestIgnitionManager; import org.apache.ignite.internal.tx.HybridTimestampTracker; import org.apache.ignite.internal.tx.configuration.TransactionConfiguration; import org.apache.ignite.internal.tx.impl.HeapLockManager; +import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl; import org.apache.ignite.internal.tx.impl.TransactionIdGenerator; import org.apache.ignite.internal.tx.impl.TxManagerImpl; import org.apache.ignite.internal.tx.message.TxMessageGroup; @@ -404,6 +406,8 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { var schemaSyncService = new SchemaSyncServiceImpl(metaStorageMgr.clusterTime(), delayDurationMsSupplier); + var sqlRef = new AtomicReference<IgniteSqlImpl>(); + TableManager tableManager = new TableManager( name, registry, @@ -428,7 +432,8 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { schemaSyncService, catalogManager, new HybridTimestampTracker(), - placementDriverManager.placementDriver() + placementDriverManager.placementDriver(), + sqlRef::get ); var indexManager = new IndexManager(schemaManager, tableManager, catalogManager, metaStorageMgr, registry); @@ -452,6 +457,8 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { placementDriverManager.placementDriver() ); + sqlRef.set(new IgniteSqlImpl(name, qryEngine, new IgniteTransactionsImpl(txManager, new HybridTimestampTracker()))); + // Preparing the result map. components.add(vault); @@ -483,7 +490,8 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { distributionZoneManager, tableManager, indexManager, - qryEngine + qryEngine, + sqlRef.get() ); for (IgniteComponent component : otherComponents) { diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItAbstractThinClientTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItAbstractThinClientTest.java index 6ea27a3825..08c412743a 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItAbstractThinClientTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItAbstractThinClientTest.java @@ -67,7 +67,7 @@ public abstract class ItAbstractThinClientTest extends BaseIgniteAbstractTest { private IgniteClient client; /** - * Before each. + * Before all. */ @BeforeAll void beforeAll(TestInfo testInfo, @WorkDirectory Path workDir) throws InterruptedException { @@ -199,5 +199,13 @@ public abstract class ItAbstractThinClientTest extends BaseIgniteAbstractTest { public int key; public String val; + + public int getKey() { + return key; + } + + public String getVal() { + return val; + } } } diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index 882429c919..59c1f69d82 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -610,7 +610,8 @@ public class IgniteImpl implements Ignite { schemaSyncService, catalogManager, observableTimestampTracker, - placementDriverMgr.placementDriver() + placementDriverMgr.placementDriver(), + this::sql ); indexManager = new IndexManager(schemaManager, distributedTblMgr, catalogManager, metaStorageMgr, registry); diff --git a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerClassIntegrationTest.java b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerClassIntegrationTest.java index d49b884f58..959bb5dbe6 100644 --- a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerClassIntegrationTest.java +++ b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerClassIntegrationTest.java @@ -353,17 +353,38 @@ public abstract class ClusterPerClassIntegrationTest extends IgniteIntegrationTe * {@link #deletePeople(String, int...)} to remove people. */ protected static class Person { - final int id; + int id; - final String name; + String name; - final double salary; + double salary; + + public Person() { + //No-op. + } public Person(int id, String name, double salary) { this.id = id; this.name = name; this.salary = salary; } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Person person = (Person) o; + return id == person.id && Double.compare(salary, person.salary) == 0 && Objects.equals(name, person.name); + } + + @Override + public int hashCode() { + return Objects.hash(id, name, salary); + } } /** diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistrySelfTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistrySelfTest.java index 411dae61dd..75a9d42dd7 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistrySelfTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistrySelfTest.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.sql.engine.exec; import static java.util.Collections.emptyIterator; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.util.Map; @@ -44,6 +45,7 @@ import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; import org.apache.ignite.internal.testframework.IgniteTestUtils; import org.apache.ignite.internal.tx.impl.HeapLockManager; import org.apache.ignite.internal.type.NativeTypes; +import org.apache.ignite.sql.IgniteSql; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; @@ -136,7 +138,9 @@ public class ExecutableTableRegistrySelfTest extends BaseIgniteAbstractTest { int schemaVersion = 1; int tableVersion = 10; - TableImpl table = new TableImpl(internalTable, schemaRegistry, new HeapLockManager(), new ConstantSchemaVersions(tableVersion)); + TableImpl table = new TableImpl(internalTable, schemaRegistry, new HeapLockManager(), new ConstantSchemaVersions(tableVersion), + mock(IgniteSql.class)); + SchemaDescriptor schemaDescriptor = newDescriptor(schemaVersion); when(tableManager.tableAsync(tableId)).thenReturn(CompletableFuture.completedFuture(table)); diff --git a/modules/table/build.gradle b/modules/table/build.gradle index 55382658cd..8f4e72f14c 100644 --- a/modules/table/build.gradle +++ b/modules/table/build.gradle @@ -120,9 +120,11 @@ dependencies { integrationTestImplementation project(':ignite-runner') integrationTestImplementation project(':ignite-index') integrationTestImplementation project(':ignite-rest') + integrationTestImplementation project(':ignite-client') integrationTestImplementation project(':ignite-client-handler') integrationTestImplementation project(':ignite-page-memory') integrationTestImplementation project(':ignite-storage-page-memory') + integrationTestImplementation project(':ignite-sql-engine') integrationTestImplementation(testFixtures(project)) integrationTestImplementation(testFixtures(project(':ignite-api'))) integrationTestImplementation(testFixtures(project(':ignite-core'))) diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java index 5d3b051fa6..850d689df6 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java @@ -48,6 +48,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.times; @@ -186,6 +187,7 @@ import org.apache.ignite.network.NetworkAddress; import org.apache.ignite.network.StaticNodeFinder; import org.apache.ignite.raft.jraft.rpc.RpcRequests; import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener; +import org.apache.ignite.sql.IgniteSql; import org.apache.ignite.table.Table; import org.apache.ignite.utils.ClusterServiceTestUtils; import org.jetbrains.annotations.Nullable; @@ -1061,7 +1063,8 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest { schemaSyncService, catalogManager, new HybridTimestampTracker(), - placementDriver + placementDriver, + () -> mock(IgniteSql.class) ) { @Override protected TxStateTableStorage createTxStateTableStorage( diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java index fec00c73a3..c72763b031 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java @@ -104,6 +104,7 @@ import org.apache.ignite.network.ClusterNode; import org.apache.ignite.network.ClusterService; import org.apache.ignite.network.MessagingService; import org.apache.ignite.network.SingleClusterNodeResolver; +import org.apache.ignite.sql.IgniteSql; import org.apache.ignite.table.Tuple; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -433,7 +434,7 @@ public class ItColocationTest extends BaseIgniteAbstractTest { schemaRegistry = new DummySchemaManagerImpl(schema); - tbl = new TableImpl(intTable, schemaRegistry, new HeapLockManager(), new ConstantSchemaVersions(1)); + tbl = new TableImpl(intTable, schemaRegistry, new HeapLockManager(), new ConstantSchemaVersions(1), mock(IgniteSql.class)); marshaller = new TupleMarshallerImpl(schema); } diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItCriteriaQueryTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItCriteriaQueryTest.java new file mode 100644 index 0000000000..61ac09b90d --- /dev/null +++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItCriteriaQueryTest.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table; + +import static org.apache.ignite.internal.testframework.IgniteTestUtils.await; +import static org.apache.ignite.internal.testframework.matchers.TupleMatcher.tupleValue; +import static org.apache.ignite.table.criteria.CriteriaQueryOptions.builder; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import com.google.common.collect.Lists; +import java.util.stream.Stream; +import org.apache.ignite.Ignite; +import org.apache.ignite.client.IgniteClient; +import org.apache.ignite.internal.ClusterPerClassIntegrationTest; +import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.lang.AsyncCursor; +import org.apache.ignite.lang.Cursor; +import org.apache.ignite.table.RecordView; +import org.apache.ignite.table.Tuple; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +/** + * Tests for the criteria query API. + */ +public class ItCriteriaQueryTest extends ClusterPerClassIntegrationTest { + private static IgniteClient CLIENT; + + /** {@inheritDoc} */ + @Override + protected int initialNodes() { + return 1; + } + + /** {@inheritDoc} */ + @BeforeAll + @Override + protected void beforeAll(TestInfo testInfo) { + super.beforeAll(testInfo); + + CLIENT = IgniteClient.builder() + .addresses("127.0.0.1:" + CLUSTER.aliveNode().clientAddress().port()).build(); + + createTable(DEFAULT_TABLE_NAME, 1, 8); + + for (int i = 0; i < 3; i++) { + insertPeople(DEFAULT_TABLE_NAME, new Person(i, "name" + i, 10.0d * i)); + } + } + + @AfterAll + void stopClient() throws Exception { + IgniteUtils.closeAll(CLIENT); + } + + private static Stream<Arguments> testRecordBinaryView() { + return Stream.of( + Arguments.of(CLIENT), + Arguments.of(CLUSTER.aliveNode()) + ); + } + + @ParameterizedTest(autoCloseArguments = false) + @MethodSource + public void testRecordBinaryView(Ignite ignite) { + RecordView<Tuple> view = ignite.tables().table(DEFAULT_TABLE_NAME).recordView(); + + try (Cursor<Tuple> cur = view.query(null, null)) { + assertThat(Lists.newArrayList(cur), containsInAnyOrder( + allOf(tupleValue("id", is(0)), tupleValue("name", is("name0")), tupleValue("salary", is(0.0d))), + allOf(tupleValue("id", is(1)), tupleValue("name", is("name1")), tupleValue("salary", is(10.0d))), + allOf(tupleValue("id", is(2)), tupleValue("name", is("name2")), tupleValue("salary", is(20.0d))) + )); + } + } + + private static Stream<Arguments> testRecordPojoView() { + return Stream.of( + // TODO https://issues.apache.org/jira/browse/IGNITE-20977 + //Arguments.of(CLUSTER.aliveNode()), + Arguments.of(CLIENT) + ); + } + + @ParameterizedTest(autoCloseArguments = false) + @MethodSource + public void testRecordPojoView(Ignite ignite) { + RecordView<Person> view = ignite.tables().table(DEFAULT_TABLE_NAME).recordView(Person.class); + + try (Cursor<Person> cur = view.query(null, null)) { + assertThat(Lists.newArrayList(cur), containsInAnyOrder( + new Person(0, "name0", 0.0d), + new Person(1, "name1", 10.0d), + new Person(2, "name2", 20.0d) + )); + } + } + + @Test + public void testOptions() { + RecordView<Person> view = CLIENT.tables().table(DEFAULT_TABLE_NAME).recordView(Person.class); + + AsyncCursor<Person> ars = await(view.queryAsync(null, null, builder().pageSize(2).build())); + + assertNotNull(ars); + assertEquals(2, ars.currentPageSize()); + await(ars.closeAsync()); + } +} diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java b/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java index 0a3304053e..6de0d37fbd 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java @@ -31,6 +31,7 @@ import org.apache.ignite.internal.table.distributed.replicator.InternalSchemaVer import org.apache.ignite.internal.table.distributed.schema.SchemaVersions; import org.apache.ignite.internal.tx.InternalTransaction; import org.apache.ignite.internal.util.ExceptionUtils; +import org.apache.ignite.sql.IgniteSql; import org.apache.ignite.tx.Transaction; import org.jetbrains.annotations.Nullable; @@ -46,16 +47,21 @@ abstract class AbstractTableView { /** Table row view converter. */ protected final TableViewRowConverter rowConverter; + /** Ignite SQL facade. */ + protected final IgniteSql sql; + /** * Constructor. * * @param tbl Internal table. * @param schemaVersions Schema versions access. * @param schemaReg Schema registry. + * @param sql Ignite SQL facade. */ - AbstractTableView(InternalTable tbl, SchemaVersions schemaVersions, SchemaRegistry schemaReg) { + AbstractTableView(InternalTable tbl, SchemaVersions schemaVersions, SchemaRegistry schemaReg, IgniteSql sql) { this.tbl = tbl; this.schemaVersions = schemaVersions; + this.sql = sql; this.rowConverter = new TableViewRowConverter(schemaReg); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java index da4d606b44..0ad9e64c67 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java @@ -38,6 +38,7 @@ import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.lang.IgniteException; import org.apache.ignite.lang.MarshallerException; import org.apache.ignite.lang.NullableValue; +import org.apache.ignite.sql.IgniteSql; import org.apache.ignite.table.DataStreamerOptions; import org.apache.ignite.table.KeyValueView; import org.apache.ignite.table.Tuple; @@ -59,9 +60,10 @@ public class KeyValueBinaryViewImpl extends AbstractTableView implements KeyValu * @param tbl Table storage. * @param schemaReg Schema registry. * @param schemaVersions Schema versions access. + * @param sql Ignite SQL facade. */ - public KeyValueBinaryViewImpl(InternalTable tbl, SchemaRegistry schemaReg, SchemaVersions schemaVersions) { - super(tbl, schemaVersions, schemaReg); + public KeyValueBinaryViewImpl(InternalTable tbl, SchemaRegistry schemaReg, SchemaVersions schemaVersions, IgniteSql sql) { + super(tbl, schemaVersions, schemaReg, sql); marshallerCache = new TupleMarshallerCache(schemaReg); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java index 8f1179e251..51bdfe46b5 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java @@ -42,6 +42,7 @@ import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.lang.IgniteException; import org.apache.ignite.lang.NullableValue; import org.apache.ignite.lang.UnexpectedNullValueException; +import org.apache.ignite.sql.IgniteSql; import org.apache.ignite.table.DataStreamerOptions; import org.apache.ignite.table.KeyValueView; import org.apache.ignite.table.mapper.Mapper; @@ -64,6 +65,7 @@ public class KeyValueViewImpl<K, V> extends AbstractTableView implements KeyValu * @param tbl Table storage. * @param schemaRegistry Schema registry. * @param schemaVersions Schema versions access. + * @param sql Ignite SQL facade. * @param keyMapper Key class mapper. * @param valueMapper Value class mapper. */ @@ -71,10 +73,11 @@ public class KeyValueViewImpl<K, V> extends AbstractTableView implements KeyValu InternalTable tbl, SchemaRegistry schemaRegistry, SchemaVersions schemaVersions, + IgniteSql sql, Mapper<K> keyMapper, Mapper<V> valueMapper ) { - super(tbl, schemaVersions, schemaRegistry); + super(tbl, schemaVersions, schemaRegistry, sql); marshallerFactory = (schema) -> new KvMarshallerImpl<>(schema, keyMapper, valueMapper); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java index f1c943de94..247ee7d45d 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java @@ -31,13 +31,22 @@ import org.apache.ignite.internal.schema.marshaller.TupleMarshaller; import org.apache.ignite.internal.schema.marshaller.TupleMarshallerException; import org.apache.ignite.internal.schema.row.Row; import org.apache.ignite.internal.streamer.StreamerBatchSender; +import org.apache.ignite.internal.table.criteria.CursorAdapter; +import org.apache.ignite.internal.table.criteria.QueryCriteriaAsyncCursor; import org.apache.ignite.internal.table.distributed.schema.SchemaVersions; import org.apache.ignite.internal.tx.InternalTransaction; +import org.apache.ignite.lang.AsyncCursor; +import org.apache.ignite.lang.Cursor; import org.apache.ignite.lang.IgniteException; import org.apache.ignite.lang.MarshallerException; +import org.apache.ignite.sql.IgniteSql; +import org.apache.ignite.sql.Session; +import org.apache.ignite.sql.Statement; import org.apache.ignite.table.DataStreamerOptions; import org.apache.ignite.table.RecordView; import org.apache.ignite.table.Tuple; +import org.apache.ignite.table.criteria.Criteria; +import org.apache.ignite.table.criteria.CriteriaQueryOptions; import org.apache.ignite.tx.Transaction; import org.jetbrains.annotations.Nullable; @@ -53,9 +62,10 @@ public class RecordBinaryViewImpl extends AbstractTableView implements RecordVie * @param tbl The table. * @param schemaRegistry Table schema registry. * @param schemaVersions Schema versions access. + * @param sql Ignite SQL facade. */ - public RecordBinaryViewImpl(InternalTable tbl, SchemaRegistry schemaRegistry, SchemaVersions schemaVersions) { - super(tbl, schemaVersions, schemaRegistry); + public RecordBinaryViewImpl(InternalTable tbl, SchemaRegistry schemaRegistry, SchemaVersions schemaVersions, IgniteSql sql) { + super(tbl, schemaVersions, schemaRegistry, sql); marshallerCache = new TupleMarshallerCache(schemaRegistry); } @@ -434,4 +444,28 @@ public class RecordBinaryViewImpl extends AbstractTableView implements RecordVie return DataStreamer.streamData(publisher, options, batchSender, partitioner); } + + /** {@inheritDoc} */ + @Override + public Cursor<Tuple> query(@Nullable Transaction tx, @Nullable Criteria criteria, @Nullable CriteriaQueryOptions opts) { + return new CursorAdapter<>(sync(queryAsync(tx, criteria, opts))); + } + + /** {@inheritDoc} */ + @Override + public CompletableFuture<AsyncCursor<Tuple>> queryAsync( + @Nullable Transaction tx, + @Nullable Criteria criteria, + @Nullable CriteriaQueryOptions opts + ) { + //TODO: implement serialization of criteria to SQL https://issues.apache.org/jira/browse/IGNITE-20879 + var query = "SELECT * FROM " + tbl.name(); + var opts0 = opts == null ? CriteriaQueryOptions.DEFAULT : opts; + + Statement statement = sql.statementBuilder().query(query).pageSize(opts0.pageSize()).build(); + Session session = sql.createSession(); + + return session.executeAsync(tx, statement) + .thenApply(resultSet -> new QueryCriteriaAsyncCursor<>(resultSet, session::close)); + } } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java index 4e680c6b65..4dcba9fb50 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java @@ -33,11 +33,20 @@ import org.apache.ignite.internal.schema.marshaller.RecordMarshaller; import org.apache.ignite.internal.schema.marshaller.reflection.RecordMarshallerImpl; import org.apache.ignite.internal.schema.row.Row; import org.apache.ignite.internal.streamer.StreamerBatchSender; +import org.apache.ignite.internal.table.criteria.CursorAdapter; +import org.apache.ignite.internal.table.criteria.QueryCriteriaAsyncCursor; import org.apache.ignite.internal.table.distributed.schema.SchemaVersions; import org.apache.ignite.internal.tx.InternalTransaction; +import org.apache.ignite.lang.AsyncCursor; +import org.apache.ignite.lang.Cursor; import org.apache.ignite.lang.MarshallerException; +import org.apache.ignite.sql.IgniteSql; +import org.apache.ignite.sql.Session; +import org.apache.ignite.sql.Statement; import org.apache.ignite.table.DataStreamerOptions; import org.apache.ignite.table.RecordView; +import org.apache.ignite.table.criteria.Criteria; +import org.apache.ignite.table.criteria.CriteriaQueryOptions; import org.apache.ignite.table.mapper.Mapper; import org.apache.ignite.tx.Transaction; import org.jetbrains.annotations.Nullable; @@ -46,6 +55,9 @@ import org.jetbrains.annotations.Nullable; * Record view implementation. */ public class RecordViewImpl<R> extends AbstractTableView implements RecordView<R> { + /** Record class mapper. */ + private final Mapper<R> mapper; + /** Marshaller factory. */ private final Function<SchemaDescriptor, RecordMarshaller<R>> marshallerFactory; @@ -59,10 +71,18 @@ public class RecordViewImpl<R> extends AbstractTableView implements RecordView<R * @param schemaRegistry Schema registry. * @param schemaVersions Schema versions access. * @param mapper Record class mapper. + * @param sql Ignite SQL facade. */ - public RecordViewImpl(InternalTable tbl, SchemaRegistry schemaRegistry, SchemaVersions schemaVersions, Mapper<R> mapper) { - super(tbl, schemaVersions, schemaRegistry); - + public RecordViewImpl( + InternalTable tbl, + SchemaRegistry schemaRegistry, + SchemaVersions schemaVersions, + Mapper<R> mapper, + IgniteSql sql + ) { + super(tbl, schemaVersions, schemaRegistry, sql); + + this.mapper = mapper; marshallerFactory = (schema) -> new RecordMarshallerImpl<>(schema, mapper); } @@ -517,4 +537,28 @@ public class RecordViewImpl<R> extends AbstractTableView implements RecordView<R return DataStreamer.streamData(publisher, options, batchSender, partitioner); } + + /** {@inheritDoc} */ + @Override + public Cursor<R> query(@Nullable Transaction tx, @Nullable Criteria criteria, @Nullable CriteriaQueryOptions opts) { + return new CursorAdapter<>(sync(queryAsync(tx, criteria, opts))); + } + + /** {@inheritDoc} */ + @Override + public CompletableFuture<AsyncCursor<R>> queryAsync( + @Nullable Transaction tx, + @Nullable Criteria criteria, + @Nullable CriteriaQueryOptions opts + ) { + //TODO: implement serialization of criteria to SQL https://issues.apache.org/jira/browse/IGNITE-20879 + var query = "SELECT * FROM " + tbl.name(); + var opts0 = opts == null ? CriteriaQueryOptions.DEFAULT : opts; + + Statement statement = sql.statementBuilder().query(query).pageSize(opts0.pageSize()).build(); + Session session = sql.createSession(); + + return session.executeAsync(tx, mapper, statement) + .thenApply(resultSet -> new QueryCriteriaAsyncCursor<>(resultSet, session::close)); + } } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java index e816e2b57b..6a5c316a5a 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java @@ -48,6 +48,7 @@ import org.apache.ignite.internal.table.distributed.schema.SchemaVersions; import org.apache.ignite.internal.tx.LockManager; import org.apache.ignite.lang.ErrorGroups; import org.apache.ignite.network.ClusterNode; +import org.apache.ignite.sql.IgniteSql; import org.apache.ignite.table.KeyValueView; import org.apache.ignite.table.RecordView; import org.apache.ignite.table.Tuple; @@ -65,6 +66,9 @@ public class TableImpl implements TableViewInternal { private final SchemaVersions schemaVersions; + /** Ignite SQL facade. */ + private final IgniteSql sql; + /** Schema registry. Should be set either in constructor or via {@link #schemaView(SchemaRegistry)} before start of using the table. */ private volatile SchemaRegistry schemaReg; @@ -80,11 +84,13 @@ public class TableImpl implements TableViewInternal { * @param tbl The table. * @param lockManager Lock manager. * @param schemaVersions Schema versions access. + * @param sql Ignite SQL facade. */ - public TableImpl(InternalTable tbl, LockManager lockManager, SchemaVersions schemaVersions) { + public TableImpl(InternalTable tbl, LockManager lockManager, SchemaVersions schemaVersions, IgniteSql sql) { this.tbl = tbl; this.lockManager = lockManager; this.schemaVersions = schemaVersions; + this.sql = sql; } /** @@ -94,10 +100,11 @@ public class TableImpl implements TableViewInternal { * @param schemaReg Table schema registry. * @param lockManager Lock manager. * @param schemaVersions Schema versions access. + * @param sql Ignite SQL facade. */ @TestOnly - public TableImpl(InternalTable tbl, SchemaRegistry schemaReg, LockManager lockManager, SchemaVersions schemaVersions) { - this(tbl, lockManager, schemaVersions); + public TableImpl(InternalTable tbl, SchemaRegistry schemaReg, LockManager lockManager, SchemaVersions schemaVersions, IgniteSql sql) { + this(tbl, lockManager, schemaVersions, sql); this.schemaReg = schemaReg; } @@ -142,22 +149,22 @@ public class TableImpl implements TableViewInternal { @Override public <R> RecordView<R> recordView(Mapper<R> recMapper) { - return new RecordViewImpl<>(tbl, schemaReg, schemaVersions, recMapper); + return new RecordViewImpl<>(tbl, schemaReg, schemaVersions, recMapper, sql); } @Override public RecordView<Tuple> recordView() { - return new RecordBinaryViewImpl(tbl, schemaReg, schemaVersions); + return new RecordBinaryViewImpl(tbl, schemaReg, schemaVersions, sql); } @Override public <K, V> KeyValueView<K, V> keyValueView(Mapper<K> keyMapper, Mapper<V> valMapper) { - return new KeyValueViewImpl<>(tbl, schemaReg, schemaVersions, keyMapper, valMapper); + return new KeyValueViewImpl<>(tbl, schemaReg, schemaVersions, sql, keyMapper, valMapper); } @Override public KeyValueView<Tuple, Tuple> keyValueView() { - return new KeyValueBinaryViewImpl(tbl, schemaReg, schemaVersions); + return new KeyValueBinaryViewImpl(tbl, schemaReg, schemaVersions, sql); } @Override diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index 9bd9b4d6ef..5fc4899ecd 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -75,6 +75,7 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.IntSupplier; import java.util.function.LongFunction; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -185,6 +186,7 @@ import org.apache.ignite.network.ClusterNode; import org.apache.ignite.network.ClusterService; import org.apache.ignite.network.TopologyService; import org.apache.ignite.raft.jraft.storage.impl.VolatileRaftMetaStorage; +import org.apache.ignite.sql.IgniteSql; import org.apache.ignite.table.Table; import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.TestOnly; @@ -339,6 +341,9 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { /** Placement driver. */ private final PlacementDriver placementDriver; + /** A supplier function that returns {@link IgniteSql}. */ + private final Supplier<IgniteSql> sql; + private final SchemaVersions schemaVersions; private final PartitionReplicatorNodeRecovery partitionReplicatorNodeRecovery; @@ -367,6 +372,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { * @param raftGroupServiceFactory Factory that is used for creation of raft group services for replication groups. * @param vaultManager Vault manager. * @param placementDriver Placement driver. + * @param sql A supplier function that returns {@link IgniteSql}. */ public TableManager( String nodeName, @@ -392,7 +398,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { SchemaSyncService schemaSyncService, CatalogService catalogService, HybridTimestampTracker observableTimestampTracker, - PlacementDriver placementDriver + PlacementDriver placementDriver, + Supplier<IgniteSql> sql ) { this.clusterService = clusterService; this.raftMgr = raftMgr; @@ -414,6 +421,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { this.catalogService = catalogService; this.observableTimestampTracker = observableTimestampTracker; this.placementDriver = placementDriver; + this.sql = sql; TopologyService topologyService = clusterService.topologyService(); @@ -1116,7 +1124,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { partitions, clusterService.topologyService(), txManager, tableStorage, txStateStorage, replicaSvc, clock, observableTimestampTracker, placementDriver); - var table = new TableImpl(internalTable, lockMgr, schemaVersions); + var table = new TableImpl(internalTable, lockMgr, schemaVersions, sql.get()); // TODO: IGNITE-18595 We need to do something different to wait for indexes before full rebalancing table.addIndexesToWait(collectTableIndexIds(tableId, catalogVersion, onNodeRecovery)); diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/InteropOperationsTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/InteropOperationsTest.java index 7a67343096..4b59bf557f 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/InteropOperationsTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/InteropOperationsTest.java @@ -58,6 +58,7 @@ import org.apache.ignite.internal.type.NativeType; import org.apache.ignite.internal.type.NativeTypes; import org.apache.ignite.network.ClusterService; import org.apache.ignite.network.MessagingService; +import org.apache.ignite.sql.IgniteSql; import org.apache.ignite.table.KeyValueView; import org.apache.ignite.table.RecordView; import org.apache.ignite.table.Tuple; @@ -136,17 +137,10 @@ public class InteropOperationsTest extends BaseIgniteAbstractTest { SchemaVersions schemaVersions = new ConstantSchemaVersions(schemaVersion); - table = new TableImpl(intTable, schemaRegistry, new HeapLockManager(), schemaVersions); - kvBinView = new KeyValueBinaryViewImpl(intTable, schemaRegistry, schemaVersions); - - kvView = new KeyValueViewImpl<>( - intTable, - schemaRegistry, - schemaVersions, - Mapper.of(Long.class, "id"), - Mapper.of(Value.class) - ); + table = new TableImpl(intTable, schemaRegistry, new HeapLockManager(), schemaVersions, mock(IgniteSql.class)); + kvBinView = table.keyValueView(); + kvView = table.keyValueView(Mapper.of(Long.class, "id"), Mapper.of(Value.class)); rBinView = table.recordView(); rView = table.recordView(Mapper.of(Row.class)); } diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueBinaryViewOperationsTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueBinaryViewOperationsTest.java index 8de46d2ee5..7eaca14a5e 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueBinaryViewOperationsTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueBinaryViewOperationsTest.java @@ -29,6 +29,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -43,6 +44,7 @@ import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl; import org.apache.ignite.internal.table.distributed.replicator.InternalSchemaVersionMismatchException; import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl; import org.apache.ignite.internal.type.NativeTypes; +import org.apache.ignite.sql.IgniteSql; import org.apache.ignite.table.KeyValueView; import org.apache.ignite.table.Tuple; import org.junit.jupiter.api.Test; @@ -450,7 +452,9 @@ public class KeyValueBinaryViewOperationsTest extends TableKvOperationsTestBase SchemaDescriptor schema = schemaDescriptor(); InternalTable internalTable = spy(createInternalTable(schema)); - KeyValueView<Tuple, Tuple> view = new KeyValueBinaryViewImpl(internalTable, new DummySchemaManagerImpl(schema), schemaVersions); + KeyValueView<Tuple, Tuple> view = new KeyValueBinaryViewImpl( + internalTable, new DummySchemaManagerImpl(schema), schemaVersions, mock(IgniteSql.class) + ); BinaryRow resultRow = new TupleMarshallerImpl(schema).marshal(Tuple.create().set("ID", 1L).set("VAL", 2L)); diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueViewOperationsSimpleSchemaTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueViewOperationsSimpleSchemaTest.java index 2ed3faa793..2f1fe8f75a 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueViewOperationsSimpleSchemaTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueViewOperationsSimpleSchemaTest.java @@ -35,8 +35,6 @@ import java.util.stream.Collectors; import org.apache.ignite.internal.schema.Column; import org.apache.ignite.internal.schema.SchemaDescriptor; import org.apache.ignite.internal.schema.SchemaTestUtils; -import org.apache.ignite.internal.table.distributed.schema.ConstantSchemaVersions; -import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl; import org.apache.ignite.internal.type.NativeType; import org.apache.ignite.internal.type.NativeTypeSpec; import org.apache.ignite.internal.type.NativeTypes; @@ -498,8 +496,7 @@ public class KeyValueViewOperationsSimpleSchemaTest extends TableKvOperationsTes assertFalse(type.mismatch(NativeTypes.fromObject(val))); - KeyValueViewImpl<Long, Object> kvView = kvViewForValueType(type, - (Class<Object>) val.getClass(), true); + KeyValueView<Long, Object> kvView = kvViewForValueType(type, (Class<Object>) val.getClass(), true); kvView.put(null, key, val); @@ -651,7 +648,7 @@ public class KeyValueViewOperationsSimpleSchemaTest extends TableKvOperationsTes @SuppressWarnings("ConstantConditions") @Test public void nonNullableValueColumn() { - KeyValueViewImpl<Long, Long> tbl = kvViewForValueType(NativeTypes.INT64, Long.class, false); + KeyValueView<Long, Long> tbl = kvViewForValueType(NativeTypes.INT64, Long.class, false); assertThrows(NullPointerException.class, () -> tbl.getAndPut(null, 1L, null)); assertThrows(NullPointerException.class, () -> tbl.getAndReplace(null, 1L, null)); @@ -675,7 +672,7 @@ public class KeyValueViewOperationsSimpleSchemaTest extends TableKvOperationsTes * @param valueClass Value class. * @param nullable Nullability flag for the value type. */ - private <T> KeyValueViewImpl<Long, T> kvViewForValueType(NativeType type, Class<T> valueClass, boolean nullable) { + private <T> KeyValueView<Long, T> kvViewForValueType(NativeType type, Class<T> valueClass, boolean nullable) { Mapper<Long> keyMapper = Mapper.of(Long.class, "id"); Mapper<T> valMapper = Mapper.of(valueClass, "val"); @@ -687,12 +684,6 @@ public class KeyValueViewOperationsSimpleSchemaTest extends TableKvOperationsTes TableViewInternal table = createTable(schema); - return new KeyValueViewImpl<>( - table.internalTable(), - new DummySchemaManagerImpl(schema), - new ConstantSchemaVersions(1), - keyMapper, - valMapper - ); + return table.keyValueView(keyMapper, valMapper); } } diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueViewOperationsTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueViewOperationsTest.java index dcb30d93ec..5ebfb386b7 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueViewOperationsTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueViewOperationsTest.java @@ -71,6 +71,7 @@ import org.apache.ignite.internal.type.NativeTypes; import org.apache.ignite.lang.MarshallerException; import org.apache.ignite.network.ClusterService; import org.apache.ignite.network.MessagingService; +import org.apache.ignite.sql.IgniteSql; import org.apache.ignite.table.KeyValueView; import org.apache.ignite.table.mapper.Mapper; import org.junit.jupiter.api.BeforeEach; @@ -721,6 +722,7 @@ public class KeyValueViewOperationsTest extends TableKvOperationsTestBase { internalTable, new DummySchemaManagerImpl(schema), schemaVersions, + mock(IgniteSql.class), keyMapper, valMapper ); diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/RecordBinaryViewOperationsTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/RecordBinaryViewOperationsTest.java index 12d23ae078..295980048e 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/RecordBinaryViewOperationsTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/RecordBinaryViewOperationsTest.java @@ -33,6 +33,7 @@ import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -50,6 +51,7 @@ import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl; import org.apache.ignite.internal.table.impl.TestTupleBuilder; import org.apache.ignite.internal.type.NativeTypes; import org.apache.ignite.lang.IgniteException; +import org.apache.ignite.sql.IgniteSql; import org.apache.ignite.table.RecordView; import org.apache.ignite.table.Tuple; import org.junit.jupiter.api.Assertions; @@ -582,7 +584,9 @@ public class RecordBinaryViewOperationsTest extends TableKvOperationsTestBase { SchemaDescriptor schema = schemaDescriptor(); InternalTable internalTable = spy(createInternalTable(schema)); - RecordView<Tuple> view = new RecordBinaryViewImpl(internalTable, new DummySchemaManagerImpl(schema), schemaVersions); + RecordView<Tuple> view = new RecordBinaryViewImpl( + internalTable, new DummySchemaManagerImpl(schema), schemaVersions, mock(IgniteSql.class) + ); BinaryRow resultRow = new TupleMarshallerImpl(schema).marshal(Tuple.create().set("id", 1L).set("val", 2L)); diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/RecordViewOperationsTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/RecordViewOperationsTest.java index dd0f303ccb..a7c62b3bdf 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/RecordViewOperationsTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/RecordViewOperationsTest.java @@ -69,6 +69,7 @@ import org.apache.ignite.internal.type.NativeTypeSpec; import org.apache.ignite.internal.type.NativeTypes; import org.apache.ignite.network.ClusterService; import org.apache.ignite.network.MessagingService; +import org.apache.ignite.sql.IgniteSql; import org.apache.ignite.table.RecordView; import org.apache.ignite.table.mapper.Mapper; import org.junit.jupiter.api.BeforeEach; @@ -389,7 +390,8 @@ public class RecordViewOperationsTest extends TableKvOperationsTestBase { internalTable, new DummySchemaManagerImpl(schema), schemaVersions, - recMapper + recMapper, + mock(IgniteSql.class) ); } diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/TableKvOperationsTestBase.java b/modules/table/src/test/java/org/apache/ignite/internal/table/TableKvOperationsTestBase.java index 1a343dbd66..f784c3677a 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/TableKvOperationsTestBase.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/TableKvOperationsTestBase.java @@ -34,6 +34,7 @@ import org.apache.ignite.internal.tx.configuration.TransactionConfiguration; import org.apache.ignite.internal.tx.impl.HeapLockManager; import org.apache.ignite.network.ClusterService; import org.apache.ignite.network.MessagingService; +import org.apache.ignite.sql.IgniteSql; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @@ -66,7 +67,8 @@ abstract class TableKvOperationsTestBase extends BaseIgniteAbstractTest { internalTable, new DummySchemaManagerImpl(schema), new HeapLockManager(), - schemaVersions + schemaVersions, + mock(IgniteSql.class) ); } diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java index 3e572d2d15..629e1451c0 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java @@ -123,6 +123,7 @@ import org.apache.ignite.network.ClusterService; import org.apache.ignite.network.MessagingService; import org.apache.ignite.network.NetworkAddress; import org.apache.ignite.network.TopologyService; +import org.apache.ignite.sql.IgniteSql; import org.apache.ignite.table.Table; import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.AfterEach; @@ -733,7 +734,8 @@ public class TableManagerTest extends IgniteAbstractTest { new AlwaysSyncedSchemaSyncService(), catalogManager, new HybridTimestampTracker(), - new TestPlacementDriver(node) + new TestPlacementDriver(node), + () -> mock(IgniteSql.class) ) { @Override diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java index 9c99a918c6..baf6dca9f8 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java @@ -141,6 +141,7 @@ import org.apache.ignite.network.NodeFinder; import org.apache.ignite.network.StaticNodeFinder; import org.apache.ignite.raft.jraft.RaftMessagesFactory; import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener; +import org.apache.ignite.sql.IgniteSql; import org.apache.ignite.tx.IgniteTransactions; import org.apache.ignite.utils.ClusterServiceTestUtils; import org.jetbrains.annotations.Nullable; @@ -631,7 +632,7 @@ public class ItTxTestCluster { } } - CompletableFuture.allOf(partitionReadyFutures.toArray(new CompletableFuture[0])).join(); + allOf(partitionReadyFutures.toArray(new CompletableFuture[0])).join(); raftClients.computeIfAbsent(tableName, t -> new ArrayList<>()).addAll(clients.values()); @@ -652,7 +653,8 @@ public class ItTxTestCluster { ), new DummySchemaManagerImpl(schemaDescriptor), clientTxManager.lockManager(), - new ConstantSchemaVersions(SCHEMA_VERSION) + new ConstantSchemaVersions(SCHEMA_VERSION), + mock(IgniteSql.class) ); }