This is an automated email from the ASF dual-hosted git repository. ifesdjeen pushed a commit to branch cassandra-3.0 in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit c2cfebf44f93af061131d73e4dcbf2a9ff582fe8 Merge: f2c9b4c 1f72cc6 Author: Alex Petrov <oleksandr.pet...@gmail.com> AuthorDate: Fri Mar 27 19:04:38 2020 +0100 Merge branch 'cassandra-2.2' into cassandra-3.0 build.xml | 5 + src/java/org/apache/cassandra/net/MessageOut.java | 1 + .../org/apache/cassandra/net/MessagingService.java | 2 +- src/java/org/apache/cassandra/tools/NodeProbe.java | 11 + src/java/org/apache/cassandra/tools/NodeTool.java | 4 +- .../org/apache/cassandra/distributed/Cluster.java | 32 +- .../cassandra/distributed/UpgradeableCluster.java | 32 +- .../apache/cassandra/distributed/api/Feature.java | 24 -- .../cassandra/distributed/api/ICoordinator.java | 36 -- .../cassandra/distributed/api/IInstance.java | 57 ---- .../cassandra/distributed/api/IInstanceConfig.java | 60 ---- .../distributed/api/IIsolatedExecutor.java | 126 ------- .../apache/cassandra/distributed/api/IListen.java | 28 -- .../apache/cassandra/distributed/api/IMessage.java | 37 -- .../cassandra/distributed/api/IMessageFilters.java | 56 ---- .../distributed/impl/AbstractCluster.java | 373 +++++---------------- .../cassandra/distributed/impl/Coordinator.java | 56 ++-- .../impl/DelegatingInvokableInstance.java | 11 +- .../distributed/impl/DistributedTestSnitch.java | 31 +- .../distributed/impl/IInvokableInstance.java | 67 ---- .../distributed/impl/IUpgradeableInstance.java | 1 + .../cassandra/distributed/impl/Instance.java | 272 ++++++++++++--- .../distributed/impl/InstanceClassLoader.java | 130 ------- .../cassandra/distributed/impl/InstanceConfig.java | 98 +++--- .../cassandra/distributed/impl/InstanceKiller.java | 50 +++ .../distributed/impl/IsolatedExecutor.java | 4 + .../apache/cassandra/distributed/impl/Listen.java | 1 - .../cassandra/distributed/impl/MessageFilters.java | 168 ---------- .../impl/{Message.java => MessageImpl.java} | 27 +- .../distributed/impl/NetworkTopology.java | 137 -------- .../apache/cassandra/distributed/impl/RowUtil.java | 1 + .../cassandra/distributed/impl/TracingUtil.java | 2 +- .../cassandra/distributed/impl/Versions.java | 190 ----------- .../mock/nodetool/InternalNodeProbe.java | 33 +- .../mock/nodetool/InternalNodeProbeFactory.java | 11 +- .../cassandra/distributed/test/BootstrapTest.java | 50 +-- .../test/DistributedReadWritePathTest.java | 300 ----------------- .../distributed/test/DistributedTestBase.java | 166 --------- .../distributed/test/GossipSettlesTest.java | 16 +- .../cassandra/distributed/test/GossipTest.java | 4 +- .../distributed/test/MessageFiltersTest.java | 85 +++-- .../distributed/test/MessageForwardingTest.java | 10 +- .../distributed/test/NativeProtocolTest.java | 49 +-- .../distributed/test/NetworkTopologyTest.java | 40 ++- .../cassandra/distributed/test/NodeToolTest.java | 2 +- .../distributed/test/ResourceLeakTest.java | 13 +- .../SharedClusterTestBase.java} | 38 ++- .../distributed/test/SimpleReadWriteTest.java | 276 +++++++++++++++ .../cassandra/distributed/test/TestBaseImpl.java | 47 +++ .../upgrade/CompactStorage2to3UpgradeTest.java | 33 +- .../upgrade/MixedModeReadRepairTest.java | 8 +- .../cassandra/distributed/upgrade/UpgradeTest.java | 57 ++-- .../distributed/upgrade/UpgradeTestBase.java | 21 +- .../apache/cassandra/LogbackStatusListener.java | 2 +- 54 files changed, 1170 insertions(+), 2221 deletions(-) diff --cc build.xml index a40cb7d,ed9c1a2..2527883 --- a/build.xml +++ b/build.xml @@@ -520,7 -534,19 +524,8 @@@ artifactId="cassandra-parent" version="${version}"/> <dependency groupId="junit" artifactId="junit"/> + <dependency groupId="org.mockito" artifactId="mockito-core" /> - <dependency groupId="org.apache.pig" artifactId="pig"> - <exclusion groupId="xmlenc" artifactId="xmlenc"/> - <exclusion groupId="tomcat" artifactId="jasper-runtime"/> - <exclusion groupId="tomcat" artifactId="jasper-compiler"/> - <exclusion groupId="org.eclipse.jdt" artifactId="core"/> - <exclusion groupId="net.sf.kosmosfs" artifactId="kfs"/> - <exclusion groupId="hsqldb" artifactId="hsqldb"/> - <exclusion groupId="antlr" artifactId="antlr"/> - </dependency> - <!-- TODO CASSANDRA-9543 <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" classifier="shaded"/> - --> <dependency groupId="org.eclipse.jdt.core.compiler" artifactId="ecj"/> <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core"/> <dependency groupId="org.openjdk.jmh" artifactId="jmh-core"/> diff --cc src/java/org/apache/cassandra/net/MessageOut.java index ce190cb,1e291c2..09ff63b --- a/src/java/org/apache/cassandra/net/MessageOut.java +++ b/src/java/org/apache/cassandra/net/MessageOut.java @@@ -30,6 -30,6 +30,7 @@@ import org.apache.cassandra.concurrent. import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.IVersionedSerializer; ++import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.FBUtilities; diff --cc test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java index 55dbee1,05c8af8..82c06da --- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java @@@ -527,257 -553,19 +553,19 @@@ public abstract class AbstractCluster< } } - protected interface Factory<I extends IInstance, C extends AbstractCluster<I>> - { - C newCluster(File root, Versions.Version version, List<InstanceConfig> configs, ClassLoader sharedClassLoader); - } - - public static class Builder<I extends IInstance, C extends AbstractCluster<I>> + private void uncaughtExceptions(Thread thread, Throwable error) { - private final Factory<I, C> factory; - private int nodeCount; - private int subnet; - private Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology; - private TokenSupplier tokenSupplier; - private File root; - private Versions.Version version = Versions.CURRENT; - private Consumer<InstanceConfig> configUpdater; - - public Builder(Factory<I, C> factory) - { - this.factory = factory; - } - - public Builder<I, C> withTokenSupplier(TokenSupplier tokenSupplier) - { - this.tokenSupplier = tokenSupplier; - return this; - } - - public Builder<I, C> withSubnet(int subnet) - { - this.subnet = subnet; - return this; - } - - public Builder<I, C> withNodes(int nodeCount) - { - this.nodeCount = nodeCount; - return this; - } - - public Builder<I, C> withDCs(int dcCount) - { - return withRacks(dcCount, 1); - } - - public Builder<I, C> withRacks(int dcCount, int racksPerDC) - { - if (nodeCount == 0) - throw new IllegalStateException("Node count will be calculated. Do not supply total node count in the builder"); - - int totalRacks = dcCount * racksPerDC; - int nodesPerRack = (nodeCount + totalRacks - 1) / totalRacks; // round up to next integer - return withRacks(dcCount, racksPerDC, nodesPerRack); - } - - public Builder<I, C> withRacks(int dcCount, int racksPerDC, int nodesPerRack) - { - if (nodeIdTopology != null) - throw new IllegalStateException("Network topology already created. Call withDCs/withRacks once or before withDC/withRack calls"); - - nodeIdTopology = new HashMap<>(); - int nodeId = 1; - for (int dc = 1; dc <= dcCount; dc++) - { - for (int rack = 1; rack <= racksPerDC; rack++) - { - for (int rackNodeIdx = 0; rackNodeIdx < nodesPerRack; rackNodeIdx++) - nodeIdTopology.put(nodeId++, NetworkTopology.dcAndRack(dcName(dc), rackName(rack))); - } - } - // adjust the node count to match the allocatation - final int adjustedNodeCount = dcCount * racksPerDC * nodesPerRack; - if (adjustedNodeCount != nodeCount) - { - assert adjustedNodeCount > nodeCount : "withRacks should only ever increase the node count"; - logger.info("Network topology of {} DCs with {} racks per DC and {} nodes per rack required increasing total nodes to {}", - dcCount, racksPerDC, nodesPerRack, adjustedNodeCount); - nodeCount = adjustedNodeCount; - } - return this; - } - - public Builder<I, C> withDC(String dcName, int nodeCount) + if (!(thread.getContextClassLoader() instanceof InstanceClassLoader)) { - return withRack(dcName, rackName(1), nodeCount); - } - - public Builder<I, C> withRack(String dcName, String rackName, int nodesInRack) - { - if (nodeIdTopology == null) - { - if (nodeCount > 0) - throw new IllegalStateException("Node count must not be explicitly set, or allocated using withDCs/withRacks"); - - nodeIdTopology = new HashMap<>(); - } - for (int nodeId = nodeCount + 1; nodeId <= nodeCount + nodesInRack; nodeId++) - nodeIdTopology.put(nodeId, NetworkTopology.dcAndRack(dcName, rackName)); - - nodeCount += nodesInRack; - return this; - } - - // Map of node ids to dc and rack - must be contiguous with an entry nodeId 1 to nodeCount - public Builder<I, C> withNodeIdTopology(Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology) - { - if (nodeIdTopology.isEmpty()) - throw new IllegalStateException("Topology is empty. It must have an entry for every nodeId."); - - IntStream.rangeClosed(1, nodeIdTopology.size()).forEach(nodeId -> { - if (nodeIdTopology.get(nodeId) == null) - throw new IllegalStateException("Topology is missing entry for nodeId " + nodeId); - }); - - if (nodeCount != nodeIdTopology.size()) - { - nodeCount = nodeIdTopology.size(); - logger.info("Adjusting node count to {} for supplied network topology", nodeCount); - } - - this.nodeIdTopology = new HashMap<>(nodeIdTopology); - - return this; - } - - public Builder<I, C> withRoot(File root) - { - this.root = root; - return this; - } - - public Builder<I, C> withVersion(Versions.Version version) - { - this.version = version; - return this; - } - - public Builder<I, C> withConfig(Consumer<InstanceConfig> updater) - { - this.configUpdater = updater; - return this; - } - - public C createWithoutStarting() throws IOException - { - if (root == null) - root = Files.createTempDirectory("dtests").toFile(); - - if (nodeCount <= 0) - throw new IllegalStateException("Cluster must have at least one node"); - - if (nodeIdTopology == null) - { - nodeIdTopology = IntStream.rangeClosed(1, nodeCount).boxed() - .collect(Collectors.toMap(nodeId -> nodeId, - nodeId -> NetworkTopology.dcAndRack(dcName(0), rackName(0)))); - } - - root.mkdirs(); - setupLogging(root); - - ClassLoader sharedClassLoader = Thread.currentThread().getContextClassLoader(); - - List<InstanceConfig> configs = new ArrayList<>(); - - if (tokenSupplier == null) - tokenSupplier = evenlyDistributedTokens(nodeCount); - - for (int i = 0; i < nodeCount; ++i) - { - int nodeNum = i + 1; - configs.add(createInstanceConfig(nodeNum)); - } - - return factory.newCluster(root, version, configs, sharedClassLoader); - } - - public InstanceConfig newInstanceConfig(C cluster) - { - return createInstanceConfig(cluster.size() + 1); - } - - private InstanceConfig createInstanceConfig(int nodeNum) - { - String ipPrefix = "127.0." + subnet + "."; - String seedIp = ipPrefix + "1"; - String ipAddress = ipPrefix + nodeNum; - long token = tokenSupplier.token(nodeNum); - - NetworkTopology topology = NetworkTopology.build(ipPrefix, 7012, nodeIdTopology); - - InstanceConfig config = InstanceConfig.generate(nodeNum, ipAddress, topology, root, String.valueOf(token), seedIp); - if (configUpdater != null) - configUpdater.accept(config); - - return config; - } - - public C start() throws IOException - { - C cluster = createWithoutStarting(); - cluster.startup(); - return cluster; - } - } - - public static TokenSupplier evenlyDistributedTokens(int numNodes) - { - long increment = (Long.MAX_VALUE / numNodes) * 2; - return (int nodeId) -> { - assert nodeId <= numNodes : String.format("Can not allocate a token for a node %s, since only %s nodes are allowed by the token allocation strategy", - nodeId, numNodes); - return Long.MIN_VALUE + 1 + nodeId * increment; - }; - } - - public static interface TokenSupplier - { - public long token(int nodeId); - } - - static String dcName(int index) - { - return "datacenter" + index; - } - - static String rackName(int index) - { - return "rack" + index; - } - - private static void setupLogging(File root) - { - try - { - String testConfPath = "test/conf/logback-dtest.xml"; - Path logConfPath = Paths.get(root.getPath(), "/logback-dtest.xml"); - - if (!logConfPath.toFile().exists()) - { - Files.copy(new File(testConfPath).toPath(), - logConfPath); - } - - System.setProperty("logback.configurationFile", "file://" + logConfPath); - } - catch (IOException e) - { - throw new RuntimeException(e); + Thread.UncaughtExceptionHandler handler = previousHandler; + if (null != handler) + handler.uncaughtException(thread, error); + return; } + InstanceClassLoader cl = (InstanceClassLoader) thread.getContextClassLoader(); + get(cl.getInstanceId()).uncaughtException(thread, error); } -- ++ @Override public void close() { diff --cc test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java index 69c2e44,91a2aaf..e2ebef0 --- a/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java @@@ -31,11 -31,15 +31,13 @@@ import org.apache.cassandra.cql3.QueryO import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.UntypedResultSet; import org.apache.cassandra.cql3.statements.SelectStatement; - import org.apache.cassandra.db.ConsistencyLevel; + import org.apache.cassandra.distributed.api.ConsistencyLevel; import org.apache.cassandra.distributed.api.ICoordinator; + import org.apache.cassandra.distributed.api.IInstance; + import org.apache.cassandra.distributed.api.QueryResult; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.QueryState; -import org.apache.cassandra.service.pager.Pageable; import org.apache.cassandra.service.pager.QueryPager; -import org.apache.cassandra.service.pager.QueryPagers; import org.apache.cassandra.transport.Server; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.transport.messages.ResultMessage; @@@ -71,19 -75,23 +73,23 @@@ public class Coordinator implements ICo }).call(); } - private Object[][] executeInternal(String query, Enum<?> consistencyLevelOrigin, Object[] boundValues) + protected org.apache.cassandra.db.ConsistencyLevel toCassandraCL(ConsistencyLevel cl) + { + return org.apache.cassandra.db.ConsistencyLevel.fromCode(cl.ordinal()); + } + + private QueryResult executeInternal(String query, ConsistencyLevel consistencyLevelOrigin, Object[] boundValues) { - ConsistencyLevel consistencyLevel = ConsistencyLevel.valueOf(consistencyLevelOrigin.name()); - ClientState clientState = ClientState.forInternalCalls(); + ClientState clientState = makeFakeClientState(); CQLStatement prepared = QueryProcessor.getStatement(query, clientState).statement; List<ByteBuffer> boundBBValues = new ArrayList<>(); + ConsistencyLevel consistencyLevel = ConsistencyLevel.valueOf(consistencyLevelOrigin.name()); for (Object boundValue : boundValues) - { boundBBValues.add(ByteBufferUtil.objectToBytes(boundValue)); - } + prepared.validate(QueryState.forInternalCalls().getClientState()); ResultMessage res = prepared.execute(QueryState.forInternalCalls(), - QueryOptions.create(consistencyLevel, + QueryOptions.create(toCassandraCL(consistencyLevel), boundBBValues, false, Integer.MAX_VALUE, @@@ -109,38 -129,40 +127,38 @@@ throw new IllegalArgumentException("Page size should be strictly positive but was " + pageSize); return instance.sync(() -> { - ConsistencyLevel consistencyLevel = ConsistencyLevel.valueOf(consistencyLevelOrigin.name()); + ClientState clientState = makeFakeClientState(); + ConsistencyLevel consistencyLevel = ConsistencyLevel.valueOf(consistencyLevelOrigin.name()); - CQLStatement prepared = QueryProcessor.getStatement(query, ClientState.forInternalCalls()).statement; + CQLStatement prepared = QueryProcessor.getStatement(query, clientState).statement; List<ByteBuffer> boundBBValues = new ArrayList<>(); for (Object boundValue : boundValues) { boundBBValues.add(ByteBufferUtil.objectToBytes(boundValue)); } -- prepared.validate(QueryState.forInternalCalls().getClientState()); ++ prepared.validate(clientState); assert prepared instanceof SelectStatement : "Only SELECT statements can be executed with paging"; - ClientState clientState = QueryState.forInternalCalls().getClientState(); SelectStatement selectStatement = (SelectStatement) prepared; - QueryOptions queryOptions = QueryOptions.create(toCassandraCL(consistencyLevel), - boundBBValues, - false, - pageSize, - null, - null, - Server.CURRENT_VERSION); - Pageable pageable = selectStatement.getPageableCommand(queryOptions); + - QueryPager pager = selectStatement.getQuery(QueryOptions.create(consistencyLevel, ++ QueryPager pager = selectStatement.getQuery(QueryOptions.create(toCassandraCL(consistencyLevel), + boundBBValues, + false, + pageSize, + null, + null, + Server.CURRENT_VERSION), + FBUtilities.nowInSeconds()) + .getPager(null, Server.CURRENT_VERSION); // Usually pager fetches a single page (see SelectStatement#execute). We need to iterate over all // of the results lazily. - QueryPager pager = QueryPagers.pager(pageable, toCassandraCL(consistencyLevel), clientState, null); - Iterator<Object[]> iter = RowUtil.toObjects(selectStatement.getResultMetadata().names, - UntypedResultSet.create(selectStatement, - pager, - pageSize).iterator()); - - // We have to make sure iterator is not running on main thread. return new Iterator<Object[]>() { - Iterator<Object[]> iter = RowUtil.toObjects(UntypedResultSet.create(selectStatement, consistencyLevel, clientState, pager, pageSize)); ++ Iterator<Object[]> iter = RowUtil.toObjects(UntypedResultSet.create(selectStatement, toCassandraCL(consistencyLevel), clientState, pager, pageSize)); + public boolean hasNext() { + // We have to make sure iterator is not running on main thread. return instance.sync(() -> iter.hasNext()).call(); } diff --cc test/distributed/org/apache/cassandra/distributed/impl/Instance.java index 5a4dcf4,1c19bca..e8c45d8 --- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java @@@ -34,7 -39,10 +37,11 @@@ import java.util.concurrent.TimeoutExce import java.util.function.BiConsumer; import java.util.function.Function; + import javax.management.ListenerNotFoundException; + import javax.management.Notification; + import javax.management.NotificationListener; + +import org.apache.cassandra.batchlog.BatchlogManager; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.concurrent.SharedExecutorPool; import org.apache.cassandra.concurrent.StageManager; @@@ -63,13 -77,11 +74,15 @@@ import org.apache.cassandra.distributed import org.apache.cassandra.gms.ApplicationState; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.gms.VersionedValue; +import org.apache.cassandra.hints.HintsService; +import org.apache.cassandra.index.SecondaryIndexManager; + import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.sstable.IndexSummaryManager; import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataOutputBuffer; - import org.apache.cassandra.locator.InetAddressAndPort; ++import org.apache.cassandra.io.util.DataOutputPlus; + import org.apache.cassandra.net.CompactEndpointSerializationHelper; import org.apache.cassandra.net.IMessageSink; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessageOut; @@@ -247,7 -283,57 +286,57 @@@ public class Instance extends IsolatedE long timestamp = System.currentTimeMillis(); out.writeInt((int) timestamp); messageOut.serialize(out, version); - return new Message(messageOut.verb.ordinal(), out.toByteArray(), id, version, from); + return new MessageImpl(messageOut.verb.ordinal(), out.toByteArray(), id, version, from); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + + public static IMessage serializeMessage(MessageIn<?> messageIn, int id, InetSocketAddress from, InetSocketAddress to) + { + try (DataOutputBuffer out = new DataOutputBuffer(1024)) + { + // Serialize header + int version = MessagingService.instance().getVersion(to.getAddress()); + + out.writeInt(MessagingService.PROTOCOL_MAGIC); + out.writeInt(id); + long timestamp = System.currentTimeMillis(); + out.writeInt((int) timestamp); + + // Serialize the message itself + IVersionedSerializer serializer = MessagingService.instance().verbSerializers.get(messageIn.verb); + CompactEndpointSerializationHelper.serialize(from.getAddress(), out); + - out.writeInt(messageIn.verb.ordinal()); ++ out.writeInt(MessagingService.Verb.convertForMessagingServiceVersion(messageIn.verb, version).ordinal()); + out.writeInt(messageIn.parameters.size()); + for (Map.Entry<String, byte[]> entry : messageIn.parameters.entrySet()) + { + out.writeUTF(entry.getKey()); + out.writeInt(entry.getValue().length); + out.write(entry.getValue()); + } + + if (messageIn.payload != null && serializer != MessagingService.CallbackDeterminedSerializer.instance) + { + try (DataOutputBuffer dob = new DataOutputBuffer()) + { + serializer.serialize(messageIn.payload, dob, version); + + int size = dob.getLength(); + out.writeInt(size); + out.write(dob.getData(), 0, size); + } + } + else + { + out.writeInt(0); + } + + + return new MessageImpl(messageIn.verb.ordinal(), out.toByteArray(), id, version, from); } catch (IOException e) { @@@ -332,6 -420,6 +423,8 @@@ int partial = input.readInt(); return Pair.create(MessageIn.read(input, version, id), partial); ++ //long currentTime = ApproximateTime.currentTimeMillis(); ++ //return MessageIn.read(input, version, id, MessageIn.readConstructionTime(imessage.from().getAddress(), input, currentTime)); } catch (IOException e) { @@@ -536,19 -623,15 +630,19 @@@ for (int i = 0; i < tokens.size(); i++) { - InetAddressAndPort ep = hosts.get(i); + InetSocketAddress ep = hosts.get(i); - Gossiper.instance.initializeNodeUnsafe(ep.getAddress(), hostIds.get(i), 1); - Gossiper.instance.injectApplicationState(ep.getAddress(), - ApplicationState.TOKENS, - new VersionedValue.VersionedValueFactory(partitioner).tokens(Collections.singleton(tokens.get(i)))); - storageService.onChange(ep.getAddress(), - ApplicationState.STATUS, - new VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(tokens.get(i)))); - Gossiper.instance.realMarkAlive(ep.getAddress(), Gossiper.instance.getEndpointStateForEndpoint(ep.getAddress())); + UUID hostId = hostIds.get(i); + Token token = tokens.get(i); + Gossiper.runInGossipStageBlocking(() -> { - Gossiper.instance.initializeNodeUnsafe(ep.address, hostId, 1); - Gossiper.instance.injectApplicationState(ep.address, ++ Gossiper.instance.initializeNodeUnsafe(ep.getAddress(), hostId, 1); ++ Gossiper.instance.injectApplicationState(ep.getAddress(), + ApplicationState.TOKENS, + new VersionedValue.VersionedValueFactory(partitioner).tokens(Collections.singleton(token))); - storageService.onChange(ep.address, ++ storageService.onChange(ep.getAddress(), + ApplicationState.STATUS, + new VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(token))); - Gossiper.instance.realMarkAlive(ep.address, Gossiper.instance.getEndpointStateForEndpoint(ep.address)); ++ Gossiper.instance.realMarkAlive(ep.getAddress(), Gossiper.instance.getEndpointStateForEndpoint(ep.getAddress())); + }); int messagingVersion = cluster.get(ep).isShutdown() ? MessagingService.current_version : Math.min(MessagingService.current_version, cluster.get(ep).getMessagingVersion()); @@@ -570,11 -653,9 +664,12 @@@ return shutdown(true); } + @Override public Future<Void> shutdown(boolean graceful) { + if (!graceful) + MessagingService.instance().shutdown(false); + Future<?> future = async((ExecutorService executor) -> { Throwable error = null; diff --cc test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbe.java index b76c455,625b4aa..f3eb327 --- a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbe.java +++ b/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbe.java @@@ -23,9 -23,10 +23,11 @@@ import java.lang.management.ManagementF import java.util.Iterator; import java.util.Map; + import javax.management.ListenerNotFoundException; + import com.google.common.collect.Multimap; +import org.apache.cassandra.batchlog.BatchlogManager; import org.apache.cassandra.db.ColumnFamilyStoreMBean; import org.apache.cassandra.db.HintedHandOffManager; import org.apache.cassandra.db.Keyspace; diff --cc test/distributed/org/apache/cassandra/distributed/test/GossipTest.java index c2e9e4f,0000000..83c62c8 mode 100644,000000..100644 --- a/test/distributed/org/apache/cassandra/distributed/test/GossipTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/GossipTest.java @@@ -1,115 -1,0 +1,115 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test; + +import java.net.InetAddress; +import java.util.Collection; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; +import java.util.stream.Collectors; + +import com.google.common.collect.Iterables; +import org.junit.Assert; +import org.junit.Test; + +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.gms.ApplicationState; +import org.apache.cassandra.gms.EndpointState; +import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.FBUtilities; + +import static org.apache.cassandra.distributed.api.Feature.GOSSIP; +import static org.apache.cassandra.distributed.api.Feature.NETWORK; + - public class GossipTest extends DistributedTestBase ++public class GossipTest extends TestBaseImpl +{ + + @Test + public void nodeDownDuringMove() throws Throwable + { + int liveCount = 1; + System.setProperty("cassandra.ring_delay_ms", "5000"); // down from 30s default + System.setProperty("cassandra.consistent.rangemovement", "false"); + System.setProperty("cassandra.consistent.simultaneousmoves.allow", "true"); + try (Cluster cluster = Cluster.build(2 + liveCount) + .withConfig(config -> config.with(NETWORK).with(GOSSIP)) + .createWithoutStarting()) + { + int fail = liveCount + 1; + int late = fail + 1; + for (int i = 1 ; i <= liveCount ; ++i) + cluster.get(i).startup(); + cluster.get(fail).startup(); + Collection<String> expectTokens = cluster.get(fail).callsOnInstance(() -> + StorageService.instance.getTokenMetadata().getTokens(FBUtilities.getBroadcastAddress()) + .stream().map(Object::toString).collect(Collectors.toList()) + ).call(); + - InetAddress failAddress = cluster.get(fail).broadcastAddressAndPort().address; ++ InetAddress failAddress = cluster.get(fail).broadcastAddress().getAddress(); + // wait for NORMAL state + for (int i = 1 ; i <= liveCount ; ++i) + { + cluster.get(i).acceptsOnInstance((InetAddress endpoint) -> { + EndpointState ep; + while (null == (ep = Gossiper.instance.getEndpointStateForEndpoint(endpoint)) + || ep.getApplicationState(ApplicationState.STATUS) == null + || !ep.getApplicationState(ApplicationState.STATUS).value.startsWith("NORMAL")) + LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10L)); + }).accept(failAddress); + } + + // set ourselves to MOVING, and wait for it to propagate + cluster.get(fail).runOnInstance(() -> { + + Token token = Iterables.getFirst(StorageService.instance.getTokenMetadata().getTokens(FBUtilities.getBroadcastAddress()), null); + Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.moving(token)); + }); + + for (int i = 1 ; i <= liveCount ; ++i) + { + cluster.get(i).acceptsOnInstance((InetAddress endpoint) -> { + EndpointState ep; + while (null == (ep = Gossiper.instance.getEndpointStateForEndpoint(endpoint)) + || (ep.getApplicationState(ApplicationState.STATUS) == null + || !ep.getApplicationState(ApplicationState.STATUS).value.startsWith("MOVING"))) + LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10L)); + }).accept(failAddress); + } + + cluster.get(fail).shutdown(false).get(); + cluster.get(late).startup(); + cluster.get(late).acceptsOnInstance((InetAddress endpoint) -> { + EndpointState ep; + while (null == (ep = Gossiper.instance.getEndpointStateForEndpoint(endpoint)) + || !ep.getApplicationState(ApplicationState.STATUS).value.startsWith("MOVING")) + LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10L)); + }).accept(failAddress); + + Collection<String> tokens = cluster.get(late).appliesOnInstance((InetAddress endpoint) -> + StorageService.instance.getTokenMetadata().getTokens(failAddress) + .stream().map(Object::toString).collect(Collectors.toList()) + ).apply(failAddress); + + Assert.assertEquals(expectTokens, tokens); + } + } + +} diff --cc test/distributed/org/apache/cassandra/distributed/test/MessageFiltersTest.java index 07e7428,062f401..f4398da --- a/test/distributed/org/apache/cassandra/distributed/test/MessageFiltersTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/MessageFiltersTest.java @@@ -37,10 -37,27 +37,26 @@@ import org.apache.cassandra.distributed import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessagingService; - public class MessageFiltersTest extends DistributedTestBase + public class MessageFiltersTest extends TestBaseImpl { - @Test - public void simpleFiltersTest() throws Throwable + public void simpleInboundFiltersTest() + { + simpleFiltersTest(true); + } + + @Test + public void simpleOutboundFiltersTest() + { + simpleFiltersTest(false); + } + + private interface Permit + { + boolean test(int from, int to, IMessage msg); + } + + private static void simpleFiltersTest(boolean inbound) { int VERB1 = MessagingService.Verb.READ.ordinal(); int VERB2 = MessagingService.Verb.REQUEST_RESPONSE.ordinal(); @@@ -52,21 -69,22 +68,22 @@@ String MSG2 = "msg2"; MessageFilters filters = new MessageFilters(); - MessageFilters.Filter filter = filters.allVerbs().from(1).drop(); - Permit permit = inbound ? filters::permitInbound : filters::permitOutbound; ++ Permit permit = inbound ? (from, to, msg) -> filters.permitInbound(from, to, msg) : (from, to, msg) -> filters.permitOutbound(from, to, msg); - Assert.assertFalse(filters.permit(i1, i2, msg(VERB1, MSG1))); - Assert.assertFalse(filters.permit(i1, i2, msg(VERB2, MSG1))); - Assert.assertFalse(filters.permit(i1, i2, msg(VERB3, MSG1))); - Assert.assertTrue(filters.permit(i2, i1, msg(VERB1, MSG1))); + IMessageFilters.Filter filter = filters.allVerbs().inbound(inbound).from(1).drop(); + Assert.assertFalse(permit.test(i1, i2, msg(VERB1, MSG1))); + Assert.assertFalse(permit.test(i1, i2, msg(VERB2, MSG1))); + Assert.assertFalse(permit.test(i1, i2, msg(VERB3, MSG1))); + Assert.assertTrue(permit.test(i2, i1, msg(VERB1, MSG1))); filter.off(); - Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1))); + Assert.assertTrue(permit.test(i1, i2, msg(VERB1, MSG1))); filters.reset(); - filters.verbs(VERB1).from(1).to(2).drop(); - Assert.assertFalse(filters.permit(i1, i2, msg(VERB1, MSG1))); - Assert.assertTrue(filters.permit(i1, i2, msg(VERB2, MSG1))); - Assert.assertTrue(filters.permit(i2, i1, msg(VERB1, MSG1))); - Assert.assertTrue(filters.permit(i2, i3, msg(VERB2, MSG1))); + filters.verbs(VERB1).inbound(inbound).from(1).to(2).drop(); + Assert.assertFalse(permit.test(i1, i2, msg(VERB1, MSG1))); + Assert.assertTrue(permit.test(i1, i2, msg(VERB2, MSG1))); + Assert.assertTrue(permit.test(i2, i1, msg(VERB1, MSG1))); + Assert.assertTrue(permit.test(i2, i3, msg(VERB2, MSG1))); filters.reset(); AtomicInteger counter = new AtomicInteger(); diff --cc test/distributed/org/apache/cassandra/distributed/test/SharedClusterTestBase.java index 091e5f0,0000000..c502af2 mode 100644,000000..100644 --- a/test/distributed/org/apache/cassandra/distributed/test/SharedClusterTestBase.java +++ b/test/distributed/org/apache/cassandra/distributed/test/SharedClusterTestBase.java @@@ -1,36 -1,0 +1,52 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + - package org.apache.cassandra.distributed.api; ++package org.apache.cassandra.distributed.test; + - import org.apache.cassandra.locator.InetAddressAndPort; ++import java.io.IOException; + - import java.util.stream.Stream; ++import org.junit.After; ++import org.junit.AfterClass; ++import org.junit.BeforeClass; + - public interface ICluster ++import org.apache.cassandra.distributed.Cluster; ++import org.apache.cassandra.distributed.api.ICluster; ++ ++public class SharedClusterTestBase extends TestBaseImpl +{ ++ protected static ICluster cluster; ++ ++ @BeforeClass ++ public static void before() throws IOException ++ { ++ cluster = init(Cluster.build().withNodes(3).start()); ++ } + - IInstance get(int i); - IInstance get(InetAddressAndPort endpoint); - int size(); - Stream<? extends IInstance> stream(); - Stream<? extends IInstance> stream(String dcName); - Stream<? extends IInstance> stream(String dcName, String rackName); - IMessageFilters filters(); ++ @AfterClass ++ public static void after() throws Exception ++ { ++ cluster.close(); ++ } + ++ @After ++ public void afterEach() ++ { ++ cluster.schemaChange("DROP KEYSPACE IF EXISTS " + KEYSPACE); ++ init(cluster); ++ } +} diff --cc test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java index 0000000,0000000..f1f8674 new file mode 100644 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java @@@ -1,0 -1,0 +1,276 @@@ ++package org.apache.cassandra.distributed.test; ++ ++import org.junit.Assert; ++import org.junit.Test; ++ ++import org.apache.cassandra.db.Keyspace; ++import org.apache.cassandra.distributed.api.ConsistencyLevel; ++import org.apache.cassandra.distributed.api.ICluster; ++import org.apache.cassandra.distributed.api.IInvokableInstance; ++ ++import static org.junit.Assert.assertEquals; ++ ++import static org.apache.cassandra.distributed.shared.AssertUtils.*; ++ ++// TODO: this test should be removed after running in-jvm dtests is set up via the shared API repository ++public class SimpleReadWriteTest extends SharedClusterTestBase ++{ ++ @Test ++ public void coordinatorReadTest() throws Throwable ++ { ++ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); ++ ++ cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)"); ++ cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, 2)"); ++ cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 3, 3)"); ++ ++ assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?", ++ ConsistencyLevel.ALL, ++ 1), ++ row(1, 1, 1), ++ row(1, 2, 2), ++ row(1, 3, 3)); ++ } ++ ++ @Test ++ public void largeMessageTest() throws Throwable ++ { ++ int largeMessageThreshold = 1024 * 64; ++ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v text, PRIMARY KEY (pk, ck))"); ++ StringBuilder builder = new StringBuilder(); ++ for (int i = 0; i < largeMessageThreshold; i++) ++ builder.append('a'); ++ String s = builder.toString(); ++ cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, ?)", ++ ConsistencyLevel.ALL, ++ s); ++ assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?", ++ ConsistencyLevel.ALL, ++ 1), ++ row(1, 1, s)); ++ } ++ ++ @Test ++ public void coordinatorWriteTest() throws Throwable ++ { ++ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); ++ ++ cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)", ++ ConsistencyLevel.QUORUM); ++ ++ for (int i = 0; i < 3; i++) ++ { ++ assertRows(cluster.get(1).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"), ++ row(1, 1, 1)); ++ } ++ ++ assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ++ ConsistencyLevel.QUORUM), ++ row(1, 1, 1)); ++ } ++ ++ @Test ++ public void readRepairTest() throws Throwable ++ { ++ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); ++ ++ cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)"); ++ cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)"); ++ ++ assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1")); ++ ++ assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ++ ConsistencyLevel.ALL), // ensure node3 in preflist ++ row(1, 1, 1)); ++ ++ // Verify that data got repaired to the third node ++ assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"), ++ row(1, 1, 1)); ++ } ++ ++ @Test ++ public void writeWithSchemaDisagreement() throws Throwable ++ { ++ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))"); ++ ++ cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)"); ++ cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)"); ++ cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)"); ++ ++ // Introduce schema disagreement ++ cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1); ++ ++ Exception thrown = null; ++ try ++ { ++ cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (2, 2, 2, 2)", ++ ConsistencyLevel.QUORUM); ++ } ++ catch (RuntimeException e) ++ { ++ thrown = e; ++ } ++ ++ Assert.assertTrue(thrown.getMessage().contains("Exception occurred on node")); ++ Assert.assertTrue(thrown.getCause().getCause().getCause().getMessage().contains("Unknown column v2 during deserialization")); ++ } ++ ++ @Test ++ public void readWithSchemaDisagreement() throws Throwable ++ { ++ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))"); ++ ++ cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)"); ++ cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)"); ++ cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)"); ++ ++ // Introduce schema disagreement ++ cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1); ++ ++ Exception thrown = null; ++ try ++ { ++ assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ++ ConsistencyLevel.ALL), ++ row(1, 1, 1, null)); ++ } ++ catch (Exception e) ++ { ++ thrown = e; ++ } ++ ++ Assert.assertTrue(thrown.getMessage().contains("Exception occurred on node")); ++ Assert.assertTrue(thrown.getCause().getCause().getCause().getMessage().contains("Unknown column v2 during deserialization")); ++ } ++ ++ @Test ++ public void simplePagedReadsTest() throws Throwable ++ { ++ ++ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); ++ ++ int size = 100; ++ Object[][] results = new Object[size][]; ++ for (int i = 0; i < size; i++) ++ { ++ cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)", ++ ConsistencyLevel.QUORUM, ++ i, i); ++ results[i] = new Object[]{ 1, i, i }; ++ } ++ ++ // Make sure paged read returns same results with different page sizes ++ for (int pageSize : new int[]{ 1, 2, 3, 5, 10, 20, 50 }) ++ { ++ assertRows(cluster.coordinator(1).executeWithPaging("SELECT * FROM " + KEYSPACE + ".tbl", ++ ConsistencyLevel.QUORUM, ++ pageSize), ++ results); ++ } ++ } ++ ++ @Test ++ public void pagingWithRepairTest() throws Throwable ++ { ++ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); ++ ++ int size = 100; ++ Object[][] results = new Object[size][]; ++ for (int i = 0; i < size; i++) ++ { ++ // Make sure that data lands on different nodes and not coordinator ++ cluster.get(i % 2 == 0 ? 2 : 3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)", ++ i, i); ++ ++ results[i] = new Object[]{ 1, i, i }; ++ } ++ ++ // Make sure paged read returns same results with different page sizes ++ for (int pageSize : new int[]{ 1, 2, 3, 5, 10, 20, 50 }) ++ { ++ assertRows(cluster.coordinator(1).executeWithPaging("SELECT * FROM " + KEYSPACE + ".tbl", ++ ConsistencyLevel.ALL, ++ pageSize), ++ results); ++ } ++ ++ assertRows(cluster.get(1).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl"), ++ results); ++ } ++ ++ @Test ++ public void pagingTests() throws Throwable ++ { ++ try (ICluster singleNode = init(builder().withNodes(1).withSubnet(1).start())) ++ { ++ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); ++ singleNode.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); ++ ++ for (int i = 0; i < 10; i++) ++ { ++ for (int j = 0; j < 10; j++) ++ { ++ cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)", ++ ConsistencyLevel.QUORUM, ++ i, j, i + i); ++ singleNode.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)", ++ ConsistencyLevel.QUORUM, ++ i, j, i + i); ++ } ++ } ++ ++ int[] pageSizes = new int[]{ 1, 2, 3, 5, 10, 20, 50 }; ++ String[] statements = new String[]{ "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5", ++ "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck >= 5", ++ "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10", ++ "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 LIMIT 3", ++ "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck >= 5 LIMIT 2", ++ "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 LIMIT 2", ++ "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 ORDER BY ck DESC", ++ "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck >= 5 ORDER BY ck DESC", ++ "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 ORDER BY ck DESC", ++ "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 ORDER BY ck DESC LIMIT 3", ++ "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck >= 5 ORDER BY ck DESC LIMIT 2", ++ "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 ORDER BY ck DESC LIMIT 2", ++ "SELECT DISTINCT pk FROM " + KEYSPACE + ".tbl LIMIT 3", ++ "SELECT DISTINCT pk FROM " + KEYSPACE + ".tbl WHERE pk IN (3,5,8,10)", ++ "SELECT DISTINCT pk FROM " + KEYSPACE + ".tbl WHERE pk IN (3,5,8,10) LIMIT 2" ++ }; ++ for (String statement : statements) ++ { ++ for (int pageSize : pageSizes) ++ { ++ assertRows(cluster.coordinator(1) ++ .executeWithPaging(statement, ++ ConsistencyLevel.QUORUM, pageSize), ++ singleNode.coordinator(1) ++ .executeWithPaging(statement, ++ ConsistencyLevel.QUORUM, Integer.MAX_VALUE)); ++ } ++ } ++ } ++ } ++ ++ @Test ++ public void metricsCountQueriesTest() throws Throwable ++ { ++ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); ++ for (int i = 0; i < 100; i++) ++ cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?,?,?)", ConsistencyLevel.ALL, i, i, i); ++ ++ long readCount1 = readCount((IInvokableInstance) cluster.get(1)); ++ long readCount2 = readCount((IInvokableInstance) cluster.get(2)); ++ for (int i = 0; i < 100; i++) ++ cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ? and ck = ?", ConsistencyLevel.ALL, i, i); ++ ++ readCount1 = readCount((IInvokableInstance) cluster.get(1)) - readCount1; ++ readCount2 = readCount((IInvokableInstance) cluster.get(2)) - readCount2; ++ assertEquals(readCount1, readCount2); ++ assertEquals(100, readCount1); ++ } ++ ++ private long readCount(IInvokableInstance instance) ++ { ++ return instance.callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").metric.readLatency.latency.getCount()); ++ } ++} diff --cc test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java index 0000000,a89a352..0e0561a mode 000000,100644..100644 --- a/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java +++ b/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java @@@ -1,0 -1,49 +1,47 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.cassandra.distributed.test; + + import org.junit.After; + import org.junit.BeforeClass; + + import org.apache.cassandra.distributed.api.ICluster; + import org.apache.cassandra.distributed.api.IInstance; + import org.apache.cassandra.distributed.shared.Builder; + import org.apache.cassandra.distributed.shared.DistributedTestBase; + + public class TestBaseImpl extends DistributedTestBase + { - protected static final TestBaseImpl impl = new TestBaseImpl(); - + @After + public void afterEach() { + super.afterEach(); + } + + @BeforeClass + public static void beforeClass() throws Throwable { + ICluster.setup(); + } + + @Override + public <I extends IInstance, C extends ICluster> Builder<I, C> builder() { + // This is definitely not the smartest solution, but given the complexity of the alternatives and low risk, we can just rely on the + // fact that this code is going to work accross _all_ versions. + return (Builder<I, C>) org.apache.cassandra.distributed.Cluster.build(); + } + } diff --cc test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorage2to3UpgradeTest.java index 5c45d52,0000000..f138861 mode 100644,000000..100644 --- a/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorage2to3UpgradeTest.java +++ b/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorage2to3UpgradeTest.java @@@ -1,102 -1,0 +1,99 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.upgrade; + +import org.junit.Test; + - import org.apache.cassandra.db.ConsistencyLevel; ++import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.ICoordinator; - import org.apache.cassandra.distributed.impl.Versions; - import org.apache.cassandra.distributed.test.DistributedTestBase; ++import org.apache.cassandra.distributed.shared.DistributedTestBase; ++import org.apache.cassandra.distributed.shared.Versions; ++import static org.apache.cassandra.distributed.shared.AssertUtils.*; + +public class CompactStorage2to3UpgradeTest extends UpgradeTestBase +{ + @Test + public void multiColumn() throws Throwable + { + new TestCase() + .upgrade(Versions.Major.v22, Versions.Major.v30) + .setup(cluster -> { + assert cluster.size() == 3; + int rf = cluster.size() - 1; + assert rf == 2; + cluster.schemaChange("CREATE KEYSPACE ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + (cluster.size() - 1) + "};"); + cluster.schemaChange("CREATE TABLE ks.tbl (pk int, v1 int, v2 text, PRIMARY KEY (pk)) WITH COMPACT STORAGE"); + ICoordinator coordinator = cluster.coordinator(1); + // these shouldn't be replicated by the 3rd node + coordinator.execute("INSERT INTO ks.tbl (pk, v1, v2) VALUES (3, 3, '3')", ConsistencyLevel.ALL); + coordinator.execute("INSERT INTO ks.tbl (pk, v1, v2) VALUES (9, 9, '9')", ConsistencyLevel.ALL); - for (int i=0; i<cluster.size(); i++) ++ for (int i = 0; i < cluster.size(); i++) + { - int nodeNum = i+1; ++ int nodeNum = i + 1; + System.out.println(String.format("****** node %s: %s", nodeNum, cluster.get(nodeNum).config())); + } - + }) + .runAfterNodeUpgrade(((cluster, node) -> { + if (node != 2) + return; + + Object[][] rows = cluster.coordinator(3).execute("SELECT * FROM ks.tbl LIMIT 2", ConsistencyLevel.ALL); + Object[][] expected = { - DistributedTestBase.row(9, 9, "9"), - DistributedTestBase.row(3, 3, "3") ++ row(9, 9, "9"), ++ row(3, 3, "3") + }; - DistributedTestBase.assertRows(rows, expected); - ++ assertRows(rows, expected); + })).run(); + } + + @Test + public void singleColumn() throws Throwable + { + new TestCase() + .upgrade(Versions.Major.v22, Versions.Major.v30) + .setup(cluster -> { + assert cluster.size() == 3; + int rf = cluster.size() - 1; + assert rf == 2; + cluster.schemaChange("CREATE KEYSPACE ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + (cluster.size() - 1) + "};"); + cluster.schemaChange("CREATE TABLE ks.tbl (pk int, v int, PRIMARY KEY (pk)) WITH COMPACT STORAGE"); + ICoordinator coordinator = cluster.coordinator(1); + // these shouldn't be replicated by the 3rd node + coordinator.execute("INSERT INTO ks.tbl (pk, v) VALUES (3, 3)", ConsistencyLevel.ALL); + coordinator.execute("INSERT INTO ks.tbl (pk, v) VALUES (9, 9)", ConsistencyLevel.ALL); - for (int i=0; i<cluster.size(); i++) ++ for (int i = 0; i < cluster.size(); i++) + { - int nodeNum = i+1; ++ int nodeNum = i + 1; + System.out.println(String.format("****** node %s: %s", nodeNum, cluster.get(nodeNum).config())); + } - + }) + .runAfterNodeUpgrade(((cluster, node) -> { + + if (node < 2) + return; + + Object[][] rows = cluster.coordinator(3).execute("SELECT * FROM ks.tbl LIMIT 2", ConsistencyLevel.ALL); + Object[][] expected = { - DistributedTestBase.row(9, 9), - DistributedTestBase.row(3, 3) ++ row(9, 9), ++ row(3, 3) + }; - DistributedTestBase.assertRows(rows, expected); - ++ assertRows(rows, expected); + })).run(); + } - } ++} diff --cc test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadRepairTest.java index 31f4b84,e69e38a..b98829d --- a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadRepairTest.java +++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadRepairTest.java @@@ -20,11 -20,10 +20,11 @@@ package org.apache.cassandra.distribute import org.junit.Test; - import org.apache.cassandra.db.ConsistencyLevel; - import org.apache.cassandra.distributed.impl.Versions; - import org.apache.cassandra.distributed.test.DistributedTestBase; + import org.apache.cassandra.distributed.api.ConsistencyLevel; ++import org.apache.cassandra.distributed.shared.DistributedTestBase; + import org.apache.cassandra.distributed.shared.Versions; - import static org.apache.cassandra.distributed.impl.Versions.find; + import static org.apache.cassandra.distributed.shared.Versions.find; public class MixedModeReadRepairTest extends UpgradeTestBase { diff --cc test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTest.java index 5a927fc,93ae78e..81e580d --- a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTest.java +++ b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTest.java @@@ -18,16 -18,13 +18,16 @@@ package org.apache.cassandra.distributed.upgrade; +import java.util.Iterator; + +import com.google.common.collect.Iterators; import org.junit.Test; - import org.apache.cassandra.db.ConsistencyLevel; - import org.apache.cassandra.distributed.impl.Versions; - import org.apache.cassandra.distributed.test.DistributedTestBase; + import org.apache.cassandra.distributed.api.ConsistencyLevel; + import org.apache.cassandra.distributed.shared.Versions; - import static junit.framework.Assert.assertEquals; + import junit.framework.Assert; + import static org.apache.cassandra.distributed.shared.AssertUtils.*; public class UpgradeTest extends UpgradeTestBase { @@@ -36,55 -33,22 +36,54 @@@ public void upgradeTest() throws Throwable { new TestCase() - .upgrade(Versions.Major.v22, Versions.Major.v30) - .setup((cluster) -> { - cluster.schemaChange("CREATE TABLE " + DistributedTestBase.KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); - .upgrade(Versions.Major.v22, Versions.Major.v30, Versions.Major.v3X) ++ .upgrade(Versions.Major.v22, Versions.Major.v30) + .setup((cluster) -> { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); - cluster.get(1).executeInternal("INSERT INTO " + DistributedTestBase.KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)"); - cluster.get(2).executeInternal("INSERT INTO " + DistributedTestBase.KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, 2)"); - cluster.get(3).executeInternal("INSERT INTO " + DistributedTestBase.KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 3, 3)"); - }) - .runAfterClusterUpgrade((cluster) -> { - DistributedTestBase.assertRows(cluster.coordinator(1).execute("SELECT * FROM " + DistributedTestBase.KEYSPACE + ".tbl WHERE pk = ?", - ConsistencyLevel.ALL, - 1), - DistributedTestBase.row(1, 1, 1), - DistributedTestBase.row(1, 2, 2), - DistributedTestBase.row(1, 3, 3)); - }).run(); + cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)"); + cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, 2)"); + cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 3, 3)"); + }) + .runAfterClusterUpgrade((cluster) -> { + assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?", - ConsistencyLevel.ALL, - 1), - row(1, 1, 1), - row(1, 2, 2), - row(1, 3, 3)); ++ ConsistencyLevel.ALL, ++ 1), ++ row(1, 1, 1), ++ row(1, 2, 2), ++ row(1, 3, 3)); + }).run(); } + @Test + public void mixedModePagingTest() throws Throwable + { + new TestCase() + .upgrade(Versions.Major.v22, Versions.Major.v30) + .nodes(2) + .nodesToUpgrade(2) + .setup((cluster) -> { - cluster.schemaChange("ALTER KEYSPACE " + DistributedTestBase.KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}"); - cluster.schemaChange("CREATE TABLE " + DistributedTestBase.KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) with compact storage"); ++ cluster.schemaChange("ALTER KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}"); ++ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) with compact storage"); + for (int i = 0; i < 100; i++) + for (int j = 0; j < 200; j++) - cluster.coordinator(2).execute("INSERT INTO " + DistributedTestBase.KEYSPACE + ".tbl (pk, ck, v) VALUES (?, ?, 1)", ConsistencyLevel.ALL, i, j); - cluster.forEach((i) -> i.flush(DistributedTestBase.KEYSPACE)); ++ cluster.coordinator(2).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?, ?, 1)", ConsistencyLevel.ALL, i, j); ++ cluster.forEach((i) -> i.flush(KEYSPACE)); + for (int i = 0; i < 100; i++) + for (int j = 10; j < 30; j++) - cluster.coordinator(2).execute("DELETE FROM " + DistributedTestBase.KEYSPACE + ".tbl where pk=? and ck=?", ConsistencyLevel.ALL, i, j); - cluster.forEach((i) -> i.flush(DistributedTestBase.KEYSPACE)); ++ cluster.coordinator(2).execute("DELETE FROM " + KEYSPACE + ".tbl where pk=? and ck=?", ConsistencyLevel.ALL, i, j); ++ cluster.forEach((i) -> i.flush(KEYSPACE)); + }) + .runAfterClusterUpgrade((cluster) -> { + for (int i = 0; i < 100; i++) + { + for (int pageSize = 10; pageSize < 100; pageSize++) + { - Iterator<Object[]> res = cluster.coordinator(1).executeWithPaging("SELECT * FROM " + DistributedTestBase.KEYSPACE + ".tbl WHERE pk = ?", ++ Iterator<Object[]> res = cluster.coordinator(1).executeWithPaging("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?", + ConsistencyLevel.ALL, + pageSize, i); - assertEquals(180, Iterators.size(res)); ++ Assert.assertEquals(180, Iterators.size(res)); + } + } + }).run(); + } - - } + } diff --cc test/unit/org/apache/cassandra/LogbackStatusListener.java index d16058b,0000000..1f95bd4 mode 100644,000000..100644 --- a/test/unit/org/apache/cassandra/LogbackStatusListener.java +++ b/test/unit/org/apache/cassandra/LogbackStatusListener.java @@@ -1,538 -1,0 +1,538 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.io.PrintStream; +import java.io.UnsupportedEncodingException; +import java.util.Locale; + +import org.slf4j.ILoggerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.LoggerContext; +import ch.qos.logback.classic.spi.LoggerContextListener; +import ch.qos.logback.core.status.Status; +import ch.qos.logback.core.status.StatusListener; - import org.apache.cassandra.distributed.impl.InstanceClassLoader; ++import org.apache.cassandra.distributed.shared.InstanceClassLoader; + +/* + * Listen for logback readiness and then redirect stdout/stderr to logback + */ +public class LogbackStatusListener implements StatusListener, LoggerContextListener +{ + + public static final PrintStream originalOut = System.out; + public static final PrintStream originalErr = System.err; + + private volatile boolean hadPreInstallError = false; + private volatile boolean haveInstalled = false; + private volatile boolean haveRegisteredListener = false; + + private PrintStream replacementOut; + private PrintStream replacementErr; + + @Override + public void addStatusEvent(Status s) + { + if (!haveInstalled && (s.getLevel() != 0 || s.getEffectiveLevel() != 0)) + { + // if we encounter an error during setup, we're not sure what state we're in, so we just don't switch + // we should log this fact, though, so that we know that we're not necessarily capturing stdout + LoggerFactory.getLogger(LogbackStatusListener.class) + .warn("Encountered non-info status in logger setup; aborting stdout capture: '" + s.getMessage() + '\''); + hadPreInstallError = true; + } + + if (hadPreInstallError) + return; + + if (s.getMessage().startsWith("Registering current configuration as safe fallback point")) + { + onStart(null); + } + + if (haveInstalled && !haveRegisteredListener) + { + // we register ourselves as a listener after the fact, because we enable ourselves before the LoggerFactory + // is properly initialised, hence before it can accept any LoggerContextListener registrations + tryRegisterListener(); + } + + if (s.getMessage().equals("Logback context being closed via shutdown hook")) + { + onStop(null); + } + } + + private static PrintStream wrapLogger(Logger logger, PrintStream original, String encodingProperty, boolean error) throws Exception + { + final String encoding = System.getProperty(encodingProperty); + OutputStream os = new ToLoggerOutputStream(logger, encoding, error); + return encoding != null ? new WrappedPrintStream(os, true, encoding, original) + : new WrappedPrintStream(os, true, original); + } + + private static class ToLoggerOutputStream extends ByteArrayOutputStream + { + final Logger logger; + final String encoding; + final boolean error; + + private ToLoggerOutputStream(Logger logger, String encoding, boolean error) + { + this.logger = logger; + this.encoding = encoding; + this.error = error; + } + + @Override + public void flush() throws IOException + { + try + { + //Filter out stupid PrintStream empty flushes + if (size() == 0) return; + + //Filter out newlines, log framework provides its own + if (size() == 1) + { + byte[] bytes = toByteArray(); + if (bytes[0] == 0xA) + return; + } + + //Filter out Windows newline + if (size() == 2) + { + byte[] bytes = toByteArray(); + if (bytes[0] == 0xD && bytes[1] == 0xA) + return; + } + + String statement; + if (encoding != null) + statement = new String(toByteArray(), encoding); + else + statement = new String(toByteArray()); + + if (error) + logger.error(statement); + else + logger.info(statement); + } + finally + { + reset(); + } + } + }; + + private static class WrappedPrintStream extends PrintStream + { + private long asyncAppenderThreadId = Long.MIN_VALUE; + private final PrintStream original; + + public WrappedPrintStream(OutputStream out, boolean autoFlush, PrintStream original) + { + super(out, autoFlush); + this.original = original; + } + + public WrappedPrintStream(OutputStream out, boolean autoFlush, String encoding, PrintStream original) throws UnsupportedEncodingException + { + super(out, autoFlush, encoding); + this.original = original; + } + + /* + * Long and the short of it is that we don't want to serve logback a fake System.out/err. + * ConsoleAppender is replaced so it always goes to the real System.out/err, but logback itself + * will at times try to log to System.out/err when it has issues. + * + * Now here is the problem. There is a deadlock if a thread logs to System.out, blocks on the async + * appender queue, and the async appender thread tries to log to System.out directly as part of some + * internal logback issue. + * + * So to prevent this we have to exhaustively check before locking in the PrintStream and forward + * to real System.out/err if it is the async appender + */ + private boolean isAsyncAppender() + { + //Set the thread id based on the name + Thread currentThread = Thread.currentThread(); + long currentThreadId = currentThread.getId(); + if (asyncAppenderThreadId == Long.MIN_VALUE && + currentThread.getName().equals("AsyncAppender-Worker-ASYNC") && + !InstanceClassLoader.wasLoadedByAnInstanceClassLoader(currentThread.getClass())) + { + asyncAppenderThreadId = currentThreadId; + } + if (currentThreadId == asyncAppenderThreadId) + original.println("Was in async appender"); + return currentThreadId == asyncAppenderThreadId; + } + + @Override + public void flush() + { + if (isAsyncAppender()) + original.flush(); + else + super.flush(); + } + + @Override + public void close() + { + if (isAsyncAppender()) + original.close(); + else + super.flush(); + } + + @Override + public void write(int b) + { + if (isAsyncAppender()) + original.write(b); + else + super.write(b); + } + + @Override + public void write(byte[] buf, int off, int len) + { + if (isAsyncAppender()) + original.write(buf, off, len); + else + super.write(buf, off, len); + } + + @Override + public void print(boolean b) + { + if (isAsyncAppender()) + original.print(b); + else + super.print(b); + } + + @Override + public void print(char c) + { + if (isAsyncAppender()) + original.print(c); + else + super.print(c); + } + + @Override + public void print(int i) + { + if (isAsyncAppender()) + original.print(i); + else + super.print(i); + } + + @Override + public void print(long l) + { + if (isAsyncAppender()) + original.print(l); + else + super.print(l); + } + + @Override + public void print(float f) + { + if (isAsyncAppender()) + original.print(f); + else + super.print(f); + } + + @Override + public void print(double d) + { + if (isAsyncAppender()) + original.print(d); + else + super.print(d); + } + + @Override + public void print(char[] s) + { + if(isAsyncAppender()) + original.println(s); + else + super.print(s); + } + + @Override + public void print(String s) + { + if (isAsyncAppender()) + original.print(s); + else + super.print(s); + } + + @Override + public void print(Object obj) + { + if (isAsyncAppender()) + original.print(obj); + else + super.print(obj); + } + + @Override + public void println() + { + if (isAsyncAppender()) + original.println(); + else + super.println(); + } + + @Override + public void println(boolean v) + { + if (isAsyncAppender()) + original.println(v); + else + super.println(v); + } + + @Override + public void println(char v) + { + if (isAsyncAppender()) + original.println(v); + else + super.println(v); + } + + @Override + public void println(int v) + { + if (isAsyncAppender()) + original.println(v); + else + super.println(v); + } + + @Override + public void println(long v) + { + if (isAsyncAppender()) + original.println(v); + else + super.println(v); + } + + @Override + public void println(float v) + { + if (isAsyncAppender()) + original.println(v); + else + super.println(v); + } + + @Override + public void println(double v) + { + if (isAsyncAppender()) + original.println(v); + else + super.println(v); + } + + @Override + public void println(char[] v) + { + if (isAsyncAppender()) + original.println(v); + else + super.println(v); + } + + @Override + public void println(String v) + { + if (isAsyncAppender()) + original.println(v); + else + super.println(v); + } + + @Override + public void println(Object v) + { + if (isAsyncAppender()) + original.println(v); + else + super.println(v); + } + + @Override + public PrintStream printf(String format, Object... args) + { + if (isAsyncAppender()) + return original.printf(format, args); + else + return super.printf(format, args); + } + + @Override + public PrintStream printf(Locale l, String format, Object... args) + { + if (isAsyncAppender()) + return original.printf(l, format, args); + else + return super.printf(l, format, args); + } + + @Override + public PrintStream format(String format, Object... args) + { + if (isAsyncAppender()) + return original.format(format, args); + else + return super.format(format, args); + } + + @Override + public PrintStream format(Locale l, String format, Object... args) + { + if (isAsyncAppender()) + return original.format(l, format, args); + else + return super.format(l, format, args); + } + + @Override + public PrintStream append(CharSequence csq) + { + if (isAsyncAppender()) + return original.append(csq); + else + return super.append(csq); + } + + @Override + public PrintStream append(CharSequence csq, int start, int end) + { + if (isAsyncAppender()) + return original.append(csq, start, end); + else + return super.append(csq, start, end); + } + + @Override + public PrintStream append(char c) + { + if (isAsyncAppender()) + return original.append(c); + else + return super.append(c); + } } + + public boolean isResetResistant() + { + return false; + } + + public synchronized void onStart(LoggerContext loggerContext) + { + if (!hadPreInstallError && !haveInstalled) + { + if (InstanceClassLoader.wasLoadedByAnInstanceClassLoader(getClass()) + || System.out.getClass().getName().contains("LogbackStatusListener")) + { + // don't operate if we're a dtest node, or if we're not the first to swap System.out for some other reason + hadPreInstallError = true; + return; + } + try + { + Logger stdoutLogger = LoggerFactory.getLogger("stdout"); + Logger stderrLogger = LoggerFactory.getLogger("stderr"); + + replacementOut = wrapLogger(stdoutLogger, originalOut, "sun.stdout.encoding", false); + System.setOut(replacementOut); + replacementErr = wrapLogger(stderrLogger, originalErr, "sun.stderr.encoding", true); + System.setErr(replacementErr); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + haveInstalled = true; + } + } + + public synchronized void onReset(LoggerContext loggerContext) + { + onStop(loggerContext); + } + + public synchronized void onStop(LoggerContext loggerContext) + { + if (haveInstalled) + { + if (replacementOut != null) replacementOut.flush(); + if (replacementErr != null) replacementErr.flush(); + System.setErr(originalErr); + System.setOut(originalOut); + hadPreInstallError = false; + haveInstalled = false; + haveRegisteredListener = false; + if (haveRegisteredListener) + { + ((LoggerContext)LoggerFactory.getILoggerFactory()).removeListener(this); + } + } + } + + public void onLevelChange(ch.qos.logback.classic.Logger logger, Level level) + { + } + + private synchronized void tryRegisterListener() + { + if (haveInstalled && !haveRegisteredListener) + { + ILoggerFactory factory = LoggerFactory.getILoggerFactory(); + if (factory instanceof LoggerContext) + { + ((LoggerContext) factory).addListener(this); + haveRegisteredListener = true; + } + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org