Merge branch 'cassandra-2.1' into cassandra-2.2

Conflicts:
        CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c33ebcd2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c33ebcd2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c33ebcd2

Branch: refs/heads/cassandra-2.2
Commit: c33ebcd2fd09704ac1cdfa81d12978b9f582b404
Parents: 0522226 9d44186
Author: Sylvain Lebresne <sylv...@datastax.com>
Authored: Wed Jul 15 10:30:32 2015 +0200
Committer: Sylvain Lebresne <sylv...@datastax.com>
Committed: Wed Jul 15 10:30:32 2015 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../locator/DynamicEndpointSnitch.java          | 34 ++++++++--
 .../locator/DynamicEndpointSnitchTest.java      | 69 +++++++++++++++++++-
 3 files changed, 95 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c33ebcd2/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 3046ceb,1e21c8d..c2d06e4
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,7 -1,5 +1,8 @@@
 -2.1.9
 +2.2.0-rc3
 + * sum() and avg() functions missing for smallint and tinyint types 
(CASSANDRA-9671)
 + * Revert CASSANDRA-9542 (allow native functions in UDA) (CASSANDRA-9771)
 +Merged from 2.1:
+  * Complete CASSANDRA-8448 fix (CASSANDRA-9519)
   * Handle corrupt files on startup (CASSANDRA-9686)
   * Fix clientutil jar and tests (CASSANDRA-9760)
   * (cqlsh) Allow the SSL protocol version to be specified through the

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c33ebcd2/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
index b670b6a,6b6286f..3e89dd4
--- a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
+++ b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
@@@ -216,12 -229,20 +230,20 @@@ public class DynamicEndpointSnitch exte
              return 1;
      }
  
+     public int compareEndpoints(InetAddress target, InetAddress a1, 
InetAddress a2)
+     {
+         // That function is fundamentally unsafe because the scores can 
change at any time and so the result of that
+         // method is not stable for identical arguments. This is why we don't 
rely on super.sortByProximity() in
+         // sortByProximityWithScore().
+         throw new UnsupportedOperationException("You shouldn't wrap the 
DynamicEndpointSnitch (within itself or otherwise)");
+     }
+ 
      public void receiveTiming(InetAddress host, long latency) // this is cheap
      {
 -        ExponentiallyDecayingSample sample = samples.get(host);
 +        ExponentiallyDecayingReservoir sample = samples.get(host);
          if (sample == null)
          {
 -            ExponentiallyDecayingSample maybeNewSample = new 
ExponentiallyDecayingSample(WINDOW_SIZE, ALPHA);
 +            ExponentiallyDecayingReservoir maybeNewSample = new 
ExponentiallyDecayingReservoir(WINDOW_SIZE, ALPHA);
              sample = samples.putIfAbsent(host, maybeNewSample);
              if (sample == null)
                  sample = maybeNewSample;
@@@ -265,10 -286,10 +287,9 @@@
          scores = newScores;
      }
  
- 
      private void reset()
      {
 -        for (ExponentiallyDecayingSample sample : samples.values())
 -            sample.clear();
 +       samples.clear();
      }
  
      public Map<InetAddress, Double> getScores()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c33ebcd2/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java
index c7c1f17,3f90532..64da6d3
--- a/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java
+++ b/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java
@@@ -90,4 -90,67 +90,67 @@@ public class DynamicEndpointSnitchTes
          order = Arrays.asList(host1, host3, host2);
          assertEquals(order, dsnitch.getSortedListByProximity(self, 
Arrays.asList(host1, host2, host3)));
      }
- }
+ 
+     @Test
+     public void testConcurrency() throws InterruptedException, IOException, 
ConfigurationException
+     {
+         // The goal of this test is to check for CASSANDRA-8448/CASSANDRA-9519
+         double badness = DatabaseDescriptor.getDynamicBadnessThreshold();
+         DatabaseDescriptor.setDynamicBadnessThreshold(0.0);
+ 
+         final int ITERATIONS = 10;
+ 
+         // do this because SS needs to be initialized before DES can work 
properly.
 -        StorageService.instance.initClient(0);
++        StorageService.instance.unsafeInitialize();
+         SimpleSnitch ss = new SimpleSnitch();
+         DynamicEndpointSnitch dsnitch = new DynamicEndpointSnitch(ss, 
String.valueOf(ss.hashCode()));
+         InetAddress self = FBUtilities.getBroadcastAddress();
+ 
+         List<InetAddress> hosts = new ArrayList<>();
+         // We want a giant list of hosts so that sorting it takes time, 
making it much more likely to reproduce the
+         // problem we're looking for.
+         for (int i = 0; i < 10; i++)
+             for (int j = 0; j < 256; j++)
+                 for (int k = 0; k < 256; k++)
+                     hosts.add(InetAddress.getByAddress(new byte[]{127, 
(byte)i, (byte)j, (byte)k}));
+ 
+         ScoreUpdater updater = new ScoreUpdater(dsnitch, hosts);
+         updater.start();
+ 
+         List<InetAddress> result = null;
+         for (int i = 0; i < ITERATIONS; i++)
+             result = dsnitch.getSortedListByProximity(self, hosts);
+ 
+         updater.stopped = true;
+         updater.join();
+ 
+         DatabaseDescriptor.setDynamicBadnessThreshold(badness);
+     }
+ 
+     public static class ScoreUpdater extends Thread
+     {
+         private static final int SCORE_RANGE = 100;
+ 
+         public volatile boolean stopped;
+ 
+         private final DynamicEndpointSnitch dsnitch;
+         private final List<InetAddress> hosts;
+         private final Random random = new Random();
+ 
+         public ScoreUpdater(DynamicEndpointSnitch dsnitch, List<InetAddress> 
hosts)
+         {
+             this.dsnitch = dsnitch;
+             this.hosts = hosts;
+         }
+ 
+         public void run()
+         {
+             while (!stopped)
+             {
+                 InetAddress host = hosts.get(random.nextInt(hosts.size()));
+                 int score = random.nextInt(SCORE_RANGE);
+                 dsnitch.receiveTiming(host, score);
+             }
+         }
+     }
+ }

Reply via email to