Hello,

I am new to Flink and trying to learn the framework. It seems great so far. I
am trying to translate my existing Storm topology to a Flink job, and I am
having trouble consuming data from Kafka. Here's what my job looks like:


public static void main(String[] args) throws Exception {
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "hostname:port");
    properties.setProperty("group.id", "stream-test");
    properties.setProperty("client.id", "test-flink");

    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

    DataStreamSource<String> kafkaStream = env.addSource(
        new FlinkKafkaConsumer09<>("test", new SimpleStringSchema(), properties));

    kafkaStream.addSink(new StringLogSink());

    env.execute();
}


Messages are being sent to Kafka on that topic, but I never see anything in
Flink. Any help or insight you could provide would be greatly appreciated. In
case it makes a difference, this is running on YARN. Here's what I see in the
logs:


2016-04-26 18:02:38,707 INFO  org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.9.0.1
2016-04-26 18:02:38,707 INFO  org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 23c69d62a0cabf06
2016-04-26 18:02:38,708 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 - Trying to get partitions for topic test
2016-04-26 18:02:38,854 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 - Got 1 partitions from these topics: [test]
2016-04-26 18:02:38,854 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer is going to read the following topics (with number of partitions): test (1),
2016-04-26 18:02:38,933 INFO  org.apache.flink.yarn.YarnJobManager - Submitting job 0ab4248d8917e707a8f297420e4c564d ().
2016-04-26 18:02:38,934 INFO  org.apache.flink.yarn.YarnJobManager - Using restart strategy NoRestartStrategy for 0ab4248d8917e707a8f297420e4c564d.
2016-04-26 18:02:38,935 INFO  org.apache.flink.yarn.YarnJobManager - Scheduling job 0ab4248d8917e707a8f297420e4c564d ().
2016-04-26 18:02:38,935 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Sink: Unnamed (1/1) (9bd0abdf59a1af934527257a990910ff) switched from CREATED to SCHEDULED
2016-04-26 18:02:38,935 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Sink: Unnamed (1/1) (9bd0abdf59a1af934527257a990910ff) switched from SCHEDULED to DEPLOYING
2016-04-26 18:02:38,935 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Source: Custom Source -> Sink: Unnamed (1/1) (attempt #0) to ip-10-167-233-231
2016-04-26 18:02:38,936 INFO  org.apache.flink.yarn.YarnJobManager - Status of job 0ab4248d8917e707a8f297420e4c564d () changed to RUNNING.
2016-04-26 18:02:39,151 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Sink: Unnamed (1/1) (9bd0abdf59a1af934527257a990910ff) switched from DEPLOYING to RUNNING
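
One thing that may be worth ruling out is the consumer's starting offset. With
the Kafka 0.9 consumer, auto.offset.reset defaults to "latest", so a group with
no committed offsets only sees messages produced after the source starts. Below
is a minimal sketch of the same properties with that setting changed to
"earliest". The class name KafkaConsumerProps and the method build are
hypothetical, and whether this is the actual cause is an assumption that the
logs above do not confirm.

```java
import java.util.Properties;

// Hypothetical helper (not part of Flink or Kafka): builds the consumer
// properties from the job, plus an explicit starting-offset policy.
final class KafkaConsumerProps {

    private KafkaConsumerProps() {
    }

    static Properties build(String bootstrapServers, String groupId) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", bootstrapServers);
        properties.setProperty("group.id", groupId);
        properties.setProperty("client.id", "test-flink");
        // Assumption being tested: the group has no committed offsets, so
        // the default ("latest") would skip everything already in the topic.
        // "earliest" starts from the oldest retained message instead.
        properties.setProperty("auto.offset.reset", "earliest");
        return properties;
    }
}
```

The result of KafkaConsumerProps.build("hostname:port", "stream-test") could be
passed to the FlinkKafkaConsumer09 constructor in place of the hand-built
Properties object.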

Thanks,

Josh
