Merge branch 'cassandra-2.2' into cassandra-3.0

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/778f2a46
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/778f2a46
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/778f2a46

Branch: refs/heads/cassandra-3.8
Commit: 778f2a46e2df52aa8451aceaf17046e6b8c86ace
Parents: 9ed3b42 00e7ecf
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Jul 6 12:33:54 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Jul 6 12:33:54 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/ConnectionHandler.java  |  8 ++--
 .../cassandra/streaming/StreamReceiveTask.java  | 50 +++++++++++++++-----
 .../cassandra/streaming/StreamSession.java      | 17 +++++--
 .../streaming/StreamingTransferTest.java        | 30 ++++++++++--
 5 files changed, 83 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/778f2a46/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 02786c5,7d62f97..8118de1
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,26 -1,14 +1,27 @@@
 -2.2.8
 +3.0.9
+  * Improve streaming synchronization and fault tolerance (CASSANDRA-11414)
 + * Avoid potential race when rebuilding CFMetaData (CASSANDRA-12098)
 + * Avoid missing sstables when getting the canonical sstables (CASSANDRA-11996)
 + * Always select the live sstables when getting sstables in bounds (CASSANDRA-11944)
 + * Fix column ordering of results with static columns for Thrift requests in
 +   a mixed 2.x/3.x cluster, also fix potential non-resolved duplication of
 +   those static columns in query results (CASSANDRA-12123)
 + * Avoid digest mismatch with empty but static rows (CASSANDRA-12090)
 + * Fix EOF exception when altering column type (CASSANDRA-11820)
 +Merged from 2.2:
   * MemoryUtil.getShort() should return an unsigned short also for architectures not supporting unaligned memory accesses (CASSANDRA-11973)
  Merged from 2.1:
 - * Don't write shadowed range tombstone (CASSANDRA-12030)
 - * Improve digest calculation in the presence of overlapping tombstones (CASSANDRA-11349)
   * Fix filtering on clustering columns when 2i is used (CASSANDRA-11907)
 - * Account for partition deletions in tombstone histogram (CASSANDRA-12112)
  
  
 -2.2.7
 +3.0.8
 + * Fix potential race in schema during new table creation (CASSANDRA-12083)
 + * cqlsh: fix error handling in rare COPY FROM failure scenario (CASSANDRA-12070)
 + * Disable autocompaction during drain (CASSANDRA-11878)
 + * Add a metrics timer to MemtablePool and use it to track time spent blocked on memory in MemtableAllocator (CASSANDRA-11327)
 + * Fix upgrading schema with super columns with non-text subcomparators (CASSANDRA-12023)
 + * Add TimeWindowCompactionStrategy (CASSANDRA-9666)
 +Merged from 2.2:
   * Allow nodetool info to run with readonly JMX access (CASSANDRA-11755)
   * Validate bloom_filter_fp_chance against lowest supported
     value when the table is created (CASSANDRA-11920)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/778f2a46/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 6280f3a,b342edc..040906b
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@@ -17,9 -17,7 +17,6 @@@
   */
  package org.apache.cassandra.streaming;
  
--import java.io.File;
- import java.io.IOError;
- import java.io.IOException;
  import java.util.ArrayList;
  import java.util.Collection;
  import java.util.List;
@@@ -36,19 -33,13 +33,20 @@@ import org.apache.cassandra.concurrent.
  import org.apache.cassandra.config.Schema;
  import org.apache.cassandra.db.ColumnFamilyStore;
  import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.Mutation;
 +import org.apache.cassandra.db.compaction.OperationType;
 +import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 +import org.apache.cassandra.db.partitions.PartitionUpdate;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 +import org.apache.cassandra.db.view.View;
  import org.apache.cassandra.dht.Bounds;
  import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.io.sstable.ISSTableScanner;
 +import org.apache.cassandra.io.sstable.SSTableMultiWriter;
  import org.apache.cassandra.io.sstable.format.SSTableReader;
 -import org.apache.cassandra.io.sstable.format.SSTableWriter;
  import org.apache.cassandra.utils.JVMStabilityInspector;
  import org.apache.cassandra.utils.Pair;
 -
++import org.apache.cassandra.utils.Throwables;
  import org.apache.cassandra.utils.concurrent.Refs;
  
  /**
@@@ -65,16 -55,11 +63,16 @@@ public class StreamReceiveTask extends 
      // total size of files to receive
      private final long totalSize;
  
 +    // Transaction tracking new files received
-     public final LifecycleTransaction txn;
++    private final LifecycleTransaction txn;
 +
      // true if task is done (either completed or aborted)
--    private boolean done = false;
++    private volatile boolean done = false;
  
      //  holds references to SSTables received
 -    protected Collection<SSTableWriter> sstables;
 +    protected Collection<SSTableReader> sstables;
 +
 +    private int remoteSSTablesReceived = 0;
  
      public StreamReceiveTask(StreamSession session, UUID cfId, int totalFiles, long totalSize)
      {
@@@ -92,18 -74,16 +90,32 @@@
       *
       * @param sstable SSTable file received.
       */
 -    public synchronized void received(SSTableWriter sstable)
 +    public synchronized void received(SSTableMultiWriter sstable)
      {
          if (done)
++        {
++            logger.warn("[{}] Received sstable {} on already finished stream receive task. Aborting sstable.", session.planId(),
++                        sstable.getFilename());
++            Throwables.maybeFail(sstable.abort(null));
              return;
++        }
+ 
 -        assert cfId.equals(sstable.metadata.cfId);
 +        remoteSSTablesReceived++;
 +        assert cfId.equals(sstable.getCfId());
  
-         Collection<SSTableReader> finished = sstable.finish(true);
 -        sstables.add(sstable);
++        Collection<SSTableReader> finished = null;
++        try
++        {
++            finished = sstable.finish(true);
++        }
++        catch (Throwable t)
++        {
++            Throwables.maybeFail(sstable.abort(t));
++        }
 +        txn.update(finished, false);
 +        sstables.addAll(finished);
  
 -        if (sstables.size() == totalFiles)
 +        if (remoteSSTablesReceived == totalFiles)
          {
              done = true;
              executor.submit(new OnCompletionRunnable(this));
@@@ -120,6 -100,6 +132,13 @@@
          return totalSize;
      }
  
++    public synchronized LifecycleTransaction getTransaction()
++    {
++        if (done)
++            throw new RuntimeException(String.format("Stream receive task %s of cf %s already finished.", session.planId(), cfId));
++        return txn;
++    }
++
      private static class OnCompletionRunnable implements Runnable
      {
          private final StreamReceiveTask task;
@@@ -139,71 -117,52 +158,71 @@@
                  if (kscf == null)
                  {
                      // schema was dropped during streaming
 -                    for (SSTableWriter writer : task.sstables)
 -                        writer.abort();
                      task.sstables.clear();
-                     task.txn.abort();
++                    task.abortTransaction();
 +                    task.session.taskCompleted(task);
                      return;
                  }
 -                ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
 +                cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
 +                hasViews = !Iterables.isEmpty(View.findAll(kscf.left, kscf.right));
  
 -                File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256L);
 -                StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
 -                lockfile.create(task.sstables);
 -                List<SSTableReader> readers = new ArrayList<>();
 -                for (SSTableWriter writer : task.sstables)
 -                    readers.add(writer.finish(true));
 -                lockfile.delete();
 -                task.sstables.clear();
 +                Collection<SSTableReader> readers = task.sstables;
  
                  try (Refs<SSTableReader> refs = Refs.ref(readers))
                  {
 -                    // add sstables and build secondary indexes
 -                    cfs.addSSTables(readers);
 -                    cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
 -
 -                    //invalidate row and counter cache
 -                    if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
 +                    //We have a special path for views.
 +                    //Since the view requires cleaning up any pre-existing state, we must put
 +                    //all partitions through the same write path as normal mutations.
 +                    //This also ensures any 2is are also updated
 +                    if (hasViews)
                      {
 -                        List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
 -                        for (SSTableReader sstable : readers)
 -                            boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken()));
 -                        Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
 -
 -                        if (cfs.isRowCacheEnabled())
 +                        for (SSTableReader reader : readers)
                          {
 -                            int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
 -                            if (invalidatedKeys > 0)
 -                                logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
 -                                             "receive task completed.", task.session.planId(), invalidatedKeys,
 -                                             cfs.keyspace.getName(), cfs.getColumnFamilyName());
 +                            try (ISSTableScanner scanner = reader.getScanner())
 +                            {
 +                                while (scanner.hasNext())
 +                                {
 +                                    try (UnfilteredRowIterator rowIterator = scanner.next())
 +                                    {
 +                                        //Apply unsafe (we will flush below before transaction is done)
 +                                        new Mutation(PartitionUpdate.fromIterator(rowIterator)).applyUnsafe();
 +                                    }
 +                                }
 +                            }
                          }
 +                    }
 +                    else
 +                    {
-                         task.txn.finish();
++                        task.finishTransaction();
  
 -                        if (cfs.metadata.isCounter())
 +                        // add sstables and build secondary indexes
 +                        cfs.addSSTables(readers);
 +                        cfs.indexManager.buildAllIndexesBlocking(readers);
 +
 +                        //invalidate row and counter cache
 +                        if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
                          {
 -                            int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
 -                            if (invalidatedKeys > 0)
 -                                logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
 -                                             "receive task completed.", task.session.planId(), invalidatedKeys,
 -                                             cfs.keyspace.getName(), cfs.getColumnFamilyName());
 +                            List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
 +                            readers.forEach(sstable -> boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken())));
 +                            Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
 +
 +                            if (cfs.isRowCacheEnabled())
 +                            {
 +                                int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
 +                                if (invalidatedKeys > 0)
 +                                    logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
 +                                                 "receive task completed.", task.session.planId(), invalidatedKeys,
 +                                                 cfs.keyspace.getName(), cfs.getTableName());
 +                            }
 +
 +                            if (cfs.metadata.isCounter())
 +                            {
 +                                int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
 +                                if (invalidatedKeys > 0)
 +                                    logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
 +                                                 "receive task completed.", task.session.planId(), invalidatedKeys,
 +                                                 cfs.keyspace.getName(), cfs.getTableName());
 +                            }
                          }
                      }
                  }
@@@ -211,21 -171,10 +230,20 @@@
              }
              catch (Throwable t)
              {
--                logger.error("Error applying streamed data: ", t);
                  JVMStabilityInspector.inspectThrowable(t);
                  task.session.onError(t);
              }
 +            finally
 +            {
 +                //We don't keep the streamed sstables since we've applied them manually
 +                //So we abort the txn and delete the streamed sstables
 +                if (hasViews)
 +                {
 +                    if (cfs != null)
 +                        cfs.forceBlockingFlush();
-                     task.txn.abort();
++                    task.abortTransaction();
 +                }
 +            }
          }
      }
  
@@@ -241,7 -190,8 +259,17 @@@
              return;
  
          done = true;
-         txn.abort();
 -        for (SSTableWriter writer : sstables)
 -            writer.abort();
++        abortTransaction();
          sstables.clear();
      }
++
++    private synchronized void abortTransaction()
++    {
++        txn.abort();
++    }
++
++    private synchronized void finishTransaction()
++    {
++        txn.finish();
++    }
  }
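
A note on the pattern above: every touch point of the shared LifecycleTransaction (received, the completion runnable, abort, and the new getTransaction accessor) now runs under the task's monitor, and the volatile done flag rejects sstables that arrive after the task has finished. Below is a minimal, self-contained sketch of that guard pattern; the types and names (Txn, ReceiveTaskSketch) are simplified stand-ins rather than the actual Cassandra classes, and completion happens inline rather than on an executor.

    import java.util.ArrayList;
    import java.util.List;

    class ReceiveTaskSketch
    {
        /** Stand-in for LifecycleTransaction. */
        static class Txn
        {
            void update(String sstable) { /* track the new file */ }
            void finish() { /* commit all tracked files */ }
            void abort()  { /* delete all tracked files */ }
        }

        private final Txn txn = new Txn();
        private final int totalFiles;
        private final List<String> received = new ArrayList<>();
        // volatile so a concurrent abort is observed promptly; all mutation
        // still happens under the monitor, as in the patched class.
        private volatile boolean done = false;

        ReceiveTaskSketch(int totalFiles) { this.totalFiles = totalFiles; }

        synchronized void received(String sstable)
        {
            if (done)
                return; // late arrival: drop it rather than touch a finished txn
            received.add(sstable);
            txn.update(sstable);
            if (received.size() == totalFiles)
            {
                done = true;
                txn.finish();
            }
        }

        synchronized void abort()
        {
            if (done)
                return;
            done = true;
            txn.abort(); // same monitor as received(): never aborts mid-update
            received.clear();
        }

        synchronized Txn transaction()
        {
            if (done)
                throw new IllegalStateException("task already finished");
            return txn;
        }
    }

Handing the transaction out only through a synchronized, done-checked accessor (instead of the formerly public txn field) closes the window in which a caller could update a transaction that a racing abort had already torn down.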

http://git-wip-us.apache.org/repos/asf/cassandra/blob/778f2a46/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamSession.java
index bfbedc7,f4c900e..12f561b
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@@ -211,12 -212,6 +211,12 @@@ public class StreamSession implements I
      }
  
  
 +    public LifecycleTransaction getTransaction(UUID cfId)
 +    {
 +        assert receivers.containsKey(cfId);
-         return receivers.get(cfId).txn;
++        return receivers.get(cfId).getTransaction();
 +    }
 +
      /**
       * Bind this session to report to specific {@link StreamResultFuture} and
       * perform pre-streaming initialization.
@@@ -281,8 -276,8 +281,9 @@@
       * @param flushTables flush tables?
       * @param repairedAt the time the repair started.
       */
--    public void addTransferRanges(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies, boolean flushTables, long repairedAt)
++    public synchronized void addTransferRanges(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies, boolean flushTables, long repairedAt)
      {
++        failIfFinished();
          Collection<ColumnFamilyStore> stores = getColumnFamilyStores(keyspace, columnFamilies);
          if (flushTables)
              flushSSTables(stores);
@@@ -300,6 -295,6 +301,12 @@@
          }
      }
  
++    private void failIfFinished()
++    {
++        if (state() == State.COMPLETE || state() == State.FAILED)
++            throw new RuntimeException(String.format("Stream %s is finished with state %s", planId(), state().name()));
++    }
++
      private Collection<ColumnFamilyStore> getColumnFamilyStores(String keyspace, Collection<String> columnFamilies)
      {
          Collection<ColumnFamilyStore> stores = new HashSet<>();
@@@ -371,8 -369,8 +378,9 @@@
          }
      }
  
--    public void addTransferFiles(Collection<SSTableStreamingSections> sstableDetails)
++    public synchronized void addTransferFiles(Collection<SSTableStreamingSections> sstableDetails)
      {
++        failIfFinished();
          Iterator<SSTableStreamingSections> iter = sstableDetails.iterator();
          while (iter.hasNext())
          {
@@@ -745,8 -743,8 +753,9 @@@
          FBUtilities.waitOnFutures(flushes);
      }
  
--    private void prepareReceiving(StreamSummary summary)
++    private synchronized void prepareReceiving(StreamSummary summary)
      {
++        failIfFinished();
          if (summary.files > 0)
              receivers.put(summary.cfId, new StreamReceiveTask(this, summary.cfId, summary.files, summary.totalSize));
      }
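
The StreamSession changes apply the same idea one level up: the mutators that register new transfers or receivers (addTransferRanges, addTransferFiles, prepareReceiving) are now synchronized and call failIfFinished() so that a session in a terminal state fails loudly instead of silently accepting work. A small sketch of that guard, with simplified names standing in for the real session machinery:

    import java.util.ArrayList;
    import java.util.List;

    class SessionSketch
    {
        enum State { PREPARING, STREAMING, COMPLETE, FAILED }

        private State state = State.PREPARING;
        private final List<String> transfers = new ArrayList<>();

        synchronized void addTransfer(String name)
        {
            failIfFinished();    // reject late additions outright...
            transfers.add(name); // ...instead of leaking files into a dead session
        }

        // Mirrors the failIfFinished() check added in this commit.
        private void failIfFinished()
        {
            if (state == State.COMPLETE || state == State.FAILED)
                throw new IllegalStateException(String.format("Stream is finished with state %s", state));
        }

        synchronized void complete()
        {
            state = State.COMPLETE;
        }
    }

Throwing from the add* methods once the plan has completed is exactly what the new StreamingTransferTest cases below assert with their try/fail/catch blocks.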

http://git-wip-us.apache.org/repos/asf/cassandra/blob/778f2a46/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index 7223e76,2b16267..6be880c
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@@ -229,14 -238,14 +229,38 @@@ public class StreamingTransferTes
          List<Range<Token>> ranges = new ArrayList<>();
          // wrapped range
          ranges.add(new Range<Token>(p.getToken(ByteBufferUtil.bytes("key1")), p.getToken(ByteBufferUtil.bytes("key0"))));
--        new StreamPlan("StreamingTransferTest").transferRanges(LOCAL, cfs.keyspace.getName(), ranges, cfs.getColumnFamilyName()).execute().get();
++        StreamPlan streamPlan = new StreamPlan("StreamingTransferTest").transferRanges(LOCAL, cfs.keyspace.getName(), ranges, cfs.getColumnFamilyName());
++        streamPlan.execute().get();
          verifyConnectionsAreClosed();
++
++        //cannot add ranges after stream session is finished
++        try
++        {
++            streamPlan.transferRanges(LOCAL, cfs.keyspace.getName(), ranges, cfs.getColumnFamilyName());
++            fail("Should have thrown exception");
++        }
++        catch (RuntimeException e)
++        {
++            //do nothing
++        }
      }
  
      private void transfer(SSTableReader sstable, List<Range<Token>> ranges) throws Exception
      {
--        new StreamPlan("StreamingTransferTest").transferFiles(LOCAL, makeStreamingDetails(ranges, Refs.tryRef(Arrays.asList(sstable)))).execute().get();
++        StreamPlan streamPlan = new StreamPlan("StreamingTransferTest").transferFiles(LOCAL, makeStreamingDetails(ranges, Refs.tryRef(Arrays.asList(sstable))));
++        streamPlan.execute().get();
          verifyConnectionsAreClosed();
++
++        //cannot add files after stream session is finished
++        try
++        {
++            streamPlan.transferFiles(LOCAL, makeStreamingDetails(ranges, Refs.tryRef(Arrays.asList(sstable))));
++            fail("Should have thrown exception");
++        }
++        catch (RuntimeException e)
++        {
++            //do nothing
++        }
      }
  
      /**
@@@ -312,36 -325,27 +336,36 @@@
          String cfname = "StandardInteger1";
          Keyspace keyspace = Keyspace.open(ks);
          ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
 +        ClusteringComparator comparator = cfs.getComparator();
  
 -        String key = "key0";
 -        Mutation rm = new Mutation(ks, ByteBufferUtil.bytes(key));
 -        // add columns of size slightly less than column_index_size to force insert column index
 -        rm.add(cfname, cellname(1), ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize() - 64]), 2);
 -        rm.add(cfname, cellname(6), ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize()]), 2);
 -        ColumnFamily cf = rm.addOrGet(cfname);
 -        // add RangeTombstones
 -        cf.delete(new DeletionInfo(cellname(2), cellname(3), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
 -        cf.delete(new DeletionInfo(cellname(5), cellname(7), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
 -        cf.delete(new DeletionInfo(cellname(8), cellname(10), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
 -        rm.applyUnsafe();
 +        String key = "key1";
 +
 +
 +        RowUpdateBuilder updates = new RowUpdateBuilder(cfs.metadata, FBUtilities.timestampMicros(), key);
  
 -        key = "key1";
 -        rm = new Mutation(ks, ByteBufferUtil.bytes(key));
          // add columns of size slightly less than column_index_size to force insert column index
 -        rm.add(cfname, cellname(1), ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize() - 64]), 2);
 -        cf = rm.addOrGet(cfname);
 +        updates.clustering(1)
 +                .add("val", ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize() - 64]))
 +                .build()
 +                .apply();
 +
 +        updates = new RowUpdateBuilder(cfs.metadata, FBUtilities.timestampMicros(), key);
 +        updates.clustering(6)
 +                .add("val", ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize()]))
-                .build()
++                .build()
 +                .apply();
 +
          // add RangeTombstones
 -        cf.delete(new DeletionInfo(cellname(2), cellname(3), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
 -        rm.applyUnsafe();
 +        //updates = new RowUpdateBuilder(cfs.metadata, FBUtilities.timestampMicros() + 1 , key);
 +        //updates.addRangeTombstone(Slice.make(comparator, comparator.make(2), comparator.make(4)))
 +        //        .build()
 +        //        .apply();
 +
 +
 +        updates = new RowUpdateBuilder(cfs.metadata, FBUtilities.timestampMicros() + 1, key);
 +        updates.addRangeTombstone(Slice.make(comparator.make(5), comparator.make(7)))
 +                .build()
 +                .apply();
  
          cfs.forceBlockingFlush();
  
