Restore from savepoint with Iterations

2020-05-04 Thread Ashish Pokharel
Hi all,

Hope everyone is doing well!

I am running into what seems like a deadlock (application stalled) situation 
with a Flink streaming job upon restore from savepoint. The job has a slowly 
moving stream (S1) that needs to be "stateful" and a continuous stream (S2) 
that is "joined" with the slow-moving stream (S1). Some level of 
loss/repetition is acceptable in the continuous stream (S2), so it can rely on 
something like Kafka consumer state upon restarts etc. The continuous stream 
(S2), however, needs to be iterated through the states from the slowly moving 
stream (S1) a few times (mostly 2). States are fair-sized (ending up around 
15GB on HDFS). When the job is restarted with no continuous data (S2) on the 
topic, the job starts up, restores states and does its initial checkpoint 
within 3 minutes. However, when the app is started from a savepoint and the 
continuous stream (S2) is actually present in Kafka, the application seems to 
come to a halt. Looking at the progress of checkpoints, every attempt appears 
to be stuck until timeouts happen at around 10 minutes. If the iteration on the 
stream is removed, the app can successfully start and checkpoint even when the 
continuous stream (S2) is flowing in as well. Unfortunately, we are working in 
a hosted environment for both data and platform, hence debugging with thread 
dumps etc. will be challenging.

I couldn't find a known issue on this but was wondering if anyone has seen such 
behavior or knows of any issues in the past. It does look like checkpointing 
has to be set to forced to get an iterative job to checkpoint in the first 
place (an option that is already marked deprecated - we are working on version 
1.8.2 as of now). I do understand the challenges around consistent 
checkpointing of an iterative stream. As I mentioned earlier, what I really 
want to maintain for the most part are the states of the slowly moving 
dimensions. Iterations do solve the problem at hand (multiple loops of logic) 
pretty gracefully, but not being able to restore from a savepoint will be a 
showstopper.
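For reference, the forced-checkpointing piece boils down to something like the 
sketch below - a minimal sketch on Flink 1.8.x with a placeholder interval, not 
our exact job:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Minimal sketch: checkpointing cannot be enabled for a job containing an
// iteration unless the (deprecated) force flag is set.
public class ForcedCheckpointSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // placeholder interval
        env.getCheckpointConfig().setForceCheckpointing(true); // deprecated, but required for iterations
        // ... sources, the iteration, and sinks go here ...
        env.execute("forced-checkpoint-sketch");
    }
}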

Will appreciate any pointer / suggestions.

Thanks in advance, 

Ashish

Re: Restore from savepoint with Iterations

2020-05-04 Thread Ashish Pokharel
Could this be FLIP-15 related as well then?

> On May 4, 2020, at 9:41 PM, Ashish Pokharel  wrote:
> 
> Hi all,
> 
> Hope everyone is doing well!
> 
> I am running into what seems like a deadlock (application stalled) situation 
> with a Flink streaming job upon restore from savepoint. Job has a slowly 
> moving stream (S1) that needs to be “stateful” and a continuous stream (S2) 
> which is “joined” with slow moving stream (S1). Some level of loss/repetition 
> is acceptable in continuous stream (S2) and hence can rely on something like 
> Kafka consumer states upon restarts etc. Continuous stream (S2) however needs 
> to be iterated through states from slowly moving streams (S1) a few times 
> (mostly 2). States are fair sized (ends up being 15GB on HDFS). When job is 
> restarted with no continuous data (S2) on topic job starts up, restores 
> states and does it’s initial checkpoint within 3 minutes. However, when app 
> is started from savepoint and continuous stream (S2) is actually present in 
> Kafka it seems like application comes to a halt. Looking at progress of 
> checkpoints, it seems like every attempt is stuck after until some timeouts 
> happen at around 10 mins. If iteration on stream is removed app can 
> successfully start and checkpoint even when continuous stream (S2) is flowing 
> in as well. Unfortunately we are working on a hosted environment for both 
> data and platform, hence debugging with thread dumps etc will be challenging. 
> 
> I couldn’t find a known issue on this but was wondering if anyone has seen 
> such behavior or know of any issues in past. It does look like checkpointing 
> has to be set to forced to get an iterative job to checkpoint in the first 
> place (an option that is marked deprecated already - working on 1.8.2 version 
> as of now). I do understand challenges around consistent checkpointing of 
> iterative stream. As I mentioned earlier, what I really want to maintain for 
> the most part are states of slowly moving dimensions. Iterations does solve 
> the problem at hand (multiple loops of logic) pretty gracefully but not being 
> able to restore from savepoint will be a show stopper. 
> 
> Will appreciate any pointer / suggestions.
> 
> Thanks in advance, 
> 
> Ashish



Re: Restore from savepoint with Iterations

2020-05-04 Thread Ashish Pokharel
Hi Ken,

Thanks for the quick response!

I came across FLIP-15 on my next Google search right after I sent the email :) 
It DEFINITELY looks that way. Watching the logs and the way the job gets stuck, 
it does look like the buffer is blocked. But FLIP-15 has not moved further 
though. So there are no workarounds at all at this point? Perhaps a technique 
to block the Kafka consumer for some time? Even that may get me going, but it 
looks like there is a probability of this happening during normal processing as 
your use case demonstrates. I am using iteration with no timeouts for the prod 
job, and timeouts only in unit testing. The theory was that in prod the input 
stream will be indefinite, and a long lull of no events might sometimes happen 
during maintenance, backlog, etc. I really would like to avoid bloating the DAG 
by repeating the same functions with filters and side outputs. Other than the 
obvious repetition, it would increase the size of state by a factor. Even those 
slowly moving dimensions are not light (around half a billion every day) :) 
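For what it's worth, the iteration wiring in question is shaped roughly like 
the sketch below - a simplified stand-in with a toy element type and loop 
condition, not the production topology; the timeout argument is what we only 
use in unit tests, prod calls iterate() with no timeout:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Simplified sketch of a DataStream iteration that feeds elements back a few times.
public class IterationSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Long> input = env.fromElements(1L, 2L, 3L);

        // Prod uses iterate() with no timeout; tests pass a timeout so the loop can terminate.
        IterativeStream<Long> loop = input.iterate(5_000);
        DataStream<Long> stepped = loop.map(v -> v + 1).returns(Types.LONG);

        // Elements that still need another pass are fed back; the rest leave the loop.
        loop.closeWith(stepped.filter(v -> v < 10));
        stepped.filter(v -> v >= 10).print();

        env.execute("iteration-sketch");
    }
}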

> On May 4, 2020, at 10:13 PM, Ken Krugler  wrote:
> 
> Hi Ashish,
> 
> Wondering if you’re running into the gridlock problem I mention on slide #25 
> here: 
> https://www.slideshare.net/FlinkForward/flink-forward-san-francisco-2018-ken-krugler-building-a-scalable-focused-web-crawler-with-flink
>  
> <https://www.slideshare.net/FlinkForward/flink-forward-san-francisco-2018-ken-krugler-building-a-scalable-focused-web-crawler-with-flink>
> 
> If the iteration path has too much data in it, then the network buffer at the 
> head of the iteration can fill up, and it never clears out because the 
> operator consuming those buffers is blocked writing to the next operator in 
> the iteration, and so on back to the head.
> 
> We ran into this when outlinks from web pages caused fan-out/amplification of 
> the data being iterated, but maybe you hit it with restoring from state.
> 
> — Ken
> 
> 
>> On May 4, 2020, at 6:41 PM, Ashish Pokharel > <mailto:ashish...@yahoo.com>> wrote:
>> 
>> Hi all,
>> 
>> Hope everyone is doing well!
>> 
>> I am running into what seems like a deadlock (application stalled) situation 
>> with a Flink streaming job upon restore from savepoint. Job has a slowly 
>> moving stream (S1) that needs to be “stateful” and a continuous stream (S2) 
>> which is “joined” with slow moving stream (S1). Some level of 
>> loss/repetition is acceptable in continuous stream (S2) and hence can rely 
>> on something like Kafka consumer states upon restarts etc. Continuous stream 
>> (S2) however needs to be iterated through states from slowly moving streams 
>> (S1) a few times (mostly 2). States are fair sized (ends up being 15GB on 
>> HDFS). When job is restarted with no continuous data (S2) on topic job 
>> starts up, restores states and does it’s initial checkpoint within 3 
>> minutes. However, when app is started from savepoint and continuous stream 
>> (S2) is actually present in Kafka it seems like application comes to a halt. 
>> Looking at progress of checkpoints, it seems like every attempt is stuck 
>> after until some timeouts happen at around 10 mins. If iteration on stream 
>> is removed app can successfully start and checkpoint even when continuous 
>> stream (S2) is flowing in as well. Unfortunately we are working on a hosted 
>> environment for both data and platform, hence debugging with thread dumps 
>> etc will be challenging. 
>> 
>> I couldn’t find a known issue on this but was wondering if anyone has seen 
>> such behavior or know of any issues in past. It does look like checkpointing 
>> has to be set to forced to get an iterative job to checkpoint in the first 
>> place (an option that is marked deprecated already - working on 1.8.2 
>> version as of now). I do understand challenges around consistent 
>> checkpointing of iterative stream. As I mentioned earlier, what I really 
>> want to maintain for the most part are states of slowly moving dimensions. 
>> Iterations does solve the problem at hand (multiple loops of logic) pretty 
>> gracefully but not being able to restore from savepoint will be a show 
>> stopper. 
>> 
>> Will appreciate any pointer / suggestions.
>> 
>> Thanks in advance, 
>> 
>> Ashish
> 
> --
> Ken Krugler
> http://www.scaleunlimited.com <http://www.scaleunlimited.com/>
> custom big data solutions & training
> Hadoop, Cascading, Cassandra & Solr
> 



Re: IoT Use Case, Problem and Thoughts

2018-06-13 Thread Ashish Pokharel
Hi Fabian,

Thanks for the prompt response and apologies for delayed response. 

You wrapped up the bottom lines pretty well - if I were to wrap it up I'd say 
"best possible" recovery on "known" restarts, whether a manual cancel + start 
or framework-initiated ones like on operator failures, with these constraints:
 - some data loss is ok
 - avoid periodic checkpoints, as states are really transient (less than 5 
seconds of lifetime, if not milliseconds) and almost all events make it to 
state. I do understand that checkpointing performance has been drastically 
improved and, with the async and RocksDB options, it should technically not add 
latency to the application. However, I feel like even with those improvements 
and local checkpointing (which we already are doing) it is a lot of "unused" 
IOPS/resource utilization, especially if we start to spin up more apps handling 
similar data sources with similar requirements. At first blush it feels like 
those resources are better utilized in the cluster for apps with stricter SLAs 
for data loss and recovery instead.

Basically, I suppose I am thinking of a checkpointing feature that is initiated 
by certain actions/events rather than periodic ones. Let me know if I am 
off-base here and I should just enable checkpointing in all of these apps and 
move on :) 

I tried a savepoint again, and it looks like the issue is caused by the fact 
that the in-memory states are large - it is throwing an error that states are 
larger than a certain size. So the solution to (1) will possibly solve (2) as 
well. 
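As an aside, the state-backend change you are hinting at would look roughly 
like this - a minimal sketch with a placeholder HDFS path, assuming 
FsStateBackend so savepoint data no longer lands in the JobManager heap:

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch: keep working state on the TaskManager heap but write checkpoint/savepoint
// data to a (distributed) filesystem instead of the JobManager heap.
public class FsBackendSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStateBackend(new FsStateBackend("hdfs:///flink/state")); // placeholder path
        // ... job graph ...
        env.execute("fs-backend-sketch");
    }
}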

Thanks again,

Ashish


> On Jun 7, 2018, at 4:25 PM, Fabian Hueske  wrote:
> 
> Hi Ashish,
> 
> Thanks for the great write up. 
> If I understood you correctly, there are two different issues that are caused 
> by the disabled checkpointing.
> 
> 1) Recovery from a failure without restarting all operators to preserve the 
> state in the running tasks
> 2) Planned restarts an application without losing all state (even with 
> disabled checkpointing).
> 
> Ad 1) The community is constantly working on reducing the time for 
> checkpointing and recovery. 
> For 1.5, local task recovery was added, which basically stores a state copy 
> on the local disk which is read in case of a recovery. So, tasks are 
> restarted but don't read the to restore state from distributed storage but 
> from the local disk.
> AFAIK, this can only be used together with remote checkpoints. I think this 
> might be an interesting option for you if it would be possible to write 
> checkpoints only to local disk and not remote storage. AFAIK, there are also 
> other efforts to reduce the number of restarted tasks in case of a failure. I 
> guess, you've played with other features such as RocksDBStateBackend, 
> incremental and async checkpoints already. 
> 
> Ad 2) It sounds as if savepoints are exactly the feature your are looking 
> for. It would be good to know what exactly did not work for you. The 
> MemoryStateBackend is not suitable for large state sizes because it backups 
> into the heap memory of the JobManager. 
> 
> Best, Fabian
> 
> 2018-06-05 21:57 GMT+02:00 ashish pok  >:
> Fabian, Stephan, All,
> 
> I started a discussion a while back around having a form of event-based 
> checkpointing policy that will help us in some of our high volume data 
> pipelines. Here is an effort to put this in front of community and understand 
> what capabilities can support these type of use cases, how much others feel 
> the same need and potentially a feature that can make it to a user story.
> 
> Use Case Summary:
> - Extremely high volume of data (events from consumer devices with customer 
> base of over 100M)
> - Multiple events need to be combined using a windowing streaming app grouped 
> by keys (something like 5 min floor of timestamp and unique identifiers for 
> customer devices)
> - "Most" events by a group/key arrive in few seconds if not milliseconds 
> however events can sometimes delay or get lost in transport (so delayed event 
> handling and timeouts will be needed)
> - Extremely low (pretty vague but hopefully details below clarify it more) 
> data loss is acceptable
> - Because of the volume and transient nature of source, checkpointing is 
> turned off (saves on writes to persistence as states/sessions are active for 
> only few seconds during processing)
> 
> Problem Summary:
> Of course, none of the above is out of the norm for Flink and as a matter of 
> factor we already have a Flink app doing this. The issue arises when it comes 
> to graceful shutdowns and on operator failures (eg: Kafka timeouts etc.) On 
> operator failures, entire job graph restarts which essentially flushes out 
> in-memory states/sessions. I think there is a feature in works (not sure if 
> it made it to 1.5) to perform selective restarts which will control the 
> damage but still will result in data loss. Also, it doesn't help when 
> application restarts are needed. We did try going savepoint route for 

Re: IoT Use Case, Problem and Thoughts

2018-06-15 Thread Ashish Pokharel
> Fabian Hueske  wrote:
> Hi Ashish,
> 
> (I think) I understand your requirements and the approach of just keep 
> non-failing tasks running is intuitively a good idea.
> However, this is only an option for use cases that are OK with at-least-once 
> semantics (otherwise, we'd need to reset the state of the still running tasks 
> and hence take checkpoints). 
> Moreover, the distributed task coordination for keeping some tasks running, 
> restarting others, and connecting them is obviously more difficult than 
> "just" canceling the whole job and starting it again.
> 
> I have to admit that I'm not that familiar with Flink's distributed task 
> coordination. Till in CC knows much more about that.
> However, I think the question here is, how many use cases would benefit from 
> a recovery mode with at-most-once state guarantees and how much 
> implementation effort would it be to support it.
> 
> Regarding the savepoints, if you are using the MemoryStateBackend failure at 
> too large state size is expected since all state is replicated into the 
> JobManager JVM. 
> Did you try to use the FsStateBackend? It also holds the state on the 
> TaskManager heap but backups it to a (distributed) filesystem.
> 
> Best, Fabian
> 
> 2018-06-14 4:18 GMT+02:00 Ashish Pokharel  <mailto:ashish...@yahoo.com>>:
> Hi Fabian,
> 
> Thanks for the prompt response and apologies for delayed response. 
> 
> You wrapped up the bottom lines pretty well - if I were to wrap it up I’d say 
> “best possible” recovery on “known" restarts either say manual cancel + start 
> OR framework initiated ones like on operator failures with these constraints 
>  - some data loss is ok
>  - avoid periodic checkpoints as states are really transient (less than 5 
> seconds of lifetime if not milliseconds) and almost all events make it to 
> state. I do understand that checkpointing performance has drastically been 
> improved and with async and RocksDB options, it should technically not add 
> latency in application etc. However, I feel like even with improvements and 
> local checkpointing (which we already are doing) it is a lot of “unused” 
> IOPS/resource utilization especially if we start to spin up more apps 
> handling similar data sources and with similar requirements. On a first blush 
> it feels like those resources are better utilized in cluster for apps with 
> stricter SLAs for data loss and recovery etc instead.
> 
> Basically, I suppose I am thinking Checkpointing feature that is initialized 
> by certain actions / events rather than periodic ones. Let me know I am 
> off-base here and I should just enable checkpointing in all of these apps and 
> move on :) 
> 
> I tried Savepoint again and it looks like the issue is caused by the fact 
> that Memory states are large as it is throwing error states are larger than 
> certain size. So solution of (1) will possibly solve (2) as well. 
> 
> Thanks again,
> 
> Ashish
> 
> 
>> On Jun 7, 2018, at 4:25 PM, Fabian Hueske > <mailto:fhue...@gmail.com>> wrote:
>> 
>> Hi Ashish,
>> 
>> Thanks for the great write up. 
>> If I understood you correctly, there are two different issues that are 
>> caused by the disabled checkpointing.
>> 
>> 1) Recovery from a failure without restarting all operators to preserve the 
>> state in the running tasks
>> 2) Planned restarts an application without losing all state (even with 
>> disabled checkpointing).
>> 
>> Ad 1) The community is constantly working on reducing the time for 
>> checkpointing and recovery. 
>> For 1.5, local task recovery was added, which basically stores a state copy 
>> on the local disk which is read in case of a recovery. So, tasks are 
>> restarted but don't read the to restore state from distributed storage but 
>> from the local disk.
>> AFAIK, this can only be used together with remote checkpoints. I think this 
>> might be an interesting option for you if it would be possible to write 
>> checkpoints only to local disk and not remote storage. AFAIK, there are also 
>> other efforts to reduce the number of restarted tasks in case of a failure. 
>> I guess, you've played with other features such as RocksDBStateBackend, 
>> incremental and async checkpoints already. 
>> 
>> Ad 2) It sounds as if savepoints are exactly the feature your are looking 
>> for. It would be good to know what exactly did not work for you. The 
>> MemoryStateBackend is not suitable for large state sizes because it backups 
>> into the heap memory of the JobManager. 
>> 
>> Best, Fabian
>> 
>> 2018-06-05 21:57 GMT

Re: How to partition within same physical node in Flink

2018-07-04 Thread Ashish Pokharel
Thanks - I will wait for Stefan’s comments before I start digging in.

> On Jul 4, 2018, at 4:24 AM, Fabian Hueske  wrote:
> 
> Hi Ashish,
> 
> I think we don't want to make it an official public API (at least not at this 
> point), but maybe you can dig into the internal API and leverage it for your 
> use case.
> I'm not 100% sure about all the implications, that's why I pulled in Stefan 
> in this thread.
> 
> Best, Fabian
> 
> 2018-07-02 15:37 GMT+02:00 ashish pok  >:
> Thanks Fabian! It sounds like KeyGroup will do the trick if that can be made 
> publicly accessible.
> 
> On Monday, July 2, 2018, 5:43:33 AM EDT, Fabian Hueske  > wrote:
> 
> 
> Hi Ashish, hi Vijay,
> 
> Flink does not distinguish between different parts of a key (parent, child) 
> in the public APIs. However, there is an internal concept of KeyGroups which 
> is similar to what you call physical partitioning. A KeyGroup is a group of 
> keys that are always processed on the same physical node. The motivation for 
> this feature is operator scaling because all keys of a group are always 
> processed by the same node and hence their state is always distributed 
> together. However, AFAIK, KeyGroups are not exposed to the user API. 
> Moreover, KeyGroups are distributed to slots, i.e., each KeyGroup is 
> processed by a single slot, but each slot might processes multiple key 
> groups. This distribution is done with hash partitioning and hence hard to 
> tune.
> 
> There might be a way to tweak this by implementing an own low-level operator 
> but I'm not sure. Stefan (in CC) might be able  to give some hints.
> 
> Best, Fabian
> 
> 2018-06-29 18:35 GMT+02:00 Vijay Balakrishnan  >:
> Thanks for the clarification, Fabian.
> This is what I compromised on for my use-case-doesn't exactly do what I 
> intended to do.
> Partition by a key, and then spawn threads inside that partition to do my 
> task and then finally repartition again(for a subsequent connect).
> 
> DataStream keyedByCamCameraStream = env
> .addSource(new Source())
> .keyBy((cameraWithCube) -> cameraWithCube.getCam() );
> AsyncFunction cameraWithCubeAsyncFunction =
> new SampleAsyncFunction(, nThreads);//spawn threads here 
> with the second key ts here
> DataStream cameraWithCubeDataStreamAsync =
> AsyncDataStream.orderedWait( keyedByCamCameraStream, 
> cameraWithCubeAsyncFunction, timeout, TimeUnit.MILLISECONDS, nThreads)
> .setParallelism( parallelCamTasks);//capacity= max # 
> of inflight requests - how much; timeout - max time until considered failed
> 
> DataStream cameraWithCubeDataStream = 
> cameraWithCubeDataStreamAsync. keyBy((cameraWithCube) -> 
> cameraWithCube.getTs());  
>  
>   
> 
> 
> On Thu, Jun 28, 2018 at 9:22 AM ashish pok  > wrote:
> Fabian, All,
> 
> Along this same line, we have a datasource where we have parent key and child 
> key. We need to first keyBy parent and then by child. If we want to have 
> physical partitioning in a way where physical partiotioning happens first by 
> parent key and localize grouping by child key, is there a need to using 
> custom partitioner? Obviously we can keyBy twice but was wondering if we can 
> minimize the re-partition stress.
> 
> Thanks,
> 
> Ashish
> 
> 
> - Ashish
> 
> On Thursday, June 28, 2018, 9:02 AM, Fabian Hueske  > wrote:
> 
> Hi Vijay,
> 
> Flink does not provide fine-grained control to place keys to certain slots or 
> machines. 
> When specifying a key, it is up to Flink (i.e., its internal hash function) 
> where the data is processed. This works well for large key spaces, but can be 
> difficult if you have only a few keys.
> 
> So, even if you keyBy(cam) and handle the parallelization of seq# internally 
> (which I would not recommend), it might still happen that the data of two 
> cameras is processed on the same slot.
> The only way to change that would be to fiddle with the hash of your keys, 
> but this might give you a completely different distribution when scaling out 
> the application at a later point in time.
> 
> Best, Fabian
> 2018-06-26 19:54 GMT+02:00 Vijay Balakrishnan  >:
> Hi Fabian,
> Thanks once again for your reply. I need to get the data from each cam/camera 
> into 1 partition/slot and not move the gigantic video data around as much as 
> I perform other operations on it. For eg, I can get seq#1 and seq#2 for cam1 
> in cam1 partition/slot and then combine, split,parse, stitch etc. operations 
> on it in multiple threads within the same cam1 partition.
> 
> I have the CameraKey defined as cam,seq# and then keyBy(cam) to get it in 1 
> partition(eg: cam1). The idea is to then work within the cam1 partition with 
> variou

Re: Memory Leak in ProcessingTimeSessionWindow

2018-07-22 Thread Ashish Pokharel
One more attempt to get some feedback on this. It basically boils down to using 
the high-level window API in scenarios where keys are unbounded/infinite but 
can be expired after a certain time. From what we have observed (solution 2 
below), some properties of the keys are still in state (guessing the key itself 
and watermarks etc.). Is there any way to clean these up, as the FIRE_AND_PURGE 
trigger doesn't seem to do it? I am of the opinion that even if we end up using 
HDFS- or RocksDB-backed state, we would still want to clean those up. Any 
suggestions on this before we start re-writing our apps to use the low-level 
process APIs in general? 
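For illustration, the trigger we have been experimenting with is roughly the 
sketch below - a simplified processing-time trigger that purges on firing and 
deletes its timer in clear(); this is a sketch of the idea, not the exact 
production class:

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Sketch: processing-time trigger that fires-and-purges and removes its own timer.
public class ProcessingTimePurgeTrigger extends Trigger<Object, TimeWindow> {

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        // FIRE_AND_PURGE instead of FIRE so the window contents are dropped after emission.
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) {
        // Remove the pending timer when the window is purged/cleaned up.
        ctx.deleteProcessingTimeTimer(window.maxTimestamp());
    }

    @Override
    public boolean canMerge() {
        return true; // needed because session windows merge
    }

    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
    }
}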

Thanks, Ashish

> On Jul 2, 2018, at 10:47 AM, ashish pok  wrote:
> 
> All,
> 
> I have been doing a little digging on this and to Stefan's earlier point 
> incrementing memory (not necessarily leak) was essentially because of keys 
> that were incrementing as I was using time buckets concatenated with actual 
> key to make unique sessions.
> 
> Taking a couple of steps back, use case is very simple tumbling window of 15 
> mins by keys. Stream can be viewed simply as:
> 
> ||
> 
> We have a few of these type of pipelines and one catch here is we wanted to 
> create an app which can process historical and current data. HIstorical data 
> is mainly because users adhoc request for "backfill". In order to easily 
> manage processing pipeline, we are making no distinction between historical 
> and current data as processing is based on event time. 
> 
> 1) Of course, easiest way to solve this problem is to create TumblingWindow 
> of 15mins with some allowed lateness. One issue here was watermarks are moved 
> forward and backfill data appeared to be viewed as late arrival data, which 
> is a correct behavior from Flink perspective but seems to be causing issues 
> in how we are trying to handle streams.
> 
> 2) Another issue is our data collectors are highly distributed - we regularly 
> get data from later event time buckets faster than older buckets. Also, it is 
> also more consistent to actually create 15min buckets using concept of 
> Session instead. So I am creating a key with | 
> and a session gap of say 10 mins. This works perfectly from business logic 
> perspective. However, now I am introducing quite a lot of keys which based on 
> my heap dumps seem to be hanging around causing memory issues.
> 
> 3) We converted the apps to a Process function and manage all states using 
> key defined in step (2) and registering/unregistering timeouts. 
> 
> Solution (3) seems to be working pretty stable from memory perspective. 
> However, it just feels like with so much high-level APIs available, we are 
> not using them properly and keep reverting back to low level Process APIs - 
> in the last month we have migrated about 5 or 6 apps to Process now :) 
> 
> For solution (2) it feels like any other Session aggregation use case will 
> have the issue of keys hanging around (eg: for click streams with user 
> sessions etc). Isn't there a way to clear those session windows? Sorry, I 
> just feel like we are missing something simple and have been reverting to low 
> level APIs instead.
> 
> Thanks, Ashish



Re: IoT Use Case, Problem and Thoughts

2018-07-22 Thread Ashish Pokharel
Till, Fabian,

Looping back on this after a gap - for some reason this looks like a need very 
specific to us (I would have thought it would be of interest to others as 
well). We on-boarded one of our new IoT data sources, and our total checkpoints 
right now are over 1TB with a checkpoint period of 5 seconds - and that is with 
delta (incremental) states enabled (I explained previously how transient the 
states are). I sincerely don't see any need for this, especially given the 
tolerance we have for a little loss/duplication and the fact that we are going 
to on-board a few more data sources at this scale. We switched over to local 
SSDs on our cluster just to isolate this use case from destroying our HDFS :)

Of course it is easier said than done, but an event-based checkpoint (e.g. 
being able to checkpoint when a RESTART happens, etc.) as discussed below would 
be great. 

Thanks, Ashish


> On Jun 15, 2018, at 10:28 PM, Ashish Pokharel  wrote:
> 
> Hi Till, Fabian,
> 
> Thanks for your responses again. 
> 
> Till, you have nailed it. I will comment on them individually. But first, I 
> feel like I am still not stating it well enough to illustrate the need. May 
> be I am overthinking :)
> 
> So let me try one more time with a preface that we are talking about millions 
> of sensors reporting logs/metrics. So in my cluster we can potentially have 
> 10s if not 100s of such apps for variety of data. I currently have 1 app in 
> Prod so I can do a lot testing :) Just as a test, I enabled RocksDB State 
> Backend and Checkpointing every 5 seconds with Graphite metrics enabled. On 
> an average I could see almost 25GB of total state being written across couple 
> of hundred slots based on Graphite numbers - it is setup with incremental and 
> async Checkpoints. I am assuming main reason being states are transient and 
> deltas are essentially entire set of new states. Our main concern is 
> real-time processing vs no data loss or even possibly a few duplicates. To 
> Fabian’s point, at least once vs exactly once semantics are also not of 
> utmost concern at this point. Now, bottom line is I have Checkpointing 
> disabled and use MemoryStateBackend with the thought that writing massive 
> states to persistence every few seconds didn’t seem like best use of 
> resources - I’d rather fit in more of these apps in cluster and use stateful 
> processing for apps we really need them on. However, this leads to 2 main 
> issue
> 
> 1- If an operator fails (let’s say Kafka timeouts), entire job graph restarts 
> which leads us to more than desirable gap of data (lost states across 100s of 
> operators) as obviously there is no recoverable state
> 2- Same issue happens in planned restarts
> 
> Per Fabian’s suggestion, I am going to try RocksDB State Backend with local 
> drives and run some baseline tests - hoping states are kept in memory for the 
> most part unless spillover is needed. This should at least allow us with 
> decent solution of (2). I am still not convinced we should enable periodic 
> Checkpointing (perhaps I am wrong here but again I have highlighted my 
> reasons above).
> 
> "
>> Just for my own understanding: What do you want to do with event-based 
>> checkpointing triggers (onRestart, onShutdown?). Do you want to draw a 
>> partial snapshot which should then be used for recovery? Technically, this 
>> is possible but I'm not sure how much value this would add for Flink users. 
>> A partial snapshot could also be completely empty (equivalent of disabling 
>> checkpointing).
> 
>> I can see the point of making the checkpoint triggering more flexible and 
>> giving some control to the user. In contrast to savepoints, checkpoints are 
>> considered for recovery. My question here would be, what would be the 
>> triggering condition in your case (other than time)?
> "
> I’d think trigger condition would be based on life-cycle hook like RESTART 
> (or perhaps even an external message when FLINK-6131 is available may be). 
> Partial (best possible) snapshot is exactly what it would be - states from 
> failing operators cannot be expected to be recoverable obviously.
> 
>> What the community will add very soon is an atomic stop with savepoint call 
>> which will take a savepoint of your job's state when and shut it down.
> 
> Very nice! Would this also have same need to use Fs or RocksDB State Backend? 
> It shouldn’t be an issue for us as long as my tests above turn out to be 
> decent. 
> 
> Thanks again guys for your advice and feedback. Really appreciate it.
> 
> — Ashish
> 
>  
>> On Jun 15, 2018, at 5:43 AM, Till Rohrmann > <mailto:trohrm...@apache.org>> wrote:
>> 
>> Hi,
>> 
>> ideally we would not have to cancel all ta

Permissions to delete Checkpoint on cancel

2018-07-22 Thread Ashish Pokharel
All,

We recently moved our checkpoint directory from HDFS to local SSDs mounted on 
the data nodes (we were starting to see performance impacts on checkpoints etc. 
as complex ML apps were spinning up more and more in YARN). This worked great 
except that when jobs are canceled, or canceled with savepoint, the local data 
is not being cleaned up. On HDFS, checkpoint directories were cleaned up on 
cancel and cancel-with-savepoint as far as I can remember. I am wondering if it 
is a permissions issue. The local disks have RWX permissions for both the yarn 
and flink headless users (the flink headless user submits the apps to YARN 
using our CICD pipeline). 

Appreciate any pointers on this.

Thanks, Ashish

Re: Questions on Unbounded number of keys

2018-07-28 Thread Ashish Pokharel
Andrey, Till,

This doesn't jibe with what I have noticed (I fully acknowledge that I am still 
getting the hang of the framework). I sent a couple of notes on this in earlier 
threads. 

With the very simple processing below, I am running into a slow creep-up of 
memory with unbounded keys, which eventually ends in OOM. 

DataStream processedData = rawTuples
        .keyBy(PlatformTuple::getKey)
        .window(ProcessingTimeSessionWindows.withGap(Time.seconds(AppConfigs.getWindowSize(120))))
        .trigger(new ProcessingTimePurgeTrigger())
        .apply(new MetricWindowFn())
        .name("windowFunctionTuple")
        .map(new TupleToEventMapFn())
        .name("mapTupleEvent");


I initially didn't even have the ProcessingTimePurgeTrigger and it was using 
the default trigger. In an effort to fix the issue, I created my own trigger 
from the default ProcessingTimeTrigger with a simple override of the 
onProcessingTime method (essentially replacing FIRE with FIRE_AND_PURGE):

@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
    return TriggerResult.FIRE_AND_PURGE;
}

Of course, I can switch to the RocksDB backend, but it feels like we are simply 
pushing the problem down to storage. We may end up using RocksDB, but I wanted 
to make sure our clean-ups are working properly before doing so. I attempted a 
heap dump as well, and from what I could see some key-related objects seem to 
be hanging around (perhaps watermarks etc.?). The heap was over 8GB and hence 
time-consuming to introspect further. I have been meaning to get back to 
creating the same app with a smaller footprint to scan the heap dump better, 
but I haven't had time (especially because the approach below works fine and it 
is not a burning issue; however, reverting to the low-level API is not my 
preference as we are bypassing nice high-level APIs).

If I replace the window function with a process function, use the timer service 
to implement the same logic, and clear state in the onTimer method, this issue 
goes away. @Till's comment seems to indicate this as a possible solution - is 
that correct? If so, what would be the best way to get this done in the above 
snippet? I would have thought FIRE_AND_PURGE would have done the same thing.
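As a rough sketch (simplified, with a toy Long payload, a hypothetical 
2-minute gap, and none of our actual business logic), the process-function 
alternative looks something like this:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Sketch: applied after keyBy(...), accumulate per key, then emit and clear
// all state for the key when a processing-time timer fires.
public class SessionizeFn extends ProcessFunction<Long, Long> {

    private static final long GAP_MS = 2 * 60 * 1000; // hypothetical session gap

    private transient ValueState<Long> sum;
    private transient ValueState<Long> pendingTimer;

    @Override
    public void open(Configuration parameters) {
        sum = getRuntimeContext().getState(new ValueStateDescriptor<>("sum", Long.class));
        pendingTimer = getRuntimeContext().getState(new ValueStateDescriptor<>("timer", Long.class));
    }

    @Override
    public void processElement(Long value, Context ctx, Collector<Long> out) throws Exception {
        Long current = sum.value();
        sum.update(current == null ? value : current + value);

        // Push the timer out by GAP_MS on every element (delete the previous one first).
        Long previous = pendingTimer.value();
        if (previous != null) {
            ctx.timerService().deleteProcessingTimeTimer(previous);
        }
        long fireAt = ctx.timerService().currentProcessingTime() + GAP_MS;
        ctx.timerService().registerProcessingTimeTimer(fireAt);
        pendingTimer.update(fireAt);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
        Long current = sum.value();
        if (current != null) {
            out.collect(current);
        }
        // Nothing is left behind for this key once the timer fires.
        sum.clear();
        pendingTimer.clear();
    }
}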

Appreciate your pointers on this.

Thanks, Ashish


> On Jul 26, 2018, at 4:55 AM, Andrey Zagrebin  wrote:
> 
> Hi Chang Liu,
> 
> The unbounded nature of the stream keyed or not should not lead to out of 
> memory. 
> 
> Flink parallel keyed operator instances have fixed number (parallelism) and 
> just process some range of keyed elements, in your example it is a subrange 
> of session ids. 
> 
> The keyed processed elements (http requests) are objects created when they 
> enter the pipeline and garage collected after having been processed in 
> streaming fashion. 
> 
> If they arrive very rapidly it can lead to high back pressure from upstream 
> to downstream operators, buffers can become full and pipeline stops/slows 
> down processing external inputs, it usually means that your pipeline is under 
> provisioned. 
> 
> The only accumulated data comes from state (windows, user state etc), so if 
> you control its memory consumption, as Till described, there should be no 
> other source of out of memory.
> 
> Cheers,
> Andrey
> 
>> On 25 Jul 2018, at 19:06, Chang Liu > > wrote:
>> 
>> Hi Till,
>> 
>> Thanks for your reply. But I think maybe I did not make my question clear. 
>> My question is not about whether the States within each keyed operator 
>> instances will run out of memory. My question is about, whether the 
>> unlimited keyed operator instances themselves will run out of memory.
>> 
>> So to reply to your answers, no matter using different State backends or 
>> regularly cleaning up the States (which is exactly what I am doing), it does 
>> not concern the number of keyed operator instances.
>> 
>> I would like to know:
>> Will the number of keyed operator instances (Java objects?) grow unbounded? 
>> If so, will they run out of memory? This is not actually related to the 
>> memory used by the keyed Stated inside.
>> If not, then how Flink is managing this multiple keyed operator instances?
>> 
>> I think this needs more knowledge about how Flink works internally to 
>> understand how keyed operator instances are created, maintained and 
>> destroyed. That’s why I would like your help understanding this.
>> 
>> Many Thanks.
>> 
>> Best regards/祝好,
>> 
>> Chang Liu 刘畅
>> 
>> 
>>> On 24 Jul 2018, at 14:31, Till Rohrmann >> > wrote:
>>> 
>>> Hi Chang Liu,
>>> 
>>> if you are dealing with an unlimited number of keys and keep state around 
>>> for every key, then your state size will keep growing with the number of 
>>

Re: Kafka consumer behavior with same group Id, 2 instances of apps in separate cluster?

2019-09-03 Thread Ashish Pokharel
Thanks Becket,

Sorry for the delayed response. That's what I thought as well. I built a hacky 
custom source today directly using the Kafka client, which was able to join the 
consumer group etc. and works as I expected, but I am not sure about production 
readiness for something like that :)
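Roughly, the hack is shaped like the sketch below (simplified; the topic, group 
id, and bootstrap servers are placeholders, and none of Flink's 
checkpoint/offset integration is wired in):

import java.util.Collections;
import java.util.Properties;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Sketch: a source that uses subscribe() so Kafka's group coordinator divides
// partitions across the two "walled garden" deployments sharing a group id.
public class SubscribingKafkaSource implements SourceFunction<String> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker1:9092"); // placeholder
        props.setProperty("group.id", "edge-app");              // placeholder
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("edge-topic")); // placeholder topic
            while (running) {
                ConsumerRecords<String, String> records = consumer.poll(500);
                for (ConsumerRecord<String, String> record : records) {
                    synchronized (ctx.getCheckpointLock()) {
                        ctx.collect(record.value());
                    }
                }
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}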

The need arises because of (1) business continuity needs and (2) the fact that 
some of the pipelines we are building are close to the network edge and need to 
run on nodes where we are not allowed to create a cluster (yeah - let's not get 
into that can of security-related worms :)). We will get there at some point, 
but for now we are trying to support business continuity on those edge nodes by 
not actually forming a cluster but using "walled garden" individual Flink 
servers. I fully understand this is not ideal. All of this started because some 
of the work we were doing with Logstash needed to be migrated out, as Logstash 
wasn't able to keep up with the data rates unless we threw a ridiculous number 
of servers at it. In essence, we have pre-approved constraints to connect to 
Kafka and southbound interfaces using Logstash, which we need to replace for 
some datasets that are too massive for Logstash to keep up with. 

Hope that explains a bit where our head is at.

Thanks, Ashish 

> On Aug 29, 2019, at 11:40 AM, Becket Qin  wrote:
> 
> Hi Ashish,
> 
> You are right. Flink does not use Kafka based group management. So if you 
> have two clusters consuming the same topic, they will not divide the 
> partitions. The cross cluster HA is not quite possible at this point. It 
> would be good to know the reason you want to have such HA and see if Flink 
> meets you requirement in another way.
> 
> Thanks,
> 
> Jiangjie (Becket) Qin
> 
> On Thu, Aug 29, 2019 at 9:19 PM ashish pok  > wrote:
> Looks like Flink is using “assign” partitions instead of “subscribe” which 
> will not allow participating in a group if I read the code correctly. 
> 
> Has anyone solved this type of problem in past of active-active HA across 2 
> clusters using Kafka? 
> 
> 
> - Ashish
> On Wednesday, August 28, 2019, 6:52 PM, ashish pok  > wrote:
> 
> All,
> 
> I was wondering what the expected default behavior is when same app is 
> deployed in 2 separate clusters but with same group Id. In theory idea was to 
> create active-active across separate clusters but it seems like both apps are 
> getting all the data from Kafka. 
> 
> Anyone else has tried something similar or have an insight on expected 
> behavior? I was expecting to see partial data on both apps and to get all 
> data in one app if other was turned off.
> 
> Thanks in advance,
> 
> - Ashish



Capacity Planning For Large State in YARN Cluster

2017-10-26 Thread Ashish Pokharel
Hi Everyone,

We have hit a roadblock moving an app to production scale and were hoping to 
get some guidance. The application is a pretty common use case in stream 
processing but does require maintaining a large number of keyed states. We are 
processing 2 streams - one of which is a daily burst of a stream (normally 
around 50 million but could go up to 100 million in a one-hour burst) and the 
other is a constant stream of around 70-80 million per hour. We are doing a 
low-level join using a CoProcess function between the two keyed streams. The 
CoProcess function needs to refresh (upsert) state from the daily burst stream 
and decorate the constantly streaming data with values from the state built 
from the bursty stream. All of the logic is working pretty well in a standalone 
dev environment. We are throwing about 500k events of bursty traffic for state 
and about 2-3 million of the data stream at it. We have 1 TM with 16GB memory, 
1 JM with 8GB memory and 16 slots (1 per core) on the server. We have been 
taking savepoints in case we need to restart the app with code changes etc. The 
app does seem to recover from state very well. Based on the savepoints, the 
total volume of state in the production flow should be around 25-30GB. 
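For context, the join is structured roughly like the sketch below - simplified 
to String payloads with hypothetical names; the real job keys both streams on 
the same identifier and calls connect(...).process(...) with a function of this 
shape:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

// Sketch: stream 1 (daily burst) upserts per-key reference state,
// stream 2 (continuous) is decorated with whatever state exists for its key.
public class EnrichFn extends CoProcessFunction<String, String, String> {

    private transient ValueState<String> refState;

    @Override
    public void open(Configuration parameters) {
        refState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("reference", String.class));
    }

    @Override
    public void processElement1(String ref, Context ctx, Collector<String> out) throws Exception {
        refState.update(ref); // upsert from the bursty stream
    }

    @Override
    public void processElement2(String event, Context ctx, Collector<String> out) throws Exception {
        String ref = refState.value();
        out.collect(ref == null ? event : event + "|" + ref); // "decorate" the continuous stream
    }
}

It is wired up as something like 
burstStream.keyBy(key).connect(continuousStream.keyBy(key)).process(new EnrichFn()).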

At this point, however, we are trying to deploy the app at production scale. 
The app also has a flag that can be set at startup time to ignore the data 
stream so we can simply initialize state. So basically we are trying to see if 
we can initialize the state first and take a savepoint as a test. At this point 
we are using 10 TMs with 4 slots and 8GB memory each (the idea was to allocate 
around 3 times the estimated state size to start with), but TMs keep getting 
killed by YARN with a GC Overhead Limit Exceeded error. We have gone through 
quite a few blogs/docs on Flink managed memory, off-heap vs heap memory, disk 
spill-over, state backends etc. We did try to tweak the managed-memory configs 
in multiple ways (off/on heap, fraction, network buffers etc.) but can't seem 
to figure out a good way to fine-tune the app to avoid these issues. Ideally, 
we would hold state in memory (we do have enough capacity in the production 
environment for it) for performance reasons and spill over to disk (which I 
believe Flink should provide out of the box?). It feels like 3x the anticipated 
state volume in cluster memory should have been enough to just initialize the 
state. So instead of just continuing to increase memory (which may or may not 
help, as the error is regarding GC overhead), we wanted to get some input from 
experts on best practices and an approach to plan this application better. 

Appreciate your input in advance!

Re: Capacity Planning For Large State in YARN Cluster

2017-10-29 Thread Ashish Pokharel
Hi Till,

I got the same feedback from Robert Metzger over on Stack Overflow. I have 
switched my app to use RocksDB and yes, it did stabilize the app :) 

However, I am still struggling with how to map out my TM and JM memory, number 
of slots per TM, etc. Currently I am using 60 slots with 10 TMs and 60GB of 
total cluster memory. The idea was to distribute the states and give approx. 
1GB of memory per slot. I have also changed the containerized.heap-cutoff-ratio 
config to 0.3 to allow a little room for RocksDB (RocksDB is using the basic 
spinning-disk-optimized pre-defined options, but we do have SSDs on our prod 
machines that we can leverage in the future too) and set 
taskmanager.memory.off-heap to true. It feels more experimental at this point 
than an exact science :) If there are any further guidelines on how we can plan 
for this as we open up the flood gates to heavy continuous streams, that would 
be great.
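For reference, the backend setup itself is roughly the sketch below (the 
checkpoint path is a placeholder; the predefined-options call is the 
spinning-disk profile mentioned above):

import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch: RocksDB state backend with the spinning-disk-optimized predefined options.
public class RocksDbSetupSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints"); // placeholder
        backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
        // PredefinedOptions.FLASH_SSD_OPTIMIZED is the profile to try once we move to SSDs.
        env.setStateBackend(backend);
        // ... job graph ...
        env.execute("rocksdb-setup-sketch");
    }
}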

Thanks again,

Ashish

> On Oct 27, 2017, at 8:45 AM, Till Rohrmann  wrote:
> 
> Hi Ashish,
> 
> what you are describing should be a good use case for Flink and it should be 
> able to run your program.
> 
> When you are seeing a GC overhead limit exceeded error, then it means that 
> Flink or your program are creating too many/too large objects filling up the 
> memory in a short time. I would recommend checking your user program to see 
> whether you can avoid unnecessary object instantiations and whether it is 
> possible to reuse created objects.
> 
> Concerning Flink's state backends, the memory state backend is currently not 
> able to spill to disk. Also the managed memory is only relevant for 
> DataSet/batch programs and not streaming programs. Therefore, I would 
> recommend you to try out the RocksDB state backend which is able to 
> gracefully spill to disk if the state size should grow too large. 
> Consequently, you don't have to adjust the managed memory settings because 
> they currently don't have an effect on streaming programs. 
> 
> My gut feeling is that switching to the RocksDBStateBackend could already 
> solve your problems. If this should not be the case, then please let me know 
> again.
> 
> Cheers,
> Till
> 
> On Fri, Oct 27, 2017 at 5:27 AM, Ashish Pokharel  <mailto:ashish...@yahoo.com>> wrote:
> Hi Everyone,
> 
> We have hit a roadblock moving an app at Production scale and was hoping to 
> get some guidance. Application is pretty common use case in stream processing 
> but does require maintaining large number of keyed states. We are processing 
> 2 streams - one of which is a daily burst of stream (normally around 50 mil 
> but could go upto 100 mil in one hour burst) and other is constant stream of 
> around 70-80 mil per hour. We are doing a low level join using CoProcess 
> function between the two keyed streams. CoProcess function needs to refresh 
> (upsert) state from the daily burst stream and decorate constantly streaming 
> data with values from state built using bursty stream. All of the logic is 
> working pretty well in a standalone Dev environment. We are throwing about 
> 500k events of bursty traffic for state and about 2-3 mil of data stream. We 
> have 1 TM with 16GB memory, 1 JM with 8 GB memory and 16 slots (1 per core on 
> the server) on the server. We have been taking savepoints in case we need to 
> restart app for with code changes etc. App does seem to recover from state 
> very well as well. Based on the savepoints, total volume of state in 
> production flow should be around 25-30GB.
> 
> At this point, however, we are trying deploy the app at production scale. App 
> also has a flag that can be set at startup time to ignore data stream so we 
> can simply initialize state. So basically we are trying to see if we can 
> initialize the state first and take a savepoint as test. At this point we are 
> using 10 TM with 4 slots and 8GB memory each (idea was to allocate around 3 
> times estimated state size to start with) but TMs keep getting killed by YARN 
> with a GC Overhead Limit Exceeded error. We have gone through quite a few 
> blogs/docs on Flink Management Memory, off-heap vs heap memory, Disk Spill 
> over, State Backend etc. We did try to tweak managed-memory configs in 
> multiple ways (off/on heap, fraction, network buffers etc) but can’t seem to 
> figure out good way to fine tune the app to avoid issues. Ideally, we would 
> hold state in memory (we do have enough capacity in Production environment 
> for it) for performance reasons and spill over to disk (which I believe Flink 
> should provide out of the box?). It feels like 3x anticipated state volume in 
> cluster memory should have been enough to just initialize state. So instead 
> of just continuing to increase memory (which may or may not help as error is 
> regarding GC overhead) we wanted to get some input from experts on best 
> practices and approach to plan this application better.
> 
> Appreciate your input in advance!
> 



Kafka Consumer fetch-size/rate and Producer queue timeout

2017-11-05 Thread Ashish Pokharel
All,

I am starting to notice a strange behavior in a particular streaming app. I 
initially thought it was a producer issue, as I was seeing timeout exceptions 
(records expiring in the queue). I did try to modify request.timeout.ms, 
linger.ms etc. to help with the issue in case it was caused by a sudden burst 
of data or something along those lines. However, that caused the app to build 
up back pressure and get slower and slower until the timeout was reached. With 
lower timeouts, the app would actually raise the exception and recover faster. 
I can tell it is not related to connectivity, as other apps connected to the 
same brokers (we have at least 10 streaming apps connected to the same list of 
brokers) from the same data nodes are running just fine around the same time 
frame. We have enabled the Graphite reporter in all of our applications. After 
deep-diving into some of the consumer and producer stats, I noticed that the 
consumer fetch-rate drops tremendously while fetch-size grows exponentially 
BEFORE the producer actually starts to show higher response times and lower 
rates. Eventually, connection resets start to occur and connection counts go up 
momentarily. After that, things get back to normal. Data producer rates remain 
constant around that time frame - we have a Logstash producer sending data 
over. We checked both Logstash and Kafka metrics and they seem to show the same 
pattern (sort of a sine wave) throughout.

It seems to point to a Kafka issue (perhaps some tuning between the Flink app 
and Kafka), but I wanted to check with the experts before I start knocking on 
the Kafka admins' doors. Is there anything else I can look into? There are 
quite a few default stats in Graphite, but those were the ones that made the 
most sense. 
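For completeness, the producer-side knobs mentioned above are passed in roughly 
like this - a sketch with placeholder broker/topic names and illustrative 
values, using the 0.10 connector we are on:

import java.util.Properties;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

// Sketch: how the producer timeout/batching knobs get handed to the 0.10 sink.
public class ProducerTuningSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> out = env.fromElements("sample"); // stand-in for the real pipeline

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker1:9092"); // placeholder
        props.setProperty("linger.ms", "500");                  // illustrative values only
        props.setProperty("batch.size", "4096");
        props.setProperty("request.timeout.ms", "30000");
        props.setProperty("retries", "5");

        out.addSink(new FlinkKafkaProducer010<>("placeholder-topic",
                new SimpleStringSchema(), props));
        env.execute("producer-tuning-sketch");
    }
}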

Thanks, Ashish

Re: Kafka Consumer fetch-size/rate and Producer queue timeout

2017-11-08 Thread Ashish Pokharel
  org.apache.kafka.clients.producer.ProducerConfig  - 
ProducerConfig values:
acks = 1
batch.size = 4096
block.on.buffer.full = false
bootstrap.servers =[xxx:xxx,xx:xxx]
buffer.memory = 33554432
client.id = 
compression.type = none
connections.max.idle.ms = 54
interceptor.classes = null
key.serializer = class 
org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 500
max.block.ms = 6
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.fetch.timeout.ms = 25000
metadata.max.age.ms = 30
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 3
partitioner.class = class 
org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.ms = 50
request.timeout.ms = 3
retries = 5
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 6
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
timeout.ms = 3
value.serializer = class 
org.apache.kafka.common.serialization.ByteArraySerializer

Thanks, Ashish

> On Nov 8, 2017, at 5:09 AM, Tzu-Li (Gordon) Tai  wrote:
> 
> Hi Ashish,
> 
> From your description I do not yet have much of an idea of what may be 
> happening.
> However, some of your observations seems reasonable. I’ll go through them one 
> by one:
> 
>> I did try to modify request.timeout.ms, linger.ms etc to help with the issue 
>> if it were caused by a sudden burst of data or something along those lines. 
>> However, what it caused the app to increase back pressure and made the 
>> slower and slower until that timeout is reached.
> 
> If the client is experiencing trouble in writing outstanding records to 
> Kafka, and the timeout is increased, then I think increased back pressure is 
> indeed the expected behavior.
> 
>> I noticed that consumer fetch-rate drops tremendously while fetch-size grows 
>> exponentially BEFORE the producer actually start to show higher 
>> response-time and lower rates.
> 
> 
> Drops on fetch-rate and growth on fetch-size in the Flink Kafka consumer 
> should be a natural consequence of backpressure in the job.
> The fetch loop in the consumer will be blocked temporarily when backpressure 
> is propagated from downstream operators, resulting in longer fetch intervals 
> and larger batches on each fetch (given that events rate are still constant).
> Therefore, I think the root cause is still along the lines of the producer 
> side.
> 
> Would you happen to have any logs that maybe shows any useful information on 
> the producer side?
> I think we might have a better chance of finding out what is going on by 
> digging there.
> Also, which Flink version & Kafka version are you using?
> 
> Cheers,
> Gordon
> On 5 November 2017 at 11:24:49 PM, Ashish Pokharel (ashish...@yahoo.com 
> <mailto:ashish...@yahoo.com>) wrote:
> 
>> All, 
>> 
>> I am starting to notice a strange behavior in a particular streaming app. I 
>> initially thought it was a Producer issue as I was seeing timeout exceptions 
>> (records expiring in queue. I did try to modify request.timeout.ms, 
>> linger.ms etc to help with the issue if it were caused by a sudden burst of 
>> data or something along those lines. However, what it caused the app to 
>> increase back pressure and made the slower and s

Re: Kafka Consumer fetch-size/rate and Producer queue timeout

2017-11-11 Thread Ashish Pokharel
Hi Gordon,

Any further thoughts on this?

I forgot to mention that I am using Flink 1.3.2 and our Kafka is 0.10. We are 
in the process of upgrading Kafka, but it won't be in prod for at least a 
couple of months.

Thanks, Ashish

> On Nov 8, 2017, at 9:35 PM, Ashish Pokharel  wrote:
> 
> Hi Grodon,
> 
> Thanks for your responses. It definitely makes sense. 
> 
> I could pull this stack from the logs, entire log itself is pretty big - let 
> me know if some samples before/after this may help.
>  
> TimerException{org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
>  Could not forward element to next operator}
> at 
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:219)
> at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: 
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
> Could not forward element to next operator
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
> at 
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> at 
> org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction.apply(PassThroughWindowFunction.java:35)
> at 
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.java:44)
> at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:597)
> at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:552)
> at 
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime(HeapInternalTimerService.java:253)
> at 
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:217)
> ... 7 more
> Caused by: 
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
> Could not forward element to next operator
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
> at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
> ... 18 more
> Caused by: java.lang.Exception: Failed to send data to Kafka: Expiring 7 
> record(s) for prod.app.stats.preproc-1: 33473 ms has passed since last append
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
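
For context, the "Expiring N record(s) ... has passed since last append" message in the trace above is raised by the Kafka producer client when a buffered batch sits unsent for too long (tied to request.timeout.ms on the 0.10 client). Below is a minimal sketch of a FlinkKafkaProducer010 setup with the relevant producer properties spelled out; the broker address and values are placeholders for illustration, not tuning advice.

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class ProducerTimeoutSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder source; in the job above the records come from the upstream pipeline.
        DataStream<String> stats = env.fromElements("sample-record");

        Properties producerProps = new Properties();
        producerProps.setProperty("bootstrap.servers", "kafka-broker:9092"); // placeholder
        // Batches that cannot be sent within request.timeout.ms are expired by the client,
        // which is what produces the "Expiring N record(s)" error above.
        producerProps.setProperty("request.timeout.ms", "120000");
        producerProps.setProperty("retries", "3");
        producerProps.setProperty("linger.ms", "5");

        stats.addSink(new FlinkKafkaProducer010<>(
                "prod.app.stats.preproc",      // topic name taken from the stack trace above
                new SimpleStringSchema(),
                producerProps));

        env.execute("producer-timeout-sketch");
    }
}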

Metric Registry Warnings

2017-11-11 Thread Ashish Pokharel
All,

Hopefully this is a quick one. I enabled the Graphite reporter in my App and started to see the following warning messages all over the place:

2017-11-07 20:54:09,288 WARN  org.apache.flink.runtime.metrics.MetricRegistry   
- Error while registering metric.
java.lang.IllegalArgumentException: A metric named 
flink.taskmanager.pndapoc-cdh-dn-14.8823e32fae717d08e211fceec56479b7.normalizeData.parseRawStats
 -> Filter.numBytesOut already exists

I saw some threads about this regarding JMX as well, but I don’t think there was a resolution for it. 

One thing I made sure of is that I haven’t reused a name (like parseRawStats) in my App. Also, this seems to happen for every metric, not only for a select few where I could have mistakenly set the same name.

Appreciate any help on this.

Thanks, Ashish
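
As a general note on this warning: the metric identifier in the message appears to be built from the task name (here "parseRawStats -> Filter", i.e. a chain containing an unnamed Filter), so giving every chained operator an explicit, distinct .name() (and .uid()) at least makes collisions easier to track down. A minimal sketch, with the stream and function names assumed for illustration rather than taken from the actual job:

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class OperatorNamingSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> raw = env.fromElements("a", "b"); // placeholder source

        raw.map(new MapFunction<String, String>() {
                @Override
                public String map(String value) {
                    return value.trim();
                }
            })
            .name("parseRawStats").uid("parse-raw-stats")    // explicit, unique name and uid
            .filter(new FilterFunction<String>() {
                @Override
                public boolean filter(String value) {
                    return !value.isEmpty();
                }
            })
            .name("dropEmptyStats").uid("drop-empty-stats")  // an unnamed filter would show up as just "Filter"
            .print();

        env.execute("operator-naming-sketch");
    }
}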

Re: kafka consumer client seems not auto commit offset

2017-11-15 Thread Ashish Pokharel
Gordon, Tony,

Thought I would chime in real quick as I have tested this a few different ways in the last month (not sure if this will be helpful but thought I’d throw it out there). I actually haven’t noticed issues with auto committing with any of those configs when using the Kafka property auto.offset.reset instead of those methods. However, I have come across one interesting scenario - even when Checkpointing is disabled, if the App is started from a Savepoint, auto commit doesn’t seem to work. I am not sure if Tony has the same scenario. I assumed that starting from a Savepoint sort of expects Checkpointing to be enabled to commit offsets, similar to how it behaves when Checkpointing is enabled. At this point, I am generating a random UID for my Kafka consumer (as I really don’t want to enable checkpointing - it’s not really needed in my use case and I want to save on some resources), but I do have some really slow moving states which I’d like to save on app shutdown etc.

Thanks, Ashish

> On Nov 15, 2017, at 4:22 AM, Tzu-Li (Gordon) Tai  wrote:
> 
> Hi Tony,
> 
> Thanks for the report. At first glance of the description, what you described 
> doesn’t seem to match the expected behavior.
> I’ll spend some time later today to check this out.
> 
> Cheers,
> Gordon
> 
> 
> On 15 November 2017 at 5:08:34 PM, Tony Wei (tony19920...@gmail.com) wrote:
> 
>> Hi Gordon,
>> 
>> When I used FlinkKafkaConsumer010 to consume logs from Kafka, I found that if I used `setStartFromLatest()` the Kafka consumer API didn't auto-commit offsets back to the consumer group, but if I used `setStartFromGroupOffsets()` it worked fine.
>> 
>> I am sure that the configuration for Kafka has `auto.commit.interval.ms = 5000` and `enable.auto.commit = true`, and I didn't enable checkpointing.
>> 
>> All the difference is only the change from `setStartFromGroupOffsets()` to 
>> `setStartFromLatest()`, but the auto commit mechanism just stopped working.
>> 
>> My Flink cluster version is 1.3.2.
>> My Kafka cluster version is 0.10.2.1.
>> My Zookeeper version for Kafka is 3.4.6-1569965, built on 02/20/2014 09:09 
>> GMT.
>> My Kafka connector library is "org.apache.flink" % 
>> "flink-connector-kafka-0.10_2.10" % "1.3.2"
>> 
>> Thanks for your help in advance.
>> 
>> Best Regards,
>> Tony Wei
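
For anyone hitting this later, here is a minimal reproduction sketch of the setup described above (Flink 1.3 / Kafka 0.10 era APIs; broker, group id, and topic are placeholders). Whether offsets actually get committed back to the group in this configuration is exactly the behavior under discussion, so treat it as a test harness rather than a recommendation:

import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class AutoCommitSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Note: no env.enableCheckpointing(...) here, so offset commits rely on the
        // Kafka client's own auto-commit mechanism.

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka-broker:9092"); // placeholder
        props.setProperty("group.id", "my-consumer-group");          // placeholder
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "5000");

        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), props);

        // The difference reported above: with setStartFromGroupOffsets() auto-commit worked,
        // with setStartFromLatest() it did not.
        consumer.setStartFromLatest();

        env.addSource(consumer).print();
        env.execute("auto-commit-sketch");
    }
}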



Re: Task Manager detached under load

2018-01-24 Thread Ashish Pokharel
I haven’t gotten much further with this. It doesn’t look GC related - at least the GC counters were not that atrocious. However, my main concern is: once the load subsides, why aren’t the TM and JM connecting again? That doesn’t look normal. I could definitely tell the JM was listening on the port, and from the logs it does appear the TM is trying to message the JM that it is still alive. 

Thanks, Ashish

> On Jan 23, 2018, at 12:31 PM, Lasse Nedergaard wrote:
> 
> Hi. 
> 
> Did you find a reason for the detaching ?
> I sometimes see the same on our system running Flink 1.4 on dc/os. I have 
> enabled taskmanager.Debug.memory.startlogthread for debugging. 
> 
> Med venlig hilsen / Best regards
> Lasse Nedergaard
> 
> 
> On 20 Jan 2018, at 12:57, Kien Truong wrote:
> 
>> Hi,
>> 
>> You should enable and check your garbage collection log.
>> 
>> We've encountered case where Task Manager disassociated due to long GC pause.
>> 
>> Regards,
>> 
>> Kien
>> On 1/20/2018 1:27 AM, ashish pok wrote:
>>> Hi All,
>>> 
>>> We have hit some load related issues and were wondering if anyone has some suggestions. We are noticing task managers and job managers being detached from each other under load and never really syncing up again. As a result, the Flink session shows 0 slots available for processing. Even though apps are configured to restart, it isn't really helping as there are no slots available to run the apps.
>>> 
>>> 
>>> Here are excerpt from logs that seemed relevant. (I am trimming out rest of 
>>> the logs for brevity)
>>> 
>>> Job Manager:
>>> 2018-01-19 12:38:00,423 INFO  
>>> org.apache.flink.runtime.jobmanager.JobManager-  Starting 
>>> JobManager (Version: 1.4.0, Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40 UTC)
>>> 
>>> 2018-01-19 12:38:00,792 INFO  
>>> org.apache.flink.runtime.jobmanager.JobManager-  Maximum 
>>> heap size: 16384 MiBytes
>>> 2018-01-19 12:38:00,794 INFO  
>>> org.apache.flink.runtime.jobmanager.JobManager-  Hadoop 
>>> version: 2.6.5
>>> 2018-01-19 12:38:00,794 INFO  
>>> org.apache.flink.runtime.jobmanager.JobManager-  JVM 
>>> Options:
>>> 2018-01-19 12:38:00,794 INFO  
>>> org.apache.flink.runtime.jobmanager.JobManager- 
>>> -Xms16384m
>>> 2018-01-19 12:38:00,794 INFO  
>>> org.apache.flink.runtime.jobmanager.JobManager- 
>>> -Xmx16384m
>>> 2018-01-19 12:38:00,795 INFO  
>>> org.apache.flink.runtime.jobmanager.JobManager- 
>>> -XX:+UseG1GC
>>> 
>>> 2018-01-19 12:38:00,908 INFO  
>>> org.apache.flink.configuration.GlobalConfiguration- Loading 
>>> configuration property: jobmanager.rpc.port, 6123
>>> 2018-01-19 12:38:00,908 INFO  
>>> org.apache.flink.configuration.GlobalConfiguration- Loading 
>>> configuration property: jobmanager.heap.mb, 16384
>>> 
>>> 
>>> 2018-01-19 12:53:34,671 WARN  akka.remote.RemoteWatcher 
>>> - Detected unreachable: [akka.tcp://flink@:37840]
>>> 2018-01-19 12:53:34,676 INFO  
>>> org.apache.flink.runtime.jobmanager.JobManager- Task 
>>> manager akka.tcp://flink@:37840/user/taskmanager terminated.
>>> 
>>> -- So once Flink session boots up, we are hitting it with pretty heavy 
>>> load, which typically results in the WARN above
>>> 
>>> Task Manager:
>>> 2018-01-19 12:38:01,002 INFO  
>>> org.apache.flink.runtime.taskmanager.TaskManager  -  Starting 
>>> TaskManager (Version: 1.4.0, Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40 UTC)
>>> 2018-01-19 12:38:01,367 INFO  
>>> org.apache.flink.runtime.taskmanager.TaskManager  -  Hadoop 
>>> version: 2.6.5
>>> 2018-01-19 12:38:01,367 INFO  
>>> org.apache.flink.runtime.taskmanager.TaskManager  -  JVM 
>>> Options:
>>> 2018-01-19 12:38:01,367 INFO  
>>> org.apache.flink.runtime.taskmanager.TaskManager  - 
>>> -Xms16384M
>>> 2018-01-19 12:38:01,367 INFO  
>>> org.apache.flink.runtime.taskmanager.TaskManager  - 
>>> -Xmx16384M
>>> 2018-01-19 12:38:01,367 INFO  
>>> org.apache.flink.runtime.taskmanager.TaskManager  - 
>>> -XX:MaxDirectMemorySize=8388607T
>>> 2018-01-19 12:38:01,367 INFO  
>>> org.apache.flink.runtime.taskmanager.TaskManager  - 
>>> -XX:+UseG1GC
>>> 
>>> 2018-01-19 12:38:01,392 INFO  
>>> org.apache.flink.configuration.GlobalConfiguration- Loading 
>>> configuration property: jobmanager.rpc.port, 6123
>>> 2018-01-19 12:38:01,392 INFO  
>>> org.apache.flink.configuration.GlobalConfiguration- Loading 
>>> configuration property: jobmanager.heap.mb, 16384
>>> 
>>> 
>>> 2018-01-19 12:54:48,626 WARN  akka.remote.RemoteWatcher 
>>> - Detected unreachable: [akka.tcp://flink@:6123]
>>> 2018-01-19 12:54:48,690 INFO  akka.remote.Remoting  
>>> - Quarantined address [akka.tcp://flink@:6123

Re: Kafka Producer timeout causing data loss

2018-01-24 Thread Ashish Pokharel
Fabian,

Thanks for your feedback - very helpful as usual !

This is sort of becoming a huge problem for us right now because of our Kafka situation. For some reason I missed this detail going through the docs. We are definitely seeing a heavy dose of data loss when Kafka timeouts are happening. 

We are actually on version 1.4 - I’d be interested to understand if anything can be done in 1.4 to prevent this scenario.

One other thought I had was an ability to invoke “Checkpointing before Restart / Recovery” -> meaning I don’t necessarily need to checkpoint periodically, but I do want to make sure that on an explicit restart / rescheduling like this, we do have a decent “last known” state. Not sure if this is currently doable.

Thanks, Ashish

> On Jan 23, 2018, at 5:03 AM, Fabian Hueske  wrote:
> 
> Hi Ashish,
> 
> Originally, Flink always performed full recovery in case of a failure, i.e., 
> it restarted the complete application.
> There is some ongoing work to improve this and make recovery more 
> fine-grained (FLIP-1 [1]). 
> Some parts have been added for 1.3.0.
> 
> I'm not familiar with the details, but Stefan (in CC) should be able to 
> answer your specific question.
> 
> Best, Fabian
> 
> [1] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures
>  
> 
> 
> 2018-01-19 20:59 GMT+01:00 ashish pok:
> Team,
> 
> One more question to the community regarding hardening Flink Apps.
> 
> Let me start off by saying we do have known Kafka bottlenecks which we are in 
> the midst of resolving. So during certain times of day, a lot of our Flink 
> Apps are seeing Kafka Producer timeout issues. Most of the logs are some 
> flavor of this:
> 
> java.lang.Exception: Failed to send data to Kafka: Expiring 28 record(s) for 
> dev.-.-.-.-.trans-2: 5651 ms has passed since batch creation plus linger time
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invokeInternal(FlinkKafkaProducer010.java:302)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.processElement(FlinkKafkaProducer010.java:421)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
>   at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
>   at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>   at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 28 
> record(s) for dev.-.-.-.-.trans-2: 5651 ms has passed since batch creation 
> plus linger time
> 
> Timeouts are not necessarily good but I am sure we understand this is bound to happen (hopefully less often). 
> 
> The issue for us however is that it almost looks like Flink is stopping and restarting all operators (a lot of other operators including Map, Reduce and Process functions, if not all) along with the Kafka Producers. We are processing a pretty substantial load in Flink and don't really intend to enable Rocks/HDFS checkpointing in some of these Apps - we are ok to sustain so
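
As a side note for readers of this thread: FlinkKafkaProducer010 exposes two switches that control how send failures like the timeout above surface. A short sketch follows (topic, schema, and broker are placeholders); setLogFailuresOnly(true) trades the restart behavior described above for silently dropping the failed records, so it only fits where some loss is acceptable.

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class ProducerFailureHandlingSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> records = env.fromElements("sample"); // placeholder for the real pipeline

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka-broker:9092"); // placeholder

        FlinkKafkaProducer010<String> producer =
                new FlinkKafkaProducer010<>("output-topic", new SimpleStringSchema(), props);

        // false (default): send failures are rethrown and trigger the restart strategy,
        //                  which is the behavior described in this thread.
        // true:            failures are only logged and the affected records are dropped.
        producer.setLogFailuresOnly(false);

        // Only relevant when checkpointing is enabled: wait for in-flight records to be
        // acknowledged before a checkpoint completes (at-least-once on the producer side).
        producer.setFlushOnCheckpoint(true);

        records.addSink(producer);
        env.execute("producer-failure-handling-sketch");
    }
}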

Re: Understanding Restart Strategy

2018-01-24 Thread Ashish Pokharel
FYI,

I think I have gotten to the bottom of this situation. For anyone who might end up in a similar situation, hopefully my observations will help.

In my case, it had nothing to do with the Flink Restart Strategy; it was doing its thing as expected. The issue really was the Kafka Producer timeout counters. As I mentioned in the other thread, we have a capacity issue with our Kafka cluster that ends up causing some timeouts in our Flink Applications (we do have throttling in place in Kafka to manage it better, but we still run into timeouts pretty often, unfortunately). 

We had set our Kafka Producer retries to 10. It seems like that retry counter never gets reset. So over the life of an App, if it hits 10 timeouts, it basically couldn’t start and went to a Failed state. I have yet to dig into whether this can be solved from the Flink Kafka wrapper or not. But for now we have set the retries to 0 and hopefully this situation will not happen.

If anyone has any similar observations, please feel free to share.

Thanks, Ashish

> On Jan 19, 2018, at 2:43 PM, ashish pok  wrote:
> 
> Team,
> 
> Hopefully, this is a quick one. 
> 
> We have setup restart strategy as follows in pretty much all of our apps:
> 
> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 
> Time.of(30, TimeUnit.SECONDS)));
> 
> This seems pretty straight-forward. App should retry starting 10 times every 
> 30 seconds - so about 5 minutes. Either we are not understanding this or it 
> seems inconsistent. Some of the applications restart and come back fine on issues like Kafka timeouts (which I will come back to later) but in some cases the same issues pretty much shut the app down. 
> 
> My first guess here was that the total count of 10 is not reset after the App recovers normally. Is there a need to manually reset the counter in an App? I doubt Flink would be treating it like a counter that spans the life of an App instead of resetting it on successful start-up - but I am not sure how else to explain the behavior.
> 
> Along the same line, what actually constitutes a "restart"? Our Kafka cluster has known performance bottlenecks during certain times of day that we are working to resolve. I do notice Kafka producer timeouts quite a few times during these periods. When the App hits these timeouts, it does recover fine, but I don't necessarily see the entire application restarting as I don't see the bootstrap logs of my App. Does something like this count as a restart of the App from the Restart Strategy perspective as well, vs things like app crashes/Yarn killing the application etc. where the App is actually restarted from scratch?
> 
> We are really liking Flink, just need to hash out these operational issues to 
> make it prime time for all streaming apps we have in our cluster.
> 
> Thanks,
> 
> Ashish
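
For reference, a compact sketch pulling together the two pieces from this thread: the fixed-delay restart strategy from the original question and the producer retries=0 workaround described at the top. Broker address and topic are placeholders, and whether retries=0 is the right trade-off depends on how much record loss is acceptable.

import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class RestartStrategySketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 10 restart attempts, 30 seconds apart, as in the original question.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10,
                Time.of(30, TimeUnit.SECONDS)));

        Properties producerProps = new Properties();
        producerProps.setProperty("bootstrap.servers", "kafka-broker:9092"); // placeholder
        // Workaround described above: no client-side retries, so a timed-out batch fails
        // immediately instead of being retried by the Kafka producer.
        producerProps.setProperty("retries", "0");

        env.fromElements("sample") // placeholder for the real pipeline
           .addSink(new FlinkKafkaProducer010<>("output-topic", new SimpleStringSchema(), producerProps));

        env.execute("restart-strategy-sketch");
    }
}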



Re: Task Manager detached under load

2018-02-05 Thread Ashish Pokharel
Hi Till,

Thanks for the detailed response. I will try to gather some of this information during the week and follow up.

— Ashish

> On Feb 5, 2018, at 5:55 AM, Till Rohrmann  wrote:
> 
> Hi,
> 
> this sounds like a serious regression wrt Flink 1.3.2 and we should 
> definitely find out what's causing this problem. Given from what I see in the 
> logs, the following happens:
> 
> For some time the JobManager seems to no longer receive heartbeats from the 
> TaskManager. This could be, for example, due to long GC pauses or heavy load 
> which starves the ActorSystem's threads which are responsible for sending the 
> heartbeats. Due to this, the TM's ActorSystem is quarantined which 
> effectively renders them useless because the JM will henceforth ignore all 
> messages from these systems. The only way to resolve this problem is to 
> restart the ActorSystem. By setting taskmanager.exit-on-fatal-akka-error to 
> true in flink-conf.yaml, a quarantined TM will shut down. If you run the 
> Flink cluster on Yarn, then a new substitute TM will be started if you have 
> still some container restarts left. That way, the system should be able to 
> recover.
> 
> Additionally you could try to play around with akka.watch.heartbeat.interval 
> and akka.watch.heartbeat.pause which control the heartbeat interval and the 
> acceptable pause. By increasing the latter, the system should tolerate longer 
> GC pauses and period of high load.
> 
> However, this only addresses the symptoms of the problem and I'd like to find 
> out what's causing the problem. In order to further debug the problem, it 
> would be really helpful to obtain the logs of the JobManager and the 
> TaskManagers on DEBUG log level and with 
> taskmanager.debug.memory.startLogThread set to true. Additionally it would be 
> interesting to see what's happening on the TaskManagers when you observe high 
> load. So obtaining a profiler dump via VisualVM would be great. And last but 
> not least, it also helps to learn more about the job you're running. What 
> kind of connectors is it using? Are you using Flink's metric system? How is 
> the Flink cluster deployed? Which other libraries are you using in your job?
> 
> Thanks a lot for your help!
> 
> Cheers,
> Till
> 
> On Tue, Jan 30, 2018 at 8:59 PM, Cliff Resnick <cre...@gmail.com> wrote:
> I've seen a similar issue while running successive Flink SQL batches on 1.4. 
> In my case, the Job Manager would fail with the log output about 
> unreachability (with an additional statement about something going "horribly 
> wrong"). Under workload pressure, I reverted to 1.3.2 where everything works 
> perfectly, but we will try again soon on 1.4. When we do I will post the 
> actual log output.
> 
> This was on YARN in AWS, with akka.ask.timeout = 60s.
> 
> On Wed, Jan 24, 2018 at 9:57 PM, Ashish Pokharel <ashish...@yahoo.com> wrote:
> I haven’t gotten much further with this. It doesn’t look like GC related - at 
> least GC counters were not that atrocious. However, my main concern was once 
> the load subsides why aren’t TM and JM connecting again? That doesn’t look 
> normal. I could definitely tell JM was listening on the port and from logs it 
> does appear TM is trying to message JM that is still alive. 
> 
> Thanks, Ashish
> 
>> On Jan 23, 2018, at 12:31 PM, Lasse Nedergaard <lassenederga...@gmail.com> wrote:
>> 
>> Hi. 
>> 
>> Did you find a reason for the detaching ?
>> I sometimes see the same on our system running Flink 1.4 on dc/os. I have 
>> enabled taskmanager.Debug.memory.startlogthread for debugging. 
>> 
>> Med venlig hilsen / Best regards
>> Lasse Nedergaard
>> 
>> 
>> On 20 Jan 2018, at 12:57, Kien Truong <duckientru...@gmail.com> wrote:
>> 
>>> Hi,
>>> 
>>> You should enable and check your garbage collection log.
>>> 
>>> We've encountered case where Task Manager disassociated due to long GC 
>>> pause.
>>> 
>>> Regards,
>>> 
>>> Kien
>>> On 1/20/2018 1:27 AM, ashish pok wrote:
>>>> Hi All,
>>>> 
>>>> We have hit some load related issues and was wondering if any one has some 
>>>> suggestions. We are noticing task managers and job managers being detached 
>>>> from each other under load and never really sync up again. As a result, 
>>>> Flink session shows 0 slots available for processing. Even though, apps 
>>>> are configured to restart it isn't really helping as there are no slots 
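
For readers wanting to experiment with the settings Till suggests above: these are cluster-level options that normally go into flink-conf.yaml on the JobManager and TaskManager hosts. Below is a sketch of the same keys set programmatically on a local test environment, purely for experimentation; the values are placeholders, not tuning advice.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class HeartbeatTuningSketch {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // In a real deployment these keys live in flink-conf.yaml on the JM/TM hosts.
        conf.setString("akka.watch.heartbeat.interval", "10 s");  // heartbeat interval between JM and TM
        conf.setString("akka.watch.heartbeat.pause", "120 s");    // how long a missing heartbeat is tolerated
        conf.setString("taskmanager.exit-on-fatal-akka-error", "true");     // let a quarantined TM shut down
        conf.setString("taskmanager.debug.memory.startLogThread", "true");  // extra memory logging for debugging

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(1, conf);

        env.fromElements(1, 2, 3).print(); // placeholder pipeline
        env.execute("heartbeat-tuning-sketch");
    }
}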

Re: Restart hook and checkpoint

2018-03-06 Thread Ashish Pokharel
Hi Gordon,

The issue really is that we are trying to avoid checkpointing, as the datasets are really heavy and all of the states are really transient in a few of our apps (flushed within a few seconds). So the high volume/velocity and transient nature of the state make those apps good candidates to just not have checkpoints. 

We do have offsets committed to Kafka AND we have “some” tolerance for gaps / duplicates. However, we do want to handle “graceful” restarts / shutdowns. For shutdown, we have been taking savepoints (which works great), but for restart we just can’t find a way. 

Bottom line - we are trading off resiliency for resource utilization and 
performance but would like to harden apps for production deployments as much as 
we can.

Hope that makes sense.

Thanks, Ashish

> On Mar 6, 2018, at 10:19 PM, Tzu-Li Tai  wrote:
> 
> Hi Ashish,
> 
> Could you elaborate a bit more on why you think the restart of all operators
> lead to data loss?
> 
> When restart occurs, Flink will restart the job from the latest complete
> checkpoint.
> All operator states will be reloaded with state written in that checkpoint,
> and the position of the input stream will also be re-winded.
> 
> I don't think there is a way to force a checkpoint before restarting occurs,
> but as I mentioned, that should not be required, because the last complete
> checkpoint will be used.
> Am I missing something in your particular setup?
> 
> Cheers,
> Gordon
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Restart hook and checkpoint

2018-03-18 Thread Ashish Pokharel
Thanks Fabian!

Yes, that is exactly what we are looking to achieve. I looked at the fine-grained recovery FLIP but I am not sure if that will do the trick. Like Fabian mentioned, we haven’t been enabling checkpointing (reasons below). I do understand it might not always be possible to actually take a checkpoint of an operator that is failing, but as long as the whole job graph is not restarted and only that failing operator is restarted, EVEN IF checkpointing is not enabled, I feel like that will do the trick. It is “acceptable” to lose state on that failing operator. Further, if a lifecycle hook is provided in operators, say restart (similar to open / close), perhaps app developers can make an attempt to checkpoint state (if a mechanism is provided to programmatically do so) before restarting. Just some thoughts there… 

Back to our scenario - a lot of the high volume datasets we are processing generally require a few events to be grouped by key, and those events arrive within a few seconds (if not milliseconds) of each other. However, a low percentage of events arrive late, or endpoints just can’t send all of a group's events fast enough, so those events sit in operator memory until all the events in the group arrive or a configured timeout is reached. We are talking about 100s of thousands of endpoints (we will soon be at millions actually) streaming data at high volume here. Hence, currently we are not even enabling checkpointing and are relying on Kafka auto commits for the most part if apps need to be restarted (we were hoping to avoid perf issues and resource constraints - also, because of the transient nature of the datasets, the benefits of not checkpointing seemed higher). However, a single operator failure causing the entire job graph to restart is causing data loss. I think it is necessary to point out that we have slight leeway here in the sense that it is “okay” to have a little data loss (e.g. data loss in the operator that is actually failing) or some duplicates (say one of the Kafka consumers crashed). However, what we are running into is that one operator failing is causing data loss in 100s of operators that are running in parallel. We would really like to avoid that data loss. 

Thanks, Ashish
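
The grouping pattern described above (buffer the handful of events that belong to a key and flush when the group is complete or when a configured timeout fires) is essentially a keyed ProcessFunction with a timer. A minimal sketch follows; the event type, group size, and timeout are made-up placeholders, and because checkpointing is off in this setup, the buffered state is lost on failure, which is exactly the trade-off discussed in this thread.

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Buffers events per key and emits the group either when it is complete or when a
// processing-time timeout fires, whichever comes first.
public class GroupWithTimeoutFunction
        extends ProcessFunction<Tuple2<String, String>, List<String>> {

    private static final int GROUP_SIZE = 3;          // placeholder: events per group
    private static final long TIMEOUT_MS = 30_000L;   // placeholder: configured timeout

    private transient ListState<String> buffered;

    @Override
    public void open(Configuration parameters) {
        buffered = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffered-events", String.class));
    }

    @Override
    public void processElement(Tuple2<String, String> event, Context ctx,
                               Collector<List<String>> out) throws Exception {
        buffered.add(event.f1);

        List<String> events = new ArrayList<>();
        for (String e : buffered.get()) {
            events.add(e);
        }

        if (events.size() >= GROUP_SIZE) {
            out.collect(events);       // group complete: flush immediately
            buffered.clear();
        } else if (events.size() == 1) {
            // first event of the group: start the timeout clock
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + TIMEOUT_MS);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<List<String>> out) throws Exception {
        // timeout reached: flush whatever arrived so far (possibly an incomplete group)
        List<String> events = new ArrayList<>();
        for (String e : buffered.get()) {
            events.add(e);
        }
        if (!events.isEmpty()) {
            out.collect(events);
            buffered.clear();
        }
    }
}

Applied as stream.keyBy(0).process(new GroupWithTimeoutFunction()), with checkpointing left disabled as described above.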

> On Mar 15, 2018, at 3:41 AM, Fabian Hueske  wrote:
> 
> If I understand fine-grained recovery correctly, one would still need to take 
> checkpoints.
> 
> Ashish would like to avoid checkpointing and accept to lose the state of the 
> failed task. 
> However, he would like to avoid losing more state than necessary due to 
> restarting of tasks that did not fail.
> 
> Best, Fabian
> 
> 2018-03-15 1:45 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:
> Hi,
> 
> Have you looked into fine-grained recovery? 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures
>  
> 
> Stefan cc'ed might be able to give you some pointers about configuration.
> 
> Best,
> Aljoscha
> 
> 
>> On 6. Mar 2018, at 22:35, Ashish Pokharel <ashish...@yahoo.com> wrote:
>> 
>> Hi Gordon,
>> 
>> The issue really is we are trying to avoid checkpointing as datasets are 
>> really heavy and all of the states are really transient in a few of our apps 
>> (flushed within few seconds). So high volume/velocity and transient nature 
>> of state make those app good candidates to just not have checkpoints. 
>> 
>> We do have offsets committed to Kafka AND we have “some” tolerance for gap / 
>> duplicate. However, we do want to handle “graceful” restarts / shutdown. For 
>> shutdown, we have been taking savepoints (which works great) but for 
>> restart, we just can’t find a way. 
>> 
>> Bottom line - we are trading off resiliency for resource utilization and 
>> performance but would like to harden apps for production deployments as much 
>> as we can.
>> 
>> Hope that makes sense.
>> 
>> Thanks, Ashish
>> 
>>> On Mar 6, 2018, at 10:19 PM, Tzu-Li Tai <tzuli...@gmail.com> wrote:
>>> 
>>> Hi Ashish,
>>> 
>>> Could you elaborate a bit more on why you think the restart of all operators
>>> lead to data loss?
>>> 
>>> When restart occurs, Flink will restart the job from the latest complete
>>> checkpoint.
>>> All operator states will be reloaded with state written in that checkpoint,
>>> and the position of the input stream will also be re-winded.
>>> 
>>> I don't think there is a way to force a checkpoint before restarting occurs,
>>> but as I mentioned, that should not be required, because the last complete
>>> checkpoint will be used.
>>> Am I missing something in your particular setup?
>>> 
>>> Cheers,
>>> Gordon
>>> 
>>> 
>>> 
>>> --
>>> Sent from: 
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ 
>> 
> 
> 



Re: Restart hook and checkpoint

2018-03-20 Thread Ashish Pokharel
I definitely like the idea of event based checkpointing :) 

Fabian, I do agree with your point that it is not possible to take a rescue checkpoint consistently. The basis here, however, is not around the operator that actually failed. It’s to avoid data loss across 100s (probably 1000s) of parallel operators which are being restarted and are “healthy”. We have 100k (nearing a million soon) elements pushing data. Losing a few seconds' worth of data for a few of them is not good but “acceptable” as long as the damage can be controlled. Of course, we are going to use rocksdb + 2-phase commit with Kafka where we need exactly once guarantees. The proposal of “fine grain recovery” (https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures) seems like a good start, at least from a damage control perspective, but even with that it feels like something like an “event based approach” could be done for the sub-set of the job graph that is “healthy”. 

Thanks, Ashish


> On Mar 20, 2018, at 9:53 AM, Fabian Hueske  wrote:
> 
> Well, that's not that easy to do, because checkpoints must be coordinated and triggered by the JobManager.
> Also, the checkpointing mechanism with flowing checkpoint barriers (to ensure checkpoint consistency) won't work once a task has failed, because it cannot continue processing and forwarding barriers. If the task failed with an OOME, the whole JVM is gone anyway.
> I don't think it is possible to take something like a consistent rescue checkpoint in case of a failure. 
> 
> It might be possible to checkpoint the application state of non-failed tasks, but this would result in data loss for the failed task, and we would need to weigh the use cases for such a feature against the implementation effort.
> Maybe there are better ways to address such use cases.
> 
> Best, Fabian
> 
> 2018-03-20 6:43 GMT+01:00 makeyang:
> Currently there is only a time-based way to trigger a checkpoint. Based on this discussion, I think Flink needs to introduce an event-based way to trigger checkpoints; for example, restarting a task manager should count as an event.
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ 
> 
> 
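
For the subset of jobs mentioned above that do need exactly-once guarantees, the usual building blocks on the state side are a RocksDB backend with incremental checkpoints plus exactly-once checkpointing mode (the end-to-end Kafka part additionally needs Kafka 0.11+ and Flink's 0.11 producer). A minimal sketch, assuming the flink-statebackend-rocksdb dependency is on the classpath; the HDFS path and interval are placeholders.

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExactlyOnceSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // RocksDB state backend with incremental checkpoints to keep checkpoint cost down
        // for large state; the checkpoint URI is a placeholder.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));

        env.enableCheckpointing(60_000); // placeholder interval
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Keep the last checkpoint around on cancellation so the job can be resumed from it.
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        env.fromElements(1, 2, 3).print(); // placeholder pipeline
        env.execute("exactly-once-sketch");
    }
}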



Re: Error running on Hadoop 2.7

2018-03-22 Thread Ashish Pokharel
Hi All,

Looks like we are out of the woods for now (so we think) - we went with the Hadoop-free version and relied on the client libraries on the edge node. 

However, I am still not very confident, as I started digging into that stack as well and realized what Till pointed out (the trace leads to a class that is part of 2.9). I did dig around the env variables and nothing was set. This is a brand new cluster installed a week back and our team is literally the first hands on deck. I will fish around and see if Hortonworks back-ported something for HDP (the dots are still not completely connected but nonetheless, we have a test session and app running in our brand new Prod).

Thanks, Ashish

> On Mar 22, 2018, at 4:47 AM, Till Rohrmann  wrote:
> 
> Hi Ashish,
> 
> the class `RequestHedgingRMFailoverProxyProvider` was only introduced with 
> Hadoop 2.9.0. My suspicion is thus that you start the client with some Hadoop 
> 2.9.0 dependencies on the class path. Could you please check the logs of the 
> client what's on its class path? Maybe you could also share the logs with us. 
> Please also check whether HADOOP_CLASSPATH is set to something suspicious.
> 
> Thanks a lot!
> 
> Cheers,
> Till
> 
> On Wed, Mar 21, 2018 at 6:25 PM, ashish pok  > wrote:
> Hi Piotrek,
> 
> At this point we are simply trying to start a YARN session. 
> 
> BTW, we are on Hortonworks HDP 2.6 which is on 2.7 Hadoop if anyone has 
> experienced similar issues. 
> 
> We actually pulled the 2.6 binaries for the heck of it and ran into the same issues. 
> 
> I guess we are left with getting the non-Hadoop binaries and setting HADOOP_CLASSPATH then?
> 
> -- Ashish
> 
> On Wed, Mar 21, 2018 at 12:03 PM, Piotr Nowojski <pi...@data-artisans.com> wrote:
> Hi,
> 
> > Does some simple word count example works on the cluster after the upgrade?
> 
> If not, maybe your job is pulling some dependency that’s causing this version 
> conflict?
> 
> Piotrek
> 
>> On 21 Mar 2018, at 16:52, ashish pok wrote:
>> 
>> Hi Piotrek,
>> 
>> Yes, this is a brand new Prod environment. 2.6 was in our lab.
>> 
>> Thanks,
>> 
>> -- Ashish
>> 
>> On Wed, Mar 21, 2018 at 11:39 AM, Piotr Nowojski <pi...@data-artisans.com> wrote:
>> Hi,
>> 
>> Have you replaced all of your old Flink binaries with freshly downloaded Hadoop 2.7 versions? Are you sure that something hasn't gotten mixed in during the process?
>> 
>> Does some simple word count example works on the cluster after the upgrade?
>> 
>> Piotrek
>> 
>>> On 21 Mar 2018, at 16:11, ashish pok wrote:
>>> 
>>> Hi All,
>>> 
>>> We ran into a roadblock in our new Hadoop environment, migrating from 2.6 to 2.7. It was supposed to be an easy lift to get a YARN session but it doesn't seem like it :) We are definitely using the 2.7 binaries but it looks like there is a call here to a private method, which screams runtime incompatibility. 
>>> 
>>> Anyone has seen this and have pointers?
>>> 
>>> Thanks, Ashish
>>> Exception in thread "main" java.lang.IllegalAccessError: tried to access 
>>> method 
>>> org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider.getProxyInternal()Ljava/lang/Object;
>>>  from class 
>>> org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider
>>> at 
>>> org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider.init(RequestHedgingRMFailoverProxyProvider.java:75)
>>> at 
>>> org.apache.hadoop.yarn.client.RMProxy.createRMFailoverProxyProvider(RMProxy.java:163)
>>> at 
>>> org.apache.hadoop.yarn.client.RMProxy.createRMProxy(RMProxy.java:94)
>>> at 
>>> org.apache.hadoop.yarn.client.ClientRMProxy.createRMProxy(ClientRMProxy.java:72)
>>> at 
>>> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.serviceStart(YarnClientImpl.java:187)
>>> at 
>>> org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
>>> at 
>>> org.apache.flink.yarn.AbstractYarnClusterDescriptor.getYarnClient(AbstractYarnClusterDescriptor.java:314)
>>> at 
>>> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:417)
>>> at 
>>> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:367)
>>> at 
>>> org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:679)
>>> at 
>>> org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:514)
>>> at 
>>> org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:511)
>>> at java.security.AccessController.doPrivileged(Native Method)
>>> at javax.security.auth.Subject.doAs(Subject.java:422)
>>> at 
>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
>>>  

Re: Restart hook and checkpoint

2018-03-22 Thread Ashish Pokharel
Fabian, that sounds good. Should I recap some bullets in an email and start a 
new thread then?

Thanks, Ashish

> On Mar 22, 2018, at 5:16 AM, Fabian Hueske  wrote:
> 
> Hi Ashish,
> 
> Agreed! 
> I think the right approach would be to gather the requirements and start a 
> discussion on the dev mailing list. 
> Contributors and committers who are more familiar with the checkpointing and 
> recovery internals should discuss a solution that can be integrated and 
> doesn't break with the current mechanism.
> For instance (not sure whether this is feasible or solves the problem) one 
> could only do local checkpoints and not write to the distributed persistent 
> storage. That would bring down checkpointing costs and the recovery life 
> cycle would not need to be radically changed.
> 
> Best, Fabian
> 
> 2018-03-20 22:56 GMT+01:00 Ashish Pokharel <ashish...@yahoo.com>:
> I definitely like the idea of event based checkpointing :) 
> 
> Fabian, I do agree with your point that it is not possible to take a rescue 
> checkpoint consistently. The basis here however is not around the operator 
> that actually failed. It’s to avoid data loss across 100s (probably 1000s of 
> parallel operators) which are being restarted and are “healthy”. We have 100k 
> (nearing million soon) elements pushing data. Losing few seconds worth of 
> data for few is not good but “acceptable” as long as damage can be 
> controlled. Of course, we are going to use rocksdb + 2-phase commit with 
> Kafka where we need exactly once guarantees. The proposal of “fine grain recovery” (https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures) seems like a good start, at least from a damage control perspective, but even with that it feels like something like an “event based approach” could be done for the sub-set of the job graph that is “healthy”. 
> 
> Thanks, Ashish
> 
> 
>> On Mar 20, 2018, at 9:53 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>> 
>> Well, that's not that easy to do, because checkpoints must be coordinated and triggered by the JobManager.
>> Also, the checkpointing mechanism with flowing checkpoint barriers (to ensure checkpoint consistency) won't work once a task has failed, because it cannot continue processing and forwarding barriers. If the task failed with an OOME, the whole JVM is gone anyway.
>> I don't think it is possible to take something like a consistent rescue checkpoint in case of a failure. 
>> 
>> It might be possible to checkpoint the application state of non-failed tasks, but this would result in data loss for the failed task, and we would need to weigh the use cases for such a feature against the implementation effort.
>> Maybe there are better ways to address such use cases.
>> 
>> Best, Fabian
>> 
>> 2018-03-20 6:43 GMT+01:00 makeyang <riverbuild...@hotmail.com>:
>> Currently there is only a time-based way to trigger a checkpoint. Based on this discussion, I think Flink needs to introduce an event-based way to trigger checkpoints; for example, restarting a task manager should count as an event.
>> 
>> 
>> 
>> --
>> Sent from: 
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ 
>> 
> 
> 



Re: Error running on Hadoop 2.7

2018-03-25 Thread Ashish Pokharel
Hi Ken,

Yes - we are on 1.4. Thanks for that link - it certainly explains how things are working now :) 

We currently don’t have the HADOOP_CLASSPATH env var set up, and the “hadoop classpath” command basically points to HDP 2.6 locations (HDP = Hortonworks Data Platform). The best guess I have for this right now is that HDP 2.6 back-ported some 2.9 changes into their distro. This is on my list to get to the bottom of (hopefully no hiccups till prod) - we double checked our Salt Orchestration packages which were used to build the cluster but couldn’t find a reference to Hadoop 2.9. For now, we are moving on with our testing to prepare for deployment with the Hadoop-free version, which uses the hadoop classpath as described in FLINK-7477.  

Thanks, Ashish

> On Mar 23, 2018, at 12:31 AM, Ken Krugler  wrote:
> 
> Hi Ashish,
> 
> Are you using Flink 1.4? If so, what does the “hadoop classpath” command 
> return from the command line where you’re trying to start the job?
> 
> Asking because I’d run into issues with https://issues.apache.org/jira/browse/FLINK-7477, where I had an old version of Hadoop being referenced by the “hadoop" command.
> 
> — Ken
> 
> 
>> On Mar 22, 2018, at 7:05 PM, Ashish Pokharel <ashish...@yahoo.com> wrote:
>> 
>> Hi All,
>> 
>> Looks like we are out of the woods for now (so we think) - we went with 
>> Hadoop free version and relied on client libraries on edge node. 
>> 
>> However, I am still not very confident as I started digging into that stack 
>> as well and realized what Till pointed out (traces leads to a class that is 
>> part of 2.9). I did dig around env variables and nothing was set. This is a 
>> brand new clustered installed a week back and our team is literally the 
>> first hands on deck. I will fish around and see if Hortonworks back-ported 
>> something for HDP (dots are still not completely connected but nonetheless, 
>> we have a test session and app running in our brand new Prod)
>> 
>> Thanks, Ashish
>> 
>>> On Mar 22, 2018, at 4:47 AM, Till Rohrmann <trohrm...@apache.org> wrote:
>>> 
>>> Hi Ashish,
>>> 
>>> the class `RequestHedgingRMFailoverProxyProvider` was only introduced with 
>>> Hadoop 2.9.0. My suspicion is thus that you start the client with some 
>>> Hadoop 2.9.0 dependencies on the class path. Could you please check the 
>>> logs of the client what's on its class path? Maybe you could also share the 
>>> logs with us. Please also check whether HADOOP_CLASSPATH is set to 
>>> something suspicious.
>>> 
>>> Thanks a lot!
>>> 
>>> Cheers,
>>> Till
>>> 
>>> On Wed, Mar 21, 2018 at 6:25 PM, ashish pok <ashish...@yahoo.com> wrote:
>>> Hi Piotrek,
>>> 
>>> At this point we are simply trying to start a YARN session. 
>>> 
>>> BTW, we are on Hortonworks HDP 2.6 which is on 2.7 Hadoop if anyone has 
>>> experienced similar issues. 
>>> 
>>> We actually pulled 2.6 binaries for the heck of it and ran into same 
>>> issues. 
>>> 
>>> I guess we are left with getting non-hadoop binaries and set 
>>> HADOOP_CLASSPATH then?
>>> 
>>> -- Ashish
>>> 
>>> On Wed, Mar 21, 2018 at 12:03 PM, Piotr Nowojski <pi...@data-artisans.com> wrote:
>>> Hi,
>>> 
>>> > Does some simple word count example works on the cluster after the 
>>> > upgrade?
>>> 
>>> If not, maybe your job is pulling some dependency that’s causing this 
>>> version conflict?
>>> 
>>> Piotrek
>>> 
>>>> On 21 Mar 2018, at 16:52, ashish pok <ashish...@yahoo.com> wrote:
>>>> 
>>>> Hi Piotrek,
>>>> 
>>>> Yes, this is a brand new Prod environment. 2.6 was in our lab.
>>>> 
>>>> Thanks,
>>>> 
>>>> -- Ashish
>>>> 
>>>> On Wed, Mar 21, 2018 at 11:39 AM, Piotr Nowojski <pi...@data-artisans.com> wrote:
>>>> Hi,
>>>> 
>>>> Have you replaced all of your old Flink binaries with freshly downloaded (https://flink.apache.org/downloads.html) Hadoop 2.7 versions? Are you sure that something hasn't gotten mixed in during the process?
>>>> 
>>>> Does some simple word count example works on the cluster after the up