Repository: cxf Updated Branches: refs/heads/master 295091064 -> a261507eb
CXF-6622: Enhance Failover Feature to support Circuit Breakers based implementation (using Apache Zest) Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/a261507e Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/a261507e Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/a261507e Branch: refs/heads/master Commit: a261507ebd3104b1a00298801ec9815ed1e7a728 Parents: 2950910 Author: reta <drr...@gmail.com> Authored: Thu Nov 5 21:28:43 2015 -0500 Committer: reta <drr...@gmail.com> Committed: Thu Nov 5 21:28:43 2015 -0500 ---------------------------------------------------------------------- parent/pom.xml | 6 + rt/features/clustering/pom.xml | 5 + .../CircuitBreakerTargetSelector.java | 140 +++++++ .../cxf/clustering/FailoverTargetSelector.java | 21 +- .../LoadDistributorTargetSelector.java | 4 +- .../circuitbreaker/CircuitBreaker.java | 43 ++ .../CircuitBreakerFailoverFeature.java | 58 +++ .../circuitbreaker/ZestCircuitBreaker.java | 47 +++ systests/jaxrs/pom.xml | 5 + .../jaxrs/failover/AbstractFailoverTest.java | 395 +++++++++++++++++++ .../failover/CircuitBreakerFailoverTest.java | 55 +++ .../systest/jaxrs/failover/FailoverTest.java | 369 +---------------- 12 files changed, 777 insertions(+), 371 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/a261507e/parent/pom.xml ---------------------------------------------------------------------- diff --git a/parent/pom.xml b/parent/pom.xml index 9971539..84df93e 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -195,6 +195,7 @@ <cxf.tika.version>1.6</cxf.tika.version> <cxf.jexl.version>2.1.1</cxf.jexl.version> <cxf.htrace.version>4.0.1-incubating</cxf.htrace.version> + <cxf.zest.version>2.1</cxf.zest.version> <cxf.checkstyle.extension /> <cxf.jaxb.context.class /> <cxf.jaxb.context.class.property>none</cxf.jaxb.context.class.property> @@ -1791,6 +1792,11 @@ <artifactId>htrace-core4</artifactId> <version>${cxf.htrace.version}</version> </dependency> + <dependency> + <groupId>org.qi4j.library</groupId> + <artifactId>org.qi4j.library.circuitbreaker</artifactId> + <version>${cxf.zest.version}</version> + </dependency> </dependencies> </dependencyManagement> <profiles> http://git-wip-us.apache.org/repos/asf/cxf/blob/a261507e/rt/features/clustering/pom.xml ---------------------------------------------------------------------- diff --git a/rt/features/clustering/pom.xml b/rt/features/clustering/pom.xml index a5e944c..6faee2c 100644 --- a/rt/features/clustering/pom.xml +++ b/rt/features/clustering/pom.xml @@ -85,5 +85,10 @@ <scope>provided</scope> <optional>true</optional> </dependency> + <dependency> + <groupId>org.qi4j.library</groupId> + <artifactId>org.qi4j.library.circuitbreaker</artifactId> + <optional>true</optional> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/cxf/blob/a261507e/rt/features/clustering/src/main/java/org/apache/cxf/clustering/CircuitBreakerTargetSelector.java ---------------------------------------------------------------------- diff --git a/rt/features/clustering/src/main/java/org/apache/cxf/clustering/CircuitBreakerTargetSelector.java b/rt/features/clustering/src/main/java/org/apache/cxf/clustering/CircuitBreakerTargetSelector.java new file mode 100644 index 0000000..08c9fe6 --- /dev/null +++ b/rt/features/clustering/src/main/java/org/apache/cxf/clustering/CircuitBreakerTargetSelector.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.clustering; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.cxf.clustering.circuitbreaker.CircuitBreaker; +import org.apache.cxf.clustering.circuitbreaker.ZestCircuitBreaker; +import org.apache.cxf.common.logging.LogUtils; +import org.apache.cxf.common.util.StringUtils; +import org.apache.cxf.endpoint.Client; +import org.apache.cxf.endpoint.Endpoint; +import org.apache.cxf.helpers.CastUtils; +import org.apache.cxf.message.Exchange; +import org.apache.cxf.message.Message; + +public class CircuitBreakerTargetSelector extends FailoverTargetSelector { + public static final int DEFAULT_TIMEOUT = 1000 * 60 /* 1 minute timeout as default */; + public static final int DEFAULT_THESHOLD = 1; + + private static final Logger LOG = LogUtils.getL7dLogger(CircuitBreakerTargetSelector.class); + + private final int threshold; + private final long timeout; + private final ConcurrentMap<String, CircuitBreaker> circuits = new ConcurrentHashMap<>(); + + public CircuitBreakerTargetSelector(final int threshold, final long timeout) { + super(); + this.threshold = threshold; + this.timeout = timeout; + } + + public CircuitBreakerTargetSelector() { + this(DEFAULT_THESHOLD, DEFAULT_TIMEOUT); + } + + @Override + public synchronized void setStrategy(FailoverStrategy strategy) { + super.setStrategy(strategy); + + if (strategy != null) { + for (String alternative: strategy.getAlternateAddresses(null /* no Exchange at this point */)) { + if (!StringUtils.isEmpty(alternative)) { + circuits.putIfAbsent( + alternative, + new ZestCircuitBreaker(threshold, timeout) + ); + } + } + } + } + + @Override + protected Endpoint getFailoverTarget(final Exchange exchange, final InvocationContext invocation) { + if (circuits.isEmpty()) { + LOG.log(Level.SEVERE, "No alternative addresses configured"); + return null; + } + + final List<String> alternateAddresses = new ArrayList<>(); + for (final Map.Entry<String, CircuitBreaker> entry: circuits.entrySet()) { + if (entry.getValue().allowRequest()) { + alternateAddresses.add(entry.getKey()); + } + } + + Endpoint failoverTarget = null; + if (!alternateAddresses.isEmpty()) { + final String alternateAddress = getStrategy().selectAlternateAddress(alternateAddresses); + + // Reuse current endpoint + if (alternateAddress != null) { + failoverTarget = getEndpoint(); + failoverTarget.getEndpointInfo().setAddress(alternateAddress); + } + } + + return failoverTarget; + } + + @Override + public void prepare(Message message) { + super.prepare(message); + } + + @Override + protected void onFailure(InvocationContext context, Exception ex) { + super.onFailure(context, ex); + + final Map<String, Object> requestContext = + CastUtils.cast((Map<?, ?>)context.getContext().get(Client.REQUEST_CONTEXT)); + + if (requestContext != null) { + final String address = (String)requestContext.get(Message.ENDPOINT_ADDRESS); + final CircuitBreaker circuitBreaker = circuits.get(address); + if (circuitBreaker != null) { + circuitBreaker.markFailure(ex); + } + } + } + + @Override + protected void onSuccess(InvocationContext context) { + super.onSuccess(context); + + final Map<String, Object> requestContext = + CastUtils.cast((Map<?, ?>)context.getContext().get(Client.REQUEST_CONTEXT)); + + if (requestContext != null) { + final String address = (String)requestContext.get(Message.ENDPOINT_ADDRESS); + final CircuitBreaker circuitBreaker = circuits.get(address); + if (circuitBreaker != null) { + circuitBreaker.markSuccess(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/a261507e/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverTargetSelector.java ---------------------------------------------------------------------- diff --git a/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverTargetSelector.java b/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverTargetSelector.java index d364a55..21f129e 100644 --- a/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverTargetSelector.java +++ b/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverTargetSelector.java @@ -139,8 +139,9 @@ public class FailoverTargetSelector extends AbstractConduitSelector { } boolean failover = false; - if (requiresFailover(exchange)) { - onFailure(invocation); + final Exception ex = getExceptionIfPresent(exchange); + if (requiresFailover(exchange, ex)) { + onFailure(invocation, ex); Conduit old = (Conduit)exchange.getOutMessage().remove(Conduit.class.getName()); Endpoint failoverTarget = getFailoverTarget(exchange, invocation); @@ -208,7 +209,7 @@ public class FailoverTargetSelector extends AbstractConduitSelector { protected void onSuccess(InvocationContext context) { } - protected void onFailure(InvocationContext context) { + protected void onFailure(InvocationContext context, Exception ex) { } /** @@ -260,11 +261,7 @@ public class FailoverTargetSelector extends AbstractConduitSelector { * @param exchange the current Exchange * @return boolean true if a failover should be attempted */ - protected boolean requiresFailover(Exchange exchange) { - Message outMessage = exchange.getOutMessage(); - Exception ex = outMessage.get(Exception.class) != null - ? outMessage.get(Exception.class) - : exchange.get(Exception.class); + protected boolean requiresFailover(Exchange exchange, Exception ex) { getLogger().log(Level.FINE, "CHECK_LAST_INVOKE_FAILED", new Object[] {ex != null}); @@ -286,6 +283,14 @@ public class FailoverTargetSelector extends AbstractConduitSelector { return failover; } + + private Exception getExceptionIfPresent(Exchange exchange) { + Message outMessage = exchange.getOutMessage(); + Exception ex = outMessage.get(Exception.class) != null + ? outMessage.get(Exception.class) + : exchange.get(Exception.class); + return ex; + } /** * Get the failover target endpoint, if a suitable one is available. http://git-wip-us.apache.org/repos/asf/cxf/blob/a261507e/rt/features/clustering/src/main/java/org/apache/cxf/clustering/LoadDistributorTargetSelector.java ---------------------------------------------------------------------- diff --git a/rt/features/clustering/src/main/java/org/apache/cxf/clustering/LoadDistributorTargetSelector.java b/rt/features/clustering/src/main/java/org/apache/cxf/clustering/LoadDistributorTargetSelector.java index c4cd273..97bfd27 100644 --- a/rt/features/clustering/src/main/java/org/apache/cxf/clustering/LoadDistributorTargetSelector.java +++ b/rt/features/clustering/src/main/java/org/apache/cxf/clustering/LoadDistributorTargetSelector.java @@ -212,8 +212,8 @@ public class LoadDistributorTargetSelector extends FailoverTargetSelector { } @Override - protected boolean requiresFailover(Exchange exchange) { - return failover && super.requiresFailover(exchange); + protected boolean requiresFailover(Exchange exchange, Exception ex) { + return failover && super.requiresFailover(exchange, ex); } } http://git-wip-us.apache.org/repos/asf/cxf/blob/a261507e/rt/features/clustering/src/main/java/org/apache/cxf/clustering/circuitbreaker/CircuitBreaker.java ---------------------------------------------------------------------- diff --git a/rt/features/clustering/src/main/java/org/apache/cxf/clustering/circuitbreaker/CircuitBreaker.java b/rt/features/clustering/src/main/java/org/apache/cxf/clustering/circuitbreaker/CircuitBreaker.java new file mode 100644 index 0000000..4a4e3e0 --- /dev/null +++ b/rt/features/clustering/src/main/java/org/apache/cxf/clustering/circuitbreaker/CircuitBreaker.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.clustering.circuitbreaker; + +/** + * Basic abstract interface for circuit breaker implementation. + */ +public interface CircuitBreaker { + /** + * Is request is allowed to go through (is circuit breaker closed or opened). + * @return "false" if circuit breaker is open, "true" otherwise + */ + boolean allowRequest(); + + /** + * Reports about failure conditions to circuit breaker. + * @param cause exception happened (could be null in case the error is deducted + * from response status/code). + */ + void markFailure(Throwable cause); + + /** + * Reports about successful invocation to circuit breaker. + */ + void markSuccess(); +} http://git-wip-us.apache.org/repos/asf/cxf/blob/a261507e/rt/features/clustering/src/main/java/org/apache/cxf/clustering/circuitbreaker/CircuitBreakerFailoverFeature.java ---------------------------------------------------------------------- diff --git a/rt/features/clustering/src/main/java/org/apache/cxf/clustering/circuitbreaker/CircuitBreakerFailoverFeature.java b/rt/features/clustering/src/main/java/org/apache/cxf/clustering/circuitbreaker/CircuitBreakerFailoverFeature.java new file mode 100644 index 0000000..5ba5eb3 --- /dev/null +++ b/rt/features/clustering/src/main/java/org/apache/cxf/clustering/circuitbreaker/CircuitBreakerFailoverFeature.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.clustering.circuitbreaker; + +import org.apache.cxf.clustering.CircuitBreakerTargetSelector; +import org.apache.cxf.clustering.FailoverFeature; +import org.apache.cxf.clustering.FailoverTargetSelector; + +import static org.apache.cxf.clustering.CircuitBreakerTargetSelector.DEFAULT_THESHOLD; +import static org.apache.cxf.clustering.CircuitBreakerTargetSelector.DEFAULT_TIMEOUT; + +public class CircuitBreakerFailoverFeature extends FailoverFeature { + private int threshold; + private long timeout; + private FailoverTargetSelector targetSelector; + + public CircuitBreakerFailoverFeature() { + this(DEFAULT_THESHOLD, DEFAULT_TIMEOUT); + } + + public CircuitBreakerFailoverFeature(int threshold, long timeout) { + this.threshold = threshold; + this.timeout = timeout; + } + + @Override + public FailoverTargetSelector getTargetSelector() { + if (this.targetSelector == null) { + this.targetSelector = new CircuitBreakerTargetSelector(threshold, timeout); + } + return this.targetSelector; + } + + public int getThreshold() { + return threshold; + } + + public long getTimeout() { + return timeout; + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/a261507e/rt/features/clustering/src/main/java/org/apache/cxf/clustering/circuitbreaker/ZestCircuitBreaker.java ---------------------------------------------------------------------- diff --git a/rt/features/clustering/src/main/java/org/apache/cxf/clustering/circuitbreaker/ZestCircuitBreaker.java b/rt/features/clustering/src/main/java/org/apache/cxf/clustering/circuitbreaker/ZestCircuitBreaker.java new file mode 100644 index 0000000..a64936d --- /dev/null +++ b/rt/features/clustering/src/main/java/org/apache/cxf/clustering/circuitbreaker/ZestCircuitBreaker.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.clustering.circuitbreaker; + +import org.qi4j.library.circuitbreaker.CircuitBreaker; + +public class ZestCircuitBreaker extends CircuitBreaker + implements org.apache.cxf.clustering.circuitbreaker.CircuitBreaker { + + private final CircuitBreaker delegate; + + public ZestCircuitBreaker(final int threshold, final long timeout) { + delegate = new CircuitBreaker(threshold, timeout); + } + + @Override + public boolean allowRequest() { + return delegate.isOn(); + } + + @Override + public void markFailure(Throwable cause) { + delegate.throwable(cause); + } + + @Override + public void markSuccess() { + delegate.success(); + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/a261507e/systests/jaxrs/pom.xml ---------------------------------------------------------------------- diff --git a/systests/jaxrs/pom.xml b/systests/jaxrs/pom.xml index b1f3573..33cd1ac 100644 --- a/systests/jaxrs/pom.xml +++ b/systests/jaxrs/pom.xml @@ -522,6 +522,11 @@ <artifactId>atmosphere-runtime</artifactId> <version>${cxf.atmosphere.version}</version> </dependency> + <dependency> + <groupId>org.qi4j.library</groupId> + <artifactId>org.qi4j.library.circuitbreaker</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> <plugins> http://git-wip-us.apache.org/repos/asf/cxf/blob/a261507e/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/failover/AbstractFailoverTest.java ---------------------------------------------------------------------- diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/failover/AbstractFailoverTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/failover/AbstractFailoverTest.java new file mode 100644 index 0000000..a3636cd --- /dev/null +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/failover/AbstractFailoverTest.java @@ -0,0 +1,395 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.systest.jaxrs.failover; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.ws.rs.InternalServerErrorException; +import javax.ws.rs.ProcessingException; +import javax.ws.rs.core.Response; + +import org.apache.cxf.clustering.FailoverFeature; +import org.apache.cxf.clustering.FailoverTargetSelector; +import org.apache.cxf.clustering.RandomStrategy; +import org.apache.cxf.clustering.RetryStrategy; +import org.apache.cxf.clustering.SequentialStrategy; +import org.apache.cxf.endpoint.ConduitSelector; +import org.apache.cxf.feature.Feature; +import org.apache.cxf.jaxrs.client.JAXRSClientFactoryBean; +import org.apache.cxf.jaxrs.client.WebClient; +import org.apache.cxf.systest.jaxrs.Book; +import org.apache.cxf.systest.jaxrs.BookStore; +import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase; + +import org.junit.BeforeClass; +import org.junit.Test; + + +/** + * Tests failover within a static cluster. + */ +public abstract class AbstractFailoverTest extends AbstractBusClientServerTestBase { + public static final String NON_PORT = allocatePort(AbstractFailoverTest.class); + + @BeforeClass + public static void startServers() throws Exception { + assertTrue("server did not launch correctly", + launchServer(Server.class, true)); + boolean activeReplica1Started = false; + boolean activeReplica2Started = false; + for (int i = 0; i < 60; i++) { + if (!activeReplica1Started) { + activeReplica1Started = checkReplica(Server.ADDRESS2); + } + if (!activeReplica2Started) { + activeReplica2Started = checkReplica(Server.ADDRESS3); + } + if (activeReplica1Started && activeReplica2Started) { + break; + } + Thread.sleep(1000); + } + } + private static boolean checkReplica(String address) { + try { + Response r = WebClient.create(address).query("_wadl").get(); + return r.getStatus() == 200; + } catch (Exception ex) { + return false; + } + } + + @Test + public void testSequentialStrategy() throws Exception { + FailoverFeature feature = + getFeature(false, Server.ADDRESS2, Server.ADDRESS3); + strategyTest(Server.ADDRESS1, feature, Server.ADDRESS2, null, false, false, false); + } + + @Test + public void testSequentialStrategyWebClient() throws Exception { + FailoverFeature feature = + getFeature(false, Server.ADDRESS2, Server.ADDRESS3); + strategyTestWebClient(Server.ADDRESS1, feature, Server.ADDRESS2, null, false, false); + } + + @Test + public void testSequentialStrategyWith404() throws Exception { + FailoverFeature feature = getFeature(false, Server.ADDRESS3); + feature.getTargetSelector().setSupportNotAvailableErrorsOnly(true); + strategyTestWebClient(Server.ADDRESS2 + "/new", feature, Server.ADDRESS3, null, false, false); + } + + @Test + public void testSequentialStrategyWith406() throws Exception { + FailoverFeature feature = getFeature(false, Server.ADDRESS3); + feature.getTargetSelector().setSupportNotAvailableErrorsOnly(false); + strategyTestWebClientHttpError(Server.ADDRESS2, feature, Server.ADDRESS3, false); + } + + @Test + public void testSequentialStrategyWith406NoFailover() throws Exception { + FailoverFeature feature = getFeature(false, Server.ADDRESS3); + strategyTestWebClientHttpError(Server.ADDRESS2, feature, Server.ADDRESS3, true); + } + + @Test + public void testRandomStrategyWebClient() throws Exception { + FailoverFeature feature = + getFeature(true, Server.ADDRESS3, Server.ADDRESS2); + strategyTestWebClient(Server.ADDRESS1, feature, Server.ADDRESS3, Server.ADDRESS2, false, true); + } + + @Test + public void testRandomStrategy() throws Exception { + FailoverFeature feature = + getFeature(true, Server.ADDRESS2, Server.ADDRESS3); + strategyTest(Server.ADDRESS1, feature, Server.ADDRESS2, Server.ADDRESS3, false, true, true); + } + + @Test + public void testRandomStrategy2() throws Exception { + FailoverFeature feature = + getFeature(true, Server.ADDRESS2, Server.ADDRESS3); + strategyTest(Server.ADDRESS1, feature, Server.ADDRESS2, Server.ADDRESS3, false, true, false); + } + + @Test + public void testSequentialStrategyWithDiffBaseAddresses() throws Exception { + FailoverFeature feature = + getFeature(false, Server.ADDRESS3, null); + strategyTest(Server.ADDRESS1, feature, Server.ADDRESS3, Server.ADDRESS2, false, false, false); + } + + public void testSequentialStrategyWithDiffBaseAddresses2() throws Exception { + FailoverFeature feature = + getFeature(false, Server.ADDRESS3, null); + strategyTest(Server.ADDRESS1, feature, Server.ADDRESS3, Server.ADDRESS2, false, false, true); + } + + @Test(expected = InternalServerErrorException.class) + public void testSequentialStrategyWithServerException() throws Exception { + FailoverFeature feature = + getFeature(false, Server.ADDRESS2, Server.ADDRESS3); + strategyTest(Server.ADDRESS1, feature, Server.ADDRESS2, Server.ADDRESS3, true, false, false); + } + + @Test(expected = ProcessingException.class) + public void testSequentialStrategyFailure() throws Exception { + FailoverFeature feature = + getFeature(false, "http://localhost:" + NON_PORT + "/non-existent"); + strategyTest(Server.ADDRESS1, feature, null, null, false, false, false); + } + + @Test + public void testSequentialStrategyWithRetries() throws Exception { + String address = "http://localhost:" + NON_PORT + "/non-existent"; + String address2 = "http://localhost:" + NON_PORT + "/non-existent2"; + + FailoverFeature feature = new FailoverFeature(); + List<String> alternateAddresses = new ArrayList<String>(); + alternateAddresses.add(address); + alternateAddresses.add(address2); + CustomRetryStrategy strategy = new CustomRetryStrategy(); + strategy.setMaxNumberOfRetries(5); + strategy.setAlternateAddresses(alternateAddresses); + feature.setStrategy(strategy); + + BookStore store = getBookStore(address, feature); + try { + store.getBook("1"); + fail("Exception expected"); + } catch (ProcessingException ex) { + assertEquals(10, strategy.getTotalCount()); + assertEquals(5, strategy.getAddressCount(address)); + assertEquals(5, strategy.getAddressCount(address2)); + } + } + + protected abstract FailoverFeature getFeature(boolean random, String ...address); + + + + protected BookStore getBookStore(String address, + FailoverFeature feature) throws Exception { + JAXRSClientFactoryBean bean = createBean(address, feature); + bean.setServiceClass(BookStore.class); + return bean.create(BookStore.class); + } + + protected WebClient getWebClient(String address, + FailoverFeature feature) throws Exception { + JAXRSClientFactoryBean bean = createBean(address, feature); + + return bean.createWebClient(); + } + + protected JAXRSClientFactoryBean createBean(String address, + FailoverFeature feature) { + JAXRSClientFactoryBean bean = new JAXRSClientFactoryBean(); + bean.setAddress(address); + List<Feature> features = new ArrayList<Feature>(); + features.add(feature); + bean.setFeatures(features); + + return bean; + } + + protected void strategyTest(String inactiveReplica, + FailoverFeature feature, + String activeReplica1, + String activeReplica2, + boolean expectServerException, + boolean expectRandom, + boolean singleProxy) throws Exception { + boolean randomized = false; + String prevEndpoint = null; + BookStore bookStore = null; + + if (singleProxy) { + bookStore = getBookStore(inactiveReplica, feature); + } + + for (int i = 0; i < 20; i++) { + if (!singleProxy) { + feature.getTargetSelector().close(); + bookStore = getBookStore(inactiveReplica, feature); + } + verifyStrategy(bookStore, expectRandom + ? RandomStrategy.class + : SequentialStrategy.class); + Exception ex = null; + try { + if (expectServerException) { + bookStore.getBook("9999"); + fail("Exception expected"); + } else { + Book book = bookStore.echoBookElementJson(new Book("CXF", 123)); + assertNotNull("expected non-null response", book); + assertEquals("unexpected id", 123L, book.getId()); + } + } catch (Exception error) { + if (!expectServerException) { + //String currEndpoint = getCurrentEndpointAddress(bookStore); + //assertTrue(currEndpoint.equals(inactiveReplica)); + throw error; + } + ex = error; + } + String currEndpoint = getCurrentEndpointAddress(bookStore); + assertFalse(currEndpoint.equals(inactiveReplica)); + if (expectRandom) { + assertTrue(currEndpoint.equals(activeReplica1) || currEndpoint.equals(activeReplica2)); + } else { + assertEquals(activeReplica1, currEndpoint); + } + if (expectServerException) { + assertNotNull(ex); + throw ex; + } + + if (!(prevEndpoint == null || currEndpoint.equals(prevEndpoint))) { + randomized = true; + } + prevEndpoint = currEndpoint; + } + if (!singleProxy) { + assertEquals("unexpected random/sequential distribution of failovers", + expectRandom, + randomized); + } + } + + protected void strategyTestWebClient(String inactiveReplica, + FailoverFeature feature, + String activeReplica1, + String activeReplica2, + boolean expectServerException, + boolean expectRandom) throws Exception { + boolean randomized = false; + String prevEndpoint = null; + for (int i = 0; i < 20; i++) { + feature.getTargetSelector().close(); + WebClient bookStore = getWebClient(inactiveReplica, feature); + verifyStrategy(bookStore, expectRandom + ? RandomStrategy.class + : SequentialStrategy.class); + String bookId = expectServerException ? "9999" : "123"; + bookStore.path("bookstore/books").path(bookId); + Exception ex = null; + try { + Book book = bookStore.get(Book.class); + assertNotNull("expected non-null response", book); + assertEquals("unexpected id", 123L, book.getId()); + } catch (Exception error) { + if (!expectServerException) { + throw error; + } + ex = error; + } + String currEndpoint = getCurrentEndpointAddress(bookStore); + assertFalse(currEndpoint.equals(inactiveReplica)); + if (expectRandom) { + assertTrue(currEndpoint.equals(activeReplica1) || currEndpoint.equals(activeReplica2)); + } else { + assertTrue(currEndpoint.equals(activeReplica1)); + } + if (expectServerException) { + assertNotNull(ex); + throw ex; + } + + if (!(prevEndpoint == null || currEndpoint.equals(prevEndpoint))) { + randomized = true; + } + prevEndpoint = currEndpoint; + } + assertEquals("unexpected random/sequential distribution of failovers", + expectRandom, + randomized); + } + + protected void strategyTestWebClientHttpError(String currentReplica, + FailoverFeature feature, + String newReplica, + boolean notAvailableOnly) throws Exception { + WebClient bookStore = getWebClient(currentReplica, feature); + verifyStrategy(bookStore, SequentialStrategy.class); + bookStore.path("bookstore/webappexceptionXML"); + Response r = bookStore.get(); + assertEquals(406, r.getStatus()); + String currEndpoint = getCurrentEndpointAddress(bookStore); + if (notAvailableOnly) { + assertTrue(currEndpoint.equals(currentReplica)); + } else { + assertTrue(currEndpoint.equals(newReplica)); + } + } + + + protected String getCurrentEndpointAddress(Object client) { + String currentBaseURI = WebClient.client(client).getBaseURI().toString(); + String currentURI = WebClient.client(client).getCurrentURI().toString(); + assertTrue(currentURI.startsWith(currentBaseURI)); + return currentBaseURI; + } + + + protected void verifyStrategy(Object proxy, Class<?> clz) { + ConduitSelector conduitSelector = + WebClient.getConfig(proxy).getConduitSelector(); + if (conduitSelector instanceof FailoverTargetSelector) { + Object strategy = + ((FailoverTargetSelector)conduitSelector).getStrategy(); + assertTrue("unexpected strategy", clz.isInstance(strategy)); + } else { + fail("unexpected conduit selector: " + conduitSelector); + } + } + + private static class CustomRetryStrategy extends RetryStrategy { + private int totalCount; + private Map<String, Integer> map = new HashMap<String, Integer>(); + @Override + protected <T> T getNextAlternate(List<T> alternates) { + totalCount++; + T next = super.getNextAlternate(alternates); + String address = (String)next; + Integer count = map.get(address); + if (count == null) { + count = 0; + } + count++; + map.put(address, count); + return next; + } + + public int getTotalCount() { + return totalCount - 2; + } + + public int getAddressCount(String address) { + return map.get(address) - 1; + } + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/a261507e/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/failover/CircuitBreakerFailoverTest.java ---------------------------------------------------------------------- diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/failover/CircuitBreakerFailoverTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/failover/CircuitBreakerFailoverTest.java new file mode 100644 index 0000000..ec00111 --- /dev/null +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/failover/CircuitBreakerFailoverTest.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.systest.jaxrs.failover; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.cxf.clustering.FailoverFeature; +import org.apache.cxf.clustering.RandomStrategy; +import org.apache.cxf.clustering.SequentialStrategy; +import org.apache.cxf.clustering.circuitbreaker.CircuitBreakerFailoverFeature; + +/** + * Tests failover within a static cluster. + */ +public class CircuitBreakerFailoverTest extends AbstractFailoverTest { + public static final String NON_PORT = allocatePort(CircuitBreakerFailoverTest.class); + + @Override + protected FailoverFeature getFeature(boolean random, String ...address) { + CircuitBreakerFailoverFeature feature = new CircuitBreakerFailoverFeature(); + List<String> alternateAddresses = new ArrayList<String>(); + for (String s : address) { + alternateAddresses.add(s); + } + if (!random) { + SequentialStrategy strategy = new SequentialStrategy(); + strategy.setAlternateAddresses(alternateAddresses); + feature.setStrategy(strategy); + } else { + RandomStrategy strategy = new RandomStrategy(); + strategy.setAlternateAddresses(alternateAddresses); + feature.setStrategy(strategy); + } + + return feature; + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/a261507e/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/failover/FailoverTest.java ---------------------------------------------------------------------- diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/failover/FailoverTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/failover/FailoverTest.java index 95bfacd..f27d1a8 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/failover/FailoverTest.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/failover/FailoverTest.java @@ -20,191 +20,41 @@ package org.apache.cxf.systest.jaxrs.failover; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; - -import javax.ws.rs.InternalServerErrorException; -import javax.ws.rs.ProcessingException; -import javax.ws.rs.core.Response; import org.apache.cxf.clustering.FailoverFeature; import org.apache.cxf.clustering.FailoverTargetSelector; import org.apache.cxf.clustering.RandomStrategy; -import org.apache.cxf.clustering.RetryStrategy; import org.apache.cxf.clustering.SequentialStrategy; -import org.apache.cxf.endpoint.ConduitSelector; -import org.apache.cxf.feature.Feature; -import org.apache.cxf.jaxrs.client.JAXRSClientFactoryBean; -import org.apache.cxf.jaxrs.client.WebClient; import org.apache.cxf.message.Exchange; import org.apache.cxf.message.Message; import org.apache.cxf.service.model.EndpointInfo; -import org.apache.cxf.systest.jaxrs.Book; -import org.apache.cxf.systest.jaxrs.BookStore; -import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase; -import org.junit.BeforeClass; import org.junit.Test; /** * Tests failover within a static cluster. */ -public class FailoverTest extends AbstractBusClientServerTestBase { - public static final String NON_PORT = allocatePort(FailoverTest.class); - - @BeforeClass - public static void startServers() throws Exception { - assertTrue("server did not launch correctly", - launchServer(Server.class, true)); - boolean activeReplica1Started = false; - boolean activeReplica2Started = false; - for (int i = 0; i < 60; i++) { - if (!activeReplica1Started) { - activeReplica1Started = checkReplica(Server.ADDRESS2); - } - if (!activeReplica2Started) { - activeReplica2Started = checkReplica(Server.ADDRESS3); - } - if (activeReplica1Started && activeReplica2Started) { - break; - } - Thread.sleep(1000); - } - } - private static boolean checkReplica(String address) { - try { - Response r = WebClient.create(address).query("_wadl").get(); - return r.getStatus() == 200; - } catch (Exception ex) { - return false; - } - } - - @Test - public void testSequentialStrategy() throws Exception { - FailoverFeature feature = - getFeature(false, false, Server.ADDRESS2, Server.ADDRESS3); - strategyTest(Server.ADDRESS1, feature, Server.ADDRESS2, null, false, false, false); - } - +public class FailoverTest extends AbstractFailoverTest { @Test public void testSequentialStrategyWithCustomTargetSelector() throws Exception { - FailoverFeature feature = - getFeature(true, false, Server.ADDRESS2, Server.ADDRESS3); + FailoverFeature feature = getCustomFeature(true, false, Server.ADDRESS2, Server.ADDRESS3); strategyTest("resolver://info", feature, Server.ADDRESS3, null, false, false, false); } @Test public void testSequentialStrategyWithCustomTargetSelector2() throws Exception { - FailoverFeature feature = - getFeature(true, false, Server.ADDRESS2, Server.ADDRESS3); + FailoverFeature feature = getCustomFeature(true, false, Server.ADDRESS2, Server.ADDRESS3); strategyTest("resolver://info", feature, Server.ADDRESS3, null, false, false, true); } - @Test - public void testSequentialStrategyWebClient() throws Exception { - FailoverFeature feature = - getFeature(false, false, Server.ADDRESS2, Server.ADDRESS3); - strategyTestWebClient(Server.ADDRESS1, feature, Server.ADDRESS2, null, false, false); - } - - @Test - public void testSequentialStrategyWith404() throws Exception { - FailoverFeature feature = getFeature(false, false, Server.ADDRESS3); - feature.getTargetSelector().setSupportNotAvailableErrorsOnly(true); - strategyTestWebClient(Server.ADDRESS2 + "/new", feature, Server.ADDRESS3, null, false, false); - } - - @Test - public void testSequentialStrategyWith406() throws Exception { - FailoverFeature feature = getFeature(false, false, Server.ADDRESS3); - feature.getTargetSelector().setSupportNotAvailableErrorsOnly(false); - strategyTestWebClientHttpError(Server.ADDRESS2, feature, Server.ADDRESS3, false); - } - - @Test - public void testSequentialStrategyWith406NoFailover() throws Exception { - FailoverFeature feature = getFeature(false, false, Server.ADDRESS3); - strategyTestWebClientHttpError(Server.ADDRESS2, feature, Server.ADDRESS3, true); - } - - @Test - public void testRandomStrategyWebClient() throws Exception { - FailoverFeature feature = - getFeature(false, true, Server.ADDRESS3, Server.ADDRESS2); - strategyTestWebClient(Server.ADDRESS1, feature, Server.ADDRESS3, Server.ADDRESS2, false, true); - } - - @Test - public void testRandomStrategy() throws Exception { - FailoverFeature feature = - getFeature(false, true, Server.ADDRESS2, Server.ADDRESS3); - strategyTest(Server.ADDRESS1, feature, Server.ADDRESS2, Server.ADDRESS3, false, true, true); - } - - @Test - public void testRandomStrategy2() throws Exception { - FailoverFeature feature = - getFeature(false, true, Server.ADDRESS2, Server.ADDRESS3); - strategyTest(Server.ADDRESS1, feature, Server.ADDRESS2, Server.ADDRESS3, false, true, false); - } - - @Test - public void testSequentialStrategyWithDiffBaseAddresses() throws Exception { - FailoverFeature feature = - getFeature(false, false, Server.ADDRESS3, null); - strategyTest(Server.ADDRESS1, feature, Server.ADDRESS3, Server.ADDRESS2, false, false, false); - } - - public void testSequentialStrategyWithDiffBaseAddresses2() throws Exception { - FailoverFeature feature = - getFeature(false, false, Server.ADDRESS3, null); - strategyTest(Server.ADDRESS1, feature, Server.ADDRESS3, Server.ADDRESS2, false, false, true); - } - - @Test(expected = InternalServerErrorException.class) - public void testSequentialStrategyWithServerException() throws Exception { - FailoverFeature feature = - getFeature(false, false, Server.ADDRESS2, Server.ADDRESS3); - strategyTest(Server.ADDRESS1, feature, Server.ADDRESS2, Server.ADDRESS3, true, false, false); - } - - @Test(expected = ProcessingException.class) - public void testSequentialStrategyFailure() throws Exception { - FailoverFeature feature = - getFeature(false, false, "http://localhost:" + NON_PORT + "/non-existent"); - strategyTest(Server.ADDRESS1, feature, null, null, false, false, false); - } - - @Test - public void testSequentialStrategyWithRetries() throws Exception { - String address = "http://localhost:" + NON_PORT + "/non-existent"; - String address2 = "http://localhost:" + NON_PORT + "/non-existent2"; - - FailoverFeature feature = new FailoverFeature(); - List<String> alternateAddresses = new ArrayList<String>(); - alternateAddresses.add(address); - alternateAddresses.add(address2); - CustomRetryStrategy strategy = new CustomRetryStrategy(); - strategy.setMaxNumberOfRetries(5); - strategy.setAlternateAddresses(alternateAddresses); - feature.setStrategy(strategy); - - BookStore store = getBookStore(address, feature); - try { - store.getBook("1"); - fail("Exception expected"); - } catch (ProcessingException ex) { - assertEquals(10, strategy.getTotalCount()); - assertEquals(5, strategy.getAddressCount(address)); - assertEquals(5, strategy.getAddressCount(address2)); - } + @Override + protected FailoverFeature getFeature(boolean random, String... address) { + return getCustomFeature(false, random, address); } - - private FailoverFeature getFeature(boolean custom, boolean random, String ...address) { + private FailoverFeature getCustomFeature(boolean custom, boolean random, String ...address) { FailoverFeature feature = new FailoverFeature(); List<String> alternateAddresses = new ArrayList<String>(); for (String s : address) { @@ -226,184 +76,7 @@ public class FailoverTest extends AbstractBusClientServerTestBase { return feature; } - - protected BookStore getBookStore(String address, - FailoverFeature feature) throws Exception { - JAXRSClientFactoryBean bean = createBean(address, feature); - bean.setServiceClass(BookStore.class); - return bean.create(BookStore.class); - } - - protected WebClient getWebClient(String address, - FailoverFeature feature) throws Exception { - JAXRSClientFactoryBean bean = createBean(address, feature); - - return bean.createWebClient(); - } - - protected JAXRSClientFactoryBean createBean(String address, - FailoverFeature feature) { - JAXRSClientFactoryBean bean = new JAXRSClientFactoryBean(); - bean.setAddress(address); - List<Feature> features = new ArrayList<Feature>(); - features.add(feature); - bean.setFeatures(features); - - return bean; - } - - protected void strategyTest(String inactiveReplica, - FailoverFeature feature, - String activeReplica1, - String activeReplica2, - boolean expectServerException, - boolean expectRandom, - boolean singleProxy) throws Exception { - boolean randomized = false; - String prevEndpoint = null; - BookStore bookStore = null; - - if (singleProxy) { - bookStore = getBookStore(inactiveReplica, feature); - } - - for (int i = 0; i < 20; i++) { - if (!singleProxy) { - feature.getTargetSelector().close(); - bookStore = getBookStore(inactiveReplica, feature); - } - verifyStrategy(bookStore, expectRandom - ? RandomStrategy.class - : SequentialStrategy.class); - Exception ex = null; - try { - if (expectServerException) { - bookStore.getBook("9999"); - fail("Exception expected"); - } else { - Book book = bookStore.echoBookElementJson(new Book("CXF", 123)); - assertNotNull("expected non-null response", book); - assertEquals("unexpected id", 123L, book.getId()); - } - } catch (Exception error) { - if (!expectServerException) { - //String currEndpoint = getCurrentEndpointAddress(bookStore); - //assertTrue(currEndpoint.equals(inactiveReplica)); - throw error; - } - ex = error; - } - String currEndpoint = getCurrentEndpointAddress(bookStore); - assertFalse(currEndpoint.equals(inactiveReplica)); - if (expectRandom) { - assertTrue(currEndpoint.equals(activeReplica1) || currEndpoint.equals(activeReplica2)); - } else { - assertEquals(activeReplica1, currEndpoint); - } - if (expectServerException) { - assertNotNull(ex); - throw ex; - } - - if (!(prevEndpoint == null || currEndpoint.equals(prevEndpoint))) { - randomized = true; - } - prevEndpoint = currEndpoint; - } - if (!singleProxy) { - assertEquals("unexpected random/sequential distribution of failovers", - expectRandom, - randomized); - } - } - - protected void strategyTestWebClient(String inactiveReplica, - FailoverFeature feature, - String activeReplica1, - String activeReplica2, - boolean expectServerException, - boolean expectRandom) throws Exception { - boolean randomized = false; - String prevEndpoint = null; - for (int i = 0; i < 20; i++) { - feature.getTargetSelector().close(); - WebClient bookStore = getWebClient(inactiveReplica, feature); - verifyStrategy(bookStore, expectRandom - ? RandomStrategy.class - : SequentialStrategy.class); - String bookId = expectServerException ? "9999" : "123"; - bookStore.path("bookstore/books").path(bookId); - Exception ex = null; - try { - Book book = bookStore.get(Book.class); - assertNotNull("expected non-null response", book); - assertEquals("unexpected id", 123L, book.getId()); - } catch (Exception error) { - if (!expectServerException) { - throw error; - } - ex = error; - } - String currEndpoint = getCurrentEndpointAddress(bookStore); - assertFalse(currEndpoint.equals(inactiveReplica)); - if (expectRandom) { - assertTrue(currEndpoint.equals(activeReplica1) || currEndpoint.equals(activeReplica2)); - } else { - assertTrue(currEndpoint.equals(activeReplica1)); - } - if (expectServerException) { - assertNotNull(ex); - throw ex; - } - - if (!(prevEndpoint == null || currEndpoint.equals(prevEndpoint))) { - randomized = true; - } - prevEndpoint = currEndpoint; - } - assertEquals("unexpected random/sequential distribution of failovers", - expectRandom, - randomized); - } - - protected void strategyTestWebClientHttpError(String currentReplica, - FailoverFeature feature, - String newReplica, - boolean notAvailableOnly) throws Exception { - WebClient bookStore = getWebClient(currentReplica, feature); - verifyStrategy(bookStore, SequentialStrategy.class); - bookStore.path("bookstore/webappexceptionXML"); - Response r = bookStore.get(); - assertEquals(406, r.getStatus()); - String currEndpoint = getCurrentEndpointAddress(bookStore); - if (notAvailableOnly) { - assertTrue(currEndpoint.equals(currentReplica)); - } else { - assertTrue(currEndpoint.equals(newReplica)); - } - } - - protected String getCurrentEndpointAddress(Object client) { - String currentBaseURI = WebClient.client(client).getBaseURI().toString(); - String currentURI = WebClient.client(client).getCurrentURI().toString(); - assertTrue(currentURI.startsWith(currentBaseURI)); - return currentBaseURI; - } - - - protected void verifyStrategy(Object proxy, Class<?> clz) { - ConduitSelector conduitSelector = - WebClient.getConfig(proxy).getConduitSelector(); - if (conduitSelector instanceof FailoverTargetSelector) { - Object strategy = - ((FailoverTargetSelector)conduitSelector).getStrategy(); - assertTrue("unexpected strategy", clz.isInstance(strategy)); - } else { - fail("unexpected conduit selector: " + conduitSelector); - } - } - private static class ReplaceInitialAddressSelector extends FailoverTargetSelector { @Override public synchronized void prepare(Message message) { @@ -414,34 +87,8 @@ public class FailoverTest extends AbstractBusClientServerTestBase { } @Override - protected boolean requiresFailover(Exchange exchange) { + protected boolean requiresFailover(Exchange exchange, Exception ex) { return false; } } - - private static class CustomRetryStrategy extends RetryStrategy { - private int totalCount; - private Map<String, Integer> map = new HashMap<String, Integer>(); - @Override - protected <T> T getNextAlternate(List<T> alternates) { - totalCount++; - T next = super.getNextAlternate(alternates); - String address = (String)next; - Integer count = map.get(address); - if (count == null) { - count = 0; - } - count++; - map.put(address, count); - return next; - } - - public int getTotalCount() { - return totalCount - 2; - } - - public int getAddressCount(String address) { - return map.get(address) - 1; - } - } }