Consuming Messages from Kafka

2016-04-26 Thread Conlin, Joshua [USA]
Hello,

I am new to Flink and trying to learn this framework.  Seems great so far.  I 
am trying to translate my existing Storm topology to a Flink job and I am 
having trouble consuming data from Kafka.  Here's what my job looks like:


public static void main(String[] args) throws Exception {
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "hostname:port");
    properties.setProperty("group.id", "stream-test");
    properties.setProperty("client.id", "test-flink");

    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

    DataStreamSource<String> kafkaStream = env.addSource(
        new FlinkKafkaConsumer09<>("test", new SimpleStringSchema(), properties));

    kafkaStream.addSink(new StringLogSink());

    env.execute();
}


There are messages being sent to Kafka on that topic, I just never see anything 
in Flink.  Any help/insight you could provide would be greatly appreciated.  If 
it makes a difference this is running on YARN.  Also, here's what I see in the 
logs:


2016-04-26 18:02:38,707 INFO  org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.9.0.1
2016-04-26 18:02:38,707 INFO  org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 23c69d62a0cabf06
2016-04-26 18:02:38,708 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 - Trying to get partitions for topic test
2016-04-26 18:02:38,854 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 - Got 1 partitions from these topics: [test]
2016-04-26 18:02:38,854 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer is going to read the following topics (with number of partitions): test (1),
2016-04-26 18:02:38,933 INFO  org.apache.flink.yarn.YarnJobManager - Submitting job 0ab4248d8917e707a8f297420e4c564d ().
2016-04-26 18:02:38,934 INFO  org.apache.flink.yarn.YarnJobManager - Using restart strategy NoRestartStrategy for 0ab4248d8917e707a8f297420e4c564d.
2016-04-26 18:02:38,935 INFO  org.apache.flink.yarn.YarnJobManager - Scheduling job 0ab4248d8917e707a8f297420e4c564d ().
2016-04-26 18:02:38,935 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Sink: Unnamed (1/1) (9bd0abdf59a1af934527257a990910ff) switched from CREATED to SCHEDULED
2016-04-26 18:02:38,935 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Sink: Unnamed (1/1) (9bd0abdf59a1af934527257a990910ff) switched from SCHEDULED to DEPLOYING
2016-04-26 18:02:38,935 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Source: Custom Source -> Sink: Unnamed (1/1) (attempt #0) to ip-10-167-233-231
2016-04-26 18:02:38,936 INFO  org.apache.flink.yarn.YarnJobManager - Status of job 0ab4248d8917e707a8f297420e4c564d () changed to RUNNING.
2016-04-26 18:02:39,151 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Sink: Unnamed (1/1) (9bd0abdf59a1af934527257a990910ff) switched from DEPLOYING to RUNNING

Thanks,

Josh
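
One thing that may be worth ruling out with a job like the one above: with a 
fresh group.id, the Kafka 0.9 consumer defaults to auto.offset.reset=latest, 
so only messages produced after the job starts are read.  The following is a 
minimal sketch, not the poster's code, that builds the same consumer 
properties plus that one extra setting.  The class name KafkaConsumerProps 
and the choice of "earliest" are assumptions made for illustration; whether 
this is the cause here is not established in the thread.

```java
import java.util.Properties;

// Hypothetical helper (not from the original job): collects the consumer
// settings shown in the post and adds an explicit offset-reset policy.
final class KafkaConsumerProps {
    private KafkaConsumerProps() {}

    static Properties build(String bootstrapServers, String groupId, String clientId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("client.id", clientId);
        // Assumption: when the group has no committed offset, start from the
        // earliest retained message instead of the Kafka default ("latest").
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("hostname:port", "stream-test", "test-flink");
        // Print the resulting settings in a stable order for inspection.
        props.stringPropertyNames().stream().sorted()
             .forEach(k -> System.out.println(k + "=" + props.getProperty(k)));
    }
}
```

The returned Properties object could be passed to the FlinkKafkaConsumer09 
constructor in place of the hand-built one.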


Re: [External] Re: Consuming Messages from Kafka

2016-04-26 Thread Conlin, Joshua [USA]
"StringLogSink" just looks like:


System.out.println(msg);

LOG.info("Logging message: " + msg);


And LOG is from slf4j.  In the Flink UI that is running on YARN, I see no 
counts, and no log statements or stdout under JobManager.  It seems to make no 
difference whether I submit the job through YARN via the command line or 
through the Flink UI session already running under YARN.  Where would you 
recommend I look in the YARN containers?


Thanks again for your help.


Josh
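
For readers following along, a sink of the kind described above might look 
roughly like the sketch below.  It is an illustration under stated 
assumptions, not the poster's actual class: to stay free of external 
dependencies it uses a local stand-in interface (RecordSink) where a real job 
would implement Flink's SinkFunction, and java.util.logging where the post 
uses slf4j.  The per-instance counter is also an addition for illustration.

```java
import java.util.logging.Logger;

// Stand-in for Flink's SinkFunction<T> so this sketch compiles on its own.
// A real job would implement
// org.apache.flink.streaming.api.functions.sink.SinkFunction instead.
interface RecordSink<T> {
    void invoke(T value) throws Exception;
}

// Hypothetical reconstruction of a print-and-log sink.
final class StringLogSinkSketch implements RecordSink<String> {
    // Assumption: java.util.logging replaces slf4j to avoid a dependency.
    private static final Logger LOG =
        Logger.getLogger(StringLogSinkSketch.class.getName());

    // Illustrative counter of records seen by this sink instance.
    private long seen = 0;

    @Override
    public void invoke(String msg) {
        seen++;
        // Goes to the stdout of the process that runs the sink.
        System.out.println(msg);
        // Goes to the log of the process that runs the sink.
        LOG.info("Logging message: " + msg);
    }

    long seen() {
        return seen;
    }

    public static void main(String[] args) {
        StringLogSinkSketch sink = new StringLogSinkSketch();
        sink.invoke("hello");
        sink.invoke("world");
        System.out.println("records seen: " + sink.seen());
    }
}
```

Both the stdout line and the log line are written by whichever process 
executes the sink, so that process's output files are where they would 
appear.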

From: Robert Metzger <rmetz...@apache.org>
Reply-To: "user@flink.apache.org" <user@flink.apache.org>
Date: Tuesday, April 26, 2016 at 3:30 PM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: [External] Re: Consuming Messages from Kafka

Hi,

the web interface is a good idea for checking whether everything is working as 
expected. However, in this case I expect the counts for the task to be 0 
because the source and sink are chained together into one task (upcoming Flink 
releases will fix this behavior).

I assume the "StringLogSink" is logging all incoming events. How do you do 
that? Using slf4j or System.out.println?
I'm asking to make sure you're looking in the right place to capture the 
output. It will be in the YARN containers.

Regards,
Robert


On Tue, Apr 26, 2016 at 8:34 PM, Dominik Choma <dominik.ch...@gmail.com> wrote:
Hi,

You can check whether any messages are going through the dataflow on the Flink 
web dashboard:
https://flink.apache.org/img/blog/new-dashboard-screenshot.png



Dominik Choma

Message written by Conlin, Joshua [USA] <conlin_jos...@bah.com> on 
26 Apr 2016, at 20:16:

There are messages being sent to Kafka on that topic, I just never see anything in 
Flink.  Any help/insight you could provide would be greatly appreciated.  If it 
makes a difference this is running on YARN.  Also, here's what I see in the 
logs: