[GitHub] flink issue #3709: [FLINK-6295]use LoadingCache instead of WeakHashMap to lo...

2017-04-23 Thread WangTaoTheTonic
Github user WangTaoTheTonic commented on the issue:

https://github.com/apache/flink/pull/3709
  
I've tested it and the function works. Please check if it's good to go, thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3709: [FLINK-6295]use LoadingCache instead of WeakHashMap to lo...

2017-04-20 Thread WangTaoTheTonic
Github user WangTaoTheTonic commented on the issue:

https://github.com/apache/flink/pull/3709
  
All right. I'll change it as you suggest and verify the result. Thanks for the
comments and advice :)




[GitHub] flink issue #3709: [FLINK-6295]use LoadingCache instead of WeakHashMap to lo...

2017-04-20 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3709
  
Why would the states be out of sync for non-suspended ExecutionGraphs? As I
said before, the JobManager and web-backend are working on the same object.




[GitHub] flink issue #3709: [FLINK-6295]use LoadingCache instead of WeakHashMap to lo...

2017-04-20 Thread WangTaoTheTonic
Github user WangTaoTheTonic commented on the issue:

https://github.com/apache/flink/pull/3709
  
My main concern is that the status shown in the web UI doesn't match the actual
state in the backend.




[GitHub] flink issue #3709: [FLINK-6295]use LoadingCache instead of WeakHashMap to lo...

2017-04-20 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3709
  
Other states are irrelevant since they don't result in a new ExecutionGraph 
being created for the same JobID. For those cases the existing behavior is 
perfectly fine.




[GitHub] flink issue #3709: [FLINK-6295]use LoadingCache instead of WeakHashMap to lo...

2017-04-20 Thread WangTaoTheTonic
Github user WangTaoTheTonic commented on the issue:

https://github.com/apache/flink/pull/3709
  
I got it, but I still have one question: what about the other state
transitions, such as when the job is cancelling or failing?




[GitHub] flink issue #3709: [FLINK-6295]use LoadingCache instead of WeakHashMap to lo...

2017-04-20 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3709
  
@WangTaoTheTonic It doesn't matter that the job status is ever changing; we
only care about the state at the time of the request.

There are 2 cases to consider when accessing the cache for a given ID:

**a) An EG was cached for the given ID**

In this case we can check the state of the job via
`AccessExecutionGraph#getState`. Modify this block in `ExecutionGraphHolder`

```
if (cached != null) {
    return cached;
}
```
to this
```
if (cached != null) {
    if (cached.getState() == JobStatus.SUSPENDED) {
        cache.remove(jid);
    }
    return cached;
}
```
and you're done.

**b) No EG was cached for the given ID**

In this case the status doesn't matter, you ask the JM and if it returns an 
EG you add it to the cache. We don't care whether this EG is suspended because 
it will be removed with the next request that comes in.
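
A minimal sketch of the two cases above as one lookup method. It assumes simplified stand-in types: `Graph`, `Holder`, the two-value `JobStatus` enum, the string job IDs and the `jobManagerQueries` counter are all hypothetical and are not Flink's actual classes. The JobManager query is modeled as a plain function.

```java
import java.util.Map;
import java.util.WeakHashMap;
import java.util.function.Function;

final class SuspendedEvictionSketch {

    // Simplified stand-in for the job status; only two states are modeled.
    enum JobStatus { RUNNING, SUSPENDED }

    // Hypothetical stand-in for an execution graph whose state changes over time.
    static final class Graph {
        private volatile JobStatus state;
        Graph(JobStatus state) { this.state = state; }
        JobStatus getState() { return state; }
        void setState(JobStatus state) { this.state = state; }
    }

    // Hypothetical holder: a WeakHashMap cache in front of a JobManager lookup.
    static final class Holder {
        private final Map<String, Graph> cache = new WeakHashMap<>();
        private final Function<String, Graph> jobManager;
        int jobManagerQueries = 0; // counts how often the JobManager was asked

        Holder(Function<String, Graph> jobManager) { this.jobManager = jobManager; }

        synchronized Graph get(String jid) {
            Graph cached = cache.get(jid);
            if (cached != null) {
                // Case a): drop a suspended graph so the next request asks the JobManager.
                if (cached.getState() == JobStatus.SUSPENDED) {
                    cache.remove(jid);
                }
                return cached;
            }
            // Case b): nothing cached, so ask the JobManager and cache what it returns.
            jobManagerQueries++;
            Graph fresh = jobManager.apply(jid);
            if (fresh != null) {
                cache.put(jid, fresh);
            }
            return fresh;
        }
    }

    public static void main(String[] args) {
        Graph[] current = { new Graph(JobStatus.RUNNING) };
        Holder holder = new Holder(id -> current[0]);
        Graph first = holder.get("job-1");          // miss: asks the JobManager
        first.setState(JobStatus.SUSPENDED);        // e.g. leadership was revoked
        holder.get("job-1");                        // hit: evicts the suspended graph
        current[0] = new Graph(JobStatus.RUNNING);  // the job was recovered as a new graph
        Graph second = holder.get("job-1");         // miss again: picks up the new graph
        System.out.println(second != first);
    }
}
```

Note that the request which finds the suspended graph still receives it; only the following request goes back to the JobManager.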




[GitHub] flink issue #3709: [FLINK-6295]use LoadingCache instead of WeakHashMap to lo...

2017-04-19 Thread WangTaoTheTonic
Github user WangTaoTheTonic commented on the issue:

https://github.com/apache/flink/pull/3709
  
@zentol How do we know whether a requested job is suspended or not, as the
status of jobs in the backend is always changing?




[GitHub] flink issue #3709: [FLINK-6295]use LoadingCache instead of WeakHashMap to lo...

2017-04-19 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3709
  
@WangTaoTheTonic The purpose of the cache is to reduce queries to the 
JobManager; and since the state of the job is available through the 
ExecutionGraph the cache still fulfills its purpose.




[GitHub] flink issue #3709: [FLINK-6295]use LoadingCache instead of WeakHashMap to lo...

2017-04-19 Thread WangTaoTheTonic
Github user WangTaoTheTonic commented on the issue:

https://github.com/apache/flink/pull/3709
  
That means every time the EGHolder receives a request, it will check whether
the requested job is suspended or not, right? This would make the cache in the
EGHolder meaningless.




[GitHub] flink issue #3709: [FLINK-6295]use LoadingCache instead of WeakHashMap to lo...

2017-04-19 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3709
  
@WangTaoTheTonic Yes, that is correct. @zentol's suggestion should work.

On access, if the `JobStatus` is suspended, remove the entry from the 
`WeakHashMap`.




[GitHub] flink issue #3709: [FLINK-6295]use LoadingCache instead of WeakHashMap to lo...

2017-04-18 Thread WangTaoTheTonic
Github user WangTaoTheTonic commented on the issue:

https://github.com/apache/flink/pull/3709
  
OK, I think I've got your point.

Currently, using the WeakHashMap, we add entries when the map doesn't contain
the requested EG ID, and remove invalid entries when GC happens.

By adding the `small 2-line branch` you suggest, we add entries the same way
as before, but check whether an entry is valid when it's accessed by a handler,
and update/remove it if it's invalid. Is that right?




[GitHub] flink issue #3709: [FLINK-6295]use LoadingCache instead of WeakHashMap to lo...

2017-04-18 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3709
  
eh... in charge?

Whenever *anything* related to a job is requested from the web-ui the 
EGHolder is accessed.

Suppose you have the job info page (/jobs/:jobid) open in a browser or
smth. The WebUI periodically sends requests to the backend, which asks the
EGHolder, which then asks the JM if it doesn't find the job in the cache. Now,
if we remove the suspended EG we will in fact keep polling the JM until the job
is recovered.

This is actually the same behavior that you would have if the job is
suspended and the GC/guava cache eviction starts right away, or if the job was
resumed on another JM but you aren't refreshing the webUI (which should redirect
to the current leader).

So for adding entries nothing changes; for removing entries the GC is still 
mostly in charge; we're just adding a small 2-line branch to invalidate 
suspended ExecutionGraphs that is activated if a handler accesses the EGHolder.




[GitHub] flink issue #3709: [FLINK-6295]use LoadingCache instead of WeakHashMap to lo...

2017-04-18 Thread WangTaoTheTonic
Github user WangTaoTheTonic commented on the issue:

https://github.com/apache/flink/pull/3709
  
I mean, who's in charge of updating the EGHolder? The EGHolder itself or the
JobManager? The EGHolder doesn't notice status changes of jobs unless it queries
the JobManager periodically.

If the JobManager took the responsibility, it would be a listener design
pattern, I guess? Wouldn't that be too complicated, as the EGHolder is currently
just a lightweight cache?




[GitHub] flink issue #3709: [FLINK-6295]use LoadingCache instead of WeakHashMap to lo...

2017-04-18 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3709
  
@WangTaoTheTonic Because *everyone* uses guava, which results again and
again in dependency conflicts.

What do you mean by "how long it should be"? We remove the job from the
cache and that's it. If more requests for that job come in, nothing will be
returned, resulting in the response you get when querying for a non-existing
job, which is an accurate representation of the state of the JobManager. If the
same JM recovers the job then it is no longer in a SUSPENDED state and will be
added to the cache again. If another JM picks the job up the web-ui will be
redirected to that JM and everything's fine.




[GitHub] flink issue #3709: [FLINK-6295]use LoadingCache instead of WeakHashMap to lo...

2017-04-18 Thread WangTaoTheTonic
Github user WangTaoTheTonic commented on the issue:

https://github.com/apache/flink/pull/3709
  
In my opinion the EGHolder is simply a cache, which should not be assigned an
overly complicated task.

If we add the check logic, how long it should be? Will other events
affect the status of tasks? I believe there would be more concerns if we added it.

This fix only changes internal data structures and stays decoupled from both
the JobManager and the web frontend.

I am not sure why we are reducing the usage of guava, but it doesn't sound like
a very good idea :(




[GitHub] flink issue #3709: [FLINK-6295]use LoadingCache instead of WeakHashMap to lo...

2017-04-18 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3709
  
Assuming that some JM will be granted leadership and recovers the suspended 
jobs, the easiest solution would be to simply check in the EGHolder whether the 
cached job is in the SUSPENDED state, and remove it from the cache if it is.




[GitHub] flink issue #3709: [FLINK-6295]use LoadingCache instead of WeakHashMap to lo...

2017-04-18 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3709
  
Give me a bit of time to think of other ways to solve this; we are trying to
reduce the usage of guava, let's see if there isn't another way. 30 seconds
still seems like a long time to serve an old EG.




[GitHub] flink issue #3709: [FLINK-6295]use LoadingCache instead of WeakHashMap to lo...

2017-04-18 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3709
  
That this happens when HA is enabled is a really important detail; finally 
figured out what happens.

Here's roughly what's going on:

* EG A is cached by EGHolder for ID_A
* ZK goes down
* JM revokes leadership, throws out all jobs, specifically EG A stored 
under ID_A
* ZK starts up again
* (in your case the same) JM gets leadership back
* JM recovers jobs, which means creating a new EG B, the ID of which is 
ID_A again

When a new request hits the EGHolder cache for ID_A there's still the old 
EG cached. That old EG will remain there until the GC kicks in, from which 
point on the new EG is used.
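
A minimal sketch of that sequence, assuming hypothetical stand-in types rather than Flink's classes: `Graph` only carries a label, and the job ID is a string literal, which stays strongly reachable so the `WeakHashMap` entry is not cleared during the run.

```java
import java.util.Map;
import java.util.WeakHashMap;

final class StaleEntrySketch {

    // Hypothetical stand-in for an execution graph; only a label is modeled.
    static final class Graph {
        final String label;
        Graph(String label) { this.label = label; }
    }

    // Replays the sequence and reports which graph the cache serves afterwards.
    static String servedAfterRecovery() {
        Map<String, Graph> cache = new WeakHashMap<>();
        String idA = "ID_A"; // string literal: strongly reachable, entry is not cleared

        cache.put(idA, new Graph("EG A")); // EG A is cached for ID_A

        // Leadership is revoked and regained; the job is recovered as a new graph
        // under the same ID. Nothing notifies the cache about this.
        Graph recovered = new Graph("EG B");

        Graph served = cache.get(idA); // the next request still hits the old entry
        return served.label + " served, " + recovered.label + " is current";
    }

    public static void main(String[] args) {
        System.out.println(servedAfterRecovery());
    }
}
```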




[GitHub] flink issue #3709: [FLINK-6295]use LoadingCache instead of WeakHashMap to lo...

2017-04-18 Thread WangTaoTheTonic
Github user WangTaoTheTonic commented on the issue:

https://github.com/apache/flink/pull/3709
  
I'm not an akka expert. As we observed, the status of cancelled tasks will
be updated to running only when GC happens in the JM.
Way to reproduce:
1. launch a flink job in HA mode
2. restart zookeeper (to make the tasks fail)
3. after the tasks have recovered, check whether the status of the tasks is
running or cancelled (if a GC has happened, the tasks' status shown in the web
frontend will match the actual states; otherwise the displayed status is
delayed, which may be inconsistent with the backend)

We observed this phenomenon in yarn mode, and it is fixed by this patch.




[GitHub] flink issue #3709: [FLINK-6295]use LoadingCache instead of WeakHashMap to lo...

2017-04-18 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3709
  
If this were not the case then you'd get NotSerializableExceptions when
attempting to transfer the EG.




[GitHub] flink issue #3709: [FLINK-6295]use LoadingCache instead of WeakHashMap to lo...

2017-04-18 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3709
  
The web backend and JM work with the same ActorSystem; as a result all 
transmitted objects are neither serialized nor copied but simply passed around 
through a local akka channel, which means that they are, in fact, the exact 
same object.
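
A minimal sketch of the difference between passing a reference locally and transferring a serialized copy. Akka is not used here; plain Java serialization stands in for a remote transfer, and `Graph` with its `state` field is a hypothetical stand-in.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class SameObjectSketch {

    // Hypothetical stand-in for an execution graph with a mutable state.
    static final class Graph implements Serializable {
        private static final long serialVersionUID = 1L;
        String state = "RUNNING";
    }

    // Simulates a transfer that serializes: the receiver gets an independent copy.
    static Graph serializedCopy(Graph graph) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(graph);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Graph) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Graph original = new Graph();
        Graph passedLocally = original;         // same object, as with a local hand-over
        Graph copy = serializedCopy(original);  // independent snapshot
        original.state = "CANCELED";            // the owner updates the graph later
        System.out.println(passedLocally.state + " / " + copy.state);
    }
}
```

The locally passed reference observes the later update, while the serialized copy keeps the state it had at transfer time.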




[GitHub] flink issue #3709: [FLINK-6295]use LoadingCache instead of WeakHashMap to lo...

2017-04-18 Thread WangTaoTheTonic
Github user WangTaoTheTonic commented on the issue:

https://github.com/apache/flink/pull/3709
  
@zentol No you're wrong.

If you take a look at `ExecutionGraphHolder`, you'll find the graphs in it
are obtained from messages answered by the JobManager, which means there's no
reference from the JobManager but only from handlers in the netty web backend.
Once there's no reference from those handlers, they would be garbage collected
no matter whether the actual job is running or recovering.




[GitHub] flink issue #3709: [FLINK-6295]use LoadingCache instead of WeakHashMap to lo...

2017-04-18 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3709
  
@WangTaoTheTonic I get that, but when the EG is garbage-collected it also 
means the job was removed from the JM. Not just from the set of running jobs, 
but also from the history of finished jobs. While that does mean it can still 
be displayed in the web-ui until the GC happens, it doesn't explain that the 
display of task states is outdated. Especially since you say the tasks were 
actually running later on, which contradicts the idea that the EG was GC'd in 
the first place.




[GitHub] flink issue #3709: [FLINK-6295]use LoadingCache instead of WeakHashMap to lo...

2017-04-18 Thread WangTaoTheTonic
Github user WangTaoTheTonic commented on the issue:

https://github.com/apache/flink/pull/3709
  
@zentol The execution graphs cached in `ExecutionGraphHolder` (which is
backed by a WeakHashMap) will be evicted only when GC happens.




[GitHub] flink issue #3709: [FLINK-6295]use LoadingCache instead of WeakHashMap to lo...

2017-04-18 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3709
  
@WangTaoTheTonic You still haven't explained why the JobManager GC has 
anything to do with the update in the web-ui.




[GitHub] flink issue #3709: [FLINK-6295]use LoadingCache instead of WeakHashMap to lo...

2017-04-18 Thread WangTaoTheTonic
Github user WangTaoTheTonic commented on the issue:

https://github.com/apache/flink/pull/3709
  
@wenlong88 LoadingCache can also cache and evict data like WeakHashMap; as
this implementation shows, it will evict data every 30 seconds and fetch data if
it doesn't contain the required key.
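
A minimal sketch of that load-on-miss, expire-after-30-seconds behavior. It is a plain-Java stand-in and not Guava's `LoadingCache`: the class `ExpiringLoaderSketch`, its constructor parameters and the injectable clock are assumptions made so the example runs without extra dependencies.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongSupplier;

final class ExpiringLoaderSketch<K, V> {

    // A cached value together with the time it was written.
    private static final class Entry<V> {
        final V value;
        final long writtenAtMillis;
        Entry(V value, long writtenAtMillis) {
            this.value = value;
            this.writtenAtMillis = writtenAtMillis;
        }
    }

    private final Map<K, Entry<V>> entries = new HashMap<>();
    private final Function<K, V> loader;    // e.g. a query to the JobManager
    private final LongSupplier clockMillis; // injectable so expiry can be tested
    private final long ttlMillis;

    ExpiringLoaderSketch(Function<K, V> loader, LongSupplier clockMillis, long ttlMillis) {
        this.loader = loader;
        this.clockMillis = clockMillis;
        this.ttlMillis = ttlMillis;
    }

    // Returns the cached value, reloading it when it is missing or older than the TTL.
    synchronized V get(K key) {
        long now = clockMillis.getAsLong();
        Entry<V> entry = entries.get(key);
        if (entry == null || now - entry.writtenAtMillis >= ttlMillis) {
            entry = new Entry<>(loader.apply(key), now);
            entries.put(key, entry);
        }
        return entry.value;
    }

    public static void main(String[] args) {
        ExpiringLoaderSketch<String, String> cache = new ExpiringLoaderSketch<>(
                id -> "graph for " + id, System::currentTimeMillis, 30_000L);
        System.out.println(cache.get("job-1"));
    }
}
```

With such a cache a stale entry is served for at most the configured 30 seconds, independent of when garbage collection runs.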

@zentol You're right. The data structure used doesn't matter, while what
is shown in the web frontend and how it is updated does. I don't think users
can accept task status updates being triggered only by JobManager GC (which
could take a very long time).




[GitHub] flink issue #3709: [FLINK-6295]use LoadingCache instead of WeakHashMap to lo...

2017-04-16 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3709
  
It shouldn't matter for the display in the web-frontend in which data 
structure the cached ExecutionGraphs are being held. We are caching the actual 
ExecutionGraph that the JobManager works with and not some copy, thus there is 
simply no way for the handler to work with outdated data. This implies that the 
entire premise of this issue is flawed.

It's more likely some web-related issue like the page not refreshing
automatically or being cached by the browser for an inexplicably long time.

