Okay, here is the amended code:
import java.util.Calendar;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Logger;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaMessageStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.serializer.DefaultDecoder;
public class KafkaTestConsumer {
/**
* @param args
*/
public static void main(final String[] args) {
try {
final Logger logger = Logger.getLogger("KafkaTestConsumer");
// specify some consumer properties
final Properties props = new Properties();
props.put("zk.connect", "testserver:2181");
props.put("zk.connectiontimeout.ms", "1000000");
props.put("groupid", "test_group");
// Create the connection to the cluster
final ConsumerConfig consumerConfig = new ConsumerConfig(props);
final ConsumerConnector consumerConnector =
Consumer.createJavaConsumerConnector(consumerConfig);
// create 4 partitions of the stream for topic “testTopic”, to allow
// 4 threads to consume
final String topicName = "testTopic";
final int numStreams = 4;
List<KafkaMessageStream<Message>> streams = null;
try {
final Map<String, List<KafkaMessageStream<Message>>> topicMessageStreams =
consumerConnector.createMessageStreams(
Collections.singletonMap(topicName, numStreams), new DefaultDecoder());
streams = topicMessageStreams.get(topicName);
} catch (final Exception e) {
logger.severe(e.getMessage());
}
// create list of 4 threads to consume from each of the partitions
final ExecutorService executor =
Executors.newFixedThreadPool(numStreams);
// consume the messages in the threads
for (final KafkaMessageStream<Message> stream : streams) {
executor.submit(new Runnable() {
@Override
public void run() {
try {
while (true) {
logger.severe(Calendar.getInstance().getTime().toString());
if (stream == null) {
logger.severe("stream is NULL.");
} else {
logger.severe("stream = " + stream);
for (final Message message : stream) {
logger.severe("!");
logger.severe(message.toString());
}
}
}
} catch (final Throwable t) {
logger.severe("In run " + t.getMessage());
} finally {
if (stream == null) {
logger.severe("stream is NULL.");
} else {
logger.severe("stream = " + stream);
}
}
}
});
}
} catch (final Throwable t) {
System.err.println("In main: " + t.getMessage());
}
}
}
Interesting things happen. The time is printed only once for each stream.
If I use Eclipse's debugger with a breakpoint on the line "if
(stream == null) {" inside the "while (true) {" loop, the variable stream
shows "<error(s)_during_the_evaluation>" as its value. If I step over this
line, I'm taken into the else clause, but the logger call never executes;
the thread seems to die when it references the stream value. So I got to
thinking the problem might be a missing dependency somewhere, or maybe a
conflict. Here are the dependencies I have in that project's pom (the
project has other pieces):
<dependencies>
<!-- Import the JMS API, we use provided scope as the API is
included in JBoss AS 7 -->
<dependency>
<groupId>org.jboss.spec.javax.jms</groupId>
<artifactId>jboss-jms-api_1.1_spec</artifactId>
<scope>provided</scope>
</dependency>
<!-- Import the JCA API, we use provided scope as the API is
included in JBoss AS 7 -->
<dependency>
<groupId>org.jboss.spec.javax.resource</groupId>
<artifactId>jboss-connector-api_1.6_spec</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.incubator.kafka</groupId>
<artifactId>kafka</artifactId>
<version>0.7.0-incubating</version>
</dependency>
</dependencies>
And here is the pom I'm using for kafka:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.incubator.kafka</groupId>
<artifactId>kafka</artifactId>
<packaging>pom</packaging>
<version>0.7.0-incubating</version>
<name>Apache Kafka</name>
<description>Apache Kafka is a distributed publish-subscribe messaging
system</description>
<url>http://incubator.apache.org/kafka</url>
<inceptionYear>2012</inceptionYear>
<licenses>
<license>
<name>The Apache Software License, Version 2.0</name>
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
</license>
</licenses>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>com.github.sgroschupf</groupId>
<artifactId>zkclient</artifactId>
<version>0.1</version>
</dependency>
</dependencies>
<scm>
<connection>scm:svn:http://svn.apache.org/repos/asf/incubator/kafka/trunk</connection>
<developerConnection>scm:svn:http://svn.apache.org/repos/asf/incubator/kafka/trunk</developerConnection>
<url>http://svn.apache.org/repos/asf/incubator/kafka/trunk</url>
</scm>
</project>
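To test the missing-dependency theory without the debugger, something like the following rough sketch could be run with the same classpath as the consumer. It only checks whether a few representative classes can be located. The class ClasspathCheck and its findMissing helper are made up for illustration, and picking scala.Option and org.I0Itec.zkclient.ZkClient as stand-ins for the scala-library and zkclient jars is my assumption, not something taken from the Kafka documentation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ClasspathCheck {

    /** Returns the entries of classNames that cannot be loaded from the current classpath. */
    static List<String> findMissing(final List<String> classNames) {
        final List<String> missing = new ArrayList<String>();
        for (final String name : classNames) {
            try {
                // initialize=false: only locate and load the class, do not run static initializers
                Class.forName(name, false, ClasspathCheck.class.getClassLoader());
            } catch (final ClassNotFoundException e) {
                missing.add(name);
            } catch (final LinkageError e) {
                // e.g. NoClassDefFoundError when the class file is found but a
                // superclass or interface it needs is not
                missing.add(name + " (" + e + ")");
            }
        }
        return missing;
    }

    public static void main(final String[] args) {
        final List<String> probes = Arrays.asList(
                "kafka.consumer.Consumer",        // imported by the consumer code above
                "scala.Option",                   // assumed stand-in for scala-library
                "org.I0Itec.zkclient.ZkClient");  // assumed stand-in for zkclient
        System.out.println("Not loadable: " + findMissing(probes));
    }
}
```

An empty list would not prove the dependencies are complete or conflict-free, since it says nothing about version mismatches. A non-empty list would at least point at a jar that never made it onto the runtime classpath.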
Again, any input would be greatly appreciated. Thanks.
On Tue, May 8, 2012 at 6:34 PM, Jun Rao <[email protected]> wrote:
> Could you put consumer run() code in try/catch and log all throwables?
> Executors can eat exceptions.
>
> Thanks,
>
> Jun
>
> On Tue, May 8, 2012 at 4:08 PM, lessonz <[email protected]> wrote:
>
> > After trying and failing to get a more complicated consumer working, I
> > decided to start at square one and use the example code. Below is my
> > barely modified implementation:
> >
> > import java.util.Collections;
> > import java.util.List;
> > import java.util.Map;
> > import java.util.Properties;
> > import java.util.concurrent.ExecutorService;
> > import java.util.concurrent.Executors;
> > import java.util.logging.Logger;
> >
> > import kafka.consumer.Consumer;
> > import kafka.consumer.ConsumerConfig;
> > import kafka.consumer.KafkaMessageStream;
> > import kafka.javaapi.consumer.ConsumerConnector;
> > import kafka.message.Message;
> > import kafka.serializer.DefaultDecoder;
> >
> > public class KafkaTestConsumer {
> >
> > /**
> > * @param args
> > */
> > public static void main(final String[] args) {
> > final Logger logger = Logger.getLogger("KafkaTestConsumer");
> >
> > // specify some consumer properties
> > final Properties props = new Properties();
> > props.put("zk.connect", "testserver:2181");
> > props.put("zk.connectiontimeout.ms", "1000000");
> > props.put("groupid", "test_group");
> >
> > // Create the connection to the cluster
> > final ConsumerConfig consumerConfig = new ConsumerConfig(props);
> > final ConsumerConnector consumerConnector =
> > Consumer.createJavaConsumerConnector(consumerConfig);
> >
> > // create 4 partitions of the stream for topic “testTopic”, to allow
> > // 4 threads to consume
> > final String topicName = "testTopic";
> > final int numStreams = 4;
> > List<KafkaMessageStream<Message>> streams = null;
> > try {
> > final Map<String, List<KafkaMessageStream<Message>>> topicMessageStreams =
> > consumerConnector.createMessageStreams(
> > Collections.singletonMap(topicName, numStreams), new DefaultDecoder());
> > streams = topicMessageStreams.get(topicName);
> > } catch (final Exception e) {
> > logger.severe(e.getMessage());
> > }
> >
> > // create list of 4 threads to consume from each of the partitions
> > final ExecutorService executor =
> > Executors.newFixedThreadPool(numStreams);
> >
> > // consume the messages in the threads
> > for (final KafkaMessageStream<Message> stream : streams) {
> > executor.submit(new Runnable() {
> > @Override
> > public void run() {
> > for (final Message message : stream) {
> > logger.severe("!");
> > logger.severe(message.toString());
> > }
> > }
> > });
> > }
> > }
> >
> > }
> >
> > It runs, I get no errors, and nothing happens. I don't get any messages.
> > I don't THINK it's an issue with my Kafka install for two reasons: 1.
> > Zookeeper logs my client connection. 2. (Granted it's all on localhost,
> > but) when I used the console consumer and producer on the instance, they
> > seemed to work just fine.
> >
> > Methodology is to start Zookeeper, start Kafka, start the above
> > application, and then connect a console producer to generate messages.
> > I'm really at a loss as to what's happening. Interestingly, if I put in
> > breakpoints, I seem to lose a handle, as I eventually lose the ability to
> > step over, step into, and so on.
> >
> > I'd really appreciate any input.
> >
> > Cheers.
> >
>
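As a footnote to Jun's remark above that executors can eat exceptions, here is a small standalone sketch of the effect, using only java.util.concurrent and no Kafka classes. The class name SwallowedExceptionDemo, the helper runFailingTask and the simulated failure are all made up for illustration. A task handed to submit() that throws produces no output by itself. The failure is stored in the returned Future and only shows up when get() is called, or when run() catches and logs it itself, as the amended consumer does.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SwallowedExceptionDemo {

    /** Submits a task that always fails and returns the failure recovered from its Future. */
    static Throwable runFailingTask() {
        final ExecutorService executor = Executors.newFixedThreadPool(1);
        try {
            // submit() stores anything the task throws inside the returned Future;
            // nothing is printed or logged unless someone asks the Future for its result.
            final Future<?> future = executor.submit(new Runnable() {
                @Override
                public void run() {
                    throw new IllegalStateException("simulated consumer failure");
                }
            });
            try {
                future.get();
                return null; // the task finished normally
            } catch (final ExecutionException e) {
                return e.getCause(); // the original exception thrown inside run()
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                return e;
            }
        } finally {
            executor.shutdown();
        }
    }

    public static void main(final String[] args) {
        System.out.println("Recovered from Future: " + runFailingTask());
    }
}
```

In the consumer above the Futures returned by submit() are discarded, so the try/catch inside run() is the only place a failure can become visible. Passing the Throwable itself to the logger, for example with logger.log(Level.SEVERE, "In run", t), would also record the stack trace rather than just getMessage().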