Author: baedke Date: Fri Sep 12 13:56:46 2014 New Revision: 1624551 URL: http://svn.apache.org/r1624551 Log: OAK-1915: TarMK failover 2.0
Added test cases, improved client recovery from crashes, tweaked OSGi config handling. Modified: jackrabbit/oak/trunk/oak-tarmk-failover/osgi-conf/master/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentLoaderHandler.java jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStoreService.java jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/NetworkErrorProxy.java jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BrokenNetworkTest.java jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BulkTest.java jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/MBeanTest.java Modified: jackrabbit/oak/trunk/oak-tarmk-failover/osgi-conf/master/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/osgi-conf/master/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg?rev=1624551&r1=1624550&r2=1624551&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-tarmk-failover/osgi-conf/master/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg (original) +++ jackrabbit/oak/trunk/oak-tarmk-failover/osgi-conf/master/org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStoreService.cfg Fri Sep 12 13:56:46 2014 @@ -1,2 +1 @@ mode=master -master.allowed-client-ip-ranges=localhost,127.0.0.1,::1,192.168.0.0-192.168.255.255 \ No newline at end of file Modified: jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentLoaderHandler.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentLoaderHandler.java?rev=1624551&r1=1624550&r2=1624551&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentLoaderHandler.java (original) +++ jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentLoaderHandler.java Fri Sep 12 13:56:46 2014 @@ -19,12 +19,12 @@ package org.apache.jackrabbit.oak.plugins.segment.failover.client; import static org.apache.jackrabbit.oak.plugins.segment.failover.codec.Messages.newGetSegmentReq; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.concurrent.EventExecutorGroup; +import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -37,6 +37,7 @@ import org.apache.jackrabbit.oak.plugins import org.apache.jackrabbit.oak.plugins.segment.failover.codec.SegmentReply; import org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStore; import org.apache.jackrabbit.oak.plugins.segment.failover.store.RemoteSegmentLoader; +import org.apache.jackrabbit.oak.plugins.segment.file.FileStore; import org.apache.jackrabbit.oak.spi.state.ApplyDiff; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -95,15 +96,39 @@ public class SegmentLoaderHandler extend SegmentNodeBuilder builder = before.builder(); SegmentNodeState current = new SegmentNodeState(head); - current.compareAgainstBaseState(before, new ApplyDiff(builder)); - + do { + try { + current.compareAgainstBaseState(before, new ApplyDiff(builder)); + break; + } + catch (FileStore.FileStoreCorruptException e) { + // the segment is locally damaged or not present anymore + // lets try to read this from the master again + Segment s = readSegment(e.id); + if (s == null) { + log.warn("can't read locally corrupt segment " + e.id + " from master"); + throw e; + } + + log.info("did reread locally corrupt segment " + e.id + " with size " + s.size()); + ByteArrayOutputStream bout = new ByteArrayOutputStream(s.size()); + try { + s.writeTo(bout); + } + catch (IOException f) { + log.error("can't wrap segment to output stream", f); + throw e; + } + store.writeSegment(e.id, bout.toByteArray(), 0, s.size()); + } + } while(true); boolean ok = store.setHead(before, builder.getNodeState()); log.info("#updated state (set head {}) in {}ms.", ok, System.currentTimeMillis() - t); } finally { close(); } - log.info("returning initSync"); + log.debug("returning initSync"); } @Override Modified: jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStoreService.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStoreService.java?rev=1624551&r1=1624550&r2=1624551&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStoreService.java (original) +++ jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStoreService.java Fri Sep 12 13:56:46 2014 @@ -39,13 +39,13 @@ import org.apache.jackrabbit.oak.plugins import org.apache.jackrabbit.oak.plugins.segment.SegmentStore; import org.apache.jackrabbit.oak.plugins.segment.failover.client.FailoverClient; import org.apache.jackrabbit.oak.plugins.segment.failover.server.FailoverServer; -import org.apache.jackrabbit.oak.spi.state.NodeStore; import org.osgi.framework.ServiceRegistration; import org.osgi.service.component.ComponentContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@Property(name = "org.apache.sling.installer.configuration.persist", label="Distribute config", description = "Should be always disabled to avoid storing the configuration in the repository", boolValue = false) @Component(policy = ConfigurationPolicy.REQUIRE) public class FailoverStoreService { @@ -55,29 +55,31 @@ public class FailoverStoreService { private static final String MODE_SLAVE = "slave"; public static final String MODE_DEFAULT = MODE_MASTER; - @Property(label = "Mode", description = "TarMK Failover mode (master or slave)", options = { - @PropertyOption(name = "master", value = "master"), - @PropertyOption(name = "slave", value = "slave") }, value = MODE_DEFAULT) + @Property(name = "Mode", description = "TarMK Cold Standby mode (master or slave)", + options = { + @PropertyOption(name = MODE_MASTER, value = MODE_MASTER), + @PropertyOption(name = MODE_SLAVE, value = MODE_SLAVE) }, + value = MODE_DEFAULT) public static final String MODE = "mode"; public static final int PORT_DEFAULT = 8023; - @Property(label = "Port", description = "TarMK Failover port", intValue = PORT_DEFAULT) + @Property(name = "Port", description = "TCP/IP port to use", intValue = PORT_DEFAULT) public static final String PORT = "port"; public static final String MASTER_HOST_DEFAULT = "127.0.0.1"; - @Property(label = "Master Host", description = "TarMK Failover master host (enabled for slave mode only)", value = MASTER_HOST_DEFAULT) + @Property(name = "Master Host", description = "Master host (slave mode only)", value = MASTER_HOST_DEFAULT) public static final String MASTER_HOST = "master.host"; public static final int INTERVAL_DEFAULT = 5; - @Property(label = "Sync interval (seconds)", description = "TarMK Failover sync interval (seconds)", intValue = INTERVAL_DEFAULT) + @Property(name = "Sync interval (seconds)", description = "Sync interval in seconds (slave mode only)", intValue = INTERVAL_DEFAULT) public static final String INTERVAL = "interval"; public static final String ALLOWED_CLIENT_IP_RANGES_DEFAULT = null; - @Property(label = "Client allowed IP-Ranges", description = "accept incoming requests for these IP-ranges only") + @Property(name = "Allowed IP-Ranges", description = "Accept incoming requests for these host names and IP-ranges only (master mode only)") public static final String ALLOWED_CLIENT_IP_RANGES = "master.allowed-client-ip-ranges"; public static final boolean SECURE_DEFAULT = false; - @Property(label = "Secure", description = "Use secure connections", boolValue = SECURE_DEFAULT) + @Property(name = "Secure", description = "Use secure connections", boolValue = SECURE_DEFAULT) public static final String SECURE = "secure"; @Reference(policy = STATIC, policyOption = GREEDY) Modified: jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/NetworkErrorProxy.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/NetworkErrorProxy.java?rev=1624551&r1=1624550&r2=1624551&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/NetworkErrorProxy.java (original) +++ jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/NetworkErrorProxy.java Fri Sep 12 13:56:46 2014 @@ -1,19 +1,3 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ package org.apache.jackrabbit.oak.plugins.segment; import io.netty.bootstrap.Bootstrap; @@ -56,6 +40,10 @@ public class NetworkErrorProxy { this.fh.skipBytes = n; } + public void flipByte(int pos) { + this.fh.flipPosition = pos; + } + public void run() throws Exception { try { ServerBootstrap b = new ServerBootstrap(); @@ -97,11 +85,13 @@ class ForwardHandler extends ChannelInbo public long transferredBytes; public int skipPosition; public int skipBytes; + public int flipPosition; private ChannelFuture remote; public ForwardHandler(String host, int port) { this.targetHost = host; this.targetPort = port; + this.flipPosition = -1; } @Override @@ -115,7 +105,14 @@ class ForwardHandler extends ChannelInbo cb.handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { - ch.pipeline().addFirst(new SwallowingHandler(c, ForwardHandler.this.skipPosition, ForwardHandler.this.skipBytes)); + SendBackHandler sbh = new SendBackHandler(c); + if (ForwardHandler.this.flipPosition >= 0) { + sbh = new BitFlipHandler(c, ForwardHandler.this.flipPosition); + } + else if (ForwardHandler.this.skipBytes > 0) { + sbh = new SwallowingHandler(c, ForwardHandler.this.skipPosition, ForwardHandler.this.skipBytes); + } + ch.pipeline().addFirst(sbh); } }); remote = cb.connect(this.targetHost, this.targetPort).sync(); @@ -245,3 +242,32 @@ class SwallowingHandler extends SendBack } +class BitFlipHandler extends SendBackHandler { + private static final Logger log = LoggerFactory + .getLogger(BitFlipHandler.class); + + private int startingPos; + + public BitFlipHandler(ChannelHandlerContext ctx, int pos) { + super(ctx); + this.startingPos = pos; + } + + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof ByteBuf) { + ByteBuf bb = (ByteBuf)msg; + log.debug("FlipHandler. Got Buffer size: " + bb.readableBytes()); + if (this.startingPos >= 0) { + if (this.transferredBytes + bb.readableBytes() >= this.startingPos) { + int i = this.startingPos - (int)this.transferredBytes; + log.info("FlipHandler flips byte at offset " + (this.transferredBytes + i)); + byte b = (byte) (bb.getByte(i) ^ 0x01); + bb.setByte(i, b); + this.startingPos = -1; + } + } + } + super.channelRead(ctx, msg); + } + +} Modified: jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BrokenNetworkTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BrokenNetworkTest.java?rev=1624551&r1=1624550&r2=1624551&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BrokenNetworkTest.java (original) +++ jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BrokenNetworkTest.java Fri Sep 12 13:56:46 2014 @@ -75,6 +75,36 @@ public class BrokenNetworkTest extends T useProxy(true, 400, 10, true); } + @Test + public void testProxyFlippedStartByte() throws Exception { + useProxy(false, 0, 0, 0, false); + } + + @Test + public void testProxyFlippedStartByteSSL() throws Exception { + useProxy(true, 0, 0, 0, false); + } + + @Test + public void testProxyFlippedIntermediateByte() throws Exception { + useProxy(false, 0, 0, 150, false); + } + + @Test + public void testProxyFlippedIntermediateByteSSL() throws Exception { + useProxy(true, 0, 0, 560, false); + } + + @Test + public void testProxyFlippedEndByte() throws Exception { + useProxy(false, 0, 0, 220, false); + } + + @Test + public void testProxyFlippedEndByteSSL() throws Exception { + useProxy(true, 0, 0, 575, false); + } + // private helper private void useProxy(boolean ssl) throws Exception { @@ -82,8 +112,13 @@ public class BrokenNetworkTest extends T } private void useProxy(boolean ssl, int skipPosition, int skipBytes, boolean intermediateChange) throws Exception { + useProxy(ssl, skipPosition, skipBytes, -1, intermediateChange); + } + + private void useProxy(boolean ssl, int skipPosition, int skipBytes, int flipPosition, boolean intermediateChange) throws Exception { NetworkErrorProxy p = new NetworkErrorProxy(PROXY_PORT, LOCALHOST, port); p.skipBytes(skipPosition, skipBytes); + p.flipByte(flipPosition); p.run(); NodeStore store = new SegmentNodeStore(storeS); @@ -96,7 +131,7 @@ public class BrokenNetworkTest extends T cl.run(); try { - if (skipBytes > 0) { + if (skipBytes > 0 || flipPosition >= 0) { assertFalse("stores are not expected to be equal", storeS.getHead().equals(storeC.getHead())); assertEquals(storeC2.getHead(), storeC.getHead()); Modified: jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BulkTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BulkTest.java?rev=1624551&r1=1624550&r2=1624551&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BulkTest.java (original) +++ jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/BulkTest.java Fri Sep 12 13:56:46 2014 @@ -18,6 +18,7 @@ */ package org.apache.jackrabbit.oak.plugins.segment.failover; +import junit.framework.Assert; import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore; import org.apache.jackrabbit.oak.plugins.segment.failover.client.FailoverClient; import org.apache.jackrabbit.oak.plugins.segment.failover.jmx.FailoverStatusMBean; Modified: jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/MBeanTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/MBeanTest.java?rev=1624551&r1=1624550&r2=1624551&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/MBeanTest.java (original) +++ jackrabbit/oak/trunk/oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/MBeanTest.java Fri Sep 12 13:56:46 2014 @@ -21,14 +21,13 @@ package org.apache.jackrabbit.oak.plugin import org.apache.jackrabbit.oak.plugins.segment.failover.client.FailoverClient; import org.apache.jackrabbit.oak.plugins.segment.failover.jmx.FailoverStatusMBean; import org.apache.jackrabbit.oak.plugins.segment.failover.server.FailoverServer; + import org.junit.After; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import javax.management.MBeanServer; import javax.management.ObjectName; - import java.lang.management.ManagementFactory; import java.util.Set; @@ -133,7 +132,6 @@ public class MBeanTest extends TestBase } @Test - @Ignore("OAK-2086") public void testClientAndServerEmptyConfig() throws Exception { final FailoverServer server = new FailoverServer(this.port, this.storeS); server.start();