http://git-wip-us.apache.org/repos/asf/cassandra/blob/2846b22a/src/java/org/apache/cassandra/gms/GossiperEvent.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossiperEvent.java b/src/java/org/apache/cassandra/gms/GossiperEvent.java
new file mode 100644
index 0000000..2de88bc
--- /dev/null
+++ b/src/java/org/apache/cassandra/gms/GossiperEvent.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+
+import org.apache.cassandra.diag.DiagnosticEvent;
+import org.apache.cassandra.locator.InetAddressAndPort;
+
+/**
+ * DiagnosticEvent implementation for {@link Gossiper} activities. Besides the endpoint, quarantine expiration and local state handed to the constructor, each event stores what the given Gossiper's getters return when the event is constructed.
+ */
+final class GossiperEvent extends DiagnosticEvent
+{
+    private final InetAddressAndPort endpoint; // endpoint the event is about; toMap() leaves it out when null
+    @Nullable
+    private final Long quarantineExpiration; // supplied by the caller, may be null
+    @Nullable
+    private final EndpointState localState; // supplied by the caller, may be null
+
+    private final Map<InetAddressAndPort, EndpointState> endpointStateMap; // from gossiper.getEndpointStateMap()
+    private final boolean inShadowRound; // from gossiper.isInShadowRound()
+    private final Map<InetAddressAndPort, Long> justRemovedEndpoints; // from gossiper.getJustRemovedEndpoints()
+    private final long lastProcessedMessageAt; // from gossiper.getLastProcessedMessageAt()
+    private final Set<InetAddressAndPort> liveEndpoints; // from gossiper.getLiveMembers()
+    private final List<String> seeds; // from gossiper.getSeeds()
+    private final Set<InetAddressAndPort> seedsInShadowRound; // from gossiper.getSeedsInShadowRound()
+    private final Map<InetAddressAndPort, Long> unreachableEndpoints; // from gossiper.getUnreachableEndpoints()
+
+
+    enum GossiperEventType // kinds of gossiper activity an event can report
+    {
+        MARKED_AS_SHUTDOWN,
+        CONVICTED,
+        REPLACEMENT_QUARANTINE,
+        REPLACED_ENDPOINT,
+        EVICTED_FROM_MEMBERSHIP,
+        REMOVED_ENDPOINT,
+        QUARANTINED_ENDPOINT,
+        MARKED_ALIVE,
+        REAL_MARKED_ALIVE,
+        MARKED_DEAD,
+        MAJOR_STATE_CHANGE_HANDLED,
+        SEND_GOSSIP_DIGEST_SYN
+    }
+
+    public GossiperEventType type; // NOTE(review): public and non-final, while HintEvent, HintsServiceEvent and TokenMetadataEvent keep their type private final - confirm this is intentional
+
+
+    GossiperEvent(GossiperEventType type, Gossiper gossiper, InetAddressAndPort endpoint, // gossiper is only read here, no reference to it is kept
+                  @Nullable Long quarantineExpiration, @Nullable EndpointState localState)
+    {
+        this.type = type;
+        this.endpoint = endpoint;
+        this.quarantineExpiration = quarantineExpiration;
+        this.localState = localState;
+
+        this.endpointStateMap = gossiper.getEndpointStateMap(); // NOTE(review): whether these getters return copies or live views is not visible here - verify before treating the fields as snapshots
+        this.inShadowRound = gossiper.isInShadowRound();
+        this.justRemovedEndpoints = gossiper.getJustRemovedEndpoints();
+        this.lastProcessedMessageAt = gossiper.getLastProcessedMessageAt();
+        this.liveEndpoints = gossiper.getLiveMembers();
+        this.seeds = gossiper.getSeeds();
+        this.seedsInShadowRound = gossiper.getSeedsInShadowRound();
+        this.unreachableEndpoints = gossiper.getUnreachableEndpoints();
+    }
+
+    public Enum<GossiperEventType> getType() // returns the event type given to the constructor
+    {
+        return type;
+    }
+
+    public HashMap<String, Serializable> toMap() // flattens the event into a string-keyed map; states and collections are rendered with String.valueOf
+    {
+        // be extra defensive against nulls and bugs
+        HashMap<String, Serializable> ret = new HashMap<>();
+        if (endpoint != null) ret.put("endpoint", endpoint.getHostAddress(true)); // key is absent when there is no endpoint
+        ret.put("quarantineExpiration", quarantineExpiration); // stored as-is, so the value is null when no expiration was given
+        ret.put("localState", String.valueOf(localState)); // String.valueOf turns a null reference into the text "null"
+        ret.put("endpointStateMap", String.valueOf(endpointStateMap));
+        ret.put("inShadowRound", inShadowRound);
+        ret.put("justRemovedEndpoints", String.valueOf(justRemovedEndpoints));
+        ret.put("lastProcessedMessageAt", lastProcessedMessageAt);
+        ret.put("liveEndpoints", String.valueOf(liveEndpoints));
+        ret.put("seeds", String.valueOf(seeds));
+        ret.put("seedsInShadowRound", String.valueOf(seedsInShadowRound));
+        ret.put("unreachableEndpoints", String.valueOf(unreachableEndpoints));
+        return ret;
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2846b22a/src/java/org/apache/cassandra/hints/Hint.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/Hint.java b/src/java/org/apache/cassandra/hints/Hint.java
index b0abd50..7e4618c 100644
--- a/src/java/org/apache/cassandra/hints/Hint.java
+++ b/src/java/org/apache/cassandra/hints/Hint.java
@@ -132,7 +132,7 @@ public final class Hint
     /**
      * @return calculates whether or not it is safe to apply the hint without risking to resurrect any deleted data
      */
-    boolean isLive()
+    public boolean isLive() // visibility widened from package-private to public by this patch
     {
         return isLive(creationTime, System.currentTimeMillis(), ttl());
     }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2846b22a/src/java/org/apache/cassandra/hints/HintDiagnostics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintDiagnostics.java b/src/java/org/apache/cassandra/hints/HintDiagnostics.java
new file mode 100644
index 0000000..3ff0834
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/HintDiagnostics.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.hints;
+
+import org.apache.cassandra.diag.DiagnosticEventService;
+import org.apache.cassandra.hints.HintEvent.HintEventType;
+import org.apache.cassandra.hints.HintEvent.HintResult;
+
+/**
+ * Utility methods for DiagnosticEvents around hinted handoff. Each method builds a {@link HintEvent} from the given dispatcher and publishes it through the DiagnosticEventService, but only when isEnabled reports that event type as enabled.
+ */
+final class HintDiagnostics
+{
+    private static final DiagnosticEventService service = DiagnosticEventService.instance(); // shared service used for both the enabled check and publishing
+
+    private HintDiagnostics() // static helpers only, never instantiated
+    {
+    }
+
+    static void dispatcherCreated(HintsDispatcher dispatcher) // publishes DISPATCHER_CREATED with no result and no page counters
+    {
+        if (isEnabled(HintEventType.DISPATCHER_CREATED))
+            service.publish(new HintEvent(HintEventType.DISPATCHER_CREATED, dispatcher,
+                                          dispatcher.hostId, dispatcher.address, null, null, null, null));
+    }
+
+    static void dispatcherClosed(HintsDispatcher dispatcher) // publishes DISPATCHER_CLOSED with no result and no page counters
+    {
+        if (isEnabled(HintEventType.DISPATCHER_CLOSED))
+            service.publish(new HintEvent(HintEventType.DISPATCHER_CLOSED, dispatcher,
+                                          dispatcher.hostId, dispatcher.address, null, null, null, null));
+    }
+
+    static void dispatchPage(HintsDispatcher dispatcher) // publishes DISPATCHER_PAGE with no result and no page counters
+    {
+        if (isEnabled(HintEventType.DISPATCHER_PAGE))
+            service.publish(new HintEvent(HintEventType.DISPATCHER_PAGE, dispatcher,
+                                          dispatcher.hostId, dispatcher.address, null, null, null, null));
+    }
+
+    static void abortRequested(HintsDispatcher dispatcher) // publishes ABORT_REQUESTED with no result and no page counters
+    {
+        if (isEnabled(HintEventType.ABORT_REQUESTED))
+            service.publish(new HintEvent(HintEventType.ABORT_REQUESTED, dispatcher,
+                                          dispatcher.hostId, dispatcher.address, null, null, null, null));
+    }
+
+    static void pageSuccessResult(HintsDispatcher dispatcher, long success, long failures, long timeouts) // DISPATCHER_HINT_RESULT tagged HintResult.PAGE_SUCCESS, carrying the three counters
+    {
+        if (isEnabled(HintEventType.DISPATCHER_HINT_RESULT))
+            service.publish(new HintEvent(HintEventType.DISPATCHER_HINT_RESULT, dispatcher,
+                                          dispatcher.hostId, dispatcher.address, HintResult.PAGE_SUCCESS,
+                                          success, failures, timeouts));
+    }
+
+    static void pageFailureResult(HintsDispatcher dispatcher, long success, long failures, long timeouts) // same event type as pageSuccessResult, tagged HintResult.PAGE_FAILURE
+    {
+        if (isEnabled(HintEventType.DISPATCHER_HINT_RESULT))
+            service.publish(new HintEvent(HintEventType.DISPATCHER_HINT_RESULT, dispatcher,
+                                          dispatcher.hostId, dispatcher.address, HintResult.PAGE_FAILURE,
+                                          success, failures, timeouts));
+    }
+
+    private static boolean isEnabled(HintEventType type) // asks the service whether HintEvent events of this type are enabled
+    {
+        return service.isEnabled(HintEvent.class, type);
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2846b22a/src/java/org/apache/cassandra/hints/HintEvent.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintEvent.java b/src/java/org/apache/cassandra/hints/HintEvent.java
new file mode 100644
index 0000000..011f248
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/HintEvent.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.hints;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.UUID;
+import javax.annotation.Nullable;
+
+import org.apache.cassandra.diag.DiagnosticEvent;
+import org.apache.cassandra.locator.InetAddressAndPort;
+
+/**
+ * DiagnosticEvent implementation for hinted handoff.
+ */
+final class HintEvent extends DiagnosticEvent
+{
+    enum HintEventType // NOTE(review): HintDiagnostics in this patch publishes only the DISPATCHER_* and ABORT_REQUESTED types; the DISPATCHING_* names are also declared by HintsServiceEventType
+    {
+        DISPATCHING_STARTED,
+        DISPATCHING_PAUSED,
+        DISPATCHING_RESUMED,
+        DISPATCHING_SHUTDOWN,
+
+        DISPATCHER_CREATED,
+        DISPATCHER_CLOSED,
+
+        DISPATCHER_PAGE,
+        DISPATCHER_HINT_RESULT,
+
+        ABORT_REQUESTED
+    }
+
+    enum HintResult // outcome tag attached to DISPATCHER_HINT_RESULT events
+    {
+        PAGE_SUCCESS, PAGE_FAILURE
+    }
+
+    private final HintEventType type;
+    private final HintsDispatcher dispatcher; // kept on the event but not exported by toMap()
+    private final UUID targetHostId;
+    private final InetAddressAndPort targetAddress;
+    @Nullable
+    private final HintResult dispatchResult; // null for events that carry no page result
+    @Nullable
+    private final Long pageHintsSuccessful; // page counters; null when the caller passes none
+    @Nullable
+    private final Long pageHintsFailed;
+    @Nullable
+    private final Long pageHintsTimeout;
+
+    HintEvent(HintEventType type, HintsDispatcher dispatcher, UUID targetHostId, InetAddressAndPort targetAddress, // plain field assignment, no validation
+              @Nullable HintResult dispatchResult, @Nullable Long pageHintsSuccessful,
+              @Nullable Long pageHintsFailed, @Nullable Long pageHintsTimeout)
+    {
+        this.type = type;
+        this.dispatcher = dispatcher;
+        this.targetHostId = targetHostId;
+        this.targetAddress = targetAddress;
+        this.dispatchResult = dispatchResult;
+        this.pageHintsSuccessful = pageHintsSuccessful;
+        this.pageHintsFailed = pageHintsFailed;
+        this.pageHintsTimeout = pageHintsTimeout;
+    }
+
+    public Enum<HintEventType> getType() // returns the event type given to the constructor
+    {
+        return type;
+    }
+
+    public HashMap<String, Serializable> toMap() // exports target host/address plus, when present, the dispatch result and page counters
+    {
+        // be extra defensive against nulls and bugs
+        HashMap<String, Serializable> ret = new HashMap<>();
+        ret.put("targetHostId", targetHostId);
+        ret.put("targetAddress", targetAddress.getHostAddress(true)); // NOTE(review): dereferenced without a null check, unlike dispatchResult below - throws NullPointerException if targetAddress is null
+        if (dispatchResult != null) ret.put("dispatchResult", dispatchResult.name());
+        if (pageHintsSuccessful != null || pageHintsFailed != null || pageHintsTimeout != null) // all three keys are written as soon as any one counter is set, so individual values may still be null
+        {
+            ret.put("hint.page.hints_succeeded", pageHintsSuccessful);
+            ret.put("hint.page.hints_failed", pageHintsFailed);
+            ret.put("hint.page.hints_timed_out", pageHintsTimeout);
+        }
+        return ret;
+    }
+}
\ No newline
at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2846b22a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
index 92dbc29..2d9fd9d 100644
--- a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
+++ b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
@@ -306,4 +306,14 @@ final class HintsDispatchExecutor
             }
         }
     }
+
+    public boolean isPaused() // exposes the current value of the isPaused flag
+    {
+        return isPaused.get();
+    }
+
+    public boolean hasScheduledDispatches() // true while scheduledDispatches is non-empty
+    {
+        return !scheduledDispatches.isEmpty();
+    }
 }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2846b22a/src/java/org/apache/cassandra/hints/HintsDispatcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsDispatcher.java b/src/java/org/apache/cassandra/hints/HintsDispatcher.java
index d0d9aac..2cff186 100644
--- a/src/java/org/apache/cassandra/hints/HintsDispatcher.java
+++ b/src/java/org/apache/cassandra/hints/HintsDispatcher.java
@@ -50,8 +50,8 @@ final class HintsDispatcher implements AutoCloseable
     private enum Action { CONTINUE, ABORT }
 
     private final HintsReader reader;
-    private final UUID hostId;
-    private final InetAddressAndPort address;
+    final UUID hostId; // package-private so HintDiagnostics can read it
+    final InetAddressAndPort address; // package-private so HintDiagnostics can read it
     private final int messagingVersion;
     private final BooleanSupplier abortRequested;
 
@@ -71,11 +71,14 @@ final class HintsDispatcher implements AutoCloseable
     static HintsDispatcher create(File file, RateLimiter rateLimiter, InetAddressAndPort address, UUID hostId, BooleanSupplier abortRequested)
     {
         int messagingVersion = MessagingService.instance().getVersion(address);
-        return new HintsDispatcher(HintsReader.open(file, rateLimiter), hostId, address, messagingVersion, abortRequested);
+        HintsDispatcher dispatcher = new HintsDispatcher(HintsReader.open(file, rateLimiter), hostId, address, messagingVersion, abortRequested);
+        HintDiagnostics.dispatcherCreated(dispatcher); // report the new dispatcher before handing it to the caller
+        return dispatcher;
     }
 
     public void close()
     {
+        HintDiagnostics.dispatcherClosed(this); // reported before the reader is actually closed
         reader.close();
     }
 
@@ -111,6 +114,7 @@ final class HintsDispatcher implements AutoCloseable
     // retry in case of a timeout; stop in case of a failure, host going down, or delivery paused
     private Action dispatch(HintsReader.Page page)
     {
+        HintDiagnostics.dispatchPage(this); // one DISPATCHER_PAGE event per call to dispatch
         return sendHintsAndAwait(page);
     }
 
@@ -132,35 +136,36 @@ final class HintsDispatcher implements AutoCloseable
         if (action == Action.ABORT)
             return action;
 
-        boolean hadFailures = false;
+        long success = 0, failures = 0, timeouts = 0; // per-outcome tallies over all callbacks awaited below
         for (Callback cb : callbacks)
         {
             Callback.Outcome outcome = cb.await();
-            updateMetrics(outcome);
-
-            if (outcome != Callback.Outcome.SUCCESS)
-                hadFailures = true;
+            if (outcome == Callback.Outcome.SUCCESS) success++;
+            else if (outcome == Callback.Outcome.FAILURE) failures++;
+            else if (outcome == Callback.Outcome.TIMEOUT) timeouts++; // NOTE(review): any other Outcome value is not counted, whereas the removed code treated every non-SUCCESS outcome as a failure - confirm Outcome has only these three values
         }
 
-        return hadFailures ? Action.ABORT : Action.CONTINUE;
-    }
+        updateMetrics(success, failures, timeouts); // metrics are now updated once, after the callback loop
 
-    private void updateMetrics(Callback.Outcome outcome)
-    {
-        switch (outcome)
+        if (failures > 0 || timeouts > 0) // abort when at least one callback failed or timed out
+        {
+            HintDiagnostics.pageFailureResult(this, success, failures, timeouts);
+            return Action.ABORT;
+        }
+        else
         {
-            case SUCCESS:
-                HintsServiceMetrics.hintsSucceeded.mark();
-                break;
-            case FAILURE:
-                HintsServiceMetrics.hintsFailed.mark();
-                break;
-            case TIMEOUT:
-                HintsServiceMetrics.hintsTimedOut.mark();
-                break;
+            HintDiagnostics.pageSuccessResult(this, success, failures, timeouts);
+            return Action.CONTINUE;
         }
     }
 
+    private void updateMetrics(long success, long failures, long timeouts) // marks each meter with the given count; a zero count is still passed to mark
+    {
+        HintsServiceMetrics.hintsSucceeded.mark(success);
+        HintsServiceMetrics.hintsFailed.mark(failures);
+        HintsServiceMetrics.hintsTimedOut.mark(timeouts);
+    }
+
     /*
      * Sending hints in compatibility mode.
      */
@@ -170,7 +175,10 @@ final class HintsDispatcher implements AutoCloseable
         while (hints.hasNext())
         {
             if (abortRequested.getAsBoolean())
+            {
+                HintDiagnostics.abortRequested(this); // record that sending stopped because an abort was requested
                 return Action.ABORT;
+            }
             callbacks.add(sendFunction.apply(hints.next()));
         }
         return Action.CONTINUE;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2846b22a/src/java/org/apache/cassandra/hints/HintsService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsService.java b/src/java/org/apache/cassandra/hints/HintsService.java
index a46eb52..0cd1278 100644
--- a/src/java/org/apache/cassandra/hints/HintsService.java
+++ b/src/java/org/apache/cassandra/hints/HintsService.java
@@ -73,8 +73,8 @@ public final class HintsService implements HintsServiceMBean
     private final HintsCatalog catalog;
     private final HintsWriteExecutor writeExecutor;
     private final HintsBufferPool bufferPool;
-    private final HintsDispatchExecutor dispatchExecutor;
-    private final AtomicBoolean isDispatchPaused;
+    final HintsDispatchExecutor dispatchExecutor; // package-private so HintsServiceEvent can query the executor
+
final AtomicBoolean isDispatchPaused; // package-private so HintsServiceEvent can read the flag
 
     private volatile boolean isShutDown = false;
 
@@ -209,6 +209,8 @@ public final class HintsService implements HintsServiceMBean
 
         isDispatchPaused.set(false);
 
+        HintsServiceDiagnostics.dispatchingStarted(this); // published after the paused flag is cleared and before the dispatch trigger is created
+
         HintsDispatchTrigger trigger = new HintsDispatchTrigger(catalog, writeExecutor, dispatchExecutor, isDispatchPaused);
         // triggering hint dispatch is now very cheap, so we can do it more often - every 10 seconds vs. every 10 minutes,
         // previously; this reduces mean time to delivery, and positively affects batchlog delivery latencies, too
@@ -219,12 +221,16 @@ public final class HintsService implements HintsServiceMBean
     {
         logger.info("Paused hints dispatch");
         isDispatchPaused.set(true);
+
+        HintsServiceDiagnostics.dispatchingPaused(this); // published after the flag is set, so the event records isDispatchPaused == true
     }
 
     public void resumeDispatch()
     {
         logger.info("Resumed hints dispatch");
         isDispatchPaused.set(false);
+
+        HintsServiceDiagnostics.dispatchingResumed(this); // published after the flag is cleared
     }
 
     /**
@@ -250,6 +256,8 @@ public final class HintsService implements HintsServiceMBean
 
         dispatchExecutor.shutdownBlocking();
         writeExecutor.shutdownBlocking();
+
+        HintsServiceDiagnostics.dispatchingShutdown(this); // published only after both executors have finished their blocking shutdown
     }
 
     /**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2846b22a/src/java/org/apache/cassandra/hints/HintsServiceDiagnostics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsServiceDiagnostics.java b/src/java/org/apache/cassandra/hints/HintsServiceDiagnostics.java
new file mode 100644
index 0000000..f4cf149
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/HintsServiceDiagnostics.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.hints;
+
+
+import org.apache.cassandra.diag.DiagnosticEventService;
+import org.apache.cassandra.hints.HintsServiceEvent.HintsServiceEventType;
+
+/**
+ * Utility methods for DiagnosticEvents around the {@link HintsService}. Each method publishes a {@link HintsServiceEvent} for the given service, but only when isEnabled reports that event type as enabled.
+ */
+final class HintsServiceDiagnostics
+{
+    private static final DiagnosticEventService service = DiagnosticEventService.instance(); // shared service used for both the enabled check and publishing
+
+    private HintsServiceDiagnostics() // static helpers only, never instantiated
+    {
+    }
+
+    static void dispatchingStarted(HintsService hintsService) // publishes DISPATCHING_STARTED
+    {
+        if (isEnabled(HintsServiceEventType.DISPATCHING_STARTED))
+            service.publish(new HintsServiceEvent(HintsServiceEventType.DISPATCHING_STARTED, hintsService));
+    }
+
+    static void dispatchingShutdown(HintsService hintsService) // publishes DISPATCHING_SHUTDOWN
+    {
+        if (isEnabled(HintsServiceEventType.DISPATCHING_SHUTDOWN))
+            service.publish(new HintsServiceEvent(HintsServiceEventType.DISPATCHING_SHUTDOWN, hintsService));
+    }
+
+    static void dispatchingPaused(HintsService hintsService) // publishes DISPATCHING_PAUSED
+    {
+        if (isEnabled(HintsServiceEventType.DISPATCHING_PAUSED))
+            service.publish(new HintsServiceEvent(HintsServiceEventType.DISPATCHING_PAUSED, hintsService));
+    }
+
+    static void dispatchingResumed(HintsService hintsService) // publishes DISPATCHING_RESUMED
+    {
+        if (isEnabled(HintsServiceEventType.DISPATCHING_RESUMED))
+            service.publish(new HintsServiceEvent(HintsServiceEventType.DISPATCHING_RESUMED, hintsService));
+    }
+
+    private static boolean isEnabled(HintsServiceEventType type) // asks the service whether HintsServiceEvent events of this type are enabled
+    {
+        return service.isEnabled(HintsServiceEvent.class, type);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2846b22a/src/java/org/apache/cassandra/hints/HintsServiceEvent.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsServiceEvent.java b/src/java/org/apache/cassandra/hints/HintsServiceEvent.java
new file mode 100644
index 0000000..72497a0
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/HintsServiceEvent.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.hints;
+
+import java.io.Serializable;
+import java.util.HashMap;
+
+import org.apache.cassandra.diag.DiagnosticEvent;
+
+/**
+ * DiagnosticEvent implementation for HintService.
+ */
+final class HintsServiceEvent extends DiagnosticEvent
+{
+    enum HintsServiceEventType // lifecycle transitions of hint dispatching reported by HintsServiceDiagnostics
+    {
+        DISPATCHING_STARTED,
+        DISPATCHING_PAUSED,
+        DISPATCHING_RESUMED,
+        DISPATCHING_SHUTDOWN
+    }
+
+    private final HintsServiceEventType type;
+    private final HintsService service; // kept on the event but not exported by toMap()
+    private final boolean isDispatchPaused; // the four booleans below are read from the service once, in the constructor
+    private final boolean isShutdown;
+    private final boolean dispatchExecutorIsPaused;
+    private final boolean dispatchExecutorHasScheduledDispatches;
+
+    HintsServiceEvent(HintsServiceEventType type, HintsService service) // captures the service's flags at the moment the event is created
+    {
+        this.type = type;
+        this.service = service;
+        this.isDispatchPaused = service.isDispatchPaused.get();
+        this.isShutdown = service.isShutDown();
+        this.dispatchExecutorIsPaused = service.dispatchExecutor.isPaused();
+        this.dispatchExecutorHasScheduledDispatches = service.dispatchExecutor.hasScheduledDispatches();
+    }
+
+    public Enum<HintsServiceEventType> getType() // returns the event type given to the constructor
+    {
+        return type;
+    }
+
+    public HashMap<String, Serializable> toMap() // exports the four captured booleans under their field names
+    {
+        // be extra defensive against nulls and bugs
+        HashMap<String, Serializable> ret = new HashMap<>();
+        ret.put("isDispatchPaused", isDispatchPaused);
+        ret.put("isShutdown", isShutdown);
+        ret.put("dispatchExecutorIsPaused", dispatchExecutorIsPaused);
+        ret.put("dispatchExecutorHasScheduledDispatches", dispatchExecutorHasScheduledDispatches);
+        return ret;
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2846b22a/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index e2c4628..46c191f 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -800,6 +800,8 @@ public class TokenMetadata
         // avoid race between both branches - do not use a lock here as this will block any other unrelated operations!
synchronized (pendingRanges)
         {
+            TokenMetadataDiagnostics.pendingRangeCalculationStarted(this, keyspaceName); // published while holding the pendingRanges monitor, before the emptiness check below
+
             if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && movingEndpoints.isEmpty())
             {
                 if (logger.isTraceEnabled())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2846b22a/src/java/org/apache/cassandra/locator/TokenMetadataDiagnostics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadataDiagnostics.java b/src/java/org/apache/cassandra/locator/TokenMetadataDiagnostics.java
new file mode 100644
index 0000000..0221f1e
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/TokenMetadataDiagnostics.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import org.apache.cassandra.diag.DiagnosticEventService;
+import org.apache.cassandra.locator.TokenMetadataEvent.TokenMetadataEventType;
+
+/**
+ * Utility methods for events related to {@link TokenMetadata} changes. Events are published through the DiagnosticEventService only when isEnabled reports the event type as enabled.
+ */
+final class TokenMetadataDiagnostics
+{
+    private static final DiagnosticEventService service = DiagnosticEventService.instance(); // shared service used for both the enabled check and publishing
+
+    private TokenMetadataDiagnostics() // static helpers only, never instantiated
+    {
+    }
+
+    static void pendingRangeCalculationStarted(TokenMetadata tokenMetadata, String keyspace) // publishes PENDING_RANGE_CALCULATION_STARTED for the given keyspace
+    {
+        if (isEnabled(TokenMetadataEventType.PENDING_RANGE_CALCULATION_STARTED))
+            service.publish(new TokenMetadataEvent(TokenMetadataEventType.PENDING_RANGE_CALCULATION_STARTED, tokenMetadata, keyspace));
+    }
+
+    private static boolean isEnabled(TokenMetadataEventType type) // asks the service whether TokenMetadataEvent events of this type are enabled
+    {
+        return service.isEnabled(TokenMetadataEvent.class, type);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2846b22a/src/java/org/apache/cassandra/locator/TokenMetadataEvent.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadataEvent.java b/src/java/org/apache/cassandra/locator/TokenMetadataEvent.java
new file mode 100644
index 0000000..c3ed074
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/TokenMetadataEvent.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import java.io.Serializable;
+import java.util.HashMap;
+
+import org.apache.cassandra.diag.DiagnosticEvent;
+
+/**
+ * Events related to {@link TokenMetadata} changes. An event carries its type, the TokenMetadata instance and the keyspace name it was created for.
+ */
+public final class TokenMetadataEvent extends DiagnosticEvent
+{
+
+    public enum TokenMetadataEventType
+    {
+        PENDING_RANGE_CALCULATION_STARTED,
+        PENDING_RANGE_CALCULATION_COMPLETED, // NOTE(review): TokenMetadataDiagnostics in this patch only publishes the STARTED type
+    }
+
+    private final TokenMetadataEventType type;
+    private final TokenMetadata tokenMetadata; // exported by toMap() via toString()
+    private final String keyspace;
+
+    TokenMetadataEvent(TokenMetadataEventType type, TokenMetadata tokenMetadata, String keyspace) // plain field assignment, no validation
+    {
+        this.type = type;
+        this.tokenMetadata = tokenMetadata;
+        this.keyspace = keyspace;
+    }
+
+    public TokenMetadataEventType getType() // declared with the concrete enum type, where GossiperEvent, HintEvent and HintsServiceEvent declare Enum<...>
+    {
+        return type;
+    }
+
+    public HashMap<String, Serializable> toMap() // exports the keyspace name and the string form of the token metadata
+    {
+        // be extra defensive against nulls and bugs
+        HashMap<String, Serializable> ret = new HashMap<>();
+        ret.put("keyspace", keyspace);
+        ret.put("tokenMetadata", tokenMetadata.toString()); // NOTE(review): throws NullPointerException if tokenMetadata is null; String.valueOf would tolerate it
+        return ret;
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2846b22a/src/java/org/apache/cassandra/schema/Diff.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Diff.java b/src/java/org/apache/cassandra/schema/Diff.java
index 36c0687..7112c85 100644
--- a/src/java/org/apache/cassandra/schema/Diff.java
+++ b/src/java/org/apache/cassandra/schema/Diff.java
@@ -55,5 +55,10 @@ public class Diff<T extends Iterable, S>
             this.after = after;
             this.kind = kind;
         }
+
+        public String toString() // renders as "before -> after (kind)"
+        {
+            return String.format("%s -> %s (%s)", before, after, kind);
+        }
     }
 }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2846b22a/src/java/org/apache/cassandra/schema/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/MigrationManager.java
b/src/java/org/apache/cassandra/schema/MigrationManager.java
index ac95054..a439e2e 100644
--- a/src/java/org/apache/cassandra/schema/MigrationManager.java
+++ b/src/java/org/apache/cassandra/schema/MigrationManager.java
@@ -72,8 +72,9 @@ public class MigrationManager
     {
         if (Schema.instance.getVersion() == null)
         {
-            logger.debug("Not pulling schema from {}, because local schama version is not known yet",
+            logger.debug("Not pulling schema from {}, because local schema version is not known yet",
                          endpoint);
+            SchemaMigrationDiagnostics.unknownLocalSchemaVersion(endpoint, theirVersion); // diagnostic counterpart of the debug log above
             return;
         }
         if (Schema.instance.isSameVersion(theirVersion))
@@ -81,12 +82,14 @@ public class MigrationManager
             logger.debug("Not pulling schema from {}, because schema versions match ({})",
                          endpoint,
                          Schema.schemaVersionToString(theirVersion));
+            SchemaMigrationDiagnostics.versionMatch(endpoint, theirVersion); // pull skipped because isSameVersion(theirVersion) is true
             return;
         }
         if (!shouldPullSchemaFrom(endpoint))
         {
             logger.debug("Not pulling schema from {}, because versions match ({}/{}), or shouldPullSchemaFrom returned false",
                          endpoint, Schema.instance.getVersion(), theirVersion);
+            SchemaMigrationDiagnostics.skipPull(endpoint, theirVersion); // pull skipped because shouldPullSchemaFrom(endpoint) returned false
             return;
         }
 
@@ -319,10 +322,22 @@ public class MigrationManager
     {
         Future<?> f = StageManager.getStage(Stage.MIGRATION).submit(() -> Schema.instance.mergeAndAnnounceVersion(schema));
 
+        Set<InetAddressAndPort> schemaDestinationEndpoints = new HashSet<>(); // live members the mutation was pushed to
+        Set<InetAddressAndPort> schemaEndpointsIgnored = new HashSet<>(); // live members rejected by shouldPushSchemaTo
         for (InetAddressAndPort endpoint : Gossiper.instance.getLiveMembers())
+        {
             if (shouldPushSchemaTo(endpoint))
+            {
                 pushSchemaMutation(endpoint, schema);
+                schemaDestinationEndpoints.add(endpoint);
+            }
+            else
+            {
+                schemaEndpointsIgnored.add(endpoint);
+            }
+        }
 
+        SchemaAnnouncementDiagnostics.schemaMutationsAnnounced(schemaDestinationEndpoints, schemaEndpointsIgnored); // reported before waiting on the local merge future
         FBUtilities.waitOnFuture(f);
     }
 
@@ -340,9 +355,23 @@ public class MigrationManager
         if (locally || result.diff.isEmpty())
             return result.diff;
 
+        Set<InetAddressAndPort> schemaDestinationEndpoints = new HashSet<>(); // live members the mutations were pushed to
+        Set<InetAddressAndPort> schemaEndpointsIgnored = new HashSet<>(); // live members rejected by shouldPushSchemaTo
         for (InetAddressAndPort endpoint : Gossiper.instance.getLiveMembers())
+        {
             if (shouldPushSchemaTo(endpoint))
+            {
                 pushSchemaMutation(endpoint, result.mutations);
+                schemaDestinationEndpoints.add(endpoint);
+            }
+            else
+            {
+                schemaEndpointsIgnored.add(endpoint);
+            }
+        }
+
+        SchemaAnnouncementDiagnostics.schemaTransformationAnnounced(schemaDestinationEndpoints, schemaEndpointsIgnored,
+                                                                    transformation); // only reached when the change is not local-only and the diff is non-empty
 
         return result.diff;
     }
@@ -357,6 +386,8 @@ public class MigrationManager
 
         logger.debug("Truncating schema tables...");
 
+        SchemaMigrationDiagnostics.resetLocalSchema(); // published before the schema tables are truncated
+
         SchemaKeyspace.truncate();
 
         logger.debug("Clearing local schema keyspace definitions...");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2846b22a/src/java/org/apache/cassandra/schema/MigrationTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/MigrationTask.java b/src/java/org/apache/cassandra/schema/MigrationTask.java
index 6ff206a..bf96fb2 100644
--- a/src/java/org/apache/cassandra/schema/MigrationTask.java
+++ b/src/java/org/apache/cassandra/schema/MigrationTask.java
@@ -51,6 +51,7 @@ final class MigrationTask extends WrappedRunnable
     MigrationTask(InetAddressAndPort endpoint)
     {
         this.endpoint = endpoint;
+        SchemaMigrationDiagnostics.taskCreated(endpoint); // published from the constructor, i.e. on task creation rather than on execution
     }
 
     static ConcurrentLinkedQueue<CountDownLatch> getInflightTasks()
@@ -63,6 +64,7 @@ final class MigrationTask extends WrappedRunnable
         if (!FailureDetector.instance.isAlive(endpoint))
         {
             logger.warn("Can't send schema pull request: node {} is down.", endpoint);
+            SchemaMigrationDiagnostics.taskSendAborted(endpoint); // aborted: the failure detector reports the endpoint as not alive
             return;
         }
 
@@ -72,6 +74,7 @@ final class MigrationTask extends WrappedRunnable
         if (!MigrationManager.shouldPullSchemaFrom(endpoint))
         {
             logger.info("Skipped sending a migration request: node {} has a higher major version now.", endpoint);
+            SchemaMigrationDiagnostics.taskSendAborted(endpoint); // aborted: shouldPullSchemaFrom(endpoint) returned false; same event as the node-down case above
             return;
         }
 
@@ -109,5 +112,7 @@ final class MigrationTask extends WrappedRunnable
             inflightTasks.offer(completionLatch);
 
         MessagingService.instance().sendRR(message, endpoint, cb);
+
+        SchemaMigrationDiagnostics.taskRequestSend(endpoint); // published after the request has been handed to the messaging service
     }
 }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2846b22a/src/java/org/apache/cassandra/schema/Schema.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Schema.java b/src/java/org/apache/cassandra/schema/Schema.java
index e1353cd..970d9ac 100644
--- a/src/java/org/apache/cassandra/schema/Schema.java
+++ b/src/java/org/apache/cassandra/schema/Schema.java
@@ -98,9 +98,11 @@ public final class Schema
      */
     public void loadFromDisk(boolean updateVersion)
     {
+        SchemaDiagnostics.schemataLoading(this); // published before any keyspace is fetched
         SchemaKeyspace.fetchNonSystemKeyspaces().forEach(this::load);
         if (updateVersion)
             updateVersion();
+        SchemaDiagnostics.schemataLoaded(this); // published after loading and the optional version update
     }
 
     /**
@@ -128,6 +130,8 @@ public final class Schema
         ksm.tables
            .indexTables()
            .forEach((name, metadata) -> indexMetadataRefs.put(Pair.create(ksm.name, name), new TableMetadataRef(metadata)));
+
+        SchemaDiagnostics.metadataInitialized(this, ksm); // published after the index metadata refs for ksm are registered
     }
 
     private void reload(KeyspaceMetadata previous, KeyspaceMetadata updated)
@@ -166,6 +170,8 @@ public final class Schema
                    .stream()
                    .map(MapDifference.ValueDifference::rightValue)
                    .forEach(indexTable -> indexMetadataRefs.get(Pair.create(indexTable.keyspace, indexTable.indexName().get())).set(indexTable));
+
+        SchemaDiagnostics.metadataReloaded(this, previous, updated, tablesDiff, viewsDiff, indexesDiff); // passes both keyspace versions plus the three diffs
     }
 
     public void registerListener(SchemaChangeListener listener)
@@ -249,6 +255,8 @@ public final class Schema
            .indexTables()
            .keySet()
            .forEach(name -> indexMetadataRefs.remove(Pair.create(ksm.name, name)));
+
+        SchemaDiagnostics.metadataRemoved(this, ksm); // published after the index metadata refs for ksm are removed
     }
 
     public int getNumberOfTables()
@@ -356,6
+364,11 @@ public final class Schema return indexMetadataRefs.get(Pair.create(keyspace, index)); } + Map<Pair<String, String>, TableMetadataRef> getIndexTableMetadataRefs() + { + return indexMetadataRefs; + } + /** * Get Table metadata by its identifier * @@ -373,6 +386,11 @@ public final class Schema return getTableMetadataRef(descriptor.ksname, descriptor.cfname); } + Map<TableId, TableMetadataRef> getTableMetadataRefs() + { + return metadataRefs; + } + /** * Given a keyspace name and table name, get the table * meta data. If the keyspace name or table name is not valid @@ -511,6 +529,7 @@ public final class Schema { version = SchemaKeyspace.calculateSchemaDigest(); SystemKeyspace.updateSchemaVersion(version); + SchemaDiagnostics.versionUpdated(this); } /* @@ -529,6 +548,7 @@ public final class Schema private void passiveAnnounceVersion() { Gossiper.instance.addLocalApplicationState(ApplicationState.SCHEMA, StorageService.instance.valueFactory.schema(version)); + SchemaDiagnostics.versionAnnounced(this); } /** @@ -538,6 +558,7 @@ public final class Schema { getNonSystemKeyspaces().forEach(k -> unload(getKeyspaceMetadata(k))); updateVersionAndAnnounce(); + SchemaDiagnostics.schemataCleared(this); } /* @@ -646,6 +667,8 @@ public final class Schema private void alterKeyspace(KeyspaceDiff delta) { + SchemaDiagnostics.keyspaceAltering(this, delta); + // drop tables and views delta.views.dropped.forEach(this::dropView); delta.tables.dropped.forEach(this::dropTable); @@ -685,10 +708,12 @@ public final class Schema delta.views.altered.forEach(diff -> notifyAlterView(diff.before, diff.after)); delta.udfs.altered.forEach(diff -> notifyAlterFunction(diff.before, diff.after)); delta.udas.altered.forEach(diff -> notifyAlterAggregate(diff.before, diff.after)); + SchemaDiagnostics.keyspaceAltered(this, delta); } private void createKeyspace(KeyspaceMetadata keyspace) { + SchemaDiagnostics.keyspaceCreating(this, keyspace); load(keyspace); Keyspace.open(keyspace.name); @@ -698,10 
+723,12 @@ public final class Schema keyspace.views.forEach(this::notifyCreateView); keyspace.functions.udfs().forEach(this::notifyCreateFunction); keyspace.functions.udas().forEach(this::notifyCreateAggregate); + SchemaDiagnostics.keyspaceCreated(this, keyspace); } private void dropKeyspace(KeyspaceMetadata keyspace) { + SchemaDiagnostics.keyspaceDroping(this, keyspace); keyspace.views.forEach(this::dropView); keyspace.tables.forEach(this::dropTable); @@ -716,6 +743,7 @@ public final class Schema keyspace.tables.forEach(this::notifyDropTable); keyspace.types.forEach(this::notifyDropType); notifyDropKeyspace(keyspace); + SchemaDiagnostics.keyspaceDroped(this, keyspace); } private void dropView(ViewMetadata metadata) @@ -726,6 +754,7 @@ public final class Schema private void dropTable(TableMetadata metadata) { + SchemaDiagnostics.tableDropping(this, metadata); ColumnFamilyStore cfs = Keyspace.open(metadata.keyspace).getColumnFamilyStore(metadata.name); assert cfs != null; // make sure all the indexes are dropped, or else. 
@@ -735,11 +764,14 @@ public final class Schema cfs.snapshot(Keyspace.getTimestampedSnapshotNameWithPrefix(cfs.name, ColumnFamilyStore.SNAPSHOT_DROP_PREFIX)); CommitLog.instance.forceRecycleAllSegments(Collections.singleton(metadata.id)); Keyspace.open(metadata.keyspace).dropCf(metadata.id); + SchemaDiagnostics.tableDropped(this, metadata); } private void createTable(TableMetadata table) { + SchemaDiagnostics.tableCreating(this, table); Keyspace.open(table.keyspace).initCf(metadataRefs.get(table.id), true); + SchemaDiagnostics.tableCreated(this, table); } private void createView(ViewMetadata view) @@ -749,7 +781,9 @@ public final class Schema private void alterTable(TableMetadata updated) { + SchemaDiagnostics.tableAltering(this, updated); Keyspace.open(updated.keyspace).getColumnFamilyStore(updated.name).reload(); + SchemaDiagnostics.tableAltered(this, updated); } private void alterView(ViewMetadata updated) http://git-wip-us.apache.org/repos/asf/cassandra/blob/2846b22a/src/java/org/apache/cassandra/schema/SchemaAnnouncementDiagnostics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/SchemaAnnouncementDiagnostics.java b/src/java/org/apache/cassandra/schema/SchemaAnnouncementDiagnostics.java new file mode 100644 index 0000000..be60b1b --- /dev/null +++ b/src/java/org/apache/cassandra/schema/SchemaAnnouncementDiagnostics.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.schema; + +import java.util.Set; + +import org.apache.cassandra.diag.DiagnosticEventService; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.schema.SchemaAnnouncementEvent.SchemaAnnouncementEventType; + +final class SchemaAnnouncementDiagnostics +{ + private static final DiagnosticEventService service = DiagnosticEventService.instance(); + + private SchemaAnnouncementDiagnostics() + { + } + + static void schemaMutationsAnnounced(Set<InetAddressAndPort> schemaDestinationEndpoints, Set<InetAddressAndPort> schemaEndpointsIgnored) + { + if (isEnabled(SchemaAnnouncementEventType.SCHEMA_MUTATIONS_ANNOUNCED)) + service.publish(new SchemaAnnouncementEvent(SchemaAnnouncementEventType.SCHEMA_MUTATIONS_ANNOUNCED, + schemaDestinationEndpoints, schemaEndpointsIgnored, null, null)); + } + + public static void schemataMutationsReceived(InetAddressAndPort from) + { + if (isEnabled(SchemaAnnouncementEventType.SCHEMA_MUTATIONS_RECEIVED)) + service.publish(new SchemaAnnouncementEvent(SchemaAnnouncementEventType.SCHEMA_MUTATIONS_RECEIVED, + null, null, null, from)); + } + + static void schemaTransformationAnnounced(Set<InetAddressAndPort> schemaDestinationEndpoints, Set<InetAddressAndPort> schemaEndpointsIgnored, SchemaTransformation transformation) + { + if (isEnabled(SchemaAnnouncementEventType.SCHEMA_TRANSFORMATION_ANNOUNCED)) + service.publish(new SchemaAnnouncementEvent(SchemaAnnouncementEventType.SCHEMA_TRANSFORMATION_ANNOUNCED, + schemaDestinationEndpoints, schemaEndpointsIgnored, 
transformation, null)); + } + + private static boolean isEnabled(SchemaAnnouncementEventType type) + { + return service.isEnabled(SchemaAnnouncementEvent.class, type); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/2846b22a/src/java/org/apache/cassandra/schema/SchemaAnnouncementEvent.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/SchemaAnnouncementEvent.java b/src/java/org/apache/cassandra/schema/SchemaAnnouncementEvent.java new file mode 100644 index 0000000..4e0bd68 --- /dev/null +++ b/src/java/org/apache/cassandra/schema/SchemaAnnouncementEvent.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.schema; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import javax.annotation.Nullable; + +import org.apache.cassandra.audit.AuditLogContext; +import org.apache.cassandra.cql3.CQLStatement; +import org.apache.cassandra.diag.DiagnosticEvent; +import org.apache.cassandra.locator.InetAddressAndPort; + +/** + * Events emitted by {@link MigrationManager} around propagating schema changes to remote nodes. + */ +final class SchemaAnnouncementEvent extends DiagnosticEvent +{ + private final SchemaAnnouncementEventType type; + @Nullable + private final Set<InetAddressAndPort> schemaDestinationEndpoints; + @Nullable + private final Set<InetAddressAndPort> schemaEndpointsIgnored; + @Nullable + private final CQLStatement statement; + @Nullable + private final InetAddressAndPort sender; + + enum SchemaAnnouncementEventType + { + SCHEMA_MUTATIONS_ANNOUNCED, + SCHEMA_TRANSFORMATION_ANNOUNCED, + SCHEMA_MUTATIONS_RECEIVED + } + + SchemaAnnouncementEvent(SchemaAnnouncementEventType type, + @Nullable Set<InetAddressAndPort> schemaDestinationEndpoints, + @Nullable Set<InetAddressAndPort> schemaEndpointsIgnored, + @Nullable SchemaTransformation transformation, + @Nullable InetAddressAndPort sender) + { + this.type = type; + this.schemaDestinationEndpoints = schemaDestinationEndpoints; + this.schemaEndpointsIgnored = schemaEndpointsIgnored; + if (transformation instanceof CQLStatement) this.statement = (CQLStatement) transformation; + else this.statement = null; + this.sender = sender; + } + + public Enum<?> getType() + { + return type; + } + + public Map<String, Serializable> toMap() + { + HashMap<String, Serializable> ret = new HashMap<>(); + if (schemaDestinationEndpoints != null) + { + Set<String> eps = schemaDestinationEndpoints.stream().map(InetAddressAndPort::toString).collect(Collectors.toSet()); + 
ret.put("endpointDestinations", new HashSet<>(eps)); + } + if (schemaEndpointsIgnored != null) + { + Set<String> eps = schemaEndpointsIgnored.stream().map(InetAddressAndPort::toString).collect(Collectors.toSet()); + ret.put("endpointIgnored", new HashSet<>(eps)); + } + if (statement != null) + { + AuditLogContext logContext = statement.getAuditLogContext(); + if (logContext != null) + { + HashMap<String, String> log = new HashMap<>(); + if (logContext.auditLogEntryType != null) log.put("type", logContext.auditLogEntryType.name()); + if (logContext.keyspace != null) log.put("keyspace", logContext.keyspace); + if (logContext.scope != null) log.put("table", logContext.scope); + ret.put("statement", log); + } + } + if (sender != null) ret.put("sender", sender.toString()); + return ret; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/2846b22a/src/java/org/apache/cassandra/schema/SchemaDiagnostics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/SchemaDiagnostics.java b/src/java/org/apache/cassandra/schema/SchemaDiagnostics.java new file mode 100644 index 0000000..12b8409 --- /dev/null +++ b/src/java/org/apache/cassandra/schema/SchemaDiagnostics.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.schema; + +import com.google.common.collect.MapDifference; + +import org.apache.cassandra.diag.DiagnosticEventService; +import org.apache.cassandra.schema.SchemaEvent.SchemaEventType; + +final class SchemaDiagnostics +{ + private static final DiagnosticEventService service = DiagnosticEventService.instance(); + + private SchemaDiagnostics() + { + } + + static void metadataInitialized(Schema schema, KeyspaceMetadata ksmUpdate) + { + if (isEnabled(SchemaEventType.KS_METADATA_LOADED)) + service.publish(new SchemaEvent(SchemaEventType.KS_METADATA_LOADED, schema, ksmUpdate, null, null, null, null, null, null)); + } + + static void metadataReloaded(Schema schema, KeyspaceMetadata previous, KeyspaceMetadata ksmUpdate, Tables.TablesDiff tablesDiff, Views.ViewsDiff viewsDiff, MapDifference<String,TableMetadata> indexesDiff) + { + if (isEnabled(SchemaEventType.KS_METADATA_RELOADED)) + service.publish(new SchemaEvent(SchemaEventType.KS_METADATA_RELOADED, schema, ksmUpdate, previous, + null, null, tablesDiff, viewsDiff, indexesDiff)); + } + + static void metadataRemoved(Schema schema, KeyspaceMetadata ksmUpdate) + { + if (isEnabled(SchemaEventType.KS_METADATA_REMOVED)) + service.publish(new SchemaEvent(SchemaEventType.KS_METADATA_REMOVED, schema, ksmUpdate, + null, null, null, null, null, null)); + } + + static void versionUpdated(Schema schema) + { + if (isEnabled(SchemaEventType.VERSION_UPDATED)) + service.publish(new SchemaEvent(SchemaEventType.VERSION_UPDATED, schema, + null, null, null, null, null, null, null)); + } + + static void keyspaceCreating(Schema schema, KeyspaceMetadata keyspace) + { + if (isEnabled(SchemaEventType.KS_CREATING)) + service.publish(new SchemaEvent(SchemaEventType.KS_CREATING, schema, keyspace, + null, null, null, null, null, null)); + } + + static void keyspaceCreated(Schema schema, KeyspaceMetadata 
keyspace) + { + if (isEnabled(SchemaEventType.KS_CREATED)) + service.publish(new SchemaEvent(SchemaEventType.KS_CREATED, schema, keyspace, + null, null, null, null, null, null)); + } + + static void keyspaceAltering(Schema schema, KeyspaceMetadata.KeyspaceDiff delta) + { + if (isEnabled(SchemaEventType.KS_ALTERING)) + service.publish(new SchemaEvent(SchemaEventType.KS_ALTERING, schema, delta.after, + delta.before, delta, null, null, null, null)); + } + + static void keyspaceAltered(Schema schema, KeyspaceMetadata.KeyspaceDiff delta) + { + if (isEnabled(SchemaEventType.KS_ALTERED)) + service.publish(new SchemaEvent(SchemaEventType.KS_ALTERED, schema, delta.after, + delta.before, delta, null, null, null, null)); + } + + static void keyspaceDroping(Schema schema, KeyspaceMetadata keyspace) + { + if (isEnabled(SchemaEventType.KS_DROPPING)) + service.publish(new SchemaEvent(SchemaEventType.KS_DROPPING, schema, keyspace, + null, null, null, null, null, null)); + } + + static void keyspaceDroped(Schema schema, KeyspaceMetadata keyspace) + { + if (isEnabled(SchemaEventType.KS_DROPPED)) + service.publish(new SchemaEvent(SchemaEventType.KS_DROPPED, schema, keyspace, + null, null, null, null, null, null)); + } + + static void schemataLoading(Schema schema) + { + if (isEnabled(SchemaEventType.SCHEMATA_LOADING)) + service.publish(new SchemaEvent(SchemaEventType.SCHEMATA_LOADING, schema, null, + null, null, null, null, null, null)); + } + + static void schemataLoaded(Schema schema) + { + if (isEnabled(SchemaEventType.SCHEMATA_LOADED)) + service.publish(new SchemaEvent(SchemaEventType.SCHEMATA_LOADED, schema, null, + null, null, null, null, null, null)); + } + + static void versionAnnounced(Schema schema) + { + if (isEnabled(SchemaEventType.VERSION_ANOUNCED)) + service.publish(new SchemaEvent(SchemaEventType.VERSION_ANOUNCED, schema, null, + null, null, null, null, null, null)); + } + + static void schemataCleared(Schema schema) + { + if 
(isEnabled(SchemaEventType.SCHEMATA_CLEARED)) + service.publish(new SchemaEvent(SchemaEventType.SCHEMATA_CLEARED, schema, null, + null, null, null, null, null, null)); + } + + static void tableCreating(Schema schema, TableMetadata table) + { + if (isEnabled(SchemaEventType.TABLE_CREATING)) + service.publish(new SchemaEvent(SchemaEventType.TABLE_CREATING, schema, null, + null, null, table, null, null, null)); + } + + static void tableCreated(Schema schema, TableMetadata table) + { + if (isEnabled(SchemaEventType.TABLE_CREATED)) + service.publish(new SchemaEvent(SchemaEventType.TABLE_CREATED, schema, null, + null, null, table, null, null, null)); + } + + static void tableAltering(Schema schema, TableMetadata table) + { + if (isEnabled(SchemaEventType.TABLE_ALTERING)) + service.publish(new SchemaEvent(SchemaEventType.TABLE_ALTERING, schema, null, + null, null, table, null, null, null)); + } + + static void tableAltered(Schema schema, TableMetadata table) + { + if (isEnabled(SchemaEventType.TABLE_ALTERED)) + service.publish(new SchemaEvent(SchemaEventType.TABLE_ALTERED, schema, null, + null, null, table, null, null, null)); + } + + static void tableDropping(Schema schema, TableMetadata table) + { + if (isEnabled(SchemaEventType.TABLE_DROPPING)) + service.publish(new SchemaEvent(SchemaEventType.TABLE_DROPPING, schema, null, + null, null, table, null, null, null)); + } + + static void tableDropped(Schema schema, TableMetadata table) + { + if (isEnabled(SchemaEventType.TABLE_DROPPED)) + service.publish(new SchemaEvent(SchemaEventType.TABLE_DROPPED, schema, null, + null, null, table, null, null, null)); + } + + private static boolean isEnabled(SchemaEventType type) + { + return service.isEnabled(SchemaEvent.class, type); + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/2846b22a/src/java/org/apache/cassandra/schema/SchemaEvent.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/schema/SchemaEvent.java b/src/java/org/apache/cassandra/schema/SchemaEvent.java new file mode 100644 index 0000000..e26cee5 --- /dev/null +++ b/src/java/org/apache/cassandra/schema/SchemaEvent.java @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.schema; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; +import javax.annotation.Nullable; + +import com.google.common.collect.Lists; +import com.google.common.collect.MapDifference; + +import org.apache.cassandra.diag.DiagnosticEvent; +import org.apache.cassandra.utils.Pair; + +final class SchemaEvent extends DiagnosticEvent +{ + private final SchemaEventType type; + + private final HashSet<String> keyspaces; + private final HashMap<String, String> indexTables; + private final HashMap<String, String> tables; + private final ArrayList<String> nonSystemKeyspaces; + private final ArrayList<String> userKeyspaces; + private final int numberOfTables; + private final UUID version; + + @Nullable + private final KeyspaceMetadata ksUpdate; + @Nullable + private final KeyspaceMetadata previous; + @Nullable + private final KeyspaceMetadata.KeyspaceDiff ksDiff; + @Nullable + private final TableMetadata tableUpdate; + @Nullable + private final Tables.TablesDiff tablesDiff; + @Nullable + private final Views.ViewsDiff viewsDiff; + @Nullable + private final MapDifference<String,TableMetadata> indexesDiff; + + enum SchemaEventType + { + KS_METADATA_LOADED, + KS_METADATA_RELOADED, + KS_METADATA_REMOVED, + VERSION_UPDATED, + VERSION_ANOUNCED, + KS_CREATING, + KS_CREATED, + KS_ALTERING, + KS_ALTERED, + KS_DROPPING, + KS_DROPPED, + TABLE_CREATING, + TABLE_CREATED, + TABLE_ALTERING, + TABLE_ALTERED, + TABLE_DROPPING, + TABLE_DROPPED, + SCHEMATA_LOADING, + SCHEMATA_LOADED, + SCHEMATA_CLEARED + } + + SchemaEvent(SchemaEventType type, Schema schema, @Nullable KeyspaceMetadata ksUpdate, + @Nullable KeyspaceMetadata previous, @Nullable KeyspaceMetadata.KeyspaceDiff ksDiff, + @Nullable TableMetadata tableUpdate, @Nullable Tables.TablesDiff tablesDiff, + 
@Nullable Views.ViewsDiff viewsDiff, @Nullable MapDifference<String,TableMetadata> indexesDiff) + { + this.type = type; + this.ksUpdate = ksUpdate; + this.previous = previous; + this.ksDiff = ksDiff; + this.tableUpdate = tableUpdate; + this.tablesDiff = tablesDiff; + this.viewsDiff = viewsDiff; + this.indexesDiff = indexesDiff; + + this.keyspaces = new HashSet<>(schema.getKeyspaces()); + this.nonSystemKeyspaces = new ArrayList<>(schema.getNonSystemKeyspaces()); + this.userKeyspaces = new ArrayList<>(schema.getUserKeyspaces()); + this.numberOfTables = schema.getNumberOfTables(); + this.version = schema.getVersion(); + + Map<Pair<String, String>, TableMetadataRef> indexTableMetadataRefs = schema.getIndexTableMetadataRefs(); + Map<String, String> indexTables = indexTableMetadataRefs.entrySet().stream() + .collect(Collectors.toMap(e -> e.getKey().left + ',' + + e.getKey().right, + e -> e.getValue().id.toHexString() + ',' + + e.getValue().keyspace + ',' + + e.getValue().name)); + this.indexTables = new HashMap<>(indexTables); + Map<TableId, TableMetadataRef> tableMetadataRefs = schema.getTableMetadataRefs(); + Map<String, String> tables = tableMetadataRefs.entrySet().stream() + .collect(Collectors.toMap(e -> e.getKey().toHexString(), + e -> e.getValue().id.toHexString() + ',' + + e.getValue().keyspace + ',' + + e.getValue().name)); + this.tables = new HashMap<>(tables); + } + + public SchemaEventType getType() + { + return type; + } + + public Map<String, Serializable> toMap() + { + HashMap<String, Serializable> ret = new HashMap<>(); + ret.put("keyspaces", this.keyspaces); + ret.put("nonSystemKeyspaces", this.nonSystemKeyspaces); + ret.put("userKeyspaces", this.userKeyspaces); + ret.put("numberOfTables", this.numberOfTables); + ret.put("version", this.version); + ret.put("tables", this.tables); + ret.put("indexTables", this.indexTables); + if (ksUpdate != null) ret.put("ksMetadataUpdate", repr(ksUpdate)); + if (previous != null) ret.put("ksMetadataPrevious", 
repr(previous)); + if (ksDiff != null) + { + HashMap<String, Serializable> ks = new HashMap<>(); + ks.put("before", repr(ksDiff.before)); + ks.put("after", repr(ksDiff.after)); + ks.put("tables", repr(ksDiff.tables)); + ks.put("views", repr(ksDiff.views)); + ks.put("types", repr(ksDiff.types)); + ks.put("udas", repr(ksDiff.udas)); + ks.put("udfs", repr(ksDiff.udfs)); + ret.put("ksDiff", ks); + } + if (tableUpdate != null) ret.put("tableMetadataUpdate", repr(tableUpdate)); + if (tablesDiff != null) ret.put("tablesDiff", repr(tablesDiff)); + if (viewsDiff != null) ret.put("viewsDiff", repr(viewsDiff)); + if (indexesDiff != null) ret.put("indexesDiff", Lists.newArrayList(indexesDiff.entriesDiffering().keySet())); + return ret; + } + + private HashMap<String, Serializable> repr(Diff<?, ?> diff) + { + HashMap<String, Serializable> ret = new HashMap<>(); + if (diff.created != null) ret.put("created", diff.created.toString()); + if (diff.dropped != null) ret.put("dropped", diff.dropped.toString()); + if (diff.altered != null) + ret.put("altered", Lists.newArrayList(diff.altered.stream().map(Diff.Altered::toString).iterator())); + return ret; + } + + private HashMap<String, Serializable> repr(KeyspaceMetadata ksm) + { + HashMap<String, Serializable> ret = new HashMap<>(); + ret.put("name", ksm.name); + if (ksm.kind != null) ret.put("kind", ksm.kind.name()); + if (ksm.params != null) ret.put("params", ksm.params.toString()); + if (ksm.tables != null) ret.put("tables", ksm.tables.toString()); + if (ksm.views != null) ret.put("views", ksm.views.toString()); + if (ksm.functions != null) ret.put("functions", ksm.functions.toString()); + if (ksm.types != null) ret.put("types", ksm.types.toString()); + return ret; + } + + private HashMap<String, Serializable> repr(TableMetadata table) + { + HashMap<String, Serializable> ret = new HashMap<>(); + ret.put("id", table.id.toHexString()); + ret.put("name", table.name); + ret.put("keyspace", table.keyspace); + ret.put("partitioner",
table.partitioner.toString()); + ret.put("kind", table.kind.name()); + ret.put("flags", Lists.newArrayList(table.flags.stream().map(Enum::name).iterator())); + ret.put("params", repr(table.params)); + ret.put("indexes", Lists.newArrayList(table.indexes.stream().map(this::repr).iterator())); + ret.put("triggers", Lists.newArrayList(repr(table.triggers))); + ret.put("columns", Lists.newArrayList(table.columns.values().stream().map(this::repr).iterator())); + ret.put("droppedColumns", Lists.newArrayList(table.droppedColumns.values().stream().map(this::repr).iterator())); + ret.put("isCompactTable", table.isCompactTable()); + ret.put("isCompound", table.isCompound()); + ret.put("isCounter", table.isCounter()); + ret.put("isCQLTable", table.isCQLTable()); + ret.put("isDense", table.isDense()); + ret.put("isIndex", table.isIndex()); + ret.put("isStaticCompactTable", table.isStaticCompactTable()); + ret.put("isSuper", table.isSuper()); + ret.put("isView", table.isView()); + ret.put("isVirtual", table.isVirtual()); + return ret; + } + + private HashMap<String, Serializable> repr(TableParams params) + { + HashMap<String, Serializable> ret = new HashMap<>(); + if (params == null) return ret; + ret.put("minIndexInterval", params.minIndexInterval); + ret.put("maxIndexInterval", params.maxIndexInterval); + ret.put("defaultTimeToLive", params.defaultTimeToLive); + ret.put("gcGraceSeconds", params.gcGraceSeconds); + ret.put("bloomFilterFpChance", params.bloomFilterFpChance); + ret.put("cdc", params.cdc); + ret.put("crcCheckChance", params.crcCheckChance); + ret.put("memtableFlushPeriodInMs", params.memtableFlushPeriodInMs); + ret.put("comment", params.comment); + ret.put("caching", repr(params.caching)); + ret.put("compaction", repr(params.compaction)); + ret.put("compression", repr(params.compression)); + if (params.speculativeRetry != null) ret.put("speculativeRetry", params.speculativeRetry.kind().name()); + return ret; + } + + private HashMap<String, Serializable> 
repr(CachingParams caching) + { + HashMap<String, Serializable> ret = new HashMap<>(); + if (caching == null) return ret; + ret.putAll(caching.asMap()); + return ret; + } + + private HashMap<String, Serializable> repr(CompactionParams comp) + { + HashMap<String, Serializable> ret = new HashMap<>(); + if (comp == null) return ret; + ret.putAll(comp.asMap()); + return ret; + } + + private HashMap<String, Serializable> repr(CompressionParams compr) + { + HashMap<String, Serializable> ret = new HashMap<>(); + if (compr == null) return ret; + ret.putAll(compr.asMap()); + return ret; + } + + private HashMap<String, Serializable> repr(IndexMetadata index) + { + HashMap<String, Serializable> ret = new HashMap<>(); + if (index == null) return ret; + ret.put("name", index.name); + ret.put("kind", index.kind.name()); + ret.put("id", index.id); + ret.put("options", new HashMap<>(index.options)); + ret.put("isCustom", index.isCustom()); + ret.put("isKeys", index.isKeys()); + ret.put("isComposites", index.isComposites()); + return ret; + } + + private List<Map<String, Serializable>> repr(Triggers triggers) + { + List<Map<String, Serializable>> ret = new ArrayList<>(); + if (triggers == null) return ret; + Iterator<TriggerMetadata> iter = triggers.iterator(); + while (iter.hasNext()) ret.add(repr(iter.next())); + return ret; + } + + private HashMap<String, Serializable> repr(TriggerMetadata trigger) + { + HashMap<String, Serializable> ret = new HashMap<>(); + if (trigger == null) return ret; + ret.put("name", trigger.name); + ret.put("classOption", trigger.classOption); + return ret; + } + + private HashMap<String, Serializable> repr(ColumnMetadata col) + { + HashMap<String, Serializable> ret = new HashMap<>(); + if (col == null) return ret; + ret.put("name", col.name.toString()); + ret.put("kind", col.kind.name()); + ret.put("type", col.type.toString()); + ret.put("ksName", col.ksName); + ret.put("cfName", col.cfName); + ret.put("position", col.position()); + 
ret.put("clusteringOrder", col.clusteringOrder().name()); + ret.put("isComplex", col.isComplex()); + ret.put("isStatic", col.isStatic()); + ret.put("isPrimaryKeyColumn", col.isPrimaryKeyColumn()); + ret.put("isSimple", col.isSimple()); + ret.put("isPartitionKey", col.isPartitionKey()); + ret.put("isClusteringColumn", col.isClusteringColumn()); + ret.put("isCounterColumn", col.isCounterColumn()); + ret.put("isRegular", col.isRegular()); + return ret; + } + + private HashMap<String, Serializable> repr(DroppedColumn column) + { + HashMap<String, Serializable> ret = new HashMap<>(); + if (column == null) return ret; + ret.put("droppedTime", column.droppedTime); + ret.put("column", repr(column.column)); + return ret; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/2846b22a/src/java/org/apache/cassandra/schema/SchemaMigrationDiagnostics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/SchemaMigrationDiagnostics.java b/src/java/org/apache/cassandra/schema/SchemaMigrationDiagnostics.java new file mode 100644 index 0000000..62f1768 --- /dev/null +++ b/src/java/org/apache/cassandra/schema/SchemaMigrationDiagnostics.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.schema; + +import java.util.UUID; + +import org.apache.cassandra.diag.DiagnosticEventService; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.schema.SchemaMigrationEvent.MigrationManagerEventType; + +final class SchemaMigrationDiagnostics +{ + private static final DiagnosticEventService service = DiagnosticEventService.instance(); + + private SchemaMigrationDiagnostics() + { + } + + static void unknownLocalSchemaVersion(InetAddressAndPort endpoint, UUID theirVersion) + { + if (isEnabled(MigrationManagerEventType.UNKNOWN_LOCAL_SCHEMA_VERSION)) + service.publish(new SchemaMigrationEvent(MigrationManagerEventType.UNKNOWN_LOCAL_SCHEMA_VERSION, endpoint, + theirVersion)); + } + + static void versionMatch(InetAddressAndPort endpoint, UUID theirVersion) + { + if (isEnabled(MigrationManagerEventType.VERSION_MATCH)) + service.publish(new SchemaMigrationEvent(MigrationManagerEventType.VERSION_MATCH, endpoint, theirVersion)); + } + + static void skipPull(InetAddressAndPort endpoint, UUID theirVersion) + { + if (isEnabled(MigrationManagerEventType.SKIP_PULL)) + service.publish(new SchemaMigrationEvent(MigrationManagerEventType.SKIP_PULL, endpoint, theirVersion)); + } + + static void resetLocalSchema() + { + if (isEnabled(MigrationManagerEventType.RESET_LOCAL_SCHEMA)) + service.publish(new SchemaMigrationEvent(MigrationManagerEventType.RESET_LOCAL_SCHEMA, null, null)); + } + + static void taskCreated(InetAddressAndPort endpoint) + { + if (isEnabled(MigrationManagerEventType.TASK_CREATED)) + service.publish(new SchemaMigrationEvent(MigrationManagerEventType.TASK_CREATED, endpoint, null)); + } + + static void taskSendAborted(InetAddressAndPort endpoint) + { + if (isEnabled(MigrationManagerEventType.TASK_SEND_ABORTED)) + service.publish(new 
SchemaMigrationEvent(MigrationManagerEventType.TASK_SEND_ABORTED, endpoint, null)); + } + + static void taskRequestSend(InetAddressAndPort endpoint) + { + if (isEnabled(MigrationManagerEventType.TASK_REQUEST_SEND)) + service.publish(new SchemaMigrationEvent(MigrationManagerEventType.TASK_REQUEST_SEND, + endpoint, null)); + } + + private static boolean isEnabled(MigrationManagerEventType type) + { + return service.isEnabled(SchemaMigrationEvent.class, type); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/2846b22a/src/java/org/apache/cassandra/schema/SchemaMigrationEvent.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/SchemaMigrationEvent.java b/src/java/org/apache/cassandra/schema/SchemaMigrationEvent.java new file mode 100644 index 0000000..2c17235 --- /dev/null +++ b/src/java/org/apache/cassandra/schema/SchemaMigrationEvent.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.schema; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import java.util.Queue; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; + +import javax.annotation.Nullable; + +import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.diag.DiagnosticEvent; +import org.apache.cassandra.gms.FailureDetector; +import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.net.MessagingService; + +/** + * Internal events emitted by {@link MigrationManager}. + */ +final class SchemaMigrationEvent extends DiagnosticEvent +{ + private final MigrationManagerEventType type; + @Nullable + private final InetAddressAndPort endpoint; + @Nullable + private final UUID endpointSchemaVersion; + private final UUID localSchemaVersion; + private final Integer localMessagingVersion; + private final SystemKeyspace.BootstrapState bootstrapState; + @Nullable + private Integer inflightTaskCount; + @Nullable + private Integer endpointMessagingVersion; + @Nullable + private Boolean endpointGossipOnlyMember; + @Nullable + private Boolean isAlive; + + enum MigrationManagerEventType + { + UNKNOWN_LOCAL_SCHEMA_VERSION, + VERSION_MATCH, + SKIP_PULL, + RESET_LOCAL_SCHEMA, + TASK_CREATED, + TASK_SEND_ABORTED, + TASK_REQUEST_SEND + } + + SchemaMigrationEvent(MigrationManagerEventType type, + @Nullable InetAddressAndPort endpoint, @Nullable UUID endpointSchemaVersion) + { + this.type = type; + this.endpoint = endpoint; + this.endpointSchemaVersion = endpointSchemaVersion; + + localSchemaVersion = Schema.instance.getVersion(); + localMessagingVersion = MessagingService.current_version; + + Queue<CountDownLatch> inflightTasks = MigrationTask.getInflightTasks(); + if (inflightTasks != null) + inflightTaskCount = inflightTasks.size(); + + this.bootstrapState = SystemKeyspace.getBootstrapState(); + + if (endpoint == null) return; + + if 
(MessagingService.instance().knowsVersion(endpoint)) + endpointMessagingVersion = MessagingService.instance().getRawVersion(endpoint); + + endpointGossipOnlyMember = Gossiper.instance.isGossipOnlyMember(endpoint); + this.isAlive = FailureDetector.instance.isAlive(endpoint); + } + + public Enum<?> getType() + { + return type; + } + + public Map<String, Serializable> toMap() + { + HashMap<String, Serializable> ret = new HashMap<>(); + if (endpoint != null) ret.put("endpoint", endpoint.getHostAddress(true)); + ret.put("endpointSchemaVersion", Schema.schemaVersionToString(endpointSchemaVersion)); + ret.put("localSchemaVersion", Schema.schemaVersionToString(localSchemaVersion)); + if (endpointMessagingVersion != null) ret.put("endpointMessagingVersion", endpointMessagingVersion); + if (localMessagingVersion != null) ret.put("localMessagingVersion", localMessagingVersion); + if (endpointGossipOnlyMember != null) ret.put("endpointGossipOnlyMember", endpointGossipOnlyMember); + if (isAlive != null) ret.put("endpointIsAlive", isAlive); + if (bootstrapState != null) ret.put("bootstrapState", bootstrapState.name()); + if (inflightTaskCount != null) ret.put("inflightTaskCount", inflightTaskCount); + return ret; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/2846b22a/src/java/org/apache/cassandra/schema/SchemaPushVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/SchemaPushVerbHandler.java b/src/java/org/apache/cassandra/schema/SchemaPushVerbHandler.java index f939cda..358739a 100644 --- a/src/java/org/apache/cassandra/schema/SchemaPushVerbHandler.java +++ b/src/java/org/apache/cassandra/schema/SchemaPushVerbHandler.java @@ -42,6 +42,7 @@ public final class SchemaPushVerbHandler implements IVerbHandler<Collection<Muta { logger.trace("Received schema push request from {}", message.from); + SchemaAnnouncementDiagnostics.schemataMutationsReceived(message.from); 
StageManager.getStage(Stage.MIGRATION).submit(() -> Schema.instance.mergeAndAnnounceVersion(message.payload)); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/2846b22a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java index 297774a..e1c0f55 100644 --- a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java +++ b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java @@ -45,28 +45,35 @@ public class PendingRangeCalculatorService public PendingRangeCalculatorService() { - executor.setRejectedExecutionHandler(new RejectedExecutionHandler() - { - public void rejectedExecution(Runnable r, ThreadPoolExecutor e) + executor.setRejectedExecutionHandler((r, e) -> { + PendingRangeCalculatorServiceDiagnostics.taskRejected(instance, updateJobs); PendingRangeCalculatorService.instance.finishUpdate(); } - } ); } private static class PendingRangeTask implements Runnable { + private final AtomicInteger updateJobs; + + PendingRangeTask(AtomicInteger updateJobs) + { + this.updateJobs = updateJobs; + } + public void run() { try { + PendingRangeCalculatorServiceDiagnostics.taskStarted(instance, updateJobs); long start = System.currentTimeMillis(); List<String> keyspaces = Schema.instance.getNonLocalStrategyKeyspaces(); for (String keyspaceName : keyspaces) calculatePendingRanges(Keyspace.open(keyspaceName).getReplicationStrategy(), keyspaceName); if (logger.isTraceEnabled()) logger.trace("Finished PendingRangeTask for {} keyspaces in {}ms", keyspaces.size(), System.currentTimeMillis() - start); + PendingRangeCalculatorServiceDiagnostics.taskFinished(instance, updateJobs); } finally { @@ -77,13 +84,15 @@ public class PendingRangeCalculatorService private void 
finishUpdate() { - updateJobs.decrementAndGet(); + int jobs = updateJobs.decrementAndGet(); + PendingRangeCalculatorServiceDiagnostics.taskCountChanged(instance, jobs); } public void update() { - updateJobs.incrementAndGet(); - executor.submit(new PendingRangeTask()); + int jobs = updateJobs.incrementAndGet(); + PendingRangeCalculatorServiceDiagnostics.taskCountChanged(instance, jobs); + executor.submit(new PendingRangeTask(updateJobs)); } public void blockUntilFinished() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org