Merge branch 'cassandra-3.0' into cassandra-3.9

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8e775eaa
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8e775eaa
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8e775eaa

Branch: refs/heads/cassandra-3.9
Commit: 8e775eaaf5ee6b355e050ea15f2e334eff680487
Parents: eebadcf 8868b74
Author: Stefania Alborghetti <stefania.alborghe...@datastax.com>
Authored: Fri Jul 29 15:29:51 2016 +0800
Committer: Stefania Alborghetti <stefania.alborghe...@datastax.com>
Committed: Fri Jul 29 15:30:27 2016 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 bin/cqlsh.py                                    |  2 +-
 .../cassandra/concurrent/StageManager.java      | 12 +++++--
 .../cassandra/tracing/ExpiredTraceState.java    |  5 +++
 .../apache/cassandra/tracing/TraceState.java    |  5 +++
 .../cassandra/tracing/TraceStateImpl.java       | 38 ++++++++++++++++++++
 .../apache/cassandra/tracing/TracingTest.java   |  3 ++
 7 files changed, 63 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8e775eaa/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index d6a9d16,bb67fc9..ca5b383
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -13,42 -10,6 +13,43 @@@ Merged from 3.0
     to connect with too low of a protocol version (CASSANDRA-11464)
   * NullPointerExpception when reading/compacting table (CASSANDRA-11988)
   * Fix problem with undeleteable rows on upgrade to new sstable format 
(CASSANDRA-12144)
 +Merged from 2.2:
++ * Wait for tracing events before returning response and query at same 
consistency level client side (CASSANDRA-11465)
 + * cqlsh copyutil should get host metadata by connected address 
(CASSANDRA-11979)
 + * Fixed cqlshlib.test.remove_test_db (CASSANDRA-12214)
 +Merged from 2.1:
 + * cannot use cql since upgrading python to 2.7.11+ (CASSANDRA-11850)
 +
 +
 +3.8
 + * Fix hdr logging for single operation workloads (CASSANDRA-12145)
 + * Fix SASI PREFIX search in CONTAINS mode with partial terms 
(CASSANDRA-12073)
 + * Increase size of flushExecutor thread pool (CASSANDRA-12071)
 + * Partial revert of CASSANDRA-11971, cannot recycle buffer in 
SP.sendMessagesToNonlocalDC (CASSANDRA-11950)
 + * Upgrade netty to 4.0.39 (CASSANDRA-12032, CASSANDRA-12034)
 + * Improve details in compaction log message (CASSANDRA-12080)
 + * Allow unset values in CQLSSTableWriter (CASSANDRA-11911)
 + * Chunk cache to request compressor-compatible buffers if pool space is 
exhausted (CASSANDRA-11993)
 + * Remove DatabaseDescriptor dependencies from SequentialWriter 
(CASSANDRA-11579)
 + * Move skip_stop_words filter before stemming (CASSANDRA-12078)
 + * Support seek() in EncryptedFileSegmentInputStream (CASSANDRA-11957)
 + * SSTable tools mishandling LocalPartitioner (CASSANDRA-12002)
 + * When SEPWorker assigned work, set thread name to match pool 
(CASSANDRA-11966)
 + * Add cross-DC latency metrics (CASSANDRA-11596)
 + * Allow terms in selection clause (CASSANDRA-10783)
 + * Add bind variables to trace (CASSANDRA-11719)
 + * Switch counter shards' clock to timestamps (CASSANDRA-9811)
 + * Introduce HdrHistogram and response/service/wait separation to stress tool 
(CASSANDRA-11853)
 + * entry-weighers in QueryProcessor should respect partitionKeyBindIndexes 
field (CASSANDRA-11718)
 + * Support older ant versions (CASSANDRA-11807)
 + * Estimate compressed on disk size when deciding if sstable size limit 
reached (CASSANDRA-11623)
 + * cassandra-stress profiles should support case sensitive schemas 
(CASSANDRA-11546)
 + * Remove DatabaseDescriptor dependency from FileUtils (CASSANDRA-11578)
 + * Faster streaming (CASSANDRA-9766)
 + * Add prepared query parameter to trace for "Execute CQL3 prepared query" 
session (CASSANDRA-11425)
 + * Add repaired percentage metric (CASSANDRA-11503)
 + * Add Change-Data-Capture (CASSANDRA-8844)
 +Merged from 3.0:
   * Fix paging logic for deleted partitions with static columns 
(CASSANDRA-12107)
   * Wait until the message is being send to decide which serializer must be 
used (CASSANDRA-11393)
   * Fix migration of static thrift column names with non-text comparators 
(CASSANDRA-12147)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8e775eaa/bin/cqlsh.py
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8e775eaa/src/java/org/apache/cassandra/concurrent/StageManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/concurrent/StageManager.java
index ebc4f76,a201e78..64abf00
--- a/src/java/org/apache/cassandra/concurrent/StageManager.java
+++ b/src/java/org/apache/cassandra/concurrent/StageManager.java
@@@ -113,9 -112,19 +112,13 @@@ public class StageManage
          }
      }
  
 -    public final static Runnable NO_OP_TASK = new Runnable()
 -    {
 -        public void run()
 -        {
 -
 -        }
 -    };
++    public final static Runnable NO_OP_TASK = () -> {};
+ 
      /**
       * A TPE that disallows submit so that we don't need to worry about 
unwrapping exceptions on the
-      * tracing stage.  See CASSANDRA-1123 for background.
+      * tracing stage.  See CASSANDRA-1123 for background. We allow submitting 
NO_OP tasks, to allow
+      * a final wait on pending trace events since typically the tracing 
executor is single-threaded, see
+      * CASSANDRA-11465.
       */
      private static class ExecuteOnlyExecutor extends ThreadPoolExecutor 
implements LocalAwareExecutorService
      {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8e775eaa/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tracing/ExpiredTraceState.java
index fbe2c33,5cc3c21..bc8d5dd
--- a/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java
+++ b/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java
@@@ -37,9 -36,4 +37,14 @@@ class ExpiredTraceState extends TraceSt
      {
          return -1;
      }
 +
 +    protected void traceImpl(String message)
 +    {
 +        delegate.traceImpl(message);
 +    }
++
++    protected void waitForPendingEvents()
++    {
++        delegate.waitForPendingEvents();
++    }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8e775eaa/src/java/org/apache/cassandra/tracing/TraceState.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tracing/TraceState.java
index 5365d09,03e510f..ec2bc9e
--- a/src/java/org/apache/cassandra/tracing/TraceState.java
+++ b/src/java/org/apache/cassandra/tracing/TraceState.java
@@@ -177,8 -199,73 +179,10 @@@ public abstract class TraceState implem
          }
      }
  
 -    static void executeMutation(final Mutation mutation)
 -    {
 -        StageManager.getStage(Stage.TRACING).execute(new WrappedRunnable()
 -        {
 -            protected void runMayThrow() throws Exception
 -            {
 -                mutateWithCatch(mutation);
 -            }
 -        });
 -    }
 -
 -    /**
 -     * Called from {@link org.apache.cassandra.net.OutboundTcpConnection} for 
non-local traces (traces
 -     * that are not initiated by local node == coordinator).
 -     */
 -    public static void mutateWithTracing(final ByteBuffer sessionId, final 
String message, final int elapsed, final int ttl)
 -    {
 -        final String threadName = Thread.currentThread().getName();
 -
 -        StageManager.getStage(Stage.TRACING).execute(new WrappedRunnable()
 -        {
 -            public void runMayThrow()
 -            {
 -                mutateWithCatch(TraceKeyspace.makeEventMutation(sessionId, 
message, elapsed, threadName, ttl));
 -            }
 -        });
 -    }
 -
 -    static void mutateWithCatch(Mutation mutation)
 -    {
 -        try
 -        {
 -            StorageProxy.mutate(Collections.singletonList(mutation), 
ConsistencyLevel.ANY);
 -        }
 -        catch (OverloadedException e)
 -        {
 -            Tracing.logger.warn("Too many nodes are overloaded to save trace 
events");
 -        }
 -    }
 -
 -    /**
 -     * Post a no-op event to the TRACING stage, so that we can be sure that 
any previous mutations
 -     * have at least been applied to one replica. This works because the 
tracking executor only
 -     * has one thread in its pool, see {@link StageManager#tracingExecutor()}.
 -     */
 -    protected void waitForPendingEvents()
 -    {
 -        if (WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS <= 0)
 -            return;
 -
 -        try
 -        {
 -            if (logger.isTraceEnabled())
 -                logger.trace("Waiting for up to {} seconds for trace events 
to complete",
 -                             +WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS);
 -
 -            
StageManager.getStage(Stage.TRACING).submit(StageManager.NO_OP_TASK)
 -                        .get(WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS, 
TimeUnit.SECONDS);
 -        }
 -        catch (Throwable t)
 -        {
 -            JVMStabilityInspector.inspectThrowable(t);
 -            logger.debug("Failed to wait for tracing events to complete: {}", 
t);
 -        }
 -    }
 +    protected abstract void traceImpl(String message);
  
++    protected abstract void waitForPendingEvents();
+ 
      public boolean acquireReference()
      {
          while (true)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8e775eaa/src/java/org/apache/cassandra/tracing/TraceStateImpl.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tracing/TraceStateImpl.java
index 113ebb7,0000000..e2d3a68
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/tracing/TraceStateImpl.java
+++ b/src/java/org/apache/cassandra/tracing/TraceStateImpl.java
@@@ -1,74 -1,0 +1,112 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.tracing;
 +
 +import java.net.InetAddress;
 +import java.util.Collections;
 +import java.util.UUID;
++import java.util.concurrent.ExecutionException;
++import java.util.concurrent.TimeUnit;
++
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.concurrent.Stage;
 +import org.apache.cassandra.concurrent.StageManager;
 +import org.apache.cassandra.db.ConsistencyLevel;
 +import org.apache.cassandra.db.Mutation;
 +import org.apache.cassandra.exceptions.OverloadedException;
 +import org.apache.cassandra.service.StorageProxy;
++import org.apache.cassandra.utils.JVMStabilityInspector;
 +import org.apache.cassandra.utils.WrappedRunnable;
 +
 +/**
 + * ThreadLocal state for a tracing session. The presence of an instance of 
this class as a ThreadLocal denotes that an
 + * operation is being traced.
 + */
 +public class TraceStateImpl extends TraceState
 +{
++    private static final Logger logger = 
LoggerFactory.getLogger(TraceStateImpl.class);
++    private static final int WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS =
++      
Integer.valueOf(System.getProperty("cassandra.wait_for_tracing_events_timeout_secs",
 "1"));
++
 +    public TraceStateImpl(InetAddress coordinator, UUID sessionId, 
Tracing.TraceType traceType)
 +    {
 +        super(coordinator, sessionId, traceType);
 +    }
 +
 +    protected void traceImpl(String message)
 +    {
 +        final String threadName = Thread.currentThread().getName();
 +        final int elapsed = elapsed();
 +
 +        executeMutation(TraceKeyspace.makeEventMutation(sessionIdBytes, 
message, elapsed, threadName, ttl));
++        if (logger.isTraceEnabled())
++            logger.trace("Adding <{}> to trace events", message);
++    }
++
++    /**
++     * Post a no-op event to the TRACING stage, so that we can be sure that 
any previous mutations
++     * have at least been applied to one replica. This works because the 
tracking executor only
++     * has one thread in its pool, see {@link StageManager#tracingExecutor()}.
++     */
++    protected void waitForPendingEvents()
++    {
++        if (WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS <= 0)
++            return;
++
++        try
++        {
++            if (logger.isTraceEnabled())
++                logger.trace("Waiting for up to {} seconds for trace events 
to complete",
++                             WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS);
++
++            
StageManager.getStage(Stage.TRACING).submit(StageManager.NO_OP_TASK)
++                        .get(WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS, 
TimeUnit.SECONDS);
++        }
++        catch (Throwable t)
++        {
++            JVMStabilityInspector.inspectThrowable(t);
++            logger.debug("Failed to wait for tracing events to complete: {}", 
t);
++        }
 +    }
 +
 +    static void executeMutation(final Mutation mutation)
 +    {
 +        StageManager.getStage(Stage.TRACING).execute(new WrappedRunnable()
 +        {
 +            protected void runMayThrow()
 +            {
 +                mutateWithCatch(mutation);
 +            }
 +        });
 +    }
 +
 +    static void mutateWithCatch(Mutation mutation)
 +    {
 +        try
 +        {
 +            StorageProxy.mutate(Collections.singletonList(mutation), 
ConsistencyLevel.ANY);
 +        }
 +        catch (OverloadedException e)
 +        {
 +            Tracing.logger.warn("Too many nodes are overloaded to save trace 
events");
 +        }
 +    }
 +
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8e775eaa/test/unit/org/apache/cassandra/tracing/TracingTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/tracing/TracingTest.java
index 1b0e507,0000000..30521c0
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/tracing/TracingTest.java
+++ b/test/unit/org/apache/cassandra/tracing/TracingTest.java
@@@ -1,173 -1,0 +1,176 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.tracing;
 +
 +import java.net.InetAddress;
 +import java.nio.ByteBuffer;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.UUID;
 +
 +import org.junit.Test;
 +
 +import org.apache.cassandra.utils.progress.ProgressEvent;
 +import org.apache.cassandra.utils.progress.ProgressListener;
 +
 +public final class TracingTest
 +{
 +
 +    @Test
 +    public void test()
 +    {
 +        List<String> traces = new ArrayList<>();
 +        Tracing tracing = new TracingImpl(traces);
 +        tracing.newSession(Tracing.TraceType.NONE);
 +        TraceState state = tracing.begin("test-request", 
Collections.<String,String>emptyMap());
 +        state.trace("test-1");
 +        state.trace("test-2");
 +        state.trace("test-3");
 +        tracing.stopSession();
 +
 +        assert null == tracing.get();
 +        assert 4 == traces.size();
 +        assert "test-request".equals(traces.get(0));
 +        assert "test-1".equals(traces.get(1));
 +        assert "test-2".equals(traces.get(2));
 +        assert "test-3".equals(traces.get(3));
 +    }
 +
 +    @Test
 +    public void test_get()
 +    {
 +        List<String> traces = new ArrayList<>();
 +        Tracing tracing = new TracingImpl(traces);
 +        tracing.newSession(Tracing.TraceType.NONE);
 +        tracing.begin("test-request", Collections.<String,String>emptyMap());
 +        tracing.get().trace("test-1");
 +        tracing.get().trace("test-2");
 +        tracing.get().trace("test-3");
 +        tracing.stopSession();
 +
 +        assert null == tracing.get();
 +        assert 4 == traces.size();
 +        assert "test-request".equals(traces.get(0));
 +        assert "test-1".equals(traces.get(1));
 +        assert "test-2".equals(traces.get(2));
 +        assert "test-3".equals(traces.get(3));
 +    }
 +
 +    @Test
 +    public void test_get_uuid()
 +    {
 +        List<String> traces = new ArrayList<>();
 +        Tracing tracing = new TracingImpl(traces);
 +        UUID uuid = tracing.newSession(Tracing.TraceType.NONE);
 +        tracing.begin("test-request", Collections.<String,String>emptyMap());
 +        tracing.get(uuid).trace("test-1");
 +        tracing.get(uuid).trace("test-2");
 +        tracing.get(uuid).trace("test-3");
 +        tracing.stopSession();
 +
 +        assert null == tracing.get();
 +        assert 4 == traces.size();
 +        assert "test-request".equals(traces.get(0));
 +        assert "test-1".equals(traces.get(1));
 +        assert "test-2".equals(traces.get(2));
 +        assert "test-3".equals(traces.get(3));
 +    }
 +
 +    @Test
 +    public void test_states()
 +    {
 +        List<String> traces = new ArrayList<>();
 +        Tracing tracing = new TracingImpl(traces);
 +        tracing.newSession(Tracing.TraceType.REPAIR);
 +        tracing.begin("test-request", Collections.<String,String>emptyMap());
 +        tracing.get().enableActivityNotification("test-tag");
 +        assert TraceState.Status.IDLE == tracing.get().waitActivity(1);
 +        tracing.get().trace("test-1");
 +        assert TraceState.Status.ACTIVE == tracing.get().waitActivity(1);
 +        tracing.get().stop();
 +        assert TraceState.Status.STOPPED == tracing.get().waitActivity(1);
 +        tracing.stopSession();
 +        assert null == tracing.get();
 +    }
 +
 +    @Test
 +    public void test_progress_listener()
 +    {
 +        List<String> traces = new ArrayList<>();
 +        Tracing tracing = new TracingImpl(traces);
 +        tracing.newSession(Tracing.TraceType.REPAIR);
 +        tracing.begin("test-request", Collections.<String,String>emptyMap());
 +        tracing.get().enableActivityNotification("test-tag");
 +
 +        tracing.get().addProgressListener(
 +                new ProgressListener()
 +                {
 +                    public void progress(String tag, ProgressEvent pe)
 +                    {
 +                        assert "test-tag".equals(tag);
 +                        assert "test-trace".equals(pe.getMessage());
 +                    }
 +                });
 +
 +        tracing.get().trace("test-trace");
 +        tracing.stopSession();
 +        assert null == tracing.get();
 +    }
 +
 +    private class TracingImpl extends Tracing
 +    {
 +        private final List<String> traces;
 +
 +        public TracingImpl(List<String> traces)
 +        {
 +            this.traces = traces;
 +        }
 +
 +        public void stopSessionImpl()
 +        {}
 +
 +        public TraceState begin(String request, InetAddress ia, Map<String, 
String> map)
 +        {
 +            traces.add(request);
 +            return get();
 +        }
 +
 +        protected TraceState newTraceState(InetAddress ia, UUID uuid, 
Tracing.TraceType tt)
 +        {
 +            return new TraceState(ia, uuid, tt)
 +            {
 +                protected void traceImpl(String string)
 +                {
 +                    traces.add(string);
 +                }
 +
++                protected void waitForPendingEvents()
++                {
++                }
 +            };
 +        }
 +
 +        public void trace(ByteBuffer bb, String string, int i)
 +        {
 +            throw new UnsupportedOperationException("Not supported yet.");
 +        }
 +    }
 +}

Reply via email to