Re: 1.1.4 IntelliJ Problem

2017-01-04 Thread Stephan Epping
Hey Yuri,

thanks a lot. It was flink-spector that was requiring flink-test-utils 1.1.0.

best,
Stephan
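
For anyone hitting the same symptom: a quick way to confirm which jar a conflicting class is actually loaded from is to print its code source. A rough sketch in plain Java (the class name is taken from the stack trace below; getCodeSource() can return null for JDK bootstrap classes):

// Print the jar that provides the mini cluster class, to spot mixed Flink versions.
Class<?> clazz = org.apache.flink.test.util.ForkableFlinkMiniCluster.class;
System.out.println(clazz.getProtectionDomain().getCodeSource().getLocation());

If the printed path points at an older flink-test-utils jar while flink-runtime resolves to 1.1.4, that is exactly the mismatch described below.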

> On 04 Jan 2017, at 13:17, Yury Ruchin <yuri.ruc...@gmail.com> wrote:
> 
> Hi Stephan,  
> 
> It looks like you have libraries from different versions of Flink 
> distribution on the same classpath.
> 
> ForkableFlinkMiniCluster resides in flink-test-utils. In distribution 
> version 1.1.3 it invokes JobManager.startJobManagerActors() with 6 arguments. 
> The signature changed in 1.1.4, and ForkableFlinkMiniCluster now invokes the 
> method with 8 arguments of different types. This might mean that the 
> flink-test-utils library on your classpath has version 1.1.3 whereas 
> flink-runtime has 1.1.4.
> 
> You might want to thoroughly inspect your classpath to ensure that every 
> Flink-related dependency has version 1.1.4.
> 
> Regards,
> Yury
> 
> 2017-01-04 11:20 GMT+03:00 Stephan Epping <stephan.epp...@zweitag.de>:
> I also changed the scala version of the packages/artifacts to 2.11, with no 
> success.
> Further, I am not deeply familiar with maven or java dependency management at 
> all.
> 
> best Stephan
> 
>> On 03 Jan 2017, at 22:06, Stephan Ewen <se...@apache.org> wrote:
>> 
>> Hi!
>> 
>> It is probably some inconsistent configuration in the IDE.
>> 
>> It often helps to do "Maven->Reimport" or use "restart and clear caches".
>> 
>> 
>> On Tue, Jan 3, 2017 at 9:48 AM, Stephan Epping <stephan.epp...@zweitag.de> wrote:
>> Hey,
>> 
>> thanks for the reply. I didn’t change the Scala version, as it worked 
>> before. I just changed the Flink version in my pom.xml; that's it, a one-line 
>> change.
>> Maybe you could elaborate a bit more on what I can do to change the Scala 
>> version?
>> 
>> best Stephan
>> 
>> 
>>> On 03 Jan 2017, at 03:11, Kurt Young <ykt...@gmail.com> wrote:
>>> 
>>> Seems like you didn't set up the correct Scala SDK
>>> 
>>> best,
>>> Kurt
>>> 
>>> On Mon, Jan 2, 2017 at 10:41 PM, Stephan Epping <stephan.epp...@zweitag.de> wrote:
>>> Hi,
>>> 
>>> I am getting this error running my tests with 1.1.4 inside the IntelliJ IDE.
>>> 
>>> java.lang.NoSuchMethodError: 
>>> org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(Lorg/apache/flink/configuration/Configuration;Lakka/actor/ActorSystem;Lscala/Option;Lscala/Option;Ljava/lang/Class;Ljava/lang/Class;)Lscala/Tuple2;
>>> 
>>> at 
>>> org.apache.flink.test.util.ForkableFlinkMiniCluster.startJobManager(ForkableFlinkMiniCluster.scala:103)
>>> at 
>>> org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$1.apply(FlinkMiniCluster.scala:292)
>>> at 
>>> org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$1.apply(FlinkMiniCluster.scala:286)
>>> at 
>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>> at 
>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>> at scala.collection.immutable.Range.foreach(Range.scala:141)
>>> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>>> at 
>>> org.apache.flink.runtime.minicluster.FlinkMiniCluster.start(FlinkMiniCluster.scala:286)
>>> at 
>>> org.apache.flink.runtime.minicluster.FlinkMiniCluster.start(FlinkMiniCluster.scala:277)
>>> at 
>>> org.apache.flink.test.util.ForkableFlinkMiniCluster.start(ForkableFlinkMiniCluster.scala:255)
>>> at 
>>> org.apache.flink.test.util.TestBaseUtils.startCluster(TestBaseUtils.java:152)
>>> at 
>>> org.apache.flink.test.util.TestBaseUtils.startCluster(TestBaseUtils.java:126)
>>> at 
>>> org.flinkspector.datastream.DataStreamTestEnvironment.createTestEnvironment(DataStreamTestEnvironment.java:72)
>>> 
>>> Any ideas?
>>> 
>>> best,
>>> Stephan
>>> 
>>> 
>> 
>> 
> 
> 



Re: 1.1.4 IntelliJ Problem

2017-01-04 Thread Stephan Epping
I also changed the scala version of the packages/artifacts to 2.11, with no 
success.
Further, I am not deeply familiar with maven or java dependency management at 
all.

best Stephan

> On 03 Jan 2017, at 22:06, Stephan Ewen <se...@apache.org> wrote:
> 
> Hi!
> 
> It is probably some inconsistent configuration in the IDE.
> 
> It often helps to do "Maven->Reimport" or use "restart and clear caches".
> 
> 
> On Tue, Jan 3, 2017 at 9:48 AM, Stephan Epping <stephan.epp...@zweitag.de> wrote:
> Hey,
> 
> thanks for the reply. I didn’t change the Scala version, as it worked before. 
> I just changed the Flink version in my pom.xml; that's it, a one-line change.
> Maybe you could elaborate a bit more on what I can do to change the Scala 
> version?
> 
> best Stephan
> 
> 
>> On 03 Jan 2017, at 03:11, Kurt Young <ykt...@gmail.com> wrote:
>> 
>> Seems like you didn't set up the correct Scala SDK
>> 
>> best,
>> Kurt
>> 
>> On Mon, Jan 2, 2017 at 10:41 PM, Stephan Epping <stephan.epp...@zweitag.de> wrote:
>> Hi,
>> 
>> I am getting this error running my tests with 1.1.4 inside the IntelliJ IDE.
>> 
>> java.lang.NoSuchMethodError: 
>> org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(Lorg/apache/flink/configuration/Configuration;Lakka/actor/ActorSystem;Lscala/Option;Lscala/Option;Ljava/lang/Class;Ljava/lang/Class;)Lscala/Tuple2;
>> 
>>  at 
>> org.apache.flink.test.util.ForkableFlinkMiniCluster.startJobManager(ForkableFlinkMiniCluster.scala:103)
>>  at 
>> org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$1.apply(FlinkMiniCluster.scala:292)
>>  at 
>> org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$1.apply(FlinkMiniCluster.scala:286)
>>  at 
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>  at 
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>  at scala.collection.immutable.Range.foreach(Range.scala:141)
>>  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>  at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>>  at 
>> org.apache.flink.runtime.minicluster.FlinkMiniCluster.start(FlinkMiniCluster.scala:286)
>>  at 
>> org.apache.flink.runtime.minicluster.FlinkMiniCluster.start(FlinkMiniCluster.scala:277)
>>  at 
>> org.apache.flink.test.util.ForkableFlinkMiniCluster.start(ForkableFlinkMiniCluster.scala:255)
>>  at 
>> org.apache.flink.test.util.TestBaseUtils.startCluster(TestBaseUtils.java:152)
>>  at 
>> org.apache.flink.test.util.TestBaseUtils.startCluster(TestBaseUtils.java:126)
>>  at 
>> org.flinkspector.datastream.DataStreamTestEnvironment.createTestEnvironment(DataStreamTestEnvironment.java:72)
>> 
>> Any ideas?
>> 
>> best,
>> Stephan
>> 
>> 
> 
> 



Re: 1.1.4 IntelliJ Problem

2017-01-04 Thread Stephan Epping
Thanks Stephan, 

but that didn’t help. The IDE is configured to use Default Scala Compiler and 
JDK 1.8.0_92.

best Stephan


> On 03 Jan 2017, at 22:06, Stephan Ewen <se...@apache.org> wrote:
> 
> Hi!
> 
> It is probably some inconsistent configuration in the IDE.
> 
> It often helps to do "Maven->Reimport" or use "restart and clear caches".
> 
> 
> On Tue, Jan 3, 2017 at 9:48 AM, Stephan Epping <stephan.epp...@zweitag.de> wrote:
> Hey,
> 
> thanks for the reply. I didn’t change the Scala version, as it worked before. 
> I just changed the Flink version in my pom.xml; that's it, a one-line change.
> Maybe you could elaborate a bit more on what I can do to change the Scala 
> version?
> 
> best Stephan
> 
> 
>> On 03 Jan 2017, at 03:11, Kurt Young <ykt...@gmail.com> wrote:
>> 
>> Seems like you didn't set up the correct Scala SDK
>> 
>> best,
>> Kurt
>> 
>> On Mon, Jan 2, 2017 at 10:41 PM, Stephan Epping <stephan.epp...@zweitag.de> wrote:
>> Hi,
>> 
>> I am getting this error running my tests with 1.1.4 inside the IntelliJ IDE.
>> 
>> java.lang.NoSuchMethodError: 
>> org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(Lorg/apache/flink/configuration/Configuration;Lakka/actor/ActorSystem;Lscala/Option;Lscala/Option;Ljava/lang/Class;Ljava/lang/Class;)Lscala/Tuple2;
>> 
>>  at 
>> org.apache.flink.test.util.ForkableFlinkMiniCluster.startJobManager(ForkableFlinkMiniCluster.scala:103)
>>  at 
>> org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$1.apply(FlinkMiniCluster.scala:292)
>>  at 
>> org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$1.apply(FlinkMiniCluster.scala:286)
>>  at 
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>  at 
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>  at scala.collection.immutable.Range.foreach(Range.scala:141)
>>  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>  at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>>  at 
>> org.apache.flink.runtime.minicluster.FlinkMiniCluster.start(FlinkMiniCluster.scala:286)
>>  at 
>> org.apache.flink.runtime.minicluster.FlinkMiniCluster.start(FlinkMiniCluster.scala:277)
>>  at 
>> org.apache.flink.test.util.ForkableFlinkMiniCluster.start(ForkableFlinkMiniCluster.scala:255)
>>  at 
>> org.apache.flink.test.util.TestBaseUtils.startCluster(TestBaseUtils.java:152)
>>  at 
>> org.apache.flink.test.util.TestBaseUtils.startCluster(TestBaseUtils.java:126)
>>  at 
>> org.flinkspector.datastream.DataStreamTestEnvironment.createTestEnvironment(DataStreamTestEnvironment.java:72)
>> 
>> Any ideas?
>> 
>> best,
>> Stephan
>> 
>> 
> 
> 



1.1.4 IntelliJ Problem

2017-01-02 Thread Stephan Epping
Hi,

I am getting this error running my tests with 1.1.4 inside the IntelliJ IDE.

java.lang.NoSuchMethodError: 
org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(Lorg/apache/flink/configuration/Configuration;Lakka/actor/ActorSystem;Lscala/Option;Lscala/Option;Ljava/lang/Class;Ljava/lang/Class;)Lscala/Tuple2;

at 
org.apache.flink.test.util.ForkableFlinkMiniCluster.startJobManager(ForkableFlinkMiniCluster.scala:103)
at 
org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$1.apply(FlinkMiniCluster.scala:292)
at 
org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$1.apply(FlinkMiniCluster.scala:286)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.Range.foreach(Range.scala:141)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at 
org.apache.flink.runtime.minicluster.FlinkMiniCluster.start(FlinkMiniCluster.scala:286)
at 
org.apache.flink.runtime.minicluster.FlinkMiniCluster.start(FlinkMiniCluster.scala:277)
at 
org.apache.flink.test.util.ForkableFlinkMiniCluster.start(ForkableFlinkMiniCluster.scala:255)
at 
org.apache.flink.test.util.TestBaseUtils.startCluster(TestBaseUtils.java:152)
at 
org.apache.flink.test.util.TestBaseUtils.startCluster(TestBaseUtils.java:126)
at 
org.flinkspector.datastream.DataStreamTestEnvironment.createTestEnvironment(DataStreamTestEnvironment.java:72)

Any ideas?

best,
Stephan



Re: Problem with JodaTime

2017-01-01 Thread Stephan Epping
Hi Robert,

thanks for the fast response. The following line fixed the problem (maybe a 
good topic for the docs), because I thought JodaTime is supported out of the 
box.
But I think that's only the case with the JacksonParser.

this.env.addDefaultKryoSerializer(DateTime.class, JodaDateTimeSerializer.class);
best,
Stephan
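
For context, the serializer class above typically comes from the de.javakaffee kryo-serializers artifact; a minimal sketch of the full registration, assuming that dependency is on the classpath (fully qualified names used to make the assumption explicit):

// Register a dedicated Kryo serializer for Joda DateTime so Flink's Kryo fallback
// does not serialize DateTime (and its cached time zone) field by field.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addDefaultKryoSerializer(org.joda.time.DateTime.class,
        de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer.class);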



> On 24 Dec 2016, at 09:40, Robert Metzger <rmetz...@apache.org> wrote:
> 
> Hi Stephan,
> 
> Can you post the list of fields in the POJO and the full exception (so that I 
> can see which serializer is being used).
> 
> In general, to fix such an issue, you have to implement a custom serializer 
> for the field that is causing the issues.
> 
> On Thu, Dec 22, 2016 at 3:44 PM, Stephan Epping <stephan.epp...@zweitag.de> wrote:
> Hi,
> 
> I noticed the following Problem with a POJO I use to encapsulate Values.
> 
> java.lang.NullPointerException
>   at org.joda.time.tz.CachedDateTimeZone.getInfo(CachedDateTimeZone.java:143)
>   at org.joda.time.tz.CachedDateTimeZone.getOffset(CachedDateTimeZone.java:103)
>   at 
> org.joda.time.format.DateTimeFormatter.printTo(DateTimeFormatter.java:722)
>   at 
> org.joda.time.format.DateTimeFormatter.printTo(DateTimeFormatter.java:535)
>   at 
> org.joda.time.format.DateTimeFormatter.print(DateTimeFormatter.java:671)
>   at org.joda.time.base.AbstractInstant.toString(AbstractInstant.java:424)
>   at 
> org.joda.time.base.AbstractDateTime.toString(AbstractDateTime.java:314)
>   at java.lang.String.valueOf(String.java:2994)
>   at java.lang.StringBuilder.append(StringBuilder.java:131)
> 
> and found this 
> 
> https://github.com/JodaOrg/joda-time/issues/307
> 
> Help is really appreciated.
> 
> best,
> Stephan
> 
> 
> 
> 



Problem with JodaTime

2016-12-22 Thread Stephan Epping
Hi,

I noticed the following Problem with a POJO I use to encapsulate Values.

java.lang.NullPointerException
at 
org.joda.time.tz.CachedDateTimeZone.getInfo(CachedDateTimeZone.java:143)
at 
org.joda.time.tz.CachedDateTimeZone.getOffset(CachedDateTimeZone.java:103)
at 
org.joda.time.format.DateTimeFormatter.printTo(DateTimeFormatter.java:722)
at 
org.joda.time.format.DateTimeFormatter.printTo(DateTimeFormatter.java:535)
at 
org.joda.time.format.DateTimeFormatter.print(DateTimeFormatter.java:671)
at org.joda.time.base.AbstractInstant.toString(AbstractInstant.java:424)
at 
org.joda.time.base.AbstractDateTime.toString(AbstractDateTime.java:314)
at java.lang.String.valueOf(String.java:2994)
at java.lang.StringBuilder.append(StringBuilder.java:131)

and found this 

https://github.com/JodaOrg/joda-time/issues/307 


Help is really appreciated.

best,
Stephan





Re: What is the best way to load/add patterns dynamically (at runtime) with Flink?

2016-11-24 Thread Stephan Epping
I also found this really interesting post

http://stackoverflow.com/questions/40018199/flink-and-dynamic-templates-recognition
 


Re: Cassandra Connector

2016-11-22 Thread Stephan Epping
Hey Chesnay,

that looks good. I'd like to use the same mechanism for all my sinks. Thus, this

> readings.addSink(new CassandraTupleSink(, );

will be my preferred way.

best, Stephan
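
For reference, a sketch of the builder-style entry point (method names follow the connector's documented examples as far as I recall; the keyspace, table, query and the element type of readings are made up for illustration):

// readings is assumed to be a DataStream<Tuple2<String, Double>> here.
CassandraSink.addSink(readings)
    .setQuery("INSERT INTO demo.readings (id, value) VALUES (?, ?);")
    .setClusterBuilder(new ClusterBuilder() {
        @Override
        public Cluster buildCluster(Cluster.Builder builder) {
            return builder.addContactPoint("127.0.0.1").build();
        }
    })
    .build();

The addSink(...) entry point is what lets the connector pick the tuple/POJO/case-class implementation internally, as Chesnay describes below.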


> On 22 Nov 2016, at 09:33, Chesnay Schepler <ches...@apache.org> wrote:
> 
> Actually this is a bit inaccurate. _Some_ implementations are not implemented 
> as a sink.
> 
> Also, you can in fact instantiate the sinks yourself as well, as in
> readings.addSink(new CassandraTupleSink(, );
> 
> On 22.11.2016 09:30, Chesnay Schepler wrote:
>> Hello,
>> 
>> the CassandraSink is not implemented as a sink but as a special operator, so 
>> you wouldn't be able to use the
>> addSink() method. (I can't remember the actual method being used.)
>> 
>> There are also several different implementations for various types (tuples, 
>> POJOs, Scala case classes) but we
>> did not want the user to be aware of it. This has the neat property that we 
>> can change the underlying classes
>> any way we want (like modifying the constructor) without breaking anything.
>> 
>> Regards,
>> Chesnay
>> 
>> On 22.11.2016 08:06, Stephan Epping wrote:
>>> Hello,
>>> 
>>> I wondered why the cassandra connector has such an unusual interface:
>>> CassandraSink csink = CassandraSink.addSink(readings)
>>> while all other sinks seem to look like
>>> 
>>> RMQSink sink = new RMQSink(cfg, "readings_persist_out", 
>>> new JSONReadingSchema());
>>> readings.addSink(sink);
>>> best,
>>> Stephan
>>> 
>>> 
>>> 
>> 
> 



Re: Maintaining watermarks per key, instead of per operator instance

2016-11-21 Thread Stephan Epping
Hey Aljoscha,

the first solution did not work out as expected. When late elements arrive, 
the first window is triggered again and emits a new (accumulated) event, which 
is then counted twice (once in the on-time accumulation and once in the late 
accumulation) in the second window. I could implement my own discarding strategy 
like in Apache Beam, but the output stream should contain the accumulated events 
that are stored in Cassandra. The second solution just gave a compiler error, so 
I think it is not possible right now.

best Stephan
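
As a side note, if discarding firings were acceptable, Flink's PurgingTrigger can approximate Beam's discarding mode: wrapping the event-time trigger purges the window state after every firing, so a late firing only carries the late elements instead of a second accumulated result that the downstream window would count twice. A sketch against the first (cascading) variant quoted below, with the generic types and the surrounding classes assumed from that code:

DataStream aggregatesPerMinute = values
    .keyBy("id")
    .timeWindow(Time.minutes(1))
    .allowedLateness(Time.minutes(2))
    .trigger(PurgingTrigger.of(EventTimeTrigger.create()))  // fire-and-purge on every firing
    .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());

The trade-off is exactly the one mentioned above: the downstream hourly window then sees deltas rather than replacement values, and the accumulated result for Cassandra would have to be rebuilt elsewhere.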



> On 21 Nov 2016, at 17:56, Aljoscha Krettek <aljos...@apache.org> wrote:
> 
> Hi,
> why did you settle for the last solution?
> 
> Cheers,
> Aljoscha
> 
> On Thu, 17 Nov 2016 at 15:57 kaelumania <stephan.epp...@zweitag.de> wrote:
> Hi Fabian,
> 
> your proposed solution for:
>  
> Multiple window aggregations
> You can construct a data flow of cascading window operators and fork off (to 
> emit or further processing) the result after each window.
> 
> Input -> window(15 secs) -> window(1 min) -> window(15 min) -> ...
>              \-> out_1        \-> out_2        \-> out_3
> does not work, am I missing something?
> 
> First I tried the following:
> 
> DataStream values = input.assignTimestampsAndWatermarks(new StrictWatermarkAssigner()); // force lateness
> 
> DataStream aggregatesPerMinute = values
>     .keyBy("id")
>     .timeWindow(Time.minutes(1))
>     .allowedLateness(Time.minutes(2))
>     .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
> 
> DataStream aggregatesPerHour = aggregatesPerMinute
>     .keyBy("id")
>     .timeWindow(Time.hours(1))
>     .allowedLateness(Time.hours(2))
>     .apply(new AggregateReadingAggregates(), new AggregateReadingAggregates());
> but due to late data the first fold function would emit 2 rolling aggregates 
> (one with and one without the late element), which results in being counted 
> twice within the second reducer. Therefore I tried:
> 
> WindowedStream<Reading, Tuple, TimeWindow> readingsPerMinute = input
>     .assignTimestampsAndWatermarks(new StrictWatermarkAssigner()) // force lateness
>     .keyBy("id")
>     .timeWindow(Time.minutes(1))
>     .allowedLateness(Time.hours(2));
> 
> WindowedStream<Reading, Tuple, TimeWindow> readingsPerHours = readingsPerMinute
>     .timeWindow(Time.hours(1))
>     .allowedLateness(Time.hours(2));
> 
> DataStream aggregatesPerMinute = readingsPerMinute.apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
> DataStream aggregatesPerHour = readingsPerHours.apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
> 
> which gives me a compiler error, as WindowedStream does not provide a 
> timeWindow method.
> 
> Finally I settled on this:
> 
> KeyedStream<Reading, Tuple> readings = input
>     .assignTimestampsAndWatermarks(new StrictWatermarkAssigner()) // force lateness
>     .keyBy("id");
> 
> DataStream aggregatesPerMinute = readings
>     .timeWindow(Time.minutes(1))
>     .allowedLateness(Time.hours(2))
>     .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
> 
> DataStream aggregatesPerHour = readings
>     .timeWindow(Time.hours(1))
>     .allowedLateness(Time.hours(2))
>     .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
> 
> 
> Feedback is very welcome.
> 
> best, Stephan
> 
> 
> 
> 
>> On 11 Nov 2016, at 00:29, Fabian Hueske-2 [via Apache Flink User Mailing 
>> List archive.] <[hidden email]> wrote:
>> 
> 
>> Hi Stephan,
>> 
>> I just wrote an answer to your SO question. 
>> 
>> Best, Fabian
> 
>> 
>> 2016-11-10 11:01 GMT+01:00 Stephan Epping <[hidden email]>:
> 
>> 
>> Hello,
>> 
>> I found this question in the Nabble archive 
>> (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288.html)
>> but was unable/don't know how to reply.

Cassandra Connector

2016-11-21 Thread Stephan Epping
Hello,

I wondered why the Cassandra connector has such an unusual interface:

CassandraSink csink = CassandraSink.addSink(readings)

while all other sinks seem to look like

RMQSink sink = new RMQSink(cfg, "readings_persist_out", new JSONReadingSchema());
readings.addSink(sink);

best,
Stephan





Docker

2016-11-17 Thread Stephan Epping
Hi,

It would be really nice to have an official Docker image (e.g. 
https://github.com/apache/flink/tree/master/flink-contrib/docker-flink), 
or at least to have a regular image/build on Docker Hub.

best,
Stephan





Re: Maintaining watermarks per key, instead of per operator instance

2016-11-15 Thread Stephan Epping
Hey Aljoscha,

that sounds very promising, awesome! Though, I would still need to implement my 
own window management logic (window assignment and window state purging), 
right? I was thinking about reusing some of the existing components 
(TimeWindow and WindowAssigner), but running my own WindowOperator (as a 
ProcessFunction). But I am not sure whether that is easily done. I would love to 
hear your opinion on that, and on what the tricky parts would be, for example 
common mistakes you encountered while developing the windowing mechanism.

best Stephan
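
Until ProcessFunction/TimelyFlatMapFunction is available, a rough sketch of the per-key tracking Aljoscha mentions, using a stateful flatMap (the Reading type, its getTimestamp() accessor and the handling of late elements are assumptions for illustration):

public class PerKeyWatermarkTracker extends RichFlatMapFunction<Reading, Reading> {

    private transient ValueState<Long> maxTimestamp;

    @Override
    public void open(Configuration parameters) {
        // Keyed state: the highest event timestamp seen so far for the current key.
        maxTimestamp = getRuntimeContext().getState(
                new ValueStateDescriptor<>("maxTimestamp", Long.class, Long.MIN_VALUE));
    }

    @Override
    public void flatMap(Reading reading, Collector<Reading> out) throws Exception {
        if (reading.getTimestamp() >= maxTimestamp.value()) {
            maxTimestamp.update(reading.getTimestamp());
            out.collect(reading);                  // on time for this key
        } else {
            // late for this key: update the stored aggregate, emit a correction, drop, ...
        }
    }
}

// usage: input.keyBy("id").flatMap(new PerKeyWatermarkTracker())

What this cannot do on its own is fire when a sensor goes silent; that is the part the timers in ProcessFunction (or a custom trigger) are needed for.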


> On 14 Nov 2016, at 19:05, Aljoscha Krettek <aljos...@apache.org> wrote:
> 
> Hi Stephan,
> I was going to suggest that using a flatMap and tracking the timestamp of 
> each key yourself is a bit like having a per-key watermark. I wanted to wait 
> a bit before answering because I'm currently working on a new type of 
> Function that will be released with Flink 1.2: ProcessFunction. This is 
> somewhat like a FlatMap but also allows to access the element timestamp, 
> query current processing time/event time and set (per key) timers for 
> processing time and event time. With this you should be able to easily 
> implement your per-key tracking, I hope.
> 
> Cheers,
> Aljoscha
> 
> P.S. ProcessFunction is already in the Flink repository but it's called 
> TimelyFlatMapFunction right now, because I was working on it under that 
> working title.
> 
> On Mon, 14 Nov 2016 at 15:47 kaelumania <stephan.epp...@zweitag.de> wrote:
> Hey Fabian,
> 
> thank you very much. 
> 
> - yes, I would window by event time and fire/purge by processing time
> - "cheaper in the end" meant that having too much state in the Flink cluster 
> would be more expensive, as we store all data in Cassandra too. I think the 
> fault tolerance would be okay, as we would do a compare-and-set with 
> Cassandra. 
> 
> With the flatMap operator, wouldn’t it be like running my own windowing 
> mechanism? I need to keep the aggregate window per sensor open (with 
> checkpointing and state management) until I receive an element for a sensor 
> that is later in time than the window's end, and then purge the state and emit 
> a new event (which is like having a watermark per sensor). Further, I need a 
> timer that fires after, say, 24 hours, in case a sensor dies and doesn’t send 
> more data, which might be possible with a window assigner/trigger, right? But 
> not inside normal functions, e.g. flatMap? We can guarantee that all sensor 
> data per sensor comes almost in order (might be out of order within a few 
> seconds), but there might be gaps of several hours after network partitions.
> 
> Is there no way to define/redefine the watermark per keyed stream? Or to adjust 
> the window assigner + trigger to achieve the desired behaviour? I am a bit 
> hesitant to implement the whole state management myself. Do you plan to support 
> such use cases on keyed streams? Maybe the WatermarkAssigner could also 
> receive information about the key for which the watermark should be calculated, 
> etc.
> 
> best, Stephan
> 
> 
> 
>> On 14 Nov 2016, at 15:17, Fabian Hueske-2 [via Apache Flink User Mailing 
>> List archive.] <[hidden email]> wrote:
>> 
> 
>> Hi Stephan,
>> 
>> I'm skeptical about two things: 
>> - using processing time will result in inaccurately bounded aggregates (or 
>> do you want to group by event time in a processing time window?)
>> - writing to and reading from Cassandra might be expensive (not sure what 
>> you mean by cheaper in the end) and it is not integrated with Flink's 
>> checkpointing mechanism for fault-tolerance.
>> 
>> To me, the stateful FlatMapOperator looks like the best approach. There is 
>> an upcoming feature for registering timers in user-functions, i.e., a 
>> function is called when the timer fires. This could be helpful to 
>> overcome the problem of closing the window without new data.
>> 
>> Best, 
>> Fabian
> 
>> 
>> 2016-11-14 8:39 GMT+01:00 Stephan Epping <[hidden email]>:
> 
>> Hello Fabian,
>> 
>> Thank you very much. What is your opinion on the following solution:
>> 
>> - Window data per time window, e.g. 15 minutes
>> - using processing time as trigger, e.g. 15 minutes
>> - which results in an aggregate over sensor values
>> - then use cassandra to select the previous aggregate (as there can be 
>> multiple for the time window due to processing time)

apply with fold- and window function

2016-11-14 Thread Stephan Epping
Hello,

I wondered if there is a particular reason for the window function to have 
explicitly the same input/output type?

public <R> SingleOutputStreamOperator<R> apply(R initialValue, FoldFunction<T, R> foldFunction, WindowFunction<R, R, K, W> function)
for example (the following does not work):

DataStream aggregates = values
    .assignTimestampsAndWatermarks(new SensorTimestampsAndWatermarks())
    .keyBy("id")
    .timeWindow(Time.minutes(1))
    .apply(new SensorValueAccumulator(), new AccumulateSensorValues(), new AggregateSensorValues());

because in this case my accumulator object does not have any id or timestamp 
information, just count, sum, min, max etc. Only later, in the window function, do 
I receive the key (sensorId) and the time window (start/end) and can build an 
aggregated value with all the information needed. But currently the apply function 
forces me to use one cluttered class with id, count, sum, …, where the 
id, start and end time are invalid during the fold function.

kind regards,
Stephan
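
One workaround on the 1.1 API, at the cost of buffering the window's elements instead of folding them incrementally, is to compute the aggregate inside the WindowFunction itself, where both the key and the window bounds are available (later Flink versions relax the fold signature; SensorValue, SensorAggregate and their members are assumptions for illustration):

DataStream<SensorAggregate> aggregates = values
    .keyBy("id")
    .timeWindow(Time.minutes(1))
    .apply(new WindowFunction<SensorValue, SensorAggregate, Tuple, TimeWindow>() {
        @Override
        public void apply(Tuple key, TimeWindow window, Iterable<SensorValue> input,
                          Collector<SensorAggregate> out) {
            long count = 0;
            double sum = 0, min = Double.MAX_VALUE, max = -Double.MAX_VALUE;
            for (SensorValue v : input) {
                count++;
                sum += v.getValue();
                min = Math.min(min, v.getValue());
                max = Math.max(max, v.getValue());
            }
            String id = key.getField(0);  // keyBy("id") yields a Tuple key
            // Key and window metadata go into the result here, so the accumulator
            // never has to carry invalid id/start/end values during the fold.
            out.collect(new SensorAggregate(id, window.getStart(), window.getEnd(),
                    count, sum, min, max));
        }
    });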

Re: Maintaining watermarks per key, instead of per operator instance

2016-11-13 Thread Stephan Epping
Hello Fabian,

Thank you very much. What is your opinion on the following solution:

- Window data per time window, e.g. 15 minutes
- using processing time as trigger, e.g. 15 minutes
- which results in an aggregate over sensor values
- then use cassandra to select the previous aggregate (as there can be multiple 
for the time window due to processing time)
- then update the aggregate and put it into a cassandra sink again

The cassandra select will be a bit slower than using in-memory Flink state, 
but will be cheaper in the end. Further, what consequences does this have?
For example, replaying events will be more difficult, right? Also, what about 
snapshots? Will they work with the mentioned design?

kind regards,
Stephan


> On 11 Nov 2016, at 00:39, Fabian Hueske <fhue...@gmail.com> wrote:
> 
> Hi Stephan,
> 
> I just wrote an answer to your SO question. 
> 
> Best, Fabian
> 
> 2016-11-10 11:01 GMT+01:00 Stephan Epping <stephan.epp...@zweitag.de>:
> Hello,
> 
> I found this question in the Nabble archive 
> (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288.html)
> but was unable/don't know how to reply.
> 
> Here is my question regarding the mentioned thread:
> 
>> Hello, 
>> 
>> I have similar requirements (see StackOverflow: 
>> http://stackoverflow.com/questions/40465335/apache-flink-multiple-window-aggregations-and-late-data). 
>> I am pretty new to Flink; could you elaborate on a possible solution? We 
>> can guarantee good ordering by sensor_id, thus watermarking by key would be 
>> the only reasonable way for us 
>> (sensorData.keyBy('id').timeWindow(1.minute).sum('value')). Could I do my 
>> own watermarking after sensorData.keyBy('id').overwriteWatermarking()... per 
>> key? Or maybe use custom state plus a custom trigger? What happens if a 
>> sensor dies or is removed completely; how can this be detected, as 
>> watermarks would be ignored for window garbage collection? Or could we 
>> dynamically schedule a job for each sensor? That would result in 1000 jobs.
> 
> 
> Thanks,
> Stephan
> 
> 
>