Github user ppatierno commented on a diff in the pull request:
https://github.com/apache/activemq-artemis/pull/1607#discussion_r146578570
--- Diff:
integration/activemq-kafka/activemq-kafka-bridge/src/main/java/org/apache/activemq/artemis/integration/kafka/bridge/KafkaProducerBridge.java
---
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.integration.kafka.bridge;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.filter.Filter;
+import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.postoffice.Binding;
+import org.apache.activemq.artemis.core.postoffice.PostOffice;
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.ConnectorService;
+import org.apache.activemq.artemis.core.server.Consumer;
+import org.apache.activemq.artemis.core.server.HandleStatus;
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.Queue;
+import
org.apache.activemq.artemis.integration.kafka.protocol.core.CoreMessageSerializer;
+import org.apache.activemq.artemis.utils.ConfigurationHelper;
+import org.apache.activemq.artemis.utils.ReusableLatch;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Utils;
+
+class KafkaProducerBridge implements Consumer, ConnectorService {
+
+ private final String connectorName;
+
+ private final String queueName;
+
+ private final String topicName;
+
+ private final PostOffice postOffice;
+
+ private Queue queue = null;
+
+ private Filter filter = null;
+
+ private String filterString;
+
+ private AtomicBoolean isStarted = new AtomicBoolean();
+
+ private boolean isConnected = false;
+
+ private Producer<String, Message> kafkaProducer;
+
+ private Map<String, Object> configuration;
+
+ private long sequentialID;
+
+ private final ReusableLatch pendingAcks = new ReusableLatch(0);
+
+ private final java.util.Map<Long, MessageReference> refs = new
LinkedHashMap<>();
+
+ private final ScheduledExecutorService scheduledExecutorService;
+
+ private final int retryAttempts;
+
+ private final long retryInterval;
+
+ private final double retryMultiplier;
+
+ private final long retryMaxInterval;
+
+ private final AtomicInteger retryCount = new AtomicInteger();
+
+ private final AtomicBoolean awaitingReconnect = new AtomicBoolean();
+
+ private final KafkaProducerFactory<String, Message>
kafkaProducerFactory;
+
+
+ KafkaProducerBridge(String connectorName, KafkaProducerFactory<String,
Message> kafkaProducerFactory, Map<String, Object> configuration,
StorageManager storageManager, PostOffice postOffice, ScheduledExecutorService
scheduledExecutorService) {
+ this.sequentialID = storageManager.generateID();
+
+ this.kafkaProducerFactory = kafkaProducerFactory;
+
+ this.connectorName = connectorName;
+ this.queueName =
ConfigurationHelper.getStringProperty(KafkaConstants.QUEUE_NAME, null,
configuration);
+ this.topicName =
ConfigurationHelper.getStringProperty(KafkaConstants.TOPIC_NAME, null,
configuration);
+
+ this.retryAttempts =
ConfigurationHelper.getIntProperty(KafkaConstants.RETRY_ATTEMPTS_NAME, -1,
configuration);
+ this.retryInterval =
ConfigurationHelper.getLongProperty(KafkaConstants.RETRY_INTERVAL_NAME, 2000,
configuration);
+ this.retryMultiplier =
ConfigurationHelper.getDoubleProperty(KafkaConstants.RETRY_MULTIPLIER_NAME, 2,
configuration);
+ this.retryMaxInterval =
ConfigurationHelper.getLongProperty(KafkaConstants.RETRY_MAX_INTERVAL_NAME,
30000, configuration);
+
+ this.filterString =
ConfigurationHelper.getStringProperty(KafkaConstants.FILTER_STRING, null,
configuration);
+
+ configuration.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
+ if
(!configuration.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
+ configuration.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
CoreMessageSerializer.class.getName());
+ }
+
+ this.postOffice = postOffice;
+ this.configuration = configuration;
+ this.scheduledExecutorService = scheduledExecutorService;
+ }
+
+ private Map<String, Object> kafkaConfig(Map<String, Object>
configuration) {
+ Map<String, Object> filteredConfig = new HashMap<>(configuration);
+
KafkaConstants.OPTIONAL_OUTGOING_CONNECTOR_KEYS.forEach(filteredConfig::remove);
+
KafkaConstants.REQUIRED_OUTGOING_CONNECTOR_KEYS.forEach(filteredConfig::remove);
+ return filteredConfig;
+ }
+
+ @Override
+ public void start() throws Exception {
+ synchronized (this) {
+ if (this.isStarted.get()) {
+ return;
+ }
+ if (this.connectorName == null ||
this.connectorName.trim().equals("")) {
+ throw new Exception("invalid connector name: " +
this.connectorName);
+ }
+
+ if (this.topicName == null || this.topicName.trim().equals("")) {
+ throw new Exception("invalid topic name: " + topicName);
+ }
+
+ if (this.queueName == null || this.queueName.trim().equals("")) {
+ throw new Exception("invalid queue name: " + queueName);
+ }
+
+ this.filter = FilterImpl.createFilter(filterString);
+
+ SimpleString name = new SimpleString(this.queueName);
+ Binding b = this.postOffice.getBinding(name);
+ if (b == null) {
+ throw new Exception(connectorName + ": queue " + queueName + "
not found");
+ }
+ this.queue = (Queue) b.getBindable();
+
+ this.kafkaProducer =
kafkaProducerFactory.create(kafkaConfig(configuration));
+
+ List<PartitionInfo> topicPartitions =
kafkaProducer.partitionsFor(topicName);
+ if (topicPartitions == null || topicPartitions.size() == 0) {
+ throw new Exception(connectorName + ": topic " + topicName + "
not found");
+ }
+
+ this.retryCount.set(0);
+ this.isStarted.set(true);
+ connect();
+ ActiveMQKafkaLogger.LOGGER.bridgeStarted(connectorName);
+ }
+ }
+
+ public void connect() throws Exception {
+ synchronized (this) {
+ if (!isConnected && isStarted.get()) {
+ isConnected = true;
+ this.queue.addConsumer(this);
+ this.queue.deliverAsync();
+ ActiveMQKafkaLogger.LOGGER.bridgeConnected(connectorName);
+ }
+ }
+ }
+
+ @Override
+ public void disconnect() {
+ synchronized (this) {
+ if (isConnected) {
+ if (queue != null) {
+ this.queue.removeConsumer(this);
+ }
+
+ cancelRefs();
+ if (queue != null) {
+ queue.deliverAsync();
+ }
+ isConnected = false;
+ ActiveMQKafkaLogger.LOGGER.bridgeDisconnected(connectorName);
+ }
+ }
+ }
+
+ @Override
+ public void stop() {
+ synchronized (this) {
+ if (!this.isStarted.get()) {
+ return;
+ }
+
ActiveMQKafkaLogger.LOGGER.bridgeReceivedStopRequest(connectorName);
+
+ disconnect();
+
+ kafkaProducer.close();
+ kafkaProducer = null;
+ this.isStarted.set(false);
+ ActiveMQKafkaLogger.LOGGER.bridgeStopped(connectorName);
+ }
+ }
+
+ @Override
+ public boolean isStarted() {
+ return this.isStarted.get();
+ }
+
+ @Override
+ public String getName() {
+ return this.connectorName;
+ }
+
+ @Override
+ public boolean supportsDirectDelivery() {
+ return false;
+ }
+
+ @Override
+ public HandleStatus handle(MessageReference ref) throws Exception {
+ if (filter != null && !filter.match(ref.getMessage())) {
+ return HandleStatus.NO_MATCH;
+ }
+
+ synchronized (this) {
+ ref.handled();
+
+ Message message = ref.getMessage();
+
+ synchronized (refs) {
+ refs.put(ref.getMessage().getMessageID(), ref);
+ }
+
+ Integer partition = null;
+ SimpleString groupdID = message.getGroupID();
+ if (groupdID != null) {
+ List partitions = kafkaProducer.partitionsFor(topicName);
+ int numPartitions = partitions.size();
+ partition =
Utils.toPositive(Utils.murmur2(groupdID.getData())) % numPartitions;
--- End diff --
It's the same way as the DefaultPartitioner works in Kafka, why do you
re-used the logic here ?
---