Re: Restore from savepoint with Iterations
Let me see if I can do an artificial throttle somewhere. Volume of data is really high and hence trying to avoid round trips through Kafka too. Looks like options are “not so elegant” until FLIP-15. Thanks for pointers again!!! On Monday, May 4, 2020, 11:06 PM, Ken Krugler wrote: Hi Ashish, The workaround we did was to throttle data flowing in the iteration (in code), though not sure if that’s possible for your situation. You could remove the iteration by writing to a Kafka topic at the end of the part of your workflow that is currently an iteration, and then consuming from that same topic as your “iteration” source. — Ken On May 4, 2020, at 7:32 PM, Ashish Pokharel wrote: Hi Ken, Thanks for the quick response! I came across FLIP-15 on my next Google search after I sent the email :) It DEFINITELY looks that way. As I was watching the logs and how the job gets stuck, it does look like the buffer is blocked. But FLIP-15 has not moved further though. So there are no workarounds at all at this point? Perhaps a technique to block the Kafka consumer for some time? Even that may get me going, but it looks like there is a probability of this happening during normal processing as your use case demonstrates. I am using iteration with no timeouts for the prod job, using timeouts only in unit testing. The theory was that in prod the input stream will be indefinite, and sometimes a long lull of no events might happen during maintenance, backlog, etc. I really would like to avoid bloat in the DAG by repeating the same functions with filters and side outputs. Other than the obvious repetition, it will increase the size of the states by a factor.
Even those slowly moving dimensions are not light (around half a billion every day) :) On May 4, 2020, at 10:13 PM, Ken Krugler wrote: Hi Ashish, Wondering if you’re running into the gridlock problem I mention on slide #25 here: https://www.slideshare.net/FlinkForward/flink-forward-san-francisco-2018-ken-krugler-building-a-scalable-focused-web-crawler-with-flink If the iteration path has too much data in it, then the network buffer at the head of the iteration can fill up, and it never clears out because the operator consuming those buffers is blocked writing to the next operator in the iteration, and so on back to the head. We ran into this when outlinks from web pages caused fan-out/amplification of the data being iterated, but maybe you hit it with restoring from state. — Ken On May 4, 2020, at 6:41 PM, Ashish Pokharel wrote: Hi all, Hope everyone is doing well! I am running into what seems like a deadlock (application stalled) situation with a Flink streaming job upon restore from savepoint. The job has a slowly moving stream (S1) that needs to be “stateful” and a continuous stream (S2) which is “joined” with the slowly moving stream (S1). Some level of loss/repetition is acceptable in the continuous stream (S2), which can hence rely on something like Kafka consumer state upon restarts etc. The continuous stream (S2) however needs to be iterated through the states from the slowly moving stream (S1) a few times (mostly 2). States are fair sized (ends up being 15GB on HDFS). When the job is restarted with no continuous data (S2) on the topic, the job starts up, restores state and does its initial checkpoint within 3 minutes. However, when the app is started from a savepoint and the continuous stream (S2) is actually present in Kafka, it seems like the application comes to a halt. Looking at the progress of checkpoints, it seems like every attempt is stuck until timeouts happen at around 10 mins.
If the iteration on the stream is removed, the app can successfully start and checkpoint even when the continuous stream (S2) is flowing in as well. Unfortunately we are working in a hosted environment for both data and platform, hence debugging with thread dumps etc. will be challenging. I couldn’t find a known issue on this but was wondering if anyone has seen such behavior or knows of any issues in the past. It does look like checkpointing has to be set to forced to get an iterative job to checkpoint in the first place (an option that is marked deprecated already - working on version 1.8.2 as of now). I do understand the challenges around consistent checkpointing of an iterative stream. As I mentioned earlier, what I really want to maintain for the most part are the states of slowly moving dimensions. Iteration does solve the problem at hand (multiple loops of logic) pretty gracefully, but not being able to restore from a savepoint will be a show stopper. Will appreciate any pointers / suggestions. Thanks in advance, Ashish -- Ken Krugler, http://www.scaleunlimited.com, custom big data solutions & training: Hadoop, Cascading, Cassandra & Solr
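Ken's suggestion earlier in the thread to throttle data flowing in the iteration (in code) could be sketched as a simple rate limiter invoked from a map/flatMap inside the iteration body. This is only an illustration; the class name and rate below are made up, not from the thread:

```java
// Minimal blocking rate limiter that could be called from a map/flatMap
// in the iteration body to keep in-flight volume below a fixed rate,
// reducing the chance of the head's network buffers filling up.
public class IterationThrottle {
    private final long maxPerSecond;
    private long windowStartMillis;
    private long emittedInWindow;

    public IterationThrottle(long maxPerSecond) {
        this.maxPerSecond = maxPerSecond;
        this.windowStartMillis = System.currentTimeMillis();
    }

    /** Blocks until another element may pass in the current one-second window. */
    public synchronized void acquire() throws InterruptedException {
        long now = System.currentTimeMillis();
        if (now - windowStartMillis >= 1000) {
            windowStartMillis = now;     // new window
            emittedInWindow = 0;
        }
        if (emittedInWindow >= maxPerSecond) {
            long sleep = 1000 - (now - windowStartMillis);
            if (sleep > 0) Thread.sleep(sleep); // wait out the window
            windowStartMillis = System.currentTimeMillis();
            emittedInWindow = 0;
        }
        emittedInWindow++;
    }
}
```

Note this only caps the feedback rate; it does not remove the fundamental gridlock risk FLIP-15 was meant to address.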
Re: Kafka consumer behavior with same group Id, 2 instances of apps in separate cluster?
Thanks - that sounds like a good model to at least explore. We are essentially stateless at this point for this particular need. - Ashish On Tuesday, September 3, 2019, 11:28 PM, Becket Qin wrote: Thanks for the explanation Ashish. Glad you made it work with a custom source. I guess your application is probably stateless. If so, another option might be having a geo-distributed Flink deployment. That means there would be TMs in different datacenters forming a single Flink cluster. This would also come with failover if one of the TMs is down. I am not sure if anyone has tried this. It is probably a heavier solution than using Kafka to do the failover, but the good thing is that you may also do some stateful processing if you have globally accessible storage for the state backup. Thanks, Jiangjie (Becket) Qin On Wed, Sep 4, 2019 at 11:00 AM Ashish Pokharel wrote: Thanks Becket, Sorry for the delayed response. That’s what I thought as well. I built a hacky custom source today directly using the Kafka client, which was able to join the consumer group etc. It works as I expected, but I'm not sure about production readiness for something like that :) The need arises because of (1) business continuity needs and (2) some of the pipelines we are building are close to the network edge and need to run on nodes where we are not allowed to create a cluster (yea - let’s not get into that can of security-related worms :)). We will get there at some point, but for now we are trying to support business continuity on those edge nodes by not actually forming a cluster but using “walled garden” individual Flink servers. I fully understand this is not ideal. And all of this started because some of the work we were doing with Logstash needed to be migrated out, as Logstash wasn’t able to keep up with the data rates unless we threw a ridiculous number of servers at it.
In essence, we have pre-approved constraints to connect to Kafka and southbound interfaces using Logstash, which we need to replace for some datasets as they are too massive for Logstash to keep up with. Hope that explains a bit where our head is at. Thanks, Ashish On Aug 29, 2019, at 11:40 AM, Becket Qin wrote: Hi Ashish, You are right. Flink does not use Kafka-based group management. So if you have two clusters consuming the same topic, they will not divide the partitions. Cross-cluster HA is not quite possible at this point. It would be good to know the reason you want to have such HA and see if Flink meets your requirements in another way. Thanks, Jiangjie (Becket) Qin On Thu, Aug 29, 2019 at 9:19 PM ashish pok wrote: Looks like Flink is using “assign” partitions instead of “subscribe”, which will not allow participating in a group, if I read the code correctly. Has anyone solved this type of problem in the past for active-active HA across 2 clusters using Kafka? - Ashish On Wednesday, August 28, 2019, 6:52 PM, ashish pok wrote: All, I was wondering what the expected default behavior is when the same app is deployed in 2 separate clusters but with the same group Id. In theory the idea was to create active-active across separate clusters, but it seems like both apps are getting all the data from Kafka. Has anyone else tried something similar or have an insight on the expected behavior? I was expecting to see partial data on both apps and to get all data in one app if the other was turned off. Thanks in advance, - Ashish
Re: Kafka consumer behavior with same group Id, 2 instances of apps in separate cluster?
Looks like Flink is using “assign” partitions instead of “subscribe”, which will not allow participating in a group, if I read the code correctly. Has anyone solved this type of problem in the past for active-active HA across 2 clusters using Kafka? - Ashish On Wednesday, August 28, 2019, 6:52 PM, ashish pok wrote: All, I was wondering what the expected default behavior is when the same app is deployed in 2 separate clusters but with the same group Id. In theory the idea was to create active-active across separate clusters, but it seems like both apps are getting all the data from Kafka. Has anyone else tried something similar or have an insight on the expected behavior? I was expecting to see partial data on both apps and to get all data in one app if the other was turned off. Thanks in advance, - Ashish
Kafka consumer behavior with same group Id, 2 instances of apps in separate cluster?
All, I was wondering what the expected default behavior is when the same app is deployed in 2 separate clusters but with the same group Id. In theory the idea was to create active-active across separate clusters, but it seems like both apps are getting all the data from Kafka. Has anyone else tried something similar or have an insight on the expected behavior? I was expecting to see partial data on both apps and to get all data in one app if the other was turned off. Thanks in advance, - Ashish
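The behavior observed above follows from what the thread concludes: the Flink Kafka connector assigns partitions itself instead of joining Kafka's consumer-group protocol, so every job independently computes its own static partition-to-subtask mapping and two separate jobs each end up reading all partitions. A standalone simulation of that idea (the modulo scheme here is illustrative, not Flink's exact distribution code):

```java
import java.util.ArrayList;
import java.util.List;

public class AssignVsSubscribe {
    /** Static assignment as a Flink-style job does it: each job computes its
     *  own mapping of partitions to its parallel subtasks, with no knowledge
     *  of any other job using the same group Id. */
    static List<Integer> partitionsForSubtask(int numPartitions, int parallelism, int subtask) {
        List<Integer> assigned = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            if (p % parallelism == subtask) assigned.add(p);
        }
        return assigned;
    }

    public static void main(String[] args) {
        // Two separate "clusters", each a job with parallelism 2 over 4 partitions.
        // Each job's subtasks together cover ALL partitions, so both jobs
        // receive all the data - exactly the symptom described in the thread.
        for (int job = 0; job < 2; job++) {
            List<Integer> covered = new ArrayList<>();
            for (int subtask = 0; subtask < 2; subtask++) {
                covered.addAll(partitionsForSubtask(4, 2, subtask));
            }
            System.out.println("job " + job + " reads partitions " + covered);
        }
    }
}
```

With Kafka's own "subscribe"-based group management, the broker would instead split the partitions across all members of the group, which is the active-active split the original poster expected.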
Re: JSON to CEP conversion
Awesome. Let me look into it. Thanks a lot! - Ashish On Tuesday, January 22, 2019, 3:31 PM, Dominik Wosiński wrote: Hey Ashish, I have done some abstraction over the logic of CEP, but with the use of Apache Bahir [1], which introduces the Siddhi CEP [2] engine that allows SQL-like definitions of the logic. Best, Dom. [1] https://github.com/apache/bahir [2] https://github.com/wso2/siddhi On Tue, 22 Jan 2019 at 20:20, ashish pok wrote: All, Wondering if anyone in the community has started something along these lines - the idea being that CEP logic is abstracted out to metadata instead. That can then further be exposed out to users from a REST API/UI etc. Obviously, it would really need some additional information like a data catalog etc. for it to be really helpful. But to get started we were thinking of allowing power users to make some modifications to existing rules outside of the CI/CD process. Thanks, - Ashish
JSON to CEP conversion
All, Wondering if anyone in the community has started something along these lines - the idea being that CEP logic is abstracted out to metadata instead. That can then further be exposed out to users from a REST API/UI etc. Obviously, it would really need some additional information like a data catalog etc. for it to be really helpful. But to get started we were thinking of allowing power users to make some modifications to existing rules outside of the CI/CD process. Thanks, - Ashish
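The metadata-driven idea above could be sketched by compiling a rule description into a predicate that a CEP pattern condition then delegates to. This is only a toy: a plain Map stands in for parsed JSON, and all field and method names are invented for illustration:

```java
import java.util.Map;
import java.util.function.Predicate;

public class RuleCompiler {
    /** Compiles a rule described as metadata (field, operator, threshold)
     *  into a predicate over an event's numeric fields. A real version would
     *  parse this triple out of a JSON document edited by power users and
     *  plug the resulting predicate into a CEP pattern's condition. */
    static Predicate<Map<String, Double>> compile(String field, String op, double threshold) {
        switch (op) {
            case ">":  return e -> e.getOrDefault(field, 0.0) > threshold;
            case "<":  return e -> e.getOrDefault(field, 0.0) < threshold;
            case "==": return e -> e.getOrDefault(field, 0.0) == threshold;
            default:   throw new IllegalArgumentException("unknown operator: " + op);
        }
    }
}
```

The benefit the thread is after falls out of this shape: a rule change is a metadata edit served over a REST API, not a code change going through CI/CD.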
Re: Unit / Integration Test Timer
Hi Till, A quick update. I added a mock flatMap function with the following logic:

public MockStreamPause(int pauseSeconds) {
    this.pauseSeconds = pauseSeconds;
}

@Override
public void flatMap(PlatformEvent event, Collector<PlatformEvent> out) throws Exception {
    if (event.getSrc().startsWith(EventTupleGeneratorUtil.PAUSE_EVENT_SRC_PREFIX)) {
        if (pauseSeconds > 0) {
            try {
                Thread.sleep(pauseSeconds * 1000L);
            } catch (InterruptedException intEx) {
                logger.info("Mock pause interrupted", intEx);
            }
        }
    } else {
        out.collect(event);
    }
}

This seems to let me move forward with testing. Let me know if you recommend using the latest test utils with 1.4.2 core as a test. Thanks, Ashish On Monday, September 17, 2018, 9:33:56 AM EDT, ashish pok wrote: Hi Till, I am still on version 1.4.2 and will need some time before we can get a later version certified in our prod env. Timers are definitely not completing in my tests with the 1.4.2 utils; I can see them being registered in the debugger though. Having said that, should I pull the latest test utils only and try it out? Creating a simple example shouldn't take that long; I can create one sometime this week. Thanks, Ashish On Monday, September 17, 2018, 3:53:59 AM EDT, Till Rohrmann wrote: Hi Ashish, I think you are right. In the current master, the system should wait until all timers have completed before terminating. Could you check whether this is the case? If not, then this might indicate a problem. Which version of Flink are you using? I guess it would also be helpful to have access to a working example where the problem is visible. Cheers, Till On Fri, Sep 14, 2018 at 7:57 PM ashish pok wrote: Hi Till, To answer your first question, I currently don't (and honestly am not sure how, other than of course in the IDE where I can use a breakpoint, or if something like Mockito can do it). So did I interpret it correctly that the execution env started using flink-test-utils will essentially tear down once it consumes the last data point (i.e.
the end of the collection I am passing) even though there could be active timers registered? Further, most of our pipelines are using low-level process functions - we toyed around with other windowing and session functions but process functions gave the most flexibility (at least at this point, until we can revisit), and we generate keys for aggregation/windowing somewhere upstream (say in a map, flatMap or another process function). Meaning some pipelines are event-/processing-time agnostic in a sense, although technically within the process functions we will have timers registered etc. This helped us with unbounded keys and sensor data that could potentially be backfilled (i.e. watermarks have passed way back etc.). I wouldn't doubt a bit that there are probably better solutions :) With that background, I am sort of not following your second note about event time and how we can leverage that for testing. Our intent is to create sampled input from results and compare test output against those results (i.e. end-to-end integration tests) as part of our CI/CD. The normal flow seems to work well; just getting the "negative" test cases of timeouts to work seems to be a mystery right now :) So single-operator harnesses don't sound like the right approach. Let me know otherwise. Thanks, On Friday, September 14, 2018, 11:42:17 AM EDT, Till Rohrmann wrote: Hi Ashish, how do you make sure that all of your data is not consumed within a fraction of the 2 seconds? For this it would be better to use event time, which allows you to control how time passes. If you want to test a specific operator you could try out the One/TwoInputStreamOperatorTestHarness. Cheers, Till On Fri, Sep 14, 2018 at 3:36 PM ashish pok wrote: All, Hopefully a quick one. I feel like I have seen this answered a few times before but can't find an appropriate example. I am trying to run a few tests where registered timeouts are invoked (snippet below).
A simple example as shown in the documentation for integration tests (using flink-test-utils) seems to complete even though timers are registered and have not been invoked:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
CollectSink.values.clear();
// create a stream of custom elements and apply transformations
env.fromCollection(t.getTestInputs())
    .process(new TupleProcessFn())
    .keyBy(FactTuple::getKey)
    .process(new NormalizeDataProcessFn(2))
    .addSink(getSink());
env.execute();

I have a 2-second processing timer registered. If I put a breakpoint in the first TupleProcessFn() after a few tuples are collected, I can see onTimer being invoked. So what is the trick here? I went as far as putting in a MapFunction after the second process function that has a sleep, to no avail. Thanks, Ashish
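The behavior Till describes, where a bounded test job tears down as soon as its input collection is exhausted so pending processing-time timers never fire, can be illustrated outside Flink with a plain scheduled executor. The analogy, not Flink's API, is the point; all names here are invented:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class TimerTeardownDemo {
    /** Schedules a "processing-time timer" and then shuts the scheduler down
     *  immediately, the way a bounded test job finishes as soon as its input
     *  collection is consumed. Returns whether the timer ever fired. */
    static boolean timerFiresAfterImmediateShutdown() throws InterruptedException {
        ScheduledExecutorService timers = Executors.newSingleThreadScheduledExecutor();
        AtomicBoolean fired = new AtomicBoolean(false);
        timers.schedule(() -> fired.set(true), 1, TimeUnit.SECONDS); // like onTimer()
        timers.shutdownNow(); // "job" ends: the pending timer is cancelled
        Thread.sleep(1500);   // wait past the timer's deadline to be sure
        return fired.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("timer fired: " + timerFiresAfterImmediateShutdown());
    }
}
```

This is why adding a sleeping MapFunction downstream does not help: the shutdown happens after the source is drained regardless of how slowly later operators run, which matches Till's note that waiting for all timers before terminating is the behavior of later Flink versions, not 1.4.2.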
Re: Unit / Integration Test Timer
Hi Till, I am still on version 1.4.2 and will need some time before we can get a later version certified in our prod env. Timers are definitely not completing in my tests with the 1.4.2 utils; I can see them being registered in the debugger though. Having said that, should I pull the latest test utils only and try it out? Creating a simple example shouldn't take that long; I can create one sometime this week. Thanks, Ashish On Monday, September 17, 2018, 3:53:59 AM EDT, Till Rohrmann wrote: Hi Ashish, I think you are right. In the current master, the system should wait until all timers have completed before terminating. Could you check whether this is the case? If not, then this might indicate a problem. Which version of Flink are you using? I guess it would also be helpful to have access to a working example where the problem is visible. Cheers, Till On Fri, Sep 14, 2018 at 7:57 PM ashish pok wrote: Hi Till, To answer your first question, I currently don't (and honestly am not sure how, other than of course in the IDE where I can use a breakpoint, or if something like Mockito can do it). So did I interpret it correctly that the execution env started using flink-test-utils will essentially tear down once it consumes the last data point (i.e. the end of the collection I am passing) even though there could be active timers registered? Further, most of our pipelines are using low-level process functions - we toyed around with other windowing and session functions but process functions gave the most flexibility (at least at this point, until we can revisit), and we generate keys for aggregation/windowing somewhere upstream (say in a map, flatMap or another process function). Meaning some pipelines are event-/processing-time agnostic in a sense, although technically within the process functions we will have timers registered etc. This helped us with unbounded keys and sensor data that could potentially be backfilled (i.e. watermarks have passed way back etc.).
I wouldn't doubt a bit that there are probably better solutions :) With that background, I am sort of not following your second note about event time and how we can leverage that for testing. Our intent is to create sampled input from results and compare test output against those results (i.e. end-to-end integration tests) as part of our CI/CD. The normal flow seems to work well; just getting the "negative" test cases of timeouts to work seems to be a mystery right now :) So single-operator harnesses don't sound like the right approach. Let me know otherwise. Thanks, On Friday, September 14, 2018, 11:42:17 AM EDT, Till Rohrmann wrote: Hi Ashish, how do you make sure that all of your data is not consumed within a fraction of the 2 seconds? For this it would be better to use event time, which allows you to control how time passes. If you want to test a specific operator you could try out the One/TwoInputStreamOperatorTestHarness. Cheers, Till On Fri, Sep 14, 2018 at 3:36 PM ashish pok wrote: All, Hopefully a quick one. I feel like I have seen this answered a few times before but can't find an appropriate example. I am trying to run a few tests where registered timeouts are invoked (snippet below). A simple example as shown in the documentation for integration tests (using flink-test-utils) seems to complete even though timers are registered and have not been invoked:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
CollectSink.values.clear();
// create a stream of custom elements and apply transformations
env.fromCollection(t.getTestInputs())
    .process(new TupleProcessFn())
    .keyBy(FactTuple::getKey)
    .process(new NormalizeDataProcessFn(2))
    .addSink(getSink());
env.execute();

I have a 2-second processing timer registered. If I put a breakpoint in the first TupleProcessFn() after a few tuples are collected, I can see onTimer being invoked. So what is the trick here?
I went as far as putting in a MapFunction after the second process function that has a sleep, to no avail. Thanks, Ashish
Re: Unit / Integration Test Timer
Hi Till, To answer your first question, I currently don't (and honestly am not sure how, other than of course in the IDE where I can use a breakpoint, or if something like Mockito can do it). So did I interpret it correctly that the execution env started using flink-test-utils will essentially tear down once it consumes the last data point (i.e. the end of the collection I am passing) even though there could be active timers registered? Further, most of our pipelines are using low-level process functions - we toyed around with other windowing and session functions but process functions gave the most flexibility (at least at this point, until we can revisit), and we generate keys for aggregation/windowing somewhere upstream (say in a map, flatMap or another process function). Meaning some pipelines are event-/processing-time agnostic in a sense, although technically within the process functions we will have timers registered etc. This helped us with unbounded keys and sensor data that could potentially be backfilled (i.e. watermarks have passed way back etc.). I wouldn't doubt a bit that there are probably better solutions :) With that background, I am sort of not following your second note about event time and how we can leverage that for testing. Our intent is to create sampled input from results and compare test output against those results (i.e. end-to-end integration tests) as part of our CI/CD. The normal flow seems to work well; just getting the "negative" test cases of timeouts to work seems to be a mystery right now :) So single-operator harnesses don't sound like the right approach. Let me know otherwise. Thanks, On Friday, September 14, 2018, 11:42:17 AM EDT, Till Rohrmann wrote: Hi Ashish, how do you make sure that all of your data is not consumed within a fraction of the 2 seconds? For this it would be better to use event time, which allows you to control how time passes. If you want to test a specific operator you could try out the One/TwoInputStreamOperatorTestHarness.
Cheers, Till On Fri, Sep 14, 2018 at 3:36 PM ashish pok wrote: All, Hopefully a quick one. I feel like I have seen this answered a few times before but can't find an appropriate example. I am trying to run a few tests where registered timeouts are invoked (snippet below). A simple example as shown in the documentation for integration tests (using flink-test-utils) seems to complete even though timers are registered and have not been invoked:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
CollectSink.values.clear();
// create a stream of custom elements and apply transformations
env.fromCollection(t.getTestInputs())
    .process(new TupleProcessFn())
    .keyBy(FactTuple::getKey)
    .process(new NormalizeDataProcessFn(2))
    .addSink(getSink());
env.execute();

I have a 2-second processing timer registered. If I put a breakpoint in the first TupleProcessFn() after a few tuples are collected, I can see onTimer being invoked. So what is the trick here? I went as far as putting in a MapFunction after the second process function that has a sleep, to no avail. Thanks, Ashish
Unit / Integration Test Timer
All, Hopefully a quick one. I feel like I have seen this answered a few times before but can't find an appropriate example. I am trying to run a few tests where registered timeouts are invoked (snippet below). A simple example as shown in the documentation for integration tests (using flink-test-utils) seems to complete even though timers are registered and have not been invoked:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
CollectSink.values.clear();
// create a stream of custom elements and apply transformations
env.fromCollection(t.getTestInputs())
    .process(new TupleProcessFn())
    .keyBy(FactTuple::getKey)
    .process(new NormalizeDataProcessFn(2))
    .addSink(getSink());
env.execute();

I have a 2-second processing timer registered. If I put a breakpoint in the first TupleProcessFn() after a few tuples are collected, I can see onTimer being invoked. So what is the trick here? I went as far as putting in a MapFunction after the second process function that has a sleep, to no avail. Thanks, Ashish
Re: Questions on Unbounded number of keys
Thanks Till, I will try to create an instance of the app with a smaller heap and get a couple of dumps as well. I should be ok to share that on Google Drive. - Ashish On Tuesday, July 31, 2018, 7:49 AM, Till Rohrmann wrote: Hi Ashish, FIRE_AND_PURGE should also clear the window state. Yes, by active windows I mean windows which have not been purged yet. Maybe Aljoscha knows more about why the window state is growing (I would not rule out a bug). Cheers, Till On Tue, Jul 31, 2018 at 1:45 PM ashish pok wrote: Hi Till, Keys are unbounded (a group of events have the same key, but that key doesn't repeat after its window fires, other than some odd delayed events). So basically there is 1 key that will be aligned to a window. When you say key space of active windows, does that include keys for windows that have already fired and could be in the memory footprint? If so, that is basically the problem I would get into, and I am looking for a solution to clean up. Like I said earlier, overriding the trigger to FIRE_AND_PURGE did not help. If I take the same stream and key and refactor it to how Chang is doing it with a ProcessFunction, the issue goes away. If you mean only the currently processing key space of active windows (not the ones that have already fired), then I would say that cannot be the case. We are getting the data from a periodic poll of the same number of devices, and the uniqueness of a key is simply a time identifier prefixed to a device identifier. Even though there could be a little delayed data, the chance of the number of unique keys growing constantly for days is probably none, as the device list is constant. Thanks, Ashish - Ashish On Tuesday, July 31, 2018, 4:05 AM, Till Rohrmann wrote: Hi Ashish, the processing-time session windows need to store state in the StateBackends, and I assume that your key space of active windows is constantly growing. That could explain why you are seeing an ever increasing memory footprint. But without knowing the input stream and what the UDFs do, this is only a guess.
Cheers, Till On Mon, Jul 30, 2018 at 1:43 PM Fabian Hueske wrote: Hi Chang, The state handle objects are not created per key but just once per function instance. Instead they route state accesses to the backend (JVM heap or RocksDB) for the currently active key. Best, Fabian 2018-07-30 12:19 GMT+02:00 Chang Liu : Hi Andrey, Thanks for your reply. My question might be silly, but there is still one part I would like to fully understand. For example, in the following example:

class MyFunction extends KeyedProcessFunction[String, Click, Click] { // keyed by Session ID

  lazy val userId: ValueState[String] = getRuntimeContext.getState(
    new ValueStateDescriptor[String]("userId", BasicTypeInfo.STRING_TYPE_INFO))

  lazy val clicks: ListState[Click] = getRuntimeContext.getListState(
    new ListStateDescriptor[Click]("clicks", createTypeInformation[Click]))

  override def processElement(
      click: Click,
      context: KeyedProcessFunction[String, Click, Click]#Context,
      out: Collector[Click]): Unit = {
    // process, output, clear state if necessary
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[String, Click, Click]#OnTimerContext,
      out: Collector[Click]): Unit = {
    // output and clear state
  }
}

Even though I am regularly clearing the two states, userId and clicks (which means I am cleaning up the values stored in the States), my question is: then what about the two State objects themselves, userId and clicks? These State objects are also created per Session ID, right? If the number of Session IDs is unbounded, then the number of these State objects is also unbounded. That means there are userId-state-1 and clicks-state-1 for session-id-1, userId-state-2 and clicks-state-2 for session-id-2, userId-state-3 and clicks-state-3 for session-id-3, …, which are handled by different (or the same, if two from different ranges, as you call it, are assigned to the same one) keyed operator instances.
I am not concerned about the actual value in the State (which will be managed carefully, if I am clearing them carefully). I am thinking about the State objects themselves, which I have no idea what is happening to them and what will happen to them. Many thanks :) Best regards/祝好, Chang Liu 刘畅 On 26 Jul 2018, at 10:55, Andrey Zagrebin wrote: Hi Chang Liu, The unbounded nature of the stream, keyed or not, should not lead to out of memory. Flink parallel keyed operator instances have a fixed number (parallelism) and just process some range of keyed elements; in your example it is a subrange of session ids. The keyed processed elements (http requests) are objects created when they enter the pipeline and garbage collected after having been processed in streaming fashion. If they arrive very rapidly it can lead to high back pressure from upstream to downstream operators, buffers can become full and the pipeline stops/slows down processing external inputs; it usually means that your pipeline is under-provisioned.
Re: Questions on Unbounded number of keys
Hi Till, Keys are unbounded (a group of events have the same key, but that key doesn't repeat after its window fires, other than some odd delayed events). So basically there is 1 key that will be aligned to a window. When you say key space of active windows, does that include keys for windows that have already fired and could be in the memory footprint? If so, that is basically the problem I would get into, and I am looking for a solution to clean up. Like I said earlier, overriding the trigger to FIRE_AND_PURGE did not help. If I take the same stream and key and refactor it to how Chang is doing it with a ProcessFunction, the issue goes away. If you mean only the currently processing key space of active windows (not the ones that have already fired), then I would say that cannot be the case. We are getting the data from a periodic poll of the same number of devices, and the uniqueness of a key is simply a time identifier prefixed to a device identifier. Even though there could be a little delayed data, the chance of the number of unique keys growing constantly for days is probably none, as the device list is constant. Thanks, Ashish - Ashish On Tuesday, July 31, 2018, 4:05 AM, Till Rohrmann wrote: Hi Ashish, the processing-time session windows need to store state in the StateBackends, and I assume that your key space of active windows is constantly growing. That could explain why you are seeing an ever increasing memory footprint. But without knowing the input stream and what the UDFs do, this is only a guess. Cheers, Till On Mon, Jul 30, 2018 at 1:43 PM Fabian Hueske wrote: Hi Chang, The state handle objects are not created per key but just once per function instance. Instead they route state accesses to the backend (JVM heap or RocksDB) for the currently active key. Best, Fabian 2018-07-30 12:19 GMT+02:00 Chang Liu : Hi Andrey, Thanks for your reply. My question might be silly, but there is still one part I would like to fully understand.
For example, in the following example:

class MyFunction extends KeyedProcessFunction[String, Click, Click] { // keyed by Session ID

  lazy val userId: ValueState[String] = getRuntimeContext.getState(
    new ValueStateDescriptor[String]("userId", BasicTypeInfo.STRING_TYPE_INFO))

  lazy val clicks: ListState[Click] = getRuntimeContext.getListState(
    new ListStateDescriptor[Click]("clicks", createTypeInformation[Click]))

  override def processElement(
      click: Click,
      context: KeyedProcessFunction[String, Click, Click]#Context,
      out: Collector[Click]): Unit = {
    // process, output, clear state if necessary
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[String, Click, Click]#OnTimerContext,
      out: Collector[Click]): Unit = {
    // output and clear state
  }
}

Even though I am regularly clearing the two states, userId and clicks (which means I am cleaning up the values stored in the States), my question is: then what about the two State objects themselves, userId and clicks? These State objects are also created per Session ID, right? If the number of Session IDs is unbounded, then the number of these State objects is also unbounded. That means there are userId-state-1 and clicks-state-1 for session-id-1, userId-state-2 and clicks-state-2 for session-id-2, userId-state-3 and clicks-state-3 for session-id-3, …, which are handled by different (or the same, if two from different ranges, as you call it, are assigned to the same one) keyed operator instances. I am not concerned about the actual value in the State (which will be managed carefully, if I am clearing them carefully). I am thinking about the State objects themselves, which I have no idea what is happening to them and what will happen to them. Many thanks :) Best regards/祝好, Chang Liu 刘畅 On 26 Jul 2018, at 10:55, Andrey Zagrebin wrote: Hi Chang Liu, The unbounded nature of the stream, keyed or not, should not lead to out of memory.
Flink's parallel keyed operator instances are fixed in number (the parallelism), and each just processes some range of keyed elements; in your example, a subrange of session IDs. The keyed processed elements (HTTP requests) are objects created when they enter the pipeline and garbage-collected after having been processed in streaming fashion. If they arrive very rapidly, this can lead to high back pressure between upstream and downstream operators; buffers can become full and the pipeline stops/slows down processing external inputs, which usually means that your pipeline is under-provisioned. The only accumulated data comes from state (windows, user state, etc.), so if you control its memory consumption, as Till described, there should be no other source of out-of-memory errors. Cheers, Andrey

On 25 Jul 2018, at 19:06, Chang Liu wrote: Hi Till, Thanks for your reply. But I think maybe I did not make my question clear. My question is not about whether the states within each keyed operator instance will run out of memory. My question is about whether the unlimited keyed
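[Editor's note] Fabian's point above - the state handle exists once per function instance, and only the per-key entries inside the backend grow - can be illustrated with a minimal plain-Java sketch. This is NOT Flink code; all class and method names here are made up, a map-backed model of a keyed ValueState:

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model (not Flink API) of a keyed state backend: the "handle"
// is this one object per operator instance; only the map entries grow with
// the number of keys.
public class KeyedBackendSketch {
    // the "backend": one entry per active key
    private final Map<String, String> userIdByKey = new HashMap<>();
    private String currentKey; // set by the "framework" before each element

    public void setCurrentKey(String key) { currentKey = key; }

    // ValueState.update() analogue: writes the entry for the current key only
    public void update(String value) { userIdByKey.put(currentKey, value); }

    // ValueState.value() analogue
    public String value() { return userIdByKey.get(currentKey); }

    // ValueState.clear() analogue: removes the entry for the current key,
    // so cleared sessions stop contributing to the memory footprint
    public void clear() { userIdByKey.remove(currentKey); }

    public int activeKeys() { return userIdByKey.size(); }

    public static void main(String[] args) {
        KeyedBackendSketch state = new KeyedBackendSketch();
        state.setCurrentKey("session-1");
        state.update("user-42");
        state.setCurrentKey("session-2");
        state.update("user-99");
        state.clear(); // session-2 is done: its entry is freed
        System.out.println("active keys: " + state.activeKeys());
    }
}
```

In this model, Chang's unbounded session IDs only cost memory while their entries exist; clearing state for a finished session removes its entry, which is exactly why clear() (or state TTL in later Flink versions) matters for unbounded key spaces.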
Re: Implement Joins with Lookup Data
Hi Michael, We are currently using 15 TMs with 4 cores and 4 slots each, 10GB of memory on each TM. We have 15 partitions on Kafka for the stream and 6 for the context/smaller stream. Heap is around 50%, GC is about 150ms, and CPU loads are low. We may be able to reduce resources on this if need be. Thanks, - Ashish

On Wednesday, July 25, 2018, 4:07 AM, Michael Gendelman wrote: Hi Ashish, We are planning for a similar use case and I was wondering if you can share the amount of resources you have allocated for this flow? Thanks, Michael

On Tue, Jul 24, 2018, 18:57 ashish pok wrote: BTW, we got around the bootstrap problem for a similar use case using a “nohup” topic as the input stream. Our CICD pipeline currently passes an initialize option to the app IF there is a need to bootstrap, waits X minutes before taking a savepoint, and restarts the app normally listening to the right topic(s). I believe there is work underway to handle this gracefully using Side Inputs as well. Other than determining the X minutes needed for initialization to complete, we haven't had any issues with this solution - we have over 40 million state refreshes daily and close to 200Mbps of input streams being joined to states. Hope this helps! - Ashish

On Tuesday, July 24, 2018, 11:37 AM, Elias Levy wrote: Alas, this suffers from the bootstrap problem. At the moment Flink does not allow you to pause a source (the positions), so you can't fully consume and preload the accounts or products to perform the join before the positions start flowing. Additionally, Flink SQL does not support materializing an upsert table for the accounts or products to perform the join, so you have to develop your own KeyedProcessFunction, maintain the state, and perform the join on your own if you only want to join against the latest value for each key.

On Tue, Jul 24, 2018 at 7:27 AM Till Rohrmann wrote: Yes, using Kafka, which you initialize with the initial values and then feed changes to the Kafka topic from which you consume, could be a solution.
On Tue, Jul 24, 2018 at 3:58 PM Harshvardhan Agrawal wrote: Hi Till, How would we do the initial hydration of the Product and Account data since it's currently in a relational DB? Do we have to copy over the data to Kafka and then use it from there? Regards, Harsh

On Tue, Jul 24, 2018 at 09:22 Till Rohrmann wrote: Hi Harshvardhan, I agree with Ankit that this problem could actually be solved quite elegantly with Flink's state. If you can ingest the product/account information changes as a stream, you can keep the latest version of it in Flink state by using a co-map function [1, 2]. One input of the co-map function would be the product/account update stream, which updates the respective entries in Flink's state, and the other input stream is the one to be enriched. When receiving input from this stream, one would look up the latest information contained in the operator's state and join it with the incoming event.

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/
[2] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.html

Cheers, Till

On Tue, Jul 24, 2018 at 2:15 PM Harshvardhan Agrawal wrote: Hi, Thanks for your responses. There is no fixed interval for the data being updated. It's more like whenever you onboard a new product, or there is a mandate change, the reference data changes. It's not just enrichment we are doing here. Once we have enriched the data we will be performing a bunch of aggregations using the enriched data. Which approach would you recommend? Regards, Harshvardhan

On Tue, Jul 24, 2018 at 04:04 Jain, Ankit wrote: How often is the product DB updated? Based on that you can store product metadata as state in Flink; maybe set up the state on cluster startup and then update daily, etc. Also, just based on this feature, Flink doesn't seem to add a lot of value on top of Kafka.
As Jörn said below, you can very well store all the events in an external store and then periodically run a cron job to enrich later, since your processing doesn't seem to require absolute real time. Thanks, Ankit

From: Jörn Franke Date: Monday, July 23, 2018 at 10:10 PM To: Harshvardhan Agrawal Cc: Subject: Re: Implement Joins with Lookup Data

For the first one (lookup of single entries) you could use a NoSQL DB (e.g. a key-value store) - a relational database will not scale. Depending on when you need to do the enrichment, you could also first store the data and enrich it later as part of a batch process.

On 24. Jul 2018, at 05:25, Harshvardhan Agrawal wrote: Hi, We are using Flink for financial data enrichment and aggregations. We have Positions data that we are currently receiving from Kafka. We want to enrich that data with reference data like Product and Account information that is present in a relational database. From my understanding of
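[Editor's note] The connected-streams enrichment Till describes in this thread can be sketched without Flink as two handlers sharing a map that plays the role of keyed state. All names here are illustrative, not Flink API:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Plain-Java sketch of the co-map join: one input updates reference data held
// in "state", the other is enriched against the latest version per key.
public class EnrichmentJoinSketch {
    private final Map<String, String> productByKey = new HashMap<>();

    // analogue of map1(): the product/account update stream refreshes state
    public void onReferenceUpdate(String productKey, String productInfo) {
        productByKey.put(productKey, productInfo);
    }

    // analogue of map2(): the position stream is joined with the latest entry
    public Optional<String> onPosition(String productKey, String position) {
        String info = productByKey.get(productKey);
        // empty result = the bootstrap gap Elias mentions: no reference data yet
        return info == null ? Optional.empty()
                            : Optional.of(position + " enriched with " + info);
    }

    public static void main(String[] args) {
        EnrichmentJoinSketch join = new EnrichmentJoinSketch();
        join.onReferenceUpdate("P1", "ProductA");
        System.out.println(join.onPosition("P1", "pos-1").orElse("no ref data"));
    }
}
```

The empty-Optional branch is the crux of the bootstrap problem discussed above: until the reference stream has been consumed, positions arrive with nothing to join against.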
Re: Implement Joins with Lookup Data
The app is checkpointing, so it will pick up if an operation fails. I suppose you mean a TM completely crashes; even in that case another TM would spin up and it "should" pick up from the checkpoint. We are running YARN, but I would assume TM recovery would be possible in any other cluster. I haven't tested this specifically during the init phase, but we have killed TMs during normal processing as a test case in stateful processing and don't remember seeing an issue. - Ashish

On Tuesday, July 24, 2018, 12:31 PM, Harshvardhan Agrawal wrote: What happens when one of your workers dies? Say the machine is dead and not recoverable. How do you recover from that situation? Will the pipeline die and you go over the entire bootstrap process?

On Tue, Jul 24, 2018 at 11:56 ashish pok wrote: BTW, we got around the bootstrap problem for a similar use case using a “nohup” topic as the input stream. Our CICD pipeline currently passes an initialize option to the app IF there is a need to bootstrap, waits X minutes before taking a savepoint, and restarts the app normally listening to the right topic(s). I believe there is work underway to handle this gracefully using Side Inputs as well. Other than determining the X minutes needed for initialization to complete, we haven't had any issues with this solution - we have over 40 million state refreshes daily and close to 200Mbps of input streams being joined to states. Hope this helps! - Ashish

On Tuesday, July 24, 2018, 11:37 AM, Elias Levy wrote: Alas, this suffers from the bootstrap problem. At the moment Flink does not allow you to pause a source (the positions), so you can't fully consume and preload the accounts or products to perform the join before the positions start flowing. Additionally, Flink SQL does not support materializing an upsert table for the accounts or products to perform the join, so you have to develop your own KeyedProcessFunction, maintain the state, and perform the join on your own if you only want to join against the latest value for each key.
On Tue, Jul 24, 2018 at 7:27 AM Till Rohrmann wrote: Yes, using Kafka, which you initialize with the initial values and then feed changes to the Kafka topic from which you consume, could be a solution.

On Tue, Jul 24, 2018 at 3:58 PM Harshvardhan Agrawal wrote: Hi Till, How would we do the initial hydration of the Product and Account data since it's currently in a relational DB? Do we have to copy over the data to Kafka and then use it from there? Regards, Harsh

On Tue, Jul 24, 2018 at 09:22 Till Rohrmann wrote: Hi Harshvardhan, I agree with Ankit that this problem could actually be solved quite elegantly with Flink's state. If you can ingest the product/account information changes as a stream, you can keep the latest version of it in Flink state by using a co-map function [1, 2]. One input of the co-map function would be the product/account update stream, which updates the respective entries in Flink's state, and the other input stream is the one to be enriched. When receiving input from this stream, one would look up the latest information contained in the operator's state and join it with the incoming event.

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/
[2] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.html

Cheers, Till

On Tue, Jul 24, 2018 at 2:15 PM Harshvardhan Agrawal wrote: Hi, Thanks for your responses. There is no fixed interval for the data being updated. It's more like whenever you onboard a new product, or there is a mandate change, the reference data changes. It's not just enrichment we are doing here. Once we have enriched the data we will be performing a bunch of aggregations using the enriched data. Which approach would you recommend? Regards, Harshvardhan

On Tue, Jul 24, 2018 at 04:04 Jain, Ankit wrote: How often is the product DB updated?
Based on that you can store product metadata as state in Flink; maybe set up the state on cluster startup and then update daily, etc. Also, just based on this feature, Flink doesn't seem to add a lot of value on top of Kafka. As Jörn said below, you can very well store all the events in an external store and then periodically run a cron job to enrich later, since your processing doesn't seem to require absolute real time. Thanks, Ankit

From: Jörn Franke Date: Monday, July 23, 2018 at 10:10 PM To: Harshvardhan Agrawal Cc: Subject: Re: Implement Joins with Lookup Data

For the first one (lookup of single entries) you could use a NoSQL DB (e.g. a key-value store) - a relational database will not scale. Depending on when you need to do the enrichment, you could also first store the data and enrich it later as part of a batch process.

On 24. Jul 2018, at 05:25, Harshvardhan Agrawal wrote: Hi, We are using Flink for financial data enrichment and aggregations. We have Positions data t
Re: Implement Joins with Lookup Data
BTW, we got around the bootstrap problem for a similar use case using a “nohup” topic as the input stream. Our CICD pipeline currently passes an initialize option to the app IF there is a need to bootstrap, waits X minutes before taking a savepoint, and restarts the app normally listening to the right topic(s). I believe there is work underway to handle this gracefully using Side Inputs as well. Other than determining the X minutes needed for initialization to complete, we haven't had any issues with this solution - we have over 40 million state refreshes daily and close to 200Mbps of input streams being joined to states. Hope this helps! - Ashish

On Tuesday, July 24, 2018, 11:37 AM, Elias Levy wrote: Alas, this suffers from the bootstrap problem. At the moment Flink does not allow you to pause a source (the positions), so you can't fully consume and preload the accounts or products to perform the join before the positions start flowing. Additionally, Flink SQL does not support materializing an upsert table for the accounts or products to perform the join, so you have to develop your own KeyedProcessFunction, maintain the state, and perform the join on your own if you only want to join against the latest value for each key.

On Tue, Jul 24, 2018 at 7:27 AM Till Rohrmann wrote: Yes, using Kafka, which you initialize with the initial values and then feed changes to the Kafka topic from which you consume, could be a solution.

On Tue, Jul 24, 2018 at 3:58 PM Harshvardhan Agrawal wrote: Hi Till, How would we do the initial hydration of the Product and Account data since it's currently in a relational DB? Do we have to copy over the data to Kafka and then use it from there? Regards, Harsh

On Tue, Jul 24, 2018 at 09:22 Till Rohrmann wrote: Hi Harshvardhan, I agree with Ankit that this problem could actually be solved quite elegantly with Flink's state. If you can ingest the product/account information changes as a stream, you can keep the latest version of it in Flink state by using a co-map function [1, 2].
One input of the co-map function would be the product/account update stream, which updates the respective entries in Flink's state, and the other input stream is the one to be enriched. When receiving input from this stream, one would look up the latest information contained in the operator's state and join it with the incoming event.

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/
[2] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.html

Cheers, Till

On Tue, Jul 24, 2018 at 2:15 PM Harshvardhan Agrawal wrote: Hi, Thanks for your responses. There is no fixed interval for the data being updated. It's more like whenever you onboard a new product, or there is a mandate change, the reference data changes. It's not just enrichment we are doing here. Once we have enriched the data we will be performing a bunch of aggregations using the enriched data. Which approach would you recommend? Regards, Harshvardhan

On Tue, Jul 24, 2018 at 04:04 Jain, Ankit wrote: How often is the product DB updated? Based on that you can store product metadata as state in Flink; maybe set up the state on cluster startup and then update daily, etc. Also, just based on this feature, Flink doesn't seem to add a lot of value on top of Kafka. As Jörn said below, you can very well store all the events in an external store and then periodically run a cron job to enrich later, since your processing doesn't seem to require absolute real time. Thanks, Ankit

From: Jörn Franke Date: Monday, July 23, 2018 at 10:10 PM To: Harshvardhan Agrawal Cc: Subject: Re: Implement Joins with Lookup Data

For the first one (lookup of single entries) you could use a NoSQL DB (e.g. a key-value store) - a relational database will not scale. Depending on when you need to do the enrichment, you could also first store the data and enrich it later as part of a batch process.

On 24.
Jul 2018, at 05:25, Harshvardhan Agrawal wrote: Hi, We are using Flink for financial data enrichment and aggregations. We have Positions data that we are currently receiving from Kafka. We want to enrich that data with reference data like Product and Account information that is present in a relational database. From my understanding of Flink so far, I think there are two ways to achieve this:

1) First approach:
   a) Get positions from Kafka and key by product key.
   b) Perform a lookup from the database for each key and then obtain a Tuple2

2) Second approach:
   a) Get positions from Kafka and key by product key.
   b) Window the keyed stream into, say, 15 seconds each.
   c) For each window, get the unique product keys and perform a single lookup.
   d) Somehow join Positions and Products

In the first approach we will be making a lot of calls to the DB and the solution is very chatty. It's hard to scale this cos t
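[Editor's note] The second approach above (window, dedupe keys, one batched lookup) can be sketched in plain Java. `enrichWindow` and the shapes of the inputs are assumptions for illustration, not Flink API:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Sketch of the "second approach": buffer positions for one window, then make
// a single batched lookup for the distinct product keys instead of one DB
// call per event.
public class WindowedLookupSketch {
    public static Map<String, List<String>> enrichWindow(
            List<String[]> positions, // each element: {productKey, position}
            Function<Set<String>, Map<String, String>> batchLookup) {
        // c) collect the unique product keys seen in this window...
        Set<String> keys = new HashSet<>();
        for (String[] p : positions) keys.add(p[0]);
        // ...and perform a single lookup for all of them
        Map<String, String> products = batchLookup.apply(keys);
        // d) join each buffered position with its product
        Map<String, List<String>> out = new HashMap<>();
        for (String[] p : positions) {
            out.computeIfAbsent(p[0], k -> new ArrayList<>())
               .add(p[1] + "|" + products.get(p[0]));
        }
        return out;
    }
}
```

The trade-off versus the first approach is latency (up to one window length) in exchange for one DB round trip per window instead of one per event.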
Re: Permissions to delete Checkpoint on cancel
Stefan, Can't thank you enough for this write-up. This is an awesome explanation. I had misunderstood the concepts of the RocksDB working directory and the checkpoint FS. My main intent is to boost the performance of RocksDB with the SSDs available locally. Recovery time from HDFS is not much of a concern, but load on HDFS "may" be a concern in the future - we will see. Going over the documentation again after reading your email, it looks like what I intended to do was change my RocksDB working directory to the local SSD, which I believe is the Java IO tmp dir by default, by using the state.backend.rocksdb.checkpointdir option first, and perform any tuning necessary to optimize for SSD. Thanks, - Ashish

On Monday, July 23, 2018, 10:39 AM, Stefan Richter wrote: Hi, ok, let me briefly explain the differences between the local working directory, checkpoint directory, and savepoint directory, and also outline their best practices/requirements/trade-offs. A first easy comment is that typically checkpoints and savepoints have similar requirements, and most users write them to the same FS. The working directory, i.e. the directory for spilling or where RocksDB operates, is transient; it does not require replication because it is not part of the fault-tolerance strategy. Here the main concern is speed, and that is why it is ideally a local, physically attached disk on the TM machine. In contrast to that, checkpoints and savepoints are part of the fault-tolerance strategy, and that is why they typically should be on fault-tolerant file systems. In database terms, think of checkpoints as a recovery mechanism and savepoints as backups. As we usually want to survive node failures, those file systems should be fault-tolerant/replicated, and also accessible for read/write from all TMs and the JM. TMs obviously need to write the data, and read it in recovery. Under node failures, this means that a TM might have to read state that was written on a different machine; that is why TMs should be able to access the files written by other TMs.
The JM is responsible for deleting checkpoints, because TMs might go down, and that is why the JM needs access as well. Those requirements typically hold for most Flink users. However, you might get away with certain particular trade-offs. You can write checkpoints to local disk if:
- everything runs on one machine, or
- (not sure anybody ever did this, but it could work)
  1) you do the cleanup of old checkpoints manually (because the JM cannot reach them), e.g. with scripts, and
  2) you never try to rescale from a checkpoint, and
  3) tasks never migrate to a different machine.
You ignore node/disk/etc. failures, and ensure that your job „owns" the cluster with no other jobs running in parallel. This means accepting data loss in the previous cases. Typically, it should be ok to use a DFS only for checkpoints and savepoints; the local working directories should not go to a DFS or else things will slow down dramatically. If you are just worried about recovery times, you might want to take a look at the local recovery feature [1], which keeps a secondary copy of the state on local disk for faster restores, but still ensures fault tolerance with a primary copy in the DFS. Best, Stefan

[1] https://ci.apache.org/projects/flink/flink-docs-master/ops/state/large_state_tuning.html#task-local-recovery

Am 23.07.2018 um 14:18 schrieb ashish pok : Sorry, just a follow-up. In the absence of a NAS, the best option to go with here is checkpoints and savepoints both on HDFS, and the state backend using local SSDs, then? We were trying to not even hit HDFS other than for savepoints. - Ashish

On Monday, July 23, 2018, 7:45 AM, ashish pok wrote: Stefan, I did have the first point at the back of my mind. I was under the impression, though, that for checkpoints cleanup would be done by the TMs, as they are being taken by the TMs. So for a standalone cluster with its own ZooKeeper for JM high availability, a NAS is a must-have? We were going to go with local checkpoints with access to remote HDFS for savepoints.
This sounds like it will be a bad idea then. Unfortunately we can't run on YARN, and a NAS is also a no-no in one of our datacenters - there is a mountain of security compliance to climb before we will be in Production if we need to go that route. Thanks, Ashish

On Monday, July 23, 2018, 5:10 AM, Stefan Richter wrote: Hi, I am wondering how this can even work properly if you are using a local FS for checkpoints instead of a distributed FS. First, what happens under node failures, if the SSD becomes unavailable, or if a task gets scheduled to a different machine and can no longer access the disk with the corresponding state data, or if you want to scale out? Second, the same problem is also what you can observe with the job manager: how could the checkpoint coordinator, which runs on the JM, access a file on a local FS on a different node to clean up the checkpoint data? The purpose of using a distributed FS here is that a
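[Editor's note] For reference, a minimal flink-conf.yaml sketch of the layout Stefan recommends - checkpoints/savepoints on a distributed FS, the RocksDB working directory on local SSD, and task-local recovery for faster restores. Paths are placeholders; verify the option names against your Flink version's documentation:

```yaml
# Fault-tolerant checkpoints/savepoints on a distributed FS
# (must be readable/writable by all TMs and the JM)
state.backend: rocksdb
state.checkpoints.dir: hdfs:///flink/checkpoints
state.savepoints.dir: hdfs:///flink/savepoints

# RocksDB working directory on a fast local disk (transient, not replicated)
state.backend.rocksdb.localdir: /mnt/ssd/flink/rocksdb

# Keep a secondary local copy of state for faster restores (Flink 1.5+)
state.backend.local-recovery: true
```

This setup keeps HDFS traffic limited to checkpoint/savepoint writes while RocksDB reads and writes hit the local SSD, which matches the intent described in this thread.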
Re: Permissions to delete Checkpoint on cancel
Sorry, just a follow-up. In the absence of a NAS, the best option to go with here is checkpoints and savepoints both on HDFS, and the state backend using local SSDs, then? We were trying to not even hit HDFS other than for savepoints. - Ashish

On Monday, July 23, 2018, 7:45 AM, ashish pok wrote: Stefan, I did have the first point at the back of my mind. I was under the impression, though, that for checkpoints cleanup would be done by the TMs, as they are being taken by the TMs. So for a standalone cluster with its own ZooKeeper for JM high availability, a NAS is a must-have? We were going to go with local checkpoints with access to remote HDFS for savepoints. This sounds like it will be a bad idea then. Unfortunately we can't run on YARN, and a NAS is also a no-no in one of our datacenters - there is a mountain of security compliance to climb before we will be in Production if we need to go that route. Thanks, Ashish

On Monday, July 23, 2018, 5:10 AM, Stefan Richter wrote: Hi, I am wondering how this can even work properly if you are using a local FS for checkpoints instead of a distributed FS. First, what happens under node failures, if the SSD becomes unavailable, or if a task gets scheduled to a different machine and can no longer access the disk with the corresponding state data, or if you want to scale out? Second, the same problem is also what you can observe with the job manager: how could the checkpoint coordinator, which runs on the JM, access a file on a local FS on a different node to clean up the checkpoint data? The purpose of using a distributed FS here is that all TMs and the JM can access the checkpoint files. Best, Stefan > Am 22.07.2018 um 19:03 schrieb Ashish Pokharel : > > All, > > We recently moved our checkpoint directory from HDFS to local SSDs mounted on > Data Nodes (we were starting to see perf impacts on checkpoints etc. as > complex ML apps were spinning up more and more in YARN).
This worked great > other than the fact that when jobs are canceled or canceled with > savepoint, local data is not being cleaned up. In HDFS, checkpoint > directories were cleaned up on Cancel and Cancel with Savepoint as far as I > can remember. I am wondering if it is a permissions issue. Local disks have RWX > permissions for both the yarn and flink headless users (the flink headless user > submits the apps to YARN using our CICD pipeline). > > Appreciate any pointers on this. > > Thanks, Ashish
Re: Permissions to delete Checkpoint on cancel
Stefan, I did have the first point at the back of my mind. I was under the impression, though, that for checkpoints cleanup would be done by the TMs, as they are being taken by the TMs. So for a standalone cluster with its own ZooKeeper for JM high availability, a NAS is a must-have? We were going to go with local checkpoints with access to remote HDFS for savepoints. This sounds like it will be a bad idea then. Unfortunately we can't run on YARN, and a NAS is also a no-no in one of our datacenters - there is a mountain of security compliance to climb before we will be in Production if we need to go that route. Thanks, Ashish

On Monday, July 23, 2018, 5:10 AM, Stefan Richter wrote: Hi, I am wondering how this can even work properly if you are using a local FS for checkpoints instead of a distributed FS. First, what happens under node failures, if the SSD becomes unavailable, or if a task gets scheduled to a different machine and can no longer access the disk with the corresponding state data, or if you want to scale out? Second, the same problem is also what you can observe with the job manager: how could the checkpoint coordinator, which runs on the JM, access a file on a local FS on a different node to clean up the checkpoint data? The purpose of using a distributed FS here is that all TMs and the JM can access the checkpoint files. Best, Stefan > Am 22.07.2018 um 19:03 schrieb Ashish Pokharel : > > All, > > We recently moved our checkpoint directory from HDFS to local SSDs mounted on > Data Nodes (we were starting to see perf impacts on checkpoints etc. as > complex ML apps were spinning up more and more in YARN). This worked great > other than the fact that when jobs are canceled or canceled with > savepoint, local data is not being cleaned up. In HDFS, checkpoint > directories were cleaned up on Cancel and Cancel with Savepoint as far as I > can remember. I am wondering if it is a permissions issue.
Local disks have RWX > permissions for both yarn and flink headless users (flink headless user > submits the apps to YARN using our CICD pipeline). > > Appreciate any pointers on this. > > Thanks, Ashish
Re: Multi-tenancy environment with mutual auth
Nico, This is great news. This is exactly what we are looking for. - Ashish

On Monday, July 16, 2018, 8:28 AM, Nico Kruber wrote: Hi Ashish, this was just merged today for Flink 1.6. Please have a look at https://github.com/apache/flink/pull/6326 to check whether this fulfils your needs. Nico

On 14/07/18 14:02, ashish pok wrote: > All, > > We are running into a blocking production deployment issue. It looks > like Flink inter-communication doesn't support SSL mutual auth. Any > plans/ways to support it? We are going to have to create a DMZ for each > tenant without that, not preferable of course. > > > - Ashish

-- Nico Kruber | Software Engineer, data Artisans
Multi-tenancy environment with mutual auth
All, We are running into a blocking production deployment issue. It looks like Flink inter-communication doesn't support SSL mutual auth. Any plans/ways to support it? We are going to have to create a DMZ for each tenant without that, not preferable of course. - Ashish
Re: Flink Kafka TimeoutException
Our experience on this has been that even if the Kafka cluster is healthy, JVM resource contention in our Flink app, caused by high heap utilization and thereby lost CPU cycles to GC, can also result in this issue. Getting basic JVM metrics like CPU load, GC times, and heap utilization from your app (we use the Graphite reporter) might help point out whether you have the same issue. - Ashish

On Thursday, July 5, 2018, 11:46 AM, Ted Yu wrote: Have you tried increasing the request.timeout.ms parameter (Kafka)? Which Flink / Kafka releases are you using? Cheers

On Thu, Jul 5, 2018 at 5:39 AM Amol S - iProgrammer wrote: Hello, I am using Flink with Kafka and getting the below exception:

org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for helloworld.t-7: 30525 ms has passed since last append

--- *Amol Suryawanshi* Java Developer am...@iprogrammer.com *iProgrammer Solutions Pvt. Ltd.* *Office 103, 104, 1st Floor Pride Portal, Shivaji Housing Society, Bahiratwadi, Near Hotel JW Marriott, Off Senapati Bapat Road, Pune - 411016, MH, INDIA.* *Phone: +91 9689077510 | Skype: amols_iprogrammer* www.iprogrammer.com
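[Editor's note] For anyone tuning this, a hedged sketch of producer properties that commonly interact with the record-expiry TimeoutException. The values are illustrative starting points, not recommendations, and the broker address is a placeholder:

```java
import java.util.Properties;

// Producer settings that interact with "Expiring N record(s)" timeouts:
// a GC-stalled or backpressured producer lets batches age past the timeout
// before they can be sent.
public class ProducerTimeoutConfig {
    public static Properties build() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // placeholder
        // how long a batch may wait before it is considered expired
        props.setProperty("request.timeout.ms", "60000");
        // smaller batches sent more eagerly can help a GC-stalled producer
        props.setProperty("batch.size", "16384");
        props.setProperty("linger.ms", "5");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build());
    }
}
```

As Ashish notes above, raising the timeout only masks the symptom if the real cause is GC pauses in the Flink app, so check JVM metrics first.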
Re: Memory Leak in ProcessingTimeSessionWindow
All, I have been doing a little digging on this, and to Stefan's point the incrementing memory (not necessarily a leak) was essentially because of keys that kept incrementing, as I was using time buckets concatenated with the actual key to make unique sessions. Taking a couple of steps back, the use case is a very simple tumbling window of 15 mins by key. The stream can be viewed simply as: || We have a few pipelines of this type, and one catch here is we wanted to create an app which can process both historical and current data. Historical data comes in mainly because users make ad-hoc requests for "backfill". In order to easily manage the processing pipeline, we make no distinction between historical and current data, as processing is based on event time.

1) Of course, the easiest way to solve this problem is to create a TumblingWindow of 15 mins with some allowed lateness. One issue here was that watermarks are moved forward and backfill data appears to be viewed as late-arriving data, which is correct behavior from Flink's perspective but seems to cause issues in how we are trying to handle the streams.

2) Another issue is that our data collectors are highly distributed - we regularly get data from later event-time buckets faster than from older buckets. It is also more consistent to actually create 15-min buckets using the concept of a session instead. So I am creating a key with | and a session gap of, say, 10 mins. This works perfectly from a business-logic perspective. However, now I am introducing quite a lot of keys which, based on my heap dumps, seem to be hanging around and causing memory issues.

3) We converted the apps to a ProcessFunction and manage all states using the key defined in step (2), registering/unregistering timeouts. Solution (3) seems to be working pretty stably from a memory perspective.
However, it just feels like, with so many high-level APIs available, we are not using them properly and keep reverting to the low-level Process API - in the last month we have migrated about 5 or 6 apps to ProcessFunction now :) For solution (2), it feels like any other session-aggregation use case will have this issue of keys hanging around (e.g. for click streams with user sessions, etc.). Isn't there a way to clear those session windows? Sorry, I just feel like we are missing something simple and have been reverting to low-level APIs instead. Thanks,

On Friday, June 22, 2018, 9:00:14 AM EDT, ashish pok wrote: Stefan, All, If there are no further thoughts on this, I am going to switch my app to the low-level Process API. I still think there is an easier solution here which I am missing, but I will revisit that after I fix the production issue. Thanks, Ashish

On Thursday, June 21, 2018, 7:28 AM, ashish pok wrote: Hi Stefan, Thanks for outlining the steps; they are similar to what we have been doing for OOM issues. However, I was looking for something more high-level on whether state/key handling needs some sort of cleanup specifically that is not done by default. I am about 99% (nothing is certain :)) sure that if I switch this app to a lower-level API like ProcessFunction and manage my own state and timers, I will not see this issue. When I had the same issue in the past, it was for a Global Window, and Fabian pointed out that new keys were constantly being created. I built a simple ProcessFunction for that and the issue went away. I think your first statement sort of hints at that as well. So let me hone in on that. I am processing time-series data for network elements. Keys are 10-min floors of event time concatenated with an element ID. The idea here was to create 10-min buckets of data, with windows that start with the first event in that bucket and fire when no events arrive for 12 or so minutes. So new keys are definitely being created. So, 1- Am I adding to the memory constantly by doing that?
Sounds like it, based on your comments. 2- If so, what's the way to clear those keys when windows fire, if any? 3- It seems like a very simple use case, so now I am wondering if I am even using the right high-level API? Thanks, Ashish Sent from Yahoo Mail for iPhone

On Wednesday, June 20, 2018, 4:17 AM, Stefan Richter wrote: Hi, it is possible that the number of processing-time timers can grow, because internal timers are scoped by time, key, and namespace (typically this means „window", because each key can be part of multiple windows). So if the number of keys in your application is steadily growing, this can happen. To analyse the heap dump, I usually take the following approach:
- Obviously include only reachable objects. If dumps are very big, try to limit the size, or trigger the OOM earlier by configuring a lower heap size. It should still give you the problematic object accumulation, if there is one.
- Look at the statistics of „heavy hitter" classes, i.e. classes for which the instances contribute the most to the overall heap consumption. Sometimes this will show you classes that are also part of classes that
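[Editor's note] The key scheme discussed in this thread (a 10-minute event-time bucket concatenated with the element ID) mints brand-new keys every bucket, which is exactly why timer and window state keyed this way keeps growing. A self-contained sketch of that scheme (names assumed):

```java
// Sketch of the key scheme from this thread: a 10-minute event-time bucket
// prefixed to the element ID. Every new bucket creates brand-new keys, so
// window/timer state keyed this way accumulates unless it is cleared when
// the window fires.
public class BucketedKeySketch {
    static final long BUCKET_MS = 10 * 60 * 1000L; // 10-minute buckets

    public static String bucketedKey(long eventTimeMs, String elementId) {
        long bucketStart = (eventTimeMs / BUCKET_MS) * BUCKET_MS; // floor to bucket
        return bucketStart + "|" + elementId;
    }

    public static void main(String[] args) {
        // the same device yields a different key every 10 minutes
        System.out.println(bucketedKey(0L, "element-1"));
        System.out.println(bucketedKey(BUCKET_MS, "element-1"));
    }
}
```

With a fixed device list, the number of *devices* is bounded but the number of *keys* grows by one per device per bucket forever, which matches the heap-dump observations in this thread.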
Re: How to partition within same physical node in Flink
Thanks Fabian! It sounds like KeyGroup will do the trick if that can be made publicly accessible. On Monday, July 2, 2018, 5:43:33 AM EDT, Fabian Hueske wrote: Hi Ashish, hi Vijay, Flink does not distinguish between different parts of a key (parent, child) in the public APIs. However, there is an internal concept of KeyGroups which is similar to what you call physical partitioning. A KeyGroup is a group of keys that are always processed on the same physical node. The motivation for this feature is operator scaling, because all keys of a group are always processed by the same node and hence their state is always distributed together. However, AFAIK, KeyGroups are not exposed to the user API. Moreover, KeyGroups are distributed to slots, i.e., each KeyGroup is processed by a single slot, but each slot might process multiple key groups. This distribution is done with hash partitioning and is hence hard to tune. There might be a way to tweak this by implementing your own low-level operator, but I'm not sure. Stefan (in CC) might be able to give some hints. Best, Fabian 2018-06-29 18:35 GMT+02:00 Vijay Balakrishnan : Thanks for the clarification, Fabian. This is what I compromised on for my use case - it doesn't exactly do what I intended to do. Partition by a key, and then spawn threads inside that partition to do my task, and then finally repartition again (for a subsequent connect). 
DataStream<CameraWithCube> keyedByCamCameraStream = env
    .addSource(new Source())
    .keyBy((cameraWithCube) -> cameraWithCube.getCam());
AsyncFunction cameraWithCubeAsyncFunction = new SampleAsyncFunction(, nThreads); // spawn threads here with the second key (ts)
DataStream<CameraWithCube> cameraWithCubeDataStreamAsync = AsyncDataStream.orderedWait(
        keyedByCamCameraStream, cameraWithCubeAsyncFunction, timeout, TimeUnit.MILLISECONDS, nThreads)
    .setParallelism(parallelCamTasks); // capacity = max # of in-flight requests; timeout = max time until considered failed
DataStream<CameraWithCube> cameraWithCubeDataStream =
    cameraWithCubeDataStreamAsync.keyBy((cameraWithCube) -> cameraWithCube.getTs());
On Thu, Jun 28, 2018 at 9:22 AM ashish pok wrote: Fabian, All, Along this same line, we have a data source where we have a parent key and a child key. We need to first keyBy parent and then by child. If we want to have physical partitioning in a way where physical partitioning happens first by parent key with localized grouping by child key, is there a need to use a custom partitioner? Obviously we can keyBy twice, but I was wondering if we can minimize the re-partition stress. Thanks, Ashish - Ashish On Thursday, June 28, 2018, 9:02 AM, Fabian Hueske wrote: Hi Vijay, Flink does not provide fine-grained control to place keys on certain slots or machines. When specifying a key, it is up to Flink (i.e., its internal hash function) where the data is processed. This works well for large key spaces, but can be difficult if you have only a few keys. So, even if you keyBy(cam) and handle the parallelization of seq# internally (which I would not recommend), it might still happen that the data of two cameras is processed on the same slot. The only way to change that would be to fiddle with the hash of your keys, but this might give you a completely different distribution when scaling out the application at a later point in time. Best, Fabian 2018-06-26 19:54 GMT+02:00 Vijay Balakrishnan : Hi Fabian, Thanks once again for your reply. 
I need to get the data from each cam/camera into 1 partition/slot and not move the gigantic video data around as much as I perform other operations on it. For example, I can get seq#1 and seq#2 for cam1 in the cam1 partition/slot and then run combine, split, parse, stitch etc. operations on it in multiple threads within the same cam1 partition. I have the CameraKey defined as (cam, seq#) and then keyBy(cam) to get it into 1 partition (e.g. cam1). The idea is to then work within the cam1 partition with the various seq#'s 1, 2 etc. on various threads within the same slot/partition of the TaskManager. The data is stored in EFS keyed on a seq#/cam# folder structure. Our actual problem is managing network bandwidth as a resource in each partition. We want to make sure that the processing of 1 camera (split into multiple seq# tasks) is not running on the same node as the processing of another camera, as in that case the required network bandwidth for storing the output of the process running in the partition would exceed the network bandwidth of the hardware. Camera processing is expected to run on the same hardware as the video decode step, which is an earlier sequential process in the same dataflow pipeline. I guess I might have to use a ThreadPool within each slot (cam partition) to work on each seq#?? TIA On Tue, Jun 26, 2018 at 1:06 AM Fabian Hueske wrote: Hi, keyBy() does no
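Fabian's description of KeyGroups above can be illustrated with a plain-Java sketch of the assignment scheme Flink uses internally: key → key group via a hash modulo the maximum parallelism, then key group → operator subtask via proportional scaling. This mirrors the logic of Flink's internal `KeyGroupRangeAssignment`, but uses `Object.hashCode()` directly where Flink additionally applies a murmur hash, so treat it as an approximation rather than Flink's exact mapping.

```java
class KeyGroups {
    // Key -> key group: hash modulo maxParallelism (Flink additionally murmur-hashes).
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Key group -> operator subtask index (proportional split of the key-group range).
    static int operatorFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128, parallelism = 4;
        for (String cam : new String[] {"cam1", "cam2", "cam3"}) {
            int kg = keyGroupFor(cam, maxParallelism);
            System.out.println(cam + " -> keyGroup " + kg
                    + " -> subtask " + operatorFor(kg, maxParallelism, parallelism));
        }
    }
}
```

Because many key groups map to one subtask and the mapping is hash-based, two cameras can land on the same subtask; that is exactly the behaviour Fabian describes as hard to tune without fiddling with the key's hash.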
Re: How to partition within same physical node in Flink
Fabian, All, Along this same line, we have a data source where we have a parent key and a child key. We need to first keyBy parent and then by child. If we want to have physical partitioning in a way where physical partitioning happens first by parent key with localized grouping by child key, is there a need to use a custom partitioner? Obviously we can keyBy twice, but I was wondering if we can minimize the re-partition stress. Thanks, Ashish - Ashish On Thursday, June 28, 2018, 9:02 AM, Fabian Hueske wrote: Hi Vijay, Flink does not provide fine-grained control to place keys on certain slots or machines. When specifying a key, it is up to Flink (i.e., its internal hash function) where the data is processed. This works well for large key spaces, but can be difficult if you have only a few keys. So, even if you keyBy(cam) and handle the parallelization of seq# internally (which I would not recommend), it might still happen that the data of two cameras is processed on the same slot. The only way to change that would be to fiddle with the hash of your keys, but this might give you a completely different distribution when scaling out the application at a later point in time. Best, Fabian 2018-06-26 19:54 GMT+02:00 Vijay Balakrishnan : Hi Fabian, Thanks once again for your reply. I need to get the data from each cam/camera into 1 partition/slot and not move the gigantic video data around as much as I perform other operations on it. For example, I can get seq#1 and seq#2 for cam1 in the cam1 partition/slot and then run combine, split, parse, stitch etc. operations on it in multiple threads within the same cam1 partition. I have the CameraKey defined as (cam, seq#) and then keyBy(cam) to get it into 1 partition (e.g. cam1). The idea is to then work within the cam1 partition with the various seq#'s 1, 2 etc. on various threads within the same slot/partition of the TaskManager. The data is stored in EFS keyed on a seq#/cam# folder structure. 
Our actual problem is managing network bandwidth as a resource in each partition. We want to make sure that the processing of 1 camera (split into multiple seq# tasks) is not running on the same node as the processing of another camera, as in that case the required network bandwidth for storing the output of the process running in the partition would exceed the network bandwidth of the hardware. Camera processing is expected to run on the same hardware as the video decode step, which is an earlier sequential process in the same dataflow pipeline. I guess I might have to use a ThreadPool within each slot (cam partition) to work on each seq#?? TIA On Tue, Jun 26, 2018 at 1:06 AM Fabian Hueske wrote: Hi, keyBy() does not work hierarchically. Each keyBy() overrides the previous partitioning. You can keyBy(cam, seq#), which guarantees that all records with the same (cam, seq#) are processed by the same parallel instance. However, Flink does not give any guarantees about how the (cam, seq#) partitions are distributed across slots (or even physical nodes). Btw. why is it important that all records of the same cam are processed by the same physical node? Fabian 2018-06-25 21:36 GMT+02:00 Vijay Balakrishnan : I see a .slotSharingGroup for SingleOutputStreamOperator which can put parallel instances of operations in the same TM slot. I also see a CoLocationGroup, but do not see a .coLocationGroup for SingleOutputStreamOperator to put a task on the same slot. It seems CoLocationGroup is defined at the JobVertex level and has nothing to do with SingleOutputStreamOperator. A TaskManager has many slots, and slots have many threads within them. I want to be able to put the cam1 partition (keyBy(cam)) in 1 slot and then use a keyBy(seq#) to run on many threads within that cam1 slot. Vijay On Mon, Jun 25, 2018 at 10:10 AM Vijay Balakrishnan wrote: Thanks, Fabian. Been reading your excellent book on Flink streaming. Can't wait for more chapters. Attached a pic. I have records with seq# 1 and cam1 and cam2. 
I also have records with varying seq#'s. By partitioning on the cam field first (keyBy(cam)), I can get the cam1 partition on the same task manager instance/slot/vCore (???). Can I then have seq# 1 and seq# 2 for the cam1 partition run in different slots/threads on the same TaskManager instance (aka cam1 partition) using keyBy(seq#) & setParallelism()? Can forward strategy be used to achieve this? TIA On Mon, Jun 25, 2018 at 1:03 AM Fabian Hueske wrote: Hi, Flink distributes task instances to slots and does not expose physical machines. Records are partitioned to task instances by hash partitioning. It is also not possible to guarantee that the records in two different operators are sent to the same slot. Sharing information by side-passing it (e.g., via a file on a machine or in a static object) is an anti-pattern and should be avoided. Best, Fabian 2018-06-24 20:52 GMT+02:00 Vijay Balakrishnan : Hi, Need to partition by cameraWithCube.getCam() 1st using parallelCamTasks (passed in as args). Then wit
Re: Memory Leak in ProcessingTimeSessionWindow
Stefan, All, If there are no further thoughts on this I am going to switch my app to the low-level Process API. I still think there is an easier solution here which I am missing, but I will revisit that after I fix the production issue. Thanks, Ashish Sent from Yahoo Mail for iPhone On Thursday, June 21, 2018, 7:28 AM, ashish pok wrote: Hi Stefan, Thanks for outlining the steps; they are similar to what we have been doing for OOM issues. However, I was looking for something more high level on whether state / key handling needs some sort of cleanup that is not done by default. I am about 99% (nothing is certain :)) sure that if I switch this app to a lower-level API like ProcessFunction and manage my own state and timers, I will not see this issue. When I had the same issue in the past it was for GlobalWindow, and Fabian pointed out that new keys were constantly being created. I built a simple ProcessFunction for that and the issue went away. I think your first statement sort of hints at that as well. So let me home in on that. I am processing time series data for network elements. Keys are the 10-minute floor of event time concatenated with the element ID. The idea here was to create 10-minute buckets of data with windows that start with the first event in that bucket and fire when no events arrive for 12 or so minutes. So new keys are definitely being created. So: 1 - Am I adding to memory constantly by doing that? Sounds like it based on your comments. 2 - If so, what's the way to clear those keys when windows fire, if any? 3 - It seems like a very simple use case, so now I am wondering if I am even using the right high-level API? Thanks, Ashish Sent from Yahoo Mail for iPhone On Wednesday, June 20, 2018, 4:17 AM, Stefan Richter wrote: Hi, it is possible that the number of processing time timers can grow, because internal timers are scoped by time, key, and namespace (typically this means „window“, because each key can be part of multiple windows). 
So if the number of keys in your application is steadily growing this can happen. To analyse the heap dump, I usually take the following approach: - obviously include only reachable objects. If dumps are very big, try to limit the size or to trigger the OOM earlier by configuring a lower heap size. It should still give you the problematic object accumulation, if there is one. - look at the statistics of „heavy hitter“ classes, i.e. classes for which the instances contribute the most to the overall heap consumption. Sometimes this will show you classes that are also part of classes that rank higher up, e.g. 1st place could be String, and second place char[]. But you can figure that out in the next step. - explore the instances of the top heavy hitter class(es). If there is a leak and you just randomly sample into some objects, the likelihood is usually *very* high that you catch an object that is part of the leak (as determined in the next step). Otherwise just repeat and sample another object. - inspect the object instance and follow the reference links to the parent objects in the object graph that hold a reference to the leak-object candidate. You will typically end up in some array where the leak accumulates. Inspect the object holding references to the leaking objects. You can see the field values, and this can help to determine if the collection of objects is justified or if data is actually leaking. So in your case, you can start from some InternalTimer or Window object and work backwards through the reference chain to see which class is holding onto them and why (e.g. should they already be gone w.r.t. their timestamp). Walking through the references should be supported by all major heap analysis tools, including JVisualVM that comes with your JDK. You can also use OQL[1] to query for timers or windows that should already be gone. 
Overall I think it could at least be helpful to see the statistics for heavy hitter classes and screenshots of representative reference chains to objects to figure out the problem cause. If it is not possible to share heap dumps, unfortunately I think giving you this strategy is currently the best I can offer to help. Best, Stefan [1] https://blogs.oracle.com/sundararajan/querying-java-heap-with-oql Am 20.06.2018 um 02:33 schrieb ashish pok : All, I took a few heap dumps (when the app restarts and at 2-hour intervals) using jmap; they are 5GB to 8GB. I did some compares and what I can see is that the heap shows data tuple counts (basically instances of the object that is maintained as state) going up slowly. The only things I could possibly relate that to were streaming.api.operators.InternalTimer and streaming.api.windowing.windows.TimeWindow; both were trending up as well. There are definitely a lot more windows created than the increments I could notice, but nevertheless those objects are trending up. The input stream has a very consistent sine-wave throughput. So it really doesn't make sense for windows and tuples to
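The "heavy hitter" step Stefan describes (rank classes by how much their instances contribute to the heap) is normally done in a tool like JVisualVM or Eclipse MAT over a real dump; the toy tally below only illustrates the idea over an in-memory collection, with made-up names, and is not a substitute for a heap analyzer.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class HeavyHitters {
    // Tally instance counts per class name, largest first - the same view a
    // heap tool's class histogram gives, but over a plain collection.
    static List<Map.Entry<String, Long>> histogram(Collection<?> objects) {
        Map<String, Long> counts = new HashMap<>();
        for (Object o : objects) {
            counts.merge(o.getClass().getName(), 1L, Long::sum);
        }
        List<Map.Entry<String, Long>> ranked = new ArrayList<>(counts.entrySet());
        ranked.sort((a, b) -> Long.compare(b.getValue(), a.getValue()));
        return ranked;
    }

    public static void main(String[] args) {
        List<Object> heap = new ArrayList<>();
        for (int i = 0; i < 1000; i++) heap.add("leaked-" + i); // many Strings
        for (int i = 0; i < 10; i++) heap.add(i);               // few Integers
        System.out.println(histogram(heap).get(0)); // top heavy hitter: java.lang.String
    }
}
```

In a real dump, the top entries would be candidates to sample and trace back to their referencing arrays, as described in the steps above.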
Re: Memory Leak in ProcessingTimeSessionWindow
Hi Stefan, Thanks for outlining the steps; they are similar to what we have been doing for OOM issues. However, I was looking for something more high level on whether state / key handling needs some sort of cleanup that is not done by default. I am about 99% (nothing is certain :)) sure that if I switch this app to a lower-level API like ProcessFunction and manage my own state and timers, I will not see this issue. When I had the same issue in the past it was for GlobalWindow, and Fabian pointed out that new keys were constantly being created. I built a simple ProcessFunction for that and the issue went away. I think your first statement sort of hints at that as well. So let me home in on that. I am processing time series data for network elements. Keys are the 10-minute floor of event time concatenated with the element ID. The idea here was to create 10-minute buckets of data with windows that start with the first event in that bucket and fire when no events arrive for 12 or so minutes. So new keys are definitely being created. So: 1 - Am I adding to memory constantly by doing that? Sounds like it based on your comments. 2 - If so, what's the way to clear those keys when windows fire, if any? 3 - It seems like a very simple use case, so now I am wondering if I am even using the right high-level API? Thanks, Ashish Sent from Yahoo Mail for iPhone On Wednesday, June 20, 2018, 4:17 AM, Stefan Richter wrote: Hi, it is possible that the number of processing time timers can grow, because internal timers are scoped by time, key, and namespace (typically this means „window“, because each key can be part of multiple windows). So if the number of keys in your application is steadily growing this can happen. To analyse the heap dump, I usually take the following approach: - obviously include only reachable objects. If dumps are very big, try to limit the size or to trigger the OOM earlier by configuring a lower heap size. 
It should still give you the problematic object accumulation, if there is one. - look at the statistics of „heavy hitter“ classes, i.e. classes for which the instances contribute the most to the overall heap consumption. Sometimes this will show you classes that are also part of classes that rank higher up, e.g. 1st place could be String, and second place char[]. But you can figure that out in the next step. - explore the instances of the top heavy hitter class(es). If there is a leak and you just randomly sample into some objects, the likelihood is usually *very* high that you catch an object that is part of the leak (as determined in the next step). Otherwise just repeat and sample another object. - inspect the object instance and follow the reference links to the parent objects in the object graph that hold a reference to the leak-object candidate. You will typically end up in some array where the leak accumulates. Inspect the object holding references to the leaking objects. You can see the field values, and this can help to determine if the collection of objects is justified or if data is actually leaking. So in your case, you can start from some InternalTimer or Window object and work backwards through the reference chain to see which class is holding onto them and why (e.g. should they already be gone w.r.t. their timestamp). Walking through the references should be supported by all major heap analysis tools, including JVisualVM that comes with your JDK. You can also use OQL[1] to query for timers or windows that should already be gone. Overall I think it could at least be helpful to see the statistics for heavy hitter classes and screenshots of representative reference chains to objects to figure out the problem cause. If it is not possible to share heap dumps, unfortunately I think giving you this strategy is currently the best I can offer to help. 
Best, Stefan [1] https://blogs.oracle.com/sundararajan/querying-java-heap-with-oql Am 20.06.2018 um 02:33 schrieb ashish pok : All, I took a few heap dumps (when the app restarts and at 2-hour intervals) using jmap; they are 5GB to 8GB. I did some compares and what I can see is that the heap shows data tuple counts (basically instances of the object that is maintained as state) going up slowly. The only things I could possibly relate that to were streaming.api.operators.InternalTimer and streaming.api.windowing.windows.TimeWindow; both were trending up as well. There are definitely a lot more windows created than the increments I could notice, but nevertheless those objects are trending up. The input stream has a very consistent sine-wave throughput. So it really doesn't make sense for windows and tuples to keep trending up. There is also no event storm or anything of that sort (i.e. the source stream has been very steady as far as throughput is concerned). Here is a plot of heap utilization: [attachment: heap utilization plot] So it has a typical sine-wave pattern, which is definitely expected as the input stream has the same pattern, but the source doesn't h
Re: Memory Leak in ProcessingTimeSessionWindow
All, I took a few heap dumps (when the app restarts and at 2-hour intervals) using jmap; they are 5GB to 8GB. I did some compares and what I can see is that the heap shows data tuple counts (basically instances of the object that is maintained as state) going up slowly. The only things I could possibly relate that to were streaming.api.operators.InternalTimer and streaming.api.windowing.windows.TimeWindow; both were trending up as well. There are definitely a lot more windows created than the increments I could notice, but nevertheless those objects are trending up. The input stream has a very consistent sine-wave throughput. So it really doesn't make sense for windows and tuples to keep trending up. There is also no event storm or anything of that sort (i.e. the source stream has been very steady as far as throughput is concerned). Here is a plot of heap utilization: So it has a typical sine-wave pattern, which is definitely expected as the input stream has the same pattern, but the source doesn't have an upward trend like the heap utilization shown above. The screenshot above shows a spike from 60% utilization to 80%, and the trend keeps going up until an issue occurs that resets the app. Since processing is based on ProcessingTime, I really would have expected memory to reach a steady state and remain sort of flat from a trending perspective. Appreciate any pointers anyone might have. Thanks, Ashish On Monday, June 18, 2018, 12:54:03 PM EDT, ashish pok wrote: Right, that's where I am headed now but was wondering if there are any “gotchas” I am missing before I try and dig into a few gigs of heap dump. Thanks, Ashish Sent from Yahoo Mail for iPhone On Monday, June 18, 2018, 3:37 AM, Stefan Richter wrote: Hi, can you take a heap dump from a JVM that runs into the problem and share it with us? That would make finding the cause a lot easier. 
Best, Stefan Am 15.06.2018 um 23:01 schrieb ashish pok : All, I have another slow memory leak situation using a basic ProcessingTime session window (earlier it was GlobalWindow related, which Fabian helped clarify). I have a very simple data pipeline:

DataStream processedData = rawTuples
    .window(ProcessingTimeSessionWindows.withGap(Time.seconds(AppConfigs.getWindowSize(780))))
    .trigger(new ProcessingTimePurgeTrigger())
    .apply(new IPSLAMetricWindowFn())
    .name("windowFunctionTuple")
    .map(new TupleToPlatformEventMapFn())
    .name("mapTupleEvent");

I initially didn't even have ProcessingTimePurgeTrigger and it was using the default trigger. In an effort to fix this issue, I created my own Trigger from the default ProcessingTimeTrigger with a simple override of the onProcessingTime method (essentially replacing FIRE with FIRE_AND_PURGE):

@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
    return TriggerResult.FIRE_AND_PURGE;
}

This seems to have done nothing (it may have delayed the issue by a couple of hours - not certain). But I still see heap utilization creep up slowly until it eventually reaches a point where GC starts to take too long, and then the dreaded OOM. For completeness here is my window function (still using the old function interface). It creates a few metrics for reporting and applies logic by looping over the Iterable. NO states are explicitly kept in this function; I needed RichWindowFunction basically to generate metrics. 
public class IPSLAMetricWindowFn extends RichWindowFunction {
    private static final long serialVersionUID = 1L;
    private static Logger logger = LoggerFactory.getLogger(IPSLAMetricWindowFn.class);
    private Meter in;
    private Meter out;
    private Meter error;

    @Override
    public void open(Configuration conf) throws Exception {
        this.in = getRuntimeContext().getMetricGroup()
            .addGroup(AppConstants.APP_METRICS.PROCESS)
            .meter(AppConstants.APP_METRICS.IN, new MeterView(AppConstants.APP_METRICS.INTERVAL_30));
        this.out = getRuntimeContext().getMetricGroup()
            .addGroup(AppConstants.APP_METRICS.PROCESS)
            .meter(AppConstants.APP_METRICS.OUT, new MeterView(AppConstants.APP_METRICS.INTERVAL_30));
        this.error = getRuntimeContext().getMetricGroup()
            .addGroup(AppConstants.APP_METRICS.PROCESS)
            .meter(AppConstants.APP_METRICS.ERROR, new MeterView(AppConstants.APP_METRICS.INTERVAL_30));
        super.open(conf);
    }

    @Override
    public void apply(String key, TimeWindow window, Iterable events, Collector collector) throws Exception {
    }
}

Appreciate any pointers on what could be causing leaks here. This seems pretty straight-forward. Thanks, Ashish
Re: Memory Leak in ProcessingTimeSessionWindow
Right, that's where I am headed now but was wondering if there are any “gotchas” I am missing before I try and dig into a few gigs of heap dump. Thanks, Ashish Sent from Yahoo Mail for iPhone On Monday, June 18, 2018, 3:37 AM, Stefan Richter wrote: Hi, can you take a heap dump from a JVM that runs into the problem and share it with us? That would make finding the cause a lot easier. Best, Stefan Am 15.06.2018 um 23:01 schrieb ashish pok : All, I have another slow memory leak situation using a basic ProcessingTime session window (earlier it was GlobalWindow related, which Fabian helped clarify). I have a very simple data pipeline:

DataStream processedData = rawTuples
    .window(ProcessingTimeSessionWindows.withGap(Time.seconds(AppConfigs.getWindowSize(780))))
    .trigger(new ProcessingTimePurgeTrigger())
    .apply(new IPSLAMetricWindowFn())
    .name("windowFunctionTuple")
    .map(new TupleToPlatformEventMapFn())
    .name("mapTupleEvent");

I initially didn't even have ProcessingTimePurgeTrigger and it was using the default trigger. In an effort to fix this issue, I created my own Trigger from the default ProcessingTimeTrigger with a simple override of the onProcessingTime method (essentially replacing FIRE with FIRE_AND_PURGE):

@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
    return TriggerResult.FIRE_AND_PURGE;
}

This seems to have done nothing (it may have delayed the issue by a couple of hours - not certain). But I still see heap utilization creep up slowly until it eventually reaches a point where GC starts to take too long, and then the dreaded OOM. For completeness here is my window function (still using the old function interface). It creates a few metrics for reporting and applies logic by looping over the Iterable. NO states are explicitly kept in this function; I needed RichWindowFunction basically to generate metrics. 
public class IPSLAMetricWindowFn extends RichWindowFunction {
    private static final long serialVersionUID = 1L;
    private static Logger logger = LoggerFactory.getLogger(IPSLAMetricWindowFn.class);
    private Meter in;
    private Meter out;
    private Meter error;

    @Override
    public void open(Configuration conf) throws Exception {
        this.in = getRuntimeContext().getMetricGroup()
            .addGroup(AppConstants.APP_METRICS.PROCESS)
            .meter(AppConstants.APP_METRICS.IN, new MeterView(AppConstants.APP_METRICS.INTERVAL_30));
        this.out = getRuntimeContext().getMetricGroup()
            .addGroup(AppConstants.APP_METRICS.PROCESS)
            .meter(AppConstants.APP_METRICS.OUT, new MeterView(AppConstants.APP_METRICS.INTERVAL_30));
        this.error = getRuntimeContext().getMetricGroup()
            .addGroup(AppConstants.APP_METRICS.PROCESS)
            .meter(AppConstants.APP_METRICS.ERROR, new MeterView(AppConstants.APP_METRICS.INTERVAL_30));
        super.open(conf);
    }

    @Override
    public void apply(String key, TimeWindow window, Iterable events, Collector collector) throws Exception {
    }
}

Appreciate any pointers on what could be causing leaks here. This seems pretty straight-forward. Thanks, Ashish
Memory Leak in ProcessingTimeSessionWindow
All, I have another slow memory leak situation using a basic ProcessingTime session window (earlier it was GlobalWindow related, which Fabian helped clarify). I have a very simple data pipeline:

DataStream processedData = rawTuples
    .window(ProcessingTimeSessionWindows.withGap(Time.seconds(AppConfigs.getWindowSize(780))))
    .trigger(new ProcessingTimePurgeTrigger())
    .apply(new IPSLAMetricWindowFn())
    .name("windowFunctionTuple")
    .map(new TupleToPlatformEventMapFn())
    .name("mapTupleEvent");

I initially didn't even have ProcessingTimePurgeTrigger and it was using the default trigger. In an effort to fix this issue, I created my own Trigger from the default ProcessingTimeTrigger with a simple override of the onProcessingTime method (essentially replacing FIRE with FIRE_AND_PURGE):

@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
    return TriggerResult.FIRE_AND_PURGE;
}

This seems to have done nothing (it may have delayed the issue by a couple of hours - not certain). But I still see heap utilization creep up slowly until it eventually reaches a point where GC starts to take too long, and then the dreaded OOM. For completeness here is my window function (still using the old function interface). It creates a few metrics for reporting and applies logic by looping over the Iterable. NO states are explicitly kept in this function; I needed RichWindowFunction basically to generate metrics. 
public class IPSLAMetricWindowFn extends RichWindowFunction {
    private static final long serialVersionUID = 1L;
    private static Logger logger = LoggerFactory.getLogger(IPSLAMetricWindowFn.class);
    private Meter in;
    private Meter out;
    private Meter error;

    @Override
    public void open(Configuration conf) throws Exception {
        this.in = getRuntimeContext().getMetricGroup()
            .addGroup(AppConstants.APP_METRICS.PROCESS)
            .meter(AppConstants.APP_METRICS.IN, new MeterView(AppConstants.APP_METRICS.INTERVAL_30));
        this.out = getRuntimeContext().getMetricGroup()
            .addGroup(AppConstants.APP_METRICS.PROCESS)
            .meter(AppConstants.APP_METRICS.OUT, new MeterView(AppConstants.APP_METRICS.INTERVAL_30));
        this.error = getRuntimeContext().getMetricGroup()
            .addGroup(AppConstants.APP_METRICS.PROCESS)
            .meter(AppConstants.APP_METRICS.ERROR, new MeterView(AppConstants.APP_METRICS.INTERVAL_30));
        super.open(conf);
    }

    @Override
    public void apply(String key, TimeWindow window, Iterable events, Collector collector) throws Exception {
    }
}

Appreciate any pointers on what could be causing leaks here. This seems pretty straight-forward. Thanks, Ashish
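The FIRE vs FIRE_AND_PURGE distinction at issue in this thread can be modeled outside Flink: firing emits the buffered window contents, purging additionally drops them. The sketch below is a toy model of those semantics, not Flink's trigger machinery; note that purging clears window *contents* only, while window objects and registered timers are separate bookkeeping that a trigger's `clear()` must handle, which is the gap suspected above.

```java
import java.util.ArrayList;
import java.util.List;

class WindowModel {
    private final List<String> contents = new ArrayList<>();

    void add(String element) { contents.add(element); }

    // FIRE: emit a snapshot, keep contents for possible later re-firing.
    List<String> fire() { return new ArrayList<>(contents); }

    // FIRE_AND_PURGE: emit a snapshot and drop the buffered contents.
    List<String> fireAndPurge() {
        List<String> out = new ArrayList<>(contents);
        contents.clear();
        return out;
    }

    int size() { return contents.size(); }

    public static void main(String[] args) {
        WindowModel w = new WindowModel();
        w.add("a"); w.add("b");
        System.out.println(w.fire().size() + " emitted, " + w.size() + " retained");
        System.out.println(w.fireAndPurge().size() + " emitted, " + w.size() + " retained");
    }
}
```

This is why swapping FIRE for FIRE_AND_PURGE alone can fail to cure a leak: the purged contents were never the only per-window state being retained.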
IoT Use Case, Problem and Thoughts
Fabian, Stephan, All, I started a discussion a while back around having a form of event-based checkpointing policy that would help us in some of our high-volume data pipelines. Here is an effort to put this in front of the community and understand what capabilities can support these types of use cases, how much others feel the same need, and whether this could potentially become a feature that makes it to a user story. Use Case Summary: - Extremely high volume of data (events from consumer devices with a customer base of over 100M) - Multiple events need to be combined using a windowing streaming app grouped by keys (something like a 5-minute floor of the timestamp plus unique identifiers for customer devices) - "Most" events for a group/key arrive within a few seconds if not milliseconds, however events can sometimes be delayed or lost in transport (so delayed-event handling and timeouts will be needed) - Extremely low (pretty vague, but hopefully the details below clarify it more) data loss is acceptable - Because of the volume and transient nature of the source, checkpointing is turned off (this saves on writes to persistence, as states/sessions are active for only a few seconds during processing) Problem Summary: Of course, none of the above is out of the norm for Flink, and as a matter of fact we already have a Flink app doing this. The issue arises when it comes to graceful shutdowns and to operator failures (e.g. Kafka timeouts etc.). On operator failures, the entire job graph restarts, which essentially flushes out in-memory states/sessions. I think there is a feature in the works (not sure if it made it to 1.5) to perform selective restarts, which will control the damage but will still result in data loss. Also, it doesn't help when application restarts are needed. We did try going the savepoint route for explicit restart needs, but I think the memory-backed state backend ran into issues for larger states or something along those lines (not certain). We obviously cannot recover an operator that actually fails, because its own state could be unrecoverable. 
However, it feels like Flink already has a lot of the plumbing to help with the overall problem of allowing some sort of recoverable state to handle graceful shutdowns and restarts with minimal data loss. Solutions: Some in the community commented on my last email with decent ideas, like having an event-based checkpointing trigger (on shutdown, on restart etc.) or life-cycle hooks (onCancel, onRestart etc.) in Functions that can be implemented if this type of behavior is needed. I would appreciate feedback from the community on how useful this might be for others, and from the core contributors on their thoughts as well. Thanks in advance, Ashish
Re: Best way to clean-up states in memory
Thanks Fabian, Kostas. Here is what I had in the Trigger - the idea is to run a bitwise OR until a threshold is reached or a timeout fires (nothing too fancy here). Let me know what you guys think. Like I said, I moved this logic to a ProcessFunction and I haven't seen the same issue I was seeing with this.

@PublicEvolving
public class BitwiseOrTrigger<W extends Window> extends Trigger<FactoredEvent, W> {
    private static final long serialVersionUID = 1L;
    private final int threshold;
    private final long epocDelta;
    private final ReducingStateDescriptor<Tuple2<Integer, Long>> stateDesc =
        new ReducingStateDescriptor<>("bitwiseOr", new BitwiseOr(),
            TypeInformation.of(new TypeHint<Tuple2<Integer, Long>>() {}));

    private BitwiseOrTrigger(int threshold, long allowedLateness) {
        this.threshold = threshold;
        this.epocDelta = allowedLateness;
    }

    @Override
    public TriggerResult onElement(FactoredEvent event, long timestamp, W window, TriggerContext ctx) throws Exception {
        ReducingState<Tuple2<Integer, Long>> currState = ctx.getPartitionedState(stateDesc);
        if (this.epocDelta > 0) {
            ctx.registerProcessingTimeTimer(System.currentTimeMillis() + this.epocDelta);
        }
        currState.add(new Tuple2<>(event.getFactor(), this.epocDelta));
        if (currState.get().f0 >= threshold) {
            currState.clear();
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(stateDesc).clear();
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        ctx.mergePartitionedState(stateDesc);
    }

    @Override
    public String toString() {
        return "BitwiseOrTrigger(" + threshold + ")";
    }

    public static <W extends Window> BitwiseOrTrigger<W> of(int threshold, long expirationEpoc) {
        return new BitwiseOrTrigger<>(threshold, expirationEpoc);
    }

    private static class BitwiseOr implements ReduceFunction<Tuple2<Integer, Long>> {
        private static final long serialVersionUID = 1L;
        @Override
        public Tuple2<Integer, Long> reduce(Tuple2<Integer, Long> tup1, Tuple2<Integer, Long> tup2) throws Exception {
            Tuple2<Integer, Long> retTup = tup1;
            retTup.f0 = tup1.f0 | tup2.f0;
            return retTup;
        }
    }
}

On Monday, May 14, 2018, 6:00:11 AM EDT, Fabian Hueske wrote: Hi Ashish, Did you use per-window state (also called partitioned state) in your Trigger? If yes, you need to make sure that it is completely removed in the clear() method, because processing-time timers won't fire once a window has been purged. So you cannot (fully) rely on timers to clean up per-window state. Best, Fabian 2018-05-14 9:34 GMT+02:00 Kostas Kloudas: Hi Ashish, It would be helpful to share the code of your custom trigger for the first case. Without that, we cannot tell what state you create and how/when you update/clear it. Cheers, Kostas On May 14, 2018, at 1:04 AM, ashish pok wrote: Hi Till, Thanks for getting back. I am sure that will fix the issue, but I feel like that would potentially mask an issue. I have been going back and forth with Fabian on a use case where, for some of our highly transient datasets, it might make sense to just use memory-based state (except of course data loss becomes an issue when apps occasionally hit a problem and the whole job restarts, or the app has to be taken down, etc. - i.e., handling graceful shutdowns/restarts better, essentially). I was on the hook to create a business case and post it back to this forum (which I am hoping I can get around to at some point soon). Long story short, this is one of those datasets. States in this case are either fired and cleared normally or on processing timeout. So technically, unless there is a memory leak in the app code, memory usage should plateau out at a high point. What I was noticing was memory would start to creep up ever so slowly.
I couldn't tell exactly why heap utilization kept growing (ever so slowly, but it had an upward trend for sure) because the states should technically be cleared, if not as part of a reducing function then on timeout. The app, after running for a couple of days, would then run into Java heap issues. So changing to RocksDB will probably fix the issue, but not necessarily a leak of states that should be cleared, IMO. Interestingly, I switched my app from using something like this:

WindowedStream windowedStats = statsStream
    .keyBy(BasicFactTuple::getKey)
    .window(GlobalWindows.create())
    .trigger(BitwiseOrTrigger.of(60, AppConfigs.getWindowSize(5*60*1000)));

to

DataStream processStats = pipStatsStream
    .keyBy(BasicFactTuple::getKey)
    .process(new IfStatsReduceProcessFn(AppConfigs.getWindowSize(5*60*1000), 60));

I basically moved the logic of the trigger to a process function over the weekend. Once I did
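For readers following along without the surrounding classes (FactoredEvent, AppConfigs, and IfStatsReduceProcessFn are from Ashish's codebase and are not shown in the thread), the core accumulation the trigger performs can be sketched in plain Java. The factor values and the threshold of 60 below are made up for illustration:

```java
// Plain-Java sketch of the trigger's core logic: OR event "factors" into an
// accumulator and fire once the accumulated bits reach a threshold.
// Factor values and the threshold are illustrative, not from the thread.
public class BitwiseOrDemo {

    /**
     * Returns how many events were consumed before the OR-accumulator
     * reached the threshold (the point where the trigger would
     * FIRE_AND_PURGE), or -1 if it never does (the timeout path fires).
     */
    public static int eventsUntilFire(int[] factors, int threshold) {
        int acc = 0;
        for (int i = 0; i < factors.length; i++) {
            acc |= factors[i];   // same accumulation as the trigger's reducing state
            if (acc >= threshold) {
                return i + 1;    // trigger would FIRE_AND_PURGE here
            }
        }
        return -1;               // processing-time timer would fire instead
    }

    public static void main(String[] args) {
        // 4|8|16|32 == 60, so the fourth event crosses a threshold of 60
        System.out.println(eventsUntilFire(new int[]{4, 8, 16, 32}, 60));
    }
}
```

The point of the sketch is that the accumulator itself is tiny; the heap growth Ashish describes came from per-window trigger state not being cleaned up, not from the OR logic.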
Re: Best way to clean-up states in memory
Hi Till, Thanks for getting back. I am sure that will fix the issue, but I feel like that would potentially mask an issue. I have been going back and forth with Fabian on a use case where, for some of our highly transient datasets, it might make sense to just use memory-based state (except of course data loss becomes an issue when apps occasionally hit a problem and the whole job restarts, or the app has to be taken down, etc. - i.e., handling graceful shutdowns/restarts better, essentially). I was on the hook to create a business case and post it back to this forum (which I am hoping I can get around to at some point soon). Long story short, this is one of those datasets. States in this case are either fired and cleared normally or on processing timeout. So technically, unless there is a memory leak in the app code, memory usage should plateau out at a high point. What I was noticing was memory would start to creep up ever so slowly. I couldn't tell exactly why heap utilization kept growing (ever so slowly, but it had an upward trend for sure) because the states should technically be cleared, if not as part of a reducing function then on timeout. The app, after running for a couple of days, would then run into Java heap issues. So changing to RocksDB will probably fix the issue, but not necessarily a leak of states that should be cleared, IMO. Interestingly, I switched my app from using something like this:

WindowedStream windowedStats = statsStream
    .keyBy(BasicFactTuple::getKey)
    .window(GlobalWindows.create())
    .trigger(BitwiseOrTrigger.of(60, AppConfigs.getWindowSize(5*60*1000)));

to

DataStream processStats = pipStatsStream
    .keyBy(BasicFactTuple::getKey)
    .process(new IfStatsReduceProcessFn(AppConfigs.getWindowSize(5*60*1000), 60));

I basically moved the logic of the trigger to a process function over the weekend. Once I did that, the heap completely stabilized.
In the trigger implementation, I was using FIRE_AND_PURGE on the trigger condition or on onProcessingTime, and in the process implementation I am using the .clear() method for the same. I seem to have solved the problem by using process, but I'd be interested to understand why the heap would creep up in the trigger scenario. Hope this makes sense, Ashish On Sunday, May 13, 2018, 4:06:59 PM EDT, Till Rohrmann wrote: Hi Ashish, have you tried using Flink's RocksDBStateBackend? If your job accumulates state exceeding the available main memory, then you have to use a state backend which can spill to disk. The RocksDBStateBackend offers you exactly this functionality. Cheers, Till On Mon, Apr 30, 2018 at 3:54 PM, ashish pok wrote: All, I am noticing heap utilization creeping up slowly in a couple of apps, which eventually leads to an OOM issue. The apps only have one process function that caches state. I did make sure I have a clear method invoked when events are collected normally, on exception, and on timeout. Are there any other best practices others follow for memory-backed states? Thanks, -- Ashish
Best way to clean-up states in memory
All, I am noticing heap utilization creeping up slowly in a couple of apps, which eventually leads to an OOM issue. The apps only have one process function that caches state. I did make sure I have a clear method invoked when events are collected normally, on exception, and on timeout. Are there any other best practices others follow for memory-backed states? Thanks, -- Ashish
Re: Scaling down Graphite metrics
Thanks for that tip about the override, will give that a shot at some point. We are already using the interval. -- Ashish On Sun, Apr 15, 2018 at 6:18 PM, Chesnay Schepler wrote: Hello, you can configure the rate at which metrics are reported by setting "metrics.reporter.<name>.interval" as described in the reporter documentation. At this time there is no way to disable specific metrics. You can, however, extend the reporter that you are using and override "MetricReporter#notifyOfAddedMetric(Metric, String, MetricGroup)" to ignore metrics that you aren't interested in. On 13.04.2018 18:52, ashish pok wrote: All, We love Flink's OOTB metrics, but boy, there are a ton :) Any way to scale them down (frequency and the metrics themselves)? Flink apps are becoming a huge source of data right now. Thanks, -- Ashish
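As a concrete example of Chesnay's first suggestion, a reporter entry in flink-conf.yaml might look like the following. The reporter name "grph", the host, and the 60-second interval are illustrative, not from the thread:

```yaml
# flink-conf.yaml -- "grph" is an arbitrary reporter name chosen for this sketch
metrics.reporter.grph.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.grph.host: graphite.example.com
metrics.reporter.grph.port: 2003
# Report every 60 seconds instead of the default 10 to cut metric volume
metrics.reporter.grph.interval: 60 SECONDS
```

Filtering out individual metrics would still require the `notifyOfAddedMetric` override Chesnay describes.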
Scaling down Graphite metrics
All, We love Flink's OOTB metrics, but boy, there are a ton :) Any way to scale them down (frequency and the metrics themselves)? Flink apps are becoming a huge source of data right now. Thanks, -- Ashish
Re: Fw: 1.4.2 - Unable to start YARN containers with more than 1 vCores per Task Manager
Thanks Shashank, I will check it out. -- Ashish On Tue, Apr 3, 2018 at 10:11 AM, shashank734 wrote: Check your YARN configuration: are you using DefaultResourceCalculator, which only uses memory while allocating resources? If so, you have to change it to DominantResourceCalculator: yarn.scheduler.capacity.resource-calculator = org.apache.hadoop.yarn.util.resource.DominantResourceCalculator Check this: https://hortonworks.com/blog/managing-cpu-resources-in-your-hadoop-yarn-clusters/ -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
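Shashank's setting, which the archive flattened, is a standard capacity-scheduler property; as an XML fragment it would look something like this (a sketch of the usual capacity-scheduler.xml form):

```xml
<!-- capacity-scheduler.xml: make YARN account for vCores, not just memory,
     when allocating containers -->
<property>
  <name>yarn.scheduler.capacity.resource-calculator</name>
  <value>org.apache.hadoop.yarn.util.resource.DominantResourceCalculator</value>
</property>
```

With the default DefaultResourceCalculator, YARN reports one vCore per container regardless of what was requested, which matches the symptom in the original email.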
Fw: 1.4.2 - Unable to start YARN containers with more than 1 vCores per Task Manager
Hi All, I had been using the following command successfully in a lab environment on Flink 1.3: yarn-session.sh -n 4 -s 4 -jm 2048 -tm 2048 -Dyarn.containers.vcores=2 -nm infra.test3 As expected, I see 4 TMs with 16 slots, taking 8 vCores from YARN. In a new Prod environment, I am using version 1.4.2 and I am only seeing 1 vCore per container with the same command. I did check the YARN config, and the max vCores allowed per container is configured as 48. Has anyone seen something similar, or have an idea on what we can look into? Might be a YARN config issue? Thanks, Ashish
Re: Error running on Hadoop 2.7
Stephan, we are on 1.4.2. Thanks, -- Ashish On Mon, Mar 26, 2018 at 7:38 AM, Stephan Ewen wrote: If you are on Flink 1.4.0 or 1.4.1, please check if you accidentally have Hadoop in your application jar. That can mess up things with child-first classloading. 1.4.2 should handle Hadoop properly in any case. On Sun, Mar 25, 2018 at 3:26 PM, Ashish Pokharel wrote: Hi Ken, Yes - we are on 1.4. Thanks for that link - it certainly now explains how things are working :) We currently don't have the HADOOP_CLASSPATH env var set up, and the "hadoop classpath" command basically points to HDP 2.6 locations (HDP = Hortonworks Data Platform). The best guess I have right now is that HDP 2.6 back-ported some 2.9 changes into their distro. This is on my list to get to the bottom of (hopefully no hiccups till prod) - we double-checked our Salt orchestration packages which were used to build the cluster but couldn't find a reference to Hadoop 2.9. For now, we are moving on with our testing to prepare for deployment with the Hadoop-free version, which uses the hadoop classpath as described in FLINK-7477. Thanks, Ashish On Mar 23, 2018, at 12:31 AM, Ken Krugler wrote: Hi Ashish, Are you using Flink 1.4? If so, what does the "hadoop classpath" command return from the command line where you're trying to start the job? Asking because I'd run into issues with https://issues.apache.org/jira/browse/FLINK-7477, where I had an old version of Hadoop being referenced by the "hadoop" command. — Ken On Mar 22, 2018, at 7:05 PM, Ashish Pokharel wrote: Hi All, Looks like we are out of the woods for now (so we think) - we went with the Hadoop-free version and relied on client libraries on the edge node. However, I am still not very confident, as I started digging into that stack as well and realized what Till pointed out (the trace leads to a class that is part of 2.9). I did dig around env variables and nothing was set. This is a brand new cluster installed a week back, and our team is literally the first hands on deck.
I will fish around and see if Hortonworks back-ported something for HDP (the dots are still not completely connected, but nonetheless we have a test session and app running in our brand new Prod). Thanks, Ashish On Mar 22, 2018, at 4:47 AM, Till Rohrmann wrote: Hi Ashish, the class `RequestHedgingRMFailoverProxyProvider` was only introduced with Hadoop 2.9.0. My suspicion is thus that you start the client with some Hadoop 2.9.0 dependencies on the class path. Could you please check the logs of the client for what's on its class path? Maybe you could also share the logs with us. Please also check whether HADOOP_CLASSPATH is set to something suspicious. Thanks a lot! Cheers, Till On Wed, Mar 21, 2018 at 6:25 PM, ashish pok wrote: Hi Piotrek, At this point we are simply trying to start a YARN session. BTW, we are on Hortonworks HDP 2.6, which is on Hadoop 2.7, if anyone has experienced similar issues. We actually pulled 2.6 binaries for the heck of it and ran into the same issues. I guess we are left with getting non-Hadoop binaries and setting HADOOP_CLASSPATH then? -- Ashish On Wed, Mar 21, 2018 at 12:03 PM, Piotr Nowojski wrote: Hi, > Does some simple word count example work on the cluster after the upgrade? If not, maybe your job is pulling in some dependency that's causing this version conflict? Piotrek On 21 Mar 2018, at 16:52, ashish pok wrote: Hi Piotrek, Yes, this is a brand new Prod environment. 2.6 was in our lab. Thanks, -- Ashish On Wed, Mar 21, 2018 at 11:39 AM, Piotr Nowojski wrote: Hi, Have you replaced all of your old Flink binaries with freshly downloaded Hadoop 2.7 versions? Are you sure that nothing got mixed in the process? Does some simple word count example work on the cluster after the upgrade? Piotrek On 21 Mar 2018, at 16:11, ashish pok wrote: Hi All, We ran into a roadblock in our new Hadoop environment, migrating from 2.6 to 2.7.
It was supposed to be an easy lift to get a YARN session, but it doesn't seem like it :) We definitely are using 2.7 binaries, but it looks like there is a call here to a private method, which screams runtime incompatibility. Has anyone seen this and have pointers? Thanks, Ashish

Exception in thread "main" java.lang.IllegalAccessError: tried to access method org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider.getProxyInternal()Ljava/lang/Object; from class org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider
    at org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider.init(RequestHedgingRMFailoverProxyProvider.java:75)
    at org.apache.hadoop.yarn.client.RMProxy.createRMFailoverProxyProvider(RMProxy.java:163)
    at org.apache.hadoop.yarn.client.RMProxy.createRMProxy(RMProxy.java:94)
    at org.apache.hadoop.yarn.client.ClientRMProxy.createRMProxy(ClientRMProxy.java:72)
    at org.apache.hadoop.yarn.client.ap
Re: "dynamic" bucketing sink
Hi Christophe, Have you looked at Kite SDK? We do something like this, but using Gobblin and Kite SDK, in a pipeline parallel to Flink. It feels like if you partition by something logical like the topic name, you should be able to sink using Kite SDK. Kite gives you good ways to handle further partitioning, like using timestamps, and also schema evolution if you are using Avro. -- Ashish On Mon, Mar 26, 2018 at 4:57 AM, Timo Walther wrote: Hi Christophe, I think this will require more effort. As far as I know there is no such "dynamic" feature. Have you looked into the bucketing sink code? Maybe you can adapt it to your needs? Otherwise it might also make sense to open an issue to discuss a design for it. Maybe other contributors are interested in this feature as well. Regards, Timo Am 23.03.18 um 18:20 schrieb Christophe Jolif: Hi all, I'm using the nice topic-pattern feature on the KafkaConsumer to read from multiple topics, automatically discovering new topics added into the system. At the end of the processing I'm sinking the result into a Hadoop filesystem using a BucketingSink. All works great until I get the requirement to sink into a different Hadoop filesystem based on the input topic. One way to do this would obviously be to get rid of the topic pattern and start a (similar) job per topic, each of which would get its own sink to its own filesystem, and start new jobs when new topics are added. But that's far from ideal. This would lead to the usual issues with Flink and a dynamic number of jobs (requiring new task slots...), and obviously it would require some external machinery to know new topics have been added and create new jobs, etc. What would be the recommended way to have a "dynamic" BucketingSink that can not only write to several basePaths (not too hard, I guess) but also dynamically add new base paths when new topics come into the system? Thanks, -- Christophe
Re: Error running on Hadoop 2.7
Hi Piotrek, At this point we are simply trying to start a YARN session. BTW, we are on Hortonworks HDP 2.6, which is on Hadoop 2.7, if anyone has experienced similar issues. We actually pulled 2.6 binaries for the heck of it and ran into the same issues. I guess we are left with getting non-Hadoop binaries and setting HADOOP_CLASSPATH then? -- Ashish On Wed, Mar 21, 2018 at 12:03 PM, Piotr Nowojski wrote: Hi, > Does some simple word count example work on the cluster after the upgrade? If not, maybe your job is pulling in some dependency that's causing this version conflict? Piotrek On 21 Mar 2018, at 16:52, ashish pok wrote: Hi Piotrek, Yes, this is a brand new Prod environment. 2.6 was in our lab. Thanks, -- Ashish On Wed, Mar 21, 2018 at 11:39 AM, Piotr Nowojski wrote: Hi, Have you replaced all of your old Flink binaries with freshly downloaded Hadoop 2.7 versions? Are you sure that nothing got mixed in the process? Does some simple word count example work on the cluster after the upgrade? Piotrek On 21 Mar 2018, at 16:11, ashish pok wrote: Hi All, We ran into a roadblock in our new Hadoop environment, migrating from 2.6 to 2.7. It was supposed to be an easy lift to get a YARN session, but it doesn't seem like it :) We definitely are using 2.7 binaries, but it looks like there is a call here to a private method, which screams runtime incompatibility. Has anyone seen this and have pointers?
Thanks, Ashish

Exception in thread "main" java.lang.IllegalAccessError: tried to access method org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider.getProxyInternal()Ljava/lang/Object; from class org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider
    at org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider.init(RequestHedgingRMFailoverProxyProvider.java:75)
    at org.apache.hadoop.yarn.client.RMProxy.createRMFailoverProxyProvider(RMProxy.java:163)
    at org.apache.hadoop.yarn.client.RMProxy.createRMProxy(RMProxy.java:94)
    at org.apache.hadoop.yarn.client.ClientRMProxy.createRMProxy(ClientRMProxy.java:72)
    at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.serviceStart(YarnClientImpl.java:187)
    at org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
    at org.apache.flink.yarn.AbstractYarnClusterDescriptor.getYarnClient(AbstractYarnClusterDescriptor.java:314)
    at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:417)
    at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:367)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:679)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:514)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:511)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:511)
Re: Error running on Hadoop 2.7
Hi Piotrek, Yes, this is a brand new Prod environment. 2.6 was in our lab. Thanks, -- Ashish On Wed, Mar 21, 2018 at 11:39 AM, Piotr Nowojski wrote: Hi, Have you replaced all of your old Flink binaries with freshly downloaded Hadoop 2.7 versions? Are you sure that nothing got mixed in the process? Does some simple word count example work on the cluster after the upgrade? Piotrek On 21 Mar 2018, at 16:11, ashish pok wrote: Hi All, We ran into a roadblock in our new Hadoop environment, migrating from 2.6 to 2.7. It was supposed to be an easy lift to get a YARN session, but it doesn't seem like it :) We definitely are using 2.7 binaries, but it looks like there is a call here to a private method, which screams runtime incompatibility. Has anyone seen this and have pointers? Thanks, Ashish

Exception in thread "main" java.lang.IllegalAccessError: tried to access method org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider.getProxyInternal()Ljava/lang/Object; from class org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider
    at org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider.init(RequestHedgingRMFailoverProxyProvider.java:75)
    at org.apache.hadoop.yarn.client.RMProxy.createRMFailoverProxyProvider(RMProxy.java:163)
    at org.apache.hadoop.yarn.client.RMProxy.createRMProxy(RMProxy.java:94)
    at org.apache.hadoop.yarn.client.ClientRMProxy.createRMProxy(ClientRMProxy.java:72)
    at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.serviceStart(YarnClientImpl.java:187)
    at org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
    at org.apache.flink.yarn.AbstractYarnClusterDescriptor.getYarnClient(AbstractYarnClusterDescriptor.java:314)
    at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:417)
    at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:367)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:679)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:514)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:511)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:511)
Error running on Hadoop 2.7
Hi All, We ran into a roadblock in our new Hadoop environment, migrating from 2.6 to 2.7. It was supposed to be an easy lift to get a YARN session, but it doesn't seem like it :) We definitely are using 2.7 binaries, but it looks like there is a call here to a private method, which screams runtime incompatibility. Has anyone seen this and have pointers? Thanks, Ashish

Exception in thread "main" java.lang.IllegalAccessError: tried to access method org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider.getProxyInternal()Ljava/lang/Object; from class org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider
    at org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider.init(RequestHedgingRMFailoverProxyProvider.java:75)
    at org.apache.hadoop.yarn.client.RMProxy.createRMFailoverProxyProvider(RMProxy.java:163)
    at org.apache.hadoop.yarn.client.RMProxy.createRMProxy(RMProxy.java:94)
    at org.apache.hadoop.yarn.client.ClientRMProxy.createRMProxy(ClientRMProxy.java:72)
    at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.serviceStart(YarnClientImpl.java:187)
    at org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
    at org.apache.flink.yarn.AbstractYarnClusterDescriptor.getYarnClient(AbstractYarnClusterDescriptor.java:314)
    at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:417)
    at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:367)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:679)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:514)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:511)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:511)
Restart hook and checkpoint
All, It looks like Flink's default behavior is to restart all operators on a single operator error - in my case it is a Kafka producer timing out. When this happens, I see logs that all operators are restarted. This essentially leads to data loss. In my case the volume of data is so high that it is becoming very expensive to checkpoint. I was wondering if Flink has a lifecycle hook to attach forced checkpointing before restarting operators. That would solve a dire production issue for us. Thanks, -- Ashish
Re: Task Manager detached under load
@Jelmer, this is Till's last response on the issue. -- Ashish On Mon, Feb 5, 2018 at 5:56 AM, Till Rohrmann wrote: Hi, this sounds like a serious regression wrt Flink 1.3.2 and we should definitely find out what's causing this problem. Given what I see in the logs, the following happens: For some time the JobManager seems to no longer receive heartbeats from the TaskManager. This could be, for example, due to long GC pauses or heavy load which starves the ActorSystem's threads which are responsible for sending the heartbeats. Due to this, the TM's ActorSystem is quarantined, which effectively renders it useless because the JM will henceforth ignore all messages from it. The only way to resolve this problem is to restart the ActorSystem. By setting taskmanager.exit-on-fatal-akka-error to true in flink-conf.yaml, a quarantined TM will shut down. If you run the Flink cluster on YARN, then a substitute TM will be started if you still have some container restarts left. That way, the system should be able to recover. Additionally, you could try to play around with akka.watch.heartbeat.interval and akka.watch.heartbeat.pause, which control the heartbeat interval and the acceptable pause. By increasing the latter, the system should tolerate longer GC pauses and periods of high load. However, this only addresses the symptoms of the problem and I'd like to find out what's causing it. In order to further debug the problem, it would be really helpful to obtain the logs of the JobManager and the TaskManagers at DEBUG log level and with taskmanager.debug.memory.startLogThread set to true. Additionally, it would be interesting to see what's happening on the TaskManagers when you observe high load. So obtaining a profiler dump via VisualVM would be great. And last but not least, it also helps to learn more about the job you're running. What kind of connectors is it using? Are you using Flink's metric system? How is the Flink cluster deployed?
Which other libraries are you using in your job? Thanks a lot for your help! Cheers, Till On Tue, Jan 30, 2018 at 8:59 PM, Cliff Resnick wrote: I've seen a similar issue while running successive Flink SQL batches on 1.4. In my case, the Job Manager would fail with log output about unreachability (with an additional statement about something going "horribly wrong"). Under workload pressure, I reverted to 1.3.2, where everything works perfectly, but we will try again soon on 1.4. When we do, I will post the actual log output. This was on YARN in AWS, with akka.ask.timeout = 60s. On Wed, Jan 24, 2018 at 9:57 PM, Ashish Pokharel wrote: I haven't gotten much further with this. It doesn't look GC related - at least the GC counters were not that atrocious. However, my main concern is: once the load subsides, why aren't the TM and JM connecting again? That doesn't look normal. I could definitely tell the JM was listening on the port, and from the logs it does appear the TM is trying to message the JM that it is still alive. Thanks, Ashish On Jan 23, 2018, at 12:31 PM, Lasse Nedergaard wrote: Hi. Did you find a reason for the detaching? I sometimes see the same on our system running Flink 1.4 on DC/OS. I have enabled taskmanager.debug.memory.startLogThread for debugging. Med venlig hilsen / Best regards, Lasse Nedergaard Den 20. jan. 2018 kl. 12.57 skrev Kien Truong: Hi, You should enable and check your garbage collection log. We've encountered cases where the Task Manager disassociated due to long GC pauses. Regards, Kien On 1/20/2018 1:27 AM, ashish pok wrote: Hi All, We have hit some load-related issues and were wondering if anyone has some suggestions. We are noticing task managers and job managers being detached from each other under load and never really syncing up again. As a result, the Flink session shows 0 slots available for processing. Even though the apps are configured to restart, it isn't really helping, as there are no slots available to run them.
Here are excerpts from the logs that seemed relevant (I am trimming out the rest of the logs for brevity). Job Manager:

2018-01-19 12:38:00,423 INFO org.apache.flink.runtime.jobmanager.JobManager - Starting JobManager (Version: 1.4.0, Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40 UTC)
2018-01-19 12:38:00,792 INFO org.apache.flink.runtime.jobmanager.JobManager - Maximum heap size: 16384 MiBytes
2018-01-19 12:38:00,794 INFO org.apache.flink.runtime.jobmanager.JobManager - Hadoop version: 2.6.5
2018-01-19 12:38:00,794 INFO org.apache.flink.runtime.jobmanager.JobManager - JVM Options:
2018-01-19 12:38:00,794 INFO org.apache.flink.runtime.jobmanager.JobManager - -Xms16384m
2018-01-19 12:38:00,794 INFO org.apache.flink.runtime.jobmanager.JobManager - -Xmx16384m
2018-01-19 1
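The settings Till mentions would go into flink-conf.yaml. A sketch of what that might look like follows; the interval and pause values are illustrative, not recommendations from the thread:

```yaml
# flink-conf.yaml (Flink 1.4-era options discussed in this thread)

# Shut down a quarantined TaskManager so the resource manager can start a substitute
taskmanager.exit-on-fatal-akka-error: true

# Tolerate longer GC pauses / load spikes before declaring a peer dead
# (values below are illustrative)
akka.watch.heartbeat.interval: 10 s
akka.watch.heartbeat.pause: 60 s

# Extra DEBUG-level logging of TaskManager memory usage, as Till requested
taskmanager.debug.memory.startLogThread: true
```

As Till notes, raising the heartbeat pause only masks the symptom; the DEBUG logs and a profiler dump are what would identify the cause.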
Re: Task manager not able to rejoin job manager after network hiccup
We see the same in 1.4. I don't think we saw this in 1.3. I had started a thread a while back on this; Till asked for more details, and I haven't had a chance to get back to him. If you can reproduce this easily, perhaps you can get to it faster. I will find the thread and resend. Thanks, -- Ashish On Fri, Feb 23, 2018 at 9:56 AM, jelmer wrote: We found out there's a taskmanager.exit-on-fatal-akka-error property that will restart Flink in this situation, but it is not enabled by default, and it feels like a rather blunt tool. I'd expect systems like this to be more resilient to this. On 23 February 2018 at 14:42, Aljoscha Krettek wrote: @Till Is this the expected behaviour, or do you suspect something could be going wrong? On 23. Feb 2018, at 08:59, jelmer wrote: We've observed on our Flink 1.4.0 setup that if for some reason the networking between the task manager and the job manager gets disrupted, then the task manager is never able to reconnect. You'll end up with messages like these getting printed to the log repeatedly: Trying to register at JobManager akka.tcp://flink@jobmanager:6123/user/jobmanager (attempt 17, timeout: 3 milliseconds) Quarantined address [akka.tcp://flink@jobmanager:6123] is still unreachable or has not been restarted. Keeping it quarantined. Or alternatively: Tried to associate with unreachable remote address [akka.tcp://flink@jobmanager:6123]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has quarantined this system. No further associations to the remote system are possible until this system is restarted.] But it never recovers until you restart either the job manager or the task manager. I was able to successfully reproduce this behaviour in two docker containers here: https://github.com/jelmerk/flink-worker-not-rejoining Has anyone else seen this problem?
Re: Strata San Jose
Awesome, I will send a note from my work email :) -- Ashish On Fri, Feb 9, 2018 at 5:12 AM, Fabian Hueske wrote: Hi Ashish, I'll be at Strata San Jose and give two talks. Just ping me and we can meet there :-) Cheers, Fabian 2018-02-09 0:53 GMT+01:00 ashish pok : Wondering if any of the core Flink team members are planning to be at the conference? It would be great to meet in person. Thanks, -- Ashish
Strata San Jose
Wondering if any of the core Flink team members are planning to be at the conference? It would be great to meet in person. Thanks, -- Ashish
Kafka Producer timeout causing data loss
Team, One more question to the community regarding hardening Flink Apps. Let me start off by saying we do have known Kafka bottlenecks which we are in the midst of resolving. So during certain times of day, a lot of our Flink Apps are seeing Kafka Producer timeout issues. Most of the logs are some flavor of this:

java.lang.Exception: Failed to send data to Kafka: Expiring 28 record(s) for dev.-.-.-.-.trans-2: 5651 ms has passed since batch creation plus linger time
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invokeInternal(FlinkKafkaProducer010.java:302)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.processElement(FlinkKafkaProducer010.java:421)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 28 record(s) for dev.-.-.-.-.trans-2: 5651 ms has passed since batch creation plus linger time

Timeouts are not necessarily good, but I am sure we understand this is bound to happen (hopefully less often). The issue for us however is that it almost looks like Flink is stopping and restarting all operators (a lot of other operators including Map, Reduce and Process functions, if not all) along with the Kafka Producers. We are processing a pretty substantial load in Flink and don't really intend to enable RocksDB/HDFS checkpointing in some of these apps - we are OK to sustain some data loss when an app crashes completely or something along those lines. However, what we are noticing here is that all the data held in memory for the sliding window functions is also lost completely because of this. I would have thought that because of the retry settings in the Kafka Producer, even those 28 events in the queue should have been recovered, let alone the over a million events in memory state waiting to be folded/reduced for the sliding window. This doesn't feel right :) Is the only way to solve this to enable a RocksDB/HDFS checkpoint? Why would almost the whole job graph restart on an operator timeout? Do I need to do something simple like disabling operator chaining? We really are trying to just use memory and not any other state for these heavy-hitting streams.
Thanks for your help, Ashish
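Not from the original thread: a minimal sketch of the producer-side knobs that govern when batches expire with the TimeoutException quoted above. The broker address is a placeholder and the values are illustrative assumptions, not recommendations; the keys are standard Kafka producer settings that can be passed to Flink's Kafka producer via a Properties object.

```java
import java.util.Properties;

// Sketch: producer settings that trade a little latency for resilience to
// transient broker slowness, so queued batches are less likely to expire.
class ProducerTuningSketch {
    static Properties producerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker1:9092"); // placeholder
        // How long a batch may sit in the send queue before expiring with
        // "ms has passed since batch creation plus linger time".
        props.setProperty("request.timeout.ms", "120000");
        // Retry transient send failures instead of surfacing them immediately.
        props.setProperty("retries", "5");
        props.setProperty("retry.backoff.ms", "1000");
        // Small linger to batch records without adding much latency.
        props.setProperty("linger.ms", "50");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps().getProperty("request.timeout.ms"));
    }
}
```

Note that retries only cover sends the client can still retry; once a batch has expired locally, the records are gone unless checkpointing replays them.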
Understanding Restart Strategy
Team, Hopefully this is a quick one. We have set up the restart strategy as follows in pretty much all of our apps: env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, Time.of(30, TimeUnit.SECONDS))); This seems pretty straightforward: the app should retry starting 10 times, every 30 seconds - so about 5 minutes. Either we are not understanding this or it is inconsistent. Some of the applications restart and come back fine on issues like a Kafka timeout (which I will come back to later), but in some cases the same issue pretty much shuts the app down. My first guess here was that the total count of 10 is not reset after the app recovers normally. Is there a need to manually reset the counter in an app? I doubt Flink would treat it like a counter that spans the life of an app instead of resetting on successful start-up - but I am not sure how else to explain the behavior. Along the same lines, what actually constitutes a "restart"? Our Kafka cluster has known performance bottlenecks during certain times of day that we are working to resolve. I do notice Kafka producer timeouts quite a few times during those windows. When the app hits these timeouts it recovers fine, but I don't see the entire application restarting, as I don't see the bootstrap logs of my app. Does something like this count as a restart from the restart-strategy perspective, as opposed to things like app crashes or YARN killing the application, where the app is actually restarted from scratch? We are really liking Flink, we just need to hash out these operational issues to make it prime time for all the streaming apps in our cluster. Thanks, Ashish
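Not part of the original thread, but relevant to the "counter reset" question: Flink's failure-rate restart strategy only counts failures inside a sliding time interval, so old failures are forgotten once the app runs cleanly for a while. A flink-conf.yaml sketch mirroring the 10-restarts-in-5-minutes intent (values are illustrative):

```yaml
# Fail the job only if more than 10 restarts occur within any 5-minute window;
# wait 30 s between restart attempts. Failures outside the window are forgotten,
# which behaves like an automatic counter reset after sustained recovery.
restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 10
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 30 s
```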
Task Manager detached under load
Hi All, We have hit some load-related issues and I was wondering if anyone has suggestions. We are noticing task managers and job managers being detached from each other under load and never really syncing up again. As a result, the Flink session shows 0 slots available for processing. Even though the apps are configured to restart, it isn't really helping as there are no slots available to run them. Here are excerpts from the logs that seemed relevant (I am trimming out the rest of the logs for brevity).

Job Manager:
2018-01-19 12:38:00,423 INFO org.apache.flink.runtime.jobmanager.JobManager - Starting JobManager (Version: 1.4.0, Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40 UTC)
2018-01-19 12:38:00,792 INFO org.apache.flink.runtime.jobmanager.JobManager - Maximum heap size: 16384 MiBytes
2018-01-19 12:38:00,794 INFO org.apache.flink.runtime.jobmanager.JobManager - Hadoop version: 2.6.5
2018-01-19 12:38:00,794 INFO org.apache.flink.runtime.jobmanager.JobManager - JVM Options:
2018-01-19 12:38:00,794 INFO org.apache.flink.runtime.jobmanager.JobManager - -Xms16384m
2018-01-19 12:38:00,794 INFO org.apache.flink.runtime.jobmanager.JobManager - -Xmx16384m
2018-01-19 12:38:00,795 INFO org.apache.flink.runtime.jobmanager.JobManager - -XX:+UseG1GC
2018-01-19 12:38:00,908 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123
2018-01-19 12:38:00,908 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.mb, 16384
2018-01-19 12:53:34,671 WARN akka.remote.RemoteWatcher - Detected unreachable: [akka.tcp://flink@:37840]
2018-01-19 12:53:34,676 INFO org.apache.flink.runtime.jobmanager.JobManager - Task manager akka.tcp://flink@:37840/user/taskmanager terminated.

So once the Flink session boots up, we are hitting it with pretty heavy load, which typically results in the WARN above.

Task Manager:
2018-01-19 12:38:01,002 INFO org.apache.flink.runtime.taskmanager.TaskManager - Starting TaskManager (Version: 1.4.0, Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40 UTC)
2018-01-19 12:38:01,367 INFO org.apache.flink.runtime.taskmanager.TaskManager - Hadoop version: 2.6.5
2018-01-19 12:38:01,367 INFO org.apache.flink.runtime.taskmanager.TaskManager - JVM Options:
2018-01-19 12:38:01,367 INFO org.apache.flink.runtime.taskmanager.TaskManager - -Xms16384M
2018-01-19 12:38:01,367 INFO org.apache.flink.runtime.taskmanager.TaskManager - -Xmx16384M
2018-01-19 12:38:01,367 INFO org.apache.flink.runtime.taskmanager.TaskManager - -XX:MaxDirectMemorySize=8388607T
2018-01-19 12:38:01,367 INFO org.apache.flink.runtime.taskmanager.TaskManager - -XX:+UseG1GC
2018-01-19 12:38:01,392 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123
2018-01-19 12:38:01,392 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.mb, 16384
2018-01-19 12:54:48,626 WARN akka.remote.RemoteWatcher - Detected unreachable: [akka.tcp://flink@:6123]
2018-01-19 12:54:48,690 INFO akka.remote.Remoting - Quarantined address [akka.tcp://flink@:6123] is still unreachable or has not been restarted. Keeping it quarantined.
2018-01-19 12:54:48,774 WARN akka.remote.Remoting - Tried to associate with unreachable remote address [akka.tcp://flink@:6123]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has a UID that has been quarantined. Association aborted.]
2018-01-19 12:54:48,833 WARN akka.remote.Remoting - Tried to associate with unreachable remote address [akka.tcp://flink@:6123]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has quarantined this system. No further associations to the remote system are possible until this system is restarted.]
2018-01-19 12:56:51,244 INFO org.apache.flink.runtime.taskmanager.TaskManager - Trying to register at JobManager akka.tcp://flink@:6123/user/jobmanager (attempt 10, timeout: 3 milliseconds)
2018-01-19 12:56:51,253 WARN akka.remote.Remoting - Tried to associate with unreachable remote address [akka.tcp://flink@:6123]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has quarantined this system. No further asso
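Not from the original thread: the quarantine behavior in the logs above is driven by Akka's death-watch heartbeats. Relaxing these settings in flink-conf.yaml can make a brief network outage survivable instead of fatal; the values below are illustrative assumptions, not recommendations:

```yaml
# Akka death-watch tuning: a remote peer is declared unreachable only after
# the acceptable heartbeat pause is exceeded, so short network hiccups under
# heavy load are tolerated rather than triggering quarantine.
akka.watch.heartbeat.interval: 10 s
akka.watch.heartbeat.pause: 120 s
akka.ask.timeout: 60 s
```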
Re: Metric Registry Warnings
Thanks Fabian! Sent from Yahoo Mail on Android On Mon, Nov 13, 2017 at 4:44 AM, Fabian Hueske wrote: Hi Ashish, this is a known issue and has been fixed for the next version [1]. Best, Fabian [1] https://issues.apache.org/jira/browse/FLINK-7100 2017-11-11 16:02 GMT+01:00 Ashish Pokharel : All, Hopefully this is a quick one. I enabled the Graphite reporter in my app and I started to see the following warning messages all over the place: 2017-11-07 20:54:09,288 WARN org.apache.flink.runtime.metrics.MetricRegistry - Error while registering metric. java.lang.IllegalArgumentException: A metric named flink.taskmanager.pndapoc-cdh-dn-14.8823e32fae717d08e211fceec56479b7.normalizeData.parseRawStats -> Filter.numBytesOut already exists I saw some threads about this regarding JMX as well, but I don't think I see a resolution for it. One thing I made sure of was that I haven't reused a name (like parseRawStats) in my app. Also, this seems to happen for every metric, not only for a select few where I could have mistakenly set the same name. Appreciate any help on this. Thanks, Ashish
Re: Kafka Consumer fetch-size/rate and Producer queue timeout
Thanks Fabian. I am seeing this consistently and can definitely use some help. I have plenty of Grafana views I can share if that helps :) Sent from Yahoo Mail on Android On Tue, Nov 7, 2017 at 3:54 AM, Fabian Hueske wrote: Hi Ashish, Gordon (in CC) might be able to help you. Cheers, Fabian 2017-11-05 16:24 GMT+01:00 Ashish Pokharel : All, I am starting to notice a strange behavior in a particular streaming app. I initially thought it was a Producer issue as I was seeing timeout exceptions (records expiring in the queue). I did try to modify request.timeout.ms, linger.ms etc. to help with the issue in case it were caused by a sudden burst of data or something along those lines. However, that caused the app to build up back pressure and become slower and slower until the timeout was reached. With lower timeouts, the app would actually raise the exception and recover faster. I can tell it is not related to connectivity as other apps are running just fine around the same time frame connected to the same brokers (we have at least 10 streaming apps connected to the same list of brokers) from the same data nodes. We have enabled the Graphite reporter in all of our applications. After deep diving into some of the consumer and producer stats, I noticed that the consumer fetch-rate drops tremendously while fetch-size grows exponentially BEFORE the producer actually starts to show higher response times and lower rates. Eventually, I noticed connection resets start to occur and connection counts go up momentarily. After that, things get back to normal. Data producer rates remain constant around that timeframe - we have a Logstash producer sending data over. We checked both Logstash and Kafka metrics and they seem to show the same pattern (sort of a sine wave) throughout. It seems to point to a Kafka issue (perhaps some tuning between the Flink app and Kafka), but I wanted to check with the experts before I start knocking on the Kafka admin's door. Is there anything else I can look into? There are quite a few default stats in Graphite but those were the ones that made the most sense. Thanks, Ashish
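Not from the original thread: a minimal sketch of the consumer-side settings that bound how large a single fetch can grow, which is one place to look when fetch-size balloons while fetch-rate collapses. Broker and group id are placeholders and the values are illustrative assumptions; the keys are standard Kafka consumer settings that can be passed to Flink's Kafka consumer via a Properties object.

```java
import java.util.Properties;

// Sketch: consumer settings that cap per-request fetch size and bound how
// long the broker waits before answering a fetch.
class ConsumerTuningSketch {
    static Properties consumerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker1:9092"); // placeholder
        props.setProperty("group.id", "example-group");         // placeholder
        // Upper bound on bytes fetched per partition per request, so a
        // backlog cannot turn every fetch into a huge transfer.
        props.setProperty("max.partition.fetch.bytes", "1048576");
        // Respond quickly even when fetch.min.bytes has not accumulated.
        props.setProperty("fetch.max.wait.ms", "500");
        props.setProperty("fetch.min.bytes", "1");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps().getProperty("max.partition.fetch.bytes"));
    }
}
```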
Re: Capacity Planning For Large State in YARN Cluster
Thanks Till, I will pull it out today then. Sent from Yahoo Mail on Android On Mon, Oct 30, 2017 at 3:48 AM, Till Rohrmann wrote: Hi Ashish, great to hear that things work better with the RocksDB state backend. I would only start playing with the containerized.heap-cutoff-ratio if you see TMs failing due to exceeding the direct memory limit. Currently, not all of the cutoff memory is set as the direct memory limit; we have a pending fix for that. Apart from that, it is indeed a good idea to test your application and monitor how it behaves when increasing the workload. Cheers, Till On Mon, Oct 30, 2017 at 1:34 AM, ashish pok wrote: Jorn, correct, and I suppose that's where we are at this point. A RocksDB-based backend is definitely looking promising for our use case. Since I haven't gotten a definite no-no on using 30% for the YARN cut-off ratio (about 1.8GB of 6GB memory) with the off-heap flag turned on, we will continue on that path. The current plan is to increase throughput on the input streams - state streams are pretty much processed already and preserved in RocksDB, and we can control the streams being joined with those states while monitoring resource utilization and join performance. We are seeing 200-500ms processing times with a pretty decent amount of logging, which is good for our needs. Agreed about the way to estimate the size of state, and hence one of the reasons for my original question on what others have done. Our states are essentially tuples (a few primitive values like string and long, plus a Map of string to string holding about 10-12 keys; values are small - not more than 128 bytes tops). We created a savepoint after processing about 500k records and that's where my estimate came from. I'd be the first to admit it is not accurate, but that's the best we could think of.
Thanks, Ashish From: Jörn Franke To: Ashish Pokharel Cc: Till Rohrmann ; user Sent: Sunday, October 29, 2017 6:05 PM Subject: Re: Capacity Planning For Large State in YARN Cluster Well, you can only performance test it beforehand in different scenarios with different configurations. I am not sure what exactly your state holds (e.g. how many objects etc.), but if it is Java objects then 3 times might be a little low (it also depends on how you initially tested state size) - however, Flink optimizes this as well. Nevertheless, something like RocksDB is probably a better solution for larger states. On 29. Oct 2017, at 21:15, Ashish Pokharel wrote: Hi Till, I got the same feedback from Robert Metzger over on Stack Overflow. I have switched my app to use RocksDB and yes, it did stabilize the app :) However, I am still struggling with how to map out my TM and JM memory, number of slots per TM, etc. Currently I am using 60 slots with 10 TMs and 60 GB of total cluster memory. The idea was to make the states distributed, with approx. 1 GB of memory per slot. I have also changed the containerized.heap-cutoff-ratio config to 0.3 to allow a little room for RocksDB (RocksDB is using the basic spinning-disk-optimized predefined configs, but we do have SSDs on our prod machines that we can leverage in the future too) and set taskmanager.memory.off-heap to true. It feels more experimental at this point than an exact science :) If there are any further guidelines on how we can plan for this as we open up the flood gates to stream heavy continuous streams, that would be great. Thanks again, Ashish On Oct 27, 2017, at 8:45 AM, Till Rohrmann wrote: Hi Ashish, what you are describing should be a good use case for Flink and it should be able to run your program. When you are seeing a GC overhead limit exceeded error, it means that Flink or your program is creating too many or too large objects, filling up the memory in a short time.
I would recommend checking your user program to see whether you can avoid unnecessary object instantiations and whether it is possible to reuse created objects. Concerning Flink's state backends, the memory state backend is currently not able to spill to disk. Also, the managed memory is only relevant for DataSet/batch programs, not streaming programs. Therefore, I would recommend you try out the RocksDB state backend, which is able to gracefully spill to disk if the state size grows too large. Consequently, you don't have to adjust the managed memory settings because they currently don't have an effect on streaming programs. My gut feeling is that switching to the RocksDBStateBackend could already solve your problems. If this should not be the case, then please let me know. Cheers, Till On Fri, Oct 27, 2017 at 5:27 AM, Ashish Pokharel wrote: Hi Everyone, We have hit a roadblock moving an app to production scale and I was hoping to get some guidance. The application is a pretty common use case in stream processing but does require maintaining a large number of keyed states. We are processing 2 streams - one of wh
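The setup Ashish describes in the thread above (RocksDB backend, 0.3 heap cutoff, off-heap memory) can be sketched as flink-conf.yaml configuration; the checkpoint path is a placeholder, not from the original thread:

```yaml
# Spillable keyed state via RocksDB instead of the heap-only memory backend.
state.backend: rocksdb
state.backend.fs.checkpointdir: hdfs:///flink/checkpoints  # placeholder path
# Reserve 30% of the container memory outside the JVM heap, leaving room for
# RocksDB's native allocations.
containerized.heap-cutoff-ratio: 0.3
taskmanager.memory.off-heap: true
```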
Re: Capacity Planning For Large State in YARN Cluster
Jorn, correct, and I suppose that's where we are at this point. A RocksDB-based backend is definitely looking promising for our use case. Since I haven't gotten a definite no-no on using 30% for the YARN cut-off ratio (about 1.8GB of 6GB memory) with the off-heap flag turned on, we will continue on that path. The current plan is to increase throughput on the input streams - state streams are pretty much processed already and preserved in RocksDB, and we can control the streams being joined with those states while monitoring resource utilization and join performance. We are seeing 200-500ms processing times with a pretty decent amount of logging, which is good for our needs. Agreed about the way to estimate the size of state, and hence one of the reasons for my original question on what others have done. Our states are essentially tuples (a few primitive values like string and long, plus a Map of string to string holding about 10-12 keys; values are small - not more than 128 bytes tops). We created a savepoint after processing about 500k records and that's where my estimate came from. I'd be the first to admit it is not accurate, but that's the best we could think of. Thanks, Ashish From: Jörn Franke To: Ashish Pokharel Cc: Till Rohrmann ; user Sent: Sunday, October 29, 2017 6:05 PM Subject: Re: Capacity Planning For Large State in YARN Cluster Well, you can only performance test it beforehand in different scenarios with different configurations. I am not sure what exactly your state holds (e.g. how many objects etc.), but if it is Java objects then 3 times might be a little low (it also depends on how you initially tested state size) - however, Flink optimizes this as well. Nevertheless, something like RocksDB is probably a better solution for larger states. On 29. Oct 2017, at 21:15, Ashish Pokharel wrote: Hi Till, I got the same feedback from Robert Metzger over on Stack Overflow.
I have switched my app to use RocksDB and yes, it did stabilize the app :) However, I am still struggling with how to map out my TM and JM memory, number of slots per TM, etc. Currently I am using 60 slots with 10 TMs and 60 GB of total cluster memory. The idea was to make the states distributed, with approx. 1 GB of memory per slot. I have also changed the containerized.heap-cutoff-ratio config to 0.3 to allow a little room for RocksDB (RocksDB is using the basic spinning-disk-optimized predefined configs, but we do have SSDs on our prod machines that we can leverage in the future too) and set taskmanager.memory.off-heap to true. It feels more experimental at this point than an exact science :) If there are any further guidelines on how we can plan for this as we open up the flood gates to stream heavy continuous streams, that would be great. Thanks again, Ashish On Oct 27, 2017, at 8:45 AM, Till Rohrmann wrote: Hi Ashish, what you are describing should be a good use case for Flink and it should be able to run your program. When you are seeing a GC overhead limit exceeded error, it means that Flink or your program is creating too many or too large objects, filling up the memory in a short time. I would recommend checking your user program to see whether you can avoid unnecessary object instantiations and whether it is possible to reuse created objects. Concerning Flink's state backends, the memory state backend is currently not able to spill to disk. Also, the managed memory is only relevant for DataSet/batch programs, not streaming programs. Therefore, I would recommend you try out the RocksDB state backend, which is able to gracefully spill to disk if the state size grows too large. Consequently, you don't have to adjust the managed memory settings because they currently don't have an effect on streaming programs. My gut feeling is that switching to the RocksDBStateBackend could already solve your problems.
If this should not be the case, then please let me know. Cheers, Till On Fri, Oct 27, 2017 at 5:27 AM, Ashish Pokharel wrote: Hi Everyone, We have hit a roadblock moving an app to production scale and I was hoping to get some guidance. The application is a pretty common use case in stream processing but does require maintaining a large number of keyed states. We are processing 2 streams - one of which is a daily burst of stream (normally around 50 mil but could go up to 100 mil in a one-hour burst) and the other is a constant stream of around 70-80 mil per hour. We are doing a low-level join using a CoProcessFunction between the two keyed streams. The CoProcessFunction needs to refresh (upsert) state from the daily burst stream and decorate the constantly streaming data with values from the state built from the bursty stream. All of the logic is working pretty well in a standalone dev environment. We are throwing about 500k events of bursty traffic for state and about 2-3 mil of the data stream. We have 1 TM with 16GB memory, 1 JM with 8 GB memory and 16 slots (1 per core) on the server. We h