This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit fa85ff978bae303fe1b06dce64b758b635278a4d
Merge: 860de83 0d5ccb9
Author: Alex Petrov <oleksandr.pet...@gmail.com>
AuthorDate: Mon Nov 11 15:55:33 2019 +0100

    Merge branch 'cassandra-3.11' into trunk

 .../org/apache/cassandra/gms/GossiperEvent.java    |   4 +-
 .../apache/cassandra/service/CassandraDaemon.java  |  47 ++++--
 .../apache/cassandra/distributed/api/Feature.java  |   2 +-
 .../cassandra/distributed/api/IInstance.java       |   2 +
 .../apache/cassandra/distributed/api/IListen.java  |   2 +
 .../distributed/impl/AbstractCluster.java          | 158 +++++++++++++++++----
 .../cassandra/distributed/impl/Instance.java       |  25 +++-
 .../distributed/impl/InstanceClassLoader.java      |  16 ++-
 .../cassandra/distributed/impl/InstanceConfig.java |  34 +++--
 .../apache/cassandra/distributed/impl/Listen.java  |   9 ++
 .../apache/cassandra/distributed/impl/RowUtil.java |  17 +++
 .../distributed/test/DistributedTestBase.java      |   7 +
 .../distributed/test/NativeProtocolTest.java       |  59 ++++++++
 .../distributed/test/ResourceLeakTest.java         |  16 +++
 14 files changed, 347 insertions(+), 51 deletions(-)

diff --cc src/java/org/apache/cassandra/gms/GossiperEvent.java
index 2de88bc,0000000..ef7bd8d
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/gms/GossiperEvent.java
+++ b/src/java/org/apache/cassandra/gms/GossiperEvent.java
@@@ -1,111 -1,0 +1,111 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.gms;
 +
 +import java.io.Serializable;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +import javax.annotation.Nullable;
 +
 +import org.apache.cassandra.diag.DiagnosticEvent;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +
 +/**
 + * DiagnosticEvent implementation for {@link Gossiper} activities.
 + */
- final class GossiperEvent extends DiagnosticEvent
++public final class GossiperEvent extends DiagnosticEvent
 +{
 +    private final InetAddressAndPort endpoint;
 +    @Nullable
 +    private final Long quarantineExpiration;
 +    @Nullable
 +    private final EndpointState localState;
 +
 +    private final Map<InetAddressAndPort, EndpointState> endpointStateMap;
 +    private final boolean inShadowRound;
 +    private final Map<InetAddressAndPort, Long> justRemovedEndpoints;
 +    private final long lastProcessedMessageAt;
 +    private final Set<InetAddressAndPort> liveEndpoints;
 +    private final List<String> seeds;
 +    private final Set<InetAddressAndPort> seedsInShadowRound;
 +    private final Map<InetAddressAndPort, Long> unreachableEndpoints;
 +
 +
-     enum GossiperEventType
++    public enum GossiperEventType
 +    {
 +        MARKED_AS_SHUTDOWN,
 +        CONVICTED,
 +        REPLACEMENT_QUARANTINE,
 +        REPLACED_ENDPOINT,
 +        EVICTED_FROM_MEMBERSHIP,
 +        REMOVED_ENDPOINT,
 +        QUARANTINED_ENDPOINT,
 +        MARKED_ALIVE,
 +        REAL_MARKED_ALIVE,
 +        MARKED_DEAD,
 +        MAJOR_STATE_CHANGE_HANDLED,
 +        SEND_GOSSIP_DIGEST_SYN
 +    }
 +
 +    public GossiperEventType type;
 +
 +
 +    GossiperEvent(GossiperEventType type, Gossiper gossiper, InetAddressAndPort endpoint,
 +                  @Nullable Long quarantineExpiration, @Nullable EndpointState localState)
 +    {
 +        this.type = type;
 +        this.endpoint = endpoint;
 +        this.quarantineExpiration = quarantineExpiration;
 +        this.localState = localState;
 +
 +        this.endpointStateMap = gossiper.getEndpointStateMap();
 +        this.inShadowRound = gossiper.isInShadowRound();
 +        this.justRemovedEndpoints = gossiper.getJustRemovedEndpoints();
 +        this.lastProcessedMessageAt = gossiper.getLastProcessedMessageAt();
 +        this.liveEndpoints = gossiper.getLiveMembers();
 +        this.seeds = gossiper.getSeeds();
 +        this.seedsInShadowRound = gossiper.getSeedsInShadowRound();
 +        this.unreachableEndpoints = gossiper.getUnreachableEndpoints();
 +    }
 +
 +    public Enum<GossiperEventType> getType()
 +    {
 +        return type;
 +    }
 +
 +    public HashMap<String, Serializable> toMap()
 +    {
 +        // be extra defensive against nulls and bugs
 +        HashMap<String, Serializable> ret = new HashMap<>();
 +        if (endpoint != null) ret.put("endpoint", endpoint.getHostAddress(true));
 +        ret.put("quarantineExpiration", quarantineExpiration);
 +        ret.put("localState", String.valueOf(localState));
 +        ret.put("endpointStateMap", String.valueOf(endpointStateMap));
 +        ret.put("inShadowRound", inShadowRound);
 +        ret.put("justRemovedEndpoints", String.valueOf(justRemovedEndpoints));
 +        ret.put("lastProcessedMessageAt", lastProcessedMessageAt);
 +        ret.put("liveEndpoints", String.valueOf(liveEndpoints));
 +        ret.put("seeds", String.valueOf(seeds));
 +        ret.put("seedsInShadowRound", String.valueOf(seedsInShadowRound));
 +        ret.put("unreachableEndpoints", String.valueOf(unreachableEndpoints));
 +        return ret;
 +    }
 +}
diff --cc src/java/org/apache/cassandra/service/CassandraDaemon.java
index 9d05371,16a6145..d705bd7
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@@ -252,19 -264,8 +258,18 @@@ public class CassandraDaemo
          // Populate token metadata before flushing, for token-aware sstable partitioning (#6696)
          StorageService.instance.populateTokenMetadata();
  
 -        // load schema from disk
 -        Schema.instance.loadFromDisk();
 +        try
 +        {
 +            // load schema from disk
 +            Schema.instance.loadFromDisk();
 +        }
 +        catch (Exception e)
 +        {
 +            logger.error("Error while loading schema: ", e);
 +            throw e;
 +        }
 +
-         VirtualKeyspaceRegistry.instance.register(VirtualSchemaKeyspace.instance);
-         VirtualKeyspaceRegistry.instance.register(SystemViewsKeyspace.instance);
++        setupVirtualKeyspaces();
  
          // clean up debris in the rest of the keyspaces
          for (String keyspaceName : Schema.instance.getKeyspaces())
@@@ -430,20 -434,22 +435,31 @@@
          // due to scheduling errors or race conditions
          ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(ColumnFamilyStore.getBackgroundCompactionTaskSubmitter(), 5, 1, TimeUnit.MINUTES);
  
 -        // Thrift
 -        InetAddress rpcAddr = DatabaseDescriptor.getRpcAddress();
 -        int rpcPort = DatabaseDescriptor.getRpcPort();
 -        int listenBacklog = DatabaseDescriptor.getRpcListenBacklog();
 -        thriftServer = new ThriftServer(rpcAddr, rpcPort, listenBacklog);
 +        // schedule periodic recomputation of speculative retry thresholds
 +        ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(
 +            () -> Keyspace.all().forEach(k -> k.getColumnFamilyStores().forEach(ColumnFamilyStore::updateSpeculationThreshold)),
 +            DatabaseDescriptor.getReadRpcTimeout(NANOSECONDS),
 +            DatabaseDescriptor.getReadRpcTimeout(NANOSECONDS),
 +            NANOSECONDS
 +        );
 +
-         // Native transport
-         nativeTransportService = new NativeTransportService();
+         initializeNativeTransport();
  
          completeSetup();
      }
  
++    public void setupVirtualKeyspaces()
++    {
++        VirtualKeyspaceRegistry.instance.register(VirtualSchemaKeyspace.instance);
++        VirtualKeyspaceRegistry.instance.register(SystemViewsKeyspace.instance);
++    }
++
+     public void initializeNativeTransport()
+     {
+         // Native transport
+         nativeTransportService = new NativeTransportService();
+     }
+ 
      /*
       * Asynchronously load the row and key cache in one off threads and return a compound future of the result.
       * Error handling is pushed into the cache load since cache loads are allowed to fail and are handled by logging.
diff --cc test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index d866fae,6d24ad9..28a6a74
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@@ -129,9 -131,9 +131,9 @@@ public abstract class AbstractCluster<
  
          private IInvokableInstance newInstance(int generation)
          {
-             ClassLoader classLoader = new InstanceClassLoader(generation, version.classpath, sharedClassLoader);
+             ClassLoader classLoader = new InstanceClassLoader(generation, config.num, version.classpath, sharedClassLoader);
              return Instance.transferAdhoc((SerializableBiFunction<IInstanceConfig, ClassLoader, Instance>)Instance::new, classLoader)
 -                           .apply(config, classLoader);
 +                                        .apply(config.forVersion(version.major), classLoader);
          }
  
          public IInstanceConfig config()
@@@ -256,12 -273,11 +273,11 @@@
          FBUtilities.waitOnFutures(instances.stream()
                                             .map(i -> i.async(consumer).apply(i))
                                             .collect(Collectors.toList()),
-                                   timeout, units);
+                                   timeout, unit);
      }
  
- 
      public IMessageFilters filters() { return filters; }
 -    public MessageFilters.Builder verbs(MessagingService.Verb ... verbs) { return filters.verbs(verbs); }
 +    public MessageFilters.Builder verbs(Verb... verbs) { return filters.verbs(verbs); }
  
      public void disableAutoCompaction(String keyspace)
      {
@@@ -358,9 -438,27 +438,27 @@@
          get(instance).schemaChangeInternal(statement);
      }
  
 -    public void startup()
 +    void startup()
      {
-         parallelForEach(I::startup, 0, null);
+         try (AllMembersAliveMonitor monitor = new AllMembersAliveMonitor())
+         {
+             // Start any instances with auto_bootstrap enabled first, and in series to avoid issues
+             // with multiple nodes bootstrapping with consistent range movement enabled,
+             // and then start any instances with it disabled in parallel.
+             List<I> startSequentially = new ArrayList<>();
+             List<I> startParallel = new ArrayList<>();
+             for (I instance : instances)
+             {
+                 if ((boolean) instance.config().get("auto_bootstrap"))
+                     startSequentially.add(instance);
+                 else
+                     startParallel.add(instance);
+             }
+ 
+             forEach(startSequentially, I::startup);
+             parallelForEach(startParallel, I::startup, 0, null);
+             monitor.waitForCompletion();
+         }
      }
  
      protected interface Factory<I extends IInstance, C extends AbstractCluster<I>>
diff --cc test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index faf59d5,6c3a70d..cdc3cc8
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@@ -68,11 -72,12 +68,12 @@@ import org.apache.cassandra.io.sstable.
  import org.apache.cassandra.io.util.DataInputBuffer;
  import org.apache.cassandra.io.util.DataOutputBuffer;
  import org.apache.cassandra.locator.InetAddressAndPort;
 -import org.apache.cassandra.net.IMessageSink;
 -import org.apache.cassandra.net.MessageIn;
 -import org.apache.cassandra.net.MessageOut;
 +import org.apache.cassandra.net.Message;
  import org.apache.cassandra.net.MessagingService;
 -import org.apache.cassandra.schema.LegacySchemaMigrator;
 +import org.apache.cassandra.schema.Schema;
 +import org.apache.cassandra.schema.SchemaConstants;
 +import org.apache.cassandra.service.ActiveRepairService;
+ import org.apache.cassandra.service.CassandraDaemon;
  import org.apache.cassandra.service.ClientState;
  import org.apache.cassandra.service.PendingRangeCalculatorService;
  import org.apache.cassandra.service.QueryState;
@@@ -367,7 -432,15 +370,16 @@@ public class Instance extends IsolatedE
  
                  SystemKeyspace.finishStartup();
  
+                 if (config.has(NATIVE_PROTOCOL))
+                 {
++                    // Start up virtual table support
++                    CassandraDaemon.getInstanceForTesting().setupVirtualKeyspaces();
++
+                     CassandraDaemon.getInstanceForTesting().initializeNativeTransport();
+                     CassandraDaemon.getInstanceForTesting().startNativeTransport();
+                 }
+ 
 -                if (!FBUtilities.getBroadcastAddress().equals(broadcastAddressAndPort().address))
 -                    throw new IllegalStateException();
 -                if (DatabaseDescriptor.getStoragePort() != broadcastAddressAndPort().port)
 +                if (!FBUtilities.getBroadcastAddressAndPort().equals(broadcastAddressAndPort()))
                      throw new IllegalStateException();
              }
              catch (Throwable t)
@@@ -507,11 -578,15 +521,20 @@@
                                  .thenRun(super::shutdown);
      }
  
+     public int liveMemberCount()
+     {
+         return sync(() -> {
+             if (!DatabaseDescriptor.isDaemonInitialized() || !Gossiper.instance.isEnabled())
+                 return 0;
+             return Gossiper.instance.getLiveMembers().size();
+         }).call();
+     }
+ 
 +    private static void shutdownAndWait(List<ExecutorService> executors) throws TimeoutException, InterruptedException
 +    {
 +        ExecutorUtils.shutdownNow(executors);
 +        ExecutorUtils.awaitTermination(1L, MINUTES, executors);
 +    }
  
      private static Throwable parallelRun(Throwable accumulate, ExecutorService runOn, ThrowingRunnable ... runnables)
      {
diff --cc test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
index 3b54578,d36487e..fcedf21
--- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
@@@ -108,9 -114,12 +114,14 @@@ public class InstanceConfig implements 
                  .set("storage_port", 7012)
                  .set("endpoint_snitch", DistributedTestSnitch.class.getName())
                  .set("seed_provider", new ParameterizedClass(SimpleSeedProvider.class.getName(),
 -                        Collections.singletonMap("seeds", "127.0.0.1")))
 +                        Collections.singletonMap("seeds", "127.0.0.1:7012")))
 +                // required settings for dtest functionality
 +                .set("diagnostic_events_enabled", true)
+                 .set("auto_bootstrap", false)
+                 // capacities that are based on `totalMemory` that should be fixed size
+                 .set("index_summary_capacity_in_mb", 50l)
+                 .set("counter_cache_size_in_mb", 50l)
+                 .set("key_cache_size_in_mb", 50l)
                  // legacy parameters
                  .forceSet("commitlog_sync_batch_window_in_ms", 1.0);
          this.featureFlags = EnumSet.noneOf(Feature.class);
diff --cc test/distributed/org/apache/cassandra/distributed/impl/Listen.java
index ec34518,27ae156..e37c4f7
--- a/test/distributed/org/apache/cassandra/distributed/impl/Listen.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Listen.java
@@@ -18,11 -18,13 +18,12 @@@
  
  package org.apache.cassandra.distributed.impl;
  
 -import java.util.concurrent.TimeUnit;
 -import java.util.concurrent.atomic.AtomicBoolean;
 -import java.util.concurrent.locks.LockSupport;
 -import java.util.function.Supplier;
 +import java.util.function.Consumer;
  
 +import org.apache.cassandra.diag.DiagnosticEventService;
  import org.apache.cassandra.distributed.api.IListen;
 -import org.apache.cassandra.gms.Gossiper;
++import org.apache.cassandra.gms.GossiperEvent;
 +import org.apache.cassandra.schema.SchemaEvent;
  
  public class Listen implements IListen
  {
@@@ -34,8 -37,31 +36,15 @@@
  
      public Cancel schema(Runnable onChange)
      {
 -        return start(onChange, instance::schemaVersion);
 +        Consumer<SchemaEvent> consumer = event -> onChange.run();
 +        DiagnosticEventService.instance().subscribe(SchemaEvent.class, SchemaEvent.SchemaEventType.VERSION_UPDATED, consumer);
 +        return () -> DiagnosticEventService.instance().unsubscribe(SchemaEvent.class, consumer);
      }
+ 
+     public Cancel liveMembers(Runnable onChange)
+     {
 -        return start(onChange, instance::liveMemberCount);
 -    }
 -
 -    protected <T> Cancel start(Runnable onChange, Supplier<T> valueSupplier) {
 -        AtomicBoolean cancel = new AtomicBoolean(false);
 -        instance.isolatedExecutor.execute(() -> {
 -            T prev = valueSupplier.get();
 -            while (true)
 -            {
 -                if (cancel.get())
 -                    return;
 -
 -                LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10L));
 -
 -                T cur = valueSupplier.get();
 -                if (!prev.equals(cur))
 -                    onChange.run();
 -                prev = cur;
 -            }
 -        });
 -        return () -> cancel.set(true);
++        Consumer<GossiperEvent> consumer = event -> onChange.run();
++        DiagnosticEventService.instance().subscribe(GossiperEvent.class, GossiperEvent.GossiperEventType.REAL_MARKED_ALIVE, consumer);
++        return () -> DiagnosticEventService.instance().unsubscribe(GossiperEvent.class, consumer);
+     }
  }
diff --cc test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java
index 35d3691,03e4047..d048079
--- a/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java
@@@ -78,9 -80,14 +80,14 @@@ public class DistributedTestBas
          return cluster;
      }
  
+     public static void assertRows(ResultSet actual,Object[]... expected)
+     {
+         assertRows(RowUtil.toObjects(actual), expected);
+     }
+ 
      public static void assertRows(Object[][] actual, Object[]... expected)
      {
 -        Assert.assertEquals(rowsNotEqualErrorMessage(actual, expected),
 +        Assert.assertEquals(rowsNotEqualErrorMessage(expected, actual),
                              expected.length, actual.length);
  
          for (int i = 0; i < expected.length; i++)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to