Re: The implementation of the CoGroupFunction is not serializable

2017-05-01 Thread Kaepke, Marc
Hi Greg, thanks! Should all inner classes be static? Best, Marc On 01.05.2017 at 00:21, Greg Hogan <c...@greghogan.com> wrote: Hi Marc, These errors are usually caused by an inner class that should be static. Greg On Apr 30, 2017, at 2:13 PM, Kaepke, Marc mailto:marc.kae...@haw-hamburg.
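
To illustrate Greg's point, here is a minimal plain-Java sketch (no Flink dependency) of why a non-static inner class can fail to serialize: it carries a hidden reference to its enclosing instance, and if that outer object is not serializable, neither is the inner one. The names `InnerClassSerialization`, `Joiner`, `InnerJoiner`, `StaticJoiner` and `isSerializable` are hypothetical, and `Joiner` is only a stand-in for a user function interface such as CoGroupFunction.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// The enclosing class is deliberately NOT Serializable.
public class InnerClassSerialization {

    private final String separator = "-";

    // Hypothetical stand-in for a serializable user function interface.
    interface Joiner extends Serializable {
        String join(String left, String right);
    }

    // Non-static inner class: keeps a hidden reference to the enclosing
    // instance (used here to read `separator`), so serializing it also
    // tries to serialize the non-serializable outer object.
    class InnerJoiner implements Joiner {
        @Override
        public String join(String left, String right) {
            return left + separator + right;
        }
    }

    // Static nested class: no reference to an enclosing instance.
    static class StaticJoiner implements Joiner {
        @Override
        public String join(String left, String right) {
            return left + "-" + right;
        }
    }

    // Returns true if plain Java serialization of the object succeeds.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            // NotSerializableException is a subclass of IOException.
            return false;
        }
    }

    public static void main(String[] args) {
        InnerClassSerialization outer = new InnerClassSerialization();
        System.out.println(isSerializable(outer.new InnerJoiner())); // false
        System.out.println(isSerializable(new StaticJoiner()));      // true
    }
}
```

Not every inner class has to be static; the problem only arises when the enclosing instance that gets dragged along is itself not serializable.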

RE: Collector.collect

2017-05-01 Thread Newport, Billy
We’ve done that, but it’s very expensive from a serialization point of view when writing the same record multiple times, each in a different tuple. For example, we started with this: .collect(new Tuple [...] [mailto:gaurav671...@gmail.com] Sent: Saturday, April 29, 2017 4:32 AM To: user@flink.apache.org

Flink Event Time order

2017-05-01 Thread Björn Zachrisson
Hi, As shown at https://pastebin.com/409Z0tSc, I'm trying to start building my Flink application based on the "taxi drives" example. I use a priority queue to add the events and emit watermarks every period + 1 min, where period is normally 30 minutes. The thing is that the events seem to

Re: Collector.collect

2017-05-01 Thread Chesnay Schepler
Hello, @Billy, what prevented you from duplicating/splitting the record, based on the bitmask, in a map function before the sink? This shouldn't incur any serialization overhead if the sink is chained to the map. The emitted Tuple could also share the GenericRecord; meaning you don't even have

RE: Collector.collect

2017-05-01 Thread Newport, Billy
There is likely a bug then: the ENUM,Record stream to a filter to a set of OutputFormats per filter was slower than the BITMASK,Record stream to a single OutputFormat which demuxes the data to each file internally. Are you saying to do a custom writer inside a map rather than either of the 2 above approache
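
The bitmask demultiplexing idea discussed in this thread can be sketched in plain Java as follows. This is only an illustration under stated assumptions, not the code from the thread: `BitmaskDemux` and `demux` are hypothetical names, and in-memory lists stand in for the per-file writers inside a single output format. Bit i of the mask selects output i, and every selected output receives the same record reference, so the record itself is never copied.

```java
import java.util.ArrayList;
import java.util.List;

public class BitmaskDemux {

    // Routes one record to every output whose bit is set in the mask.
    // Each selected output gets the same object reference (no copy).
    static <T> void demux(int mask, T record, List<List<T>> outputs) {
        for (int i = 0; i < outputs.size(); i++) {
            if ((mask & (1 << i)) != 0) {
                outputs.get(i).add(record);
            }
        }
    }

    public static void main(String[] args) {
        // Three in-memory outputs standing in for three files.
        List<List<String>> outputs = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            outputs.add(new ArrayList<>());
        }

        demux(0b101, "record-1", outputs); // goes to outputs 0 and 2
        demux(0b010, "record-2", outputs); // goes to output 1

        System.out.println(outputs); // [[record-1], [record-2], [record-1]]
    }
}
```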

Re: Collector.collect

2017-05-01 Thread Chesnay Schepler
Oh, you have multiple different output formats; I missed that. For the Batch API you are, I believe, correct: using a custom output format is the best solution. In the Streaming API the code below should be equally fast if the filtered sets don't overlap. input = ... input.filter(conditionA).ou

High Availability on Yarn

2017-05-01 Thread Jain, Ankit
Hi fellow users, We are trying to straighten out the high availability story for Flink. Our setup includes a long-running EMR cluster; job submission is a two-step process: 1) a Flink cluster is first created using the Flink YARN client on the already-running EMR cluster, 2) the Flink job is submitted. I als

Re: Iterating over keys in state backend

2017-05-01 Thread Ken Krugler
Hi Kostas, In my use case I’m keeping track of the state of URLs during a web crawl. This represents both current state (“URL X should be crawled at time Y, and has an estimated value of Z”), and is the source of URLs to be fed into the crawl infrastructure - it’s a floor wax and a dessert toppi

Re: Problems reading Parquet input from HDFS

2017-05-01 Thread Lukas Kircher
Hi Flavio, thanks for your help. With Flink 1.2.0 and Avro 1.8.1 it works fine for me too, as long as I run it from the IDE. As soon as I submit it as a job to the cluster, I get the described dependency issues. * If I use the Flink 1.2.0 binary and just add Flink as a Maven dependency to my pro