Re: [ANNOUNCE] Jiangjie (Becket) Qin has been added as a committer to the Flink project

2019-07-18 Thread Kostas Kloudas
Congratulations Becket!

Kostas

On Thu, Jul 18, 2019 at 11:21 AM Guowei Ma  wrote:

> Congrats Becket!
>
> Best,
> Guowei
>
>
> Terry Wang  于2019年7月18日周四 下午5:17写道:
>
> > Congratulations Becket!
> >
> > > 在 2019年7月18日,下午5:09,Dawid Wysakowicz  写道:
> > >
> > > Congratulations Becket! Good to have you onboard!
> > >
> > > On 18/07/2019 10:56, Till Rohrmann wrote:
> > >> Congrats Becket!
> > >>
> > >> On Thu, Jul 18, 2019 at 10:52 AM Jeff Zhang  wrote:
> > >>
> > >>> Congratulations Becket!
> > >>>
> > >>> Xu Forward  于2019年7月18日周四 下午4:39写道:
> > >>>
> >  Congratulations Becket! Well deserved.
> > 
> > 
> >  Cheers,
> > 
> >  forward
> > 
> >  Kurt Young  于2019年7月18日周四 下午4:20写道:
> > 
> > > Congrats Becket!
> > >
> > > Best,
> > > Kurt
> > >
> > >
> > > On Thu, Jul 18, 2019 at 4:12 PM JingsongLee <
> lzljs3620...@aliyun.com
> > > .invalid>
> > > wrote:
> > >
> > >> Congratulations Becket!
> > >>
> > >> Best, Jingsong Lee
> > >>
> > >>
> > >> --
> > >> From:Congxian Qiu 
> > >> Send Time:2019年7月18日(星期四) 16:09
> > >> To:dev@flink.apache.org 
> > >> Subject:Re: [ANNOUNCE] Jiangjie (Becket) Qin has been added as a
> > > committer
> > >> to the Flink project
> > >>
> > >> Congratulations Becket! Well deserved.
> > >>
> > >> Best,
> > >> Congxian
> > >>
> > >>
> > >> Jark Wu  于2019年7月18日周四 下午4:03写道:
> > >>
> > >>> Congratulations Becket! Well deserved.
> > >>>
> > >>> Cheers,
> > >>> Jark
> > >>>
> > >>> On Thu, 18 Jul 2019 at 15:56, Paul Lam 
> >  wrote:
> >  Congrats Becket!
> > 
> >  Best,
> >  Paul Lam
> > 
> > > 在 2019年7月18日,15:41,Robert Metzger  写道:
> > >
> > > Hi all,
> > >
> > > I'm excited to announce that Jiangjie (Becket) Qin just became
> > >>> a
> > >> Flink
> > > committer!
> > >
> > > Congratulations Becket!
> > >
> > > Best,
> > > Robert (on behalf of the Flink PMC)
> > 
> > >>>
> > >>> --
> > >>> Best Regards
> > >>>
> > >>> Jeff Zhang
> > >>>
> > >
> >
> >
>


[jira] [Created] (FLINK-13307) SourceStreamTaskTest test instability.

2019-07-17 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-13307:
--

 Summary: SourceStreamTaskTest test instability.
 Key: FLINK-13307
 URL: https://issues.apache.org/jira/browse/FLINK-13307
 Project: Flink
  Issue Type: Improvement
  Components: Tests
Affects Versions: 1.9.0
Reporter: Kostas Kloudas
 Fix For: 1.9.0






--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (FLINK-13275) Race condition in SourceStreamTaskTest.finishingIgnoresExceptions()

2019-07-15 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-13275:
--

 Summary: Race condition in 
SourceStreamTaskTest.finishingIgnoresExceptions()
 Key: FLINK-13275
 URL: https://issues.apache.org/jira/browse/FLINK-13275
 Project: Flink
  Issue Type: Improvement
  Components: Tests
Affects Versions: 1.10.0
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas


The race condition consists of the following series of steps:

1) The {{finishTask}} of the source puts the {{POISON_PILL}} in the {{mailbox}}. 
This is put as the first letter in the queue because we call 
{{sendPriorityLetter()}} in the {{MailboxProcessor}}. 

2) Then the {{cancel()}} of the source (called by {{cancelTask()}}) leads 
the source out of its run-loop, which throws the exception in the test source. 
The latter again calls {{sendPriorityLetter()}} for the exception, which means 
that this exception letter may override the previously sent {{POISON_PILL}} 
(because it jumps the line).

3) If the {{POISON_PILL}} has already been executed, we are good; if not, then 
the test harness sets the exception in 
{{StreamTaskTestHarness.TaskThread.error}}.

To fix this, I would suggest following the same strategy as in release-1.9 for 
the root problem: https://issues.apache.org/jira/browse/FLINK-13124
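
A greatly simplified illustration of the reordering (this is NOT the actual {{MailboxProcessor}} code, just a sketch of how a second priority letter can jump the line):

{code}
import java.util.ArrayDeque;
import java.util.Deque;

public class PriorityLetterRace {
    public static void main(String[] args) {
        Deque<String> mailbox = new ArrayDeque<>();
        mailbox.addLast("regular letter");

        // step 1: finishTask() sends the POISON_PILL as a priority letter (head of the queue)
        mailbox.addFirst("POISON_PILL");

        // step 2: the exception from the cancelled test source is also sent as a
        // priority letter, so it ends up in front of the POISON_PILL
        mailbox.addFirst("exception letter");

        // prints [exception letter, POISON_PILL, regular letter]: the exception may be
        // processed before the POISON_PILL and recorded in TaskThread.error
        System.out.println(mailbox);
    }
}
{code}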



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


Re: [ANNOUNCE] Rong Rong becomes a Flink committer

2019-07-11 Thread Kostas Kloudas
Congratulations Rong!

On Thu, Jul 11, 2019 at 4:40 PM Jark Wu  wrote:

> Congratulations Rong Rong!
> Welcome on board!
>
> On Thu, 11 Jul 2019 at 22:25, Fabian Hueske  wrote:
>
>> Hi everyone,
>>
>> I'm very happy to announce that Rong Rong accepted the offer of the Flink
>> PMC to become a committer of the Flink project.
>>
>> Rong has been contributing to Flink for many years, mainly working on SQL
>> and Yarn security features. He's also frequently helping out on the
>> user@f.a.o mailing lists.
>>
>> Congratulations Rong!
>>
>> Best, Fabian
>> (on behalf of the Flink PMC)
>>
>


Re: [DISCUSS] Graceful Shutdown Handling by UDFs.

2019-07-08 Thread Kostas Kloudas
Hi Konstantin,

Yes, you are right that `cancel` falls under the "abrupt" termination of a
job.
I will update the FLIP accordingly.

Cheers,
Kostas

On Sun, Jul 7, 2019 at 11:38 AM Konstantin Knauf 
wrote:

> Hi Klou,
>
> +1 for this proposal. I am missing any mention of "cancel" in the design
> document though. In my understanding we are not planning to deprecate
> "cancel" completely (just cancel-with-savepoint, which is superseded by
> "stop"). In any case we should consider it in the design document. It seems
> to me that "cancel" should be consider an ungraceful shutdown, so that the
> Job could be restarted from last (retained) checkpoint (as right now).
>
> Cheers,
>
> Konstantin
>
> On Thu, Jul 4, 2019 at 3:21 PM Kostas Kloudas  wrote:
>
> > Hi all,
> >
> > In many cases, UDFs (User Defined Functions) need to be able to perform
> > application-specific actions when they stop in an orderly manner.
> > Currently, Flink's UDFs, and more specifically the RichFunction which
> > exposes lifecycle-related hooks, only have a close() method which is
> called
> > in any case of job termination. This includes any form of orderly
> > termination (STOP or End-Of-Stream) and termination due to an error.
> >
> >
> > The FLIP in [1] and the design document in [2] propose the addition of an
> > interface that will allow UDFs that implement it to perform application
> > specific logic in the case of graceful termination. These cases include
> > DRAIN and SUSPEND for streaming jobs (see FLIP-34), but also reaching the
> > End-Of-Stream for jobs with finite sources.
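
To make the quoted proposal a bit more concrete, here is a minimal, purely
hypothetical sketch of what such a hook could look like. The names and shape
below are illustrative assumptions, not the interface proposed in the FLIP
(see the linked documents for the actual proposal):

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Hypothetical illustration only.
interface GracefullyStoppable {
    // Called once on orderly termination (DRAIN, SUSPEND, or End-Of-Stream),
    // before close(), so the UDF can commit application-specific side effects.
    void stopGracefully() throws Exception;
}

// A user function could then opt in by implementing the interface:
class AuditingSink extends RichSinkFunction<String> implements GracefullyStoppable {

    @Override
    public void invoke(String value, Context context) {
        // write the value to the external system
    }

    @Override
    public void stopGracefully() {
        // e.g. flush buffers and write an "input completed" marker downstream
    }
}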
> >
> > Let's have a lively discussion to solve this issue that has been around
> for
> > quite some time.
> >
> > Cheers,
> > Kostas
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-46%3A+Graceful+Shutdown+Handling+by+UDFs
> >
> > [2]
> >
> >
> https://docs.google.com/document/d/1SXfhmeiJfWqi2ITYgCgAoSDUv5PNq1T8Zu01nR5Ebog/edit?usp=sharing
> >
>
>
> --
>
> Konstantin Knauf | Solutions Architect
>
> +49 160 91394525
>
>
> Planned Absences: 10.08.2019 - 31.08.2019, 05.09. - 06.09.2010
>
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
>
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
>


[FLIP-47] Savepoints vs Checkpoints

2019-07-08 Thread Kostas Kloudas
Hi Devs,

Currently there are a number of efforts around checkpoints/savepoints, as
reflected by the number of FLIPs. From a quick look, FLIP-34, FLIP-41,
FLIP-43, and FLIP-45 are all directly related to these topics. This
reflects the importance of these two notions/features to the users of the
framework.

Although many efforts are centred around these notions, their semantics and
the interplay between them are not always clearly defined. This makes them
difficult to explain to users (all the different combinations of
state backends, formats and tradeoffs), and in some cases it can have
negative effects for users (e.g. the issue, fixed some time ago, of
savepoints not being considered for recovery although they had committed
side-effects).

FLIP-47 [1] and the related document [2] aim at starting a discussion
around the semantics of savepoints/checkpoints and their interplay, and to
some extent at helping us settle the future steps concerning these notions.
For example, should we work towards bringing them closer together, or moving
them further apart?

This is by no means a complete proposal, as many of the practical
implications can only be fleshed out after we agree on the basic semantics
and the general frame around these notions. To that end, there are no
concrete implementation steps yet, and the FLIP is going to be updated as the
discussion continues.

I am really looking forward to your opinions on the topic.

Cheers,
Kostas

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-47%3A+Checkpoints+vs.+Savepoints
[2]
https://docs.google.com/document/d/1_1FF8D3u0tT_zHWtB-hUKCP_arVsxlmjwmJ-TvZd4fs/edit?usp=sharing


Re: Rolling policy when using StreamingFileSink for bulk-encoded output

2019-07-03 Thread Kostas Kloudas
Thanks Ying!

Looking forward to your contribution.

Kostas

On Wed, Jul 3, 2019 at 6:48 PM Ying Xu  wrote:

> Hi Kostas:
>
> For simplicity FLINK-13027
> <https://issues.apache.org/jira/browse/FLINK-13027> has been assigned to
> my
> current user ID. I will contribute using that ID.
>
> Will circulate with the community once we have initial success with this
> new rolling policy !
>
> Thank you again.
>
> -
> Ying
>
>
> On Fri, Jun 28, 2019 at 9:51 AM Ying Xu  wrote:
>
> > Hi Kostas:
> >
> > I'd like to.  The account used to file the JIRA does not have contributor
> > access yet .  I had contributed a few Flink JIRAs in the past, using a
> very
> > similar but different account.  Now I would like to consolidate and use a
> > common account for Apache projects contributions.
> >
> > Would you mind granting me the contributor access for the following
> > account ?  This way I can assign the JIRA to myself.
> >*yxu-apache
> > <https://issues.apache.org/jira/secure/ViewProfile.jspa?name=yxu-apache
> >*
> >
> > Many thanks!
> > <http://www.lyft.com/>
> > -
> > Ying
> >
> >
> > On Fri, Jun 28, 2019 at 2:23 AM Kostas Kloudas 
> wrote:
> >
> >> Hi Ying,
> >>
> >> That sounds great!
> >> Looking forward to your PR!
> >>
> >> Btw don't you want to assign the issue to yourself if you are
> >> planning to work on it?
> >>
> >> Kostas
> >>
> >> On Fri, Jun 28, 2019 at 9:54 AM Ying Xu  wrote:
> >>
> >> > Thanks Kostas for confirming!
> >> >
> >> > I've filed a issue FLINK-13027
> >> > <https://issues.apache.org/jira/browse/FLINK-13027> .   We are
> actively
> >> > working on the interface of such a file rolling policy, and will also
> >> > perform benchmarks when it is integrated with a StreamingFileSink. We
> >> are
> >> > more than happy to contribute if there's no other plan to address this
> >> > issue.
> >> >
> >> > Thanks again.
> >> >
> >> > -
> >> > Bests
> >> > Ying
> >> >
> >> >
> >> > On Tue, Jun 25, 2019 at 2:24 AM Kostas Kloudas 
> >> wrote:
> >> >
> >> > > Hi Ying,
> >> > >
> >> > > You are right! If it is either on checkpoint or on size, then this
> is
> >> > > doable even with the current state of things.
> >> > > Could you open a JIRA so that we can keep track of the progress?
> >> > >
> >> > > Cheers,
> >> > > Kostas
> >> > >
> >> > > On Tue, Jun 25, 2019 at 9:49 AM Ying Xu 
> wrote:
> >> > >
> >> > > > HI Kostas:
> >> > > >
> >> > > > Thanks for the prompt reply.
> >> > > >
> >> > > > The file rolling policy mentioned previously is meant to roll
> files
> >> > > EITHER
> >> > > > when a size limited is reached, OR when a checkpoint happens.
> Looks
> >> > like
> >> > > > every time a file is rolled, the part file is closed
> >> > > > <
> >> > > >
> >> > >
> >> >
> >>
> https://github.com/apache/flink/blob/3702029f45b7034b767e2b7eb01601c7f76ab35e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L217-L218
> >> > > > >,
> >> > > > during which file is closed with a committable returned
> >> > > > <
> >> > > >
> >> > >
> >> >
> >>
> https://github.com/apache/flink/blob/3702029f45b7034b767e2b7eb01601c7f76ab35e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L239-L240
> >> > > > >.
> >> > > > I assume it is during closeForCommit() when the Parquet file
> >> metatdata
> >> > is
> >> > > > written.  At a first glance, the code path of file rolling looks
> >> very
> >> > > > similar to that inside prepareBucketForCheckpointing()
> >> > > > <
> >> > > >
> >> > >
> >> >
> >>
> https://github.com/apache/flink/blob/3702029f45b7034b767e2b7eb01601c7f76ab35e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket

Re: Rolling policy when using StreamingFileSink for bulk-encoded output

2019-06-28 Thread Kostas Kloudas
Hi Ying,

That sounds great!
Looking forward to your PR!

Btw don't you want to assign the issue to yourself if you are
planning to work on it?

Kostas

On Fri, Jun 28, 2019 at 9:54 AM Ying Xu  wrote:

> Thanks Kostas for confirming!
>
> I've filed a issue FLINK-13027
> <https://issues.apache.org/jira/browse/FLINK-13027> .   We are actively
> working on the interface of such a file rolling policy, and will also
> perform benchmarks when it is integrated with a StreamingFileSink. We are
> more than happy to contribute if there's no other plan to address this
> issue.
>
> Thanks again.
>
> -
> Bests
> Ying
>
>
> On Tue, Jun 25, 2019 at 2:24 AM Kostas Kloudas  wrote:
>
> > Hi Ying,
> >
> > You are right! If it is either on checkpoint or on size, then this is
> > doable even with the current state of things.
> > Could you open a JIRA so that we can keep track of the progress?
> >
> > Cheers,
> > Kostas
> >
> > On Tue, Jun 25, 2019 at 9:49 AM Ying Xu  wrote:
> >
> > > HI Kostas:
> > >
> > > Thanks for the prompt reply.
> > >
> > > The file rolling policy mentioned previously is meant to roll files
> > EITHER
> > > when a size limited is reached, OR when a checkpoint happens.  Looks
> like
> > > every time a file is rolled, the part file is closed
> > > <
> > >
> >
> https://github.com/apache/flink/blob/3702029f45b7034b767e2b7eb01601c7f76ab35e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L217-L218
> > > >,
> > > during which file is closed with a committable returned
> > > <
> > >
> >
> https://github.com/apache/flink/blob/3702029f45b7034b767e2b7eb01601c7f76ab35e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L239-L240
> > > >.
> > > I assume it is during closeForCommit() when the Parquet file metatdata
> is
> > > written.  At a first glance, the code path of file rolling looks very
> > > similar to that inside prepareBucketForCheckpointing()
> > > <
> > >
> >
> https://github.com/apache/flink/blob/3702029f45b7034b767e2b7eb01601c7f76ab35e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L275
> > > >.
> > > Not sure if I miss anything there.
> > >
> > >
> > > -
> > > Ying
> > >
> > >
> > > On Mon, Jun 24, 2019 at 2:01 AM Kostas Kloudas 
> > wrote:
> > >
> > > > Hi Ying,
> > > >
> > > > Thanks for using the StreamingFileSink.
> > > >
> > > > The reason why the StreamingFileSink only supports
> > > > OnCheckpointRollingPolicy with bulk
> > > > formats has to do with the fact that currently Flink relies on the
> > Hadoop
> > > > writer for Parquet.
> > > >
> > > > Bulk formats keep important details about how they write the actual
> > data
> > > > (such as compression
> > > > schemes, offsets, etc) in metadata and they write this metadata with
> > the
> > > > file (e.g. parquet writes
> > > > them as a footer). The hadoop writer gives no access to these
> metadata.
> > > > Given this, there is
> > > > no way for flink to be able to checkpoint a part file securely
> without
> > > > closing it.
> > > >
> > > > The solution would be to write our own writer and not go through the
> > > hadoop
> > > > one, but there
> > > > are no concrete plans for this, as far as I know.
> > > >
> > > > I hope this explains a bit more why the StreamingFileSink has this
> > > > limitation.
> > > >
> > > > Cheers,
> > > > Kostas
> > > >
> > > >
> > > > On Mon, Jun 24, 2019 at 9:19 AM Ying Xu 
> wrote:
> > > >
> > > > > Dear Flink community:
> > > > >
> > > > > We have a use case where StreamingFileSink
> > > > > <
> > > > >
> > > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
> > > > > >
> > > > > is used for persisting bulk-encoded data to AWS s3. In our case,
> the
> > > data
> > > > > sources consist of hybrid types of events, for which each type is
> > > > uploaded
> > >

Re: Rolling policy when using StreamingFileSink for bulk-encoded output

2019-06-25 Thread Kostas Kloudas
Hi Ying,

You are right! If it is either on checkpoint or on size, then this is
doable even with the current state of things.
Could you open a JIRA so that we can keep track of the progress?

Cheers,
Kostas
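
For reference, a rough sketch of what such a policy could look like, assuming
the RollingPolicy and PartFileInfo interfaces as they exist today (wiring it
into the bulk-format builder is the part that still needs the small
adjustments discussed in the quoted thread below):

import java.io.IOException;
import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;

// Roll the in-progress part file on every checkpoint (required for bulk
// formats) OR as soon as it grows beyond a configurable size.
public class SizeOrCheckpointRollingPolicy<IN, BucketID> implements RollingPolicy<IN, BucketID> {

    private final long maxPartSizeBytes;

    public SizeOrCheckpointRollingPolicy(long maxPartSizeBytes) {
        this.maxPartSizeBytes = maxPartSizeBytes;
    }

    @Override
    public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState) {
        return true;
    }

    @Override
    public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element) throws IOException {
        return partFileState.getSize() >= maxPartSizeBytes;
    }

    @Override
    public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime) {
        return false;
    }
}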

On Tue, Jun 25, 2019 at 9:49 AM Ying Xu  wrote:

> HI Kostas:
>
> Thanks for the prompt reply.
>
> The file rolling policy mentioned previously is meant to roll files EITHER
> when a size limit is reached, OR when a checkpoint happens.  Looks like
> every time a file is rolled, the part file is closed
> <
> https://github.com/apache/flink/blob/3702029f45b7034b767e2b7eb01601c7f76ab35e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L217-L218
> >,
> during which file is closed with a committable returned
> <
> https://github.com/apache/flink/blob/3702029f45b7034b767e2b7eb01601c7f76ab35e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L239-L240
> >.
> I assume it is during closeForCommit() when the Parquet file metatdata is
> written.  At a first glance, the code path of file rolling looks very
> similar to that inside prepareBucketForCheckpointing()
> <
> https://github.com/apache/flink/blob/3702029f45b7034b767e2b7eb01601c7f76ab35e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L275
> >.
> Not sure if I miss anything there.
>
>
> -
> Ying
>
>
> On Mon, Jun 24, 2019 at 2:01 AM Kostas Kloudas  wrote:
>
> > Hi Ying,
> >
> > Thanks for using the StreamingFileSink.
> >
> > The reason why the StreamingFileSink only supports
> > OnCheckpointRollingPolicy with bulk
> > formats has to do with the fact that currently Flink relies on the Hadoop
> > writer for Parquet.
> >
> > Bulk formats keep important details about how they write the actual data
> > (such as compression
> > schemes, offsets, etc) in metadata and they write this metadata with the
> > file (e.g. parquet writes
> > them as a footer). The hadoop writer gives no access to these metadata.
> > Given this, there is
> > no way for flink to be able to checkpoint a part file securely without
> > closing it.
> >
> > The solution would be to write our own writer and not go through the
> hadoop
> > one, but there
> > are no concrete plans for this, as far as I know.
> >
> > I hope this explains a bit more why the StreamingFileSink has this
> > limitation.
> >
> > Cheers,
> > Kostas
> >
> >
> > On Mon, Jun 24, 2019 at 9:19 AM Ying Xu  wrote:
> >
> > > Dear Flink community:
> > >
> > > We have a use case where StreamingFileSink
> > > <
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
> > > >
> > > is used for persisting bulk-encoded data to AWS s3. In our case, the
> data
> > > sources consist of hybrid types of events, for which each type is
> > uploaded
> > > to an individual s3 prefix location. Because the event size is highly
> > > skewed, the uploaded file size may differ dramatically.  In order to
> > have a
> > > better control over the uploaded file size, we would like to adopt a
> > > rolling policy based on file sizes (e.g., roll the file every 100MB).
> Yet
> > > it appears bulk-encoding StreamingFileSink only supports
> checkpoint-based
> > > file rolling.
> > >
> > > IMPORTANT: Bulk-encoding formats can only be combined with the
> > > `OnCheckpointRollingPolicy`, which rolls the in-progress part file on
> > every
> > > checkpoint.
> > >
> > > Checkpoint-based file rolling appears to have other side effects. For
> > > instance, quite a lot of the heavy liftings (e.g file parts uploading)
> > are
> > > performed at the checkpointing time. As a result, checkpointing takes
> > > longer duration when data volume is high.
> > >
> > > Having a customized file rolling policy can be achieved by small
> > > adjustments on the BulkFormatBuilder interface in StreamingFileSink. In
> > the
> > > case of using S3RecoverableWriter, file rolling triggers data uploading
> > and
> > > corresponding S3Committer is also constructed and stored. Hence on the
> > > surface, adding a simple file-size based rolling policy would NOT
> > > compromise the established exact-once guarantee.
> > >
> > > Any advises on whether the above idea makes sense? Or perhaps there are
> > > pitfalls that one might pay attention when introducing such rolling
> > policy.
> > > Thanks a lot!
> > >
> > >
> > > -
> > > Ying
> > >
> >
>


Re: Rolling policy when using StreamingFileSink for bulk-encoded output

2019-06-24 Thread Kostas Kloudas
Hi Ying,

Thanks for using the StreamingFileSink.

The reason why the StreamingFileSink only supports
OnCheckpointRollingPolicy with bulk
formats has to do with the fact that currently Flink relies on the Hadoop
writer for Parquet.

Bulk formats keep important details about how they write the actual data
(such as compression
schemes, offsets, etc.) in metadata, and they write this metadata with the
file (e.g. Parquet writes
it as a footer). The Hadoop writer gives no access to this metadata.
Given this, there is
no way for Flink to be able to checkpoint a part file safely without
closing it.

The solution would be to write our own writer and not go through the Hadoop
one, but there
are no concrete plans for this, as far as I know.

I hope this explains a bit more why the StreamingFileSink has this
limitation.

Cheers,
Kostas
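
For readers following along, this is roughly what the current API forces on
bulk formats (a sketch, assuming the Parquet/Avro writers from flink-parquet;
if I read the builder correctly, it applies the checkpoint-based rolling
policy internally and exposes no hook for a size-based one):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

// Application-specific Avro schema; here just a single-field example record.
Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Event\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}");

// forBulkFormat() only rolls part files on checkpoints; there is currently no
// builder method to plug in a different rolling policy.
StreamingFileSink<GenericRecord> sink = StreamingFileSink
        .forBulkFormat(new Path("s3://my-bucket/output"), ParquetAvroWriters.forGenericRecord(schema))
        .build();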


On Mon, Jun 24, 2019 at 9:19 AM Ying Xu  wrote:

> Dear Flink community:
>
> We have a use case where StreamingFileSink
> <
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
> >
> is used for persisting bulk-encoded data to AWS s3. In our case, the data
> sources consist of hybrid types of events, for which each type is uploaded
> to an individual s3 prefix location. Because the event size is highly
> skewed, the uploaded file size may differ dramatically.  In order to have a
> better control over the uploaded file size, we would like to adopt a
> rolling policy based on file sizes (e.g., roll the file every 100MB). Yet
> it appears bulk-encoding StreamingFileSink only supports checkpoint-based
> file rolling.
>
> IMPORTANT: Bulk-encoding formats can only be combined with the
> `OnCheckpointRollingPolicy`, which rolls the in-progress part file on every
> checkpoint.
>
> Checkpoint-based file rolling appears to have other side effects. For
> instance, quite a lot of the heavy liftings (e.g file parts uploading) are
> performed at the checkpointing time. As a result, checkpointing takes
> longer duration when data volume is high.
>
> Having a customized file rolling policy can be achieved by small
> adjustments on the BulkFormatBuilder interface in StreamingFileSink. In the
> case of using S3RecoverableWriter, file rolling triggers data uploading and
> corresponding S3Committer is also constructed and stored. Hence on the
> surface, adding a simple file-size based rolling policy would NOT
> compromise the established exact-once guarantee.
>
> Any advises on whether the above idea makes sense? Or perhaps there are
> pitfalls that one might pay attention when introducing such rolling policy.
> Thanks a lot!
>
>
> -
> Ying
>


[jira] [Created] (FLINK-12226) Add documentation about SUSPEND/TERMINATE

2019-04-17 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-12226:
--

 Summary: Add documentation about SUSPEND/TERMINATE
 Key: FLINK-12226
 URL: https://issues.apache.org/jira/browse/FLINK-12226
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation
Affects Versions: 1.9.0
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas
 Fix For: 1.9.0






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [Discuss] Semantics of event time for state TTL

2019-04-08 Thread Kostas Kloudas
Hi all,

For GDPR: I am not sure about the regulatory requirements of GDPR but I
would assume that the time for deletion starts counting from the time an
organisation received the data (i.e. the wall-clock ingestion time of the
data), and not the "event time" of the data. In other case, an organisaton
may be violating GDPR by just receiving e.g. 1 year old data of a user
whole deletion policy is "you are allowed to keep them for 6 months".

Now for the discussion in this thread, I think that the scenario:

* Timestamp stored: Event timestamp
* Timestamp to check expiration: Processing Time

has the underlying assumption that there is a relationship between
event-time and processing time, which is not necessarily the case.
Event-time, although we call it "time", is just another user-defined column
or attribute of the data and can be anything. It is not an "objective" and
independently evolving attribute like wall-clock time. I am not sure what
the solution could be, as out-of-orderness can always lead to arbitrary,
non-reproducible and difficult-to-debug behaviour (e.g. a super-early
element that arrives out-of-order and, as the succeeding elements set the
timestamp to lower values, it gets deleted by the state backend, although
the user-level windowing logic would expect it to be there).

Given the last point made above, and apart from the semantics of the
proposed feature, I think that we should also discuss whether it is a good
idea to have event-time TTL implemented at the state-backend level in the
first place. Personally, I am not so convinced that this is a good idea, as we
introduce another (potentially competing) mechanism for handling event
time, apart from the user program. An example can be the one that I
described above. It also defeats one of the main advantages of event
time, in my opinion, which is reproducibility of the results.

I may be wrong, but I would appreciate any opinions on this.

Cheers,
Kostas
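
For concreteness, this is how state TTL is configured today (cleanup based on
processing time; a sketch assuming the current StateTtlConfig API). The open
question in this thread is which notion of time the stored timestamp and the
expiration check should use:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.days(180))  // e.g. "keep user data for at most 6 months"
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)  // what refreshes the stored timestamp
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .build();

ValueStateDescriptor<String> descriptor = new ValueStateDescriptor<>("user-data", String.class);
descriptor.enableTimeToLive(ttlConfig);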

On Mon, Apr 8, 2019 at 11:12 AM Aljoscha Krettek 
wrote:

> Oh boy, this is an interesting pickle.
>
> For *last-access-timestamp*, I think only *event-time-of-current-record*
> makes sense. I’m looking at this from a GDPR/regulatory compliance
> perspective. If you update a state, by say storing the event you just
> received in state, you want to use the exact timestamp of that event to to
> expiration. Both *max-timestamp-of-data-seen-so-far* and *last-watermark*
> suffer from problems in edge cases: if the timestamp of an event you
> receive is quite a bit earlier than other timestamps that we have seen so
> far (i.e. the event is late) we would artificially lengthen the TTL of that
> event (which is stored in state) and would therefore break regulatory
> requirements. Always using the timestamp of an event doesn’t suffer from
> that problem.
>
> For *expiration-check-time*, both *last-watermark* and
> *current-processing-time* could make sense but I’m leaning towards
> *processing-time*. The reason is again the GDPR/compliance view: if we have
> an old savepoint with data that should have been expired by now but we
> re-process it with *last-watermark* expiration, this means that we will get
> to “see” that state even though we shouldn’t allowed to be. If we use
> *current-processing-time* for expiration, we wouldn’t have that problem
> because that old data (according to their event-time timestamp) would be
> properly cleaned up and access would be prevented.
>
> To sum up:
> last-access-timestamp: event-time of event
> expiration-check-time: processing-time
>
> What do you think?
>
> Aljoscha
>
> > On 6. Apr 2019, at 01:30, Konstantin Knauf 
> wrote:
> >
> > Hi Andrey,
> >
> > I agree with Elias. This would be the most natural behavior. I wouldn't
> add
> > additional slightly different notions of time to Flink.
> >
> > As I can also see a use case for the combination
> >
> > * Timestamp stored: Event timestamp
> > * Timestamp to check expiration: Processing Time
> >
> > we could (maybe in a second step) add the possibility to mix and match
> time
> > characteristics for both aspects.
> >
> > Cheers,
> >
> > Konstantin
> >
> > On Thu, Apr 4, 2019 at 7:59 PM Elias Levy 
> > wrote:
> >
> >> My 2c:
> >>
> >> Timestamp stored with the state value: Event timestamp
> >> Timestamp used to check expiration: Last emitted watermark
> >>
> >> That follows the event time processing model used elsewhere is Flink.
> >> E.g. events are segregated into windows based on their event time, but
> the
> >> windows do not fire until the watermark advances past the end of the
> window.
> >>
> >>
> >> On Thu, Apr 4, 2019 at 7:55 AM Andrey Zagrebin 
> >> wrote:
> >>
> >>> Hi All,
> >>>
> >>> As you might have already seen there is an effort tracked in
> FLINK-12005
> >>> [1] to support event time scale for state with time-to-live (TTL) [2].
> >>> While thinking about design, we realised that there can be multiple
> >>> options
> >>> for semantics of this feature, depending on use case. There is also
> >>> 

[jira] [Created] (FLINK-12051) TaskExecutorTest.testFilterOutDuplicateJobMasterRegistrations() failed locally.

2019-03-28 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-12051:
--

 Summary: 
TaskExecutorTest.testFilterOutDuplicateJobMasterRegistrations() failed locally.
 Key: FLINK-12051
 URL: https://issues.apache.org/jira/browse/FLINK-12051
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.8.0
Reporter: Kostas Kloudas


The test failed locally with:

{code}

Wanted but not invoked:
 jobLeaderService.start(
 ,
 ,
 ,
 
 );
 -> at 
org.apache.flink.runtime.taskexecutor.TaskExecutorTest.testFilterOutDuplicateJobMasterRegistrations(TaskExecutorTest.java:1171)
 Actually, there were zero interactions with this mock.

Wanted but not invoked:
 jobLeaderService.start(
 ,
 ,
 ,
 
 );
 -> at 
org.apache.flink.runtime.taskexecutor.TaskExecutorTest.testFilterOutDuplicateJobMasterRegistrations(TaskExecutorTest.java:1171)
 Actually, there were zero interactions with this mock.

at 
org.apache.flink.runtime.taskexecutor.TaskExecutorTest.testFilterOutDuplicateJobMasterRegistrations(TaskExecutorTest.java:1171)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
 at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
 at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
 at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
 at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
 at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
 at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
 at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
 at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
 at org.junit.rules.RunRules.evaluate(RunRules.java:20)
 at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
 at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
 at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
 at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
 at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
 at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
 at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
 at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
 at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
 at org.junit.runners.Suite.runChild(Suite.java:128)
 at org.junit.runners.Suite.runChild(Suite.java:27)
 at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
 at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
 at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
 at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
 at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
 at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
 at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
 at 
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
 at 
com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
 at 
com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
 at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)

{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11984) StreamingFileSink docs do not mention S3 savepoint caveats.

2019-03-20 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-11984:
--

 Summary: StreamingFileSink docs do not mention S3 savepoint 
caveats.
 Key: FLINK-11984
 URL: https://issues.apache.org/jira/browse/FLINK-11984
 Project: Flink
  Issue Type: Bug
  Components: Connectors / FileSystem, Documentation
Affects Versions: 1.7.2
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] FLIP-33: Terminate/Suspend Job with Savepoint

2019-03-12 Thread Kostas Kloudas
Thanks a lot Aljoscha!

On Tue, Mar 12, 2019 at 2:50 PM Aljoscha Krettek 
wrote:

> I agree and already created a Jira issue for removing the old “stop”
> feature as preparation: https://issues.apache.org/jira/browse/FLINK-11889
> <https://issues.apache.org/jira/browse/FLINK-11889>
>
> Aljoscha
>
> > On 7. Mar 2019, at 11:08, Kostas Kloudas  wrote:
> >
> > Hi,
> >
> > Thanks for the comments.
> > I agree with the Ufuk's and Elias' proposal.
> >
> > - "cancel" remains the good old "cancel"
> > - "terminate" becomes "stop --drain-with-savepoint"
> > - "suspend" becomes "stop --with-savepoint"
> > - "cancel-with-savepoint" is subsumed by "stop --with-savepoint"
> >
> > As you see from the previous, I would also add "terminate" and "suspend"
> > to result in keeping a savepoint by default.
> >
> > As for Ufuk's remarks:
> >
> > 1) You are correct that to have a proper way to not allow elements to be
> > fed in the pipeline
> > after the checkpoint barrier, we need support from the sources. This is
> > more the responsibility
> > of FLIP-27
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
> >
> > 2) I would lean more towards replacing the old "stop" command with the
> new
> > one. But, as you said,
> > I have no view of how many users (if any) rely on the old "stop" command
> > for their usecases.
> >
> > Cheers,
> > Kostas
> >
> >
> >
> > On Wed, Mar 6, 2019 at 9:52 PM Ufuk Celebi  wrote:
> >
> >> I really like this effort. I think the original plan for
> >> "cancel-with-savepoint" was always to just be a workaround until we
> >> arrived at a better solution as proposed here.
> >>
> >> Regarding the FLIP, I agree with Elias comments. I think the number of
> >> termination modes the FLIP introduces can be overwhelming and I would
> >> personally rather follow Elias' proposal. In context of the proposal,
> >> this would result in the following:
> >> - "terminate" becomes "stop --drain"
> >> - "suspend" becomes "stop --with-savepoint"
> >> - "cancel-with-savepoint" is superseded by "stop --with-savepoint"
> >>
> >> I have two remaining questions:
> >>
> >> 1) @Kostas: Elias suggests for stop that "a job should process no
> >> messages after the checkpoints barrier". This is something that needs
> >> support from the sources. Is this in the scope of your proposal (I
> >> think not)? If not, is there a future plan for this?
> >>
> >> 2) Would we need to introduce a new command/name for "stop" as we
> >> already have a "stop" command? Assuming that there are no users that
> >> actually use the existing "stop" command as no major sources are
> >> stoppable (I think), I would personally suggest to upgrade the
> >> existing "stop" command to the proposed one. If on the other hand, if
> >> we know of users that rely on the current "stop" command, we'd need to
> >> find another name for it.
> >>
> >> Best,
> >>
> >> Ufuk
> >>
> >> On Wed, Mar 6, 2019 at 12:27 AM Elias Levy  >
> >> wrote:
> >>>
> >>> Apologies for the late reply.
> >>>
> >>> I think this is badly needed, but I fear we are adding complexity by
> >>> introducing yet two more stop commands.  We'll have: cancel, stop,
> >>> terminate. and suspend.  We basically want to do two things: terminate
> a
> >>> job with prejudice or stop a job safely.
> >>>
> >>> For the former "cancel" is the appropriate term, and should have no
> need
> >>> for a cancel with checkpoint option.  If the job was configured to use
> >>> externalized checkpoints and it ran long enough, a checkpoint will be
> >>> available for it.
> >>>
> >>> For the later "stop" is the appropriate term, and it means that a job
> >>> should process no messages after the checkpoints barrier and that it
> >> should
> >>> ensure that exactly-once sinks complete their two-phase commits
> >>> successfully.  If a savepoint was requested, one should be created.
> >>>
> >>> So in my mind there are two commands, cancel and s

Re: [DISCUSS] FLIP-33: Terminate/Suspend Job with Savepoint

2019-03-07 Thread Kostas Kloudas
Hi,

Thanks for the comments.
I agree with the Ufuk's and Elias' proposal.

- "cancel" remains the good old "cancel"
- "terminate" becomes "stop --drain-with-savepoint"
- "suspend" becomes "stop --with-savepoint"
- "cancel-with-savepoint" is subsumed by "stop --with-savepoint"

As you can see from the above, I would also have "terminate" and "suspend"
result in keeping a savepoint by default.

As for Ufuk's remarks:

1) You are correct that, in order to have a proper way of not allowing
elements to be fed into the pipeline
after the checkpoint barrier, we need support from the sources. This is
more the responsibility
of FLIP-27:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface

2) I would lean more towards replacing the old "stop" command with the new
one. But, as you said,
I have no view of how many users (if any) rely on the old "stop" command
for their use cases.

Cheers,
Kostas



On Wed, Mar 6, 2019 at 9:52 PM Ufuk Celebi  wrote:

> I really like this effort. I think the original plan for
> "cancel-with-savepoint" was always to just be a workaround until we
> arrived at a better solution as proposed here.
>
> Regarding the FLIP, I agree with Elias comments. I think the number of
> termination modes the FLIP introduces can be overwhelming and I would
> personally rather follow Elias' proposal. In context of the proposal,
> this would result in the following:
> - "terminate" becomes "stop --drain"
> - "suspend" becomes "stop --with-savepoint"
> - "cancel-with-savepoint" is superseded by "stop --with-savepoint"
>
> I have two remaining questions:
>
> 1) @Kostas: Elias suggests for stop that "a job should process no
> messages after the checkpoints barrier". This is something that needs
> support from the sources. Is this in the scope of your proposal (I
> think not)? If not, is there a future plan for this?
>
> 2) Would we need to introduce a new command/name for "stop" as we
> already have a "stop" command? Assuming that there are no users that
> actually use the existing "stop" command as no major sources are
> stoppable (I think), I would personally suggest to upgrade the
> existing "stop" command to the proposed one. If on the other hand, if
> we know of users that rely on the current "stop" command, we'd need to
> find another name for it.
>
> Best,
>
> Ufuk
>
> On Wed, Mar 6, 2019 at 12:27 AM Elias Levy 
> wrote:
> >
> > Apologies for the late reply.
> >
> > I think this is badly needed, but I fear we are adding complexity by
> > introducing yet two more stop commands.  We'll have: cancel, stop,
> > terminate. and suspend.  We basically want to do two things: terminate a
> > job with prejudice or stop a job safely.
> >
> > For the former "cancel" is the appropriate term, and should have no need
> > for a cancel with checkpoint option.  If the job was configured to use
> > externalized checkpoints and it ran long enough, a checkpoint will be
> > available for it.
> >
> > For the later "stop" is the appropriate term, and it means that a job
> > should process no messages after the checkpoints barrier and that it
> should
> > ensure that exactly-once sinks complete their two-phase commits
> > successfully.  If a savepoint was requested, one should be created.
> >
> > So in my mind there are two commands, cancel and stop, with appropriate
> > semantics.  Emitting MAX_WATERMARK before the checkpoint barrier during
> > stop is merely an optional behavior, like creation of a savepoint.  But
> if
> > a specific command for it is desired, then "drain" seems appropriate.
> >
> > On Tue, Feb 12, 2019 at 9:50 AM Stephan Ewen  wrote:
> >
> > > Hi Elias!
> > >
> > > I remember you brought this missing feature up in the past. Do you
> think
> > > the proposed enhancement would work for your use case?
> > >
> > > Best,
> > > Stephan
> > >
> > > -- Forwarded message -
> > > From: Kostas Kloudas 
> > > Date: Tue, Feb 12, 2019 at 5:28 PM
> > > Subject: [DISCUSS] FLIP-33: Terminate/Suspend Job with Savepoint
> > > To: 
> > >
> > >
> > > Hi everyone,
> > >
> > >  A commonly used functionality offered by Flink is the
> > > "cancel-with-savepoint" operation. When applied to the current
> exactly-once
> > > sinks, the current implementation of the 

Re: [DISCUSS] A more restrictive JIRA workflow

2019-02-25 Thread Kostas Kloudas
Really nice idea Timo,

Thanks for taking the initiative to open this discussion.

Although it is a side-effect, a big argument for my +1 is
the fact that we now create backpressure, whenever needed, at the
JIRA level rather than at the open-PR level.

The reason is that not accepting a PR after the contributor has spent
cycles working on an issue can be a lot more demotivating than
just waiting for the JIRA assignment to be completed.

+1 from my side,
Kostas


On Mon, Feb 25, 2019 at 2:23 PM Timo Walther  wrote:

> Hi everyone,
>
> as some of you might have noticed during the last weeks, the Flink
> community grew quite a bit. A lot of people have applied for contributor
> permissions and started working on issues, which is great for the growth
> of Flink!
>
> However, we've also observed that managing JIRA and coordinate work and
> responsibilities becomes more complex as more people are joining. Here
> are some observations to exemplify the current challenges:
>
> - There is a high number of concurrent discussion about new features or
> important refactorings.
>
> - JIRA issues are being created for components to:
>- represent an implementation plan (e.g. of a FLIP)
>- track progress of the feature by splitting it into a finer granularity
>- coordinate work between contributors/contributor teams
>
> - Lack of guidance for new contributors: Contributors don't know which
> issues to pick but are motivated to work on something.
>
> - Contributors pick issues that:
>- require very good (historical) knowledge of a component
>- need to be implemented in a timely fashion as they block other
> contributors or a Flink release
>- have implicit dependencies on other changes
>
> - Contributors open pull requests with a bad description, without
> consensus, or an unsatisfactory architecture. Shortcomings that could
> have been solved in JIRA before.
>
> - Committers don't open issues because they fear that some "random"
> contributor picks it up or assign many issues to themselves to "protect"
> them. Even though they don't have the capacity to solve all of them.
>
> I propose to make our JIRA a bit more restrictive:
>
> - Don't allow contributors to assign issues to themselves. This forces
> them to find supporters first. As mentioned in the contribution
> guidelines [1]: "reach consensus with the community". Only committers
> can assign people to issues.
>
> - Don't allow contributors to set a fixed version or release notes. Only
> committers should do that after merging the contribution.
>
> - Don't allow contributors to set a blocker priority. The release
> manager should decide about that.
>
> As a nice side-effect, it might also impact the number of stale pull
> requests by moving the consensus and design discussion to an earlier
> phase in the process.
>
> What do you think? Feel free to propose more workflow improvements. Of
> course we need to check with INFRA if this can be represented in our JIRA.
>
> Thanks,
> Timo
>
> [1] https://flink.apache.org/contribute-code.html
>
>


[jira] [Created] (FLINK-11670) Add SUSPEND/TERMINATE calls to REST API

2019-02-20 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-11670:
--

 Summary: Add SUSPEND/TERMINATE calls to REST API
 Key: FLINK-11670
 URL: https://issues.apache.org/jira/browse/FLINK-11670
 Project: Flink
  Issue Type: Sub-task
  Components: REST
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas


Exposes the SUSPEND/TERMINATE functionality to the user through the REST API.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11671) Expose SUSPEND/TERMINATE to CLI

2019-02-20 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-11671:
--

 Summary: Expose SUSPEND/TERMINATE to CLI
 Key: FLINK-11671
 URL: https://issues.apache.org/jira/browse/FLINK-11671
 Project: Flink
  Issue Type: Sub-task
  Components: Client
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas


Expose the SUSPEND/TERMINATE functionality to the user through the command line.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11669) Add Synchronous Checkpoint Triggering RPCs.

2019-02-20 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-11669:
--

 Summary: Add Synchronous Checkpoint Triggering RPCs.
 Key: FLINK-11669
 URL: https://issues.apache.org/jira/browse/FLINK-11669
 Project: Flink
  Issue Type: Sub-task
  Components: JobManager, TaskManager
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas


Wires the triggering of the Synchronous {{Checkpoint/Savepoint}} from the 
{{JobMaster}} to the {{TaskExecutor}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11668) Allow sources to advance time to max watermark on checkpoint.

2019-02-20 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-11668:
--

 Summary: Allow sources to advance time to max watermark on 
checkpoint.
 Key: FLINK-11668
 URL: https://issues.apache.org/jira/browse/FLINK-11668
 Project: Flink
  Issue Type: Sub-task
  Components: State Backends, Checkpointing, Streaming
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas


This is needed for the TERMINATE case. It will allow the sources to inject the 
{{MAX_WATERMARK}} before the barrier that will trigger the last savepoint. This 
will fire any registered event-time timers and flush any state associated with 
these timers, e.g. windows.
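
Not part of this ticket's implementation, just an illustration of the user-visible effect: any registered event-time timer, like the one below, will fire once the watermark advances to {{MAX_WATERMARK}} ({{Long.MAX_VALUE}}), so pending per-key state can be flushed before the final savepoint barrier (sketch; assumes event-time timestamps are assigned upstream).

{code}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class FlushOnMaxWatermark extends KeyedProcessFunction<String, String, String> {

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        // register a timer far in the future; it still fires when MAX_WATERMARK arrives
        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 3_600_000L);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        out.collect("flushing buffered state for key " + ctx.getCurrentKey());
    }
}
{code}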



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11667) Add Synchronous Checkpoint handling in StreamTask

2019-02-20 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-11667:
--

 Summary: Add Synchronous Checkpoint handling in StreamTask
 Key: FLINK-11667
 URL: https://issues.apache.org/jira/browse/FLINK-11667
 Project: Flink
  Issue Type: Sub-task
  Components: State Backends, Checkpointing
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas


This is the basic building block for the SUSPEND/TERMINATE functionality. 

In the case of a synchronous checkpoint barrier, the checkpointing thread will 
block (without holding the checkpoint lock) until 
{{notifyCheckpointComplete}} is executed successfully. This will allow the 
checkpoint to be considered successful ONLY after 
{{notifyCheckpointComplete}} has also been successfully executed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: StreamingFileSink causing AmazonS3Exception

2019-02-18 Thread Kostas Kloudas
>   at
>> org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.uploadPart(HadoopS3AccessHelper.java:74)
>>   at
>> org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl$UploadTask.run(RecoverableMultiPartUploadImpl.java:319)
>>   at
>> org.apache.flink.fs.s3.common.utils.BackPressuringExecutor$SemaphoreReleasingRunnable.run(BackPressuringExecutor.java:92)
>>   at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>   at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>   at java.lang.Thread.run(Thread.java:748)
>>
>> From this, you can see that for (some reason) AWS fails to write a
>> multi-part chunk and then tries to reset the input stream in order to retry
>> but fails (because the InputStream is not mark-able)
>>
>> That exception is swallowed (it seems like it should be raised up to
>> client, but isn't for an unknown reason). The s3-client then tries to
>> repeat the request using it's built in retry logic, however, because the
>> InputStream is consumed
>> and has no more bytes to write, we never fill up the expected
>> content-length that the s3 put request is expecting. Eventually, after it
>> hits the max number of retries, it fails and you get the error above.
>>
>> I just started running a fix for this (which is a hack not the real
>> solution) here:
>> https://gist.github.com/addisonj/00fc28f1f8f189380d8e53fdc887fae6
>>
>> This whole thing is documented here:
>> https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/best-practices.html
>>
>> However, I found that just using the documented property didn't appear to
>> work and I had to wrap the InputStream in the BufferedInputStream for it to
>> work.
>>
>> I think the real fix is either to:
>>
>> 1. Use the BufferedInputStream but make it configurable
>> 2. Refactor S3AccessHelper to have another signature that takes a File
>> object and change the RefCountedFSOutputStream to also be able to give a
>> reference the the underlying file.
>>
>> I can pretty easily do this work, but would be curious the direction that
>> the maintainers would prefer.
>>
>> Thanks,
>>
>> Addison!
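
A minimal sketch of option 1 above (variable names are made up for the
illustration; they are not from the gist): wrapping the upload stream in a
BufferedInputStream gives the AWS SDK a mark()/reset()-capable stream, so its
built-in retry logic can rewind and resend a part after a transient failure,
at the cost of buffering the marked bytes in memory.

import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.InputStream;

// Stand-in for the real stream over the part to be uploaded.
InputStream rawPartStream = new ByteArrayInputStream(new byte[1024]);
int bufferSizeBytes = 8 * 1024 * 1024;  // hypothetical; should cover the bytes read before a retry

InputStream markableStream = new BufferedInputStream(rawPartStream, bufferSizeBytes);
// markableStream.markSupported() is true, so the SDK can reset() after a failed attempt.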
>>
>>
>>
>>
>>
>>
>> On Fri, Dec 14, 2018 at 8:43 AM Kostas Kloudas <
>> k.klou...@data-artisans.com> wrote:
>>
>>> Hi Steffen,
>>>
>>> Thanks for reporting this.
>>>
>>> Internally Flink does not keep any open connections to S3.  It only
>>> keeps buffers data internally up
>>> till the point they reach a min-size limit (by default 5MB) and then
>>> uploads them as a part of
>>> an MPU on one go. Given this, I will have to dig a bit dipper to see why
>>> a connection would timeout.
>>>
>>> If you are willing to dig into the code, all interactions with S3 pass
>>> through the S3AccessHelper
>>> class and its implementation, the HadoopS3AccessHelper. For the
>>> buffering and uploading logic,
>>> you could have a look at the S3RecoverableWriter and the
>>> S3RecoverableFsDataOutputStream.
>>>
>>> I will keep looking into it. In the meantime, if you find anything let
>>> us know.
>>>
>>> Cheers,
>>> Kostas
>>>
>>>
> *Grab is hiring. Learn more at **https://grab.careers
> <https://grab.careers/>*
>
> By communicating with Grab Inc and/or its subsidiaries, associate
> companies and jointly controlled entities (“Grab Group”), you are deemed to
> have consented to processing of your personal data as set out in the
> Privacy Notice which can be viewed at https://grab.com/privacy/
>
> This email contains confidential information and is only for the intended
> recipient(s). If you are not the intended recipient(s), please do not
> disseminate, distribute or copy this email and notify Grab Group
> immediately if you have received this by mistake and delete this email from
> your system. Email transmission cannot be guaranteed to be secure or
> error-free as any information therein could be intercepted, corrupted,
> lost, destroyed, delayed or incomplete, or contain viruses. Grab Group do
> not accept liability for any errors or omissions in the contents of this
> email arises as a result of email transmission. All intellectual property
> rights in this email and attachments therein shall remain vested in Grab
> Group, unless otherwise provided by law.
>


-- 

Kostas Kloudas | Software Engineer


<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen


[DISCUSS] FLIP-33: Terminate/Suspend Job with Savepoint

2019-02-12 Thread Kostas Kloudas
Hi everyone,

 A commonly used functionality offered by Flink is the
"cancel-with-savepoint" operation. When applied to exactly-once
sinks, the current implementation of the feature can be problematic, as it
does not guarantee that side-effects will be committed by Flink to the
third-party storage system.

 This discussion targets fixing this issue and proposes the addition of two
termination modes, namely:
1) SUSPEND, for temporarily stopping the job, e.g. for upgrading the Flink
version in your cluster
2) TERMINATE, for terminal shutdown, which ends the stream, sends the
MAX_WATERMARK, and flushes any state associated with (event-time)
timers

A google doc with the FLIP proposal can be found here:
https://docs.google.com/document/d/1EZf6pJMvqh_HeBCaUOnhLUr9JmkhfPgn6Mre_z6tgp8/edit?usp=sharing

And the page for the FLIP is here:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103090212

 The implementation sketch is far from complete, but it is worth having a
discussion on the semantics as soon as possible. The implementation section
is going to be updated soon.

 Looking forward to the discussion,
 Kostas

-- 

Kostas Kloudas | Software Engineer


<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen


[jira] [Created] (FLINK-11574) Make StreamTask properly handle the DRAIN message.

2019-02-11 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-11574:
--

 Summary: Make StreamTask properly handle the DRAIN message.
 Key: FLINK-11574
 URL: https://issues.apache.org/jira/browse/FLINK-11574
 Project: Flink
  Issue Type: Sub-task
  Components: Core, Streaming, TaskManager
Affects Versions: 1.8.0
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas
 Fix For: 1.8.0


In this case, the {{StreamTask}} has to:
- send {{MAX_WATERMARK}} so that timers fire
- launch the checkpointing process
- wait also for the checkpoint notification callback to be successfully 
executed
- make sure no elements are processed by downstream operators (required for 
exactly-once with idempotent updates)
- call {{close()}}
- let the job go gracefully to {{FINISHED}}




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11575) Let the JobManager send the SUSPEND and DRAIN messages

2019-02-11 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-11575:
--

 Summary: Let the JobManager send the SUSPEND and DRAIN messages
 Key: FLINK-11575
 URL: https://issues.apache.org/jira/browse/FLINK-11575
 Project: Flink
  Issue Type: Sub-task
  Components: Core, Streaming, TaskManager
Affects Versions: 1.8.0
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas
 Fix For: 1.8.0


Expose the functionality to the job manager and subsequently to the user.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11573) Make StreamTask properly handle the SUSPEND message.

2019-02-11 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-11573:
--

 Summary: Make StreamTask properly handle the SUSPEND message.
 Key: FLINK-11573
 URL: https://issues.apache.org/jira/browse/FLINK-11573
 Project: Flink
  Issue Type: Sub-task
  Components: Core, Streaming, TaskManager
Affects Versions: 1.8.0
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas
 Fix For: 1.8.0


In this case, the {{StreamTask}} has to:
- launch the checkpointing process
- wait for the checkpoint notification callback to be successfully executed
- make sure no elements are processed by downstream operators (required for 
exactly-once with idempotent updates)
- do not call {{close()}}
- let the job go gracefully to {{FINISHED}}




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11572) Add new types of CheckpointBarriers.

2019-02-11 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-11572:
--

 Summary: Add new types of CheckpointBarriers.
 Key: FLINK-11572
 URL: https://issues.apache.org/jira/browse/FLINK-11572
 Project: Flink
  Issue Type: Sub-task
  Components: Core
Affects Versions: 1.8.0
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas
 Fix For: 1.8.0


Add the new types of barriers that will be sent by the JobManager to the 
TaskManagers and which will trigger the {{DRAINING}} or {{SUSPENSION}} of the 
pipeline.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11458) Ensure sink commit side-effects when cancelling with savepoint.

2019-01-29 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-11458:
--

 Summary: Ensure sink commit side-effects when cancelling with 
savepoint.
 Key: FLINK-11458
 URL: https://issues.apache.org/jira/browse/FLINK-11458
 Project: Flink
  Issue Type: New Feature
  Components: State Backends, Checkpointing
Affects Versions: 1.8.0
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas
 Fix For: 1.8.0


TBD.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] Bot for stale PRs on GitHub

2019-01-13 Thread Kostas Kloudas
+1 to try the bot.

It may, at first, seem less empathetic than a solution that involves a
human monitoring the PRs, but, in essence, having a PR sit stale for months
(or even years) is at least as discouraging for a new contributor.

Labels could further reduce the problem of noise, but I think that this
"noise" is a necessary evil during the "transition period" of moving from
the current situation to one with a cleaner PR backlog.

Cheers,
Kostas

On Sun, Jan 13, 2019 at 1:02 PM Dominik Wosiński  wrote:

> >
> > Hey,
> >
> I agree with Timo here that we should introduce labels that will improve
> communication for PRs. IMHO this will show what percentage of PRs is really
> stale and not just abandoned due to the misunderstanding or other
> communication issues.
>
> Best Regards,
> Dom.
>


Re: [DISCUSS] Dropping flink-storm?

2019-01-10 Thread Kostas Kloudas
+1 to drop as well.

On Thu, Jan 10, 2019 at 10:15 AM Ufuk Celebi  wrote:

> +1 to drop.
>
> I totally agree with your reasoning. I like that we tried to keep it,
> but I don't think the maintenance overhead would be justified.
>
> – Ufuk
>
> On Wed, Jan 9, 2019 at 4:09 PM Till Rohrmann  wrote:
> >
> > With https://issues.apache.org/jira/browse/FLINK-10571, we will remove
> the
> > Storm topologies from Flink and keep the wrappers for the moment.
> >
> > However, looking at the FlinkTopologyContext [1], it becomes quite
> obvious
> > that Flink's compatibility with Storm is really limited. Almost all of
> the
> > context methods are not supported which makes me wonder how useful these
> > wrappers really are. Given the additional maintenance overhead of having
> > them in the code base and no indication that someone is actively using
> > them, I would still be in favour of removing them. This will reduce our
> > maintenance burden in the future. What do you think?
> >
> > [1]
> >
> https://github.com/apache/flink/blob/master/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java
> >
> > Cheers,
> > Till
> >
> > On Tue, Oct 9, 2018 at 10:08 AM Fabian Hueske  wrote:
> >
> > > Yes, let's do it this way.
> > > The wrapper classes are probably not too complex and can be easily
> tested.
> > > We have the same for the Hadoop interfaces, although I think only the
> > > Input- and OutputFormatWrappers are actually used.
> > >
> > >
> > > Am Di., 9. Okt. 2018 um 09:46 Uhr schrieb Chesnay Schepler <
> > > ches...@apache.org>:
> > >
> > >> That sounds very good to me.
> > >>
> > >> On 08.10.2018 11:36, Till Rohrmann wrote:
> > >> > Good point. The initial idea of this thread was to remove the storm
> > >> > compatibility layer completely.
> > >> >
> > >> > During the discussion I realized that it might be useful for our
> users
> > >> > to not completely remove it in one go. Instead for those who still
> > >> > want to use some Bolt and Spout code in Flink, it could be nice to
> > >> > keep the wrappers. At least, we could remove flink-storm in a more
> > >> > graceful way by first removing the Topology and client parts and
> then
> > >> > the wrappers. What do you think?
> > >> >
> > >> > Cheers,
> > >> > Till
> > >> >
> > >> > On Mon, Oct 8, 2018 at 11:13 AM Chesnay Schepler <
> ches...@apache.org
> > >> > > wrote:
> > >> >
> > >> > I don't believe that to be the consensus. For starters it is
> > >> > contradictory; we can't /drop /flink-storm yet still /keep
> //some
> > >> > parts/.
> > >> >
> > >> > From my understanding we drop flink-storm completely, and put a
> > >> > note in the docs that the bolt/spout wrappers of previous
> versions
> > >> > will continue to work.
> > >> >
> > >> > On 08.10.2018 11:04, Till Rohrmann wrote:
> > >> >> Thanks for opening the issue Chesnay. I think the overall
> > >> >> consensus is to drop flink-storm and only keep the Bolt and
> Spout
> > >> >> wrappers. Thanks for your feedback!
> > >> >>
> > >> >> Cheers,
> > >> >> Till
> > >> >>
> > >> >> On Mon, Oct 8, 2018 at 9:37 AM Chesnay Schepler
> > >> >> mailto:ches...@apache.org>> wrote:
> > >> >>
> > >> >> I've created
> > >> >> https://issues.apache.org/jira/browse/FLINK-10509 for
> > >> >> removing flink-storm.
> > >> >>
> > >> >> On 28.09.2018 15:22, Till Rohrmann wrote:
> > >> >> > Hi everyone,
> > >> >> >
> > >> >> > I would like to discuss how to proceed with Flink's storm
> > >> >> compatibility
> > >> >> > layer flink-strom.
> > >> >> >
> > >> >> > While working on removing Flink's legacy mode, I noticed
> > >> >> that some parts of
> > >> >> > flink-storm rely on the legacy Flink client. In fact, at
> > >> >> the moment
> > >> >> > flink-storm does not work together with Flink's new
> > >> distributed
> > >> >> > architecture.
> > >> >> >
> > >> >> > I'm also wondering how many people are actually using
> > >> >> Flink's Storm
> > >> >> > compatibility layer and whether it would be worth
> porting it.
> > >> >> >
> > >> >> > I see two options how to proceed:
> > >> >> >
> > >> >> > 1) Commit to maintain flink-storm and port it to Flink's
> > >> >> new architecture
> > >> >> > 2) Drop flink-storm
> > >> >> >
> > >> >> > I doubt that we can contribute it to Apache Bahir [1],
> > >> >> because once we
> > >> >> > remove the legacy mode, this module will no longer work
> > >> >> with all newer
> > >> >> > Flink versions.
> > >> >> >
> > >> >> > Therefore, I would like to hear your opinion on this and
> in
> > >> >> particular if
> > >> >> > you are using or planning to use flink-storm in the
> future.
> > >> >>   

[jira] [Created] (FLINK-11116) Clean up temporary files that, upon recovery, belong to no checkpoint.

2018-12-10 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-6:
--

 Summary: Clean up temporary files that, upon recovery, belong 
to no checkpoint.
 Key: FLINK-6
 URL: https://issues.apache.org/jira/browse/FLINK-6
 Project: Flink
  Issue Type: Improvement
  Components: filesystem-connector
Affects Versions: 1.7.0
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas
 Fix For: 1.7.1


In order to guarantee exactly-once semantics, the streaming file sink is 
implementing a two-phase commit protocol when writing files to the filesystem.

Initially data is written to in-progress files. These files are then put into 
"pending" state when they are completed (based on the rolling policy), and they 
are finally committed when the checkpoint that put them in the "pending" state 
is acknowledged as complete.

The above shows that in the case where we have:
1) checkpoints A, B, and C coming, 
2) checkpoint A being acknowledged, and 
3) a failure,

then we may have files that do not belong to any checkpoint (because B and C 
were not considered successful). These files are currently not cleaned up.

This issue aims at cleaning up these files.
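
For illustration, a toy sketch of the part-file lifecycle described above (all 
class and method names are made up for the example and do not correspond to the 
actual StreamingFileSink internals):

/** Illustrative only: the states a part file goes through in the two-phase commit. */
enum PartFileState { IN_PROGRESS, PENDING, FINISHED }

/** A toy model of a single part file; NOT the actual Flink implementation. */
final class PartFile {

    private PartFileState state = PartFileState.IN_PROGRESS;
    private long pendingForCheckpoint = -1L;

    /** The rolling policy completed the file: it now waits for "its" checkpoint. */
    void closeForCheckpoint(long checkpointId) {
        state = PartFileState.PENDING;
        pendingForCheckpoint = checkpointId;
    }

    /** The checkpoint that put the file into "pending" was acknowledged: commit it. */
    void notifyCheckpointComplete(long completedCheckpointId) {
        if (state == PartFileState.PENDING && pendingForCheckpoint <= completedCheckpointId) {
            state = PartFileState.FINISHED;
        }
    }

    /** Files that never reached FINISHED (e.g. pending for checkpoints B or C in the
     *  scenario above) are the ones left behind after a failure; cleaning them up
     *  is the goal of this issue. */
    boolean needsCleanupAfterRecovery() {
        return state != PartFileState.FINISHED;
    }
}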



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: support/docs for compression in StreamingFileSink

2018-12-03 Thread Kostas Kloudas
Hi Addison,

Sorry for the late reply.

I agree that the documentation can be significantly improved
and that adding compression could be a nice thing to have.

There is already a PR open for supporting writing SequenceFiles with
the StreamingFileSink. When this gets merged, you will be able to use
compression when writing SequenceFiles (
https://github.com/apache/flink/pull/6774).

If this is not enough and you want to write plain-text and compress
it when you finalise your part-file, then you are right that you will
need to write your own BulkWriter.
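
For illustration, a minimal (untested) sketch of such a BulkWriter could look like
the one below. It assumes newline-separated String records; the class name is made
up, while BulkWriter, BulkWriter.Factory and FSDataOutputStream are the existing
Flink interfaces:

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.FSDataOutputStream;

public class GzipStringWriterFactory implements BulkWriter.Factory<String> {

    @Override
    public BulkWriter<String> create(FSDataOutputStream out) throws IOException {
        // Wrap the part-file stream handed in by the sink; the sink itself manages
        // the life-cycle of 'out', so we must not close it here.
        final GZIPOutputStream gzip = new GZIPOutputStream(out);

        return new BulkWriter<String>() {
            @Override
            public void addElement(String element) throws IOException {
                gzip.write((element + "\n").getBytes(StandardCharsets.UTF_8));
            }

            @Override
            public void flush() throws IOException {
                gzip.flush();
            }

            @Override
            public void finish() throws IOException {
                // Write the gzip trailer, but leave the underlying stream open
                // so that the sink can finalize the part file.
                gzip.finish();
            }
        };
    }
}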

As you said, BulkWriters have only one RollingPolicy, namely that they
roll on every checkpoint, but there are plans to alleviate this limitation
in the future.

Cheers,
Kostas


On Thu, Nov 15, 2018 at 10:25 AM Till Rohrmann  wrote:

> Hi Addison,
>
> I think it is a good idea to add some more details to the documentation.
> Thus, it would be great if you could contribute how to enable compression.
>
> Concerning the RollingPolicy, I've pulled in Klou who might give you more
> details about the design decisions.
>
> Cheers,
> Till
>
> On Wed, Nov 14, 2018 at 10:07 PM Addison Higham 
> wrote:
>
>> Just noticed one detail about using the BulkWriter interface, you no
>> longer
>> can assign a rolling policy. That makes sense for formats like
>> orc/parquet,
>> but perhaps not for simple text compression.
>>
>>
>>
>> On Wed, Nov 14, 2018 at 1:43 PM Addison Higham 
>> wrote:
>>
>> > HI all,
>> >
>> > I am moving some code to use the StreamingFileSink. Currently, it
>> doesn't
>> > look like there is any native support for compression (gzip or
>> otherwise)
>> > built into flink when using the StreamingFileSink. It seems like this
>> is a
>> > really common need that as far as I could tell, wasn't represented in
>> jira.
>> >
>> > After a fair amount of digging, it seems like the way to do that is to
>> > implement that is the BulkWriter interface where you can trivially wrap
>> an
>> > outputStream with something like a GZIPOutputStream.
>> >
>> > It seems like it would make sense that until compression functionality
>> is
>> > built into the StreamingFileSink, it might make sense to add some docs
>> on
>> > how to use compression with the StreamingFileSink.
>> >
>> > I am willing to spend a bit of time documenting that, but before I do i
>> > wanted to make sure I understand if that is in fact the correct way to
>> > think about this problem and get your thoughts.
>> >
>> > Thanks!
>> >
>> >
>> >
>>
>


Re: Looking for relevant sources related to connecting Apache Flink and Edgent.

2018-11-29 Thread Kostas Kloudas
Hi again,

I forgot to say that, unfortunately, I am not familiar with Apache Edgent,
but if you can write your filter in Edgent's programming model,
then you can push your data from Edgent to a third-party storage system
(e.g. Kafka, HDFS, etc.) and use Flink's connectors, instead of
having to implement a custom source.
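
For example (a rough sketch only; the topic name, properties and consumer version
below are assumptions for illustration), the Flink side could then simply consume
the pre-filtered data with an off-the-shelf connector:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class EdgentBridgeJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumption
        props.setProperty("group.id", "edgent-bridge");           // assumption

        // The Edgent job on the Raspberry Pi writes its filtered sensor readings
        // to this (hypothetical) topic; Flink only runs the analytics part.
        DataStream<String> filtered = env.addSource(
                new FlinkKafkaConsumer011<>("edgent-filtered", new SimpleStringSchema(), props));

        filtered.print(); // put the actual analytics here

        env.execute("Analytics on Edgent-filtered data");
    }
}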

Cheers,
Kostas

On Thu, Nov 29, 2018 at 11:08 AM Kostas Kloudas 
wrote:

> Hi Felipe,
>
> This seems related to your previous question about a custom scheduler that
> knows which task to run on which machine.
> As Chesnay said, this is a rather involved and laborious task, if you want
> to do it as a general framework.
>
> But if you know what operation to push down, then why not decoupling the
> two and implementing the filtering as a separate job
> running on your Raspberry and a new job which consumes the output of the
> first and does the analytics?
>
> Cheers,
> Kostas
>
> On Thu, Nov 29, 2018 at 10:23 AM Felipe Gutierrez <
> felipe.o.gutier...@gmail.com> wrote:
>
>> Hi,
>>
>> I am trying to design a little prototype with Flink and Apache Edgent (
>> http://edgent.apache.org/) and I would like some help on the direction
>> for it. I am running Flink at my laptop and Edgent on my Raspberry Pi with
>> a simple filter for a proximity sensor (
>> https://github.com/felipegutierrez/explore-rpi/blob/master/src/main/java/org/sense/edgent/app/UltrasonicEdgentApp.java
>> ).
>>
>> My idea is to push down the filter operator from Flink to the Raspberry
>> Pi which is running Apache Edgent. With this in mind, where do you guys
>> advise me to start?
>>
>> I have some ideas to study...
>> 1 - Try to get the list of operators that Flink is about to execute on
>> the JobManager. source:
>> https://ci.apache.org/projects/flink/flink-docs-stable/internals/job_scheduling.html
>> 2 - Implement a connector to Apache Edgent in order to exchange messages
>> between Flink and Edgent.
>>
>> Do you guys think in another source that is interesting regarding my
>> prototype?
>>
>> Thanks,
>> Felipe
>> *--*
>> *-- Felipe Gutierrez*
>>
>> *-- skype: felipe.o.gutierrez*
>> *--* *https://felipeogutierrez.blogspot.com
>> <https://felipeogutierrez.blogspot.com>*
>>
>


Re: Looking for relevant sources related to connecting Apache Flink and Edgent.

2018-11-29 Thread Kostas Kloudas
Hi Felipe,

This seems related to your previous question about a custom scheduler that
knows which task to run on which machine.
As Chesnay said, this is a rather involved and laborious task, if you want
to do it as a general framework.

But if you know what operation to push down, then why not decouple the
two and implement the filtering as a separate job
running on your Raspberry Pi, and a new job which consumes the output of the
first and does the analytics?

Cheers,
Kostas

On Thu, Nov 29, 2018 at 10:23 AM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> Hi,
>
> I am trying to design a little prototype with Flink and Apache Edgent (
> http://edgent.apache.org/) and I would like some help on the direction
> for it. I am running Flink at my laptop and Edgent on my Raspberry Pi with
> a simple filter for a proximity sensor (
> https://github.com/felipegutierrez/explore-rpi/blob/master/src/main/java/org/sense/edgent/app/UltrasonicEdgentApp.java
> ).
>
> My idea is to push down the filter operator from Flink to the Raspberry Pi
> which is running Apache Edgent. With this in mind, where do you guys advise
> me to start?
>
> I have some ideas to study...
> 1 - Try to get the list of operators that Flink is about to execute on the
> JobManager. source:
> https://ci.apache.org/projects/flink/flink-docs-stable/internals/job_scheduling.html
> 2 - Implement a connector to Apache Edgent in order to exchange messages
> between Flink and Edgent.
>
> Do you guys think in another source that is interesting regarding my
> prototype?
>
> Thanks,
> Felipe
> *--*
> *-- Felipe Gutierrez*
>
> *-- skype: felipe.o.gutierrez*
> *--* *https://felipeogutierrez.blogspot.com
> *
>


Re: [DISCUSS] FLIP-27: Refactor Source Interface

2018-11-27 Thread Kostas Kloudas
Hi Biao,

Thanks for the answer!

So given the multi-threaded readers, now we have as open questions:

1) How do we let the checkpoints pass through our multi-threaded reader
operator?

2) Do we have separate reader and source operators or not? In the strategy
that has a separate source, the source operator has a parallelism of 1 and
is responsible for split recovery only.

For the first one, given also the constraints (blocking, finite queues,
etc), I do not have an answer yet.

For the 2nd, I think that we should go with separate operators for the
source and the readers, for the following reasons:

1) This is more aligned with a potential future improvement where the split
discovery becomes a responsibility of the JobManager and readers are
pooling more work from the JM.

2) The source is going to be the "single point of truth". It will know what
has been processed and what not. If the source and the readers are a single
operator with parallelism > 1, or in general, if the split discovery is
done by each task individually, then:
   i) we have to have a deterministic scheme for each reader to assign
splits to itself (e.g. mod subtaskId). This is not necessarily trivial for
all sources.
   ii) each reader would have to keep a copy of all its processed splits
   iii) the state has to be a union state with a non-trivial merging logic
in order to support rescaling.

Two additional points that you raised above:

i) The point that you raised that we need to keep all splits (processed and
not-processed) I think is a bit of a strong requirement. This would imply
that for infinite sources the state will grow indefinitely. This problem
is even more pronounced if we do not have a single source that assigns
splits to readers, as each reader will have its own copy of the state.

ii) it is true that for finite sources we need to somehow not close the
readers when the source/split discoverer finishes. The
ContinuousFileReaderOperator has a work-around for that. It is not elegant,
and checkpoints are not emitted after closing the source, but this, I
believe, is a bigger problem which requires more changes than just
refactoring the source interface.

Cheers,
Kostas


Re: [DISCUSS] FLIP-27: Refactor Source Interface

2018-11-26 Thread Kostas Kloudas
Hi all,

From the discussion, I understand that we are leaning towards a design
where the user writes a single-threaded SplitReader, which Flink executes
on another thread (not the main task thread). This way the task can have
multiple readers running concurrently, each one reading a different split.

Each of these threads writes in its own queue. These queues are then polled
by the main thread (based on a potentially user-defined prioritization),
which is responsible for emitting data downstream. There were also
proposals for a single shared queue, but I believe that 1) the contention
for the lock in such a queue can be a limitation and 2) it is not easy to
prioritise which elements to consume first (assuming that we want to
support different prioritisation strategies).
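
A minimal, purely illustrative sketch of this hand-over (all names are made up and
this is not a proposal for the actual interfaces):

import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

public class SplitReaderThreadingSketch {

    /** Each split gets its own reader thread and its own bounded hand-over queue. */
    public static <T> BlockingQueue<T> startReader(Iterable<T> split) {
        final BlockingQueue<T> queue = new ArrayBlockingQueue<>(1024);
        Thread reader = new Thread(() -> {
            try {
                for (T record : split) {
                    queue.put(record); // blocks when the queue is full -> backpressure
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        reader.setDaemon(true);
        reader.start();
        return queue;
    }

    /** Runs on the main task thread: poll the per-split queues and emit downstream.
     *  A user-defined prioritization would decide which queue to poll first
     *  (busy-polling is kept here only for brevity). */
    public static <T> void emitLoop(List<BlockingQueue<T>> queues, Consumer<T> emit) {
        while (true) {
            for (BlockingQueue<T> queue : queues) {
                T record = queue.poll();
                if (record != null) {
                    emit.accept(record);
                }
            }
        }
    }
}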

Assuming the above model, I have the following question:

We have the split/shard/partition discovery logic outside the "reader"
operator. For now it can be a plain old source function with parallelism of
1 that periodically checks for new splits (for an example see the existing
ContinuousFileMonitoringFunction).[1]

This source sends the split to be read downstream to the multi-threaded
readers. In these settings, there must be a "throttling" or
"rate-limitting" mechanism that guaranttees that we do not surpass the
capabilities of the machines. The first thing that comes to mind is some
kind of a fixed size (blocking) queue or a fixed size thread pool. The main
thread adds splits to the queue and the readers consume them. When the
queue or the pool is full, then we block (backpressure).

In the case above, how do we make sure that the checkpoints still go
through?

Cheers,
Kostas

PS: I am assuming the current task implementation and not an "actor" based
one.

*[1] The ContinuousFileReaderOperator has a single thread (different from
the main task thread) consuming the splits one by one. Unfortunately, there
is no rate-limiting mechanism.


On Sun, Nov 25, 2018 at 6:40 PM Biao Liu  wrote:

> Hi community,
> Glad to see this topic is still so active.
>
> Thanks for replying @Piotrek and @Becket.
>
> Last time, I expressed some rough ideas about the thread model. However I
> found that it's hard to describe clearly in mailing list. So I wrote it
> down with some graphs, exampled some kinds of models, see Thread Model of
> Source
> <
> https://docs.google.com/document/d/1XpYkkJo97CUw-UMVrKU6b0ZZuJJ2V7mBb__L6UzdWTw/edit?usp=sharing
> >.
> I wish that can be helpful.
>
> IMO thread model is an important part. Without thinking of implementation
> clearly, it's difficult to decide what the up level interface should look
> like.
> It would be better if we draw the whole picture first and then fill the
> detail parts one by one.
>
> @Piotrek About adding new splits to existing split reader. It's an
> interesting idea. Not only for solving too many threads problem, but also
> for supporting some more complicated system. I know in some storage
> systems, there is some scenario which the partition is dynamic(dynamically
> splitting or merging). Though I have not think of it very clearly now. I
> would give you more detailed reply asap :)
>
>
> Guowei Ma  于2018年11月23日周五 下午6:37写道:
>
> > Hi,Piotr
> > Sorry  for so late to response.
> >
> >
> > First of all I think Flink runtime can assigned a thread for a
> StreamTask,
> > which likes  'Actor' model. The number of threads for a StreamTask should
> > not be proportional to the operator or other things. This will give Flink
> > the ability to scale horizontally. So I think it's not just the
> > network(flush),checkpoint and  source, but some operators' threads can
> also
> > be removed in the future, like AsyncWaitOperator.
> >
> >
> >
> > for b)
> > When using event time, some sources want to assign a timestamp to each
> > element. In current Flink interface, user will write like this
> > public class EventTimeSource implements SourceFunction {
> >   public void run() {
> >  while(...){
> >  Element record = // get from file or some queue;
> >  long timestamp = parseTimestampFromElement(record);
> >  sourceContext.collectWithTimestamp(record, timestamp);
> >  }
> >   }
> > }
> > Using the interfaces from this FLIP, user can write like this
> >
> > public EventTimeSplitReader implements SplitReader {
> > Element currentRecord = null;
> >
> >
> > // Please ignoring the handling of boundary conditions
> > public boolean advace(){
> >currentRecord = //move a pointer forward
> >return true;
> >  }
> >
> > public Element getCurrent(){
> >return currentRecord;
> > }
> > public long getCurrentTimestamp() {
> >   return parseTimestampFromElement(currentRecord);
> > }
> > }
> >
> > if merging the advance/getNext to a method like getNext() , the
> SplitReader
> > interface may need to change a little like this
> >
> > public interface SplitReader2 {
> > public class ElementWithTimestamp {
> > T element;
> > long 

[jira] [Created] (FLINK-10963) Cleanup small objects uploaded to S3 as independent objects

2018-11-21 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-10963:
--

 Summary: Cleanup small objects uploaded to S3 as independent 
objects
 Key: FLINK-10963
 URL: https://issues.apache.org/jira/browse/FLINK-10963
 Project: Flink
  Issue Type: Sub-task
  Components: filesystem-connector
Affects Versions: 1.7.0
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas
 Fix For: 1.7.1


The S3 {{RecoverableWriter}} uses the Multipart Upload (MPU) feature of S3 in 
order to upload the different part files. This means that a large part file is split 
into chunks of at least 5MB which are uploaded independently, whenever each one 
of them is ready.

This 5MB minimum size requires special handling of parts that are less than 5MB 
when a checkpoint barrier arrives. These small files are uploaded as 
independent objects (not associated with an active MPU). This way, when Flink 
needs to restore, it simply downloads them and resumes writing to them.

These small objects are currently not cleaned up, thus leading to wasted space 
on S3.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10803) Add documentation about S3 support by the StreamingFileSink

2018-11-06 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-10803:
--

 Summary: Add documentation about S3 support by the 
StreamingFileSink
 Key: FLINK-10803
 URL: https://issues.apache.org/jira/browse/FLINK-10803
 Project: Flink
  Issue Type: Bug
  Components: filesystem-connector
Affects Versions: 1.7.0
Reporter: Kostas Kloudas
 Fix For: 1.7.0






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10739) Unstable ProcessFailureCancelingITCase.testCancelingOnProcessFailure

2018-10-31 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-10739:
--

 Summary: Unstable 
ProcessFailureCancelingITCase.testCancelingOnProcessFailure
 Key: FLINK-10739
 URL: https://issues.apache.org/jira/browse/FLINK-10739
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.6.2
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas
 Fix For: 1.7.0


An instance can be found here https://api.travis-ci.org/v3/job/448844674/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10733) Misleading clean_log_files() in common.sh

2018-10-31 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-10733:
--

 Summary: Misleading clean_log_files() in common.sh
 Key: FLINK-10733
 URL: https://issues.apache.org/jira/browse/FLINK-10733
 Project: Flink
  Issue Type: Bug
  Components: E2E Tests
Affects Versions: 1.6.2
Reporter: Kostas Kloudas


In the `common.sh` base script of the end-to-end tests, there is a 
`clean_stdout_files` which cleans only the `*.out` files and a 
`clean_log_files` which cleans *both* `*.log` and `*.out` files.

Given the current behavior that, at the end of a test, the logs are checked and, 
if there are exceptions (even expected ones that are not whitelisted), the test 
fails, some tests choose to call `clean_log_files` so that such exceptions are 
ignored. In this case, the `*.out` files are also cleaned, so if a test was checking 
for errors in the `.out` files, then the test will falsely pass.

The solution is as simple as renaming the method to something more descriptive, 
like `clean_logs_and_output_files`, but doing so also includes checking whether any 
existing tests were falsely passing.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: FLINK-9752 (s3 recoverable writer) not actually fixed in 1.6.2?

2018-10-31 Thread Kostas Kloudas
Hi Addison,

This is definitely an error on my end, as the feature is going to be available 
from Flink 1.7 onwards.
I forgot to correctly update the Flink version when closing the JIRA issue.

I will update the release notes accordingly.

Sorry for the miscommunication,
Kostas


> On Oct 31, 2018, at 5:45 AM, Addison Higham  wrote:
> 
> I have backported this at 
> https://github.com/instructure/flink/tree/s3_recover_backport 
>  by 
> cherry-picking all the relevant code, I am not sure how backports are usually 
> done with Flink (if you squash and merge) but there were a few minor 
> conflicts and involved quite a few changes from master. 
> 
> Going to try with my branch tomorrow and will report any issues.
> 
> 
> 
> On Tue, Oct 30, 2018 at 8:44 PM Mike Mintz  > wrote:
> FWIW I also tried this on Flink 1.6.2 today and got the same error. This is
> my full stack trace:
> 
> java.lang.UnsupportedOperationException: Recoverable writers on Hadoop
> are only supported for HDFS and for Hadoop version 2.7 or newer
> at 
> org.apache.flink.fs.s3hadoop.shaded.org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.(HadoopRecoverableWriter.java:57)
> at 
> org.apache.flink.fs.s3hadoop.shaded.org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:202)
> at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.(Buckets.java:111)
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:242)
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327)
> at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
> at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
> at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:277)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:748)
> 
> 
> On Tue, Oct 30, 2018 at 4:12 PM Till Rohrmann  > wrote:
> 
> > Hi Addison,
> >
> > I think the idea was to also backport this feature to 1.6 since we
> > considered it a bug that S3 was not supported in 1.6. I've pulled in Kostas
> > who worked on the S3 writer. @Klou did we intentionally not backport this
> > feature?
> >
> > I think there should be nothing special about backporting this feature and
> > building your own version of Flink.
> >
> > Cheers,
> > Till
> >
> > On Tue, Oct 30, 2018 at 10:54 PM Addison Higham  > >
> > wrote:
> >
> > > Hi all,
> > >
> > > Been hitting my head against a wall for the last few hours. The release
> > > notes for 1.6.2 show https://issues.apache.org/jira/browse/FLINK-9752 
> > >  as
> > > resolved in 1.6.2. I am trying to upgrade and switch some things to use
> > the
> > > StreamingFileSink against s3. However, when doing so, I get the following
> > > error:
> > >
> > > Recoverable writers on Hadoop are only supported for HDFS and for
> > > Hadoop version 2.7 or newer
> > >
> > >
> > > I started digging into the code and looking deeper, I don't see the code
> > at
> > > https://github.com/apache/flink/pull/6795 
> > >  as being backported to 1.6.2?
> > >
> > > Was the Fix Version erroneous? If so, are there plans to backport it? If
> > > not, seems like that should be fixed in the release notes.
> > >
> > > I have been waiting for that functionality and we already build our own
> > > flink, so I am tempted to backport it onto 1.6.2... anything tricky about
> > > that?
> > >
> > > Thanks!
> > >
> >



[jira] [Created] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-10-17 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-10583:
--

 Summary: Add support for state retention to the Processing Time 
versioned joins.
 Key: FLINK-10583
 URL: https://issues.apache.org/jira/browse/FLINK-10583
 Project: Flink
  Issue Type: Sub-task
  Components: Table API  SQL
Affects Versions: 1.7.0
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas
 Fix For: 1.7.0






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10584) Add support for state retention to the Event Time versioned joins.

2018-10-17 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-10584:
--

 Summary: Add support for state retention to the Event Time 
versioned joins.
 Key: FLINK-10584
 URL: https://issues.apache.org/jira/browse/FLINK-10584
 Project: Flink
  Issue Type: Sub-task
  Components: Table API  SQL
Affects Versions: 1.7.0
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas
 Fix For: 1.7.0






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10530) ProcessFailureCancelingITCase.testCancelingOnProcessFailure failed on Travis.

2018-10-11 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-10530:
--

 Summary: 
ProcessFailureCancelingITCase.testCancelingOnProcessFailure failed on Travis.
 Key: FLINK-10530
 URL: https://issues.apache.org/jira/browse/FLINK-10530
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.7.0
Reporter: Kostas Kloudas
 Fix For: 1.7.0


The logs from Travis: https://api.travis-ci.org/v3/job/440109944/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10529) Add flink-s3-fs-base to the connectors in the travis stage file.

2018-10-11 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-10529:
--

 Summary: Add flink-s3-fs-base to the connectors in the travis 
stage file.
 Key: FLINK-10529
 URL: https://issues.apache.org/jira/browse/FLINK-10529
 Project: Flink
  Issue Type: Bug
  Components: Build System
Affects Versions: 1.7.0
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas
 Fix For: 1.7.0






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10522) Check if RecoverableWriter supportsResume and act accordingly.

2018-10-10 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-10522:
--

 Summary: Check if RecoverableWriter supportsResume and act accordingly.
 Key: FLINK-10522
 URL: https://issues.apache.org/jira/browse/FLINK-10522
 Project: Flink
  Issue Type: Sub-task
  Components: filesystem-connector
Affects Versions: 1.6.0
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas
 Fix For: 1.7.0


So far we assumed that all `RecoverableWriters` support "resuming", i.e. after 
recovering from a failure or from a savepoint they can keep writing to the 
previously "in-progress" file. This assumption holds for all current writers, 
but in order to also accommodate filesystems that may not support 
this operation, we should check upon initialization whether the writer supports 
resuming. If yes, we proceed as before; if not, we recover for commit and commit 
the previously in-progress file.
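
A rough sketch of the described check (the enclosing restore logic is omitted and the
class/method names are made up, while supportsResume(), recover() and recoverForCommit()
belong to Flink's existing RecoverableWriter API):

import java.io.IOException;

import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable;

final class ResumeOrCommitSketch {

    /** Returns the resumed stream, or null if the previous file was committed instead. */
    static RecoverableFsDataOutputStream restore(
            RecoverableWriter writer, ResumeRecoverable resumable) throws IOException {

        if (writer.supportsResume()) {
            // Keep appending to the previously in-progress part file, as before.
            return writer.recover(resumable);
        } else {
            // The filesystem cannot resume: commit the previously in-progress file
            // as it is; the caller then has to open a fresh part file.
            writer.recoverForCommit(resumable).commitAfterRecovery();
            return null;
        }
    }
}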



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] Dropping flink-storm?

2018-09-29 Thread Kostas Kloudas
+1 to drop it as nobody seems to be willing to maintain it and it also 
stands in the way for future developments in Flink.

Cheers,
Kostas

> On Sep 29, 2018, at 8:19 AM, Tzu-Li Chen  wrote:
> 
> +1 to drop it.
> 
> It seems few people use it. Commits history of an experimental
> module sparse often means that there is low interest.
> 
> Best,
> tison.
> 
> 
> 远远  于2018年9月29日周六 下午2:16写道:
> 
>> +1, it‘s time to drop it
>> 
>> Zhijiang(wangzhijiang999)  于2018年9月29日周六
>> 下午1:53写道:
>> 
>>> Very agree with to drop it. +1
>>> 
>>> --
>>> 发件人:Jeff Carter 
>>> 发送时间:2018年9月29日(星期六) 10:18
>>> 收件人:dev 
>>> 抄 送:chesnay ; Till Rohrmann ;
>>> user 
>>> 主 题:Re: [DISCUSS] Dropping flink-storm?
>>> 
>>> +1 to drop it.
>>> 
>>> On Fri, Sep 28, 2018, 7:25 PM Hequn Cheng  wrote:
>>> 
 Hi,
 
 +1 to drop it. It seems that few people use it.
 
 Best, Hequn
 
 On Fri, Sep 28, 2018 at 10:22 PM Chesnay Schepler 
 wrote:
 
> I'm very much in favor of dropping it.
> 
> Flink has been continually growing in terms of features, and IMO we've
> reached the point where we should cull some of the more obscure ones.
>>> 
> flink-storm, while interesting from a theoretical standpoint, offers too
> little value.
> 
>>> 
> Note that the bolt/spout wrapper parts of the part are still compatible,
> it's only topologies that aren't working.
> 
> IMO compatibility layers only add value if they ease the migration to
> Flink APIs.
>>> 
> * bolt/spout wrappers do this, but they will continue to work even if we
> drop it
> * topologies don't do this, so I'm not interested in then.
> 
> On 28.09.2018 15:22, Till Rohrmann wrote:
>> Hi everyone,
>> 
>> I would like to discuss how to proceed with Flink's storm
>> compatibility layer flink-strom.
>> 
>> While working on removing Flink's legacy mode, I noticed that some
>>> 
>> parts of flink-storm rely on the legacy Flink client. In fact, at the
>>> 
>> moment flink-storm does not work together with Flink's new distributed
>> architecture.
>> 
>> I'm also wondering how many people are actually using Flink's Storm
>> compatibility layer and whether it would be worth porting it.
>> 
>> I see two options how to proceed:
>> 
>> 1) Commit to maintain flink-storm and port it to Flink's new
 architecture
>> 2) Drop flink-storm
>> 
>>> 
>> I doubt that we can contribute it to Apache Bahir [1], because once we
>>> 
>> remove the legacy mode, this module will no longer work with all newer
>> Flink versions.
>> 
>>> 
>> Therefore, I would like to hear your opinion on this and in particular
>> if you are using or planning to use flink-storm in the future.
>> 
>> [1] https://github.com/apache/bahir-flink
>> 
>> Cheers,
>> Till
> 
> 
> 
 
>>> 
>>> 
>>> 



Re: Codespeed deployment for Flink

2018-09-21 Thread Kostas Kloudas
Thanks for this contribution Piotr and Nico. 

Tools like this are really useful for Flink’s success.

Cheers,
Kostas

> On Sep 21, 2018, at 4:59 PM, Piotr Nowojski  wrote:
> 
> Hello community,
> 
> For almost a year in data Artisans Nico and I were maintaining a setup
> that continuously evaluates Flink with benchmarks defined at
> https://github.com/dataArtisans/flink-benchmarks 
> . With growing interest
> and after proving useful a couple of times, we have finally decided to
> publish the web UI layer of this setup. Currently it is accessible via
> the following (maybe not so?) temporarily url:
> 
> http://codespeed.dak8s.net:8000 
> 
> This is a simple web UI to present performance changes over past and
> present commits to Apache Flink. It only has a couple of views and the
> most useful ones are:
> 
> 1. Timeline
> 2. Comparison (I recommend to use normalization)
> 
> Timeline is useful for spotting unintended regressions or unexpected
> improvements. It is being updated every six hours.
> Comparison is useful for comparing a given branch (for example a pending
> PR) with the master branch. More about that later.
> 
> The codespeed project on it’s own is just a presentation layer. As
> mentioned before, the only currently available benchmarks are defined in
> the flink-benchmarks repository and they are executed periodically or on
> demand by Jenkins on a single bare metal machine. The current setup
> limits us only to micro benchmarks (they are easier to
> setup/develop/maintain and have a quicker feedback loop compared to
> cluster benchmarks) but there is no reason preventing us from setting up 
> other kinds of benchmarks and upload their results to our codespeed 
> instance as well.
> 
> Regarding the comparison view. Currently data Artisans’ Flink mirror
> repository at https://github.com/dataArtisans/flink 
>  is configured to
> trigger benchmark runs on every commit/change that happens on the
> benchmark-request branch (We chose to use dataArtisans' repository here
> because we needed a custom GitHub hook that we couldn’t add to the
> apache/flink repository). Benchmarking usually takes between one and two
> hours. One obvious limitation at the moment is that there is only one
> comparison view, with one comparison branch, so trying to compare two
> PRs at the same time is impossible. However we can tackle
> this problem once it will become a real issue, not only a theoretical one.
> 
> Piotrek & Nico



Re: Watermark alignment during unit tests

2018-09-18 Thread Kostas Kloudas
Hi Eugen,

It is true that for IT cases this can be difficult and this should be improved 
in Flink’s testing infrastructure,
but for this specific PR, what you need to check is whether the allowedLateness 
parameter is propagated correctly
throughout the translation process. The window operator with allowed lateness 
(which is applied next)
is covered by other tests.

In this case I would recommend joining the streams “manually”, i.e.:

input1.coGroup(input2)
  .where(keySelector1)
  .equalTo(keySelector2)
  .window(windowAssigner)

and then from the resulting WithWindow, just try to get the allowed lateness 
and verify that this is the 
value that you provided.

This will cover the propagation and make sure that nobody breaks it in the 
future.

Cheers,
Kostas


> On Sep 18, 2018, at 11:40 AM, Евгений Юшин  wrote:
> 
> Hi devs
> 
> During the work on https://issues.apache.org/jira/browse/FLINK-10050 I've
> found unstable behaviour of unit tests for unioned streams (which are used
> in CoGroupedStream/JoinedStream under the hood).
> Let's assume we have late elements in one of the stream. The thing is we
> have no guarantees which source will be read first, and in which order
> watermark alignment will be applied. So, the following example produce
> different results for different invocation:
> 
> val s1 = env.addSource(new SourceFunction[(String, String)] {
>override def run(ctx: SourceFunction.SourceContext[(String,
> String)]): Unit = {
>  ctx.collectWithTimestamp(("a", "a1"), 1)
>  //wmAllignmentLock.synchronized {
>  //wmAllignmentLock.wait()
>  //}
>  ctx.emitWatermark(new Watermark(4))
>  ctx.collectWithTimestamp(("a", "a2"), 2)
>}
> 
>override def cancel(): Unit = {}
>  })
> 
>  val s2 = env.addSource(new SourceFunction[(String, String)] {
>override def run(ctx: SourceFunction.SourceContext[(String,
> String)]): Unit = {
>  ctx.collectWithTimestamp(("a", "b1"), 1)
>  ctx.emitWatermark(new Watermark(4))
>  //wmAllignmentLock.synchronized {
>  //wmAllignmentLock.notifyAll()
>  //}
>}
> 
>override def cancel(): Unit = {}
>  })
> 
>  val joined = s1.join(s2).where(_._1).equalTo(_._1)
>.window(TumblingEventTimeWindows.of(Time.milliseconds(3)))
>.apply((x, y) => s"$x:$y")
> For some invocations (when Flink decide to process 2nd source before
> 1st), ("a", "a2") is considered to be late and dropped; and vice
> versa.Here is the rate for 1000 invocations:
> Run JOIN periodic
> iteration [50] contains late total = 22, this iter = 22
> iteration [100] contains late total = 51, this iter = 29
> iteration [150] contains late total = 78, this iter = 27
> iteration [200] contains late total = 101, this iter = 23
> iteration [250] contains late total = 124, this iter = 23
> iteration [300] contains late total = 155, this iter = 31
> iteration [350] contains late total = 184, this iter = 29
> iteration [400] contains late total = 210, this iter = 26
> iteration [450] contains late total = 233, this iter = 23
> iteration [500] contains late total = 256, this iter = 23
> iteration [550] contains late total = 274, this iter = 18
> iteration [600] contains late total = 303, this iter = 29
> iteration [650] contains late total = 338, this iter = 35
> iteration [700] contains late total = 367, this iter = 29
> iteration [750] contains late total = 393, this iter = 26
> iteration [800] contains late total = 415, this iter = 22
> iteration [850] contains late total = 439, this iter = 24
> iteration [900] contains late total = 459, this iter = 20
> iteration [950] contains late total = 484, this iter = 25
> iteration [1000] contains late total = 502, this iter = 18
> contains late = 502
> 
> 
> It doesn't matter Periodic or Punctuated watermark assigner is used.
> As well as syncronization mechanism (commented in the code snippet
> above) doesn't help to align records in particular order.
> 
> While this behaviour is totally fine for Production case, I just
> wonder how to write stable unit test scenario to cover late elements
> processing.
> I didn't find any suitable test harness from utils.
> 
> Any feedback is appreciated!
> 
> Regards,
> Eugen



Re: [ANNOUNCE] New committer Gary Yao

2018-09-07 Thread Kostas Kloudas
Congratulations Gary! Well deserved!

Cheers,
Kostas

> On Sep 7, 2018, at 4:43 PM, Fabian Hueske  wrote:
> 
> Congratulations Gary!
> 
> 2018-09-07 16:29 GMT+02:00 Thomas Weise :
> 
>> Congrats, Gary!
>> 
>> On Fri, Sep 7, 2018 at 4:17 PM Dawid Wysakowicz 
>> wrote:
>> 
>>> Congratulations Gary! Well deserved!
>>> 
>>> On 07/09/18 16:00, zhangmingleihe wrote:
 Congrats Gary!
 
 Cheers
 Minglei
 
> 在 2018年9月7日,下午9:59,Andrey Zagrebin  写道:
> 
> Congratulations Gary!
> 
>> On 7 Sep 2018, at 15:45, Stefan Richter >> 
>>> wrote:
>> 
>> Congrats Gary!
>> 
>>> Am 07.09.2018 um 15:14 schrieb Till Rohrmann >> :
>>> 
>>> Hi everybody,
>>> 
>>> On behalf of the PMC I am delighted to announce Gary Yao as a new
>>> Flink
>>> committer!
>>> 
>>> Gary started contributing to the project in June 2017. He helped
>> with
>>> the
>>> Flip-6 implementation, implemented many of the new REST handlers,
>>> fixed
>>> Mesos issues and initiated the Jepsen-based distributed test suite
>>> which
>>> uncovered several serious issues. Moreover, he actively helps
>>> community
>>> members on the mailing list and with PR reviews.
>>> 
>>> Please join me in congratulating Gary for becoming a Flink
>> committer!
>>> 
>>> Cheers,
>>> Till
 
>>> 
>>> 
>> 



Re: Support Hadoop 2.6 for StreamingFileSink

2018-08-21 Thread Kostas Kloudas
Hi Artsem,

Till is correct in that getting rid of the “valid-length” file was a design decision 
for the new StreamingFileSink from the beginning. The motivation was that 
users were reporting that it was essentially very cumbersome to use.

In general, when the BucketingSink gets deprecated, I could see a benefit in 
having a 
legacy recoverable stream just in case you are obliged to use an older HDFS 
version. 
But, at least for now, this would be useful only for row-wise encoders, and NOT 
for 
bulk-encoders like Parquet.

The reason is that for now, when using bulk encoders you roll on every 
checkpoint.
This implies that you do not need truncate, or the valid length file. Given 
this, 
you may only need to write a Recoverable stream that just does not truncate.

Would you like to try it out and see if it works for your usecase?

Cheers,
Kostas

> On Aug 21, 2018, at 1:58 PM, Artsem Semianenka  wrote:
> 
> Thanks for reply, Till !
> 
> Buy the way, If Flink going to support compatibility with Hadoop 2.6 I don't 
> see another way how to achieve it. 
> As I mention before one of popular distributive Cloudera still based on 
> Hadoop 2.6 and it very sad if Flink unsupport it.
> I really want to help Flink comunity to support this legacy. But currently I 
> see only one way to acheve it by emulate 'truncate' logic and recreate new 
> file with needed lenght and replace old .
> 
> Cheers,
> Artsem
> 
> On Tue, 21 Aug 2018 at 14:41, Till Rohrmann  > wrote:
> Hi Artsem,
> 
> if I recall correctly, then we explicitly decided to not support the valid
> file length files with the new StreamingFileSink because they are really
> hard to handle for the user. I've pulled Klou into this conversation who is
> more knowledgeable and can give you a bit more advice.
> 
> Cheers,
> Till
> 
> On Mon, Aug 20, 2018 at 2:53 PM Artsem Semianenka  >
> wrote:
> 
> > I have an idea to create new version of HadoopRecoverableFsDataOutputStream
> > class (for example with name LegacyHadoopRecoverableFsDataOutputStream :) )
> > which will works with valid-length files without invoking truncate. And
> > modify check in HadoopRecoverableWriter to use
> > LegacyHadoopRecoverableFsDataOutputStream in case if Hadoop version is
> > lower then 2.7 . I will try to provide PR soon if no objections. I hope I
> > am on the right way.
> >
> > On Mon, 20 Aug 2018 at 14:40, Artsem Semianenka  > >
> > wrote:
> >
> > > Hi guys !
> > > I have a question regarding new StreamingFileSink (introduced in 1.6
> > > version) . We use this sink to write data into Parquet format. But I
> > faced
> > > with issue when trying to run job on Yarn cluster and save result to
> > HDFS.
> > > In our case we use latest Cloudera distributive (CHD 5.15) and it
> > contains
> > > HDFS 2.6.0  . This version is not support truncate method . I would like
> > to
> > > create Pull request but I want to ask your advice how better design this
> > > fix and which ideas are behind this decision . I saw similiar PR for
> > > BucketingSink https://github.com/apache/flink/pull/6108 
> > >  . Maybe I could
> > > also add support of valid-length files for older Hadoop versions ?
> > >
> > > P.S.Unfortently CHD 5.15 (with Hadoop 2.6) is the latest version of
> > > Cloudera distributive and we can't upgrade hadoop to 2.7 Hadoop .
> > >
> > > Best regards,
> > > Artsem
> > >
> >
> >
> > --
> >
> > С уважением,
> > Артем Семененко
> >
> 
> 
> -- 
> С уважением,
> Артем Семененко
> 



[jira] [Created] (FLINK-10097) More tests to increase StreamingFileSink test coverage

2018-08-07 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-10097:
--

 Summary: More tests to increase StreamingFileSink test coverage
 Key: FLINK-10097
 URL: https://issues.apache.org/jira/browse/FLINK-10097
 Project: Flink
  Issue Type: Sub-task
  Components: filesystem-connector
Affects Versions: 1.6.0
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas
 Fix For: 1.6.1






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10029) Refactor the code for better separation of concerns.

2018-08-02 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-10029:
--

 Summary: Refactor the code for better separation of concerns.
 Key: FLINK-10029
 URL: https://issues.apache.org/jira/browse/FLINK-10029
 Project: Flink
  Issue Type: Sub-task
  Components: filesystem-connector
Affects Versions: 1.6.0
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas
 Fix For: 1.6.0






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10027) Add logging to the StreamingFileSink

2018-08-02 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-10027:
--

 Summary: Add logging to the StreamingFileSink
 Key: FLINK-10027
 URL: https://issues.apache.org/jira/browse/FLINK-10027
 Project: Flink
  Issue Type: Sub-task
Reporter: Kostas Kloudas






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9994) IntervalJoinOperator#Context#getTimestamp does not return the Max timestamp.

2018-07-29 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-9994:
-

 Summary: IntervalJoinOperator#Context#getTimestamp does not return 
the Max timestamp.
 Key: FLINK-9994
 URL: https://issues.apache.org/jira/browse/FLINK-9994
 Project: Flink
  Issue Type: Bug
  Components: DataStream API
Affects Versions: 1.6.0
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: Cherry picking FLINK-9753 and FLINK-9903 on release-1.6

2018-07-23 Thread Kostas Kloudas
Thanks for the +1’s.

I added the relevant commits to the release-1.6 branch.

Cheers,
Kostas

> On Jul 22, 2018, at 3:05 PM, Till Rohrmann  wrote:
> 
> I would be in favor of including FLINK-9903 and FLINK-9753 in Flink 1.6
> because they add more value for the user at a comparable small risk of
> breaking the system since it is a connector. So +1 for cherry picking these
> two commits.
> 
> Cheers,
> Till
> 
> On Sat, Jul 21, 2018 at 6:16 PM Tony Wei  wrote:
> 
>> Hi Kostas,
>> 
>> Nice to see these features merged to the release-1.6 branch. Supporting
>> Parque format in StreamingFileSink
>> really makes me eager to apply it to some use cases in our company, but I'm
>> still wondering if the current
>> StreamingFileSink works well on S3 or I should wait util [FLINK-9752] done.
>> 
>> Anyway, I would still like to see this happen, because it let me have more
>> alternative to upload data with parquet
>> format. Thanks for your great efforts on this.
>> 
>> Best Regards,
>> Tony Wei
>> 
>> https://issues.apache.org/jira/browse/FLINK-9752
>> 
>> 2018-07-21 4:24 GMT+08:00 Kostas Kloudas :
>> 
>>> Hi all,
>>> 
>>> We just merged on the master [FLINK-9753] (commit
>>> 66b1f854a0250bdd048808d40f93aa2990476841)
>>> and [FLINK-9903] (commit b56c75ca375049b1d2c80d2d0945ae1ae04eb39e).
>>> 
>>> These two commits introduce:
>>> 1) a big refactoring to the new StreamingFileSink which allows it to
>>> support bulk formats in general (FLINK-9753) and more expressive rolling
>>> and bucketing strategies, and
>>> 2) support for Parque, based on the previous refactoring.
>>> 
>>> Both these features were pretty popular in the mailing list.
>>> 
>>> Given that these 2 features are self contained, as they mainly affect a
>>> connector,  we would like to cherry pick these commits to the release-1.6
>>> branch. But for this, we would like to have the OK from the community.
>>> 
>>> Please let me know what you think,
>>> Kostas
>>> 
>>> https://issues.apache.org/jira/browse/FLINK-9903 <
>>> https://issues.apache.org/jira/browse/FLINK-9903>
>>> https://issues.apache.org/jira/browse/FLINK-9753 <
>>> https://issues.apache.org/jira/browse/FLINK-9753>
>> 



[jira] [Created] (FLINK-9921) Update the rolling policy interface.

2018-07-23 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-9921:
-

 Summary: Update the rolling policy interface.
 Key: FLINK-9921
 URL: https://issues.apache.org/jira/browse/FLINK-9921
 Project: Flink
  Issue Type: Sub-task
  Components: filesystem-connector
Affects Versions: 1.6.0
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas
 Fix For: 1.6.0






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Cherry picking FLINK-9753 and FLINK-9903 on release-1.6

2018-07-20 Thread Kostas Kloudas
Hi all,

We just merged on the master [FLINK-9753] (commit 
66b1f854a0250bdd048808d40f93aa2990476841)
and [FLINK-9903] (commit b56c75ca375049b1d2c80d2d0945ae1ae04eb39e). 

These two commits introduce:
1) a big refactoring to the new StreamingFileSink which allows it to support 
bulk formats in general (FLINK-9753) and more expressive rolling and bucketing 
strategies, and 
2) support for Parquet, based on the previous refactoring.

Both these features were pretty popular in the mailing list. 

Given that these 2 features are self contained, as they mainly affect a 
connector,  we would like to cherry pick these commits to the release-1.6 
branch. But for this, we would like to have the OK from the community.

Please let me know what you think,
Kostas

https://issues.apache.org/jira/browse/FLINK-9903 

https://issues.apache.org/jira/browse/FLINK-9753 


[jira] [Created] (FLINK-9903) Add support for bulk writers.

2018-07-20 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-9903:
-

 Summary: Add support for bulk writers.
 Key: FLINK-9903
 URL: https://issues.apache.org/jira/browse/FLINK-9903
 Project: Flink
  Issue Type: Sub-task
Reporter: Kostas Kloudas






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: Adding a part suffix setter to the BucketingSink

2018-07-12 Thread Kostas Kloudas
Hi Lakshmi,

I meant Flink 1.5.1 (not 1.5.3) which was recently released.

Cheers,
Kostas

> On Jul 12, 2018, at 7:34 PM, Lakshmi Gururaja Rao  wrote:
> 
> Hi Kostas,
> 
> Thank you for replying. I am already using the ability to set part suffix.
> I was not aware of this issue - https://issues.apache.org/
> jira/browse/FLINK-9603. Thanks for pointing out, I'll make sure to use the
> 1.5.3 version of the sink.
> 
> Thanks
> Lakshmi
> 
> 
> On Thu, Jul 12, 2018 at 4:55 AM, vino yang  wrote:
> 
>> Hi Kostas, good job!
>> 
>> 2018-07-12 19:40 GMT+08:00 Kostas Kloudas :
>> 
>>> Hi Lakshmi,
>>> 
>>> Since Flink-1.5 you have the ability to set the part suffix.
>>> As you said, you only want the .gzip to be the suffix of the final (or
>>> “completed”) part files, which is exactly what is currently supported.
>>> 
>>> If you want also intermediate files to have this suffix, then you can
>>> always set all the suffixes (in-progress, pending and final) to “.gzip”
>>> but then you have to also set the appropriate preffixes so that Flink can
>>> distinguish completed from non-completed files (filenames
>>> must not collide).
>>> 
>>> Also I would recommend to use the most recent stable version 1.5.3 which
>>> also includes this bug fix:
>>> https://issues.apache.org/jira/browse/FLINK-9603 <
>>> https://issues.apache.org/jira/browse/FLINK-9603>
>>> 
>>> I hope this helps,
>>> Kostas
>>> 
>>> 
>>>> On Apr 5, 2018, at 6:23 PM, Lakshmi Gururaja Rao 
>> wrote:
>>>> 
>>>> I can see two ways of achieving this:
>>>> 
>>>> 1. Setting a suffix* **only*** for the completed part files. I don't
>>>> necessarily think the suffix should be added for the intermediate files
>>> (as
>>>> intermediate files should not really be ready for consumption by a
>>>> downstream process?)
>>>> 2. Be able to override this partPath name creation -
>>>> https://github.com/apache/flink/blob/release-1.4.0/
>>> flink-connectors/flink-connector-filesystem/src/main/
>>> java/org/apache/flink/streaming/connectors/fs/
>>> bucketing/BucketingSink.java#L523
>>>> . That way any user who needs to set a custom/dynamic part file name
>> can
>>>> still do so.
>>>> 
>>>> Do you think either or one of these options is feasible?
>>>> 
>>>> Thanks
>>>> Lakshmi
>>>> 
>>>> On Tue, Apr 3, 2018 at 12:57 AM, Aljoscha Krettek >> 
>>>> wrote:
>>>> 
>>>>> So you want to be able to set a "global" suffix that should be
>> appended
>>> to
>>>>> all different kinds of files that the sink writes, including
>>> intermediate
>>>>> files?
>>>>> 
>>>>> Aljoscha
>>>>> 
>>>>>> On 29. Mar 2018, at 16:59, l...@lyft.com wrote:
>>>>>> 
>>>>>> Sorry, I meant "I don't see a way of doing this apart from setting a
>>>>> part file *suffix* with the required file extension. "
>>>>>> 
>>>>>> 
>>>>>> On 2018/03/29 14:55:43, l...@lyft.com  wrote:
>>>>>>> Currently the BucketingSink allows addition of part prefix, pending
>>>>> prefix/suffix and in-progress prefix/suffix via setter methods. Can we
>>> also
>>>>> support setting part suffixes?
>>>>>>> An instance where this maybe useful: I am currently writing GZIP
>>>>> compressed output to S3 using the BucketingSink and I would want the
>>>>> uploaded files to have a ".gz" or ".zip" extensions (if the files does
>>> not
>>>>> have such an extensionelse they are written as garbled bytes and don't
>>> get
>>>>> rendered correctly for reading). I don't see a way of doing this apart
>>> from
>>>>> setting a part file prefix with the required file extension.
>>>>>>> 
>>>>>>> Thanks
>>>>>>> Lakshmi
>>>>>>> 
>>>>> 
>>>>> 
>>>> 
>>>> 
>>>> --
>>>> *Lakshmi Gururaja Rao*
>>>> SWE
>>>> 217.778.7218 <+12177787218>
>>>> [image: Lyft] <http://www.lyft.com/>
>>> 
>>> 
>> 
> 
> 
> 
> -- 
> *Lakshmi Gururaja Rao*
> SWE
> 217.778.7218 <+12177787218>
> [image: Lyft] <http://www.lyft.com/>



Re: Adding a part suffix setter to the BucketingSink

2018-07-12 Thread Kostas Kloudas
Hi Lakshmi,

Since Flink-1.5 you have the ability to set the part suffix.
As you said, you only want the .gzip to be the suffix of the final (or 
“completed”) part files, which is exactly what is currently supported.

If you want also intermediate files to have this suffix, then you can always 
set all the suffixes (in-progress, pending and final) to “.gzip” 
but then you have to also set the appropriate prefixes so that Flink can 
distinguish completed from non-completed files (filenames 
must not collide).
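
For illustration, a minimal sketch of the setters involved (the bucket path and the 
prefix/suffix values below are placeholders):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

public class GzipSuffixSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> lines = env.fromElements("a", "b", "c"); // stand-in input

        BucketingSink<String> sink = new BucketingSink<>("s3://my-bucket/output"); // placeholder path

        // Completed part files get the .gz extension (possible since Flink 1.5).
        sink.setPartSuffix(".gz");

        // Optionally give intermediate files the same extension too, but keep distinct
        // prefixes so completed and non-completed file names cannot collide.
        sink.setInProgressPrefix("_in-progress.");
        sink.setInProgressSuffix(".gz");
        sink.setPendingPrefix("_pending.");
        sink.setPendingSuffix(".gz");

        lines.addSink(sink);
        env.execute("gzip-suffix-sketch");
    }
}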

Also I would recommend to use the most recent stable version 1.5.3 which also 
includes this bug fix:
https://issues.apache.org/jira/browse/FLINK-9603 


I hope this helps,
Kostas


> On Apr 5, 2018, at 6:23 PM, Lakshmi Gururaja Rao  wrote:
> 
> I can see two ways of achieving this:
> 
> 1. Setting a suffix* **only*** for the completed part files. I don't
> necessarily think the suffix should be added for the intermediate files (as
> intermediate files should not really be ready for consumption by a
> downstream process?)
> 2. Be able to override this partPath name creation -
> https://github.com/apache/flink/blob/release-1.4.0/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L523
> . That way any user who needs to set a custom/dynamic part file name can
> still do so.
> 
> Do you think either or one of these options is feasible?
> 
> Thanks
> Lakshmi
> 
> On Tue, Apr 3, 2018 at 12:57 AM, Aljoscha Krettek 
> wrote:
> 
>> So you want to be able to set a "global" suffix that should be appended to
>> all different kinds of files that the sink writes, including intermediate
>> files?
>> 
>> Aljoscha
>> 
>>> On 29. Mar 2018, at 16:59, l...@lyft.com wrote:
>>> 
>>> Sorry, I meant "I don't see a way of doing this apart from setting a
>> part file *suffix* with the required file extension. "
>>> 
>>> 
>>> On 2018/03/29 14:55:43, l...@lyft.com  wrote:
 Currently the BucketingSink allows addition of part prefix, pending
>> prefix/suffix and in-progress prefix/suffix via setter methods. Can we also
>> support setting part suffixes?
An instance where this may be useful: I am currently writing GZIP
>> compressed output to S3 using the BucketingSink and I would want the
>> uploaded files to have a ".gz" or ".zip" extension (if the files do not
>> have such an extension, else they are written as garbled bytes and don't get
>> rendered correctly for reading). I don't see a way of doing this apart from
>> setting a part file prefix with the required file extension.
 
 Thanks
 Lakshmi
 
>> 
>> 
> 
> 
> -- 
> *Lakshmi Gururaja Rao*
> SWE
> 217.778.7218 <+12177787218>
> [image: Lyft] 



Re: [ANNOUNCE] New committer: Sihua Zhou

2018-06-22 Thread Kostas Kloudas
Congratulations!

On Fri, Jun 22, 2018, 21:33 Shuyi Chen  wrote:

> Congratulations!
>
> On Fri, Jun 22, 2018 at 11:08 AM Matthias J. Sax  wrote:
>
> > Congrats!
> >
> > On 6/22/18 10:33 AM, shimin yang wrote:
> > > Congrats!
> > >
> > > On Sat, Jun 23, 2018 at 1:13 AM Chen Qin 
> > > wrote:
> > >
> > >> Congrats!
> > >>
> > >>> On Jun 22, 2018, at 9:48 AM, Ted Yu 
> > >>> wrote:
> > >>>
> > >>> Congratulations Sihua!
> > >>>
> >  On Fri, Jun 22, 2018 at 6:42 AM, zhangminglei
> >  <18717838...@163.com>
> > >> wrote:
> > 
> >  Congrats! Sihua
> > 
> >  Cheers Minglei.
> > 
> > > 在 2018年6月22日,下午9:17,Till Rohrmann  写
> > > 道:
> > >
> > > Hi everybody,
> > >
> > > On behalf of the PMC I am delighted to announce Sihua Zhou
> > > as a new
> > >> Flink
> > > committer!
> > >
> > > Sihua has been an active member of our community for
> > > several months.
> >  Among
> > > other things, he helped developing Flip-6, improved
> > > Flink's state
> >  backends
> > > and fixed a lot of major and minor issues. Moreover, he is
> > > helping the Flink community reviewing PRs, answering users
> > > on the mailing list and proposing new features.
> > >
> > > Please join me in congratulating Sihua for becoming a
> > > Flink committer!
> > >
> > > Cheers, Till
> > 
> > 
> > 
> > >>
> > >
> --
> "So you have to trust that the dots will somehow connect in your future."
>


Re: [ANNOUNCE] New committer Piotr Nowojski

2018-06-22 Thread Kostas Kloudas
Congratulations!

On Fri, Jun 22, 2018, 21:33 Shuyi Chen  wrote:

> Congratulations!
>
>
> On Fri, Jun 22, 2018 at 12:30 PM Matthias J. Sax  wrote:
>
> > Congrats!
> >
> > On 6/22/18 12:28 PM, Stefan Richter wrote:
> > > Congrats Piotr!
> > >
> > >> Am 22.06.2018 um 21:26 schrieb Till Rohrmann
> > >> :
> > >>
> > >> Hi everybody,
> > >>
> > >> On behalf of the PMC I am delighted to announce Piotr Nowojski as
> > >> a new Flink committer!
> > >>
> > >> Piotr has been an active member of our community for more than a
> > >>  year. Among other things, he contributed the TwoPhaseCommitSink,
> > >>  worked extensively on improving Flink's network stack and is now
> > >>  contributing to stream SQL. He is also helping the community by
> > >>  reviewing PRs, answering questions and driving discussions on
> > >> the mailing list.
> > >>
> > >> Please join me in congratulating Piotr for becoming a Flink
> > >> committer!
> > >>
> > >> Cheers, Till
> > >
> --
> "So you have to trust that the dots will somehow connect in your future."
>


Re: [ANNOUNCE] Two new committers: Xingcan Cui and Nico Kruber

2018-05-10 Thread Kostas Kloudas
Congratulations to both of you guys!

> On May 10, 2018, at 9:41 AM, Amit Jain  wrote:
> 
> Congrats!
> 
> On Thu, May 10, 2018 at 10:10 AM, Xingcan Cui  wrote:
>> Thanks, everyone!
>> 
>> It’s an honor which inspires me to devote more to our community.
>> 
>> Regards,
>> Xingcan
>> 
>>> On May 10, 2018, at 2:06 AM, Peter Huang  wrote:
>>> 
>>> Congratulations Nico and Xingcan!
>>> 
>>> On Wed, May 9, 2018 at 11:04 AM, Thomas Weise  wrote:
>>> 
 Congrats!
 
 
 On Wed, May 9, 2018 at 10:14 AM, Bowen Li  wrote:
 
> Congratulations!
> 
> On Tue, May 8, 2018 at 11:06 PM, Zhijiang(wangzhijiang999) <
> wangzhijiang...@aliyun.com> wrote:
> 
>> Congratulations, Xingcan and Nico !
>> Nico is a good PR reviewer and I gained a lot from him.
>> :)
>> --
>> From: Fabian Hueske   Sent: Wednesday, May 9, 2018, 02:53   To: dev <dev@flink.apache.org>
>> Subject: [ANNOUNCE] Two new committers: Xingcan Cui and Nico Kruber
>> Hi everyone,
>> 
>> I'm happy to announce that two members of the Flink community accepted
> the
>> offer of the PMC to become committers.
>> 
>> * Xingcan Cui has been contributing to Flink for about a year, focusing
> on
>> Flink's relational APIs (SQL & Table API). In the past year, Xingcan
 has
>> started design discussions, helped reviewing several pull requests, and
>> replied to questions on the user mailing list.
>> 
>> * Nico Kruber is an active contributor since 1.5 years
>> and worked mostly on
>> internal features, such as the blob manager and a new network stack.
 Nico
>> answers many questions on the user mailing list, reports lots of bugs
 and
>> is a very active PR reviewer.
>> 
>> Please join me in congratulating Xingcan and Nico.
>> 
>> Cheers,
>> Fabian
>> 
>> 
> 
 
>> 



Re: CoProcessFunction doesn't support timer on keyed stream

2018-04-26 Thread Kostas Kloudas
Hi again Ken,

This is the PR https://github.com/apache/flink/pull/5922 
<https://github.com/apache/flink/pull/5922> I promised.

You can build the docs by going in the docs directory of the Flink repo and 
executing
./build_docs.sh -p

After it finishes, you will be able to see all the documentation at 
localhost:4000 and
the Broadcast State one at:

   http://localhost:4000/dev/stream/state/broadcast_state.html 
<http://localhost:4000/dev/stream/state/broadcast_state.html>

Any feedback is welcomed!

Cheers,
Kostas

> On Apr 26, 2018, at 11:09 AM, Kostas Kloudas <k.klou...@data-artisans.com> 
> wrote:
> 
> Hi Ken,
> 
> It is true that there is no reason for not having access to the timerService 
> from the processElement of 
> the keyed side. On the other side (the non-keyed side) you cannot set timers 
> because timers are bound 
> to a specific key. 
> 
> Now, if one stream is broadcasted and the other is keyed, then FLINK-1.5 also 
> has BroadcastState which 
> does exactly what you are describing. 
> 
> Unfortunately the documentation is being prepared but I will open a Pull 
> Request today and I can send 
> you the link so that you can have a look.
> 
> Kostas
> 
>> On Apr 26, 2018, at 2:37 AM, Ken Krugler <kkrugler_li...@transpac.com> wrote:
>> 
>> Hi devs,
>> 
>> I’m using Flink 1.5-SNAPSHOT, and I’ve got a connected stream that I’m using 
>> with a CoProcessFunction.
>> 
>> One of the streams is keyed, and the other is broadcast.
>> 
>> As per the documentation 
>> (https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.html
>>  
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.html>),
>>  I tried to set a timer, but that fails with:
>> 
>> java.lang.UnsupportedOperationException: Setting timers is only supported on 
>> a keyed streams.
>>  at 
>> org.apache.flink.streaming.api.operators.co.CoProcessOperator$ContextImpl.registerProcessingTimeTimer(CoProcessOperator.java:123)
>> 
>> CoProcessOperator.java has:
>> 
>>  @Override
>>  public void registerProcessingTimeTimer(long time) {
>>  throw new UnsupportedOperationException("Setting timers 
>> is only supported on a keyed streams.");
>>  }
>> 
>>  @Override
>>  public void registerEventTimeTimer(long time) {
>>  throw new UnsupportedOperationException("Setting timers 
>> is only supported on a keyed streams.");
>>  }
>> 
>> So it seems like the documentation is wrong, and you currently can’t use 
>> timers with CoProcessFunction.
>> 
>> If that’s true, I’m curious why. Is it just an implementation detail, or is 
>> there a fundamental architectural problem?
>> 
>> I can see some challenges with needing two onTimerX() methods, and thus 
>> different timer services for each method, etc.
>> 
>> Thanks,
>> 
>> — Ken
>> 
>> 
>> http://about.me/kkrugler
>> +1 530-210-6378
>> 
> 



Re: CoProcessFunction doesn't support timer on keyed stream

2018-04-26 Thread Kostas Kloudas
Hi Ken,

It is true that there is no reason for not having access to the timerService 
from the processElement of 
the keyed side. On the other side (the non-keyed side) you cannot set timers 
because timers are bound 
to a specific key. 

Now, if one stream is broadcast and the other is keyed, then Flink 1.5 also 
has BroadcastState, which 
does exactly what you are describing. 
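
For illustration, a rough sketch of that combination with the new KeyedBroadcastProcessFunction 
(the String-typed streams and the "rules" map state below are placeholders; timers can only be 
registered on the keyed side):

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BroadcastStateSketch {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		DataStream<String> events = env.fromElements("a", "b", "a");  // stand-in for the keyed stream
		DataStream<String> ruleUpdates = env.fromElements("rule-1");  // stand-in for the broadcast stream

		final MapStateDescriptor<String, String> rulesDesc =
				new MapStateDescriptor<>("rules", String.class, String.class);

		events
			.keyBy(e -> e)                               // keyed side: timers are allowed here
			.connect(ruleUpdates.broadcast(rulesDesc))   // broadcast (non-keyed) side
			.process(new KeyedBroadcastProcessFunction<String, String, String, String>() {

				@Override
				public void processElement(String event, ReadOnlyContext ctx, Collector<String> out) throws Exception {
					// keyed side: read-only access to the broadcast state, plus timers
					String rule = ctx.getBroadcastState(rulesDesc).get("current");
					ctx.timerService().registerProcessingTimeTimer(
							ctx.timerService().currentProcessingTime() + 30_000);
					out.collect(event + " / " + rule);
				}

				@Override
				public void processBroadcastElement(String rule, Context ctx, Collector<String> out) throws Exception {
					// broadcast side: no timers, but read-write access to the broadcast state
					ctx.getBroadcastState(rulesDesc).put("current", rule);
				}
			})
			.print();

		env.execute("broadcast-state-sketch");
	}
}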

Unfortunately the documentation is being prepared but I will open a Pull 
Request today and I can send 
you the link so that you can have a look.

Kostas

> On Apr 26, 2018, at 2:37 AM, Ken Krugler  wrote:
> 
> Hi devs,
> 
> I’m using Flink 1.5-SNAPSHOT, and I’ve got a connected stream that I’m using 
> with a CoProcessFunction.
> 
> One of the streams is keyed, and the other is broadcast.
> 
> As per the documentation 
> (https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.html
>  
> ),
>  I tried to set a timer, but that fails with:
> 
> java.lang.UnsupportedOperationException: Setting timers is only supported on 
> a keyed streams.
>   at 
> org.apache.flink.streaming.api.operators.co.CoProcessOperator$ContextImpl.registerProcessingTimeTimer(CoProcessOperator.java:123)
> 
> CoProcessOperator.java has:
> 
>   @Override
>   public void registerProcessingTimeTimer(long time) {
>   throw new UnsupportedOperationException("Setting timers 
> is only supported on a keyed streams.");
>   }
> 
>   @Override
>   public void registerEventTimeTimer(long time) {
>   throw new UnsupportedOperationException("Setting timers 
> is only supported on a keyed streams.");
>   }
> 
> So it seems like the documentation is wrong, and you currently can’t use 
> timers with CoProcessFunction.
> 
> If that’s true, I’m curious why. Is it just an implementation detail, or is 
> there a fundamental architectural problem?
> 
> I can see some challenges with needing two onTimerX() methods, and thus 
> different timer services for each method, etc.
> 
> Thanks,
> 
> — Ken
> 
> 
> http://about.me/kkrugler
> +1 530-210-6378
> 



Re: Flink 1.3.2 with CEP Pattern, Memory usage increases results in OOM

2018-04-16 Thread Kostas Kloudas
Hi Abiramalakshmi,

Thanks for reporting this!

As a starting point I would recommend:

1) use RocksDB as your backend, so that state is not accumulated in memory
2) enable incremental checkpoints
3) the “new IterativeCondition() {…}” can become “new SimpleCondition() {}”, as this is more efficient
4) set the default watermark interval to a small value so that you have frequent watermarks and 
   elements are not accumulated (a sketch of all four points follows below).
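
A minimal sketch of the four points above (the checkpoint path and the condition are 
placeholders, and RocksDB needs the flink-statebackend-rocksdb dependency):

import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CepTuningSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// 1) + 2) RocksDB backend with incremental checkpoints, so state stays out of the heap
		env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
		env.enableCheckpointing(60_000);

		// 4) small auto-watermark interval, so watermarks are emitted frequently
		env.getConfig().setAutoWatermarkInterval(200);

		// 3) SimpleCondition instead of IterativeCondition when the match context is not needed
		Pattern<String, ?> pattern = Pattern.<String>begin("start")
				.where(new SimpleCondition<String>() {
					@Override
					public boolean filter(String value) {
						return value.startsWith("alert");
					}
				});
		// ... CEP.pattern(keyedStream, pattern) as usual
	}
}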

If you do the above, please let me know if the problems persist.

Thanks,
Kostas


> On Apr 10, 2018, at 1:19 PM, Abiramalakshmi Natarajan 
>  wrote:
> 
> new Iter



Re: [jira] [Created] (FLINK-9164) times(#,#) quantifier does not seem to work

2018-04-12 Thread Kostas Kloudas
Hi Romain,

What if you remove the AfterMatchSkipStrategy.skipPastLastEvent()?

Kostas

> On Apr 12, 2018, at 1:19 PM, Romain Revol (JIRA)  wrote:
> 
> AfterMatchSkipStrategy.skipPastLastEvent()



Re: [Proposal] CEP library changes - review request

2018-04-03 Thread Kostas Kloudas
Hi Shailesh,

Your solution may fit your use case, but as Dawid mentioned earlier, it makes a 
lot of 
assumptions about the input. 

From a look at your PoC:
1) You assume no late data (you do not drop anything) and no out-of-orderness.
2) You mix the two notions of time (event and processing).
3) You eagerly process each element, which can have performance implications, especially if 
you go for the RocksDB backend.

Given the above, I do not think that this can go in Flink.
 
Something that goes in Flink will have to be maintained by the community. 
So, although some use cases may have particular needs, we refrain from adding 
to master code that makes assumptions tailored to specific 
use cases.

I understand that the one watermark per key could conceptually fit better in 
your use case, 
but there may be a better way to achieve your goal, one that aligns with 
Flink’s offered 
semantics.

Thanks, 
Kostas

> On Apr 3, 2018, at 11:01 AM, Shailesh Jain  
> wrote:
> 
> Bump.
> 
> On Thu, Mar 22, 2018 at 7:54 PM, Shailesh Jain 
> wrote:
> 
>> To trigger the computations for each batch, I'll have to use the
>> processing time timer in the abstract keyed cep operator, right?
>> 
>> The reason why I'm avoiding the watermarks is that it is not possible to
>> generate watermarks per key.
>> 
>> Thanks for the 'within' remark.
>> 
>> A couple of questions:
>> 
>> 1. Given our use case and the limitations of per key watermark, do you
>> think that this approach is worth adding to the library?
>> 
>> 2. What other aspects of the framework do I need to consider/test before
>> we go about implementing this approach formally?
>> 
>> Thanks,
>> Shailesh
>> 
>> 
>> On Thu 22 Mar, 2018, 6:58 PM Dawid Wysakowicz, 
>> wrote:
>> 
>>> If you do the buffering you can emit watermark for each such batch (equal
>>> to highest timestamp in such batch). This way you won’t need to sort. CEP
>>> library will do it for you.
>>> The within clause will work in EventTime then.
>>> 
>>> One more remark also the within clause always work for whole pattern not
>>> just to a part of it, it does not matter if you apply it in the middle (as
>>> you did) or at the very end.
>>> 
>>> Best,
>>> Dawid
>>> 
 On 19 Mar 2018, at 11:31, Shailesh Jain 
>>> wrote:
 
 Thanks for your reply, Dawid.
 
 I understand that the approach I've tried out is not generic enough, and
 would need a lot more thought to be put into w.r.t parallelism
 considerations, out of order events, effects on downstream operators
>>> etc.
 The intention was to do a quick implementation to check the feasibility
>>> of
 the approach.
 
>> It will also not sort the events etc.
 
 In the application code to test this approach, I had used a Global
>>> window
 to sort events based on their timestamp (similar to how out of order
>>> events
 are dropped based on a time-bound, I'm dropping them based on a count
>>> based
 bound).
 
 allEvents = allEvents
   .keyBy(event -> event.getAttributeValue(EVENT_SOURCE_ID))
   .window(GlobalWindows.create())
   .trigger(new GlobalWindowCountTrigger(
>>> propLoader.getSortWindowSize()))
   .process(new SortWindowProcessFunction())
   .keyBy(event -> event.getAttributeValue(EVENT_SOURCE_ID))
   .assignTimestampsAndWatermarks(new TimestampsExtractor())
   .uid(Constants.TS_EX_UID);
 PatternLoader
   .applyPatterns(allEvents, propLoader.getPatternClassNames())
   .addSink(createKafkaSink(kafkaProps))
   .uid(Constants.KAFKA_SINK_COMPLEX_EVENTS_UID);
 
 
>> If in the getCurrentWatermark method of your
 AssignerWithPeriodicWatermarks you will just return
>> new Watermark(System.currentTimeMillis()), you will get the same
 behaviour as with that change,
>> am I right?
 
 If watermarks are generated based on the machine time, the major issue I
 see is that we will not be able to leverage Event Time functionality.
 Specifically, if I have patterns which look for the absence of an Event
>>> for
 a fixed period of time.
 
 For eg. We have many such patterns:
 
 Pattern pattern = Pattern.begin
   (UNDER_CHILLED_ALERT, AfterMatchSkipStrategy
   .skipPastLastEvent())
   .where(Conditions.getUnderchilledCondition())
   .notFollowedBy(COMPRESSOR_ON)
   .where(Conditions.getCompressorOnCondition())
   .within(Time.minutes(30))
   .followedBy(HIGH_TEMP)
   .where(Conditions.getHighTemperatureCondition());
 
 Now when there are network issues (which are very frequent), queued
>>> events
 are delivered together, and such patterns will not be matched correctly
>>> as
 pruning of events from NFA's buffer will not be done 

Re: [DISCUSS] Implement end-to-end tests in Java

2018-03-27 Thread Kostas Kloudas
Hi Timo,

Thanks for opening this. 

I agree that bash is not the best tool for all the reasons that you mention 
plus: 
1) it is difficult to write re-usable code
2) there are a lot of ways to express the same thing, which makes it difficult to build 
“best practices” (as it should be in a community project) and, in turn, to make it a real 
community effort.

Now, on what to replace bash with, I agree that Java is the best option. Flink is mainly 
written in Java and most of its community is already familiar with it. In addition, there 
are already a lot of utilities that can help reduce boilerplate code, for example for 
checking logs.

The only alternative I can think of is Python, but given that Flink is mainly a Java 
project, I would vouch for Java.

Cheers,
Kostas

> On Mar 27, 2018, at 2:00 PM, Timo Walther  wrote:
> 
> Hi everyone,
> 
> after reviewing a bunch of end-to-end tests, I'm wondering if we should 
> really continue implementing everything in bash scripts. Wouldn't it be nicer 
> to implement them in Java code that just calls the interfaces of Flink (e.g. 
> "./bin/flink run" or REST API)?
> 
> Here are some thoughts why I think we should consider that:
> 
> - Implement the tests in a more object oriented way: We could have classes 
> such as Cluster, LogAnalyzer, WatchdogProcess, and other utility classes that 
> can make writing tests easier. This also ensure maintainability in the future 
> because all tools for proper Java coding style, comments etc. are already in 
> place.
> 
> - Exception handling and type safety: We could catch exceptions more easily 
> (e.g. if a REST request fails or a process is not there)
> 
> - Debuggability: We could run end-to-end tests in the IDE for debugging and 
> set breakpoints etc.
> 
> 
> What do you think?
> 
> 
> Regards,
> 
> Timo
> 



[jira] [Created] (FLINK-9046) Improve error messages when receiving unexpected messages.

2018-03-21 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-9046:
-

 Summary: Improve error messages when receiving unexpected messages.
 Key: FLINK-9046
 URL: https://issues.apache.org/jira/browse/FLINK-9046
 Project: Flink
  Issue Type: Bug
  Components: REST, Webfrontend
Affects Versions: 1.5.0
Reporter: Kostas Kloudas


Currently, in many cases, when a REST handler receives an unexpected message, 
e.g. for a job that does not exist, it logs the full stack trace, often with 
misleading messages. This can happen, for example, if we launch a cluster, 
connect to the WebUI to monitor a job, then kill the cluster without 
shutting down the WebUI, and start a new cluster on the same port. In this case 
the WebUI will keep asking for the previous job and the JobDetailsHandler will 
log the following:

 
{code:java}
2018-03-21 14:27:24,319 ERROR 
org.apache.flink.runtime.rest.handler.job.JobDetailsHandler - Exception 
occurred in REST handler.
org.apache.flink.runtime.rest.NotFoundException: Job 
548afad8217f4a18db7f50e60d48885a not found
at 
org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler.lambda$handleRequest$1(AbstractExecutionGraphHandler.java:90)
at 
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at 
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at 
org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache.lambda$getExecutionGraph$0(ExecutionGraphCache.java:133)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at 
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:755)
at akka.dispatch.OnComplete.internal(Future.scala:258)
at akka.dispatch.OnComplete.internal(Future.scala:256)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at 
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:20)
at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:18)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at 
akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107){code}
 

The same holds when asking through the WebUI for the logs of a TaskManager. In this case, 
the logs will contain: 
{code:java}
2018-03-21 12:26:05,096 ERROR 
org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler - 
Implementation error: Unhandled exception.
org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException:
 No TaskExecutor registered under b4c468fc0fe729b09e35d319cbc45c39.
at 
org.apache.flink.runtime.resourcemanager.ResourceManager.requestTaskManagerInfo(ResourceManager.java:538)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke

[jira] [Created] (FLINK-8928) Improve error message on server binding error.

2018-03-12 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-8928:
-

 Summary: Improve error message on server binding error.
 Key: FLINK-8928
 URL: https://issues.apache.org/jira/browse/FLINK-8928
 Project: Flink
  Issue Type: Bug
  Components: Queryable State
Affects Versions: 1.5.0
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas
 Fix For: 1.5.0






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8926) Shutdown client proxy on test end.

2018-03-12 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-8926:
-

 Summary: Shutdown client proxy on test end.
 Key: FLINK-8926
 URL: https://issues.apache.org/jira/browse/FLINK-8926
 Project: Flink
  Issue Type: Bug
  Components: Queryable State
Affects Versions: 1.5.0
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas
 Fix For: 1.5.0






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8908) MapSerializer creates new serializer even if key and value serializers are stateless

2018-03-09 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-8908:
-

 Summary: MapSerializer creates new serializer even if key and 
value serializers are stateless
 Key: FLINK-8908
 URL: https://issues.apache.org/jira/browse/FLINK-8908
 Project: Flink
  Issue Type: Bug
  Components: Type Serialization System
Affects Versions: 1.5.0
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas
 Fix For: 1.5.0






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8841) Duplicate MapSerializer and HashMapSerializer.

2018-03-02 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-8841:
-

 Summary: Duplicate MapSerializer and HashMapSerializer.
 Key: FLINK-8841
 URL: https://issues.apache.org/jira/browse/FLINK-8841
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing, Type Serialization System
Affects Versions: 1.5.0
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas
 Fix For: 1.5.0


Currently there are two classes, MapSerializer and HashMapSerializer, whose code is 
the same, with the only difference being that one handles elements of type Map 
and the other HashMap. 

In addition, these two were merged in the same commit. 

I would like to remove the HashMapSerializer. I already created a branch 
without the HashMapSerializer and nothing seems to be failing on Travis. The 
reason why I hesitate to do it is that I am not sure whether this may create 
problems with backwards compatibility.

[~xiaogang.sxg] could you elaborate a bit on why they were both added and if 
there is any danger in removing the HashMapSerializer?

Also [~StephanEwen] and [~stefanrichte...@gmail.com], it is worth having a look, 
and if you are OK with it, I can remove the redundant serializer.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8802) Concurrent serialization without duplicating serializers in state server.

2018-02-28 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-8802:
-

 Summary: Concurrent serialization without duplicating serializers 
in state server.
 Key: FLINK-8802
 URL: https://issues.apache.org/jira/browse/FLINK-8802
 Project: Flink
  Issue Type: Bug
  Components: Queryable State
Affects Versions: 1.5.0
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas
 Fix For: 1.5.0


The `getSerializedValue()` method may be called by multiple threads, but serializers 
are not duplicated, which may lead to exceptions when a serializer is 
stateful.
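
For illustration, the general duplicate-before-use pattern this is about, shown here as a 
toy example with a stateless serializer (the actual change touches the queryable-state 
server code):

{code:java}
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;

public class DuplicateBeforeUse {
	public static void main(String[] args) {
		TypeSerializer<String> shared = StringSerializer.INSTANCE;

		// Each thread should serialize with its own duplicate; for stateless serializers
		// duplicate() may simply return the same instance, so this is cheap.
		TypeSerializer<String> perThread = shared.duplicate();

		System.out.println(perThread == shared); // true for a stateless serializer like this one
	}
}
{code}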



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8780) Add Broadcast State documentation.

2018-02-26 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-8780:
-

 Summary: Add Broadcast State documentation.
 Key: FLINK-8780
 URL: https://issues.apache.org/jira/browse/FLINK-8780
 Project: Flink
  Issue Type: Bug
  Components: DataStream API, Documentation, Streaming
Affects Versions: 1.5.0
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas
 Fix For: 1.5.0






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8724) Add BroadcastState examples with predetermined state transitions for the broadcast state

2018-02-21 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-8724:
-

 Summary: Add BroadcastState examples with predetermined state 
transitions for the broadcast state
 Key: FLINK-8724
 URL: https://issues.apache.org/jira/browse/FLINK-8724
 Project: Flink
  Issue Type: Sub-task
  Components: Examples, Streaming
Affects Versions: 1.5.0
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas
 Fix For: 1.5.0






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8723) Remove existing BroadcastState examples.

2018-02-21 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-8723:
-

 Summary: Remove existing BroadcastState examples.
 Key: FLINK-8723
 URL: https://issues.apache.org/jira/browse/FLINK-8723
 Project: Flink
  Issue Type: Sub-task
  Components: Examples, Streaming
Affects Versions: 1.5.0
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas
 Fix For: 1.5.0






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8722) Refactor BroadcastState examples.

2018-02-21 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-8722:
-

 Summary: Refactor BroadcastState examples.
 Key: FLINK-8722
 URL: https://issues.apache.org/jira/browse/FLINK-8722
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 1.5.0
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas
 Fix For: 1.5.0






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: HAQueryableStateRocksDBBackendITCase test cause outofmemoryerror and never stop

2018-02-21 Thread Kostas Kloudas
Hi Minglei,

Can you send us where the execution is when it hangs?

A thread dump, which objects are being created, or whatever else can help track down the 
problem.
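
For reference, the standard JDK tools can capture both (assuming the pid of the hanging 
test JVM):

jps -lm                              # find the JVM running the test
jstack <pid> > threaddump.txt        # thread dump: where the execution hangs
jmap -histo:live <pid> | head -n 30  # heap histogram: which objects are being created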

Thanks,
Kostas

> On Feb 21, 2018, at 9:07 AM, mingleizhang  wrote:
> 
> Hi,
> I run HAQueryableStateRocksDBBackendITCase#testValueStateDefault test class 
> on my local machine. And it hangs always and in the end gives me a 
> outofmemory error.
> 
> 
> Cheers
> Minglei.
> 



[jira] [Created] (FLINK-8659) Add migration tests for Broadcast state.

2018-02-14 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-8659:
-

 Summary: Add migration tests for Broadcast state.
 Key: FLINK-8659
 URL: https://issues.apache.org/jira/browse/FLINK-8659
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 1.5.0
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas
 Fix For: 1.5.0






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8642) Initialize descriptors before use at getBroadcastState().

2018-02-13 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-8642:
-

 Summary: Initialize descriptors before use at getBroadcastState().
 Key: FLINK-8642
 URL: https://issues.apache.org/jira/browse/FLINK-8642
 Project: Flink
  Issue Type: Bug
  Components: DataStream API
Affects Versions: 1.5.0
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas
 Fix For: 1.5.0






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] Releasing Flink 1.5.0

2018-02-12 Thread Kostas Kloudas
For me as well +1.

Cheers,
Kostas

> On Feb 12, 2018, at 2:59 PM, Timo Walther <twal...@apache.org> wrote:
> 
> Sounds good to me. +1 from my side.
> 
> Regards,
> Timo
> 
> 
> Am 2/12/18 um 2:56 PM schrieb Aljoscha Krettek:
>> I agree with Chesnay: we should do a soft "feature freeze" first, were we 
>> agree to not merge new features to master after that and then to the actual 
>> hard cutting of the release branch a while later.
>> 
>> For actual dates, I'm proposing end of this week (16.02.2018) as soft 
>> feature freeze and end of next week (23.02.2018) as the hard cut of the 
>> release branch?
>> 
>> What do you think?
>> 
>> --
>> Aljoscha
>> 
>>> On 8. Feb 2018, at 10:15, Till Rohrmann <trohrm...@apache.org> wrote:
>>> 
>>> Local state recovery is almost completely done. Only some reviews and
>>> merging of the final PRs is pending.
>>> 
>>> The network stack improvements are on a good way to be finished by the end
>>> of this week or beginning of next week. To my knowledge we got recently
>>> green Travis builds :-) The network stack changes will also include the
>>> application level flow control and the back pressure based checkpoint
>>> alignment. So only the last reviews and merging is missing.
>>> 
>>> Concerning Flip-6, I'm currently working on enabling Flip-6 by default.
>>> There are still some smaller things left to be done but I'm confident that
>>> we can resolve them quickly.
>>> 
>>> I agree that due to the big changes we should have a very thorough and
>>> principled testing period where we put Flink through the paces.
>>> 
>>> Cheers,
>>> Till
>>> 
>>> On Wed, Feb 7, 2018 at 10:55 AM, Chesnay Schepler <ches...@apache.org>
>>> wrote:
>>> 
>>>> As Aljoscha said we wanted to do 1.5 soon after 1.4 based on the
>>>> assumption that the 3 big features (FLIP-6, network stack changes, local
>>>> state recovery) are nearly done.
>>>> 
>>>> I'm unsure about local state recovery, but I still see open issues for
>>>> FLIP-6 and the network stack rework.
>>>> As such it doesn't make sense to release 1.5 now.
>>>> 
>>>> Given the large scope of these features I would very much prefer to have
>>>> them active on master for a while before a feature-freeze
>>>> to expose them to a wider audience.
>>>> 
>>>> IMO it will take at least another month before we can start the release
>>>> process for 1.5, i.e. the feature freeze.
>>>> (2 more weeks for implementation, 2 weeks on master for the dust to settle)
>>>> 
>>>> 
>>>> On 05.02.2018 22:39, Kostas Kloudas wrote:
>>>> 
>>>>> Hi Aljoscha,
>>>>> 
>>>>> I believe that support for Broadcast State should also be in 1.5.
>>>>> There is an open PR https://github.com/apache/flink/pull/5230 <
>>>>> https://github.com/apache/flink/pull/5230> for that
>>>>> and there are some pending issues related to scala api and documentation.
>>>>> 
>>>>> Thanks,
>>>>> Kostas
>>>>> 
>>>>> On Feb 5, 2018, at 5:37 PM, Timo Walther <twal...@apache.org> wrote:
>>>>>> Hi Shuyi,
>>>>>> 
>>>>>> I will take a look at it again this week. I'm pretty sure it will be
>>>>>> part of 1.5.0.
>>>>>> 
>>>>>> Regards,
>>>>>> Timo
>>>>>> 
>>>>>> 
>>>>>> Am 2/5/18 um 5:25 PM schrieb Shuyi Chen:
>>>>>> 
>>>>>>> Hi Aljoscha, can we get this feature in for 1.5.0? We have a lot of
>>>>>>> internal users waiting for this feature.
>>>>>>> 
>>>>>>> [FLINK-7923 <https://issues.apache.org/jira/browse/FLINK-7923>] Support
>>>>>>> accessing subfields of a Composite element in an Object Array type
>>>>>>> column
>>>>>>> 
>>>>>>> Thanks a lot
>>>>>>> Shuyi
>>>>>>> 
>>>>>>> 
>>>>>>> On Mon, Feb 5, 2018 at 6:59 AM, Christophe Jolif <cjo...@gmail.com>
>>>>>>> wrote:
>>>>>>> 
>>>>>>> Hi guys,
>>>>>>>> Sorry for jumping

[jira] [Created] (FLINK-8597) Add examples for Connected Streams with Broadcast State.

2018-02-07 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-8597:
-

 Summary: Add examples for Connected Streams with Broadcast State.
 Key: FLINK-8597
 URL: https://issues.apache.org/jira/browse/FLINK-8597
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 1.5.0
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas
 Fix For: 1.5.0






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] Releasing Flink 1.5.0

2018-02-05 Thread Kostas Kloudas
Hi Aljoscha,

I believe that support for Broadcast State should also be in 1.5.
There is an open PR https://github.com/apache/flink/pull/5230 
 for that
and there are some pending issues related to scala api and documentation.

Thanks,
Kostas

> On Feb 5, 2018, at 5:37 PM, Timo Walther  wrote:
> 
> Hi Shuyi,
> 
> I will take a look at it again this week. I'm pretty sure it will be part of 
> 1.5.0.
> 
> Regards,
> Timo
> 
> 
> Am 2/5/18 um 5:25 PM schrieb Shuyi Chen:
>> Hi Aljoscha, can we get this feature in for 1.5.0? We have a lot of
>> internal users waiting for this feature.
>> 
>> [FLINK-7923 ] Support
>> accessing subfields of a Composite element in an Object Array type column
>> 
>> Thanks a lot
>> Shuyi
>> 
>> 
>> On Mon, Feb 5, 2018 at 6:59 AM, Christophe Jolif  wrote:
>> 
>>> Hi guys,
>>> 
>>> Sorry for jumping in, but I think
>>> 
>>> [FLINK-8101] Elasticsearch 6.X support
>>> [FLINK-7386]  Flink Elasticsearch 5 connector is not compatible with
>>> Elasticsearch 5.2+ client
>>> 
>>>  have long been awaited and there was one PR from me and from someone else
>>> showing the interest ;) So if you could consider it for 1.5 that would be
>>> great!
>>> 
>>> Thanks!
>>> --
>>> Christophe
>>> 
>>> On Mon, Feb 5, 2018 at 2:47 PM, Timo Walther  wrote:
>>> 
 Hi Aljoscha,
 
 it would be great if we can include the first version of the SQL client
 (see FLIP-24, Implementation Plan 1). I will open a PR this week. I think
 we can merge this with explicit "experimental/alpha" status. It is far
>>> away
 from feature completeness but will be a great tool for Flink beginners.
 
 In order to use the SQL client we would need to also add some table
 sources with the new unified table factories (FLINK-8535), but this is
 optional because a user can implement own table factories at the
>>> begining.
 Regards,
 Timo
 
 
 Am 2/5/18 um 2:36 PM schrieb Tzu-Li (Gordon) Tai:
 
 Hi Aljoscha,
> Thanks for starting the discussion.
> 
> I think there’s a few connector related must-have improvements that we
> should get in before the feature freeze, since quite a few users have
>>> been
> asking for them:
> 
> [FLINK-6352] FlinkKafkaConsumer should support to use timestamp to set
>>> up
> start offset
> [FLINK-5479] Per-partition watermarks in FlinkKafkaConsumer should
> consider idle partitions
> [FLINK-8516] Pluggable shard-to-subtask partitioning for
> FlinkKinesisConsumer
> [FLINK-6109] Add a “checkpointed offset” metric to FlinkKafkaConsumer
> 
> These are still missing in the master branch. Only FLINK-5479 is still
> lacking a pull request.
> 
> Cheers,
> Gordon
> 
> On 31 January 2018 at 10:38:43 AM, Aljoscha Krettek (
>>> aljos...@apache.org)
> wrote:
> Hi Everyone,
> 
> When we decided to do the 1.4.0 release a while back we did that to get
>>> a
> stable release out before putting in a couple of new features. Back
>>> then,
> some of those new features (FLIP-6, network stack changes, local state
> recovery) were almost ready and we wanted to do a shortened 1.5.0
> development cycle to allow for those features to become ready and then
>>> do
> the next release.
> 
> We are now approaching the approximate time where we wanted to do the
> Flink 1.5.0 release so I would like to gauge where we are and gather
> opinions on how we should proceed now.
> 
> With this, I'd also like to propose myself as the release manager for
> 1.5.0 but I'm very happy to yield if someone else would be interested in
> doing that.
> 
> What do you think?
> 
> Best,
> Aljoscha
> 
 
 
>>> 
>>> --
>>> Christophe
>>> 
>> 
>> 
> 



[jira] [Created] (FLINK-8561) SharedBuffer line 573 uses == to compare BufferEntries instead of .equals.

2018-02-05 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-8561:
-

 Summary: SharedBuffer line 573 uses == to compare BufferEntries 
instead of .equals.
 Key: FLINK-8561
 URL: https://issues.apache.org/jira/browse/FLINK-8561
 Project: Flink
  Issue Type: Bug
  Components: CEP
Affects Versions: 1.4.0
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas
 Fix For: 1.4.1






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8522) DefaultOperatorStateBackend writes data in checkpoint that is never read.

2018-01-29 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-8522:
-

 Summary: DefaultOperatorStateBackend writes data in checkpoint 
that is never read.
 Key: FLINK-8522
 URL: https://issues.apache.org/jira/browse/FLINK-8522
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Affects Versions: 1.4.0
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas
 Fix For: 1.4.1


In the DefaultOperatorStateBackend, at line 283, we write into the checkpoint an 
int declaring the number of operator states included in the 
checkpoint. 

This number is never read when restoring, which can lead to confusion and to 
problems with backwards compatibility and/or with extending the types of state we 
support (e.g. broadcast state).

There are two easy solutions: either remove the line and do not write the size, 
or make sure that we also read this number when restoring and simply ignore it.

I would go for the first one, i.e. remove the line. What do you think 
[~richtesn] and [~tzulitai] ?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8446) Add support for multiple broadcast states.

2018-01-17 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-8446:
-

 Summary: Add support for multiple broadcast states.
 Key: FLINK-8446
 URL: https://issues.apache.org/jira/browse/FLINK-8446
 Project: Flink
  Issue Type: New Feature
  Components: DataStream API
Affects Versions: 1.4.0
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas
 Fix For: 1.5.0






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8345) Iterate over keyed state on broadcast side of connect with broadcast.

2018-01-02 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-8345:
-

 Summary: Iterate over keyed state on broadcast side of connect 
with broadcast.
 Key: FLINK-8345
 URL: https://issues.apache.org/jira/browse/FLINK-8345
 Project: Flink
  Issue Type: New Feature
  Components: Streaming
Affects Versions: 1.5.0
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas
 Fix For: 1.5.0






--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: CEP: Dynamic Patterns

2017-12-28 Thread Kostas Kloudas
Hi again Rafi,

Coming back to the second part of the question on what you can do right now,
I would suggest that you launch your initial job with your initial patterns and 
in your code you assign UID’s to your sources and the CEP operators, e.g.:

CEP.pattern(input, mySuperPattern).select(mySelectFunction).uid("version1")

When you want to update the pattern (delete, add a new one or update an 
existing one):

1) take a savepoint from your running job and kill it.
2) update your patterns, and on the updated patterns only, change the uids so 
that the 
state in the savepoint corresponding to your previous version is ignored.
3) restart your job from the savepoint with the “--ignoreUnmappedState” flag, 
as described here:
https://issues.apache.org/jira/browse/FLINK-4445 
<https://issues.apache.org/jira/browse/FLINK-4445>
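
In CLI terms, the sequence looks roughly like this (job ids and paths are placeholders; 
in current Flink versions the CLI spells this flag --allowNonRestoredState, or -n for short):

bin/flink savepoint <jobId> hdfs:///flink/savepoints   # 1) trigger a savepoint
bin/flink cancel <jobId>                               #    and stop the job
# 2) rebuild the job jar with updated patterns and new uids on the changed CEP operators
bin/flink run -s hdfs:///flink/savepoints/savepoint-XYZ -n my-cep-job.jar   # 3) resume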

This will allow your job to restart from where it left off (the sources 
checkpoint their state),
unchanged patterns to continue from where they were (as you did not change 
their uids)
and new/updated patterns to start from scratch as they have a new uid.

I have not really tried it but I think it can work although it requires some 
manual stopping/
restarting.

I do not know if Ufuk has something to add to this.

Hope this helps,
Kostas


> On Dec 28, 2017, at 2:57 PM, Kostas Kloudas <k.klou...@data-artisans.com> 
> wrote:
> 
> Hi Rafi,
> 
> Currently this is unfortunately not supported out of the box.
> To support this, we need 2 features with one having to be added in Flink 
> itself,
> and the other to the CEP library.
> 
> The first one is broadcast state and the ability to connect keyed and 
> non-keyed 
> streams. This one is to be added to Flink itself and the good news are that 
> this 
> feature is scheduled to be added to Flink 1.5.
> 
> The second feature is to modify the CEP operator so that it can support 
> multiple 
> patterns and match incoming events against all of them. For this I have no 
> clear 
> deadline in my mind, but given that there are more and more people asking for 
> it, I think it is going to be added soon.
> 
> Thanks for raising the issue in the mailing list,
> Kostas
> 
>> On Dec 27, 2017, at 2:44 PM, Ufuk Celebi <u...@apache.org 
>> <mailto:u...@apache.org>> wrote:
>> 
>> Hey Rafi,
>> 
>> this is indeed a very nice feature to have. :-) I'm afraid that this
>> is currently hard to do manually with CEP. Let me pull in Dawid and
>> Klou (cc'd) who have worked a lot on CEP. They can probably update you
>> on the plan for FLINK-7129.
>> 
>> Best,
>> 
>> Ufuk
>> 
>> 
>> On Tue, Dec 26, 2017 at 8:47 PM, Shivam Sharma <28shivamsha...@gmail.com 
>> <mailto:28shivamsha...@gmail.com>> wrote:
>>> Hi Rafi,
>>> 
>>> Even I also wanted this facility from Flink Core. But I think this is
>>> already solved by Uber on Flink.
>>> 
>>> https://eng.uber.com/athenax/ <https://eng.uber.com/athenax/>
>>> 
>>> Best
>>> 
>>> On Tue, Dec 26, 2017 at 6:21 PM, Rafi Aroch <raf...@walkme.com> wrote:
>>> 
>>>> Hi,
>>>> 
>>>> I'm Rafi, Data Architect at WalkMe.
>>>> 
>>>> Our Mobile platform generates events coming from the end-users mobile
>>>> device for different actions that the user does.
>>>> 
>>>> The use case I wanted to implement using Flink CEP is as follows:
>>>> 
>>>> We would like to expose a UI where we could define a set of rules. Rules
>>>> can be statefull, like:
>>>> 
>>>> User did X 4 times in the last hour
>>>> AND
>>>> User did Y and then did Z in a session
>>>> AND
>>>> User average session duration is > 60 seconds
>>>> 
>>>> As the set of rules are met, we would like to trigger an action. Like a
>>>> REST call, fire event, etc.
>>>> 
>>>> This sounds like a good fit for Flink CEP, except that currently, I
>>>> understand that CEP patterns have to be "hard-coded" in my jobs code in
>>>> order to build the graph.
>>>> This set of rules may change many times a day. So re-deploying a Flink job
>>>> is not an option (or is it?).
>>>> 
>>>> I found this ticket: https://issues.apache.org/jira/browse/FLINK-7129 and
>>>> was hoping you plan to add this feature soon :)
>>>> 
>>>> This would make a powerful feature and open up many interesting use-cases.
>>>> 
>>>> Meanwhile, can you suggest a way of implementing this use-case?
>>>> 
>>>> Hope this makes sense.
>>>> Would love to hear your thoughts.
>>>> 
>>>> Thanks,
>>>> Rafi
>>>> 
>>> 
>>> 
>>> 
>>> --
>>> Shivam Sharma
>>> Data Engineer @ Goibibo
>>> Indian Institute Of Information Technology, Design and Manufacturing
>>> Jabalpur
>>> Mobile No- (+91) 8882114744
>>> Email:- 28shivamsha...@gmail.com
>>> LinkedIn:-*https://www.linkedin.com/in/28shivamsharma
>>> <https://www.linkedin.com/in/28shivamsharma>*
> 



Re: CEP: Dynamic Patterns

2017-12-28 Thread Kostas Kloudas
Hi Rafi,

Currently this is unfortunately not supported out of the box.
To support this, we need 2 features with one having to be added in Flink itself,
and the other to the CEP library.

The first one is broadcast state and the ability to connect keyed and non-keyed 
streams. This one is to be added to Flink itself and the good news is that this 
feature is scheduled to be added to Flink 1.5.

The second feature is to modify the CEP operator so that it can support 
multiple 
patterns and match incoming events against all of them. For this I have no 
clear 
deadline in my mind, but given that there are more and more people asking for 
it, I think it is going to be added soon.

Thanks for raising the issue in the mailing list,
Kostas

> On Dec 27, 2017, at 2:44 PM, Ufuk Celebi  wrote:
> 
> Hey Rafi,
> 
> this is indeed a very nice feature to have. :-) I'm afraid that this
> is currently hard to do manually with CEP. Let me pull in Dawid and
> Klou (cc'd) who have worked a lot on CEP. They can probably update you
> on the plan for FLINK-7129.
> 
> Best,
> 
> Ufuk
> 
> 
> On Tue, Dec 26, 2017 at 8:47 PM, Shivam Sharma <28shivamsha...@gmail.com> 
> wrote:
>> Hi Rafi,
>> 
>> Even I also wanted this facility from Flink Core. But I think this is
>> already solved by Uber on Flink.
>> 
>> https://eng.uber.com/athenax/
>> 
>> Best
>> 
>> On Tue, Dec 26, 2017 at 6:21 PM, Rafi Aroch  wrote:
>> 
>>> Hi,
>>> 
>>> I'm Rafi, Data Architect at WalkMe.
>>> 
>>> Our Mobile platform generates events coming from the end-users mobile
>>> device for different actions that the user does.
>>> 
>>> The use case I wanted to implement using Flink CEP is as follows:
>>> 
>>> We would like to expose a UI where we could define a set of rules. Rules
>>> can be statefull, like:
>>> 
>>> User did X 4 times in the last hour
>>> AND
>>> User did Y and then did Z in a session
>>> AND
>>> User average session duration is > 60 seconds
>>> 
>>> As the set of rules are met, we would like to trigger an action. Like a
>>> REST call, fire event, etc.
>>> 
>>> This sounds like a good fit for Flink CEP, except that currently, I
>>> understand that CEP patterns have to be "hard-coded" in my jobs code in
>>> order to build the graph.
>>> This set of rules may change many times a day. So re-deploying a Flink job
>>> is not an option (or is it?).
>>> 
>>> I found this ticket: https://issues.apache.org/jira/browse/FLINK-7129 and
>>> was hoping you plan to add this feature soon :)
>>> 
>>> This would make a powerful feature and open up many interesting use-cases.
>>> 
>>> Meanwhile, can you suggest a way of implementing this use-case?
>>> 
>>> Hope this makes sense.
>>> Would love to hear your thoughts.
>>> 
>>> Thanks,
>>> Rafi
>>> 
>> 
>> 
>> 
>> --
>> Shivam Sharma
>> Data Engineer @ Goibibo
>> Indian Institute Of Information Technology, Design and Manufacturing
>> Jabalpur
>> Mobile No- (+91) 8882114744
>> Email:- 28shivamsha...@gmail.com
>> LinkedIn:-*https://www.linkedin.com/in/28shivamsharma
>> *



[jira] [Created] (FLINK-8090) An operator should not be able to register two states with the same name.

2017-11-15 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-8090:
-

 Summary: An operator should not be able to register two states 
with the same name.
 Key: FLINK-8090
 URL: https://issues.apache.org/jira/browse/FLINK-8090
 Project: Flink
  Issue Type: Bug
  Components: DataStream API
Affects Versions: 1.4.0
Reporter: Kostas Kloudas


Currently a {{ProcessFunction}} like this is a valid job:

{code}
new ProcessFunction<Tuple2<Integer, Long>, Object>() {
	private static final long serialVersionUID = -805125545438296619L;

	// firstMapStateDescriptor and secondMapStateDescriptor (defined outside this snippet)
	// presumably use the same state name, yet the job is accepted instead of failing.
	private transient MapState<Integer, Tuple2<Integer, Long>> firstMapState;
	private transient MapState<Integer, Tuple2<Integer, Long>> secondMapState;

	@Override
	public void open(Configuration parameters) throws Exception {
		super.open(parameters);
		firstMapState = getRuntimeContext().getMapState(firstMapStateDescriptor);
		secondMapState = getRuntimeContext().getMapState(secondMapStateDescriptor);
	}

	@Override
	public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<Object> out) throws Exception {
		Tuple2<Integer, Long> v = firstMapState.get(value.f0);
		if (v == null) {
			v = new Tuple2<>(value.f0, 0L);
		}
		System.out.println(value);
		firstMapState.put(value.f0, new Tuple2<>(v.f0, v.f1 + value.f1));
	}
}
{code}

This should not be the case, and the job should fail with an adequate message.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8073) Test instability FlinkKafkaProducer011ITCase.testScaleDownBeforeFirstCheckpoint()

2017-11-14 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-8073:
-

 Summary: Test instability 
FlinkKafkaProducer011ITCase.testScaleDownBeforeFirstCheckpoint()
 Key: FLINK-8073
 URL: https://issues.apache.org/jira/browse/FLINK-8073
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector, Tests
Affects Versions: 1.4.0
Reporter: Kostas Kloudas


Travis log: https://travis-ci.org/kl0u/flink/jobs/301985988



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8050) RestServer#shutdown() ignores exceptions thrown when shutting down netty.

2017-11-12 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-8050:
-

 Summary: RestServer#shutdown() ignores exceptions thrown when 
shutting down netty.
 Key: FLINK-8050
 URL: https://issues.apache.org/jira/browse/FLINK-8050
 Project: Flink
  Issue Type: Bug
  Components: REST
Affects Versions: 1.4.0
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas






--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: [ANNOUNCE] New committer: Haohui Mai

2017-11-02 Thread Kostas Kloudas
Congratulations!

> On Nov 2, 2017, at 9:12 AM, Matthias J. Sax  wrote:
> 
> Congrats!
> 
> On 11/1/17 9:40 PM, Dawid Wysakowicz wrote:
>> Congratulations!
>> 
>> 01.11.2017 7:45 PM "Stephan Ewen"  napisał(a):
>> 
>>> Congrats and welcome!
>>> 
>>> On Wed, Nov 1, 2017 at 6:47 PM, Chen Qin 
>>> wrote:
>>> 
 Congratulations!
 
 On Wed, Nov 1, 2017 at 2:41 AM, Aljoscha Krettek
  wrote:
 
> Congratulations! 
> 
>> On 1. Nov 2017, at 10:13, Shaoxuan Wang
>>  wrote:
>> 
>> Congratulations!
>> 
>> On Wed, Nov 1, 2017 at 4:36 PM, Till Rohrmann
>> 
> wrote:
>> 
>>> Congrats and welcome on board :-)
>>> 
>>> On Wed, Nov 1, 2017 at 9:14 AM, Fabian Hueske
>>> 
> wrote:
>>> 
 Hi everybody,
 
 On behalf of the PMC I am delighted to announce Haohui
 Mai as a new
> Flink
 committer!
 
 Haohui has been an active member of our community for
 several
>>> months.
 Among other things, he made major contributions in
 ideas and code
>>> to
> the
 SQL and Table APIs.
 
 Please join me in congratulating Haohui for becoming a
 Flink
 committer!
 
 Cheers, Fabian
 
>>> 
> 
> 
 
>>> 
>> 



[jira] [Created] (FLINK-7933) Test instability PrometheusReporterTest

2017-10-26 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-7933:
-

 Summary: Test instability PrometheusReporterTest
 Key: FLINK-7933
 URL: https://issues.apache.org/jira/browse/FLINK-7933
 Project: Flink
  Issue Type: Bug
  Components: Metrics
Affects Versions: 1.4.0
Reporter: Kostas Kloudas


Travis log: https://travis-ci.org/kl0u/flink/jobs/293220196



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7908) Restructure the QS module to reduce client deps.

2017-10-24 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-7908:
-

 Summary: Restructure the QS module to reduce client deps.
 Key: FLINK-7908
 URL: https://issues.apache.org/jira/browse/FLINK-7908
 Project: Flink
  Issue Type: Sub-task
Affects Versions: 1.4.0
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas
 Fix For: 1.4.0






--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7835) Fix duplicate() method in NFASerializer

2017-10-13 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-7835:
-

 Summary: Fix duplicate() method in NFASerializer
 Key: FLINK-7835
 URL: https://issues.apache.org/jira/browse/FLINK-7835
 Project: Flink
  Issue Type: Sub-task
  Components: CEP
Affects Versions: 1.3.2, 1.3.1, 1.3.0
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas
 Fix For: 1.4.0






--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

