Re: Send events to parallel operator instances

2015-06-04 Thread Gyula Fóra
I am simply thinking about the best way to send data to different subtasks
of the same operator.

Can we go back to the original question? :D

Stephan Ewen  wrote (on Wed, 3 Jun 2015, 23:45):

> I think that it may be a bit premature to invest heavily into the parallel
> delta-policy windows just yet.
> We have not even answered all questions on the key-local delta windows yet:
>
>  - How does it behave with non-monotonic changes? What does the delta
> refer to: the max interval in the window, the interval to the earliest
> element, or the max difference between two consecutive elements?
>
>  - What about the order of records? Are deltas even interesting when
> records come in arbitrary order? What about the predictability of recovery
> runs?
>
>
> I would assume that a consistent version of the key-local delta windows
> will get us a long way, use-case wise.
>
> Let's learn more about how users use these policies in the "simple" case.
> Because that will impact the protocol for global coordination (for example
> concerning order, and relative to which element the deltas are computed, the
> first or the min). Otherwise we invest a lot of effort into something where
> we do not yet have a clear understanding of how exactly we want it to
> behave.
>
> What do you think?
>
>
>
>
> On Wed, Jun 3, 2015 at 2:14 PM, Gyula Fóra  wrote:
>
> > I am talking of course about global delta windows, on the full stream, not
> > on a partition. Delta windows per partition already happen, as you said.
> >
> > On Wednesday, June 3, 2015, Aljoscha Krettek 
> wrote:
> >
> > > Yes, this is obvious, but if we simply partition the data on the
> > > attribute that we use for the delta policy this can be done purely on
> > > one machine. No need for complex communication/synchronization.
> > >
> > > On Wed, Jun 3, 2015 at 1:32 PM, Gyula Fóra  > > > wrote:
> > > > Yes, we define a delta function from the first element to the last
> > > element
> > > > in a window. Now let's discretize the stream using this semantics in
> > > > parallel.
> > > >
> > > > Aljoscha Krettek > ezt írta
> > > (időpont: 2015. jún. 3.,
> > > > Sze, 12:20):
> > > >
> > > >> Ah ok. And by distributed you mean that the element that starts the
> > > >> window can be processed on a different machine than the element that
> > > >> finishes the window?
> > > >>
> > > >> On Wed, Jun 3, 2015 at 12:11 PM, Gyula Fóra  > > > wrote:
> > > >> > This is not connected to the current implementation. So lets not
> > talk
> > > >> about
> > > >> > that.
> > > >> >
> > > >> > This is about theoretical limits to support distributed delta
> > policies
> > > >> > which has far reaching implications for the windowing policies one
> > can
> > > >> > implement in a parallel way.
> > > >> >
> > > >> > But you are welcome to throw in any constructive ideas :)
> > > >> > On Wed, Jun 3, 2015 at 11:49 AM Aljoscha Krettek <
> > aljos...@apache.org
> > > >
> > > >> > wrote:
> > > >> >
> > > >> >> Part of the reason for my question is this:
> > > >> >> https://issues.apache.org/jira/browse/FLINK-1967. Especially my
> > > latest
> > > >> >> comment there. If we want this, I think we have to overhaul the
> > > >> >> windowing system anyways and then it doesn't make sense to
> explore
> > > >> >> complicated workarounds for the current system.
> > > >> >>
> > > >> >> On Wed, Jun 3, 2015 at 11:07 AM, Gyula Fóra <
> gyula.f...@gmail.com
> > > >
> > > >> wrote:
> > > >> >> > There are simple ways of implementing it in a non-distributed
> or
> > > >> >> > inconsistent fashion.
> > > >> >> > On Wed, Jun 3, 2015 at 8:55 AM Aljoscha Krettek <
> > > aljos...@apache.org >
> > > >> >> wrote:
> > > >> >> >
> > > >> >> >> This already sounds awfully complicated. Is there no other way
> > to
> > > >> >> >> implement the delta windows?
> > > >> >> >>
> > > >
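
To make the ambiguity above concrete, here is a minimal sketch of a key-local
delta window with two of the possible semantics: the delta measured against the
element that opened the window, versus the delta between consecutive elements.
Class and method names are illustrative only, not Flink's windowing API.

import java.util.ArrayList;
import java.util.List;
import java.util.function.ToDoubleBiFunction;

public class DeltaWindowSketch<T> {

    private final double threshold;
    private final ToDoubleBiFunction<T, T> delta;        // user-defined delta function
    private final List<T> currentWindow = new ArrayList<>();

    public DeltaWindowSketch(double threshold, ToDoubleBiFunction<T, T> delta) {
        this.threshold = threshold;
        this.delta = delta;
    }

    /** Semantics (a): delta between the element that opened the window and the new element. */
    public boolean triggerOnFirst(T element) {
        boolean fire = !currentWindow.isEmpty()
                && delta.applyAsDouble(currentWindow.get(0), element) > threshold;
        currentWindow.add(element);
        return fire;   // a real policy would also evict/clear the window when it fires
    }

    /** Semantics (b): delta between two consecutive elements. */
    public boolean triggerOnConsecutive(T element) {
        boolean fire = !currentWindow.isEmpty()
                && delta.applyAsDouble(currentWindow.get(currentWindow.size() - 1), element) > threshold;
        currentWindow.add(element);
        return fire;
    }
}

For non-monotonic input the two variants fire at different points, which is exactly
the ambiguity raised above and would also need to be pinned down before coordinating
deltas globally across subtasks.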

Re: Send events to parallel operator instances

2015-06-04 Thread Gyula Fóra
Thank you!
I was aware of the iterations as a possibility, but I was wondering if we
might have "lateral" communications.

Ufuk Celebi  wrote (on Thu, 4 Jun 2015, 13:29):

>
> On 04 Jun 2015, at 12:46, Stephan Ewen  wrote:
>
> > There is no "lateral communication" right now. Typical pattern is to
> break
> > it up in two operators that communicate in an all-to-all fashion.
>
> You can look at the iteration tasks: the iteration sync task is
> communicating with the iteration heads like this.


Re: Send events to parallel operator instances

2015-06-04 Thread Gyula Fóra
Thanks Stephan for clarifying :)

@Kostas: I am just playing around with some ideas. Only in my head so far,
so let's not worry about these things.
On Thu, Jun 4, 2015 at 6:33 PM Kostas Tzoumas  wrote:

> Wouldn't this kind of cross-task communication break the whole dataflow
> abstraction? How can recovery be implemented if we allowed something like
> this?
>
> On Thu, Jun 4, 2015 at 5:14 PM, Stephan Ewen  wrote:
>
> > That is not what Ufuk said. You can use a singleton auxiliary task that
> > communicates in both directions with the vertices and acts as a
> coordinator
> > between vertices on the same level.
> >
> > On Thu, Jun 4, 2015 at 2:55 PM, Gyula Fóra  wrote:
> >
> > > Thank you!
> > > I was aware of the iterations as a possibility, but I was wondering if
> we
> > > might have "lateral" communications.
> > >
> > > Ufuk Celebi  ezt írta (időpont: 2015. jún. 4., Cs,
> > 13:29):
> > >
> > > >
> > > > On 04 Jun 2015, at 12:46, Stephan Ewen  wrote:
> > > >
> > > > > There is no "lateral communication" right now. Typical pattern is
> to
> > > > break
> > > > > it up in two operators that communicate in an all-to-all fashion.
> > > >
> > > > You can look at the iteration tasks: the iteration sync task is
> > > > communicating with the iteration heads like this.
> > >
> >
>
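
A rough sketch, outside Flink, of the pattern Stephan describes: every parallel
subtask sends its partial update to a single coordinator, which merges the updates
and broadcasts the result back. Threads and queues stand in for the actual runtime
channels here; none of the names below are real Flink classes.

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class CoordinatorSketch {

    static class Coordinator implements Runnable {
        final BlockingQueue<double[]> fromWorkers = new LinkedBlockingQueue<>();
        final List<BlockingQueue<double[]>> toWorkers;

        Coordinator(List<BlockingQueue<double[]>> toWorkers) {
            this.toWorkers = toWorkers;
        }

        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    // All-to-one: collect one partial update from every parallel subtask.
                    double[] merged = null;
                    for (int i = 0; i < toWorkers.size(); i++) {
                        merged = merge(merged, fromWorkers.take());
                    }
                    // One-to-all: broadcast the merged result back to every subtask.
                    for (BlockingQueue<double[]> queue : toWorkers) {
                        queue.put(merged);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        // Assumes all partial updates have the same length.
        private static double[] merge(double[] acc, double[] partial) {
            if (acc == null) {
                return partial.clone();
            }
            for (int i = 0; i < acc.length; i++) {
                acc[i] += partial[i];
            }
            return acc;
        }
    }
}

In a dataflow setting the two directions become two all-to-all connected operators
(the parallel tasks feeding a parallelism-1 coordinator, which feeds them back),
which is essentially how the iteration sync task talks to the iteration heads.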


Re: Planning the 0.9 Release

2015-06-08 Thread Gyula Fóra
I agree with Marton. I thought Aljoscha was working on that.

On Monday, June 8, 2015, Márton Balassi  wrote:

> FLINK-2054 is definitely a problem if it persists. Sorry for missing it,
> solving it asap.
>
> On Mon, Jun 8, 2015 at 7:18 AM, Ufuk Celebi >
> wrote:
>
> >
> > On 08 Jun 2015, at 00:22, Robert Metzger  > wrote:
> >
> > > What about https://issues.apache.org/jira/browse/FLINK-2177 and
> > > https://issues.apache.org/jira/browse/FLINK-2054 ? They are both
> marked
> > as
> > > blockers.
> >
> > FLINK-2177 is resolved. Is 2054 ("StreamOperator rework removed copy
> calls
> > when passing output to a chained operator") still a release blocker?
> >
> > > On Fri, Jun 5, 2015 at 4:22 PM, Maximilian Michels  >
> > wrote:
> > >
> > >> Hi everyone,
> > >>
> > >> I'm excited about the upcoming release. I think a few issues still
> need
> > to
> > >> be addressed. At least for me, those were
> > >>
> > >> - fixing error messages on builds with JDK 8
> > >> - removing Apache thrift dependencies as of
> > >> https://issues.apache.org/jira/browse/FLINK-1635
> > >> - Possibly fix an issue in the delta iterations:
> > >> https://issues.apache.org/jira/browse/FLINK-1916
> > >>
> > >> @Ufuk: If you're too busy, I could also act as a release manager.
> >
> > Yes, please do so, Max! :) Thanks!
>


Force enabling checkpoints for iterative streaming jobs

2015-06-09 Thread Gyula Fóra
Hey all,

It is currently impossible to enable state checkpointing for iterative
jobs, because an exception is thrown when creating the jobgraph. This
behaviour is motivated by the lack of precise guarantees that we can give
with the current fault-tolerance implementations for cyclic graphs.

This PR <https://github.com/apache/flink/pull/812> adds an optional flag to
force checkpoints even in case of iterations. The algorithm will take
checkpoints periodically as before, but records in transit inside the loop
will be lost.

However, even this guarantee is enough for most applications (Machine
Learning for instance) and certainly much better than not having anything
at all.


I suggest we add this to the 0.9 release as currently many applications
suffer from this limitation (SAMOA, ML pipelines, graph streaming etc.)


Cheers,

Gyula
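
A toy illustration of the proposed guarantee (this is not the PR's code): the
operator state of a task inside the loop is included in the snapshot, while the
feedback queue, i.e. the records in transit inside the loop, is deliberately left
out and therefore empty after a restore.

import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;

public class ForcedIterationCheckpointSketch {

    private final Map<String, Long> operatorState = new HashMap<>();   // included in checkpoints
    private final ArrayDeque<Long> feedbackQueue = new ArrayDeque<>(); // NOT included

    Map<String, Long> snapshot() {
        // Only the operator state goes into the checkpoint.
        return new HashMap<>(operatorState);
    }

    void restore(Map<String, Long> checkpoint) {
        operatorState.clear();
        operatorState.putAll(checkpoint);
        feedbackQueue.clear(); // records that were in transit inside the loop are gone
    }

    void processRecord(long value) {
        operatorState.merge("count", 1L, Long::sum);
        if (value % 2 == 0) {
            feedbackQueue.add(value / 2); // fed back into the loop
        }
    }
}

Whether that trade-off is acceptable is what the rest of this thread debates.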


Re: Force enabling checkpoints for iterative streaming jobs

2015-06-09 Thread Gyula Fóra
As for people currently suffering from it:

An application King is developing requires iterations, and they need
checkpoints. Practically all SAMOA programs would need this.

It is very likely that the state interfaces will be changed after the
release, so this is not something that we can just add later. I don't see a
reason why we should not add it, as it is clearly documented. In this
actual case not having guarantees at all means people will never use it in
any production system. Having limited guarantees means that it will depend
on the application.

On Wed, Jun 10, 2015 at 12:53 AM, Ufuk Celebi  wrote:

> Hey Gyula,
>
> I understand your reasoning, but I don't think it's worth rushing this into
> the release.
>
> As you've said, we cannot give precise guarantees. But this is arguably
> one of the key requirements for any fault tolerance mechanism. Therefore I
> disagree that this is better than not having anything at all. I think it
> will already go a long way to have the non-iterative case working reliably.
>
> And as far as I know there are no users really suffering from this at the
> moment (in the sense that someone has complained on the mailing list).
>
> Hence, I vote to postpone this.
>
> – Ufuk
>
> On 10 Jun 2015, at 00:19, Gyula Fóra  wrote:
>
> > Hey all,
> >
> > It is currently impossible to enable state checkpointing for iterative
> > jobs, because en exception is thrown when creating the jobgraph. This
> > behaviour is motivated by the lack of precise guarantees that we can give
> > with the current fault-tolerance implementations for cyclic graphs.
> >
> > This PR <https://github.com/apache/flink/pull/812> adds an optional
> flag to
> > force checkpoints even in case of iterations. The algorithm will take
> > checkpoints periodically as before, but records in transit inside the
> loop
> > will be lost.
> >
> > However even this guarantee is enough for most applications (Machine
> > Learning for instance) and certainly much better than not having anything
> > at all.
> >
> >
> > I suggest we add this to the 0.9 release as currently many applications
> > suffer from this limitation (SAMOA, ML pipelines, graph streaming etc.)
> >
> >
> > Cheers,
> >
> > Gyula
>
>


Re: Force enabling checkpoints for iterative streaming jobs

2015-06-10 Thread Gyula Fóra
I disagree. Not having checkpointed operators inside the iteration still
breaks the guarantees.

It is not about the states, it is about the loop itself.
On Wed, Jun 10, 2015 at 10:12 AM Aljoscha Krettek 
wrote:

> This is the answer I gave on the PR (we should have one place for
> discussing this, though):
>
> I would be against merging this in the current form. What I propose is
> to analyse the topology to verify that there are no checkpointed
> operators inside iterations. Operators before and after iterations can
> be checkpointed and we can safely allow the user to enable
> checkpointing.
>
> If we have the code to analyse which operators are inside iterations
> we could also disallow windows inside iterations. I think windows
> inside iterations don't make sense since elements in different
> "iterations" would end up in the same window. Maybe I'm wrong here
> though, then please correct me.
>
> On Wed, Jun 10, 2015 at 10:08 AM, Márton Balassi
>  wrote:
> > I agree that for the sake of the above mentioned use cases it is
> reasonable
> > to add this to the release with the right documentation, for machine
> > learning potentially losing one round of feedback data should not
> matter.
> >
> > Let us not block prominent users until the next release on this.
> >
> > On Wed, Jun 10, 2015 at 8:09 AM, Gyula Fóra 
> wrote:
> >
> >> As for people currently suffering from it:
> >>
> >> An application King is developing requires iterations, and they need
> >> checkpoints. Practically all SAMOA programs would need this.
> >>
> >> It is very likely that the state interfaces will be changed after the
> >> release, so this is not something that we can just add later. I don't
> see a
> >> reason why we should not add it, as it is clearly documented. In this
> >> actual case not having guarantees at all means people will never use it
> in
> >> any production system. Having limited guarantees means that it will
> depend
> >> on the application.
> >>
> >> On Wed, Jun 10, 2015 at 12:53 AM, Ufuk Celebi  wrote:
> >>
> >> > Hey Gyula,
> >> >
> >> > I understand your reasoning, but I don't think its worth to rush this
> >> into
> >> > the release.
> >> >
> >> > As you've said, we cannot give precise guarantees. But this is
> arguably
> >> > one of the key requirements for any fault tolerance mechanism.
> Therefore
> >> I
> >> > disagree that this is better than not having anything at all. I think
> it
> >> > will already go a long way to have the non-iterative case working
> >> reliably.
> >> >
> >> > And as far as I know there are no users really suffering from this at
> the
> >> > moment (in the sense that someone has complained on the mailing list).
> >> >
> >> > Hence, I vote to postpone this.
> >> >
> >> > – Ufuk
> >> >
> >> > On 10 Jun 2015, at 00:19, Gyula Fóra  wrote:
> >> >
> >> > > Hey all,
> >> > >
> >> > > It is currently impossible to enable state checkpointing for
> iterative
> >> > > jobs, because en exception is thrown when creating the jobgraph.
> This
> >> > > behaviour is motivated by the lack of precise guarantees that we can
> >> give
> >> > > with the current fault-tolerance implementations for cyclic graphs.
> >> > >
> >> > > This PR <https://github.com/apache/flink/pull/812> adds an optional
> >> > flag to
> >> > > force checkpoints even in case of iterations. The algorithm will
> take
> >> > > checkpoints periodically as before, but records in transit inside
> the
> >> > loop
> >> > > will be lost.
> >> > >
> >> > > However even this guarantee is enough for most applications (Machine
> >> > > Learning for instance) and certainly much better than not having
> >> anything
> >> > > at all.
> >> > >
> >> > >
> >> > > I suggest we add this to the 0.9 release as currently many
> applications
> >> > > suffer from this limitation (SAMOA, ML pipelines, graph streaming
> etc.)
> >> > >
> >> > >
> >> > > Cheers,
> >> > >
> >> > > Gyula
> >> >
> >> >
> >>
>
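
A sketch of the topology check Aljoscha proposes above, written against a generic
adjacency-list view of the job graph (the inputs are assumptions; this is not the
actual StreamGraph API): walk forward from the iteration head and report any
stateful operator reached before the matching tail.

import java.util.ArrayDeque;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class IterationStateCheck {

    /**
     * Returns the ids of stateful operators reachable from the iteration head before
     * the matching iteration tail, i.e. checkpointed operators "inside" the loop.
     */
    static Set<String> statefulOperatorsInsideIteration(
            Map<String, List<String>> downstream,   // operator id -> successor ids
            String iterationHead,
            String iterationTail,
            Set<String> statefulOperators) {

        Set<String> offending = new HashSet<>();
        Set<String> visited = new HashSet<>();
        ArrayDeque<String> queue = new ArrayDeque<>();
        queue.add(iterationHead);

        while (!queue.isEmpty()) {
            String op = queue.poll();
            if (!visited.add(op) || op.equals(iterationTail)) {
                continue; // stop at the tail: everything after it is outside the loop
            }
            if (statefulOperators.contains(op)) {
                offending.add(op);
            }
            queue.addAll(downstream.getOrDefault(op, Collections.<String>emptyList()));
        }
        return offending;
    }
}

If the returned set is non-empty, enabling checkpointing could be rejected as it is
today, unless the user explicitly forces it.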


Re: Testing Apache Flink 0.9.0-rc1

2015-06-10 Thread Gyula Fóra
This feature needs to be included in the release; it has been tested and
used extensively, and many applications depend on it.

Maximilian Michels  wrote (on Wed, 10 Jun 2015, 10:47):

> With all the issues discovered, it looks like we'll have another release
> candidate. Right now, we have discovered the following problems:
>
> 1 YARN ITCase fails [fixed via 2eb5cfe]
> 2 No Jar for SessionWindowing example [fixed in #809]
> 3 Wrong description of the input format for the graph examples (eg.
> ConnectedComponents) [fixed in #809]
> 4 TaskManagerFailsWithSlotSharingITCase fails
> 5 ComplexIntegrationTest.complexIntegrationTest1() (FLINK-2192) fails
> 6 Submitting KMeans example to Web Submission Client does not work on
> Firefox.
> 7 Zooming is buggy in Web Submission Client (Firefox)
>
> Do we have someone familiar with the web interface who could take a look at
> the Firefox issues?
>
> One more important thing: The release-0.9 branch should only be used for
> bug fixes or prior discussed feature changes. Adding new features defies
> the purpose of carefully testing in advance and can have unforeseeable
> consequences. In particular, I'm referring to #810 pull request:
> https://github.com/apache/flink/pull/810
>
> IMHO, this one shouldn't have been cherry-picked onto the release-0.9
> branch. I would like to remove it from there if no objections are raised.
>
>
> https://github.com/apache/flink/commit/e0e6f59f309170e5217bdfbf5d30db87c947f8ce
>
> On Wed, Jun 10, 2015 at 8:52 AM, Aljoscha Krettek 
> wrote:
>
> > This doesn't look good, yes.
> >
> > On Wed, Jun 10, 2015 at 1:32 AM, Ufuk Celebi  wrote:
> >
> > > While looking into FLINK-2188 (HBase input) I've discovered that Hadoop
> > > input formats implementing Configurable (like
> mapreduce.TableInputFormat)
> > > don't have the Hadoop configuration set via setConf(Configuration).
> > >
> > > I have a small fix for this, which I have to clean up. First, I wanted
> to
> > > check what you think about this issue wrt the release. Personally, I
> > think
> > > this is a release blocker, because it essentially means that no Hadoop
> > > input format, which relies on the Configuration instance to be set this
> > way
> > > will work (this is to some extent a bug of the respective input
> formats)
> > –
> > > most notably the HBase TableInputFormat.
> > >
> > > – Ufuk
> > >
> > > On 09 Jun 2015, at 18:07, Chiwan Park  wrote:
> > >
> > > > I attached jps and jstack log about hanging
> > > TaskManagerFailsWithSlotSharingITCase to JIRA FLINK-2183.
> > > >
> > > > Regards,
> > > > Chiwan Park
> > > >
> > > >> On Jun 10, 2015, at 12:28 AM, Aljoscha Krettek  >
> > > wrote:
> > > >>
> > > >> I discovered something that might be a feature, rather than a bug.
> > When
> > > you
> > > >> submit an example using the web client without giving parameters the
> > > >> program fails with this:
> > > >>
> > > >> org.apache.flink.client.program.ProgramInvocationException: The main
> > > method
> > > >> caused an error.
> > > >>
> > > >> at
> > > >>
> > >
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:452)
> > > >>
> > > >> at
> > > >>
> > >
> >
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
> > > >>
> > > >> at org.apache.flink.client.program.Client.run(Client.java:315)
> > > >>
> > > >> at
> > > >>
> > >
> >
> org.apache.flink.client.web.JobSubmissionServlet.doGet(JobSubmissionServlet.java:302)
> > > >>
> > > >> at javax.servlet.http.HttpServlet.service(HttpServlet.java:668)
> > > >>
> > > >> at javax.servlet.http.HttpServlet.service(HttpServlet.java:770)
> > > >>
> > > >> at
> > > org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:532)
> > > >>
> > > >> at
> > > >>
> > >
> >
> org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:453)
> > > >>
> > > >> at
> > > >>
> > >
> >
> org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:227)
> > > >>
> > > >> at
> > > >>
> > >
> >
> org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:965)
> > > >>
> > > >> at
> > >
> org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:388)
> > > >>
> > > >> at
> > > >>
> > >
> >
> org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:187)
> > > >>
> > > >> at
> > > >>
> > >
> >
> org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:901)
> > > >>
> > > >> at
> > > >>
> > >
> >
> org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:117)
> > > >>
> > > >> at
> > >
> org.eclipse.jetty.server.handler.HandlerList.handle(HandlerList.java:47)
> > > >>
> > > >> at
> > > >>
> > >
> >
> org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:113)
> > > >>
> > > >> at org.eclipse.jetty.server.Server.handle(Server.java:352)
> > > >>
> > > >> at
> > > >>
> > >
> >
> org.eclipse.jetty.server.HttpConnection.handleRequest(Ht

Re: Force enabling checkpoints for iterative streaming jobs

2015-06-10 Thread Gyula Fóra
And also I would like to remind everyone that any fault tolerance we
provide is only as good as the fault tolerance of the master node, which is
non-existent at the moment.

So I don't see a reason why a user should not be able to choose whether he
wants state checkpoints for iterations as well.

In any case this will be used by King for instance, so making it part of
the release would save a lot of work for everyone.

Paris Carbone  wrote (on Wed, 10 Jun 2015, 10:29):

>
> To continue Gyula's point, for consistent snapshots we need to persist the
> records in transit within the loop  and also slightly change the current
> protocol since it works only for DAGs. Before going into that direction
> though I would propose we first see whether there is a nice way to make
> iterations more structured.
>
> Paris
> ____
> From: Gyula Fóra 
> Sent: Wednesday, June 10, 2015 10:19 AM
> To: dev@flink.apache.org
> Subject: Re: Force enabling checkpoints for iterative streaming jobs
>
> I disagree. Not having checkpointed operators inside the iteration still
> breaks the guarantees.
>
> It is not about the states it is about the loop itself.
> On Wed, Jun 10, 2015 at 10:12 AM Aljoscha Krettek 
> wrote:
>
> > This is the answer I gave on the PR (we should have one place for
> > discussing this, though):
> >
> > I would be against merging this in the current form. What I propose is
> > to analyse the topology to verify that there are no checkpointed
> > operators inside iterations. Operators before and after iterations can
> > be checkpointed and we can safely allow the user to enable
> > checkpointing.
> >
> > If we have the code to analyse which operators are inside iterations
> > we could also disallow windows inside iterations. I think windows
> > inside iterations don't make sense since elements in different
> > "iterations" would end up in the same window. Maybe I'm wrong here
> > though, then please correct me.
> >
> > On Wed, Jun 10, 2015 at 10:08 AM, Márton Balassi
> >  wrote:
> > > I agree that for the sake of the above mentioned use cases it is
> > reasonable
> > > to add this to the release with the right documentation, for machine
> > > learning potentially loosing one round of feedback data should not
> > matter.
> > >
> > > Let us not block prominent users until the next release on this.
> > >
> > > On Wed, Jun 10, 2015 at 8:09 AM, Gyula Fóra 
> > wrote:
> > >
> > >> As for people currently suffering from it:
> > >>
> > >> An application King is developing requires iterations, and they need
> > >> checkpoints. Practically all SAMOA programs would need this.
> > >>
> > >> It is very likely that the state interfaces will be changed after the
> > >> release, so this is not something that we can just add later. I don't
> > see a
> > >> reason why we should not add it, as it is clearly documented. In this
> > >> actual case not having guarantees at all means people will never use
> it
> > in
> > >> any production system. Having limited guarantees means that it will
> > depend
> > >> on the application.
> > >>
> > >> On Wed, Jun 10, 2015 at 12:53 AM, Ufuk Celebi  wrote:
> > >>
> > >> > Hey Gyula,
> > >> >
> > >> > I understand your reasoning, but I don't think its worth to rush
> this
> > >> into
> > >> > the release.
> > >> >
> > >> > As you've said, we cannot give precise guarantees. But this is
> > arguably
> > >> > one of the key requirements for any fault tolerance mechanism.
> > Therefore
> > >> I
> > >> > disagree that this is better than not having anything at all. I
> think
> > it
> > >> > will already go a long way to have the non-iterative case working
> > >> reliably.
> > >> >
> > >> > And as far as I know there are no users really suffering from this
> at
> > the
> > >> > moment (in the sense that someone has complained on the mailing
> list).
> > >> >
> > >> > Hence, I vote to postpone this.
> > >> >
> > >> > – Ufuk
> > >> >
> > >> > On 10 Jun 2015, at 00:19, Gyula Fóra  wrote:
> > >> >
> > >> > > Hey all,
> > >> > >
> > >> > > It is currently impossible to ena

Re: Force enabling checkpoints for iterative streaming jobs

2015-06-10 Thread Gyula Fóra
The other tests verify that the checkpointing algorithm runs properly. That
also ensures that it runs for iterations because a loop is just an extra
source and sink in the jobgraph (so it is the same for the algorithm).

Fabian Hueske  wrote (on Wed, 10 Jun 2015, 11:19):

> Without going into the details, how well tested is this feature? The PR
> only extends one test by a few lines.
>
> Is that really enough to ensure that
> 1) the change does not cause trouble
> 2) is working as expected
>
> If this feature should go into the release, it must be thoroughly checked
> and we must take the time for that.
> Including code and hoping for the best because time is scarce is not an
> option IMO.
>
> Fabian
>
>
> 2015-06-10 11:05 GMT+02:00 Gyula Fóra :
>
> > And also I would like to remind everyone that any fault tolerance we
> > provide is only as good as the fault tolerance of the master node. Which
> is
> > non existent at the moment.
> >
> > So I don't see a reason why a user should not be able to choose whether
> he
> > wants state checkpoints for iterations as well.
> >
> > In any case this will be used by King for instance, so making it part of
> > the release would save a lot of work for everyone.
> >
> > Paris Carbone  ezt írta (időpont: 2015. jún. 10., Sze,
> > 10:29):
> >
> > >
> > > To continue Gyula's point, for consistent snapshots we need to persist
> > the
> > > records in transit within the loop  and also slightly change the
> current
> > > protocol since it works only for DAGs. Before going into that direction
> > > though I would propose we first see whether there is a nice way to make
> > > iterations more structured.
> > >
> > > Paris
> > > 
> > > From: Gyula Fóra 
> > > Sent: Wednesday, June 10, 2015 10:19 AM
> > > To: dev@flink.apache.org
> > > Subject: Re: Force enabling checkpoints for iterative streaming jobs
> > >
> > > I disagree. Not having checkpointed operators inside the iteration
> still
> > > breaks the guarantees.
> > >
> > > It is not about the states it is about the loop itself.
> > > On Wed, Jun 10, 2015 at 10:12 AM Aljoscha Krettek  >
> > > wrote:
> > >
> > > > This is the answer I gave on the PR (we should have one place for
> > > > discussing this, though):
> > > >
> > > > I would be against merging this in the current form. What I propose
> is
> > > > to analyse the topology to verify that there are no checkpointed
> > > > operators inside iterations. Operators before and after iterations
> can
> > > > be checkpointed and we can safely allow the user to enable
> > > > checkpointing.
> > > >
> > > > If we have the code to analyse which operators are inside iterations
> > > > we could also disallow windows inside iterations. I think windows
> > > > inside iterations don't make sense since elements in different
> > > > "iterations" would end up in the same window. Maybe I'm wrong here
> > > > though, then please correct me.
> > > >
> > > > On Wed, Jun 10, 2015 at 10:08 AM, Márton Balassi
> > > >  wrote:
> > > > > I agree that for the sake of the above mentioned use cases it is
> > > > reasonable
> > > > > to add this to the release with the right documentation, for
> machine
> > > > > learning potentially loosing one round of feedback data should not
> > > > matter.
> > > > >
> > > > > Let us not block prominent users until the next release on this.
> > > > >
> > > > > On Wed, Jun 10, 2015 at 8:09 AM, Gyula Fóra 
> > > > wrote:
> > > > >
> > > > >> As for people currently suffering from it:
> > > > >>
> > > > >> An application King is developing requires iterations, and they
> need
> > > > >> checkpoints. Practically all SAMOA programs would need this.
> > > > >>
> > > > >> It is very likely that the state interfaces will be changed after
> > the
> > > > >> release, so this is not something that we can just add later. I
> > don't
> > > > see a
> > > > >> reason why we should not add it, as it is clearly documented. In
> > this
> > > > >> actual case not having guarantees at all means people wil

Re: Force enabling checkpoints for iterative streaming jobs

2015-06-10 Thread Gyula Fóra
I don't understand the question, I vote for checkpointing all state in the
job, even inside iterations (it's more of a loop).

Aljoscha Krettek  wrote (on Wed, 10 Jun 2015, 12:34):

> I don't understand why having the state inside an iteration but not
> the elements that correspond to this state or created this state is
> desirable. Maybe an example could help understand this better?
>
> On Wed, Jun 10, 2015 at 11:27 AM, Gyula Fóra  wrote:
> > The other tests verify that the checkpointing algorithm runs properly.
> That
> > also ensures that it runs for iterations because a loop is just an extra
> > source and sink in the jobgraph (so it is the same for the algorithm).
> >
> > Fabian Hueske  ezt írta (időpont: 2015. jún. 10.,
> Sze,
> > 11:19):
> >
> >> Without going into the details, how well tested is this feature? The PR
> >> only extends one test by a few lines.
> >>
> >> Is that really enough to ensure that
> >> 1) the change does not cause trouble
> >> 2) is working as expected
> >>
> >> If this feature should go into the release, it must be thoroughly
> checked
> >> and we must take the time for that.
> >> Including code and hoping for the best because time is scarce is not an
> >> option IMO.
> >>
> >> Fabian
> >>
> >>
> >> 2015-06-10 11:05 GMT+02:00 Gyula Fóra :
> >>
> >> > And also I would like to remind everyone that any fault tolerance we
> >> > provide is only as good as the fault tolerance of the master node.
> Which
> >> is
> >> > non existent at the moment.
> >> >
> >> > So I don't see a reason why a user should not be able to choose
> whether
> >> he
> >> > wants state checkpoints for iterations as well.
> >> >
> >> > In any case this will be used by King for instance, so making it part
> of
> >> > the release would save a lot of work for everyone.
> >> >
> >> > Paris Carbone  ezt írta (időpont: 2015. jún. 10., Sze,
> >> > 10:29):
> >> >
> >> > >
> >> > > To continue Gyula's point, for consistent snapshots we need to
> persist
> >> > the
> >> > > records in transit within the loop  and also slightly change the
> >> current
> >> > > protocol since it works only for DAGs. Before going into that
> direction
> >> > > though I would propose we first see whether there is a nice way to
> make
> >> > > iterations more structured.
> >> > >
> >> > > Paris
> >> > > 
> >> > > From: Gyula Fóra 
> >> > > Sent: Wednesday, June 10, 2015 10:19 AM
> >> > > To: dev@flink.apache.org
> >> > > Subject: Re: Force enabling checkpoints for iterative streaming jobs
> >> > >
> >> > > I disagree. Not having checkpointed operators inside the iteration
> >> still
> >> > > breaks the guarantees.
> >> > >
> >> > > It is not about the states it is about the loop itself.
> >> > > On Wed, Jun 10, 2015 at 10:12 AM Aljoscha Krettek <
> aljos...@apache.org
> >> >
> >> > > wrote:
> >> > >
> >> > > > This is the answer I gave on the PR (we should have one place for
> >> > > > discussing this, though):
> >> > > >
> >> > > > I would be against merging this in the current form. What I
> propose
> >> is
> >> > > > to analyse the topology to verify that there are no checkpointed
> >> > > > operators inside iterations. Operators before and after iterations
> >> can
> >> > > > be checkpointed and we can safely allow the user to enable
> >> > > > checkpointing.
> >> > > >
> >> > > > If we have the code to analyse which operators are inside
> iterations
> >> > > > we could also disallow windows inside iterations. I think windows
> >> > > > inside iterations don't make sense since elements in different
> >> > > > "iterations" would end up in the same window. Maybe I'm wrong here
> >> > > > though, then please correct me.
> >> > > >
> >> > > > On Wed, Jun 10, 2015 at 10:08 AM, Márton Balassi
> >> > > >  wrote:
> >> > > > > I agree that for the 

Re: Force enabling checkpoints for iterative streaming jobs

2015-06-10 Thread Gyula Fóra
You are right, to have consistent results we would need to persist the
records.

But since we cannot do that right now, we can still checkpoint all operator
states and understand that inflight records in the loop are lost on failure.

This is acceptable for most of the use-cases that we have developed so far for
iterations (machine learning, graph updates, etc.). What is not acceptable
is to not have checkpointing at all.

Aljoscha Krettek  wrote (on Wed, 10 Jun 2015, 12:43):

> The elements that are in-flight in an iteration are also state of the
> job. I'm wondering whether the state inside iterations still makes
> sense without these in-flight elements. But I also don't know the King
> use-case, that's why I thought an example could be helpful.
>
> On Wed, Jun 10, 2015 at 12:37 PM, Gyula Fóra  wrote:
> > I don't understand the question, I vote for checkpointing all state in
> the
> > job, even inside iterations (its more of a loop).
> >
> > Aljoscha Krettek  ezt írta (időpont: 2015. jún.
> 10.,
> > Sze, 12:34):
> >
> >> I don't understand why having the state inside an iteration but not
> >> the elements that correspond to this state or created this state is
> >> desirable. Maybe an example could help understand this better?
> >>
> >> On Wed, Jun 10, 2015 at 11:27 AM, Gyula Fóra 
> wrote:
> >> > The other tests verify that the checkpointing algorithm runs properly.
> >> That
> >> > also ensures that it runs for iterations because a loop is just an
> extra
> >> > source and sink in the jobgraph (so it is the same for the algorithm).
> >> >
> >> > Fabian Hueske  ezt írta (időpont: 2015. jún. 10.,
> >> Sze,
> >> > 11:19):
> >> >
> >> >> Without going into the details, how well tested is this feature? The
> PR
> >> >> only extends one test by a few lines.
> >> >>
> >> >> Is that really enough to ensure that
> >> >> 1) the change does not cause trouble
> >> >> 2) is working as expected
> >> >>
> >> >> If this feature should go into the release, it must be thoroughly
> >> checked
> >> >> and we must take the time for that.
> >> >> Including code and hoping for the best because time is scarce is not
> an
> >> >> option IMO.
> >> >>
> >> >> Fabian
> >> >>
> >> >>
> >> >> 2015-06-10 11:05 GMT+02:00 Gyula Fóra :
> >> >>
> >> >> > And also I would like to remind everyone that any fault tolerance
> we
> >> >> > provide is only as good as the fault tolerance of the master node.
> >> Which
> >> >> is
> >> >> > non existent at the moment.
> >> >> >
> >> >> > So I don't see a reason why a user should not be able to choose
> >> whether
> >> >> he
> >> >> > wants state checkpoints for iterations as well.
> >> >> >
> >> >> > In any case this will be used by King for instance, so making it
> part
> >> of
> >> >> > the release would save a lot of work for everyone.
> >> >> >
> >> >> > Paris Carbone  ezt írta (időpont: 2015. jún. 10.,
> Sze,
> >> >> > 10:29):
> >> >> >
> >> >> > >
> >> >> > > To continue Gyula's point, for consistent snapshots we need to
> >> persist
> >> >> > the
> >> >> > > records in transit within the loop  and also slightly change the
> >> >> current
> >> >> > > protocol since it works only for DAGs. Before going into that
> >> direction
> >> >> > > though I would propose we first see whether there is a nice way
> to
> >> make
> >> >> > > iterations more structured.
> >> >> > >
> >> >> > > Paris
> >> >> > > 
> >> >> > > From: Gyula Fóra 
> >> >> > > Sent: Wednesday, June 10, 2015 10:19 AM
> >> >> > > To: dev@flink.apache.org
> >> >> > > Subject: Re: Force enabling checkpoints for iterative streaming
> jobs
> >> >> > >
> >> >> > > I disagree. Not having checkpointed operators inside the
> iteration
> >> >> still
> >> >> > > breaks the guarante

Re: Force enabling checkpoints for iterative streaming jobs

2015-06-10 Thread Gyula Fóra
Here is an example for you:

Parallel streaming kmeans, the state we keep is the current cluster
centers, and we use iterations to sync the centers across parallel
instances.
We can afford lost model updates in the loop, but we need to checkpoint the
models.

https://github.com/gyfora/stream-clustering/blob/master/src/main/scala/stream/clustering/StreamClustering.scala

(checkpointing is not turned on but you will get the point)
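
A hedged Java rendering of the same idea for readers who do not want to open the
Scala link (all names here are made up): the cluster centers are the operator state
worth checkpointing, while the center updates travelling along the feedback edge
are the part that is acceptable to lose.

public class StreamingKMeansStateSketch {

    private final double[][] centers;   // the model: must survive a failure -> checkpoint it
    private final int[] counts;

    StreamingKMeansStateSketch(double[][] initialCenters) {
        this.centers = initialCenters;
        this.counts = new int[initialCenters.length];
    }

    /** Assign a point to its nearest center and move that center towards it. */
    void update(double[] point) {
        int best = 0;
        double bestDist = Double.MAX_VALUE;
        for (int i = 0; i < centers.length; i++) {
            double d = 0;
            for (int j = 0; j < point.length; j++) {
                double diff = centers[i][j] - point[j];
                d += diff * diff;
            }
            if (d < bestDist) {
                bestDist = d;
                best = i;
            }
        }
        counts[best]++;
        double step = 1.0 / counts[best];
        for (int j = 0; j < point.length; j++) {
            centers[best][j] += step * (point[j] - centers[best][j]);
        }
    }

    /** What a checkpoint needs to contain: the centers, not the in-flight updates. */
    double[][] snapshotCenters() {
        double[][] copy = new double[centers.length][];
        for (int i = 0; i < centers.length; i++) {
            copy[i] = centers[i].clone();
        }
        return copy;
    }
}

Losing a few feedback updates only delays convergence slightly; losing the centers
would mean rebuilding the model from scratch, which is the distinction argued above.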



Gyula Fóra  wrote (on Wed, 10 Jun 2015, 12:47):

> You are right, to have consistent results we would need to persist the
> records.
>
> But since we cannot do that right now, we can still checkpoint all
> operator states and understand that inflight records in the loop are lost
> on failure.
>
> This is acceptable for most the use-cases that we have developed so far
> for iterations (machine learning, graph updates, etc.) What is not
> acceptable is to not have checkpointing at all.
>
> Aljoscha Krettek  ezt írta (időpont: 2015. jún. 10.,
> Sze, 12:43):
>
>> The elements that are in-flight in an iteration are also state of the
>> job. I'm wondering whether the state inside iterations still makes
>> sense without these in-flight elements. But I also don't know the King
>> use-case, that's why I though an example could be helpful.
>>
>> On Wed, Jun 10, 2015 at 12:37 PM, Gyula Fóra 
>> wrote:
>> > I don't understand the question, I vote for checkpointing all state in
>> the
>> > job, even inside iterations (its more of a loop).
>> >
>> > Aljoscha Krettek  ezt írta (időpont: 2015. jún.
>> 10.,
>> > Sze, 12:34):
>> >
>> >> I don't understand why having the state inside an iteration but not
>> >> the elements that correspond to this state or created this state is
>> >> desirable. Maybe an example could help understand this better?
>> >>
>> >> On Wed, Jun 10, 2015 at 11:27 AM, Gyula Fóra 
>> wrote:
>> >> > The other tests verify that the checkpointing algorithm runs
>> properly.
>> >> That
>> >> > also ensures that it runs for iterations because a loop is just an
>> extra
>> >> > source and sink in the jobgraph (so it is the same for the
>> algorithm).
>> >> >
>> >> > Fabian Hueske  ezt írta (időpont: 2015. jún. 10.,
>> >> Sze,
>> >> > 11:19):
>> >> >
>> >> >> Without going into the details, how well tested is this feature?
>> The PR
>> >> >> only extends one test by a few lines.
>> >> >>
>> >> >> Is that really enough to ensure that
>> >> >> 1) the change does not cause trouble
>> >> >> 2) is working as expected
>> >> >>
>> >> >> If this feature should go into the release, it must be thoroughly
>> >> checked
>> >> >> and we must take the time for that.
>> >> >> Including code and hoping for the best because time is scarce is
>> not an
>> >> >> option IMO.
>> >> >>
>> >> >> Fabian
>> >> >>
>> >> >>
>> >> >> 2015-06-10 11:05 GMT+02:00 Gyula Fóra :
>> >> >>
>> >> >> > And also I would like to remind everyone that any fault tolerance
>> we
>> >> >> > provide is only as good as the fault tolerance of the master node.
>> >> Which
>> >> >> is
>> >> >> > non existent at the moment.
>> >> >> >
>> >> >> > So I don't see a reason why a user should not be able to choose
>> >> whether
>> >> >> he
>> >> >> > wants state checkpoints for iterations as well.
>> >> >> >
>> >> >> > In any case this will be used by King for instance, so making it
>> part
>> >> of
>> >> >> > the release would save a lot of work for everyone.
>> >> >> >
>> >> >> > Paris Carbone  ezt írta (időpont: 2015. jún. 10.,
>> Sze,
>> >> >> > 10:29):
>> >> >> >
>> >> >> > >
>> >> >> > > To continue Gyula's point, for consistent snapshots we need to
>> >> persist
>> >> >> > the
>> >> >> > > records in transit within the loop  and also slightly change the
>> >> >> current
>> >> >> > > protocol since it wo

Re: Force enabling checkpoints for iterative streaming jobs

2015-06-10 Thread Gyula Fóra
Max suggested that I add this feature slightly hidden to the execution
config instance.

The problem then is that I either make a public field in the config or once
again add a method.

Any ideas?

Aljoscha Krettek  wrote (on Wed, 10 Jun 2015, 14:07):

> Thanks :D, now I see. It makes sense because we don't have another way
> of keeping the cluster state synced/distributed across parallel
> instances of the operators.
>
> On Wed, Jun 10, 2015 at 12:52 PM, Gyula Fóra  wrote:
> > Here is an example for you:
> >
> > Parallel streaming kmeans, the state we keep is the current cluster
> > centers, and we use iterations to sync the centers across parallel
> > instances.
> > We can afford lost model updated in the loop but we need the checkpoint
> the
> > models.
> >
> >
> https://github.com/gyfora/stream-clustering/blob/master/src/main/scala/stream/clustering/StreamClustering.scala
> >
> > (checkpointing is not turned on but you will get the point)
> >
> >
> >
> > Gyula Fóra  ezt írta (időpont: 2015. jún. 10.,
> Sze,
> > 12:47):
> >
> >> You are right, to have consistent results we would need to persist the
> >> records.
> >>
> >> But since we cannot do that right now, we can still checkpoint all
> >> operator states and understand that inflight records in the loop are
> lost
> >> on failure.
> >>
> >> This is acceptable for most the use-cases that we have developed so far
> >> for iterations (machine learning, graph updates, etc.) What is not
> >> acceptable is to not have checkpointing at all.
> >>
> >> Aljoscha Krettek  ezt írta (időpont: 2015. jún.
> 10.,
> >> Sze, 12:43):
> >>
> >>> The elements that are in-flight in an iteration are also state of the
> >>> job. I'm wondering whether the state inside iterations still makes
> >>> sense without these in-flight elements. But I also don't know the King
> >>> use-case, that's why I though an example could be helpful.
> >>>
> >>> On Wed, Jun 10, 2015 at 12:37 PM, Gyula Fóra 
> >>> wrote:
> >>> > I don't understand the question, I vote for checkpointing all state
> in
> >>> the
> >>> > job, even inside iterations (its more of a loop).
> >>> >
> >>> > Aljoscha Krettek  ezt írta (időpont: 2015. jún.
> >>> 10.,
> >>> > Sze, 12:34):
> >>> >
> >>> >> I don't understand why having the state inside an iteration but not
> >>> >> the elements that correspond to this state or created this state is
> >>> >> desirable. Maybe an example could help understand this better?
> >>> >>
> >>> >> On Wed, Jun 10, 2015 at 11:27 AM, Gyula Fóra 
> >>> wrote:
> >>> >> > The other tests verify that the checkpointing algorithm runs
> >>> properly.
> >>> >> That
> >>> >> > also ensures that it runs for iterations because a loop is just an
> >>> extra
> >>> >> > source and sink in the jobgraph (so it is the same for the
> >>> algorithm).
> >>> >> >
> >>> >> > Fabian Hueske  ezt írta (időpont: 2015. jún.
> 10.,
> >>> >> Sze,
> >>> >> > 11:19):
> >>> >> >
> >>> >> >> Without going into the details, how well tested is this feature?
> >>> The PR
> >>> >> >> only extends one test by a few lines.
> >>> >> >>
> >>> >> >> Is that really enough to ensure that
> >>> >> >> 1) the change does not cause trouble
> >>> >> >> 2) is working as expected
> >>> >> >>
> >>> >> >> If this feature should go into the release, it must be thoroughly
> >>> >> checked
> >>> >> >> and we must take the time for that.
> >>> >> >> Including code and hoping for the best because time is scarce is
> >>> not an
> >>> >> >> option IMO.
> >>> >> >>
> >>> >> >> Fabian
> >>> >> >>
> >>> >> >>
> >>> >> >> 2015-06-10 11:05 GMT+02:00 Gyula Fóra :
> >>> >> >>
> >>> >> >> > And also I would like to remind everyone that any fault
> tolerance
> >>

Re: Force enabling checkpoints for iterative streaming jobs

2015-06-10 Thread Gyula Fóra
Then I suggest we leave it in the environment along with the other
checkpointing methods.

I updated my PR so it includes hints how to force enable checkpoints (and
the reduced guarantees) when an error is thrown for iterative jobs.
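
For illustration only, a sketch of what a deprecated switch on the environment could
look like; the method name, signature and placement are hypothetical, since the
merged PR's exact API is not shown in this thread.

public class StreamExecutionEnvironmentSketch {

    private long checkpointIntervalMillis = -1;          // -1 means checkpointing disabled
    private boolean forceCheckpointingEnabled = false;

    /** Enables periodic state checkpoints, as before. */
    public void enableCheckpointing(long intervalMillis) {
        this.checkpointIntervalMillis = intervalMillis;
    }

    /**
     * Allows checkpoints even if the topology contains iterations, accepting that
     * records in transit inside the loop are lost on failure.
     *
     * @deprecated will be removed once the interplay of iterations, state and
     *             windowing is properly figured out.
     */
    @Deprecated
    public void forceCheckpointing(boolean force) {
        this.forceCheckpointingEnabled = force;
    }
}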

On Wed, Jun 10, 2015 at 2:46 PM, Aljoscha Krettek 
wrote:

> We could add a method on the ExecutionConfig but mark it as deprecated
> and explain that it will go away once the interplay of iterations,
> state and so on is properly figured out.
>
> On Wed, Jun 10, 2015 at 2:36 PM, Ufuk Celebi  wrote:
> > On 10 Jun 2015, at 14:29, Gyula Fóra  wrote:
> >
> >> Max suggested that I add this feature slightly hidden to the execution
> >> config instance.
> >>
> >> The problem then is that I either make a public field in the config or
> once
> >> again add a method.
> >>
> >> Any ideas?
> >
> > I thought about this as well. The only way to add this in a "hidden" way
> would be to have access to the Configuration instance – which the user does
> not. So I think there is no way w/o adding public access.
>


Re: Force enabling checkpoints for iterative streaming jobs

2015-06-10 Thread Gyula Fóra
Done, I will merge it after travis passes.

Maximilian Michels  wrote (on Wed, 10 Jun 2015, 15:25):

> Let's mark the method of the environment as deprecated like Aljoscha
> suggested. Then I think we could merge it.
>
> On Wed, Jun 10, 2015 at 2:50 PM, Gyula Fóra  wrote:
>
> > Then I suggest we leave it in the environment along with the other
> > checkpointing methods.
> >
> > I updated my PR so it includes hints how to force enable checkpoints (and
> > the reduced guarantees) when an error is thrown for iterative jobs.
> >
> > On Wed, Jun 10, 2015 at 2:46 PM, Aljoscha Krettek 
> > wrote:
> >
> > > We could add a method on the ExecutionConfig but mark it as deprecated
> > > and explain that it will go away once the interplay of iterations,
> > > state and so on is properly figured out.
> > >
> > > On Wed, Jun 10, 2015 at 2:36 PM, Ufuk Celebi  wrote:
> > > > On 10 Jun 2015, at 14:29, Gyula Fóra  wrote:
> > > >
> > > >> Max suggested that I add this feature slightly hidden to the
> execution
> > > >> config instance.
> > > >>
> > > >> The problem then is that I either make a public field in the config
> or
> > > once
> > > >> again add a method.
> > > >>
> > > >> Any ideas?
> > > >
> > > > I thought about this as well. The only way to add this in a "hidden"
> > way
> > > would be have access to the Configuration instance – which the user
> does
> > > not. So I think there is no way w/o adding public access.
> > >
> >
>


Re: Testing Apache Flink 0.9.0-rc2

2015-06-15 Thread Gyula Fóra
The checkpoint cleanup works for HDFS right? I assume the job manager
should see that as well.

This is not a trivial problem in general; the assumption we were making was
that the JM can actually execute the cleanup logic.
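
A simplified illustration of why the cleanup only works when the checkpoint files
live on a filesystem the JobManager can reach (this is not the actual state handle
class; the behaviour is inferred from the discussion quoted below):

import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Paths;

public class FileStateHandleSketch {

    private final URI checkpointFile;

    FileStateHandleSketch(URI checkpointFile) {
        this.checkpointFile = checkpointFile;
    }

    /** Called on the JobManager when the checkpoint is no longer needed. */
    void discard() throws IOException {
        if ("file".equals(checkpointFile.getScheme())) {
            // Deletes on the JobManager's local disk -- the copy written by a
            // TaskManager on another machine stays behind.
            Files.deleteIfExists(Paths.get(checkpointFile));
        } else {
            // For hdfs:// (or any shared filesystem) the delete is visible to everyone.
            // A real implementation would go through the distributed filesystem client here.
        }
    }
}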

Aljoscha Krettek  wrote (on Mon, 15 Jun 2015, 15:40):

> @Ufuk The cleanup bug for file:// checkpoints is not easy to fix IMHO.
>
> On Mon, 15 Jun 2015 at 15:39 Aljoscha Krettek  wrote:
>
> > Oh yes, on that I agree. I'm just saying that the checkpoint setting
> > should maybe be a central setting.
> >
> > On Mon, 15 Jun 2015 at 15:38 Matthias J. Sax <
> > mj...@informatik.hu-berlin.de> wrote:
> >
> >> Hi,
> >>
> >> IMHO, it is very common that Workers do have their own config files (eg,
> >> Storm works the same way). And I think it makes a lot of sense. You
> >> might run Flink in an heterogeneous cluster and you want to assign
> >> different memory and slots for different hardware. This would not be
> >> possible using a single config file (specified at the master and
> >> distribute it).
> >>
> >>
> >> -Matthias
> >>
> >> On 06/15/2015 03:30 PM, Aljoscha Krettek wrote:
> >> > Regarding 1), that's why I said "bugs and features". :D But I think of
> >> it as
> >> > a bug, since people will normally set it in the flink-conf.yaml on the
> >> > master and assume that it works. That's what I assumed and it took me
> a
> >> > while to figure out that the task managers don't respect this setting.
> >> >
> >> > Regarding 3), if you think about it, this could never work. The state
> >> > handle cleanup logic happens purely on the JobManager. So what happens
> >> is
> >> > that the TaskManagers create state in some directory, let's say
> >> > /tmp/checkpoints, on the TaskManager. For cleanup, the JobManager gets
> >> the
> >> > state handle and calls discard (on the JobManager), this tries to
> >> cleanup
> >> > the state in /tmp/checkpoints, but of course, there is nothing there
> >> since
> >> > we are still on the JobManager.
> >> >
> >> > On Mon, 15 Jun 2015 at 15:23 Márton Balassi  >
> >> > wrote:
> >> >
> >> >> @Aljoscha:
> >> >> 1) I think this just means that you can set the state backend on a
> >> >> taskmanager basis.
> >> >> 3) This is a serious issue then. Does it work when you set it in the
> >> >> flink-conf.yaml?
> >> >>
> >> >> On Mon, Jun 15, 2015 at 3:17 PM, Aljoscha Krettek <
> aljos...@apache.org
> >> >
> >> >> wrote:
> >> >>
> >> >>> So, during my testing of the state checkpointing on a cluster I
> >> >> discovered
> >> >>> several things (bugs and features):
> >> >>>
> >> >>>  - If you have a setup where the configuration is not synced to the
> >> >> workers
> >> >>> they do not pick up the state back-end configuration. The workers do
> >> not
> >> >>> respect the setting in the flink-conf.yaml on the master
> >> >>> - HDFS checkpointing works fine if you manually set it as the
> per-job
> >> >>> state-backend using setStateHandleProvider()
> >> >>> - If you manually set the stateHandleProvider to a "file://"
> backend,
> >> old
> >> >>> checkpoints will not be cleaned up, they will also not be cleaned up
> >> >> when a
> >> >>> job is finished.
> >> >>>
> >> >>> On Sun, 14 Jun 2015 at 23:22 Maximilian Michels 
> >> wrote:
> >> >>>
> >>  Hi Henry,
> >> 
> >>  This is just a dry run. The goal is to get everything in shape for
> a
> >> >>> proper
> >>  vote.
> >> 
> >>  Kind regards,
> >>  Max
> >> 
> >> 
> >>  On Sun, Jun 14, 2015 at 7:58 PM, Henry Saputra <
> >> >> henry.sapu...@gmail.com>
> >>  wrote:
> >> 
> >> > Hi Max,
> >> >
> >> > Are you doing official VOTE on the RC on 0.9 release or this is
> just
> >> >> a
> >>  dry
> >> > run?
> >> >
> >> >
> >> > - Henry
> >> >
> >> > On Sun, Jun 14, 2015 at 9:11 AM, Maximilian Michels <
> m...@apache.org
> >> >
> >> > wrote:
> >> >> Dear Flink community,
> >> >>
> >> >> Here's the second release candidate for the 0.9.0 release. We
> >> >> haven't
> >> > had a
> >> >> formal vote on the previous release candidate but it received an
> >>  implicit
> >> >> -1 because of a couple of issues.
> >> >>
> >> >> Thanks to the hard-working Flink devs these issues should be
> solved
> >>  now.
> >> >> The following commits have been added to the second release
> >> >>> candidate:
> >> >>
> >> >> f5f0709 [FLINK-2194] [type extractor] Excludes Writable type from
> >> >> WritableTypeInformation to be treated as an interface
> >> >> 40e2df5 [FLINK-2072] [ml] Adds quickstart guide
> >> >> af0fee5 [FLINK-2207] Fix TableAPI conversion documenation and
> >> >> further
> >> >> renamings for consistency.
> >> >> e513be7 [FLINK-2206] Fix incorrect counts of finished, canceled,
> >> >> and
> >> > failed
> >> >> jobs in webinterface
> >> >> ecfde6d [docs][release] update stable version to 0.9.0
> >> >> 4d8ae1c [docs] remove obsolete YARN link and cleanup downloa

Removing reduce/aggregations from non-grouped data streams

2015-06-22 Thread Gyula Fóra
Hey all,
Currently we have reduce and aggregation methods for non-grouped
DataStreams as well, which will produce local aggregates depending on the
parallelism of the operator.

This behaviour is neither intuitive nor useful as it only produces sensible
results if the user specifically sets the parallelism to 1, which should not
be encouraged.

I would like to remove these methods from the DataStream api and only keep
it for GroupedDataStreams and WindowedDataStream where the aggregation is
either executed per-key or per-window.

Cheers,
Gyula
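
A small plain-Java simulation of the behaviour described above (not the DataStream
API): with parallelism 3, a non-grouped running sum keeps one independent partial
aggregate per subtask, so the output interleaves three unrelated sums instead of
one global result.

import java.util.Arrays;
import java.util.List;

public class NonGroupedReduceSketch {

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
        int parallelism = 3;
        long[] partialSums = new long[parallelism];

        for (int i = 0; i < input.size(); i++) {
            int subtask = i % parallelism;          // round-robin partitioning
            partialSums[subtask] += input.get(i);
            // Each emitted value is only a local aggregate of that subtask's records.
            System.out.println("subtask " + subtask + " emits " + partialSums[subtask]);
        }
        // Only with parallelism = 1 would the last emitted value be the global sum (45).
    }
}

Per-key or per-window aggregation does not have this problem, because each key or
window has exactly one owning subtask.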


Re: Thoughts About Streaming

2015-06-22 Thread Gyula Fóra
Hi Aljoscha,

Thanks for the nice summary, this is a very good initiative.

I added some comments to the respective sections (where I didn't fully
agree :) ).
At some point I think it would be good to have a public hangout session on
this, which could make a more dynamic discussion.

Cheers,
Gyula

Aljoscha Krettek  wrote (on Mon, 22 Jun 2015, 21:34):

> Hi,
> with people proposing changes to the streaming part I also wanted to throw
> my hat into the ring. :D
>
> During the last few months, while I was getting acquainted with the
> streaming system, I wrote down some thoughts I had about how things could
> be improved. Hopefully, they are in somewhat coherent shape now, so please
> have a look if you are interested in this:
>
> https://docs.google.com/document/d/1rSoHyhUhm2IE30o5tkR8GEetjFvMRMNxvsCfoPsW6_4/edit?usp=sharing
>
> This mostly covers:
>  - Timestamps assigned at sources
>  - Out-of-order processing of elements in window operators
>  - API design
>
> Please let me know what you think. Comment in the document or here in the
> mailing list.
>
> I have a PR in the makings that would introduce source timestamps and
> watermarks for keeping track of them. I also hacked a proof-of-concept of a
> windowing system that is able to process out-of-order elements using a
> FlatMap operator. (It uses panes to perform efficient pre-aggregations.)
>
> Cheers,
> Aljoscha
>


Re: Removing reduce/aggregations from non-grouped data streams

2015-06-22 Thread Gyula Fóra
I opened a PR <https://github.com/apache/flink/pull/860> for this.

Stephan Ewen  wrote (on Mon, 22 Jun 2015, 19:25):

> +1 totally agreed
>
> On Mon, Jun 22, 2015 at 5:32 PM, Gyula Fóra  wrote:
>
> > Hey all,
> > Currently we have reduce and aggregation methods for non-grouped
> > DataStreams as well, which will produce local aggregates depending on the
> > parallelism of the operator.
> >
> > This behaviour is neither intuitive nor useful as it only produces
> sensible
> > results if the user specifically sets the parallelism to 1 which should
> not
> > be encouraged.
> >
> > I would like to remove these methods from the DataStream api and only
> keep
> > it for GroupedDataStreams and WindowedDataStream where the aggregation is
> > either executed per-key or per-window.
> >
> > Cheers,
> > Gyula
> >
>


Re: Thoughts About Streaming

2015-06-23 Thread Gyula Fóra
Hey

I think we should not block PRs unnecessarily if your suggested changes
might touch them at some point.

Also, I still think we should not put everything in the DataStream because
it will be a huge mess.

Also, we need to agree on the out-of-order processing, whether we want it
the way you proposed it (which is quite costly). An alternative approach
that fits in the current windowing is to filter out out-of-order events
and apply a special handling operator on them. This would be
fairly lightweight.

My point is that we need to consider some alternative solutions. And we
should not block contributions along the way.

Cheers
Gyula
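
A minimal sketch of the lightweight alternative mentioned above, assuming elements
carry an event timestamp (the names are illustrative, not an actual Flink operator):
elements are split by comparing against the highest timestamp seen so far, and the
late ones are routed to a dedicated handler.

import java.util.ArrayList;
import java.util.List;

public class OutOfOrderSplitterSketch<T> {

    /** A stream element together with the event time it was created at. */
    public static final class Timestamped<T> {
        final long timestamp;
        final T value;

        public Timestamped(long timestamp, T value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    private long maxTimestampSeen = Long.MIN_VALUE;
    private final List<Timestamped<T>> inOrder = new ArrayList<>(); // normal windowing path
    private final List<Timestamped<T>> late = new ArrayList<>();    // special handling operator

    public void accept(Timestamped<T> element) {
        if (element.timestamp >= maxTimestampSeen) {
            maxTimestampSeen = element.timestamp;
            inOrder.add(element);   // arrived in order with respect to what we have seen
        } else {
            late.add(element);      // out of order: route to the dedicated handler
        }
    }
}

This keeps the existing in-order windowing untouched, but shifts the hard question
to what the special handler does with the late elements.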

On Tue, Jun 23, 2015 at 9:55 AM Aljoscha Krettek 
wrote:

> The reason I posted this now is that we need to think about the API and
> windowing before proceeding with the PRs of Gabor (inverse reduce) and
> Gyula (removal of "aggregate" functions on DataStream).
>
> For the windowing, I think that the current model does not work for
> out-of-order processing. Therefore, the whole windowing infrastructure will
> basically have to be redone. Meaning also that any work on the
> pre-aggregators or optimizations that we do now becomes useless.
>
> For the API, I proposed to restructure the interactions between all the
> different *DataStream classes and grouping/windowing. (See API section of
> the doc I posted.)
>
> On Mon, 22 Jun 2015 at 21:56 Gyula Fóra  wrote:
>
> > Hi Aljoscha,
> >
> > Thanks for the nice summary, this is a very good initiative.
> >
> > I added some comments to the respective sections (where I didnt fully
> agree
> > :).).
> > At some point I think it would be good to have a public hangout session
> on
> > this, which could make a more dynamic discussion.
> >
> > Cheers,
> > Gyula
> >
> > Aljoscha Krettek  ezt írta (időpont: 2015. jún.
> 22.,
> > H, 21:34):
> >
> > > Hi,
> > > with people proposing changes to the streaming part I also wanted to
> > throw
> > > my hat into the ring. :D
> > >
> > > During the last few months, while I was getting acquainted with the
> > > streaming system, I wrote down some thoughts I had about how things
> could
> > > be improved. Hopefully, they are in somewhat coherent shape now, so
> > please
> > > have a look if you are interested in this:
> > >
> > >
> >
> https://docs.google.com/document/d/1rSoHyhUhm2IE30o5tkR8GEetjFvMRMNxvsCfoPsW6_4/edit?usp=sharing
> > >
> > > This mostly covers:
> > >  - Timestamps assigned at sources
> > >  - Out-of-order processing of elements in window operators
> > >  - API design
> > >
> > > Please let me know what you think. Comment in the document or here in
> the
> > > mailing list.
> > >
> > > I have a PR in the makings that would introduce source timestamps and
> > > watermarks for keeping track of them. I also hacked a proof-of-concept
> > of a
> > > windowing system that is able to process out-of-order elements using a
> > > FlatMap operator. (It uses panes to perform efficient
> pre-aggregations.)
> > >
> > > Cheers,
> > > Aljoscha
> > >
> >
>


Re: Thoughts About Streaming

2015-06-24 Thread Gyula Fóra
I agree, let's separate these topics from each other so we can get faster
resolution.

There is already a state discussion in the thread we started with Paris.

On Wed, Jun 24, 2015 at 9:24 AM Kostas Tzoumas  wrote:

> I agree with supporting out-of-order out of the box :-), even if this means
> a major refactoring. This is the right time to refactor the streaming API
> before we pull it out of beta. I think that this is more important than new
> features in the streaming API, which can be prioritized once the API is out
> of beta (meaning, that IMO this is the right time to stall PRs until we
> agree on the design).
>
> There are three sections in the document: windowing, state, and API. How
> convoluted are those with each other? Can we separate the discussion or do
> we need to discuss those all together? I think part of the difficulty is
> that we are discussing three design choices at once.
>
> On Tue, Jun 23, 2015 at 7:43 PM, Ted Dunning 
> wrote:
>
> > Out of order is ubiquitous in the real-world.  Typically, what happens is
> > that businesses will declare a maximum allowable delay for delayed
> > transactions and will commit to results when that delay is reached.
> > Transactions that arrive later than this cutoff are collected specially
> as
> > corrections which are reported/used when possible.
> >
> > Clearly, ordering can also be violated during processing, but if the data
> > is originally out of order the situation can't be repaired by any
> protocol
> > fixes that prevent transactions from becoming disordered but has to
> handled
> > at the data level.
> >
> >
> >
> >
> > On Tue, Jun 23, 2015 at 9:29 AM, Aljoscha Krettek 
> > wrote:
> >
> > > I also don't like big changes but sometimes they are necessary. The
> > reason
> > > why I'm so adamant about out-of-order processing is that out-of-order
> > > elements are not some exception that occurs once in a while; they occur
> > > constantly in a distributed system. For example, in this:
> > > https://gist.github.com/aljoscha/a367012646ab98208d27 the resulting
> > > windows
> > > are completely bogus because the current windowing system assumes
> > elements
> > > to globally arrive in order, which is simply not true. (The example
> has a
> > > source that generates increasing integers. Then these pass through a
> map
> > > and are unioned with the original DataStream before a window operator.)
> > > This simulates elements arriving from different operators at a
> windowing
> > > operator. The example is also DOP=1, I imagine this to get worse with
> > > higher DOP.
> > >
> > > What do you mean by costly? As I said, I have a proof-of-concept
> > windowing
> > > operator that can handle out-or-order elements. This is an example
> using
> > > the current Flink API:
> > > https://gist.github.com/aljoscha/f8dce0691732e344bbe8.
> > > (It is an infinite source of tuples and a 5 second window operator that
> > > counts the tuples.) The first problem is that this code deadlocks
> because
> > > of the thread that emits fake elements. If I disable the fake element
> > code
> > > it works, but the throughput using my mockup is 4 times higher . The
> gap
> > > widens dramatically if the window size increases.
> > >
> > > So, it actually increases performance (unless I'm making a mistake in
> my
> > > explorations) and can handle elements that arrive out-of-order (which
> > > happens basically always in a real-world windowing use-cases).
> > >
> > >
> > > On Tue, 23 Jun 2015 at 12:51 Stephan Ewen  wrote:
> > >
> > > > What I like a lot about Aljoscha's proposed design is that we need no
> > > > different code for "system time" vs. "event time". It only differs in
> > > where
> > > > the timestamps are assigned.
> > > >
> > > > The OOP approach also gives you the semantics of total ordering
> without
> > > > imposing merges on the streams.
> > > >
> > > > On Tue, Jun 23, 2015 at 10:43 AM, Matthias J. Sax <
> > > > mj...@informatik.hu-berlin.de> wrote:
> > > >
> > > > > I agree that there should be multiple alternatives the user(!) can
> > > > > choose from. Partial out-of-order processing works for many/most
> > > > > aggregates. However, if you consider Event-Pattern-Matching, global
> > > > > ordering is necessary (even if the performance penalty mi

Stream iteration head as ConnectedDataStream

2015-06-26 Thread Gyula Fóra
Hey!

Now that we are implementing more and more applications for streaming that
use iterations we realized a huge shortcoming of the current iteration api.
Currently it only allows to feedback data of the same type to the iteration
head.

This makes sense because the operators are typed, but it becomes awkward if we
indeed want to feed back a different type (such as syncing a model in
machine learning applications). To do this, developers currently need to use
wrapper types and flags to distinguish the inputs.

I propose to add the possibility to treat the original input of the
iteration head operator and the feedback stream as a ConnectedDataStream, so
we can apply operators such as CoMap, CoFlatMap etc. This helps to
distinguish the inputs and also allows different feedback types to be
used. I believe this change is inevitable if we want to write elegant
applications without unnecessary wrapper types.
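A rough sketch of how this could look (ConnectedIterativeDataStream,
withFeedbackType, applyModel and ModelUpdater are made-up names for
illustration, not the actual API):

DataStream<Data> input = ...;

// the iteration head exposes the original input and the feedback stream
// as a ConnectedDataStream, so the two sides can have different types
ConnectedIterativeDataStream<Data, Model> it =
    input.iterate().withFeedbackType(Model.class);

DataStream<Data> step = it.flatMap(new CoFlatMapFunction<Data, Model, Data>() {
    private Model model;

    public void flatMap1(Data value, Collector<Data> out) {
        out.collect(applyModel(model, value));   // regular input
    }

    public void flatMap2(Model feedback, Collector<Data> out) {
        model = feedback;                        // feedback input of a different type
    }
});

it.closeWith(step.map(new ModelUpdater()));      // DataStream<Model> fed back to the head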

I made a PR already that introduces this functionality (it is a very small
change in fact).

Cheers,
Gyula


Re: Off-by-one issues in the windowing code

2015-06-29 Thread Gyula Fóra
The second issue is related to parallel time-based aggregations. I think we
should fix this for 0.9.1.

Also, since the fix, as you said, is rather straightforward, there is no harm
in doing it. As I understand it, if we keep the functionality of having
time-based global windows, the implementation for merging the partial
aggregates will be conceptually similar after Aljoscha's update as well.

Márton Balassi  ezt írta (időpont: 2015. jún.
29., H, 11:02):

> I have found two off-by-one issues in the windowing code.
>
> The first may result in duplicate data in the last window and is easy to
> fix. [1]
>
> The second may result in data being swallowed in the last window, and is also
> not difficult to fix. [2]
>
> I've talked to Aljoscha about fixing the second one, and he suggested not
> to fix it right away as that part should be rewritten soon anyways, maybe
> we remove that functionality as a whole. As this is also in the 0.9.0
> release I would still opt for having it, at least for the sake of 0.9.1.
>
> What do you think?
>
> [1] https://issues.apache.org/jira/browse/FLINK-2285
> [2] https://issues.apache.org/jira/browse/FLINK-2286
>
> Best,
>
> Marton
>


Re: Is there Any api that let DataStream join DataSet ?

2015-06-29 Thread Gyula Fóra
You are right, one cannot use the current window-join implementation for
this.

A workaround is to implement a custom binary stream operator that waits
until it has received the whole file and then starts joining.
For instance: filestream.connect(streamToJoinWith).flatMap(a custom
CoFlatMapFunction that does the join). A rough sketch follows below.
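Something along these lines (FileRecord, StreamRecord, JoinedResult, getKey()
and join() are made-up names; this assumes the file side fits in memory):

filestream.connect(streamToJoinWith)
    .flatMap(new CoFlatMapFunction<FileRecord, StreamRecord, JoinedResult>() {

        private final Map<String, FileRecord> fileByKey = new HashMap<>();

        public void flatMap1(FileRecord r, Collector<JoinedResult> out) {
            fileByKey.put(r.getKey(), r);        // collect the finite file side
        }

        public void flatMap2(StreamRecord s, Collector<JoinedResult> out) {
            FileRecord match = fileByKey.get(s.getKey());
            if (match != null) {
                out.collect(join(match, s));     // join each stream record against the file
            }
            // in practice, stream records arriving before the file has been fully
            // read would have to be buffered, and an end-of-file signal is needed
        }
    });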

Matthias J. Sax  ezt írta (időpont: 2015.
jún. 29., H, 11:40):

> I am wondering what the semantics of a DataStream created from a file
> is. It should be a regular (but finite) stream. From my understanding, a
> Window-Join is defined with some ts-constraint. So the static file part
> will also have this restriction in the join, right? However, a
> file-stream-join should join *all* data from the file with each element
> in the stream... It seems to me, that a file-DataStream would not yield
> this result. Am I wrong?
>
>
> On 06/29/2015 11:00 AM, Stephan Ewen wrote:
> > If you only want to "join" a finite data set (like a file) to a stream,
> you
> > can do that. you can create a DataStream from a (distributed) file.
> >
> > If you want specific batch-api operations, this is still on the roadmap,
> > not in yet, as Marton said.
> >
> > On Sun, Jun 28, 2015 at 10:45 AM, Márton Balassi <
> balassi.mar...@gmail.com>
> > wrote:
> >
> >> Hi,
> >>
> >> Flink currently does not have explicit Api support for that, but is
> >> definitely possible to do. In fact Gyula (cc-d) mocked up a prototype
> for a
> >> similar problem some time ago.
> >>
> >> The idea needs some refinement to properly support all the viable use
> cases
> >> though and the streaming Api currently has some more pressing challenges
> >> than this integration. :)
> >>
> >> It's on our roadmap, but is not an immediate task. Could you tell us
> more
> >> about your use case?
> >>
> >> Best,
> >> Marton
> >> On Jun 28, 2015 8:29 AM, "马国维"  wrote:
> >>
> >>> Hi,everyone:
> >>> Is there Any api that let the DataStream join a DataSet ? I have read
> all
> >>> the document But I can't find .
> >>> If Flink now does not have the api, will Flink support it in the
> future ?
> >>>  thanks a lot!
> >>>
> >>
> >
>
>


Replacing Checkpointed interface with field annotations

2015-06-29 Thread Gyula Fóra
Hey all!

Just to add something new to the end of the discussion list. After some
discussion with Seif, and Paris, I have added a commit that replaces the
use of the Checkpointed interface with field annotations.

This is probably the most lightweight state declaration so far and it will
probably work very well to replace the Checkpointed interface:

public class StatefulMapper implements MapFunction<Integer, Integer> {

    @State
    int counter;              // checkpointed automatically

    @State
    Serializable state;       // any serializable field can be annotated

    Object notState;          // not annotated, so not part of the snapshot

    public Integer map(Integer input) {
        counter++;
        // update other state
        // ...
        return input;
    }
}

What do you think?
You can check it out here
.

Cheers,
Gyula


Re: Storm Compatibility Improvement

2015-06-29 Thread Gyula Fóra
Hey,
I didn't look through the whole code, so I am probably missing something, but
why don't you just do what Storm does? Keep a map from the field names to
indexes somewhere (and make it accessible from the tuple); then you can
just use a simple Flink tuple (rough sketch below).

I think this is what happens in Storm: they get the index from the
context, which knows the declared output fields.
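Roughly (field names and values here are only illustrative):

// Mapping from declared output field names to positions, e.g. taken from the
// declared output fields of the producing bolt/spout and made accessible to
// the operator (similar to what Storm's TopologyContext provides).
Map<String, Integer> fieldIndexes = new HashMap<>();
fieldIndexes.put("word", 0);
fieldIndexes.put("count", 1);

// With that, a plain Flink tuple can be accessed by name:
Tuple2<String, Integer> tuple = ...;
Integer count = tuple.getField(fieldIndexes.get("count"));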

Gyula

Matthias J. Sax  ezt írta (időpont: 2015.
jún. 29., H, 18:08):

> Hi,
>
> I started to work on a missing feature for the Storm compatibility
> layer: named attribute access
>
> In Storm, each attribute of an input tuple can be accessed via index or
> by name. Currently, only index access is supported. In order to support
> this feature in Flink (embedded Bolt in Flink program), I see two
> (independent and complementary) ways to support this feature:
>
>  1) the input type is a POJO
>  2) Flink's Tuple type is extended to support named attributes
>
> Right now I started a prototype for POJOs. I would like to extend Tuple
> type with named attributes. However, I am not sure how the community
> likes this idea.
>
> I would like to get some feedback for the POJO prototype, too. I use
> reflections and I am not sure if my code is elegant enough. You can find
> it here: https://github.com/mjsax/flink/tree/flink-storm-compatibility
>
>
> -Matthias
>
>
>
>
>


Re: Storm Compatibility Improvement

2015-06-29 Thread Gyula Fóra
Ah ok, now I get what I didn't get before :)

So you want to take some input stream and execute a bolt implementation
on it. And the question is what input type to assume when the user wants to
use field-name-based access.

Can't we force the user to declare the names of the inputs/outputs even in
this case? Otherwise there is not much we can do. Maybe stick to either
public fields or getters/setters.


Matthias J. Sax  ezt írta (időpont: 2015.
jún. 29., H, 23:51):

> Well. If a whole Storm topology is executed, this is of course the way
> to go. However, I want to have named-attribute access in the case of an
> embedded bolt (as a single operator) in a Flink program. And in this
> case, fields are not declared and do not have a name (e.g., if the bolt's
> consumer emits a stream of type Tuple3)
>
> -Matthias
>
>
> On 06/29/2015 11:42 PM, Gyula Fóra wrote:
> > Hey,
> > I didn't look through the whole code so I probably don't get something
> but
> > why don't you just do what storm does? Keep a map from the field names to
> > indexes somewhere (make this accessible from the tuple) and then you can
> > just use a simple Flink tuple.
> >
> > I think this is what's happening in storm, they get the index from the
> > context, which knows the declared output fields.
> >
> > Gyula
> >
> > Matthias J. Sax  ezt írta (időpont: 2015.
> > jún. 29., H, 18:08):
> >
> >> Hi,
> >>
> >> I started to work on a missing feature for the Storm compatibility
> >> layer: named attribute access
> >>
> >> In Storm, each attribute of an input tuple can be accessed via index or
> >> by name. Currently, only index access is supported. In order to support
> >> this feature in Flink (embedded Bolt in Flink program), I see two
> >> (independent and complementary) ways to support this feature:
> >>
> >>  1) the input type is a POJO
> >>  2) Flink's Tuple type is extended to support named attributes
> >>
> >> Right now I started a prototype for POJOs. I would like to extend Tuple
> >> type with named attributes. However, I am not sure how the community
> >> likes this idea.
> >>
> >> I would like to get some feedback for the POJO prototype, too. I use
> >> reflections and I am not sure if my code is elegant enough. You can find
> >> it here: https://github.com/mjsax/flink/tree/flink-storm-compatibility
> >>
> >>
> >> -Matthias
> >>
> >>
> >>
> >>
> >>
> >
>
>


Re: Storm Compatibility Improvement

2015-06-29 Thread Gyula Fóra
By "declare" I mean we assume a Flink Tuple datatype and the user declares
the name mapping (sorry, it's getting late).

Gyula Fóra  ezt írta (időpont: 2015. jún. 29., H,
23:57):

> Ah ok, now I get what I didn't get before :)
>
> So you want to take some input stream , and execute a bolt implementation
> on it. And the question is what input type to assume when the user wants to
> use field name based access.
>
> Can't we force the user to declare the names of the inputs/outputs even in
> this case? Otherwise there is not much we can do. Maybe stick to either
> public fields or getter setters.
>
>
> Matthias J. Sax  ezt írta (időpont: 2015.
> jún. 29., H, 23:51):
>
>> Well. If a whole Storm topology is executed, this is of course the way
>> to got. However, I want to have named-attribute access in the case of an
>> embedded bolt (as a single operator) in a Flink program. And is this
>> case, fields are not declared and do not have a name (eg, if the bolt's
>> consumers emits a stream of type Tuple3)
>>
>> -Matthias
>>
>>
>> On 06/29/2015 11:42 PM, Gyula Fóra wrote:
>> > Hey,
>> > I didn't look through the whole code so I probably don't get something
>> but
>> > why don't you just do what storm does? Keep a map from the field names
>> to
>> > indexes somewhere (make this accessible from the tuple) and then you can
>> > just use a simple Flink tuple.
>> >
>> > I think this is what's happening in storm, they get the index from the
>> > context, which knows the declared output fields.
>> >
>> > Gyula
>> >
>> > Matthias J. Sax  ezt írta (időpont:
>> 2015.
>> > jún. 29., H, 18:08):
>> >
>> >> Hi,
>> >>
>> >> I started to work on a missing feature for the Storm compatibility
>> >> layer: named attribute access
>> >>
>> >> In Storm, each attribute of an input tuple can be accessed via index or
>> >> by name. Currently, only index access is supported. In order to support
>> >> this feature in Flink (embedded Bolt in Flink program), I see two
>> >> (independent and complementary) ways to support this feature:
>> >>
>> >>  1) the input type is a POJO
>> >>  2) Flink's Tuple type is extended to support named attributes
>> >>
>> >> Right now I started a prototype for POJOs. I would like to extend Tuple
>> >> type with named attributes. However, I am not sure how the community
>> >> likes this idea.
>> >>
>> >> I would like to get some feedback for the POJO prototype, too. I use
>> >> reflections and I am not sure if my code is elegant enough. You can
>> find
>> >> it here: https://github.com/mjsax/flink/tree/flink-storm-compatibility
>> >>
>> >>
>> >> -Matthias
>> >>
>> >>
>> >>
>> >>
>> >>
>> >
>>
>>


Re: Replacing Checkpointed interface with field annotations

2015-07-01 Thread Gyula Fóra
Hey,

Thanks for the feedback guys:

@Max: You are right, this is not a top-priority change; I was just
mocking up some alternatives to make state usage even simpler, so that
users can keep their current implementations and just add 1-2
annotations.

@Stephan, Robert: You are right that the checkpointed interface has some
advantages from that point of view. Maybe a way to go would be to separate
this signaling functionality (when the checkpoint is taken and maybe also
the commits) from the snapshotting itself. One advantage I see there is
that we would not need to have 3 different interfaces doing pretty much the
same thing (OperatorState - needed for partitioned state and different
backends/out-of-core, Checkpointed - needed for special actions after
checkpoints, Annotations - checkpointing simple fields natively).

What we could do is modify the Checkpointed interface so that it does not
actually take the snapshot itself, but performs actions after the snapshot is
taken, and also on restore (and maybe includes the commit functionality as
well):

public interface Checkpointed {
    void onSnapshot(long checkpointId, long timestamp);
    void onRestore(long checkpointId, long timestamp);
    // void commit(long checkpointId, long timestamp);
}

Since all these special actions are best effort anyways I think this could
work. And now it is clear that OperatorStates and annotated fields would be
checkpointed, but we can still have actions on the checkpoints.

Would something like this make sense? I think it would be very good to
clarify and define the minimal set of necessary interfaces that don't
replicate functionality.
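To make it concrete, a user function could then look roughly like this (purely
illustrative, none of this exists in the code yet):

public class StatefulCounter implements MapFunction<Integer, Integer>, Checkpointed {

    @State
    private int counter;          // snapshotted automatically via the annotation

    public Integer map(Integer input) {
        counter++;
        return counter;
    }

    // only hooks; the actual snapshotting is done through the annotated fields
    public void onSnapshot(long checkpointId, long timestamp) {
        // e.g. flush something to an external system
    }

    public void onRestore(long checkpointId, long timestamp) {
        // e.g. re-open connections after a failure
    }
}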

Cheers,
Gyula

Stephan Ewen  ezt írta (időpont: 2015. júl. 1., Sze,
10:22):

> +1 for adding the annotation, but not removing the interface
>
> Robert is right, the nice thing about the current interface is that you can
> use it to commit the state yourself to a database and simply return a key
> to where the state is stored. That is quite nice.
>
> On Wed, Jul 1, 2015 at 10:14 AM, Robert Metzger 
> wrote:
>
> > I would certainly not replace the current Checkpointed interface by this
> > implementation.
> > The interface allows you to perform custom actions when creating a
> snapshot
> > or restoring state.
> >
> > We could add the annotation variant for simple cases like in your
> example.
> >
> >
> > On Wed, Jul 1, 2015 at 10:10 AM, Maximilian Michels 
> > wrote:
> >
> > > Hi Gyula,
> > >
> > > Looks like a neat feature you thought of; I like it. One problem I see
> is
> > > that with all the big changes and discussions in streaming, I wonder if
> > we
> > > should get other things right first. Another problem could be the lack
> of
> > > clarity of this implementation. State annotations can now be very
> easily
> > > placed anywhere in a class with the @Annotation. The Checkpointed
> > interface
> > > we have now is more explicit. From an end user perspective it might
> make
> > > sense to use annotations, from a developer perspective I'm not sure.
> > >
> > > Cheers,
> > > Max
> > >
> > >
> > >
> > > On Tue, Jun 30, 2015 at 1:44 PM, Hermann Gábor 
> > > wrote:
> > >
> > > > Wow, this looks pretty concise. I really like it!
> > > >
> > > > On Mon, Jun 29, 2015 at 3:27 PM Gyula Fóra 
> wrote:
> > > >
> > > > > Hey all!
> > > > >
> > > > > Just to add something new to the end of the discussion list. After
> > some
> > > > > discussion with Seif, and Paris, I have added a commit that
> replaces
> > > the
> > > > > use of the Checkpointed interface with field annotations.
> > > > >
> > > > > This is probably the most lightweight state declaration so far and
> it
> > > > will
> > > > > probably work very well to replace the Checkpointed interface:
> > > > >
> > > > > public class StatefulMapper implements
> MapFunction {
> > > > >
> > > > > @State
> > > > > int counter;
> > > > > @State
> > > > > Serializable state;
> > > > >
> > > > > Object notState
> > > > >
> > > > > public Integer map(Integer input)[
> > > > > counter++;
> > > > > //update other state
> > > > > /...
> > > > > }
> > > > > }
> > > > >
> > > > > What do you think?
> > > > > You can check it out here
> > > > > <https://github.com/gyfora/flink/commits/annotated_state>.
> > > > >
> > > > > Cheers,
> > > > > Gyula
> > > > >
> > > >
> > >
> >
>


Re: Replacing Checkpointed interface with field annotations

2015-07-01 Thread Gyula Fóra
I understand your concerns, Robert, but I don't fully agree.

The Checkpointed interface does work, but there are many use cases it is not
suitable for in the long run, and the whole interface is slightly awkward,
in my opinion, when it comes to returning simple fields that are already
serializable.

This motivated the introduction of the OperatorState interface, which you
could call the first rework of the Checkpointed interface, but I see it as
the first version that is actually capable of handling many of the issues
that were obvious with the Checkpointed interface.

This is actually not only a rework of the interface but a rework of the
state concept and its runtime handling. This needs to be clean if we are
moving streaming out of beta, and it should provide the needed functionality.
I think we can afford to experiment a little with these interfaces and see
the implications for the applications we can develop with them, since we
consider statefulness a major advantage of Flink streaming.

So I actually think this is the only time when we can rework these
interfaces without big costs, to make them work for the future.



Robert Metzger  ezt írta (időpont: 2015. júl. 1., Sze,
11:25):

> Whats causing me the biggest headache here is that I don't see an end on
> all these "state interface" reworks.
> I think this is now the third big change to the interface.
> It is a horrible user experience to rework your old code with each new
> Flink release.
>
> I understand that there are always ways to improve interfaces, and I'm sure
> Flink has many that we can improve.
> But there are (in my opinion) more important things than reworking the
> interfaces every second week ... for example that the functionality they
> are providing is actually working and well tested.
>
>
>
> On Wed, Jul 1, 2015 at 11:15 AM, Ufuk Celebi  wrote:
>
> >
> > On 01 Jul 2015, at 10:57, Gyula Fóra  wrote:
> >
> > > Hey,
> > >
> > > Thanks for the feedback guys:
> > >
> > > @Max: You are right, this is not top priority to changes, I was just
> > > mocking up some alternatives to try to make the state usage even
> simpler
> > so
> > > that the user can keep his current implementations and just add 1-2
> > > annotations.
> >
> > I agree. It's good to cover the "basic" case with a simple solution. :-)
> >
> > > @Stephan, Robert: You are right that the checkpointed interface has
> some
> > > advantages from that point of view. Maybe a way to go would be to
> > separate
> > > this signaling functionality (when the checkpoint is taken and maybe
> also
> > > the commits) from the snapshotting itself. One advantage I see there is
> > > that we would not need to have 3 different interfaces doing pretty much
> > the
> > > same thing (OperatorState - needed for partitioned state and different
> > > backends/out-of-core, Checkpointed - needed for special actions after
> > > checkpoints, Annotations - checkpointing simple fields natively).
> >
> > I also agree with Stephan and Robert that there are other use cases,
> which
> > require the interfaces. I cannot judge your proposal at this point
> though.
> > I'm eager to hear what the others say who worked on this.
> >
> > – Ufuk
>


Re: Redesigned "Features" page

2015-07-07 Thread Gyula Fóra
I think the content is pretty good, much better than before. But the page
structure could be better (and this is very important in my opinion).
Right now it just looks like a long list of features without any way to
navigate between them. We should probably have something at the top that
summarizes the page.

And maybe at specific points we should add links to other pages (for
instance, the internals pages).

Stephan Ewen  ezt írta (időpont: 2015. júl. 7., K, 13:22):

> I actually put quite some thought into the structure of the points. They
> reflect pretty much what I observed (meetups and talks) where people get
> excited and what they are missing.
>
> The structure follows the line of thought of "stream processor that also
> does batch very well". And then separate the features as they contribute to
> that.
>
> Intermixing features across their relation to streaming and batch has not
> worked out for my presentations at all. It came across that Flink is a
> system with a multitude of aspects, but no one in the audience could really
> say what Flink is good at in the end, where it stands out.
>
> The proposed structure makes this very clear:
>
>   - Flink is a kick-ass stream processor (Strongest differentiation
> from all other technology)
>   - Flink can also do batch very well  (It is broadly applicable
> and strong)
>   - It surfaces nice APIs and rich libraries  (Low entry hurdle)
>   - It plays well with existing technology
>
>
> Stephan
>
>
> On Tue, Jul 7, 2015 at 12:22 PM, Fabian Hueske  wrote:
>
> > +1 for the clear and brief feature descriptions!
> >
> > I am not so sure about the structure of the points, especially separating
> > "Streaming" and "Batch and Streaming in One System" does not support the
> > message of a unified system, IMO.
> >
> > How about categorizing the points into three sections (Internals, API,
> > Integration) and structuring the points for example as follows:
> >
> > -- High Performance, Robust Execution, and Fault-tolerance
> > High performance
> > Continuous Streaming Model with Flow Control
> > Fault-tolerance via Lightweight Distributed Snapshots
> > Memory Management
> > Iterations and Delta Iterations
> >
> > -- Ease of use, APIs, and Clear Semantics
> > One runtime for Streaming and Batch Processing
> > Expressive APIs (Batch + Streaming)
> > Exactly-once Semantics for Stateful Computations
> > Program Optimizer
> > Library Ecosystem
> >
> > -- Integration
> > Broad Integration
> >
> > Cheers, Fabian
> >
> > 2015-07-07 10:39 GMT+02:00 Till Rohrmann :
> >
> > > I also like the new feature page. I better conveys the strong points of
> > > Flink, since it's more to the point.
> > >
> > > On Mon, Jul 6, 2015 at 6:09 PM, Stephan Ewen  wrote:
> > >
> > > > Thanks Max!
> > > >
> > > > Did not even know we had a github mirror of the flink-web repo...
> > > >
> > > > On Mon, Jul 6, 2015 at 6:05 PM, Maximilian Michels 
> > > wrote:
> > > >
> > > > > Hi Stephan,
> > > > >
> > > > > Thanks for the feature page update. I think it is much more
> > informative
> > > > and
> > > > > better structured now.
> > > > >
> > > > > By the way, you could also open a pull request for your changes on
> > > > > https://github.com/apache/flink-web/pulls
> > > > >
> > > > > Cheers,
> > > > > Max
> > > > >
> > > > >
> > > > > On Mon, Jul 6, 2015 at 3:28 PM, Fabian Hueske 
> > > wrote:
> > > > >
> > > > > > I'll be happy to help, eh draw ;-)
> > > > > >
> > > > > > 2015-07-06 15:22 GMT+02:00 Stephan Ewen :
> > > > > >
> > > > > > > Hi all!
> > > > > > >
> > > > > > > I think that the "Features" page of the website is a bit out of
> > > date.
> > > > > > >
> > > > > > > I made an effort to stub a new one. It is committed under "
> > > > > > features_new.md
> > > > > > > "
> > > > > > > and not yet built as an HTML page.
> > > > > > >
> > > > > > > If you want to take a look and help building this, pull the
> > > flink-web
> > > > > git
> > > > > > > repository and build the website locally. You can then look at
> it
> > > > > under "
> > > > > > > http://localhost:4000/index_new.html";
> > > > > > >
> > > > > > >
> > > > > > > So far, I am happy with the new contents. Three things remain
> to
> > be
> > > > > done:
> > > > > > >
> > > > > > > 1) The page misses a figure on Streaming throughput and
> latency.
> > > > > > Currently,
> > > > > > > I am using a figure from some preliminary measurements made by
> > > > Marton.
> > > > > > > IIRC, Robert is running some performance tests right now and we
> > can
> > > > use
> > > > > > his
> > > > > > > results to create a new figure.
> > > > > > >
> > > > > > > 2) The figures could use some love. The concept is good, but
> the
> > > > style
> > > > > > is a
> > > > > > > bit sterile. I was hoping that Fabian would help out with his
> > > amazing
> > > > > > style
> > > > > > > of drawing figures ;-)
> > > > > > >
> > > > > > > 3) More people should have a look and say if we should replace
> > the
> > > > > > current
> > > > > > > "Features" 

Re: Design documents for consolidated DataStream API

2015-07-07 Thread Gyula Fóra
You are right, that's an important issue.

And I think we should also do some renaming around the "iterations", because
they are not really iterations like in the batch case and that might confuse
some users.
Maybe we can call them loops or cycles and rename the API calls to make it
more intuitive what actually happens. It is really just a cyclic dataflow.

Aljoscha Krettek  ezt írta (időpont: 2015. júl. 7., K,
15:35):

> Hi,
> I just noticed that we don't have anything about how iterations and
> timestamps/watermarks should interact.
>
> Cheers,
> Aljoscha
>
> On Mon, 6 Jul 2015 at 23:56 Stephan Ewen  wrote:
>
> > Hi all!
> >
> > As many of you know, there are a ongoing efforts to consolidate the
> > streaming API for the next release, and then graduate it (from beta
> > status).
> >
> > In the process of this consolidation, we want to achieve the following
> > goals.
> >
> >  - Make the code more robust and simplify it in parts
> >
> >  - Clearly define the semantics of the constructs.
> >
> >  - Prepare it for support of more advanced concepts, like partitionable
> > state, and event time.
> >
> >  - Cut support for certain corner cases that were prototyped, but turned
> > out to be not efficiently doable
> >
> >
> > Based on prior discussions on the mailing list, Aljoscha and me drafted
> the
> > design documents below, which outline how the consolidated API would
> like.
> > We focused in constructs, time, and window semantics.
> >
> >
> > Design document on how to restructure the Streaming API:
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/Streams+and+Operations+on+Streams
> >
> > Design document on definitions of time, order, and the resulting
> semantics:
> >
> https://cwiki.apache.org/confluence/display/FLINK/Time+and+Order+in+Streams
> >
> >
> >
> > Note: The design of the interfaces and concepts for advanced state in
> > functions is not in here. That is part of a separate design discussion
> and
> > orthogonal to the designs drafted here.
> >
> >
> > Please have a look and voice questions and concerns. Since we should not
> > break the streaming API more than once, we should make sure this
> > consolidation brings it into the shape we want it to be in.
> >
> >
> > Greetings,
> > Stephan
> >
>


Rework of streaming iteration API

2015-07-07 Thread Gyula Fóra
Hey,

Along with the suggested changes to the streaming API structure, I think we
should also rework the "iteration" API. Currently the iteration API tries
to mimic the syntax of the batch API, while the runtime behaviour is quite
different.

What we create instead of iterations is really just cyclic streams (loops
in the streaming job), so the API should somehow be intuitive about this
behaviour.

I suggest removing the explicit iterate() call and instead adding a method to
the stream operators that allows connecting feedback inputs (creating loops).
It would look like this:

A mapper that does nothing but iterates over some filtered input:

*Current API :*
DataStream source = ..
IterativeDataStream it = source.iterate()
DataStream mapper = it.map(noOpMapper)
DataStream feedback = mapper.filter(...)
it.closeWith(feedback)

*Suggested API :*
DataStream source = ..
DataStream mapper = source.map(noOpMapper)
DataStream feedback = mapper.filter(...)
mapper.addInput(feedback)

The suggested approach would let us define inputs to operators after they
are created and implicitly union them with the normal input. This is I
think a much clearer approach than what we have now.

What do you think?

Gyula


Re: Rework of streaming iteration API

2015-07-07 Thread Gyula Fóra
Sorry Stephan, I meant it slightly differently; I see your point:

DataStream source = ...
SingleInputOperator mapper = source.map(...)
mapper.addInput()

So addInput() would be a method of the operator, not the stream.

Aljoscha Krettek  ezt írta (időpont: 2015. júl. 7., K,
16:12):

> I think this would be good yes. I was just about to open an Issue for
> changing the Streaming Iteration API. :D
>
> Then we should also make the implementation very straightforward and
> simple, right now, the implementation of the iterations is all over the
> place.
>
> On Tue, 7 Jul 2015 at 15:57 Gyula Fóra  wrote:
>
> > Hey,
> >
> > Along with the suggested changes to the streaming API structure I think
> we
> > should also rework the "iteration" api. Currently the iteration api tries
> > to mimic the syntax of the batch API while the runtime behaviour is quite
> > different.
> >
> > What we create instead of iterations is really just cyclic streams (loops
> > in the streaming job), so the API should somehow be intuitive about this
> > behaviour.
> >
> > I suggest to remove the explicit iterate call and instead add a method to
> > the StreamOperators that allows to connect feedback inputs (create
> loops).
> > It would look like this:
> >
> > A mapper that does nothing but iterates over some filtered input:
> >
> > *Current API :*
> > DataStream source = ..
> > IterativeDataStream it = source.iterate()
> > DataStream mapper = it.map(noOpMapper)
> > DataStream feedback = mapper.filter(...)
> > it.closeWith(feedback)
> >
> > *Suggested API :*
> > DataStream source = ..
> > DataStream mapper = source.map(noOpMapper)
> > DataStream feedback = mapper.filter(...)
> > mapper.addInput(feedback)
> >
> > The suggested approach would let us define inputs to operators after they
> > are created and implicitly union them with the normal input. This is I
> > think a much clearer approach than what we have now.
> >
> > What do you think?
> >
> > Gyula
> >
>


Re: Rework of streaming iteration API

2015-07-07 Thread Gyula Fóra
@Kostas:
This new API is, I believe, equivalent in expressiveness to the current one.
We can define nested loops now as well.
And I also don't see nested loops as much worse, in general, than simple loops.

Gyula Fóra  ezt írta (időpont: 2015. júl. 7., K,
16:14):

> Sorry Stephan I meant it slightly differently, I see your point:
>
> DataStream source = ...
> SingleInputOperator mapper = source.map(...)
> mapper.addInput()
>
> So the add input would be a method of the operator not the stream.
>
> Aljoscha Krettek  ezt írta (időpont: 2015. júl. 7.,
> K, 16:12):
>
>> I think this would be good yes. I was just about to open an Issue for
>> changing the Streaming Iteration API. :D
>>
>> Then we should also make the implementation very straightforward and
>> simple, right now, the implementation of the iterations is all over the
>> place.
>>
>> On Tue, 7 Jul 2015 at 15:57 Gyula Fóra  wrote:
>>
>> > Hey,
>> >
>> > Along with the suggested changes to the streaming API structure I think
>> we
>> > should also rework the "iteration" api. Currently the iteration api
>> tries
>> > to mimic the syntax of the batch API while the runtime behaviour is
>> quite
>> > different.
>> >
>> > What we create instead of iterations is really just cyclic streams
>> (loops
>> > in the streaming job), so the API should somehow be intuitive about this
>> > behaviour.
>> >
>> > I suggest to remove the explicit iterate call and instead add a method
>> to
>> > the StreamOperators that allows to connect feedback inputs (create
>> loops).
>> > It would look like this:
>> >
>> > A mapper that does nothing but iterates over some filtered input:
>> >
>> > *Current API :*
>> > DataStream source = ..
>> > IterativeDataStream it = source.iterate()
>> > DataStream mapper = it.map(noOpMapper)
>> > DataStream feedback = mapper.filter(...)
>> > it.closeWith(feedback)
>> >
>> > *Suggested API :*
>> > DataStream source = ..
>> > DataStream mapper = source.map(noOpMapper)
>> > DataStream feedback = mapper.filter(...)
>> > mapper.addInput(feedback)
>> >
>> > The suggested approach would let us define inputs to operators after
>> they
>> > are created and implicitly union them with the normal input. This is I
>> > think a much clearer approach than what we have now.
>> >
>> > What do you think?
>> >
>> > Gyula
>> >
>>
>


Re: Rework of streaming iteration API

2015-07-07 Thread Gyula Fóra
@Aljoscha:
Yes, that's basically my point as well. This is what happens now too, but we
give this mutable DataStream a special name: IterativeDataStream.

This can be handled in very different ways through the API; the goal would
be to make something easy to use. I am fine with what we have now because I
know how it works, but calling it iterate might confuse people.

Aljoscha Krettek  ezt írta (időpont: 2015. júl. 7., K,
16:18):

> I think it could work if we allowed a DataStream to be unioned after
> creation. For example:
>
> DataStream source = ..
> DataStream mapper = source.map(noOpMapper)
> DataStream feedback = mapper.filter(...)
> source.union(feedback)
>
> This would basically mean that a DataStream is mutable and can be extended
> after creation with more streams.
>
> On Tue, 7 Jul 2015 at 16:12 Aljoscha Krettek  wrote:
>
> > I think this would be good yes. I was just about to open an Issue for
> > changing the Streaming Iteration API. :D
> >
> > Then we should also make the implementation very straightforward and
> > simple, right now, the implementation of the iterations is all over the
> > place.
> >
> > On Tue, 7 Jul 2015 at 15:57 Gyula Fóra  wrote:
> >
> >> Hey,
> >>
> >> Along with the suggested changes to the streaming API structure I think
> we
> >> should also rework the "iteration" api. Currently the iteration api
> tries
> >> to mimic the syntax of the batch API while the runtime behaviour is
> quite
> >> different.
> >>
> >> What we create instead of iterations is really just cyclic streams
> (loops
> >> in the streaming job), so the API should somehow be intuitive about this
> >> behaviour.
> >>
> >> I suggest to remove the explicit iterate call and instead add a method
> to
> >> the StreamOperators that allows to connect feedback inputs (create
> loops).
> >> It would look like this:
> >>
> >> A mapper that does nothing but iterates over some filtered input:
> >>
> >> *Current API :*
> >> DataStream source = ..
> >> IterativeDataStream it = source.iterate()
> >> DataStream mapper = it.map(noOpMapper)
> >> DataStream feedback = mapper.filter(...)
> >> it.closeWith(feedback)
> >>
> >> *Suggested API :*
> >> DataStream source = ..
> >> DataStream mapper = source.map(noOpMapper)
> >> DataStream feedback = mapper.filter(...)
> >> mapper.addInput(feedback)
> >>
> >> The suggested approach would let us define inputs to operators after
> they
> >> are created and implicitly union them with the normal input. This is I
> >> think a much clearer approach than what we have now.
> >>
> >> What do you think?
> >>
> >> Gyula
> >>
> >
>


Re: Rework of streaming iteration API

2015-07-07 Thread Gyula Fóra
Okay, I am fine with this approach as well; I see the advantages. Then we
just need to find a suitable name for marking a "FeedbackPoint" :)

Stephan Ewen  ezt írta (időpont: 2015. júl. 7., K, 16:28):

> In Aljoscha's approach, we would need a special mutable stream. We could do
> it like this:
>
> DataStream source = ...
>
> FeedbackPoint pt = source.createFeedbackPoint();
>
> DataStream mapper = pt.map(noOpMapper)
> DataStream feedback = mapper.filter(...)
> pt.addFeedback(feedback)
>
>
> It is basically like the current approach, with different names.
>
> I actually like the current approach, because it is explicit where streams
> could be altered in hind-sight (after their definition).
>
>
> On Tue, Jul 7, 2015 at 4:20 PM, Gyula Fóra  wrote:
>
> > @Aljoscha:
> > Yes, thats basically my point as well. This is what happens now too but
> we
> > give this mutable datastream a special name : IterativeDataStream
> >
> > This can be handled in very different ways through the api, the goal
> would
> > be to make something easy to use. I am fine with what we have now
> because I
> > know how it works but it might confuse people to call it iterate.
> >
> > Aljoscha Krettek  ezt írta (időpont: 2015. júl. 7.,
> > K,
> > 16:18):
> >
> > > I think it could work if we allowed a DataStream to be unioned after
> > > creation. For example:
> > >
> > > DataStream source = ..
> > > DataStream mapper = source.map(noOpMapper)
> > > DataStream feedback = mapper.filter(...)
> > > source.union(feedback)
> > >
> > > This would basically mean that a DataStream is mutable and can be
> > extended
> > > after creation with more streams.
> > >
> > > On Tue, 7 Jul 2015 at 16:12 Aljoscha Krettek 
> > wrote:
> > >
> > > > I think this would be good yes. I was just about to open an Issue for
> > > > changing the Streaming Iteration API. :D
> > > >
> > > > Then we should also make the implementation very straightforward and
> > > > simple, right now, the implementation of the iterations is all over
> the
> > > > place.
> > > >
> > > > On Tue, 7 Jul 2015 at 15:57 Gyula Fóra  wrote:
> > > >
> > > >> Hey,
> > > >>
> > > >> Along with the suggested changes to the streaming API structure I
> > think
> > > we
> > > >> should also rework the "iteration" api. Currently the iteration api
> > > tries
> > > >> to mimic the syntax of the batch API while the runtime behaviour is
> > > quite
> > > >> different.
> > > >>
> > > >> What we create instead of iterations is really just cyclic streams
> > > (loops
> > > >> in the streaming job), so the API should somehow be intuitive about
> > this
> > > >> behaviour.
> > > >>
> > > >> I suggest to remove the explicit iterate call and instead add a
> method
> > > to
> > > >> the StreamOperators that allows to connect feedback inputs (create
> > > loops).
> > > >> It would look like this:
> > > >>
> > > >> A mapper that does nothing but iterates over some filtered input:
> > > >>
> > > >> *Current API :*
> > > >> DataStream source = ..
> > > >> IterativeDataStream it = source.iterate()
> > > >> DataStream mapper = it.map(noOpMapper)
> > > >> DataStream feedback = mapper.filter(...)
> > > >> it.closeWith(feedback)
> > > >>
> > > >> *Suggested API :*
> > > >> DataStream source = ..
> > > >> DataStream mapper = source.map(noOpMapper)
> > > >> DataStream feedback = mapper.filter(...)
> > > >> mapper.addInput(feedback)
> > > >>
> > > >> The suggested approach would let us define inputs to operators after
> > > they
> > > >> are created and implicitly union them with the normal input. This
> is I
> > > >> think a much clearer approach than what we have now.
> > > >>
> > > >> What do you think?
> > > >>
> > > >> Gyula
> > > >>
> > > >
> > >
> >
>


Re: Github behind aft-git

2015-07-12 Thread Gyula Fóra
This happens quite often :/

Matthias J. Sax  ezt írta (időpont: 2015.
júl. 12., V, 14:28):

> Hi,
>
> Github master is behind asf-git by two days. The last 6 commits are not
> available at Github. Please compare:
>
>  - https://github.com/apache/flink/commits/master
>  -
> https://git-wip-us.apache.org/repos/asf/flink/repo?p=flink.git;a=summary
>
> Is there any problem?
>
>
> -Matthias
>
>
>


Re: Design documents for consolidated DataStream API

2015-07-13 Thread Gyula Fóra
In general I like it, although the main difference between the current and
the new design is the windowing, and that is still not very clear.

Where do we have the full-stream time windows, for instance (which are
parallel but not keyed)?
On Mon, Jul 13, 2015 at 4:28 PM Aljoscha Krettek 
wrote:

> +1 I like it as well.
>
> On Mon, 13 Jul 2015 at 16:17 Kostas Tzoumas  wrote:
>
> > +1 from my side
> >
> > On Mon, Jul 13, 2015 at 4:15 PM, Stephan Ewen  wrote:
> >
> > > Do we have consensus on these designs?
> > >
> > > If we have, we should get to implementing this soon, because basically
> > all
> > > streaming patches will have to be revisited in light of this...
> > >
> > > On Tue, Jul 7, 2015 at 3:41 PM, Gyula Fóra 
> wrote:
> > >
> > > > You are right thats an important issue.
> > > >
> > > > And I think we should also do some renaming with the "iterations"
> > because
> > > > they are not really iterations like in the batch case and it might
> > > confuse
> > > > some users.
> > > > Maybe we can call them loops or cycles and rename the api calls to
> make
> > > it
> > > > more intuitive what happens. It is really just a cyclic dataflow.
> > > >
> > > > Aljoscha Krettek  ezt írta (időpont: 2015. júl.
> > 7.,
> > > > K,
> > > > 15:35):
> > > >
> > > > > Hi,
> > > > > I just noticed that we don't have anything about how iterations and
> > > > > timestamps/watermarks should interact.
> > > > >
> > > > > Cheers,
> > > > > Aljoscha
> > > > >
> > > > > On Mon, 6 Jul 2015 at 23:56 Stephan Ewen  wrote:
> > > > >
> > > > > > Hi all!
> > > > > >
> > > > > > As many of you know, there are a ongoing efforts to consolidate
> the
> > > > > > streaming API for the next release, and then graduate it (from
> beta
> > > > > > status).
> > > > > >
> > > > > > In the process of this consolidation, we want to achieve the
> > > following
> > > > > > goals.
> > > > > >
> > > > > >  - Make the code more robust and simplify it in parts
> > > > > >
> > > > > >  - Clearly define the semantics of the constructs.
> > > > > >
> > > > > >  - Prepare it for support of more advanced concepts, like
> > > partitionable
> > > > > > state, and event time.
> > > > > >
> > > > > >  - Cut support for certain corner cases that were prototyped, but
> > > > turned
> > > > > > out to be not efficiently doable
> > > > > >
> > > > > >
> > > > > > Based on prior discussions on the mailing list, Aljoscha and me
> > > drafted
> > > > > the
> > > > > > design documents below, which outline how the consolidated API
> > would
> > > > > like.
> > > > > > We focused in constructs, time, and window semantics.
> > > > > >
> > > > > >
> > > > > > Design document on how to restructure the Streaming API:
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/Streams+and+Operations+on+Streams
> > > > > >
> > > > > > Design document on definitions of time, order, and the resulting
> > > > > semantics:
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/Time+and+Order+in+Streams
> > > > > >
> > > > > >
> > > > > >
> > > > > > Note: The design of the interfaces and concepts for advanced
> state
> > in
> > > > > > functions is not in here. That is part of a separate design
> > > discussion
> > > > > and
> > > > > > orthogonal to the designs drafted here.
> > > > > >
> > > > > >
> > > > > > Please have a look and voice questions and concerns. Since we
> > should
> > > > not
> > > > > > break the streaming API more than once, we should make sure this
> > > > > > consolidation brings it into the shape we want it to be in.
> > > > > >
> > > > > >
> > > > > > Greetings,
> > > > > > Stephan
> > > > > >
> > > > >
> > > >
> > >
> >
>


Re: Design documents for consolidated DataStream API

2015-07-13 Thread Gyula Fóra
I think we agree on everything, it's more of a naming issue :)

I thought it might be misleading that global time windows are
"non-parallel" windows. We don't want to give a bad impression. (Also, we
don't want users to think that every global window is parallel, but that's
not a problem here.)

Gyula
On Mon, Jul 13, 2015 at 5:22 PM Stephan Ewen  wrote:

> Okay, what is missing about the windowing in your opinion?
>
> The core points of the document are:
>
>   - The parallel windows are per group only.
>
>   - The implementation of the parallel windows holds window data in the
> group buffers.
>
>   - The global windows are non-parallel. May have parallel pre-aggregation,
> if they are time windows.
>
>   - Time may be operator time (timer thread), or watermark time. Watermark
> time can refer to ingress or event time.
>
>   - Windows that do not pre-aggregate may require elements in order. Not
> part of the first prototype.
>
> Do we agree on those points?
>
>
> On Mon, Jul 13, 2015 at 4:50 PM, Gyula Fóra  wrote:
>
> > In general I like it, although the main difference between the current
> and
> > the new one is the windowing and that is still not very clear.
> >
> > Where do we have the full stream time windows for instance?(which is
> > parallel but not keyed)
> > On Mon, Jul 13, 2015 at 4:28 PM Aljoscha Krettek 
> > wrote:
> >
> > > +1 I like it as well.
> > >
> > > On Mon, 13 Jul 2015 at 16:17 Kostas Tzoumas 
> wrote:
> > >
> > > > +1 from my side
> > > >
> > > > On Mon, Jul 13, 2015 at 4:15 PM, Stephan Ewen 
> > wrote:
> > > >
> > > > > Do we have consensus on these designs?
> > > > >
> > > > > If we have, we should get to implementing this soon, because
> > basically
> > > > all
> > > > > streaming patches will have to be revisited in light of this...
> > > > >
> > > > > On Tue, Jul 7, 2015 at 3:41 PM, Gyula Fóra 
> > > wrote:
> > > > >
> > > > > > You are right thats an important issue.
> > > > > >
> > > > > > And I think we should also do some renaming with the "iterations"
> > > > because
> > > > > > they are not really iterations like in the batch case and it
> might
> > > > > confuse
> > > > > > some users.
> > > > > > Maybe we can call them loops or cycles and rename the api calls
> to
> > > make
> > > > > it
> > > > > > more intuitive what happens. It is really just a cyclic dataflow.
> > > > > >
> > > > > > Aljoscha Krettek  ezt írta (időpont: 2015.
> > júl.
> > > > 7.,
> > > > > > K,
> > > > > > 15:35):
> > > > > >
> > > > > > > Hi,
> > > > > > > I just noticed that we don't have anything about how iterations
> > and
> > > > > > > timestamps/watermarks should interact.
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Aljoscha
> > > > > > >
> > > > > > > On Mon, 6 Jul 2015 at 23:56 Stephan Ewen 
> > wrote:
> > > > > > >
> > > > > > > > Hi all!
> > > > > > > >
> > > > > > > > As many of you know, there are a ongoing efforts to
> consolidate
> > > the
> > > > > > > > streaming API for the next release, and then graduate it
> (from
> > > beta
> > > > > > > > status).
> > > > > > > >
> > > > > > > > In the process of this consolidation, we want to achieve the
> > > > > following
> > > > > > > > goals.
> > > > > > > >
> > > > > > > >  - Make the code more robust and simplify it in parts
> > > > > > > >
> > > > > > > >  - Clearly define the semantics of the constructs.
> > > > > > > >
> > > > > > > >  - Prepare it for support of more advanced concepts, like
> > > > > partitionable
> > > > > > > > state, and event time.
> > > > > > > >
> > > > > > > >  - Cut support for certain corner cases that were prototyped,
> > but
> > > > > > turned
> > > > > > 

Re: Design documents for consolidated DataStream API

2015-07-13 Thread Gyula Fóra
+1
On Mon, Jul 13, 2015 at 6:23 PM Stephan Ewen  wrote:

> If naming is the only concern, then we should go ahead, because we can
> change names easily (before the release).
>
> In fact, I don't think it leaves a bad impression. Global windows are
> non-parallel windows. There are also parallel windows. Pick what you need
> and what works.
>
>
> On Mon, Jul 13, 2015 at 6:13 PM, Gyula Fóra  wrote:
>
> > I think we agree on everything its more of a naming issue :)
> >
> > I thought it might be misleading that global time windows are
> > "non-parallel" windows. We dont want to give a bad impression. (Also we
> > dont want them to think that every global window is parallel but thats
> not
> > a problem here)
> >
> > Gyula
> > On Mon, Jul 13, 2015 at 5:22 PM Stephan Ewen  wrote:
> >
> > > Okay, what is missing about the windowing in your opinion?
> > >
> > > The core points of the document are:
> > >
> > >   - The parallel windows are per group only.
> > >
> > >   - The implementation of the parallel windows holds window data in the
> > > group buffers.
> > >
> > >   - The global windows are non-parallel. May have parallel
> > pre-aggregation,
> > > if they are time windows.
> > >
> > >   - Time may be operator time (timer thread), or watermark time.
> > Watermark
> > > time can refer to ingress or event time.
> > >
> > >   - Windows that do not pre-aggregate may require elements in order.
> Not
> > > part of the first prototype.
> > >
> > > Do we agree on those points?
> > >
> > >
> > > On Mon, Jul 13, 2015 at 4:50 PM, Gyula Fóra 
> > wrote:
> > >
> > > > In general I like it, although the main difference between the
> current
> > > and
> > > > the new one is the windowing and that is still not very clear.
> > > >
> > > > Where do we have the full stream time windows for instance?(which is
> > > > parallel but not keyed)
> > > > On Mon, Jul 13, 2015 at 4:28 PM Aljoscha Krettek <
> aljos...@apache.org>
> > > > wrote:
> > > >
> > > > > +1 I like it as well.
> > > > >
> > > > > On Mon, 13 Jul 2015 at 16:17 Kostas Tzoumas 
> > > wrote:
> > > > >
> > > > > > +1 from my side
> > > > > >
> > > > > > On Mon, Jul 13, 2015 at 4:15 PM, Stephan Ewen 
> > > > wrote:
> > > > > >
> > > > > > > Do we have consensus on these designs?
> > > > > > >
> > > > > > > If we have, we should get to implementing this soon, because
> > > > basically
> > > > > > all
> > > > > > > streaming patches will have to be revisited in light of this...
> > > > > > >
> > > > > > > On Tue, Jul 7, 2015 at 3:41 PM, Gyula Fóra <
> gyula.f...@gmail.com
> > >
> > > > > wrote:
> > > > > > >
> > > > > > > > You are right thats an important issue.
> > > > > > > >
> > > > > > > > And I think we should also do some renaming with the
> > "iterations"
> > > > > > because
> > > > > > > > they are not really iterations like in the batch case and it
> > > might
> > > > > > > confuse
> > > > > > > > some users.
> > > > > > > > Maybe we can call them loops or cycles and rename the api
> calls
> > > to
> > > > > make
> > > > > > > it
> > > > > > > > more intuitive what happens. It is really just a cyclic
> > dataflow.
> > > > > > > >
> > > > > > > > Aljoscha Krettek  ezt írta (időpont:
> > 2015.
> > > > júl.
> > > > > > 7.,
> > > > > > > > K,
> > > > > > > > 15:35):
> > > > > > > >
> > > > > > > > > Hi,
> > > > > > > > > I just noticed that we don't have anything about how
> > iterations
> > > > and
> > > > > > > > > timestamps/watermarks should interact.
> > > > > > > > >
> > > > > > > > > Cheers,
> > > > > > > > > Aljoscha
> > > > > >

Re: Design documents for consolidated DataStream API

2015-07-14 Thread Gyula Fóra
I think Marton has some good points here.

1) Is KeyedDataStream a better name if this is only a renaming?

2) The discretize semantics are indeed unclear. Are we operating on a single
dataset or on a sequence of datasets? If the latter, why not call it something
else (dstream)? How are joins and other binary operators defined for different
discretizations, etc.?
On Mon, Jul 13, 2015 at 7:37 PM Márton Balassi  wrote:

> Generally I agree with the new design. Two concerns:
>
> 1) Does KeyedDataStream replace GroupedDataStream or is it the latter a
> special case of the former?
>
> The KeyedDataStream as described in the design document is a bit unclear
> for me. It lists the following usages:
>   a) It is the first step in building a window stream, on top of which the
> grouped/windowed aggregation and reduce-style function can be applied
>   b) It allows to use the "by-key" state of functions. Here, every record
> has access to a state that is scoped by its key. Key-scoped state can be
> automatically redistributed and repartitioned.
>
> The code snippet describes a use case where the computation and the access
> of the state is used the way currently the GroupedDataStream should work. I
> suppose this is the example for case b). Would case a) also window elements
> by key? If yes, then this is practically a renaming and enhancement of the
> GroupedDataStream functionality with keyed state. Then the
> StreamExecutionEnvironment.createKeyedStream(Partitioner,
> KeySelector)construction does not make much sense as the user only operates
> within the scope of the keyselector and not the partitioner anyway.
>
> I personally think KeyedDataStream as a name does not necessarily suggest
> that the records are grouped by key, it only suggests partitioning by key -
> at least for me. :)
>
> 2) The API for discretization is not convenient IMHO
>
> The discretization part declares that the output of DataStream.discretize()
> is a sequence of DataSets. I love this approach, but then in the code
> snippet the return value of this function is simply a DataSet and uses it
> as such. The take home message of that code is the following: this is
> actually the way you would like to program on these sequence of DataSets,
> most probably you would like to do the same with each of them. If that is
> the case we should provide a nice utility for that. I think Spark
> Streaming's DStream.foreachRDD() is fairly useful for this purpose.
>
> On Mon, Jul 13, 2015 at 6:30 PM, Gyula Fóra  wrote:
>
> > +1
> > On Mon, Jul 13, 2015 at 6:23 PM Stephan Ewen  wrote:
> >
> > > If naming is the only concern, then we should go ahead, because we can
> > > change names easily (before the release).
> > >
> > > In fact, I don't think it leaves a bad impression. Global windows are
> > > non-parallel windows. There are also parallel windows. Pick what you
> need
> > > and what works.
> > >
> > >
> > > On Mon, Jul 13, 2015 at 6:13 PM, Gyula Fóra 
> > wrote:
> > >
> > > > I think we agree on everything its more of a naming issue :)
> > > >
> > > > I thought it might be misleading that global time windows are
> > > > "non-parallel" windows. We dont want to give a bad impression. (Also
> we
> > > > dont want them to think that every global window is parallel but
> thats
> > > not
> > > > a problem here)
> > > >
> > > > Gyula
> > > > On Mon, Jul 13, 2015 at 5:22 PM Stephan Ewen 
> wrote:
> > > >
> > > > > Okay, what is missing about the windowing in your opinion?
> > > > >
> > > > > The core points of the document are:
> > > > >
> > > > >   - The parallel windows are per group only.
> > > > >
> > > > >   - The implementation of the parallel windows holds window data in
> > the
> > > > > group buffers.
> > > > >
> > > > >   - The global windows are non-parallel. May have parallel
> > > > pre-aggregation,
> > > > > if they are time windows.
> > > > >
> > > > >   - Time may be operator time (timer thread), or watermark time.
> > > > Watermark
> > > > > time can refer to ingress or event time.
> > > > >
> > > > >   - Windows that do not pre-aggregate may require elements in
> order.
> > > Not
> > > > > part of the first prototype.
> > > > >
> > > > > Do we agree on those points?
> > > > >
> > > > >
> > > > > On Mon
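
For illustration, a minimal sketch (in Scala, with hypothetical names that are
not part of the proposed API) of the per-DataSet utility Marton asks for above,
analogous to Spark Streaming's DStream.foreachRDD():

import org.apache.flink.api.scala.DataSet

// Hypothetical wrapper around the output of discretize(): a sequence of
// DataSets, one per discrete time slice, plus a helper that applies the same
// user function to each of them.
class DiscretizedStream[T](slices: Iterator[DataSet[T]]) {
  def foreachDataSet(f: DataSet[T] => Unit): Unit =
    slices.foreach(f) // run the user code once per discrete time slice
}

// Usage idea: treat every slice like a regular batch DataSet, e.g.
// discretized.foreachDataSet(ds => ds.groupBy(0).sum(1).print())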

Re: Design documents for consolidated DataStream API

2015-07-14 Thread Gyula Fóra
If we only want to have either keyBy or groupBy, why not keep groupBy? That
would be more consistent with the batch api.
On Tue, Jul 14, 2015 at 10:35 AM Stephan Ewen  wrote:

> Concerning your comments:
>
> 1) In the new design, there is no grouping without windowing. The
> KeyedDataStream subsumes the grouping and key-ing for partitioned state.
>
> The keyBy() + window() makes a parallel grouped window
> keyBy() alone allows access to partitioned state.
>
> My thought was that this is simpler, because it needs not groupBy() and
> keyBy(), but one construct to handle both cases.
>
> 2) The discretization is a rough thought and is nothing for the short term.
> It totally needs more thoughts. I put it there to have it as a sketch for
> how to evolve this.
>
> The idea is of course to not have a single data set, but a series of
> data set. In each discrete time slice, the data set can be treated like a
> regular data set.
>
> Let's kick off a separate design for the discretization. Joins are good
> to talk about (data sets can be joined with data set), and I am sure there
> are more questions coming up.
>
>
> Does that make sense?
>
>
>
>
>
> On Tue, Jul 14, 2015 at 10:05 AM, Gyula Fóra  wrote:
>
> > I think Marton has some good points here.
> >
> > 1) Is KeyedDataStream a better name if this is only a renaming?
> >
> > 2) the discretize semantics is unclear indeed. Are we operating on a
> single
> > or sequence of datasets? If the latter why not call it something else
> > (dstream). How are joins and other binary operators defined for different
> > discretizations etc.
> > On Mon, Jul 13, 2015 at 7:37 PM Márton Balassi 
> > wrote:
> >
> > > Generally I agree with the new design. Two concerns:
> > >
> > > 1) Does KeyedDataStream replace GroupedDataStream or is it the latter a
> > > special case of the former?
> > >
> > > The KeyedDataStream as described in the design document is a bit
> unclear
> > > for me. It lists the following usages:
> > >   a) It is the first step in building a window stream, on top of which
> > the
> > > grouped/windowed aggregation and reduce-style function can be applied
> > >   b) It allows to use the "by-key" state of functions. Here, every
> record
> > > has access to a state that is scoped by its key. Key-scoped state can
> be
> > > automatically redistributed and repartitioned.
> > >
> > > The code snippet describes a use case where the computation and the
> > access
> > > of the state is used the way currently the GroupedDataStream should
> > work. I
> > > suppose this is the example for case b). Would case a) also window
> > elements
> > > by key? If yes, then this is practically a renaming and enhancement of
> > the
> > > GroupedDataStream functionality with keyed state. Then the
> > > StreamExecutionEnvironment.createKeyedStream(Partitioner,
> > > KeySelector)construction does not make much sense as the user only
> > operates
> > > within the scope of the keyselector and not the partitioner anyway.
> > >
> > > I personally think KeyedDataStream as a name does not necessarily
> suggest
> > > that the records are grouped by key, it only suggests partitioning by
> > key -
> > > at least for me. :)
> > >
> > > 2) The API for discretization is not convenient IMHO
> > >
> > > The discretization part declares that the output of
> > DataStream.discretize()
> > > is a sequence of DataSets. I love this approach, but then in the code
> > > snippet the return value of this function is simply a DataSet and uses
> it
> > > as such. The take home message of that code is the following: this is
> > > actually the way you would like to program on these sequence of
> DataSets,
> > > most probably you would like to do the same with each of them. If that
> is
> > > the case we should provide a nice utility for that. I think Spark
> > > Streaming's DStream.foreachRDD() is fairly useful for this purpose.
> > >
> > > On Mon, Jul 13, 2015 at 6:30 PM, Gyula Fóra 
> > wrote:
> > >
> > > > +1
> > > > On Mon, Jul 13, 2015 at 6:23 PM Stephan Ewen 
> wrote:
> > > >
> > > > > If naming is the only concern, then we should go ahead, because we
> > can
> > > > > change names easily (before the release).
> > > > >
> > > > > In fact, I don't think it leaves a bad impression. G
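
To make the keyBy()-based design above concrete, a short sketch (using the
method names Flink eventually settled on; the exact API was still in flux at
the time of this thread):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

def wordCounts(stream: DataStream[(String, Int)]): DataStream[(String, Int)] =
  stream
    .keyBy(_._1)                  // keyBy alone: per-key partitioning + keyed state
    .timeWindow(Time.seconds(5))  // keyBy + window: a parallel, grouped window
    .sum(1)                       // per-key aggregate within each window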

Re: Design documents for consolidated DataStream API

2015-07-14 Thread Gyula Fóra
I see your point, reduceByKey is much clearer.

The question is whether we want to introduce this inconsistency across the
two APIs or stick with what we have.
On Tue, Jul 14, 2015 at 10:57 AM Aljoscha Krettek 
wrote:

> I agree, the groupBy, in the batch API is misleading, since a
> ds.groupBy().reduce() does not really build any groups, it is really a
> ds.keyBy().reduceByKey(). In the streaming API we can still fix this, IMHO.
>
> On Tue, 14 Jul 2015 at 10:56 Stephan Ewen  wrote:
>
> > It is not a bit different than the batch API, because streaming semantics
> > are a bit different ;-)
> >
> > One good thing is that we can make things better that were sub-optimal in
> > the Batch API.
> >
> > On Tue, Jul 14, 2015 at 10:55 AM, Stephan Ewen  wrote:
> >
> > > keyBy() does not do any grouping. Grouping in streams in not defined
> > > without windows.
> > >
> > > On Tue, Jul 14, 2015 at 10:48 AM, Gyula Fóra 
> > wrote:
> > >
> > >> If we only want to have either keyBy or groupBy, why not keep groupBy?
> > >> That
> > >> would be more consistent with the batch api.
> > >> On Tue, Jul 14, 2015 at 10:35 AM Stephan Ewen 
> wrote:
> > >>
> > >> > Concerning your comments:
> > >> >
> > >> > 1) In the new design, there is no grouping without windowing. The
> > >> > KeyedDataStream subsumes the grouping and key-ing for partitioned
> > state.
> > >> >
> > >> > The keyBy() + window() makes a parallel grouped window
> > >> > keyBy() alone allows access to partitioned state.
> > >> >
> > >> > My thought was that this is simpler, because it needs not
> > groupBy()
> > >> and
> > >> > keyBy(), but one construct to handle both cases.
> > >> >
> > >> > 2) The discretization is a rough thought and is nothing for the
> short
> > >> term.
> > >> > It totally needs more thoughts. I put it there to have it as a
> sketch
> > >> for
> > >> > how to evolve this.
> > >> >
> > >> > The idea is of course to not have a single data set, but a
> series
> > of
> > >> > data set. In each discrete time slice, the data set can be treated
> > like
> > >> a
> > >> > regular data set.
> > >> >
> > >> > Let's kick off a separate design for the discretization. Joins
> are
> > >> good
> > >> > to talk about (data sets can be joined with data set), and I am sure
> > >> there
> > >> > are more questions coming up.
> > >> >
> > >> >
> > >> > Does that make sense?
> > >> >
> > >> >
> > >> >
> > >> >
> > >> >
> > >> > On Tue, Jul 14, 2015 at 10:05 AM, Gyula Fóra 
> > >> wrote:
> > >> >
> > >> > > I think Marton has some good points here.
> > >> > >
> > >> > > 1) Is KeyedDataStream a better name if this is only a renaming?
> > >> > >
> > >> > > 2) the discretize semantics is unclear indeed. Are we operating
> on a
> > >> > single
> > >> > > or sequence of datasets? If the latter why not call it something
> > else
> > >> > > (dstream). How are joins and other binary operators defined for
> > >> different
> > >> > > discretizations etc.
> > >> > > On Mon, Jul 13, 2015 at 7:37 PM Márton Balassi <
> mbala...@apache.org
> > >
> > >> > > wrote:
> > >> > >
> > >> > > > Generally I agree with the new design. Two concerns:
> > >> > > >
> > >> > > > 1) Does KeyedDataStream replace GroupedDataStream or is it the
> > >> latter a
> > >> > > > special case of the former?
> > >> > > >
> > >> > > > The KeyedDataStream as described in the design document is a bit
> > >> > unclear
> > >> > > > for me. It lists the following usages:
> > >> > > >   a) It is the first step in building a window stream, on top of
> > >> which
> > >> > > the
> > >> > > > grouped/windowed aggregation and reduce-style function can be
> > >> applied
> > >> > > >   b) It allows to use th

Re: Thoughts About Streaming

2015-07-23 Thread Gyula Fóra
nverse reduce
> > > >>>>>>>> optimisation:
> > > >>>>>>>>>>>>>> (Tuple 0,38)
> > > >>>>>>>>>>>>>> (Tuple 0,829)
> > > >>>>>>>>>>>>>> (Tuple 0,1625)
> > > >>>>>>>>>>>>>> (Tuple 0,2424)
> > > >>>>>>>>>>>>>> (Tuple 0,3190)
> > > >>>>>>>>>>>>>> (Tuple 0,3198)
> > > >>>>>>>>>>>>>> (Tuple 0,-339368)
> > > >>>>>>>>>>>>>> (Tuple 0,-1315725)
> > > >>>>>>>>>>>>>> (Tuple 0,-2932932)
> > > >>>>>>>>>>>>>> (Tuple 0,-5082735)
> > > >>>>>>>>>>>>>> (Tuple 0,-7743256)
> > > >>>>>>>>>>>>>> (Tuple 0,75701046)
> > > >>>>>>>>>>>>>> (Tuple 0,642829470)
> > > >>>>>>>>>>>>>> (Tuple 0,2242018381)
> > > >>>>>>>>>>>>>> (Tuple 0,5190708618)
> > > >>>>>>>>>>>>>> (Tuple 0,10060360311)
> > > >>>>>>>>>>>>>> (Tuple 0,-94254951)
> > > >>>>>>>>>>>>>> (Tuple 0,-219806321293)
> > > >>>>>>>>>>>>>> (Tuple 0,-1258895232699)
> > > >>>>>>>>>>>>>> (Tuple 0,-4074432596329)
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> One line is one emitted window count. This is what
> happens
> > > >> when
> > > >>>> I
> > > >>>>>>>>>>> remove
> > > >>>>>>>>>>>>>> the Thread.sleep(1):
> > > >>>>>>>>>>>>>> (Tuple 0,660676)
> > > >>>>>>>>>>>>>> (Tuple 0,2553733)
> > > >>>>>>>>>>>>>> (Tuple 0,3542696)
> > > >>>>>>>>>>>>>> (Tuple 0,1)
> > > >>>>>>>>>>>>>> (Tuple 0,1107035)
> > > >>>>>>>>>>>>>> (Tuple 0,2549491)
> > > >>>>>>>>>>>>>> (Tuple 0,4100387)
> > > >>>>>>>>>>>>>> (Tuple 0,-8406583360092)
> > > >>>>>>>>>>>>>> (Tuple 0,-8406582150743)
> > > >>>>>>>>>>>>>> (Tuple 0,-8406580427190)
> > > >>>>>>>>>>>>>> (Tuple 0,-8406580427190)
> > > >>>>>>>>>>>>>> (Tuple 0,-8406580427190)
> > > >>>>>>>>>>>>>> (Tuple 0,6847279255682044995)
> > > >>>>>>>>>>>>>> (Tuple 0,6847279255682044995)
> > > >>>>>>>>>>>>>> (Tuple 0,-5390528042713628318)
> > > >>>>>>>>>>>>>> (Tuple 0,-5390528042711551780)
> > > >>>>>>>>>>>>>> (Tuple 0,-5390528042711551780)
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> So at some point the pre-reducer seems to go haywire and
> > > does
> > > >>>> not
> > > >>>>>>>>>>> recover
> > > >>>>>>>>>>>>>> from it. The good thing is that it does produce results
> > now,
> > > >>>> where
> > > >>>>>>>>> the
> > > >>>>>>>>>>>>>> previous Current/Reduce would simply hang and not
> produce
> > > any
> > > >>>>>>>> output.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> On Thu, 25 Jun 2015 at 12:02 Gábor Gévay <
> > gga...@gmail.com>
> > > >

Guide/design doc for streaming operator states

2015-07-23 Thread Gyula Fóra
Hey!
I started putting together a guide/design document for the streaming
operator state interfaces and implementations. The idea would be to create
a doc that contains all the details about the implementations so anyone can
use it as a reference later.

https://cwiki.apache.org/confluence/display/FLINK/Stateful+Stream+Processing

It will probably take me a couple of days to finish it, but in any case
feel free to comment.

Cheers,
Gyula


Extending the streaming scala api with stateful functions

2015-07-24 Thread Gyula Fóra
Hey,

I would like to propose a way to extend the standard Streaming Scala API
methods (map, flatmap, filter etc) with versions that take stateful
functions as lambdas. I think this would eliminate the awkwardness of
implementing RichFunctions in Scala and make statefulness more explicit:

*For example:*
def map( statefulMap: (I, Option[S]) => (O, Option[S]) )
def flatMap( statefulFlatMap: (I, Option[S] ) => (Traversable[O],
Option[S]))

This would be translated into RichMap and RichFlatMapFunctions that store
Option[S] as OperatorState for fault tolerance.

*Example rolling sum by key:*
val input: DataStream[Long] = ...
val sumByKey: DataStream[Long] =
input.keyBy(...).map( (next: Long, sum: Option[Long]) =>
 sum match {
   case Some(s) => (next + s, Some(next + s))
   case None => (next, Some(next))
  })

What do you think?

Gyula
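
A minimal sketch of the translation idea (assuming the state is simply
threaded through the function; the actual proposal would register it as Flink
operator state so it is checkpointed and restored on failure):

import org.apache.flink.api.common.functions.RichMapFunction

// Wraps a stateful lambda (I, Option[S]) => (O, Option[S]) as a RichMapFunction.
// For brevity the state lives in a plain field here instead of operator state.
class StatefulMapWrapper[I, S, O](statefulMap: (I, Option[S]) => (O, Option[S]))
    extends RichMapFunction[I, O] {

  private var state: Option[S] = None

  override def map(in: I): O = {
    val (out, newState) = statefulMap(in, state)
    state = newState // in the real version this update would go to Flink state
    out
  }
}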


Re: Extending the streaming scala api with stateful functions

2015-07-25 Thread Gyula Fóra
I opened a PR: https://github.com/apache/flink/pull/936
Feel free to comment :)

Gyula

Till Rohrmann  ezt írta (időpont: 2015. júl. 24., P,
18:15):

> We have something similar for broadcast variables in FlinkML. It allows you
> to write ds.mapWithBcVariable(bcDS){ (dsElement, bcVar) => ... }.
>
> I like the idea to make the life of a Scala programmer a little bit less
> javaesque :-)
> ​
>
> On Fri, Jul 24, 2015 at 5:45 PM, Stephan Ewen  wrote:
>
> > This is really syntactic sugar in the Scala API, rather then a system
> > feature.
> >
> > Which is good, it needs no extra runtime constructs...
> >
> > On Fri, Jul 24, 2015 at 5:43 PM, Aljoscha Krettek 
> > wrote:
> >
> > > Yes, this might be nice. Till and I had similar ideas about using the
> > > pattern to make broadcast variables more useable in Scala, in fact. :D
> > >
> > > On Fri, 24 Jul 2015 at 17:39 Gyula Fóra  wrote:
> > >
> > > > Hey,
> > > >
> > > > I would like to propose a way to extend the standard Streaming Scala
> > API
> > > > methods (map, flatmap, filter etc) with versions that take stateful
> > > > functions as lambdas. I think this would eliminate the awkwardness of
> > > > implementing RichFunctions in Scala and make statefulness more
> > explicit:
> > > >
> > > > *For example:*
> > > > def map( statefulMap: (I, Option[S]) => (O, Option[S]) )
> > > > def flatMap( statefulFlatMap: (I, Option[S] ) => (Traversable[O],
> > > > Option[S]))
> > > >
> > > > This would be translated into RichMap and RichFlatMapFunctions that
> > store
> > > > Option[S] as OperatorState for fault tolerance.
> > > >
> > > > *Example rolling sum by key:*
> > > > val input: DataStream[Long] = ...
> > > > val sumByKey: DataStream[Long] =
> > > > input.keyBy(...).map( (next: Long, sum: Option[Long]) =>
> > > >  sum match {
> > > >case Some(s) => (next + s, Some(next + s))
> > > >case None => (next, Some(next))
> > > >   })
> > > >
> > > > What do you think?
> > > >
> > > > Gyula
> > > >
> > >
> >
>


Re: streaming iteration

2015-07-27 Thread Gyula Fóra
Hey,
The JobGraph that is executed cannot have cycles (due to scheduling
reasons), that's why there is no edge between the head and tail operators.

What we do instead is we add an extra source to the head and a sink to the
tail (with the same parallelism), and the feedback data is passed outside
of the graph using blocking queues.

Cheers,
Gyula
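
A tiny conceptual sketch of that mechanism (plain Scala, not the actual
StreamIterationHead/StreamIterationTail classes): the feedback edge lives
outside the acyclic JobGraph as a blocking queue shared by a source/sink pair.

import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

// One feedback channel per head/tail pair; the capacity bounds the amount of
// in-flight feedback data.
val feedbackChannel = new ArrayBlockingQueue[Long](1000)

// "Iteration tail" side: instead of a back edge to the head, records are put
// into the queue.
def tailEmit(record: Long): Unit = feedbackChannel.put(record)

// "Iteration head" side: acts as a source that polls the queue and re-injects
// the feedback records into the (acyclic) dataflow.
def headPoll(): Option[Long] =
  Option(feedbackChannel.poll(100, TimeUnit.MILLISECONDS))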

MaGuoWei  ezt írta (időpont: 2015. júl. 28., K, 3:39):

> Hi guys, I am reading the code about streaming iteration. I find there is no
> StreamEdge between the StreamIterationHead and StreamIterationTail. Why?
> Because the graph can't have any ring? Or any other concerns?
> thanks


Re: Revert 78fd2146dd until we have consensus for FLINK-2419?

2015-07-28 Thread Gyula Fóra
Hey,

I am sorry that you feel bad about this. I only did not add a test case for
FLINK-2419 because I am adding a test in my upcoming PR which verifies the
behaviour.

As for FLINK-2423, it is actually very bad that that issue is still there. You
introduced it in your PR https://github.com/apache/flink/pull/895, which I
commented on, but no one fixed it before merging. As developing a test here
takes quite a lot of time (it is tricky), I wanted to push the fix, which was
in fact trivial.

Regards,
Gyula

Robert Metzger  ezt írta (időpont: 2015. júl. 28., K,
20:01):

> Hi,
>
> I'm a bit unhappy how we were handling
> https://issues.apache.org/jira/browse/FLINK-2419 today.
>
> I raised a concern in the JIRA because the commit for the fix didn't
> contain any tests. Our coding guidelines [1] imply that every feature
> should have tests. Apparently there were not enough tests for the two bugs
> fixed with commit 78fd2146dd.
>
> Also, Gyula's answer sounds like he is not willing to add tests right now.
>
> I can not remember if we ever reverted a commit in the Flink community, but
> in my understanding, this is how ASF projects are doing lazy consensus for
> commits-without-PR.
> So if there is a disagreement in the associated JIRA, we revert the fix
> until there is an agreement.
>
> In this case, I did not immediately revert the commit, because I would like
> to see whether others in the community agree with me.
>
>
> What do you think how we should handle cases like this one in the future?
>
> I think its very important for committers and PMC members to be a good
> example when it comes to following our own rules. Otherwise, how can we ask
> our contributors to adhere to these rules?
>
>
> My suggestion to resolve this situation is the following:
> - Revert commit 78fd2146dd
> - open pull requests for FLINK-2419 and FLINK-2423 (with tests of course),
> review and merge them.
>
>
>
> Best,
> Robert
>
> [1] http://flink.apache.org/coding-guidelines.html
>


Re: Revert 78fd2146dd until we have consensus for FLINK-2419?

2015-07-28 Thread Gyula Fóra
What concerns me here is that for FLINK-2419 I clearly indicated that there
is a test in my other PR, and since the fix was actually trivial and didn't
break the current functionality according to my test, I wanted to push it in
before my PR, because that one is pending on something else. I could have
added a test here, that is true.

With FLINK-2423 I was fixing someone else's mistake, made by someone who
disregarded my message when merging a PR. We could now revert the PR that
introduced that bug, but instead we are reverting my fix for that mistake.




Gyula Fóra  ezt írta (időpont: 2015. júl. 28., K,
20:19):

> Hey,
>
> I am sorry that you feel bad about this, I only did not add a test case
> for FLINK-2419 because I am adding a test in my upcoming PR which
> verified the behaviour.
>
> As for FLINK-2423, it is actually very bad that issue is still there. You
> introduced this in your PR https://github.com/apache/flink/pull/895 which
> I commented but no one fixed it before merging. As developing a test takes
> quite much time here as it is tricky, I wanted to push the fix, which was
> in fact trivial.
>
> Regards,
> Gyula
>
> Robert Metzger  ezt írta (időpont: 2015. júl. 28.,
> K, 20:01):
>
>> Hi,
>>
>> I'm a bit unhappy how we were handling
>> https://issues.apache.org/jira/browse/FLINK-2419 today.
>>
>> I raised a concern in the JIRA because the commit for the fix didn't
>> contain any tests. Our coding guidelines [1] imply that every feature
>> should have tests. Apparently there were not enough tests for the two bugs
>> fixed with commit 78fd2146dd.
>>
>> Also, Gyula's answer sounds like he is not willing to add tests right now.
>>
>> I can not remember if we ever reverted a commit in the Flink community,
>> but
>> in my understanding, this is how ASF projects are doing lazy consensus for
>> commits-without-PR.
>> So if there is a disagreement in the associated JIRA, we revert the fix
>> until there is an agreement.
>>
>> In this case, I did not immediately revert the commit, because I would
>> like
>> to see whether others in the community agree with me.
>>
>>
>> What do you think how we should handle cases like this one in the future?
>>
>> I think its very important for committers and PMC members to be a good
>> example when it comes to following our own rules. Otherwise, how can we
>> ask
>> our contributors to adhere to these rules?
>>
>>
>> My suggestion to resolve this situation is the following:
>> - Revert commit 78fd2146dd
>> - open pull requests for FLINK-2419 and FLINK-2423 (with tests of course),
>> review and merge them.
>>
>>
>>
>> Best,
>> Robert
>>
>> [1] http://flink.apache.org/coding-guidelines.html
>>
>


Re: Revert 78fd2146dd until we have consensus for FLINK-2419?

2015-07-28 Thread Gyula Fóra
I agree that consensus should be reached on all changes to the system.

What is not clear to me is what is the subject of consensus in this case.

As for FLINK-2423, this is clearly an issue, and the only question here is
whether my solution solves it or not. I think it is fair to say that this
is a trivial fix for the issue, but in any case it should be tested. I had
two options: fix it without a test, or fix it and add a test. Unfortunately I
did not have time to add a test because I was busy with other things, but I
did not want to leave that bug in. We can revert the commit, but that will
still not give me time to write the test, so the bug will potentially remain
there for long (as Robert indicated he doesn't have time for it either).

As for FLINK-2419, I agree that I could have added a simple test which
would not have taken me long. The only reason I did this is that I am
testing this functionality in another PR. I understand if you want to
revert this; I can open a PR with the simple test added.

Would it have been better if I had not addressed the first issue at all,
given that I don't have time to write a proper test? Then that issue would
have been lingering in a core functionality for who knows how long.

I would like to clearly understand what is expected in this situation.

Gyula


Kostas Tzoumas  ezt írta (időpont: 2015. júl. 28., K,
20:48):

> I am not familiar with this part of the code, but this is perhaps a good
> thing, as this is a matter of policy, not who introduced which bug (I
> suspect that the policy issue was Robert's motivation for starting a thread
> at the dev list)
>
> So, I think we have two issues:
>
> (1) Pull request https://github.com/apache/flink/pull/895 was merged
> without addressing Gyula's comment.
>
> (2) Commit
>
> https://github.com/apache/flink/commit/78fd2146dd00da1130910d9f23f09e2504854ef7
> was
> merged but consensus was not reached.
>
> Let's keep the two issues separate, as tracing back whose bug a PR is
> fixing (recursively :-) will not lead anywhere.
>
> Now, back to the original question: I think that commits should be subject
> to consensus in a similar way as PRs. The right to commit does not mean
> that consensus should not be reached, and this is a clear case of not
> having consensus.
>
> Kostas
>
>
> On Tue, Jul 28, 2015 at 8:30 PM, Gyula Fóra  wrote:
>
> > What concerns me here is that for FLINK-2419 I clearly indicated that
> there
> > is a test in my other PR, and since the fix was actually trivial, which
> > didn't break the current functionality according my test, I wanted to
> push
> > it in before my PR because that is pending on something else. I could
> have
> > added a test here that is true.
> >
> > With FLINK-2423 I was fixing some else's mistake who disregarded my
> message
> > when merging a PR. We could now revert that PR that introduced that bug,
> > but instead we are reverting my fix for that mistake.
> >
> >
> >
> >
> > Gyula Fóra  ezt írta (időpont: 2015. júl. 28., K,
> > 20:19):
> >
> > > Hey,
> > >
> > > I am sorry that you feel bad about this, I only did not add a test case
> > > for FLINK-2419 because I am adding a test in my upcoming PR which
> > > verified the behaviour.
> > >
> > > As for FLINK-2423, it is actually very bad that issue is still there.
> You
> > > introduced this in your PR https://github.com/apache/flink/pull/895
> > which
> > > I commented but no one fixed it before merging. As developing a test
> > takes
> > > quite much time here as it is tricky, I wanted to push the fix, which
> was
> > > in fact trivial.
> > >
> > > Regards,
> > > Gyula
> > >
> > > Robert Metzger  ezt írta (időpont: 2015. júl.
> 28.,
> > > K, 20:01):
> > >
> > >> Hi,
> > >>
> > >> I'm a bit unhappy how we were handling
> > >> https://issues.apache.org/jira/browse/FLINK-2419 today.
> > >>
> > >> I raised a concern in the JIRA because the commit for the fix didn't
> > >> contain any tests. Our coding guidelines [1] imply that every feature
> > >> should have tests. Apparently there were not enough tests for the two
> > bugs
> > >> fixed with commit 78fd2146dd.
> > >>
> > >> Also, Gyula's answer sounds like he is not willing to add tests right
> > now.
> > >>
> > >> I can not remember if we ever reverted a commit in the Flink
> community,
> > >> but
> > >> in my understanding, this is how ASF projects are doing lazy consensus
> > for
> &g

Re: Revert 78fd2146dd until we have consensus for FLINK-2419?

2015-07-28 Thread Gyula Fóra
Hey,

I think there is no reason for making a more serious issue out of this than
it already is :)

I have opened a pull request that adds the missing test for FLINK-2419:
https://github.com/apache/flink/pull/947
There everyone can verify that my commit has actually fixed the problem.
This should have been included in the commit itself.

If the community decides so, I am comfortable with reverting the commit, in
which case I will add the solution to FLINK-2419 to the PR mentioned above
and can be merged as one commit. Unfortunately I do not have time to
develop a thorough test for FLINK-2423 so in case we revert the commit I
have to exclude the bugfix for that issue, and I encourage anyone to pick
up that test case (https://issues.apache.org/jira/browse/FLINK-2423).

Regards,
Gyula

Kostas Tzoumas  ezt írta (időpont: 2015. júl. 28., K,
21:36):

> On Tue, Jul 28, 2015 at 9:09 PM, Gyula Fóra  wrote:
>
> > I agree that consensus should be reached in all changes to the system.
> >
> >
> Then Robert and you should reach consensus on FLINK-2419.
>
>
> > What is not clear to me is what is the subject of consensus in this case.
> >
> > As for FLINK-2423, this is clearly an issue, and the only question here
> is
> > whether my solution solves it or not. I think it is fair to say that this
> > is a trivial fix for this issue, but in any case it should be tested. I
> had
> > two options: fix it without a test, fix it and add a test. Unfortunately
> I
> > did not have time to add a test because I was busy with other things but
> I
> > did not want to leave that bug in. We can revert the commit back which
> will
> > still not give me time to write the test so the bug will potentially
> remain
> > there for long (as Robert indicated he doesnt have time for it either).
> >
> > As for FLINK-2419, I agree that I could have added a simple test which
> > would not have taken me long. The only reason I did this because I am
> > testing this functionality in another PR. I understand if you want to
> > revert this I can open a PR with the simple test added.
> >
> > Would it have been better if I did not address the first issue if I dont
> > have a time to write a proper test? Then that issue would have been
> > lingering there in a core functionality for who knows how long.
> >
> > I would like to clearly understand what is expected in this situation.
> >
> >
> Well, we get to define what is expected, that's the fun of being open
> source :-) In my opinion it is better to provide a well tested fix later
> than a potentially sloppy fix earlier.
>
>
>
> > Gyula
> >
> >
> > Kostas Tzoumas  ezt írta (időpont: 2015. júl. 28.,
> K,
> > 20:48):
> >
> > > I am not familiar with this part of the code, but this is perhaps a
> good
> > > thing, as this is a matter of policy, not who introduced which bug (I
> > > suspect that the policy issue was Robert's motivation for starting a
> > thread
> > > at the dev list)
> > >
> > > So, I think we have two issues:
> > >
> > > (1) Pull request https://github.com/apache/flink/pull/895 was merged
> > > without addressing Gyula's comment.
> > >
> > > (2) Commit
> > >
> > >
> >
> https://github.com/apache/flink/commit/78fd2146dd00da1130910d9f23f09e2504854ef7
> > > was
> > > merged but consensus was not reached.
> > >
> > > Let's keep the two issues separate, as tracing back whose bug a PR is
> > > fixing (recursively :-) will not lead anywhere.
> > >
> > > Now, back to the original question: I think that commits should be
> > subject
> > > to consensus in a similar way as PRs. The right to commit does not mean
> > > that consensus should not be reached, and this is a clear case of not
> > > having consensus.
> > >
> > > Kostas
> > >
> > >
> > > On Tue, Jul 28, 2015 at 8:30 PM, Gyula Fóra 
> > wrote:
> > >
> > > > What concerns me here is that for FLINK-2419 I clearly indicated that
> > > there
> > > > is a test in my other PR, and since the fix was actually trivial,
> which
> > > > didn't break the current functionality according my test, I wanted to
> > > push
> > > > it in before my PR because that is pending on something else. I could
> > > have
> > > > added a test here that is true.
> > > >
> > > > With FLINK-2423 I was fixing some else's mistake who disregarded my
> > > message
> > > > when merging a PR. W

Re: On some GUI tools for building Flink Streaming data flows...

2015-07-29 Thread Gyula Fóra
Hi Slim,

I totally agree with you that we should start working on supporting tools
like these.

StreamFlow is a really nice tool, I like it a lot. I think it would not be
too hard to integrate it with Flink, as there are several projects that have
already used Flink streaming in a compositional way (SAMOA, or the Storm
compatibility layer). So I guess this is just a matter of someone picking it
up at some point, but I am really looking forward to it and will support
anyone doing it :)

As for SPQR and the dynamic pipelines, I don't think we have this on the
short-term roadmap, but it would indeed be a nice feature. I don't see any
system limitations that would keep us from implementing this either, once we
start working on dynamic modifications to the execution graph (such as for
automatic scaling).

Cheers,
Gyula

Slim Baltagi  ezt írta (időpont: 2015. júl. 30., Cs,
5:50):

> Hi
>
> As Flink streaming API is being stabilized, it is worth investigating
> tooling around it to make developer's life easier and also turning Flink
> streaming a more attractive stream processing platform.
> I would like to suggest a couple tools which are worth integrating with
> Flink streaming:
>
> 1. 'StreamFlow™ is a stream processing tool designed to help build and
> monitor processing workflows. The ultimate goal of StreamFlow is to make
> working with stream processing frameworks such as Apache Storm easier,
> faster, and with "enterprise" like management functionality.
>
> StreamFlow also provides a mechanism for non-developers such as data
> scientists, analysts, or operational users to rapidly build scalable data
> flows and analytics.'
>
> Reference:https://github.com/lmco/streamflow The topology builder looks
> really slick!
>
> Any chance of seeing StreamFlow a GUI for building Flink streaming data
> flows?
>
> 2. SPQR pronounced Spooker is a framework for building *dynamic * data
> stream processing pipelines: a feature that is neither available in Apache
> Storm nor in Spark Streaming.
> https://github.com/ottogroup/SPQR
>
> It might be worth investigating SPQR, integrating it with Flink and make it
> production ready.
>
> What are your thoughts?
>
> Thanks
>
> Slim Baltagi
>
>
>
> --
> View this message in context:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/On-some-GUI-tools-for-building-Flink-Streaming-data-flows-tp7233.html
> Sent from the Apache Flink Mailing List archive. mailing list archive at
> Nabble.com.
>


Re: Guide/design doc for streaming operator states

2015-07-30 Thread Gyula Fóra
Thanks for the feedback :)

My idea when I wrote that was that you can chain keyBy statements to
maintain order if your key does not change. Otherwise you are right, we
need a sorting operator.

Gyula
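
A small sketch of that chained-keyBy pattern (Scala API; Event and the map
functions are made up for the example):

import org.apache.flink.streaming.api.scala._

case class Event(userId: String, value: Long)

// Because both keyBy calls use the same key and the maps do not change it,
// every record of a given userId stays in the same parallel instance, so the
// second operator sees that key's records in the order the first one emitted
// them.
def pipeline(events: DataStream[Event]): DataStream[Event] =
  events
    .keyBy(_.userId)
    .map(e => e.copy(value = e.value + 1)) // key unchanged
    .keyBy(_.userId)
    .map(e => e.copy(value = e.value * 2))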

Aljoscha Krettek  ezt írta (időpont: 2015. júl. 30.,
Cs, 13:18):

> Hi,
> sorry for the long wait but I finally found the time to read it. It looks
> good but the later parts of course still need to be fleshed out.
>
> I have one comments/questions:
> In the description of partitioned state you have this sentence: "Operations
> using partitioned state can also benefit from the partial ordering
> guarantees that the flink runtime provides, to implement deterministic
> behaviour." How do we provide the ordering guarantees. I would assume that
> after a keyBy() the ordering in each partition is arbitrary, unless we add
> some sorting operator.
>
> Cheers,
> Aljoscha
>
> On Thu, 23 Jul 2015 at 15:55 Gyula Fóra  wrote:
>
> > Hey!
> > I started putting together a guide/design document for the streaming
> > operator state interfaces and implementations. The idea would be to
> create
> > a doc that contains all the details about the implementations so anyone
> can
> > use it as a reference later.
> >
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/Stateful+Stream+Processing
> >
> > It will probably take me a couple of days to finish it, but in any case
> > feel free to comment.
> >
> > Cheers,
> > Gyula
> >
>


Types in the Python API

2015-07-30 Thread Gyula Fóra
Hey!

Could anyone briefly tell me what exactly is the reason why we force the
users in the Python API to declare types for operators?

I don't really understand how this works in different systems but I am just
curious why Flink has types and why Spark doesn't for instance.

If you give me some pointers to read that would also be fine :)

Thank you,
Gyula


Re: Types in the Python API

2015-07-30 Thread Gyula Fóra
That I understand, but could you please tell me how this is done
differently in Spark, for instance?

What would we need to change to make this work with pure Python (as it
seems to be possible)? This probably has large performance implications,
though.

Gyula

Chesnay Schepler  ezt írta (időpont: 2015. júl. 30., Cs,
22:04):

> because it still goes through the Java API that requires some kind of
> type information. imagine a java api program where you omit all generic
> types, it just wouldn't work as of now.
>
> On 30.07.2015 21:17, Gyula Fóra wrote:
> > Hey!
> >
> > Could anyone briefly tell me what exactly is the reason why we force the
> > users in the Python API to declare types for operators?
> >
> > I don't really understand how this works in different systems but I am
> just
> > curious why Flink has types and why Spark doesn't for instance.
> >
> > If you give me some pointers to read that would also be fine :)
> >
> > Thank you,
> > Gyula
> >
>
>


Re: Question About "Preserve Partitioning" in Stream Iteration

2015-07-31 Thread Gyula Fóra
Hey,

I am not sure what the intuitive behaviour is here. As you are not applying
a transformation on the feedback stream but passing it to a closeWith method,
I thought it was somehow natural that it gets the partitioning of the
iteration input, but maybe it's not intuitive.

If others also think that preserving the feedback partitioning should be the
default, I am not against it :)

Btw, this still won't make it very simple. We still need as many source/sink
pairs as there are distinct parallelisms among the head operators. Otherwise
the forwarding logic won't work.

Cheers,
Gyula
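
A short sketch of the two options with the Scala API's iterate() (where the
step function plays the role of closeWith(); treat the exact calls as
illustrative):

import org.apache.flink.streaming.api.scala._

def loop(input: DataStream[Long]): DataStream[Long] =
  input.iterate { it =>
    val stepped  = it.map(_ - 1)
    // Default: the feedback keeps the partitioning of the iteration input.
    // Explicit: e.g. .shuffle on the feedback chooses the partitioning at the
    // tail, which is what "preserve partitioning" would forward to the head.
    val feedback = stepped.filter(_ > 0).shuffle
    val output   = stepped.filter(_ <= 0)
    (feedback, output)
  }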

Aljoscha Krettek  ezt írta (időpont: 2015. júl. 31.,
P, 11:52):

> Hi,
> I'm currently working on making the StreamGraph generation more centralized
> (i.e. not spread across the different API classes). The question is now why
> we need to switch to preserve partitioning? Could we not make "preserve"
> partitioning the default and if users want to have shuffle partitioning or
> anything they have to specify it manually when adding the feedback edge?
>
> This would make for a very simple scheme where the iteration sources are
> always connected to the heads using "forward" and the tails are connected
> to the iteration sinks using whatever partitioner was set by the user. This
> would make it more transparent than the current default of the "shuffle"
> betweens tails and iteration sinks.
>
> Cheers,
> Aljoscha
>
> P.S. I now we had quite some discussion about introducing "preserve
> partitioning" but now, when I think of it it should be the default... :D
>


Re: Types in the Python API

2015-07-31 Thread Gyula Fóra
t;>>> Maybe it's overly simplistic, but it might work. :D
> > >>>>>
> > >>>>> On Thu, 30 Jul 2015 at 23:35 Chesnay Schepler 
> > >>> wrote:
> > >>>>> I can see this working for basic types, but am unsure how it would
> > >> work
> > >>>>>> with Tuples. Wouldn't the java API still need to know the arity to
> > >>> setup
> > >>>>>> serializers?
> > >>>>>>
> > >>>>>> On 30.07.2015 23:02, Aljoscha Krettek wrote:
> > >>>>>>
> > >>>>>>> I believe it should be possible to create a special
> PythonTypeInfo
> > >>> where
> > >>>>>>> the python side is responsible for serializing data to a byte
> array
> > >>> and
> > >>>>>> to
> > >>>>>>
> > >>>>>>> the java side it is just a byte array and all the comparisons are
> > >> also
> > >>>>>>> performed on these byte arrays. I think partitioning and sort
> > should
> > >>>>>>>
> > >>>>>> still
> > >>>>>>
> > >>>>>>> work, since the sorting is (in most cases) only used to group the
> > >>>>>>>
> > >>>>>> elements
> > >>>>>>
> > >>>>>>> for a groupBy(). If proper sort order would be required this
> would
> > >>> have
> > >>>>>> to
> > >>>>>>
> > >>>>>>> be done on the python side.
> > >>>>>>>
> > >>>>>>> On Thu, 30 Jul 2015 at 22:21 Chesnay Schepler  >
> > >>>>>>> wrote:
> > >>>>>>>
> > >>>>>>> To be perfectly honest i never really managed to work my way
> > through
> > >>>>>>>> Spark's python API, it's a whole bunch of magic to me; not even
> > the
> > >>>>>>>> general structure is understandable.
> > >>>>>>>>
> > >>>>>>>> With "pure python" do you mean doing everything in python? as in
> > >> just
> > >>>>>>>> having serialized data on the java side?
> > >>>>>>>>
> > >>>>>>>> I believe the way to do this with Flink is to add a switch that
> > >>>>>>>> a) disables all type checks
> > >>>>>>>> b) creates serializers dynamically at runtime.
> > >>>>>>>>
> > >>>>>>>> a) should be fairly straight forward, b) on the other hand
> > >>>>>>>>
> > >>>>>>>> btw., the Python API itself doesn't require the type
> information,
> > >> it
> > >>>>>>>> already does the b part.
> > >>>>>>>>
> > >>>>>>>> On 30.07.2015 22:11, Gyula Fóra wrote:
> > >>>>>>>>
> > >>>>>>>>> That I understand, but could you please tell me how is this
> done
> > >>>>>>>>> differently in Spark for instance?
> > >>>>>>>>>
> > >>>>>>>>> What would we need to change to make this work with pure python
> > >> (as
> > >>> it
> > >>>>>>>>> seems to be possible)? This probably have large performance
> > >>>>>>>>>
> > >>>>>>>> implications
> > >>>>>>> though.
> > >>>>>>>>> Gyula
> > >>>>>>>>>
> > >>>>>>>>> Chesnay Schepler  ezt írta (időpont: 2015.
> > >> júl.
> > >>>>>>>> 30.,
> > >>>>>>> Cs,
> > >>>>>>>>> 22:04):
> > >>>>>>>>>
> > >>>>>>>>> because it still goes through the Java API that requires some
> > kind
> > >>> of
> > >>>>>>>>>> type information. imagine a java api program where you omit
> all
> > >>>>>>>>>>
> > >>>>>>>>> generic
> > >>>>>>> types, it just wouldn't work as of now.
> > >>>>>>>>>> On 30.07.2015 21:17, Gyula Fóra wrote:
> > >>>>>>>>>>
> > >>>>>>>>>>> Hey!
> > >>>>>>>>>>>
> > >>>>>>>>>>> Could anyone briefly tell me what exactly is the reason why
> we
> > >>> force
> > >>>>>>>>>> the
> > >>>>>>>>> users in the Python API to declare types for operators?
> > >>>>>>>>>>> I don't really understand how this works in different systems
> > >> but
> > >>> I
> > >>>>>>>>>> am
> > >>>>>>> just
> > >>>>>>>>>>> curious why Flink has types and why Spark doesn't for
> instance.
> > >>>>>>>>>>>
> > >>>>>>>>>>> If you give me some pointers to read that would also be fine
> :)
> > >>>>>>>>>>>
> > >>>>>>>>>>> Thank you,
> > >>>>>>>>>>> Gyula
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> >
> >
>
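
A conceptual sketch of the byte-array idea discussed above (plain Scala, not
Flink's actual PythonTypeInfo): the Java side never deserialises the Python
objects, it only sees serialized bytes, and grouping is done on those bytes.

// The Python side serializes key and value; the Java side treats both as
// opaque byte arrays.
final case class PyRecord(keyBytes: Array[Byte], valueBytes: Array[Byte]) {
  // Partition purely on the serialized key: equal Python keys have equal
  // bytes, so they land in the same partition without Java knowing the type.
  def keyGroup(numPartitions: Int): Int =
    java.lang.Math.floorMod(java.util.Arrays.hashCode(keyBytes), numPartitions)
}

// e.g. records.groupBy(_.keyGroup(parallelism)) is enough for groupBy-style
// semantics; a real sort order would have to be computed on the Python side.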


Re: Question About "Preserve Partitioning" in Stream Iteration

2015-07-31 Thread Gyula Fóra
I mean that the head operators have different parallelism:

IterativeDataStream ids = ...

ids.map().setParallelism(2)
ids.map().setParallelism(4)

//...

ids.closeWith(feedback)

Aljoscha Krettek  ezt írta (időpont: 2015. júl. 31.,
P, 14:23):

> I thought about having some tighter restrictions here. My idea was to
> enforce that the feedback edges must have the same parallelism as the
> original input stream, otherwise shipping strategies such as "keyBy",
> "shuffle", "rebalance" don't seem to make sense because they would differ
> from the distribution of the original elements (at least IMHO). Maybe I'm
> wrong there, though.
>
> To me it seems intuitive that I get the feedback at the head they way I
> specify it at the tail. But maybe that's also just me... :D
>
> On Fri, 31 Jul 2015 at 14:00 Gyula Fóra  wrote:
>
> > Hey,
> >
> > I am not sure what is the intuitive behaviour here. As you are not
> applying
> > a transformation on the feedback stream but pass it to a closeWith
> method,
> > I thought it was somehow nature that it gets the partitioning of the
> > iteration input, but maybe its not intuitive.
> >
> > If others also think that preserving feedback partitioning should be the
> > default I am not against it :)
> >
> > Btw, this still won't make it very simple. We still need as many
> > source/sink pairs as we have different parallelism among the head
> > operators. Otherwise the forwarding logic wont work.
> >
> > Cheers,
> > Gyula
> >
> > Aljoscha Krettek  ezt írta (időpont: 2015. júl.
> 31.,
> > P, 11:52):
> >
> > > Hi,
> > > I'm currently working on making the StreamGraph generation more
> > centralized
> > > (i.e. not spread across the different API classes). The question is now
> > why
> > > we need to switch to preserve partitioning? Could we not make
> "preserve"
> > > partitioning the default and if users want to have shuffle partitioning
> > or
> > > anything they have to specify it manually when adding the feedback
> edge?
> > >
> > > This would make for a very simple scheme where the iteration sources
> are
> > > always connected to the heads using "forward" and the tails are
> connected
> > > to the iteration sinks using whatever partitioner was set by the user.
> > This
> > > would make it more transparent than the current default of the
> "shuffle"
> > > betweens tails and iteration sinks.
> > >
> > > Cheers,
> > > Aljoscha
> > >
> > > P.S. I now we had quite some discussion about introducing "preserve
> > > partitioning" but now, when I think of it it should be the default...
> :D
> > >
> >
>


Re: Question About "Preserve Partitioning" in Stream Iteration

2015-07-31 Thread Gyula Fóra
I still don't get how it could possibly work, so let me tell you how I see
it, and correct me in my logic :)

You have this program:
ids.map1().setParallelism(2)
ids.map2().setParallelism(4)

//...

ids.closeWith(feedback.groupBy(0))

You are suggesting that we only have one iteration source/sink pair with
parallelism of either 2 or 4. I will assume that the parallelism is 2 for
the sake of the argument.

The iteration source is connected to map1 and map2 with Forward
partitioning and the sink is connected with groupBy(0).
Each sink instance will receive all tuples of a given key which also means
that each iteration source instance (2) will too.

Now here comes the problem: the source will forward the tuples to map 1, and
since we have a forward connection we maintain the groupBy semantics (this is
perfect). The sources will also forward to map 2, which has higher
parallelism, so the tuple sending turns into round robin, which screws up the
groupBy.

What did I miss?
Gyula
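
A tiny pure-Scala simulation of that argument: all records of one key reach a
single iteration-source instance (because of the groupBy on the feedback), but
forwarding from 2 sources to 4 map2 subtasks degenerates into round robin.

val keyARecords = (1 to 8).map(i => ("keyA", i))
// groupBy(0) on the feedback: every keyA record goes to the same source instance
val sourceInstance = Math.floorMod("keyA".hashCode, 2)
// forward from parallelism 2 to parallelism 4 turns into round robin:
val map2Subtasks = keyARecords.indices.map(i => i % 4).distinct
println(s"keyA: source $sourceInstance -> map2 subtasks $map2Subtasks")
// -> keyA is seen by map2 subtasks 0, 1, 2 and 3: the grouping is lost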

Aljoscha Krettek  ezt írta (időpont: 2015. júl. 31.,
P, 14:59):

> Yes, this would still work. For example, I have this crazy graph:
> http://postimg.org/image/xtv8ay8hv/full/ That results from this program:
> https://gist.github.com/aljoscha/45aaf62b2a7957cfafd5
>
> It works, and the implementation is very simple, actually.
>
> On Fri, 31 Jul 2015 at 14:30 Gyula Fóra  wrote:
>
> > I mean that the head operators have different parallelism:
> >
> > IterativeDataStream ids = ...
> >
> > ids.map().setParallelism(2)
> > ids.map().setParallelism(4)
> >
> > //...
> >
> > ids.closeWith(feedback)
> >
> > Aljoscha Krettek  ezt írta (időpont: 2015. júl.
> 31.,
> > P, 14:23):
> >
> > > I thought about having some tighter restrictions here. My idea was to
> > > enforce that the feedback edges must have the same parallelism as the
> > > original input stream, otherwise shipping strategies such as "keyBy",
> > > "shuffle", "rebalance" don't seem to make sense because they would
> differ
> > > from the distribution of the original elements (at least IMHO). Maybe
> I'm
> > > wrong there, though.
> > >
> > > To me it seems intuitive that I get the feedback at the head they way I
> > > specify it at the tail. But maybe that's also just me... :D
> > >
> > > On Fri, 31 Jul 2015 at 14:00 Gyula Fóra  wrote:
> > >
> > > > Hey,
> > > >
> > > > I am not sure what is the intuitive behaviour here. As you are not
> > > applying
> > > > a transformation on the feedback stream but pass it to a closeWith
> > > method,
> > > > I thought it was somehow nature that it gets the partitioning of the
> > > > iteration input, but maybe its not intuitive.
> > > >
> > > > If others also think that preserving feedback partitioning should be
> > the
> > > > default I am not against it :)
> > > >
> > > > Btw, this still won't make it very simple. We still need as many
> > > > source/sink pairs as we have different parallelism among the head
> > > > operators. Otherwise the forwarding logic wont work.
> > > >
> > > > Cheers,
> > > > Gyula
> > > >
> > > > Aljoscha Krettek  ezt írta (időpont: 2015. júl.
> > > 31.,
> > > > P, 11:52):
> > > >
> > > > > Hi,
> > > > > I'm currently working on making the StreamGraph generation more
> > > > centralized
> > > > > (i.e. not spread across the different API classes). The question is
> > now
> > > > why
> > > > > we need to switch to preserve partitioning? Could we not make
> > > "preserve"
> > > > > partitioning the default and if users want to have shuffle
> > partitioning
> > > > or
> > > > > anything they have to specify it manually when adding the feedback
> > > edge?
> > > > >
> > > > > This would make for a very simple scheme where the iteration
> sources
> > > are
> > > > > always connected to the heads using "forward" and the tails are
> > > connected
> > > > > to the iteration sinks using whatever partitioner was set by the
> > user.
> > > > This
> > > > > would make it more transparent than the current default of the
> > > "shuffle"
> > > > > betweens tails and iteration sinks.
> > > > >
> > > > > Cheers,
> > > > > Aljoscha
> > > > >
> > > > > P.S. I now we had quite some discussion about introducing "preserve
> > > > > partitioning" but now, when I think of it it should be the
> default...
> > > :D
> > > > >
> > > >
> > >
> >
>


Re: Question About "Preserve Partitioning" in Stream Iteration

2015-07-31 Thread Gyula Fóra
There might be reasons why a user would want different parallelism at the
head operators (depending on what else that head operator might process), so
restricting them to the same parallelism is a little bit weird, don't you
think? It kind of goes against the whole operator-parallelism idea.

I don't think it's a huge complexity to group the head operators together by
parallelism and add a source/sink pair per group, like we do now. What do
you say?
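
A quick sketch of that grouping (names are placeholders, not actual Flink
classes): one iteration source/sink pair per distinct head parallelism, so the
forward connections from each source stay one-to-one.

case class Head(name: String, parallelism: Int)

val heads = Seq(Head("map1", 2), Head("map2", 4))
val headsByParallelism = heads.groupBy(_.parallelism)
// Map(2 -> List(Head(map1,2)), 4 -> List(Head(map2,4)))
// => create one iteration source/sink pair with the matching parallelism per entry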

Aljoscha Krettek  ezt írta (időpont: 2015. júl. 31.,
P, 17:10):

> Yes, I'm not saying that it makes sense to do it, I'm just saying that it
> does translate and run. Your observation is true. :D
>
> I'm wondering whether it makes sense to allow users to have iteration heads
> with differing parallelism, in fact.
>
> On Fri, 31 Jul 2015 at 16:40 Gyula Fóra  wrote:
>
> > I still don't get how it could possibly work, let me tell you how I see
> and
> > correct me in my logic :)
> >
> > You have this program:
> > ids.map1().setParallelism(2)
> > ids.map2().setParallelism(4)
> >
> > //...
> >
> > ids.closeWith(feedback.groupBy(0))
> >
> > You are suggesting that we only have one iteration source/sink pair with
> > parallelism of either 2 or 4. I will assume that the parallelism is 2 for
> > the sake of the argument.
> >
> > The iteration source is connected to map1 and map2 with Forward
> > partitioning and the sink is connected with groupBy(0).
> > Each sink instance will receive all tuples of a given key which also
> means
> > that each iteration source instance (2) will too.
> >
> > Now here comes the problem: the source will forward the tuples to map 1
> and
> > since we have forward connection we maintiain the groupby semantics (this
> > is perfect.)  the sources will also forward to map 2 which has higher
> > parallelism so the tuple sending turns into round robin, which screws up
> > the groupby.
> >
> > What did I miss?
> > Gyula
> >
> > Aljoscha Krettek  ezt írta (időpont: 2015. júl.
> 31.,
> > P, 14:59):
> >
> > > Yes, this would still work. For example, I have this crazy graph:
> > > http://postimg.org/image/xtv8ay8hv/full/ That results from this
> program:
> > > https://gist.github.com/aljoscha/45aaf62b2a7957cfafd5
> > >
> > > It works, and the implementation is very simple, actually.
> > >
> > > On Fri, 31 Jul 2015 at 14:30 Gyula Fóra  wrote:
> > >
> > > > I mean that the head operators have different parallelism:
> > > >
> > > > IterativeDataStream ids = ...
> > > >
> > > > ids.map().setParallelism(2)
> > > > ids.map().setParallelism(4)
> > > >
> > > > //...
> > > >
> > > > ids.closeWith(feedback)
> > > >
> > > > Aljoscha Krettek  ezt írta (időpont: 2015. júl.
> > > 31.,
> > > > P, 14:23):
> > > >
> > > > > I thought about having some tighter restrictions here. My idea was
> to
> > > > > enforce that the feedback edges must have the same parallelism as
> the
> > > > > original input stream, otherwise shipping strategies such as
> "keyBy",
> > > > > "shuffle", "rebalance" don't seem to make sense because they would
> > > differ
> > > > > from the distribution of the original elements (at least IMHO).
> Maybe
> > > I'm
> > > > > wrong there, though.
> > > > >
> > > > > To me it seems intuitive that I get the feedback at the head they
> > way I
> > > > > specify it at the tail. But maybe that's also just me... :D
> > > > >
> > > > > On Fri, 31 Jul 2015 at 14:00 Gyula Fóra  wrote:
> > > > >
> > > > > > Hey,
> > > > > >
> > > > > > I am not sure what is the intuitive behaviour here. As you are
> not
> > > > > applying
> > > > > > a transformation on the feedback stream but pass it to a
> closeWith
> > > > > method,
> > > > > > I thought it was somehow nature that it gets the partitioning of
> > the
> > > > > > iteration input, but maybe its not intuitive.
> > > > > >
> > > > > > If others also think that preserving feedback partitioning should
> > be
> > > > the
> > > > > > default I am not against it :)
> > > > > >
> > > > > > Btw, this stil

Re: Question About "Preserve Partitioning" in Stream Iteration

2015-07-31 Thread Gyula Fóra
Maybe you can reuse some of the logic that is currently there on the
StreamGraph: build the StreamLoops first, and use them to generate the
sources and sinks right before building the JobGraph. This avoids the need
to know everything beforehand.

I actually added this to avoid the complexities that you are probably facing
now.

Aljoscha Krettek  ezt írta (időpont: 2015. júl. 31.,
P, 17:28):

> Sure it can be done, it's just more complex if you try to do it in a sane
> way without having the code that builds the StreamGraph all over the place.
> :D
>
> I'll try to come up with something. This is my current work in progress, by
> the way: https://github.com/aljoscha/flink/tree/stream-api-rework
>
> I managed to ban the StreamGraph from StreamExecutionEnvironment and the
> API classes such as DataStream. The API methods construct a Graph of
> Transformation Nodes and don't contain any information themselves. Then
> there is a StreamGraphGenerator that builds a StreamGraph from the
> transformations. The abstraction is very nice and simple, the only problem
> that remains are the differing-parallelism-iterations but I'll figure them
> out.
>
> P.S. The code is not well documented yet, but the base class for
> transformations is StreamTransformation. From there anyone who want's to
> check it out can find the other transformations.
>
> On Fri, 31 Jul 2015 at 17:17 Gyula Fóra  wrote:
>
> > There might be reasons why a user would want different parallelism at the
> > head operators (depending on what else that head operator might process)
> so
> > restricting them to the same parallelism is a little bit weird don't you
> > think? It kind of goes against the whole opeartors-parallelism idea.
> >
> > I don't think its a huge complexity to group head operators together by
> > parallelism and add a source/sink per each group like we do now. What do
> > you say?
> >
> > Aljoscha Krettek  ezt írta (időpont: 2015. júl.
> 31.,
> > P, 17:10):
> >
> > > Yes, I'm not saying that it makes sense to do it, I'm just saying that
> it
> > > does translate and run. Your observation is true. :D
> > >
> > > I'm wondering whether it makes sense to allow users to have iteration
> > heads
> > > with differing parallelism, in fact.
> > >
> > > On Fri, 31 Jul 2015 at 16:40 Gyula Fóra  wrote:
> > >
> > > > I still don't get how it could possibly work, let me tell you how I
> see
> > > and
> > > > correct me in my logic :)
> > > >
> > > > You have this program:
> > > > ids.map1().setParallelism(2)
> > > > ids.map2().setParallelism(4)
> > > >
> > > > //...
> > > >
> > > > ids.closeWith(feedback.groupBy(0))
> > > >
> > > > You are suggesting that we only have one iteration source/sink pair
> > with
> > > > parallelism of either 2 or 4. I will assume that the parallelism is 2
> > for
> > > > the sake of the argument.
> > > >
> > > > The iteration source is connected to map1 and map2 with Forward
> > > > partitioning and the sink is connected with groupBy(0).
> > > > Each sink instance will receive all tuples of a given key which also
> > > means
> > > > that each iteration source instance (2) will too.
> > > >
> > > > Now here comes the problem: the source will forward the tuples to
> map 1
> > > and
> > > > since we have forward connection we maintiain the groupby semantics
> > (this
> > > > is perfect.)  the sources will also forward to map 2 which has higher
> > > > parallelism so the tuple sending turns into round robin, which screws
> > up
> > > > the groupby.
> > > >
> > > > What did I miss?
> > > > Gyula
> > > >
> > > > Aljoscha Krettek  ezt írta (időpont: 2015. júl.
> > > 31.,
> > > > P, 14:59):
> > > >
> > > > > Yes, this would still work. For example, I have this crazy graph:
> > > > > http://postimg.org/image/xtv8ay8hv/full/ That results from this
> > > program:
> > > > > https://gist.github.com/aljoscha/45aaf62b2a7957cfafd5
> > > > >
> > > > > It works, and the implementation is very simple, actually.
> > > > >
> > > > > On Fri, 31 Jul 2015 at 14:30 Gyula Fóra 
> > wrote:
> > > > >
> > > > > > I mean that the head operators have differen

Re: Question about DataStream class hierarchy

2015-07-31 Thread Gyula Fóra
Hi Matthias,

I think Aljoscha is preparing a nice PR that completely reworks the
DataStream classes and the information they actually contain. I don't think
it's a good idea to mess things up before he gets a chance to open the PR.

Also, I don't see a well-supported reason for moving the setParallelism,
setName, etc. methods to DataStream, as these are specific things that you
can only set on operators. The KeyedDataStream, on the other hand, is not
an operator.

Can we just wait a little bit for Aljoscha with this? If you really need
his changes, you can fork his branch and we can consider your changes after
merging his.

Regards,
Gyula



Matthias J. Sax  ezt írta (időpont: 2015.
júl. 31., P, 21:57):

> Hi,
>
> I would like to apply the following changes to DataStream class
> hierarchy:
> https://github.com/mjsax/flink/tree/flink-2306-storm-namedStreams
>
> Please give some feedback if those changes are reasonable to you.
>
> I need those change to get a clean design for
> https://issues.apache.org/jira/browse/FLINK-2306
>
>
> -Matthias
>
>
>
> On 07/29/2015 12:07 PM, Matthias J. Sax wrote:
> > What is the expected time frame for your work? I don't want to delay my
> > work too long (if I base it on your branch, it could not be merged
> > before yours).
> >
> > Right now, you did not change the class hierarchy. However, that is what
> > I would need. Thus, it makes no sense to use your branch as a base right
> > now. What are your plans about this?
> >
> > -> one side comment: would it make sense to make DataStream abstract?
> >
> > From my point of view, it makes the most sense to me that I apply the
> > changes I need in my PR directly (based on master).
> >
> > -Matthias
> >
> >
> > On 07/29/2015 08:11 AM, Aljoscha Krettek wrote:
> >> Right now it's mostly under-the-hood changes but you can look at the
> >> progress here: https://github.com/aljoscha/flink/tree/stream-api-rework
> >>
> >> The commit is going to change, so if you do put your work on top of it
> you
> >> might have to rebase.
> >>
> >> On Wed, 29 Jul 2015 at 07:26 Matthias J. Sax <
> mj...@informatik.hu-berlin.de>
> >> wrote:
> >>
> >>> My current work depends on a clean design of those. Otherwise, my own
> >>> code would get very messy. I would like to apply some changes in my own
> >>> PR (not opened yet). Do you think this is feasible? I don't want to get
> in
> >>> a messy state. What kind of changes are you going to apply in
> FLINK-2398?
> >>>
> >>> -Matthias
> >>>
> >>>
> >>> On 07/28/2015 10:30 PM, Aljoscha Krettek wrote:
>  Yes, very good points. I think we will be fixing these when we do the
> API
>  cleanups that we discussed on the wiki design docs. In fact, the work
> I'm
>  doing on https://issues.apache.org/jira/browse/FLINK-2398 can be
> seen as
>  preparation for making these changes possible/easier.
> 
>  On Tue, 28 Jul 2015 at 21:56 Matthias J. Sax <
> >>> mj...@informatik.hu-berlin.de>
>  wrote:
> 
> > Hi,
> >
> > I am a little bit confused about the class hierarchy of DataStream.
> It
> > has three subclasses: KeyedDataStream, SingleOutputStreamOperator,
> and
> > SplitDataStream.
> >
> > 1) Why is the name "SingleOutputStreamOperator" (why OPERATOR ??)
> >
> > 2) Is it correct that a SplitDataStream emits multiple logical output
> > streams, while SingleOutputStreamOperator and KeyedDataStream emit a
> > single logical output stream?
> >=> If yes, why is a KeyedDataStream not a subclass of
> > SingleOutputStreamOperator ?
> >
> > 3)
> >   a) Why does only SingleOutputStreamOperator have the method
> >>> name()/getName()?
> >   b) Why does only SingleOutputStreamOperator have the method
> >>> setParallelism()?
> >   c) Should those methods be members of DataStream instead?
> >
> >
> >
> > -Matthias
> >
> >
> 
> >>>
> >>>
> >>
> >
>
>


Re: Question About "Preserve Partitioning" in Stream Iteration

2015-08-02 Thread Gyula Fóra
In a streaming program when we create an IterativeDataStream, we
practically mark the union point of some later feedback stream (the one
passed in to closeWith(..)).

The operators applied on this IterativeDataStream will receive the feedback
input as well. We call the operators applied on the iterative dataStream
head operators. We call the operators that produce the streams passed into
closeWith tail operators. With this terminology we can have many heads and
tails with varying parallelism.
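
For example (a minimal Java sketch; MyMapper and the parallelism values are
just placeholders): the two maps below are heads with parallelism 2 and 4,
and since their union forms the feedback they are the tails as well.

IterativeDataStream<Long> it = input.iterate();

// both maps consume the feedback (heads) and produce the feedback (tails)
DataStream<Long> headTail1 = it.map(new MyMapper()).setParallelism(2);
DataStream<Long> headTail2 = it.map(new MyMapper()).setParallelism(4);

it.closeWith(headTail1.union(headTail2));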

Stephan Ewen  ezt írta (időpont: 2015. aug. 2., V, 20:16):

> I don't get the discussion here, can you help me with what you mean by
> "different iteration heads and tails" ?
>
> An iteration does not have one parallel head and one parallel tail?
>
> On Fri, Jul 31, 2015 at 6:52 PM, Gyula Fóra  wrote:
>
> > Maybe you can reuse some of the logic that is currently there on the
> > StreamGraph, with building StreamLoops first which will be used to
> generate
> > the sources and sinks right before building the JobGraph. This avoids the
> > need of knowing everything beforehand.
> >
> > I actually added this to avoid the complexities that you are probably
> > facing now.
> >
> > Aljoscha Krettek  ezt írta (időpont: 2015. júl.
> 31.,
> > P, 17:28):
> >
> > > Sure it can be done, it's just more complex if you try to do it in a
> sane
> > > way without having the code that builds the StreamGraph all over the
> > place.
> > > :D
> > >
> > > I'll try to come up with something. This is my current work in
> progress,
> > by
> > > the way: https://github.com/aljoscha/flink/tree/stream-api-rework
> > >
> > > I managed to ban the StreamGraph from StreamExecutionEnvironment and
> the
> > > API classes such as DataStream. The API methods construct a Graph of
> > > Transformation Nodes and don't contain any information themselves. Then
> > > there is a StreamGraphGenerator that builds a StreamGraph from the
> > > transformations. The abstraction is very nice and simple, the only
> > problem
> > > that remains are the differing-parallelism-iterations but I'll figure
> > them
> > > out.
> > >
> > > P.S. The code is not well documented yet, but the base class for
> > > transformations is StreamTransformation. From there anyone who wants
> to
> > > check it out can find the other transformations.
> > >
> > > On Fri, 31 Jul 2015 at 17:17 Gyula Fóra  wrote:
> > >
> > > > There might be reasons why a user would want different parallelism at
> > the
> > > > head operators (depending on what else that head operator might
> > process)
> > > so
> > > > restricting them to the same parallelism is a little bit weird don't
> > you
> > > > think? It kind of goes against the whole operators-parallelism idea.
> > > >
> > > > I don't think its a huge complexity to group head operators together
> by
> > > > parallelism and add a source/sink per each group like we do now. What
> > do
> > > > you say?
> > > >
> > > > Aljoscha Krettek  ezt írta (időpont: 2015. júl.
> > > 31.,
> > > > P, 17:10):
> > > >
> > > > > Yes, I'm not saying that it makes sense to do it, I'm just saying
> > that
> > > it
> > > > > does translate and run. Your observation is true. :D
> > > > >
> > > > > I'm wondering whether it makes sense to allow users to have
> iteration
> > > > heads
> > > > > with differing parallelism, in fact.
> > > > >
> > > > > On Fri, 31 Jul 2015 at 16:40 Gyula Fóra 
> > wrote:
> > > > >
> > > > > > I still don't get how it could possibly work, let me tell you
> how I
> > > see
> > > > > and
> > > > > > correct me in my logic :)
> > > > > >
> > > > > > You have this program:
> > > > > > ids.map1().setParallelism(2)
> > > > > > ids.map2().setParallelism(4)
> > > > > >
> > > > > > //...
> > > > > >
> > > > > > ids.closeWith(feedback.groupBy(0))
> > > > > >
> > > > > > You are suggesting that we only have one iteration source/sink
> pair
> > > > with
> > > > > > parallelism of either 2 or 4. I will assume that the parallelism
> > is 2
> > > > for
&

Re: Question About "Preserve Partitioning" in Stream Iteration

2015-08-03 Thread Gyula Fóra
It is critical for many applications (such as SAMOA or Storm compatibility)
to build arbitrary cyclic flows. If your suggestion covers all cases (for
instance nested iterations) then I am not against it.

The current implementation is just one way to do it, but it allows
arbitrary cycles. From the checkpointing perspective, I don't think this
will make too much of a difference as that will probably have to be handled
on the receiver side anyways if you think about the cyclic algorithm.

Gyula

Aljoscha Krettek  ezt írta (időpont: 2015. aug. 3., H,
9:41):

> Yes, that's what I was proposing in my second mail:
>
> I thought about having some tighter restrictions here. My idea was to
> enforce that the feedback edges must have the same parallelism as the
> original input stream, otherwise shipping strategies such as "keyBy",
> "shuffle", "rebalance" don't seem to make sense because they would differ
> from the distribution of the original elements (at least IMHO). Maybe I'm
> wrong there, though.
>
> To me it seems intuitive that I get the feedback at the head the way I
> specify it at the tail. But maybe that's also just me... :D
>
> On Mon, 3 Aug 2015 at 00:14 Stephan Ewen  wrote:
>
> > This model strikes me as pretty complicated. Imagine the extra logic and
> > code path necessary for proper checkpointing as well.
> >
> > Why not do a simple approach:
> >   - There is one parallel head, one parallel tail, both with the same
> > parallelism
> >
> >   - Any computation in between may have its own parallelism, no special
> > cases
> >
> >   - If the tail does not have the same parallelism as the head, it will
> not
> be the tail, but Flink will attach an additional tail operator. Between
> the
> > original tail and the additional tail, the streams are redistributed to
> > achieve the required parallelism.
> >
> > Wouldn't that give us the same and make things much easier. The batch
> > iterations work that way, by the way.
> >
> >
> >
> > On Sun, Aug 2, 2015 at 10:03 PM, Aljoscha Krettek 
> > wrote:
> >
> > > To answer the question plain and simple: No, there are several
> different
> > > parallel heads and tails.
> > >
> > > For example in this:
> > > val iter = ds.iteration()
> > >
> > > val head_tail1 = iter.map().parallelism(2)
> > > val head_tail2 = iter.map().parallelism(4)
> > >
> > > iter.closeWith(head_tail1.union(head_tail2))
> > >
> > > We have one head/tail pair with parallelism 2 and one with parallelism
> 4.
> > >
> > > Off the top of my head, I don't know what happens in this case though:
> > >
> > > val iter = ds.iteration()
> > >
> > > val head1 = iter.map().parallelism(2)
> > > val head2 = iter.map().parallelism(4)
> > >
> > > val tail1 = head1.map().parallelism(6)
> > > val tail2 = head2.map().parallelism(8)
> > >
> > > iter.closeWith(tail1.union(tail2))
> > >
> > > (Which is also tricky with the parallelism of the input stream)
> > >
> > >
> > > On Sun, 2 Aug 2015 at 21:22 Gyula Fóra  wrote:
> > >
> > > > In a streaming program when we create an IterativeDataStream, we
> > > > practically mark the union point of some later feedback stream (the
> one
> > > > passed in to closeWith(..)).
> > > >
> > > > The operators applied on this IterativeDataStream will receive the
> > > feedback
> > > > input as well. We call the operators applied on the iterative
> > dataStream
> > > > head operators. We call the operators that produce the streams passed
> > > into
> > > > closeWith tail operators. With this terminology we can have many
> heads
> > > and
> > > > tails with varying parallelism.
> > > >
> > > > Stephan Ewen  ezt írta (időpont: 2015. aug. 2., V,
> > > > 20:16):
> > > >
> > > > > I don't get the discussion here, can you help me with what you mean
> > by
> > > > > "different iteration heads and tails" ?
> > > > >
> > > > > An iteration does not have one parallel head and one parallel tail?
> > > > >
> > > > > On Fri, Jul 31, 2015 at 6:52 PM, Gyula Fóra 
> > > > wrote:
> > > > >
> > > > > > Maybe you can reuse some of the logic that is currently there on
> > the
> > > > > > StreamGraph, with building 

Re: Question About "Preserve Partitioning" in Stream Iteration

2015-08-03 Thread Gyula Fóra
Okay, sounds reasonable :)

Stephan Ewen  ezt írta (időpont: 2015. aug. 3., H, 10:24):

> I don't think there is a fundamental limitation to the simpler approach.
> The only real difference is that DOPs are adjusted before the tail, so only
> one head/tail pair is needed.
>
> Nested iterations should still be possible...
>
> On Mon, Aug 3, 2015 at 10:21 AM, Gyula Fóra  wrote:
>
> > It is critical for many applications (such as SAMOA or Storm
> compatibility)
> > to build arbitrary cyclic flows. If your suggestion covers all cases (for
> > instance nested iterations) then I am not against it.
> >
> > The current implementation is just one way to do it, but it allows
> > arbitrary cycles. From the checkpointing perspective, I don't think this
> > will make too much of a difference as that will probably have to be
> handled
> > on the receiver side anyways if you think about the cyclic algorithm.
> >
> > Gyula
> >
> > Aljoscha Krettek  ezt írta (időpont: 2015. aug. 3.,
> > H,
> > 9:41):
> >
> > > Yes, that's what I was proposing in my second mail:
> > >
> > > I thought about having some tighter restrictions here. My idea was to
> > > enforce that the feedback edges must have the same parallelism as the
> > > original input stream, otherwise shipping strategies such as "keyBy",
> > > "shuffle", "rebalance" don't seem to make sense because they would
> differ
> > > from the distribution of the original elements (at least IMHO). Maybe
> I'm
> > > wrong there, though.
> > >
> > > To me it seems intuitive that I get the feedback at the head the way I
> > > specify it at the tail. But maybe that's also just me... :D
> > >
> > > On Mon, 3 Aug 2015 at 00:14 Stephan Ewen  wrote:
> > >
> > > > This model strikes me as pretty complicated. Imagine the extra logic
> > and
> > > > code path necessary for proper checkpointing as well.
> > > >
> > > > Why not do a simple approach:
> > > >   - There is one parallel head, one parallel tail, both with the same
> > > > parallelism
> > > >
> > > >   - Any computation in between may have its own parallelism, no
> special
> > > > cases
> > > >
> > > >   - If the tail does not have the same parallelism as the head, it
> will
> > > not
> > > > be the tail, but Flink will attach an additional tail operator.
> Between
> > > the
> > > > original tail and the additional tail, the streams are redistributed
> to
> > > > achieve the required parallelism.
> > > >
> > > > Wouldn't that give us the same and make things much easier. The batch
> > > > iterations work that way, by the way.
> > > >
> > > >
> > > >
> > > > On Sun, Aug 2, 2015 at 10:03 PM, Aljoscha Krettek <
> aljos...@apache.org
> > >
> > > > wrote:
> > > >
> > > > > To answer the question plain and simple: No, there are several
> > > different
> > > > > parallel heads and tails.
> > > > >
> > > > > For example in this:
> > > > > val iter = ds.iteration()
> > > > >
> > > > > val head_tail1 = iter.map().parallelism(2)
> > > > > val head_tail2 = iter.map().parallelism(4)
> > > > >
> > > > > iter.closeWith(head_tail1.union(head_tail2))
> > > > >
> > > > > We have one head/tail pair with parallelism 2 and one with
> parallelism
> > > 4.
> > > > >
> > > > > Off the top of my head, I don't know what happens in this case
> though:
> > > > >
> > > > > val iter = ds.iteration()
> > > > >
> > > > > val head1 = iter.map().parallelism(2)
> > > > > val head2 = iter.map().parallelism(4)
> > > > >
> > > > > val tail1 = head1.map().parallelism(6)
> > > > > val tail2 = head2.map().parallelism(8)
> > > > >
> > > > > iter.closeWith(tail1.union(tail2))
> > > > >
> > > > > (Which is also tricky with the parallelism of the input stream)
> > > > >
> > > > >
> > > > > On Sun, 2 Aug 2015 at 21:22 Gyula Fóra 
> wrote:
> > > > >
> > > > > > In a streaming program when we create an IterativeDataStream, we
> > > > >

Re: Failing Test again

2015-08-04 Thread Gyula Fóra
Honestly, I don't think the partitioned state changes have anything to do
with the stability issues, only the reworked test case, which now tests
proper exactly-once semantics that were missing before.

Stephan Ewen  ezt írta (időpont: 2015. aug. 4., K, 12:12):

> Yes, the build stability is super serious right now.
>
> Here are the problems in question, and what we could do about this:
>
>
>
> BarrierBuffer:
> 
> Barrier Buffer tests fail in Java 6 builds.
>
> I have not found a way to diagnose that problem, yet, but if we cannot find
> the issue today, I would be willing to revert my latest commits on the
> barrier buffer to increase the stability.
>
>
> StreamCheckpointingITCase
> ---
> This seems to have started with either the barrier buffer, or the updated
> partitioned state. If fixing/reverting the barrier buffer does not fix it,
> and no fix has come up
>
> until then, let's revert the latest changes to the partitioned state and
> re-add them when they are stable.
>
>
> Tachyon:
> -
> The Tachyon mini cluster has a problem, apparently, the programs exit with
> a sysexit or segfault.
>
> Since we have no Tachyon code ourselves, do we need this test as part of
> the nightly tests?
> Can we make this a "manual" test that we trigger on demand?
>
>
>
> Greetings,
> Stephan
>
>
>
>
> On Tue, Aug 4, 2015 at 11:41 AM, Aljoscha Krettek 
> wrote:
>
> > I've also seen this fail:
> https://travis-ci.org/apache/flink/jobs/74025862
> >
> > in SuccessAfterNetworkBuffersFailureITCase
> >
> > Build seems quite flaky recently.
> >
> > On Tue, 4 Aug 2015 at 10:27 Matthias J. Sax <
> mj...@informatik.hu-berlin.de
> > >
> > wrote:
> >
> > > Rebased on:
> > >
> > >
> > >
> >
> https://github.com/mjsax/flink/commit/fab61a1954ff1554448e826e1d273689ed520fc3
> > >
> > > But if the gap between two rebases is large, it's hard to say what the
> > > problem might be...
> > >
> > > The old parent commit (ie, rebase before last rebase) was
> > >
> > >
> >
> https://github.com/mjsax/flink/commit/148395bcd81a93bcb1473e4e93f267edb3b71c7e
> > >
> > > -Matthias
> > >
> > > On 08/04/2015 08:57 AM, Aljoscha Krettek wrote:
> > > > What are the commits that you rebased on? Could you maybe narrow down
> > > what
> > > > caused the regression?
> > > >
> > > > On Mon, 3 Aug 2015 at 23:31 Matthias J. Sax <
> > > mj...@informatik.hu-berlin.de>
> > > > wrote:
> > > >
> > > >> I only report failing tests after a rebase. ;)
> > > >>
> > > >> -Matthias
> > > >>
> > > >> On 08/03/2015 11:23 PM, Henry Saputra wrote:
> > > >>> Thanks for reporting it , Matthias. Will try to run Travis for
> latest
> > > >> Flink.
> > > >>>
> > > >>> Tachyon test is a bit flaky. Maybe updating to latest release could
> > > help.
> > > >>>
> > > >>> - Henry
> > > >>>
> > > >>> On Mon, Aug 3, 2015 at 2:18 PM, Matthias J. Sax
> > > >>>  wrote:
> > >  Today, not a single built was successful completely. Please see
> > here:
> > > 
> > >  Flink Streaming Core:
> > >  https://travis-ci.org/mjsax/flink/jobs/73938109
> > >  https://travis-ci.org/mjsax/flink/jobs/73951362
> > >  https://travis-ci.org/apache/flink/jobs/73938124
> > >  https://travis-ci.org/apache/flink/jobs/73899795
> > >  https://travis-ci.org/apache/flink/jobs/73938122
> > >  https://travis-ci.org/apache/flink/jobs/73952441
> > > 
> > >  Flink Tachyon:
> > >  https://travis-ci.org/apache/flink/jobs/73938123
> > > 
> > > 
> > >  -Matthias
> > > 
> > > >>
> > > >>
> > > >
> > >
> > >
> >
>


Re: [ANNOUNCE] New Committer Chesnay Schepler

2015-08-20 Thread Gyula Fóra
Welcome! :)

On Thu, Aug 20, 2015 at 12:34 PM Matthias J. Sax <
mj...@informatik.hu-berlin.de> wrote:

> Congrats! The squirrel "army" is growing fast. :)
>
> On 08/20/2015 11:18 AM, Robert Metzger wrote:
> > The Project Management Committee (PMC) for Apache Flink has asked Chesnay
> > Schepler to become a committer and we are pleased to announce that they
> > have accepted.
> >
> > Chesnay has been very involved with the Flink project since its pre-ASF
> > days. He has worked on several components including the Java API,
> > documentation, and execution engine. Recently he made a big contribution
> > and added a Python API to Flink.
> >
> > Being a committer enables easier contribution to the project since there
> is
> > no need to go via the pull request submission process. This should enable
> > better productivity. Being a PMC member enables assistance with the
> > management and to guide the direction of the project.
> >
>
>


Streaming KV store abstraction

2015-09-08 Thread Gyula Fóra
Hey All,

The last couple of days I have been playing around with the idea of
building a streaming key-value store abstraction using stateful streaming
operators that can be used within Flink Streaming programs seamlessly.

Operations executed on this KV store would be fault tolerant as it
integrates with the checkpointing mechanism, and if we add timestamps to
each put/get/... operation we can use the watermarks to create fully
deterministic results. This functionality is very useful for many
applications, and is very hard to implement properly with a dedicated KV
store.

The KVStore abstraction could look as follows:

KVStore store = new KVStore<>;

Operations:

store.put(DataStream>)
store.get(DataStream) -> DataStream>
store.remove(DataStream) -> DataStream>
store.multiGet(DataStream) -> DataStream[]>
store.getWithKeySelector(DataStream, KeySelector) ->
DataStream[]>

For the resulting streams I used a special KV abstraction which lets us
return null values.
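
To make the generics concrete, here is a sketch with assumed type parameters
(a String-keyed store of Integers; the exact signatures in the prototype may
differ):

// illustrative types only
KVStore<String, Integer> store = new KVStore<>();

store.put(puts);                                   // puts: DataStream<Tuple2<String, Integer>>
DataStream<KV<String, Integer>> values = store.get(keys);            // keys: DataStream<String>
DataStream<KV<String, Integer>> removed = store.remove(keys);
DataStream<KV<String, Integer>[]> multi = store.multiGet(keyArrays); // keyArrays: DataStream<String[]>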

The implementation uses a simple streaming operator for executing most of
the operations (for multi get there is an additional merge operator) with
either local or partitioned states for storing the key-value pairs (my
current prototype uses local states). And it can either execute operations
eagerly (which would not provide deterministic results), or buffer up
operations and execute them in order upon watermarks.

As for use cases, you can probably come up with many, so I will save that
for now :D

I have a prototype implementation here that can execute the operations
described above (does not handle watermarks and time yet):

https://github.com/gyfora/flink/tree/KVStore
And also an example job:

https://github.com/gyfora/flink/blob/KVStore/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/KVStreamExample.java

What do you think?
If you like it, I will work on writing tests; it still needs a lot of
tweaking and refactoring. This might be something we want to include with
the standard streaming libraries at one point.

Cheers,
Gyula


Re: Streaming KV store abstraction

2015-09-08 Thread Gyula Fóra
@Stephan:

Technically speaking this is really just a partitioned key-value state and
a fancy operator executing special operations on this state.

From the user's perspective, though, this is hard to implement. If
you want to share state between two streams this way (getting
updates from one stream and enriching the other one) you would probably use
a connected datastream and implement the key-value store logic yourself. But
once you have one (or more) update streams and many get streams this
implementation will not work. So the user either ends up replicating the
whole state in multiple connected operators, or implements some
inefficient custom wrapper class to take care of all the put/get operations.

The idea behind this is to give a very simple abstraction for this type of
processing that uses the Flink runtime efficiently instead of relying on
custom implementations.

Let me give you a stupid example:

You receive temperature data in the form of (city, temperature), and you
are computing a rolling avg for each city.
Now you have 2 other incoming streams: the first is a stream of some other
info about the city, let's say population (city, population), and you want to
combine it with the last known avg temperature to produce (city, temp, pop)
triplets. The second stream is a stream of city pairs (city, city), and you
are interested in the difference between their temperatures.

For enriching the (city, pop) to (city, temp, pop) you would probably use a
CoFlatMap and store the last known rolling avg as state. For computing the
(city, city) temperature difference it is a little more difficult, as you
need to get the temperature for both cities and then combine them in a second
operator. If you don't want to replicate your state, you have to combine
these two problems into a common wrapper type and execute them on the same
operator, which will keep the avg state.

With the KVStore abstraction this is very simple:
you create a KVStore.
For enriching you use kvStore.getWithKeySelector() which will give you
((city, pop), temp) pairs and you are done. For computing the difference, you
can use kvStore.multiGet(...) and get the values for the 2 cities at the same
time. The KV store will abstract away getting the 2 keys separately and
merging them, so it will return [(city1, t1), (city2, t2)].

This might be a slightly artificial example but I think it makes the point.
Implementing these jobs efficiently is not trivial for users but I
think it is a very common problem.
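
A rough sketch of the example with the abstraction (type parameters and
stream names are assumed, not taken from the prototype):

// rolling avg temperatures per city feed the store
KVStore<String, Double> temps = new KVStore<>();
temps.put(cityAvgTemperatures);                    // DataStream<Tuple2<String, Double>>

// enrichment: (city, pop) -> ((city, pop), temp), keyed by the city field
DataStream<KV<Tuple2<String, Long>, Double>> enriched =
    temps.getWithKeySelector(cityPopulations, pair -> pair.f0);

// difference query: (city1, city2) -> [(city1, t1), (city2, t2)]
DataStream<KV<String, Double>[]> pairs = temps.multiGet(cityPairs); // DataStream<String[]>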

Stephan Ewen  ezt írta (időpont: 2015. szept. 8., K,
14:53):

> @Gyula
>
> Can you explain a bit what this KeyValue store would do more then the
> partitioned key/value state?
>
> On Tue, Sep 8, 2015 at 2:49 PM, Gábor Gévay  wrote:
>
> > Hello,
> >
> > As for use cases, in my old job at Ericsson we were building a
> > streaming system that was processing data from telephone networks, and
> > it was using key-value stores a LOT. For example, keeping track of
> > various state info of the users (which cell are they currently
> > connected to, what bearers do they have, ...); mapping from IDs of
> > users in one subsystem of the telephone network to the IDs of the same
> > users in an other subsystem; mapping from IDs of phone calls to lists
> > of IDs of participating users; etc.
> > So I imagine they would like this a lot. (At least, if they were
> > considering moving to Flink :))
> >
> > Best,
> > Gabor
> >
> >
> >
> >
> > 2015-09-08 13:35 GMT+02:00 Gyula Fóra :
> > > Hey All,
> > >
> > > The last couple of days I have been playing around with the idea of
> > > building a streaming key-value store abstraction using stateful
> streaming
> > > operators that can be used within Flink Streaming programs seamlessly.
> > >
> > > Operations executed on this KV store would be fault tolerant as it
> > > integrates with the checkpointing mechanism, and if we add timestamps
> to
> > > each put/get/... operation we can use the watermarks to create fully
> > > deterministic results. This functionality is very useful for many
> > > applications, and is very hard to implement properly with some
> dedicates
> > kv
> > > store.
> > >
> > > The KVStore abstraction could look as follows:
> > >
> > > KVStore store = new KVStore<>;
> > >
> > > Operations:
> > >
> > > store.put(DataStream>)
> > > store.get(DataStream) -> DataStream>
> > > store.remove(DataStream) -> DataStream>
> > > store.multiGet(DataStream) -> DataStream[]>
> > > store.getWithKeySelector(DataStream, KeySelector) ->
> > > DataStream[]>
> > >
> > > For the resulting streams I used a 

Re: Releasing 0.10.0-milestone1

2015-09-09 Thread Gyula Fóra
This sounds good +1 from me as well :)
Till Rohrmann  ezt írta (időpont: 2015. szept. 9.,
Sze, 10:40):

> +1 for a milestone release with the TypeInformation issues fixed. I'm
> working on it.
>
> On Tue, Sep 8, 2015 at 9:32 PM, Stephan Ewen  wrote:
>
> > Great!
> >
> > I'd like to push one more commit later today.
> > A fix for https://issues.apache.org/jira/browse/FLINK-2632 would also be
> > highly appreciated by some users.
> >
> > Anyone volunteering as release manager (for creating release candidates
> and
> > uploading them)?
> >
> >
> > On Tue, Sep 8, 2015 at 6:11 PM, Kostas Tzoumas 
> > wrote:
> >
> > > +1 for a milestone release
> > >
> > > On Tue, Sep 8, 2015 at 5:43 PM, Robert Metzger 
> > > wrote:
> > >
> > > > +1 for a "milestone1" release. We have a lot of good features in
> master
> > > > that people can benefit from.
> > > >
> > > > On Tue, Sep 8, 2015 at 5:10 PM, Maximilian Michels 
> > > wrote:
> > > >
> > > > > +1 for releasing a milestone release soon to encourage people to
> try
> > > > > out the new features.
> > > > >
> > > > > There is this bug:
> https://issues.apache.org/jira/browse/FLINK-2632
> > > > > which affects the Web Client's error and results display for jobs.
> > > > > Would be nice to fix it but IMHO it is not critical for the
> milestone
> > > > > release.
> > > > >
> > > > > On Tue, Sep 8, 2015 at 1:00 PM, Ufuk Celebi 
> wrote:
> > > > > >
> > > > > >> On 08 Sep 2015, at 12:01, Stephan Ewen 
> wrote:
> > > > > >>
> > > > > >> Hi all!
> > > > > >>
> > > > > >> Some day back we talked about releasing an 0.10.0-milestone1
> > > release.
> > > > > The
> > > > > >> master has advanced quite a bit (especially due to
> > high-availability
> > > > > code).
> > > > > >>
> > > > > >> I cherry picked the important additions to the
> > > > release-0.10.0-milestone1
> > > > > >> branch (fixes and Kafka consumer/producer rework).
> > > > > >>
> > > > > >> How about releasing the branch now as an intermediate version
> for
> > > > > people to
> > > > > >> try out while we stabilize the windows and HA code for the 0.10
> > > > release?
> > > > > >
> > > > > > +1
> > > > > >
> > > > > > Thanks for cping the important changes. I’ve checked and there is
> > > > > nothing I would add at this point.
> > > > > >
> > > > > > Can anybody else took a look at it? Other than that, I think it’s
> > > good
> > > > > to go.
> > > > > >
> > > > > > – Ufuk
> > > > > >
> > > > >
> > > >
> > >
> >
>


Re: Streaming KV store abstraction

2015-09-09 Thread Gyula Fóra
Hey Gianmarco,

So the implementation is somewhat different:

The update stream is received by a stateful KVStoreOperator which stores
the K-V pairs as its partitioned state.

The query for the 2 cities is assigned an ID, yes, and is split into the 2
cities, and each of these is sent to the same KVStoreOperator as the
update stream. The output is practically the value for each key, (qid,
city1, temp1), which is retrieved from the operator state, and this output
is merged in a next operator to form the KV[] output on which the user can
compute the difference if he wants.

So actually no co-group is happening although semantically it might be
similar. Instead I use stateful operators to be much more efficient.
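
In plain DataStream operations the wiring for a multiGet would look roughly
like this (the operator names are made up for illustration, this is not the
prototype code):

// one record per key, tagged with the query id and the number of keys in the query
DataStream<Tuple3<Long, String, Integer>> perKey =
    queries.flatMap(new SplitQueryIntoKeys());

// send each lookup to the KVStoreOperator instance holding that key's state
DataStream<Tuple3<Long, String, Double>> lookedUp =
    perKey.partitionByHash(1).flatMap(new LookUpKeyInState());

// collect all answers of a query (by query id) and emit them as one KV[]
DataStream<KV<String, Double>[]> results =
    lookedUp.partitionByHash(0).flatMap(new MergeByQueryId());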

Does this answer your question?

Gyula

Gianmarco De Francisci Morales  ezt írta (időpont: 2015.
szept. 9., Sze, 14:29):

> Just a silly question.
> For the example you described, in a data flow model, you would do something
> like this:
>
> Have query ids added to the city pairs (qid, city1, city2),
> then split the query stream on the two cities and co-group it with the
> updates stream ((city1, qid) , (city, temp)), same for city2,
> then emit (qid, city1, temp1), (qid, city2, temp2) from the two co-groups,
> group on the qid, and apply a difference operator to get the final answer.
>
> Is your idea to implement a way to generalize this logic, or would it use
> remote read/write to a KV store?
>
> --
> Gianmarco
>
> On 8 September 2015 at 16:27, Aljoscha Krettek 
> wrote:
>
> > That's a very nice application of the Stream API and partitioned state.
> :D
> >
> > I think we should run some tests on a cluster  based on this to see what
> > kind of throughput the partitioned state system can handle and also how
> it
> > behaves with larger numbers of keys. The KVStore is just an interface and
> > the really heavy lifting is done by the state system so this would be a
> > good test for it.
> >
> >
> > On Tue, 8 Sep 2015 at 15:10 Gyula Fóra  wrote:
> >
> > > @Stephan:
> > >
> > > Technically speaking this is really just a partitioned key-value state
> > and
> > > a fancy operator executing special operations on this state.
> > >
> > > From the user's perspective though this is something hard to implement.
> > If
> > > you want to share state between two stream for instance this way
> (getting
> > > updates from one stream and enriching the other one) you would probably
> > use
> > > a connected datastream and custom implement the Key-value store logic.
> > But
> > > once you have one(or more) update stream and many get streams this
> > > implementation will not work. So either the user end up replicating the
> > > whole state in multiple connected operators, or custom implement some
> > > inefficient wrapper class to take care of all the put/get operations.
> > >
> > > The Idea behind this is to give a very simple abstraction for this type
> > of
> > > processing that uses the flink runtime efficiently instead of relying
> on
> > > custom implementations.
> > >
> > > Let me give you a stupid example:
> > >
> > > You receive Temperature data in the form of (city, temperature), and
> you
> > > are computing a rolling avg for each city.
> > > Now you have 2 other incoming streams: first is a stream of some other
> > info
> > > about the city let's say population (city, population) and you want to
> > > combine it with the last known avg temperature to produce (city, temp,
> > pop)
> > > triplets. The second stream is a pair of cities (city,city) and you are
> > > interested in the difference of the temperature.
> > >
> > > For enriching the (city, pop) to (city,temp,pop) you would probably
> use a
> > > CoFlatMap and store the last known rolling avg as state. For computing
> > the
> > > (city,city) temperature difference it is a little more difficult, as
> you
> > > need to get the temperature for both cities then combine in a second
> > > operator. If you don't want to replicate your state, you have to
> combine
> > > these two problems to a common wrapper type and execute them on a same
> > > operator which will keep the avg state.
> > >
> > > With the KVStore abstraction this is very simple:
> > > you create a KVStore
> > > For enriching you use kvStore.getWithKeySelector() which will give you
> > > ((cit,pop), temp) pairs and you are done. For computing the difference,
> > you
> > > can use kvStore.multi

Using event timestamps

2015-09-13 Thread Gyula Fóra
Hey All!

Is there a proper way of using a Flink Streaming source with event
timestamps and watermarks? What I mean here is, instead of implementing a
custom SourceFunction, using an existing one and providing some timestamp
extractor (like the one currently used for Time windows), which will also
automatically generate the watermarks based on event time (assuming that
timestamps are partially ordered).
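
Something along these lines is what I have in mind (a purely hypothetical
sketch, none of these interfaces or methods exist yet):

// hypothetical interface: pulls the event time out of each record and
// decides what watermark can be emitted so far
public interface TimestampExtractor<T> extends java.io.Serializable {
    long extractTimestamp(T element);   // event time of the element
    long getCurrentWatermark();         // no elements below this timestamp are expected anymore
}

// hypothetical usage: wrap an existing source instead of writing a custom one
DataStream<MyEvent> events =
    env.addSource(existingSource)
       .assignTimestamps(new MyEventTimestampExtractor());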

Maybe I missed something, or there might be somebody working on this
already :)

Cheers,
Gyula


Re: Using event timestamps

2015-09-14 Thread Gyula Fóra
Then as a first step, I will open a Jira for it :)

Aljoscha Krettek  ezt írta (időpont: 2015. szept. 14.,
H, 10:18):

> Hi,
> there is nothing like that in there yet but it would be very nice to have
> it. :D Also, as far as I know no-one is working on this right now.
>
> Cheers,
> Aljoscha
>
> On Mon, 14 Sep 2015 at 06:27 Márton Balassi 
> wrote:
>
> > Hey Gyula,
> >
> > I have been recently looking at the streaming UdfOperators and can not
> > recall a utility for the sources that you are looking for, but maybe I am
> > also missing it. :)
> > It would be a convenient addition though.
> >
> > Best,
> >
> > Marton
> >
> > On Sun, Sep 13, 2015 at 8:59 PM, Gyula Fóra  wrote:
> >
> > > Hey All!
> > >
> > > Is there a proper way of using a Flink Streaming source with event
> > > timestamps and watermarks? What I mean here is instead of implementing
> a
> > > custom SourceFunction, use an existing one and provide some Timestamp
> > > extractor (like the one currently used for Time windows), which will
> also
> > > automatically generate the watermarks based on event time (assuming
> that
> > > timestamps are partially ordered).
> > >
> > > Maybe I missed something, or there might be somebody working on this
> > > already :)
> > >
> > > Cheers,
> > > Gyula
> > >
> >
>


Re: Streaming KV store abstraction

2015-09-15 Thread Gyula Fóra
Hey All,

We decided to make this a standalone library until it is stable enough and
then we can decide whether we want to keep it like that or include in the
project:

https://github.com/gyfora/StreamKV

Cheers,
Gyula

Gianmarco De Francisci Morales  ezt írta (időpont: 2015.
szept. 9., Sze, 20:25):

> Yes, pretty clear. I guess semantically it's still a co-group, but
> implemented slightly differently.
>
> Thanks!
>
> --
> Gianmarco
>
> On 9 September 2015 at 15:37, Gyula Fóra  wrote:
>
> > Hey Gianmarco,
> >
> > So the implementation looks something different:
> >
> > The update stream is received by a stateful KVStoreOperator which stores
> > the K-V pairs as their partitioned state.
> >
> > The query for the 2 cities is assigned an ID yes, and is split to the 2
> > cities, and each of these are  sent to the same KVStoreOperator as the
> > update stream. The output is the value for each key practically (qid,
> > city1, temp1) which is retrieved from the operator state, and this
> output
> > is merged in a next operator to form the KV[] output on which the user
> can
> > execute the difference if he wants.
> >
> > So actually no co-group is happening although semantically it might be
> > similar. Instead I use stateful operators to be much more efficient.
> >
> > Does this answer your question?
> >
> > Gyula
> >
> > Gianmarco De Francisci Morales  ezt írta (időpont:
> 2015.
> > szept. 9., Sze, 14:29):
> >
> > > Just a silly question.
> > > For the example you described, in a data flow model, you would do
> > something
> > > like this:
> > >
> > > Have query ids added to the city pairs (qid, city1, city2),
> > > then split the query stream on the two cities and co-group it with the
> > > updates stream ((city1, qid) , (city, temp)), same for city2,
> > > then emit (qid, city1, temp1), (qid, city2, temp2) from the two
> > co-groups,
> > > group on the qid, and apply a difference operator to get the final
> > answer.
> > >
> > > Is your  idea to implement a way to generalize this logic, or it would
> > use
> > > remote read/write to a KV-store?
> > >
> > > --
> > > Gianmarco
> > >
> > > On 8 September 2015 at 16:27, Aljoscha Krettek 
> > > wrote:
> > >
> > > > That's a very nice application of the Stream API and partitioned
> state.
> > > :D
> > > >
> > > > I think we should run some tests on a cluster  based on this to see
> > what
> > > > kind of throughput the partitioned state system can handle and also
> how
> > > it
> > > > behaves with larger numbers of keys. The KVStore is just an interface
> > and
> > > > the really heavy lifting is done by the state system so this would
> be a
> > > > good test for it.
> > > >
> > > >
> > > > On Tue, 8 Sep 2015 at 15:10 Gyula Fóra  wrote:
> > > >
> > > > > @Stephan:
> > > > >
> > > > > Technically speaking this is really just a partitioned key-value
> > state
> > > > and
> > > > > a fancy operator executing special operations on this state.
> > > > >
> > > > > From the user's perspective though this is something hard to
> > implement.
> > > > If
> > > > > you want to share state between two stream for instance this way
> > > (getting
> > > > > updates from one stream and enriching the other one) you would
> > probably
> > > > use
> > > > > a connected datastream and custom implement the Key-value store
> > logic.
> > > > But
> > > > > once you have one(or more) update stream and many get streams this
> > > > > implementation will not work. So either the user end up replicating
> > the
> > > > > whole state in multiple connected operators, or custom implement
> some
> > > > > inefficient wrapper class to take care of all the put/get
> operations.
> > > > >
> > > > > The Idea behind this is to give a very simple abstraction for this
> > type
> > > > of
> > > > > processing that uses the flink runtime efficiently instead of
> relying
> > > on
> > > > > custom implementations.
> > > > >
> > > > > Let me give you a stupid example:
> > > > >
> > > > > You receive Temperature data in the

Iteration feedback partitioning does not work properly

2015-10-05 Thread Gyula Fóra
Hey,

This question is mainly targeted towards Aljoscha but maybe someone can
help me out here:

I think the way feedback partitioning is handled does not work, let me
illustrate with a simple example:

IterativeStream it = ... (parallelism 1)
DataStream mapped = it.map(...) (parallelism 2)
// this does not work as the feedback has parallelism 2 != 1
// it.closeWith(mapped.partitionByHash(someField))
// so we need rebalance the data
it.closeWith(mapped.map(NoOpMap).setParallelism(1).partitionByHash(someField))

This program will execute but the feedback will not be partitioned by hash
to the mapper instances:
The partitioning will be set from the noOpMap to the iteration sink which
has parallelism different from the mapper (1 vs 2) and then the iteration
source forwards the element to the mapper (always to 0).

So the problem is basically that the iteration source/sink pair gets the
parallelism of the input stream (p=1) not the head operator (p = 2) which
leads to incorrect partitioning.

Did I miss something here?

Cheers,
Gyula


Re: Iteration feedback partitioning does not work properly

2015-10-06 Thread Gyula Fóra
Hi,

This is just a workaround, which actually breaks input order from my
source. I think the iteration construction should be reworked to set the
parallelism of the source/sink to the parallelism of the head operator (and
validate that all heads have the same parallelism).

I thought this was the solution that you described with Stephan in some
older discussion before the rewrite.

Cheers,
Gyula

Aljoscha Krettek  ezt írta (időpont: 2015. okt. 6., K,
9:15):

> Hi,
> I think what you would like to do can be achieved by:
>
> IterativeStream it = in.map(IdentityMap).setParallelism(2).iterate()
> DataStream mapped = it.map(...)
>  it.closeWith(mapped.partitionByHash(someField))
>
> The input is rebalanced to the map inside the iteration as in your example
> and the feedback should be partitioned by hash.
>
> Cheers,
> Aljoscha
>
>
> On Tue, 6 Oct 2015 at 00:11 Gyula Fóra  wrote:
>
> > Hey,
> >
> > This question is mainly targeted towards Aljoscha but maybe someone can
> > help me out here:
> >
> > I think the way feedback partitioning is handled does not work, let me
> > illustrate with a simple example:
> >
> > IterativeStream it = ... (parallelism 1)
> > DataStream mapped = it.map(...) (parallelism 2)
> > // this does not work as the feedback has parallelism 2 != 1
> > // it.closeWith(mapped.partitionByHash(someField))
> > // so we need rebalance the data
> >
> >
> it.closeWith(mapped.map(NoOpMap).setParallelism(1).partitionByHash(someField))
> >
> > This program will execute but the feedback will not be partitioned by
> hash
> > to the mapper instances:
> > The partitioning will be set from the noOpMap to the iteration sink which
> > has parallelism different from the mapper (1 vs 2) and then the iteration
> > source forwards the element to the mapper (always to 0).
> >
> > So the problem is basically that the iteration source/sink pair gets the
> > parallelism of the input stream (p=1) not the head operator (p = 2) which
> > leads to incorrect partitioning.
> >
> > Did I miss something here?
> >
> > Cheers,
> > Gyula
> >
> >
>


TM failure when deploying a large number of sources

2015-10-07 Thread Gyula Fóra
Hey guys,

I am writing a job which involves creating many different sources to read
data from (in this case 80 sources with a parallelism of 8 each, running
locally on my mac). I cannot create fewer, unfortunately.

The problem is that the job fails while deploying the tasks with the
following exception:

java.lang.Exception: Failed to deploy the task to slot SimpleSlot (1)(63) -
eea7250ab5b368693e3c4f14fb94f86d @ localhost - 8 slots - URL:
akka://flink/user/taskmanager_1 - ALLOCATED/ALIVE: Response was not of type
Acknowledge
at
org.apache.flink.runtime.executiongraph.Execution$2.onComplete(Execution.java:392)

at akka.dispatch.OnComplete.internal(Future.scala:247)
at akka.dispatch.OnComplete.internal(Future.scala:244)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at
scala.concurrent.impl.ExecutionContextImpl$anon$3.exec(ExecutionContextImpl.scala:107)

at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Any idea what might cause this?

Cheers,
Gyula


Re: TM failure when deploying a large number of sources

2015-10-07 Thread Gyula Fóra
Thanks!

Yes, it was indeed a memory issue:
java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:714)
at org.apache.flink.runtime.taskmanager.Task.startTaskThread(Task.java:415)
at
org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.scala:904)

I will just decrease the parallelism locally :)

Cheers,
Gyula

Stephan Ewen  ezt írta (időpont: 2015. okt. 7., Sze,
14:16):

> Any further information from the log?
>
> If you create so many tasks (8 x 80) on one machine, the JVM often has not
> enough memory reserved for the stack space to create enough threads (1-2
> threads per task)...
>
> On Wed, Oct 7, 2015 at 2:13 PM, Gyula Fóra  wrote:
>
> > Hey guys,
> >
> > I am writing a job which involves creating many different sources to read
> > data from (in this case 80 sources wiht the parallelism of 8 each,
> running
> > locally on my mac). I cannot create less unfortunately.
> >
> > The problem is that the job fails while deploying the tasks with the
> > following exception:
> >
> > java.lang.Exception: Failed to deploy the task to slot SimpleSlot
> (1)(63) -
> > eea7250ab5b368693e3c4f14fb94f86d @ localhost - 8 slots - URL:
> > akka://flink/user/taskmanager_1 - ALLOCATED/ALIVE: Response was not of
> type
> > Acknowledge
> > at
> >
> >
> org.apache.flink.runtime.executiongraph.Execution$2.onComplete(Execution.java:392)
> >
> > at akka.dispatch.OnComplete.internal(Future.scala:247)
> > at akka.dispatch.OnComplete.internal(Future.scala:244)
> > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174)
> > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171)
> > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
> > at
> >
> >
> scala.concurrent.impl.ExecutionContextImpl$anon$3.exec(ExecutionContextImpl.scala:107)
> >
> > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > at
> >
> >
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> >
> > at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > at
> >
> >
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> >
> > Any idea what might cause this?
> >
> > Cheers,
> > Gyula
> >
>


Re: TM failure when deploying a large number of sources

2015-10-07 Thread Gyula Fóra
Alright, I am creating one.

Stephan Ewen  ezt írta (időpont: 2015. okt. 7., Sze,
15:44):

> I think the error message could have been better, though...
>
> This actually warrants a JIRA issue...
>
> On Wed, Oct 7, 2015 at 2:44 PM, Gyula Fóra  wrote:
>
> > Thanks!
> >
> > Yes, it was indeed a memory issue:
> > java.lang.OutOfMemoryError: unable to create new native thread
> > at java.lang.Thread.start0(Native Method)
> > at java.lang.Thread.start(Thread.java:714)
> > at
> org.apache.flink.runtime.taskmanager.Task.startTaskThread(Task.java:415)
> > at
> >
> >
> org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.scala:904)
> >
> > I will just decrease the parallelism locally :)
> >
> > Cheers,
> > Gyula
> >
> > Stephan Ewen  ezt írta (időpont: 2015. okt. 7., Sze,
> > 14:16):
> >
> > > Any further information from teh log?
> > >
> > > If you create so many tasks (8 x 80) on one machine, the JVM often has
> > not
> > > enough memory reserved for the stack space to create enough threads
> (1-2
> > > threads per task)...
> > >
> > > On Wed, Oct 7, 2015 at 2:13 PM, Gyula Fóra  wrote:
> > >
> > > > Hey guys,
> > > >
> > > > I am writing a job which involves creating many different sources to
> > read
> > > > data from (in this case 80 sources wiht the parallelism of 8 each,
> > > running
> > > > locally on my mac). I cannot create less unfortunately.
> > > >
> > > > The problem is that the job fails while deploying the tasks with the
> > > > following exception:
> > > >
> > > > java.lang.Exception: Failed to deploy the task to slot SimpleSlot
> > > (1)(63) -
> > > > eea7250ab5b368693e3c4f14fb94f86d @ localhost - 8 slots - URL:
> > > > akka://flink/user/taskmanager_1 - ALLOCATED/ALIVE: Response was not
> of
> > > type
> > > > Acknowledge
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.runtime.executiongraph.Execution$2.onComplete(Execution.java:392)
> > > >
> > > > at akka.dispatch.OnComplete.internal(Future.scala:247)
> > > > at akka.dispatch.OnComplete.internal(Future.scala:244)
> > > > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174)
> > > > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171)
> > > > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
> > > > at
> > > >
> > > >
> > >
> >
> scala.concurrent.impl.ExecutionContextImpl$anon$3.exec(ExecutionContextImpl.scala:107)
> > > >
> > > > at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > > > at
> > > >
> > > >
> > >
> >
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> > > >
> > > > at
> > >
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > > > at
> > > >
> > > >
> > >
> >
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > > >
> > > > Any idea what might cause this?
> > > >
> > > > Cheers,
> > > > Gyula
> > > >
> > >
> >
>


Re: Iteration feedback partitioning does not work properly

2015-10-08 Thread Gyula Fóra
The feedback tuples might get rebalanced but the normal input should not.

But still the main problem is the fact that partitioning is not handled
transparently, and actually does not work when you set it the way you expect.

Gyula

Aljoscha Krettek  ezt írta (időpont: 2015. okt. 8.,
Cs, 16:33):

> Ok, I see your point. But I think there will be problems no matter what
> parallelism is chosen for the iteration source/sink. If the parallelism of
> the head is chosen then there will be an implicit rebalance from the
> operation right before the iteration to the iteration head. I think this
> should break ordering as well, in your case.
>
> On Tue, 6 Oct 2015 at 10:39 Gyula Fóra  wrote:
>
> > Hi,
> >
> > This is just a workaround, which actually breaks input order from my
> > source. I think the iteration construction should be reworked to set the
> > parallelism of the source/sink to the parallelism of the head operator
> (and
> > validate that all heads have the same parallelism).
> >
> > I thought this was the solution that you described with Stephan in some
> > older discussion before the rewrite.
> >
> > Cheers,
> > Gyula
> >
> > Aljoscha Krettek  ezt írta (időpont: 2015. okt. 6.,
> > K,
> > 9:15):
> >
> > > Hi,
> > > I think what you would like to to can be achieved by:
> > >
> > > IterativeStream it = in.map(IdentityMap).setParallelism(2).iterate()
> > > DataStream mapped = it.map(...)
> > >  it.closeWith(mapped.partitionByHash(someField))
> > >
> > > The input is rebalanced to the map inside the iteration as in your
> > example
> > > and the feedback should be partitioned by hash.
> > >
> > > Cheers,
> > > Aljoscha
> > >
> > >
> > > On Tue, 6 Oct 2015 at 00:11 Gyula Fóra  wrote:
> > >
> > > > Hey,
> > > >
> > > > This question is mainly targeted towards Aljoscha but maybe someone
> can
> > > > help me out here:
> > > >
> > > > I think the way feedback partitioning is handled does not work, let
> me
> > > > illustrate with a simple example:
> > > >
> > > > IterativeStream it = ... (parallelism 1)
> > > > DataStream mapped = it.map(...) (parallelism 2)
> > > > // this does not work as the feedback has parallelism 2 != 1
> > > > // it.closeWith(mapped.partitionByHash(someField))
> > > > // so we need rebalance the data
> > > >
> > > >
> > >
> >
> it.closeWith(mapped.map(NoOpMap).setParallelism(1).partitionByHash(someField))
> > > >
> > > > This program will execute but the feedback will not be partitioned by
> > > hash
> > > > to the mapper instances:
> > > > The partitioning will be set from the noOpMap to the iteration sink
> > which
> > > > has parallelism different from the mapper (1 vs 2) and then the
> > iteration
> > > > source forwards the element to the mapper (always to 0).
> > > >
> > > > So the problem is basically that the iteration source/sink pair gets
> > the
> > > > parallelism of the input stream (p=1) not the head operator (p = 2)
> > which
> > > > leads to incorrect partitioning.
> > > >
> > > > Did I miss something here?
> > > >
> > > > Cheers,
> > > > Gyula
> > > >
> > > >
> > >
> >
>


Re: Iteration feedback partitioning does not work properly

2015-10-08 Thread Gyula Fóra
I agree that there are many things that need to be figured out properly
for iterations, and I am okay with postponing them for the next release if
we want to get this one out quickly.

The only problem is that this probably breaks the SAMOA connector.

Paris can you confirm this?

Stephan Ewen  ezt írta (időpont: 2015. okt. 8., Cs,
17:12):

> For me as an outsider to the iterations, I would say that both approaches
> are in some way tricky with some unexpected behavior.
>
> Parallelism implicitly from the predecessor (input) or the successor (head
> task - what happens if there are multiple with different parallelism?) can
> confuse in either way.
> I have the feeling that what each one perceives as more consistent or
> intuitive depends a bit on their mental model of the iterations (given
> their prior experience and expectations).
>
> I agree that we should do something there. But given that we are apparently
> not really close to knowing what would be the best way to go (or agreeing on
> it), I would like to not block 0.10 on this (workarounds are available
> after all) and take this for the next release with enough time to properly
> figure this out and discuss it.
>
> The iterations will anyways need some work for the next release to
> integrate them with checkpointing and watermarks, so would you agree that
> we tackle this then as part of an effort to advance the iteration feature
> as a whole?
>
> Greetings,
> Stephan
>
>
>
> On Thu, Oct 8, 2015 at 4:42 PM, Gyula Fóra  wrote:
>
> > The feedback tuples might get rebalanced but the normal input should not.
> >
> > But still the main problem is the fact that partitioning is not handled
> > transparently, and actually does not work when you set the way you
> expect.
> >
> > Gyula
> >
> > Aljoscha Krettek  ezt írta (időpont: 2015. okt. 8.,
> > Cs, 16:33):
> >
> > > Ok, I see your point. But I think there will be problems no matter what
> > > parallelism is chosen for the iteration source/sink. If the parallelism
> > of
> > > the head is chosen then there will be an implicit rebalance from the
> > > operation right before the iteration to the iteration head. I think
> this
> > > should break ordering as well, in your case.
> > >
> > > On Tue, 6 Oct 2015 at 10:39 Gyula Fóra  wrote:
> > >
> > > > Hi,
> > > >
> > > > This is just a workaround, which actually breaks input order from my
> > > > source. I think the iteration construction should be reworked to set
> > the
> > > > parallelism of the source/sink to the parallelism of the head
> operator
> > > (and
> > > > validate that all heads have the same parallelism).
> > > >
> > > > I thought this was the solution that you described with Stephan in
> some
> > > > older discussion before the rewrite.
> > > >
> > > > Cheers,
> > > > Gyula
> > > >
> > > > Aljoscha Krettek  ezt írta (időpont: 2015. okt.
> > 6.,
> > > > K,
> > > > 9:15):
> > > >
> > > > > Hi,
> > > > > I think what you would like to to can be achieved by:
> > > > >
> > > > > IterativeStream it =
> in.map(IdentityMap).setParallelism(2).iterate()
> > > > > DataStream mapped = it.map(...)
> > > > >  it.closeWith(mapped.partitionByHash(someField))
> > > > >
> > > > > The input is rebalanced to the map inside the iteration as in your
> > > > example
> > > > > and the feedback should be partitioned by hash.
> > > > >
> > > > > Cheers,
> > > > > Aljoscha
> > > > >
> > > > >
> > > > > On Tue, 6 Oct 2015 at 00:11 Gyula Fóra 
> wrote:
> > > > >
> > > > > > Hey,
> > > > > >
> > > > > > This question is mainly targeted towards Aljoscha but maybe
> someone
> > > can
> > > > > > help me out here:
> > > > > >
> > > > > > I think the way feedback partitioning is handled does not work,
> let
> > > me
> > > > > > illustrate with a simple example:
> > > > > >
> > > > > > IterativeStream it = ... (parallelism 1)
> > > > > > DataStream mapped = it.map(...) (parallelism 2)
> > > > > > // this does not work as the feedback has parallelism 2 != 1
> > > > > > // it.closeWith(mapped.partitionByHash(someField))
> > > > > > // so we need to rebalance the data
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> it.closeWith(mapped.map(NoOpMap).setParallelism(1).partitionByHash(someField))
> > > > > >
> > > > > > This program will execute but the feedback will not be
> partitioned
> > by
> > > > > hash
> > > > > > to the mapper instances:
> > > > > > The partitioning will be set from the noOpMap to the iteration
> sink
> > > > which
> > > > > > has parallelism different from the mapper (1 vs 2) and then the
> > > > iteration
> > > > > > source forwards the element to the mapper (always to 0).
> > > > > >
> > > > > > So the problem is basically that the iteration source/sink pair
> > gets
> > > > the
> > > > > > parallelism of the input stream (p=1) not the head operator (p =
> 2)
> > > > which
> > > > > > leads to incorrect partitioning.
> > > > > >
> > > > > > Did I miss something here?
> > > > > >
> > > > > > Cheers,
> > > > > > Gyula
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>


Kafka source stuck while canceling

2015-10-19 Thread Gyula Fóra
Hey guys,

Has anyone ever seen something similar when working with the Kafka sources?

11:52:48,838 WARN  org.apache.flink.runtime.taskmanager.Task
  - Task 'Source: Kafka[***] (3/4)' did not react to cancelling signal,
but is stuck in method:
 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:208)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
java.lang.Thread.run(Thread.java:745)

The failure was caused by a different operator in the pipeline, but the job
could never be fully cancelled and restarted due to this error.

Any idea is appreciated :)

Cheers,
Gyula


Re: [DISCUSS] Java code style

2015-10-20 Thread Gyula Fóra
+1 for both :)
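
For reference, a tiny illustration of what Google-style formatting looks like
in practice (2-space block indentation, K&R braces, one statement per line).
This is just a sketch to show the style, not code from the Flink code base:

import java.util.HashMap;
import java.util.Map;

public class WordCounter {

  private final Map<String, Integer> counts = new HashMap<>();

  public void add(String word) {
    counts.merge(word, 1, Integer::sum);
  }

  public int countOf(String word) {
    return counts.getOrDefault(word, 0);
  }
}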

Till Rohrmann  ezt írta (időpont: 2015. okt. 20., K,
14:58):

> I like the idea to have a bit stricter code style which will increase code
> maintainability and makes it easier for people to go through the code.
> Furthermore, it will relieve us from code style comments while reviewing
> PRs which can be quite cumbersome.
>
> Personally, I like the Google code style. Thus, +1 for both points.
>
> Just a remark: We should discuss the same for Flink's Scala style at some
> point.
>
> On Tue, Oct 20, 2015 at 2:54 PM, Márton Balassi 
> wrote:
>
> > +1 for both
> >
> > As we are planning to restructure the maven projects, which breaks the PRs
> > anyway, going one step further at this point in time is reasonable for me.
> >
> > On Tue, Oct 20, 2015 at 2:37 PM, Matthias J. Sax 
> wrote:
> >
> > > big +1 for both!
> > >
> > > On 10/20/2015 02:31 PM, Ufuk Celebi wrote:
> > > > DISCLAIMER: This is not my personal idea, but a community discussion
> > from
> > > > some time ago. Don't kill the messenger.
> > > >
> > > > In March we were discussing issues with heterogeneity of the code
> [1].
> > > The
> > > > summary is that we had a consensus to enforce a stricter code style
> on
> > > our
> > > > Java code base in order to make it easier to switch between projects
> > and
> > > to
> > > > have clear rules for new contributions. The main proposal in the last
> > > > discussion was to go with Google's Java code style. Not all were
> fully
> > > > satisfied with this, but still everyone agreed on some kind of style.
> > > >
> > > > I think the upcoming 0.10 release is a good point to finally go
> through
> > > > with these changes (right after the release/branch-off).
> > > >
> > > > I propose to go with Google's Java code style [2] as proposed
> earlier.
> > > >
> > > > PROs:
> > > > - Clear style guide available
> > > > - Tooling like checkstyle rules, IDE plugins already available
> > > >
> > > > CONs:
> > > > - Fully breaks our current style
> > > >
> > > > The main problem with this will be open pull requests, which will be
> > > harder
> > > > to merge after all the changes. On the other hand, should pull
> requests
> > > > that have been open for a long time block this? Most of the important
> > > > changes will be merged for the release anyway. I think in the long
> run
> > > we
> > > > will gain more than we lose by this (more homogeneous code, clear
> > rules).
> > > > And it is questionable whether we will ever be able to do such a
> change
> > > in
> > > > the future if we cannot do it now. The project will most likely grow
> > and
> > > > attract more contributors, at which point it will be even harder to
> do.
> > > >
> > > > Please make sure to answer the following points in the discussion:
> > > >
> > > > 1) Are you (still) in favour of enforcing stricter rules on the Java
> > > > codebase?
> > > >
> > > > 2) If yes, would you be OK with the Google's Java code style?
> > > >
> > > > – Ufuk
> > > >
> > > > [1]
> > > >
> > >
> >
> http://mail-archives.apache.org/mod_mbox/flink-dev/201503.mbox/%3ccanc1h_von0b5omnwzxchtyzwhakeghbzvquyk7s9o2a36b8...@mail.gmail.com%3e
> > > >
> > > > [2] https://google.github.io/styleguide/javaguide.html
> > > >
> > >
> > >
> >
>


[DISCUSSION] Timely function interface and timer params

2016-10-28 Thread Gyula Fóra
Hello,

I was thinking about the methods provided by the timely functions and the
timer service, and I am wondering if it makes sense to change them a little
so they can cover a wider set of use cases. Maybe I missed something
completely obvious, so please shoot me down in that case :)

Currently the user gets a TimerService to register timers that will in the
future call the onTimer method. It is not completely obvious to me how
I would implement a function that needs to trigger two types of callbacks
in the future. If I get only one onTimer method, I should be able to pass in
some sort of parameter or flag so I can branch in my onTimer
implementation.

As parameters are not supported, I am left with state that is scoped to the
key, which is also pretty useless if I want to trigger different timed
actions for the same key.

I know this is quite tricky but I see some alternative options:
 - The register timer method returns a unique (per key) timer id, so we can
associate state with this id to fetch info about the timer registered. (We
could also remove timers with this id and maybe add methods to remove all
for the current key)
 - Allow the users to pass a custom parameter when registering the
callback, and the parameter would be passed to the onTimer method
 - Allow users to pass custom callback functions when registering the
timers, but this would mean we have to support some sort of context for
accessing the state (like the window context before)
 - We could go for an annotation-based API like in Beam, but that's probably
not a good fit with the current ones

I personally prefer the first one.
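
To make the first option a bit more concrete, here is a rough sketch of what
such a registration interface could look like. The names below (including
TimerServiceWithIds and the id-based onTimer signature in the comment) are
made up purely for illustration and are not part of the current Flink API:

public interface TimerServiceWithIds {

  // Registers a timer and returns an id that is unique within the current
  // key, so the user can keep per-timer state under that id.
  long registerProcessingTimeTimer(long time);

  long registerEventTimeTimer(long time);

  // Removes a single timer by its id, or all timers of the current key.
  void deleteTimer(long timerId);

  void deleteAllTimersForCurrentKey();
}

// The callback would then get the id back, e.g.:
// void onTimer(long timerId, long timestamp, Collector<OUT> out);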

What do you think?

Regards,
Gyula


Re: [DISCUSSION] Timely function interface and timer params

2016-10-29 Thread Gyula Fóra
Thanks for the feedback guys,

I think exposing the namespace in a simplified form in the user-facing API
is a very good idea; that already lets users implement practically anything
they want. Maybe doing it as a simple string, as Jamie suggested, would be a
nice way to do it, and that would serve as a label or timer-id for the user.
Thinking of it as a label/id is probably a much simpler concept than the
"namespace".

Should we open a JIRA for this? Judging from the internal timer service,
this should be a fairly straightforward extension, as Aljoscha pointed out.

Gyula

Jamie Grier  ezt írta (időpont: 2016. okt. 29.,
Szo, 15:37):

> Hi guys,
>
> Good points, Gyula.  I think it would be much easier on a user if there
> could be multiple timers in flight per key.  I prefer the second approach,
> though, where a user associates some bit of metadata with the timer and we
> pass it back to them in the onTimer() callback, otherwise they are forced
> to maintain this state themselves.
>
> It looks to me like somehow exposing the namespaces, even if it's simpler
> and just a string, is the way to go.
>
> I'm really excited by this guys!  I think the TimelyFlatMap and
> TimelyCoFlatMap are going to get a LOT of use.  This is gonna make a lot of
> people happy.
>
> -Jamie
>
>
> On Fri, Oct 28, 2016 at 1:58 PM, Aljoscha Krettek 
> wrote:
>
> > Hi Gyula,
> > if you look at the internal API you'll notice that it is pretty much like
> > your second proposal. Just for reference, the interface is roughly this:
> >
> > public interface InternalTimerService<N> {
> >   long currentProcessingTime();
> >   long currentWatermark();
> >   void registerProcessingTimeTimer(N namespace, long time);
> >   void deleteProcessingTimeTimer(N namespace, long time);
> >   void registerEventTimeTimer(N namespace, long time);
> >   void deleteEventTimeTimer(N namespace, long time);
> > }
> >
> > that namespace bit can be anything for which you can provide a
> > TypeSerializer.
> >
> > IMHO, this goes back a bit to the discussion about adding a low level
> > operator-like interface that allows pretty much anything a custom
> operator
> > can do but with a defined, stable interface. The internal operator
> > interface is somewhat in flux, still, so I wouldn't recommend people to
> use
> > it directly.
> >
> > The only thing missing, really, from TimelyFlatMap is access to
> namespaces
> > for state and timers. With that, you could implement even the
> > WindowOperator as a TimelyFlatMap since I worked on abstracting
> everything
> > that it uses away behind interfaces that any operator can use. The last
> > piece, the generic timer API, went in last, of course. :-)
> >
> > Cheers,
> > Aljoscha
> >
> > On Fri, 28 Oct 2016 at 16:55 Gyula Fóra  wrote:
> >
> > > Hello,
> > >
> > > I was thinking about the methods provided by the timely functions and
> the
> > > timerservice and I am wondering if it makes sense to change them a
> little
> > > so they can cover a wider set of use case. Maybe I missed something
> > > completely obvious so please shoot me down in that case :)
> > >
> > > Currently the user gets a TimerService to register timers that will in
> > the
> > > future call the onTimer method. It is not completely obvious to me how
> > > would I implement a function that needs to trigger two types of
> callbacks
> > > in the future. If I get only one onTimer method I should be able to
> pass
> > in
> > > some sort of parameter or flag so I can branch in my onTimer
> > > implementation.
> > >
> > > As parameters are not supported I am left with states that are scoped
> to
> > > the keys which is also pretty useless if I want to trigger different
> > timed
> > > actions for the same keys.
> > >
> > > I know this is quite tricky but I see some alternative options:
> > >  - The register timer method returns a unique (per key) timer id, so we
> > can
> > > associate state with this id to fetch info about the timer registered.
> > (We
> > > could also remove timers with this id and maybe add methods to remove
> all
> > > for the current key)
> > >  - Allow the users to pass a custom parameter when registering the
> > > callback, and the parameter would be passed to the onTimer method
> > >  - Allow users to pass custom callback functions when registering the
> > > timers, but this would mean we have to support some sort of context for
> > > accessing the state (like the window context before)
> > >  - We could go for an annotation based API like in beam but thats
> > probably
> > > not good to mix in the current ones
> > >
> > > I personally prefer the first one.
> > >
> > > What do you think?
> > >
> > > Regards,
> > > Gyula
> > >
> >
>
>
>
> --
>
> Jamie Grier
> data Artisans, Director of Applications Engineering
> @jamiegrier <https://twitter.com/jamiegrier>
> ja...@data-artisans.com
>


Re: [DISCUSSION] Timely function interface and timer params

2016-11-02 Thread Gyula Fóra
Hi,
I opened this:
https://issues.apache.org/jira/browse/FLINK-4992

With this, users can implement practically anything depending on how they use
the parameter, so changing the state access logic is not necessary.
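
Just to illustrate how such a parameter could be used, here is a rough sketch
of a function that registers two differently-labelled timers and branches on
the label in onTimer. The String-based registration method, the
HypotheticalTimerService interface and all names below are assumptions made
for the sketch, not the actual interface proposed in the JIRA:

public class TwoTimersExample {

  private static final String EMIT = "emit";
  private static final String CLEANUP = "cleanup";

  // Assumed registration call that takes a user-provided label.
  public void processElement(long now, HypotheticalTimerService timers) {
    timers.registerProcessingTimeTimer(EMIT, now + 1000);      // short timer
    timers.registerProcessingTimeTimer(CLEANUP, now + 60_000); // long timer
  }

  // The label is handed back, so the callback can branch on it.
  public void onTimer(String label, long timestamp) {
    if (EMIT.equals(label)) {
      // emit an intermediate result for the current key
    } else if (CLEANUP.equals(label)) {
      // clear the state kept for the current key
    }
  }

  // Stand-in interface, only here to keep the sketch self-contained.
  public interface HypotheticalTimerService {
    void registerProcessingTimeTimer(String label, long time);
  }
}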

Cheers,
Gyula

Aljoscha Krettek  ezt írta (időpont: 2016. nov. 1., K,
16:39):

> Hi,
> yes, I think exposing a simple form of namespaces as String would be good.
> By the way, do we also need access to state with namespaces then?
>
> @Gyula: Please go ahead and open the Jira issue.
>
> Cheers,
> Aljoscha
>
> On Sat, 29 Oct 2016 at 17:28 Gyula Fóra  wrote:
>
> > Thanks for the feedback guys,
> >
> > I think exposing the namespace in a simplified form in the user facing
> API
> > is I think a very good idea, that already let's the users implement
> > practically anything they want. Maybe doing it as a simple string as
> Jamie
> > suggested would be a nice way to do it and that would serve as a label or
> > timer-id for the user. Thinking of it as a label/id is probably a much
> > simpler concept than the "namespace".
> >
> > Should we open a JIRA for this? Judging from the internal timer service
> > this should be a fairly straight forward extension as Aljoscha pointed
> out.
> >
> > Gyula
> >
> > Jamie Grier  ezt írta (időpont: 2016. okt. 29.,
> > Szo, 15:37):
> >
> > > Hi guys,
> > >
> > > Good points, Gyula.  I think it would be much easier on a user if there
> > > could be multiple timers in flight per key.  I prefer the second
> > approach,
> > > though, where a user associates some bit of metadata with the timer and
> > we
> > > pass it back to them in the onTimer() callback, otherwise they are
> forced
> > > to maintain this state themselves.
> > >
> > > It looks to me like somehow exposing the namespaces, even if it's
> simpler
> > > and just a string, is the way to go.
> > >
> > > I'm really excited by this guys!  I think the TimelyFlatMap and
> > > TimelyCoFlatMap are going to get a LOT of use.  This is gonna make a
> lot
> > of
> > > people happy.
> > >
> > > -Jamie
> > >
> > >
> > > On Fri, Oct 28, 2016 at 1:58 PM, Aljoscha Krettek  >
> > > wrote:
> > >
> > > > Hi Gyula,
> > > > if you look at the internal API you'll notice that it is pretty much
> > like
> > > > your second proposal. Just for reference, the interface is roughly
> > this:
> > > >
> > > > public interface InternalTimerService<N> {
> > > >   long currentProcessingTime();
> > > >   long currentWatermark();
> > > >   void registerProcessingTimeTimer(N namespace, long time);
> > > >   void deleteProcessingTimeTimer(N namespace, long time);
> > > >   void registerEventTimeTimer(N namespace, long time);
> > > >   void deleteEventTimeTimer(N namespace, long time);
> > > > }
> > > >
> > > > that namespace bit can be anything for which you can provide a
> > > > TypeSerializer.
> > > >
> > > > IMHO, this goes back a bit to the discussion about adding a low level
> > > > operator-like interface that allows pretty much anything a custom
> > > operator
> > > > can do but with a defined, stable interface. The internal operator
> > > > interface is somewhat in flux, still, so I wouldn't recommend people
> to
> > > use
> > > > it directly.
> > > >
> > > > The only thing missing, really, from TimelyFlatMap is access to
> > > namespaces
> > > > for state and timers. With that, you could implement even the
> > > > WindowOperator as a TimelyFlatMap since I worked on abstracting
> > > everything
> > > > that it uses away behind interfaces that any operator can use. The
> last
> > > > pice, the generic timer API went in last, of course. :-)
> > > >
> > > > Cheers,
> > > > Aljoscha
> > > >
> > > > On Fri, 28 Oct 2016 at 16:55 Gyula Fóra  wrote:
> > > >
> > > > > Hello,
> > > > >
> > > > > I was thinking about the methods provided by the timely functions
> and
> > > the
> > > > > timerservice and I am wondering if it makes sense to change them a
> > > little
> > > > > so they can cover a wider set of use case. Maybe I missed something
> > > > > completely obvious so please shoot me down in that case :)
> 

Re: [VOTE] Release Apache Flink 1.1.4 (RC1)

2016-11-13 Thread Gyula Fóra
Hi,

+1 from me.

Tested:

   - Built from source for Hadoop 2.7.3
   - Tested running on YARN (with some fairly complex topologies)
   - Savepoint app running on 1.1.3 -> restored successfully on 1.1.4

Cheers,
Gyula

Ufuk Celebi  ezt írta (időpont: 2016. nov. 11., P, 10:10):

Dear Flink community,

Please vote on releasing the following candidate as Apache Flink version
1.1.4.

The commit to be voted on:
3c1024a (http://git-wip-us.apache.org/repos/asf/flink/commit/3c1024a)

Branch:
release-1.1.4-rc1
(
https://git1-us-west.apache.org/repos/asf/flink/repo?p=flink.git;a=shortlog;h=refs/heads/release-1.1.4-rc1
)

The release artifacts to be voted on can be found at:
http://people.apache.org/~uce/flink-1.1.4-rc1/

The release artifacts are signed with the key with fingerprint 9D403309:
http://www.apache.org/dist/flink/KEYS

The staging repository for this release can be found at:
https://repository.apache.org/content/repositories/orgapacheflink-1107

-

The voting time is at least three days and the vote passes if a majority of
at least three +1 PMC votes are cast.

The vote ends on Tuesday, November 15th, 2016, counting the weekend as a
single day.

[ ] +1 Release this package as Apache Flink 1.1.4
[ ] -1 Do not release this package, because ...


Re: [DISCUSS] FLIP-14: Loops API and Termination

2016-11-15 Thread Gyula Fóra
Hi Paris,

I like the proposed changes to the iteration API; this cleans things up in
the Java API without any strict restriction, I think (it was never a problem
in the Scala API).

The termination algorithm based on the proposed scoped loops seems to be
fairly simple and looks good :)

Cheers,
Gyula

Paris Carbone  ezt írta (időpont: 2016. nov. 14., H, 8:50):

> That would be great Shi! Let's take that offline.
>
> Anyone else interested in the iteration changes? It would be nice to
> incorporate these to v1.2 if possible so I count on your review asap.
>
> cheers,
> Paris
>
> On Nov 14, 2016, at 2:59 AM, xiaogang.sxg  > wrote:
>
> Hi Paris
>
> Unfortunately, the project is not public yet.
> But i can provide you a primitive implementation of the update protocol in
> the paper. It’s implemented in Storm. Since the protocol assumes the
> communication channels between different tasks are dual, i think it’s not
> easy to adapt it to Flink.
>
> Regards
> Xiaogang
>
>
> 在 2016年11月12日,上午3:03,Paris Carbone mailto:par...@kth.se>>
> 写道:
>
> Hi Shi,
>
> Naiad/Timely Dataflow and other projects use global coordination which is
> very convenient for asynchronous progress tracking in general but it has
> some downsides in production systems that count on in-flight
> transactional control mechanisms and rollback recovery guarantees. This is
> why we generally prefer decentralized approaches (despite their own
> downsides).
>
> Regarding synchronous/structured iterations, this is a bit off topic and
> they are a bit of a different story as you already know.
> We maintain a graph streaming (gelly-streams) library on Flink that you
> might find interesting [1]. Vasia, another Flink committer is also working
> on that among others.
> You can keep an eye on it since we are planning to use this project as a
> showcase for a new way of doing structured and fixpoint iterations on
> streams in the future.
>
> P.S. many thanks for sharing your publication, it was an interesting read.
> Do you happen to have your source code public? We could most certainly use
> it in a benchmark soon.
>
> [1] https://github.com/vasia/gelly-streaming
>
>
> On 11 Nov 2016, at 19:18, SHI Xiaogang  shixiaoga...@gmail.com>> wrote:
>
> Hi, Fouad
>
> Thank you for the explanation. Now the centralized method seems correct to
> me.
> The passing of StatusUpdate events will lead to synchronous iterations and
> we are using the information in each iterations to terminate the
> computation.
>
> Actually, i prefer the centralized method because in many applications, the
> convergence may depend on some global statistics.
> For example, a PageRank program may terminate the computation when 99%
> vertices are converged.
> I think those learning programs which cannot reach the fixed-point
> (oscillating around the fixed-point) can benefit a lot from such features.
> The decentralized method makes it hard to support such convergence
> conditions.
>
>
> Another concern is that Flink cannot produce periodic results in the
> iteration over infinite data streams.
> Take a concrete example. Given an edge stream constructing a graph, the
> user may need the PageRank weight of each vertex in the graphs formed at
> certain instants.
> Currently Flink does not provide any input or iteration information to
> users, making it hard for users to implement such real-time iterative
> applications.
> Such features are supported in both Naiad and Tornado. I think Flink should
> support it as well.
>
> What do you think?
>
> Regards
> Xiaogang
>
>
> 2016-11-11 19:27 GMT+08:00 Fouad ALi  fouad.alsay...@gmail.com>>:
>
> Hi Shi,
>
> It seems that you are referring to the centralized algorithm which is no
> longer the proposed version.
> In the decentralized version (check last doc) there is no master node or
> global coordination involved.
>
> Let us keep this discussion to the decentralized one if possible.
>
> To answer your points on the previous approach, there is a catch in your
> trace at t7. Here is what is happening:
> - Head, as well as RS, will receive a 'BroadcastStatusUpdate' from
> runtime (see 2.1 in the steps).
> - RS and Heads will broadcast StatusUpdate  event and will not notify its
> status.
> - When StatusUpdate event gets back to the head it will notify its
> WORKING  status.
>
> Hope that answers your concern.
>
> Best,
> Fouad
>
> On Nov 11, 2016, at 6:21 AM, SHI Xiaogang  shixiaoga...@gmail.com>>
> wrote:
>
> Hi Paris
>
> I have several concerns about the correctness of the termination
> protocol.
> I think the termination protocol put an end to the computation even when
> the computation has not converged.
>
> Suppose there exists a loop context constructed by a OP operator, a Head
> operator and a Tail operator (illustrated in Figure 2 in the first
> draft).
> The stream only contains one record. OP will pass the record to its

Re: [DISCUSS] FLIP-14: Loops API and Termination

2016-11-16 Thread Gyula Fóra
I am not completely sure whether we should deprecate the old API for 1.2 or
remove it completely. Personally I am in favor of removing it; I don't
think it is a huge burden to move to the new one if it makes for a much
nicer user experience.

I think you can go ahead add the FLIP to the wiki and open the PR so we can
start the review if you have it ready anyways.

Gyula

Paris Carbone  ezt írta (időpont: 2016. nov. 16., Sze,
11:55):

> Thanks for reviewing, Gyula.
>
> One thing that is still up for discussion is whether we should remove
> the old iterations API completely or simply mark it as deprecated till v2.0.
> Also, not sure what is the best process now. We have the changes ready.
> Should I copy the FLIP to the wiki and trigger the PRs or wait for a few
> more days in case someone has objections?
>
> @Stephan, what is your take on our interpretation of the approach you
> suggested? Should we proceed or is there anything that you do not find nice?
>
> Paris
>
> > On 15 Nov 2016, at 10:01, Gyula Fóra  wrote:
> >
> > Hi Paris,
> >
> > I like the proposed changes to the iteration API, this cleans up things
> in
> > the Java API without any strict restriction I think (it was never a
> problem
> > in the Scala API).
> >
> > The termination algorithm based on the proposed scoped loops seems to be
> > fairly simple and looks good :)
> >
> > Cheers,
> > Gyula
> >
> > Paris Carbone  ezt írta (időpont: 2016. nov. 14., H,
> 8:50):
> >
> >> That would be great Shi! Let's take that offline.
> >>
> >> Anyone else interested in the iteration changes? It would be nice to
> >> incorporate these to v1.2 if possible so I count on your review asap.
> >>
> >> cheers,
> >> Paris
> >>
> >> On Nov 14, 2016, at 2:59 AM, xiaogang.sxg  >> <mailto:xiaogang@alibaba-inc.com>> wrote:
> >>
> >> Hi Paris
> >>
> >> Unfortunately, the project is not public yet.
> >> But i can provide you a primitive implementation of the update protocol
> in
> >> the paper. It’s implemented in Storm. Since the protocol assumes the
> >> communication channels between different tasks are dual, i think it’s
> not
> >> easy to adapt it to Flink.
> >>
> >> Regards
> >> Xiaogang
> >>
> >>
> >> 在 2016年11月12日,上午3:03,Paris Carbone mailto:par...@kth.se
> >>
> >> 写道:
> >>
> >> Hi Shi,
> >>
> >> Naiad/Timely Dataflow and other projects use global coordination which
> is
> >> very convenient for asynchronous progress tracking in general but it has
> >> some downsides in a production systems that count on in-flight
> >> transactional control mechanisms and rollback recovery guarantees. This
> is
> >> why we generally prefer decentralized approaches (despite their our
> >> downsides).
> >>
> >> Regarding synchronous/structured iterations, this is a bit off topic and
> >> they are a bit of a different story as you already know.
> >> We maintain a graph streaming (gelly-streams) library on Flink that you
> >> might find interesting [1]. Vasia, another Flink committer is also
> working
> >> on that among others.
> >> You can keep an eye on it since we are planning to use this project as a
> >> showcase for a new way of doing structured and fixpoint iterations on
> >> streams in the future.
> >>
> >> P.S. many thanks for sharing your publication, it was an interesting
> read.
> >> Do you happen to have your source code public? We could most certainly
> use
> >> it in an benchmark soon.
> >>
> >> [1] https://github.com/vasia/gelly-streaming
> >>
> >>
> >> On 11 Nov 2016, at 19:18, SHI Xiaogang  >> shixiaoga...@gmail.com><mailto:shixiaoga...@gmail.com>> wrote:
> >>
> >> Hi, Fouad
> >>
> >> Thank you for the explanation. Now the centralized method seems correct
> to
> >> me.
> >> The passing of StatusUpdate events will lead to synchronous iterations
> and
> >> we are using the information in each iterations to terminate the
> >> computation.
> >>
> >> Actually, i prefer the centralized method because in many applications,
> the
> >> convergence may depend on some global statistics.
> >> For example, a PageRank program may terminate the computation when 99%
> >> vertices are converged.
> >> I think those learning programs which cannot reach the fixed-point
> >> (osc

Kafka Sink stuck in cancelling

2016-11-22 Thread Gyula Fóra
Hi,

Has anyone ever experienced the Kafka producer getting stuck in cancelling?

I am aware that there were problems with the Kafka consumer before but I
haven't seen this one yet. It happened simultaneously to 3 of my jobs last
night; they were stuck from about 8 pm to 8 am (not exact times, but you get
the rough duration).

The logs on the JobManager don't seem to be very helpful; they just show
that all tasks start cancelling and then go to cancelled, except for one Kafka
sink task. That one goes into cancelling but only gets cancelled 12 hours
later. On one of the task managers I have found this though:

2016-11-21 20:22:52,220 INFO  org.apache.flink.yarn.YarnTaskManager
 - Un-registering task and sending final execution
state CANCELED to JobManager for task Execute EventProcessors
(f030e71787a6dbd7a543e9745c42289d)

2016-11-22 08:49:35,181 WARN  org.apache.kafka.common.network.Selector
 - Error in I/O with
kafka17.sto.midasplayer.com/172.25.82.212
java.io.EOFException
at 
org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62)
at org.apache.kafka.common.network.Selector.poll(Selector.java:248)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:135)
at java.lang.Thread.run(Thread.java:745)
2016-11-22 08:49:35,183 INFO
org.apache.flink.runtime.taskmanager.Task - Sink:
Kafka output (2/8) switched to CANCELED


There might have been some network/kafka issue that caused 3 jobs to get
stuck at the same time but I don't know what actually happened.

Any ideas?
Gyula


Re: Kafka Sink stuck in cancelling

2016-11-22 Thread Gyula Fóra
Ah sorry I completely missed the version details. I am using Flink 1.1.3
with Kafka 0.8 producer.

We haven't had issues with the consumers yet, and this is the first time this
has happened as well.

Gyula

On Tue, Nov 22, 2016, 12:15 Till Rohrmann  wrote:

> Hi Gyula,
>
> I'm not aware of any recent issues with the Kafka Producer. However there
> was one with the Kafka Consumer which prevented the proper cancellation (
> https://issues.apache.org/jira/browse/FLINK-5048).
>
> Which version of Flink and which Kafka Producer were you using?
>
> Cheers,
> Till
>
> On Tue, Nov 22, 2016 at 10:03 AM, Gyula Fóra  wrote:
>
> > Hi,
> >
> > Has anyone ever experienced the Kafka producer getting stuck in
> cancelling?
> >
> > I am aware that there were problems with the Kafka consumer before but I
> > haven't seen this one yet. It happened simultaneously to 3 of my jobs
> last
> > night, they were stuck from about 8 pm to 8 am (not exact times but you
> get
> > the length.).
> >
> > The logs don't seem to be very helpful on the JobManager, they just show
> > that all tasks start cancelling and then go cancelled except for one
> Kafka
> > sink task. That goes into cancelling but only gets cancelled 12 hours
> > later. On one of the task managers I have found this though:
> >
> > 2016-11-21 20:22:52,220 INFO  org.apache.flink.yarn.YarnTaskManager
> >  - Un-registering task and sending final execution
> > state CANCELED to JobManager for task Execute EventProcessors
> > (f030e71787a6dbd7a543e9745c42289d)
> >
> > 2016-11-22 08:49:35,181 WARN  org.apache.kafka.common.network.Selector
> >  - Error in I/O with
> > kafka17.sto.midasplayer.com/172.25.82.212
> > java.io.EOFException
> > at org.apache.kafka.common.network.NetworkReceive.
> > readFrom(NetworkReceive.java:62)
> > at org.apache.kafka.common.network.Selector.poll(
> > Selector.java:248)
> > at org.apache.kafka.clients.NetworkClient.poll(
> > NetworkClient.java:192)
> > at org.apache.kafka.clients.producer.internals.Sender.run(
> > Sender.java:191)
> > at org.apache.kafka.clients.producer.internals.Sender.run(
> > Sender.java:135)
> > at java.lang.Thread.run(Thread.java:745)
> > 2016-11-22 08:49:35,183 INFO
> > org.apache.flink.runtime.taskmanager.Task - Sink:
> > Kafka output (2/8) switched to CANCELED
> >
> >
> > There might have been some network/kafka issue that caused 3 jobs to get
> > stuck at the same time but I don't know what actually happened.
> >
> > Any ideas?
> > Gyula
> >
>


Savepoints seem to not work properly on release-1.1 branch

2016-12-03 Thread Gyula Fóra
Hi,

I think something is not behaving correctly with savepoint/checkpoint
discard on the release-1.1 branch. If I build the release-1.1.4-rc branch
everything works correctly but on the latest 1.1 branch it seems like
savepoints are discarded like checkpoints and cannot be restored after the
job is stopped.

I can take a savepoint and I see it complete. But when I try to restore from
it, it doesn't find the checkpoint folder anymore. I know that there are tests
for this behaviour, but it still seems to happen when I run it on YARN.

Cheers,
Gyula


Re: Savepoints seem to not work properly on release-1.1 branch

2016-12-04 Thread Gyula Fóra
Hi,
Thanks for being so quick, I will try the fix as soon as it is merged. :)
Gyula

Ufuk Celebi  ezt írta (időpont: 2016. dec. 4., V, 12:02):

> Thanks for reporting this Gyula. A fix is available in #2930. We are going
> to merge that today.
>
> – Ufuk
>
> On 3 December 2016 at 09:56:21, Gyula Fóra (gyula.f...@gmail.com) wrote:
> > Hi,
> >
> > I think something is not behaving correctly with savepoint/checkpoint
> > discard on the release-1.1 branch. If I build the release-1.1.4-rc branch
> > everything works correctly but on the latest 1.1 branch it seems like
> > savepoints are discarded like checkpoints and cannot be restored after
> the
> > job is stopped.
> >
> > I can take a savepoints, I see it complete. But when I try to restore
> from
> > it doesnt find the checkpoint folder anymore. I know that there are tests
> > for this behaviour but it still seems to happen when I run it on yarn.
> >
> > Cheers,
> > Gyula
> >
>
>


Re: Savepoints seem to not work properly on release-1.1 branch

2016-12-04 Thread Gyula Fóra
I tried it, works fine!

Ufuk Celebi  ezt írta (időpont: 2016. dec. 4., V, 18:52):

> Thanks. It has been merged now. :-)
>
> On 4 December 2016 at 17:24:30, Gyula Fóra (gyula.f...@gmail.com) wrote:
> > Hi,
> > Thanks for being so quick, I will try the fix as soon as it is merged. :)
> > Gyula
> >
> > Ufuk Celebi ezt írta (időpont: 2016. dec. 4., V, 12:02):
> >
> > > Thanks for reporting this Gyula. A fix is available in #2930. We are
> going
> > > to merge that today.
> > >
> > > – Ufuk
> > >
> > > On 3 December 2016 at 09:56:21, Gyula Fóra (gyula.f...@gmail.com)
> wrote:
> > > > Hi,
> > > >
> > > > I think something is not behaving correctly with savepoint/checkpoint
> > > > discard on the release-1.1 branch. If I build the release-1.1.4-rc
> branch
> > > > everything works correctly but on the latest 1.1 branch it seems like
> > > > savepoints are discarded like checkpoints and cannot be restored
> after
> > > the
> > > > job is stopped.
> > > >
> > > > I can take a savepoints, I see it complete. But when I try to restore
> > > from
> > > > it doesnt find the checkpoint folder anymore. I know that there are
> tests
> > > > for this behaviour but it still seems to happen when I run it on
> yarn.
> > > >
> > > > Cheers,
> > > > Gyula
> > > >
> > >
> > >
> >
>
>


Re: [VOTE] Release Apache Flink 1.1.4 (RC2)

2016-12-12 Thread Gyula Fóra
Hi Ufuk,

Thanks for the RC and all the effort that went into the bugfixes :)

I have tested the following things:
 - Built from source (mvn clean install)
 - Tested savepoint/restore on both YARN (+HDFS) and standalone
 - Ran heavy production jobs with pretty much the same build (minus the last
few commits) for more than a week without any issues

The only thing I found is that the fix for [FLINK-5071] has not been
backported. (I opened a PR: https://github.com/apache/flink/pull/2990)
This can be quite annoying for YARN deployments. I am not sure if this is a
blocker though.

So +1 from me, except the YARN vcore problem.

Cheers,
Gyula


Ufuk Celebi  ezt írta (időpont: 2016. dec. 12., H, 9:28):

> Dear Flink community,
>
> Please vote on releasing the following candidate as Apache Flink version
> 1.1.4.
>
> The commit to be voted on:
> a587fe9 (http://git-wip-us.apache.org/repos/asf/flink/commit/a587fe9)
>
> Branch:
> release-1.1.4-rc2
> (
> https://git1-us-west.apache.org/repos/asf/flink/repo?p=flink.git;a=shortlog;h=refs/heads/release-1.1.4-rc2
> )
>
> The release artifacts to be voted on can be found at:
> http://people.apache.org/~uce/flink-1.1.4-rc2/
>
> The release artifacts are signed with the key with fingerprint 9D403309:
> http://www.apache.org/dist/flink/KEYS
>
> The staging repository for this release can be found at:
> https://repository.apache.org/content/repositories/orgapacheflink-1108
>
> -
>
> The voting time is at least three days and the vote passes if a
> majority of at least three +1 PMC votes are cast. The vote ends on
> Thursday, December 15th, 2016.
>
> [ ] +1 Release this package as Apache Flink 1.1.4
> [ ] -1 Do not release this package, because ...
>
> -
>
> A short note on testing this RC: A lot of core fixes have been made
> for this RC and it is more important than usual to extensively test
> this RC. Please have a look at the new commits since 1.1.3 and try to
> target the fixes specifically, too. That will be much appreciated and
> helpful.
>


Re: [VOTE] Release Apache Flink 1.1.4 (RC3)

2016-12-16 Thread Gyula Fóra
@Robert

- I am not sure if the RocksDB problems are closely related to the version
upgrade; I have been experiencing similar problems for months. This is
usually not a huge problem on YARN, I think; it mostly hurts in standalone
clusters.
- Also, the YARN memory limits are tricky to configure nicely, as it depends a
lot on how RocksDB handles native memory. It seems to grow quite a lot over
time.


Flavio Pompermaier  ezt írta (időpont: 2016. dec.
16., P, 10:56):

> I personally think that it should be quite important to have a fix also for
> the ES connector (https://issues.apache.org/jira/browse/FLINK-5122).
>
> Best,
> Flavio
>
> On Fri, Dec 16, 2016 at 10:43 AM, Robert Metzger 
> wrote:
>
> > I'm not sure if we can release the release candidate like this, because
> I'm
> > running into two issues probably related to a recent rocksdb version
> > upgrade.
> >
> > This is my list of points so far:
> >
> > - Checked the staging repository. Quickstarts and Hadoop 1 / 2 are okay.
> > - Build a job against the staging repository
> > - Binaries deploy on a kerberized HA YARN / HDFS setup. Ran the KMeans
> and
> > WordCount batch jobs
> > - Executed a heavy, misbehaved streaming job for a few hours. While
> running
> > that job, I found that:
> >   - Not all checkpoint directories are cleaned up in HDFS (I use the
> async
> > rocksdb statebackend)
> >   -  segfaults from rocksdb (8 segfaults in ~3 hrs, but they were all
> > happening in the last minutes)
> >   - "beyond physical memory limits" container killings from YARN (I know
> we
> > can configure this, I just wonder if we should change the default
> > value)
> >   -  the segfaults and memory limits caused the job to not run anymore in
> > the end because it was in a constant retry loop.
> >   - This is not a blocking issue I found during the testing:
> > https://issues.apache.org/jira/browse/FLINK-5345
> >   - This is also a non blocking issue for 1.1.4 (fixed for 1.2)
> > https://issues.apache.org/jira/browse/FLINK-4631
> >
> >
> > Let me know if we should release anyways or fix these issues first.
> >
> >
> > On Tue, Dec 13, 2016 at 11:04 PM, Ufuk Celebi  wrote:
> >
> > > Dear Flink community,
> > >
> > > Please vote on releasing the following candidate as Apache Flink
> version
> > > 1.1.4.
> > >
> > > The commit to be voted on:
> > > 2cd6579 (http://git-wip-us.apache.org/repos/asf/flink/commit/2cd6579)
> > >
> > > Branch:
> > > release-1.1.4-rc3
> > > (https://git1-us-west.apache.org/repos/asf/flink/repo?p=flin
> > > k.git;a=shortlog;h=refs/heads/release-1.1.4-rc3)
> > >
> > > The release artifacts to be voted on can be found at:
> > > http://people.apache.org/~uce/flink-1.1.4-rc3/
> > >
> > > The release artifacts are signed with the key with fingerprint
> 9D403309:
> > > http://www.apache.org/dist/flink/KEYS
> > >
> > > The staging repository for this release can be found at:
> > > https://repository.apache.org/content/repositories/orgapacheflink-1109
> > >
> > > -
> > >
> > > The voting time is at least three days and the vote passes if a
> > > majority of at least three +1 PMC votes are cast. The vote ends
> earliest
> > > on Friday, December 16th, 2016, at 11 PM (CET)/2 PM (PST).
> > >
> > > [ ] +1 Release this package as Apache Flink 1.1.4
> > > [ ] -1 Do not release this package, because ...
> > >
> >
>


Re: [VOTE] Release Apache Flink 1.1.4 (RC4)

2016-12-20 Thread Gyula Fóra
I have built and run the tests for the latest RC, so I carry over my +1 vote
from the previous thread.

Gyula

Ufuk Celebi  ezt írta (időpont: 2016. dec. 19., H, 16:16):

> Dear Flink community,
>
> Please vote on releasing the following candidate as Apache Flink version
> 1.1.4.
>
> The commit to be voted on:
> 8fb0fc8 (http://git-wip-us.apache.org/repos/asf/flink/commit/8fb0fc8)
>
> Branch:
> release-1.1.4-rc4
> (
> https://git1-us-west.apache.org/repos/asf/flink/repo?p=flink.git;a=shortlog;h=refs/heads/release-1.1.4-rc4
> )
>
> The release artifacts to be voted on can be found at:
> http://people.apache.org/~uce/flink-1.1.4-rc4/
>
> The release artifacts are signed with the key with fingerprint 9D403309:
> http://www.apache.org/dist/flink/KEYS
>
> The staging repository for this release can be found at:
> https://repository.apache.org/content/repositories/orgapacheflink-1110
>
> -
>
> The voting time is reduced to at least two days and the vote passes if
> a majority of at least three +1 PMC votes are cast. The vote ends
> earliest on Wednesday, December 21st, 2016, at 4 PM (CET)/7 AM (PST).
>
> I reduced the voting time, because the changes since the last RC are
> minimal. Please object if you are against this and we can extend it to
> the regular three days.
>
> [ ] +1 Release this package as Apache Flink 1.1.4
> [ ] -1 Do not release this package, because ...
>

