StreamingFileSink with TwoPhaseCommit

2019-01-10 Thread Taher Koitawala
Hi All,
  As per my understanding of the StreamingFileSink API,
TwoPhaseCommit is not being used. Can someone please confirm whether that's
right? Also, if StreamingFileSink does not support
TwoPhaseCommit, what is the best way to implement this?


Regards,
Taher Koitawala
GS Lab Pune
+91 8407979163


Re: StreamingFileSink with TwoPhaseCommit

2019-01-10 Thread Kostas Kloudas
Hi Taher,

The StreamingFileSink implements a version of TwoPhaseCommit. Can you
elaborate a bit on what you mean by "TwoPhaseCommit is not being used"?

Cheers,
Kostas

On Thu, Jan 10, 2019 at 9:29 AM Taher Koitawala 
wrote:

> Hi All,
>   As per my understanding and the API of StreamingFileSink,
> TwoPhaseCommit is not being used. Can someone please confirm is that's
> right?  Also if StreamingFileSink does not support
> TwoPhaseCommits what is the best way to implement this?
>
>
> Regards,
> Taher Koitawala
> GS Lab Pune
> +91 8407979163
>


Re: [DISCUSS] Dropping flink-storm?

2019-01-10 Thread Ufuk Celebi
+1 to drop.

I totally agree with your reasoning. I like that we tried to keep it,
but I don't think the maintenance overhead would be justified.

– Ufuk

On Wed, Jan 9, 2019 at 4:09 PM Till Rohrmann  wrote:
>
> With https://issues.apache.org/jira/browse/FLINK-10571, we will remove the
> Storm topologies from Flink and keep the wrappers for the moment.
>
> However, looking at the FlinkTopologyContext [1], it becomes quite obvious
> that Flink's compatibility with Storm is really limited. Almost all of the
> context methods are not supported which makes me wonder how useful these
> wrappers really are. Given the additional maintenance overhead of having
> them in the code base and no indication that someone is actively using
> them, I would still be in favour of removing them. This will reduce our
> maintenance burden in the future. What do you think?
>
> [1]
> https://github.com/apache/flink/blob/master/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java
>
> Cheers,
> Till
>
> On Tue, Oct 9, 2018 at 10:08 AM Fabian Hueske  wrote:
>
> > Yes, let's do it this way.
> > The wrapper classes are probably not too complex and can be easily tested.
> > We have the same for the Hadoop interfaces, although I think only the
> > Input- and OutputFormatWrappers are actually used.
> >
> >
> > On Tue, Oct 9, 2018 at 9:46 AM Chesnay Schepler <
> > ches...@apache.org>:
> >
> >> That sounds very good to me.
> >>
> >> On 08.10.2018 11:36, Till Rohrmann wrote:
> >> > Good point. The initial idea of this thread was to remove the storm
> >> > compatibility layer completely.
> >> >
> >> > During the discussion I realized that it might be useful for our users
> >> > to not completely remove it in one go. Instead for those who still
> >> > want to use some Bolt and Spout code in Flink, it could be nice to
> >> > keep the wrappers. At least, we could remove flink-storm in a more
> >> > graceful way by first removing the Topology and client parts and then
> >> > the wrappers. What do you think?
> >> >
> >> > Cheers,
> >> > Till
> >> >
> >> > On Mon, Oct 8, 2018 at 11:13 AM Chesnay Schepler  wrote:
> >> >
> >> > I don't believe that to be the consensus. For starters it is
> >> > contradictory; we can't /drop /flink-storm yet still /keep //some
> >> > parts/.
> >> >
> >> > From my understanding we drop flink-storm completely, and put a
> >> > note in the docs that the bolt/spout wrappers of previous versions
> >> > will continue to work.
> >> >
> >> > On 08.10.2018 11:04, Till Rohrmann wrote:
> >> >> Thanks for opening the issue Chesnay. I think the overall
> >> >> consensus is to drop flink-storm and only keep the Bolt and Spout
> >> >> wrappers. Thanks for your feedback!
> >> >>
> >> >> Cheers,
> >> >> Till
> >> >>
> >> >> On Mon, Oct 8, 2018 at 9:37 AM Chesnay Schepler
> >> >> <ches...@apache.org> wrote:
> >> >>
> >> >> I've created
> >> >> https://issues.apache.org/jira/browse/FLINK-10509 for
> >> >> removing flink-storm.
> >> >>
> >> >> On 28.09.2018 15:22, Till Rohrmann wrote:
> >> >> > Hi everyone,
> >> >> >
> >> >> > I would like to discuss how to proceed with Flink's storm
> >> >> compatibility
> >> >> > layer flink-storm.
> >> >> >
> >> >> > While working on removing Flink's legacy mode, I noticed
> >> >> that some parts of
> >> >> > flink-storm rely on the legacy Flink client. In fact, at
> >> >> the moment
> >> >> > flink-storm does not work together with Flink's new
> >> distributed
> >> >> > architecture.
> >> >> >
> >> >> > I'm also wondering how many people are actually using
> >> >> Flink's Storm
> >> >> > compatibility layer and whether it would be worth porting it.
> >> >> >
> >> >> > I see two options how to proceed:
> >> >> >
> >> >> > 1) Commit to maintain flink-storm and port it to Flink's
> >> >> new architecture
> >> >> > 2) Drop flink-storm
> >> >> >
> >> >> > I doubt that we can contribute it to Apache Bahir [1],
> >> >> because once we
> >> >> > remove the legacy mode, this module will no longer work
> >> >> with all newer
> >> >> > Flink versions.
> >> >> >
> >> >> > Therefore, I would like to hear your opinion on this and in
> >> >> particular if
> >> >> > you are using or planning to use flink-storm in the future.
> >> >> >
> >> >> > [1] https://github.com/apache/bahir-flink
> >> >> >
> >> >> > Cheers,
> >> >> > Till
> >> >> >
> >> >>
> >> >
> >>
> >>


Re: StreamingFileSink with TwoPhaseCommit

2019-01-10 Thread Taher Koitawala
StreamingFileSink extends RichSinkFunction and implements
CheckpointedFunction, CheckpointListener and ProcessingTimeCallback; however,
TwoPhaseCommitSinkFunction is never used by StreamingFileSink. Hence my
question whether the sink uses the TwoPhaseCommit protocol or not.

Regards,
Taher Koitawala

On Thu 10 Jan, 2019, 2:40 PM Kostas Kloudas  wrote:

> Hi Taher,
>
> The StreamingFileSink implements a version of TwoPhaseCommit. Can you
> elaborate a bit on what do you mean by " TwoPhaseCommit is not being used
> "?
>
> Cheers,
> Kostas
>
> On Thu, Jan 10, 2019 at 9:29 AM Taher Koitawala 
> wrote:
>
>> Hi All,
>>   As per my understanding and the API of StreamingFileSink,
>> TwoPhaseCommit is not being used. Can someone please confirm is that's
>> right?  Also if StreamingFileSink does not support
>> TwoPhaseCommits what is the best way to implement this?
>>
>>
>> Regards,
>> Taher Koitawala
>> GS Lab Pune
>> +91 8407979163
>>
>


Re: [DISCUSS] Dropping flink-storm?

2019-01-10 Thread Kostas Kloudas
+1 to drop as well.

On Thu, Jan 10, 2019 at 10:15 AM Ufuk Celebi  wrote:

> +1 to drop.
>
> I totally agree with your reasoning. I like that we tried to keep it,
> but I don't think the maintenance overhead would be justified.
>
> – Ufuk
>
> On Wed, Jan 9, 2019 at 4:09 PM Till Rohrmann  wrote:
> >
> > With https://issues.apache.org/jira/browse/FLINK-10571, we will remove
> the
> > Storm topologies from Flink and keep the wrappers for the moment.
> >
> > However, looking at the FlinkTopologyContext [1], it becomes quite
> obvious
> > that Flink's compatibility with Storm is really limited. Almost all of
> the
> > context methods are not supported which makes me wonder how useful these
> > wrappers really are. Given the additional maintenance overhead of having
> > them in the code base and no indication that someone is actively using
> > them, I would still be in favour of removing them. This will reduce our
> > maintenance burden in the future. What do you think?
> >
> > [1]
> >
> https://github.com/apache/flink/blob/master/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java
> >
> > Cheers,
> > Till
> >
> > On Tue, Oct 9, 2018 at 10:08 AM Fabian Hueske  wrote:
> >
> > > Yes, let's do it this way.
> > > The wrapper classes are probably not too complex and can be easily
> tested.
> > > We have the same for the Hadoop interfaces, although I think only the
> > > Input- and OutputFormatWrappers are actually used.
> > >
> > >
> > > On Tue, Oct 9, 2018 at 9:46 AM Chesnay Schepler <
> > > ches...@apache.org>:
> > >
> > >> That sounds very good to me.
> > >>
> > >> On 08.10.2018 11:36, Till Rohrmann wrote:
> > >> > Good point. The initial idea of this thread was to remove the storm
> > >> > compatibility layer completely.
> > >> >
> > >> > During the discussion I realized that it might be useful for our
> users
> > >> > to not completely remove it in one go. Instead for those who still
> > >> > want to use some Bolt and Spout code in Flink, it could be nice to
> > >> > keep the wrappers. At least, we could remove flink-storm in a more
> > >> > graceful way by first removing the Topology and client parts and
> then
> > >> > the wrappers. What do you think?
> > >> >
> > >> > Cheers,
> > >> > Till
> > >> >
> > >> > On Mon, Oct 8, 2018 at 11:13 AM Chesnay Schepler <
> ches...@apache.org
> > >> > > wrote:
> > >> >
> > >> > I don't believe that to be the consensus. For starters it is
> > >> > contradictory; we can't /drop /flink-storm yet still /keep
> //some
> > >> > parts/.
> > >> >
> > >> > From my understanding we drop flink-storm completely, and put a
> > >> > note in the docs that the bolt/spout wrappers of previous
> versions
> > >> > will continue to work.
> > >> >
> > >> > On 08.10.2018 11:04, Till Rohrmann wrote:
> > >> >> Thanks for opening the issue Chesnay. I think the overall
> > >> >> consensus is to drop flink-storm and only keep the Bolt and
> Spout
> > >> >> wrappers. Thanks for your feedback!
> > >> >>
> > >> >> Cheers,
> > >> >> Till
> > >> >>
> > >> >> On Mon, Oct 8, 2018 at 9:37 AM Chesnay Schepler
> > >> >> <ches...@apache.org> wrote:
> > >> >>
> > >> >> I've created
> > >> >> https://issues.apache.org/jira/browse/FLINK-10509 for
> > >> >> removing flink-storm.
> > >> >>
> > >> >> On 28.09.2018 15:22, Till Rohrmann wrote:
> > >> >> > Hi everyone,
> > >> >> >
> > >> >> > I would like to discuss how to proceed with Flink's storm
> > >> >> compatibility
> > >> >> > layer flink-storm.
> > >> >> >
> > >> >> > While working on removing Flink's legacy mode, I noticed
> > >> >> that some parts of
> > >> >> > flink-storm rely on the legacy Flink client. In fact, at
> > >> >> the moment
> > >> >> > flink-storm does not work together with Flink's new
> > >> distributed
> > >> >> > architecture.
> > >> >> >
> > >> >> > I'm also wondering how many people are actually using
> > >> >> Flink's Storm
> > >> >> > compatibility layer and whether it would be worth
> porting it.
> > >> >> >
> > >> >> > I see two options how to proceed:
> > >> >> >
> > >> >> > 1) Commit to maintain flink-storm and port it to Flink's
> > >> >> new architecture
> > >> >> > 2) Drop flink-storm
> > >> >> >
> > >> >> > I doubt that we can contribute it to Apache Bahir [1],
> > >> >> because once we
> > >> >> > remove the legacy mode, this module will no longer work
> > >> >> with all newer
> > >> >> > Flink versions.
> > >> >> >
> > >> >> > Therefore, I would like to hear your opinion on this and
> in
> > >> >> particular if
> > >> >> > you are using or planning to use flink-storm in the
> future.
> > >> >>   

Re: StreamingFileSink with TwoPhaseCommit

2019-01-10 Thread Kostas Kloudas
That is correct.
The StreamingFileSink does not extend the TwoPhaseCommitSinkFunction
because of some internal implementation details.
But this does not mean that it does not implement a two-phase-commit
protocol (which is independent of the implementation).

Cheers,
Kostas
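
For anyone looking for the general-purpose way to build such a sink themselves
(the second part of Taher's question), below is a minimal, hypothetical sketch
of a sink that extends TwoPhaseCommitSinkFunction directly. The TxnFileSink
class, its Txn transaction type and the temp-file/rename scheme are
illustrative assumptions, not StreamingFileSink's actual internals.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.UUID;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

public class TxnFileSink
        extends TwoPhaseCommitSinkFunction<String, TxnFileSink.Txn, Void> {

    /** Per-checkpoint transaction: records go to a temp file, commit renames it. */
    public static class Txn {
        public String tempPath;
        public String finalPath;
    }

    private final String targetDir;

    public TxnFileSink(String targetDir) {
        super(new KryoSerializer<>(Txn.class, new ExecutionConfig()), VoidSerializer.INSTANCE);
        this.targetDir = targetDir;
    }

    @Override
    protected Txn beginTransaction() throws Exception {
        // start a fresh transaction for the next checkpoint interval
        Txn txn = new Txn();
        String name = UUID.randomUUID().toString();
        txn.tempPath = targetDir + "/." + name + ".inprogress";
        txn.finalPath = targetDir + "/" + name;
        Files.createFile(Paths.get(txn.tempPath));
        return txn;
    }

    @Override
    protected void invoke(Txn txn, String value, Context context) throws Exception {
        // write records into the not-yet-visible temp file
        Files.write(Paths.get(txn.tempPath), (value + "\n").getBytes(), StandardOpenOption.APPEND);
    }

    @Override
    protected void preCommit(Txn txn) throws Exception {
        // phase 1 (on checkpoint): everything must be flushed and persisted here
    }

    @Override
    protected void commit(Txn txn) {
        // phase 2 (checkpoint confirmed): atomically publish the data;
        // guarded so a replayed commit after recovery is a no-op
        try {
            if (Files.exists(Paths.get(txn.tempPath))) {
                Files.move(Paths.get(txn.tempPath), Paths.get(txn.finalPath),
                        StandardCopyOption.ATOMIC_MOVE);
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    protected void abort(Txn txn) {
        // failure or restore without a completed checkpoint: discard uncommitted data
        try {
            Files.deleteIfExists(Paths.get(txn.tempPath));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}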

On Thu, Jan 10, 2019 at 10:33 AM Taher Koitawala 
wrote:

> StreamingFileSink extends RichSinkFunction and implements
> CheckpointedFunction, CheckpointListener and ProcessingTimeCallback however
> TwoPhaseCommitSinkFunction is never used by StreamingFileSink.  Hence I had
> a question if the sink uses the TwoPhaseCommit protocol or not.
>
> Regards,
> Taher Koitawala
>
> On Thu 10 Jan, 2019, 2:40 PM Kostas Kloudas 
>> Hi Taher,
>>
>> The StreamingFileSink implements a version of TwoPhaseCommit. Can you
>> elaborate a bit on what do you mean by " TwoPhaseCommit is not being used
>> "?
>>
>> Cheers,
>> Kostas
>>
>> On Thu, Jan 10, 2019 at 9:29 AM Taher Koitawala <
>> taher.koitaw...@gslab.com> wrote:
>>
>>> Hi All,
>>>   As per my understanding and the API of StreamingFileSink,
>>> TwoPhaseCommit is not being used. Can someone please confirm is that's
>>> right?  Also if StreamingFileSink does not support
>>> TwoPhaseCommits what is the best way to implement this?
>>>
>>>
>>> Regards,
>>> Taher Koitawala
>>> GS Lab Pune
>>> +91 8407979163
>>>
>>


Re: StreamingFileSink with TwoPhaseCommit

2019-01-10 Thread Taher Koitawala
Hi Kostas,
   Thank you for the clarification. Also, can you please point out
how StreamingFileSink uses TwoPhaseCommit? Can you also point out the
implementing class for that?


Regards,
Taher Koitawala
GS Lab Pune
+91 8407979163


On Thu, Jan 10, 2019 at 3:10 PM Kostas Kloudas  wrote:

> That is correct.
> The StreamingFileSink does not extend the TwoPhaseCommitSinkFunction
> because of some internal implementation details.
> But this, does not mean that it does not implement a two phase commit
> protocol (which is independent of the implementation).
>
> Cheers,
> Kostas
>
> On Thu, Jan 10, 2019 at 10:33 AM Taher Koitawala <
> taher.koitaw...@gslab.com> wrote:
>
>> StreamingFileSink extends RichSinkFunction and implements
>> CheckpointedFunction, CheckpointListener and ProcessingTimeCallback however
>> TwoPhaseCommitSinkFunction is never used by StreamingFileSink.  Hence I had
>> a question if the sink uses the TwoPhaseCommit protocol or not.
>>
>> Regards,
>> Taher Koitawala
>>
>> On Thu 10 Jan, 2019, 2:40 PM Kostas Kloudas >
>>> Hi Taher,
>>>
>>> The StreamingFileSink implements a version of TwoPhaseCommit. Can you
>>> elaborate a bit on what do you mean by " TwoPhaseCommit is not being
>>> used"?
>>>
>>> Cheers,
>>> Kostas
>>>
>>> On Thu, Jan 10, 2019 at 9:29 AM Taher Koitawala <
>>> taher.koitaw...@gslab.com> wrote:
>>>
 Hi All,
   As per my understanding and the API of StreamingFileSink,
 TwoPhaseCommit is not being used. Can someone please confirm is that's
 right?  Also if StreamingFileSink does not support
 TwoPhaseCommits what is the best way to implement this?


 Regards,
 Taher Koitawala
 GS Lab Pune
 +91 8407979163

>>>


Re: [DISCUSS] Dropping flink-storm?

2019-01-10 Thread Fabian Hueske
+1 from my side as well.

I would assume that most Bolts that are supported by our current wrappers
can be easily converted into respective Flink functions.

Fabian
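
As a rough illustration of Fabian's point (a hypothetical word-splitting Bolt,
not code taken from the flink-storm wrappers), the per-tuple logic of such a
Bolt's execute() maps almost one-to-one onto a Flink FlatMapFunction:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

public class SplitLineFunction implements FlatMapFunction<String, String> {
    @Override
    public void flatMap(String line, Collector<String> out) {
        // what the Bolt would do in execute(Tuple, BasicOutputCollector) via collector.emit(...)
        for (String word : line.split("\\s+")) {
            if (!word.isEmpty()) {
                out.collect(word);
            }
        }
    }
}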



On Thu, Jan 10, 2019 at 10:35 AM Kostas Kloudas <
k.klou...@da-platform.com>:

> +1 to drop as well.
>
> On Thu, Jan 10, 2019 at 10:15 AM Ufuk Celebi  wrote:
>
>> +1 to drop.
>>
>> I totally agree with your reasoning. I like that we tried to keep it,
>> but I don't think the maintenance overhead would be justified.
>>
>> – Ufuk
>>
>> On Wed, Jan 9, 2019 at 4:09 PM Till Rohrmann 
>> wrote:
>> >
>> > With https://issues.apache.org/jira/browse/FLINK-10571, we will remove
>> the
>> > Storm topologies from Flink and keep the wrappers for the moment.
>> >
>> > However, looking at the FlinkTopologyContext [1], it becomes quite
>> obvious
>> > that Flink's compatibility with Storm is really limited. Almost all of
>> the
>> > context methods are not supported which makes me wonder how useful these
>> > wrappers really are. Given the additional maintenance overhead of having
>> > them in the code base and no indication that someone is actively using
>> > them, I would still be in favour of removing them. This will reduce our
>> > maintenance burden in the future. What do you think?
>> >
>> > [1]
>> >
>> https://github.com/apache/flink/blob/master/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java
>> >
>> > Cheers,
>> > Till
>> >
>> > On Tue, Oct 9, 2018 at 10:08 AM Fabian Hueske 
>> wrote:
>> >
>> > > Yes, let's do it this way.
>> > > The wrapper classes are probably not too complex and can be easily
>> tested.
>> > > We have the same for the Hadoop interfaces, although I think only the
>> > > Input- and OutputFormatWrappers are actually used.
>> > >
>> > >
>> > > On Tue, Oct 9, 2018 at 9:46 AM Chesnay Schepler <
>> > > ches...@apache.org>:
>> > >
>> > >> That sounds very good to me.
>> > >>
>> > >> On 08.10.2018 11:36, Till Rohrmann wrote:
>> > >> > Good point. The initial idea of this thread was to remove the storm
>> > >> > compatibility layer completely.
>> > >> >
>> > >> > During the discussion I realized that it might be useful for our
>> users
>> > >> > to not completely remove it in one go. Instead for those who still
>> > >> > want to use some Bolt and Spout code in Flink, it could be nice to
>> > >> > keep the wrappers. At least, we could remove flink-storm in a more
>> > >> > graceful way by first removing the Topology and client parts and
>> then
>> > >> > the wrappers. What do you think?
>> > >> >
>> > >> > Cheers,
>> > >> > Till
>> > >> >
>> > >> > On Mon, Oct 8, 2018 at 11:13 AM Chesnay Schepler <
>> ches...@apache.org
>> > >> > > wrote:
>> > >> >
>> > >> > I don't believe that to be the consensus. For starters it is
>> > >> > contradictory; we can't /drop /flink-storm yet still /keep
>> //some
>> > >> > parts/.
>> > >> >
>> > >> > From my understanding we drop flink-storm completely, and put a
>> > >> > note in the docs that the bolt/spout wrappers of previous
>> versions
>> > >> > will continue to work.
>> > >> >
>> > >> > On 08.10.2018 11:04, Till Rohrmann wrote:
>> > >> >> Thanks for opening the issue Chesnay. I think the overall
>> > >> >> consensus is to drop flink-storm and only keep the Bolt and
>> Spout
>> > >> >> wrappers. Thanks for your feedback!
>> > >> >>
>> > >> >> Cheers,
>> > >> >> Till
>> > >> >>
>> > >> >> On Mon, Oct 8, 2018 at 9:37 AM Chesnay Schepler
>> > >> >> <ches...@apache.org> wrote:
>> > >> >>
>> > >> >> I've created
>> > >> >> https://issues.apache.org/jira/browse/FLINK-10509 for
>> > >> >> removing flink-storm.
>> > >> >>
>> > >> >> On 28.09.2018 15:22, Till Rohrmann wrote:
>> > >> >> > Hi everyone,
>> > >> >> >
>> > >> >> > I would like to discuss how to proceed with Flink's
>> storm
>> > >> >> compatibility
>> > >> >> > layer flink-storm.
>> > >> >> >
>> > >> >> > While working on removing Flink's legacy mode, I noticed
>> > >> >> that some parts of
>> > >> >> > flink-storm rely on the legacy Flink client. In fact, at
>> > >> >> the moment
>> > >> >> > flink-storm does not work together with Flink's new
>> > >> distributed
>> > >> >> > architecture.
>> > >> >> >
>> > >> >> > I'm also wondering how many people are actually using
>> > >> >> Flink's Storm
>> > >> >> > compatibility layer and whether it would be worth
>> porting it.
>> > >> >> >
>> > >> >> > I see two options how to proceed:
>> > >> >> >
>> > >> >> > 1) Commit to maintain flink-storm and port it to Flink's
>> > >> >> new architecture
>> > >> >> > 2) Drop flink-storm
>> > >> >> >
>> > >> >> > I doubt that we can contribute it to Apache Bahir [1],
>> > >> >> bec

Re: windowAll and AggregateFunction

2019-01-10 Thread CPC
I converted it to this:

SingleOutputStreamOperator<Tuple2<Integer, XMPP>> tuple2Stream =
    sourceStream.map(new RichMapFunction<XMPP, Tuple2<Integer, XMPP>>() {
        @Override
        public Tuple2<Integer, XMPP> map(XMPP value) throws Exception {
            return Tuple2.of(getRuntimeContext().getIndexOfThisSubtask(), value);
        }
    });
DataStreamUtils.reinterpretAsKeyedStream(tuple2Stream, (t) -> t.f0)


An ugly hack, but it works.
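
For completeness, a sketch of how the re-interpreted stream could then be used
for the pre-aggregation CPC describes, using a plain count in place of the HLL
(tuple2Stream is the stream built above; assumed imports include
DataStreamUtils, TumblingProcessingTimeWindows, Time and AggregateFunction):

DataStream<Long> counts = DataStreamUtils
    .reinterpretAsKeyedStream(tuple2Stream, (t) -> t.f0)
    .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
    .aggregate(new AggregateFunction<Tuple2<Integer, XMPP>, Long, Long>() {
        // partial count per source subtask, computed without any shuffle
        @Override public Long createAccumulator() { return 0L; }
        @Override public Long add(Tuple2<Integer, XMPP> value, Long acc) { return acc + 1; }
        @Override public Long getResult(Long acc) { return acc; }
        @Override public Long merge(Long a, Long b) { return a + b; }
    })
    .windowAll(TumblingProcessingTimeWindows.of(Time.minutes(1)))
    .reduce((a, b) -> a + b);   // only one small partial result per subtask crosses the network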


On Thu, 10 Jan 2019 at 10:54, CPC  wrote:

> Hi Ken,
>
> I am doing a global distinct. What I want to achieve is something like
> below. With windowAll it sends all data to a single operator, which means
> shuffling all data and calculating with parallelism 1. I don't want to shuffle
> the data, since I just want to feed it to an HLL instance, shuffle only the HLL
> instances at the end of the window, and merge them. This is exactly the same
> scenario as a global count. Suppose you want to count events for each 1-minute
> window. In the current case we would send all data to a single operator and
> count there. Instead, we could calculate subtotals and then send
> those subtotals to a single operator and merge them there.
>
>
> [image: image.png]
>
> On Thu, 10 Jan 2019 at 02:26, Ken Krugler 
> wrote:
>
>>
>> On Jan 9, 2019, at 3:10 PM, CPC  wrote:
>>
>> Hi Ken,
>>
>> From regular time-based windows do you mean keyed windows?
>>
>>
>> Correct. Without doing a keyBy() you would have a parallelism of 1.
>>
>> I think you want to key on whatever you’re counting for unique values, so
>> that each window operator gets a slice of the unique values.
>>
>> — Ken
>>
>> On Wed, Jan 9, 2019, 10:22 PM Ken Krugler > wrote:
>>
>>> Hi there,
>>>
>>> You should be able to use a regular time-based window(), and emit the
>>> HyperLogLog binary data as your result, which then would get merged in your
>>> custom function (which you set a parallelism of 1 on).
>>>
>>> Note that if you are generating unique counts per non-overlapping time
>>> window, you’ll need to keep N HLL structures in each operator.
>>>
>>> — Ken
>>>
>>>
>>> On Jan 9, 2019, at 10:26 AM, CPC  wrote:
>>>
>>> Hi Stefan,
>>>
>>> Could i use "Reinterpreting a pre-partitioned data stream as keyed
>>> stream" feature for this?
>>>
>>> On Wed, 9 Jan 2019 at 17:50, Stefan Richter 
>>> wrote:
>>>
 Hi,

 I think your expectation about windowAll is wrong, from the method
 documentation: “Note: This operation is inherently non-parallel since all
 elements have to pass through the same operator instance” and I also cannot
 think of a way in which the windowing API would support your use case
 without a shuffle. You could probably build the functionality by hand
 through, but I guess this is not quite what you want.

 Best,
 Stefan

 > On 9. Jan 2019, at 13:43, CPC  wrote:
 >
 > Hi all,
 >
 > In our implementation,we are consuming from kafka and calculating
 distinct with hyperloglog. We are using windowAll function with a custom
 AggregateFunction but flink runtime shows a little bit unexpected behavior
 at runtime. Our sources running with parallelism 4 and i expect add
 function to run after source calculate partial results and at the end of
 the window i expect it to send 4 hll object to single operator to merge
 there(merge function). Instead, it sends all data to single instance and
 call add function there.
 >
 > Is here any way to make flink behave like this? I mean calculate
 partial results after consuming from kafka with paralelism of sources
 without shuffling(so some part of the calculation can be calculated in
 parallel) and merge those partial results with a merge function?
 >
 > Thank you in advance...


>>> --
>>> Ken Krugler
>>> +1 530-210-6378
>>> http://www.scaleunlimited.com
>>> Custom big data solutions & training
>>> Flink, Solr, Hadoop, Cascading & Cassandra
>>>
>>>
>> --
>> Ken Krugler
>> +1 530-210-6378
>> http://www.scaleunlimited.com
>> Custom big data solutions & training
>> Flink, Solr, Hadoop, Cascading & Cassandra
>>
>>


Re: Reducing runtime of Flink planner

2019-01-10 Thread Fabian Hueske
Hi Niklas,

The planning time of a job does not depend on the data size.
It would be the same whether you process 5MB or 5PB.

FLINK-10566 (as pointed to by Timo) fixed a problem for plans with many
branching and joining nodes.
Looking at your plan, there are some, but (IMO) not enough to be
problematic.

Your idea of providing hints is very good.
You can do that for joins with size hints [1] or strategy hints [2].

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/batch/dataset_transformations.html#join-with-dataset-size-hint
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/batch/dataset_transformations.html#join-algorithm-hints
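
A quick sketch of the two kinds of hints, assuming two DataSets named "large"
and "small" with a POJO join field "id" (names are placeholders):

// size hint [1]: declare the second input tiny, so the optimizer broadcasts it
large.joinWithTiny(small)
     .where("id")
     .equalTo("id");

// strategy hint [2]: pin the join strategy explicitly
// (JoinHint from org.apache.flink.api.common.operators.base.JoinOperatorBase)
large.join(small, JoinHint.BROADCAST_HASH_SECOND)
     .where("id")
     .equalTo("id");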


On Mon, Jan 7, 2019 at 5:36 PM Timo Walther  wrote:

> Hi Niklas,
>
> it would be interesting to know which planner caused the long runtime.
> Could you use a debugger to figure out more details? Is it really the
Flink Table API planner or the underlying DataSet planner one level deeper?
>
> There was an issue that was recently closed [1] about the DataSet
> optimizer. Could this solve your problem?
>
I will also loop in Fabian, who might know more.
>
> Regards,
> Timo
>
> [1] https://issues.apache.org/jira/browse/FLINK-10566
>
> Am 07.01.19 um 14:05 schrieb Niklas Teichmann:
> > Hi everybody,
> >
> > I have a question concerning the planner for the Flink Table / Batch API.
> > At the moment I try to use a library called Cypher for Apache Flink, a
> > project that tries to implement
> > the graph database query language Cypher on Apache Flink (CAPF,
> > https://github.com/soerenreichardt/cypher-for-apache-flink).
> >
> > The problem is that the planner seemingly takes a very long time to
> > plan and optimize the job created by CAPF. This example job in json
> > format
> >
> > https://pastebin.com/J84grsjc
> >
> > takes on a 24 GB data set about 20 minutes to plan and about 5 minutes
> > to run the job. That seems very long for a job of this size.
> >
> > Do you have any idea why this is the case?
> > Is there a way to give the planner hints to reduce the planning time?
> >
> > Thanks in advance!
> > Niklas
>
>
>


Re: Multiple MapState vs single nested MapState in stateful Operator

2019-01-10 Thread Congxian Qiu
Hi, Gagan Agrawal

In my opinion, I prefer the first.

Here is the reason.

In the RocksDB state backend, we serialize the key, namespace and user-key
into one byte array (the key-bytes) and the user-value into another (the
value-bytes), and then insert that key-bytes/value-bytes pair into RocksDB.
When retrieving from RocksDB we can use get() (for a single key/value) or an
iterator (for a key range).

If we store four maps in a single MapState, we need to deserialize the whole
value-bytes (a Map) whenever we want to retrieve a single user-value.
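
To make option 1 from Gagan's mail concrete, a small sketch of how the
descriptors could be declared inside the KeyedProcessFunction (EventA/EventB
and the String user-key type are placeholders):

// fields and open() of the custom KeyedProcessFunction
private transient MapState<String, EventA> streamAState;
private transient MapState<String, EventB> streamBState;
// ... one more descriptor per stream type

@Override
public void open(Configuration parameters) {
    streamAState = getRuntimeContext().getMapState(
        new MapStateDescriptor<>("stream-a", String.class, EventA.class));
    streamBState = getRuntimeContext().getMapState(
        new MapStateDescriptor<>("stream-b", String.class, EventB.class));
}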


Gagan Agrawal  wrote on Thu, Jan 10, 2019 at 10:38 AM:

> Hi,
> I have a use case where 4 streams get merged (union) and grouped on common
> key (keyBy) and a custom KeyedProcessFunction is called. Now I need to keep
> state (RocksDB backend) for all 4 streams in my custom KeyedProcessFunction
> where each of these 4 streams would be stored as map. So I have 2 options
>
> 1. Create a separate MapStateDescriptor for each of these streams and
> store their events separately.
> 2. Create a single MapStateDescriptor where there will be only 4 keys
> (corresponding to 4 stream types) and value will be of type Map which
> further keep events from respective streams.
>
> I want to understand from performance perspective, would there be any
> difference in above approaches. Will keeping 4 different MapState cause 4
> lookups for RocksDB backend when they are accessed? Or all of these
> MapStates are internally stored within RocksDB in single row corresponding
> to respective key (as per keyedStream) and hence they are all fetched in
> single call before operator's processElement is called? If there are
> different lookups in RocksDB for each of MapStateDescriptor, then I think
> keeping them in single MapStateDescriptor would be more efficient minimize
> RocksDB calls? Please advise.
>
> Gagan
>


-- 
Best,
Congxian


Re: Multiple MapState vs single nested MapState in stateful Operator

2019-01-10 Thread Kostas Kloudas
Hi Gagan,

I agree with Congxian!
In MapState, when accessing the state/value associated with a key in the
map, the whole value is de-serialized (and serialized in the case of a
put()).
Given this, it is more efficient to have many keys with small state than
fewer keys with huge state.

Cheers,
Kostas


On Thu, Jan 10, 2019 at 12:34 PM Congxian Qiu 
wrote:

> Hi, Gagan Agrawal
>
> In my opinion, I prefer the first.
>
> Here is the reason.
>
> In RocksDB StateBackend, we will serialize the key, namespace, user-key
> into a serialized bytes (key-bytes) and serialize user-value to serialized
> bytes(value-bytes) then insert  into the key-bytes/value-bytes into
> RocksDB, when retrieving from RocksDB we can user get(for a single
> key/value) or iterator(for a key range).
>
> If we store four maps into a single MapState, we need to deserialize the
> value-bytes(a Map) when we want to retrieve a single user-value.
>
>
> Gagan Agrawal  wrote on Thu, Jan 10, 2019 at 10:38 AM:
>
>> Hi,
>> I have a use case where 4 streams get merged (union) and grouped on
>> common key (keyBy) and a custom KeyedProcessFunction is called. Now I need
>> to keep state (RocksDB backend) for all 4 streams in my custom
>> KeyedProcessFunction where each of these 4 streams would be stored as map.
>> So I have 2 options
>>
>> 1. Create a separate MapStateDescriptor for each of these streams and
>> store their events separately.
>> 2. Create a single MapStateDescriptor where there will be only 4 keys
>> (corresponding to 4 stream types) and value will be of type Map which
>> further keep events from respective streams.
>>
>> I want to understand from performance perspective, would there be any
>> difference in above approaches. Will keeping 4 different MapState cause 4
>> lookups for RocksDB backend when they are accessed? Or all of these
>> MapStates are internally stored within RocksDB in single row corresponding
>> to respective key (as per keyedStream) and hence they are all fetched in
>> single call before operator's processElement is called? If there are
>> different lookups in RocksDB for each of MapStateDescriptor, then I think
>> keeping them in single MapStateDescriptor would be more efficient minimize
>> RocksDB calls? Please advise.
>>
>> Gagan
>>
>
>
> --
> Best,
> Congxian
>


Re: StreamingFileSink with TwoPhaseCommit

2019-01-10 Thread Kostas Kloudas
Hi Taher,

Well, I would say there is no single class that implements it.
In a nutshell, it is the StreamingFileSink that (through Buckets) tells the
responsible Bucket what to do at each step of the lifecycle of the Flink
operator
(mainly on element, on checkpoint, on checkpoint completed and on restore).

So I would suggest that you have a look at these classes.
In essence, the main steps of the "choreography" are similar to the ones
that the older BucketingSink was doing.

Cheers,
Kostas
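
Expressed with the interfaces Taher listed, the "choreography" Kostas describes
has roughly this shape. This is a schematic sketch only - the empty method
bodies and the notion of "pending part files" are placeholders, not the actual
Buckets/Bucket code:

import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class TwoPhaseFileSink<IN> extends RichSinkFunction<IN>
        implements CheckpointedFunction, CheckpointListener {

    @Override
    public void invoke(IN value, Context context) throws Exception {
        // on element: append the record to the current in-progress part file
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // on checkpoint (pre-commit): roll the in-progress file and remember it
        // as "pending" in operator state
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        // on checkpoint completed (commit): move the pending part files of this
        // checkpoint to their final, visible names
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // on restore: re-commit pending files of completed checkpoints and
        // discard/truncate anything written after the last successful one
    }
}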

On Thu, Jan 10, 2019 at 10:47 AM Taher Koitawala 
wrote:

> Hi Kostas,
>Thanks you for the clarification, also can you please point
> how StreamingFileSink uses TwoPhaseCommit. Can you also point out the
> implementing class for that?
>
>
> Regards,
> Taher Koitawala
> GS Lab Pune
> +91 8407979163
>
>
> On Thu, Jan 10, 2019 at 3:10 PM Kostas Kloudas  wrote:
>
>> That is correct.
>> The StreamingFileSink does not extend the TwoPhaseCommitSinkFunction
>> because of some internal implementation details.
>> But this, does not mean that it does not implement a two phase commit
>> protocol (which is independent of the implementation).
>>
>> Cheers,
>> Kostas
>>
>> On Thu, Jan 10, 2019 at 10:33 AM Taher Koitawala <
>> taher.koitaw...@gslab.com> wrote:
>>
>>> StreamingFileSink extends RichSinkFunction and implements
>>> CheckpointedFunction, CheckpointListener and ProcessingTimeCallback however
>>> TwoPhaseCommitSinkFunction is never used by StreamingFileSink.  Hence I had
>>> a question if the sink uses the TwoPhaseCommit protocol or not.
>>>
>>> Regards,
>>> Taher Koitawala
>>>
>>> On Thu 10 Jan, 2019, 2:40 PM Kostas Kloudas >>
 Hi Taher,

 The StreamingFileSink implements a version of TwoPhaseCommit. Can you
 elaborate a bit on what do you mean by " TwoPhaseCommit is not being
 used"?

 Cheers,
 Kostas

 On Thu, Jan 10, 2019 at 9:29 AM Taher Koitawala <
 taher.koitaw...@gslab.com> wrote:

> Hi All,
>   As per my understanding and the API of StreamingFileSink,
> TwoPhaseCommit is not being used. Can someone please confirm is that's
> right?  Also if StreamingFileSink does not support
> TwoPhaseCommits what is the best way to implement this?
>
>
> Regards,
> Taher Koitawala
> GS Lab Pune
> +91 8407979163
>



Re: StreamingFileSink with TwoPhaseCommit

2019-01-10 Thread Taher Koitawala
Ok, thanks for the clarification. Really appreciate your help Kostas

On Thu 10 Jan, 2019, 6:19 PM Kostas Kloudas  wrote:

> Hi Taher,
>
> Well, I would say there is no single class that implements it.
> In a nutshell, it is the StreamingFileSink that (through Buckets) tells
> the responsible Bucket what to do at each step of the lifecycle of the
> Flink operator
> (mainly on element, on checkpoint, on checkpoint completed and on restore).
>
> So I would suggest that you should have a look in these classes.
> In essence, the main steps of the "choreography" are similar to the ones
> that the older BucketingSink was doing.
>
> Cheers,
> Kostas
>
> On Thu, Jan 10, 2019 at 10:47 AM Taher Koitawala <
> taher.koitaw...@gslab.com> wrote:
>
>> Hi Kostas,
>>Thanks you for the clarification, also can you please
>> point how StreamingFileSink uses TwoPhaseCommit. Can you also point out the
>> implementing class for that?
>>
>>
>> Regards,
>> Taher Koitawala
>> GS Lab Pune
>> +91 8407979163
>>
>>
>> On Thu, Jan 10, 2019 at 3:10 PM Kostas Kloudas 
>> wrote:
>>
>>> That is correct.
>>> The StreamingFileSink does not extend the TwoPhaseCommitSinkFunction
>>> because of some internal implementation details.
>>> But this, does not mean that it does not implement a two phase commit
>>> protocol (which is independent of the implementation).
>>>
>>> Cheers,
>>> Kostas
>>>
>>> On Thu, Jan 10, 2019 at 10:33 AM Taher Koitawala <
>>> taher.koitaw...@gslab.com> wrote:
>>>
 StreamingFileSink extends RichSinkFunction and implements
 CheckpointedFunction, CheckpointListener and ProcessingTimeCallback however
 TwoPhaseCommitSinkFunction is never used by StreamingFileSink.  Hence I had
 a question if the sink uses the TwoPhaseCommit protocol or not.

 Regards,
 Taher Koitawala

 On Thu 10 Jan, 2019, 2:40 PM Kostas Kloudas >>>
> Hi Taher,
>
> The StreamingFileSink implements a version of TwoPhaseCommit. Can you
> elaborate a bit on what do you mean by " TwoPhaseCommit is not being
> used"?
>
> Cheers,
> Kostas
>
> On Thu, Jan 10, 2019 at 9:29 AM Taher Koitawala <
> taher.koitaw...@gslab.com> wrote:
>
>> Hi All,
>>   As per my understanding and the API of StreamingFileSink,
>> TwoPhaseCommit is not being used. Can someone please confirm is that's
>> right?  Also if StreamingFileSink does not support
>> TwoPhaseCommits what is the best way to implement this?
>>
>>
>> Regards,
>> Taher Koitawala
>> GS Lab Pune
>> +91 8407979163
>>
>


Re: Custom Serializer for Avro GenericRecord

2019-01-10 Thread Tzu-Li (Gordon) Tai
Hi,

Have you looked at [1]?
You can annotate your type and provide a type info factory. The factory
would be used to create the TypeInformation for that type, and in turn
create the serializer used for that type.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/types_serialization.html#defining-type-information-using-a-factory
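
Roughly what [1] describes, as a hedged sketch: MyEvent and the factory are
made-up names, and in a real implementation createTypeInfo() would return your
own TypeInformation whose createSerializer() builds the custom serializer.
GenericTypeInfo is returned below only to keep the sketch compilable.

import java.lang.reflect.Type;
import java.util.Map;
import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;

@TypeInfo(MyEventTypeInfoFactory.class)
class MyEvent {
    public String id;
    public byte[] payload;
}

class MyEventTypeInfoFactory extends TypeInfoFactory<MyEvent> {
    @Override
    public TypeInformation<MyEvent> createTypeInfo(
            Type t, Map<String, TypeInformation<?>> genericParameters) {
        // decide which TypeInformation (and hence which serializer) Flink uses for MyEvent
        return new GenericTypeInfo<>(MyEvent.class);
    }
}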





Re: Reducing runtime of Flink planner

2019-01-10 Thread Niklas Teichmann

Hi Fabian and Timo,

Thanks for your answers! At the moment we're working on updating our
project to Flink 1.7, so that we can check whether the commit you wrote
about solves the problem. The debugging we did so far seems to point
to Calcite as being responsible for the long planning times - we're
still experimenting though. As soon as I have new information, I will
share it with you.


Kind Regards,
Niklas

Quoting Fabian Hueske :


Hi Niklas,

The planning time of a job does not depend on the data size.
It would be the same whether you process 5MB or 5PB.

FLINK-10566 (as pointed to by Timo) fixed a problem for plans with many
braching and joining nodes.
Looking at your plan, there are some, but (IMO) not enough to be
problematic.

Your idea of providing hints is very good.
You can do that for joins with size hints [1] or strategy hints [2].

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/batch/dataset_transformations.html#join-with-dataset-size-hint
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/batch/dataset_transformations.html#join-algorithm-hints


On Mon, Jan 7, 2019 at 5:36 PM Timo Walther  wrote:


Hi Niklas,

it would be interesting to know which planner caused the long runtime.
Could you use a debugger to figure out more details? Is it really the
Flink Table API planner or the underlying DataSet planner one level deeper?

There was an issue that was recently closed [1] about the DataSet
optimizer. Could this solve your problem?

I will also loop in Fabian, who might know more.

Regards,
Timo

[1] https://issues.apache.org/jira/browse/FLINK-10566

Am 07.01.19 um 14:05 schrieb Niklas Teichmann:
> Hi everybody,
>
> I have a question concerning the planner for the Flink Table / Batch API.
> At the moment I try to use a library called Cypher for Apache Flink, a
> project that tries to implement
> the graph database query language Cypher on Apache Flink (CAPF,
> https://github.com/soerenreichardt/cypher-for-apache-flink).
>
> The problem is that the planner seemingly takes a very long time to
> plan and optimize the job created by CAPF. This example job in json
> format
>
> https://pastebin.com/J84grsjc
>
> takes on a 24 GB data set about 20 minutes to plan and about 5 minutes
> to run the job. That seems very long for a job of this size.
>
> Do you have any idea why this is the case?
> Is there a way to give the planner hints to reduce the planning time?
>
> Thanks in advance!
> Niklas











[Flink 1.7.0] always got 10s ask timeout exception when submitting job with checkpoint via REST

2019-01-10 Thread Steven Wu
We are trying out Flink 1.7.0. We always get this exception when submitting
a job with an external checkpoint via REST. Job parallelism is 1,600. State
size is probably in the range of 1-5 TB. The job actually starts; it is just
the REST API that returns this failure.

If we submit the job without an external checkpoint, everything works
fine.

Anyone else see such problem with 1.7? Appreciate your help!

Thanks,
Steven

org.apache.flink.runtime.rest.handler.RestHandlerException:
akka.pattern.AskTimeoutException: Ask timed out on
[Actor[akka://flink/user/dispatcher#-641142843]] after [1 ms].
Sender[null] sent message of type
"org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
at
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$4(JarRunHandler.java:114)
at
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:772)
at akka.dispatch.OnComplete.internal(Future.scala:258)
at akka.dispatch.OnComplete.internal(Future.scala:256)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
at
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:603)
at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
at
scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
at
scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
at
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
at
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
at
akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
at
akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
at
akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException:
akka.pattern.AskTimeoutException: Ask timed out on
[Actor[akka://flink/user/dispatcher#-641142843]] after [1 ms].
Sender[null] sent message of type
"org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
at
java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
at
java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
at
java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
at
java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
... 21 more
Caused by: akka.pattern.AskTimeoutException: Ask timed out on
[Actor[akka://flink/user/dispatcher#-641142843]] after [1 ms].
Sender[null] sent message of type
"org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
at
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
... 9 more


Re: [Flink 1.7.0] always got 10s ask timeout exception when submitting job with checkpoint via REST

2019-01-10 Thread Aaron Levin
We are also experiencing this! Thanks for speaking up! It's relieving to
know we're not alone :)

We tried adding `akka.ask.timeout: 1 min` to our `flink-conf.yaml`, which
did not seem to have any effect. I tried adding every other related akka,
rpc, etc. timeout and still continue to encounter these errors. I believe
they may also impact our ability to deploy (as we get a timeout when
submitting the job programmatically). I'd love to see a solution to this if
one exists!

Best,

Aaron Levin

On Thu, Jan 10, 2019 at 2:58 PM Steven Wu  wrote:

> We are trying out Flink 1.7.0. We always get this exception when
> submitting a job with external checkpoint via REST. Job parallelism is
> 1,600. state size is probably in the range of 1-5 TBs. Job is actually
> started. Just REST api returns this failure.
>
> If we submitting the job without external checkpoint, everything works
> fine.
>
> Anyone else see such problem with 1.7? Appreciate your help!
>
> Thanks,
> Steven
>
> org.apache.flink.runtime.rest.handler.RestHandlerException:
> akka.pattern.AskTimeoutException: Ask timed out on
> [Actor[akka://flink/user/dispatcher#-641142843]] after [1 ms].
> Sender[null] sent message of type
> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
> at
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$4(JarRunHandler.java:114)
> at
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
> at
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> at
> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:772)
> at akka.dispatch.OnComplete.internal(Future.scala:258)
> at akka.dispatch.OnComplete.internal(Future.scala:256)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
> at
> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
> at
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
> at
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
> at
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:603)
> at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
> at
> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
> at
> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
> at
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
> at
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
> at
> akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
> at
> akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
> at
> akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.CompletionException:
> akka.pattern.AskTimeoutException: Ask timed out on
> [Actor[akka://flink/user/dispatcher#-641142843]] after [1 ms].
> Sender[null] sent message of type
> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
> at
> java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
> at
> java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
> at
> java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
> at
> java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
> ... 21 more
> Caused by: akka.pattern.AskTimeoutException: Ask timed out on
> [Actor[akka://flink/user/dispatcher#-641142843]] after [1 ms].
> Sender[null] sent message of type
> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
> at
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
> ... 9 more
>


Re: [Flink 1.7.0] always got 10s ask timeout exception when submitting job with checkpoint via REST

2019-01-10 Thread Gary Yao
Hi all,

I think increasing the default value of the config option web.timeout [1][2] is
what you are looking for.

Best,
Gary

[1]
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java#L76
[2]
https://github.com/apache/flink/blob/a07ce7f6c88dc7d0c0d2ba55a0ab3f2283bf247c/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java#L177
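
For reference, that would be an entry along these lines in flink-conf.yaml
(web.timeout is in milliseconds and defaults to 10000, which matches the 10s
failures above; 300000 is just an example value):

# flink-conf.yaml
web.timeout: 300000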

On Thu, Jan 10, 2019 at 9:19 PM Aaron Levin  wrote:

> We are also experiencing this! Thanks for speaking up! It's relieving to
> know we're not alone :)
>
> We tried adding `akka.ask.timeout: 1 min` to our `flink-conf.yaml`, which
> did not seem to have any effect. I tried adding every other related akka,
> rpc, etc. timeout and still continue to encounter these errors. I believe
> they may also impact our ability to deploy (as we get a timeout when
> submitting the job programmatically). I'd love to see a solution to this if
> one exists!
>
> Best,
>
> Aaron Levin
>
> On Thu, Jan 10, 2019 at 2:58 PM Steven Wu  wrote:
>
>> We are trying out Flink 1.7.0. We always get this exception when
>> submitting a job with external checkpoint via REST. Job parallelism is
>> 1,600. state size is probably in the range of 1-5 TBs. Job is actually
>> started. Just REST api returns this failure.
>>
>> If we submitting the job without external checkpoint, everything works
>> fine.
>>
>> Anyone else see such problem with 1.7? Appreciate your help!
>>
>> Thanks,
>> Steven
>>
>> org.apache.flink.runtime.rest.handler.RestHandlerException:
>> akka.pattern.AskTimeoutException: Ask timed out on
>> [Actor[akka://flink/user/dispatcher#-641142843]] after [1 ms].
>> Sender[null] sent message of type
>> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
>> at
>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$4(JarRunHandler.java:114)
>> at
>> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>> at
>> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>> at
>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>> at
>> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>> at
>> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:772)
>> at akka.dispatch.OnComplete.internal(Future.scala:258)
>> at akka.dispatch.OnComplete.internal(Future.scala:256)
>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>> at
>> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
>> at
>> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
>> at
>> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
>> at
>> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:603)
>> at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
>> at
>> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
>> at
>> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
>> at
>> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
>> at
>> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
>> at
>> akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
>> at
>> akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
>> at
>> akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.util.concurrent.CompletionException:
>> akka.pattern.AskTimeoutException: Ask timed out on
>> [Actor[akka://flink/user/dispatcher#-641142843]] after [1 ms].
>> Sender[null] sent message of type
>> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
>> at
>> java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
>> at
>> java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
>> at
>> java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
>> at
>> java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
>> ... 21 more
>> Caused by: akka.pattern.AskTimeoutException: Ask timed out on
>> [Actor[akka://flink/user/dispatcher#-641142843]] after [1 ms].
>> Sender[null] sent message of type
>> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
>> at
>> akka.pattern.PromiseAc

Recovery problem 1 of 2 in Flink 1.6.3

2019-01-10 Thread John Stone
This is the first of two recovery problems I'm seeing running Flink 1.6.3 in 
Kubernetes.  I'm posting them in separate messages for brevity and because the 
second is not directly related to the first.  Any advice is appreciated.

Setup:
Flink 1.6.3 in Kubernetes (flink:1.6.3-hadoop28-scala_2.11).  One JobManager 
and two TaskManagers (TM_1, TM_2).  Each pod has 4 CPUs.  Each TaskManager has 
16 task slots.  High availability is enabled.  S3 (s3a) for storage.  RocksDB 
with incremental snapshots.  It doesn't matter if local recovery is enabled - 
I've managed to replicate with both local recovery enabled and disabled.

Problem:
Flink cannot recover a job unless there are the same number of free task slots 
as the job's parallelism.

Replication steps:
Create a job with a parallelism of either 17 or 32 - enough to force the job to 
use both TMs.  After the job is fully running and has taken a 
checkpoint, delete one of the TaskManagers (TM_1).  Kubernetes will spawn a new 
TaskManager (TM_3) which will successfully connect to the JobManager.

Actual Behavior:
The running job will be canceled and redeployed but will be caught in a 
SCHEDULED state (shown as CREATED in the web UI).  JobManager will repeatedly 
attempt to request slots from the ResourceManager.  The tasks in the job will 
never resume.

Expected Behavior:
Job should be fully unscheduled from TM_2.  TM_2 and TM_3 should pick up the 
job.  The job should successfully resume from the last checkpoint.

Known Workarounds:
1) Cancel and resubmit the job.
2) Using the above example, have a free TaskManager (TM_4) that also has 16 
available slots.

Log snip:
2019-01-10 19:42:50,299 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- 
Co-Process-Broadcast -> Flat Map -> Sink: Unnamed (29/32) 
(6078b9c76953c7c27b05b522880d3d1b) switched from CANCELING to CANCELED.
2019-01-10 19:42:50,299 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Try to restart 
or fail the job streamProcessorJob (c44a91b76ea99ead6fdf9b13a98c15bb) if no 
longer possible.
2019-01-10 19:42:50,299 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Job 
streamProcessorJob (c44a91b76ea99ead6fdf9b13a98c15bb) switched from state 
FAILING to RESTARTING.
2019-01-10 19:42:50,299 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Restarting the 
job streamProcessorJob (c44a91b76ea99ead6fdf9b13a98c15bb).
2019-01-10 19:42:50,302 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Job 
streamProcessorJob (c44a91b76ea99ead6fdf9b13a98c15bb) switched from state 
RESTARTING to CREATED.
2019-01-10 19:42:50,302 INFO  
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - 
Recovering checkpoints from ZooKeeper.
2019-01-10 19:42:50,308 INFO  
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Found 
1 checkpoints in ZooKeeper.
2019-01-10 19:42:50,308 INFO  
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Trying 
to fetch 1 checkpoints from storage.
2019-01-10 19:42:50,308 INFO  
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Trying 
to retrieve checkpoint 1.
2019-01-10 19:42:50,386 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Restoring job 
c44a91b76ea99ead6fdf9b13a98c15bb from latest valid checkpoint: Checkpoint 1 @ 
1547149215694 for c44a91b76ea99ead6fdf9b13a98c15bb.
2019-01-10 19:42:50,388 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - No master state 
to restore
2019-01-10 19:42:50,388 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Job 
streamProcessorJob (c44a91b76ea99ead6fdf9b13a98c15bb) switched from state 
CREATED to RUNNING.
2019-01-10 19:42:50,388 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: 
Purchase Order Kafka Consumer (1/1) (49b728769a3a2b3a3a6ba45cd4445e3b) switched 
from CREATED to SCHEDULED.
2019-01-10 19:42:50,388 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: 
Purchase Order Bundle Kafka Consumer (1/1) (1220cf4b9f5eb937191bb2232a482899) 
switched from CREATED to SCHEDULED.
2019-01-10 19:42:50,389 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: 
ControlInstruction Kafka Consumer -> Filter -> Filter -> Map (1/1) 
(29f69ee8fbc208cd7c63e99907d11386) switched from CREATED to SCHEDULED.
2019-01-10 19:42:50,389 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: Ticket 
Update Kafka Consumer (1/1) (4bdfbcb7280fb7a7c9ea2d5aa02efa41) switched from 
CREATED to SCHEDULED.
2019-01-10 19:42:50,389 INFO  
org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Requesting new 
slot [SlotRequestId{83bcd1c29b885a7799bf6e5d73d1961c}] and profile 
ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, 
nativeMemoryInMB=0, networkMemoryInMB=0} from resource manager.
2019-01-10 1

Recovery problem 2 of 2 in Flink 1.6.3

2019-01-10 Thread John Stone
This is the second of two recovery problems I'm seeing running Flink in 
Kubernetes.  I'm posting them in separate messages for brevity and because the 
second is not directly related to the first.  Any advice is appreciated.  First 
problem: 
https://lists.apache.org/thread.html/a663a8ce2f697e6d207cb59eff1f77dbb8bd745e3f44aab09866ab46@%3Cuser.flink.apache.org%3E

Setup:
Flink 1.6.3 in Kubernetes (flink:1.6.3-hadoop28-scala_2.11).  One JobManager 
and two TaskManagers (TM_1, TM_2).  Each pod has 4 CPUs.  Each TaskManager has 
16 task slots.  High availability is enabled.  S3 (s3a) for storage.  RocksDB 
with incremental snapshots.  It doesn't matter if local recovery is enabled - 
I've managed to replicate with both local recovery enabled and disabled.  The 
value of "fs.s3a.connection.maximum" is 128.

Problem:
Flink + Hadoop neither re-uses existing connections to S3 nor kills 
existing connections and creates new ones when a job dies.

Replication Steps:
Create a job with a parallelism of 16 - all processing is occurring on TM_1.  
After a checkpoint has been taken, delete TM_1.  Job is canceled on TM_1, 
deployed and restored successfully on TM_2, and a new TaskManager (TM_3) is 
created and successfully registers with the JobManager.  No work is scheduled 
on TM_3.  After another checkpoint is taken, delete TM_2.  The job is canceled 
on TM_2, and attempts to be deployed TM_3 but fails with 
"org.apache.flink.fs.s3hadoop.shaded.org.apache.http.conn.ConnectionPoolTimeoutException:
 Timeout waiting for connection from pool".  Flink attempts to recover by 
canceling on TM_3 and deploying on TM_4, but Flink does not does not release 
the task slots on TM_3 (TM_3 now has no free task slots).  The job is deployed 
to TM_4 which again fails with "ConnectionPoolTimeoutException: Timeout waiting 
for connection from pool".  Flink attempts to recover by canceling on TM_4, but 
does not release the task slots on TM_4 (TM_4 now has no free task slots).  As 
there are 0 available slots, the job is now caught in a SCHEDULED state.

Actual Behavior:
Shaded Hadoop does not release its hold on S3 connections when a job dies.

Expected Behavior:
Hadoop should be told to release connections when a job dies, or should re-use 
existing connections.
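
A possible stopgap, assuming the pool is simply being exhausted faster than 
connections are returned, would be to raise the s3a pool limits in 
flink-conf.yaml, for example:

fs.s3a.connection.maximum: 512
fs.s3a.threads.max: 64

These values are only illustrative and would not address the leak described above.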

Log Snip:
2019-01-10 20:03:40,191 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Filter -> Map 
(8/16) (aaa18fa82aa555a51474d49ac14665e7) switched from RUNNING to FAILED.
java.io.InterruptedIOException: getFileStatus on 
s3a://my-s3-bucket/stream-cluster/prod/checkpoints/83d7cb3e6d08318ef2c27878d0fe1bbd:
 org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.SdkClientException: Unable 
to execute HTTP request: Timeout waiting for connection from pool
at 
org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:125)
at 
org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:101)
at 
org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1571)
at 
org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AFileSystem.innerMkdirs(S3AFileSystem.java:1507)
at 
org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AFileSystem.mkdirs(S3AFileSystem.java:1482)
at 
org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1913)
at 
org.apache.flink.fs.s3hadoop.shaded.org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:170)
at 
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:112)
at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.(FsCheckpointStorage.java:83)
at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.(FsCheckpointStorage.java:58)
at 
org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:443)
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createCheckpointStorage(RocksDBStateBackend.java:399)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:257)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Caused by: 
org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.SdkClientException: Unable to 
execute HTTP request: Timeout waiting for connection from pool
at 
org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1038)
at 
org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:742)
at 
org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:716)
at 
org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java

Re: [Flink 1.7.0] always got 10s ask timeout exception when submitting job with checkpoint via REST

2019-01-10 Thread Steven Wu
Gary, thanks a lot. web.timeout seems to help.

Now I ran into a different issue with loading the checkpoint; I will take that 
up separately.
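
For anyone else hitting this, the setting that ended up helping goes into 
flink-conf.yaml, for example (the values below are only illustrations; 
web.timeout is specified in milliseconds, while akka.ask.timeout takes a 
duration string):

web.timeout: 600000        # REST handler timeout in ms (default is 10 s)
akka.ask.timeout: 1 min    # RPC ask timeout; on its own this did not help here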

On Thu, Jan 10, 2019 at 12:25 PM Gary Yao  wrote:

> Hi all,
>
> I think increasing the default value of the config option web.timeout [1]
> is
> what you are looking for.
>
> Best,
> Gary
>
> [1]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java#L76
> [2]
> https://github.com/apache/flink/blob/a07ce7f6c88dc7d0c0d2ba55a0ab3f2283bf247c/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java#L177
>
> On Thu, Jan 10, 2019 at 9:19 PM Aaron Levin  wrote:
>
>> We are also experiencing this! Thanks for speaking up! It's relieving to
>> know we're not alone :)
>>
>> We tried adding `akka.ask.timeout: 1 min` to our `flink-conf.yaml`, which
>> did not seem to have any effect. I tried adding every other related akka,
>> rpc, etc. timeout and still continue to encounter these errors. I believe
>> they may also impact our ability to deploy (as we get a timeout when
>> submitting the job programmatically). I'd love to see a solution to this if
>> one exists!
>>
>> Best,
>>
>> Aaron Levin
>>
>> On Thu, Jan 10, 2019 at 2:58 PM Steven Wu  wrote:
>>
>>> We are trying out Flink 1.7.0. We always get this exception when
>>> submitting a job with an external checkpoint via REST. Job parallelism is
>>> 1,600, and state size is probably in the range of 1-5 TB. The job actually
>>> starts; just the REST API call returns this failure.
>>>
>>> If we submit the job without an external checkpoint, everything works
>>> fine.
>>>
>>> Has anyone else seen such a problem with 1.7? Appreciate your help!
>>>
>>> Thanks,
>>> Steven
>>>
>>> org.apache.flink.runtime.rest.handler.RestHandlerException:
>>> akka.pattern.AskTimeoutException: Ask timed out on
>>> [Actor[akka://flink/user/dispatcher#-641142843]] after [1 ms].
>>> Sender[null] sent message of type
>>> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
>>> at
>>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$4(JarRunHandler.java:114)
>>> at
>>> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>>> at
>>> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>>> at
>>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>> at
>>> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>>> at
>>> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:772)
>>> at akka.dispatch.OnComplete.internal(Future.scala:258)
>>> at akka.dispatch.OnComplete.internal(Future.scala:256)
>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>> at
>>> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
>>> at
>>> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
>>> at
>>> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
>>> at
>>> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:603)
>>> at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
>>> at
>>> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
>>> at
>>> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
>>> at
>>> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
>>> at
>>> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
>>> at
>>> akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
>>> at
>>> akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
>>> at
>>> akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.util.concurrent.CompletionException:
>>> akka.pattern.AskTimeoutException: Ask timed out on
>>> [Actor[akka://flink/user/dispatcher#-641142843]] after [1 ms].
>>> Sender[null] sent message of type
>>> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
>>> at
>>> java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
>>> at
>>> java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
>>> at
>>> java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
>>> at
>>> java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899

Re: Data loss when restoring from savepoint

2019-01-10 Thread Juho Autio
Stefan, would you have time to comment?

On Wednesday, January 2, 2019, Juho Autio  wrote:

> Bump – does anyone know if Stefan will be available to comment the latest
> findings? Thanks.
>
> On Fri, Dec 21, 2018 at 2:33 PM Juho Autio  wrote:
>
>> Stefan, I managed to analyze savepoint with bravo. It seems that the data
>> that's missing from output *is* found in savepoint.
>>
>> I simplified my test case to the following:
>>
>> - job 1 has been running for ~10 days
>> - savepoint X created & job 1 cancelled
>> - job 2 started with restore from savepoint X
>>
>> Then I waited until the next day so that job 2 has triggered the 24 hour
>> window.
>>
>> Then I analyzed the output & savepoint:
>>
>> - compare job 2 output with the output of a batch pyspark script => find
>> 4223 missing rows
>> - pick one of the missing rows (say, id Z)
>> - read savepoint X with bravo, filter for id Z => Z was found in the
>> savepoint!
>>
>> How can it be possible that the value is in state but doesn't end up in
>> output after state has been restored & window is eventually triggered?
>>
>> I also did similar analysis on the previous case where I savepointed &
>> restored the job multiple times (5) within the same 24-hour window. A
>> missing id that I drilled down to was found in all of those savepoints,
>> yet missing from the output that gets written at the end of the day. This
>> is even more surprising: the missing ID was written to the new
>> savepoints even after restoring. Is the reducer state somehow decoupled
>> from the window contents?
>>
>> Big thanks to bravo-developer Gyula for guiding me through to be able to
>> read the reducer state! https://github.com/king/bravo/pull/11
>>
>> Gyula also had an idea for how to troubleshoot the missing data in a
>> scalable way: I could add some "side effect kafka output" on individual
>> operators. This should allow tracking more closely at which point the data
>> gets lost. However, maybe this would have to be in some of Flink's internal
>> components, and I'm not sure which those would be.
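
A minimal sketch of such a "side effect" Kafka tap at the application level, 
assuming the 0.11 Kafka connector and a stream of String records (the broker 
address, topic and names below are made up for illustration):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

public class DebugTap {

    /**
     * Mirrors the given stream to a side Kafka topic so that individual
     * records can be traced later and compared against the window output.
     */
    public static void tap(DataStream<String> stream, String brokers, String topic) {
        stream.addSink(new FlinkKafkaProducer011<>(brokers, topic, new SimpleStringSchema()))
              .name("debug-tap-" + topic);
    }
}

Calling e.g. DebugTap.tap(windowInput, "broker:9092", "debug-window-input") 
right before the keyBy/window, and again on the window output, would narrow 
down where the ids disappear without touching Flink internals.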
>>
>> Cheers,
>> Juho
>>
>> On Mon, Nov 19, 2018 at 11:52 AM Juho Autio  wrote:
>>
>>>
>>> Hi Stefan,
>>>
>>> Bravo doesn't currently support reading a reducer state. I gave it a try
>>> but couldn't get to a working implementation yet. If anyone can provide
>>> some insight on how to make this work, please share at github:
>>> https://github.com/king/bravo/pull/11
>>>
>>> Thanks.
>>>
>>> On Tue, Oct 23, 2018 at 3:32 PM Juho Autio  wrote:
>>>
 I was glad to find that bravo had now been updated to support
 installing bravo to a local maven repo.

 I was able to load a checkpoint created by my job, thanks to the
 example provided in bravo README, but I'm still missing the essential 
 piece.

 My code was:

 OperatorStateReader reader = new OperatorStateReader(env2,
 savepoint, "DistinctFunction");
 DontKnowWhatTypeThisIs reducingState =
 reader.readKeyedStates(what should I put here?);

 I don't know how to read the values collected from reduce() calls in
 the state. Is there a way to access the reducing state of the window with
 bravo? I'm a bit confused about how this works, because when I check with a
 debugger, Flink internally uses a ReducingStateDescriptor
 with name=window-contents, but still, reading operator state for
 "DistinctFunction" at least didn't throw an exception ("window-contents"
 threw – obviously there's no operator by that name).

 Cheers,
 Juho

 On Mon, Oct 15, 2018 at 2:25 PM Juho Autio 
 wrote:

> Hi Stefan,
>
> Sorry but it doesn't seem immediately clear to me what's a good way to
> use https://github.com/king/bravo.
>
> How are people using it? Would you for example modify build.gradle
> somehow to publish bravo as a library locally/internally? Or add code
> directly in the bravo project (locally) and run it from there (using an
> IDE, for example)? Also it doesn't seem like the bravo gradle project
> supports building a flink job jar, but if it does, how do I do it?
>
> Thanks.
>
> On Thu, Oct 4, 2018 at 9:30 PM Juho Autio 
> wrote:
>
>> Good then, I'll try to analyze the savepoints with Bravo. Thanks!
>>
>> > How would you assume that backpressure would influence your
>> updates? Updates to each local state still happen event-by-event, in a
>> single reader/writing thread.
>>
>> Sure, just an ignorant guess by me. I'm not familiar with most of
>> Flink's internals. Anyway, high backpressure is not seen on this job
>> after it has caught up the lag, so I thought it would be worth
>> mentioning.
>>
>> On Thu, Oct 4, 2018 at 6:24 PM Stefan Richter <
>> s.rich...@data-artisans.com> wrote:
>>
>>> Hi,
>>>
>>> On 04.10.2018 at 16:08, Juho Autio wrote:
>>>
>>> > you could take a look at Bravo [1] to qu