This is an automated email from the ASF dual-hosted git repository.

tristan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/flume.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 185b95137 Add Kafka Transactions to KafkaChannel and KafkaSink, 
including unit tests.
185b95137 is described below

commit 185b9513706cd5b6046f00c9821a83580e2c35da
Author: tmgstevens <[email protected]>
AuthorDate: Fri Oct 28 14:46:05 2022 +0100

    Add Kafka Transactions to KafkaChannel and KafkaSink, including unit tests.
---
 .../apache/flume/channel/kafka/KafkaChannel.java   | 74 +++++++++++++------
 .../channel/kafka/KafkaChannelConfiguration.java   |  4 ++
 .../flume/channel/kafka/TestKafkaChannelBase.java  | 21 ++++--
 .../apache/flume/channel/kafka/TestRollback.java   | 33 +++++++--
 .../src/test/resources/kafka-server.properties     |  7 ++
 .../org/apache/flume/sink/kafka/KafkaSink.java     | 84 ++++++++++++++--------
 .../flume/sink/kafka/KafkaSinkConstants.java       |  3 +
 .../org/apache/flume/sink/kafka/TestConstants.java |  1 +
 .../org/apache/flume/sink/kafka/TestKafkaSink.java | 79 +++++++++++++++-----
 .../org/apache/flume/sink/kafka/util/TestUtil.java | 28 +++++++-
 .../src/test/resources/kafka-server.properties     |  7 ++
 11 files changed, 258 insertions(+), 83 deletions(-)

diff --git 
a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
 
b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
index 8336e94e2..acfb3b193 100644
--- 
a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
+++ 
b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
@@ -58,6 +58,8 @@ import org.slf4j.LoggerFactory;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.time.Duration;
 import java.util.Arrays;
@@ -73,26 +75,10 @@ import java.util.UUID;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
-import static 
org.apache.flume.channel.kafka.KafkaChannelConfiguration.BOOTSTRAP_SERVERS_CONFIG;
-import static 
org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_ACKS;
-import static 
org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_AUTO_OFFSET_RESET;
-import static 
org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_GROUP_ID;
-import static 
org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_KEY_DESERIALIZER;
-import static 
org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_KEY_SERIALIZER;
-import static 
org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_PARSE_AS_FLUME_EVENT;
-import static 
org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_POLL_TIMEOUT;
-import static 
org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_TOPIC;
-import static 
org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_VALUE_DESERIAIZER;
-import static 
org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_VALUE_SERIAIZER;
-import static 
org.apache.flume.channel.kafka.KafkaChannelConfiguration.KAFKA_CONSUMER_PREFIX;
-import static 
org.apache.flume.channel.kafka.KafkaChannelConfiguration.KAFKA_PRODUCER_PREFIX;
-import static 
org.apache.flume.channel.kafka.KafkaChannelConfiguration.KEY_HEADER;
-import static 
org.apache.flume.channel.kafka.KafkaChannelConfiguration.PARSE_AS_FLUME_EVENT;
-import static 
org.apache.flume.channel.kafka.KafkaChannelConfiguration.PARTITION_HEADER_NAME;
-import static 
org.apache.flume.channel.kafka.KafkaChannelConfiguration.POLL_TIMEOUT;
-import static 
org.apache.flume.channel.kafka.KafkaChannelConfiguration.STATIC_PARTITION_CONF;
-import static 
org.apache.flume.channel.kafka.KafkaChannelConfiguration.TOPIC_CONFIG;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.*;
 import static 
org.apache.flume.shared.kafka.KafkaSSLUtil.SSL_DISABLE_FQDN_CHECK;
 import static org.apache.flume.shared.kafka.KafkaSSLUtil.isSSLEnabled;
 
@@ -105,6 +91,11 @@ public class KafkaChannel extends BasicChannelSemantics {
   private final Properties producerProps = new Properties();
 
   private KafkaProducer<String, byte[]> producer;
+
+  // Used to ensure that we don't have multiple threads attempting to use the 
KafkaProducer (above) for transactions
+  // at the same time as whilst the KafkaProducer is thread-safe you cannot 
have more than one in-flight transaction.
+  private Lock kafkaTxLock = new ReentrantLock();
+
   private final String channelUUID = UUID.randomUUID().toString();
 
   private AtomicReference<String> topic = new AtomicReference<String>();
@@ -113,6 +104,7 @@ public class KafkaChannel extends BasicChannelSemantics {
   private String groupId = DEFAULT_GROUP_ID;
   private String partitionHeader = null;
   private Integer staticPartitionId;
+  private boolean useKafkaTransactions;
 
   // used to indicate if a rebalance has occurred during the current 
transaction
   AtomicBoolean rebalanceFlag = new AtomicBoolean();
@@ -142,6 +134,10 @@ public class KafkaChannel extends BasicChannelSemantics {
   public void start() {
     logger.info("Starting Kafka Channel: {}", getName());
     producer = new KafkaProducer<String, byte[]>(producerProps);
+    if (useKafkaTransactions) {
+      logger.debug("Initializing Kafka Transaction");
+      producer.initTransactions();
+    }
     // We always have just one topic being read by one thread
     logger.info("Topic = {}", topic.get());
     counter.start();
@@ -188,6 +184,17 @@ public class KafkaChannel extends BasicChannelSemantics {
       throw new ConfigurationException("Bootstrap Servers must be specified");
     }
 
+    String transactionalID = ctx.getString(TRANSACTIONAL_ID);
+    if (transactionalID != null) {
+      try {
+        ctx.put(TRANSACTIONAL_ID, 
InetAddress.getLocalHost().getCanonicalHostName() +
+                Thread.currentThread().getName() + transactionalID);
+        useKafkaTransactions = true;
+      } catch (UnknownHostException e) {
+        throw new ConfigurationException("Unable to configure transactional 
id, as cannot work out hostname", e);
+      }
+    }
+
     setProducerProps(ctx, bootStrapServers);
     setConsumerProps(ctx, bootStrapServers);
 
@@ -438,6 +445,11 @@ public class KafkaChannel extends BasicChannelSemantics {
           kafkaFutures = Optional.of(new LinkedList<Future<RecordMetadata>>());
         }
         try {
+          if (useKafkaTransactions) {
+            kafkaTxLock.lock();
+            logger.debug("Beginning Kafka Transaction");
+            producer.beginTransaction();
+          }
           long batchSize = producerRecords.get().size();
           long startTime = System.nanoTime();
           int index = 0;
@@ -445,11 +457,19 @@ public class KafkaChannel extends BasicChannelSemantics {
             index++;
             kafkaFutures.get().add(producer.send(record, new 
ChannelCallback(index, startTime)));
           }
-          //prevents linger.ms from being a problem
-          producer.flush();
 
-          for (Future<RecordMetadata> future : kafkaFutures.get()) {
-            future.get();
+          if (useKafkaTransactions) {
+            logger.debug("Committing Kafka Transaction");
+            producer.commitTransaction();
+            kafkaTxLock.unlock();
+          } else {
+            // Ensure that the records are actually flushed by the producer, 
regardless of linger.ms.
+            // Per the Kafka docs we do not need to linger or wait for the 
callback if we're using transactions
+            producer.flush();
+
+            for (Future<RecordMetadata> future : kafkaFutures.get()) {
+              future.get();
+            }
           }
           long endTime = System.nanoTime();
           counter.addToKafkaEventSendTimer((endTime - startTime) / (1000 * 
1000));
@@ -457,6 +477,14 @@ public class KafkaChannel extends BasicChannelSemantics {
           producerRecords.get().clear();
           kafkaFutures.get().clear();
         } catch (Exception ex) {
+          if (useKafkaTransactions) {
+            logger.debug("Aborting transaction");
+            try {
+              producer.abortTransaction();
+            } finally {
+              kafkaTxLock.unlock();
+            }
+          }
           logger.warn("Sending events to Kafka failed", ex);
           throw new ChannelException("Commit failed as send to Kafka failed",
                   ex);
diff --git 
a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java
 
b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java
index 87fcbc1a3..cc23d1af2 100644
--- 
a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java
+++ 
b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java
@@ -19,6 +19,7 @@
 package org.apache.flume.channel.kafka;
 
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.producer.ProducerConfig;
 
 public class KafkaChannelConfiguration {
 
@@ -37,6 +38,9 @@ public class KafkaChannelConfiguration {
   public static final String TOPIC_CONFIG = KAFKA_PREFIX + "topic";
   public static final String BOOTSTRAP_SERVERS_CONFIG =
       KAFKA_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+
+  public static final String TRANSACTIONAL_ID =
+          KAFKA_PRODUCER_PREFIX + ProducerConfig.TRANSACTIONAL_ID_CONFIG;
   public static final String DEFAULT_TOPIC = "flume-channel";
   public static final String DEFAULT_GROUP_ID = "flume";
   public static final String POLL_TIMEOUT = KAFKA_PREFIX + "pollTimeout";
diff --git 
a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannelBase.java
 
b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannelBase.java
index e1279a39c..b370749c9 100644
--- 
a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannelBase.java
+++ 
b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannelBase.java
@@ -44,10 +44,8 @@ import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static 
org.apache.flume.channel.kafka.KafkaChannelConfiguration.BOOTSTRAP_SERVERS_CONFIG;
-import static 
org.apache.flume.channel.kafka.KafkaChannelConfiguration.KAFKA_CONSUMER_PREFIX;
-import static 
org.apache.flume.channel.kafka.KafkaChannelConfiguration.PARSE_AS_FLUME_EVENT;
-import static 
org.apache.flume.channel.kafka.KafkaChannelConfiguration.TOPIC_CONFIG;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.*;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG;
 
 public class TestKafkaChannelBase {
 
@@ -97,19 +95,32 @@ public class TestKafkaChannelBase {
   }
 
   KafkaChannel startChannel(boolean parseAsFlume) throws Exception {
-    Context context = prepareDefaultContext(parseAsFlume);
+    return startChannel(parseAsFlume, false);
+  }
+  KafkaChannel startChannel(boolean parseAsFlume, boolean useKafksTxns) throws 
Exception {
+    Context context = prepareDefaultContext(parseAsFlume, useKafksTxns);
     KafkaChannel channel = createChannel(context);
     channel.start();
     return channel;
   }
 
   Context prepareDefaultContext(boolean parseAsFlume) {
+    return prepareDefaultContext(parseAsFlume, false);
+  }
+
+  Context prepareDefaultContext(boolean parseAsFlume, boolean useKafkaTxns) {
     // Prepares a default context with Kafka Server Properties
     Context context = new Context();
     context.put(BOOTSTRAP_SERVERS_CONFIG, testUtil.getKafkaServerUrl());
     context.put(PARSE_AS_FLUME_EVENT, String.valueOf(parseAsFlume));
     context.put(TOPIC_CONFIG, topic);
     context.put(KAFKA_CONSUMER_PREFIX + "max.poll.interval.ms", "10000");
+    if (useKafkaTxns) {
+      context.put(TRANSACTIONAL_ID, "3");
+      context.put("kafka.producer." + ENABLE_IDEMPOTENCE_CONFIG, "true");
+      context.put("kafka.producer.acks", "all");
+      context.put("kafka.consumer.isolation.level", "read_committed");
+    }
 
     return context;
   }
diff --git 
a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestRollback.java
 
b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestRollback.java
index 6e9e1e25f..219b026f2 100644
--- 
a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestRollback.java
+++ 
b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestRollback.java
@@ -29,27 +29,48 @@ public class TestRollback extends TestKafkaChannelBase {
 
   @Test
   public void testSuccess() throws Exception {
-    doTestSuccessRollback(false, false);
+    doTestSuccessRollback(false, false, false);
   }
 
   @Test
   public void testSuccessInterleave() throws Exception {
-    doTestSuccessRollback(false, true);
+    doTestSuccessRollback(false, true, false);
   }
 
   @Test
   public void testRollbacks() throws Exception {
-    doTestSuccessRollback(true, false);
+    doTestSuccessRollback(true, false, false);
   }
 
   @Test
   public void testRollbacksInterleave() throws Exception {
-    doTestSuccessRollback(true, true);
+    doTestSuccessRollback(true, true, false);
+  }
+
+  @Test
+  public void testSuccessTxns() throws Exception {
+    doTestSuccessRollback(false, false, true);
+  }
+
+  @Test
+  public void testSuccessInterleaveTxns() throws Exception {
+    doTestSuccessRollback(false, true, true);
+  }
+
+  @Test
+  public void testRollbacksTxns() throws Exception {
+    doTestSuccessRollback(true, false, true);
+  }
+
+  @Test
+  public void testRollbacksInterleaveTxns() throws Exception {
+    doTestSuccessRollback(true, true, true);
   }
 
   private void doTestSuccessRollback(final boolean rollback,
-                                     final boolean interleave) throws 
Exception {
-    final KafkaChannel channel = startChannel(true);
+                                     final boolean interleave,
+                                     final boolean useKafkaTxns) throws 
Exception {
+    final KafkaChannel channel = startChannel(true, useKafkaTxns);
     writeAndVerify(rollback, channel, interleave);
     channel.stop();
   }
diff --git 
a/flume-ng-channels/flume-kafka-channel/src/test/resources/kafka-server.properties
 
b/flume-ng-channels/flume-kafka-channel/src/test/resources/kafka-server.properties
index 67419d458..d8a5b2817 100644
--- 
a/flume-ng-channels/flume-kafka-channel/src/test/resources/kafka-server.properties
+++ 
b/flume-ng-channels/flume-kafka-channel/src/test/resources/kafka-server.properties
@@ -109,6 +109,13 @@ log.retention.check.interval.ms=60000
 # marked for log compaction.
 log.cleaner.enable=false
 
+############################# Transactions #############################
+
+# Settings only to be used for non-production
+transaction.state.log.replication.factor = 1
+transaction.state.log.min.isr = 1
+transaction.state.log.num.partitions = 1
+
 ############################# Zookeeper #############################
 
 # Zookeeper connection string (see zookeeper docs for details).
diff --git 
a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
 
b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
index 92014f0f4..c090f3708 100644
--- 
a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
+++ 
b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
@@ -43,6 +43,7 @@ import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.slf4j.Logger;
@@ -50,6 +51,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -62,18 +65,6 @@ import java.util.concurrent.Future;
 
 import static 
org.apache.flume.shared.kafka.KafkaSSLUtil.SSL_DISABLE_FQDN_CHECK;
 import static org.apache.flume.shared.kafka.KafkaSSLUtil.isSSLEnabled;
-import static 
org.apache.flume.sink.kafka.KafkaSinkConstants.BOOTSTRAP_SERVERS_CONFIG;
-import static org.apache.flume.sink.kafka.KafkaSinkConstants.BATCH_SIZE;
-import static 
org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_BATCH_SIZE;
-import static org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_ACKS;
-import static 
org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_KEY_SERIALIZER;
-import static org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_TOPIC;
-import static 
org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_VALUE_SERIAIZER;
-import static org.apache.flume.sink.kafka.KafkaSinkConstants.KAFKA_HEADER;
-import static 
org.apache.flume.sink.kafka.KafkaSinkConstants.KAFKA_PRODUCER_PREFIX;
-import static org.apache.flume.sink.kafka.KafkaSinkConstants.KEY_HEADER;
-import static org.apache.flume.sink.kafka.KafkaSinkConstants.TIMESTAMP_HEADER;
-import static org.apache.flume.sink.kafka.KafkaSinkConstants.TOPIC_CONFIG;
 
 /**
  * A Flume Sink that can publish messages to Kafka.
@@ -125,6 +116,8 @@ public class KafkaSink extends AbstractSink implements 
Configurable, BatchSizeSu
   private String timestampHeader = null;
   private Map<String, String> headerMap;
 
+  private boolean useKafkaTransactions = false;
+
   private Optional<SpecificDatumWriter<AvroFlumeEvent>> writer = 
Optional.absent();
   private Optional<ByteArrayOutputStream> tempOutStream = Optional.absent();
 
@@ -156,6 +149,9 @@ public class KafkaSink extends AbstractSink implements 
Configurable, BatchSizeSu
 
       transaction = channel.getTransaction();
       transaction.begin();
+      if (useKafkaTransactions) {
+        producer.beginTransaction();
+      }
 
       kafkaFutures.clear();
       long batchStartTime = System.nanoTime();
@@ -190,7 +186,7 @@ public class KafkaSink extends AbstractSink implements 
Configurable, BatchSizeSu
           eventTopic = topic;
         }
 
-        eventKey = headers.get(KEY_HEADER);
+        eventKey = headers.get(KafkaSinkConstants.KEY_HEADER);
         if (logger.isTraceEnabled()) {
           if (LogPrivacyUtil.allowLogRawData()) {
             logger.trace("{Event} " + eventTopic + " : " + eventKey + " : "
@@ -261,17 +257,20 @@ public class KafkaSink extends AbstractSink implements 
Configurable, BatchSizeSu
         }
       }
 
-      //Prevent linger.ms from holding the batch
-      producer.flush();
-
-      // publish batch and commit.
-      if (processedEvents > 0) {
+      if (useKafkaTransactions) {
+        producer.commitTransaction();
+      } else {
+        //Prevent linger.ms from holding the batch
+        producer.flush();
         for (Future<RecordMetadata> future : kafkaFutures) {
           future.get();
         }
+      }
+      // publish batch and commit.
+      if (processedEvents > 0) {
         long endTime = System.nanoTime();
         counter.addToKafkaEventSendTimer((endTime - batchStartTime) / (1000 * 
1000));
-        counter.addToEventDrainSuccessCount(kafkaFutures.size());
+        counter.addToEventDrainSuccessCount(processedEvents);
       }
 
       transaction.commit();
@@ -283,8 +282,16 @@ public class KafkaSink extends AbstractSink implements 
Configurable, BatchSizeSu
       if (transaction != null) {
         try {
           kafkaFutures.clear();
-          transaction.rollback();
-          counter.incrementRollbackCount();
+          try {
+            if (useKafkaTransactions) {
+              producer.abortTransaction();
+            }
+          } catch (ProducerFencedException e) {
+            logger.error("Could not rollback transaction as producer fenced", 
e);
+          } finally {
+            transaction.rollback();
+            counter.incrementRollbackCount();
+          }
         } catch (Exception e) {
           logger.error("Transaction rollback failed", e);
           throw Throwables.propagate(e);
@@ -304,6 +311,10 @@ public class KafkaSink extends AbstractSink implements 
Configurable, BatchSizeSu
   public synchronized void start() {
     // instantiate the producer
     producer = new KafkaProducer<>(kafkaProps);
+    if (useKafkaTransactions) {
+      logger.info("Transactions enabled, initializing transactions");
+      producer.initTransactions();
+    }
     counter.start();
     super.start();
   }
@@ -333,9 +344,9 @@ public class KafkaSink extends AbstractSink implements 
Configurable, BatchSizeSu
   @Override
   public void configure(Context context) {
 
-    String topicStr = context.getString(TOPIC_CONFIG);
+    String topicStr = context.getString(KafkaSinkConstants.TOPIC_CONFIG);
     if (topicStr == null || topicStr.isEmpty()) {
-      topicStr = DEFAULT_TOPIC;
+      topicStr = KafkaSinkConstants.DEFAULT_TOPIC;
       logger.warn("Topic was not specified. Using {} as the topic.", topicStr);
     } else {
       logger.info("Using the static topic {}. This may be overridden by event 
headers", topicStr);
@@ -343,11 +354,11 @@ public class KafkaSink extends AbstractSink implements 
Configurable, BatchSizeSu
 
     topic = topicStr;
 
-    timestampHeader = context.getString(TIMESTAMP_HEADER);
+    timestampHeader = context.getString(KafkaSinkConstants.TIMESTAMP_HEADER);
 
-    headerMap = context.getSubProperties(KAFKA_HEADER);
+    headerMap = context.getSubProperties(KafkaSinkConstants.KAFKA_HEADER);
 
-    batchSize = context.getInteger(BATCH_SIZE, DEFAULT_BATCH_SIZE);
+    batchSize = context.getInteger(KafkaSinkConstants.BATCH_SIZE, 
KafkaSinkConstants.DEFAULT_BATCH_SIZE);
 
     if (logger.isDebugEnabled()) {
       logger.debug("Using batch size: {}", batchSize);
@@ -365,13 +376,24 @@ public class KafkaSink extends AbstractSink implements 
Configurable, BatchSizeSu
     topicHeader = context.getString(KafkaSinkConstants.TOPIC_OVERRIDE_HEADER,
                                     
KafkaSinkConstants.DEFAULT_TOPIC_OVERRIDE_HEADER);
 
+    String transactionalID = 
context.getString(KafkaSinkConstants.TRANSACTIONAL_ID);
+    if (transactionalID != null) {
+      try {
+        context.put(KafkaSinkConstants.TRANSACTIONAL_ID, 
InetAddress.getLocalHost().getCanonicalHostName() +
+                Thread.currentThread().getName() + transactionalID);
+        useKafkaTransactions = true;
+      } catch (UnknownHostException e) {
+        throw new ConfigurationException("Unable to configure transactional 
id, as cannot work out hostname", e);
+      }
+    }
+
     if (logger.isDebugEnabled()) {
       logger.debug(KafkaSinkConstants.AVRO_EVENT + " set to: {}", 
useAvroEventFormat);
     }
 
     kafkaFutures = new LinkedList<Future<RecordMetadata>>();
 
-    String bootStrapServers = context.getString(BOOTSTRAP_SERVERS_CONFIG);
+    String bootStrapServers = 
context.getString(KafkaSinkConstants.BOOTSTRAP_SERVERS_CONFIG);
     if (bootStrapServers == null || bootStrapServers.isEmpty()) {
       throw new ConfigurationException("Bootstrap Servers must be specified");
     }
@@ -389,11 +411,11 @@ public class KafkaSink extends AbstractSink implements 
Configurable, BatchSizeSu
 
   private void setProducerProps(Context context, String bootStrapServers) {
     kafkaProps.clear();
-    kafkaProps.put(ProducerConfig.ACKS_CONFIG, DEFAULT_ACKS);
+    kafkaProps.put(ProducerConfig.ACKS_CONFIG, 
KafkaSinkConstants.DEFAULT_ACKS);
     //Defaults overridden based on config
-    kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
DEFAULT_KEY_SERIALIZER);
-    kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
DEFAULT_VALUE_SERIAIZER);
-    kafkaProps.putAll(context.getSubProperties(KAFKA_PRODUCER_PREFIX));
+    kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
KafkaSinkConstants.DEFAULT_KEY_SERIALIZER);
+    kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
KafkaSinkConstants.DEFAULT_VALUE_SERIAIZER);
+    
kafkaProps.putAll(context.getSubProperties(KafkaSinkConstants.KAFKA_PRODUCER_PREFIX));
     kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
     //  The default value of `ssl.endpoint.identification.algorithm`
     //  is changed to `https`, since kafka client 2.0+
diff --git 
a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
 
b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
index df9000f18..7e1bd2332 100644
--- 
a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
+++ 
b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
@@ -19,6 +19,7 @@
 package org.apache.flume.sink.kafka;
 
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.producer.ProducerConfig;
 
 public class KafkaSinkConstants {
 
@@ -32,6 +33,8 @@ public class KafkaSinkConstants {
   public static final String BOOTSTRAP_SERVERS_CONFIG =
       KAFKA_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
 
+  public static final String TRANSACTIONAL_ID = KAFKA_PREFIX + "producer." +  
ProducerConfig.TRANSACTIONAL_ID_CONFIG;
+
   public static final String KEY_HEADER = "key";
   public static final String DEFAULT_TOPIC_OVERRIDE_HEADER = "topic";
   public static final String TOPIC_OVERRIDE_HEADER = "topicHeader";
diff --git 
a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java
 
b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java
index 672adc900..165601dd9 100644
--- 
a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java
+++ 
b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java
@@ -23,6 +23,7 @@ public class TestConstants {
   public static final String HEADER_TOPIC = "%{header1}-topic";
   public static final String CUSTOM_KEY = "custom-key";
   public static final String CUSTOM_TOPIC = "custom-topic";
+  public static final String TRANSACTIONS_TOPIC = "transactions-topic";
   public static final String HEADER_1_VALUE = "test-avro-header";
   public static final String HEADER_1_KEY = "header1";
   public static final String KAFKA_HEADER_1 = "FLUME_CORRELATOR";
diff --git 
a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
 
b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
index 3dbcd000f..2c0f1b8a3 100644
--- 
a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
+++ 
b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
@@ -39,6 +39,8 @@ import org.apache.flume.shared.kafka.test.PartitionOption;
 import org.apache.flume.shared.kafka.test.PartitionTestScenario;
 import org.apache.flume.sink.kafka.util.TestUtil;
 import org.apache.flume.source.avro.AvroFlumeEvent;
+import org.apache.kafka.clients.admin.TransactionListing;
+import org.apache.kafka.clients.admin.TransactionState;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -55,6 +57,7 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -65,15 +68,9 @@ import java.util.Properties;
 import java.util.Set;
 
 import static 
org.apache.flume.shared.kafka.KafkaSSLUtil.SSL_DISABLE_FQDN_CHECK;
-import static org.apache.flume.sink.kafka.KafkaSinkConstants.AVRO_EVENT;
-import static org.apache.flume.sink.kafka.KafkaSinkConstants.BATCH_SIZE;
-import static 
org.apache.flume.sink.kafka.KafkaSinkConstants.BOOTSTRAP_SERVERS_CONFIG;
-import static 
org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_KEY_SERIALIZER;
-import static org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_TOPIC;
-import static org.apache.flume.sink.kafka.KafkaSinkConstants.KAFKA_PREFIX;
-import static 
org.apache.flume.sink.kafka.KafkaSinkConstants.KAFKA_PRODUCER_PREFIX;
-import static org.apache.flume.sink.kafka.KafkaSinkConstants.TOPIC_CONFIG;
+import static org.apache.flume.sink.kafka.KafkaSinkConstants.*;
 import static 
org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG;
 import static 
org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG;
 import static 
org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG;
 import static org.junit.Assert.assertEquals;
@@ -87,21 +84,24 @@ import static org.junit.Assert.fail;
 public class TestKafkaSink {
 
   private static final TestUtil testUtil = TestUtil.getInstance();
+  private static List<String> topicsList;
   private final Set<String> usedTopics = new HashSet<String>();
 
   @BeforeClass
   public static void setup() {
     testUtil.prepare();
-    List<String> topics = new ArrayList<String>(3);
-    topics.add(DEFAULT_TOPIC);
-    topics.add(TestConstants.STATIC_TOPIC);
-    topics.add(TestConstants.CUSTOM_TOPIC);
-    topics.add(TestConstants.HEADER_1_VALUE + "-topic");
-    testUtil.initTopicList(topics);
+    topicsList = new ArrayList<String>(3);
+    topicsList.add(DEFAULT_TOPIC);
+    topicsList.add(TestConstants.STATIC_TOPIC);
+    topicsList.add(TestConstants.CUSTOM_TOPIC);
+    topicsList.add(TestConstants.TRANSACTIONS_TOPIC);
+    topicsList.add(TestConstants.HEADER_1_VALUE + "-topic");
+    testUtil.initTopicList(topicsList);
   }
 
   @AfterClass
   public static void tearDown() {
+    testUtil.deleteTopics(topicsList);
     testUtil.tearDown();
   }
 
@@ -276,7 +276,7 @@ public class TestKafkaSink {
         fail("Error Occurred");
       }
     } catch (EventDeliveryException ex) {
-      // ignore
+      fail(ex.getMessage());
     }
     Headers expected = new RecordHeaders();
     expected.add(new RecordHeader(TestConstants.KAFKA_HEADER_1,
@@ -400,7 +400,7 @@ public class TestKafkaSink {
 
   @SuppressWarnings("rawtypes")
   @Test
-  public void testAvroEvent() throws IOException {
+  public void testAvroEvent() throws IOException, InterruptedException {
     Sink kafkaSink = new KafkaSink();
     Context context = prepareDefaultContext();
     context.put(AVRO_EVENT, "true");
@@ -491,7 +491,7 @@ public class TestKafkaSink {
     }
     ConsumerRecords recs = pollConsumerRecords(DEFAULT_TOPIC, 2);
     assertNotNull(recs);
-    assertEquals(recs.count(), 0);
+    assertEquals(0, recs.count());
   }
 
   @Test
@@ -791,4 +791,49 @@ public class TestKafkaSink {
     checkMessageArrived(msg, DEFAULT_TOPIC);
   }
 
+  @Test
+  public void testKafkaTransactions() {
+    Sink kafkaSink = new KafkaSink();
+    Context context = prepareDefaultContext();
+
+    context.put(TOPIC_CONFIG, TestConstants.TRANSACTIONS_TOPIC);
+
+    context.put(TRANSACTIONAL_ID, "3");
+    context.put("kafka.producer." + ENABLE_IDEMPOTENCE_CONFIG, "true");
+    context.put("kafka.producer.acks", "all");
+    Configurables.configure(kafkaSink, context);
+    Channel memoryChannel = new MemoryChannel();
+    Configurables.configure(memoryChannel, context);
+    kafkaSink.setChannel(memoryChannel);
+    kafkaSink.start();
+
+    for (int i = 0; i < 5; i++) {
+      String msg = "test tx message " + i;
+      Transaction tx = memoryChannel.getTransaction();
+      tx.begin();
+      Event event = EventBuilder.withBody(msg.getBytes());
+      memoryChannel.put(event);
+      tx.commit();
+      tx.close();
+
+      try {
+        Sink.Status status = kafkaSink.process();
+        if (status == Sink.Status.BACKOFF) {
+          fail("Error Occurred");
+        }
+      } catch (EventDeliveryException ex) {
+        // ignore
+      }
+      checkMessageArrived(msg, TestConstants.TRANSACTIONS_TOPIC);
+    }
+
+    Collection<TransactionListing> transactions = testUtil.getTransactionState();
+    Assert.assertEquals(1, transactions.size(), 2);
+    for (TransactionListing transaction : transactions) {
+      Assert.assertEquals(transaction.state(), TransactionState.COMPLETE_COMMIT);
+    }
+
+  }
+
+
 }
\ No newline at end of file
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java
index cfd57fc1c..2073b6314 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java
@@ -20,7 +20,9 @@ package org.apache.flume.sink.kafka.util;
 
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.ListTransactionsResult;
 import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TransactionListing;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -34,6 +36,7 @@ import java.net.InetAddress;
 import java.net.ServerSocket;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
@@ -205,7 +208,10 @@ public class TestUtil {
   }
 
  public ConsumerRecords<String, String> getNextMessageFromConsumer(String topic) {
-    return consumer.poll(Duration.ofMillis(1000L));
+    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000L));
+    consumer.commitSync();
+    return records;
+
   }
 
   public void prepare() {
@@ -265,4 +271,24 @@ public class TestUtil {
   public String getKafkaServerSslUrl() {
     return kafkaServerSslUrl;
   }
+
+  public Collection<TransactionListing> getTransactionState() {
+    ListTransactionsResult result = getAdminClient().listTransactions();
+    Throwable throwable = null;
+    for (int i = 0; i < 10; ++i) {
+      try {
+        return result.all().get(1, TimeUnit.SECONDS);
+      } catch (Exception e) {
+        throwable = e;
+      }
+    }
+    throw new RuntimeException("Error getting transactions info", throwable);
+
+  }
+
+  public void deleteTopics(List<String> topicsList) {
+    for (String topic: topicsList) {
+      deleteTopic(topic);
+    }
+  }
 }
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/kafka-server.properties b/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/kafka-server.properties
index 12c72fd3c..7cc06753c 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/kafka-server.properties
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/kafka-server.properties
@@ -109,6 +109,13 @@ log.retention.check.interval.ms=60000
 # marked for log compaction.
 log.cleaner.enable=false
 
+############################# Transactions #############################
+
+# Settings only to be used for non-production
+transaction.state.log.replication.factor = 1
+transaction.state.log.min.isr = 1
+transaction.state.log.num.partitions = 1
+
 ############################# Zookeeper #############################
 
 # Zookeeper connection string (see zookeeper docs for details).


Reply via email to