Merge branch 'cassandra-2.1' into cassandra-2.2

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6a40b812
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6a40b812
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6a40b812

Branch: refs/heads/cassandra-2.2
Commit: 6a40b8128697cffc586059984dce71fcf6dfda59
Parents: af0cd32 07103dd
Author: blerer <benjamin.le...@datastax.com>
Authored: Wed Jul 15 17:02:58 2015 +0200
Committer: blerer <benjamin.le...@datastax.com>
Committed: Wed Jul 15 17:05:06 2015 +0200

----------------------------------------------------------------------
 .../cassandra/service/ResponseResolverTest.java | 325 +++++++++++++++++++
 1 file changed, 325 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a40b812/test/unit/org/apache/cassandra/service/ResponseResolverTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/service/ResponseResolverTest.java
index 0000000,54e584d..7e42825
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/service/ResponseResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/ResponseResolverTest.java
@@@ -1,0 -1,306 +1,325 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.service;
+ 
+ 
+ import java.net.InetAddress;
+ import java.net.UnknownHostException;
+ import java.nio.ByteBuffer;
+ import java.util.*;
+ import java.util.concurrent.CountDownLatch;
+ import java.util.concurrent.ExecutorService;
+ import java.util.concurrent.Executors;
+ 
++import org.junit.BeforeClass;
+ import org.junit.Test;
+ 
++import org.apache.cassandra.config.KSMetaData;
++import org.apache.cassandra.exceptions.ConfigurationException;
++import org.apache.cassandra.locator.SimpleStrategy;
++
+ import org.apache.cassandra.SchemaLoader;
+ import org.apache.cassandra.db.*;
+ import org.apache.cassandra.db.filter.ColumnSlice;
+ import org.apache.cassandra.db.filter.SliceQueryFilter;
+ import org.apache.cassandra.net.MessageIn;
+ import org.apache.cassandra.net.MessagingService;
+ 
+ import static org.apache.cassandra.Util.column;
+ import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertNotNull;
+ import static org.junit.Assert.fail;
+ 
+ public class ResponseResolverTest extends SchemaLoader
+ {
 -    private final String KEYSPACE = "Keyspace1";
 -    private final String TABLE = "Standard1";
++    private final static String KEYSPACE = "Keyspace1";
++    private final static String TABLE = "Standard1";
++    private final static int MAX_RESPONSE_COUNT = 3;
+ 
++    @BeforeClass
++    public static void defineSchema() throws ConfigurationException
++    {
++        SchemaLoader.prepareServer();
++        SchemaLoader.createKeyspace(KEYSPACE,
++                                    SimpleStrategy.class,
++                                    KSMetaData.optsWithRF(MAX_RESPONSE_COUNT),
++                                    SchemaLoader.standardCFMD(KEYSPACE, TABLE));
++    }
++    
+     @Test
+     public void testSingleMessage_RowDigestResolver() throws DigestMismatchException, UnknownHostException
+     {
+         ByteBuffer key = bytes("key");
+         ColumnFamily cf = ArrayBackedSortedColumns.factory.create(KEYSPACE, TABLE);
+         cf.addColumn(column("c1", "v1", 0));
+         Row row = new Row(key, cf);
+ 
 -        testReadResponses(new RowDigestResolver(KEYSPACE, key), row, makeReadResponse("127.0.0.1", row));
++        testReadResponses(new RowDigestResolver(KEYSPACE, key, MAX_RESPONSE_COUNT), row, makeReadResponse("127.0.0.1", row));
+     }
+ 
+     @Test
+     public void testMultipleMessages_RowDigestResolver() throws DigestMismatchException, UnknownHostException
+     {
+         ByteBuffer key = bytes("key");
+         ColumnFamily cf = ArrayBackedSortedColumns.factory.create(KEYSPACE, TABLE);
+         cf.addColumn(column("c1", "v1", 0));
+         Row row = new Row(key, cf);
+ 
 -        testReadResponses(new RowDigestResolver(KEYSPACE, key),
++        testReadResponses(new RowDigestResolver(KEYSPACE, key, MAX_RESPONSE_COUNT),
+                           row,
+                           makeReadResponse("127.0.0.1", row),
+                           makeReadResponse("127.0.0.2", row),
+                           makeReadResponse("127.0.0.3", row));
+     }
+ 
+     @Test(expected = DigestMismatchException.class)
+     public void testDigestMismatch() throws DigestMismatchException, UnknownHostException
+     {
+         ByteBuffer key = bytes("key");
+         ColumnFamily cf1 = ArrayBackedSortedColumns.factory.create(KEYSPACE, TABLE);
+         cf1.addColumn(column("c1", "v1", 0));
+         Row row1 = new Row(key, cf1);
+ 
+         ColumnFamily cf2 = ArrayBackedSortedColumns.factory.create(KEYSPACE, TABLE);
+         cf2.addColumn(column("c1", "v2", 1));
+         Row row2 = new Row(key, cf2);
+ 
 -        testReadResponses(new RowDigestResolver(KEYSPACE, key),
++        testReadResponses(new RowDigestResolver(KEYSPACE, key, MAX_RESPONSE_COUNT),
+                           row1,
+                           makeReadResponse("127.0.0.1", row1),
+                           makeReadResponse("127.0.0.2", row2),
+                           makeReadResponse("127.0.0.3", row1));
+     }
+ 
+     @Test
+     public void testMultipleThreads_RowDigestResolver() throws DigestMismatchException, UnknownHostException, InterruptedException
+     {
+         ByteBuffer key = bytes("key");
+         ColumnFamily cf = ArrayBackedSortedColumns.factory.create(KEYSPACE, TABLE);
+         cf.addColumn(column("c1", "v1", 0));
+         Row row = new Row(key, cf);
+ 
 -        testReadResponsesMT(new RowDigestResolver(KEYSPACE, key),
++        testReadResponsesMT(new RowDigestResolver(KEYSPACE, key, MAX_RESPONSE_COUNT),
+                             row,
+                             makeReadResponse("127.0.0.1", row),
+                             makeReadResponse("127.0.0.2", row),
+                             makeReadResponse("127.0.0.3", row));
+     }
+ 
+     @Test
+     public void testSingleMessage_RowDataResolver() throws DigestMismatchException, UnknownHostException
+     {
+         ByteBuffer key = bytes("key");
+         ColumnFamily cf = ArrayBackedSortedColumns.factory.create(KEYSPACE, TABLE);
+         cf.addColumn(column("c1", "v1", 0));
+         Row row = new Row(key, cf);
+ 
+         testReadResponses(new RowDataResolver(KEYSPACE,
+                                               key,
+                                               new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, false, 10),
 -                                              System.currentTimeMillis()),
++                                              System.currentTimeMillis(),
++                                              MAX_RESPONSE_COUNT),
+                           row,
+                           makeReadResponse("127.0.0.1", row));
+     }
+ 
+     @Test
+     public void testMultipleMessages_RowDataResolver() throws DigestMismatchException, UnknownHostException
+     {
+         ByteBuffer key = bytes("key");
+         ColumnFamily cf = ArrayBackedSortedColumns.factory.create(KEYSPACE, TABLE);
+         cf.addColumn(column("c1", "v1", 0));
+         Row row = new Row(key, cf);
+ 
+         testReadResponses(new RowDataResolver(KEYSPACE,
+                                               key,
+                                               new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, false, 10),
 -                                              System.currentTimeMillis()),
++                                              System.currentTimeMillis(),
++                                              MAX_RESPONSE_COUNT),
+                           row,
+                           makeReadResponse("127.0.0.1", row),
+                           makeReadResponse("127.0.0.2", row),
+                           makeReadResponse("127.0.0.3", row));
+     }
+ 
+     @Test
+     public void testMultipleThreads_RowDataResolver() throws DigestMismatchException, UnknownHostException, InterruptedException
+     {
+         ByteBuffer key = bytes("key");
+         ColumnFamily cf = ArrayBackedSortedColumns.factory.create(KEYSPACE, TABLE);
+         cf.addColumn(column("c1", "v1", 0));
+         Row row = new Row(key, cf);
+ 
+         testReadResponsesMT(new RowDataResolver(KEYSPACE,
+                                                 key,
+                                                 new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, false, 10),
 -                                                System.currentTimeMillis()),
++                                                System.currentTimeMillis(),
++                                                MAX_RESPONSE_COUNT),
+                             row,
+                             makeReadResponse("127.0.0.1", row),
+                             makeReadResponse("127.0.0.2", row),
+                             makeReadResponse("127.0.0.3", row));
+     }
+ 
+     @Test
+     public void testSingleMessage_RangeSliceResolver() throws DigestMismatchException, UnknownHostException
+     {
+         ByteBuffer key = bytes("key");
+         ColumnFamily cf = ArrayBackedSortedColumns.factory.create(KEYSPACE, TABLE);
+         cf.addColumn(column("c1", "v1", 0));
+ 
+         Row[] expected = new Row[2];
+         for (int i = 0; i < expected.length; i++)
+             expected[i] = new Row(key, cf);
+ 
+         MessageIn<RangeSliceReply> message = makeRangeSlice("127.0.0.1", expected);
+ 
+         RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(KEYSPACE, System.currentTimeMillis());
+         resolver.setSources(Collections.singletonList(message.from));
+ 
+         testRangeSlices(resolver, expected, message);
+     }
+ 
+     @Test
+     public void testMultipleMessages_RangeSliceResolver() throws DigestMismatchException, UnknownHostException
+     {
+         ByteBuffer key = bytes("key");
+         ColumnFamily cf = ArrayBackedSortedColumns.factory.create(KEYSPACE, TABLE);
+         cf.addColumn(column("c1", "v1", 0));
+ 
+         Row[] expected = new Row[2];
+         for (int i = 0; i < expected.length; i++)
+             expected[i] = new Row(key, cf);
+ 
+         List<InetAddress> sources = new ArrayList<>(3);
+         sources.add(InetAddress.getByName("127.0.0.1"));
+         sources.add(InetAddress.getByName("127.0.0.2"));
+         sources.add(InetAddress.getByName("127.0.0.3"));
+ 
+         RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(KEYSPACE, System.currentTimeMillis());
+         resolver.setSources(sources);
+ 
+         testRangeSlices(resolver,
+                         expected,
+                         makeRangeSlice("127.0.0.1", expected),
+                         makeRangeSlice("127.0.0.2", expected),
+                         makeRangeSlice("127.0.0.3", expected));
+     }
+ 
+     private void testReadResponses(AbstractRowResolver resolver, Row expected, MessageIn<ReadResponse> ... messages) throws DigestMismatchException
+     {
+         for (MessageIn<ReadResponse> message : messages)
+         {
+             resolver.preprocess(message);
+ 
+             Row row = resolver.getData();
+             checkSame(expected, row);
+ 
+             row = resolver.resolve();
+             checkSame(expected, row);
+         }
+     }
+ 
+     private void testReadResponsesMT(final AbstractRowResolver resolver,
+                                      final Row expected,
+                                      final MessageIn<ReadResponse> ... messages) throws InterruptedException
+     {
+         for (MessageIn<ReadResponse> message : messages)
+             resolver.preprocess(message);
+ 
+         final int threadCount = 45;
+         ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
+         final CountDownLatch finished = new CountDownLatch(threadCount);
+ 
+         for (int i = 0; i < threadCount; i++)
+         {
+             executorService.submit(new Runnable()
+             {
+                 public void run()
+                 {
+                     try
+                     {
+                         Row row = resolver.getData();
+                         checkSame(expected, row);
+ 
+                         row = resolver.resolve();
+                         checkSame(expected, row);
+                     }
+                     catch (DigestMismatchException ex)
+                     {
+                         fail(ex.getMessage());
+                     }
+                     finally
+                     {
+                         finished.countDown();
+                     }
+                 }
+             });
+         }
+ 
+         finished.await();
+         assertEquals(0, executorService.shutdownNow().size());
+ 
+     }
+ 
+     private void testRangeSlices(RangeSliceResponseResolver resolver, Row[] expected, MessageIn<RangeSliceReply> ... messages)
+     {
+         for (MessageIn<RangeSliceReply> message : messages)
+         {
+             resolver.preprocess(message);
+ 
+             List<Row> rows = resolver.getData();
+             assertNotNull(rows);
+ 
+             for (int i = 0; i < expected.length; i++)
+                 checkSame(expected[i], rows.get(i));
+ 
+             Iterator<Row> rowIt = resolver.resolve().iterator();
+             assertNotNull(rowIt);
+ 
+             for (Row r : expected)
+                 checkSame(r, rowIt.next());
+         }
+     }
+ 
+     private MessageIn<ReadResponse> makeReadResponse(String address, Row row) throws UnknownHostException
+     {
+         return MessageIn.create(InetAddress.getByName(address),
+                                 new ReadResponse(row),
+                                 Collections.<String, byte[]>emptyMap(),
+                                 MessagingService.Verb.INTERNAL_RESPONSE,
+                                 MessagingService.current_version);
+     }
+ 
+     private MessageIn<RangeSliceReply> makeRangeSlice(String address, Row ... rows) throws UnknownHostException
+     {
+         return MessageIn.create(InetAddress.getByName(address),
+                                 new RangeSliceReply(Arrays.asList(rows)),
+                                 Collections.<String, byte[]>emptyMap(),
+                                 MessagingService.Verb.INTERNAL_RESPONSE,
+                                 MessagingService.current_version);
+     }
+ 
+     private void checkSame(Row r1, Row r2)
+     {
+         assertEquals(r1.key, r2.key);
+         assertEquals(r1.cf, r2.cf);
+     }
+ }

Reply via email to