Author: baedke
Date: Mon Sep 15 10:37:25 2014
New Revision: 1624994

URL: http://svn.apache.org/r1624994
Log:
OAK-1915: TarMK failover 2.0

Added test cases, improved client recovery from crashes, tweaked OSGi config 
handling.

Modified:
    jackrabbit/oak/trunk/oak-core/pom.xml
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
    
jackrabbit/oak/trunk/oak-tarmk-failover/osgi-conf/master/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg
    
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentLoaderHandler.java
    
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStoreService.java
    
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/NetworkErrorProxy.java
    
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BrokenNetworkTest.java
    
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BulkTest.java
    
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/MBeanTest.java

Modified: jackrabbit/oak/trunk/oak-core/pom.xml
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/pom.xml?rev=1624994&r1=1624993&r2=1624994&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/pom.xml (original)
+++ jackrabbit/oak/trunk/oak-core/pom.xml Mon Sep 15 10:37:25 2014
@@ -77,6 +77,7 @@
               org.apache.jackrabbit.oak.plugins.observation.filter,
               org.apache.jackrabbit.oak.plugins.segment,
               org.apache.jackrabbit.oak.plugins.segment.http,
+              org.apache.jackrabbit.oak.plugins.segment.file,
               org.apache.jackrabbit.oak.plugins.value,
               org.apache.jackrabbit.oak.plugins.version,
               org.apache.jackrabbit.oak.spi.commit,

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java?rev=1624994&r1=1624993&r2=1624994&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
 Mon Sep 15 10:37:25 2014
@@ -669,7 +669,7 @@ public class FileStore implements Segmen
             }
         }
 
-        throw new IllegalStateException("Segment " + id + " not found");
+        throw new FileStoreCorruptException(id);
     }
 
     @Override
@@ -731,4 +731,13 @@ public class FileStore implements Segmen
         this.pauseCompaction = pauseCompaction;
         return this;
     }
+
+    public class FileStoreCorruptException extends IllegalStateException {
+        public final SegmentId id;
+
+        public FileStoreCorruptException(SegmentId id) {
+            super("Segment " + id + " not found");
+            this.id = id;
+        }
+    }
 }

Modified: 
jackrabbit/oak/trunk/oak-tarmk-failover/osgi-conf/master/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/osgi-conf/master/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg?rev=1624994&r1=1624993&r2=1624994&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-tarmk-failover/osgi-conf/master/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg
 (original)
+++ 
jackrabbit/oak/trunk/oak-tarmk-failover/osgi-conf/master/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg
 Mon Sep 15 10:37:25 2014
@@ -1,2 +1 @@
 mode=master
-master.allowed-client-ip-ranges=localhost,127.0.0.1,::1,192.168.0.0-192.168.255.255
\ No newline at end of file

Modified: 
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentLoaderHandler.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentLoaderHandler.java?rev=1624994&r1=1624993&r2=1624994&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentLoaderHandler.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentLoaderHandler.java
 Mon Sep 15 10:37:25 2014
@@ -19,12 +19,12 @@
 package org.apache.jackrabbit.oak.plugins.segment.failover.client;
 
 import static 
org.apache.jackrabbit.oak.plugins.segment.failover.codec.Messages.newGetSegmentReq;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.util.concurrent.EventExecutorGroup;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -37,6 +37,7 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.segment.failover.codec.SegmentReply;
 import org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStore;
 import 
org.apache.jackrabbit.oak.plugins.segment.failover.store.RemoteSegmentLoader;
+import org.apache.jackrabbit.oak.plugins.segment.file.FileStore;
 import org.apache.jackrabbit.oak.spi.state.ApplyDiff;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -95,15 +96,39 @@ public class SegmentLoaderHandler extend
             SegmentNodeBuilder builder = before.builder();
 
             SegmentNodeState current = new SegmentNodeState(head);
-            current.compareAgainstBaseState(before, new ApplyDiff(builder));
-
+            do {
+                try {
+                    current.compareAgainstBaseState(before, new 
ApplyDiff(builder));
+                    break;
+                }
+                catch (FileStore.FileStoreCorruptException e) {
+                    // the segment is locally damaged or not present anymore
+                    // lets try to read this from the master again
+                    Segment s = readSegment(e.id);
+                    if (s == null) {
+                        log.warn("can't read locally corrupt segment " + e.id 
+ " from master");
+                        throw e;
+                    }
+
+                    log.info("did reread locally corrupt segment " + e.id + " 
with size " + s.size());
+                    ByteArrayOutputStream bout = new 
ByteArrayOutputStream(s.size());
+                    try {
+                        s.writeTo(bout);
+                    }
+                    catch (IOException f) {
+                        log.error("can't wrap segment to output stream", f);
+                        throw e;
+                    }
+                    store.writeSegment(e.id, bout.toByteArray(), 0, s.size());
+                }
+            } while(true);
             boolean ok = store.setHead(before, builder.getNodeState());
             log.info("#updated state (set head {}) in {}ms.", ok,
                     System.currentTimeMillis() - t);
         } finally {
             close();
         }
-        log.info("returning initSync");
+        log.debug("returning initSync");
     }
 
     @Override

Modified: 
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStoreService.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStoreService.java?rev=1624994&r1=1624993&r2=1624994&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStoreService.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStoreService.java
 Mon Sep 15 10:37:25 2014
@@ -39,13 +39,13 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
 import 
org.apache.jackrabbit.oak.plugins.segment.failover.client.FailoverClient;
 import 
org.apache.jackrabbit.oak.plugins.segment.failover.server.FailoverServer;
-import org.apache.jackrabbit.oak.spi.state.NodeStore;
 import org.osgi.framework.ServiceRegistration;
 import org.osgi.service.component.ComponentContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
+@Property(name = "org.apache.sling.installer.configuration.persist", 
label="Distribute config", description = "Should be always disabled to avoid 
storing the configuration in the repository", boolValue = false)
 @Component(policy = ConfigurationPolicy.REQUIRE)
 public class FailoverStoreService {
 
@@ -55,29 +55,31 @@ public class FailoverStoreService {
     private static final String MODE_SLAVE = "slave";
 
     public static final String MODE_DEFAULT = MODE_MASTER;
-    @Property(label = "Mode", description = "TarMK Failover mode (master or 
slave)", options = {
-            @PropertyOption(name = "master", value = "master"),
-            @PropertyOption(name = "slave", value = "slave") }, value = 
MODE_DEFAULT)
+    @Property(name = "Mode", description = "TarMK Cold Standby mode (master or 
slave)",
+            options = {
+            @PropertyOption(name = MODE_MASTER, value = MODE_MASTER),
+            @PropertyOption(name = MODE_SLAVE, value = MODE_SLAVE) },
+            value = MODE_DEFAULT)
     public static final String MODE = "mode";
 
     public static final int PORT_DEFAULT = 8023;
-    @Property(label = "Port", description = "TarMK Failover port", intValue = 
PORT_DEFAULT)
+    @Property(name = "Port", description = "TCP/IP port to use", intValue = 
PORT_DEFAULT)
     public static final String PORT = "port";
 
     public static final String MASTER_HOST_DEFAULT = "127.0.0.1";
-    @Property(label = "Master Host", description = "TarMK Failover master host 
(enabled for slave mode only)", value = MASTER_HOST_DEFAULT)
+    @Property(name = "Master Host", description = "Master host (slave mode 
only)", value = MASTER_HOST_DEFAULT)
     public static final String MASTER_HOST = "master.host";
 
     public static final int INTERVAL_DEFAULT = 5;
-    @Property(label = "Sync interval (seconds)", description = "TarMK Failover 
sync interval (seconds)", intValue = INTERVAL_DEFAULT)
+    @Property(name = "Sync interval (seconds)", description = "Sync interval 
in seconds (slave mode only)", intValue = INTERVAL_DEFAULT)
     public static final String INTERVAL = "interval";
 
     public static final String ALLOWED_CLIENT_IP_RANGES_DEFAULT = null;
-    @Property(label = "Client allowed IP-Ranges", description = "accept 
incoming requests for these IP-ranges only")
+    @Property(name = "Allowed IP-Ranges", description = "Accept incoming 
requests for these host names and IP-ranges only (master mode only)")
     public static final String ALLOWED_CLIENT_IP_RANGES = 
"master.allowed-client-ip-ranges";
 
     public static final boolean SECURE_DEFAULT = false;
-    @Property(label = "Secure", description = "Use secure connections", 
boolValue = SECURE_DEFAULT)
+    @Property(name = "Secure", description = "Use secure connections", 
boolValue = SECURE_DEFAULT)
     public static final String SECURE = "secure";
 
     @Reference(policy = STATIC, policyOption = GREEDY)

Modified: 
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/NetworkErrorProxy.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/NetworkErrorProxy.java?rev=1624994&r1=1624993&r2=1624994&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/NetworkErrorProxy.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/NetworkErrorProxy.java
 Mon Sep 15 10:37:25 2014
@@ -1,19 +1,3 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 package org.apache.jackrabbit.oak.plugins.segment;
 
 import io.netty.bootstrap.Bootstrap;
@@ -56,6 +40,10 @@ public class NetworkErrorProxy {
         this.fh.skipBytes = n;
     }
 
+    public void flipByte(int pos) {
+        this.fh.flipPosition = pos;
+    }
+
     public void run() throws Exception {
         try {
             ServerBootstrap b = new ServerBootstrap();
@@ -97,11 +85,13 @@ class ForwardHandler extends ChannelInbo
     public long transferredBytes;
     public int skipPosition;
     public int skipBytes;
+    public int flipPosition;
     private ChannelFuture remote;
 
     public ForwardHandler(String host, int port) {
         this.targetHost = host;
         this.targetPort = port;
+        this.flipPosition = -1;
     }
 
     @Override
@@ -115,7 +105,14 @@ class ForwardHandler extends ChannelInbo
         cb.handler(new ChannelInitializer<SocketChannel>() {
             @Override
             public void initChannel(SocketChannel ch) throws Exception {
-                ch.pipeline().addFirst(new SwallowingHandler(c, 
ForwardHandler.this.skipPosition, ForwardHandler.this.skipBytes));
+                SendBackHandler sbh = new SendBackHandler(c);
+                if (ForwardHandler.this.flipPosition >= 0) {
+                    sbh = new BitFlipHandler(c, 
ForwardHandler.this.flipPosition);
+                }
+                else if (ForwardHandler.this.skipBytes > 0) {
+                    sbh = new SwallowingHandler(c, 
ForwardHandler.this.skipPosition, ForwardHandler.this.skipBytes);
+                }
+                ch.pipeline().addFirst(sbh);
             }
         });
         remote = cb.connect(this.targetHost, this.targetPort).sync();
@@ -245,3 +242,32 @@ class SwallowingHandler extends SendBack
 
 }
 
+class BitFlipHandler extends SendBackHandler {
+    private static final Logger log = LoggerFactory
+            .getLogger(BitFlipHandler.class);
+
+    private int startingPos;
+
+    public BitFlipHandler(ChannelHandlerContext ctx, int pos) {
+        super(ctx);
+        this.startingPos = pos;
+    }
+
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
+        if (msg instanceof ByteBuf) {
+            ByteBuf bb = (ByteBuf)msg;
+            log.debug("FlipHandler. Got Buffer size: " + bb.readableBytes());
+            if (this.startingPos >= 0) {
+                if (this.transferredBytes + bb.readableBytes() >= 
this.startingPos) {
+                    int i = this.startingPos - (int)this.transferredBytes;
+                    log.info("FlipHandler flips byte at offset " + 
(this.transferredBytes + i));
+                    byte b = (byte) (bb.getByte(i) ^ 0x01);
+                    bb.setByte(i, b);
+                    this.startingPos = -1;
+                }
+            }
+        }
+        super.channelRead(ctx, msg);
+    }
+
+}

Modified: 
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BrokenNetworkTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BrokenNetworkTest.java?rev=1624994&r1=1624993&r2=1624994&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BrokenNetworkTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BrokenNetworkTest.java
 Mon Sep 15 10:37:25 2014
@@ -75,6 +75,36 @@ public class BrokenNetworkTest extends T
         useProxy(true, 400, 10, true);
     }
 
+    @Test
+    public void testProxyFlippedStartByte() throws Exception {
+        useProxy(false, 0, 0, 0, false);
+    }
+
+    @Test
+    public void testProxyFlippedStartByteSSL() throws Exception {
+        useProxy(true, 0, 0, 0, false);
+    }
+
+    @Test
+    public void testProxyFlippedIntermediateByte() throws Exception {
+        useProxy(false, 0, 0, 150, false);
+    }
+
+    @Test
+    public void testProxyFlippedIntermediateByteSSL() throws Exception {
+        useProxy(true, 0, 0, 560, false);
+    }
+
+    @Test
+    public void testProxyFlippedEndByte() throws Exception {
+        useProxy(false, 0, 0, 220, false);
+    }
+
+    @Test
+    public void testProxyFlippedEndByteSSL() throws Exception {
+        useProxy(true, 0, 0, 575, false);
+    }
+
     // private helper
 
     private void useProxy(boolean ssl) throws Exception {
@@ -82,8 +112,13 @@ public class BrokenNetworkTest extends T
     }
 
     private void useProxy(boolean ssl, int skipPosition, int skipBytes, 
boolean intermediateChange) throws Exception {
+        useProxy(ssl, skipPosition, skipBytes, -1, intermediateChange);
+    }
+
+    private void useProxy(boolean ssl, int skipPosition, int skipBytes, int 
flipPosition, boolean intermediateChange) throws Exception {
         NetworkErrorProxy p = new NetworkErrorProxy(PROXY_PORT, LOCALHOST, 
port);
         p.skipBytes(skipPosition, skipBytes);
+        p.flipByte(flipPosition);
         p.run();
 
         NodeStore store = new SegmentNodeStore(storeS);
@@ -96,7 +131,7 @@ public class BrokenNetworkTest extends T
         cl.run();
 
         try {
-            if (skipBytes > 0) {
+            if (skipBytes > 0 || flipPosition >= 0) {
                 assertFalse("stores are not expected to be equal", 
storeS.getHead().equals(storeC.getHead()));
                 assertEquals(storeC2.getHead(), storeC.getHead());
 

Modified: 
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BulkTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BulkTest.java?rev=1624994&r1=1624993&r2=1624994&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BulkTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BulkTest.java
 Mon Sep 15 10:37:25 2014
@@ -18,6 +18,7 @@
  */
 package org.apache.jackrabbit.oak.plugins.segment.failover;
 
+import junit.framework.Assert;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore;
 import 
org.apache.jackrabbit.oak.plugins.segment.failover.client.FailoverClient;
 import 
org.apache.jackrabbit.oak.plugins.segment.failover.jmx.FailoverStatusMBean;

Modified: 
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/MBeanTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/MBeanTest.java?rev=1624994&r1=1624993&r2=1624994&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/MBeanTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/MBeanTest.java
 Mon Sep 15 10:37:25 2014
@@ -21,14 +21,13 @@ package org.apache.jackrabbit.oak.plugin
 import 
org.apache.jackrabbit.oak.plugins.segment.failover.client.FailoverClient;
 import 
org.apache.jackrabbit.oak.plugins.segment.failover.jmx.FailoverStatusMBean;
 import 
org.apache.jackrabbit.oak.plugins.segment.failover.server.FailoverServer;
+
 import org.junit.After;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
-
 import java.lang.management.ManagementFactory;
 import java.util.Set;
 
@@ -133,7 +132,6 @@ public class MBeanTest extends TestBase 
     }
 
     @Test
-    @Ignore("OAK-2086")
     public void testClientAndServerEmptyConfig() throws Exception {
         final FailoverServer server = new FailoverServer(this.port, 
this.storeS);
         server.start();


Reply via email to