Repository: cxf Updated Branches: refs/heads/master ce969249d -> 248c8f045
CXF-6622: Enhance Failover Feature to support Circuit Breakers based implementation. Refactored conduit selection logic to respect endpoint status. Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/248c8f04 Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/248c8f04 Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/248c8f04 Branch: refs/heads/master Commit: 248c8f0458ce938a65c35006d484c6a4610cde1b Parents: ce96924 Author: reta <[email protected]> Authored: Wed Nov 18 21:04:35 2015 -0500 Committer: reta <[email protected]> Committed: Wed Nov 18 21:04:35 2015 -0500 ---------------------------------------------------------------------- .../CircuitBreakerTargetSelector.java | 68 ++++++++++++++++++++ .../cxf/clustering/FailoverFailedException.java | 37 +++++++++++ .../failover/CircuitBreakerFailoverTest.java | 29 +++++++++ 3 files changed, 134 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/248c8f04/rt/features/clustering/src/main/java/org/apache/cxf/clustering/CircuitBreakerTargetSelector.java ---------------------------------------------------------------------- diff --git a/rt/features/clustering/src/main/java/org/apache/cxf/clustering/CircuitBreakerTargetSelector.java b/rt/features/clustering/src/main/java/org/apache/cxf/clustering/CircuitBreakerTargetSelector.java index 86aabaa..62541d9 100644 --- a/rt/features/clustering/src/main/java/org/apache/cxf/clustering/CircuitBreakerTargetSelector.java +++ b/rt/features/clustering/src/main/java/org/apache/cxf/clustering/CircuitBreakerTargetSelector.java @@ -33,13 +33,16 @@ import org.apache.cxf.common.util.StringUtils; import org.apache.cxf.endpoint.Client; import org.apache.cxf.endpoint.Endpoint; import org.apache.cxf.helpers.CastUtils; +import org.apache.cxf.interceptor.Fault; import org.apache.cxf.message.Exchange; import org.apache.cxf.message.Message; +import org.apache.cxf.transport.Conduit; public class CircuitBreakerTargetSelector extends FailoverTargetSelector { public static final int DEFAULT_TIMEOUT = 1000 * 60 /* 1 minute timeout as default */; public static final int DEFAULT_THESHOLD = 1; + private static final String IS_SELECTED = "org.apache.cxf.clustering.CircuitBreakerTargetSelector.IS_SELECTED"; private static final Logger LOG = LogUtils.getL7dLogger(CircuitBreakerTargetSelector.class); private final int threshold; @@ -60,6 +63,14 @@ public class CircuitBreakerTargetSelector extends FailoverTargetSelector { public synchronized void setStrategy(FailoverStrategy strategy) { super.setStrategy(strategy); + // Registering the original endpoint in the list of circuit breakers + if (getEndpoint() != null) { + circuits.putIfAbsent( + getEndpoint().getEndpointInfo().getAddress(), + new ZestCircuitBreaker(threshold, timeout) + ); + } + if (strategy != null) { for (String alternative: strategy.getAlternateAddresses(null /* no Exchange at this point */)) { if (!StringUtils.isEmpty(alternative)) { @@ -71,6 +82,37 @@ public class CircuitBreakerTargetSelector extends FailoverTargetSelector { } } } + @Override + public synchronized Conduit selectConduit(Message message) { + Conduit c = message.get(Conduit.class); + if (c != null) { + return c; + } + Exchange exchange = message.getExchange(); + InvocationKey key = new InvocationKey(exchange); + InvocationContext invocation = inProgress.get(key); + if (invocation != null && !invocation.getContext().containsKey(IS_SELECTED)) { + final String address = (String) message.get(Message.ENDPOINT_ADDRESS); + + if (isFailoverRequired(address)) { + Endpoint target = getFailoverTarget(exchange, invocation); + + if (target == null) { + throw new Fault(new FailoverFailedException( + "None of alternative addresses are available at the moment")); + } + + if (isEndpointChanged(address, target)) { + setEndpoint(target); + message.put(Message.ENDPOINT_ADDRESS, target.getEndpointInfo().getAddress()); + overrideAddressProperty(invocation.getContext()); + invocation.getContext().put(IS_SELECTED, null); + } + } + } + + return getSelectedConduit(message); + } @Override protected Endpoint getFailoverTarget(final Exchange exchange, final InvocationContext invocation) { @@ -136,4 +178,30 @@ public class CircuitBreakerTargetSelector extends FailoverTargetSelector { } } } + + private boolean isEndpointChanged(final String address, final Endpoint target) { + if (address != null) { + return !address.startsWith(target.getEndpointInfo().getAddress()); + } + + if (getEndpoint().equals(target)) { + return false; + } + + return !getEndpoint().getEndpointInfo().getAddress().startsWith( + target.getEndpointInfo().getAddress()); + } + + protected boolean isFailoverRequired(final String address) { + if (address != null) { + for (final Map.Entry<String, CircuitBreaker> entry: circuits.entrySet()) { + if (address.startsWith(entry.getKey())) { + return !entry.getValue().allowRequest(); + } + } + } + + LOG.log(Level.WARNING, "No circuit breaker present for address: " + address); + return false; + } } http://git-wip-us.apache.org/repos/asf/cxf/blob/248c8f04/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverFailedException.java ---------------------------------------------------------------------- diff --git a/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverFailedException.java b/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverFailedException.java new file mode 100644 index 0000000..d51408b --- /dev/null +++ b/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverFailedException.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.apache.cxf.clustering; + +public class FailoverFailedException extends RuntimeException { + private static final long serialVersionUID = 6987181998625258047L; + + public FailoverFailedException() { + super(); + } + + public FailoverFailedException(String message) { + super(message); + } + + public FailoverFailedException(String message, Throwable cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/248c8f04/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/failover/CircuitBreakerFailoverTest.java ---------------------------------------------------------------------- diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/failover/CircuitBreakerFailoverTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/failover/CircuitBreakerFailoverTest.java index ec00111..81cba78 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/failover/CircuitBreakerFailoverTest.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/failover/CircuitBreakerFailoverTest.java @@ -22,10 +22,15 @@ package org.apache.cxf.systest.jaxrs.failover; import java.util.ArrayList; import java.util.List; +import javax.ws.rs.ProcessingException; + +import org.apache.cxf.clustering.FailoverFailedException; import org.apache.cxf.clustering.FailoverFeature; import org.apache.cxf.clustering.RandomStrategy; import org.apache.cxf.clustering.SequentialStrategy; import org.apache.cxf.clustering.circuitbreaker.CircuitBreakerFailoverFeature; +import org.apache.cxf.systest.jaxrs.BookStore; +import org.junit.Test; /** * Tests failover within a static cluster. @@ -33,6 +38,30 @@ import org.apache.cxf.clustering.circuitbreaker.CircuitBreakerFailoverFeature; public class CircuitBreakerFailoverTest extends AbstractFailoverTest { public static final String NON_PORT = allocatePort(CircuitBreakerFailoverTest.class); + + @Test(expected = FailoverFailedException.class) + public void testSequentialStrategyUnavailableAlternatives() throws Exception { + FailoverFeature feature = getFeature(false, + "http://localhost:" + NON_PORT + "/non-existent", + "http://localhost:" + NON_PORT + "/non-existent2"); + + final BookStore bookStore = getBookStore( + "http://localhost:" + NON_PORT + "/non-existent", feature); + + // First iteration is going to open all circuit breakers. + // Second iteration should not call any URL as all targets are not available. + for (int i = 0; i < 2; ++i) { + try { + bookStore.getBook(1); + fail("Exception expected"); + } catch (ProcessingException ex) { + if (ex.getCause() instanceof FailoverFailedException) { + throw (FailoverFailedException) ex.getCause(); + } + } + } + } + @Override protected FailoverFeature getFeature(boolean random, String ...address) { CircuitBreakerFailoverFeature feature = new CircuitBreakerFailoverFeature();
