The only thing I can think of is that the Kafka client included with Flume is not compatible with the Kafka version on the brokers (there have been a lot of changes recently), but normally you get errors when that happens.
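A quick way to rule that out is to compare the Kafka client jar the Flume agent actually loads against the version running on the brokers. This is only a rough sketch; the directories below are guesses for a typical install, so adjust them for your mesos-kafka deployment:

$ ls /usr/lib/flume-ng/lib/ | grep -i kafka    # Kafka client jars bundled with the Flume agent (path is an assumption)
$ ls /opt/kafka/libs/ | grep '^kafka_'         # jars on a broker host; the version is part of the file name (path is an assumption)

If the two versions differ significantly, that's where I'd look first.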
On 5 February 2016 at 20:02, Justin Ryan <[email protected]> wrote:

> Thanks, Gonzalo – that def helped!
>
> This also ties into an issue I’d raised with mesos-kafka where the zk path
> seemed to be ignored, and I now see that there is a node that stores the
> mesos-kafka scheduler config, and the kafka path must be specified
> separately, so is currently ‘/‘.
>
> Still not reading events, but definitely looks better in startup log:
>
> 16/02/05 11:55:38 INFO kafka.KafkaSource: Kafka source kafka-source-test do started.
> 16/02/05 11:55:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: [flume_mesos04-1454702137146-6cd63609-leader-finder-thread], Starting
> 16/02/05 11:55:38 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: kafka-source-test: Successfully registered new MBean.
> 16/02/05 11:55:38 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: kafka-source-test started
> 16/02/05 11:55:38 INFO utils.VerifiableProperties: Verifying properties
> 16/02/05 11:55:38 INFO utils.VerifiableProperties: Property client.id is overridden to flume
> 16/02/05 11:55:38 INFO utils.VerifiableProperties: Property metadata.broker.list is overridden to mesos01:31000,mesos02:31000,mesos08:31000
> 16/02/05 11:55:38 INFO utils.VerifiableProperties: Property request.timeout.ms is overridden to 30000
> 16/02/05 11:55:38 INFO client.ClientUtils$: Fetching metadata from broker id:1,host:mesos02,port:31000 with correlation id 0 for 1 topic(s) Set(home_views)
> 16/02/05 11:55:38 INFO producer.SyncProducer: Connected to mesos02:31000 for producing
> 16/02/05 11:55:38 INFO producer.SyncProducer: Disconnecting from mesos02:31000
> 16/02/05 11:55:38 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-flume_mesos04-1454702137146-6cd63609-0-0], Starting
> 16/02/05 11:55:38 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1454702137389] Added fetcher for partitions ArrayBuffer([[home_views,0], initOffset -1 to broker id:0,host:mesos01,port:31000] )
>
> —
>
> $ curl http://mesos04:34545/metrics | json_pp
>   % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
>                                  Dload  Upload   Total   Spent    Left  Speed
> 100   925    0   925    0     0   7741      0 --:--:-- --:--:-- --:--:--  7773
> {
>    "CHANNEL.hdfs-channel-kafka" : {
>       "ChannelCapacity" : "10",
>       "StartTime" : "1454702136681",
>       "EventTakeSuccessCount" : "0",
>       "ChannelFillPercentage" : "0.0",
>       "EventPutAttemptCount" : "0",
>       "EventTakeAttemptCount" : "14",
>       "StopTime" : "0",
>       "ChannelSize" : "0",
>       "EventPutSuccessCount" : "0",
>       "Type" : "CHANNEL"
>    },
>    "SOURCE.kafka-source-test" : {
>       "AppendBatchReceivedCount" : "0",
>       "AppendAcceptedCount" : "0",
>       "KafkaEmptyCount" : "0",
>       "AppendReceivedCount" : "0",
>       "KafkaEventGetTimer" : "18046",
>       "EventAcceptedCount" : "0",
>       "StartTime" : "1454702138033",
>       "StopTime" : "0",
>       "KafkaCommitTimer" : "0",
>       "Type" : "SOURCE",
>       "AppendBatchAcceptedCount" : "0",
>       "EventReceivedCount" : "0",
>       "OpenConnectionCount" : "0"
>    },
>    "SINK.hdfs-sink-kafka" : {
>       "ConnectionCreatedCount" : "0",
>       "EventDrainAttemptCount" : "0",
>       "BatchCompleteCount" : "0",
>       "StartTime" : "1454702136714",
>       "Type" : "SINK",
>       "EventDrainSuccessCount" : "0",
>       "StopTime" : "0",
>       "BatchUnderflowCount" : "0",
>       "ConnectionFailedCount" : "0",
>       "BatchEmptyCount" : "13",
>       "ConnectionClosedCount" : "0"
>    }
> }
>
>
> From: Gonzalo Herreros <[email protected]>
> Reply-To: <[email protected]>
> Date: Thursday, February 4, 2016 at 11:15 PM
> To: user <[email protected]>
> Subject: Re: KafkaSource not picking up any messages
>
> I'm concerned with the warning "no brokers found when trying to rebalance"
> Double check that the path in zookeeper is correct zk01:2181/mesos-kafka
> and it's not the standard /kafka
>
> When you connect with the kafka-console-consumer, do you specify
> /mesos-kafka or just zk01:2181?
> You can use the zkclient tool to check if there are brokers currently
> registered under that path for the topic "test"
>
> Regards,
> Gonzalo
>
>
> On 4 February 2016 at 21:16, Justin Ryan <[email protected]> wrote:
>
>> Hiya folks,
>>
>> I’m setting up a new environment with Kafka, Flume, and HDFS, and have
>> implemented the simplest possible testing configuration I can come up
>> with. It logs successfully configuring and starting the KafkaSource, and
>> with kafka tools I can confirm that messages have been sent, but the JSON
>> Metrics from Flume show 0 messages processed.
>>
>> Are there any more tools at my disposal to investigate? Any assistance
>> would be greatly appreciated!
>>
>> My config and log:
>>
>> —
>> # generated by Chef for mesos10, changes will be overwritten
>>
>> flume1.sources=kafka-source-test
>> flume1.channels=hdfs-channel-kafka
>> flume1.sinks=hdfs-sink-kafka
>>
>> flume1.sources.kafka-source-test.type=org.apache.flume.source.kafka.KafkaSource
>> flume1.sources.kafka-source-test.zookeeperConnect=zk01:2181/mesos-kafka
>> flume1.sources.kafka-source-test.topic=test
>> flume1.sources.kafka-source-test.groupId=flume
>> flume1.sources.kafka-source-test.interceptors=i1
>> flume1.sources.kafka-source-test.interceptors.i1.type=timestamp
>> flume1.sources.kafka-source-test.consumer.timeout.ms=100
>> flume1.sources.kafka-source-test.channels=hdfs-channel-kafka
>> flume1.channels.hdfs-channel-kafka.type=memory
>> flume1.sinks.hdfs-sink-kafka.channel=hdfs-channel-kafka
>> flume1.sinks.hdfs-sink-kafka.type=hdfs
>> flume1.sinks.hdfs-sink-kafka.hdfs.path=/tmp/kafka/%{topic}/%y-%m-%d
>> flume1.sinks.hdfs-sink-kafka.hdfs.rollInterval=5
>> flume1.sinks.hdfs-sink-kafka.hdfs.rollCount=0
>> flume1.sinks.hdfs-sink-kafka.hdfs.rollSize=0
>> flume1.sinks.hdfs-sink-kafka.hdfs.fileType=DataStream
>> flume1.channels.hdfs-channel-kafka.capacity=10
>> flume1.channels.hdfs-channel-kafka.transactionCapacity=10
>> —
>>
>> Startup log (less incredibly long path lines):
>> —
>> 16/02/04 11:32:07 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
>> 16/02/04 11:32:07 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:/etc/flume/conf.chef/flume.conf
>> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
>> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
>> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
>> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
>> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Added sinks: hdfs-sink-kafka Agent: flume1
>> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
>> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
>> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
>> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [flume1]
>> 16/02/04 11:32:07 INFO node.AbstractConfigurationProvider: Creating channels
>> 16/02/04 11:32:07 INFO channel.DefaultChannelFactory: Creating instance of channel hdfs-channel-kafka type memory
>> 16/02/04 11:32:07 INFO node.AbstractConfigurationProvider: Created channel hdfs-channel-kafka
>> 16/02/04 11:32:07 INFO source.DefaultSourceFactory: Creating instance of source kafka-source-test, type org.apache.flume.source.kafka.KafkaSource
>> 16/02/04 11:32:07 INFO kafka.KafkaSourceUtil: context={ parameters:{interceptors.i1.type=timestamp, zookeeperConnect=zk01:2181/mesos-kafka, channels=hdfs-channel-kafka, groupId=flume, consumer.timeout.ms=100, topic=test, type=org.apache.flume.source.kafka.KafkaSource, interceptors=i1} }
>> 16/02/04 11:32:07 INFO sink.DefaultSinkFactory: Creating instance of sink: hdfs-sink-kafka, type: hdfs
>> 16/02/04 11:32:07 INFO node.AbstractConfigurationProvider: Channel hdfs-channel-kafka connected to [kafka-source-test, hdfs-sink-kafka]
>> 16/02/04 11:32:07 INFO node.Application: Starting new configuration:{ sourceRunners:{kafka-source-test=PollableSourceRunner: { source:org.apache.flume.source.kafka.KafkaSource{name:kafka-source-test,state:IDLE} counterGroup:{ name:null counters:{} } }} sinkRunners:{hdfs-sink-kafka=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@2f33f35e counterGroup:{ name:null counters:{} } }} channels:{hdfs-channel-kafka=org.apache.flume.channel.MemoryChannel{name: hdfs-channel-kafka}} }
>> 16/02/04 11:32:07 INFO node.Application: Starting Channel hdfs-channel-kafka
>> 16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: hdfs-channel-kafka: Successfully registered new MBean.
>> 16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: hdfs-channel-kafka started
>> 16/02/04 11:32:07 INFO node.Application: Starting Sink hdfs-sink-kafka
>> 16/02/04 11:32:07 INFO node.Application: Starting Source kafka-source-test
>> 16/02/04 11:32:07 INFO kafka.KafkaSource: Starting org.apache.flume.source.kafka.KafkaSource{name:kafka-source-test,state:IDLE}...
>> 16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: hdfs-sink-kafka: Successfully registered new MBean.
>> 16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: hdfs-sink-kafka started
>> 16/02/04 11:32:07 INFO mortbay.log: Logging to org.slf4j.impl.Log4jLoggerAdapter(org.mortbay.log) via org.mortbay.log.Slf4jLog
>> 16/02/04 11:32:07 INFO mortbay.log: jetty-6.1.26.cloudera.4
>> 16/02/04 11:32:07 INFO mortbay.log: Started [email protected]:34545
>> 16/02/04 11:32:08 INFO utils.VerifiableProperties: Verifying properties
>> 16/02/04 11:32:08 INFO utils.VerifiableProperties: Property auto.commit.enable is overridden to false
>> 16/02/04 11:32:08 INFO utils.VerifiableProperties: Property consumer.timeout.ms is overridden to 10
>> 16/02/04 11:32:08 INFO utils.VerifiableProperties: Property group.id is overridden to flume
>> 16/02/04 11:32:08 INFO utils.VerifiableProperties: Property zookeeper.connect is overridden to zk01:2181/mesos-kafka
>> 16/02/04 11:32:08 INFO consumer.ZookeeperConsumerConnector: [flume_mesos10-1454614328204-ca8a74df], Connecting to zookeeper instance at zk01:2181/mesos-kafka
>> 16/02/04 11:32:08 INFO zkclient.ZkEventThread: Starting ZkClient event thread.
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:zookeeper.version=3.4.5-946--1, built on 05/18/2015 19:03 GMT
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:host.name=mesos10
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:java.version=1.8.0_72-internal
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:java.vendor=Oracle Corporation
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:java.home=/usr/lib/jvm/java-8-openjdk-amd64/jre
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:java.io.tmpdir=/tmp
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:java.compiler=<NA>
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:os.name=Linux
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:os.arch=amd64
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:os.version=3.13.0-63-generic
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:user.name=marathon
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:user.home=/opt/marathon
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Initiating client connection, connectString=zk01:2181/mesos-kafka sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@2e1b7b98
>> 16/02/04 11:32:08 INFO zookeeper.ClientCnxn: Opening socket connection to server 10.100.6.251/10.100.6.251:2181. Will not attempt to authenticate using SASL (unknown error)
>> 16/02/04 11:32:08 INFO zookeeper.ClientCnxn: Socket connection established to 10.100.6.251/10.100.6.251:2181, initiating session
>> 16/02/04 11:32:08 INFO zookeeper.ClientCnxn: Session establishment complete on server 10.100.6.251/10.100.6.251:2181, sessionid = 0x152858b1cc07491, negotiated timeout = 6000
>> 16/02/04 11:32:08 INFO zkclient.ZkClient: zookeeper state changed (SyncConnected)
>> 16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector: [flume_mesos10-1454614328204-ca8a74df], begin registering consumer flume_mesos10-1454614328204-ca8a74df in ZK
>> 16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector: [flume_mesos10-1454614328204-ca8a74df], end registering consumer flume_mesos10-1454614328204-ca8a74df in ZK
>> 16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector: [flume_mesos10-1454614328204-ca8a74df], starting watcher executor thread for consumer flume_mesos10-1454614328204-ca8a74df
>> 16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector: [flume_mesos10-1454614328204-ca8a74df], begin rebalancing consumer flume_mesos10-1454614328204-ca8a74df try #0
>> 16/02/04 11:32:09 WARN consumer.ZookeeperConsumerConnector: [flume_mesos10-1454614328204-ca8a74df], no brokers found when trying to rebalance.
>> 16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector: [flume_mesos10-1454614328204-ca8a74df], end rebalancing consumer flume_mesos10-1454614328204-ca8a74df try #0
>> 16/02/04 11:32:09 INFO kafka.KafkaSource: Kafka source kafka-source-test do started.
>> 16/02/04 11:32:09 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: kafka-source-test: Successfully registered new MBean.
>> 16/02/04 11:32:09 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: kafka-source-test started
>> --
>>
>> --
>> Justin Alan Ryan
>> Sr. Systems / Release Engineer
>> ZipRealty
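
One concrete way to run the check Gonzalo suggests is with the ZooKeeper CLI. A sketch only: zkCli.sh ships with ZooKeeper (Kafka's zookeeper-shell.sh works the same way), and which of the two paths below actually holds the broker ids depends on where mesos-kafka registered them:

$ zkCli.sh -server zk01:2181 ls /brokers/ids              # brokers registered under the root path '/'
$ zkCli.sh -server zk01:2181 ls /mesos-kafka/brokers/ids  # brokers under the chroot the Flume source uses
$ zkCli.sh -server zk01:2181 get /brokers/ids/0           # host/port details for broker id 0, if it exists

If the ids only show up under the root path, the source's zookeeperConnect (currently zk01:2181/mesos-kafka) is pointing one level too deep, which would explain the "no brokers found when trying to rebalance" warning.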
