Author: baedke
Date: Thu Sep 18 16:25:34 2014
New Revision: 1626021

URL: http://svn.apache.org/r1626021
Log:
OAK-1915: TarMK failover 2.0

Added runmode syncmaster for oak-run.

Modified:
    jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java
    jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java
    jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServer.java
    jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BulkTest.java

Modified: jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java?rev=1626021&r1=1626020&r2=1626021&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java (original)
+++ jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java Thu Sep 18 16:25:34 2014
@@ -79,6 +79,7 @@ import org.apache.jackrabbit.oak.jcr.Jcr
 import org.apache.jackrabbit.oak.kernel.JsopDiff;
 import org.apache.jackrabbit.oak.plugins.backup.FileStoreBackup;
 import org.apache.jackrabbit.oak.plugins.backup.FileStoreRestore;
+import org.apache.jackrabbit.oak.plugins.document.Collection;
 import org.apache.jackrabbit.oak.plugins.document.DocumentMK;
 import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
 import org.apache.jackrabbit.oak.plugins.document.LastRevRecoveryAgent;
@@ -95,6 +96,7 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore;
 import 
org.apache.jackrabbit.oak.plugins.segment.failover.client.FailoverClient;
+import 
org.apache.jackrabbit.oak.plugins.segment.failover.server.FailoverServer;
 import org.apache.jackrabbit.oak.plugins.segment.file.FileStore;
 import org.apache.jackrabbit.oak.scalability.ScalabilityRunner;
 import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry;
@@ -170,7 +172,10 @@ public class Main {
                 Explorer.main(args);
                 break;
             case SYNCSLAVE:
-                syncslave(args);
+                syncSlave(args);
+                break;
+            case SYNCMASTER:
+                syncMaster(args);
                 break;
             case CHECKPOINTS:
                 checkpoints(args);
@@ -269,7 +274,8 @@ public class Main {
         }
     }
 
-    private static void syncslave(String[] args) throws Exception {
+
+    private static void syncSlave(String[] args) throws Exception {
 
         final String defaultHost = "127.0.0.1";
         final int defaultPort = 8023;
@@ -297,7 +303,6 @@ public class Main {
 
         FileStore store = null;
         FailoverClient failoverClient = null;
-        ScheduledSyncService syncService = null;
         try {
             store = new FileStore(new File(nonOptions.get(0)), 256);
             failoverClient = new FailoverClient(
@@ -308,7 +313,7 @@ public class Main {
             if (!options.has(interval)) {
                 failoverClient.run();
             } else {
-                syncService = new ScheduledSyncService(failoverClient, 
options.valueOf(interval));
+                ScheduledSyncService syncService = new 
ScheduledSyncService(failoverClient, options.valueOf(interval));
                 syncService.startAsync();
                 syncService.awaitTerminated();
             }
@@ -322,6 +327,53 @@ public class Main {
         }
     }
 
+    private static void syncMaster(String[] args) throws Exception {
+
+        final int defaultPort = 8023;
+
+        final OptionParser parser = new OptionParser();
+        final OptionSpec<Integer> port = parser.accepts("port", "port to 
listen").withRequiredArg().ofType(Integer.class).defaultsTo(defaultPort);
+        final OptionSpec<Boolean> secure = parser.accepts("secure", "use 
secure connections").withRequiredArg().ofType(Boolean.class);
+        final OptionSpec<String> admissible = parser.accepts("admissible", 
"list of admissible slave host names or ip 
ranges").withRequiredArg().ofType(String.class);
+        final OptionSpec<?> help = parser.acceptsAll(asList("h", "?", "help"), 
"show help").forHelp();
+        final OptionSpec<String> nonOption = parser.nonOptions(Mode.SYNCMASTER 
+ " <path to repository>");
+
+        final OptionSet options = parser.parse(args);
+        final List<String> nonOptions = nonOption.values(options);
+
+        if (options.has(help)) {
+            parser.printHelpOn(System.out);
+            System.exit(0);
+        }
+
+        if (nonOptions.isEmpty()) {
+            parser.printHelpOn(System.err);
+            System.exit(1);
+        }
+
+
+        List<String> admissibleSlaves = options.has(admissible) ? 
options.valuesOf(admissible) : Collections.EMPTY_LIST;
+
+        FileStore store = null;
+        FailoverServer failoverServer = null;
+        try {
+            store = new FileStore(new File(nonOptions.get(0)), 256);
+            failoverServer = new FailoverServer(
+                    options.has(port)? options.valueOf(port) : defaultPort,
+                    store,
+                    admissibleSlaves.toArray(new String[0]),
+                    options.has(secure) && options.valueOf(secure));
+            failoverServer.startAndWait();
+        } finally {
+            if (store != null) {
+                store.close();
+            }
+            if (failoverServer != null) {
+                failoverServer.close();
+            }
+        }
+    }
+
     public static NodeStore bootstrapNodeStore(String[] args, Closer closer,
             String h) throws IOException {
         //TODO add support for other NodeStore flags
@@ -1039,7 +1091,8 @@ public class Main {
         UPGRADE("upgrade"),
         SCALABILITY("scalability"),
         EXPLORE("explore"),
-        SYNCSLAVE("syncslave"),
+        SYNCSLAVE("syncSlave"),
+        SYNCMASTER("syncmaster"),
         HELP("help"),
         CHECKPOINTS("checkpoints"),
         RECOVERY("recovery");

Modified: jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java?rev=1626021&r1=1626020&r2=1626021&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java Thu Sep 18 16:25:34 2014
@@ -62,7 +62,6 @@ public final class FailoverClient implem
 
     private final String host;
     private final int port;
-    private final boolean checkChecksums;
     private int readTimeoutMs = 10000;
 
     private final FailoverStore store;
@@ -77,18 +76,13 @@ public final class FailoverClient implem
     private final Object sync = new Object();
 
     public FailoverClient(String host, int port, SegmentStore store) throws 
SSLException {
-        this(host, port, store, false, true);
+        this(host, port, store, false);
     }
 
     public FailoverClient(String host, int port, SegmentStore store, boolean 
secure) throws SSLException {
-        this(host, port, store, secure, true);
-    }
-
-    public FailoverClient(String host, int port, SegmentStore store, boolean 
secure, boolean checksums) throws SSLException {
         this.state = STATUS_INITIALIZING;
         this.host = host;
         this.port = port;
-        this.checkChecksums = checksums;
         if (secure) {
             this.sslContext = 
SslContext.newClientContext(InsecureTrustManagerFactory.INSTANCE);
         }
@@ -162,9 +156,7 @@ public final class FailoverClient implem
                     p.addLast("readTimeoutHandler", new ReadTimeoutHandler(
                             readTimeoutMs, TimeUnit.MILLISECONDS));
                     p.addLast(new StringEncoder(CharsetUtil.UTF_8));
-                    if (FailoverClient.this.checkChecksums) {
-                        p.addLast(new SnappyFramedDecoder(true));
-                    }
+                    p.addLast(new SnappyFramedDecoder(true));
                     p.addLast(new RecordIdDecoder(store));
                     p.addLast(executor, handler);
                 }

Modified: jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServer.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServer.java?rev=1626021&r1=1626020&r2=1626021&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServer.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServer.java Thu Sep 18 16:25:34 2014
@@ -47,6 +47,7 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.segment.failover.codec.SegmentEncoder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import sun.reflect.misc.FieldUtil;
 
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
@@ -71,31 +72,21 @@ public class FailoverServer implements F
 
     public FailoverServer(int port, final SegmentStore store)
             throws CertificateException, SSLException {
-        this(port, store, null, false, true);
+        this(port, store, null, false);
     }
 
     public FailoverServer(int port, final SegmentStore store, boolean secure)
             throws CertificateException, SSLException {
-        this(port, store, null, secure, true);
-    }
-
-    public FailoverServer(int port, final SegmentStore store, boolean secure, 
boolean useChecksums)
-            throws CertificateException, SSLException {
-        this(port, store, null, secure, useChecksums);
+        this(port, store, null, secure);
     }
 
     public FailoverServer(int port, final SegmentStore store, String[] 
allowedClientIPRanges)
             throws CertificateException, SSLException {
-        this(port, store, allowedClientIPRanges, false, true);
+        this(port, store, allowedClientIPRanges, false);
     }
 
     public FailoverServer(int port, final SegmentStore store, String[] 
allowedClientIPRanges, boolean secure)
             throws CertificateException, SSLException {
-        this(port, store, allowedClientIPRanges, secure, true);
-    }
-
-    public FailoverServer(int port, final SegmentStore store, String[] 
allowedClientIPRanges, boolean secure, final boolean checksums)
-            throws CertificateException, SSLException {
         this.port = port;
 
         if (secure) {
@@ -135,9 +126,7 @@ public class FailoverServer implements F
                 }
                 p.addLast(new LineBasedFrameDecoder(8192));
                 p.addLast(new StringDecoder(CharsetUtil.UTF_8));
-                if (checksums) {
-                    p.addLast(new SnappyFramedEncoder());
-                }
+                p.addLast(new SnappyFramedEncoder());
                 p.addLast(new RecordIdEncoder());
                 p.addLast(new SegmentEncoder());
                 p.addLast(handler);
@@ -169,13 +158,23 @@ public class FailoverServer implements F
         handler.state = STATUS_CLOSED;
     }
 
-    @Override
-    public void start() {
+    private void start(boolean wait) {
         if (running) return;
 
         running = true;
         this.handler.state = STATUS_STARTING;
 
+        final Thread close = new Thread() {
+            @Override
+            public void run() {
+                try {
+                    running = true;
+                    channelFuture.sync().channel().closeFuture().sync();
+                } catch (InterruptedException e) {
+                    FailoverServer.this.stop();
+                }
+            }
+        };
         Future<?> startup = bossGroup.submit(new Runnable() {
             @Override
             public void run() {
@@ -185,25 +184,28 @@ public class FailoverServer implements F
                 //the channel registration synchronous.
                 //Note that now this method will return immediately.
                 channelFuture = b.bind(port);
-                new Thread() {
-                    @Override
-                    public void run() {
-                        try {
-                            running = true;
-                            
channelFuture.sync().channel().closeFuture().sync();
-                        } catch (InterruptedException e) {
-                            FailoverServer.this.stop();
-                        }
-                    }
-                }.start();
+                close.start();
             }
         });
         if (!startup.awaitUninterruptibly(10000)) {
             log.error("FailoverServer failed to start within 10 seconds and 
will be canceled");
             startup.cancel(true);
+        } else if (wait) {
+            try {
+                close.join();
+            } catch (InterruptedException ignored) {}
         }
     }
 
+    public void startAndWait() {
+        start(true);
+    }
+
+    @Override
+    public void start() {
+        start(false);
+    }
+
     @Override
     public String getMode() {
         return "master";

Modified: jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BulkTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BulkTest.java?rev=1626021&r1=1626020&r2=1626021&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BulkTest.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BulkTest.java Thu Sep 18 16:25:34 2014
@@ -79,19 +79,10 @@ public class BulkTest extends TestBase {
     }
 
     @Test
-    public void test1MillionNodesNoChecksum() throws Exception {
-        test(1000000, 87, 87, 22700000, 22800000, false, false);
-    }
-
-    @Test
     public void test1MillionNodesUsingSSL() throws Exception {
-        test(1000000, 87, 87, 22700000, 22800000, true, true);
+        test(1000000, 87, 87, 22700000, 22800000, true);
     }
 
-    @Test
-    public void test1MillionNodesUsingSSLNoChecksum() throws Exception {
-        test(1000000, 87, 87, 22700000, 22800000, true, false);
-    }
 /*
     @Test
     public void test10MillionNodes() throws Exception {
@@ -102,11 +93,11 @@ public class BulkTest extends TestBase {
     // private helper
 
     private void test(int number, int minExpectedSegments, int 
maxExpectedSegments, long minExpectedBytes, long maxExpectedBytes) throws 
Exception {
-        test(number, minExpectedSegments, maxExpectedSegments, 
minExpectedBytes, maxExpectedBytes, false, true);
+        test(number, minExpectedSegments, maxExpectedSegments, 
minExpectedBytes, maxExpectedBytes, false);
     }
 
     private void test(int number, int minExpectedSegments, int 
maxExpectedSegments, long minExpectedBytes, long maxExpectedBytes,
-                      boolean useSSL, boolean useChecksum) throws Exception {
+                      boolean useSSL) throws Exception {
         NodeStore store = new SegmentNodeStore(storeS);
         NodeBuilder rootbuilder = store.getRoot().builder();
         NodeBuilder b = rootbuilder.child("store");
@@ -119,11 +110,11 @@ public class BulkTest extends TestBase {
         store.merge(rootbuilder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
         storeS.flush();
 
-        final FailoverServer server = new FailoverServer(port, storeS, useSSL, 
useChecksum);
+        final FailoverServer server = new FailoverServer(port, storeS, useSSL);
         server.start();
 
         System.setProperty(FailoverClient.CLIENT_ID_PROPERTY_NAME, "Bar");
-        FailoverClient cl = new FailoverClient("127.0.0.1", port, storeC, 
useSSL, useChecksum);
+        FailoverClient cl = new FailoverClient("127.0.0.1", port, storeC, 
useSSL);
 
         final MBeanServer jmxServer = 
ManagementFactory.getPlatformMBeanServer();
         ObjectName status = new ObjectName(FailoverStatusMBean.JMX_NAME + 
",id=*");


Reply via email to