This is an automated email from the ASF dual-hosted git repository. ifesdjeen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/cassandra-in-jvm-dtest-api.git
commit cc3e43c710a0fb683b7e955f641e221ccc2e5d54 Author: David Capwell <dcapw...@gmail.com> AuthorDate: Wed Apr 22 19:14:24 2020 -0700 In-jvm dtest IInstance and ICoordinator should use QueryResult as the base API Patch by David Capwell; reviewed by Alex Petrov for CASSANDRA-15756. --- pom.xml | 23 +++ .../cassandra/distributed/api/ICoordinator.java | 28 ++- .../cassandra/distributed/api/IInstance.java | 7 +- .../cassandra/distributed/api/NodeToolResult.java | 3 +- .../cassandra/distributed/api/QueryResult.java | 91 ++------- .../cassandra/distributed/api/QueryResults.java | 204 +++++++++++++++++++++ .../org/apache/cassandra/distributed/api/Row.java | 116 ++++++++++-- .../{QueryResult.java => SimpleQueryResult.java} | 46 +++-- .../cassandra/distributed/shared/AssertUtils.java | 37 ++++ .../cassandra/distributed/shared/FutureUtils.java | 95 ++++++++++ .../cassandra/distributed/api/QueryResultTest.java | 198 ++++++++++++++++++++ 11 files changed, 740 insertions(+), 108 deletions(-) diff --git a/pom.xml b/pom.xml index df733c6..62c90d8 100644 --- a/pom.xml +++ b/pom.xml @@ -50,6 +50,25 @@ <artifactId>slf4j-api</artifactId> <version>1.7.25</version> </dependency> + + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-api</artifactId> + <version>5.6.2</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-engine</artifactId> + <version>5.6.2</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.assertj</groupId> + <artifactId>assertj-core</artifactId> + <version>3.15.0</version> + <scope>test</scope> + </dependency> </dependencies> <build> @@ -65,6 +84,10 @@ </excludes> </configuration> </plugin> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <version>2.22.2</version> + </plugin> </plugins> </pluginManagement> <plugins> diff --git a/src/main/java/org/apache/cassandra/distributed/api/ICoordinator.java b/src/main/java/org/apache/cassandra/distributed/api/ICoordinator.java index 34087d0..3a96b63 100644 --- a/src/main/java/org/apache/cassandra/distributed/api/ICoordinator.java +++ b/src/main/java/org/apache/cassandra/distributed/api/ICoordinator.java @@ -22,6 +22,8 @@ import java.util.Iterator; import java.util.UUID; import java.util.concurrent.Future; +import org.apache.cassandra.distributed.shared.FutureUtils; + // The cross-version API requires that a Coordinator can be constructed without any constructor arguments public interface ICoordinator { @@ -30,13 +32,31 @@ public interface ICoordinator return executeWithResult(query, consistencyLevel, boundValues).toObjectArrays(); } - QueryResult executeWithResult(String query, ConsistencyLevel consistencyLevel, Object... boundValues); + SimpleQueryResult executeWithResult(String query, ConsistencyLevel consistencyLevel, Object... boundValues); + + default Iterator<Object[]> executeWithPaging(String query, ConsistencyLevel consistencyLevel, int pageSize, Object... boundValues) + { + return executeWithPagingWithResult(query, consistencyLevel, pageSize, boundValues).map(Row::toObjectArray); + } + + QueryResult executeWithPagingWithResult(String query, ConsistencyLevel consistencyLevel, int pageSize, Object... boundValues); + + default Future<Object[][]> asyncExecuteWithTracing(UUID sessionId, String query, ConsistencyLevel consistencyLevel, Object... boundValues) + { + return FutureUtils.map(asyncExecuteWithTracingWithResult(sessionId, query, consistencyLevel, boundValues), r -> r.toObjectArrays()); + } - Iterator<Object[]> executeWithPaging(String query, ConsistencyLevel consistencyLevel, int pageSize, Object... boundValues); + Future<SimpleQueryResult> asyncExecuteWithTracingWithResult(UUID sessionId, String query, ConsistencyLevel consistencyLevel, Object... boundValues); - Future<Object[][]> asyncExecuteWithTracing(UUID sessionId, String query, ConsistencyLevel consistencyLevel, Object... boundValues); + default Object[][] executeWithTracing(UUID sessionId, String query, ConsistencyLevel consistencyLevel, Object... boundValues) + { + return executeWithTracingWithResult(sessionId, query, consistencyLevel, boundValues).toObjectArrays(); + } - Object[][] executeWithTracing(UUID sessionId, String query, ConsistencyLevel consistencyLevel, Object... boundValues); + default SimpleQueryResult executeWithTracingWithResult(UUID sessionId, String query, ConsistencyLevel consistencyLevel, Object... boundValues) + { + return FutureUtils.waitOn(asyncExecuteWithTracingWithResult(sessionId, query, consistencyLevel, boundValues)); + } IInstance instance(); } diff --git a/src/main/java/org/apache/cassandra/distributed/api/IInstance.java b/src/main/java/org/apache/cassandra/distributed/api/IInstance.java index 90c8242..8895d94 100644 --- a/src/main/java/org/apache/cassandra/distributed/api/IInstance.java +++ b/src/main/java/org/apache/cassandra/distributed/api/IInstance.java @@ -31,7 +31,12 @@ public interface IInstance extends IIsolatedExecutor void schemaChangeInternal(String query); - public Object[][] executeInternal(String query, Object... args); + default Object[][] executeInternal(String query, Object... args) + { + return executeInternalWithResult(query, args).toObjectArrays(); + } + + SimpleQueryResult executeInternalWithResult(String query, Object... args); IInstanceConfig config(); diff --git a/src/main/java/org/apache/cassandra/distributed/api/NodeToolResult.java b/src/main/java/org/apache/cassandra/distributed/api/NodeToolResult.java index 2e5c5f0..bdd75b5 100644 --- a/src/main/java/org/apache/cassandra/distributed/api/NodeToolResult.java +++ b/src/main/java/org/apache/cassandra/distributed/api/NodeToolResult.java @@ -147,7 +147,8 @@ public class NodeToolResult } } - private static String getStackTraceAsString(Throwable throwable) { + private static String getStackTraceAsString(Throwable throwable) + { StringWriter stringWriter = new StringWriter(); throwable.printStackTrace(new PrintWriter(stringWriter)); return stringWriter.toString(); diff --git a/src/main/java/org/apache/cassandra/distributed/api/QueryResult.java b/src/main/java/org/apache/cassandra/distributed/api/QueryResult.java index e72d33e..9281794 100644 --- a/src/main/java/org/apache/cassandra/distributed/api/QueryResult.java +++ b/src/main/java/org/apache/cassandra/distributed/api/QueryResult.java @@ -15,12 +15,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.cassandra.distributed.api; import java.util.Iterator; -import java.util.NoSuchElementException; -import java.util.Objects; +import java.util.List; +import java.util.function.Function; import java.util.function.Predicate; /** @@ -52,88 +51,34 @@ import java.util.function.Predicate; * }</code> * <p> * Both cases have the same issue; reference to a row from a previous call to {@link #hasNext()}. Since the same {@link Row} - * object can be used accross different calls to {@link #hasNext()} this would mean any attempt to access after the fact + * object can be used across different calls to {@link #hasNext()} this would mean any attempt to access after the fact * points to newer data. If this behavior is not desirable and access is needed between calls, then {@link Row#copy()} * should be used; this will clone the {@link Row} and return a new object pointing to the same data. */ -public class QueryResult implements Iterator<Row> +public interface QueryResult extends Iterator<Row> { - public static final QueryResult EMPTY = new QueryResult(new String[0], null); - - private final String[] names; - private final Object[][] results; - private final Predicate<Row> filter; - private final Row row; - private int offset = -1; - - public QueryResult(String[] names, Object[][] results) - { - this.names = Objects.requireNonNull(names, "names"); - this.results = results; - this.row = new Row(names); - this.filter = ignore -> true; - } - - private QueryResult(String[] names, Object[][] results, Predicate<Row> filter, int offset) - { - this.names = names; - this.results = results; - this.filter = filter; - this.offset = offset; - this.row = new Row(names); - } - - public String[] getNames() - { - return names; - } - - public boolean isEmpty() - { - return results.length == 0; - } - - public int size() - { - return results.length; - } - - public QueryResult filter(Predicate<Row> fn) - { - return new QueryResult(names, results, filter.and(fn), offset); - } + List<String> names(); - /** - * Get all rows as a 2d array. Any calls to {@link #filter(Predicate)} will be ignored and the array returned will - * be the full set from the query. - */ - public Object[][] toObjectArrays() + default QueryResult filter(Predicate<Row> fn) { - return results; + return QueryResults.filter(this, fn); } - @Override - public boolean hasNext() + default <A> Iterator<A> map(Function<? super Row, ? extends A> fn) { - if (results == null) - return false; - while ((offset += 1) < results.length) + return new Iterator<A>() { - row.setResults(results[offset]); - if (filter.test(row)) + @Override + public boolean hasNext() { - return true; + return QueryResult.this.hasNext(); } - } - row.setResults(null); - return false; - } - @Override - public Row next() - { - if (offset < 0 || offset >= results.length) - throw new NoSuchElementException(); - return row; + @Override + public A next() + { + return fn.apply(QueryResult.this.next()); + } + }; } } diff --git a/src/main/java/org/apache/cassandra/distributed/api/QueryResults.java b/src/main/java/org/apache/cassandra/distributed/api/QueryResults.java new file mode 100644 index 0000000..80202eb --- /dev/null +++ b/src/main/java/org/apache/cassandra/distributed/api/QueryResults.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.distributed.api; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.function.Predicate; + +public final class QueryResults +{ + private static final SimpleQueryResult EMPTY = new SimpleQueryResult(new String[0], null); + + private QueryResults() {} + + public static SimpleQueryResult empty() + { + return EMPTY; + } + + public static QueryResult fromIterator(String[] names, Iterator<Row> iterator) + { + Objects.requireNonNull(names, "names"); + Objects.requireNonNull(iterator, "iterator"); + return new IteratorQueryResult(names, iterator); + } + + public static QueryResult fromObjectArrayIterator(String[] names, Iterator<Object[]> iterator) + { + Row row = new Row(names); + return fromIterator(names, new Iterator<Row>() + { + @Override + public boolean hasNext() + { + return iterator.hasNext(); + } + + @Override + public Row next() + { + row.setResults(iterator.next()); + return row; + } + }); + } + + public static QueryResult filter(QueryResult result, Predicate<Row> fn) + { + return new FilterQueryResult(result, fn); + } + + public static Builder builder() + { + return new Builder(); + } + + public static final class Builder + { + private static final int UNSET = -1; + + private int numColumns = UNSET; + private String[] names; + private List<Object[]> results = new ArrayList<>(); + + public Builder columns(String... columns) + { + if (columns != null) + { + if (numColumns == UNSET) + numColumns = columns.length; + + if (numColumns != columns.length) + throw new AssertionError("Attempted to add column names with different column count; " + + "expected " + numColumns + " columns but given " + Arrays.toString(columns)); + } + + names = columns; + return this; + } + + public Builder row(Object... values) + { + if (numColumns == UNSET) + numColumns = values.length; + + if (numColumns != values.length) + throw new AssertionError("Attempted to add row with different column count; " + + "expected " + numColumns + " columns but given " + Arrays.toString(values)); + results.add(values); + return this; + } + + public SimpleQueryResult build() + { + if (names == null) + { + if (numColumns == UNSET) + return QueryResults.empty(); + names = new String[numColumns]; + for (int i = 0; i < numColumns; i++) + names[i] = "unknown"; + } + return new SimpleQueryResult(names, results.stream().toArray(Object[][]::new)); + } + } + + private static final class IteratorQueryResult implements QueryResult + { + private final List<String> names; + private final Iterator<Row> iterator; + + private IteratorQueryResult(String[] names, Iterator<Row> iterator) + { + this(Collections.unmodifiableList(Arrays.asList(names)), iterator); + } + + private IteratorQueryResult(List<String> names, Iterator<Row> iterator) + { + this.names = names; + this.iterator = iterator; + } + + @Override + public List<String> names() + { + return names; + } + + @Override + public boolean hasNext() + { + return iterator.hasNext(); + } + + @Override + public Row next() + { + return iterator.next(); + } + } + + private static final class FilterQueryResult implements QueryResult + { + private final QueryResult delegate; + private final Predicate<Row> filter; + private Row current; + + private FilterQueryResult(QueryResult delegate, Predicate<Row> filter) + { + this.delegate = delegate; + this.filter = filter; + } + + @Override + public List<String> names() + { + return delegate.names(); + } + + @Override + public boolean hasNext() + { + while (delegate.hasNext()) + { + Row row = delegate.next(); + if (filter.test(row)) + { + current = row; + return true; + } + } + current = null; + return false; + } + + @Override + public Row next() + { + if (current == null) + throw new NoSuchElementException(); + return current; + } + } +} diff --git a/src/main/java/org/apache/cassandra/distributed/api/Row.java b/src/main/java/org/apache/cassandra/distributed/api/Row.java index 530edc1..ff4efbe 100644 --- a/src/main/java/org/apache/cassandra/distributed/api/Row.java +++ b/src/main/java/org/apache/cassandra/distributed/api/Row.java @@ -19,8 +19,10 @@ package org.apache.cassandra.distributed.api; import java.util.Arrays; +import java.util.Collections; import java.util.Date; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.Objects; @@ -30,27 +32,33 @@ import java.util.UUID; /** * Data representing a single row in a query result. * <p> - * This class is mutable from the parent {@link QueryResult} and can have the row it points to changed between calls - * to {@link QueryResult#hasNext()}, for this reason it is unsafe to hold reference to this class after that call; + * This class is mutable from the parent {@link SimpleQueryResult} and can have the row it points to changed between calls + * to {@link SimpleQueryResult#hasNext()}, for this reason it is unsafe to hold reference to this class after that call; * to get around this, a call to {@link #copy()} will return a new object pointing to the same row. */ public class Row { + private static final int NOT_FOUND = -1; + + private final String[] names; private final Map<String, Integer> nameIndex; private Object[] results; // mutable to avoid allocations in loops public Row(String[] names) { Objects.requireNonNull(names, "names"); + this.names = names; this.nameIndex = new HashMap<>(names.length); for (int i = 0; i < names.length; i++) { - nameIndex.put(names[i], i); + // if duplicate names, always index by the first one seen + nameIndex.putIfAbsent(names[i], i); } } - private Row(Map<String, Integer> nameIndex) + private Row(String[] names, Map<String, Integer> nameIndex) { + this.names = names; this.nameIndex = nameIndex; } @@ -60,49 +68,135 @@ public class Row } /** - * Creates a copy of the current row; can be used past calls to {@link QueryResult#hasNext()}. + * Creates a copy of the current row; can be used past calls to {@link SimpleQueryResult#hasNext()}. */ public Row copy() { - Row copy = new Row(nameIndex); + Row copy = new Row(names, nameIndex); copy.setResults(results); return copy; } + public <T> T get(int index) + { + checkAccess(); + if (index < 0 || index >= results.length) + throw new NoSuchElementException("by index: " + index); + return (T) results[index]; + } + public <T> T get(String name) { checkAccess(); int idx = findIndex(name); - if (idx == -1) - return null; + if (idx == NOT_FOUND) + throw new NoSuchElementException("by name: " + name); return (T) results[idx]; } + public Short getShort(int index) + { + return get(index); + } + + public Short getShort(String name) + { + return get(name); + } + + public Integer getInteger(int index) + { + return get(index); + } + + public Integer getInteger(String name) + { + return get(name); + } + + public Long getLong(int index) + { + return get(index); + } + + public Long getLong(String name) + { + return get(name); + } + + public Float getFloat(int index) + { + return get(index); + } + + public Float getFloat(String name) + { + return get(name); + } + + public Double getDouble(int index) + { + return get(index); + } + + public Double getDouble(String name) + { + return get(name); + } + + public String getString(int index) + { + return get(index); + } + public String getString(String name) { return get(name); } + public UUID getUUID(int index) + { + return get(index); + } + public UUID getUUID(String name) { return get(name); } + public Date getTimestamp(int index) + { + return get(index); + } + public Date getTimestamp(String name) { return get(name); } + public <T> Set<T> getSet(int index) + { + return get(index); + } + public <T> Set<T> getSet(String name) { return get(name); } + /** + * Get the row as a array. + */ + public Object[] toObjectArray() + { + return results; + } + public String toString() { return "Row{" + - "names=" + nameIndex.keySet() + - ", results=" + Arrays.toString(results) + + "names=" + Arrays.toString(names) + + ", results=" + (results == null ? "[]" : Arrays.toString(results)) + '}'; } @@ -114,6 +208,6 @@ public class Row private int findIndex(String name) { - return nameIndex.getOrDefault(name, -1); + return nameIndex.getOrDefault(name, NOT_FOUND); } } diff --git a/src/main/java/org/apache/cassandra/distributed/api/QueryResult.java b/src/main/java/org/apache/cassandra/distributed/api/SimpleQueryResult.java similarity index 76% copy from src/main/java/org/apache/cassandra/distributed/api/QueryResult.java copy to src/main/java/org/apache/cassandra/distributed/api/SimpleQueryResult.java index e72d33e..a44411d 100644 --- a/src/main/java/org/apache/cassandra/distributed/api/QueryResult.java +++ b/src/main/java/org/apache/cassandra/distributed/api/SimpleQueryResult.java @@ -15,13 +15,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.cassandra.distributed.api; -import java.util.Iterator; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; import java.util.NoSuchElementException; import java.util.Objects; import java.util.function.Predicate; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** * A table of data representing a complete query result. @@ -56,17 +59,15 @@ import java.util.function.Predicate; * points to newer data. If this behavior is not desirable and access is needed between calls, then {@link Row#copy()} * should be used; this will clone the {@link Row} and return a new object pointing to the same data. */ -public class QueryResult implements Iterator<Row> +public class SimpleQueryResult implements QueryResult { - public static final QueryResult EMPTY = new QueryResult(new String[0], null); - private final String[] names; private final Object[][] results; private final Predicate<Row> filter; private final Row row; private int offset = -1; - public QueryResult(String[] names, Object[][] results) + public SimpleQueryResult(String[] names, Object[][] results) { this.names = Objects.requireNonNull(names, "names"); this.results = results; @@ -74,7 +75,7 @@ public class QueryResult implements Iterator<Row> this.filter = ignore -> true; } - private QueryResult(String[] names, Object[][] results, Predicate<Row> filter, int offset) + private SimpleQueryResult(String[] names, Object[][] results, Predicate<Row> filter, int offset) { this.names = names; this.results = results; @@ -83,24 +84,23 @@ public class QueryResult implements Iterator<Row> this.row = new Row(names); } - public String[] getNames() - { - return names; - } - - public boolean isEmpty() + public List<String> names() { - return results.length == 0; + return Collections.unmodifiableList(Arrays.asList(names)); } - public int size() + public SimpleQueryResult filter(Predicate<Row> fn) { - return results.length; + return new SimpleQueryResult(names, results, filter.and(fn), offset); } - public QueryResult filter(Predicate<Row> fn) + /** + * Reset the cursor to the start of the query result; if the query result has not been iterated, this has no effect. + */ + public void reset() { - return new QueryResult(names, results, filter.and(fn), offset); + offset = -1; + row.setResults(null); } /** @@ -132,8 +132,18 @@ public class QueryResult implements Iterator<Row> @Override public Row next() { + // no null check needed for results since offset only increments IFF results is not null if (offset < 0 || offset >= results.length) throw new NoSuchElementException(); return row; } + + @Override + public String toString() { + if (results == null) + return "[]"; + return Stream.of(results) + .map(Arrays::toString) + .collect(Collectors.joining(",", "[", "]")); + } } diff --git a/src/main/java/org/apache/cassandra/distributed/shared/AssertUtils.java b/src/main/java/org/apache/cassandra/distributed/shared/AssertUtils.java index 8e6254a..a388af3 100644 --- a/src/main/java/org/apache/cassandra/distributed/shared/AssertUtils.java +++ b/src/main/java/org/apache/cassandra/distributed/shared/AssertUtils.java @@ -18,6 +18,10 @@ package org.apache.cassandra.distributed.shared; +import org.apache.cassandra.distributed.api.QueryResult; +import org.apache.cassandra.distributed.api.SimpleQueryResult; +import org.apache.cassandra.distributed.api.Row; + import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; @@ -26,6 +30,34 @@ import java.util.List; public class AssertUtils { + public static void assertRows(QueryResult actual, QueryResult expected) + { + if (actual instanceof SimpleQueryResult && expected instanceof SimpleQueryResult) + { + assertRows((SimpleQueryResult) actual, (SimpleQueryResult) expected); + } + else + { + assertRows(actual.map(Row::toObjectArray), expected.map(Row::toObjectArray)); + } + } + + public static void assertRows(SimpleQueryResult actual, SimpleQueryResult expected) + { + while (actual.hasNext()) { + if (!expected.hasNext()) + throw new AssertionError(rowsNotEqualErrorMessage(actual, expected)); + + Row next = actual.next(); + Row exectedRow = expected.next(); + + assertTrue(rowsNotEqualErrorMessage(actual, expected), + Arrays.equals(next.toObjectArray(), exectedRow.toObjectArray())); + } + if (expected.hasNext()) + throw new AssertionError(rowsNotEqualErrorMessage(actual, expected)); + } + public static void assertRows(Object[][] actual, Object[]... expected) { assertEquals(rowsNotEqualErrorMessage(actual, expected), @@ -82,6 +114,11 @@ public class AssertUtils Arrays.toString(actual)); } + public static String rowsNotEqualErrorMessage(SimpleQueryResult actual, SimpleQueryResult expected) + { + return String.format("Expected: %s\nActual: %s\n", expected, actual); + } + public static String rowsNotEqualErrorMessage(Object[][] actual, Object[][] expected) { return String.format("Expected: %s\nActual: %s\n", diff --git a/src/main/java/org/apache/cassandra/distributed/shared/FutureUtils.java b/src/main/java/org/apache/cassandra/distributed/shared/FutureUtils.java new file mode 100644 index 0000000..9f97e8d --- /dev/null +++ b/src/main/java/org/apache/cassandra/distributed/shared/FutureUtils.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.distributed.shared; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Function; + +public final class FutureUtils { + private FutureUtils() { } + + public static <T> T waitOn(Future<T> f) + { + try + { + return f.get(); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + catch (ExecutionException e) + { + Throwable cause = e.getCause(); + if (cause instanceof Error) throw (Error) cause; + if (cause instanceof RuntimeException) throw (RuntimeException) cause; + throw new RuntimeException(cause); + } + } + + public static <A, B> Future<B> map(Future<A> future, Function<? super A, ? extends B> fn) { + if (future == null) throw new NullPointerException("Future is null"); + if (fn == null) throw new NullPointerException("Function is null"); + + if (future instanceof CompletableFuture) { + return ((CompletableFuture<A>) future).thenApply(fn); + } + return new MapFuture<>(future, fn); + } + + private static final class MapFuture<A, B> implements Future<B> + { + private final Future<A> parent; + private final Function<? super A, ? extends B> fn; + + private MapFuture(Future<A> parent, Function<? super A, ? extends B> fn) { + this.parent = parent; + this.fn = fn; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return parent.cancel(mayInterruptIfRunning); + } + + @Override + public boolean isCancelled() { + return parent.isCancelled(); + } + + @Override + public boolean isDone() { + return parent.isDone(); + } + + @Override + public B get() throws InterruptedException, ExecutionException { + return fn.apply(parent.get()); + } + + @Override + public B get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return fn.apply(parent.get(timeout, unit)); + } + } +} diff --git a/src/test/java/org/apache/cassandra/distributed/api/QueryResultTest.java b/src/test/java/org/apache/cassandra/distributed/api/QueryResultTest.java new file mode 100644 index 0000000..6bef78b --- /dev/null +++ b/src/test/java/org/apache/cassandra/distributed/api/QueryResultTest.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.api; + +import org.apache.cassandra.distributed.shared.AssertUtils; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +public class QueryResultTest +{ + @Test + public void empty() + { + QueryResult result = QueryResults.empty(); + + assertThat(result.names()).isEmpty(); + assertThat(result.toString()).isEqualTo("[]"); + + assertThat(result.hasNext()).isFalse(); + assertThatThrownBy(result::next).isInstanceOf(NoSuchElementException.class); + + QueryResult filtered = result.filter(ignore -> true); + assertThat(filtered.hasNext()).isFalse(); + assertThatThrownBy(filtered::next).isInstanceOf(NoSuchElementException.class); + + Iterator<Object> it = result.map(r -> r.get("undefined")); + assertThat(it.hasNext()).isFalse(); + assertThatThrownBy(it::next).isInstanceOf(NoSuchElementException.class); + } + + @Test + public void equals() + { + String[] names = { "fname", "lname"}; + Object[][] rows = { + new Object[] { "david", "capwell"}, + new Object[] { "alex", "petrov"}, + new Object[] { "dinesh", "joshi"}, + }; + SimpleQueryResult result = new SimpleQueryResult(names, rows); + SimpleQueryResult fromBuilder = QueryResults.builder() + .columns(names) + .row(rows[0]) + .row(rows[1]) + .row(rows[2]) + .build(); + + AssertUtils.assertRows(result, fromBuilder); + } + + @Test + public void notEqualLength() + { + String[] names = { "fname", "lname"}; + Object[][] rows = { + new Object[] { "david", "capwell"}, + new Object[] { "alex", "petrov"}, + new Object[] { "dinesh", "joshi"}, + }; + SimpleQueryResult result = new SimpleQueryResult(names, rows); + SimpleQueryResult fromBuilder = QueryResults.builder() + .columns(names) + .row(rows[0]) + .row(rows[1]) + .row(rows[2]) + .row("chris", "lohfink") + .build(); + + assertThatThrownBy(() -> AssertUtils.assertRows(result, fromBuilder)) + .isInstanceOf(AssertionError.class) + .hasMessageContaining("Expected: ") + .hasMessageContaining("Actual: "); + } + + @Test + public void notEqualColumnLength() + { + String[] names = { "fname", "lname"}; + Object[][] rows = { + new Object[] { "david", "capwell"}, + new Object[] { "alex", "petrov"}, + new Object[] { "dinesh", "joshi"}, + }; + SimpleQueryResult result = new SimpleQueryResult(names, rows); + SimpleQueryResult fromBuilder = QueryResults.builder() + .columns("fname") + .row("david") + .row("alex") + .row("dinesh") + .build(); + + assertThatThrownBy(() -> AssertUtils.assertRows(result, fromBuilder)) + .isInstanceOf(AssertionError.class) + .hasMessageContaining("Expected: ") + .hasMessageContaining("Actual: "); + } + + @Test + public void notEqualContent() + { + String[] names = { "fname", "lname"}; + Object[][] rows = { + new Object[] { "david", "capwell"}, + new Object[] { "alex", "petrov"}, + new Object[] { "dinesh", "joshi"}, + }; + SimpleQueryResult result = new SimpleQueryResult(names, rows); + SimpleQueryResult fromBuilder = QueryResults.builder() + .columns(names) + .row("david", "Capwell") + .row("alex", "Petrov") + .row("dinesh", "Joshi") + .build(); + + assertThatThrownBy(() -> AssertUtils.assertRows(result, fromBuilder)) + .isInstanceOf(AssertionError.class) + .hasMessageContaining("Expected: ") + .hasMessageContaining("Actual: "); + } + + @Test + public void completeFilter() + { + SimpleQueryResult qr = QueryResults.builder() + .row(1, 2, 3, 4) + .row(5, 6, 7, 7) + .row(1, 2, 4, 8) + .row(2, 4, 6, 12) + .build(); + + SimpleQueryResult filtered = qr.filter(row -> row.getInteger(0).intValue() != 1); + + AssertUtils.assertRows(filtered, QueryResults.builder() + .row(5, 6, 7, 7) + .row(2, 4, 6, 12) + .build()); + } + + @Test + public void completeMap() + { + SimpleQueryResult qr = QueryResults.builder() + .row(1, 2, 3, 4) + .row(5, 6, 7, 7) + .row(1, 2, 4, 8) + .row(2, 4, 6, 12) + .build(); + + Iterator<Integer> it = qr.map(r -> r.getInteger(0)); + List<Integer> result = new ArrayList<>(4); + it.forEachRemaining(result::add); + + assertThat(result).isEqualTo(Arrays.asList(1, 5, 1, 2)); + } + + @Test + public void iteratorFilter() + { + String[] names = {"first"}; + List<Object[]> values = new ArrayList<>(); + values.add(new Object[] { "david" }); + values.add(new Object[] { "alex" }); + values.add(new Object[] { "dinesh" }); + + QueryResult qr = QueryResults.fromObjectArrayIterator(names, values.iterator()) + .filter(r -> !"david".equals(r.getString("first"))) + .filter(r -> !"alex".equals(r.getString("first"))); + + AssertUtils.assertRows(qr, QueryResults.builder() + .columns(names) + .row("dinesh") + .build()); + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org