Re: Hello, the performance of apply function after join

2015-12-01 Thread Fabian Hueske
Hi Phil,

an apply method after a join runs pipelined with the join, i.e., it starts
processing when the first join result is emitted and finishes after it has
handled the last join result.
As long as the logic in your apply function is not terribly complex, this
should be OK. If you do not specify an apply method, a default method will
be used which returns a Tuple2(left, right).

Regarding the join hints, there is no general rule for when to use joinWithHuge
/ joinWithTiny. It depends on the number of machines, machine specs, number
of records, record size, etc.
If you use joinWithHuge/Tiny, the smaller side will be broadcasted to every
node and each parallel partition will hold the full relation in memory,
i.e., if the smaller side is 10GB, you need at least 10GB for each task
manager slot. So this should only be used if the smaller side is *really*
small.

The join method also allows you to specify more fine-grained hints, such as:

small.join(large, JoinHint.REPARTITION_HASH_SECOND)

which will execute the join by shuffling both inputs and building a hash
table on the partition of the second input.
If you want to optimize for performance, you should try both hints:
REPARTITION_HASH_*small* and BROADCAST_HASH_*small*.
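
As a concrete illustration, here is a minimal, self-contained sketch of both
variants in the DataSet API (the input data, field types and key positions are
made up, and the JoinHint import path is assumed from the 0.10-era API):

import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class JoinHintSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Made-up inputs; in a real job these would be read from files.
        DataSet<Tuple2<Long, String>> small = env.fromElements(
                new Tuple2<>(1L, "a"), new Tuple2<>(2L, "b"));
        DataSet<Tuple2<Long, Double>> large = env.fromElements(
                new Tuple2<>(1L, 0.5), new Tuple2<>(2L, 1.5));

        // Size-based hint: "the other input is huge", so the (really) small side is broadcast.
        small.joinWithHuge(large).where(0).equalTo(0).print();

        // Explicit strategy: shuffle both inputs and build the hash table on the second input.
        // Without an apply(), the result defaults to Tuple2(left, right).
        small.join(large, JoinHint.REPARTITION_HASH_SECOND)
             .where(0).equalTo(0)
             .print();
    }
}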

Best, Fabian

2015-12-01 21:34 GMT+01:00 Philip Lee :

> Hello, the performance of apply function after join.
>
> Just for your information, I am running Flink job on the cluster consisted
> of 9 machine with each 48 cores. I am working on some benchmark with
> comparison of Flink, Spark-Sql, and Hive.
>
> I tried to optimize *join function with Hint* for better performance. I
> want to increase the performance as much as possible.
>
> Here are Questions===
> 1) When seeing job progress log, apply() after join function seems like it
> takes a bit long time. Do you think if I do not use apply() to format
> tuples, I would gain the better performance? Well, I could set just the
> column number instead of apply()
>
> 2) on using *join with Hint* like Huge or Tiny, is there the ideal ratio
> regarding to the size of two tables? For me, if some table is 10 times
> bigger than the other table, I use join with Hint. Otherwise, I usually use
> the general join().
>
> Best,
> Phil
>


Including option for starting job and task managers in the foreground

2015-12-01 Thread Brian Chhun
Hi All,

Is it possible to include a command line flag for starting job and task
managers in the foreground? Currently, `bin/jobmanager.sh` and
`bin/taskmanager.sh` rely on `bin/flink-daemon.sh`, which starts these
things in the background. I'd like to execute these commands inside a
docker container, but it's expected that the process is running in the
foreground. I think it might be useful to have it run in the foreground so
that it can be hooked into some process supervisors. Any suggestions are
appreciated.


Thanks,
Brian


Hello, the performance of apply function after join

2015-12-01 Thread Philip Lee
Hello, the performance of apply function after join.

Just for your information, I am running a Flink job on a cluster consisting
of 9 machines with 48 cores each. I am working on a benchmark
comparing Flink, Spark-SQL, and Hive.

I tried to optimize the *join function with Hint* for better performance. I
want to increase the performance as much as possible.

Here are my questions ===
1) When looking at the job progress log, the apply() after the join function
seems to take quite a long time. Do you think that if I do not use apply() to
format tuples, I would gain better performance? I could just select the
column numbers instead of using apply().

2) When using a *join with Hint* like Huge or Tiny, is there an ideal ratio
between the sizes of the two tables? For me, if one table is 10 times
bigger than the other, I use join with a hint. Otherwise, I usually use
the general join().

Best,
Phil


Question about flink message processing guarantee

2015-12-01 Thread Jerry Peng
Hello,

I have a question regarding Flink streaming. I know that if you enable
checkpointing you can have an exactly-once processing guarantee. If I do
not enable checkpointing, what are the semantics of the processing? At
least once?

Best,

Jerry


Re: Data Stream union error after upgrading from 0.9 to 0.10.1

2015-12-01 Thread Welly Tambunan
Hi Aljoscha,

Has this fix already been made available in 0.10-SNAPSHOT?


Cheers

On Tue, Dec 1, 2015 at 6:04 PM, Welly Tambunan  wrote:

> Thanks a lot Aljoscha.
>
> When it will be released ?
>
> Cheers
>
> On Tue, Dec 1, 2015 at 5:48 PM, Aljoscha Krettek 
> wrote:
>
>> Hi,
>> I relaxed the restrictions on union. This should make it into an upcoming
>> 0.10.2 bugfix release.
>>
>> Cheers,
>> Aljoscha
>> > On 01 Dec 2015, at 11:23, Welly Tambunan  wrote:
>> >
>> > Hi All,
>> >
>> > After upgrading our system to the latest version from 0.9 to 0.10.1 we
>> have this following error.
>> >
>> > Exception in thread "main" java.lang.UnsupportedOperationException: A
>> DataStream cannot be unioned with itself
>> >
>> > Then i find the relevant JIRA for this one.
>> > https://issues.apache.org/jira/browse/FLINK-3080
>> >
>> > Is there any plan which release this will be ?
>> >
>> >
>> > Another issue i have after upgrading is can't union with different
>> level of parallelism.
>> >
>> > I think we will need to fall back to 0.9 again for the time being.
>> >
>> > Cheers
>> >
>> > --
>> > Welly Tambunan
>> > Triplelands
>> >
>> > http://weltam.wordpress.com
>> > http://www.triplelands.com
>>
>>
>
>
> --
> Welly Tambunan
> Triplelands
>
> http://weltam.wordpress.com
> http://www.triplelands.com 
>



-- 
Welly Tambunan
Triplelands

http://weltam.wordpress.com
http://www.triplelands.com 


Re: Question about flink message processing guarantee

2015-12-01 Thread Márton Balassi
Dear Jerry,

If you do not enable checkpointing you get the at most once processing
guarantee (some might call that no guarantee at all). When you enable
checkpointing you can choose between exactly and at least once semantics.
The latter provides better latency.
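
As a minimal sketch of that choice (assuming the 0.10 streaming API; the
interval value is arbitrary):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 5 seconds; the default mode is exactly-once.
        env.enableCheckpointing(5000);

        // Or, trading the stronger guarantee for lower latency
        // (this call replaces the previous configuration):
        env.enableCheckpointing(5000, CheckpointingMode.AT_LEAST_ONCE);
    }
}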

Best,

Marton

On Tue, Dec 1, 2015 at 11:04 PM, Jerry Peng 
wrote:

> Hello,
>
> I have a question regarding link streaming.  I now if you enable
> checkpointing you can have exactly once processing guarantee. If I do
> not enable checkpointing what are the semantics of the processing? At
> least once?
>
> Best,
>
> Jerry
>


Re: Data Stream union error after upgrading from 0.9 to 0.10.1

2015-12-01 Thread Robert Metzger
No, it's not yet merged into the source repo of Flink.

You can find the code here: https://github.com/apache/flink/pull/1425
You can also check out the code of the PR or download the PR contents as a
patch and apply it to the Flink source.

I think the change will be merged tomorrow and then you'll have it in
0.10-SNAPSHOT.


For the 0.10.2 release: There are no concrete plans yet, but I think it'll
happen in the next 2-3 weeks.


On Tue, Dec 1, 2015 at 11:48 PM, Welly Tambunan  wrote:

> Hi Aljoscha,
>
> Is this fix has already been available on 0.10-SNAPSHOT ?
>
>
> Cheers
>
> On Tue, Dec 1, 2015 at 6:04 PM, Welly Tambunan  wrote:
>
>> Thanks a lot Aljoscha.
>>
>> When it will be released ?
>>
>> Cheers
>>
>> On Tue, Dec 1, 2015 at 5:48 PM, Aljoscha Krettek 
>> wrote:
>>
>>> Hi,
>>> I relaxed the restrictions on union. This should make it into an
>>> upcoming 0.10.2 bugfix release.
>>>
>>> Cheers,
>>> Aljoscha
>>> > On 01 Dec 2015, at 11:23, Welly Tambunan  wrote:
>>> >
>>> > Hi All,
>>> >
>>> > After upgrading our system to the latest version from 0.9 to 0.10.1 we
>>> have this following error.
>>> >
>>> > Exception in thread "main" java.lang.UnsupportedOperationException: A
>>> DataStream cannot be unioned with itself
>>> >
>>> > Then i find the relevant JIRA for this one.
>>> > https://issues.apache.org/jira/browse/FLINK-3080
>>> >
>>> > Is there any plan which release this will be ?
>>> >
>>> >
>>> > Another issue i have after upgrading is can't union with different
>>> level of parallelism.
>>> >
>>> > I think we will need to fall back to 0.9 again for the time being.
>>> >
>>> > Cheers
>>> >
>>> > --
>>> > Welly Tambunan
>>> > Triplelands
>>> >
>>> > http://weltam.wordpress.com
>>> > http://www.triplelands.com
>>>
>>>
>>
>>
>> --
>> Welly Tambunan
>> Triplelands
>>
>> http://weltam.wordpress.com
>> http://www.triplelands.com 
>>
>
>
>
> --
> Welly Tambunan
> Triplelands
>
> http://weltam.wordpress.com
> http://www.triplelands.com 
>


Re: Data Stream union error after upgrading from 0.9 to 0.10.1

2015-12-01 Thread Welly Tambunan
Ok Robert,

Thanks a lot.

Looking forward to it.


Cheers

On Wed, Dec 2, 2015 at 5:50 AM, Robert Metzger  wrote:

> No, its not yet merged into the source repo of Flink.
>
> You can find the code here: https://github.com/apache/flink/pull/1425
> You can also check out the code of the PR or download the PR contents as a
> patch and apply it to the Flink source.
>
> I think the change will be merged tomorrow and then you'll have it in
> 0.10-SNAPSHOT.
>
>
> For the 0.10.2 release: There are no concrete plans yet, but I think it'll
> happen in the next 2-3 weeks.
>
>
> On Tue, Dec 1, 2015 at 11:48 PM, Welly Tambunan  wrote:
>
>> Hi Aljoscha,
>>
>> Is this fix has already been available on 0.10-SNAPSHOT ?
>>
>>
>> Cheers
>>
>> On Tue, Dec 1, 2015 at 6:04 PM, Welly Tambunan  wrote:
>>
>>> Thanks a lot Aljoscha.
>>>
>>> When it will be released ?
>>>
>>> Cheers
>>>
>>> On Tue, Dec 1, 2015 at 5:48 PM, Aljoscha Krettek 
>>> wrote:
>>>
 Hi,
 I relaxed the restrictions on union. This should make it into an
 upcoming 0.10.2 bugfix release.

 Cheers,
 Aljoscha
 > On 01 Dec 2015, at 11:23, Welly Tambunan  wrote:
 >
 > Hi All,
 >
 > After upgrading our system to the latest version from 0.9 to 0.10.1
 we have this following error.
 >
 > Exception in thread "main" java.lang.UnsupportedOperationException: A
 DataStream cannot be unioned with itself
 >
 > Then i find the relevant JIRA for this one.
 > https://issues.apache.org/jira/browse/FLINK-3080
 >
 > Is there any plan which release this will be ?
 >
 >
 > Another issue i have after upgrading is can't union with different
 level of parallelism.
 >
 > I think we will need to fall back to 0.9 again for the time being.
 >
 > Cheers
 >
 > --
 > Welly Tambunan
 > Triplelands
 >
 > http://weltam.wordpress.com
 > http://www.triplelands.com


>>>
>>>
>>> --
>>> Welly Tambunan
>>> Triplelands
>>>
>>> http://weltam.wordpress.com
>>> http://www.triplelands.com 
>>>
>>
>>
>>
>> --
>> Welly Tambunan
>> Triplelands
>>
>> http://weltam.wordpress.com
>> http://www.triplelands.com 
>>
>
>


-- 
Welly Tambunan
Triplelands

http://weltam.wordpress.com
http://www.triplelands.com 


Running WebClient from Windows

2015-12-01 Thread Welly Tambunan
Hi All,

Is there any way to run the WebClient for uploading jobs from Windows?

I tried to run it from MinGW but got these errors:


$ bin/start-webclient.sh
/c/flink-0.10.0/bin/config.sh: line 261: conditional binary operator expected
/c/flink-0.10.0/bin/config.sh: line 261: syntax error near `=~'
/c/flink-0.10.0/bin/config.sh: line 261: `if [[ "$SLAVE" =~ ^.*/([0-9a-zA-Z.-]+)$ ]]; then'
/c/flink-0.10.0/bin/config.sh: line 261: conditional binary operator expected
/c/flink-0.10.0/bin/config.sh: line 261: syntax error near `=~'
/c/flink-0.10.0/bin/config.sh: line 261: `if [[ "$SLAVE" =~ ^.*/([0-9a-zA-Z.-]+)$ ]]; then'
Starting Flink webclient

[Terminate]

Cheers

-- 
Welly Tambunan
Triplelands

http://weltam.wordpress.com
http://www.triplelands.com 


Re: Continuing from the stackoverflow post

2015-12-01 Thread Nirmalya Sengupta
Hello Fabian,

Many thanks for your encouraging words about the blogs. I want to make a
sincere attempt.

To summarise my understanding of the rule of removal of the elements from
the window (after going through your last mail), here are two corollaries:

1) If my workflow has no triggers (and hence, no evictors), my application
will run out of memory, perhaps sooner than I expect.

2) If I am using CountTriggers only (and no Evictors), then my
application will also run out of memory, eventually.

Could you please confirm or refute each with a Yes/No?

I understand why removal of elements from the window is essential. In fact
if older elements are not removed, new elements cannot come in and
therefore, the conceptual proposition of a **flowing Stream** is not
realized in the right manner.


-- Nirmalya


-- 
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta
"If you have built castles in the air, your work need not be lost. That is
where they should be.
Now put the foundation under them."


Re: Cleanup of OperatorStates?

2015-12-01 Thread Ufuk Celebi

> On 01 Dec 2015, at 18:34, Stephan Ewen  wrote:
> 
> Hi!
> 
> If you want to run with checkpoints (fault tolerance), you need to specify a 
> place to store the checkpoints to.
> 
> By default, it is the master's memory (or zookeeper in HA), so we put a limit 
> on the size of the state there.

Regarding the ZooKeeper in HA part: we don’t store the actual state in 
ZooKeeper, but a pointer to the state (e.g. a pointer to the files, which in 
turn store the actual state). So you don’t have to worry about ZooKeeper being 
flooded with your large data when you run with HA.

– Ufuk

PS: Nice use case, indeed! :)

Re: Working with protobuf wrappers

2015-12-01 Thread Robert Metzger
Hi Flavio,

1. You don't have to register serializers if it's working for you. I would
add a custom serializer if it's not working or if the performance is poor
(see the sketch below).
2. I don't think that there is such a performance comparison. Tuples are a
little faster than POJOs; other types (serialized with Kryo's standard
serializer) are usually slower.
3. There are some plans for the Table API to do various optimizations
(projection/filter push-down), which also have some assumptions about the
serializers. So yes, this might change for the Table API.
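
As a minimal sketch of point 1 (how a custom serializer would be registered if
the defaults are not working), roughly following the best-practices page linked
in Till's reply below; MyProtoMessage is a placeholder for a protoc-generated
class, and the chill-protobuf dependency is assumed to be on the classpath:

import com.twitter.chill.protobuf.ProtobufSerializer;
import org.apache.flink.api.java.ExecutionEnvironment;

public class ProtobufRegistrationSketch {
    public static void main(String[] args) {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Tell Kryo to use chill-protobuf's serializer for this generated type.
        env.getConfig().registerTypeWithKryoSerializer(MyProtoMessage.class, ProtobufSerializer.class);
    }
}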


On Tue, Dec 1, 2015 at 11:26 AM, Flavio Pompermaier 
wrote:

> Sorry for the long question but I take advantage of this discussion to ask
> for something I've never fully understood.. Let's say I have for example a
> thrift/protobuf/avro object Person.
>
>1. Do I have really to register a custom serializer? In my code I
>create a dataset from parquet-thrift but I never had to register
>anything...Does this change something if I
>call registerTypeWithKryoSerializer?
>2. How are performance of Flink affected by using one serialization
>wrt another? For example, is there a simple snippet of a Flink program that
>show when it's better to the original Person, its POJO version or it's
>Tuple version (assuming that is a flat object)?
>3. Does this further change when I use Table APIs?
>
>
> Best,
> Flavio
>
> On Tue, Dec 1, 2015 at 10:25 AM, Robert Metzger 
> wrote:
>
>> Also, we don't add serializers automatically for DataStream programs.
>> I've opened a JIRA for this a while ago.
>>
>> On Tue, Dec 1, 2015 at 10:20 AM, Till Rohrmann 
>> wrote:
>>
>>> Hi Kryzsztof,
>>>
>>> it's true that we once added the Protobuf serializer automatically.
>>> However, due to versioning conflicts (see
>>> https://issues.apache.org/jira/browse/FLINK-1635), we removed it again.
>>> Now you have to register the ProtobufSerializer manually:
>>> https://ci.apache.org/projects/flink/flink-docs-master/apis/best_practices.html#register-a-custom-serializer-for-your-flink-program
>>> .
>>>
>>> Cheers,
>>> Till
>>>
>>> On Mon, Nov 30, 2015 at 8:48 PM, Krzysztof Zarzycki <
>>> k.zarzy...@gmail.com> wrote:
>>>
 Hi!
 I'm trying to use generated Protobuf wrappers compiled with protoc and
 pass them as objects between functions of Flink. I'm using Flink 0.10.0.
 Unfortunately, I get an exception on runtime:

 [...]
 Caused by: com.esotericsoftware.kryo.KryoException:
 java.lang.UnsupportedOperationException
 Serialization trace:
 enrichments_ (com.company$MyObject)
 at
 com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
 at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
 at
 org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:162)
 at
 org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:313)
 ... 11 more
 Caused by: java.lang.UnsupportedOperationException
 at
 java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
 at
 com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
 at
 com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
 at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
 at
 com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
 ... 15 more


 I believed that protobuf are now serializable on default Flink
 configuration after fixing this issue in 0.9/0.8.1:
 https://issues.apache.org/jira/browse/FLINK-1392

 Maybe it really is, but Flink just requires some configuration?
 I'll be grateful for your help with this issue.
 Cheers,
 Krzysztof


>>>
>>
>


Re: Cleanup of OperatorStates?

2015-12-01 Thread Stephan Ewen
Concerning keeping all events in memory: I thought that is sort of a
requirement of your application. All events need to go to the same file
(which is determined by the time the session times out).

If you relax that requirement so that you only need to store some aggregate
statistic about the session in the files in the end, then you can of course
alter the way the Session object stores information (only keep aggregate
statistics), and that will decrease the size of the data stored per session by
a lot!

In order to support the "real time" path, the session object really only
needs to store the visit ID and the current expiry timestamp.
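
To make that concrete, a hypothetical trimmed-down session object along those
lines could look like this (field and method names are made up; only the visit
id, the expiry timestamp, and a small aggregate are kept instead of the raw
events):

public class Session implements java.io.Serializable {
    private final String visitId;
    private long expiryTimestamp;  // event-time timestamp after which the session counts as closed
    private long eventCount;       // example aggregate kept instead of the full event list

    public Session(String visitId, long firstEventTimestamp, long inactivityGapMs) {
        this.visitId = visitId;
        this.expiryTimestamp = firstEventTimestamp + inactivityGapMs;
        this.eventCount = 1;
    }

    // Called for every further event of the session: update the aggregate and push the expiry forward.
    public void recordEvent(long eventTimestamp, long inactivityGapMs) {
        eventCount++;
        expiryTimestamp = Math.max(expiryTimestamp, eventTimestamp + inactivityGapMs);
    }

    public String getVisitId() { return visitId; }
    public long getExpiryTimestamp() { return expiryTimestamp; }
    public long getEventCount() { return eventCount; }
}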

Let me know if you want a few pointers about the code...

Greetings,
Stephan


On Tue, Dec 1, 2015 at 5:23 PM, Niels Basjes  wrote:

> Hi,
>
> The first thing I noticed is that the Session object maintains a list of
> all events in memory.
> Your events are really small yet in my scenario the predicted number of
> events per session will be above 1000 and each is expected to be in the
> 512-1024 bytes range.
> This worried me yet I decided to give your code a run.
>
> After a while running it in my IDE (not on cluster) I got this:
>
> 17:18:46,336 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
> checkpoint 269 @ 1448986726336
> 17:18:46,587 INFO  org.apache.flink.runtime.taskmanager.Task
>   - sessionization -> Sink: Unnamed (4/4) switched to FAILED with
> exception.
> java.lang.RuntimeException: Error triggering a checkpoint as the result of
> receiving checkpoint barrier
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$1.onEvent(StreamTask.java:577)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$1.onEvent(StreamTask.java:570)
> at
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:201)
> at
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:127)
> at
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:173)
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Size of the state is larger than the
> maximum permitted memory-backed state. Size=5246277 , maxSize=5242880 .
> Consider using a different state backend, like the File System State
> backend.
> at
> org.apache.flink.runtime.state.memory.MemoryStateBackend.checkSize(MemoryStateBackend.java:130)
> at
> org.apache.flink.runtime.state.memory.MemoryStateBackend.checkpointStateSerializable(MemoryStateBackend.java:108)
> at
> com.dataartisans.streaming.sessionization.SessionizingOperator.snapshotOperatorState(SessionizingOperator.java:162)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:440)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$1.onEvent(StreamTask.java:574)
> ... 8 more
>
>
> Niels
>
>
>
> On Tue, Dec 1, 2015 at 4:41 PM, Niels Basjes  wrote:
>
>> Thanks!
>> I'm going to study this code closely!
>>
>> Niels
>>
>> On Tue, Dec 1, 2015 at 2:50 PM, Stephan Ewen  wrote:
>>
>>> Hi Niels!
>>>
>>> I have a pretty nice example for you here:
>>> https://github.com/StephanEwen/sessionization
>>>
>>> It keeps only one state and has the structure:
>>>
>>>
>>> (source) --> (window sessions) ---> (real time sink)
>>>   |
>>>   +--> (15 minute files)
>>>
>>>
>>> The real time sink gets the event with attached visitId immediately. The
>>> session operator, as a side effect, writes out the 15 minute files with
>>> sessions that expired in that time.
>>>
>>>
>>> It is not a lot of code, the two main parts are
>>>
>>>   - the program and the program skeleton:
>>> https://github.com/StephanEwen/sessionization/blob/master/src/main/java/com/dataartisans/streaming/sessionization/EventTimeSessionization.java
>>>   - the sessionizing and file writing operator:
>>> https://github.com/StephanEwen/sessionization/blob/master/src/main/java/com/dataartisans/streaming/sessionization/SessionizingOperator.java
>>>
>>>
>>> The example runs fully on event time, where the timestamps are extracted
>>> from the records. That makes this program very robust (no issue with
>>> clocks, etc).
>>>
>>> Also, here comes the amazing part: The same program should do "replay"
>>> and real time. The only difference is what input you give it. Since time is
>>> event time, it can do both.
>>>
>>>
>>> One note:
>>>   - Event Time Watermarks are the mechanism to signal progress in event
>>> time. It is simple here, because I assume that timestamps are ascending in
>>> a Kafka partition. If that is not the case, you need to implement a more
>>> elaborate 

Re: Cleanup of OperatorStates?

2015-12-01 Thread Niels Basjes
Hi Stephan,

I created a first version of the Visit ID assignment like this:

First I group by sessionid and I create a Window per visit.
The custom Trigger for this window does a 'FIRE' after each element and
sets an EventTimer on the 'next possible moment the visit can expire'.
To avoid getting 'all events' in the visit after every 'FIRE' I'm using
CountEvictor.of(1).
When the visit expires I do a PURGE. So if there are more events afterwards
for the same sessionId I get a new visit (which is exactly what I want).

The last step is that I want to have a 'normal' DataStream again to work
with.
I created this WindowFunction to map the windowed stream back to a normal
DataStream.
Essentially I do this:

DataStream<T> visitDataStream = visitWindowedStream.apply(new WindowToStream<T>());

// This is an identity 'apply'
private static class WindowToStream<T> implements WindowFunction<T, T, String, GlobalWindow> {
    @Override
    public void apply(String s, GlobalWindow window, Iterable<T> values,
                      Collector<T> out) throws Exception {
        for (T value : values) {
            out.collect(value);
        }
    }
}


The problem with this is that I first create the visitIds in a Window
(great).
Because I really need to have both the Windowed events AND the near
realtime version I currently break down the Window to get the single events
and after that I have to recreate the same Window again.

I'm looking forward to the implementation direction you are referring to. I
hope you have a better way of doing this.

Niels Basjes


On Mon, Nov 30, 2015 at 9:29 PM, Stephan Ewen  wrote:

> Hi Niels!
>
> Nice use case that you have!
> I think you can solve this super nicely with Flink, such that "replay" and
> "realtime" are literally the same program - they differ only in whether
>
> Event time is, like you said, the key thing for "replay". Event time
> depends on the progress in the timestamps of the data, so it can progress
> at different speeds, depending on what the rate of your stream is.
> With the appropriate data source, it will progress very fast in "replay
> mode", so that you replay in "fast forward speed", and it progresses at the
> same speed as processing time when you attach to the end of the Kafka queue.
>
> When you define the time intervals in your program to react to event time
> progress, then you will compute the right sessionization in both replay and
> real time settings.
>
> I am writing a little example code to share. The type of ID-assignment
> sessions you want to do need an undocumented API right now, so I'll prepare
> something there for you...
>
> Greetings,
> Stephan
>
>
>
> On Sun, Nov 29, 2015 at 4:04 PM, Niels Basjes  wrote:
>
>> Hi,
>>
>> The sessionid is present in the measurements. It can also be seen as a
>> form of 'browser id'.
>> Most websites use either a 'long lived random value in a cookie' or a
>> 'application session id' for this.
>>
>> So with the id of the browser in hand I have the need to group all events
>> into "periods of activity" which I call a visit.
>> Such a visit is a bounded subset of all events from a single browser.
>>
>> What I need is to add a (sort of) random visit id to the events that
>> becomes 'inactive' after more than X minutes of inactivity.
>> I then want to add this visitid to each event and
>> 1) stream them out in realtime
>> 2) Wait till the visit ends and store the complete visit on disk (I am
>> going for either AVRO or Parquet).
>>
>> I want to create diskfiles with all visits that ended in a specific time
>> period. So essentially
>> "Group by round(, 15 minutes)"
>>
>>
>> Because of the need to be able to 'repair' things I came with the
>> following question:
>> In the Flink API I see the 'process time' (i.e. the actual time of the
>> server) and the 'event time' (i.e. the time when and event was recorded).
>>
>> Now in my case all events are in Kafka (for say 2 weeks).
>> When something goes wrong I want to be able to 'reprocess' everything
>> from the start of the queue.
>> Here the matter of 'event time' becomes a big question for me; In those
>> 'replay' situations the event time will progress at a much higher speed
>> than the normal 1sec/sec.
>>
>> How does this work in Apache Flink?
>>
>>
>> Niels Basjes
>>
>>
>> On Fri, Nov 27, 2015 at 3:28 PM, Stephan Ewen  wrote:
>>
>>> Hey Niels!
>>>
>>> You may be able to implement this in windows anyways, depending on your
>>> setup. You can definitely implement state with timeout yourself (using the
>>> more low-level state interface), or you may be able to use custom windows
>>> for that (they can trigger on every element and return elements
>>> immediately, thereby giving you low latency).
>>>
>>> Can you tell me where exactly the session ID comes from? Is that
>>> something that the function with state generates itself?
>>> Depending on that answer, I can outline either the window, or the custom
>>> state way...
>>>
>>> Greetings,
>>> Stephan
>>>

Data Stream union error after upgrading from 0.9 to 0.10.1

2015-12-01 Thread Welly Tambunan
Hi All,

After upgrading our system from 0.9 to the latest version, 0.10.1, we have
the following error:

Exception in thread "main" java.lang.UnsupportedOperationException: A
DataStream cannot be unioned with itself

Then I found the relevant JIRA for this one:
https://issues.apache.org/jira/browse/FLINK-3080

Is there any plan for which release this will be in?


Another issue I have after upgrading is that I can't union streams with
different levels of parallelism.

I think we will need to fall back to 0.9 again for the time being.

Cheers

-- 
Welly Tambunan
Triplelands

http://weltam.wordpress.com
http://www.triplelands.com 


Re: Data Stream union error after upgrading from 0.9 to 0.10.1

2015-12-01 Thread Welly Tambunan
Thanks a lot Aljoscha.

When will it be released?

Cheers

On Tue, Dec 1, 2015 at 5:48 PM, Aljoscha Krettek 
wrote:

> Hi,
> I relaxed the restrictions on union. This should make it into an upcoming
> 0.10.2 bugfix release.
>
> Cheers,
> Aljoscha
> > On 01 Dec 2015, at 11:23, Welly Tambunan  wrote:
> >
> > Hi All,
> >
> > After upgrading our system to the latest version from 0.9 to 0.10.1 we
> have this following error.
> >
> > Exception in thread "main" java.lang.UnsupportedOperationException: A
> DataStream cannot be unioned with itself
> >
> > Then i find the relevant JIRA for this one.
> > https://issues.apache.org/jira/browse/FLINK-3080
> >
> > Is there any plan which release this will be ?
> >
> >
> > Another issue i have after upgrading is can't union with different level
> of parallelism.
> >
> > I think we will need to fall back to 0.9 again for the time being.
> >
> > Cheers
> >
> > --
> > Welly Tambunan
> > Triplelands
> >
> > http://weltam.wordpress.com
> > http://www.triplelands.com
>
>


-- 
Welly Tambunan
Triplelands

http://weltam.wordpress.com
http://www.triplelands.com 


Re: Cleanup of OperatorStates?

2015-12-01 Thread Stephan Ewen
Hi Niels!

If you want to use the built-in windowing, you probably need two windows:
  - One for ID assignment (that immediately pipes elements through)
  - One for accumulating session elements, and then piping them into files
upon session end.

You may be able to use the rolling file sink (roll by 15 minutes) to store
the files.
That is probably the simplest to implement and will serve the real time
case.
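
A rough sketch of wiring in the rolling file sink (assuming the 0.10
flink-connector-filesystem API; the path, bucket format and sample data are
made up):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.DateTimeBucketer;
import org.apache.flink.streaming.connectors.fs.RollingSink;

public class RollingSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> sessions = env.fromElements("visit-1,event-a", "visit-1,event-b");

        // One directory ("bucket") per wall-clock time slice; a custom Bucketer would be
        // needed for exact 15-minute buckets.
        RollingSink<String> sink = new RollingSink<String>("/tmp/sessions");
        sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"));
        sessions.addSink(sink);

        env.execute("rolling sink sketch");
    }
}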


                                    +--> (real time sink)
                                    |
(source) --> (window session ids) --+
                                    |
                                    +--> (window session) --> (rolling sink)


You can put this all into one operator that accumulates the session
elements but still immediately emits the new records (the realtime path),
if you implement your own windowing/buffering in a custom function.
This is also very easy to put onto event time then, which makes it
valuable for processing the history (replay). For this second case, I'm still
prototyping some code for the event time case; give me a bit, I'll get back
to you...

Greetings,
Stephan


On Tue, Dec 1, 2015 at 10:55 AM, Niels Basjes  wrote:

> Hi Stephan,
>
> I created a first version of the Visit ID assignment like this:
>
> First I group by sessionid and I create a Window per visit.
> The custom Trigger for this window does a 'FIRE' after each element and
> sets an EventTimer on the 'next possible moment the visit can expire'.
> To avoid getting 'all events' in the visit after every 'FIRE' I'm using
> CountEvictor.of(1).
> When the visit expires I do a PURGE. So if there are more events
> afterwards for the same sessionId I get a new visit (which is exactly what
> I want).
>
> The last step I do is I want to have a 'normal' DataStream again to work
> with.
> I created this WindowFunction to map the Window stream back to  normal
> DataStream
> Essentially I do this:
>
> DataStream visitDataStream = visitWindowedStream.apply(new
> WindowToStream())
>
> // This is an identity 'apply'
> private static class WindowToStream implements WindowFunction String, GlobalWindow> {
> @Override
> public void apply(String s, GlobalWindow window, Iterable values,
> Collector out) throws Exception {
> for (T value: values) {
> out.collect(value);
> }
> }
> }
>
>
> The problem with this is that I first create the visitIds in a Window
> (great).
> Because I really need to have both the Windowed events AND the near
> realtime version I currently break down the Window to get the single events
> and after that I have to recreate the same Window again.
>
> I'm looking forward to the implementation direction you are referring to.
> I hope you have a better way of doing this.
>
> Niels Basjes
>
>
> On Mon, Nov 30, 2015 at 9:29 PM, Stephan Ewen  wrote:
>
>> Hi Niels!
>>
>> Nice use case that you have!
>> I think you can solve this super nicely with Flink, such that "replay"
>> and "realtime" are literally the same program - they differ only in whether
>>
>> Event time is, like you said, the key thing for "replay". Event time
>> depends on the progress in the timestamps of the data, so it can progress
>> at different speeds, depending on what the rate of your stream is.
>> With the appropriate data source, it will progress very fast in "replay
>> mode", so that you replay in "fast forward speed", and it progresses at the
>> same speed as processing time when you attach to the end of the Kafka queue.
>>
>> When you define the time intervals in your program to react to event time
>> progress, then you will compute the right sessionization in both replay and
>> real time settings.
>>
>> I am writing a little example code to share. The type of ID-assignment
>> sessions you want to do need an undocumented API right now, so I'll prepare
>> something there for you...
>>
>> Greetings,
>> Stephan
>>
>>
>>
>> On Sun, Nov 29, 2015 at 4:04 PM, Niels Basjes  wrote:
>>
>>> Hi,
>>>
>>> The sessionid is present in the measurements. It can also be seen as a
>>> form of 'browser id'.
>>> Most websites use either a 'long lived random value in a cookie' or a
>>> 'application session id' for this.
>>>
>>> So with the id of the browser in hand I have the need to group all
>>> events into "periods of activity" which I call a visit.
>>> Such a visit is a bounded subset of all events from a single browser.
>>>
>>> What I need is to add a (sort of) random visit id to the events that
>>> becomes 'inactive' after more than X minutes of inactivity.
>>> I then want to add this visitid to each event and
>>> 1) stream them out in realtime
>>> 2) Wait till the visit ends and store the complete visit on disk (I am
>>> going for either AVRO or Parquet).
>>>
>>> I want to create diskfiles with all visits that ended in a specific time
>>> period. So essentially
>>> "Group by round(, 15 minutes)"
>>>
>>>
>>> Because of the need to be able to 

Re: Material on Apache flink internals

2015-12-01 Thread madhu phatak
Hi everyone,

I am fascinated by the Flink core engine's way of streaming operators rather
than the typical map/reduce approach followed by Hadoop or Spark. Is there any
good documentation/blog/video available which talks about these internals? I am
fine with either a batch or streaming point of view.

It would be great if someone could share this info. Thank you for your
excellent work.

-- 
Regards,
Madhukara Phatak
http://datamantra.io/


Re: Material on Apache flink internals

2015-12-01 Thread Fabian Hueske
Hi Madhu,

checkout the following resources:

- Apache Flink Blog: http://flink.apache.org/blog/index.html
- Data Artisans Blog: http://data-artisans.com/blog/
- Flink Forward Conference website (Talk slides & recordings):
http://flink-forward.org/?post_type=session
- Flink Meetup talk recordings:
https://www.youtube.com/channel/UCY8_lgiZLZErZPF47a2hXMA
- Slim's Flink Knowledge base:
http://sparkbigdata.com/component/tags/tag/27-flink

Best, Fabian

2015-12-01 16:23 GMT+01:00 madhu phatak :

> Hi everyone,
>
> I am fascinated with flink core engine way of streaming of operators
> rather than typical map/reduce way that followed by hadoop or spark. Is any
> good documentation/blog/video avalable which talks about this internal. I
> am ok from a batch or streaming point of view.
>
> It will be great if some one can share this info. Thank you for your
> excellent work.
>
> --
> Regards,
> Madhukara Phatak
> http://datamantra.io/
>


Re: Cleanup of OperatorStates?

2015-12-01 Thread Niels Basjes
Thanks!
I'm going to study this code closely!

Niels

On Tue, Dec 1, 2015 at 2:50 PM, Stephan Ewen  wrote:

> Hi Niels!
>
> I have a pretty nice example for you here:
> https://github.com/StephanEwen/sessionization
>
> It keeps only one state and has the structure:
>
>
> (source) --> (window sessions) ---> (real time sink)
>   |
>   +--> (15 minute files)
>
>
> The real time sink gets the event with attached visitId immediately. The
> session operator, as a side effect, writes out the 15 minute files with
> sessions that expired in that time.
>
>
> It is not a lot of code, the two main parts are
>
>   - the program and the program skeleton:
> https://github.com/StephanEwen/sessionization/blob/master/src/main/java/com/dataartisans/streaming/sessionization/EventTimeSessionization.java
>   - the sessionizing and file writing operator:
> https://github.com/StephanEwen/sessionization/blob/master/src/main/java/com/dataartisans/streaming/sessionization/SessionizingOperator.java
>
>
> The example runs fully on event time, where the timestamps are extracted
> from the records. That makes this program very robust (no issue with
> clocks, etc).
>
> Also, here comes the amazing part: The same program should do "replay" and
> real time. The only difference is what input you give it. Since time is
> event time, it can do both.
>
>
> One note:
>   - Event Time Watermarks are the mechanism to signal progress in event
> time. It is simple here, because I assume that timestamps are ascending in
> a Kafka partition. If that is not the case, you need to implement a more
> elaborate TimestampExtractor.
>
>
> Hope you can work with this!
>
> Greetings,
> Stephan
>
>
> On Tue, Dec 1, 2015 at 1:00 PM, Stephan Ewen  wrote:
>
>> Just for clarification: The real-time results should also contain the
>> visitId, correct?
>>
>> On Tue, Dec 1, 2015 at 12:06 PM, Stephan Ewen  wrote:
>>
>>> Hi Niels!
>>>
>>> If you want to use the built-in windowing, you probably need two window:
>>>   - One for ID assignment (that immediately pipes elements through)
>>>   - One for accumulating session elements, and then piping them into
>>> files upon session end.
>>>
>>> You may be able to use the rolling file sink (roll by 15 minutes) to
>>> store the files.
>>> That is probably the simplest to implement and will serve the real time
>>> case.
>>>
>>>
>>> +--> (real time sink)
>>> |
>>> (source) --> (window session ids) --+
>>> |
>>> +--> (window session) --> (rolling
>>> sink)
>>>
>>>
>>> You can put this all into one operator that accumulates the session
>>> elements but still immediately emits the new records (the realtime path),
>>> if you implement your own windowing/buffering in a custom function.
>>> This is also very easy to put onto event time then, which makes it
>>> valueable to process the history (replay). For this second case, still
>>> prototyping some code for the event time case, give me a bit, I'll get back
>>> at you...
>>>
>>> Greetings,
>>> Stephan
>>>
>>>
>>> On Tue, Dec 1, 2015 at 10:55 AM, Niels Basjes  wrote:
>>>
 Hi Stephan,

 I created a first version of the Visit ID assignment like this:

 First I group by sessionid and I create a Window per visit.
 The custom Trigger for this window does a 'FIRE' after each element and
 sets an EventTimer on the 'next possible moment the visit can expire'.
 To avoid getting 'all events' in the visit after every 'FIRE' I'm using
 CountEvictor.of(1).
 When the visit expires I do a PURGE. So if there are more events
 afterwards for the same sessionId I get a new visit (which is exactly what
 I want).

 The last step I do is I want to have a 'normal' DataStream again to
 work with.
 I created this WindowFunction to map the Window stream back to  normal
 DataStream
 Essentially I do this:

 DataStream visitDataStream = visitWindowedStream.apply(new
 WindowToStream())

 // This is an identity 'apply'
 private static class WindowToStream implements WindowFunction {
 @Override
 public void apply(String s, GlobalWindow window, Iterable
 values, Collector out) throws Exception {
 for (T value: values) {
 out.collect(value);
 }
 }
 }


 The problem with this is that I first create the visitIds in a Window
 (great).
 Because I really need to have both the Windowed events AND the near
 realtime version I currently break down the Window to get the single events
 and after that I have to recreate the same Window again.

 I'm looking forward to the implementation direction you are referring
 to. I hope you have a 

NPE with Flink Streaming from Kafka

2015-12-01 Thread Vieru, Mihail
Hi,

we get the following NullPointerException after ~50 minutes when running a
streaming job with windowing and state that reads data from Kafka and
writes the result to local FS.
There are around 170 million messages to be processed, Flink 0.10.1 stops
at ~8 million.
Flink runs locally, started with the "start-cluster-streaming.sh" script.

12/01/2015 15:06:24Job execution switched to status RUNNING.
12/01/2015 15:06:24Source: Custom Source -> Map -> Map(1/1) switched to
SCHEDULED
12/01/2015 15:06:24Source: Custom Source -> Map -> Map(1/1) switched to
DEPLOYING
12/01/2015 15:06:24Fast TumblingTimeWindows(5000) of Reduce at
main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to
SCHEDULED
12/01/2015 15:06:24Fast TumblingTimeWindows(5000) of Reduce at
main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to
DEPLOYING
12/01/2015 15:06:24Source: Custom Source -> Map -> Map(1/1) switched to
RUNNING
12/01/2015 15:06:24Fast TumblingTimeWindows(5000) of Reduce at
main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to
RUNNING
12/01/2015 15:56:08Fast TumblingTimeWindows(5000) of Reduce at
main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to
CANCELED
12/01/2015 15:56:08Source: Custom Source -> Map -> Map(1/1) switched to
FAILED
java.lang.Exception
at
org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:397)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at
org.I0Itec.zkclient.ZkConnection.writeDataReturnStat(ZkConnection.java:115)
at org.I0Itec.zkclient.ZkClient$10.call(ZkClient.java:817)
at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
at org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)
at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)
at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:332)
at kafka.utils.ZkUtils.updatePersistentPath(ZkUtils.scala)
at
org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:112)
at
org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.commit(ZookeeperOffsetHandler.java:80)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer$PeriodicOffsetCommitter.run(FlinkKafkaConsumer.java:632)


Any ideas on what could cause this behaviour?

Best,
Mihail


question about DataStream serialization

2015-12-01 Thread Radu Tudoran
Hello,

I have the following scenario


· Reading a fixed set
DataStream<String> fixedset = env.readTextFile(...

· Reading a continuous stream of data
DataStream<String> stream = 

I would need, for each event read from the continuous stream, to make some 
operations on it and on the fixed set together.


I have tried something like:

List<>

final myObject.referenceStaticSet = fixedset;
stream.map(new MapFunction<String, String>() {
    @Override
    public String map(String arg0) throws Exception {

        // for example: final String string2add = arg0;
        // the goal of the function below would be to add string2add to the fixed set
        myObject.referenceStaticSet = myObject.referenceStaticSet.flatMap(new FlatMapFunction<String, String>() {

            @Override
            public void flatMap(String arg0, Collector<String> arg1) {
                // for example adding the string2add object to the fixed set as well:
                arg1.collect(string2add);
            }
        });
        ...
    }
});

However, I get an exception (Exception in thread "main" 
org.apache.flink.api.common.InvalidProgramException:) saying that the object is not 
serializable (Object MyClass$3@a71081 not serializable).

Looking into this I see that the DataStream<> is not serializable

What would be the solution to this issue?

As I said, I would like, for each event from the continuous stream, to use 
the initial fixed set, add the event to it, and apply an operation.
Stephan mentioned at some point the possibility to create a DataSet and 
launch a batch processing job while operating in stream mode. In case this is 
possible, can you give me a reference for it? It might be the right 
solution to use in case I could treat the fixed set as a DataSet and just 
add/remove the incoming event and apply an operation.

Regards,



Dr. Radu Tudoran
Research Engineer
IT R Division

HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, 
www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Jingwen TAO, Wanzhou MENG, Lifang CHEN



Re: Cleanup of OperatorStates?

2015-12-01 Thread Stephan Ewen
Just for clarification: The real-time results should also contain the
visitId, correct?

On Tue, Dec 1, 2015 at 12:06 PM, Stephan Ewen  wrote:

> Hi Niels!
>
> If you want to use the built-in windowing, you probably need two window:
>   - One for ID assignment (that immediately pipes elements through)
>   - One for accumulating session elements, and then piping them into files
> upon session end.
>
> You may be able to use the rolling file sink (roll by 15 minutes) to store
> the files.
> That is probably the simplest to implement and will serve the real time
> case.
>
>
> +--> (real time sink)
> |
> (source) --> (window session ids) --+
> |
> +--> (window session) --> (rolling
> sink)
>
>
> You can put this all into one operator that accumulates the session
> elements but still immediately emits the new records (the realtime path),
> if you implement your own windowing/buffering in a custom function.
> This is also very easy to put onto event time then, which makes it
> valueable to process the history (replay). For this second case, still
> prototyping some code for the event time case, give me a bit, I'll get back
> at you...
>
> Greetings,
> Stephan
>
>
> On Tue, Dec 1, 2015 at 10:55 AM, Niels Basjes  wrote:
>
>> Hi Stephan,
>>
>> I created a first version of the Visit ID assignment like this:
>>
>> First I group by sessionid and I create a Window per visit.
>> The custom Trigger for this window does a 'FIRE' after each element and
>> sets an EventTimer on the 'next possible moment the visit can expire'.
>> To avoid getting 'all events' in the visit after every 'FIRE' I'm using
>> CountEvictor.of(1).
>> When the visit expires I do a PURGE. So if there are more events
>> afterwards for the same sessionId I get a new visit (which is exactly what
>> I want).
>>
>> The last step I do is I want to have a 'normal' DataStream again to work
>> with.
>> I created this WindowFunction to map the Window stream back to  normal
>> DataStream
>> Essentially I do this:
>>
>> DataStream visitDataStream = visitWindowedStream.apply(new
>> WindowToStream())
>>
>> // This is an identity 'apply'
>> private static class WindowToStream implements WindowFunction> String, GlobalWindow> {
>> @Override
>> public void apply(String s, GlobalWindow window, Iterable values,
>> Collector out) throws Exception {
>> for (T value: values) {
>> out.collect(value);
>> }
>> }
>> }
>>
>>
>> The problem with this is that I first create the visitIds in a Window
>> (great).
>> Because I really need to have both the Windowed events AND the near
>> realtime version I currently break down the Window to get the single events
>> and after that I have to recreate the same Window again.
>>
>> I'm looking forward to the implementation direction you are referring to.
>> I hope you have a better way of doing this.
>>
>> Niels Basjes
>>
>>
>> On Mon, Nov 30, 2015 at 9:29 PM, Stephan Ewen  wrote:
>>
>>> Hi Niels!
>>>
>>> Nice use case that you have!
>>> I think you can solve this super nicely with Flink, such that "replay"
>>> and "realtime" are literally the same program - they differ only in whether
>>>
>>> Event time is, like you said, the key thing for "replay". Event time
>>> depends on the progress in the timestamps of the data, so it can progress
>>> at different speeds, depending on what the rate of your stream is.
>>> With the appropriate data source, it will progress very fast in "replay
>>> mode", so that you replay in "fast forward speed", and it progresses at the
>>> same speed as processing time when you attach to the end of the Kafka queue.
>>>
>>> When you define the time intervals in your program to react to event
>>> time progress, then you will compute the right sessionization in both
>>> replay and real time settings.
>>>
>>> I am writing a little example code to share. The type of ID-assignment
>>> sessions you want to do need an undocumented API right now, so I'll prepare
>>> something there for you...
>>>
>>> Greetings,
>>> Stephan
>>>
>>>
>>>
>>> On Sun, Nov 29, 2015 at 4:04 PM, Niels Basjes  wrote:
>>>
 Hi,

 The sessionid is present in the measurements. It can also be seen as a
 form of 'browser id'.
 Most websites use either a 'long lived random value in a cookie' or a
 'application session id' for this.

 So with the id of the browser in hand I have the need to group all
 events into "periods of activity" which I call a visit.
 Such a visit is a bounded subset of all events from a single browser.

 What I need is to add a (sort of) random visit id to the events that
 becomes 'inactive' after more than X minutes of inactivity.
 I then want to add this visitid to each event and
 1) stream them out in realtime
 

Re: Cleanup of OperatorStates?

2015-12-01 Thread Stephan Ewen
Hi Niels!

I have a pretty nice example for you here:
https://github.com/StephanEwen/sessionization

It keeps only one state and has the structure:


(source) --> (window sessions) ---> (real time sink)
                    |
                    +--> (15 minute files)


The real time sink gets the event with attached visitId immediately. The
session operator, as a side effect, writes out the 15 minute files with
sessions that expired in that time.


It is not a lot of code, the two main parts are

  - the program and the program skeleton:
https://github.com/StephanEwen/sessionization/blob/master/src/main/java/com/dataartisans/streaming/sessionization/EventTimeSessionization.java
  - the sessionizing and file writing operator:
https://github.com/StephanEwen/sessionization/blob/master/src/main/java/com/dataartisans/streaming/sessionization/SessionizingOperator.java


The example runs fully on event time, where the timestamps are extracted
from the records. That makes this program very robust (no issue with
clocks, etc).

Also, here comes the amazing part: The same program should do "replay" and
real time. The only difference is what input you give it. Since time is
event time, it can do both.


One note:
  - Event Time Watermarks are the mechanism to signal progress in event
time. It is simple here, because I assume that timestamps are ascending in
a Kafka partition. If that is not the case, you need to implement a more
elaborate TimestampExtractor.


Hope you can work with this!

Greetings,
Stephan


On Tue, Dec 1, 2015 at 1:00 PM, Stephan Ewen  wrote:

> Just for clarification: The real-time results should also contain the
> visitId, correct?
>
> On Tue, Dec 1, 2015 at 12:06 PM, Stephan Ewen  wrote:
>
>> Hi Niels!
>>
>> If you want to use the built-in windowing, you probably need two window:
>>   - One for ID assignment (that immediately pipes elements through)
>>   - One for accumulating session elements, and then piping them into
>> files upon session end.
>>
>> You may be able to use the rolling file sink (roll by 15 minutes) to
>> store the files.
>> That is probably the simplest to implement and will serve the real time
>> case.
>>
>>
>> +--> (real time sink)
>> |
>> (source) --> (window session ids) --+
>> |
>> +--> (window session) --> (rolling
>> sink)
>>
>>
>> You can put this all into one operator that accumulates the session
>> elements but still immediately emits the new records (the realtime path),
>> if you implement your own windowing/buffering in a custom function.
>> This is also very easy to put onto event time then, which makes it
>> valueable to process the history (replay). For this second case, still
>> prototyping some code for the event time case, give me a bit, I'll get back
>> at you...
>>
>> Greetings,
>> Stephan
>>
>>
>> On Tue, Dec 1, 2015 at 10:55 AM, Niels Basjes  wrote:
>>
>>> Hi Stephan,
>>>
>>> I created a first version of the Visit ID assignment like this:
>>>
>>> First I group by sessionid and I create a Window per visit.
>>> The custom Trigger for this window does a 'FIRE' after each element and
>>> sets an EventTimer on the 'next possible moment the visit can expire'.
>>> To avoid getting 'all events' in the visit after every 'FIRE' I'm using
>>> CountEvictor.of(1).
>>> When the visit expires I do a PURGE. So if there are more events
>>> afterwards for the same sessionId I get a new visit (which is exactly what
>>> I want).
>>>
>>> The last step I do is I want to have a 'normal' DataStream again to work
>>> with.
>>> I created this WindowFunction to map the Window stream back to  normal
>>> DataStream
>>> Essentially I do this:
>>>
>>> DataStream visitDataStream = visitWindowedStream.apply(new
>>> WindowToStream())
>>>
>>> // This is an identity 'apply'
>>> private static class WindowToStream implements WindowFunction>> String, GlobalWindow> {
>>> @Override
>>> public void apply(String s, GlobalWindow window, Iterable values,
>>> Collector out) throws Exception {
>>> for (T value: values) {
>>> out.collect(value);
>>> }
>>> }
>>> }
>>>
>>>
>>> The problem with this is that I first create the visitIds in a Window
>>> (great).
>>> Because I really need to have both the Windowed events AND the near
>>> realtime version I currently break down the Window to get the single events
>>> and after that I have to recreate the same Window again.
>>>
>>> I'm looking forward to the implementation direction you are referring
>>> to. I hope you have a better way of doing this.
>>>
>>> Niels Basjes
>>>
>>>
>>> On Mon, Nov 30, 2015 at 9:29 PM, Stephan Ewen  wrote:
>>>
 Hi Niels!

 Nice use case that you have!
 I think you can solve this super nicely with Flink, such that "replay"
 and "realtime" are 

Re: NPE with Flink Streaming from Kafka

2015-12-01 Thread Robert Metzger
Hi Mihail,

the issue is actually a bug in Kafka. We have a JIRA in Flink for this as
well: https://issues.apache.org/jira/browse/FLINK-3067

Sadly, we haven't released a fix for it yet. Flink 0.10.2 will contain a
fix.

Since the kafka connector is not contained in the flink binary, you can
just set the version in your maven pom file to 0.10-SNAPSHOT. Maven will
then download the code planned for the 0.10-SNAPSHOT release.

On Tue, Dec 1, 2015 at 4:54 PM, Vieru, Mihail 
wrote:

> Hi,
>
> we get the following NullPointerException after ~50 minutes when running a
> streaming job with windowing and state that reads data from Kafka and
> writes the result to local FS.
> There are around 170 million messages to be processed, Flink 0.10.1 stops
> at ~8 million.
> Flink runs locally, started with the "start-cluster-streaming.sh" script.
>
> 12/01/2015 15:06:24  Job execution switched to status RUNNING.
> 12/01/2015 15:06:24  Source: Custom Source -> Map -> Map(1/1) switched
> to SCHEDULED
> 12/01/2015 15:06:24  Source: Custom Source -> Map -> Map(1/1) switched
> to DEPLOYING
> 12/01/2015 15:06:24  Fast TumblingTimeWindows(5000) of Reduce at
> main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to
> SCHEDULED
> 12/01/2015 15:06:24  Fast TumblingTimeWindows(5000) of Reduce at
> main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to
> DEPLOYING
> 12/01/2015 15:06:24  Source: Custom Source -> Map -> Map(1/1) switched
> to RUNNING
> 12/01/2015 15:06:24  Fast TumblingTimeWindows(5000) of Reduce at
> main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to
> RUNNING
> 12/01/2015 15:56:08  Fast TumblingTimeWindows(5000) of Reduce at
> main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to
> CANCELED
> 12/01/2015 15:56:08  Source: Custom Source -> Map -> Map(1/1) switched
> to FAILED
> java.lang.Exception
> at
> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:397)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
> at
> org.I0Itec.zkclient.ZkConnection.writeDataReturnStat(ZkConnection.java:115)
> at org.I0Itec.zkclient.ZkClient$10.call(ZkClient.java:817)
> at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
> at org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)
> at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)
> at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
> at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:332)
> at kafka.utils.ZkUtils.updatePersistentPath(ZkUtils.scala)
> at
> org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:112)
> at
> org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.commit(ZookeeperOffsetHandler.java:80)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer$PeriodicOffsetCommitter.run(FlinkKafkaConsumer.java:632)
>
>
> Any ideas on what could cause this behaviour?
>
> Best,
> Mihail
>


Re: NPE with Flink Streaming from Kafka

2015-12-01 Thread Robert Metzger
Hi Gyula,

no, I didn't ;) We still deploy 0.10-SNAPSHOT versions from the
"release-0.10" branch to Apache's maven snapshot repository.


I don't think Mihail's code will run when he's compiling it against
1.0-SNAPSHOT.


On Tue, Dec 1, 2015 at 5:13 PM, Gyula Fóra  wrote:

> Hi,
>
> I think Robert meant to write setting the connector dependency to
> 1.0-SNAPSHOT.
>
> Cheers,
> Gyula
>
> Robert Metzger  wrote (on Tue, Dec 1, 2015 at 17:10):
>
>> Hi Mihail,
>>
>> the issue is actually a bug in Kafka. We have a JIRA in Flink for this as
>> well: https://issues.apache.org/jira/browse/FLINK-3067
>>
>> Sadly, we haven't released a fix for it yet. Flink 0.10.2 will contain a
>> fix.
>>
>> Since the kafka connector is not contained in the flink binary, you can
>> just set the version in your maven pom file to 0.10-SNAPSHOT. Maven will
>> then download the code planned for the 0.10-SNAPSHOT release.
>>


Re: Cleanup of OperatorStates?

2015-12-01 Thread Niels Basjes
Hi,

The first thing I noticed is that the Session object maintains a list of
all events in memory.
Your events are really small, yet in my scenario the predicted number of
events per session will be above 1000, and each is expected to be in the
512-1024 byte range.
This worried me, but I decided to give your code a run.
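
A rough back-of-the-envelope check makes the concern concrete: at 1000+ events of 512-1024 bytes each, a single open visit buffers roughly 0.5-1 MB, so only a handful of concurrently open sessions is needed to exceed the ~5 MB cap of the memory-backed state (maxSize=5242880) that shows up in the exception below.
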

After a while running it in my IDE (not on cluster) I got this:

17:18:46,336 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering checkpoint 269 @ 1448986726336
17:18:46,587 INFO  org.apache.flink.runtime.taskmanager.Task  - sessionization -> Sink: Unnamed (4/4) switched to FAILED with exception.
java.lang.RuntimeException: Error triggering a checkpoint as the result of
receiving checkpoint barrier
at
org.apache.flink.streaming.runtime.tasks.StreamTask$1.onEvent(StreamTask.java:577)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$1.onEvent(StreamTask.java:570)
at
org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:201)
at
org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:127)
at
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:173)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Size of the state is larger than the
maximum permitted memory-backed state. Size=5246277 , maxSize=5242880 .
Consider using a different state backend, like the File System State
backend.
at
org.apache.flink.runtime.state.memory.MemoryStateBackend.checkSize(MemoryStateBackend.java:130)
at
org.apache.flink.runtime.state.memory.MemoryStateBackend.checkpointStateSerializable(MemoryStateBackend.java:108)
at
com.dataartisans.streaming.sessionization.SessionizingOperator.snapshotOperatorState(SessionizingOperator.java:162)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:440)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$1.onEvent(StreamTask.java:574)
... 8 more


Niels
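
The exception message already names the way out: snapshotting this much per-window state needs the file system state backend rather than the default memory-backed one, which caps each snapshot at a few MB. A minimal sketch of the switch, assuming the 0.10-era FsStateBackend class and StreamExecutionEnvironment.setStateBackend method (the checkpoint path and class name are placeholders):

    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class SessionizationWithFsBackend {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(5000); // keep whatever interval the job already uses

            // Checkpoint operator state to a file system instead of the size-capped
            // memory backend; hdfs:// or file:// URIs both work, the path is a placeholder.
            env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));

            // ... build the sessionization topology on env as before ...
            env.execute("sessionization");
        }
    }

The same switch can also be made cluster-wide in flink-conf.yaml (a state.backend: filesystem entry plus a checkpoint directory), though the exact keys should be checked against the docs for the version in use.
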



On Tue, Dec 1, 2015 at 4:41 PM, Niels Basjes  wrote:

> Thanks!
> I'm going to study this code closely!
>
> Niels
>
> On Tue, Dec 1, 2015 at 2:50 PM, Stephan Ewen  wrote:
>
>> Hi Niels!
>>
>> I have a pretty nice example for you here:
>> https://github.com/StephanEwen/sessionization
>>
>> It keeps only one state and has the structure:
>>
>>
>> (source) --> (window sessions) ---> (real time sink)
>>                      |
>>                      +--> (15 minute files)
>>
>>
>> The real time sink gets the event with attached visitId immediately. The
>> session operator, as a side effect, writes out the 15 minute files with
>> sessions that expired in that time.
>>
>>
>> It is not a lot of code, the two main parts are
>>
>>   - the program and the program skeleton:
>> https://github.com/StephanEwen/sessionization/blob/master/src/main/java/com/dataartisans/streaming/sessionization/EventTimeSessionization.java
>>   - the sessionizing and file writing operator:
>> https://github.com/StephanEwen/sessionization/blob/master/src/main/java/com/dataartisans/streaming/sessionization/SessionizingOperator.java
>>
>>
>> The example runs fully on event time, where the timestamps are extracted
>> from the records. That makes this program very robust (no issue with
>> clocks, etc).
>>
>> Also, here comes the amazing part: The same program should do "replay"
>> and real time. The only difference is what input you give it. Since time is
>> event time, it can do both.
>>
>>
>> One note:
>>   - Event Time Watermarks are the mechanism to signal progress in event
>> time. It is simple here, because I assume that timestamps are ascending in
>> a Kafka partition. If that is not the case, you need to implement a more
>> elaborate TimestampExtractor.
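
A sketch of what such a "more elaborate" extractor could look like for input that is only slightly out of order, assuming the three-method TimestampExtractor contract of the 0.10 API (per-element timestamp, optional per-element watermark, periodic watermark query); the abstract timestamp accessor and the 10-second bound are placeholders:

    import org.apache.flink.streaming.api.functions.TimestampExtractor;

    // Track the highest timestamp seen and report watermarks that lag it by a
    // fixed bound, so records up to that much out of order are still handled.
    public abstract class BoundedLatenessExtractor<T> implements TimestampExtractor<T> {

        private static final long MAX_OUT_OF_ORDERNESS_MS = 10_000; // assumed bound

        private long maxSeenTimestamp = Long.MIN_VALUE;

        // how to read the event time from a record is left to the concrete subclass
        protected abstract long timestampOf(T element);

        @Override
        public long extractTimestamp(T element, long currentTimestamp) {
            long ts = timestampOf(element);
            maxSeenTimestamp = Math.max(maxSeenTimestamp, ts);
            return ts;
        }

        @Override
        public long extractWatermark(T element, long currentTimestamp) {
            // no per-element watermark; rely on the periodic one below
            return Long.MIN_VALUE;
        }

        @Override
        public long getCurrentWatermark() {
            // event time has progressed to "highest timestamp seen minus the bound"
            return maxSeenTimestamp == Long.MIN_VALUE
                    ? Long.MIN_VALUE
                    : maxSeenTimestamp - MAX_OUT_OF_ORDERNESS_MS;
        }
    }
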
>>
>>
>> Hope you can work with this!
>>
>> Greetings,
>> Stephan
>>
>>
>> On Tue, Dec 1, 2015 at 1:00 PM, Stephan Ewen  wrote:
>>
>>> Just for clarification: The real-time results should also contain the
>>> visitId, correct?
>>>
>>> On Tue, Dec 1, 2015 at 12:06 PM, Stephan Ewen  wrote:
>>>
 Hi Niels!

 If you want to use the built-in windowing, you probably need two windows:
   - One for ID assignment (that immediately pipes elements through)
   - One for accumulating session elements, and then piping them into
 files upon session end.

 You may be able to use the rolling file sink (roll by 15 minutes) to
 store the files.
 That is probably the simplest to implement and will serve the real time
 case.
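
For reference, the rolling-sink variant could look roughly like the sketch below. The class and import names follow the 0.10-era flink-connector-filesystem module and are assumptions; DateTimeBucketer buckets by a date pattern (here one directory per hour), so exact 15-minute buckets would need a small custom Bucketer.

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.connectors.fs.DateTimeBucketer;
    import org.apache.flink.streaming.connectors.fs.RollingSink;

    public class VisitFileSink {
        // Attach a rolling file sink to the (already sessionized) visit stream.
        // The base path and the String element type are placeholders.
        public static void addVisitFileSink(DataStream<String> visits, String basePath) {
            RollingSink<String> sink = new RollingSink<String>(basePath);
            sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HH"));
            visits.addSink(sink);
        }
    }
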


                 +--> (real time sink)
                 |
 (source) --> (window session 

Re: NPE with Flink Streaming from Kafka

2015-12-01 Thread Maximilian Michels
I know this has been fixed already but, out of curiosity, could you
point me to the Kafka JIRA issue for this
bug? From the Flink issue it looks like this is a Zookeeper version mismatch.

On Tue, Dec 1, 2015 at 5:16 PM, Robert Metzger  wrote:
> Hi Gyula,
>
> no, I didn't ;) We still deploy 0.10-SNAPSHOT versions from the
> "release-0.10" branch to Apache's maven snapshot repository.
>
>
> I don't think Mihail's code will run when he's compiling it against
> 1.0-SNAPSHOT.
>
>
> On Tue, Dec 1, 2015 at 5:13 PM, Gyula Fóra  wrote:
>>
>> Hi,
>>
>> I think Robert meant to write setting the connector dependency to
>> 1.0-SNAPSHOT.
>>
>> Cheers,
>> Gyula
>>
>> Robert Metzger  wrote (on Tue, Dec 1, 2015 at 17:10):
>>>
>>> Hi Mihail,
>>>
>>> the issue is actually a bug in Kafka. We have a JIRA in Flink for this as
>>> well: https://issues.apache.org/jira/browse/FLINK-3067
>>>
>>> Sadly, we haven't released a fix for it yet. Flink 0.10.2 will contain a
>>> fix.
>>>
>>> Since the kafka connector is not contained in the flink binary, you can
>>> just set the version in your maven pom file to 0.10-SNAPSHOT. Maven will
>>> then download the code planned for the 0.10-SNAPSHOT release.
>>>


Re: NPE with Flink Streaming from Kafka

2015-12-01 Thread Robert Metzger
I think it's this one: https://issues.apache.org/jira/browse/KAFKA-824

On Tue, Dec 1, 2015 at 5:37 PM, Maximilian Michels  wrote:

> I know this has been fixed already but, out of curiosity, could you
> point me to the Kafka JIRA issue for this
> bug? From the Flink issue it looks like this is a Zookeeper version
> mismatch.
>
> On Tue, Dec 1, 2015 at 5:16 PM, Robert Metzger 
> wrote:
> > Hi Gyula,
> >
> > no, I didn't ;) We still deploy 0.10-SNAPSHOT versions from the
> > "release-0.10" branch to Apache's maven snapshot repository.
> >
> >
> > I don't think Mihail's code will run when he's compiling it against
> > 1.0-SNAPSHOT.
> >
> >
> > On Tue, Dec 1, 2015 at 5:13 PM, Gyula Fóra  wrote:
> >>
> >> Hi,
> >>
> >> I think Robert meant to write setting the connector dependency to
> >> 1.0-SNAPSHOT.
> >>
> >> Cheers,
> >> Gyula
> >>
> >> Robert Metzger  wrote (on Tue, Dec 1, 2015 at 17:10):
> >>>
> >>> Hi Mihail,
> >>>
> >>> the issue is actually a bug in Kafka. We have a JIRA in Flink for this
> as
> >>> well: https://issues.apache.org/jira/browse/FLINK-3067
> >>>
> >>> Sadly, we haven't released a fix for it yet. Flink 0.10.2 will contain
> a
> >>> fix.
> >>>
> >>> Since the kafka connector is not contained in the flink binary, you can
> >>> just set the version in your maven pom file to 0.10-SNAPSHOT. Maven
> will
> >>> then download the code planned for the 0.10-SNAPSHOT release.
> >>>

Re: NPE with Flink Streaming from Kafka

2015-12-01 Thread Maximilian Michels
Thanks! I've linked the issue in JIRA.

On Tue, Dec 1, 2015 at 5:39 PM, Robert Metzger  wrote:
> I think it's this one: https://issues.apache.org/jira/browse/KAFKA-824
>
> On Tue, Dec 1, 2015 at 5:37 PM, Maximilian Michels  wrote:
>>
>> I know this has been fixed already but, out of curiosity, could you
>> point me to the Kafka JIRA issue for this
>> bug? From the Flink issue it looks like this is a Zookeeper version
>> mismatch.
>>
>> On Tue, Dec 1, 2015 at 5:16 PM, Robert Metzger 
>> wrote:
>> > Hi Gyula,
>> >
>> > no, I didn't ;) We still deploy 0.10-SNAPSHOT versions from the
>> > "release-0.10" branch to Apache's maven snapshot repository.
>> >
>> >
>> > I don't think Mihail's code will run when he's compiling it against
>> > 1.0-SNAPSHOT.
>> >
>> >
>> > On Tue, Dec 1, 2015 at 5:13 PM, Gyula Fóra  wrote:
>> >>
>> >> Hi,
>> >>
>> >> I think Robert meant to write setting the connector dependency to
>> >> 1.0-SNAPSHOT.
>> >>
>> >> Cheers,
>> >> Gyula
>> >>
>> >> Robert Metzger  wrote (on Tue, Dec 1, 2015 at 17:10):
>> >>>
>> >>> Hi Mihail,
>> >>>
>> >>> the issue is actually a bug in Kafka. We have a JIRA in Flink for this
>> >>> as
>> >>> well: https://issues.apache.org/jira/browse/FLINK-3067
>> >>>
>> >>> Sadly, we haven't released a fix for it yet. Flink 0.10.2 will contain
>> >>> a
>> >>> fix.
>> >>>
>> >>> Since the kafka connector is not contained in the flink binary, you
>> >>> can
>> >>> just set the version in your maven pom file to 0.10-SNAPSHOT. Maven
>> >>> will
>> >>> then download the code planned for the 0.10-SNAPSHOT release.
>> >>>