Re: Flink Kafka Consumer Behaviour

2016-08-02 Thread Janardhan Reddy
Checkpointing wasn't enabled in the streaming job, but the offsets should
have been committed to zookeeper.

But we don't see the offsets being written to zookeeper.
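A minimal sketch of enabling checkpointing so the 0.8 consumer commits its offsets on every completed checkpoint (Flink 1.0.x DataStream API assumed; topic, group and connection strings below are placeholders):

import java.util.Properties;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class CheckpointedKafkaJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 30 seconds; each completed checkpoint triggers the
        // offset commit to ZooKeeper.
        env.enableCheckpointing(30000);

        Properties props = new Properties();
        props.setProperty("zookeeper.connect", "zkhost:2181");  // placeholder
        props.setProperty("bootstrap.servers", "broker:9092");  // placeholder
        props.setProperty("group.id", "my-group");              // placeholder

        env.addSource(new FlinkKafkaConsumer08<String>("my-topic", new SimpleStringSchema(), props))
           .print();

        env.execute("checkpointed kafka consumer");
    }
}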

On Tue, Aug 2, 2016 at 7:41 PM, Till Rohrmann  wrote:

> Hi Janardhan,
>
> Flink should commit the current offsets to Zookeeper whenever a checkpoint
> has been completed. In case that you disabled checkpointing, then the
> offsets will be periodically committed to ZooKeeper. The default value is
> 60s.
>
> Could it be that there wasn't yet a completed checkpoint? Which version of
> Flink are you using?
>
> Cheers,
> Till
>
> On Tue, Aug 2, 2016 at 7:26 PM, Janardhan Reddy <
> janardhan.re...@olacabs.com> wrote:
>
>> Hi,
>>
>> When I run the following command I am getting that no topic is
>> available for that consumer group. I am using
>> flink-connector-kafka-0.8_${scala.version}(2.11).
>>
>> ./bin/kafka-consumer-groups.sh --zookeeper <> --group <> --describe
>>
>> No topic available for consumer group provided
>>
>>
>> Does the Kafka consumer always commit offsets to the broker? Do we need
>> to enable checkpointing for the offsets to be committed?
>>
>>
>> Thanks
>>
>
>


What is output from DataSet.print()?

2016-08-02 Thread Jon Yeargers
Topology snip:

datastream = some_stream.keyBy(keySelector)
    .timeWindow(Time.seconds(60))
    .reduce(new some_KeyReduce());


If I have a KeySelector that's pretty 'loose' (i.e. lots of matches), the
'some_KeyReduce' function gets hit frequently and some set of values is
printed out via 'datastream.print()'.

If I have a more stringent KeySelector the 'keyReduce' function never gets
called but the 'datastream.print()' function still outputs numerous values.

So how are the KeySelector and the output of the datastream.print()
related? Or are they?
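As background on how the two relate: the KeySelector only decides how elements are grouped into per-key windows, and an incremental reduce is only invoked for windows that end up holding at least two elements for a key; single-element windows are forwarded to print() unchanged, which is why a stricter selector can leave the reduce uncalled while print() still emits values. A small sketch (types and keys are made up):

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class KeyReduceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Long>> someStream =
                env.fromElements(Tuple2.of("a", 1L), Tuple2.of("a", 2L), Tuple2.of("b", 3L));

        someStream
            // A "stringent" selector produces many distinct keys, so most windows
            // hold only a single element for their key.
            .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                @Override
                public String getKey(Tuple2<String, Long> t) {
                    return t.f0;
                }
            })
            .timeWindow(Time.seconds(60))
            .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                @Override
                public Tuple2<String, Long> reduce(Tuple2<String, Long> a, Tuple2<String, Long> b) {
                    // Only called when a key's window has at least two elements;
                    // a lone element is emitted downstream without this running.
                    return Tuple2.of(a.f0, a.f1 + b.f1);
                }
            })
            .print(); // one record per key and window, whether or not reduce() ran

        env.execute("key-reduce sketch");
    }
}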


Programmatically Creating a Flink Cluster On YARN

2016-08-02 Thread Bostow, Ben
I'm currently trying to programmatically create a Flink cluster on a given YARN 
cluster. I'm using the FlinkYarnClientBase class to do this, with some 
limitations (Flink version 1.0.3).

I want to pass in my own YARN configuration so that I can deploy Flink on 
different YARN clusters. Currently the configuration appears to be picked up from 
the YARN_CONF environment variable, or from the YARN configuration files on the 
classpath, which only works for that single cluster.

Is there a way I can dynamically build the YARN configuration and Flink 
configuration without touching the file system? It would be nice to have the 
option to use the same Flink client code, with the CLI acting as a wrapper that 
adds all the environment-variable handling, so that when we need to create these 
programmatically we can do that too.
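The Hadoop and Flink configuration objects themselves can be assembled in memory without touching the file system; the open question is wiring them into the 1.0.3 client. A hedged sketch with placeholder hosts and standard configuration keys:

import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class ProgrammaticConfigs {
    public static void main(String[] args) {
        // Hadoop/YARN configuration built in memory (no yarn-site.xml on the classpath).
        YarnConfiguration yarnConf = new YarnConfiguration();
        yarnConf.set(YarnConfiguration.RM_ADDRESS, "resourcemanager-host:8032");           // placeholder
        yarnConf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, "resourcemanager-host:8030"); // placeholder
        yarnConf.set("fs.defaultFS", "hdfs://namenode-host:8020");                         // placeholder

        // Flink configuration built in memory (no flink-conf.yaml on disk).
        Configuration flinkConf = new Configuration();
        flinkConf.setInteger("taskmanager.numberOfTaskSlots", 4);
        flinkConf.setInteger("taskmanager.heap.mb", 2048);

        // Handing these objects to the 1.0.3 YARN client (FlinkYarnClientBase /
        // AbstractYarnClusterDescriptor) is the part that still expects files or
        // environment variables, which is what this thread is asking about.
    }
}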

I've started looking at the new code in Flink master and it seems a little 
easier than in 1.0.3, but the YarnConfiguration is private in 
AbstractYarnClusterDescriptor, making it difficult to set the configuration 
programmatically. I'm also not sure whether the new client code still throws an 
exception if the Flink config file doesn't exist, even though I could build out 
the Flink config programmatically.

Is there a recommended approach to allow this?

Benjamin


Re: Reprocessing data in Flink / rebuilding Flink state

2016-08-02 Thread Gyula Fóra
Hi,

I think it would probably be a good idea to make these tunable from the
command line. Otherwise we might run into the problem of accidentally
restoring a job that should fail like it does now.

Gyula

Stephan Ewen  wrote (on Tue, 2 Aug 2016, 17:17):

> +1 to ignore unmatched state.
>
> Also +1 to allow programs that resume partially (add some new state that
> starts empty)
>
> Both are quite important for program evolution.
>
> On Tue, Aug 2, 2016 at 2:58 PM, Ufuk Celebi  wrote:
>
>> No, unfortunately this is the same for 1.1. The idea was to be explicit
>> about what works and what not. I see that this is actually a pain for this
>> use case (which is very nice and reasonable ;)). I think we can either
>> always ignore state that does not match to the new job or if that is too
>> aggressive we can add a flag to ignore unmatched state.
>>
>>
>> On Mon, Aug 1, 2016 at 6:39 PM, Aljoscha Krettek 
>> wrote:
>>
>>> +Ufuk, looping him in directly
>>>
>>> Hmm, I think this is changed for the 1.1 release. Ufuk could you please
>>> comment?
>>>
>>>
>>> On Mon, 1 Aug 2016 at 08:07 Josh  wrote:
>>>
 Cool, thanks - I've tried out the approach where we replay data from
 the Kafka compacted log, then take a savepoint and switch to the live
 stream.

 It works but I did have to add in a dummy operator for every operator
 that was removed. Without doing this, I got an exception:
 java.lang.IllegalStateException: Failed to rollback to savepoint
 Checkpoint 1 @ 1470059433553 for cb321c233dfd28f73c565030481657cd. Cannot
 map old state for task 02ea922553bc7522bdea373f52a702d6 to the new program.
 This indicates that the program has been changed in a non-compatible way
  after the savepoint.

 I had a Kafka source and a flat mapper chained together when replaying,
 so to make it work I had to add two dummy operators and assign the same UID
 I used when replaying, like this:
 stream.map(x => x).uid("kafka-replay").name("dummy-1").startNewChain()
   .map(x => x).name("dummy-2")

 I guess it would be nice if Flink could recover from removed
 tasks/operators without needing to add dummy operators like this.

 Josh

 On Fri, Jul 29, 2016 at 5:46 PM, Aljoscha Krettek 
 wrote:

> Hi,
> I have to try this to verify but I think the approach works if you
> give the two sources different UIDs. The reason is that Flink will ignore
> state for which it doesn't have an operator to assign it to. Therefore, 
> the
> state of the "historical Kafka source" should be silently discarded.
>
> Cheers,
> Aljoscha
>
> On Fri, 29 Jul 2016 at 18:12 Josh  wrote:
>
>> @Aljoscha - The N-input operator way sounds very nice, for now I
>> think I'll try and get something quick running the hacky way, then if we
>> decide to make this a permanent solution maybe I can work on the proper
>> solution. I was wondering about your suggestion for "warming up" the 
>> state
>> and then taking a savepoint and switching sources - since the Kafka 
>> sources
>> are stateful and are part of Flink's internal state, wouldn't this break
>> when trying to restore the job with a different source? Would I need to
>> assign the replay source a UID, and when switching from replay to live,
>> remove the replay source and replace it with a dummy operator with the
>> same UID?
>>
>> @Jason - I see what you mean now, with the historical and live Flink
>> jobs. That's an interesting approach - I guess it's solving a slightly
>> different problem to my 'rebuilding Flink state upon starting job' - as
>> you're rebuilding state as part of the main job when it comes across 
>> events
>> that require historical data. Actually I think we'll need to do something
>> very similar in the future but right now I can probably get away with
>> something simpler!
>>
>> Thanks for the replies!
>>
>> Josh
>>
>> On Fri, Jul 29, 2016 at 2:35 PM, Jason Brelloch 
>> wrote:
>>
>>> Aljoscha's approach is probably better, but to answer your
>>> questions...
>>>
>>> >How do you send a request from one Flink job to another?
>>> All of our different flink jobs communicate over kafka.  So the main
>>> flink job would be listening to both a "live" kafka source, and a
>>> "historical" kafka source.  The historical flink job would listen to a
>>> "request" kafka source.  When the main job gets an event that it does 
>>> not
>>> have state for it writes to the "request" topic.  The historical job 
>>> would
>>> read the request, grab the relevant old events from GCS, and write them 
>>> to
>> the "historical" kafka topic.  The "historical" source and the "live"
>> source are merged and proceed through the main flink job as one stream.

Re: Container running beyond physical memory limits when processing DataStream

2016-08-02 Thread Jack Huang
Hi Max,

Is there a way to limit the JVM memory usage (something like the -Xmx flag)
for the task manager so that it won't go over the YARN limit but will just
run GC until there is memory to use? Trying to allocate "enough" memory for
this stream task is not ideal because I could have arbitrarily many
messages backed up in the source waiting to be processed.

Thanks,
Jack
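For reference, the numbers in the log quoted below line up with Flink's YARN heap cutoff: with -ytm 2048 the TaskManager JVM was started with -Xmx1448m, i.e. roughly 600 MB of the 2 GB container is held back for direct/native memory and overhead, and YARN kills the container once the whole process (heap plus off-heap) exceeds 2 GB. A hedged flink-conf.yaml sketch along the lines of Max's suggestion below (the yarn.heap-cutoff-min key and its value are assumptions inferred from the -Xmx seen in the log):

# Container size per TaskManager on YARN (same value as -ytm).
taskmanager.heap.mb: 2048

# Fraction of the container withheld from the JVM heap for direct/native memory.
yarn.heap-cutoff-ratio: 0.4

# Lower bound in MB on the withheld memory (assumed key/value).
yarn.heap-cutoff-min: 600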


On Tue, Aug 2, 2016 at 5:21 AM, Maximilian Michels  wrote:

> Your job creates a lot of String objects which need to be garbage
> collected. It could be that the JVM is not fast enough and Yarn kills
> the JVM for consuming too much memory.
>
> You can try two things:
>
> 1) Give the task manager more memory
> 2) Increase the Yarn heap cutoff ratio (e.g yarn.heap-cutoff-ratio: 0.4)
>
> If the error still occurs then we need to investigate further.
>
> Thanks,
> Max
>
>
> >
> >
> >
> >
> > On Fri, Jul 29, 2016 at 11:19 AM, Jack Huang  wrote:
> >>
> >> Hi Max,
> >>
> >> Each event is only a few hundred bytes. I am reading from a Kafka
> topic
> >> from some offset in the past, so the events should be flowing in as
> fast as
> >> Flink can process them.
> >>
> >> The entire YARN task log, which contains both JobManager and TaskManager
> >> outputs, is attached.
> >>
> >> Thanks a lot,
> >> Jack
> >>
> >>
> >> On Fri, Jul 29, 2016 at 2:04 AM, Maximilian Michels 
> >> wrote:
> >>>
> >>> Hi Jack,
> >>>
> >>> Considering the type of job you're running, you shouldn't run out of
> >>> memory. Could it be that the events are quite large strings? It could
> >>> be that the TextOutputFormat doesn't write to disk fast enough and
> >>> accumulates memory. Actually, it doesn't perform regular flushing
> >>> which could be an issue.
> >>>
> >>> I'm just guessing, we need to investigate further. Could you please
> >>> supply the entire JobManager log file output?
> >>>
> >>> Thanks,
> >>> Max
> >>>
> >>> On Fri, Jul 29, 2016 at 12:59 AM, Jack Huang  wrote:
> >>> > Hi all,
> >>> >
> >>> > I am running a test Flink streaming task under YARN. It reads
> messages
> >>> > from
> >>> > a Kafka topic and writes them to local file system.
> >>> >
> >>> > object PricerEvent {
> >>> > def main(args:Array[String]) {
> >>> > val kafkaProp = new Properties()
> >>> > kafkaProp.setProperty("bootstrap.servers", "localhost:6667")
> >>> > kafkaProp.setProperty("auto.offset.reset", "earliest")
> >>> >
> >>> > val env = StreamExecutionEnvironment.getExecutionEnvironment
> >>> > env.setStateBackend(new MemoryStateBackend)
> >>> >
> >>> > val wins = env.addSource(new
> >>> > FlinkKafkaConsumer09[String]("events",
> >>> > new SimpleStringSchema, kafkaProp))
> >>> > wins.writeAsText("/home/user/flink_out/" + new
> >>> > SimpleDateFormat("yyyy-MM-dd_HH-mm-ss").format(new Date))
> >>> >
> >>> > env.execute
> >>> > }
> >>> > }
> >>> >
> >>> > With the command
> >>> >
> >>> > flink run -m yarn-cluster -yn 1 -ytm 2048 -c PricerEvent
> >>> > /home/user/flink-example/build/libs/flink-example-1.0-all.jar
> >>> >
> >>> >
> >>> > The task runs fine for a moment and then terminates. I looked into
> the
> >>> > error
> >>> > log and found following out-of-memory error message:
> >>> >
> >>> > 2016-07-28 22:34:40,397 INFO  org.apache.flink.yarn.YarnJobManager
> >>> > - Container container_e05_1467433388200_0136_01_02 is completed
> >>> > with
> >>> > diagnostics: Container
> >>> > [pid=5832,containerID=container_e05_1467433388200_0136_01_02] is
> >>> > running
> >>> > beyond physical memory limits. Current usage: 2.3 GB of 2 GB physical
> >>> > memory
> >>> > used; 6.1 GB of 4.2 GB virtual memory used. Killing container.
> >>> > Dump of the process-tree for
> container_e05_1467433388200_0136_01_02
> >>> > :
> >>> > |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
> >>> > SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES)
> FULL_CMD_LINE
> >>> > |- 5838 5832 5832 5832 (java) 2817 481 6553391104 592779
> >>> > /usr/jdk64/jdk1.8.0_60/bin/java -Xms1448m -Xmx1448m
> >>> > -XX:MaxDirectMemorySize=1448m
> >>> >
> >>> >
> -Dlog.file=/mnt/a/hadoop/yarn/log/application_1467433388200_0136/container_e05_1467433388200_0136_01_02/taskmanager.log
> >>> > -Dlogback.configurationFile=file:logback.xml
> >>> > -Dlog4j.configuration=file:log4j.properties
> >>> > org.apache.flink.yarn.YarnTaskManagerRunner --configDir .
> >>> > |- 5832 5830 5832 5832 (bash) 0 0 12759040 357 /bin/bash -c
> >>> > /usr/jdk64/jdk1.8.0_60/bin/java -Xms1448m -Xmx1448m
> >>> > -XX:MaxDirectMemorySize=1448m
> >>> >
> >>> >
> -Dlog.file=/mnt/a/hadoop/yarn/log/application_1467433388200_0136/container_e05_1467433388200_0136_01_02/taskmanager.log
> >>> > -Dlogback.configurationFile=file:logback.xml
> >>> > -Dlog4j.configuration=file:log4j.properties
> >>> > org.apache.flink.yarn.YarnTaskManagerRunner --configDir . 1>
> >>> >
> >>> >
> 

Re: Reprocessing data in Flink / rebuilding Flink state

2016-08-02 Thread Stephan Ewen
+1 to ignore unmatched state.

Also +1 to allow programs that resume partially (add some new state that
starts empty)

Both are quite important for program evolution.

On Tue, Aug 2, 2016 at 2:58 PM, Ufuk Celebi  wrote:

> No, unfortunately this is the same for 1.1. The idea was to be explicit
> about what works and what not. I see that this is actually a pain for this
> use case (which is very nice and reasonable ;)). I think we can either
> always ignore state that does not match to the new job or if that is too
> aggressive we can add a flag to ignore unmatched state.
>
>
> On Mon, Aug 1, 2016 at 6:39 PM, Aljoscha Krettek 
> wrote:
>
>> +Ufuk, looping him in directly
>>
>> Hmm, I think this is changed for the 1.1 release. Ufuk could you please
>> comment?
>>
>>
>> On Mon, 1 Aug 2016 at 08:07 Josh  wrote:
>>
>>> Cool, thanks - I've tried out the approach where we replay data from the
>>> Kafka compacted log, then take a savepoint and switch to the live stream.
>>>
>>> It works but I did have to add in a dummy operator for every operator
>>> that was removed. Without doing this, I got an exception:
>>> java.lang.IllegalStateException: Failed to rollback to savepoint
>>> Checkpoint 1 @ 1470059433553 for cb321c233dfd28f73c565030481657cd. Cannot
>>> map old state for task 02ea922553bc7522bdea373f52a702d6 to the new program.
>>> This indicates that the program has been changed in a non-compatible way
>>>  after the savepoint.
>>>
>>> I had a Kafka source and a flat mapper chained together when replaying,
>>> so to make it work I had to add two dummy operators and assign the same UID
>>> I used when replaying, like this:
>>> stream.map(x => x).uid("kafka-replay").name("dummy-1").startNewChain()
>>>   .map(x => x).name("dummy-2")
>>>
>>> I guess it would be nice if Flink could recover from removed
>>> tasks/operators without needing to add dummy operators like this.
>>>
>>> Josh
>>>
>>> On Fri, Jul 29, 2016 at 5:46 PM, Aljoscha Krettek 
>>> wrote:
>>>
 Hi,
 I have to try this to verify but I think the approach works if you give
 the two sources different UIDs. The reason is that Flink will ignore state
 for which it doesn't have an operator to assign it to. Therefore, the state
 of the "historical Kafka source" should be silently discarded.

 Cheers,
 Aljoscha

 On Fri, 29 Jul 2016 at 18:12 Josh  wrote:

> @Aljoscha - The N-input operator way sounds very nice, for now I think
> I'll try and get something quick running the hacky way, then if we decide
> to make this a permanent solution maybe I can work on the proper solution.
> I was wondering about your suggestion for "warming up" the state and then
> taking a savepoint and switching sources - since the Kafka sources are
> stateful and are part of Flink's internal state, wouldn't this break when
> trying to restore the job with a different source? Would I need to assign
> the replay source a UID, and when switching from replay to live, remove 
> the
> replay source and replace it with a dummy operator with the same UID?
>
> @Jason - I see what you mean now, with the historical and live Flink
> jobs. That's an interesting approach - I guess it's solving a slightly
> different problem to my 'rebuilding Flink state upon starting job' - as
> you're rebuilding state as part of the main job when it comes across 
> events
> that require historical data. Actually I think we'll need to do something
> very similar in the future but right now I can probably get away with
> something simpler!
>
> Thanks for the replies!
>
> Josh
>
> On Fri, Jul 29, 2016 at 2:35 PM, Jason Brelloch 
> wrote:
>
>> Aljoscha's approach is probably better, but to answer your
>> questions...
>>
>> >How do you send a request from one Flink job to another?
>> All of our different flink jobs communicate over kafka.  So the main
>> flink job would be listening to both a "live" kafka source, and a
>> "historical" kafka source.  The historical flink job would listen to a
>> "request" kafka source.  When the main job gets an event that it does not
>> have state for it writes to the "request" topic.  The historical job 
>> would
>> read the request, grab the relevant old events from GCS, and write them 
>> to
>> the "historical" kafka topic.  The "historical" source and the "live"
>> source are merged and proceed through the main flink job as one stream.
>>
>> >How do you handle the switchover between the live stream and the
>> historical stream? Do you somehow block the live stream source and detect
>> when the historical data source is no longer emitting new elements?
>> When the main job sends a request to the historical job, the main job
>> starts storing 

Re: How to read AVRO data from Kafka using Flink

2016-08-02 Thread Shannon Carey
I can tell you that we are reading Avro data from Kafka on Flink without 
problems. It seems like you have a mistake somewhere in your system. If I were 
you I would try your serialization & deserialization code in a simple program 
within the same JVM, then gradually add the other components in order to narrow 
down where the problem is coming from.

Shannon
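A minimal single-JVM round trip along those lines, using plain Avro (the inline schema is a made-up example; substitute the real *.avsc contents), which takes Kafka and Flink out of the picture entirely:

import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class AvroRoundTripTest {
    public static void main(String[] args) throws Exception {
        // Example schema; replace with the schema actually used by the producer.
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Event\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}");

        // Serialize one record the way the producer would.
        GenericRecord original = new GenericData.Record(schema);
        original.put("id", "42");
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(schema).write(original, encoder);
        encoder.flush();

        // Deserialize with the same code path a Flink DeserializationSchema would use.
        GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
        GenericRecord copy = reader.read(null,
            DecoderFactory.get().binaryDecoder(out.toByteArray(), null));

        System.out.println("round trip ok: " + copy);
    }
}

Once this works in isolation, the same bytes and reader can be moved behind the Kafka consumer to see which component introduces the mismatch.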

From: "Alam, Zeeshan"
Date: Tuesday, August 2, 2016 at 5:28 AM
To: "user@flink.apache.org"
Subject: How to read AVRO data from Kafka using Flink

Hi All,

I am trying to read AVRO data from Kafka using Flink 1.0.3 but I am getting an 
error. I have posted this issue on Stack Overflow: 
http://stackoverflow.com/questions/38715286/how-to-decode-kafka-messages-using-avro-and-flink
Is there any mistake we can look into, or is there a better way to read 
AVRO data from Kafka using Flink?

Thanks & Regards
Zeeshan Alam




Re: CEP and Within Clause

2016-08-02 Thread Sameer W
Thank you - it is very clear now.

Sameer

On Tue, Aug 2, 2016 at 10:29 AM, Till Rohrmann 
wrote:

> The CEP operator maintains for each pattern a window length. This means
> that every starting event will set its own timeout value.
>
> So if T=51 arrives in the 11th minute, then it depends whether the second
> T=31 arrived sometime between the 1st and 11th minute. If that's the case,
> then you should also see a second matching.
>
> Cheers,
> Till
>
> On Tue, Aug 2, 2016 at 10:20 PM, Sameer W  wrote:
>
>> Thanks Till,
>>
>> In that case if I have a pattern -
>> First = T > 30
>> Followed By = T > 50
>> Within 10 minutes
>>
>> If I get the following sequence of events within 10 minutes
>> T=31, T=51, T=31, T=51
>>
>> I assume the alert will fire twice now.
>>
>> But what happens if the last T=51 arrives in the 11th minute. If the
>> partially matched pattern is discarded after 10 minutes how will the system
>> detect T=51. Or do you mean that that timer (for the within clause) is
>> reset each time the pattern T>30 matches. In that case it would fire!
>>
>> Thanks,
>> Sameer
>>
>> On Tue, Aug 2, 2016 at 10:02 AM, Till Rohrmann 
>> wrote:
>>
>>> Hi Sameer,
>>>
>>> the within clause of CEP uses neither tumbling nor sliding windows. It
>>> is more like a session window which is started whenever an element which
>>> matches the starting condition arrives. As long as new events which fulfill
>>> the pattern definition arrive within the length of the window, they will be
>>> added. If the pattern should not be completed within the specified time
>>> interval, the partially matched pattern will be discarded. If you've
>>> specified a timeout handler, then the timeout handler is called with the
>>> partial pattern.
>>>
>>> At the moment, there is no way to re-insert elements in the upstream.
>>> Actually there is also no need for it because the CEP operator will detect
>>> the alert patterns if there are two temperature readings > 150 within 6
>>> seconds.
>>>
>>> Cheers,
>>> Till
>>>
>>>
>>>
>>> On Tue, Aug 2, 2016 at 5:12 AM, Aljoscha Krettek 
>>> wrote:
>>>
 +Till, looping him in directly, he probably missed this because he was
 away for a while.



 On Tue, 26 Jul 2016 at 18:21 Sameer W  wrote:

> Hi,
>
> It looks like the WithIn clause of CEP uses Tumbling Windows. I could
> get it to use Sliding windows by using an upstream pipeline which uses
> Sliding Windows and produces repeating elements (in each sliding window)
> and applying a Watermark assigner on the resulting stream with elements
> duplicated. I wanted to use the "followedBy" pattern where there is a
> strong need for sliding windows.
>
> Is there a plan to add sliding windows to the within clause at some
> point?
>
> The PatternStream class's "select" and "flatSelect" have overloaded
> versions which take PatternTimeOut variable. Is there a way to insert some
> of those elements back to the front of the stream. Say I am trying to find
> a pattern where two temperature readings >150 within 6 second window 
> should
> raise an alert. If only one was found, can I insert that one back in the
> front of the stream on that task node (for that window pane) so that I can
> find a pattern match in the events occurring in the next 6 seconds. If I
> can do that, I don't need sliding windows. Else I cannot avoid using them
> for such scenarios.
>
> Thanks,
> Sameer
>

>>>
>>
>


RE: What is the recommended way to read AVRO data from Kafka using flink.

2016-08-02 Thread Alam, Zeeshan
Hi Stephan,

I went through one of the old mail threads: 
http://mail-archives.apache.org/mod_mbox/flink-user/201510.mbox/%3CCANC1h_vq-TVjTNhXyYLoVso7GRGkdGWioM5Ppg%3DGoQPjvigqYg%40mail.gmail.com%3E

There it is mentioned that when reading from Kafka you are expected to define a 
DeserializationSchema, and that there is no out-of-the-box (de)serializer for 
Flink with Kafka, but it should not be very hard to add.



I have some questions:

1. As per FLINK-3691 you are adding a GenericDatumReader, so I suppose I need to 
use it instead of DatumReader in the DeserializationSchema that is required to 
read data from Kafka?

2. What is the recommended way to read AVRO binary data from Kafka if I have the 
AVRO schema file [*.avsc] with me? Is there a better, more efficient approach?

3. Can AvroInputFormat be used to read Kafka data, or is a DeserializationSchema 
a must for reading data from Kafka? Also, AvroInputFormat doesn't have any 
Javadoc.
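For what it's worth, a hedged sketch of the kind of schema-aware DeserializationSchema question 2 asks about: it ships the *.avsc contents as a string, parses it once per task, and reads GenericData.Record values with Avro's GenericDatumReader. This is only one possible approach, not an official recommendation, and all names are placeholders:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

public class GenericAvroDeserializationSchema implements DeserializationSchema<GenericData.Record> {

    private static final long serialVersionUID = 1L;

    // The schema travels as a string so the schema object itself need not be serializable.
    private final String schemaString;

    private transient Schema schema;
    private transient GenericDatumReader<GenericData.Record> reader;
    private transient BinaryDecoder decoder;

    public GenericAvroDeserializationSchema(String schemaString) {
        this.schemaString = schemaString;
    }

    @Override
    public GenericData.Record deserialize(byte[] message) {
        if (reader == null) {
            schema = new Schema.Parser().parse(schemaString);
            reader = new GenericDatumReader<>(schema);
        }
        try {
            decoder = DecoderFactory.get().binaryDecoder(message, decoder);
            return reader.read(null, decoder);
        } catch (Exception e) {
            throw new RuntimeException("Failed to deserialize Avro record", e);
        }
    }

    @Override
    public boolean isEndOfStream(GenericData.Record nextElement) {
        return false;
    }

    @Override
    public TypeInformation<GenericData.Record> getProducedType() {
        return TypeExtractor.getForClass(GenericData.Record.class);
    }
}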





Thanks & Regards,
Zeeshan Alam




From: Stephan Ewen [mailto:se...@apache.org]
Sent: Tuesday, August 02, 2016 7:52 PM
To: user@flink.apache.org
Subject: Re: What is the recommended way to read AVRO data from Kafka using 
flink.

Hi!

I think this is a known limitation for Flink 1.0 and it is fixed in Flink 1.1

Here is the JIRA ticket: https://issues.apache.org/jira/browse/FLINK-3691

Here is the mail thread:
http://mail-archives.apache.org/mod_mbox/flink-user/201604.mbox/%3CCAOFSxKtJXfxRKm2=bplu+xvpwqrwd3c8ynuk3iwk9aqvgrc...@mail.gmail.com%3E

You could try and use the latest release candidate to get the fix:
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-Apache-Flink-1-1-0-RC1-td12723.html

The release is also in progress, so it should be out as a stable release soon.

Greetings,
Stephan


On Tue, Aug 2, 2016 at 4:04 PM, Alam, Zeeshan 
> wrote:
Hi,

I am using Flink 1.0.3 and FlinkKafkaConsumer08 to read AVRO data from Kafka. I 
have the AVRO schema file with me which was used to write the data in Kafka. 
Here 
https://ci.apache.org/projects/flink/flink-docs-release-0.8/example_connectors.html
 you have mentioned that using the GenericData.Record type is possible with 
Flink, but not recommended: since the record contains the full schema, it is 
very data intensive and thus probably slow to use. So what is the recommended 
way to read AVRO data from Kafka using Flink?

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "dojo3x:9092,dojox:9092,dojox:9092");
    properties.setProperty("zookeeper.connect", "dojo3x:2181,dojox:2181,dojox:2181");
    properties.setProperty("group.id", "Zeeshantest");
    AvroDeserializationSchema<GenericData.Record> avroSchema =
        new AvroDeserializationSchema<>(GenericData.Record.class);
    FlinkKafkaConsumer08<GenericData.Record> kafkaConsumer =
        new FlinkKafkaConsumer08<>("myavrotopic", avroSchema, properties);
    DataStream<GenericData.Record> messageStream = env.addSource(kafkaConsumer);
    messageStream.rebalance().print();
    env.execute("Flink AVRO KAFKA Test");
}

This is the AvroDeserializationSchema that I am using.


public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {

    private static final long serialVersionUID = 4330538776656642778L;

    private final Class<T> avroType;
    private transient DatumReader<T> reader;
    private transient BinaryDecoder decoder;

    public AvroDeserializationSchema(Class<T> avroType) {
        this.avroType = avroType;
    }

    @Override
    public T deserialize(byte[] message) {
        ensureInitialized();
        try {
            decoder = DecoderFactory.get().binaryDecoder(message, decoder);
            return reader.read(null, decoder);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return false;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeExtractor.getForClass(avroType);
    }

    private void ensureInitialized() {
        if (reader == null) {
            if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroType)) {
                reader = new SpecificDatumReader<T>(avroType);
            } else {
                reader = new ReflectDatumReader<T>(avroType);
            }
        }
    }
}

On running this I am getting java.lang.Exception: Not a Specific class: class 
org.apache.avro.generic.GenericData$Record.

Thanks & Regards
Zeeshan Alam

Re: CEP and Within Clause

2016-08-02 Thread Till Rohrmann
The CEP operator maintains for each pattern a window length. This means
that every starting event will set its own timeout value.

So if T=51 arrives in the 11th minute, then it depends whether the second
T=31 arrived sometime between the 1st and 11th minute. If that's the case,
then you should also see a second matching.

Cheers,
Till

On Tue, Aug 2, 2016 at 10:20 PM, Sameer W  wrote:

> Thanks Till,
>
> In that case if I have a pattern -
> First = T > 30
> Followed By = T > 50
> Within 10 minutes
>
> If I get the following sequence of events within 10 minutes
> T=31, T=51, T=31, T=51
>
> I assume the alert will fire twice now.
>
> But what happens if the last T=51 arrives in the 11th minute. If the
> partially matched pattern is discarded after 10 minutes how will the system
> detect T=51. Or do you mean that that timer (for the within clause) is
> reset each time the pattern T>30 matches. In that case it would fire!
>
> Thanks,
> Sameer
>
> On Tue, Aug 2, 2016 at 10:02 AM, Till Rohrmann 
> wrote:
>
>> Hi Sameer,
>>
>> the within clause of CEP uses neither tumbling nor sliding windows. It is
>> more like a session window which is started whenever an element which
>> matches the starting condition arrives. As long as new events which fulfill
>> the pattern definition arrive within the length of the window, they will be
>> added. If the pattern should not be completed within the specified time
>> interval, the partially matched pattern will be discarded. If you've
>> specified a timeout handler, then the timeout handler is called with the
>> partial pattern.
>>
>> At the moment, there is no way to re-insert elements in the upstream.
>> Actually there is also no need for it because the CEP operator will detect
>> the alert patterns if there are two temperature readings > 150 within 6
>> seconds.
>>
>> Cheers,
>> Till
>>
>>
>>
>> On Tue, Aug 2, 2016 at 5:12 AM, Aljoscha Krettek 
>> wrote:
>>
>>> +Till, looping him in directly, he probably missed this because he was
>>> away for a while.
>>>
>>>
>>>
>>> On Tue, 26 Jul 2016 at 18:21 Sameer W  wrote:
>>>
 Hi,

 It looks like the WithIn clause of CEP uses Tumbling Windows. I could
 get it to use Sliding windows by using an upstream pipeline which uses
 Sliding Windows and produces repeating elements (in each sliding window)
 and applying a Watermark assigner on the resulting stream with elements
 duplicated. I wanted to use the "followedBy" pattern where there is a
 strong need for sliding windows.

 Is there a plan to add sliding windows to the within clause at some
 point?

 The PatternStream class's "select" and "flatSelect" have overloaded
 versions which take PatternTimeOut variable. Is there a way to insert some
 of those elements back to the front of the stream. Say I am trying to find
 a pattern where two temperature readings >150 within 6 second window should
 raise an alert. If only one was found, can I insert that one back in the
 front of the stream on that task node (for that window pane) so that I can
 find a pattern match in the events occurring in the next 6 seconds. If I
 can do that, I don't need sliding windows. Else I cannot avoid using them
 for such scenarios.

 Thanks,
 Sameer

>>>
>>
>


Re: partial savepoints/combining savepoints

2016-08-02 Thread Till Rohrmann
Hi Claudia,

1) At the moment the offset information will be written to the ZooKeeper
quorum used by Kafka as well as to the savepoint. Reading the savepoint is
not so easy to do since you would need to know the internal representation
of the savepoint. But you could try to read the Kafka offsets from
ZooKeeper.
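A hedged sketch of reading those offsets directly with the plain ZooKeeper client, assuming the Kafka 0.8 high-level consumer layout /consumers/<group>/offsets/<topic>/<partition> (connection string, group and topic are placeholders):

import java.nio.charset.StandardCharsets;
import org.apache.zookeeper.ZooKeeper;

public class ZkOffsetReader {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("zkhost:2181", 30000, event -> { });
        try {
            String group = "my-consumer-group"; // placeholder
            String topic = "my-topic";          // placeholder

            // Assumed Kafka 0.8 layout of committed offsets in ZooKeeper.
            String offsetsPath = "/consumers/" + group + "/offsets/" + topic;
            for (String partition : zk.getChildren(offsetsPath, false)) {
                byte[] data = zk.getData(offsetsPath + "/" + partition, false, null);
                System.out.println("partition " + partition + " -> offset "
                        + new String(data, StandardCharsets.UTF_8));
            }
        } finally {
            zk.close();
        }
    }
}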

2) That depends a little bit on the deployment and the size of the job. Are
you using a yarn session or a standalone cluster? Then the task manager
should already be registered at the job manager and the deployment for each
task should be in the millisecond range. If you start a new yarn
application for a flink job (per-job cluster), then it might take a bit
longer depending on how long it takes Yarn to allocate the requested
resources. But once this is done, the deployment for a task should be in the
sub-second range.

3) If you want to keep the different Flink jobs separated, then you should
submit them separately to a Flink cluster or start a Flink cluster per job
(e.g. with Yarn). I don't think that this a bad architecture if you want to
fulfil these requirements. However, I'm not sure whether merging and
splitting savepoints will be implemented anytime soon.

Actually we're currently working on improving Flink's functionality to be
started with a dedicated job. This means that you start a job manager which
has already the job jar in its classpath and directly starts executing the
contained job. This will be helpful for deployment scenarios how they
appear when using docker images, for example. I could imagine that this
could be helpful for your use case as well.

Cheers,
Till

On Mon, Aug 1, 2016 at 10:40 PM, Claudia Wegmann 
wrote:

> Hi Till,
>
>
>
> thanks for the quick reply. Too bad, I thought I was on the right track
> with savepoints here.
>
>
>
> Some follow-up questions:
>
>
>
> 1.) Can I do the whole thing of transferring state and the position in
> the Kafka topic manually for one stream? In other words: is this
> information accessible easily?
>
> 2.) In any case I would need to stop the running job, change the
> topology (e.g. the number of streams in the program) and resume processing.
> Can you name the overhead of time coming from stopping and starting a Flink
> job?
>
> 3.) I'm aware of the upcoming feature for scaling in and out. But I
> don’t quite see, how this will help me with different services.
> I thought of each service having its own Flink instance/cluster. I would
> commit this service as one job to the dedicated Flink containing all the
> necessary streams and computations. Is this a bad architecture?
> Would it be better to have one big Flink cluster and commit one big Job,
> which contains all the streams? (As I got to know, committing multiple jobs
> to one Flink instance is not recommended).
> To be honest, I’m not quite there to totally understand the different
> deployment options of Flink and how to bring them together with a
> microservice architecture where I have a service packed as a JAR-File and
> wanting to be able to just deploy this JAR-File. I thought of this service
> containing Flink and then start the JobManager and some TaskManagers from
> this service and deploy itself as the Flink job with a dedicated entry
> point. Is this a good idea? Or is it even possible?
>
>
>
> Thanks in advance,
>
> Claudia
>
>
>
> From: Till Rohrmann [mailto:trohrm...@apache.org]
> Sent: Monday, 1 August 2016 16:21
> To: user@flink.apache.org
> Subject: Re: partial savepoints/combining savepoints
>
>
>
> Hi Claudia,
>
>
>
> unfortunately neither taking partial savepoints nor combining multiple
> savepoints into one savepoint is currently supported by Flink.
>
>
>
> However, we're currently working on dynamic scaling which will allow to
> adjust the parallelism of your Flink job. This helps you to scale in/out
> depending on the workload of your job. However, you would only be able to
> scale within a single Flink job and not across Flink jobs.
>
>
>
> Cheers,
>
> Till
>
>
>
> On Mon, Aug 1, 2016 at 9:49 PM, Claudia Wegmann 
> wrote:
>
> Hey everyone,
>
>
>
> I’ve got some questions regarding savepoints in Flink. I have the
> following situation:
>
>
>
> There is a microservice that reads data from Kafka topics, creates Flink
> streams from this data and does different computations/pattern matching
> workloads. If the overall workload for this service becomes too big, I want
> to start a new instance of this service and share the work between the
> running services. To accomplish that, I thought about using Flinks
> savepoint mechanism. But there are some open questions:
>
>
>
> 1.) Can I combine two or more savepoints in one program?
> Think of two services already running. Now I’m starting up a third
> service. The new one would get savepoints from the already running
> services. It than would continue computation of some streams while the
> other services would discard calculation on these 

Re: What is the recommended way to read AVRO data from Kafka using flink.

2016-08-02 Thread Stephan Ewen
Hi!

I think this is a known limitation for Flink 1.0 and it is fixed in Flink
1.1

Here is the JIRA ticket: https://issues.apache.org/jira/browse/FLINK-3691

Here is the mail thread:
http://mail-archives.apache.org/mod_mbox/flink-user/201604.mbox/%3CCAOFSxKtJXfxRKm2=bplu+xvpwqrwd3c8ynuk3iwk9aqvgrc...@mail.gmail.com%3E

You could try and use the latest release candidate to get the fix:
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-Apache-Flink-1-1-0-RC1-td12723.html

The release is also in progress, so it should be out as a stable release soon.

Greetings,
Stephan


On Tue, Aug 2, 2016 at 4:04 PM, Alam, Zeeshan  wrote:

> Hi,
>
>
>
> I am using Flink 1.0.3 and FlinkKafkaConsumer08 to read AVRO data
> from Kafka. I have the AVRO schema file with me which was used to
> write the data in Kafka. Here
> https://ci.apache.org/projects/flink/flink-docs-release-0.8/example_connectors.html
> you have mentioned that using the GenericData.Record type is possible with
> Flink, but not recommended: since the record contains the full schema, it is
> very data intensive and thus probably slow to use. So what is the
> recommended way to read AVRO data from Kafka using Flink?
>
>
>
> public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     Properties properties = new Properties();
>     properties.setProperty("bootstrap.servers", "dojo3x:9092,dojox:9092,dojox:9092");
>     properties.setProperty("zookeeper.connect", "dojo3x:2181,dojox:2181,dojox:2181");
>     properties.setProperty("group.id", "Zeeshantest");
>     AvroDeserializationSchema<GenericData.Record> avroSchema =
>         new AvroDeserializationSchema<>(GenericData.Record.class);
>     FlinkKafkaConsumer08<GenericData.Record> kafkaConsumer =
>         new FlinkKafkaConsumer08<>("myavrotopic", avroSchema, properties);
>     DataStream<GenericData.Record> messageStream = env.addSource(kafkaConsumer);
>     messageStream.rebalance().print();
>     env.execute("Flink AVRO KAFKA Test");
> }
>
>
>
> This is the AvroDeserializationSchema that I am using.
>
> public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
>
>     private static final long serialVersionUID = 4330538776656642778L;
>
>     private final Class<T> avroType;
>     private transient DatumReader<T> reader;
>     private transient BinaryDecoder decoder;
>
>     public AvroDeserializationSchema(Class<T> avroType) {
>         this.avroType = avroType;
>     }
>
>     @Override
>     public T deserialize(byte[] message) {
>         ensureInitialized();
>         try {
>             decoder = DecoderFactory.get().binaryDecoder(message, decoder);
>             return reader.read(null, decoder);
>         } catch (Exception e) {
>             throw new RuntimeException(e);
>         }
>     }
>
>     @Override
>     public boolean isEndOfStream(T nextElement) {
>         return false;
>     }
>
>     @Override
>     public TypeInformation<T> getProducedType() {
>         return TypeExtractor.getForClass(avroType);
>     }
>
>     private void ensureInitialized() {
>         if (reader == null) {
>             if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroType)) {
>                 reader = new SpecificDatumReader<T>(avroType);
>             } else {
>                 reader = new ReflectDatumReader<T>(avroType);
>             }
>         }
>     }
> }
>
>
>
> On running this I am getting java.lang.Exception: Not a Specific class:
> class org.apache.avro.generic.GenericData$Record.
>
>
>
> Thanks & Regards
>
> Zeeshan Alam

Re: CEP and Within Clause

2016-08-02 Thread Sameer W
Thanks Till,

In that case if I have a pattern -
First = T > 30
Followed By = T > 50
Within 10 minutes

If I get the following sequence of events within 10 minutes
T=31, T=51, T=31, T=51

I assume the alert will fire twice now.

But what happens if the last T=51 arrives in the 11th minute? If the
partially matched pattern is discarded after 10 minutes, how will the system
detect T=51? Or do you mean that the timer (for the within clause) is
reset each time the pattern T>30 matches? In that case it would fire!

Thanks,
Sameer
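For reference, a sketch of the pattern being discussed, written against the Flink 1.1-style CEP Java API where where() takes a FilterFunction (TemperatureEvent is a hypothetical POJO):

import java.util.Map;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

public class TemperaturePatternSketch {

    public static DataStream<String> alerts(DataStream<TemperatureEvent> input) {
        // "first" T > 30, followed by "second" T > 50, within 10 minutes of the first match.
        Pattern<TemperatureEvent, ?> pattern = Pattern.<TemperatureEvent>begin("first")
                .where(new FilterFunction<TemperatureEvent>() {
                    @Override
                    public boolean filter(TemperatureEvent e) {
                        return e.getTemperature() > 30;
                    }
                })
                .followedBy("second")
                .where(new FilterFunction<TemperatureEvent>() {
                    @Override
                    public boolean filter(TemperatureEvent e) {
                        return e.getTemperature() > 50;
                    }
                })
                .within(Time.minutes(10));

        PatternStream<TemperatureEvent> matches = CEP.pattern(input, pattern);

        return matches.select(new PatternSelectFunction<TemperatureEvent, String>() {
            @Override
            public String select(Map<String, TemperatureEvent> match) {
                return "alert: " + match.get("first") + " then " + match.get("second");
            }
        });
    }

    // Hypothetical event type.
    public static class TemperatureEvent {
        private double temperature;
        public double getTemperature() { return temperature; }
    }
}

As Till describes above, each element matching the first condition opens its own ten-minute window, so a later T=31/T=51 pair can still match even when an earlier partial match has timed out.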

On Tue, Aug 2, 2016 at 10:02 AM, Till Rohrmann  wrote:

> Hi Sameer,
>
> the within clause of CEP uses neither tumbling nor sliding windows. It is
> more like a session window which is started whenever an element which
> matches the starting condition arrives. As long as new events which fulfill
> the pattern definition arrive within the length of the window, they will be
> added. If the pattern should not be completed within the specified time
> interval, the partially matched pattern will be discarded. If you've
> specified a timeout handler, then the timeout handler is called with the
> partial pattern.
>
> At the moment, there is no way to re-insert elements in the upstream.
> Actually there is also no need for it because the CEP operator will detect
> the alert patterns if there are two temperature readings > 150 within 6
> seconds.
>
> Cheers,
> Till
>
>
>
> On Tue, Aug 2, 2016 at 5:12 AM, Aljoscha Krettek 
> wrote:
>
>> +Till, looping him in directly, he probably missed this because he was
>> away for a while.
>>
>>
>>
>> On Tue, 26 Jul 2016 at 18:21 Sameer W  wrote:
>>
>>> Hi,
>>>
>>> It looks like the WithIn clause of CEP uses Tumbling Windows. I could
>>> get it to use Sliding windows by using an upstream pipeline which uses
>>> Sliding Windows and produces repeating elements (in each sliding window)
>>> and applying a Watermark assigner on the resulting stream with elements
>>> duplicated. I wanted to use the "followedBy" pattern where there is a
>>> strong need for sliding windows.
>>>
>>> Is there a plan to add sliding windows to the within clause at some
>>> point?
>>>
>>> The PatternStream class's "select" and "flatSelect" have overloaded
>>> versions which take PatternTimeOut variable. Is there a way to insert some
>>> of those elements back to the front of the stream. Say I am trying to find
>>> a pattern where two temperature readings >150 within 6 second window should
>>> raise an alert. If only one was found, can I insert that one back in the
>>> front of the stream on that task node (for that window pane) so that I can
>>> find a pattern match in the events occurring in the next 6 seconds. If I
>>> can do that, I don't need sliding windows. Else I cannot avoid using them
>>> for such scenarios.
>>>
>>> Thanks,
>>> Sameer
>>>
>>
>


Re: Flink Kafka Consumer Behaviour

2016-08-02 Thread Till Rohrmann
Hi Janardhan,

Flink should commit the current offsets to Zookeeper whenever a checkpoint
has been completed. In case that you disabled checkpointing, then the
offsets will be periodically committed to ZooKeeper. The default value is
60s.

Could it be that there wasn't yet a completed checkpoint? Which version of
Flink are you using?

Cheers,
Till
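If checkpointing stays disabled, the periodic commit goes through the standard Kafka 0.8 high-level consumer settings passed to the connector. A hedged sketch (the property names are standard Kafka 0.8 settings; whether the Flink 0.8 connector honours auto.commit.interval.ms exactly like this is worth verifying against the version in use):

import java.util.Properties;

public class Kafka08ConsumerProps {
    public static Properties consumerProperties() {
        Properties props = new Properties();
        props.setProperty("zookeeper.connect", "zkhost:2181"); // placeholder
        props.setProperty("group.id", "my-group");             // placeholder
        // Kafka 0.8 high-level consumer settings for the periodic ZooKeeper commit
        // used when Flink checkpointing is disabled; the Kafka default interval is
        // 60000 ms, which matches the 60s mentioned above.
        props.setProperty("auto.commit.enable", "true");
        props.setProperty("auto.commit.interval.ms", "30000");
        return props;
    }
}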

On Tue, Aug 2, 2016 at 7:26 PM, Janardhan Reddy  wrote:

> Hi,
>
> When I run the following command I am getting that no topic is available
> for that consumer group. I am using
> flink-connector-kafka-0.8_${scala.version}(2.11).
>
> ./bin/kafka-consumer-groups.sh --zookeeper <> --group <> --describe
>
> No topic available for consumer group provided
>
>
> Does the Kafka consumer always commit offsets to the broker? Do we need to
> enable checkpointing for the offsets to be committed?
>
>
> Thanks
>


What is the recommended way to read AVRO data from Kafka using flink.

2016-08-02 Thread Alam, Zeeshan
Hi,

I am using Flink 1.0.3 and FlinkKafkaConsumer08 to read AVRO data from Kafka. I 
have the AVRO schema file with me which was used to write the data in Kafka. 
Here 
https://ci.apache.org/projects/flink/flink-docs-release-0.8/example_connectors.html
 you have mentioned that using the GenericData.Record type is possible with 
Flink, but not recommended: since the record contains the full schema, it is 
very data intensive and thus probably slow to use. So what is the recommended 
way to read AVRO data from Kafka using Flink?

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "dojo3x:9092,dojox:9092,dojox:9092");
    properties.setProperty("zookeeper.connect", "dojo3x:2181,dojox:2181,dojox:2181");
    properties.setProperty("group.id", "Zeeshantest");
    AvroDeserializationSchema<GenericData.Record> avroSchema =
        new AvroDeserializationSchema<>(GenericData.Record.class);
    FlinkKafkaConsumer08<GenericData.Record> kafkaConsumer =
        new FlinkKafkaConsumer08<>("myavrotopic", avroSchema, properties);
    DataStream<GenericData.Record> messageStream = env.addSource(kafkaConsumer);
    messageStream.rebalance().print();
    env.execute("Flink AVRO KAFKA Test");
}

This is the AvroDeserializationSchema that I am using.


public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {

    private static final long serialVersionUID = 4330538776656642778L;

    private final Class<T> avroType;
    private transient DatumReader<T> reader;
    private transient BinaryDecoder decoder;

    public AvroDeserializationSchema(Class<T> avroType) {
        this.avroType = avroType;
    }

    @Override
    public T deserialize(byte[] message) {
        ensureInitialized();
        try {
            decoder = DecoderFactory.get().binaryDecoder(message, decoder);
            return reader.read(null, decoder);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return false;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeExtractor.getForClass(avroType);
    }

    private void ensureInitialized() {
        if (reader == null) {
            if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroType)) {
                reader = new SpecificDatumReader<T>(avroType);
            } else {
                reader = new ReflectDatumReader<T>(avroType);
            }
        }
    }
}

On running this I am getting java.lang.Exception: Not a Specific class: class 
org.apache.avro.generic.GenericData$Record.

Thanks & Regards
Zeeshan Alam




Re: CEP and Within Clause

2016-08-02 Thread Till Rohrmann
Hi Sameer,

the within clause of CEP uses neither tumbling nor sliding windows. It is
more like a session window which is started whenever an element which
matches the starting condition arrives. As long as new events which fulfill
the pattern definition arrive within the length of the window, they will be
added. If the pattern should not be completed within the specified time
interval, the partially matched pattern will be discarded. If you've
specified a timeout handler, then the timeout handler is called with the
partial pattern.

At the moment, there is no way to re-insert elements in the upstream.
Actually there is also no need for it because the CEP operator will detect
the alert patterns if there are two temperature readings > 150 within 6
seconds.

Cheers,
Till



On Tue, Aug 2, 2016 at 5:12 AM, Aljoscha Krettek 
wrote:

> +Till, looping him in directly, he probably missed this because he was
> away for a while.
>
>
>
> On Tue, 26 Jul 2016 at 18:21 Sameer W  wrote:
>
>> Hi,
>>
>> It looks like the WithIn clause of CEP uses Tumbling Windows. I could get
>> it to use Sliding windows by using an upstream pipeline which uses Sliding
>> Windows and produces repeating elements (in each sliding window) and
>> applying a Watermark assigner on the resulting stream with elements
>> duplicated. I wanted to use the "followedBy" pattern where there is a
>> strong need for sliding windows.
>>
>> Is there a plan to add sliding windows to the within clause at some
>> point?
>>
>> The PatternStream class's "select" and "flatSelect" have overloaded
>> versions which take PatternTimeOut variable. Is there a way to insert some
>> of those elements back to the front of the stream. Say I am trying to find
>> a pattern where two temperature readings >150 within 6 second window should
>> raise an alert. If only one was found, can I insert that one back in the
>> front of the stream on that task node (for that window pane) so that I can
>> find a pattern match in the events occurring in the next 6 seconds. If I
>> can do that, I don't need sliding windows. Else I cannot avoid using them
>> for such scenarios.
>>
>> Thanks,
>> Sameer
>>
>


Re: Reprocessing data in Flink / rebuilding Flink state

2016-08-02 Thread Ufuk Celebi
No, unfortunately this is the same for 1.1. The idea was to be explicit
about what works and what doesn't. I see that this is actually a pain for this
use case (which is very nice and reasonable ;)). I think we can either
always ignore state that does not match the new job or, if that is too
aggressive, add a flag to ignore unmatched state.


On Mon, Aug 1, 2016 at 6:39 PM, Aljoscha Krettek 
wrote:

> +Ufuk, looping him in directly
>
> Hmm, I think this is changed for the 1.1 release. Ufuk could you please
> comment?
>
>
> On Mon, 1 Aug 2016 at 08:07 Josh  wrote:
>
>> Cool, thanks - I've tried out the approach where we replay data from the
>> Kafka compacted log, then take a savepoint and switch to the live stream.
>>
>> It works but I did have to add in a dummy operator for every operator
>> that was removed. Without doing this, I got an exception:
>> java.lang.IllegalStateException: Failed to rollback to savepoint
>> Checkpoint 1 @ 1470059433553 for cb321c233dfd28f73c565030481657cd. Cannot
>> map old state for task 02ea922553bc7522bdea373f52a702d6 to the new program.
>> This indicates that the program has been changed in a non-compatible way
>>  after the savepoint.
>>
>> I had a Kafka source and a flat mapper chained together when replaying,
>> so to make it work I had to add two dummy operators and assign the same UID
>> I used when replaying, like this:
>> stream.map(x => x).uid("kafka-replay").name("dummy-1").startNewChain()
>>   .map(x => x).name("dummy-2")
>>
>> I guess it would be nice if Flink could recover from removed
>> tasks/operators without needing to add dummy operators like this.
>>
>> Josh
>>
>> On Fri, Jul 29, 2016 at 5:46 PM, Aljoscha Krettek 
>> wrote:
>>
>>> Hi,
>>> I have to try this to verify but I think the approach works if you give
>>> the two sources different UIDs. The reason is that Flink will ignore state
>>> for which it doesn't have an operator to assign it to. Therefore, the state
>>> of the "historical Kafka source" should be silently discarded.
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On Fri, 29 Jul 2016 at 18:12 Josh  wrote:
>>>
 @Aljoscha - The N-input operator way sounds very nice, for now I think
 I'll try and get something quick running the hacky way, then if we decide
 to make this a permanent solution maybe I can work on the proper solution.
 I was wondering about your suggestion for "warming up" the state and then
 taking a savepoint and switching sources - since the Kafka sources are
 stateful and are part of Flink's internal state, wouldn't this break when
 trying to restore the job with a different source? Would I need to assign
 the replay source a UID, and when switching from replay to live, remove the
 replay source and replace it with an dummy operator with the same UID?

 @Jason - I see what you mean now, with the historical and live Flink
 jobs. That's an interesting approach - I guess it's solving a slightly
 different problem to my 'rebuilding Flink state upon starting job' - as
 you're rebuilding state as part of the main job when it comes across events
 that require historical data. Actually I think we'll need to do something
 very similar in the future but right now I can probably get away with
 something simpler!

 Thanks for the replies!

 Josh

 On Fri, Jul 29, 2016 at 2:35 PM, Jason Brelloch 
 wrote:

> Aljoscha's approach is probably better, but to answer your questions...
>
> >How do you send a request from one Flink job to another?
> All of our different flink jobs communicate over kafka.  So the main
> flink job would be listening to both a "live" kafka source, and a
> "historical" kafka source.  The historical flink job would listen to a
> "request" kafka source.  When the main job gets an event that it does not
> have state for it writes to the "request" topic.  The historical job would
> read the request, grab the relevant old events from GCS, and write them to
> the "historical" kafka topic.  The "historical" source and the "live"
> source are merged and proceed through the main flink job as one stream.
>
> >How do you handle the switchover between the live stream and the
> historical stream? Do you somehow block the live stream source and detect
> when the historical data source is no longer emitting new elements?
> When the main job sends a request to the historical job, the main job
> starts storing any events that are come in for that key.  As the 
> historical
> events come in they are processed immediately.  The historical flink job
> flags the last event it sends.  When the main flink job sees the flagged
> event it knows it is caught up to where it was when it sent the request.
> You can then process the events that the main job stored, and when 

Re: Window Functions with Incremental Aggregation

2016-08-02 Thread David B. Ciar
Hello again,

Having had another go at this today, I clearly see that I cannot pass a
certain type into the fold/window function and expect to be able to return a
datastream of another type from the window function.  I have tried a
different approach and am now receiving a run-time exception, caused by
trying to use a composite case class as the fold accumulator value.  My
query now is whether this is possible, and if it is possible, how to fix the
run-time exception.  Again any help is appreciated.

The exception:

Exception in thread "main" java.lang.ClassCastException: [Ljava.lang.Object;
cannot be cast to [Lorg.apache.flink.api.common.typeinfo.TypeInformation;
at
org.management.observations.processing.jobs.QCBlockNull$$anon$6.<init>(QCBlockNull.scala:104)
at
org.management.observations.processing.jobs.QCBlockNull$.main(QCBlockNull.scala:104)
at
org.management.observations.processing.jobs.QCBlockNull.main(QCBlockNull.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)

The code snippet is pasted below (also linked as a neater-formatted Gist further down):

// The cause of the exception is the .apply(...) below and the use of
// IncrementalWindowPlaceholder. The fold and window classes return type
// IncrementalWindowPlaceholder.

val nullQCEvents1h = nullStream
  .keyBy("feature", "procedure")
  .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(30)))
  .apply(new IncrementalWindowPlaceholder(0, None, None, None),
    new QCFoldCounter(),
    new QCCheckNullAggregate())

// The aggregate class I want to use with the fold/window function and emit
// as the DataStream type:

case class IncrementalWindowPlaceholder(foldedValue: Double,
                                        keys: Option[Tuple],
                                        startTime: Option[Long],
                                        endTime: Option[Long]) {

  override def toString: String =
    foldedValue.toString + ',' + keys.getOrElse('-') + ',' +
      startTime.getOrElse('-') + ',' + endTime.getOrElse('-')
}

Also here:
https://gist.github.com/dbciar/904e2d35d6aae30214666de1176f1d7c







Re: OutOfMemoryError

2016-08-02 Thread Stephan Ewen
My guess would be that you have a thread leak in the user code.
More memory will not solve the problem, only push it a bit further away.
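One common shape of such a leak, for what it's worth, is building a thread pool or client per record inside map() instead of once per task. A hedged sketch of the open()/close() pattern (the enrichment call itself is a placeholder for the RPC/REST lookup described below):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class EnrichMapFunction extends RichMapFunction<String, String> {

    private transient ExecutorService pool;

    @Override
    public void open(Configuration parameters) {
        // Created once per task, not once per record; creating executors or HTTP
        // clients inside map() leaks threads until the JVM cannot create new ones.
        pool = Executors.newFixedThreadPool(4);
    }

    @Override
    public String map(String value) throws Exception {
        // Placeholder for the external enrichment call, run on the shared pool.
        return pool.submit(() -> value + "-enriched").get();
    }

    @Override
    public void close() {
        if (pool != null) {
            pool.shutdown();
        }
    }
}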

On Mon, Aug 1, 2016 at 9:15 PM, Paulo Cezar  wrote:

> Hi folks,
>
>
> I'm trying to run a DataSet program but after around 200k records are 
> processed a "java.lang.OutOfMemoryError: unable to create new native thread" 
> stops me.
>
>
> I'm deploying Flink (via bin/yarn-session.sh) on a YARN cluster with 10 nodes 
> (each with 8 cores) and starting 10 task managers, each with 8 slots and 6GB 
> of RAM.
>
>
> Except for the data sink that writes to HDFS and runs with a parallelism of 
> 1, my job runs with a parallelism of 80 and has two input datasets, each is a 
> HDFS file with around 6GB and 20mi lines. Most of my map functions uses 
> external services via RPC or REST APIs to enrich the raw data with info from 
> other sources.
>
> Might I be doing something wrong, or should I really have more memory
> available?
>
> Thanks,
> Paulo Cezar
>
>


How to read AVRO data from Kafka using Flink

2016-08-02 Thread Alam, Zeeshan
Hi All,

I am trying to read AVRO data from Kafka using Flink 1.0.3 but I am getting an 
error. I have posted this issue on Stack Overflow: 
http://stackoverflow.com/questions/38715286/how-to-decode-kafka-messages-using-avro-and-flink
Is there any mistake we can look into, or is there a better way to read 
AVRO data from Kafka using Flink?

Thanks & Regards
Zeeshan Alam