This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/main by this push: new 67f5afa0f4 [AMQ-9259] Remove activemq-partition and zookeeper test dependency new 53f9390c41 Merge pull request #1009 from mattrpav/AMQ-9259 67f5afa0f4 is described below commit 67f5afa0f44a0c21992f81e5262970a8e14e172a Author: Matt Pavlovich <m...@hyte.io> AuthorDate: Mon May 22 10:59:41 2023 -0500 [AMQ-9259] Remove activemq-partition and zookeeper test dependency --- activemq-osgi/pom.xml | 5 - activemq-partition/pom.xml | 149 ------ .../apache/activemq/partition/PartitionBroker.java | 367 ------------- .../activemq/partition/PartitionBrokerPlugin.java | 66 --- .../org/apache/activemq/partition/ZKClient.java | 596 --------------------- .../partition/ZooKeeperPartitionBroker.java | 124 ----- .../partition/ZooKeeperPartitionBrokerPlugin.java | 68 --- .../activemq/partition/dto/Partitioning.java | 161 ------ .../org/apache/activemq/partition/dto/Target.java | 59 -- .../activemq/partition/PartitionBrokerTest.java | 251 --------- .../partition/ZooKeeperPartitionBrokerTest.java | 97 ---- activemq-spring/pom.xml | 16 - activemq-unit-tests/pom.xml | 4 - .../partition/SpringPartitionBrokerTest.java | 53 -- .../src/test/resources/activemq-partition.xml | 58 -- assembly/pom.xml | 17 - assembly/src/main/descriptors/common-bin.xml | 1 - pom.xml | 67 --- 18 files changed, 2159 deletions(-) diff --git a/activemq-osgi/pom.xml b/activemq-osgi/pom.xml index 03d29cd7a0..7b99b34119 100644 --- a/activemq-osgi/pom.xml +++ b/activemq-osgi/pom.xml @@ -68,10 +68,6 @@ <groupId>${project.groupId}</groupId> <artifactId>activemq-http</artifactId> </dependency> - <dependency> - <groupId>${project.groupId}</groupId> - <artifactId>activemq-partition</artifactId> - </dependency> <!-- Additional Dependencies. --> <dependency> @@ -187,7 +183,6 @@ org.codehaus.jettison*;resolution:=optional, org.jasypt*;resolution:=optional, org.eclipse.jetty*;resolution:=optional;version="[9.0,10)", - org.apache.zookeeper*;resolution:=optional, org.fusesource.hawtjni*;resolution:=optional, org.springframework.jms*;version="[4,6)";resolution:=optional, org.springframework.transaction*;version="[4,6)";resolution:=optional, diff --git a/activemq-partition/pom.xml b/activemq-partition/pom.xml deleted file mode 100644 index 901c869d3a..0000000000 --- a/activemq-partition/pom.xml +++ /dev/null @@ -1,149 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.activemq</groupId> - <artifactId>activemq-parent</artifactId> - <version>5.19.0-SNAPSHOT</version> - </parent> - - <artifactId>activemq-partition</artifactId> - <packaging>jar</packaging> - - <name>ActiveMQ :: Partition Management</name> - <description>Used to partition clients over a cluster of brokers</description> - - <dependencies> - <dependency> - <groupId>org.apache.activemq</groupId> - <artifactId>activemq-broker</artifactId> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - <scope>compile</scope> - </dependency> - - <dependency> - <groupId>org.linkedin</groupId> - <artifactId>org.linkedin.zookeeper-impl</artifactId> - </dependency> - <dependency> - <groupId>org.linkedin</groupId> - <artifactId>org.linkedin.util-core</artifactId> - </dependency> - <dependency> - <groupId>org.apache.zookeeper</groupId> - <artifactId>zookeeper</artifactId> - </dependency> - - <!-- For Optional Snappy Compression --> - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-core</artifactId> - </dependency> - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-annotations</artifactId> - </dependency> - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-databind</artifactId> - </dependency> - - <!-- Testing Dependencies --> - <dependency> - <groupId>org.apache.logging.log4j</groupId> - <artifactId>log4j-core</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.logging.log4j</groupId> - <artifactId>log4j-slf4j2-impl</artifactId> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.activemq</groupId> - <artifactId>activemq-broker</artifactId> - <type>test-jar</type> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>test</scope> - </dependency> - </dependencies> - - <build> - <plugins> - </plugins> - </build> - <profiles> - <profile> - <id>activemq.tests-sanity</id> - <activation> - <property> - <name>activemq.tests</name> - <value>smoke</value> - </property> - </activation> - <build> - <plugins> - <plugin> - <artifactId>maven-surefire-plugin</artifactId> - <configuration> - <includes> - <include>**/PartitionBrokerTest.*</include> - </includes> - </configuration> - </plugin> - </plugins> - </build> - </profile> - <profile> - <id>activemq.tests-autoTransport</id> - <activation> - <property> - <name>activemq.tests</name> - <value>autoTransport</value> - </property> - </activation> - <build> - <plugins> - <plugin> - <artifactId>maven-surefire-plugin</artifactId> - <configuration> - <excludes> - <exclude>**</exclude> - </excludes> - </configuration> - </plugin> - </plugins> - </build> - </profile> - - </profiles> -</project> diff --git a/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBroker.java b/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBroker.java deleted file mode 100644 index 9362e64b26..0000000000 --- a/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBroker.java +++ /dev/null @@ -1,367 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.partition; - -import java.net.InetSocketAddress; -import java.net.Socket; -import java.net.SocketAddress; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import org.apache.activemq.broker.Broker; -import org.apache.activemq.broker.BrokerFilter; -import org.apache.activemq.broker.ConnectionContext; -import org.apache.activemq.broker.ProducerBrokerExchange; -import org.apache.activemq.broker.TransportConnection; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ConnectionControl; -import org.apache.activemq.command.ConnectionId; -import org.apache.activemq.command.ConnectionInfo; -import org.apache.activemq.command.Message; -import org.apache.activemq.partition.dto.Partitioning; -import org.apache.activemq.partition.dto.Target; -import org.apache.activemq.state.ConsumerState; -import org.apache.activemq.state.SessionState; -import org.apache.activemq.transport.Transport; -import org.apache.activemq.util.LRUCache; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A BrokerFilter which partitions client connections over a cluster of brokers. - * - * It can use a client identifier like client id, authenticated user name, source ip - * address or even destination being used by the connection to figure out which - * is the best broker in the cluster that the connection should be using and then - * redirects failover clients to that broker. - */ -public class PartitionBroker extends BrokerFilter { - - protected static final Logger LOG = LoggerFactory.getLogger(PartitionBroker.class); - protected final PartitionBrokerPlugin plugin; - protected boolean reloadConfigOnPoll = true; - - public PartitionBroker(Broker broker, PartitionBrokerPlugin plugin) { - super(broker); - this.plugin = plugin; - } - - @Override - public void start() throws Exception { - super.start(); - getExecutor().execute(new Runnable() { - @Override - public void run() { - Thread.currentThread().setName("Partition Monitor"); - onMonitorStart(); - try { - runPartitionMonitor(); - } catch (Exception e) { - onMonitorStop(); - } - } - }); - } - - protected void onMonitorStart() { - } - protected void onMonitorStop() { - } - - protected void runPartitionMonitor() { - while( !isStopped() ) { - try { - monitorWait(); - } catch (InterruptedException e) { - break; - } - - if(reloadConfigOnPoll) { - try { - reloadConfiguration(); - } catch (Exception e) { - continue; - } - } - - for( ConnectionMonitor monitor: monitors.values()) { - checkTarget(monitor); - } - } - } - - protected void monitorWait() throws InterruptedException { - synchronized (this) { - this.wait(1000); - } - } - - protected void monitorWakeup() { - synchronized (this) { - this.notifyAll(); - } - } - - protected void reloadConfiguration() throws Exception { - } - - protected void checkTarget(ConnectionMonitor monitor) { - - // can we find a preferred target for the connection? - Target targetDTO = pickBestBroker(monitor); - if( targetDTO == null || targetDTO.ids==null) { - LOG.debug("No partition target found for connection: "+monitor.context.getConnectionId()); - return; - } - - // Are we one the the targets? - if( targetDTO.ids.contains(getBrokerName()) ) { - LOG.debug("We are a partition target for connection: "+monitor.context.getConnectionId()); - return; - } - - // Then we need to move the connection over. - String connectionString = getConnectionString(targetDTO.ids); - if( connectionString==null ) { - LOG.debug("Could not convert to partition targets to connection string: " + targetDTO.ids); - return; - } - - LOG.info("Redirecting connection to: " + connectionString); - TransportConnection connection = (TransportConnection)monitor.context.getConnection(); - ConnectionControl cc = new ConnectionControl(); - cc.setConnectedBrokers(connectionString); - cc.setRebalanceConnection(true); - connection.dispatchAsync(cc); - } - - protected String getConnectionString(HashSet<String> ids) { - StringBuilder rc = new StringBuilder(); - for (String id : ids) { - String url = plugin.getBrokerURL(this, id); - if( url!=null ) { - if( rc.length()!=0 ) { - rc.append(','); - } - rc.append(url); - } - } - if( rc.length()==0 ) - return null; - return rc.toString(); - } - - static private class Score { - int value; - } - - protected Target pickBestBroker(ConnectionMonitor monitor) { - - if( getConfig() ==null ) - return null; - - if( getConfig().bySourceIp !=null && !getConfig().bySourceIp.isEmpty() ) { - TransportConnection connection = (TransportConnection)monitor.context.getConnection(); - Transport transport = connection.getTransport(); - Socket socket = transport.narrow(Socket.class); - if( socket !=null ) { - SocketAddress address = socket.getRemoteSocketAddress(); - if( address instanceof InetSocketAddress) { - String ip = ((InetSocketAddress) address).getAddress().getHostAddress(); - Target targetDTO = getConfig().bySourceIp.get(ip); - if( targetDTO!=null ) { - return targetDTO; - } - } - } - } - - if( getConfig().byUserName !=null && !getConfig().byUserName.isEmpty() ) { - String userName = monitor.context.getUserName(); - if( userName !=null ) { - Target targetDTO = getConfig().byUserName.get(userName); - if( targetDTO!=null ) { - return targetDTO; - } - } - } - - if( getConfig().byClientId !=null && !getConfig().byClientId.isEmpty() ) { - String clientId = monitor.context.getClientId(); - if( clientId!=null ) { - Target targetDTO = getConfig().byClientId.get(clientId); - if( targetDTO!=null ) { - return targetDTO; - } - } - } - - if( - (getConfig().byQueue !=null && !getConfig().byQueue.isEmpty()) - || (getConfig().byTopic !=null && !getConfig().byTopic.isEmpty()) - ) { - - // Collect the destinations the connection is consuming from... - HashSet<ActiveMQDestination> dests = new HashSet<ActiveMQDestination>(); - for (SessionState session : monitor.context.getConnectionState().getSessionStates()) { - for (ConsumerState consumer : session.getConsumerStates()) { - ActiveMQDestination destination = consumer.getInfo().getDestination(); - if( destination.isComposite() ) { - dests.addAll(Arrays.asList(destination.getCompositeDestinations())); - } else { - dests.addAll(Collections.singletonList(destination)); - } - } - } - - // Group them by the partitioning target for the destinations and score them.. - HashMap<Target, Score> targetScores = new HashMap<Target, Score>(); - for (ActiveMQDestination dest : dests) { - Target target = getTarget(dest); - if( target!=null ) { - Score score = targetScores.get(target); - if( score == null ) { - score = new Score(); - targetScores.put(target, score); - } - score.value++; - } - } - - // The target with largest score wins.. - if (!targetScores.isEmpty()) { - Target bestTarget = null; - int bestScore = 0; - for (Map.Entry<Target, Score> entry : targetScores.entrySet()) { - if (entry.getValue().value > bestScore) { - bestTarget = entry.getKey(); - bestScore = entry.getValue().value; - } - } - return bestTarget; - } - - // If we get here is because there were no consumers, or the destinations for those - // consumers did not have an assigned destination.. So partition based on producer - // usage. - Target best = monitor.findBestProducerTarget(this); - if( best!=null ) { - return best; - } - } - return null; - } - - protected Target getTarget(ActiveMQDestination dest) { - Partitioning config = getConfig(); - if( dest.isQueue() && config.byQueue !=null && !config.byQueue.isEmpty() ) { - return config.byQueue.get(dest.getPhysicalName()); - } else if( dest.isTopic() && config.byTopic !=null && !config.byTopic.isEmpty() ) { - return config.byTopic.get(dest.getPhysicalName()); - } - return null; - } - - protected final ConcurrentMap<ConnectionId, ConnectionMonitor> monitors = new ConcurrentHashMap<ConnectionId, ConnectionMonitor>(); - - @Override - public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { - if( info.isFaultTolerant() ) { - ConnectionMonitor monitor = new ConnectionMonitor(context); - monitors.put(info.getConnectionId(), monitor); - super.addConnection(context, info); - checkTarget(monitor); - } else { - super.addConnection(context, info); - } - } - - @Override - public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { - super.removeConnection(context, info, error); - if( info.isFaultTolerant() ) { - monitors.remove(info.getConnectionId()); - } - } - - @Override - public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception { - ConnectionMonitor monitor = monitors.get(producerExchange.getConnectionContext().getConnectionId()); - if( monitor!=null ) { - monitor.onSend(producerExchange, messageSend); - } - } - - protected Partitioning getConfig() { - return plugin.getConfig(); - } - - - static class Traffic { - long messages; - long bytes; - } - - static class ConnectionMonitor { - - final ConnectionContext context; - LRUCache<ActiveMQDestination, Traffic> trafficPerDestination = new LRUCache<ActiveMQDestination, Traffic>(); - - public ConnectionMonitor(ConnectionContext context) { - this.context = context; - } - - synchronized public Target findBestProducerTarget(PartitionBroker broker) { - Target best = null; - long bestSize = 0 ; - for (Map.Entry<ActiveMQDestination, Traffic> entry : trafficPerDestination.entrySet()) { - Traffic t = entry.getValue(); - // Once we get enough messages... - if( t.messages < broker.plugin.getMinTransferCount()) { - continue; - } - if( t.bytes > bestSize) { - bestSize = t.bytes; - Target target = broker.getTarget(entry.getKey()); - if( target!=null ) { - best = target; - } - } - } - return best; - } - - synchronized public void onSend(ProducerBrokerExchange producerExchange, Message message) { - ActiveMQDestination dest = message.getDestination(); - Traffic traffic = trafficPerDestination.get(dest); - if( traffic == null ) { - traffic = new Traffic(); - trafficPerDestination.put(dest, traffic); - } - traffic.messages += 1; - traffic.bytes += message.getSize(); - } - - - } - -} diff --git a/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBrokerPlugin.java b/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBrokerPlugin.java deleted file mode 100644 index 418f564cab..0000000000 --- a/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBrokerPlugin.java +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.partition; - -import org.apache.activemq.broker.Broker; -import org.apache.activemq.broker.BrokerPlugin; -import org.apache.activemq.partition.dto.Partitioning; - -import java.io.IOException; - -/** - * A BrokerPlugin which partitions client connections over a cluster of brokers. - * - * @org.apache.xbean.XBean element="partitionBrokerPlugin" - */ -public class PartitionBrokerPlugin implements BrokerPlugin { - - protected int minTransferCount; - protected Partitioning config; - - @Override - public Broker installPlugin(Broker broker) throws Exception { - return new PartitionBroker(broker, this); - } - - public int getMinTransferCount() { - return minTransferCount; - } - - public void setMinTransferCount(int minTransferCount) { - this.minTransferCount = minTransferCount; - } - - public Partitioning getConfig() { - return config; - } - - public void setConfig(Partitioning config) { - this.config = config; - } - - public void setConfigAsJson(String config) throws IOException { - this.config = Partitioning.MAPPER.readValue(config, Partitioning.class); - } - - public String getBrokerURL(PartitionBroker partitionBroker, String id) { - if( config!=null && config.brokers!=null ) { - return config.brokers.get(id); - } - return null; - } -} diff --git a/activemq-partition/src/main/java/org/apache/activemq/partition/ZKClient.java b/activemq-partition/src/main/java/org/apache/activemq/partition/ZKClient.java deleted file mode 100644 index 2baec62762..0000000000 --- a/activemq-partition/src/main/java/org/apache/activemq/partition/ZKClient.java +++ /dev/null @@ -1,596 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.partition; - -import org.apache.zookeeper.*; -import org.apache.zookeeper.data.ACL; -import org.apache.zookeeper.data.Id; -import org.apache.zookeeper.data.Stat; -import org.linkedin.util.clock.Clock; -import org.linkedin.util.clock.SystemClock; -import org.linkedin.util.clock.Timespan; -import org.linkedin.util.concurrent.ConcurrentUtils; -import org.linkedin.util.io.PathUtils; -import org.linkedin.zookeeper.client.*; -import org.slf4j.Logger; - -import java.io.UnsupportedEncodingException; -import java.lang.reflect.Field; -import java.lang.reflect.Method; -import java.util.*; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; - -public class ZKClient extends org.linkedin.zookeeper.client.AbstractZKClient implements Watcher { - - private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(ZKClient.class); - - private Map<String, String> acls; - private String password; - - public void start() throws Exception { - // Grab the lock to make sure that the registration of the ManagedService - // won't be updated immediately but that the initial update will happen first - synchronized (_lock) { - _stateChangeDispatcher.setDaemon(true); - _stateChangeDispatcher.start(); - doStart(); - } - } - - public void setACLs(Map<String, String> acls) { - this.acls = acls; - } - - public void setPassword(String password) { - this.password = password; - } - - protected void doStart() throws UnsupportedEncodingException { - connect(); - } - - @Override - public void close() { - if (_stateChangeDispatcher != null) { - _stateChangeDispatcher.end(); - try { - _stateChangeDispatcher.join(1000); - } catch (Exception e) { - LOG.debug("ignored exception", e); - } - } - synchronized(_lock) { - if (_zk != null) { - try { - changeState(State.NONE); - _zk.close(); - Thread th = getSendThread(); - if (th != null) { - th.join(1000); - } - _zk = null; - } catch (Exception e) { - LOG.debug("ignored exception", e); - } - } - } - } - - protected Thread getSendThread() { - try { - return (Thread) getField(_zk, "_zk", "cnxn", "sendThread"); - } catch (Throwable e) { - return null; - } - } - - protected Object getField(Object obj, String... names) throws Exception { - for (String name : names) { - obj = getField(obj, name); - } - return obj; - } - - protected Object getField(Object obj, String name) throws Exception { - Class clazz = obj.getClass(); - while (clazz != null) { - for (Field f : clazz.getDeclaredFields()) { - if (f.getName().equals(name)) { - f.setAccessible(true); - return f.get(obj); - } - } - } - throw new NoSuchFieldError(name); - } - - protected void changeState(State newState) { - synchronized (_lock) { - State oldState = _state; - if (oldState != newState) { - _stateChangeDispatcher.addEvent(oldState, newState); - _state = newState; - _lock.notifyAll(); - } - } - } - - public void testGenerateConnectionLoss() throws Exception { - waitForConnected(); - Object clientCnxnSocket = getField(_zk, "_zk", "cnxn", "sendThread", "clientCnxnSocket"); - callMethod(clientCnxnSocket, "testableCloseSocket"); - } - - protected Object callMethod(Object obj, String name, Object... args) throws Exception { - Class clazz = obj.getClass(); - while (clazz != null) { - for (Method m : clazz.getDeclaredMethods()) { - if (m.getName().equals(name)) { - m.setAccessible(true); - return m.invoke(obj, args); - } - } - } - throw new NoSuchMethodError(name); - } - - protected void tryConnect() { - synchronized (_lock) { - try { - connect(); - } catch (Throwable e) { - LOG.warn("Error while restarting:", e); - if (_expiredSessionRecovery == null) { - _expiredSessionRecovery = new ExpiredSessionRecovery(); - _expiredSessionRecovery.setDaemon(true); - _expiredSessionRecovery.start(); - } - } - } - } - - public void connect() throws UnsupportedEncodingException { - synchronized (_lock) { - changeState(State.CONNECTING); - _zk = _factory.createZooKeeper(this); - if (password != null) { - _zk.addAuthInfo("digest", ("fabric:" + password).getBytes("UTF-8")); - } - } - } - - public void process(WatchedEvent event) { - if (event.getState() != null) { - LOG.debug("event: {}", event.getState()); - synchronized (_lock) { - switch(event.getState()) { - case SyncConnected: - changeState(State.CONNECTED); - break; - case Disconnected: - if (_state != State.NONE) { - changeState(State.RECONNECTING); - } - break; - case Expired: - // when expired, the zookeeper object is invalid and we need to recreate a new one - _zk = null; - LOG.warn("Expiration detected: trying to restart..."); - tryConnect(); - break; - default: - LOG.warn("Unsupported event state: {}", event.getState()); - } - } - } - } - - @Override - protected IZooKeeper getZk() { - State state = _state; - if (state == State.NONE) { - throw new IllegalStateException("ZooKeeper client has not been configured yet. You need to either create an ensemble or join one."); - } else if (state != State.CONNECTING) { - try { - waitForConnected(); - } catch (Exception e) { - throw new IllegalStateException("Error waiting for ZooKeeper connection", e); - } - } - IZooKeeper zk = _zk; - if (zk == null) { - throw new IllegalStateException("No ZooKeeper connection available"); - } - return zk; - } - - public void waitForConnected(Timespan timeout) throws InterruptedException, TimeoutException { - waitForState(State.CONNECTED, timeout); - } - - public void waitForConnected() throws InterruptedException, TimeoutException { - waitForConnected(null); - } - - public void waitForState(State state, Timespan timeout) throws TimeoutException, InterruptedException { - long endTime = (timeout == null ? sessionTimeout : timeout).futureTimeMillis(_clock); - if (_state != state) { - synchronized (_lock) { - while (_state != state) { - ConcurrentUtils.awaitUntil(_clock, _lock, endTime); - } - } - } - } - - @Override - public void registerListener(LifecycleListener listener) { - if (listener == null) { - throw new IllegalStateException("listener is null"); - } - if (!_listeners.contains(listener)) { - _listeners.add(listener); - } - if (_state == State.CONNECTED) { - listener.onConnected(); - //_stateChangeDispatcher.addEvent(null, State.CONNECTED); - } - } - - @Override - public void removeListener(LifecycleListener listener) { - if (listener == null) { - throw new IllegalStateException("listener is null"); - } - _listeners.remove(listener); - } - - @Override - public org.linkedin.zookeeper.client.IZKClient chroot(String path) { - return new ChrootedZKClient(this, adjustPath(path)); - } - - @Override - public boolean isConnected() { - return _state == State.CONNECTED; - } - - public boolean isConfigured() { - return _state != State.NONE; - } - - @Override - public String getConnectString() { - return _factory.getConnectString(); - } - - public static enum State { - NONE, - CONNECTING, - CONNECTED, - RECONNECTING - } - - private final static String CHARSET = "UTF-8"; - - private final Clock _clock = SystemClock.instance(); - private final List<LifecycleListener> _listeners = new CopyOnWriteArrayList<>(); - - protected final Object _lock = new Object(); - protected volatile State _state = State.NONE; - - private final StateChangeDispatcher _stateChangeDispatcher = new StateChangeDispatcher(); - - protected IZooKeeperFactory _factory; - protected IZooKeeper _zk; - protected Timespan _reconnectTimeout = Timespan.parse("20s"); - protected Timespan sessionTimeout = new Timespan(30, Timespan.TimeUnit.SECOND); - - private ExpiredSessionRecovery _expiredSessionRecovery = null; - - private class StateChangeDispatcher extends Thread { - private final AtomicBoolean _running = new AtomicBoolean(true); - private final BlockingQueue<Boolean> _events = new LinkedBlockingQueue<>(); - - private StateChangeDispatcher() { - super("ZooKeeper state change dispatcher thread"); - } - - @Override - public void run() { - Map<Object, Boolean> history = new IdentityHashMap<>(); - LOG.info("Starting StateChangeDispatcher"); - while (_running.get()) { - Boolean isConnectedEvent; - try { - isConnectedEvent = _events.take(); - } catch (InterruptedException e) { - continue; - } - if (!_running.get() || isConnectedEvent == null) { - continue; - } - Map<Object, Boolean> newHistory = callListeners(history, isConnectedEvent); - // we save which event each listener has seen last - // we don't update the map in place because we need to get rid of unregistered listeners - history = newHistory; - } - LOG.info("StateChangeDispatcher terminated."); - } - - public void end() { - _running.set(false); - _events.add(false); - } - - public void addEvent(ZKClient.State oldState, ZKClient.State newState) { - LOG.debug("addEvent: {} => {}", oldState, newState); - if (newState == ZKClient.State.CONNECTED) { - _events.add(true); - } else if (oldState == ZKClient.State.CONNECTED) { - _events.add(false); - } - } - } - - protected Map<Object, Boolean> callListeners(Map<Object, Boolean> history, Boolean connectedEvent) { - Map<Object, Boolean> newHistory = new IdentityHashMap<>(); - for (LifecycleListener listener : _listeners) { - Boolean previousEvent = history.get(listener); - // we propagate the event only if it was not already sent - if (previousEvent == null || previousEvent != connectedEvent) { - try { - if (connectedEvent) { - listener.onConnected(); - } else { - listener.onDisconnected(); - } - } catch (Throwable e) { - LOG.warn("Exception while executing listener (ignored)", e); - } - } - newHistory.put(listener, connectedEvent); - } - return newHistory; - } - - private class ExpiredSessionRecovery extends Thread { - - private ExpiredSessionRecovery() { - super("ZooKeeper expired session recovery thread"); - } - - @Override - public void run() { - LOG.info("Entering recovery mode"); - synchronized (_lock) { - try { - int count = 0; - while (_state == ZKClient.State.NONE) { - try { - count++; - LOG.warn("Recovery mode: trying to reconnect to zookeeper [{}]", count); - ZKClient.this.connect(); - } catch (Throwable e) { - LOG.warn("Recovery mode: reconnect attempt failed [{}]... waiting for {}", count, _reconnectTimeout, e); - try { - _lock.wait(_reconnectTimeout.getDurationInMilliseconds()); - } catch (InterruptedException e1) { - throw new RuntimeException("Recovery mode: wait interrupted... bailing out", e1); - } - } - } - } finally { - _expiredSessionRecovery = null; - LOG.info("Exiting recovery mode."); - } - } - } - - } - - public ZKClient(String connectString, Timespan sessionTimeout, Watcher watcher) { - this(new ZooKeeperFactory(connectString, sessionTimeout, watcher)); - } - - public ZKClient(IZooKeeperFactory factory) { - this(factory, null); - } - - public ZKClient(IZooKeeperFactory factory, String chroot) { - super(chroot); - _factory = factory; - Map<String, String> acls = new HashMap<>(); - acls.put("/", "world:anyone:acdrw"); - setACLs(acls); - } - - static private int getPermFromString(String permString) { - int perm = 0; - for (int i = 0; i < permString.length(); i++) { - switch (permString.charAt(i)) { - case 'r': - perm |= ZooDefs.Perms.READ; - break; - case 'w': - perm |= ZooDefs.Perms.WRITE; - break; - case 'c': - perm |= ZooDefs.Perms.CREATE; - break; - case 'd': - perm |= ZooDefs.Perms.DELETE; - break; - case 'a': - perm |= ZooDefs.Perms.ADMIN; - break; - default: - System.err.println("Unknown perm type:" + permString.charAt(i)); - } - } - return perm; - } - - private static List<ACL> parseACLs(String aclString) { - List<ACL> acl; - String acls[] = aclString.split(","); - acl = new ArrayList<>(); - for (String a : acls) { - int firstColon = a.indexOf(':'); - int lastColon = a.lastIndexOf(':'); - if (firstColon == -1 || lastColon == -1 || firstColon == lastColon) { - System.err.println(a + " does not have the form scheme:id:perm"); - continue; - } - ACL newAcl = new ACL(); - newAcl.setId(new Id(a.substring(0, firstColon), a.substring(firstColon + 1, lastColon))); - newAcl.setPerms(getPermFromString(a.substring(lastColon + 1))); - acl.add(newAcl); - } - return acl; - } - - public Stat createOrSetByteWithParents(String path, byte[] data, List<ACL> acl, CreateMode createMode) throws InterruptedException, KeeperException { - if (exists(path) != null) { - return setByteData(path, data); - } - try { - createBytesNodeWithParents(path, data, acl, createMode); - return null; - } catch (KeeperException.NodeExistsException e) { - // this should not happen very often (race condition) - return setByteData(path, data); - } - } - - public String create(String path, CreateMode createMode) throws InterruptedException, KeeperException { - return create(path, (byte[]) null, createMode); - } - - public String create(String path, String data, CreateMode createMode) throws InterruptedException, KeeperException { - return create(path, toByteData(data), createMode); - } - - public String create(String path, byte[] data, CreateMode createMode) throws InterruptedException, KeeperException { - return getZk().create(adjustPath(path), data, getNodeACLs(path), createMode); - } - - public String createWithParents(String path, CreateMode createMode) throws InterruptedException, KeeperException { - return createWithParents(path, (byte[]) null, createMode); - } - - public String createWithParents(String path, String data, CreateMode createMode) throws InterruptedException, KeeperException { - return createWithParents(path, toByteData(data), createMode); - } - - public String createWithParents(String path, byte[] data, CreateMode createMode) throws InterruptedException, KeeperException { - createParents(path); - return create(path, data, createMode); - } - - public Stat createOrSetWithParents(String path, String data, CreateMode createMode) throws InterruptedException, KeeperException { - return createOrSetWithParents(path, toByteData(data), createMode); - } - - public Stat createOrSetWithParents(String path, byte[] data, CreateMode createMode) throws InterruptedException, KeeperException { - if (exists(path) != null) { - return setByteData(path, data); - } - try { - createWithParents(path, data, createMode); - return null; - } catch (KeeperException.NodeExistsException e) { - // this should not happen very often (race condition) - return setByteData(path, data); - } - } - - public void fixACLs(String path, boolean recursive) throws InterruptedException, KeeperException { - if (exists(path) != null) { - doFixACLs(path, recursive); - } - } - - private void doFixACLs(String path, boolean recursive) throws KeeperException, InterruptedException { - setACL(path, getNodeACLs(path), -1); - if (recursive) { - for (String child : getChildren(path)) { - doFixACLs(path.equals("/") ? "/" + child : path + "/" + child, recursive); - } - } - } - - private List<ACL> getNodeACLs(String path) { - String acl = doGetNodeACLs(adjustPath(path)); - if (acl == null) { - throw new IllegalStateException("Could not find matching ACLs for " + path); - } - return parseACLs(acl); - } - - protected String doGetNodeACLs(String path) { - String longestPath = ""; - for (String acl : acls.keySet()) { - if (acl.length() > longestPath.length() && path.startsWith(acl)) { - longestPath = acl; - } - } - return acls.get(longestPath); - } - - private void createParents(String path) throws InterruptedException, KeeperException { - path = PathUtils.getParentPath(adjustPath(path)); - path = PathUtils.removeTrailingSlash(path); - List<String> paths = new ArrayList<>(); - while (!path.equals("") && getZk().exists(path, false) == null) { - paths.add(path); - path = PathUtils.getParentPath(path); - path = PathUtils.removeTrailingSlash(path); - } - Collections.reverse(paths); - for (String p : paths) { - try { - getZk().create(p, - null, - getNodeACLs(p), - CreateMode.PERSISTENT); - } catch (KeeperException.NodeExistsException e) { - // ok we continue... - if (LOG.isDebugEnabled()) { - LOG.debug("parent already exists " + p); - } - } - } - } - - private byte[] toByteData(String data) { - if (data == null) { - return null; - } else { - try { - return data.getBytes(CHARSET); - } catch (UnsupportedEncodingException e) { - throw new RuntimeException(e); - } - } - } - -} diff --git a/activemq-partition/src/main/java/org/apache/activemq/partition/ZooKeeperPartitionBroker.java b/activemq-partition/src/main/java/org/apache/activemq/partition/ZooKeeperPartitionBroker.java deleted file mode 100644 index 6d2474b495..0000000000 --- a/activemq-partition/src/main/java/org/apache/activemq/partition/ZooKeeperPartitionBroker.java +++ /dev/null @@ -1,124 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.partition; - -import org.apache.activemq.broker.Broker; -import org.apache.activemq.partition.dto.Partitioning; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.data.Stat; -import org.linkedin.util.clock.Timespan; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -/** - */ -public class ZooKeeperPartitionBroker extends PartitionBroker { - - protected static final Logger LOG = LoggerFactory.getLogger(ZooKeeperPartitionBroker.class); - - protected volatile ZKClient zk_client = null; - protected volatile Partitioning config; - protected final CountDownLatch configAcquired = new CountDownLatch(1); - - public ZooKeeperPartitionBroker(Broker broker, ZooKeeperPartitionBrokerPlugin plugin) { - super(broker, plugin); - } - - @Override - public void start() throws Exception { - super.start(); - // Lets block a bit until we get our config.. Otherwise just keep - // on going.. not a big deal if we get our config later. Perhaps - // ZK service is not having a good day. - configAcquired.await(5, TimeUnit.SECONDS); - } - - @Override - protected void onMonitorStop() { - zkDisconnect(); - } - - @Override - protected Partitioning getConfig() { - return config; - } - - protected ZooKeeperPartitionBrokerPlugin plugin() { - return (ZooKeeperPartitionBrokerPlugin)plugin; - } - - protected void zkConnect() throws Exception { - zk_client = new ZKClient(plugin().getZkAddress(), Timespan.parse(plugin().getZkSessionTmeout()), null); - if( plugin().getZkPassword()!=null ) { - zk_client.setPassword(plugin().getZkPassword()); - } - zk_client.start(); - zk_client.waitForConnected(Timespan.parse("30s")); - } - - protected void zkDisconnect() { - if( zk_client!=null ) { - zk_client.close(); - zk_client = null; - } - } - - protected void reloadConfiguration() throws Exception { - if( zk_client==null ) { - LOG.debug("Connecting to ZooKeeper"); - try { - zkConnect(); - LOG.debug("Connected to ZooKeeper"); - } catch (Exception e) { - LOG.debug("Connection to ZooKeeper failed: "+e); - zkDisconnect(); - throw e; - } - } - - byte[] data = null; - try { - Stat stat = new Stat(); - data = zk_client.getData(plugin().getZkPath(), new Watcher() { - @Override - public void process(WatchedEvent watchedEvent) { - try { - reloadConfiguration(); - } catch (Exception e) { - } - monitorWakeup(); - } - }, stat); - configAcquired.countDown(); - reloadConfigOnPoll = false; - } catch (Exception e) { - LOG.warn("Could load partitioning configuration: " + e, e); - reloadConfigOnPoll = true; - } - - try { - config = Partitioning.MAPPER.readValue(data, Partitioning.class); - } catch (Exception e) { - LOG.warn("Invalid partitioning configuration: " + e, e); - } - } - -} diff --git a/activemq-partition/src/main/java/org/apache/activemq/partition/ZooKeeperPartitionBrokerPlugin.java b/activemq-partition/src/main/java/org/apache/activemq/partition/ZooKeeperPartitionBrokerPlugin.java deleted file mode 100644 index 34fa0fc1d1..0000000000 --- a/activemq-partition/src/main/java/org/apache/activemq/partition/ZooKeeperPartitionBrokerPlugin.java +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.partition; - -import org.apache.activemq.broker.Broker; -import org.apache.activemq.broker.BrokerPlugin; - -/** - * A PartitionBrokerPlugin which gets it's configuration from ZooKeeper. - */ -public class ZooKeeperPartitionBrokerPlugin extends PartitionBrokerPlugin { - - String zkAddress = "127.0.0.1:2181"; - String zkPassword; - String zkPath = "/broker-assignments"; - String zkSessionTmeout = "10s"; - - @Override - public Broker installPlugin(Broker broker) throws Exception { - return new ZooKeeperPartitionBroker(broker, this); - } - - public String getZkAddress() { - return zkAddress; - } - - public void setZkAddress(String zkAddress) { - this.zkAddress = zkAddress; - } - - public String getZkPassword() { - return zkPassword; - } - - public void setZkPassword(String zkPassword) { - this.zkPassword = zkPassword; - } - - public String getZkPath() { - return zkPath; - } - - public void setZkPath(String zkPath) { - this.zkPath = zkPath; - } - - public String getZkSessionTmeout() { - return zkSessionTmeout; - } - - public void setZkSessionTmeout(String zkSessionTmeout) { - this.zkSessionTmeout = zkSessionTmeout; - } -} diff --git a/activemq-partition/src/main/java/org/apache/activemq/partition/dto/Partitioning.java b/activemq-partition/src/main/java/org/apache/activemq/partition/dto/Partitioning.java deleted file mode 100644 index 43f79242fe..0000000000 --- a/activemq-partition/src/main/java/org/apache/activemq/partition/dto/Partitioning.java +++ /dev/null @@ -1,161 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.partition.dto; - - - -import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.DeserializationConfig; -import com.fasterxml.jackson.databind.SerializationFeature; -import com.fasterxml.jackson.databind.annotation.JsonDeserialize; -import com.fasterxml.jackson.databind.annotation.JsonSerialize; -import com.fasterxml.jackson.annotation.JsonProperty; - -import java.io.IOException; -import java.util.HashMap; - -/** - * The main Configuration class for the PartitionBroker plugin - */ -public class Partitioning { - - static final public ObjectMapper MAPPER = new ObjectMapper(); - static { - MAPPER.setSerializationInclusion(JsonInclude.Include.NON_NULL); - MAPPER.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); - } - - static final public ObjectMapper TO_STRING_MAPPER = new ObjectMapper(); - static { - TO_STRING_MAPPER.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); - TO_STRING_MAPPER.enable(SerializationFeature.INDENT_OUTPUT); - } - - /** - * If a client connects with a clientId which is listed in the - * map, then he will be immediately reconnected - * to the partition target immediately. - */ - @JsonProperty("by_client_id") - @JsonDeserialize(contentAs = Target.class) - public HashMap<String, Target> byClientId; - - /** - * If a client connects with a user priciple which is listed in the - * map, then he will be immediately reconnected - * to the partition target immediately. - */ - @JsonProperty("by_user_name") - @JsonDeserialize(contentAs = Target.class) - public HashMap<String, Target> byUserName; - - /** - * If a client connects with source ip which is listed in the - * map, then he will be immediately reconnected - * to the partition target immediately. - */ - @JsonProperty("by_source_ip") - @JsonDeserialize(contentAs = Target.class) - public HashMap<String, Target> bySourceIp; - - /** - * Used to map the preferred partitioning of queues across - * a set of brokers. Once a it is deemed that a connection mostly - * works with a set of targets configured in this map, the client - * will be reconnected to the appropriate target. - */ - @JsonProperty("by_queue") - @JsonDeserialize(contentAs = Target.class) - public HashMap<String, Target> byQueue; - - /** - * Used to map the preferred partitioning of topics across - * a set of brokers. Once a it is deemed that a connection mostly - * works with a set of targets configured in this map, the client - * will be reconnected to the appropriate target. - */ - @JsonProperty("by_topic") - @JsonDeserialize(contentAs = Target.class) - public HashMap<String, Target> byTopic; - - /** - * Maps broker names to broker URLs. - */ - @JsonProperty("brokers") - @JsonDeserialize(contentAs = String.class) - public HashMap<String, String> brokers; - - - @Override - public String toString() { - try { - return TO_STRING_MAPPER.writeValueAsString(this); - } catch (IOException e) { - return super.toString(); - } - } - - public HashMap<String, String> getBrokers() { - return brokers; - } - - public void setBrokers(HashMap<String, String> brokers) { - this.brokers = brokers; - } - - public HashMap<String, Target> getByClientId() { - return byClientId; - } - - public void setByClientId(HashMap<String, Target> byClientId) { - this.byClientId = byClientId; - } - - public HashMap<String, Target> getByQueue() { - return byQueue; - } - - public void setByQueue(HashMap<String, Target> byQueue) { - this.byQueue = byQueue; - } - - public HashMap<String, Target> getBySourceIp() { - return bySourceIp; - } - - public void setBySourceIp(HashMap<String, Target> bySourceIp) { - this.bySourceIp = bySourceIp; - } - - public HashMap<String, Target> getByTopic() { - return byTopic; - } - - public void setByTopic(HashMap<String, Target> byTopic) { - this.byTopic = byTopic; - } - - public HashMap<String, Target> getByUserName() { - return byUserName; - } - - public void setByUserName(HashMap<String, Target> byUserName) { - this.byUserName = byUserName; - } -} diff --git a/activemq-partition/src/main/java/org/apache/activemq/partition/dto/Target.java b/activemq-partition/src/main/java/org/apache/activemq/partition/dto/Target.java deleted file mode 100644 index 79b53efa3a..0000000000 --- a/activemq-partition/src/main/java/org/apache/activemq/partition/dto/Target.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.partition.dto; - -import com.fasterxml.jackson.annotation.JsonProperty; - -import java.io.IOException; -import java.util.Collection; -import java.util.HashSet; - -/** - * Represents a partition target. This identifies the brokers that - * a partition lives on. - */ -public class Target { - - @JsonProperty("ids") - public HashSet<String> ids = new HashSet<String>(); - - public Target() { - ids = new HashSet<String>(); - } - - public Target(String ...ids) { - this.ids.addAll(java.util.Arrays.asList(ids)); - } - - @Override - public String toString() { - try { - return Partitioning.TO_STRING_MAPPER.writeValueAsString(this); - } catch (IOException e) { - return super.toString(); - } - } - - public HashSet<String> getIds() { - return ids; - } - - public void setIds(Collection<String> ids) { - this.ids = new HashSet<String>(ids); - } - -} diff --git a/activemq-partition/src/test/java/org/apache/activemq/partition/PartitionBrokerTest.java b/activemq-partition/src/test/java/org/apache/activemq/partition/PartitionBrokerTest.java deleted file mode 100644 index 1b49f0b008..0000000000 --- a/activemq-partition/src/test/java/org/apache/activemq/partition/PartitionBrokerTest.java +++ /dev/null @@ -1,251 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.partition; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerPlugin; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.TransportConnector; -import org.apache.activemq.partition.dto.Partitioning; -import org.apache.activemq.partition.dto.Target; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import javax.jms.*; -import java.io.IOException; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.concurrent.TimeUnit; - -import static org.junit.Assert.*; - -/** - * Unit tests for the PartitionBroker plugin. - */ -public class PartitionBrokerTest { - - protected HashMap<String, BrokerService> brokers = new HashMap<String, BrokerService>(); - protected ArrayList<Connection> connections = new ArrayList<Connection>(); - Partitioning partitioning; - - @Before - public void setUp() throws Exception { - partitioning = new Partitioning(); - partitioning.brokers = new HashMap<String, String>(); - } - - /** - * Partitioning can only re-direct failover clients since those - * can re-connect and re-establish their state with another broker. - */ - @Test(timeout = 1000*60*60) - public void testNonFailoverClientHasNoPartitionEffect() throws Exception { - - partitioning.byClientId = new HashMap<String, Target>(); - partitioning.byClientId.put("client1", new Target("broker1")); - createBrokerCluster(2); - - Connection connection = createConnectionToUrl(getConnectURL("broker2")); - within(5, TimeUnit.SECONDS, new Task() { - public void run() throws Exception { - assertEquals(0, getTransportConnector("broker1").getConnections().size()); - assertEquals(1, getTransportConnector("broker2").getConnections().size()); - } - }); - - connection.setClientID("client1"); - connection.start(); - - Thread.sleep(1000); - assertEquals(0, getTransportConnector("broker1").getConnections().size()); - assertEquals(1, getTransportConnector("broker2").getConnections().size()); - } - - @Test(timeout = 1000*60*60) - public void testPartitionByClientId() throws Exception { - partitioning.byClientId = new HashMap<String, Target>(); - partitioning.byClientId.put("client1", new Target("broker1")); - partitioning.byClientId.put("client2", new Target("broker2")); - createBrokerCluster(2); - - Connection connection = createConnectionTo("broker2"); - - within(5, TimeUnit.SECONDS, new Task() { - public void run() throws Exception { - assertEquals(0, getTransportConnector("broker1").getConnections().size()); - assertEquals(1, getTransportConnector("broker2").getConnections().size()); - } - }); - - connection.setClientID("client1"); - connection.start(); - within(5, TimeUnit.SECONDS, new Task() { - public void run() throws Exception { - assertEquals(1, getTransportConnector("broker1").getConnections().size()); - assertEquals(0, getTransportConnector("broker2").getConnections().size()); - } - }); - } - - @Test(timeout = 1000*60*60) - public void testPartitionByQueue() throws Exception { - partitioning.byQueue = new HashMap<String, Target>(); - partitioning.byQueue.put("foo", new Target("broker1")); - createBrokerCluster(2); - - Connection connection2 = createConnectionTo("broker2"); - - within(5, TimeUnit.SECONDS, new Task() { - public void run() throws Exception { - assertEquals(0, getTransportConnector("broker1").getConnections().size()); - assertEquals(1, getTransportConnector("broker2").getConnections().size()); - } - }); - - Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session2.createConsumer(session2.createQueue("foo")); - - within(5, TimeUnit.SECONDS, new Task() { - public void run() throws Exception { - assertEquals(1, getTransportConnector("broker1").getConnections().size()); - assertEquals(0, getTransportConnector("broker2").getConnections().size()); - } - }); - - Connection connection1 = createConnectionTo("broker2"); - Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session1.createProducer(session1.createQueue("foo")); - - within(5, TimeUnit.SECONDS, new Task() { - public void run() throws Exception { - assertEquals(1, getTransportConnector("broker1").getConnections().size()); - assertEquals(1, getTransportConnector("broker2").getConnections().size()); - } - }); - - for (int i = 0; i < 100; i++) { - producer.send(session1.createTextMessage("#" + i)); - } - - within(5, TimeUnit.SECONDS, new Task() { - public void run() throws Exception { - assertEquals(2, getTransportConnector("broker1").getConnections().size()); - assertEquals(0, getTransportConnector("broker2").getConnections().size()); - } - }); - } - - - static interface Task { - public void run() throws Exception; - } - - private void within(int time, TimeUnit unit, Task task) throws InterruptedException { - long timeMS = unit.toMillis(time); - long deadline = System.currentTimeMillis() + timeMS; - while (true) { - try { - task.run(); - return; - } catch (Throwable e) { - long remaining = deadline - System.currentTimeMillis(); - if( remaining <=0 ) { - if( e instanceof RuntimeException ) { - throw (RuntimeException)e; - } - if( e instanceof Error ) { - throw (Error)e; - } - throw new RuntimeException(e); - } - Thread.sleep(Math.min(timeMS/10, remaining)); - } - } - } - - protected Connection createConnectionTo(String brokerId) throws IOException, URISyntaxException, JMSException { - return createConnectionToUrl("failover://(" + getConnectURL(brokerId) + ")?randomize=false"); - } - - private Connection createConnectionToUrl(String url) throws JMSException { - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url); - Connection connection = factory.createConnection(); - connections.add(connection); - return connection; - } - - protected String getConnectURL(String broker) throws IOException, URISyntaxException { - TransportConnector tcp = getTransportConnector(broker); - return tcp.getConnectUri().toString(); - } - - private TransportConnector getTransportConnector(String broker) { - BrokerService brokerService = brokers.get(broker); - if( brokerService==null ) { - throw new IllegalArgumentException("Invalid broker id"); - } - return brokerService.getTransportConnectorByName("tcp"); - } - - protected void createBrokerCluster(int brokerCount) throws Exception { - for (int i = 1; i <= brokerCount; i++) { - String brokerId = "broker" + i; - BrokerService broker = createBroker(brokerId); - broker.setPersistent(false); - broker.addConnector("tcp://localhost:0").setName("tcp"); - addPartitionBrokerPlugin(broker); - broker.start(); - broker.waitUntilStarted(); - partitioning.brokers.put(brokerId, getConnectURL(brokerId)); - } - } - - protected void addPartitionBrokerPlugin(BrokerService broker) { - PartitionBrokerPlugin plugin = new PartitionBrokerPlugin(); - plugin.setConfig(partitioning); - broker.setPlugins(new BrokerPlugin[]{plugin}); - } - - protected BrokerService createBroker(String name) { - BrokerService broker = new BrokerService(); - broker.setBrokerName(name); - brokers.put(name, broker); - return broker; - } - - @After - public void tearDown() throws Exception { - for (Connection connection : connections) { - try { - connection.close(); - } catch (Throwable e) { - } - } - connections.clear(); - for (BrokerService broker : brokers.values()) { - try { - broker.stop(); - broker.waitUntilStopped(); - } catch (Throwable e) { - } - } - brokers.clear(); - } - -} diff --git a/activemq-partition/src/test/java/org/apache/activemq/partition/ZooKeeperPartitionBrokerTest.java b/activemq-partition/src/test/java/org/apache/activemq/partition/ZooKeeperPartitionBrokerTest.java deleted file mode 100644 index 0a6416b2d7..0000000000 --- a/activemq-partition/src/test/java/org/apache/activemq/partition/ZooKeeperPartitionBrokerTest.java +++ /dev/null @@ -1,97 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.partition; - -import org.apache.activemq.broker.BrokerPlugin; -import org.apache.activemq.broker.BrokerService; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.server.NIOServerCnxnFactory; -import org.apache.zookeeper.server.ZooKeeperServer; -import org.apache.zookeeper.server.persistence.FileTxnSnapLog; -import org.junit.After; -import org.junit.Before; -import org.linkedin.util.clock.Timespan; - -import java.io.File; -import java.net.InetSocketAddress; - -/** - */ -public class ZooKeeperPartitionBrokerTest extends PartitionBrokerTest { - - NIOServerCnxnFactory connector; - - @Before - public void setUp() throws Exception { - System.out.println("Starting ZooKeeper"); - ZooKeeperServer zk_server = new ZooKeeperServer(); - zk_server.setTickTime(500); - zk_server.setTxnLogFactory(new FileTxnSnapLog(new File("target/test-data/zk-log"), new File("target/test-data/zk-data"))); - connector = new NIOServerCnxnFactory(); - connector.configure(new InetSocketAddress(0), 100); - connector.startup(zk_server); - System.out.println("ZooKeeper started"); - super.setUp(); - } - - @After - public void tearDown() throws Exception { - super.tearDown(); - if( connector!=null ) { - connector.shutdown(); - connector = null; - } - } - - String zkPath = "/partition-config"; - - @Override - protected void createBrokerCluster(int brokerCount) throws Exception { - // Store the partitioning in ZK. - ZKClient zk_client = new ZKClient("localhost:" + connector.getLocalPort(), Timespan.parse("10s"), null); - try { - zk_client.start(); - zk_client.waitForConnected(Timespan.parse("30s")); - try { - zk_client.delete(zkPath); - } catch (Throwable e) { - } - zk_client.create(zkPath, partitioning.toString(), CreateMode.PERSISTENT); - } finally { - zk_client.close(); - } - super.createBrokerCluster(brokerCount); - } - - @Override - protected void addPartitionBrokerPlugin(BrokerService broker) { - // Have the borker plugin get the partition config via ZK. - ZooKeeperPartitionBrokerPlugin plugin = new ZooKeeperPartitionBrokerPlugin(){ - @Override - public String getBrokerURL(PartitionBroker partitionBroker, String id) { - try { - return getConnectURL(id); - } catch (Exception e) { - return null; - } - } - }; - plugin.setZkAddress("localhost:" + connector.getLocalPort()); - plugin.setZkPath(zkPath); - broker.setPlugins(new BrokerPlugin[]{plugin}); - } -} diff --git a/activemq-spring/pom.xml b/activemq-spring/pom.xml index 005f46a10d..9618371824 100644 --- a/activemq-spring/pom.xml +++ b/activemq-spring/pom.xml @@ -77,21 +77,6 @@ <version>${hawtdispatch-version}</version> <scope>provided</scope> </dependency> - <dependency> - <groupId>org.linkedin</groupId> - <artifactId>org.linkedin.zookeeper-impl</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.linkedin</groupId> - <artifactId>org.linkedin.util-core</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.zookeeper</groupId> - <artifactId>zookeeper</artifactId> - <scope>provided</scope> - </dependency> <dependency> <groupId>org.osgi</groupId> <artifactId>osgi.core</artifactId> @@ -202,7 +187,6 @@ <include>${basedir}/../activemq-kahadb-store/src/main/java</include> <include>${basedir}/../activemq-mqtt/src/main/java</include> <include>${basedir}/../activemq-stomp/src/main/java</include> - <include>${basedir}/../activemq-partition/src/main/java</include> <include>${basedir}/../activemq-runtime-config/src/main/java</include> </includes> <strictXsdOrder>false</strictXsdOrder> diff --git a/activemq-unit-tests/pom.xml b/activemq-unit-tests/pom.xml index bbe633cab0..6184efbb3e 100644 --- a/activemq-unit-tests/pom.xml +++ b/activemq-unit-tests/pom.xml @@ -58,10 +58,6 @@ <groupId>org.apache.activemq</groupId> <artifactId>activemq-stomp</artifactId> </dependency> - <dependency> - <groupId>org.apache.activemq</groupId> - <artifactId>activemq-partition</artifactId> - </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-runtime-config</artifactId> diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/partition/SpringPartitionBrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/partition/SpringPartitionBrokerTest.java deleted file mode 100644 index dcf4e69e5f..0000000000 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/partition/SpringPartitionBrokerTest.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.broker.partition; - -import junit.framework.TestCase; -import org.apache.activemq.broker.BrokerFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.partition.PartitionBrokerPlugin; -import org.apache.activemq.partition.dto.Partitioning; - -/** - */ -public class SpringPartitionBrokerTest extends TestCase { - - public void testCreatePartitionBroker() throws Exception { - - BrokerService broker = BrokerFactory.createBroker("xbean:activemq-partition.xml"); - assertEquals(1, broker.getPlugins().length); - PartitionBrokerPlugin plugin = (PartitionBrokerPlugin)broker.getPlugins()[0]; - Partitioning config = plugin.getConfig(); - assertEquals(2, config.getBrokers().size()); - - Object o; - String json = "{\n" + - " \"by_client_id\":{\n" + - " \"client1\":{\"ids\":[\"broker1\"]},\n" + - " \"client2\":{\"ids\":[\"broker1\",\"broker2\"]}\n" + - " },\n" + - " \"brokers\":{\n" + - " \"broker1\":\"tcp://localhost:61616\",\n" + - " \"broker2\":\"tcp://localhost:61616\"\n" + - " }\n" + - "}"; - Partitioning expected = Partitioning.MAPPER.readValue(json, Partitioning.class); - assertEquals(expected.toString(), config.toString()); - - } - -} diff --git a/activemq-unit-tests/src/test/resources/activemq-partition.xml b/activemq-unit-tests/src/test/resources/activemq-partition.xml deleted file mode 100644 index 4bb96f22a9..0000000000 --- a/activemq-unit-tests/src/test/resources/activemq-partition.xml +++ /dev/null @@ -1,58 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<!-- START SNIPPET: xbean --> -<beans - xmlns="http://www.springframework.org/schema/beans" - xmlns:amq="http://activemq.apache.org/schema/core" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd - http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> - - <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/> - - <bean id="config" class="java.lang.String"> - <constructor-arg><value> - <![CDATA[ - { - "by_client_id":{ - "client1":{"ids":["broker1"]}, - "client2":{"ids":["broker1","broker2"]} - }, - "brokers":{ - "broker1":"tcp://localhost:61616", - "broker2":"tcp://localhost:61616" - } - } - ]]> - </value></constructor-arg> - </bean> - - <broker useJmx="false" xmlns="http://activemq.apache.org/schema/core" persistent="false"> - - <plugins> - <partitionBrokerPlugin minTransferCount="5" configAsJson="#config"/> - </plugins> - - <transportConnectors> - <transportConnector uri="tcp://localhost:61616"/> - </transportConnectors> - - </broker> - -</beans> -<!-- END SNIPPET: xbean --> diff --git a/assembly/pom.xml b/assembly/pom.xml index a5d2b0cdb8..d1d30d44f9 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -54,10 +54,6 @@ <artifactId>activemq-unit-tests</artifactId> <type>test-jar</type> </dependency> - <dependency> - <groupId>${project.groupId}</groupId> - <artifactId>activemq-partition</artifactId> - </dependency> <dependency> <groupId>org.apache.activemq.tooling</groupId> <artifactId>activemq-junit</artifactId> @@ -68,19 +64,6 @@ <artifactId>hawtdispatch-transport</artifactId> <version>${hawtdispatch-version}</version> </dependency> - <dependency> - <groupId>org.linkedin</groupId> - <artifactId>org.linkedin.zookeeper-impl</artifactId> - </dependency> - <dependency> - <groupId>org.linkedin</groupId> - <artifactId>org.linkedin.util-core</artifactId> - <version>${linkedin-zookeeper-version}</version> - </dependency> - <dependency> - <groupId>org.apache.zookeeper</groupId> - <artifactId>zookeeper</artifactId> - </dependency> <dependency> <groupId>org.osgi</groupId> diff --git a/assembly/src/main/descriptors/common-bin.xml b/assembly/src/main/descriptors/common-bin.xml index f77fa1c7d2..ff9c509459 100644 --- a/assembly/src/main/descriptors/common-bin.xml +++ b/assembly/src/main/descriptors/common-bin.xml @@ -182,7 +182,6 @@ <include>${pom.groupId}:activemq-log4j-appender</include> <include>${pom.groupId}:activemq-jms-pool</include> <include>${pom.groupId}:activemq-pool</include> - <include>${pom.groupId}:activemq-partition</include> <include>${pom.groupId}:activemq-shiro</include> <include>commons-beanutils:commons-beanutils</include> <include>commons-collections:commons-collections</include> diff --git a/pom.xml b/pom.xml index 325985d291..4d02cf94c2 100644 --- a/pom.xml +++ b/pom.xml @@ -90,8 +90,6 @@ <mqtt-client-version>1.16</mqtt-client-version> <org-apache-derby-version>10.15.2.0</org-apache-derby-version> <osgi-version>6.0.0</osgi-version> - <linkedin-zookeeper-version>1.4.0</linkedin-zookeeper-version> - <zookeeper-version>3.4.14</zookeeper-version> <qpid-proton-version>0.33.10</qpid-proton-version> <qpid-jms-version>1.6.0</qpid-jms-version> <netty-version>4.1.75.Final</netty-version> @@ -229,7 +227,6 @@ <module>activemq-runtime-config</module> <module>activemq-tooling</module> <module>activemq-web</module> - <module>activemq-partition</module> <module>activemq-osgi</module> <module>activemq-blueprint</module> <module>activemq-web-demo</module> @@ -315,11 +312,6 @@ <artifactId>activemq-all</artifactId> <version>${project.version}</version> </dependency> - <dependency> - <groupId>org.apache.activemq</groupId> - <artifactId>activemq-partition</artifactId> - <version>${project.version}</version> - </dependency> <dependency> <groupId>org.apache.activemq.tooling</groupId> <artifactId>activemq-junit</artifactId> @@ -585,65 +577,6 @@ <version>${pax-logging-version}</version> </dependency> - <dependency> - <groupId>org.apache.hadoop.zookeeper</groupId> - <artifactId>zookeeper</artifactId> - <version>${zookeeper-version}</version> - <exclusions> - <exclusion> - <groupId>io.netty</groupId> - <artifactId>netty</artifactId> - </exclusion> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - </exclusion> - <exclusion> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.zookeeper</groupId> - <artifactId>zookeeper</artifactId> - <version>${zookeeper-version}</version> - <exclusions> - <exclusion> - <groupId>io.netty</groupId> - <artifactId>netty</artifactId> - </exclusion> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - </exclusion> - <exclusion> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.linkedin</groupId> - <artifactId>org.linkedin.zookeeper-impl</artifactId> - <version>${linkedin-zookeeper-version}</version> - <exclusions> - <exclusion> - <groupId>org.json</groupId> - <artifactId>json</artifactId> - </exclusion> - <exclusion> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.linkedin</groupId> - <artifactId>org.linkedin.util-core</artifactId> - <version>${linkedin-zookeeper-version}</version> - </dependency> - <!-- zeroconf transport --> <dependency> <groupId>org.jmdns</groupId>