Merge branch 'cassandra-2.1' into cassandra-2.2

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7452b205
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7452b205
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7452b205

Branch: refs/heads/cassandra-2.2
Commit: 7452b20503c376c9ea15fdfac8da0c78381b3f73
Parents: fc67545 f6cab37
Author: Tyler Hobbs <tylerlho...@gmail.com>
Authored: Wed Sep 30 10:49:42 2015 -0500
Committer: Tyler Hobbs <tylerlho...@gmail.com>
Committed: Wed Sep 30 10:49:42 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../org/apache/cassandra/transport/Event.java   | 29 +++++---
 .../org/apache/cassandra/transport/Server.java  | 71 +++++++++++++-------
 3 files changed, 70 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7452b205/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index c36c6f5,0ad2b36..45070b2
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,16 -1,6 +1,18 @@@
 -2.1.10
 +2.2.2
 + * cqlsh prompt includes name of keyspace after failed `use` statement 
(CASSANDRA-10369)
 + * Configurable page size in cqlsh (CASSANDRA-9855)
 + * Defer default role manager setup until all nodes are on 2.2+ 
(CASSANDRA-9761)
 + * Cancel transaction for sstables we wont redistribute index summary
 +   for (CASSANDRA-10270)
 + * Handle missing RoleManager in config after upgrade to 2.2 
(CASSANDRA-10209) 
 + * Retry snapshot deletion after compaction and gc on Windows 
(CASSANDRA-10222)
 + * Fix failure to start with space in directory path on Windows 
(CASSANDRA-10239)
 + * Fix repair hang when snapshot failed (CASSANDRA-10057)
 + * Fall back to 1/4 commitlog volume for commitlog_total_space on small disks
 +   (CASSANDRA-10199)
 +Merged from 2.1:
+  * Avoid misleading pushed notifications when multiple nodes
+    share an rpc_address (CASSANDRA-10052)
   * Fix dropping undroppable when message queue is full (CASSANDRA-10113)
   * Fix potential ClassCastException during paging (CASSANDRA-10352)
   * Prevent ALTER TYPE from creating circular references (CASSANDRA-10339)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7452b205/src/java/org/apache/cassandra/transport/Event.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7452b205/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/transport/Server.java
index 72a1b60,c21a669..d610bff
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@@ -55,6 -50,12 +55,7 @@@ import org.apache.cassandra.metrics.Cli
  import org.apache.cassandra.security.SSLFactory;
  import org.apache.cassandra.service.*;
  import org.apache.cassandra.transport.messages.EventMessage;
 -import io.netty.bootstrap.ServerBootstrap;
 -import io.netty.channel.*;
 -import io.netty.channel.group.ChannelGroup;
 -import io.netty.channel.group.DefaultChannelGroup;
 -import io.netty.handler.ssl.SslHandler;
+ import org.apache.cassandra.utils.FBUtilities;
  
  public class Server implements CassandraDaemon.Server
  {
@@@ -409,9 -382,28 +410,31 @@@
              }
          }
  
+         private void send(InetAddress endpoint, Event.NodeEvent event)
+         {
++            if (logger.isTraceEnabled())
++                logger.trace("Sending event for endpoint {}, rpc address {}", 
endpoint, event.nodeAddress());
++
+             // If the endpoint is not the local node, extract the node address
+             // and if it is the same as our own RPC broadcast address (which 
defaults to the rcp address)
+             // then don't send the notification. This covers the case of 
rpc_address set to "localhost",
+             // which is not useful to any driver and in fact may cauase 
serious problems to some drivers,
+             // see CASSANDRA-10052
+             if (!endpoint.equals(FBUtilities.getBroadcastAddress()) &&
+                 
event.nodeAddress().equals(DatabaseDescriptor.getBroadcastRpcAddress()))
+                 return;
+ 
+             send(event);
+         }
+ 
+         private void send(Event event)
+         {
+             server.connectionTracker.send(event);
+         }
+ 
          public void onJoinCluster(InetAddress endpoint)
          {
 -            send(endpoint, 
Event.TopologyChange.newNode(getRpcAddress(endpoint), server.socket.getPort()));
 +            onTopologyChange(endpoint, 
Event.TopologyChange.newNode(getRpcAddress(endpoint), server.socket.getPort()));
          }
  
          public void onLeaveCluster(InetAddress endpoint)
@@@ -431,35 -425,9 +454,35 @@@
  
          public void onDown(InetAddress endpoint)
          {
 -            Event.StatusChange.Status prev = lastStatusChange.put(endpoint, 
Event.StatusChange.Status.DOWN);
 -            if (prev == null || prev != Event.StatusChange.Status.DOWN)
 -                send(endpoint, 
Event.StatusChange.nodeDown(getRpcAddress(endpoint), server.socket.getPort()));
 +            onStatusChange(endpoint, 
Event.StatusChange.nodeDown(getRpcAddress(endpoint), server.socket.getPort()));
 +        }
 +
 +        private void onTopologyChange(InetAddress endpoint, 
Event.TopologyChange event)
 +        {
 +            if (logger.isTraceEnabled())
 +                logger.trace("Topology changed event : {}, {}", endpoint, 
event.change);
 +
 +            LatestEvent prev = latestEvents.get(endpoint);
 +            if (prev == null || prev.topology != event.change)
 +            {
 +                LatestEvent ret = latestEvents.put(endpoint, 
LatestEvent.forTopologyChange(event.change, prev));
 +                if (ret == prev)
-                     server.connectionTracker.send(event);
++                    send(endpoint, event);
 +            }
 +        }
 +
 +        private void onStatusChange(InetAddress endpoint, Event.StatusChange 
event)
 +        {
 +            if (logger.isTraceEnabled())
 +                logger.trace("Status changed event : {}, {}", endpoint, 
event.status);
 +
 +            LatestEvent prev = latestEvents.get(endpoint);
 +            if (prev == null || prev.status != event.status)
 +            {
-                 LatestEvent ret = latestEvents.put(endpoint, 
LatestEvent.forStatusChange(event.status, prev));
++                LatestEvent ret = latestEvents.put(endpoint, 
LatestEvent.forStatusChange(event.status, null));
 +                if (ret == prev)
-                     server.connectionTracker.send(event);
++                    send(endpoint, event);
 +            }
          }
  
          public void onCreateKeyspace(String ksName)
@@@ -474,24 -442,12 +497,24 @@@
  
          public void onCreateUserType(String ksName, String typeName)
          {
-             server.connectionTracker.send(new 
Event.SchemaChange(Event.SchemaChange.Change.CREATED, 
Event.SchemaChange.Target.TYPE, ksName, typeName));
+             send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, 
Event.SchemaChange.Target.TYPE, ksName, typeName));
          }
  
 +        public void onCreateFunction(String ksName, String functionName, 
List<AbstractType<?>> argTypes)
 +        {
-             server.connectionTracker.send(new 
Event.SchemaChange(Event.SchemaChange.Change.CREATED, 
Event.SchemaChange.Target.FUNCTION,
-                                                                  ksName, 
functionName, AbstractType.asCQLTypeStringList(argTypes)));
++            send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, 
Event.SchemaChange.Target.FUNCTION,
++                                        ksName, functionName, 
AbstractType.asCQLTypeStringList(argTypes)));
 +        }
 +
 +        public void onCreateAggregate(String ksName, String aggregateName, 
List<AbstractType<?>> argTypes)
 +        {
-             server.connectionTracker.send(new 
Event.SchemaChange(Event.SchemaChange.Change.CREATED, 
Event.SchemaChange.Target.AGGREGATE,
-                                                                  ksName, 
aggregateName, AbstractType.asCQLTypeStringList(argTypes)));
++            send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, 
Event.SchemaChange.Target.AGGREGATE,
++                                        ksName, aggregateName, 
AbstractType.asCQLTypeStringList(argTypes)));
 +        }
 +
          public void onUpdateKeyspace(String ksName)
          {
-             server.connectionTracker.send(new 
Event.SchemaChange(Event.SchemaChange.Change.UPDATED, ksName));
+             send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, 
ksName));
          }
  
          public void onUpdateColumnFamily(String ksName, String cfName, 
boolean columnsDidChange)
@@@ -501,24 -457,12 +524,24 @@@
  
          public void onUpdateUserType(String ksName, String typeName)
          {
-             server.connectionTracker.send(new 
Event.SchemaChange(Event.SchemaChange.Change.UPDATED, 
Event.SchemaChange.Target.TYPE, ksName, typeName));
+             send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, 
Event.SchemaChange.Target.TYPE, ksName, typeName));
          }
  
 +        public void onUpdateFunction(String ksName, String functionName, 
List<AbstractType<?>> argTypes)
 +        {
-             server.connectionTracker.send(new 
Event.SchemaChange(Event.SchemaChange.Change.UPDATED, 
Event.SchemaChange.Target.FUNCTION,
-                                                                  ksName, 
functionName, AbstractType.asCQLTypeStringList(argTypes)));
++            send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, 
Event.SchemaChange.Target.FUNCTION,
++                                        ksName, functionName, 
AbstractType.asCQLTypeStringList(argTypes)));
 +        }
 +
 +        public void onUpdateAggregate(String ksName, String aggregateName, 
List<AbstractType<?>> argTypes)
 +        {
-             server.connectionTracker.send(new 
Event.SchemaChange(Event.SchemaChange.Change.UPDATED, 
Event.SchemaChange.Target.AGGREGATE,
-                                                                  ksName, 
aggregateName, AbstractType.asCQLTypeStringList(argTypes)));
++            send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, 
Event.SchemaChange.Target.AGGREGATE,
++                                        ksName, aggregateName, 
AbstractType.asCQLTypeStringList(argTypes)));
 +        }
 +
          public void onDropKeyspace(String ksName)
          {
-             server.connectionTracker.send(new 
Event.SchemaChange(Event.SchemaChange.Change.DROPPED, ksName));
+             send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, 
ksName));
          }
  
          public void onDropColumnFamily(String ksName, String cfName)
@@@ -528,19 -472,7 +551,19 @@@
  
          public void onDropUserType(String ksName, String typeName)
          {
-             server.connectionTracker.send(new 
Event.SchemaChange(Event.SchemaChange.Change.DROPPED, 
Event.SchemaChange.Target.TYPE, ksName, typeName));
+             send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, 
Event.SchemaChange.Target.TYPE, ksName, typeName));
          }
 +
 +        public void onDropFunction(String ksName, String functionName, 
List<AbstractType<?>> argTypes)
 +        {
-             server.connectionTracker.send(new 
Event.SchemaChange(Event.SchemaChange.Change.DROPPED, 
Event.SchemaChange.Target.FUNCTION,
-                                                                  ksName, 
functionName, AbstractType.asCQLTypeStringList(argTypes)));
++            send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, 
Event.SchemaChange.Target.FUNCTION,
++                                        ksName, functionName, 
AbstractType.asCQLTypeStringList(argTypes)));
 +        }
 +
 +        public void onDropAggregate(String ksName, String aggregateName, 
List<AbstractType<?>> argTypes)
 +        {
-             server.connectionTracker.send(new 
Event.SchemaChange(Event.SchemaChange.Change.DROPPED, 
Event.SchemaChange.Target.AGGREGATE,
-                                                                  ksName, 
aggregateName, AbstractType.asCQLTypeStringList(argTypes)));
++            send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, 
Event.SchemaChange.Target.AGGREGATE,
++                                        ksName, aggregateName, 
AbstractType.asCQLTypeStringList(argTypes)));
 +        }
      }
  }

Reply via email to