Repository: incubator-edgent Updated Branches: refs/heads/develop 6050ec42a -> d8d6c40fc
[EDGENT-447][Connectors] contribute a RabbitMQ connector Project: http://git-wip-us.apache.org/repos/asf/incubator-edgent/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-edgent/commit/616f47e0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-edgent/tree/616f47e0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-edgent/diff/616f47e0 Branch: refs/heads/develop Commit: 616f47e0304165090786ca167c57c00b8355b3d2 Parents: 188dac1 Author: vinoyang <vinoy...@tencent.com> Authored: Sun Mar 4 13:39:25 2018 +0800 Committer: vinoyang <vinoy...@tencent.com> Committed: Sun Mar 4 13:39:25 2018 +0800 ---------------------------------------------------------------------- connectors/pom.xml | 1 + connectors/rabbitmq/pom.xml | 70 +++++++ .../rabbitmq/RabbitmqConfigKeyConstants.java | 92 +++++++++ .../connectors/rabbitmq/RabbitmqConsumer.java | 87 +++++++++ .../connectors/rabbitmq/RabbitmqProducer.java | 99 ++++++++++ .../connectors/rabbitmq/package-info.java | 28 +++ .../rabbitmq/runtime/RabbitmqConnector.java | 188 +++++++++++++++++++ .../rabbitmq/runtime/RabbitmqPublisher.java | 74 ++++++++ .../rabbitmq/runtime/RabbitmqSubscriber.java | 111 +++++++++++ .../RabbitmqStreamsGlobalTestManual.java | 38 ++++ .../rabbitmq/RabbitmqStreamsTestManual.java | 118 ++++++++++++ 11 files changed, 906 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/616f47e0/connectors/pom.xml ---------------------------------------------------------------------- diff --git a/connectors/pom.xml b/connectors/pom.xml index 9190cfa..edc2e5b 100644 --- a/connectors/pom.xml +++ b/connectors/pom.xml @@ -49,6 +49,7 @@ <module>websocket-jetty</module> <module>websocket-misc</module> <module>websocket-server</module> + <module>rabbitmq</module> </modules> </project> http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/616f47e0/connectors/rabbitmq/pom.xml 
---------------------------------------------------------------------- diff --git a/connectors/rabbitmq/pom.xml b/connectors/rabbitmq/pom.xml new file mode 100644 index 0000000..9c67fad --- /dev/null +++ b/connectors/rabbitmq/pom.xml @@ -0,0 +1,70 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>edgent-connectors</artifactId> + <groupId>org.apache.edgent</groupId> + <version>1.3.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>edgent-connectors-rabbitmq</artifactId> + + <name>Apache Edgent (Java 8): Connectors: RabbitMQ</name> + + <properties> + <remote-resources-maven-plugin.remote-resources.dir>../../src/main/ibm-remote-resources</remote-resources-maven-plugin.remote-resources.dir> + <edgent.version></edgent.version> + <rabbitmq.version>5.1.2</rabbitmq.version> + </properties> + + <dependencies> + <!-- edgent start --> + <dependency> + <groupId>org.apache.edgent</groupId> + <artifactId>edgent-api-topology</artifactId> + <version>1.3.0-SNAPSHOT</version> + </dependency> + <!-- edgent end --> + + <!-- edgent test start --> + <dependency> + <groupId>org.apache.edgent</groupId> + <artifactId>edgent-providers-direct</artifactId> + <version>1.3.0-SNAPSHOT</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.edgent</groupId> + <artifactId>edgent-providers-direct</artifactId> + <version>1.3.0-SNAPSHOT</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.edgent</groupId> + <artifactId>edgent-connectors-common</artifactId> + <version>1.3.0-SNAPSHOT</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.edgent</groupId> + <artifactId>edgent-api-topology</artifactId> + 
<version>1.3.0-SNAPSHOT</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <!-- edgent test end --> + + <!-- rabbitmq start --> + <dependency> + <groupId>com.rabbitmq</groupId> + <artifactId>amqp-client</artifactId> + <version>${rabbitmq.version}</version> + </dependency> + <!-- rabbitmq end --> + </dependencies> + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/616f47e0/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/RabbitmqConfigKeyConstants.java ---------------------------------------------------------------------- diff --git a/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/RabbitmqConfigKeyConstants.java b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/RabbitmqConfigKeyConstants.java new file mode 100644 index 0000000..f35a8fe --- /dev/null +++ b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/RabbitmqConfigKeyConstants.java @@ -0,0 +1,92 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package org.apache.edgent.connectors.rabbitmq; + +/** + * Defines all RabbitMQ config key constant items. 
/**
 * Defines the configuration keys understood by the RabbitMQ connector.
 * <p>
 * The keys are looked up in the {@code Map<String, Object>} supplied to
 * {@code RabbitmqProducer} / {@code RabbitmqConsumer}. Values are converted
 * with {@code toString()} before being parsed, so either typed values
 * (e.g. {@code 5672}) or their string form (e.g. {@code "5672"}) may be used.
 * </p>
 * This class only holds constants and is not meant to be instantiated.
 */
public final class RabbitmqConfigKeyConstants {

    /**
     * Config key for the connection URI,
     * e.g. {@code amqp://userName:password@hostName:portNumber/virtualHost}.
     * When present, the host and port keys are not consulted.
     */
    public static final String RABBITMQ_CONFIG_KEY_URI = "rabbitmq.connection.uri";

    /**
     * Config key for the RabbitMQ server host.
     * Required when {@link #RABBITMQ_CONFIG_KEY_URI} is not configured.
     */
    public static final String RABBITMQ_CONFIG_KEY_HOST = "rabbitmq.connection.host";

    /**
     * Config key for the RabbitMQ server port (a RabbitMQ broker conventionally listens on 5672).
     * Required when {@link #RABBITMQ_CONFIG_KEY_URI} is not configured.
     */
    public static final String RABBITMQ_CONFIG_KEY_PORT = "rabbitmq.connection.port";

    /**
     * Config key for the virtual host, which is used to separate multiple tenants on one broker.
     */
    public static final String RABBITMQ_CONFIG_KEY_VIRTUAL_HOST = "rabbitmq.connection.virtualHost";

    /**
     * Config key for the user name used to authenticate against the broker.
     */
    public static final String RABBITMQ_CONFIG_KEY_AUTH_NAME = "rabbitmq.connection.authUsername";

    /**
     * Config key for the password used to authenticate against the broker.
     */
    public static final String RABBITMQ_CONFIG_KEY_AUTH_PASSWORD = "rabbitmq.connection.authPassword";

    /**
     * Config key specifying whether automatic connection recovery is enabled.
     */
    public static final String RABBITMQ_CONFIG_KEY_AUTO_RECOVERY = "rabbitmq.connection.autoRecovery";

    /**
     * Config key for the connection timeout.
     */
    public static final String RABBITMQ_CONFIG_KEY_TIMEOUT = "rabbitmq.connection.timeout";

    /**
     * Config key for the network recovery interval of the connection.
     */
    public static final String RABBITMQ_CONFIG_KEY_NETWORK_RECOVERY_INTERVAL = "rabbitmq.connection.networkRecoveryInterval";

    /**
     * Config key for the requested heartbeat of the connection.
     */
    public static final String RABBITMQ_CONFIG_KEY_REQUESTED_HEARTBEAT = "rabbitmq.connection.requestedHeartbeat";

    /**
     * Config key specifying whether topology recovery is enabled.
     */
    public static final String RABBITMQ_CONFIG_KEY_TOPOLOGY_RECOVERY_ENABLED = "rabbitmq.connection.topologyRecoveryEnabled";

    /**
     * Config key for the requested maximum number of channels of the connection.
     */
    public static final String RABBITMQ_CONFIG_KEY_REQUESTED_CHANNEL_MAX = "rabbitmq.connection.requestedChannelMax";

    /**
     * Config key for the requested maximum frame size of the connection.
     */
    public static final String RABBITMQ_CONFIG_KEY_REQUESTED_FRAME_MAX = "rabbitmq.connection.requestedFrameMax";

    /** Constants holder; never instantiated. */
    private RabbitmqConfigKeyConstants() {
    }

}
package org.apache.edgent.connectors.rabbitmq;

import org.apache.edgent.connectors.rabbitmq.runtime.RabbitmqConnector;
import org.apache.edgent.connectors.rabbitmq.runtime.RabbitmqSubscriber;
import org.apache.edgent.function.Function;
import org.apache.edgent.function.Supplier;
import org.apache.edgent.topology.TStream;
import org.apache.edgent.topology.Topology;

import java.util.Map;
import java.util.Objects;

/**
 * {@code RabbitmqConsumer} is a connector that consumes messages from a RabbitMQ messaging broker.
 * <p>
 * The connector uses and includes components from the RabbitMQ 3.7.3 release.
 * It has been successfully tested against 3.7.3.
 * For more information about RabbitMQ see <a href="http://www.rabbitmq.com/">http://www.rabbitmq.com/</a>
 * </p>
 * Sample use:
 * <pre>{@code
 * Map<String, Object> config = new HashMap<>();
 * config.put(RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_HOST, "127.0.0.1");
 * config.put(RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_PORT, 5672);
 * String queue = "testQueue";
 *
 * Topology t = ...
 *
 * RabbitmqConsumer consumer = new RabbitmqConsumer(t, () -> config);
 * TStream<String> receivedStream =
 *     consumer.subscribe((byte[] bytes) -> new String(bytes, StandardCharsets.UTF_8), queue);
 *
 * //...
 * }
 * </pre>
 */
public class RabbitmqConsumer {

    private final RabbitmqConnector connector;
    private final Topology topology;

    /**
     * Create a consumer connector for consuming tuples from a RabbitMQ queue.
     * <p>
     * The underlying {@link RabbitmqConnector} is created here, so the configuration
     * supplier is evaluated by this constructor.
     * </p>
     * <p>
     * See the RabbitMQ java client document :
     * <a href="http://www.rabbitmq.com/api-guide.html">http://www.rabbitmq.com/api-guide.html</a>.
     * For the full set of options see the RabbitMQ java client API Reference :
     * <a href="https://rabbitmq.github.io/rabbitmq-java-client/api/current/com/rabbitmq/client/ConnectionFactory.html">ConnectionFactory</a>
     * </p>
     * @param topology topology to add to
     * @param config supplier of the RabbitmqConsumer configuration, keyed by the constants in
     *               {@link RabbitmqConfigKeyConstants}
     * @throws NullPointerException if {@code topology} or {@code config} is {@code null}
     */
    public RabbitmqConsumer(Topology topology, Supplier<Map<String, Object>> config) {
        // Fail fast: a null topology would otherwise only surface later, inside subscribe().
        this.topology = Objects.requireNonNull(topology, "topology");
        this.connector = new RabbitmqConnector(Objects.requireNonNull(config, "config"));
    }

    /**
     * Subscribe to the specified queue and yield a stream of tuples built from the received
     * RabbitMQ message bodies.
     *
     * @param toTupleFn a function that yields a tuple from the {@code byte[]} body of a message
     * @param queue the RabbitMQ queue to consume from
     * @param <T> tuple type
     * @return stream of tuples
     */
    public <T> TStream<T> subscribe(Function<byte[], T> toTupleFn, String queue) {
        return topology.events(new RabbitmqSubscriber<>(connector, queue, toTupleFn));
    }

}
package org.apache.edgent.connectors.rabbitmq;

import org.apache.edgent.connectors.rabbitmq.runtime.RabbitmqConnector;
import org.apache.edgent.connectors.rabbitmq.runtime.RabbitmqPublisher;
import org.apache.edgent.function.Function;
import org.apache.edgent.function.Supplier;
import org.apache.edgent.topology.TSink;
import org.apache.edgent.topology.TStream;
import org.apache.edgent.topology.Topology;

import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;

/**
 * {@code RabbitmqProducer} is a connector that publishes messages to a RabbitMQ messaging broker.
 * <p>
 * The connector uses and includes components from the RabbitMQ 3.7.3 release.
 * It has been successfully tested against 3.7.3.
 * For more information about RabbitMQ see <a href="http://www.rabbitmq.com/">http://www.rabbitmq.com/</a>
 * </p>
 * Sample use:
 * <pre>{@code
 * Map<String, Object> config = new HashMap<>();
 * config.put(RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_HOST, "127.0.0.1");
 * config.put(RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_PORT, 5672);
 * String queue = "testQueue";
 *
 * Topology t = newTopology("testSimple");
 * RabbitmqProducer producer = new RabbitmqProducer(t, () -> config);
 *
 * //TStream<String> stream = ...
 *
 * TSink<String> sink =
 *     producer.publish(stream, queue, (String s) -> s.getBytes(StandardCharsets.UTF_8));
 * }
 * </pre>
 */
public class RabbitmqProducer {

    private final RabbitmqConnector connector;
    private final Topology topology;

    /**
     * Create a producer connector for publishing tuples to a RabbitMQ queue.
     * <p>
     * The underlying {@link RabbitmqConnector} is created here, so the configuration
     * supplier is evaluated by this constructor.
     * </p>
     * <p>
     * See the RabbitMQ java client document :
     * <a href="http://www.rabbitmq.com/api-guide.html">http://www.rabbitmq.com/api-guide.html</a>.
     * For the full set of options see the RabbitMQ java client API Reference :
     * <a href="https://rabbitmq.github.io/rabbitmq-java-client/api/current/com/rabbitmq/client/ConnectionFactory.html">ConnectionFactory</a>
     * </p>
     * @param topology topology to add to
     * @param config supplier of the RabbitmqProducer configuration, keyed by the constants in
     *               {@link RabbitmqConfigKeyConstants}
     */
    public RabbitmqProducer(Topology topology, Supplier<Map<String, Object>> config) {
        this.topology = topology;
        this.connector = new RabbitmqConnector(config);
    }

    /**
     * Publish the stream of tuples to the specified queue.
     * @param stream The stream to publish
     * @param queue The specified queue of RabbitMQ
     * @param msgFn A function that yields the byte[] records from the tuple
     * @param <T> Tuple type
     * @return {@link TSink}
     */
    public <T> TSink<T> publish(TStream<T> stream, String queue, Function<T, byte[]> msgFn) {
        return stream.sink(new RabbitmqPublisher<>(connector, queue, msgFn));
    }

    /**
     * Publish a fixed message to the specified queue once for every tuple on the stream.
     * <p>
     * The tuple's own content is ignored; each tuple only triggers publication of {@code msg}.
     * The message is encoded as UTF-8 so the published bytes do not depend on the
     * platform default charset of the JVM running the topology.
     * </p>
     * @param stream The stream whose tuples trigger publication
     * @param queue The specified queue of RabbitMQ
     * @param msg The string message to publish
     * @return {@link TSink}
     * @throws NullPointerException if {@code msg} is {@code null}
     */
    public TSink<String> publish(TStream<String> stream, String queue, String msg) {
        Objects.requireNonNull(msg, "msg");
        // Encode once up front instead of re-encoding the same constant for every tuple.
        final byte[] payload = msg.getBytes(StandardCharsets.UTF_8);
        return publish(stream, queue, (String ignored) -> payload);
    }

}
+ * For more information about RabbitMQ see + * <a href="http://www.rabbitmq.com/">http://www.rabbitmq.com/</a> + */ +package org.apache.edgent.connectors.rabbitmq; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/616f47e0/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/runtime/RabbitmqConnector.java ---------------------------------------------------------------------- diff --git a/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/runtime/RabbitmqConnector.java b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/runtime/RabbitmqConnector.java new file mode 100644 index 0000000..b83cfb6 --- /dev/null +++ b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/runtime/RabbitmqConnector.java @@ -0,0 +1,188 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ + +package org.apache.edgent.connectors.rabbitmq.runtime; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import org.apache.edgent.function.Supplier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Map; + +import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_AUTH_NAME; +import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_AUTH_PASSWORD; +import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_AUTO_RECOVERY; +import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_HOST; +import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_NETWORK_RECOVERY_INTERVAL; +import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_PORT; +import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_REQUESTED_CHANNEL_MAX; +import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_REQUESTED_FRAME_MAX; +import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_REQUESTED_HEARTBEAT; +import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_TIMEOUT; +import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_TOPOLOGY_RECOVERY_ENABLED; +import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_URI; +import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_VIRTUAL_HOST; + +/** + * A connector to an RabbitMQ server. 
+ */ +public class RabbitmqConnector implements AutoCloseable { + + private static final long serialVersionUID = 1L; + private static final Logger logger = LoggerFactory.getLogger(RabbitmqConnector.class); + + private final Supplier<Map<String, Object>> configFn; + private ConnectionFactory connectionFactory; + private Connection connection; + private Channel channel; + private String id; + + public RabbitmqConnector(Supplier<Map<String, Object>> configFn) { + this.configFn = configFn; + initConnection(); + } + + public synchronized Channel channel() { + if (channel == null) { + if (connection != null) { + try { + channel = connection.createChannel(); + } catch (IOException e) { + logger.error("IOExcetion occurs when create connection channel {}", e); + throw new RuntimeException(e); + } catch (Exception e) { + logger.error("Unknown Exception : {}", e); + } + } else { + logger.error("Inner statue inconformity ï¼ the rabbitmq connection is null."); + throw new RuntimeException("Inner statue inconformity ï¼ the rabbitmq connection is null."); + } + } + + return channel; + } + + @Override + public synchronized void close() throws Exception { + if (channel != null) { + channel.close(); + } + + if (connection != null) { + connection.close(); + } + + if (connectionFactory != null) { + connectionFactory = null; + } + } + + public String id() { + if (id == null) { + // include our short object Id + id = "RabbitMQ " + toString().substring(toString().indexOf('@') + 1); + } + return id; + } + + private void initConnection() { + try { + this.connectionFactory = getConnectionFactory(); + this.connection = connectionFactory.newConnection(); + } catch (Exception e) { + logger.error("{}", e); + throw new RuntimeException(e); + } + } + + private ConnectionFactory getConnectionFactory() throws Exception { + ConnectionFactory connectionFactory = new ConnectionFactory(); + Map<String, Object> configMap = configFn.get(); + + if (configMap.containsKey(RABBITMQ_CONFIG_KEY_URI)) { + 
connectionFactory.setUri(configMap.get(RABBITMQ_CONFIG_KEY_URI).toString()); + } else { + if (!configMap.containsKey(RABBITMQ_CONFIG_KEY_HOST)) { + throw new RuntimeException("Missed key : " + RABBITMQ_CONFIG_KEY_HOST); + } + + connectionFactory.setHost(configMap.get(RABBITMQ_CONFIG_KEY_HOST).toString()); + + if (!configMap.containsKey(RABBITMQ_CONFIG_KEY_PORT)) { + throw new RuntimeException("Missed key : " + RABBITMQ_CONFIG_KEY_PORT); + } + + connectionFactory.setPort(Integer.valueOf(configMap.get(RABBITMQ_CONFIG_KEY_PORT).toString())); + } + + if (configMap.containsKey(RABBITMQ_CONFIG_KEY_VIRTUAL_HOST)) { + connectionFactory.setVirtualHost(configMap.get(RABBITMQ_CONFIG_KEY_VIRTUAL_HOST).toString()); + } + + if (configMap.containsKey(RABBITMQ_CONFIG_KEY_AUTH_NAME)) { + connectionFactory.setUsername(configMap.get(RABBITMQ_CONFIG_KEY_AUTH_NAME).toString()); + } + + if (configMap.containsKey(RABBITMQ_CONFIG_KEY_AUTH_PASSWORD)) { + connectionFactory.setPassword(configMap.get(RABBITMQ_CONFIG_KEY_AUTH_PASSWORD).toString()); + } + + if (configMap.containsKey(RABBITMQ_CONFIG_KEY_AUTO_RECOVERY)) { + connectionFactory.setAutomaticRecoveryEnabled( + Boolean.valueOf(configMap.get(RABBITMQ_CONFIG_KEY_AUTO_RECOVERY).toString())); + } + + if (configMap.containsKey(RABBITMQ_CONFIG_KEY_TIMEOUT)) { + connectionFactory.setConnectionTimeout(Integer.valueOf(configMap.get(RABBITMQ_CONFIG_KEY_TIMEOUT).toString())); + } + + if (configMap.containsKey(RABBITMQ_CONFIG_KEY_NETWORK_RECOVERY_INTERVAL)) { + connectionFactory.setNetworkRecoveryInterval( + Integer.valueOf(configMap.get(RABBITMQ_CONFIG_KEY_NETWORK_RECOVERY_INTERVAL).toString())); + } + + if (configMap.containsKey(RABBITMQ_CONFIG_KEY_REQUESTED_HEARTBEAT)) { + connectionFactory.setRequestedHeartbeat( + Integer.valueOf(configMap.get(RABBITMQ_CONFIG_KEY_REQUESTED_HEARTBEAT).toString())); + } + + if (configMap.containsKey(RABBITMQ_CONFIG_KEY_TOPOLOGY_RECOVERY_ENABLED)) { + connectionFactory.setTopologyRecoveryEnabled( + 
Boolean.valueOf(configMap.get(RABBITMQ_CONFIG_KEY_TOPOLOGY_RECOVERY_ENABLED).toString())); + } + + if (configMap.containsKey(RABBITMQ_CONFIG_KEY_REQUESTED_CHANNEL_MAX)) { + connectionFactory.setRequestedChannelMax( + Integer.parseInt(configMap.get(RABBITMQ_CONFIG_KEY_REQUESTED_CHANNEL_MAX).toString())); + } + + if (configMap.containsKey(RABBITMQ_CONFIG_KEY_REQUESTED_FRAME_MAX)) { + connectionFactory.setRequestedChannelMax( + Integer.valueOf(configMap.get(RABBITMQ_CONFIG_KEY_REQUESTED_FRAME_MAX).toString())); + } + + return connectionFactory; + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/616f47e0/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/runtime/RabbitmqPublisher.java ---------------------------------------------------------------------- diff --git a/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/runtime/RabbitmqPublisher.java b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/runtime/RabbitmqPublisher.java new file mode 100644 index 0000000..67c2d45 --- /dev/null +++ b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/runtime/RabbitmqPublisher.java @@ -0,0 +1,74 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
package org.apache.edgent.connectors.rabbitmq.runtime;

import org.apache.edgent.function.Consumer;
import org.apache.edgent.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
 * A publisher for the RabbitMQ connector.
 * <p>
 * Every accepted tuple is converted to a {@code byte[]} body and published through the
 * connector's channel to the default exchange (empty exchange name), using the queue
 * name as the routing key.
 * </p>
 *
 * @param <T> tuple type
 */
public class RabbitmqPublisher<T> implements Consumer<T>, AutoCloseable {

    private static final long serialVersionUID = 1L;
    private static final Logger logger = LoggerFactory.getLogger(RabbitmqPublisher.class);

    private final RabbitmqConnector connector;
    private final Function<T, byte[]> msgFn;
    private final String queue;
    private String id;

    /**
     * Create a publisher.
     *
     * @param connector connector providing the channel to publish on
     * @param queue name of the queue to publish to
     * @param msgFn function that yields the message body from a tuple
     */
    public RabbitmqPublisher(RabbitmqConnector connector, String queue, Function<T, byte[]> msgFn) {
        this.connector = connector;
        this.queue = queue;
        this.msgFn = msgFn;
    }

    /**
     * Close the connector this publisher was created with.
     *
     * @throws Exception if closing the connector fails
     */
    @Override
    public synchronized void close() throws Exception {
        logger.info("{} is closing.", id());
        connector.close();
        logger.info("{} is closed.", id());
    }

    /**
     * Publish one tuple.
     *
     * @param value the tuple to convert and publish
     * @throws RuntimeException wrapping the {@link IOException} if publishing fails
     */
    @Override
    public void accept(T value) {
        byte[] msg = msgFn.apply(value);
        try {
            connector.channel().basicPublish("", queue, null, msg);
        } catch (IOException e) {
            // Pass the exception as the throwable argument so the stack trace is logged.
            logger.error("publish exception", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Get a short identifier of this publisher, intended for log messages.
     *
     * @return the connector id followed by {@code " PUB "} and this object's short id
     */
    public String id() {
        if (id == null) {
            // include our short object Id
            String self = toString();
            id = connector.id() + " PUB " + self.substring(self.indexOf('@') + 1);
        }

        return id;
    }
}
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
*/

package org.apache.edgent.connectors.rabbitmq.runtime;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import org.apache.edgent.function.Consumer;
import org.apache.edgent.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * A subscriber for the RabbitMQ connector.
 * <p>
 * When given an event submitter it registers a consumer on the configured queue with
 * manual acknowledgement: each delivery is converted to a tuple, submitted, and only
 * then acknowledged.
 * </p>
 *
 * @param <T> tuple type
 */
public class RabbitmqSubscriber<T> implements Consumer<Consumer<T>>, AutoCloseable {

    private static final long serialVersionUID = 1L;
    private static final Logger logger = LoggerFactory.getLogger(RabbitmqSubscriber.class);

    private final RabbitmqConnector connector;
    private final Function<byte[], T> toTupleFn;
    private final String queue;
    private Consumer<T> eventSubmitter;
    private ExecutorService executor;
    private String id;

    /**
     * Create a subscriber.
     *
     * @param connector connector providing the channel to consume on
     * @param queue name of the queue to consume from
     * @param toTupleFn function that yields a tuple from a message body
     */
    public RabbitmqSubscriber(RabbitmqConnector connector, String queue, Function<byte[], T> toTupleFn) {
        this.connector = connector;
        this.queue = queue;
        this.toTupleFn = toTupleFn;
    }

    /**
     * Shut down the registration executor (if any) and close the connector.
     *
     * @throws Exception if closing the connector fails
     */
    @Override
    public synchronized void close() throws Exception {
        logger.info("{} is closing.", id());
        if (executor != null && !executor.isShutdown()) {
            executor.shutdownNow();
        }

        connector.close();
        logger.info("{} is closed", id());
    }

    /**
     * Start consuming from the queue, handing every received tuple to {@code eventSubmitter}.
     * <p>
     * The consumer registration is performed on a single-threaded executor created by this
     * call; a failure to register is logged and not propagated to the caller.
     * </p>
     *
     * @param eventSubmitter receiver of the tuples built from the consumed messages
     */
    @Override
    public void accept(Consumer<T> eventSubmitter) {
        this.eventSubmitter = eventSubmitter;

        executor = Executors.newFixedThreadPool(1);

        executor.submit(() -> {
            // Manual acknowledgement: a delivery is acked only after its tuple was submitted.
            final boolean autoAck = false;
            try {
                connector.channel().basicConsume(queue, autoAck,
                    new DefaultConsumer(connector.channel()) {
                        @Override
                        public void handleDelivery(String consumerTag,
                                                   Envelope envelope,
                                                   AMQP.BasicProperties properties,
                                                   byte[] body)
                            throws IOException {
                            long deliveryTag = envelope.getDeliveryTag();

                            acceptCallback(body);

                            connector.channel().basicAck(deliveryTag, false);
                        }
                    });
            } catch (IOException e) {
                // Pass the exception as the throwable argument so the stack trace is logged.
                logger.error("Consumer exception", e);
            }
        });

    }

    /** Converts a message body to a tuple and submits it to the event submitter. */
    private void acceptCallback(byte[] msg) {
        T tuple = toTupleFn.apply(msg);
        eventSubmitter.accept(tuple);
    }

    /**
     * Get a short identifier of this subscriber, intended for log messages.
     *
     * @return the connector id followed by {@code " SUB "} and this object's short id
     */
    public String id() {
        if (id == null) {
            // include our short object Id
            String self = toString();
            id = connector.id() + " SUB " + self.substring(self.indexOf('@') + 1);
        }

        return id;
    }

}
the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.test.connectors.rabbitmq;
+
+/**
+ * RabbitmqStreams connector globalization tests: reruns the inherited tests with non-ASCII messages.
+ */
+public class RabbitmqStreamsGlobalTestManual extends RabbitmqStreamsTestManual {
+    private static final String globalMsg1 = "你好"; // "Hello"
+    private static final String globalMsg2 = "你在嗎"; // "Are you there?"
+
+    @Override
+    public String getMsg1() {
+        return globalMsg1;
+    }
+
+    @Override
+    public String getMsg2() {
+        return globalMsg2;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/616f47e0/connectors/rabbitmq/src/test/java/org/apache/edgent/test/connectors/rabbitmq/RabbitmqStreamsTestManual.java
----------------------------------------------------------------------
diff --git a/connectors/rabbitmq/src/test/java/org/apache/edgent/test/connectors/rabbitmq/RabbitmqStreamsTestManual.java b/connectors/rabbitmq/src/test/java/org/apache/edgent/test/connectors/rabbitmq/RabbitmqStreamsTestManual.java
new file mode 100644
index 0000000..02ea0bc
--- /dev/null
+++ b/connectors/rabbitmq/src/test/java/org/apache/edgent/test/connectors/rabbitmq/RabbitmqStreamsTestManual.java
@@ -0,0 +1,118 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.
See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.test.connectors.rabbitmq;
+
+import org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants;
+import org.apache.edgent.connectors.rabbitmq.RabbitmqConsumer;
+import org.apache.edgent.connectors.rabbitmq.RabbitmqProducer;
+import org.apache.edgent.test.connectors.common.ConnectorTestBase;
+import org.apache.edgent.topology.TSink;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+import org.apache.edgent.topology.plumbing.PlumbingStreams;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * A {@link RabbitmqConsumer} manual test case.
+ * Please follow these steps
+ *
+ * step 1 :
+ * Install RabbitMQ server.
+ * For Mac OS X, you can use homebrew to install it, the simple command is : `brew install rabbitmq`
+ * For other systems, please follow RabbitMQ's official document :
+ * <a href="http://www.rabbitmq.com/download.html">http://www.rabbitmq.com/download.html</a>
+ *
+ * step 2 :
+ * Start the RabbitMQ server.
+ * For Mac OS X, if you installed it with homebrew, you can start it with : `brew services start rabbitmq`
+ * For other systems, please follow RabbitMQ's official document :
+ * <a href="http://www.rabbitmq.com/download.html">http://www.rabbitmq.com/download.html</a>
+ * Note : the default port on which the RabbitMQ server listens is 5672
+ * and it will start a web console which listens on port 15672
+ * so you can access it with your web browser, just enter : http://localhost:15672/
+ * you will see an authorization page, the default user name and password are both `guest`
+ *
+ * step 3 :
+ * Create a test queue using the web console.
+ * after starting the server, you can access `http://localhost:15672/#/queues`
+ * then you will find a button named `Add a new queue`, click it and input `testQueue` into `name` field
+ * and click `Add queue` button to create the queue.
+ *
+ * step 4 :
+ * run this test case to test the publish and subscribe method.
+ */
+public class RabbitmqStreamsTestManual extends ConnectorTestBase {
+    private static final int PUB_DELAY_MSEC = 15*1000; // have seen 12sec 1st test's consumer startup delay
+    private static final int SEC_TIMEOUT = 20;
+
+    private final String msg1 = "Hello";
+    private final String msg2 = "Are you there?";
+
+    /** Returns the first message to publish; subclasses may override it to supply another payload. */
+    public String getMsg1() {
+        return msg1;
+    }
+
+    /** Returns the second message to publish; subclasses may override it to supply another payload. */
+    public String getMsg2() {
+        return msg2;
+    }
+
+    /** Builds the connector configuration pointing at a RabbitMQ server on 127.0.0.1:5672. */
+    private Map<String, Object> initRabbitmqConfig() {
+        Map<String, Object> config = new HashMap<>();
+        config.put(RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_HOST, "127.0.0.1");
+        config.put(RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_PORT, 5672);
+        return config;
+    }
+
+    /** Publishes two messages to {@code testQueue} and validates that the subscriber receives them. */
+    @Test
+    public void testSimple() throws Exception {
+        Topology t = newTopology("testSimple");
+        Map<String, Object> configMap = initRabbitmqConfig();
+        MsgGenerator generator = new MsgGenerator(t.getName());
+        String queue = "testQueue";
+        List<String> msgs = createMsgs(generator, queue, getMsg1(), getMsg2());
+
+        // Delay the publishing side; PUB_DELAY_MSEC allows for a slow consumer startup.
+        TStream<String> stream = PlumbingStreams.blockingOneShotDelay(
+                t.collection(msgs), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
+
+        RabbitmqConsumer consumer = new RabbitmqConsumer(t, () -> configMap);
+
+        TStream<String> receivedStream = consumer.subscribe((byte[] bytes) -> new String(bytes, StandardCharsets.UTF_8), queue);
+
+        RabbitmqProducer producer = new RabbitmqProducer(t, () -> configMap);
+
+        TSink<String> sink = producer.publish(stream, queue, (String s) -> s.getBytes(StandardCharsets.UTF_8));
+
+        completeAndValidate("", t, receivedStream, generator, SEC_TIMEOUT, msgs.toArray(new String[0]));
+        assertNotNull(sink);
+    }
+}