Author: baedke
Date: Thu Sep  4 14:00:27 2014
New Revision: 1622479

URL: http://svn.apache.org/r1622479
Log:
OAK-1915: TarMK failover 2.0

Added test cases, improved stability and error handling, added SSL support, 
prepared Sling run modes for slave and master, and improved JMX MBeans. 


Added:
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStoreProvider.java
    
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/NetworkErrorProxy.java
    
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BrokenNetworkTest.java
    
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BulkTest.java
    
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverSslTest.java
Modified:
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
    
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java
    
jackrabbit/oak/trunk/oak-tarmk-failover/osgi-conf/master/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg
    jackrabbit/oak/trunk/oak-tarmk-failover/pom.xml
    
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/CommunicationObserver.java
    
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java
    
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClientHandler.java
    
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentLoaderHandler.java
    
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentPreLoaderHandler.java
    
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/RecordIdDecoder.java
    
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/jmx/ObservablePartnerMBean.java
    
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServer.java
    
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServerHandler.java
    
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStore.java
    
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStoreService.java
    
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverIPRangeTest.java
    
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverMultipleClientsTest.java
    
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/MBeanTest.java
    
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/RecoverTest.java
    
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/TestBase.java

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java?rev=1622479&r1=1622478&r2=1622479&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
 Thu Sep  4 14:00:27 2014
@@ -63,7 +63,7 @@ import org.slf4j.LoggerFactory;
  */
 @Component(policy = ConfigurationPolicy.REQUIRE)
 public class SegmentNodeStoreService extends ProxyNodeStore
-        implements Observable {
+        implements Observable, SegmentStoreProvider {
 
     @Property(description="The unique name of this instance")
     public static final String NAME = "name";
@@ -83,6 +83,8 @@ public class SegmentNodeStoreService ext
     @Property(description = "TarMK compaction paused flag", boolValue = true)
     public static final String PAUSE_COMPACTION = "pauseCompaction";
 
+    @Property(description = "Flag indicating that this component will not 
register as a NodeStore but just as a NodeStoreProvider", boolValue = false)
+    public static final String STANDBY = "standby";
     /**
      * Boolean value indicating a blobStore is to be used
      */
@@ -104,7 +106,8 @@ public class SegmentNodeStoreService ext
             policy = ReferencePolicy.DYNAMIC)
     private volatile BlobStore blobStore;
 
-    private ServiceRegistration registration;
+    private ServiceRegistration storeRegistration;
+    private ServiceRegistration providerRegistration;
     private Registration revisionGCRegistration;
     private Registration blobGCRegistration;
     private WhiteboardExecutor executor;
@@ -174,7 +177,12 @@ public class SegmentNodeStoreService ext
 
         Dictionary<String, String> props = new Hashtable<String, String>();
         props.put(Constants.SERVICE_PID, SegmentNodeStore.class.getName());
-        registration = 
context.getBundleContext().registerService(NodeStore.class.getName(), this, 
props);
+
+        boolean standby = toBoolean(lookup(context, STANDBY), false);
+        providerRegistration = 
context.getBundleContext().registerService(SegmentStoreProvider.class.getName(),
 this, props);
+        if (!standby) {
+            storeRegistration = 
context.getBundleContext().registerService(NodeStore.class.getName(), this, 
props);
+        }
 
         OsgiWhiteboard whiteboard = new 
OsgiWhiteboard(context.getBundleContext());
         executor = new WhiteboardExecutor();
@@ -240,9 +248,13 @@ public class SegmentNodeStoreService ext
     }
 
     private void unregisterNodeStore() {
-        if(registration != null){
-            registration.unregister();
-            registration = null;
+        if(providerRegistration != null){
+            providerRegistration.unregister();
+            providerRegistration = null;
+        }
+        if(storeRegistration != null){
+            storeRegistration.unregister();
+            storeRegistration = null;
         }
         if (revisionGCRegistration != null) {
             revisionGCRegistration.unregister();

Added: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStoreProvider.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStoreProvider.java?rev=1622479&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStoreProvider.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStoreProvider.java
 Thu Sep  4 14:00:27 2014
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment;
+
+public interface SegmentStoreProvider {
+
+    SegmentStore getSegmentStore();
+}

Modified: 
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java?rev=1622479&r1=1622478&r2=1622479&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java
 Thu Sep  4 14:00:27 2014
@@ -276,6 +276,7 @@ public class Main {
         final OptionSpec<String> host = parser.accepts("host", "master 
host").withRequiredArg().ofType(String.class).defaultsTo(defaultHost);
         final OptionSpec<Integer> port = parser.accepts("port", "master 
port").withRequiredArg().ofType(Integer.class).defaultsTo(defaultPort);
         final OptionSpec<Integer> interval = parser.accepts("interval", 
"interval between successive 
executions").withRequiredArg().ofType(Integer.class);
+        final OptionSpec<Boolean> secure = parser.accepts("secure", "use 
secure connections").withRequiredArg().ofType(Boolean.class);
         final OptionSpec<?> help = parser.acceptsAll(asList("h", "?", "help"), 
"show help").forHelp();
         final OptionSpec<String> nonOption = parser.nonOptions(Mode.SYNCSLAVE 
+ " <path to repository>");
 
@@ -300,7 +301,8 @@ public class Main {
             failoverClient = new FailoverClient(
                     options.has(host)? options.valueOf(host) : defaultHost,
                     options.has(port)? options.valueOf(port) : defaultPort,
-                    store);
+                    store,
+                    options.has(secure) && options.valueOf(secure));
             if (!options.has(interval)) {
                 failoverClient.run();
             } else {

Modified: 
jackrabbit/oak/trunk/oak-tarmk-failover/osgi-conf/master/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/osgi-conf/master/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg?rev=1622479&r1=1622478&r2=1622479&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-tarmk-failover/osgi-conf/master/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg
 (original)
+++ 
jackrabbit/oak/trunk/oak-tarmk-failover/osgi-conf/master/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg
 Thu Sep  4 14:00:27 2014
@@ -1 +1,2 @@
-mode=master
+mode=master
+master.allowed-client-ip-ranges=localhost,127.0.0.1,::1,192.168.0.0-192.168.255.255
\ No newline at end of file

Modified: jackrabbit/oak/trunk/oak-tarmk-failover/pom.xml
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/pom.xml?rev=1622479&r1=1622478&r2=1622479&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-failover/pom.xml (original)
+++ jackrabbit/oak/trunk/oak-tarmk-failover/pom.xml Thu Sep  4 14:00:27 2014
@@ -28,6 +28,7 @@
   <description>Oak TarMK failover module</description>
 
   <properties>
+      <netty-version>4.0.23.Final</netty-version>
   </properties>
 
   <build>
@@ -137,31 +138,31 @@
     <dependency>
       <groupId>io.netty</groupId>
       <artifactId>netty-common</artifactId>
-      <version>4.0.20.Final</version>
+      <version>${netty-version}</version>
       <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>io.netty</groupId>
       <artifactId>netty-buffer</artifactId>
-      <version>4.0.20.Final</version>
+      <version>${netty-version}</version>
       <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>io.netty</groupId>
       <artifactId>netty-transport</artifactId>
-      <version>4.0.20.Final</version>
+      <version>${netty-version}</version>
       <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>io.netty</groupId>
       <artifactId>netty-codec</artifactId>
-      <version>4.0.20.Final</version>
+      <version>${netty-version}</version>
       <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>io.netty</groupId>
       <artifactId>netty-handler</artifactId>
-      <version>4.0.20.Final</version>
+      <version>${netty-version}</version>
       <scope>provided</scope>
     </dependency>
 
@@ -184,6 +185,12 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+      <version>3.3.2</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>ch.qos.logback</groupId>
       <artifactId>logback-classic</artifactId>
       <scope>test</scope>

Modified: 
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/CommunicationObserver.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/CommunicationObserver.java?rev=1622479&r1=1622478&r2=1622479&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/CommunicationObserver.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/CommunicationObserver.java
 Thu Sep  4 14:00:27 2014
@@ -37,6 +37,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 public class CommunicationObserver {
+    private static final int MAX_CLIENT_STATISTICS = 10;
 
     private class CommunicationPartnerMBean implements ObservablePartnerMBean {
         private final ObjectName mbeanName;
@@ -45,6 +46,8 @@ public class CommunicationObserver {
         public Date lastSeen;
         public String remoteAddress;
         public int remotePort;
+        public long segmentsSent;
+        public long segmentBytesSent;
 
         public CommunicationPartnerMBean(String clientName) throws 
MalformedObjectNameException {
             this.clientName = clientName;
@@ -79,6 +82,16 @@ public class CommunicationObserver {
         public String getLastSeenTimestamp() {
             return this.lastSeen == null ? null : this.lastSeen.toString();
         }
+
+        @Override
+        public long getTransferredSegments() {
+            return this.segmentsSent;
+        }
+
+        @Override
+        public long getTransferredSegmentBytes() {
+            return this.segmentBytesSent;
+        }
     }
 
     private static final Logger log = LoggerFactory
@@ -92,22 +105,28 @@ public class CommunicationObserver {
         this.partnerDetails = new HashMap<String, CommunicationPartnerMBean>();
     }
 
-    public void unregister() {
+    private void unregister(CommunicationPartnerMBean m) {
         final MBeanServer jmxServer = 
ManagementFactory.getPlatformMBeanServer();
+        try {
+            jmxServer.unregisterMBean(m.getMBeanName());
+        }
+        catch (Exception e) {
+            log.error("error unregistering mbean for client '" + m.getName() + 
"'", e);
+        }
+    }
+
+    public void unregister() {
         for (CommunicationPartnerMBean m : this.partnerDetails.values()) {
-            try {
-                jmxServer.unregisterMBean(m.getMBeanName());
-            }
-            catch (Exception e) {
-                log.error("error unregistering mbean for client '" + 
m.getName() + "'", e);
-            }
+            unregister(m);
         }
     }
 
     public void gotMessageFrom(String client, String request, 
InetSocketAddress remote) throws MalformedObjectNameException {
+        log.debug("got message '" + request + "' from client " + client);
         CommunicationPartnerMBean m = this.partnerDetails.get(client);
         boolean register = false;
         if (m == null) {
+            cleanUp();
             m = new CommunicationPartnerMBean(client);
             m.remoteAddress = remote.getAddress().getHostAddress();
             m.remotePort = remote.getPort();
@@ -127,7 +146,35 @@ public class CommunicationObserver {
         }
     }
 
+    public void didSendSegmentBytes(String client, int size) {
+        log.debug("did send segment with " + size + " bytes to client " + 
client);
+        CommunicationPartnerMBean m = this.partnerDetails.get(client);
+        m.segmentsSent++;
+        m.segmentBytesSent += size;
+        this.partnerDetails.put(client, m);
+    }
+
     public String getID() {
         return this.identifier;
     }
+
+    // helper
+
+    private void cleanUp() {
+        while (this.partnerDetails.size() >= MAX_CLIENT_STATISTICS) {
+            CommunicationPartnerMBean oldestEntry = oldestEntry();
+            if (oldestEntry == null) return;
+            log.info("housekeeping: removing statistics for " + 
oldestEntry.getName());
+            unregister(oldestEntry);
+            this.partnerDetails.remove(oldestEntry.getName());
+        }
+    }
+
+    private CommunicationPartnerMBean oldestEntry() {
+        CommunicationPartnerMBean ret = null;
+        for (CommunicationPartnerMBean m : this.partnerDetails.values()) {
+            if (ret == null || ret.lastSeen.after(m.lastSeen)) ret = m;
+        }
+        return ret;
+    }
 }

Modified: 
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java?rev=1622479&r1=1622478&r2=1622479&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java
 Thu Sep  4 14:00:27 2014
@@ -29,6 +29,8 @@ import io.netty.channel.socket.SocketCha
 import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.handler.codec.compression.SnappyFramedDecoder;
 import io.netty.handler.codec.string.StringEncoder;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
 import io.netty.handler.timeout.ReadTimeoutHandler;
 import io.netty.util.CharsetUtil;
 import io.netty.util.concurrent.DefaultEventExecutorGroup;
@@ -50,6 +52,7 @@ import org.slf4j.LoggerFactory;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 import javax.management.StandardMBean;
+import javax.net.ssl.SSLException;
 
 public final class FailoverClient implements FailoverStatusMBean, Runnable, 
Closeable {
     public static final String CLIENT_ID_PROPERTY_NAME = "failOverID";
@@ -59,6 +62,7 @@ public final class FailoverClient implem
 
     private final String host;
     private final int port;
+    private final boolean checkChecksums;
     private int readTimeoutMs = 10000;
 
     private final FailoverStore store;
@@ -66,13 +70,26 @@ public final class FailoverClient implem
     private FailoverClientHandler handler;
     private EventLoopGroup group;
     private EventExecutorGroup executor;
+    private SslContext sslContext;
     private boolean running;
     private String state;
 
-    public FailoverClient(String host, int port, SegmentStore store) {
+    public FailoverClient(String host, int port, SegmentStore store) throws 
SSLException {
+        this(host, port, store, false, true);
+    }
+
+    public FailoverClient(String host, int port, SegmentStore store, boolean 
secure) throws SSLException {
+        this(host, port, store, secure, true);
+    }
+
+    public FailoverClient(String host, int port, SegmentStore store, boolean 
secure, boolean checksums) throws SSLException {
         this.state = STATUS_INITIALIZING;
         this.host = host;
         this.port = port;
+        this.checkChecksums = checksums;
+        if (secure) {
+            this.sslContext = 
SslContext.newClientContext(InsecureTrustManagerFactory.INSTANCE);
+        }
         this.store = new FailoverStore(store);
         String s = System.getProperty(CLIENT_ID_PROPERTY_NAME);
         this.observer = new CommunicationObserver((s == null || s.length() == 
0) ? UUID.randomUUID().toString() : s);
@@ -130,11 +147,16 @@ public final class FailoverClient implem
             @Override
             public void initChannel(SocketChannel ch) throws Exception {
                 ChannelPipeline p = ch.pipeline();
+                if (sslContext != null) {
+                    p.addLast(sslContext.newHandler(ch.alloc()));
+                }
                 // WriteTimeoutHandler & ReadTimeoutHandler
                 p.addLast("readTimeoutHandler", new ReadTimeoutHandler(
                         readTimeoutMs, TimeUnit.MILLISECONDS));
                 p.addLast(new StringEncoder(CharsetUtil.UTF_8));
-                p.addLast(new SnappyFramedDecoder(true));
+                if (FailoverClient.this.checkChecksums) {
+                    p.addLast(new SnappyFramedDecoder(true));
+                }
                 p.addLast(new RecordIdDecoder(store));
                 p.addLast(executor, handler);
             }

Modified: 
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClientHandler.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClientHandler.java?rev=1622479&r1=1622478&r2=1622479&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClientHandler.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClientHandler.java
 Thu Sep  4 14:00:27 2014
@@ -19,16 +19,13 @@
 package org.apache.jackrabbit.oak.plugins.segment.failover.client;
 
 import static 
org.apache.jackrabbit.oak.plugins.segment.failover.codec.Messages.newGetHeadReq;
+
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.util.concurrent.DefaultEventExecutorGroup;
 import io.netty.util.concurrent.EventExecutorGroup;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
-import io.netty.util.concurrent.Promise;
 
 import java.io.Closeable;
-import java.net.InetSocketAddress;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.jackrabbit.oak.plugins.segment.RecordId;
@@ -53,8 +50,6 @@ public class FailoverClientHandler exten
 
     private ChannelHandlerContext ctx;
 
-    private Promise<RecordId> headPromise;
-
     public FailoverClientHandler(final FailoverStore store,
             EventExecutorGroup executor, CommunicationObserver observer) {
         this.store = store;
@@ -63,15 +58,17 @@ public class FailoverClientHandler exten
     }
 
     @Override
-    public void channelActive(ChannelHandlerContext ctx) {
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
         this.ctx = ctx;
-        sendHeadRequest();
+        log.debug("sending head request");
+        ctx.writeAndFlush(newGetHeadReq(this.observer.getID()));
+        log.debug("did send head request");
     }
 
     @Override
     protected void channelRead0(ChannelHandlerContext ctx, RecordId msg)
             throws Exception {
-        headPromise.setSuccess(msg);
+        setHead(msg);
     };
 
     @Override
@@ -79,28 +76,7 @@ public class FailoverClientHandler exten
         ctx.flush();
     }
 
-    private synchronized void sendHeadRequest() {
-        headPromise = ctx.executor().newPromise();
-        headPromise.addListener(new GenericFutureListener<Future<RecordId>>() {
-            @Override
-            public void operationComplete(Future<RecordId> future) {
-                if (future.isSuccess()) {
-                    try {
-                        setHead(future.get());
-                    } catch (Exception e) {
-                        exceptionCaught(ctx, e);
-                    }
-                } else {
-                    exceptionCaught(ctx, future.cause());
-                }
-            }
-        });
-        ctx.writeAndFlush(newGetHeadReq(this.observer.getID())).addListener(
-                new FailedRequestListener(headPromise));
-    }
-
     synchronized void setHead(RecordId head) {
-        headPromise = null;
 
         if (store.getHead().getRecordId().equals(head)) {
             // all sync'ed up
@@ -108,6 +84,8 @@ public class FailoverClientHandler exten
             ctx.close();
             return;
         }
+
+        log.debug("updating current head to " + head);
         ctx.pipeline().remove(RecordIdDecoder.class);
         ctx.pipeline().remove(this);
         ctx.pipeline().addLast(new SegmentDecoder(store));
@@ -123,12 +101,7 @@ public class FailoverClientHandler exten
 
         h1.channelActive(ctx);
         h2.channelActive(ctx);
-    }
-
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
-        log.error("Failed synchronizing state.", cause);
-        close();
+        log.debug("updating current head finished");
     }
 
     @Override

Modified: 
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentLoaderHandler.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentLoaderHandler.java?rev=1622479&r1=1622478&r2=1622479&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentLoaderHandler.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentLoaderHandler.java
 Thu Sep  4 14:00:27 2014
@@ -80,12 +80,13 @@ public class SegmentLoaderHandler extend
     public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
             throws Exception {
         if (evt instanceof SegmentReply) {
+            //log.debug("offering segment " + ((SegmentReply) 
evt).getSegment());
             segment.offer(((SegmentReply) evt).getSegment());
         }
     }
 
     private void initSync() {
-        log.debug("new head id " + head);
+        log.info("new head id " + head);
         long t = System.currentTimeMillis();
 
         try {
@@ -102,30 +103,33 @@ public class SegmentLoaderHandler extend
         } finally {
             close();
         }
+        log.info("returning initSync");
     }
 
     @Override
     public Segment readSegment(final SegmentId id) {
-        ctx.writeAndFlush(newGetSegmentReq(this.clientID, 
id)).addListener(reqListener);
+        ctx.writeAndFlush(newGetSegmentReq(this.clientID, id));
         return getSegment();
     }
 
-    private final ChannelFutureListener reqListener = new 
ChannelFutureListener() {
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+            throws Exception {
+        log.warn("Closing channel. Got exception: " + cause);
+        ctx.close();
+    }
 
-        @Override
-        public void operationComplete(ChannelFuture future) {
-            if (!future.isSuccess()) {
-                exceptionCaught(ctx, future.cause());
-            }
-        }
-    };
+    // implementation of RemoteSegmentLoader
 
     public Segment getSegment() {
         boolean interrupted = false;
         try {
             for (;;) {
                 try {
-                    return segment.poll(timeoutMs, TimeUnit.MILLISECONDS);
+                    log.debug("polling segment");
+                    Segment s = segment.poll(timeoutMs, TimeUnit.MILLISECONDS);
+                    log.debug("returning segment " + s);
+                    return s;
                 } catch (InterruptedException ignore) {
                     interrupted = true;
                 }
@@ -135,15 +139,9 @@ public class SegmentLoaderHandler extend
                 Thread.currentThread().interrupt();
             }
         }
-    }
 
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
-        log.error("Failed synchronizing state.", cause);
-        close();
     }
 
-    @Override
     public void close() {
         ctx.close();
         if (preloaderExecutor != null && !preloaderExecutor.isShuttingDown()) {
@@ -156,7 +154,6 @@ public class SegmentLoaderHandler extend
         }
     }
 
-    @Override
     public boolean isClosed() {
         return (loaderExecutor != null && (loaderExecutor.isShuttingDown() || 
loaderExecutor
                 .isShutdown()));

Modified: 
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentPreLoaderHandler.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentPreLoaderHandler.java?rev=1622479&r1=1622478&r2=1622479&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentPreLoaderHandler.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentPreLoaderHandler.java
 Thu Sep  4 14:00:27 2014
@@ -23,13 +23,18 @@ import io.netty.channel.SimpleChannelInb
 
 import org.apache.jackrabbit.oak.plugins.segment.Segment;
 import org.apache.jackrabbit.oak.plugins.segment.failover.codec.SegmentReply;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class SegmentPreLoaderHandler extends
         SimpleChannelInboundHandler<Segment> {
+    private static final Logger log = LoggerFactory
+            .getLogger(SegmentPreLoaderHandler.class);
 
     @Override
     protected void channelRead0(ChannelHandlerContext ctx, Segment msg)
             throws Exception {
+        log.info("fire new segment reply for " + msg.getSegmentId());
         ctx.fireUserEventTriggered(new SegmentReply(msg));
     }
 

Modified: 
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/RecordIdDecoder.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/RecordIdDecoder.java?rev=1622479&r1=1622478&r2=1622479&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/RecordIdDecoder.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/RecordIdDecoder.java
 Thu Sep  4 14:00:27 2014
@@ -19,6 +19,8 @@
 
 package org.apache.jackrabbit.oak.plugins.segment.failover.codec;
 
+import java.io.IOException;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
@@ -46,7 +48,7 @@ public class RecordIdDecoder extends Len
             throws Exception {
         ByteBuf frame = (ByteBuf) super.decode(ctx, in);
         if (frame == null) {
-            return null;
+            throw new IOException("Received unexpected empty frame. Maybe you 
have enabled secure transmission on only one endpoint of the connection.");
         }
         byte type = frame.readByte();
         frame.discardReadBytes();

Modified: 
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/jmx/ObservablePartnerMBean.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/jmx/ObservablePartnerMBean.java?rev=1622479&r1=1622478&r2=1622479&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/jmx/ObservablePartnerMBean.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/jmx/ObservablePartnerMBean.java
 Thu Sep  4 14:00:27 2014
@@ -39,4 +39,10 @@ public interface ObservablePartnerMBean 
     @CheckForNull
     @Description("Time the remote instance was last contacted")
     String getLastSeenTimestamp();
+
+    @Description("Number of transferred segments")
+    long getTransferredSegments();
+
+    @Description("Number of bytes stored in transferred segments")
+    long getTransferredSegmentBytes();
 }

Modified: 
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServer.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServer.java?rev=1622479&r1=1622478&r2=1622479&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServer.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServer.java
 Thu Sep  4 14:00:27 2014
@@ -30,10 +30,13 @@ import io.netty.channel.socket.nio.NioSe
 import io.netty.handler.codec.LineBasedFrameDecoder;
 import io.netty.handler.codec.compression.SnappyFramedEncoder;
 import io.netty.handler.codec.string.StringDecoder;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.util.SelfSignedCertificate;
 import io.netty.util.CharsetUtil;
 
 import java.io.Closeable;
 import java.lang.management.ManagementFactory;
+import java.security.cert.CertificateException;
 import java.util.concurrent.TimeUnit;
 
 import io.netty.util.concurrent.Future;
@@ -48,6 +51,7 @@ import org.slf4j.LoggerFactory;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 import javax.management.StandardMBean;
+import javax.net.ssl.SSLException;
 
 public class FailoverServer implements FailoverStatusMBean, Closeable {
 
@@ -61,16 +65,44 @@ public class FailoverServer implements F
     private final ServerBootstrap b;
     private final CommunicationObserver observer;
     private final FailoverServerHandler handler;
+    private SslContext sslContext;
     private ChannelFuture channelFuture;
     private boolean running;
 
-    public FailoverServer(int port, final SegmentStore store) {
-        this(port, store, null);
+    public FailoverServer(int port, final SegmentStore store)
+            throws CertificateException, SSLException {
+        this(port, store, null, false, true);
     }
 
-    public FailoverServer(int port, final SegmentStore store, String[] 
allowedClientIPRanges) {
+    public FailoverServer(int port, final SegmentStore store, boolean secure)
+            throws CertificateException, SSLException {
+        this(port, store, null, secure, true);
+    }
+
+    public FailoverServer(int port, final SegmentStore store, boolean secure, 
boolean useChecksums)
+            throws CertificateException, SSLException {
+        this(port, store, null, secure, useChecksums);
+    }
+
+    public FailoverServer(int port, final SegmentStore store, String[] 
allowedClientIPRanges)
+            throws CertificateException, SSLException {
+        this(port, store, allowedClientIPRanges, false, true);
+    }
+
+    public FailoverServer(int port, final SegmentStore store, String[] 
allowedClientIPRanges, boolean secure)
+            throws CertificateException, SSLException {
+        this(port, store, allowedClientIPRanges, secure, true);
+    }
+
+    public FailoverServer(int port, final SegmentStore store, String[] 
allowedClientIPRanges, boolean secure, final boolean checksums)
+            throws CertificateException, SSLException {
         this.port = port;
 
+        if (secure) {
+            SelfSignedCertificate ssc = new SelfSignedCertificate();
+            sslContext = SslContext.newServerContext(ssc.certificate(), 
ssc.privateKey());
+        }
+
         observer = new CommunicationObserver("master");
         handler = new FailoverServerHandler(store, observer, 
allowedClientIPRanges);
         bossGroup = new NioEventLoopGroup(1);
@@ -81,7 +113,7 @@ public class FailoverServer implements F
             jmxServer.registerMBean(new StandardMBean(this, 
FailoverStatusMBean.class), new ObjectName(this.getMBeanName()));
         }
         catch (Exception e) {
-            log.error("can register failover status mbean", e);
+            log.error("can't register failover status mbean", e);
         }
 
         b = new ServerBootstrap();
@@ -98,9 +130,14 @@ public class FailoverServer implements F
             @Override
             public void initChannel(SocketChannel ch) throws Exception {
                 ChannelPipeline p = ch.pipeline();
+                if (sslContext != null) {
+                    p.addLast(sslContext.newHandler(ch.alloc()));
+                }
                 p.addLast(new LineBasedFrameDecoder(8192));
                 p.addLast(new StringDecoder(CharsetUtil.UTF_8));
-                p.addLast(new SnappyFramedEncoder());
+                if (checksums) {
+                    p.addLast(new SnappyFramedEncoder());
+                }
                 p.addLast(new RecordIdEncoder());
                 p.addLast(new SegmentEncoder());
                 p.addLast(handler);

Modified: 
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServerHandler.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServerHandler.java?rev=1622479&r1=1622478&r2=1622479&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServerHandler.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServerHandler.java
 Thu Sep  4 14:00:27 2014
@@ -101,25 +101,25 @@ public class FailoverServerHandler exten
 
     @Override
     public void channelRegistered(io.netty.channel.ChannelHandlerContext ctx) 
throws java.lang.Exception {
-        state = "Channel registered";
+        state = "channel registered";
         super.channelRegistered(ctx);
     }
 
     @Override
     public void channelActive(io.netty.channel.ChannelHandlerContext ctx) 
throws java.lang.Exception {
-        state = "Channel active";
+        state = "channel active";
         super.channelActive(ctx);
     }
 
     @Override
     public void channelInactive(io.netty.channel.ChannelHandlerContext ctx) 
throws java.lang.Exception {
-        state = "Channel inactive";
+        state = "channel inactive";
         super.channelInactive(ctx);
     }
 
     @Override
     public void channelUnregistered(io.netty.channel.ChannelHandlerContext 
ctx) throws java.lang.Exception {
-        state = "Channel unregistered";
+        state = "channel unregistered";
         super.channelUnregistered(ctx);
     }
 
@@ -130,15 +130,18 @@ public class FailoverServerHandler exten
 
         String request = Messages.extractMessageFrom(payload);
         InetSocketAddress client = 
(InetSocketAddress)ctx.channel().remoteAddress();
+
         if (!clientAllowed(client)) {
             log.warn("Got request from client " + client + " which is not in 
the allowed ip ranges! Request will be ignored.");
         }
         else {
-            observer.gotMessageFrom(Messages.extractClientFrom(payload), 
request, client);
+            String clientID = Messages.extractClientFrom(payload);
+            observer.gotMessageFrom(clientID, request, client);
             if (Messages.GET_HEAD.equalsIgnoreCase(request)) {
                 RecordId r = headId();
                 if (r != null) {
                     ctx.writeAndFlush(r);
+                    log.debug("returning from head request");
                     return;
                 }
             } else if (request.startsWith(Messages.GET_SEGMENT)) {
@@ -148,24 +151,24 @@ public class FailoverServerHandler exten
 
                 Segment s = null;
 
-                for (int i = 0; i < 3; i++) {
+                for (int i = 0; i < 10; i++) {
                     try {
                         s = store.readSegment(new SegmentId(store.getTracker(),
                                 uuid.getMostSignificantBits(), uuid
                                 .getLeastSignificantBits()));
                     } catch (IllegalStateException e) {
                         // segment not found
-                        log.warn(e.getMessage());
-                    }
-                    if (s != null) {
-                        break;
-                    } else {
-                        TimeUnit.MILLISECONDS.sleep(500);
+                        log.info("waiting for segment. Got exception: " + 
e.getMessage());
+                        TimeUnit.MILLISECONDS.sleep(1000);
                     }
+                    if (s != null) break;
                 }
 
                 if (s != null) {
+                    log.info("sending segment" + sid + " to " + client);
                     ctx.writeAndFlush(s);
+                    observer.didSendSegmentBytes(clientID, s.size());
+                    log.debug("master returns from segment request");
                     return;
                 }
             } else {
@@ -184,6 +187,6 @@ public class FailoverServerHandler exten
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
         state = "exception occurred: " + cause.getMessage();
         log.error(cause.getMessage(), cause);
-        ctx.close();
+        ctx.fireExceptionCaught(cause);
     }
 }

Modified: 
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStore.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStore.java?rev=1622479&r1=1622478&r2=1622479&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStore.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStore.java
 Thu Sep  4 14:00:27 2014
@@ -29,9 +29,13 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentTracker;
+import 
org.apache.jackrabbit.oak.plugins.segment.failover.server.FailoverServer;
 import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class FailoverStore implements SegmentStore {
+    private static final Logger log = 
LoggerFactory.getLogger(FailoverStore.class);
 
     private final SegmentTracker tracker = new SegmentTracker(this);
 
@@ -65,7 +69,7 @@ public class FailoverStore implements Se
 
     @Override
     public Segment readSegment(SegmentId sid) {
-
+        log.info("shall read segment " + sid);
         Deque<SegmentId> ids = new ArrayDeque<SegmentId>();
         ids.offer(sid);
         int err = 0;
@@ -74,8 +78,10 @@ public class FailoverStore implements Se
         while (!ids.isEmpty()) {
             SegmentId id = ids.remove();
             if (!seen.contains(id) && !delegate.containsSegment(id)) {
+                log.debug("trying to read segment " + id);
                 Segment s = loader.readSegment(id);
                 if (s != null) {
+                    log.info("got segment " + id + " with size " + s.size());
                     ByteArrayOutputStream bout = new ByteArrayOutputStream(
                             s.size());
                     if (id.isDataSegmentId()) {
@@ -92,6 +98,7 @@ public class FailoverStore implements Se
                     ids.removeAll(seen);
                     err = 0;
                 } else {
+                    log.error("could NOT read segment " + id);
                     if (loader.isClosed() || err == 4) {
                         loader.close();
                         throw new IllegalStateException(
@@ -105,6 +112,7 @@ public class FailoverStore implements Se
             }
         }
 
+        log.info("calling delegate to return segment " + sid);
         return delegate.readSegment(sid);
     }
 

Modified: 
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStoreService.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStoreService.java?rev=1622479&r1=1622478&r2=1622479&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStoreService.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStoreService.java
 Thu Sep  4 14:00:27 2014
@@ -21,9 +21,12 @@ import static org.apache.felix.scr.annot
 import static org.apache.felix.scr.annotations.ReferencePolicyOption.GREEDY;
 
 import java.io.IOException;
+import java.security.cert.CertificateException;
 import java.util.Dictionary;
 import java.util.Hashtable;
 
+import javax.net.ssl.SSLException;
+
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.ConfigurationPolicy;
@@ -32,7 +35,7 @@ import org.apache.felix.scr.annotations.
 import org.apache.felix.scr.annotations.PropertyOption;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.jackrabbit.oak.commons.PropertiesUtil;
-import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStoreService;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentStoreProvider;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
 import 
org.apache.jackrabbit.oak.plugins.segment.failover.client.FailoverClient;
 import 
org.apache.jackrabbit.oak.plugins.segment.failover.server.FailoverServer;
@@ -73,8 +76,13 @@ public class FailoverStoreService {
     @Property(label = "Client allowed IP-Ranges", description = "accept 
incoming requests for these IP-ranges only")
     public static final String ALLOWED_CLIENT_IP_RANGES = 
"master.allowed-client-ip-ranges";
 
+    public static final boolean SECURE_DEFAULT = false;
+    @Property(label = "Secure", description = "Use secure connections", 
boolValue = SECURE_DEFAULT)
+    public static final String SECURE = "secure";
+
     @Reference(policy = STATIC, policyOption = GREEDY)
-    private NodeStore store = null;
+    private SegmentStoreProvider storeProvider = null;
+
     private SegmentStore segmentStore;
 
     private FailoverServer master = null;
@@ -83,13 +91,12 @@ public class FailoverStoreService {
     private ServiceRegistration syncReg = null;
 
     @Activate
-    private void activate(ComponentContext context) throws IOException {
-        if (store instanceof SegmentNodeStoreService) {
-            segmentStore = ((SegmentNodeStoreService) store).getSegmentStore();
+    private void activate(ComponentContext context) throws IOException, 
CertificateException {
+        if (storeProvider != null) {
+            segmentStore = storeProvider.getSegmentStore();
         } else {
             throw new IllegalArgumentException(
-                    "Unexpected NodeStore impl, expecting 
SegmentNodeStoreService, got "
-                            + store.getClass());
+                    "Missing SegmentStoreProvider service");
         }
         String mode = valueOf(context.getProperties().get(MODE));
         if (MODE_MASTER.equals(mode)) {
@@ -116,23 +123,25 @@ public class FailoverStoreService {
         }
     }
 
-    private void bootstrapMaster(ComponentContext context) {
+    private void bootstrapMaster(ComponentContext context) throws 
CertificateException, SSLException {
         Dictionary<?, ?> props = context.getProperties();
         int port = PropertiesUtil.toInteger(props.get(PORT), PORT_DEFAULT);
         String ipRanges = 
PropertiesUtil.toString(props.get(ALLOWED_CLIENT_IP_RANGES), 
ALLOWED_CLIENT_IP_RANGES_DEFAULT);
         String[] ranges = ipRanges == null ? null : ipRanges.split(",");
-        master = new FailoverServer(port, segmentStore, ranges);
+        boolean secure = PropertiesUtil.toBoolean(props.get(SECURE), 
SECURE_DEFAULT);
+        master = new FailoverServer(port, segmentStore, ranges, secure);
         master.start();
         log.info("started failover master on port {} with allowed ip ranges 
{}.", port, ipRanges);
     }
 
-    private void bootstrapSlave(ComponentContext context) {
+    private void bootstrapSlave(ComponentContext context) throws SSLException {
         Dictionary<?, ?> props = context.getProperties();
         int port = PropertiesUtil.toInteger(props.get(PORT), PORT_DEFAULT);
         long interval = PropertiesUtil.toInteger(props.get(INTERVAL), 
INTERVAL_DEFAULT);
         String host = PropertiesUtil.toString(props.get(MASTER_HOST), 
MASTER_HOST_DEFAULT);
+        boolean secure = PropertiesUtil.toBoolean(props.get(SECURE), 
SECURE_DEFAULT);
 
-        sync = new FailoverClient(host, port, segmentStore);
+        sync = new FailoverClient(host, port, segmentStore, secure);
         Dictionary<Object, Object> dictionary = new Hashtable<Object, 
Object>();
         dictionary.put("scheduler.period", interval);
         dictionary.put("scheduler.concurrent", false);

Added: 
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/NetworkErrorProxy.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/NetworkErrorProxy.java?rev=1622479&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/NetworkErrorProxy.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/NetworkErrorProxy.java
 Thu Sep  4 14:00:27 2014
@@ -0,0 +1,231 @@
+package org.apache.jackrabbit.oak.plugins.segment;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.*;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test helper: a TCP proxy that listens on a local port and relays traffic to
+ * a target host/port through a {@link ForwardHandler}. The handler can be told
+ * to drop bytes from the data relayed back to the client, which simulates a
+ * broken network.
+ */
+public class NetworkErrorProxy {
+    private static final Logger log = LoggerFactory
+            .getLogger(NetworkErrorProxy.class);
+
+    private final int inboundPort;
+    private final int outboundPort;
+    private final String host;
+    // future of the bound listening channel; stays null until run() has bound
+    private ChannelFuture f;
+
+    private ForwardHandler fh;
+
+    EventLoopGroup bossGroup = new NioEventLoopGroup();
+    EventLoopGroup workerGroup = new NioEventLoopGroup();
+
+    /**
+     * @param inboundPort  local port the proxy listens on
+     * @param outboundHost host the traffic is relayed to
+     * @param outboundPort port on {@code outboundHost} the traffic is relayed to
+     */
+    public NetworkErrorProxy(int inboundPort, String outboundHost, int outboundPort) {
+        this.inboundPort = inboundPort;
+        this.outboundPort = outboundPort;
+        this.host = outboundHost;
+        this.fh = new ForwardHandler(this.host, this.outboundPort);
+    }
+
+    /**
+     * Stores the skip settings on the current forward handler. The handler
+     * reads them when it sets up its outbound connection, so they only affect
+     * connections established afterwards.
+     *
+     * @param pos position after which bytes are dropped
+     * @param n   number of bytes to drop
+     */
+    public void skipBytes(int pos, int n) {
+        this.fh.skipPosition = pos;
+        this.fh.skipBytes = n;
+    }
+
+    /**
+     * Binds the listening socket. A failure is logged and not rethrown; in
+     * that case the proxy is simply not listening.
+     */
+    public void run() throws Exception {
+        try {
+            ServerBootstrap b = new ServerBootstrap();
+            b.group(bossGroup, workerGroup)
+                    .channel(NioServerSocketChannel.class)
+                    .childHandler(new ChannelInitializer<SocketChannel>() {
+                        @Override
+                        public void initChannel(SocketChannel ch) throws Exception {
+                            ch.pipeline().addLast(NetworkErrorProxy.this.fh);
+                        }
+                    });
+
+            f = b.bind(this.inboundPort).sync();
+        } catch (Exception e) {
+            log.warn("exception occurred", e);
+        }
+    }
+
+    /**
+     * Stops listening, replaces the forward handler by a fresh one without
+     * skip settings and binds the listening socket again.
+     */
+    public void reset() throws Exception {
+        if (f != null) {
+            // the operation is asynchronous: wait until the port is released,
+            // otherwise the bind in run() can fail and is only logged there
+            f.channel().disconnect().awaitUninterruptibly();
+        }
+        this.fh = new ForwardHandler(this.host, this.outboundPort);
+        run();
+    }
+
+    /**
+     * Closes the listening channel (if one was bound) and shuts down both
+     * event loop groups.
+     */
+    public void close() {
+        // f is null when run() was never called or its bind failed
+        if (f != null) {
+            f.channel().close();
+        }
+        if (bossGroup != null && !bossGroup.isShuttingDown()) {
+            bossGroup.shutdownGracefully(1, 2, TimeUnit.SECONDS).syncUninterruptibly();
+        }
+        if (workerGroup != null && !workerGroup.isShuttingDown()) {
+            workerGroup.shutdownGracefully(1, 2, TimeUnit.SECONDS).syncUninterruptibly();
+        }
+    }
+}
+
+/**
+ * Handler on the proxy's accepted connection. When the channel is registered
+ * it opens a connection to the target; everything read from the client is
+ * written to that connection, and a {@link SwallowingHandler} on the target
+ * connection relays the answers back to the client.
+ */
+class ForwardHandler extends ChannelInboundHandlerAdapter {
+    private static final Logger log = LoggerFactory
+            .getLogger(ForwardHandler.class);
+
+    private final String targetHost;
+    private final int targetPort;
+    public long transferredBytes;
+    public int skipPosition;
+    public int skipBytes;
+    private ChannelFuture remote;
+    // event loop group of the connection to the target; created in
+    // channelRegistered and released again in channelUnregistered
+    private EventLoopGroup group;
+
+    public ForwardHandler(String host, int port) {
+        this.targetHost = host;
+        this.targetPort = port;
+    }
+
+    /**
+     * Connects to the target and waits for the connection before the event is
+     * passed on. The skip settings are read when the target channel is
+     * initialized.
+     */
+    @Override
+    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
+        final ChannelHandlerContext c = ctx;
+        group = new NioEventLoopGroup();
+        Bootstrap cb = new Bootstrap();
+        cb.group(group);
+        cb.channel(NioSocketChannel.class);
+
+        cb.handler(new ChannelInitializer<SocketChannel>() {
+            @Override
+            public void initChannel(SocketChannel ch) throws Exception {
+                ch.pipeline().addFirst(new SwallowingHandler(c,
+                        ForwardHandler.this.skipPosition, ForwardHandler.this.skipBytes));
+            }
+        });
+        remote = cb.connect(this.targetHost, this.targetPort).sync();
+
+        ctx.fireChannelRegistered();
+    }
+
+    /**
+     * Closes the connection to the target and releases its event loop group.
+     */
+    @Override
+    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
+        // remote is still null when the connect in channelRegistered failed
+        if (remote != null) {
+            remote.channel().close();
+            remote = null;
+        }
+        if (group != null) {
+            // without this every proxied connection leaks its event loop threads
+            group.shutdownGracefully();
+            group = null;
+        }
+        ctx.fireChannelUnregistered();
+    }
+
+    /** Counts the readable bytes of the message and writes it to the target. */
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) {
+        if (msg instanceof ByteBuf) {
+            ByteBuf bb = (ByteBuf)msg;
+            this.transferredBytes += (bb.writerIndex() - bb.readerIndex());
+        }
+        remote.channel().write(msg);
+    }
+
+    @Override
+    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
+        remote.channel().flush();
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+        log.warn("exception occurred", cause);
+        ctx.close();
+    }
+}
+
+/**
+ * Inbound handler that writes every message it reads to the context of
+ * another channel and keeps a running total of the relayed bytes. All other
+ * inbound events are ignored; an exception closes the target context.
+ */
+class SendBackHandler implements ChannelInboundHandler {
+    private final ChannelHandlerContext target;
+    public long transferredBytes;
+
+    public SendBackHandler(ChannelHandlerContext ctx) {
+        this.target = ctx;
+    }
+
+    /**
+     * @return the number of readable bytes if {@code msg} is a ByteBuf,
+     *         0 for any other kind of message
+     */
+    public int messageSize(Object msg) {
+        if (!(msg instanceof ByteBuf)) {
+            // unknown
+            return 0;
+        }
+        return ((ByteBuf) msg).readableBytes();
+    }
+
+    // --- relaying ---------------------------------------------------------
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        this.transferredBytes += messageSize(msg);
+        this.target.write(msg);
+    }
+
+    @Override
+    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
+        this.target.flush();
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        cause.printStackTrace();
+        this.target.close();
+    }
+
+    // --- events that are deliberately left without any reaction -----------
+
+    @Override
+    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+    }
+
+    @Override
+    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
+    }
+
+    @Override
+    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
+    }
+
+    @Override
+    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
+    }
+
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+    }
+
+    @Override
+    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+    }
+
+    @Override
+    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
+    }
+
+}
+
+/**
+ * {@link SendBackHandler} that drops a configured number of bytes from the
+ * front of a relayed buffer once the configured position has been reached,
+ * and relays everything else unchanged.
+ */
+class SwallowingHandler extends SendBackHandler {
+    private int skipStartingPos;
+    private int nrOfBytes;
+
+    /**
+     * @param ctx             context the data is relayed to
+     * @param skipStartingPos position after which bytes are dropped
+     * @param numberOfBytes   number of bytes to drop; 0 disables dropping
+     */
+    public SwallowingHandler(ChannelHandlerContext ctx, int skipStartingPos, int numberOfBytes) {
+        super(ctx);
+        this.skipStartingPos = skipStartingPos;
+        this.nrOfBytes = numberOfBytes;
+    }
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        if (msg instanceof ByteBuf && this.nrOfBytes > 0) {
+            ByteBuf bb = (ByteBuf)msg;
+            if (this.transferredBytes >= this.skipStartingPos) {
+                // never skip more than the buffer holds (skipBytes would throw
+                // an IndexOutOfBoundsException); what is left over is dropped
+                // from the following buffers
+                int skipped = Math.min(this.nrOfBytes, bb.readableBytes());
+                bb.skipBytes(skipped);
+                this.nrOfBytes -= skipped;
+            }
+            else {
+                // NOTE(review): transferredBytes grows by the size of every
+                // relayed message while the start position shrinks by the same
+                // amount, so the skip seems to start earlier than the
+                // configured position - confirm whether that is intended
+                this.skipStartingPos -= messageSize(msg);
+            }
+        }
+        super.channelRead(ctx, msg);
+    }
+
+}
+

Added: 
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BrokenNetworkTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BrokenNetworkTest.java?rev=1622479&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BrokenNetworkTest.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BrokenNetworkTest.java
 Thu Sep  4 14:00:27 2014
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment.failover;
+
+import org.apache.jackrabbit.oak.plugins.segment.NetworkErrorProxy;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore;
+import 
org.apache.jackrabbit.oak.plugins.segment.failover.client.FailoverClient;
+import 
org.apache.jackrabbit.oak.plugins.segment.failover.server.FailoverServer;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static 
org.apache.jackrabbit.oak.plugins.segment.SegmentTestUtils.addTestContent;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * Runs a failover client against a failover server through a
+ * {@link NetworkErrorProxy}, with and without SSL and with and without bytes
+ * being dropped by the proxy.
+ */
+public class BrokenNetworkTest extends TestBase {
+    static final int PROXY_PORT = 4711;
+
+    @Before
+    public void setUp() throws Exception {
+        setUpServerAndTwoClients();
+    }
+
+    @After
+    public void after() {
+        closeServerAndTwoClients();
+    }
+
+    @Test
+    public void testProxy() throws Exception {
+        useProxy(false);
+    }
+
+    @Test
+    public void testProxySSL() throws Exception {
+        useProxy(true);
+    }
+
+    @Test
+    public void testProxySkippedBytes() throws Exception {
+        useProxy(false, 100, 1, false);
+    }
+
+    @Test
+    public void testProxySSLSkippedBytes() throws Exception {
+        useProxy(true, 400, 10, false);
+    }
+
+    @Test
+    public void testProxySkippedBytesIntermediateChange() throws Exception {
+        useProxy(false, 100, 1, true);
+    }
+
+    @Test
+    public void testProxySSLSkippedBytesIntermediateChange() throws Exception {
+        useProxy(true, 400, 10, true);
+    }
+
+    // private helper
+
+    private void useProxy(boolean ssl) throws Exception {
+        useProxy(ssl, 0, 0, false);
+    }
+
+    /**
+     * Syncs the client store from the server through the proxy. When bytes are
+     * skipped, the first sync is expected to leave the client store unchanged;
+     * the proxy is then reset and a second sync has to bring both stores to
+     * the same head.
+     *
+     * @param ssl                whether server and client use secure connections
+     * @param skipPosition       position after which the proxy drops bytes
+     * @param skipBytes          number of bytes the proxy drops; 0 for none
+     * @param intermediateChange whether the server content is changed between
+     *                           the failed and the repeated sync
+     */
+    private void useProxy(boolean ssl, int skipPosition, int skipBytes,
+            boolean intermediateChange) throws Exception {
+        NetworkErrorProxy p = new NetworkErrorProxy(PROXY_PORT, LOCALHOST, port);
+        p.skipBytes(skipPosition, skipBytes);
+        p.run();
+
+        FailoverServer server = null;
+        FailoverClient cl = null;
+        // everything after the proxy is up runs inside the try block, so that
+        // a failing start or sync cannot leave the ports bound for later tests
+        try {
+            NodeStore store = new SegmentNodeStore(storeS);
+            server = new FailoverServer(port, storeS, ssl);
+            server.start();
+            addTestContent(store, "server");
+            storeS.flush();  // this speeds up the test a little bit...
+
+            cl = new FailoverClient(LOCALHOST, PROXY_PORT, storeC, ssl);
+            cl.run();
+
+            if (skipBytes > 0) {
+                assertFalse("stores are not expected to be equal",
+                        storeS.getHead().equals(storeC.getHead()));
+                assertEquals(storeC2.getHead(), storeC.getHead());
+
+                p.reset();
+                if (intermediateChange) {
+                    addTestContent(store, "server2");
+                    storeS.flush();
+                }
+                cl.run();
+            }
+            assertEquals(storeS.getHead(), storeC.getHead());
+        } finally {
+            if (server != null) {
+                server.close();
+            }
+            if (cl != null) {
+                cl.close();
+            }
+            p.close();
+        }
+    }
+}

Added: 
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BulkTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BulkTest.java?rev=1622479&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BulkTest.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BulkTest.java
 Thu Sep  4 14:00:27 2014
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment.failover;
+
+import junit.framework.Assert;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore;
+import 
org.apache.jackrabbit.oak.plugins.segment.failover.client.FailoverClient;
+import 
org.apache.jackrabbit.oak.plugins.segment.failover.jmx.FailoverStatusMBean;
+import 
org.apache.jackrabbit.oak.plugins.segment.failover.server.FailoverServer;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+import java.util.Set;
+
+import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Bulk transfer tests: fills the server store with a configurable number of
+ * nodes, lets a single failover client sync once and verifies that both stores
+ * end up with the same head. The number of transferred segments and bytes is
+ * read from the connection's JMX status bean and printed.
+ */
+public class BulkTest extends TestBase {
+
+    @Before
+    public void setUp() throws Exception {
+        setUpServerAndClient();
+    }
+
+    @After
+    public void after() {
+        closeServerAndClient();
+    }
+
+    @Test
+    public void test100Nodes() throws Exception {
+        test(100, 1, 1, 3000, 3100);
+    }
+
+    @Test
+    public void test1000Nodes() throws Exception {
+        test(1000, 1, 1, 53000, 55000);
+    }
+
+    @Test
+    public void test10000Nodes() throws Exception {
+        test(10000, 1, 1, 245000, 246000);
+    }
+
+    @Test
+    public void test100000Nodes() throws Exception {
+        test(100000, 9, 9, 2210000, 2220000);
+    }
+
+    @Test
+    public void test1MillionNodes() throws Exception {
+        test(1000000, 87, 87, 22700000, 22800000);
+    }
+
+    @Test
+    public void test1MillionNodesNoChecksum() throws Exception {
+        test(1000000, 87, 87, 22700000, 22800000, false, false);
+    }
+
+    @Test
+    public void test1MillionNodesUsingSSL() throws Exception {
+        test(1000000, 87, 87, 22700000, 22800000, true, true);
+    }
+
+    @Test
+    public void test1MillionNodesUsingSSLNoChecksum() throws Exception {
+        test(1000000, 87, 87, 22700000, 22800000, true, false);
+    }
+/*
+    @Test
+    public void test10MillionNodes() throws Exception {
+        test(10000000, 856, 856, 223000000, 224000000);
+    }
+*/
+
+    // private helper
+
+    /**
+     * Runs the bulk test without SSL and with checksums enabled.
+     */
+    private void test(int number, int minExpectedSegments, int maxExpectedSegments,
+                      long minExpectedBytes, long maxExpectedBytes) throws Exception {
+        test(number, minExpectedSegments, maxExpectedSegments, minExpectedBytes, maxExpectedBytes, false, true);
+    }
+
+    /**
+     * Creates the test content on the server store, syncs one client and checks
+     * that the client head equals the server head afterwards.
+     *
+     * @param number              requested number of nodes (see the note on the creation loop)
+     * @param minExpectedSegments lower bound for transferred segments (currently not asserted)
+     * @param maxExpectedSegments upper bound for transferred segments (currently not asserted)
+     * @param minExpectedBytes    lower bound for transferred bytes (currently not asserted)
+     * @param maxExpectedBytes    upper bound for transferred bytes (currently not asserted)
+     * @param useSSL              passed to the server and client constructors
+     * @param useChecksum         passed to the server and client constructors
+     * @throws Exception if content creation, the transfer or a JMX lookup fails
+     */
+    private void test(int number, int minExpectedSegments, int maxExpectedSegments,
+                      long minExpectedBytes, long maxExpectedBytes,
+                      boolean useSSL, boolean useChecksum) throws Exception {
+        NodeStore store = new SegmentNodeStore(storeS);
+        NodeBuilder rootbuilder = store.getRoot().builder();
+        NodeBuilder b = rootbuilder.child("store");
+        // NOTE(review): the outer bound is inclusive, so for number >= 1000 this
+        // creates (number / 1000 + 1) folders with 1000 nodes each, i.e. 1000
+        // nodes more than requested. Left unchanged because the expected byte
+        // ranges passed in by the tests may have been calibrated against it - confirm.
+        int nodesPerFolder = number < 1000 ? number : 1000;
+        for (int j = 0; j <= number / 1000; j++) {
+            NodeBuilder builder = b.child("Folder#" + j);
+            for (int i = 0; i < nodesPerFolder; i++) {
+                builder.child("Test#" + i).setProperty("ts", System.currentTimeMillis());
+            }
+        }
+        store.merge(rootbuilder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+        storeS.flush();
+
+        // Remember the client id so that this test does not leak its own id
+        // into other tests running in the same JVM.
+        final String previousClientId = System.getProperty(FailoverClient.CLIENT_ID_PROPERTY_NAME);
+        FailoverServer server = null;
+        FailoverClient cl = null;
+        try {
+            server = new FailoverServer(port, storeS, useSSL, useChecksum);
+            server.start();
+
+            System.setProperty(FailoverClient.CLIENT_ID_PROPERTY_NAME, "Bar");
+            cl = new FailoverClient("127.0.0.1", port, storeC, useSSL, useChecksum);
+
+            final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer();
+            ObjectName status = new ObjectName(FailoverStatusMBean.JMX_NAME + ",id=*");
+            ObjectName clientStatus = new ObjectName(cl.getMBeanName());
+            ObjectName serverStatus = new ObjectName(server.getMBeanName());
+
+            long start = System.currentTimeMillis();
+            cl.run();
+
+            Set<ObjectName> instances = jmxServer.queryNames(status, null);
+            assertEquals(3, instances.size());
+
+            // The bean that is neither the client's nor the server's own status
+            // bean is taken to be the per-connection bean.
+            ObjectName connectionStatus = null;
+            for (ObjectName s : instances) {
+                if (!s.equals(clientStatus) && !s.equals(serverStatus)) {
+                    connectionStatus = s;
+                }
+            }
+            assertNotNull(connectionStatus);
+
+            long segments = ((Long) jmxServer.getAttribute(connectionStatus, "TransferredSegments")).longValue();
+            long bytes = ((Long) jmxServer.getAttribute(connectionStatus, "TransferredSegmentBytes")).longValue();
+
+            System.out.println("did transfer " + segments + " segments with " + bytes + " bytes in "
+                    + (System.currentTimeMillis() - start) / 1000 + " seconds.");
+
+            assertEquals(storeS.getHead(), storeC.getHead());
+
+            //compare(segments, "segment", minExpectedSegments, maxExpectedSegments);
+            //compare(bytes, "byte", minExpectedBytes, maxExpectedBytes);
+
+        } finally {
+            // Close in the original order (server, then client); each step runs
+            // even if the previous one throws.
+            try {
+                if (server != null) {
+                    server.close();
+                }
+            } finally {
+                try {
+                    if (cl != null) {
+                        cl.close();
+                    }
+                } finally {
+                    if (previousClientId == null) {
+                        System.clearProperty(FailoverClient.CLIENT_ID_PROPERTY_NAME);
+                    } else {
+                        System.setProperty(FailoverClient.CLIENT_ID_PROPERTY_NAME, previousClientId);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Asserts that {@code current} lies within {@code [expectedMin, expectedMax]}.
+     * Currently only referenced from the commented-out checks in {@code test}.
+     */
+    private void compare(long current, String unit, long expectedMin, long expectedMax) {
+        assertTrue("current number of " + unit + "s (" + current + ") is less than minimum expected: " + expectedMin, current >= expectedMin);
+        assertTrue("current number of " + unit + "s (" + current + ") is bigger than maximum expected: " + expectedMax, current <= expectedMax);
+    }
+}

Modified: 
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverIPRangeTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverIPRangeTest.java?rev=1622479&r1=1622478&r2=1622479&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverIPRangeTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverIPRangeTest.java
 Thu Sep  4 14:00:27 2014
@@ -48,18 +48,32 @@ public class FailoverIPRangeTest extends
     public void testFailoverAllClients() throws Exception {
         createTestWithConfig(null, true);
     }
-    @Test
 
+    @Test
     public void testFailoverLocalClient() throws Exception {
         createTestWithConfig(new String[]{"127.0.0.1"}, true);
     }
 
     @Test
+    public void testFailoverLocalClientUseIPv6() throws Exception {
+        if (!noDualStackSupport) {
+            createTestWithConfig("::1", new String[]{"::1"}, true);
+        }
+    }
+
+    @Test
     public void testFailoverWrongClient() throws Exception {
         createTestWithConfig(new String[]{"127.0.0.2"}, false);
     }
 
     @Test
+    public void testFailoverWrongClientIPv6() throws Exception {
+        if (!noDualStackSupport) {
+            createTestWithConfig(new String[]{"::2"}, false);
+        }
+    }
+
+    @Test
     public void testFailoverLocalhost() throws Exception {
         createTestWithConfig(new String[]{"localhost"}, true);
     }
@@ -95,17 +109,50 @@ public class FailoverIPRangeTest extends
     }
 
     @Test
+    public void testFailoverCorrectListIPv6() throws Exception {
+        if (!noDualStackSupport) {
+            createTestWithConfig(new String[]{"foobar", "122-126", "::1", 
"126.0.0.1", "127.0.0.0-127.255.255.255"}, true);
+        }
+    }
+
+    @Test
     public void testFailoverWrongList() throws Exception {
-        createTestWithConfig(new String[]{"foobar","126.0.0.1", 
"128.0.0.1-255.255.255.255", "128.0.0.0-127.255.255.255"}, false);
+        createTestWithConfig(new String[]{"foobar", "126.0.0.1", "::2", 
"128.0.0.1-255.255.255.255", "128.0.0.0-127.255.255.255"}, false);
+    }
+
+    @Test
+    public void testFailoverCorrectListUseIPv6() throws Exception {
+        if (!noDualStackSupport) {
+            createTestWithConfig("::1", new String[]{"foobar","127-128", 
"0:0:0:0:0:0:0:1", "126.0.0.1", "127.0.0.0-127.255.255.255"}, true);
+        }
+    }
+
+    @Test
+    public void testFailoverCorrectListIPv6UseIPv6() throws Exception {
+        if (!noDualStackSupport) {
+            createTestWithConfig("::1", new String[]{"foobar", "122-126", 
"::1", "126.0.0.1", "127.0.0.0-127.255.255.255"}, true);
+        }
+    }
+
+    @Test
+    public void testFailoverWrongListUseIPv6() throws Exception {
+        if (!noDualStackSupport) {
+            createTestWithConfig("::1", new String[]{"foobar", "126.0.0.1", 
"::2", "128.0.0.1-255.255.255.255", "128.0.0.0-127.255.255.255"}, false);
+        }
     }
 
     private void createTestWithConfig(String[] ipRanges, boolean 
expectedToWork) throws Exception {
+        createTestWithConfig("127.0.0.1", ipRanges, expectedToWork);
+    }
+
+    private void createTestWithConfig(String host, String[] ipRanges, boolean 
expectedToWork) throws Exception {
         NodeStore store = new SegmentNodeStore(storeS);
         final FailoverServer server = new FailoverServer(port, storeS, 
ipRanges);
         server.start();
         addTestContent(store, "server");
+        storeS.flush();  // this speeds up the test a little bit...
 
-        FailoverClient cl = new FailoverClient("127.0.0.1", port, storeC);
+        FailoverClient cl = new FailoverClient(host, port, storeC);
         cl.run();
 
         try {

Modified: 
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverMultipleClientsTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverMultipleClientsTest.java?rev=1622479&r1=1622478&r2=1622479&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverMultipleClientsTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverMultipleClientsTest.java
 Thu Sep  4 14:00:27 2014
@@ -25,7 +25,6 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
 
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -50,6 +49,7 @@ public class FailoverMultipleClientsTest
         final FailoverServer server = new FailoverServer(port, storeS);
         server.start();
         SegmentTestUtils.addTestContent(store, "server");
+        storeS.flush();  // this speeds up the test a little bit...
 
         FailoverClient cl1 = new FailoverClient("127.0.0.1", port, storeC);
         FailoverClient cl2 = new FailoverClient("127.0.0.1", port, storeC2);
@@ -70,7 +70,7 @@ public class FailoverMultipleClientsTest
             cl2.run();
 
             assertEquals(storeS.getHead(), storeC2.getHead());
-            Assert.assertFalse("first client updated in stopped state!", 
storeS.getHead().equals(storeC.getHead()));
+            assertFalse("first client updated in stopped state!", 
storeS.getHead().equals(storeC.getHead()));
 
             cl1.start();
             assertEquals(storeS.getHead(), storeC.getHead());

Added: 
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverSslTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverSslTest.java?rev=1622479&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverSslTest.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverSslTest.java
 Thu Sep  4 14:00:27 2014
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment.failover;
+
+import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore;
+import 
org.apache.jackrabbit.oak.plugins.segment.failover.client.FailoverClient;
+import 
org.apache.jackrabbit.oak.plugins.segment.failover.server.FailoverServer;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static 
org.apache.jackrabbit.oak.plugins.segment.SegmentTestUtils.addTestContent;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * Checks that a failover sync only succeeds when server and client agree on
+ * whether the connection is secured.
+ */
+public class FailoverSslTest extends TestBase {
+
+    @Before
+    public void setUp() throws Exception {
+        setUpServerAndClient();
+    }
+
+    @After
+    public void after() {
+        closeServerAndClient();
+    }
+
+    /** Secure server and secure client: the stores must be in sync after one run. */
+    @Test
+    public void testFailoverSecure() throws Exception {
+        syncOnce(true, true, true);
+    }
+
+    /** Secure server but plain client: the client store must not be updated. */
+    @Test
+    public void testFailoverSecureServerPlainClient() throws Exception {
+        syncOnce(true, false, false);
+    }
+
+    /** Plain server but secure client: the client store must not be updated. */
+    @Test
+    public void testFailoverPlainServerSecureClient() throws Exception {
+        syncOnce(false, true, false);
+    }
+
+    /**
+     * Starts a server, writes test content to the server store, runs one client
+     * sync and compares the heads of both stores.
+     *
+     * @param secureServer     {@code true} to create the server with the secure flag,
+     *                         {@code false} to use the constructor without that flag
+     * @param secureClient     {@code true} to create the client with the secure flag,
+     *                         {@code false} to use the constructor without that flag
+     * @param expectEqualHeads whether the store heads are expected to be equal after the run
+     * @throws Exception if starting the server or running the client fails
+     */
+    private void syncOnce(boolean secureServer, boolean secureClient, boolean expectEqualHeads) throws Exception {
+        NodeStore store = new SegmentNodeStore(storeS);
+        final FailoverServer server = secureServer
+                ? new FailoverServer(port, storeS, true)
+                : new FailoverServer(port, storeS);
+        FailoverClient cl = null;
+        // The server is closed even if the set-up below fails, so that its port
+        // is free again for the next test.
+        try {
+            server.start();
+            addTestContent(store, "server");
+            storeS.flush();  // this speeds up the test a little bit...
+
+            cl = secureClient
+                    ? new FailoverClient(LOCALHOST, port, storeC, true)
+                    : new FailoverClient(LOCALHOST, port, storeC);
+            cl.run();
+
+            if (expectEqualHeads) {
+                assertEquals(storeS.getHead(), storeC.getHead());
+            } else {
+                assertFalse("stores are equal but shouldn't!", storeS.getHead().equals(storeC.getHead()));
+            }
+        } finally {
+            try {
+                server.close();
+            } finally {
+                if (cl != null) {
+                    cl.close();
+                }
+            }
+        }
+    }
+}

Modified: 
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/MBeanTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/MBeanTest.java?rev=1622479&r1=1622478&r2=1622479&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/MBeanTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/MBeanTest.java
 Thu Sep  4 14:00:27 2014
@@ -31,9 +31,7 @@ import javax.management.ObjectName;
 import java.lang.management.ManagementFactory;
 import java.util.Set;
 
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertTrue;
-import static junit.framework.Assert.fail;
+import static junit.framework.Assert.*;
 
 public class MBeanTest extends TestBase {
 
@@ -63,7 +61,7 @@ public class MBeanTest extends TestBase 
 
             assertEquals("master", jmxServer.getAttribute(status, "Mode"));
             String m = jmxServer.getAttribute(status, "Status").toString();
-            if (!m.equals(FailoverStatusMBean.STATUS_STARTING) && 
!m.equals("Channel unregistered"))
+            if (!m.equals(FailoverStatusMBean.STATUS_STARTING) && 
!m.equals("channel unregistered"))
                 fail("unexpected Status" + m);
 
             assertEquals(FailoverStatusMBean.STATUS_STARTING, 
jmxServer.getAttribute(status, "Status"));
@@ -150,8 +148,15 @@ public class MBeanTest extends TestBase 
             Set<ObjectName> instances = jmxServer.queryNames(status, null);
             assertEquals(3, instances.size());
 
+            ObjectName connectionStatus = null;
+            for (ObjectName s : instances) {
+                if (!s.equals(clientStatus) && !s.equals(serverStatus)) 
connectionStatus = s;
+            }
+            assertNotNull(connectionStatus);
+
             assertTrue(jmxServer.isRegistered(clientStatus));
             assertTrue(jmxServer.isRegistered(serverStatus));
+            assertTrue(jmxServer.isRegistered(connectionStatus));
 
             String m = jmxServer.getAttribute(clientStatus, "Mode").toString();
             if (!m.startsWith("client: ")) fail("unexpected mode " + m);
@@ -161,11 +166,14 @@ public class MBeanTest extends TestBase 
             assertEquals(true, jmxServer.getAttribute(serverStatus, 
"Running"));
             assertEquals(true, jmxServer.getAttribute(clientStatus, 
"Running"));
 
+            assertEquals(new Long(2), jmxServer.getAttribute(connectionStatus, 
"TransferredSegments"));
+            assertEquals(new Long(128), 
jmxServer.getAttribute(connectionStatus, "TransferredSegmentBytes"));
+
             // stop the master
             jmxServer.invoke(serverStatus, "stop", null, null);
             assertEquals(false, jmxServer.getAttribute(serverStatus, 
"Running"));
             m = jmxServer.getAttribute(serverStatus, "Status").toString();
-            if (!m.equals(FailoverStatusMBean.STATUS_STOPPED) && 
!m.equals("Channel unregistered"))
+            if (!m.equals(FailoverStatusMBean.STATUS_STOPPED) && 
!m.equals("channel unregistered"))
                 fail("unexpected Status" + m);
 
             // restart the master
@@ -173,7 +181,7 @@ public class MBeanTest extends TestBase 
             assertEquals(true, jmxServer.getAttribute(serverStatus, 
"Running"));
             assertEquals(true, jmxServer.getAttribute(clientStatus, 
"Running"));
             m = jmxServer.getAttribute(serverStatus, "Status").toString();
-            if (!m.equals(FailoverStatusMBean.STATUS_STARTING) && 
!m.equals("Channel unregistered"))
+            if (!m.equals(FailoverStatusMBean.STATUS_STARTING) && 
!m.equals("channel unregistered"))
                 fail("unexpected Status" + m);
 
             // stop the slave

Modified: 
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/RecoverTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/RecoverTest.java?rev=1622479&r1=1622478&r2=1622479&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/RecoverTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/RecoverTest.java
 Thu Sep  4 14:00:27 2014
@@ -70,4 +70,28 @@ public class RecoverTest extends TestBas
         }
 
     }
+
+    /**
+     * The client store already contains local content that differs from the
+     * server's; after one client run the client head must equal the server head.
+     */
+    @Test
+    public void testLocalChanges() throws Exception {
+
+        NodeStore store = new SegmentNodeStore(storeC);
+        addTestContent(store, "client");
+
+        final FailoverServer server = new FailoverServer(port, storeS);
+        FailoverClient cl = null;
+        // The server is closed even if starting it or writing content fails, so
+        // that its port does not stay bound for the following tests.
+        try {
+            server.start();
+            store = new SegmentNodeStore(storeS);
+            addTestContent(store, "server");
+            storeS.flush();
+
+            cl = new FailoverClient("127.0.0.1", port, storeC);
+            assertFalse("stores are not expected to be equal", storeS.getHead().equals(storeC.getHead()));
+            cl.run();
+            assertEquals(storeS.getHead(), storeC.getHead());
+        } finally {
+            try {
+                server.close();
+            } finally {
+                if (cl != null) {
+                    cl.close();
+                }
+            }
+        }
+
+    }
 }

Modified: 
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/TestBase.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/TestBase.java?rev=1622479&r1=1622478&r2=1622479&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/TestBase.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/TestBase.java
 Thu Sep  4 14:00:27 2014
@@ -19,6 +19,7 @@
 package org.apache.jackrabbit.oak.plugins.segment.failover;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.SystemUtils;
 import org.apache.jackrabbit.oak.plugins.segment.file.FileStore;
 
 import java.io.File;
@@ -28,6 +29,7 @@ import static org.apache.jackrabbit.oak.
 
 public class TestBase {
     int port = Integer.valueOf(System.getProperty("failover.server.port", 
"52808"));
+    final static String LOCALHOST = "127.0.0.1";
 
     File directoryS;
     FileStore storeS;
@@ -38,6 +40,12 @@ public class TestBase {
     File directoryC2;
     FileStore storeC2;
 
+    /*
+     Java 6 on Windows doesn't support dual IP stacks, so we will skip our IPv6
+     tests.
+    */
+    protected final boolean noDualStackSupport = SystemUtils.IS_OS_WINDOWS && 
SystemUtils.IS_JAVA_1_6;
+
     public void setUpServerAndClient() throws IOException {
         // server
         directoryS = createTmpTargetDir("FailoverServerTest");


Reply via email to