[
https://issues.apache.org/jira/browse/FLINK-3298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15420982#comment-15420982
]
ASF GitHub Bot commented on FLINK-3298:
---------------------------------------
Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/flink/pull/2314#discussion_r74762300
--- Diff: flink-streaming-connectors/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSink.java ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.activemq;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+/**
+ * Sink class for writing data into ActiveMQ queue.
+ * <p>
+ * To create an instance of AMQSink class one should initialize and configure an
+ * instance of a connection factory that will be used to create a connection.
+ * Every input message is converted into a byte array using a serialization
+ * schema and being sent into a message queue.
+ *
+ * @param <IN> type of input messages
+ */
+public class AMQSink<IN> extends RichSinkFunction<IN> {
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(AMQSink.class);
+
+ private final ActiveMQConnectionFactory connectionFactory;
+ private final String queueName;
+ private final SerializationSchema<IN> serializationSchema;
+ private boolean logFailuresOnly = false;
+ private transient MessageProducer producer;
+ private transient Session session;
+ private transient Connection connection;
+
+ /**
+ * Create AMQSink.
+ *
+ * @param connectionFactory factory for creating ActiveMQ connection
+ * @param queueName name of a queue to write to
+ * @param serializationSchema schema to serialize input message into a byte array
+ */
+ public AMQSink(ActiveMQConnectionFactory connectionFactory, String queueName, SerializationSchema<IN> serializationSchema) {
+ this.connectionFactory = connectionFactory;
+ this.queueName = queueName;
+ this.serializationSchema = serializationSchema;
+ }
+
+ /**
+ * Defines whether the producer should fail on errors, or only log them.
+ * If this is set to true, then exceptions will be only logged, if set to false,
+ * exceptions will be eventually thrown and cause the streaming program to
+ * fail (and enter recovery).
+ *
+ * @param logFailuresOnly The flag to indicate logging-only on exceptions.
+ */
+ public void setLogFailuresOnly(boolean logFailuresOnly) {
+ this.logFailuresOnly = logFailuresOnly;
+ }
+
+
+ @Override
+ public void open(Configuration config) throws Exception {
+ super.open(config);
+ // Create a Connection
+ connection = connectionFactory.createConnection();
+ connection.start();
+
+ // Create a Session
+ session = connection.createSession(false,
+ Session.AUTO_ACKNOWLEDGE);
--- End diff --
The session is created non-transacted with AUTO_ACKNOWLEDGE, so I assume
the sink doesn't provide any processing guarantees. It would make sense to
document the guarantees (or the lack of them) in the Javadocs.
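As a rough sketch of what such a note could look like, the class-level
Javadoc might be extended along these lines. The wording is only a
suggestion and rests on assumptions drawn from the quoted part of the diff:
that the session stays non-transacted and that the sink does not hook into
Flink's checkpointing. Both would need to be checked against the full file.

```java
/**
 * Sink class for writing data into an ActiveMQ queue.
 * <p>
 * Delivery guarantees (suggested wording, to be confirmed against the
 * full implementation): messages are sent through a non-transacted JMS
 * session and, as far as the quoted diff shows, the sink does not take
 * part in Flink's checkpointing. After a failure and recovery, records
 * may therefore be written to the queue more than once. If
 * {@code setLogFailuresOnly(true)} is used, a failed send is only logged,
 * so the affected record may be lost.
 *
 * @param <IN> type of input messages
 */
```

Stating this up front lets users decide whether they need deduplication
downstream or should leave logFailuresOnly at its default of false.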
> Streaming connector for ActiveMQ
> --------------------------------
>
> Key: FLINK-3298
> URL: https://issues.apache.org/jira/browse/FLINK-3298
> Project: Flink
> Issue Type: New Feature
> Components: Streaming Connectors
> Reporter: Mohit Sethi
> Assignee: Ivan Mushketyk
> Priority: Minor
>