Re: [VOTE] Release Apache Flink 1.2.1 (RC1)

2017-03-30 Thread Aljoscha Krettek
https://issues.apache.org/jira/browse/FLINK-6188 turns out to be a bit
more involved, see my comments on the PR:
https://github.com/apache/flink/pull/3616.

As I said there, maybe we should revert the commits regarding
parallelism/max-parallelism changes and release and then fix it later.

On Wed, Mar 29, 2017, at 23:08, Aljoscha Krettek wrote:
> I commented on FLINK-6214: I think it's working as intended, although we
> could fix the javadoc/doc.
> 
> On Wed, Mar 29, 2017, at 17:35, Timo Walther wrote:
> > A user reported that all tumbling and slinding window assigners contain 
> > a pretty obvious bug about offsets.
> > 
> > https://issues.apache.org/jira/browse/FLINK-6214
> > 
> > I think we should also fix this for 1.2.1. What do you think?
> > 
> > Regards,
> > Timo
> > 
> > 
> > Am 29/03/17 um 11:30 schrieb Robert Metzger:
> > > Hi Haohui,
> > > I agree that we should fix the parallelism issue. Otherwise, the 1.2.1
> > > release would introduce a new bug.
> > >
> > > On Tue, Mar 28, 2017 at 11:59 PM, Haohui Mai <ricet...@gmail.com> wrote:
> > >
> > >> -1 (non-binding)
> > >>
> > >> We recently found out that all jobs submitted via UI will have a
> > >> parallelism of 1, potentially due to FLINK-5808.
> > >>
> > >> Filed FLINK-6209 to track it.
> > >>
> > >> ~Haohui
> > >>
> > >> On Mon, Mar 27, 2017 at 2:59 AM Chesnay Schepler <ches...@apache.org>
> > >> wrote:
> > >>
> > >>> If possible I would like to include FLINK-6183 & FLINK-6184 as well.
> > >>>
> > >>> They fix 2 metric-related issues that could arise when a Task is
> > >>> cancelled very early. (like, right away)
> > >>>
> > >>> FLINK-6183 fixes a memory leak where the TaskMetricGroup was never 
> > >>> closed
> > >>> FLINK-6184 fixes a NullPointerExceptions in the buffer metrics
> > >>>
> > >>> PR here: https://github.com/apache/flink/pull/3611
> > >>>
> > >>> On 26.03.2017 12:35, Aljoscha Krettek wrote:
> > >>>> I opened a PR for FLINK-6188: https://github.com/apache/
> > >> flink/pull/3616
> > >>> <https://github.com/apache/flink/pull/3616>
> > >>>> This improves the previously very sparse test coverage for
> > >>> timestamp/watermark assigners and fixes the bug.
> > >>>>> On 25 Mar 2017, at 10:22, Ufuk Celebi <u...@apache.org> wrote:
> > >>>>>
> > >>>>> I agree with Aljoscha.
> > >>>>>
> > >>>>> -1 because of FLINK-6188
> > >>>>>
> > >>>>>
> > >>>>> On Sat, Mar 25, 2017 at 9:38 AM, Aljoscha Krettek <
> > >> aljos...@apache.org>
> > >>> wrote:
> > >>>>>> I filed this issue, which was observed by a user:
> > >>> https://issues.apache.org/jira/browse/FLINK-6188
> > >>>>>> I think that’s blocking for 1.2.1.
> > >>>>>>
> > >>>>>>> On 24 Mar 2017, at 18:57, Ufuk Celebi <u...@apache.org> wrote:
> > >>>>>>>
> > >>>>>>> RC1 doesn't contain Stefan's backport for the Asynchronous snapshots
> > >>>>>>> for heap-based keyed state that has been merged. Should we create
> > >> RC2
> > >>>>>>> with that fix since the voting period only starts on Monday? I think
> > >>>>>>> it would only mean rerunning the scripts on your side, right?
> > >>>>>>>
> > >>>>>>> – Ufuk
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On Fri, Mar 24, 2017 at 3:05 PM, Robert Metzger <
> > >> rmetz...@apache.org>
> > >>> wrote:
> > >>>>>>>> Dear Flink community,
> > >>>>>>>>
> > >>>>>>>> Please vote on releasing the following candidate as Apache Flink
> > >>> version 1.2
> > >>>>>>>> .1.
> > >>>>>>>>
> > >>>>>>>> The commit to be voted on:
> > >>>>>>>> *732e55bd* (*
> > >>> http://git-wip-us.apache.org/repos/asf/flink/commit/732e55bd
> > >>>>>>>> <http://git-wip-us.apache.org/repos/asf/flink/commit/732e55bd>*)
> > >>>>>>>>
> > >>>>>>>> Branch:
> > >>>>>>>> release-1.2.1-rc1
> > >>>>>>>>
> > >>>>>>>> The release artifacts to be voted on can be found at:
> > >>>>>>>> *http://people.apache.org/~rmetzger/flink-1.2.1-rc1/
> > >>>>>>>> <http://people.apache.org/~rmetzger/flink-1.2.1-rc1/>*
> > >>>>>>>>
> > >>>>>>>> The release artifacts are signed with the key with fingerprint
> > >>> D9839159:
> > >>>>>>>> http://www.apache.org/dist/flink/KEYS
> > >>>>>>>>
> > >>>>>>>> The staging repository for this release can be found at:
> > >>>>>>>>
> > >>> https://repository.apache.org/content/repositories/orgapacheflink-1116
> > >>>>>>>> -
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> The vote ends on Wednesday, March 29, 2017, 3pm CET.
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> [ ] +1 Release this package as Apache Flink 1.2.1
> > >>>>>>>> [ ] -1 Do not release this package, because ...
> > >>>
> > 


Re: [VOTE] Release Apache Flink 1.2.1 (RC1)

2017-03-29 Thread Aljoscha Krettek
I commented on FLINK-6214: I think it's working as intended, although we
could fix the javadoc/doc.

On Wed, Mar 29, 2017, at 17:35, Timo Walther wrote:
> A user reported that all tumbling and slinding window assigners contain 
> a pretty obvious bug about offsets.
> 
> https://issues.apache.org/jira/browse/FLINK-6214
> 
> I think we should also fix this for 1.2.1. What do you think?
> 
> Regards,
> Timo
> 
> 
> Am 29/03/17 um 11:30 schrieb Robert Metzger:
> > Hi Haohui,
> > I agree that we should fix the parallelism issue. Otherwise, the 1.2.1
> > release would introduce a new bug.
> >
> > On Tue, Mar 28, 2017 at 11:59 PM, Haohui Mai <ricet...@gmail.com> wrote:
> >
> >> -1 (non-binding)
> >>
> >> We recently found out that all jobs submitted via UI will have a
> >> parallelism of 1, potentially due to FLINK-5808.
> >>
> >> Filed FLINK-6209 to track it.
> >>
> >> ~Haohui
> >>
> >> On Mon, Mar 27, 2017 at 2:59 AM Chesnay Schepler <ches...@apache.org>
> >> wrote:
> >>
> >>> If possible I would like to include FLINK-6183 & FLINK-6184 as well.
> >>>
> >>> They fix 2 metric-related issues that could arise when a Task is
> >>> cancelled very early. (like, right away)
> >>>
> >>> FLINK-6183 fixes a memory leak where the TaskMetricGroup was never closed
> >>> FLINK-6184 fixes a NullPointerExceptions in the buffer metrics
> >>>
> >>> PR here: https://github.com/apache/flink/pull/3611
> >>>
> >>> On 26.03.2017 12:35, Aljoscha Krettek wrote:
> >>>> I opened a PR for FLINK-6188: https://github.com/apache/
> >> flink/pull/3616
> >>> <https://github.com/apache/flink/pull/3616>
> >>>> This improves the previously very sparse test coverage for
> >>> timestamp/watermark assigners and fixes the bug.
> >>>>> On 25 Mar 2017, at 10:22, Ufuk Celebi <u...@apache.org> wrote:
> >>>>>
> >>>>> I agree with Aljoscha.
> >>>>>
> >>>>> -1 because of FLINK-6188
> >>>>>
> >>>>>
> >>>>> On Sat, Mar 25, 2017 at 9:38 AM, Aljoscha Krettek <
> >> aljos...@apache.org>
> >>> wrote:
> >>>>>> I filed this issue, which was observed by a user:
> >>> https://issues.apache.org/jira/browse/FLINK-6188
> >>>>>> I think that’s blocking for 1.2.1.
> >>>>>>
> >>>>>>> On 24 Mar 2017, at 18:57, Ufuk Celebi <u...@apache.org> wrote:
> >>>>>>>
> >>>>>>> RC1 doesn't contain Stefan's backport for the Asynchronous snapshots
> >>>>>>> for heap-based keyed state that has been merged. Should we create
> >> RC2
> >>>>>>> with that fix since the voting period only starts on Monday? I think
> >>>>>>> it would only mean rerunning the scripts on your side, right?
> >>>>>>>
> >>>>>>> – Ufuk
> >>>>>>>
> >>>>>>>
> >>>>>>> On Fri, Mar 24, 2017 at 3:05 PM, Robert Metzger <
> >> rmetz...@apache.org>
> >>> wrote:
> >>>>>>>> Dear Flink community,
> >>>>>>>>
> >>>>>>>> Please vote on releasing the following candidate as Apache Flink
> >>> version 1.2
> >>>>>>>> .1.
> >>>>>>>>
> >>>>>>>> The commit to be voted on:
> >>>>>>>> *732e55bd* (*
> >>> http://git-wip-us.apache.org/repos/asf/flink/commit/732e55bd
> >>>>>>>> <http://git-wip-us.apache.org/repos/asf/flink/commit/732e55bd>*)
> >>>>>>>>
> >>>>>>>> Branch:
> >>>>>>>> release-1.2.1-rc1
> >>>>>>>>
> >>>>>>>> The release artifacts to be voted on can be found at:
> >>>>>>>> *http://people.apache.org/~rmetzger/flink-1.2.1-rc1/
> >>>>>>>> <http://people.apache.org/~rmetzger/flink-1.2.1-rc1/>*
> >>>>>>>>
> >>>>>>>> The release artifacts are signed with the key with fingerprint
> >>> D9839159:
> >>>>>>>> http://www.apache.org/dist/flink/KEYS
> >>>>>>>>
> >>>>>>>> The staging repository for this release can be found at:
> >>>>>>>>
> >>> https://repository.apache.org/content/repositories/orgapacheflink-1116
> >>>>>>>> -
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> The vote ends on Wednesday, March 29, 2017, 3pm CET.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> [ ] +1 Release this package as Apache Flink 1.2.1
> >>>>>>>> [ ] -1 Do not release this package, because ...
> >>>
> 


Re: [VOTE] Release Apache Flink 1.2.1 (RC1)

2017-03-26 Thread Aljoscha Krettek
I opened a PR for FLINK-6188: https://github.com/apache/flink/pull/3616 
<https://github.com/apache/flink/pull/3616>

This improves the previously very sparse test coverage for timestamp/watermark 
assigners and fixes the bug.

> On 25 Mar 2017, at 10:22, Ufuk Celebi <u...@apache.org> wrote:
> 
> I agree with Aljoscha.
> 
> -1 because of FLINK-6188
> 
> 
> On Sat, Mar 25, 2017 at 9:38 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>> I filed this issue, which was observed by a user: 
>> https://issues.apache.org/jira/browse/FLINK-6188
>> 
>> I think that’s blocking for 1.2.1.
>> 
>>> On 24 Mar 2017, at 18:57, Ufuk Celebi <u...@apache.org> wrote:
>>> 
>>> RC1 doesn't contain Stefan's backport for the Asynchronous snapshots
>>> for heap-based keyed state that has been merged. Should we create RC2
>>> with that fix since the voting period only starts on Monday? I think
>>> it would only mean rerunning the scripts on your side, right?
>>> 
>>> – Ufuk
>>> 
>>> 
>>> On Fri, Mar 24, 2017 at 3:05 PM, Robert Metzger <rmetz...@apache.org> wrote:
>>>> Dear Flink community,
>>>> 
>>>> Please vote on releasing the following candidate as Apache Flink version 
>>>> 1.2
>>>> .1.
>>>> 
>>>> The commit to be voted on:
>>>> *732e55bd* (*http://git-wip-us.apache.org/repos/asf/flink/commit/732e55bd
>>>> <http://git-wip-us.apache.org/repos/asf/flink/commit/732e55bd>*)
>>>> 
>>>> Branch:
>>>> release-1.2.1-rc1
>>>> 
>>>> The release artifacts to be voted on can be found at:
>>>> *http://people.apache.org/~rmetzger/flink-1.2.1-rc1/
>>>> <http://people.apache.org/~rmetzger/flink-1.2.1-rc1/>*
>>>> 
>>>> The release artifacts are signed with the key with fingerprint D9839159:
>>>> http://www.apache.org/dist/flink/KEYS
>>>> 
>>>> The staging repository for this release can be found at:
>>>> https://repository.apache.org/content/repositories/orgapacheflink-1116
>>>> 
>>>> -
>>>> 
>>>> 
>>>> The vote ends on Wednesday, March 29, 2017, 3pm CET.
>>>> 
>>>> 
>>>> [ ] +1 Release this package as Apache Flink 1.2.1
>>>> [ ] -1 Do not release this package, because ...
>> 



Re: [VOTE] Release Apache Flink 1.2.1 (RC1)

2017-03-25 Thread Aljoscha Krettek
I filed this issue, which was observed by a user: 
https://issues.apache.org/jira/browse/FLINK-6188

I think that’s blocking for 1.2.1.

> On 24 Mar 2017, at 18:57, Ufuk Celebi  wrote:
> 
> RC1 doesn't contain Stefan's backport for the Asynchronous snapshots
> for heap-based keyed state that has been merged. Should we create RC2
> with that fix since the voting period only starts on Monday? I think
> it would only mean rerunning the scripts on your side, right?
> 
> – Ufuk
> 
> 
> On Fri, Mar 24, 2017 at 3:05 PM, Robert Metzger  wrote:
>> Dear Flink community,
>> 
>> Please vote on releasing the following candidate as Apache Flink version 1.2
>> .1.
>> 
>> The commit to be voted on:
>> *732e55bd* (*http://git-wip-us.apache.org/repos/asf/flink/commit/732e55bd
>> *)
>> 
>> Branch:
>> release-1.2.1-rc1
>> 
>> The release artifacts to be voted on can be found at:
>> *http://people.apache.org/~rmetzger/flink-1.2.1-rc1/
>> *
>> 
>> The release artifacts are signed with the key with fingerprint D9839159:
>> http://www.apache.org/dist/flink/KEYS
>> 
>> The staging repository for this release can be found at:
>> https://repository.apache.org/content/repositories/orgapacheflink-1116
>> 
>> -
>> 
>> 
>> The vote ends on Wednesday, March 29, 2017, 3pm CET.
>> 
>> 
>> [ ] +1 Release this package as Apache Flink 1.2.1
>> [ ] -1 Do not release this package, because ...



[jira] [Created] (FLINK-6188) Some setParallelism() method can't cope with default parallelism

2017-03-25 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-6188:
---

 Summary: Some setParallelism() method can't cope with default 
parallelism
 Key: FLINK-6188
 URL: https://issues.apache.org/jira/browse/FLINK-6188
 Project: Flink
  Issue Type: Bug
  Components: DataStream API
Affects Versions: 1.2.1
Reporter: Aljoscha Krettek
 Fix For: 1.2.1


Recent changes done for FLINK-5808 move default parallelism manifestation from 
eager to lazy, that is, the parallelism of operations that don't have an 
explicit parallelism is only set when generating the JobGraph. Some 
`setParallelism()` calls, such as `SingleOutputStreamOperator.setParallelism()` 
cannot deal with the fact that the parallelism of an operation might be {{-1}} 
(which indicates that it should take the default parallelism when generating 
the JobGraph).

We should either revert the changes that fixed another user-facing bug for 
version 1.2.1 or fix the methods.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6164) Make ProcessWindowFunction a RichFunction

2017-03-22 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-6164:
---

 Summary: Make ProcessWindowFunction a RichFunction
 Key: FLINK-6164
 URL: https://issues.apache.org/jira/browse/FLINK-6164
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API
Reporter: Aljoscha Krettek
 Fix For: 1.3.0


{{ProcessWindowFunction}} is an abstract class so we can make it a 
{{RichFunction}} by default and remove {{RichProcessWindowFunction}}. This is 
in line with {{ProcessFunction}} which is also a {{RichFunction}}.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6163) Document per-window state in ProcessWindowFunction

2017-03-22 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-6163:
---

 Summary: Document per-window state in ProcessWindowFunction
 Key: FLINK-6163
 URL: https://issues.apache.org/jira/browse/FLINK-6163
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API, Documentation
Reporter: Aljoscha Krettek
Priority: Blocker
 Fix For: 1.3.0


The current windowing documentation mostly describes {{WindowFunction}} and 
treats {{ProcessWindowFunction}} as an afterthought. We should reverse that and 
also document the new per-key state that is only available to 
{{ProcessWindowFunction}}.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [VOTE] Release Apache Flink 1.1.5 (RC1)

2017-03-21 Thread Aljoscha Krettek
+1

I looked at every commit since 1.1.4. The code looks good and we didn’t 
introduce any new dependencies, so ,legally speaking, we should be fine.
> On 21 Mar 2017, at 14:40, Stephan Ewen  wrote:
> 
> +1
> 
>  - Verified the LICENSE and NOTICE (no dependency updates, update on
> RabbitMQ license)
>  - Release contains no executable binaries
>  - Build and all integration tests pass forHadoop 2.6.1 , Scala 2.11
> 
> @Robert The java8 module has only tests, so it should be fine to not have
> it published.
> 
> On Tue, Mar 21, 2017 at 9:52 AM, Robert Metzger  wrote:
> 
>> +1
>> 
>> - start-local.sh + some example jobs work (logs are fine, web interface
>> works, scripts are fine)
>> - files in the staging repo look good (quickstart is correct) (except java8
>> module is missing, but see comment below)
>> 
>> 
>> Side note: The flink-java8 module is not in the staging repository. I first
>> considered cancelling the vote, but then I realized that since Flink 0.10
>> (november 2015) we never released the module:
>> http://search.maven.org/#search%7Cgav%7C1%7Cg%3A%22org.
>> apache.flink%22%20AND%20a%3A%22flink-java8%22
>> 
>> 
>> On Fri, Mar 17, 2017 at 6:57 PM, Tzu-Li (Gordon) Tai 
>> wrote:
>> 
>>> Dear Flink community,
>>> 
>>> Please vote on releasing the following candidate as Apache Flink version
>>> 1.1.5.
>>> 
>>> The commit to be voted on:
>>> ed18e97 (http://git-wip-us.apache.org/repos/asf/flink/commit/ed18e97)
>>> 
>>> Branch:
>>> release-1.1.5-rc1
>>> (https://git1-us-west.apache.org/repos/asf/flink/repo?p=
>>> flink.git;a=shortlog;h=refs/heads/release-1.1.5-rc1)
>>> 
>>> The release artifacts to be voted on can be found at:
>>> http://people.apache.org/~tzulitai/flink-1.1.5-rc1/
>>> 
>>> The release artifacts are signed with the key with fingerprint B065B356:
>>> http://www.apache.org/dist/flink/KEYS
>>> 
>>> The staging repository for this release can be found at:
>>> https://repository.apache.org/content/repositories/orgapacheflink-1115
>>> 
>>> -
>>> 
>>> The voting time is three days and the vote passes if a
>>> majority of at least three +1 PMC votes are cast.
>>> 
>>> The vote ends on Wednesday, March 22nd, 2017, 6PM CET.
>>> (I’ve started the 3 day count starting from next Monday,
>>> since it’s already near the end of the week).
>>> 
>>> [ ] +1 Release this package as Apache Flink 1.1.5
>>> [ ] -1 Do not release this package, because ...
>>> 
>>> -
>>> 
>>> Short note on testing this RC:
>>> 
>>> Please have a look at the new commits since 1.1.4 and try to
>>> target and test the fixes specifically, too.
>>> That will be much appreciated and helpful.
>>> 
>>> Here’s a list of new commits since 1.1.4 -
>>> 
>>> 6662cc6 [FLINK-5701] [kafka] FlinkKafkaProducer should check
>>> asyncException on checkpoints
>>> e296aca [FLINK-6006] [kafka] Always use complete restored state in
>>> FlinkKafkaConsumer
>>> a34559d [FLINK-5940] [checkpoint] Harden ZooKeeperCompletedCheckpointSt
>> ore.recover
>>> method
>>> 97616fd [FLINK-5942] [checkpoint] Harden ZooKeeperStateHandleStore to
>>> handle corrupt data
>>> e50bf65 [FLINK-6025] [core] Add Flink's own JavaSerializer for Kryo
>>> serialization
>>> 01703e6 [FLINK-5945] [core] Close function in OuterJoinOperatorBase#
>>> executeOnCollections
>>> ba5aa10 [FLINK-5934] Set the Scheduler in the ExecutionGraph via its
>>> constructor
>>> 44f48b3 [FLINK-5771] [core] Fix multi-char delimiter detection in
>>> DelimitedInputFormat.
>>> 3cd7c8e [FLINK-5575] [docs] Add outdated release warning
>>> 3f2860f [docs] Set version to 1.1.4
>>> e1861db [FLINK-5647] Fix RocksDB Backend Cleanup
>>> 0c99e48 [FLINK-5639] [rabbitmq connector] Fix incorrect location of
>>> README.md for backported notice
>>> 743aaf6 [FLINK-5639] [rabbitmq connector] Add a note about MPL 1.1
>> license
>>> of Maven dependency
>>> 6566b63 [FLINK-2662] [optimizer] Fix translation of broadcasted unions.
>>> f6f1c24 [FLINK-5585] [jobmanager] Fix NullPointerException in
>>> JobManager.updateAccumulators
>>> 931929b [FLINK-5484] [serialization] Add test for registered Kryo types
>>> 214c188 [FLINK-5518] [hadoopCompat] Add null check to
>>> HadoopInputFormatBase.close().
>>> 4ea52d6 [FLINK-5466] [webfrontend] Rebuild CSS/JS files
>>> 12cf5dc [FLINK-5466] [webfrontend] Set environment to production in
>>> gulpfile
>>> 
>>> 
>> 



Re: [ANNOUNCE] New committer: Theodore Vasiloudis

2017-03-21 Thread Aljoscha Krettek
Welcome (to committership)! (Since you’re already with us for such a long 
time…) :-)
> On 21 Mar 2017, at 10:08, Tzu-Li (Gordon) Tai  wrote:
> 
> Welcome Theodore! Great to have you on board :-)
> 
> 
> On March 21, 2017 at 4:35:35 PM, Robert Metzger (rmetz...@apache.org) wrote:
> 
> Hi everybody, 
> 
> On behalf of the PMC I am delighted to announce Theodore Vasiloudis as a 
> new Flink committer! 
> 
> Theo has been a community member for a very long time and he is one of the 
> main drivers of the currently ongoing ML discussions in Flink. 
> 
> 
> Welcome Theo and congratulations again for becoming a Flink committer! 
> 
> 
> Regards, 
> Robert 



[jira] [Created] (FLINK-6135) Allowing adding inputs to StreamOperator

2017-03-20 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-6135:
---

 Summary: Allowing adding inputs to StreamOperator
 Key: FLINK-6135
 URL: https://issues.apache.org/jira/browse/FLINK-6135
 Project: Flink
  Issue Type: Sub-task
  Components: DataStream API
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek


As mentioned in 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API
 we need to be able to add additional inputs to a {{StreamOperator}.

There is a (somewhat old) design document here: 
https://docs.google.com/document/d/1ZFzL_0xGuUEnBsFyEiHwWcmCcjhd9ArWsmhrgnt05RI/edit#heading=h.pqg5z6g0mjm7
 and some proof-of-concept code here: 
https://github.com/aljoscha/flink/tree/operator-ng-side-input-wrapper



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6131) Add side inputs for DataStream API

2017-03-20 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-6131:
---

 Summary: Add side inputs for DataStream API
 Key: FLINK-6131
 URL: https://issues.apache.org/jira/browse/FLINK-6131
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek


This is an umbrella issue for tracking the implementation of FLIP-17: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: Future of Queryable State Feature

2017-03-20 Thread Aljoscha Krettek
I think adding this could be as simple as adding a 
makeWindowContentsQueryable(String) call to WindowedStream. In there, we create 
the StateDescriptor for keeping the window contents so we can call 
setQueryable() on the StateDescriptor and set the name under which it should be 
accessible. If you want you can create an issue and change this. I can help you 
if you have any questions.

For custom Triggers, I think you can also call setQueryable() on a 
StateDescriptor.

The tricky thing when it comes to windows is that state is scoped to a window, 
so the querying logic has to take that into account.

I also cc’ed Ufuk and Nico, who probably know best what’s going on with 
queryable state.
> On 16 Mar 2017, at 15:33, Joe Olson  wrote:
> 
> I have a question regarding the future direction of the queryable state 
> feature.
> 
> 
> We are current using this feature in production implemented in a RichFlatMap. 
> It is doing what we need it to do at the scale we need it done in, with the 
> resources we have assigned to it. Win.
> 
> 
> However, we'd also like to use this feature in conjunction with Flink's 
> windowing. The "Rich" execution environment is not exposed in any of the 
> windows / triggers / apply hierarchy, so we cannot expose any of the state 
> managed within the windows outside of Flink. Many of our use cases require us 
> to have access to values as they are being accumulated, as well as the 
> aggregated result.
> 
> 
> We can get by with the RichFlatMap for now. I'd like some clarification as to 
> whether or not the queryable state feature is going to be extended to the 
> windowing components for the next milestone release. This will determine our 
> Flink development milestones for the next few months. From consulting the 
> open items in JIRA, it does not look like it is on the docket.
> 
> 
> I'd be more than willing to help out implementing this feature, but I don't 
> think I have the experience to submit this change on my own.



Re: [DISCUSS] Project build time and possible restructuring

2017-03-20 Thread Aljoscha Krettek
The Beam Jenkins jobs are configured inside the Beam src repo itself. For 
example: 
https://github.com/apache/beam/blob/master/.jenkins/job_beam_PostCommit_Java_RunnableOnService_Flink.groovy

For initial setup of the seed job you need admin rights on Jenkins, as 
described here: https://cwiki.apache.org/confluence/display/INFRA/Jenkins.

The somewhat annoying thing is setting up our own “flink” build slaves and 
maintaining them. There are some general purpose build slaves but 
high-throughput projects usually have their own build slaves to ensure speedy 
processing of Jenkins jobs: 
https://cwiki.apache.org/confluence/display/INFRA/Jenkins+node+labels

> On 20 Mar 2017, at 14:40, Timo Walther <twal...@apache.org> wrote:
> 
> Another solution would be to make the Travis builds more efficient. For 
> example, we could write a script that determines the modified Maven module 
> and only run the test for this module (and maybe transitive dependencies). 
> PRs for libraries such as Gelly, Table, CEP or connectors would not trigger a 
> compilation of the entire stack anymore. Of course this would not solve all 
> problems but many of it.
> 
> What do you think about this?
> 
> 
> 
> Am 20/03/17 um 14:02 schrieb Robert Metzger:
>> Aljoscha, do you know how to configure jenkins?
>> Is Apache INFRA doing that, or are the beam people doing that themselves?
>> 
>> One downside of Jenkins is that we probably need some machines that execute
>> the tests. A Travis container has 2 CPU cores and 4 GB main memory. We
>> currently have 10 such containers available on travis concurrently. I think
>> we would need at least the same amount on Jenkins.
>> 
>> 
>> On Mon, Mar 20, 2017 at 1:48 PM, Timo Walther <twal...@apache.org> wrote:
>> 
>>> I agress with Aljoscha that we might consider moving from Jenkins to
>>> Travis. Is there any disadvantage in using Jenkins?
>>> 
>>> I think we should structure the project according to release management
>>> (e.g. more frequent releases of libraries) or other criteria (e.g. core and
>>> non-core) instead of build time. What would happen if the built of another
>>> submodule would become too long, would we split/restructure again and
>>> again? If Jenkins solves all our problems we should use it.
>>> 
>>> Regards,
>>> Timo
>>> 
>>> 
>>> 
>>> Am 20/03/17 um 12:21 schrieb Aljoscha Krettek:
>>> 
>>>> I prefer Jenkins to Travis by far. Working on Beam, where we have good
>>>> Jenkins integration, has opened my eyes to what is possible with good CI
>>>> integration.
>>>> 
>>>> For example, look at this recent Beam PR: https://github.com/apache/beam
>>>> /pull/2263 <https://github.com/apache/beam/pull/2263>. The
>>>> Jenkins-Github integration will tell you exactly which tests failed and if
>>>> you click on the links you can look at the log output/std out of the tests
>>>> in question.
>>>> 
>>>> This is the overview page of one of the Jenkins Jobs that we have in
>>>> Beam: https://builds.apache.org/job/beam_PostCommit_Java_RunnableO
>>>> nService_Flink/ <https://builds.apache.org/job
>>>> /beam_PostCommit_Java_RunnableOnService_Flink/>. This is an example of a
>>>> stable build: https://builds.apache.org/job/
>>>> beam_PostCommit_Java_RunnableOnService_Flink/lastStableBuild/ <
>>>> https://builds.apache.org/job/beam_PostCommit_Java_Runnable
>>>> OnService_Flink/lastStableBuild/>. Notice how it gives you fine grained
>>>> information about the Maven run. This is an unstable run:
>>>> https://builds.apache.org/job/beam_PostCommit_Java_RunnableO
>>>> nService_Flink/lastUnstableBuild/ <https://builds.apache.org/job
>>>> /beam_PostCommit_Java_RunnableOnService_Flink/lastUnstableBuild/>. There
>>>> you can see which tests failed and you can easily drill down.
>>>> 
>>>> Best,
>>>> Aljoscha
>>>> 
>>>> On 20 Mar 2017, at 11:46, Robert Metzger <rmetz...@apache.org> wrote:
>>>>> Thank you for looking into the build times.
>>>>> 
>>>>> I didn't know that the build time situation is so bad. Even with yarn,
>>>>> mesos, connectors and libraries removed, we are still running into the
>>>>> build timeout :(
>>>>> 
>>>>> Aljoscha told me that the Beam community is using Jenkins for running
>>>>> the tests, and they are planning to completely move away from Travis.

Re: [jira] [Updated] (FLINK-6126) Yet another conflict : guava

2017-03-20 Thread Aljoscha Krettek
So it is resolved now? Thanks for letting us know!
> On 20 Mar 2017, at 12:55, Liangfei Su  wrote:
> 
> Well...Comments added.
> 
> This looks due to incorrect local build of flink.
> 
> Documentation at
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/building.html
> shows
> clearly that
> 
> """
> NOTE: Maven 3.3.x can build Flink, but will not properly shade away certain
> dependencies. Maven 3.0.3 creates the libraries properly. To build unit
> tests with Java 8, use Java 8u51 or above to prevent failures in unit tests
> that use the PowerMock runner.
> 
> """
> My local flink jar was build by maven3.3.3, which the shade of guava is not
> well set. Cause the conflict of elastic search(guava 18) and the packaged
> ones(should be the ones from hadoop related)
> 
> On Mon, Mar 20, 2017 at 7:04 PM, Liangfei Su  wrote:
> 
>> Another finding that it looks most of these kinds of conflict comes from
>> the dependency to hadoop/hbase (by using mvn dependency to check).
>> 
>> It looks to me the correct way is to shade more packages from the hadoop
>> dependencies...  Thoughts here?
>> 
>> Thanks
>> Ralph
>> 
>> 
>> 
>> On Mon, Mar 20, 2017 at 6:59 PM, Liangfei Su  wrote:
>> 
>>> Hi,
>>> 
>>> I try to write some user function to write elastic search, my project
>>> comes with elastic search 2.3.5 (which is the same to flink connectors
>>> elastic search). But i'm seeing be exception that recorded in the below
>>> JIRA, by enabling the java -version:class, it show it's the guava
>>> dependency issue:
>>> Elastic search 2.3.5 transport client is using guava 18.0, and called one
>>> method starting from guava 18.0, MoreExecutors.directExecutor
>>> 
>>> While the class loading log show the active runtime MoreExecutors class
>>> loaded from flink distribution, which cause the java.lang.NoSuchMethodErro
>>> r.
>>> 
>>> Based on above finding, it looks es 2.3.5 is not able to be used with
>>> flink1.2.0 (and then the connectors-elasticsearch is broken)? Can someone
>>> help clarify?
>>> 
>>> Also, it looks some of the fink-core actually use the shade way to rename
>>> the class like from com.google.guava to 
>>> org.apache.flink.***.com.google.guava
>>> which is actually a fix of this kind of issue. Etc.
>>> https://issues.apache.org/jira/browse/FLINK-4587/https:
>>> //issues.apache.org/jira/browse/FLINK-3373.
>>> 
>>> 
>>> My flink cluster is v1.2.0, running in docker.
>>> 
>>> 
>>> Thanks,
>>> Ralph
>>> 
>>> -- Forwarded message --
>>> From: Su Ralph (JIRA) 
>>> Date: Mon, Mar 20, 2017 at 6:41 PM
>>> Subject: [jira] [Updated] (FLINK-6126) Yet another conflict : guava
>>> To: suliang...@gmail.com
>>> 
>>> 
>>> 
>>> [ https://issues.apache.org/jira/browse/FLINK-6126?page=com.at
>>> lassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
>>> 
>>> Su Ralph updated FLINK-6126:
>>> 
>>>Description:
>>> When write a user function try to write to elastic search (depend on
>>> elasticsearch 2.3.5)
>>> 
>>> Stack like:
>>> java.lang.NoSuchMethodError: com.google.common.util.concurr
>>> ent.MoreExecutors.directExecutor()Ljava/util/concurrent/Executor;
>>>at org.elasticsearch.threadpool.ThreadPool.(ThreadPool.
>>> java:190)
>>>at org.elasticsearch.client.transport.TransportClient$Builder.b
>>> uild(TransportClient.java:131)
>>>at io.sherlock.capabilities.es.AbstractEsSink.open(AbstractEsSi
>>> nk.java:98)
>>> 
>>> When enable env.java.opts.taskmanager to -version:class, we can see the
>>> class load log like:
>>> [Loaded com.google.common.util.concurrent.MoreExecutors from
>>> file:/opt/flink/lib/flink-dist_2.11-1.2.0.jar]
>>> 
>>> The user code is using guva of 18.0.
>>> 
>>>  was:
>>> For some reason I need to use 
>>> org.apache.httpcomponents:httpasyncclient:4.1.2
>>> in flink.
>>> The source file is:
>>> {code}
>>> import org.apache.flink.streaming.api.scala._
>>> import org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory
>>> 
>>> /**
>>>  * Created by renkai on 16/9/7.
>>>  */
>>> object Main {
>>>  def main(args: Array[String]): Unit = {
>>>val instance = ManagedNHttpClientConnectionFactory.INSTANCE
>>>println("instance = " + instance)
>>> 
>>>val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>val stream = env.fromCollection(1 to 100)
>>>val result = stream.map { x =>
>>>  x * 2
>>>}
>>>result.print()
>>>env.execute("xixi")
>>>  }
>>> }
>>> 
>>> {code}
>>> 
>>> and
>>> {code}
>>> name := "flink-explore"
>>> 
>>> version := "1.0"
>>> 
>>> scalaVersion := "2.11.8"
>>> 
>>> crossPaths := false
>>> 
>>> libraryDependencies ++= Seq(
>>>  "org.apache.flink" %% "flink-scala" % "1.2-SNAPSHOT"
>>>exclude("com.google.code.findbugs", "jsr305"),
>>>  "org.apache.flink" %% "flink-connector-kafka-0.8" % "1.2-SNAPSHOT"
>>>exclude("com.google.code.findbugs", "jsr305"),
>>>  

Re: [DISCUSS] Project build time and possible restructuring

2017-03-20 Thread Aljoscha Krettek
I prefer Jenkins to Travis by far. Working on Beam, where we have good Jenkins 
integration, has opened my eyes to what is possible with good CI integration.

For example, look at this recent Beam PR: 
https://github.com/apache/beam/pull/2263 
. The Jenkins-Github integration will 
tell you exactly which tests failed and if you click on the links you can look 
at the log output/std out of the tests in question.

This is the overview page of one of the Jenkins Jobs that we have in Beam: 
https://builds.apache.org/job/beam_PostCommit_Java_RunnableOnService_Flink/ 
. 
This is an example of a stable build: 
https://builds.apache.org/job/beam_PostCommit_Java_RunnableOnService_Flink/lastStableBuild/
 
.
 Notice how it gives you fine grained information about the Maven run. This is 
an unstable run: 
https://builds.apache.org/job/beam_PostCommit_Java_RunnableOnService_Flink/lastUnstableBuild/
 
.
 There you can see which tests failed and you can easily drill down.

Best,
Aljoscha

> On 20 Mar 2017, at 11:46, Robert Metzger  wrote:
> 
> Thank you for looking into the build times.
> 
> I didn't know that the build time situation is so bad. Even with yarn, mesos, 
> connectors and libraries removed, we are still running into the build timeout 
> :(
> 
> Aljoscha told me that the Beam community is using Jenkins for running the 
> tests, and they are planning to completely move away from Travis. I wonder 
> whether we should do the same, as having our own Jenkins servers would allow 
> us to run tests for more than 50 minutes.
> 
> I agree with Stephan that we should keep the yarn and mesos tests in the core 
> for stability / testing quality purposes.
> 
> 
> On Mon, Mar 20, 2017 at 11:27 AM, Stephan Ewen  > wrote:
> @Greg
> 
> I am personally in favor of splitting "connectors" and "contrib" out as
> well. I know that @rmetzger has some reservations about the connectors, but
> we may be able to convince him.
> 
> For the cluster tests (yarn / mesos) - in the past there were many cases
> where these tests caught cases that other tests did not, because they are
> the only tests that actually use the "flink-dist.jar" and thus discover
> many dependency and configuration issues. For that reason, my feeling would
> be that they are valuable in the core repository.
> 
> I would actually suggest to do only the library split initially, to see
> what the challenges are in setting up the multi-repo build and release
> tooling. Once we gathered experience there, we can probably easily see what
> else we can split out.
> 
> Stephan
> 
> 
> On Fri, Mar 17, 2017 at 8:37 PM, Greg Hogan  > wrote:
> 
> > I’d like to use this refactoring opportunity to unspilt the Travis tests.
> > With 51 builds queued up for the weekend (some of which may fail or have
> > been force pushed) we are at the limit of the number of contributions we
> > can process. Fixing this requires 1) splitting the project, 2)
> > investigating speedups for long-running tests, and 3) staying cognizant of
> > test performance when accepting new code.
> >
> > I’d like to add one to Stephan’s list of module group. I like that the
> > modules are generic (“libraries”) so that no one module is alone and
> > independent.
> >
> > Flink has three “libraries”: cep, ml, and gelly.
> >
> > “connectors” is a hotspot due to the long-running Kafka tests (and
> > connectors for three Kafka versions).
> >
> > Both flink-storm and flink-python have a modest number of number of tests
> > and could live with the miscellaneous modules in “contrib”.
> >
> > The YARN tests are long-running and problematic (I am unable to
> > successfully run these locally). A “cluster” module could host flink-mesos,
> > flink-yarn, and flink-yarn-tests.
> >
> > That gets us close to running all tests in a single Travis build.
> >   https://travis-ci.org/greghogan/flink/builds/212122590 
> >  <
> > https://travis-ci.org/greghogan/flink/builds/212122590 
> > >
> >
> > I also tested (https://github.com/greghogan/flink/commits/core_build 
> >  <
> > https://github.com/greghogan/flink/commits/core_build 
> > >) with a maven
> > parallelism of 2 and 4, with the latter a 6.4% drop in build time.
> >   https://travis-ci.org/greghogan/flink/builds/212137659 
> >  <
> > https://travis-ci.org/greghogan/flink/builds/212137659 
> 

[jira] [Created] (FLINK-6118) Chained operators forward watermark without checking

2017-03-19 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-6118:
---

 Summary: Chained operators forward watermark without checking
 Key: FLINK-6118
 URL: https://issues.apache.org/jira/browse/FLINK-6118
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.1.4, 1.2.0, 1.3.0
Reporter: Aljoscha Krettek


For operators that read from the network we have checks in place that verify 
that the input watermark only moves forwards. If an operator is directly 
chained to an operator then any {{Output.emitWatermark()}} of the first 
operator directly invoke {{processWatermark()}} on the chained operator, 
meaning that there are no verification steps in-between.

This only becomes visible when a non-keyed, chained operator is checking the  
current operator. Only keyed operators can have timers and for those the 
watermark always comes form the network, i.e. it behaves correctly.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] Code style / checkstyle

2017-03-19 Thread Aljoscha Krettek
I played around with this over the week end and it turns out that the required 
changes in flink-streaming-java are not that big. I created a PR with a 
proposed checkstyle.xml and the required code changes: 
https://github.com/apache/flink/pull/3567 
<https://github.com/apache/flink/pull/3567>. There’s a longer description of 
the style in the PR. The commits add continuously more invasive changes so we 
can start with the more lightweight changes if we want to.

If we want to go forward with this I would also encourage other people to use 
this for different modules and see how it turns out.

Best,
Aljoscha

> On 18 Mar 2017, at 08:00, Aljoscha Krettek <aljos...@apache.org> wrote:
> 
> I added an issue for adding a custom checkstyle.xml for flink-streaming-java 
> so that we can gradually add checks: 
> https://issues.apache.org/jira/browse/FLINK-6107. I outlined the procedure in 
> the Jira. We can use this as a pilot project and see how it goes and then 
> gradually also apply to other modules.
> 
> What do you think?
> 
>> On 6 Mar 2017, at 12:42, Stephan Ewen <se...@apache.org> wrote:
>> 
>> A singular "all reformat in one instant" will cause immense damage to the
>> project, in my opinion.
>> 
>> - There are so many pull requests that we are having a hard time keeping
>> up, and merging will a lot more time intensive.
>> - I personally have many forked branches with WIP features that will
>> probably never go in if the branches become unmergeable. I expect that to
>> be true for many other committers and contributors.
>> - Some companies have Flink forks and are rebasing patches onto master
>> regularly. They will be completely screwed by a full reformat.
>> 
>> If we do something, the only thing that really is possible is:
>> 
>> (1) Define a style. Ideally not too far away from Flink's style.
>> (2) Apply it to new projects/modules
>> (3) Coordinate carefully to pull it into existing modules, module by
>> module. Leaving time to adopt pull requests bit by bit, and allowing forks
>> to go through minor merges, rather than total conflict.
>> 
>> 
>> 
>> On Wed, Mar 1, 2017 at 5:57 PM, Henry Saputra <henry.sapu...@gmail.com>
>> wrote:
>> 
>>> It is actually Databricks Scala guide to help contributions to Apache Spark
>>> so not really official Spark Scala guide.
>>> The style guide feels bit more like asking people to write Scala in Java
>>> mode so I am -1 to follow the style for Apache Flink Scala if that what you
>>> are recommending.
>>> 
>>> If the "unification" means ONE style guide for both Java and Scala I would
>>> vote -1 to it because both languages have different semantics and styles to
>>> make them readable and understandable.
>>> 
>>> We could start with improving the Scala maven style guide to follow more
>>> Scala official style guide [1] and add IntelliJ Idea or Eclipse plugin
>>> style to follow suit.
>>> 
>>> Java side has bit more strict style check but we could make it tighter but
>>> embracing more Google Java guide closely with minor exceptions
>>> 
>>> - Henry
>>> 
>>> [1] http://docs.scala-lang.org/style/
>>> 
>>> On Mon, Feb 27, 2017 at 11:54 AM, Stavros Kontopoulos <
>>> st.kontopou...@gmail.com> wrote:
>>> 
>>>> +1 to provide and enforcing a unified code style for both java and scala.
>>>> Unification should apply when it makes sense like comments though.
>>>> 
>>>> Eventually code base should be re-factored. I would vote for the one at a
>>>> time module fix apporoach.
>>>> Style guide should be part of any PR review.
>>>> 
>>>> We could also have a look at the spark style guide:
>>>> https://github.com/databricks/scala-style-guide
>>>> 
>>>> The style code and general guidelines help keep code more readable and
>>> keep
>>>> things simple
>>>> with many contributors and different styles of code writing + language
>>>> features.
>>>> 
>>>> 
>>>> On Mon, Feb 27, 2017 at 8:01 PM, Stephan Ewen <se...@apache.org> wrote:
>>>> 
>>>>> I agree, reformatting 90% of the code base is tough.
>>>>> 
>>>>> There are two main issues:
>>>>> (1) Incompatible merges. This is hard, especially for the folks that
>>>> have
>>>>> to merge the pull requests ;-)
>>>>> 
>>>>> (2) Author history: This is le

[jira] [Created] (FLINK-6116) Watermarks don't work when unioning with same DataStream

2017-03-19 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-6116:
---

 Summary: Watermarks don't work when unioning with same DataStream
 Key: FLINK-6116
 URL: https://issues.apache.org/jira/browse/FLINK-6116
 Project: Flink
  Issue Type: Bug
  Components: DataStream API
Affects Versions: 1.2.0, 1.3.0
Reporter: Aljoscha Krettek


In this example job we don't get any watermarks in the {{WatermarkObserver}}:
{code}
public class WatermarkTest {
public static void main(String[] args) throws Exception {

final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();


env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
env.getConfig().setAutoWatermarkInterval(1000);
env.setParallelism(1);

DataStreamSource input = env.addSource(new 
SourceFunction() {
@Override
public void run(SourceContext ctx) throws 
Exception {
while (true) {
ctx.collect("hello!");
Thread.sleep(800);
}
}

@Override
public void cancel() {

}
});

input.union(input)
.flatMap(new IdentityFlatMap())
.transform("WatermarkOp", 
BasicTypeInfo.STRING_TYPE_INFO, new WatermarkObserver());

env.execute();
}

public static class WatermarkObserver
extends AbstractStreamOperator
implements OneInputStreamOperator<String, String> {
@Override
public void processElement(StreamRecord element) throws 
Exception {
System.out.println("GOT ELEMENT: " + element);
}

@Override
public void processWatermark(Watermark mark) throws Exception {
super.processWatermark(mark);

System.out.println("GOT WATERMARK: " + mark);
}
}

private static class IdentityFlatMap
extends RichFlatMapFunction<String, String> {
@Override
public void flatMap(String value, Collector out) throws 
Exception {
out.collect(value);
}
}
}
{code}

When commenting out the `union` it works.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] Code style / checkstyle

2017-03-18 Thread Aljoscha Krettek
I added an issue for adding a custom checkstyle.xml for flink-streaming-java so 
that we can gradually add checks: 
https://issues.apache.org/jira/browse/FLINK-6107. I outlined the procedure in 
the Jira. We can use this as a pilot project and see how it goes and then 
gradually also apply to other modules.

What do you think?

> On 6 Mar 2017, at 12:42, Stephan Ewen <se...@apache.org> wrote:
> 
> A singular "all reformat in one instant" will cause immense damage to the
> project, in my opinion.
> 
>  - There are so many pull requests that we are having a hard time keeping
> up, and merging will a lot more time intensive.
>  - I personally have many forked branches with WIP features that will
> probably never go in if the branches become unmergeable. I expect that to
> be true for many other committers and contributors.
>  - Some companies have Flink forks and are rebasing patches onto master
> regularly. They will be completely screwed by a full reformat.
> 
> If we do something, the only thing that really is possible is:
> 
>  (1) Define a style. Ideally not too far away from Flink's style.
>  (2) Apply it to new projects/modules
>  (3) Coordinate carefully to pull it into existing modules, module by
> module. Leaving time to adopt pull requests bit by bit, and allowing forks
> to go through minor merges, rather than total conflict.
> 
> 
> 
> On Wed, Mar 1, 2017 at 5:57 PM, Henry Saputra <henry.sapu...@gmail.com>
> wrote:
> 
>> It is actually Databricks Scala guide to help contributions to Apache Spark
>> so not really official Spark Scala guide.
>> The style guide feels bit more like asking people to write Scala in Java
>> mode so I am -1 to follow the style for Apache Flink Scala if that what you
>> are recommending.
>> 
>> If the "unification" means ONE style guide for both Java and Scala I would
>> vote -1 to it because both languages have different semantics and styles to
>> make them readable and understandable.
>> 
>> We could start with improving the Scala maven style guide to follow more
>> Scala official style guide [1] and add IntelliJ Idea or Eclipse plugin
>> style to follow suit.
>> 
>> Java side has bit more strict style check but we could make it tighter but
>> embracing more Google Java guide closely with minor exceptions
>> 
>> - Henry
>> 
>> [1] http://docs.scala-lang.org/style/
>> 
>> On Mon, Feb 27, 2017 at 11:54 AM, Stavros Kontopoulos <
>> st.kontopou...@gmail.com> wrote:
>> 
>>> +1 to provide and enforcing a unified code style for both java and scala.
>>> Unification should apply when it makes sense like comments though.
>>> 
>>> Eventually code base should be re-factored. I would vote for the one at a
>>> time module fix apporoach.
>>> Style guide should be part of any PR review.
>>> 
>>> We could also have a look at the spark style guide:
>>> https://github.com/databricks/scala-style-guide
>>> 
>>> The style code and general guidelines help keep code more readable and
>> keep
>>> things simple
>>> with many contributors and different styles of code writing + language
>>> features.
>>> 
>>> 
>>> On Mon, Feb 27, 2017 at 8:01 PM, Stephan Ewen <se...@apache.org> wrote:
>>> 
>>>> I agree, reformatting 90% of the code base is tough.
>>>> 
>>>> There are two main issues:
>>>>  (1) Incompatible merges. This is hard, especially for the folks that
>>> have
>>>> to merge the pull requests ;-)
>>>> 
>>>>  (2) Author history: This is less of an issue, I think. "git log
>>>> " and "git show  -- " will still work and
>>> one
>>>> may have to go one commit back to find out why something was changed
>>>> 
>>>> 
>>>> What I could image is to do this incrementally. Define the code style
>> in
>>>> "flink-parent" but do not activate it.
>>>> Then start with some projects (new projects, plus some others):
>>>> merge/reject PRs, reformat, activate code style.
>>>> 
>>>> Piece by piece. This is realistically going to take a long time until
>> it
>>> is
>>>> pulled through all components, but that's okay, I guess.
>>>> 
>>>> Stephan
>>>> 
>>>> 
>>>> On Mon, Feb 27, 2017 at 1:53 PM, Aljoscha Krettek <aljos...@apache.org
>>> 
>>>> wrote:
>>>> 
>>>>> Just for a bit of context, this is 

[jira] [Created] (FLINK-6107) Add custom checkstyle for flink-streaming-java

2017-03-18 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-6107:
---

 Summary: Add custom checkstyle for flink-streaming-java
 Key: FLINK-6107
 URL: https://issues.apache.org/jira/browse/FLINK-6107
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek


There was some consensus on the ML 
(https://lists.apache.org/thread.html/94c8c5186b315c58c3f8aaf536501b99e8b92ee97b0034dee295ff6a@%3Cdev.flink.apache.org%3E)
 that we want to have a more uniform code style. We should start 
module-by-module and by introducing increasingly stricter rules. We have to be 
aware of the PR situation and ensure that we have minimal breakage for 
contributors.

This issue aims at adding a custom checkstyle.xml for {{flink-streaming-java}} 
that is based on our current checkstyle.xml but adds these checks for Javadocs:
{code}





  
  
  
  
  
  
  
  






  
  
  



  
  

{code}

This checks:
 - Every type has a type-level Javadoc
 - Proper use of {{}} in Javadocs
 - First sentence must end with a proper punctuation mark
 - Proper use (including closing) of HTML tags



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] FLIP-17 Side Inputs

2017-03-17 Thread Aljoscha Krettek
Yes, I agree! The implementation stuff we talked about so far is only
visible at the operator level. A user function that uses the (future)
side API would not be aware of whether buffering or blocking is used. It
would simply know that it is invoked and that side input is ready.

I'll also quickly try to elaborate on my comment about why I think
windowing/triggering in the side-input operator itself is not necessary.
I created a figure: http://imgur.com/a/aAlw7 It is enough for the
side-input operator simply to consider side input for a given window as
ready when we have seen some data for that window. The WindowOperator
that is upstream of the side input will take care of
windowing/triggering.

I'll create Jira issues for implementing the low-level requirements for
side inputs (n-ary operator, broadcast state and buffering) and update
this thread. If anyone is interested on working on one of those we might
have a chance of getting this ready for Flink 1.3. Time is a bit tight
for me because I'm going to be on vacation for 1.5 weeks starting next
week Wednesday and after that we have Flink Forward.

Best,
Aljoscha

On Thu, Mar 16, 2017, at 23:52, Gábor Hermann wrote:
> Regarding the CoFlatMap workaround,
> - For keyed streams, do you suggest that having a per-key buffer stored 
> as keyed state would have a large memory overhead? That must be true, 
> although a workaround could be partitioning the data and using a 
> non-keyed stream. Of course that seems hacky, as we have a keyed stream 
> abstraction, so I agree with you.
> - I agree that keeping a broadcast side-input in the operator state is 
> not optimal. That's a good point I have not thought about. First we have 
> a separate abstraction for broadcast state, then we can optimize e.g. 
> checkpointing it (avoiding checkpointing it at every operator).
> 
> 
> Regarding blocking/backpressuring inputs, it should not only be useful 
> for static side-input, but also for periodically updated (i.e. slowly 
> changing). E.g. when a machine learning model is updated and loaded 
> every hour, it make sense to prioritize loading the model on the side 
> input. But I see the limitations of the underlying runtime.
> 
> Exposing a buffer could be useful for now. Although, the *API* for 
> blocking could even be implemented by simply buffering. So the buffering 
> could be hidden from the user, and later maybe optimized to not only 
> buffer, but also apply backpressure. What do you think? Again, for the 
> prototype, exposing the buffer should be fine IMHO. API and 
> implementation for blocking inputs could be a separate issue, but let's 
> not forget about it.
> 
> Cheers,
> Gabor
> 
> 
> On 2017-03-15 16:14, Aljoscha Krettek wrote:
> > Hi,
> > thanks for you input! :-)
> >
> > Regarding 1)
> > I don't see the benefit of integrating windowing into the side-input
> > logic. Windowing can happen upstream and whenever that emits new data
> > then operator will notice because there is new input. Having windowing
> > inside the side-input of an operator as well would just make the
> > implementation more complex without adding benefit, IMHO.
> >
> > Regarding 2)
> > That's a very good observation! I think we are fine, though, because
> > checkpoint barriers never "overtake" elements. It's only elements that
> > can overtake checkpoint barriers. If the broadcast state on different
> > parallel instances differs in a checkpoint then it only differs because
> > some parallel instances have reflected changes in their state from
> > elements that they shouldn't have "seen" yet in the exactly-once mode.
> > If we pick the state of an arbitrary instance as the de-facto state we
> > don't break guarantees any more than turning on at-least-once mode does.
> >
> > Regarding 3)
> > We need the special buffer support for keyed operations because there we
> > need to make sure that data is restored on the correct operator that is
> > responsible for the key of the data while also allowing us to iterate
> > over all the buffered data (for when we are ready to process the data).
> > This iteration over elements is not possible when simply storing data in
> > keyed state.
> >
> > What do you think?
> >
> > On Wed, Mar 15, 2017, at 09:07, wenlong.lwl wrote:
> >> Hi, Aljoscha, I just go through your prototype. I like the design of the
> >> SideInputReader which can make it flexible to determine when we can get
> >> the
> >> side input.
> >>
> >> I agree that side inputs are API sugar on the top of the three
> >> components(n-ary
> >> inputs, broadcast state and input buffering), following is some more
> >>

Re: [DISCUSS] FLIP-17 Side Inputs

2017-03-15 Thread Aljoscha Krettek
Hi,
thanks for you input! :-)

Regarding 1)
I don't see the benefit of integrating windowing into the side-input
logic. Windowing can happen upstream and whenever that emits new data
then operator will notice because there is new input. Having windowing
inside the side-input of an operator as well would just make the
implementation more complex without adding benefit, IMHO.

Regarding 2)
That's a very good observation! I think we are fine, though, because
checkpoint barriers never "overtake" elements. It's only elements that
can overtake checkpoint barriers. If the broadcast state on different
parallel instances differs in a checkpoint then it only differs because
some parallel instances have reflected changes in their state from
elements that they shouldn't have "seen" yet in the exactly-once mode.
If we pick the state of an arbitrary instance as the de-facto state we
don't break guarantees any more than turning on at-least-once mode does.

Regarding 3)
We need the special buffer support for keyed operations because there we
need to make sure that data is restored on the correct operator that is
responsible for the key of the data while also allowing us to iterate
over all the buffered data (for when we are ready to process the data).
This iteration over elements is not possible when simply storing data in
keyed state.

What do you think?

On Wed, Mar 15, 2017, at 09:07, wenlong.lwl wrote:
> Hi, Aljoscha, I just go through your prototype. I like the design of the
> SideInputReader which can make it flexible to determine when we can get
> the
> side input.
> 
> I agree that side inputs are API sugar on the top of the three
> components(n-ary
> inputs, broadcast state and input buffering), following is some more
> thought about the three component:
> 
> 1. Take both N-ary input operator and windowing/triggers mechanism into
> consideration, I think we may need the N-ary input operator supports some
> inputs(side inputs) are windowed while the others(main input) are normal
> stream. for static/slow-evolving data, we need to use global windows and
> for windowed-base join data , we need to use time window or custom
> windows.
> The window function on the side input can be used to collect or merge the
> data to generate the value of the side input(a single value or 
> list/map).
> Once a side input reader window is triggered, the SideInputReader will
> return value available, and if a Window is triggered more than once, the
> value of side input will be updated and maybe the SideInputReader need a
> interface to notice the user that something changed. Besides, I prefer
> the
> option to make every input of N-ary input operator equal, because user
> may
> need one side input depends on another side input.
> 
> 2. Regarding broadcast state, my concern is that how can we merge the
> value
> of the state from different subtasks. If the job running in at least once
> mode, the returned value of broadcast state from different subtasks will
> be
> different. Is there already any design on broadcast state?
> 
> 3. Regarding input buffering, I think if we use window/trigger mechanism,
> state can be store in the state of window, which may be mostly like what
> we
> need to do currently in KeyedWindow and AllWindow. We may need to allow
> custom merge strategy on all window state data since in side inputs we
> may
> need to choose data according to broadcast state strategy  while in
> normal
> windows we can just redistribute the window state data.
> 
> What do you think?
> 
> Best Regards!
> 
> Wenlong
> 
> On 14 March 2017 at 01:41, Aljoscha Krettek <aljos...@apache.org> wrote:
> 
> > Ha! this is turning out to be quite the discussion. :-) Also, thanks
> > Kenn, for chiming in with the Beam perspective!
> >
> > I'll try and address some stuff.
> >
> > It seems we have some consensus on using N-ary operator to implement
> > side inputs. I see two ways forward there:
> >  - Have a "pure" N-ary operator that has zero inputs by default and all
> >  N inputs are equal: this exists side-by-side with the current one-input
> >  operator and two-input operator.
> >  - Extends the existing operators with more inputs: the main input(s)
> >  would be considered different from the N other inputs, internally. With
> >  this, we would not have to rewrite existing operators and could simply
> >  have side inputs as an add-on.
> >
> > There weren't any (many?) comments on using broadcast state for side
> > inputs. I think there is not much to agree on there because it seems
> > pretty straightforward to me that we need this.
> >
> > About buffering: I think we need this as a Flink service because it is
> > right no

Re: Question: RemoteStreamEnvironment submit in detach mode.

2017-03-15 Thread Aljoscha Krettek
I would also really like to have this feature in. I'll try and see if I
can find some time to get the PR into mergeable shape.

On Wed, Mar 15, 2017, at 09:11, Liangfei Su wrote:
> Thanks for the information, Evgeny.
> 
> It looks no fixed version for these feature which i think it quite common
> requirement. Do we have any planned release to include this fix?
> 
> Thanks
> Ralph
> 
> 
> 
> On Wed, Mar 15, 2017 at 3:49 PM, Evgeny Kincharov
>  > wrote:
> 
> > Hi, Ralph.
> >
> > You are right, there is only one way to run streaming job at the current
> > version 1.2 and it is blocking the main thread.
> > We have some open issues connected with this [1], [2], and the open PR
> > related this topic [3]. You may look for details there.
> >
> > Thanks for your interest in this topic.
> >
> > BR, Evgeny
> >
> > [1] - https://issues.apache.org/jira/browse/FLINK-2313
> > [2] - https://issues.apache.org/jira/browse/FLINK-4272
> > [3] - https://github.com/apache/flink/pull/2732
> >
> > -Original Message-
> > From: Liangfei Su [mailto:suliang...@gmail.com]
> > Sent: Wednesday, March 15, 2017 11:19 AM
> > To: dev@flink.apache.org
> > Subject: Question: RemoteStreamEnvironment submit in detach mode.
> >
> > Hi,
> >
> > I get a question when try to submit a graph to a remote stream
> > RemoteStreamEnvironment in detach mode.
> >
> > From the client API, it looks when use call execute().  User doesn't have a
> > way to control whether run in detach mode or not. Since it's
> > the RemoteStreamEnvironment create a StandaloneClusterClient and flag
> > detachedJobSubmission is never get chance to be configured?
> >
> > Is there any doc on this? I did try but didn't find in doc, or wiki yet...
> >
> > Thanks
> > Ralph
> >


Re: [DISCUSS] Release Flink 1.1.5 / Flink 1.2.1

2017-03-15 Thread Aljoscha Krettek
I did in fact just open a PR for 
> https://issues.apache.org/jira/browse/FLINK-6001
> NPE on TumblingEventTimeWindows with ContinuousEventTimeTrigger and
> allowedLateness


On Tue, Mar 14, 2017, at 18:20, Vladislav Pernin wrote:
> Hi,
> 
> I would also include the following (not yet resolved) issue in the 1.2.1
> scope :
> 
> https://issues.apache.org/jira/browse/FLINK-6001
> NPE on TumblingEventTimeWindows with ContinuousEventTimeTrigger and
> allowedLateness
> 
> 2017-03-14 17:34 GMT+01:00 Ufuk Celebi :
> 
> > Big +1 Gordon!
> >
> > I think (10) is very critical to have in 1.2.1.
> >
> > – Ufuk
> >
> >
> > On Tue, Mar 14, 2017 at 3:37 PM, Stefan Richter
> >  wrote:
> > > Hi,
> > >
> > > I would suggest to also include in 1.2.1:
> > >
> > > (9) https://issues.apache.org/jira/browse/FLINK-6044 <
> > https://issues.apache.org/jira/browse/FLINK-6044>
> > > Replaces unintentional calls to InputStream#read(…) with the intended
> > > and correct InputStream#readFully(…)
> > > Status: PR
> > >
> > > (10) https://issues.apache.org/jira/browse/FLINK-5985 <
> > https://issues.apache.org/jira/browse/FLINK-5985>
> > > Flink 1.2 was creating state handles for stateless tasks which caused
> > trouble
> > > at restore time for users that wanted to do some changes that only
> > include
> > > stateless operators to their topology.
> > > Status: PR
> > >
> > >
> > >> Am 14.03.2017 um 15:15 schrieb Till Rohrmann :
> > >>
> > >> Thanks for kicking off the discussion Tzu-Li. I'd like to add the
> > following
> > >> issues which have already been merged into the 1.2-release and
> > 1.1-release
> > >> branch:
> > >>
> > >> 1.2.1:
> > >>
> > >> (7) https://issues.apache.org/jira/browse/FLINK-5942
> > >> Hardens the checkpoint recovery in case of corrupted ZooKeeper data.
> > >> Corrupted checkpoints will now be skipped.
> > >> Status: Merged
> > >>
> > >> (8) https://issues.apache.org/jira/browse/FLINK-5940
> > >> Hardens the checkpoint recovery in case that we cannot retrieve the
> > >> completed checkpoint from the meta data state handle retrieved from
> > >> ZooKeeper. This can, for example, happen if the meta data is deleted.
> > >> Checkpoints with unretrievable state handles are skipped.
> > >> Status: Merged
> > >>
> > >> 1.1.5:
> > >>
> > >>
> > >> (7) https://issues.apache.org/jira/browse/FLINK-5942
> > >> Hardens the checkpoint recovery in case of corrupted ZooKeeper data.
> > >> Corrupted checkpoints will now be skipped.
> > >> Status: Merged
> > >>
> > >> (8) https://issues.apache.org/jira/browse/FLINK-5940
> > >> Hardens the checkpoint recovery in case that we cannot retrieve the
> > >> completed checkpoint from the meta data state handle retrieved from
> > >> ZooKeeper. This can, for example, happen if the meta data is deleted.
> > >> Checkpoints with unretrievable state handles are skipped.
> > >> Status: Merged
> > >>
> > >> Cheers,
> > >> Till
> > >>
> > >> On Tue, Mar 14, 2017 at 12:02 PM, Tzu-Li (Gordon) Tai <
> > tzuli...@apache.org>
> > >> wrote:
> > >>
> > >>> Hi all!
> > >>>
> > >>> I would like to start a discussion for the next bugfix release for
> > 1.1.x
> > >>> and 1.2.x.
> > >>> There’s been quite a few critical fixes for bugs in both the releases
> > >>> recently, and I think they deserve a bugfix release soon.
> > >>> Most of the bugs were reported by users.
> > >>>
> > >>> I’m starting the discussion for both bugfix releases because most fixes
> > >>> span both releases (almost identical).
> > >>> Of course, the actual RC votes and RC creation process doesn’t have to
> > be
> > >>> started together.
> > >>>
> > >>> Here’s an overview of what’s been collected so far, for both bugfix
> > >>> releases -
> > >>> (it’s a list of what I’m aware of so far, and may be missing stuff;
> > please
> > >>> append and bring to attention as necessary :-) )
> > >>>
> > >>>
> > >>> For Flink 1.2.1:
> > >>>
> > >>> (1) https://issues.apache.org/jira/browse/FLINK-5701:
> > >>> Async exceptions in the FlinkKafkaProducer are not checked on
> > checkpoints.
> > >>> This compromises the producer’s at-least-once guarantee.
> > >>> Status: merged
> > >>>
> > >>> (2) https://issues.apache.org/jira/browse/FLINK-5949:
> > >>> Do not check Kerberos credentials for non-Kerberos authentications.
> > MapR
> > >>> users are affected by this, and cannot submit Flink on YARN jobs on a
> > >>> secured MapR cluster.
> > >>> Status: PR - https://github.com/apache/flink/pull/3528, one +1 already
> > >>>
> > >>> (3) https://issues.apache.org/jira/browse/FLINK-6006:
> > >>> Kafka Consumer can lose state if queried partition list is incomplete
> > on
> > >>> restore.
> > >>> Status: PR - https://github.com/apache/flink/pull/3505, one +1 already
> > >>>
> > >>> (4) https://issues.apache.org/jira/browse/FLINK-6025:
> > >>> KryoSerializer may use the wrong classloader when Kryo’s
> > JavaSerializer is
> > >>> used.
> > >>> Status: merged
> > >>>
> > >>> (5) 

Re: [DISCUSS] Release Flink 1.1.5 / Flink 1.2.1

2017-03-14 Thread Aljoscha Krettek
Thanks for kicking off the discussion!

I have an open PR for
 - https://issues.apache.org/jira/browse/FLINK-5808: Missing
 verification for setParallelism and setMaxParallelism
 - https://issues.apache.org/jira/browse/FLINK-5713: Protect against NPE
 in WindowOperator window cleanup

On Tue, Mar 14, 2017, at 15:15, Till Rohrmann wrote:
> Thanks for kicking off the discussion Tzu-Li. I'd like to add the
> following
> issues which have already been merged into the 1.2-release and
> 1.1-release
> branch:
> 
> 1.2.1:
> 
> (7) https://issues.apache.org/jira/browse/FLINK-5942
> Hardens the checkpoint recovery in case of corrupted ZooKeeper data.
> Corrupted checkpoints will now be skipped.
> Status: Merged
> 
> (8) https://issues.apache.org/jira/browse/FLINK-5940
> Hardens the checkpoint recovery in case that we cannot retrieve the
> completed checkpoint from the meta data state handle retrieved from
> ZooKeeper. This can, for example, happen if the meta data is deleted.
> Checkpoints with unretrievable state handles are skipped.
> Status: Merged
> 
> 1.1.5:
> 
> 
> (7) https://issues.apache.org/jira/browse/FLINK-5942
> Hardens the checkpoint recovery in case of corrupted ZooKeeper data.
> Corrupted checkpoints will now be skipped.
> Status: Merged
> 
> (8) https://issues.apache.org/jira/browse/FLINK-5940
> Hardens the checkpoint recovery in case that we cannot retrieve the
> completed checkpoint from the meta data state handle retrieved from
> ZooKeeper. This can, for example, happen if the meta data is deleted.
> Checkpoints with unretrievable state handles are skipped.
> Status: Merged
> 
> Cheers,
> Till
> 
> On Tue, Mar 14, 2017 at 12:02 PM, Tzu-Li (Gordon) Tai
> 
> wrote:
> 
> > Hi all!
> >
> > I would like to start a discussion for the next bugfix release for 1.1.x
> > and 1.2.x.
> > There’s been quite a few critical fixes for bugs in both the releases
> > recently, and I think they deserve a bugfix release soon.
> > Most of the bugs were reported by users.
> >
> > I’m starting the discussion for both bugfix releases because most fixes
> > span both releases (almost identical).
> > Of course, the actual RC votes and RC creation process doesn’t have to be
> > started together.
> >
> > Here’s an overview of what’s been collected so far, for both bugfix
> > releases -
> > (it’s a list of what I’m aware of so far, and may be missing stuff; please
> > append and bring to attention as necessary :-) )
> >
> >
> > For Flink 1.2.1:
> >
> > (1) https://issues.apache.org/jira/browse/FLINK-5701:
> > Async exceptions in the FlinkKafkaProducer are not checked on checkpoints.
> > This compromises the producer’s at-least-once guarantee.
> > Status: merged
> >
> > (2) https://issues.apache.org/jira/browse/FLINK-5949:
> > Do not check Kerberos credentials for non-Kerberos authentications. MapR
> > users are affected by this, and cannot submit Flink on YARN jobs on a
> > secured MapR cluster.
> > Status: PR - https://github.com/apache/flink/pull/3528, one +1 already
> >
> > (3) https://issues.apache.org/jira/browse/FLINK-6006:
> > Kafka Consumer can lose state if queried partition list is incomplete on
> > restore.
> > Status: PR - https://github.com/apache/flink/pull/3505, one +1 already
> >
> > (4) https://issues.apache.org/jira/browse/FLINK-6025:
> > KryoSerializer may use the wrong classloader when Kryo’s JavaSerializer is
> > used.
> > Status: merged
> >
> > (5) https://issues.apache.org/jira/browse/FLINK-5771:
> > Fix multi-char delimiters in Batch InputFormats.
> > Status: merged
> >
> > (6) https://issues.apache.org/jira/browse/FLINK-5934:
> > Set the Scheduler in the ExecutionGraph via its constructor. This fixes a
> > bug that causes HA recovery to fail.
> > Status: merged
> >
> >
> >
> > For Flink 1.1.5:
> >
> > (1) https://issues.apache.org/jira/browse/FLINK-5701:
> > Async exceptions in the FlinkKafkaProducer are not checked on checkpoints.
> > This compromises the producer’s at-least-once guarantee.
> > Status: This is already merged for 1.2.1. I would personally like to
> > backport the fix for this to 1.1.5 also.
> >
> > (2) https://issues.apache.org/jira/browse/FLINK-6006:
> > Kafka Consumer can lose state if queried partition list is incomplete on
> > restore.
> > Status: PR - https://github.com/apache/flink/pull/3507, one +1 already
> >
> > (3) https://issues.apache.org/jira/browse/FLINK-6025:
> > KryoSerializer may use the wrong classloader when Kryo’s JavaSerializer is
> > used.
> > Status: merged
> >
> > (4) https://issues.apache.org/jira/browse/FLINK-5771:
> > Fix multi-char delimiters in Batch InputFormats.
> > Status: merged
> >
> > (5) https://issues.apache.org/jira/browse/FLINK-5934:
> > Set the Scheduler in the ExecutionGraph via its constructor. This fixes a
> > bug that causes HA recovery to fail.
> > Status: merged
> >
> > (6) https://issues.apache.org/jira/browse/FLINK-5048:
> > Kafka Consumer (0.9/0.10) threading model leads problematic cancellation
> > behavior.
> > 

Re: [DISCUSS] FLIP-17 Side Inputs

2017-03-13 Thread Aljoscha Krettek
 identity element. It is nice for this identity element (known a priori)
> to
> be "always available" on the side input, for every window, if it is
> expected to be something that is continually updated. But if the
> configuration is such that it is a one-time triggering of bounded data,
> that behavior is not right. Related, after some amount of time we
> conclude
> that no input will ever be received for a window, and the side input
> becomes ready.
> 
> Map Side Inputs with triggers: When new data arrives for a key in Beam,
> there's no way to know which value should "win", so you basically just
> can't use map side inputs with triggers.
> 
> These are just some quick thoughts at a very high level.
> 
> Kenn
> 
> On Thu, Mar 9, 2017 at 7:59 AM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
> 
> > Hi Jamie,
> > actually the approach where the .withSideInput() comes before the user
> > function is only required for implementation proposal #1, which I like
> > the least. For the other two it can be after the user function, which is
> > also what I prefer.
> >
> > Regarding semantics: yes, we simply wait for anything to be available.
> > For GlobalWindows, i.e. side inputs on a normal function where we simply
> > don't have windows, this means that we wait for anything. For the
> > windowed case, which I'm proposing as a second step we will wait for
> > side input in a window to be available that matches the main-input
> > window. For the keyed case we wait for something on the same key to be
> > available, for the broadcast case we wait for anything.
> >
> > Best,
> > Aljoscha
> >
> > On Thu, Mar 9, 2017, at 16:55, Jamie Grier wrote:
> > > Hi, I think the proposal looks good.  The only thing I wasn't clear on
> > > was
> > > which API is actually being proposed.  The one where .withSideInput()
> > > comes
> > > before the user function or after.  I would definitely prefer it come
> > > after
> > > since that's the normal pattern in the Flink API.  I understood that
> > > makes
> > > the implementation different (maybe harder) but I think it helps keep the
> > > API uniform which is really good.
> > >
> > > Overall I think the API looks good and yes there are some tricky
> > > semantics
> > > here but in general if, when processing keyed main streams, we always
> > > wait
> > > until there is a side-input available for that key we're off to a great
> > > start and I think that was what you're suggesting in the design doc.
> > >
> > > -Jamie
> > >
> > >
> > > On Thu, Mar 9, 2017 at 7:27 AM, Aljoscha Krettek <aljos...@apache.org>
> > > wrote:
> > >
> > > > Hi,
> > > > these are all valuable suggestions and I think that we should implement
> > > > them when the time is right. However, I would like to first get a
> > > > minimal viable version of this feature into Flink and then expand on
> > it.
> > > > I think the last few tries of tackling this problem fizzled out because
> > > > we got to deep into discussing special semantics and features. I think
> > > > the most important thing to agree on right now is the basic API and the
> > > > implementation plan. What do you think about that?
> > > >
> > > > Regarding your suggestions, I have in fact a branch [1] from May 2016
> > > > where I implemented a prototype implementation. This has an n-ary
> > > > operator and inputs can be either bounded or unbounded and the
> > > > implementation actually waits for all bounded inputs to finish before
> > > > starting to process the unbounded inputs.
> > > >
> > > > In general, I think blocking on an input is only possible while you're
> > > > waiting for a bounded input to finish. If all inputs are unbounded you
> > > > cannot block because you might run into deadlocks (in the processing
> > > > graph, due to back pressure) and also because blocking will also block
> > > > elements that might have a lower timestamp and might fall into a
> > > > different window which is already ready for processing.
> > > >
> > > > Best,
> > > > Aljoscha
> > > >
> > > > [1]
> > > > https://github.com/aljoscha/flink/commits/operator-ng-side-
> > input-wrapper
> > > >
> > > > On Tue, Mar 7, 2017, at 14:39, wenlong.lwl wrote:
> > > > > Hi Aljoscha, thank you fo

Re: [DISCUSS] FLIP-17 Side Inputs

2017-03-09 Thread Aljoscha Krettek
Hi Jamie,
actually the approach where the .withSideInput() comes before the user
function is only required for implementation proposal #1, which I like
the least. For the other two it can be after the user function, which is
also what I prefer.

Regarding semantics: yes, we simply wait for anything to be available.
For GlobalWindows, i.e. side inputs on a normal function where we simply
don't have windows, this means that we wait for anything. For the
windowed case, which I'm proposing as a second step we will wait for
side input in a window to be available that matches the main-input
window. For the keyed case we wait for something on the same key to be
available, for the broadcast case we wait for anything.

Best,
Aljoscha

On Thu, Mar 9, 2017, at 16:55, Jamie Grier wrote:
> Hi, I think the proposal looks good.  The only thing I wasn't clear on
> was
> which API is actually being proposed.  The one where .withSideInput()
> comes
> before the user function or after.  I would definitely prefer it come
> after
> since that's the normal pattern in the Flink API.  I understood that
> makes
> the implementation different (maybe harder) but I think it helps keep the
> API uniform which is really good.
> 
> Overall I think the API looks good and yes there are some tricky
> semantics
> here but in general if, when processing keyed main streams, we always
> wait
> until there is a side-input available for that key we're off to a great
> start and I think that was what you're suggesting in the design doc.
> 
> -Jamie
> 
> 
> On Thu, Mar 9, 2017 at 7:27 AM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
> 
> > Hi,
> > these are all valuable suggestions and I think that we should implement
> > them when the time is right. However, I would like to first get a
> > minimal viable version of this feature into Flink and then expand on it.
> > I think the last few tries of tackling this problem fizzled out because
> > we got to deep into discussing special semantics and features. I think
> > the most important thing to agree on right now is the basic API and the
> > implementation plan. What do you think about that?
> >
> > Regarding your suggestions, I have in fact a branch [1] from May 2016
> > where I implemented a prototype implementation. This has an n-ary
> > operator and inputs can be either bounded or unbounded and the
> > implementation actually waits for all bounded inputs to finish before
> > starting to process the unbounded inputs.
> >
> > In general, I think blocking on an input is only possible while you're
> > waiting for a bounded input to finish. If all inputs are unbounded you
> > cannot block because you might run into deadlocks (in the processing
> > graph, due to back pressure) and also because blocking will also block
> > elements that might have a lower timestamp and might fall into a
> > different window which is already ready for processing.
> >
> > Best,
> > Aljoscha
> >
> > [1]
> > https://github.com/aljoscha/flink/commits/operator-ng-side-input-wrapper
> >
> > On Tue, Mar 7, 2017, at 14:39, wenlong.lwl wrote:
> > > Hi Aljoscha, thank you for the proposal, it is great to hear about the
> > > progress in side input.
> > >
> > > Following is my point of view:
> > > 1. I think there may be an option to block the processing of the main
> > > input
> > > instead of buffer the data in state because in production, the through
> > > put
> > > of the main input is usually much larger, and buffering the data before
> > > the
> > > side input may slow down the preparing of side input since the i-o and
> > > computing resources are always limited.
> > > 2. another issue may need to be disscussed: how can we do checkpointing
> > > with side input, because static side input may finish soon once started
> > > which will stop the checkpointing.
> > > 3. I agree with Gyula that user should be able to determines when a side
> > > input is ready? Maybe we can do it one step further: whether users can
> > > determine a operator with multiple inputs to process which input each
> > > time
> > > or not?  It would be more flexible.
> > >
> > >
> > > Best Regards!
> > > Wenlong
> > >
> > > On 7 March 2017 at 18:39, Ventura Del Monte <venturadelmo...@gmail.com>
> > > wrote:
> > >
> > > > Hi Aljoscha,
> > > >
> > > > Thank you for the proposal and for bringing up again this discussion.
> > > >
> > > > Regarding the implementation aspect,I would say the first 

Re: [DISCUSS] FLIP-17 Side Inputs

2017-03-09 Thread Aljoscha Krettek
Hi,
these are all valuable suggestions and I think that we should implement
them when the time is right. However, I would like to first get a
minimal viable version of this feature into Flink and then expand on it.
I think the last few tries of tackling this problem fizzled out because
we got to deep into discussing special semantics and features. I think
the most important thing to agree on right now is the basic API and the
implementation plan. What do you think about that?

Regarding your suggestions, I have in fact a branch [1] from May 2016
where I implemented a prototype implementation. This has an n-ary
operator and inputs can be either bounded or unbounded and the
implementation actually waits for all bounded inputs to finish before
starting to process the unbounded inputs.

In general, I think blocking on an input is only possible while you're
waiting for a bounded input to finish. If all inputs are unbounded you
cannot block because you might run into deadlocks (in the processing
graph, due to back pressure) and also because blocking will also block
elements that might have a lower timestamp and might fall into a
different window which is already ready for processing.

Best,
Aljoscha

[1]
https://github.com/aljoscha/flink/commits/operator-ng-side-input-wrapper

On Tue, Mar 7, 2017, at 14:39, wenlong.lwl wrote:
> Hi Aljoscha, thank you for the proposal, it is great to hear about the
> progress in side input.
> 
> Following is my point of view:
> 1. I think there may be an option to block the processing of the main
> input
> instead of buffer the data in state because in production, the through
> put
> of the main input is usually much larger, and buffering the data before
> the
> side input may slow down the preparing of side input since the i-o and
> computing resources are always limited.
> 2. another issue may need to be disscussed: how can we do checkpointing
> with side input, because static side input may finish soon once started
> which will stop the checkpointing.
> 3. I agree with Gyula that user should be able to determines when a side
> input is ready? Maybe we can do it one step further: whether users can
> determine a operator with multiple inputs to process which input each
> time
> or not?  It would be more flexible.
> 
> 
> Best Regards!
> Wenlong
> 
> On 7 March 2017 at 18:39, Ventura Del Monte <venturadelmo...@gmail.com>
> wrote:
> 
> > Hi Aljoscha,
> >
> > Thank you for the proposal and for bringing up again this discussion.
> >
> > Regarding the implementation aspect,I would say the first way could
> > be easier/faster to implement but it could add some overhead when
> > dealing with multiple side inputs through the current 2-streams union
> > transform. I tried the second option myself as it has less overhead
> > but then the outcome was something close to a N-ary operator consuming
> > first each side input while buffering the main one.
> > Therefore, I would choose the third option as it is more generic
> > and might help also in other scenarios, although its implementation
> > requires more effort.
> > I also agree with Gyula, I think the user should be allowed to define the
> > condition that determines when a side input is ready, e.g., load the side
> > input first, incrementally update the side input.
> >
> > Best,
> > Ventura
> >
> >
> >
> >
> >
> >
> > This message, for the D. Lgs n. 196/2003 (Privacy Code), may contain
> > confidential and/or privileged information. If you are not the addressee or
> > authorized to receive this for the addressee, you must not use, copy,
> > disclose or take any action based on this message or any information
> > herein. If you have received this message in error, please advise the
> > sender immediately by reply e-mail and delete this message. Thank you for
> > your cooperation.
> >
> > On Mon, Mar 6, 2017 at 3:50 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:
> >
> > > Hi Aljoscha,
> > >
> > > Thank you for the nice proposal!
> > >
> > > I think it would make sense to allow user's to affect the readiness of
> > the
> > > side input. I think making it ready when the first element arrives is
> > only
> > > slightly better then making it always ready from usability perspective.
> > For
> > > instance if I am joining against a static data set I want to wait for the
> > > whole set before making it ready. This could be exposed as a user defined
> > > condition that could also recognize bounded inputs maybe.
> > >
> > > Maybe we could also add an aggregating (merging) side input type, that
> > > could wor

[jira] [Created] (FLINK-5972) Don't allow shrinking merging windows

2017-03-06 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-5972:
---

 Summary: Don't allow shrinking merging windows
 Key: FLINK-5972
 URL: https://issues.apache.org/jira/browse/FLINK-5972
 Project: Flink
  Issue Type: Bug
  Components: DataStream API
Affects Versions: 1.2.0, 1.1.0, 1.3.0
Reporter: Aljoscha Krettek
Priority: Blocker
 Fix For: 1.3.0


A misbehaving {{MergingWindowAssigner}} can cause a merge that results in a 
window that is smaller than the span of all the merged windows. This, in itself 
is not problematic. It becomes problematic when the end timestamp of a window 
that was not late before merging is now earlier than the watermark (the 
timestamp is smaller than the watermark).

There are two choices:
 - immediately process the window
 - drop the window

processing the window will lead to late data downstream.

The current behaviour is to silently drop the window but that logic has a bug: 
we only remove the dropped window from the {{MergingWindowSet}} but we don't 
properly clean up state and timers that the window still (possibly) has. We 
should fix this bug in the process of resolving this issue.

We should either just fix the bug and still silently drop windows or add a 
check and throw an exception when the end timestamp falls below the watermark.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[DISCUSS] FLIP-17 Side Inputs

2017-03-06 Thread Aljoscha Krettek
Hi Folks,

I would like to finally agree on a plan for implementing side inputs in
Flink. There has already been an attempt to come to consensus [1], which
resulted in two design documents. I tried to consolidate those two and
also added a section about implementation plans. This is the resulting
FLIP:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API


In terms of semantics I tried to go with the minimal viable solution.
The part that needs discussing is how we want to implement this. I
outlined three possible implementation plans in the FLIP but what it
boils down to is that we need to introduce some way of getting several
inputs into an operator/task.


Please have a look at the doc and let us know what you think.



Best,

Aljoscha



[1] 
https://lists.apache.org/thread.html/797df0ba066151b77c7951fd7d603a8afd7023920d0607a0c6337db3@1462181294@%3Cdev.flink.apache.org%3E


[jira] [Created] (FLINK-5969) Add savepoint backwards compatibility tests from 1.2 to 1.3

2017-03-06 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-5969:
---

 Summary: Add savepoint backwards compatibility tests from 1.2 to 
1.3
 Key: FLINK-5969
 URL: https://issues.apache.org/jira/browse/FLINK-5969
 Project: Flink
  Issue Type: Improvement
  Components: Tests
Reporter: Aljoscha Krettek
Priority: Blocker
 Fix For: 1.3.0


We currently only have tests that test migration from 1.1 to 1.3, because we 
added these tests when releasing Flink 1.2.

We have to copy/migrate those tests:
 - {{StatefulUDFSavepointMigrationITCase}}
 - {{*MigrationTest}}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-5968) Document WindowedStream.aggregate()

2017-03-06 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-5968:
---

 Summary: Document WindowedStream.aggregate()
 Key: FLINK-5968
 URL: https://issues.apache.org/jira/browse/FLINK-5968
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API, Documentation
Reporter: Aljoscha Krettek
Priority: Blocker
 Fix For: 1.3.0






--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-5967) Document AggregatingState

2017-03-06 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-5967:
---

 Summary: Document AggregatingState
 Key: FLINK-5967
 URL: https://issues.apache.org/jira/browse/FLINK-5967
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API, Documentation
Reporter: Aljoscha Krettek
Priority: Blocker
 Fix For: 1.3.0






--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] Side Outputs and Split/Select

2017-03-02 Thread Aljoscha Krettek
Ok, so it seems we have to go with the OutputTag variant for windows as
well, for now.

For Flink 2.0 we can change that. Would everyone be OK with that?

On Thu, Mar 2, 2017 at 12:05 PM, Robert Metzger <rmetz...@apache.org> wrote:

> Flink enforces binary compatibility for all classes tagged with the @Public
> annotation.
> Binary compatibility allows users to execute a job against a newer Flink
> version without recompiling their job jar.
> Your change alters the return type of some methods (apply()). I think
> there's no way to do that in a binary compatible way.
>
> The only thing we could do is keep the return type as is, but return a
> WindowedOperation instance.
> Users could then manually cast the returned object to access the late
> stream.
>
> Downgrading to "source compatibility" only should fix the issue, but then
> users have to recompile their Flink jobs when upgrading the Flink version.
>
> On Tue, Feb 28, 2017 at 9:37 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
> > Hi Chen and Aljoscha,
> >
> > thanks for the great proposal and work.
> >
> > I prefer the WindowedOperator.getLateStream() variant without explicit
> > tags.
> > I think it is fine to start adding side output to ProcessFunction (keyed
> > and non-keyed) and window operators and see how it is picked up by users.
> >
> > Best, Fabian
> >
> >
> > 2017-02-28 15:42 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:
> >
> > > Quick update: I created a branch where I make the result type of
> > > WindowedStream operations more specific:
> > > https://github.com/aljoscha/flink/blob/windowed-stream-
> > > result-specific/flink-streaming-java/src/main/java/
> > > org/apache/flink/streaming/api/datastream/WindowedStream.java
> > >
> > > We would need this for the "lateStream()" API without the explicit
> > > OutputTag.
> > >
> > > It seems the backwards compatibility checker doesn't like this and
> > > complains about breaking binary backwards compatibility. +Robert
> Metzger
> > > <rmetz...@apache.org> Do you have an idea what we could do there?
> > >
> > > On Tue, 28 Feb 2017 at 12:39 Ufuk Celebi <u...@apache.org> wrote:
> > >
> > > > On Tue, Feb 28, 2017 at 11:38 AM, Aljoscha Krettek <
> > aljos...@apache.org>
> > > > wrote:
> > > > > I see the ProcessFunction as a bit of the generalised future of
> > > FlatMap,
> > > > so
> > > > > to me it makes sense to only allow side outputs on the
> > ProcessFunction
> > > > but
> > > > > I'm open for anything. If we decide for this I'm happy with an
> > > additional
> > > > > method on Collector.
> > > >
> > > > I think it's best to restrict this to ProcessFunction after all
> (given
> > > > that we allow it for non-keyed streams, etc.). ;-)
> > > >
> > >
> >
>


Re: [DISCUSS] Side Outputs and Split/Select

2017-02-28 Thread Aljoscha Krettek
Quick update: I created a branch where I make the result type of
WindowedStream operations more specific:
https://github.com/aljoscha/flink/blob/windowed-stream-result-specific/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java

We would need this for the "lateStream()" API without the explicit
OutputTag.

It seems the backwards compatibility checker doesn't like this and
complains about breaking binary backwards compatibility. +Robert Metzger
<rmetz...@apache.org> Do you have an idea what we could do there?

On Tue, 28 Feb 2017 at 12:39 Ufuk Celebi <u...@apache.org> wrote:

> On Tue, Feb 28, 2017 at 11:38 AM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
> > I see the ProcessFunction as a bit of the generalised future of FlatMap,
> so
> > to me it makes sense to only allow side outputs on the ProcessFunction
> but
> > I'm open for anything. If we decide for this I'm happy with an additional
> > method on Collector.
>
> I think it's best to restrict this to ProcessFunction after all (given
> that we allow it for non-keyed streams, etc.). ;-)
>


Re: [DISCUSS] Per-key event time

2017-02-28 Thread Aljoscha Krettek
@Tzu-Li Yes, the deluxe stream would not allow another keyBy(). Or we could
allow it but then we would exit the world of the deluxe stream and per-key
watermarks and go back to the realm of normal streams and keyed streams.

On Tue, 28 Feb 2017 at 10:08 Tzu-Li (Gordon) Tai <tzuli...@apache.org>
wrote:

> Throwing in some thoughts:
>
> When a source determines that no more data will come for a key (which
> in itself is a bit of a tricky problem) then it should signal to
> downstream
> operations to take the key out of watermark calculations, that is that we
> can release some space.
> I don’t think this is possible without exposing API for the UDF to signal
> there will be no more data for a specific key. We could detect idleness of
> a key at the source operator, but without any help from user logic,
> essentially it can only be seen as "temporarily idle", which is not helpful
> in reducing the state as the watermark state for that key still needs to be
> kept downstream.
>
> So to achieve this, I think the only option would be to expose new APIs
> here too.
>
> It’s like how we recently exposed a new `markAsTemporarilyIdle` method in
> the SourceFunction.SourceContext interface, but instead a
> `markKeyTerminated` that must be called by the source UDF to be able to
> save state space and have no feasible fallback detection strategy.
>
> DeluxeKeyedStream input = env.addSourceWithKey(source, keyExtractor);
> input
> .map()
> .window(...) // notice that we don't need keyBy because it is implicit
> .reduce(...)
> .map(...)
> .window(...)
> ...
>
> Would this mean that another `keyBy` isn’t allowed downstream? Or still
> allowed, but we’re using the keys in `DeluxeKeyedStream` as the “meta key”
> to track key lineage?
>
> On February 27, 2017 at 9:37:27 PM, Aljoscha Krettek (aljos...@apache.org)
> wrote:
>
> This is indeed an interesting topic, thanks for starting the discussion,
> Jamie!
>
> I now thought about this for a while, since more and more people seem to be
> asking about it lately. First, I thought that per-key watermark handling
> would not be necessary because it can be done locally (as Paris suggested),
> then I realised that that's not actually the case and thought that this
> wouldn't be possible. In the end, I came to realise that it is indeed
> possible (with some caveats), although with a huge overhead in the amount
> of state that we have to keep and with changes to our API. I'll try and
> walk you through my thought process.
>
> Let's first look at local watermark tracking, that is, tracking the
> watermark locally at the operator that needs it, for example a
> WindowOperator. I initially thought that this would be sufficient. Assume
> we have a pipeline like this:
>
> Source -> KeyBy -> WindowOperator -> ...
>
> If we have parallelism=1, then all elements for a given key k will be read
> by the same source operator instance and they will arrive (in-order) at the
> WindowOperator. It doesn't matter whether we track the per-key watermarks
> at the Source or at the WindowOperator because we see the same elements in
> the same order at each operator, per key.
>
> Now, think about this pipeline:
>
> Source1 --+
> |-> Union -> KeyBy -> WindowOperator -> ...
> Source2 --+
>
> (you can either think about two sources or once source that has several
> parallel instances, i.e. parallelism > 1)
>
> Here, both Source1 and Source2 can emit elements with our key k. If Source1
> is faster than Source2 and the watermarking logic at the WindowOperator
> determines the watermark based on the incoming element timestamps (for
> example, using the BoundedLatenessTimestampExtractor) then the elements
> coming from Source2 will be considered late at the WindowOperator.
>
> From this we know that our WindowOperator needs to calculate the watermark
> similarly to how watermark calculation currently happens in Flink: the
> watermark is the minimum of the watermark of all upstream operations. In
> this case it would be: the minimum upstream watermarks of operations that
> emit elements with key k. For per-partition watermarks this works because
> the number of upstream operations is know and we simply keep an array that
> has the current upstream watermark for each input operation. For per-key
> watermarks this would mean that we have to keep k*u upstream watermarks
> where u is the number of upstream operations. This can be quite large.
> Another problem is that the observed keys change, i.e. the key space is
> evolving and we need to retire keys from our calculations lest we run out
> of space.
>
> We could find a solution based on a feature we recently introduced in
> Flink: https://github.com/a

Re: [DISCUSS] Code style / checkstyle

2017-02-28 Thread Aljoscha Krettek
By the way, I also don't see the benefit of doing the transition piece by
piece.

On Mon, 27 Feb 2017 at 22:21 Dawid Wysakowicz <wysakowicz.da...@gmail.com>
wrote:

> I agree with adopting a custom codestyle/checkstyle for flink, but as I
> understood correctly most people agree there is no point of providing an
> unenforced code style.
>
> 2017-02-27 22:04 GMT+01:00 Greg Hogan <c...@greghogan.com>:
>
> > There was also …
> >
> > - create a flink style (for example, leaving indentation as +1 tab rather
> > than +2 spaces as in google's style)
> >
> > - create a flink style but optional and unenforced (but recommended for
> > new contributions)
> >
> > Flink currently has a reasonably consistent code style. I expect that
> > adopting a radically different code style module-by-module will also
> result
> > in contributions with a mix of old an new styles. If we’re not willing to
> > re-style flink-core today, under what circumstances will this change?
> With
> > a punctuated refactoring there would be a singular event for developers
> to
> > remember (as with the initial commits).
> >
> > Greg
> >
> >
> > > On Feb 27, 2017, at 3:40 PM, Dawid Wysakowicz <
> > wysakowicz.da...@gmail.com> wrote:
> > >
> > > So to sum up all the comments so far we have two alternatives.
> > > We either:
> > > 1) introduce unified checkstyle (with enforcing) and corresponding code
> > > style, both based on some established ones like google code style for
> > java
> > > [1] <https://github.com/google/google-java-format> and scalastyle for
> > scala
> > > [2] <http://www.scalastyle.org/> . We would introduce it module by
> > module
> > > for a longer period of time
> > > or
> > > 2) leave it as it is, and end this discussion for a longer (possibly
> > > infinite :) ) period of time
> > >
> > > Not sure how we should proceed with the decision on it. Is it possible
> to
> > > do some voting or so?
> > >
> > > 2017-02-27 20:54 GMT+01:00 Stavros Kontopoulos <
> st.kontopou...@gmail.com
> > >:
> > >
> > >> +1 to provide and enforcing a unified code style for both java and
> > scala.
> > >> Unification should apply when it makes sense like comments though.
> > >>
> > >> Eventually code base should be re-factored. I would vote for the one
> at
> > a
> > >> time module fix apporoach.
> > >> Style guide should be part of any PR review.
> > >>
> > >> We could also have a look at the spark style guide:
> > >> https://github.com/databricks/scala-style-guide
> > >>
> > >> The style code and general guidelines help keep code more readable and
> > keep
> > >> things simple
> > >> with many contributors and different styles of code writing + language
> > >> features.
> > >>
> > >>
> > >> On Mon, Feb 27, 2017 at 8:01 PM, Stephan Ewen <se...@apache.org>
> wrote:
> > >>
> > >>> I agree, reformatting 90% of the code base is tough.
> > >>>
> > >>> There are two main issues:
> > >>>  (1) Incompatible merges. This is hard, especially for the folks that
> > >> have
> > >>> to merge the pull requests ;-)
> > >>>
> > >>>  (2) Author history: This is less of an issue, I think. "git log
> > >>> " and "git show  -- " will still work
> and
> > >> one
> > >>> may have to go one commit back to find out why something was changed
> > >>>
> > >>>
> > >>> What I could image is to do this incrementally. Define the code style
> > in
> > >>> "flink-parent" but do not activate it.
> > >>> Then start with some projects (new projects, plus some others):
> > >>> merge/reject PRs, reformat, activate code style.
> > >>>
> > >>> Piece by piece. This is realistically going to take a long time until
> > it
> > >> is
> > >>> pulled through all components, but that's okay, I guess.
> > >>>
> > >>> Stephan
> > >>>
> > >>>
> > >>> On Mon, Feb 27, 2017 at 1:53 PM, Aljoscha Krettek <
> aljos...@apache.org
> > >
> > >>> wrote:
> > >>>
> > >>>> Just for a bit of context, this is the output of runnin

Re: [DISCUSS] Side Outputs and Split/Select

2017-02-28 Thread Aljoscha Krettek
uot; users to the ProcessFunction in
> order to use side outputs. If on the other hand we think that the main
> use case will be the late data stream from windows then it's probably
> fine. I think we have two options for RichFunctions, either the
> runtime context or the collector, both of which are shared with the
> DataSet API. I would be OK with throwing an
> UnsupportedOperationException if a batch API user tries to access it.
>
>
>
> On Mon, Feb 27, 2017 at 8:56 PM, Jamie Grier <ja...@data-artisans.com>
> wrote:
> > Aljoscha,
> >
> > Ahh, that is much better.  As long as it's explicitly referring to late
> > data I think it's fine.  I also like the second variant where a user
> > doesn't have to explicitly create the OutputTag.
> >
> >
> >
> > On Mon, Feb 27, 2017 at 8:45 AM, Aljoscha Krettek <aljos...@apache.org>
> > wrote:
> >
> >> @Jamie I must have mistyped my last API proposal. This piece of code:
> >> WindowedOperator windowedResult = input
> >>   .keyBy(...)
> >>   .window(...)
> >>   .apply(...)
> >>
> >> DataStream lateData = windowedResult.getSideOutput();
> >>
> >> should actually have been:
> >>
> >> WindowedOperator windowedResult = input
> >>   .keyBy(...)
> >>   .window(...)
> >>   .apply(...)
> >>
> >> DataStream lateData = windowedResult.getLateDataSideOutput();
> >>
> >> So apart from the naming it's pretty much the same as your suggestion,
> >> right? The reason why I preferred the explicit OutputTag is that we
> >> otherwise have to create another layer of OutputTags that are internal
> to
> >> the system so that users cannot accidentally also send data to the same
> >> side output. It just means writing more code for use and introducing the
> >> more concrete return type for the WindowedStream operations. But that's
> >> fine if y'all prefer that variant. :-)
> >>
> >> On Sat, 25 Feb 2017 at 04:19 Chen Qin <qinnc...@gmail.com> wrote:
> >>
> >> > Hi Jamie,
> >> >
> >> > I think it does make consuming late arriving events more explicit! At
> >> cost
> >> > of
> >> > fix a predefined OutputTag which user have no control nor
> definition
> >> > an extra UDF which essentially filter out all mainOutputs and only let
> >> > sideOutput pass (like filterFunction)
> >> >
> >> > Thanks,
> >> > Chen
> >> >
> >> > > On Feb 24, 2017, at 1:17 PM, Jamie Grier <ja...@data-artisans.com>
> >> > wrote:
> >> > >
> >> > > I prefer the ProcessFunction and side outputs solution over split()
> and
> >> > > select() which I've never liked primarily due to the lack of type
> >> safety
> >> > > and it also doesn't really seem to fit with the rest of Flink's API.
> >> > >
> >> > > On the late data question I strongly prefer the late data concept
> being
> >> > > explicit in the API.  Could we not also do something like:
> >> > >
> >> > > WindowedStream<> windowedStream = input
> >> > >  .keyBy(...)
> >> > >  .window(...);
> >> > >
> >> > > DataStream<> mainOutput = windowedStream
> >> > >  .apply(...);
> >> > >
> >> > > DataStream<> lateOutput = windowStream
> >> > >  .lateStream();
> >> > >
> >> > >
> >> > >
> >> > >
> >> > >
> >> > >
> >> > >
> >> > > On Thu, Feb 23, 2017 at 7:08 AM, Gyula Fóra <gyf...@apache.org>
> wrote:
> >> > >
> >> > >> Hi,
> >> > >>
> >> > >> Thanks for the nice proposal, I like the idea of side outputs, and
> it
> >> > would
> >> > >> make a lot of topologies much simpler.
> >> > >>
> >> > >> Regarding the API I think we should come up with a way of making
> side
> >> > >> otuputs accessible from all sort of operators in a similar way. For
> >> > >> instance through the RichFunction interface with a special
> collector
> >> > that
> >> > >> we invalidate when the user should not be collecting to it. (just a
> >> > quick
> >> > >> idea)
> >> > >>
> >> > >> I personally wouldn't 

[jira] [Created] (FLINK-5933) Allow Evictor for merging windows

2017-02-28 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-5933:
---

 Summary: Allow Evictor for merging windows
 Key: FLINK-5933
 URL: https://issues.apache.org/jira/browse/FLINK-5933
 Project: Flink
  Issue Type: Bug
  Components: DataStream API
Affects Versions: 1.2.0, 1.3.0
Reporter: Aljoscha Krettek


Due to some leftover checks {{WindowedStream}} does not allow the combination 
although {{EvictingWindowOperator}} has the code to support it.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] Side Outputs and Split/Select

2017-02-27 Thread Aljoscha Krettek
I'm curious to know what people think about the OutputTag API for the
general side-output implementation?

One thing that might easily go overlooked is that I changed ProcessFunction
from an interface to an abstract class. So that I could provide a default
onTime() method. This also would require allowing ProcessFunction on a
non-keyed stream, as I mentioned in my first mail (I hope).

On Mon, 27 Feb 2017 at 17:45 Aljoscha Krettek <aljos...@apache.org> wrote:

> @Jamie I must have mistyped my last API proposal. This piece of code:
> WindowedOperator windowedResult = input
>   .keyBy(...)
>   .window(...)
>   .apply(...)
>
> DataStream lateData = windowedResult.getSideOutput();
>
> should actually have been:
>
> WindowedOperator windowedResult = input
>   .keyBy(...)
>   .window(...)
>   .apply(...)
>
> DataStream lateData = windowedResult.getLateDataSideOutput();
>
> So apart from the naming it's pretty much the same as your suggestion,
> right? The reason why I preferred the explicit OutputTag is that we
> otherwise have to create another layer of OutputTags that are internal to
> the system so that users cannot accidentally also send data to the same
> side output. It just means writing more code for use and introducing the
> more concrete return type for the WindowedStream operations. But that's
> fine if y'all prefer that variant. :-)
>
> On Sat, 25 Feb 2017 at 04:19 Chen Qin <qinnc...@gmail.com> wrote:
>
> Hi Jamie,
>
> I think it does make consuming late arriving events more explicit! At cost
> of
> fix a predefined OutputTag which user have no control nor definition
> an extra UDF which essentially filter out all mainOutputs and only let
> sideOutput pass (like filterFunction)
>
> Thanks,
> Chen
>
> > On Feb 24, 2017, at 1:17 PM, Jamie Grier <ja...@data-artisans.com>
> wrote:
> >
> > I prefer the ProcessFunction and side outputs solution over split() and
> > select() which I've never liked primarily due to the lack of type safety
> > and it also doesn't really seem to fit with the rest of Flink's API.
> >
> > On the late data question I strongly prefer the late data concept being
> > explicit in the API.  Could we not also do something like:
> >
> > WindowedStream<> windowedStream = input
> >  .keyBy(...)
> >  .window(...);
> >
> > DataStream<> mainOutput = windowedStream
> >  .apply(...);
> >
> > DataStream<> lateOutput = windowStream
> >  .lateStream();
> >
> >
> >
> >
> >
> >
> >
> > On Thu, Feb 23, 2017 at 7:08 AM, Gyula Fóra <gyf...@apache.org> wrote:
> >
> >> Hi,
> >>
> >> Thanks for the nice proposal, I like the idea of side outputs, and it
> would
> >> make a lot of topologies much simpler.
> >>
> >> Regarding the API I think we should come up with a way of making side
> >> otuputs accessible from all sort of operators in a similar way. For
> >> instance through the RichFunction interface with a special collector
> that
> >> we invalidate when the user should not be collecting to it. (just a
> quick
> >> idea)
> >>
> >> I personally wouldn't deprecate the "universal" Split/Select API that
> can
> >> be used on any  DataStream in favor of functionality that is only
> >> accessible trhough the process function/ or a few select operators. I
> think
> >> the Split/Select pattern is also very nice and I use it in many
> different
> >> contexts to get efficient multiway filtering (after map/co operators for
> >> examples).
> >>
> >> Regards,
> >> Gyula
> >>
> >> Aljoscha Krettek <aljos...@apache.org> ezt írta (időpont: 2017. febr.
> 23.,
> >> Cs, 15:42):
> >>
> >>> Hi Folks,
> >>> Chen and I have been working for a while now on making FLIP-13 (side
> >>> outputs) [1] a reality. We think we have a pretty good internal
> >>> implementation and also a proposal for an API but now we need to
> discuss
> >>> how we want to go forward with this, especially how we should deal with
> >>> split/select which does some of the same things side outputs can do.
> I'll
> >>> first quickly describe what the split/select API looks like, so that
> >> we're
> >>> all on the same page. Then I'll present the new proposed side output
> API
> >>> and then I'll present new API for getting dropped late data from a
> >> windowed
> >>> operation, which was t

Re: [DISCUSS] Side Outputs and Split/Select

2017-02-27 Thread Aljoscha Krettek
@Jamie I must have mistyped my last API proposal. This piece of code:
WindowedOperator windowedResult = input
  .keyBy(...)
  .window(...)
  .apply(...)

DataStream lateData = windowedResult.getSideOutput();

should actually have been:

WindowedOperator windowedResult = input
  .keyBy(...)
  .window(...)
  .apply(...)

DataStream lateData = windowedResult.getLateDataSideOutput();

So apart from the naming it's pretty much the same as your suggestion,
right? The reason why I preferred the explicit OutputTag is that we
otherwise have to create another layer of OutputTags that are internal to
the system so that users cannot accidentally also send data to the same
side output. It just means writing more code for use and introducing the
more concrete return type for the WindowedStream operations. But that's
fine if y'all prefer that variant. :-)

On Sat, 25 Feb 2017 at 04:19 Chen Qin <qinnc...@gmail.com> wrote:

> Hi Jamie,
>
> I think it does make consuming late arriving events more explicit! At cost
> of
> fix a predefined OutputTag which user have no control nor definition
> an extra UDF which essentially filter out all mainOutputs and only let
> sideOutput pass (like filterFunction)
>
> Thanks,
> Chen
>
> > On Feb 24, 2017, at 1:17 PM, Jamie Grier <ja...@data-artisans.com>
> wrote:
> >
> > I prefer the ProcessFunction and side outputs solution over split() and
> > select() which I've never liked primarily due to the lack of type safety
> > and it also doesn't really seem to fit with the rest of Flink's API.
> >
> > On the late data question I strongly prefer the late data concept being
> > explicit in the API.  Could we not also do something like:
> >
> > WindowedStream<> windowedStream = input
> >  .keyBy(...)
> >  .window(...);
> >
> > DataStream<> mainOutput = windowedStream
> >  .apply(...);
> >
> > DataStream<> lateOutput = windowStream
> >  .lateStream();
> >
> >
> >
> >
> >
> >
> >
> > On Thu, Feb 23, 2017 at 7:08 AM, Gyula Fóra <gyf...@apache.org> wrote:
> >
> >> Hi,
> >>
> >> Thanks for the nice proposal, I like the idea of side outputs, and it
> would
> >> make a lot of topologies much simpler.
> >>
> >> Regarding the API I think we should come up with a way of making side
> >> otuputs accessible from all sort of operators in a similar way. For
> >> instance through the RichFunction interface with a special collector
> that
> >> we invalidate when the user should not be collecting to it. (just a
> quick
> >> idea)
> >>
> >> I personally wouldn't deprecate the "universal" Split/Select API that
> can
> >> be used on any  DataStream in favor of functionality that is only
> >> accessible trhough the process function/ or a few select operators. I
> think
> >> the Split/Select pattern is also very nice and I use it in many
> different
> >> contexts to get efficient multiway filtering (after map/co operators for
> >> examples).
> >>
> >> Regards,
> >> Gyula
> >>
> >> Aljoscha Krettek <aljos...@apache.org> ezt írta (időpont: 2017. febr.
> 23.,
> >> Cs, 15:42):
> >>
> >>> Hi Folks,
> >>> Chen and I have been working for a while now on making FLIP-13 (side
> >>> outputs) [1] a reality. We think we have a pretty good internal
> >>> implementation and also a proposal for an API but now we need to
> discuss
> >>> how we want to go forward with this, especially how we should deal with
> >>> split/select which does some of the same things side outputs can do.
> I'll
> >>> first quickly describe what the split/select API looks like, so that
> >> we're
> >>> all on the same page. Then I'll present the new proposed side output
> API
> >>> and then I'll present new API for getting dropped late data from a
> >> windowed
> >>> operation, which was the original motivation for adding side outputs.
> >>>
> >>> Split/select consists of two API calls:
> DataStream.split(OutputSelector)
> >>> and SplitStream.select(). You can use it like this:
> >>>
> >>> DataStreamSource input = env.fromElements(1, 2, 3);
> >>>
> >>> final String EVEN_SELECTOR = "even";
> >>> final String ODD_SELECTOR = "odd";
> >>>
> >>> SplitStream split = input.split(
> >>>new OutputSelector() {
> >>>@Override
> >>> 

[jira] [Created] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-02-27 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-5929:
---

 Summary: Allow Access to Per-Window State in ProcessWindowFunction
 Key: FLINK-5929
 URL: https://issues.apache.org/jira/browse/FLINK-5929
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API
Reporter: Aljoscha Krettek


Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} can 
access is scoped to the key of the window but not the window itself. That is, 
state is global across all windows for a given key.

For some use cases it is beneficial to keep state scoped to a window. For 
example, if you expect to have several {{Trigger}} firings (due to early and 
late firings) a user can keep state per window to keep some information between 
those firings.

The per-window state has to be cleaned up in some way. For this I see two 
options:
 - Keep track of all state that a user uses and clean up when we reach the 
window GC horizon.
 - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called when 
we reach the window GC horizon that users can/should use to clean up their 
state.

On the API side, we can add a method {{windowState()}} on 
{{ProcessWindowFunction.Context}} that retrieves the per-window state and 
{{globalState()}} that would allow access to the (already available) global 
state. The {{Context}} would then look like this:
{code}
/**
 * The context holding window metadata
 */
public abstract class Context {
/**
 * @return The window that is being evaluated.
 */
public abstract W window();

/**
 * State accessor for per-key and per-window state.
 */
KeyedStateStore windowState();

/**
 * State accessor for per-key global state.
 */
KeyedStateStore globalState();
}
{code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: ElasticsearchSink Exception

2017-02-27 Thread Aljoscha Krettek
+Tzu-Li (Gordon) Tai  Do you have any idea what could
be causing this? I'm asking because you recently worked on the
Elasticsearch connectors, right?

On Sun, 26 Feb 2017 at 04:26 Govindarajan Srinivasaraghavan <
govindragh...@gmail.com> wrote:

> Thanks Flavio. I tried with multiple versions but still the same exception
> and I was able to locate the class file inside my jar. Am I missing
> something? Thanks for all the help.
>
> On Sat, Feb 25, 2017 at 3:09 PM, Flavio Pompermaier 
> wrote:
>
> > The exception you have (NoClassDefFoundError:
> > org/elasticsearch/index/mapper/MapperParsingException) is usually caused
> > by
> > elasticsearch version conflict or a bad shading when creating the uber
> jar.
> > Can you check if one of the 2 is causing the problem?
> >
> > On 25 Feb 2017 23:13, "Govindarajan Srinivasaraghavan" <
> > govindragh...@gmail.com> wrote:
> >
> > > Hi Flavio,
> > >
> > > I tried with both http port 9200 and tcp port 9300 and I see incoming
> > > connections in the elasticserach node. Also I see the below errors in
> > > taskmanager out logs. Below are the dependencies I have on my gradle
> > > project. Am I missing something?
> > >
> > > Exception in thread "elasticsearch[Madame Menace][generic][T#2]"
> > > java.lang.NoClassDefFoundError:
> > > org/elasticsearch/index/mapper/MapperParsingException
> > > at
> > > org.elasticsearch.ElasticsearchException.(
> > > ElasticsearchException.java:597)
> > > at
> > > org.elasticsearch.transport.TransportService$Adapter$3.
> > > run(TransportService.java:622)
> > > at
> > > java.util.concurrent.ThreadPoolExecutor.runWorker(
> > > ThreadPoolExecutor.java:1142)
> > > at
> > > java.util.concurrent.ThreadPoolExecutor$Worker.run(
> > > ThreadPoolExecutor.java:617)
> > > at java.lang.Thread.run(Thread.java:745)
> > > Caused by: java.lang.ClassNotFoundException:
> > > org.elasticsearch.index.mapper.MapperParsingException
> > > at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> > > at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> > > at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> > > ... 5 more
> > >
> > >
> > > Exception in thread "elasticsearch[Saint Elmo][generic][T#2]"
> > > java.lang.NoClassDefFoundError: Could not initialize class
> > > org.elasticsearch.transport.NodeDisconnectedException
> > > at
> > > org.elasticsearch.transport.TransportService$Adapter$3.
> > > run(TransportService.java:622)
> > > at
> > > java.util.concurrent.ThreadPoolExecutor.runWorker(
> > > ThreadPoolExecutor.java:1142)
> > > at
> > > java.util.concurrent.ThreadPoolExecutor$Worker.run(
> > > ThreadPoolExecutor.java:617)
> > > at java.lang.Thread.run(Thread.java:745)
> > >
> > >
> > > compile group: 'org.apache.flink', name: 'flink-core', version: '1.2.0'
> > > compile group: 'org.apache.flink', name: 'flink-streaming-java_2.10',
> > > version: '1.2.0'
> > > compile group: 'org.apache.flink', name: 'flink-java', version: '1.2.0'
> > > compile group: 'org.apache.flink', name:
> > > 'flink-connector-kafka-0.10_2.10', version: '1.2.0'
> > > compile group: 'org.apache.flink', name: 'flink-clients_2.10', version:
> > > '1.2.0'
> > >
> > > compile group: 'org.apache.flink', name:
> > > 'flink-connector-elasticsearch2_2.10', version: '1.2.0'
> > >
> > >
> > > On Sat, Feb 25, 2017 at 1:26 AM, Flavio Pompermaier <
> > pomperma...@okkam.it>
> > > wrote:
> > >
> > > > Are you sure that in elasticsearch.yml you've enabled ES to listen to
> > the
> > > > http port 9300?
> > > >
> > > > On 25 Feb 2017 08:58, "Govindarajan Srinivasaraghavan" <
> > > > govindragh...@gmail.com> wrote:
> > > >
> > > > Hi All,
> > > >
> > > > I'm getting the below exception when I start my flink job. I have
> > > verified
> > > > the elastic search host and it seems to be working well. I have also
> > > tried
> > > > including the below dependecies to my project but nothing works. Need
> > > some
> > > > help. Thanks.
> > > >
> > > > compile group: 'org.apache.lucene', name: 'lucene-core', version:
> > '5.5.0'
> > > > compile group: 'org.elasticsearch', name: 'elasticsearch', version:
> > > '2.3.5'
> > > >
> > > >
> > > > *Sink Code:*
> > > >
> > > > List transportAddresses = new ArrayList<>();
> > > > transportAddresses.add(new
> > > > InetSocketAddress(InetAddress.getByName(*hostName*), 9300));
> > > >
> > > > output.addSink(new ElasticsearchSink<>(config, transportAddresses,
> new
> > > > ElasticsearchSinkFunction() {
> > > >
> > > > }
> > > >
> > > >
> > > > *Exception:*
> > > >
> > > > java.lang.RuntimeException: Client is not connected to any
> > Elasticsearch
> > > > nodes!
> > > > at org.apache.flink.streaming.connectors.elasticsearch2.
> > > > ElasticsearchSink.open(ElasticsearchSink.java:172)
> > > > at org.apache.flink.api.common.functions.util.FunctionUtils.
> > > > 

Re: [DISCUSS] Per-key event time

2017-02-27 Thread Aljoscha Krettek
This is indeed an interesting topic, thanks for starting the discussion,
Jamie!

I now thought about this for a while, since more and more people seem to be
asking about it lately. First, I thought that per-key watermark handling
would not be necessary because it can be done locally (as Paris suggested),
then I realised that that's not actually the case and thought that this
wouldn't be possible. In the end, I came to realise that it is indeed
possible (with some caveats), although with a huge overhead in the amount
of state that we have to keep and with changes to our API. I'll try and
walk you through my thought process.

Let's first look at local watermark tracking, that is, tracking the
watermark locally at the operator that needs it, for example a
WindowOperator. I initially thought that this would be sufficient. Assume
we have a pipeline like this:

Source -> KeyBy -> WindowOperator -> ...

If we have parallelism=1, then all elements for a given key k will be read
by the same source operator instance and they will arrive (in-order) at the
WindowOperator. It doesn't matter whether we track the per-key watermarks
at the Source or at the WindowOperator because we see the same elements in
the same order at each operator, per key.

Now, think about this pipeline:

Source1 --+
  |-> Union -> KeyBy -> WindowOperator -> ...
Source2 --+

(you can either think about two sources or once source that has several
parallel instances, i.e. parallelism > 1)

Here, both Source1 and Source2 can emit elements with our key k. If Source1
is faster than Source2 and the watermarking logic at the WindowOperator
determines the watermark based on the incoming element timestamps (for
example, using the BoundedLatenessTimestampExtractor) then the elements
coming from Source2 will be considered late at the WindowOperator.

>From this we know that our WindowOperator needs to calculate the watermark
similarly to how watermark calculation currently happens in Flink: the
watermark is the minimum of the watermark of all upstream operations. In
this case it would be: the minimum upstream watermarks of operations that
emit elements with key k. For per-partition watermarks this works because
the number of upstream operations is know and we simply keep an array that
has the current upstream watermark for each input operation. For per-key
watermarks this would mean that we have to keep k*u upstream watermarks
where u is the number of upstream operations. This can be quite large.
Another problem is that the observed keys change, i.e. the key space is
evolving and we need to retire keys from our calculations lest we run out
of space.

We could find a solution based on a feature we recently introduced in
Flink: https://github.com/apache/flink/pull/2801. The sources keep track of
whether they have input and signal to downstream operations whether they
should be included in the watermark calculation logic. A similar thing
could be done per-key, where each source signals to downstream operations
that there is a new key and that we should start calculating watermarks for
this. When a source determines that no more data will come for a key (which
in itself is a bit of a tricky problem) then it should signal to downstream
operations to take the key out of watermark calculations, that is that we
can release some space.

The above is analysing, on a purely technical level, the feasibility of
such a feature. I think it is feasible but can be very expensive in terms
of state size requirements. Gabor also pointed this out above and gave a
few suggestions on reducing that size.

We would also need to change our API to allow tracking the lineage of keys
or to enforce that a key stays the same throughout a pipeline. Consider
this pipeline:

Source -> KeyBy1 -> WindowOperator -> KeyBy2 -> WindowOperator

where KeyBy1 and KeyBy2 extract a different key, respectively. How would
watermarks be tracked across this change of keys? Would we know which of
the prior keys and up being keys according to KeyBy2, i.e. do we have some
kind of key lineage information?

One approach for solving this would be to introduce a new API that allows
extracting a key at the source and will keep this key on the elements until
the sink. For example:

DeluxeKeyedStream input = env.addSourceWithKey(source, keyExtractor);
input
  .map()
  .window(...) // notice that we don't need keyBy because it is implicit
  .reduce(...)
  .map(...)
  .window(...)
  ...

The DeluxeKeyedStream (name preliminary ;-) would allow the operations that
we today have on KeyedStream and on DataStream and it would always maintain
the key that was assigned at the sources. The result of each operation
would again be a DeluxeKeyedStream. This way, we could track watermarks per
key.

I know it's a bit of a (very) lengthy mail, but what do you think?


On Thu, 23 Feb 2017 at 11:14 Gábor Hermann  wrote:

> Hey all,
>
> Let me share some ideas about this.
>
> @Paris: The local-only 

Re: [DISCUSS] Code style / checkstyle

2017-02-27 Thread Aljoscha Krettek
Just for a bit of context, this is the output of running cloc on the Flink
codebase:
---
Language files  blankcomment
code
---
Java  4609 126825 185428
  519096

=> 704,524 lines of code + comments/javadoc

When I apply the google style to the Flink code base using
https://github.com/google/google-java-format I get these commit statistics:

4577 files changed, 647645 insertions(+), 622663 deletions(-)

That is, a change to the Google Code Style would touch roughly over 90% of
all code/comment lines.

I would like to have a well defined code style, such as the Google Code
style, that has nice tooling and support but I don't think we will ever
convince enough people to do this kind of massive change. Even I think it's
a bit crazy to change 90% of the code base in one commit.

On Mon, 27 Feb 2017 at 11:10 Till Rohrmann  wrote:

> No, I think that's exactly what people mean when saying "losing the commit
> history". With the reformatting you would have to go manually through all
> past commits until you find the commit which changed a given line before
> the reformatting.
>
> Cheers,
> Till
>
> On Sun, Feb 26, 2017 at 6:32 PM, Alexander Alexandrov <
> alexander.s.alexand...@gmail.com> wrote:
>
> > Just to clarify - by "losing the commit history" you actually mean
> "losing
> > the ability to annotate each line in a file with its last commit", right?
> >
> > Or is there some other sense in which something is lost after applying
> bulk
> > re-format?
> >
> > Cheers,
> > A.
> >
> > On Sat, Feb 25, 2017 at 7:10 AM Henry Saputra 
> > wrote:
> >
> > > Just want to clarify what unify code style here.
> > >
> > > Is the intention to have IDE and Maven plugins to have the same check
> > style
> > > rules?
> > >
> > > Or are we talking about having ONE code style for both Java and Scala?
> > >
> > > - Henry
> > >
> > > On Fri, Feb 24, 2017 at 8:08 AM, Greg Hogan 
> wrote:
> > >
> > > > I agree wholeheartedly with Ufuk. We cannot reformat the codebase,
> > cannot
> > > > pause while flushing the PR queue, and won't find a consensus code
> > style.
> > > >
> > > > I think we can create a baseline code style for new and existing
> > > > contributors for which reformatting on changed files will be
> acceptable
> > > for
> > > > PR reviews.
> > > >
> > > > On Fri, Feb 24, 2017 at 5:01 AM, Dawid Wysakowicz <
> > > > wysakowicz.da...@gmail.com> wrote:
> > > >
> > > > > The problem with code style when it is not enforced is that it will
> > be
> > > a
> > > > > matter of luck to what parts of files / new files will it be
> applied.
> > > > When
> > > > > the code style is not applied to whole file, it is pretty much
> > useless
> > > > > anyway. You would need to manually select just the fragments one is
> > > > > changing. The benefits of having code style and enforcing it I see
> > are:
> > > > >  - being able to apply autoformatter, which speeds up writing code
> > > > >  - it would make reviewing PRs easier as e.g. there would be line
> > > length
> > > > > limit applied etc. which will make line breaking more reader
> > friendly.
> > > > >
> > > > > Though I think if a consensus is not reachable it would be good to
> > once
> > > > and
> > > > > forever decide that we don't want a code style and checkstyle.
> > > > >
> > > > > 2017-02-24 10:51 GMT+01:00 Ufuk Celebi :
> > > > >
> > > > > > On Fri, Feb 24, 2017 at 10:46 AM, Fabian Hueske <
> fhue...@gmail.com
> > >
> > > > > wrote:
> > > > > > > I agree with Till that encouraging a code style without
> enforcing
> > > it
> > > > > does
> > > > > > > not make a lot of sense.
> > > > > > > If we enforce it, we need to touch all files and PRs.
> > > > > >
> > > > > > I think it makes sense for new contributors to have a starting
> > point
> > > > > > without enforcing anything (I do agree that we are past the point
> > to
> > > > > > reach consensus on a style and enforcement ;-)).
> > > > > >
> > > > >
> > > >
> > >
> >
>


[jira] [Created] (FLINK-5917) Remove MapState.size()

2017-02-24 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-5917:
---

 Summary: Remove MapState.size()
 Key: FLINK-5917
 URL: https://issues.apache.org/jira/browse/FLINK-5917
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API
Affects Versions: 1.3.0
Reporter: Aljoscha Krettek


I'm proposing to remove {{size()}} because it is a prohibitively expensive 
operation and users might not be aware of it. Instead of {{size()}} users can 
use an iterator over all mappings to determine the size, when doing this they 
will be aware of the fact that it is a costly operation.

Right now, {{size()}} is only costly on the RocksDB state backend but I think 
with future developments on the in-memory state backend it might also become an 
expensive operation there.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: Frontend classpath issue

2017-02-24 Thread Aljoscha Krettek
Did any user have problems with the Flink 1.1 behaviour? If not, we could
disable it again, by default, and add a flag for adding the user jar to all
the classpaths.

On Fri, 24 Feb 2017 at 14:50 Robert Metzger  wrote:

> I agree with you Gyula, this change is dangerous. I have seen another case
> from a user with Hadoop dependencies that crashed in Flink 1.2.0 that
> didn't in 1.1.x
>
> I wonder if we should introduce a config flag for Flink 1.2.1 to disable
> the behavior if needed.
>
> On Fri, Feb 24, 2017 at 2:27 PM, Ufuk Celebi  wrote:
>
> > On Fri, Feb 24, 2017 at 11:05 AM, Gyula Fóra 
> wrote:
> > > I was not aware of this big change (I know it's my fault) but I am not
> > sure
> > > if I agree with the rationale.
> >
> > No comment on the actual issue from my side, but I strongly disagree
> > that this is your fault. We should have covered this better in the
> > release announcement in my opinion. Of course, this doesn't help now.
> > ;-)
> >
> > – Ufuk
> >
>


Re: [DISCUSS] Code style / checkstyle

2017-02-23 Thread Aljoscha Krettek
If we go for a codestyle/checkstyle I would suggest to use the Google
style. This already has checkstyle, IntelliJ style, Eclipse style and a
code formatting tool and is well established. However, some people will not
like this style. In general, I think we will never manage to find a style
that all people will like.

On Wed, 22 Feb 2017 at 18:36 Dawid Wysakowicz 
wrote:

> So how about preparing a code style and corresponding checkstyle and
> enabling it gradually module by module whenever shepherd/commiter with
> expertise in a module will have time to introduce/check such a change?
> Maybe it will make the "snowball effect" happen?
> I agree there is no point in preparing code style/checkstyle until it will
> be introduced somewhere. I will be willing to work on the checkstyle if
> some volunteering modules appear.
>
> 2017-02-22 17:09 GMT+01:00 Chesnay Schepler :
>
> > For file where we don't enforce checkstyle things should work they way
> > they do right now.
> >
> > Turn off auto-formatting, and only format code that you touched and
> that's
> > it. For these
> > modification we will have to check them manually in the PRs as we do now.
> >
> >
> > On 22.02.2017 16:22, Greg Hogan wrote:
> >
> >> Will not the code style be applied on save to any user-modified file? So
> >> this will clutter PRs and overwrite history.
> >>
> >> On Wed, Feb 22, 2017 at 6:19 AM, Dawid Wysakowicz <
> >> wysakowicz.da...@gmail.com> wrote:
> >>
> >> I also agree with Till and Chesnayl. Anyway as to "capture the current
> >>> style" I have some doubts if this is possible, as it changes file to
> >>> file.
> >>>
> >>> Chesnay's suggestion as to were enforce the checkstyle seems reasonable
> >>> to
> >>> me, but I am quite new to the community :).
> >>> Enabling checkstyle for particular packages is possible.
> >>>
> >>> 2017-02-22 12:07 GMT+01:00 Chesnay Schepler :
> >>>
> >>> I agree with Till.
> 
>  I would propose enforcing checkstyle on a subset of the modules,
> 
> >>> basically
> >>>
>  those that are not
>  flink-runtime, flink-java, flink-streaming-java. These are the ones
> imo
>  where messing with the history
>  can be detrimental; for the others it isn't really important imo.
>  (Note that i excluded scala code since i don't know the state of
>  checkstyle compliance there)
> 
>  For flink-runtime we could maybe (don't know if it is supported)
> enforce
>  checkstyle for all classes
>  under o.a.f.migration, so that at least the flip-6 code is covered.
> 
>  Similarly, enforcing checkstyle for all tests should be fine as well.
> 
>  Regards,
>  Chesnay
> 
> 
>  On 22.02.2017 11:48, Till Rohrmann wrote:
> 
>  I think that not enforcing a code style is as good as not having any
> >
>  code
> >>>
>  style to be honest. Having an IntelliJ or Eclipse profile is nice and
> >
>  some
> >>>
>  people will probably use it, but my gut feeling is that the majority
> >
>  won't
> >>>
>  notice it.
> >
> > Cheers,
> > Till
> >
> > On Wed, Feb 22, 2017 at 11:15 AM, Ufuk Celebi 
> wrote:
> >
> > Kurt's proposal sounds reasonable.
> >
> >> What about the following:
> >> - We try to capture the current style in an code style configuration
> >> (for IntelliJ and maybe Eclipse)
> >> - We provide that on the website for contributors to download
> >> - We don't enforce it, but new contributions and changes are free to
> >> format with this style as changes happen
> >>
> >> Practically speaking, this should not change much except maybe the
> >> import order or whitespace after certain keywords, etc.
> >>
> >>
> >> On Wed, Feb 22, 2017 at 4:48 AM, Kurt Young 
> wrote:
> >>
> >> +1 to provide a unified code style for both java and scala.
> >>>
> >>> -1 to adjust all the existing code to the new style in one step.
> The
> >>>
> >>> code's
> >>
> >> history contains very helpful information which can help
> >>> develops to understand why these codes are added, which jira is
> >>>
> >> related.
> >>>
>  This information is too valuable to lose. I think we can
> >>> do the reformat thing step by step, each time when the codes being
> >>>
> >>> changed,
> >>
> >> we can adopt them to the new style.
> >>> IMHO this is also the reason why the unified code style is
> important.
> >>>
> >>>
> >>> Best,
> >>> Kurt
> >>>
> >>> On Wed, Feb 22, 2017 at 5:50 AM, Dawid Wysakowicz <
> >>> wysakowicz.da...@gmail.com> wrote:
> >>>
> >>> Hi,
> >>>
>  I would like to resurrect the discussing ([1]
>    nabble.com/Code-style-guideline-for-Scala-td7526.html>
> 

[DISCUSS] Side Outputs and Split/Select

2017-02-23 Thread Aljoscha Krettek
Hi Folks,
Chen and I have been working for a while now on making FLIP-13 (side
outputs) [1] a reality. We think we have a pretty good internal
implementation and also a proposal for an API but now we need to discuss
how we want to go forward with this, especially how we should deal with
split/select which does some of the same things side outputs can do. I'll
first quickly describe what the split/select API looks like, so that we're
all on the same page. Then I'll present the new proposed side output API
and then I'll present new API for getting dropped late data from a windowed
operation, which was the original motivation for adding side outputs.

Split/select consists of two API calls: DataStream.split(OutputSelector)
and SplitStream.select(). You can use it like this:

DataStreamSource input = env.fromElements(1, 2, 3);

final String EVEN_SELECTOR = "even";
final String ODD_SELECTOR = "odd";

SplitStream split = input.split(
new OutputSelector() {
@Override
public Iterable select(Integer value) {
if (value % 2 == 0) {
return Collections.singleton(EVEN_SELECTOR);
} else {
return Collections.singleton(ODD_SELECTOR);
}
}
});

DataStream evenStream = split.select(EVEN_SELECTOR);
DataStream oddStream = split.select(ODD_SELECTOR);

The stream is split according to an OutputSelector that returns an Iterable
of Strings. Then you can use select() to get a new stream that only
contains elements with the given selector. Notice how the element type for
all the split streams is the same.

The new side output API proposal adds a new type OutputTag and relies on
extending ProcessFunction to allow emitting data to outputs besides the
main output. I think it's best explained with an example as well:

DataStreamSource input = env.fromElements(1, 2, 3);

final OutputTag sideOutput1 = new OutputTag<>("side-output-1"){};
final OutputTag sideOutput2 = new OutputTag<>("side-output-2"){};

SingleOutputStreamOperator mainOutputStream = input
.process(new ProcessFunction() {

@Override
public void processElement(
Integer value,
Context ctx,
Collector out) throws Exception {

ctx.output(sideOutput1, "WE GOT: " + value);
ctx.output(sideOutput2, value);
out.collect("MAIN OUTPUT: " + value);
}

});

DataStream sideOutputStream1 =
mainOutputStream.getSideOutput(sideOutput1);
DataStream sideOutputStream2 =
mainOutputStream.getSideOutput(sideOutput2);

Notice how the OutputTags are anonymous inner classes, similar to TypeHint.
We need this to be able to analyse the type of the side-output streams.
Also notice, how the types of the side-output streams can be independent of
the main-output stream, also notice how everything is correctly type
checked by the Java Compiler.

This change requires making ProcessFunction an abstract base class so that
not every user has to implement the onTimer() method. We would also need to
allow ProcessFunction on a non-keyed stream.

Chen also implemented an API based on FlatMapFunction that looks like the
one proposed in the FLIP. This relies on CollectorWrapper, which can be
used to "pimp" a Collector to also allow emitting to side outputs.

For WindowedStream we have two proposals: make OutputTag visible on the
WindowedStream API or make the result type of WindowedStream operations
more specific to allow a getDroppedDataSideOutput() method. For the first
proposal it would look like this:

final OutputTag lateDataTag = new OutputTag<>("side-output-1"){};

DataStream windowedResult = input
  .keyBy(...)
  .window(...)
  .sideOutputLateData(lateDataTag)
  .apply(...)

DataStream lateData = windowedResult.getSideOutput(lateDataTag);

For the second proposal it would look like this:

WindowedOperator windowedResult = input
  .keyBy(...)
  .window(...)
  .apply(...)

DataStream lateData = windowedResult.getSideOutput();

Right now, the result of window operations is a
SingleOutputStreamOperator, same as it is for all DataStream operations.
Making the result type more specific, i.e. a WindowedOperator, would allow
us to add extra methods there. This would require wrapping a
SingleOutputStreamOperator and forwarding all the method calls to the
wrapped operator which can be a bit of a hassle for future changes. The
first proposal requires additional boilerplate code.

Sorry for the long mail but I think it's necessary to get everyone on the
same page. The question is now: how should we proceed with the proposed API
and the old split/select API? I propose to deprecate split/select and only
have side outputs, going forward. Of course, I'm a bit biased on this. ;-)
If we decide to do this, we also need to decide on what the side output API
should look like.

Happy discussing! Feedback very welcome. :-)


Re: [DISCUSS] Project build time and possible restructuring

2017-02-22 Thread Aljoscha Krettek
I'm not against splitting but I wan't to highlight that there are other
options:
 - We could split the tests run on travis logically. For example, run unit
tests and integration tests separately. This would have the benefit that
you would see early on if the (fast) unit tests fail. We could also split
by components. Maybe have: runtime tests, API tests and library tests.
 - We could also look into using Jenkins properly, have separate pre-commit
hooks that run on PRs that test the code in a fine-grained way. Units
tests/IT tests, separate Jenkins profiles for runtime, API and so on.

On Wed, 22 Feb 2017 at 11:06 Gábor Hermann  wrote:

> Hi all,
>
> I'm also in favor of splitting, but only in terms of committers. I agree
> with Theodore, that async releases would cause confusion. With time
> based releases [1] it should be easy to sync release.
>
> Even if it's possible to add committers to different components, should
> we do a more fine-grained split? E.g. should we split the committers of
> Gelly and ML? If not, committers should be trusted not to fiddle with
> something that's not their territory. That might not be a problem, as it
> seems to be the case right now.
>
> What should we do if ASF does not allow splitting? Should we add more
> committers and trust them not to touch code that's not their
> responsibility? That's the same as no split in terms of committers
> (build time can be lowered of course).
>
> What about ensuring code quality? In my opinion the primary reason of
> being a committer is to ensure code quality. Committers are trusted to
> adhere to a certain code quality, partly determined by developer
> guidelines, and make others adhere too.
>
> By adding more committers with less consideration, we are risking the
> quality of different components. That might not be a problem, because
> that's the price of a more dynamic development in libraries etc., but we
> should ensure that *eventually* the code quality converges to what's
> expected by Flink. So a new committer would learn some of the
> responsibilities as a committer, not as a contributor. But what if the
> new committer fails to adhere? Is there a way to revoke committer status?
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/Time-based+releases
>
> Cheers,
> Gabor
>
>


Re: [DISCUSS] Should we supply a new Iterator instance for Functions with Iterable input(s) like CoGroupFunction ?

2017-02-22 Thread Aljoscha Krettek
I think this was mostly an oversight on my part that was possible because
we didn't have good test-coverage that was enforcing correctness. Please go
ahead and open an issue for re-adding the throw.

On Wed, 22 Feb 2017 at 13:28 Lin Li  wrote:

> Thank you for the answer!
>
> The discussion on FLINK-1023 is very clear to me. I agree with that throws
> a TraversableOnceException when the iterator is requested the second time.
>
> @Aljoscha git history shows you removed the exception-thrown code from
> FLINK-1110, would you mind me create an issue and add it back?
>
> BTW, I had submitted a pr for FLINK-5498 (
> https://github.com/apache/flink/pull/3379), support left/right outer joins
> with non-equi-join conditions via coGroup operator with a generated
> OuterJoinCoGroupFunction.
> But current implementation is not memory safe when do a many-to-one/many
> outer join which will copy the opposite side input into an List buffer(This
> is not pretty though it follows the guideline, just remember the input data
> within a function call). It's a work-around for now, in the long run, I
> think we should extend the runtime join operators to support such
> non-equi-join conditions.  Implementation in TableAPI layer could not
>  always ensures the efficiency.
> Welcome any suggestions on current solution.
>
> Best, Lincoln
>


[jira] [Created] (FLINK-5880) Add documentation for object reuse for DataStream API

2017-02-22 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-5880:
---

 Summary: Add documentation for object reuse for DataStream API
 Key: FLINK-5880
 URL: https://issues.apache.org/jira/browse/FLINK-5880
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: Aljoscha Krettek


The batch documentation has this section: 
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/batch/index.html#operating-on-data-objects-in-functions



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: KeyGroupRangeAssignment ?

2017-02-21 Thread Aljoscha Krettek
I'm afraid that won't work because we also internally use murmur hash on
the result of hashCode().

@Ovidiu I still want to understand why you want to use keyBy() for that
case. It sounds like you want to use it because you would like to do
something else but that is not possible with the Flink APIs. The fact that
key groups exist is more of an implementation detail and exposing that to
users does not seem like to right way to go.

On Tue, 21 Feb 2017 at 16:10 Greg Hogan <c...@greghogan.com> wrote:

> Integer's hashCode is the identity function. Store your slot index in an
> Integer or IntValue and key off that field.
>
> On Tue, Feb 21, 2017 at 6:04 AM, Ovidiu-Cristian MARCU <
> ovidiu-cristian.ma...@inria.fr> wrote:
>
> > Hi,
> >
> > As in my example, each key is a window so I want to evenly distributed
> > processing to all slots.
> > If I have 100 keys and 100 slots, for each key I have the same rate of
> > events, I don’t want skewed distribution.
> >
> > Best,
> > Ovidiu
> >
> > > On 21 Feb 2017, at 11:38, Aljoscha Krettek <aljos...@apache.org>
> wrote:
> > >
> > > Hi Ovidiu,
> > > what's the reason for wanting to make the parallelism equal to the
> number
> > > of keys? I think in general it's very hard to ensure that hashes even
> go
> > to
> > > different key groups. It can always happen that all your keys (if you
> > have
> > > so few of them) are assigned to the same parallel operator instance.
> > >
> > > Cheers,
> > > Aljoscha
> > >
> > > On Tue, 21 Feb 2017 at 10:53 Till Rohrmann <trohrm...@apache.org>
> wrote:
> > >
> > >> Hi Ovidiu,
> > >>
> > >> at the moment it is not possible to plugin a user defined hash
> > function/key
> > >> group assignment function. If you like, then you can file a JIRA issue
> > to
> > >> add this functionality.
> > >>
> > >> The key group assignment in your example looks quite skewed. One
> > question
> > >> concerning how you calculated it: Shouldn't the number of element in
> > each
> > >> group sum up to 1024? this only works for the first case. What do the
> > >> numbers mean then?
> > >>
> > >> Cheers,
> > >> Till
> > >>
> > >> On Mon, Feb 20, 2017 at 3:45 PM, Ovidiu-Cristian MARCU <
> > >> ovidiu-cristian.ma...@inria.fr> wrote:
> > >>
> > >>> Hi,
> > >>>
> > >>> Thank you for clarifications (I am working with KeyedStream so a
> custom
> > >>> partitioner does not help).
> > >>>
> > >>> So I should set maxParallelism>=parallelism and change my keys (from
> > >>> input.keyBy(0)) such that key group assignment works as expected),
> > >>> but I can’t modify these keys in order to make it work.
> > >>>
> > >>> The other option is to change Flink’s internals in order to evenly
> > >>> distribute keys (changing computeKeyGroupForKeyHash: is this
> enough?).
> > >>> What I was looking for was an api to change the way key group
> > assignment
> > >>> is done, but without changing Flink’s runtime.
> > >>>
> > >>> I think that the maxParallelism setting is not enough (it introduces
> > this
> > >>> inefficient way of distributing data for processing when using
> > >> KeyedStream).
> > >>> Is it possible to expose somehow the key group assignment?
> > >>>
> > >>> This is how keys are distributed (1024 keys, key=1..1024; and groups
> > from
> > >>> 2 to 16 - equiv. parallelism that is number of slots):
> > >>>
> > >>> {0=517, 1=507} 2
> > >>> {0=881, 1=809, 2=358} 3
> > >>> {0=1139, 1=1048, 2=617, 3=268} 4
> > >>> {0=1319, 1=1268, 2=829, 3=473, 4=207} 5
> > >>> {0=1512, 1=1425, 2=1008, 3=644, 4=352, 5=179} 6
> > >>> {0=1656, 1=1586, 2=1160, 3=781, 4=512, 5=310, 6=139} 7
> > >>> {0=1781, 1=1718, 2=1280, 3=908, 4=645, 5=417, 6=278, 7=141} 8
> > >>> {0=1901, 1=1828, 2=1395, 3=1031, 4=738, 5=529, 6=399, 7=240, 8=131} 9
> > >>> {0=1996, 1=1934, 2=1493, 3=1134, 4=846, 5=614, 6=513, 7=354, 8=233,
> > 9=99}
> > >>> 10
> > >>> {0=2094, 1=2017, 2=1577, 3=1226, 4=935, 5=713, 6=610, 7=434, 8=359,
> > >> 9=174,
> > >>> 10=101} 11
> > >>> {0=2192, 1=2091, 2=

Re: KeyGroupRangeAssignment ?

2017-02-21 Thread Aljoscha Krettek
Hi Ovidiu,
what's the reason for wanting to make the parallelism equal to the number
of keys? I think in general it's very hard to ensure that hashes even go to
different key groups. It can always happen that all your keys (if you have
so few of them) are assigned to the same parallel operator instance.

Cheers,
Aljoscha

On Tue, 21 Feb 2017 at 10:53 Till Rohrmann  wrote:

> Hi Ovidiu,
>
> at the moment it is not possible to plugin a user defined hash function/key
> group assignment function. If you like, then you can file a JIRA issue to
> add this functionality.
>
> The key group assignment in your example looks quite skewed. One question
> concerning how you calculated it: Shouldn't the number of element in each
> group sum up to 1024? this only works for the first case. What do the
> numbers mean then?
>
> Cheers,
> Till
>
> On Mon, Feb 20, 2017 at 3:45 PM, Ovidiu-Cristian MARCU <
> ovidiu-cristian.ma...@inria.fr> wrote:
>
> > Hi,
> >
> > Thank you for clarifications (I am working with KeyedStream so a custom
> > partitioner does not help).
> >
> > So I should set maxParallelism>=parallelism and change my keys (from
> > input.keyBy(0)) such that key group assignment works as expected),
> > but I can’t modify these keys in order to make it work.
> >
> > The other option is to change Flink’s internals in order to evenly
> > distribute keys (changing computeKeyGroupForKeyHash: is this enough?).
> > What I was looking for was an api to change the way key group assignment
> > is done, but without changing Flink’s runtime.
> >
> > I think that the maxParallelism setting is not enough (it introduces this
> > inefficient way of distributing data for processing when using
> KeyedStream).
> > Is it possible to expose somehow the key group assignment?
> >
> > This is how keys are distributed (1024 keys, key=1..1024; and groups from
> > 2 to 16 - equiv. parallelism that is number of slots):
> >
> > {0=517, 1=507} 2
> > {0=881, 1=809, 2=358} 3
> > {0=1139, 1=1048, 2=617, 3=268} 4
> > {0=1319, 1=1268, 2=829, 3=473, 4=207} 5
> > {0=1512, 1=1425, 2=1008, 3=644, 4=352, 5=179} 6
> > {0=1656, 1=1586, 2=1160, 3=781, 4=512, 5=310, 6=139} 7
> > {0=1781, 1=1718, 2=1280, 3=908, 4=645, 5=417, 6=278, 7=141} 8
> > {0=1901, 1=1828, 2=1395, 3=1031, 4=738, 5=529, 6=399, 7=240, 8=131} 9
> > {0=1996, 1=1934, 2=1493, 3=1134, 4=846, 5=614, 6=513, 7=354, 8=233, 9=99}
> > 10
> > {0=2094, 1=2017, 2=1577, 3=1226, 4=935, 5=713, 6=610, 7=434, 8=359,
> 9=174,
> > 10=101} 11
> > {0=2192, 1=2091, 2=1669, 3=1316, 4=1008, 5=797, 6=705, 7=517, 8=446,
> > 9=255, 10=173, 11=95} 12
> > {0=2257, 1=2175, 2=1741, 3=1396, 4=1079, 5=882, 6=785, 7=596, 8=524,
> > 9=340, 10=254, 11=186, 12=73} 13
> > {0=2330, 1=2258, 2=1816, 3=1458, 4=1160, 5=951, 6=858, 7=667, 8=602,
> > 9=417, 10=329, 11=265, 12=135, 13=66} 14
> > {0=2397, 1=2323, 2=1897, 3=1542, 4=1233, 5=1008, 6=934, 7=723, 8=671,
> > 9=479, 10=385, 11=344, 12=210, 13=118, 14=72} 15
> > {0=2454, 1=2395, 2=1949, 3=1603, 4=1296, 5=1055, 6=998, 7=803, 8=739,
> > 9=539, 10=453, 11=410, 12=280, 13=178, 14=147, 15=61} 16
> >
> > Best,
> > Ovidiu
> >
> > > On 20 Feb 2017, at 12:04, Till Rohrmann  wrote:
> > >
> > > Hi Ovidiu,
> > >
> > > the way Flink works is to assign key group ranges to operators. For
> each
> > element you calculate a hash value and based on that you assign it to a
> key
> > group. Thus, in your example, you have either a key group with more than
> 1
> > key or multiple key groups with 1 or more keys assigned to an operator.
> > >
> > > So what you could try to do is to reduce the number of key groups to
> > your parallelism via env.setMaxParallelism() and then try to figure a key
> > out whose hashes are uniformly distributed over the key groups. The key
> > group assignment is calculated via murmurHash(key.hashCode()) %
> > maxParallelism.
> > >
> > > Alternatively if you don’t need a keyed stream, you could try to use a
> > custom partitioner via DataStream.partitionCustom.
> > >
> > > Cheers,
> > > Till
> > >
> > >
> > > On Mon, Feb 20, 2017 at 11:46 AM, Ovidiu-Cristian MARCU <
> > ovidiu-cristian.ma...@inria.fr >
> > wrote:
> > > Hi,
> > >
> > > Can you please comment on how can I ensure stream input records are
> > distributed evenly onto task slots?
> > > See attached screen Records received issue.
> > >
> > > I have a simple application which is applying some window function over
> > a stream partitioned as follows:
> > > (parallelism is equal to the number of keys; records with the same key
> > are streamed evenly)
> > >
> > > // get the execution environment
> > > final StreamExecutionEnvironment env = StreamExecutionEnvironment.
> > getExecutionEnvironment();
> > > // get input data by connecting to the socket
> > > DataStream text = env.socketTextStream("localhost", port,
> "\n");
> > > DataStream Long,
> > Long>> input = 

Re: Reliable Distributed FS support (HCFS)

2017-02-17 Thread Aljoscha Krettek
Hi,
I think atomic rename is not part of the requirements.

I'll add +Stephan who recently wrote this document in case he has any
additional input.

Cheers,
Aljoscha

On Thu, 16 Feb 2017 at 23:28 Vijay Srinivasaraghavan 
wrote:

> Following up on my question regarding backed Filesystem (HCFS)
> requirements. Appreciate any inputs.
>
> ---
> Regarding the Filesystem abstraction support, we are planning to use a
> distributed file system which complies with Hadoop Compatible File System
> (HCFS) standard in place of standard HDFS.
>
> According to the documentation (
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/filesystems.html),
> persistence gurantees is listed as one of the main requirement and to be
> precises it qualifies both visibility and durability gurantees.
>
> My question is,
>
> 1) Are we expecting the file system to support "Atomic Rename"
> characteristics? I believe checkpoint mechanism involves in renaming the
> files and will that have an impact if "atomic rename" is not guranteed by
> the underlying file system?
>
> 2) How does one certify Flink with HCFS (in place of standard HDFS) in
> terms of the scenarios/usecase that needs to be tested? Is there any
> general guidance on this?
> ---
>
> Regards
> Vijay
>
>
> On Wednesday, February 15, 2017 11:28 AM, Vijay Srinivasaraghavan <
> vijikar...@yahoo.com> wrote:
>
>
> Hello,
>
> Regarding the Filesystem abstraction support, we are planning to use a
> distributed file system which complies with Hadoop Compatible File System
> (HCFS) standard in place of standard HDFS.
>
> According to the documentation (
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/filesystems.html),
> persistence gurantees is listed as one of the main requirement and to be
> precises it qualifies both visibility and durability gurantees.
>
> My question is,
>
> 1) Are we expecting the file system to support "Atomic Rename"
> characteristics? I believe checkpoint mechanism involves in renaming the
> files and will that have an impact if "atomic rename" is not guranteed by
> the underlying file system?
>
> 2) How does one certify Flink with HCFS (in place of standard HDFS) in
> terms of the scenarios/usecase that needs to be tested? Is there any
> general guidance on this?
>
> Thanks
> Vijay
>
>
>


Re: Wish to Contribute - Andrea Spina

2017-02-17 Thread Aljoscha Krettek
Welcome to the community, Andrea! :-)

On Fri, 17 Feb 2017 at 10:22 Fabian Hueske  wrote:

> Hi Andrea,
>
> welcome to the community!
> I gave you Contributor permissions. You can now assign issues to yourself.
>
> Best, Fabian
>
> 2017-02-17 9:14 GMT+01:00 Andrea Spina :
>
> > Dear Gordon,
> >
> > Thank you so much. My JIRA's id is spi-x-i. Can't wait to helping.
> >
> > Cheers,
> > Andrea
> >
> >
> >
> >
> > --
> > View this message in context: http://apache-flink-mailing-
> > list-archive.1008284.n3.nabble.com/Wish-to-Contribute-
> > Andrea-Spina-tp15991p15996.html
> > Sent from the Apache Flink Mailing List archive. mailing list archive at
> > Nabble.com.
> >
>


Re: Contribute Flink

2017-02-17 Thread Aljoscha Krettek
Hi,
welcome to the community. :-)

I'm sure Ufuk, who opened the issue, will get back to you once he notices
your message.

Cheers,
Aljoscha

On Fri, 17 Feb 2017 at 11:18 Riccardo Diomedi <
riccardo.diom...@radicalbit.io> wrote:

> Hi Fabian,
>
> I’d like to contribute mainly on Flink streaming since I’m working on it.
>
> I have asked to contribute with this sub-task:
> https://issues.apache.org/jira/browse/FLINK-5611 <
> https://issues.apache.org/jira/browse/FLINK-5611>
>
> since I thought it was suitable for my level.
>
> Best,
> Riccardo
> > Il giorno 17 feb 2017, alle ore 10:16, Fabian Hueske 
> ha scritto:
> >
> > Hi Riccardo,
> >
> > welcome to the community!
> > Great to hear that you want to contribute :-)
> >
> > Do you have a certain area of interest that you'd like to work on?
> >
> > Best,
> > Fabian
> >
> > 2017-02-17 9:44 GMT+01:00 Riccardo Diomedi <
> riccardo.diom...@radicalbit.io>:
> >
> >> Hi Flink Community
> >>
> >> My name is Riccardo and I’m a Software Engineer.
> >>
> >> I’m working with Flink from a year. I have used it for my Master thesis
> >> and now for my work.
> >>
> >> My wish is to contribute it with some tasks/issues and try to be
> helpful.
> >>
> >> My JIRA id is: riccardo_91
> >>
> >> Cheers,
> >>
> >> Riccardo
> >>
> >>
> >>
>
>


Re: Flink Job Exception

2017-02-16 Thread Aljoscha Krettek
Hi Govindarajan,
the Jira issue that you linked to and which Till is currently fixing will
only fix the obvious type mismatch in the Akka messages. There is also an
underlying problem that causes this message to be sent in the first place.
In the case of the user who originally created the Jira issue the reason
was that the Max-Parallelism was set to a value smaller than the
parallelism. Can you try looking in the JobManager/TaskManager logs and see
if you find the original cause there?

Cheers,
Aljoscha

On Thu, 16 Feb 2017 at 09:36 Till Rohrmann  wrote:

> Hi Govindarajan,
>
> there is a pending PR for this issue. I think I can merge it today.
>
> Cheers,
> Till
>
> On Thu, Feb 16, 2017 at 12:50 AM, Govindarajan Srinivasaraghavan <
> govindragh...@gmail.com> wrote:
>
> Hi All,
>
> I'm trying to run a streaming job with flink 1.2 version and there are 3
> task managers with 12 task slots. Irrespective of the parallelism that I
> give it always fails with the below error and I found a JIRA link
> corresponding to this issue. Can I know by when this will be resolved since
> I'm not able to run any job in my current environment. Thanks.
>
> https://issues.apache.org/jira/browse/FLINK-5773
>
> java.lang.ClassCastException: Cannot cast scala.util.Failure to 
> org.apache.flink.runtime.messages.Acknowledge
>   at java.lang.Class.cast(Class.java:3369)
>   at scala.concurrent.Future$$anonfun$mapTo$1.apply(Future.scala:405)
>   at scala.util.Success$$anonfun$map$1.apply(Try.scala:237)
>   at scala.util.Try$.apply(Try.scala:192)
>   at scala.util.Success.map(Try.scala:237)
>   at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
>   at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
>   at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>   at 
> scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:63)
>   at 
> scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:78)
>   at 
> scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
>   at 
> scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
>   at 
> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>   at 
> scala.concurrent.BatchingExecutor$Batch.run(BatchingExecutor.scala:54)
>   at 
> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
>   at 
> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:106)
>   at 
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
>   at 
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
>   at 
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
>   at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:266)
>   at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:89)
>   at 
> akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:967)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>   at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:437)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
>


Re: [DISCUSS] Planning Release 1.3

2017-02-13 Thread Aljoscha Krettek
+1, we can do it this time! :-)

On Mon, 6 Feb 2017 at 20:19 Robert Metzger  wrote:

> Hi,
> according to our recent time-based releases discussion, I came up with the
> following deadlines for the upcoming 1.3 release:
>
> *Feature freeze (branch forking)*:  1 May 2017
> *Code freeze (first voting RC)*:  15 May 2017
> *Release date*: 26 May 2017
>
> I will try to post into this thread monthly to remind everybody on the time
> left.
>
> There are currently 21 JIRAs assigned to the 1.3.0 release:
>
> https://issues.apache.org/jira/issues/?jql=project%20%3D%20FLINK%20AND%20fixVersion%20%3D%201.3.0%20AND%20resolution%20%3D%20Unresolved%20ORDER%20BY%20due%20ASC%2C%20priority%20DESC%2C%20created%20ASC
>


Re: [ANNOUNCE] Welcome Stefan Richter as a new committer

2017-02-10 Thread Aljoscha Krettek
Welcome! :-)

On Fri, 10 Feb 2017 at 16:10 Till Rohrmann  wrote:

> Great to have you on board as a committer Stefan :-)
>
> On Fri, Feb 10, 2017 at 3:32 PM, Greg Hogan  wrote:
>
> > Welcome, Stefan, and thank you for your contributions!
> >
> > On Fri, Feb 10, 2017 at 5:00 AM, Ufuk Celebi  wrote:
> >
> > > Hey everyone,
> > >
> > > I'm very happy to announce that the Flink PMC has accepted Stefan
> > > Richter to become a committer of the Apache Flink project.
> > >
> > > Stefan is part of the community for almost a year now and worked on
> > > major features of the latest 1.2 release, most notably rescaling and
> > > backwards compatibility of program state.
> > >
> > > Please join me in welcoming Stefan. :-)
> > >
> > > – Ufuk
> > >
> >
>


Re: [ANNOUNCE] Welcome Jark Wu and Kostas Kloudas as committers

2017-02-10 Thread Aljoscha Krettek
Congrats! :-)

On Wed, 8 Feb 2017 at 11:29 Robert Metzger  wrote:

> Welcome on board guys!
>
> If you want to try our your new privileges, you can add yourself here:
> http://flink.apache.org/community.html#people (through the
> apache/flink-web
> repo)
>
> On Wed, Feb 8, 2017 at 10:52 AM, Till Rohrmann 
> wrote:
>
> > Congratulations Jark and Kostas :-)
> >
> > On Wed, Feb 8, 2017 at 10:49 AM, Paris Carbone  wrote:
> >
> > > welcome aboard Kostas and Jark :)
> > >
> > > Paris
> > >
> > > > On 7 Feb 2017, at 21:16, Fabian Hueske  wrote:
> > > >
> > > > Hi everybody,
> > > >
> > > > I'm very happy to announce that Jark Wu and Kostas Kloudas accepted
> the
> > > > invitation of the Flink PMC to become committers of the Apache Flink
> > > > project.
> > > >
> > > > Jark and Kostas are longtime members of the Flink community.
> > > > Both are actively driving Flink's development and contributing to its
> > > > community in many ways.
> > > >
> > > > Please join me in welcoming Kostas and Jark as committers.
> > > >
> > > > Fabian
> > >
> > >
> >
>


[jira] [Created] (FLINK-5774) ContinuousFileProcessingTest has test instability

2017-02-10 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-5774:
---

 Summary: ContinuousFileProcessingTest has test instability
 Key: FLINK-5774
 URL: https://issues.apache.org/jira/browse/FLINK-5774
 Project: Flink
  Issue Type: Bug
  Components: Streaming Connectors
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek


Inserting a {{Thread.sleep(200)}} in 
{{ContinuousFileMonitoringFunction.monitorDirAndForwardSplits()}} will make the 
tests fail reliably. Normally, it occurs now and then due to "natural" slow 
downs on Travis:
log: https://api.travis-ci.org/jobs/199977242/log.txt?deansi=true

The condition that wait's for the file monitoring function to reach a certain 
state needs to be tougher.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-5743) Mark WindowedStream.aggregate* methods as PublicEvolving

2017-02-08 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-5743:
---

 Summary: Mark WindowedStream.aggregate* methods as PublicEvolving
 Key: FLINK-5743
 URL: https://issues.apache.org/jira/browse/FLINK-5743
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API
Reporter: Aljoscha Krettek
Priority: Blocker


IMHO, they are to new for knowing whether they will persist in their current 
form.  



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-5741) Add tests for window function wrappers with RichFunctions

2017-02-08 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-5741:
---

 Summary: Add tests for window function wrappers with RichFunctions
 Key: FLINK-5741
 URL: https://issues.apache.org/jira/browse/FLINK-5741
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek


There are not tests that {{RichFunctions}} work in all cases and at least 
{{ScalaProcessWindowFunctionWrapper}} is know to not work correctly before 
FLINK-5740 is implemented.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-5740) Make WrappingFunction an interface and move to flink-core

2017-02-08 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-5740:
---

 Summary: Make WrappingFunction an interface and move to flink-core
 Key: FLINK-5740
 URL: https://issues.apache.org/jira/browse/FLINK-5740
 Project: Flink
  Issue Type: Improvement
  Components: Core
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek


Making it an interface and having an {{AbstractWrappingFunction}} will allow 
implementations to have different classes as base classes. Also, we should 
change {{FunctionUtils}} to work like {{StreamingFunctionUtils}} so that 
wrapping functions don't have to implement the methods of {{RichFunction}}.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-5721) Add FoldingState to State Documentation

2017-02-06 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-5721:
---

 Summary: Add FoldingState to State Documentation
 Key: FLINK-5721
 URL: https://issues.apache.org/jira/browse/FLINK-5721
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek






--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-5720) Deprecate "Folding" in all of DataStream API

2017-02-06 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-5720:
---

 Summary: Deprecate "Folding" in all of DataStream API
 Key: FLINK-5720
 URL: https://issues.apache.org/jira/browse/FLINK-5720
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API
Reporter: Aljoscha Krettek


Folding is an operation that cannot be done incrementally in a distributed way 
and that also cannot be done on merging windows. Now that we have 
{{AggregatingState}} and aggregate operations we should deprecate folding in 
the APIs and deprecate {{FoldingState}}.

I suggest to remove folding completely in Flink 2.0



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-5713) Protect against NPE in WindowOperator window cleanup

2017-02-05 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-5713:
---

 Summary: Protect against NPE in WindowOperator window cleanup
 Key: FLINK-5713
 URL: https://issues.apache.org/jira/browse/FLINK-5713
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API
Affects Versions: 1.2.0
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek
 Fix For: 1.2.1


Some (misbehaved) WindowAssigners can cause windows to be dropped from the 
merging window set while a cleanup timer is still active. This will trigger a 
NullPointerException when that timer fires.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-5709) Add Max Parallelism to Parallel Execution Doc

2017-02-03 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-5709:
---

 Summary: Add Max Parallelism to Parallel Execution Doc
 Key: FLINK-5709
 URL: https://issues.apache.org/jira/browse/FLINK-5709
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek






--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: clean up jira

2017-01-30 Thread Aljoscha Krettek
Already on it. I closed some and commented on others to ask people whether
we can close. :-)

On Sun, 29 Jan 2017 at 22:31 Robert Metzger <rmetz...@apache.org> wrote:

> Thank you for fixing the link Chesnay.
>
> Over the weekend, I've assigned all JIRAs without a component to one.
> I think some of the assignments were not 100% accurate. But the available
> components in the overview list is somewhat limited:
> https://cwiki.apache.org/confluence/display/FLINK/Components+and+Shepherds
>
> Also, there were some components that were accidentally created (probably
> by using the autosuggest functionality of jira and pressing ENTER to
> early). I've removed them.
> It would be nice if the committers could check old issues they've reported,
> they are assigned to and they are shepherding to clean up our JIRA a bit.
>
>
> On Tue, Jan 24, 2017 at 4:46 PM, Chesnay Schepler <ches...@apache.org>
> wrote:
>
> > fixed link: https://issues.apache.org/jira/issues/?jql=fixVersion%20%3D%
> > 201.2.0%20AND%20project%20%3D%20FLINK%20AND%20resolution%20%
> > 3D%20Unresolved%20ORDER%20BY%20priority%20DESC
> >
> > Previous link lead to FLINK-5048.
> >
> >
> > On 24.01.2017 16:33, Robert Metzger wrote:
> >
> >> Hey friends of a clean JIRA,
> >>
> >> I've unmarked as many JIRAs as possible from "Fix Version = 1.2.0" as
> >> possible.
> >> The list of unresolved 1.2.0 issues is now 15 items short:
> >> https://issues.apache.org/jira/browse/FLINK-5048?jql=fixVers
> >> ion%20%3D%201.2.0%20AND%20project%20%3D%20FLINK%20AND%
> >> 20resolution%20%3D%20Unresolved%20ORDER%20BY%20priority%20DESC
> >> Most of then are documentation fixes, some of them are not resolved yet,
> >> because multiple fix versions are set.
> >>
> >>
> >> I will try to send a monthly release status email (how many weeks until
> >> next feature freeze / number of open JIRAs for that version / ... ) to
> >> encourage people to clean up our JIRA.
> >>
> >>
> >>
> >> On Thu, Dec 22, 2016 at 9:45 AM, Fabian Hueske <fhue...@gmail.com>
> wrote:
> >>
> >> Hi Anton,
> >>>
> >>> Thanks a lot for digging through JIRA!
> >>> I'll do a pass over the issues and close or comment on those issue
> that I
> >>> know about.
> >>>
> >>> I'll update the FLINK-5384 with the status of the issues.
> >>>
> >>>   Best, Fabian
> >>>
> >>> 2016-12-21 19:01 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:
> >>>
> >>> Thanks a lot for going through the issues and preparing this list!
> >>>>
> >>>>  From a first glance some can definitely be closed. I didn't yet find
> >>>> the
> >>>> time to look through all of them but we should definitely work and
> >>>>
> >>> cleaning
> >>>
> >>>> up our Jira.
> >>>>
> >>>> Cheers,
> >>>> Aljoscha
> >>>>
> >>>> On Wed, 21 Dec 2016 at 18:19 Anton Solovev <anton_solo...@epam.com>
> >>>>
> >>> wrote:
> >>>
> >>>> Hi folks
> >>>>>
> >>>>> I found a lot of non-relevant issues:
> >>>>>
> >>>>> https://issues.apache.org/jira/browse/FLINK-37 -> from stratosphere;
> >>>>> https://issues.apache.org/jira/browse/FLINK-87 -> from stratosphere;
> >>>>> https://issues.apache.org/jira/browse/FLINK-481 -> from
> stratosphere;
> >>>>> https://issues.apache.org/jira/browse/FLINK-605 -> from
> stratosphere;
> >>>>> https://issues.apache.org/jira/browse/FLINK-639 -> from
> stratosphere;
> >>>>> https://issues.apache.org/jira/browse/FLINK-650 -> from
> stratosphere;
> >>>>> https://issues.apache.org/jira/browse/FLINK-735 -> from
> stratosphere;
> >>>>> https://issues.apache.org/jira/browse/FLINK-456 -> from
> stratosphere;
> >>>>> https://issues.apache.org/jira/browse/FLINK-788 -> from
> stratosphere;
> >>>>> https://issues.apache.org/jira/browse/FLINK-796 -> from
> stratosphere;
> >>>>> https://issues.apache.org/jira/browse/FLINK-805 -> from
> stratosphere ;
> >>>>> https://issues.apache.org/jira/browse/FLINK-867 -> from
> stratosphere;
> >>>>> https://issues.apache.org/jira/browse/FLINK-879 -> from
> s

Re: [VOTE] Release Apache Flink 1.2.0 (RC2)

2017-01-27 Thread Aljoscha Krettek
I think this issue that Ufuk opened is also a blocker:
https://issues.apache.org/jira/browse/FLINK-5670

As I comment in the Issue, at least one bigger user of Flink has run into
this problem on their cluster.

On Fri, 27 Jan 2017 at 10:50 Ufuk Celebi  wrote:

> Thanks Gyula!
>
> The current state of things is:
> - Stefan is working on a fix for
> https://issues.apache.org/jira/browse/FLINK-5663.
> - Till is working on https://issues.apache.org/jira/browse/FLINK-5667.
>
> As far as I can tell, these will be fixed today and we are ready to go for
> RC3.
>
> I resolved the other issues I created.
>
> – Ufuk
>
> On 26 January 2017 at 22:16:26, Gyula Fóra (gyf...@apache.org) wrote:
> > Hi,
> >
> > Aside from the issues mentioned above I have some good news as well.
> >
> > I have finished porting and started testing one of our major production
> > jobs (RBea) on 1.2 and everything seems to run well so far, with
> > savepoints, rescaling, externalized checkpoints, metrics etc. on YARN.
> >
> > In this job I use, windowing, RocksDB state, iterations, timers,
> broadcast
> > states, repartitionable operator states etc. and everything seems to be
> > working extremely well under normal circumstances.
> >
> > So far I mostly ran sunny day tests but I will continue testing with
> larger
> > load and some failure scenarios. I will keep you posted.
> >
> > Great job!
> > Gyula
> >
> >
> >
> > Robert Metzger ezt írta (időpont: 2017. jan. 26., Cs,
> > 21:28):
> >
> > Damn. I really hoped that this RC goes through.
> >
> > I propose to keep the RC2 open until we've fixed all issues mentioned
> here
> > and to get some more testing feedback.
> >
> >
> >
> > On Thu, Jan 26, 2017 at 8:06 PM, Stephan Ewen wrote:
> >
> > > @Till - I think that FLINK-5667 is a blocker
> > >
> > > Good catch finding it!
> > >
> > > On Thu, Jan 26, 2017 at 7:51 PM, Till Rohrmann
> > > wrote:
> > >
> > > > I have found another problem: Under certain circumstances Flink can
> lose
> > > > state data by completing an invalid checkpoint.
> > > > https://issues.apache.org/jira/browse/FLINK-5667.
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > > On Thu, Jan 26, 2017 at 6:27 PM, Till Rohrmann
> > > > wrote:
> > > >
> > > > > Robert also found an issue that pending checkpoint files are not
> > > properly
> > > > > cleaned up: https://issues.apache.org/jira/browse/FLINK-5660. To
> my
> > > > > surprise, the issue was already fixed in 1.1.4 so I guess I've
> > > forgotten
> > > > to
> > > > > forward port the fix. There is a pending PR to fix it. The fix
> could
> > > also
> > > > > be part of a 1.2.1 release.
> > > > >
> > > > > Cheers,
> > > > > Till
> > > > >
> > > > > On Thu, Jan 26, 2017 at 6:04 PM, Ufuk Celebi wrote:
> > > > >
> > > > >> I ran some tests and found the following issues:
> > > > >>
> > > > >> https://issues.apache.org/jira/browse/FLINK-5663: Checkpoint
> fails
> > > > >> because of closed registry
> > > > >> => This happened a couple of times for the first checkpoints after
> > > > >> submitting a job. If it happened on every submission I would
> > > > >> definitely make this a blocker, but I happen to run into it in
> like 3
> > > > >> out of 10 job submission. What do we make of this?
> > > > >>
> > > > >> https://issues.apache.org/jira/browse/FLINK-5665: When the
> failures
> > > > >> happened, I also had some lingering 0-byte files.
> > > > >>
> > > > >> https://issues.apache.org/jira/browse/FLINK-5664: I also found
> the
> > > > >> logging of the RocksDB backend a little noisy (for my local setup
> at
> > > > >> least with many tasks per TM and low checkpointing interval.)
> > > > >>
> > > > >> All in all, I'm not sure if we want to make these a blocker or
> not.
> > > > >> I'm fine both ways with a follow up 1.2.1 release.
> > > > >>
> > > > >> ===
> > > > >>
> > > > >> - Verified signatures and checksums
> > > > >> - Checked out the Java quickstarts and ran the jobs
> > > > >> - All poms point to 1.2.0
> > > > >> - Migrated multiple jobs via savepoint from 1.1.4 to 1.2.0 with
> Kryo
> > > > >> types, session windows (w/o lateness), operator and keyed state
> for
> > > > >> all three backends
> > > > >> - Rescaled the same jobs from 1.2.0 savepoints with all three
> > backends
> > > > >> - Verified the "migration namespace serializer" fix
> > > > >> - Ran streaming state machine with Kafka source, RocksDB backend
> and
> > > > >> master and worker failures (standalone cluster)
> > > > >>
> > > > >> On Wed, Jan 25, 2017 at 9:14 PM, Robert Metzger
> > > > >> wrote:
> > > > >> > Dear Flink community,
> > > > >> >
> > > > >> > Please vote on releasing the following candidate as Apache Flink
> > > > version
> > > > >> > 1.2.0.
> > > > >> >
> > > > >> > The commit to be voted on:
> > > > >> > 8b5b6a8b (http://git-wip-us.apache.org/repos/asf/flink/commit/
> > > > 8b5b6a8b)
> > > > >> >
> > > > >> > Branch:
> > > > >> > release-1.2.0-rc2
> > > > >> > (https://git1-us-west.apache.org/repos/asf/flink/repo?p=flin
> > > 

[jira] [Created] (FLINK-5647) Fix RocksDB Backend Cleanup

2017-01-25 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-5647:
---

 Summary: Fix RocksDB Backend Cleanup
 Key: FLINK-5647
 URL: https://issues.apache.org/jira/browse/FLINK-5647
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Affects Versions: 1.1.4
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek


The RocksDB backend on Flink 1.1.x does not properly clean up the directories 
that it uses. This can lead to overflowing disks when a lot of failure/recovery 
cycles happen.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Unregistering Managed State in Operator Backend

2017-01-24 Thread Aljoscha Krettek
Just a bit of clarification, the OperatorState stuff is independent of
keyed state backends, i.e. even if you use RocksDB the operator state will
not be stored in RocksDB, only keyed state is stored there.

Right now, when an operator state (ListState) is empty we will still write
some meta data about that state. I think it should be easy to
change DefaultOperatorStateBackend to not write anything in case of an
empty state. What do you think, Stefan?

On Tue, 24 Jan 2017 at 12:12 Paris Carbone  wrote:

> Sure Till,
>
> I would love to also make the patch but need to prioritize some other
> things these days.
> At least I will dig and see how complex this is regarding the different
> backends.
>
> I also have some follow-up questions, in case anybody has thought about
> these things already (or is simply interested):
>
> - Do you think it would make sense to automatically garbage collect empty
> states in general?
> - Shouldn't this happen already during snapshot compaction (in rocksdb)
> and would that violate any user assumptions in your view?
>
>
> > On 24 Jan 2017, at 11:44, Till Rohrmann  wrote:
> >
> > Hi Paris,
> >
> > if there is no such issue open, then please open one so that we can track
> > the issue. If you have time to work on that even better :-)
> >
> > Cheers,
> > Till
> >
> > On Tue, Jan 24, 2017 at 10:25 AM, Paris Carbone  wrote:
> >
> >> Any thoughts/plans?
> >> So should I open a Jira and add this?
> >>
> >> Paris
> >>
> >> On Jan 21, 2017, at 5:17 PM, Paris Carbone  >> kth.se>> wrote:
> >>
> >> Thank you for the answer Ufuk!
> >>
> >> To elaborate a bit more, I am not using keyed state, it would be indeed
> >> tricky in that case to discard everything.
> >>
> >> I need that for operator state, in my loop fault tolerance PR [1].  The
> >> idea is to tag a ListState (upstream log) per snapshot id.
> >> When a concurent snapshot is commited I want to simply remove everything
> >> related to that ListState (not just clear it). This would also
> eliminate a
> >> memory leak in case many empty logs accumulate in time (and thus state
> >> entries).
> >> Hope that makes it a bit more clear. Thanks again :)
> >>
> >> Paris
> >>
> >> [1] https://github.com/apache/flink/pull/1668
> >>
> >>
> >> On 21 Jan 2017, at 17:10, Ufuk Celebi  >> apache.org>> wrote:
> >>
> >> Hey Paris!
> >>
> >> As far as I know it's not possible at the moment and not planned. Does
> >> not sound to hard to add though. @Stefan: correct?
> >>
> >> You can currently only clear the state via #clear in the scope of the
> >> key for keyed state or the whole operator when used with operator
> >> state. In case of keyed state it's indeed hard to clear all state for
> >> operator state it's slightly better. I'm curious what your use case
> >> is?
> >>
> >> - Ufuk
> >>
> >>
> >> On Fri, Jan 20, 2017 at 5:59 PM, Paris Carbone > par...@kth.se>> wrote:
> >> Hi folks,
> >>
> >> I have a little question regarding the managed store operator backend,
> in
> >> case someone can help.
> >>
> >> Is there some convenient way (planned or under development) to
> completely
> >> unregister a state entry (e.g. a ListState) with a given id from the
> >> backend?
> >> It is fairly easy to register new states dynamically (i.e. with
> >> getOperatorState(...)), why not being able to discard it as well?
> >>
> >> I would find this feature extremely convenient to a fault tolerance
> >> related PR I am working on but I can think of many use cases that might
> >> need it.
> >>
> >>
> >> Paris
> >>
> >>
> >>
>
>


Re: Need help on understanding flink runtime and window function

2017-01-24 Thread Aljoscha Krettek
Hi,depending on which version of Flink you're using the answer changes. If
you use Flink 1.1 AggregatingProcessingTimeWindowOperator should be
responsible for executing that. In Flink 1.2 it should be WindowOperator.

For a quick overview of how scheduling works in Flink you could look at
this:
https://ci.apache.org/projects/flink/flink-docs-release-1.2/internals/job_scheduling.html.
I'm not aware of documentation on the shipping of Jar files but it roughly
works like this: the job is submitted with the jar file. The JobManager
puts the Jar file into the Blob Manager (which runs on the JobManager).
When parts of the job get scheduled on TaskManagers they retrieve the
required Jar File from the Blob Manager on the JobManager. Internally,
there is a custom ClassLoader that loads code from the user submitted jar
that was retrieved.

Does does help somewhat?

What's the reason for using ContinuousProcessingTimeTrigger? In general I
think almost always it is not right for a use case.

Cheers,
Aljoscha

On Fri, 20 Jan 2017 at 20:10 Fritz Budiyanto  wrote:

> Hi Flink Dev,
>
> I’m new to Flink and have a few questions below:
>
> 1. I’m trying to understand Flink runtime on the server side, and couldn’t
> figure out where the code which execute the window function sum below. I
> wanted to put a break point but got lost in the code base. Could someone
> shed a light ?
> val counts = text.flatMap { _.toLowerCase.split("\\W+") filter {
> _.nonEmpty } }
>   .map { (_, 1) }
>   .keyBy(0)
>   .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
>   .sum(1)
> 2. How is the Scala jar file get executed on the server side ? Is there
> internal documentation explaining the process ?
>
> 3. I’m planning to use ContinuousProcessingTimeTrigger on a session
> window. Is there possibility in the window function to figure out if the
> window is about to be retired ? For instance, for the recurring trigger I’m
> planning to do some processing. When the window is about to be retired, I’d
> like to do a different processing (ie. Computing final value and flush).
> Any suggestion ?
>
> —
> Fritz


Re: [DISCUSS] (Not) tagging reviewers

2017-01-24 Thread Aljoscha Krettek
It seems I'm in a bit of a minority here but I like the @R tags. There are
simply to many pull request for someone to keep track of all of them and if
someone things that a certain person would be good for reviewing a change
then tagging them helps them notice the PR.

I think the tag should not mean that only that person can/should review the
PR, it should serve as a proposal.

I'm happy to not use it anymore if everyone else doesn't like them.

On Sat, 21 Jan 2017 at 00:53 Fabian Hueske  wrote:

> Hi Haohui,
>
> reviewing pull requests is a great way of contributing to the community!
>
> I am not aware of specific instructions for the review process. The are
> some dos and don'ts on our "contribute code" page [1] that should be
> considered. Apart from that, I think the best way to start is to become
> familiar with a certain part of the code base (reading code, contributing)
> and then to look out for pull requests that address the part you are
> familiar with.
>
> The review does not have to cover all aspects of a PR (a committer will
> have a look as well), but from my personal experience the effort to review
> a PR is often much lower if some other person has had a look at it already
> and gave feedback.
> I think this can help a lot to reduce the review "load" on the committers.
> Maybe you find some contributors who are interested in the same components
> as you and you can start reviewing each others code.
>
> Thanks,
> Fabian
>
> [1] http://flink.apache.org/contribute-code.html#coding-guidelines
>
>
> 2017-01-20 23:02 GMT+01:00 jincheng sun :
>
> > I totally agree with all of your ideas.
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > Best wishes,
> >
> >
> >
> > SunJincheng.
> >
> > Stephan Ewen 于2017年1月16日 周一19:42写道:
> >
> > > Hi!
> > >
> > >
> > >
> > > I have seen that recently many pull requests designate reviews by
> writing
> > >
> > > "@personA review please" or so.
> > >
> > >
> > >
> > > I am personally quite strongly against that, I think it hurts the
> > community
> > >
> > > work:
> > >
> > >
> > >
> > >   - The same few people get usually "designated" and will typically get
> > >
> > > overloaded and often not do the review.
> > >
> > >
> > >
> > >   - At the same time, this discourages other community members from
> > looking
> > >
> > > at the pull request, which is totally undesirable.
> > >
> > >
> > >
> > >   - In general, review participation should be "pull based" (person
> > decides
> > >
> > > what they want to work on) not "push based" (random person pushes work
> to
> > >
> > > another person). Push-based just creates the wrong feeling in a
> community
> > >
> > > of volunteers.
> > >
> > >
> > >
> > >   - In many cases the designated reviews are not the ones most
> > >
> > > knowledgeable in the code, which is understandable, because how should
> > >
> > > contributors know whom to tag?
> > >
> > >
> > >
> > >
> > >
> > > Long story short, why don't we just drop that habit?
> > >
> > >
> > >
> > >
> > >
> > > Greetings,
> > >
> > > Stephan
> > >
> > >
> >
>


[jira] [Created] (FLINK-5616) YarnPreConfiguredMasterHaServicesTest fails sometimes

2017-01-23 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-5616:
---

 Summary: YarnPreConfiguredMasterHaServicesTest fails sometimes
 Key: FLINK-5616
 URL: https://issues.apache.org/jira/browse/FLINK-5616
 Project: Flink
  Issue Type: Bug
  Components: YARN
Affects Versions: 1.2.0, 1.3.0
Reporter: Aljoscha Krettek


This is the relevant part from the log:
{code}
---
 T E S T S
---
Running 
org.apache.flink.yarn.highavailability.YarnPreConfiguredMasterHaServicesTest
Formatting using clusterid: testClusterID
Tests run: 4, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 11.407 sec - in 
org.apache.flink.yarn.highavailability.YarnPreConfiguredMasterHaServicesTest
Running org.apache.flink.yarn.highavailability.YarnIntraNonHaMasterServicesTest
Formatting using clusterid: testClusterID
Tests run: 2, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 3.479 sec <<< 
FAILURE! - in 
org.apache.flink.yarn.highavailability.YarnIntraNonHaMasterServicesTest
testClosingReportsToLeader(org.apache.flink.yarn.highavailability.YarnIntraNonHaMasterServicesTest)
  Time elapsed: 0.836 sec  <<< FAILURE!
org.mockito.exceptions.verification.WantedButNotInvoked: 
Wanted but not invoked:
leaderContender.handleError();
-> at 
org.apache.flink.yarn.highavailability.YarnIntraNonHaMasterServicesTest.testClosingReportsToLeader(YarnIntraNonHaMasterServicesTest.java:120)
Actually, there were zero interactions with this mock.

at 
org.apache.flink.yarn.highavailability.YarnIntraNonHaMasterServicesTest.testClosingReportsToLeader(YarnIntraNonHaMasterServicesTest.java:120)

Running org.apache.flink.yarn.YarnFlinkResourceManagerTest
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 1.82 sec - in 
org.apache.flink.yarn.YarnFlinkResourceManagerTest
Running org.apache.flink.yarn.YarnClusterDescriptorTest
java.lang.RuntimeException: Couldn't deploy Yarn cluster
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploy(AbstractYarnClusterDescriptor.java:425)
at 
org.apache.flink.yarn.YarnClusterDescriptorTest.testConfigOverwrite(YarnClusterDescriptorTest.java:90)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128)
at 
org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203)
at 
org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155)
at 
org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)
Caused by: org.apache.flink.configuration.IllegalConfigurationException: The 
number of virtual cores per node were configured with 2147483647 but Yarn only 
has 8 virtual cores available. Please note that the number of virtual cores is 
set to the number of task slots by default unless configured in the Flink 
config with 'y

Re: Slow duplicated tests

2017-01-10 Thread Aljoscha Krettek
Thanks for looking into this! I think we can put in the fix and remove one
of the tests, yes.

@Robert What do you think? I think you initially added this test a lng
while back.

On Mon, 9 Jan 2017 at 20:11 Alexey Demin <diomi...@gmail.com> wrote:

> Hi,
>
> I am trying make small review for slow test and I found small issue:
>
> NonReusingReOpenableHashTableITCase
>
> testSpillingHashJoinWithMassiveCollisions
> testSpillingHashJoinWithTwoRecursions
>
>
> for testSpillingHashJoinWithTwoRecursions exist description
>
> /*
>  * This test is basically identical to the
> "testSpillingHashJoinWithMassiveCollisions" test, only that the number
>  * of repeated values (causing bucket collisions) are large enough to make
> sure that their target partition no longer
>  * fits into memory by itself and needs to be repartitioned in the
> recursion again.
>  */
>
> but he incorrect, because code of both test fully equal,
> one difference line very similar on bug after refactoring with inserting
> recordReuse
>
> testSpillingHashJoinWithMassiveCollisions
>  353   while ((record = buildSide.next(record)) != null) {
>
> (f51f1b4 19.03.14, 1:17 Aljoscha Krettek* Change MutableObjectIterator to
> allow immutable objects)
>
>
> Aljoscha, can we remove one test and fix buildSide.next(record) to
> buildSide.next(recordReuse) ?
>
> P.S. I started review because we have a lot of failing test due to cpu
> time limit
>
> Thanks,
> Alexey
>


Re: [ANNOUNCE] Flink 1.1.4 Released

2016-12-25 Thread Aljoscha Krettek
Very nice. Good work, team! 

On Sat, Dec 24, 2016, 00:07 Fabian Hueske  wrote:

> Thank you Ufuk for your work as release manager and everybody who
> contributed!
>
> Cheers, Fabian
>
> 2016-12-23 16:40 GMT+01:00 Ufuk Celebi :
>
> > The Flink PMC is pleased to announce the availability of Flink 1.1.4.
> >
> > The official release announcement:
> > https://flink.apache.org/news/2016/12/21/release-1.1.4.html
> >
> > Release binaries:
> > http://apache.lauf-forum.at/flink/flink-1.1.4
> >
> > Please update your Maven dependencies to the new 1.1.4 version and
> > update your binaries.
> >
> > On behalf of the Flink PMC, I would like to thank everybody who
> > contributed to the release.
> >
>


[jira] [Created] (FLINK-5374) Extend Unit Tests for RegisteredBackendStateMetaInfo

2016-12-20 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-5374:
---

 Summary: Extend Unit Tests for RegisteredBackendStateMetaInfo
 Key: FLINK-5374
 URL: https://issues.apache.org/jira/browse/FLINK-5374
 Project: Flink
  Issue Type: Improvement
  Components: State Backends, Checkpointing
Reporter: Aljoscha Krettek
Assignee: Stefan Richter


The legacy savepoint restore end-to-end test uncovered a slight problem with 
the compatibility check of the meta info: 
https://github.com/apache/flink/commit/d1eaa1ee41728e6d788f1e914cb0568a874a6f32

We should extend unit tests to catch this case in the future.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5373) Extend Unit Tests for StateAssignmentOperation

2016-12-20 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-5373:
---

 Summary: Extend Unit Tests for StateAssignmentOperation
 Key: FLINK-5373
 URL: https://issues.apache.org/jira/browse/FLINK-5373
 Project: Flink
  Issue Type: Improvement
  Components: State Backends, Checkpointing
Reporter: Aljoscha Krettek
Assignee: Stefan Richter


The legacy savepoint restore end-to-end test uncovered a slight problem with 
null pointers that is fixed by this commit: 
https://github.com/apache/flink/commit/74df7631316e78af39a5416e12c1adc8a46d87fe

We should extend unit tests to catch this case in the future.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Flink gives incorrect result when event time windowing used

2016-12-20 Thread Aljoscha Krettek
I'm afraid the doc is wrong here. The JavaDoc on Watermark says this about
watermarks:

"A Watermark tells operators that receive it that no elements with a
timestamp older or equal to the watermark timestamp should arrive at the
operator."

The system also relies on this fact, as visible in how timers are read from
the watermark timers queue and in AscendingTimestampExtractor, which has
this code:

public final Watermark getCurrentWatermark() {
return new Watermark(currentTimestamp == Long.MIN_VALUE ?
Long.MIN_VALUE : currentTimestamp - 1);
}

Notice, how the watermark is "currentTimestamp - 1" where current timestamp
is the highest seen timestamp so far and where we assume monotonically
ascending timestamps.

Cheers,
Aljoscha

On Tue, 20 Dec 2016 at 15:28 Fabian Hueske  wrote:

> Hi Jaromir,
>
> thank you very much for reporting this issue.
> The behavior you are describing is not in line with the documentation of
> watermarks [1] which clearly states that a watermark of time t tells the
> system that no more events with a timestamp < t will occur (otherwise they
> would be considered as late events). Hence, events with a timestamp = t as
> in your case should be OK and not be considered late.
>
> I think this is not intended and probably a bug.
>
> I'll loop in some contributors who are more familiar with watermarks and
> event-time (cc Aljoscha, Kostas K, Stephan).
>
> Best, Fabian
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/event_time.html#event-time-and-watermarks
>
> 2016-12-20 14:56 GMT+01:00 Jaromir Vanek :
>
> > Hi,
> >
> > I am using Flink 1.1.3 and following example doesn't work for me as
> > expected.
> >
> > I've got three input elements with similar timestamp (equaling to window
> > maxTimestamp). I'm using /event time/ notion of time with
> > /TumblingEventTimeWindows/.
> >
> > I would expect all three elements to be processed in the same window,
> > because they all have the identical event time timestamp. But the result
> > I'm
> > getting is just the first element that triggers the window. The rest of
> > elements are considered as late-comers and discarded.
> >
> > From my point of view this is definitely not correct and should be fixed.
> > Could you clarify if this is correct behavior or bug?
> >
> > I think the problem is in /WindowOperator#processWatermark/. Timer should
> > be
> > fired if and only if the current watermark is strictly larger than
> > registered timer.
> >
> > /
> > Timer timer = watermarkTimersQueue.peek();
> > if (timer != null && timer.timestamp <= mark.getTimestamp()) {
> > /
> >
> > Thanks
> > Jaromir Vanek
> >
> > /
> > public class WindowingTest {
> >
> >   public static void main(String[] args) throws Exception {
> > StreamExecutionEnvironment env =
> > StreamExecutionEnvironment.createLocalEnvironment();
> >
> > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> >
> > List> elements = Arrays.asList(
> > new Tuple2<>(Instant.parse("2016-12-19T10:59:59.999Z"), 100),
> > new Tuple2<>(Instant.parse("2016-12-19T10:59:59.999Z"), 200),
> > new Tuple2<>(Instant.parse("2016-12-19T10:59:59.999Z"), 300)
> > );
> >
> > DataStreamSource> input =
> > env.fromCollection(elements);
> >
> > SingleOutputStreamOperator> timestamped =
> > input.assignTimestampsAndWatermarks(new
> PunctuatedAssigner());
> >
> > timestamped.timeWindowAll(Time.minutes(1))
> >  .sum(1)
> >  .print();
> >
> > // printed result
> > // (2016-12-19T10:59:59.999Z,100)
> >
> > env.execute();
> >   }
> >
> >   private static class PunctuatedAssigner
> >   implements AssignerWithPunctuatedWatermarks > Integer>> {
> >
> > @Override
> > public long extractTimestamp(Tuple2 element, long
> > previousElementTimestamp) {
> >   return element.f0.toEpochMilli();
> > }
> >
> > @Override
> > public Watermark checkAndGetNextWatermark(Tuple2
> > lastElement, long extractedTimestamp) {
> >   return new Watermark(extractedTimestamp);
> > }
> >   }
> > }
> > /
> >
> >
> >
> > --
> > View this message in context: http://apache-flink-mailing-
> > list-archive.1008284.n3.nabble.com/Flink-gives-
> > incorrect-result-when-event-time-windowing-used-tp15058.html
> > Sent from the Apache Flink Mailing List archive. mailing list archive at
> > Nabble.com.
> >
>


[jira] [Created] (FLINK-5372) Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()

2016-12-20 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-5372:
---

 Summary: Fix 
RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()
 Key: FLINK-5372
 URL: https://issues.apache.org/jira/browse/FLINK-5372
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek


The test is currently {{@Ignored}}. We have to change 
{{AsyncCheckpointOperator}} to make sure that we can run fully asynchronously. 
Then, the test will still fail because the canceling behaviour was changed in 
the meantime.

{code}
public static class AsyncCheckpointOperator
extends AbstractStreamOperator
implements OneInputStreamOperator<String, String> {

@Override
public void open() throws Exception {
super.open();

// also get the state in open, this way we are sure that it was created 
before
// we trigger the test checkpoint
ValueState state = getPartitionedState(
VoidNamespace.INSTANCE,
VoidNamespaceSerializer.INSTANCE,
new ValueStateDescriptor<>("count",
StringSerializer.INSTANCE, "hello"));

}

@Override
public void processElement(StreamRecord element) throws Exception {
// we also don't care

ValueState state = getPartitionedState(
VoidNamespace.INSTANCE,
VoidNamespaceSerializer.INSTANCE,
new ValueStateDescriptor<>("count",
StringSerializer.INSTANCE, "hello"));

state.update(element.getValue());
}

@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
// do nothing so that we don't block
}
}
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] Schedule and Scope for Flink 1.2

2016-12-20 Thread Aljoscha Krettek
I just merged the most important backwards compatibility changes, with
tests.

I think this one is still a blocker:
https://issues.apache.org/jira/browse/FLINK-5320 (WindowedStream.fold()
cannot be used). And this one is a potential blocker for some users:
https://issues.apache.org/jira/browse/FLINK-5363.

IMHO, we can cut the branch today and I'll get them in on master and both
the 1.2 branch. What do you think?

On Tue, 20 Dec 2016 at 15:24 Robert Metzger <rmetz...@apache.org> wrote:

> Quick update here: I talked to Aljoscha offline, and the backwards
> compatibility is still being tested (there were some bugs identified while
> writing the tests).
>
> Also, Stephan made some fixes to the build infrastructure (
> https://github.com/apache/flink/pull/3029) that would be good to be
> included into the release branch.
> To finally get the FLIP-6 branch merged to master, I'm considering
> branching off the 1.2 release later today. It will be a little bit more
> overhead for Stephan and Aljoscha, but it will unblock all features waiting
> for a Flink 1.3 master.
>
>
>
>
> On Fri, Dec 16, 2016 at 6:05 PM, Robert Metzger <rmetz...@apache.org>
> wrote:
>
> > Thank you Aljoscha and Fabian for the updates.
> > I propose *Monday evening (6pm Berlin, 9am US west coast) for feature
> > freezing Flink 1.2 *then. This means that I'll create a release-1.2 fork
> > and create a 1.2 RC0 (non-voting) release candidate for testing.
> >
> > I don't think that I'll create the first (voting) RC until January
> because
> > of christmas and new years activities. Most of the committers I know are
> > out of office during these 1,5 weeks.
> >
> >
> >
> > On Fri, Dec 16, 2016 at 5:55 PM, Fabian Hueske <fhue...@gmail.com>
> wrote:
> >
> >> Hi,
> >>
> >> I merged the Table API refactoring changes:
> >>
> >> - RESOLVED Clean up the packages of the Table API (FLINK-4704)
> >> - RESOLVED Move Row to flink-core (FLINK-5186)
> >>
> >> No blockers left from my side.
> >>
> >> Cheers, Fabian
> >>
> >> 2016-12-16 17:47 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:
> >>
> >> > Yes, I'm confident that we can finish the tests until then and merge
> the
> >> > code.
> >> >
> >> > On Fri, Dec 16, 2016, 17:41 Robert Metzger <rmetz...@apache.org>
> wrote:
> >> >
> >> > > Thank you for the update. Do you think you get it done until Monday
> >> > > evening?
> >> > >
> >> > > On Fri, Dec 16, 2016 at 5:23 PM, Aljoscha Krettek <
> >> aljos...@apache.org>
> >> > > wrote:
> >> > >
> >> > > > Hi,
> >> > > > we're still working on making the backwards compatibility from 1.1
> >> > > > savepoints a reality. We have most of the code and some tests now
> >> but
> >> > it
> >> > > > still needs some work. This is the issue that tracks the progress
> on
> >> > the
> >> > > > operators that we would like to make backwards compatible:
> >> > > > https://issues.apache.org/jira/browse/FLINK-5292
> >> > > >
> >> > > > Cheers,
> >> > > > Aljoscha
> >> > > >
> >> > > > On Tue, 13 Dec 2016 at 11:22 Feng Wang <feng.w...@outlook.com>
> >> wrote:
> >> > > >
> >> > > > > It will be pretty good if 1.2 branch could be forked off within
> >> this
> >> > > > week,
> >> > > > > and our guys working on FLIP-6  hope FLIP-6 branch could be
> merged
> >> > into
> >> > > > > master as soon as possible.
> >> > > > >
> >> > > > > Best Regards,
> >> > > > >
> >> > > > > Feng Wang
> >> > > > >
> >> > > > > Alibaba
> >> > > > >
> >> > > > > 
> >> > > > > From: Robert Metzger <rmetz...@apache.org>
> >> > > > > Sent: Tuesday, December 13, 2016 4:58 AM
> >> > > > > To: dev@flink.apache.org
> >> > > > > Subject: Re: [DISCUSS] Schedule and Scope for Flink 1.2
> >> > > > >
> >> > > > > Thank you all for figuring out a solution for the security pull
> >> > > request.
> >> > > > >
> >> > > > >
>

[jira] [Created] (FLINK-5366) Add end-to-end tests for Savepoint Backwards Compatibility

2016-12-19 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-5366:
---

 Summary: Add end-to-end tests for Savepoint Backwards Compatibility
 Key: FLINK-5366
 URL: https://issues.apache.org/jira/browse/FLINK-5366
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek
Priority: Blocker
 Fix For: 1.2.0






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5363) Fire timers when window state is currently empty

2016-12-18 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-5363:
---

 Summary: Fire timers when window state is currently empty
 Key: FLINK-5363
 URL: https://issues.apache.org/jira/browse/FLINK-5363
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek
 Fix For: 1.2.0


Currently, when a {{Trigger}} sets a timer and that timer fires in the future 
at a point when there is currently no data in the window state, then that timer 
is being ignored.

This is a problem for some users because they manually set cleanup timers and 
they need to be called because the trigger needs to cleanup some state. (For 
normal time windows this is not a problem, but for special cases built on top 
of {{GlobalWindows}} the current behaviour leads to problems.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] Schedule and Scope for Flink 1.2

2016-12-16 Thread Aljoscha Krettek
Yes, I'm confident that we can finish the tests until then and merge the
code.

On Fri, Dec 16, 2016, 17:41 Robert Metzger <rmetz...@apache.org> wrote:

> Thank you for the update. Do you think you get it done until Monday
> evening?
>
> On Fri, Dec 16, 2016 at 5:23 PM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
> > Hi,
> > we're still working on making the backwards compatibility from 1.1
> > savepoints a reality. We have most of the code and some tests now but it
> > still needs some work. This is the issue that tracks the progress on the
> > operators that we would like to make backwards compatible:
> > https://issues.apache.org/jira/browse/FLINK-5292
> >
> > Cheers,
> > Aljoscha
> >
> > On Tue, 13 Dec 2016 at 11:22 Feng Wang <feng.w...@outlook.com> wrote:
> >
> > > It will be pretty good if 1.2 branch could be forked off within this
> > week,
> > > and our guys working on FLIP-6  hope FLIP-6 branch could be merged into
> > > master as soon as possible.
> > >
> > > Best Regards,
> > >
> > > Feng Wang
> > >
> > > Alibaba
> > >
> > > 
> > > From: Robert Metzger <rmetz...@apache.org>
> > > Sent: Tuesday, December 13, 2016 4:58 AM
> > > To: dev@flink.apache.org
> > > Subject: Re: [DISCUSS] Schedule and Scope for Flink 1.2
> > >
> > > Thank you all for figuring out a solution for the security pull
> request.
> > >
> > >
> > > Lets try to get 1.2 feature freezed as fast as possible so that we can
> > > "unblock" waiting features like FLIP-6 and the remaining security
> > changes.
> > >
> > > *What do you think about Friday evening (6pm Berlin, 9am US west coast)
> > for
> > > feature freezing Flink 1.2?* (only bugfixes are allowed in afterwards)
> > > I'll then fork-off a "release-1.2" branch and update the version in
> > > "master" to 1.3-SNAPSHOT.
> > > Please object if you have a bigger change or any other reservations
> > > regarding the feature freeze date!
> > >
> > > This is my current view of things on the release:
> > >
> > > - RESOLVED dynamic Scaling / Key Groups (FLINK-3755)
> > > - RESOLVED Add Rescalable Non-Partitioned State (FLINK-4379)
> > > - UNRESOLVED Add Flink 1.1 savepoint backwards compatability
> (FLINK-4797)
> > > - RESOLVED [Split for 1.3] Integrate Flink with Apache Mesos
> (FLINK-1984)
> > > - UNDER DISCUSSION Secure Data Access (FLINK-3930)
> > > - RESOLVED Queryable State (FLINK-3779)
> > > - RESOLVED Metrics in Webinterface (FLINK-4389)
> > > - RESOLVED Kafka 0.10 support (FLINK-4035)
> > > - RESOLVED Table API: Group Window Aggregates (FLINK-4691, FLIP-11)
> > > - RESOLVED Table API: Scalar Functions (FLINK-3097)
> > > Added by Stephan:
> > > - NON-BLOCKING [Pending PR] Provide support for asynchronous operations
> > > over streams (FLINK-4391)
> > > - NON-BLOCKING [beginning of next week] Unify Savepoints and
> Checkpoints
> > > (FLINK-4484)
> > > Added by Fabian:
> > > - ONGOING [Pending PR] Clean up the packages of the Table API
> > (FLINK-4704)
> > >  Move Row to flink-core (
> > > Added by Max:
> > > - ONGOING [Pending PR] Change Akka configuration to allow accessing
> > actors
> > > from different URLs (FLINK-2821)
> > >
> > >
> > > On Mon, Dec 12, 2016 at 5:43 PM, Stephan Ewen <se...@apache.org>
> wrote:
> > >
> > > > Hi Vijay!
> > > >
> > > > The workaround you suggest may be doable, but I am wondering how much
> > > that
> > > > helps, because the authorization feature would be incomplete like
> that
> > > and
> > > > thus of limited use.
> > > >
> > > > I would also assume that merging it properly and in full use after
> the
> > > 1.2
> > > > release would be a bit better - in general, we have often avoided
> last
> > > > minute additions of sensitive and complex features.
> > > >
> > > > Do you think it is more urgent to have this in Flink?
> > > >
> > > > Best,
> > > > Stephan
> > > >
> > > >
> > > > On Mon, Dec 12, 2016 at 2:49 PM, Vijay <vijikar...@yahoo.com.invalid
> >
> > > > wrote:
> > > >
> > > > > Max and Ufuk, I respect your concerns and fully u

Re: [DISCUSS] Schedule and Scope for Flink 1.2

2016-12-16 Thread Aljoscha Krettek
Hi,
we're still working on making the backwards compatibility from 1.1
savepoints a reality. We have most of the code and some tests now but it
still needs some work. This is the issue that tracks the progress on the
operators that we would like to make backwards compatible:
https://issues.apache.org/jira/browse/FLINK-5292

Cheers,
Aljoscha

On Tue, 13 Dec 2016 at 11:22 Feng Wang  wrote:

> It will be pretty good if 1.2 branch could be forked off within this week,
> and our guys working on FLIP-6  hope FLIP-6 branch could be merged into
> master as soon as possible.
>
> Best Regards,
>
> Feng Wang
>
> Alibaba
>
> 
> From: Robert Metzger 
> Sent: Tuesday, December 13, 2016 4:58 AM
> To: dev@flink.apache.org
> Subject: Re: [DISCUSS] Schedule and Scope for Flink 1.2
>
> Thank you all for figuring out a solution for the security pull request.
>
>
> Lets try to get 1.2 feature freezed as fast as possible so that we can
> "unblock" waiting features like FLIP-6 and the remaining security changes.
>
> *What do you think about Friday evening (6pm Berlin, 9am US west coast) for
> feature freezing Flink 1.2?* (only bugfixes are allowed in afterwards)
> I'll then fork-off a "release-1.2" branch and update the version in
> "master" to 1.3-SNAPSHOT.
> Please object if you have a bigger change or any other reservations
> regarding the feature freeze date!
>
> This is my current view of things on the release:
>
> - RESOLVED dynamic Scaling / Key Groups (FLINK-3755)
> - RESOLVED Add Rescalable Non-Partitioned State (FLINK-4379)
> - UNRESOLVED Add Flink 1.1 savepoint backwards compatability (FLINK-4797)
> - RESOLVED [Split for 1.3] Integrate Flink with Apache Mesos (FLINK-1984)
> - UNDER DISCUSSION Secure Data Access (FLINK-3930)
> - RESOLVED Queryable State (FLINK-3779)
> - RESOLVED Metrics in Webinterface (FLINK-4389)
> - RESOLVED Kafka 0.10 support (FLINK-4035)
> - RESOLVED Table API: Group Window Aggregates (FLINK-4691, FLIP-11)
> - RESOLVED Table API: Scalar Functions (FLINK-3097)
> Added by Stephan:
> - NON-BLOCKING [Pending PR] Provide support for asynchronous operations
> over streams (FLINK-4391)
> - NON-BLOCKING [beginning of next week] Unify Savepoints and Checkpoints
> (FLINK-4484)
> Added by Fabian:
> - ONGOING [Pending PR] Clean up the packages of the Table API (FLINK-4704)
>  Move Row to flink-core (
> Added by Max:
> - ONGOING [Pending PR] Change Akka configuration to allow accessing actors
> from different URLs (FLINK-2821)
>
>
> On Mon, Dec 12, 2016 at 5:43 PM, Stephan Ewen  wrote:
>
> > Hi Vijay!
> >
> > The workaround you suggest may be doable, but I am wondering how much
> that
> > helps, because the authorization feature would be incomplete like that
> and
> > thus of limited use.
> >
> > I would also assume that merging it properly and in full use after the
> 1.2
> > release would be a bit better - in general, we have often avoided last
> > minute additions of sensitive and complex features.
> >
> > Do you think it is more urgent to have this in Flink?
> >
> > Best,
> > Stephan
> >
> >
> > On Mon, Dec 12, 2016 at 2:49 PM, Vijay 
> > wrote:
> >
> > > Max and Ufuk, I respect your concerns and fully understand the
> importance
> > > of the network layer stack in Flink code base. Will you be comfortable
> to
> > > merge the code if I remove the Netty layer changes and leave the rest
> of
> > > the code. We can address the Netty code changes post 1.2 release?
> > >
> > > Regards,
> > > Vijay
> > >
> > > Sent from my iPhone
> > >
> > > > On Dec 12, 2016, at 3:38 AM, Ufuk Celebi  wrote:
> > > >
> > > > On 12 December 2016 at 12:30:31, Maximilian Michels (m...@apache.org)
> > > wrote:
> > > >>> It seems like we lack the resources for now to properly to take
> > > >> care
> > > >> of your pull request before the release. Unless someone from
> > > >> the
> > > >> community is really eager to help out here, I would be in favor
> > > >> of
> > > >> merging the pull request to the master after the release branch
> > > >> has
> > > >> been forked off. We should make sure it gets the attention it
> deserves
> > > >> then.
> > > >
> > > > Thanks Max! I fully agree with your reasoning. +1 to not include this
> > in
> > > 1.2 now, but look at it afterwards. I hope that OK with you Vijay.
> > > >
> > > > - Ufuk
> > > >
> > > >
> > >
> > >
> >
>


[jira] [Created] (FLINK-5250) Make AbstractUdfStreamOperator aware of WrappingFunction

2016-12-04 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-5250:
---

 Summary: Make AbstractUdfStreamOperator aware of WrappingFunction
 Key: FLINK-5250
 URL: https://issues.apache.org/jira/browse/FLINK-5250
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Reporter: Aljoscha Krettek


Right now, when using a {{WrappingFunction}}, as happens for {{WindowFunction}} 
and also for some functions in the Scala API then using custom interfaces is 
not possible. These custom interfaces are, for example, the checkpointing 
functions such as {{ListCheckpointed}} and {{CheckpointedFunction}}.

We should teach {{AbstractUdfStreamOperator}} about {{WrapingFunction}} so that 
it can correctly handle the case where wrapped user functions implement on of 
these interfaces.

Also, in the Scala API we have some custom functions that mimic 
{{WrappingFunction}} behaviour. These should be moved to use 
{{WrappingFunction}}, if possible.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5240) Properly Close StateBackend in StreamTask when closing/canceling

2016-12-02 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-5240:
---

 Summary: Properly Close StateBackend in StreamTask when 
closing/canceling
 Key: FLINK-5240
 URL: https://issues.apache.org/jira/browse/FLINK-5240
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Reporter: Aljoscha Krettek
Priority: Blocker
 Fix For: 1.2.0


Right now, the {{StreamTask}} never calls {{close()}} on the state backend.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5237) Consolidate and harmonize Window Translation Tests

2016-12-02 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-5237:
---

 Summary: Consolidate and harmonize Window Translation Tests
 Key: FLINK-5237
 URL: https://issues.apache.org/jira/browse/FLINK-5237
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek


The tests that check whether API calls on {{WindowedStream}} (both Java and 
Scala) result in the correct runtime operation are scattered across 
{{TimeWindowTranslationTest}} and {{WindowTranslationTest}} and the test 
coverage is not the same for Scala and Java.

We should ensure that we test all API calls and that we also test the Scala API 
with the same level of detail.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5155) Deprecate ValueStateDescriptor constructors with default value

2016-11-24 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-5155:
---

 Summary: Deprecate ValueStateDescriptor constructors with default 
value
 Key: FLINK-5155
 URL: https://issues.apache.org/jira/browse/FLINK-5155
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Reporter: Aljoscha Krettek


Having the default value in the descriptor is problematic with some serialisers 
and we don't lose a feature because users can always check for the null value 
and initialise with their own default value if necessary. Right now, we're 
always forcing people to specify a default value even though they don't need 
one.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5154) Duplicate TypeSerializer when writing RocksDB Snapshot

2016-11-24 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-5154:
---

 Summary: Duplicate TypeSerializer when writing RocksDB Snapshot
 Key: FLINK-5154
 URL: https://issues.apache.org/jira/browse/FLINK-5154
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Reporter: Aljoscha Krettek
Assignee: Stefan Richter
Priority: Blocker
 Fix For: 1.2.0, 1.1.4


Some {{TypeSerializers}} are not thread safe (for example {{KryoSerializer}}) 
we have to {{duplicate()}} them when using concurrently, as happens when 
performing a RocksDB snapshot.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] @Public libraries

2016-11-23 Thread Aljoscha Krettek
I would be for also annotating library methods/classes. Maybe Robert has a
stronger opinion on this because he introduced these annotations.

On Tue, 22 Nov 2016 at 18:56 Greg Hogan  wrote:

> Hi all,
>
> Should stable APIs in Flink's CEP, ML, and Gelly libraries be annotated
> @Public or restricted to use of @PublicEvolving?
>
> We would ensure that library APIs do not add restrictions to the core APIs.
> Libraries could use @PublicEvolving or @Internal core APIs within @Public
> or @PublicEvolving components as long as the functionality could be adapted
> or rewritten as necessary. An example: CombineHint is @Internal. Gelly
> could use CombineHint in a @Public method but could not accept CombineHint
> as a parameter to a @Public method.
>
> Greg
>


Re: [DISCUSS] Hold copies in HeapStateBackend

2016-11-23 Thread Aljoscha Krettek
You can go ahead and do the change. I just think that this is quite
fragile. For example, this depends on the reduce function returning the
right object for reuse. If we hand in the copied object as the first input
and the ReduceFunction reuses the second input then we again have a
reference to the same (input) object in the state. Several components have
to work together to make this intricate dance work and if someone changes
the order in the reduce function for some reason this might break.

Big +1 on documenting the shortcomings. :-)

On Wed, 23 Nov 2016 at 12:34 Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Aljoscha,
>
> sure, there many issues with holding the state as objects on the heap.
> However, I think we don't have to solve all problems related to that in
> order to add a small fix that solves one specific issue.
> I would not explicitly expose the fix to users but it would be nice if we
> could implement more efficient code for internal functions.
>
> Moreover, I think we should extend the documentation and clearly point out
> the limitations regarding modifying state objects.
>
> Best, Fabian
>
>
>
> 2016-11-23 12:07 GMT+01:00 sjk <shijinkui...@163.com>:
>
> > hi,Fabian Hueske, Sorry for mistake for the whole PR #2792
> >
> > > On Nov 23, 2016, at 17:10, Fabian Hueske <fhue...@gmail.com> wrote:
> > >
> > > Hi,
> > >
> > > Why do you think that this means "much code changes"?
> > > I think it would actually be a pretty lightweight change in
> > > HeapReducingState.
> > >
> > > The proposal is to copy the *first* value that goes into a
> ReducingState.
> > > The copy would be done by a TypeSerializer and hence be a deep copy.
> > > This will allow to reuse the copy in each invocation of the
> > ReduceFunction
> > > instead of creating a new result object of the same type that was
> > initially
> > > copied.
> > >
> > > I think the savings of reusing the object in each invocation of the
> > > ReduceFunction and not creating a new object should amortize the
> one-time
> > > object copy.
> > >
> > > Fabian
> > >
> > > 2016-11-23 3:04 GMT+01:00 sjk <shijinkui...@163.com>:
> > >
> > >> Hi, Fabian
> > >>
> > >> So much code changes. Can you show us the key changes code for the
> > object
> > >> copy?
> > >> Object reference maybe hold more deep reference, it can be a bomb.
> > >> Can we renew a object with its data or direct use kryo for object
> > >> serialization?
> > >> I’m not prefer object copy.
> > >>
> > >>
> > >>> On Nov 22, 2016, at 20:33, Fabian Hueske <fhue...@gmail.com> wrote:
> > >>>
> > >>> Does anybody have objections against copying the first record that
> goes
> > >>> into the ReduceState?
> > >>>
> > >>> 2016-11-22 12:49 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:
> > >>>
> > >>>> That's right, yes.
> > >>>>
> > >>>> On Mon, 21 Nov 2016 at 19:14 Fabian Hueske <fhue...@gmail.com>
> wrote:
> > >>>>
> > >>>>> Right, but that would be a much bigger change than "just" copying
> the
> > >>>>> *first* record that goes into the ReduceState, or am I missing
> > >> something?
> > >>>>>
> > >>>>>
> > >>>>> 2016-11-21 18:41 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:
> > >>>>>
> > >>>>>> To bring over my comment from the Github PR that started this
> > >>>> discussion:
> > >>>>>>
> > >>>>>> @wuchong <https://github.com/wuchong>, yes this is a problem with
> > the
> > >>>>>> HeapStateBackend. The RocksDB backend does not suffer from this
> > >>>> problem.
> > >>>>> I
> > >>>>>> think in the long run we should migrate the HeapStateBackend to
> > always
> > >>>>> keep
> > >>>>>> data in serialised form, then we also won't have this problem
> > anymore.
> > >>>>>>
> > >>>>>> So I'm very much in favour of keeping data serialised. Copying
> data
> > >>>> would
> > >>>>>> only ever be a stopgap solution.
> > >>>>>>
> 

Re: [DISCUSS] deprecated function need more detail

2016-11-23 Thread Aljoscha Krettek
+1 That sounds excellent.

On Wed, 23 Nov 2016 at 11:04 Till Rohrmann  wrote:

> +1 for your proposal.
>
> Cheers,
> Till
>
> On Wed, Nov 23, 2016 at 9:33 AM, Fabian Hueske  wrote:
>
> > I agree on this one.
> > Whenever we deprecate a method or a feature we should add a comment that
> > explains the new API or why the feature was removed without replacement.
> >
> > Enforcing this information through checkstyle makes sense as well, IMO.
> >
> > Cheers, Fabian
> >
> > 2016-11-23 4:42 GMT+01:00 sjk :
> >
> > > Hi, all
> > >
> > > Let’s have look at Checkpointed interface below. It declared deprecated
> > > but have no detail for why, when and how replace this function. It’s a
> > big
> > > trouble for the users.
> > >
> > > @Deprecated
> > > @PublicEvolving
> > > public interface Checkpointed extends
> > > CheckpointedRestoring {
> > >
> > >
> > > I think we should have more detail: when give up, who replace it, why
> > > deprecated.
> > >
> > > For Java code, add detail  deprecated reason in code annotations.
> > > For Scala code, replace Java annotation  @Deprecated(,,) with Scala
> > > annotation @deprecated, such as
> > > @deprecated(message = "the reason", since = "when fully give up”)
> > >
> > > Add this rule to customized checkstyle plugin of maven and SBT.
> > >
> > > Best regard
> > > -Jinkui Shi
> >
>


[jira] [Created] (FLINK-5130) Remove Deprecated Methods from WindowedStream

2016-11-22 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-5130:
---

 Summary: Remove Deprecated Methods from WindowedStream
 Key: FLINK-5130
 URL: https://issues.apache.org/jira/browse/FLINK-5130
 Project: Flink
  Issue Type: Sub-task
  Components: Streaming
Reporter: Aljoscha Krettek
 Fix For: 2.0.0






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5126) Remove Checked Exceptions from State Interfaces

2016-11-22 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-5126:
---

 Summary: Remove Checked Exceptions from State Interfaces
 Key: FLINK-5126
 URL: https://issues.apache.org/jira/browse/FLINK-5126
 Project: Flink
  Issue Type: Sub-task
  Components: Streaming
Reporter: Aljoscha Krettek


Most of the methods can throw {{Exception}} or {{IOException}} but the user 
cannot really do anything to handle them. The exceptions just pollute any code 
a user implements.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5125) ContinuousFileProcessingCheckpointITCase is Flaky

2016-11-22 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-5125:
---

 Summary: ContinuousFileProcessingCheckpointITCase is Flaky
 Key: FLINK-5125
 URL: https://issues.apache.org/jira/browse/FLINK-5125
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Reporter: Aljoscha Krettek
Assignee: Kostas Kloudas


This is the travis log: 
https://api.travis-ci.org/jobs/177402367/log.txt?deansi=true

The relevant sections is:
```
Running org.apache.flink.test.checkpointing.CoStreamCheckpointingITCase
Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 2.571 sec - in 
org.apache.flink.test.exampleJavaPrograms.EnumTriangleBasicITCase
Running org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 17.704 sec - in 
org.apache.flink.test.checkpointing.CoStreamCheckpointingITCase
Running 
org.apache.flink.test.checkpointing.EventTimeAllWindowCheckpointingITCase
Tests run: 5, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 11.805 sec - in 
org.apache.flink.test.checkpointing.EventTimeAllWindowCheckpointingITCase
Running 
org.apache.flink.test.checkpointing.ContinuousFileProcessingCheckpointITCase
org.apache.flink.client.program.ProgramInvocationException: The program 
execution failed: Job execution failed.
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
at 
org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:101)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:392)
at 
org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.executeRemotely(RemoteStreamEnvironment.java:209)
at 
org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:173)
at org.apache.flink.test.util.TestUtils.tryExecute(TestUtils.java:32)
at 
org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase.runCheckpointedProgram(StreamFaultToleranceTestBase.java:106)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128)
at 
org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203)
at 
org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155)
at 
org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:905)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply

Re: [DISCUSS] Hold copies in HeapStateBackend

2016-11-22 Thread Aljoscha Krettek
That's right, yes.

On Mon, 21 Nov 2016 at 19:14 Fabian Hueske <fhue...@gmail.com> wrote:

> Right, but that would be a much bigger change than "just" copying the
> *first* record that goes into the ReduceState, or am I missing something?
>
>
> 2016-11-21 18:41 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:
>
> > To bring over my comment from the Github PR that started this discussion:
> >
> > @wuchong <https://github.com/wuchong>, yes this is a problem with the
> > HeapStateBackend. The RocksDB backend does not suffer from this problem.
> I
> > think in the long run we should migrate the HeapStateBackend to always
> keep
> > data in serialised form, then we also won't have this problem anymore.
> >
> > So I'm very much in favour of keeping data serialised. Copying data would
> > only ever be a stopgap solution.
> >
> > On Mon, 21 Nov 2016 at 15:56 Fabian Hueske <fhue...@gmail.com> wrote:
> >
> > > Another approach that would solve the problem for our use case (object
> > > re-usage for incremental window ReduceFunctions) would be to copy the
> > first
> > > object that is put into the state.
> > > This would be a change on the ReduceState, not on the overall state
> > > backend, which should be feasible, no?
> > >
> > >
> > >
> > > 2016-11-21 15:43 GMT+01:00 Stephan Ewen <se...@apache.org>:
> > >
> > > > -1 for copying objects.
> > > >
> > > > Storing a serialized data where possible is good, but copying all
> > objects
> > > > by default is not a good idea, in my opinion.
> > > > A lot of scenarios use data types that are hellishly expensive to
> copy.
> > > > Even the current copy on chain handover is a problem.
> > > >
> > > > Let's not introduce even more copies.
> > > >
> > > > On Mon, Nov 21, 2016 at 3:16 PM, Maciek Próchniak <m...@touk.pl>
> wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > it will come with performance overhead when updating the state,
> but I
> > > > > think it'll be possible to perform asynchronous snapshots using
> > > > > HeapStateBackend (probably some changes to underlying data
> structures
> > > > would
> > > > > be needed) - which would bring more predictable performance.
> > > > >
> > > > > thanks,
> > > > > maciek
> > > > >
> > > > >
> > > > > On 21/11/2016 13:48, Aljoscha Krettek wrote:
> > > > >
> > > > >> Hi,
> > > > >> I would be in favour of this since it brings things in line with
> the
> > > > >> RocksDB backend. This will, however, come with quite the
> performance
> > > > >> overhead, depending on how fast the TypeSerializer can copy.
> > > > >>
> > > > >> Cheers,
> > > > >> Aljoscha
> > > > >>
> > > > >> On Mon, 21 Nov 2016 at 11:30 Fabian Hueske <fhue...@gmail.com>
> > wrote:
> > > > >>
> > > > >> Hi everybody,
> > > > >>>
> > > > >>> when implementing a ReduceFunction for incremental aggregation of
> > > SQL /
> > > > >>> Table API window aggregates we noticed that the HeapStateBackend
> > does
> > > > not
> > > > >>> store copies but holds references to the original objects. In
> case
> > > of a
> > > > >>> SlidingWindow, the same object is referenced from different
> window
> > > > panes.
> > > > >>> Therefore, it is not possible to modify these objects (in order
> to
> > > > avoid
> > > > >>> object instantiations, see discussion [1]).
> > > > >>>
> > > > >>> Other state backends serialize their data such that the behavior
> is
> > > not
> > > > >>> consistent across backends.
> > > > >>> If we want to have light-weight tests, we have to create new
> > objects
> > > in
> > > > >>> the
> > > > >>> ReduceFunction causing unnecessary overhead.
> > > > >>>
> > > > >>> I would propose to copy objects when storing them in a
> > > > HeapStateBackend.
> > > > >>> This would ensure that objects returned from state to the user
> > behave
> > > > >>> identical for different state backends.
> > > > >>>
> > > > >>> We created a related JIRA [2] that asks to copy records that go
> > into
> > > an
> > > > >>> incremental ReduceFunction. The scope is more narrow and would
> > solve
> > > > our
> > > > >>> problem, but would leave the inconsistent behavior of state
> > backends
> > > in
> > > > >>> place.
> > > > >>>
> > > > >>> What do others think?
> > > > >>>
> > > > >>> Cheers, Fabian
> > > > >>>
> > > > >>> [1]
> https://github.com/apache/flink/pull/2792#discussion_r88653721
> > > > >>> [2] https://issues.apache.org/jira/browse/FLINK-5105
> > > > >>>
> > > > >>>
> > > > >
> > > >
> > >
> >
>


Re: [DISCUSS] Hold copies in HeapStateBackend

2016-11-21 Thread Aljoscha Krettek
To bring over my comment from the Github PR that started this discussion:

@wuchong <https://github.com/wuchong>, yes this is a problem with the
HeapStateBackend. The RocksDB backend does not suffer from this problem. I
think in the long run we should migrate the HeapStateBackend to always keep
data in serialised form, then we also won't have this problem anymore.

So I'm very much in favour of keeping data serialised. Copying data would
only ever be a stopgap solution.

On Mon, 21 Nov 2016 at 15:56 Fabian Hueske <fhue...@gmail.com> wrote:

> Another approach that would solve the problem for our use case (object
> re-usage for incremental window ReduceFunctions) would be to copy the first
> object that is put into the state.
> This would be a change on the ReduceState, not on the overall state
> backend, which should be feasible, no?
>
>
>
> 2016-11-21 15:43 GMT+01:00 Stephan Ewen <se...@apache.org>:
>
> > -1 for copying objects.
> >
> > Storing a serialized data where possible is good, but copying all objects
> > by default is not a good idea, in my opinion.
> > A lot of scenarios use data types that are hellishly expensive to copy.
> > Even the current copy on chain handover is a problem.
> >
> > Let's not introduce even more copies.
> >
> > On Mon, Nov 21, 2016 at 3:16 PM, Maciek Próchniak <m...@touk.pl> wrote:
> >
> > > Hi,
> > >
> > > it will come with performance overhead when updating the state, but I
> > > think it'll be possible to perform asynchronous snapshots using
> > > HeapStateBackend (probably some changes to underlying data structures
> > would
> > > be needed) - which would bring more predictable performance.
> > >
> > > thanks,
> > > maciek
> > >
> > >
> > > On 21/11/2016 13:48, Aljoscha Krettek wrote:
> > >
> > >> Hi,
> > >> I would be in favour of this since it brings things in line with the
> > >> RocksDB backend. This will, however, come with quite the performance
> > >> overhead, depending on how fast the TypeSerializer can copy.
> > >>
> > >> Cheers,
> > >> Aljoscha
> > >>
> > >> On Mon, 21 Nov 2016 at 11:30 Fabian Hueske <fhue...@gmail.com> wrote:
> > >>
> > >> Hi everybody,
> > >>>
> > >>> when implementing a ReduceFunction for incremental aggregation of
> SQL /
> > >>> Table API window aggregates we noticed that the HeapStateBackend does
> > not
> > >>> store copies but holds references to the original objects. In case
> of a
> > >>> SlidingWindow, the same object is referenced from different window
> > panes.
> > >>> Therefore, it is not possible to modify these objects (in order to
> > avoid
> > >>> object instantiations, see discussion [1]).
> > >>>
> > >>> Other state backends serialize their data such that the behavior is
> not
> > >>> consistent across backends.
> > >>> If we want to have light-weight tests, we have to create new objects
> in
> > >>> the
> > >>> ReduceFunction causing unnecessary overhead.
> > >>>
> > >>> I would propose to copy objects when storing them in a
> > HeapStateBackend.
> > >>> This would ensure that objects returned from state to the user behave
> > >>> identical for different state backends.
> > >>>
> > >>> We created a related JIRA [2] that asks to copy records that go into
> an
> > >>> incremental ReduceFunction. The scope is more narrow and would solve
> > our
> > >>> problem, but would leave the inconsistent behavior of state backends
> in
> > >>> place.
> > >>>
> > >>> What do others think?
> > >>>
> > >>> Cheers, Fabian
> > >>>
> > >>> [1] https://github.com/apache/flink/pull/2792#discussion_r88653721
> > >>> [2] https://issues.apache.org/jira/browse/FLINK-5105
> > >>>
> > >>>
> > >
> >
>


<    4   5   6   7   8   9   10   11   12   13   >