Repository: james-project
Updated Branches:
  refs/heads/master 322d5797a -> 4870766d4
JAMES-1945 Handle paging with Cassandra Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/4870766d Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/4870766d Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/4870766d Branch: refs/heads/master Commit: 4870766d47e61256f438af1d935754daa8a36d3b Parents: f096bdd Author: benwa <btell...@linagora.com> Authored: Tue Feb 21 16:50:29 2017 +0700 Committer: Antoine Duprat <adup...@linagora.com> Committed: Tue Feb 21 14:23:08 2017 +0100 ---------------------------------------------------------------------- .../cassandra/utils/CassandraUtils.java | 11 +- .../backends/cassandra/utils/PaggingTest.java | 110 +++++++++++++++++++ 2 files changed, 120 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/4870766d/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraUtils.java ---------------------------------------------------------------------- diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraUtils.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraUtils.java index a9916bc..1732dd2 100644 --- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraUtils.java +++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraUtils.java @@ -27,8 +27,17 @@ import com.datastax.driver.core.Row; public class CassandraUtils { + private static final int FETCH_NEXT_PAGE_ADVANCE_IN_ROW = 100; + public static Stream<Row> convertToStream(ResultSet resultSet) { - return StreamSupport.stream(resultSet.spliterator(), true); + return StreamSupport.stream(resultSet.spliterator(), true) + .peek(row -> ensureFetchedNextPage(resultSet)); 
+ } + + private static void ensureFetchedNextPage(ResultSet resultSet) { + if (resultSet.getAvailableWithoutFetching() == FETCH_NEXT_PAGE_ADVANCE_IN_ROW && !resultSet.isFullyFetched()) { + resultSet.fetchMoreResults(); + } } } http://git-wip-us.apache.org/repos/asf/james-project/blob/4870766d/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/utils/PaggingTest.java ---------------------------------------------------------------------- diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/utils/PaggingTest.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/utils/PaggingTest.java new file mode 100644 index 0000000..a643818 --- /dev/null +++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/utils/PaggingTest.java @@ -0,0 +1,110 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
* + ****************************************************************/ + +package org.apache.james.backends.cassandra.utils; + +import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; +import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto; +import static com.datastax.driver.core.querybuilder.QueryBuilder.select; +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.List; +import java.util.UUID; +import java.util.stream.IntStream; + +import org.apache.james.backends.cassandra.CassandraCluster; +import org.apache.james.backends.cassandra.components.CassandraIndex; +import org.apache.james.backends.cassandra.components.CassandraModule; +import org.apache.james.backends.cassandra.components.CassandraTable; +import org.apache.james.backends.cassandra.components.CassandraType; +import org.apache.james.util.CompletableFutureUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.schemabuilder.SchemaBuilder; +import com.datastax.driver.core.utils.UUIDs; +import com.google.common.collect.ImmutableList; + +public class PaggingTest { + private static final String TABLE_NAME = "test"; + private static final String ID = "id"; + private static final String CLUSTERING = "clustering"; + private static final UUID UUID = UUIDs.timeBased(); + + private CassandraCluster cassandra; + private CassandraAsyncExecutor executor; + + @Before + public void setUp() { + cassandra = CassandraCluster.create(new CassandraModule() { + @Override + public List<CassandraTable> moduleTables() { + return ImmutableList.of(new CassandraTable(TABLE_NAME, + SchemaBuilder.createTable(TABLE_NAME) + .ifNotExists() + .addPartitionKey(ID, DataType.timeuuid()) + .addClusteringColumn(CLUSTERING, DataType.bigint()))); + } + + @Override + public List<CassandraIndex> moduleIndex() { + return ImmutableList.of(); + } + + @Override + public List<CassandraType> 
moduleTypes() { + return ImmutableList.of(); + } + }); + cassandra.ensureAllTables(); + executor = new CassandraAsyncExecutor(cassandra.getConf()); + } + + @After + public void tearDown() { + cassandra.clearAllTables(); + } + + @Test + public void pagingShouldWork() { + int fetchSize = 200; + int size = 2 * fetchSize + 50; + + CompletableFutureUtil.allOf( + IntStream.range(0, size) + .boxed() + .map(i -> + executor + .executeVoid(insertInto(TABLE_NAME) + .value(ID, UUID) + .value(CLUSTERING, i)))) + .join(); + + assertThat( + executor.execute(select() + .from(TABLE_NAME) + .where(eq(ID, UUID)) + .setFetchSize(fetchSize)) + .join()) + .hasSize(size); + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org