Make deprecated repair methods backward-compatible with previous notification service
patch by Paulo Motta; reviewed by Yuki Morishita for CASSANDRA-11430 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3557d2e0 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3557d2e0 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3557d2e0 Branch: refs/heads/trunk Commit: 3557d2e05c8d1059562de2a91c1b33b4fcfcc6eb Parents: e22faeb Author: Paulo Motta <pauloricard...@gmail.com> Authored: Tue Apr 5 16:58:06 2016 -0300 Committer: Yuki Morishita <yu...@apache.org> Committed: Mon Apr 11 11:28:44 2016 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/repair/RepairRunnable.java | 10 ++ .../cassandra/service/ActiveRepairService.java | 11 ++ .../cassandra/service/StorageService.java | 28 ++++- .../progress/jmx/LegacyJMXProgressSupport.java | 108 +++++++++++++++++ .../jmx/LegacyJMXProgressSupportTest.java | 118 +++++++++++++++++++ 6 files changed, 270 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/3557d2e0/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index b6438b8..e935e57 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.2.6 + * Make deprecated repair methods backward-compatible with previous notification service (CASSANDRA-11430) * IncomingStreamingConnection version check message wrong (CASSANDRA-11462) * DatabaseDescriptor should log stacktrace in case of Eception during seed provider creation (CASSANDRA-11312) * Use canonical path for directory in SSTable descriptor (CASSANDRA-10587) http://git-wip-us.apache.org/repos/asf/cassandra/blob/3557d2e0/src/java/org/apache/cassandra/repair/RepairRunnable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java index 91ac82a..d2b6ab6 100644 --- a/src/java/org/apache/cassandra/repair/RepairRunnable.java +++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java @@ -227,6 +227,11 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti { public void onSuccess(RepairSessionResult result) { + /** + * If the success message below is modified, it must also be updated on + * {@link org.apache.cassandra.utils.progress.jmx.LegacyJMXProgressSupport} + * for backward-compatibility support. + */ String message = String.format("Repair session %s for range %s finished", session.getId(), session.getRange().toString()); logger.info(message); @@ -238,6 +243,11 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti public void onFailure(Throwable t) { + /** + * If the failure message below is modified, it must also be updated on + * {@link org.apache.cassandra.utils.progress.jmx.LegacyJMXProgressSupport} + * for backward-compatibility support. + */ String message = String.format("Repair session %s for range %s failed with error %s", session.getId(), session.getRange().toString(), t.getMessage()); logger.error(message, t); http://git-wip-us.apache.org/repos/asf/cassandra/blob/3557d2e0/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index 39be051..21cdeae 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -77,6 +77,17 @@ import org.apache.cassandra.utils.concurrent.Refs; */ public class ActiveRepairService { + /** + * @deprecated this statuses are from the previous JMX notification service, + * which will be deprecated on 4.0. For statuses of the new notification + * service, see {@link org.apache.cassandra.streaming.StreamEvent.ProgressEvent} + */ + @Deprecated + public static enum Status + { + STARTED, SESSION_SUCCESS, SESSION_FAILED, FINISHED + } + public static CassandraVersion SUPPORTS_GLOBAL_PREPARE_FLAG_VERSION = new CassandraVersion("2.2.1"); private static final Logger logger = LoggerFactory.getLogger(ActiveRepairService.class); http://git-wip-us.apache.org/repos/asf/cassandra/blob/3557d2e0/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index bca5996..dedc823 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -171,6 +171,7 @@ import org.apache.cassandra.utils.WrappedRunnable; import org.apache.cassandra.utils.progress.ProgressEvent; import org.apache.cassandra.utils.progress.ProgressEventType; import org.apache.cassandra.utils.progress.jmx.JMXProgressSupport; +import org.apache.cassandra.utils.progress.jmx.LegacyJMXProgressSupport; /** * This abstraction contains the token/identifier of this node @@ -186,6 +187,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE private final JMXProgressSupport progressSupport = new JMXProgressSupport(this); + /** + * @deprecated backward support to previous notification interface + * Will be removed on 4.0 + */ + @Deprecated + private final LegacyJMXProgressSupport legacyProgressSupport; + private static int getRingDelay() { String newdelay = System.getProperty("cassandra.ring_delay_ms"); @@ -309,6 +317,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE throw new RuntimeException(e); } + legacyProgressSupport = new LegacyJMXProgressSupport(this, jmxObjectName); + /* register the verb handlers */ MessagingService.instance().registerVerbHandlers(MessagingService.Verb.MUTATION, new MutationVerbHandler()); MessagingService.instance().registerVerbHandlers(MessagingService.Verb.READ_REPAIR, new ReadRepairVerbHandler()); @@ -2896,7 +2906,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE option.getRanges().addAll(getLocalRanges(keyspace)); } } - return forceRepairAsync(keyspace, option); + return forceRepairAsync(keyspace, option, false); } @Deprecated @@ -2962,9 +2972,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE options.getColumnFamilies().add(columnFamily); } } - return forceRepairAsync(keyspace, options); + return forceRepairAsync(keyspace, options, true); } + @Deprecated public int forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, @@ -2980,6 +2991,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return forceRepairAsync(keyspace, isSequential, dataCenters, null, primaryRange, fullRepair, columnFamilies); } + @Deprecated public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, @@ -2994,6 +3006,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE dataCenters, hosts, fullRepair, columnFamilies); } + @Deprecated public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, @@ -3036,9 +3049,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE logger.info("starting user-requested repair of range {} for keyspace {} and column families {}", repairingRange, keyspaceName, columnFamilies); - return forceRepairAsync(keyspaceName, options); + return forceRepairAsync(keyspaceName, options, true); } + @Deprecated public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, @@ -3093,17 +3107,17 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return repairingRange; } - public int forceRepairAsync(String keyspace, RepairOption options) + public int forceRepairAsync(String keyspace, RepairOption options, boolean legacy) { if (options.getRanges().isEmpty() || Keyspace.open(keyspace).getReplicationStrategy().getReplicationFactor() < 2) return 0; int cmd = nextRepairCommand.incrementAndGet(); - new Thread(createRepairTask(cmd, keyspace, options)).start(); + new Thread(createRepairTask(cmd, keyspace, options, legacy)).start(); return cmd; } - private FutureTask<Object> createRepairTask(final int cmd, final String keyspace, final RepairOption options) + private FutureTask<Object> createRepairTask(final int cmd, final String keyspace, final RepairOption options, boolean legacy) { if (!options.getDataCenters().isEmpty() && !options.getDataCenters().contains(DatabaseDescriptor.getLocalDataCenter())) { @@ -3112,6 +3126,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE RepairRunnable task = new RepairRunnable(this, cmd, options, keyspace); task.addProgressListener(progressSupport); + if (legacy) + task.addProgressListener(legacyProgressSupport); return new FutureTask<>(task, null); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/3557d2e0/src/java/org/apache/cassandra/utils/progress/jmx/LegacyJMXProgressSupport.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/progress/jmx/LegacyJMXProgressSupport.java b/src/java/org/apache/cassandra/utils/progress/jmx/LegacyJMXProgressSupport.java new file mode 100644 index 0000000..fae6f2a --- /dev/null +++ b/src/java/org/apache/cassandra/utils/progress/jmx/LegacyJMXProgressSupport.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.utils.progress.jmx; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.regex.Pattern; +import javax.management.Notification; +import javax.management.NotificationBroadcasterSupport; +import javax.management.ObjectName; + +import com.google.common.base.Optional; + +import org.apache.cassandra.utils.progress.ProgressEvent; +import org.apache.cassandra.utils.progress.ProgressListener; + +import static org.apache.cassandra.service.ActiveRepairService.Status; + +/** + * ProgressListener that translates ProgressEvent to legacy JMX Notification message (backward compatibility support) + */ +public class LegacyJMXProgressSupport implements ProgressListener +{ + protected static final Pattern SESSION_FAILED_MATCHER = Pattern.compile("Repair session .* for range .* failed with error .*"); + protected static final Pattern SESSION_SUCCESS_MATCHER = Pattern.compile("Repair session .* for range .* finished"); + + private final AtomicLong notificationSerialNumber = new AtomicLong(); + private final ObjectName jmxObjectName; + + private final NotificationBroadcasterSupport broadcaster; + + public LegacyJMXProgressSupport(NotificationBroadcasterSupport broadcaster, + ObjectName jmxObjectName) + { + this.broadcaster = broadcaster; + this.jmxObjectName = jmxObjectName; + } + + @Override + public void progress(String tag, ProgressEvent event) + { + if (tag.startsWith("repair:")) + { + Optional<int[]> legacyUserData = getLegacyUserdata(tag, event); + if (legacyUserData.isPresent()) + { + Notification jmxNotification = new Notification("repair", jmxObjectName, notificationSerialNumber.incrementAndGet(), event.getMessage()); + jmxNotification.setUserData(legacyUserData.get()); + broadcaster.sendNotification(jmxNotification); + } + } + } + + protected static Optional<int[]> getLegacyUserdata(String tag, ProgressEvent event) + { + Optional<Status> status = getStatus(event); + if (status.isPresent()) + { + int[] result = new int[2]; + result[0] = getCmd(tag); + result[1] = status.get().ordinal(); + return Optional.of(result); + } + return Optional.absent(); + } + + protected static Optional<Status> getStatus(ProgressEvent event) + { + switch (event.getType()) + { + case START: + return Optional.of(Status.STARTED); + case COMPLETE: + return Optional.of(Status.FINISHED); + case PROGRESS: + if (SESSION_FAILED_MATCHER.matcher(event.getMessage()).matches()) + { + return Optional.of(Status.SESSION_FAILED); + } + else if (SESSION_SUCCESS_MATCHER.matcher(event.getMessage()).matches()) + { + return Optional.of(Status.SESSION_SUCCESS); + } + } + + return Optional.absent(); + } + + protected static int getCmd(String tag) + { + return Integer.valueOf(tag.split(":")[1]); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/3557d2e0/test/unit/org/apache/cassandra/utils/progress/jmx/LegacyJMXProgressSupportTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/utils/progress/jmx/LegacyJMXProgressSupportTest.java b/test/unit/org/apache/cassandra/utils/progress/jmx/LegacyJMXProgressSupportTest.java new file mode 100644 index 0000000..efa4a27 --- /dev/null +++ b/test/unit/org/apache/cassandra/utils/progress/jmx/LegacyJMXProgressSupportTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.utils.progress.jmx; + +import java.util.UUID; + +import com.google.common.base.Optional; +import org.junit.Test; + +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.utils.progress.ProgressEvent; +import org.apache.cassandra.utils.progress.ProgressEventType; + +import static org.junit.Assert.*; + + +public class LegacyJMXProgressSupportTest +{ + + @Test + public void testSessionSuccess() + { + int cmd = 321; + String message = String.format("Repair session %s for range %s finished", UUID.randomUUID(), + new Range<Token>(new Murmur3Partitioner.LongToken(3), new Murmur3Partitioner.LongToken(4))); + Optional<int[]> result = LegacyJMXProgressSupport.getLegacyUserdata(String.format("repair:%d", cmd), + new ProgressEvent(ProgressEventType.PROGRESS, 2, 10, message)); + assertTrue(result.isPresent()); + assertArrayEquals(new int[]{ cmd, ActiveRepairService.Status.SESSION_SUCCESS.ordinal() }, result.get()); + } + + @Test + public void testSessionFailed() + { + int cmd = 321; + String message = String.format("Repair session %s for range %s failed with error %s", UUID.randomUUID(), + new Range<Token>(new Murmur3Partitioner.LongToken(3), new Murmur3Partitioner.LongToken(4)).toString(), + new RuntimeException("error")); + Optional<int[]> result = LegacyJMXProgressSupport.getLegacyUserdata(String.format("repair:%d", cmd), + new ProgressEvent(ProgressEventType.PROGRESS, 2, 10, message)); + assertTrue(result.isPresent()); + assertArrayEquals(new int[]{ cmd, ActiveRepairService.Status.SESSION_FAILED.ordinal() }, result.get()); + } + + @Test + public void testStarted() + { + int cmd = 321; + Optional<int[]> result = LegacyJMXProgressSupport.getLegacyUserdata(String.format("repair:%d", cmd), + new ProgressEvent(ProgressEventType.START, + 0, 100, "bla")); + assertTrue(result.isPresent()); + assertArrayEquals(new int[]{ cmd, ActiveRepairService.Status.STARTED.ordinal() }, result.get()); + } + + @Test + public void testFinished() + { + int cmd = 321; + Optional<int[]> result = LegacyJMXProgressSupport.getLegacyUserdata(String.format("repair:%d", cmd), + new ProgressEvent(ProgressEventType.COMPLETE, + 2, 10, "bla")); + assertTrue(result.isPresent()); + assertArrayEquals(new int[]{ cmd, ActiveRepairService.Status.FINISHED.ordinal() }, result.get()); + } + + /* + States not mapped to the legacy notification + */ + @Test + public void testNone() + { + int cmd = 33; + Optional<int[]> result = LegacyJMXProgressSupport.getLegacyUserdata(String.format("repair:%d", cmd), + new ProgressEvent(ProgressEventType.ERROR, 2, 10, "bla")); + assertFalse(result.isPresent()); + + cmd = 33; + result = LegacyJMXProgressSupport.getLegacyUserdata(String.format("repair:%d", cmd), + new ProgressEvent(ProgressEventType.SUCCESS, 2, 10, "bla")); + assertFalse(result.isPresent()); + + cmd = 43; + result = LegacyJMXProgressSupport.getLegacyUserdata(String.format("repair:%d", cmd), + new ProgressEvent(ProgressEventType.PROGRESS, 2, 10, "bla")); + assertFalse(result.isPresent()); + + cmd = 1; + result = LegacyJMXProgressSupport.getLegacyUserdata(String.format("repair:%d", cmd), + new ProgressEvent(ProgressEventType.ABORT, 2, 10, "bla")); + assertFalse(result.isPresent()); + + cmd = 9; + result = LegacyJMXProgressSupport.getLegacyUserdata(String.format("repair:%d", cmd), + new ProgressEvent(ProgressEventType.NOTIFICATION, 2, 10, "bla")); + assertFalse(result.isPresent()); + } + +}