This is an automated email from the ASF dual-hosted git repository.
tristan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/flume.git
The following commit(s) were added to refs/heads/trunk by this push:
new 185b95137 Add Kafka Transactions to KafkaChannel and KafkaSink,
including unit tests.
185b95137 is described below
commit 185b9513706cd5b6046f00c9821a83580e2c35da
Author: tmgstevens <[email protected]>
AuthorDate: Fri Oct 28 14:46:05 2022 +0100
Add Kafka Transactions to KafkaChannel and KafkaSink, including unit tests.
---
.../apache/flume/channel/kafka/KafkaChannel.java | 74 +++++++++++++------
.../channel/kafka/KafkaChannelConfiguration.java | 4 ++
.../flume/channel/kafka/TestKafkaChannelBase.java | 21 ++++--
.../apache/flume/channel/kafka/TestRollback.java | 33 +++++++--
.../src/test/resources/kafka-server.properties | 7 ++
.../org/apache/flume/sink/kafka/KafkaSink.java | 84 ++++++++++++++--------
.../flume/sink/kafka/KafkaSinkConstants.java | 3 +
.../org/apache/flume/sink/kafka/TestConstants.java | 1 +
.../org/apache/flume/sink/kafka/TestKafkaSink.java | 79 +++++++++++++++-----
.../org/apache/flume/sink/kafka/util/TestUtil.java | 28 +++++++-
.../src/test/resources/kafka-server.properties | 7 ++
11 files changed, 258 insertions(+), 83 deletions(-)
diff --git
a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
index 8336e94e2..acfb3b193 100644
---
a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
+++
b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
@@ -58,6 +58,8 @@ import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Arrays;
@@ -73,26 +75,10 @@ import java.util.UUID;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
-import static
org.apache.flume.channel.kafka.KafkaChannelConfiguration.BOOTSTRAP_SERVERS_CONFIG;
-import static
org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_ACKS;
-import static
org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_AUTO_OFFSET_RESET;
-import static
org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_GROUP_ID;
-import static
org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_KEY_DESERIALIZER;
-import static
org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_KEY_SERIALIZER;
-import static
org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_PARSE_AS_FLUME_EVENT;
-import static
org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_POLL_TIMEOUT;
-import static
org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_TOPIC;
-import static
org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_VALUE_DESERIAIZER;
-import static
org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_VALUE_SERIAIZER;
-import static
org.apache.flume.channel.kafka.KafkaChannelConfiguration.KAFKA_CONSUMER_PREFIX;
-import static
org.apache.flume.channel.kafka.KafkaChannelConfiguration.KAFKA_PRODUCER_PREFIX;
-import static
org.apache.flume.channel.kafka.KafkaChannelConfiguration.KEY_HEADER;
-import static
org.apache.flume.channel.kafka.KafkaChannelConfiguration.PARSE_AS_FLUME_EVENT;
-import static
org.apache.flume.channel.kafka.KafkaChannelConfiguration.PARTITION_HEADER_NAME;
-import static
org.apache.flume.channel.kafka.KafkaChannelConfiguration.POLL_TIMEOUT;
-import static
org.apache.flume.channel.kafka.KafkaChannelConfiguration.STATIC_PARTITION_CONF;
-import static
org.apache.flume.channel.kafka.KafkaChannelConfiguration.TOPIC_CONFIG;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.*;
import static
org.apache.flume.shared.kafka.KafkaSSLUtil.SSL_DISABLE_FQDN_CHECK;
import static org.apache.flume.shared.kafka.KafkaSSLUtil.isSSLEnabled;
@@ -105,6 +91,11 @@ public class KafkaChannel extends BasicChannelSemantics {
private final Properties producerProps = new Properties();
private KafkaProducer<String, byte[]> producer;
+
+ // Used to ensure that we don't have multiple threads attempting to use the
KafkaProducer (above) for transactions
+ // at the same time: whilst the KafkaProducer is thread-safe, you cannot
have more than one in-flight transaction.
+ private Lock kafkaTxLock = new ReentrantLock();
+
private final String channelUUID = UUID.randomUUID().toString();
private AtomicReference<String> topic = new AtomicReference<String>();
@@ -113,6 +104,7 @@ public class KafkaChannel extends BasicChannelSemantics {
private String groupId = DEFAULT_GROUP_ID;
private String partitionHeader = null;
private Integer staticPartitionId;
+ private boolean useKafkaTransactions;
// used to indicate if a rebalance has occurred during the current
transaction
AtomicBoolean rebalanceFlag = new AtomicBoolean();
@@ -142,6 +134,10 @@ public class KafkaChannel extends BasicChannelSemantics {
public void start() {
logger.info("Starting Kafka Channel: {}", getName());
producer = new KafkaProducer<String, byte[]>(producerProps);
+ if (useKafkaTransactions) {
+ logger.debug("Initializing Kafka Transaction");
+ producer.initTransactions();
+ }
// We always have just one topic being read by one thread
logger.info("Topic = {}", topic.get());
counter.start();
@@ -188,6 +184,17 @@ public class KafkaChannel extends BasicChannelSemantics {
throw new ConfigurationException("Bootstrap Servers must be specified");
}
+ String transactionalID = ctx.getString(TRANSACTIONAL_ID);
+ if (transactionalID != null) {
+ try {
+ ctx.put(TRANSACTIONAL_ID,
InetAddress.getLocalHost().getCanonicalHostName() +
+ Thread.currentThread().getName() + transactionalID);
+ useKafkaTransactions = true;
+ } catch (UnknownHostException e) {
+ throw new ConfigurationException("Unable to configure transactional
id, as cannot work out hostname", e);
+ }
+ }
+
setProducerProps(ctx, bootStrapServers);
setConsumerProps(ctx, bootStrapServers);
@@ -438,6 +445,11 @@ public class KafkaChannel extends BasicChannelSemantics {
kafkaFutures = Optional.of(new LinkedList<Future<RecordMetadata>>());
}
try {
+ if (useKafkaTransactions) {
+ kafkaTxLock.lock();
+ logger.debug("Beginning Kafka Transaction");
+ producer.beginTransaction();
+ }
long batchSize = producerRecords.get().size();
long startTime = System.nanoTime();
int index = 0;
@@ -445,11 +457,19 @@ public class KafkaChannel extends BasicChannelSemantics {
index++;
kafkaFutures.get().add(producer.send(record, new
ChannelCallback(index, startTime)));
}
- //prevents linger.ms from being a problem
- producer.flush();
- for (Future<RecordMetadata> future : kafkaFutures.get()) {
- future.get();
+ if (useKafkaTransactions) {
+ logger.debug("Committing Kafka Transaction");
+ producer.commitTransaction();
+ kafkaTxLock.unlock();
+ } else {
+ // Ensure that the records are actually flushed by the producer,
regardless of linger.ms.
+ // Per the Kafka docs we do not need to linger or wait for the
callback if we're using transactions
+ producer.flush();
+
+ for (Future<RecordMetadata> future : kafkaFutures.get()) {
+ future.get();
+ }
}
long endTime = System.nanoTime();
counter.addToKafkaEventSendTimer((endTime - startTime) / (1000 *
1000));
@@ -457,6 +477,14 @@ public class KafkaChannel extends BasicChannelSemantics {
producerRecords.get().clear();
kafkaFutures.get().clear();
} catch (Exception ex) {
+ if (useKafkaTransactions) {
+ logger.debug("Aborting transaction");
+ try {
+ producer.abortTransaction();
+ } finally {
+ kafkaTxLock.unlock();
+ }
+ }
logger.warn("Sending events to Kafka failed", ex);
throw new ChannelException("Commit failed as send to Kafka failed",
ex);
diff --git
a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java
b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java
index 87fcbc1a3..cc23d1af2 100644
---
a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java
+++
b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java
@@ -19,6 +19,7 @@
package org.apache.flume.channel.kafka;
import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.producer.ProducerConfig;
public class KafkaChannelConfiguration {
@@ -37,6 +38,9 @@ public class KafkaChannelConfiguration {
public static final String TOPIC_CONFIG = KAFKA_PREFIX + "topic";
public static final String BOOTSTRAP_SERVERS_CONFIG =
KAFKA_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+
+ public static final String TRANSACTIONAL_ID =
+ KAFKA_PRODUCER_PREFIX + ProducerConfig.TRANSACTIONAL_ID_CONFIG;
public static final String DEFAULT_TOPIC = "flume-channel";
public static final String DEFAULT_GROUP_ID = "flume";
public static final String POLL_TIMEOUT = KAFKA_PREFIX + "pollTimeout";
diff --git
a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannelBase.java
b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannelBase.java
index e1279a39c..b370749c9 100644
---
a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannelBase.java
+++
b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannelBase.java
@@ -44,10 +44,8 @@ import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import static
org.apache.flume.channel.kafka.KafkaChannelConfiguration.BOOTSTRAP_SERVERS_CONFIG;
-import static
org.apache.flume.channel.kafka.KafkaChannelConfiguration.KAFKA_CONSUMER_PREFIX;
-import static
org.apache.flume.channel.kafka.KafkaChannelConfiguration.PARSE_AS_FLUME_EVENT;
-import static
org.apache.flume.channel.kafka.KafkaChannelConfiguration.TOPIC_CONFIG;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.*;
+import static
org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG;
public class TestKafkaChannelBase {
@@ -97,19 +95,32 @@ public class TestKafkaChannelBase {
}
KafkaChannel startChannel(boolean parseAsFlume) throws Exception {
- Context context = prepareDefaultContext(parseAsFlume);
+ return startChannel(parseAsFlume, false);
+ }
+ KafkaChannel startChannel(boolean parseAsFlume, boolean useKafkaTxns) throws
Exception {
+ Context context = prepareDefaultContext(parseAsFlume, useKafkaTxns);
KafkaChannel channel = createChannel(context);
channel.start();
return channel;
}
Context prepareDefaultContext(boolean parseAsFlume) {
+ return prepareDefaultContext(parseAsFlume, false);
+ }
+
+ Context prepareDefaultContext(boolean parseAsFlume, boolean useKafkaTxns) {
// Prepares a default context with Kafka Server Properties
Context context = new Context();
context.put(BOOTSTRAP_SERVERS_CONFIG, testUtil.getKafkaServerUrl());
context.put(PARSE_AS_FLUME_EVENT, String.valueOf(parseAsFlume));
context.put(TOPIC_CONFIG, topic);
context.put(KAFKA_CONSUMER_PREFIX + "max.poll.interval.ms", "10000");
+ if (useKafkaTxns) {
+ context.put(TRANSACTIONAL_ID, "3");
+ context.put("kafka.producer." + ENABLE_IDEMPOTENCE_CONFIG, "true");
+ context.put("kafka.producer.acks", "all");
+ context.put("kafka.consumer.isolation.level", "read_committed");
+ }
return context;
}
diff --git
a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestRollback.java
b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestRollback.java
index 6e9e1e25f..219b026f2 100644
---
a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestRollback.java
+++
b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestRollback.java
@@ -29,27 +29,48 @@ public class TestRollback extends TestKafkaChannelBase {
@Test
public void testSuccess() throws Exception {
- doTestSuccessRollback(false, false);
+ doTestSuccessRollback(false, false, false);
}
@Test
public void testSuccessInterleave() throws Exception {
- doTestSuccessRollback(false, true);
+ doTestSuccessRollback(false, true, false);
}
@Test
public void testRollbacks() throws Exception {
- doTestSuccessRollback(true, false);
+ doTestSuccessRollback(true, false, false);
}
@Test
public void testRollbacksInterleave() throws Exception {
- doTestSuccessRollback(true, true);
+ doTestSuccessRollback(true, true, false);
+ }
+
+ @Test
+ public void testSuccessTxns() throws Exception {
+ doTestSuccessRollback(false, false, true);
+ }
+
+ @Test
+ public void testSuccessInterleaveTxns() throws Exception {
+ doTestSuccessRollback(false, true, true);
+ }
+
+ @Test
+ public void testRollbacksTxns() throws Exception {
+ doTestSuccessRollback(true, false, true);
+ }
+
+ @Test
+ public void testRollbacksInterleaveTxns() throws Exception {
+ doTestSuccessRollback(true, true, true);
}
private void doTestSuccessRollback(final boolean rollback,
- final boolean interleave) throws
Exception {
- final KafkaChannel channel = startChannel(true);
+ final boolean interleave,
+ final boolean useKafkaTxns) throws
Exception {
+ final KafkaChannel channel = startChannel(true, useKafkaTxns);
writeAndVerify(rollback, channel, interleave);
channel.stop();
}
diff --git
a/flume-ng-channels/flume-kafka-channel/src/test/resources/kafka-server.properties
b/flume-ng-channels/flume-kafka-channel/src/test/resources/kafka-server.properties
index 67419d458..d8a5b2817 100644
---
a/flume-ng-channels/flume-kafka-channel/src/test/resources/kafka-server.properties
+++
b/flume-ng-channels/flume-kafka-channel/src/test/resources/kafka-server.properties
@@ -109,6 +109,13 @@ log.retention.check.interval.ms=60000
# marked for log compaction.
log.cleaner.enable=false
+############################# Transactions #############################
+
+# Settings only to be used for non-production
+transaction.state.log.replication.factor = 1
+transaction.state.log.min.isr = 1
+transaction.state.log.num.partitions = 1
+
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
diff --git
a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
index 92014f0f4..c090f3708 100644
---
a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
+++
b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
@@ -43,6 +43,7 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.slf4j.Logger;
@@ -50,6 +51,8 @@ import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@@ -62,18 +65,6 @@ import java.util.concurrent.Future;
import static
org.apache.flume.shared.kafka.KafkaSSLUtil.SSL_DISABLE_FQDN_CHECK;
import static org.apache.flume.shared.kafka.KafkaSSLUtil.isSSLEnabled;
-import static
org.apache.flume.sink.kafka.KafkaSinkConstants.BOOTSTRAP_SERVERS_CONFIG;
-import static org.apache.flume.sink.kafka.KafkaSinkConstants.BATCH_SIZE;
-import static
org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_BATCH_SIZE;
-import static org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_ACKS;
-import static
org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_KEY_SERIALIZER;
-import static org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_TOPIC;
-import static
org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_VALUE_SERIAIZER;
-import static org.apache.flume.sink.kafka.KafkaSinkConstants.KAFKA_HEADER;
-import static
org.apache.flume.sink.kafka.KafkaSinkConstants.KAFKA_PRODUCER_PREFIX;
-import static org.apache.flume.sink.kafka.KafkaSinkConstants.KEY_HEADER;
-import static org.apache.flume.sink.kafka.KafkaSinkConstants.TIMESTAMP_HEADER;
-import static org.apache.flume.sink.kafka.KafkaSinkConstants.TOPIC_CONFIG;
/**
* A Flume Sink that can publish messages to Kafka.
@@ -125,6 +116,8 @@ public class KafkaSink extends AbstractSink implements
Configurable, BatchSizeSu
private String timestampHeader = null;
private Map<String, String> headerMap;
+ private boolean useKafkaTransactions = false;
+
private Optional<SpecificDatumWriter<AvroFlumeEvent>> writer =
Optional.absent();
private Optional<ByteArrayOutputStream> tempOutStream = Optional.absent();
@@ -156,6 +149,9 @@ public class KafkaSink extends AbstractSink implements
Configurable, BatchSizeSu
transaction = channel.getTransaction();
transaction.begin();
+ if (useKafkaTransactions) {
+ producer.beginTransaction();
+ }
kafkaFutures.clear();
long batchStartTime = System.nanoTime();
@@ -190,7 +186,7 @@ public class KafkaSink extends AbstractSink implements
Configurable, BatchSizeSu
eventTopic = topic;
}
- eventKey = headers.get(KEY_HEADER);
+ eventKey = headers.get(KafkaSinkConstants.KEY_HEADER);
if (logger.isTraceEnabled()) {
if (LogPrivacyUtil.allowLogRawData()) {
logger.trace("{Event} " + eventTopic + " : " + eventKey + " : "
@@ -261,17 +257,20 @@ public class KafkaSink extends AbstractSink implements
Configurable, BatchSizeSu
}
}
- //Prevent linger.ms from holding the batch
- producer.flush();
-
- // publish batch and commit.
- if (processedEvents > 0) {
+ if (useKafkaTransactions) {
+ producer.commitTransaction();
+ } else {
+ //Prevent linger.ms from holding the batch
+ producer.flush();
for (Future<RecordMetadata> future : kafkaFutures) {
future.get();
}
+ }
+ // publish batch and commit.
+ if (processedEvents > 0) {
long endTime = System.nanoTime();
counter.addToKafkaEventSendTimer((endTime - batchStartTime) / (1000 *
1000));
- counter.addToEventDrainSuccessCount(kafkaFutures.size());
+ counter.addToEventDrainSuccessCount(processedEvents);
}
transaction.commit();
@@ -283,8 +282,16 @@ public class KafkaSink extends AbstractSink implements
Configurable, BatchSizeSu
if (transaction != null) {
try {
kafkaFutures.clear();
- transaction.rollback();
- counter.incrementRollbackCount();
+ try {
+ if (useKafkaTransactions) {
+ producer.abortTransaction();
+ }
+ } catch (ProducerFencedException e) {
+ logger.error("Could not rollback transaction as producer fenced",
e);
+ } finally {
+ transaction.rollback();
+ counter.incrementRollbackCount();
+ }
} catch (Exception e) {
logger.error("Transaction rollback failed", e);
throw Throwables.propagate(e);
@@ -304,6 +311,10 @@ public class KafkaSink extends AbstractSink implements
Configurable, BatchSizeSu
public synchronized void start() {
// instantiate the producer
producer = new KafkaProducer<>(kafkaProps);
+ if (useKafkaTransactions) {
+ logger.info("Transactions enabled, initializing transactions");
+ producer.initTransactions();
+ }
counter.start();
super.start();
}
@@ -333,9 +344,9 @@ public class KafkaSink extends AbstractSink implements
Configurable, BatchSizeSu
@Override
public void configure(Context context) {
- String topicStr = context.getString(TOPIC_CONFIG);
+ String topicStr = context.getString(KafkaSinkConstants.TOPIC_CONFIG);
if (topicStr == null || topicStr.isEmpty()) {
- topicStr = DEFAULT_TOPIC;
+ topicStr = KafkaSinkConstants.DEFAULT_TOPIC;
logger.warn("Topic was not specified. Using {} as the topic.", topicStr);
} else {
logger.info("Using the static topic {}. This may be overridden by event
headers", topicStr);
@@ -343,11 +354,11 @@ public class KafkaSink extends AbstractSink implements
Configurable, BatchSizeSu
topic = topicStr;
- timestampHeader = context.getString(TIMESTAMP_HEADER);
+ timestampHeader = context.getString(KafkaSinkConstants.TIMESTAMP_HEADER);
- headerMap = context.getSubProperties(KAFKA_HEADER);
+ headerMap = context.getSubProperties(KafkaSinkConstants.KAFKA_HEADER);
- batchSize = context.getInteger(BATCH_SIZE, DEFAULT_BATCH_SIZE);
+ batchSize = context.getInteger(KafkaSinkConstants.BATCH_SIZE,
KafkaSinkConstants.DEFAULT_BATCH_SIZE);
if (logger.isDebugEnabled()) {
logger.debug("Using batch size: {}", batchSize);
@@ -365,13 +376,24 @@ public class KafkaSink extends AbstractSink implements
Configurable, BatchSizeSu
topicHeader = context.getString(KafkaSinkConstants.TOPIC_OVERRIDE_HEADER,
KafkaSinkConstants.DEFAULT_TOPIC_OVERRIDE_HEADER);
+ String transactionalID =
context.getString(KafkaSinkConstants.TRANSACTIONAL_ID);
+ if (transactionalID != null) {
+ try {
+ context.put(KafkaSinkConstants.TRANSACTIONAL_ID,
InetAddress.getLocalHost().getCanonicalHostName() +
+ Thread.currentThread().getName() + transactionalID);
+ useKafkaTransactions = true;
+ } catch (UnknownHostException e) {
+ throw new ConfigurationException("Unable to configure transactional
id, as cannot work out hostname", e);
+ }
+ }
+
if (logger.isDebugEnabled()) {
logger.debug(KafkaSinkConstants.AVRO_EVENT + " set to: {}",
useAvroEventFormat);
}
kafkaFutures = new LinkedList<Future<RecordMetadata>>();
- String bootStrapServers = context.getString(BOOTSTRAP_SERVERS_CONFIG);
+ String bootStrapServers =
context.getString(KafkaSinkConstants.BOOTSTRAP_SERVERS_CONFIG);
if (bootStrapServers == null || bootStrapServers.isEmpty()) {
throw new ConfigurationException("Bootstrap Servers must be specified");
}
@@ -389,11 +411,11 @@ public class KafkaSink extends AbstractSink implements
Configurable, BatchSizeSu
private void setProducerProps(Context context, String bootStrapServers) {
kafkaProps.clear();
- kafkaProps.put(ProducerConfig.ACKS_CONFIG, DEFAULT_ACKS);
+ kafkaProps.put(ProducerConfig.ACKS_CONFIG,
KafkaSinkConstants.DEFAULT_ACKS);
//Defaults overridden based on config
- kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
DEFAULT_KEY_SERIALIZER);
- kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
DEFAULT_VALUE_SERIAIZER);
- kafkaProps.putAll(context.getSubProperties(KAFKA_PRODUCER_PREFIX));
+ kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
KafkaSinkConstants.DEFAULT_KEY_SERIALIZER);
+ kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
KafkaSinkConstants.DEFAULT_VALUE_SERIAIZER);
+
kafkaProps.putAll(context.getSubProperties(KafkaSinkConstants.KAFKA_PRODUCER_PREFIX));
kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
// The default value of `ssl.endpoint.identification.algorithm`
// is changed to `https`, since kafka client 2.0+
diff --git
a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
index df9000f18..7e1bd2332 100644
---
a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
+++
b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
@@ -19,6 +19,7 @@
package org.apache.flume.sink.kafka;
import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.producer.ProducerConfig;
public class KafkaSinkConstants {
@@ -32,6 +33,8 @@ public class KafkaSinkConstants {
public static final String BOOTSTRAP_SERVERS_CONFIG =
KAFKA_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+ public static final String TRANSACTIONAL_ID = KAFKA_PREFIX + "producer." +
ProducerConfig.TRANSACTIONAL_ID_CONFIG;
+
public static final String KEY_HEADER = "key";
public static final String DEFAULT_TOPIC_OVERRIDE_HEADER = "topic";
public static final String TOPIC_OVERRIDE_HEADER = "topicHeader";
diff --git
a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java
b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java
index 672adc900..165601dd9 100644
---
a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java
+++
b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java
@@ -23,6 +23,7 @@ public class TestConstants {
public static final String HEADER_TOPIC = "%{header1}-topic";
public static final String CUSTOM_KEY = "custom-key";
public static final String CUSTOM_TOPIC = "custom-topic";
+ public static final String TRANSACTIONS_TOPIC = "transactions-topic";
public static final String HEADER_1_VALUE = "test-avro-header";
public static final String HEADER_1_KEY = "header1";
public static final String KAFKA_HEADER_1 = "FLUME_CORRELATOR";
diff --git
a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
index 3dbcd000f..2c0f1b8a3 100644
---
a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
+++
b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
@@ -39,6 +39,8 @@ import org.apache.flume.shared.kafka.test.PartitionOption;
import org.apache.flume.shared.kafka.test.PartitionTestScenario;
import org.apache.flume.sink.kafka.util.TestUtil;
import org.apache.flume.source.avro.AvroFlumeEvent;
+import org.apache.kafka.clients.admin.TransactionListing;
+import org.apache.kafka.clients.admin.TransactionState;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -55,6 +57,7 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -65,15 +68,9 @@ import java.util.Properties;
import java.util.Set;
import static
org.apache.flume.shared.kafka.KafkaSSLUtil.SSL_DISABLE_FQDN_CHECK;
-import static org.apache.flume.sink.kafka.KafkaSinkConstants.AVRO_EVENT;
-import static org.apache.flume.sink.kafka.KafkaSinkConstants.BATCH_SIZE;
-import static
org.apache.flume.sink.kafka.KafkaSinkConstants.BOOTSTRAP_SERVERS_CONFIG;
-import static
org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_KEY_SERIALIZER;
-import static org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_TOPIC;
-import static org.apache.flume.sink.kafka.KafkaSinkConstants.KAFKA_PREFIX;
-import static
org.apache.flume.sink.kafka.KafkaSinkConstants.KAFKA_PRODUCER_PREFIX;
-import static org.apache.flume.sink.kafka.KafkaSinkConstants.TOPIC_CONFIG;
+import static org.apache.flume.sink.kafka.KafkaSinkConstants.*;
import static
org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
+import static
org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG;
import static
org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG;
import static
org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG;
import static org.junit.Assert.assertEquals;
@@ -87,21 +84,24 @@ import static org.junit.Assert.fail;
public class TestKafkaSink {
private static final TestUtil testUtil = TestUtil.getInstance();
+ private static List<String> topicsList;
private final Set<String> usedTopics = new HashSet<String>();
@BeforeClass
public static void setup() {
testUtil.prepare();
- List<String> topics = new ArrayList<String>(3);
- topics.add(DEFAULT_TOPIC);
- topics.add(TestConstants.STATIC_TOPIC);
- topics.add(TestConstants.CUSTOM_TOPIC);
- topics.add(TestConstants.HEADER_1_VALUE + "-topic");
- testUtil.initTopicList(topics);
+ topicsList = new ArrayList<String>(3);
+ topicsList.add(DEFAULT_TOPIC);
+ topicsList.add(TestConstants.STATIC_TOPIC);
+ topicsList.add(TestConstants.CUSTOM_TOPIC);
+ topicsList.add(TestConstants.TRANSACTIONS_TOPIC);
+ topicsList.add(TestConstants.HEADER_1_VALUE + "-topic");
+ testUtil.initTopicList(topicsList);
}
@AfterClass
public static void tearDown() {
+ testUtil.deleteTopics(topicsList);
testUtil.tearDown();
}
@@ -276,7 +276,7 @@ public class TestKafkaSink {
fail("Error Occurred");
}
} catch (EventDeliveryException ex) {
- // ignore
+ fail(ex.getMessage());
}
Headers expected = new RecordHeaders();
expected.add(new RecordHeader(TestConstants.KAFKA_HEADER_1,
@@ -400,7 +400,7 @@ public class TestKafkaSink {
@SuppressWarnings("rawtypes")
@Test
- public void testAvroEvent() throws IOException {
+ public void testAvroEvent() throws IOException, InterruptedException {
Sink kafkaSink = new KafkaSink();
Context context = prepareDefaultContext();
context.put(AVRO_EVENT, "true");
@@ -491,7 +491,7 @@ public class TestKafkaSink {
}
ConsumerRecords recs = pollConsumerRecords(DEFAULT_TOPIC, 2);
assertNotNull(recs);
- assertEquals(recs.count(), 0);
+ assertEquals(0, recs.count());
}
@Test
@@ -791,4 +791,49 @@ public class TestKafkaSink {
checkMessageArrived(msg, DEFAULT_TOPIC);
}
+ @Test
+ public void testKafkaTransactions() {
+ Sink kafkaSink = new KafkaSink();
+ Context context = prepareDefaultContext();
+
+ context.put(TOPIC_CONFIG, TestConstants.TRANSACTIONS_TOPIC);
+
+ context.put(TRANSACTIONAL_ID, "3");
+ context.put("kafka.producer." + ENABLE_IDEMPOTENCE_CONFIG, "true");
+ context.put("kafka.producer.acks", "all");
+ Configurables.configure(kafkaSink, context);
+ Channel memoryChannel = new MemoryChannel();
+ Configurables.configure(memoryChannel, context);
+ kafkaSink.setChannel(memoryChannel);
+ kafkaSink.start();
+
+ for (int i = 0; i < 5; i++) {
+ String msg = "test tx message " + i;
+ Transaction tx = memoryChannel.getTransaction();
+ tx.begin();
+ Event event = EventBuilder.withBody(msg.getBytes());
+ memoryChannel.put(event);
+ tx.commit();
+ tx.close();
+
+ try {
+ Sink.Status status = kafkaSink.process();
+ if (status == Sink.Status.BACKOFF) {
+ fail("Error Occurred");
+ }
+ } catch (EventDeliveryException ex) {
+ // ignore
+ }
+ checkMessageArrived(msg, TestConstants.TRANSACTIONS_TOPIC);
+ }
+
+ Collection<TransactionListing> transactions =
testUtil.getTransactionState();
+ Assert.assertEquals(1, transactions.size(), 2);
+ for (TransactionListing transaction : transactions) {
+ Assert.assertEquals(transaction.state(),
TransactionState.COMPLETE_COMMIT);
+ }
+
+ }
+
+
}
\ No newline at end of file
diff --git
a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java
b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java
index cfd57fc1c..2073b6314 100644
---
a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java
+++
b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java
@@ -20,7 +20,9 @@ package org.apache.flume.sink.kafka.util;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.ListTransactionsResult;
import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TransactionListing;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -34,6 +36,7 @@ import java.net.InetAddress;
import java.net.ServerSocket;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
@@ -205,7 +208,10 @@ public class TestUtil {
}
public ConsumerRecords<String, String> getNextMessageFromConsumer(String
topic) {
- return consumer.poll(Duration.ofMillis(1000L));
+ ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(1000L));
+ consumer.commitSync();
+ return records;
+
}
public void prepare() {
@@ -265,4 +271,24 @@ public class TestUtil {
public String getKafkaServerSslUrl() {
return kafkaServerSslUrl;
}
+
+ public Collection<TransactionListing> getTransactionState() {
+ ListTransactionsResult result = getAdminClient().listTransactions();
+ Throwable throwable = null;
+ for (int i = 0; i < 10; ++i) {
+ try {
+ return result.all().get(1, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ throwable = e;
+ }
+ }
+ throw new RuntimeException("Error getting transactions info", throwable);
+
+ }
+
+ public void deleteTopics(List<String> topicsList) {
+ for (String topic: topicsList) {
+ deleteTopic(topic);
+ }
+ }
}
diff --git
a/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/kafka-server.properties
b/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/kafka-server.properties
index 12c72fd3c..7cc06753c 100644
---
a/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/kafka-server.properties
+++
b/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/kafka-server.properties
@@ -109,6 +109,13 @@ log.retention.check.interval.ms=60000
# marked for log compaction.
log.cleaner.enable=false
+############################# Transactions #############################
+
+# Settings only to be used for non-production
+transaction.state.log.replication.factor = 1
+transaction.state.log.min.isr = 1
+transaction.state.log.num.partitions = 1
+
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).