Re: Restore from savepoint with Iterations

2020-05-05 Thread ashish pok
Let me see if I can do an artificial throttle somewhere. The volume of data is really 
high, hence trying to avoid round trips through Kafka too. Looks like the options are 
“not so elegant” until FLIP-15. Thanks for the pointers again!!!


On Monday, May 4, 2020, 11:06 PM, Ken Krugler  
wrote:

Hi Ashish,
The workaround we did was to throttle data flowing in the iteration (in code), 
though not sure if that’s possible for your situation.
You could remove the iteration by writing to a Kafka topic at the end of the 
part of your workflow that is currently an iteration, and then consuming from 
that same topic as your “iteration” source.
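For reference, the in-code throttle mentioned above might look something like this minimal sketch (class and names are illustrative, not Flink API; inside a Flink job you would invoke something like this from a map/flatMap feeding the iteration head):

```java
import java.util.concurrent.TimeUnit;

/** Minimal fixed-rate throttle; illustrative only, not part of any Flink API. */
public class SimpleThrottle {
    private final long intervalNanos; // minimum spacing between permits
    private long nextFreeAt;          // earliest nanoTime the next permit is available

    public SimpleThrottle(double permitsPerSecond) {
        this.intervalNanos = (long) (TimeUnit.SECONDS.toNanos(1) / permitsPerSecond);
        this.nextFreeAt = System.nanoTime();
    }

    /** Blocks until the next permit is available; call once per record. */
    public synchronized void acquire() {
        long now = System.nanoTime();
        if (now < nextFreeAt) {
            try {
                TimeUnit.NANOSECONDS.sleep(nextFreeAt - now);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop waiting, preserve the flag
            }
        }
        nextFreeAt = Math.max(now, nextFreeAt) + intervalNanos;
    }
}
```

Calling acquire() in the function that feeds the feedback edge caps the iteration rate and keeps the network buffers at the head from filling.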
— Ken


On May 4, 2020, at 7:32 PM, Ashish Pokharel  wrote:

Hi Ken,
Thanks for the quick response!
I came across FLIP-15 on my next Google search after I sent the email :) It 
DEFINITELY looks that way. Watching the logs and the way the job gets stuck, it 
does look like a buffer is blocked. But FLIP-15 has not moved forward though. So 
there are no workarounds at all at this point? Perhaps a technique to block the 
Kafka consumer for some time? Even that may get me going, but it looks like 
there is a probability of this happening during normal processing as your use 
case demonstrates. I am using iteration with no timeouts for the prod job, 
using timeouts only in unit testing. The theory was that in prod the input 
stream will be indefinite, and sometimes a long lull with no events might happen 
during maintenance, backlog, etc. I really would like to avoid bloating the DAG 
by repeating the same functions with filters and side outputs. Other than the 
obvious repetition, it would increase the size of the states by a factor. Even 
those slowly moving dimensions are not light (around half a billion events every 
day) :) 


On May 4, 2020, at 10:13 PM, Ken Krugler  wrote:

Hi Ashish,
Wondering if you’re running into the gridlock problem I mention on slide #25 
here: 
https://www.slideshare.net/FlinkForward/flink-forward-san-francisco-2018-ken-krugler-building-a-scalable-focused-web-crawler-with-flink
If the iteration path has too much data in it, then the network buffer at the 
head of the iteration can fill up, and it never clears out because the operator 
consuming those buffers is blocked writing to the next operator in the 
iteration, and so on back to the head.
We ran into this when outlinks from web pages caused fan-out/amplification of 
the data being iterated, but maybe you hit it with restoring from state.
— Ken


On May 4, 2020, at 6:41 PM, Ashish Pokharel  wrote:
Hi all,

Hope everyone is doing well!

I am running into what seems like a deadlock (application stalled) situation 
with a Flink streaming job upon restore from a savepoint. The job has a slowly 
moving stream (S1) that needs to be “stateful” and a continuous stream (S2) which 
is “joined” with the slow moving stream (S1). Some level of loss/repetition is 
acceptable in the continuous stream (S2), which can therefore rely on something 
like Kafka consumer state upon restarts etc. The continuous stream (S2), however, 
needs to be iterated through the states from the slowly moving stream (S1) a few 
times (mostly 2). States are fair sized (ends up being 15GB on HDFS). When the 
job is restarted with no continuous data (S2) on the topic, the job starts up, 
restores state and does its initial checkpoint within 3 minutes. However, when 
the app is started from a savepoint and continuous stream (S2) data is actually 
present in Kafka, the application seems to come to a halt. Looking at the 
progress of checkpoints, every attempt is stuck until timeouts happen at around 
10 mins. If the iteration on the stream is removed, the app can successfully 
start and checkpoint even when the continuous stream (S2) is flowing in as well. 
Unfortunately we are working in a hosted environment for both data and platform, 
hence debugging with thread dumps etc. will be challenging. 

I couldn’t find a known issue on this, but was wondering if anyone has seen such 
behavior or knows of any issues in the past. It does look like checkpointing has 
to be forced to get an iterative job to checkpoint in the first place (an option 
that is already marked deprecated; we are on version 1.8.2 as of now). I do 
understand the challenges around consistent checkpointing of an iterative stream. 
As I mentioned earlier, what I really want to maintain for the most part is the 
state of the slowly moving dimensions. Iteration solves the problem at hand 
(multiple loops of logic) pretty gracefully, but not being able to restore from 
a savepoint will be a show stopper. 

Will appreciate any pointer / suggestions.

Thanks in advance, 

Ashish

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr


--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr





Re: Kafka consumer behavior with same group Id, 2 instances of apps in separate cluster?

2019-09-04 Thread ashish pok
Thanks - that sounds like a good model to at least explore. We are essentially 
stateless at this point for this particular need. 


- Ashish

On Tuesday, September 3, 2019, 11:28 PM, Becket Qin  
wrote:

Thanks for the explanation, Ashish. Glad you made it work with a custom source.
I guess your application is probably stateless. If so, another option might be a 
geo-distributed Flink deployment. That means there would be TMs in different 
datacenters forming a single Flink cluster. This would also come with failover 
if one of the TMs goes down. I am not sure if anyone has tried this. It is 
probably a heavier solution than using Kafka to do the failover, but the good 
thing is that you could also do some stateful processing if you have a globally 
accessible storage for the state backups.
Thanks,
Jiangjie (Becket) Qin
On Wed, Sep 4, 2019 at 11:00 AM Ashish Pokharel  wrote:

Thanks Becket,
Sorry for the delayed response. That’s what I thought as well. I built a hacky 
custom source today directly using the Kafka client, which was able to join a 
consumer group etc. It works as I expected, but I am not sure about production 
readiness for something like that :)
The need arises because of (1) business continuity needs and (2) some of the 
pipelines we are building are close to the network edge and need to run on nodes 
where we are not allowed to create a cluster (yea - let’s not get into that can 
of security-related worms :)). We will get there at some point, but for now we 
are trying to support business continuity on those edge nodes by not actually 
forming a cluster but using “walled garden” individual Flink servers. I fully 
understand this is not ideal. All of this started because some of the work 
we were doing with Logstash needed to be migrated out, as Logstash wasn’t able 
to keep up with the data rates unless we put in some ridiculous number of 
servers. In essence, we have pre-approved constraints to connect to Kafka and 
southbound interfaces using Logstash, which we need to replace for some datasets 
that are too massive for Logstash to keep up with. 
Hope that explains a bit where our head is at.
Thanks, Ashish 


On Aug 29, 2019, at 11:40 AM, Becket Qin  wrote:
Hi Ashish,
You are right. Flink does not use Kafka-based group management. So if you have 
two clusters consuming the same topic, they will not divide the partitions. The 
cross-cluster HA is not quite possible at this point. It would be good to know 
the reason you want such HA and see if Flink meets your requirement in 
another way.
Thanks,
Jiangjie (Becket) Qin
On Thu, Aug 29, 2019 at 9:19 PM ashish pok  wrote:

Looks like Flink is using “assign” partitions instead of “subscribe”, which will 
not allow participating in a consumer group, if I read the code correctly. 
Has anyone solved this type of active-active HA problem across 2 clusters using 
Kafka in the past? 


- Ashish

On Wednesday, August 28, 2019, 6:52 PM, ashish pok  wrote:

All,
I was wondering what the expected default behavior is when the same app is 
deployed in 2 separate clusters but with the same group Id. In theory the idea 
was to create active-active across separate clusters, but it seems like both 
apps are getting all the data from Kafka. 
Has anyone else tried something similar, or has an insight on the expected 
behavior? I was expecting to see partial data on both apps, and to get all data 
in one app if the other was turned off.

Thanks in advance,

- Ashish











Re: Kafka consumer behavior with same group Id, 2 instances of apps in separate cluster?

2019-08-29 Thread ashish pok
Looks like Flink is using “assign” partitions instead of “subscribe”, which will 
not allow participating in a consumer group, if I read the code correctly. 
Has anyone solved this type of active-active HA problem across 2 clusters using 
Kafka in the past? 


- Ashish
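For what it's worth, the observed behavior follows from static assignment: each Flink job independently splits the full partition set across its own subtasks, so a second job with the same group Id still ends up reading every partition. A toy round-robin illustration of the idea (hypothetical helper, not Flink's actual assignment code):

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of static partition assignment; not Flink's actual algorithm. */
public class StaticAssignment {
    /** Partitions one subtask of one job would own under round-robin assignment. */
    public static List<Integer> partitionsFor(int subtask, int parallelism, int numPartitions) {
        List<Integer> owned = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            if (p % parallelism == subtask) {
                owned.add(p); // no coordination with other jobs: each job covers all partitions
            }
        }
        return owned;
    }
}
```

Because both clusters run this same computation independently, each cluster's subtasks together cover all partitions, so both clusters consume all the data; nothing here consults Kafka's group coordinator.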

On Wednesday, August 28, 2019, 6:52 PM, ashish pok  wrote:

All,
I was wondering what the expected default behavior is when the same app is 
deployed in 2 separate clusters but with the same group Id. In theory the idea 
was to create active-active across separate clusters, but it seems like both 
apps are getting all the data from Kafka. 
Has anyone else tried something similar, or has an insight on the expected 
behavior? I was expecting to see partial data on both apps, and to get all data 
in one app if the other was turned off.

Thanks in advance,

- Ashish




Kafka consumer behavior with same group Id, 2 instances of apps in separate cluster?

2019-08-28 Thread ashish pok
All,
I was wondering what the expected default behavior is when the same app is 
deployed in 2 separate clusters but with the same group Id. In theory the idea 
was to create active-active across separate clusters, but it seems like both 
apps are getting all the data from Kafka. 
Has anyone else tried something similar, or has an insight on the expected 
behavior? I was expecting to see partial data on both apps, and to get all data 
in one app if the other was turned off.

Thanks in advance,

- Ashish

Re: JSON to CEP conversion

2019-01-22 Thread ashish pok
Awesome. Let me look into it. Thanks a lot!


- Ashish

On Tuesday, January 22, 2019, 3:31 PM, Dominik Wosiński  
wrote:

Hey Ashish, 

I have done some abstraction over the logic of CEP, but with the use of Apache 
Bahir[1], which introduces the Siddhi CEP[2] engine that allows SQL-like 
definitions of the logic.

Best, Dom.

[1] https://github.com/apache/bahir
[2] https://github.com/wso2/siddhi
wt., 22 sty 2019 o 20:20 ashish pok  napisał(a):

All,
Wondering if anyone in the community has started something along these lines - 
the idea being that CEP logic is abstracted out to metadata instead. That could 
then further be exposed to users from a REST API/UI etc. Obviously, it would 
really need some additional information like a data catalog etc. for it to be 
really helpful. But to get started, we were thinking of allowing power users to 
make some modifications to existing rules outside of the CI/CD process.

Thanks,

- Ashish





JSON to CEP conversion

2019-01-22 Thread ashish pok
All,
Wondering if anyone in the community has started something along these lines - 
the idea being that CEP logic is abstracted out to metadata instead. That could 
then further be exposed to users from a REST API/UI etc. Obviously, it would 
really need some additional information like a data catalog etc. for it to be 
really helpful. But to get started, we were thinking of allowing power users to 
make some modifications to existing rules outside of the CI/CD process.

Thanks,

- Ashish

Re: Unit / Integration Test Timer

2018-09-17 Thread ashish pok
Hi Till,
A quick update. I added a MockFlatMap function with the following logic:

    public MockStreamPause(int pauseSeconds) {
        this.pauseSeconds = pauseSeconds;
    }

    @Override
    public void flatMap(PlatformEvent event, Collector<PlatformEvent> out) throws Exception {
        if (event.getSrc().startsWith(EventTupleGeneratorUtil.PAUSE_EVENT_SRC_PREFIX)) {
            if (pauseSeconds > 0) {
                try {
                    Thread.sleep(pauseSeconds * 1000L);
                } catch (InterruptedException intEx) {
                    logger.info("Mock pause interrupted", intEx);
                }
            }
        } else {
            out.collect(event);
        }
    }
This seems to let me move forward with testing. Let me know if you recommend 
trying the latest test utils with the 1.4.2 core as a test. 
Thanks, Ashish
On Monday, September 17, 2018, 9:33:56 AM EDT, ashish pok 
 wrote:  
 
  Hi Till,
I am still on version 1.4.2 and will need some time before we can get a later 
version certified in our prod env. Timers are definitely not completing in my 
tests with the 1.4.2 utils; I can see them being registered in the debugger 
though. Having said that, should I pull just the latest test utils and try it 
out? Creating a simple example shouldn't take that long; I can create one 
sometime this week.
Thanks, Ashish
On Monday, September 17, 2018, 3:53:59 AM EDT, Till Rohrmann 
 wrote:  
 
 Hi Ashish,
I think you are right. In the current master, the system should wait until all 
timers have completed before terminating. Could you check whether this is the 
case? If not, then this might indicate a problem. Which version of Flink are 
you using? I guess it would also be helpful to have access to a working example 
where the problem is visible.
Cheers,
Till
On Fri, Sep 14, 2018 at 7:57 PM ashish pok  wrote:

Hi Till,
To answer your first question, I currently don't (and honestly I'm not sure how, 
other than using a breakpoint in the IDE of course, or if something like Mockito 
can do it). So did I interpret it correctly that the execution env started using 
flink-test-utils will essentially tear down once it consumes the last data point 
(i.e. the end of the collection I am passing), even though there could be active 
timers registered? 
Further, most of our pipelines are using low-level process functions - we toyed 
around with other windowing and session functions, but process functions gave 
the most flexibility (at least at this point, until we can re-visit), and we 
generate keys for aggregation/windowing somewhere upstream (say a map, flatMap 
or another process function). Meaning some pipelines are event/processing time 
agnostic in a sense, although technically within the process functions we will 
have timers registered etc. This helped us with unbounded keys and sensor data 
that could potentially be backfilled (i.e. watermarks have passed way back 
etc.). I wouldn't doubt a bit that there are probably better solutions :)
With that background, I am sort of not following your second note about event 
time and how we can leverage that for testing. Our intent is to create sampled 
input from results and compare the output from tests to the results (i.e. 
end-to-end integration tests) as part of our CICD. The normal flow seems to work 
well; just getting "negative" test cases of timeouts seems to be a mystery right 
now :) So single-operator harnesses don't sound like the right approach; let me 
know otherwise.
Thanks,

On Friday, September 14, 2018, 11:42:17 AM EDT, Till Rohrmann 
 wrote:  
 
 Hi Ashish,
how do you make sure that all of your data is not consumed within a fraction of 
the 2 seconds? For this it would be better to use event time which allows you 
to control how time passes. If you want to test a specific operator you could 
try out the One/TwoInputStreamOperatorTestHarness.
Cheers,
Till
On Fri, Sep 14, 2018 at 3:36 PM ashish pok  wrote:

All,
Hopefully a quick one. I feel like I have seen this answered a few times before 
but can't find an appropriate example. I am trying to run a few tests where 
registered timeouts are invoked (snippet below). The simple example shown in the 
documentation for integration tests (using flink-test-utils) seems to complete 
even though timers are registered and have not been invoked. 

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    CollectSink.values.clear();

    // create a stream of custom elements and apply transformations
    env.fromCollection(t.getTestInputs())
        .process(new TupleProcessFn())
        .keyBy(FactTuple::getKey)
        .process(new NormalizeDataProcessFn(2))
        .addSink(getSink());

    env.execute();

I have a 2 second processing timer registered. If I put a breakpoint in the 
first TupleProcessFn() after a few tuples are collected, I can see onTimer being 
invoked. So what is the trick here? I went as far as putting a MapFunction with 
a sleep after the second process function, to no avail.
Thanks,
Ashish 
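A plain-JVM analogy of the suspected shutdown race (no Flink here; a scheduled task stands in for a processing-time timer): if the "job" tears down as soon as the bounded input is consumed, a pending timer is dropped, whereas waiting for pending timers lets it fire.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Analogy only: a scheduled task stands in for a processing-time timer. */
public class TimerShutdownDemo {
    /** Schedules a "timer" delayMillis out, then shuts down; returns whether it fired. */
    public static boolean runAndShutdown(long delayMillis, boolean waitForTimer) {
        ScheduledExecutorService timers = Executors.newSingleThreadScheduledExecutor();
        AtomicBoolean fired = new AtomicBoolean(false);
        timers.schedule(() -> fired.set(true), delayMillis, TimeUnit.MILLISECONDS);
        // the "bounded input" is exhausted immediately at this point
        if (waitForTimer) {
            timers.shutdown(); // default policy still runs already-scheduled delayed tasks
            try {
                timers.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        } else {
            timers.shutdownNow(); // cancels the pending "timer", like an eager teardown
        }
        return fired.get();
    }
}
```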
  


Re: Unit / Integration Test Timer

2018-09-17 Thread ashish pok
 Hi Till,
I am still on version 1.4.2 and will need some time before we can get a later 
version certified in our prod env. Timers are definitely not completing in my 
tests with the 1.4.2 utils; I can see them being registered in the debugger 
though. Having said that, should I pull just the latest test utils and try it 
out? Creating a simple example shouldn't take that long; I can create one 
sometime this week.
Thanks, Ashish
On Monday, September 17, 2018, 3:53:59 AM EDT, Till Rohrmann 
 wrote:  
 
 Hi Ashish,
I think you are right. In the current master, the system should wait until all 
timers have completed before terminating. Could you check whether this is the 
case? If not, then this might indicate a problem. Which version of Flink are 
you using? I guess it would also be helpful to have access to a working example 
where the problem is visible.
Cheers,
Till
On Fri, Sep 14, 2018 at 7:57 PM ashish pok  wrote:

Hi Till,
To answer your first question, I currently don't (and honestly I'm not sure how, 
other than using a breakpoint in the IDE of course, or if something like Mockito 
can do it). So did I interpret it correctly that the execution env started using 
flink-test-utils will essentially tear down once it consumes the last data point 
(i.e. the end of the collection I am passing), even though there could be active 
timers registered? 
Further, most of our pipelines are using low-level process functions - we toyed 
around with other windowing and session functions, but process functions gave 
the most flexibility (at least at this point, until we can re-visit), and we 
generate keys for aggregation/windowing somewhere upstream (say a map, flatMap 
or another process function). Meaning some pipelines are event/processing time 
agnostic in a sense, although technically within the process functions we will 
have timers registered etc. This helped us with unbounded keys and sensor data 
that could potentially be backfilled (i.e. watermarks have passed way back 
etc.). I wouldn't doubt a bit that there are probably better solutions :)
With that background, I am sort of not following your second note about event 
time and how we can leverage that for testing. Our intent is to create sampled 
input from results and compare the output from tests to the results (i.e. 
end-to-end integration tests) as part of our CICD. The normal flow seems to work 
well; just getting "negative" test cases of timeouts seems to be a mystery right 
now :) So single-operator harnesses don't sound like the right approach; let me 
know otherwise.
Thanks,

On Friday, September 14, 2018, 11:42:17 AM EDT, Till Rohrmann 
 wrote:  
 
 Hi Ashish,
how do you make sure that all of your data is not consumed within a fraction of 
the 2 seconds? For this it would be better to use event time which allows you 
to control how time passes. If you want to test a specific operator you could 
try out the One/TwoInputStreamOperatorTestHarness.
Cheers,
Till
On Fri, Sep 14, 2018 at 3:36 PM ashish pok  wrote:

All,
Hopefully a quick one. I feel like I have seen this answered a few times before 
but can't find an appropriate example. I am trying to run a few tests where 
registered timeouts are invoked (snippet below). The simple example shown in the 
documentation for integration tests (using flink-test-utils) seems to complete 
even though timers are registered and have not been invoked. 

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    CollectSink.values.clear();

    // create a stream of custom elements and apply transformations
    env.fromCollection(t.getTestInputs())
        .process(new TupleProcessFn())
        .keyBy(FactTuple::getKey)
        .process(new NormalizeDataProcessFn(2))
        .addSink(getSink());

    env.execute();

I have a 2 second processing timer registered. If I put a breakpoint in the 
first TupleProcessFn() after a few tuples are collected, I can see onTimer being 
invoked. So what is the trick here? I went as far as putting a MapFunction with 
a sleep after the second process function, to no avail.
Thanks,
Ashish 
  
  

Re: Unit / Integration Test Timer

2018-09-14 Thread ashish pok
Hi Till,
To answer your first question, I currently don't (and honestly I'm not sure how, 
other than using a breakpoint in the IDE of course, or if something like Mockito 
can do it). So did I interpret it correctly that the execution env started using 
flink-test-utils will essentially tear down once it consumes the last data point 
(i.e. the end of the collection I am passing), even though there could be active 
timers registered? 
Further, most of our pipelines are using low-level process functions - we toyed 
around with other windowing and session functions, but process functions gave 
the most flexibility (at least at this point, until we can re-visit), and we 
generate keys for aggregation/windowing somewhere upstream (say a map, flatMap 
or another process function). Meaning some pipelines are event/processing time 
agnostic in a sense, although technically within the process functions we will 
have timers registered etc. This helped us with unbounded keys and sensor data 
that could potentially be backfilled (i.e. watermarks have passed way back 
etc.). I wouldn't doubt a bit that there are probably better solutions :)
With that background, I am sort of not following your second note about event 
time and how we can leverage that for testing. Our intent is to create sampled 
input from results and compare the output from tests to the results (i.e. 
end-to-end integration tests) as part of our CICD. The normal flow seems to work 
well; just getting "negative" test cases of timeouts seems to be a mystery right 
now :) So single-operator harnesses don't sound like the right approach; let me 
know otherwise.
Thanks,

On Friday, September 14, 2018, 11:42:17 AM EDT, Till Rohrmann 
 wrote:  
 
 Hi Ashish,
how do you make sure that all of your data is not consumed within a fraction of 
the 2 seconds? For this it would be better to use event time which allows you 
to control how time passes. If you want to test a specific operator you could 
try out the One/TwoInputStreamOperatorTestHarness.
Cheers,
Till
On Fri, Sep 14, 2018 at 3:36 PM ashish pok  wrote:

All,
Hopefully a quick one. I feel like I have seen this answered a few times before 
but can't find an appropriate example. I am trying to run a few tests where 
registered timeouts are invoked (snippet below). The simple example shown in the 
documentation for integration tests (using flink-test-utils) seems to complete 
even though timers are registered and have not been invoked. 

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    CollectSink.values.clear();

    // create a stream of custom elements and apply transformations
    env.fromCollection(t.getTestInputs())
        .process(new TupleProcessFn())
        .keyBy(FactTuple::getKey)
        .process(new NormalizeDataProcessFn(2))
        .addSink(getSink());

    env.execute();

I have a 2 second processing timer registered. If I put a breakpoint in the 
first TupleProcessFn() after a few tuples are collected, I can see onTimer being 
invoked. So what is the trick here? I went as far as putting a MapFunction with 
a sleep after the second process function, to no avail.
Thanks,
Ashish 
  

Unit / Integration Test Timer

2018-09-14 Thread ashish pok
All,
Hopefully a quick one. I feel like I have seen this answered a few times before 
but can't find an appropriate example. I am trying to run a few tests where 
registered timeouts are invoked (snippet below). The simple example shown in the 
documentation for integration tests (using flink-test-utils) seems to complete 
even though timers are registered and have not been invoked. 

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    CollectSink.values.clear();

    // create a stream of custom elements and apply transformations
    env.fromCollection(t.getTestInputs())
        .process(new TupleProcessFn())
        .keyBy(FactTuple::getKey)
        .process(new NormalizeDataProcessFn(2))
        .addSink(getSink());

    env.execute();

I have a 2 second processing timer registered. If I put a breakpoint in the 
first TupleProcessFn() after a few tuples are collected, I can see onTimer being 
invoked. So what is the trick here? I went as far as putting a MapFunction with 
a sleep after the second process function, to no avail.
Thanks,
Ashish 

Re: Questions on Unbounded number of keys

2018-07-31 Thread ashish pok
Thanks Till, I will try to create an instance of the app with a smaller heap and 
get a couple of dumps as well. I should be OK to share that on Google Drive. 


- Ashish

On Tuesday, July 31, 2018, 7:49 AM, Till Rohrmann  wrote:

Hi Ashish,
FIRE_AND_PURGE should also clear the window state. Yes, by active windows I mean 
windows which have not been purged yet.
Maybe Aljoscha knows more about why the window state is growing (I would not 
rule out a bug).
Cheers,
Till
On Tue, Jul 31, 2018 at 1:45 PM ashish pok  wrote:

Hi Till,
Keys are unbounded (a group of events shares the same key, but that key doesn't 
repeat after the window fires, other than some odd delayed events). So basically 
there is 1 key that will be aligned to a window. When you say key space of active 
windows, does that include keys for windows that have already fired and could be 
in the memory footprint? If so, that is basically the problem I would get into, 
and I am looking for a solution to clean up. Like I said earlier, overriding the 
trigger to FIRE_AND_PURGE did not help. If I take the same stream and key and 
refactor it to how Chang is doing it with a process function, the issue goes away.
If you mean only the currently processing key space of active windows (not the 
ones that have already fired), then I would say that cannot be the case. We are 
getting the data from a periodic poll of the same number of devices, and the 
uniqueness of a key is simply a time identifier prefixed to a device identifier. 
Even though there could be a little delayed data, the chances of the number of 
unique keys growing constantly for days are practically none, as the device list 
is constant.
Thanks, Ashish


- Ashish

On Tuesday, July 31, 2018, 4:05 AM, Till Rohrmann  wrote:

Hi Ashish,
the processing-time session windows need to store state in the state backends, 
and I assume that your key space of active windows is constantly growing. That 
could explain why you are seeing an ever-increasing memory footprint. But 
without knowing the input stream and what the UDFs do, this is only a guess.
Cheers,
Till
On Mon, Jul 30, 2018 at 1:43 PM Fabian Hueske  wrote:

Hi Chang,
The state handle objects are not created per key but just once per function 
instance.
Instead they route state accesses to the backend (JVM heap or RocksDB) for the 
currently active key.
Best, Fabian
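A toy model of what is described above (illustrative only, not Flink internals): there is one handle object per function instance, the per-key values live in the backend, and the runtime sets the current key before each processElement/onTimer call, so clear() only removes the current key's entry:

```java
import java.util.HashMap;
import java.util.Map;

/** Toy keyed ValueState handle; illustrative only, not Flink internals. */
public class ToyValueState<V> {
    private final Map<String, V> backend = new HashMap<>(); // per-key storage lives here
    private String currentKey;                              // set by the "runtime" before each call

    public void setCurrentKey(String key) { this.currentKey = key; }
    public V value() { return backend.get(currentKey); }
    public void update(V v) { backend.put(currentKey, v); }
    public void clear() { backend.remove(currentKey); }     // frees only this key's entry
}
```

So the number of state handle objects is bounded by the parallelism; it is the backend entries that grow with the key space, and clearing them per key is what keeps unbounded keys from leaking.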

2018-07-30 12:19 GMT+02:00 Chang Liu :

Hi Andrey,
Thanks for your reply. My question might be silly, but there is still one part 
I would like to fully understand. For example, in the following example:
class MyFunction extends KeyedProcessFunction[String, Click, Click] { // keyed 
by Session ID
  lazy val userId: ValueState[String] = getRuntimeContext.getState(
new ValueStateDescriptor[String]("userId", BasicTypeInfo.STRING_TYPE_INFO))

  lazy val clicks: ListState[Click] = getRuntimeContext.getListState(
new ListStateDescriptor[Click]("clicks", createTypeInformation[Click]))

  override def processElement(
  click: Click,
  context: KeyedProcessFunction[String, Click, Click]#Context,
  out: Collector[Click])
  : Unit = {
// process, output, clear state if necessary
  }

  override def onTimer(
  timestamp: Long,
  ctx: KeyedProcessFunction[String, Click, Click]#OnTimerContext,
  out: Collector[Click])
  : Unit = {
// output and clear state
  }
}
Even though I am regularly clearing the two states, userId and clicks (which 
means I am cleaning up the values stored in the states), my question is: then 
what about the two state objects themselves, userId and clicks? These state 
objects are also created per session ID, right? If the number of session IDs is 
unbounded, then the number of these state objects is also unbounded.
That means there are userId-state-1 and clicks-state-1 for session-id-1, 
userId-state-2 and clicks-state-2 for session-id-2, userId-state-3 and 
clicks-state-3 for session-id-3, …, which are handled by different keyed 
operator instances (or the same one, if two from different ranges, as you call 
it, are assigned to the same instance).
I am not concerned about the actual values in the state (which will be managed 
carefully, if I am clearing them carefully). I am thinking about the state 
objects themselves, where I have no idea what is happening to them and what 
will happen to them.
Many thanks :)
Best regards/祝好,

Chang Liu 刘畅



On 26 Jul 2018, at 10:55, Andrey Zagrebin  wrote:
Hi Chang Liu,
The unbounded nature of the stream, keyed or not, should not lead to out of 
memory. 
Flink's parallel keyed operator instances are fixed in number (the parallelism), 
and each just processes some range of the keyed elements; in your example it is 
a subrange of session ids. 
The keyed processed elements (http requests) are objects created when they 
enter the pipeline and garbage collected after having been processed in 
streaming fashion. 
If they arrive very rapidly it can lead to high back pressure from upstream to 
downstream operators, buffers can become full and the pipeline stops/slows down 
processing external inputs, i

Re: Questions on Unbounded number of keys

2018-07-31 Thread ashish pok
Hi Till,
Keys are unbounded (a group of events shares the same key, but that key doesn't 
repeat after the window fires, other than some odd delayed events). So basically 
there is 1 key that will be aligned to a window. When you say key space of active 
windows, does that include keys for windows that have already fired and could be 
in the memory footprint? If so, that is basically the problem I would get into, 
and I am looking for a solution to clean up. Like I said earlier, overriding the 
trigger to FIRE_AND_PURGE did not help. If I take the same stream and key and 
refactor it to how Chang is doing it with a process function, the issue goes away.
If you mean only the currently processing key space of active windows (not the 
ones that have already fired), then I would say that cannot be the case. We are 
getting the data from a periodic poll of the same number of devices, and the 
uniqueness of a key is simply a time identifier prefixed to a device identifier. 
Even though there could be a little delayed data, the chances of the number of 
unique keys growing constantly for days are practically none, as the device list 
is constant.
Thanks, Ashish


- Ashish

On Tuesday, July 31, 2018, 4:05 AM, Till Rohrmann  wrote:

Hi Ashish,
the processing-time session windows need to store state in the state backends, 
and I assume that your key space of active windows is constantly growing. That 
could explain why you are seeing an ever-increasing memory footprint. But 
without knowing the input stream and what the UDFs do, this is only a guess.
Cheers,
Till
On Mon, Jul 30, 2018 at 1:43 PM Fabian Hueske  wrote:

Hi Chang,
The state handle objects are not created per key but just once per function 
instance.
Instead they route state accesses to the backend (JVM heap or RocksDB) for the 
currently active key.
Best, Fabian

2018-07-30 12:19 GMT+02:00 Chang Liu :

Hi Andrey,
Thanks for your reply. My question might be silly, but there is still one part 
I would like to fully understand. For example, in the following example:
class MyFunction extends KeyedProcessFunction[String, Click, Click] { // keyed by Session ID

  lazy val userId: ValueState[String] = getRuntimeContext.getState(
    new ValueStateDescriptor[String]("userId", BasicTypeInfo.STRING_TYPE_INFO))

  lazy val clicks: ListState[Click] = getRuntimeContext.getListState(
    new ListStateDescriptor[Click]("clicks", createTypeInformation[Click]))

  override def processElement(
      click: Click,
      context: KeyedProcessFunction[String, Click, Click]#Context,
      out: Collector[Click]): Unit = {
    // process, output, clear state if necessary
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[String, Click, Click]#OnTimerContext,
      out: Collector[Click]): Unit = {
    // output and clear state
  }
}
Even though I am regularly clearing the two states, userId and clicks (which 
means I am cleaning up the values stored in the states), my question is: what 
about the two state objects themselves, userId and clicks? These state 
objects are also created per Session ID, right? If the number of Session IDs is 
unbounded, then the number of these state objects is also unbounded.
That means, there are userId-state-1 and clicks-state-1 for session-id-1, 
userId-state-2 and clicks-state-2 for session-id-2, userId-state-3 and 
clicks-state-3 for session-id-3, ..., which are handled by different (or the same, if 
two from different ranges, as you call it, are assigned to the same one) keyed 
operator instances.
I am not concerned about the actual values in the state (which will be managed 
carefully, if I am clearing them carefully). I am thinking about the state 
objects themselves; I have no idea what is happening to them now or what 
will happen to them.
Many thanks :)
Best regards/祝好,

Chang Liu 刘畅



On 26 Jul 2018, at 10:55, Andrey Zagrebin  wrote:
Hi Chang Liu,
The unbounded nature of the stream, keyed or not, should not lead to out of 
memory. 
Flink's parallel keyed operator instances have a fixed number (the parallelism) and 
each just processes some range of keyed elements; in your example it is a subrange of 
session ids. 
The keyed processed elements (http requests) are objects created when they 
enter the pipeline and garbage collected after having been processed in 
streaming fashion. 
If they arrive very rapidly it can lead to high back pressure from upstream to 
downstream operators; buffers can become full and the pipeline stops/slows down 
processing external inputs, which usually means that your pipeline is under 
provisioned. 
The only accumulated data comes from state (windows, user state, etc.), so if you 
control its memory consumption, as Till described, there should be no other 
source of out of memory.
Cheers,
Andrey

On 25 Jul 2018, at 19:06, Chang Liu  wrote:
Hi Till,
Thanks for your reply. But I think maybe I did not make my question clear. My 
question is not about whether the States within each keyed operator instances 
will run out of memory. My question is about, whether the unlimited keyed 

Re: Implement Joins with Lookup Data

2018-07-25 Thread ashish pok
Hi Michael,
We are currently using 15 TMs with 4 cores and 4 slots each, 10GB of memory on 
each TM. We have 15 partitions on Kafka for stream and 6 for context/smaller 
stream. Heap is around 50%, GC is about 150ms and CPU loads are low. We may be 
able to reduce resources on this if need be. 
Thanks,


- Ashish

On Wednesday, July 25, 2018, 4:07 AM, Michael Gendelman  
wrote:

Hi Ashish,
We are planning for a similar use case and I was wondering if you can share the 
amount of resources you have allocated for this flow?
Thanks,
Michael

On Tue, Jul 24, 2018, 18:57 ashish pok  wrote:

BTW, 
We got around bootstrap problem for similar use case using a “nohup” topic as 
input stream. Our CICD pipeline currently passes an initialize option to app IF 
there is a need to bootstrap and waits for X minutes before taking a savepoint 
and restart app normally listening to right topic(s). I believe there is work 
underway to handle this gracefully using Side Input as well. Other than 
determining X minutes for initialization to complete, we haven't had any issue 
with this solution - we have over 40 million state refreshes daily and close 
to 200Mbps input streams being joined to states.
Hope this helps!


- Ashish

On Tuesday, July 24, 2018, 11:37 AM, Elias Levy  
wrote:

Alas, this suffers from the bootstrap problem.  At the moment Flink does not 
allow you to pause a source (the positions), so you can't fully consume and 
preload the accounts or products to perform the join before the positions start 
flowing.  Additionally, Flink SQL does not support materializing an upsert table 
for the accounts or products to perform the join, so you have to develop your 
own KeyedProcessFunction, maintain the state, and perform the join on your own 
if you only want to join against the latest value for each key.
On Tue, Jul 24, 2018 at 7:27 AM Till Rohrmann  wrote:

Yes, using Kafka which you initialize with the initial values and then feed 
changes to the Kafka topic from which you consume could be a solution.
On Tue, Jul 24, 2018 at 3:58 PM Harshvardhan Agrawal 
 wrote:

Hi Till,
How would we do the initial hydration of the Product and Account data since 
it’s currently in a relational DB? Do we have to copy over data to Kafka and 
then use them? 
Regards,
Harsh
On Tue, Jul 24, 2018 at 09:22 Till Rohrmann  wrote:

Hi Harshvardhan,
I agree with Ankit that this problem could actually be solved quite elegantly 
with Flink's state. If you can ingest the product/account information changes 
as a stream, you can keep the latest version of it in Flink state by using a 
co-map function [1, 2]. One input of the co-map function would be the 
product/account update stream which updates the respective entries in Flink's 
state and the other input stream is the one to be enriched. When receiving 
input from this stream one would lookup the latest information contained in the 
operator's state and join it with the incoming event.
[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/
[2] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.html
Cheers,
Till
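The co-map pattern Till describes can be sketched outside Flink with plain Java (a stand-in for a CoMapFunction plus keyed state, not the real API; all names here are invented): one input updates the per-key reference state, the other looks up the latest value to enrich events.

```java
import java.util.HashMap;
import java.util.Map;

public class EnrichmentJoinSketch {
    // Stand-in for Flink keyed state: latest reference data per product key.
    private final Map<String, String> referenceState = new HashMap<>();

    // "map1": reference/update stream input -- refresh the stored value.
    void onReferenceUpdate(String productKey, String productInfo) {
        referenceState.put(productKey, productInfo);
    }

    // "map2": main stream input -- enrich with the latest known value.
    String onPosition(String productKey, String position) {
        String info = referenceState.getOrDefault(productKey, "UNKNOWN");
        return position + " enriched-with " + info;
    }

    public static void main(String[] args) {
        EnrichmentJoinSketch join = new EnrichmentJoinSketch();
        join.onReferenceUpdate("P1", "Bond");
        System.out.println(join.onPosition("P1", "pos-42"));
        System.out.println(join.onPosition("P2", "pos-43"));
    }
}
```

In the real pipeline the two inputs would be connected keyed streams, and the "UNKNOWN" branch is where the bootstrap problem discussed above shows up: positions arriving before their reference data.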
On Tue, Jul 24, 2018 at 2:15 PM Harshvardhan Agrawal 
 wrote:

Hi,
Thanks for your responses.
There is no fixed interval for the data being updated. It's more like whenever 
you onboard a new product, or there is any mandate that changes, the 
reference data will change.
It's not just the enrichment we are doing here. Once we have enriched the data 
we will be performing a bunch of aggregations using the enriched data. 
Which approach would you recommend?
Regards,
Harshvardhan
On Tue, Jul 24, 2018 at 04:04 Jain, Ankit  wrote:


How often is the product db updated? Based on that you can store product 
metadata as state in Flink, maybe setup the state on cluster startup and then 
update daily etc.

 

Also, just based on this feature, flink doesn’t seem to add a lot of value on 
top of Kafka. As Jorn said below, you can very well store all the events in an 
external store and then periodically run a cron to enrich later since your 
processing doesn’t seem to require absolute real time.

 

Thanks

Ankit

 

From: Jörn Franke 
Date: Monday, July 23, 2018 at 10:10 PM
To: Harshvardhan Agrawal 
Cc: 
Subject: Re: Implement Joins with Lookup Data

 

For the first one (lookup of single entries) you could use a NoSQL db (eg key 
value store) - a relational database will not scale.

 

Depending on when you need to do the enrichment you could also first store the 
data and enrich it later as part of a batch process. 


On 24. Jul 2018, at 05:25, Harshvardhan Agrawal  
wrote:


Hi,

 

We are using Flink for financial data enrichment and aggregations. We have 
Positions data that we are currently receiving from Kafka. We want to enrich 
that data with reference data like Product and Account information that is 
present in a relational database. From my understanding of 

Re: Implement Joins with Lookup Data

2018-07-24 Thread ashish pok
The app is checkpointing, so it will pick up if an operation fails. I suppose you mean 
a TM completely crashes; even in that case another TM would spin up and it 
"should" pick up from the checkpoint. We are running YARN but I would assume TM 
recovery would be possible in any other cluster. I haven't tested this 
specifically during the init phase, but we have killed TMs during normal processing 
as a test case in stateful processing and don't remember seeing an issue.


- Ashish

On Tuesday, July 24, 2018, 12:31 PM, Harshvardhan Agrawal 
 wrote:

What happens when one of your workers dies? Say the machine is dead is not 
recoverable. How do you recover from that situation? Will the pipeline die and 
you go over the entire bootstrap process?

Re: Implement Joins with Lookup Data

2018-07-24 Thread ashish pok
BTW, 
We got around bootstrap problem for similar use case using a “nohup” topic as 
input stream. Our CICD pipeline currently passes an initialize option to app IF 
there is a need to bootstrap and waits for X minutes before taking a savepoint 
and restart app normally listening to right topic(s). I believe there is work 
underway to handle this gracefully using Side Input as well. Other than 
determining X minutes for initialization to complete, we haven't had any issue 
with this solution - we have over 40 million state refreshes daily and close 
to 200Mbps input streams being joined to states.
Hope this helps!


- Ashish

On 24. Jul 2018, at 05:25, Harshvardhan Agrawal  
wrote:


Hi,

 

We are using Flink for financial data enrichment and aggregations. We have 
Positions data that we are currently receiving from Kafka. We want to enrich 
that data with reference data like Product and Account information that is 
present in a relational database. From my understanding of Flink so far, I think 
there are two ways to achieve this:

 

1) First Approach:

a) Get positions from Kafka and key by product key.

b) Perform lookup from the database for each key and then obtain 
Tuple2

 

2) Second Approach:

a) Get positions from Kafka and key by product key.

b) Window the keyed stream into say 15 seconds each.

c) For each window get the unique product keys and perform a single lookup.

d) Somehow join Positions and Products

 

In the first approach we will be making a lot of calls to the DB and the 
solution is very chatty. It's hard to scale this cos t
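The second approach, deduplicating keys inside each window and issuing one batched lookup, can be sketched as follows (the `lookupBatch` function stands in for a real DB query; everything here is illustrative, not the actual pipeline):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

public class WindowedBatchLookup {
    public static void main(String[] args) {
        // Positions buffered for one 15-second window: (productKey, position).
        List<String[]> window = Arrays.asList(
                new String[]{"P1", "pos-1"},
                new String[]{"P2", "pos-2"},
                new String[]{"P1", "pos-3"});  // P1 repeats within the window

        // Stand-in for a single batched DB query over the *unique* keys.
        Function<Set<String>, Map<String, String>> lookupBatch = keys -> {
            Map<String, String> result = new HashMap<>();
            for (String k : keys) result.put(k, "product-of-" + k);
            return result;
        };

        // c) unique product keys -> one lookup; d) join positions to products
        Set<String> uniqueKeys = new HashSet<>();
        for (String[] p : window) uniqueKeys.add(p[0]);
        Map<String, String> products = lookupBatch.apply(uniqueKeys);

        System.out.println(uniqueKeys.size()); // 2 -- one DB call for 3 events
        for (String[] p : window) {
            System.out.println(p[1] + " -> " + products.get(p[0]));
        }
    }
}
```

This trades per-event latency (events wait out the window) for far fewer DB round-trips, which is the chattiness concern raised above.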

Re: Permissions to delete Checkpoint on cancel

2018-07-23 Thread ashish pok
Stefan, 
Can’t thank you enough for this write-up. This is awesome explanation. I had 
misunderstood concepts of RocksDB working directory and Checkpoint FS. My main 
intent is to boost performance of RocksDB with SSD available locally. Recovery 
time from HDFS is not much of a concern but load on HDFS “may” be a concern in 
future - we will see.
Going over the documentation again after reading your email, it looks like what 
I intended to do was change my RocksDB working directory to local SSD, which I 
believe is Java IO Tmp dir by default, by using 
state.backend.rocksdb.checkpointdir option first and perform any tuning 
necessary to optimize SSD. 
Thanks,

- Ashish

On Monday, July 23, 2018, 10:39 AM, Stefan Richter 
 wrote:

Hi,
ok, let me briefly explain the differences between the local working directory, 
checkpoint directory, and savepoint directory, and also outline their best 
practices/requirements/tradeoffs. A first easy comment is that checkpoints 
and savepoints typically have similar requirements and most users write them 
to the same fs. The working directory, i.e. the directory for spilling or where 
RocksDB operates, is transient; it does not require replication because it is 
not part of the fault tolerance strategy. Here the main concern is speed, and 
that is why it is ideally a local, physically attached disk on the TM machine.
In contrast to that, checkpoints and savepoints are part of the fault tolerance 
strategy and that is why they typically should be on fault tolerant file 
systems. In database terms, think of checkpoints as a recovery mechanism and 
savepoints as backups. As we usually want to survive node failures, those file 
systems should be fault tolerant/replicated, and also accessible for read/write 
from all TMs and the JM. TMs obviously need to write the data, and read in 
recovery. Under node failures, this means that a TM might have to read state 
that was written on a different machine, that is why TMs should be able to 
access the files written by other TMs. The JM is responsible for deleting 
checkpoints, because TMs might go down and that is why the JM needs access as 
well.
Those requirements typically hold for most Flink users. However, you might get 
away with certain particular trade-offs. You can write checkpoints to local 
disk if:
- Everything runs on one machine, or
- (not sure somebody ever did this, but it could work)
  1) You will do the cleanup of old checkpoints manually (because the JM cannot reach them), e.g. with scripts, and
  2) You will never try to rescale from a checkpoint, and
  3) Tasks will never migrate to a different machine.
You ignore node/disk/etc. failures, and ensure that your job „owns" the cluster 
with no other jobs running in parallel. This means accepting data loss in the 
previous cases.
Typically, it should be ok to use a dfs only for checkpoints and savepoints, 
the local working directories should not go to dfs or else things will slow 
down dramatically. If you are just worried about recovery times, you might want 
to take a look at the local recovery feature [1], that keeps a secondary copy 
of the state on local disk for faster restores, but still ensures fault 
tolerance with a primary copy in dfs.
Best,
Stefan
[1] 
https://ci.apache.org/projects/flink/flink-docs-master/ops/state/large_state_tuning.html#task-local-recovery
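In configuration terms, the layout Stefan describes might look roughly like the following flink-conf.yaml fragment (key names are taken from the Flink 1.x documentation of that era and paths are placeholders, so treat this as a sketch, not a verified config):

```yaml
# Transient RocksDB working directory: fast local disk, no replication needed.
state.backend: rocksdb
state.backend.rocksdb.localdir: /mnt/ssd/flink/rocksdb

# Checkpoints and savepoints: fault-tolerant fs, reachable by all TMs and the JM.
state.checkpoints.dir: hdfs:///flink/checkpoints
state.savepoints.dir: hdfs:///flink/savepoints

# Optional: keep a secondary local copy of state for faster restores.
state.backend.local-recovery: true
```

The split matches the trade-off above: local disk for speed where durability does not matter, distributed fs where the JM and all TMs need read/write access.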


Am 23.07.2018 um 14:18 schrieb ashish pok :
Sorry,
Just a follow-up. In absence of NAS then the best option to go with here is 
checkpoint and savepoints both on HDFS and StateBackend using local SSDs then?
We were trying to not even hit HDFS other than for savepoints.


- Ashish

On Monday, July 23, 2018, 7:45 AM, ashish pok  wrote:

Stefan,
I did have first point at the back of my mind. I was under the impression 
though for checkpoints, cleanup would be done by TMs as they are being taken by 
TMs.
So for a standalone cluster with its own zookeeper for JM high availability, a 
NAS is a must have? We were going to go with local checkpoints with access to 
remote HDFS for savepoints. This sounds like it will be a bad idea then. 
Unfortunately we can’t run on YARN and NAS is also a no-no in one of our 
datacenters - there is a mountain of security compliance to climb before we 
can go to Production if we need that route.
Thanks, Ashish


On Monday, July 23, 2018, 5:10 AM, Stefan Richter  
wrote:

Hi,

I am wondering how this can even work properly if you are using a local fs for 
checkpoints instead of a distributed fs. First, what happens under node 
failures, if the SSD becomes unavailable, if a task gets scheduled to a 
different machine and can no longer access the disk with the corresponding 
state data, or if you want to scale out? Second, the same problem is what 
you can observe with the job manager: how could the checkpoint coordinator, 
which runs on the JM, access a file on a local FS on a different node to clean up 
the checkpoint data? The purpose of using a distributed fs here is that all 
TMs and the JM can access the checkpoint files.

Re: Permissions to delete Checkpoint on cancel

2018-07-23 Thread ashish pok
Sorry,
Just a follow-up. In absence of NAS then the best option to go with here is 
checkpoint and savepoints both on HDFS and StateBackend using local SSDs then?
We were trying to not even hit HDFS other than for savepoints.


- Ashish



On Monday, July 23, 2018, 5:10 AM, Stefan Richter  
wrote:

Hi,

I am wondering how this can even work properly if you are using a local fs for 
checkpoints instead of a distributed fs. First, what happens under node 
failures, if the SSD becomes unavailable or if a task gets scheduled to a 
different machine, and can no longer access the disk with the  corresponding 
state data, or if you want to scale-out. Second, the same problem is also what 
you can observe with the job manager: how could the checkpoint coordinator, 
that runs on the JM, access a file on a local FS on a different node to cleanup 
the checkpoint data? The purpose of using a distributed fs here is that all TM 
and the JM can access the checkpoint files.

Best,
Stefan

> Am 22.07.2018 um 19:03 schrieb Ashish Pokharel :
> 
> All,
> 
> We recently moved our Checkpoint directory from HDFS to local SSDs mounted on 
> Data Nodes (we were starting to see perf impacts on checkpoints etc as 
> complex ML apps were spinning up more and more in YARN). This worked great 
> other than the fact that when jobs are being canceled or canceled with 
> Savepoint, local data is not being cleaned up. In HDFS, Checkpoint 
> directories were cleaned up on Cancel and Cancel with Savepoints as far as I 
> can remember. I am wondering if it is a permissions issue. Local disks have RWX 
> permissions for both yarn and flink headless users (flink headless user 
> submits the apps to YARN using our CICD pipeline). 
> 
> Appreciate any pointers on this.
> 
> Thanks, Ashish









Re: Multi-tenancy environment with mutual auth

2018-07-16 Thread ashish pok
Nico,
This is great news. This is exactly what we are looking for.


- Ashish

On Monday, July 16, 2018, 8:28 AM, Nico Kruber  wrote:

Hi Ashish,
this was just merged today for Flink 1.6.
Please have a look at https://github.com/apache/flink/pull/6326 to check
whether this fulfils your needs.


Nico

On 14/07/18 14:02, ashish pok wrote:
> All,
> 
> We are running into a blocking production deployment issue. It looks
> like Flink inter-communications doesn't support SSL mutual auth. Any
> plans/ways to support it? We are going to have to create DMZ for each
> tenant without that, not preferable of course.
> 
> 
> - Ashish

-- 
Nico Kruber | Software Engineer
data Artisans

Follow us @dataArtisans
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Data Artisans GmbH | Stresemannstr. 121A,10963 Berlin, Germany
data Artisans, Inc. | 1161 Mission Street, San Francisco, CA-94103, USA
--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen





Multi-tenancy environment with mutual auth

2018-07-14 Thread ashish pok
All,
We are running into a blocking production deployment issue. It looks like Flink 
inter-communications doesn't support SSL mutual auth. Any plans/ways to support 
it? We are going to have to create a DMZ for each tenant without that, which is 
of course not preferable.


- Ashish

Re: Flink Kafka TimeoutException

2018-07-05 Thread ashish pok
Our experience on this has been that even if the Kafka cluster is healthy, JVM 
resource contention on our Flink app, caused by high heap utilization and 
thereby lost CPU cycles on GC, also resulted in this issue. Getting basic JVM 
metrics like CPU load, GC times, and heap utilization from your app (we use the 
Graphite reporter) might help point out whether you have the same issue.


- Ashish

On Thursday, July 5, 2018, 11:46 AM, Ted Yu  wrote:

Have you tried increasing the request.timeout.ms parameter (Kafka)?
Which Flink / Kafka release are you using ?
Cheers
On Thu, Jul 5, 2018 at 5:39 AM Amol S - iProgrammer  
wrote:

Hello,

I am using flink with kafka and getting below exception.

org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for
helloworld.t-7: 30525 ms has passed since last append

---
*Amol Suryawanshi*
Java Developer
am...@iprogrammer.com


*iProgrammer Solutions Pvt. Ltd.*



*Office 103, 104, 1st Floor Pride Portal,Shivaji Housing Society,
Bahiratwadi,Near Hotel JW Marriott, Off Senapati Bapat Road, Pune - 411016,
MH, INDIA.**Phone: +91 9689077510 | Skype: amols_iprogrammer*
www.iprogrammer.com 
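For reference, Ted's suggestion maps to producer configuration like the following. The values and broker address are illustrative placeholders, not a recommendation, and delivery.timeout.ms only exists on newer Kafka clients; these properties would be handed to the Kafka producer (or FlinkKafkaProducer):

```java
import java.util.Properties;

public class ProducerTimeoutConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker-1:9092"); // placeholder

        // Raise how long the producer waits for a broker response before
        // expiring batches ("Expiring N record(s) ... since last append").
        props.setProperty("request.timeout.ms", "60000");

        // Related knob on newer clients: total time a record may spend
        // (batching + retries) before send() is failed.
        props.setProperty("delivery.timeout.ms", "120000");

        System.out.println(props.getProperty("request.timeout.ms")); // 60000
    }
}
```

Raising the timeout only hides the symptom if the real cause is GC pauses or broker overload, which is why checking the JVM metrics above first is worthwhile.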







Re: Memory Leak in ProcessingTimeSessionWindow

2018-07-02 Thread ashish pok
 All,
I have been doing a little digging on this, and to Stefan's point, the incrementing 
memory (not necessarily a leak) was essentially because of keys that kept 
incrementing, as I was using time buckets concatenated with the actual key to make 
unique sessions.
Taking a couple of steps back, the use case is a very simple tumbling window of 15 
mins by key. The stream can be viewed simply as:
||
We have a few of these types of pipelines, and one catch here is we wanted to 
create an app which can process historical and current data. Historical data is 
mainly because users ad-hoc request "backfill". In order to easily manage the 
processing pipeline, we are making no distinction between historical and 
current data, as processing is based on event time. 
1) Of course, the easiest way to solve this problem is to create a TumblingWindow of 
15 mins with some allowed lateness. One issue here was that watermarks are moved 
forward and backfill data appeared to be viewed as late-arrival data, which is 
correct behavior from Flink's perspective but seems to be causing issues in how 
we are trying to handle streams.
2) Another issue is our data collectors are highly distributed - we regularly 
get data from later event-time buckets faster than older buckets. It is 
also more consistent to actually create 15-min buckets using the concept of a Session 
instead. So I am creating a key with | and a 
session gap of say 10 mins. This works perfectly from a business-logic 
perspective. However, now I am introducing quite a lot of keys which, based on 
my heap dumps, seem to be hanging around causing memory issues.
3) We converted the apps to a Process function and manage all states using the key 
defined in step (2), registering/unregistering timeouts. 
Solution (3) seems to be working pretty stably from a memory perspective. 
However, it just feels like with so many high-level APIs available, we are not 
using them properly and keep reverting back to the low-level Process APIs - in the 
last month we have migrated about 5 or 6 apps to Process now :) 
For solution (2) it feels like any other Session aggregation use case will have 
the issue of keys hanging around (eg: for click streams with user sessions 
etc). Isn't there a way to clear those session windows? Sorry, I just feel like 
we are missing something simple and have been reverting to low-level APIs 
instead.
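For concreteness, the time-bucketed session key described in (2) can be sketched like this (field names and the 15-min bucket size are illustrative, not the actual schema):

```java
// Illustrative sketch: build a session key from a 15-min event-time bucket
// plus the actual record key, so each (bucket, key) pair is its own session.
public class SessionKeys {
    static final long BUCKET_MS = 15 * 60 * 1000L; // 15-minute buckets (assumed)

    // Floor the event time to the bucket start and concatenate the record key.
    static String sessionKey(long eventTimeMs, String recordKey) {
        long bucketStart = eventTimeMs - (eventTimeMs % BUCKET_MS);
        return bucketStart + "|" + recordKey;
    }

    public static void main(String[] args) {
        // Two events 5 minutes apart fall into the same bucket, hence same session key.
        System.out.println(sessionKey(1_000_000_000L, "elementA"));
        System.out.println(sessionKey(1_000_300_000L, "elementA"));
    }
}
```

Every new (bucket, key) combination is a brand-new key to Flink, which is exactly why the number of keys grows without bound over time.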
Thanks,

On Friday, June 22, 2018, 9:00:14 AM EDT, ashish pok  
wrote:  
 
 Stefan, All, 
If there are no further thoughts on this, I am going to switch my app to the 
low-level Process API. I still think there is an easier solution here which I am 
missing, but I will revisit that after I fix the Production issue.
Thanks, Ashish



On Thursday, June 21, 2018, 7:28 AM, ashish pok  wrote:

Hi Stefan, 
Thanks for outlining the steps; they are similar to what we have been doing for 
OOM issues.
However, I was looking for something more high level on whether state / key 
handling needs some sort of cleanup specifically that is not done by default. I 
am about 99% (nothing is certain :)) sure that if I switch this app to a 
lower-level API like Process Function and manage my own state and timers, I will 
not see this issue. When I had the same issue in the past it was for a Global 
Window, and Fabian pointed out that new keys were constantly being created. I 
built a simple Process Function for that and the issue went away. I think your 
first statement sort of hints at that as well. So let me hone in on that. I am 
processing time series data for network elements. Keys are 10-min floors of event 
time concatenated with the element ID. The idea here was to create 10-min buckets 
of data with windows that start with the first event in that bucket and fire when 
no events arrive for 12 or so minutes. So new keys are definitely being created. So:
1 - Am I adding to the memory constantly by doing that? Sounds like it based on 
your comments.
2 - If so, what's the way to clear those keys when windows fire, if any?
3 - It seems like a very simple use case, so now I am wondering if I am even 
using the right high-level API?
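The manual-cleanup pattern being discussed can be simulated outside Flink with a map of per-key expiry timestamps (purely illustrative; in an actual Flink ProcessFunction this would be keyed state plus ctx.timerService().registerProcessingTimeTimer(...), with the state cleared in onTimer):

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Illustrative: per-key state with an expiry timestamp; when the "timer"
// fires, the key's state is removed, so expired keys do not accumulate.
public class KeyedBufferWithTimers {
    static final long GAP_MS = 12 * 60 * 1000L; // fire after 12 min of inactivity (assumed)

    final Map<String, Long> expiryByKey = new HashMap<>();

    void onEvent(String key, long eventTimeMs) {
        // Each event pushes the key's expiry out by the gap.
        expiryByKey.put(key, eventTimeMs + GAP_MS);
    }

    // Analogous to onTimer: fire and *clear* every expired key.
    int fireAndClear(long nowMs) {
        int fired = 0;
        for (Iterator<Map.Entry<String, Long>> it = expiryByKey.entrySet().iterator(); it.hasNext();) {
            if (it.next().getValue() <= nowMs) {
                it.remove(); // this removal is the cleanup in question
                fired++;
            }
        }
        return fired;
    }
}
```

The explicit remove() is the step the low-level API forces you to own, which is why switching to Process Function makes the growth stop.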
Thanks, Ashish


Sent from Yahoo Mail for iPhone


On Wednesday, June 20, 2018, 4:17 AM, Stefan Richter 
 wrote:

Hi,
it is possible that the number of processing time timers can grow, because 
internal timers are scoped by time, key, and namespace (typically this means 
„window“, because each key can be part of multiple windows). So if the number 
of keys in your application is steadily growing this can happen. 
To analyse the heap dump, I usually take the following approach:
- Obviously include only reachable objects. If dumps are very big, try to limit 
the size or to trigger the OOM earlier by configuring a lower heap size. It should 
still give you the problematic object accumulation, if there is one.
- Look at the statistics of "heavy hitter" classes, i.e. classes for which the 
instances contribute the most to the overall heap consumption. Sometimes this will 
show you classes that are also part of classes that

Re: How to partition within same physical node in Flink

2018-07-02 Thread ashish pok
 Thanks Fabian! It sounds like KeyGroup will do the trick if that can be made 
publicly accessible.
On Monday, July 2, 2018, 5:43:33 AM EDT, Fabian Hueske  
wrote:  
 
 Hi Ashish, hi Vijay,
Flink does not distinguish between different parts of a key (parent, child) in 
the public APIs. However, there is an internal concept of KeyGroups, which is 
similar to what you call physical partitioning. A KeyGroup is a group of keys 
that are always processed on the same physical node. The motivation for this 
feature is operator scaling, because all keys of a group are always processed by 
the same node and hence their state is always distributed together. However, 
AFAIK, KeyGroups are not exposed to the user API. Moreover, KeyGroups are 
distributed to slots, i.e., each KeyGroup is processed by a single slot, but 
each slot might process multiple key groups. This distribution is done with 
hash partitioning and hence is hard to tune.
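The key-group mechanics described here can be approximated outside Flink (a simplified sketch: the hash step is reduced to hashCode(), whereas Flink's internal KeyGroupRangeAssignment additionally applies a murmur hash; constants are made up):

```java
// Simplified sketch of key-group assignment: a key hashes to one of
// maxParallelism key groups, and each key group maps to exactly one subtask.
public class KeyGroups {
    static int keyGroupFor(Object key, int maxParallelism) {
        // Flink murmur-hashes the key's hashCode first; omitted here for brevity.
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        // Contiguous ranges of key groups are assigned to subtasks.
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128, parallelism = 4;
        for (String cam : new String[] {"cam1", "cam2", "cam3"}) {
            int kg = keyGroupFor(cam, maxParallelism);
            System.out.println(cam + " -> keyGroup " + kg
                + " -> subtask " + subtaskFor(kg, maxParallelism, parallelism));
        }
    }
}
```

Because the key-to-group step is a fixed hash, two distinct keys can land in key groups owned by the same subtask, which is why the distribution is hard to tune from user code.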

There might be a way to tweak this by implementing your own low-level operator, 
but I'm not sure. Stefan (in CC) might be able to give some hints.
Best, Fabian

2018-06-29 18:35 GMT+02:00 Vijay Balakrishnan :

Thanks for the clarification, Fabian. This is what I compromised on for my 
use case - it doesn't exactly do what I intended to do. Partition by a key, then 
spawn threads inside that partition to do my task, and then finally repartition 
again (for a subsequent connect).
DataStream keyedByCamCameraStream = env
    .addSource(new Source())
    .keyBy((cameraWithCube) -> cameraWithCube.getCam());
AsyncFunction cameraWithCubeAsyncFunction =
    new SampleAsyncFunction(, nThreads); // spawn threads here with the second key (ts)
DataStream cameraWithCubeDataStreamAsync =
    AsyncDataStream.orderedWait(keyedByCamCameraStream, cameraWithCubeAsyncFunction,
        timeout, TimeUnit.MILLISECONDS, nThreads)
    // capacity = max # of in-flight requests; timeout = max time until considered failed
    .setParallelism(parallelCamTasks);
DataStream cameraWithCubeDataStream =
    cameraWithCubeDataStreamAsync.keyBy((cameraWithCube) -> cameraWithCube.getTs());

On Thu, Jun 28, 2018 at 9:22 AM ashish pok  wrote:

Fabian, All,
Along this same line, we have a data source where we have a parent key and a child 
key. We need to first keyBy parent and then by child. If we want physical 
partitioning in a way where partitioning happens first by parent key with 
localized grouping by child key, is there a need to use a custom partitioner? 
Obviously we can keyBy twice, but we were wondering if we can minimize the 
re-partition stress.
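To make the trade-off concrete, here is a toy illustration (Objects.hash stands in for Flink's internal hash, and the partition count is made up): hashing the composite (parent, child) key gives no guarantee that children of one parent land on the same partition, whereas hashing the parent key alone does:

```java
import java.util.Objects;

// Toy demonstration: hash partitioning by parent key vs by (parent, child).
public class ParentChildPartitioning {
    static int partition(int hash, int numPartitions) {
        return Math.floorMod(hash, numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        // All records of parentA share one partition when keyed by parent only...
        int byParent = partition(Objects.hash("parentA"), numPartitions);
        // ...but may scatter when keyed by the composite (parent, child).
        int child1 = partition(Objects.hash("parentA", "c1"), numPartitions);
        int child2 = partition(Objects.hash("parentA", "c2"), numPartitions);
        System.out.println(byParent + " vs " + child1 + ", " + child2);
    }
}
```

This is why a second keyBy on the child cannot preserve the parent-level locality established by the first one: each keyBy re-hashes from scratch.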
Thanks,
Ashish


- Ashish

On Thursday, June 28, 2018, 9:02 AM, Fabian Hueske  wrote:

Hi Vijay,
Flink does not provide fine-grained control to place keys to certain slots or 
machines. 
When specifying a key, it is up to Flink (i.e., its internal hash function) 
where the data is processed. This works well for large key spaces, but can be 
difficult if you have only a few keys.
So, even if you keyBy(cam) and handle the parallelization of seq# internally 
(which I would not recommend), it might still happen that the data of two 
cameras is processed on the same slot. The only way to change that would be to 
fiddle with the hash of your keys, but this might give you a completely 
different distribution when scaling out the application at a later point in 
time.
Best, Fabian


2018-06-26 19:54 GMT+02:00 Vijay Balakrishnan :



Hi Fabian,
Thanks once again for your reply. I need to get the data from each cam/camera 
into 1 partition/slot and not move the gigantic video data around as much as I 
perform other operations on it. For eg, I can get seq#1 and seq#2 for cam1 in the 
cam1 partition/slot and then do combine, split, parse, stitch etc. operations on 
it in multiple threads within the same cam1 partition.
I have the CameraKey defined as (cam, seq#) and then keyBy(cam) to get it into 1 
partition (eg: cam1). The idea is to then work within the cam1 partition with 
various seq#'s 1, 2 etc. on various threads within the same slot/partition of the 
TaskManager.
The data is stored in EFS keyed based on seq#/cam# folder structure.
Our actual problem is managing network bandwidth as a resource in each 
partition. We want to make sure that the processing of 1 camera(split into 
multiple seq# tasks) is not running on the same node as the processing of 
another camera as in that case, the required network bandwidth for storing the 
output of the process running in the partition would exceed the network 
bandwidth of the hardware. Camera processing is expected to run on the same 
hardware as the video decode step which is an earlier sequential process in the 
same Dataflow pipeline.
I guess I might have to use a ThreadPool within each Slot(cam partition) to 
work on each seq# ??
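That per-slot thread pool could be sketched like this (plain Java, outside any Flink API; method names and sizes are made up for illustration):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustrative: inside one cam partition, fan seq# work out to a small pool
// and collect all results, so heavy per-seq# work runs in parallel locally.
public class PerCamProcessor {
    static List<String> processSeqs(String cam, List<Integer> seqs, int nThreads) {
        ExecutorService pool = Executors.newFixedThreadPool(nThreads);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (int seq : seqs) {
                futures.add(pool.submit(() -> cam + "#" + seq)); // stand-in for stitch/parse work
            }
            List<String> results = new ArrayList<>();
            for (Future<String> f : futures) {
                results.add(f.get()); // preserves submission order
            }
            return results;
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

Note that user-managed threads inside an operator bypass Flink's checkpointing and backpressure, so this trades fault-tolerance guarantees for local parallelism.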
TIA



On Tue, Jun 26, 2018 at 1:06 AM Fabian Hueske  wrote:





Hi,
keyBy() does no

Re: How to partition within same physical node in Flink

2018-06-28 Thread ashish pok
Fabian, All,
Along this same line, we have a data source where we have a parent key and a child 
key. We need to first keyBy parent and then by child. If we want physical 
partitioning in a way where partitioning happens first by parent key with 
localized grouping by child key, is there a need to use a custom partitioner? 
Obviously we can keyBy twice, but we were wondering if we can minimize the 
re-partition stress.
Thanks,
Ashish


- Ashish

On Thursday, June 28, 2018, 9:02 AM, Fabian Hueske  wrote:

Hi Vijay,
Flink does not provide fine-grained control to place keys to certain slots or 
machines. 
When specifying a key, it is up to Flink (i.e., its internal hash function) 
where the data is processed. This works well for large key spaces, but can be 
difficult if you have only a few keys.
So, even if you keyBy(cam) and handle the parallelization of seq# internally 
(which I would not recommend), it might still happen that the data of two 
cameras is processed on the same slot. The only way to change that would be to 
fiddle with the hash of your keys, but this might give you a completely 
different distribution when scaling out the application at a later point in 
time.
Best, Fabian

2018-06-26 19:54 GMT+02:00 Vijay Balakrishnan :

Hi Fabian,
Thanks once again for your reply. I need to get the data from each cam/camera 
into 1 partition/slot and not move the gigantic video data around as much as I 
perform other operations on it. For eg, I can get seq#1 and seq#2 for cam1 in the 
cam1 partition/slot and then do combine, split, parse, stitch etc. operations on 
it in multiple threads within the same cam1 partition.
I have the CameraKey defined as (cam, seq#) and then keyBy(cam) to get it into 1 
partition (eg: cam1). The idea is to then work within the cam1 partition with 
various seq#'s 1, 2 etc. on various threads within the same slot/partition of the 
TaskManager.
The data is stored in EFS keyed based on seq#/cam# folder structure.
Our actual problem is managing network bandwidth as a resource in each 
partition. We want to make sure that the processing of 1 camera(split into 
multiple seq# tasks) is not running on the same node as the processing of 
another camera as in that case, the required network bandwidth for storing the 
output of the process running in the partition would exceed the network 
bandwidth of the hardware. Camera processing is expected to run on the same 
hardware as the video decode step which is an earlier sequential process in the 
same Dataflow pipeline.
I guess I might have to use a ThreadPool within each Slot(cam partition) to 
work on each seq# ??
TIA
On Tue, Jun 26, 2018 at 1:06 AM Fabian Hueske  wrote:

Hi,
keyBy() does not work hierarchically. Each keyBy() overrides the previous 
partitioning. You can keyBy(cam, seq#), which guarantees that all records with 
the same (cam, seq#) are processed by the same parallel instance. However, Flink 
does not give any guarantees about how the (cam, seq#) partitions are 
distributed across slots (or even physical nodes).
Btw. why is it important that all records of the same cam are processed by the 
same physical node?
Fabian

2018-06-25 21:36 GMT+02:00 Vijay Balakrishnan :

I see a .slotSharingGroup for SingleOutputStreamOperator, which can put 
parallel instances of operations in the same TM slot. I also see a 
CoLocationGroup, but do not see a .coLocationGroup for SingleOutputStreamOperator 
to put a task on the same slot. It seems CoLocationGroup is defined at the 
JobVertex level and has nothing to do with SingleOutputStreamOperator. A 
TaskManager has many slots, and slots have many threads within them. I want to be 
able to put the cam1 partition (keyBy(cam)) in 1 slot and then use a keyBy(seq#) 
to run on many threads within that cam1 slot.
Vijay
On Mon, Jun 25, 2018 at 10:10 AM Vijay Balakrishnan  wrote:

Thanks, Fabian. Been reading your excellent book on Flink Streaming. Can't wait 
for more chapters. Attached a pic.


I have records with seq# 1 and cam1 and cam2. I also have records with varying 
seq#'s. By partitioning on the cam field first (keyBy(cam)), I can get the cam1 
partition on the same task manager instance/slot/vCore (???). Can I then have 
seq# 1 and seq# 2 for the cam1 partition run in different slots/threads on the 
same Task Manager instance (aka cam1 partition) using keyBy(seq#) & 
setParallelism()? Can forward strategy be used to achieve this?
TIA

On Mon, Jun 25, 2018 at 1:03 AM Fabian Hueske  wrote:

Hi,
Flink distributes task instances to slots and does not expose physical 
machines. Records are partitioned to task instances by hash partitioning. It is 
also not possible to guarantee that the records in two different operators are 
sent to the same slot. Sharing information by side-passing it (e.g., via a file 
on a machine or in a static object) is an anti-pattern and should be avoided.
Best, Fabian

2018-06-24 20:52 GMT+02:00 Vijay Balakrishnan :


Hi,

Need to partition by cameraWithCube.getCam() 1st using parallelCamTasks(passed 
in as args).

Then wit

Re: Memory Leak in ProcessingTimeSessionWindow

2018-06-22 Thread ashish pok
Stefan, All, 
If there are no further thoughts on this, I am going to switch my app to the 
low-level Process API. I still think there is an easier solution here which I am 
missing, but I will revisit that after I fix the Production issue.
Thanks, Ashish


Sent from Yahoo Mail for iPhone


On Thursday, June 21, 2018, 7:28 AM, ashish pok  wrote:

Hi Stefan, 
Thanks for outlining the steps; they are similar to what we have been doing for 
OOM issues.
However, I was looking for something more high level on whether state / key 
handling needs some sort of cleanup specifically that is not done by default. I 
am about 99% (nothing is certain :)) sure that if I switch this app to a 
lower-level API like Process Function and manage my own state and timers, I will 
not see this issue. When I had the same issue in the past it was for a Global 
Window, and Fabian pointed out that new keys were constantly being created. I 
built a simple Process Function for that and the issue went away. I think your 
first statement sort of hints at that as well. So let me hone in on that. I am 
processing time series data for network elements. Keys are 10-min floors of event 
time concatenated with the element ID. The idea here was to create 10-min buckets 
of data with windows that start with the first event in that bucket and fire when 
no events arrive for 12 or so minutes. So new keys are definitely being created. So:
1 - Am I adding to the memory constantly by doing that? Sounds like it based on 
your comments.
2 - If so, what's the way to clear those keys when windows fire, if any?
3 - It seems like a very simple use case, so now I am wondering if I am even 
using the right high-level API?
Thanks, Ashish


Sent from Yahoo Mail for iPhone


On Wednesday, June 20, 2018, 4:17 AM, Stefan Richter 
 wrote:

Hi,
it is possible that the number of processing time timers can grow, because 
internal timers are scoped by time, key, and namespace (typically this means 
„window“, because each key can be part of multiple windows). So if the number 
of keys in your application is steadily growing this can happen. 
To analyse the heap dump, I usually take the following approach:
- Obviously include only reachable objects. If dumps are very big, try to limit 
the size or to trigger the OOM earlier by configuring a lower heap size. It should 
still give you the problematic object accumulation, if there is one.
- Look at the statistics of "heavy hitter" classes, i.e. classes for which the 
instances contribute the most to the overall heap consumption. Sometimes this will 
show you classes that are also part of classes that rank higher up, e.g. 1st place 
could be String, and second place char[]. But you can figure that out in the 
next step.
- Explore the instances of the top heavy-hitter class(es). If there is a leak and 
you just randomly sample into some objects, the likelihood is usually *very* high 
that you catch an object that is part of the leak (as determined in the next 
step). Otherwise just repeat and sample another object.
- Inspect the object instance and follow the reference links to the parent 
objects in the object graph that hold a reference to the leak-object candidate. 
You will typically end up in some array where the leak accumulates. Inspect the 
object holding references to the leaking objects. You can see the field values, 
and this can help to determine if the collection of objects is justified or if 
data is actually leaking.
So in your case, you can start from some InternalTimer or Window object and walk 
backwards through the reference chain to see what class is holding onto them and 
why (e.g. should they already be gone w.r.t. their timestamp). Walking through 
the references should be supported by all major heap analysis tools, including 
JVisualVM that comes with your JDK. You can also use OQL [1] to query for timers 
or windows that should already be gone.
Overall I think it could at least be helpful to see the statistics for 
heavy-hitter classes and screenshots of representative reference chains to 
objects to figure out the problem cause. If it is not possible to share heap 
dumps, unfortunately I think giving you this strategy is currently the best I can 
offer to help. 
Best,Stefan


[1] https://blogs.oracle.com/sundararajan/querying-java-heap-with-oql
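An OQL query of the kind mentioned above might look roughly like this in JVisualVM or jhat (the threshold is a placeholder timestamp you would replace with the current processing time in milliseconds):

```
select w from org.apache.flink.streaming.api.windowing.windows.TimeWindow w
where w.end < 1529450000000
```

Any windows returned by such a query after their end timestamp has long passed are candidates for leaked window state.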

Am 20.06.2018 um 02:33 schrieb ashish pok :
 All, 
I took a few heap dumps (when the app restarts and at 2-hour intervals) using 
jmap; they are 5GB to 8GB. I did some compares, and what I can see is that the 
heap shows counts of data tuples (basically instances of the object that is 
maintained as state) going up slowly. 
The only things I could possibly relate that to were 
streaming.api.operators.InternalTimer and 
streaming.api.windowing.windows.TimeWindow; both were trending up as well. There 
are definitely a lot more windows created than the increments I could notice, but 
nevertheless those objects are trending up. The input stream has a very consistent 
sine-wave throughput. So it really doesn't make sense for windows and tuples to 

Re: Memory Leak in ProcessingTimeSessionWindow

2018-06-21 Thread ashish pok
Hi Stefan, 
Thanks for outlining the steps; they are similar to what we have been doing for 
OOM issues.
However, I was looking for something more high level on whether state / key 
handling needs some sort of cleanup specifically that is not done by default. I 
am about 99% (nothing is certain :)) sure that if I switch this app to a 
lower-level API like Process Function and manage my own state and timers, I will 
not see this issue. When I had the same issue in the past it was for a Global 
Window, and Fabian pointed out that new keys were constantly being created. I 
built a simple Process Function for that and the issue went away. I think your 
first statement sort of hints at that as well. So let me hone in on that. I am 
processing time series data for network elements. Keys are 10-min floors of event 
time concatenated with the element ID. The idea here was to create 10-min buckets 
of data with windows that start with the first event in that bucket and fire when 
no events arrive for 12 or so minutes. So new keys are definitely being created. So:
1 - Am I adding to the memory constantly by doing that? Sounds like it based on 
your comments.
2 - If so, what's the way to clear those keys when windows fire, if any?
3 - It seems like a very simple use case, so now I am wondering if I am even 
using the right high-level API?
Thanks, Ashish


Sent from Yahoo Mail for iPhone


On Wednesday, June 20, 2018, 4:17 AM, Stefan Richter 
 wrote:

Hi,
it is possible that the number of processing time timers can grow, because 
internal timers are scoped by time, key, and namespace (typically this means 
„window“, because each key can be part of multiple windows). So if the number 
of keys in your application is steadily growing this can happen. 
To analyse the heap dump, I usually take the following approach:
- Obviously include only reachable objects. If dumps are very big, try to limit 
the size or to trigger the OOM earlier by configuring a lower heap size. It should 
still give you the problematic object accumulation, if there is one.
- Look at the statistics of "heavy hitter" classes, i.e. classes for which the 
instances contribute the most to the overall heap consumption. Sometimes this will 
show you classes that are also part of classes that rank higher up, e.g. 1st place 
could be String, and second place char[]. But you can figure that out in the 
next step.
- Explore the instances of the top heavy-hitter class(es). If there is a leak and 
you just randomly sample into some objects, the likelihood is usually *very* high 
that you catch an object that is part of the leak (as determined in the next 
step). Otherwise just repeat and sample another object.
- Inspect the object instance and follow the reference links to the parent 
objects in the object graph that hold a reference to the leak-object candidate. 
You will typically end up in some array where the leak accumulates. Inspect the 
object holding references to the leaking objects. You can see the field values, 
and this can help to determine if the collection of objects is justified or if 
data is actually leaking.
So in your case, you can start from some InternalTimer or Window object and walk 
backwards through the reference chain to see what class is holding onto them and 
why (e.g. should they already be gone w.r.t. their timestamp). Walking through 
the references should be supported by all major heap analysis tools, including 
JVisualVM that comes with your JDK. You can also use OQL [1] to query for timers 
or windows that should already be gone.
Overall I think it could at least be helpful to see the statistics for 
heavy-hitter classes and screenshots of representative reference chains to 
objects to figure out the problem cause. If it is not possible to share heap 
dumps, unfortunately I think giving you this strategy is currently the best I can 
offer to help. 
Best,Stefan


[1] https://blogs.oracle.com/sundararajan/querying-java-heap-with-oql

Am 20.06.2018 um 02:33 schrieb ashish pok :
 All, 
I took a few heap dumps (when the app restarts and at 2-hour intervals) using 
jmap; they are 5GB to 8GB. I did some compares, and what I can see is that the 
heap shows counts of data tuples (basically instances of the object that is 
maintained as state) going up slowly. 
The only things I could possibly relate that to were 
streaming.api.operators.InternalTimer and 
streaming.api.windowing.windows.TimeWindow; both were trending up as well. There 
are definitely a lot more windows created than the increments I could notice, but 
nevertheless those objects are trending up. The input stream has a very consistent 
sine-wave throughput. So it really doesn't make sense for windows and tuples to 
keep trending up. There is also no event storm or anything of that sort (i.e. the 
source stream has been very steady as far as throughput is concerned).
Here is a plot of heap utilization:
<1529454480422blob.jpg>
So it has a typical sine-wave pattern, which is definitely expected as the input 
stream has the same pattern, but the source doesn't h

Re: Memory Leak in ProcessingTimeSessionWindow

2018-06-19 Thread ashish pok
 All, 
I took a few heap dumps (when the app restarts and at 2-hour intervals) using 
jmap; they are 5GB to 8GB. I did some compares, and what I can see is that the 
heap shows counts of data tuples (basically instances of the object that is 
maintained as state) going up slowly. 
The only things I could possibly relate that to were 
streaming.api.operators.InternalTimer and 
streaming.api.windowing.windows.TimeWindow; both were trending up as well. There 
are definitely a lot more windows created than the increments I could notice, but 
nevertheless those objects are trending up. The input stream has a very consistent 
sine-wave throughput. So it really doesn't make sense for windows and tuples to 
keep trending up. There is also no event storm or anything of that sort (i.e. the 
source stream has been very steady as far as throughput is concerned).
Here is a plot of heap utilization:

So it has a typical sine-wave pattern, which is definitely expected as the input 
stream has the same pattern, but the source doesn't have an upward trend like the 
heap utilization shown above. The screenshot above shows a spike from 60% 
utilization to 80%, and the trend keeps going up until an issue occurs that 
resets the app.
Since processing is based on ProcessingTime, I really would have expected 
memory to reach a steady state and remain sort of flat from a trending 
perspective. 
Appreciate any pointers anyone might have.
Thanks, Ashish
On Monday, June 18, 2018, 12:54:03 PM EDT, ashish pok  
wrote:  
 
 Right, that's where I am headed now, but I was wondering if there are any 
"gotchas" I am missing before I try and dig into a few gigs of heap dump. 

Thanks, Ashish

Sent from Yahoo Mail for iPhone


On Monday, June 18, 2018, 3:37 AM, Stefan Richter  
wrote:

Hi,
can you take a heap dump from a JVM that runs into the problem and share it 
with us? That would make finding the cause a lot easier.
Best,Stefan


Am 15.06.2018 um 23:01 schrieb ashish pok :
All,
I have another slow Memory Leak situation using basic TimeSession Window 
(earlier it was GlobalWindow related that Fabian helped clarify). 
I have a very simple data pipeline:
DataStream processedData = rawTuples
    .window(ProcessingTimeSessionWindows.withGap(Time.seconds(AppConfigs.getWindowSize(780))))
    .trigger(new ProcessingTimePurgeTrigger())
    .apply(new IPSLAMetricWindowFn())
    .name("windowFunctionTuple")
    .map(new TupleToPlatformEventMapFn())
    .name("mapTupleEvent");

I initially didn't even have the ProcessingTimePurgeTrigger; it was using the 
default Trigger. In an effort to fix this issue, I created my own Trigger from 
the default ProcessingTimeTrigger with a simple override of the onProcessingTime 
method (essentially replacing FIRE with FIRE_AND_PURGE):

@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
    return TriggerResult.FIRE_AND_PURGE;
}

This seems to have done nothing (it may have delayed the issue by a couple of 
hours - not certain). But I still see heap utilization creep up slowly until it 
eventually reaches a point where GC starts to take too long, and then the dreaded 
OOM. 
For completeness, here is my Window Function (still using the old function 
interface). It creates a few metrics for reporting and applies logic by looping 
over the Iterable. NO states are explicitly kept in this function; 
RichWindowFunction was needed basically to generate metrics.


public class IPSLAMetricWindowFn extends RichWindowFunction {

    private static final long serialVersionUID = 1L;

    private static Logger logger = LoggerFactory.getLogger(IPSLAMetricWindowFn.class);

    private Meter in;
    private Meter out;
    private Meter error;

    @Override
    public void open(Configuration conf) throws Exception {
        this.in = getRuntimeContext()
                .getMetricGroup()
                .addGroup(AppConstants.APP_METRICS.PROCESS)
                .meter(AppConstants.APP_METRICS.IN, new MeterView(AppConstants.APP_METRICS.INTERVAL_30));
        this.out = getRuntimeContext()
                .getMetricGroup()
                .addGroup(AppConstants.APP_METRICS.PROCESS)
                .meter(AppConstants.APP_METRICS.OUT, new MeterView(AppConstants.APP_METRICS.INTERVAL_30));
        this.error = getRuntimeContext()
                .getMetricGroup()
                .addGroup(AppConstants.APP_METRICS.PROCESS)
                .meter(AppConstants.APP_METRICS.ERROR, new MeterView(AppConstants.APP_METRICS.INTERVAL_30));
        super.open(conf);
    }

    @Override
    public void apply(String key, TimeWindow window, Iterable events, Collector collector) throws Exception {
    }
}



Appreciate any pointers on what could be causing leaks here. This seems pretty 
straight-forward.
Thanks, Ashish





  

Re: Memory Leak in ProcessingTimeSessionWindow

2018-06-18 Thread ashish pok
Right, that's where I am headed now, but I was wondering if there are any 
"gotchas" I am missing before I try and dig into a few gigs of heap dump. 

Thanks, Ashish

Sent from Yahoo Mail for iPhone


On Monday, June 18, 2018, 3:37 AM, Stefan Richter  
wrote:

Hi,
can you take a heap dump from a JVM that runs into the problem and share it 
with us? That would make finding the cause a lot easier.
Best,Stefan


Am 15.06.2018 um 23:01 schrieb ashish pok :
All,
I have another slow Memory Leak situation using basic TimeSession Window 
(earlier it was GlobalWindow related that Fabian helped clarify). 
I have a very simple data pipeline:
DataStream processedData = rawTuples
    .window(ProcessingTimeSessionWindows.withGap(Time.seconds(AppConfigs.getWindowSize(780))))
    .trigger(new ProcessingTimePurgeTrigger())
    .apply(new IPSLAMetricWindowFn())
    .name("windowFunctionTuple")
    .map(new TupleToPlatformEventMapFn())
    .name("mapTupleEvent");

I initially didn't even have the ProcessingTimePurgeTrigger; it was using the 
default Trigger. In an effort to fix this issue, I created my own Trigger from 
the default ProcessingTimeTrigger with a simple override of the onProcessingTime 
method (essentially replacing FIRE with FIRE_AND_PURGE):

@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
    return TriggerResult.FIRE_AND_PURGE;
}

This seems to have done nothing (it may have delayed the issue by a couple of 
hours - not certain). But I still see heap utilization creep up slowly until it 
eventually reaches a point where GC starts to take too long, and then the dreaded 
OOM. 
For completeness, here is my Window Function (still using the old function 
interface). It creates a few metrics for reporting and applies logic by looping 
over the Iterable. NO states are explicitly kept in this function; 
RichWindowFunction was needed basically to generate metrics.


public class IPSLAMetricWindowFn extends RichWindowFunction {

    private static final long serialVersionUID = 1L;

    private static Logger logger = LoggerFactory.getLogger(IPSLAMetricWindowFn.class);

    private Meter in;
    private Meter out;
    private Meter error;

    @Override
    public void open(Configuration conf) throws Exception {
        this.in = getRuntimeContext()
                .getMetricGroup()
                .addGroup(AppConstants.APP_METRICS.PROCESS)
                .meter(AppConstants.APP_METRICS.IN, new MeterView(AppConstants.APP_METRICS.INTERVAL_30));
        this.out = getRuntimeContext()
                .getMetricGroup()
                .addGroup(AppConstants.APP_METRICS.PROCESS)
                .meter(AppConstants.APP_METRICS.OUT, new MeterView(AppConstants.APP_METRICS.INTERVAL_30));
        this.error = getRuntimeContext()
                .getMetricGroup()
                .addGroup(AppConstants.APP_METRICS.PROCESS)
                .meter(AppConstants.APP_METRICS.ERROR, new MeterView(AppConstants.APP_METRICS.INTERVAL_30));
        super.open(conf);
    }

    @Override
    public void apply(String key, TimeWindow window, Iterable events, Collector collector) throws Exception {
    }
}



Appreciate any pointers on what could be causing leaks here. This seems pretty 
straight-forward.
Thanks, Ashish







Memory Leak in ProcessingTimeSessionWindow

2018-06-15 Thread ashish pok
All,
I have another slow Memory Leak situation using basic TimeSession Window 
(earlier it was GlobalWindow related that Fabian helped clarify). 
I have a very simple data pipeline:
DataStream processedData = rawTuples
    .window(ProcessingTimeSessionWindows.withGap(Time.seconds(AppConfigs.getWindowSize(780))))
    .trigger(new ProcessingTimePurgeTrigger())
    .apply(new IPSLAMetricWindowFn())
    .name("windowFunctionTuple")
    .map(new TupleToPlatformEventMapFn())
    .name("mapTupleEvent");
I initially didn't even have ProcessingTimePurgeTrigger and it was using the default trigger. In an effort to fix this issue, I created my own trigger from the default ProcessingTimeTrigger with a simple override of the onProcessingTime method (essentially replacing FIRE with FIRE_AND_PURGE):
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
    return TriggerResult.FIRE_AND_PURGE;
}
This seems to have done nothing (it may have delayed the issue by a couple of hours - not certain). But I still see heap utilization creep up slowly until it eventually reaches a point where GC starts to take too long, and then the dreaded OOM.
For completeness, here is my window function (still using the old function interface). It creates a few metrics for reporting and applies logic by looping over the Iterable. NO state is explicitly kept in this function; the RichWindowFunction was basically needed to generate the metrics.


public class IPSLAMetricWindowFn extends RichWindowFunction {

    private static final long serialVersionUID = 1L;

    private static Logger logger = LoggerFactory.getLogger(IPSLAMetricWindowFn.class);

    private Meter in;
    private Meter out;
    private Meter error;

    @Override
    public void open(Configuration conf) throws Exception {
        this.in = getRuntimeContext()
                .getMetricGroup()
                .addGroup(AppConstants.APP_METRICS.PROCESS)
                .meter(AppConstants.APP_METRICS.IN, new MeterView(AppConstants.APP_METRICS.INTERVAL_30));
        this.out = getRuntimeContext()
                .getMetricGroup()
                .addGroup(AppConstants.APP_METRICS.PROCESS)
                .meter(AppConstants.APP_METRICS.OUT, new MeterView(AppConstants.APP_METRICS.INTERVAL_30));
        this.error = getRuntimeContext()
                .getMetricGroup()
                .addGroup(AppConstants.APP_METRICS.PROCESS)
                .meter(AppConstants.APP_METRICS.ERROR, new MeterView(AppConstants.APP_METRICS.INTERVAL_30));
        super.open(conf);
    }

    @Override
    public void apply(String key, TimeWindow window, Iterable events, Collector collector) throws Exception {
    }
}



Appreciate any pointers on what could be causing leaks here. This seems pretty straightforward.
Thanks, Ashish


IoT Use Case, Problem and Thoughts

2018-06-05 Thread ashish pok
Fabian, Stephan, All,
I started a discussion a while back around having a form of event-based 
checkpointing policy that will help us in some of our high volume data 
pipelines. Here is an effort to put this in front of community and understand 
what capabilities can support these type of use cases, how much others feel the 
same need and potentially a feature that can make it to a user story.
Use Case Summary:
- Extremely high volume of data (events from consumer devices with a customer base of over 100M)
- Multiple events need to be combined using a windowing streaming app, grouped by keys (something like a 5 min floor of the timestamp plus unique identifiers for customer devices)
- "Most" events for a group/key arrive within a few seconds if not milliseconds, but events can sometimes be delayed or lost in transport (so delayed-event handling and timeouts will be needed)
- Extremely low (pretty vague, but hopefully the details below clarify it more) data loss is acceptable
- Because of the volume and the transient nature of the source, checkpointing is turned off (this saves on writes to persistence, as states/sessions are active for only a few seconds during processing)
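To make the grouping concrete, the "5 min floor of timestamp" keying described above can be sketched in plain Java (the class, method, and field names here are hypothetical illustrations, not from the actual app):

```java
public class WindowKey {

    private static final long BUCKET_MS = 5 * 60 * 1000L; // 5 minutes in milliseconds

    // Floor a millisecond timestamp to the start of its 5-minute bucket.
    public static long floorToFiveMinutes(long timestampMs) {
        return timestampMs - (timestampMs % BUCKET_MS);
    }

    // Combine the time bucket with a unique device identifier to form the
    // grouping key that collects events from the same device and 5-minute window.
    public static String groupKey(String deviceId, long timestampMs) {
        return deviceId + "|" + floorToFiveMinutes(timestampMs);
    }
}
```

In the real pipeline this would feed the keyBy/grouping function; here it is just the bucketing arithmetic.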
Problem Summary: Of course, none of the above is out of the norm for Flink, and as a matter of fact we already have a Flink app doing this. The issue arises when it comes to graceful shutdowns and operator failures (e.g., Kafka timeouts). On operator failures, the entire job graph restarts, which essentially flushes out in-memory states/sessions. I think there is a feature in the works (not sure if it made it to 1.5) to perform selective restarts, which will control the damage but still result in data loss. Also, it doesn't help when application restarts are needed. We did try going the savepoint route for explicit restart needs, but I think MemoryBackedState ran into issues for larger states or something along those lines (not certain). We obviously cannot recover an operator that actually fails, because its own state could be unrecoverable. However, it feels like Flink already has a lot of the plumbing to help with the overall problem of allowing some sort of recoverable state to handle graceful shutdowns and restarts with minimal data loss.
Solutions: Some in the community commented on my last email with decent ideas, like having an event-based checkpointing trigger (on shutdown, on restart, etc.) or life-cycle hooks (onCancel, onRestart, etc.) in Functions that can be implemented if this type of behavior is needed.
Appreciate feedback from community on how useful this might be for others and 
from core contributors on their thoughts as well.
Thanks in advance, Ashish


Re: Best way to clean-up states in memory

2018-05-14 Thread ashish pok
 Thanks Fabian, Kostas,
Here is what I had in the Trigger - the idea is to run a bitwise OR until a threshold is reached or a timeout fires (nothing too fancy here). Let me know what you guys think. Like I said, I moved this logic to a Process Function and I haven't seen the same issue I was seeing with this.

@PublicEvolving
public class BitwiseOrTrigger<W extends Window> extends Trigger<FactoredEvent, W> {

    private static final long serialVersionUID = 1L;

    private final int threshold;
    private final long epocDelta;

    private final ReducingStateDescriptor<Tuple2<Integer, Long>> stateDesc =
            new ReducingStateDescriptor<>("bitwiseOr", new BitwiseOr(),
                    TypeInformation.of(new TypeHint<Tuple2<Integer, Long>>() {}));

    private BitwiseOrTrigger(int threshold, long allowedLateness) {
        this.threshold = threshold;
        this.epocDelta = allowedLateness;
    }

    @Override
    public TriggerResult onElement(FactoredEvent event, long timestamp, W window, TriggerContext ctx) throws Exception {
        ReducingState<Tuple2<Integer, Long>> currState = ctx.getPartitionedState(stateDesc);
        if (this.epocDelta > 0) {
            ctx.registerProcessingTimeTimer(System.currentTimeMillis() + this.epocDelta);
        }
        currState.add(new Tuple2<>(event.getFactor(), this.epocDelta));
        if (currState.get().f0 >= threshold) {
            currState.clear();
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(stateDesc).clear();
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        ctx.mergePartitionedState(stateDesc);
    }

    @Override
    public String toString() {
        return "BitwiseOrTrigger(" + threshold + ")";
    }

    public static <W extends Window> BitwiseOrTrigger<W> of(int threshold, long expirationEpoc) {
        return new BitwiseOrTrigger<>(threshold, expirationEpoc);
    }

    private static class BitwiseOr implements ReduceFunction<Tuple2<Integer, Long>> {
        private static final long serialVersionUID = 1L;

        @Override
        public Tuple2<Integer, Long> reduce(Tuple2<Integer, Long> tup1, Tuple2<Integer, Long> tup2) throws Exception {
            Tuple2<Integer, Long> retTup = tup1;
            retTup.f0 = tup1.f0 | tup2.f0;
            return retTup;
        }
    }
}
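Stripped of the Flink API, the core idea of the trigger - OR-ing event factors into an accumulator and purging once a threshold bitmask is reached - can be sketched standalone (the class name and values are illustrative, not from the original app):

```java
public class BitwiseOrAccumulator {

    private final int threshold;
    private int acc = 0;

    public BitwiseOrAccumulator(int threshold) {
        this.threshold = threshold;
    }

    // OR the factor into the accumulator; return true when the accumulated
    // bitmask reaches the threshold (the FIRE_AND_PURGE condition), resetting
    // the accumulator the way currState.clear() does in the trigger.
    public boolean add(int factor) {
        acc |= factor;
        if (acc >= threshold) {
            acc = 0;
            return true;
        }
        return false;
    }
}
```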

On Monday, May 14, 2018, 6:00:11 AM EDT, Fabian Hueske  
wrote:  
 
 Hi Ashish,

Did you use per-window state (also called partitioned state) in your Trigger? 
If yes, you need to make sure that it is completely removed in the clear() 
method because processing time timers won't fire once a window was purged. 
So you cannot (fully) rely on timers to clean up per-window state.

Best, Fabian

2018-05-14 9:34 GMT+02:00 Kostas Kloudas :

Hi Ashish,
It would be helpful to share the code of your custom trigger for the first case. Without that, we cannot tell what state you create and how/when you update/clear it.
Cheers, Kostas

On May 14, 2018, at 1:04 AM, ashish pok  wrote:
 Hi Till,
Thanks for getting back. I am sure that will fix the issue but I feel like that 
would potentially mask an issue. I have been going back and forth with Fabian 
on a use case where for some of our highly transient datasets, it might make 
sense to just use memory based state (except of course data loss becomes an 
issue when apps occasionally hit a problem and whole job restarts or app has to 
be taken down etc - ie. handling graceful shutdowns / restarts better 
essentially). I was on the hook to create a business case and post it back to 
this forum (which I am hoping I can get around to at some point soon). Long 
story short, this is one of those datasets. 
States in this case are either fired and cleared normally or on processing 
timeout. So technically, unless there is a memory leak in app code, memory 
usage should plateau out at a high-point. What I was noticing was memory would 
start to creep up ever so slowly. 
I couldn't tell exactly why heap utilization kept on growing (ever so slowly 
but it had upward trend for sure) because the states should technically be 
cleared if not as part of a reducing function then on timeout. App after 
running for couple of days would then run into Java Heap issues. So changing to 
RocksDB probably will fix the issue but not necessarily leak of states that 
should be cleared IMO. Interestingly, I switched my app from using something 
like this:
WindowedStream windowedStats = statsStream
    .keyBy(BasicFactTuple::getKey)
    .window(GlobalWindows.create())
    .trigger(BitwiseOrTrigger.of(60, AppConfigs.getWindowSize(5*60*1000)));
To 
DataStream processStats = pipStatsStream
    .keyBy(BasicFactTuple::getKey)
    .process(new IfStatsReduceProcessFn(AppConfigs.getWindowSize(5*60*1000), 60));
I basically moved logic of trigger to process function over the weekend. Once I 
did 

Re: Best way to clean-up states in memory

2018-05-13 Thread ashish pok
 Hi Till,
Thanks for getting back. I am sure that will fix the issue but I feel like that 
would potentially mask an issue. I have been going back and forth with Fabian 
on a use case where for some of our highly transient datasets, it might make 
sense to just use memory based state (except of course data loss becomes an 
issue when apps occasionally hit a problem and whole job restarts or app has to 
be taken down etc - ie. handling graceful shutdowns / restarts better 
essentially). I was on the hook to create a business case and post it back to 
this forum (which I am hoping I can get around to at some point soon). Long 
story short, this is one of those datasets. 
States in this case are either fired and cleared normally or on processing 
timeout. So technically, unless there is a memory leak in app code, memory 
usage should plateau out at a high-point. What I was noticing was memory would 
start to creep up ever so slowly. 
I couldn't tell exactly why heap utilization kept on growing (ever so slowly 
but it had upward trend for sure) because the states should technically be 
cleared if not as part of a reducing function then on timeout. App after 
running for couple of days would then run into Java Heap issues. So changing to 
RocksDB probably will fix the issue but not necessarily leak of states that 
should be cleared IMO. Interestingly, I switched my app from using something 
like this:
WindowedStream windowedStats = statsStream
    .keyBy(BasicFactTuple::getKey)
    .window(GlobalWindows.create())
    .trigger(BitwiseOrTrigger.of(60, AppConfigs.getWindowSize(5*60*1000)));
To 
DataStream processStats = pipStatsStream
    .keyBy(BasicFactTuple::getKey)
    .process(new IfStatsReduceProcessFn(AppConfigs.getWindowSize(5*60*1000), 60));
I basically moved the logic of the trigger to a process function over the weekend. Once I did that, the heap completely stabilized. In the trigger implementation, I was using FIRE_AND_PURGE on the trigger condition or in onProcessingTime, and in the process implementation I am using the .clear() method for the same.
I seem to have solved the problem by using process, but I'd be interested to understand why the heap would creep up in the trigger scenario.
Hope this makes sense,
Ashish
On Sunday, May 13, 2018, 4:06:59 PM EDT, Till Rohrmann 
 wrote:  
 
 Hi Ashish,
have you tried using Flink's RocksDBStateBackend? If your job accumulates state 
exceeding the available main memory, then you have to use a state backend which 
can spill to disk. The RocksDBStateBackend offers you exactly this 
functionality.
Cheers, Till
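For reference, Till's suggestion is a configuration change along these lines (a sketch - the exact key names vary slightly between Flink versions, and the checkpoint path is a placeholder for your cluster):

```yaml
# flink-conf.yaml
state.backend: rocksdb
state.checkpoints.dir: hdfs:///flink/checkpoints   # placeholder path
```

The backend can also be set programmatically on the StreamExecutionEnvironment if per-job control is preferred.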
On Mon, Apr 30, 2018 at 3:54 PM, ashish pok  wrote:

All,
I am noticing heap utilization creeping up slowly in a couple of apps, which eventually leads to an OOM issue. The apps only have one process function that caches state. I did make sure a clear method is invoked when events are collected normally, on exception, and on timeout.
Are any other best practices others follow for memory backed states?
Thanks,

-- Ashish

  

Best way to clean-up states in memory

2018-04-30 Thread ashish pok
All,
I am noticing heap utilization creeping up slowly in a couple of apps, which eventually leads to an OOM issue. The apps only have one process function that caches state. I did make sure a clear method is invoked when events are collected normally, on exception, and on timeout.
Are any other best practices others follow for memory backed states?
Thanks,

-- Ashish

Re: Scaling down Graphite metrics

2018-04-16 Thread ashish pok
Thanks for that tip about override, will give that a shot at some point. We are 
already using interval.

-- Ashish 
 
  On Sun, Apr 15, 2018 at 6:18 PM, Chesnay Schepler wrote:  
  Hello,
 
you can configure the rate at which metrics are reported by setting "metrics.reporter.<name>.interval" as described in the reporter documentation.
 
 At this time there is no way to disable specific metrics.
 You can however extend the reporter that you are using and override 
"MetricReporter#notifyOfAddedMetric(Metric, String, MetricGroup)" to ignore 
metrics that you aren't interested in.
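As a sketch, slowing the reporting rate down for a Graphite reporter looks roughly like this in flink-conf.yaml (the reporter name "grph", host, port, and interval are illustrative placeholders):

```yaml
metrics.reporter.grph.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.grph.host: graphite.example.com   # placeholder
metrics.reporter.grph.port: 2003                   # placeholder
metrics.reporter.grph.interval: 60 SECONDS         # report once a minute
```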
 
 On 13.04.2018 18:52, ashish pok wrote:
  
All, 
We love Flink's OOTB metrics, but boy there is a ton :) Any way to scale them down (frequency and the metrics themselves)?
Flink apps are becoming a huge source of data right now.
  Thanks,
 
 -- Ashish  
 

 
   


Scaling down Graphite metrics

2018-04-13 Thread ashish pok
All,
We love Flink's OOTB metrics, but boy there is a ton :) Any way to scale them down (frequency and the metrics themselves)?
Flink apps are becoming a huge source of data right now.
Thanks,

-- Ashish

Re: Fw: 1.4.2 - Unable to start YARN containers with more than 1 vCores per Task Manager

2018-04-03 Thread ashish pok
Thanks Shashank, I will check it out.

-- Ashish 
 
  On Tue, Apr 3, 2018 at 10:11 AM, shashank734 wrote:   
Check your YARN configuration. Are you using the DefaultResourceCalculator, which only uses memory while allocating resources? If so, you have to change it to the DominantResourceCalculator.


<property>
  <name>yarn.scheduler.capacity.resource-calculator</name>
  <value>org.apache.hadoop.yarn.util.resource.DominantResourceCalculator</value>
</property>




Check this:

https://hortonworks.com/blog/managing-cpu-resources-in-your-hadoop-yarn-clusters/



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
  


Fw: 1.4.2 - Unable to start YARN containers with more than 1 vCores per Task Manager

2018-04-03 Thread ashish pok


Hi All,

I had been using the following command in a Lab environment successfully in Flink 1.3:

yarn-session.sh -n 4 -s 4 -jm 2048 -tm 2048 -Dyarn.containers.vcores=2 -nm infra.test3

As expected, I see 4 TMs with 16 slots, taking 8 vCores from YARN. In a new Prod environment, I am using version 1.4.2 and I am only seeing 1 vCore per container with the same command. I did check the YARN config, and the max vCores allowed per container is configured as 48.

Anyone seen something similar, or have an idea on what we can look into? Might it be the YARN config?

Thanks,

Ashish
   


Re: Error running on Hadoop 2.7

2018-03-26 Thread ashish pok
Stephan, we are on 1.4.2.
Thanks,

-- Ashish 
 
  On Mon, Mar 26, 2018 at 7:38 AM, Stephan Ewen wrote:   If 
you are on Flink 1.4.0 or 1.4.1, please check if you accidentally have Hadoop 
in your application jar. That can mess up things with child-first classloading. 
1.4.2 should handle Hadoop properly in any case.
On Sun, Mar 25, 2018 at 3:26 PM, Ashish Pokharel  wrote:

Hi Ken,
Yes - we are on 1.4. Thanks for that link - it certainly explains how things are working :)
We currently don't have the HADOOP_CLASSPATH env var set up, and the "hadoop classpath" command basically points to HDP 2.6 locations (HDP = Hortonworks Data Platform). The best guess I have right now is that HDP 2.6 back-ported some 2.9 changes into their distro. This is on my list to get to the bottom of (hopefully no hiccups till prod) - we double-checked our Salt orchestration packages, which were used to build the cluster, but couldn't find a reference to Hadoop 2.9. For now, we are moving on with our testing to prepare for deployment with the Hadoop-free version, which uses the hadoop classpath as described in FLINK-7477.
Thanks, Ashish
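The Hadoop-free setup mentioned above is typically wired up by exporting the client classpath before launching the session (a sketch; the install directory and session parameters are placeholders, not from the original mail):

```shell
# Point the Hadoop-free Flink distribution at the cluster's client libraries
export HADOOP_CLASSPATH=$(hadoop classpath)
cd /opt/flink            # placeholder install dir
./bin/yarn-session.sh -n 4 -s 4 -jm 2048 -tm 2048
```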

On Mar 23, 2018, at 12:31 AM, Ken Krugler  wrote:
Hi Ashish,
Are you using Flink 1.4? If so, what does the “hadoop classpath” command return 
from the command line where you’re trying to start the job?
Asking because I’d run into issues with https://issues.apache.org/jira/browse/FLINK-7477, where I had an old version of Hadoop being referenced by the “hadoop” command.
— Ken


On Mar 22, 2018, at 7:05 PM, Ashish Pokharel  wrote:
Hi All,
Looks like we are out of the woods for now (so we think) - we went with the Hadoop-free version and relied on client libraries on the edge node.
However, I am still not very confident, as I started digging into that stack as well and realized what Till pointed out (the traces lead to a class that is part of 2.9). I did dig around env variables and nothing was set. This is a brand new cluster installed a week back and our team is literally the first hands on deck. I will fish around and see if Hortonworks back-ported something for HDP (the dots are still not completely connected, but nonetheless we have a test session and app running in our brand new Prod).
Thanks, Ashish

On Mar 22, 2018, at 4:47 AM, Till Rohrmann  wrote:
Hi Ashish,
the class `RequestHedgingRMFailoverProxyProvider` was only introduced with Hadoop 2.9.0. My suspicion is thus that you start the client with some Hadoop 2.9.0 dependencies on the class path. Could you please check in the client logs what's on its class path? Maybe you could also share the logs with us. Please also check whether HADOOP_CLASSPATH is set to something suspicious.
Thanks a lot!
Cheers, Till
On Wed, Mar 21, 2018 at 6:25 PM, ashish pok  wrote:

Hi Piotrek,
At this point we are simply trying to start a YARN session. 
BTW, we are on Hortonworks HDP 2.6 which is on 2.7 Hadoop if anyone has 
experienced similar issues. 
We actually pulled 2.6 binaries for the heck of it and ran into same issues. 
I guess we are left with getting non-hadoop binaries and set HADOOP_CLASSPATH 
then?

-- Ashish 
 
  On Wed, Mar 21, 2018 at 12:03 PM, Piotr Nowojski 
wrote:   Hi,
> Does some simple word count example work on the cluster after the upgrade?
If not, maybe your job is pulling in some dependency that’s causing this version conflict?
Piotrek


On 21 Mar 2018, at 16:52, ashish pok  wrote:
Hi Piotrek,
Yes, this is a brand new Prod environment. 2.6 was in our lab.
Thanks,

-- Ashish 
 
  On Wed, Mar 21, 2018 at 11:39 AM, Piotr Nowojski 
wrote:   Hi,
Have you replaced all of your old Flink binaries with freshly downloaded Hadoop 2.7 versions? Are you sure nothing got mixed up in the process?
Does some simple word count example work on the cluster after the upgrade?
Piotrek


On 21 Mar 2018, at 16:11, ashish pok  wrote:
Hi All,
We ran into a roadblock in our new Hadoop environment, migrating from 2.6 to 2.7. It was supposed to be an easy lift to get a YARN session, but it doesn't seem like it :) We definitely are using 2.7 binaries, but it looks like there is a call here to a private method, which screams runtime incompatibility.
Anyone seen this and have pointers?
Thanks, Ashish

Exception in thread "main" java.lang.IllegalAccessError: tried to access method 
org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider.getProxyInternal()Ljava/lang/Object; from class org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider
    at org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider.init(RequestHedgingRMFailoverProxyProvider.java:75)
    at org.apache.hadoop.yarn.client.RMProxy.createRMFailoverProxyProvider(RMProxy.java:163)
    at org.apache.hadoop.yarn.client.RMProxy.createRMProxy(RMProxy.java:94)
    at org.apache.hadoop.yarn.client.ClientRMProxy.createRMProxy(ClientRMProxy.java:72)
    at org.apache.hadoop.yarn.client.ap

Re: "dynamic" bucketing sink

2018-03-26 Thread ashish pok
Hi Christophe,
Have you looked at the Kite SDK? We do something like this, but using Gobblin and the Kite SDK, in a pipeline parallel to Flink. It feels like if you partition by something logical like the topic name, you should be able to sink using the Kite SDK. Kite gives you good ways to handle further partitioning, like using a timestamp, and also schema evolution if you are using Avro.
-- Ashish 
 
  On Mon, Mar 26, 2018 at 4:57 AM, Timo Walther wrote:
Hi Christophe,
 
I think this will require more effort. As far as I know there is no such "dynamic" feature. Have you looked into the bucketing sink code? Maybe you can adapt it to your needs?
 
 Otherwise it might also make sense to open an issue for it to discuss a design 
for it. Maybe other contributors are interested in this feature as well.
 
 Regards,
 Timo
 
 Am 23.03.18 um 18:20 schrieb Christophe Jolif:
  
 Hi all, 
  I'm using the nice topic pattern feature on the KafkaConsumer to read from 
multiple topics, automatically discovering new topics added into the system. 
  At the end of the processing I'm sinking the result into a Hadoop Filesystem 
using a BucketingSink. 
  All works great until I get the requirement to sink into a different Hadoop 
Filesystem based on the input topic. 
  One way to do this would obviously be to get rid of the topic pattern and 
start a (similar) job per topic which would each get its own sink to its own 
filesystem. And start new jobs when new topics are added. But that's far from 
being ideal. This would lead to the usual issues with Flink and a dynamic 
number of jobs (requiring new task slots...) also obviously it would require 
some external machinery to know new topics have been added and create new jobs 
etc... 
  What would be the recommended way to have a "dynamic" BucketingSink that can 
not only write to several basePath (not too hard I guess) but also dynamically 
add new base path when new topics are coming into the system.
 
  Thanks, -- 
 Christophe   
 

 
   


Re: Error running on Hadoop 2.7

2018-03-21 Thread ashish pok
Hi Piotrek,
At this point we are simply trying to start a YARN session. 
BTW, we are on Hortonworks HDP 2.6 which is on 2.7 Hadoop if anyone has 
experienced similar issues. 
We actually pulled 2.6 binaries for the heck of it and ran into same issues. 
I guess we are left with getting non-hadoop binaries and set HADOOP_CLASSPATH 
then?

-- Ashish 
 
  On Wed, Mar 21, 2018 at 12:03 PM, Piotr Nowojski 
wrote:   Hi,
> Does some simple word count example work on the cluster after the upgrade?
If not, maybe your job is pulling in some dependency that’s causing this version conflict?
Piotrek


On 21 Mar 2018, at 16:52, ashish pok  wrote:
Hi Piotrek,
Yes, this is a brand new Prod environment. 2.6 was in our lab.
Thanks,

-- Ashish 
 
  On Wed, Mar 21, 2018 at 11:39 AM, Piotr Nowojski 
wrote:   Hi,
Have you replaced all of your old Flink binaries with freshly downloaded Hadoop 2.7 versions? Are you sure nothing got mixed up in the process?
Does some simple word count example work on the cluster after the upgrade?
Piotrek


On 21 Mar 2018, at 16:11, ashish pok  wrote:
Hi All,
We ran into a roadblock in our new Hadoop environment, migrating from 2.6 to 2.7. It was supposed to be an easy lift to get a YARN session, but it doesn't seem like it :) We definitely are using 2.7 binaries, but it looks like there is a call here to a private method, which screams runtime incompatibility.
Anyone seen this and have pointers?
Thanks, Ashish

Exception in thread "main" java.lang.IllegalAccessError: tried to access method 
org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider.getProxyInternal()Ljava/lang/Object;
 from class org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider
    at 
org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider.init(RequestHedgingRMFailoverProxyProvider.java:75)
    at 
org.apache.hadoop.yarn.client.RMProxy.createRMFailoverProxyProvider(RMProxy.java:163)
    at 
org.apache.hadoop.yarn.client.RMProxy.createRMProxy(RMProxy.java:94)
    at 
org.apache.hadoop.yarn.client.ClientRMProxy.createRMProxy(ClientRMProxy.java:72)
    at 
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.serviceStart(YarnClientImpl.java:187)
    at 
org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
    at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.getYarnClient(AbstractYarnClusterDescriptor.java:314)
    at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:417)
    at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:367)
    at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:679)
    at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:514)
    at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:511)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
    at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:511)



  


  


Re: Error running on Hadoop 2.7

2018-03-21 Thread ashish pok
Hi Piotrek,
Yes, this is a brand new Prod environment. 2.6 was in our lab.
Thanks,

-- Ashish 
 
  On Wed, Mar 21, 2018 at 11:39 AM, Piotr Nowojski 
wrote:   Hi,
Have you replaced all of your old Flink binaries with freshly downloaded Hadoop 2.7 versions? Are you sure nothing got mixed up in the process?
Does some simple word count example work on the cluster after the upgrade?
Piotrek


On 21 Mar 2018, at 16:11, ashish pok  wrote:
Hi All,
We ran into a roadblock in our new Hadoop environment, migrating from 2.6 to 2.7. It was supposed to be an easy lift to get a YARN session, but it doesn't seem like it :) We definitely are using 2.7 binaries, but it looks like there is a call here to a private method, which screams runtime incompatibility.
Anyone seen this and have pointers?
Thanks, Ashish

Exception in thread "main" java.lang.IllegalAccessError: tried to access method 
org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider.getProxyInternal()Ljava/lang/Object;
 from class org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider
    at 
org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider.init(RequestHedgingRMFailoverProxyProvider.java:75)
    at 
org.apache.hadoop.yarn.client.RMProxy.createRMFailoverProxyProvider(RMProxy.java:163)
    at 
org.apache.hadoop.yarn.client.RMProxy.createRMProxy(RMProxy.java:94)
    at 
org.apache.hadoop.yarn.client.ClientRMProxy.createRMProxy(ClientRMProxy.java:72)
    at 
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.serviceStart(YarnClientImpl.java:187)
    at 
org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
    at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.getYarnClient(AbstractYarnClusterDescriptor.java:314)
    at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:417)
    at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:367)
    at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:679)
    at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:514)
    at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:511)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
    at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:511)



  


Error running on Hadoop 2.7

2018-03-21 Thread ashish pok
Hi All,
We ran into a roadblock in our new Hadoop environment, migrating from 2.6 to 2.7. It was supposed to be an easy lift to get a YARN session, but it doesn't seem like it :) We definitely are using 2.7 binaries, but it looks like there is a call here to a private method, which screams runtime incompatibility.
Anyone seen this and have pointers?
Thanks, Ashish

Exception in thread "main" java.lang.IllegalAccessError: tried to access method 
org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider.getProxyInternal()Ljava/lang/Object;
 from class org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider
    at 
org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider.init(RequestHedgingRMFailoverProxyProvider.java:75)
    at 
org.apache.hadoop.yarn.client.RMProxy.createRMFailoverProxyProvider(RMProxy.java:163)
    at 
org.apache.hadoop.yarn.client.RMProxy.createRMProxy(RMProxy.java:94)
    at 
org.apache.hadoop.yarn.client.ClientRMProxy.createRMProxy(ClientRMProxy.java:72)
    at 
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.serviceStart(YarnClientImpl.java:187)
    at 
org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
    at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.getYarnClient(AbstractYarnClusterDescriptor.java:314)
    at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:417)
    at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:367)
    at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:679)
    at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:514)
    at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:511)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
    at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:511)
   


Restart hook and checkpoint

2018-03-02 Thread ashish pok
All,
It looks like Flink's default behavior is to restart all operators on a single operator error - in my case it is a Kafka producer timing out. When this happens, I see logs showing that all operators are restarted. This essentially leads to data loss. In my case the volume of data is so high that it is becoming very expensive to checkpoint. I was wondering if Flink has a lifecycle hook to attach a forced checkpoint before restarting operators. That would solve a dire production issue for us.
Thanks,

-- Ashish

Re: Task Manager detached under load

2018-02-24 Thread ashish pok
@Jelmer, this is Till's last response on the issue.

-- Ashish 
 
  On Mon, Feb 5, 2018 at 5:56 AM, Till Rohrmann wrote:   
Hi,
this sounds like a serious regression wrt Flink 1.3.2 and we should definitely 
find out what's causing this problem. From what I see in the logs, the 
following happens:
For some time the JobManager no longer receives heartbeats from the 
TaskManager. This could be, for example, due to long GC pauses or heavy load 
which starves the ActorSystem's threads that are responsible for sending the 
heartbeats. Because of this, the TM's ActorSystem is quarantined, which effectively 
renders it useless because the JM will henceforth ignore all messages from 
it. The only way to resolve this problem is to restart the 
ActorSystem. By setting taskmanager.exit-on-fatal-akka-error to true in 
flink-conf.yaml, a quarantined TM will shut down. If you run the Flink cluster 
on YARN, then a substitute TM will be started if you still have some 
container restarts left. That way, the system should be able to recover.
Additionally you could try to play around with akka.watch.heartbeat.interval 
and akka.watch.heartbeat.pause, which control the heartbeat interval and the 
acceptable pause. By increasing the latter, the system should tolerate longer 
GC pauses and periods of high load.
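The two suggestions above map to flink-conf.yaml entries. A minimal sketch with illustrative values only - the right numbers depend on your GC pause profile:

```yaml
# Let a quarantined TaskManager shut itself down so YARN can replace it.
taskmanager.exit-on-fatal-akka-error: true

# Heartbeat tuning: a larger pause tolerates longer GC pauses before
# the remote system is declared unreachable.
akka.watch.heartbeat.interval: 10 s
akka.watch.heartbeat.pause: 120 s
```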
However, this only addresses the symptoms of the problem and I'd like to find 
out what's causing it. In order to debug further, it would be really helpful to 
obtain the logs of the JobManager and the TaskManagers at DEBUG log level and 
with taskmanager.debug.memory.startLogThread set to true. Additionally it would 
be interesting to see what's happening on the TaskManagers when you observe 
high load, so obtaining a profiler dump via VisualVM would be great. And last 
but not least, it also helps to learn more about the job you're running. What 
kind of connectors is it using? Are you using Flink's metric system? How is the 
Flink cluster deployed? Which other libraries are you using in your job?
Thanks a lot for your help!
Cheers,
Till
On Tue, Jan 30, 2018 at 8:59 PM, Cliff Resnick  wrote:

I've seen a similar issue while running successive Flink SQL batches on 1.4. In 
my case, the Job Manager would fail with the log output about unreachability 
(with an additional statement about something going "horribly wrong"). Under 
workload pressure, I reverted to 1.3.2 where everything works perfectly, but we 
will try again soon on 1.4. When we do I will post the actual log output.
This was on YARN in AWS, with akka.ask.timeout = 60s.
On Wed, Jan 24, 2018 at 9:57 PM, Ashish Pokharel  wrote:

I haven’t gotten much further with this. It doesn’t look GC-related - at 
least the GC counters were not that atrocious. However, my main concern is: once 
the load subsides, why aren’t the TM and JM connecting again? That doesn’t look 
normal. I could definitely tell the JM was listening on its port, and from the 
logs it appears the TM, which is still alive, is trying to message the JM. 
Thanks, Ashish

On Jan 23, 2018, at 12:31 PM, Lasse Nedergaard  
wrote:
Hi, 
Did you find a reason for the detaching? I sometimes see the same on our system 
running Flink 1.4 on DC/OS. I have enabled taskmanager.debug.memory.startLogThread 
for debugging. 
Med venlig hilsen / Best regards
Lasse Nedergaard

Den 20. jan. 2018 kl. 12.57 skrev Kien Truong :


Hi,

You should enable and check your garbage collection log.

We've encountered cases where the Task Manager disassociated due to long GC pauses.

Regards,

Kien
 
 On 1/20/2018 1:27 AM, ashish pok wrote:

Hi All, 
We have hit some load-related issues and were wondering if anyone has some 
suggestions. We are noticing task managers and job managers being detached from 
each other under load and never really syncing up again. As a result, the Flink 
session shows 0 slots available for processing. Even though apps are 
configured to restart, it isn't really helping as there are no slots available 
to run them. 

Here are excerpts from the logs that seemed relevant. (I am trimming out the rest of 
the logs for brevity.) 
Job Manager: 
2018-01-19 12:38:00,423 INFO  org.apache.flink.runtime.jobmanager.JobManager                -  Starting JobManager (Version: 1.4.0, Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40 UTC) 
2018-01-19 12:38:00,792 INFO  org.apache.flink.runtime.jobmanager.JobManager                -  Maximum heap size: 16384 MiBytes
2018-01-19 12:38:00,794 INFO  org.apache.flink.runtime.jobmanager.JobManager                -  Hadoop version: 2.6.5
2018-01-19 12:38:00,794 INFO  org.apache.flink.runtime.jobmanager.JobManager                -  JVM Options:
2018-01-19 12:38:00,794 INFO  org.apache.flink.runtime.jobmanager.JobManager                -     -Xms16384m
2018-01-19 12:38:00,794 INFO  org.apache.flink.runtime.jobmanager.JobManager                -     -Xmx16384m
2018-01-19 1

Re: Task manager not able to rejoin job manager after network hicup

2018-02-24 Thread ashish pok
We see the same in 1.4. I don't think we saw this in 1.3. I had started a 
thread a while back on this; Till asked for more details, and I haven't had a 
chance to get back to him. If you can reproduce this easily, perhaps you can 
get to it faster. I will find the thread and resend.
Thanks,

-- Ashish 
 
  On Fri, Feb 23, 2018 at 9:56 AM, jelmer wrote:   We found 
out there's a taskmanager.exit-on-fatal-akka-error property that will restart 
Flink in this situation, but it is not enabled by default and it feels like a 
rather blunt tool. I'd expect a system like this to be more resilient to this.
On 23 February 2018 at 14:42, Aljoscha Krettek  wrote:

@Till Is this the expected behaviour or do you suspect something could be going 
wrong?


On 23. Feb 2018, at 08:59, jelmer  wrote:
We've observed on our flink 1.4.0 setup that if for some reason the networking 
between the task manager and the job manager gets disrupted then the task 
manager is never able to reconnect.
You'll end up with messages like this getting printed to the log repeatedly
Trying to register at JobManager akka.tcp://flink@jobmanager:6123/user/jobmanager (attempt 17, timeout: 3 milliseconds)
Quarantined address [akka.tcp://flink@jobmanager:6123] is still unreachable or has not been restarted. Keeping it quarantined.
Or alternatively

Tried to associate with unreachable remote address [akka.tcp://flink@jobmanager:6123]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has quarantined this system. No further associations to the remote system are possible until this system is restarted.]
But it never recovers until you either restart the job manager or the task 
manager
I was able to successfully reproduce this behaviour in two Docker containers 
here:
https://github.com/jelmerk/flink-worker-not-rejoining 
Has anyone else seen this problem ?


Re: Strata San Jose

2018-02-09 Thread ashish pok
Awesome, I will send a note from my work email :) 

-- Ashish 
 
  On Fri, Feb 9, 2018 at 5:12 AM, Fabian Hueske wrote:   Hi 
Ashish,

I'll be at Strata San Jose and give two talks.

Just ping me and we can meet there :-)

Cheers, Fabian

2018-02-09 0:53 GMT+01:00 ashish pok :

Wondering if any of the core Flink team members are planning to be at the 
conference? It would be great to meet in person.
Thanks,

-- Ashish

  


Strata San Jose

2018-02-08 Thread ashish pok
Wondering if any of the core Flink team members are planning to be at the 
conference? It would be great to meet in person.
Thanks,

-- Ashish

Kafka Producer timeout causing data loss

2018-01-19 Thread ashish pok
Team,
One more question to the community regarding hardening Flink Apps.
Let me start off by saying we do have known Kafka bottlenecks which we are in 
the midst of resolving. So during certain times of day, a lot of our Flink Apps 
are seeing Kafka Producer timeout issues. Most of the logs are some flavor of 
this:
java.lang.Exception: Failed to send data to Kafka: Expiring 28 record(s) for dev.-.-.-.-.trans-2: 5651 ms has passed since batch creation plus linger time
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invokeInternal(FlinkKafkaProducer010.java:302)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.processElement(FlinkKafkaProducer010.java:421)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 28 record(s) for dev.-.-.-.-.trans-2: 5651 ms has passed since batch creation plus linger time
Timeouts are not necessarily good, but I am sure we understand this is bound to 
happen (hopefully less often). 
The issue for us, however, is that it almost looks like Flink is stopping and 
restarting all operators (a lot of other operators including Map, Reduce and 
Process functions, if not all of them) along with the Kafka Producers. We are 
processing a pretty substantial load in Flink and don't really intend to enable 
RocksDB/HDFS checkpointing in some of these apps - we are OK sustaining some data 
loss when an app crashes completely or something along those lines. However, what 
we are noticing here is that all the data that is in memory for the sliding window 
functions is also lost completely because of this. I would have thought that, 
because of the retry settings in the Kafka Producer, even those 28 events in the 
queue should have been recovered, let alone the over a million events in memory 
state waiting to be Folded/Reduced for the sliding window. This doesn't feel right :) 
Is the only way to solve this to enable RocksDB/HDFS checkpointing? Why would 
almost the whole job graph restart on an operator timeout? Do I need to do something 
simple like disabling operator chaining? We really are trying to just use 
memory and not any other state for these heavy-hitting streams. 
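The producer retry settings mentioned above are passed to the Flink Kafka producer as standard Kafka client properties. Below is a minimal sketch using only java.util.Properties; the broker address and all values are illustrative assumptions, not recommendations, and the resulting object would be handed to the FlinkKafkaProducer010 constructor:

```java
import java.util.Properties;

public class ProducerRetryConfig {
    /** Builds illustrative Kafka producer properties; tune against your broker latencies. */
    public static Properties build() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker1:9092"); // hypothetical host
        props.setProperty("retries", "10");               // retry transient send failures
        props.setProperty("retry.backoff.ms", "500");     // wait between retries
        props.setProperty("request.timeout.ms", "30000"); // per-request timeout
        props.setProperty("linger.ms", "50");             // small batching window
        return props;
    }
}
```

Note that retries only cover failed sends; records that expire in the client's batch queue (the TimeoutException above) are governed by the timeout settings, so retries alone will not prevent that loss.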
Thanks for your help,
Ashish

Understanding Restart Strategy

2018-01-19 Thread ashish pok
Team,
Hopefully, this is a quick one. 
We have setup restart strategy as follows in pretty much all of our apps:
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, Time.of(30, 
TimeUnit.SECONDS)));
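For reference, the same fixed-delay strategy can be set cluster-wide in flink-conf.yaml instead of in code; a sketch using the values from the snippet above:

```yaml
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 10
restart-strategy.fixed-delay.delay: 30 s
```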

This seems pretty straightforward. The app should retry starting 10 times, every 30 
seconds - so about 5 minutes. Either we are not understanding this or it behaves 
inconsistently. Some of the applications restart and come back fine on issues 
like a Kafka timeout (which I will come back to later), but in some cases the same 
issue pretty much shuts the app down. 

My first guess was that the total count of 10 is not reset after the app recovers 
normally. Is there a need to manually reset the counter in an app? I doubt 
Flink treats it as a counter that spans the life of an app instead 
of resetting it on successful start-up - but I am not sure how else to explain the 
behavior.
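If the concern is an attempt counter that spans the life of the job, the failure-rate restart strategy may be a better fit: it only gives up when more than a maximum number of failures occur within a measuring interval, so isolated failures spread over time do not accumulate. A sketch with illustrative values:

```yaml
restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 10
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 30 s
```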
Along the same line, what actually constitutes a "restart"? Our Kafka 
cluster has known performance bottlenecks during certain times of day that we 
are working to resolve. I do notice Kafka producer timeouts quite a few times 
during those periods. When the app hits these timeouts it recovers fine, but I 
don't necessarily see the entire application restarting, as I don't see the 
bootstrap logs of my app. Does something like this count as a restart from the 
Restart Strategy's perspective, as opposed to things like app crashes or YARN 
killing the application, where the app is actually restarted from scratch?
We are really liking Flink, just need to hash out these operational issues to 
make it prime time for all streaming apps we have in our cluster.
Thanks,
Ashish

Task Manager detached under load

2018-01-19 Thread ashish pok
Hi All,
We have hit some load-related issues and were wondering if anyone has some 
suggestions. We are noticing task managers and job managers being detached from 
each other under load and never really syncing up again. As a result, the Flink 
session shows 0 slots available for processing. Even though apps are 
configured to restart, it isn't really helping as there are no slots available 
to run them.

Here are excerpts from the logs that seemed relevant. (I am trimming out the rest of 
the logs for brevity.)
Job Manager:
2018-01-19 12:38:00,423 INFO  org.apache.flink.runtime.jobmanager.JobManager                -  Starting JobManager (Version: 1.4.0, Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40 UTC)
2018-01-19 12:38:00,792 INFO  org.apache.flink.runtime.jobmanager.JobManager                -  Maximum heap size: 16384 MiBytes
2018-01-19 12:38:00,794 INFO  org.apache.flink.runtime.jobmanager.JobManager                -  Hadoop version: 2.6.5
2018-01-19 12:38:00,794 INFO  org.apache.flink.runtime.jobmanager.JobManager                -  JVM Options:
2018-01-19 12:38:00,794 INFO  org.apache.flink.runtime.jobmanager.JobManager                -     -Xms16384m
2018-01-19 12:38:00,794 INFO  org.apache.flink.runtime.jobmanager.JobManager                -     -Xmx16384m
2018-01-19 12:38:00,795 INFO  org.apache.flink.runtime.jobmanager.JobManager                -     -XX:+UseG1GC
2018-01-19 12:38:00,908 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 6123
2018-01-19 12:38:00,908 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.mb, 16384


2018-01-19 12:53:34,671 WARN  akka.remote.RemoteWatcher                                     - Detected unreachable: [akka.tcp://flink@:37840]
2018-01-19 12:53:34,676 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Task manager akka.tcp://flink@:37840/user/taskmanager terminated.
-- So once the Flink session boots up, we hit it with pretty heavy load, 
which typically results in the WARN above
Task Manager:
2018-01-19 12:38:01,002 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -  Starting TaskManager (Version: 1.4.0, Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40 UTC)
2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -  Hadoop version: 2.6.5
2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -  JVM Options:
2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -     -Xms16384M
2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -     -Xmx16384M
2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -     -XX:MaxDirectMemorySize=8388607T
2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -     -XX:+UseG1GC
2018-01-19 12:38:01,392 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 6123
2018-01-19 12:38:01,392 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.mb, 16384

2018-01-19 12:54:48,626 WARN  akka.remote.RemoteWatcher                                     - Detected unreachable: [akka.tcp://flink@:6123]
2018-01-19 12:54:48,690 INFO  akka.remote.Remoting                                          - Quarantined address [akka.tcp://flink@:6123] is still unreachable or has not been restarted. Keeping it quarantined.
2018-01-19 12:54:48,774 WARN  akka.remote.Remoting                                          - Tried to associate with unreachable remote address [akka.tcp://flink@:6123]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has a UID that has been quarantined. Association aborted.]
2018-01-19 12:54:48,833 WARN  akka.remote.Remoting                                          - Tried to associate with unreachable remote address [akka.tcp://flink@:6123]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has quarantined this system. No further associations to the remote system are possible until this system is restarted.]
2018-01-19 12:56:51,244 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Trying to register at JobManager akka.tcp://flink@:6123/user/jobmanager (attempt 10, timeout: 3 milliseconds)
2018-01-19 12:56:51,253 WARN  akka.remote.Remoting                                          - Tried to associate with unreachable remote address [akka.tcp://flink@:6123]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has quarantined this system. No further asso

Re: Metric Registry Warnings

2017-11-13 Thread ashish pok
Thanks Fabian!

Sent from Yahoo Mail on Android 
 
  On Mon, Nov 13, 2017 at 4:44 AM, Fabian Hueske wrote:   Hi 
Ashish,

this is a known issue and has been fixed for the next version [1].

Best, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-7100

2017-11-11 16:02 GMT+01:00 Ashish Pokharel :

All,
Hopefully this is a quick one. I enabled Graphite reporter in my App and I 
started to see the following warning messages all over the place:
2017-11-07 20:54:09,288 WARN  org.apache.flink.runtime.metrics.MetricRegistry        - Error while registering metric.
java.lang.IllegalArgumentException: A metric named flink.taskmanager.pndapoc-cdh-dn-14.8823e32fae717d08e211fceec56479b7.normalizeData.parseRawStats -> Filter.numBytesOut already exists
I saw some threads about this regarding JMX as well, but I don’t see a 
resolution for it. 
One thing I made sure of was that I haven’t reused a name (like parseRawStats) in my app. 
Also, this seems to happen for every metric, not only for a select few where I 
could have mistakenly set the same name.
Appreciate any help on this.
Thanks, Ashish

  


Re: Kafka Consumer fetch-size/rate and Producer queue timeout

2017-11-07 Thread ashish pok
Thanks Fabian.
I am seeing this consistently and can definitely use some help. I have plenty 
of Grafana views I can share if that helps :)

Sent from Yahoo Mail on Android 
 
  On Tue, Nov 7, 2017 at 3:54 AM, Fabian Hueske wrote:   Hi 
Ashish,
Gordon (in CC) might be able to help you.
Cheers, Fabian

2017-11-05 16:24 GMT+01:00 Ashish Pokharel :

All,

I am starting to notice a strange behavior in a particular streaming app. I 
initially thought it was a Producer issue as I was seeing timeout exceptions 
(records expiring in queue). I did try to modify request.timeout.ms, linger.ms 
etc. to help with the issue, in case it was caused by a sudden burst of data or 
something along those lines. However, that caused the app to build up back 
pressure and become slower and slower until the timeout was reached. With 
lower timeouts, the app would actually raise an exception and recover faster. I can 
tell it is not related to connectivity as other apps are running just fine 
around the same time frame connected to the same brokers (we have at least 10 
streaming apps connected to the same list of brokers) from the same data nodes. We 
have enabled the Graphite Reporter in all of our applications. After deep diving 
into some of the consumer and producer stats, I noticed that the consumer fetch-rate 
drops tremendously while fetch-size grows exponentially BEFORE the producer 
actually starts to show higher response times and lower rates. Eventually, I 
noticed that connection resets start to occur and connection counts go up 
momentarily. After that, things get back to normal. Data producer rates remain 
constant around that timeframe - we have a Logstash producer sending data over. 
We checked both Logstash and Kafka metrics and they seem to be showing the same 
pattern (sort of a sine wave) throughout.

It seems to point to Kafka issue (perhaps some tuning between Flink App and 
Kafka) but wanted to check with the experts before I start knocking down Kafka 
Admin’s doors. Are there anything else I can look into. There are quite a few 
default stats in Graphite but those were the ones that made most sense.

Thanks, Ashish

  


Re: Capacity Planning For Large State in YARN Cluster

2017-10-30 Thread ashish pok
Thanks Till, I will pull it out today then.

Sent from Yahoo Mail on Android 
 
  On Mon, Oct 30, 2017 at 3:48 AM, Till Rohrmann wrote:   
Hi Ashish,

great to hear that things work better with the RocksDB state backend. I would 
only start playing with the containerized.heap-cutoff-ratio if you see TMs 
failing due to exceeding the direct memory limit. Currently, not all of the 
cutoff memory is set as the direct memory limit. We have a pending fix for that.

Apart from that, it is indeed a good idea to test your application and monitor 
how it behaves when increasing the workload.

Cheers,
Till
​
On Mon, Oct 30, 2017 at 1:34 AM, ashish pok  wrote:

Jorn, correct and I suppose that's where we are at this point. RocksDB based 
backend is definitely looking promising for our use case. Since I haven't 
gotten a definite no-no on using 30% for YARN cut-off ratio (about 1.8GB from 
6GB memory) and off-heap flag turned on, we will continue on that path. Current 
plan is to increase throughput on input streams - state streams are pretty much 
processing already and preserved in RocksDB and we can control streams for 
joining with those states and monitor resource utilizations + join performance. 
We are seeing 200-500ms processing times with pretty decent amount of logging, 
which is pretty good for our needs. 
Agree about the way to estimate the size of state and hence one of the reasons 
of my original question on what others have done. Our states are essentially 
tuples (few primitive values like string, long and a Map of string and string, 
which hold about 10-12 keys, values are small - not more than 128 bytes tops). 
We created a savepoint after processing about 500k records and that's where my 
estimate came from. I'd be the first one to admit it is not accurate but that's 
the best we could think of. 
Thanks, Ashish

  From: Jörn Franke 
 To: Ashish Pokharel  
Cc: Till Rohrmann ; user 
 Sent: Sunday, October 29, 2017 6:05 PM
 Subject: Re: Capacity Planning For Large State in YARN Cluster
  
Well you can only performance test it beforehand in different scenarios with 
different configurations. 
I am not sure what exactly your state holds (eg how many objects etc), but if 
it is Java objects then 3 times might be a little bit low (depends also how you 
initially tested state size) - however Flink optimizes this as well. 
Nevertheless, something like Rocksdb is probably a better solution for larger 
states.
On 29. Oct 2017, at 21:15, Ashish Pokharel  wrote:



Hi Till,
I got the same feedback from Robert Metzger over on Stack Overflow. I have switched 
my app to use RocksDB and yes, it did stabilize the app :) 
However, I am still struggling with how to map out my TMs' and JMs' memory, 
number of slots per TM, etc. Currently I am using 60 slots with 10 TMs and 60 
GB of total cluster memory. The idea was to make the states distributed and have 
approx. 1 GB of memory per slot. I have also changed the containerized.heap-cutoff-ratio 
config to 0.3 to allow a little room for RocksDB (RocksDB is using the basic 
spinning-disk-optimized pre-defined configs, but we do have SSDs on our Prod 
machines that we can leverage in the future too) and set taskmanager.memory.off-heap 
to true. It feels more experimental at this point than an exact science :) 
If there are any further guidelines on how we can plan for this as we open up 
the flood gates to heavy continuous streams, that will be great.
Thanks again,
Ashish
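The settings described in the message above correspond to flink-conf.yaml entries roughly like the following sketch (values copied from the message; treat them as one experiment, not guidance):

```yaml
state.backend: rocksdb
containerized.heap-cutoff-ratio: 0.3   # leave headroom for RocksDB native memory
taskmanager.memory.off-heap: true
taskmanager.numberOfTaskSlots: 6       # 60 slots across 10 TMs
```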

On Oct 27, 2017, at 8:45 AM, Till Rohrmann  wrote:
Hi Ashish,
what you are describing should be a good use case for Flink and it should be 
able to run your program.
When you are seeing a GC overhead limit exceeded error, then it means that 
Flink or your program are creating too many/too large objects filling up the 
memory in a short time. I would recommend checking your user program to see 
whether you can avoid unnecessary object instantiations and whether it is 
possible to reuse created objects.
Concerning Flink's state backends, the memory state backend is currently not 
able to spill to disk. Also the managed memory is only relevant for 
DataSet/batch programs and not streaming programs. Therefore, I would recommend 
you to try out the RocksDB state backend which is able to gracefully spill to 
disk if the state size should grow too large. Consequently, you don't have to 
adjust the managed memory settings because they currently don't have an effect 
on streaming programs. 
My gut feeling is that switching to the RocksDBStateBackend could already solve 
your problems. If this should not be the case, then please let me know again.
Cheers,
Till
On Fri, Oct 27, 2017 at 5:27 AM, Ashish Pokharel  wrote:

Hi Everyone,

We have hit a roadblock moving an app to Production scale and were hoping to get 
some guidance. The application is a pretty common use case in stream processing but 
does require maintaining a large number of keyed states. We are processing 2 
streams - one of wh

Re: Capacity Planning For Large State in YARN Cluster

2017-10-29 Thread ashish pok
Jorn, correct and I suppose that's where we are at this point. RocksDB based 
backend is definitely looking promising for our use case. Since I haven't 
gotten a definite no-no on using 30% for YARN cut-off ratio (about 1.8GB from 
6GB memory) and off-heap flag turned on, we will continue on that path. Current 
plan is to increase throughput on input streams - state streams are pretty much 
processing already and preserved in RocksDB and we can control streams for 
joining with those states and monitor resource utilizations + join performance. 
We are seeing 200-500ms processing times with pretty decent amount of logging, 
which is pretty good for our needs. 
Agree about the way to estimate the size of state and hence one of the reasons 
of my original question on what others have done. Our states are essentially 
tuples (few primitive values like string, long and a Map of string and string, 
which hold about 10-12 keys, values are small - not more than 128 bytes tops). 
We created a savepoint after processing about 500k records and that's where my 
estimate came from. I'd be the first one to admit it is not accurate but that's 
the best we could think of. 
Thanks, Ashish

  From: Jörn Franke 
 To: Ashish Pokharel  
Cc: Till Rohrmann ; user 
 Sent: Sunday, October 29, 2017 6:05 PM
 Subject: Re: Capacity Planning For Large State in YARN Cluster
   
Well you can only performance test it beforehand in different scenarios with 
different configurations. 
I am not sure what exactly your state holds (eg how many objects etc), but if 
it is Java objects then 3 times might be a little bit low (depends also how you 
initially tested state size) - however Flink optimizes this as well. 
Nevertheless, something like Rocksdb is probably a better solution for larger 
states.
On 29. Oct 2017, at 21:15, Ashish Pokharel  wrote:



Hi Till,
I got the same feedback from Robert Metzger over on Stack Overflow. I have switched 
my app to use RocksDB and yes, it did stabilize the app :) 
However, I am still struggling with how to map out my TMs' and JMs' memory, 
number of slots per TM, etc. Currently I am using 60 slots with 10 TMs and 60 
GB of total cluster memory. The idea was to make the states distributed and have 
approx. 1 GB of memory per slot. I have also changed the containerized.heap-cutoff-ratio 
config to 0.3 to allow a little room for RocksDB (RocksDB is using the basic 
spinning-disk-optimized pre-defined configs, but we do have SSDs on our Prod 
machines that we can leverage in the future too) and set taskmanager.memory.off-heap 
to true. It feels more experimental at this point than an exact science :) 
If there are any further guidelines on how we can plan for this as we open up 
the flood gates to heavy continuous streams, that will be great.
Thanks again,
Ashish

On Oct 27, 2017, at 8:45 AM, Till Rohrmann  wrote:
Hi Ashish,
what you are describing should be a good use case for Flink and it should be 
able to run your program.
When you are seeing a GC overhead limit exceeded error, then it means that 
Flink or your program are creating too many/too large objects filling up the 
memory in a short time. I would recommend checking your user program to see 
whether you can avoid unnecessary object instantiations and whether it is 
possible to reuse created objects.
Concerning Flink's state backends, the memory state backend is currently not 
able to spill to disk. Also the managed memory is only relevant for 
DataSet/batch programs and not streaming programs. Therefore, I would recommend 
you to try out the RocksDB state backend which is able to gracefully spill to 
disk if the state size should grow too large. Consequently, you don't have to 
adjust the managed memory settings because they currently don't have an effect 
on streaming programs. 
My gut feeling is that switching to the RocksDBStateBackend could already solve 
your problems. If this should not be the case, then please let me know again.
Cheers,
Till
On Fri, Oct 27, 2017 at 5:27 AM, Ashish Pokharel  wrote:

Hi Everyone,

We have hit a roadblock moving an app to Production scale and were hoping to get 
some guidance. The application is a pretty common use case in stream processing but 
does require maintaining a large number of keyed states. We are processing 2 
streams - one of which is a daily burst of a stream (normally around 50 mil but 
could go up to 100 mil in a one-hour burst) and the other is a constant stream of 
around 70-80 mil per hour. We are doing a low-level join using a CoProcess function 
between the two keyed streams. The CoProcess function needs to refresh (upsert) 
state from the daily burst stream and decorate the constantly streaming data with 
values from the state built using the bursty stream. All of the logic is working 
pretty well in a standalone Dev environment. We are throwing about 500k events of 
bursty traffic for state and about 2-3 mil of the data stream. We have 1 TM with 
16GB memory, 1 JM with 8 GB memory and 16 slots (1 per core on the server) on 
the server. We h
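The (truncated) description above is an upsert-then-decorate join. As a plain-Java sketch of just those semantics - a HashMap standing in for Flink keyed state, all names hypothetical, not the actual CoProcessFunction:

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of the upsert/decorate join; a HashMap stands in for Flink keyed state. */
public class UpsertDecorateJoin {
    private final Map<String, Map<String, String>> stateByKey = new HashMap<>();

    /** Element from the daily burst stream: upsert its attributes into state. */
    public void onStateElement(String key, Map<String, String> attrs) {
        stateByKey.computeIfAbsent(key, k -> new HashMap<>()).putAll(attrs);
    }

    /** Element from the constant stream: decorate it with stored state, if any. */
    public Map<String, String> onDataElement(String key, Map<String, String> event) {
        Map<String, String> state = stateByKey.get(key);
        if (state != null) {
            event.putAll(state); // copy stored attributes onto the event
        }
        return event;
    }
}
```

In the real job each key's map would live in Flink keyed state (e.g. backed by RocksDB) so it survives beyond one JVM and scales across slots.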