This is an automated email from the ASF dual-hosted git repository.
ppa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new ef1490faf2b IGNITE-28102 Jdbc. Fix client observable timestamp skew in
multistatement query cases (#7949)
ef1490faf2b is described below
commit ef1490faf2baf69a163818b6711536ae11f21bd7
Author: Ilya Korol <[email protected]>
AuthorDate: Wed Apr 8 17:49:49 2026 +0300
IGNITE-28102 Jdbc. Fix client observable timestamp skew in multistatement
query cases (#7949)
---
.../handler/ClientInboundMessageHandler.java | 2 +-
.../handler/requests/sql/ClientSqlCommon.java | 33 ++-
.../sql/ClientSqlCursorNextResultRequest.java | 50 ++--
.../requests/sql/ClientSqlExecuteRequest.java | 7 +-
.../ignite/jdbc/ItJdbcMultiStatementSelfTest.java | 47 +++-
.../ignite/jdbc/util/RowColumnProjection.java | 45 ++++
.../ignite/jdbc/util/RowsProjectionMatcher.java | 274 +++++++++++++++++++++
.../ignite/jdbc/util/StatementResultCheck.java | 80 ++++++
8 files changed, 496 insertions(+), 42 deletions(-)
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index 7cea0def276..f7581fb228b 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -1119,7 +1119,7 @@ public class ClientInboundMessageHandler
);
case ClientOp.SQL_CURSOR_NEXT_RESULT_SET:
- return ClientSqlCursorNextResultRequest.process(in, resources,
partitionOperationsExecutor, metrics);
+ return
ClientSqlCursorNextResultRequest.process(partitionOperationsExecutor, in,
resources, metrics, tsTracker);
case ClientOp.OPERATION_CANCEL:
return ClientOperationCancelRequest.process(in, cancelHandles);
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommon.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommon.java
index aa8acf537b5..72adcb2e8c2 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommon.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommon.java
@@ -36,7 +36,9 @@ import
org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
import org.apache.ignite.internal.binarytuple.BinaryTupleContainer;
import org.apache.ignite.internal.binarytuple.BinaryTupleParser;
import org.apache.ignite.internal.client.proto.ClientMessagePacker;
+import org.apache.ignite.internal.client.proto.ClientOp;
import org.apache.ignite.internal.client.sql.QueryModifier;
+import org.apache.ignite.internal.hlc.HybridTimestampTracker;
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.sql.SqlCommon;
@@ -268,6 +270,7 @@ class ClientSqlCommon {
ClientResourceRegistry resources,
AsyncResultSetImpl asyncResultSet,
ClientHandlerMetricSource metrics,
+ HybridTimestampTracker parentTsTracker,
int pageSize,
boolean includePartitionAwarenessMeta,
boolean sqlDirectTxMappingSupported,
@@ -277,7 +280,7 @@ class ClientSqlCommon {
) {
try {
Long nextResultResourceId = sqlMultiStatementSupported &&
asyncResultSet.cursor().hasNextResult()
- ?
saveNextResultResource(asyncResultSet.cursor().nextResult(), pageSize,
resources, executor)
+ ?
saveNextResultResource(asyncResultSet.cursor().nextResult(), pageSize,
resources, parentTsTracker, executor)
: null;
if ((asyncResultSet.hasRowSet() && asyncResultSet.hasMorePages()))
{
@@ -317,10 +320,11 @@ class ClientSqlCommon {
CompletableFuture<AsyncSqlCursor<InternalSqlRow>> nextResultFuture,
int pageSize,
ClientResourceRegistry resources,
+ HybridTimestampTracker parentTsTracker,
Executor executor
) throws IgniteInternalCheckedException {
ClientResource resource = new ClientResource(
- new CursorWithPageSize(nextResultFuture, pageSize),
+ new NextCursorContext(parentTsTracker, pageSize,
nextResultFuture),
() -> nextResultFuture.thenAccept(cur ->
iterateThroughResultsAndCloseThem(cur, executor))
);
@@ -414,22 +418,33 @@ class ClientSqlCommon {
}
}
- /** Holder of the cursor future and page size. */
- static class CursorWithPageSize {
+ /** Holder of the context for future result set retrieval. */
+ static class NextCursorContext {
private final CompletableFuture<AsyncSqlCursor<InternalSqlRow>>
cursorFuture;
private final int pageSize;
-
- CursorWithPageSize(CompletableFuture<AsyncSqlCursor<InternalSqlRow>>
cursorFuture, int pageSize) {
- this.cursorFuture = cursorFuture;
+ private final HybridTimestampTracker parentTsTracker;
+
+ NextCursorContext(
+ HybridTimestampTracker parentTsTracker,
+ int pageSize,
+ CompletableFuture<AsyncSqlCursor<InternalSqlRow>> cursorFuture
+ ) {
+ this.parentTsTracker = parentTsTracker;
this.pageSize = pageSize;
+ this.cursorFuture = cursorFuture;
}
- CompletableFuture<AsyncSqlCursor<InternalSqlRow>> cursorFuture() {
- return cursorFuture;
+ /** Tracker of the request that initiated query processing (i.e.
{@link ClientOp#SQL_EXEC}). */
+ HybridTimestampTracker parentTsTracker() {
+ return parentTsTracker;
}
int pageSize() {
return pageSize;
}
+
+ CompletableFuture<AsyncSqlCursor<InternalSqlRow>> cursorFuture() {
+ return cursorFuture;
+ }
}
}
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorNextResultRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorNextResultRequest.java
index 615ad7678cc..d80fc38c829 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorNextResultRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorNextResultRequest.java
@@ -23,8 +23,9 @@ import
org.apache.ignite.client.handler.ClientHandlerMetricSource;
import org.apache.ignite.client.handler.ClientResource;
import org.apache.ignite.client.handler.ClientResourceRegistry;
import org.apache.ignite.client.handler.ResponseWriter;
-import
org.apache.ignite.client.handler.requests.sql.ClientSqlCommon.CursorWithPageSize;
+import
org.apache.ignite.client.handler.requests.sql.ClientSqlCommon.NextCursorContext;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
+import org.apache.ignite.internal.hlc.HybridTimestampTracker;
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
import org.apache.ignite.internal.sql.api.AsyncResultSetImpl;
import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
@@ -37,43 +38,54 @@ public class ClientSqlCursorNextResultRequest {
/**
* Processes the request.
*
+ * @param operationExecutor Operation executor.
* @param in Unpacker.
+ * @param resources Client resource registry.
+ * @param metrics Client metrics.
+ * @param requestTsTracker TS tracker attached to current request
processing.
* @return Future representing result of operation.
*/
public static CompletableFuture<ResponseWriter> process(
+ Executor operationExecutor,
ClientMessageUnpacker in,
ClientResourceRegistry resources,
- Executor operationExecutor,
- ClientHandlerMetricSource metrics
+ ClientHandlerMetricSource metrics,
+ HybridTimestampTracker requestTsTracker
) throws IgniteInternalCheckedException {
long resourceId = in.unpackLong();
ClientResource resource = resources.remove(resourceId);
- CursorWithPageSize cursorWithPageSize =
resource.get(CursorWithPageSize.class);
- int pageSize = cursorWithPageSize.pageSize();
+ NextCursorContext nextCursorContext =
resource.get(NextCursorContext.class);
+ HybridTimestampTracker parentTsTracker =
nextCursorContext.parentTsTracker();
+ int pageSize = nextCursorContext.pageSize();
- CompletableFuture<ResponseWriter> f = cursorWithPageSize.cursorFuture()
+ CompletableFuture<ResponseWriter> f = nextCursorContext.cursorFuture()
.thenComposeAsync(cur -> cur.requestNextAsync(pageSize)
.thenApply(batchRes -> new AsyncResultSetImpl<SqlRow>(
cur,
batchRes,
pageSize
)
- ).thenCompose(asyncResultSet ->
- ClientSqlCommon.writeResultSetAsync(
- resources,
- asyncResultSet,
- metrics,
- pageSize,
- false,
- false,
- true,
- false,
- operationExecutor)
- ).thenApply(rsWriter -> rsWriter), operationExecutor);
+ ).thenCompose(asyncResultSet -> {
+ // For multi-statement DML operations, this will
help us keep the client's timestamp tracker up to date and
+ // ensure client reads are consistent with the
latest updates.
+ requestTsTracker.update(parentTsTracker.get());
+
+ return ClientSqlCommon.writeResultSetAsync(
+ resources,
+ asyncResultSet,
+ metrics,
+ parentTsTracker,
+ pageSize,
+ false,
+ false,
+ true,
+ false,
+ operationExecutor);
+ }).thenApply(rsWriter -> rsWriter), operationExecutor);
f.whenCompleteAsync((r, t) -> {
if (t != null) {
- cursorWithPageSize.cursorFuture().thenAccept(cur ->
closeRemainingCursors(cur, false, operationExecutor));
+ nextCursorContext.cursorFuture().thenAccept(cur ->
closeRemainingCursors(cur, false, operationExecutor));
}
}, operationExecutor);
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
index 03478d8bfb0..ec39a525a68 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
@@ -98,7 +98,7 @@ public class ClientSqlExecuteRequest {
ClockService clockService,
NotificationSender notificationSender,
@Nullable String username,
- boolean sqlMultistatementsSupported,
+ boolean sqlMultistatementSupported,
boolean sqlPartitionAwarenessQualifiedNameSupported,
Consumer<SqlQueryType> queryTypeListener
) {
@@ -122,7 +122,7 @@ public class ClientSqlExecuteRequest {
resIdHolder
);
- ClientSqlProperties props = new ClientSqlProperties(in,
sqlMultistatementsSupported);
+ ClientSqlProperties props = new ClientSqlProperties(in,
sqlMultistatementSupported);
String statement = in.unpackString();
Object[] arguments = readArgsNotNull(in);
@@ -147,10 +147,11 @@ public class ClientSqlExecuteRequest {
resources,
asyncResultSet,
metrics,
+ timestampTracker,
props.pageSize(),
includePartitionAwarenessMeta,
sqlDirectTxMappingSupported,
- sqlMultistatementsSupported,
+ sqlMultistatementSupported,
sqlPartitionAwarenessQualifiedNameSupported,
operationExecutor))
.thenApply(rsWriter -> out -> {
diff --git
a/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcMultiStatementSelfTest.java
b/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcMultiStatementSelfTest.java
index 6cb97f70825..f3c0cb7431b 100644
---
a/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcMultiStatementSelfTest.java
+++
b/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcMultiStatementSelfTest.java
@@ -19,6 +19,11 @@ package org.apache.ignite.jdbc;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static
org.apache.ignite.jdbc.util.JdbcTestUtils.assertThrowsSqlException;
+import static
org.apache.ignite.jdbc.util.RowsProjectionMatcher.hasValuesInAnyOrder;
+import static org.apache.ignite.jdbc.util.RowsProjectionMatcher.hasValuesOrder;
+import static org.apache.ignite.jdbc.util.StatementResultCheck.isResultSet;
+import static org.apache.ignite.jdbc.util.StatementResultCheck.isUpdateCounter;
+import static org.apache.ignite.jdbc.util.StatementResultCheck.noMoreResults;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
@@ -35,9 +40,11 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.jdbc.JdbcStatement;
import org.apache.ignite.internal.sql.SqlCommon;
+import org.apache.ignite.jdbc.util.StatementResultCheck;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -281,18 +288,26 @@ public class ItJdbcMultiStatementSelfTest extends
AbstractJdbcSelfTest {
@Test
public void testMixedDmlQueryExecute() throws Exception {
- boolean res = stmt.execute("INSERT INTO TEST_TX VALUES (6, 5, '5');
DELETE FROM TEST_TX WHERE ID=6; SELECT 1;");
- assertFalse(res);
- assertEquals(1, getResultSetSize());
+ assertStatementResults("INSERT INTO TEST_TX VALUES (6, 5, '5'); DELETE
FROM TEST_TX WHERE ID=6; SELECT 1;",
+ isUpdateCounter(1),
+ isUpdateCounter(1),
+ isResultSet(rs -> rs.getInt(1), hasValuesOrder(1)),
+ noMoreResults()
+ );
- res = stmt.execute("SELECT 1; INSERT INTO TEST_TX VALUES (7, 5, '5');
DELETE FROM TEST_TX WHERE ID=6;");
- assertEquals(true, res);
- assertEquals(1, getResultSetSize());
+ assertStatementResults("SELECT 1; INSERT INTO TEST_TX VALUES (7, 5,
'5'); DELETE FROM TEST_TX WHERE ID=6;",
+ isResultSet(rs -> rs.getInt(1), hasValuesOrder(1)),
+ isUpdateCounter(1),
+ isUpdateCounter(0),
+ noMoreResults()
+ );
- // empty results set in the middle
- res = stmt.execute("SELECT * FROM TEST_TX; INSERT INTO TEST_TX VALUES
(6, 6, '6'); SELECT * FROM TEST_TX;");
- assertEquals(true, res);
- assertEquals(11, getResultSetSize());
+ assertStatementResults("SELECT * FROM TEST_TX; INSERT INTO TEST_TX
VALUES (6, 6, '6'); SELECT * FROM TEST_TX;",
+ isResultSet(rs -> rs.getInt(1), hasValuesInAnyOrder(1, 2, 3,
4, 7)),
+ isUpdateCounter(1),
+ isResultSet(rs -> rs.getInt(1), hasValuesInAnyOrder(1, 2, 3,
4, 6, 7)),
+ noMoreResults()
+ );
}
@Test
@@ -780,6 +795,18 @@ public class ItJdbcMultiStatementSelfTest extends
AbstractJdbcSelfTest {
return size;
}
+ /** Verifies that after query execution statement returns results that
satisfy provided assertions. */
+ private void assertStatementResults(String query, StatementResultCheck...
resultCheck) throws SQLException {
+ List<StatementResultCheck> checks = List.of(resultCheck);
+
+ stmt.execute(query);
+
+ for (StatementResultCheck check : checks) {
+ check.check(stmt);
+ stmt.getMoreResults();
+ }
+ }
+
private boolean checkNoMoreResults() throws SQLException {
boolean more = stmt.getMoreResults();
int updCnt = stmt.getUpdateCount();
diff --git
a/modules/jdbc/src/testFixtures/java/org/apache/ignite/jdbc/util/RowColumnProjection.java
b/modules/jdbc/src/testFixtures/java/org/apache/ignite/jdbc/util/RowColumnProjection.java
new file mode 100644
index 00000000000..f621f6d5f46
--- /dev/null
+++
b/modules/jdbc/src/testFixtures/java/org/apache/ignite/jdbc/util/RowColumnProjection.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.jdbc.util;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Functional interface for extracting a value from the current row of a
{@link ResultSet}.
+ *
+ * @param <T> Type of the extracted value.
+ */
+@FunctionalInterface
+public interface RowColumnProjection<T> {
+ /** Extracts a value from the current row of {@code rs}. */
+ T extract(ResultSet rs) throws SQLException;
+
+ /** Drains result set to list by projecting each record with provided
extractor. */
+ static <T> List<T> projectRowsColumn(ResultSet rs, RowColumnProjection<T>
extractor) throws SQLException {
+ List<T> result = new ArrayList<>();
+
+ while (rs.next()) {
+ result.add(extractor.extract(rs));
+ }
+
+ return result;
+ }
+}
diff --git
a/modules/jdbc/src/testFixtures/java/org/apache/ignite/jdbc/util/RowsProjectionMatcher.java
b/modules/jdbc/src/testFixtures/java/org/apache/ignite/jdbc/util/RowsProjectionMatcher.java
new file mode 100644
index 00000000000..e63107f6b4e
--- /dev/null
+++
b/modules/jdbc/src/testFixtures/java/org/apache/ignite/jdbc/util/RowsProjectionMatcher.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.jdbc.util;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import org.hamcrest.Description;
+import org.hamcrest.SelfDescribing;
+import org.hamcrest.TypeSafeMatcher;
+import org.jetbrains.annotations.Nullable;
+
+/** Matcher for result set rows column projection. */
+public abstract class RowsProjectionMatcher<T> extends
TypeSafeMatcher<List<T>> {
+
+ private @Nullable MismatchDescriber mismatch;
+
+ @Override
+ protected final boolean matchesSafely(List<T> projection) {
+ try {
+ doMatch(projection);
+ return true;
+
+ } catch (MismatchException e) {
+ mismatch = e.describer();
+ return false;
+ }
+ }
+
+ /**
+ * Performs the actual matching.
+ *
+ * @throws MismatchException on first mismatch.
+ */
+ abstract void doMatch(List<T> projection) throws MismatchException;
+
+ @Override
+ protected void describeMismatchSafely(List<T> projection, Description
description) {
+ assert mismatch != null;
+ description.appendText("Actual projection " + projection + " ");
+ mismatch.describeTo(description);
+ }
+
+ /** Matcher which ensures that the checked projection contains exactly the
provided values, in any order. */
+ @SafeVarargs
+ public static <T> RowsProjectionMatcher<T> hasValuesInAnyOrder(T...
expectedVals) {
+ return hasValues("Rows projection with following values in any order
", (projection, expectedProjection) -> {
+
+ Map<T, Integer> remaining = new HashMap<>();
+ for (T value : expectedProjection) {
+ remaining.merge(value, 1, Integer::sum);
+ }
+
+ int expectedSize = expectedProjection.size();
+ int actualSize = projection.size();
+
+ for (int index = 0; index < actualSize; index++) {
+
+ if (index >= expectedSize) {
+ throw MismatchException.tooManyRecords(actualSize,
expectedSize,
+ projection.subList(index, actualSize)
+ );
+ }
+
+ T actual = projection.get(index);
+ Integer count = remaining.get(actual);
+
+ if (count == null) {
+ throw MismatchException.unexpectedValue(actual, index,
+ remaining.entrySet().stream()
+ .flatMap(e ->
Collections.nCopies(e.getValue(), e.getKey()).stream())
+ .collect(Collectors.toList()));
+ }
+
+ if (count == 1) {
+ remaining.remove(actual);
+ } else {
+ remaining.put(actual, count - 1);
+ }
+ }
+
+ if (!remaining.isEmpty()) {
+ throw MismatchException.notEnoughRecords(actualSize,
expectedSize,
+ remaining.entrySet().stream()
+ .flatMap(e ->
Collections.nCopies(e.getValue(), e.getKey()).stream())
+ .collect(Collectors.toList()));
+ }
+
+ }, expectedVals);
+ }
+
+ /** Matcher which ensures that the checked projection contains exactly the
provided values, in the same order. */
+ @SafeVarargs
+ public static <T> RowsProjectionMatcher<T> hasValuesOrder(T...
expectedVals) {
+ return hasValues("Rows projection with following values order ",
(projection, expectedProjection) -> {
+
+ int expectedSize = expectedProjection.size();
+ int actualSize = projection.size();
+
+ int index;
+
+ for (index = 0; index < actualSize; index++) {
+
+ if (index >= expectedSize) {
+ throw MismatchException.tooManyRecords(actualSize,
expectedSize,
+ projection.subList(index, actualSize)
+ );
+ }
+
+ T actual = projection.get(index);
+ T expected = expectedProjection.get(index);
+
+ if (!Objects.equals(actual, expected)) {
+ throw MismatchException.unexpectedValue(actual, index,
expected);
+ }
+ }
+
+ if (index < expectedSize) {
+ throw MismatchException.notEnoughRecords(actualSize,
expectedSize,
+ expectedProjection.subList(index, expectedSize));
+ }
+
+ }, expectedVals);
+ }
+
+ @SafeVarargs
+ private static <T> RowsProjectionMatcher<T> hasValues(
+ String baseDescription,
+ ProjectionCheck<T> projectionCheck,
+ T... expectedVals
+ ) {
+ List<T> expectedValues = Arrays.asList(expectedVals);
+
+ return new RowsProjectionMatcher<>() {
+
+ @Override
+ protected void doMatch(List<T> projection) throws
MismatchException {
+ int expectedSize = expectedValues.size();
+ int actualSize = projection.size();
+
+ if (expectedSize == 0 && actualSize > 0) {
+ throw MismatchException.shouldHaveBeenEmpty();
+ }
+
+ projectionCheck.check(projection, expectedValues);
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ description.appendText(expectedValues.isEmpty()
+ ? "Empty projection"
+ : (baseDescription + expectedValues));
+ }
+ };
+ }
+
+
+ /** Encapsulates a mismatch description for {@link RowsProjectionMatcher}.
*/
+ @FunctionalInterface
+ private interface MismatchDescriber extends SelfDescribing {
+ String INDENTATION = " ";
+ }
+
+ /** Utility interface that encapsulates part of projection check. */
+ @FunctionalInterface
+ private interface ProjectionCheck<T> {
+ void check(List<T> projection, List<T> expectedProjection) throws
MismatchException;
+ }
+
+ @SuppressWarnings("CheckedExceptionClass")
+ private static class MismatchException extends Exception {
+
+ private static final long serialVersionUID = 351697710366897072L;
+
+ private final MismatchDescriber describer;
+
+ private MismatchException(MismatchDescriber describer) {
+ this.describer = describer;
+ }
+
+ MismatchDescriber describer() {
+ return describer;
+ }
+
+ /**
+ * An unexpected value was encountered while matching in any order.
+ *
+ * @param actual The value that was actually extracted.
+ * @param rowIndex Zero-based row index at which the mismatch occurred.
+ * @param unmatched Remaining expected values that have not been
matched yet.
+ */
+ static <T> MismatchException unexpectedValue(T actual, int rowIndex,
List<T> unmatched) {
+ return new MismatchException(description -> description
+ .appendText("has unexpected value ")
+ .appendValue(actual)
+ .appendText(" at row #" + rowIndex + ";\n")
+ .appendText(MismatchDescriber.INDENTATION + "Expected
values not matched yet: " + unmatched)
+ );
+ }
+
+ /**
+ * A value at a specific position did not match the expected value
(strict-order check).
+ *
+ * @param actual The value that was actually extracted.
+ * @param rowIndex Zero-based row index at which the mismatch occurred.
+ * @param expected The value that was expected at this position.
+ */
+ static <T> MismatchException unexpectedValue(T actual, int rowIndex, T
expected) {
+ return new MismatchException(description -> description
+ .appendText("at row #" + rowIndex + " has ")
+ .appendValue(actual)
+ .appendText(" while ")
+ .appendValue(expected)
+ .appendText(" was expected")
+ );
+ }
+
+ /**
+ * Projection contains more rows than expected.
+ *
+ * @param actualSize Number of rows actually included by projection.
+ * @param expectedSize Total number of rows that were expected.
+ * @param redundant The tail of the projection list that wasn't
expected.
+ */
+ static <T> MismatchException tooManyRecords(int actualSize, int
expectedSize, List<T> redundant) {
+ return new MismatchException(description -> description
+ .appendText("has more rows than expected, got " +
actualSize + " but " + expectedSize + " was expected;\n")
+ .appendText(MismatchDescriber.INDENTATION + "Redundant
values: " + redundant)
+ );
+ }
+
+ /**
+ * Projection was supposed to be empty, but contains values.
+ */
+ static <T> MismatchException shouldHaveBeenEmpty() {
+ return new MismatchException(description -> description
+ .appendText("has values")
+ );
+ }
+
+ /**
+ * The projection ended before all expected values were seen.
+ *
+ * @param actualSize Number of rows actually included by projection.
+ * @param expectedSize Total number of rows that were expected.
+ * @param missing The tail of the expected list that was never reached.
+ */
+ static <T> MismatchException notEnoughRecords(int actualSize, int
expectedSize, List<T> missing) {
+ return new MismatchException(description -> description
+ .appendText("has fewer rows than expected, got " +
actualSize + " but " + expectedSize + " was expected;\n")
+ .appendText(MismatchDescriber.INDENTATION + "Missing
values: " + missing)
+ );
+ }
+ }
+}
diff --git
a/modules/jdbc/src/testFixtures/java/org/apache/ignite/jdbc/util/StatementResultCheck.java
b/modules/jdbc/src/testFixtures/java/org/apache/ignite/jdbc/util/StatementResultCheck.java
new file mode 100644
index 00000000000..45984b33488
--- /dev/null
+++
b/modules/jdbc/src/testFixtures/java/org/apache/ignite/jdbc/util/StatementResultCheck.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.jdbc.util;
+
+import static
org.apache.ignite.jdbc.util.RowColumnProjection.projectRowsColumn;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+/**
+ * Functional interface for checking a statement result after query execution.
+ */
+@FunctionalInterface
+public interface StatementResultCheck {
+
+ /**
+ * Checks the statement's current result. Call-site must position the statement on it (by executing
the query or calling {@link Statement#getMoreResults()}) before this method.
+ *
+ * @param statement Statement that was used for query execution.
+ * @throws SQLException If an error occurs while accessing the statement result.
+ */
+ void check(Statement statement) throws SQLException;
+
+ /** Assert that no more results are retrievable from this statement. */
+ static StatementResultCheck noMoreResults() {
+ return stmt -> {
+ assertNull(stmt.getResultSet());
+ assertEquals(-1, stmt.getUpdateCount());
+ };
+ }
+
+ /** Assert that next result is update counter. */
+ static StatementResultCheck isUpdateCounter(int expected) {
+ return stmt -> {
+ int updateCounter = stmt.getUpdateCount();
+ assertEquals(expected, updateCounter, "Expected update counter
equal to " + expected + ", but got " + updateCounter);
+ };
+ }
+
+ /** Assert that next result is {@link ResultSet}. */
+ static StatementResultCheck isResultSet() {
+ return StatementResultCheck::assertRs;
+ }
+
+ /** Assert that next result is {@link ResultSet} with rows satisfying
provided matcher. */
+ static <T> StatementResultCheck isResultSet(RowColumnProjection<T>
projection, RowsProjectionMatcher<T> matcher) {
+ return stmt -> {
+ ResultSet rs = assertRs(stmt);
+ assertThat(projectRowsColumn(rs, projection), matcher);
+ };
+ }
+
+ private static ResultSet assertRs(Statement statement) throws SQLException
{
+ ResultSet rs = statement.getResultSet();
+ assertNotNull(rs, "Expected next ResultSet, but got <null>");
+
+ return rs;
+ }
+
+}