This was actually generating errors. The memory channel had been configured with a very low capacity, on the assumption that this would force it to flush more often, and that has now been fixed.
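For anyone hitting the same symptom, here is a minimal sketch of how it can be spotted in the JSON that Flume's metrics endpoint returns. The counter names and values are copied from the metrics quoted further down this thread; the `diagnose` function and its warning wording are hypothetical helpers, not part of Flume.

```python
import json


def diagnose(metrics):
    """Return warnings for sources and channels that see traffic but keep none of it."""
    warnings = []
    for name, counters in metrics.items():
        kind = counters.get("Type")
        if kind == "SOURCE":
            received = int(counters.get("EventReceivedCount", 0))
            accepted = int(counters.get("EventAcceptedCount", 0))
            if received > 0 and accepted == 0:
                warnings.append(f"{name}: received {received} events, accepted 0")
        elif kind == "CHANNEL":
            attempts = int(counters.get("EventPutAttemptCount", 0))
            successes = int(counters.get("EventPutSuccessCount", 0))
            if attempts > 0 and successes == 0:
                capacity = counters.get("ChannelCapacity", "?")
                warnings.append(
                    f"{name}: {attempts} put attempts, 0 successes (capacity {capacity})"
                )
    return warnings


# Subset of the counters reported in this thread on February 8.
sample = json.loads("""
{
  "CHANNEL.hdfs-channel-kafka": {
    "Type": "CHANNEL",
    "ChannelCapacity": "10",
    "EventPutAttemptCount": "1826",
    "EventPutSuccessCount": "0"
  },
  "SOURCE.kafka-source-test": {
    "Type": "SOURCE",
    "EventReceivedCount": "166000",
    "EventAcceptedCount": "0"
  }
}
""")

for warning in diagnose(sample):
    print(warning)
```

Both warnings fire for the metrics in this thread: the source keeps receiving events while the channel never completes a put.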
Now I'm on to a challenge I think I understand just fine, or fine enough: HDFS permissions.

Thanks, Gonzalo, for the input and for being a sounding board!

Justin

From: Justin Ryan <[email protected]>
Date: Monday, February 8, 2016 at 11:52 AM
To: <[email protected]>
Subject: Re: KafkaSource not picking up any messages

So it looks like Flume is building against Kafka 2.10-0.8.1.1, and I'm using 2.10-0.8.2.2, which kafka-mesos leans on. Any likelihood of this being incompatible?

I did notice this morning that Flume has an "EventReceivedCount" that increments 1-2k at a time and is always a round number, but the EventAcceptedCount is 0. When I pipe dmesg to the console producer and then try to read it with the console consumer, I have to tell it to start from the beginning, so it seems like Flume may be reading events and moving the marker, but not doing anything with them.

{
   "CHANNEL.hdfs-channel-kafka" : {
      "EventTakeAttemptCount" : "117",
      "EventTakeSuccessCount" : "0",
      "StartTime" : "1454959470045",
      "ChannelFillPercentage" : "0.0",
      "ChannelSize" : "0",
      "EventPutSuccessCount" : "0",
      "Type" : "CHANNEL",
      "EventPutAttemptCount" : "1826",
      "ChannelCapacity" : "10",
      "StopTime" : "0"
   },
   "SOURCE.kafka-source-test" : {
      "StartTime" : "1454959471241",
      "OpenConnectionCount" : "0",
      "AppendBatchReceivedCount" : "0",
      "EventReceivedCount" : "166000",
      "KafkaCommitTimer" : "0",
      "AppendBatchAcceptedCount" : "0",
      "AppendReceivedCount" : "0",
      "StopTime" : "0",
      "KafkaEventGetTimer" : "17114",
      "KafkaEmptyCount" : "0",
      "EventAcceptedCount" : "0",
      "Type" : "SOURCE",
      "AppendAcceptedCount" : "0"
   },
   "SINK.hdfs-sink-kafka" : {
      "ConnectionFailedCount" : "0",
      "StartTime" : "1454959470054",
      "BatchCompleteCount" : "0",
      "ConnectionClosedCount" : "0",
      "EventDrainSuccessCount" : "0",
      "BatchEmptyCount" : "117",
      "BatchUnderflowCount" : "0",
      "Type" : "SINK",
      "ConnectionCreatedCount" : "0",
      "StopTime" : "0",
      "EventDrainAttemptCount" : "0"
   }
}

--

From: Gonzalo Herreros <[email protected]>
Reply-To: <[email protected]>
Date: Monday, February 8, 2016 at 12:29 AM
To: user <[email protected]>
Subject: Re: KafkaSource not picking up any messages

The only thing I can think of is that the Kafka client included in Flume is not compatible with the Kafka version on the brokers (there have been a lot of changes recently), but normally you get errors when that happens.

On 5 February 2016 at 20:02, Justin Ryan <[email protected]> wrote:
> Thanks, Gonzalo, that def helped!
>
> This also ties into an issue I'd raised with mesos-kafka where the zk path
> seemed to be ignored, and I now see that there is a node that stores the
> mesos-kafka scheduler config, and the kafka path must be specified
> separately, so is currently /.
>
> Still not reading events, but definitely looks better in startup log:
>
> 16/02/05 11:55:38 INFO kafka.KafkaSource: Kafka source kafka-source-test do started.
> 16/02/05 11:55:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: [flume_mesos04-1454702137146-6cd63609-leader-finder-thread], Starting
> 16/02/05 11:55:38 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: kafka-source-test: Successfully registered new MBean.
> 16/02/05 11:55:38 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: kafka-source-test started
> 16/02/05 11:55:38 INFO utils.VerifiableProperties: Verifying properties
> 16/02/05 11:55:38 INFO utils.VerifiableProperties: Property client.id is overridden to flume
> 16/02/05 11:55:38 INFO utils.VerifiableProperties: Property metadata.broker.list is overridden to mesos01:31000,mesos02:31000,mesos08:31000
> 16/02/05 11:55:38 INFO utils.VerifiableProperties: Property request.timeout.ms is overridden to 30000
> 16/02/05 11:55:38 INFO client.ClientUtils$: Fetching metadata from broker id:1,host:mesos02,port:31000 with correlation id 0 for 1 topic(s) Set(home_views)
> 16/02/05 11:55:38 INFO producer.SyncProducer: Connected to mesos02:31000 for producing
> 16/02/05 11:55:38 INFO producer.SyncProducer: Disconnecting from mesos02:31000
> 16/02/05 11:55:38 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-flume_mesos04-1454702137146-6cd63609-0-0], Starting
> 16/02/05 11:55:38 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1454702137389] Added fetcher for partitions ArrayBuffer([[home_views,0], initOffset -1 to broker id:0,host:mesos01,port:31000] )
>
> $ curl http://mesos04:34545/metrics | json_pp
>   % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
>                                  Dload  Upload   Total   Spent    Left  Speed
> 100   925    0   925    0     0   7741      0 --:--:-- --:--:-- --:--:--  7773
> {
>    "CHANNEL.hdfs-channel-kafka" : {
>       "ChannelCapacity" : "10",
>       "StartTime" : "1454702136681",
>       "EventTakeSuccessCount" : "0",
>       "ChannelFillPercentage" : "0.0",
>       "EventPutAttemptCount" : "0",
>       "EventTakeAttemptCount" : "14",
>       "StopTime" : "0",
>       "ChannelSize" : "0",
>       "EventPutSuccessCount" : "0",
>       "Type" : "CHANNEL"
>    },
>    "SOURCE.kafka-source-test" : {
>       "AppendBatchReceivedCount" : "0",
>       "AppendAcceptedCount" : "0",
>       "KafkaEmptyCount" : "0",
>       "AppendReceivedCount" : "0",
>       "KafkaEventGetTimer" : "18046",
>       "EventAcceptedCount" : "0",
>       "StartTime" : "1454702138033",
>       "StopTime" : "0",
>       "KafkaCommitTimer" : "0",
>       "Type" : "SOURCE",
>       "AppendBatchAcceptedCount" : "0",
>       "EventReceivedCount" : "0",
>       "OpenConnectionCount" : "0"
>    },
>    "SINK.hdfs-sink-kafka" : {
>       "ConnectionCreatedCount" : "0",
>       "EventDrainAttemptCount" : "0",
>       "BatchCompleteCount" : "0",
>       "StartTime" : "1454702136714",
>       "Type" : "SINK",
>       "EventDrainSuccessCount" : "0",
>       "StopTime" : "0",
>       "BatchUnderflowCount" : "0",
>       "ConnectionFailedCount" : "0",
>       "BatchEmptyCount" : "13",
>       "ConnectionClosedCount" : "0"
>    }
> }
>
> From: Gonzalo Herreros <[email protected]>
> Reply-To: <[email protected]>
> Date: Thursday, February 4, 2016 at 11:15 PM
> To: user <[email protected]>
> Subject: Re: KafkaSource not picking up any messages
>
> I'm concerned with the warning "no brokers found when trying to rebalance".
> Double check that the path in zookeeper is correct, zk01:2181/mesos-kafka,
> and it's not the standard /kafka.
>
> When you connect with the kafka-console-consumer, do you specify
> /mesos-kafka or just zk01:2181?
> You can use the zkclient tool to check if there are brokers currently
> registered under that path for the topic "test".
>
> Regards,
> Gonzalo
>
> On 4 February 2016 at 21:16, Justin Ryan <[email protected]> wrote:
>> Hiya folks,
>>
>> I'm setting up a new environment with Kafka, Flume, and HDFS, and have
>> implemented the simplest possible testing configuration I can come up with.
>> It logs successfully configuring and starting the KafkaSource, and with
>> kafka tools I can confirm that messages have been sent, but the JSON
>> Metrics from Flume show 0 messages processed.
>>
>> Are there any more tools at my disposal to investigate? Any assistance
>> would be greatly appreciated!
>>
>> My config and log:
>>
>> # generated by Chef for mesos10, changes will be overwritten
>>
>> flume1.sources=kafka-source-test
>> flume1.channels=hdfs-channel-kafka
>> flume1.sinks=hdfs-sink-kafka
>> flume1.sources.kafka-source-test.type=org.apache.flume.source.kafka.KafkaSource
>> flume1.sources.kafka-source-test.zookeeperConnect=zk01:2181/mesos-kafka
>> flume1.sources.kafka-source-test.topic=test
>> flume1.sources.kafka-source-test.groupId=flume
>> flume1.sources.kafka-source-test.interceptors=i1
>> flume1.sources.kafka-source-test.interceptors.i1.type=timestamp
>> flume1.sources.kafka-source-test.consumer.timeout.ms=100
>> flume1.sources.kafka-source-test.channels=hdfs-channel-kafka
>> flume1.channels.hdfs-channel-kafka.type=memory
>> flume1.sinks.hdfs-sink-kafka.channel=hdfs-channel-kafka
>> flume1.sinks.hdfs-sink-kafka.type=hdfs
>> flume1.sinks.hdfs-sink-kafka.hdfs.path=/tmp/kafka/%{topic}/%y-%m-%d
>> flume1.sinks.hdfs-sink-kafka.hdfs.rollInterval=5
>> flume1.sinks.hdfs-sink-kafka.hdfs.rollCount=0
>> flume1.sinks.hdfs-sink-kafka.hdfs.rollSize=0
>> flume1.sinks.hdfs-sink-kafka.hdfs.fileType=DataStream
>> flume1.channels.hdfs-channel-kafka.capacity=10
>> flume1.channels.hdfs-channel-kafka.transactionCapacity=10
>>
>> Startup log (less incredibly long path lines):
>>
>> 16/02/04 11:32:07 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
>> 16/02/04 11:32:07 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:/etc/flume/conf.chef/flume.conf
>> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
>> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
>> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
>> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
>> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Added sinks: hdfs-sink-kafka Agent: flume1
>> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
>> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
>> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
>> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [flume1]
>> 16/02/04 11:32:07 INFO node.AbstractConfigurationProvider: Creating channels
>> 16/02/04 11:32:07 INFO channel.DefaultChannelFactory: Creating instance of channel hdfs-channel-kafka type memory
>> 16/02/04 11:32:07 INFO node.AbstractConfigurationProvider: Created channel hdfs-channel-kafka
>> 16/02/04 11:32:07 INFO source.DefaultSourceFactory: Creating instance of source kafka-source-test, type org.apache.flume.source.kafka.KafkaSource
>> 16/02/04 11:32:07 INFO kafka.KafkaSourceUtil: context={ parameters:{interceptors.i1.type=timestamp, zookeeperConnect=zk01:2181/mesos-kafka, channels=hdfs-channel-kafka, groupId=flume, consumer.timeout.ms=100, topic=test, type=org.apache.flume.source.kafka.KafkaSource, interceptors=i1} }
>> 16/02/04 11:32:07 INFO sink.DefaultSinkFactory: Creating instance of sink: hdfs-sink-kafka, type: hdfs
>> 16/02/04 11:32:07 INFO node.AbstractConfigurationProvider: Channel hdfs-channel-kafka connected to [kafka-source-test, hdfs-sink-kafka]
>> 16/02/04 11:32:07 INFO node.Application: Starting new configuration:{ sourceRunners:{kafka-source-test=PollableSourceRunner: { source:org.apache.flume.source.kafka.KafkaSource{name:kafka-source-test,state:IDLE} counterGroup:{ name:null counters:{} } }} sinkRunners:{hdfs-sink-kafka=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@2f33f35e counterGroup:{ name:null counters:{} } }} channels:{hdfs-channel-kafka=org.apache.flume.channel.MemoryChannel{name: hdfs-channel-kafka}} }
>> 16/02/04 11:32:07 INFO node.Application: Starting Channel hdfs-channel-kafka
>> 16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: hdfs-channel-kafka: Successfully registered new MBean.
>> 16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: hdfs-channel-kafka started
>> 16/02/04 11:32:07 INFO node.Application: Starting Sink hdfs-sink-kafka
>> 16/02/04 11:32:07 INFO node.Application: Starting Source kafka-source-test
>> 16/02/04 11:32:07 INFO kafka.KafkaSource: Starting org.apache.flume.source.kafka.KafkaSource{name:kafka-source-test,state:IDLE}...
>> 16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: hdfs-sink-kafka: Successfully registered new MBean.
>> 16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: hdfs-sink-kafka started
>> 16/02/04 11:32:07 INFO mortbay.log: Logging to org.slf4j.impl.Log4jLoggerAdapter(org.mortbay.log) via org.mortbay.log.Slf4jLog
>> 16/02/04 11:32:07 INFO mortbay.log: jetty-6.1.26.cloudera.4
>> 16/02/04 11:32:07 INFO mortbay.log: Started [email protected]:34545
>> 16/02/04 11:32:08 INFO utils.VerifiableProperties: Verifying properties
>> 16/02/04 11:32:08 INFO utils.VerifiableProperties: Property auto.commit.enable is overridden to false
>> 16/02/04 11:32:08 INFO utils.VerifiableProperties: Property consumer.timeout.ms is overridden to 10
>> 16/02/04 11:32:08 INFO utils.VerifiableProperties: Property group.id is overridden to flume
>> 16/02/04 11:32:08 INFO utils.VerifiableProperties: Property zookeeper.connect is overridden to zk01:2181/mesos-kafka
>> 16/02/04 11:32:08 INFO consumer.ZookeeperConsumerConnector: [flume_mesos10-1454614328204-ca8a74df], Connecting to zookeeper instance at zk01:2181/mesos-kafka
>> 16/02/04 11:32:08 INFO zkclient.ZkEventThread: Starting ZkClient event thread.
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:zookeeper.version=3.4.5-946--1, built on 05/18/2015 19:03 GMT
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:host.name=mesos10
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:java.version=1.8.0_72-internal
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:java.vendor=Oracle Corporation
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:java.home=/usr/lib/jvm/java-8-openjdk-amd64/jre
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:java.io.tmpdir=/tmp
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:java.compiler=<NA>
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:os.name=Linux
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:os.arch=amd64
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:os.version=3.13.0-63-generic
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:user.name=marathon
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:user.home=/opt/marathon
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Initiating client connection, connectString=zk01:2181/mesos-kafka sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@2e1b7b98
>> 16/02/04 11:32:08 INFO zookeeper.ClientCnxn: Opening socket connection to server 10.100.6.251/10.100.6.251:2181. Will not attempt to authenticate using SASL (unknown error)
>> 16/02/04 11:32:08 INFO zookeeper.ClientCnxn: Socket connection established to 10.100.6.251/10.100.6.251:2181, initiating session
>> 16/02/04 11:32:08 INFO zookeeper.ClientCnxn: Session establishment complete on server 10.100.6.251/10.100.6.251:2181, sessionid = 0x152858b1cc07491, negotiated timeout = 6000
>> 16/02/04 11:32:08 INFO zkclient.ZkClient: zookeeper state changed (SyncConnected)
>> 16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector: [flume_mesos10-1454614328204-ca8a74df], begin registering consumer flume_mesos10-1454614328204-ca8a74df in ZK
>> 16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector: [flume_mesos10-1454614328204-ca8a74df], end registering consumer flume_mesos10-1454614328204-ca8a74df in ZK
>> 16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector: [flume_mesos10-1454614328204-ca8a74df], starting watcher executor thread for consumer flume_mesos10-1454614328204-ca8a74df
>> 16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector: [flume_mesos10-1454614328204-ca8a74df], begin rebalancing consumer flume_mesos10-1454614328204-ca8a74df try #0
>> 16/02/04 11:32:09 WARN consumer.ZookeeperConsumerConnector: [flume_mesos10-1454614328204-ca8a74df], no brokers found when trying to rebalance.
>> 16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector: [flume_mesos10-1454614328204-ca8a74df], end rebalancing consumer flume_mesos10-1454614328204-ca8a74df try #0
>> 16/02/04 11:32:09 INFO kafka.KafkaSource: Kafka source kafka-source-test do started.
>> 16/02/04 11:32:09 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: kafka-source-test: Successfully registered new MBean.
>> 16/02/04 11:32:09 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: kafka-source-test started
>> --
>>
>> --
>> Justin Alan Ryan
>> Sr. Systems / Release Engineer
>> ZipRealty
>
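The configuration in the original message sets the memory channel's capacity and transactionCapacity to 10. A rough sketch of a pre-flight check for that kind of mismatch follows. It rests on an assumption the thread does not confirm: that the Kafka source writes events to the channel in batches and that the batch size is 1000 when `batchSize` is not set. Check that figure against the Flume documentation for the version in use. `parse_properties` and `undersized_channels` are hypothetical helper names.

```python
# Assumption: batch size used by the source when no batchSize is configured.
ASSUMED_DEFAULT_BATCH_SIZE = 1000


def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def undersized_channels(props, agent):
    """Return (source, channel, batch, transactionCapacity) where a batch cannot fit."""
    problems = []
    for source in props.get(f"{agent}.sources", "").split():
        prefix = f"{agent}.sources.{source}"
        batch = int(props.get(f"{prefix}.batchSize", ASSUMED_DEFAULT_BATCH_SIZE))
        for channel in props.get(f"{prefix}.channels", "").split():
            tx = props.get(f"{agent}.channels.{channel}.transactionCapacity")
            # Only explicit settings are checked; channel defaults are not guessed.
            if tx is not None and int(tx) < batch:
                problems.append((source, channel, batch, int(tx)))
    return problems


# Relevant lines from the configuration posted in this thread.
config = """
flume1.sources=kafka-source-test
flume1.channels=hdfs-channel-kafka
flume1.sources.kafka-source-test.channels=hdfs-channel-kafka
flume1.channels.hdfs-channel-kafka.capacity=10
flume1.channels.hdfs-channel-kafka.transactionCapacity=10
"""

for source, channel, batch, tx in undersized_channels(parse_properties(config), "flume1"):
    print(f"{source} -> {channel}: batch of {batch} exceeds transactionCapacity {tx}")
```

Under the stated assumption, this flags the one source/channel pair in the posted configuration, which is consistent with the fix described at the top of the thread.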
