Re: Kafka Source Recovery Behavior

2021-11-18 Thread Steven Wu
Qingsheng,

For the scenario described by Mason in the original email, I think it is
safe to remove splits/topics upon recovery without worrying about data loss,
since it is a conscious choice by the user to switch to a different set of
topics.

I thought the problem is that KafkaSourceReader just restores the
splits/partitions and reads from them without checking whether they are still
valid (i.e. belong to subscribed topics). Not sure if this requires a change in
the Kafka source checkpointing or not. Currently, I believe both the
enumerator and readers checkpoint their own states. If readers don't
checkpoint and always wait for the enumerator to assign splits/partitions
upon recovery, this may be easier, as the filter/check can just be done by
the enumerator.
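A rough sketch of the kind of restore-time filter I have in mind, whether it
ends up in the reader or the enumerator (hypothetical helper and variable
names, not the actual KafkaSource code; imports: java.util.*,
java.util.stream.Collectors):

// Drop restored partitions whose topic is no longer in the subscribed topic
// set, instead of blindly reading from them after recovery.
static List<KafkaPartitionSplit> filterRestoredSplits(
        List<KafkaPartitionSplit> restoredSplits, Set<String> subscribedTopics) {
    return restoredSplits.stream()
            .filter(split -> subscribedTopics.contains(split.getTopic()))
            .collect(Collectors.toList());
}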

Thanks,
Steven


On Tue, Nov 16, 2021 at 7:17 PM Qingsheng Ren  wrote:

> Hi Mason,
>
> Sorry for my late response!
>
> “there was no logic to filter/remove splits”
>
>
> Yes, we indeed miss a split removal mechanism. Actually this is quite a
> tricky one considering exactly-once semantics: there's a risk of losing data
> if we remove a partition / topic from Kafka. There was a discussion about
> this topic in the user mailing list:
> https://lists.apache.org/thread/7r4h7v5k281w9cnbfw9lb8tp56r30lwt
>
> An immature solution in my mind is that we can remove a split with the
> help of watermarks. Once the watermark in a split has been pushed to the end
> of the global window, we can assume that there are no more new records in the
> split and we can remove it safely. But this will invalidate all previous
> checkpoints, because these splits might not exist anymore in the external
> system (e.g. the topic has been removed in Kafka).
>
> Hope this answers your question, and I'm looking forward to your inspiring
> ideas!
>
> --
> Best Regards,
>
> Qingsheng Ren
> Email: renqs...@gmail.com
> On Nov 10, 2021, 11:32 PM +0800, Mason Chen ,
> wrote:
>
>
> there was no logic to filter/remove splits
>
>


Re: Add control mode for flink

2021-06-08 Thread Steven Wu
> producing control events from JobMaster is similar to triggering a
savepoint.

Paul, here is where I see the difference: upon job or jobmanager recovery,
we don't need to recover and replay the savepoint trigger signal.

On Tue, Jun 8, 2021 at 8:20 PM Paul Lam  wrote:

> +1 for this feature. Setting up a separate control stream is too much for
> many use cases; it would be very helpful if users could leverage the built-in
> control flow of Flink.
>
> My 2 cents:
> 1. @Steven IMHO, producing control events from JobMaster is similar to
> triggering a savepoint. The REST API is non-blocking, and users should poll
> the results to confirm the operation has succeeded. If something goes wrong,
> it's the user's responsibility to retry.
> 2. There are two kinds of existing special elements: special stream
> records (e.g. watermarks) and events (e.g. checkpoint barriers). They all
> flow through the whole DAG, but events need to be acknowledged by
> downstream operators and can overtake records, while stream records cannot. So I'm
> wondering if we plan to unify the two approaches in the new control flow
> (as Xintong mentioned both in the previous mails)?
>
> Best,
> Paul Lam
>
> On Jun 8, 2021, at 14:08, Steven Wu wrote:
>
>
> I can see the benefits of control flow. E.g., it might help the old (and
> inactive) FLIP-17 side input. I would suggest that we add more details of
> some of the potential use cases.
>
> Here is one mismatch with using control flow for dynamic config. Dynamic
> config is typically targeted/loaded by one specific operator. Control flow
> will propagate the dynamic config to all operators, though that is not a problem per se.
>
> Regarding using the REST api (to the jobmanager) for accepting control
> signals from an external system, where are we going to persist/checkpoint the
> signal? The jobmanager can die before the control signal is propagated and
> checkpointed. Do we lose the control signal in this case?
>
>
> On Mon, Jun 7, 2021 at 11:05 PM Xintong Song 
> wrote:
>
>> +1 on separating the effort into two steps:
>>
>>1. Introduce a common control flow framework, with flexible
>>interfaces for generating / reacting to control messages for various
>>purposes.
>>    2. Features that leverage the control flow can be worked on
>>    concurrently
>>
>> In the meantime, continuing to collect potential features that may leverage the
>> control flow should be helpful. It provides good inputs for the control
>> flow framework design, to make the framework common enough to cover the
>> potential use cases.
>>
>> My suggestions on the next steps:
>>
>>1. Allow more time for opinions to be heard and potential use cases
>>to be collected
>>2. Draft a FLIP with the scope of common control flow framework
>>3. We probably need a poc implementation to make sure the framework
>>covers at least the following scenarios
>>   1. Produce control events from arbitrary operators
>>   2. Produce control events from JobMaster
>>   3. Consume control events from arbitrary operators downstream
>>   where the events are produced
>>
>>
>> Thank you~
>> Xintong Song
>>
>>
>>
>> On Tue, Jun 8, 2021 at 1:37 PM Yun Gao  wrote:
>>
>>> Many thanks to Jiangang for bringing this up, and thanks for the
>>> discussion!
>>>
>>> I also agree with the summarization by Xintong and Jing that control
>>> flow seems to be a common building block for many functionalities, and a
>>> dynamic configuration framework is a representative application that is
>>> frequently required by users. Regarding the control flow, currently we are
>>> also considering the design of iteration for flink-ml, and as Xintong has
>>> pointed out, it also requires the control flow in cases like detecting
>>> global termination inside the iteration (in this case we need to broadcast
>>> an event through the iteration body to detect whether there are still
>>> records residing in the iteration body). And regarding whether to implement
>>> the dynamic configuration framework, I also agree with Xintong that the
>>> consistency guarantee would be a point to consider; we might consider
>>> whether we need to ensure every operator could receive the dynamic
>>> configuration.
>>>
>>> Best,
>>> Yun
>>>
>>>
>>>
>>> --
>>> Sender:kai wang
>>> Date:2021/06/08 11:52:12
>>> Recipient:JING ZHANG
>>> Cc:刘建刚; Xinton

Re: Re: Add control mode for flink

2021-06-08 Thread Steven Wu
Option 2 is probably not feasible, as a checkpoint may take a long time or
may fail.

Option 1 might work, although it complicates job recovery and
checkpointing. After checkpoint completion, we need to clean up the control
signals stored in the HA service.

On Tue, Jun 8, 2021 at 1:14 AM 刘建刚  wrote:

> Thanks for the reply. It is a good question. There are multiple choices as
> follows:
>
>1. We can persist control signals in HighAvailabilityServices and replay
>them after failover.
>2. Only tell the users that the control signals take effect after they
>are checkpointed.
>
>
> Steven Wu [via Apache Flink User Mailing List archive.] <
> ml+s2336050n44278...@n4.nabble.com> wrote on Tue, Jun 8, 2021 at 2:15 PM:
>
> >
> > I can see the benefits of control flow. E.g., it might help the old (and
> > inactive) FLIP-17 side input. I would suggest that we add more details of
> > some of the potential use cases.
> >
> > Here is one mismatch with using control flow for dynamic config. Dynamic
> > config is typically targeted/loaded by one specific operator. Control
> flow
> > will propagate the dynamic config to all operators, though that is not a problem per se.
> >
> > Regarding using the REST api (to the jobmanager) for accepting control
> > signals from an external system, where are we going to persist/checkpoint
> > the signal? The jobmanager can die before the control signal is propagated
> > and checkpointed. Do we lose the control signal in this case?
> >
> >
> > On Mon, Jun 7, 2021 at 11:05 PM Xintong Song <[hidden email]
> > <http:///user/SendEmail.jtp?type=node=44278=0>> wrote:
> >
> >> +1 on separating the effort into two steps:
> >>
> >>1. Introduce a common control flow framework, with flexible
> >>interfaces for generating / reacting to control messages for various
> >>purposes.
> >>    2. Features that leverage the control flow can be worked on
> >>    concurrently
> >>
> >> In the meantime, continuing to collect potential features that may leverage the
> >> control flow should be helpful. It provides good inputs for the control
> >> flow framework design, to make the framework common enough to cover the
> >> potential use cases.
> >>
> >> My suggestions on the next steps:
> >>
> >>1. Allow more time for opinions to be heard and potential use cases
> >>to be collected
> >>2. Draft a FLIP with the scope of common control flow framework
> >>3. We probably need a poc implementation to make sure the framework
> >>covers at least the following scenarios
> >>   1. Produce control events from arbitrary operators
> >>   2. Produce control events from JobMaster
> >>   3. Consume control events from arbitrary operators downstream
> >>   where the events are produced
> >>
> >>
> >> Thank you~
> >>
> >> Xintong Song
> >>
> >>
> >>
> >> On Tue, Jun 8, 2021 at 1:37 PM Yun Gao <[hidden email]
> >> <http:///user/SendEmail.jtp?type=node=44278=1>> wrote:
> >>
> >>> Many thanks to Jiangang for bringing this up, and thanks for the
> >>> discussion!
> >>>
> >>> I also agree with the summarization by Xintong and Jing that control
> >>> flow seems to be a common building block for many functionalities, and a
> >>> dynamic configuration framework is a representative application that is
> >>> frequently required by users. Regarding the control flow, currently we are
> >>> also considering the design of iteration for flink-ml, and as Xintong has
> >>> pointed out, it also requires the control flow in cases like detecting
> >>> global termination inside the iteration (in this case we need to broadcast
> >>> an event through the iteration body to detect whether there are still
> >>> records residing in the iteration body). And regarding whether to implement
> >>> the dynamic configuration framework, I also agree with Xintong that the
> >>> consistency guarantee would be a point to consider; we might consider
> >>> whether we need to ensure every operator could receive the dynamic
> >>> configuration.
> >>>
> >>> Best,
> >>> Yun
> >>>
> >>>
> >>>
> >>> --
> >>> Sender:kai wang<[hidden email]
> >>

Re: Re: Add control mode for flink

2021-06-08 Thread Steven Wu
 things in common. A unified control flow model would help
>>>>> deduplicate the common logics, allowing us to focus on the use case
>>>>> specific parts.
>>>>>
>>>>> E.g.,
>>>>> - Watermarks: generated by source operators, handled by window
>>>>> operators.
>>>>> - Checkpoint barrier: generated by the checkpoint coordinator, handled
>>>>> by all tasks
>>>>> - Dynamic controlling: generated by JobMaster (in reaction to the REST
>>>>> command), handled by specific operators/UDFs
>>>>> - Operator defined events: The following features are still in
>>>>> planning, but may potentially benefit from the control flow model. (Please
>>>>> correct me if I'm wrong, @Yun, @Jark)
>>>>>   * Iteration: When a certain condition is met, we might want to
>>>>> signal downstream operators with an event
>>>>>   * Mini-batch assembling: Flink currently uses special watermarks for
>>>>> indicating the end of each mini-batch, which makes it tricky to deal with
>>>>> event time related computations.
>>>>>   * Hive dimension table join: For periodically reloaded hive tables,
>>>>> it would be helpful to have specific events signaling that a reloading is
>>>>> finished.
>>>>>   * Bootstrap dimension table join: This is similar to the previous
>>>>> one. In cases where we want to fully load the dimension table before
>>>>> starting joining the mainstream, it would be helpful to have an event
>>>>> signaling the finishing of the bootstrap.
>>>>>
>>>>> ## Dynamic REST controlling
>>>>> Back to the specific feature that Jiangang proposed, I personally
>>>>> think it's quite convenient. Currently, to dynamically change the behavior
>>>>> of an operator, we need to set up a separate source for the control events
>>>>> and leverage broadcast state. Being able to send the events via REST APIs
>>>>> definitely improves the usability.
>>>>>
>>>>> Leveraging dynamic configuration frameworks is for sure one possible
>>>>> approach. The reason we are in favor of introducing the control flow is
>>>>> that:
>>>>> - It benefits not only this specific dynamic controlling feature, but
>>>>> potentially other future features as well.
>>>>> - AFAICS, it's non-trivial to make a 3rd-party dynamic configuration
>>>>> framework work together with Flink's consistency mechanism.
>>>>>
>>>>> Thank you~
>>>>>
>>>>> Xintong Song
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Jun 7, 2021 at 11:05 AM 刘建刚 <[hidden email]
>>>>> <http:///user/SendEmail.jtp?type=node=44245=0>> wrote:
>>>>>
>>>>>> Thank you for the reply. I have checked the post you mentioned. The
>>>>>> dynamic config may be useful sometimes. But it is hard to keep data
>>>>>> consistent in Flink, for example, what happens if the dynamic config takes
>>>>>> effect during failover? Since dynamic config is desired by users, maybe
>>>>>> Flink can support it in some way.
>>>>>>
>>>>>> For the control mode, dynamic config is just one of the control
>>>>>> modes. In the google doc, I have listed some other cases. For example,
>>>>>> control events are generated in operators or external services. Besides
>>>>>> the user's dynamic config, the Flink system can support some common dynamic
>>>>>> configuration, like qps limits, checkpoint control, and so on.
>>>>>>
>>>>>> It needs a good design to handle the control mode structure. Based on
>>>>>> that, other control features can be added easily later, like changing the
>>>>>> log level while the job is running. In the end, Flink will not just process
>>>>>> data, but also interact with users to receive control events like a service.
>>>>>>
>>>>>> Steven Wu <[hidden email]
>>>>>> <http:///user/SendEmail.jtp?type=node=44245=1>> wrote on Fri, Jun 4,
>>>>>> 2021 at 11:11 PM:
>>>>>>
>>>>>>> I am not sure if we should solve this problem in Flink. This is more
>>>>>>> like a dynamic config problem that probably should be solved by some
>>>>>>> configuration framework.

Re: Add control mode for flink

2021-06-04 Thread Steven Wu
I am not sure if we should solve this problem in Flink. This is more like a
dynamic config problem that probably should be solved by some configuration
framework. Here is one post from google search:
https://medium.com/twodigits/dynamic-app-configuration-inject-configuration-at-run-time-using-spring-boot-and-docker-ffb42631852a

On Fri, Jun 4, 2021 at 7:09 AM 刘建刚  wrote:

> Hi everyone,
>
>   Flink jobs are always long-running. When the job is running, users
> may want to control the job but not stop it. The control reasons can be
> different, as follows:
>
>    1. Change data processing logic, such as a filter condition.
>    2. Send trigger events to make the progress move forward.
>    3. Define some tools to degrade the job, such as limiting input qps or
>    sampling data.
>    4. Change the log level to debug a current problem.
>
>   The common way to do this is to stop the job, do modifications and
> start the job. It may take a long time to recover. In some situations,
> stopping jobs is intolerable, for example, when the job is related to money or
> important activities. So we need some techniques to control the running
> job without stopping it.
>
>
> We propose to add control mode for flink. A control mode based on the
> restful interface is first introduced. It works by these steps:
>
>
>    1. The user can predefine some logic which supports config control,
>    such as a filter condition (see the sketch after this list).
>    2. Run the job.
>    3. If the user wants to change the job's running logic, just send a
>    restful request with the corresponding config.
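To make step 1 concrete, here is a minimal sketch of such predefined,
config-controllable logic using today's broadcast-state workaround (the Event
type, the events/controlStream streams, and the field names are made up for
illustration):

// Control stream carries new filter thresholds; broadcast state holds the latest one.
MapStateDescriptor<String, Long> configDescriptor =
        new MapStateDescriptor<>("filter-config", Types.STRING, Types.LONG);

DataStream<Event> filtered = events
        .connect(controlStream.broadcast(configDescriptor))
        .process(new BroadcastProcessFunction<Event, Long, Event>() {

            @Override
            public void processElement(Event event, ReadOnlyContext ctx, Collector<Event> out)
                    throws Exception {
                // Read the latest threshold; pass everything through until one arrives.
                Long threshold = ctx.getBroadcastState(configDescriptor).get("threshold");
                if (threshold == null || event.getValue() >= threshold) {
                    out.collect(event);
                }
            }

            @Override
            public void processBroadcastElement(Long newThreshold, Context ctx, Collector<Event> out)
                    throws Exception {
                // A REST-driven control flow could deliver this update instead of a control stream.
                ctx.getBroadcastState(configDescriptor).put("threshold", newThreshold);
            }
        });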
>
> Other control modes will also be considered in the future. More
> details can be found in the doc
> https://docs.google.com/document/d/1WSU3Tw-pSOcblm3vhKFYApzVkb-UQ3kxso8c8jEzIuA/edit?usp=sharing
> . If the community likes the proposal, more discussion is needed and a more
> detailed design will be given later. Any suggestions and ideas are welcome.
>
>


Re: Direct Memory full

2020-12-16 Thread Steven Wu
If you are running out of direct buffers, you will see
"java.lang.OutOfMemoryError: Direct buffer memory".

On Wed, Dec 16, 2020 at 9:47 AM Rex Fenley  wrote:

> Thanks for the reply. If what I'm understanding is correct there's no
> chance of an OOM, but since direct memory is for I/O, it being completely
> filled may be a sign of backpressure? Currently one of our operators takes
> a tremendous amount of time to align during a checkpoint. Could increasing
> direct memory help checkpointing by improving I/O performance across the
> whole plan (assuming I/O is at least part of the bottleneck)?
>
> On Tue, Dec 15, 2020 at 10:37 PM Robert Metzger 
> wrote:
>
>> Hey Rex,
>>
>> the direct memory is used for IO. There is no concept of direct memory
>> being "full". The only thing that can happen is that you have something in
>> place (Kubernetes, YARN) that limits / enforces the memory use of a Flink
>> process, and you run out of your memory allowance. The direct memory is
>> allocated outside of the heap's upper limit, thus you could run out of the
>> budget.
>> But Flink usually configures the memory limits correctly, to
>> avoid running into those situations.
>>
>> tl;dr: you don't need to worry about this.
>>
>>
>> On Tue, Dec 15, 2020 at 8:38 AM Rex Fenley  wrote:
>>
>>> Hello,
>>>
>>> Our job consistently shows the following under "Outside JVM" for direct
>>> memory:
>>>
>>> Type     Count    Used     Capacity
>>> Direct   32,839   1.03 GB  1.03 GB
>>>
>>> Is it typical for it to be full? What are the consequences that we may
>>> not be noticing of direct memory being full?
>>>
>>> Thanks!
>>>
>>> --
>>>
>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>
>>>
>>>
>>
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
>


Re: Optimizing for super long checkpoint times

2020-12-12 Thread Steven Wu
> things are actually moving pretty smoothly

Do you mean the job is otherwise healthy, like there is no lag, etc.?

Do you see any bottleneck at the system level, like CPU, network, disk I/O, etc.?

On Sat, Dec 12, 2020 at 10:54 AM Rex Fenley  wrote:

> Hi,
>
> We're running a job with on the order of >100GiB of state. For our initial
> run we wanted to keep things simple, so we allocated a single core node
> with 1 Taskmanager and 1 parallelism and 1 TiB storage (split between 4
> disks on that machine). Overall, things are actually moving pretty
> smoothly, except for checkpointing. Checkpoints are set to be incremental,
> yet they're all in the range of 10-20 GiB -- we do have a lot of data being
> updated in real-time, retracts+appends -- and they take around 10-30 min.
> We have the TaskManager set to checkpoint every 5 min, which means we're
> spending the majority of our time just checkpointing.
>
> My question is, what sort of bottlenecks should we be investigating and
> what are some things we can try to improve our checkpoint times?
>
> Some things we're considering are:
> Increasing parallelism, hoping that this will partition the data and each
> operator will therefore checkpoint faster.
> Changing time between checkpoints, though we don't have a good
> understanding of how this might affect total time.
>
> Also, we are hesitant to use unaligned checkpointing at the moment and are
> hoping for some other options.
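One concrete knob for "changing time between checkpoints" that may help is the
minimum pause between checkpoints, which caps how much of the wall-clock time is
spent checkpointing even when individual checkpoints are slow. A sketch with
placeholder values and a made-up checkpoint path, assuming the RocksDB backend
with incremental checkpoints as described above (inside a main that throws
Exception):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Incremental checkpoints with RocksDB (second constructor arg enables incremental mode).
env.setStateBackend(new RocksDBStateBackend("s3://my-bucket/checkpoints", true));
env.enableCheckpointing(5 * 60 * 1000L); // trigger a checkpoint every 5 minutes
// Ensure at least 5 minutes pass between the end of one checkpoint and the start
// of the next, so the job isn't checkpointing back-to-back when checkpoints are slow.
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5 * 60 * 1000L);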
>
> Thanks!
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
>


Re: How to debug a Flink Exception that does not have a stack trace?

2020-12-12 Thread Steven Wu
This is a performance optimization in the JVM when the same exception is
thrown too frequently. You can set `-XX:-OmitStackTraceInFastThrow` to
disable the feature. You can typically find the full stack trace in the log
before the optimization kicks in.
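A tiny standalone demo of the effect (my own example, not from this thread): with
default HotSpot flags the reported stack depth drops to 0 once the optimization
kicks in, while running with -XX:-OmitStackTraceInFastThrow keeps the full traces.

public class FastThrowDemo {
    public static void main(String[] args) {
        Object nothing = null;
        for (int i = 0; i < 200_000; i++) {
            try {
                nothing.toString(); // always throws NullPointerException
            } catch (NullPointerException e) {
                if (i % 50_000 == 0) {
                    // Stack depth becomes 0 after the JIT replaces the throw with a
                    // preallocated, trace-less exception (HotSpot "fast throw").
                    System.out.println("iteration " + i + ", stack depth: "
                            + e.getStackTrace().length);
                }
            }
        }
    }
}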

On Sat, Dec 12, 2020 at 2:05 AM Till Rohrmann  wrote:

> Ok, then let's see whether it reoccurs. What you could do is to revert the
> fix and check the stack trace again.
>
> Cheers,
> Till
>
> On Sat, Dec 12, 2020, 02:16 Dan Hill  wrote:
>
>> Hmm, I don't have a good job I can separate for reproduction.  I was
>> using Table SQL and inserting a long field (which was null) into a table
>> that sank out to Avro.  The exception was being thrown from this Avro
>> function.  I can watch to see if it keeps happening.
>>
>>
>> https://github.com/apache/avro/blob/e982f2e6ee57c362a7fae21ba7373c1cfc964fce/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumWriter.java#L127
>>
>> On Fri, Dec 11, 2020 at 3:31 AM Till Rohrmann 
>> wrote:
>>
>>> Hi Dan,
>>>
>>> Do you have an example job and some sample data to reproduce this
>>> problem? I couldn't reproduce it locally with a simple example job.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Dec 10, 2020 at 5:51 PM Dan Hill  wrote:
>>>
 Yea, the error makes sense and was an easy fix.

 Any idea what happened with the hidden stacktrace?  The hidden
 stacktrace made this 100x more difficult.

 On Thu, Dec 10, 2020 at 12:59 AM Flavio Pompermaier <
 pomperma...@okkam.it> wrote:

> It looks like there's a problem in reading a null
> value in the AvroRowDataDeserializationSchema (see below for the snippet 
> of
> code from Flink 1.11.1).
> The problem is due to the fact that there's a bad typing of the source
> so the call to createConverter() within the createNullableConverter()
> returns null, creating a null on  fieldConverters[i] and, in the end, a
> NullPointer in  fieldConverters[i].convert(). Does it make sense?
>
> static DeserializationRuntimeConverter createRowConverter(RowType rowType) {
>     final DeserializationRuntimeConverter[] fieldConverters =
>         rowType.getFields().stream()
>             .map(RowType.RowField::getType)
>             .map(AvroRowDataDeserializationSchema::createNullableConverter)
>             .toArray(DeserializationRuntimeConverter[]::new);
>     final int arity = rowType.getFieldCount();
>     return avroObject -> {
>         IndexedRecord record = (IndexedRecord) avroObject;
>         GenericRowData row = new GenericRowData(arity);
>         for (int i = 0; i < arity; ++i) {
>             row.setField(i, fieldConverters[i].convert(record.get(i)));
>         }
>         return row;
>     };
> }
>
> Best,
> Flavio
>
> On Thu, Dec 10, 2020 at 8:39 AM Dan Hill 
> wrote:
>
>> One of the Exception instances finally reported a stacktrace.  I'm
>> not sure why it's so infrequent.
>>
>> java.lang.NullPointerException: null
>>
>> at
>> org.apache.flink.table.data.GenericRowData.getLong(GenericRowData.java:154)
>> ~[flink-table-blink_2.12-1.11.1.jar:1.11.1]
>>
>> at
>> org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$7(RowData.java:338)
>> ~[flink-table-blink_2.12-1.11.1.jar:1.11.1]
>>
>> at
>> org.apache.flink.formats.avro.AvroRowDataSerializationSchema.lambda$createRowConverter$6827278$1(AvroRowDataSerializationSchema.java:177)
>> ~[blob_p-97afea7f96212938a7f59355af26e877ab52777d-022ef71f40ed0889789b9e942825fbb7:?]
>>
>> at
>> org.apache.flink.formats.avro.AvroFileSystemFormatFactory$RowDataAvroWriterFactory$1.addElement(AvroFileSystemFormatFactory.java:251)
>> ~[blob_p-97afea7f96212938a7f59355af26e877ab52777d-022ef71f40ed0889789b9e942825fbb7:?]
>>
>> at
>> org.apache.flink.formats.avro.AvroFileSystemFormatFactory$RowDataAvroWriterFactory$1.addElement(AvroFileSystemFormatFactory.java:247)
>> ~[blob_p-97afea7f96212938a7f59355af26e877ab52777d-022ef71f40ed0889789b9e942825fbb7:?]
>>
>> at
>> org.apache.flink.table.filesystem.FileSystemTableSink$ProjectionBulkFactory$1.addElement(FileSystemTableSink.java:498)
>> ~[flink-table-blink_2.12-1.11.1.jar:1.11.1]
>>
>> at
>> org.apache.flink.table.filesystem.FileSystemTableSink$ProjectionBulkFactory$1.addElement(FileSystemTableSink.java:494)
>> ~[flink-table-blink_2.12-1.11.1.jar:1.11.1]
>>
>> at
>> org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.write(BulkPartWriter.java:48)
>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>
>> at
>> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:202)
>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>
>> at
>> 

Re: Recommendation about RocksDB Metrics ?

2020-12-08 Thread Steven Wu
Just a data point: we actually enabled all RocksDB metrics by default
(including for very large jobs in terms of parallelism and state size). We
didn't see any significant performance impact. There is probably a small
impact. At least, it didn't jump out for our workload.
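If you want to start with a subset, the reporting is opt-in per metric. A sketch
of a few of the switches (key names are from the Flink configuration docs;
normally these go into flink-conf.yaml rather than being set programmatically as
shown here):

Configuration conf = new Configuration();
// Each RocksDB native metric has its own switch; enable only what you need.
conf.setBoolean("state.backend.rocksdb.metrics.estimate-num-keys", true);
conf.setBoolean("state.backend.rocksdb.metrics.estimate-live-data-size", true);
conf.setBoolean("state.backend.rocksdb.metrics.total-sst-files-size", true);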

On Tue, Dec 8, 2020 at 9:00 AM Khachatryan Roman <
khachatryan.ro...@gmail.com> wrote:

> Hi Kien,
>
> I am pulling in Yun who might know better.
>
> Regards,
> Roman
>
>
> On Sun, Dec 6, 2020 at 3:52 AM Truong Duc Kien 
> wrote:
>
>> Hi all,
>>
>> We are thinking about enabling RocksDB metrics to better monitor our
>> pipeline. However, since they will have performance impact, we will have to
>> be selective about which metrics we use.
>>
>> Does anyone have experience about which metrics are more important than
>> the others ?
>>
>> And what metrics have the largest performance impact ?
>>
>> Thanks,
>> Kien
>>
>


Re: Feature request: Removing state from operators

2020-10-30 Thread Steven Wu
Not a solution, but a potential workaround: maybe rename the operator uid
so that you can continue to leverage allowNonRestoredState?
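A sketch of what I mean (hypothetical operator, types, and uids, not tied to any
specific job):

// Changing the uid makes the restored savepoint's state for the old uid "orphaned";
// it is then skipped when the job is resumed with --allowNonRestoredState.
stream.keyBy(Event::getKey)
      .process(new MyStatefulFunction())
      .uid("my-operator-v2")   // was "my-operator-v1"
      .name("my-operator");

// e.g.: flink run -s <savepointPath> --allowNonRestoredState <jobJar>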

On Thu, Oct 29, 2020 at 7:58 AM Peter Westermann 
wrote:

> Does that actually allow removing a state completely (vs. just modifying
> the values stored in state)?
>
>
>
> Ideally, we would want to just interact with state via *KeyedStateStore*.
> Maybe it would be possible to add a couple methods there, e.g. like this:
>
> // List all pre-existing states
>
>  List<StateDescriptor<?, ?>> listStates();
>
> // Completely remove a state
>
>  void dropState(StateDescriptor stateDescriptor);
>
>
>
>
>
> Thanks,
>
> Peter
>
>
>
>
>
>
>
> *From: *Congxian Qiu 
> *Date: *Thursday, October 29, 2020 at 10:38 AM
> *To: *Robert Metzger 
> *Cc: *Peter Westermann , "user@flink.apache.org"
> 
> *Subject: *Re: Feature request: Removing state from operators
>
>
>
> Hi Peter
>
>  Can applyToAllKeys[1] in KeyedStateBackend help you here? But
> currently, this is not exposed to users.
>
>
>
> [1]
> https://github.com/apache/flink/blob/fada6fb6ac9fd7f6510f1f2d77b6baa06563e222/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java#L65
>
>
> Best,
>
> Congxian
>
>
>
>
>
> Robert Metzger wrote on Tue, Oct 27, 2020 at 5:51 PM:
>
> Hi Peter,
>
>
>
> I'm adding two committers to this thread who can help answering your
> question.
>
>
>
> On Mon, Oct 26, 2020 at 3:22 PM Peter Westermann <
> no.westerm...@genesys.com> wrote:
>
> We use the feature for removing stateful operators via the
> *allowNonRestoredState* relatively often and it works great. However,
> there doesn’t seem to be anything like that for removing state from an
> existing operator (that we want to keep).
>
> Say my operator defines a *MapState* and a *ValueState*. Later on, the
> *ValueState* becomes obsolete. In this case, we can remove the actual
> data for each key by clearing it out but the state itself is still
> referenced in savepoints even if it’s not referenced in code anymore – that
> e.g. means one cannot remove any class that was previously used in state.
>
> Would it be possible to add support for completely removing state from an
> operator if it’s no longer referenced in code and *allowNonRestoredState*
> is set? (Or to add an explicit “drop this state option” in KeyedStateStore
> and OperatorStateStore?)
>
>
>
> Thanks,
>
> Peter
>
>
>
>


Re: [DISCUSS] Deprecate and remove UnionList OperatorState

2020-09-13 Thread Steven Wu
Right now, we use UnionState to store the `nextCheckpointId` in the Iceberg
sink use case, because we can't retrieve the checkpointId from
the FunctionInitializationContext during the restore case. But we can move
away from it if the restore context provides the checkpointId.

On Sat, Sep 12, 2020 at 8:20 AM Alexey Trenikhun  wrote:

> -1
>
> We use union state to generate sequences; each operator generates offset0
> + number-of-tasks - task-index + task-specific-counter * number-of-tasks
> (e.g. for 2 instances of the operator, one instance produces even numbers,
> the other odd ones). The last generated sequence number is stored in union
> list state; on restart we need to know where to start from in order to avoid
> collisions with already generated numbers, and to do so we calculate offset0
> as the max over the union list state.
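If I read that right, a rough sketch of the scheme (my own names, simplified;
lastSequenceState is a union ListState<Long> and counter is the per-subtask
counter, inside a RichFunction that also implements CheckpointedFunction):

// Inside initializeState(): every subtask sees the union of all last-generated values.
long offset0 = 0L;
for (Long lastGenerated : lastSequenceState.get()) {
    offset0 = Math.max(offset0, lastGenerated);
}

int numberOfTasks = getRuntimeContext().getNumberOfParallelSubtasks();
int taskIndex = getRuntimeContext().getIndexOfThisSubtask();

// The counter-th number generated by this subtask; subtasks cover disjoint
// residue classes modulo the parallelism, so there are no collisions across subtasks.
long sequence = offset0 + numberOfTasks - taskIndex + counter * numberOfTasks;

lastSequenceState.clear();
lastSequenceState.add(sequence);   // snapshotted so offset0 can be recomputed on restart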
>
> Alexey
>
> --
> *From:* Seth Wiesman 
> *Sent:* Wednesday, September 9, 2020 9:37:03 AM
> *To:* dev 
> *Cc:* Aljoscha Krettek ; user 
> *Subject:* Re: [DISCUSS] Deprecate and remove UnionList OperatorState
>
> Generally +1
>
> The one use case of union state I've seen in production (outside
> of sources and sinks) is as a "poor man's" broadcast state. This was
> obviously before that feature was added which is now a few years ago so I
> don't know if those pipelines still exist. FWIW, if they do the state
> processor api can provide a migration path as it supports rewriting union
> state as broadcast state.
>
> Seth
>
> On Wed, Sep 9, 2020 at 10:21 AM Arvid Heise  wrote:
>
> +1 to getting rid of non-keyed state as is in general and for union state
> in particular. I had a hard time wrapping my head around the semantics of
> non-keyed state when designing the rescaling of unaligned checkpoints.
>
> The only plausible use cases are legacy sources and sinks. Both should also
> be reworked and deprecated.
>
> My main question is how to represent state in these two cases. For sources,
> state should probably be bound to splits. In that regard, split (id) may
> act as a key. More generally, there should be probably a concept that
> supersedes keys and includes splits.
>
> For sinks, I can see two cases:
> - Either we are in a keyed context, then state should be bound to the key.
> - Or we are in a non-keyed context, then state might be bound to the split
> (?) in case of a source->sink chaining.
> - Maybe it should also be a new(?) concept like output partition.
>
> It's not clear to me if there are more cases and if we can always find a
> good way to bind state to some sort of key, especially for arbitrary
> communication patterns (which we may need to replace as well potentially).
>
> On Wed, Sep 9, 2020 at 4:09 PM Aljoscha Krettek 
> wrote:
>
> > Hi Devs,
> >
> > @Users: I'm cc'ing the user ML to see if there are any users that are
> > relying on this feature. Please comment here if that is the case.
> >
> > I'd like to discuss the deprecation and eventual removal of UnionList
> > Operator State, aka Operator State with Union Redistribution. If you
> > don't know what I'm talking about you can take a look in the
> > documentation: [1]. It's not documented thoroughly because it started
> > out as mostly an internal feature.
> >
> > The immediate main reason for removing this is also mentioned in the
> > documentation: "Do not use this feature if your list may have high
> > cardinality. Checkpoint metadata will store an offset to each list
> > entry, which could lead to RPC framesize or out-of-memory errors." The
> > insidious part of this limitation is that you will only notice that
> > there is a problem when it is too late. Checkpointing will still work
> > and a program can continue when the state size is too big. The system
> > will only fail when trying to restore from a snapshot that has union
> > state that is too big. This could be fixed by working around that issue
> > but I think there are more long-term issues with this type of state.
> >
> > I think we need to deprecate and remove API for state that is not tied
> > to a key. Keyed state is easy to reason about, the system can
> > re-partition state and also re-partition records and therefore scale the
> > system in and out. Operator state, on the other hand, is not tied to a
> > key but to an operator. This is a more "physical" concept, if you will,
> > that potentially ties business logic closer to the underlying runtime
> > execution model, which in turns means less degrees of freedom for the
> > framework, that is Flink. This is future work, though, but we should
> > start with deprecating union list state because it is the potentially
> > most dangerous type of state.
> >
> > We currently use this state type internally in at least the
> > StreamingFileSink, FlinkKafkaConsumer, and FlinkKafkaProducer. However,
> > we're in the process of hopefully getting rid of it there with our work
> > on sources and sinks. Before we fully remove it, we should of course
> > signal this to users by deprecating it.
> >
> > What do you think?
> >
> 

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-10 Thread Steven Wu
Guowei,

Thanks a lot for the proposal and starting the discussion thread. Very
excited.

For the big question of "Is the sink an operator or a topology?", I have a
few related sub questions.
* Where should we run the committers?
* Is the committer parallel or single parallelism?
* Can a single choice satisfy all sinks?

Trying to envision how some sinks can be implemented with this new unified
sink interface.

1. Kafka sink

Kafka supports non-transactional and transactional writes.
* Non-transactional writes don't need a commit action. We can have *parallel
writers and no/no-op committers*. This is probably true for other
non-transactional message queues.
* Transactional writes can be implemented as *parallel writers and parallel
committers*. In this case, I don't know if it makes sense to separate
writers and committers into two separate operators, because they probably
need to share the same KafkaProducer object.

Either way, both writers and committers probably should *run inside task
managers*.

2. ES sink

ES sink typically buffers the data up to a certain size or time threshold
and then uploads/commits a batch to ES. Writers buffer data and flush when
needed, and the committer does the HTTP bulk upload to commit. To avoid
serialization/deserialization cost, we should run *parallel writers and
parallel committers* and they *should be* *chained or bundled together*
while *running inside task managers*.

It can also be implemented as *parallel writers and no/no-op committers*,
where all logics (batching and upload) are put inside the writers.

3. Iceberg [1] sink

It is currently implemented as two-stage operators with *parallel writers
and single-parallelism committers*.
* *parallel writers* that write records into data files. Upon checkpoint,
writers flush and upload the files, and send the metadata/location of the
data files to the downstream committer. Writers need to do the flush inside
the "prepareSnapshotPreBarrier" method (NOT "snapshotState" method) before
forwarding the checkpoint barrier to the committer
* single-parallelism committer operator. It collects data files from
upstream writers. During "snapshotState", it saves collected data files (or
an uber metadata file) into state. When the checkpoint is completed, inside
"notifyCheckpointComplete" it commits those data files to Iceberg tables. *The
committer has to be single parallelism*, because we don't want hundreds or
thousands of parallel committers to compete for commit operations with
opportunistic concurrency control. It will be very inefficient and probably
infeasible if the parallelism is high. Too many tiny commits/transactions
can also slow down both the table write and read paths due to too many
manifest files.

Right now, both Iceberg writer and committer operators run inside task
managers. It has one major drawback. With Iceberg sink, embarrassingly
parallel jobs won't be embarrassingly parallel anymore. That breaks the
benefit of region recovery for embarrassingly parallel DAG. Conceptually,
the Writer-Committer sink pattern is like the mirroring of the FLIP-27
Enumerator-Reader source pattern. It will be better *if the committer can
run inside the job manager* like the SplitEnumerator for the FLIP-27
source.
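For reference, a much-simplified sketch of the committer side of the two-stage
pattern described above (hypothetical DataFile type and commit helper; not the
actual Iceberg sink code). The operator is added with setParallelism(1), as noted
above.

class IcebergFileCommitter extends RichSinkFunction<DataFile>
        implements CheckpointedFunction, CheckpointListener {

    private final List<DataFile> pendingFiles = new ArrayList<>();
    private transient ListState<DataFile> pendingFilesState;

    @Override
    public void invoke(DataFile file, Context context) {
        pendingFiles.add(file); // metadata sent by the parallel writers
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        pendingFilesState = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("pending-data-files", DataFile.class));
        if (context.isRestored()) {
            pendingFilesState.get().forEach(pendingFiles::add);
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Persist collected files so they survive a failure between snapshot and commit.
        pendingFilesState.clear();
        pendingFilesState.addAll(pendingFiles);
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        commitToTable(pendingFiles); // one Iceberg commit per completed checkpoint
        pendingFiles.clear();
    }

    private void commitToTable(List<DataFile> files) {
        // hypothetical helper: append the data files to the Iceberg table in one transaction
    }
}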

---
Additional questions regarding the doc/API
* Any example for the writer shared state (Writer#snapshotSharedState)?
* We allow the case where the writer has no state, right? Meaning WriterS
can be Void.

[1] https://iceberg.apache.org/

Thanks,
Steven

On Thu, Sep 10, 2020 at 6:44 AM Guowei Ma  wrote:

> Hi, devs & users
>
> As discussed in FLIP-131[1], Flink will deprecate the DataSet API in favor
> of DataStream API and Table API. Users should be able to use DataStream API
> to write jobs that support both bounded and unbounded execution modes.
> However, Flink does not provide a sink API to guarantee the Exactly-once
> semantics in both bounded and unbounded scenarios, which blocks the
> unification.
>
> So we want to introduce a new unified sink API which could let the user
> develop the sink once and run it everywhere. You could find more details in
> FLIP-143[2].
>
> The FLIP contains some open questions that I'd really appreciate inputs
> from the community. Some of the open questions include:
>
>    1. We provide two alternative Sink APIs in the FLIP. The only
>    difference between the two versions is how to expose the state to the user.
>    We want to know which one you prefer.
>    2. How does the sink API support writing to Hive?
>    3. Is the sink an operator or a topology?
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741
> [2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API
>
> Best,
> Guowei
>


backup configuration in Flink doc

2020-07-16 Thread Steven Wu
The configuration page has this "backup" section. Can I assume that these
are public interfaces? The name "backup" is a little confusing to me. There
are some important pipeline and execution checkpointing configs here.
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#backup

Thanks,
Steven


Re: Does savepoint reset the base for incremental checkpoint

2020-07-05 Thread Steven Wu
In a slightly different variation of the sequence (checkpoint x, savepoint y,
redeploy/restart job from savepoint y, checkpoint x+1), checkpoint x+1
builds the incremental diff on top of savepoint y, right?

On Sun, Jul 5, 2020 at 8:08 PM Steven Wu  wrote:

>
> In this sequence of (checkpoint x, savepoint y, checkpoint x+1), does
> checkpoint x+1 build the incremental diff based on checkpoint x or
> savepoint y?
>
> Thanks,
> Steven
>


Does savepoint reset the base for incremental checkpoint

2020-07-05 Thread Steven Wu
In this sequence of (checkpoint x, savepoint y, checkpoint x+1), does
checkpoint x+1 build the incremental diff based on checkpoint x or
savepoint y?

Thanks,
Steven


Re: Interact with different S3 buckets from a shared Flink cluster

2020-06-23 Thread Steven Wu
Internally, we have our own ConfigurableCredentialsProvider. Based on the
config in core-site.xml, it does assume-role with the proper IAM
credentials using STSAssumeRoleSessionCredentialsProvider. We just need to
grant permission for the instance credentials to be able to assume the IAM
role for bucket access. We have a single core-site.xml that lays out all
the mappings, e.g.:

  <property>
    <name>aws.iam.role.arn.${BUCKET_NAME}</name>
    <value>arn:aws:iam::${ACCOUNT_NUMBER}:role/${BUCKET_ROLE_NAME}</value>
  </property>
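A rough sketch of what such a provider can look like (simplified, not our actual
internal code; it assumes the Hadoop s3a convention of a (URI, Configuration)
constructor for credentials providers):

import java.net.URI;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
import org.apache.hadoop.conf.Configuration;

public class BucketAssumeRoleCredentialsProvider implements AWSCredentialsProvider {

    private final AWSCredentialsProvider delegate;

    public BucketAssumeRoleCredentialsProvider(URI uri, Configuration conf) {
        // Look up the role configured for this bucket in core-site.xml, then
        // assume it with STS; the instance credentials must be allowed to assume it.
        String bucket = uri.getHost();
        String roleArn = conf.get("aws.iam.role.arn." + bucket);
        this.delegate = new STSAssumeRoleSessionCredentialsProvider
                .Builder(roleArn, "flink-" + bucket)
                .build();
    }

    @Override
    public AWSCredentials getCredentials() {
        return delegate.getCredentials();
    }

    @Override
    public void refresh() {
        delegate.refresh();
    }
}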

On Mon, Jun 22, 2020 at 7:07 AM Arvid Heise  wrote:

> Hi Ricardo,
>
> one option is to use s3p for checkpointing (Presto) and s3a for custom
> applications and attach different configurations.
>
> In general, I'd recommend using a cluster per application to exactly
> avoid such issues. I'd use K8s and put the respective IAM roles on each
> application pod (e.g. with kiam).
>
> On Thu, Jun 18, 2020 at 1:46 AM Ricardo Cardante <
> ricardocarda...@tutanota.com> wrote:
>
>> Hi!
>>
>>
>> We are working in a use case where we have a shared Flink cluster to
>> deploy multiple jobs from different teams. With this strategy, we are
>> facing a challenge regarding the interaction with S3. Given that we already
>> configured S3 for the state backend (through flink-conf.yaml), every time we
>> use API functions that communicate with the file system (e.g., DataStream
>> readFile) the application configurations appear to be overridden by those
>> of the cluster while attempting to communicate with external S3 buckets.
>> What we've thought so far:
>>
>>
>> 1. Provide a core-site.xml resource file targeting the external S3
>> buckets we want to interact with. We've tested, and the credentials
>> ultimately seem to be ignored in behalf of the IAM roles that are
>> pre-loaded with the instances;
>>
>> 2. Load the cluster instances with multiple IAM roles. The problem with
>> this is that we would allow each job to interact with out-of-scope buckets;
>>
>> 3. Spin multiple clusters with different configurations - we would like
>> to avoid this since we started from the premise of sharing a single cluster
>> per context;
>>
>>
>> What would be a clean/recommended solution to interact with multiple S3
>> buckets with different security policies from a shared Flink cluster?
>>
>> Thanks in advance.
>>
>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


Re: RocksDB savepoint recovery performance improvements

2020-05-26 Thread Steven Wu
Yun, you mentioned that checkpoints also support rescaling. I thought the
recommendation [1] is to use savepoints for rescaling.

[1]
https://www.ververica.com/blog/differences-between-savepoints-and-checkpoints-in-flink

On Tue, May 26, 2020 at 6:46 AM Joey Pereira  wrote:

> Following up: I've put together the implementation,
> https://github.com/apache/flink/pull/12345. It's passing tests but is
> only partially complete, as it still needs some clean-up and configuration.
> I still need to try running this against a production cluster to check the
> performance, as well as getting some RocksDB benchmarks.
>
> On Mon, May 18, 2020 at 3:46 PM Joey Pereira  wrote:
>
>> Thanks Yun for highlighting this, it's very helpful! I'll give it a go
>> with that in mind.
>>
>> We have already begun using checkpoints for recovery. Having these
>> improvements would still be immensely helpful to reduce downtime for
>> savepoint recovery.
>>
>> On Mon, May 18, 2020 at 3:14 PM Yun Tang  wrote:
>>
>>> Hi Joey
>>>
>>> Previously, I also looked at the mechanism to create on-disk SSTables, as
>>> I planned to use RocksDB's benchmark to mock the scenario in Flink. However, I
>>> found the main challenge is how to ensure the keys are inserted in a
>>> strictly increasing order. The key order in Java could differ from the
>>> byte order in RocksDB. In your case, I think it could be much easier, as
>>> RocksFullSnapshotStrategy writes data per column family per key group, which
>>> should be in a strictly increasing order [1].
>>>
>>> FLINK-17288 could
>>> mitigate the performance issue, and your solution could improve the
>>> performance even more (and could integrate with the state-processor-api
>>> story).
>>>
>>> On the other hand, as an out-of-the-box option for your production
>>> scenario, how about using checkpoints to recover, since they also support
>>> rescaling and normal recovery?
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/f35679966eac9e3bb53a02bcdbd36dbd1341d405/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java#L308
>>>
>>>
>>> Best
>>> Yun Tang
>>> --
>>> *From:* Joey Pereira 
>>> *Sent:* Tuesday, May 19, 2020 2:27
>>> *To:* user@flink.apache.org 
>>> *Cc:* Mike Mintz ; Shahid Chohan <
>>> cho...@stripe.com>; Aaron Levin 
>>> *Subject:* RocksDB savepoint recovery performance improvements
>>>
>>> Hey,
>>>
>>> While running a Flink application with a large-state, savepoint recovery
>>> has been a painful part of operating the application because recovery time
>>> can be several hours. During some profiling that chohan (cc'd) had done, a
>>> red flag stood out — savepoint recovery consisted mostly of RocksDB Get and
>>> Put operations.
>>>
>>> When Flink is bootstrapping state for RocksDB instances this is not what
>>> I would have expected, as RocksDB supports direct ingestion of the on-disk
>>> format (SSTables):
>>> https://github.com/facebook/rocksdb/wiki/Creating-and-Ingesting-SST-files. 
>>> This
>>> was also recently reported on Jira:
>>> https://issues.apache.org/jira/browse/FLINK-17288.
>>>
>>> From what I understood of the current implementation:
>>>
>>> * The snapshot restoration pathways, RocksDBFullRestoreOperation and 
>>> RocksDBIncrementalRestoreOperation,
>>> use RocksDBWriteBatchWrapper.
>>>
>>> * RocksDBWriteBatchWrapper is using RocksDB’s WriteBatch operator. This
>>> will provide atomicity of batches as well as performance benefits for
>>> batching, compared to individual Puts, but it will still involve RocksDB’s
>>> insert paths which can involve expensive operations[0].
>>>
>>> Instead, by creating SSTable files and instructing RocksDB to ingest the
>>> files, writes can be batched even further and avoid expensive operations in
>>> RocksDB. This is commonly utilized by other systems for restoration or
>>> import processes, such as in CockroachDB[1], TiDB[2], and Percona[3].  There
>>> are some restrictions on being able to generate SSTables, as well as
>>> limitations for ingestion to be performant. Unfortunately, it’s all not
>>> very well documented:
>>>
>>> 1. When generating an SSTable, keys need to be inserted in-order.
>>>
>>> 2. Ingested files should not have key-ranges that overlap with either
>>> existing or other ingested files[4]. It is possible to ingest overlapping
>>> SSTables, but this may incur significant overhead.
>>>
>>> To generate SSTables with non-overlapping key-ranges and to create them
>>> with keys in-order, it would mean that the savepoints would need to be
>>> ordered while processing them. I'm unsure if this is the case for how
>>> Flink's savepoints are stored.
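For illustration, a small standalone example of the write-SST-then-ingest path
being discussed, using RocksDB's Java API directly (my own example with made-up
paths, not from the proposal; keys must be added to the SstFileWriter in
increasing byte order):

import java.util.Arrays;
import org.rocksdb.*;

public class SstIngestExample {
    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        try (Options options = new Options().setCreateIfMissing(true);
             EnvOptions envOptions = new EnvOptions();
             SstFileWriter writer = new SstFileWriter(envOptions, options)) {

            writer.open("/tmp/restore-000001.sst");
            writer.put("key-001".getBytes(), "value-1".getBytes()); // strictly increasing keys
            writer.put("key-002".getBytes(), "value-2".getBytes());
            writer.finish();

            try (RocksDB db = RocksDB.open(options, "/tmp/rocksdb-restore");
                 IngestExternalFileOptions ingestOptions = new IngestExternalFileOptions()) {
                // Bulk-load the prepared file instead of replaying puts through WriteBatch.
                db.ingestExternalFile(Arrays.asList("/tmp/restore-000001.sst"), ingestOptions);
            }
        }
    }
}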
>>>
>>> I have not dug into RocksDBIncrementalRestoreOperation yet, or how it
>>> is used (eg: for incremental checkpoint or something else). I did
>>> notice it is iterating over a temporary RocksDB instance and inserting into
>>> a "final” instance. 

Re: java.lang.IllegalStateException: The RPC connection is already closed

2020-05-04 Thread Steven Wu
Manish, might be related to this bug, which is fixed in 1.10.1.

https://issues.apache.org/jira/browse/FLINK-14316?focusedCommentId=16946580=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16946580

On Mon, May 4, 2020 at 5:52 AM Manish G 
wrote:

> Hi,
>
> I have set up flink and kafka locally. When I start my flink
> program (configured to read messages from a kafka topic), I get this error:
>
> 2020-05-04 18:17:58.035  INFO 23516 --- [lt-dispatcher-2]
> o.a.f.r.taskexecutor.JobLeaderService: Successful registration at job
> manager akka://flink/user/jobmanager_1 for job
> 4f1932f75aafb97028fdbf8cd165ee9d.
> 2020-05-04 18:17:58.035  INFO 23516 --- [lt-dispatcher-4]
> o.a.f.r.taskexecutor.JobLeaderService: Successful registration at job
> manager akka://flink/user/jobmanager_1 for job
> 4f1932f75aafb97028fdbf8cd165ee9d.
> 2020-05-04 18:17:58.035  INFO 23516 --- [lt-dispatcher-4]
> o.a.f.runtime.taskexecutor.TaskExecutor  : Establish JobManager connection
> for job 4f1932f75aafb97028fdbf8cd165ee9d.
> 2020-05-04 18:17:58.035  WARN 23516 --- [lt-dispatcher-5]
> o.a.f.r.h.n.e.EmbeddedLeaderService  : Error notifying leader listener
> about new leader
>
> java.lang.IllegalStateException: The RPC connection is already closed
> at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
> ~[flink-core-1.7.1.jar:1.7.1]
> at
> org.apache.flink.runtime.registration.RegisteredRpcConnection.start(RegisteredRpcConnection.java:91)
> ~[flink-runtime_2.11-1.7.1.jar:1.7.1]
>
> What can be the root cause for this?
>


Re: Configuring taskmanager.memory.task.off-heap.size in Flink 1.10

2020-04-29 Thread Steven Wu
Jiahui,

Based on my reading of the docs, for a containerized environment, it is
probably better to set `taskmanager.memory.process.size` to the container
memory limit.
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#taskmanager-memory-process-size

Then I typically set `taskmanager.memory.jvm-overhead.max` to allocate some
overhead to non-Flink memory. I think it matches the intention better than
'taskmanager.memory.task.off-heap.size', which is used for calculating the JVM
direct memory size.
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#taskmanager-memory-jvm-overhead-max
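Just to make that concrete (these keys normally go into flink-conf.yaml; shown via
the Configuration API only for brevity, and the values are placeholders):

Configuration conf = new Configuration();
conf.setString("taskmanager.memory.process.size", "4096m");      // match the container memory limit
conf.setString("taskmanager.memory.jvm-overhead.max", "1024m");  // headroom for non-Flink memory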

I also found this Flink doc pretty helpful
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_detail.html

Hope that helps.

Thanks,
Steven

On Tue, Apr 28, 2020 at 8:56 PM Xintong Song  wrote:

> Hi Jiahui,
>
> 'taskmanager.memory.task.off-heap.size' accounts for the off-heap memory
> reserved for your job / operators. There are other configuration options
> accounting for the off-heap memory usages for other purposes, e.g.,
> 'taskmanager.memory.framework.off-heap'. The default 'task.off-heap.size'
> being 0 only represents that in most cases user codes / operators do not
> use off-heap memory. User would need to explicitly increase this
> configuration if UDFs or libraries of the job uses off-heap memory.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Wed, Apr 29, 2020 at 11:07 AM Jiahui Jiang 
> wrote:
>
>> Hello! We are migrating our pipeline from Flink 1.8 to 1.10 in Kubernetes.
>>
>> In the first try, we simply copied the old 'taskmanager.heap.size' over
>> to 'taskmanager.memory.flink.size'. This caused the cluster to OOM.
>> Eventually we had to allocate a small amount of memory to
>> 'taskmanager.memory.task.off-heap.size' for it to stop failing. But we
>> don't quite understand why this needs to be overridden.
>>
>> I saw the default for 'taskmanager.memory.task.off-heap.size' is 0, does
>> that mean in most cases task managers won't need off-heap memory? What are
>> some examples that off-heap memory need to be non-zero?
>>
>> Thank you!
>>
>


Re: [1.10.0] flink-dist source jar is empty

2020-04-14 Thread Steven Wu
Chesnay, sorry, it was my mistake. Yes, we did have a local change for
the shade plugin that I missed when porting local changes from 1.9 to 1.10.

true

On Tue, Apr 14, 2020 at 6:29 AM Chesnay Schepler  wrote:

> I just built the 1.8 and 1.9 flink-dist jars and neither contain the
> sources of any bundled modules.
>
> How were you building the jars, and were you making any modifications to
> the Flink source?
>
> On 14/04/2020 15:07, Steven Wu wrote:
>
flink-dist is an uber/shadow jar. Before 1.10, its source jar contained the
> source files for the flink modules that it bundles.
>
> On Tue, Apr 14, 2020 at 1:34 AM Chesnay Schepler 
> wrote:
>
>> That should not be a problem since the flink-dist module does not
>> contain any java sources
>>
>> On 14/04/2020 06:42, Steven Wu wrote:
>> >
>> > We build and publish flink-dist locally. But the source jar turns out
>> > empty. Other source jars (like flink-core) are good. Anyone else
>> > experienced similar problem?
>> >
>> > Thanks,
>> > Steven
>>
>>
>>
>


Re: [1.10.0] flink-dist source jar is empty

2020-04-14 Thread Steven Wu
flink-dist is an uber/shadow jar. Before 1.10, its source jar contained the
source files for the flink modules that it bundles.

On Tue, Apr 14, 2020 at 1:34 AM Chesnay Schepler  wrote:

> That should not be a problem since the flink-dist module does not
> contain any java sources
>
> On 14/04/2020 06:42, Steven Wu wrote:
> >
> > We build and publish flink-dist locally. But the source jar turns out
> > empty. Other source jars (like flink-core) are good. Anyone else
> > experienced similar problem?
> >
> > Thanks,
> > Steven
>
>
>


[1.10.0] flink-dist source jar is empty

2020-04-13 Thread Steven Wu
We build and publish flink-dist locally. But the source jar turns out
empty. Other source jars (like flink-core) are good. Anyone else
experienced similar problem?

Thanks,
Steven


Non-heap memory usage jumped after 1.7 -> 1.10 upgrade

2020-04-12 Thread Steven Wu
This is a stateful stream join application using RocksDB state backend with
incremental checkpoint enabled.

   - JVM heap usage is pretty similar. The main difference is in non-heap usage,
     probably related to RocksDB state.
   - Also observed cgroup memory failure count showing up in the 1.10 job,
     while the 1.7 job has zero memory failure count (due to enough free memory).
   - The 1.10 job looks stable otherwise.


Is there any default config change for the RocksDB state backend between 1.7 and
1.10 due to FLIP-49? Or maybe there are some implications of the FLIP-49
change that we don't understand?

I can confirm both the 1.7.2 and 1.10.0 jobs have the same state size via
various metrics:

   - rocksdb.estimate-num-keys
   - rocksdb.estimate-live-data-size
   - rocksdb.total-sst-files-size
   - lastCheckpointSize


1.7.2 job setup

   - Container memory: 128 GB
   - -Xms20G -Xmx20G -XX:MaxDirectMemorySize=6G
   - taskmanager.network.memory.max=4 gb


1.10.0 job setup

   - Container memory: 128 GB
   - -Xmx20G -Xms20G -XX:MaxDirectMemorySize=5.5G
   - taskmanager.memory.network.max=4 gb
   - taskmanager.memory.process.size=128 gb
   - taskmanager.memory.jvm-overhead.max=10 gb
   - taskmanager.memory.managed.size=90 gb


I tried different combinations of "taskmanager.memory.jvm-overhead.max" and
"taskmanager.memory.managed.size". They all lead to similar results.
1.7.2 job memory usage: free / vmstat / top output (screenshots omitted)

1.10.0 job memory usage: free / vmstat / top output (screenshots omitted)


Re: why operator not chained?

2019-11-24 Thread Steven Wu
Guowei, thanks a lot. Looks like the chainingStrategy default is HEAD. Let me
try.

writer.setChainingStrategy(ChainingStrategy.ALWAYS);

On Sat, Nov 23, 2019 at 7:17 PM Guowei Ma  wrote:

> Hi, Steven
>
> 1. The `icebergsink-writer` operator does not chain with the first
> operator because the "icebergsink-writer" operator's ChainingStrategy
> is NULL or HEAD. You could verify it by printing writer.getChainingStrategy().
> 2. The two operators use the FORWARD partitioner if the parallelism of the two
> operators is the same and the partitioner is not specified by the user.
>
> Best,
> Guowei
>
>
> Steven Wu  于2019年11月23日周六 上午5:17写道:
>
>>
>> I have this DAG screenshot from Flink UI.
>> [image: image.png]
>> I am wondering why is the middle "icebergsink-writer" operator not
>> chained with the first operator chain?
>> Or an equivalent question is why is forward partitioner used here?
>>
>> The first operator chain are all map functions after source. The last two
>> operators are added like this.
>> --
>> dataStream
>> .transform("icebergsink-writer", TypeInformation.of(SomeClass.class),
>> writer)
>> .addSink(committer)
>> .name("icebergsink-committer")
>> .uid("icebergsink-committer")
>> .setParallelism(1);
>>
>> Thanks,
>> Steven
>>
>


why operator not chained?

2019-11-22 Thread Steven Wu
I have this DAG screenshot from Flink UI.
[image: image.png]
I am wondering why is the middle "icebergsink-writer" operator not chained
with the first operator chain?
Or an equivalent question is why is forward partitioner used here?

The first operator chain are all map functions after source. The last two
operators are added like this.
--
dataStream
.transform("icebergsink-writer", TypeInformation.of(SomeClass.class),
writer)
.addSink(committer)
.name("icebergsink-committer")
.uid("icebergsink-committer")
.setParallelism(1);

Thanks,
Steven


Re: [ANNOUNCE] Progress of Apache Flink 1.10 #2

2019-11-01 Thread Steven Wu
Gary, FLIP-27 seems to have been omitted in the 2nd update. Below is the info
from update #1.

- FLIP-27: Refactor Source Interface [20]
  - FLIP accepted. Implementation is in progress.



On Fri, Nov 1, 2019 at 7:01 AM Gary Yao  wrote:

> Hi community,
>
> Because we have approximately one month of development time left until the
> targeted Flink 1.10 feature freeze, we thought now would be a good time to
> give another progress update. Below we have included a list of the ongoing
> efforts that have made progress since our last release progress update
> [1]. As
> always, if you are working on something that is not included here, feel
> free
> to use this thread to share your progress.
>
> - Support Java 11 [2]
> - Implementation is in progress (18/21 subtasks resolved)
>
> - Table API improvements
> - Full Data Type Support in Planner [3]
> - Implementing (1/8 subtasks resolved)
> - FLIP-66 Support Time Attribute in SQL DDL [4]
> - Implementation is in progress (1/7 subtasks resolved).
> - FLIP-70 Support Computed Column [5]
> - FLIP voting [6]
> - FLIP-63 Rework Table Partition Support [7]
> - Implementation is in progress (3/15 subtasks resolved).
> - FLIP-51 Rework of Expression Design [8]
> - Implementation is in progress (2/12 subtasks resolved).
> - FLIP-64 Support for Temporary Objects in Table Module [9]
> - Implementation is in progress
>
> - Hive compatibility completion (DDL/UDF) to support full Hive integration
> - FLIP-57 Rework FunctionCatalog [10]
> - Implementation is in progress (6/9 subtasks resolved)
> - FLIP-68 Extend Core Table System with Modular Plugins [11]
> - Implementation is in progress (2/8 subtasks resolved)
>
> - Finer grained resource management
> - FLIP-49: Unified Memory Configuration for TaskExecutors [12]
> - Implementation is in progress (6/10 subtasks resolved)
> - FLIP-53: Fine Grained Operator Resource Management [13]
> - Implementation is in progress (1/9 subtasks resolved)
>
> - Finish scheduler re-architecture [14]
> - Integration tests are being enabled for new scheduler
>
> - Executor/Client refactoring [15]
> - FLIP-81: Executor-related new ConfigOptions [16]
> - done
> - FLIP-73: Introducing Executors for job submission [17]
> - Implementation is in progress
>
> - FLIP-36 Support Interactive Programming [18]
> - Is built on top of FLIP-67 [19], which has been accepted
> - Implementation in progress
>
> - FLIP-58: Flink Python User-Defined Stateless Function for Table [20]
> - Implementation is in progress (12/22 subtask resolved)
> - FLIP-50: Spill-able Heap Keyed State Backend [21]
> - Implementation is in progress (2/11 subtasks resolved)
>
> - RocksDB Backend Memory Control [22]
> - FLIP for resource management on state backend will be opened soon
> - Write Buffer Manager will be backported to FRocksDB due to
> performance regression [23] in new RocksDB versions
>
> - Unaligned Checkpoints
> - FLIP-76 [24] was published and received positive feedback
> - Implementation is in progress
>
> - Separate framework and user class loader in per-job mode [25]
> - First PR is almost done. Remaining PRs will be ready next week
>
> - Active Kubernetes Integration [26]
> - Implementation is in progress (6/11 in review, 3/11 in progress,
> 2/11 todo)
>
> - FLIP-39 Flink ML pipeline and ML libs [27]
> - A few abstract ML classes have been merged (FLINK-13339,
> FLINK-13513)
> - Starting review of algorithms
>
> Again, the feature freeze is targeted to be at the end of November. Please
> make sure that all important work threads can be completed until that date.
> Feel free to use this thread to communicate any concerns about features
> that
> might not be finished until then. We will send another announcement later
> in
> the release cycle to make the date of the feature freeze official.
>
> Best,
> Yu & Gary
>
> [1] https://s.apache.org/wc0dc
> [2] https://issues.apache.org/jira/browse/FLINK-10725
> [3] https://issues.apache.org/jira/browse/FLINK-14079
> [4]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-66%3A+Support+time+attribute+in+SQL+DDL
> [5]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-70%3A+Flink+SQL+Computed+Column+Design
> [6]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-70-Flink-SQL-Computed-Column-Design-td34385.html
> [7]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-63%3A+Rework+table+partition+support
> [8]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-51%3A+Rework+of+the+Expression+Design
> [9]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-64%3A+Support+for+Temporary+Objects+in+Table+module
> [10]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-57%3A+Rework+FunctionCatalog
> [11]
> 

Re: [SURVEY] How many people are using customized RestartStrategy(s)

2019-09-25 Thread Steven Wu
Zhu Zhu, that is correct.

On Tue, Sep 24, 2019 at 8:04 PM Zhu Zhu  wrote:

> Hi Steven,
>
> As a conclusion, since we will have a meter metric[1] for restarts,
> customized restart strategy is not needed in your case.
> Is that right?
>
> [1] https://issues.apache.org/jira/browse/FLINK-14164
>
> Thanks,
> Zhu Zhu
>
> Steven Wu  于2019年9月25日周三 上午2:30写道:
>
>> Zhu Zhu,
>>
>> Sorry, I was using different terminology. yes, Flink meter is what I was
>> talking about regarding "fullRestarts" for threshold based alerting.
>>
>> On Mon, Sep 23, 2019 at 7:46 PM Zhu Zhu  wrote:
>>
>>> Steven,
>>>
>>> In my mind, Flink counter only stores its accumulated count and reports
>>> that value. Are you using an external counter directly?
>>> Maybe Flink Meter/MeterView is what you need? It stores the count and
>>> calculates the rate. And it will report its "count" as well as "rate" to
>>> external metric services.
>>>
>>> The counter "task_failures" only works if the individual failover
>>> strategy is enabled. However, it is not a public interface and is not
>>> suggested to use, as the fine grained recovery (region failover) now
>>> supersedes it.
>>> I've opened a ticket[1] to add a metric to show failovers that respects
>>> fine grained recovery.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-14164
>>>
>>> Thanks,
>>> Zhu Zhu
>>>
>>> Steven Wu  于2019年9月24日周二 上午6:41写道:
>>>
>>>>
>>>> When we setup alert like "fullRestarts > 1" for some rolling window, we
>>>> want to use counter. if it is a Gauge, "fullRestarts" will never go below 1
>>>> after a first full restart. So alert condition will always be true after
>>>> first job restart. If we can apply a derivative to the Gauge value, I guess
>>>> alert can probably work. I can explore if that is an option or not.
>>>>
>>>> Yeah. Understood that "fullRestart" won't increment when fine grained
>>>> recovery happened. I think "task_failures" counter already exists in Flink.
>>>>
>>>>
>>>>
>>>> On Sun, Sep 22, 2019 at 7:59 PM Zhu Zhu  wrote:
>>>>
>>>>> Steven,
>>>>>
>>>>> Thanks for the information. If we can determine this a common issue,
>>>>> we can solve it in Flink core.
>>>>> To get to that state, I have two questions which need your help:
>>>>> 1. Why is gauge not good for alerting? The metric "fullRestart" is a
>>>>> Gauge. Does the metric reporter you use report Counter and
>>>>> Gauge to external services in different ways? Or anything else can 
>>>>> be
>>>>> different due to the metric type?
>>>>> 2. Is the "number of restarts" what you actually need, rather than
>>>>> the "fullRestart" count? If so, I believe we will have such a counter
>>>>> metric in 1.10, since the previous "fullRestart" metric value is not the
>>>>> number of restarts when grained recovery (feature added 1.9.0) is enabled.
>>>>> "fullRestart" reveals how many times entire job graph has been
>>>>> restarted. If grained recovery (feature added 1.9.0) is enabled, the graph
>>>>> would not be restarted when task failures happen and the "fullRestart"
>>>>> value will not increment in such cases.
>>>>>
>>>>> I'd appreciate if you can help with these questions and we can make
>>>>> better decisions for Flink.
>>>>>
>>>>> Thanks,
>>>>> Zhu Zhu
>>>>>
>>>>> Steven Wu  于2019年9月22日周日 上午3:31写道:
>>>>>
>>>>>> Zhu Zhu,
>>>>>>
>>>>>> Flink fullRestart metric is a Gauge, which is not good for alerting
>>>>>> on. We publish an equivalent Counter metric for alerting purpose.
>>>>>>
>>>>>> Thanks,
>>>>>> Steven
>>>>>>
>>>>>> On Thu, Sep 19, 2019 at 7:45 PM Zhu Zhu  wrote:
>>>>>>
>>>>>>> Thanks Steven for the feedback!
>>>>>>> Could you share more information about the metrics you add in you
>>>>>>> customized restart strategy?
>>>>>>>
>>>>>>> Th

Re: [SURVEY] How many people are using customized RestartStrategy(s)

2019-09-24 Thread Steven Wu
Zhu Zhu,

Sorry, I was using different terminology. Yes, the Flink meter is what I was
talking about regarding "fullRestarts" for threshold-based alerting.

On Mon, Sep 23, 2019 at 7:46 PM Zhu Zhu  wrote:

> Steven,
>
> In my mind, Flink counter only stores its accumulated count and reports
> that value. Are you using an external counter directly?
> Maybe Flink Meter/MeterView is what you need? It stores the count and
> calculates the rate. And it will report its "count" as well as "rate" to
> external metric services.
>
> The counter "task_failures" only works if the individual failover strategy
> is enabled. However, it is not a public interface and is not suggested to
> use, as the fine grained recovery (region failover) now supersedes it.
> I've opened a ticket[1] to add a metric to show failovers that respects
> fine grained recovery.
>
> [1] https://issues.apache.org/jira/browse/FLINK-14164
>
> Thanks,
> Zhu Zhu
>
> Steven Wu  于2019年9月24日周二 上午6:41写道:
>
>>
>> When we setup alert like "fullRestarts > 1" for some rolling window, we
>> want to use counter. if it is a Gauge, "fullRestarts" will never go below 1
>> after a first full restart. So alert condition will always be true after
>> first job restart. If we can apply a derivative to the Gauge value, I guess
>> alert can probably work. I can explore if that is an option or not.
>>
>> Yeah. Understood that "fullRestart" won't increment when fine grained
>> recovery happened. I think "task_failures" counter already exists in Flink.
>>
>>
>>
>> On Sun, Sep 22, 2019 at 7:59 PM Zhu Zhu  wrote:
>>
>>> Steven,
>>>
>>> Thanks for the information. If we can determine this a common issue, we
>>> can solve it in Flink core.
>>> To get to that state, I have two questions which need your help:
>>> 1. Why is gauge not good for alerting? The metric "fullRestart" is a
>>> Gauge. Does the metric reporter you use report Counter and
>>> Gauge to external services in different ways? Or anything else can be
>>> different due to the metric type?
>>> 2. Is the "number of restarts" what you actually need, rather than
>>> the "fullRestart" count? If so, I believe we will have such a counter
>>> metric in 1.10, since the previous "fullRestart" metric value is not the
>>> number of restarts when grained recovery (feature added 1.9.0) is enabled.
>>> "fullRestart" reveals how many times entire job graph has been
>>> restarted. If grained recovery (feature added 1.9.0) is enabled, the graph
>>> would not be restarted when task failures happen and the "fullRestart"
>>> value will not increment in such cases.
>>>
>>> I'd appreciate if you can help with these questions and we can make
>>> better decisions for Flink.
>>>
>>> Thanks,
>>> Zhu Zhu
>>>
>>> Steven Wu  于2019年9月22日周日 上午3:31写道:
>>>
>>>> Zhu Zhu,
>>>>
>>>> Flink fullRestart metric is a Gauge, which is not good for alerting on.
>>>> We publish an equivalent Counter metric for alerting purpose.
>>>>
>>>> Thanks,
>>>> Steven
>>>>
>>>> On Thu, Sep 19, 2019 at 7:45 PM Zhu Zhu  wrote:
>>>>
>>>>> Thanks Steven for the feedback!
>>>>> Could you share more information about the metrics you add in you
>>>>> customized restart strategy?
>>>>>
>>>>> Thanks,
>>>>> Zhu Zhu
>>>>>
>>>>> Steven Wu  于2019年9月20日周五 上午7:11写道:
>>>>>
>>>>>> We do use config like "restart-strategy:
>>>>>> org.foobar.MyRestartStrategyFactoryFactory". Mainly to add additional
>>>>>> metrics than the Flink provided ones.
>>>>>>
>>>>>> On Thu, Sep 19, 2019 at 4:50 AM Zhu Zhu  wrote:
>>>>>>
>>>>>>> Thanks everyone for the input.
>>>>>>>
>>>>>>> The RestartStrategy customization is not recognized as a public
>>>>>>> interface as it is not explicitly documented.
>>>>>>> As it is not used from the feedbacks of this survey, I'll conclude
>>>>>>> that we do not need to support customized RestartStrategy for the new
>>>>>>> scheduler in Flink 1.10
>>>>>>>
>>>>>>> Othe

Re: [SURVEY] How many people are using customized RestartStrategy(s)

2019-09-23 Thread Steven Wu
When we set up an alert like "fullRestarts > 1" for some rolling window, we
want to use a counter. If it is a Gauge, "fullRestarts" will never go below 1
after the first full restart, so the alert condition will always be true after
the first job restart. If we can apply a derivative to the Gauge value, I guess
alerting can probably work. I can explore whether that is an option.

Yeah, understood that "fullRestart" won't increment when fine-grained recovery
happens. I think the "task_failures" counter already exists in Flink.


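For reference, the Meter/MeterView combination suggested in the quoted reply
below wraps a Counter and reports both an accumulated "count" and a "rate". A
minimal sketch; the metric group wiring, class name and metric name are
assumptions for illustration, not the built-in "fullRestart" metric:

import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.SimpleCounter;

public class RestartMetrics {

    private final Meter restartMeter;

    public RestartMetrics(MetricGroup group) {
        Counter restarts = new SimpleCounter();
        // MeterView reports the accumulated "count" plus a per-second "rate"
        // computed over the given time span (60 seconds here).
        this.restartMeter = group.meter("restarts", new MeterView(restarts, 60));
    }

    public void onRestart() {
        restartMeter.markEvent(); // increments the wrapped counter
    }
}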

On Sun, Sep 22, 2019 at 7:59 PM Zhu Zhu  wrote:

> Steven,
>
> Thanks for the information. If we can determine this a common issue, we
> can solve it in Flink core.
> To get to that state, I have two questions which need your help:
> 1. Why is gauge not good for alerting? The metric "fullRestart" is a
> Gauge. Does the metric reporter you use report Counter and
> Gauge to external services in different ways? Or anything else can be
> different due to the metric type?
> 2. Is the "number of restarts" what you actually need, rather than
> the "fullRestart" count? If so, I believe we will have such a counter
> metric in 1.10, since the previous "fullRestart" metric value is not the
> number of restarts when grained recovery (feature added 1.9.0) is enabled.
> "fullRestart" reveals how many times entire job graph has been
> restarted. If grained recovery (feature added 1.9.0) is enabled, the graph
> would not be restarted when task failures happen and the "fullRestart"
> value will not increment in such cases.
>
> I'd appreciate if you can help with these questions and we can make better
> decisions for Flink.
>
> Thanks,
> Zhu Zhu
>
> Steven Wu  于2019年9月22日周日 上午3:31写道:
>
>> Zhu Zhu,
>>
>> Flink fullRestart metric is a Gauge, which is not good for alerting on.
>> We publish an equivalent Counter metric for alerting purpose.
>>
>> Thanks,
>> Steven
>>
>> On Thu, Sep 19, 2019 at 7:45 PM Zhu Zhu  wrote:
>>
>>> Thanks Steven for the feedback!
>>> Could you share more information about the metrics you add in you
>>> customized restart strategy?
>>>
>>> Thanks,
>>> Zhu Zhu
>>>
>>> Steven Wu  于2019年9月20日周五 上午7:11写道:
>>>
>>>> We do use config like "restart-strategy:
>>>> org.foobar.MyRestartStrategyFactoryFactory". Mainly to add additional
>>>> metrics than the Flink provided ones.
>>>>
>>>> On Thu, Sep 19, 2019 at 4:50 AM Zhu Zhu  wrote:
>>>>
>>>>> Thanks everyone for the input.
>>>>>
>>>>> The RestartStrategy customization is not recognized as a public
>>>>> interface as it is not explicitly documented.
>>>>> As it is not used from the feedbacks of this survey, I'll conclude
>>>>> that we do not need to support customized RestartStrategy for the new
>>>>> scheduler in Flink 1.10
>>>>>
>>>>> Other usages are still supported, including all the strategies and
>>>>> configuring ways described in
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/task_failure_recovery.html#restart-strategies
>>>>> .
>>>>>
>>>>> Feel free to share in this thread if you has any concern for it.
>>>>>
>>>>> Thanks,
>>>>> Zhu Zhu
>>>>>
>>>>> Zhu Zhu  于2019年9月12日周四 下午10:33写道:
>>>>>
>>>>>> Thanks Oytun for the reply!
>>>>>>
>>>>>> Sorry for not have stated it clearly. When saying "customized
>>>>>> RestartStrategy", we mean that users implement an
>>>>>> *org.apache.flink.runtime.executiongraph.restart.RestartStrategy* by
>>>>>> themselves and use it by configuring like "restart-strategy:
>>>>>> org.foobar.MyRestartStrategyFactoryFactory".
>>>>>>
>>>>>> The usage of restart strategies you mentioned will keep working with
>>>>>> the new scheduler.
>>>>>>
>>>>>> Thanks,
>>>>>> Zhu Zhu
>>>>>>
>>>>>> Oytun Tez  于2019年9月12日周四 下午10:05写道:
>>>>>>
>>>>>>> Hi Zhu,
>>>>>>>
>>>>>>> We are using custom restart strategy like this:
>>>>>>>
>>>>>>> environment.setRestartStrategy(failureRateRestart(2,
>>>>>>> Ti

Re: [SURVEY] How many people are using customized RestartStrategy(s)

2019-09-21 Thread Steven Wu
Zhu Zhu,

The Flink fullRestart metric is a Gauge, which is not good for alerting on. We
publish an equivalent Counter metric for alerting purposes.

Thanks,
Steven

On Thu, Sep 19, 2019 at 7:45 PM Zhu Zhu  wrote:

> Thanks Steven for the feedback!
> Could you share more information about the metrics you add in you
> customized restart strategy?
>
> Thanks,
> Zhu Zhu
>
> Steven Wu  于2019年9月20日周五 上午7:11写道:
>
>> We do use config like "restart-strategy:
>> org.foobar.MyRestartStrategyFactoryFactory". Mainly to add additional
>> metrics than the Flink provided ones.
>>
>> On Thu, Sep 19, 2019 at 4:50 AM Zhu Zhu  wrote:
>>
>>> Thanks everyone for the input.
>>>
>>> The RestartStrategy customization is not recognized as a public
>>> interface as it is not explicitly documented.
>>> As it is not used from the feedbacks of this survey, I'll conclude that
>>> we do not need to support customized RestartStrategy for the new scheduler
>>> in Flink 1.10
>>>
>>> Other usages are still supported, including all the strategies and
>>> configuring ways described in
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/task_failure_recovery.html#restart-strategies
>>> .
>>>
>>> Feel free to share in this thread if you has any concern for it.
>>>
>>> Thanks,
>>> Zhu Zhu
>>>
>>> Zhu Zhu  于2019年9月12日周四 下午10:33写道:
>>>
>>>> Thanks Oytun for the reply!
>>>>
>>>> Sorry for not have stated it clearly. When saying "customized
>>>> RestartStrategy", we mean that users implement an
>>>> *org.apache.flink.runtime.executiongraph.restart.RestartStrategy* by
>>>> themselves and use it by configuring like "restart-strategy:
>>>> org.foobar.MyRestartStrategyFactoryFactory".
>>>>
>>>> The usage of restart strategies you mentioned will keep working with
>>>> the new scheduler.
>>>>
>>>> Thanks,
>>>> Zhu Zhu
>>>>
>>>> Oytun Tez  于2019年9月12日周四 下午10:05写道:
>>>>
>>>>> Hi Zhu,
>>>>>
>>>>> We are using custom restart strategy like this:
>>>>>
>>>>> environment.setRestartStrategy(failureRateRestart(2, Time.minutes(1),
>>>>> Time.minutes(10)));
>>>>>
>>>>>
>>>>> ---
>>>>> Oytun Tez
>>>>>
>>>>> *M O T A W O R D*
>>>>> The World's Fastest Human Translation Platform.
>>>>> oy...@motaword.com — www.motaword.com
>>>>>
>>>>>
>>>>> On Thu, Sep 12, 2019 at 7:11 AM Zhu Zhu  wrote:
>>>>>
>>>>>> Hi everyone,
>>>>>>
>>>>>> I wanted to reach out to you and ask how many of you are using a
>>>>>> customized RestartStrategy[1] in production jobs.
>>>>>>
>>>>>> We are currently developing the new Flink scheduler[2] which
>>>>>> interacts with restart strategies in a different way. We have to 
>>>>>> re-design
>>>>>> the interfaces for the new restart strategies (so called
>>>>>> RestartBackoffTimeStrategy). Existing customized RestartStrategy will not
>>>>>> work any more with the new scheduler.
>>>>>>
>>>>>> We want to know whether we should keep the way
>>>>>> to customized RestartBackoffTimeStrategy so that existing customized
>>>>>> RestartStrategy can be migrated.
>>>>>>
>>>>>> I'd appreciate if you can share the status if you are
>>>>>> using customized RestartStrategy. That will be valuable for use to make
>>>>>> decisions.
>>>>>>
>>>>>> [1]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/task_failure_recovery.html#restart-strategies
>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-10429
>>>>>>
>>>>>> Thanks,
>>>>>> Zhu Zhu
>>>>>>
>>>>>


Re: [SURVEY] How many people are using customized RestartStrategy(s)

2019-09-19 Thread Steven Wu
We do use a config like "restart-strategy:
org.foobar.MyRestartStrategyFactoryFactory", mainly to add additional metrics
beyond the ones Flink provides.

On Thu, Sep 19, 2019 at 4:50 AM Zhu Zhu  wrote:

> Thanks everyone for the input.
>
> The RestartStrategy customization is not recognized as a public interface
> as it is not explicitly documented.
> As it is not used from the feedbacks of this survey, I'll conclude that we
> do not need to support customized RestartStrategy for the new scheduler in
> Flink 1.10
>
> Other usages are still supported, including all the strategies and
> configuring ways described in
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/task_failure_recovery.html#restart-strategies
> .
>
> Feel free to share in this thread if you has any concern for it.
>
> Thanks,
> Zhu Zhu
>
> Zhu Zhu  于2019年9月12日周四 下午10:33写道:
>
>> Thanks Oytun for the reply!
>>
>> Sorry for not have stated it clearly. When saying "customized
>> RestartStrategy", we mean that users implement an
>> *org.apache.flink.runtime.executiongraph.restart.RestartStrategy* by
>> themselves and use it by configuring like "restart-strategy:
>> org.foobar.MyRestartStrategyFactoryFactory".
>>
>> The usage of restart strategies you mentioned will keep working with the
>> new scheduler.
>>
>> Thanks,
>> Zhu Zhu
>>
>> Oytun Tez  于2019年9月12日周四 下午10:05写道:
>>
>>> Hi Zhu,
>>>
>>> We are using custom restart strategy like this:
>>>
>>> environment.setRestartStrategy(failureRateRestart(2, Time.minutes(1),
>>> Time.minutes(10)));
>>>
>>>
>>> ---
>>> Oytun Tez
>>>
>>> *M O T A W O R D*
>>> The World's Fastest Human Translation Platform.
>>> oy...@motaword.com — www.motaword.com
>>>
>>>
>>> On Thu, Sep 12, 2019 at 7:11 AM Zhu Zhu  wrote:
>>>
 Hi everyone,

 I wanted to reach out to you and ask how many of you are using a
 customized RestartStrategy[1] in production jobs.

 We are currently developing the new Flink scheduler[2] which interacts
 with restart strategies in a different way. We have to re-design the
 interfaces for the new restart strategies (so called
 RestartBackoffTimeStrategy). Existing customized RestartStrategy will not
 work any more with the new scheduler.

 We want to know whether we should keep the way
 to customized RestartBackoffTimeStrategy so that existing customized
 RestartStrategy can be migrated.

 I'd appreciate if you can share the status if you are using customized
 RestartStrategy. That will be valuable for use to make decisions.

 [1]
 https://ci.apache.org/projects/flink/flink-docs-master/dev/task_failure_recovery.html#restart-strategies
 [2] https://issues.apache.org/jira/browse/FLINK-10429

 Thanks,
 Zhu Zhu

>>>


Re: [SURVEY] Is the default restart delay of 0s causing problems?

2019-09-02 Thread Steven Wu
1s sounds like a good tradeoff to me.

On Mon, Sep 2, 2019 at 1:30 PM Till Rohrmann  wrote:

> Thanks a lot for all your feedback. I see there is a slight tendency
> towards having a non zero default delay so far.
>
> However, Yu has brought up some valid points. Maybe I can shed some light
> on a).
>
> Before FLINK-9158 we set the default delay to 10s because Flink did not
> support queued scheduling which meant that if one slot was missing/still
> being occupied, then Flink would fail right away with
> a NoResourceAvailableException. In order to prevent this we added the
> delay. This also covered the case when the job was failing because of an
> overloaded external system.
>
> When we finished FLIP-6, we thought that we could improve the user
> experience by decreasing the default delay to 0s because all Flink related
> problems (slot still occupied, slot missing because of reconnecting TM)
> could be handled by the default slot request time out which allowed the
> slots to become ready after the scheduling was kicked off. However, we did
> not properly take the case of overloaded external systems into account.
>
> For b) I agree that any default value should be properly documented. This
> was clearly an oversight when FLINK-9158 has been merged. Moreover, I
> believe that there won't be the solve it all default value. There are
> always cases where one needs to adapt it to ones needs. But this is ok. The
> goal should be to find the default value which works for most cases.
>
> So maybe the middle ground between 10s and 0s could be a solution. Setting
> the default restart delay to 1s should prevent restart storms caused by
> overloaded external systems and still be fast enough to not slow down
> recoveries noticeably in most cases. If one needs a super fast recovery,
> then one should set the delay value to 0s. If one requires a longer delay
> because of a particular infrastructure, then one needs to change the value
> too. What do you think?
>
> Cheers,
> Till
>
> On Sun, Sep 1, 2019 at 11:56 PM Yu Li  wrote:
>
>> -1 on increasing the default delay to none zero, with below reasons:
>>
>> a) I could see some concerns about setting the delay to zero in the very
>> original JIRA (FLINK-2993
>> <https://issues.apache.org/jira/browse/FLINK-2993>) but later on in
>> FLINK-9158 <https://issues.apache.org/jira/browse/FLINK-9158> we still
>> decided to make the change, so I'm wondering whether the decision also came
>> from any customer requirement? If so, how could we judge whether one
>> requirement override the other?
>>
>> b) There could be valid reasons for both default values depending on
>> different use cases, as well as relative work around (like based on latest
>> policy, setting the config manually to 10s could resolve the problem
>> mentioned), and from former replies to this thread we could see users have
>> already taken actions. Changing it back to non-zero again won't affect such
>> users but might cause surprises to those depending on 0 as default.
>>
>> Last but not least, no matter what decision we make this time, I'd
>> suggest to make it final and document in our release note explicitly.
>> Checking the 1.5.0 release note [1] [2] it seems we didn't mention about
>> the change on default restart delay and we'd better learn from it this
>> time. Thanks.
>>
>> [1]
>> https://flink.apache.org/news/2018/05/25/release-1.5.0.html#release-notes
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/release-notes/flink-1.5.html
>>
>> Best Regards,
>> Yu
>>
>>
>> On Sun, 1 Sep 2019 at 04:33, Steven Wu  wrote:
>>
>>> +1 on what Zhu Zhu said.
>>>
>>> We also override the default to 10 s.
>>>
>>> On Fri, Aug 30, 2019 at 8:58 PM Zhu Zhu  wrote:
>>>
>>>> In our production, we usually override the restart delay to be 10 s.
>>>> We once encountered cases that external services are overwhelmed by
>>>> reconnections from frequent restarted tasks.
>>>> As a safer though not optimized option, a default delay larger than 0 s
>>>> is better in my opinion.
>>>>
>>>>
>>>> 未来阳光 <2217232...@qq.com> 于2019年8月30日周五 下午10:23写道:
>>>>
>>>>> Hi,
>>>>>
>>>>>
>>>>> I thinks it's better to increase the default value. +1
>>>>>
>>>>>
>>>>> Best.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> -- 原始邮件 --
>>>>&

Re: [SURVEY] Is the default restart delay of 0s causing problems?

2019-08-31 Thread Steven Wu
+1 on what Zhu Zhu said.

We also override the default to 10 s.

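For reference, a delay override like this can be set globally in flink-conf.yaml
(restart-strategy.fixed-delay.delay: 10 s) or per job; a minimal sketch with
illustrative values (RestartStrategies and Time are from
org.apache.flink.api.common, TimeUnit from java.util.concurrent):

// 3 restart attempts with a 10 s delay between them (values illustrative).
env.setRestartStrategy(
    RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));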
On Fri, Aug 30, 2019 at 8:58 PM Zhu Zhu  wrote:

> In our production, we usually override the restart delay to be 10 s.
> We once encountered cases that external services are overwhelmed by
> reconnections from frequent restarted tasks.
> As a safer though not optimized option, a default delay larger than 0 s is
> better in my opinion.
>
>
> 未来阳光 <2217232...@qq.com> 于2019年8月30日周五 下午10:23写道:
>
>> Hi,
>>
>>
>> I thinks it's better to increase the default value. +1
>>
>>
>> Best.
>>
>>
>>
>>
>> -- 原始邮件 --
>> 发件人: "Till Rohrmann";
>> 发送时间: 2019年8月30日(星期五) 晚上10:07
>> 收件人: "dev"; "user";
>> 主题: [SURVEY] Is the default restart delay of 0s causing problems?
>>
>>
>>
>> Hi everyone,
>>
>> I wanted to reach out to you and ask whether decreasing the default delay
>> to `0 s` for the fixed delay restart strategy [1] is causing trouble. A
>> user reported that he would like to increase the default value because it
>> can cause restart storms in case of systematic faults [2].
>>
>> The downside of increasing the default delay would be a slightly increased
>> restart time if this config option is not explicitly set.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-9158
>> [2] https://issues.apache.org/jira/browse/FLINK-11218
>>
>> Cheers,
>> Till
>
>


Re: Help debugging Kafka connection leaks after job failure/cancelation

2019-03-26 Thread Steven Wu
It might be related to this issue:
https://issues.apache.org/jira/browse/FLINK-10774

On Tue, Mar 26, 2019 at 4:35 PM Fritz Budiyanto  wrote:

> Hi All,
>
> We're using Flink-1.4.2 and noticed many dangling connections to Kafka
> after job deletion/recreation. The trigger here is Job cancelation/failure
> due to network down event followed by Job recreation.
>
> Our flink job has checkpointing disabled, and upon job failure (due to
> network failure), the Job got deleted and re-created. There were network
> failure event which impacting communication between task manager(s) and
> task-manager <-> job-manager. Our custom job controller monitored this
> condition and tried to cancel the job, followed by recreating the job
> (after a minute or so).
>
> Because of the network failure, the above steps were repeated many times
> and eventually the flink-docker-container's socket file descriptors were
> exhausted.
> Looks like there were many Kafka connections from flink-task-manager to
> the local Kafka broker.
>
> netstat  -ntap | grep 9092 | grep java | wc -l
> 2235
>
> Is this a known issue which already fixed in later release ? If yes, could
> someone point out the Jira link?
> If this is a new issue, could someone let me know how to move forward and
> debug this issue ? Looks like kafka consumers were not cleaned up properly
> upon job cancelation.
>
> Thanks,
> Fritz


Re: [1.7.1] job stuck in suspended state

2019-03-04 Thread Steven Wu
Till,

I will send you the complete log offline. We don't know how to reliably
reproduce the problem, but it did happen quite frequently, like once every
couple of days. Let me see if I can cherry-pick the fix/commit to the 1.7
branch.

Thanks,
Steven


On Mon, Mar 4, 2019 at 5:55 AM Till Rohrmann  wrote:

> Hi Steven,
>
> is this the tail of the logs or are there other statements following? I
> think your problem could indeed be related to FLINK-11537. Is it possible
> to somehow reliably reproduce this problem? If yes, then you could try out
> the RC for Flink 1.8.0 which should be published in the next days.
>
> Cheers,
> Till
>
> On Sat, Mar 2, 2019 at 12:47 AM Steven Wu  wrote:
>
>> We have observe that sometimes job stuck in suspended state, and no job
>> restart/recover were attempted once job is suspended.
>> * it is a high-parallelism job (like close to 2,000)
>> * there were a few job restarts before this
>> * there were high GC pause during the period
>> * zookeeper timeout. probably caused by high GC pause
>>
>> Is it related to https://issues.apache.org/jira/browse/FLINK-11537?
>>
>> I pasted some logs in the end.
>>
>> Thanks,
>> Steven
>>
>> 2019-02-28 19:04:36,357 WARN
>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - SASL
>> configuration failed: javax.security.auth.login.LoginException: No JAAS
>> configuration section named 'Client' was found in speci
>> fied JAAS configuration file: '/tmp/jaas-6664341082794720643.conf'. Will
>> continue connection to Zookeeper server without SASL authentication, if
>> Zookeeper server allows it.
>> 2019-02-28 19:04:36,357 INFO
>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  -
>> Opening socket connection to server 100.82.141.106/100.82.141.106:2181
>> 2019-02-28 19:04:36,357 ERROR
>> org.apache.flink.shaded.curator.org.apache.curator.ConnectionState  -
>> Authentication failed
>> 2019-02-28 19:04:36,357 INFO
>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Socket
>> connection established to 100.82.141.106/100.82.141.106:2181, initiating
>> session
>> 2019-02-28 19:04:36,359 INFO
>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  -
>> Session establishment complete on server
>> 100.82.141.106/100.82.141.106:2181, sessionid = 0x365ef9c4fe7f1f2,
>> negotiated timeout = 4
>> 2019-02-28 19:04:36,359 INFO
>> org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager
>> - State change: RECONNECTED
>> 2019-02-28 19:04:36,359 INFO
>> org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService  -
>> Connection to ZooKeeper was reconnected. Leader election can be restarted.
>> 2019-02-28 19:04:36,359 INFO
>> org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService  -
>> Connection to ZooKeeper was reconnected. Leader election can be restarted.
>> 2019-02-28 19:04:36,359 INFO
>> org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService  -
>> Connection to ZooKeeper was reconnected. Leader election can be restarted.
>> 2019-02-28 19:04:36,359 INFO
>> org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService  -
>> Connection to ZooKeeper was reconnected. Leader election can be restarted.
>> 2019-02-28 19:04:36,359 INFO
>> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  -
>> Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.
>> 2019-02-28 19:04:36,359 INFO
>> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  -
>> Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.
>> 2019-02-28 19:04:36,359 INFO
>> org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  -
>> ZooKeeper connection RECONNECTED. Changes to the submitted job graphs are
>> monitored again.
>> 2019-02-28 19:04:36,360 INFO
>> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  -
>> Connection to ZooKeeper was reconnected. Leader retrieval can be restarte
>> ...
>> 2019-02-28 19:05:09,400 INFO
>> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  -
>> Stopping ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
>> 2019-02-28 19:05:09,400 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
>> cybertron-flink (0e594c065c7f8319a12fa47e089ca9b0) switched from state
>> RESTARTING to SUSPENDING.
>> org.apache.flink.util.FlinkException: JobManager is no longer the leader.
>> at

[1.7.1] job stuck in suspended state

2019-03-01 Thread Steven Wu
We have observed that sometimes the job gets stuck in a suspended state, and no
job restart/recovery is attempted once the job is suspended.
* it is a high-parallelism job (close to 2,000)
* there were a few job restarts before this
* there were high GC pauses during the period
* zookeeper timeouts, probably caused by the high GC pauses

Is it related to https://issues.apache.org/jira/browse/FLINK-11537?

I pasted some logs in the end.

Thanks,
Steven

2019-02-28 19:04:36,357 WARN
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - SASL
configuration failed: javax.security.auth.login.LoginException: No JAAS
configuration section named 'Client' was found in speci
fied JAAS configuration file: '/tmp/jaas-6664341082794720643.conf'. Will
continue connection to Zookeeper server without SASL authentication, if
Zookeeper server allows it.
2019-02-28 19:04:36,357 INFO
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  -
Opening socket connection to server 100.82.141.106/100.82.141.106:2181
2019-02-28 19:04:36,357 ERROR
org.apache.flink.shaded.curator.org.apache.curator.ConnectionState  -
Authentication failed
2019-02-28 19:04:36,357 INFO
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Socket
connection established to 100.82.141.106/100.82.141.106:2181, initiating
session
2019-02-28 19:04:36,359 INFO
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  -
Session establishment complete on server 100.82.141.106/100.82.141.106:2181,
sessionid = 0x365ef9c4fe7f1f2, negotiated timeout = 4
2019-02-28 19:04:36,359 INFO
org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager
- State change: RECONNECTED
2019-02-28 19:04:36,359 INFO
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService  -
Connection to ZooKeeper was reconnected. Leader election can be restarted.
2019-02-28 19:04:36,359 INFO
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService  -
Connection to ZooKeeper was reconnected. Leader election can be restarted.
2019-02-28 19:04:36,359 INFO
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService  -
Connection to ZooKeeper was reconnected. Leader election can be restarted.
2019-02-28 19:04:36,359 INFO
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService  -
Connection to ZooKeeper was reconnected. Leader election can be restarted.
2019-02-28 19:04:36,359 INFO
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  -
Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.
2019-02-28 19:04:36,359 INFO
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  -
Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.
2019-02-28 19:04:36,359 INFO
org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  -
ZooKeeper connection RECONNECTED. Changes to the submitted job graphs are
monitored again.
2019-02-28 19:04:36,360 INFO
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  -
Connection to ZooKeeper was reconnected. Leader retrieval can be restarte
...
2019-02-28 19:05:09,400 INFO
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  -
Stopping ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
2019-02-28 19:05:09,400 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
cybertron-flink (0e594c065c7f8319a12fa47e089ca9b0) switched from state
RESTARTING to SUSPENDING.
org.apache.flink.util.FlinkException: JobManager is no longer the leader.
at
org.apache.flink.runtime.jobmaster.JobManagerRunner.revokeLeadership(JobManagerRunner.java:371)
at
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService.notLeader(ZooKeeperLeaderElectionService.java:247)
at
org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.leader.LeaderLatch$8.apply(LeaderLatch.java:640)
at
org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.leader.LeaderLatch$8.apply(LeaderLatch.java:636)
at
org.apache.flink.shaded.curator.org.apache.curator.framework.listen.ListenerContainer$1.run(ListenerContainer.java:93)
at
org.apache.flink.shaded.curator.org.apache.curator.shaded.com.google.common.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:297)
at
org.apache.flink.shaded.curator.org.apache.curator.framework.listen.ListenerContainer.forEach(ListenerContainer.java:85)
at
org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.leader.LeaderLatch.setLeadership(LeaderLatch.java:635)
at
org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.leader.LeaderLatch.handleStateChange(LeaderLatch.java:623)
at
org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.leader.LeaderLatch.access$000(LeaderLatch.java:64)
at

Re: [Flink 1.7.0] initial failures with starting high-parallelism job without checkpoint/savepoint

2019-01-24 Thread Steven Wu
Hi Andrey,

Weird that I didn't see your reply in my email inbox. My colleague happened
to see it in the Apache archive :)

Nope, we didn't experience it with 1.4 (the previous version).

Yes, we did use HA setup.

high-availability: zookeeper
high-availability.zookeeper.quorum: ...
high-availability.zookeeper.path.root: ...
high-availability.zookeeper.path.latch: /leaderlatch
high-availability.zookeeper.path.leader: /leader
high-availability.zookeeper.path.jobgraphs: /jobgraphs
high-availability.zookeeper.path.checkpoints: /checkpoints
recovery.zookeeper.path.checkpoint-counter: /checkpoint-counter
high-availability.storageDir: ...


My colleague (Mark Cho) will provide some additional observations.

Thanks,
Steven


Hi Steven,

Did you not experience this problem with the previous Flink release (you
marked the topic with 1.7)?

Do you use an HA setup?

Without an HA setup, the blob data that belongs to the job will be
distributed from the job master node to all task executors.
Depending on the size of the blob data (jars, user-serialised classes, etc.),
it might overwhelm the job master node and the network connections.
It can subsequently slow down the UI, heart-beating and the initialisation of
task executors and produced partitions, because task executors contend for
the blob data. When the job is restored, the blob data might not be fetched
again because it is already available.

With an HA setup, you can configure high-availability.storageDir to point to a
DFS, and the DFS will serve the blob data.

Otherwise, could you provide the JM log for the further investigation?

Best,
Andrey

On Wed, Jan 23, 2019 at 10:06 PM Steven Wu  wrote:

> When we start a high-parallelism (1,600) job without any
> checkpoint/savepoint, the job struggled to be deployed. After a few
> restarts, it eventually got deployed and was running fine after the initial
> struggle. jobmanager was very busy. Web UI was very slow. I saw these two
> exceptions/failures during the initial failures.
>
> I don't seem to see this issue when starting the same job from an external
> checkpoint. or at least very rarely.
>
> Anyone else experienced similar issue?
>
> Thanks,
> Steven
>
> Exception #1
>
> java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id
> fe55bf158e89cf555be6582e577b9621 timed out.
>
> at
> org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1624)
>
> at
> org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl$HeartbeatMonitor.run(HeartbeatManagerImpl.java:339)
>
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>
> at
> org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:154)
>
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
> Exception #2
>
> org.apache.flink.runtime.io.network.partition.PartitionNotFoundException:
> Partition 3cdc03de22920b0dc1ef3d82e06b7d75@7fda7294a52a6da2d831d832223d3627
> not found.
>
> at
> org.apache.flink.runtime.io.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:77)
>
> at
> org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.requestSubpartition(LocalInputChannel.java:111)
>
> at
> org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel$1.run(LocalInputChannel.java:155)
>
> at java.util.TimerThread.mainLoop(Timer.java:555)
>
> at java.util.TimerThread.run(Timer.java:505)
>
>


[Flink 1.7.0] initial failures with starting high-parallelism job without checkpoint/savepoint

2019-01-23 Thread Steven Wu
When we started a high-parallelism (1,600) job without any
checkpoint/savepoint, the job struggled to be deployed. After a few
restarts, it eventually got deployed and was running fine after the initial
struggle. The jobmanager was very busy and the web UI was very slow. I saw
these two exceptions/failures during the initial failures.

I don't seem to see this issue when starting the same job from an external
checkpoint, or at least only very rarely.

Has anyone else experienced a similar issue?

Thanks,
Steven

Exception #1

java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id
fe55bf158e89cf555be6582e577b9621 timed out.

at
org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1624)

at
org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl$HeartbeatMonitor.run(HeartbeatManagerImpl.java:339)

at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

at java.util.concurrent.FutureTask.run(FutureTask.java:266)

at
org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:154)

at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)

at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)

at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


Exception #2

org.apache.flink.runtime.io.network.partition.PartitionNotFoundException:
Partition 3cdc03de22920b0dc1ef3d82e06b7d75@7fda7294a52a6da2d831d832223d3627
not found.

at
org.apache.flink.runtime.io.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:77)

at
org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.requestSubpartition(LocalInputChannel.java:111)

at
org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel$1.run(LocalInputChannel.java:155)

at java.util.TimerThread.mainLoop(Timer.java:555)

at java.util.TimerThread.run(Timer.java:505)


Re: delete all available flink timers on app start

2019-01-17 Thread Steven Wu
Vipul,

It sounds like you don't want to checkpoint timers at all. Since 1.7, we can
configure the timer state backend (HEAP/ROCKSDB). I guess a new option (NONE)
could be added to support such a requirement.

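For reference, the timer state backend mentioned above is selected with this
flink-conf.yaml option (1.7+, only relevant when the RocksDB state backend is
used); the NONE option discussed here is only a suggestion and does not exist:

state.backend.rocksdb.timer-service.factory: HEAP   # or ROCKSDB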
But it is interesting to see your reasons. Can you elaborate?

thanks,
Steven

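For context, Fabian's suggestion in the quoted reply below (keep the registered
timer timestamps in keyed state so they can be deleted later) could look roughly
like this; the types, names and the one-minute delay are assumptions for
illustration:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class TimerTrackingFunction extends KeyedProcessFunction<String, Long, Long> {

    private transient ListState<Long> registeredTimers;

    @Override
    public void open(Configuration parameters) {
        registeredTimers = getRuntimeContext().getListState(
                new ListStateDescriptor<>("registered-timers", Long.class));
    }

    @Override
    public void processElement(Long value, Context ctx, Collector<Long> out) throws Exception {
        long ts = ctx.timerService().currentProcessingTime() + 60_000; // illustrative delay
        ctx.timerService().registerProcessingTimeTimer(ts);
        registeredTimers.add(ts); // remember the timer so it can be deleted later
        out.collect(value);
    }

    // Call from processElement/onTimer when all timers of this key should be dropped.
    private void clearTimers(Context ctx) throws Exception {
        for (Long ts : registeredTimers.get()) {
            ctx.timerService().deleteProcessingTimeTimer(ts);
        }
        registeredTimers.clear();
    }
}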
On Thu, Jan 17, 2019 at 5:53 AM Fabian Hueske  wrote:

> Hi Vipul,
>
> I'm not aware of a way to do this.
> You could have a list of all registered timers per key as state to be able
> to delete them.
> However, the problem is to identify in user code when an application was
> restarted, i.e., to know when to delete timers.
> Also, timer deletion would need to be done per key.
>
> Best,
> Fabian
>
> Am Do., 17. Jan. 2019 um 08:27 Uhr schrieb vipul singh  >:
>
>> Hello,
>>
>> I have a custom app, in which when due to some exception, the app
>> restarts I want to cancel all registered flink timers in the
>> initializeState method. Based on the documentation I feel like all timer
>> state is saved in the state, so if the app restarts the timers are still
>> active.
>>
>> Is there a way to delete all available timers on app crash and restart?
>>
>> --
>> Thanks,
>> Vipul
>>
>


Re: [Flink 1.7.0] always got 10s ask timeout exception when submitting job with checkpoint via REST

2019-01-10 Thread Steven Wu
Gary, thanks a lot. web.timeout seems to help.

Now I ran into a different issue with loading the checkpoint; will take that
up separately.

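For reference, web.timeout is the timeout used by the REST handlers, in
milliseconds (default 10000); a flink-conf.yaml sketch with an illustrative
value:

web.timeout: 60000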
On Thu, Jan 10, 2019 at 12:25 PM Gary Yao  wrote:

> Hi all,
>
> I think increasing the default value of the config option web.timeout [1]
> is
> what you are looking for.
>
> Best,
> Gary
>
> [1]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java#L76
> [2]
> https://github.com/apache/flink/blob/a07ce7f6c88dc7d0c0d2ba55a0ab3f2283bf247c/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java#L177
>
> On Thu, Jan 10, 2019 at 9:19 PM Aaron Levin  wrote:
>
>> We are also experiencing this! Thanks for speaking up! It's relieving to
>> know we're not alone :)
>>
>> We tried adding `akka.ask.timeout: 1 min` to our `flink-conf.yaml`, which
>> did not seem to have any effect. I tried adding every other related akka,
>> rpc, etc. timeout and still continue to encounter these errors. I believe
>> they may also impact our ability to deploy (as we get a timeout when
>> submitting the job programmatically). I'd love to see a solution to this if
>> one exists!
>>
>> Best,
>>
>> Aaron Levin
>>
>> On Thu, Jan 10, 2019 at 2:58 PM Steven Wu  wrote:
>>
>>> We are trying out Flink 1.7.0. We always get this exception when
>>> submitting a job with external checkpoint via REST. Job parallelism is
>>> 1,600. state size is probably in the range of 1-5 TBs. Job is actually
>>> started. Just REST api returns this failure.
>>>
>>> If we submitting the job without external checkpoint, everything works
>>> fine.
>>>
>>> Anyone else see such problem with 1.7? Appreciate your help!
>>>
>>> Thanks,
>>> Steven
>>>
>>> org.apache.flink.runtime.rest.handler.RestHandlerException:
>>> akka.pattern.AskTimeoutException: Ask timed out on
>>> [Actor[akka://flink/user/dispatcher#-641142843]] after [10000 ms].
>>> Sender[null] sent message of type
>>> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
>>> at
>>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$4(JarRunHandler.java:114)
>>> at
>>> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>>> at
>>> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>>> at
>>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>> at
>>> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>>> at
>>> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:772)
>>> at akka.dispatch.OnComplete.internal(Future.scala:258)
>>> at akka.dispatch.OnComplete.internal(Future.scala:256)
>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>> at
>>> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
>>> at
>>> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
>>> at
>>> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
>>> at
>>> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:603)
>>> at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
>>> at
>>> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
>>> at
>>> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
>>> at
>>> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
>>> at
>>> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
>>> at
>>> akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
>>> at
>>> akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
>>> at
>>> akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
>>> at java.lang.Thread.run(Thread.

[Flink 1.7.0] always got 10s ask timeout exception when submitting job with checkpoint via REST

2019-01-10 Thread Steven Wu
We are trying out Flink 1.7.0. We always get this exception when submitting
a job with an external checkpoint via REST. Job parallelism is 1,600, and
the state size is probably in the range of 1-5 TB. The job actually starts;
just the REST API returns this failure.

If we submit the job without an external checkpoint, everything works fine.

Anyone else see such problem with 1.7? Appreciate your help!

Thanks,
Steven

org.apache.flink.runtime.rest.handler.RestHandlerException:
akka.pattern.AskTimeoutException: Ask timed out on
[Actor[akka://flink/user/dispatcher#-641142843]] after [10000 ms].
Sender[null] sent message of type
"org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
at
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$4(JarRunHandler.java:114)
at
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:772)
at akka.dispatch.OnComplete.internal(Future.scala:258)
at akka.dispatch.OnComplete.internal(Future.scala:256)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
at
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:603)
at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
at
scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
at
scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
at
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
at
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
at
akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
at
akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
at
akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException:
akka.pattern.AskTimeoutException: Ask timed out on
[Actor[akka://flink/user/dispatcher#-641142843]] after [10000 ms].
Sender[null] sent message of type
"org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
at
java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
at
java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
at
java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
at
java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
... 21 more
Caused by: akka.pattern.AskTimeoutException: Ask timed out on
[Actor[akka://flink/user/dispatcher#-641142843]] after [10000 ms].
Sender[null] sent message of type
"org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
at
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
... 9 more


Re: backpressure metrics

2018-11-21 Thread Steven Wu
Nagarjun, thanks a lot for the reply, which makes sense to me. Yes, we are
running in AT_LEAST_ONCE mode.

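For reference, the checkpointing mode that decides whether barrier alignment
(and hence these metrics) applies is chosen when enabling checkpointing; a
minimal sketch with an illustrative interval (CheckpointingMode is
org.apache.flink.streaming.api.CheckpointingMode):

// Alignment only happens with EXACTLY_ONCE barriers; with AT_LEAST_ONCE there
// is no alignment phase, so alignment-related metrics stay at zero.
env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);
// env.enableCheckpointing(60_000, CheckpointingMode.AT_LEAST_ONCE);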
On Wed, Nov 21, 2018 at 3:19 PM Nagarjun Guraja  wrote:

> Hi Steven,
>
> The metric 'Buffered During Alignment' you are talking about will always
> be zero when the job is run in ATLEAST_ONCE mode. Is that the case with
> your job? My understanding is, backpressure can only be monitored by
> sampling thread stacktraces and interpreting the situation based on the
> contention for network buffers on demand.
>
> Regards,
> Nagarjun
>
> *Success is not final, failure is not fatal: it is the courage to continue
> that counts. *
> *- Winston Churchill - *
>
>
> On Wed, Nov 21, 2018 at 1:50 PM Steven Wu  wrote:
>
>>
>> Flink has two backpressure related metrics: “
>> lastCheckpointAlignmentBuffered” and “checkpointAlignmentTime”. But they
>> seems to always report zero. Similar thing in web UI, “Buffered During
>> Alignment” always shows zero, even backpressure testing shows high
>> backpressure for some operators. Has anyone else seen similar problem?
>>
>> We are running flink 1.4.0 with some cherry-picked fixes. there was a bug
>> and fix for 1.5 and above, which shouldn't affect us
>> https://issues.apache.org/jira/browse/FLINK-10135
>>
>> Thanks,
>> Steven
>>
>


backpressure metrics

2018-11-21 Thread Steven Wu
Flink has two backpressure-related metrics: “lastCheckpointAlignmentBuffered”
and “checkpointAlignmentTime”, but they seem to always report zero.
Similarly, in the web UI “Buffered During Alignment” always shows zero, even
though backpressure testing shows high backpressure for some operators. Has
anyone else seen a similar problem?

We are running Flink 1.4.0 with some cherry-picked fixes. There was a bug
and fix for 1.5 and above, which shouldn't affect us:
https://issues.apache.org/jira/browse/FLINK-10135

Thanks,
Steven


Re: Live configuration change

2018-11-06 Thread Steven Wu
For rate limiting, would quotas at the Kafka brokers help?

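For reference, Kafka broker-side quotas can throttle a given client id; a sketch
using the standard Kafka tooling (the client id and byte rate are illustrative,
and whether this helps depends on how the Flink consumer's client.id is set):

# Limit consumers with client.id "flink-consumer" to ~1 MB/s per broker
bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
  --add-config 'consumer_byte_rate=1048576' \
  --entity-type clients --entity-name flink-consumer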
On Tue, Nov 6, 2018 at 10:29 AM Ning Shi  wrote:

> On Tue, Nov 06, 2018 at 07:44:50PM +0200, Nicos Maris wrote:
> > Ning can you provide another example except for rate limiting?
>
> Our main use case and concern is rate limiting because without it we
> could potentially overwhelm downstream systems (Cassandra) when the job
> plays catch up or replay events from Kafka. The consequence of that is
> that exceptions will be thrown in the Cassandra sink causing the whole
> job to restart and end up in the same situation over and over.
>
> With that said, we do have other use cases such as doing canary
> deployment. We'd like to start processing events for a subset of users,
> then expand it to all if things look good. Without live configuration,
> we have to take a savepoint and restart the job, which will cause the
> job to play catch up at the beginning, potentially overwhelming
> downstream system without rate limiting.
>
> Hope the use cases described above clarifies.
>
> Thanks,
>
> --
> Ning
>
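
If broker-side quotas are not an option, a client-side throttle inside the sink is another
route. Below is a minimal sketch assuming Guava's RateLimiter is on the classpath; the
RateLimitedSink name and writeDownstream() call are illustrative placeholders, not the
actual sink discussed in this thread.

import com.google.common.util.concurrent.RateLimiter;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class RateLimitedSink<T> extends RichSinkFunction<T> {

    private final double permitsPerSecond;
    private transient RateLimiter limiter;

    public RateLimitedSink(double permitsPerSecond) {
        this.permitsPerSecond = permitsPerSecond;
    }

    @Override
    public void open(Configuration parameters) {
        // one limiter per parallel sink instance, so the effective rate is
        // permitsPerSecond * sink parallelism
        limiter = RateLimiter.create(permitsPerSecond);
    }

    @Override
    public void invoke(T value) throws Exception {
        limiter.acquire();       // blocks until a permit is available
        writeDownstream(value);  // placeholder for the real write, e.g. to Cassandra
    }

    private void writeDownstream(T value) { /* ... */ }
}

Blocking in invoke() naturally creates backpressure upstream, which is usually the desired
behaviour when the goal is to protect the downstream system during catch-up.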


Re: Savepoint failed with error "Checkpoint expired before completing"

2018-11-04 Thread Steven Wu
FYI, here is the JIRA for supporting a timeout in the savepoint REST API:
https://issues.apache.org/jira/browse/FLINK-10360

On Fri, Nov 2, 2018 at 6:37 PM Gagan Agrawal  wrote:

> Great, thanks for sharing that info.
>
> Gagan
>
> On Thu, Nov 1, 2018 at 1:50 PM Yun Tang  wrote:
>
>> Haha, actually externalized checkpoints also support parallelism changes;
>> you could read my email
>> 
>> posted in dev-mail-list.
>>
>> Best
>> Yun Tang
>> --
>> *From:* Gagan Agrawal 
>> *Sent:* Thursday, November 1, 2018 13:38
>> *To:* myas...@live.com
>> *Cc:* happydexu...@gmail.com; user@flink.apache.org
>> *Subject:* Re: Savepoint failed with error "Checkpoint expired before
>> completing"
>>
>> Thanks Yun for your inputs. Yes, increasing the checkpoint timeout helps and we are
>> able to take savepoints now. In our case we wanted to increase parallelism,
>> so I believe a savepoint is the only option, as checkpoints don't support
>> code/parallelism changes.
>>
>> Gagan
>>
>> On Wed, Oct 31, 2018 at 8:46 PM Yun Tang  wrote:
>>
>> Hi Gagan
>>
>> A savepoint generally takes more time than a usual incremental
>> checkpoint; you could try to increase the checkpoint timeout [1]:
>>
>>env.getCheckpointConfig().setCheckpointTimeout(90);
>>
>> If you just want to resume the previous job without changing the
>> state backend, I think you could also try to resume from a retained
>> checkpoint without triggering a savepoint [2].
>>
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/checkpointing.html#enabling-and-configuring-checkpointing
>>
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint
>>
>> Best
>> Yun Tang
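
Putting the two suggestions together, a rough sketch (the values are illustrative only;
pick a timeout that matches the actual snapshot duration):

import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSettingsExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5 * 60 * 1000); // 5 minute interval, as in the job described below
        CheckpointConfig config = env.getCheckpointConfig();
        // the savepoint in this thread appears to hit the same timeout,
        // so raise it above the 10 minute default
        config.setCheckpointTimeout(30 * 60 * 1000);
        // retain the last checkpoint so the job can be resumed from it
        // (bin/flink run -s <checkpointPath> ...), similar to resuming from a savepoint
        config.enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    }
}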
>>
>> --
>> *From:* Gagan Agrawal 
>> *Sent:* Wednesday, October 31, 2018 19:03
>> *To:* happydexu...@gmail.com
>> *Cc:* user@flink.apache.org
>> *Subject:* Re: Savepoint failed with error "Checkpoint expired before
>> completing"
>>
>> Hi Henry,
>> Thanks for your response. However we don't face this issue during normal
>> run as we have incremental checkpoints. Only when we try to take a savepoint
>> (which tries to save the entire state in one go) do we face this problem.
>>
>> Gagan
>>
>> On Wed, Oct 31, 2018 at 11:41 AM 徐涛  wrote:
>>
>> Hi Gagan,
>> I have run into the checkpoint timeout error too.
>> In my case, it was not due to a big checkpoint size, but due to a
>> slow sink causing high backpressure on the upstream operator, so the
>> barrier may take a long time to reach the sink.
>> Please check whether this is the case you have met.
>>
>> Best
>> Henry
>>
>> > 在 2018年10月30日,下午6:07,Gagan Agrawal  写道:
>> >
>> > Hi,
>> > We have a flink job (flink version 1.6.1) which unions 2 streams to
>> pass through custom KeyedProcessFunction with RocksDB state store which
>> final creates another stream into Kafka. Current size of checkpoint is
>> around ~100GB and checkpoints are saved to s3 with 5 mins interval and
>> incremental checkpoint enabled. Checkpoints mostly finish in less than 1
>> min. We are running this job on yarn with following parameters
>> >
>> > -yn 10  (10 task managers)
>> > -ytm 2048 (2 GB each)
>> > - Operator parallelism is also 10.
>> >
>> > While trying to run a savepoint on this job, it runs for ~10 mins and then
>> throws the following error. It looks like the default checkpoint timeout of 10 mins is
>> causing this. What is the recommended way to run a savepoint for such a job? Should
>> we increase the default checkpoint timeout of 10 mins? Also, currently our state
>> size is 100GB but it is expected to grow up to 1TB. Is Flink good for
>> use cases with that much state? Also, how much time is a savepoint expected
>> to take with such state size and parallelism on Yarn? Any other
>> recommendation would be of great help.
>> >
>> > org.apache.flink.util.FlinkException: Triggering a savepoint for the
>> job 434398968e635a49329f59a019b41b6f failed.
>> >   at
>> org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:714)
>> >   at
>> org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:692)
>> >   at
>> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:979)
>> >   at
>> org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:689)
>> >   at
>> 

Re: help understand/debug high memory footprint on jobmanager

2018-06-29 Thread Steven Wu
Thanks everyone for jumping in. BTW, we are using Flink 1.4.1; the deployment
is in standalone mode.

here is the JIRA: https://issues.apache.org/jira/browse/FLINK-9693

On Fri, Jun 29, 2018 at 12:09 PM, Stephan Ewen  wrote:

> Just saw Stefan's response, it is basically the same.
>
> We either null out the field on deploy or archival. On deploy would be
> even more memory friendly.
>
> @Steven - can you open a JIRA ticket for this?
>
> On Fri, Jun 29, 2018 at 9:08 PM, Stephan Ewen  wrote:
>
>> The problem seems to be that the Executions that are kept for history
>> (mainly metrics / web UI) still hold a reference to their TaskStateSnapshot.
>>
>> Upon archival, that field needs to be cleared for GC.
>>
>> This is quite clearly a bug...
>>
>> On Fri, Jun 29, 2018 at 11:29 AM, Stefan Richter <
>> s.rich...@data-artisans.com> wrote:
>>
>>> Hi Steven,
>>>
>>> from your analysis, I would conclude the following problem.
>>> ExecutionVertexes hold executions, which are bootstrapped with the state
>>> (in form of the map of state handles) when the job is initialized from a
>>> checkpoint/savepoint. It holds a reference on this state, even when the
>>> task is already running. I would assume it is save to set the reference to
>>> TaskStateSnapshot to null at the end of the deploy() method and can be
>>> GC’ed. From the provided stats, I cannot say if maybe the JM is also
>>> holding references to too many ExecutionVertexes, but that would be a
>>> different story.
>>>
>>> Best,
>>> Stefan
>>>
>>> Am 29.06.2018 um 01:29 schrieb Steven Wu :
>>>
>>> First, some context about the job
>>> * embarrassingly parallel: all operators are chained together
>>> * parallelism is over 1,000
>>> * stateless except for Kafka source operators. checkpoint size is 8.4 MB.
>>> * set "state.backend.fs.memory-threshold" so that only the jobmanager
>>> writes to S3 for checkpoints (see the sketch after this list)
>>> * internal checkpoint with 10 checkpoints retained in history
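
For readers who want to reproduce the memory-threshold setup from the list above, here is
a rough sketch (the S3 path is a placeholder; the equivalent cluster-wide setting in
flink-conf.yaml is state.backend.fs.memory-threshold):

import java.net.URI;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MemoryThresholdExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // State handles below the threshold are kept as in-memory byte handles instead of
        // separate files written by the taskmanagers, which is how only the jobmanager ends
        // up writing to S3 for a small checkpoint like the 8.4 MB one described above.
        env.setStateBackend(
                new FsStateBackend(URI.create("s3://my-bucket/checkpoints"), 1024 * 1024));
    }
}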
>>>
>>> We don't expect jobmanager to use much memory at all. But it seems that
>>> this high memory footprint (or leak) happened occasionally, maybe under
>>> certain conditions. Any hypothesis?
>>>
>>> Thanks,
>>> Steven
>>>
>>>
>>> 41,567 ExecutionVertex objects retained 9+ GB of memory
>>> 
>>>
>>>
>>> Expanded in one ExecutionVertex. it seems to storing the kafka offsets
>>> for source operator
>>> 
>>>
>>>
>>>
>>
>


Re: how to emit record to downstream operator in snapshotState and/or onProcessingTime

2018-06-11 Thread Steven Wu
Piotr,

> However you could do it via a custom Operator (there you have a constant
access to output collector).

Can you elaborate on that a little bit? Are you referring to the
"Output<StreamRecord<OUT>> output" field in the AbstractStreamOperator class?

> register processing time service in your ProcessFunction.

I think your timer proposal can work.

I was originally registering the timer like this; the ProcessingTimeCallback interface
doesn't supply the Collector parameter:

((StreamingRuntimeContext) getRuntimeContext())
.getProcessingTimeService()
.registerTimer(..., this);

Thanks,
Steven



On Mon, Jun 11, 2018 at 2:52 AM, Piotr Nowojski 
wrote:

> Hi,
>
> Indeed, it seems it is not possible to emit records on
> checkpoint/snapshot through a ProcessFunction. However, you could do it via a
> custom Operator (there you have a constant access to output collector).
> Another workaround might be to register processing time service in your
> ProcessFunction.
>
> @Override
> public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
>ctx.timerService().registerProcessingTimeTimer(...);
> }
>
> @Override
> public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out) throws Exception {
>// …
> }
>
> Piotrek
>
> On 11 Jun 2018, at 01:07, Steven Wu  wrote:
>
> I have a process function defined with these interfaces
>
> public class MyProcessFunction extends ProcessFunction
> implements CheckpointedFunction, ProcessingTimeCallback {...}
>
> In snapshotState() method, I want to close files and emit the metadata
> about the closed files to downstream operator. it doesn't seem possible
> with *snapshotState(FunctionSnapshotContext context*) interface.
>
> I can keep metadata in snapshot and restore them during recovery. but if
> there is no input record coming for a long time, * processElement(T
> value, Context ctx, Collector out)* won't be called. Then I
> can't forward the restored data to downstream operator with guaranteed
> latency.
>
> I can add a timer. but it doesn't seem that *onProcessingTime(long
> timestamp)* allows me to forward output to downstream operator either.
>
> Thanks,
> Steven
>
>
>


how to emit record to downstream operator in snapshotState and/or onProcessingTime

2018-06-10 Thread Steven Wu
I have a process function defined with these interfaces

public class MyProcessFunction extends ProcessFunction
implements CheckpointedFunction, ProcessingTimeCallback {...}

In snapshotState() method, I want to close files and emit the metadata
about the closed files to downstream operator. it doesn't seem possible
with *snapshotState(FunctionSnapshotContext context*) interface.

I can keep the metadata in the snapshot and restore it during recovery. But if
there is no input record coming for a long time, *processElement(T value,
Context ctx, Collector out)* won't be called. Then I can't
forward the restored data to the downstream operator with guaranteed latency.

I can add a timer. but it doesn't seem that *onProcessingTime(long
timestamp)* allows me to forward output to downstream operator either.

Thanks,
Steven


Re: Task Manager detached under load

2018-05-25 Thread Steven Wu
Till, thanks for the follow-up. Looking forward to 1.5 :)

On Fri, May 25, 2018 at 2:11 AM, Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Steven,
>
> we don't have `jobmanager.exit-on-fatal-akka-error` because then the JM
> would also be killed if a single TM gets quarantined. This is also not a
> desired behaviour.
>
> With Flink 1.5 the problem with quarantining should be gone since we don't
> rely anymore on Akka's death watch and instead use our own heartbeats.
>
> Cheers,
> Till
>
> On Mon, May 14, 2018 at 1:07 AM, Steven Wu <stevenz...@gmail.com> wrote:
>
>> Till,
>>
>> thanks for the clarification. yes, that situation is undesirable either.
>>
>> In our case, restarting jobmanager could also recover the job from akk
>> association lock-out. it was actually the issue (high GC pause) on
>> jobmanager side that caused the akka failure.
>>
>> do we have sth like "jobmanager.exit-on-fatal-akka-error: true"? does it
>> make sense to terminate jobmanager in this case?
>>
>> Thanks,
>> Steven
>>
>> On Sun, May 13, 2018 at 1:12 PM, Till Rohrmann <trohrm...@apache.org>
>> wrote:
>>
>>> Hi Steven,
>>>
>>> the reason why we did not turn on this feature per default was that in
>>> case of a true JM failure, all of the TMs will think that they got
>>> quarantined which triggers their shut down. Depending on how many container
>>> restarts you have left on Yarn, for example, this can lead to a situation
>>> where Flink is not able to recover the job even though it needed to only
>>> restart the JM container.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Wed, Apr 25, 2018 at 10:39 PM, Steven Wu <stevenz...@gmail.com>
>>> wrote:
>>>
>>>> Till,
>>>>
>>>> We ran into the same issue. It started with high GC pause that caused
>>>> jobmanager to lose zk conn and leadership and caused jobmanager to
>>>> quarantine taskmanager in akka. Once quarantined, akka association btw
>>>> jobmanager and taskmanager is locked forever.
>>>>
>>>> Your suggestion of " taskmanager.exit-on-fatal-akka-error: true"
>>>> worked. taskmanager exited and replacement taskmanager joined the cluster
>>>> afterwards. I am wondering why is this not defaulted to "true". Any
>>>> downside?
>>>>
>>>> Thanks,
>>>> Steven
>>>>
>>>> On Sat, Feb 24, 2018 at 7:02 AM, ashish pok <ashish...@yahoo.com>
>>>> wrote:
>>>>
>>>>> @Jelmer, this is Till's las response on the issue.
>>>>>
>>>>> -- Ashish
>>>>>
>>>>> On Mon, Feb 5, 2018 at 5:56 AM, Till Rohrmann
>>>>> <trohrm...@apache.org> wrote:
>>>>> Hi,
>>>>>
>>>>> this sounds like a serious regression wrt Flink 1.3.2 and we should
>>>>> definitely find out what's causing this problem. Given from what I see in
>>>>> the logs, the following happens:
>>>>>
>>>>> For some time the JobManager seems to no longer receive heartbeats
>>>>> from the TaskManager. This could be, for example, due to long GC pauses or
>>>>> heavy load which starves the ActorSystem's threads which are responsible
>>>>> for sending the heartbeats. Due to this, the TM's ActorSystem is
>>>>> quarantined which effectively renders them useless because the JM will
>>>>> henceforth ignore all messages from these systems. The only way to resolve
>>>>> this problem is to restart the ActorSystem. By
>>>>> setting taskmanager.exit-on-fatal-akka-error to true in
>>>>> flink-conf.yaml, a quarantined TM will shut down. If you run the Flink
>>>>> cluster on Yarn, then a new substitute TM will be started if you have 
>>>>> still
>>>>> some container restarts left. That way, the system should be able to
>>>>> recover.
>>>>>
>>>>> Additionally you could try to play around
>>>>> with akka.watch.heartbeat.interval and akka.watch.heartbeat.pause
>>>>> which control the heartbeat interval and the acceptable pause. By
>>>>> increasing the latter, the system should tolerate longer GC pauses and
>>>>> period of high load.
>>>>>
>>>>> However, this only addresses the symptoms of the problem and I'd like
>>>>>

Re: Task Manager detached under load

2018-05-13 Thread Steven Wu
Till,

Thanks for the clarification. Yes, that situation is undesirable too.

In our case, restarting the jobmanager could also recover the job from the akka
association lock-out. It was actually an issue (high GC pause) on the
jobmanager side that caused the akka failure.

Do we have something like "jobmanager.exit-on-fatal-akka-error: true"? Does it
make sense to terminate the jobmanager in this case?

Thanks,
Steven

On Sun, May 13, 2018 at 1:12 PM, Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Steven,
>
> the reason why we did not turn on this feature per default was that in
> case of a true JM failure, all of the TMs will think that they got
> quarantined which triggers their shut down. Depending on how many container
> restarts you have left on Yarn, for example, this can lead to a situation
> where Flink is not able to recover the job even though it needed to only
> restart the JM container.
>
> Cheers,
> Till
>
> On Wed, Apr 25, 2018 at 10:39 PM, Steven Wu <stevenz...@gmail.com> wrote:
>
>> Till,
>>
>> We ran into the same issue. It started with high GC pause that caused
>> jobmanager to lose zk conn and leadership and caused jobmanager to
>> quarantine taskmanager in akka. Once quarantined, akka association btw
>> jobmanager and taskmanager is locked forever.
>>
>> Your suggestion of " taskmanager.exit-on-fatal-akka-error: true" worked.
>> taskmanager exited and replacement taskmanager joined the cluster
>> afterwards. I am wondering why is this not defaulted to "true". Any
>> downside?
>>
>> Thanks,
>> Steven
>>
>> On Sat, Feb 24, 2018 at 7:02 AM, ashish pok <ashish...@yahoo.com> wrote:
>>
>>> @Jelmer, this is Till's las response on the issue.
>>>
>>> -- Ashish
>>>
>>> On Mon, Feb 5, 2018 at 5:56 AM, Till Rohrmann
>>> <trohrm...@apache.org> wrote:
>>> Hi,
>>>
>>> this sounds like a serious regression wrt Flink 1.3.2 and we should
>>> definitely find out what's causing this problem. Given from what I see in
>>> the logs, the following happens:
>>>
>>> For some time the JobManager seems to no longer receive heartbeats from
>>> the TaskManager. This could be, for example, due to long GC pauses or heavy
>>> load which starves the ActorSystem's threads which are responsible for
>>> sending the heartbeats. Due to this, the TM's ActorSystem is quarantined
>>> which effectively renders them useless because the JM will henceforth
>>> ignore all messages from these systems. The only way to resolve this
>>> problem is to restart the ActorSystem. By setting 
>>> taskmanager.exit-on-fatal-akka-error
>>> to true in flink-conf.yaml, a quarantined TM will shut down. If you run the
>>> Flink cluster on Yarn, then a new substitute TM will be started if you have
>>> still some container restarts left. That way, the system should be able to
>>> recover.
>>>
>>> Additionally you could try to play around with akka.watch.heartbeat.interval
>>> and akka.watch.heartbeat.pause which control the heartbeat interval and the
>>> acceptable pause. By increasing the latter, the system should tolerate
>>> longer GC pauses and period of high load.
>>>
>>> However, this only addresses the symptoms of the problem and I'd like to
>>> find out what's causing the problem. In order to further debug the problem,
>>> it would be really helpful to obtain the logs of the JobManager and the
>>> TaskManagers on DEBUG log level and with 
>>> taskmanager.debug.memory.startLogThread
>>> set to true. Additionally it would be interesting to see whats happening on
>>> the TaskManagers when you observe high load. So obtaining a profiler dump
>>> via VisualVM would be great. And last but not least, it also helps to learn
>>> more about the job you're running. What kind of connectors is it using? Are
>>> you using Flink's metric system? How is the Flink cluster deployed? Which
>>> other libraries are you using in your job?
>>>
>>> Thanks a lot for your help!
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Jan 30, 2018 at 8:59 PM, Cliff Resnick <cre...@gmail.com> wrote:
>>>
>>> I've seen a similar issue while running successive Flink SQL batches on
>>> 1.4. In my case, the Job Manager would fail with the log output about
>>> unreachability (with an additional statement about something going
>>> "horribly wrong"). Under workload pressure, I reverted to 1.3.2 where
>>> everything works 

Re: Task Manager detached under load

2018-04-25 Thread Steven Wu
Till,

We ran into the same issue. It started with a high GC pause that caused the
jobmanager to lose its ZooKeeper connection and leadership, and caused the jobmanager to
quarantine the taskmanager in akka. Once quarantined, the akka association between
the jobmanager and taskmanager is locked forever.

Your suggestion of "taskmanager.exit-on-fatal-akka-error: true" worked.
The taskmanager exited and a replacement taskmanager joined the cluster
afterwards. I am wondering why this is not defaulted to "true". Any
downside?

Thanks,
Steven

On Sat, Feb 24, 2018 at 7:02 AM, ashish pok  wrote:

> @Jelmer, this is Till's las response on the issue.
>
> -- Ashish
>
> On Mon, Feb 5, 2018 at 5:56 AM, Till Rohrmann
>  wrote:
> Hi,
>
> this sounds like a serious regression wrt Flink 1.3.2 and we should
> definitely find out what's causing this problem. Given from what I see in
> the logs, the following happens:
>
> For some time the JobManager seems to no longer receive heartbeats from
> the TaskManager. This could be, for example, due to long GC pauses or heavy
> load which starves the ActorSystem's threads which are responsible for
> sending the heartbeats. Due to this, the TM's ActorSystem is quarantined
> which effectively renders them useless because the JM will henceforth
> ignore all messages from these systems. The only way to resolve this
> problem is to restart the ActorSystem. By setting 
> taskmanager.exit-on-fatal-akka-error
> to true in flink-conf.yaml, a quarantined TM will shut down. If you run the
> Flink cluster on Yarn, then a new substitute TM will be started if you have
> still some container restarts left. That way, the system should be able to
> recover.
>
> Additionally you could try to play around with akka.watch.heartbeat.interval
> and akka.watch.heartbeat.pause which control the heartbeat interval and the
> acceptable pause. By increasing the latter, the system should tolerate
> longer GC pauses and period of high load.
>
> However, this only addresses the symptoms of the problem and I'd like to
> find out what's causing the problem. In order to further debug the problem,
> it would be really helpful to obtain the logs of the JobManager and the
> TaskManagers on DEBUG log level and with 
> taskmanager.debug.memory.startLogThread
> set to true. Additionally it would be interesting to see whats happening on
> the TaskManagers when you observe high load. So obtaining a profiler dump
> via VisualVM would be great. And last but not least, it also helps to learn
> more about the job you're running. What kind of connectors is it using? Are
> you using Flink's metric system? How is the Flink cluster deployed? Which
> other libraries are you using in your job?
>
> Thanks a lot for your help!
>
> Cheers,
> Till
>
> On Tue, Jan 30, 2018 at 8:59 PM, Cliff Resnick  wrote:
>
> I've seen a similar issue while running successive Flink SQL batches on
> 1.4. In my case, the Job Manager would fail with the log output about
> unreachability (with an additional statement about something going
> "horribly wrong"). Under workload pressure, I reverted to 1.3.2 where
> everything works perfectly, but we will try again soon on 1.4. When we do I
> will post the actual log output.
>
> This was on YARN in AWS, with akka.ask.timeout = 60s.
>
> On Wed, Jan 24, 2018 at 9:57 PM, Ashish Pokharel 
> wrote:
>
> I haven’t gotten much further with this. It doesn’t look like GC related -
> at least GC counters were not that atrocious. However, my main concern was
> once the load subsides why aren’t TM and JM connecting again? That doesn’t
> look normal. I could definitely tell JM was listening on the port and from
> logs it does appear TM is trying to message JM that is still alive.
>
> Thanks, Ashish
>
> On Jan 23, 2018, at 12:31 PM, Lasse Nedergaard 
> wrote:
>
> Hi.
>
> Did you find a reason for the detaching ?
> I sometimes see the same on our system running Flink 1.4 on dc/os. I have
> enabled taskmanager.Debug.memory.start logthread for debugging.
>
> Med venlig hilsen / Best regards
> Lasse Nedergaard
>
>
> Den 20. jan. 2018 kl. 12.57 skrev Kien Truong :
>
> Hi,
>
> You should enable and check your garbage collection log.
>
> We've encountered case where Task Manager disassociated due to long GC
> pause.
>
>
> Regards,
>
> Kien
> On 1/20/2018 1:27 AM, ashish pok wrote:
>
> Hi All,
>
> We have hit some load related issues and was wondering if any one has some
> suggestions. We are noticing task managers and job managers being detached
> from each other under load and never really sync up again. As a result,
> Flink session shows 0 slots available for processing. Even though, apps are
> configured to restart it isn't really helping as there are no slots
> available to run the apps.
>
>
> Here are excerpt from logs that seemed relevant. (I am trimming out rest
> of the logs for brevity)
>
> *Job Manager:*
> 2018-01-19 12:38:00,423 INFO  

Re: How to customize triggering of checkpoints?

2018-04-12 Thread Steven Wu
Syed, I am very curious about the motivation if you can share.

On Wed, Apr 11, 2018 at 1:35 AM, Chesnay Schepler 
wrote:

> Hello,
>
> there is no way to manually trigger checkpoints or configure irregular
> intervals.
>
> You will have to modify the CheckpointCoordinator
> 
> and build Flink from source:
>
>- startCheckpointScheduler() should only schedule a one-time execution
>of the trigger
>- ScheduledTrigger#run() should reschedule itself
>   - Something like the following (see the sketch after this list):
>  - triggerCheckpoint(System.currentTimeMillis(), true);
>  - long msUntilNextCheckpoint = // insert logic here
>  - timer.schedule(new ScheduledTrigger(), msUntilNextCheckpoint,
>  TimeUnit.MILLISECONDS)
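
Expanded into code, the rescheduling trigger could look roughly like this. It is a sketch
of a change to Flink-internal code, so the surrounding members (timer, LOG,
triggerCheckpoint) are those of the CheckpointCoordinator version you build against and
may differ; computeNextCheckpointDelay() is a placeholder for your own interval logic.

// inside CheckpointCoordinator
private final class ScheduledTrigger implements Runnable {

    @Override
    public void run() {
        try {
            triggerCheckpoint(System.currentTimeMillis(), true);
        } catch (Exception e) {
            LOG.error("Exception while triggering checkpoint.", e);
        }
        // decide when the next (non-periodic) checkpoint should fire, then reschedule
        long msUntilNextCheckpoint = computeNextCheckpointDelay();
        timer.schedule(new ScheduledTrigger(), msUntilNextCheckpoint, TimeUnit.MILLISECONDS);
    }
}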
>
> On 11.04.2018 05:15, syed wrote:
>
> I am new to the flink environment and looking to analyze the triggering of
> checkpoints. I am looking to trigger non-periodic checkpoints such that
> checkpoint intervals are not of equal length, but I am not sure how I can do this
> in Flink.
>
> My specific query is;
>
> (1) How can I trigger non-periodic checkpoints in Flink? I am looking to
> trigger first checkpoint say after 10 seconds, the next checkpoint at say 25
> seconds, third at 45 seconds and so on. Can I define my own function which
> triggers non-periodic checkpoints and generates non-uniform checkpoint
> intervals?
> Thanks.
>
>
>
>
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>
>


Re: Old JobManager lost its leadership in zk

2018-04-11 Thread Steven Wu
From the Flink UI on the jobmanager, sometimes I saw the taskmanager connected and
the heartbeat time got updated.


But then sometimes the taskmanager page becomes blank; maybe it disconnected.



On Wed, Apr 11, 2018 at 1:31 PM, Steven Wu <stevenz...@gmail.com> wrote:

> Hi,
>
> After this error/exception, it seems that taskmanager never connects to
> jobmanager anymore.  Job stuck in failed state because there is not enough
> slots to recover the job.
>
> let's assume there was a temp glitch btw jobmanager and zk. would it cause
> such a permanent failure in Flink?
>
> I checked the zookeeper record.
> * leader zknode seems to have the correct info for "job_manager_lock"
> * I am not sure how to read the leaderlatch zknode
>
>
> A little more about the job
> * standalone cluster mode
> * 1 jobmanager
> * 1 taskmanager
>
> Thanks,
> Steven
>
> *2018-04-11 01:11:48,007 INFO  org.apache.flink.runtime.taskmanager.Task
> - Attempting to fail task externally Source:
> kafkasource -> Sink: s3sink (1/1)
> (5a7dba2e186b9fdaebb62bdd703dc7dc).2018-04-11 01:11:48,007 INFO
>  org.apache.flink.runtime.taskmanager.Task - Source:
> kafkasource -> Sink: s3sink (1/1) (5a7dba2e186b9fdaebb62bdd703dc7dc)
> switched from RUNNING to FAILED.java.lang.Exception: TaskManager
> akka://flink/user/taskmanager disconnects from JobManager
> akka.tcp://flink@1.2.3.4:42787/user/jobmanager: Old JobManager lost its
> leadership.at
> org.apache.flink.runtime.taskmanager.TaskManager.handleJobManagerDisconnect(TaskManager.scala:1073)
> at org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleJobManagerLeaderAddress(TaskManager.scala:1467)
>at
> org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:277)
>at
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>at
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
>at
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>at
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>at
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>at
> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>at
> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>at akka.actor.Actor$class.aroundReceive(Actor.scala:502)at
> org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:121)
>at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>at akka.actor.ActorCell.invoke(ActorCell.scala:495)at
> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)at
> akka.dispatch.Mailbox.run(Mailbox.scala:224)at
> akka.dispatch.Mailbox.exec(Mailbox.scala:234)at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)2018-04-11
> 01:11:48,011 INFO  org.apache.flink.runtime.taskmanager.Task
> - Triggering cancellation of task code Source:
> kafkasource -> Sink: s3sink (1/1)
> (5a7dba2e186b9fdaebb62bdd703dc7dc).2018-04-11 01:11:48,013 INFO
>  org.apache.flink.runtime.blob.PermanentBlobCache  - Shutting
> down BLOB cache*
>
>


Old JobManager lost its leadership in zk

2018-04-11 Thread Steven Wu
Hi,

After this error/exception, it seems that the taskmanager never connects to the
jobmanager anymore. The job is stuck in a failed state because there are not enough
slots to recover it.

Let's assume there was a temporary glitch between the jobmanager and ZooKeeper. Would it
cause such a permanent failure in Flink?

I checked the zookeeper record.
* leader zknode seems to have the correct info for "job_manager_lock"
* I am not sure how to read the leaderlatch zknode


A little more about the job
* standalone cluster mode
* 1 jobmanager
* 1 taskmanager

Thanks,
Steven

*2018-04-11 01:11:48,007 INFO  org.apache.flink.runtime.taskmanager.Task
- Attempting to fail task externally Source:
kafkasource -> Sink: s3sink (1/1)
(5a7dba2e186b9fdaebb62bdd703dc7dc).2018-04-11 01:11:48,007 INFO
 org.apache.flink.runtime.taskmanager.Task - Source:
kafkasource -> Sink: s3sink (1/1) (5a7dba2e186b9fdaebb62bdd703dc7dc)
switched from RUNNING to FAILED.java.lang.Exception: TaskManager
akka://flink/user/taskmanager disconnects from JobManager
akka.tcp://flink@1.2.3.4:42787/user/jobmanager
: Old JobManager lost its
leadership.at
org.apache.flink.runtime.taskmanager.TaskManager.handleJobManagerDisconnect(TaskManager.scala:1073)
   at org.apache.flink.runtime.taskmanager.TaskManager.org
$apache$flink$runtime$taskmanager$TaskManager$$handleJobManagerLeaderAddress(TaskManager.scala:1467)
   at
org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:277)
   at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
   at
org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
   at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
   at
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
   at
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
   at
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
   at
org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
   at akka.actor.Actor$class.aroundReceive(Actor.scala:502)at
org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:121)
   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
   at akka.actor.ActorCell.invoke(ActorCell.scala:495)at
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)at
akka.dispatch.Mailbox.run(Mailbox.scala:224)at
akka.dispatch.Mailbox.exec(Mailbox.scala:234)at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
   at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
   at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
   at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)2018-04-11
01:11:48,011 INFO  org.apache.flink.runtime.taskmanager.Task
- Triggering cancellation of task code Source:
kafkasource -> Sink: s3sink (1/1)
(5a7dba2e186b9fdaebb62bdd703dc7dc).2018-04-11 01:11:48,013 INFO
 org.apache.flink.runtime.blob.PermanentBlobCache  - Shutting
down BLOB cache*


Re: entrypoint for executing job in task manager

2018-03-21 Thread Steven Wu
Thanks, let me clarify the requirement. Sorry that it wasn't clear in the
original email.

Here is our setup.

these 3 dirs are added to classpath
* flink/lib: core flink jars (like flink-dist_2.11,
flink-shaded-hadoop2-uber)
* spaaslib: many jars pulled in our internal platform
* jobs: a single fat jar for app/job code lives here

running Flink
* "jobmanager.web.upload.dir" is configured to use the "jobs" dir above.
* we use REST api to submit job in a standalone cluster.

Here are the requirements for two level of init hooks
1) provide a *JVM init hook* in the JobManager and TaskManager classes during JVM
startup for users to extend. Right now, we override the main method and
others (as you were also suggesting). That is a little fragile as it is
tight coupling; the JobManager and TaskManager classes don't seem to be implemented
for overriding. We have seen breaking changes when upgrading Flink from 1.2
-> 1.3 -> 1.4.
2) a *job init hook* during job execution. The jobmanager computes the job graph and
ships it to the taskmanager, which then executes the job (opens/runs operators).
  - My original email was about allowing the user (app developer) to supply additional
Guice binding modules at the taskmanager with a job init hook. Then we can
create a child injector with these additional modules. But the Guice child
injector has some issues with de-duplicating modules and runs into duplicate binding
problems, so we decided not to pursue this route.
  - My colleague (Monal) mentioned another use case where we can leverage
such a job init hook at the taskmanager. E.g., inside the job init hook, we can decide on
and attach EBS volumes based on the key group assignment. Our understanding is
that such key group assignment is calculated by the jobmanager during job
planning.

The problem with your suggestion of "InjectionUtil.installModulesIfNotYetInstalled()"
is that it will also be loaded/executed inside the
JobManager when it loads this class. It is just a minor issue though.

Thanks,
Steven


On Wed, Mar 21, 2018 at 9:47 AM, Stephan Ewen <se...@apache.org> wrote:

> It would be great to understand a bit more what the exact requirements
> here are, and what setup you use.
>
> I am not a dependency injection expert, so let me know if what I am
> suggesting here is complete bogus.
>
>
> *(1) Fix set of libraries for Dependency Injection, or dedicated container
> images per application*
>
> If you have a dedicated JM and TM Flink image that you build per job, I
> would assume that you also put all the required the libraries directly into
> the lib folder, so everything is available on startup.
>
> In that case, could you just wrap the TM and JM main methods to first call
> the initialization methods to set up dependency injection?
>
> This would also work if you have container images that are not
> job-specific, but all the libraries relevant to dependency injection are
> part of the image (the lib folder).
>
> *(2) Generic container images, plus dynamic set of libraries for
> dependency injection*
>
> Assuming you do not have job-specific container images, and each
> application brings its own dependencies it wants to set up for dependency
> injection,
> we could look in the following direction.
>
> The dependencies need to be set up for each Task on the TaskManager  ,
> because each task gets potentially a dedicated classloader.
> Have you tried an approach like the following?
>
>   - Create a static dependency initializer utility class that has a static
> "installModulesIfNotYetInstalled()" method (see the sketch after this list).
>
>   - Each class that you use should have as the first line a static
> initializer block that calls that utility:
>
> public class MyFunction implements MapFunction<A, B> {
>
> static {
> InjectionUtil.installModulesIfNotYetInstalled();
> }
>
> public B map(A value) {...}
>
> ...
> }
>
>
>   - You can probably create yourself a base class that does that from
> which all you functions extend.
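
A rough sketch of what such a utility could look like (the bindings inside configure()
are placeholders for whatever modules the job actually needs):

import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;

public final class InjectionUtil {

    private static Injector injector;

    private InjectionUtil() {}

    // Idempotent and synchronized: safe to call from the static initializer of every
    // function class, even when several tasks share the same classloader.
    public static synchronized void installModulesIfNotYetInstalled() {
        if (injector == null) {
            injector = Guice.createInjector(new AbstractModule() {
                @Override
                protected void configure() {
                    // install/bind the application's modules here
                }
            });
        }
    }

    public static synchronized Injector injector() {
        installModulesIfNotYetInstalled();
        return injector;
    }
}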
>
>
> On Fri, Dec 22, 2017 at 11:23 AM, Piotr Nowojski <pi...@data-artisans.com>
> wrote:
>
>> I don’t think there is such hook in the Flink code now. You will have to
>> walk around this issue somehow in user space.
>>
>> Maybe you could make a contract that every operator before touching
>> Guice, should call static synchronized method `initializeGuiceContext`.
>> This method could search the classpath for classes with some specific
>> annotations, for example `@MyInitializationHook` and install/add all of
>> such hooks before actually using Guice?
>>
>> Piotrek
>>
>>
>> On 21 Dec 2017, at 17:49, Steven Wu <stevenz...@gmail.com> wrote:
>>
>> We use Guice for dependency injection. We need to install *additional*
>> Guice modu

Re: Dependency Injection and Flink

2018-03-17 Thread Steven Wu
Stephan,

That would be helpful. On the job manager side, the entry class provides such an
entry-point hook. The problem is on the task manager side, where we don't
have such an initialization/entry point.

I brought up the same question 3 months ago on this list with the subject
"entrypoint for executing job in task manager".

Thanks,
Steven



On Thu, Mar 15, 2018 at 1:49 PM, Stephan Ewen <se...@apache.org> wrote:

> Would it help to be able to register "initializers", meaning some
> classes/methods that will be called at every process entry point, to set up
> something like this?
>
>
> On Tue, Mar 13, 2018 at 7:56 PM, Steven Wu <stevenz...@gmail.com> wrote:
>
>> Xiaochuan,
>>
>> We are doing exactly as you described. We keep the injector as a global
>> static var.
>>
>> But we extend from FlinkJobManager and FlinkTaskManager to override main
>> method and initialize the injector (and other things) during JVM startup,
>> which does cause tight code coupling. It is a little painful to upgrade
>> Flink because sometimes internal code structure change of FlinkJobManager
>> and FlinkTaskManager can break our extended class..
>>
>> Thanks,
>> Steven
>>
>>
>> On Tue, Mar 13, 2018 at 11:30 AM, XiaoChuan Yu <xiaochuan...@kik.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I'm evaluating Flink with the intent to integrate it into a Java project
>>> that uses a lot of dependency injection via Guice. What would be the best
>>> way to work with DI/Guice given that injected fields aren't Serializable?
>>> I looked at this StackOverflow answer so far. To my understanding the
>>> strategy is as follows but I'm not sure about step 3:
>>>
>>>1. Use a RichFunction any time injection required.
>>>2. Do not use @Inject, instead mark each injected field as transient.
>>>3. Implement open() / close() and manually assign values to injected
>>>fields using Injector.getInstance(SomeClass.class)? But where do I
>>>get the injector? Create one on the spot each time? Keep one as a static
>>>var somewhere and use everywhere?
>>>
>>> Example:
>>>  public class MyFilter extends FilterFunction {
>>>  private transient DbClient dbClient;
>>>  //@Inject DbClient dbClient; //typical Guice field injection
>>>
>>>  public void open(Configuration parameters) {
>>>  // where am I suppose to get the injector?
>>>  // keep it as a static variable somewhere and init it in Main?
>>>  this.dbClient = MyInjectorHolder.injector().getInstance(DbClient.class);
>>>  }
>>>  public boolean filter(String value) {
>>>  return this.dbClient.query(value);
>>>  }
>>>  }
>>> I haven't setup a Flink environment to try the above yet though.
>>> Does anyone know of a less verbose way?
>>> I imagine this could get quite verbose with multiple injected fields.
>>>
>>> Thanks,
>>> Xiaochuan Yu
>>>
>>>
>>
>


Re: Dependency Injection and Flink

2018-03-13 Thread Steven Wu
Xiaochuan,

We are doing exactly as you described. We keep the injector as a global
static var.

But we extend from FlinkJobManager and FlinkTaskManager to override the main
method and initialize the injector (and other things) during JVM startup,
which does cause tight code coupling. It is a little painful to upgrade
Flink because internal code structure changes in FlinkJobManager
and FlinkTaskManager can sometimes break our extended classes.

Thanks,
Steven


On Tue, Mar 13, 2018 at 11:30 AM, XiaoChuan Yu  wrote:

> Hi,
>
> I'm evaluating Flink with the intent to integrate it into a Java project
> that uses a lot of dependency injection via Guice. What would be the best
> way to work with DI/Guice given that injected fields aren't Serializable?
> I looked at this StackOverflow answer so far. To my understanding the
> strategy is as follows but I'm not sure about step 3:
>
>1. Use a RichFunction any time injection required.
>2. Do not use @Inject, instead mark each injected field as transient.
>3. Implement open() / close() and manually assign values to injected
>fields using Injector.getInstance(SomeClass.class)? But where do I get
>the injector? Create one on the spot each time? Keep one as a static var
>somewhere and use everywhere?
>
> Example:
>  public class MyFilter extends FilterFunction {
>  private transient DbClient dbClient;
>  //@Inject DbClient dbClient; //typical Guice field injection
>
>  public void open(Configuration parameters) {
>  // where am I suppose to get the injector?
>  // keep it as a static variable somewhere and init it in Main?
>  this.dbClient = MyInjectorHolder.injector().
> getInstance(DbClient.class);
>  }
>  public boolean filter(String value) {
>  return this.dbClient.query(value);
>  }
>  }
> I haven't setup a Flink environment to try the above yet though.
> Does anyone know of a less verbose way?
> I imagine this could get quite verbose with multiple injected fields.
>
> Thanks,
> Xiaochuan Yu
>
>


Re: Joining data in Streaming

2018-02-05 Thread Steven Wu
There is also a discussion of side inputs:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API

I would load the smaller data set as a static reference data set. Then you
can just do single-source streaming of the larger data set.
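
To make that concrete, a rough sketch of the static-reference approach (the key/value
types and loadReferenceData() are illustrative placeholders; the real load could read
from S3, a database, etc.):

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

public class EnrichAgainstReference extends RichMapFunction<String, Tuple2<String, String>> {

    private transient Map<String, String> reference;

    @Override
    public void open(Configuration parameters) throws Exception {
        // load the smaller data set once per parallel task, before any records flow
        reference = loadReferenceData();
    }

    @Override
    public Tuple2<String, String> map(String key) {
        return Tuple2.of(key, reference.get(key));
    }

    private Map<String, String> loadReferenceData() {
        return new HashMap<>(); // placeholder: read the smaller data set here
    }
}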

On Wed, Jan 31, 2018 at 1:09 AM, Stefan Richter  wrote:

> Hi,
>
> if the workarounds that Xingcan and I mentioned are not options for your
> use case, then I think this might currently be the better option. But I
> would expect some better support for stream joins in the near future.
>
> Best,
> Stefan
>
> > Am 31.01.2018 um 07:04 schrieb Marchant, Hayden <
> hayden.march...@citi.com>:
> >
> > Stefan,
> >
> > So are we essentially saying that in this case, for now, I should stick
> to DataSet / Batch Table API?
> >
> > Thanks,
> > Hayden
> >
> > -Original Message-
> > From: Stefan Richter [mailto:s.rich...@data-artisans.com]
> > Sent: Tuesday, January 30, 2018 4:18 PM
> > To: Marchant, Hayden [ICG-IT] 
> > Cc: user@flink.apache.org; Aljoscha Krettek 
> > Subject: Re: Joining data in Streaming
> >
> > Hi,
> >
> > as far as I know, this is not easily possible. What would be required is
> something like a CoFlatmap function, where one input stream is blocking
> until the second stream is fully consumed to build up the state to join
> against. Maybe Aljoscha (in CC) can comment on future plans to support this.
> >
> > Best,
> > Stefan
> >
> >> Am 30.01.2018 um 12:42 schrieb Marchant, Hayden <
> hayden.march...@citi.com>:
> >>
> >> We have a use case where we have 2 data sets - One reasonable large
> data set (a few million entities), and a smaller set of data. We want to do
> a join between these data sets. We will be doing this join after both data
> sets are available.  In the world of batch processing, this is pretty
> straightforward - we'd load both data sets into an application and execute
> a join operator on them through a common key.   Is it possible to do such a
> join using the DataStream API? I would assume that I'd use the connect
> operator, though I'm not sure exactly how I should do the join - do I need
> one 'smaller' set to be completely loaded into state before I start flowing
> the large set? My concern is that if I read both data sets from streaming
> sources, since I can't be guaranteed of the order that the data is loaded,
> I may lose lots of potential joined entities since their pairs might not
> have been read yet.
> >>
> >>
> >> Thanks,
> >> Hayden Marchant
> >>
> >>
> >
>
>


Re: Flink State monitoring

2018-01-04 Thread Steven Wu
Aljoscha/Stefan,

If incremental checkpointing is enabled, I assume the "checkpoint size" is
only the delta/incremental size (not the full state size), right?

Thanks,
Steven


On Thu, Jan 4, 2018 at 5:18 AM, Aljoscha Krettek 
wrote:

> Hi,
>
> I'm afraid there is currently no metrics around state. I see that it's
> very good to have so I'm putting it on my list of stuff that we should have
> at some point.
>
> One thing that comes to mind is checking the size of checkpoints, which
> gives you an indirect way of figuring out how big state is but that's not
> very exact, i.e. doesn't give you "number of keys" or some such.
>
> Best,
> Aljoscha
>
> > On 20. Dec 2017, at 08:09, Netzer, Liron  wrote:
> >
> > Ufuk, Thanks for replying !
> >
> > Aljoscha, can you please assist with the questions below?
> >
> > Thanks,
> > Liron
> >
> > -Original Message-
> > From: Ufuk Celebi [mailto:u...@apache.org]
> > Sent: Friday, December 15, 2017 3:06 PM
> > To: Netzer, Liron [ICG-IT]
> > Cc: user@flink.apache.org
> > Subject: Re: Flink State monitoring
> >
> > Hey Liron,
> >
> > unfortunately, there are no built-in metrics related to state. In
> general, exposing the actual values as metrics is problematic, but exposing
> summary statistics would be a good idea. I'm not aware of a good work
> around at the moment that would work in the general case (taking into
> account state restore, etc.).
> >
> > Let me pull in Aljoscha (cc'd) who knows the state backend internals
> well.
> >
> > @Aljoscha:
> > 1) Are there any plans to expose keyed state related metrics (like
> number of keys)?
> > 2) Is there a way to work around the lack of these metrics in 1.3?
> >
> > – Ufuk
> >
> > On Thu, Dec 14, 2017 at 10:55 AM, Netzer, Liron 
> wrote:
> >> Hi group,
> >>
> >>
> >>
> >> We are using Flink keyed state in several operators.
> >>
> >> Is there an easy way to expose the data that is stored in the state,
> i.e.
> >> the key and the values?
> >>
> >> This is needed for both monitoring as well as debugging. We would like
> >> to understand how many key+values are stored in each state and also to
> >> view the data itself.
> >>
> >> I know that there is the "Queryable state" option, but this is still
> >> in Beta, and doesn't really give us what we want easily.
> >>
> >>
> >>
> >>
> >>
> >> *We are using Flink 1.3.2 with Java.
> >>
> >>
> >>
> >> Thanks,
> >>
> >> Liron
>
>


Re: keyby() issue

2017-12-31 Thread Steven Wu
>  but soon later, no results produced, and flink seems busy doing
something forever.

Jinhua, I don't know if you have checked these things; if not, they may be worth a
look.

Have you tried to do a thread dump?
How is the GC pause?
Do you see Flink restart? Check the exception tab in the Flink web UI for your
job.



On Sun, Dec 31, 2017 at 6:20 AM, Jinhua Luo  wrote:

> I take time to read some source codes about the keyed stream
> windowing, and I make below understanding:
>
> a) the keyed stream would be split and dispatched to downstream tasks
> in hash manner, and the hash base is the parallelism of the downstream
> operator:
>
> See org.apache.flink.runtime.state.KeyGroupRangeAssignment.
> computeKeyGroupForKeyHash(int,
> int):
> MathUtils.murmurHash(keyHash) % maxParallelism;
>
> That's what the doc said "hash partitioning".
>
> So the compiled execution graph already determines whose operator
> instance receive which key groups.
>
> b) with windowing, the key is used to index window states, so the
> window function would receive the deserialized value from its
> corresponding window state of some key.
>
> b.1) The element would be added into the state first:
>
> See org.apache.flink.streaming.runtime.operators.windowing.
> WindowOperator.processElement(StreamRecord):
> windowState.add(element.getValue());
>
> b.2) when the trigger fires the window, the value would be
> deserialized from the keyed state:
>
> ACC contents = windowState.get();
> emitWindowContents(actualWindow, contents);
>
> For rocksdb backend, each input element would be taken back and forth
> from the disk in the processing.
>
> flink's keyed stream has the same functionality as storm's field
> grouping, and more complicated.
>
> Am I correct?
>
>
> But I still could not understand why keyby() stops flink from
> returning expected results.
>
> Let me explain my case more:
> I use kafka data source, which collects log lines of log files from
> tens of machines.
> The log line is in json format, which contains the "ip" field, the ip
> address of the user, so it could be valued in million of ip addresses
> of the Internet.
> The stream processing is expected to result in ip aggregation in {1
> hour, 1 min} sliding window.
>
> If I use keyBy("ip"), then at first minutes, the flink could give me
> correct aggregation results, but soon later, no results produced, and
> flink seems busy doing something forever.
>
> I doubt if keyby() could handle huge keys like this case, and when I
> remove keyby().window().fold() and use windowAll().fold() instead (the
> latter fold operator uses hashmap to aggregate ip by itself), flink
> works. But as known, the windowAll() is not scale-able.
>
> Could flink developers help me on this topic, I prefer flink and I
> believe flink is one of best stream processing frameworks, but I am
> really frustrated that flink could be fulfill its feature just like
> the doc said.
>
> Thank you all.
>
>
> 2017-12-29 17:42 GMT+08:00 Jinhua Luo :
> > I misuse the key selector. I checked the doc and found it must return
> > deterministic key, so using random is wrong, but I still could not
> > understand why it would cause oom.
> >
> >
> >
> > 2017-12-28 21:57 GMT+08:00 Jinhua Luo :
> >> It's very strange, when I change the key selector to use random key,
> >> the jvm reports oom.
> >>
> >>.keyBy(new KeySelector<MyEvent, Integer>() {
> >>  public Integer getKey(MyEvent ev) { return
> >> ThreadLocalRandom.current().nextInt(1, 100);}
> >>})
> >>
> >> Caused by: java.lang.OutOfMemoryError: Java heap space
> >> at com.esotericsoftware.kryo.util.IdentityMap.resize(
> IdentityMap.java:469)
> >> at com.esotericsoftware.kryo.util.IdentityMap.push(
> IdentityMap.java:230)
> >> at com.esotericsoftware.kryo.util.IdentityMap.put(
> IdentityMap.java:144)
> >> at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:818)
> >> at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
> >> at com.esotericsoftware.kryo.serializers.MapSerializer.
> copy(MapSerializer.java:157)
> >> at com.esotericsoftware.kryo.serializers.MapSerializer.
> copy(MapSerializer.java:21)
> >> at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
> >> at org.apache.flink.api.java.typeutils.runtime.kryo.
> KryoSerializer.copy(KryoSerializer.java:175)
> >> at org.apache.flink.api.java.typeutils.runtime.
> PojoSerializer.copy(PojoSerializer.java:239)
> >> at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> CopyingChainingOutput.pushToOperator(OperatorChain.java:547)
> >> at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> CopyingChainingOutput.collect(OperatorChain.java:524)
> >> at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> CopyingChainingOutput.collect(OperatorChain.java:504)
> >> at org.apache.flink.streaming.api.operators.
> 

Re: entrypoint for executing job in task manager

2017-12-21 Thread Steven Wu
We use Guice for dependency injection. We need to install *additional*
Guice modules (for bindings) when setting up this static context of the Guice
injector.

Calling the static initializer from the operator open method won't really help.
Not all operators are implemented by the app developer who wants to install
additional Guice modules; e.g., the Kafka source operator is
implemented/provided by our platform. I think the source operator will open
first, which means the app operator won't get a chance to initialize the static
context. What would really help is an entry hook (at the task manager)
that is executed before any operator opens.

On Thu, Dec 21, 2017 at 12:27 AM, Piotr Nowojski <pi...@data-artisans.com>
wrote:

> Open method is called just before any elements are processed. You can hook
> in any initialisation logic there, including initialisation of a static
> context. However keep in mind, that since this context is static, it will
> be shared between multiple operators (if you are running parallelism >
> number of task managers), so accesses to it must be synchronized (including
> initialisation). Another thing to consider is that managing the life cycle
> of static context can be tricky (when to close it and release it’s
> resources).
>
> The questions is, whether you really need a static context?
>
> Thanks,
> Piotrek
>
>
> > On 21 Dec 2017, at 07:53, Steven Wu <stevenz...@gmail.com> wrote:
> >
> > Here is my understanding of how job submission works in Flink. When
> submitting a job to job manager via REST API, we provide a entry class. Job
> manager then evaluate job graph and ship serialized operators to task
> manager. Task manager then open operators and run tasks.
> >
> > My app would typically requires some initialization phase to setup my
> own running context in task manager (e.g. calling a static method of some
> class). Does Flink provide any entry hook in task manager when executing a
> job (and tasks)? As for job manager, the entry class provides such hook
> where I can initialize my static context.
> >
> > Thanks,
> > Steven
>
>


entrypoint for executing job in task manager

2017-12-20 Thread Steven Wu
Here is my understanding of how job submission works in Flink. When
submitting a job to the job manager via the REST API, we provide an entry class. The job
manager then evaluates the job graph and ships serialized operators to the task
manager. The task manager then opens the operators and runs the tasks.

My app typically requires some initialization phase to set up my own
running context in the task manager (e.g. calling a static method of some
class). Does Flink provide any entry hook in the task manager when executing a
job (and tasks)? As for the job manager, the entry class provides such a hook
where I can initialize my static context.

Thanks,
Steven


Re: Checkpoint expired before completing

2017-12-02 Thread Steven Wu
One more question: since I have set "Maximum Concurrent Checkpoints" to
1, will the cascading effect still apply?

Whenever my sink operator returns to normal (in terms of latency), a new
checkpoint after this point should work, right? There are no other
in-flight/concurrent checkpoints still in progress.

Or is the min pause just allowing Flink to catch up on in-flight messages in
various queues/buffers? Is that the cascading impact?

On Sat, Dec 2, 2017 at 2:10 PM, Steven Wu <stevenz...@gmail.com> wrote:

> Stephan, thanks a lot for the explanation. Now everything makes sense to
> me. Will set the min pause.
>
> On Sat, Dec 2, 2017 at 8:58 AM, Stephan Ewen <se...@apache.org> wrote:
>
>> Hi Steven!
>>
>> You are right, there could be some cascading effect from previous
>> checkpoints.
>> I think the best way to handle that is to set the "minimum pause between
>> checkpoints". In fact, I would actually recommend this over the checkpoint
>> interval parameter.
>>
>> The pause will allow the job to handle such effects that built up during
>> an unhealthy checkpoint. You can for example set the checkpoint interval to
>> 2 mins and set the pause to 1.5 mins. That way, if a checkpoint takes
>> longer than usual, the next one will still wait for 1.5 mins after the
>> previous one completed or expired, giving the job time to catch up.
>>
>> Best,
>> Stephan
>>
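
In code, that recommendation looks roughly like this (the values mirror the example above):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MinPauseExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(2 * 60 * 1000); // trigger a checkpoint every 2 minutes
        // wait at least 1.5 minutes after the previous checkpoint completed or expired
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(90 * 1000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    }
}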
>>
>> On Fri, Dec 1, 2017 at 10:10 PM, Steven Wu <stevenz...@gmail.com> wrote:
>>
>>> Here is the checkpoint config. no concurrent checkpoints with 2 minute
>>> checkpoint interval and timeout.
>>>
>>> Problem is gone after redeployment. I will try if I can reproduce the
>>> issue
>>>
>>> [image: Inline image 1]
>>>
>>> On Fri, Dec 1, 2017 at 6:17 AM, Nico Kruber <n...@data-artisans.com>
>>> wrote:
>>>
>>>> Hi Steven,
>>>> by default, checkpoints time out after 10 minutes if you haven't used
>>>> CheckpointConfig#setCheckpointTimeout() to change this timeout.
>>>>
>>>> Depending on your checkpoint interval, and your number of concurrent
>>>> checkpoints, there may already be some other checkpoint processes
>>>> running while you are waiting for the first to finish. In that case,
>>>> succeeding checkpoints may also fail with a timeout. However, they
>>>> should definitely get back to normal once your sink has caught up with
>>>> all buffered events.
>>>>
>>>> I included Stefan who may shed some more light onto it, but maybe you
>>>> can help us identifying the problem by providing logs at DEBUG level
>>>> (did akka report any connection loss and gated actors? or maybe some
>>>> other error in there?) or even a minimal program to reproduce.
>>>>
>>>>
>>>> Nico
>>>>
>>>> On 01/12/17 07:36, Steven Wu wrote:
>>>> >
>>>> > org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
>>>> Checkpoint
>>>> > 9353 expired before completing
>>>> >
>>>> > I might know why this happened in the first place. Our sink operator
>>>> > does synchronous HTTP post, which had a 15-mint latency spike when
>>>> this
>>>> > all started. This could block flink threads and prevent checkpoint
>>>> from
>>>> > completing in time. But I don't understand why checkpoint continued to
>>>> > fail after HTTP post latency returned to normal. there seems to be
>>>> some
>>>> > lingering/cascading effect of previous failed checkpoints on future
>>>> > checkpoints. Only after I redeploy/restart the job an hour later,
>>>> > checkpoint starts to work again.
>>>> >
>>>> > Would appreciate any suggestions/insights!
>>>> >
>>>> > Thanks,
>>>> > Steven
>>>>
>>>>
>>>
>>
>
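
For reference, the settings discussed in this thread map onto CheckpointConfig
roughly as in the minimal sketch below, using the example values from
Stephan's reply (2-minute interval, 1.5-minute minimum pause) together with
the 2-minute timeout and single concurrent checkpoint that Steven describes;
it is an illustration of the knobs, not the exact job configuration.

import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointPauseSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(2 * 60 * 1000);        // checkpoint interval: 2 min

        CheckpointConfig cc = env.getCheckpointConfig();
        cc.setMinPauseBetweenCheckpoints(90 * 1000);   // min pause after completion/expiry: 1.5 min
        cc.setCheckpointTimeout(2 * 60 * 1000);        // expire checkpoints running longer than 2 min
        cc.setMaxConcurrentCheckpoints(1);             // no concurrent checkpoints
    }
}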


Re: Checkpoint expired before completing

2017-12-02 Thread Steven Wu
Stephan, thanks a lot for the explanation. Now everything makes sense to
me. Will set the min pause.

On Sat, Dec 2, 2017 at 8:58 AM, Stephan Ewen <se...@apache.org> wrote:

> Hi Steven!
>
> You are right, there could be some cascading effect from previous
> checkpoints.
> I think the best way to handle that is to set the "minimum pause between
> checkpoints". In fact, I would actually recommend this over the checkpoint
> interval parameter.
>
> The pause will allow the job to handle such effects that built up during
> an unhealthy checkpoint. You can for example set the checkpoint interval to
> 2 mins and set the pause to 1.5 mins. That way, if a checkpoint takes
> longer than usual, the next one will still wait for 1.5 mins after the
> previous one completed or expired, giving the job time to catch up.
>
> Best,
> Stephan
>
>
> On Fri, Dec 1, 2017 at 10:10 PM, Steven Wu <stevenz...@gmail.com> wrote:
>
>> Here is the checkpoint config. no concurrent checkpoints with 2 minute
>> checkpoint interval and timeout.
>>
>> Problem is gone after redeployment. I will try if I can reproduce the
>> issue
>>
>> [image: Inline image 1]
>>
>> On Fri, Dec 1, 2017 at 6:17 AM, Nico Kruber <n...@data-artisans.com>
>> wrote:
>>
>>> Hi Steven,
>>> by default, checkpoints time out after 10 minutes if you haven't used
>>> CheckpointConfig#setCheckpointTimeout() to change this timeout.
>>>
>>> Depending on your checkpoint interval, and your number of concurrent
>>> checkpoints, there may already be some other checkpoint processes
>>> running while you are waiting for the first to finish. In that case,
>>> succeeding checkpoints may also fail with a timeout. However, they
>>> should definitely get back to normal once your sink has caught up with
>>> all buffered events.
>>>
>>> I included Stefan who may shed some more light onto it, but maybe you
>>> can help us identifying the problem by providing logs at DEBUG level
>>> (did akka report any connection loss and gated actors? or maybe some
>>> other error in there?) or even a minimal program to reproduce.
>>>
>>>
>>> Nico
>>>
>>> On 01/12/17 07:36, Steven Wu wrote:
>>> >
>>> > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint
>>> > 9353 expired before completing
>>> >
>>> > I might know why this happened in the first place. Our sink operator
>>> > does synchronous HTTP post, which had a 15-mint latency spike when this
>>> > all started. This could block flink threads and prevent checkpoint from
>>> > completing in time. But I don't understand why checkpoint continued to
>>> > fail after HTTP post latency returned to normal. there seems to be some
>>> > lingering/cascading effect of previous failed checkpoints on future
>>> > checkpoints. Only after I redeploy/restart the job an hour later,
>>> > checkpoint starts to work again.
>>> >
>>> > Would appreciate any suggestions/insights!
>>> >
>>> > Thanks,
>>> > Steven
>>>
>>>
>>
>


Re: Checkpoint expired before completing

2017-12-01 Thread Steven Wu
Here is the checkpoint config: no concurrent checkpoints, with a 2-minute
checkpoint interval and timeout.

The problem is gone after redeployment. I will see if I can reproduce the issue

[image: Inline image 1]

On Fri, Dec 1, 2017 at 6:17 AM, Nico Kruber <n...@data-artisans.com> wrote:

> Hi Steven,
> by default, checkpoints time out after 10 minutes if you haven't used
> CheckpointConfig#setCheckpointTimeout() to change this timeout.
>
> Depending on your checkpoint interval, and your number of concurrent
> checkpoints, there may already be some other checkpoint processes
> running while you are waiting for the first to finish. In that case,
> succeeding checkpoints may also fail with a timeout. However, they
> should definitely get back to normal once your sink has caught up with
> all buffered events.
>
> I included Stefan who may shed some more light onto it, but maybe you
> can help us identifying the problem by providing logs at DEBUG level
> (did akka report any connection loss and gated actors? or maybe some
> other error in there?) or even a minimal program to reproduce.
>
>
> Nico
>
> On 01/12/17 07:36, Steven Wu wrote:
> >
> > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint
> > 9353 expired before completing
> >
> > I might know why this happened in the first place. Our sink operator
> > does synchronous HTTP post, which had a 15-mint latency spike when this
> > all started. This could block flink threads and prevent checkpoint from
> > completing in time. But I don't understand why checkpoint continued to
> > fail after HTTP post latency returned to normal. there seems to be some
> > lingering/cascading effect of previous failed checkpoints on future
> > checkpoints. Only after I redeploy/restart the job an hour later,
> > checkpoint starts to work again.
> >
> > Would appreciate any suggestions/insights!
> >
> > Thanks,
> > Steven
>
>


Checkpoint expired before completing

2017-11-30 Thread Steven Wu
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 9353
expired before completing

I might know why this happened in the first place. Our sink operator does a
synchronous HTTP post, which had a 15-minute latency spike when this all
started. This could block Flink threads and prevent checkpoints from
completing in time. But I don't understand why checkpoints continued to fail
after the HTTP post latency returned to normal. There seems to be some
lingering/cascading effect of previous failed checkpoints on future
checkpoints. Only after I redeployed/restarted the job an hour later did
checkpoints start to work again.

Would appreciate any suggestions/insights!

Thanks,
Steven


Re: akka timeout

2017-09-26 Thread Steven Wu
Till, sorry for the confusion. I meant the Flink documentation has the correct
info; our code was mistakenly referring to akka.ask.timeout for the death watch.

On Mon, Sep 25, 2017 at 3:52 PM, Till Rohrmann <trohrm...@apache.org> wrote:

> Quick question Steven. Where did you find the documentation concerning
> that the death watch interval is linke to the akka ask timeout? It was
> included in the past, but I couldn't find it anymore.
>
> Cheers,
> Till
>
> On Mon, Sep 25, 2017 at 9:47 AM, Till Rohrmann <trohrm...@apache.org>
> wrote:
>
>> Great to hear that you could figure things out Steven.
>>
>> You are right. The death watch is no longer linked to the akka ask
>> timeout, because of FLINK-6495. Thanks for the feedback. I will correct the
>> documentation.
>>
>> Cheers,
>> Till
>>
>> On Sat, Sep 23, 2017 at 10:24 AM, Steven Wu <stevenz...@gmail.com> wrote:
>>
>>> just to close the thread. akka death watch was triggered by high GC
>>> pause, which is caused by memory leak in our code during Flink job restart.
>>>
>>> noted that akka.ask.timeout wasn't related to akka death watch, which
>>> Flink has documented and linked.
>>>
>>> On Sat, Aug 26, 2017 at 10:58 AM, Steven Wu <stevenz...@gmail.com>
>>> wrote:
>>>
>>>> this is a stateless job. so we don't use RocksDB.
>>>>
>>>> yeah. network can also be a possibility. will keep it in the radar.
>>>> unfortunately, our metrics system don't have the tcp metrics when running
>>>> inside containers.
>>>>
>>>> On Fri, Aug 25, 2017 at 2:09 PM, Robert Metzger <rmetz...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> are you using the RocksDB state backend already?
>>>>> Maybe writing the state to disk would actually reduce the pressure on
>>>>> the GC (but of course it'll also reduce throughput a bit).
>>>>>
>>>>> Are there any known issues with the network? Maybe the network bursts
>>>>> on restart cause the timeouts?
>>>>>
>>>>>
>>>>> On Fri, Aug 25, 2017 at 6:17 PM, Steven Wu <stevenz...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Bowen,
>>>>>>
>>>>>> Heap size is ~50G. CPU was actually pretty low (like <20%) when high
>>>>>> GC pause and akka timeout was happening. So maybe memory allocation and 
>>>>>> GC
>>>>>> wasn't really an issue. I also recently learned that JVM can pause for
>>>>>> writing to GC log for disk I/O. that is another lead I am pursuing.
>>>>>>
>>>>>> Thanks,
>>>>>> Steven
>>>>>>
>>>>>> On Wed, Aug 23, 2017 at 10:58 AM, Bowen Li <bowen...@offerupnow.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Steven,
>>>>>>> Yes, GC is a big overhead, it may cause your CPU utilization to
>>>>>>> reach 100%, and every process stopped working. We ran into this a while 
>>>>>>> too.
>>>>>>>
>>>>>>> How much memory did you assign to TaskManager? How much the your
>>>>>>> CPU utilization when your taskmanager is considered 'killed'?
>>>>>>>
>>>>>>> Bowen
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Aug 23, 2017 at 10:01 AM, Steven Wu <stevenz...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Till,
>>>>>>>>
>>>>>>>> Once our job was restarted for some reason (e.g. taskmangaer
>>>>>>>> container got killed), it can stuck in continuous restart loop for 
>>>>>>>> hours.
>>>>>>>> Right now, I suspect it is caused by GC pause during restart, our job 
>>>>>>>> has
>>>>>>>> very high memory allocation in steady state. High GC pause then caused 
>>>>>>>> akka
>>>>>>>> timeout, which then caused jobmanager to think taksmanager containers 
>>>>>>>> are
>>>>>>>> unhealthy/dead and kill them. And the cycle repeats...
>>>>>>>>
>>>>>>>> But I hasn't been able to prove or disprove it yet. When I was

Re: akka timeout

2017-09-23 Thread Steven Wu
Just to close the thread: the akka death watch was triggered by a high GC
pause, which was caused by a memory leak in our code during Flink job restart.

Also noted that akka.ask.timeout wasn't related to the akka death watch, which
Flink has documented and linked.

On Sat, Aug 26, 2017 at 10:58 AM, Steven Wu <stevenz...@gmail.com> wrote:

> this is a stateless job. so we don't use RocksDB.
>
> yeah. network can also be a possibility. will keep it in the radar.
> unfortunately, our metrics system don't have the tcp metrics when running
> inside containers.
>
> On Fri, Aug 25, 2017 at 2:09 PM, Robert Metzger <rmetz...@apache.org>
> wrote:
>
>> Hi,
>> are you using the RocksDB state backend already?
>> Maybe writing the state to disk would actually reduce the pressure on the
>> GC (but of course it'll also reduce throughput a bit).
>>
>> Are there any known issues with the network? Maybe the network bursts on
>> restart cause the timeouts?
>>
>>
>> On Fri, Aug 25, 2017 at 6:17 PM, Steven Wu <stevenz...@gmail.com> wrote:
>>
>>> Bowen,
>>>
>>> Heap size is ~50G. CPU was actually pretty low (like <20%) when high GC
>>> pause and akka timeout was happening. So maybe memory allocation and GC
>>> wasn't really an issue. I also recently learned that JVM can pause for
>>> writing to GC log for disk I/O. that is another lead I am pursuing.
>>>
>>> Thanks,
>>> Steven
>>>
>>> On Wed, Aug 23, 2017 at 10:58 AM, Bowen Li <bowen...@offerupnow.com>
>>> wrote:
>>>
>>>> Hi Steven,
>>>> Yes, GC is a big overhead, it may cause your CPU utilization to
>>>> reach 100%, and every process stopped working. We ran into this a while 
>>>> too.
>>>>
>>>> How much memory did you assign to TaskManager? How much the your
>>>> CPU utilization when your taskmanager is considered 'killed'?
>>>>
>>>> Bowen
>>>>
>>>>
>>>>
>>>> On Wed, Aug 23, 2017 at 10:01 AM, Steven Wu <stevenz...@gmail.com>
>>>> wrote:
>>>>
>>>>> Till,
>>>>>
>>>>> Once our job was restarted for some reason (e.g. taskmangaer container
>>>>> got killed), it can stuck in continuous restart loop for hours. Right now,
>>>>> I suspect it is caused by GC pause during restart, our job has very high
>>>>> memory allocation in steady state. High GC pause then caused akka timeout,
>>>>> which then caused jobmanager to think taksmanager containers are
>>>>> unhealthy/dead and kill them. And the cycle repeats...
>>>>>
>>>>> But I hasn't been able to prove or disprove it yet. When I was asking
>>>>> the question, I was still sifting through metrics and error logs.
>>>>>
>>>>> Thanks,
>>>>> Steven
>>>>>
>>>>>
>>>>> On Tue, Aug 22, 2017 at 1:21 AM, Till Rohrmann <
>>>>> till.rohrm...@gmail.com> wrote:
>>>>>
>>>>>> Hi Steven,
>>>>>>
>>>>>> quick correction for Flink 1.2. Indeed the MetricFetcher does not
>>>>>> pick up the right timeout value from the configuration. Instead it uses a
>>>>>> hardcoded 10s timeout. This has only been changed recently and is already
>>>>>> committed in the master. So with the next release 1.4 it will properly 
>>>>>> pick
>>>>>> up the right timeout settings.
>>>>>>
>>>>>> Just out of curiosity, what's the instability issue you're observing?
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Fri, Aug 18, 2017 at 7:07 PM, Steven Wu <stevenz...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Till/Chesnay, thanks for the answers. Look like this is a
>>>>>>> result/symptom of underline stability issue that I am trying to track 
>>>>>>> down.
>>>>>>>
>>>>>>> It is Flink 1.2.
>>>>>>>
>>>>>>> On Fri, Aug 18, 2017 at 12:24 AM, Chesnay Schepler <
>>>>>>> ches...@apache.org> wrote:
>>>>>>>
>>>>>>>> The MetricFetcher always use the default akka timeout value.
>>>>>>>>
>>>>>>>>
>>>>>>>> On

Re: heap dump shows StoppableSourceStreamTask retained by java.lang.finalizer

2017-09-19 Thread Steven Wu
Stephan, agreed that it is not a real memory leak. I haven't found it
affecting the system, so it is just something odd for now.

But if it is not really necessary, why do we want to defer memory release
with unpredictable behavior? Can the StreamTask stop() method take care of
the cleanup work, so we don't need to rely on finalize() or a PhantomReference?

On Tue, Sep 19, 2017 at 2:56 AM, Stephan Ewen <se...@apache.org> wrote:

> Hi!
>
> From my understanding, overriding finalize() still has some use cases and
> is valid if done correctly, (although PhantomReference has more control
> over the cleanup process). finalize() is still used in JDK classes as well.
>
> Whenever one overrides finalize(), the object cannot be immediately
> garbage collected because the finalize() method may make it reachable
> again. It results in the following life cycle:
>
>   1) object becomes unreachable, is detected eligible for GC
>   2) In the GC cycle, object is NOT collected, but finalize() is called
>   3) If object is still not reachable, it will be collected in the
> subsequent GC cycle
>
> In essence, objects that override finalize() stay for one more GC cycle.
> That may be what you are seeing. It should not be a real memory leak, but
> deferred memory release.
>
> Is this a problem that is affecting the system, or only something that
> seems odd for now?
>
> If you are very concerned about this, would you be up to contribute a
> change that uses a PhantomReference and Reference Queue for cleanup instead?
>
> Stephan
>
>
> On Tue, Sep 19, 2017 at 12:56 AM, Till Rohrmann <trohrm...@apache.org>
> wrote:
>
>> Hi Steven,
>>
>> the finalize method in StreamTask acts as a safety net in case the
>> services of the StreamTask haven't been properly shut down. In the code,
>> however, it looks as if the TimerService, for example, is always being
>> stopped in the finally block of the invoke method. Thus, it might not be
>> necessary to have the finalize method as a safety net.
>>
>> How did you kill the TaskManagers? I assume you didn't kill the JVM
>> process because otherwise you wouldn't see the finalizer objects piling up.
>>
>> I think that you can create a JIRA issue for removing the finalizer
>> method.
>>
>> Cheers,
>> Till
>>
>>
>>
>> On Thu, Sep 14, 2017 at 12:26 PM, Fabian Hueske <fhue...@gmail.com>
>> wrote:
>>
>>> Hi Steven,
>>>
>>> thanks for reporting this issue.
>>> Looping in Till who's more familiar with the task lifecycles.
>>>
>>> Thanks, Fabian
>>>
>>> 2017-09-12 7:08 GMT+02:00 Steven Wu <stevenz...@gmail.com>:
>>>
>>>> Hi ,
>>>>
>>>> I was using Chaos Monkey to test Flink's behavior against frequent
>>>> killing of task manager nodes. I found that stopped/disposed StreamTask got
>>>> retained by java finalizer. It is kind like a memory leak. Since each
>>>> StreamTask retains 2.6 MB memory. With 20 kills (and job restarts) for
>>>> 8-CPU container, there are 2.6 * 20 * 8 MB retained in heap.
>>>>
>>>> [image: Inline image 1]
>>>>
>>>> finalize() is generally not recommended for cleanup, because "*Finalizers
>>>> are unpredictable, often dangerous, and generally unnecessary*",
>>>> quoted from Joshua Bloch's book.
>>>> http://www.informit.com/articles/article.aspx?p=1216151=7
>>>>
>>>> This code from StreamTask.java seems to be the cause. Is it necessary?
>>>> can it be removed? We are using flink-1.2 release branch. But I see the
>>>> same code in flink-1.3 and master branch
>>>>
>>>> /**
>>>>  * The finalize method shuts down the timer. This is a fail-safe shutdown,
>>>>  * in case the original shutdown method was never called.
>>>>  *
>>>>  * <p>
>>>>  * This should not be relied upon! It will cause shutdown to happen much
>>>>  * later than if manual shutdown is attempted, and cause threads to linger
>>>>  * for longer than needed.
>>>>  */
>>>> @Override
>>>> protected void finalize() throws Throwable {
>>>>     super.finalize();
>>>>     if (timerService != null) {
>>>>         if (!timerService.isTerminated()) {
>>>>             LOG.info("Timer service is shutting down.");
>>>>             timerService.shutdownService();
>>>>         }
>>>>     }
>>>>
>>>>     cancelables.close();
>>>> }
>>>>
>>>> Thanks,
>>>> Steven
>>>>
>>>
>>>
>>
>
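
To make the PhantomReference suggestion above concrete, here is a minimal,
self-contained sketch of cleanup via a PhantomReference and a ReferenceQueue.
The TimerService interface and the reaper thread are simplified stand-ins for
illustration and are not the actual StreamTask code; the point is that, unlike
finalize(), the referent can be reclaimed in the first GC cycle while only the
small reference object survives until the cleanup is polled.

import java.lang.ref.PhantomReference;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;

public class PhantomCleanupSketch {

    /** Hypothetical stand-in for a resource that must be shut down explicitly. */
    interface TimerService {
        void shutdownService();
    }

    /** Carries the resource to release; the referent itself cannot be resurrected. */
    static final class TaskCleanup extends PhantomReference<Object> {
        private final TimerService timerService;

        TaskCleanup(Object task, ReferenceQueue<Object> queue, TimerService timerService) {
            super(task, queue);
            this.timerService = timerService;
        }

        void cleanUp() {
            timerService.shutdownService();
        }
    }

    public static void main(String[] args) throws Exception {
        ReferenceQueue<Object> queue = new ReferenceQueue<>();
        TimerService timers = () -> System.out.println("timer service shut down");

        Object task = new Object();                    // stands in for a stream task
        TaskCleanup cleanup = new TaskCleanup(task, queue, timers);

        // Reaper thread: performs the cleanup once the task becomes unreachable.
        Thread reaper = new Thread(() -> {
            try {
                Reference<?> ref = queue.remove();     // blocks until the reference is enqueued
                ((TaskCleanup) ref).cleanUp();
            } catch (InterruptedException ignored) {
            }
        });
        reaper.setDaemon(true);
        reaper.start();

        task = null;                                   // drop the last strong reference
        System.gc();                                   // hint only; enqueueing is not guaranteed immediately
        Thread.sleep(1000);

        // keep 'cleanup' strongly reachable so the reference object is not collected early
        System.out.println("done, cleanup registered: " + (cleanup != null));
    }
}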


heap dump shows StoppableSourceStreamTask retained by java.lang.finalizer

2017-09-11 Thread Steven Wu
Hi ,

I was using Chaos Monkey to test Flink's behavior against frequent killing of
task manager nodes. I found that stopped/disposed StreamTasks got retained by
the Java finalizer. It is kind of like a memory leak: since each StreamTask
retains 2.6 MB of memory, with 20 kills (and job restarts) on an 8-CPU
container there are 2.6 * 20 * 8 = 416 MB retained in the heap.

[image: Inline image 1]

finalize() is generally not recommended for cleanup, because "*Finalizers
are unpredictable, often dangerous, and generally unnecessary*", quoted
from Joshua Bloch's book.
http://www.informit.com/articles/article.aspx?p=1216151=7

This code from StreamTask.java seems to be the cause. Is it necessary? can
it be removed? We are using flink-1.2 release branch. But I see the same
code in flink-1.3 and master branch

/**
 * The finalize method shuts down the timer. This is a fail-safe shutdown,
 * in case the original shutdown method was never called.
 *
 * <p>
 * This should not be relied upon! It will cause shutdown to happen much
 * later than if manual shutdown is attempted, and cause threads to linger
 * for longer than needed.
 */
@Override
protected void finalize() throws Throwable {
    super.finalize();
    if (timerService != null) {
        if (!timerService.isTerminated()) {
            LOG.info("Timer service is shutting down.");
            timerService.shutdownService();
        }
    }

    cancelables.close();
}

Thanks,
Steven


Re: akka timeout

2017-08-25 Thread Steven Wu
This is a stateless job, so we don't use RocksDB.

Yeah, the network can also be a possibility; I will keep it on the radar.
Unfortunately, our metrics system doesn't have the TCP metrics when running
inside containers.

On Fri, Aug 25, 2017 at 2:09 PM, Robert Metzger <rmetz...@apache.org> wrote:

> Hi,
> are you using the RocksDB state backend already?
> Maybe writing the state to disk would actually reduce the pressure on the
> GC (but of course it'll also reduce throughput a bit).
>
> Are there any known issues with the network? Maybe the network bursts on
> restart cause the timeouts?
>
>
> On Fri, Aug 25, 2017 at 6:17 PM, Steven Wu <stevenz...@gmail.com> wrote:
>
>> Bowen,
>>
>> Heap size is ~50G. CPU was actually pretty low (like <20%) when high GC
>> pause and akka timeout was happening. So maybe memory allocation and GC
>> wasn't really an issue. I also recently learned that JVM can pause for
>> writing to GC log for disk I/O. that is another lead I am pursuing.
>>
>> Thanks,
>> Steven
>>
>> On Wed, Aug 23, 2017 at 10:58 AM, Bowen Li <bowen...@offerupnow.com>
>> wrote:
>>
>>> Hi Steven,
>>> Yes, GC is a big overhead, it may cause your CPU utilization to
>>> reach 100%, and every process stopped working. We ran into this a while too.
>>>
>>> How much memory did you assign to TaskManager? How much the your CPU
>>> utilization when your taskmanager is considered 'killed'?
>>>
>>> Bowen
>>>
>>>
>>>
>>> On Wed, Aug 23, 2017 at 10:01 AM, Steven Wu <stevenz...@gmail.com>
>>> wrote:
>>>
>>>> Till,
>>>>
>>>> Once our job was restarted for some reason (e.g. taskmangaer container
>>>> got killed), it can stuck in continuous restart loop for hours. Right now,
>>>> I suspect it is caused by GC pause during restart, our job has very high
>>>> memory allocation in steady state. High GC pause then caused akka timeout,
>>>> which then caused jobmanager to think taksmanager containers are
>>>> unhealthy/dead and kill them. And the cycle repeats...
>>>>
>>>> But I hasn't been able to prove or disprove it yet. When I was asking
>>>> the question, I was still sifting through metrics and error logs.
>>>>
>>>> Thanks,
>>>> Steven
>>>>
>>>>
>>>> On Tue, Aug 22, 2017 at 1:21 AM, Till Rohrmann <till.rohrm...@gmail.com
>>>> > wrote:
>>>>
>>>>> Hi Steven,
>>>>>
>>>>> quick correction for Flink 1.2. Indeed the MetricFetcher does not pick
>>>>> up the right timeout value from the configuration. Instead it uses a
>>>>> hardcoded 10s timeout. This has only been changed recently and is already
>>>>> committed in the master. So with the next release 1.4 it will properly 
>>>>> pick
>>>>> up the right timeout settings.
>>>>>
>>>>> Just out of curiosity, what's the instability issue you're observing?
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Fri, Aug 18, 2017 at 7:07 PM, Steven Wu <stevenz...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Till/Chesnay, thanks for the answers. Look like this is a
>>>>>> result/symptom of underline stability issue that I am trying to track 
>>>>>> down.
>>>>>>
>>>>>> It is Flink 1.2.
>>>>>>
>>>>>> On Fri, Aug 18, 2017 at 12:24 AM, Chesnay Schepler <
>>>>>> ches...@apache.org> wrote:
>>>>>>
>>>>>>> The MetricFetcher always use the default akka timeout value.
>>>>>>>
>>>>>>>
>>>>>>> On 18.08.2017 09:07, Till Rohrmann wrote:
>>>>>>>
>>>>>>> Hi Steven,
>>>>>>>
>>>>>>> I thought that the MetricFetcher picks up the right timeout from the
>>>>>>> configuration. Which version of Flink are you using?
>>>>>>>
>>>>>>> The timeout is not a critical problem for the job health.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Till
>>>>>>>
>>>>>>> On Fri, Aug 18, 2017 at 7:22 AM, Steven Wu <stevenz...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>

Re: akka timeout

2017-08-25 Thread Steven Wu
Bowen,

Heap size is ~50G. CPU was actually pretty low (like <20%) when the high GC
pauses and akka timeouts were happening, so maybe memory allocation and GC
weren't really the issue. I also recently learned that the JVM can pause
while writing to the GC log because of disk I/O; that is another lead I am
pursuing.

Thanks,
Steven

On Wed, Aug 23, 2017 at 10:58 AM, Bowen Li <bowen...@offerupnow.com> wrote:

> Hi Steven,
> Yes, GC is a big overhead, it may cause your CPU utilization to reach
> 100%, and every process stopped working. We ran into this a while too.
>
> How much memory did you assign to TaskManager? How much the your CPU
> utilization when your taskmanager is considered 'killed'?
>
> Bowen
>
>
>
> On Wed, Aug 23, 2017 at 10:01 AM, Steven Wu <stevenz...@gmail.com> wrote:
>
>> Till,
>>
>> Once our job was restarted for some reason (e.g. taskmangaer container
>> got killed), it can stuck in continuous restart loop for hours. Right now,
>> I suspect it is caused by GC pause during restart, our job has very high
>> memory allocation in steady state. High GC pause then caused akka timeout,
>> which then caused jobmanager to think taksmanager containers are
>> unhealthy/dead and kill them. And the cycle repeats...
>>
>> But I hasn't been able to prove or disprove it yet. When I was asking the
>> question, I was still sifting through metrics and error logs.
>>
>> Thanks,
>> Steven
>>
>>
>> On Tue, Aug 22, 2017 at 1:21 AM, Till Rohrmann <till.rohrm...@gmail.com>
>> wrote:
>>
>>> Hi Steven,
>>>
>>> quick correction for Flink 1.2. Indeed the MetricFetcher does not pick
>>> up the right timeout value from the configuration. Instead it uses a
>>> hardcoded 10s timeout. This has only been changed recently and is already
>>> committed in the master. So with the next release 1.4 it will properly pick
>>> up the right timeout settings.
>>>
>>> Just out of curiosity, what's the instability issue you're observing?
>>>
>>> Cheers,
>>> Till
>>>
>>> On Fri, Aug 18, 2017 at 7:07 PM, Steven Wu <stevenz...@gmail.com> wrote:
>>>
>>>> Till/Chesnay, thanks for the answers. Look like this is a
>>>> result/symptom of underline stability issue that I am trying to track down.
>>>>
>>>> It is Flink 1.2.
>>>>
>>>> On Fri, Aug 18, 2017 at 12:24 AM, Chesnay Schepler <ches...@apache.org>
>>>> wrote:
>>>>
>>>>> The MetricFetcher always use the default akka timeout value.
>>>>>
>>>>>
>>>>> On 18.08.2017 09:07, Till Rohrmann wrote:
>>>>>
>>>>> Hi Steven,
>>>>>
>>>>> I thought that the MetricFetcher picks up the right timeout from the
>>>>> configuration. Which version of Flink are you using?
>>>>>
>>>>> The timeout is not a critical problem for the job health.
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Fri, Aug 18, 2017 at 7:22 AM, Steven Wu <stevenz...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>>
>>>>>> We have set akka.ask.timeout to 60 s in yaml file. I also confirmed
>>>>>> the setting in Flink UI. But I saw akka timeout of 10 s for metric query
>>>>>> service. two questions
>>>>>> 1) why doesn't metric query use the 60 s value configured in yaml
>>>>>> file? does it always use default 10 s value?
>>>>>> 2) could this cause heartbeat failure between task manager and job
>>>>>> manager? or is this jut non-critical failure that won't affect job 
>>>>>> health?
>>>>>>
>>>>>> Thanks,
>>>>>> Steven
>>>>>>
>>>>>> 2017-08-17 23:34:33,421 WARN 
>>>>>> org.apache.flink.runtime.webmonitor.metrics.MetricFetcher
>>>>>> - Fetching metrics failed. akka.pattern.AskTimeoutException: Ask
>>>>>> timed out on [Actor[akka.tcp://flink@1.2.3.4
>>>>>> :39139/user/MetricQueryService_23cd9db754bb7d123d80e6b1c0be21d6]]
>>>>>> after [1 ms] at akka.pattern.PromiseActorRef$$
>>>>>> anonfun$1.apply$mcV$sp(AskSupport.scala:334) at
>>>>>> akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117) at
>>>>>> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
>>>>>> at 
>>>>>> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
>>>>>> at 
>>>>>> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
>>>>>> at 
>>>>>> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
>>>>>> at 
>>>>>> akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
>>>>>> at 
>>>>>> akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
>>>>>> at 
>>>>>> akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
>>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>


Re: akka timeout

2017-08-23 Thread Steven Wu
Till,

Once our job was restarted for some reason (e.g. a taskmanager container got
killed), it could get stuck in a continuous restart loop for hours. Right now,
I suspect it is caused by GC pauses during restart; our job has very high
memory allocation in steady state. High GC pauses then caused akka timeouts,
which then caused the jobmanager to think the taskmanager containers were
unhealthy/dead and kill them. And the cycle repeats...

But I haven't been able to prove or disprove it yet. When I was asking the
question, I was still sifting through metrics and error logs.

Thanks,
Steven


On Tue, Aug 22, 2017 at 1:21 AM, Till Rohrmann <till.rohrm...@gmail.com>
wrote:

> Hi Steven,
>
> quick correction for Flink 1.2. Indeed the MetricFetcher does not pick up
> the right timeout value from the configuration. Instead it uses a hardcoded
> 10s timeout. This has only been changed recently and is already committed
> in the master. So with the next release 1.4 it will properly pick up the
> right timeout settings.
>
> Just out of curiosity, what's the instability issue you're observing?
>
> Cheers,
> Till
>
> On Fri, Aug 18, 2017 at 7:07 PM, Steven Wu <stevenz...@gmail.com> wrote:
>
>> Till/Chesnay, thanks for the answers. Look like this is a result/symptom
>> of underline stability issue that I am trying to track down.
>>
>> It is Flink 1.2.
>>
>> On Fri, Aug 18, 2017 at 12:24 AM, Chesnay Schepler <ches...@apache.org>
>> wrote:
>>
>>> The MetricFetcher always use the default akka timeout value.
>>>
>>>
>>> On 18.08.2017 09:07, Till Rohrmann wrote:
>>>
>>> Hi Steven,
>>>
>>> I thought that the MetricFetcher picks up the right timeout from the
>>> configuration. Which version of Flink are you using?
>>>
>>> The timeout is not a critical problem for the job health.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Fri, Aug 18, 2017 at 7:22 AM, Steven Wu <stevenz...@gmail.com> wrote:
>>>
>>>>
>>>> We have set akka.ask.timeout to 60 s in yaml file. I also confirmed the
>>>> setting in Flink UI. But I saw akka timeout of 10 s for metric query
>>>> service. two questions
>>>> 1) why doesn't metric query use the 60 s value configured in yaml file?
>>>> does it always use default 10 s value?
>>>> 2) could this cause heartbeat failure between task manager and job
>>>> manager? or is this jut non-critical failure that won't affect job health?
>>>>
>>>> Thanks,
>>>> Steven
>>>>
>>>> 2017-08-17 23:34:33,421 WARN 
>>>> org.apache.flink.runtime.webmonitor.metrics.MetricFetcher
>>>> - Fetching metrics failed. akka.pattern.AskTimeoutException: Ask timed
>>>> out on [Actor[akka.tcp://flink@1.2.3.4:39139/user/MetricQueryServic
>>>> e_23cd9db754bb7d123d80e6b1c0be21d6]] after [1 ms] at
>>>> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
>>>> at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117) at
>>>> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
>>>> at 
>>>> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
>>>> at 
>>>> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
>>>> at 
>>>> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
>>>> at 
>>>> akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
>>>> at 
>>>> akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
>>>> at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
>>>> at java.lang.Thread.run(Thread.java:748)
>>>>
>>>
>>>
>>>
>>
>


Re: akka timeout

2017-08-18 Thread Steven Wu
Till/Chesnay, thanks for the answers. Looks like this is a result/symptom of
an underlying stability issue that I am trying to track down.

It is Flink 1.2.

On Fri, Aug 18, 2017 at 12:24 AM, Chesnay Schepler <ches...@apache.org>
wrote:

> The MetricFetcher always use the default akka timeout value.
>
>
> On 18.08.2017 09:07, Till Rohrmann wrote:
>
> Hi Steven,
>
> I thought that the MetricFetcher picks up the right timeout from the
> configuration. Which version of Flink are you using?
>
> The timeout is not a critical problem for the job health.
>
> Cheers,
> Till
>
> On Fri, Aug 18, 2017 at 7:22 AM, Steven Wu <stevenz...@gmail.com> wrote:
>
>>
>> We have set akka.ask.timeout to 60 s in yaml file. I also confirmed the
>> setting in Flink UI. But I saw akka timeout of 10 s for metric query
>> service. two questions
>> 1) why doesn't metric query use the 60 s value configured in yaml file?
>> does it always use default 10 s value?
>> 2) could this cause heartbeat failure between task manager and job
>> manager? or is this jut non-critical failure that won't affect job health?
>>
>> Thanks,
>> Steven
>>
>> 2017-08-17 23:34:33,421 WARN 
>> org.apache.flink.runtime.webmonitor.metrics.MetricFetcher
>> - Fetching metrics failed. akka.pattern.AskTimeoutException: Ask timed
>> out on [Actor[akka.tcp://flink@1.2.3.4:39139/user/MetricQueryServic
>> e_23cd9db754bb7d123d80e6b1c0be21d6]] after [1 ms] at
>> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
>> at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117) at
>> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
>> at 
>> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
>> at 
>> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
>> at akka.actor.LightArrayRevolverScheduler$TaskHolder.
>> executeTask(Scheduler.scala:474) at akka.actor.LightArrayRevolverS
>> cheduler$$anon$8.executeBucket$1(Scheduler.scala:425) at
>> akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
>> at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
>> at java.lang.Thread.run(Thread.java:748)
>>
>
>
>


akka timeout with metric fetcher

2017-08-17 Thread Steven Wu
We have set akka.ask.timeout to 60 s in the yaml file. I also confirmed the
setting in the Flink UI. But I saw an akka timeout of 10 s for the metric
query service. Two questions:
1) Why doesn't the metric query use the 60 s value configured in the yaml
file? Does it always use the default 10 s value?
2) Could this cause heartbeat failures between the task manager and the job
manager? Or is this just a non-critical failure that won't affect job health?

Thanks,
Steven

2017-08-17 23:34:33,421 WARN org.apache.flink.runtime.webmonitor.metrics.MetricFetcher - Fetching metrics failed.
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@1.2.3.4:39139/user/MetricQueryService_23cd9db754bb7d123d80e6b1c0be21d6]] after [1 ms]
  at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
  at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
  at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
  at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
  at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
  at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
  at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
  at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
  at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
  at java.lang.Thread.run(Thread.java:748)


akka timeout

2017-08-17 Thread Steven Wu
We have set akka.ask.timeout to 60 s in the yaml file. I also confirmed the
setting in the Flink UI. But I saw an akka timeout of 10 s for the metric
query service. Two questions:
1) Why doesn't the metric query use the 60 s value configured in the yaml
file? Does it always use the default 10 s value?
2) Could this cause heartbeat failures between the task manager and the job
manager? Or is this just a non-critical failure that won't affect job health?

Thanks,
Steven

2017-08-17 23:34:33,421 WARN org.apache.flink.runtime.webmonitor.metrics.MetricFetcher - Fetching metrics failed.
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@1.2.3.4:39139/user/MetricQueryService_23cd9db754bb7d123d80e6b1c0be21d6]] after [1 ms]
  at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
  at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
  at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
  at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
  at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
  at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
  at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
  at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
  at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
  at java.lang.Thread.run(Thread.java:748)
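
For reference, the setting being asked about corresponds to this
flink-conf.yaml entry: akka.ask.timeout: 60 s (the value must include a time
unit). A minimal sketch of setting the same key programmatically is below;
note that, per Till's replies above, the MetricFetcher in Flink 1.2/1.3
ignores this setting and uses a hardcoded 10 s ask timeout, and only picks up
the configured value from release 1.4 on.

import org.apache.flink.configuration.Configuration;

public class AkkaAskTimeoutSketch {
    public static void main(String[] args) {
        // Programmatic equivalent of the flink-conf.yaml line discussed above.
        Configuration conf = new Configuration();
        conf.setString("akka.ask.timeout", "60 s");

        // Falls back to the 10 s default if the key is not set.
        System.out.println(conf.getString("akka.ask.timeout", "10 s"));
    }
}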