Author: baedke Date: Thu Sep 18 16:25:34 2014 New Revision: 1626021 URL: http://svn.apache.org/r1626021 Log: OAK-1915: TarMK failover 2.0
Added runmode syncmaster for oak-run. Modified: jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServer.java jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BulkTest.java Modified: jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java?rev=1626021&r1=1626020&r2=1626021&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java (original) +++ jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java Thu Sep 18 16:25:34 2014 @@ -79,6 +79,7 @@ import org.apache.jackrabbit.oak.jcr.Jcr import org.apache.jackrabbit.oak.kernel.JsopDiff; import org.apache.jackrabbit.oak.plugins.backup.FileStoreBackup; import org.apache.jackrabbit.oak.plugins.backup.FileStoreRestore; +import org.apache.jackrabbit.oak.plugins.document.Collection; import org.apache.jackrabbit.oak.plugins.document.DocumentMK; import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore; import org.apache.jackrabbit.oak.plugins.document.LastRevRecoveryAgent; @@ -95,6 +96,7 @@ import org.apache.jackrabbit.oak.plugins import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState; import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore; import org.apache.jackrabbit.oak.plugins.segment.failover.client.FailoverClient; +import org.apache.jackrabbit.oak.plugins.segment.failover.server.FailoverServer; import org.apache.jackrabbit.oak.plugins.segment.file.FileStore; import org.apache.jackrabbit.oak.scalability.ScalabilityRunner; import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry; @@ -170,7 +172,10 @@ public class Main { Explorer.main(args); break; case SYNCSLAVE: - syncslave(args); + syncSlave(args); + break; + case SYNCMASTER: + syncMaster(args); break; case CHECKPOINTS: checkpoints(args); @@ -269,7 +274,8 @@ public class Main { } } - private static void syncslave(String[] args) throws Exception { + + private static void syncSlave(String[] args) throws Exception { final String defaultHost = "127.0.0.1"; final int defaultPort = 8023; @@ -297,7 +303,6 @@ public class Main { FileStore store = null; FailoverClient failoverClient = null; - ScheduledSyncService syncService = null; try { store = new FileStore(new File(nonOptions.get(0)), 256); failoverClient = new FailoverClient( @@ -308,7 +313,7 @@ public class Main { if (!options.has(interval)) { failoverClient.run(); } else { - syncService = new ScheduledSyncService(failoverClient, options.valueOf(interval)); + ScheduledSyncService syncService = new ScheduledSyncService(failoverClient, options.valueOf(interval)); syncService.startAsync(); syncService.awaitTerminated(); } @@ -322,6 +327,53 @@ public class Main { } } + private static void syncMaster(String[] args) throws Exception { + + final int defaultPort = 8023; + + final OptionParser parser = new OptionParser(); + final OptionSpec<Integer> port = parser.accepts("port", "port to listen").withRequiredArg().ofType(Integer.class).defaultsTo(defaultPort); + final OptionSpec<Boolean> secure = parser.accepts("secure", "use secure connections").withRequiredArg().ofType(Boolean.class); + final OptionSpec<String> admissible = parser.accepts("admissible", "list of admissible slave host names or ip ranges").withRequiredArg().ofType(String.class); + final OptionSpec<?> help = parser.acceptsAll(asList("h", "?", "help"), "show help").forHelp(); + final OptionSpec<String> nonOption = parser.nonOptions(Mode.SYNCMASTER + " <path to repository>"); + + final OptionSet options = parser.parse(args); + final List<String> nonOptions = nonOption.values(options); + + if (options.has(help)) { + parser.printHelpOn(System.out); + System.exit(0); + } + + if (nonOptions.isEmpty()) { + parser.printHelpOn(System.err); + System.exit(1); + } + + + List<String> admissibleSlaves = options.has(admissible) ? options.valuesOf(admissible) : Collections.EMPTY_LIST; + + FileStore store = null; + FailoverServer failoverServer = null; + try { + store = new FileStore(new File(nonOptions.get(0)), 256); + failoverServer = new FailoverServer( + options.has(port)? options.valueOf(port) : defaultPort, + store, + admissibleSlaves.toArray(new String[0]), + options.has(secure) && options.valueOf(secure)); + failoverServer.startAndWait(); + } finally { + if (store != null) { + store.close(); + } + if (failoverServer != null) { + failoverServer.close(); + } + } + } + public static NodeStore bootstrapNodeStore(String[] args, Closer closer, String h) throws IOException { //TODO add support for other NodeStore flags @@ -1039,7 +1091,8 @@ public class Main { UPGRADE("upgrade"), SCALABILITY("scalability"), EXPLORE("explore"), - SYNCSLAVE("syncslave"), + SYNCSLAVE("syncSlave"), + SYNCMASTER("syncmaster"), HELP("help"), CHECKPOINTS("checkpoints"), RECOVERY("recovery"); Modified: jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java?rev=1626021&r1=1626020&r2=1626021&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java (original) +++ jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java Thu Sep 18 16:25:34 2014 @@ -62,7 +62,6 @@ public final class FailoverClient implem private final String host; private final int port; - private final boolean checkChecksums; private int readTimeoutMs = 10000; private final FailoverStore store; @@ -77,18 +76,13 @@ public final class FailoverClient implem private final Object sync = new Object(); public FailoverClient(String host, int port, SegmentStore store) throws SSLException { - this(host, port, store, false, true); + this(host, port, store, false); } public FailoverClient(String host, int port, SegmentStore store, boolean secure) throws SSLException { - this(host, port, store, secure, true); - } - - public FailoverClient(String host, int port, SegmentStore store, boolean secure, boolean checksums) throws SSLException { this.state = STATUS_INITIALIZING; this.host = host; this.port = port; - this.checkChecksums = checksums; if (secure) { this.sslContext = SslContext.newClientContext(InsecureTrustManagerFactory.INSTANCE); } @@ -162,9 +156,7 @@ public final class FailoverClient implem p.addLast("readTimeoutHandler", new ReadTimeoutHandler( readTimeoutMs, TimeUnit.MILLISECONDS)); p.addLast(new StringEncoder(CharsetUtil.UTF_8)); - if (FailoverClient.this.checkChecksums) { - p.addLast(new SnappyFramedDecoder(true)); - } + p.addLast(new SnappyFramedDecoder(true)); p.addLast(new RecordIdDecoder(store)); p.addLast(executor, handler); } Modified: jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServer.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServer.java?rev=1626021&r1=1626020&r2=1626021&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServer.java (original) +++ jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServer.java Thu Sep 18 16:25:34 2014 @@ -47,6 +47,7 @@ import org.apache.jackrabbit.oak.plugins import org.apache.jackrabbit.oak.plugins.segment.failover.codec.SegmentEncoder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import sun.reflect.misc.FieldUtil; import javax.management.MBeanServer; import javax.management.ObjectName; @@ -71,31 +72,21 @@ public class FailoverServer implements F public FailoverServer(int port, final SegmentStore store) throws CertificateException, SSLException { - this(port, store, null, false, true); + this(port, store, null, false); } public FailoverServer(int port, final SegmentStore store, boolean secure) throws CertificateException, SSLException { - this(port, store, null, secure, true); - } - - public FailoverServer(int port, final SegmentStore store, boolean secure, boolean useChecksums) - throws CertificateException, SSLException { - this(port, store, null, secure, useChecksums); + this(port, store, null, secure); } public FailoverServer(int port, final SegmentStore store, String[] allowedClientIPRanges) throws CertificateException, SSLException { - this(port, store, allowedClientIPRanges, false, true); + this(port, store, allowedClientIPRanges, false); } public FailoverServer(int port, final SegmentStore store, String[] allowedClientIPRanges, boolean secure) throws CertificateException, SSLException { - this(port, store, allowedClientIPRanges, secure, true); - } - - public FailoverServer(int port, final SegmentStore store, String[] allowedClientIPRanges, boolean secure, final boolean checksums) - throws CertificateException, SSLException { this.port = port; if (secure) { @@ -135,9 +126,7 @@ public class FailoverServer implements F } p.addLast(new LineBasedFrameDecoder(8192)); p.addLast(new StringDecoder(CharsetUtil.UTF_8)); - if (checksums) { - p.addLast(new SnappyFramedEncoder()); - } + p.addLast(new SnappyFramedEncoder()); p.addLast(new RecordIdEncoder()); p.addLast(new SegmentEncoder()); p.addLast(handler); @@ -169,13 +158,23 @@ public class FailoverServer implements F handler.state = STATUS_CLOSED; } - @Override - public void start() { + private void start(boolean wait) { if (running) return; running = true; this.handler.state = STATUS_STARTING; + final Thread close = new Thread() { + @Override + public void run() { + try { + running = true; + channelFuture.sync().channel().closeFuture().sync(); + } catch (InterruptedException e) { + FailoverServer.this.stop(); + } + } + }; Future<?> startup = bossGroup.submit(new Runnable() { @Override public void run() { @@ -185,25 +184,28 @@ public class FailoverServer implements F //the channel registration synchronous. //Note that now this method will return immediately. channelFuture = b.bind(port); - new Thread() { - @Override - public void run() { - try { - running = true; - channelFuture.sync().channel().closeFuture().sync(); - } catch (InterruptedException e) { - FailoverServer.this.stop(); - } - } - }.start(); + close.start(); } }); if (!startup.awaitUninterruptibly(10000)) { log.error("FailoverServer failed to start within 10 seconds and will be canceled"); startup.cancel(true); + } else if (wait) { + try { + close.join(); + } catch (InterruptedException ignored) {} } } + public void startAndWait() { + start(true); + } + + @Override + public void start() { + start(false); + } + @Override public String getMode() { return "master"; Modified: jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BulkTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BulkTest.java?rev=1626021&r1=1626020&r2=1626021&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BulkTest.java (original) +++ jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BulkTest.java Thu Sep 18 16:25:34 2014 @@ -79,19 +79,10 @@ public class BulkTest extends TestBase { } @Test - public void test1MillionNodesNoChecksum() throws Exception { - test(1000000, 87, 87, 22700000, 22800000, false, false); - } - - @Test public void test1MillionNodesUsingSSL() throws Exception { - test(1000000, 87, 87, 22700000, 22800000, true, true); + test(1000000, 87, 87, 22700000, 22800000, true); } - @Test - public void test1MillionNodesUsingSSLNoChecksum() throws Exception { - test(1000000, 87, 87, 22700000, 22800000, true, false); - } /* @Test public void test10MillionNodes() throws Exception { @@ -102,11 +93,11 @@ public class BulkTest extends TestBase { // private helper private void test(int number, int minExpectedSegments, int maxExpectedSegments, long minExpectedBytes, long maxExpectedBytes) throws Exception { - test(number, minExpectedSegments, maxExpectedSegments, minExpectedBytes, maxExpectedBytes, false, true); + test(number, minExpectedSegments, maxExpectedSegments, minExpectedBytes, maxExpectedBytes, false); } private void test(int number, int minExpectedSegments, int maxExpectedSegments, long minExpectedBytes, long maxExpectedBytes, - boolean useSSL, boolean useChecksum) throws Exception { + boolean useSSL) throws Exception { NodeStore store = new SegmentNodeStore(storeS); NodeBuilder rootbuilder = store.getRoot().builder(); NodeBuilder b = rootbuilder.child("store"); @@ -119,11 +110,11 @@ public class BulkTest extends TestBase { store.merge(rootbuilder, EmptyHook.INSTANCE, CommitInfo.EMPTY); storeS.flush(); - final FailoverServer server = new FailoverServer(port, storeS, useSSL, useChecksum); + final FailoverServer server = new FailoverServer(port, storeS, useSSL); server.start(); System.setProperty(FailoverClient.CLIENT_ID_PROPERTY_NAME, "Bar"); - FailoverClient cl = new FailoverClient("127.0.0.1", port, storeC, useSSL, useChecksum); + FailoverClient cl = new FailoverClient("127.0.0.1", port, storeC, useSSL); final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer(); ObjectName status = new ObjectName(FailoverStatusMBean.JMX_NAME + ",id=*");