Repository: cassandra Updated Branches: refs/heads/trunk 32358d646 -> 4c3ba4f30
Workaround for netty issue causing corrupted data to come off the wire patch by tjake, test by Johan Bjork; reviewed by belliottsmith for (CASSANDRA-7695) Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b3ada2bc Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b3ada2bc Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b3ada2bc Branch: refs/heads/trunk Commit: b3ada2bc453e84a470b070b56e35a13e0913662b Parents: d8eff03 Author: Jake Luciani <j...@apache.org> Authored: Thu Aug 7 14:35:28 2014 -0400 Committer: Jake Luciani <j...@apache.org> Committed: Thu Aug 7 14:35:28 2014 -0400 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/service/CassandraDaemon.java | 9 + .../apache/cassandra/cql3/CorruptionTest.java | 195 +++++++++++++++++++ 3 files changed, 205 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/b3ada2bc/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 26b39e0..aef0c40 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.0-final + * workaround for netty issue causing corrupted data off the wire (CASSANDRA-7695) * cqlsh DESC CLUSTER fails retrieving ring information (CASSANDRA-7687) * Fix binding null values inside UDT (CASSANDRA-7685) * Fix UDT field selection with empty fields (CASSANDRA-7670) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b3ada2bc/src/java/org/apache/cassandra/service/CassandraDaemon.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index 7c85f81..5c88cb1 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -37,6 +37,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.addthis.metrics.reporter.config.ReporterConfig; +import io.netty.util.internal.PlatformDependent; import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; @@ -69,6 +70,14 @@ import org.apache.cassandra.utils.Pair; */ public class CassandraDaemon { + + //Workaround for netty issue + static + { + System.setProperty("io.netty.noUnsafe","true"); + assert !PlatformDependent.hasUnsafe(); + } + public static final String MBEAN_NAME = "org.apache.cassandra.db:type=NativeAccess"; // Have a dedicated thread to call exit to avoid deadlock in the case where the thread that wants to invoke exit http://git-wip-us.apache.org/repos/asf/cassandra/blob/b3ada2bc/test/long/org/apache/cassandra/cql3/CorruptionTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/cql3/CorruptionTest.java b/test/long/org/apache/cassandra/cql3/CorruptionTest.java new file mode 100644 index 0000000..1a42112 --- /dev/null +++ b/test/long/org/apache/cassandra/cql3/CorruptionTest.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.cql3; + + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.datastax.driver.core.*; +import com.datastax.driver.core.policies.LoggingRetryPolicy; +import com.datastax.driver.core.policies.Policies; +import com.datastax.driver.core.utils.Bytes; +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.service.EmbeddedCassandraService; + +public class CorruptionTest extends SchemaLoader +{ + + private static EmbeddedCassandraService cassandra; + private static Cluster cluster; + private static Session session; + + private static PreparedStatement getStatement; + private static PreparedStatement putStatement; + private static String KEYSPACE = "cass_test"; + private static final String TABLE="put_test"; + private static final String KEY = "SingleFailingKey"; + private static String VALUE; + private final int THREADPOOL_SIZE=40; + + @BeforeClass() + public static void setup() throws ConfigurationException, IOException + { + Schema.instance.clear(); + + cassandra = new EmbeddedCassandraService(); + cassandra.start(); + + cluster = Cluster.builder().addContactPoint("127.0.0.1") + .withRetryPolicy(new LoggingRetryPolicy(Policies.defaultRetryPolicy())) + .withPort(DatabaseDescriptor.getNativeTransportPort()).build(); + session = cluster.connect(); + + session.execute("CREATE KEYSPACE IF NOT EXISTS " + KEYSPACE +" WITH replication " + + "= {'class':'SimpleStrategy', 'replication_factor':1};"); + session.execute("USE " + KEYSPACE); + session.execute("CREATE TABLE IF NOT EXISTS " + TABLE + " (" + + "key blob," + + "value blob," + + "PRIMARY KEY (key));"); + + + // Prepared statements + getStatement = session.prepare("SELECT value FROM " + TABLE + " WHERE key = ?;"); + getStatement.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM); + + putStatement = session.prepare("INSERT INTO " + TABLE + " (key, value) VALUES (?, ?);"); + putStatement.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM); + + + + StringBuilder s = new StringBuilder(); + char a='a'; + char z='z'; + for (int i = 0; i < 500*1024; i++) + { + char x = (char)((i%((z-a)+1))+a); + if (x == 'a') + { + x = '\n'; + } + s.append(x); + } + VALUE = s.toString(); + } + + @Test + public void runCorruptionTest() + { + + final CountDownLatch failure = new CountDownLatch(1); + + + ExecutorService executor = Executors.newFixedThreadPool(THREADPOOL_SIZE); + for (int i = 0; i < THREADPOOL_SIZE; i++) + { + executor.execute(new Runnable() + { + @Override + public void run() + { + for (int i = 0; i < 100000; i++) + { + put(KEY.getBytes(), VALUE.getBytes()); + byte[] res = get(KEY.getBytes()); + //since we're flooding the server we might get some timeouts, that's not + //relevant for this test + if (res == null) + continue; + + if (!Arrays.equals(VALUE.getBytes(), res)) + { + /*try + { + dumpKeys(VALUE.getBytes(), res); + } + catch (IOException e) + { + e.printStackTrace(); + }*/ + failure.countDown(); + } + } + } + + private void dumpKeys(byte[] putdata, byte[] getdata) throws IOException { + String basename = "bad-data-tid" + Thread.currentThread().getId(); + File put = new File(basename+"-put"); + File get = new File(basename+"-get"); + try(FileWriter pw = new FileWriter(put)) { + pw.write(new String(putdata)); + } + try(FileWriter pw = new FileWriter(get)) { + pw.write(new String(getdata)); + } + } + }); + } + + try + { + assert!failure.await(2, TimeUnit.MINUTES); + } + catch (InterruptedException e) + { + + } + executor.shutdownNow(); + + } + + public static byte[] get(byte[] key) + { + BoundStatement boundStatement = new BoundStatement(getStatement); + boundStatement.setBytes(0, ByteBuffer.wrap(key)); + + final com.datastax.driver.core.ResultSet resultSet = session.execute(boundStatement); + final Row row = resultSet.one(); + if (row != null) + { + final ByteBuffer byteBuf = row.getBytes("value"); + return Bytes.getArray(byteBuf); + } + + return null; + } + + public static void put(byte[] key, byte[] value) + { + BoundStatement boundStatement = new BoundStatement(putStatement); + boundStatement.setBytes(0, ByteBuffer.wrap(key)); + boundStatement.setBytes(1, ByteBuffer.wrap(value)); + + session.execute(boundStatement); + } +}