Allow storage port to be configurable per node Patch by Ariel Weisberg; Reviewed by Jason Brown for CASSANDRA-7544
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/59b5b6be Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/59b5b6be Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/59b5b6be Branch: refs/heads/trunk Commit: 59b5b6bef0fa76bf5740b688fcd4d9cf525760d0 Parents: 4de7a65 Author: Ariel Weisberg <aweisb...@apple.com> Authored: Thu Nov 9 11:33:48 2017 -0500 Committer: Ariel Weisberg <aweisb...@apple.com> Committed: Thu Jan 25 14:32:24 2018 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + NEWS.txt | 18 +- bin/cqlsh.py | 12 +- build.xml | 10 +- conf/cassandra.yaml | 9 +- ...iver-core-3.3.2-0461ed35-SNAPSHOT-shaded.jar | Bin 2618371 -> 0 bytes ...sandra-driver-core-4.0.0-SNAPSHOT-shaded.jar | Bin 0 -> 2621460 bytes lib/cassandra-driver-internal-only-3.11.zip | Bin 264882 -> 0 bytes ...iver-internal-only-3.12.0.post0-9ee88ded.zip | Bin 0 -> 265110 bytes .../cassandra/batchlog/BatchlogManager.java | 52 +- .../cassandra/config/DatabaseDescriptor.java | 27 +- .../apache/cassandra/db/ConsistencyLevel.java | 40 +- .../db/CounterMutationVerbHandler.java | 2 +- .../cassandra/db/DiskBoundaryManager.java | 4 +- src/java/org/apache/cassandra/db/Keyspace.java | 1 + src/java/org/apache/cassandra/db/Mutation.java | 3 - .../cassandra/db/MutationVerbHandler.java | 43 +- .../cassandra/db/SizeEstimatesRecorder.java | 4 +- .../org/apache/cassandra/db/SystemKeyspace.java | 522 +++++++----- .../cassandra/db/SystemKeyspaceMigrator40.java | 184 +++++ .../org/apache/cassandra/db/view/ViewUtils.java | 18 +- .../org/apache/cassandra/dht/BootStrapper.java | 12 +- .../cassandra/dht/RangeFetchMapCalculator.java | 37 +- .../org/apache/cassandra/dht/RangeStreamer.java | 96 +-- .../dht/tokenallocator/TokenAllocation.java | 50 +- .../tokenallocator/TokenAllocatorFactory.java | 8 +- .../exceptions/ReadFailureException.java | 4 +- .../exceptions/RequestFailureException.java | 6 +- .../exceptions/WriteFailureException.java | 4 +- .../apache/cassandra/gms/ApplicationState.java | 11 +- .../org/apache/cassandra/gms/EndpointState.java | 8 +- .../apache/cassandra/gms/FailureDetector.java | 74 +- .../cassandra/gms/FailureDetectorMBean.java | 9 +- .../org/apache/cassandra/gms/GossipDigest.java | 14 +- .../apache/cassandra/gms/GossipDigestAck.java | 22 +- .../apache/cassandra/gms/GossipDigestAck2.java | 22 +- .../gms/GossipDigestAck2VerbHandler.java | 6 +- .../gms/GossipDigestAckVerbHandler.java | 10 +- .../gms/GossipDigestSynVerbHandler.java | 12 +- src/java/org/apache/cassandra/gms/Gossiper.java | 274 ++++--- .../gms/IEndpointStateChangeSubscriber.java | 16 +- .../gms/IFailureDetectionEventListener.java | 4 +- .../apache/cassandra/gms/IFailureDetector.java | 12 +- .../apache/cassandra/gms/VersionedValue.java | 17 + .../apache/cassandra/hadoop/ConfigHelper.java | 12 + .../hadoop/cql3/CqlBulkRecordWriter.java | 20 +- .../cassandra/hadoop/cql3/CqlConfigHelper.java | 12 + .../apache/cassandra/hints/HintVerbHandler.java | 6 +- .../cassandra/hints/HintsDispatchExecutor.java | 12 +- .../cassandra/hints/HintsDispatchTrigger.java | 2 +- .../apache/cassandra/hints/HintsDispatcher.java | 10 +- .../apache/cassandra/hints/HintsService.java | 8 +- .../org/apache/cassandra/hints/HintsStore.java | 6 +- .../io/DummyByteVersionedSerializer.java | 55 ++ .../cassandra/io/ShortVersionedSerializer.java | 47 ++ .../cassandra/io/sstable/SSTableLoader.java | 31 +- .../locator/AbstractEndpointSnitch.java | 19 +- .../locator/AbstractNetworkTopologySnitch.java | 8 +- .../locator/AbstractReplicationStrategy.java | 41 +- .../cassandra/locator/CloudstackSnitch.java | 11 +- .../locator/DynamicEndpointSnitch.java | 64 +- .../locator/DynamicEndpointSnitchMBean.java | 2 + .../cassandra/locator/Ec2MultiRegionSnitch.java | 11 + .../org/apache/cassandra/locator/Ec2Snitch.java | 11 +- .../cassandra/locator/EndpointSnitchInfo.java | 9 +- .../cassandra/locator/GoogleCloudSnitch.java | 11 +- .../locator/GossipingPropertyFileSnitch.java | 15 +- .../cassandra/locator/IEndpointSnitch.java | 13 +- .../cassandra/locator/ILatencySubscriber.java | 4 +- .../cassandra/locator/InetAddressAndPort.java | 203 +++++ .../apache/cassandra/locator/LocalStrategy.java | 11 +- .../locator/NetworkTopologyStrategy.java | 23 +- .../locator/OldNetworkTopologyStrategy.java | 5 +- .../cassandra/locator/PendingRangeMaps.java | 57 +- .../cassandra/locator/PropertyFileSnitch.java | 29 +- .../cassandra/locator/RackInferringSnitch.java | 10 +- .../locator/ReconnectableSnitchHelper.java | 60 +- .../apache/cassandra/locator/SeedProvider.java | 3 +- .../cassandra/locator/SimpleSeedProvider.java | 8 +- .../apache/cassandra/locator/SimpleSnitch.java | 9 +- .../cassandra/locator/SimpleStrategy.java | 7 +- .../apache/cassandra/locator/TokenMetadata.java | 221 +++-- .../cassandra/metrics/ConnectionMetrics.java | 8 +- .../cassandra/metrics/HintedHandoffMetrics.java | 25 +- .../cassandra/metrics/HintsServiceMetrics.java | 11 +- .../cassandra/metrics/MessagingMetrics.java | 4 +- .../cassandra/metrics/StreamingMetrics.java | 11 +- .../apache/cassandra/net/BackPressureState.java | 4 +- .../cassandra/net/BackPressureStrategy.java | 5 +- .../org/apache/cassandra/net/CallbackInfo.java | 7 +- .../net/CompactEndpointSerializationHelper.java | 108 ++- .../cassandra/net/ForwardToContainer.java | 43 + .../cassandra/net/ForwardToSerializer.java | 86 ++ .../apache/cassandra/net/IAsyncCallback.java | 7 +- .../net/IAsyncCallbackWithFailure.java | 5 +- .../org/apache/cassandra/net/IMessageSink.java | 4 +- .../cassandra/net/MessageDeliveryTask.java | 14 +- .../org/apache/cassandra/net/MessageIn.java | 74 +- .../org/apache/cassandra/net/MessageOut.java | 81 +- .../apache/cassandra/net/MessagingService.java | 271 ++++-- .../cassandra/net/MessagingServiceMBean.java | 26 + .../org/apache/cassandra/net/ParameterType.java | 69 ++ .../cassandra/net/RateBasedBackPressure.java | 4 +- .../net/RateBasedBackPressureState.java | 8 +- .../apache/cassandra/net/WriteCallbackInfo.java | 7 +- .../cassandra/net/async/HandshakeProtocol.java | 18 +- .../net/async/InboundHandshakeHandler.java | 7 +- .../cassandra/net/async/MessageInHandler.java | 21 +- .../cassandra/net/async/MessageOutHandler.java | 12 +- .../cassandra/net/async/NettyFactory.java | 15 +- .../net/async/OutboundConnectionIdentifier.java | 48 +- .../net/async/OutboundMessagingConnection.java | 13 +- .../net/async/OutboundMessagingPool.java | 15 +- .../repair/AsymmetricLocalSyncTask.java | 8 +- .../repair/AsymmetricRemoteSyncTask.java | 6 +- .../cassandra/repair/AsymmetricSyncTask.java | 8 +- .../apache/cassandra/repair/LocalSyncTask.java | 13 +- .../org/apache/cassandra/repair/NodePair.java | 25 +- .../apache/cassandra/repair/RemoteSyncTask.java | 4 +- .../org/apache/cassandra/repair/RepairJob.java | 56 +- .../repair/RepairMessageVerbHandler.java | 7 +- .../apache/cassandra/repair/RepairRunnable.java | 31 +- .../apache/cassandra/repair/RepairSession.java | 38 +- .../apache/cassandra/repair/SnapshotTask.java | 10 +- .../cassandra/repair/StreamingRepairTask.java | 17 +- .../repair/SystemDistributedKeyspace.java | 98 ++- .../apache/cassandra/repair/TreeResponse.java | 7 +- .../apache/cassandra/repair/ValidationTask.java | 7 +- .../org/apache/cassandra/repair/Validator.java | 12 +- .../repair/asymmetric/DifferenceHolder.java | 16 +- .../repair/asymmetric/HostDifferences.java | 14 +- .../asymmetric/IncomingRepairStreamTracker.java | 4 +- .../repair/asymmetric/PreferedNodeFilter.java | 5 +- .../repair/asymmetric/ReduceHelper.java | 42 +- .../repair/asymmetric/StreamFromOptions.java | 16 +- .../repair/consistent/ConsistentSession.java | 21 +- .../repair/consistent/CoordinatorSession.java | 24 +- .../repair/consistent/CoordinatorSessions.java | 4 +- .../repair/consistent/LocalSessionInfo.java | 6 +- .../repair/consistent/LocalSessions.java | 82 +- .../repair/consistent/SyncStatSummary.java | 14 +- .../repair/messages/AsymmetricSyncRequest.java | 25 +- .../repair/messages/FinalizePromise.java | 18 +- .../messages/PrepareConsistentRequest.java | 31 +- .../messages/PrepareConsistentResponse.java | 17 +- .../cassandra/repair/messages/SyncComplete.java | 4 +- .../cassandra/repair/messages/SyncRequest.java | 26 +- .../cassandra/schema/MigrationManager.java | 24 +- .../apache/cassandra/schema/MigrationTask.java | 6 +- .../cassandra/service/AbstractReadExecutor.java | 42 +- .../service/AbstractWriteResponseHandler.java | 16 +- .../cassandra/service/ActiveRepairService.java | 64 +- .../service/BatchlogResponseHandler.java | 6 +- .../cassandra/service/CassandraDaemon.java | 7 +- .../apache/cassandra/service/ClientState.java | 2 +- .../apache/cassandra/service/DataResolver.java | 18 +- .../DatacenterSyncWriteResponseHandler.java | 8 +- .../service/DatacenterWriteResponseHandler.java | 8 +- .../service/IEndpointLifecycleSubscriber.java | 12 +- .../cassandra/service/LoadBroadcaster.java | 20 +- .../apache/cassandra/service/ReadCallback.java | 20 +- .../apache/cassandra/service/StartupChecks.java | 4 +- .../apache/cassandra/service/StorageProxy.java | 320 ++++---- .../cassandra/service/StorageProxyMBean.java | 3 +- .../cassandra/service/StorageService.java | 818 ++++++++++++------- .../cassandra/service/StorageServiceMBean.java | 57 +- .../apache/cassandra/service/TokenRange.java | 50 +- .../cassandra/service/WriteResponseHandler.java | 12 +- .../service/paxos/PrepareCallback.java | 10 +- .../cassandra/streaming/ProgressInfo.java | 14 +- .../apache/cassandra/streaming/SessionInfo.java | 12 +- .../cassandra/streaming/SessionSummary.java | 23 +- .../cassandra/streaming/StreamCoordinator.java | 24 +- .../apache/cassandra/streaming/StreamEvent.java | 4 +- .../cassandra/streaming/StreamManager.java | 10 +- .../apache/cassandra/streaming/StreamPlan.java | 16 +- .../cassandra/streaming/StreamResultFuture.java | 15 +- .../cassandra/streaming/StreamSession.java | 43 +- .../async/NettyStreamingMessageSender.java | 4 +- .../async/StreamingInboundHandler.java | 10 +- .../management/ProgressInfoCompositeData.java | 30 +- .../SessionCompleteEventCompositeData.java | 8 +- .../management/SessionInfoCompositeData.java | 44 +- .../streaming/messages/FileMessageHeader.java | 14 +- .../streaming/messages/OutgoingFileMessage.java | 2 +- .../streaming/messages/StreamInitMessage.java | 12 +- .../tools/BulkLoadConnectionFactory.java | 10 +- .../org/apache/cassandra/tools/BulkLoader.java | 27 +- .../apache/cassandra/tools/LoaderOptions.java | 77 +- .../org/apache/cassandra/tools/NodeProbe.java | 63 +- .../org/apache/cassandra/tools/NodeTool.java | 26 + .../tools/nodetool/DescribeCluster.java | 4 +- .../cassandra/tools/nodetool/DescribeRing.java | 4 +- .../tools/nodetool/FailureDetectorInfo.java | 2 +- .../cassandra/tools/nodetool/GetEndpoints.java | 18 +- .../cassandra/tools/nodetool/GossipInfo.java | 4 +- .../tools/nodetool/HostStatWithPort.java | 44 + .../cassandra/tools/nodetool/NetStats.java | 10 +- .../cassandra/tools/nodetool/RemoveNode.java | 6 +- .../cassandra/tools/nodetool/RepairAdmin.java | 6 +- .../apache/cassandra/tools/nodetool/Ring.java | 211 +++-- .../tools/nodetool/SetHostStatWithPort.java | 56 ++ .../apache/cassandra/tools/nodetool/Status.java | 200 +++-- .../cassandra/tracing/ExpiredTraceState.java | 2 +- .../apache/cassandra/tracing/TraceKeyspace.java | 48 +- .../apache/cassandra/tracing/TraceState.java | 6 +- .../cassandra/tracing/TraceStateImpl.java | 4 +- .../org/apache/cassandra/tracing/Tracing.java | 52 +- .../apache/cassandra/tracing/TracingImpl.java | 3 +- .../org/apache/cassandra/transport/Event.java | 21 +- .../cassandra/transport/ProtocolVersion.java | 9 +- .../org/apache/cassandra/transport/Server.java | 60 +- .../transport/messages/ErrorMessage.java | 13 +- .../org/apache/cassandra/utils/FBUtilities.java | 53 +- .../apache/cassandra/utils/JMXServerUtils.java | 2 +- .../org/apache/cassandra/utils/Mx4jTool.java | 2 +- .../utils/NativeSSTableLoaderClient.java | 46 +- .../org/apache/cassandra/utils/UUIDGen.java | 2 +- test/conf/cassandra.yaml | 3 +- .../serialization/4.0/gms.EndpointState.bin | Bin 73 -> 73 bytes test/data/serialization/4.0/gms.Gossip.bin | Bin 158 -> 166 bytes .../serialization/4.0/service.SyncComplete.bin | Bin 538 -> 554 bytes .../serialization/4.0/service.SyncRequest.bin | Bin 227 -> 241 bytes .../4.0/service.ValidationComplete.bin | Bin 1251 -> 1257 bytes .../4.0/service.ValidationRequest.bin | Bin 167 -> 169 bytes .../locator/DynamicEndpointSnitchLongTest.java | 15 +- .../cassandra/streaming/LongStreamingTest.java | 4 +- .../test/microbench/PendingRangesBench.java | 10 +- .../OffsetAwareConfigurationLoader.java | 32 + test/unit/org/apache/cassandra/Util.java | 9 +- .../batchlog/BatchlogEndpointFilterTest.java | 84 +- .../cassandra/batchlog/BatchlogManagerTest.java | 6 +- .../config/DatabaseDescriptorRefTest.java | 2 + .../org/apache/cassandra/cql3/CQLTester.java | 12 +- .../cassandra/cql3/PreparedStatementsTest.java | 1 - .../apache/cassandra/cql3/ViewComplexTest.java | 8 +- .../cql3/validation/operations/CreateTest.java | 8 +- .../org/apache/cassandra/db/CleanupTest.java | 22 +- .../cassandra/db/DiskBoundaryManagerTest.java | 6 +- .../apache/cassandra/db/ReadCommandTest.java | 12 +- .../org/apache/cassandra/db/RowCacheTest.java | 6 +- .../db/SystemKeyspaceMigrator40Test.java | 192 +++++ .../apache/cassandra/db/SystemKeyspaceTest.java | 6 +- .../compaction/AbstractPendingRepairTest.java | 4 +- .../db/compaction/AntiCompactionTest.java | 4 +- ...tionManagerGetSSTablesForValidationTest.java | 6 +- .../LeveledCompactionStrategyTest.java | 4 +- .../apache/cassandra/db/view/ViewUtilsTest.java | 48 +- .../apache/cassandra/dht/BootStrapperTest.java | 34 +- .../dht/RangeFetchMapCalculatorTest.java | 116 +-- .../cassandra/dht/StreamStateStoreTest.java | 4 +- .../apache/cassandra/gms/ArrivalWindowTest.java | 15 +- .../apache/cassandra/gms/EndpointStateTest.java | 8 +- .../cassandra/gms/FailureDetectorTest.java | 8 +- .../apache/cassandra/gms/GossipDigestTest.java | 5 +- .../org/apache/cassandra/gms/GossiperTest.java | 6 +- .../gms/PendingRangeCalculatorServiceTest.java | 8 +- .../cassandra/gms/SerializationsTest.java | 12 +- .../org/apache/cassandra/hints/HintTest.java | 12 +- .../cassandra/hints/HintsServiceTest.java | 14 +- .../io/sstable/CQLSSTableWriterTest.java | 2 +- .../cassandra/io/sstable/LegacySSTableTest.java | 2 +- .../cassandra/io/sstable/SSTableLoaderTest.java | 2 +- .../cassandra/locator/CloudstackSnitchTest.java | 7 +- .../locator/DynamicEndpointSnitchTest.java | 17 +- .../apache/cassandra/locator/EC2SnitchTest.java | 8 +- .../locator/GoogleCloudSnitchTest.java | 7 +- .../GossipingPropertyFileSnitchTest.java | 15 +- .../locator/InetAddressAndPortTest.java | 143 ++++ .../locator/NetworkTopologyStrategyTest.java | 73 +- .../locator/OldNetworkTopologyStrategyTest.java | 21 +- .../cassandra/locator/PendingRangeMapsTest.java | 37 +- .../locator/PropertyFileSnitchTest.java | 108 +-- .../locator/ReconnectableSnitchHelperTest.java | 2 +- .../ReplicationStrategyEndpointCacheTest.java | 35 +- .../cassandra/locator/SimpleStrategyTest.java | 19 +- .../cassandra/locator/TokenMetadataTest.java | 49 +- .../metrics/HintedHandOffMetricsTest.java | 10 +- .../CompactEndpointSerializationHelperTest.java | 72 ++ .../cassandra/net/ForwardToContainerTest.java | 100 +++ test/unit/org/apache/cassandra/net/Matcher.java | 4 +- .../apache/cassandra/net/MatcherResponse.java | 11 +- .../cassandra/net/MessagingServiceTest.java | 82 +- .../cassandra/net/MockMessagingService.java | 15 +- .../cassandra/net/MockMessagingServiceTest.java | 6 +- .../net/RateBasedBackPressureTest.java | 42 +- .../cassandra/net/WriteCallbackInfoTest.java | 4 +- .../cassandra/net/async/ChannelWriterTest.java | 6 +- .../net/async/HandshakeHandlersTest.java | 8 +- .../net/async/HandshakeProtocolTest.java | 2 +- .../net/async/InboundHandshakeHandlerTest.java | 18 +- .../net/async/MessageInHandlerTest.java | 39 +- .../net/async/MessageOutHandlerTest.java | 16 +- .../cassandra/net/async/NettyFactoryTest.java | 15 +- .../net/async/OutboundHandshakeHandlerTest.java | 6 +- .../async/OutboundMessagingConnectionTest.java | 48 +- .../net/async/OutboundMessagingPoolTest.java | 11 +- .../cassandra/repair/AbstractRepairTest.java | 20 +- .../cassandra/repair/LocalSyncTaskTest.java | 12 +- .../cassandra/repair/RepairRunnableTest.java | 3 +- .../cassandra/repair/RepairSessionTest.java | 7 +- .../apache/cassandra/repair/ValidatorTest.java | 12 +- .../repair/asymmetric/DifferenceHolderTest.java | 10 +- .../repair/asymmetric/ReduceHelperTest.java | 74 +- .../asymmetric/StreamFromOptionsTest.java | 36 +- .../AbstractConsistentSessionTest.java | 20 +- .../consistent/CoordinatorSessionTest.java | 30 +- .../consistent/CoordinatorSessionsTest.java | 12 +- .../repair/consistent/LocalSessionAccessor.java | 4 +- .../repair/consistent/LocalSessionTest.java | 17 +- .../consistent/PendingAntiCompactionTest.java | 6 +- .../RepairMessageSerializationsTest.java | 12 +- .../messages/RepairMessageSerializerTest.java | 14 +- .../service/ActiveRepairServiceTest.java | 58 +- .../cassandra/service/DataResolverTest.java | 84 +- .../service/LeaveAndBootstrapTest.java | 63 +- .../org/apache/cassandra/service/MoveTest.java | 146 ++-- .../service/ProtocolBetaVersionTest.java | 2 +- .../cassandra/service/ReadExecutorTest.java | 6 +- .../apache/cassandra/service/RemoveTest.java | 14 +- .../cassandra/service/SerializationsTest.java | 30 +- .../cassandra/service/StorageProxyTest.java | 6 +- .../service/StorageServiceServerTest.java | 170 ++-- .../service/WriteResponseHandlerTest.java | 25 +- .../cassandra/streaming/SessionInfoTest.java | 4 +- .../streaming/StreamTransferTaskTest.java | 6 +- .../streaming/StreamingTransferTest.java | 4 +- .../async/NettyStreamingMessageSenderTest.java | 8 +- .../async/StreamingInboundHandlerTest.java | 12 +- .../apache/cassandra/tracing/TracingTest.java | 3 +- .../cassandra/transport/ErrorMessageTest.java | 20 +- .../transport/ProtocolVersionTest.java | 9 +- .../cassandra/transport/SerDeserTest.java | 12 +- .../apache/cassandra/utils/FBUtilitiesTest.java | 8 +- .../cassandra/stress/CompactionStress.java | 4 +- .../cassandra/stress/settings/SettingsNode.java | 16 +- .../cassandra/stress/util/JavaDriverClient.java | 7 +- 337 files changed, 6456 insertions(+), 3772 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index b29ae78..d8ae88f 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Allow storage port to be configurable per node (CASSANDRA-7544) * Make sub-range selection for non-frozen collections return null instead of empty (CASSANDRA-14182) * BloomFilter serialization format should not change byte ordering (CASSANDRA-9067) * Remove unused on-heap BloomFilter implementation (CASSANDRA-14152) http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/NEWS.txt ---------------------------------------------------------------------- diff --git a/NEWS.txt b/NEWS.txt index 06d73ea..c314030 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -39,7 +39,7 @@ New features This property can be modified at runtime through both JMX and the new `setconcurrentviewbuilders` and `getconcurrentviewbuilders` nodetool commands. See CASSANDRA-12245 for more details. - There is now a binary full query log based on Chronicle Queue that can be controlled using - nodetool enablefullquerylog, disablefullquerylog, and resetfullquerylog. The log + nodetool enablefullquerylog, disablefullquerylog, and resetfullquerylog. The log contains all queries invoked, approximate time they were invoked, any parameters necessary to bind wildcard values, and all query options. A human readable version of the log can be dumped or tailed using the new bin/fqltool utility. The full query log is designed to be safe @@ -103,6 +103,22 @@ Upgrading but blocks for up to a configurable number of milliseconds between disk flushes. - nodetool clearsnapshot now required the --all flag to remove all snapshots. Previous behavior would delete all snapshots by default. + - Nodes are now identified by a combination of IP, and storage port. + Existing JMX APIs, nodetool, and system tables continue to work + and accept/return just an IP, but there is a new + version of each that works with the full unambiguous identifier. + You should prefer these over the deprecated ambiguous versions that only + work with an IP. This was done to support multiple instances per IP. + Additionally we are moving to only using a single port for encrypted and + unencrypted traffic and if you want multiple instances per IP you must + first switch encrypted traffic to the storage port and not a separate + encrypted port. If you want to use multiple instances per IP + with SSL you will need to use StartTLS on storage_port and set + outgoing_encrypted_port_source to gossip outbound connections + know what port to connect to for each instance. Before changing + storage port or native port at nodes you must first upgrade the entire cluster + and clients to 4.0 so they can handle the port not being consistent across + the cluster. Materialized Views ------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/bin/cqlsh.py ---------------------------------------------------------------------- diff --git a/bin/cqlsh.py b/bin/cqlsh.py index 61ea160..3055110 100644 --- a/bin/cqlsh.py +++ b/bin/cqlsh.py @@ -455,7 +455,8 @@ class Shell(cmd.Cmd): single_statement=None, request_timeout=DEFAULT_REQUEST_TIMEOUT_SECONDS, protocol_version=None, - connect_timeout=DEFAULT_CONNECT_TIMEOUT_SECONDS): + connect_timeout=DEFAULT_CONNECT_TIMEOUT_SECONDS, + allow_server_port_discovery=False): cmd.Cmd.__init__(self, completekey=completekey) self.hostname = hostname self.port = port @@ -470,6 +471,7 @@ class Shell(cmd.Cmd): self.tracing_enabled = tracing_enabled self.page_size = self.default_page_size self.expand_enabled = expand_enabled + self.allow_server_port_discovery = allow_server_port_discovery if use_conn: self.conn = use_conn else: @@ -482,6 +484,7 @@ class Shell(cmd.Cmd): load_balancing_policy=WhiteListRoundRobinPolicy([self.hostname]), control_connection_timeout=connect_timeout, connect_timeout=connect_timeout, + allow_server_port_discovery=allow_server_port_discovery, **kwargs) self.owns_connection = not use_conn @@ -1768,7 +1771,8 @@ class Shell(cmd.Cmd): display_timezone=self.display_timezone, max_trace_wait=self.max_trace_wait, ssl=self.ssl, request_timeout=self.session.default_timeout, - connect_timeout=self.conn.connect_timeout) + connect_timeout=self.conn.connect_timeout, + allow_server_port_discovery=self.allow_server_port_discovery) subshell.cmdloop() f.close() @@ -2252,6 +2256,7 @@ def read_options(cmdlineargs, environment): optvalues.connect_timeout = option_with_default(configs.getint, 'connection', 'timeout', DEFAULT_CONNECT_TIMEOUT_SECONDS) optvalues.request_timeout = option_with_default(configs.getint, 'connection', 'request_timeout', DEFAULT_REQUEST_TIMEOUT_SECONDS) optvalues.execute = None + optvalues.allow_server_port_discovery = option_with_default(configs.getboolean, 'connection', 'allow_server_port_discovery', 'False') (options, arguments) = parser.parse_args(cmdlineargs, values=optvalues) @@ -2415,7 +2420,8 @@ def main(options, hostname, port): single_statement=options.execute, request_timeout=options.request_timeout, connect_timeout=options.connect_timeout, - encoding=options.encoding) + encoding=options.encoding, + allow_server_port_discovery=options.allow_server_port_discovery) except KeyboardInterrupt: sys.exit('Connection aborted.') except CQL_ERRORS, e: http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/build.xml ---------------------------------------------------------------------- diff --git a/build.xml b/build.xml index 8a033e0..5796868 100644 --- a/build.xml +++ b/build.xml @@ -306,7 +306,7 @@ <!-- define the remote repositories we use --> <artifact:remoteRepository id="central" url="${artifact.remoteRepository.central}"/> <artifact:remoteRepository id="apache" url="${artifact.remoteRepository.apache}"/> - + <macrodef name="install"> <attribute name="pomFile"/> <attribute name="file"/> @@ -437,7 +437,7 @@ <dependency groupId="com.google.code.findbugs" artifactId="jsr305" version="2.0.2" /> <dependency groupId="com.clearspring.analytics" artifactId="stream" version="2.5.2" /> <!-- UPDATE AND UNCOMMENT ON THE DRIVER RELEASE, BEFORE 4.0 RELEASE - <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" version="3.0.1" classifier="shaded"> + <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" version="4.0.0-SNAPSHOT" classifier="shaded"> <exclusion groupId="io.netty" artifactId="netty-buffer"/> <exclusion groupId="io.netty" artifactId="netty-codec"/> <exclusion groupId="io.netty" artifactId="netty-handler"/> @@ -522,7 +522,7 @@ <dependency groupId="org.apache.hadoop" artifactId="hadoop-minicluster"/> <dependency groupId="com.google.code.findbugs" artifactId="jsr305"/> <dependency groupId="org.antlr" artifactId="antlr"/> - <!-- UPDATE AND UNCOMMENT ON THE DRIVER RELEASE, BEFORE 4.0 RELEASE + <!-- UPDATE AND UNCOMMENT ON THE DRIVER RELEASE, BEFORE 4.0 RELEASE <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" classifier="shaded"/> --> <dependency groupId="org.eclipse.jdt.core.compiler" artifactId="ecj"/> @@ -543,7 +543,7 @@ <dependency groupId="junit" artifactId="junit"/> <!-- UPDATE AND UNCOMMENT ON THE DRIVER RELEASE, BEFORE 4.0 RELEASE <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" classifier="shaded"/> - --> + --> <dependency groupId="io.netty" artifactId="netty-all"/> <dependency groupId="org.eclipse.jdt.core.compiler" artifactId="ecj"/> <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core"/> @@ -624,7 +624,7 @@ <exclusion groupId="io.netty" artifactId="netty-transport"/> </dependency> --> - + <!-- don't need jna to run, but nice to have --> <dependency groupId="net.java.dev.jna" artifactId="jna"/> http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index 80e4515..3bed3a6 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -416,7 +416,7 @@ seed_provider: parameters: # seeds is actually a comma-delimited list of addresses. # Ex: "<ip1>,<ip2>,<ip3>" - - seeds: "127.0.0.1" + - seeds: "127.0.0.1:7000" # For workloads with more data than can fit in memory, Cassandra's # bottleneck will be reads that need to fetch data from @@ -945,6 +945,13 @@ dynamic_snitch_badness_threshold: 0.1 # the keystore and truststore. For instructions on generating these files, see: # http://download.oracle.com/javase/8/docs/technotes/guides/security/jsse/JSSERefGuide.html#CreateKeystore # +# If you are taking advantage of StartTLS outbound connections will have the issue that they can't know +# what encrypted port to connect to in a foolproof way. outgoing_encrypted_port_source deals with this confusion +# by allowing you to specify how you want a node to pick an outgoing port for intra-cluster connections. +# Valid values are "gossip" and "yaml". Gossip will always connect to the storage port for a node that is +# published via a gossip which is always going to be the plain storage port. "yaml" will always select +# the port configured as ssl_storage_port on THIS node. If you want to use SSL and have different storage +# ports across the cluster you must select "gossip" and use StartTLS on storage_port. server_encryption_options: # set to true for allowing secure incoming connections enabled: false http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/lib/cassandra-driver-core-3.3.2-0461ed35-SNAPSHOT-shaded.jar ---------------------------------------------------------------------- diff --git a/lib/cassandra-driver-core-3.3.2-0461ed35-SNAPSHOT-shaded.jar b/lib/cassandra-driver-core-3.3.2-0461ed35-SNAPSHOT-shaded.jar deleted file mode 100644 index a7be9cb..0000000 Binary files a/lib/cassandra-driver-core-3.3.2-0461ed35-SNAPSHOT-shaded.jar and /dev/null differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/lib/cassandra-driver-core-4.0.0-SNAPSHOT-shaded.jar ---------------------------------------------------------------------- diff --git a/lib/cassandra-driver-core-4.0.0-SNAPSHOT-shaded.jar b/lib/cassandra-driver-core-4.0.0-SNAPSHOT-shaded.jar new file mode 100644 index 0000000..609c393 Binary files /dev/null and b/lib/cassandra-driver-core-4.0.0-SNAPSHOT-shaded.jar differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/lib/cassandra-driver-internal-only-3.11.zip ---------------------------------------------------------------------- diff --git a/lib/cassandra-driver-internal-only-3.11.zip b/lib/cassandra-driver-internal-only-3.11.zip deleted file mode 100644 index f7760af..0000000 Binary files a/lib/cassandra-driver-internal-only-3.11.zip and /dev/null differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/lib/cassandra-driver-internal-only-3.12.0.post0-9ee88ded.zip ---------------------------------------------------------------------- diff --git a/lib/cassandra-driver-internal-only-3.12.0.post0-9ee88ded.zip b/lib/cassandra-driver-internal-only-3.12.0.post0-9ee88ded.zip new file mode 100644 index 0000000..4aa91b7 Binary files /dev/null and b/lib/cassandra-driver-internal-only-3.12.0.post0-9ee88ded.zip differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/batchlog/BatchlogManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java index 807e970..f232bdc 100644 --- a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java +++ b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java @@ -19,7 +19,6 @@ package org.apache.cassandra.batchlog; import java.io.IOException; import java.lang.management.ManagementFactory; -import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.*; @@ -34,6 +33,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.cql3.UntypedResultSet; import org.apache.cassandra.db.*; @@ -250,7 +250,7 @@ public class BatchlogManager implements BatchlogManagerMBean int positionInPage = 0; ArrayList<ReplayingBatch> unfinishedBatches = new ArrayList<>(pageSize); - Set<InetAddress> hintedNodes = new HashSet<>(); + Set<InetAddressAndPort> hintedNodes = new HashSet<>(); Set<UUID> replayedBatches = new HashSet<>(); // Sending out batches for replay without waiting for them, so that one stuck batch doesn't affect others @@ -295,7 +295,7 @@ public class BatchlogManager implements BatchlogManagerMBean replayedBatches.forEach(BatchlogManager::remove); } - private void finishAndClearBatches(ArrayList<ReplayingBatch> batches, Set<InetAddress> hintedNodes, Set<UUID> replayedBatches) + private void finishAndClearBatches(ArrayList<ReplayingBatch> batches, Set<InetAddressAndPort> hintedNodes, Set<UUID> replayedBatches) { // schedule hints for timed out deliveries for (ReplayingBatch batch : batches) @@ -330,7 +330,7 @@ public class BatchlogManager implements BatchlogManagerMBean this.replayedBytes = addMutations(version, serializedMutations); } - public int replay(RateLimiter rateLimiter, Set<InetAddress> hintedNodes) throws IOException + public int replay(RateLimiter rateLimiter, Set<InetAddressAndPort> hintedNodes) throws IOException { logger.trace("Replaying batch {}", id); @@ -348,7 +348,7 @@ public class BatchlogManager implements BatchlogManagerMBean return replayHandlers.size(); } - public void finish(Set<InetAddress> hintedNodes) + public void finish(Set<InetAddressAndPort> hintedNodes) { for (int i = 0; i < replayHandlers.size(); i++) { @@ -396,7 +396,7 @@ public class BatchlogManager implements BatchlogManagerMBean mutations.add(mutation); } - private void writeHintsForUndeliveredEndpoints(int startFrom, Set<InetAddress> hintedNodes) + private void writeHintsForUndeliveredEndpoints(int startFrom, Set<InetAddressAndPort> hintedNodes) { int gcgs = gcgs(mutations); @@ -420,7 +420,7 @@ public class BatchlogManager implements BatchlogManagerMBean private static List<ReplayWriteResponseHandler<Mutation>> sendReplays(List<Mutation> mutations, long writtenAt, - Set<InetAddress> hintedNodes) + Set<InetAddressAndPort> hintedNodes) { List<ReplayWriteResponseHandler<Mutation>> handlers = new ArrayList<>(mutations.size()); for (Mutation mutation : mutations) @@ -440,15 +440,15 @@ public class BatchlogManager implements BatchlogManagerMBean */ private static ReplayWriteResponseHandler<Mutation> sendSingleReplayMutation(final Mutation mutation, long writtenAt, - Set<InetAddress> hintedNodes) + Set<InetAddressAndPort> hintedNodes) { - Set<InetAddress> liveEndpoints = new HashSet<>(); + Set<InetAddressAndPort> liveEndpoints = new HashSet<>(); String ks = mutation.getKeyspaceName(); Token tk = mutation.key().getToken(); - for (InetAddress endpoint : StorageService.instance.getNaturalAndPendingEndpoints(ks, tk)) + for (InetAddressAndPort endpoint : StorageService.instance.getNaturalAndPendingEndpoints(ks, tk)) { - if (endpoint.equals(FBUtilities.getBroadcastAddress())) + if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort())) { mutation.apply(); } @@ -469,7 +469,7 @@ public class BatchlogManager implements BatchlogManagerMBean ReplayWriteResponseHandler<Mutation> handler = new ReplayWriteResponseHandler<>(liveEndpoints, System.nanoTime()); MessageOut<Mutation> message = mutation.createMessage(); - for (InetAddress endpoint : liveEndpoints) + for (InetAddressAndPort endpoint : liveEndpoints) MessagingService.instance().sendRR(message, endpoint, handler, false); return handler; } @@ -488,11 +488,11 @@ public class BatchlogManager implements BatchlogManagerMBean */ private static class ReplayWriteResponseHandler<T> extends WriteResponseHandler<T> { - private final Set<InetAddress> undelivered = Collections.newSetFromMap(new ConcurrentHashMap<>()); + private final Set<InetAddressAndPort> undelivered = Collections.newSetFromMap(new ConcurrentHashMap<>()); - ReplayWriteResponseHandler(Collection<InetAddress> writeEndpoints, long queryStartNanoTime) + ReplayWriteResponseHandler(Collection<InetAddressAndPort> writeEndpoints, long queryStartNanoTime) { - super(writeEndpoints, Collections.<InetAddress>emptySet(), null, null, null, WriteType.UNLOGGED_BATCH, queryStartNanoTime); + super(writeEndpoints, Collections.<InetAddressAndPort>emptySet(), null, null, null, WriteType.UNLOGGED_BATCH, queryStartNanoTime); undelivered.addAll(writeEndpoints); } @@ -505,7 +505,7 @@ public class BatchlogManager implements BatchlogManagerMBean @Override public void response(MessageIn<T> m) { - boolean removed = undelivered.remove(m == null ? FBUtilities.getBroadcastAddress() : m.from); + boolean removed = undelivered.remove(m == null ? FBUtilities.getBroadcastAddressAndPort() : m.from); assert removed; super.response(m); } @@ -515,9 +515,9 @@ public class BatchlogManager implements BatchlogManagerMBean public static class EndpointFilter { private final String localRack; - private final Multimap<String, InetAddress> endpoints; + private final Multimap<String, InetAddressAndPort> endpoints; - public EndpointFilter(String localRack, Multimap<String, InetAddress> endpoints) + public EndpointFilter(String localRack, Multimap<String, InetAddressAndPort> endpoints) { this.localRack = localRack; this.endpoints = endpoints; @@ -526,15 +526,15 @@ public class BatchlogManager implements BatchlogManagerMBean /** * @return list of candidates for batchlog hosting. If possible these will be two nodes from different racks. */ - public Collection<InetAddress> filter() + public Collection<InetAddressAndPort> filter() { // special case for single-node data centers if (endpoints.values().size() == 1) return endpoints.values(); // strip out dead endpoints and localhost - ListMultimap<String, InetAddress> validated = ArrayListMultimap.create(); - for (Map.Entry<String, InetAddress> entry : endpoints.entries()) + ListMultimap<String, InetAddressAndPort> validated = ArrayListMultimap.create(); + for (Map.Entry<String, InetAddressAndPort> entry : endpoints.entries()) if (isValid(entry.getValue())) validated.put(entry.getKey(), entry.getValue()); @@ -554,7 +554,7 @@ public class BatchlogManager implements BatchlogManagerMBean * pick two random nodes from there; we are guaranteed to have at least two nodes in the single remaining rack * because of the preceding if block. */ - List<InetAddress> otherRack = Lists.newArrayList(validated.values()); + List<InetAddressAndPort> otherRack = Lists.newArrayList(validated.values()); shuffle(otherRack); return otherRack.subList(0, 2); } @@ -572,10 +572,10 @@ public class BatchlogManager implements BatchlogManagerMBean } // grab a random member of up to two racks - List<InetAddress> result = new ArrayList<>(2); + List<InetAddressAndPort> result = new ArrayList<>(2); for (String rack : Iterables.limit(racks, 2)) { - List<InetAddress> rackMembers = validated.get(rack); + List<InetAddressAndPort> rackMembers = validated.get(rack); result.add(rackMembers.get(getRandomInt(rackMembers.size()))); } @@ -583,9 +583,9 @@ public class BatchlogManager implements BatchlogManagerMBean } @VisibleForTesting - protected boolean isValid(InetAddress input) + protected boolean isValid(InetAddressAndPort input) { - return !input.equals(FBUtilities.getBroadcastAddress()) && FailureDetector.instance.isAlive(input); + return !input.equals(FBUtilities.getBroadcastAddressAndPort()) && FailureDetector.instance.isAlive(input); } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index a656d1f..9012e3a 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -55,6 +55,7 @@ import org.apache.cassandra.io.util.SsdDiskOptimizationStrategy; import org.apache.cassandra.locator.DynamicEndpointSnitch; import org.apache.cassandra.locator.EndpointSnitchInfo; import org.apache.cassandra.locator.IEndpointSnitch; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.SeedProvider; import org.apache.cassandra.net.BackPressureStrategy; import org.apache.cassandra.net.RateBasedBackPressure; @@ -110,7 +111,7 @@ public class DatabaseDescriptor private static long indexSummaryCapacityInMB; private static String localDC; - private static Comparator<InetAddress> localComparator; + private static Comparator<InetAddressAndPort> localComparator; private static EncryptionContext encryptionContext; private static boolean hasLoggedConfig; @@ -307,6 +308,7 @@ public class DatabaseDescriptor private static void applyAll() throws ConfigurationException { + //InetAddressAndPort cares that applySimpleConfig runs first applySimpleConfig(); applyPartitioner(); @@ -324,6 +326,9 @@ public class DatabaseDescriptor private static void applySimpleConfig() { + //Doing this first before all other things in case other pieces of config want to construct + //InetAddressAndPort and get the right defaults + InetAddressAndPort.initializeDefaultPort(getStoragePort()); if (conf.commitlog_sync == null) { @@ -827,7 +832,7 @@ public class DatabaseDescriptor } else { - rpcAddress = FBUtilities.getLocalAddress(); + rpcAddress = FBUtilities.getJustLocalAddress(); } /* RPC address to broadcast */ @@ -956,10 +961,10 @@ public class DatabaseDescriptor snitch = createEndpointSnitch(conf.dynamic_snitch, conf.endpoint_snitch); EndpointSnitchInfo.create(); - localDC = snitch.getDatacenter(FBUtilities.getBroadcastAddress()); - localComparator = new Comparator<InetAddress>() + localDC = snitch.getDatacenter(FBUtilities.getBroadcastAddressAndPort()); + localComparator = new Comparator<InetAddressAndPort>() { - public int compare(InetAddress endpoint1, InetAddress endpoint2) + public int compare(InetAddressAndPort endpoint1, InetAddressAndPort endpoint2) { boolean local1 = localDC.equals(snitch.getDatacenter(endpoint1)); boolean local2 = localDC.equals(snitch.getDatacenter(endpoint2)); @@ -1319,14 +1324,14 @@ public class DatabaseDescriptor return conf.num_tokens; } - public static InetAddress getReplaceAddress() + public static InetAddressAndPort getReplaceAddress() { try { if (System.getProperty(Config.PROPERTY_PREFIX + "replace_address", null) != null) - return InetAddress.getByName(System.getProperty(Config.PROPERTY_PREFIX + "replace_address", null)); + return InetAddressAndPort.getByName(System.getProperty(Config.PROPERTY_PREFIX + "replace_address", null)); else if (System.getProperty(Config.PROPERTY_PREFIX + "replace_address_first_boot", null) != null) - return InetAddress.getByName(System.getProperty(Config.PROPERTY_PREFIX + "replace_address_first_boot", null)); + return InetAddressAndPort.getByName(System.getProperty(Config.PROPERTY_PREFIX + "replace_address_first_boot", null)); return null; } catch (UnknownHostException e) @@ -1651,9 +1656,9 @@ public class DatabaseDescriptor return conf.saved_caches_directory; } - public static Set<InetAddress> getSeeds() + public static Set<InetAddressAndPort> getSeeds() { - return ImmutableSet.<InetAddress>builder().addAll(seedProvider.getSeeds()).build(); + return ImmutableSet.<InetAddressAndPort>builder().addAll(seedProvider.getSeeds()).build(); } public static InetAddress getListenAddress() @@ -2206,7 +2211,7 @@ public class DatabaseDescriptor return localDC; } - public static Comparator<InetAddress> getLocalComparator() + public static Comparator<InetAddressAndPort> getLocalComparator() { return localComparator; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/db/ConsistencyLevel.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ConsistencyLevel.java b/src/java/org/apache/cassandra/db/ConsistencyLevel.java index 2214d8d..f93e737 100644 --- a/src/java/org/apache/cassandra/db/ConsistencyLevel.java +++ b/src/java/org/apache/cassandra/db/ConsistencyLevel.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.db; -import java.net.InetAddress; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -28,6 +27,7 @@ import com.google.common.collect.Iterables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.service.ReadRepairDecision; @@ -145,21 +145,21 @@ public enum ConsistencyLevel return isDCLocal; } - public boolean isLocal(InetAddress endpoint) + public boolean isLocal(InetAddressAndPort endpoint) { return DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(endpoint)); } - public int countLocalEndpoints(Iterable<InetAddress> liveEndpoints) + public int countLocalEndpoints(Iterable<InetAddressAndPort> liveEndpoints) { int count = 0; - for (InetAddress endpoint : liveEndpoints) + for (InetAddressAndPort endpoint : liveEndpoints) if (isLocal(endpoint)) count++; return count; } - private Map<String, Integer> countPerDCEndpoints(Keyspace keyspace, Iterable<InetAddress> liveEndpoints) + private Map<String, Integer> countPerDCEndpoints(Keyspace keyspace, Iterable<InetAddressAndPort> liveEndpoints) { NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) keyspace.getReplicationStrategy(); @@ -167,7 +167,7 @@ public enum ConsistencyLevel for (String dc: strategy.getDatacenters()) dcEndpoints.put(dc, 0); - for (InetAddress endpoint : liveEndpoints) + for (InetAddressAndPort endpoint : liveEndpoints) { String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(endpoint); dcEndpoints.put(dc, dcEndpoints.get(dc) + 1); @@ -175,12 +175,12 @@ public enum ConsistencyLevel return dcEndpoints; } - public List<InetAddress> filterForQuery(Keyspace keyspace, List<InetAddress> liveEndpoints) + public List<InetAddressAndPort> filterForQuery(Keyspace keyspace, List<InetAddressAndPort> liveEndpoints) { return filterForQuery(keyspace, liveEndpoints, ReadRepairDecision.NONE); } - public List<InetAddress> filterForQuery(Keyspace keyspace, List<InetAddress> liveEndpoints, ReadRepairDecision readRepair) + public List<InetAddressAndPort> filterForQuery(Keyspace keyspace, List<InetAddressAndPort> liveEndpoints, ReadRepairDecision readRepair) { /* * If we are doing an each quorum query, we have to make sure that the endpoints we select @@ -206,9 +206,9 @@ public enum ConsistencyLevel case GLOBAL: return liveEndpoints; case DC_LOCAL: - List<InetAddress> local = new ArrayList<InetAddress>(); - List<InetAddress> other = new ArrayList<InetAddress>(); - for (InetAddress add : liveEndpoints) + List<InetAddressAndPort> local = new ArrayList<>(); + List<InetAddressAndPort> other = new ArrayList<>(); + for (InetAddressAndPort add : liveEndpoints) { if (isLocal(add)) local.add(add); @@ -225,7 +225,7 @@ public enum ConsistencyLevel } } - private List<InetAddress> filterForEachQuorum(Keyspace keyspace, List<InetAddress> liveEndpoints, ReadRepairDecision readRepair) + private List<InetAddressAndPort> filterForEachQuorum(Keyspace keyspace, List<InetAddressAndPort> liveEndpoints, ReadRepairDecision readRepair) { NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) keyspace.getReplicationStrategy(); @@ -233,20 +233,20 @@ public enum ConsistencyLevel if (readRepair == ReadRepairDecision.GLOBAL) return liveEndpoints; - Map<String, List<InetAddress>> dcsEndpoints = new HashMap<>(); + Map<String, List<InetAddressAndPort>> dcsEndpoints = new HashMap<>(); for (String dc: strategy.getDatacenters()) dcsEndpoints.put(dc, new ArrayList<>()); - for (InetAddress add : liveEndpoints) + for (InetAddressAndPort add : liveEndpoints) { String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(add); dcsEndpoints.get(dc).add(add); } - List<InetAddress> waitSet = new ArrayList<>(); - for (Map.Entry<String, List<InetAddress>> dcEndpoints : dcsEndpoints.entrySet()) + List<InetAddressAndPort> waitSet = new ArrayList<>(); + for (Map.Entry<String, List<InetAddressAndPort>> dcEndpoints : dcsEndpoints.entrySet()) { - List<InetAddress> dcEndpoint = dcEndpoints.getValue(); + List<InetAddressAndPort> dcEndpoint = dcEndpoints.getValue(); if (readRepair == ReadRepairDecision.DC_LOCAL && dcEndpoints.getKey().equals(DatabaseDescriptor.getLocalDataCenter())) waitSet.addAll(dcEndpoint); else @@ -256,7 +256,7 @@ public enum ConsistencyLevel return waitSet; } - public boolean isSufficientLiveNodes(Keyspace keyspace, Iterable<InetAddress> liveEndpoints) + public boolean isSufficientLiveNodes(Keyspace keyspace, Iterable<InetAddressAndPort> liveEndpoints) { switch (this) { @@ -283,7 +283,7 @@ public enum ConsistencyLevel } } - public void assureSufficientLiveNodes(Keyspace keyspace, Iterable<InetAddress> liveEndpoints) throws UnavailableException + public void assureSufficientLiveNodes(Keyspace keyspace, Iterable<InetAddressAndPort> liveEndpoints) throws UnavailableException { int blockFor = blockFor(keyspace); switch (this) @@ -302,7 +302,7 @@ public enum ConsistencyLevel if (logger.isTraceEnabled()) { StringBuilder builder = new StringBuilder("Local replicas ["); - for (InetAddress endpoint : liveEndpoints) + for (InetAddressAndPort endpoint : liveEndpoints) { if (isLocal(endpoint)) builder.append(endpoint).append(","); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java index bd273e4..95d7916 100644 --- a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java +++ b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java @@ -37,7 +37,7 @@ public class CounterMutationVerbHandler implements IVerbHandler<CounterMutation> final CounterMutation cm = message.payload; logger.trace("Applying forwarded {}", cm); - String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress()); + String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort()); // We should not wait for the result of the write in this thread, // otherwise we could have a distributed deadlock between replicas // running this VerbHandler (see #4578). http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/db/DiskBoundaryManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DiskBoundaryManager.java b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java index 03cbf7b..72b5e2a 100644 --- a/src/java/org/apache/cassandra/db/DiskBoundaryManager.java +++ b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java @@ -80,14 +80,14 @@ public class DiskBoundaryManager && !StorageService.isReplacingSameAddress()) // When replacing same address, the node marks itself as UN locally { PendingRangeCalculatorService.instance.blockUntilFinished(); - localRanges = tmd.getPendingRanges(cfs.keyspace.getName(), FBUtilities.getBroadcastAddress()); + localRanges = tmd.getPendingRanges(cfs.keyspace.getName(), FBUtilities.getBroadcastAddressAndPort()); } else { // Reason we use use the future settled TMD is that if we decommission a node, we want to stream // from that node to the correct location on disk, if we didn't, we would put new files in the wrong places. // We do this to minimize the amount of data we need to move in rebalancedisks once everything settled - localRanges = cfs.keyspace.getReplicationStrategy().getAddressRanges(tmd.cloneAfterAllSettled()).get(FBUtilities.getBroadcastAddress()); + localRanges = cfs.keyspace.getReplicationStrategy().getAddressRanges(tmd.cloneAfterAllSettled()).get(FBUtilities.getBroadcastAddressAndPort()); } logger.debug("Got local ranges {} (ringVersion = {})", localRanges, ringVersion); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/db/Keyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java index cebf6eb..ae778f1 100644 --- a/src/java/org/apache/cassandra/db/Keyspace.java +++ b/src/java/org/apache/cassandra/db/Keyspace.java @@ -352,6 +352,7 @@ public class Keyspace private void createReplicationStrategy(KeyspaceMetadata ksm) { + logger.info("Creating replication strategy " + ksm .name + " params " + ksm.params); replicationStrategy = AbstractReplicationStrategy.createReplicationStrategy(ksm.name, ksm.params.replication.klass, StorageService.instance.getTokenMetadata(), http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/db/Mutation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java index a6a920c..6195fe4 100644 --- a/src/java/org/apache/cassandra/db/Mutation.java +++ b/src/java/org/apache/cassandra/db/Mutation.java @@ -43,9 +43,6 @@ public class Mutation implements IMutation { public static final MutationSerializer serializer = new MutationSerializer(); - public static final String FORWARD_TO = "FWD_TO"; - public static final String FORWARD_FROM = "FWD_FRM"; - // todo this is redundant // when we remove it, also restore SerializationsTest.testMutationRead to not regenerate new Mutations each test private final String keyspaceName; http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/db/MutationVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/MutationVerbHandler.java b/src/java/org/apache/cassandra/db/MutationVerbHandler.java index 59247a2..8386048 100644 --- a/src/java/org/apache/cassandra/db/MutationVerbHandler.java +++ b/src/java/org/apache/cassandra/db/MutationVerbHandler.java @@ -17,18 +17,17 @@ */ package org.apache.cassandra.db; -import java.io.DataInputStream; import java.io.IOException; -import java.net.InetAddress; +import java.util.Iterator; import org.apache.cassandra.exceptions.WriteTimeoutException; -import org.apache.cassandra.io.util.FastByteArrayInputStream; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.*; import org.apache.cassandra.tracing.Tracing; public class MutationVerbHandler implements IVerbHandler<Mutation> { - private void reply(int id, InetAddress replyTo) + private void reply(int id, InetAddressAndPort replyTo) { Tracing.trace("Enqueuing response to {}", replyTo); MessagingService.instance().sendReply(WriteResponse.createMessage(), id, replyTo); @@ -42,18 +41,19 @@ public class MutationVerbHandler implements IVerbHandler<Mutation> public void doVerb(MessageIn<Mutation> message, int id) throws IOException { // Check if there were any forwarding headers in this message - byte[] from = message.parameters.get(Mutation.FORWARD_FROM); - InetAddress replyTo; + InetAddressAndPort from = (InetAddressAndPort)message.parameters.get(ParameterType.FORWARD_FROM); + InetAddressAndPort replyTo; if (from == null) { replyTo = message.from; - byte[] forwardBytes = message.parameters.get(Mutation.FORWARD_TO); - if (forwardBytes != null) - forwardToLocalNodes(message.payload, message.verb, forwardBytes, message.from); + ForwardToContainer forwardTo = (ForwardToContainer)message.parameters.get(ParameterType.FORWARD_TO); + if (forwardTo != null) + forwardToLocalNodes(message.payload, message.verb, forwardTo, message.from); } else { - replyTo = InetAddress.getByAddress(from); + + replyTo = from; } try @@ -69,22 +69,17 @@ public class MutationVerbHandler implements IVerbHandler<Mutation> } } - private static void forwardToLocalNodes(Mutation mutation, MessagingService.Verb verb, byte[] forwardBytes, InetAddress from) throws IOException + private static void forwardToLocalNodes(Mutation mutation, MessagingService.Verb verb, ForwardToContainer forwardTo, InetAddressAndPort from) throws IOException { - try (DataInputStream in = new DataInputStream(new FastByteArrayInputStream(forwardBytes))) + // tell the recipients who to send their ack to + MessageOut<Mutation> message = new MessageOut<>(verb, mutation, Mutation.serializer).withParameter(ParameterType.FORWARD_FROM, from); + Iterator<InetAddressAndPort> iterator = forwardTo.targets.iterator(); + // Send a message to each of the addresses on our Forward List + for (int i = 0; i < forwardTo.targets.size(); i++) { - int size = in.readInt(); - - // tell the recipients who to send their ack to - MessageOut<Mutation> message = new MessageOut<>(verb, mutation, Mutation.serializer).withParameter(Mutation.FORWARD_FROM, from.getAddress()); - // Send a message to each of the addresses on our Forward List - for (int i = 0; i < size; i++) - { - InetAddress address = CompactEndpointSerializationHelper.deserialize(in); - int id = in.readInt(); - Tracing.trace("Enqueuing forwarded write to {}", address); - MessagingService.instance().sendOneWay(message, id, address); - } + InetAddressAndPort address = iterator.next(); + Tracing.trace("Enqueuing forwarded write to {}", address); + MessagingService.instance().sendOneWay(message, forwardTo.messageIds[i], address); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java b/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java index 066b2fe..151e7d3 100644 --- a/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java +++ b/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java @@ -60,7 +60,7 @@ public class SizeEstimatesRecorder extends SchemaChangeListener implements Runna public void run() { TokenMetadata metadata = StorageService.instance.getTokenMetadata().cloneOnlyTokenMap(); - if (!metadata.isMember(FBUtilities.getBroadcastAddress())) + if (!metadata.isMember(FBUtilities.getBroadcastAddressAndPort())) { logger.debug("Node is not part of the ring; not recording size estimates"); return; @@ -71,7 +71,7 @@ public class SizeEstimatesRecorder extends SchemaChangeListener implements Runna for (Keyspace keyspace : Keyspace.nonLocalStrategy()) { Collection<Range<Token>> localRanges = StorageService.instance.getPrimaryRangesForEndpoint(keyspace.getName(), - FBUtilities.getBroadcastAddress()); + FBUtilities.getBroadcastAddressAndPort()); for (ColumnFamilyStore table : keyspace.getColumnFamilyStores()) { long start = System.nanoTime(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org