Hello,
I am new to Flink and trying to learn this framework. Seems great so far. I
am trying to translate my existing storm Topology to a Flink job and I am
having trouble consuming data from Kafka. Here's what my Job looks like:
public static void main(String[] args) throws Exception {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "hostname:port");
properties.setProperty("group.id", "stream-test");
properties.setProperty("client.id", "test-flink");
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource kafkaStream = env.addSource(new
FlinkKafkaConsumer09<>("test", new SimpleStringSchema(), properties));
kafkaStream.addSink(new StringLogSink());
env.execute();
}
There are messages being sent to Kafka on that topic, I just never see anything
in Flink. Any help/insight you could provide would be greatly appreciated. If
it makes a difference this is running on YARN. Also, here's what I see in the
logs:
2016-04-26 18:02:38,707 INFO org.apache.kafka.common.utils.AppInfoParser
- Kafka version : 0.9.0.1
2016-04-26 18:02:38,707 INFO org.apache.kafka.common.utils.AppInfoParser
- Kafka commitId : 23c69d62a0cabf06
2016-04-26 18:02:38,708 INFO
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 - Trying to
get partitions for topic test
2016-04-26 18:02:38,854 INFO
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 - Got 1
partitions from these topics: [test]
2016-04-26 18:02:38,854 INFO
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer
is going to read the following topics (with number of partitions): test (1),
2016-04-26 18:02:38,933 INFO org.apache.flink.yarn.YarnJobManager
- Submitting job 0ab4248d8917e707a8f297420e4c564d ().
2016-04-26 18:02:38,934 INFO org.apache.flink.yarn.YarnJobManager
- Using restart strategy NoRestartStrategy for
0ab4248d8917e707a8f297420e4c564d.
2016-04-26 18:02:38,935 INFO org.apache.flink.yarn.YarnJobManager
- Scheduling job 0ab4248d8917e707a8f297420e4c564d ().
2016-04-26 18:02:38,935 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: Custom
Source -> Sink: Unnamed (1/1) (9bd0abdf59a1af934527257a990910ff) switched from
CREATED to SCHEDULED
2016-04-26 18:02:38,935 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: Custom
Source -> Sink: Unnamed (1/1) (9bd0abdf59a1af934527257a990910ff) switched from
SCHEDULED to DEPLOYING
2016-04-26 18:02:38,935 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph- Deploying
Source: Custom Source -> Sink: Unnamed (1/1) (attempt #0) to ip-10-167-233-231
2016-04-26 18:02:38,936 INFO org.apache.flink.yarn.YarnJobManager
- Status of job 0ab4248d8917e707a8f297420e4c564d () changed to
RUNNING.
2016-04-26 18:02:39,151 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: Custom
Source -> Sink: Unnamed (1/1) (9bd0abdf59a1af934527257a990910ff) switched from
DEPLOYING to RUNNING
Thanks,
Josh