http://git-wip-us.apache.org/repos/asf/cassandra/blob/2846b22a/src/java/org/apache/cassandra/gms/GossiperEvent.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossiperEvent.java 
b/src/java/org/apache/cassandra/gms/GossiperEvent.java
new file mode 100644
index 0000000..2de88bc
--- /dev/null
+++ b/src/java/org/apache/cassandra/gms/GossiperEvent.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+
+import org.apache.cassandra.diag.DiagnosticEvent;
+import org.apache.cassandra.locator.InetAddressAndPort;
+
+/**
+ * DiagnosticEvent implementation for {@link Gossiper} activities.
+ */
+final class GossiperEvent extends DiagnosticEvent
+{
+    private final InetAddressAndPort endpoint;
+    @Nullable
+    private final Long quarantineExpiration;
+    @Nullable
+    private final EndpointState localState;
+
+    private final Map<InetAddressAndPort, EndpointState> endpointStateMap;
+    private final boolean inShadowRound;
+    private final Map<InetAddressAndPort, Long> justRemovedEndpoints;
+    private final long lastProcessedMessageAt;
+    private final Set<InetAddressAndPort> liveEndpoints;
+    private final List<String> seeds;
+    private final Set<InetAddressAndPort> seedsInShadowRound;
+    private final Map<InetAddressAndPort, Long> unreachableEndpoints;
+
+
+    enum GossiperEventType
+    {
+        MARKED_AS_SHUTDOWN,
+        CONVICTED,
+        REPLACEMENT_QUARANTINE,
+        REPLACED_ENDPOINT,
+        EVICTED_FROM_MEMBERSHIP,
+        REMOVED_ENDPOINT,
+        QUARANTINED_ENDPOINT,
+        MARKED_ALIVE,
+        REAL_MARKED_ALIVE,
+        MARKED_DEAD,
+        MAJOR_STATE_CHANGE_HANDLED,
+        SEND_GOSSIP_DIGEST_SYN
+    }
+
+    public GossiperEventType type;
+
+
+    GossiperEvent(GossiperEventType type, Gossiper gossiper, 
InetAddressAndPort endpoint,
+                  @Nullable Long quarantineExpiration, @Nullable EndpointState 
localState)
+    {
+        this.type = type;
+        this.endpoint = endpoint;
+        this.quarantineExpiration = quarantineExpiration;
+        this.localState = localState;
+
+        this.endpointStateMap = gossiper.getEndpointStateMap();
+        this.inShadowRound = gossiper.isInShadowRound();
+        this.justRemovedEndpoints = gossiper.getJustRemovedEndpoints();
+        this.lastProcessedMessageAt = gossiper.getLastProcessedMessageAt();
+        this.liveEndpoints = gossiper.getLiveMembers();
+        this.seeds = gossiper.getSeeds();
+        this.seedsInShadowRound = gossiper.getSeedsInShadowRound();
+        this.unreachableEndpoints = gossiper.getUnreachableEndpoints();
+    }
+
+    public Enum<GossiperEventType> getType()
+    {
+        return type;
+    }
+
+    public HashMap<String, Serializable> toMap()
+    {
+        // be extra defensive against nulls and bugs
+        HashMap<String, Serializable> ret = new HashMap<>();
+        if (endpoint != null) ret.put("endpoint", 
endpoint.getHostAddress(true));
+        ret.put("quarantineExpiration", quarantineExpiration);
+        ret.put("localState", String.valueOf(localState));
+        ret.put("endpointStateMap", String.valueOf(endpointStateMap));
+        ret.put("inShadowRound", inShadowRound);
+        ret.put("justRemovedEndpoints", String.valueOf(justRemovedEndpoints));
+        ret.put("lastProcessedMessageAt", lastProcessedMessageAt);
+        ret.put("liveEndpoints", String.valueOf(liveEndpoints));
+        ret.put("seeds", String.valueOf(seeds));
+        ret.put("seedsInShadowRound", String.valueOf(seedsInShadowRound));
+        ret.put("unreachableEndpoints", String.valueOf(unreachableEndpoints));
+        return ret;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2846b22a/src/java/org/apache/cassandra/hints/Hint.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/Hint.java 
b/src/java/org/apache/cassandra/hints/Hint.java
index b0abd50..7e4618c 100644
--- a/src/java/org/apache/cassandra/hints/Hint.java
+++ b/src/java/org/apache/cassandra/hints/Hint.java
@@ -132,7 +132,7 @@ public final class Hint
     /**
      * @return calculates whether or not it is safe to apply the hint without 
risking to resurrect any deleted data
      */
-    boolean isLive()
+    public boolean isLive()
     {
         return isLive(creationTime, System.currentTimeMillis(), ttl());
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2846b22a/src/java/org/apache/cassandra/hints/HintDiagnostics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintDiagnostics.java 
b/src/java/org/apache/cassandra/hints/HintDiagnostics.java
new file mode 100644
index 0000000..3ff0834
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/HintDiagnostics.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.hints;
+
+import org.apache.cassandra.diag.DiagnosticEventService;
+import org.apache.cassandra.hints.HintEvent.HintEventType;
+import org.apache.cassandra.hints.HintEvent.HintResult;
+
+/**
+ * Utility methods for DiagnosticEvents around hinted handoff.
+ */
+final class HintDiagnostics
+{
+    private static final DiagnosticEventService service = 
DiagnosticEventService.instance();
+
+    private HintDiagnostics()
+    {
+    }
+
+    static void dispatcherCreated(HintsDispatcher dispatcher)
+    {
+        if (isEnabled(HintEventType.DISPATCHER_CREATED))
+            service.publish(new HintEvent(HintEventType.DISPATCHER_CREATED, 
dispatcher,
+                                          dispatcher.hostId, 
dispatcher.address, null, null, null, null));
+    }
+
+    static void dispatcherClosed(HintsDispatcher dispatcher)
+    {
+        if (isEnabled(HintEventType.DISPATCHER_CLOSED))
+            service.publish(new HintEvent(HintEventType.DISPATCHER_CLOSED, 
dispatcher,
+                                          dispatcher.hostId, 
dispatcher.address, null, null, null, null));
+    }
+
+    static void dispatchPage(HintsDispatcher dispatcher)
+    {
+        if (isEnabled(HintEventType.DISPATCHER_PAGE))
+            service.publish(new HintEvent(HintEventType.DISPATCHER_PAGE, 
dispatcher,
+                                          dispatcher.hostId, 
dispatcher.address, null, null, null, null));
+    }
+
+    static void abortRequested(HintsDispatcher dispatcher)
+    {
+        if (isEnabled(HintEventType.ABORT_REQUESTED))
+            service.publish(new HintEvent(HintEventType.ABORT_REQUESTED, 
dispatcher,
+                                          dispatcher.hostId, 
dispatcher.address, null, null, null, null));
+    }
+
+    static void pageSuccessResult(HintsDispatcher dispatcher, long success, 
long failures, long timeouts)
+    {
+        if (isEnabled(HintEventType.DISPATCHER_HINT_RESULT))
+            service.publish(new 
HintEvent(HintEventType.DISPATCHER_HINT_RESULT, dispatcher,
+                                          dispatcher.hostId, 
dispatcher.address, HintResult.PAGE_SUCCESS,
+                                          success, failures, timeouts));
+    }
+
+    static void pageFailureResult(HintsDispatcher dispatcher, long success, 
long failures, long timeouts)
+    {
+        if (isEnabled(HintEventType.DISPATCHER_HINT_RESULT))
+            service.publish(new 
HintEvent(HintEventType.DISPATCHER_HINT_RESULT, dispatcher,
+                                          dispatcher.hostId, 
dispatcher.address, HintResult.PAGE_FAILURE,
+                                          success, failures, timeouts));
+    }
+
+    private static boolean isEnabled(HintEventType type)
+    {
+        return service.isEnabled(HintEvent.class, type);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2846b22a/src/java/org/apache/cassandra/hints/HintEvent.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintEvent.java 
b/src/java/org/apache/cassandra/hints/HintEvent.java
new file mode 100644
index 0000000..011f248
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/HintEvent.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.hints;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.UUID;
+import javax.annotation.Nullable;
+
+import org.apache.cassandra.diag.DiagnosticEvent;
+import org.apache.cassandra.locator.InetAddressAndPort;
+
+/**
+ * DiagnosticEvent implementation for hinted handoff.
+ */
+final class HintEvent extends DiagnosticEvent
+{
+    enum HintEventType
+    {
+        DISPATCHING_STARTED,
+        DISPATCHING_PAUSED,
+        DISPATCHING_RESUMED,
+        DISPATCHING_SHUTDOWN,
+
+        DISPATCHER_CREATED,
+        DISPATCHER_CLOSED,
+
+        DISPATCHER_PAGE,
+        DISPATCHER_HINT_RESULT,
+
+        ABORT_REQUESTED
+    }
+
+    enum HintResult
+    {
+        PAGE_SUCCESS, PAGE_FAILURE
+    }
+
+    private final HintEventType type;
+    private final HintsDispatcher dispatcher;
+    private final UUID targetHostId;
+    private final InetAddressAndPort targetAddress;
+    @Nullable
+    private final HintResult dispatchResult;
+    @Nullable
+    private final Long pageHintsSuccessful;
+    @Nullable
+    private final Long pageHintsFailed;
+    @Nullable
+    private final Long pageHintsTimeout;
+
+    HintEvent(HintEventType type, HintsDispatcher dispatcher, UUID 
targetHostId, InetAddressAndPort targetAddress,
+              @Nullable HintResult dispatchResult, @Nullable Long 
pageHintsSuccessful,
+              @Nullable Long pageHintsFailed, @Nullable Long pageHintsTimeout)
+    {
+        this.type = type;
+        this.dispatcher = dispatcher;
+        this.targetHostId = targetHostId;
+        this.targetAddress = targetAddress;
+        this.dispatchResult = dispatchResult;
+        this.pageHintsSuccessful = pageHintsSuccessful;
+        this.pageHintsFailed = pageHintsFailed;
+        this.pageHintsTimeout = pageHintsTimeout;
+    }
+
+    public Enum<HintEventType> getType()
+    {
+        return type;
+    }
+
+    public HashMap<String, Serializable> toMap()
+    {
+        // be extra defensive against nulls and bugs
+        HashMap<String, Serializable> ret = new HashMap<>();
+        ret.put("targetHostId", targetHostId);
+        ret.put("targetAddress", targetAddress.getHostAddress(true));
+        if (dispatchResult != null) ret.put("dispatchResult", 
dispatchResult.name());
+        if (pageHintsSuccessful != null || pageHintsFailed != null || 
pageHintsTimeout != null)
+        {
+            ret.put("hint.page.hints_succeeded", pageHintsSuccessful);
+            ret.put("hint.page.hints_failed", pageHintsFailed);
+            ret.put("hint.page.hints_timed_out", pageHintsTimeout);
+        }
+        return ret;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2846b22a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java 
b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
index 92dbc29..2d9fd9d 100644
--- a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
+++ b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
@@ -306,4 +306,14 @@ final class HintsDispatchExecutor
             }
         }
     }
+
+    public boolean isPaused()
+    {
+        return isPaused.get();
+    }
+
+    public boolean hasScheduledDispatches()
+    {
+        return !scheduledDispatches.isEmpty();
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2846b22a/src/java/org/apache/cassandra/hints/HintsDispatcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsDispatcher.java 
b/src/java/org/apache/cassandra/hints/HintsDispatcher.java
index d0d9aac..2cff186 100644
--- a/src/java/org/apache/cassandra/hints/HintsDispatcher.java
+++ b/src/java/org/apache/cassandra/hints/HintsDispatcher.java
@@ -50,8 +50,8 @@ final class HintsDispatcher implements AutoCloseable
     private enum Action { CONTINUE, ABORT }
 
     private final HintsReader reader;
-    private final UUID hostId;
-    private final InetAddressAndPort address;
+    final UUID hostId;
+    final InetAddressAndPort address;
     private final int messagingVersion;
     private final BooleanSupplier abortRequested;
 
@@ -71,11 +71,14 @@ final class HintsDispatcher implements AutoCloseable
     static HintsDispatcher create(File file, RateLimiter rateLimiter, 
InetAddressAndPort address, UUID hostId, BooleanSupplier abortRequested)
     {
         int messagingVersion = MessagingService.instance().getVersion(address);
-        return new HintsDispatcher(HintsReader.open(file, rateLimiter), 
hostId, address, messagingVersion, abortRequested);
+        HintsDispatcher dispatcher = new 
HintsDispatcher(HintsReader.open(file, rateLimiter), hostId, address, 
messagingVersion, abortRequested);
+        HintDiagnostics.dispatcherCreated(dispatcher);
+        return dispatcher;
     }
 
     public void close()
     {
+        HintDiagnostics.dispatcherClosed(this);
         reader.close();
     }
 
@@ -111,6 +114,7 @@ final class HintsDispatcher implements AutoCloseable
     // retry in case of a timeout; stop in case of a failure, host going down, 
or delivery paused
     private Action dispatch(HintsReader.Page page)
     {
+        HintDiagnostics.dispatchPage(this);
         return sendHintsAndAwait(page);
     }
 
@@ -132,35 +136,36 @@ final class HintsDispatcher implements AutoCloseable
         if (action == Action.ABORT)
             return action;
 
-        boolean hadFailures = false;
+        long success = 0, failures = 0, timeouts = 0;
         for (Callback cb : callbacks)
         {
             Callback.Outcome outcome = cb.await();
-            updateMetrics(outcome);
-
-            if (outcome != Callback.Outcome.SUCCESS)
-                hadFailures = true;
+            if (outcome == Callback.Outcome.SUCCESS) success++;
+            else if (outcome == Callback.Outcome.FAILURE) failures++;
+            else if (outcome == Callback.Outcome.TIMEOUT) timeouts++;
         }
 
-        return hadFailures ? Action.ABORT : Action.CONTINUE;
-    }
+        updateMetrics(success, failures, timeouts);
 
-    private void updateMetrics(Callback.Outcome outcome)
-    {
-        switch (outcome)
+        if (failures > 0 || timeouts > 0)
+        {
+            HintDiagnostics.pageFailureResult(this, success, failures, 
timeouts);
+            return Action.ABORT;
+        }
+        else
         {
-            case SUCCESS:
-                HintsServiceMetrics.hintsSucceeded.mark();
-                break;
-            case FAILURE:
-                HintsServiceMetrics.hintsFailed.mark();
-                break;
-            case TIMEOUT:
-                HintsServiceMetrics.hintsTimedOut.mark();
-                break;
+            HintDiagnostics.pageSuccessResult(this, success, failures, 
timeouts);
+            return Action.CONTINUE;
         }
     }
 
+    private void updateMetrics(long success, long failures, long timeouts)
+    {
+        HintsServiceMetrics.hintsSucceeded.mark(success);
+        HintsServiceMetrics.hintsFailed.mark(failures);
+        HintsServiceMetrics.hintsTimedOut.mark(timeouts);
+    }
+
     /*
      * Sending hints in compatibility mode.
      */
@@ -170,7 +175,10 @@ final class HintsDispatcher implements AutoCloseable
         while (hints.hasNext())
         {
             if (abortRequested.getAsBoolean())
+            {
+                HintDiagnostics.abortRequested(this);
                 return Action.ABORT;
+            }
             callbacks.add(sendFunction.apply(hints.next()));
         }
         return Action.CONTINUE;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2846b22a/src/java/org/apache/cassandra/hints/HintsService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsService.java 
b/src/java/org/apache/cassandra/hints/HintsService.java
index a46eb52..0cd1278 100644
--- a/src/java/org/apache/cassandra/hints/HintsService.java
+++ b/src/java/org/apache/cassandra/hints/HintsService.java
@@ -73,8 +73,8 @@ public final class HintsService implements HintsServiceMBean
     private final HintsCatalog catalog;
     private final HintsWriteExecutor writeExecutor;
     private final HintsBufferPool bufferPool;
-    private final HintsDispatchExecutor dispatchExecutor;
-    private final AtomicBoolean isDispatchPaused;
+    final HintsDispatchExecutor dispatchExecutor;
+    final AtomicBoolean isDispatchPaused;
 
     private volatile boolean isShutDown = false;
 
@@ -209,6 +209,8 @@ public final class HintsService implements HintsServiceMBean
 
         isDispatchPaused.set(false);
 
+        HintsServiceDiagnostics.dispatchingStarted(this);
+
         HintsDispatchTrigger trigger = new HintsDispatchTrigger(catalog, 
writeExecutor, dispatchExecutor, isDispatchPaused);
         // triggering hint dispatch is now very cheap, so we can do it more 
often - every 10 seconds vs. every 10 minutes,
         // previously; this reduces mean time to delivery, and positively 
affects batchlog delivery latencies, too
@@ -219,12 +221,16 @@ public final class HintsService implements 
HintsServiceMBean
     {
         logger.info("Paused hints dispatch");
         isDispatchPaused.set(true);
+
+        HintsServiceDiagnostics.dispatchingPaused(this);
     }
 
     public void resumeDispatch()
     {
         logger.info("Resumed hints dispatch");
         isDispatchPaused.set(false);
+
+        HintsServiceDiagnostics.dispatchingResumed(this);
     }
 
     /**
@@ -250,6 +256,8 @@ public final class HintsService implements HintsServiceMBean
 
         dispatchExecutor.shutdownBlocking();
         writeExecutor.shutdownBlocking();
+
+        HintsServiceDiagnostics.dispatchingShutdown(this);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2846b22a/src/java/org/apache/cassandra/hints/HintsServiceDiagnostics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsServiceDiagnostics.java 
b/src/java/org/apache/cassandra/hints/HintsServiceDiagnostics.java
new file mode 100644
index 0000000..f4cf149
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/HintsServiceDiagnostics.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.hints;
+
+
+import org.apache.cassandra.diag.DiagnosticEventService;
+import org.apache.cassandra.hints.HintsServiceEvent.HintsServiceEventType;
+
+/**
+ * Utility methods for DiagnosticEvents around the HintService.
+ */
+final class HintsServiceDiagnostics
+{
+    private static final DiagnosticEventService service = 
DiagnosticEventService.instance();
+
+    private HintsServiceDiagnostics()
+    {
+    }
+    
+    static void dispatchingStarted(HintsService hintsService)
+    {
+        if (isEnabled(HintsServiceEventType.DISPATCHING_STARTED))
+            service.publish(new 
HintsServiceEvent(HintsServiceEventType.DISPATCHING_STARTED, hintsService));
+    }
+
+    static void dispatchingShutdown(HintsService hintsService)
+    {
+        if (isEnabled(HintsServiceEventType.DISPATCHING_SHUTDOWN))
+            service.publish(new 
HintsServiceEvent(HintsServiceEventType.DISPATCHING_SHUTDOWN, hintsService));
+    }
+
+    static void dispatchingPaused(HintsService hintsService)
+    {
+        if (isEnabled(HintsServiceEventType.DISPATCHING_PAUSED))
+            service.publish(new 
HintsServiceEvent(HintsServiceEventType.DISPATCHING_PAUSED, hintsService));
+    }
+
+    static void dispatchingResumed(HintsService hintsService)
+    {
+        if (isEnabled(HintsServiceEventType.DISPATCHING_RESUMED))
+            service.publish(new 
HintsServiceEvent(HintsServiceEventType.DISPATCHING_RESUMED, hintsService));
+    }
+
+    private static boolean isEnabled(HintsServiceEventType type)
+    {
+        return service.isEnabled(HintsServiceEvent.class, type);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2846b22a/src/java/org/apache/cassandra/hints/HintsServiceEvent.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsServiceEvent.java 
b/src/java/org/apache/cassandra/hints/HintsServiceEvent.java
new file mode 100644
index 0000000..72497a0
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/HintsServiceEvent.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.hints;
+
+import java.io.Serializable;
+import java.util.HashMap;
+
+import org.apache.cassandra.diag.DiagnosticEvent;
+
+/**
+ * DiagnosticEvent implementation for HintService.
+ */
+final class HintsServiceEvent extends DiagnosticEvent
+{
+    enum HintsServiceEventType
+    {
+        DISPATCHING_STARTED,
+        DISPATCHING_PAUSED,
+        DISPATCHING_RESUMED,
+        DISPATCHING_SHUTDOWN
+    }
+
+    private final HintsServiceEventType type;
+    private final HintsService service;
+    private final boolean isDispatchPaused;
+    private final boolean isShutdown;
+    private final boolean dispatchExecutorIsPaused;
+    private final boolean dispatchExecutorHasScheduledDispatches;
+
+    HintsServiceEvent(HintsServiceEventType type, HintsService service)
+    {
+        this.type = type;
+        this.service = service;
+        this.isDispatchPaused = service.isDispatchPaused.get();
+        this.isShutdown = service.isShutDown();
+        this.dispatchExecutorIsPaused = service.dispatchExecutor.isPaused();
+        this.dispatchExecutorHasScheduledDispatches = 
service.dispatchExecutor.hasScheduledDispatches();
+    }
+
+    public Enum<HintsServiceEventType> getType()
+    {
+        return type;
+    }
+
+    public HashMap<String, Serializable> toMap()
+    {
+        // be extra defensive against nulls and bugs
+        HashMap<String, Serializable> ret = new HashMap<>();
+        ret.put("isDispatchPaused", isDispatchPaused);
+        ret.put("isShutdown", isShutdown);
+        ret.put("dispatchExecutorIsPaused", dispatchExecutorIsPaused);
+        ret.put("dispatchExecutorHasScheduledDispatches", 
dispatchExecutorHasScheduledDispatches);
+        return ret;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2846b22a/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java 
b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index e2c4628..46c191f 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -800,6 +800,8 @@ public class TokenMetadata
         // avoid race between both branches - do not use a lock here as this 
will block any other unrelated operations!
         synchronized (pendingRanges)
         {
+            TokenMetadataDiagnostics.pendingRangeCalculationStarted(this, 
keyspaceName);
+
             if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && 
movingEndpoints.isEmpty())
             {
                 if (logger.isTraceEnabled())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2846b22a/src/java/org/apache/cassandra/locator/TokenMetadataDiagnostics.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/locator/TokenMetadataDiagnostics.java 
b/src/java/org/apache/cassandra/locator/TokenMetadataDiagnostics.java
new file mode 100644
index 0000000..0221f1e
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/TokenMetadataDiagnostics.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import org.apache.cassandra.diag.DiagnosticEventService;
+import org.apache.cassandra.locator.TokenMetadataEvent.TokenMetadataEventType;
+
+/**
+ * Utility methods for events related to {@link TokenMetadata} changes.
+ */
+final class TokenMetadataDiagnostics
+{
+    private static final DiagnosticEventService service = 
DiagnosticEventService.instance();
+
+    private TokenMetadataDiagnostics()
+    {
+    }
+
+    static void pendingRangeCalculationStarted(TokenMetadata tokenMetadata, 
String keyspace)
+    {
+        if 
(isEnabled(TokenMetadataEventType.PENDING_RANGE_CALCULATION_STARTED))
+            service.publish(new 
TokenMetadataEvent(TokenMetadataEventType.PENDING_RANGE_CALCULATION_STARTED, 
tokenMetadata, keyspace));
+    }
+
+    private static boolean isEnabled(TokenMetadataEventType type)
+    {
+        return service.isEnabled(TokenMetadataEvent.class, type);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2846b22a/src/java/org/apache/cassandra/locator/TokenMetadataEvent.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadataEvent.java 
b/src/java/org/apache/cassandra/locator/TokenMetadataEvent.java
new file mode 100644
index 0000000..c3ed074
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/TokenMetadataEvent.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import java.io.Serializable;
+import java.util.HashMap;
+
+import org.apache.cassandra.diag.DiagnosticEvent;
+
+/**
+ * Events related to {@link TokenMetadata} changes.
+ */
+public final class TokenMetadataEvent extends DiagnosticEvent
+{
+
+    public enum TokenMetadataEventType
+    {
+        PENDING_RANGE_CALCULATION_STARTED,
+        PENDING_RANGE_CALCULATION_COMPLETED,
+    }
+
+    private final TokenMetadataEventType type;
+    private final TokenMetadata tokenMetadata;
+    private final String keyspace;
+
+    TokenMetadataEvent(TokenMetadataEventType type, TokenMetadata 
tokenMetadata, String keyspace)
+    {
+        this.type = type;
+        this.tokenMetadata = tokenMetadata;
+        this.keyspace = keyspace;
+    }
+
+    public TokenMetadataEventType getType()
+    {
+        return type;
+    }
+
+    public HashMap<String, Serializable> toMap()
+    {
+        // be extra defensive against nulls and bugs
+        HashMap<String, Serializable> ret = new HashMap<>();
+        ret.put("keyspace", keyspace);
+        ret.put("tokenMetadata", tokenMetadata.toString());
+        return ret;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2846b22a/src/java/org/apache/cassandra/schema/Diff.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Diff.java 
b/src/java/org/apache/cassandra/schema/Diff.java
index 36c0687..7112c85 100644
--- a/src/java/org/apache/cassandra/schema/Diff.java
+++ b/src/java/org/apache/cassandra/schema/Diff.java
@@ -55,5 +55,10 @@ public class Diff<T extends Iterable, S>
             this.after = after;
             this.kind = kind;
         }
+
+        public String toString()
+        {
+            return String.format("%s -> %s (%s)", before, after, kind);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2846b22a/src/java/org/apache/cassandra/schema/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/MigrationManager.java 
b/src/java/org/apache/cassandra/schema/MigrationManager.java
index ac95054..a439e2e 100644
--- a/src/java/org/apache/cassandra/schema/MigrationManager.java
+++ b/src/java/org/apache/cassandra/schema/MigrationManager.java
@@ -72,8 +72,9 @@ public class MigrationManager
     {
         if (Schema.instance.getVersion() == null)
         {
-            logger.debug("Not pulling schema from {}, because local schama 
version is not known yet",
+            logger.debug("Not pulling schema from {}, because local schema 
version is not known yet",
                          endpoint);
+            SchemaMigrationDiagnostics.unknownLocalSchemaVersion(endpoint, 
theirVersion);
             return;
         }
         if (Schema.instance.isSameVersion(theirVersion))
@@ -81,12 +82,14 @@ public class MigrationManager
             logger.debug("Not pulling schema from {}, because schema versions 
match ({})",
                          endpoint,
                          Schema.schemaVersionToString(theirVersion));
+            SchemaMigrationDiagnostics.versionMatch(endpoint, theirVersion);
             return;
         }
         if (!shouldPullSchemaFrom(endpoint))
         {
             logger.debug("Not pulling schema from {}, because versions match 
({}/{}), or shouldPullSchemaFrom returned false",
                          endpoint, Schema.instance.getVersion(), theirVersion);
+            SchemaMigrationDiagnostics.skipPull(endpoint, theirVersion);
             return;
         }
 
@@ -319,10 +322,22 @@ public class MigrationManager
     {
         Future<?> f = StageManager.getStage(Stage.MIGRATION).submit(() -> 
Schema.instance.mergeAndAnnounceVersion(schema));
 
+        Set<InetAddressAndPort> schemaDestinationEndpoints = new HashSet<>();
+        Set<InetAddressAndPort> schemaEndpointsIgnored = new HashSet<>();
         for (InetAddressAndPort endpoint : Gossiper.instance.getLiveMembers())
+        {
             if (shouldPushSchemaTo(endpoint))
+            {
                 pushSchemaMutation(endpoint, schema);
+                schemaDestinationEndpoints.add(endpoint);
+            }
+            else
+            {
+                schemaEndpointsIgnored.add(endpoint);
+            }
+        }
 
+        
SchemaAnnouncementDiagnostics.schemaMutationsAnnounced(schemaDestinationEndpoints,
 schemaEndpointsIgnored);
         FBUtilities.waitOnFuture(f);
     }
 
@@ -340,9 +355,23 @@ public class MigrationManager
         if (locally || result.diff.isEmpty())
             return result.diff;
 
+        Set<InetAddressAndPort> schemaDestinationEndpoints = new HashSet<>();
+        Set<InetAddressAndPort> schemaEndpointsIgnored = new HashSet<>();
         for (InetAddressAndPort endpoint : Gossiper.instance.getLiveMembers())
+        {
             if (shouldPushSchemaTo(endpoint))
+            {
                 pushSchemaMutation(endpoint, result.mutations);
+                schemaDestinationEndpoints.add(endpoint);
+            }
+            else
+            {
+                schemaEndpointsIgnored.add(endpoint);
+            }
+        }
+
+        
SchemaAnnouncementDiagnostics.schemaTransformationAnnounced(schemaDestinationEndpoints,
 schemaEndpointsIgnored,
+                                                                    
transformation);
 
         return result.diff;
     }
@@ -357,6 +386,8 @@ public class MigrationManager
 
         logger.debug("Truncating schema tables...");
 
+        SchemaMigrationDiagnostics.resetLocalSchema();
+
         SchemaKeyspace.truncate();
 
         logger.debug("Clearing local schema keyspace definitions...");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2846b22a/src/java/org/apache/cassandra/schema/MigrationTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/MigrationTask.java 
b/src/java/org/apache/cassandra/schema/MigrationTask.java
index 6ff206a..bf96fb2 100644
--- a/src/java/org/apache/cassandra/schema/MigrationTask.java
+++ b/src/java/org/apache/cassandra/schema/MigrationTask.java
@@ -51,6 +51,7 @@ final class MigrationTask extends WrappedRunnable
     MigrationTask(InetAddressAndPort endpoint)
     {
         this.endpoint = endpoint;
+        SchemaMigrationDiagnostics.taskCreated(endpoint);
     }
 
     static ConcurrentLinkedQueue<CountDownLatch> getInflightTasks()
@@ -63,6 +64,7 @@ final class MigrationTask extends WrappedRunnable
         if (!FailureDetector.instance.isAlive(endpoint))
         {
             logger.warn("Can't send schema pull request: node {} is down.", 
endpoint);
+            SchemaMigrationDiagnostics.taskSendAborted(endpoint);
             return;
         }
 
@@ -72,6 +74,7 @@ final class MigrationTask extends WrappedRunnable
         if (!MigrationManager.shouldPullSchemaFrom(endpoint))
         {
             logger.info("Skipped sending a migration request: node {} has a 
higher major version now.", endpoint);
+            SchemaMigrationDiagnostics.taskSendAborted(endpoint);
             return;
         }
 
@@ -109,5 +112,7 @@ final class MigrationTask extends WrappedRunnable
             inflightTasks.offer(completionLatch);
 
         MessagingService.instance().sendRR(message, endpoint, cb);
+
+        SchemaMigrationDiagnostics.taskRequestSend(endpoint);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2846b22a/src/java/org/apache/cassandra/schema/Schema.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Schema.java 
b/src/java/org/apache/cassandra/schema/Schema.java
index e1353cd..970d9ac 100644
--- a/src/java/org/apache/cassandra/schema/Schema.java
+++ b/src/java/org/apache/cassandra/schema/Schema.java
@@ -98,9 +98,11 @@ public final class Schema
      */
     public void loadFromDisk(boolean updateVersion)
     {
+        SchemaDiagnostics.schemataLoading(this);
         SchemaKeyspace.fetchNonSystemKeyspaces().forEach(this::load);
         if (updateVersion)
             updateVersion();
+        SchemaDiagnostics.schemataLoaded(this);
     }
 
     /**
@@ -128,6 +130,8 @@ public final class Schema
         ksm.tables
            .indexTables()
            .forEach((name, metadata) -> 
indexMetadataRefs.put(Pair.create(ksm.name, name), new 
TableMetadataRef(metadata)));
+
+        SchemaDiagnostics.metadataInitialized(this, ksm);
     }
 
     private void reload(KeyspaceMetadata previous, KeyspaceMetadata updated)
@@ -166,6 +170,8 @@ public final class Schema
                    .stream()
                    .map(MapDifference.ValueDifference::rightValue)
                    .forEach(indexTable -> 
indexMetadataRefs.get(Pair.create(indexTable.keyspace, 
indexTable.indexName().get())).set(indexTable));
+
+        SchemaDiagnostics.metadataReloaded(this, previous, updated, 
tablesDiff, viewsDiff, indexesDiff);
     }
 
     public void registerListener(SchemaChangeListener listener)
@@ -249,6 +255,8 @@ public final class Schema
            .indexTables()
            .keySet()
            .forEach(name -> indexMetadataRefs.remove(Pair.create(ksm.name, 
name)));
+
+        SchemaDiagnostics.metadataRemoved(this, ksm);
     }
 
     public int getNumberOfTables()
@@ -356,6 +364,11 @@ public final class Schema
         return indexMetadataRefs.get(Pair.create(keyspace, index));
     }
 
+    Map<Pair<String, String>, TableMetadataRef> getIndexTableMetadataRefs()
+    {
+        return indexMetadataRefs;
+    }
+
     /**
      * Get Table metadata by its identifier
      *
@@ -373,6 +386,11 @@ public final class Schema
         return getTableMetadataRef(descriptor.ksname, descriptor.cfname);
     }
 
+    Map<TableId, TableMetadataRef> getTableMetadataRefs()
+    {
+        return metadataRefs;
+    }
+
     /**
      * Given a keyspace name and table name, get the table
      * meta data. If the keyspace name or table name is not valid
@@ -511,6 +529,7 @@ public final class Schema
     {
         version = SchemaKeyspace.calculateSchemaDigest();
         SystemKeyspace.updateSchemaVersion(version);
+        SchemaDiagnostics.versionUpdated(this);
     }
 
     /*
@@ -529,6 +548,7 @@ public final class Schema
     private void passiveAnnounceVersion()
     {
         Gossiper.instance.addLocalApplicationState(ApplicationState.SCHEMA, 
StorageService.instance.valueFactory.schema(version));
+        SchemaDiagnostics.versionAnnounced(this);
     }
 
     /**
@@ -538,6 +558,7 @@ public final class Schema
     {
         getNonSystemKeyspaces().forEach(k -> unload(getKeyspaceMetadata(k)));
         updateVersionAndAnnounce();
+        SchemaDiagnostics.schemataCleared(this);
     }
 
     /*
@@ -646,6 +667,8 @@ public final class Schema
 
     private void alterKeyspace(KeyspaceDiff delta)
     {
+        SchemaDiagnostics.keyspaceAltering(this, delta);
+
         // drop tables and views
         delta.views.dropped.forEach(this::dropView);
         delta.tables.dropped.forEach(this::dropTable);
@@ -685,10 +708,12 @@ public final class Schema
         delta.views.altered.forEach(diff -> notifyAlterView(diff.before, 
diff.after));
         delta.udfs.altered.forEach(diff -> notifyAlterFunction(diff.before, 
diff.after));
         delta.udas.altered.forEach(diff -> notifyAlterAggregate(diff.before, 
diff.after));
+        SchemaDiagnostics.keyspaceAltered(this, delta);
     }
 
     private void createKeyspace(KeyspaceMetadata keyspace)
     {
+        SchemaDiagnostics.keyspaceCreating(this, keyspace);
         load(keyspace);
         Keyspace.open(keyspace.name);
 
@@ -698,10 +723,12 @@ public final class Schema
         keyspace.views.forEach(this::notifyCreateView);
         keyspace.functions.udfs().forEach(this::notifyCreateFunction);
         keyspace.functions.udas().forEach(this::notifyCreateAggregate);
+        SchemaDiagnostics.keyspaceCreated(this, keyspace);
     }
 
     private void dropKeyspace(KeyspaceMetadata keyspace)
     {
+        SchemaDiagnostics.keyspaceDroping(this, keyspace);
         keyspace.views.forEach(this::dropView);
         keyspace.tables.forEach(this::dropTable);
 
@@ -716,6 +743,7 @@ public final class Schema
         keyspace.tables.forEach(this::notifyDropTable);
         keyspace.types.forEach(this::notifyDropType);
         notifyDropKeyspace(keyspace);
+        SchemaDiagnostics.keyspaceDroped(this, keyspace);
     }
 
     private void dropView(ViewMetadata metadata)
@@ -726,6 +754,7 @@ public final class Schema
 
     private void dropTable(TableMetadata metadata)
     {
+        SchemaDiagnostics.tableDropping(this, metadata);
         ColumnFamilyStore cfs = 
Keyspace.open(metadata.keyspace).getColumnFamilyStore(metadata.name);
         assert cfs != null;
         // make sure all the indexes are dropped, or else.
@@ -735,11 +764,14 @@ public final class Schema
             
cfs.snapshot(Keyspace.getTimestampedSnapshotNameWithPrefix(cfs.name, 
ColumnFamilyStore.SNAPSHOT_DROP_PREFIX));
         
CommitLog.instance.forceRecycleAllSegments(Collections.singleton(metadata.id));
         Keyspace.open(metadata.keyspace).dropCf(metadata.id);
+        SchemaDiagnostics.tableDropped(this, metadata);
     }
 
     private void createTable(TableMetadata table)
     {
+        SchemaDiagnostics.tableCreating(this, table);
         Keyspace.open(table.keyspace).initCf(metadataRefs.get(table.id), true);
+        SchemaDiagnostics.tableCreated(this, table);
     }
 
     private void createView(ViewMetadata view)
@@ -749,7 +781,9 @@ public final class Schema
 
     private void alterTable(TableMetadata updated)
     {
+        SchemaDiagnostics.tableAltering(this, updated);
         
Keyspace.open(updated.keyspace).getColumnFamilyStore(updated.name).reload();
+        SchemaDiagnostics.tableAltered(this, updated);
     }
 
     private void alterView(ViewMetadata updated)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2846b22a/src/java/org/apache/cassandra/schema/SchemaAnnouncementDiagnostics.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/schema/SchemaAnnouncementDiagnostics.java 
b/src/java/org/apache/cassandra/schema/SchemaAnnouncementDiagnostics.java
new file mode 100644
index 0000000..be60b1b
--- /dev/null
+++ b/src/java/org/apache/cassandra/schema/SchemaAnnouncementDiagnostics.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.schema;
+
+import java.util.Set;
+
+import org.apache.cassandra.diag.DiagnosticEventService;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import 
org.apache.cassandra.schema.SchemaAnnouncementEvent.SchemaAnnouncementEventType;
+
+final class SchemaAnnouncementDiagnostics
+{
+    private static final DiagnosticEventService service = 
DiagnosticEventService.instance();
+
+    private SchemaAnnouncementDiagnostics()
+    {
+    }
+
+    static void schemaMutationsAnnounced(Set<InetAddressAndPort> 
schemaDestinationEndpoints, Set<InetAddressAndPort> schemaEndpointsIgnored)
+    {
+        if (isEnabled(SchemaAnnouncementEventType.SCHEMA_MUTATIONS_ANNOUNCED))
+            service.publish(new 
SchemaAnnouncementEvent(SchemaAnnouncementEventType.SCHEMA_MUTATIONS_ANNOUNCED,
+                                                        
schemaDestinationEndpoints, schemaEndpointsIgnored, null, null));
+    }
+
+    public static void schemataMutationsReceived(InetAddressAndPort from)
+    {
+        if (isEnabled(SchemaAnnouncementEventType.SCHEMA_MUTATIONS_RECEIVED))
+            service.publish(new 
SchemaAnnouncementEvent(SchemaAnnouncementEventType.SCHEMA_MUTATIONS_RECEIVED,
+                                                        null, null, null, 
from));
+    }
+
+    static void schemaTransformationAnnounced(Set<InetAddressAndPort> 
schemaDestinationEndpoints, Set<InetAddressAndPort> schemaEndpointsIgnored, 
SchemaTransformation transformation)
+    {
+        if 
(isEnabled(SchemaAnnouncementEventType.SCHEMA_TRANSFORMATION_ANNOUNCED))
+            service.publish(new 
SchemaAnnouncementEvent(SchemaAnnouncementEventType.SCHEMA_TRANSFORMATION_ANNOUNCED,
+                                                        
schemaDestinationEndpoints, schemaEndpointsIgnored, transformation, null));
+    }
+
+    private static boolean isEnabled(SchemaAnnouncementEventType type)
+    {
+        return service.isEnabled(SchemaAnnouncementEvent.class, type);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2846b22a/src/java/org/apache/cassandra/schema/SchemaAnnouncementEvent.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaAnnouncementEvent.java 
b/src/java/org/apache/cassandra/schema/SchemaAnnouncementEvent.java
new file mode 100644
index 0000000..4e0bd68
--- /dev/null
+++ b/src/java/org/apache/cassandra/schema/SchemaAnnouncementEvent.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.schema;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import javax.annotation.Nullable;
+
+import org.apache.cassandra.audit.AuditLogContext;
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.diag.DiagnosticEvent;
+import org.apache.cassandra.locator.InetAddressAndPort;
+
+/**
+ * Events emitted by {@link MigrationManager} around propagating schema 
changes to remote nodes.
+ */
+final class SchemaAnnouncementEvent extends DiagnosticEvent
+{
+    private final SchemaAnnouncementEventType type;
+    @Nullable
+    private final Set<InetAddressAndPort> schemaDestinationEndpoints;
+    @Nullable
+    private final Set<InetAddressAndPort> schemaEndpointsIgnored;
+    @Nullable
+    private final CQLStatement statement;
+    @Nullable
+    private final InetAddressAndPort sender;
+
+    enum SchemaAnnouncementEventType
+    {
+        SCHEMA_MUTATIONS_ANNOUNCED,
+        SCHEMA_TRANSFORMATION_ANNOUNCED,
+        SCHEMA_MUTATIONS_RECEIVED
+    }
+
+    SchemaAnnouncementEvent(SchemaAnnouncementEventType type,
+                            @Nullable Set<InetAddressAndPort> 
schemaDestinationEndpoints,
+                            @Nullable Set<InetAddressAndPort> 
schemaEndpointsIgnored,
+                            @Nullable SchemaTransformation transformation,
+                            @Nullable InetAddressAndPort sender)
+    {
+        this.type = type;
+        this.schemaDestinationEndpoints = schemaDestinationEndpoints;
+        this.schemaEndpointsIgnored = schemaEndpointsIgnored;
+        if (transformation instanceof CQLStatement) this.statement = 
(CQLStatement) transformation;
+        else this.statement = null;
+        this.sender = sender;
+    }
+
+    public Enum<?> getType()
+    {
+        return type;
+    }
+
+    public Map<String, Serializable> toMap()
+    {
+        HashMap<String, Serializable> ret = new HashMap<>();
+        if (schemaDestinationEndpoints != null)
+        {
+            Set<String> eps = 
schemaDestinationEndpoints.stream().map(InetAddressAndPort::toString).collect(Collectors.toSet());
+            ret.put("endpointDestinations", new HashSet<>(eps));
+        }
+        if (schemaEndpointsIgnored != null)
+        {
+            Set<String> eps = 
schemaEndpointsIgnored.stream().map(InetAddressAndPort::toString).collect(Collectors.toSet());
+            ret.put("endpointIgnored", new HashSet<>(eps));
+        }
+        if (statement != null)
+        {
+            AuditLogContext logContext = statement.getAuditLogContext();
+            if (logContext != null)
+            {
+                HashMap<String, String> log = new HashMap<>();
+                if (logContext.auditLogEntryType != null) log.put("type", 
logContext.auditLogEntryType.name());
+                if (logContext.keyspace != null) log.put("keyspace", 
logContext.keyspace);
+                if (logContext.scope != null) log.put("table", 
logContext.scope);
+                ret.put("statement", log);
+            }
+        }
+        if (sender != null) ret.put("sender", sender.toString());
+        return ret;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2846b22a/src/java/org/apache/cassandra/schema/SchemaDiagnostics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaDiagnostics.java 
b/src/java/org/apache/cassandra/schema/SchemaDiagnostics.java
new file mode 100644
index 0000000..12b8409
--- /dev/null
+++ b/src/java/org/apache/cassandra/schema/SchemaDiagnostics.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.schema;
+
+import com.google.common.collect.MapDifference;
+
+import org.apache.cassandra.diag.DiagnosticEventService;
+import org.apache.cassandra.schema.SchemaEvent.SchemaEventType;
+
+final class SchemaDiagnostics
+{
+    private static final DiagnosticEventService service = 
DiagnosticEventService.instance();
+
+    private SchemaDiagnostics()
+    {
+    }
+
+    static void metadataInitialized(Schema schema, KeyspaceMetadata ksmUpdate)
+    {
+        if (isEnabled(SchemaEventType.KS_METADATA_LOADED))
+            service.publish(new 
SchemaEvent(SchemaEventType.KS_METADATA_LOADED, schema, ksmUpdate, null, null, 
null, null, null, null));
+    }
+
+    static void metadataReloaded(Schema schema, KeyspaceMetadata previous, 
KeyspaceMetadata ksmUpdate, Tables.TablesDiff tablesDiff, Views.ViewsDiff 
viewsDiff, MapDifference<String,TableMetadata> indexesDiff)
+    {
+        if (isEnabled(SchemaEventType.KS_METADATA_RELOADED))
+            service.publish(new 
SchemaEvent(SchemaEventType.KS_METADATA_RELOADED, schema, ksmUpdate, previous,
+                                            null, null, tablesDiff, viewsDiff, 
indexesDiff));
+    }
+
+    static void metadataRemoved(Schema schema, KeyspaceMetadata ksmUpdate)
+    {
+        if (isEnabled(SchemaEventType.KS_METADATA_REMOVED))
+            service.publish(new 
SchemaEvent(SchemaEventType.KS_METADATA_REMOVED, schema, ksmUpdate,
+                                            null, null, null, null, null, 
null));
+    }
+
+    static void versionUpdated(Schema schema)
+    {
+        if (isEnabled(SchemaEventType.VERSION_UPDATED))
+            service.publish(new SchemaEvent(SchemaEventType.VERSION_UPDATED, 
schema,
+                                            null, null, null, null, null, 
null, null));
+    }
+
+    static void keyspaceCreating(Schema schema, KeyspaceMetadata keyspace)
+    {
+        if (isEnabled(SchemaEventType.KS_CREATING))
+            service.publish(new SchemaEvent(SchemaEventType.KS_CREATING, 
schema, keyspace,
+                                            null, null, null, null, null, 
null));
+    }
+
+    static void keyspaceCreated(Schema schema, KeyspaceMetadata keyspace)
+    {
+        if (isEnabled(SchemaEventType.KS_CREATED))
+            service.publish(new SchemaEvent(SchemaEventType.KS_CREATED, 
schema, keyspace,
+                                            null, null, null, null, null, 
null));
+    }
+
+    static void keyspaceAltering(Schema schema, KeyspaceMetadata.KeyspaceDiff 
delta)
+    {
+        if (isEnabled(SchemaEventType.KS_ALTERING))
+            service.publish(new SchemaEvent(SchemaEventType.KS_ALTERING, 
schema, delta.after,
+                                            delta.before, delta, null, null, 
null, null));
+    }
+
+    static void keyspaceAltered(Schema schema, KeyspaceMetadata.KeyspaceDiff 
delta)
+    {
+        if (isEnabled(SchemaEventType.KS_ALTERED))
+            service.publish(new SchemaEvent(SchemaEventType.KS_ALTERED, 
schema, delta.after,
+                                            delta.before, delta, null, null, 
null, null));
+    }
+
+    static void keyspaceDroping(Schema schema, KeyspaceMetadata keyspace)
+    {
+        if (isEnabled(SchemaEventType.KS_DROPPING))
+            service.publish(new SchemaEvent(SchemaEventType.KS_DROPPING, 
schema, keyspace,
+                                            null, null, null, null, null, 
null));
+    }
+
+    static void keyspaceDroped(Schema schema, KeyspaceMetadata keyspace)
+    {
+        if (isEnabled(SchemaEventType.KS_DROPPED))
+            service.publish(new SchemaEvent(SchemaEventType.KS_DROPPED, 
schema, keyspace,
+                                            null, null, null, null, null, 
null));
+    }
+
+    static void schemataLoading(Schema schema)
+    {
+        if (isEnabled(SchemaEventType.SCHEMATA_LOADING))
+            service.publish(new SchemaEvent(SchemaEventType.SCHEMATA_LOADING, 
schema, null,
+                                            null, null, null, null, null, 
null));
+    }
+
+    static void schemataLoaded(Schema schema)
+    {
+        if (isEnabled(SchemaEventType.SCHEMATA_LOADED))
+            service.publish(new SchemaEvent(SchemaEventType.SCHEMATA_LOADED, 
schema, null,
+                                            null, null, null, null, null, 
null));
+    }
+
+    static void versionAnnounced(Schema schema)
+    {
+        if (isEnabled(SchemaEventType.VERSION_ANOUNCED))
+            service.publish(new SchemaEvent(SchemaEventType.VERSION_ANOUNCED, 
schema, null,
+                                            null, null, null, null, null, 
null));
+    }
+
+    static void schemataCleared(Schema schema)
+    {
+        if (isEnabled(SchemaEventType.SCHEMATA_CLEARED))
+            service.publish(new SchemaEvent(SchemaEventType.SCHEMATA_CLEARED, 
schema, null,
+                                            null, null, null, null, null, 
null));
+    }
+
+    static void tableCreating(Schema schema, TableMetadata table)
+    {
+        if (isEnabled(SchemaEventType.TABLE_CREATING))
+            service.publish(new SchemaEvent(SchemaEventType.TABLE_CREATING, 
schema, null,
+                                            null, null, table, null, null, 
null));
+    }
+
+    static void tableCreated(Schema schema, TableMetadata table)
+    {
+        if (isEnabled(SchemaEventType.TABLE_CREATED))
+            service.publish(new SchemaEvent(SchemaEventType.TABLE_CREATED, 
schema, null,
+                                            null, null, table, null, null, 
null));
+    }
+
+    static void tableAltering(Schema schema, TableMetadata table)
+    {
+        if (isEnabled(SchemaEventType.TABLE_ALTERING))
+            service.publish(new SchemaEvent(SchemaEventType.TABLE_ALTERING, 
schema, null,
+                                            null, null, table, null, null, 
null));
+    }
+
+    static void tableAltered(Schema schema, TableMetadata table)
+    {
+        if (isEnabled(SchemaEventType.TABLE_ALTERED))
+            service.publish(new SchemaEvent(SchemaEventType.TABLE_ALTERED, 
schema, null,
+                                            null, null, table, null, null, 
null));
+    }
+
+    static void tableDropping(Schema schema, TableMetadata table)
+    {
+        if (isEnabled(SchemaEventType.TABLE_DROPPING))
+            service.publish(new SchemaEvent(SchemaEventType.TABLE_DROPPING, 
schema, null,
+                                            null, null, table, null, null, 
null));
+    }
+
+    static void tableDropped(Schema schema, TableMetadata table)
+    {
+        if (isEnabled(SchemaEventType.TABLE_DROPPED))
+            service.publish(new SchemaEvent(SchemaEventType.TABLE_DROPPED, 
schema, null,
+                                            null, null, table, null, null, 
null));
+    }
+
+    private static boolean isEnabled(SchemaEventType type)
+    {
+        return service.isEnabled(SchemaEvent.class, type);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2846b22a/src/java/org/apache/cassandra/schema/SchemaEvent.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaEvent.java 
b/src/java/org/apache/cassandra/schema/SchemaEvent.java
new file mode 100644
index 0000000..e26cee5
--- /dev/null
+++ b/src/java/org/apache/cassandra/schema/SchemaEvent.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.schema;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.MapDifference;
+
+import org.apache.cassandra.diag.DiagnosticEvent;
+import org.apache.cassandra.utils.Pair;
+
+final class SchemaEvent extends DiagnosticEvent
+{
+    private final SchemaEventType type;
+
+    private final HashSet<String> keyspaces;
+    private final HashMap<String, String> indexTables;
+    private final HashMap<String, String> tables;
+    private final ArrayList<String> nonSystemKeyspaces;
+    private final ArrayList<String> userKeyspaces;
+    private final int numberOfTables;
+    private final UUID version;
+
+    @Nullable
+    private final KeyspaceMetadata ksUpdate;
+    @Nullable
+    private final KeyspaceMetadata previous;
+    @Nullable
+    private final KeyspaceMetadata.KeyspaceDiff ksDiff;
+    @Nullable
+    private final TableMetadata tableUpdate;
+    @Nullable
+    private final Tables.TablesDiff tablesDiff;
+    @Nullable
+    private final Views.ViewsDiff viewsDiff;
+    @Nullable
+    private final MapDifference<String,TableMetadata> indexesDiff;
+
+    enum SchemaEventType
+    {
+        KS_METADATA_LOADED,
+        KS_METADATA_RELOADED,
+        KS_METADATA_REMOVED,
+        VERSION_UPDATED,
+        VERSION_ANOUNCED,
+        KS_CREATING,
+        KS_CREATED,
+        KS_ALTERING,
+        KS_ALTERED,
+        KS_DROPPING,
+        KS_DROPPED,
+        TABLE_CREATING,
+        TABLE_CREATED,
+        TABLE_ALTERING,
+        TABLE_ALTERED,
+        TABLE_DROPPING,
+        TABLE_DROPPED,
+        SCHEMATA_LOADING,
+        SCHEMATA_LOADED,
+        SCHEMATA_CLEARED
+    }
+
+    SchemaEvent(SchemaEventType type, Schema schema, @Nullable 
KeyspaceMetadata ksUpdate,
+                @Nullable KeyspaceMetadata previous, @Nullable 
KeyspaceMetadata.KeyspaceDiff ksDiff,
+                @Nullable TableMetadata tableUpdate, @Nullable 
Tables.TablesDiff tablesDiff,
+                @Nullable Views.ViewsDiff viewsDiff, @Nullable 
MapDifference<String,TableMetadata> indexesDiff)
+    {
+        this.type = type;
+        this.ksUpdate = ksUpdate;
+        this.previous = previous;
+        this.ksDiff = ksDiff;
+        this.tableUpdate = tableUpdate;
+        this.tablesDiff = tablesDiff;
+        this.viewsDiff = viewsDiff;
+        this.indexesDiff = indexesDiff;
+
+        this.keyspaces = new HashSet<>(schema.getKeyspaces());
+        this.nonSystemKeyspaces = new 
ArrayList<>(schema.getNonSystemKeyspaces());
+        this.userKeyspaces = new ArrayList<>(schema.getUserKeyspaces());
+        this.numberOfTables = schema.getNumberOfTables();
+        this.version = schema.getVersion();
+
+        Map<Pair<String, String>, TableMetadataRef> indexTableMetadataRefs = 
schema.getIndexTableMetadataRefs();
+        Map<String, String> indexTables = 
indexTableMetadataRefs.entrySet().stream()
+                                                                
.collect(Collectors.toMap(e -> e.getKey().left + ',' +
+                                                                               
                e.getKey().right,
+                                                                               
           e -> e.getValue().id.toHexString() + ',' +
+                                                                               
                e.getValue().keyspace + ',' +
+                                                                               
                e.getValue().name));
+        this.indexTables = new HashMap<>(indexTables);
+        Map<TableId, TableMetadataRef> tableMetadataRefs = 
schema.getTableMetadataRefs();
+        Map<String, String> tables = tableMetadataRefs.entrySet().stream()
+                                                      
.collect(Collectors.toMap(e -> e.getKey().toHexString(),
+                                                                               
 e -> e.getValue().id.toHexString() + ',' +
+                                                                               
      e.getValue().keyspace + ',' +
+                                                                               
      e.getValue().name));
+        this.tables = new HashMap<>(tables);
+    }
+
+    public SchemaEventType getType()
+    {
+        return type;
+    }
+
+    public Map<String, Serializable> toMap()
+    {
+        HashMap<String, Serializable> ret = new HashMap<>();
+        ret.put("keyspaces", this.keyspaces);
+        ret.put("nonSystemKeyspaces", this.nonSystemKeyspaces);
+        ret.put("userKeyspaces", this.userKeyspaces);
+        ret.put("numberOfTables", this.numberOfTables);
+        ret.put("version", this.version);
+        ret.put("tables", this.tables);
+        ret.put("indexTables", this.indexTables);
+        if (ksUpdate != null) ret.put("ksMetadataUpdate", repr(ksUpdate));
+        if (previous != null) ret.put("ksMetadataPrevious", repr(previous));
+        if (ksDiff != null)
+        {
+            HashMap<String, Serializable> ks = new HashMap<>();
+            ks.put("before", repr(ksDiff.before));
+            ks.put("after", repr(ksDiff.after));
+            ks.put("tables", repr(ksDiff.tables));
+            ks.put("views", repr(ksDiff.views));
+            ks.put("types", repr(ksDiff.types));
+            ks.put("udas", repr(ksDiff.udas));
+            ks.put("udfs", repr(ksDiff.udfs));
+            ret.put("ksDiff", ks);
+        }
+        if (tableUpdate != null) ret.put("tableMetadataUpdate", 
repr(tableUpdate));
+        if (tablesDiff != null) ret.put("tablesDiff", repr(tablesDiff));
+        if (viewsDiff != null) ret.put("viewsDiff", repr(viewsDiff));
+        if (indexesDiff != null) ret.put("indexesDiff", 
Lists.newArrayList(indexesDiff.entriesDiffering().keySet()));
+        return ret;
+    }
+
+    private HashMap<String, Serializable> repr(Diff<?, ?> diff)
+    {
+        HashMap<String, Serializable> ret = new HashMap<>();
+        if (diff.created != null) ret.put("created", diff.created.toString());
+        if (diff.dropped != null) ret.put("dropped", diff.dropped.toString());
+        if (diff.altered != null)
+            ret.put("created", 
Lists.newArrayList(diff.altered.stream().map(Diff.Altered::toString).iterator()));
+        return ret;
+    }
+
+    private HashMap<String, Serializable> repr(KeyspaceMetadata ksm)
+    {
+        HashMap<String, Serializable> ret = new HashMap<>();
+        ret.put("name", ksm.name);
+        if (ksm.kind != null) ret.put("kind", ksm.kind.name());
+        if (ksm.params != null) ret.put("params", ksm.params.toString());
+        if (ksm.tables != null) ret.put("tables", ksm.tables.toString());
+        if (ksm.views != null) ret.put("views", ksm.views.toString());
+        if (ksm.functions != null) ret.put("functions", 
ksm.functions.toString());
+        if (ksm.types != null) ret.put("types", ksm.types.toString());
+        return ret;
+    }
+
+    private HashMap<String, Serializable> repr(TableMetadata table)
+    {
+        HashMap<String, Serializable> ret = new HashMap<>();
+        ret.put("id", table.id.toHexString());
+        ret.put("name", table.name);
+        ret.put("keyspace", table.keyspace);
+        ret.put("partitioner", table.partitioner.toString());
+        ret.put("kind", table.kind.name());
+        ret.put("flags", 
Lists.newArrayList(table.flags.stream().map(Enum::name).iterator()));
+        ret.put("params", repr(table.params));
+        ret.put("indexes", 
Lists.newArrayList(table.indexes.stream().map(this::repr).iterator()));
+        ret.put("triggers", Lists.newArrayList(repr(table.triggers)));
+        ret.put("columns", 
Lists.newArrayList(table.columns.values().stream().map(this::repr).iterator()));
+        ret.put("droppedColumns", 
Lists.newArrayList(table.droppedColumns.values().stream().map(this::repr).iterator()));
+        ret.put("isCompactTable", table.isCompactTable());
+        ret.put("isCompound", table.isCompound());
+        ret.put("isCounter", table.isCounter());
+        ret.put("isCQLTable", table.isCQLTable());
+        ret.put("isDense", table.isDense());
+        ret.put("isIndex", table.isIndex());
+        ret.put("isStaticCompactTable", table.isStaticCompactTable());
+        ret.put("isSuper", table.isSuper());
+        ret.put("isView", table.isView());
+        ret.put("isVirtual", table.isVirtual());
+        return ret;
+    }
+
+    private HashMap<String, Serializable> repr(TableParams params)
+    {
+        HashMap<String, Serializable> ret = new HashMap<>();
+        if (params == null) return ret;
+        ret.put("minIndexInterval", params.minIndexInterval);
+        ret.put("maxIndexInterval", params.maxIndexInterval);
+        ret.put("defaultTimeToLive", params.defaultTimeToLive);
+        ret.put("gcGraceSeconds", params.gcGraceSeconds);
+        ret.put("bloomFilterFpChance", params.bloomFilterFpChance);
+        ret.put("cdc", params.cdc);
+        ret.put("crcCheckChance", params.crcCheckChance);
+        ret.put("memtableFlushPeriodInMs", params.memtableFlushPeriodInMs);
+        ret.put("comment", params.comment);
+        ret.put("caching", repr(params.caching));
+        ret.put("compaction", repr(params.compaction));
+        ret.put("compression", repr(params.compression));
+        if (params.speculativeRetry != null) ret.put("speculativeRetry", 
params.speculativeRetry.kind().name());
+        return ret;
+    }
+
+    private HashMap<String, Serializable> repr(CachingParams caching)
+    {
+        HashMap<String, Serializable> ret = new HashMap<>();
+        if (caching == null) return ret;
+        ret.putAll(caching.asMap());
+        return ret;
+    }
+
+    private HashMap<String, Serializable> repr(CompactionParams comp)
+    {
+        HashMap<String, Serializable> ret = new HashMap<>();
+        if (comp == null) return ret;
+        ret.putAll(comp.asMap());
+        return ret;
+    }
+
+    private HashMap<String, Serializable> repr(CompressionParams compr)
+    {
+        HashMap<String, Serializable> ret = new HashMap<>();
+        if (compr == null) return ret;
+        ret.putAll(compr.asMap());
+        return ret;
+    }
+
+    private HashMap<String, Serializable> repr(IndexMetadata index)
+    {
+        HashMap<String, Serializable> ret = new HashMap<>();
+        if (index == null) return ret;
+        ret.put("name", index.name);
+        ret.put("kind", index.kind.name());
+        ret.put("id", index.id);
+        ret.put("options", new HashMap<>(index.options));
+        ret.put("isCustom", index.isCustom());
+        ret.put("isKeys", index.isKeys());
+        ret.put("isComposites", index.isComposites());
+        return ret;
+    }
+
+    private List<Map<String, Serializable>> repr(Triggers triggers)
+    {
+        List<Map<String, Serializable>> ret = new ArrayList<>();
+        if (triggers == null) return ret;
+        Iterator<TriggerMetadata> iter = triggers.iterator();
+        while (iter.hasNext()) ret.add(repr(iter.next()));
+        return ret;
+    }
+
+    private HashMap<String, Serializable> repr(TriggerMetadata trigger)
+    {
+        HashMap<String, Serializable> ret = new HashMap<>();
+        if (trigger == null) return ret;
+        ret.put("name", trigger.name);
+        ret.put("classOption", trigger.classOption);
+        return ret;
+    }
+
+    private HashMap<String, Serializable> repr(ColumnMetadata col)
+    {
+        HashMap<String, Serializable> ret = new HashMap<>();
+        if (col == null) return ret;
+        ret.put("name", col.name.toString());
+        ret.put("kind", col.kind.name());
+        ret.put("type", col.type.toString());
+        ret.put("ksName", col.ksName);
+        ret.put("cfName", col.cfName);
+        ret.put("position", col.position());
+        ret.put("clusteringOrder", col.clusteringOrder().name());
+        ret.put("isComplex", col.isComplex());
+        ret.put("isStatic", col.isStatic());
+        ret.put("isPrimaryKeyColumn", col.isPrimaryKeyColumn());
+        ret.put("isSimple", col.isSimple());
+        ret.put("isPartitionKey", col.isPartitionKey());
+        ret.put("isClusteringColumn", col.isClusteringColumn());
+        ret.put("isCounterColumn", col.isCounterColumn());
+        ret.put("isRegular", col.isRegular());
+        return ret;
+    }
+
+    private HashMap<String, Serializable> repr(DroppedColumn column)
+    {
+        HashMap<String, Serializable> ret = new HashMap<>();
+        if (column == null) return ret;
+        ret.put("droppedTime", column.droppedTime);
+        ret.put("column", repr(column.column));
+        return ret;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2846b22a/src/java/org/apache/cassandra/schema/SchemaMigrationDiagnostics.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/schema/SchemaMigrationDiagnostics.java 
b/src/java/org/apache/cassandra/schema/SchemaMigrationDiagnostics.java
new file mode 100644
index 0000000..62f1768
--- /dev/null
+++ b/src/java/org/apache/cassandra/schema/SchemaMigrationDiagnostics.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.schema;
+
+import java.util.UUID;
+
+import org.apache.cassandra.diag.DiagnosticEventService;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import 
org.apache.cassandra.schema.SchemaMigrationEvent.MigrationManagerEventType;
+
+final class SchemaMigrationDiagnostics
+{
+    private static final DiagnosticEventService service = 
DiagnosticEventService.instance();
+
+    private SchemaMigrationDiagnostics()
+    {
+    }
+
+    static void unknownLocalSchemaVersion(InetAddressAndPort endpoint, UUID 
theirVersion)
+    {
+        if (isEnabled(MigrationManagerEventType.UNKNOWN_LOCAL_SCHEMA_VERSION))
+            service.publish(new 
SchemaMigrationEvent(MigrationManagerEventType.UNKNOWN_LOCAL_SCHEMA_VERSION, 
endpoint,
+                                                     theirVersion));
+    }
+
+    static void versionMatch(InetAddressAndPort endpoint, UUID theirVersion)
+    {
+        if (isEnabled(MigrationManagerEventType.VERSION_MATCH))
+            service.publish(new 
SchemaMigrationEvent(MigrationManagerEventType.VERSION_MATCH, endpoint, 
theirVersion));
+    }
+
+    static void skipPull(InetAddressAndPort endpoint, UUID theirVersion)
+    {
+        if (isEnabled(MigrationManagerEventType.SKIP_PULL))
+            service.publish(new 
SchemaMigrationEvent(MigrationManagerEventType.SKIP_PULL, endpoint, 
theirVersion));
+    }
+
+    static void resetLocalSchema()
+    {
+        if (isEnabled(MigrationManagerEventType.RESET_LOCAL_SCHEMA))
+            service.publish(new 
SchemaMigrationEvent(MigrationManagerEventType.RESET_LOCAL_SCHEMA, null, null));
+    }
+
+    static void taskCreated(InetAddressAndPort endpoint)
+    {
+        if (isEnabled(MigrationManagerEventType.TASK_CREATED))
+            service.publish(new 
SchemaMigrationEvent(MigrationManagerEventType.TASK_CREATED, endpoint, null));
+    }
+
+    static void taskSendAborted(InetAddressAndPort endpoint)
+    {
+        if (isEnabled(MigrationManagerEventType.TASK_SEND_ABORTED))
+            service.publish(new 
SchemaMigrationEvent(MigrationManagerEventType.TASK_SEND_ABORTED, endpoint, 
null));
+    }
+
+    static void taskRequestSend(InetAddressAndPort endpoint)
+    {
+        if (isEnabled(MigrationManagerEventType.TASK_REQUEST_SEND))
+            service.publish(new 
SchemaMigrationEvent(MigrationManagerEventType.TASK_REQUEST_SEND,
+                                                     endpoint, null));
+    }
+
+    private static boolean isEnabled(MigrationManagerEventType type)
+    {
+        return service.isEnabled(SchemaMigrationEvent.class, type);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2846b22a/src/java/org/apache/cassandra/schema/SchemaMigrationEvent.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaMigrationEvent.java 
b/src/java/org/apache/cassandra/schema/SchemaMigrationEvent.java
new file mode 100644
index 0000000..2c17235
--- /dev/null
+++ b/src/java/org/apache/cassandra/schema/SchemaMigrationEvent.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.schema;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+
+import javax.annotation.Nullable;
+
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.diag.DiagnosticEvent;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.MessagingService;
+
+/**
+ * Internal events emitted by {@link MigrationManager}.
+ */
+final class SchemaMigrationEvent extends DiagnosticEvent
+{
+    private final MigrationManagerEventType type;
+    @Nullable
+    private final InetAddressAndPort endpoint;
+    @Nullable
+    private final UUID endpointSchemaVersion;
+    private final UUID localSchemaVersion;
+    private final Integer localMessagingVersion;
+    private final SystemKeyspace.BootstrapState bootstrapState;
+    @Nullable
+    private Integer inflightTaskCount;
+    @Nullable
+    private Integer endpointMessagingVersion;
+    @Nullable
+    private Boolean endpointGossipOnlyMember;
+    @Nullable
+    private Boolean isAlive;
+
+    enum MigrationManagerEventType
+    {
+        UNKNOWN_LOCAL_SCHEMA_VERSION,
+        VERSION_MATCH,
+        SKIP_PULL,
+        RESET_LOCAL_SCHEMA,
+        TASK_CREATED,
+        TASK_SEND_ABORTED,
+        TASK_REQUEST_SEND
+    }
+
+    SchemaMigrationEvent(MigrationManagerEventType type,
+                         @Nullable InetAddressAndPort endpoint, @Nullable UUID 
endpointSchemaVersion)
+    {
+        this.type = type;
+        this.endpoint = endpoint;
+        this.endpointSchemaVersion = endpointSchemaVersion;
+
+        localSchemaVersion = Schema.instance.getVersion();
+        localMessagingVersion = MessagingService.current_version;
+
+        Queue<CountDownLatch> inflightTasks = MigrationTask.getInflightTasks();
+        if (inflightTasks != null)
+            inflightTaskCount = inflightTasks.size();
+
+        this.bootstrapState = SystemKeyspace.getBootstrapState();
+
+        if (endpoint == null) return;
+
+        if (MessagingService.instance().knowsVersion(endpoint))
+            endpointMessagingVersion = 
MessagingService.instance().getRawVersion(endpoint);
+
+        endpointGossipOnlyMember = 
Gossiper.instance.isGossipOnlyMember(endpoint);
+        this.isAlive = FailureDetector.instance.isAlive(endpoint);
+    }
+
+    public Enum<?> getType()
+    {
+        return type;
+    }
+
+    public Map<String, Serializable> toMap()
+    {
+        HashMap<String, Serializable> ret = new HashMap<>();
+        if (endpoint != null) ret.put("endpoint", 
endpoint.getHostAddress(true));
+        ret.put("endpointSchemaVersion", 
Schema.schemaVersionToString(endpointSchemaVersion));
+        ret.put("localSchemaVersion", 
Schema.schemaVersionToString(localSchemaVersion));
+        if (endpointMessagingVersion != null) 
ret.put("endpointMessagingVersion", endpointMessagingVersion);
+        if (localMessagingVersion != null) ret.put("localMessagingVersion", 
localMessagingVersion);
+        if (endpointGossipOnlyMember != null) 
ret.put("endpointGossipOnlyMember", endpointGossipOnlyMember);
+        if (isAlive != null) ret.put("endpointIsAlive", isAlive);
+        if (bootstrapState != null) ret.put("bootstrapState", 
bootstrapState.name());
+        if (inflightTaskCount != null) ret.put("inflightTaskCount", 
inflightTaskCount);
+        return ret;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2846b22a/src/java/org/apache/cassandra/schema/SchemaPushVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaPushVerbHandler.java 
b/src/java/org/apache/cassandra/schema/SchemaPushVerbHandler.java
index f939cda..358739a 100644
--- a/src/java/org/apache/cassandra/schema/SchemaPushVerbHandler.java
+++ b/src/java/org/apache/cassandra/schema/SchemaPushVerbHandler.java
@@ -42,6 +42,7 @@ public final class SchemaPushVerbHandler implements 
IVerbHandler<Collection<Muta
     {
         logger.trace("Received schema push request from {}", message.from);
 
+        SchemaAnnouncementDiagnostics.schemataMutationsReceived(message.from);
         StageManager.getStage(Stage.MIGRATION).submit(() -> 
Schema.instance.mergeAndAnnounceVersion(message.payload));
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2846b22a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java 
b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
index 297774a..e1c0f55 100644
--- a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
+++ b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
@@ -45,28 +45,35 @@ public class PendingRangeCalculatorService
 
     public PendingRangeCalculatorService()
     {
-        executor.setRejectedExecutionHandler(new RejectedExecutionHandler()
-        {
-            public void rejectedExecution(Runnable r, ThreadPoolExecutor e)
+        executor.setRejectedExecutionHandler((r, e) ->
             {
+                
PendingRangeCalculatorServiceDiagnostics.taskRejected(instance, updateJobs);
                 PendingRangeCalculatorService.instance.finishUpdate();
             }
-        }
         );
     }
 
     private static class PendingRangeTask implements Runnable
     {
+        private final AtomicInteger updateJobs;
+
+        PendingRangeTask(AtomicInteger updateJobs)
+        {
+            this.updateJobs = updateJobs;
+        }
+
         public void run()
         {
             try
             {
+                PendingRangeCalculatorServiceDiagnostics.taskStarted(instance, 
updateJobs);
                 long start = System.currentTimeMillis();
                 List<String> keyspaces = 
Schema.instance.getNonLocalStrategyKeyspaces();
                 for (String keyspaceName : keyspaces)
                     
calculatePendingRanges(Keyspace.open(keyspaceName).getReplicationStrategy(), 
keyspaceName);
                 if (logger.isTraceEnabled())
                     logger.trace("Finished PendingRangeTask for {} keyspaces 
in {}ms", keyspaces.size(), System.currentTimeMillis() - start);
+                
PendingRangeCalculatorServiceDiagnostics.taskFinished(instance, updateJobs);
             }
             finally
             {
@@ -77,13 +84,15 @@ public class PendingRangeCalculatorService
 
     private void finishUpdate()
     {
-        updateJobs.decrementAndGet();
+        int jobs = updateJobs.decrementAndGet();
+        PendingRangeCalculatorServiceDiagnostics.taskCountChanged(instance, 
jobs);
     }
 
     public void update()
     {
-        updateJobs.incrementAndGet();
-        executor.submit(new PendingRangeTask());
+        int jobs = updateJobs.incrementAndGet();
+        PendingRangeCalculatorServiceDiagnostics.taskCountChanged(instance, 
jobs);
+        executor.submit(new PendingRangeTask(updateJobs));
     }
 
     public void blockUntilFinished()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to