Allow storage port to be configurable per node

Patch by Ariel Weisberg; Reviewed by Jason Brown for CASSANDRA-7544


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/59b5b6be
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/59b5b6be
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/59b5b6be

Branch: refs/heads/trunk
Commit: 59b5b6bef0fa76bf5740b688fcd4d9cf525760d0
Parents: 4de7a65
Author: Ariel Weisberg <aweisb...@apple.com>
Authored: Thu Nov 9 11:33:48 2017 -0500
Committer: Ariel Weisberg <aweisb...@apple.com>
Committed: Thu Jan 25 14:32:24 2018 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |  18 +-
 bin/cqlsh.py                                    |  12 +-
 build.xml                                       |  10 +-
 conf/cassandra.yaml                             |   9 +-
 ...iver-core-3.3.2-0461ed35-SNAPSHOT-shaded.jar | Bin 2618371 -> 0 bytes
 ...sandra-driver-core-4.0.0-SNAPSHOT-shaded.jar | Bin 0 -> 2621460 bytes
 lib/cassandra-driver-internal-only-3.11.zip     | Bin 264882 -> 0 bytes
 ...iver-internal-only-3.12.0.post0-9ee88ded.zip | Bin 0 -> 265110 bytes
 .../cassandra/batchlog/BatchlogManager.java     |  52 +-
 .../cassandra/config/DatabaseDescriptor.java    |  27 +-
 .../apache/cassandra/db/ConsistencyLevel.java   |  40 +-
 .../db/CounterMutationVerbHandler.java          |   2 +-
 .../cassandra/db/DiskBoundaryManager.java       |   4 +-
 src/java/org/apache/cassandra/db/Keyspace.java  |   1 +
 src/java/org/apache/cassandra/db/Mutation.java  |   3 -
 .../cassandra/db/MutationVerbHandler.java       |  43 +-
 .../cassandra/db/SizeEstimatesRecorder.java     |   4 +-
 .../org/apache/cassandra/db/SystemKeyspace.java | 522 +++++++-----
 .../cassandra/db/SystemKeyspaceMigrator40.java  | 184 +++++
 .../org/apache/cassandra/db/view/ViewUtils.java |  18 +-
 .../org/apache/cassandra/dht/BootStrapper.java  |  12 +-
 .../cassandra/dht/RangeFetchMapCalculator.java  |  37 +-
 .../org/apache/cassandra/dht/RangeStreamer.java |  96 +--
 .../dht/tokenallocator/TokenAllocation.java     |  50 +-
 .../tokenallocator/TokenAllocatorFactory.java   |   8 +-
 .../exceptions/ReadFailureException.java        |   4 +-
 .../exceptions/RequestFailureException.java     |   6 +-
 .../exceptions/WriteFailureException.java       |   4 +-
 .../apache/cassandra/gms/ApplicationState.java  |  11 +-
 .../org/apache/cassandra/gms/EndpointState.java |   8 +-
 .../apache/cassandra/gms/FailureDetector.java   |  74 +-
 .../cassandra/gms/FailureDetectorMBean.java     |   9 +-
 .../org/apache/cassandra/gms/GossipDigest.java  |  14 +-
 .../apache/cassandra/gms/GossipDigestAck.java   |  22 +-
 .../apache/cassandra/gms/GossipDigestAck2.java  |  22 +-
 .../gms/GossipDigestAck2VerbHandler.java        |   6 +-
 .../gms/GossipDigestAckVerbHandler.java         |  10 +-
 .../gms/GossipDigestSynVerbHandler.java         |  12 +-
 src/java/org/apache/cassandra/gms/Gossiper.java | 274 ++++---
 .../gms/IEndpointStateChangeSubscriber.java     |  16 +-
 .../gms/IFailureDetectionEventListener.java     |   4 +-
 .../apache/cassandra/gms/IFailureDetector.java  |  12 +-
 .../apache/cassandra/gms/VersionedValue.java    |  17 +
 .../apache/cassandra/hadoop/ConfigHelper.java   |  12 +
 .../hadoop/cql3/CqlBulkRecordWriter.java        |  20 +-
 .../cassandra/hadoop/cql3/CqlConfigHelper.java  |  12 +
 .../apache/cassandra/hints/HintVerbHandler.java |   6 +-
 .../cassandra/hints/HintsDispatchExecutor.java  |  12 +-
 .../cassandra/hints/HintsDispatchTrigger.java   |   2 +-
 .../apache/cassandra/hints/HintsDispatcher.java |  10 +-
 .../apache/cassandra/hints/HintsService.java    |   8 +-
 .../org/apache/cassandra/hints/HintsStore.java  |   6 +-
 .../io/DummyByteVersionedSerializer.java        |  55 ++
 .../cassandra/io/ShortVersionedSerializer.java  |  47 ++
 .../cassandra/io/sstable/SSTableLoader.java     |  31 +-
 .../locator/AbstractEndpointSnitch.java         |  19 +-
 .../locator/AbstractNetworkTopologySnitch.java  |   8 +-
 .../locator/AbstractReplicationStrategy.java    |  41 +-
 .../cassandra/locator/CloudstackSnitch.java     |  11 +-
 .../locator/DynamicEndpointSnitch.java          |  64 +-
 .../locator/DynamicEndpointSnitchMBean.java     |   2 +
 .../cassandra/locator/Ec2MultiRegionSnitch.java |  11 +
 .../org/apache/cassandra/locator/Ec2Snitch.java |  11 +-
 .../cassandra/locator/EndpointSnitchInfo.java   |   9 +-
 .../cassandra/locator/GoogleCloudSnitch.java    |  11 +-
 .../locator/GossipingPropertyFileSnitch.java    |  15 +-
 .../cassandra/locator/IEndpointSnitch.java      |  13 +-
 .../cassandra/locator/ILatencySubscriber.java   |   4 +-
 .../cassandra/locator/InetAddressAndPort.java   | 203 +++++
 .../apache/cassandra/locator/LocalStrategy.java |  11 +-
 .../locator/NetworkTopologyStrategy.java        |  23 +-
 .../locator/OldNetworkTopologyStrategy.java     |   5 +-
 .../cassandra/locator/PendingRangeMaps.java     |  57 +-
 .../cassandra/locator/PropertyFileSnitch.java   |  29 +-
 .../cassandra/locator/RackInferringSnitch.java  |  10 +-
 .../locator/ReconnectableSnitchHelper.java      |  60 +-
 .../apache/cassandra/locator/SeedProvider.java  |   3 +-
 .../cassandra/locator/SimpleSeedProvider.java   |   8 +-
 .../apache/cassandra/locator/SimpleSnitch.java  |   9 +-
 .../cassandra/locator/SimpleStrategy.java       |   7 +-
 .../apache/cassandra/locator/TokenMetadata.java | 221 +++--
 .../cassandra/metrics/ConnectionMetrics.java    |   8 +-
 .../cassandra/metrics/HintedHandoffMetrics.java |  25 +-
 .../cassandra/metrics/HintsServiceMetrics.java  |  11 +-
 .../cassandra/metrics/MessagingMetrics.java     |   4 +-
 .../cassandra/metrics/StreamingMetrics.java     |  11 +-
 .../apache/cassandra/net/BackPressureState.java |   4 +-
 .../cassandra/net/BackPressureStrategy.java     |   5 +-
 .../org/apache/cassandra/net/CallbackInfo.java  |   7 +-
 .../net/CompactEndpointSerializationHelper.java | 108 ++-
 .../cassandra/net/ForwardToContainer.java       |  43 +
 .../cassandra/net/ForwardToSerializer.java      |  86 ++
 .../apache/cassandra/net/IAsyncCallback.java    |   7 +-
 .../net/IAsyncCallbackWithFailure.java          |   5 +-
 .../org/apache/cassandra/net/IMessageSink.java  |   4 +-
 .../cassandra/net/MessageDeliveryTask.java      |  14 +-
 .../org/apache/cassandra/net/MessageIn.java     |  74 +-
 .../org/apache/cassandra/net/MessageOut.java    |  81 +-
 .../apache/cassandra/net/MessagingService.java  | 271 ++++--
 .../cassandra/net/MessagingServiceMBean.java    |  26 +
 .../org/apache/cassandra/net/ParameterType.java |  69 ++
 .../cassandra/net/RateBasedBackPressure.java    |   4 +-
 .../net/RateBasedBackPressureState.java         |   8 +-
 .../apache/cassandra/net/WriteCallbackInfo.java |   7 +-
 .../cassandra/net/async/HandshakeProtocol.java  |  18 +-
 .../net/async/InboundHandshakeHandler.java      |   7 +-
 .../cassandra/net/async/MessageInHandler.java   |  21 +-
 .../cassandra/net/async/MessageOutHandler.java  |  12 +-
 .../cassandra/net/async/NettyFactory.java       |  15 +-
 .../net/async/OutboundConnectionIdentifier.java |  48 +-
 .../net/async/OutboundMessagingConnection.java  |  13 +-
 .../net/async/OutboundMessagingPool.java        |  15 +-
 .../repair/AsymmetricLocalSyncTask.java         |   8 +-
 .../repair/AsymmetricRemoteSyncTask.java        |   6 +-
 .../cassandra/repair/AsymmetricSyncTask.java    |   8 +-
 .../apache/cassandra/repair/LocalSyncTask.java  |  13 +-
 .../org/apache/cassandra/repair/NodePair.java   |  25 +-
 .../apache/cassandra/repair/RemoteSyncTask.java |   4 +-
 .../org/apache/cassandra/repair/RepairJob.java  |  56 +-
 .../repair/RepairMessageVerbHandler.java        |   7 +-
 .../apache/cassandra/repair/RepairRunnable.java |  31 +-
 .../apache/cassandra/repair/RepairSession.java  |  38 +-
 .../apache/cassandra/repair/SnapshotTask.java   |  10 +-
 .../cassandra/repair/StreamingRepairTask.java   |  17 +-
 .../repair/SystemDistributedKeyspace.java       |  98 ++-
 .../apache/cassandra/repair/TreeResponse.java   |   7 +-
 .../apache/cassandra/repair/ValidationTask.java |   7 +-
 .../org/apache/cassandra/repair/Validator.java  |  12 +-
 .../repair/asymmetric/DifferenceHolder.java     |  16 +-
 .../repair/asymmetric/HostDifferences.java      |  14 +-
 .../asymmetric/IncomingRepairStreamTracker.java |   4 +-
 .../repair/asymmetric/PreferedNodeFilter.java   |   5 +-
 .../repair/asymmetric/ReduceHelper.java         |  42 +-
 .../repair/asymmetric/StreamFromOptions.java    |  16 +-
 .../repair/consistent/ConsistentSession.java    |  21 +-
 .../repair/consistent/CoordinatorSession.java   |  24 +-
 .../repair/consistent/CoordinatorSessions.java  |   4 +-
 .../repair/consistent/LocalSessionInfo.java     |   6 +-
 .../repair/consistent/LocalSessions.java        |  82 +-
 .../repair/consistent/SyncStatSummary.java      |  14 +-
 .../repair/messages/AsymmetricSyncRequest.java  |  25 +-
 .../repair/messages/FinalizePromise.java        |  18 +-
 .../messages/PrepareConsistentRequest.java      |  31 +-
 .../messages/PrepareConsistentResponse.java     |  17 +-
 .../cassandra/repair/messages/SyncComplete.java |   4 +-
 .../cassandra/repair/messages/SyncRequest.java  |  26 +-
 .../cassandra/schema/MigrationManager.java      |  24 +-
 .../apache/cassandra/schema/MigrationTask.java  |   6 +-
 .../cassandra/service/AbstractReadExecutor.java |  42 +-
 .../service/AbstractWriteResponseHandler.java   |  16 +-
 .../cassandra/service/ActiveRepairService.java  |  64 +-
 .../service/BatchlogResponseHandler.java        |   6 +-
 .../cassandra/service/CassandraDaemon.java      |   7 +-
 .../apache/cassandra/service/ClientState.java   |   2 +-
 .../apache/cassandra/service/DataResolver.java  |  18 +-
 .../DatacenterSyncWriteResponseHandler.java     |   8 +-
 .../service/DatacenterWriteResponseHandler.java |   8 +-
 .../service/IEndpointLifecycleSubscriber.java   |  12 +-
 .../cassandra/service/LoadBroadcaster.java      |  20 +-
 .../apache/cassandra/service/ReadCallback.java  |  20 +-
 .../apache/cassandra/service/StartupChecks.java |   4 +-
 .../apache/cassandra/service/StorageProxy.java  | 320 ++++----
 .../cassandra/service/StorageProxyMBean.java    |   3 +-
 .../cassandra/service/StorageService.java       | 818 ++++++++++++-------
 .../cassandra/service/StorageServiceMBean.java  |  57 +-
 .../apache/cassandra/service/TokenRange.java    |  50 +-
 .../cassandra/service/WriteResponseHandler.java |  12 +-
 .../service/paxos/PrepareCallback.java          |  10 +-
 .../cassandra/streaming/ProgressInfo.java       |  14 +-
 .../apache/cassandra/streaming/SessionInfo.java |  12 +-
 .../cassandra/streaming/SessionSummary.java     |  23 +-
 .../cassandra/streaming/StreamCoordinator.java  |  24 +-
 .../apache/cassandra/streaming/StreamEvent.java |   4 +-
 .../cassandra/streaming/StreamManager.java      |  10 +-
 .../apache/cassandra/streaming/StreamPlan.java  |  16 +-
 .../cassandra/streaming/StreamResultFuture.java |  15 +-
 .../cassandra/streaming/StreamSession.java      |  43 +-
 .../async/NettyStreamingMessageSender.java      |   4 +-
 .../async/StreamingInboundHandler.java          |  10 +-
 .../management/ProgressInfoCompositeData.java   |  30 +-
 .../SessionCompleteEventCompositeData.java      |   8 +-
 .../management/SessionInfoCompositeData.java    |  44 +-
 .../streaming/messages/FileMessageHeader.java   |  14 +-
 .../streaming/messages/OutgoingFileMessage.java |   2 +-
 .../streaming/messages/StreamInitMessage.java   |  12 +-
 .../tools/BulkLoadConnectionFactory.java        |  10 +-
 .../org/apache/cassandra/tools/BulkLoader.java  |  27 +-
 .../apache/cassandra/tools/LoaderOptions.java   |  77 +-
 .../org/apache/cassandra/tools/NodeProbe.java   |  63 +-
 .../org/apache/cassandra/tools/NodeTool.java    |  26 +
 .../tools/nodetool/DescribeCluster.java         |   4 +-
 .../cassandra/tools/nodetool/DescribeRing.java  |   4 +-
 .../tools/nodetool/FailureDetectorInfo.java     |   2 +-
 .../cassandra/tools/nodetool/GetEndpoints.java  |  18 +-
 .../cassandra/tools/nodetool/GossipInfo.java    |   4 +-
 .../tools/nodetool/HostStatWithPort.java        |  44 +
 .../cassandra/tools/nodetool/NetStats.java      |  10 +-
 .../cassandra/tools/nodetool/RemoveNode.java    |   6 +-
 .../cassandra/tools/nodetool/RepairAdmin.java   |   6 +-
 .../apache/cassandra/tools/nodetool/Ring.java   | 211 +++--
 .../tools/nodetool/SetHostStatWithPort.java     |  56 ++
 .../apache/cassandra/tools/nodetool/Status.java | 200 +++--
 .../cassandra/tracing/ExpiredTraceState.java    |   2 +-
 .../apache/cassandra/tracing/TraceKeyspace.java |  48 +-
 .../apache/cassandra/tracing/TraceState.java    |   6 +-
 .../cassandra/tracing/TraceStateImpl.java       |   4 +-
 .../org/apache/cassandra/tracing/Tracing.java   |  52 +-
 .../apache/cassandra/tracing/TracingImpl.java   |   3 +-
 .../org/apache/cassandra/transport/Event.java   |  21 +-
 .../cassandra/transport/ProtocolVersion.java    |   9 +-
 .../org/apache/cassandra/transport/Server.java  |  60 +-
 .../transport/messages/ErrorMessage.java        |  13 +-
 .../org/apache/cassandra/utils/FBUtilities.java |  53 +-
 .../apache/cassandra/utils/JMXServerUtils.java  |   2 +-
 .../org/apache/cassandra/utils/Mx4jTool.java    |   2 +-
 .../utils/NativeSSTableLoaderClient.java        |  46 +-
 .../org/apache/cassandra/utils/UUIDGen.java     |   2 +-
 test/conf/cassandra.yaml                        |   3 +-
 .../serialization/4.0/gms.EndpointState.bin     | Bin 73 -> 73 bytes
 test/data/serialization/4.0/gms.Gossip.bin      | Bin 158 -> 166 bytes
 .../serialization/4.0/service.SyncComplete.bin  | Bin 538 -> 554 bytes
 .../serialization/4.0/service.SyncRequest.bin   | Bin 227 -> 241 bytes
 .../4.0/service.ValidationComplete.bin          | Bin 1251 -> 1257 bytes
 .../4.0/service.ValidationRequest.bin           | Bin 167 -> 169 bytes
 .../locator/DynamicEndpointSnitchLongTest.java  |  15 +-
 .../cassandra/streaming/LongStreamingTest.java  |   4 +-
 .../test/microbench/PendingRangesBench.java     |  10 +-
 .../OffsetAwareConfigurationLoader.java         |  32 +
 test/unit/org/apache/cassandra/Util.java        |   9 +-
 .../batchlog/BatchlogEndpointFilterTest.java    |  84 +-
 .../cassandra/batchlog/BatchlogManagerTest.java |   6 +-
 .../config/DatabaseDescriptorRefTest.java       |   2 +
 .../org/apache/cassandra/cql3/CQLTester.java    |  12 +-
 .../cassandra/cql3/PreparedStatementsTest.java  |   1 -
 .../apache/cassandra/cql3/ViewComplexTest.java  |   8 +-
 .../cql3/validation/operations/CreateTest.java  |   8 +-
 .../org/apache/cassandra/db/CleanupTest.java    |  22 +-
 .../cassandra/db/DiskBoundaryManagerTest.java   |   6 +-
 .../apache/cassandra/db/ReadCommandTest.java    |  12 +-
 .../org/apache/cassandra/db/RowCacheTest.java   |   6 +-
 .../db/SystemKeyspaceMigrator40Test.java        | 192 +++++
 .../apache/cassandra/db/SystemKeyspaceTest.java |   6 +-
 .../compaction/AbstractPendingRepairTest.java   |   4 +-
 .../db/compaction/AntiCompactionTest.java       |   4 +-
 ...tionManagerGetSSTablesForValidationTest.java |   6 +-
 .../LeveledCompactionStrategyTest.java          |   4 +-
 .../apache/cassandra/db/view/ViewUtilsTest.java |  48 +-
 .../apache/cassandra/dht/BootStrapperTest.java  |  34 +-
 .../dht/RangeFetchMapCalculatorTest.java        | 116 +--
 .../cassandra/dht/StreamStateStoreTest.java     |   4 +-
 .../apache/cassandra/gms/ArrivalWindowTest.java |  15 +-
 .../apache/cassandra/gms/EndpointStateTest.java |   8 +-
 .../cassandra/gms/FailureDetectorTest.java      |   8 +-
 .../apache/cassandra/gms/GossipDigestTest.java  |   5 +-
 .../org/apache/cassandra/gms/GossiperTest.java  |   6 +-
 .../gms/PendingRangeCalculatorServiceTest.java  |   8 +-
 .../cassandra/gms/SerializationsTest.java       |  12 +-
 .../org/apache/cassandra/hints/HintTest.java    |  12 +-
 .../cassandra/hints/HintsServiceTest.java       |  14 +-
 .../io/sstable/CQLSSTableWriterTest.java        |   2 +-
 .../cassandra/io/sstable/LegacySSTableTest.java |   2 +-
 .../cassandra/io/sstable/SSTableLoaderTest.java |   2 +-
 .../cassandra/locator/CloudstackSnitchTest.java |   7 +-
 .../locator/DynamicEndpointSnitchTest.java      |  17 +-
 .../apache/cassandra/locator/EC2SnitchTest.java |   8 +-
 .../locator/GoogleCloudSnitchTest.java          |   7 +-
 .../GossipingPropertyFileSnitchTest.java        |  15 +-
 .../locator/InetAddressAndPortTest.java         | 143 ++++
 .../locator/NetworkTopologyStrategyTest.java    |  73 +-
 .../locator/OldNetworkTopologyStrategyTest.java |  21 +-
 .../cassandra/locator/PendingRangeMapsTest.java |  37 +-
 .../locator/PropertyFileSnitchTest.java         | 108 +--
 .../locator/ReconnectableSnitchHelperTest.java  |   2 +-
 .../ReplicationStrategyEndpointCacheTest.java   |  35 +-
 .../cassandra/locator/SimpleStrategyTest.java   |  19 +-
 .../cassandra/locator/TokenMetadataTest.java    |  49 +-
 .../metrics/HintedHandOffMetricsTest.java       |  10 +-
 .../CompactEndpointSerializationHelperTest.java |  72 ++
 .../cassandra/net/ForwardToContainerTest.java   | 100 +++
 test/unit/org/apache/cassandra/net/Matcher.java |   4 +-
 .../apache/cassandra/net/MatcherResponse.java   |  11 +-
 .../cassandra/net/MessagingServiceTest.java     |  82 +-
 .../cassandra/net/MockMessagingService.java     |  15 +-
 .../cassandra/net/MockMessagingServiceTest.java |   6 +-
 .../net/RateBasedBackPressureTest.java          |  42 +-
 .../cassandra/net/WriteCallbackInfoTest.java    |   4 +-
 .../cassandra/net/async/ChannelWriterTest.java  |   6 +-
 .../net/async/HandshakeHandlersTest.java        |   8 +-
 .../net/async/HandshakeProtocolTest.java        |   2 +-
 .../net/async/InboundHandshakeHandlerTest.java  |  18 +-
 .../net/async/MessageInHandlerTest.java         |  39 +-
 .../net/async/MessageOutHandlerTest.java        |  16 +-
 .../cassandra/net/async/NettyFactoryTest.java   |  15 +-
 .../net/async/OutboundHandshakeHandlerTest.java |   6 +-
 .../async/OutboundMessagingConnectionTest.java  |  48 +-
 .../net/async/OutboundMessagingPoolTest.java    |  11 +-
 .../cassandra/repair/AbstractRepairTest.java    |  20 +-
 .../cassandra/repair/LocalSyncTaskTest.java     |  12 +-
 .../cassandra/repair/RepairRunnableTest.java    |   3 +-
 .../cassandra/repair/RepairSessionTest.java     |   7 +-
 .../apache/cassandra/repair/ValidatorTest.java  |  12 +-
 .../repair/asymmetric/DifferenceHolderTest.java |  10 +-
 .../repair/asymmetric/ReduceHelperTest.java     |  74 +-
 .../asymmetric/StreamFromOptionsTest.java       |  36 +-
 .../AbstractConsistentSessionTest.java          |  20 +-
 .../consistent/CoordinatorSessionTest.java      |  30 +-
 .../consistent/CoordinatorSessionsTest.java     |  12 +-
 .../repair/consistent/LocalSessionAccessor.java |   4 +-
 .../repair/consistent/LocalSessionTest.java     |  17 +-
 .../consistent/PendingAntiCompactionTest.java   |   6 +-
 .../RepairMessageSerializationsTest.java        |  12 +-
 .../messages/RepairMessageSerializerTest.java   |  14 +-
 .../service/ActiveRepairServiceTest.java        |  58 +-
 .../cassandra/service/DataResolverTest.java     |  84 +-
 .../service/LeaveAndBootstrapTest.java          |  63 +-
 .../org/apache/cassandra/service/MoveTest.java  | 146 ++--
 .../service/ProtocolBetaVersionTest.java        |   2 +-
 .../cassandra/service/ReadExecutorTest.java     |   6 +-
 .../apache/cassandra/service/RemoveTest.java    |  14 +-
 .../cassandra/service/SerializationsTest.java   |  30 +-
 .../cassandra/service/StorageProxyTest.java     |   6 +-
 .../service/StorageServiceServerTest.java       | 170 ++--
 .../service/WriteResponseHandlerTest.java       |  25 +-
 .../cassandra/streaming/SessionInfoTest.java    |   4 +-
 .../streaming/StreamTransferTaskTest.java       |   6 +-
 .../streaming/StreamingTransferTest.java        |   4 +-
 .../async/NettyStreamingMessageSenderTest.java  |   8 +-
 .../async/StreamingInboundHandlerTest.java      |  12 +-
 .../apache/cassandra/tracing/TracingTest.java   |   3 +-
 .../cassandra/transport/ErrorMessageTest.java   |  20 +-
 .../transport/ProtocolVersionTest.java          |   9 +-
 .../cassandra/transport/SerDeserTest.java       |  12 +-
 .../apache/cassandra/utils/FBUtilitiesTest.java |   8 +-
 .../cassandra/stress/CompactionStress.java      |   4 +-
 .../cassandra/stress/settings/SettingsNode.java |  16 +-
 .../cassandra/stress/util/JavaDriverClient.java |   7 +-
 337 files changed, 6456 insertions(+), 3772 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b29ae78..d8ae88f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Allow storage port to be configurable per node (CASSANDRA-7544)
  * Make sub-range selection for non-frozen collections return null instead of 
empty (CASSANDRA-14182)
  * BloomFilter serialization format should not change byte ordering 
(CASSANDRA-9067)
  * Remove unused on-heap BloomFilter implementation (CASSANDRA-14152)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 06d73ea..c314030 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -39,7 +39,7 @@ New features
      This property can be modified at runtime through both JMX and the new 
`setconcurrentviewbuilders`
      and `getconcurrentviewbuilders` nodetool commands. See CASSANDRA-12245 
for more details.
    - There is now a binary full query log based on Chronicle Queue that can be 
controlled using
-     nodetool enablefullquerylog, disablefullquerylog, and resetfullquerylog. 
The log 
+     nodetool enablefullquerylog, disablefullquerylog, and resetfullquerylog. 
The log
      contains all queries invoked, approximate time they were invoked, any 
parameters necessary
      to bind wildcard values, and all query options. A human readable version 
of the log can be
      dumped or tailed using the new bin/fqltool utility. The full query log is 
designed to be safe
@@ -103,6 +103,22 @@ Upgrading
          but blocks for up to a configurable number of milliseconds between 
disk flushes.
        - nodetool clearsnapshot now required the --all flag to remove all 
snapshots.
          Previous behavior would delete all snapshots by default.
+    - Nodes are now identified by a combination of IP, and storage port.
+      Existing JMX APIs, nodetool, and system tables continue to work
+      and accept/return just an IP, but there is a new
+      version of each that works with the full unambiguous identifier.
+      You should prefer these over the deprecated ambiguous versions that only
+      work with an IP. This was done to support multiple instances per IP.
+      Additionally we are moving to only using a single port for encrypted and
+      unencrypted traffic and if you want multiple instances per IP you must
+      first switch encrypted traffic to the storage port and not a separate
+      encrypted port. If you want to use multiple instances per IP
+      with SSL you will need to use StartTLS on storage_port and set
+      outgoing_encrypted_port_source to gossip outbound connections
+      know what port to connect to for each instance. Before changing
+      storage port or native port at nodes you must first upgrade the entire 
cluster
+      and clients to 4.0 so they can handle the port not being consistent 
across
+      the cluster.
 
 Materialized Views
 -------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/bin/cqlsh.py
----------------------------------------------------------------------
diff --git a/bin/cqlsh.py b/bin/cqlsh.py
index 61ea160..3055110 100644
--- a/bin/cqlsh.py
+++ b/bin/cqlsh.py
@@ -455,7 +455,8 @@ class Shell(cmd.Cmd):
                  single_statement=None,
                  request_timeout=DEFAULT_REQUEST_TIMEOUT_SECONDS,
                  protocol_version=None,
-                 connect_timeout=DEFAULT_CONNECT_TIMEOUT_SECONDS):
+                 connect_timeout=DEFAULT_CONNECT_TIMEOUT_SECONDS,
+                 allow_server_port_discovery=False):
         cmd.Cmd.__init__(self, completekey=completekey)
         self.hostname = hostname
         self.port = port
@@ -470,6 +471,7 @@ class Shell(cmd.Cmd):
         self.tracing_enabled = tracing_enabled
         self.page_size = self.default_page_size
         self.expand_enabled = expand_enabled
+        self.allow_server_port_discovery = allow_server_port_discovery
         if use_conn:
             self.conn = use_conn
         else:
@@ -482,6 +484,7 @@ class Shell(cmd.Cmd):
                                 
load_balancing_policy=WhiteListRoundRobinPolicy([self.hostname]),
                                 control_connection_timeout=connect_timeout,
                                 connect_timeout=connect_timeout,
+                                
allow_server_port_discovery=allow_server_port_discovery,
                                 **kwargs)
         self.owns_connection = not use_conn
 
@@ -1768,7 +1771,8 @@ class Shell(cmd.Cmd):
                          display_timezone=self.display_timezone,
                          max_trace_wait=self.max_trace_wait, ssl=self.ssl,
                          request_timeout=self.session.default_timeout,
-                         connect_timeout=self.conn.connect_timeout)
+                         connect_timeout=self.conn.connect_timeout,
+                         
allow_server_port_discovery=self.allow_server_port_discovery)
         subshell.cmdloop()
         f.close()
 
@@ -2252,6 +2256,7 @@ def read_options(cmdlineargs, environment):
     optvalues.connect_timeout = option_with_default(configs.getint, 
'connection', 'timeout', DEFAULT_CONNECT_TIMEOUT_SECONDS)
     optvalues.request_timeout = option_with_default(configs.getint, 
'connection', 'request_timeout', DEFAULT_REQUEST_TIMEOUT_SECONDS)
     optvalues.execute = None
+    optvalues.allow_server_port_discovery = 
option_with_default(configs.getboolean, 'connection', 
'allow_server_port_discovery', 'False')
 
     (options, arguments) = parser.parse_args(cmdlineargs, values=optvalues)
 
@@ -2415,7 +2420,8 @@ def main(options, hostname, port):
                       single_statement=options.execute,
                       request_timeout=options.request_timeout,
                       connect_timeout=options.connect_timeout,
-                      encoding=options.encoding)
+                      encoding=options.encoding,
+                      
allow_server_port_discovery=options.allow_server_port_discovery)
     except KeyboardInterrupt:
         sys.exit('Connection aborted.')
     except CQL_ERRORS, e:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 8a033e0..5796868 100644
--- a/build.xml
+++ b/build.xml
@@ -306,7 +306,7 @@
       <!-- define the remote repositories we use -->
       <artifact:remoteRepository id="central"   
url="${artifact.remoteRepository.central}"/>
       <artifact:remoteRepository id="apache"    
url="${artifact.remoteRepository.apache}"/>
-      
+
       <macrodef name="install">
         <attribute name="pomFile"/>
         <attribute name="file"/>
@@ -437,7 +437,7 @@
           <dependency groupId="com.google.code.findbugs" artifactId="jsr305" 
version="2.0.2" />
           <dependency groupId="com.clearspring.analytics" artifactId="stream" 
version="2.5.2" />
          <!-- UPDATE AND UNCOMMENT ON THE DRIVER RELEASE, BEFORE 4.0 RELEASE
-          <dependency groupId="com.datastax.cassandra" 
artifactId="cassandra-driver-core" version="3.0.1" classifier="shaded">
+          <dependency groupId="com.datastax.cassandra" 
artifactId="cassandra-driver-core" version="4.0.0-SNAPSHOT" classifier="shaded">
             <exclusion groupId="io.netty" artifactId="netty-buffer"/>
             <exclusion groupId="io.netty" artifactId="netty-codec"/>
             <exclusion groupId="io.netty" artifactId="netty-handler"/>
@@ -522,7 +522,7 @@
        <dependency groupId="org.apache.hadoop" 
artifactId="hadoop-minicluster"/>
        <dependency groupId="com.google.code.findbugs" artifactId="jsr305"/>
         <dependency groupId="org.antlr" artifactId="antlr"/>
-       <!-- UPDATE AND UNCOMMENT ON THE DRIVER RELEASE, BEFORE 4.0 RELEASE     
+       <!-- UPDATE AND UNCOMMENT ON THE DRIVER RELEASE, BEFORE 4.0 RELEASE
         <dependency groupId="com.datastax.cassandra" 
artifactId="cassandra-driver-core" classifier="shaded"/>
        -->
         <dependency groupId="org.eclipse.jdt.core.compiler" artifactId="ecj"/>
@@ -543,7 +543,7 @@
         <dependency groupId="junit" artifactId="junit"/>
        <!-- UPDATE AND UNCOMMENT ON THE DRIVER RELEASE, BEFORE 4.0 RELEASE
         <dependency groupId="com.datastax.cassandra" 
artifactId="cassandra-driver-core" classifier="shaded"/>
-       -->     
+       -->
         <dependency groupId="io.netty" artifactId="netty-all"/>
         <dependency groupId="org.eclipse.jdt.core.compiler" artifactId="ecj"/>
         <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core"/>
@@ -624,7 +624,7 @@
           <exclusion groupId="io.netty" artifactId="netty-transport"/>
         </dependency>
        -->
-       
+
         <!-- don't need jna to run, but nice to have -->
         <dependency groupId="net.java.dev.jna" artifactId="jna"/>
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 80e4515..3bed3a6 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -416,7 +416,7 @@ seed_provider:
       parameters:
           # seeds is actually a comma-delimited list of addresses.
           # Ex: "<ip1>,<ip2>,<ip3>"
-          - seeds: "127.0.0.1"
+          - seeds: "127.0.0.1:7000"
 
 # For workloads with more data than can fit in memory, Cassandra's
 # bottleneck will be reads that need to fetch data from
@@ -945,6 +945,13 @@ dynamic_snitch_badness_threshold: 0.1
 # the keystore and truststore.  For instructions on generating these files, 
see:
 # 
http://download.oracle.com/javase/8/docs/technotes/guides/security/jsse/JSSERefGuide.html#CreateKeystore
 #
+# If you are taking advantage of StartTLS outbound connections will have the 
issue that they can't know
+# what encrypted port to connect to in a foolproof way. 
outgoing_encrypted_port_source deals with this confusion
+# by allowing you to specify how you want a node to pick an outgoing port for 
intra-cluster connections.
+# Valid values are "gossip" and "yaml". Gossip will always connect to the 
storage port for a node that is
+# published via a gossip which is always going to be the plain storage port. 
"yaml" will always select
+# the port configured as ssl_storage_port on THIS node. If you want to use SSL 
and have different storage
+# ports across the cluster you must select "gossip" and use StartTLS on 
storage_port.
 server_encryption_options:
     # set to true for allowing secure incoming connections
     enabled: false

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/lib/cassandra-driver-core-3.3.2-0461ed35-SNAPSHOT-shaded.jar
----------------------------------------------------------------------
diff --git a/lib/cassandra-driver-core-3.3.2-0461ed35-SNAPSHOT-shaded.jar 
b/lib/cassandra-driver-core-3.3.2-0461ed35-SNAPSHOT-shaded.jar
deleted file mode 100644
index a7be9cb..0000000
Binary files a/lib/cassandra-driver-core-3.3.2-0461ed35-SNAPSHOT-shaded.jar and 
/dev/null differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/lib/cassandra-driver-core-4.0.0-SNAPSHOT-shaded.jar
----------------------------------------------------------------------
diff --git a/lib/cassandra-driver-core-4.0.0-SNAPSHOT-shaded.jar 
b/lib/cassandra-driver-core-4.0.0-SNAPSHOT-shaded.jar
new file mode 100644
index 0000000..609c393
Binary files /dev/null and 
b/lib/cassandra-driver-core-4.0.0-SNAPSHOT-shaded.jar differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/lib/cassandra-driver-internal-only-3.11.zip
----------------------------------------------------------------------
diff --git a/lib/cassandra-driver-internal-only-3.11.zip 
b/lib/cassandra-driver-internal-only-3.11.zip
deleted file mode 100644
index f7760af..0000000
Binary files a/lib/cassandra-driver-internal-only-3.11.zip and /dev/null differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/lib/cassandra-driver-internal-only-3.12.0.post0-9ee88ded.zip
----------------------------------------------------------------------
diff --git a/lib/cassandra-driver-internal-only-3.12.0.post0-9ee88ded.zip 
b/lib/cassandra-driver-internal-only-3.12.0.post0-9ee88ded.zip
new file mode 100644
index 0000000..4aa91b7
Binary files /dev/null and 
b/lib/cassandra-driver-internal-only-3.12.0.post0-9ee88ded.zip differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java 
b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
index 807e970..f232bdc 100644
--- a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.batchlog;
 
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
-import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.*;
@@ -34,6 +33,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.db.*;
@@ -250,7 +250,7 @@ public class BatchlogManager implements BatchlogManagerMBean
         int positionInPage = 0;
         ArrayList<ReplayingBatch> unfinishedBatches = new 
ArrayList<>(pageSize);
 
-        Set<InetAddress> hintedNodes = new HashSet<>();
+        Set<InetAddressAndPort> hintedNodes = new HashSet<>();
         Set<UUID> replayedBatches = new HashSet<>();
 
         // Sending out batches for replay without waiting for them, so that 
one stuck batch doesn't affect others
@@ -295,7 +295,7 @@ public class BatchlogManager implements BatchlogManagerMBean
         replayedBatches.forEach(BatchlogManager::remove);
     }
 
-    private void finishAndClearBatches(ArrayList<ReplayingBatch> batches, 
Set<InetAddress> hintedNodes, Set<UUID> replayedBatches)
+    private void finishAndClearBatches(ArrayList<ReplayingBatch> batches, 
Set<InetAddressAndPort> hintedNodes, Set<UUID> replayedBatches)
     {
         // schedule hints for timed out deliveries
         for (ReplayingBatch batch : batches)
@@ -330,7 +330,7 @@ public class BatchlogManager implements BatchlogManagerMBean
             this.replayedBytes = addMutations(version, serializedMutations);
         }
 
-        public int replay(RateLimiter rateLimiter, Set<InetAddress> 
hintedNodes) throws IOException
+        public int replay(RateLimiter rateLimiter, Set<InetAddressAndPort> 
hintedNodes) throws IOException
         {
             logger.trace("Replaying batch {}", id);
 
@@ -348,7 +348,7 @@ public class BatchlogManager implements BatchlogManagerMBean
             return replayHandlers.size();
         }
 
-        public void finish(Set<InetAddress> hintedNodes)
+        public void finish(Set<InetAddressAndPort> hintedNodes)
         {
             for (int i = 0; i < replayHandlers.size(); i++)
             {
@@ -396,7 +396,7 @@ public class BatchlogManager implements BatchlogManagerMBean
                 mutations.add(mutation);
         }
 
-        private void writeHintsForUndeliveredEndpoints(int startFrom, 
Set<InetAddress> hintedNodes)
+        private void writeHintsForUndeliveredEndpoints(int startFrom, 
Set<InetAddressAndPort> hintedNodes)
         {
             int gcgs = gcgs(mutations);
 
@@ -420,7 +420,7 @@ public class BatchlogManager implements BatchlogManagerMBean
 
         private static List<ReplayWriteResponseHandler<Mutation>> 
sendReplays(List<Mutation> mutations,
                                                                               
long writtenAt,
-                                                                              
Set<InetAddress> hintedNodes)
+                                                                              
Set<InetAddressAndPort> hintedNodes)
         {
             List<ReplayWriteResponseHandler<Mutation>> handlers = new 
ArrayList<>(mutations.size());
             for (Mutation mutation : mutations)
@@ -440,15 +440,15 @@ public class BatchlogManager implements 
BatchlogManagerMBean
          */
         private static ReplayWriteResponseHandler<Mutation> 
sendSingleReplayMutation(final Mutation mutation,
                                                                                
      long writtenAt,
-                                                                               
      Set<InetAddress> hintedNodes)
+                                                                               
      Set<InetAddressAndPort> hintedNodes)
         {
-            Set<InetAddress> liveEndpoints = new HashSet<>();
+            Set<InetAddressAndPort> liveEndpoints = new HashSet<>();
             String ks = mutation.getKeyspaceName();
             Token tk = mutation.key().getToken();
 
-            for (InetAddress endpoint : 
StorageService.instance.getNaturalAndPendingEndpoints(ks, tk))
+            for (InetAddressAndPort endpoint : 
StorageService.instance.getNaturalAndPendingEndpoints(ks, tk))
             {
-                if (endpoint.equals(FBUtilities.getBroadcastAddress()))
+                if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
                 {
                     mutation.apply();
                 }
@@ -469,7 +469,7 @@ public class BatchlogManager implements BatchlogManagerMBean
 
             ReplayWriteResponseHandler<Mutation> handler = new 
ReplayWriteResponseHandler<>(liveEndpoints, System.nanoTime());
             MessageOut<Mutation> message = mutation.createMessage();
-            for (InetAddress endpoint : liveEndpoints)
+            for (InetAddressAndPort endpoint : liveEndpoints)
                 MessagingService.instance().sendRR(message, endpoint, handler, 
false);
             return handler;
         }
@@ -488,11 +488,11 @@ public class BatchlogManager implements 
BatchlogManagerMBean
          */
         private static class ReplayWriteResponseHandler<T> extends 
WriteResponseHandler<T>
         {
-            private final Set<InetAddress> undelivered = 
Collections.newSetFromMap(new ConcurrentHashMap<>());
+            private final Set<InetAddressAndPort> undelivered = 
Collections.newSetFromMap(new ConcurrentHashMap<>());
 
-            ReplayWriteResponseHandler(Collection<InetAddress> writeEndpoints, 
long queryStartNanoTime)
+            ReplayWriteResponseHandler(Collection<InetAddressAndPort> 
writeEndpoints, long queryStartNanoTime)
             {
-                super(writeEndpoints, Collections.<InetAddress>emptySet(), 
null, null, null, WriteType.UNLOGGED_BATCH, queryStartNanoTime);
+                super(writeEndpoints, 
Collections.<InetAddressAndPort>emptySet(), null, null, null, 
WriteType.UNLOGGED_BATCH, queryStartNanoTime);
                 undelivered.addAll(writeEndpoints);
             }
 
@@ -505,7 +505,7 @@ public class BatchlogManager implements BatchlogManagerMBean
             @Override
             public void response(MessageIn<T> m)
             {
-                boolean removed = undelivered.remove(m == null ? 
FBUtilities.getBroadcastAddress() : m.from);
+                boolean removed = undelivered.remove(m == null ? 
FBUtilities.getBroadcastAddressAndPort() : m.from);
                 assert removed;
                 super.response(m);
             }
@@ -515,9 +515,9 @@ public class BatchlogManager implements BatchlogManagerMBean
     public static class EndpointFilter
     {
         private final String localRack;
-        private final Multimap<String, InetAddress> endpoints;
+        private final Multimap<String, InetAddressAndPort> endpoints;
 
-        public EndpointFilter(String localRack, Multimap<String, InetAddress> 
endpoints)
+        public EndpointFilter(String localRack, Multimap<String, 
InetAddressAndPort> endpoints)
         {
             this.localRack = localRack;
             this.endpoints = endpoints;
@@ -526,15 +526,15 @@ public class BatchlogManager implements 
BatchlogManagerMBean
         /**
          * @return list of candidates for batchlog hosting. If possible these 
will be two nodes from different racks.
          */
-        public Collection<InetAddress> filter()
+        public Collection<InetAddressAndPort> filter()
         {
             // special case for single-node data centers
             if (endpoints.values().size() == 1)
                 return endpoints.values();
 
             // strip out dead endpoints and localhost
-            ListMultimap<String, InetAddress> validated = 
ArrayListMultimap.create();
-            for (Map.Entry<String, InetAddress> entry : endpoints.entries())
+            ListMultimap<String, InetAddressAndPort> validated = 
ArrayListMultimap.create();
+            for (Map.Entry<String, InetAddressAndPort> entry : 
endpoints.entries())
                 if (isValid(entry.getValue()))
                     validated.put(entry.getKey(), entry.getValue());
 
@@ -554,7 +554,7 @@ public class BatchlogManager implements BatchlogManagerMBean
                  * pick two random nodes from there; we are guaranteed to have 
at least two nodes in the single remaining rack
                  * because of the preceding if block.
                  */
-                List<InetAddress> otherRack = 
Lists.newArrayList(validated.values());
+                List<InetAddressAndPort> otherRack = 
Lists.newArrayList(validated.values());
                 shuffle(otherRack);
                 return otherRack.subList(0, 2);
             }
@@ -572,10 +572,10 @@ public class BatchlogManager implements 
BatchlogManagerMBean
             }
 
             // grab a random member of up to two racks
-            List<InetAddress> result = new ArrayList<>(2);
+            List<InetAddressAndPort> result = new ArrayList<>(2);
             for (String rack : Iterables.limit(racks, 2))
             {
-                List<InetAddress> rackMembers = validated.get(rack);
+                List<InetAddressAndPort> rackMembers = validated.get(rack);
                 result.add(rackMembers.get(getRandomInt(rackMembers.size())));
             }
 
@@ -583,9 +583,9 @@ public class BatchlogManager implements BatchlogManagerMBean
         }
 
         @VisibleForTesting
-        protected boolean isValid(InetAddress input)
+        protected boolean isValid(InetAddressAndPort input)
         {
-            return !input.equals(FBUtilities.getBroadcastAddress()) && 
FailureDetector.instance.isAlive(input);
+            return !input.equals(FBUtilities.getBroadcastAddressAndPort()) && 
FailureDetector.instance.isAlive(input);
         }
 
         @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index a656d1f..9012e3a 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -55,6 +55,7 @@ import 
org.apache.cassandra.io.util.SsdDiskOptimizationStrategy;
 import org.apache.cassandra.locator.DynamicEndpointSnitch;
 import org.apache.cassandra.locator.EndpointSnitchInfo;
 import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.SeedProvider;
 import org.apache.cassandra.net.BackPressureStrategy;
 import org.apache.cassandra.net.RateBasedBackPressure;
@@ -110,7 +111,7 @@ public class DatabaseDescriptor
     private static long indexSummaryCapacityInMB;
 
     private static String localDC;
-    private static Comparator<InetAddress> localComparator;
+    private static Comparator<InetAddressAndPort> localComparator;
     private static EncryptionContext encryptionContext;
     private static boolean hasLoggedConfig;
 
@@ -307,6 +308,7 @@ public class DatabaseDescriptor
 
     private static void applyAll() throws ConfigurationException
     {
+        //InetAddressAndPort cares that applySimpleConfig runs first
         applySimpleConfig();
 
         applyPartitioner();
@@ -324,6 +326,9 @@ public class DatabaseDescriptor
 
     private static void applySimpleConfig()
     {
+        //Doing this first before all other things in case other pieces of 
config want to construct
+        //InetAddressAndPort and get the right defaults
+        InetAddressAndPort.initializeDefaultPort(getStoragePort());
 
         if (conf.commitlog_sync == null)
         {
@@ -827,7 +832,7 @@ public class DatabaseDescriptor
         }
         else
         {
-            rpcAddress = FBUtilities.getLocalAddress();
+            rpcAddress = FBUtilities.getJustLocalAddress();
         }
 
         /* RPC address to broadcast */
@@ -956,10 +961,10 @@ public class DatabaseDescriptor
         snitch = createEndpointSnitch(conf.dynamic_snitch, 
conf.endpoint_snitch);
         EndpointSnitchInfo.create();
 
-        localDC = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
-        localComparator = new Comparator<InetAddress>()
+        localDC = 
snitch.getDatacenter(FBUtilities.getBroadcastAddressAndPort());
+        localComparator = new Comparator<InetAddressAndPort>()
         {
-            public int compare(InetAddress endpoint1, InetAddress endpoint2)
+            public int compare(InetAddressAndPort endpoint1, 
InetAddressAndPort endpoint2)
             {
                 boolean local1 = 
localDC.equals(snitch.getDatacenter(endpoint1));
                 boolean local2 = 
localDC.equals(snitch.getDatacenter(endpoint2));
@@ -1319,14 +1324,14 @@ public class DatabaseDescriptor
         return conf.num_tokens;
     }
 
-    public static InetAddress getReplaceAddress()
+    public static InetAddressAndPort getReplaceAddress()
     {
         try
         {
             if (System.getProperty(Config.PROPERTY_PREFIX + "replace_address", 
null) != null)
-                return 
InetAddress.getByName(System.getProperty(Config.PROPERTY_PREFIX + 
"replace_address", null));
+                return 
InetAddressAndPort.getByName(System.getProperty(Config.PROPERTY_PREFIX + 
"replace_address", null));
             else if (System.getProperty(Config.PROPERTY_PREFIX + 
"replace_address_first_boot", null) != null)
-                return 
InetAddress.getByName(System.getProperty(Config.PROPERTY_PREFIX + 
"replace_address_first_boot", null));
+                return 
InetAddressAndPort.getByName(System.getProperty(Config.PROPERTY_PREFIX + 
"replace_address_first_boot", null));
             return null;
         }
         catch (UnknownHostException e)
@@ -1651,9 +1656,9 @@ public class DatabaseDescriptor
         return conf.saved_caches_directory;
     }
 
-    public static Set<InetAddress> getSeeds()
+    public static Set<InetAddressAndPort> getSeeds()
     {
-        return 
ImmutableSet.<InetAddress>builder().addAll(seedProvider.getSeeds()).build();
+        return 
ImmutableSet.<InetAddressAndPort>builder().addAll(seedProvider.getSeeds()).build();
     }
 
     public static InetAddress getListenAddress()
@@ -2206,7 +2211,7 @@ public class DatabaseDescriptor
         return localDC;
     }
 
-    public static Comparator<InetAddress> getLocalComparator()
+    public static Comparator<InetAddressAndPort> getLocalComparator()
     {
         return localComparator;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/db/ConsistencyLevel.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ConsistencyLevel.java 
b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
index 2214d8d..f93e737 100644
--- a/src/java/org/apache/cassandra/db/ConsistencyLevel.java
+++ b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.db;
 
-import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -28,6 +27,7 @@ import com.google.common.collect.Iterables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.service.ReadRepairDecision;
@@ -145,21 +145,21 @@ public enum ConsistencyLevel
         return isDCLocal;
     }
 
-    public boolean isLocal(InetAddress endpoint)
+    public boolean isLocal(InetAddressAndPort endpoint)
     {
         return 
DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(endpoint));
     }
 
-    public int countLocalEndpoints(Iterable<InetAddress> liveEndpoints)
+    public int countLocalEndpoints(Iterable<InetAddressAndPort> liveEndpoints)
     {
         int count = 0;
-        for (InetAddress endpoint : liveEndpoints)
+        for (InetAddressAndPort endpoint : liveEndpoints)
             if (isLocal(endpoint))
                 count++;
         return count;
     }
 
-    private Map<String, Integer> countPerDCEndpoints(Keyspace keyspace, 
Iterable<InetAddress> liveEndpoints)
+    private Map<String, Integer> countPerDCEndpoints(Keyspace keyspace, 
Iterable<InetAddressAndPort> liveEndpoints)
     {
         NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) 
keyspace.getReplicationStrategy();
 
@@ -167,7 +167,7 @@ public enum ConsistencyLevel
         for (String dc: strategy.getDatacenters())
             dcEndpoints.put(dc, 0);
 
-        for (InetAddress endpoint : liveEndpoints)
+        for (InetAddressAndPort endpoint : liveEndpoints)
         {
             String dc = 
DatabaseDescriptor.getEndpointSnitch().getDatacenter(endpoint);
             dcEndpoints.put(dc, dcEndpoints.get(dc) + 1);
@@ -175,12 +175,12 @@ public enum ConsistencyLevel
         return dcEndpoints;
     }
 
-    public List<InetAddress> filterForQuery(Keyspace keyspace, 
List<InetAddress> liveEndpoints)
+    public List<InetAddressAndPort> filterForQuery(Keyspace keyspace, 
List<InetAddressAndPort> liveEndpoints)
     {
         return filterForQuery(keyspace, liveEndpoints, 
ReadRepairDecision.NONE);
     }
 
-    public List<InetAddress> filterForQuery(Keyspace keyspace, 
List<InetAddress> liveEndpoints, ReadRepairDecision readRepair)
+    public List<InetAddressAndPort> filterForQuery(Keyspace keyspace, 
List<InetAddressAndPort> liveEndpoints, ReadRepairDecision readRepair)
     {
         /*
          * If we are doing an each quorum query, we have to make sure that the 
endpoints we select
@@ -206,9 +206,9 @@ public enum ConsistencyLevel
             case GLOBAL:
                 return liveEndpoints;
             case DC_LOCAL:
-                List<InetAddress> local = new ArrayList<InetAddress>();
-                List<InetAddress> other = new ArrayList<InetAddress>();
-                for (InetAddress add : liveEndpoints)
+                List<InetAddressAndPort> local = new ArrayList<>();
+                List<InetAddressAndPort> other = new ArrayList<>();
+                for (InetAddressAndPort add : liveEndpoints)
                 {
                     if (isLocal(add))
                         local.add(add);
@@ -225,7 +225,7 @@ public enum ConsistencyLevel
         }
     }
 
-    private List<InetAddress> filterForEachQuorum(Keyspace keyspace, 
List<InetAddress> liveEndpoints, ReadRepairDecision readRepair)
+    private List<InetAddressAndPort> filterForEachQuorum(Keyspace keyspace, 
List<InetAddressAndPort> liveEndpoints, ReadRepairDecision readRepair)
     {
         NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) 
keyspace.getReplicationStrategy();
 
@@ -233,20 +233,20 @@ public enum ConsistencyLevel
         if (readRepair == ReadRepairDecision.GLOBAL)
             return liveEndpoints;
 
-        Map<String, List<InetAddress>> dcsEndpoints = new HashMap<>();
+        Map<String, List<InetAddressAndPort>> dcsEndpoints = new HashMap<>();
         for (String dc: strategy.getDatacenters())
             dcsEndpoints.put(dc, new ArrayList<>());
 
-        for (InetAddress add : liveEndpoints)
+        for (InetAddressAndPort add : liveEndpoints)
         {
             String dc = 
DatabaseDescriptor.getEndpointSnitch().getDatacenter(add);
             dcsEndpoints.get(dc).add(add);
         }
 
-        List<InetAddress> waitSet = new ArrayList<>();
-        for (Map.Entry<String, List<InetAddress>> dcEndpoints : 
dcsEndpoints.entrySet())
+        List<InetAddressAndPort> waitSet = new ArrayList<>();
+        for (Map.Entry<String, List<InetAddressAndPort>> dcEndpoints : 
dcsEndpoints.entrySet())
         {
-            List<InetAddress> dcEndpoint = dcEndpoints.getValue();
+            List<InetAddressAndPort> dcEndpoint = dcEndpoints.getValue();
             if (readRepair == ReadRepairDecision.DC_LOCAL && 
dcEndpoints.getKey().equals(DatabaseDescriptor.getLocalDataCenter()))
                 waitSet.addAll(dcEndpoint);
             else
@@ -256,7 +256,7 @@ public enum ConsistencyLevel
         return waitSet;
     }
 
-    public boolean isSufficientLiveNodes(Keyspace keyspace, 
Iterable<InetAddress> liveEndpoints)
+    public boolean isSufficientLiveNodes(Keyspace keyspace, 
Iterable<InetAddressAndPort> liveEndpoints)
     {
         switch (this)
         {
@@ -283,7 +283,7 @@ public enum ConsistencyLevel
         }
     }
 
-    public void assureSufficientLiveNodes(Keyspace keyspace, 
Iterable<InetAddress> liveEndpoints) throws UnavailableException
+    public void assureSufficientLiveNodes(Keyspace keyspace, 
Iterable<InetAddressAndPort> liveEndpoints) throws UnavailableException
     {
         int blockFor = blockFor(keyspace);
         switch (this)
@@ -302,7 +302,7 @@ public enum ConsistencyLevel
                     if (logger.isTraceEnabled())
                     {
                         StringBuilder builder = new StringBuilder("Local 
replicas [");
-                        for (InetAddress endpoint : liveEndpoints)
+                        for (InetAddressAndPort endpoint : liveEndpoints)
                         {
                             if (isLocal(endpoint))
                                 builder.append(endpoint).append(",");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java 
b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
index bd273e4..95d7916 100644
--- a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
@@ -37,7 +37,7 @@ public class CounterMutationVerbHandler implements 
IVerbHandler<CounterMutation>
         final CounterMutation cm = message.payload;
         logger.trace("Applying forwarded {}", cm);
 
-        String localDataCenter = 
DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
+        String localDataCenter = 
DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
         // We should not wait for the result of the write in this thread,
         // otherwise we could have a distributed deadlock between replicas
         // running this VerbHandler (see #4578).

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DiskBoundaryManager.java 
b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
index 03cbf7b..72b5e2a 100644
--- a/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
+++ b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
@@ -80,14 +80,14 @@ public class DiskBoundaryManager
                 && !StorageService.isReplacingSameAddress()) // When replacing 
same address, the node marks itself as UN locally
             {
                 PendingRangeCalculatorService.instance.blockUntilFinished();
-                localRanges = tmd.getPendingRanges(cfs.keyspace.getName(), 
FBUtilities.getBroadcastAddress());
+                localRanges = tmd.getPendingRanges(cfs.keyspace.getName(), 
FBUtilities.getBroadcastAddressAndPort());
             }
             else
             {
                 // Reason we use use the future settled TMD is that if we 
decommission a node, we want to stream
                 // from that node to the correct location on disk, if we 
didn't, we would put new files in the wrong places.
                 // We do this to minimize the amount of data we need to move 
in rebalancedisks once everything settled
-                localRanges = 
cfs.keyspace.getReplicationStrategy().getAddressRanges(tmd.cloneAfterAllSettled()).get(FBUtilities.getBroadcastAddress());
+                localRanges = 
cfs.keyspace.getReplicationStrategy().getAddressRanges(tmd.cloneAfterAllSettled()).get(FBUtilities.getBroadcastAddressAndPort());
             }
             logger.debug("Got local ranges {} (ringVersion = {})", 
localRanges, ringVersion);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java 
b/src/java/org/apache/cassandra/db/Keyspace.java
index cebf6eb..ae778f1 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -352,6 +352,7 @@ public class Keyspace
 
     private void createReplicationStrategy(KeyspaceMetadata ksm)
     {
+        logger.info("Creating replication strategy " + ksm .name + " params " 
+ ksm.params);
         replicationStrategy = 
AbstractReplicationStrategy.createReplicationStrategy(ksm.name,
                                                                                
     ksm.params.replication.klass,
                                                                                
     StorageService.instance.getTokenMetadata(),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/db/Mutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Mutation.java 
b/src/java/org/apache/cassandra/db/Mutation.java
index a6a920c..6195fe4 100644
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@ -43,9 +43,6 @@ public class Mutation implements IMutation
 {
     public static final MutationSerializer serializer = new 
MutationSerializer();
 
-    public static final String FORWARD_TO = "FWD_TO";
-    public static final String FORWARD_FROM = "FWD_FRM";
-
     // todo this is redundant
     // when we remove it, also restore SerializationsTest.testMutationRead to 
not regenerate new Mutations each test
     private final String keyspaceName;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/db/MutationVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/MutationVerbHandler.java 
b/src/java/org/apache/cassandra/db/MutationVerbHandler.java
index 59247a2..8386048 100644
--- a/src/java/org/apache/cassandra/db/MutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/MutationVerbHandler.java
@@ -17,18 +17,17 @@
  */
 package org.apache.cassandra.db;
 
-import java.io.DataInputStream;
 import java.io.IOException;
-import java.net.InetAddress;
+import java.util.Iterator;
 
 import org.apache.cassandra.exceptions.WriteTimeoutException;
-import org.apache.cassandra.io.util.FastByteArrayInputStream;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.*;
 import org.apache.cassandra.tracing.Tracing;
 
 public class MutationVerbHandler implements IVerbHandler<Mutation>
 {
-    private void reply(int id, InetAddress replyTo)
+    private void reply(int id, InetAddressAndPort replyTo)
     {
         Tracing.trace("Enqueuing response to {}", replyTo);
         MessagingService.instance().sendReply(WriteResponse.createMessage(), 
id, replyTo);
@@ -42,18 +41,19 @@ public class MutationVerbHandler implements 
IVerbHandler<Mutation>
     public void doVerb(MessageIn<Mutation> message, int id)  throws IOException
     {
         // Check if there were any forwarding headers in this message
-        byte[] from = message.parameters.get(Mutation.FORWARD_FROM);
-        InetAddress replyTo;
+        InetAddressAndPort from = 
(InetAddressAndPort)message.parameters.get(ParameterType.FORWARD_FROM);
+        InetAddressAndPort replyTo;
         if (from == null)
         {
             replyTo = message.from;
-            byte[] forwardBytes = message.parameters.get(Mutation.FORWARD_TO);
-            if (forwardBytes != null)
-                forwardToLocalNodes(message.payload, message.verb, 
forwardBytes, message.from);
+            ForwardToContainer forwardTo = 
(ForwardToContainer)message.parameters.get(ParameterType.FORWARD_TO);
+            if (forwardTo != null)
+                forwardToLocalNodes(message.payload, message.verb, forwardTo, 
message.from);
         }
         else
         {
-            replyTo = InetAddress.getByAddress(from);
+
+            replyTo = from;
         }
 
         try
@@ -69,22 +69,17 @@ public class MutationVerbHandler implements 
IVerbHandler<Mutation>
         }
     }
 
-    private static void forwardToLocalNodes(Mutation mutation, 
MessagingService.Verb verb, byte[] forwardBytes, InetAddress from) throws 
IOException
+    private static void forwardToLocalNodes(Mutation mutation, 
MessagingService.Verb verb, ForwardToContainer forwardTo, InetAddressAndPort 
from) throws IOException
     {
-        try (DataInputStream in = new DataInputStream(new 
FastByteArrayInputStream(forwardBytes)))
+        // tell the recipients who to send their ack to
+        MessageOut<Mutation> message = new MessageOut<>(verb, mutation, 
Mutation.serializer).withParameter(ParameterType.FORWARD_FROM, from);
+        Iterator<InetAddressAndPort> iterator = forwardTo.targets.iterator();
+        // Send a message to each of the addresses on our Forward List
+        for (int i = 0; i < forwardTo.targets.size(); i++)
         {
-            int size = in.readInt();
-
-            // tell the recipients who to send their ack to
-            MessageOut<Mutation> message = new MessageOut<>(verb, mutation, 
Mutation.serializer).withParameter(Mutation.FORWARD_FROM, from.getAddress());
-            // Send a message to each of the addresses on our Forward List
-            for (int i = 0; i < size; i++)
-            {
-                InetAddress address = 
CompactEndpointSerializationHelper.deserialize(in);
-                int id = in.readInt();
-                Tracing.trace("Enqueuing forwarded write to {}", address);
-                MessagingService.instance().sendOneWay(message, id, address);
-            }
+            InetAddressAndPort address = iterator.next();
+            Tracing.trace("Enqueuing forwarded write to {}", address);
+            MessagingService.instance().sendOneWay(message, 
forwardTo.messageIds[i], address);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java 
b/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
index 066b2fe..151e7d3 100644
--- a/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
+++ b/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
@@ -60,7 +60,7 @@ public class SizeEstimatesRecorder extends 
SchemaChangeListener implements Runna
     public void run()
     {
         TokenMetadata metadata = 
StorageService.instance.getTokenMetadata().cloneOnlyTokenMap();
-        if (!metadata.isMember(FBUtilities.getBroadcastAddress()))
+        if (!metadata.isMember(FBUtilities.getBroadcastAddressAndPort()))
         {
             logger.debug("Node is not part of the ring; not recording size 
estimates");
             return;
@@ -71,7 +71,7 @@ public class SizeEstimatesRecorder extends 
SchemaChangeListener implements Runna
         for (Keyspace keyspace : Keyspace.nonLocalStrategy())
         {
             Collection<Range<Token>> localRanges = 
StorageService.instance.getPrimaryRangesForEndpoint(keyspace.getName(),
-                    FBUtilities.getBroadcastAddress());
+                    FBUtilities.getBroadcastAddressAndPort());
             for (ColumnFamilyStore table : keyspace.getColumnFamilyStores())
             {
                 long start = System.nanoTime();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to