This is an automated email from the ASF dual-hosted git repository. rgoers pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/flume.git
commit ac5ed049f96213e9b9b7f1c0a111781c19a411ff Author: Ralph Goers <[email protected]> AuthorDate: Fri Oct 7 19:14:26 2022 -0700 FLUME-3435 - Allow Flume Sink and Source to include Kafka headers and timestamp --- .../org/apache/flume/sink/kafka/KafkaSink.java | 64 +++++++++++++++------ .../flume/sink/kafka/KafkaSinkConstants.java | 2 + .../org/apache/flume/sink/kafka/TestConstants.java | 2 + .../org/apache/flume/sink/kafka/TestKafkaSink.java | 65 +++++++++++++++++++++- .../org/apache/flume/source/kafka/KafkaSource.java | 16 +++++- .../flume/source/kafka/KafkaSourceConstants.java | 1 + .../source/kafka/KafkaSourceEmbeddedKafka.java | 11 +++- .../apache/flume/source/kafka/TestKafkaSource.java | 49 ++++++++++++++++ 8 files changed, 188 insertions(+), 22 deletions(-) diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java index e4c9ff7e2..add6916df 100644 --- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java @@ -22,7 +22,6 @@ import com.google.common.base.Optional; import com.google.common.base.Throwables; import org.apache.avro.io.BinaryEncoder; import org.apache.avro.io.EncoderFactory; -import org.apache.avro.specific.SpecificDatumReader; import org.apache.avro.specific.SpecificDatumWriter; import org.apache.flume.Channel; import org.apache.flume.Context; @@ -44,12 +43,16 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import 
java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -67,10 +70,12 @@ import static org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_ACKS; import static org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_KEY_SERIALIZER; import static org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_TOPIC; import static org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_VALUE_SERIAIZER; +import static org.apache.flume.sink.kafka.KafkaSinkConstants.KAFKA_HEADER; import static org.apache.flume.sink.kafka.KafkaSinkConstants.KAFKA_PRODUCER_PREFIX; import static org.apache.flume.sink.kafka.KafkaSinkConstants.KEY_HEADER; import static org.apache.flume.sink.kafka.KafkaSinkConstants.OLD_BATCH_SIZE; import static org.apache.flume.sink.kafka.KafkaSinkConstants.REQUIRED_ACKS_FLUME_KEY; +import static org.apache.flume.sink.kafka.KafkaSinkConstants.TIMESTAMP_HEADER; import static org.apache.flume.sink.kafka.KafkaSinkConstants.TOPIC_CONFIG; import static org.apache.flume.sink.kafka.KafkaSinkConstants.KEY_SERIALIZER_KEY; import static org.apache.flume.sink.kafka.KafkaSinkConstants.MESSAGE_SERIALIZER_KEY; @@ -122,13 +127,11 @@ public class KafkaSink extends AbstractSink implements Configurable, BatchSizeSu private Integer staticPartitionId = null; private boolean allowTopicOverride; private String topicHeader = null; + private String timestampHeader = null; + private Map<String, String> headerMap; - private Optional<SpecificDatumWriter<AvroFlumeEvent>> writer = - Optional.absent(); - private Optional<SpecificDatumReader<AvroFlumeEvent>> reader = - Optional.absent(); - private Optional<ByteArrayOutputStream> tempOutStream = Optional - .absent(); + private Optional<SpecificDatumWriter<AvroFlumeEvent>> writer = Optional.absent(); + private Optional<ByteArrayOutputStream> tempOutStream = 
Optional.absent(); //Fine to use null for initial value, Avro will create new ones if this // is null @@ -196,7 +199,7 @@ public class KafkaSink extends AbstractSink implements Configurable, BatchSizeSu if (logger.isTraceEnabled()) { if (LogPrivacyUtil.allowLogRawData()) { logger.trace("{Event} " + eventTopic + " : " + eventKey + " : " - + new String(eventBody, "UTF-8")); + + new String(eventBody, StandardCharsets.UTF_8)); } else { logger.trace("{Event} " + eventTopic + " : " + eventKey); } @@ -219,12 +222,38 @@ public class KafkaSink extends AbstractSink implements Configurable, BatchSizeSu partitionId = Integer.parseInt(headerVal); } } + Long timestamp = null; + if (timestampHeader != null) { + String value = headers.get(timestampHeader); + if (value != null) { + try { + timestamp = Long.parseLong(value); + } catch (Exception ex) { + logger.warn("Invalid timestamp in header {} - {}", timestampHeader, value); + } + } + } + List<Header> kafkaHeaders = null; + if (!headerMap.isEmpty()) { + List<Header> tempHeaders = new ArrayList<>(); + for (Map.Entry<String, String> entry : headerMap.entrySet()) { + String value = headers.get(entry.getKey()); + if (value != null) { + tempHeaders.add(new RecordHeader(entry.getValue(), + value.getBytes(StandardCharsets.UTF_8))); + } + } + if (!tempHeaders.isEmpty()) { + kafkaHeaders = tempHeaders; + } + } + if (partitionId != null) { - record = new ProducerRecord<String, byte[]>(eventTopic, partitionId, eventKey, - serializeEvent(event, useAvroEventFormat)); + record = new ProducerRecord<>(eventTopic, partitionId, timestamp, eventKey, + serializeEvent(event, useAvroEventFormat), kafkaHeaders); } else { - record = new ProducerRecord<String, byte[]>(eventTopic, eventKey, - serializeEvent(event, useAvroEventFormat)); + record = new ProducerRecord<>(eventTopic, null, timestamp, eventKey, + serializeEvent(event, useAvroEventFormat), kafkaHeaders); } kafkaFutures.add(producer.send(record, new SinkCallback(startTime))); } catch 
(NumberFormatException ex) { @@ -247,7 +276,7 @@ public class KafkaSink extends AbstractSink implements Configurable, BatchSizeSu } long endTime = System.nanoTime(); counter.addToKafkaEventSendTimer((endTime - batchStartTime) / (1000 * 1000)); - counter.addToEventDrainSuccessCount(Long.valueOf(kafkaFutures.size())); + counter.addToEventDrainSuccessCount(kafkaFutures.size()); } transaction.commit(); @@ -256,7 +285,6 @@ public class KafkaSink extends AbstractSink implements Configurable, BatchSizeSu String errorMsg = "Failed to publish events"; logger.error("Failed to publish events", ex); counter.incrementEventWriteOrChannelFail(ex); - result = Status.BACKOFF; if (transaction != null) { try { kafkaFutures.clear(); @@ -280,7 +308,7 @@ public class KafkaSink extends AbstractSink implements Configurable, BatchSizeSu @Override public synchronized void start() { // instantiate the producer - producer = new KafkaProducer<String,byte[]>(kafkaProps); + producer = new KafkaProducer<>(kafkaProps); counter.start(); super.start(); } @@ -305,7 +333,7 @@ public class KafkaSink extends AbstractSink implements Configurable, BatchSizeSu * 3. We add the sink's documented parameters which can override other * properties * - * @param context + * @param context The Context. 
*/ @Override public void configure(Context context) { @@ -322,6 +350,10 @@ public class KafkaSink extends AbstractSink implements Configurable, BatchSizeSu topic = topicStr; + timestampHeader = context.getString(TIMESTAMP_HEADER); + + headerMap = context.getSubProperties(KAFKA_HEADER); + batchSize = context.getInteger(BATCH_SIZE, DEFAULT_BATCH_SIZE); if (logger.isDebugEnabled()) { diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java index ffca3df72..3f1f279e0 100644 --- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java @@ -35,8 +35,10 @@ public class KafkaSinkConstants { public static final String KEY_HEADER = "key"; public static final String DEFAULT_TOPIC_OVERRIDE_HEADER = "topic"; public static final String TOPIC_OVERRIDE_HEADER = "topicHeader"; + public static final String TIMESTAMP_HEADER = "timestampHeader"; public static final String ALLOW_TOPIC_OVERRIDE_HEADER = "allowTopicOverride"; public static final boolean DEFAULT_ALLOW_TOPIC_OVERRIDE_HEADER = true; + public static final String KAFKA_HEADER = "header."; public static final String AVRO_EVENT = "useFlumeEventFormat"; public static final boolean DEFAULT_AVRO_EVENT = false; diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java index 8d6dce74c..672adc900 100644 --- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java @@ -25,4 +25,6 @@ public class TestConstants { public static final String 
CUSTOM_TOPIC = "custom-topic"; public static final String HEADER_1_VALUE = "test-avro-header"; public static final String HEADER_1_KEY = "header1"; + public static final String KAFKA_HEADER_1 = "FLUME_CORRELATOR"; + public static final String KAFKA_HEADER_2 = "FLUME_METHOD"; } diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java index 5a69b16fb..fd5130ca9 100644 --- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java @@ -43,6 +43,9 @@ import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.header.internals.RecordHeaders; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -51,6 +54,7 @@ import org.mockito.internal.util.reflection.Whitebox; import java.io.ByteArrayInputStream; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -185,13 +189,19 @@ public class TestKafkaSink { } private void checkMessageArrived(String msg, String topic) { - ConsumerRecords recs = pollConsumerRecords(topic); + checkMessageArrived(msg, topic, null, null); + } + + private void checkMessageArrived(String msg, String topic, Long timestamp, Headers headers) { + ConsumerRecords<String, String> recs = pollConsumerRecords(topic); assertNotNull(recs); assertTrue(recs.count() > 0); - Iterator<ConsumerRecord> iter = recs.records(topic).iterator(); + 
Iterator<ConsumerRecord<String, String>> iter = recs.records(topic).iterator(); boolean match = false; while (iter.hasNext()) { - if (msg.equals(iter.next().value())) { + ConsumerRecord<String, String> record = iter.next(); + if (msg.equals(record.value()) && (timestamp == null || timestamp.equals(record.timestamp())) + && (headers == null || validateHeaders(headers, record.headers()))) { match = true; break; } @@ -199,6 +209,10 @@ public class TestKafkaSink { assertTrue("No message matches " + msg, match); } + private boolean validateHeaders(Headers expected, Headers actual) { + return expected.equals(actual); + } + @Test public void testStaticTopic() { Context context = prepareDefaultContext(); @@ -251,6 +265,51 @@ public class TestKafkaSink { checkMessageArrived(msg, TestConstants.CUSTOM_TOPIC); } + @Test + public void testTimestampAndHeaders() { + Sink kafkaSink = new KafkaSink(); + Context context = prepareDefaultContext(); + context.put(KafkaSinkConstants.TIMESTAMP_HEADER, "timestamp"); + context.put("header.correlator", TestConstants.KAFKA_HEADER_1); + context.put("header.method", TestConstants.KAFKA_HEADER_2); + + Configurables.configure(kafkaSink, context); + Channel memoryChannel = new MemoryChannel(); + Configurables.configure(memoryChannel, context); + kafkaSink.setChannel(memoryChannel); + kafkaSink.start(); + + String msg = "test-topic-and-key-from-header"; + Map<String, String> headers = new HashMap<String, String>(); + long now = System.currentTimeMillis(); + headers.put("timestamp", Long.toString(now)); + headers.put("topic", TestConstants.CUSTOM_TOPIC); + headers.put("key", TestConstants.CUSTOM_KEY); + headers.put("correlator", "12345"); + headers.put("method", "testTimestampAndHeaders"); + Transaction tx = memoryChannel.getTransaction(); + tx.begin(); + Event event = EventBuilder.withBody(msg.getBytes(), headers); + memoryChannel.put(event); + tx.commit(); + tx.close(); + + try { + Sink.Status status = kafkaSink.process(); + if (status == 
Sink.Status.BACKOFF) { + fail("Error Occurred"); + } + } catch (EventDeliveryException ex) { + // ignore + } + Headers expected = new RecordHeaders(); + expected.add(new RecordHeader(TestConstants.KAFKA_HEADER_1, + "12345".getBytes(StandardCharsets.UTF_8))); + expected.add(new RecordHeader(TestConstants.KAFKA_HEADER_2, + "testTimestampAndHeaders".getBytes(StandardCharsets.UTF_8))); + checkMessageArrived(msg, TestConstants.CUSTOM_TOPIC, now, expected); + } + /** * Tests that a message will be produced to a topic as specified by a * custom topicHeader parameter (FLUME-3046). diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java index 7a64ed742..2bd19757d 100644 --- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java +++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java @@ -48,6 +48,8 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.security.JaasUtils; import org.apache.kafka.common.security.auth.SecurityProtocol; @@ -89,6 +91,7 @@ import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_SET_TOP import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_TOPIC_HEADER; import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_VALUE_DESERIALIZER; import static org.apache.flume.source.kafka.KafkaSourceConstants.KAFKA_CONSUMER_PREFIX; +import static org.apache.flume.source.kafka.KafkaSourceConstants.KAFKA_HEADER; import static 
org.apache.flume.source.kafka.KafkaSourceConstants.KEY_HEADER; import static org.apache.flume.source.kafka.KafkaSourceConstants.MIGRATE_ZOOKEEPER_OFFSETS; import static org.apache.flume.source.kafka.KafkaSourceConstants.OFFSET_HEADER; @@ -164,6 +167,7 @@ public class KafkaSource extends AbstractPollableSource private boolean migrateZookeeperOffsets = DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS; private String topicHeader = null; private boolean setTopicHeader; + private Map<String, String> headerMap; @Override public long getBatchSize() { @@ -285,7 +289,15 @@ public class KafkaSource extends AbstractPollableSource // Add headers to event (timestamp, topic, partition, key) only if they don't exist if (!headers.containsKey(TIMESTAMP_HEADER)) { - headers.put(TIMESTAMP_HEADER, String.valueOf(System.currentTimeMillis())); + headers.put(TIMESTAMP_HEADER, String.valueOf(message.timestamp())); + } + if (!headerMap.isEmpty()) { + Headers kafkaHeaders = message.headers(); + for (Map.Entry<String, String> entry : headerMap.entrySet()) { + for (Header kafkaHeader : kafkaHeaders.headers(entry.getValue())) { + headers.put(entry.getKey(), new String(kafkaHeader.value())); + } + } } // Only set the topic header if setTopicHeader and it isn't already populated if (setTopicHeader && !headers.containsKey(topicHeader)) { @@ -440,6 +452,8 @@ public class KafkaSource extends AbstractPollableSource topicHeader = context.getString(TOPIC_HEADER, DEFAULT_TOPIC_HEADER); + headerMap = context.getSubProperties(KAFKA_HEADER); + setConsumerProps(context); if (log.isDebugEnabled() && LogPrivacyUtil.allowLogPrintConfig()) { diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java index 8ac437add..5c7857d1b 100644 --- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java +++ 
b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java @@ -36,6 +36,7 @@ public class KafkaSourceConstants { public static final int DEFAULT_BATCH_SIZE = 1000; public static final int DEFAULT_BATCH_DURATION = 1000; public static final String DEFAULT_GROUP_ID = "flume"; + public static final String KAFKA_HEADER = "header."; public static final String MIGRATE_ZOOKEEPER_OFFSETS = "migrateZookeeperOffsets"; public static final boolean DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS = true; diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java index 397af64cf..b2deea9c9 100644 --- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java +++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java @@ -25,6 +25,7 @@ import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.StringSerializer; @@ -142,7 +143,7 @@ public class KafkaSourceEmbeddedKafka { } public void produce(String topic, String k, byte[] v) { - ProducerRecord<String, byte[]> rec = new ProducerRecord<String, byte[]>(topic, k, v); + ProducerRecord<String, byte[]> rec = new ProducerRecord<>(topic, k, v); try { producer.send(rec).get(); } catch (InterruptedException e) { @@ -157,7 +158,13 @@ public class KafkaSourceEmbeddedKafka { } public void produce(String topic, int partition, String k, byte[] v) { - ProducerRecord<String, byte[]> rec = new ProducerRecord<String, 
byte[]>(topic, partition, k, v); + this.produce(topic, partition, null, k, v, null); + } + + public void produce(String topic, int partition, Long timestamp, String k, byte[] v, + Headers headers) { + ProducerRecord<String, byte[]> rec = new ProducerRecord<>(topic, partition, timestamp, k, v, + headers); try { producer.send(rec).get(); } catch (InterruptedException e) { diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java index ee913c9e6..d5caf7ceb 100644 --- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java +++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java @@ -44,6 +44,9 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TopicExistsException; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.security.JaasUtils; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.utils.Time; @@ -63,7 +66,10 @@ import org.slf4j.LoggerFactory; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.time.Duration; +import java.time.ZoneId; +import java.time.ZonedDateTime; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -834,6 +840,49 @@ public class TestKafkaSource { events.clear(); } + + /** + * Tests that the Kafka record timestamp and the Kafka headers mapped by "header.*" properties + * are copied into the Flume event headers, alongside the custom topic header (FLUME-3046) + * @throws
InterruptedException + * @throws EventDeliveryException + */ + @Test + public void testTopicKafkaHeaderSet() throws InterruptedException, EventDeliveryException { + final String correlatorHeader = "FLUME_CORRELATOR"; + context.put(TOPICS, topic0); + context.put(KafkaSourceConstants.TOPIC_HEADER, "customTopicHeader"); + context.put(KafkaSourceConstants.KAFKA_HEADER + "correlator", correlatorHeader); + context.put(TIMESTAMP_HEADER, "true"); + kafkaSource.configure(context); + + startKafkaSource(); + + Thread.sleep(500L); + + long date = ZonedDateTime.of(2022, 10, 7, 8, 0, 0, 0, + ZoneId.systemDefault()).toInstant().toEpochMilli(); + Headers headers = new RecordHeaders(); + headers.add(new RecordHeader(correlatorHeader, "12345".getBytes(StandardCharsets.UTF_8))); + kafkaServer.produce(topic0, 0, date, "", "hello, world2".getBytes(StandardCharsets.UTF_8), + headers); + + Thread.sleep(500L); + + Status status = kafkaSource.process(); + assertEquals(Status.READY, status); + Assert.assertEquals("hello, world2", new String(events.get(0).getBody(), + Charsets.UTF_8)); + Map<String, String> flumeHeaders = events.get(0).getHeaders(); + Assert.assertEquals(Long.toString(date), flumeHeaders.get("timestamp")); + Assert.assertEquals(topic0, flumeHeaders.get("customTopicHeader")); + Assert.assertEquals("12345", flumeHeaders.get("correlator")); + + kafkaSource.stop(); + events.clear(); + } + + /** * Tests the unavailability of the topic header in the output events, * based on the configuration parameters added in FLUME-3046
