Re: [DISCUSS] FLIP-8: Rescalable Non-Partitioned State

2016-08-12 Thread Ufuk Celebi
I will update the design doc with more details for the Checkpointed
variants and remove Option 2 (I think that's an orthogonal thing).

The way I see it now, we should have a base CheckpointedBase interface
and have the current Checkpointed interface be a sub-interface for
non-repartitionable state. Then we have two other List-based variants:

1) Union List => on restore, all state is unioned (what is currently in
the design doc)

2) List => on restore, state is automatically redistributed (if the
parallelism stays the same, state should go to the same subtasks, but
there are no guarantees when the parallelism changes); a rough sketch
follows below.
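
Here is how that hierarchy could look. The signatures mirror the current
Checkpointed interface; UnionListCheckpointed and ListCheckpointed are
placeholder names for the sketch, not a settled API:

    import java.io.Serializable;
    import java.util.List;

    // Base marker for all non-partitioned (operator) state.
    interface CheckpointedBase {
    }

    // Current behaviour: state is restored as-is, no repartitioning.
    interface Checkpointed<T extends Serializable> extends CheckpointedBase {
        T snapshotState(long checkpointId, long checkpointTimestamp) throws Exception;
        void restoreState(T state) throws Exception;
    }

    // Variant 1: on restore, every subtask receives the union of all
    // snapshotted lists.
    interface UnionListCheckpointed<T extends Serializable> extends CheckpointedBase {
        List<T> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception;
        void restoreState(List<T> state) throws Exception;
    }

    // Variant 2: on restore, the system redistributes the list elements
    // across subtasks (same assignment if the parallelism is unchanged).
    interface ListCheckpointed<T extends Serializable> extends CheckpointedBase {
        List<T> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception;
        void restoreState(List<T> state) throws Exception;
    }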



Regarding the other thing you and Aljoscha discussed: I feel like that
should be handled as part of the side input effort. Does that make
sense?




Re: [DISCUSS] FLIP-8: Rescalable Non-Partitioned State

2016-08-12 Thread Gyula Fóra
Hi Aljoscha,

Yes this is pretty much how I think about it as well.

Basically the state in this case would be computed from the side inputs
with the same state update logic on all operators. I think it is important
that operators compute their own state, or at least observe all state
changes; otherwise a lot of things can get weird.

Let's say, for instance, I am building a dynamic filter where new filter
conditions are added/removed on the fly. For the sake of my argument, let's
also assume that initializing a new filter condition is a heavy operation.
The global state in this case is the union of all filter conditions.

If at any point in time the operators could only observe the current state,
we might end up with very inefficient code, while if we observe each state
change individually (e.g. one new filter added) we can just instantiate the
new filter without worrying about the other ones.
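
To make this concrete, here is a minimal sketch. FilterCondition and the
compile step are made-up stand-ins for the heavy initialization, not any
existing API:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    class DynamicFilter {
        interface FilterCondition {
            boolean matches(String event);
        }

        private final Map<String, FilterCondition> active = new ConcurrentHashMap<>();

        // Observing each change individually: only the new condition pays
        // the heavy initialization cost.
        void onConditionAdded(String id, String expression) {
            active.put(id, compile(expression));
        }

        void onConditionRemoved(String id) {
            active.remove(id);
        }

        // Here an event passes only if it satisfies every active condition.
        boolean accept(String event) {
            return active.values().stream().allMatch(c -> c.matches(event));
        }

        // Stand-in for an expensive initialization (e.g. compiling a rule).
        private static FilterCondition compile(String expression) {
            return event -> event.contains(expression);
        }
    }

With snapshot-only observation, the operator would instead have to diff the
full condition set (or re-initialize everything) on every update.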

I am not completely sure if it's clear what I am trying to say :D

Gyula


Re: [DISCUSS] FLIP-8: Rescalable Non-Partitioned State

2016-08-12 Thread Aljoscha Krettek
Hi Gyula,
I was thinking about this as well, in the context of side inputs, which
would be a generalization of your use case, if I'm not mistaken. In my head
I was calling it "global state". Essentially, this state would be the same on
all operators, and when checkpointing you would only have to checkpoint the
state of operator 0. Upon restore you would distribute this state to all
operators again.
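
Roughly, in code (the method names are just illustration, not a proposed
API):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    class GlobalStateHolder<T> {
        private final List<T> globalState = new ArrayList<>();

        // All subtasks hold identical state, so only subtask 0 snapshots it.
        List<T> snapshotState(int subtaskIndex) {
            return subtaskIndex == 0
                ? new ArrayList<>(globalState)
                : Collections.<T>emptyList();
        }

        // On restore, the runtime hands the single snapshot to every subtask.
        void restoreState(List<T> checkpointed) {
            globalState.clear();
            globalState.addAll(checkpointed);
        }
    }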

Is this what you had in mind?

Cheers,
Aljoscha



[jira] [Created] (FLINK-4389) Expose metrics to Webfrontend

2016-08-12 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-4389:
---

 Summary: Expose metrics to Webfrontend
 Key: FLINK-4389
 URL: https://issues.apache.org/jira/browse/FLINK-4389
 Project: Flink
  Issue Type: Sub-task
  Components: Metrics, Webfrontend
Affects Versions: 1.1.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler


https://cwiki.apache.org/confluence/display/FLINK/FLIP-7%3A+Expose+metrics+to+WebInterface



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] FLIP-8: Rescalable Non-Partitioned State

2016-08-12 Thread Gyula Fóra
Hi,
Let me try to explain what I mean by broadcast states.

I think it is a very common pattern that people broadcast control messages
to operators that also receive normal input events.

Some examples: broadcasting a model for prediction, or broadcasting some
information that should be the same at all subtasks but evolves over time.
At the same time these operators usually also do normal event processing
alongside the broadcast input stream.

There is currently no proper solution for this provided by the API. We can
of course use connected operators or wrapper types and broadcast one of the
inputs, but there are several limitations. We can't use keyed state, for
instance, because that requires both inputs to be keyed (so we can't
broadcast).
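
For reference, here is a minimal sketch of that workaround (the String
element types just keep the example short):

    import java.util.HashSet;
    import java.util.Set;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
    import org.apache.flink.util.Collector;

    class BroadcastWorkaround {
        static DataStream<String> apply(DataStream<String> events, DataStream<String> controls) {
            return events
                .connect(controls.broadcast())
                .flatMap(new CoFlatMapFunction<String, String, String>() {
                    // Non-keyed, per-subtask copy of the "broadcast state";
                    // every subtask sees every control message.
                    private final Set<String> activeControls = new HashSet<>();

                    @Override
                    public void flatMap1(String event, Collector<String> out) {
                        if (activeControls.contains(event)) { // normal event processing
                            out.collect(event);
                        }
                    }

                    @Override
                    public void flatMap2(String control, Collector<String> out) {
                        activeControls.add(control);
                    }
                });
        }
    }

The limitation is exactly what I described: this per-subtask Set is plain
operator state, with none of the tooling we have for keyed state.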

Cheers,
Gyula



Re: [DISCUSS] FLIP-8: Rescalable Non-Partitioned State

2016-08-12 Thread Ufuk Celebi
Comments inline.

On Thu, Aug 11, 2016 at 8:06 PM, Gyula Fóra  wrote:
> Option 1:
> I think the main problem here is that sending all the state everywhere will
> not scale at all. I think this will even fail for some internal Flink
> operators (window timers, I think, are kept like this, maybe I'm wrong
> here). The general problem here, which we don't have with the key-value
> states, is that the system can't do the repartitioning automatically. I
> think we should try to make abstractions that would allow the system to do
> this.

The state size can definitely become a problem. For Kafka sources, for
example, I don't think it would be problematic, but for the timers it might
be, yes. It definitely depends on the use case.

In theory, we could also redistribute the list elements automatically,
for example in a round-robin fashion. The question is whether this
will be enough in general.
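
As a toy sketch, round-robin redistribution on restore could look like this
(purely illustrative, not runtime code):

    import java.util.ArrayList;
    import java.util.List;

    class RoundRobinRedistribution {
        // Assign the checkpointed list elements to the new subtasks in turn.
        static <T> List<List<T>> redistribute(List<T> elements, int newParallelism) {
            List<List<T>> perSubtask = new ArrayList<>();
            for (int i = 0; i < newParallelism; i++) {
                perSubtask.add(new ArrayList<T>());
            }
            for (int i = 0; i < elements.size(); i++) {
                perSubtask.get(i % newParallelism).add(elements.get(i));
            }
            return perSubtask;
        }
    }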

>
> Option 2:
> To be honest I don't completely get this approach; what do the indices mean
> in the get/set methods? What happens if the same index is used from
> multiple operators?
> This may also suffer from the same scalability issues as Option 1 (but, as
> I said, I don't get this completely :()

Yes, I don't like it either. It's actually similar to Option 1 (from a
runtime perspective). I think the main question with Option 2 is
whether we expose the API as an interface or a state class. If we go
for this kind of interface, we could parameterize the restore behaviour
via the descriptor (e.g. a flag to merge/union etc.). That should be
more extensible than providing interfaces.
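
A descriptor-based version could look something like this (all names
invented for illustration):

    // Hypothetical descriptor that carries the restore behaviour as a flag
    // instead of encoding it in separate interfaces.
    enum RedistributionMode { UNION, REDISTRIBUTE }

    class OperatorListStateDescriptor<T> {
        final String name;
        final Class<T> elementType;
        final RedistributionMode mode;

        OperatorListStateDescriptor(String name, Class<T> elementType, RedistributionMode mode) {
            this.name = name;
            this.elementType = elementType;
            this.mode = mode;
        }
    }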

> I think another approach could be (it might be similar to what Option 2 is
> trying to achieve) to provide a Set (or Map) like abstraction to keep the
> non-partitioned states. Users could add/remove things from it at their own
> will, but the system would be free to redistribute the Sets between the
> operators. In practice this would mean, for instance, that the Kafka
> sources would store (partition, offset) tuples in the set, and every time
> in the open method they would check what is assigned to them (the system
> is free to decide). This of course would only work well if we can assume
> that distributing the states in equal numbers is desirable.

I think the same point applies to redistributing the list
automatically (what I meant by whether it is "general enough"). I
think what you describe here could be the list without unioning it.
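
Sketching your Kafka example on top of such a redistributable list/set (the
snapshot/restore hooks below are assumptions for illustration, not a fixed
API):

    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class KafkaLikeSourceState {
        // One redistributable element per Kafka partition this subtask owns.
        static class PartitionOffset implements Serializable {
            final int partition;
            final long offset;
            PartitionOffset(int partition, long offset) {
                this.partition = partition;
                this.offset = offset;
            }
        }

        private final Map<Integer, Long> offsets = new HashMap<>();

        // Snapshot: emit one element per partition.
        List<PartitionOffset> snapshotState() {
            List<PartitionOffset> out = new ArrayList<>();
            offsets.forEach((p, o) -> out.add(new PartitionOffset(p, o)));
            return out;
        }

        // Restore/open: accept whatever subset the system assigned to us.
        void restoreState(List<PartitionOffset> assigned) {
            offsets.clear();
            for (PartitionOffset po : assigned) {
                offsets.put(po.partition, po.offset);
            }
        }
    }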

>
> Broadcast states:
> This might be a good time to think about broadcast states: non-partitioned
> states that are the same at all subtasks. I think this comes up in a lot
> of use cases (I know at least one myself haha) and it is pretty
> straightforward from a runtime perspective; the bigger question is the API.

Can you explain this a little more?



Another open question (not addressed in the FLIP yet) is how we treat
operators that have both keyed and non-keyed state. The current API
kind of moves this question to the user.


[jira] [Created] (FLINK-4388) Race condition during initialization of MemorySegmentFactory

2016-08-12 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-4388:
---

 Summary: Race condition during initialization of MemorySegmentFactory
 Key: FLINK-4388
 URL: https://issues.apache.org/jira/browse/FLINK-4388
 Project: Flink
  Issue Type: Bug
  Components: Core
Affects Versions: 1.1.1
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.2.0, 1.1.2


The check whether the factory is initialized, and the actual initialization,
are not atomic. When starting multiple TaskManagers, this can lead to races
and exceptions.
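
A generic sketch of the kind of fix (making check-and-initialize one atomic
step; this is illustrative, not the actual patch):

{code}
final class FactoryInit {
    private static boolean initialized = false;

    // Synchronizing the whole check-and-init makes it atomic, so concurrent
    // TaskManager startups cannot both run the initialization.
    static synchronized void initializeIfAbsent(Runnable init) {
        if (!initialized) {
            init.run();
            initialized = true;
        }
    }
}
{code}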



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Conceptual difference Windows and DataSet

2016-08-12 Thread Stephan Ewen
Hi Kevin!

The windows in Flink's DataStream API are organized by key. The reason is
that the windows are very flexible, and each key can form different windows
than the others (think sessions per user - each session starts and stops
differently).
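
For example, session windows form independently per key (the gap and the
element type below are illustrative):

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;

    class PerKeySessions {
        // Each user's session opens and closes based on that user's own
        // activity gaps, independently of all other keys.
        static DataStream<Tuple2<String, Integer>> sessionCounts(
                DataStream<Tuple2<String, Integer>> events) {
            return events
                .keyBy(0)
                .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
                .sum(1);
        }
    }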

There has been discussion about introducing something like "aligned
windows". These types of windows would be the same across all keys and
could therefore be globally organized. One could even imagine that these
offer DataSet-like features.
That is still a bit in the future, though.

Greetings,
Stephan


On Sat, Aug 6, 2016 at 11:58 PM, Theodore Vasiloudis <
theodoros.vasilou...@gmail.com> wrote:

> Hello Kevin,
>
> I'm not very familiar with the stream API, but I think you can achieve what
> you want by mapping over your elements to turn the strings into one-item
> lists, so that you get a key-value pair of the form (K: String, V:
> (List[String], Int)). Then apply the window reduce function, which produces
> a data stream out of a windowed stream; in the reduce you combine the lists
> and sum the counts. Again, it's not a great way to use reduce, since you
> are growing the list with each reduction.
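>
> A rough sketch of this in Java (the window size and the element types are
> illustrative assumptions, not from your setup):
>
>     import java.util.ArrayList;
>     import java.util.List;
>     import org.apache.flink.api.common.functions.MapFunction;
>     import org.apache.flink.api.common.functions.ReduceFunction;
>     import org.apache.flink.api.java.tuple.Tuple2;
>     import org.apache.flink.api.java.tuple.Tuple3;
>     import org.apache.flink.streaming.api.datastream.DataStream;
>     import org.apache.flink.streaming.api.windowing.time.Time;
>
>     class GroupCollectCount {
>         // (key, message) -> (key, [messages], count) per window.
>         static DataStream<Tuple3<String, List<String>, Integer>> apply(
>                 DataStream<Tuple2<String, String>> input) {
>             return input
>                 .map(new MapFunction<Tuple2<String, String>, Tuple3<String, List<String>, Integer>>() {
>                     @Override
>                     public Tuple3<String, List<String>, Integer> map(Tuple2<String, String> v) {
>                         List<String> one = new ArrayList<>();
>                         one.add(v.f1);  // wrap each message in a one-item list
>                         return new Tuple3<>(v.f0, one, 1);
>                     }
>                 })
>                 .keyBy(0)
>                 .timeWindow(Time.seconds(10))
>                 .reduce(new ReduceFunction<Tuple3<String, List<String>, Integer>>() {
>                     @Override
>                     public Tuple3<String, List<String>, Integer> reduce(
>                             Tuple3<String, List<String>, Integer> a,
>                             Tuple3<String, List<String>, Integer> b) {
>                         List<String> merged = new ArrayList<>(a.f1);
>                         merged.addAll(b.f1);  // concatenate lists, sum counts
>                         return new Tuple3<>(a.f0, merged, a.f2 + b.f2);
>                     }
>                 });
>         }
>     }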
>
> Regards,
> Theodore
>
> On Thu, Aug 4, 2016 at 1:36 AM, Kevin Jacobs  wrote:
>
> > Hi,
> >
> > I have the following use case:
> >
> > 1. Group by a specific field.
> >
> > 2. Get a list of all messages belonging to the group.
> >
> > 3. Count the number of records in the group.
> >
> > With the use of DataSets, it is fairly easy to do this (see
> > http://stackoverflow.com/questions/38745446/apache-flink-sum-and-keep-grouped/38747685#38747685):
> >
> >     fromElements(("a-b", "data1", 1), ("a-c", "data2", 1), ("a-b", "data3", 1))
> >       .groupBy(0)
> >       .reduceGroup {
> >         (it: Iterator[(String, String, Int)], out: Collector[(String, List[String], Int)]) => {
> >           val group = it.toList
> >           if (group.length > 0)
> >             out.collect((group(0)._1, group.map(_._2), group.map(_._3).sum))
> >         }
> >       }
> >
> > So, now I am moving to DataStreams (since the input is really a
> > DataStream). From my perspective, a Window should provide the same
> > functionality as a DataSet. This would simplify the process a lot:
> >
> > 1. Window the elements.
> >
> > 2. Apply the same operations as before.
> >
> > Is there a way in Flink to do so? Otherwise, I would like to think of a
> > solution to this problem.
> >
> > Regards,
> > Kevin
> >
>


[jira] [Created] (FLINK-4387) Instability in KvStateClientTest.testClientServerIntegration()

2016-08-12 Thread Robert Metzger (JIRA)
Robert Metzger created FLINK-4387:
-

 Summary: Instability in KvStateClientTest.testClientServerIntegration()
 Key: FLINK-4387
 URL: https://issues.apache.org/jira/browse/FLINK-4387
 Project: Flink
  Issue Type: Bug
Reporter: Robert Metzger


According to this log: 
https://s3.amazonaws.com/archive.travis-ci.org/jobs/151491745/log.txt

the {{KvStateClientTest}} didn't complete.

{code}
"main" #1 prio=5 os_prio=0 tid=0x7fb2b400a000 nid=0x29dc in Object.wait() 
[0x7fb2bcb3b000]
   java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0xf7c049a0> (a 
io.netty.util.concurrent.DefaultPromise)
at java.lang.Object.wait(Object.java:502)
at 
io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:254)
- locked <0xf7c049a0> (a 
io.netty.util.concurrent.DefaultPromise)
at io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:32)
at 
org.apache.flink.runtime.query.netty.KvStateServer.shutDown(KvStateServer.java:185)
at 
org.apache.flink.runtime.query.netty.KvStateClientTest.testClientServerIntegration(KvStateClientTest.java:680)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
{code}

and
{code}
Exception in thread "globalEventExecutor-1-3" java.lang.AssertionError
    at io.netty.util.concurrent.AbstractScheduledEventExecutor.pollScheduledTask(AbstractScheduledEventExecutor.java:83)
    at io.netty.util.concurrent.GlobalEventExecutor.fetchFromScheduledTaskQueue(GlobalEventExecutor.java:110)
    at io.netty.util.concurrent.GlobalEventExecutor.takeTask(GlobalEventExecutor.java:95)
    at io.netty.util.concurrent.GlobalEventExecutor$TaskRunner.run(GlobalEventExecutor.java:226)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
    at java.lang.Thread.run(Thread.java:745)
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)