Repository: james-project
Updated Branches:
  refs/heads/master 322d5797a -> 4870766d4


JAMES-1945 Handle paging with Cassandra


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/4870766d
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/4870766d
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/4870766d

Branch: refs/heads/master
Commit: 4870766d47e61256f438af1d935754daa8a36d3b
Parents: f096bdd
Author: benwa <btell...@linagora.com>
Authored: Tue Feb 21 16:50:29 2017 +0700
Committer: Antoine Duprat <adup...@linagora.com>
Committed: Tue Feb 21 14:23:08 2017 +0100

----------------------------------------------------------------------
 .../cassandra/utils/CassandraUtils.java         |  11 +-
 .../backends/cassandra/utils/PaggingTest.java   | 110 +++++++++++++++++++
 2 files changed, 120 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/4870766d/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraUtils.java
----------------------------------------------------------------------
diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraUtils.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraUtils.java
index a9916bc..1732dd2 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraUtils.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraUtils.java
@@ -27,8 +27,17 @@ import com.datastax.driver.core.Row;
 
 public class CassandraUtils {
 
+    private static final int FETCH_NEXT_PAGE_ADVANCE_IN_ROW = 100;
+
     public static Stream<Row> convertToStream(ResultSet resultSet) {
-        return StreamSupport.stream(resultSet.spliterator(), true);
+        return StreamSupport.stream(resultSet.spliterator(), true)
+            .peek(row -> ensureFetchedNextPage(resultSet));
+    }
+
+    private static void ensureFetchedNextPage(ResultSet resultSet) {
+        if (resultSet.getAvailableWithoutFetching() == FETCH_NEXT_PAGE_ADVANCE_IN_ROW && !resultSet.isFullyFetched()) {
+            resultSet.fetchMoreResults();
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/4870766d/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/utils/PaggingTest.java
----------------------------------------------------------------------
diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/utils/PaggingTest.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/utils/PaggingTest.java
new file mode 100644
index 0000000..a643818
--- /dev/null
+++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/utils/PaggingTest.java
@@ -0,0 +1,110 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.backends.cassandra.utils;
+
+import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.IntStream;
+
+import org.apache.james.backends.cassandra.CassandraCluster;
+import org.apache.james.backends.cassandra.components.CassandraIndex;
+import org.apache.james.backends.cassandra.components.CassandraModule;
+import org.apache.james.backends.cassandra.components.CassandraTable;
+import org.apache.james.backends.cassandra.components.CassandraType;
+import org.apache.james.util.CompletableFutureUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.schemabuilder.SchemaBuilder;
+import com.datastax.driver.core.utils.UUIDs;
+import com.google.common.collect.ImmutableList;
+
+public class PaggingTest {
+    private static final String TABLE_NAME = "test";
+    private static final String ID = "id";
+    private static final String CLUSTERING = "clustering";
+    private static final UUID UUID = UUIDs.timeBased();
+
+    private CassandraCluster cassandra;
+    private CassandraAsyncExecutor executor;
+
+    @Before
+    public void setUp() {
+        cassandra = CassandraCluster.create(new CassandraModule() {
+            @Override
+            public List<CassandraTable> moduleTables() {
+                return ImmutableList.of(new CassandraTable(TABLE_NAME,
+                    SchemaBuilder.createTable(TABLE_NAME)
+                        .ifNotExists()
+                        .addPartitionKey(ID, DataType.timeuuid())
+                        .addClusteringColumn(CLUSTERING, DataType.bigint())));
+            }
+
+            @Override
+            public List<CassandraIndex> moduleIndex() {
+                return ImmutableList.of();
+            }
+
+            @Override
+            public List<CassandraType> moduleTypes() {
+                return ImmutableList.of();
+            }
+        });
+        cassandra.ensureAllTables();
+        executor = new CassandraAsyncExecutor(cassandra.getConf());
+    }
+
+    @After
+    public void tearDown() {
+        cassandra.clearAllTables();
+    }
+
+    @Test
+    public void pagingShouldWork() {
+        int fetchSize = 200;
+        int size = 2 * fetchSize + 50;
+
+        CompletableFutureUtil.allOf(
+            IntStream.range(0, size)
+                .boxed()
+                .map(i ->
+                    executor
+                        .executeVoid(insertInto(TABLE_NAME)
+                            .value(ID, UUID)
+                            .value(CLUSTERING, i))))
+            .join();
+
+        assertThat(
+            executor.execute(select()
+                .from(TABLE_NAME)
+                .where(eq(ID, UUID))
+                .setFetchSize(fetchSize))
+                .join())
+            .hasSize(size);
+    }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to