Re: Does Flink allow for encapsulation of transformations?

2016-06-09 Thread Ser Kho

Chesnay: I have two simple questions, related to the previous ones about
encapsulation of transformations.
Question 1. I have tried to extend my code using your suggestions and came up
with a small concern. First, your code:

public static void main(String[] args) throws Exception
{
   ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
   DataSet<Double> pi = new classPI(env).compute();
   new classThatNeedsPI(env).computeWhatever(pi); // append your transformations to pi
   env.execute();
}


Below is my code (most of it closely follows the lines above and works fine); the
line of concern is the last one, which constructs classLengthCircle. The issue is
that I do not use env in the constructor of the class classLengthCircle(); instead
I use the DataSet pi (and also the DataSet Radius, but the latter does not matter
for the question) in the method computeLengthCircle(pi, Radius). Then I proceed
with transformations using this DataSet pi, see the class classLengthCircle below.
It seems that the logic of this class and its method computeLengthCircle() does
not require env at all. My question is whether this code will work on a cluster
(it does work on a local computer)?
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Double> Radius = env.fromElements(10.0);
DataSet<Long> NumIter = env.fromElements(100L);

// this line is similar to the suggested one
DataSet<Double> pi = new classPI(env).compute(NumIter);

// this line is somewhat different from the suggested one, as it has no env in the constructor
DataSet<Double> LengthCircle = new classLengthCircle().computeLengthCircle(pi, Radius);

public static final class classLengthCircle
{
    public DataSet<Double> computeLengthCircle(DataSet<Double> pi, DataSet<Double> Radius)
    {
        DataSet<Double> result = pi.cross(Radius).map(
            new MapFunction<Tuple2<Double, Double>, Double>() {
                @Override
                public Double map(Tuple2<Double, Double> arg0) throws Exception {
                    return 2 * arg0.f0 * arg0.f1;
                }
            });
        return result;
    }
}
Question 2:
I tried to use a parameter DataSet NumIter inside the MapFunction of the map()
transformation, see the line "Long N = NumIter.collect().get(0);" in the code below.
It seems this parameter appears in the MapFunction without being passed explicitly,
since the line .map(new MapFunction<Long, Double>() ...) makes no mention of NumIter.
Is this suggested approach the right way to pass a parameter into the MapFunction of
a transformation? Note that the code works all right on a single computer.
public static final class classPI implements Serializable
{
    private final ExecutionEnvironment env;

    public classPI(ExecutionEnvironment env) { this.env = env; }

    public DataSet<Double> compute(final DataSet<Long> NumIter) throws Exception {
        return this.env.generateSequence(1, NumIter.collect().get(0))
            .map(new Sampler())
            .reduce(new SumReducer())
            .map(new MapFunction<Long, Double>() {
                Long N = NumIter.collect().get(0);
                @Override
                public Double map(Long arg0) throws Exception {
                    return arg0 * 4.0 / N;
                }
            });
    }
}
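For comparison, a minimal sketch of passing the value explicitly instead of letting the
anonymous MapFunction capture the DataSet (the class name PiScaler is illustrative; the
value is collected once on the client and shipped as a plain serializable field):

// Pass the number of iterations in through the constructor, so the function
// captures nothing from the enclosing scope.
public static final class PiScaler implements MapFunction<Long, Double> {

    private final long numIter;   // plain serializable field

    public PiScaler(long numIter) {
        this.numIter = numIter;
    }

    @Override
    public Double map(Long count) throws Exception {
        return count * 4.0 / numIter;
    }
}

// usage inside classPI.compute():
//   long n = NumIter.collect().get(0);
//   ... .reduce(new SumReducer()).map(new PiScaler(n));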

Thanks a lot for your time.
Ser



On Tuesday, June 7, 2016 8:14 AM, Chesnay Schepler  
wrote:
 

   1a. ah. yeah i see how it could work, but i wouldn't count on it in a 
cluster.
you would (most likely) run the sub-job (calculating pi) only on a single
node.
 
1b. different execution environments generally imply different flink programs.
 
2. sure it does, since it's a normal flink job. yours on the other hand 
doesn't, since the job calculating PI only runs on a single TaskManager.
 
3. there are 2 ways. you can either chain jobs like this: (effectively running 
2 flink programs in succession)
 public static void main(String[] args) throws Exception 
{
  double pi = new classPI().compute();
   System.out.println("We estimate Pi to be: "
 + pi);
  new classThatNeedsPI().computeWhatever(pi); //feeds pi into an 
env.fromElements call and proceeds from there
 } or (if all building blocks are flink programs) build a single job:
 public static void main(String[] args) throws Exception 
{
ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
DataSet<Double> pi = new classPI(env).compute();
new classThatNeedsPI(env).computeWhatever(pi); //append your 
transformations to pi
env.execute();
 }

...
public DataSet<Double> compute() throws Exception {
return this.env.generateSequence(1, NumIter)
.map(new Sampler())
.reduce(new SumReducer())
.map(/*return 4 * x*/);}
...

public ? computeWhatever(DataSet<Double> pi) throws Exception {
...
}
 
On 07.06.2016 13:35, Ser Kho wrote:
  
  Chesnay: 
  1a. The code actually works, that is the point. 1b. What restricts a
Flink program from having several execution environments? 2. I am not sure that
your modification allows for parallelism. Does it? 3. This code is a simple 

Re: Data Source Generator emits 4 instances of the same tuple

2016-06-09 Thread Biplob Biswas
Yes, thanks a lot. Also, the fact that I was using ParallelSourceFunction was
problematic. So, as suggested by Fabian and Robert, I used SourceFunction
and then, in the Flink job, I set the output of map with a parallelism of 4
to get the desired result.

Thanks again.





Re: NotSerializableException

2016-06-09 Thread Stephan Ewen
You can also make the KeySelector a static inner class. That should work as
well.
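A minimal sketch of what that could look like, assuming the records are Avro
GenericRecords (that bound is my assumption, not confirmed by the code below); the
class would be a static member of RecordsFilterer, with the usual KeySelector/UUID
imports in the enclosing file:

// A static nested KeySelector keeps no hidden reference to the enclosing
// RecordsFilterer instance, so only its own fields must be serializable.
public static class FieldKeySelector<T extends GenericRecord> implements KeySelector<T, String> {

    private final String fieldName;  // captured explicitly via the constructor

    public FieldKeySelector(String fieldName) {
        this.fieldName = fieldName;
    }

    @Override
    public String getKey(T record) throws Exception {
        Object value = record.get(fieldName);
        return value != null ? value.toString() : UUID.randomUUID().toString();
    }
}

// usage inside addFilterFlag(...):
//   dataset.coGroup(filteredIds).where(new FieldKeySelector<T>(fieldName))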

On Thu, Jun 9, 2016 at 7:00 PM, Tarandeep Singh  wrote:

> Thank you Aljoscha and Fabian for your replies.
>
> @Aljoscha: when you said "cleaner is not used in CoGroup.where(). I'm
> afraid this is a bug", I am assuming you are referring to Flink engine
> itself.
>
> @Fabian: thanks for the optimization tip.
>
> This is how I have got it working (with a hack): in my dataset, the join
> field/key can be null; otherwise .where(fieldName) works and I don't get the
> not-serializable exception. So I applied a MapFunction to the DataSet and put a
> dummy value in the join field/key where it was null. Then, in the join
> function, I change it back to null.
>
> Best,
> Tarandeep
>
> On Thu, Jun 9, 2016 at 7:06 AM, Aljoscha Krettek 
> wrote:
>
>> Hi,
>> the problem is that the KeySelector is an anonymous inner class and as
>> such has a reference to the outer RecordFilterer object. Normally, this
>> would be rectified by the closure cleaner but the cleaner is not used in
>> CoGroup.where(). I'm afraid this is a bug.
>>
>> Best,
>> Aljoscha
>>
>>
>> On Thu, 9 Jun 2016 at 14:06 Fabian Hueske  wrote:
>>
>>> Hi Tarandeep,
>>>
>>> the exception suggests that Flink tries to serialize RecordsFilterer as
>>> a user function (this happens via Java Serialization).
>>> I said suggests because the code that uses RecordsFilterer is not
>>> included.
>>>
>>> To me it looks like RecordsFilterer should not be used as a user
>>> function. It is a helper class to construct a DataSet program, so it should
>>> not be shipped for execution.
>>> You would use such a class as follows:
>>>
>>> DataSet<T> records = ...
>>> DataSet<String> filterIDs = ...
>>>
>>> RecordsFilterer<T> rf = new RecordsFilterer<>();
>>> DataSet<Tuple2<Boolean, T>> result = rf.addFilterFlag(records,
>>> filterIDs, "myField");
>>>
>>> Regarding the join code, I would suggest an optimization.
>>> Instead of using CoGroup, I would use distinct and an OuterJoin like
>>> this:
>>>
>>> DataSet<String> distIds = filteredIds.distinct();
>>> DataSet<Tuple2<Boolean, T>> result = records
>>>   .leftOuterJoin(distIds)
>>>   .where(KEYSELECTOR)
>>>   .equalTo("*") // use full string as key
>>>   .with(JOINFUNC) // set Bool to false if right == null, true otherwise
>>>
>>> Best, Fabian
>>>
>>> 2016-06-09 2:28 GMT+02:00 Tarandeep Singh :
>>>
 Hi,

 I am getting NoSerializableException in this class-

 

 public class RecordsFilterer<T> {

     public DataSet<Tuple2<Boolean, T>> addFilterFlag(DataSet<T> dataset,
             DataSet<String> filteredIds, String fieldName) {
         return dataset.coGroup(filteredIds)
                 .where(new KeySelector<T, String>() {
                     @Override
                     public String getKey(T t) throws Exception {
                         String s = (String) t.get(fieldName);
                         return s != null ? s : UUID.randomUUID().toString();
                     }
                 })
                 .equalTo((KeySelector<String, String>) s -> s)
                 .with(new CoGroupFunction<T, String, Tuple2<Boolean, T>>() {
                     @Override
                     public void coGroup(Iterable<T> records, Iterable<String> ids,
                             Collector<Tuple2<Boolean, T>> collector) throws Exception {
                         boolean filterFlag = false;
                         for (String id : ids) {
                             filterFlag = true;
                         }

                         for (T record : records) {
                             collector.collect(new Tuple2<>(filterFlag, record));
                         }
                     }
                 });
     }
 }


 What I am trying to do is write generic code that will join Avro
 records (of different types) with String records and, where there is a match, add a
 filter flag. This way I can use the same code for different Avro record
 types. But I am getting this exception-

 Exception in thread "main"
 org.apache.flink.optimizer.CompilerException: Error translating node 'Map
 "Key Extractor" : MAP [[ GlobalProperties [partitioning=RANDOM_PARTITIONED]
 ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': Could
 not write the user code wrapper class
 org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
 java.io.NotSerializableException:
 com.styleseat.dataplatform.etl.jobs.boundary.RecordsFilterer
 at
 org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:386)
 at
 org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:109)
 at
 

Re: Using Flink watermarks and a large window state for scheduling

2016-06-09 Thread Josh
Ok, thanks Aljoscha.

As an alternative to using Flink to maintain the schedule state, I could
take the (e, t2) stream and write to a external key-value store with a
bucket for each minute. Then have a separate service which polls the
key-value store every minute and retrieves the current bucket, and does the
final transformation.

I just thought there might be a nicer way to do it using Flink!

On Thu, Jun 9, 2016 at 2:23 PM, Aljoscha Krettek 
wrote:

> Hi Josh,
> I'll have to think a bit about that one. Once I have something I'll get
> back to you.
>
> Best,
> Aljoscha
>
> On Wed, 8 Jun 2016 at 21:47 Josh  wrote:
>
>> This is just a question about a potential use case for Flink:
>>
>> I have a Flink job which receives tuples with an event id and a timestamp
>> (e, t) and maps them into a stream (e, t2) where t2 is a future timestamp
>> (up to 1 year in the future, which indicates when to schedule a
>> transformation of e). I then want to key by e and keep track of the max t2
>> for each e. Now the tricky bit: I want to periodically, say every minute
>> (in event time world) take all (e, t2) where t2 occurred in the last
>> minute, do a transformation and emit the result. It is important that the
>> final transformation happens after t2 (preferably as soon as possible, but
>> a delay of minutes is fine).
>>
>> Is it possible to use Flink's windowing and watermark mechanics to
>> achieve this? I want to maintain a large state for the (e, t2) window, e.g.
>> over a year (probably too large to fit in memory). And somehow use
>> watermarks to execute the scheduled transformations.
>>
>> If anyone has any views on how this could be done, (or whether it's even
>> possible/a good idea to do) with Flink then it would be great to hear!
>>
>> Thanks,
>>
>> Josh
>>
>


Join two streams using a count-based window

2016-06-09 Thread Nikos R. Katsipoulakis
Hello all,

At first, I have a question posted on
http://stackoverflow.com/questions/37732978/join-two-streams-using-a-count-based-window
. I am re-posting this on the mailing list in case some of you are not on
SO.

In addition, I would like to know what is the difference between Flink and
other Streaming engines on data-granularity transport and processing. To be
more precise, I am aware that Storm sends tuples using Netty (by filling up
queues) and a Bolt's logic is executed per tuple. Spark, employs
micro-batches to simulate streaming and (I am not entirely certain) each
task performs processing on a micro-batch. What about Flink? How are tuples
transferred and processed. Any explanation and or article/blog-post/link is
more than welcome.

Thanks

-- 
Nikos R. Katsipoulakis,
Department of Computer Science
University of Pittsburgh


Re: NotSerializableException

2016-06-09 Thread Tarandeep Singh
Thank you Aljoscha and Fabian for your replies.

@Aljoscha: when you said "cleaner is not used in CoGroup.where(). I'm
afraid this is a bug", I am assuming you are referring to Flink engine
itself.

@Fabian: thanks for the optimization tip.

This is how I have got it working (with a hack): in my dataset, the join
field/key can be null; otherwise .where(fieldName) works and I don't get the
not-serializable exception. So I applied a MapFunction to the DataSet and put a
dummy value in the join field/key where it was null. Then, in the join
function, I change it back to null.
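A rough sketch of that workaround, assuming the records are Avro GenericRecords and
using a hypothetical sentinel value (both are illustrative, not taken from the actual job):

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.MapFunction;

// Replaces a null join field with a sentinel so that .where(fieldName) can be used;
// the join function later maps the sentinel back to null.
public class NullKeyReplacer<T extends GenericRecord> implements MapFunction<T, T> {

    private static final String SENTINEL = "__NULL_KEY__";  // hypothetical placeholder value
    private final String fieldName;

    public NullKeyReplacer(String fieldName) {
        this.fieldName = fieldName;
    }

    @Override
    public T map(T record) throws Exception {
        if (record.get(fieldName) == null) {
            record.put(fieldName, SENTINEL);
        }
        return record;
    }
}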

Best,
Tarandeep

On Thu, Jun 9, 2016 at 7:06 AM, Aljoscha Krettek 
wrote:

> Hi,
> the problem is that the KeySelector is an anonymous inner class and as
> such has a reference to the outer RecordFilterer object. Normally, this
> would be rectified by the closure cleaner but the cleaner is not used in
> CoGroup.where(). I'm afraid this is a bug.
>
> Best,
> Aljoscha
>
>
> On Thu, 9 Jun 2016 at 14:06 Fabian Hueske  wrote:
>
>> Hi Tarandeep,
>>
>> the exception suggests that Flink tries to serialize RecordsFilterer as a
>> user function (this happens via Java Serialization).
>> I said suggests because the code that uses RecordsFilterer is not
>> included.
>>
>> To me it looks like RecordsFilterer should not be used as a user
>> function. It is a helper class to construct a DataSet program, so it should
>> not be shipped for execution.
>> You would use such a class as follows:
>>
>> DataSet<T> records = ...
>> DataSet<String> filterIDs = ...
>>
>> RecordsFilterer<T> rf = new RecordsFilterer<>();
>> DataSet<Tuple2<Boolean, T>> result = rf.addFilterFlag(records, filterIDs,
>> "myField");
>>
>> Regarding the join code, I would suggest an optimization.
>> Instead of using CoGroup, I would use distinct and an OuterJoin like this:
>>
>> DataSet<String> distIds = filteredIds.distinct();
>> DataSet<Tuple2<Boolean, T>> result = records
>>   .leftOuterJoin(distIds)
>>   .where(KEYSELECTOR)
>>   .equalTo("*") // use full string as key
>>   .with(JOINFUNC) // set Bool to false if right == null, true otherwise
>>
>> Best, Fabian
>>
>> 2016-06-09 2:28 GMT+02:00 Tarandeep Singh :
>>
>>> Hi,
>>>
>>> I am getting NoSerializableException in this class-
>>>
>>> 
>>>
>>> public class RecordsFilterer<T> {
>>>
>>>     public DataSet<Tuple2<Boolean, T>> addFilterFlag(DataSet<T> dataset,
>>>             DataSet<String> filteredIds, String fieldName) {
>>>         return dataset.coGroup(filteredIds)
>>>                 .where(new KeySelector<T, String>() {
>>>                     @Override
>>>                     public String getKey(T t) throws Exception {
>>>                         String s = (String) t.get(fieldName);
>>>                         return s != null ? s : UUID.randomUUID().toString();
>>>                     }
>>>                 })
>>>                 .equalTo((KeySelector<String, String>) s -> s)
>>>                 .with(new CoGroupFunction<T, String, Tuple2<Boolean, T>>() {
>>>                     @Override
>>>                     public void coGroup(Iterable<T> records, Iterable<String> ids,
>>>                             Collector<Tuple2<Boolean, T>> collector) throws Exception {
>>>                         boolean filterFlag = false;
>>>                         for (String id : ids) {
>>>                             filterFlag = true;
>>>                         }
>>>
>>>                         for (T record : records) {
>>>                             collector.collect(new Tuple2<>(filterFlag, record));
>>>                         }
>>>                     }
>>>                 });
>>>     }
>>> }
>>>
>>>
>>> What I am trying to do is write generic code that will join Avro
>>> records (of different types) with String records and, where there is a match, add a
>>> filter flag. This way I can use the same code for different Avro record
>>> types. But I am getting this exception-
>>>
>>> Exception in thread "main" org.apache.flink.optimizer.CompilerException:
>>> Error translating node 'Map "Key Extractor" : MAP [[ GlobalProperties
>>> [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null,
>>> grouped=null, unique=null] ]]': Could not write the user code wrapper class
>>> org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
>>> java.io.NotSerializableException:
>>> com.styleseat.dataplatform.etl.jobs.boundary.RecordsFilterer
>>> at
>>> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:386)
>>> at
>>> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:109)
>>> at
>>> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:198)
>>> at
>>> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>>> at
>>> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>> at
>>> 

Re: Maxby() and KeyBy() question

2016-06-09 Thread iñaki williams
Understood!

I have created a WindowStream and now it is working. Thanks !


El jueves, 9 de junio de 2016, Fabian Hueske  escribió:

> Hi,
>
> you are computing a running aggregate, i.e., you're getting one output
> record for each input record and the output record is the record with the
> largest value observed so far.
> If the record with the largest value is the first, the record is sent out
> another time. This is what happened with Match3 in your example.
>
> There are two ways to compute aggregates on streams: 1) a running
> aggregate as you just did, or 2) a windowed aggregate.
> For a windowed aggregate, you need to specify a window. The window
> can be time or count based.
> The following blog post should be a good introduction into Flink's window
> support [1].
>
> Best, Fabian
>
> [1] http://flink.apache.org/news/2015/12/04/Introducing-windows.html
>
> 2016-06-09 14:36 GMT+02:00 iñaki williams  >:
>
>> Hi again!
>>
>> I am working with two DataStreams, I want to get the maximun value from
>> each pair of them, for example:
>>
>> //Informacion (matchName, LocalOdd, AwayOdd)
>>
>> Informacion info1= new Informacion("Match1", 1.10, 3.22);
>> Informacion info2= new Informacion("Match2", 2.11, 1.10);
>> Informacion info3= new Informacion("Match3", 4.10, 1.05);
>>
>> Informacion info11= new Informacion("Match1", 1.80, 2.20);
>> Informacion info22= new Informacion("Match2", 3.10, 1.15);
>> Informacion info33= new Informacion("Match3", 2.12, 1.25);
>>
>>
>> DataStream src = see.fromElements(info1,info2,
>> info3);
>> DataStream src2 =
>> see.fromElements(info11,info22,info33);
>> DataStream src3= src.union(src2);
>>
>> DataStream maxLocal =
>> src3.keyBy("nombrePartido").maxBy("cuotaGanadorLocal");
>>
>> maxLocal.print();
>>
>>
>>
>> Let's suppose that those are tennis matches with their names and their
>> bet odds, and the names of the matches are the same on both streams, I mean
>> Match1=Match1 , Match2=Match2  (Imagine that match 1's name is "Rafa Nadal
>> - Roger Federer").
>>
>>
>> I want to get the maximun localOdd from matches with the same name, the
>> result of my code is:
>>
>>
>> 1> Informacion [matchName=Match2, localOdd=2.11, awayOdd=1.1]
>> 1> *Informacion [matchName=Match3, localOdd=4.1, awayOdd=1.05]*
>> 1> Informacion [matchName=Match2, localOdd=3.1, awayOdd=1.15]
>> 1>* Informacion [matchName=Match3, localOdd=4.1, awayOdd=1.05]*
>> 4> Informacion [matchName=Match1, localOdd=1.1, awayOdd=3.22]
>> 4> Informacion [matchName=Match1, localOdd=1.8, awayOdd=2.2]
>>
>> It seems like it is taking the biggest value from all the matches and not
>> by keyed matches
>>
>>
>> I am looking for this:
>>
>>
>> Informacion [matchName=Match1, localOdd=1.8, awayOdd=2.2]
>> Informacion [matchName=Match2, localOdd=3.1, awayOdd=1.15]
>> Informacion [matchName=Match3, localOdd=4.1, awayOdd=1.05]
>>
>>
>>
>> How can I get it?
>>
>>
>> Thanks in advance
>>
>>
>>
>>
>>
>


Re: HBase reads and back pressure

2016-06-09 Thread Fabian Hueske
OK, this indicates that the operator following the source is a bottleneck.

If that's the WindowOperator, it makes sense to try the refactoring of the
WindowFunction.
Alternatively, you can try to run that operator with a higher parallelism.

2016-06-09 17:39 GMT+02:00 Christophe Salperwyck <
christophe.salperw...@gmail.com>:

> Hi Fabian,
>
> Thanks for the help, I will try that. The backpressure was on the source
> (HBase).
>
> Christophe
>
> 2016-06-09 16:38 GMT+02:00 Fabian Hueske :
>
>> Hi Christophe,
>>
>> where does the backpressure appear? In front of the sink operator or
>> before the window operator?
>>
>> In any case, I think you can improve your WindowFunction if you convert
>> parts of it into a FoldFunction.
>> The FoldFunction would take care of the statistics computation and the
>> WindowFunction would only assemble the result record including extracting
>> the start time of the window.
>>
>> Then you could do:
>>
>> ws.apply(new SummaryStatistics(), new YourFoldFunction(), new
>> YourWindowFunction());
>>
>> This is more efficient because the FoldFunction is eagerly applied when
>> ever a new element is added to a window. Hence, the window does only hold a
>> single value (SummaryStatistics) instead of all element added to the
>> window. In contrast the WindowFunction is called when the window is finally
>> evaluated.
>>
>> Hope this helps,
>> Fabian
>>
>> 2016-06-09 14:53 GMT+02:00 Christophe Salperwyck <
>> christophe.salperw...@gmail.com>:
>>
>>> Hi,
>>>
>>> I am writing a program to read timeseries from HBase and do some daily
>>> aggregations (Flink streaming). For now I am just computing some averages, so
>>> it is not very demanding, but my HBase reads get slower and slower (I have a few
>>> billion points to read). The back pressure is almost all the time close
>>> to 1.
>>>
>>> I use custom timestamp:
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>
>>> so I implemented a custom extractor based on:
>>> AscendingTimestampExtractor
>>>
>>> At the beginning I have 5M reads/s and after 15 min I have just 1M
>>> reads/s; then it gets worse and worse. Even when I cancel the job, data are
>>> still being written in HBase (I did a sink similar to the example - with a
>>> cache of 100s of HBase Puts to be a bit more efficient).
>>>
>>> When I don't put a sink it seems to stay on 1M reads/s.
>>>
>>> Do you have an idea why ?
>>>
>>> Here is a bit of code if needed:
>>> final WindowedStream<ANA, Tuple, TimeWindow> ws = hbaseDS.keyBy(0)
>>>     .assignTimestampsAndWatermarks(new AscendingTimestampExtractor())
>>>     .keyBy(0)
>>>     .timeWindow(Time.days(1));
>>>
>>> final SingleOutputStreamOperator<Put, ?> puts = ws.apply(
>>>     new WindowFunction<ANA, Put, Tuple, TimeWindow>() {
>>>
>>>     @Override
>>>     public void apply(final Tuple key, final TimeWindow window,
>>>             final Iterable<ANA> input,
>>>             final Collector<Put> out) throws Exception {
>>>
>>>         final SummaryStatistics summaryStatistics = new SummaryStatistics();
>>>         for (final ANA ana : input) {
>>>             summaryStatistics.addValue(ana.getValue());
>>>         }
>>>         final Put put = buildPut((String) key.getField(0), window.getStart(),
>>>             summaryStatistics);
>>>         out.collect(put);
>>>     }
>>> });
>>>
>>> And how I started Flink on YARN :
>>> flink-1.0.3/bin/yarn-session.sh -n 20 -tm 16384 -s 2
>>> -Dtaskmanager.network.numberOfBuffers=4096
>>>
>>> Thanks for any feedback!
>>>
>>> Christophe
>>>
>>
>>
>


Re: FlinkKafkaProducer API

2016-06-09 Thread Fabian Hueske
Great, thank you!

2016-06-09 17:38 GMT+02:00 Elias Levy :

>
> On Thu, Jun 9, 2016 at 5:16 AM, Fabian Hueske  wrote:
>
>> thanks for your feedback. I think those are good observations and
>> suggestions to improve the Kafka producers.
>> The best place to discuss such improvements is the dev mailing list.
>>
>> Would you like to repost your mail there or open JIRAs where the discussion
>> about these changes can continue?
>
>
> I opened FLINK-4050.  Since the JIRAs are posted to the dev list, I won't
> cross post.
>
> Cheers,
> Elias
>
>
>


Re: HBase reads and back pressure

2016-06-09 Thread Christophe Salperwyck
Hi Fabian,

Thanks for the help, I will try that. The backpressure was on the source
(HBase).

Christophe

2016-06-09 16:38 GMT+02:00 Fabian Hueske :

> Hi Christophe,
>
> where does the backpressure appear? In front of the sink operator or
> before the window operator?
>
> In any case, I think you can improve your WindowFunction if you convert
> parts of it into a FoldFunction.
> The FoldFunction would take care of the statistics computation and the
> WindowFunction would only assemble the result record including extracting
> the start time of the window.
>
> Then you could do:
>
> ws.apply(new SummaryStatistics(), new YourFoldFunction(), new
> YourWindowFunction());
>
> This is more efficient because the FoldFunction is eagerly applied when
> ever a new element is added to a window. Hence, the window does only hold a
> single value (SummaryStatistics) instead of all element added to the
> window. In contrast the WindowFunction is called when the window is finally
> evaluated.
>
> Hope this helps,
> Fabian
>
> 2016-06-09 14:53 GMT+02:00 Christophe Salperwyck <
> christophe.salperw...@gmail.com>:
>
>> Hi,
>>
>> I am writing a program to read timeseries from HBase and do some daily
>> aggregations (Flink streaming). For now I am just computing some averages, so
>> it is not very demanding, but my HBase reads get slower and slower (I have a few
>> billion points to read). The back pressure is almost all the time close
>> to 1.
>>
>> I use custom timestamp:
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>
>> so I implemented a custom extractor based on:
>> AscendingTimestampExtractor
>>
>> At the beginning I have 5M reads/s and after 15 min I have just 1M reads/s;
>> then it gets worse and worse. Even when I cancel the job, data are still
>> being written in HBase (I did a sink similar to the example - with a cache
>> of 100s of HBase Puts to be a bit more efficient).
>>
>> When I don't put a sink it seems to stay on 1M reads/s.
>>
>> Do you have an idea why ?
>>
>> Here is a bit of code if needed:
>> final WindowedStream<ANA, Tuple, TimeWindow> ws = hbaseDS.keyBy(0)
>>     .assignTimestampsAndWatermarks(new AscendingTimestampExtractor())
>>     .keyBy(0)
>>     .timeWindow(Time.days(1));
>>
>> final SingleOutputStreamOperator<Put, ?> puts = ws.apply(
>>     new WindowFunction<ANA, Put, Tuple, TimeWindow>() {
>>
>>     @Override
>>     public void apply(final Tuple key, final TimeWindow window,
>>             final Iterable<ANA> input,
>>             final Collector<Put> out) throws Exception {
>>
>>         final SummaryStatistics summaryStatistics = new SummaryStatistics();
>>         for (final ANA ana : input) {
>>             summaryStatistics.addValue(ana.getValue());
>>         }
>>         final Put put = buildPut((String) key.getField(0), window.getStart(),
>>             summaryStatistics);
>>         out.collect(put);
>>     }
>> });
>>
>> And how I started Flink on YARN :
>> flink-1.0.3/bin/yarn-session.sh -n 20 -tm 16384 -s 2
>> -Dtaskmanager.network.numberOfBuffers=4096
>>
>> Thanks for any feedback!
>>
>> Christophe
>>
>
>


Re: FlinkKafkaProducer API

2016-06-09 Thread Elias Levy
On Thu, Jun 9, 2016 at 5:16 AM, Fabian Hueske  wrote:

> thanks for your feedback. I think those are good observations and
> suggestions to improve the Kafka producers.
> The best place to discuss such improvements is the dev mailing list.
>
> Would you like to repost your mail there or open JIRAs where the discussion
> about these changes can continue?


I opened FLINK-4050.  Since the JIRAs are posted to the dev list, I won't
cross post.

Cheers,
Elias


Re: Data Source Generator emits 4 instances of the same tuple

2016-06-09 Thread Fabian Hueske
We solved this problem yesterday at the Flink Hackathon.
The issue was that the source function was started with parallelism 4 and
each function read the whole file.

Cheers, Fabian

2016-06-06 16:53 GMT+02:00 Biplob Biswas :

> Hi,
>
> I tried streaming the source data 2 ways
>
> 1. Is a simple straight forward way of sending data without using the
> serving speed concept
> http://pastebin.com/cTv0Pk5U
>
>
> 2. The one where I use the TaxiRide source which is exactly similar except
> loading the data in the proper data structures.
> http://pastebin.com/NenvXShH
>
>
> I hope to get a solution out of it.
>
> Thanks and Regards
> Biplob Biswas
>
>
>
>
>
>


Re: How to maintain the state of a variable in a map transformation.

2016-06-09 Thread Fabian Hueske
Hi,

1) Yes, that is correct. If you set the parallelism of an operator to 1 it
is only executed on a single node. It depends on your application, if you
need a global state or whether multiple local states are OK.
2) Flink programs follow the concept of a data flow. There is no communication
between parallel instances of a task, i.e., all four tasks of a MapOperator
with parallelism 4 cannot talk to each other. You might want to take a look
at Flink's iteration operators. With these you can feed data back into a
previous operator [1].
4) Yes, that should work.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/iterations.html
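As a small illustration of the per-instance state discussed in this thread, a sketch of a
mapper that keeps the coefficients in a field (the update rule is a made-up placeholder;
each parallel instance holds its own copy, so use parallelism 1 if a single global state
is needed):

import org.apache.flink.api.common.functions.RichMapFunction;

// Keeps mutable state in a field of the function instance; the state evolves
// with every record that this particular parallel instance processes.
public class RegressionMapper extends RichMapFunction<double[], double[]> {

    private double[] coefficients;  // null at first, i.e. effectively 0.0 for the first record

    @Override
    public double[] map(double[] record) throws Exception {
        if (coefficients == null) {
            coefficients = new double[record.length];
        }
        // placeholder update step: replace with the real coefficient update
        for (int i = 0; i < coefficients.length; i++) {
            coefficients[i] += 0.01 * record[i];
        }
        return coefficients.clone();
    }
}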

2016-06-09 15:01 GMT+02:00 Ravikumar Hawaldar 
:

> Hi Fabian, Thank you for your answers,
>
> 1) If there is only a single instance of that function, then it will defeat
> the purpose of distributed execution, correct me if I am wrong. So if I run
> with parallelism 1 on a cluster, does that mean it will execute on only one
> node?
>
> 2) I mean to say, when a map operator returns a variable, is there any
> other function which takes that updated variable and returns that to all
> instances of map?
>
> 3) Question Cleared.
>
> 4) My question was can I use same ExecutionEnvironment for all flink
> programs in a module.
>
> 5) Question Cleared.
>
>
> Regards
> Ravikumar
>
>
>
> On 9 June 2016 at 17:58, Fabian Hueske  wrote:
>
>> Hi Ravikumar,
>>
>> I'll try to answer your questions:
>> 1) If you set the parallelism of a map function to 1, there will be only
>> a single instance of that function regardless of whether it is executed
>> locally or remotely in a cluster.
>> 2) Flink does also support aggregations, (reduce, groupReduce, combine,
>> ...). However, I do not see how this would help with a stateful map
>> function.
>> 3) In Flink DataSet programs you usually construct the complete program
>> and call execute() after you have defined your sinks. There are two
>> exceptions: print() and collect() which both add special sinks and
>> immediately execute your program. print() prints the result to the stdout
>> of the submitting client and collect() fetches a dataset as collection.
>> 4) I am not sure I understood your question. When you obtain an
>> ExecutionEnvironment with ExecutionEnvironment.getExecutionEnvrionment()
>> the type of the returned environment depends on the context in which the
>> program was executed. It can be a local environment if it is executed from
>> within an IDE or a RemoteExecutionEnvironment if the program is executed
>> via the CLI client and shipped to a remote cluster.
>> 5) A map operator processes records one after the other, i.e., as a
>> sequence. If you need a certain order, you can call DataSet.sortPartition()
>> to locally sort the partition.
>>
>> Hope that helps,
>> Fabian
>>
>> 2016-06-09 12:23 GMT+02:00 Ravikumar Hawaldar <
>> ravikumar.hawal...@gmail.com>:
>>
>>> Hi Till, Thank you for your answer, I have couple of questions
>>>
>>> 1) Setting parallelism on a single map function in local is fine but on
>>> distributed will it work as local execution?
>>>
>>> 2) Is there any other way apart from setting parallelism? Like spark
>>> aggregate function?
>>>
>>> 3) Is it necessary to call the execute function after the transformations?
>>> Or does execution start as soon as it encounters an action (similar to Spark)?
>>>
>>> 4) Can I create a global execution environment (Either local or
>>> distributed) for different Flink program in a module?
>>>
>>> 5) How to make the records come in sequence for a map or any other
>>> operator?
>>>
>>>
>>> Regards,
>>> Ravikumar
>>>
>>>
>>> On 8 June 2016 at 21:14, Till Rohrmann  wrote:
>>>
 Hi Ravikumar,

 Flink's operators are stateful. So you can simply create a variable in
 your mapper to keep the state around. But every mapper instance will have
 it's own state. This state is determined by the records which are sent to
 this mapper instance. If you need a global state, then you have to set the
 parallelism to 1.

 Cheers,
 Till

 On Wed, Jun 8, 2016 at 5:08 PM, Ravikumar Hawaldar <
 ravikumar.hawal...@gmail.com> wrote:

> Hello,
>
> I have a DataSet which is roughly a record in a
> DataSet or a file.
>
> Now I am using map transformation on this DataSet to compute a
> variable (coefficients of linear regression parameters and data structure
> used is a double[]).
>
> Now the issue is that, per record the variable will get updated and I
> am struggling to maintain state of this variable for the next record.
>
> In simple, for first record the variable values will be 0.0, and after
> first record the variable will get updated and I have to pass this updated
> variable for the second record and so on for all records in DataSet.
>
> Any suggestions on how to maintain state of a variable?
>
>
> Regards,

Re: HBase reads and back pressure

2016-06-09 Thread Fabian Hueske
Hi Christophe,

where does the backpressure appear? In front of the sink operator or before
the window operator?

In any case, I think you can improve your WindowFunction if you convert
parts of it into a FoldFunction.
The FoldFunction would take care of the statistics computation and the
WindowFunction would only assemble the result record including extracting
the start time of the window.

Then you could do:

ws.apply(new SummaryStatistics(), new YourFoldFunction(), new
YourWindowFunction());

This is more efficient because the FoldFunction is eagerly applied when
ever a new element is added to a window. Hence, the window does only hold a
single value (SummaryStatistics) instead of all element added to the
window. In contrast the WindowFunction is called when the window is finally
evaluated.
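A rough sketch of what the two functions could look like, reusing the ANA and Put types
and the buildPut helper from your code (imports omitted; untested):

// FoldFunction: incrementally adds each element's value to the running statistics,
// so the window state is a single SummaryStatistics object.
private static class StatsFoldFunction implements FoldFunction<ANA, SummaryStatistics> {
    @Override
    public SummaryStatistics fold(SummaryStatistics acc, ANA value) throws Exception {
        acc.addValue(value.getValue());
        return acc;
    }
}

// WindowFunction: invoked once per window with the single folded SummaryStatistics.
private static class StatsWindowFunction
        implements WindowFunction<SummaryStatistics, Put, Tuple, TimeWindow> {
    @Override
    public void apply(Tuple key, TimeWindow window, Iterable<SummaryStatistics> input,
            Collector<Put> out) throws Exception {
        SummaryStatistics stats = input.iterator().next();
        out.collect(buildPut((String) key.getField(0), window.getStart(), stats));
    }
}

// usage: ws.apply(new SummaryStatistics(), new StatsFoldFunction(), new StatsWindowFunction());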

Hope this helps,
Fabian

2016-06-09 14:53 GMT+02:00 Christophe Salperwyck <
christophe.salperw...@gmail.com>:

> Hi,
>
> I am writing a program to read timeseries from HBase and do some daily
> aggregations (Flink streaming). For now I am just computing some averages, so
> it is not very demanding, but my HBase reads get slower and slower (I have a few
> billion points to read). The back pressure is almost all the time close
> to 1.
>
> I use custom timestamp:
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
> so I implemented a custom extractor based on:
> AscendingTimestampExtractor
>
> At the beginning I have 5M reads/s and after 15 min I have just 1M reads/s;
> then it gets worse and worse. Even when I cancel the job, data are still
> being written in HBase (I did a sink similar to the example - with a cache
> of 100s of HBase Puts to be a bit more efficient).
>
> When I don't put a sink it seems to stay on 1M reads/s.
>
> Do you have an idea why ?
>
> Here is a bit of code if needed:
> final WindowedStream<ANA, Tuple, TimeWindow> ws = hbaseDS.keyBy(0)
>     .assignTimestampsAndWatermarks(new AscendingTimestampExtractor())
>     .keyBy(0)
>     .timeWindow(Time.days(1));
>
> final SingleOutputStreamOperator<Put, ?> puts = ws.apply(
>     new WindowFunction<ANA, Put, Tuple, TimeWindow>() {
>
>     @Override
>     public void apply(final Tuple key, final TimeWindow window,
>             final Iterable<ANA> input,
>             final Collector<Put> out) throws Exception {
>
>         final SummaryStatistics summaryStatistics = new SummaryStatistics();
>         for (final ANA ana : input) {
>             summaryStatistics.addValue(ana.getValue());
>         }
>         final Put put = buildPut((String) key.getField(0), window.getStart(),
>             summaryStatistics);
>         out.collect(put);
>     }
> });
>
> And how I started Flink on YARN :
> flink-1.0.3/bin/yarn-session.sh -n 20 -tm 16384 -s 2
> -Dtaskmanager.network.numberOfBuffers=4096
>
> Thanks for any feedback!
>
> Christophe
>


Re: Maxby() and KeyBy() question

2016-06-09 Thread Fabian Hueske
Hi,

you are computing a running aggregate, i.e., you're getting one output
record for each input record and the output record is the record with the
largest value observed so far.
If the record with the largest value is the first, the record is sent out
another time. This is what happened with Match3 in your example.

There are two ways to compute aggregates on streams: 1) a running aggregate
as you just did, or 2) a windowed aggregate.
For a windowed aggregate, you need to specify a window. The window
can be time or count based.
The following blog post should be a good introduction into Flink's window
support [1].

Best, Fabian

[1] http://flink.apache.org/news/2015/12/04/Introducing-windows.html
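As a rough illustration of the windowed variant for this example (a sketch only; the
count window of 2 assumes that exactly one record per match arrives from each of the
two streams):

DataStream<Informacion> src3 = src.union(src2);

// One result per match: collect the two records of a match (count window of size 2)
// and keep the one with the largest local odd.
DataStream<Informacion> maxLocal = src3
    .keyBy("nombrePartido")
    .countWindow(2)
    .maxBy("cuotaGanadorLocal");

maxLocal.print();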

2016-06-09 14:36 GMT+02:00 iñaki williams :

> Hi again!
>
> I am working with two DataStreams, I want to get the maximun value from
> each pair of them, for example:
>
> //Informacion (matchName, LocalOdd, AwayOdd)
>
> Informacion info1= new Informacion("Match1", 1.10, 3.22);
> Informacion info2= new Informacion("Match2", 2.11, 1.10);
> Informacion info3= new Informacion("Match3", 4.10, 1.05);
>
> Informacion info11= new Informacion("Match1", 1.80, 2.20);
> Informacion info22= new Informacion("Match2", 3.10, 1.15);
> Informacion info33= new Informacion("Match3", 2.12, 1.25);
>
>
> DataStream src = see.fromElements(info1,info2, info3);
> DataStream src2 =
> see.fromElements(info11,info22,info33);
> DataStream src3= src.union(src2);
>
> DataStream maxLocal =
> src3.keyBy("nombrePartido").maxBy("cuotaGanadorLocal");
>
> maxLocal.print();
>
>
>
> Let's suppose that those are tennis matches with their names and their bet
> odds, and the names of the matches are the same on both streams, I mean
> Match1=Match1 , Match2=Match2  (Imagine that match 1's name is "Rafa Nadal
> - Roger Federer").
>
>
> I want to get the maximun localOdd from matches with the same name, the
> result of my code is:
>
>
> 1> Informacion [matchName=Match2, localOdd=2.11, awayOdd=1.1]
> 1> *Informacion [matchName=Match3, localOdd=4.1, awayOdd=1.05]*
> 1> Informacion [matchName=Match2, localOdd=3.1, awayOdd=1.15]
> 1>* Informacion [matchName=Match3, localOdd=4.1, awayOdd=1.05]*
> 4> Informacion [matchName=Match1, localOdd=1.1, awayOdd=3.22]
> 4> Informacion [matchName=Match1, localOdd=1.8, awayOdd=2.2]
>
> It seems like it is taking the biggest value from all the matches and not
> by keyed matches
>
>
> I am looking for this:
>
>
> Informacion [matchName=Match1, localOdd=1.8, awayOdd=2.2]
> Informacion [matchName=Match2, localOdd=3.1, awayOdd=1.15]
> Informacion [matchName=Match3, localOdd=4.1, awayOdd=1.05]
>
>
>
> How can I get it?
>
>
> Thanks in advance
>
>
>
>
>


Re: NotSerializableException

2016-06-09 Thread Aljoscha Krettek
Hi,
the problem is that the KeySelector is an anonymous inner class and as such
has a reference to the outer RecordFilterer object. Normally, this would be
rectified by the closure cleaner but the cleaner is not used in
CoGroup.where(). I'm afraid this is a bug.

Best,
Aljoscha


On Thu, 9 Jun 2016 at 14:06 Fabian Hueske  wrote:

> Hi Tarandeep,
>
> the exception suggests that Flink tries to serialize RecordsFilterer as a
> user function (this happens via Java Serialization).
> I said suggests because the code that uses RecordsFilterer is not included.
>
> To me it looks like RecordsFilterer should not be used as a user function.
> It is a helper class to construct a DataSet program, so it should not be
> shipped for execution.
> You would use such a class as follows:
>
> DataSet<T> records = ...
> DataSet<String> filterIDs = ...
>
> RecordsFilterer<T> rf = new RecordsFilterer<>();
> DataSet<Tuple2<Boolean, T>> result = rf.addFilterFlag(records, filterIDs,
> "myField");
>
> Regarding the join code, I would suggest an optimization.
> Instead of using CoGroup, I would use distinct and an OuterJoin like this:
>
> DataSet<String> distIds = filteredIds.distinct();
> DataSet<Tuple2<Boolean, T>> result = records
>   .leftOuterJoin(distIds)
>   .where(KEYSELECTOR)
>   .equalTo("*") // use full string as key
>   .with(JOINFUNC) // set Bool to false if right == null, true otherwise
>
> Best, Fabian
>
> 2016-06-09 2:28 GMT+02:00 Tarandeep Singh :
>
>> Hi,
>>
>> I am getting NoSerializableException in this class-
>>
>> 
>>
>> public class RecordsFilterer<T> {
>>
>>     public DataSet<Tuple2<Boolean, T>> addFilterFlag(DataSet<T> dataset,
>>             DataSet<String> filteredIds, String fieldName) {
>>         return dataset.coGroup(filteredIds)
>>                 .where(new KeySelector<T, String>() {
>>                     @Override
>>                     public String getKey(T t) throws Exception {
>>                         String s = (String) t.get(fieldName);
>>                         return s != null ? s : UUID.randomUUID().toString();
>>                     }
>>                 })
>>                 .equalTo((KeySelector<String, String>) s -> s)
>>                 .with(new CoGroupFunction<T, String, Tuple2<Boolean, T>>() {
>>                     @Override
>>                     public void coGroup(Iterable<T> records, Iterable<String> ids,
>>                             Collector<Tuple2<Boolean, T>> collector) throws Exception {
>>                         boolean filterFlag = false;
>>                         for (String id : ids) {
>>                             filterFlag = true;
>>                         }
>>
>>                         for (T record : records) {
>>                             collector.collect(new Tuple2<>(filterFlag, record));
>>                         }
>>                     }
>>                 });
>>     }
>> }
>>
>>
>> What I am trying to do is write generic code that will join Avro
>> records (of different types) with String records and, where there is a match, add a
>> filter flag. This way I can use the same code for different Avro record
>> types. But I am getting this exception-
>>
>> Exception in thread "main" org.apache.flink.optimizer.CompilerException:
>> Error translating node 'Map "Key Extractor" : MAP [[ GlobalProperties
>> [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null,
>> grouped=null, unique=null] ]]': Could not write the user code wrapper class
>> org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
>> java.io.NotSerializableException:
>> com.styleseat.dataplatform.etl.jobs.boundary.RecordsFilterer
>> at
>> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:386)
>> at
>> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:109)
>> at
>> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:198)
>> at
>> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>> at
>> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>> at
>> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>> at
>> org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128)
>> at
>> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:188)
>> at
>> org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:187)
>> at
>> org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:90)
>> at
>> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
>> at
>> com.styleseat.dataplatform.etl.jobs.search.RunSearchLogProcessorV2.runSearchLogProcessor(RunSearchLogProcessorV2.java:57)
>> at
>> com.styleseat.dataplatform.etl.jobs.search.RunSearchLogProcessorV2.main(RunSearchLogProcessorV2.java:32)

Re: Strange behavior of DataStream.countWindow

2016-06-09 Thread Fabian Hueske
Hi Yukun,

the problem is that the KeySelector is internally invoked multiple times.
Hence it must be deterministic, i.e., it must extract the same key for the
same object if invoked multiple times.
The documentation is not discussing this aspect and should be extended.

Thanks for pointing out this issue.
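For illustration, a deterministic way to spread elements over n pseudo-random partitions
(a sketch; deriving the key from the element's hashCode is my suggestion, not part of the
original program):

// Deterministic "random-looking" key selector: the same element always maps to the
// same key, so repeated invocations of getKey() are consistent.
private static class HashKeySelector<T> implements KeySelector<T, Integer> {

    private final int nPartitions;

    HashKeySelector(int nPartitions) {
        this.nPartitions = nPartitions;
    }

    @Override
    public Integer getKey(T element) throws Exception {
        return Math.floorMod(element.hashCode(), nPartitions);  // always non-negative
    }
}

// usage: .keyBy(new HashKeySelector<Tuple2<Integer, Integer>>(2))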

Cheers,
Fabian


2016-06-09 13:19 GMT+02:00 Yukun Guo :

> I’m playing with the (Window)WordCount example from Flink QuickStart. I
> generate a DataStream consisting of 1000 Strings of random digits, which
> is windowed with a tumbling count window of 50 elements:
>
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.api.java.functions.KeySelector;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.util.Collector;
>
> import java.util.Random;
> public class DigitCount {
>
>
> public static void main(String[] args) throws Exception {
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>
> DataStream<String> text = env.fromElements(
> "14159265358979323846264338327950288419716939937510",
> "58209749445923078164062862089986280348253421170679",
> "82148086513282306647093844609550582231725359408128",
> "48111745028410270193852110555964462294895493038196",
> "44288109756659334461284756482337867831652712019091",
> "45648566923460348610454326648213393607260249141273",
> "72458700660631558817488152092096282925409171536436",
> "78925903600113305305488204665213841469519415116094",
> "33057270365759591953092186117381932611793105118548",
> "07446237996274956735188575272489122793818301194912",
> "98336733624406566430860213949463952247371907021798",
> "60943702770539217176293176752384674818467669405132",
> "00056812714526356082778577134275778960917363717872",
> "14684409012249534301465495853710507922796892589235",
> "42019956112129021960864034418159813629774771309960",
> "5187072113499837297804995105973173281609631859",
> "50244594553469083026425223082533446850352619311881",
> "71010003137838752886587533208381420617177669147303",
> "59825349042875546873115956286388235378759375195778",
> "18577805321712268066130019278766111959092164201989"
> );
>
> DataStream<Tuple2<Integer, Integer>> digitCount = text
>         .flatMap(new Splitter())
>         .keyBy(new KeySelector<Tuple2<Integer, Integer>, Integer>() {
>             @Override
>             public Integer getKey(Tuple2<Integer, Integer> x) throws Exception {
>                 return x.f0 % 2;
>             }
>         })
>         .countWindow(50)
>         .sum(1);
>
> digitCount.print();
> env.execute();
>
> }
>
> public static final class Splitter implements FlatMapFunction<String, Tuple2<Integer, Integer>> {
>     @Override
>     public void flatMap(String value, Collector<Tuple2<Integer, Integer>> out) {
>         for (String token : value.split("")) {
>             if (token.length() == 0) {
>                 continue;
>             }
>             out.collect(Tuple2.of(Integer.parseInt(token), 1));
>         }
>     }
> }
> }
>
> The code above will produce 19 lines of output which is reasonable as the
> 1000 digits will be keyed into 2 partitions where one partition contains
> 500+ elements and the other contains slightly fewer than 500 elements,
> therefore as a result one 50-digit window is ignored.
>
> So far so good, but if I replace the mod KeySelector with a random one:
>
> private static class RandomKeySelector<T> implements KeySelector<T, Integer> {
>     private int nPartitions;
>     private Random random;
>
>     RandomKeySelector(int nPartitions) {
>         this.nPartitions = nPartitions;
>         random = new Random();
>     }
>
>     @Override
>     public Integer getKey(T dummy) throws Exception {
>         return random.nextInt(this.nPartitions);
>     }
> }
>
> and then
>
> .keyBy(new RandomKeySelector<Tuple2<Integer, Integer>>(2))
>
> it may generate 17 or 18 lines of output. How could that happen? Moreover,
> if I set the number of partitions to 10, in theory the lines of output
> should be no fewer than 11, but actually it can be only 9.
>
> Please help me understand why countWindow behaves like this.
>


Re: Hourly top-k statistics of DataStream

2016-06-09 Thread Philippe Caparroy
You should have a look at this project : https://github.com/addthis/stream-lib

You can use it within Flink, storing intermediate values in a local state.
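For instance, a very rough sketch of keeping an approximate per-partition top-k with
stream-lib's StreamSummary inside a Flink function (assuming stream-lib's
com.clearspring.analytics API; capacity, k and the per-element emission are arbitrary
choices to keep the sketch short):

import com.clearspring.analytics.stream.Counter;
import com.clearspring.analytics.stream.StreamSummary;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Each parallel instance keeps its own approximate top-k summary and, purely for
// brevity here, emits the current estimate for every incoming word.
public class ApproxTopK extends RichFlatMapFunction<String, Tuple2<String, Long>> {

    private static final int CAPACITY = 10000;  // arbitrary summary capacity
    private static final int K = 10;            // arbitrary k

    private transient StreamSummary<String> summary;

    @Override
    public void open(Configuration parameters) {
        summary = new StreamSummary<>(CAPACITY);
    }

    @Override
    public void flatMap(String word, Collector<Tuple2<String, Long>> out) {
        summary.offer(word);
        for (Counter<String> c : summary.topK(K)) {
            out.collect(Tuple2.of(c.getItem(), c.getCount()));
        }
    }
}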





> On 9 June 2016, at 15:29, Yukun Guo wrote:
> 
> Thank you very much for the detailed answer. Now I understand a DataStream 
> can be repartitioned or “joined” (don’t know the exact terminology) with 
> keyBy.
> 
> But another question: 
> Despite the non-existence of incremental top-k algorithm, I’d like to 
> incrementally compute the local word count during one hour, probably using a 
> TreeMap for counting. As soon as the hour finishes, the TreeMap is converted 
> to a stream of Tuple2 and forwarded to the remaining computation thereafter. 
> I’m concerned about the memory usage: the TreeMap and the Tuple2 collection 
> hold a huge amount of items, do I have to do some custom memory management?
> 
> I’m also not sure whether a TreeMap is suitable here. This StackOverflow 
> question presents a similar approach: 
> http://stackoverflow.com/questions/34681887/how-apache-flink-deal-with-skewed-data,
> but the suggested solution seems rather complicated.
> 
> 
> On 8 June 2016 at 08:04, Jamie Grier wrote:
> Suggestions in-line below...
> 
> On Mon, Jun 6, 2016 at 7:26 PM, Yukun Guo wrote:
> Hi,
> 
> I'm working on a project which uses Flink to compute hourly log statistics
> like top-K. The logs are fetched from Kafka by a FlinkKafkaProducer and packed
> into a DataStream.
> 
> The problem is, I find the computation quite challenging to express with
> Flink's DataStream API:
> 
> 1. If I use something like `logs.timeWindow(Time.hours(1))`, suppose that the
> data volume is really high, e.g., billions of logs might be generated in one
> hour, will the window grow too large and can't be handled efficiently?
> 
> In the general case you can use:
> 
> stream
> .timeWindow(...)
> .apply(reduceFunction, windowFunction)
> 
> which can take a ReduceFunction and a WindowFunction.  The ReduceFunction is 
> used to reduce the state on the fly and thereby keep the total state size 
> low.  This can commonly be used in analytics applications to reduce the state 
> size that you're accumulating for each window.  In the specific case of TopK, 
> however, you cannot do this if you want an exact result.  To get an exact 
> result I believe you have to actually keep around all of the data and then 
> calculate TopK at the end in your WindowFunction.  If you are able to use 
> approximate algorithms for your use case than you can calculate a 
> probabilistic incremental TopK based on some sort of sketch-based algorithm.
> 
> 2. We have to create a `KeyedStream` before applying `timeWindow`. However,
> the distribution of some keys are skewed hence using them may compromise
> the performance due to unbalanced partition loads. (What I want is just
> rebalance the stream across all partitions.)
> 
> A good and simple way to approach this may be to come up with a composite key 
> for your data that *is* uniformly distributed.  You can imagine something 
> simple like 'natural_key:random_number'.  Then keyBy(natural_key) and 
> reduce() again.  For example:
> 
> stream
> .keyBy(key, rand())  // partition by composite key that is 
> uniformly distributed
> .timeWindow(1 hour)
> .reduce() // pre-aggregation
> .keyBy(key)// repartition
> .timeWindow(1 hour)
> .reduce() // final aggregation
>  
> 
> 3. The top-K algorithm can be straightforwardly implemented with `DataSet`'s
> `mapPartition` and `reduceGroup` API as in
> [FLINK-2549](https://github.com/apache/flink/pull/1161/), but not so easy if
> taking the DataStream approach, even with the stateful operators. I still
> cannot figure out how to reunion streams once they are partitioned.
> 
> I'm not sure I know what you're trying to do here.  What do you mean by 
> re-union?
>  
> 4. Is it possible to convert a DataStream into a DataSet? If yes, how can I
> make Flink analyze the data incrementally rather than aggregating the logs for
> one hour before starting to process?
> 
> There is no direct way to turn a DataStream into a DataSet.  I addressed the 
> point about doing the computation incrementally above, though.  You do this 
> with a ReduceFunction.  But again, there doesn't exist an exact incremental 
> TopK algorithm that I'm aware of.  This can be done with sketching, though.
> 
> 
> -- 
> 
> Jamie Grier
> data Artisans, Director of Applications Engineering
> @jamiegrier 
> ja...@data-artisans.com 
> 
> 



Re: Hourly top-k statistics of DataStream

2016-06-09 Thread Christophe Salperwyck
Hi,

There are some implementations to do that with low memory footprint. Have a
look at the count min sketch for example. There are some Java
implementations.
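For instance, a tiny sketch of the count-min idea using the stream-lib implementation
mentioned earlier in this thread (the constructor arguments are illustrative, and the
CountMinSketch(eps, confidence, seed) signature is assumed from that library):

import com.clearspring.analytics.stream.frequency.CountMinSketch;

public class CmsExample {
    public static void main(String[] args) {
        // Approximate frequency counting with bounded memory: counts are over-estimated
        // by at most eps of the total count, with the given confidence.
        CountMinSketch sketch = new CountMinSketch(0.001, 0.99, 42);  // eps, confidence, seed

        sketch.add("some-word", 1);                           // increment a word's counter
        long estimate = sketch.estimateCount("some-word");    // approximate count, here >= 1
        System.out.println(estimate);
    }
}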

Christophe

2016-06-09 15:29 GMT+02:00 Yukun Guo :

> Thank you very much for the detailed answer. Now I understand a DataStream
> can be repartitioned or “joined” (don’t know the exact terminology) with
> keyBy.
>
> But another question:
> Despite the non-existence of incremental top-k algorithm, I’d like to
> incrementally compute the local word count during one hour, probably using
> a TreeMap for counting. As soon as the hour finishes, the TreeMap is
> converted to a stream of Tuple2 and forwarded to the remaining computation
> thereafter. I’m concerned about the memory usage: the TreeMap and the
> Tuple2 collection hold a huge amount of items, do I have to do some custom
> memory management?
>
> I’m also not sure whether a TreeMap is suitable here. This StackOverflow
> question presents a similar approach:
> http://stackoverflow.com/questions/34681887/how-apache-flink-deal-with-skewed-data,
> but the suggested solution seems rather complicated.
>
> On 8 June 2016 at 08:04, Jamie Grier  wrote:
>
>> Suggestions in-line below...
>>
>> On Mon, Jun 6, 2016 at 7:26 PM, Yukun Guo  wrote:
>>
>>> Hi,
>>>
>>> I'm working on a project which uses Flink to compute hourly log
>>> statistics
>>> like top-K. The logs are fetched from Kafka by a FlinkKafkaProducer and
>>> packed
>>> into a DataStream.
>>>
>>> The problem is, I find the computation quite challenging to express with
>>> Flink's DataStream API:
>>>
>>> 1. If I use something like `logs.timeWindow(Time.hours(1))`, suppose
>>> that the
>>> data volume is really high, e.g., billions of logs might be generated in
>>> one
>>> hour, will the window grow too large and can't be handled efficiently?
>>>
>>
>> In the general case you can use:
>>
>> stream
>> .timeWindow(...)
>> .apply(reduceFunction, windowFunction)
>>
>> which can take a ReduceFunction and a WindowFunction.  The ReduceFunction
>> is used to reduce the state on the fly and thereby keep the total state
>> size low.  This can commonly be used in analytics applications to reduce
>> the state size that you're accumulating for each window.  In the specific
>> case of TopK, however, you cannot do this if you want an exact result.  To
>> get an exact result I believe you have to actually keep around all of the
>> data and then calculate TopK at the end in your WindowFunction.  If you are
>> able to use approximate algorithms for your use case than you can calculate
>> a probabilistic incremental TopK based on some sort of sketch-based
>> algorithm.
>>
>>>
>>> 2. We have to create a `KeyedStream` before applying `timeWindow`.
>>> However,
>>> the distribution of some keys are skewed hence using them may compromise
>>> the performance due to unbalanced partition loads. (What I want is just
>>> rebalance the stream across all partitions.)
>>>
>>
>> A good and simple way to approach this may be to come up with a composite
>> key for your data that *is* uniformly distributed.  You can imagine
>> something simple like 'natural_key:random_number'.  Then keyBy(natural_key)
>> and reduce() again.  For example:
>>
>> stream
>> .keyBy(key, rand())  // partition by composite key that is
>> uniformly distributed
>> .timeWindow(1 hour)
>> .reduce() // pre-aggregation
>> .keyBy(key)// repartition
>> .timeWindow(1 hour)
>> .reduce() // final aggregation
>>
>>
>>>
>>> 3. The top-K algorithm can be straightforwardly implemented with
>>> `DataSet`'s
>>> `mapPartition` and `reduceGroup` API as in
>>> [FLINK-2549](https://github.com/apache/flink/pull/1161/), but not so
>>> easy if
>>> taking the DataStream approach, even with the stateful operators. I still
>>> cannot figure out how to reunion streams once they are partitioned.
>>>
>>> I'm not sure I know what you're trying to do here.  What do you mean
>> by re-union?
>>
>>
>>> 4. Is it possible to convert a DataStream into a DataSet? If yes, how
>>> can I
>>> make Flink analyze the data incrementally rather than aggregating the
>>> logs for
>>> one hour before starting to process?
>>>
>>> There is no direct way to turn a DataStream into a DataSet.  I addressed
>> the point about doing the computation incrementally above, though.  You do
>> this with a ReduceFunction.  But again, there doesn't exist an exact
>> incremental TopK algorithm that I'm aware of.  This can be done with
>> sketching, though.
>>
>>
>> --
>>
>> Jamie Grier
>> data Artisans, Director of Applications Engineering
>> @jamiegrier 
>> ja...@data-artisans.com
>>
>>
>


Re: Using Flink watermarks and a large window state for scheduling

2016-06-09 Thread Aljoscha Krettek
Hi Josh,
I'll have to think a bit about that one. Once I have something I'll get
back to you.

Best,
Aljoscha

On Wed, 8 Jun 2016 at 21:47 Josh  wrote:

> This is just a question about a potential use case for Flink:
>
> I have a Flink job which receives tuples with an event id and a timestamp
> (e, t) and maps them into a stream (e, t2) where t2 is a future timestamp
> (up to 1 year in the future, which indicates when to schedule a
> transformation of e). I then want to key by e and keep track of the max t2
> for each e. Now the tricky bit: I want to periodically, say every minute
> (in event time world) take all (e, t2) where t2 occurred in the last
> minute, do a transformation and emit the result. It is important that the
> final transformation happens after t2 (preferably as soon as possible, but
> a delay of minutes is fine).
>
> Is it possible to use Flink's windowing and watermark mechanics to achieve
> this? I want to maintain a large state for the (e, t2) window, e.g. over a
> year (probably too large to fit in memory). And somehow use watermarks to
> execute the scheduled transformations.
>
> If anyone has any views on how this could be done, (or whether it's even
> possible/a good idea to do) with Flink then it would be great to hear!
>
> Thanks,
>
> Josh
>


Re: How to maintain the state of a variable in a map transformation.

2016-06-09 Thread Ravikumar Hawaldar
Hi Fabian, Thank you for your answers,

1) If there is only a single instance of that function, then it will defeat
the purpose of distribution, correct me if I am wrong. So if I run with
parallelism 1 on a cluster, does that mean it will execute on only one
node?

2) I mean to say, when a map operator returns a variable, is there any
other function which takes that updated variable and returns that to all
instances of map?

3) Question Cleared.

4) My question was: can I use the same ExecutionEnvironment for all Flink
programs in a module?

5) Question Cleared.


Regards
Ravikumar



On 9 June 2016 at 17:58, Fabian Hueske  wrote:

> Hi Ravikumar,
>
> I'll try to answer your questions:
> 1) If you set the parallelism of a map function to 1, there will be only a
> single instance of that function regardless of whether it is executed locally
> or remotely in a cluster.
> 2) Flink does also support aggregations, (reduce, groupReduce, combine,
> ...). However, I do not see how this would help with a stateful map
> function.
> 3) In Flink DataSet programs you usually construct the complete program
> and call execute() after you have defined your sinks. There are two
> exceptions: print() and collect() which both add special sinks and
> immediately execute your program. print() prints the result to the stdout
> of the submitting client and collect() fetches a dataset as collection.
> 4) I am not sure I understood your question. When you obtain an
> ExecutionEnvironment with ExecutionEnvironment.getExecutionEnvironment()
> the type of the returned environment depends on the context in which the
> program was executed. It can be a local environment if it is executed from
> within an IDE or a RemoteExecutionEnvironment if the program is executed
> via the CLI client and shipped to a remote cluster.
> 5) A map operator processes records one after the other, i.e., as a
> sequence. If you need a certain order, you can call DataSet.sortPartition()
> to locally sort the partition.
>
> Hope that helps,
> Fabian
>
> 2016-06-09 12:23 GMT+02:00 Ravikumar Hawaldar <
> ravikumar.hawal...@gmail.com>:
>
>> Hi Till, Thank you for your answer, I have couple of questions
>>
>> 1) Setting parallelism on a single map function in local is fine but on
>> distributed will it work as local execution?
>>
>> 2) Is there any other way apart from setting parallelism? Like spark
>> aggregate function?
>>
>> 3) Is it necessary that after transformations to call execute function?
>> Or Execution starts as soon as it encounters a action (Similar to Spark)?
>>
>> 4) Can I create a global execution environment (Either local or
>> distributed) for different Flink program in a module?
>>
>> 5) How to make the records come in sequence for a map or any other
>> operator?
>>
>>
>> Regards,
>> Ravikumar
>>
>>
>> On 8 June 2016 at 21:14, Till Rohrmann  wrote:
>>
>>> Hi Ravikumar,
>>>
>>> Flink's operators are stateful. So you can simply create a variable in
>>> your mapper to keep the state around. But every mapper instance will have
>>> it's own state. This state is determined by the records which are sent to
>>> this mapper instance. If you need a global state, then you have to set the
>>> parallelism to 1.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Wed, Jun 8, 2016 at 5:08 PM, Ravikumar Hawaldar <
>>> ravikumar.hawal...@gmail.com> wrote:
>>>
 Hello,

 I have an DataSet which is roughly a record in a
 DataSet Or a file.

 Now I am using map transformation on this DataSet to compute a variable
 (coefficients of linear regression parameters and data structure used is a
 double[]).

 Now the issue is that, per record the variable will get updated and I
 am struggling to maintain state of this variable for the next record.

 In simple, for first record the variable values will be 0.0, and after
 first record the variable will get updated and I have to pass this updated
 variable for the second record and so on for all records in DataSet.

 Any suggestions on how to maintain state of a variable?


 Regards,
 Ravikumar

>>>
>>>
>>
>


HBase reads and back pressure

2016-06-09 Thread Christophe Salperwyck
Hi,

I am writing a program to read timeseries from HBase and do some daily
aggregations (Flink streaming). For now I am just computing some averages, so
it is not very compute-intensive, but my HBase reads get slower and slower (I
have a few billion points to read). The back pressure is almost all the time
close to 1.

I use custom timestamp:
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

so I implemented a custom extractor based on:
AscendingTimestampExtractor

At the beginning I have 5M reads/s and after 15 min I have just 1M reads/s,
and then it gets worse and worse. Even when I cancel the job, data is still
being written to HBase (I did a sink similar to the example - with a cache
of 100s of HBase Puts to be a bit more efficient).

When I don't add a sink it seems to stay at 1M reads/s.

Do you have an idea why?

Here is a bit of code if needed:
final WindowedStream<ANA, Tuple, TimeWindow> ws = hbaseDS.keyBy(0)
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor())
.keyBy(0)
.timeWindow(Time.days(1));

final SingleOutputStreamOperator<Put, ?> puts = ws.apply(new
WindowFunction<ANA, Put, Tuple, TimeWindow>() {

@Override
public void apply(final Tuple key, final TimeWindow window, final
Iterable<ANA> input,
final Collector<Put> out) throws Exception {

final SummaryStatistics summaryStatistics = new SummaryStatistics();
for (final ANA ana : input) {
summaryStatistics.addValue(ana.getValue());
}
final Put put = buildPut((String) key.getField(0), window.getStart(),
summaryStatistics);
out.collect(put);
}
});
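
For reference, a minimal sketch of the kind of buffered Put sink described
above (my own reconstruction, not Christophe's code; the table name and flush
size are made up, imports are omitted, and the buffer is not checkpointed, so
records can be lost or written twice on failure):

public class BufferedHBaseSink extends RichSinkFunction<Put> {

    private static final int FLUSH_SIZE = 500;            // assumed batch size

    private transient Connection connection;
    private transient Table table;
    private transient List<Put> buffer;

    @Override
    public void open(Configuration parameters) throws Exception {
        // one HBase connection per parallel sink instance
        connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
        table = connection.getTable(TableName.valueOf("daily_aggregates")); // hypothetical table
        buffer = new ArrayList<>(FLUSH_SIZE);
    }

    @Override
    public void invoke(Put put) throws Exception {
        buffer.add(put);
        if (buffer.size() >= FLUSH_SIZE) {
            table.put(buffer);                             // one batched RPC instead of one per record
            buffer.clear();
        }
    }

    @Override
    public void close() throws Exception {
        if (table != null) {
            if (buffer != null && !buffer.isEmpty()) {
                table.put(buffer);
            }
            table.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}

// usage: puts.addSink(new BufferedHBaseSink());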

And how I started Flink on YARN :
flink-1.0.3/bin/yarn-session.sh -n 20 -tm 16384 -s 2
-Dtaskmanager.network.numberOfBuffers=4096

Thanks for any feedback!

Christophe


Re: Flink Dashboard

2016-06-09 Thread leon_mclare
Hi Till,

thanks for the clarification. It all makes sense now.

So the keyBy call is more a partitioning scheme and less of an operator, 
similar to Storm's field grouping, and Flink's other schemes such as forward 
and broadcast. The difference is that it produces KeyedStreams, which are a 
prerequisite for certain types of transformations.

Regards
Leon


8. Jun 2016 14:05 by trohrm...@apache.org:


> Hi Leon,
> yes, you're right. The plan visualization shows the actual tasks. Each task 
> can contain one or more (if chained) operators. A task is split into 
> sub-tasks which are executed on the TaskManagers. A TaskManager slot can 
> accommodate one subtask of each task (if the task has not been assigned to 
> a different slot sharing group). Thus (per default) the number of required 
> slots is defined by the operator with the highest parallelism.
> If you click on the different tasks, then you see in the list view at the 
> bottom of the page, where the individual sub-tasks have been deployed to.
> The keyBy API call is actually not realized as a distinct Flink operator at 
> runtime. Instead, the keyBy transformation influences how a downstream 
> operator is connected with the upstream operator (pointwise or all to all). 
> Furthermore, the keyBy transformation sets a special StreamPartitioner 
> (HashPartitioner) which is used by the StreamRecordWriters to select the 
> output channels (receiving sub tasks) for the current stream record. That 
> is the reason why you don't see the keyBy transformation in the plan 
> visualization. Consequently, the illustration on our website is not totally 
> consistent with the actual implementation.
>
> Cheers,
> Till
> On Wed, Jun 8, 2016 at 11:01 AM, leon_mcl...@tutanota.com wrote:
>
>> I am using the dashboard to inspect my multi-stage pipeline.
>> I cannot seem to find a manual or other description for the dashboard
>> aside from the quickstart section
>> (https://ci.apache.org/projects/flink/flink-docs-release-1.0/quickstart/setup_quickstart.html).
>>
>> I would like to know approximately what my physical layout (in terms of
>> task slots, tasks and task chaining) looks like.  I am assuming that each
>> block in the attached topology plan is an operation that will execute as 
>> multiple parallel tasks based on the assigned parallelism. Multilevel 
>> operator chaining has been applied in most cases. The plan then occupies 3 
>> task slots, as dictated by the highest DOP.  Is this correct?
>>
>> What I am also unsure about is that I have keyBy transformations in my
>> code, yet they are not shown as transformations in the plan. The
>> corresponding section on workers and slots
>> (https://ci.apache.org/projects/flink/flink-docs-release-1.0/concepts/concepts.html#workers-slots-resources)
>> however explicitly shows keyBy in the layout.
>>
>> Regards
>> Leon
>>
>
> 

Maxby() and KeyBy() question

2016-06-09 Thread iñaki williams
Hi again!

I am working with two DataStreams and I want to get the maximum value from
each pair of them, for example:

//Informacion (matchName, LocalOdd, AwayOdd)

Informacion info1= new Informacion("Match1", 1.10, 3.22);
Informacion info2= new Informacion("Match2", 2.11, 1.10);
Informacion info3= new Informacion("Match3", 4.10, 1.05);

Informacion info11= new Informacion("Match1", 1.80, 2.20);
Informacion info22= new Informacion("Match2", 3.10, 1.15);
Informacion info33= new Informacion("Match3", 2.12, 1.25);


DataStream<Informacion> src = see.fromElements(info1, info2, info3);
DataStream<Informacion> src2 =
see.fromElements(info11, info22, info33);
DataStream<Informacion> src3 = src.union(src2);

DataStream<Informacion> maxLocal =
src3.keyBy("nombrePartido").maxBy("cuotaGanadorLocal");

maxLocal.print();



Let's suppose that those are tennis matches with their names and their bet
odds, and the names of the matches are the same on both streams, I mean
Match1=Match1, Match2=Match2 (imagine that match 1's name is "Rafa Nadal
- Roger Federer").


I want to get the maximum localOdd from matches with the same name. The
result of my code is:


1> Informacion [matchName=Match2, localOdd=2.11, awayOdd=1.1]
1> *Informacion [matchName=Match3, localOdd=4.1, awayOdd=1.05]*
1> Informacion [matchName=Match2, localOdd=3.1, awayOdd=1.15]
1>* Informacion [matchName=Match3, localOdd=4.1, awayOdd=1.05]*
4> Informacion [matchName=Match1, localOdd=1.1, awayOdd=3.22]
4> Informacion [matchName=Match1, localOdd=1.8, awayOdd=2.2]

It seems like it is taking the biggest value across all the matches and not
per keyed match.


I am looking for this:


Informacion [matchName=Match1, localOdd=1.8, awayOdd=2.2]
Informacion [matchName=Match2, localOdd=3.1, awayOdd=1.15]
Informacion [matchName=Match3, localOdd=4.1, awayOdd=1.05]



How can I get it?


Thanks in advance


Re: How to maintain the state of a variable in a map transformation.

2016-06-09 Thread Fabian Hueske
Hi Ravikumar,

I'll try to answer your questions:
1) If you set the parallelism of a map function to 1, there will be only a
single instance of that function regardless of whether it is executed locally
or remotely in a cluster.
2) Flink does also support aggregations, (reduce, groupReduce, combine,
...). However, I do not see how this would help with a stateful map
function.
3) In Flink DataSet programs you usually construct the complete program and
call execute() after you have defined your sinks. There are two exceptions:
print() and collect() which both add special sinks and immediately execute
your program. print() prints the result to the stdout of the submitting
client and collect() fetches a dataset as collection.
4) I am not sure I understood your question. When you obtain an
ExecutionEnvironment with ExecutionEnvironment.getExecutionEnvironment()
the type of the returned environment depends on the context in which the
program was executed. It can be a local environment if it is executed from
within an IDE or a RemoteExecutionEnvironment if the program is executed
via the CLI client and shipped to a remote cluster.
5) A map operator processes records one after the other, i.e., as a
sequence. If you need a certain order, you can call DataSet.sortPartition()
to locally sort the partition.

Hope that helps,
Fabian
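
To illustrate points 1) and 5) in code, a small sketch (my own, not tested
against the thread's actual program; the Tuple2<Long, double[]> record type,
the field sizes and the update rule are placeholders, imports omitted):

DataSet<Tuple2<Long, double[]>> records = ...; // (sequence id, feature vector)

DataSet<double[]> coefficients = records
    .sortPartition(0, Order.ASCENDING)           // 5) enforce a local order by sequence id
    .map(new RichMapFunction<Tuple2<Long, double[]>, double[]>() {

        private double[] weights;

        @Override
        public void open(Configuration parameters) {
            weights = new double[10];             // initial coefficients, all 0.0
        }

        @Override
        public double[] map(Tuple2<Long, double[]> record) {
            // update 'weights' from record.f1 here (update rule omitted)
            return weights.clone();               // emit the current state per record
        }
    })
    .setParallelism(1);                           // 1) a single instance, locally and on a cluster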

2016-06-09 12:23 GMT+02:00 Ravikumar Hawaldar :

> Hi Till, Thank you for your answer, I have couple of questions
>
> 1) Setting parallelism on a single map function in local is fine but on
> distributed will it work as local execution?
>
> 2) Is there any other way apart from setting parallelism? Like spark
> aggregate function?
>
> 3) Is it necessary that after transformations to call execute function? Or
> Execution starts as soon as it encounters a action (Similar to Spark)?
>
> 4) Can I create a global execution environment (Either local or
> distributed) for different Flink program in a module?
>
> 5) How to make the records come in sequence for a map or any other
> operator?
>
>
> Regards,
> Ravikumar
>
>
> On 8 June 2016 at 21:14, Till Rohrmann  wrote:
>
>> Hi Ravikumar,
>>
>> Flink's operators are stateful. So you can simply create a variable in
>> your mapper to keep the state around. But every mapper instance will have
>> it's own state. This state is determined by the records which are sent to
>> this mapper instance. If you need a global state, then you have to set the
>> parallelism to 1.
>>
>> Cheers,
>> Till
>>
>> On Wed, Jun 8, 2016 at 5:08 PM, Ravikumar Hawaldar <
>> ravikumar.hawal...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I have an DataSet which is roughly a record in a
>>> DataSet Or a file.
>>>
>>> Now I am using map transformation on this DataSet to compute a variable
>>> (coefficients of linear regression parameters and data structure used is a
>>> double[]).
>>>
>>> Now the issue is that, per record the variable will get updated and I am
>>> struggling to maintain state of this variable for the next record.
>>>
>>> In simple, for first record the variable values will be 0.0, and after
>>> first record the variable will get updated and I have to pass this updated
>>> variable for the second record and so on for all records in DataSet.
>>>
>>> Any suggestions on how to maintain state of a variable?
>>>
>>>
>>> Regards,
>>> Ravikumar
>>>
>>
>>
>


Re: FlinkKafkaProducer API

2016-06-09 Thread Fabian Hueske
Hi Elias,

thanks for your feedback. I think those are good observations and
suggestions to improve the Kafka producers.
The best place to discuss such improvements is the dev mailing list.

Would you like to repost your mail there or open JIRAs where the discussion
about these changes can continue?

Thanks, Fabian

2016-06-09 3:58 GMT+02:00 Elias Levy :

> The FlinkKafkaProducer API seems more difficult to use than it should be.
>
> The API requires you pass it a SerializationSchema or a
> KeyedSerializationSchema, but the Kafka producer already has a
> serialization API.  Requiring a serializer in the Flink API precludes the
> use of the Kafka serializers.  For instance, they preclude the use of the
> Confluent KafkaAvroSerializer class that makes use of the Confluent Schema
> Registry.  Ideally, the serializer would be optional, so as to allow the
> Kafka producer serializers to handle the task.
>
> In addition, the KeyedSerializationSchema conflates message key extraction
> with key serialization.  If the serializer were optional, to allow the
> Kafka producer serializers to take over, you'd still need to extract a key
> from the message.
>
> And given that the key may not be part of the message you want to write to
> Kafka, an upstream step may have to package the key with the message to
> make both available to the sink, for instance in a tuple. That means you
> also need to define a method to extract the message to write to Kafka from
> the element passed into the sink by Flink.
>
> In summary, there should be separation of extraction of the key and
> message from the element passed into the sink from serialization, and the
> serialization step should be optional.
>
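For context, implementing the current interface with the key packaged next to
the message typically looks roughly like this (a sketch assuming a
Tuple2<String, byte[]> element type, imports omitted; the exact method set of
KeyedSerializationSchema can differ between Flink versions):

public class KeyedTupleSchema implements KeyedSerializationSchema<Tuple2<String, byte[]>> {

    @Override
    public byte[] serializeKey(Tuple2<String, byte[]> element) {
        // key extraction and key serialization happen in the same place
        return element.f0.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public byte[] serializeValue(Tuple2<String, byte[]> element) {
        return element.f1;
    }

    @Override
    public String getTargetTopic(Tuple2<String, byte[]> element) {
        return null; // fall back to the producer's default topic (method may be absent in older versions)
    }
}
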
>
>


Re: NotSerializableException

2016-06-09 Thread Fabian Hueske
Hi Tarandeep,

the exception suggests that Flink tries to serialize RecordsFilterer as a
user function (this happens via Java Serialization).
I said suggests because the code that uses RecordsFilterer is not included.

To me it looks like RecordsFilterer should not be used as a user function.
It is a helper class to construct a DataSet program, so it should not be
shipped for execution.
You would use such a class as follows:

DataSet<T> records = ...
DataSet<String> filterIDs = ...

RecordsFilterer<T> rf = new RecordsFilterer<>();
DataSet<Tuple2<Boolean, T>> result = rf.addFilterFlag(records, filterIDs,
"myField");

Regarding the join code, I would suggest an optimization.
Instead of using CoGroup, I would use distinct and an OuterJoin like this:

DataSet<String> distIds = filterIDs.distinct();
DataSet<Tuple2<Boolean, T>> result = records
  .leftOuterJoin(distIds)
  .where(KEYSELECTOR)
  .equalTo("*") // use full string as key
  .with(JOINFUNC) // set Bool to false if right == null, true otherwise

Best, Fabian
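
One way the KEYSELECTOR and JOINFUNC placeholders above could be filled in
(a sketch, not Fabian's actual code; it assumes T extends GenericRecord as in
RecordsFilterer, that fieldName is in scope as a final variable, and that the
anonymous classes live in a static context so no outer instance is captured;
imports omitted):

DataSet<String> distIds = filterIDs.distinct();

DataSet<Tuple2<Boolean, T>> result = records
    .leftOuterJoin(distIds)
    .where(new KeySelector<T, String>() {
        @Override
        public String getKey(T t) throws Exception {
            String s = (String) t.get(fieldName);
            return s != null ? s : UUID.randomUUID().toString();
        }
    })
    .equalTo("*")                                // the full string is the key
    .with(new JoinFunction<T, String, Tuple2<Boolean, T>>() {
        @Override
        public Tuple2<Boolean, T> join(T record, String id) {
            // with a left outer join, 'id' is null when there was no match
            return new Tuple2<>(id != null, record);
        }
    });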

2016-06-09 2:28 GMT+02:00 Tarandeep Singh :

> Hi,
>
> I am getting NoSerializableException in this class-
>
> 
>
> public class RecordsFilterer<T extends GenericRecord> {
>
> public DataSet<Tuple2<Boolean, T>> addFilterFlag(DataSet<T> dataset,
> DataSet<String> filteredIds, String fieldName) {
> return dataset.coGroup(filteredIds)
> .where(new KeySelector<T, String>() {
> @Override
> public String getKey(T t) throws Exception {
> String s = (String) t.get(fieldName);
> return s != null ? s : UUID.randomUUID().toString();
> }
> })
> .equalTo((KeySelector<String, String>) s -> s)
> .with(new CoGroupFunction<T, String, Tuple2<Boolean, T>>() {
> @Override
> public void coGroup(Iterable<T> records, Iterable<String> ids,
> Collector<Tuple2<Boolean, T>> collector) throws Exception {
> boolean filterFlag = false;
> for (String id : ids) {
> filterFlag = true;
> }
>
> for (T record : records) {
> collector.collect(new Tuple2<>(filterFlag, record));
> }
> }
> });
>
> }
> }
>
>
> What I am trying to do is write generic code that will join Avro records
> (of different types) with String records and, if there is a match, add a filter
> flag. This way I can use the same code for different Avro record types. But
> I am getting this exception-
>
> Exception in thread "main" org.apache.flink.optimizer.CompilerException:
> Error translating node 'Map "Key Extractor" : MAP [[ GlobalProperties
> [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null,
> grouped=null, unique=null] ]]': Could not write the user code wrapper class
> org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
> java.io.NotSerializableException:
> com.styleseat.dataplatform.etl.jobs.boundary.RecordsFilterer
> at
> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:386)
> at
> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:109)
> at
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:198)
> at
> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
> at
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
> at
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
> at
> org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128)
> at
> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:188)
> at
> org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:187)
> at
> org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:90)
> at
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
> at
> com.styleseat.dataplatform.etl.jobs.search.RunSearchLogProcessorV2.runSearchLogProcessor(RunSearchLogProcessorV2.java:57)
> at
> com.styleseat.dataplatform.etl.jobs.search.RunSearchLogProcessorV2.main(RunSearchLogProcessorV2.java:32)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> Caused by:
> org.apache.flink.runtime.operators.util.CorruptConfigurationException:
> 

Re: com.esotericsoftware.kryo.KryoException and java.lang.IndexOutOfBoundsException

2016-06-09 Thread Till Rohrmann
Hi Ahmed,

I tried setting up your use case and for me it all seems to work. However,
I didn't use the Spring framework and executed the program in a local Flink
cluster.

Maybe you can compile a self-containing example (including example data) to
reproduce your problem and send it to us.

Cheers,
Till

On Wed, Jun 8, 2016 at 5:48 PM, Ahmed Nader  wrote:

> Hello Flavio,
> Thank you so much for replying; however, I didn't download Flink locally, I
> only added dependencies in a Maven project. So I don't think I'll be able
> to modify the KryoSerializer class. But yeah, me too, I think it's the
> problem.
> Thanks,
> Ahmed
>
> On 8 June 2016 at 16:07, Flavio Pompermaier  wrote:
>
>> Hi Ahmed,
>> I also have the same error that is probably caused by the KryoSerializer.
>> Right now I'm testing a patch to this problem so maybe you could also
>> test it. Unfortunately I'm using Flink 1.0.2 so I don't know whether you
>> can use my KryoSerializer but I think so. Actually I just recreate Input
>> and Output every time in serialize/deserialize and then I close them.
>>
>> This is my attempt to fix the problem (actually the KryoSerializer class
>> in the flink-core module):
>>
>>
>> /*
>>  * Licensed to the Apache Software Foundation (ASF) under one
>>  * or more contributor license agreements.  See the NOTICE file
>>  * distributed with this work for additional information
>>  * regarding copyright ownership.  The ASF licenses this file
>>  * to you under the Apache License, Version 2.0 (the
>>  * "License"); you may not use this file except in compliance
>>  * with the License.  You may obtain a copy of the License at
>>  *
>>  * http://www.apache.org/licenses/LICENSE-2.0
>>  *
>>  * Unless required by applicable law or agreed to in writing, software
>>  * distributed under the License is distributed on an "AS IS" BASIS,
>>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
>> implied.
>>  * See the License for the specific language governing permissions and
>>  * limitations under the License.
>>  */
>>
>> package org.apache.flink.api.java.typeutils.runtime.kryo;
>>
>> import com.esotericsoftware.kryo.Kryo;
>> import com.esotericsoftware.kryo.KryoException;
>> import com.esotericsoftware.kryo.Serializer;
>> import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory;
>> import com.esotericsoftware.kryo.io.Input;
>> import com.esotericsoftware.kryo.io.Output;
>> import com.esotericsoftware.kryo.serializers.JavaSerializer;
>> import com.google.common.base.Preconditions;
>>
>> import org.apache.avro.generic.GenericData;
>> import org.apache.flink.api.common.ExecutionConfig;
>> import org.apache.flink.api.common.typeutils.TypeSerializer;
>> import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
>> import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
>> import org.apache.flink.api.java.typeutils.runtime.NoFetchingInput;
>> import
>> org.apache.flink.api.java.typeutils.runtime.kryo.Serializers.SpecificInstanceCollectionSerializerForArrayList;
>> import org.apache.flink.core.memory.DataInputView;
>> import org.apache.flink.core.memory.DataOutputView;
>> import org.objenesis.strategy.StdInstantiatorStrategy;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>>
>> import java.io.ByteArrayInputStream;
>> import java.io.ByteArrayOutputStream;
>> import java.io.EOFException;
>> import java.io.IOException;
>> import java.lang.reflect.InvocationTargetException;
>> import java.lang.reflect.Method;
>> import java.lang.reflect.Modifier;
>> import java.util.LinkedHashMap;
>> import java.util.LinkedHashSet;
>> import java.util.Map;
>> import java.util.Objects;
>>
>> /**
>>  * A type serializer that serializes its type using the Kryo serialization
>>  * framework (https://github.com/EsotericSoftware/kryo).
>>  *
>>  * This serializer is intended as a fallback serializer for the cases
>> that are
>>  * not covered by the basic types, tuples, and POJOs.
>>  *
>>  * @param <T> The type to be serialized.
>>  */
>> public class KryoSerializer<T> extends TypeSerializer<T> {
>>
>> private static final long serialVersionUID = 3L;
>>
>> private static final Logger LOG =
>> LoggerFactory.getLogger(KryoSerializer.class);
>>
>> //
>> 
>>
>> private final LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> registeredTypesWithSerializers;
>> private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> registeredTypesWithSerializerClasses;
>> private final LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> defaultSerializers;
>> private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> defaultSerializerClasses;
>> private final LinkedHashSet<Class<?>> registeredTypes;
>>
>> private final Class<T> type;
>> //
>> 
>> // The fields below are lazily initialized after duplication or
>> 

Strange behavior of DataStream.countWindow

2016-06-09 Thread Yukun Guo
I’m playing with the (Window)WordCount example from Flink QuickStart. I
generate a DataStream of Strings containing 1000 random digits in total, which is
windowed with a tumbling count window of 50 elements:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Random;
public class DigitCount {


public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = env.fromElements(
"14159265358979323846264338327950288419716939937510",
"58209749445923078164062862089986280348253421170679",
"82148086513282306647093844609550582231725359408128",
"48111745028410270193852110555964462294895493038196",
"44288109756659334461284756482337867831652712019091",
"45648566923460348610454326648213393607260249141273",
"72458700660631558817488152092096282925409171536436",
"78925903600113305305488204665213841469519415116094",
"33057270365759591953092186117381932611793105118548",
"07446237996274956735188575272489122793818301194912",
"98336733624406566430860213949463952247371907021798",
"60943702770539217176293176752384674818467669405132",
"00056812714526356082778577134275778960917363717872",
"14684409012249534301465495853710507922796892589235",
"42019956112129021960864034418159813629774771309960",
"5187072113499837297804995105973173281609631859",
"50244594553469083026425223082533446850352619311881",
"71010003137838752886587533208381420617177669147303",
"59825349042875546873115956286388235378759375195778",
"18577805321712268066130019278766111959092164201989"
);

DataStream<Tuple2<Integer, Integer>> digitCount = text
.flatMap(new Splitter())
.keyBy(new KeySelector<Tuple2<Integer, Integer>, Integer>() {
@Override
public Integer getKey(Tuple2<Integer, Integer> x)
throws Exception {
return x.f0 % 2;
}
})
.countWindow(50)
.sum(1);

digitCount.print();
env.execute();

}

public static final class Splitter implements
FlatMapFunction<String, Tuple2<Integer, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<Integer, Integer>> out) {
for (String token : value.split("")) {
if (token.length() == 0) {
continue;
}
out.collect(Tuple2.of(Integer.parseInt(token), 1));
}
}
}
}

The code above will produce 19 lines of output, which is reasonable, as the
1000 digits will be keyed into 2 partitions where one partition contains
500+ elements and the other contains slightly fewer than 500 elements;
as a result, one incomplete 50-element window is never emitted.

So far so good, but if I replace the mod KeySelector with a random one:

private static class RandomKeySelector<T> implements KeySelector<T, Integer> {
private int nPartitions;
private Random random;

RandomKeySelector(int nPartitions) {
this.nPartitions = nPartitions;
random = new Random();
}

@Override
public Integer getKey(T dummy) throws Exception {
return random.nextInt(this.nPartitions);
}
}

and then

.keyBy(new RandomKeySelector<Tuple2<Integer, Integer>>(2))

it may generate 17 or 18 lines of output. How could that happen? Moreover,
if I set the number of partitions to 10, in theory the lines of output
should be no fewer than 11, but actually it can be only 9.

Please help me understand why countWindow behaves like this.


Re: Extracting Timestamp in MapFunction

2016-06-09 Thread Biplob Biswas
Hi Aljoscha,

I went to the Flink hackathon by Buzzwords yesterday where Fabian and Robert
helped me with this issue. Apparently I was assuming that the file would be
handled in a single thread, but I was using a ParallelSourceFunction and it was
creating 4 different threads and thus reading the same values 4 times.
I changed it to a SourceFunction and then changed the parallelism of the map
operator to do what I wanted to do.

Thanks a lot for replying. :) 

Regards
Biplob



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Extracting-Timestamp-in-MapFunction-tp7240p7481.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: How to maintain the state of a variable in a map transformation.

2016-06-09 Thread Ravikumar Hawaldar
Hi Till, Thank you for your answer, I have couple of questions

1) Setting parallelism on a single map function in local is fine but on
distributed will it work as local execution?

2) Is there any other way apart from setting parallelism? Like spark
aggregate function?

3) Is it necessary that after transformations to call execute function? Or
Execution starts as soon as it encounters a action (Similar to Spark)?

4) Can I create a global execution environment (Either local or
distributed) for different Flink program in a module?

5) How to make the records come in sequence for a map or any other operator?


Regards,
Ravikumar


On 8 June 2016 at 21:14, Till Rohrmann  wrote:

> Hi Ravikumar,
>
> Flink's operators are stateful. So you can simply create a variable in
> your mapper to keep the state around. But every mapper instance will have
> it's own state. This state is determined by the records which are sent to
> this mapper instance. If you need a global state, then you have to set the
> parallelism to 1.
>
> Cheers,
> Till
>
> On Wed, Jun 8, 2016 at 5:08 PM, Ravikumar Hawaldar <
> ravikumar.hawal...@gmail.com> wrote:
>
>> Hello,
>>
>> I have an DataSet which is roughly a record in a DataSet
>> Or a file.
>>
>> Now I am using map transformation on this DataSet to compute a variable
>> (coefficients of linear regression parameters and data structure used is a
>> double[]).
>>
>> Now the issue is that, per record the variable will get updated and I am
>> struggling to maintain state of this variable for the next record.
>>
>> In simple, for first record the variable values will be 0.0, and after
>> first record the variable will get updated and I have to pass this updated
>> variable for the second record and so on for all records in DataSet.
>>
>> Any suggestions on how to maintain state of a variable?
>>
>>
>> Regards,
>> Ravikumar
>>
>
>


Re: Scala case classes with a generic parameter

2016-06-09 Thread Aljoscha Krettek
Hi James,
the TypeInformation must be available at the call site, not in the case
class definition. In your WindowFunction you are using a TestGen[String] so
it should suffice to add this line at some point before the call to apply():

implicit val testGenType = createTypeInformation[TestGen[String]]

Hope that helps.

Best,
Aljoscha

On Wed, 1 Jun 2016 at 20:11 James Bucher  wrote:

> Hi,
>
> I have been trying to get a case class with a generic parameter working
> with Flink 1.0.3 and have been having some trouble. However, when I compile
> I get the following error:
> debug-type-bug/src/main/scala/com/example/flink/jobs/CaseClassWithGeneric.scala:40:
> error: could not find implicit value for evidence parameter of type
> org.apache.flink.api.common.typeinfo.TypeInformation[com.example.flink.jobs.CaseClassWithGeneric.TestGen[String]]
> [ERROR]   .apply(new AggregateOrigins)
>
> I am importing org.apache.flink.api.scala._ and the generic type is
> defined as [T: TypeInformation] as suggested here:
> https://ci.apache.org/projects/flink/flink-docs-master/internals/types_serialization.html
>
>
> The full code for the program is as follows:
>
> package com.example.flink.jobs
>
> import java.util.{Properties}
> import org.apache.flink.api.common.typeinfo.TypeInformation
> import org.apache.flink.streaming.util.serialization.SimpleStringSchema
> import org.apache.flink.util.Collector
> import org.apache.flink.streaming.api.scala.function.WindowFunction
> import org.apache.flink.streaming.api.windowing.time.Time
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer08}
> import org.apache.flink.api.scala._
>
> object CaseClassWithGeneric {
>   case class TestGen[T: TypeInformation](item: T) {}
>
>   class AggregateOrigins extends WindowFunction[String, TestGen[String], 
> String, TimeWindow] {
> def apply(key: String, win: TimeWindow, values: Iterable[String], col: 
> Collector[TestGen[String]]): Unit = {
>   values.foreach(x => { })
>   col.collect(new TestGen[String]("Foo"))
> }
>   }
>
>   def main(args: Array[String]): Unit = {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val properties = new Properties();
> val messageStream = env.addSource(
>   new FlinkKafkaConsumer08("topic", new SimpleStringSchema, properties))
>   .keyBy(s => s)
>   .timeWindow(Time.days(1))
>   .apply(new AggregateOrigins)
> messageStream.print()
> env.execute("Simple Job")
>   }
> }
>
> When I dug into the apply() function definition I found the following:
>
> def apply[R: TypeInformation](
> function: WindowFunction[T, R, K, W]): DataStream[R] = {
>
>   val cleanFunction = clean(function)
>   val applyFunction = new ScalaWindowFunctionWrapper[T, R, K, 
> W](cleanFunction)
>   asScalaStream(javaStream.apply(applyFunction, 
> implicitly[TypeInformation[R]]))
> }
>
> As far as I can tell TestGen[String] should correspond to [R:
> TypeInformation] in apply. Am I missing something or is it not possible to
> define a case class with a generic parameter?
>
> Thanks,
>
> James Bucher
>
>


Re: Extracting Timestamp in MapFunction

2016-06-09 Thread Aljoscha Krettek
Hi,
could you try pulling the problem apart, i.e. determine at which point in
the pipeline you have duplicate data. Is it after the sources or in the
CoFlatMap or the Map after the reduce, for example?

Cheers,
Aljoscha

On Wed, 1 Jun 2016 at 17:11 Biplob Biswas  wrote:

> Hi,
>
> Before giving the method you described above a try, I tried adding the
> timestamp to my data directly at the stream source.
>
> Following is my stream source:
>
> http://pastebin.com/AsXiStMC
>
> and I am using the stream source as follows:
>
> DataStream tuples = env.addSource(new DataStreamGenerator(filePath,
> streamSpeed));
> ConnectedIterativeStreams
> inputsAndMicroCluster =
> tuples.iterate()
>
>
> .withFeedbackType(MicroCluster[].class);
> //mcStream.broadcast().global();
> DataStream updatedMicroCluster =
> inputsAndMicroCluster
>
>   .flatMap(new MyCoFlatmap(k,tw))
>
>   .keyBy(1)
>
>   .reduce(new ReduceMC(k))
>
>   .map(new ReturnMC());
>
>
> inputsAndMicroCluster.closeWith(updatedMicroCluster.broadcast());
>
> The problem is, when I execute this, all 4 different partitions get the
> same data. I don't really understand how the same data is sent to all 4
> partitions when it should send 4 different data tuples to 4 different partitions.
>
> Can you maybe explain this behaviour?
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Extracting-Timestamp-in-MapFunction-tp7240p7315.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Re: ClassCastException when redeploying Flink job on running cluster

2016-06-09 Thread Till Rohrmann
Great to hear :-)

On Wed, Jun 8, 2016 at 7:45 PM, Josh  wrote:

> Thanks Till, your suggestion worked!
>
> I actually just created a new SpecificData for each
> AvroDeserializationSchema instance, so I think it's still just as efficient.
>
> Josh
>
> On Wed, Jun 8, 2016 at 4:41 PM, Till Rohrmann 
> wrote:
>
>> The only thing I could think of is to not use the SpecificData singleton
>> but instead create a new SpecificData object for each SpecificDatumReader
>> (you can pass it as a third argument to the constructor). This, of course,
>> is not really efficient. But you could try it out to see whether it solves
>> your problem.
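
In code, that fix looks roughly like this (a sketch; MyRecord stands for
whatever Avro-generated class is being read, and the three-argument
constructor is the one mentioned above):

SpecificData specificData = new SpecificData();          // per-reader instance, not SpecificData.get()
SpecificDatumReader<MyRecord> reader = new SpecificDatumReader<>(
        MyRecord.getClassSchema(),                        // writer schema
        MyRecord.getClassSchema(),                        // reader schema
        specificData);
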
>>
>> Cheers,
>> Till
>>
>> On Wed, Jun 8, 2016 at 4:24 PM, Josh  wrote:
>>
>>> Sorry - I forgot to include my stack trace too. Here it is:
>>>
>>> The program finished with the following exception:
>>>
>>> org.apache.flink.client.program.ProgramInvocationException: The program
>>> execution failed: Job execution failed.
>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>>> at
>>> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:65)
>>> at
>>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:536)
>>> at com.me.flink.job.MyFlinkJob$.main(MyFlinkJob.scala:85)
>>> at com.me.flink.job.MyFlinkJob.main(MyFlinkJob.scala)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>> at
>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>> at
>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>> at
>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1192)
>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)
>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
>>> execution failed.
>>> at
>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:717)
>>> at
>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:663)
>>> at
>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:663)
>>> at
>>> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>>> at
>>> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>>> at
>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> at
>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>>> at
>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>>> at
>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>> at
>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>> Caused by: java.lang.Exception: Could not forward element to next
>>> operator
>>> at
>>> org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.run(KinesisDataFetcher.java:150)
>>> at
>>> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:285)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>> at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.lang.RuntimeException: Could not forward element to next
>>> operator
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:318)
>>> at
>>>