Re: Guava version conflict

2017-06-19 Thread Tzu-Li (Gordon) Tai
Thanks a lot! Please keep me updated with this :)


On 19 June 2017 at 6:33:15 PM, Flavio Pompermaier (pomperma...@okkam.it) wrote:

Ok, I'll let you know as soon as I recompile Flink 1.3.x.

Thanks,
Flavio

On Mon, Jun 19, 2017 at 7:26 AM, Tzu-Li (Gordon) Tai  
wrote:
Hi Flavio,

It’s most likely related to a problem with Maven.
I’m pretty sure this actually isn’t a problem anymore. Could you verify by 
rebuilding Flink and checking whether the problem remains? Thanks a lot.

Best,
Gordon


On 16 June 2017 at 6:25:10 PM, Tzu-Li (Gordon) Tai (tzuli...@apache.org) wrote:

That’s actually what I’m trying to figure out right now, what had changed since 
then ...


On 16 June 2017 at 12:23:57 PM, Flavio Pompermaier (pomperma...@okkam.it) wrote:

I've never tested with Flink 1.3.0; I have the problem with Flink 1.2.1. Didn't 
you say a few days ago that you also had non-shaded Guava dependencies in the 
flink-dist jar?

On Fri, Jun 16, 2017 at 12:19 PM, Tzu-Li (Gordon) Tai  
wrote:
Hi Flavio,

I was just running some end-to-end tests (rebuilding Flink and executing jobs with 
the ES sink on a cluster), and it seems the Guava shading problem is no longer 
present in the flink-dist jar.

On the `release-1.3` branch, built with Maven 3.0.5, the Guava dependencies in 
flink-dist are all properly shaded.
I tested with ES 2.3.5 (the default Elasticsearch 2 connector version) and ES 
2.4.1 (overriding the ES 2 version and rebuilding the ES connector); both worked 
fine without the Guava conflict issue.

So I’m pretty sure that if the problem still exists for you, the conflict would 
have come from other dependencies in your code.

Cheers,
Gordon
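
As a general debugging aid, a minimal sketch of checking at runtime which jar a suspect class (for example Guava's MoreExecutors) is actually loaded from; the class and the utility name below are just illustrative:

// Minimal utility: print the jar a given class was loaded from.
// Pass a fully qualified class name, e.g. com.google.common.util.concurrent.MoreExecutors
public class ClassOriginCheck {
    public static void main(String[] args) throws ClassNotFoundException {
        Class<?> clazz = Class.forName(args[0]);
        // getCodeSource() can be null for classes loaded by the bootstrap class loader
        System.out.println(clazz.getName() + " loaded from "
                + clazz.getProtectionDomain().getCodeSource().getLocation());
    }
}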


On 15 June 2017 at 8:24:48 PM, Flavio Pompermaier (pomperma...@okkam.it) wrote:

Hi Gordon,
any news on this?

On Mon, Jun 12, 2017 at 9:54 AM, Tzu-Li (Gordon) Tai  
wrote:
This seems like a shading problem then.
I’ve tested this again with Maven 3.0.5; even without building against the CDH 
Hadoop binaries, the flink-dist jar contains non-shaded Guava dependencies.

Let me investigate a bit and get back to this!

Cheers,
Gordon


On 8 June 2017 at 2:47:02 PM, Flavio Pompermaier (pomperma...@okkam.it) wrote:

On an empty machine (with Ubuntu 14.04.5 LTS) and an empty maven local repo I 
did:
git clone https://github.com/apache/flink.git && cd flink && git checkout 
tags/release-1.2.1
/opt/devel/apache-maven-3.3.9/bin/mvn clean install 
-Dhadoop.version=2.6.0-cdh5.9.0 -Dhbase.version=1.2.0-cdh5.9.0 
-Dhadoop.core.version=2.6.0-mr1-cdh5.9.0 -DskipTests -Pvendor-repos
cd flink-dist
/opt/devel/apache-maven-3.3.9/bin/mvn clean install 
-Dhadoop.version=2.6.0-cdh5.9.0 -Dhbase.version=1.2.0-cdh5.9.0 
-Dhadoop.core.version=2.6.0-mr1-cdh5.9.0 -DskipTests -Pvendor-repos
jar tf target/flink-1.2.1-bin/flink-1.2.1/lib/flink-dist_2.10-1.2.1.jar  | grep 
MoreExecutors
And I still see guava dependencies:

org/apache/flink/hadoop/shaded/com/google/common/util/concurrent/MoreExecutors$1.class
org/apache/flink/hadoop/shaded/com/google/common/util/concurrent/MoreExecutors$SameThreadExecutorService.class
org/apache/flink/hadoop/shaded/com/google/common/util/concurrent/MoreExecutors$ListeningDecorator.class
org/apache/flink/hadoop/shaded/com/google/common/util/concurrent/MoreExecutors$ScheduledListeningDecorator.class
org/apache/flink/hadoop/shaded/com/google/common/util/concurrent/MoreExecutors.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$1.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$2.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$3.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$4.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$Application$1.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$Application.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$DirectExecutor.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$DirectExecutorService.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$ListeningDecorator.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$ScheduledListeningDecorator$ListenableScheduledTask.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$ScheduledListeningDecorator$NeverSuccessfulListenableFutureTask.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$ScheduledListeningDecorator.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors.class
com/google/common/util/concurrent/MoreExecutors$1.class
com/google/common/util/concurrent/MoreExecutors$SameThreadExecutorService.class
com/google/common/util/concurrent/MoreExecutors$ListeningDecorator.class
com/google/common/util/concurrent/MoreExecutors$ScheduledListeningDecorator.class
com/google/common/util/concurrent/MoreExecutors.class

It seems 

Re: How can I get last successful checkpoint id in sink?

2017-06-19 Thread Tzu-Li (Gordon) Tai
Hi!

The last completed checkpoint ID should be obtainable using the monitoring REST 
API [1], under the URL “/jobs/{jobID}/checkpoints/“.

It is also visible in the JobManager Web UI under the “checkpoints” tab of each 
job. The web UI fetches its information using the monitoring REST API, so 
anything available there should also be retrievable via the REST API.

Cheers,
Gordon

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/rest_api.html
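
A minimal sketch of fetching that endpoint from plain Java; the JobManager host/port is an assumption for a default local setup, and the returned JSON should be parsed with a JSON library of your choice to read the latest completed checkpoint ID:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class LastCompletedCheckpoint {
    public static void main(String[] args) throws Exception {
        String jobId = args[0];
        // Assumed JobManager REST address; adjust host/port for your cluster.
        URL url = new URL("http://localhost:8081/jobs/" + jobId + "/checkpoints/");
        StringBuilder json = new StringBuilder();
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(url.openStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = in.readLine()) != null) {
                json.append(line);
            }
        }
        // The response holds the job's checkpoint statistics, including the ID of the
        // latest completed checkpoint; extract that field with any JSON library.
        System.out.println(json);
    }
}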


On 18 June 2017 at 2:24:31 PM, yunfan123 (yunfanfight...@foxmail.com) wrote:

It seems Flink does not support this?



--  
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-can-I-get-last-successful-checkpoint-id-in-sink-tp13815.html
  
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.  


Re: Possible Data Corruption?

2017-06-19 Thread Ted Yu
See this thread:
http://search-hadoop.com/m/Flink/VkLeQm2nZm1Wa7Ny1?subj=Re+Painful+KryoException+java+lang+IndexOutOfBoundsException+on+Flink+Batch+Api+scala

which mentioned FLINK-6398, fixed in 1.2.2 / 1.3.

On Mon, Jun 19, 2017 at 5:53 PM, Philip Doctor 
wrote:

> Dear Flink Users,
>
> I have a Flink (v1.2.1) process I left running for the last five days.  It
> aggregates a bit of state and exposes it via Queryable State.  It ran
> correctly for the first 3 days.  There were no code changes or data
> changes, but suddenly Queryable State got weird.  The process logs the
> current value of the queryable state, and from the logs I discerned that
> the state was correctly being aggregated.  However the Queryable State
> that was returned could not be deserialized.  Rather than the list of
> longs I expect, I get 2 bytes (0x 57 02).  It seemed quite clear
> that the state in the Task Manager was not the state I was getting out of
> Queryable State.
>
>
>
> I next reasoned that my data was being checkpointed and possibly I could
> restore.  So I restarted the process to recover from a checkpoint.  At
> this point the process fails with the following error:
>
>
>
> java.lang.IllegalStateException: Could not initialize keyed state backend.
>
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.
> initKeyedState(AbstractStreamOperator.java:293)
>
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.
> initializeState(AbstractStreamOperator.java:204)
>
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> initializeOperators(StreamTask.java:653)
>
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> initializeState(StreamTask.java:640)
>
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:246)
>
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:666)
>
> at java.lang.Thread.run(Thread.java:748)
>
> Caused by: java.lang.IndexOutOfBoundsException: Index: 28, Size: 0
>
> at java.util.ArrayList.rangeCheck(ArrayList.java:653)
>
> at java.util.ArrayList.get(ArrayList.java:429)
>
> at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(
> MapReferenceResolver.java:42)
>
> at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>
> at org.apache.flink.api.java.typeutils.runtime.kryo.
> KryoSerializer.deserialize(KryoSerializer.java:231)
>
> at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.
> readStateTableForKeyGroup(HeapKeyedStateBackend.java:370)
>
> at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.
> restorePartitionedState(HeapKeyedStateBackend.java:340)
>
> at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(
> HeapKeyedStateBackend.java:243)
>
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> createKeyedStateBackend(StreamTask.java:788)
>
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.
> initKeyedState(AbstractStreamOperator.java:284)
>
> ... 6 more
>
>
>
>
>
> This looks to me like Flink has serialized out state incorrectly.
>
>
>
> I was running Flink 1.2.1, I upgraded to Flink 1.3 after this happened so
> I could manually set the Kafka partition offset, I backed it up 5 days to
> replay all the data and now everything is working fine again.
>
>
>
> However I’m more than a little worried.  Was there a serialization bug
> fixed in 1.3 ?  I don’t believe there’s anything in my code that could be
> causing such an issue, but is there something in my jobs that could make
> something like this happen?  Is this a known bug?  The fact that it not
> only results in bad data in the query but appears to take down my disaster
> recovery plan makes me a bit nervous here.
>
>
>
> Thanks for your time,
>
> Phil
>
>
>


Possible Data Corruption?

2017-06-19 Thread Philip Doctor
Dear Flink Users,
I have a Flink (v1.2.1) process I left running for the last five days.  It 
aggregates a bit of state and exposes it via Queryable State.  It ran correctly 
for the first 3 days.  There were no code changes or data changes, but suddenly 
Queryable State got weird.  The process logs the current value of the queryable 
state, and from the logs I discerned that the state was correctly being 
aggregated.  However they Queryable State that was returned was unable to be 
deserialized.  Rather than the list of longs I expect, instead I get 2 bytes 
(0x 57 02).  It seemed quite clear that the state in the Task Manager was not 
the state I was getting out of Queryable State.

I next reasoned that my data was being checkpointed and possibly I could 
restore.  So I restarted the process to recover from a checkpoint.  At this 
point the process fails with the following error:

java.lang.IllegalStateException: Could not initialize keyed state backend.
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:204)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:653)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:640)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:246)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:666)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IndexOutOfBoundsException: Index: 28, Size: 0
at java.util.ArrayList.rangeCheck(ArrayList.java:653)
at java.util.ArrayList.get(ArrayList.java:429)
at 
com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:231)
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateTableForKeyGroup(HeapKeyedStateBackend.java:370)
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:340)
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:243)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:788)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:284)
... 6 more


This looks to me like Flink has serialized out state incorrectly.

I was running Flink 1.2.1, I upgraded to Flink 1.3 after this happened so I 
could manually set the Kafka partition offset, I backed it up 5 days to replay 
all the data and now everything is working fine again.

However I’m more than a little worried.  Was there a serialization bug fixed in 
1.3 ?  I don’t believe there’s anything in my code that could be causing such 
an issue, but is there something in my jobs that could make something like this 
happen?  Is this a known bug?  The fact that it not only results in bad data in 
the query but appears to take down my disaster recovery plan makes me a bit 
nervous here.

Thanks for your time,
Phil



Re: Window + Reduce produces more than 1 output per window

2017-06-19 Thread FRANCISCO BORJA ROBLES MARTIN

Hello Piotrek!

Thanks for answering! Yes, I have already changed the 
"TimeCharacteristic" to "ProcessingTime". I need it for the 
".setWriteTimestampToKafka(true)" option, as I use the timestamp in the 
Kafka consumer that reads this app's output. I have also changed the 
code a bit to use KeyedStreams and be able to use parallelism in the 
window/reduce functions.


About the problem: yesterday I noticed that it was growing as I 
did more submits; it was producing 3x the outputs (with small differences in 
each input, as you can see in my first message), whereas before it was only 
doing 2x. Finally I stopped the cluster (stop-cluster.sh) and started it 
again (start-cluster.sh), and the problem was solved. I have been trying 
to reproduce it by submitting the app several times, but I haven't 
managed to today. If it happens again, I will try to reproduce it with 
as small a code sample as possible, to try to find where the bug 
could be (it seems to be something that goes wrong when submitting several 
times).


Kind regards!
Fran.


On 2017-06-19 14:43, Piotr Nowojski wrote:

Hi,

It is difficult for me to respond fully to your question. First of all
it would be really useful if you could strip down your example to a
minimal version that shows a problem. Unfortunately I was unable to
reproduce your issue. I was getting only one output line per window
(as expected). Could you try printing the output to the console (or use
some different data sink) instead of writing it back to Kafka, in case
there is a problem there? Also please try removing parts of the
code bit by bit, so that you may be able to find what’s causing the
problem.

As a side note, I have a couple of concerns with your
timestamp/watermark/window definitions. First you set the time
characteristic to EventTime:


env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);


But I don’t see where you are actually setting the
timestamp/watermarks. Didn’t you want to use
“.assignTimestampsAndWatermarks(…)” on your input DataStream
based on its content? Nevertheless, later you specify the window by
ProcessingTime:

  
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(2)));


This defines the windows independently of the content of those events.
Maybe switching to proper EventTime will solve your problem?

Thanks, Piotrek



Re: Window + Reduce produces more than 1 output per window

2017-06-19 Thread Piotr Nowojski
Hi,

It is difficult for me to respond fully to your question. First of all it would 
be really useful if you could strip down your example to a minimal version that 
shows a problem. Unfortunately I was unable to reproduce your issue. I was 
getting only one output line per window (as expected). Could you try printing the 
output to the console (or use some different data sink) instead of writing it 
back to Kafka, in case there is a problem there? Also please try removing parts 
of the code bit by bit, so that you may be able to find what’s causing the 
problem.

As a side note, I have a couple of concerns with your 
timestamp/watermark/window definitions. First you set the time 
characteristic to EventTime:

> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

But I don’t see where you are actually setting the timestamp/watermarks. Didn’t 
you want to use “.assignTimestampsAndWatermarks(…)” on your input DataStream 
based on its content? Nevertheless, later you specify the window by ProcessingTime:

>   .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(2)));

This defines the windows independently of the content of those events. Maybe 
switching to proper EventTime will solve your problem?

Thanks, Piotrek
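
A minimal sketch of that event-time setup, assuming the stream elements are Tuple2<String, Long> with the creation timestamp in field f1 (the element type and the 1-second out-of-orderness bound are assumptions to adapt), reusing the line_Num_U stream from the code below:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

    //Assign event-time timestamps (and watermarks) from the elements themselves
    DataStream<Tuple2<String, Long>> line_Num_U_TS = line_Num_U
        .assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.seconds(1)) {
                @Override
                public long extractTimestamp(Tuple2<String, Long> element) {
                    return element.f1; //creation timestamp written by the producer
                }
            });

    //Tumbling event-time window every 2 seconds
    AllWindowedStream<Tuple2<String, Long>, TimeWindow> windowedLine_Num_U = line_Num_U_TS
        .windowAll(TumblingEventTimeWindows.of(Time.seconds(2)));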

> On Jun 18, 2017, at 6:12 PM, FRANCISCO BORJA ROBLES MARTIN 
>  wrote:
> 
> Hello everybody! First of all, thanks for reading :D
> 
> I am currently working on my bachelor's final project which is a comparison 
> between Spark Streaming and Flink. Now let's focus on the problem:
> 
> - THE PROBLEM: the problem is that my program is writing to Kafka more than 
> once every window (it is creating 2-3 or more lines per window, whereas it is 
> supposed to create 1 line per window, since the reduce function leaves only 
> one element). I have the same code written in Spark and it works perfectly. I 
> have been trying to find info about this issue and I haven't found anything 
> :(. I have also tried changing the parallelism of some functions and a few 
> other things, and nothing worked; I cannot figure out where the 
> problem could be.
> 
> - MY CLUSTER: I am using Flink 1.2.0 and OpenJDK 8. I have 3 computers: 1 
> JobManager, 2 TaskManagers (4 cores, 2GB RAM, 4 task slots each TaskManager).
> 
> - INPUT DATA: lines produced by one Java producer to a Kafka topic with 24 
> partitions; each line has two elements: an incremental value and a creation timestamp:
> 1 1497790546981
> 2 1497790546982
> 3 1497790546983
> 4 1497790546984
> ..
> 
> - MY JAVA APPLICATION:
> + It reads from a Kafka topic with 24 partitions (Kafka is on the same 
> machine as the JobManager).
> + The filter functions are useless together with the union, as I use them just 
> for checking their latency.
> + Basically, it adds a "1" to each line, then there is a tumbling window every 
> 2 seconds, and the reduce function sums all these 1's and all the timestamps. 
> The summed timestamp is later divided in the map function by the sum of 
> 1's, which gives me the average, and finally in the last map function it adds 
> a timestamp of the current moment to each reduced line, plus the difference 
> between this timestamp and the average timestamp.
> + This line is written to Kafka (to a 2 partitions' topic).
> 
> # - CODE - 
> 
>//FLINK CONFIGURATION
>final StreamExecutionEnvironment env = StreamExecutionEnvironment
>.getExecutionEnvironment();
> 
>env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>//env.setParallelism(2);
> 
>//KAFKA CONSUMER CONFIGURATION
>Properties properties = new Properties();
>properties.setProperty("bootstrap.servers", "192.168.0.155:9092");
>FlinkKafkaConsumer010 myConsumer = new 
> FlinkKafkaConsumer010<>(args[0], new SimpleStringSchema(), properties);
> 
> 
>//KAFKA PRODUCER
>Properties producerConfig = new Properties();
>producerConfig.setProperty("bootstrap.servers", "192.168.0.155:9092");
>producerConfig.setProperty("acks", "0");
>producerConfig.setProperty("linger.ms", "0");
> 
> 
>//MAIN PROGRAM
>//Read from Kafka
>DataStream line = env.addSource(myConsumer);
> 
>//Add 1 to each line
>DataStream> line_Num = line.map(new NumberAdder());
> 
>//Filted Odd numbers
>DataStream> line_Num_Odd = line_Num.filter(new 
> FilterOdd());
> 
>//Filter Even numbers
>DataStream> line_Num_Even = line_Num.filter(new 
> FilterEven());
> 
>//Join Even and Odd
>DataStream> line_Num_U = 
> line_Num_Odd.union(line_Num_Even);
> 
>//Tumbling windows every 2 seconds
>AllWindowedStream, TimeWindow> windowedLine_Num_U 
> = line_Num_U
>.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(2)));
> 
>//Reduce to one line with the sum
>

Re: How choose between YARN/Mesos/StandAlone Flink

2017-06-19 Thread AndreaKinn
Ok, I understand that standalone mode would be sufficient, but for my thesis I
would like to set up a well-performing, ready-to-use infrastructure. My
workload is not heavy, about 35 million messages a day (35 GB), but it
should be easily expandable and able to run for many days... because of this I
would like to set up Flink on top of a cluster manager.




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-choose-between-YARN-Mesos-StandAlone-Flink-tp13793p13829.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: FlinkCEP 1.3 scala, cannot apply select function

2017-06-19 Thread Sonex
Thank you for your quick response. That worked and compiled, but another error
came up. At runtime it gives the following error:

java.lang.ClassCastException: MyEventType cannot be cast to
scala.collection.IterableLike

The error is at line

val startEvent = pattern.get("first").get.head

of myFunction.



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/FlinkCEP-1-3-scala-cannot-apply-select-function-tp13824p13828.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: How to sessionize stream with Apache Flink?

2017-06-19 Thread Fabian Hueske
An alternative would be to use a FlatMapFunction with a ListState instead
of a window with custom trigger.

When a new element arrives (i.e., the flatMap() method is called), you
check if the value changed.
If the value did not change, you append the element to the state.
If the value changed, you emit the current list state as a session, clear
the list, and insert the new element as the first element of the list state.

However, you should keep in mind that this assumes that the order of
elements is preserved.
Flink ensures this within a partition, i.e., as long as elements are not shuffled
and all operators run with the same parallelism.

Best, Fabian
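
A minimal sketch of that flatMap approach, assuming a stream of Tuple2<String, Long> keyed by f0, where a change of f1 closes the current session (the types are placeholders to adapt to your events):

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Emits the buffered values as one "session" whenever the observed value changes.
// Apply it on a keyed stream, e.g.: stream.keyBy(0).flatMap(new SessionizeFlatMap());
public class SessionizeFlatMap extends RichFlatMapFunction<Tuple2<String, Long>, List<Long>> {

    private transient ListState<Long> buffer;      // values of the current session
    private transient ValueState<Long> lastValue;  // last value seen for this key

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffer", Long.class));
        lastValue = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lastValue", Long.class));
    }

    @Override
    public void flatMap(Tuple2<String, Long> element, Collector<List<Long>> out) throws Exception {
        Long last = lastValue.value();
        if (last != null && !last.equals(element.f1)) {
            // Value changed: emit the buffered values as a session and start a new one.
            List<Long> session = new ArrayList<>();
            for (Long v : buffer.get()) {
                session.add(v);
            }
            out.collect(session);
            buffer.clear();
        }
        buffer.add(element.f1);
        lastValue.update(element.f1);
    }
}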

2017-06-18 15:10 GMT+02:00 Jonas :

> Hey Milad,
>
> since you cannot look into the future to see which element comes next, you have to
> "lag" one element behind. This requires building an operator that creates 2-tuples
> from incoming elements containing (current-1, current), so basically a
> single value state that emits the last and the current element in a tuple.
>
> In a trigger, the element is then of the 2-tuple type and you can see
> changes "beforehand". The last element of 1's is then (1, 2).
>
> Hope this helps.
>
>
>
> --
> View this message in context: http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/How-to-
> sessionize-stream-with-Apache-Flink-tp13817p13818.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Re: DataSet: combineGroup/reduceGroup with large number of groups

2017-06-19 Thread Fabian Hueske
Hi Urs,

ad 1) Yes, my motivation for the bound was to prevent OOMEs. If you have
enough memory to hold the AggregateT for each key in memory, you should be
fine without a bound. If the size of AggregateT depends on the number of
aggregated elements, you might run into skew issues though.
ad 2) AFAIK, all sources emit a Long.MAX_VALUE watermark once they have
completely emitted their data (this only happens for bounded data, of
course). You do not need to generate any other watermarks or timestamps
because you want the results only after all data has been processed.

Best, Fabian
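
Putting ad 2) together with the ProcessFunction idea from the earlier mail, a minimal sketch; RecordT, AggregateT and the fold step are the placeholders used in this thread, and the function must run on a keyed stream with event time enabled:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Folds each key's records into an AggregateT held in keyed state and emits the final
// aggregates once the end-of-input Long.MAX_VALUE watermark fires the registered timer.
// Usage: input.keyBy(...).process(new MyFoldPerKey(...))
public abstract class FoldPerKey<RecordT, AggregateT> extends ProcessFunction<RecordT, AggregateT> {

    private final TypeInformation<AggregateT> aggType;
    private transient ValueState<AggregateT> aggState;

    protected FoldPerKey(TypeInformation<AggregateT> aggType) {
        this.aggType = aggType;
    }

    // The per-record fold step, i.e. the addToAggrate(agg, record) from this thread;
    // must create a new aggregate when agg is null.
    protected abstract AggregateT addToAggregate(AggregateT agg, RecordT record);

    @Override
    public void open(Configuration parameters) {
        aggState = getRuntimeContext().getState(new ValueStateDescriptor<>("agg", aggType));
    }

    @Override
    public void processElement(RecordT record, Context ctx, Collector<AggregateT> out)
            throws Exception {
        aggState.update(addToAggregate(aggState.value(), record));
        // Fires only once the source has emitted its final Long.MAX_VALUE watermark,
        // i.e. after all (bounded) input has been read.
        ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE - 1);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<AggregateT> out)
            throws Exception {
        AggregateT agg = aggState.value();
        if (agg != null) {
            out.collect(agg);
        }
    }
}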

2017-06-17 0:19 GMT+02:00 Urs Schoenenberger :

> Hi Fabian,
>
> thanks, that is very helpful indeed - I now understand why the DataSet
> drivers insist on sorting the buffers and then processing instead of
> keeping state.
>
> In our case, the state should easily fit into the heap of the cluster,
> though. In a quick example I tried just now, the MapPartition
> solution outperforms GroupReduce/Combine by a factor of 3, looking
> forward to testing this on our real data set soon.
>
> Two things I'd like to clarify:
>
> - Your suggestion of limiting the size of the HashMap in the
> MapPartitionFunction is meant to reduce the risks of OOMEs, right? If
> I'm confident my state fits into heap, there's no reason to do this?
>
> - With your DataStream suggestion, I can't tell when to schedule a
> processing time timer. I would therefore need to use an event time timer
> (at Long.MAX_VALUE-1, say), and modify my source to emit a watermark
> with Long.MAX_VALUE after it reaches the end of input, correct?
>
> Thanks,
> Urs
>
> On 16.06.2017 17:58, Fabian Hueske wrote:
> > Hi Urs,
> >
> > on the DataSet API, the only memory-safe way to do it is
> > GroupReduceFunction.
> > As you observed this requires a full sort of the dataset which can be
> quite
> > expensive but after the sort the computation is streamed.
> > You could also try to manually implement a hash-based combiner using a
> > MapPartitionFunction. The function would keep a HashMap keyed on the grouping
> > key, with a fixed size that needs to be manually tuned.
> > When you have to insert a new record into the HashMap but it has reached the
> > max size, you have to evict a record first. Since all of this happens on
> > the heap, it won't be memory-safe and might fail with an OOME.
> >
> > On the DataStream API you can use a ProcessFunction with keyed ValueState
> > for the current AggregateT of each key. For each record you fetch the
> > Aggregate from the state and update it.
> > To emit the results at the end, you'll need to register a timer to emit
> the
> > results at the end because the final aggregates are stored in the local
> > state but never emitted.
> > Another thing to consider is the state backend. You'll probably have to
> use
> > the RocksDBStateBackend to be able to spill state to disk.
> >
> > Hope this helps,
> > Fabian
> >
> >
> > 2017-06-16 17:00 GMT+02:00 Urs Schoenenberger <
> > urs.schoenenber...@tngtech.com>:
> >
> >> Hi,
> >>
> >> I'm working on a batch job (roughly 10 billion records of input, 10
> >> million groups) that is essentially a 'fold' over each group, that is, I
> >> have a function
> >>
> >> AggregateT addToAggrate(AggregateT agg, RecordT record) {...}
> >>
> >> and want to fold this over each group in my DataSet.
> >>
> >> My understanding is that I cannot use .groupBy(0).reduce(...) since the
> >> ReduceFunction only supports the case where AggregateT is the same as
> >> RecordT.
> >>
> >> A simple solution using .reduceGroup(...) works, but spills all input
> >> data in the reduce step, which produces a lot of slow & expensive Disk
> IO.
> >>
> >> Therefore, we tried using .combineGroup(...).reduceGroup(...), but
> >> experienced a similar amount of spilling. Checking the source of the
> >> *Combine drivers, it seems that they accumulate events in a buffer, sort
> >> the buffer by key, and combine adjacent records in the same group. This
> >> does not work in my case due to the large number of groups - the records
> >> in the buffer are most likely to all belong to different groups. The
> >> "combine" phase therefore becomes a noop turning a single RecordT into
> >> an AggregateT, and the reduce phase has 10 billion AggregateTs to
> combine.
> >>
> >> Is there a way of modelling this computation efficiently with the
> >> DataSet API? Alternatively, can I turn this into a DataStream job? (The
> >> implementation there would simply be a MapFunction on a KeyedStream with
> >> the AggregateT residing in keyed state, although I don't know how I
> >> would emit this state at the end of the data stream only.)
> >>
> >> Thanks,
> >> Urs
> >>
> >> --
> >> Urs Schönenberger
> >> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> >> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> >> Sitz: Unterföhring * Amtsgericht München * HRB 135082
> >>
> >>
> >>
> >> Hi Urs,
> >>
> >> on the DataSet API, the only 
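
For reference, a minimal sketch of the hash-based MapPartition combiner described in the quoted mail above; the key extraction, RecordT/AggregateT and the fold step are placeholders from this thread, and as noted the map size must be tuned manually and the approach is not memory-safe:

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.util.Collector;

// Keeps at most maxSize partial aggregates in a HashMap and evicts one (emitting it
// downstream) whenever the map is full. The emitted partial aggregates still need a
// final per-key reduce afterwards.
public abstract class HashCombiner<K, RecordT, AggregateT>
        implements MapPartitionFunction<RecordT, AggregateT> {

    private final int maxSize;

    protected HashCombiner(int maxSize) {
        this.maxSize = maxSize;
    }

    protected abstract K keyOf(RecordT record);

    // Must create a new aggregate when agg is null.
    protected abstract AggregateT addToAggregate(AggregateT agg, RecordT record);

    @Override
    public void mapPartition(Iterable<RecordT> records, Collector<AggregateT> out) {
        Map<K, AggregateT> partials = new HashMap<>();
        for (RecordT record : records) {
            K key = keyOf(record);
            AggregateT agg = partials.get(key);
            if (agg == null && partials.size() >= maxSize) {
                // Map is full: evict an arbitrary partial aggregate to bound heap usage.
                Iterator<Map.Entry<K, AggregateT>> it = partials.entrySet().iterator();
                out.collect(it.next().getValue());
                it.remove();
            }
            partials.put(key, addToAggregate(agg, record));
        }
        // Flush the remaining partial aggregates at the end of the partition.
        for (AggregateT agg : partials.values()) {
            out.collect(agg);
        }
    }
}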

Re: FlinkCEP 1.3 scala, cannot apply select function

2017-06-19 Thread Dawid Wysakowicz
Hi,

Because of some optimizations around Java <-> Scala collection
conversions, the Map type used for the select method is scala.collection.Map
instead of the Predef.Map that is imported by default.

Try importing:

import scala.collection.Map


or use fully qualified name in function definition:

def myFunction(pattern: scala.collection.Map[String, Iterable[MyEventType]]): MyEventType = {
  val startEvent = pattern.get("first").get.head
  val endEvent = pattern.get("second").get.head
  // dummy functionality for illustrating purposes
  endEvent
}


With regards! / Cheers!

Dawid Wysakowicz

*Data/Software Engineer*

Skype: dawid_wys | Twitter: @OneMoreCoder



2017-06-19 9:35 GMT+02:00 Sonex :

> Hello I have created a simple pattern with FlinkCEP 1.3 as well as a simple
> pattern select function. My simple function is as follows:
>
> def myFunction(pattern: Map[String,Iterable[MyEventType]]): MyEventType =
> {
> val startEvent = pattern.get("first").get.head
> val endEvent = pattern.get("second").get.head
> // dummy functionality for illustrating purposes
> endEvent
> }
>
> When I apply the function above to a pattern in the following way:
>
> CEP.pattern(myKeyedStream,myDummyPattern).select(myFunction(_)) it gives
> the
> following error:
>
> Cannot resolve reference myFunction with such signature.
>
> Type mismatch, expected:
> scala.Predef.Map[scala.Predef.String,scala.Iterable[MyEventType]], actual:
> scala.collection.Map[scala.Predef.String,scala.Iterable[MyEventType]]
>
> What is the reason for this behavior?
>
>
>
>
>
> --
> View this message in context: http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/FlinkCEP-1-3-
> scala-cannot-apply-select-function-tp13824.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


FlinkCEP 1.3 scala, cannot apply select function

2017-06-19 Thread Sonex
Hello I have created a simple pattern with FlinkCEP 1.3 as well as a simple
pattern select function. My simple function is as follows:

def myFunction(pattern: Map[String,Iterable[MyEventType]]): MyEventType = {
val startEvent = pattern.get("first").get.head
val endEvent = pattern.get("second").get.head
// dummy functionality for illustrating purposes
endEvent
}

When I apply the function above to a pattern in the following way:

CEP.pattern(myKeyedStream,myDummyPattern).select(myFunction(_)) it gives the
following error:

Cannot resolve reference myFunction with such signature.

Type mismatch, expected:
scala.Predef.Map[scala.Predef.String,scala.Iterable[MyEventType]], actual:
scala.collection.Map[scala.Predef.String,scala.Iterable[MyEventType]]

What is the reason for this behavior?





--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/FlinkCEP-1-3-scala-cannot-apply-select-function-tp13824.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.