Author: baedke Date: Thu Sep 4 14:00:27 2014 New Revision: 1622479 URL: http://svn.apache.org/r1622479 Log: OAK-1915: TarMK failover 2.0
Added test cases, improved stability and error handling, added ssl support, prepared Sling runmodes slave and master, improved JMX MBeans. Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStoreProvider.java jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/NetworkErrorProxy.java jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BrokenNetworkTest.java jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BulkTest.java jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverSslTest.java Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java jackrabbit/oak/trunk/oak-tarmk-failover/osgi-conf/master/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg jackrabbit/oak/trunk/oak-tarmk-failover/pom.xml jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/CommunicationObserver.java jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClientHandler.java jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentLoaderHandler.java jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentPreLoaderHandler.java jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/RecordIdDecoder.java jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/jmx/ObservablePartnerMBean.java jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServer.java jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServerHandler.java jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStore.java jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStoreService.java jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverIPRangeTest.java jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverMultipleClientsTest.java jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/MBeanTest.java jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/RecoverTest.java jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/TestBase.java Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java?rev=1622479&r1=1622478&r2=1622479&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java (original) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java Thu Sep 4 14:00:27 2014 @@ -63,7 +63,7 @@ import org.slf4j.LoggerFactory; */ @Component(policy = ConfigurationPolicy.REQUIRE) public class SegmentNodeStoreService extends ProxyNodeStore - implements Observable { + implements Observable, SegmentStoreProvider { @Property(description="The unique name of this instance") public static final String NAME = "name"; @@ -83,6 +83,8 @@ public class SegmentNodeStoreService ext @Property(description = "TarMK compaction paused flag", boolValue = true) public static final String PAUSE_COMPACTION = "pauseCompaction"; + @Property(description = "Flag indicating that this component will not register as a NodeStore but just as a NodeStoreProvider", boolValue = false) + public static final String STANDBY = "standby"; /** * Boolean value indicating a blobStore is to be used */ @@ -104,7 +106,8 @@ public class SegmentNodeStoreService ext policy = ReferencePolicy.DYNAMIC) private volatile BlobStore blobStore; - private ServiceRegistration registration; + private ServiceRegistration storeRegistration; + private ServiceRegistration providerRegistration; private Registration revisionGCRegistration; private Registration blobGCRegistration; private WhiteboardExecutor executor; @@ -174,7 +177,12 @@ public class SegmentNodeStoreService ext Dictionary<String, String> props = new Hashtable<String, String>(); props.put(Constants.SERVICE_PID, SegmentNodeStore.class.getName()); - registration = context.getBundleContext().registerService(NodeStore.class.getName(), this, props); + + boolean standby = toBoolean(lookup(context, STANDBY), false); + providerRegistration = context.getBundleContext().registerService(SegmentStoreProvider.class.getName(), this, props); + if (!standby) { + storeRegistration = context.getBundleContext().registerService(NodeStore.class.getName(), this, props); + } OsgiWhiteboard whiteboard = new OsgiWhiteboard(context.getBundleContext()); executor = new WhiteboardExecutor(); @@ -240,9 +248,13 @@ public class SegmentNodeStoreService ext } private void unregisterNodeStore() { - if(registration != null){ - registration.unregister(); - registration = null; + if(providerRegistration != null){ + providerRegistration.unregister(); + providerRegistration = null; + } + if(storeRegistration != null){ + storeRegistration.unregister(); + storeRegistration = null; } if (revisionGCRegistration != null) { revisionGCRegistration.unregister(); Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStoreProvider.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStoreProvider.java?rev=1622479&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStoreProvider.java (added) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStoreProvider.java Thu Sep 4 14:00:27 2014 @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.segment; + +public interface SegmentStoreProvider { + + SegmentStore getSegmentStore(); +} Modified: jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java?rev=1622479&r1=1622478&r2=1622479&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java (original) +++ jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java Thu Sep 4 14:00:27 2014 @@ -276,6 +276,7 @@ public class Main { final OptionSpec<String> host = parser.accepts("host", "master host").withRequiredArg().ofType(String.class).defaultsTo(defaultHost); final OptionSpec<Integer> port = parser.accepts("port", "master port").withRequiredArg().ofType(Integer.class).defaultsTo(defaultPort); final OptionSpec<Integer> interval = parser.accepts("interval", "interval between successive executions").withRequiredArg().ofType(Integer.class); + final OptionSpec<Boolean> secure = parser.accepts("secure", "use secure connections").withRequiredArg().ofType(Boolean.class); final OptionSpec<?> help = parser.acceptsAll(asList("h", "?", "help"), "show help").forHelp(); final OptionSpec<String> nonOption = parser.nonOptions(Mode.SYNCSLAVE + " <path to repository>"); @@ -300,7 +301,8 @@ public class Main { failoverClient = new FailoverClient( options.has(host)? options.valueOf(host) : defaultHost, options.has(port)? options.valueOf(port) : defaultPort, - store); + store, + options.has(secure) && options.valueOf(secure)); if (!options.has(interval)) { failoverClient.run(); } else { Modified: jackrabbit/oak/trunk/oak-tarmk-failover/osgi-conf/master/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/osgi-conf/master/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg?rev=1622479&r1=1622478&r2=1622479&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-tarmk-failover/osgi-conf/master/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg (original) +++ jackrabbit/oak/trunk/oak-tarmk-failover/osgi-conf/master/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg Thu Sep 4 14:00:27 2014 @@ -1 +1,2 @@ -mode=master +mode=master +master.allowed-client-ip-ranges=localhost,127.0.0.1,::1,192.168.0.0-192.168.255.255 \ No newline at end of file Modified: jackrabbit/oak/trunk/oak-tarmk-failover/pom.xml URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/pom.xml?rev=1622479&r1=1622478&r2=1622479&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-tarmk-failover/pom.xml (original) +++ jackrabbit/oak/trunk/oak-tarmk-failover/pom.xml Thu Sep 4 14:00:27 2014 @@ -28,6 +28,7 @@ <description>Oak TarMK failover module</description> <properties> + <netty-version>4.0.23.Final</netty-version> </properties> <build> @@ -137,31 +138,31 @@ <dependency> <groupId>io.netty</groupId> <artifactId>netty-common</artifactId> - <version>4.0.20.Final</version> + <version>${netty-version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty-buffer</artifactId> - <version>4.0.20.Final</version> + <version>${netty-version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty-transport</artifactId> - <version>4.0.20.Final</version> + <version>${netty-version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty-codec</artifactId> - <version>4.0.20.Final</version> + <version>${netty-version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty-handler</artifactId> - <version>4.0.20.Final</version> + <version>${netty-version}</version> <scope>provided</scope> </dependency> @@ -184,6 +185,12 @@ <scope>test</scope> </dependency> <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + <version>3.3.2</version> + <scope>test</scope> + </dependency> + <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <scope>test</scope> Modified: jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/CommunicationObserver.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/CommunicationObserver.java?rev=1622479&r1=1622478&r2=1622479&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/CommunicationObserver.java (original) +++ jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/CommunicationObserver.java Thu Sep 4 14:00:27 2014 @@ -37,6 +37,7 @@ import java.util.HashMap; import java.util.Map; public class CommunicationObserver { + private static final int MAX_CLIENT_STATISTICS = 10; private class CommunicationPartnerMBean implements ObservablePartnerMBean { private final ObjectName mbeanName; @@ -45,6 +46,8 @@ public class CommunicationObserver { public Date lastSeen; public String remoteAddress; public int remotePort; + public long segmentsSent; + public long segmentBytesSent; public CommunicationPartnerMBean(String clientName) throws MalformedObjectNameException { this.clientName = clientName; @@ -79,6 +82,16 @@ public class CommunicationObserver { public String getLastSeenTimestamp() { return this.lastSeen == null ? null : this.lastSeen.toString(); } + + @Override + public long getTransferredSegments() { + return this.segmentsSent; + } + + @Override + public long getTransferredSegmentBytes() { + return this.segmentBytesSent; + } } private static final Logger log = LoggerFactory @@ -92,22 +105,28 @@ public class CommunicationObserver { this.partnerDetails = new HashMap<String, CommunicationPartnerMBean>(); } - public void unregister() { + private void unregister(CommunicationPartnerMBean m) { final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer(); + try { + jmxServer.unregisterMBean(m.getMBeanName()); + } + catch (Exception e) { + log.error("error unregistering mbean for client '" + m.getName() + "'", e); + } + } + + public void unregister() { for (CommunicationPartnerMBean m : this.partnerDetails.values()) { - try { - jmxServer.unregisterMBean(m.getMBeanName()); - } - catch (Exception e) { - log.error("error unregistering mbean for client '" + m.getName() + "'", e); - } + unregister(m); } } public void gotMessageFrom(String client, String request, InetSocketAddress remote) throws MalformedObjectNameException { + log.debug("got message '" + request + "' from client " + client); CommunicationPartnerMBean m = this.partnerDetails.get(client); boolean register = false; if (m == null) { + cleanUp(); m = new CommunicationPartnerMBean(client); m.remoteAddress = remote.getAddress().getHostAddress(); m.remotePort = remote.getPort(); @@ -127,7 +146,35 @@ public class CommunicationObserver { } } + public void didSendSegmentBytes(String client, int size) { + log.debug("did send segment with " + size + " bytes to client " + client); + CommunicationPartnerMBean m = this.partnerDetails.get(client); + m.segmentsSent++; + m.segmentBytesSent += size; + this.partnerDetails.put(client, m); + } + public String getID() { return this.identifier; } + + // helper + + private void cleanUp() { + while (this.partnerDetails.size() >= MAX_CLIENT_STATISTICS) { + CommunicationPartnerMBean oldestEntry = oldestEntry(); + if (oldestEntry == null) return; + log.info("housekeeping: removing statistics for " + oldestEntry.getName()); + unregister(oldestEntry); + this.partnerDetails.remove(oldestEntry.getName()); + } + } + + private CommunicationPartnerMBean oldestEntry() { + CommunicationPartnerMBean ret = null; + for (CommunicationPartnerMBean m : this.partnerDetails.values()) { + if (ret == null || ret.lastSeen.after(m.lastSeen)) ret = m; + } + return ret; + } } Modified: jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java?rev=1622479&r1=1622478&r2=1622479&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java (original) +++ jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java Thu Sep 4 14:00:27 2014 @@ -29,6 +29,8 @@ import io.netty.channel.socket.SocketCha import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.compression.SnappyFramedDecoder; import io.netty.handler.codec.string.StringEncoder; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import io.netty.handler.timeout.ReadTimeoutHandler; import io.netty.util.CharsetUtil; import io.netty.util.concurrent.DefaultEventExecutorGroup; @@ -50,6 +52,7 @@ import org.slf4j.LoggerFactory; import javax.management.MBeanServer; import javax.management.ObjectName; import javax.management.StandardMBean; +import javax.net.ssl.SSLException; public final class FailoverClient implements FailoverStatusMBean, Runnable, Closeable { public static final String CLIENT_ID_PROPERTY_NAME = "failOverID"; @@ -59,6 +62,7 @@ public final class FailoverClient implem private final String host; private final int port; + private final boolean checkChecksums; private int readTimeoutMs = 10000; private final FailoverStore store; @@ -66,13 +70,26 @@ public final class FailoverClient implem private FailoverClientHandler handler; private EventLoopGroup group; private EventExecutorGroup executor; + private SslContext sslContext; private boolean running; private String state; - public FailoverClient(String host, int port, SegmentStore store) { + public FailoverClient(String host, int port, SegmentStore store) throws SSLException { + this(host, port, store, false, true); + } + + public FailoverClient(String host, int port, SegmentStore store, boolean secure) throws SSLException { + this(host, port, store, secure, true); + } + + public FailoverClient(String host, int port, SegmentStore store, boolean secure, boolean checksums) throws SSLException { this.state = STATUS_INITIALIZING; this.host = host; this.port = port; + this.checkChecksums = checksums; + if (secure) { + this.sslContext = SslContext.newClientContext(InsecureTrustManagerFactory.INSTANCE); + } this.store = new FailoverStore(store); String s = System.getProperty(CLIENT_ID_PROPERTY_NAME); this.observer = new CommunicationObserver((s == null || s.length() == 0) ? UUID.randomUUID().toString() : s); @@ -130,11 +147,16 @@ public final class FailoverClient implem @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); + if (sslContext != null) { + p.addLast(sslContext.newHandler(ch.alloc())); + } // WriteTimeoutHandler & ReadTimeoutHandler p.addLast("readTimeoutHandler", new ReadTimeoutHandler( readTimeoutMs, TimeUnit.MILLISECONDS)); p.addLast(new StringEncoder(CharsetUtil.UTF_8)); - p.addLast(new SnappyFramedDecoder(true)); + if (FailoverClient.this.checkChecksums) { + p.addLast(new SnappyFramedDecoder(true)); + } p.addLast(new RecordIdDecoder(store)); p.addLast(executor, handler); } Modified: jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClientHandler.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClientHandler.java?rev=1622479&r1=1622478&r2=1622479&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClientHandler.java (original) +++ jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClientHandler.java Thu Sep 4 14:00:27 2014 @@ -19,16 +19,13 @@ package org.apache.jackrabbit.oak.plugins.segment.failover.client; import static org.apache.jackrabbit.oak.plugins.segment.failover.codec.Messages.newGetHeadReq; + import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.concurrent.DefaultEventExecutorGroup; import io.netty.util.concurrent.EventExecutorGroup; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.GenericFutureListener; -import io.netty.util.concurrent.Promise; import java.io.Closeable; -import java.net.InetSocketAddress; import java.util.concurrent.TimeUnit; import org.apache.jackrabbit.oak.plugins.segment.RecordId; @@ -53,8 +50,6 @@ public class FailoverClientHandler exten private ChannelHandlerContext ctx; - private Promise<RecordId> headPromise; - public FailoverClientHandler(final FailoverStore store, EventExecutorGroup executor, CommunicationObserver observer) { this.store = store; @@ -63,15 +58,17 @@ public class FailoverClientHandler exten } @Override - public void channelActive(ChannelHandlerContext ctx) { + public void channelActive(ChannelHandlerContext ctx) throws Exception { this.ctx = ctx; - sendHeadRequest(); + log.debug("sending head request"); + ctx.writeAndFlush(newGetHeadReq(this.observer.getID())); + log.debug("did send head request"); } @Override protected void channelRead0(ChannelHandlerContext ctx, RecordId msg) throws Exception { - headPromise.setSuccess(msg); + setHead(msg); }; @Override @@ -79,28 +76,7 @@ public class FailoverClientHandler exten ctx.flush(); } - private synchronized void sendHeadRequest() { - headPromise = ctx.executor().newPromise(); - headPromise.addListener(new GenericFutureListener<Future<RecordId>>() { - @Override - public void operationComplete(Future<RecordId> future) { - if (future.isSuccess()) { - try { - setHead(future.get()); - } catch (Exception e) { - exceptionCaught(ctx, e); - } - } else { - exceptionCaught(ctx, future.cause()); - } - } - }); - ctx.writeAndFlush(newGetHeadReq(this.observer.getID())).addListener( - new FailedRequestListener(headPromise)); - } - synchronized void setHead(RecordId head) { - headPromise = null; if (store.getHead().getRecordId().equals(head)) { // all sync'ed up @@ -108,6 +84,8 @@ public class FailoverClientHandler exten ctx.close(); return; } + + log.debug("updating current head to " + head); ctx.pipeline().remove(RecordIdDecoder.class); ctx.pipeline().remove(this); ctx.pipeline().addLast(new SegmentDecoder(store)); @@ -123,12 +101,7 @@ public class FailoverClientHandler exten h1.channelActive(ctx); h2.channelActive(ctx); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - log.error("Failed synchronizing state.", cause); - close(); + log.debug("updating current head finished"); } @Override Modified: jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentLoaderHandler.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentLoaderHandler.java?rev=1622479&r1=1622478&r2=1622479&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentLoaderHandler.java (original) +++ jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentLoaderHandler.java Thu Sep 4 14:00:27 2014 @@ -80,12 +80,13 @@ public class SegmentLoaderHandler extend public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof SegmentReply) { + //log.debug("offering segment " + ((SegmentReply) evt).getSegment()); segment.offer(((SegmentReply) evt).getSegment()); } } private void initSync() { - log.debug("new head id " + head); + log.info("new head id " + head); long t = System.currentTimeMillis(); try { @@ -102,30 +103,33 @@ public class SegmentLoaderHandler extend } finally { close(); } + log.info("returning initSync"); } @Override public Segment readSegment(final SegmentId id) { - ctx.writeAndFlush(newGetSegmentReq(this.clientID, id)).addListener(reqListener); + ctx.writeAndFlush(newGetSegmentReq(this.clientID, id)); return getSegment(); } - private final ChannelFutureListener reqListener = new ChannelFutureListener() { + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) + throws Exception { + log.warn("Closing channel. Got exception: " + cause); + ctx.close(); + } - @Override - public void operationComplete(ChannelFuture future) { - if (!future.isSuccess()) { - exceptionCaught(ctx, future.cause()); - } - } - }; + // implementation of RemoteSegmentLoader public Segment getSegment() { boolean interrupted = false; try { for (;;) { try { - return segment.poll(timeoutMs, TimeUnit.MILLISECONDS); + log.debug("polling segment"); + Segment s = segment.poll(timeoutMs, TimeUnit.MILLISECONDS); + log.debug("returning segment " + s); + return s; } catch (InterruptedException ignore) { interrupted = true; } @@ -135,15 +139,9 @@ public class SegmentLoaderHandler extend Thread.currentThread().interrupt(); } } - } - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - log.error("Failed synchronizing state.", cause); - close(); } - @Override public void close() { ctx.close(); if (preloaderExecutor != null && !preloaderExecutor.isShuttingDown()) { @@ -156,7 +154,6 @@ public class SegmentLoaderHandler extend } } - @Override public boolean isClosed() { return (loaderExecutor != null && (loaderExecutor.isShuttingDown() || loaderExecutor .isShutdown())); Modified: jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentPreLoaderHandler.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentPreLoaderHandler.java?rev=1622479&r1=1622478&r2=1622479&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentPreLoaderHandler.java (original) +++ jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentPreLoaderHandler.java Thu Sep 4 14:00:27 2014 @@ -23,13 +23,18 @@ import io.netty.channel.SimpleChannelInb import org.apache.jackrabbit.oak.plugins.segment.Segment; import org.apache.jackrabbit.oak.plugins.segment.failover.codec.SegmentReply; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class SegmentPreLoaderHandler extends SimpleChannelInboundHandler<Segment> { + private static final Logger log = LoggerFactory + .getLogger(SegmentPreLoaderHandler.class); @Override protected void channelRead0(ChannelHandlerContext ctx, Segment msg) throws Exception { + log.info("fire new segment reply for " + msg.getSegmentId()); ctx.fireUserEventTriggered(new SegmentReply(msg)); } Modified: jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/RecordIdDecoder.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/RecordIdDecoder.java?rev=1622479&r1=1622478&r2=1622479&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/RecordIdDecoder.java (original) +++ jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/RecordIdDecoder.java Thu Sep 4 14:00:27 2014 @@ -19,6 +19,8 @@ package org.apache.jackrabbit.oak.plugins.segment.failover.codec; +import java.io.IOException; + import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; @@ -46,7 +48,7 @@ public class RecordIdDecoder extends Len throws Exception { ByteBuf frame = (ByteBuf) super.decode(ctx, in); if (frame == null) { - return null; + throw new IOException("Received unexpected empty frame. Maybe you have enabled secure transmission on only one endpoint of the connection."); } byte type = frame.readByte(); frame.discardReadBytes(); Modified: jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/jmx/ObservablePartnerMBean.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/jmx/ObservablePartnerMBean.java?rev=1622479&r1=1622478&r2=1622479&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/jmx/ObservablePartnerMBean.java (original) +++ jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/jmx/ObservablePartnerMBean.java Thu Sep 4 14:00:27 2014 @@ -39,4 +39,10 @@ public interface ObservablePartnerMBean @CheckForNull @Description("Time the remote instance was last contacted") String getLastSeenTimestamp(); + + @Description("Number of transferred segments") + long getTransferredSegments(); + + @Description("Number of bytes stored in transferred segments") + long getTransferredSegmentBytes(); } Modified: jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServer.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServer.java?rev=1622479&r1=1622478&r2=1622479&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServer.java (original) +++ jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServer.java Thu Sep 4 14:00:27 2014 @@ -30,10 +30,13 @@ import io.netty.channel.socket.nio.NioSe import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.compression.SnappyFramedEncoder; import io.netty.handler.codec.string.StringDecoder; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.util.SelfSignedCertificate; import io.netty.util.CharsetUtil; import java.io.Closeable; import java.lang.management.ManagementFactory; +import java.security.cert.CertificateException; import java.util.concurrent.TimeUnit; import io.netty.util.concurrent.Future; @@ -48,6 +51,7 @@ import org.slf4j.LoggerFactory; import javax.management.MBeanServer; import javax.management.ObjectName; import javax.management.StandardMBean; +import javax.net.ssl.SSLException; public class FailoverServer implements FailoverStatusMBean, Closeable { @@ -61,16 +65,44 @@ public class FailoverServer implements F private final ServerBootstrap b; private final CommunicationObserver observer; private final FailoverServerHandler handler; + private SslContext sslContext; private ChannelFuture channelFuture; private boolean running; - public FailoverServer(int port, final SegmentStore store) { - this(port, store, null); + public FailoverServer(int port, final SegmentStore store) + throws CertificateException, SSLException { + this(port, store, null, false, true); } - public FailoverServer(int port, final SegmentStore store, String[] allowedClientIPRanges) { + public FailoverServer(int port, final SegmentStore store, boolean secure) + throws CertificateException, SSLException { + this(port, store, null, secure, true); + } + + public FailoverServer(int port, final SegmentStore store, boolean secure, boolean useChecksums) + throws CertificateException, SSLException { + this(port, store, null, secure, useChecksums); + } + + public FailoverServer(int port, final SegmentStore store, String[] allowedClientIPRanges) + throws CertificateException, SSLException { + this(port, store, allowedClientIPRanges, false, true); + } + + public FailoverServer(int port, final SegmentStore store, String[] allowedClientIPRanges, boolean secure) + throws CertificateException, SSLException { + this(port, store, allowedClientIPRanges, secure, true); + } + + public FailoverServer(int port, final SegmentStore store, String[] allowedClientIPRanges, boolean secure, final boolean checksums) + throws CertificateException, SSLException { this.port = port; + if (secure) { + SelfSignedCertificate ssc = new SelfSignedCertificate(); + sslContext = SslContext.newServerContext(ssc.certificate(), ssc.privateKey()); + } + observer = new CommunicationObserver("master"); handler = new FailoverServerHandler(store, observer, allowedClientIPRanges); bossGroup = new NioEventLoopGroup(1); @@ -81,7 +113,7 @@ public class FailoverServer implements F jmxServer.registerMBean(new StandardMBean(this, FailoverStatusMBean.class), new ObjectName(this.getMBeanName())); } catch (Exception e) { - log.error("can register failover status mbean", e); + log.error("can't register failover status mbean", e); } b = new ServerBootstrap(); @@ -98,9 +130,14 @@ public class FailoverServer implements F @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); + if (sslContext != null) { + p.addLast(sslContext.newHandler(ch.alloc())); + } p.addLast(new LineBasedFrameDecoder(8192)); p.addLast(new StringDecoder(CharsetUtil.UTF_8)); - p.addLast(new SnappyFramedEncoder()); + if (checksums) { + p.addLast(new SnappyFramedEncoder()); + } p.addLast(new RecordIdEncoder()); p.addLast(new SegmentEncoder()); p.addLast(handler); Modified: jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServerHandler.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServerHandler.java?rev=1622479&r1=1622478&r2=1622479&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServerHandler.java (original) +++ jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServerHandler.java Thu Sep 4 14:00:27 2014 @@ -101,25 +101,25 @@ public class FailoverServerHandler exten @Override public void channelRegistered(io.netty.channel.ChannelHandlerContext ctx) throws java.lang.Exception { - state = "Channel registered"; + state = "channel registered"; super.channelRegistered(ctx); } @Override public void channelActive(io.netty.channel.ChannelHandlerContext ctx) throws java.lang.Exception { - state = "Channel active"; + state = "channel active"; super.channelActive(ctx); } @Override public void channelInactive(io.netty.channel.ChannelHandlerContext ctx) throws java.lang.Exception { - state = "Channel inactive"; + state = "channel inactive"; super.channelInactive(ctx); } @Override public void channelUnregistered(io.netty.channel.ChannelHandlerContext ctx) throws java.lang.Exception { - state = "Channel unregistered"; + state = "channel unregistered"; super.channelUnregistered(ctx); } @@ -130,15 +130,18 @@ public class FailoverServerHandler exten String request = Messages.extractMessageFrom(payload); InetSocketAddress client = (InetSocketAddress)ctx.channel().remoteAddress(); + if (!clientAllowed(client)) { log.warn("Got request from client " + client + " which is not in the allowed ip ranges! Request will be ignored."); } else { - observer.gotMessageFrom(Messages.extractClientFrom(payload), request, client); + String clientID = Messages.extractClientFrom(payload); + observer.gotMessageFrom(clientID, request, client); if (Messages.GET_HEAD.equalsIgnoreCase(request)) { RecordId r = headId(); if (r != null) { ctx.writeAndFlush(r); + log.debug("returning from head request"); return; } } else if (request.startsWith(Messages.GET_SEGMENT)) { @@ -148,24 +151,24 @@ public class FailoverServerHandler exten Segment s = null; - for (int i = 0; i < 3; i++) { + for (int i = 0; i < 10; i++) { try { s = store.readSegment(new SegmentId(store.getTracker(), uuid.getMostSignificantBits(), uuid .getLeastSignificantBits())); } catch (IllegalStateException e) { // segment not found - log.warn(e.getMessage()); - } - if (s != null) { - break; - } else { - TimeUnit.MILLISECONDS.sleep(500); + log.info("waiting for segment. Got exception: " + e.getMessage()); + TimeUnit.MILLISECONDS.sleep(1000); } + if (s != null) break; } if (s != null) { + log.info("sending segment" + sid + " to " + client); ctx.writeAndFlush(s); + observer.didSendSegmentBytes(clientID, s.size()); + log.debug("master returns from segment request"); return; } } else { @@ -184,6 +187,6 @@ public class FailoverServerHandler exten public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { state = "exception occurred: " + cause.getMessage(); log.error(cause.getMessage(), cause); - ctx.close(); + ctx.fireExceptionCaught(cause); } } Modified: jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStore.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStore.java?rev=1622479&r1=1622478&r2=1622479&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStore.java (original) +++ jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStore.java Thu Sep 4 14:00:27 2014 @@ -29,9 +29,13 @@ import org.apache.jackrabbit.oak.plugins import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState; import org.apache.jackrabbit.oak.plugins.segment.SegmentStore; import org.apache.jackrabbit.oak.plugins.segment.SegmentTracker; +import org.apache.jackrabbit.oak.plugins.segment.failover.server.FailoverServer; import org.apache.jackrabbit.oak.spi.blob.BlobStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class FailoverStore implements SegmentStore { + private static final Logger log = LoggerFactory.getLogger(FailoverStore.class); private final SegmentTracker tracker = new SegmentTracker(this); @@ -65,7 +69,7 @@ public class FailoverStore implements Se @Override public Segment readSegment(SegmentId sid) { - + log.info("shall read segment " + sid); Deque<SegmentId> ids = new ArrayDeque<SegmentId>(); ids.offer(sid); int err = 0; @@ -74,8 +78,10 @@ public class FailoverStore implements Se while (!ids.isEmpty()) { SegmentId id = ids.remove(); if (!seen.contains(id) && !delegate.containsSegment(id)) { + log.debug("trying to read segment " + id); Segment s = loader.readSegment(id); if (s != null) { + log.info("got segment " + id + " with size " + s.size()); ByteArrayOutputStream bout = new ByteArrayOutputStream( s.size()); if (id.isDataSegmentId()) { @@ -92,6 +98,7 @@ public class FailoverStore implements Se ids.removeAll(seen); err = 0; } else { + log.error("could NOT read segment " + id); if (loader.isClosed() || err == 4) { loader.close(); throw new IllegalStateException( @@ -105,6 +112,7 @@ public class FailoverStore implements Se } } + log.info("calling delegate to return segment " + sid); return delegate.readSegment(sid); } Modified: jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStoreService.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStoreService.java?rev=1622479&r1=1622478&r2=1622479&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStoreService.java (original) +++ jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStoreService.java Thu Sep 4 14:00:27 2014 @@ -21,9 +21,12 @@ import static org.apache.felix.scr.annot import static org.apache.felix.scr.annotations.ReferencePolicyOption.GREEDY; import java.io.IOException; +import java.security.cert.CertificateException; import java.util.Dictionary; import java.util.Hashtable; +import javax.net.ssl.SSLException; + import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Component; import org.apache.felix.scr.annotations.ConfigurationPolicy; @@ -32,7 +35,7 @@ import org.apache.felix.scr.annotations. import org.apache.felix.scr.annotations.PropertyOption; import org.apache.felix.scr.annotations.Reference; import org.apache.jackrabbit.oak.commons.PropertiesUtil; -import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStoreService; +import org.apache.jackrabbit.oak.plugins.segment.SegmentStoreProvider; import org.apache.jackrabbit.oak.plugins.segment.SegmentStore; import org.apache.jackrabbit.oak.plugins.segment.failover.client.FailoverClient; import org.apache.jackrabbit.oak.plugins.segment.failover.server.FailoverServer; @@ -73,8 +76,13 @@ public class FailoverStoreService { @Property(label = "Client allowed IP-Ranges", description = "accept incoming requests for these IP-ranges only") public static final String ALLOWED_CLIENT_IP_RANGES = "master.allowed-client-ip-ranges"; + public static final boolean SECURE_DEFAULT = false; + @Property(label = "Secure", description = "Use secure connections", boolValue = SECURE_DEFAULT) + public static final String SECURE = "secure"; + @Reference(policy = STATIC, policyOption = GREEDY) - private NodeStore store = null; + private SegmentStoreProvider storeProvider = null; + private SegmentStore segmentStore; private FailoverServer master = null; @@ -83,13 +91,12 @@ public class FailoverStoreService { private ServiceRegistration syncReg = null; @Activate - private void activate(ComponentContext context) throws IOException { - if (store instanceof SegmentNodeStoreService) { - segmentStore = ((SegmentNodeStoreService) store).getSegmentStore(); + private void activate(ComponentContext context) throws IOException, CertificateException { + if (storeProvider != null) { + segmentStore = storeProvider.getSegmentStore(); } else { throw new IllegalArgumentException( - "Unexpected NodeStore impl, expecting SegmentNodeStoreService, got " - + store.getClass()); + "Missing SegmentStoreProvider service"); } String mode = valueOf(context.getProperties().get(MODE)); if (MODE_MASTER.equals(mode)) { @@ -116,23 +123,25 @@ public class FailoverStoreService { } } - private void bootstrapMaster(ComponentContext context) { + private void bootstrapMaster(ComponentContext context) throws CertificateException, SSLException { Dictionary<?, ?> props = context.getProperties(); int port = PropertiesUtil.toInteger(props.get(PORT), PORT_DEFAULT); String ipRanges = PropertiesUtil.toString(props.get(ALLOWED_CLIENT_IP_RANGES), ALLOWED_CLIENT_IP_RANGES_DEFAULT); String[] ranges = ipRanges == null ? null : ipRanges.split(","); - master = new FailoverServer(port, segmentStore, ranges); + boolean secure = PropertiesUtil.toBoolean(props.get(SECURE), SECURE_DEFAULT); + master = new FailoverServer(port, segmentStore, ranges, secure); master.start(); log.info("started failover master on port {} with allowed ip ranges {}.", port, ipRanges); } - private void bootstrapSlave(ComponentContext context) { + private void bootstrapSlave(ComponentContext context) throws SSLException { Dictionary<?, ?> props = context.getProperties(); int port = PropertiesUtil.toInteger(props.get(PORT), PORT_DEFAULT); long interval = PropertiesUtil.toInteger(props.get(INTERVAL), INTERVAL_DEFAULT); String host = PropertiesUtil.toString(props.get(MASTER_HOST), MASTER_HOST_DEFAULT); + boolean secure = PropertiesUtil.toBoolean(props.get(SECURE), SECURE_DEFAULT); - sync = new FailoverClient(host, port, segmentStore); + sync = new FailoverClient(host, port, segmentStore, secure); Dictionary<Object, Object> dictionary = new Hashtable<Object, Object>(); dictionary.put("scheduler.period", interval); dictionary.put("scheduler.concurrent", false); Added: jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/NetworkErrorProxy.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/NetworkErrorProxy.java?rev=1622479&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/NetworkErrorProxy.java (added) +++ jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/NetworkErrorProxy.java Thu Sep 4 14:00:27 2014 @@ -0,0 +1,231 @@ +package org.apache.jackrabbit.oak.plugins.segment; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; + +public class NetworkErrorProxy { + private static final Logger log = LoggerFactory + .getLogger(NetworkErrorProxy.class); + + private final int inboundPort; + private final int outboundPort; + private final String host; + private ChannelFuture f; + + private ForwardHandler fh; + + EventLoopGroup bossGroup = new NioEventLoopGroup(); + EventLoopGroup workerGroup = new NioEventLoopGroup(); + + public NetworkErrorProxy(int inboundPort, String outboundHost, int outboundPort) { + this.inboundPort = inboundPort; + this.outboundPort = outboundPort; + this.host = outboundHost; + this.fh = new ForwardHandler(NetworkErrorProxy.this.host, NetworkErrorProxy.this.outboundPort); + } + + public void skipBytes(int pos, int n) { + this.fh.skipPosition = pos; + this.fh.skipBytes = n; + } + + public void run() throws Exception { + try { + ServerBootstrap b = new ServerBootstrap(); + b.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer<SocketChannel>() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast(NetworkErrorProxy.this.fh); + } + }); + + f = b.bind(this.inboundPort).sync(); + } catch (Exception e) { + log.warn("exception occurred", e); + } + } + + public void reset() throws Exception { + f.channel().disconnect(); + this.fh = new ForwardHandler(NetworkErrorProxy.this.host, NetworkErrorProxy.this.outboundPort); + run(); + } + + public void close() { + f.channel().close(); + if (bossGroup != null && !bossGroup.isShuttingDown()) { + bossGroup.shutdownGracefully(1, 2, TimeUnit.SECONDS).syncUninterruptibly(); + } + if (workerGroup != null && !workerGroup.isShuttingDown()) { + workerGroup.shutdownGracefully(1, 2, TimeUnit.SECONDS).syncUninterruptibly(); + } + } +} + +class ForwardHandler extends ChannelInboundHandlerAdapter { + private final String targetHost; + private final int targetPort; + public long transferredBytes; + public int skipPosition; + public int skipBytes; + private ChannelFuture remote; + + public ForwardHandler(String host, int port) { + this.targetHost = host; + this.targetPort = port; + } + + @Override + public void channelRegistered(ChannelHandlerContext ctx) throws Exception { + final ChannelHandlerContext c = ctx; + EventLoopGroup group = new NioEventLoopGroup(); + Bootstrap cb = new Bootstrap(); + cb.group(group); + cb.channel(NioSocketChannel.class); + + cb.handler(new ChannelInitializer<SocketChannel>() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addFirst(new SwallowingHandler(c, ForwardHandler.this.skipPosition, ForwardHandler.this.skipBytes)); + } + }); + remote = cb.connect(this.targetHost, this.targetPort).sync(); + + ctx.fireChannelRegistered(); + } + + @Override + public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { + remote.channel().close(); + remote = null; + ctx.fireChannelUnregistered(); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + if (msg instanceof ByteBuf) { + ByteBuf bb = (ByteBuf)msg; + this.transferredBytes += (bb.writerIndex() - bb.readerIndex()); + } + remote.channel().write(msg); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + remote.channel().flush(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + cause.printStackTrace(); + ctx.close(); + } +} + +class SendBackHandler implements ChannelInboundHandler { + private final ChannelHandlerContext target; + public long transferredBytes; + + public SendBackHandler(ChannelHandlerContext ctx) { + this.target = ctx; + } + + @Override + public void channelRegistered(ChannelHandlerContext ctx) throws Exception { + } + + @Override + public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + } + + public int messageSize(Object msg) { + if (msg instanceof ByteBuf) { + ByteBuf bb = (ByteBuf)msg; + return (bb.writerIndex() - bb.readerIndex()); + } + // unknown + return 0; + } + + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + this.transferredBytes += messageSize(msg); + this.target.write(msg); + } + + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + this.target.flush(); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + } + + @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { + } + + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + cause.printStackTrace(); + this.target.close(); + } + +} + +class SwallowingHandler extends SendBackHandler { + private int skipStartingPos; + private int nrOfBytes; + + public SwallowingHandler(ChannelHandlerContext ctx, int skipStartingPos, int numberOfBytes) { + super(ctx); + this.skipStartingPos = skipStartingPos; + this.nrOfBytes = numberOfBytes; + } + + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof ByteBuf) { + ByteBuf bb = (ByteBuf)msg; + if (this.nrOfBytes > 0) { + if (this.transferredBytes >= this.skipStartingPos) { + bb.skipBytes(this.nrOfBytes); + this.nrOfBytes = 0; + } + else { + this.skipStartingPos -= messageSize(msg); + } + } + } + super.channelRead(ctx, msg); + } + +} + Added: jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BrokenNetworkTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BrokenNetworkTest.java?rev=1622479&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BrokenNetworkTest.java (added) +++ jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BrokenNetworkTest.java Thu Sep 4 14:00:27 2014 @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.plugins.segment.failover; + +import org.apache.jackrabbit.oak.plugins.segment.NetworkErrorProxy; +import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore; +import org.apache.jackrabbit.oak.plugins.segment.failover.client.FailoverClient; +import org.apache.jackrabbit.oak.plugins.segment.failover.server.FailoverServer; +import org.apache.jackrabbit.oak.spi.state.NodeStore; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.apache.jackrabbit.oak.plugins.segment.SegmentTestUtils.addTestContent; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +public class BrokenNetworkTest extends TestBase { + final static int PROXY_PORT = 4711; + + @Before + public void setUp() throws Exception { + setUpServerAndTwoClients(); + } + + @After + public void after() { + closeServerAndTwoClients(); + } + + @Test + public void testProxy() throws Exception { + useProxy(false); + } + + @Test + public void testProxySSL() throws Exception { + useProxy(true); + } + + @Test + public void testProxySkippedBytes() throws Exception { + useProxy(false, 100, 1, false); + } + + @Test + public void testProxySSLSkippedBytes() throws Exception { + useProxy(true, 400, 10, false); + } + + @Test + public void testProxySkippedBytesIntermediateChange() throws Exception { + useProxy(false, 100, 1, true); + } + + @Test + public void testProxySSLSkippedBytesIntermediateChange() throws Exception { + useProxy(true, 400, 10, true); + } + + // private helper + + private void useProxy(boolean ssl) throws Exception { + useProxy(ssl, 0, 0, false); + } + + private void useProxy(boolean ssl, int skipPosition, int skipBytes, boolean intermediateChange) throws Exception { + NetworkErrorProxy p = new NetworkErrorProxy(PROXY_PORT, LOCALHOST, port); + p.skipBytes(skipPosition, skipBytes); + p.run(); + + NodeStore store = new SegmentNodeStore(storeS); + final FailoverServer server = new FailoverServer(port, storeS, ssl); + server.start(); + addTestContent(store, "server"); + storeS.flush(); // this speeds up the test a little bit... + + FailoverClient cl = new FailoverClient(LOCALHOST, PROXY_PORT, storeC, ssl); + cl.run(); + + try { + if (skipBytes > 0) { + assertFalse("stores are not expected to be equal", storeS.getHead().equals(storeC.getHead())); + assertEquals(storeC2.getHead(), storeC.getHead()); + + p.reset(); + if (intermediateChange) { + addTestContent(store, "server2"); + storeS.flush(); + } + cl.run(); + } + assertEquals(storeS.getHead(), storeC.getHead()); + } finally { + server.close(); + cl.close(); + p.close(); + } + } +} Added: jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BulkTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BulkTest.java?rev=1622479&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BulkTest.java (added) +++ jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BulkTest.java Thu Sep 4 14:00:27 2014 @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.plugins.segment.failover; + +import junit.framework.Assert; +import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore; +import org.apache.jackrabbit.oak.plugins.segment.failover.client.FailoverClient; +import org.apache.jackrabbit.oak.plugins.segment.failover.jmx.FailoverStatusMBean; +import org.apache.jackrabbit.oak.plugins.segment.failover.server.FailoverServer; +import org.apache.jackrabbit.oak.spi.commit.CommitInfo; +import org.apache.jackrabbit.oak.spi.commit.EmptyHook; +import org.apache.jackrabbit.oak.spi.state.NodeBuilder; +import org.apache.jackrabbit.oak.spi.state.NodeStore; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import javax.management.MBeanServer; +import javax.management.ObjectName; +import java.lang.management.ManagementFactory; +import java.util.Set; + +import static junit.framework.Assert.assertNotNull; +import static junit.framework.Assert.assertTrue; +import static org.junit.Assert.assertEquals; + +public class BulkTest extends TestBase { + + @Before + public void setUp() throws Exception { + setUpServerAndClient(); + } + + @After + public void after() { + closeServerAndClient(); + } + + @Test + public void test100Nodes() throws Exception { + test(100, 1, 1, 3000, 3100); + } + + @Test + public void test1000Nodes() throws Exception { + test(1000, 1, 1, 53000, 55000); + } + + @Test + public void test10000Nodes() throws Exception { + test(10000, 1, 1, 245000, 246000); + } + + @Test + public void test100000Nodes() throws Exception { + test(100000, 9, 9, 2210000, 2220000); + } + + @Test + public void test1MillionNodes() throws Exception { + test(1000000, 87, 87, 22700000, 22800000); + } + + @Test + public void test1MillionNodesNoChecksum() throws Exception { + test(1000000, 87, 87, 22700000, 22800000, false, false); + } + + @Test + public void test1MillionNodesUsingSSL() throws Exception { + test(1000000, 87, 87, 22700000, 22800000, true, true); + } + + @Test + public void test1MillionNodesUsingSSLNoChecksum() throws Exception { + test(1000000, 87, 87, 22700000, 22800000, true, false); + } +/* + @Test + public void test10MillionNodes() throws Exception { + test(10000000, 856, 856, 223000000, 224000000); + } +*/ + + // private helper + + private void test(int number, int minExpectedSegments, int maxExpectedSegments, long minExpectedBytes, long maxExpectedBytes) throws Exception { + test(number, minExpectedSegments, maxExpectedSegments, minExpectedBytes, maxExpectedBytes, false, true); + } + + private void test(int number, int minExpectedSegments, int maxExpectedSegments, long minExpectedBytes, long maxExpectedBytes, + boolean useSSL, boolean useChecksum) throws Exception { + NodeStore store = new SegmentNodeStore(storeS); + NodeBuilder rootbuilder = store.getRoot().builder(); + NodeBuilder b = rootbuilder.child("store"); + for (int j=0; j<=number / 1000; j++) { + NodeBuilder builder = b.child("Folder#" + j); + for (int i = 0; i <(number < 1000 ? number : 1000); i++) { + builder.child("Test#" + i).setProperty("ts", System.currentTimeMillis()); + } + } + store.merge(rootbuilder, EmptyHook.INSTANCE, CommitInfo.EMPTY); + storeS.flush(); + + final FailoverServer server = new FailoverServer(port, storeS, useSSL, useChecksum); + server.start(); + + System.setProperty(FailoverClient.CLIENT_ID_PROPERTY_NAME, "Bar"); + FailoverClient cl = new FailoverClient("127.0.0.1", port, storeC, useSSL, useChecksum); + + final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer(); + ObjectName status = new ObjectName(FailoverStatusMBean.JMX_NAME + ",id=*"); + ObjectName clientStatus = new ObjectName(cl.getMBeanName()); + ObjectName serverStatus = new ObjectName(server.getMBeanName()); + + long start = System.currentTimeMillis(); + cl.run(); + + try { + Set<ObjectName> instances = jmxServer.queryNames(status, null); + assertEquals(3, instances.size()); + + ObjectName connectionStatus = null; + for (ObjectName s : instances) { + if (!s.equals(clientStatus) && !s.equals(serverStatus)) connectionStatus = s; + } + assertNotNull(connectionStatus); + + long segments = ((Long)jmxServer.getAttribute(connectionStatus, "TransferredSegments")).longValue(); + long bytes = ((Long)jmxServer.getAttribute(connectionStatus, "TransferredSegmentBytes")).longValue(); + + System.out.println("did transfer " + segments + " segments with " + bytes + " bytes in " + (System.currentTimeMillis() - start) / 1000 + " seconds."); + + assertEquals(storeS.getHead(), storeC.getHead()); + + //compare(segments, "segment", minExpectedSegments, maxExpectedSegments); + //compare(bytes, "byte", minExpectedBytes, maxExpectedBytes); + + } finally { + server.close(); + cl.close(); + } + } + + private void compare(long current, String unit, long expectedMin, long expectedMax) { + assertTrue("current number of " + unit + "s (" + current + ") is less than minimum expected: " + expectedMin, current >= expectedMin); + assertTrue("current number of " + unit + "s (" + current + ") is bigger than maximum expected: " + expectedMax, current <= expectedMax); + } +} Modified: jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverIPRangeTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverIPRangeTest.java?rev=1622479&r1=1622478&r2=1622479&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverIPRangeTest.java (original) +++ jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverIPRangeTest.java Thu Sep 4 14:00:27 2014 @@ -48,18 +48,32 @@ public class FailoverIPRangeTest extends public void testFailoverAllClients() throws Exception { createTestWithConfig(null, true); } - @Test + @Test public void testFailoverLocalClient() throws Exception { createTestWithConfig(new String[]{"127.0.0.1"}, true); } @Test + public void testFailoverLocalClientUseIPv6() throws Exception { + if (!noDualStackSupport) { + createTestWithConfig("::1", new String[]{"::1"}, true); + } + } + + @Test public void testFailoverWrongClient() throws Exception { createTestWithConfig(new String[]{"127.0.0.2"}, false); } @Test + public void testFailoverWrongClientIPv6() throws Exception { + if (!noDualStackSupport) { + createTestWithConfig(new String[]{"::2"}, false); + } + } + + @Test public void testFailoverLocalhost() throws Exception { createTestWithConfig(new String[]{"localhost"}, true); } @@ -95,17 +109,50 @@ public class FailoverIPRangeTest extends } @Test + public void testFailoverCorrectListIPv6() throws Exception { + if (!noDualStackSupport) { + createTestWithConfig(new String[]{"foobar", "122-126", "::1", "126.0.0.1", "127.0.0.0-127.255.255.255"}, true); + } + } + + @Test public void testFailoverWrongList() throws Exception { - createTestWithConfig(new String[]{"foobar","126.0.0.1", "128.0.0.1-255.255.255.255", "128.0.0.0-127.255.255.255"}, false); + createTestWithConfig(new String[]{"foobar", "126.0.0.1", "::2", "128.0.0.1-255.255.255.255", "128.0.0.0-127.255.255.255"}, false); + } + + @Test + public void testFailoverCorrectListUseIPv6() throws Exception { + if (!noDualStackSupport) { + createTestWithConfig("::1", new String[]{"foobar","127-128", "0:0:0:0:0:0:0:1", "126.0.0.1", "127.0.0.0-127.255.255.255"}, true); + } + } + + @Test + public void testFailoverCorrectListIPv6UseIPv6() throws Exception { + if (!noDualStackSupport) { + createTestWithConfig("::1", new String[]{"foobar", "122-126", "::1", "126.0.0.1", "127.0.0.0-127.255.255.255"}, true); + } + } + + @Test + public void testFailoverWrongListUseIPv6() throws Exception { + if (!noDualStackSupport) { + createTestWithConfig("::1", new String[]{"foobar", "126.0.0.1", "::2", "128.0.0.1-255.255.255.255", "128.0.0.0-127.255.255.255"}, false); + } } private void createTestWithConfig(String[] ipRanges, boolean expectedToWork) throws Exception { + createTestWithConfig("127.0.0.1", ipRanges, expectedToWork); + } + + private void createTestWithConfig(String host, String[] ipRanges, boolean expectedToWork) throws Exception { NodeStore store = new SegmentNodeStore(storeS); final FailoverServer server = new FailoverServer(port, storeS, ipRanges); server.start(); addTestContent(store, "server"); + storeS.flush(); // this speeds up the test a little bit... - FailoverClient cl = new FailoverClient("127.0.0.1", port, storeC); + FailoverClient cl = new FailoverClient(host, port, storeC); cl.run(); try { Modified: jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverMultipleClientsTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverMultipleClientsTest.java?rev=1622479&r1=1622478&r2=1622479&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverMultipleClientsTest.java (original) +++ jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverMultipleClientsTest.java Thu Sep 4 14:00:27 2014 @@ -25,7 +25,6 @@ import org.apache.jackrabbit.oak.plugins import org.apache.jackrabbit.oak.spi.state.NodeStore; import org.junit.After; -import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -50,6 +49,7 @@ public class FailoverMultipleClientsTest final FailoverServer server = new FailoverServer(port, storeS); server.start(); SegmentTestUtils.addTestContent(store, "server"); + storeS.flush(); // this speeds up the test a little bit... FailoverClient cl1 = new FailoverClient("127.0.0.1", port, storeC); FailoverClient cl2 = new FailoverClient("127.0.0.1", port, storeC2); @@ -70,7 +70,7 @@ public class FailoverMultipleClientsTest cl2.run(); assertEquals(storeS.getHead(), storeC2.getHead()); - Assert.assertFalse("first client updated in stopped state!", storeS.getHead().equals(storeC.getHead())); + assertFalse("first client updated in stopped state!", storeS.getHead().equals(storeC.getHead())); cl1.start(); assertEquals(storeS.getHead(), storeC.getHead()); Added: jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverSslTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverSslTest.java?rev=1622479&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverSslTest.java (added) +++ jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverSslTest.java Thu Sep 4 14:00:27 2014 @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.plugins.segment.failover; + +import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore; +import org.apache.jackrabbit.oak.plugins.segment.failover.client.FailoverClient; +import org.apache.jackrabbit.oak.plugins.segment.failover.server.FailoverServer; +import org.apache.jackrabbit.oak.spi.state.NodeStore; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.apache.jackrabbit.oak.plugins.segment.SegmentTestUtils.addTestContent; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +public class FailoverSslTest extends TestBase { + + @Before + public void setUp() throws Exception { + setUpServerAndClient(); + } + + @After + public void after() { + closeServerAndClient(); + } + + @Test + public void testFailoverSecure() throws Exception { + + NodeStore store = new SegmentNodeStore(storeS); + final FailoverServer server = new FailoverServer(port, storeS, true); + server.start(); + addTestContent(store, "server"); + storeS.flush(); // this speeds up the test a little bit... + + FailoverClient cl = new FailoverClient("127.0.0.1", port, storeC, true); + cl.run(); + + try { + assertEquals(storeS.getHead(), storeC.getHead()); + } finally { + server.close(); + cl.close(); + } + } + + @Test + public void testFailoverSecureServerPlainClient() throws Exception { + + NodeStore store = new SegmentNodeStore(storeS); + final FailoverServer server = new FailoverServer(port, storeS, true); + server.start(); + addTestContent(store, "server"); + storeS.flush(); // this speeds up the test a little bit... + + FailoverClient cl = new FailoverClient("127.0.0.1", port, storeC); + cl.run(); + + try { + assertFalse("stores are equal but shouldn't!", storeS.getHead().equals(storeC.getHead())); + } finally { + server.close(); + cl.close(); + } + } + + @Test + public void testFailoverPlainServerSecureClient() throws Exception { + + NodeStore store = new SegmentNodeStore(storeS); + final FailoverServer server = new FailoverServer(port, storeS); + server.start(); + addTestContent(store, "server"); + storeS.flush(); // this speeds up the test a little bit... + + FailoverClient cl = new FailoverClient("127.0.0.1", port, storeC, true); + cl.run(); + + try { + assertFalse("stores are equal but shouldn't!", storeS.getHead().equals(storeC.getHead())); + } finally { + server.close(); + cl.close(); + } + } +} Modified: jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/MBeanTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/MBeanTest.java?rev=1622479&r1=1622478&r2=1622479&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/MBeanTest.java (original) +++ jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/MBeanTest.java Thu Sep 4 14:00:27 2014 @@ -31,9 +31,7 @@ import javax.management.ObjectName; import java.lang.management.ManagementFactory; import java.util.Set; -import static junit.framework.Assert.assertEquals; -import static junit.framework.Assert.assertTrue; -import static junit.framework.Assert.fail; +import static junit.framework.Assert.*; public class MBeanTest extends TestBase { @@ -63,7 +61,7 @@ public class MBeanTest extends TestBase assertEquals("master", jmxServer.getAttribute(status, "Mode")); String m = jmxServer.getAttribute(status, "Status").toString(); - if (!m.equals(FailoverStatusMBean.STATUS_STARTING) && !m.equals("Channel unregistered")) + if (!m.equals(FailoverStatusMBean.STATUS_STARTING) && !m.equals("channel unregistered")) fail("unexpected Status" + m); assertEquals(FailoverStatusMBean.STATUS_STARTING, jmxServer.getAttribute(status, "Status")); @@ -150,8 +148,15 @@ public class MBeanTest extends TestBase Set<ObjectName> instances = jmxServer.queryNames(status, null); assertEquals(3, instances.size()); + ObjectName connectionStatus = null; + for (ObjectName s : instances) { + if (!s.equals(clientStatus) && !s.equals(serverStatus)) connectionStatus = s; + } + assertNotNull(connectionStatus); + assertTrue(jmxServer.isRegistered(clientStatus)); assertTrue(jmxServer.isRegistered(serverStatus)); + assertTrue(jmxServer.isRegistered(connectionStatus)); String m = jmxServer.getAttribute(clientStatus, "Mode").toString(); if (!m.startsWith("client: ")) fail("unexpected mode " + m); @@ -161,11 +166,14 @@ public class MBeanTest extends TestBase assertEquals(true, jmxServer.getAttribute(serverStatus, "Running")); assertEquals(true, jmxServer.getAttribute(clientStatus, "Running")); + assertEquals(new Long(2), jmxServer.getAttribute(connectionStatus, "TransferredSegments")); + assertEquals(new Long(128), jmxServer.getAttribute(connectionStatus, "TransferredSegmentBytes")); + // stop the master jmxServer.invoke(serverStatus, "stop", null, null); assertEquals(false, jmxServer.getAttribute(serverStatus, "Running")); m = jmxServer.getAttribute(serverStatus, "Status").toString(); - if (!m.equals(FailoverStatusMBean.STATUS_STOPPED) && !m.equals("Channel unregistered")) + if (!m.equals(FailoverStatusMBean.STATUS_STOPPED) && !m.equals("channel unregistered")) fail("unexpected Status" + m); // restart the master @@ -173,7 +181,7 @@ public class MBeanTest extends TestBase assertEquals(true, jmxServer.getAttribute(serverStatus, "Running")); assertEquals(true, jmxServer.getAttribute(clientStatus, "Running")); m = jmxServer.getAttribute(serverStatus, "Status").toString(); - if (!m.equals(FailoverStatusMBean.STATUS_STARTING) && !m.equals("Channel unregistered")) + if (!m.equals(FailoverStatusMBean.STATUS_STARTING) && !m.equals("channel unregistered")) fail("unexpected Status" + m); // stop the slave Modified: jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/RecoverTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/RecoverTest.java?rev=1622479&r1=1622478&r2=1622479&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/RecoverTest.java (original) +++ jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/RecoverTest.java Thu Sep 4 14:00:27 2014 @@ -70,4 +70,28 @@ public class RecoverTest extends TestBas } } + + @Test + public void testLocalChanges() throws Exception { + + NodeStore store = new SegmentNodeStore(storeC); + addTestContent(store, "client"); + + final FailoverServer server = new FailoverServer(port, storeS); + server.start(); + store = new SegmentNodeStore(storeS); + addTestContent(store, "server"); + storeS.flush(); + + FailoverClient cl = new FailoverClient("127.0.0.1", port, storeC); + try { + assertFalse("stores are not expected to be equal", storeS.getHead().equals(storeC.getHead())); + cl.run(); + assertEquals(storeS.getHead(), storeC.getHead()); + } finally { + server.close(); + cl.close(); + } + + } } Modified: jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/TestBase.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/TestBase.java?rev=1622479&r1=1622478&r2=1622479&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/TestBase.java (original) +++ jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/TestBase.java Thu Sep 4 14:00:27 2014 @@ -19,6 +19,7 @@ package org.apache.jackrabbit.oak.plugins.segment.failover; import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.SystemUtils; import org.apache.jackrabbit.oak.plugins.segment.file.FileStore; import java.io.File; @@ -28,6 +29,7 @@ import static org.apache.jackrabbit.oak. public class TestBase { int port = Integer.valueOf(System.getProperty("failover.server.port", "52808")); + final static String LOCALHOST = "127.0.0.1"; File directoryS; FileStore storeS; @@ -38,6 +40,12 @@ public class TestBase { File directoryC2; FileStore storeC2; + /* + Java 6 on Windows doesn't support dual IP stacks, so we will skip our IPv6 + tests. + */ + protected final boolean noDualStackSupport = SystemUtils.IS_OS_WINDOWS && SystemUtils.IS_JAVA_1_6; + public void setUpServerAndClient() throws IOException { // server directoryS = createTmpTargetDir("FailoverServerTest");