I'm concerned about the warning "no brokers found when trying to rebalance". Double-check that the ZooKeeper path is correct, i.e. zk01:2181/mesos-kafka, and that it is not the standard /kafka.
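One quick way to verify this is to list the broker registrations directly in ZooKeeper. A minimal sketch, assuming the stock ZooKeeper CLI (zkCli.sh) is available on a host that can reach zk01:

  # list broker registrations under the Mesos chroot; an empty or missing
  # node here would explain the "no brokers found" warning
  zkCli.sh -server zk01:2181 ls /mesos-kafka/brokers/ids

  # for comparison, the default location without a chroot
  zkCli.sh -server zk01:2181 ls /brokers/ids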
When you connect with the kafka-console-consumer, do you specify /mesos-kafka or just zk01:2181? You can use the zkclient tool to check if there are brokers currently registered under that path for the topic "test" (example commands are at the end of this message).

Regards,
Gonzalo

On 4 February 2016 at 21:16, Justin Ryan <[email protected]> wrote:

> Hiya folks,
>
> I'm setting up a new environment with Kafka, Flume, and HDFS, and have
> implemented the simplest possible testing configuration I can come up
> with. It logs successfully configuring and starting the KafkaSource, and
> with kafka tools I can confirm that messages have been sent, but the JSON
> Metrics from Flume show 0 messages processed.
>
> Are there any more tools at my disposal to investigate? Any assistance
> would be greatly appreciated!
>
> My config and log:
>
> ----
> # generated by Chef for mesos10, changes will be overwritten
>
> flume1.sources=kafka-source-test
> flume1.channels=hdfs-channel-kafka
> flume1.sinks=hdfs-sink-kafka
>
> flume1.sources.kafka-source-test.type=org.apache.flume.source.kafka.KafkaSource
> flume1.sources.kafka-source-test.zookeeperConnect=zk01:2181/mesos-kafka
> flume1.sources.kafka-source-test.topic=test
> flume1.sources.kafka-source-test.groupId=flume
> flume1.sources.kafka-source-test.interceptors=i1
> flume1.sources.kafka-source-test.interceptors.i1.type=timestamp
> flume1.sources.kafka-source-test.consumer.timeout.ms=100
> flume1.sources.kafka-source-test.channels=hdfs-channel-kafka
> flume1.channels.hdfs-channel-kafka.type=memory
> flume1.sinks.hdfs-sink-kafka.channel=hdfs-channel-kafka
> flume1.sinks.hdfs-sink-kafka.type=hdfs
> flume1.sinks.hdfs-sink-kafka.hdfs.path=/tmp/kafka/%{topic}/%y-%m-%d
> flume1.sinks.hdfs-sink-kafka.hdfs.rollInterval=5
> flume1.sinks.hdfs-sink-kafka.hdfs.rollCount=0
> flume1.sinks.hdfs-sink-kafka.hdfs.rollSize=0
> flume1.sinks.hdfs-sink-kafka.hdfs.fileType=DataStream
> flume1.channels.hdfs-channel-kafka.capacity=10
> flume1.channels.hdfs-channel-kafka.transactionCapacity=10
> ----
>
> Startup log (less incredibly long path lines):
> ----
> 16/02/04 11:32:07 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
> 16/02/04 11:32:07 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:/etc/flume/conf.chef/flume.conf
> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Added sinks: hdfs-sink-kafka Agent: flume1
> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [flume1]
> 16/02/04 11:32:07 INFO node.AbstractConfigurationProvider: Creating channels
> 16/02/04 11:32:07 INFO channel.DefaultChannelFactory: Creating instance of channel hdfs-channel-kafka type memory
> 16/02/04 11:32:07 INFO node.AbstractConfigurationProvider: Created channel hdfs-channel-kafka
> 16/02/04 11:32:07 INFO source.DefaultSourceFactory: Creating instance of source kafka-source-test, type org.apache.flume.source.kafka.KafkaSource
> 16/02/04 11:32:07 INFO kafka.KafkaSourceUtil: context={ parameters:{interceptors.i1.type=timestamp, zookeeperConnect=zk01:2181/mesos-kafka, channels=hdfs-channel-kafka, groupId=flume, consumer.timeout.ms=100, topic=test, type=org.apache.flume.source.kafka.KafkaSource, interceptors=i1} }
> 16/02/04 11:32:07 INFO sink.DefaultSinkFactory: Creating instance of sink: hdfs-sink-kafka, type: hdfs
> 16/02/04 11:32:07 INFO node.AbstractConfigurationProvider: Channel hdfs-channel-kafka connected to [kafka-source-test, hdfs-sink-kafka]
> 16/02/04 11:32:07 INFO node.Application: Starting new configuration:{ sourceRunners:{kafka-source-test=PollableSourceRunner: { source:org.apache.flume.source.kafka.KafkaSource{name:kafka-source-test,state:IDLE} counterGroup:{ name:null counters:{} } }} sinkRunners:{hdfs-sink-kafka=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@2f33f35e counterGroup:{ name:null counters:{} } }} channels:{hdfs-channel-kafka=org.apache.flume.channel.MemoryChannel{name: hdfs-channel-kafka}} }
> 16/02/04 11:32:07 INFO node.Application: Starting Channel hdfs-channel-kafka
> 16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: hdfs-channel-kafka: Successfully registered new MBean.
> 16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: hdfs-channel-kafka started
> 16/02/04 11:32:07 INFO node.Application: Starting Sink hdfs-sink-kafka
> 16/02/04 11:32:07 INFO node.Application: Starting Source kafka-source-test
> 16/02/04 11:32:07 INFO kafka.KafkaSource: Starting org.apache.flume.source.kafka.KafkaSource{name:kafka-source-test,state:IDLE}...
> 16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: hdfs-sink-kafka: Successfully registered new MBean.
> 16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: hdfs-sink-kafka started
> 16/02/04 11:32:07 INFO mortbay.log: Logging to org.slf4j.impl.Log4jLoggerAdapter(org.mortbay.log) via org.mortbay.log.Slf4jLog
> 16/02/04 11:32:07 INFO mortbay.log: jetty-6.1.26.cloudera.4
> 16/02/04 11:32:07 INFO mortbay.log: Started [email protected]:34545
> 16/02/04 11:32:08 INFO utils.VerifiableProperties: Verifying properties
> 16/02/04 11:32:08 INFO utils.VerifiableProperties: Property auto.commit.enable is overridden to false
> 16/02/04 11:32:08 INFO utils.VerifiableProperties: Property consumer.timeout.ms is overridden to 10
> 16/02/04 11:32:08 INFO utils.VerifiableProperties: Property group.id is overridden to flume
> 16/02/04 11:32:08 INFO utils.VerifiableProperties: Property zookeeper.connect is overridden to zk01:2181/mesos-kafka
> 16/02/04 11:32:08 INFO consumer.ZookeeperConsumerConnector: [flume_mesos10-1454614328204-ca8a74df], Connecting to zookeeper instance at zk01:2181/mesos-kafka
> 16/02/04 11:32:08 INFO zkclient.ZkEventThread: Starting ZkClient event thread.
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:zookeeper.version=3.4.5-946--1, built on 05/18/2015 19:03 GMT
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:host.name=mesos10
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:java.version=1.8.0_72-internal
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:java.vendor=Oracle Corporation
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:java.home=/usr/lib/jvm/java-8-openjdk-amd64/jre
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:java.io.tmpdir=/tmp
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:java.compiler=<NA>
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:os.name=Linux
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:os.arch=amd64
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:os.version=3.13.0-63-generic
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:user.name=marathon
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:user.home=/opt/marathon
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Initiating client connection, connectString=zk01:2181/mesos-kafka sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@2e1b7b98
> 16/02/04 11:32:08 INFO zookeeper.ClientCnxn: Opening socket connection to server 10.100.6.251/10.100.6.251:2181. Will not attempt to authenticate using SASL (unknown error)
> 16/02/04 11:32:08 INFO zookeeper.ClientCnxn: Socket connection established to 10.100.6.251/10.100.6.251:2181, initiating session
> 16/02/04 11:32:08 INFO zookeeper.ClientCnxn: Session establishment complete on server 10.100.6.251/10.100.6.251:2181, sessionid = 0x152858b1cc07491, negotiated timeout = 6000
> 16/02/04 11:32:08 INFO zkclient.ZkClient: zookeeper state changed (SyncConnected)
> 16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector: [flume_mesos10-1454614328204-ca8a74df], begin registering consumer flume_mesos10-1454614328204-ca8a74df in ZK
> 16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector: [flume_mesos10-1454614328204-ca8a74df], end registering consumer flume_mesos10-1454614328204-ca8a74df in ZK
> 16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector: [flume_mesos10-1454614328204-ca8a74df], starting watcher executor thread for consumer flume_mesos10-1454614328204-ca8a74df
> 16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector: [flume_mesos10-1454614328204-ca8a74df], begin rebalancing consumer flume_mesos10-1454614328204-ca8a74df try #0
> 16/02/04 11:32:09 WARN consumer.ZookeeperConsumerConnector: [flume_mesos10-1454614328204-ca8a74df], no brokers found when trying to rebalance.
> 16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector: [flume_mesos10-1454614328204-ca8a74df], end rebalancing consumer flume_mesos10-1454614328204-ca8a74df try #0
> 16/02/04 11:32:09 INFO kafka.KafkaSource: Kafka source kafka-source-test do started.
> 16/02/04 11:32:09 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: kafka-source-test: Successfully registered new MBean.
> 16/02/04 11:32:09 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: kafka-source-test started
> --
>
> --
> Justin Alan Ryan
> Sr. Systems / Release Engineer
> ZipRealty
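For reference, these are the consumer-side checks I had in mind, pointing everything at the same chroot the Flume source uses. A sketch only, assuming the standard Kafka shell scripts that ship with your (0.8/0.9-era) brokers are on the path:

  # confirm the topic and its partition leaders are visible through the chroot
  kafka-topics.sh --zookeeper zk01:2181/mesos-kafka --describe --topic test

  # consume through the same chroot; if this shows messages but Flume still
  # reports 0, the problem is more likely on the Flume side than the ZK path
  kafka-console-consumer.sh --zookeeper zk01:2181/mesos-kafka --topic test --from-beginning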
