Make deprecated repair methods backward-compatible with previous notification 
service

patch by Paulo Motta; reviewed by Yuki Morishita for CASSANDRA-11430


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3557d2e0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3557d2e0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3557d2e0

Branch: refs/heads/cassandra-3.0
Commit: 3557d2e05c8d1059562de2a91c1b33b4fcfcc6eb
Parents: e22faeb
Author: Paulo Motta <pauloricard...@gmail.com>
Authored: Tue Apr 5 16:58:06 2016 -0300
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Apr 11 11:28:44 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/repair/RepairRunnable.java |  10 ++
 .../cassandra/service/ActiveRepairService.java  |  11 ++
 .../cassandra/service/StorageService.java       |  28 ++++-
 .../progress/jmx/LegacyJMXProgressSupport.java  | 108 +++++++++++++++++
 .../jmx/LegacyJMXProgressSupportTest.java       | 118 +++++++++++++++++++
 6 files changed, 270 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3557d2e0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b6438b8..e935e57 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.6
+ * Make deprecated repair methods backward-compatible with previous 
notification service (CASSANDRA-11430)
  * IncomingStreamingConnection version check message wrong (CASSANDRA-11462)
  * DatabaseDescriptor should log stacktrace in case of Eception during seed 
provider creation (CASSANDRA-11312)
  * Use canonical path for directory in SSTable descriptor (CASSANDRA-10587)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3557d2e0/src/java/org/apache/cassandra/repair/RepairRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java 
b/src/java/org/apache/cassandra/repair/RepairRunnable.java
index 91ac82a..d2b6ab6 100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@ -227,6 +227,11 @@ public class RepairRunnable extends WrappedRunnable 
implements ProgressEventNoti
             {
                 public void onSuccess(RepairSessionResult result)
                 {
+                    /**
+                     * If the success message below is modified, it must also 
be updated on
+                     * {@link 
org.apache.cassandra.utils.progress.jmx.LegacyJMXProgressSupport}
+                     * for backward-compatibility support.
+                     */
                     String message = String.format("Repair session %s for 
range %s finished", session.getId(),
                                                    
session.getRange().toString());
                     logger.info(message);
@@ -238,6 +243,11 @@ public class RepairRunnable extends WrappedRunnable 
implements ProgressEventNoti
 
                 public void onFailure(Throwable t)
                 {
+                    /**
+                     * If the failure message below is modified, it must also 
be updated on
+                     * {@link 
org.apache.cassandra.utils.progress.jmx.LegacyJMXProgressSupport}
+                     * for backward-compatibility support.
+                     */
                     String message = String.format("Repair session %s for 
range %s failed with error %s",
                                                    session.getId(), 
session.getRange().toString(), t.getMessage());
                     logger.error(message, t);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3557d2e0/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java 
b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 39be051..21cdeae 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -77,6 +77,17 @@ import org.apache.cassandra.utils.concurrent.Refs;
  */
 public class ActiveRepairService
 {
+    /**
+     * @deprecated this statuses are from the previous JMX notification 
service,
+     * which will be deprecated on 4.0. For statuses of the new notification
+     * service, see {@link 
org.apache.cassandra.streaming.StreamEvent.ProgressEvent}
+     */
+    @Deprecated
+    public static enum Status
+    {
+        STARTED, SESSION_SUCCESS, SESSION_FAILED, FINISHED
+    }
+
     public static CassandraVersion SUPPORTS_GLOBAL_PREPARE_FLAG_VERSION = new 
CassandraVersion("2.2.1");
 
     private static final Logger logger = 
LoggerFactory.getLogger(ActiveRepairService.class);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3557d2e0/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index bca5996..dedc823 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -171,6 +171,7 @@ import org.apache.cassandra.utils.WrappedRunnable;
 import org.apache.cassandra.utils.progress.ProgressEvent;
 import org.apache.cassandra.utils.progress.ProgressEventType;
 import org.apache.cassandra.utils.progress.jmx.JMXProgressSupport;
+import org.apache.cassandra.utils.progress.jmx.LegacyJMXProgressSupport;
 
 /**
  * This abstraction contains the token/identifier of this node
@@ -186,6 +187,13 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
 
     private final JMXProgressSupport progressSupport = new 
JMXProgressSupport(this);
 
+    /**
+     * @deprecated backward support to previous notification interface
+     * Will be removed on 4.0
+     */
+    @Deprecated
+    private final LegacyJMXProgressSupport legacyProgressSupport;
+
     private static int getRingDelay()
     {
         String newdelay = System.getProperty("cassandra.ring_delay_ms");
@@ -309,6 +317,8 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
             throw new RuntimeException(e);
         }
 
+        legacyProgressSupport = new LegacyJMXProgressSupport(this, 
jmxObjectName);
+
         /* register the verb handlers */
         
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.MUTATION,
 new MutationVerbHandler());
         
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.READ_REPAIR,
 new ReadRepairVerbHandler());
@@ -2896,7 +2906,7 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
                 option.getRanges().addAll(getLocalRanges(keyspace));
             }
         }
-        return forceRepairAsync(keyspace, option);
+        return forceRepairAsync(keyspace, option, false);
     }
 
     @Deprecated
@@ -2962,9 +2972,10 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
                 options.getColumnFamilies().add(columnFamily);
             }
         }
-        return forceRepairAsync(keyspace, options);
+        return forceRepairAsync(keyspace, options, true);
     }
 
+    @Deprecated
     public int forceRepairAsync(String keyspace,
                                 boolean isSequential,
                                 boolean isLocal,
@@ -2980,6 +2991,7 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         return forceRepairAsync(keyspace, isSequential, dataCenters, null, 
primaryRange, fullRepair, columnFamilies);
     }
 
+    @Deprecated
     public int forceRepairRangeAsync(String beginToken,
                                      String endToken,
                                      String keyspaceName,
@@ -2994,6 +3006,7 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
                                      dataCenters, hosts, fullRepair, 
columnFamilies);
     }
 
+    @Deprecated
     public int forceRepairRangeAsync(String beginToken,
                                      String endToken,
                                      String keyspaceName,
@@ -3036,9 +3049,10 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
 
         logger.info("starting user-requested repair of range {} for keyspace 
{} and column families {}",
                     repairingRange, keyspaceName, columnFamilies);
-        return forceRepairAsync(keyspaceName, options);
+        return forceRepairAsync(keyspaceName, options, true);
     }
 
+    @Deprecated
     public int forceRepairRangeAsync(String beginToken,
                                      String endToken,
                                      String keyspaceName,
@@ -3093,17 +3107,17 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         return repairingRange;
     }
 
-    public int forceRepairAsync(String keyspace, RepairOption options)
+    public int forceRepairAsync(String keyspace, RepairOption options, boolean 
legacy)
     {
         if (options.getRanges().isEmpty() || 
Keyspace.open(keyspace).getReplicationStrategy().getReplicationFactor() < 2)
             return 0;
 
         int cmd = nextRepairCommand.incrementAndGet();
-        new Thread(createRepairTask(cmd, keyspace, options)).start();
+        new Thread(createRepairTask(cmd, keyspace, options, legacy)).start();
         return cmd;
     }
 
-    private FutureTask<Object> createRepairTask(final int cmd, final String 
keyspace, final RepairOption options)
+    private FutureTask<Object> createRepairTask(final int cmd, final String 
keyspace, final RepairOption options, boolean legacy)
     {
         if (!options.getDataCenters().isEmpty() && 
!options.getDataCenters().contains(DatabaseDescriptor.getLocalDataCenter()))
         {
@@ -3112,6 +3126,8 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
 
         RepairRunnable task = new RepairRunnable(this, cmd, options, keyspace);
         task.addProgressListener(progressSupport);
+        if (legacy)
+            task.addProgressListener(legacyProgressSupport);
         return new FutureTask<>(task, null);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3557d2e0/src/java/org/apache/cassandra/utils/progress/jmx/LegacyJMXProgressSupport.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/utils/progress/jmx/LegacyJMXProgressSupport.java
 
b/src/java/org/apache/cassandra/utils/progress/jmx/LegacyJMXProgressSupport.java
new file mode 100644
index 0000000..fae6f2a
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/utils/progress/jmx/LegacyJMXProgressSupport.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils.progress.jmx;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Pattern;
+import javax.management.Notification;
+import javax.management.NotificationBroadcasterSupport;
+import javax.management.ObjectName;
+
+import com.google.common.base.Optional;
+
+import org.apache.cassandra.utils.progress.ProgressEvent;
+import org.apache.cassandra.utils.progress.ProgressListener;
+
+import static org.apache.cassandra.service.ActiveRepairService.Status;
+
+/**
+ * ProgressListener that translates ProgressEvent to legacy JMX Notification 
message (backward compatibility support)
+ */
+public class LegacyJMXProgressSupport implements ProgressListener
+{
+    protected static final Pattern SESSION_FAILED_MATCHER = 
Pattern.compile("Repair session .* for range .* failed with error .*");
+    protected static final Pattern SESSION_SUCCESS_MATCHER = 
Pattern.compile("Repair session .* for range .* finished");
+
+    private final AtomicLong notificationSerialNumber = new AtomicLong();
+    private final ObjectName jmxObjectName;
+
+    private final NotificationBroadcasterSupport broadcaster;
+
+    public LegacyJMXProgressSupport(NotificationBroadcasterSupport broadcaster,
+                                    ObjectName jmxObjectName)
+    {
+        this.broadcaster = broadcaster;
+        this.jmxObjectName = jmxObjectName;
+    }
+
+    @Override
+    public void progress(String tag, ProgressEvent event)
+    {
+        if (tag.startsWith("repair:"))
+        {
+            Optional<int[]> legacyUserData = getLegacyUserdata(tag, event);
+            if (legacyUserData.isPresent())
+            {
+                Notification jmxNotification = new Notification("repair", 
jmxObjectName, notificationSerialNumber.incrementAndGet(), event.getMessage());
+                jmxNotification.setUserData(legacyUserData.get());
+                broadcaster.sendNotification(jmxNotification);
+            }
+        }
+    }
+
+    protected static Optional<int[]> getLegacyUserdata(String tag, 
ProgressEvent event)
+    {
+        Optional<Status> status = getStatus(event);
+        if (status.isPresent())
+        {
+            int[] result = new int[2];
+            result[0] = getCmd(tag);
+            result[1] = status.get().ordinal();
+            return Optional.of(result);
+        }
+        return Optional.absent();
+    }
+
+    protected static Optional<Status> getStatus(ProgressEvent event)
+    {
+        switch (event.getType())
+        {
+            case START:
+                return Optional.of(Status.STARTED);
+            case COMPLETE:
+                return Optional.of(Status.FINISHED);
+            case PROGRESS:
+                if 
(SESSION_FAILED_MATCHER.matcher(event.getMessage()).matches())
+                {
+                    return Optional.of(Status.SESSION_FAILED);
+                }
+                else if 
(SESSION_SUCCESS_MATCHER.matcher(event.getMessage()).matches())
+                {
+                    return Optional.of(Status.SESSION_SUCCESS);
+                }
+        }
+
+        return Optional.absent();
+    }
+
+    protected static int getCmd(String tag)
+    {
+        return Integer.valueOf(tag.split(":")[1]);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3557d2e0/test/unit/org/apache/cassandra/utils/progress/jmx/LegacyJMXProgressSupportTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/utils/progress/jmx/LegacyJMXProgressSupportTest.java
 
b/test/unit/org/apache/cassandra/utils/progress/jmx/LegacyJMXProgressSupportTest.java
new file mode 100644
index 0000000..efa4a27
--- /dev/null
+++ 
b/test/unit/org/apache/cassandra/utils/progress/jmx/LegacyJMXProgressSupportTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils.progress.jmx;
+
+import java.util.UUID;
+
+import com.google.common.base.Optional;
+import org.junit.Test;
+
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.utils.progress.ProgressEvent;
+import org.apache.cassandra.utils.progress.ProgressEventType;
+
+import static org.junit.Assert.*;
+
+
+public class LegacyJMXProgressSupportTest
+{
+
+    @Test
+    public void testSessionSuccess()
+    {
+        int cmd = 321;
+        String message = String.format("Repair session %s for range %s 
finished", UUID.randomUUID(),
+                                       new Range<Token>(new 
Murmur3Partitioner.LongToken(3), new Murmur3Partitioner.LongToken(4)));
+        Optional<int[]> result = 
LegacyJMXProgressSupport.getLegacyUserdata(String.format("repair:%d", cmd),
+                                                            new 
ProgressEvent(ProgressEventType.PROGRESS, 2, 10, message));
+        assertTrue(result.isPresent());
+        assertArrayEquals(new int[]{ cmd, 
ActiveRepairService.Status.SESSION_SUCCESS.ordinal() }, result.get());
+    }
+
+    @Test
+    public void testSessionFailed()
+    {
+        int cmd = 321;
+        String message = String.format("Repair session %s for range %s failed 
with error %s", UUID.randomUUID(),
+                                       new Range<Token>(new 
Murmur3Partitioner.LongToken(3), new 
Murmur3Partitioner.LongToken(4)).toString(),
+                                       new RuntimeException("error"));
+        Optional<int[]> result = 
LegacyJMXProgressSupport.getLegacyUserdata(String.format("repair:%d", cmd),
+                                                                            
new ProgressEvent(ProgressEventType.PROGRESS, 2, 10, message));
+        assertTrue(result.isPresent());
+        assertArrayEquals(new int[]{ cmd, 
ActiveRepairService.Status.SESSION_FAILED.ordinal() }, result.get());
+    }
+
+    @Test
+    public void testStarted()
+    {
+        int cmd = 321;
+        Optional<int[]> result = 
LegacyJMXProgressSupport.getLegacyUserdata(String.format("repair:%d", cmd),
+                                                                            
new ProgressEvent(ProgressEventType.START,
+                                                                               
               0, 100, "bla"));
+        assertTrue(result.isPresent());
+        assertArrayEquals(new int[]{ cmd, 
ActiveRepairService.Status.STARTED.ordinal() }, result.get());
+    }
+
+    @Test
+    public void testFinished()
+    {
+        int cmd = 321;
+        Optional<int[]> result = 
LegacyJMXProgressSupport.getLegacyUserdata(String.format("repair:%d", cmd),
+                                                                         new 
ProgressEvent(ProgressEventType.COMPLETE,
+                                                                               
            2, 10, "bla"));
+        assertTrue(result.isPresent());
+        assertArrayEquals(new int[]{ cmd, 
ActiveRepairService.Status.FINISHED.ordinal() }, result.get());
+    }
+
+    /*
+    States not mapped to the legacy notification
+     */
+    @Test
+    public void testNone()
+    {
+        int cmd = 33;
+        Optional<int[]> result = 
LegacyJMXProgressSupport.getLegacyUserdata(String.format("repair:%d", cmd),
+                                                                         new 
ProgressEvent(ProgressEventType.ERROR, 2, 10, "bla"));
+        assertFalse(result.isPresent());
+
+        cmd = 33;
+        result = 
LegacyJMXProgressSupport.getLegacyUserdata(String.format("repair:%d", cmd),
+                                                                         new 
ProgressEvent(ProgressEventType.SUCCESS, 2, 10, "bla"));
+        assertFalse(result.isPresent());
+
+        cmd = 43;
+        result = 
LegacyJMXProgressSupport.getLegacyUserdata(String.format("repair:%d", cmd),
+                                                            new 
ProgressEvent(ProgressEventType.PROGRESS, 2, 10, "bla"));
+        assertFalse(result.isPresent());
+
+        cmd = 1;
+        result = 
LegacyJMXProgressSupport.getLegacyUserdata(String.format("repair:%d", cmd),
+                                                            new 
ProgressEvent(ProgressEventType.ABORT, 2, 10, "bla"));
+        assertFalse(result.isPresent());
+
+        cmd = 9;
+        result = 
LegacyJMXProgressSupport.getLegacyUserdata(String.format("repair:%d", cmd),
+                                                            new 
ProgressEvent(ProgressEventType.NOTIFICATION, 2, 10, "bla"));
+        assertFalse(result.isPresent());
+    }
+
+}

Reply via email to