Re: StandAlone job on k8s fails with "Unknown method truncate" on restore

2019-02-14 Thread Yun Tang
Hi

When 'RollingSink' initializes its state, it first checks whether the current file 
system supports the truncate method. If the file system does not support it, the 
sink falls back to a work-around, in which case you should not hit this problem. 
Otherwise, 'RollingSink' found the 'truncate' method via reflection even though the 
file system does not actually support it. You could enable DEBUG logging and check 
whether the log message below gets printed:
Truncate not found. Will write a file with suffix '.valid-length' and prefix 
'_' to specify how many bytes in a bucket are valid.
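
As a rough illustration only (this is not Flink's actual code), the client-side check
boils down to probing for Hadoop's FileSystem#truncate(Path, long) via reflection.
Note that finding the method only proves the Hadoop jars on the classpath are 2.7+;
it does not prove that the NameNode you talk to implements it, which is exactly the
mismatch behind the "Unknown method truncate" error:

import java.lang.reflect.Method;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class TruncateProbe {

    // Returns the truncate(Path, long) method if the Hadoop jars on the classpath
    // expose it, or null if they do not (e.g. Hadoop 2.6).
    static Method findTruncate() {
        try {
            return FileSystem.class.getMethod("truncate", Path.class, long.class);
        } catch (NoSuchMethodException e) {
            // no truncate available: fall back to the ".valid-length" work-around
            return null;
        }
    }
}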

However, from your second email, the more serious problem is using 'Buckets' with 
Hadoop 2.6. As far as I know, the `RecoverableWriter` used by 'Buckets' only 
supports Hadoop 2.7+, and I'm not sure whether a work-around exists.

Best
Yun Tang

From: Vishal Santoshi 
Sent: Friday, February 15, 2019 3:43
To: user
Subject: Re: StandAlone job on k8s fails with "Unknown method truncate" on 
restore

And yes, this cannot work with RollingFileSink for the Hadoop 2.6 build of 1.7.1 
because of this.


java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only 
supported for HDFS and for Hadoop version 2.7 or newer
at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:57)
at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:202)
at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.<init>(Buckets.java:112)


Any work around ?

On Thu, Feb 14, 2019 at 1:42 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
The job uses a RollingFileSink to push data to hdfs. Run an HA standalone 
cluster on k8s,

* get the job running
* kill the pod.

The k8s deployment relaunches the pod but fails with


java.io.IOException: Missing data in tmp file: 
hdfs://nn-crunchy:8020/tmp/kafka-to-hdfs/ls_kraken_events/dt=2019-02-14/evt=ad_fill/.part-2-16.inprogress.449e8668-e886-4f89-b5f6-45ac68e25987


Unknown method truncate called on 
org.apache.hadoop.hdfs.protocol.ClientProtocol protocol.


The file does exist. We work with Hadoop 2.6, which does not have truncate. The 
previous version would see that "truncate" was not supported, drop a length 
file for the .inprogress file, and rename it to a valid part file.



Is this a known issue ?


Regards.





Re: Window elements for certain period for delayed processing

2019-02-14 Thread Fabian Hueske
Hi,

I would not use a window for that.
Implementing the logic with a ProcessFunction seems more straight-forward.
The function simply collects all events between 00:00 and 01:00 in a
ListState and emits them when the time passes 01:00.
All other records are simply forwarded.

Best, Fabian
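
A minimal sketch of that idea (not from the original reply): it assumes event-time
timestamps in UTC epoch millis, uses String as the element type for brevity, and holds
back only the records whose timestamp falls between 00:00 and 01:00 of their day:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class HoldUntilOneAm extends KeyedProcessFunction<String, String, String> {

    private static final long ONE_DAY = 24 * 60 * 60 * 1000L;
    private static final long ONE_HOUR = 60 * 60 * 1000L;

    private transient ListState<String> buffered;

    @Override
    public void open(Configuration parameters) {
        buffered = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffered", String.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        long ts = ctx.timestamp();                 // assumes timestamps/watermarks are assigned upstream
        long startOfDay = ts - (ts % ONE_DAY);     // midnight (UTC) of the record's day
        long releaseTime = startOfDay + ONE_HOUR;  // 01:00 of the same day
        if (ts < releaseTime) {
            buffered.add(value);                   // hold back records from the 00:00-01:00 window
            ctx.timerService().registerEventTimeTimer(releaseTime);
        } else {
            out.collect(value);                    // everything else is forwarded immediately
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // fires once the watermark passes 01:00; assumes only one such window is in flight per key
        for (String held : buffered.get()) {
            out.collect(held);
        }
        buffered.clear();
    }
}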

On Fri, Feb 15, 2019 at 07:02, Congxian Qiu <
qcx978132...@gmail.com> wrote:

> Hi, simpleusr
>
> Maybe custom trigger[1] can be helpful.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/windows.html#triggers
>
> Best, Congxian
> On Feb 15, 2019, 13:15 +0800, simpleusr , wrote:
>
> Hi,
>
> My ultimate requirement is to stop processing of certain events between
> 00:00:00 and 01:00:00 for each day (Time is in HH:mm:SS format).
>
> I am flink newbie and I thought only option to delay elements is to collect
> them in a window between 00:00:00 and 01:00:00 for each day.
>
> TumblingEventTimeWindows for one hour seems the possible candidate but as I
> understand this window will be present for all times, i.e (startTime,
> startTime+1hour), (startTime+1hour, startTime+2hour), (startTime+2hour,
> startTime+3hour)...
>
> How will it be possible to start a window at 00:00:00 each day and stop the
> window at 01:00:00?
>
> Or is there any other option to keep elements in the job between 00:00:00
> and 01:00:00 and then start to process them after 01:00:00?
>
> Best regards
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>


Re: Incorrect Javadoc in CheckpointedFunction.java?

2019-02-14 Thread Congxian Qiu
Hi Chirag

I think the doc is outdated; the comment in CheckpointedFunction.java on master 
now [1] is `get the state data structure for the per-partition state`.

[1] 
https://github.com/apache/flink/blob/00fe8a01192a523544d3868360a924863a69d8f8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java#L83

Best, Congxian
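
For reference, a minimal self-contained sketch (not from this thread) of how the two
kinds of state are declared in initializeState(): the ReducingState is keyed (per-key)
state and therefore requires the function to run on a keyed stream, while the ListState
from the operator state store is per-partition (non-keyed) state:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.util.Collector;

public class CountFunction extends RichFlatMapFunction<String, Tuple2<String, Long>>
        implements CheckpointedFunction {

    private transient ReducingState<Long> countPerKey;      // keyed state: one value per key
    private transient ListState<Long> countPerPartition;    // operator state: one list per parallel subtask

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        countPerKey = context.getKeyedStateStore().getReducingState(
                new ReducingStateDescriptor<>("perKeyCount", (Long a, Long b) -> a + b, Long.class));
        countPerPartition = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("perPartitionCount", Long.class));
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) {
        // both state handles are snapshotted by Flink; nothing extra to do here
    }

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
        countPerKey.add(1L);
        countPerPartition.add(1L);
        out.collect(Tuple2.of(value, countPerKey.get()));
    }
}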
On Feb 15, 2019, 13:39 +0800, Chirag Dewan , wrote:
> Hi,
>
> I was going through the Javadoc for CheckpointedFunction.java, it says that:
>
> * // get the state data structure for the per-key state
> * countPerKey = context.getKeyedStateStore().getReducingState(
> * new ReducingStateDescriptor<>("perKeyCount", new 
> AddFunction<>(), Long.class));
> *
> * // get the state data structure for the per-key state
> * countPerPartition = 
> context.getOperatorStateStore().getOperatorState(
> * new ListStateDescriptor<>("perPartitionCount", Long.class));
>
> OperatorStateStore would not be per-key state data structure since it applies 
> to non-keyed state.
>
> Is my understanding correct here?
>
> Thanks,
>
> Chirag


Re: Window elements for certain period for delayed processing

2019-02-14 Thread Congxian Qiu
Hi, simpleusr

Maybe custom trigger[1] can be helpful.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/windows.html#triggers

Best, Congxian
On Feb 15, 2019, 13:15 +0800, simpleusr , wrote:
> Hi,
>
> My ultimate requirement is to stop processing of certain events between
> 00:00:00 and 01:00:00 for each day (Time is in HH:mm:SS format).
>
> I am flink newbie and I thought only option to delay elements is to collect
> them in a window between 00:00:00 and 01:00:00 for each day.
>
> TumblingEventTimeWindows for one hour seems the possible candidate but as I
> understand this window will be present for all times, i.e (startTime,
> startTime+1hour), (startTime+1hour, startTime+2hour), (startTime+2hour,
> startTime+3hour)...
>
> How will it be possible to start a window at 00:00:00 each day and stop the
> window at 01:00:00?
>
> Or is there any other option to keep elements in the job between 00:00:00
> and 01:00:00 and then start to process them after 01:00:00?
>
> Best regards
>
>
>
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Incorrect Javadoc in CheckpointedFunction.java?

2019-02-14 Thread Chirag Dewan
Hi,
I was going through the Javadoc for CheckpointedFunction.java, it says that:
* // get the state data structure for the per-key state
* countPerKey = context.getKeyedStateStore().getReducingState(
* new ReducingStateDescriptor<>("perKeyCount", new 
AddFunction<>(), Long.class));
*
* // get the state data structure for the per-key state
* countPerPartition = context.getOperatorStateStore().getOperatorState(
* new ListStateDescriptor<>("perPartitionCount", Long.class));

OperatorStateStore would not be per-key state data structure since it applies 
to non-keyed state. 
Is my understanding correct here?
Thanks,
Chirag

Window elements for certain period for delayed processing

2019-02-14 Thread simpleusr
Hi,

My ultimate requirement is to stop processing of certain events between
00:00:00 and 01:00:00 for each day (Time is in HH:mm:SS format).

I am flink newbie and I thought only option to delay elements is to collect
them in a window between 00:00:00 and 01:00:00 for each day. 

TumblingEventTimeWindows for one hour seems the possible candidate but as I
understand this window will be present for all times, i.e (startTime,
startTime+1hour), (startTime+1hour, startTime+2hour), (startTime+2hour,
startTime+3hour)...

How will it be possible to start a window at 00:00:00 each day and stop the
window at 01:00:00?

Or is there any other option to keep elements in the job between 00:00:00
and 01:00:00 and then start to process them after 01:00:00?

Best regards



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: [DISCUSS] Adding a mid-term roadmap to the Flink website

2019-02-14 Thread jincheng sun
Hi Stephan,

Thanks for the clarification! You are right, we have never initiated a
discussion about supporting OVER Window on DataStream; we can discuss it in
a separate thread. I agree with you to add the item once that discussion has
moved forward.

+1 for putting the roadmap on the website.
+1 for periodically update the roadmap, as mentioned by Fabian, we can
update it at every feature version release.

Thanks,
Jincheng

Stephan Ewen  wrote on Thu, Feb 14, 2019 at 5:44 PM:

> Thanks Jincheng and Rong Rong!
>
> I am not deciding a roadmap and making a call on what features should be
> developed or not. I was only collecting broader issues that are already
> happening or have an active FLIP/design discussion plus committer support.
>
> Do we have that for the suggested issues as well? If yes , we can add them
> (can you point me to the issue/mail-thread), if not, let's try and move the
> discussion forward and add them to the roadmap overview then.
>
> Best,
> Stephan
>
>
> On Wed, Feb 13, 2019 at 6:47 PM Rong Rong  wrote:
>
>> Thanks Stephan for the great proposal.
>>
>> This would not only be beneficial for new users but also for contributors
>> to keep track on all upcoming features.
>>
>> I think that better window operator support can also be separately grouped
>> into its own category, as it affects both the future DataStream API and batch
>> stream unification.
>> Can we also include:
>> - OVER aggregate for DataStream API separately as @jincheng suggested.
>> - Improving sliding window operator [1]
>>
>> One more additional suggestion, can we also include a more extendable
>> security module [2,3] @shuyi and I are currently working on?
>> This will significantly improve the usability for Flink in corporate
>> environments where proprietary or 3rd-party security integration is needed.
>>
>> Thanks,
>> Rong
>>
>>
>> [1]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Improvement-to-Flink-Window-Operator-with-Slicing-td25750.html
>> [2]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-security-improvements-td21068.html
>> [3]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-Kerberos-Improvement-td25983.html
>>
>>
>>
>>
>> On Wed, Feb 13, 2019 at 3:39 AM jincheng sun 
>> wrote:
>>
>>> Very excited and thank you for launching such a great discussion,
>>> Stephan !
>>>
>>> Here only a little suggestion that in the Batch Streaming Unification
>>> section, do we need to add an item:
>>>
>>> - Same window operators on bounded/unbounded Table API and DataStream
>>> API
>>> (currently OVER window only exists in SQL/TableAPI, DataStream API does
>>> not yet support)
>>>
>>> Best,
>>> Jincheng
>>>
>>> Stephan Ewen  wrote on Wed, Feb 13, 2019 at 7:21 PM:
>>>
 Hi all!

 Recently several contributors, committers, and users asked about making
 it more visible in which way the project is currently going.

 Users and developers can track the direction by following the
 discussion threads and JIRA, but due to the mass of discussions and open
 issues, it is very hard to get a good overall picture.
 Especially for new users and contributors, it is very hard to get a
 quick overview of the project direction.

 To fix this, I suggest to add a brief roadmap summary to the homepage.
 It is a bit of a commitment to keep that roadmap up to date, but I think
 the benefit for users justifies that.
 The Apache Beam project has added such a roadmap [1]
 , which was received very well by
 the community, I would suggest to follow a similar structure here.

 If the community is in favor of this, I would volunteer to write a
 first version of such a roadmap. The points I would include are below.

 Best,
 Stephan

 [1] https://beam.apache.org/roadmap/

 

 Disclaimer: Apache Flink is not governed or steered by any one single
 entity, but by its community and Project Management Committee (PMC). This
 is not an authoritative roadmap in the sense of a plan with a specific
 timeline. Instead, we share our vision for the future and major initiatives
 that are receiving attention and give users and contributors an
 understanding what they can look forward to.

 *Future Role of Table API and DataStream API*
   - Table API becomes first class citizen
   - Table API becomes primary API for analytics use cases
   * Declarative, automatic optimizations
   * No manual control over state and timers
   - DataStream API becomes primary API for applications and data
 pipeline use cases
   * Physical, user controls data types, no magic or optimizer
   * Explicit control over state and time

 *Batch Streaming Unification*
   - Table API unification (environments) (FLIP-32)
   - New unified source interface (FLIP-

Re: [DISCUSS] Adding a mid-term roadmap to the Flink website

2019-02-14 Thread zhijiang
Thanks Stephan for this proposal and I totally agree with it. 

It is very necessary to summarize the overall features/directions the community 
is going or planning to go. Although I check the mailing list almost every day, 
it still seems difficult to trace everything. In addition, I think this whole 
roadmap picture can also help expose the relationships among different items, 
and even avoid similar/duplicated thoughts or work.

Just one small suggestion: if we could add an existing link 
(jira/discussion/FLIP/google doc) for each listed item, then it would be easy 
to keep track of the items one is interested in and follow their progress.

Best,
Zhijiang
--
From:Jeff Zhang 
Send Time: Thursday, February 14, 2019, 18:03
To:Stephan Ewen 
Cc:dev ; user ; jincheng sun 
; Shuyi Chen ; Rong Rong 

Subject:Re: [DISCUSS] Adding a mid-term roadmap to the Flink website

Hi Stephan,

Thanks for this proposal. It is a good idea to track the roadmap. One
suggestion is that it might be better to put it onto a wiki page first,
because it is easier to update the roadmap on the wiki compared to the Flink
website. And I guess we may need to update the roadmap very often at the
beginning, as there are so many discussions and proposals in the community
recently. We can move it onto the Flink website later, once we feel it can be
nailed down.

Stephan Ewen  wrote on Thu, Feb 14, 2019 at 5:44 PM:

> Thanks Jincheng and Rong Rong!
>
> I am not deciding a roadmap and making a call on what features should be
> developed or not. I was only collecting broader issues that are already
> happening or have an active FLIP/design discussion plus committer support.
>
> Do we have that for the suggested issues as well? If yes , we can add them
> (can you point me to the issue/mail-thread), if not, let's try and move the
> discussion forward and add them to the roadmap overview then.
>
> Best,
> Stephan
>
>
> On Wed, Feb 13, 2019 at 6:47 PM Rong Rong  wrote:
>
>> Thanks Stephan for the great proposal.
>>
>> This would not only be beneficial for new users but also for contributors
>> to keep track on all upcoming features.
>>
>> I think that better window operator support can also be separately grouped
>> into its own category, as it affects both the future DataStream API and batch
>> stream unification.
>> Can we also include:
>> - OVER aggregate for DataStream API separately as @jincheng suggested.
>> - Improving sliding window operator [1]
>>
>> One more additional suggestion, can we also include a more extendable
>> security module [2,3] @shuyi and I are currently working on?
>> This will significantly improve the usability for Flink in corporate
>> environments where proprietary or 3rd-party security integration is needed.
>>
>> Thanks,
>> Rong
>>
>>
>> [1]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Improvement-to-Flink-Window-Operator-with-Slicing-td25750.html
>> [2]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-security-improvements-td21068.html
>> [3]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-Kerberos-Improvement-td25983.html
>>
>>
>>
>>
>> On Wed, Feb 13, 2019 at 3:39 AM jincheng sun 
>> wrote:
>>
>>> Very excited and thank you for launching such a great discussion,
>>> Stephan !
>>>
>>> Here only a little suggestion that in the Batch Streaming Unification
>>> section, do we need to add an item:
>>>
>>> - Same window operators on bounded/unbounded Table API and DataStream
>>> API
>>> (currently OVER window only exists in SQL/TableAPI, DataStream API does
>>> not yet support)
>>>
>>> Best,
>>> Jincheng
>>>
>>> Stephan Ewen  wrote on Wed, Feb 13, 2019 at 7:21 PM:
>>>
 Hi all!

 Recently several contributors, committers, and users asked about making
 it more visible in which way the project is currently going.

 Users and developers can track the direction by following the
 discussion threads and JIRA, but due to the mass of discussions and open
 issues, it is very hard to get a good overall picture.
 Especially for new users and contributors, it is very hard to get a
 quick overview of the project direction.

 To fix this, I suggest to add a brief roadmap summary to the homepage.
 It is a bit of a commitment to keep that roadmap up to date, but I think
 the benefit for users justifies that.
 The Apache Beam project has added such a roadmap [1]
 , which was received very well by
 the community, I would suggest to follow a similar structure here.

 If the community is in favor of this, I would volunteer to write a
 first version of such a roadmap. The points I would include are below.

 Best,
 Stephan

 [1] https://beam.apache.org/roadmap/

 

 Disclaimer: Apache Flink is not governed or steered by any one single
 entity, but by i

Re: Linkage error when using DropwizardMeterWrapper

2019-02-14 Thread shkob1
Hey Jayant. I'm getting the same with Gradle; my metrics reporter and my
application both use the flink-metrics-dropwizard dependency for reporting
Meters. How should I solve it?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: ElasticSearchSink - retrying doesn't work in ActionRequestFailureHandler

2019-02-14 Thread Averell
Thank you Gordon and Ken.

My Flink job is now running well with 1.7.2 RC1, with failed ES requests
retried successfully.

One more question I have on this is how to limit the number of retries for
different types of errors with ES bulk request. Is there any guideline on
that?

My temporary solution is to use the version field of each ES request -
incrementing it every time I retry putting the request back into the queue.
This has worked for me so far, but it doesn't look right.

Thanks and regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


[ANNOUNCEMENT] March 2019 Bay Area Apache Flink Meetup

2019-02-14 Thread Xuefu Zhang
Hi all,

I'm very excited to announce that the community is planning the next meetup
in Bay Area on March 25, 2019. The event is just announced on Meetup.com
[1].

To make the event successful, your participation and help will be needed.
Currently, we are looking for an organization that can host the event.
Please let me know if you have any leads.

Secondly, we encourage Flink users and developers to take this as an
opportunity to share their experience or work. Thus, please let me know if
you would like to give a short talk.

I look forward to meeting you all in the Meetup.

Regards,
Xuefu

[1] https://www.meetup.com/Bay-Area-Apache-Flink-Meetup/events/258975465


Re: Impact of occasional big pauses in stream processing

2019-02-14 Thread Rong Rong
Hi Ajay,

Yes, Andrey is right. I was actually missing the first basic but important
point: if your process function is stuck, it will immediately block that
thread.
From your description, it sounds like not all the messages you consume from
Kafka actually trigger the processing logic. There should be plenty of ways
to avoid over-provisioning your job just to satisfy your peak traffic: for
example, what Andrey suggested, using an async RPC call to some other
resource for the heavy computation; or splitting it into a filter (which
removes non-actionable messages) and the actual process (where you can use
a higher parallelism to reduce the chance of getting stuck).

Regarding the second question, I am not an expert, but my understanding is
that there should be isolation between the Flink jobs you run on that session
cluster, e.g. one job's backpressure will not affect another job's consumers.
I've CCed Till, who might be able to better answer your question.

--
Rong
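
A minimal sketch of the AsyncIO approach Andrey linked (not from the original message;
the element types are simplified to String and the expensive call is stubbed out). Keep
in mind that an async operator allows several requests in flight per subtask, so it
relaxes the strictly serial per-tenant execution discussed in this thread:

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class ExpensiveWorkFunction extends RichAsyncFunction<String, String> {

    private transient ExecutorService executor;

    @Override
    public void open(Configuration parameters) {
        executor = Executors.newFixedThreadPool(10);   // the blocking work runs here, not on the task thread
    }

    @Override
    public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
        CompletableFuture
                .supplyAsync(() -> doExpensiveWork(input), executor)
                .thenAccept(result -> resultFuture.complete(Collections.singleton(result)));
    }

    @Override
    public void close() {
        executor.shutdown();
    }

    private String doExpensiveWork(String input) {
        // stand-in for the long-running, per-tenant work described in this thread
        return input.toUpperCase();
    }
}

// Usage sketch (ordered output, at most 100 requests in flight, 10-minute timeout):
// DataStream<String> results = AsyncDataStream.orderedWait(
//         inputStream, new ExpensiveWorkFunction(), 10, TimeUnit.MINUTES, 100);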


On Thu, Feb 14, 2019 at 8:24 AM Aggarwal, Ajay 
wrote:

> Thank you Rong and Andrey. The blog and your explanation was very useful.
>
>
>
> In my use case, source stream (kafka based) contains messages that capture
> some “work” that needs to be done for a tenant.  It’s a multi-tenant source
> stream. I need to queue up (and execute) this work per tenant in the order
> in which it was produced. And flink provides this ordered queuing per
> tenant very elegantly. Now the only thing is that executing this “work”
> could be expensive in terms of compute/memory/time.  Furthermore per tenant
> there is a constraint of doing this work serially. Hence this question.  I
> believe if our flink cluster has enough resources, it should work.
>
>
>
> But this leads to another related question. If there are multiple flink
> jobs sharing the same flink cluster and one of those jobs sees the spike
> such that back pressure builds up all the way to the source, will that
> impact other jobs as well? Is a task slot shared by multiple jobs? If not,
> my understanding is that this should not impact other flink jobs. Is that
> correct?
>
>
>
> Thanks.
>
>
>
> Ajay
>
>
>
> *From: *Andrey Zagrebin 
> *Date: *Thursday, February 14, 2019 at 5:09 AM
> *To: *Rong Rong 
> *Cc: *"Aggarwal, Ajay" , "user@flink.apache.org"
> 
> *Subject: *Re: Impact of occasional big pauses in stream processing
>
>
>
> Hi Ajay,
>
>
>
> Technically, it will immediately block the thread of
> MyKeyedProcessFunction subtask scheduled to some slot and basically block
> processing of the key range assigned to this subtask.
> Practically, I agree with Rong's answer. Depending on the topology of your
> inputStream, it can eventually block a lot of stuff.
> In general, I think, it is not recommended to perform blocking operations
> in process record functions. You could consider AsyncIO [1] to unblock the
> task thread.
>
> Best,
>
> Andrey
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html
>
>
>
> On Thu, Feb 14, 2019 at 6:03 AM Rong Rong  wrote:
>
> Hi Ajay,
>
>
>
> Flink handles "backpressure" in a graceful way so that it doesn't get
> affected when your processing pipeline is occasionally slowed down.
>
> I think the following articles will help [1,2].
>
>
>
> In your specific case: the "KeyBy" operation will re-hash data so they can
> be reshuffled from all input consumers to all your process operators (in
> this case the MyKeyedProcessFunction). If one of the process operator is
> backpressured, it will back track all the way to the source.
>
> So, my understanding is that: since there's the reshuffling, if one of the
> process function is backpressured, it will potentially affect all the
> source operators.
>
>
>
> Thanks,
>
> Rong
>
>
>
> [1] https://www.ververica.com/blog/how-flink-handles-backpressure
>
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/back_pressure.html
>
>
>
> On Wed, Feb 13, 2019 at 8:50 AM Aggarwal, Ajay 
> wrote:
>
> I was wondering what is the impact if one of the stream operator function
> occasionally takes too long to process the event.  Given the following
> simple flink job
>
>
>
>inputStream
>
>   .KeyBy (“tenantId”)
>
>   .process ( new MyKeyedProcessFunction())
>
>
>
> , if occasionally MyKeyedProcessFunction takes too long (say ~5-10
> minutes) to process an incoming element, what is the impact on overall
> pipeline? Is the impact limited to
>
>1. Specific key for which MyKeyedProcessFunction is currently taking
>too long to process an element, or
>2. Specific Taskslot, where MyKeyedProcessFunction is currently taking
>too long to process an element, i.e. impacting multiple keys, or
>3. Entire inputstream ?
>
>
>
> Also what is the built in resiliency in these cases? Is there a concept of
> timeout for each operator function?
>
>
>
> Ajay
>
>


Re: StandAlone job on k8s fails with "Unknown method truncate" on restore

2019-02-14 Thread Vishal Santoshi
And yes, this cannot work with RollingFileSink for the Hadoop 2.6 build of 1.7.1
because of this.

java.lang.UnsupportedOperationException: Recoverable writers on Hadoop
are only supported for HDFS and for Hadoop version 2.7 or newer
at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:57)
at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:202)
at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.<init>(Buckets.java:112)


Any work around ?


On Thu, Feb 14, 2019 at 1:42 PM Vishal Santoshi 
wrote:

> The job uses a RollingFileSink to push data to hdfs. Run an HA standalone
> cluster on k8s,
>
> * get the job running
> * kill the pod.
>
> The k8s deployment relaunches the pod but fails with
>
> java.io.IOException: Missing data in tmp file:
> hdfs://nn-crunchy:8020/tmp/kafka-to-hdfs/ls_kraken_events/dt=2019-02-14/evt=ad_fill/.part-2-16.inprogress.449e8668-e886-4f89-b5f6-45ac68e25987
>
>
> Unknown method truncate called on
> org.apache.hadoop.hdfs.protocol.ClientProtocol protocol.
>
>
> The file does exist. We work with Hadoop 2.6, which does not have
> truncate. The previous version would see that "truncate" was not supported
> and drop a length file for the .inprogress file and rename it to a valid
> part file.
>
>
>
> Is this a known issue ?
>
>
> Regards.
>
>
>
>
>


StandAlone job on k8s fails with "Unknown method truncate" on restore

2019-02-14 Thread Vishal Santoshi
The job uses a RollingFileSink to push data to hdfs. Run an HA standalone
cluster on k8s,

* get the job running
* kill the pod.

The k8s deployment relaunches the pod but fails with

java.io.IOException: Missing data in tmp file:
hdfs://nn-crunchy:8020/tmp/kafka-to-hdfs/ls_kraken_events/dt=2019-02-14/evt=ad_fill/.part-2-16.inprogress.449e8668-e886-4f89-b5f6-45ac68e25987


Unknown method truncate called on
org.apache.hadoop.hdfs.protocol.ClientProtocol protocol.


The file does exist. We work with Hadoop 2.6, which does not have truncate.
The previous version would see that "truncate" was not supported and drop a
length file for the .inprogress file and rename it to a valid part file.



Is this a known issue ?


Regards.


Re: Flink 1.6 Yarn Session behavior

2019-02-14 Thread Jins George
Thanks Gary. Understood the behavior.

I am leaning towards running 7 TMs on each machine (8 cores). I have 4 nodes, so that 
will end up as 28 taskmanagers and 1 jobmanager. I was wondering if this could put an 
additional burden on the jobmanager? Is it recommended?

Thanks,

Jins George

On 2/14/19 8:49 AM, Gary Yao wrote:
Hi Jins George,

This has been asked before [1]. The bottom line is that you currently cannot
pre-allocate TMs and distribute your tasks evenly. You might be able to
achieve a better distribution across hosts by configuring fewer slots in your
TMs.

Best,
Gary

[1] 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-1-5-job-distribution-over-cluster-nodes-td21588.html


On Wed, Feb 13, 2019 at 6:20 AM Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
Hi,

I'm forwarding this question to Gary (CC'ed), who most likely would have an 
answer for your question here.

Cheers,
Gordon

On Wed, Feb 13, 2019 at 8:33 AM Jins George <jins.geo...@aeris.net> wrote:

Hello community,

I am trying to  upgrade a  Flink Yarn session cluster running BEAM pipelines  
from version 1.2.0 to 1.6.3.

Here is my session start command: yarn-session.sh -d -n 4  -jm 1024 -tm 3072 -s 
7

Because of the dynamic resource allocation, no taskmanager gets created 
initially. Now once I submit a job with parallelism 5, I see that 1 
taskmanager gets created and all 5 parallel instances are scheduled on the 
same taskmanager (because I have 7 slots). This can create a hot spot, as only 
one physical node (out of 4 in my case) is utilized for processing.

I noticed the legacy mode, which would provision all task managers at cluster 
creation, but since legacy mode is expected to go away soon, I didn't want to 
try that route.

Is there a way I can configure multiple jobs, or parallel instances of the same 
job, to spread across all the available YARN nodes while continuing to use the 
'new' mode?

Thanks,

Jins George


Re: No resource available error while testing HA

2019-02-14 Thread Gary Yao
Hi Averell,

The TM containers fetch the Flink binaries and config files from HDFS (or
another DFS if configured) [1]. I think you should be able to change the log
level by patching the logback configuration in HDFS, and kill all Flink
containers on all hosts. If you are running an HA setup, your cluster should
be running with the new logback configuration afterwards.

Best,
Gary

[1]
https://github.com/apache/flink/blob/02ff4bfe90d8e8b896c9f1a1bdbe8d43a48f5de7/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java#L691

On Wed, Feb 13, 2019 at 12:44 PM Averell  wrote:

> Hi Gary,
>
> Thanks for the suggestion.
>
> How about changing the configuration of the Flink job itself during
> runtime?
> What I have to do now is to take a savepoint, stop the job, change the
> configuration, and then restore the job from the save point.
>
> Is there any easier way to do that?
>
> Thanks and regards,
> Averell
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Flink 1.6 Yarn Session behavior

2019-02-14 Thread Gary Yao
Hi Jins George,

This has been asked before [1]. The bottom line is that you currently cannot
pre-allocate TMs and distribute your tasks evenly. You might be able to
achieve a better distribution across hosts by configuring fewer slots in
your
TMs.

Best,
Gary

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-1-5-job-distribution-over-cluster-nodes-td21588.html


On Wed, Feb 13, 2019 at 6:20 AM Tzu-Li (Gordon) Tai 
wrote:

> Hi,
>
> I'm forwarding this question to Gary (CC'ed), who most likely would have
> an answer for your question here.
>
> Cheers,
> Gordon
>
> On Wed, Feb 13, 2019 at 8:33 AM Jins George  wrote:
>
>> Hello community,
>>
>> I am trying to  upgrade a  Flink Yarn session cluster running BEAM
>> pipelines  from version 1.2.0 to 1.6.3.
>>
>> Here is my session start command: yarn-session.sh -d *-n 4*  -jm 1024
>> -tm 3072 *-s 7*
>>
>> Because of the dynamic resource allocation,  no taskmanager gets created
>> initially. Now once I submit a job with parallelism 5, I see that 1
>> task-manager gets created and all 5 parallel instances are scheduled on the
>> same taskmanager( because I have 7 slots).  This can create hot spot as
>> only one physical node ( out of 4 in my case) is utilized for processing.
>>
>> I noticed the legacy mode, which would provision all task managers at
>> cluster creation, but since legacy mode is expected to go away soon, I
>> didn't want to try that route.
>>
>> Is there a way I can configure the multiple jobs or parallel instances of
>> same job spread across all the available Yarn nodes and continue using the
>> 'new' mode ?
>>
>> Thanks,
>>
>> Jins George
>>
>


Re: [DISCUSS] Adding a mid-term roadmap to the Flink website

2019-02-14 Thread Rong Rong
Hi Stephan,

Thanks for the clarification, yes, I think these issues have already been
discussed in previous mailing list threads [1,2,3].

I also agree that updating the "official" roadmap every release is a very
good idea to avoid frequent updates.
One point I might've been a bit confused about is: are we suggesting to keep
one roadmap on the documentation site (e.g. [4]) per release, or simply
just one most up-to-date roadmap on the main website [5]?
Just like the release notes in every release, the former would, I am assuming,
also give users a good way to look back at previous roadmaps.

Thanks,
Rong

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Improvement-to-Flink-Window-Operator-with-Slicing-td25750.html
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-security-improvements-td21068.html
[3]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-Kerberos-Improvement-td25983.html

[4] https://ci.apache.org/projects/flink/flink-docs-release-1.7/
[5] https://flink.apache.org/

On Thu, Feb 14, 2019 at 2:26 AM Stephan Ewen  wrote:

> I think the website is better as well.
>
> I agree with Fabian that the wiki is not so visible, and visibility is the
> main motivation.
> This type of roadmap overview would not be updated by everyone - letting
> committers update the roadmap means the listed threads are actually
> happening at the moment.
>
>
> On Thu, Feb 14, 2019 at 11:14 AM Fabian Hueske  wrote:
>
>> Hi,
>>
>> I like the idea of putting the roadmap on the website because it is much
>> more visible (and IMO more credible, obligatory) there.
>> However, I share the concerns about frequent updates.
>>
>> I think it would be great to update the "official" roadmap on the
>> website once per release (excluding bugfix releases), i.e., every three months.
>> We can use the wiki to collect and draft the roadmap for the next update.
>>
>> Best, Fabian
>>
>>
>> On Thu, Feb 14, 2019 at 11:03 AM, Jeff Zhang  wrote:
>>
>>> Hi Stephan,
>>>
>>> Thanks for this proposal. It is a good idea to track the roadmap. One
>>> suggestion is that it might be better to put it into wiki page first.
>>> Because it is easier to update the roadmap on wiki compared to on flink web
>>> site. And I guess we may need to update the roadmap very often at the
>>> beginning as there's so many discussions and proposals in community
>>> recently. We can move it into flink web site later when we feel it could be
>>> nailed down.
>>>
>>> Stephan Ewen  wrote on Thu, Feb 14, 2019 at 5:44 PM:
>>>
 Thanks Jincheng and Rong Rong!

 I am not deciding a roadmap and making a call on what features should
 be developed or not. I was only collecting broader issues that are already
 happening or have an active FLIP/design discussion plus committer support.

 Do we have that for the suggested issues as well? If yes , we can add
 them (can you point me to the issue/mail-thread), if not, let's try and
 move the discussion forward and add them to the roadmap overview then.

 Best,
 Stephan


 On Wed, Feb 13, 2019 at 6:47 PM Rong Rong  wrote:

> Thanks Stephan for the great proposal.
>
> This would not only be beneficial for new users but also for
> contributors to keep track on all upcoming features.
>
> I think that better window operator support can also be separately
> grouped into its own category, as it affects both the future DataStream API
> and batch stream unification.
> Can we also include:
> - OVER aggregate for DataStream API separately as @jincheng suggested.
> - Improving sliding window operator [1]
>
> One more additional suggestion, can we also include a more extendable
> security module [2,3] @shuyi and I are currently working on?
> This will significantly improve the usability for Flink in corporate
> environments where proprietary or 3rd-party security integration is 
> needed.
>
> Thanks,
> Rong
>
>
> [1]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Improvement-to-Flink-Window-Operator-with-Slicing-td25750.html
> [2]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-security-improvements-td21068.html
> [3]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-Kerberos-Improvement-td25983.html
>
>
>
>
> On Wed, Feb 13, 2019 at 3:39 AM jincheng sun 
> wrote:
>
>> Very excited and thank you for launching such a great discussion,
>> Stephan !
>>
>> Here only a little suggestion that in the Batch Streaming Unification
>> section, do we need to add an item:
>>
>> - Same window operators on bounded/unbounded Table API and DataStream
>> API
>> (currently OVER window only exists in SQL/TableAPI, DataStream API
>> does not yet support)
>>
>> Be

Re: long lived standalone job session cluster in kubernetes

2019-02-14 Thread Heath Albritton
My team and I are keen to help out with testing and review as soon as there is 
a pull request.

-H

> On Feb 11, 2019, at 00:26, Till Rohrmann  wrote:
> 
> Hi Heath,
> 
> I just learned that people from Alibaba already made some good progress with 
> FLINK-9953. I'm currently talking to them in order to see how we can merge 
> this contribution into Flink as fast as possible. Since I'm quite busy due to 
> the upcoming release I hope that other community members will help out with 
> the reviewing once the PRs are opened.
> 
> Cheers,
> Till
> 
>> On Fri, Feb 8, 2019 at 8:50 PM Heath Albritton  wrote:
>> Has any progress been made on this?  There are a number of folks in
>> the community looking to help out.
>> 
>> 
>> -H
>> 
>> On Wed, Dec 5, 2018 at 10:00 AM Till Rohrmann  wrote:
>> >
>> > Hi Derek,
>> >
>> > there is this issue [1] which tracks the active Kubernetes integration. 
>> > Jin Sun already started implementing some parts of it. There should also 
>> > be some PRs open for it. Please check them out.
>> >
>> > [1] https://issues.apache.org/jira/browse/FLINK-9953
>> >
>> > Cheers,
>> > Till
>> >
>> > On Wed, Dec 5, 2018 at 6:39 PM Derek VerLee  wrote:
>> >>
>> >> Sounds good.
>> >>
>> >> Is someone working on this automation today?
>> >>
>> >> If not, although my time is tight, I may be able to work on a PR for 
>> >> getting us started down the path Kubernetes native cluster mode.
>> >>
>> >>
>> >> On 12/4/18 5:35 AM, Till Rohrmann wrote:
>> >>
>> >> Hi Derek,
>> >>
>> >> what I would recommend to use is to trigger the cancel with savepoint 
>> >> command [1]. This will create a savepoint and terminate the job 
>> >> execution. Next you simply need to respawn the job cluster which you 
>> >> provide with the savepoint to resume from.
>> >>
>> >> [1] 
>> >> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#cancel-job-with-savepoint
>> >>
>> >> Cheers,
>> >> Till
>> >>
>> >> On Tue, Dec 4, 2018 at 10:30 AM Andrey Zagrebin 
>> >>  wrote:
>> >>>
>> >>> Hi Derek,
>> >>>
>> >>> I think your automation steps look good.
>> >>> Recreating deployments should not take long
>> >>> and as you mention, this way you can avoid unpredictable old/new version 
>> >>> collisions.
>> >>>
>> >>> Best,
>> >>> Andrey
>> >>>
>> >>> > On 4 Dec 2018, at 10:22, Dawid Wysakowicz  
>> >>> > wrote:
>> >>> >
>> >>> > Hi Derek,
>> >>> >
>> >>> > I am not an expert in kubernetes, so I will cc Till, who should be able
>> >>> > to help you more.
>> >>> >
>> >>> > As for the automation for similar process I would recommend having a
>> >>> > look at dA platform[1] which is built on top of kubernetes.
>> >>> >
>> >>> > Best,
>> >>> >
>> >>> > Dawid
>> >>> >
>> >>> > [1] https://data-artisans.com/platform-overview
>> >>> >
>> >>> > On 30/11/2018 02:10, Derek VerLee wrote:
>> >>> >>
>> >>> >> I'm looking at the job cluster mode, it looks great and I and
>> >>> >> considering migrating our jobs off our "legacy" session cluster and
>> >>> >> into Kubernetes.
>> >>> >>
>> >>> >> I do need to ask some questions because I haven't found a lot of
>> >>> >> details in the documentation about how it works yet, and I gave up
>> >>> >> following the the DI around in the code after a while.
>> >>> >>
>> >>> >> Let's say I have a deployment for the job "leader" in HA with ZK, and
>> >>> >> another deployment for the taskmanagers.
>> >>> >>
>> >>> >> I want to upgrade the code or configuration and start from a
>> >>> >> savepoint, in an automated way.
>> >>> >>
>> >>> >> Best I can figure, I can not just update the deployment resources in
>> >>> >> kubernetes and allow the containers to restart in an arbitrary order.
>> >>> >>
>> >>> >> Instead, I expect sequencing is important, something along the lines
>> >>> >> of this:
>> >>> >>
>> >>> >> 1. issue savepoint command on leader
>> >>> >> 2. wait for savepoint
>> >>> >> 3. destroy all leader and taskmanager containers
>> >>> >> 4. deploy new leader, with savepoint url
>> >>> >> 5. deploy new taskmanagers
>> >>> >>
>> >>> >>
>> >>> >> For example, I imagine old taskmanagers (with an old version of my
>> >>> >> job) attaching to the new leader and causing a problem.
>> >>> >>
>> >>> >> Does that sound right, or am I overthinking it?
>> >>> >>
>> >>> >> If not, has anyone tried implementing any automation for this yet?
>> >>> >>
>> >>> >
>> >>>


Synchronize reading from two Kafka Topics of different size

2019-02-14 Thread Olle Noren
Hi,

We have a Flink job where we are trying to window-join two datastreams 
originating from two different Kafka topics, where one topic contains a lot 
more data per time instance than the other one.
We use event time processing, and this all works fine when running our pipeline 
live, i.e. data is consumed and processed as soon as it is ingested in Kafka.

The problem occurs, though, in the scenario where we are replaying data 
stored in Kafka: then the watermarks of the "larger-stream" lag behind those of 
the "smaller-stream", since the smaller stream has less data per time unit and 
therefore advances faster.
This leads to a large state at the join operation, since data from the 
"smaller-stream" needs to be kept until the corresponding watermarks from the 
"larger-stream" have passed.
To avoid a very large state at the join operator, we have tried to increase the 
parallelism of the consumer of the "larger-stream" to make it keep up with 
the "smaller-stream"; this decreases the size of the state to some extent. This 
seems like an ugly way to get around the problem, though, and will not work if 
the sizes of the two Kafka topics change over time.

Is there any way we can synchronize the reading of the Kafka sources based on 
the watermarks we have in the two streams, i.e. to pause the reading of the 
“smaller-topic” until the “larger-stream” has caught up? Any other ideas how to 
handle this replay-scenario?

Thanks in advance

Olle


Olle Noren
Systems Engineer
Fleet Perception for Maintenance
NIRA Dynamics AB
Wallenbergs gata 4
58330 Linköping
Sweden  Mobile: +46 709 748 304
olle.no...@niradynamics.se
www.niradynamics.se
Together for smarter safety



Re: Impact of occasional big pauses in stream processing

2019-02-14 Thread Aggarwal, Ajay
Thank you Rong and Andrey. The blog and your explanation was very useful.

In my use case, source stream (kafka based) contains messages that capture some 
“work” that needs to be done for a tenant.  It’s a multi-tenant source stream. 
I need to queue up (and execute) this work per tenant in the order in which it 
was produced. And flink provides this ordered queuing per tenant very 
elegantly. Now the only thing is that executing this “work” could be expensive 
in terms of compute/memory/time.  Furthermore per tenant there is a constraint 
of doing this work serially. Hence this question.  I believe if our flink 
cluster has enough resources, it should work.

But this leads to another related question. If there are multiple flink jobs 
sharing the same flink cluster and one of those jobs sees the spike such that 
back pressure builds up all the way to the source, will that impact other jobs 
as well? Is a task slot shared by multiple jobs? If not, my understanding is 
that this should not impact other flink jobs. Is that correct?

Thanks.

Ajay

From: Andrey Zagrebin 
Date: Thursday, February 14, 2019 at 5:09 AM
To: Rong Rong 
Cc: "Aggarwal, Ajay" , "user@flink.apache.org" 

Subject: Re: Impact of occasional big pauses in stream processing

Hi Ajay,

Technically, it will immediately block the thread of MyKeyedProcessFunction 
subtask scheduled to some slot and basically block processing of the key range 
assigned to this subtask.
Practically, I agree with Rong's answer. Depending on the topology of your 
inputStream, it can eventually block a lot of stuff.
In general, I think, it is not recommended to perform blocking operations in 
process record functions. You could consider AsyncIO [1] to unblock the task 
thread.

Best,
Andrey

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html

On Thu, Feb 14, 2019 at 6:03 AM Rong Rong <walter...@gmail.com> wrote:
Hi Ajay,

Flink handles "backpressure" in a graceful way so that it doesn't get affected 
when your processing pipeline is occasionally slowed down.
I think the following articles will help [1,2].

In your specific case: the "KeyBy" operation will re-hash data so they can be 
reshuffled from all input consumers to all your process operators (in this case 
the MyKeyedProcessFunction). If one of the process operator is backpressured, 
it will back track all the way to the source.
So, my understanding is that: since there's the reshuffling, if one of the 
process function is backpressured, it will potentially affect all the source 
operators.

Thanks,
Rong

[1] https://www.ververica.com/blog/how-flink-handles-backpressure
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/back_pressure.html

On Wed, Feb 13, 2019 at 8:50 AM Aggarwal, Ajay <ajay.aggar...@netapp.com> wrote:
I was wondering what is the impact if one of the stream operator function 
occasionally takes too long to process the event.  Given the following simple 
flink job

   inputStream
      .keyBy("tenantId")
      .process(new MyKeyedProcessFunction())

, if occasionally MyKeyedProcessFunction takes too long (say ~5-10 minutes) to 
process an incoming element, what is the impact on overall pipeline? Is the 
impact limited to

  1.  Specific key for which MyKeyedProcessFunction is currently taking too 
long to process an element, or
  2.  Specific Taskslot, where MyKeyedProcessFunction is currently taking too 
long to process an element, i.e. impacting multiple keys, or
  3.  Entire inputstream ?

Also what is the built in resiliency in these cases? Is there a concept of 
timeout for each operator function?

Ajay


Re: TaskManager gets confused after the JobManager restarts

2019-02-14 Thread Ethan Li
The related job manager log is 
https://gist.github.com/Ethanlm/86a10e786ad9025ddaa27c113c536da8

> On Feb 14, 2019, at 9:40 AM, Ethan Li  wrote:
> 
> Hello,
> 
> I have a standalone flink-1.4.2 cluster with one JobManager, one TaskManager, 
> and zookeeper.  I first started JM and TM and waited for them to be stable. 
> Then I restarted JM. It’s when the TM got confused.
> 
> TM got notified that the Leader node had changed and it tried to register with the 
> new Leader (the new rpc port is 34561). Then it got an acknowledgement saying it's 
> already registered. And it then kept trying to associate with the old JM rpc 
> port (35213) and failing.
> 
> 2019-02-14 14:56:54,059 INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Trying to register at JobManager akka.ssl.tcp://fl...@openstorm10blue-n1.blue.ygrid.yahoo.com:34561/user/jobmanager (attempt 1, timeout: 500 milliseconds)
> 2019-02-14 14:56:54,157 DEBUG org.apache.flink.shaded.akka.org.jboss.netty.handler.ssl.SslHandler  - [id: 0x77ac93ae, /10.215.68.243:46796 => openstorm10blue-n1.blue.ygrid.yahoo.com/10.215.68.98:34561] HANDSHAKEN: TLS_RSA_WITH_AES_128_CBC_SHA
> 2019-02-14 14:56:54,276 INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Successful registration at JobManager (akka.ssl.tcp://fl...@openstorm10blue-n1.blue.ygrid.yahoo.com:34561/user/jobmanager), starting network stack and library cache.
> 2019-02-14 14:56:54,276 INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Determined BLOB server address to be openstorm10blue-n1.blue.ygrid.yahoo.com/10.215.68.98:50100. Starting BLOB cache.
> 2019-02-14 14:56:54,278 INFO  org.apache.flink.runtime.blob.PermanentBlobCache  - Created BLOB cache storage directory /home/y/var/flink/blobstorage/blobStore-927b523f-f3ff-4ccc-83a0-362e09a3b858
> 2019-02-14 14:56:54,279 INFO  org.apache.flink.runtime.blob.TransientBlobCache  - Created BLOB cache storage directory /home/y/var/flink/blobstorage/blobStore-8492465e-0e94-4792-a346-66e6da299f7a
> 2019-02-14 14:56:54,572 DEBUG org.apache.flink.runtime.taskmanager.TaskManager  - TaskManager was triggered to register at JobManager, but is already registered
> 2019-02-14 14:56:56,359 WARN  akka.remote.transport.netty.NettyTransport  - Remote connection to [null] failed with java.net.ConnectException: Connection refused: openstorm10blue-n1.blue.ygrid.yahoo.com/10.215.68.98:35213
> 2019-02-14 14:56:56,360 DEBUG org.apache.flink.runtime.taskmanager.TaskManager  - The association error event's root cause is not of type InvalidAssociationException.
> 
> 
> 
> Full TaskManager log:
> https://gist.github.com/Ethanlm/e6f1b29d27d26813f5f8f40cd2c12643 
> 
> 
> 
> Is this expected or is this a bug? 
> 
> Thank you!
> 
> Ethan



TaskManager gets confused after the JobManager restarts

2019-02-14 Thread Ethan Li
Hello,

I have a standalone flink-1.4.2 cluster with one JobManager, one TaskManager, 
and zookeeper.  I first started JM and TM and waited for them to be stable. 
Then I restarted JM. That's when the TM got confused.

TM got notified that the Leader node had changed and it tried to register with the 
new Leader (the new rpc port is 34561). Then it got an acknowledgement saying it's 
already registered. And it then kept trying to associate with the old JM rpc port 
(35213) and failing.

2019-02-14 14:56:54,059 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
- Trying to register at JobManager 
akka.ssl.tcp://fl...@openstorm10blue-n1.blue.ygrid.yahoo.com:34561/user/jobmanager
 (attempt 1, timeout: 500 milliseconds)
2019-02-14 14:56:54,157 DEBUG 
org.apache.flink.shaded.akka.org.jboss.netty.handler.ssl.SslHandler  - [id: 
0x77ac93ae, /10.215.68.243:46796 => 
openstorm10blue-n1.blue.ygrid.yahoo.com/10.215.68.98:34561] HANDSHAKEN: 
TLS_RSA_WITH_AES_128_CBC_SHA
2019-02-14 14:56:54,276 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
- Successful registration at JobManager 
(akka.ssl.tcp://fl...@openstorm10blue-n1.blue.ygrid.yahoo.com:34561/user/jobmanager),
 starting network stack and library cache.
2019-02-14 14:56:54,276 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
- Determined BLOB server address to be 
openstorm10blue-n1.blue.ygrid.yahoo.com/10.215.68.98:50100. Starting BLOB cache.
2019-02-14 14:56:54,278 INFO  org.apache.flink.runtime.blob.PermanentBlobCache  
- Created BLOB cache storage directory 
/home/y/var/flink/blobstorage/blobStore-927b523f-f3ff-4ccc-83a0-362e09a3b858
2019-02-14 14:56:54,279 INFO  org.apache.flink.runtime.blob.TransientBlobCache  
- Created BLOB cache storage directory 
/home/y/var/flink/blobstorage/blobStore-8492465e-0e94-4792-a346-66e6da299f7a
2019-02-14 14:56:54,572 DEBUG org.apache.flink.runtime.taskmanager.TaskManager  
- TaskManager was triggered to register at JobManager, but is 
already registered
2019-02-14 14:56:56,359 WARN  akka.remote.transport.netty.NettyTransport
- Remote connection to [null] failed with 
java.net.ConnectException: Connection refused: 
openstorm10blue-n1.blue.ygrid.yahoo.com/10.215.68.98:35213
2019-02-14 14:56:56,360 DEBUG org.apache.flink.runtime.taskmanager.TaskManager  
- The association error event's root cause is not of type 
InvalidAssociationException.



Full TaskManager log:
https://gist.github.com/Ethanlm/e6f1b29d27d26813f5f8f40cd2c12643 



Is this expected or is this a bug? 

Thank you!

Ethan

Re: How to register TypeInfoFactory for 'external' class

2019-02-14 Thread Alexey Trenikhun
Hi Gordon,
This class is used for states, as in/out parameters, and as a key. As you wrote, 
there is no problem with its usage in states - I just specify the TypeInformation 
in the descriptor. For the return value of a process function, I tried
.process(new MyProcessFunction())
.returns(MyTypeInformation); it works, but still gives a warning, because 
'.process()' tries to extract the type information of MyProcessFunction before 
'.returns()' is called. However, having MyProcessFunction implement the 
ResultTypeQueryable interface helps, and this way I don't need to call 
.returns(MyTypeInformation).
When the class is used as a key, I didn't find a way to override the key type 
information for a closure; it seems the only way is to use an explicit KeySelector 
implementing ResultTypeQueryable.
In short, the ability to register MyTypeInformation for the class at the environment 
level would be more convenient, less bloat, and less error prone.

Thanks,
Alexey
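
For anyone landing on this thread later, a minimal sketch of the ResultTypeQueryable
approach described above (MyEvent and MyTypeInformation stand in for the actual classes
from this thread and are not defined here):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class MyProcessFunction
        extends ProcessFunction<MyEvent, MyEvent>
        implements ResultTypeQueryable<MyEvent> {

    @Override
    public void processElement(MyEvent value, Context ctx, Collector<MyEvent> out) {
        out.collect(value);
    }

    @Override
    public TypeInformation<MyEvent> getProducedType() {
        // returned instead of letting the TypeExtractor analyze MyEvent,
        // so there is no GenericType warning and no .returns(...) call is needed
        return new MyTypeInformation();
    }
}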



From: Tzu-Li (Gordon) Tai 
Sent: Tuesday, February 12, 2019 9:14 PM
To: user@flink.apache.org
Subject: Re: How to register TypeInfoFactory for 'external' class

Hi Alexey,

I don't think using the @TypeInfo annotation is doable at the moment.

Is this class being used only for input / output types of functions /
operators?
Or are you using it as a state type?

For the former, I think you can explicitly set the TypeInformation by
calling setTypeInfo on transformations.
For the latter, you can provide your own TypeInformation / TypeSerializer
for the state when declaring the StateDescriptor for the state.

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: fllink 1.7.1 and RollingFileSink

2019-02-14 Thread Vishal Santoshi
Awesome, thanks!  Will open a new thread. But yes the inprogress file was
helpful.

On Thu, Feb 14, 2019, 7:50 AM Kostas Kloudas  wrote:

> Hi Vishal,
>
> For the StreamingFileSink vs Rolling/BucketingSink:
>  - you can use the StreamingFileSink instead of the Rolling/BucketingSink.
> You can see the StreamingFileSink as an evolution of the previous two.
>
> In the StreamingFileSink the files in Pending state are not renamed, but
> they keep their "*in-progress*" name. This is the reason why you do not see
> .pending files anymore.
>
> What Timothy said for bulk formats is correct. They only support
> "onCheckpoint" rolling policy.
>
> Now for the second issue about deployment, I would recommend to open a new
> thread so that people can see from the title if they can help or not.
> In addition, it is good to have the title indicating the content of the
> topic for the community. The mailing list is searchable by search engines,
> so if someone
> has a similar question, the title will help to retrieve the relevant
> thread.
>
> Cheers,
> Kostas
>
>
> On Thu, Feb 14, 2019 at 12:09 PM Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> Thanks Fabian,
>>
>>  more questions
>>
>> 1. I had, on a k8s standalone job,
>> env.getCheckpointConfig().setFailOnCheckpointingErrors(true) // the
>> default. The job failed on a checkpoint and I would have imagined that under HA
>> the job would restore from the last checkpoint, but it did not (the UI
>> showed the job had restarted without a restore). The state was wiped out
>> and the job was relaunched but with no state.
>>
>> 2. I had the inprogress files from that failed instance and that is
>> consistent with no restored state.
>>
>> Thus there are few  questions
>>
>> 1. In k8s and with a standalone job cluster, have we tested the scenario
>> of the *container failing* (the pod remained intact) and restore?
>> In this case the pod remained up and running, but it was definitely a clean
>> relaunch of the container the pod was executing.
>>
>>
>> 2. Did I have any configuration missing . given the below  ?
>>
>> StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> env.enableCheckpointing(30 * 6);
>> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>> env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
>> env.setRestartStrategy(fixedDelayRestart(4, 
>> org.apache.flink.api.common.time.Time.minutes(1)));
>> StateBackend stateBackEnd = new FsStateBackend(
>> new org.apache.flink.core.fs.Path(
>> ""));
>> env.setStateBackend(stateBackEnd);
>>
>>
>> 3. What is the nature of RollingFileSink ?  How does it enable exactly
>> once semantics ( or does it not . ) ?
>>
>> Any help will be appreciated.
>>
>> Regards.
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> On Mon, Feb 11, 2019 at 5:00 AM Fabian Hueske  wrote:
>>
>>> Hi Vishal,
>>>
>>> Kostas (in CC) should be able to help here.
>>>
>>> Best, Fabian
>>>
>>> Am Mo., 11. Feb. 2019 um 00:05 Uhr schrieb Vishal Santoshi <
>>> vishal.santo...@gmail.com>:
>>>
 Any one ?

 On Sun, Feb 10, 2019 at 2:07 PM Vishal Santoshi <
 vishal.santo...@gmail.com> wrote:

> You don't have to. Thank you for the input.
>
> On Sun, Feb 10, 2019 at 1:56 PM Timothy Victor 
> wrote:
>
>> My apologies for not seeing your use case properly.   The constraint
>> on rolling policy is only applicable for bulk formats such as Parquet as
>> highlighted in the docs.
>>
>> As for your questions, I'll have to defer to others more familiar
>> with it.   I mostly just use bulk formats such as avro and parquet.
>>
>> Tim
>>
>>
>> On Sun, Feb 10, 2019, 12:40 PM Vishal Santoshi <
>> vishal.santo...@gmail.com wrote:
>>
>>> That said the in the DefaultRollingPolicy it seems the check is on
>>> the file size ( mimics the check shouldRollOnEVent()).
>>>
>>> I guess the question is
>>>
>>> Is  the call to shouldRollOnCheckPoint.  done by the checkpointing
>>> thread ?
>>>
>>> Are the calls to the other 2 methods shouldRollOnEVent and
>>> shouldRollOnProcessingTIme done on the execution thread  as in inlined ?
>>>
>>>
>>>
>>>
>>>
>>> On Sun, Feb 10, 2019 at 1:17 PM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
 Thanks for the quick reply.

 I am confused. If this was a more full featured BucketingSink ,I
 would imagine that  based on shouldRollOnEvent and shouldRollOnEvent,
 an in progress file could go into pending phase and on checkpoint
 the pending part file would be  finalized. For exactly once any files 
 ( in
 progress file ) will have a length of the file  snapshotted to the
 checkpoint  and used to truncate the file ( if supported ) or dropped 
 as a
 part-length file ( if truncate not supported )  if a resume from a

Re: flink 1.7.1 and RollingFileSink

2019-02-14 Thread Kostas Kloudas
Hi Vishal,

For the StreamingFileSink vs Rolling/BucketingSink:
 - you can use the StreamingFileSink instead of the Rolling/BucketingSink.
You can see the StreamingFileSink as an evolution of the previous two.

In the StreamingFileSink the files in Pending state are not renamed, but
they keep their "*in-progress*" name. This is the reason why you do not see
.pending files anymore.

What Timothy said for bulk formats is correct. They only support
"onCheckpoint" rolling policy.

Now for the second issue about deployment, I would recommend to open a new
thread so that people can see from the title if they can help or not.
In addition, it is good to have the title indicating the content of the
topic for the community. The mailing list is searchable by search engines,
so if someone
has a similar question, the title will help to retrieve the relevant thread.

Cheers,
Kostas


On Thu, Feb 14, 2019 at 12:09 PM Vishal Santoshi 
wrote:

> Thanks Fabian,
>
>  more questions
>
> 1. I had on k8s standlone job
> env.getCheckpointConfig().setFailOnCheckpointingErrors(true)// the
> default. The job failed on chkpoint and I would have imagined that under HA
> the job would restore from the last checkpoint but it did not ( The UI
> showed the job had restarted without a restore . The state was wiped out
> and the job was relaunched but with no state.
>
> 2. I had the inprogress files from that failed instance and that is
> consistent with no restored state.
>
> Thus there are few  questions
>
> 1. In k8s and with stand alone job cluster, have we tested the scenerio of
> the* container failing* ( the pod remained in tact ) and restore ?  In
> this case the pod remained up and running but it was definitely a clean
> relaunch of the container the pod was executing.
>
>
> 2. Did I have any configuration missing . given the below  ?
>
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.enableCheckpointing(30 * 6);
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
> env.setRestartStrategy(fixedDelayRestart(4, 
> org.apache.flink.api.common.time.Time.minutes(1)));
> StateBackend stateBackEnd = new FsStateBackend(
> new org.apache.flink.core.fs.Path(
> ""));
> env.setStateBackend(stateBackEnd);
>
>
> 3. What is the nature of RollingFileSink ?  How does it enable exactly
> once semantics ( or does it not . ) ?
>
> Any help will be appreciated.
>
> Regards.
>
>
>
>
>
>
>
>
>
> On Mon, Feb 11, 2019 at 5:00 AM Fabian Hueske  wrote:
>
>> Hi Vishal,
>>
>> Kostas (in CC) should be able to help here.
>>
>> Best, Fabian
>>
>> Am Mo., 11. Feb. 2019 um 00:05 Uhr schrieb Vishal Santoshi <
>> vishal.santo...@gmail.com>:
>>
>>> Any one ?
>>>
>>> On Sun, Feb 10, 2019 at 2:07 PM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
 You don't have to. Thank you for the input.

 On Sun, Feb 10, 2019 at 1:56 PM Timothy Victor 
 wrote:

> My apologies for not seeing your use case properly.   The constraint
> on rolling policy is only applicable for bulk formats such as Parquet as
> highlighted in the docs.
>
> As for your questions, I'll have to defer to others more familiar with
> it.   I mostly just use bulk formats such as avro and parquet.
>
> Tim
>
>
> On Sun, Feb 10, 2019, 12:40 PM Vishal Santoshi <
> vishal.santo...@gmail.com wrote:
>
>> That said the in the DefaultRollingPolicy it seems the check is on
>> the file size ( mimics the check shouldRollOnEVent()).
>>
>> I guess the question is
>>
>> Is  the call to shouldRollOnCheckPoint.  done by the checkpointing
>> thread ?
>>
>> Are the calls to the other 2 methods shouldRollOnEVent and
>> shouldRollOnProcessingTIme done on the execution thread  as in inlined ?
>>
>>
>>
>>
>>
>> On Sun, Feb 10, 2019 at 1:17 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> Thanks for the quick reply.
>>>
>>> I am confused. If this was a more full featured BucketingSink ,I
>>> would imagine that  based on shouldRollOnEvent and shouldRollOnEvent,
>>> an in progress file could go into pending phase and on checkpoint
>>> the pending part file would be  finalized. For exactly once any files ( 
>>> in
>>> progress file ) will have a length of the file  snapshotted to the
>>> checkpoint  and used to truncate the file ( if supported ) or dropped 
>>> as a
>>> part-length file ( if truncate not supported )  if a resume from a
>>> checkpoint was to happen, to indicate what part of the the finalized 
>>> file (
>>> finalized when resumed ) was valid . and  I had always assumed ( and 
>>> there
>>> is no doc otherwise ) that shouldRollOnCheckpoint would be similar
>>> to the other 2 apart from the fact it does the rol

Re: Data loss when restoring from savepoint

2019-02-14 Thread Konstantin Knauf
Hi Juho,


* does the output of the streaming job contain any data, which is not
>> contained in the batch
>
>
> No.
>
> * do you know if all lost records are contained in the last savepoint you
>> took before the window fired? This would mean that no records are lost
>> after the last restore.
>
>
> I haven't built the tooling required to check all IDs like that, but yes,
> that's my understanding currently. To check that I would need to:
> - kill the stream only once on a given day (so that there's only one
> savepoint creation & restore)
> - next day or later: save all missing ids from batch output comparison
> - next day or later: read the savepoint with bravo & check that it
> contains all of those missing IDs
>
> However I haven't built the tooling for that yet. Do you think it's
> necessary to verify that this assumption holds?
>

It would be another data point and might help us to track down the problem.
Whether it is worth doing it depends on the result, i.e. whether the current
assumption would be falsified or not, but we only know that in retrospect ;)


> * could you please check the numRecordsOut metric for the WindowOperator
>> (FlinkUI -> TaskMetrics -> Select TaskChain containing WindowOperator ->
>> find metric)? Is the count reported there correct (no missing data)?
>
>
> Is that metric the result of window trigger? If yes, you must mean that I
> check the value of that metric on the next day after restore, so that it
> only contains the count for the output of previous day's window? The
> counter is reset to 0 when job starts (even when state is restored), right?
>

Yes, this metric would be incremented when the window is triggered. Yes,
please check this metric after the window during which the restore
happened has fired.

If you don't have a MetricsReporter configured so far, I recommend quickly
registering a Slf4jReporter to log out all metrics every X seconds
(maybe even minutes for your use case):
https://ci.apache.org/projects/flink/flink-docs-release-1.7/monitoring/metrics.html#slf4j-orgapacheflinkmetricsslf4jslf4jreporter.
Then you don't need to go through the WebUI and can keep a history of the
metrics.
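
For reference, the flink-conf.yaml entries for that reporter look roughly like
this (the interval is just an example value):

metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter
metrics.reporter.slf4j.interval: 60 SECONDS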


> Otherwise, do you have any suggestions for how to instrument the code to
> narrow down further where the data gets lost? To me it would make sense to
> proceed with this, because the problem seems hard to reproduce outside of
> our environment.
>

Let's focus on checking this metric above, to make sure that the
WindowOperator is actually emitting fewer records than the overall number of
keys in the state, as your experiments suggest, and on sharing the code.


> On Thu, Feb 14, 2019 at 10:57 AM Konstantin Knauf <
> konstan...@ververica.com> wrote:
>
>> Hi Juho,
>>
>> you are right the problem has actually been narrowed down quite a bit
>> over time. Nevertheless, sharing the code (incl. flink-conf.yaml) might be
>> a good idea. Maybe something strikes the eye, that we have not thought
>> about so far. If you don't feel comfortable sharing the code on the ML,
>> feel free to send me a PM.
>>
>> Besides that, three more questions:
>>
>> * does the output of the streaming job contain any data, which is not
>> contained in the batch output?
>> * do you know if all lost records are contained in the last savepoint you
>> took before the window fired? This would mean that no records are lost
>> after the last restore.
>> * could you please check the numRecordsOut metric for the WindowOperator
>> (FlinkUI -> TaskMetrics -> Select TaskChain containing WindowOperator ->
>> find metric)? Is the count reported there correct (no missing data)?
>>
>> Cheers,
>>
>> Konstantin
>>
>>
>>
>>
>> On Wed, Feb 13, 2019 at 3:19 PM Gyula Fóra  wrote:
>>
>>> Sorry not posting on the mail list was my mistake :/
>>>
>>>
>>> On Wed, 13 Feb 2019 at 15:01, Juho Autio  wrote:
>>>
 Thanks for stepping in, did you post outside of the mailing list on
 purpose btw?

 This I did long time ago:

 To rule out for good any questions about sink behaviour, the job was
> killed and started with an additional Kafka sink.
> The same number of ids were missed in both outputs: KafkaSink &
> BucketingSink.


 (I wrote about that On Oct 1, 2018 in this email thread)

 After that I did the savepoint analysis with Bravo.

 Currently I'm indeed trying to get suggestions how to debug further,
 for example, where to add additional kafka output, to catch where the data
 gets lost. That would probably be somewhere in Flink's internals.

 I could try to share the full code also, but IMHO the problem has been
 quite well narrowed down, considering that data can be found in savepoint,
 savepoint is successfully restored, and after restoring the data doesn't go
 to "user code" (like the reducer) any more.

 On Wed, Feb 13, 2019 at 3:47 PM Gyula Fóra 
 wrote:

> Hi Juho!
> I think the reason you are not getting much answers h

Flink DataStream: A few dates are getting through very slowly

2019-02-14 Thread Marke Builder
Hi,

I'm using a simple streaming app with processing time and without state.
The app reads from Kafka, transforms the data and writes it to the
storage (Redis).
But I see an interesting behavior: a few dates are getting through very
slowly.
Do you have any idea why this could be?

Best,

Marke


Re: Re: Re: How to clear state immediately after a keyed window is processed?

2019-02-14 Thread Kumar Bolar, Harshith
Thanks a lot :D

From: Konstantin Knauf 
Date: Thursday, 14 February 2019 at 5:38 PM
To: Harshith Kumar Bolar , user 
Subject: [External] Re: Re: How to clear state immediately after a keyed window 
is processed?

Yes, for processing-time windows the clean up time is exactly the end time of 
the window, because by definition there is no late data and state does not need 
to be kept around.

On Thu, Feb 14, 2019 at 1:03 PM Kumar Bolar, Harshith <hk...@arity.com> wrote:
Thanks Konstanin,

But I’m using processing time, hence no watermarks. Will the state still be 
cleared automatically if nothing is done?

From: Konstantin Knauf <konstan...@ververica.com>
Date: Thursday, 14 February 2019 at 5:18 PM
To: Harshith Kumar Bolar <hk...@arity.com>
Cc: "user@flink.apache.org" <user@flink.apache.org>
Subject: [External] Re: How to clear state immediately after a keyed window is 
processed?

Hi Harshith,

when you use Flink's Windowing API, the state of an event time window is 
cleared once the watermark passes the end time of the window (that's when the 
window fires) + the allowed lateness. So, as long as you don't configure 
additional allowed lateness (default=0), Flink will already behave as described 
by default.

Cheers,

Konstantin

On Thu, Feb 14, 2019 at 12:03 PM Kumar Bolar, Harshith <hk...@arity.com> wrote:
Hi all,

My application uses a keyed window that is keyed by a function of timestamp. 
This means once that particular window has been fired and processed, there is 
no use in keeping that key active because there is no way that particular key 
will appear again. Because this use case involves continuously expanding keys, 
I want to clear the state of a key as soon as it is finished processing without 
having to configure timers.

Is this something that can be achieved in the evictor method or apply method 
after each keyed window is done processing?

Thanks,
Harshith




--

Konstantin Knauf | Solutions Architect

+49 160 91394525




Follow us @VervericaData

--

Join Flink 
Forward
 - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen


--

Konstantin Knauf | Solutions Architect

+49 160 91394525




Follow us @VervericaData

--

Join Flink 
Forward
 - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen


Re: Re: How to clear state immediately after a keyed window is processed?

2019-02-14 Thread Konstantin Knauf
Yes, for processing-time windows the clean up time is exactly the end time
of the window, because by definition there is no late data and state does
not need to be kept around.

On Thu, Feb 14, 2019 at 1:03 PM Kumar Bolar, Harshith 
wrote:

> Thanks Konstanin,
>
>
>
> But I’m using processing time, hence no watermarks. Will the state still
> be cleared automatically if nothing is done?
>
>
>
> *From: *Konstantin Knauf 
> *Date: *Thursday, 14 February 2019 at 5:18 PM
> *To: *Harshith Kumar Bolar 
> *Cc: *"user@flink.apache.org" 
> *Subject: *[External] Re: How to clear state immediately after a keyed
> window is processed?
>
>
>
> Hi Harshith,
>
>
>
> when you use Flink's Windowing API, the state of an event time window is
> cleared once the watermark passes the end time of the window (that's when
> the window fires) + the allowed lateness. So, as long as you don't
> configure additional allowed lateness (default=0), Flink will already
> behave as described by default.
>
>
>
> Cheers,
>
>
>
> Konstantin
>
>
>
> On Thu, Feb 14, 2019 at 12:03 PM Kumar Bolar, Harshith 
> wrote:
>
> Hi all,
>
>
>
> My application uses a keyed window that is keyed by a function of
> timestamp. This means once that particular window has been fired and
> processed, there is no use in keeping that key active because there is no
> way that particular key will appear again. Because this use case involves
> continuously expanding keys, I want to clear the state of a key as soon as
> it is finished processing without having to configure timers.
>
>
>
> Is this something that can be achieved in the evictor method
> or apply method after each keyed window is done processing?
>
>
>
> Thanks,
>
> Harshith
>
>
>
>
>
>
>
> --
>
> *Konstantin Knauf* | Solutions Architect
>
> +49 160 91394525
>
>
>
> 
>
>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward
> 
> - The Apache Flink Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
>
> Data Artisans GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
>


-- 

Konstantin Knauf | Solutions Architect

+49 160 91394525



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen


Re: Data loss when restoring from savepoint

2019-02-14 Thread Juho Autio
Thanks Konstantin!

I'll try to see if I can prepare code & conf to be shared as fully as
possible.

In the meantime:

* does the output of the streaming job contain any data, which is not
> contained in the batch


No.

* do you know if all lost records are contained in the last savepoint you
> took before the window fired? This would mean that no records are lost
> after the last restore.


I haven't built the tooling required to check all IDs like that, but yes,
that's my understanding currently. To check that I would need to:
- kill the stream only once on a given day (so that there's only one
savepoint creation & restore)
- next day or later: save all missing ids from batch output comparison
- next day or later: read the savepoint with bravo & check that it contains
all of those missing IDs

However I haven't built the tooling for that yet. Do you think it's
necessary to verify that this assumption holds?

* could you please check the numRecordsOut metric for the WindowOperator
> (FlinkUI -> TaskMetrics -> Select TaskChain containing WindowOperator ->
> find metric)? Is the count reported there correct (no missing data)?


Is that metric the result of window trigger? If yes, you must mean that I
check the value of that metric on the next day after restore, so that it
only contains the count for the output of previous day's window? The
counter is reset to 0 when job starts (even when state is restored), right?

Otherwise, do you have any suggestions for how to instrument the code to
narrow down further where the data gets lost? To me it would make sense to
proceed with this, because the problem seems hard to reproduce outside of
our environment.

On Thu, Feb 14, 2019 at 10:57 AM Konstantin Knauf 
wrote:

> Hi Juho,
>
> you are right the problem has actually been narrowed down quite a bit over
> time. Nevertheless, sharing the code (incl. flink-conf.yaml) might be a
> good idea. Maybe something strikes the eye, that we have not thought about
> so far. If you don't feel comfortable sharing the code on the ML, feel free
> to send me a PM.
>
> Besides that, three more questions:
>
> * does the output of the streaming job contain any data, which is not
> contained in the batch output?
> * do you know if all lost records are contained in the last savepoint you
> took before the window fired? This would mean that no records are lost
> after the last restore.
> * could you please check the numRecordsOut metric for the WindowOperator
> (FlinkUI -> TaskMetrics -> Select TaskChain containing WindowOperator ->
> find metric)? Is the count reported there correct (no missing data)?
>
> Cheers,
>
> Konstantin
>
>
>
>
> On Wed, Feb 13, 2019 at 3:19 PM Gyula Fóra  wrote:
>
>> Sorry not posting on the mail list was my mistake :/
>>
>>
>> On Wed, 13 Feb 2019 at 15:01, Juho Autio  wrote:
>>
>>> Thanks for stepping in, did you post outside of the mailing list on
>>> purpose btw?
>>>
>>> This I did long time ago:
>>>
>>> To rule out for good any questions about sink behaviour, the job was
 killed and started with an additional Kafka sink.
 The same number of ids were missed in both outputs: KafkaSink &
 BucketingSink.
>>>
>>>
>>> (I wrote about that On Oct 1, 2018 in this email thread)
>>>
>>> After that I did the savepoint analysis with Bravo.
>>>
>>> Currently I'm indeed trying to get suggestions how to debug further, for
>>> example, where to add additional kafka output, to catch where the data gets
>>> lost. That would probably be somewhere in Flink's internals.
>>>
>>> I could try to share the full code also, but IMHO the problem has been
>>> quite well narrowed down, considering that data can be found in savepoint,
>>> savepoint is successfully restored, and after restoring the data doesn't go
>>> to "user code" (like the reducer) any more.
>>>
>>> On Wed, Feb 13, 2019 at 3:47 PM Gyula Fóra  wrote:
>>>
 Hi Juho!
 I think the reason you are not getting much answers here is because it
 is very hard to debug this problem remotely.
 Seemingly you do very normal operations, the state contains all the
 required data and nobody else has hit a similar problem for ages.

 My best guess would be some bug with the deduplication or output
 writing logic but without a complete code example its very hard to say
 anything useful.
 Did you try writing it to Kafka to see if the output is there? (that
 way we could rule out the dedup probllem)

 Cheers,
 Gyula

 On Wed, Feb 13, 2019 at 2:37 PM Juho Autio 
 wrote:

> Stefan (or anyone!), please, could I have some feedback on the
> findings that I reported on Dec 21, 2018? This is still a major blocker..
>
> On Thu, Jan 31, 2019 at 11:46 AM Juho Autio 
> wrote:
>
>> Hello, is there anyone that could help with this?
>>
>> On Fri, Jan 11, 2019 at 8:14 AM Juho Autio 
>> wrote:
>>
>>> Stefan, would you have time to comment?
>>>
>>> On Wednesday, January 

Re: How to clear state immediately after a keyed window is processed?

2019-02-14 Thread Konstantin Knauf
Hi Harshith,

when you use Flink's Windowing API, the state of an event time window is
cleared once the watermark passes the end time of the window (that's when
the window fires) + the allowed lateness. So, as long as you don't
configure additional allowed lateness (default=0), Flink will already
behave as described by default.
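
For illustration, a small event-time sketch of where that lateness is
configured (window size, lateness and the key selector are arbitrary
placeholders; with the default of zero lateness you would simply leave out
allowedLateness()):

import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

input
    .keyBy(record -> record.getKey())
    .window(TumblingEventTimeWindows.of(Time.hours(1)))
    // state for a window is cleared at window end + allowed lateness
    .allowedLateness(Time.minutes(10))
    .reduce((a, b) -> a);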

Cheers,

Konstantin

On Thu, Feb 14, 2019 at 12:03 PM Kumar Bolar, Harshith 
wrote:

> Hi all,
>
>
>
> My application uses a keyed window that is keyed by a function of
> timestamp. This means once that particular window has been fired and
> processed, there is no use in keeping that key active because there is no
> way that particular key will appear again. Because this use case involves
> continuously expanding keys, I want to clear the state of a key as soon as
> it is finished processing without having to configure timers.
>
>
>
> Is this something that can be achieved in the evictor method
> or apply method after each keyed window is done processing?
>
>
>
> Thanks,
>
> Harshith
>
>
>
>
>


-- 

Konstantin Knauf | Solutions Architect

+49 160 91394525



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen


Re: flink 1.7.1 and RollingFileSink

2019-02-14 Thread Vishal Santoshi
Thanks Fabian,

 more questions

1. I had, on a k8s standalone job,
env.getCheckpointConfig().setFailOnCheckpointingErrors(true) // the default.
The job failed on a checkpoint and I would have imagined that under HA the job
would restore from the last checkpoint, but it did not (the UI showed the
job had restarted without a restore; the state was wiped out and the job
was relaunched with no state).

2. I had the in-progress files from that failed instance and that is
consistent with no restored state.

Thus there are a few questions:

1. In k8s and with a standalone job cluster, have we tested the scenario of
the *container failing* (the pod remained intact) and restore? In this
case the pod remained up and running but it was definitely a clean relaunch
of the container the pod was executing.


2. Did I have any configuration missing, given the below?

StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(30 * 6);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
env.setRestartStrategy(fixedDelayRestart(4,
org.apache.flink.api.common.time.Time.minutes(1)));
StateBackend stateBackEnd = new FsStateBackend(
new org.apache.flink.core.fs.Path(
""));
env.setStateBackend(stateBackEnd);


3. What is the nature of RollingFileSink? How does it enable exactly-once
semantics (or does it not)?

Any help will be appreciated.

Regards.









On Mon, Feb 11, 2019 at 5:00 AM Fabian Hueske  wrote:

> Hi Vishal,
>
> Kostas (in CC) should be able to help here.
>
> Best, Fabian
>
> Am Mo., 11. Feb. 2019 um 00:05 Uhr schrieb Vishal Santoshi <
> vishal.santo...@gmail.com>:
>
>> Any one ?
>>
>> On Sun, Feb 10, 2019 at 2:07 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> You don't have to. Thank you for the input.
>>>
>>> On Sun, Feb 10, 2019 at 1:56 PM Timothy Victor 
>>> wrote:
>>>
 My apologies for not seeing your use case properly.   The constraint on
 rolling policy is only applicable for bulk formats such as Parquet as
 highlighted in the docs.

 As for your questions, I'll have to defer to others more familiar with
 it.   I mostly just use bulk formats such as avro and parquet.

 Tim


 On Sun, Feb 10, 2019, 12:40 PM Vishal Santoshi <
 vishal.santo...@gmail.com wrote:

> That said the in the DefaultRollingPolicy it seems the check is on the
> file size ( mimics the check shouldRollOnEVent()).
>
> I guess the question is
>
> Is  the call to shouldRollOnCheckPoint.  done by the checkpointing
> thread ?
>
> Are the calls to the other 2 methods shouldRollOnEVent and
> shouldRollOnProcessingTIme done on the execution thread  as in inlined ?
>
>
>
>
>
> On Sun, Feb 10, 2019 at 1:17 PM Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> Thanks for the quick reply.
>>
>> I am confused. If this was a more full featured BucketingSink ,I
>> would imagine that  based on shouldRollOnEvent and shouldRollOnEvent,
>> an in progress file could go into pending phase and on checkpoint the
>> pending part file would be  finalized. For exactly once any files ( in
>> progress file ) will have a length of the file  snapshotted to the
>> checkpoint  and used to truncate the file ( if supported ) or dropped as 
>> a
>> part-length file ( if truncate not supported )  if a resume from a
>> checkpoint was to happen, to indicate what part of the the finalized 
>> file (
>> finalized when resumed ) was valid . and  I had always assumed ( and 
>> there
>> is no doc otherwise ) that shouldRollOnCheckpoint would be similar
>> to the other 2 apart from the fact it does the roll and finalize step in 
>> a
>> single step on a checkpoint.
>>
>>
>> Am I better off using BucketingSink ?  When to use BucketingSink and
>> when to use RollingSink is not clear at all, even though at the surface 
>> it
>> sure looks RollingSink is a better version of .BucketingSink ( or not )
>>
>> Regards.
>>
>>
>>
>> On Sun, Feb 10, 2019 at 12:09 PM Timothy Victor 
>> wrote:
>>
>>> I think the only rolling policy that can be used is
>>> CheckpointRollingPolicy to ensure exactly once.
>>>
>>> Tim
>>>
>>> On Sun, Feb 10, 2019, 9:13 AM Vishal Santoshi <
>>> vishal.santo...@gmail.com wrote:
>>>
 Can StreamingFileSink be used instead of 
 https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/filesystem_sink.html,
  even though it looks it could.


 This code for example


 StreamingFileSink
 .forRowFormat(new Path(PATH),
 new SimpleStringEn

How to clear state immediately after a keyed window is processed?

2019-02-14 Thread Kumar Bolar, Harshith
Hi all,

My application uses a keyed window that is keyed by a function of timestamp. 
This means once that particular window has been fired and processed, there is 
no use in keeping that key active because there is no way that particular key 
will appear again. Because this use case involves continuously expanding keys, 
I want to clear the state of a key as soon as it is finished processing without 
having to configure timers.

Is this something that can be achieved in the evictor method or apply method 
after each keyed window is done processing?

Thanks,
Harshith




Re: [DISCUSS] Adding a mid-term roadmap to the Flink website

2019-02-14 Thread Stephan Ewen
I think the website is better as well.

I agree with Fabian that the wiki is not so visible, and visibility is the
main motivation.
This type of roadmap overview would not be updated by everyone - letting
committers update the roadmap means the listed threads are actually
happening at the moment.


On Thu, Feb 14, 2019 at 11:14 AM Fabian Hueske  wrote:

> Hi,
>
> I like the idea of putting the roadmap on the website because it is much
> more visible (and IMO more credible, obligatory) there.
> However, I share the concerns about frequent updates.
>
> It think it would be great to update the "official" roadmap on the website
> once per release (-bugfix releases), i.e., every three month.
> We can use the wiki to collect and draft the roadmap for the next update.
>
> Best, Fabian
>
>
> Am Do., 14. Feb. 2019 um 11:03 Uhr schrieb Jeff Zhang :
>
>> Hi Stephan,
>>
>> Thanks for this proposal. It is a good idea to track the roadmap. One
>> suggestion is that it might be better to put it into wiki page first.
>> Because it is easier to update the roadmap on wiki compared to on flink web
>> site. And I guess we may need to update the roadmap very often at the
>> beginning as there's so many discussions and proposals in community
>> recently. We can move it into flink web site later when we feel it could be
>> nailed down.
>>
>> Stephan Ewen  于2019年2月14日周四 下午5:44写道:
>>
>>> Thanks Jincheng and Rong Rong!
>>>
>>> I am not deciding a roadmap and making a call on what features should be
>>> developed or not. I was only collecting broader issues that are already
>>> happening or have an active FLIP/design discussion plus committer support.
>>>
>>> Do we have that for the suggested issues as well? If yes , we can add
>>> them (can you point me to the issue/mail-thread), if not, let's try and
>>> move the discussion forward and add them to the roadmap overview then.
>>>
>>> Best,
>>> Stephan
>>>
>>>
>>> On Wed, Feb 13, 2019 at 6:47 PM Rong Rong  wrote:
>>>
 Thanks Stephan for the great proposal.

 This would not only be beneficial for new users but also for
 contributors to keep track on all upcoming features.

 I think that better window operator support can also be separately
 group into its own category, as they affects both future DataStream API and
 batch stream unification.
 can we also include:
 - OVER aggregate for DataStream API separately as @jincheng suggested.
 - Improving sliding window operator [1]

 One more additional suggestion, can we also include a more extendable
 security module [2,3] @shuyi and I are currently working on?
 This will significantly improve the usability for Flink in corporate
 environments where proprietary or 3rd-party security integration is needed.

 Thanks,
 Rong


 [1]
 http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Improvement-to-Flink-Window-Operator-with-Slicing-td25750.html
 [2]
 http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-security-improvements-td21068.html
 [3]
 http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-Kerberos-Improvement-td25983.html




 On Wed, Feb 13, 2019 at 3:39 AM jincheng sun 
 wrote:

> Very excited and thank you for launching such a great discussion,
> Stephan !
>
> Here only a little suggestion that in the Batch Streaming Unification
> section, do we need to add an item:
>
> - Same window operators on bounded/unbounded Table API and DataStream
> API
> (currently OVER window only exists in SQL/TableAPI, DataStream API
> does not yet support)
>
> Best,
> Jincheng
>
> Stephan Ewen  于2019年2月13日周三 下午7:21写道:
>
>> Hi all!
>>
>> Recently several contributors, committers, and users asked about
>> making it more visible in which way the project is currently going.
>>
>> Users and developers can track the direction by following the
>> discussion threads and JIRA, but due to the mass of discussions and open
>> issues, it is very hard to get a good overall picture.
>> Especially for new users and contributors, is is very hard to get a
>> quick overview of the project direction.
>>
>> To fix this, I suggest to add a brief roadmap summary to the
>> homepage. It is a bit of a commitment to keep that roadmap up to date, 
>> but
>> I think the benefit for users justifies that.
>> The Apache Beam project has added such a roadmap [1]
>> , which was received very well by
>> the community, I would suggest to follow a similar structure here.
>>
>> If the community is in favor of this, I would volunteer to write a
>> first version of such a roadmap. The points I would include are below.
>>
>> Best,
>> Stephan
>>
>> [1] https://beam.apache.org/roadmap/
>>
>> 

Re: Dataset statistics

2019-02-14 Thread Flavio Pompermaier
No effort in this direction, then?
I had a try using SQL on the Table API, but I fear that the generated plan is
not the optimal one. I'm looking for an efficient way to implement a
describe() method on a table or dataset/datasource.
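
For illustration, a rough sketch of that SQL-on-Table-API approach (the
registered table, its numeric field "x" and the surrounding setup are
hypothetical):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

tEnv.registerDataSet("t", input);  // input: a DataSet of a POJO with a numeric field "x"

Table stats = tEnv.sqlQuery(
    "SELECT COUNT(x), MIN(x), MAX(x), AVG(x) FROM t");

DataSet<Row> result = tEnv.toDataSet(stats, Row.class);
result.print();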

On Fri, Feb 8, 2019 at 10:35 AM Flavio Pompermaier 
wrote:

> Hi to all,
> is there any effort to standardize descriptive statistics in Apache Flink?
> Is there any suggested way to achieve this?
>
> Best,
> Flavio
>


Re: [DISCUSS] Adding a mid-term roadmap to the Flink website

2019-02-14 Thread Fabian Hueske
Hi,

I like the idea of putting the roadmap on the website because it is much
more visible (and IMO more credible, obligatory) there.
However, I share the concerns about frequent updates.

I think it would be great to update the "official" roadmap on the website
once per release (excluding bugfix releases), i.e., every three months.
We can use the wiki to collect and draft the roadmap for the next update.

Best, Fabian


On Thu, Feb 14, 2019 at 11:03 AM Jeff Zhang  wrote:

> Hi Stephan,
>
> Thanks for this proposal. It is a good idea to track the roadmap. One
> suggestion is that it might be better to put it into wiki page first.
> Because it is easier to update the roadmap on wiki compared to on flink web
> site. And I guess we may need to update the roadmap very often at the
> beginning as there's so many discussions and proposals in community
> recently. We can move it into flink web site later when we feel it could be
> nailed down.
>
> Stephan Ewen  于2019年2月14日周四 下午5:44写道:
>
>> Thanks Jincheng and Rong Rong!
>>
>> I am not deciding a roadmap and making a call on what features should be
>> developed or not. I was only collecting broader issues that are already
>> happening or have an active FLIP/design discussion plus committer support.
>>
>> Do we have that for the suggested issues as well? If yes , we can add
>> them (can you point me to the issue/mail-thread), if not, let's try and
>> move the discussion forward and add them to the roadmap overview then.
>>
>> Best,
>> Stephan
>>
>>
>> On Wed, Feb 13, 2019 at 6:47 PM Rong Rong  wrote:
>>
>>> Thanks Stephan for the great proposal.
>>>
>>> This would not only be beneficial for new users but also for
>>> contributors to keep track on all upcoming features.
>>>
>>> I think that better window operator support can also be separately group
>>> into its own category, as they affects both future DataStream API and batch
>>> stream unification.
>>> can we also include:
>>> - OVER aggregate for DataStream API separately as @jincheng suggested.
>>> - Improving sliding window operator [1]
>>>
>>> One more additional suggestion, can we also include a more extendable
>>> security module [2,3] @shuyi and I are currently working on?
>>> This will significantly improve the usability for Flink in corporate
>>> environments where proprietary or 3rd-party security integration is needed.
>>>
>>> Thanks,
>>> Rong
>>>
>>>
>>> [1]
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Improvement-to-Flink-Window-Operator-with-Slicing-td25750.html
>>> [2]
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-security-improvements-td21068.html
>>> [3]
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-Kerberos-Improvement-td25983.html
>>>
>>>
>>>
>>>
>>> On Wed, Feb 13, 2019 at 3:39 AM jincheng sun 
>>> wrote:
>>>
 Very excited and thank you for launching such a great discussion,
 Stephan !

 Here only a little suggestion that in the Batch Streaming Unification
 section, do we need to add an item:

 - Same window operators on bounded/unbounded Table API and DataStream
 API
 (currently OVER window only exists in SQL/TableAPI, DataStream API does
 not yet support)

 Best,
 Jincheng

 Stephan Ewen  于2019年2月13日周三 下午7:21写道:

> Hi all!
>
> Recently several contributors, committers, and users asked about
> making it more visible in which way the project is currently going.
>
> Users and developers can track the direction by following the
> discussion threads and JIRA, but due to the mass of discussions and open
> issues, it is very hard to get a good overall picture.
> Especially for new users and contributors, is is very hard to get a
> quick overview of the project direction.
>
> To fix this, I suggest to add a brief roadmap summary to the homepage.
> It is a bit of a commitment to keep that roadmap up to date, but I think
> the benefit for users justifies that.
> The Apache Beam project has added such a roadmap [1]
> , which was received very well by
> the community, I would suggest to follow a similar structure here.
>
> If the community is in favor of this, I would volunteer to write a
> first version of such a roadmap. The points I would include are below.
>
> Best,
> Stephan
>
> [1] https://beam.apache.org/roadmap/
>
> 
>
> Disclaimer: Apache Flink is not governed or steered by any one single
> entity, but by its community and Project Management Committee (PMC). This
> is not a authoritative roadmap in the sense of a plan with a specific
> timeline. Instead, we share our vision for the future and major 
> initiatives
> that are receiving attention and give users and contributors an
> understanding what they can look 

Re: Impact of occasional big pauses in stream processing

2019-02-14 Thread Andrey Zagrebin
Hi Ajay,

Technically, it will immediately block the thread of MyKeyedProcessFunction
subtask scheduled to some slot and basically block processing of the key
range assigned to this subtask.
Practically, I agree with Rong's answer. Depending on the topology of your
inputStream, it can eventually block a lot of stuff.
In general, I think, it is not recommended to perform blocking operations
in process record functions. You could consider AsyncIO [1] to unblock the
task thread.
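
For illustration, a minimal AsyncIO sketch along those lines (the Event/Result
types, slowExternalCall() and the timeout/capacity values are placeholders):

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class SlowCallAsyncFunction extends RichAsyncFunction<Event, Result> {
    @Override
    public void asyncInvoke(Event event, ResultFuture<Result> resultFuture) {
        // Run the potentially slow call off the task thread.
        CompletableFuture
            .supplyAsync(() -> slowExternalCall(event))
            .thenAccept(r -> resultFuture.complete(Collections.singleton(r)));
    }
}

// At most 100 requests in flight, each timing out after 10 minutes.
DataStream<Result> results = AsyncDataStream.unorderedWait(
    inputStream, new SlowCallAsyncFunction(), 10, TimeUnit.MINUTES, 100);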

Best,
Andrey

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html

On Thu, Feb 14, 2019 at 6:03 AM Rong Rong  wrote:

> Hi Ajay,
>
> Flink handles "backpressure" in a graceful way so that it doesn't get
> affected when your processing pipeline is occasionally slowed down.
> I think the following articles will help [1,2].
>
> In your specific case: the "KeyBy" operation will re-hash data so they can
> be reshuffled from all input consumers to all your process operators (in
> this case the MyKeyedProcessFunction). If one of the process operator is
> backpressured, it will back track all the way to the source.
> So, my understanding is that: since there's the reshuffling, if one of the
> process function is backpressured, it will potentially affect all the
> source operators.
>
> Thanks,
> Rong
>
> [1] https://www.ververica.com/blog/how-flink-handles-backpressure
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/back_pressure.html
>
> On Wed, Feb 13, 2019 at 8:50 AM Aggarwal, Ajay 
> wrote:
>
>> I was wondering what is the impact if one of the stream operator function
>> occasionally takes too long to process the event.  Given the following
>> simple flink job
>>
>>
>>
>>inputStream
>>
>>   .KeyBy (“tenantId”)
>>
>>   .process ( new MyKeyedProcessFunction())
>>
>>
>>
>> , if occasionally MyKeyedProcessFunction takes too long (say ~5-10
>> minutes) to process an incoming element, what is the impact on overall
>> pipeline? Is the impact limited to
>>
>>1. Specific key for which MyKeyedProcessFunction is currently taking
>>too long to process an element, or
>>2. Specific Taskslot, where MyKeyedProcessFunction is currently
>>taking too long to process an element, i.e. impacting multiple keys, or
>>3. Entire inputstream ?
>>
>>
>>
>> Also what is the built in resiliency in these cases? Is there a concept
>> of timeout for each operator function?
>>
>>
>>
>> Ajay
>>
>


Re: [DISCUSS] Adding a mid-term roadmap to the Flink website

2019-02-14 Thread Jeff Zhang
Hi Stephan,

Thanks for this proposal. It is a good idea to track the roadmap. One
suggestion is that it might be better to put it into a wiki page first,
because it is easier to update the roadmap on the wiki compared to the Flink
website. And I guess we may need to update the roadmap very often at the
beginning as there are so many discussions and proposals in the community
recently. We can move it to the Flink website later when we feel it could be
nailed down.

On Thu, Feb 14, 2019 at 5:44 PM Stephan Ewen  wrote:

> Thanks Jincheng and Rong Rong!
>
> I am not deciding a roadmap and making a call on what features should be
> developed or not. I was only collecting broader issues that are already
> happening or have an active FLIP/design discussion plus committer support.
>
> Do we have that for the suggested issues as well? If yes , we can add them
> (can you point me to the issue/mail-thread), if not, let's try and move the
> discussion forward and add them to the roadmap overview then.
>
> Best,
> Stephan
>
>
> On Wed, Feb 13, 2019 at 6:47 PM Rong Rong  wrote:
>
>> Thanks Stephan for the great proposal.
>>
>> This would not only be beneficial for new users but also for contributors
>> to keep track on all upcoming features.
>>
>> I think that better window operator support can also be separately group
>> into its own category, as they affects both future DataStream API and batch
>> stream unification.
>> can we also include:
>> - OVER aggregate for DataStream API separately as @jincheng suggested.
>> - Improving sliding window operator [1]
>>
>> One more additional suggestion, can we also include a more extendable
>> security module [2,3] @shuyi and I are currently working on?
>> This will significantly improve the usability for Flink in corporate
>> environments where proprietary or 3rd-party security integration is needed.
>>
>> Thanks,
>> Rong
>>
>>
>> [1]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Improvement-to-Flink-Window-Operator-with-Slicing-td25750.html
>> [2]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-security-improvements-td21068.html
>> [3]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-Kerberos-Improvement-td25983.html
>>
>>
>>
>>
>> On Wed, Feb 13, 2019 at 3:39 AM jincheng sun 
>> wrote:
>>
>>> Very excited and thank you for launching such a great discussion,
>>> Stephan !
>>>
>>> Here only a little suggestion that in the Batch Streaming Unification
>>> section, do we need to add an item:
>>>
>>> - Same window operators on bounded/unbounded Table API and DataStream
>>> API
>>> (currently OVER window only exists in SQL/TableAPI, DataStream API does
>>> not yet support)
>>>
>>> Best,
>>> Jincheng
>>>
>>> Stephan Ewen  于2019年2月13日周三 下午7:21写道:
>>>
 Hi all!

 Recently several contributors, committers, and users asked about making
 it more visible in which way the project is currently going.

 Users and developers can track the direction by following the
 discussion threads and JIRA, but due to the mass of discussions and open
 issues, it is very hard to get a good overall picture.
 Especially for new users and contributors, is is very hard to get a
 quick overview of the project direction.

 To fix this, I suggest to add a brief roadmap summary to the homepage.
 It is a bit of a commitment to keep that roadmap up to date, but I think
 the benefit for users justifies that.
 The Apache Beam project has added such a roadmap [1]
 , which was received very well by
 the community, I would suggest to follow a similar structure here.

 If the community is in favor of this, I would volunteer to write a
 first version of such a roadmap. The points I would include are below.

 Best,
 Stephan

 [1] https://beam.apache.org/roadmap/

 

 Disclaimer: Apache Flink is not governed or steered by any one single
 entity, but by its community and Project Management Committee (PMC). This
 is not a authoritative roadmap in the sense of a plan with a specific
 timeline. Instead, we share our vision for the future and major initiatives
 that are receiving attention and give users and contributors an
 understanding what they can look forward to.

 *Future Role of Table API and DataStream API*
   - Table API becomes first class citizen
   - Table API becomes primary API for analytics use cases
   * Declarative, automatic optimizations
   * No manual control over state and timers
   - DataStream API becomes primary API for applications and data
 pipeline use cases
   * Physical, user controls data types, no magic or optimizer
   * Explicit control over state and time

 *Batch Streaming Unification*
   - Table API unification (environments) (FLIP-32)
   - 

Re: [DISCUSS] Adding a mid-term roadmap to the Flink website

2019-02-14 Thread Stephan Ewen
Thanks Jincheng and Rong Rong!

I am not deciding a roadmap and making a call on what features should be
developed or not. I was only collecting broader issues that are already
happening or have an active FLIP/design discussion plus committer support.

Do we have that for the suggested issues as well? If yes, we can add them
(can you point me to the issue/mail-thread), if not, let's try and move the
discussion forward and add them to the roadmap overview then.

Best,
Stephan


On Wed, Feb 13, 2019 at 6:47 PM Rong Rong  wrote:

> Thanks Stephan for the great proposal.
>
> This would not only be beneficial for new users but also for contributors
> to keep track on all upcoming features.
>
> I think that better window operator support can also be separately group
> into its own category, as they affects both future DataStream API and batch
> stream unification.
> can we also include:
> - OVER aggregate for DataStream API separately as @jincheng suggested.
> - Improving sliding window operator [1]
>
> One more additional suggestion, can we also include a more extendable
> security module [2,3] @shuyi and I are currently working on?
> This will significantly improve the usability for Flink in corporate
> environments where proprietary or 3rd-party security integration is needed.
>
> Thanks,
> Rong
>
>
> [1]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Improvement-to-Flink-Window-Operator-with-Slicing-td25750.html
> [2]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-security-improvements-td21068.html
> [3]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-Kerberos-Improvement-td25983.html
>
>
>
>
> On Wed, Feb 13, 2019 at 3:39 AM jincheng sun 
> wrote:
>
>> Very excited and thank you for launching such a great discussion, Stephan
>> !
>>
>> Here only a little suggestion that in the Batch Streaming Unification
>> section, do we need to add an item:
>>
>> - Same window operators on bounded/unbounded Table API and DataStream API
>> (currently OVER window only exists in SQL/TableAPI, DataStream API does
>> not yet support)
>>
>> Best,
>> Jincheng
>>
>> Stephan Ewen  于2019年2月13日周三 下午7:21写道:
>>
>>> Hi all!
>>>
>>> Recently several contributors, committers, and users asked about making
>>> it more visible in which way the project is currently going.
>>>
>>> Users and developers can track the direction by following the discussion
>>> threads and JIRA, but due to the mass of discussions and open issues, it is
>>> very hard to get a good overall picture.
>>> Especially for new users and contributors, is is very hard to get a
>>> quick overview of the project direction.
>>>
>>> To fix this, I suggest to add a brief roadmap summary to the homepage.
>>> It is a bit of a commitment to keep that roadmap up to date, but I think
>>> the benefit for users justifies that.
>>> The Apache Beam project has added such a roadmap [1]
>>> , which was received very well by the
>>> community, I would suggest to follow a similar structure here.
>>>
>>> If the community is in favor of this, I would volunteer to write a first
>>> version of such a roadmap. The points I would include are below.
>>>
>>> Best,
>>> Stephan
>>>
>>> [1] https://beam.apache.org/roadmap/
>>>
>>> 
>>>
>>> Disclaimer: Apache Flink is not governed or steered by any one single
>>> entity, but by its community and Project Management Committee (PMC). This
>>> is not a authoritative roadmap in the sense of a plan with a specific
>>> timeline. Instead, we share our vision for the future and major initiatives
>>> that are receiving attention and give users and contributors an
>>> understanding what they can look forward to.
>>>
>>> *Future Role of Table API and DataStream API*
>>>   - Table API becomes first class citizen
>>>   - Table API becomes primary API for analytics use cases
>>>   * Declarative, automatic optimizations
>>>   * No manual control over state and timers
>>>   - DataStream API becomes primary API for applications and data
>>> pipeline use cases
>>>   * Physical, user controls data types, no magic or optimizer
>>>   * Explicit control over state and time
>>>
>>> *Batch Streaming Unification*
>>>   - Table API unification (environments) (FLIP-32)
>>>   - New unified source interface (FLIP-27)
>>>   - Runtime operator unification & code reuse between DataStream / Table
>>>   - Extending Table API to make it convenient API for all analytical use
>>> cases (easier mix in of UDFs)
>>>   - Same join operators on bounded/unbounded Table API and DataStream API
>>>
>>> *Faster Batch (Bounded Streams)*
>>>   - Much of this comes via Blink contribution/merging
>>>   - Fine-grained Fault Tolerance on bounded data (Table API)
>>>   - Batch Scheduling on bounded data (Table API)
>>>   - External Shuffle Services Support on bounded streams
>>>   - Caching of intermediate results 

Re: Data loss when restoring from savepoint

2019-02-14 Thread Konstantin Knauf
Hi Juho,

you are right, the problem has actually been narrowed down quite a bit over
time. Nevertheless, sharing the code (incl. flink-conf.yaml) might be a
good idea. Maybe something strikes the eye that we have not thought about
so far. If you don't feel comfortable sharing the code on the ML, feel free
to send me a PM.

Besides that, three more questions:

* does the output of the streaming job contain any data, which is not
contained in the batch output?
* do you know if all lost records are contained in the last savepoint you
took before the window fired? This would mean that no records are lost
after the last restore.
* could you please check the numRecordsOut metric for the WindowOperator
(FlinkUI -> TaskMetrics -> Select TaskChain containing WindowOperator ->
find metric)? Is the count reported there correct (no missing data)?

Cheers,

Konstantin




On Wed, Feb 13, 2019 at 3:19 PM Gyula Fóra  wrote:

> Sorry not posting on the mail list was my mistake :/
>
>
> On Wed, 13 Feb 2019 at 15:01, Juho Autio  wrote:
>
>> Thanks for stepping in, did you post outside of the mailing list on
>> purpose btw?
>>
>> This I did long time ago:
>>
>> To rule out for good any questions about sink behaviour, the job was
>>> killed and started with an additional Kafka sink.
>>> The same number of ids were missed in both outputs: KafkaSink &
>>> BucketingSink.
>>
>>
>> (I wrote about that On Oct 1, 2018 in this email thread)
>>
>> After that I did the savepoint analysis with Bravo.
>>
>> Currently I'm indeed trying to get suggestions how to debug further, for
>> example, where to add additional kafka output, to catch where the data gets
>> lost. That would probably be somewhere in Flink's internals.
>>
>> I could try to share the full code also, but IMHO the problem has been
>> quite well narrowed down, considering that data can be found in savepoint,
>> savepoint is successfully restored, and after restoring the data doesn't go
>> to "user code" (like the reducer) any more.
>>
>> On Wed, Feb 13, 2019 at 3:47 PM Gyula Fóra  wrote:
>>
>>> Hi Juho!
>>> I think the reason you are not getting much answers here is because it
>>> is very hard to debug this problem remotely.
>>> Seemingly you do very normal operations, the state contains all the
>>> required data and nobody else has hit a similar problem for ages.
>>>
>>> My best guess would be some bug with the deduplication or output writing
>>> logic but without a complete code example its very hard to say anything
>>> useful.
>>> Did you try writing it to Kafka to see if the output is there? (that way
>>> we could rule out the dedup probllem)
>>>
>>> Cheers,
>>> Gyula
>>>
>>> On Wed, Feb 13, 2019 at 2:37 PM Juho Autio  wrote:
>>>
 Stefan (or anyone!), please, could I have some feedback on the findings
 that I reported on Dec 21, 2018? This is still a major blocker..

 On Thu, Jan 31, 2019 at 11:46 AM Juho Autio 
 wrote:

> Hello, is there anyone that could help with this?
>
> On Fri, Jan 11, 2019 at 8:14 AM Juho Autio 
> wrote:
>
>> Stefan, would you have time to comment?
>>
>> On Wednesday, January 2, 2019, Juho Autio 
>> wrote:
>>
>>> Bump – does anyone know if Stefan will be available to comment the
>>> latest findings? Thanks.
>>>
>>> On Fri, Dec 21, 2018 at 2:33 PM Juho Autio 
>>> wrote:
>>>
 Stefan, I managed to analyze savepoint with bravo. It seems that
 the data that's missing from output *is* found in savepoint.

 I simplified my test case to the following:

 - job 1 has bee running for ~10 days
 - savepoint X created & job 1 cancelled
 - job 2 started with restore from savepoint X

 Then I waited until the next day so that job 2 has triggered the 24
 hour window.

 Then I analyzed the output & savepoint:

 - compare job 2 output with the output of a batch pyspark script =>
 find 4223 missing rows
 - pick one of the missing rows (say, id Z)
 - read savepoint X with bravo, filter for id Z => Z was found in
 the savepoint!

 How can it be possible that the value is in state but doesn't end
 up in output after state has been restored & window is eventually 
 triggered?

 I also did similar analysis on the previous case where I
 savepointed & restored the job multiple times (5) within the same 
 24-hour
 window. A missing id that I drilled down to, was found in all of those
 savepoints, yet missing from the output that gets written at the end 
 of the
 day. This is even more surprising: that the missing ID was written to 
 the
 new savepoints also after restoring. Is the reducer state somehow 
 decoupled
 from the window contents?

 Big thanks to bravo-developer Gyula for guiding me thro

Re: Broadcast state before events stream consumption

2019-02-14 Thread Konstantin Knauf
Hi Chirag,

Broadcast state is checkpointed, hence the savepoint would contain it.

Best,

Konstantin

On Wed, Feb 13, 2019 at 4:04 PM Chirag Dewan 
wrote:

> Hi Konstantin,
>
> For the second solution, would savepoint persist the Broadcast state in
> State backend? Because I am aware that Broadcast state is not checkpointed.
>
> Is that correct?
>
> Thanks,
>
> Chirag
>
>
> On Mon, 11 Feb 2019 at 2:39 PM, Konstantin Knauf
>  wrote:
> Hi Chirag, Hi Vadim,
>
> from the top of my head, I see two options here:
>
> * Buffer the "fast" stream inside the KeyedBroadcastProcessFunction until
> relevant (whatever this means in your use case) broadcast events have
> arrived (see the sketch right after this list). Advantage: operationally
> easy, events are emitted as early as possible. Disadvantage: the state size
> might become very large, and depending on the nature of the broadcast stream
> it might be hard to know when the "relevant broadcast events have arrived".
>
> * Start your job and only consume the broadcast stream (by configuration).
> Once the stream is "fully processed", i.e. has caught up, take a savepoint.
> Finally, start the job from this savepoint with the correct "fast" stream.
> There is a small race condition between taking the savepoint and restarting
> the job, which might matter (or not) depending on your use case.
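A minimal sketch of the first option, for illustration only: the fast stream
is buffered in keyed ListState until the broadcast side signals that it has
caught up. Event, Rule and EnrichedEvent are placeholder types, the state
names and the "__ready" marker are made up, and isEndMarker() stands for
whatever "the slow stream has caught up" means in the concrete use case.

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.KeyedStateFunction;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BufferingEnrichFunction
        extends KeyedBroadcastProcessFunction<String, Event, Rule, EnrichedEvent> {

    // Must match the descriptor passed to .broadcast(...) on the slow stream.
    static final MapStateDescriptor<String, Rule> RULES =
            new MapStateDescriptor<>("rules", String.class, Rule.class);

    private static final ListStateDescriptor<Event> BUFFER =
            new ListStateDescriptor<>("buffered-events", Event.class);

    private transient ListState<Event> buffer;

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getListState(BUFFER);
    }

    @Override
    public void processElement(Event event, ReadOnlyContext ctx,
                               Collector<EnrichedEvent> out) throws Exception {
        if (!ctx.getBroadcastState(RULES).contains("__ready")) {
            // Reference data not complete yet: hold the event back in keyed state.
            buffer.add(event);
        } else {
            out.collect(enrich(event, ctx.getBroadcastState(RULES)));
        }
    }

    @Override
    public void processBroadcastElement(Rule rule, Context ctx,
                                        Collector<EnrichedEvent> out) throws Exception {
        ctx.getBroadcastState(RULES).put(rule.key, rule);
        if (rule.isEndMarker()) {
            // Mark the broadcast state as complete and flush every key's buffer.
            ctx.getBroadcastState(RULES).put("__ready", rule);
            ctx.applyToKeyedState(BUFFER,
                    new KeyedStateFunction<String, ListState<Event>>() {
                        @Override
                        public void process(String key, ListState<Event> state) throws Exception {
                            for (Event buffered : state.get()) {
                                out.collect(enrich(buffered, ctx.getBroadcastState(RULES)));
                            }
                            state.clear();
                        }
                    });
        }
    }

    private EnrichedEvent enrich(Event e, ReadOnlyBroadcastState<String, Rule> rules)
            throws Exception {
        return new EnrichedEvent(e, rules.get(e.ruleKey));
    }
}

For the second option the mechanics are just the usual savepoint cycle ("flink
savepoint <jobId>", then "flink run -s <savepointPath>" with the fast source
enabled); the part that needs care is only the race between taking the
savepoint and restarting, as noted above.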
>
> This topic is related to event-time alignment in sources, which has been
> actively discussed in the community in the past and we might be able to
> solve this in a similar way in the future.
>
> Cheers,
>
> Konstantin
>
> On Fri, Feb 8, 2019 at 5:48 PM Chirag Dewan 
> wrote:
>
> Hi Vadim,
>
> I would be interested in this too.
>
> Presently, I have to read my lookup source in the *open* method and keep
> it in a cache. By doing that I cannot make use of the broadcast state until,
> of course, the first element arrives on the *Broadcast* stream.
>
> The problem with making the event stream wait is that I have no way of
> knowing when I have read all the data from the lookup source. Having a
> special marker in the data is also not possible for my use case.
>
> So preloading the data seems to be the only option right now.
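For reference, a minimal sketch of that preloading work-around, assuming the
lookup data fits in memory; Event, Rule, EnrichedEvent and loadRules() are
placeholders, and since the cache is rebuilt only in open(), it can go stale
until the next restart.

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class PreloadedEnricher extends RichFlatMapFunction<Event, EnrichedEvent> {

    // Plain in-memory cache, filled once per parallel subtask before any event is processed.
    private transient Map<String, Rule> rules;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Blocking read of the lookup source (file, JDBC, REST, ...) before processing starts.
        rules = loadRules();
    }

    @Override
    public void flatMap(Event event, Collector<EnrichedEvent> out) {
        Rule rule = rules.get(event.ruleKey);
        if (rule != null) {
            out.collect(new EnrichedEvent(event, rule));
        }
    }

    private Map<String, Rule> loadRules() {
        return new HashMap<>();   // placeholder: load the real reference data here
    }
}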
>
> Thanks,
>
> Chirag
>
>
>
> On Friday, 8 February, 2019, 7:45:37 pm IST, Vadim Vararu <
> vadim.var...@adswizz.com> wrote:
>
>
> Hi all,
>
> I need to use the broadcast state mechanism (
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html)
> for the next scenario.
>
> I have a reference data stream (slow) and an event stream (fast running),
> and I want to do a kind of lookup in the reference stream for each
> event. The broadcast state mechanism seems to fit the scenario perfectly.
>
> From documentation:
> *As an example where broadcast state can emerge as a natural fit, one can
> imagine a low-throughput stream containing a set of rules which we want to
> evaluate against all elements coming from another stream.*
>
> However, I am not sure what the correct way is to delay consumption of the
> fast-running stream until the slow one is fully read (in the case of a file)
> or until a marker is emitted (in the case of some other source). Is there
> any way to accomplish that? It doesn't seem to be a rare use case.
>
> Thanks, Vadim.
>
>
>
> --
>
> Konstantin Knauf | Solutions Architect
>
> +49 160 91394525
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Data Artisans GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
>
>

-- 

Konstantin Knauf | Solutions Architect

+49 160 91394525



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen