Repository: flume Updated Branches: refs/heads/flume-1.6 7888f94ed -> 832594a29
FLUME-2500: Add a channel that uses Kafka (Hari Shreedharan via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/832594a2 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/832594a2 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/832594a2 Branch: refs/heads/flume-1.6 Commit: 832594a293dd9f4fc3d45164215aa776bea2c1f9 Parents: 7888f94 Author: Jarek Jarcec Cecho <[email protected]> Authored: Tue Oct 28 14:59:23 2014 -0700 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Tue Oct 28 14:59:55 2014 -0700 ---------------------------------------------------------------------- flume-ng-channels/flume-kafka-channel/pom.xml | 54 +++ .../flume/channel/kafka/KafkaChannel.java | 411 ++++++++++++++++++ .../kafka/KafkaChannelConfiguration.java | 44 ++ .../flume/channel/kafka/TestKafkaChannel.java | 418 +++++++++++++++++++ .../src/test/resources/kafka-server.properties | 118 ++++++ .../src/test/resources/log4j.properties | 78 ++++ .../src/test/resources/zookeeper.properties | 20 + flume-ng-channels/pom.xml | 1 + flume-ng-sinks/flume-ng-kafka-sink/pom.xml | 11 + .../flume/sink/kafka/util/KafkaConsumer.java | 2 +- .../apache/flume/sink/kafka/util/TestUtil.java | 1 + 11 files changed, 1157 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/832594a2/flume-ng-channels/flume-kafka-channel/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-kafka-channel/pom.xml b/flume-ng-channels/flume-kafka-channel/pom.xml new file mode 100644 index 0000000..2da98b9 --- /dev/null +++ b/flume-ng-channels/flume-kafka-channel/pom.xml @@ -0,0 +1,54 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. 
See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>flume-ng-channels</artifactId> + <groupId>org.apache.flume</groupId> + <version>1.6.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <groupId>org.apache.flume.flume-ng-channels</groupId> + <artifactId>flume-kafka-channel</artifactId> + + <dependencies> + <dependency> + <groupId>org.apache.flume</groupId> + <artifactId>flume-ng-core</artifactId> + </dependency> + <dependency> + <groupId>org.apache.flume</groupId> + <artifactId>flume-ng-sdk</artifactId> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.10</artifactId> + </dependency> + <dependency> + <groupId>org.apache.flume.flume-ng-sinks</groupId> + <artifactId>flume-ng-kafka-sink</artifactId> + <version>${project.version}</version> + <classifier>tests</classifier> + <scope>test</scope> + </dependency> + + </dependencies> + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flume/blob/832594a2/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java 
---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java new file mode 100644 index 0000000..d767aac --- /dev/null +++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java @@ -0,0 +1,411 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.flume.channel.kafka; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import kafka.consumer.*; +import kafka.javaapi.consumer.ConsumerConnector; +import kafka.javaapi.producer.Producer; +import kafka.producer.KeyedMessage; +import kafka.producer.ProducerConfig; +import org.apache.avro.io.*; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificDatumWriter; +import org.apache.flume.*; +import org.apache.flume.channel.BasicChannelSemantics; +import org.apache.flume.channel.BasicTransactionSemantics; +import org.apache.flume.conf.ConfigurationException; + +import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.*; + +import org.apache.flume.event.EventBuilder; +import org.apache.flume.source.avro.AvroFlumeEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.nio.ByteBuffer; +import java.util.*; +import java.util.concurrent.atomic.AtomicReference; + +public class KafkaChannel extends BasicChannelSemantics { + + private final static Logger LOGGER = + LoggerFactory.getLogger(KafkaChannel.class); + + + private final Properties kafkaConf = new Properties(); + private Producer<String, byte[]> producer; + private final String channelUUID = UUID.randomUUID().toString(); + + private AtomicReference<String> topic = new AtomicReference<String>(); + private boolean parseAsFlumeEvent = DEFAULT_PARSE_AS_FLUME_EVENT; + private final Map<String, Integer> topicCountMap = + Collections.synchronizedMap(new HashMap<String, Integer>()); + + // Track all consumers to close them eventually. + private final List<ConsumerAndIterator> consumers = + Collections.synchronizedList(new LinkedList<ConsumerAndIterator>()); + + /* Each ConsumerConnector commit will commit all partitions owned by it. 
To + * ensure that each partition is only committed when all events are + * actually done, we will need to keep a ConsumerConnector per thread. + * See Neha's answer here: + * http://grokbase.com/t/kafka/users/13b4gmk2jk/commit-offset-per-topic + * Since only one consumer connector will own a partition at any point in time, + * when we commit the partition we would have committed all events to the + * final destination from that partition. + * + * If a new partition gets assigned to this connector, + * my understanding is that all messages from the last partition commit will + * get replayed which may cause duplicates -- which is fine as this + * happens only on partition rebalancing which is on failure or new nodes + * coming up, which is rare. + */ + private final ThreadLocal<ConsumerAndIterator> consumerAndIter = new + ThreadLocal<ConsumerAndIterator>() { + @Override + public ConsumerAndIterator initialValue() { + return createConsumerAndIter(); + } + }; + + @Override + public void start() { + try { + LOGGER.info("Starting Kafka Channel: " + getName()); + producer = new Producer<String, byte[]>(new ProducerConfig(kafkaConf)); + // We always have just one topic being read by one thread + LOGGER.info("Topic = " + topic.get()); + topicCountMap.put(topic.get(), 1); + super.start(); + } catch (Exception e) { + LOGGER.error("Could not start producer"); + throw new FlumeException("Unable to create Kafka Connections. 
" + + "Check whether Kafka Brokers are up and that the " + + "Flume agent can connect to it.", e); + } + } + + @Override + public void stop() { + for (ConsumerAndIterator c : consumers) { + try { + decommissionConsumerAndIterator(c); + } catch (Exception ex) { + LOGGER.warn("Error while shutting down consumer.", ex); + } + } + producer.close(); + super.stop(); + } + + @Override + protected BasicTransactionSemantics createTransaction() { + return new KafkaTransaction(); + } + + private synchronized ConsumerAndIterator createConsumerAndIter() { + try { + ConsumerConfig consumerConfig = new ConsumerConfig(kafkaConf); + ConsumerConnector consumer = + Consumer.createJavaConsumerConnector(consumerConfig); + Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = + consumer.createMessageStreams(topicCountMap); + final List<KafkaStream<byte[], byte[]>> streamList = consumerMap + .get(topic.get()); + KafkaStream<byte[], byte[]> stream = streamList.remove(0); + ConsumerAndIterator ret = + new ConsumerAndIterator(consumer, stream.iterator(), channelUUID); + consumers.add(ret); + LOGGER.info("Created new consumer to connect to Kafka"); + return ret; + } catch (Exception e) { + throw new FlumeException("Unable to connect to Kafka", e); + } + } + + Properties getKafkaConf() { + return kafkaConf; + } + + @Override + public void configure(Context ctx) { + String topicStr = ctx.getString(TOPIC); + if (topicStr == null || topicStr.isEmpty()) { + topicStr = DEFAULT_TOPIC; + LOGGER + .info("Topic was not specified. Using " + topicStr + " as the topic."); + } + topic.set(topicStr); + String groupId = ctx.getString(GROUP_ID_FLUME); + if (groupId == null || groupId.isEmpty()) { + groupId = DEFAULT_GROUP_ID; + LOGGER.info( + "Group ID was not specified. 
Using " + groupId + " as the group id."); + } + String brokerList = ctx.getString(BROKER_LIST_FLUME_KEY); + if (brokerList == null || brokerList.isEmpty()) { + throw new ConfigurationException("Broker List must be specified"); + } + String zkConnect = ctx.getString(ZOOKEEPER_CONNECT_FLUME_KEY); + if (zkConnect == null || zkConnect.isEmpty()) { + throw new ConfigurationException( + "Zookeeper Connection must be specified"); + } + Long timeout = ctx.getLong(TIMEOUT, Long.valueOf(DEFAULT_TIMEOUT)); + kafkaConf.putAll(ctx.getSubProperties(KAFKA_PREFIX)); + kafkaConf.put(GROUP_ID, groupId); + kafkaConf.put(BROKER_LIST_KEY, brokerList); + kafkaConf.put(ZOOKEEPER_CONNECT, zkConnect); + kafkaConf.put(AUTO_COMMIT_ENABLED, String.valueOf(false)); + kafkaConf.put(CONSUMER_TIMEOUT, String.valueOf(timeout)); + kafkaConf.put(REQUIRED_ACKS_KEY, "-1"); + LOGGER.info(kafkaConf.toString()); + parseAsFlumeEvent = + ctx.getBoolean(PARSE_AS_FLUME_EVENT, DEFAULT_PARSE_AS_FLUME_EVENT); + + boolean readSmallest = ctx.getBoolean(READ_SMALLEST_OFFSET, + DEFAULT_READ_SMALLEST_OFFSET); + // If the data is to be parsed as Flume events, we always read the smallest. + // Else, we read the configuration, which by default reads the largest. + if (parseAsFlumeEvent || readSmallest) { + // readSmallest is eval-ed only if parseAsFlumeEvent is false. + // The default is largest, so we don't need to set it explicitly. + kafkaConf.put("auto.offset.reset", "smallest"); + } + + } + + private void decommissionConsumerAndIterator(ConsumerAndIterator c) { + if (c.failedEvents.isEmpty()) { + c.consumer.commitOffsets(); + } + c.failedEvents.clear(); + c.consumer.shutdown(); + } + + // Force a consumer to be initialized. There are many duplicates in + // tests due to rebalancing - making testing tricky. In production, + // this is less of an issue as + // rebalancing would happen only on startup. 
+ @VisibleForTesting + void registerThread() { + consumerAndIter.get(); + } + + private enum TransactionType { + PUT, + TAKE, + NONE + } + + + private class KafkaTransaction extends BasicTransactionSemantics { + + private TransactionType type = TransactionType.NONE; + // For Puts + private Optional<ByteArrayOutputStream> tempOutStream = Optional + .absent(); + + // For put transactions, serialize the events and batch them and send it. + private Optional<LinkedList<byte[]>> serializedEvents = Optional.absent(); + // For take transactions, deserialize and hold them till commit goes through + private Optional<LinkedList<Event>> events = Optional.absent(); + private Optional<SpecificDatumWriter<AvroFlumeEvent>> writer = + Optional.absent(); + private Optional<SpecificDatumReader<AvroFlumeEvent>> reader = + Optional.absent(); + + // Fine to use null for initial value, Avro will create new ones if this + // is null + private BinaryEncoder encoder = null; + private BinaryDecoder decoder = null; + private final String batchUUID = UUID.randomUUID().toString(); + private boolean eventTaken = false; + + @Override + protected void doPut(Event event) throws InterruptedException { + type = TransactionType.PUT; + if (!serializedEvents.isPresent()) { + serializedEvents = Optional.of(new LinkedList<byte[]>()); + } + + try { + if (!tempOutStream.isPresent()) { + tempOutStream = Optional.of(new ByteArrayOutputStream()); + } + if (!writer.isPresent()) { + writer = Optional.of(new + SpecificDatumWriter<AvroFlumeEvent>(AvroFlumeEvent.class)); + } + tempOutStream.get().reset(); + AvroFlumeEvent e = new AvroFlumeEvent( + toCharSeqMap(event.getHeaders()), ByteBuffer.wrap(event.getBody())); + encoder = EncoderFactory.get() + .directBinaryEncoder(tempOutStream.get(), encoder); + writer.get().write(e, encoder); + // Not really possible to avoid this copy :( + serializedEvents.get().add(tempOutStream.get().toByteArray()); + } catch (Exception e) { + throw new ChannelException("Error while 
serializing event", e); + } + } + + @SuppressWarnings("unchecked") + @Override + protected Event doTake() throws InterruptedException { + type = TransactionType.TAKE; + try { + if (!(consumerAndIter.get().uuid.equals(channelUUID))) { + LOGGER.info("UUID mismatch, creating new consumer"); + decommissionConsumerAndIterator(consumerAndIter.get()); + consumerAndIter.remove(); + } + } catch (Exception ex) { + LOGGER.warn("Error while shutting down consumer", ex); + } + if (!events.isPresent()) { + events = Optional.of(new LinkedList<Event>()); + } + Event e; + if (!consumerAndIter.get().failedEvents.isEmpty()) { + e = consumerAndIter.get().failedEvents.removeFirst(); + } else { + try { + ConsumerIterator<byte[], byte[]> it = consumerAndIter.get().iterator; + it.hasNext(); + if (parseAsFlumeEvent) { + ByteArrayInputStream in = + new ByteArrayInputStream(it.next().message()); + decoder = DecoderFactory.get().directBinaryDecoder(in, decoder); + if (!reader.isPresent()) { + reader = Optional.of( + new SpecificDatumReader<AvroFlumeEvent>(AvroFlumeEvent.class)); + } + AvroFlumeEvent event = reader.get().read(null, decoder); + e = EventBuilder.withBody(event.getBody().array(), + toStringMap(event.getHeaders())); + } else { + e = EventBuilder.withBody(it.next().message(), + Collections.EMPTY_MAP); + } + + } catch (ConsumerTimeoutException ex) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Timed out while waiting for data to come from Kafka", + ex); + } + return null; + } catch (Exception ex) { + LOGGER.warn("Error while getting events from Kafka", ex); + throw new ChannelException("Error while getting events from Kafka", + ex); + } + } + eventTaken = true; + events.get().add(e); + return e; + } + + @Override + protected void doCommit() throws InterruptedException { + if (type.equals(TransactionType.NONE)) { + return; + } + if (type.equals(TransactionType.PUT)) { + try { + List<KeyedMessage<String, byte[]>> messages = new + ArrayList<KeyedMessage<String, 
byte[]>>(serializedEvents.get() + .size()); + for (byte[] event : serializedEvents.get()) { + messages.add(new KeyedMessage<String, byte[]>(topic.get(), null, + batchUUID, event)); + } + producer.send(messages); + serializedEvents.get().clear(); + } catch (Exception ex) { + LOGGER.warn("Sending events to Kafka failed", ex); + throw new ChannelException("Commit failed as send to Kafka failed", + ex); + } + } else { + if (consumerAndIter.get().failedEvents.isEmpty() && eventTaken) { + consumerAndIter.get().consumer.commitOffsets(); + } + events.get().clear(); + } + } + + @Override + protected void doRollback() throws InterruptedException { + if (type.equals(TransactionType.NONE)) { + return; + } + if (type.equals(TransactionType.PUT)) { + serializedEvents.get().clear(); + } else { + consumerAndIter.get().failedEvents.addAll(events.get()); + events.get().clear(); + } + } + } + + + private class ConsumerAndIterator { + final ConsumerConnector consumer; + final ConsumerIterator<byte[], byte[]> iterator; + final String uuid; + final LinkedList<Event> failedEvents = new LinkedList<Event>(); + + ConsumerAndIterator(ConsumerConnector consumerConnector, + ConsumerIterator<byte[], byte[]> iterator, String uuid) { + this.consumer = consumerConnector; + this.iterator = iterator; + this.uuid = uuid; + } + } + + /** + * Helper function to convert a map of String to a map of CharSequence. + */ + private static Map<CharSequence, CharSequence> toCharSeqMap( + Map<String, String> stringMap) { + Map<CharSequence, CharSequence> charSeqMap = + new HashMap<CharSequence, CharSequence>(); + for (Map.Entry<String, String> entry : stringMap.entrySet()) { + charSeqMap.put(entry.getKey(), entry.getValue()); + } + return charSeqMap; + } + + /** + * Helper function to convert a map of CharSequence to a map of String. 
+ */ + private static Map<String, String> toStringMap( + Map<CharSequence, CharSequence> charSeqMap) { + Map<String, String> stringMap = + new HashMap<String, String>(); + for (Map.Entry<CharSequence, CharSequence> entry : charSeqMap.entrySet()) { + stringMap.put(entry.getKey().toString(), entry.getValue().toString()); + } + return stringMap; + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/832594a2/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java new file mode 100644 index 0000000..9a342ef --- /dev/null +++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.flume.channel.kafka; + +public class KafkaChannelConfiguration { + + public static final String KAFKA_PREFIX = "kafka."; + public static final String BROKER_LIST_KEY = "metadata.broker.list"; + public static final String REQUIRED_ACKS_KEY = "request.required.acks"; + public static final String BROKER_LIST_FLUME_KEY = "brokerList"; + public static final String TOPIC = "topic"; + public static final String GROUP_ID = "group.id"; + public static final String GROUP_ID_FLUME = "groupId"; + public static final String AUTO_COMMIT_ENABLED = "auto.commit.enable"; + public static final String ZOOKEEPER_CONNECT = "zookeeper.connect"; + public static final String ZOOKEEPER_CONNECT_FLUME_KEY = "zookeeperConnect"; + public static final String DEFAULT_GROUP_ID = "flume"; + public static final String DEFAULT_TOPIC = "flume-channel"; + public static final String TIMEOUT = "timeout"; + public static final String DEFAULT_TIMEOUT = "100"; + public static final String CONSUMER_TIMEOUT = "consumer.timeout.ms"; + + public static final String PARSE_AS_FLUME_EVENT = "parseAsFlumeEvent"; + public static final boolean DEFAULT_PARSE_AS_FLUME_EVENT = true; + + public static final String READ_SMALLEST_OFFSET = "readSmallestOffset"; + public static final boolean DEFAULT_READ_SMALLEST_OFFSET = false; +} http://git-wip-us.apache.org/repos/asf/flume/blob/832594a2/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java new file mode 100644 index 0000000..e665431 --- /dev/null +++ b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java @@ -0,0 +1,418 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.channel.kafka; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import kafka.admin.AdminUtils; +import kafka.javaapi.producer.Producer; +import kafka.producer.KeyedMessage; +import kafka.producer.ProducerConfig; +import kafka.utils.ZKStringSerializer$; +import org.I0Itec.zkclient.ZkClient; +import org.apache.commons.lang.RandomStringUtils; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.Transaction; +import org.apache.flume.conf.Configurables; +import org.apache.flume.event.EventBuilder; +import org.apache.flume.sink.kafka.util.TestUtil; +import org.junit.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +public class TestKafkaChannel { + + private static TestUtil testUtil = TestUtil.getInstance(); + private String topic = null; + private final Set<String> usedTopics = new HashSet<String>(); + private CountDownLatch latch = null; + + @BeforeClass + public static void setupClass() throws Exception { + testUtil.prepare(); + Thread.sleep(2500); + } + + @Before + public void setup() 
throws Exception { + boolean topicFound = false; + while (!topicFound) { + topic = RandomStringUtils.randomAlphabetic(8); + if (!usedTopics.contains(topic)) { + usedTopics.add(topic); + topicFound = true; + } + } + try { + createTopic(topic); + } catch (Exception e) { + } + Thread.sleep(2500); + latch = new CountDownLatch(5); + } + + @AfterClass + public static void tearDown() { + testUtil.tearDown(); + } + + @Test + public void testSuccess() throws Exception { + doTestSuccessRollback(false, false); + } + + @Test + public void testSuccessInterleave() throws Exception { + doTestSuccessRollback(false, true); + } + + @Test + public void testRollbacks() throws Exception { + doTestSuccessRollback(true, false); + } + + @Test + public void testRollbacksInterleave() throws Exception { + doTestSuccessRollback(true, true); + } + + private void doTestSuccessRollback(final boolean rollback, + final boolean interleave) throws Exception { + final KafkaChannel channel = startChannel(true); + writeAndVerify(rollback, channel, interleave); + channel.stop(); + } + + + @Test + public void testStopAndStart() throws Exception { + doTestStopAndStart(false, false); + } + + @Test + public void testStopAndStartWithRollback() throws Exception { + doTestStopAndStart(true, true); + } + + @Test + public void testStopAndStartWithRollbackAndNoRetry() throws Exception { + doTestStopAndStart(true, false); + } + + @Test + public void testNoParsingAsFlumeAgent() throws Exception { + final KafkaChannel channel = startChannel(false); + Producer<String, byte[]> producer = new Producer<String, byte[]>( + new ProducerConfig(channel.getKafkaConf())); + List<KeyedMessage<String, byte[]>> original = Lists.newArrayList(); + for (int i = 0; i < 50; i++) { + KeyedMessage<String, byte[]> data = new KeyedMessage<String, + byte[]>(topic, null, RandomStringUtils.randomAlphabetic(6), + String.valueOf(i).getBytes()); + original.add(data); + } + producer.send(original); + ExecutorCompletionService<Void> submitterSvc 
= new + ExecutorCompletionService<Void>(Executors.newCachedThreadPool()); + List<Event> events = pullEvents(channel, submitterSvc, + 50, false, false); + wait(submitterSvc, 5); + Set<Integer> finals = Sets.newHashSet(); + for (int i = 0; i < 50; i++) { + finals.add(Integer.parseInt(new String(events.get(i).getBody()))); + } + for (int i = 0; i < 50; i++) { + Assert.assertTrue(finals.contains(i)); + finals.remove(i); + } + Assert.assertTrue(finals.isEmpty()); + channel.stop(); + } + + /** + * This method starts a channel, puts events into it. The channel is then + * stopped and restarted. Then we check to make sure if all events we put + * come out. Optionally, 10 events are rolled back, + * and optionally we restart the agent immediately after and we try to pull it + * out. + * + * @param rollback + * @param retryAfterRollback + * @throws Exception + */ + private void doTestStopAndStart(boolean rollback, + boolean retryAfterRollback) throws Exception { + final KafkaChannel channel = startChannel(true); + ExecutorService underlying = Executors + .newCachedThreadPool(); + ExecutorCompletionService<Void> submitterSvc = + new ExecutorCompletionService<Void>(underlying); + final List<List<Event>> events = createBaseList(); + putEvents(channel, events, submitterSvc); + int completed = 0; + wait(submitterSvc, 5); + channel.stop(); + final KafkaChannel channel2 = startChannel(true); + int total = 50; + if (rollback && !retryAfterRollback) { + total = 40; + } + final List<Event> eventsPulled = + pullEvents(channel2, submitterSvc, total, rollback, retryAfterRollback); + wait(submitterSvc, 5); + channel2.stop(); + if (!retryAfterRollback && rollback) { + final KafkaChannel channel3 = startChannel(true); + int expectedRemaining = 50 - eventsPulled.size(); + final List<Event> eventsPulled2 = + pullEvents(channel3, submitterSvc, expectedRemaining, false, false); + wait(submitterSvc, 5); + Assert.assertEquals(expectedRemaining, eventsPulled2.size()); + 
eventsPulled.addAll(eventsPulled2); + channel3.stop(); + } + underlying.shutdownNow(); + verify(eventsPulled); + } + + private KafkaChannel startChannel(boolean parseAsFlume) throws Exception { + Context context = prepareDefaultContext(parseAsFlume); + final KafkaChannel channel = new KafkaChannel(); + Configurables.configure(channel, context); + channel.start(); + return channel; + } + + private void writeAndVerify(final boolean testRollbacks, + final KafkaChannel channel) throws Exception { + writeAndVerify(testRollbacks, channel, false); + } + + private void writeAndVerify(final boolean testRollbacks, + final KafkaChannel channel, final boolean interleave) throws Exception { + + final List<List<Event>> events = createBaseList(); + + ExecutorCompletionService<Void> submitterSvc = + new ExecutorCompletionService<Void>(Executors + .newCachedThreadPool()); + + putEvents(channel, events, submitterSvc); + + if (interleave) { + wait(submitterSvc, 5); + } + + ExecutorCompletionService<Void> submitterSvc2 = + new ExecutorCompletionService<Void>(Executors + .newCachedThreadPool()); + + final List<Event> eventsPulled = + pullEvents(channel, submitterSvc2, 50, testRollbacks, true); + + if (!interleave) { + wait(submitterSvc, 5); + } + wait(submitterSvc2, 5); + + verify(eventsPulled); + } + + private List<List<Event>> createBaseList() { + final List<List<Event>> events = new ArrayList<List<Event>>(); + for (int i = 0; i < 5; i++) { + List<Event> eventList = new ArrayList<Event>(10); + events.add(eventList); + for (int j = 0; j < 10; j++) { + Map<String, String> hdrs = new HashMap<String, String>(); + String v = (String.valueOf(i) + " - " + String + .valueOf(j)); + hdrs.put("header", v); + eventList.add(EventBuilder.withBody(v.getBytes(), hdrs)); + } + } + return events; + } + + private void putEvents(final KafkaChannel channel, final List<List<Event>> + events, ExecutorCompletionService<Void> submitterSvc) { + for (int i = 0; i < 5; i++) { + final int index = i; + 
submitterSvc.submit(new Callable<Void>() { + @Override + public Void call() { + Transaction tx = channel.getTransaction(); + tx.begin(); + List<Event> eventsToPut = events.get(index); + for (int j = 0; j < 10; j++) { + channel.put(eventsToPut.get(j)); + } + try { + tx.commit(); + } finally { + tx.close(); + } + return null; + } + }); + } + } + + private List<Event> pullEvents(final KafkaChannel channel, + ExecutorCompletionService<Void> submitterSvc, final int total, + final boolean testRollbacks, final boolean retryAfterRollback) { + final List<Event> eventsPulled = Collections.synchronizedList(new + ArrayList<Event>(50)); + final CyclicBarrier barrier = new CyclicBarrier(5); + final AtomicInteger counter = new AtomicInteger(0); + final AtomicInteger rolledBackCount = new AtomicInteger(0); + final AtomicBoolean startedGettingEvents = new AtomicBoolean(false); + final AtomicBoolean rolledBack = new AtomicBoolean(false); + for (int k = 0; k < 5; k++) { + final int index = k; + submitterSvc.submit(new Callable<Void>() { + @Override + public Void call() throws Exception { + Transaction tx = null; + final List<Event> eventsLocal = Lists.newLinkedList(); + int takenByThisThread = 0; + channel.registerThread(); + Thread.sleep(1000); + barrier.await(); + while (counter.get() < (total - rolledBackCount.get())) { + if (tx == null) { + tx = channel.getTransaction(); + tx.begin(); + } + try { + Event e = channel.take(); + if (e != null) { + startedGettingEvents.set(true); + eventsLocal.add(e); + } else { + if (testRollbacks && + index == 4 && + (!rolledBack.get()) && + startedGettingEvents.get()) { + tx.rollback(); + tx.close(); + tx = null; + rolledBack.set(true); + final int eventsLocalSize = eventsLocal.size(); + eventsLocal.clear(); + if (!retryAfterRollback) { + rolledBackCount.set(eventsLocalSize); + return null; + } + } else { + tx.commit(); + tx.close(); + tx = null; + eventsPulled.addAll(eventsLocal); + counter.getAndAdd(eventsLocal.size()); + eventsLocal.clear(); + 
} + } + } catch (Exception ex) { + eventsLocal.clear(); + if (tx != null) { + tx.rollback(); + tx.close(); + } + tx = null; + ex.printStackTrace(); + } + } + // Close txn. + return null; + } + }); + } + return eventsPulled; + } + + private void wait(ExecutorCompletionService<Void> submitterSvc, int max) + throws Exception { + int completed = 0; + while (completed < max) { + submitterSvc.take(); + completed++; + } + } + + private void verify(List<Event> eventsPulled) { + Assert.assertFalse(eventsPulled.isEmpty()); + Assert.assertEquals(50, eventsPulled.size()); + Set<String> eventStrings = new HashSet<String>(); + for (Event e : eventsPulled) { + Assert + .assertEquals(e.getHeaders().get("header"), new String(e.getBody())); + eventStrings.add(e.getHeaders().get("header")); + } + for (int i = 0; i < 5; i++) { + for (int j = 0; j < 10; j++) { + String v = String.valueOf(i) + " - " + String.valueOf(j); + Assert.assertTrue(eventStrings.contains(v)); + eventStrings.remove(v); + } + } + Assert.assertTrue(eventStrings.isEmpty()); + } + + private Context prepareDefaultContext(boolean parseAsFlume) { + // Prepares a default context with Kafka Server Properties + Context context = new Context(); + context.put(KafkaChannelConfiguration.BROKER_LIST_FLUME_KEY, + testUtil.getKafkaServerUrl()); + context.put(KafkaChannelConfiguration.ZOOKEEPER_CONNECT_FLUME_KEY, + testUtil.getZkUrl()); + context.put(KafkaChannelConfiguration.PARSE_AS_FLUME_EVENT, + String.valueOf(parseAsFlume)); + context.put(KafkaChannelConfiguration.READ_SMALLEST_OFFSET, "true"); + context.put(KafkaChannelConfiguration.TOPIC, topic); + return context; + } + + public static void createTopic(String topicName) { + int numPartitions = 5; + int sessionTimeoutMs = 10000; + int connectionTimeoutMs = 10000; + ZkClient zkClient = new ZkClient(testUtil.getZkUrl(), + sessionTimeoutMs, connectionTimeoutMs, + ZKStringSerializer$.MODULE$); + + int replicationFactor = 1; + Properties topicConfig = new Properties(); + 
AdminUtils.createTopic(zkClient, topicName, numPartitions, + replicationFactor, topicConfig); + } + + public static void deleteTopic(String topicName) { + int sessionTimeoutMs = 10000; + int connectionTimeoutMs = 10000; + ZkClient zkClient = new ZkClient(testUtil.getZkUrl(), + sessionTimeoutMs, connectionTimeoutMs, + ZKStringSerializer$.MODULE$); + AdminUtils.deleteTopic(zkClient, topicName); + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/832594a2/flume-ng-channels/flume-kafka-channel/src/test/resources/kafka-server.properties ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-kafka-channel/src/test/resources/kafka-server.properties b/flume-ng-channels/flume-kafka-channel/src/test/resources/kafka-server.properties new file mode 100644 index 0000000..c10c89d --- /dev/null +++ b/flume-ng-channels/flume-kafka-channel/src/test/resources/kafka-server.properties @@ -0,0 +1,118 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# see kafka.server.KafkaConfig for additional details and defaults + +############################# Server Basics ############################# + +# The id of the broker. 
This must be set to a unique integer for each broker. +broker.id=0 + +############################# Socket Server Settings ############################# + +# The port the socket server listens on +port=9092 + +# Hostname the broker will bind to. If not set, the server will bind to all interfaces +#host.name=localhost + +# Hostname the broker will advertise to producers and consumers. If not set, it uses the +# value for "host.name" if configured. Otherwise, it will use the value returned from +# java.net.InetAddress.getCanonicalHostName(). +#advertised.host.name=<hostname routable by clients> + +# The port to publish to ZooKeeper for clients to use. If this is not set, +# it will publish the same port that the broker binds to. +#advertised.port=<port accessible by clients> + +# The number of threads handling network requests +num.network.threads=2 + +# The number of threads doing disk I/O +num.io.threads=8 + +# The send buffer (SO_SNDBUF) used by the socket server +socket.send.buffer.bytes=1048576 + +# The receive buffer (SO_RCVBUF) used by the socket server +socket.receive.buffer.bytes=1048576 + +# The maximum size of a request that the socket server will accept (protection against OOM) +socket.request.max.bytes=104857600 + + +############################# Log Basics ############################# + +# A comma separated list of directories under which to store log files +log.dirs=target/kafka-logs + +# The default number of log partitions per topic. More partitions allow greater +# parallelism for consumption, but this will also result in more files across +# the brokers. +num.partitions=5 + +############################# Log Flush Policy ############################# + +# Messages are immediately written to the filesystem but by default we only fsync() to sync +# the OS cache lazily. The following configurations control the flush of data to disk. +# There are a few important trade-offs here: +# 1. 
Durability: Unflushed data may be lost if you are not using replication. +# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. +# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks. +# The settings below allow one to configure the flush policy to flush data after a period of time or +# every N messages (or both). This can be done globally and overridden on a per-topic basis. + +# The number of messages to accept before forcing a flush of data to disk +#log.flush.interval.messages=10000 + +# The maximum amount of time a message can sit in a log before we force a flush +#log.flush.interval.ms=1000 + +############################# Log Retention Policy ############################# + +# The following configurations control the disposal of log segments. The policy can +# be set to delete segments after a period of time, or after a given size has accumulated. +# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens +# from the end of the log. + +# The minimum age of a log file to be eligible for deletion +log.retention.hours=168 + +# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining +# segments don't drop below log.retention.bytes. +#log.retention.bytes=1073741824 + +# The maximum size of a log segment file. When this size is reached a new log segment will be created. +log.segment.bytes=536870912 + +# The interval at which log segments are checked to see if they can be deleted according +# to the retention policies +log.retention.check.interval.ms=60000 + +# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires. +# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction. 
+log.cleaner.enable=false + +############################# Zookeeper ############################# + +# Zookeeper connection string (see zookeeper docs for details). +# This is a comma separated host:port pairs, each corresponding to a zk +# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". +# You can also append an optional chroot string to the urls to specify the +# root directory for all kafka znodes. +zookeeper.connect=localhost:2181 + +# Timeout in ms for connecting to zookeeper +zookeeper.connection.timeout.ms=1000000 http://git-wip-us.apache.org/repos/asf/flume/blob/832594a2/flume-ng-channels/flume-kafka-channel/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-kafka-channel/src/test/resources/log4j.properties b/flume-ng-channels/flume-kafka-channel/src/test/resources/log4j.properties new file mode 100644 index 0000000..b86600b --- /dev/null +++ b/flume-ng-channels/flume-kafka-channel/src/test/resources/log4j.properties @@ -0,0 +1,78 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +kafka.logs.dir=target/logs + +log4j.rootLogger=INFO, stdout + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender +log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH +log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log +log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout +log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.appender.stateChangeAppender=org.apache.log4j.DailyRollingFileAppender +log4j.appender.stateChangeAppender.DatePattern='.'yyyy-MM-dd-HH +log4j.appender.stateChangeAppender.File=${kafka.logs.dir}/state-change.log +log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout +log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender +log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH +log4j.appender.requestAppender.File=${kafka.logs.dir}/kafka-request.log +log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout +log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender +log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH +log4j.appender.cleanerAppender.File=${kafka.logs.dir}/log-cleaner.log +log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout +log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender +log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH +log4j.appender.controllerAppender.File=${kafka.logs.dir}/controller.log +log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout 
+log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n + +# Turn on all our debugging info +#log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG, kafkaAppender +#log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaAppender +#log4j.logger.kafka.perf=DEBUG, kafkaAppender +#log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG, kafkaAppender +#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG +log4j.logger.kafka=INFO, kafkaAppender + +log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender +log4j.additivity.kafka.network.RequestChannel$=false + +#log4j.logger.kafka.network.Processor=TRACE, requestAppender +#log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender +#log4j.additivity.kafka.server.KafkaApis=false +log4j.logger.kafka.request.logger=WARN, requestAppender +log4j.additivity.kafka.request.logger=false + +log4j.logger.kafka.controller=TRACE, controllerAppender +log4j.additivity.kafka.controller=false + +log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender +log4j.additivity.kafka.log.LogCleaner=false + +log4j.logger.state.change.logger=TRACE, stateChangeAppender +log4j.additivity.state.change.logger=false http://git-wip-us.apache.org/repos/asf/flume/blob/832594a2/flume-ng-channels/flume-kafka-channel/src/test/resources/zookeeper.properties ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-kafka-channel/src/test/resources/zookeeper.properties b/flume-ng-channels/flume-kafka-channel/src/test/resources/zookeeper.properties new file mode 100644 index 0000000..89e1b5e --- /dev/null +++ b/flume-ng-channels/flume-kafka-channel/src/test/resources/zookeeper.properties @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# the directory where the snapshot is stored. +dataDir=target +# the port at which the clients will connect +clientPort=2181 +# disable the per-ip limit on the number of connections since this is a non-production config +maxClientCnxns=0 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flume/blob/832594a2/flume-ng-channels/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-channels/pom.xml b/flume-ng-channels/pom.xml index dc8dbc6..f44171d 100644 --- a/flume-ng-channels/pom.xml +++ b/flume-ng-channels/pom.xml @@ -44,5 +44,6 @@ limitations under the License. 
<module>flume-jdbc-channel</module> <module>flume-file-channel</module> <module>flume-spillable-memory-channel</module> + <module>flume-kafka-channel</module> </modules> </project> http://git-wip-us.apache.org/repos/asf/flume/blob/832594a2/flume-ng-sinks/flume-ng-kafka-sink/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-kafka-sink/pom.xml b/flume-ng-sinks/flume-ng-kafka-sink/pom.xml index 746a395..e323658 100644 --- a/flume-ng-sinks/flume-ng-kafka-sink/pom.xml +++ b/flume-ng-sinks/flume-ng-kafka-sink/pom.xml @@ -28,6 +28,17 @@ <groupId>org.apache.rat</groupId> <artifactId>apache-rat-plugin</artifactId> </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> </plugins> </build> http://git-wip-us.apache.org/repos/asf/flume/blob/832594a2/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaConsumer.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaConsumer.java index 1c98922..d5dfbd6 100644 --- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaConsumer.java +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaConsumer.java @@ -57,7 +57,7 @@ public class KafkaConsumer { Properties props = new Properties(); props.put("zookeeper.connect", zkUrl); props.put("group.id", groupId); - props.put("zookeeper.session.timeout.ms", "400"); + props.put("zookeeper.session.timeout.ms", "1000"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); props.put("auto.offset.reset", 
"smallest"); http://git-wip-us.apache.org/repos/asf/flume/blob/832594a2/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java index 8855c53..6405d6c 100644 --- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java @@ -66,6 +66,7 @@ public class TestUtil { Properties kafkaProperties = new Properties(); Properties zkProperties = new Properties(); + logger.info("Starting kafka server."); try { //load properties zkProperties.load(Class.class.getResourceAsStream(
