[jira] [Commented] (FLINK-13848) Support “scheduleAtFixedRate/scheduleAtFixedDelay” in RpcEndpoint#MainThreadExecutor
[ https://issues.apache.org/jira/browse/FLINK-13848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16916617#comment-16916617 ] Xiaogang Shi commented on FLINK-13848: -- [~SleePy] Sorry that I misunderstood your second question. You are right that we can use the delayed scheduling of {{MainThreadExecutor}} to achieve periodic scheduling, but it requires two messages to be sent to the actor for each triggering. That may affect throughput when the rpc endpoint is flooded. > Support "scheduleAtFixedRate/scheduleAtFixedDelay" in > RpcEndpoint#MainThreadExecutor > > > Key: FLINK-13848 > URL: https://issues.apache.org/jira/browse/FLINK-13848 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination > Reporter: Biao Liu > Priority: Major > Fix For: 1.10.0 > > > Currently the methods "scheduleAtFixedRate/scheduleAtFixedDelay" of > {{RpcEndpoint#MainThreadExecutor}} are not implemented, because there was no > requirement for them before. > Now we are planning to implement these methods to support periodic checkpoint > triggering. -- This message was sent by Atlassian Jira (v8.3.2#803003)
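The delayed-scheduling idea discussed above, building periodic triggering out of one-shot delayed scheduling by re-scheduling after each run, can be sketched as follows. This is an illustrative stand-alone Java sketch, not Flink code; all names are my own.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch (not Flink code): periodic triggering built from
// one-shot delayed scheduling, by re-scheduling the task after each completion.
public class FixedDelayViaRescheduling {

    static void scheduleWithFixedDelay(ScheduledExecutorService executor, Runnable task, long delayMs) {
        executor.schedule(() -> {
            task.run();
            // the next run is only scheduled after the current one completes
            scheduleWithFixedDelay(executor, task, delayMs);
        }, delayMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        scheduleWithFixedDelay(executor, runs::incrementAndGet, 10);
        Thread.sleep(500);
        executor.shutdownNow();
        if (runs.get() < 2) {
            throw new AssertionError("task did not run periodically");
        }
        System.out.println("triggered " + runs.get() + " times");
    }
}
```

Note that in the {{MainThreadExecutor}} case each such re-scheduling corresponds to a fresh delayed message to the actor, which is exactly the two-messages-per-trigger overhead mentioned above.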
[jira] [Commented] (FLINK-13848) Support “scheduleAtFixedRate/scheduleAtFixedDelay” in RpcEndpoint#MainThreadExecutor
[ https://issues.apache.org/jira/browse/FLINK-13848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16916607#comment-16916607 ] Xiaogang Shi commented on FLINK-13848: -- 1. Yes. If the task message has already been sent to the actor's mailbox, it's very difficult to prevent the actor from executing it. Using a boolean flag that can be accessed via the returned {{ScheduledFuture}} may be a solution, but that makes cancellation very complicated. 2. Most rpc endpoints are already equipped with a {{ScheduledExecutorService}} which can be used to achieve periodic triggering. If you take a look at the implementation, {{ScheduledExecutorService}} implements periodic scheduling by scheduling the next triggering after the completion of the current one.
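The boolean-flag idea from point 1 could look roughly like this. A hypothetical stand-alone sketch, not the actual Flink/Akka code: the flag travels with the task message, and cancelling flips it, so a task already sitting in the mailbox becomes a no-op instead of running after cancellation.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of a cancellation flag checked at execution time
// (illustrative only; not the actual Flink/Akka implementation).
public class CancellationFlagSketch {

    static class FlaggedTask implements Runnable {
        final AtomicBoolean cancelled = new AtomicBoolean(false);
        private final Runnable delegate;

        FlaggedTask(Runnable delegate) {
            this.delegate = delegate;
        }

        @Override
        public void run() {
            // skip the work if cancel() was called after the message was enqueued
            if (!cancelled.get()) {
                delegate.run();
            }
        }

        // what a ScheduledFuture#cancel implementation would flip
        void cancel() {
            cancelled.set(true);
        }
    }

    public static void main(String[] args) {
        final int[] runs = {0};
        FlaggedTask task = new FlaggedTask(() -> runs[0]++);
        task.cancel(); // cancelled while the "message" is still in the mailbox
        task.run();    // the actor later drains the mailbox: the work must not execute
        if (runs[0] != 0) {
            throw new AssertionError("cancelled task was executed");
        }
        System.out.println("cancelled task skipped");
    }
}
```

The complication hinted at above is that a real {{ScheduledFuture}} also has to report cancellation results and remaining delays consistently, not just flip the flag.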
[jira] [Commented] (FLINK-13848) Support “scheduleAtFixedRate/scheduleAtFixedDelay” in RpcEndpoint#MainThreadExecutor
[ https://issues.apache.org/jira/browse/FLINK-13848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16915689#comment-16915689 ] Xiaogang Shi commented on FLINK-13848: -- Hi [~SleePy], what's your detailed plan for implementing these methods? We once attempted to implement them in our private branches, so I am very happy to share some experience here. Our implementation was similar to that in Java's {{ScheduledThreadPoolExecutor}}, except that new tasks were submitted via Akka's dispatcher. One problem we encountered is that we could only cancel the scheduling of task messages in Akka, but failed to cancel those task messages already sent. As a result, some tasks were still executed after the {{ScheduledFuture}} was cancelled, leading to some weird concurrency behaviors. Given that Akka's dispatcher brings few benefits here, we decided to use a non-main-threaded {{ScheduledExecutorService}} together with {{runAsync}} to achieve periodic triggering in the main thread. The new implementation suffers from the same problem, but is much simpler.
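The second approach described above, a side {{ScheduledExecutorService}} driving the period while the actual work is handed to the endpoint's main thread, can be sketched like this. It is a stand-alone approximation: the single-thread executor stands in for the rpc main thread ({{runAsync}}), and all names are illustrative.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch: a non-main-threaded ScheduledExecutorService drives the
// period, and every tick only forwards the callback into the "main thread"
// executor (standing in here for RpcEndpoint#runAsync).
public class MainThreadPeriodicTrigger {

    public static void main(String[] args) throws Exception {
        ExecutorService mainThread = Executors.newSingleThreadExecutor(); // plays the actor's main thread
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger triggers = new AtomicInteger();

        // fixed-rate timer; the work itself always runs in the main thread
        ScheduledFuture<?> handle = timer.scheduleAtFixedRate(
                () -> mainThread.execute(triggers::incrementAndGet),
                10, 10, TimeUnit.MILLISECONDS);

        Thread.sleep(500);
        // cancelling stops future ticks, but a tick already handed over to the
        // main thread may still run -- the same cancellation caveat as above
        handle.cancel(false);
        timer.shutdown();
        mainThread.shutdown();
        mainThread.awaitTermination(1, TimeUnit.SECONDS);

        if (triggers.get() < 2) {
            throw new AssertionError("expected repeated triggering");
        }
        System.out.println("triggered " + triggers.get() + " times in the main thread");
    }
}
```

This shows why the simpler design "suffers from the same problem": cancellation stops the timer, but work already forwarded to the main thread can no longer be retracted.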
[jira] [Commented] (FLINK-13732) Enhance JobManagerMetricGroup with FLIP-6 architecture
[ https://issues.apache.org/jira/browse/FLINK-13732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16907856#comment-16907856 ] Xiaogang Shi commented on FLINK-13732: -- [~SleePy] Thanks for bringing up this issue. We are also suffering from the confusing "job manager metrics". It would be nice if we could separate the legacy {{JobManagerMetricGroup}} into {{DispatcherMetricGroup}}, {{ResourceManagerMetricGroup}}, and {{JobManagerMetricGroup}}, and distinguish them with cluster ids. But an interesting question here is the collection of process metrics (e.g., cpu, memory, i/o, and threads). Currently it's not a problem, as Flink does not collect any process metrics. But from our experience, these process metrics are very helpful in monitoring and troubleshooting. Of course, whether we should collect process metrics is a separate question; but in case we do, it becomes a question in which metric group we should collect the process metrics of job managers. > Enhance JobManagerMetricGroup with FLIP-6 architecture > -- > > Key: FLINK-13732 > URL: https://issues.apache.org/jira/browse/FLINK-13732 > Project: Flink > Issue Type: Improvement > Components: Runtime / Metrics > Reporter: Biao Liu > Priority: Major > Fix For: 1.10.0 > > > There is a requirement from the user mailing list [1]. I think it's reasonable > enough to support. > The scenario is that when deploying a Flink cluster on Yarn, there might be > several {{JM(RM)}}s running on the same host. IMO that's quite a common > scenario. However, we can't distinguish the metrics from different > {{JobManagerMetricGroup}}s, because there is only one variable "hostname" we > can use. > I think there are some problems with the current implementation of > {{JobManagerMetricGroup}}. It's still non-FLIP-6 style. We should split the > metric group into {{RM}} and {{Dispatcher}} to match the FLIP-6 architecture. > And there should be an identification variable supported, just like {{tm_id}}. 
> CC [~StephanEwen], [~till.rohrmann], [~Zentol] > 1. > [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-metrics-scope-for-YARN-single-job-td29389.html]
[jira] [Assigned] (FLINK-12887) Schedule UnfencedMessage would lost envelope info
[ https://issues.apache.org/jira/browse/FLINK-12887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiaogang Shi reassigned FLINK-12887: Assignee: TisonKun (was: Xiaogang Shi) > Schedule UnfencedMessage would lost envelope info > -- > > Key: FLINK-12887 > URL: https://issues.apache.org/jira/browse/FLINK-12887 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination > Affects Versions: 1.9.0 > Reporter: TisonKun > Assignee: TisonKun > Priority: Major > > We provide {{runAsync}}, {{callAsync}} and {{scheduleRunAsync}} for > {{MainThreadExecutable}}, while additionally providing {{runAsyncWithoutFencing}} and > {{callAsyncWithoutFencing}} for {{FencedMainThreadExecutable}}. > Let's think about a case where we want to schedule an unfenced runnable or any > other unfenced message (currently, we don't have such a code path, but it's > semantically valid): > 1. {{FencedAkkaRpcActor}} receives an unfenced runnable with a delay. > 2. It extracts the runnable from the unfenced message and calls > {{super.handleRpcMessage}}. > 3. {{AkkaRpcActor}} envelopes the message and schedules it at > {{AkkaRpcActor#L410}}. > However, {{FencedAkkaRpcActor#envelopeSelfMessage}} was called for the envelope, > so the unfenced message now becomes a fenced message. > We could anyway implement {{scheduleRunAsyncWithoutFencing}} to schedule the > unfenced message directly via {{actorsystem.scheduler.scheduleOnce(..., > dispatcher)}}, but with the current codebase I notice that {{RunAsync}} has a > weird {{atTimeNanos}} (i.e., delay) property. Ideally, how a message is scheduled > should be reflected in the params the {{ScheduledExecutorService}} is called with; at least > we cannot extract an unfenced message, envelope it into a fenced message, and > then schedule it, which has the wrong semantics. > cc [~till.rohrmann]
[jira] [Assigned] (FLINK-12887) Schedule UnfencedMessage would lost envelope info
[ https://issues.apache.org/jira/browse/FLINK-12887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiaogang Shi reassigned FLINK-12887: Assignee: Xiaogang Shi
[jira] [Commented] (FLINK-12887) Schedule UnfencedMessage would lost envelope info
[ https://issues.apache.org/jira/browse/FLINK-12887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16883233#comment-16883233 ] Xiaogang Shi commented on FLINK-12887: -- As described in my first comment, we are using {{scheduleRunAsyncWithoutFencing}} to kill leaked containers. When the Yarn RM restarts, it takes over containers from previous attempts, some of which may be stuck. Those containers must be killed to release resources. In our private version, we schedule a delayed operation to check whether the TMs in these containers register themselves with the RM in time. If a container's TM does not register itself in time, the container is killed. Because the RM may not have been granted leadership when it restarts, the check operation must be scheduled without fencing. For the scenario described, it's true that it could not happen in the current implementation, because we now enforce that the method {{setFencingToken}} is called in the main thread. I'm sorry for my confusing description. My point here is that by simply scheduling the message with the Akka dispatcher, we can avoid the inconvenience brought by unnecessary enveloping. I think {{AkkaInvocationHandler}} knows the underlying {{AkkaRpcActor}}, as it is referenced by {{rpcEndpoint}}. I guess the problem is actually due to the unknown {{ActorSystem}}. If so, we can add the actor's {{ActorSystem}} to the constructor of {{AkkaInvocationHandler}} and use the {{ActorSystem}}'s dispatcher to schedule delayed {{RunAsync}} messages. What do you think? [~till.rohrmann]
[jira] [Commented] (FLINK-12887) Schedule UnfencedMessage would lost envelope info
[ https://issues.apache.org/jira/browse/FLINK-12887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16869118#comment-16869118 ] Xiaogang Shi commented on FLINK-12887: -- [~till.rohrmann] I am also very curious about the way we implement delayed {{runAsync}} operations. Currently we first send the {{runAsync}} message to the actor and then schedule the operation with the Akka dispatcher. There are two questions about this implementation: 1. It seems unnecessary to send the {{runAsync}} message to the actor first. Can we simply schedule the message with the Akka dispatcher? 2. The token is enveloped again by the actor. Rarely, but possibly, the token at submit time differs from the one at envelope time: t1. An rpc endpoint submits a {{runAsync}} operation. t2. The rpc endpoint loses its leadership. t3. The rpc endpoint regains leadership, creating a new fencing token. t4. The {{runAsync}} operation is executed by the actor. It's enveloped with the new fencing token and is scheduled by the Akka dispatcher. In such cases, an operation from a previous session will be executed, which may lead to unexpected results.
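One hedged way to close the t1..t4 window described above would be to capture the fencing token at submit time and re-check it at execution time, instead of enveloping with whatever token is current when the actor handles the message. A stand-alone sketch under that assumption (names are illustrative, not Flink's):

```java
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;

// Illustrative sketch: the fencing token is captured when the operation is
// submitted; a stale token at execution time means the operation belongs to a
// previous leader session and is dropped.
public class SubmitTimeFencingSketch {

    static final AtomicReference<UUID> currentToken = new AtomicReference<>(UUID.randomUUID());
    static int executed = 0;

    static Runnable fenceAtSubmitTime(Runnable operation) {
        final UUID tokenAtSubmit = currentToken.get();
        return () -> {
            if (tokenAtSubmit.equals(currentToken.get())) {
                operation.run();
            }
            // else: leadership changed between submit and execution (the t1..t4
            // race above); the stale operation is silently dropped
        };
    }

    public static void main(String[] args) {
        Runnable fenced = fenceAtSubmitTime(() -> executed++);
        // leadership is lost and regained before the operation runs
        currentToken.set(UUID.randomUUID());
        fenced.run();
        if (executed != 0) {
            throw new AssertionError("operation from a previous session was executed");
        }
        System.out.println("stale operation dropped");
    }
}
```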
[jira] [Commented] (FLINK-12887) Schedule UnfencedMessage would lost envelope info
[ https://issues.apache.org/jira/browse/FLINK-12887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16869086#comment-16869086 ] Xiaogang Shi commented on FLINK-12887: -- Hi [~till.rohrmann], we are now using many unfenced asynchronous operations in the Yarn RM to process notifications from Yarn. Otherwise, the Yarn RM will miss some notifications when it has not yet been granted leadership. Another case is the timers to release stuck containers. When a Yarn RM restarts, it recovers containers from previous attempts. Some containers may be stuck, and we should kill them to release resources. We now use timers to monitor these recovered containers and kill those containers whose task managers cannot register in time. The timers must be unfenced because the Yarn RM may not have been granted leadership when it recovers the containers.
[jira] [Commented] (FLINK-12865) State inconsistency between RM and TM on the slot status
[ https://issues.apache.org/jira/browse/FLINK-12865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16866148#comment-16866148 ] Xiaogang Shi commented on FLINK-12865: -- [~till.rohrmann] You are right that there is no problem with the postponed handling of slot requests. I revisited the code and found that we do use ask to send heartbeat requests, but the responses are not sent back to the {{PromiseActorRef}}. Instead, they are sent back directly to the RM via a separate RPC method, so the handling of the heartbeat responses will not be postponed. After revisiting the code, it seems sending heartbeats in the main thread will fix the problem. Thanks a lot for your explanation, and sorry for my misleading information. > State inconsistency between RM and TM on the slot status > > > Key: FLINK-12865 > URL: https://issues.apache.org/jira/browse/FLINK-12865 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination > Reporter: Yun Gao > Assignee: Yun Gao > Priority: Major > > There may be state inconsistency between TM and RM due to race conditions and > message loss: > # When the TM sends a heartbeat, it retrieves the SlotReport in the main thread but > sends the heartbeat in another thread. There may be cases where the slot on the TM > is FREE initially and the SlotReport reads the FREE state; then the RM requests the slot > and marks it as allocated, and the SlotReport finally overrides the > allocated status on the RM side wrongly. > # When the RM requests a slot, the TM receives the request but the acknowledge > message gets lost. Then the RM will think this slot is free. > Both problems may cause the RM to mark an ALLOCATED slot as FREE. Currently this > causes additional retries until the state is synchronized after the > next heartbeat, and in the future it would cause inaccurate resource statistics for > fine-grained resource management.
[jira] [Commented] (FLINK-12863) Race condition between slot offerings and AllocatedSlotReport
[ https://issues.apache.org/jira/browse/FLINK-12863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16865269#comment-16865269 ] Xiaogang Shi commented on FLINK-12863: -- Btw, I want to note that the race condition is not necessarily caused by {{HeartbeatManagerSenderImpl}} sending heartbeats in a separate thread. Fixing that can solve the problem in the JM, but not the one in the RM. Even when the RM sends heartbeat requests in the main thread, right after a slot request, the heartbeat responses may be handled first by the RM. That is because the RM uses ask to send both heartbeat and slot requests, so temporary {{PromiseActor}}s will be created to receive the responses from the TM. Since there is no guarantee on the execution order of actors, the {{PromiseActor}} which receives its response first may be executed later. > Race condition between slot offerings and AllocatedSlotReport > - > > Key: FLINK-12863 > URL: https://issues.apache.org/jira/browse/FLINK-12863 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination > Affects Versions: 1.9.0 > Reporter: Till Rohrmann > Assignee: Till Rohrmann > Priority: Critical > Fix For: 1.7.3, 1.9.0, 1.8.1 > > > With FLINK-11059 we introduced the {{AllocatedSlotReport}} which is used by > the {{TaskExecutor}} to synchronize its internal view on slot allocations > with the view of the {{JobMaster}}. It seems that there is a race condition > between offering slots and receiving the report because the > {{AllocatedSlotReport}} is sent by the {{HeartbeatManagerSenderImpl}} from a > separate thread. > Due to that it can happen that we generate an {{AllocatedSlotReport}} just > before getting new slots offered. Since the report is sent from a different > thread, it can then happen that the response to the slot offerings is sent > earlier than the {{AllocatedSlotReport}}. Consequently, we might receive an > outdated slot report on the {{TaskExecutor}}, causing active slots to be > released. 
> In order to solve the problem I propose to add a fencing token to the > {{AllocatedSlotReport}} which is being updated whenever we offer new slots to > the {{JobMaster}}. When we receive the {{AllocatedSlotReport}} on the > {{TaskExecutor}} we compare the current slot report fencing token with the > received one and only process the report if they are equal. Otherwise we wait > for the next heartbeat to send us an up to date {{AllocatedSlotReport}}.
[jira] [Commented] (FLINK-12863) Race condition between slot offerings and AllocatedSlotReport
[ https://issues.apache.org/jira/browse/FLINK-12863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16865239#comment-16865239 ] Xiaogang Shi commented on FLINK-12863: -- I think a similar problem happens in the heartbeats between the RM and the TM. When an RM receives a slot request from a JM, it will find an available slot, mark it as pending, and send a slot request to the TM. In the case where the slot request follows a heartbeat request, the RM will receive the heartbeat response first and will remove the pending slot. The RM may then reuse the slot when it receives a new slot request from the JM, leading to duplicated slot allocation. A solution proposed by [~yungao.gy] is to use version numbers. Each slot is equipped with a version number, which is increased once a new pending request is generated. These version numbers are then attached to the heartbeats sent to the TM. Once a heartbeat response is received, we do not remove those pending slot requests whose version numbers are greater than the versions carried by the heartbeat. I think the solution can also work here. What do you think? [~yungao.gy] [~till.rohrmann]
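The version-number idea described above could be sketched roughly as follows. This is a minimal stand-alone model under my own naming assumptions, not the actual RM code: each slot's version is bumped when a new pending request is created, an outgoing heartbeat carries a snapshot of the versions, and a response may only clear pending requests whose version is not newer than that snapshot.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Minimal model of the version-number idea (illustrative names, not RM code).
public class SlotVersionSketch {

    final Map<String, Integer> slotVersions = new HashMap<>();
    final Set<String> pendingSlots = new HashSet<>();

    // a new pending request bumps the slot's version
    void requestSlot(String slotId) {
        slotVersions.merge(slotId, 1, Integer::sum);
        pendingSlots.add(slotId);
    }

    // version snapshot attached to an outgoing heartbeat
    int heartbeatSnapshot(String slotId) {
        return slotVersions.getOrDefault(slotId, 0);
    }

    // a heartbeat response may only clear pending requests that are not newer
    // than the snapshot the heartbeat was sent with
    void onHeartbeatResponse(String slotId, int snapshotVersion) {
        if (slotVersions.getOrDefault(slotId, 0) <= snapshotVersion) {
            pendingSlots.remove(slotId);
        }
    }

    public static void main(String[] args) {
        SlotVersionSketch rm = new SlotVersionSketch();
        int snapshot = rm.heartbeatSnapshot("slot-1"); // heartbeat sent before the request (version 0)
        rm.requestSlot("slot-1");                      // version 1: a newer pending request
        rm.onHeartbeatResponse("slot-1", snapshot);    // stale response: 1 > 0, pending must survive
        if (!rm.pendingSlots.contains("slot-1")) {
            throw new AssertionError("pending request cleared by a stale heartbeat response");
        }
        System.out.println("newer pending request survived the stale response");
    }
}
```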
[jira] [Commented] (FLINK-12761) Fine grained resource management
[ https://issues.apache.org/jira/browse/FLINK-12761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16857448#comment-16857448 ] Xiaogang Shi commented on FLINK-12761: -- I think it's a good idea to enable fine-grained resource management in Flink. I've read the design document and found that many details are omitted from it. As the ResourceManager is a critical component in Flink and has many bugs right now (especially YarnResourceManager), it would be appreciated if you could provide more details in your design document. > Fine grained resource management > > > Key: FLINK-12761 > URL: https://issues.apache.org/jira/browse/FLINK-12761 > Project: Flink > Issue Type: Improvement > Components: Runtime / Configuration > Affects Versions: 1.8.0, 1.9.0 > Reporter: Tony Xintong Song > Assignee: Tony Xintong Song > Priority: Major > Labels: Umbrella > > This is an umbrella issue for enabling fine grained resource management in > Flink. > Fine grained resource management is a big topic that requires long term > efforts. There are many issues to be addressed and design decisions to be > made, some of which may not be resolved in a short time. Here we propose our > design and implementation plan for the upcoming release 1.9, as well as our > thoughts and ideas on the long term road map for this topic. > A practical short term target is to enable fine grained resource management > for batch sql jobs only in the upcoming Flink 1.9. This is necessary for the > batch operators added from blink to achieve good performance. > Please find the detailed design and implementation plan in the attached docs. Any > comments and feedback are welcome and appreciated.
[jira] [Created] (FLINK-12285) Memory leak in SavepointITCase and SavepointMigrationTestBase
Xiaogang Shi created FLINK-12285: Summary: Memory leak in SavepointITCase and SavepointMigrationTestBase Key: FLINK-12285 URL: https://issues.apache.org/jira/browse/FLINK-12285 Project: Flink Issue Type: Bug Components: Tests Reporter: Xiaogang Shi Assignee: Biao Liu The tests in {{SavepointITCase}} and {{SavepointMigrationTestBase}} do not cancel running jobs before exiting. This causes exceptions in {{TaskExecutor}}s and unreleased memory segments, so succeeding tests may fail due to an insufficient amount of memory. The problem is caused by cancelling {{TaskExecutor}}s with running tasks. Another issue with the same cause can be seen in FLINK-11343. Maybe we can find a more dedicated method to cancel those {{TaskExecutor}}s that still have running tasks.
[jira] [Created] (FLINK-12284) InputBufferPoolUsage is incorrect in credit-based network control flow
Xiaogang Shi created FLINK-12284: Summary: InputBufferPoolUsage is incorrect in credit-based network control flow Key: FLINK-12284 URL: https://issues.apache.org/jira/browse/FLINK-12284 Project: Flink Issue Type: Bug Components: Runtime / Network Affects Versions: 1.8.0, 1.7.2, 1.6.4, 1.6.3 Reporter: Xiaogang Shi When credit-based flow control is used, exclusive buffers are directly assigned to {{RemoteInputChannel}}s and are not counted in the {{LocalBufferPool}}, leading to an incorrect InputBufferPoolUsage metric.
[jira] [Commented] (FLINK-10052) Tolerate temporarily suspended ZooKeeper connections
[ https://issues.apache.org/jira/browse/FLINK-10052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16816118#comment-16816118 ] Xiaogang Shi commented on FLINK-10052: -- What's the status of this issue? [~Wosinsan] {{SessionConnectionStateErrorPolicy}} was introduced in Curator 3.0, while Flink is using Curator 2.12. Since Curator 3.x has compatibility problems with ZooKeeper 3.x (see [ZooKeeper Compatibility|https://curator.apache.org/zk-compatibility.html]), we should bump our Curator version to 4.x to use {{SessionConnectionStateErrorPolicy}}. What do you think? [~till.rohrmann] > Tolerate temporarily suspended ZooKeeper connections > > > Key: FLINK-10052 > URL: https://issues.apache.org/jira/browse/FLINK-10052 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination > Affects Versions: 1.4.2, 1.5.2, 1.6.0 > Reporter: Till Rohrmann > Assignee: Dominik Wosiński > Priority: Major > > This issue results from FLINK-10011 which uncovered a problem with Flink's HA > recovery and proposed the following solution to harden Flink: > The {{ZooKeeperLeaderElectionService}} uses the {{LeaderLatch}} Curator > recipe for leader election. The leader latch revokes leadership in case of a > suspended ZooKeeper connection. This can be premature in case that the system > can reconnect to ZooKeeper before its session expires. The effect of the lost > leadership is that all jobs will be canceled and directly restarted after > regaining the leadership. > Instead of directly revoking the leadership upon a SUSPENDED ZooKeeper > connection, it would be better to wait until the ZooKeeper connection is > LOST. That way we would allow the system to reconnect and not lose the > leadership. This could be achievable by using Curator's {{LeaderSelector}} > instead of the {{LeaderLatch}}.
[jira] [Commented] (FLINK-10333) Rethink ZooKeeper based stores (SubmittedJobGraph, MesosWorker, CompletedCheckpoints)
[ https://issues.apache.org/jira/browse/FLINK-10333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16702604#comment-16702604 ] Xiaogang Shi commented on FLINK-10333: -- I think so. We can wrap all access to ZK in the following way
{code:java}
client.inTransaction()
      .check().forPath(election-node-path).and()
      .setData(...).forPath(...).and()
      .commit();
{code}
where {{election-node-path}} is the path of the election znode. The check on the path will ensure that the JobManager is still the leader. > Rethink ZooKeeper based stores (SubmittedJobGraph, MesosWorker, > CompletedCheckpoints) > - > > Key: FLINK-10333 > URL: https://issues.apache.org/jira/browse/FLINK-10333 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.5.3, 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Priority: Major > Fix For: 1.8.0 > > > While going over the ZooKeeper based stores > ({{ZooKeeperSubmittedJobGraphStore}}, {{ZooKeeperMesosWorkerStore}}, > {{ZooKeeperCompletedCheckpointStore}}) and the underlying > {{ZooKeeperStateHandleStore}} I noticed several inconsistencies which were > introduced with past incremental changes. > * Depending whether {{ZooKeeperStateHandleStore#getAllSortedByNameAndLock}} > or {{ZooKeeperStateHandleStore#getAllAndLock}} is called, deserialization > problems will either lead to removing the Znode or not > * {{ZooKeeperStateHandleStore}} leaves inconsistent state in case of > exceptions (e.g. {{#getAllAndLock}} won't release the acquired locks in case > of a failure) > * {{ZooKeeperStateHandleStore}} has too many responsibilities. It would be > better to move {{RetrievableStateStorageHelper}} out of it for a better > separation of concerns > * {{ZooKeeperSubmittedJobGraphStore}} overwrites a stored {{JobGraph}} even > if it is locked. 
This should not happen since it could leave another system > in an inconsistent state (imagine a changed {{JobGraph}} which restores from > an old checkpoint) > * Redundant but also somewhat inconsistent put logic in the different stores > * Shadowing of ZooKeeper specific exceptions in {{ZooKeeperStateHandleStore}} > which were expected to be caught in {{ZooKeeperSubmittedJobGraphStore}} > * Getting rid of the {{SubmittedJobGraphListener}} would be helpful > These problems made me think how reliable these components actually work. > Since these components are very important, I propose to refactor them. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
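The guarded-write pattern in the comment above can be modelled without a ZooKeeper cluster. The sketch below is a toy in-memory stand-in, not the Curator API, and all names are hypothetical: a mutation is committed together with an existence check on the writer's election znode, so a writer that has lost its election node can no longer modify the store:

```java
import java.util.HashMap;
import java.util.Map;

// Toy in-memory stand-in for the guarded-write idea (not the Curator API; all
// names are hypothetical). A mutation only succeeds together with an existence
// check on the writer's election znode, mimicking a ZooKeeper transaction.
public class GuardedStoreSketch {

    private final Map<String, String> znodes = new HashMap<>();

    public void create(String path, String data) { znodes.put(path, data); }

    public void delete(String path) { znodes.remove(path); } // e.g. session expired

    public String get(String path) { return znodes.get(path); }

    // Atomically: check that electionPath exists, then set the data.
    // Either both happen or neither does.
    public boolean checkAndSetData(String electionPath, String path, String data) {
        if (!znodes.containsKey(electionPath)) {
            return false; // the check fails, so nothing is written
        }
        znodes.put(path, data);
        return true;
    }

    public static void main(String[] args) {
        GuardedStoreSketch zk = new GuardedStoreSketch();
        zk.create("/election/member-0000000001", "jm-1");
        // While the election node exists, writes go through.
        zk.checkAndSetData("/election/member-0000000001", "/jobgraph", "v1");
        // The ephemeral election node disappears when the session expires...
        zk.delete("/election/member-0000000001");
        // ...and the stale leader can no longer overwrite the state.
        boolean written = zk.checkAndSetData("/election/member-0000000001", "/jobgraph", "v2");
        System.out.println(written);             // prints false
        System.out.println(zk.get("/jobgraph")); // prints v1
    }
}
```

In real ZooKeeper the check and the write are bundled into one server-side transaction, which is what gives the atomicity the comment relies on.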
[jira] [Commented] (FLINK-10333) Rethink ZooKeeper based stores (SubmittedJobGraph, MesosWorker, CompletedCheckpoints)
[ https://issues.apache.org/jira/browse/FLINK-10333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701821#comment-16701821 ] Xiaogang Shi commented on FLINK-10333: -- Hi [~StephanEwen], I think the key to achieving atomicity is the use of ZooKeeper transactions, which ensure that every modification to ZooKeeper only takes effect when the corresponding election node exists. As far as I know, ZooKeeper transactions only support checks on the existence of given paths, not checks on the payload. Hence I think adding the leader ID as payload to the ephemeral node does not help in achieving atomicity. What do you think? > Rethink ZooKeeper based stores (SubmittedJobGraph, MesosWorker, > CompletedCheckpoints) > - > > Key: FLINK-10333 > URL: https://issues.apache.org/jira/browse/FLINK-10333 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.5.3, 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Priority: Major > Fix For: 1.8.0 > > > While going over the ZooKeeper based stores > ({{ZooKeeperSubmittedJobGraphStore}}, {{ZooKeeperMesosWorkerStore}}, > {{ZooKeeperCompletedCheckpointStore}}) and the underlying > {{ZooKeeperStateHandleStore}} I noticed several inconsistencies which were > introduced with past incremental changes. > * Depending whether {{ZooKeeperStateHandleStore#getAllSortedByNameAndLock}} > or {{ZooKeeperStateHandleStore#getAllAndLock}} is called, deserialization > problems will either lead to removing the Znode or not > * {{ZooKeeperStateHandleStore}} leaves inconsistent state in case of > exceptions (e.g. {{#getAllAndLock}} won't release the acquired locks in case > of a failure) > * {{ZooKeeperStateHandleStore}} has too many responsibilities. It would be > better to move {{RetrievableStateStorageHelper}} out of it for a better > separation of concerns > * {{ZooKeeperSubmittedJobGraphStore}} overwrites a stored {{JobGraph}} even > if it is locked. 
This should not happen since it could leave another system > in an inconsistent state (imagine a changed {{JobGraph}} which restores from > an old checkpoint) > * Redundant but also somewhat inconsistent put logic in the different stores > * Shadowing of ZooKeeper specific exceptions in {{ZooKeeperStateHandleStore}} > which were expected to be caught in {{ZooKeeperSubmittedJobGraphStore}} > * Getting rid of the {{SubmittedJobGraphListener}} would be helpful > These problems made me think how reliable these components actually work. > Since these components are very important, I propose to refactor them. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10333) Rethink ZooKeeper based stores (SubmittedJobGraph, MesosWorker, CompletedCheckpoints)
[ https://issues.apache.org/jira/browse/FLINK-10333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701495#comment-16701495 ] Xiaogang Shi commented on FLINK-10333: -- Hi [~till.rohrmann] Our cluster is suffering from unstable ZooKeeper connections and I think this effort will help deal with some of the problems. But we are still suffering from some problems in leader elections. The main cause is the lack of atomicity. For example, the JobMaster will write its address to another znode when it becomes the leader. But its leadership may already be lost by the time it writes its address (e.g., due to a long full GC). To alleviate the problem, many double checks are used in the code. Similar problems are also observed in the access to checkpoints. When an old job master loses its leadership, it may still have access to the checkpoints in ZooKeeper and may modify them. Various methods (including locks to disallow deletion and rescanning ZooKeeper on restoring) are deployed to deal with these exceptions, but none of them seems to be a perfect solution. After diving deep into the implementation of leader election in ZooKeeper Recipes, I have some ideas to improve our implementation. The basic idea is that we should guarantee that only the elected leader has access to ZooKeeper. In ZooKeeper Recipes, each leader contender creates an election znode which is SEQUENTIAL and EPHEMERAL under a certain path. The contender with the smallest sequence number is elected as the leader. When the elected leader fails, its election znode disappears and the contender whose sequence number is smallest among the remaining contenders is elected as the new leader. So as long as a contender holds the leadership, its election znode must exist in ZooKeeper. Hence, we can record the election node of each contender. 
Whenever a contender wants to modify something in ZooKeeper, it must put the modifications, together with a check on the existence of its election node, in a ZooKeeper transaction. If the contender has already lost its leadership, the transaction will fail due to the unsatisfied check. That way, we can ensure that only the elected leader has access to the states in ZooKeeper. Currently, ZooKeeper Recipes does not expose any interface to access the path of the election nodes. Maybe we need to reimplement the leader election in Flink with native ZooKeeper interfaces, without using ZooKeeper Recipes. What do you think of the idea? > Rethink ZooKeeper based stores (SubmittedJobGraph, MesosWorker, > CompletedCheckpoints) > - > > Key: FLINK-10333 > URL: https://issues.apache.org/jira/browse/FLINK-10333 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.5.3, 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Priority: Major > Fix For: 1.8.0 > > > While going over the ZooKeeper based stores > ({{ZooKeeperSubmittedJobGraphStore}}, {{ZooKeeperMesosWorkerStore}}, > {{ZooKeeperCompletedCheckpointStore}}) and the underlying > {{ZooKeeperStateHandleStore}} I noticed several inconsistencies which were > introduced with past incremental changes. > * Depending whether {{ZooKeeperStateHandleStore#getAllSortedByNameAndLock}} > or {{ZooKeeperStateHandleStore#getAllAndLock}} is called, deserialization > problems will either lead to removing the Znode or not > * {{ZooKeeperStateHandleStore}} leaves inconsistent state in case of > exceptions (e.g. {{#getAllAndLock}} won't release the acquired locks in case > of a failure) > * {{ZooKeeperStateHandleStore}} has too many responsibilities. It would be > better to move {{RetrievableStateStorageHelper}} out of it for a better > separation of concerns > * {{ZooKeeperSubmittedJobGraphStore}} overwrites a stored {{JobGraph}} even > if it is locked. 
This should not happen since it could leave another system > in an inconsistent state (imagine a changed {{JobGraph}} which restores from > an old checkpoint) > * Redundant but also somewhat inconsistent put logic in the different stores > * Shadowing of ZooKeeper specific exceptions in {{ZooKeeperStateHandleStore}} > which were expected to be caught in {{ZooKeeperSubmittedJobGraphStore}} > * Getting rid of the {{SubmittedJobGraphListener}} would be helpful > These problems made me think how reliable these components actually work. > Since these components are very important, I propose to refactor them. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
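The election scheme described in the comment above can be sketched as a small simulation. All names here are hypothetical and this is not the ZooKeeper recipe itself; it only models the ordering rule: contenders hold SEQUENTIAL, EPHEMERAL znodes, the smallest sequence number leads, and the disappearance of the leader's node promotes the next-smallest contender:

```java
import java.util.Optional;
import java.util.TreeMap;

// Toy simulation of the recipe described above (hypothetical names; not the real
// ZooKeeper recipe): every contender owns a SEQUENTIAL, EPHEMERAL znode, the
// smallest sequence number is the leader, and when the leader's node disappears
// the next-smallest contender takes over.
public class ElectionSketch {

    private final TreeMap<Long, String> contenders = new TreeMap<>();

    public void join(long sequence, String contender) { contenders.put(sequence, contender); }

    public void fail(long sequence) { contenders.remove(sequence); } // ephemeral node vanishes

    public Optional<String> leader() {
        return contenders.isEmpty()
                ? Optional.empty()
                : Optional.of(contenders.firstEntry().getValue());
    }

    public static void main(String[] args) {
        ElectionSketch election = new ElectionSketch();
        election.join(3, "jm-a");
        election.join(7, "jm-b");
        election.join(12, "jm-c");
        System.out.println(election.leader().get()); // prints jm-a (smallest sequence number)
        election.fail(3);                            // the leader's session expires
        System.out.println(election.leader().get()); // prints jm-b (next smallest)
    }
}
```

The invariant the comment builds on is visible here: a contender is the leader exactly while its (ephemeral) entry exists, which is what makes the existence check in a transaction a valid leadership test.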
[jira] [Commented] (FLINK-10714) java.lang.IndexOutOfBoundsException when creating a heap backend snapshot
[ https://issues.apache.org/jira/browse/FLINK-10714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16668009#comment-16668009 ] Xiaogang Shi commented on FLINK-10714: -- [~cmick] I came across a similar problem before. It seems that Kryo cannot properly serialize some collection types. I eventually got rid of the problem by registering another serializer (e.g. JavaSerializer) for the problematic collection types in ExecutionConfig. > java.lang.IndexOutOfBoundsException when creating a heap backend snapshot > - > > Key: FLINK-10714 > URL: https://issues.apache.org/jira/browse/FLINK-10714 > Project: Flink > Issue Type: Bug > Components: Type Serialization System >Affects Versions: 1.5.5, 1.6.2 > Environment: Flink 1.6.2, FsStateBackend >Reporter: Michał Ciesielczyk >Priority: Blocker > Fix For: 1.7.0 > > > I'm sometimes getting an error while creating a checkpoint using a filesystem > state backend. This ONLY happens when asynchronous snapshots are enabled > using the FileSystem State Backend. When RocksDB is enabled everything works > fine. > > I'm using a simple KeyedStream.mapWithState function with a ValueState > holding a hashmap (scala.collection.immutable.Map). It's hard to reproduce > the error using a simple code snippet, as the error occurs randomly. > > This issue may be similar to FLINK-7484 and FLINK-8836 (both already fixed), > but I'm still experiencing such behavior. > > Stacktrace: > > {code:java} > java.lang.IndexOutOfBoundsException: Index: 0, Size: 0 > at java.util.ArrayList.rangeCheck(ArrayList.java:657) ~[?:1.8.0_172] > at java.util.ArrayList.set(ArrayList.java:448) ~[?:1.8.0_172] > at > com.esotericsoftware.kryo.util.MapReferenceResolver.setReadObject(MapReferenceResolver.java:56) > ~[kryo-shaded-4.0.0.jar:?] > at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:875) > ~[kryo-shaded-4.0.0.jar:?] > at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:710) > ~[kryo-shaded-4.0.0.jar:?] 
> at > org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:231) > ~[flink-core-1.6.1.jar:1.6.1] > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101) > ~[flink-scala_2.11-1.6.1.jar:1.6.1] > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) > ~[flink-scala_2.11-1.6.1.jar:1.6.1] > at > org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69) > ~[flink-scala_2.11-1.6.1.jar:1.6.1] > at > org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69) > ~[flink-scala_2.11-1.6.1.jar:1.6.1] > at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221) > ~[scala-library-2.11.12.jar:?] > at > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428) > ~[scala-library-2.11.12.jar:?] > at > org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69) > ~[flink-scala_2.11-1.6.1.jar:1.6.1] > at > org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33) > ~[flink-scala_2.11-1.6.1.jar:1.6.1] > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101) > ~[flink-scala_2.11-1.6.1.jar:1.6.1] > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) > ~[flink-scala_2.11-1.6.1.jar:1.6.1] > at > org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287) > ~[flink-runtime_2.11-1.6.1.jar:1.6.1] > at > org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311) > ~[flink-runtime_2.11-1.6.1.jar:1.6.1] > at > org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:73) > ~[flink-runtime_2.11-1.6.1.jar:1.6.1] > at > 
org.apache.flink.streaming.api.operators.StreamGroupedReduce.processElement(StreamGroupedReduce.java:59) > ~[flink-streaming-java_2.11-1.6.1.jar:1.6.1] > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202) > ~[flink-streaming-java_2.11-1.6.1.jar:1.6.1] > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) > ~[flink-streaming-java_2.11-1.6.1.jar:1.6.1] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) > ~[flink-streaming-java_2.11-1.6.1.jar:1.6.1] > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > [flink-runtime_2.11-1.6.1.jar:1.6.1] > at java.lang.Threa
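For reference, the workaround described in the comment might look roughly like the following in job setup code. This is an untested sketch, not a verified fix: the collection class to register depends on the actual state types, and {{scala.collection.immutable.HashMap}} is used here only because it matches the stack trace above.

```java
import org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KryoWorkaroundSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Fall back to Java serialization for the problematic collection type.
        // The class to register depends on your state types; HashMap here only
        // mirrors the scala.collection.immutable.HashMap in the stack trace above.
        env.getConfig().addDefaultKryoSerializer(
                scala.collection.immutable.HashMap.class, JavaSerializer.class);
    }
}
```

Note that Flink ships its own {{org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer}} precisely because Kryo's built-in one has known issues with Flink's class loading.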
[jira] [Commented] (FLINK-6557) RocksDbStateBackendTest fails on Windows
[ https://issues.apache.org/jira/browse/FLINK-6557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16009910#comment-16009910 ] Xiaogang Shi commented on FLINK-6557: - [~Zentol] Thanks a lot for pointing it out. As far as I know, the maximum length of directory paths on Windows is 260. The length of the path printed in the log, however, is only 181. Could you provide more information (e.g., the exception stack trace) to help address the problem? > RocksDbStateBackendTest fails on Windows > > > Key: FLINK-6557 > URL: https://issues.apache.org/jira/browse/FLINK-6557 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing, Tests >Affects Versions: 1.3.0, 1.4.0 >Reporter: Chesnay Schepler > > The {{RocksDbStateBackendTest}} fails on windows when incremental checkpoint > is enabled. > Based on the exception i guess the file name is just simply too long: > {code} > org.rocksdb.RocksDBException: IO error: Failed to create dir: > /C:/Users/Zento/AppData/Local/Temp/junit572330160893758355/junit5754599533651878867/job-ecbdb9df76fd3a39108dac7c515e3214_op-Test_uuid-6a43f1f6-1f35-443e-945c-aab3643e62fc/chk-0.tmp: > Invalid argument > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6545) Make incremental checkpoints externalizable
[ https://issues.apache.org/jira/browse/FLINK-6545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16007785#comment-16007785 ] Xiaogang Shi commented on FLINK-6545: - I prefer not to use {{SavepointSerializer}} to serialize/deserialize external checkpoints. Since the implementation of incremental checkpointing may vary a lot across different state backends, it would be very tedious and error-prone for {{SavepointSerializer}} to support every kind of incremental state handle. Given that checkpoints are not supposed to be backward-compatible, maybe we can simply use Java serialization for external checkpoints. What do you think? > Make incremental checkpoints externalizable > --- > > Key: FLINK-6545 > URL: https://issues.apache.org/jira/browse/FLINK-6545 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Affects Versions: 1.3.0 >Reporter: Stefan Richter >Priority: Blocker > Fix For: 1.3.0 > > > Incremental checkpoints are currently not externalizible. The missing piece > is familiarizing the {{SavepointSerializer}}(s) with the new state handles > classes that we added for incremental checkpointing. Currently, some of those > (e.g. > {{org.apache.flink.contrib.streaming.state.RocksDBIncrementalKeyedStateHandle}}) > currently live in the contrib.rocksdb package and need to be migrated. We > can also push them to a different abstraction level, i.e. > {{IncrementalKeyedStateHandle}} with some private data, referenced existing > shared data (from previous checkpoints), and (presumably) newly created > shared data (first created by the current checkpoint). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
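The proposal in the comment above can be sketched with a toy handle class. {{IncrementalHandle}} below is a made-up stand-in, not a real Flink class; the point is only that plain Java serialization round-trips the object without the serializer needing to know its layout:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Toy round-trip of the proposal: serialize a (made-up) incremental state handle
// with plain Java serialization instead of teaching SavepointSerializer about
// every backend-specific handle class. IncrementalHandle is a stand-in, not a
// real Flink class.
public class JavaSerializationSketch {

    public static class IncrementalHandle implements Serializable {
        private static final long serialVersionUID = 1L;
        public final String sharedStatePath;
        public final long checkpointId;

        public IncrementalHandle(String sharedStatePath, long checkpointId) {
            this.sharedStatePath = sharedStatePath;
            this.checkpointId = checkpointId;
        }
    }

    public static byte[] write(IncrementalHandle handle) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(handle); // the handle's internal layout stays opaque
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    public static IncrementalHandle read(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (IncrementalHandle) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        IncrementalHandle restored =
                read(write(new IncrementalHandle("hdfs:///shared/000001.sst", 42)));
        System.out.println(restored.checkpointId);    // prints 42
        System.out.println(restored.sharedStatePath); // prints hdfs:///shared/000001.sst
    }
}
```

The trade-off the comment accepts: this couples the on-disk format to the handle classes, which is fine only because checkpoints, unlike savepoints, are not expected to be backward-compatible.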
[jira] [Commented] (FLINK-6467) Potential lack of synchronization w.r.t. newSstFiles in RocksDBKeyedStateBackend#releaseResources()
[ https://issues.apache.org/jira/browse/FLINK-6467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16002329#comment-16002329 ] Xiaogang Shi commented on FLINK-6467: - [~te...@apache.org] Thanks for pointing out this problem. It's true that {{newSstFiles}} will be accessed by both the cancel thread and the materialization thread. But the materialization thread must already have stopped (due to the closing of the input/output streams and the interruption) by the time {{releaseResources}} is executed in the cancel thread, so no synchronization is added here. What do you think? > Potential lack of synchronization w.r.t. newSstFiles in > RocksDBKeyedStateBackend#releaseResources() > --- > > Key: FLINK-6467 > URL: https://issues.apache.org/jira/browse/FLINK-6467 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Ted Yu >Priority: Minor > > {code} > if (canceled) { > List statesToDiscard = new ArrayList<>(); > statesToDiscard.add(metaStateHandle); > statesToDiscard.addAll(miscFiles.values()); > statesToDiscard.addAll(newSstFiles.values()); > {code} > It seems access to newSstFiles should be protected by > stateBackend.asyncSnapshotLock -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (FLINK-6504) Lack of synchronization on materializedSstFiles in RocksDBKEyedStateBackend
[ https://issues.apache.org/jira/browse/FLINK-6504?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiaogang Shi reassigned FLINK-6504: --- Assignee: Xiaogang Shi > Lack of synchronization on materializedSstFiles in RocksDBKEyedStateBackend > --- > > Key: FLINK-6504 > URL: https://issues.apache.org/jira/browse/FLINK-6504 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.0 >Reporter: Stefan Richter >Assignee: Xiaogang Shi > > Concurrent checkpoints could access `materializedSstFiles` in the > `RocksDBStateBackend` concurrently. This should be avoided. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6364) Implement incremental checkpointing in RocksDBStateBackend
Xiaogang Shi created FLINK-6364: --- Summary: Implement incremental checkpointing in RocksDBStateBackend Key: FLINK-6364 URL: https://issues.apache.org/jira/browse/FLINK-6364 Project: Flink Issue Type: Sub-task Reporter: Xiaogang Shi Assignee: Xiaogang Shi {{RocksDBStateBackend}} is well suited for incremental checkpointing because RocksDB is based on LSM trees: updates are recorded in new sst files, and all sst files are immutable. By materializing only the new sst files, we can significantly improve the performance of checkpointing. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
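A minimal sketch of the idea in the issue above (file names are invented): because sst files are immutable, a checkpoint only needs to materialize the files that did not exist at the previous checkpoint, and can reference the rest:

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

// Sketch of the core idea (file names invented): because sst files are immutable,
// a checkpoint only needs to materialize the files that did not exist at the
// previous checkpoint; older files can be referenced instead of re-uploaded.
public class IncrementalSketch {

    public static Set<String> newFiles(String[] previousCheckpoint, String[] currentCheckpoint) {
        Set<String> diff = new TreeSet<>(Arrays.asList(currentCheckpoint));
        diff.removeAll(Arrays.asList(previousCheckpoint));
        return diff;
    }

    public static void main(String[] args) {
        String[] cp1 = {"000001.sst", "000002.sst"};
        String[] cp2 = {"000001.sst", "000002.sst", "000003.sst"};
        // Only the newly created sst file has to be uploaded for the second checkpoint.
        System.out.println(newFiles(cp1, cp2)); // prints [000003.sst]
    }
}
```

The real implementation additionally has to track which shared files are still referenced by any retained checkpoint before discarding them, which is what the state-registration work in FLINK-6014 provides.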
[jira] [Closed] (FLINK-6014) Allow the registration of state objects in checkpoints
[ https://issues.apache.org/jira/browse/FLINK-6014?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiaogang Shi closed FLINK-6014. --- Resolution: Fixed Fixed via 218bed8b8e49b0e4c61c61f696a8f010eafea1b7 and aa21f853ab0380ec1f68ae1d0b7c8d9268da4533 > Allow the registration of state objects in checkpoints > -- > > Key: FLINK-6014 > URL: https://issues.apache.org/jira/browse/FLINK-6014 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Xiaogang Shi >Assignee: Xiaogang Shi > > This issue is the very first step towards incremental checkpointing. We > introduce a new state handle named {{CompositeStateHandle}} to be the base of > the snapshots taken by task components. Known implementation may include > {{KeyedStateHandle}} (for {{KeyedStateBackend}}s), {{SubtaskState}} (for > subtasks, splits of {{JobVertex}}) and {{TaskState}} (for {{JobVertex}}s). > Each {{CompositeStateHandle}} is composed of a collection of {{StateObject}s. > It should register all its state objects in {{StateRegistry}} when its > checkpoint is added into {{CompletedCheckpointStore}} (i.e., a pending > checkpoint completes or a complete checkpoint is reloaded in the recovery). > When a completed checkpoint is moved out of the {{CompletedCheckpointStore}}, > we should not simply discard all state objects in the checkpoint. With the > introduction of incremental checkpointing, a {{StateObject}} may be > referenced by different checkpoints. We should unregister all the state > objects contained in the {{StateRegistry}} first. Only those state objects > that are not referenced by any checkpoint can be deleted. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6284) Incorrect sorting of completed checkpoints in ZooKeeperCompletedCheckpointStore
Xiaogang Shi created FLINK-6284: --- Summary: Incorrect sorting of completed checkpoints in ZooKeeperCompletedCheckpointStore Key: FLINK-6284 URL: https://issues.apache.org/jira/browse/FLINK-6284 Project: Flink Issue Type: Bug Reporter: Xiaogang Shi Now all completed checkpoints are sorted by their paths when they are recovered in {{ZooKeeperCompletedCheckpointStore}}. In cases where the latest checkpoint's id is not the largest in lexical order (e.g., "100" is smaller than "99" in lexical order), Flink will not recover from the latest completed checkpoint. The problem can be easily observed by setting the checkpoint ids in {{ZooKeeperCompletedCheckpointStoreITCase#testRecover()}} to 99, 100 and 101. To fix the problem, we should explicitly sort the found checkpoints by their checkpoint ids, instead of using {{ZooKeeperStateHandleStore#getAllSortedByName()}} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
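The sorting bug is easy to reproduce in isolation (method names below are illustrative, not the store's API): sorting checkpoint ids as path strings puts "100" before "99", so the checkpoint with the largest id is not the last element:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Demonstrates the bug in isolation (method names are illustrative, not the
// store's API): sorting checkpoint ids as path strings puts "100" before "99",
// so the checkpoint with the largest id is not the last element.
public class CheckpointSortSketch {

    public static List<String> lexicalSort(String... ids) {
        List<String> sorted = new ArrayList<>(Arrays.asList(ids));
        sorted.sort(Comparator.naturalOrder()); // string order, as in the path-based sort
        return sorted;
    }

    public static List<String> numericSort(String... ids) {
        List<String> sorted = new ArrayList<>(Arrays.asList(ids));
        sorted.sort(Comparator.comparingLong(Long::parseLong)); // sort by checkpoint id
        return sorted;
    }

    public static void main(String[] args) {
        System.out.println(lexicalSort("99", "100", "101")); // prints [100, 101, 99]
        System.out.println(numericSort("99", "100", "101")); // prints [99, 100, 101]
    }
}
```

With the lexical order, a recovery that takes the last element would pick checkpoint 99 instead of 101.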
[jira] [Commented] (FLINK-3089) State API Should Support Data Expiration
[ https://issues.apache.org/jira/browse/FLINK-3089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15962347#comment-15962347 ] Xiaogang Shi commented on FLINK-3089: - [~aljoscha] In the current implementation, each RocksDB timer is identified by the timer's key, namespace and timestamp. Because RocksDB does not need to iterate over the timers to find the one to delete, deleting a timer in the RocksDB timer service is very efficient. > State API Should Support Data Expiration > > > Key: FLINK-3089 > URL: https://issues.apache.org/jira/browse/FLINK-3089 > Project: Flink > Issue Type: New Feature > Components: DataStream API, State Backends, Checkpointing >Reporter: Niels Basjes > > In some usecases (webanalytics) there is a need to have a state per visitor > on a website (i.e. keyBy(sessionid) ). > At some point the visitor simply leaves and no longer creates new events (so > a special 'end of session' event will not occur). > The only way to determine that a visitor has left is by choosing a timeout, > like "After 30 minutes no events we consider the visitor 'gone'". > Only after this (chosen) timeout has expired should we discard this state. > In the Trigger part of Windows we can set a timer and close/discard this kind > of information. But that introduces the buffering effect of the window (which > in some scenarios is unwanted). > What I would like is to be able to set a timeout on a specific OperatorState > value which I can update afterwards. > This makes it possible to create a map function that assigns the right value > and that discards the state automatically. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
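A toy model of the lookup described in the comment above (all names are made up, and the string encoding is a simplification of RocksDB's byte-ordered keys): a timer's full identifier is (key, namespace, timestamp), so deletion is a point lookup rather than a scan over all timers:

```java
import java.util.TreeSet;

// Toy model (all names invented; the string encoding is a simplification of
// RocksDB's byte-ordered keys): a timer's full identifier is (key, namespace,
// timestamp), so deleting it is a single lookup, not a scan over all timers.
public class TimerStoreSketch {

    // Zero-padding the timestamp keeps string order consistent with numeric order.
    public static String timerKey(String key, String namespace, long timestamp) {
        return String.format("%s/%s/%019d", key, namespace, timestamp);
    }

    public static void main(String[] args) {
        TreeSet<String> timers = new TreeSet<>();
        timers.add(timerKey("user-1", "window", 1000));
        timers.add(timerKey("user-2", "window", 2000));

        // Delete by exact identifier: O(log n) here, a point delete in RocksDB.
        boolean removed = timers.remove(timerKey("user-1", "window", 1000));
        System.out.println(removed);       // prints true
        System.out.println(timers.size()); // prints 1
    }
}
```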
[jira] [Commented] (FLINK-6219) Add a state backend which supports sorting
[ https://issues.apache.org/jira/browse/FLINK-6219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15950388#comment-15950388 ] Xiaogang Shi commented on FLINK-6219: - I prefer to use sorted states (e.g., {{SortedMapState}}) rather than a new state backend to address the described problem. Some users have mentioned similar demands for sorted states, hence I think we should provide them to facilitate the development of user applications. The implementation of such sorted states, however, may be very challenging. In {{HeapStateBackend}}, we need to implement a data structure which supports both copy-on-write (for asynchronous snapshotting) and sorting. In {{RocksDBStateBackend}}, we need to find an efficient way to support customized sorting. Though RocksDB's Java API allows custom comparators, performance degrades significantly once a custom comparator is used (to approximately 1/3 - 1/15 of the original QPS). It's critical to address the problems mentioned above; otherwise, a {{ValueState}} whose value is a {{SortedMap}} remains the better way to sort user data under the same key. > Add a state backend which supports sorting > -- > > Key: FLINK-6219 > URL: https://issues.apache.org/jira/browse/FLINK-6219 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing, Table API & SQL >Reporter: sunjincheng > > When we implement the OVER window of > [FLIP11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations], > we notice that we need a state backend which supports sorting, allows for > efficient insertion, traversal in order, and removal from the head. 
> For example: In an event-time OVER window, we need to sort by time. If the data is > as follows: > {code} > (1L, 1, Hello) > (2L, 2, Hello) > (5L, 5, Hello) > (4L, 4, Hello) > {code} > We insert the data in random order, just like: > {code} > put((2L, 2, Hello)),put((1L, 1, Hello)),put((5L, 5, Hello)),put((4L, 4, > Hello)), > {code} > We deal with the elements in time order: > {code} > process((1L, 1, Hello)),process((2L, 2, Hello)),process((4L, 4, > Hello)),process((5L, 5, Hello)) > {code} > Anyone is welcome to give feedback. What do you think? [~xiaogang.shi] > [~aljoscha] [~fhueske] -- This message was sent by Atlassian JIRA (v6.3.15#6346)
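A minimal sketch of the {{SortedMap}} fallback mentioned in the comment above, replaying the example rows from the issue (a plain {{TreeMap}} stands in for a {{ValueState}} holding a {{SortedMap}}):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Minimal sketch of the SortedMap fallback mentioned above, replaying the rows
// from the issue: a plain TreeMap stands in for a ValueState holding a SortedMap.
// Rows arrive out of order, but traversal and head removal happen in time order.
public class SortedStateSketch {

    public static List<Long> processInOrder(long[] timestamps) {
        TreeMap<Long, String> buffer = new TreeMap<>();
        for (long ts : timestamps) {
            buffer.put(ts, "Hello-" + ts); // random insertion order
        }
        List<Long> processed = new ArrayList<>();
        while (!buffer.isEmpty()) {
            processed.add(buffer.pollFirstEntry().getKey()); // efficient removal from the head
        }
        return processed;
    }

    public static void main(String[] args) {
        // put((2L, ...)), put((1L, ...)), put((5L, ...)), put((4L, ...))
        System.out.println(processInOrder(new long[] {2L, 1L, 5L, 4L})); // prints [1, 2, 4, 5]
    }
}
```

A dedicated {{SortedMapState}} would provide the same ordered traversal and head removal without deserializing the whole map on every access, which is the hard part the comment points out.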
[jira] [Commented] (FLINK-6178) Allow upgrades to state serializers
[ https://issues.apache.org/jira/browse/FLINK-6178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15944776#comment-15944776 ] Xiaogang Shi commented on FLINK-6178: - [~tzulitai] Thanks a lot for your quick response. The changes to the interfaces in {{RuntimeContext}} sound great! They do help in the conversion of savepoints. Looking forward to them. > Allow upgrades to state serializers > --- > > Key: FLINK-6178 > URL: https://issues.apache.org/jira/browse/FLINK-6178 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing, Type Serialization System >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > > Currently, users are locked in with the serializer implementation used to > write their state. > This is suboptimal, as generally for users, it could easily be possible that > they wish to change their serialization formats / state schemas and types in > the future. > This is an umbrella JIRA for the required tasks to make this possible. > Here's an overview description of what to expect for the overall outcome of > this JIRA (the specific details are outlined in their respective subtasks): > Ideally, the main user-facing change this would result in is that users > implementing their custom {{TypeSerializer}} s will also need to implement > hook methods that identify whether or not there is a change to the serialized > format or even a change to the serialized data type. It would be the user's > responsibility that the {{deserialize}} method can bridge the change between > the old / new formats. > For Flink's built-in serializers that are automatically built using the > user's configuration (most notably the more complex {{KryoSerializer}} and > {{GenericArraySerializer}}), Flink should be able to automatically > "reconfigure" them using the new configuration, so that the reconfigured > versions can be used to de- / serialize previous state. 
This would require > knowledge of the previous configuration of the serializer, therefore > "serializer configuration metadata" will be added to savepoints. > Note that for the first version of this, although additional infrastructure > (e.g. serializer reconfigure hooks, serializer configuration metadata in > savepoints) will be added to potentially allow Kryo version upgrade, this > JIRA will not cover this. Kryo has breaking binary formats across major > versions, and will most likely need some further changes. Therefore, for the > {{KryoSerializer}}, "upgrading" it simply means changes in the registration > of specific / default serializers, at least for now. > Finally, we would need to add a "convertState" phase to the task lifecycle, > that takes place after the "open" phase and before checkpointing starts / the > task starts running. It can only happen after "open", because only then can > we be certain if any reconfiguration of state serialization has occurred, and > state needs to be converted. Ideally, the code for the "convertState" is > designed so that it can be easily exposed as an offline tool in the future. > For this JIRA, we should simply assume that after {{open()}}, we have all the > required information and serializers are appropriately reconfigured. > [~srichter] is currently planning to deprecate RuntimeContext state > registration methods in favor of a new interface that enforces eager state > registration, so that we may have all the info after {{open()}} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6178) Allow upgrades to state serializers
[ https://issues.apache.org/jira/browse/FLINK-6178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15944743#comment-15944743 ] Xiaogang Shi commented on FLINK-6178: - The idea of allowing upgrades to state serializers is excellent. But I have some concerns about the "convertState" phase. Currently, Flink has no knowledge of the serializers to use before users access the states (via the methods provided in {{RuntimeContext}}). That means we can only convert the states when users are about to access them. The conversion may be very costly, and the processing of data streams would be paused for quite a long time. Actually, I am very interested in the offline tool proposed for the future. Many efforts are currently made in the Flink runtime to allow restoring from old savepoints. They make the code very complicated and hard to follow. I would prefer to move them from the main program to the offline tool. The offline tool would also ease the burden on users of implementing {{TypeSerializer}}s that can deserialize data in different serialization formats: they would only need to provide the new serializers to access the states stored in the savepoints. > Allow upgrades to state serializers > --- > > Key: FLINK-6178 > URL: https://issues.apache.org/jira/browse/FLINK-6178 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing, Type Serialization System >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > > Currently, users are locked in with the serializer implementation used to > write their state. > This is suboptimal, as generally for users, it could easily be possible that > they wish to change their serialization formats / state schemas and types in > the future. > This is an umbrella JIRA for the required tasks to make this possible. 
> Here's an overview description of what to expect for the overall outcome of > this JIRA (the specific details are outlined in their respective subtasks): > Ideally, the main user-facing change this would result in is that users > implementing their custom {{TypeSerializer}} s will also need to implement > hook methods that identify whether or not there is a change to the serialized > format or even a change to the serialized data type. It would be the user's > responsibility that the {{deserialize}} method can bridge the change between > the old / new formats. > For Flink's built-in serializers that are automatically built using the > user's configuration (most notably the more complex {{KryoSerializer}} and > {{GenericArraySerializer}}), Flink should be able to automatically > "reconfigure" them using the new configuration, so that the reconfigured > versions can be used to de- / serialize previous state. This would require > knowledge of the previous configuration of the serializer, therefore > "serializer configuration metadata" will be added to savepoints. > Note that for the first version of this, although additional infrastructure > (e.g. serializer reconfigure hooks, serializer configuration metadata in > savepoints) will be added to potentially allow Kryo version upgrade, this > JIRA will not cover this. Kryo has breaking binary formats across major > versions, and will most likely need some further changes. Therefore, for the > {{KryoSerializer}}, "upgrading" it simply means changes in the registration > of specific / default serializers, at least for now. > Finally, we would need to add a "convertState" phase to the task lifecycle, > that takes place after the "open" phase and before checkpointing starts / the > task starts running. It can only happen after "open", because only then can > we be certain if any reconfiguration of state serialization has occurred, and > state needs to be converted. 
Ideally, the code for the "convertState" is > designed so that it can be easily exposed as an offline tool in the future. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (FLINK-6096) Refactor the migration of old versioned savepoints
[ https://issues.apache.org/jira/browse/FLINK-6096?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiaogang Shi reassigned FLINK-6096: --- Assignee: Xiaogang Shi > Refactor the migration of old versioned savepoints > -- > > Key: FLINK-6096 > URL: https://issues.apache.org/jira/browse/FLINK-6096 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Xiaogang Shi >Assignee: Xiaogang Shi > > Existing code for the migration of old-versioned savepoints does not allow us > to correctly deserialize classes that changed across versions. I think > we should create a migration package for each old-versioned savepoint and put > all of the savepoint's migrated classes there. A mapping can be kept to > record the migrated classes in the savepoint so that we can correctly > deserialize them. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-6096) Refactor the migration of old versioned savepoints
[ https://issues.apache.org/jira/browse/FLINK-6096?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiaogang Shi updated FLINK-6096: Component/s: State Backends, Checkpointing > Refactor the migration of old versioned savepoints > -- > > Key: FLINK-6096 > URL: https://issues.apache.org/jira/browse/FLINK-6096 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Xiaogang Shi > > Existing code for the migration of old-versioned savepoints does not allow us > to correctly deserialize classes that changed across versions. I think > we should create a migration package for each old-versioned savepoint and put > all of the savepoint's migrated classes there. A mapping can be kept to > record the migrated classes in the savepoint so that we can correctly > deserialize them. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6096) Refactor the migration of old versioned savepoints
Xiaogang Shi created FLINK-6096: --- Summary: Refactor the migration of old versioned savepoints Key: FLINK-6096 URL: https://issues.apache.org/jira/browse/FLINK-6096 Project: Flink Issue Type: Improvement Reporter: Xiaogang Shi Existing code for the migration of old-versioned savepoints does not allow us to correctly deserialize classes that changed across versions. I think we should create a migration package for each old-versioned savepoint and put all of the savepoint's migrated classes there. A mapping can be kept to record the migrated classes in the savepoint so that we can correctly deserialize them. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
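The class mapping proposed in FLINK-6096 can be sketched as a simple name lookup: when deserializing an old-versioned savepoint, recorded class names are first remapped to their copies in the per-version migration package. The class name, method, and package names below are illustrative assumptions, not the actual Flink migration code:

```java
import java.util.Map;

// Sketch: resolve which class to load for a name recorded in an old savepoint.
public class SavepointClassRemapper {

    static String resolve(Map<String, String> migratedClasses, String recordedName) {
        // classes that did not change fall through to their original name
        return migratedClasses.getOrDefault(recordedName, recordedName);
    }
}
```

With one such mapping per savepoint version, the main program no longer needs version-specific branches at every deserialization site.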
[jira] [Commented] (FLINK-6055) Supported setting timers on a Non-Keyed Stream
[ https://issues.apache.org/jira/browse/FLINK-6055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15925575#comment-15925575 ] Xiaogang Shi commented on FLINK-6055: - I think it's very challenging, because storing and restoring timers on non-keyed streams is very difficult. Do you have any ideas? > Supported setting timers on a Non-Keyed Stream > -- > > Key: FLINK-6055 > URL: https://issues.apache.org/jira/browse/FLINK-6055 > Project: Flink > Issue Type: New Feature >Reporter: sunjincheng >Assignee: sunjincheng > > After [FLINK-4460] Allow ProcessFunction on non-keyed streams, I want to > support setting timers on a Non-Keyed Stream. What do you think? > [~aljoscha] -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-6034) Add KeyedStateHandle for the snapshots in keyed streams
[ https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiaogang Shi updated FLINK-6034: Description: Currently, the only type of the snapshots in keyed streams is {{KeyGroupsStateHandle}} which is full and stores the states one group after another. With the introduction of incremental checkpointing, we need a higher level abstraction of keyed snapshots to allow flexible snapshot formats. The implementation of {{KeyedStateHandle}} s may vary a lot in different backends. The only information needed in {{KeyedStateHandle}} s is their key group range. When recovering the job with a different degree of parallelism, {{KeyedStateHandle}} s will be assigned to those subtasks whose key group ranges overlap with their ranges. was: Currently, the only type of the snapshots in keyed streams is {{KeyGroupsStateHandle}} which is full and stores the states one group after another. With the introduction of incremental checkpointing, we need a higher level abstraction of keyed snapshots to allow flexible snapshot formats. The implementation of {{KeyedStateHandle}}s may vary a lot in different backends. The only information needed in {{KeyedStateHandle}}s is their key group range. When recovering the job with a different degree of parallelism, {{KeyedStateHandle}}s will be assigned to those subtasks whose key group ranges overlap with their ranges. > Add KeyedStateHandle for the snapshots in keyed streams > --- > > Key: FLINK-6034 > URL: https://issues.apache.org/jira/browse/FLINK-6034 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Xiaogang Shi >Assignee: Xiaogang Shi > > Currently, the only type of the snapshots in keyed streams is > {{KeyGroupsStateHandle}} which is full and stores the states one group after > another. With the introduction of incremental checkpointing, we need a higher > level abstraction of keyed snapshots to allow flexible snapshot formats. 
> The implementation of {{KeyedStateHandle}} s may vary a lot in different > backends. The only information needed in {{KeyedStateHandle}} s is their key > group range. When recovering the job with a different degree of parallelism, > {{KeyedStateHandle}} s will be assigned to those subtasks whose key group > ranges overlap with their ranges. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
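The assignment rule described in FLINK-6034 can be sketched as a range-intersection test: on recovery with a different parallelism, a state handle is handed to every subtask whose key-group range intersects the handle's range. The even-split range formula and all names below are illustrative, not copied from Flink:

```java
// Sketch: key-group ranges as half-open intervals [start, end) over
// [0, maxParallelism), split evenly across subtasks. Illustrative only.
public class KeyGroupAssignmentSketch {

    /** Start (inclusive) of the key-group range owned by subtask `index`. */
    static int rangeStart(int maxParallelism, int parallelism, int index) {
        return index * maxParallelism / parallelism;
    }

    /** End (exclusive) of the key-group range owned by subtask `index`. */
    static int rangeEnd(int maxParallelism, int parallelism, int index) {
        return (index + 1) * maxParallelism / parallelism;
    }

    /** True if the handle's range [hStart, hEnd) overlaps the subtask's range. */
    static boolean isAssigned(int hStart, int hEnd,
                              int maxParallelism, int parallelism, int index) {
        int s = rangeStart(maxParallelism, parallelism, index);
        int e = rangeEnd(maxParallelism, parallelism, index);
        return hStart < e && s < hEnd;
    }
}
```

Note that a handle spanning a boundary (e.g. groups 60..69 with two subtasks owning 0..63 and 64..127) is assigned to both subtasks, which is exactly why only the range, not the snapshot format, needs to be known to the scheduler.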
[jira] [Created] (FLINK-6034) Add KeyedStateHandle for the snapshots in keyed streams
Xiaogang Shi created FLINK-6034: --- Summary: Add KeyedStateHandle for the snapshots in keyed streams Key: FLINK-6034 URL: https://issues.apache.org/jira/browse/FLINK-6034 Project: Flink Issue Type: Sub-task Reporter: Xiaogang Shi Assignee: Xiaogang Shi Currently, the only type of the snapshots in keyed streams is {{KeyGroupsStateHandle}} which is full and stores the states one group after another. With the introduction of incremental checkpointing, we need a higher level abstraction of keyed snapshots to allow flexible snapshot formats. The implementation of {{KeyedStateHandle}}s may vary a lot in different backends. The only information needed in {{KeyedStateHandle}}s is their key group range. When recovering the job with a different degree of parallelism, {{KeyedStateHandle}}s will be assigned to those subtasks whose key group ranges overlap with their ranges. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6027) Ignore the exception thrown by the subsuming of old completed checkpoints
Xiaogang Shi created FLINK-6027: --- Summary: Ignore the exception thrown by the subsuming of old completed checkpoints Key: FLINK-6027 URL: https://issues.apache.org/jira/browse/FLINK-6027 Project: Flink Issue Type: Bug Components: State Backends, Checkpointing Reporter: Xiaogang Shi Assignee: Xiaogang Shi When a checkpoint is added into the {{CompletedCheckpointStore}} via the method {{addCheckpoint()}}, the oldest checkpoints will be removed from the store if the number of stored checkpoints exceeds the given limit. The subsuming of old checkpoints may fail and make {{addCheckpoint()}} throw exceptions which are caught by {{CheckpointCoordinator}}. As a result, the states in the new checkpoint will be deleted by {{CheckpointCoordinator}}. Because the new checkpoint is still in the store, we may recover the job from the new checkpoint. But the recovery will fail as the states of the checkpoint are all deleted. We should ignore the exceptions thrown by the subsuming of old checkpoints because we can always recover from the new checkpoint when successfully adding it into the store. Ignoring the exceptions may leave some dirty data behind, but that's acceptable because it can be cleaned up with the cleanup hook introduced in the near future. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
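The fix proposed in FLINK-6027 amounts to swallowing subsume failures so the newly added checkpoint is never discarded because an *old* one could not be cleaned up. A minimal sketch with illustrative names (not Flink's actual store implementation):

```java
import java.util.ArrayDeque;

// Sketch: a bounded checkpoint store where discarding a subsumed checkpoint
// may fail, and that failure is deliberately ignored. Illustrative only.
public class CompletedCheckpointStoreSketch {
    private final ArrayDeque<Integer> checkpoints = new ArrayDeque<>();
    private final int maxRetained;

    CompletedCheckpointStoreSketch(int maxRetained) { this.maxRetained = maxRetained; }

    void addCheckpoint(int id) {
        checkpoints.addLast(id);
        while (checkpoints.size() > maxRetained) {
            int old = checkpoints.removeFirst();
            try {
                discard(old);          // may fail, e.g. a DFS hiccup
            } catch (Exception e) {
                // ignored: the new checkpoint stays usable; leftover files
                // can be removed later by a cleanup hook
            }
        }
    }

    /** Simulates a discard that always fails, to show the add still succeeds. */
    void discard(int id) throws Exception {
        throw new Exception("subsume failed (simulated)");
    }

    int newest() { return checkpoints.peekLast(); }

    int size() { return checkpoints.size(); }
}
```

Even with every discard failing, `addCheckpoint()` completes and the latest checkpoint remains recoverable, which is the invariant the issue is after.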
[jira] [Created] (FLINK-6014) Allow the registration of state objects in checkpoints
Xiaogang Shi created FLINK-6014: --- Summary: Allow the registration of state objects in checkpoints Key: FLINK-6014 URL: https://issues.apache.org/jira/browse/FLINK-6014 Project: Flink Issue Type: Sub-task Components: State Backends, Checkpointing Reporter: Xiaogang Shi Assignee: Xiaogang Shi This issue is the very first step towards incremental checkpointing. We introduce a new state handle named {{CompositeStateHandle}} to be the base of the snapshots taken by task components. Known implementations include {{KeyedStateHandle}} (for {{KeyedStateBackend}}s), {{SubtaskState}} (for subtasks, splits of {{JobVertex}}) and {{TaskState}} (for {{JobVertex}}s). Each {{CompositeStateHandle}} is composed of a collection of {{StateObject}}s. It should register all its state objects in {{StateRegistry}} when its checkpoint is added into {{CompletedCheckpointStore}} (i.e., a pending checkpoint completes or a completed checkpoint is reloaded during recovery). When a completed checkpoint is moved out of the {{CompletedCheckpointStore}}, we should not simply discard all state objects in the checkpoint. With the introduction of incremental checkpointing, a {{StateObject}} may be referenced by different checkpoints. We should unregister all the state objects contained in the {{StateRegistry}} first. Only those state objects that are not referenced by any checkpoint can be deleted. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
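The register/unregister protocol described in FLINK-6014 is essentially reference counting over shared state objects. A sketch with illustrative names (state objects reduced to string handles):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the StateRegistry idea: a shared state object may only be
// deleted once no retained checkpoint references it. Illustrative only.
public class StateRegistrySketch {
    private final Map<String, Integer> refCounts = new HashMap<>();

    /** Called when a checkpoint containing this state object is added to the store. */
    void register(String stateObject) {
        refCounts.merge(stateObject, 1, Integer::sum);
    }

    /** Called when a checkpoint is subsumed; true means the object may be deleted. */
    boolean unregister(String stateObject) {
        int remaining = refCounts.merge(stateObject, -1, Integer::sum);
        if (remaining <= 0) {
            refCounts.remove(stateObject);
            return true;    // last reference gone: safe to delete the backing file
        }
        return false;       // still referenced by another checkpoint
    }
}
```

This is exactly why "moving a checkpoint out of the store" and "deleting its state objects" become two distinct steps once checkpoints share files.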
[jira] [Assigned] (FLINK-5053) Incremental / lightweight snapshots for checkpoints
[ https://issues.apache.org/jira/browse/FLINK-5053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiaogang Shi reassigned FLINK-5053: --- Assignee: Xiaogang Shi > Incremental / lightweight snapshots for checkpoints > --- > > Key: FLINK-5053 > URL: https://issues.apache.org/jira/browse/FLINK-5053 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Xiaogang Shi > > There is currently basically no difference between savepoints and checkpoints > in Flink and both are created through exactly the same process. > However, savepoints and checkpoints have a slightly different meaning which > we should take into account to keep Flink efficient: > - Savepoints are (typically infrequently) triggered by the user to create a > state from which the application can be restarted, e.g. because Flink, some > code, or the parallelism needs to be changed. > - Checkpoints are (typically frequently) triggered by the system to allow for > fast recovery in case of failure, but keeping the job/system unchanged. > This means that savepoints and checkpoints can have different properties in > that: > - Savepoints should represent a state of the application, where > characteristics of the job (e.g. parallelism) can be adjusted for the next > restart. One example for things that savepoints need to be aware of are > key-groups. Savepoints can potentially be a little more expensive than > checkpoints, because they are usually created a lot less frequently by the > user. > - Checkpoints are frequently triggered by the system to allow for fast > failure recovery. However, failure recovery leaves all characteristics of the > job unchanged. Thus checkpoints do not have to be aware of those, e.g. think > again of key groups. Checkpoints should run faster than creating savepoints, > in particular it would be nice to have incremental checkpoints. 
> For a first approach, I would suggest the following steps/changes: > - In checkpoint coordination: differentiate between triggering checkpoints > and savepoints. Introduce properties for checkpoints that describe their set > of abilities, e.g. "is-key-group-aware", "is-incremental". > - In state handle infrastructure: introduce state handles that reflect > incremental checkpoints and drop full key-group awareness, i.e. covering > folders instead of files and not having keygroup_id -> file/offset mapping, > but keygroup_range -> folder? > - Backend side: We should start with RocksDB by reintroducing something > similar to semi-async snapshots, but using > BackupableDBOptions::setShareTableFiles(true) and transferring only new > incremental outputs to HDFS. Notice that using RocksDB's internal backup > mechanism is giving up on the information about individual key-groups. But as > explained above, this should be totally acceptable for checkpoints, while > savepoints should use the key-group-aware fully async mode. Of course we also > need to implement the ability to restore from both types of snapshots. > One problem in the suggested approach is still that even checkpoints should > support scale-down, in case that only a smaller number of instances is left > available in a recovery case. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
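The first refactoring step above ("properties for checkpoints that describe their set of abilities") can be sketched as a tiny value class; the field and factory names are assumptions for illustration, not the classes Flink ended up with:

```java
// Sketch: distinguish savepoints from (possibly incremental) checkpoints
// via a small properties object. Illustrative names only.
public class CheckpointPropertiesSketch {
    final boolean isSavepoint;
    final boolean isKeyGroupAware;
    final boolean isIncremental;

    private CheckpointPropertiesSketch(boolean sp, boolean kga, boolean inc) {
        this.isSavepoint = sp;
        this.isKeyGroupAware = kga;
        this.isIncremental = inc;
    }

    /** Savepoints stay key-group aware so parallelism can change on restore. */
    static CheckpointPropertiesSketch forSavepoint() {
        return new CheckpointPropertiesSketch(true, true, false);
    }

    /** Fast checkpoints may drop key-group awareness and be incremental. */
    static CheckpointPropertiesSketch forIncrementalCheckpoint() {
        return new CheckpointPropertiesSketch(false, false, true);
    }
}
```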
[jira] [Assigned] (FLINK-5917) Remove MapState.size()
[ https://issues.apache.org/jira/browse/FLINK-5917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiaogang Shi reassigned FLINK-5917: --- Assignee: Xiaogang Shi > Remove MapState.size() > -- > > Key: FLINK-5917 > URL: https://issues.apache.org/jira/browse/FLINK-5917 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.3.0 >Reporter: Aljoscha Krettek >Assignee: Xiaogang Shi > > I'm proposing to remove {{size()}} because it is a prohibitively expensive > operation and users might not be aware of it. Instead of {{size()}} users can > use an iterator over all mappings to determine the size, when doing this they > will be aware of the fact that it is a costly operation. > Right now, {{size()}} is only costly on the RocksDB state backend but I think > with future developments on the in-memory state backend it might also become > an expensive operation there. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5917) Remove MapState.size()
[ https://issues.apache.org/jira/browse/FLINK-5917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15891970#comment-15891970 ] Xiaogang Shi commented on FLINK-5917: - +1 to remove the {{size()}} method due to the costly implementation. > Remove MapState.size() > -- > > Key: FLINK-5917 > URL: https://issues.apache.org/jira/browse/FLINK-5917 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.3.0 >Reporter: Aljoscha Krettek > > I'm proposing to remove {{size()}} because it is a prohibitively expensive > operation and users might not be aware of it. Instead of {{size()}} users can > use an iterator over all mappings to determine the size, when doing this they > will be aware of the fact that it is a costly operation. > Right now, {{size()}} is only costly on the RocksDB state backend but I think > with future developments on the in-memory state backend it might also become > an expensive operation there. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5925) Clean up extracted RocksDB JNI library
[ https://issues.apache.org/jira/browse/FLINK-5925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15889462#comment-15889462 ] Xiaogang Shi commented on FLINK-5925: - +1 for the cleanup mechanism. Each component (e.g. {{TaskManager}}, {{Task}} and {{StateBackend}}) should be given a working directory and it should place all its files in that directory. When the component exits, we can simply delete the directory to clean unused files. > Clean up extracted RocksDB JNI library > -- > > Key: FLINK-5925 > URL: https://issues.apache.org/jira/browse/FLINK-5925 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.2.0, 1.3.0 >Reporter: Till Rohrmann > > The {{RocksDBStateBackend}} extracts the RocksDB jni library from the RocksDB > dependency in a temp directory (see > {{RocksDBStateBackend#ensureRocksDBIsLoaded}}). This file is, however, never > removed. > In general, I think we should add a cleanup mechanism which cleans all > {{Task}} specific files after the {{Task}} has completed. The same applies to > the {{TaskManager}}. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
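The per-component working-directory idea in the comment above reduces cleanup to a single recursive delete when the component exits. A self-contained sketch (names are illustrative):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

// Sketch: each component owns one directory; cleanup deletes it recursively.
public class WorkingDirectoryCleanup {

    static Path createWorkingDir(Path parent, String componentId) throws IOException {
        return Files.createDirectories(parent.resolve(componentId));
    }

    /** Recursively deletes the component's directory, children first. */
    static void cleanup(Path workingDir) throws IOException {
        if (!Files.exists(workingDir)) return;
        try (Stream<Path> tree = Files.walk(workingDir)) {
            tree.sorted(Comparator.reverseOrder())     // files before their parents
                .forEach(p -> p.toFile().delete());
        }
    }
}
```

Extracted artifacts like the RocksDB JNI library then disappear with the directory, with no per-file bookkeeping.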
[jira] [Commented] (FLINK-5917) Remove MapState.size()
[ https://issues.apache.org/jira/browse/FLINK-5917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15887094#comment-15887094 ] Xiaogang Shi commented on FLINK-5917: - [~StephanEwen] We just use the cache to avoid the costly scanning. It's initialized the first time the `size()` method is called. After that, the cache will be updated every time a new entry is inserted or an entry is removed. When the backend is closed, we can simply drop the cache. A better choice, I think, is to use a RocksDB entry to record the value of the `size`. We don't need to write the value into the entry every time it's updated. We can update it only when taking snapshots. But this requires states to be aware of checkpointing, which is missing in our current implementation. > Remove MapState.size() > -- > > Key: FLINK-5917 > URL: https://issues.apache.org/jira/browse/FLINK-5917 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.3.0 >Reporter: Aljoscha Krettek > > I'm proposing to remove {{size()}} because it is a prohibitively expensive > operation and users might not be aware of it. Instead of {{size()}} users can > use an iterator over all mappings to determine the size, when doing this they > will be aware of the fact that it is a costly operation. > Right now, {{size()}} is only costly on the RocksDB state backend but I think > with future developments on the in-memory state backend it might also become > an expensive operation there. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
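The cached-size idea from the comments above can be sketched as a counter maintained on every insert and remove, making `size()` O(1) instead of a full scan. This is an illustration of the proposal with an in-memory map standing in for RocksDB, not actual backend code:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: map state with an incrementally maintained size counter.
public class CachedSizeMapState<K, V> {
    private final Map<K, V> entries = new HashMap<>();  // stand-in for RocksDB
    private long cachedSize;                            // updated on put/remove

    void put(K key, V value) {
        // overwriting an existing key must not change the count
        if (entries.put(key, value) == null) cachedSize++;
    }

    void remove(K key) {
        if (entries.remove(key) != null) cachedSize--;
    }

    long size() { return cachedSize; }   // constant time; no scan needed
}
```

The subtlety the counter has to get right, and the reason it needs careful persistence across snapshots, is the overwrite case: `put()` on an existing key changes no count.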
[jira] [Commented] (FLINK-5917) Remove MapState.size()
[ https://issues.apache.org/jira/browse/FLINK-5917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15885019#comment-15885019 ] Xiaogang Shi commented on FLINK-5917: - I think it's okay to remove this method. But a better choice is to use an in-memory cache to record the size of the state. That way, we can achieve constant-time implementation of the `size` method with little cost. I think this mechanism also works for the heap states in the future. > Remove MapState.size() > -- > > Key: FLINK-5917 > URL: https://issues.apache.org/jira/browse/FLINK-5917 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.3.0 >Reporter: Aljoscha Krettek > > I'm proposing to remove {{size()}} because it is a prohibitively expensive > operation and users might not be aware of it. Instead of {{size()}} users can > use an iterator over all mappings to determine the size, when doing this they > will be aware of the fact that it is a costly operation. > Right now, {{size()}} is only costly on the RocksDB state backend but I think > with future developments on the in-memory state backend it might also become > an expensive operation there. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Closed] (FLINK-5790) Use list types when ListStateDescriptor extends StateDescriptor
[ https://issues.apache.org/jira/browse/FLINK-5790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiaogang Shi closed FLINK-5790. --- Resolution: Fixed Fixed via d47446cafffe0d34d89488f6eb860aa139ceb3f1 > Use list types when ListStateDescriptor extends StateDescriptor > --- > > Key: FLINK-5790 > URL: https://issues.apache.org/jira/browse/FLINK-5790 > Project: Flink > Issue Type: Improvement >Reporter: Xiaogang Shi >Assignee: Xiaogang Shi > > Flink keeps the state serializer in {{StateDescriptor}}, but it's the > serializer of list elements that is put in {{ListStateDescriptor}}. The > implementation is a little confusing. Some backends need to construct the > state serializer with the element serializer by themselves. > We should use an {{ArrayListSerializer}}, which is composed of the serializer > of the element, in the {{ListStateDescriptor}}. It helps the backend to avoid > constructing the state serializer. > If a backend needs customized serialization of the state (e.g. > {{RocksDBStateBackend}}), it still can obtain the element serializer from the > {{ArrayListSerializer}}. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
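The composition described in FLINK-5790 (a list serializer built from its element serializer) can be sketched with simplified interfaces; these are stand-ins for Flink's `TypeSerializer`, not its actual API:

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Sketch: a list serializer that only needs the element serializer,
// so backends never construct the state serializer themselves.
public class ArrayListSerializerSketch {

    interface ElementSerializer<T> {
        void serialize(T value, DataOutput out) throws IOException;
        T deserialize(DataInput in) throws IOException;
    }

    static <T> void serializeList(List<T> list, ElementSerializer<T> elem, DataOutput out)
            throws IOException {
        out.writeInt(list.size());                 // length-prefixed encoding
        for (T t : list) elem.serialize(t, out);
    }

    static <T> List<T> deserializeList(ElementSerializer<T> elem, DataInput in)
            throws IOException {
        int n = in.readInt();
        List<T> list = new ArrayList<>(n);
        for (int i = 0; i < n; i++) list.add(elem.deserialize(in));
        return list;
    }
}
```

A backend that wants custom per-element encoding (as the issue notes for {{RocksDBStateBackend}}) can still reach through the composite and use the element serializer directly.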
[jira] [Closed] (FLINK-5863) Unify the serialization of queryable list states in different backends
[ https://issues.apache.org/jira/browse/FLINK-5863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiaogang Shi closed FLINK-5863. --- Resolution: Fixed Now that we are refactoring the queryable states, we can make the changes then. > Unify the serialization of queryable list states in different backends > -- > > Key: FLINK-5863 > URL: https://issues.apache.org/jira/browse/FLINK-5863 > Project: Flink > Issue Type: Improvement > Components: Queryable State >Affects Versions: 1.3.0 >Reporter: Xiaogang Shi >Assignee: Xiaogang Shi >Priority: Minor > > Now the deserialization of list states is implemented in > {{KvStateRequestSerializer}}. The serialization however is implemented > individually in different backends. > We should provide a method in {{KvStateRequestSerializer}} to remove the > redundant code. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-5865) Throw original exception in states
[ https://issues.apache.org/jira/browse/FLINK-5865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiaogang Shi updated FLINK-5865: Summary: Throw original exception in states (was: Throw original exception in RocksDB states) > Throw original exception in states > -- > > Key: FLINK-5865 > URL: https://issues.apache.org/jira/browse/FLINK-5865 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.3.0 >Reporter: Xiaogang Shi >Assignee: Xiaogang Shi > > Now all exceptions thrown in RocksDB states are converted to > {{RuntimeException}}. It's unnecessary and will print useless stacks in the > log. > I think it's better to throw the original exception, without any wrapping. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-5865) Throw original exception in RocksDB states
Xiaogang Shi created FLINK-5865: --- Summary: Throw original exception in RocksDB states Key: FLINK-5865 URL: https://issues.apache.org/jira/browse/FLINK-5865 Project: Flink Issue Type: Improvement Components: State Backends, Checkpointing Affects Versions: 1.3.0 Reporter: Xiaogang Shi Assignee: Xiaogang Shi Now all exceptions thrown in RocksDB states are converted to {{RuntimeException}}. It's unnecessary and will print useless stacks in the log. I think it's better to throw the original exception, without any wrapping. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
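The change proposed in FLINK-5865 amounts to declaring and rethrowing the checked exception as-is instead of wrapping it; the stack trace then points straight at the real cause. A minimal before/after sketch with illustrative method names (the simulated backend error is an assumption for the example):

```java
import java.io.IOException;

// Sketch: wrapping vs. propagating the original exception.
public class OriginalExceptionSketch {

    // Before: wrapping hides the original type and adds a noisy extra frame.
    static Object getWrapped() {
        try {
            return readFromRocksDB();
        } catch (IOException e) {
            throw new RuntimeException("Error while getting state", e);
        }
    }

    // After: declare and propagate the original exception unchanged.
    static Object getUnwrapped() throws IOException {
        return readFromRocksDB();
    }

    static Object readFromRocksDB() throws IOException {
        throw new IOException("disk failure");   // simulated backend error
    }
}
```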
[jira] [Assigned] (FLINK-5863) Unify the serialization of queryable list states in different backends
[ https://issues.apache.org/jira/browse/FLINK-5863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiaogang Shi reassigned FLINK-5863: --- Assignee: Xiaogang Shi > Unify the serialization of queryable list states in different backends > -- > > Key: FLINK-5863 > URL: https://issues.apache.org/jira/browse/FLINK-5863 > Project: Flink > Issue Type: Improvement > Components: Queryable State >Affects Versions: 1.3.0 >Reporter: Xiaogang Shi >Assignee: Xiaogang Shi >Priority: Minor > > Now the deserialization of list states is implemented in > {{KvStateRequestSerializer}}. The serialization however is implemented > individually in different backends. > We should provide a method in {{KvStateRequestSerializer}} to remove the > redundant code. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-5863) Unify the serialization of queryable list states in different backends
Xiaogang Shi created FLINK-5863: --- Summary: Unify the serialization of queryable list states in different backends Key: FLINK-5863 URL: https://issues.apache.org/jira/browse/FLINK-5863 Project: Flink Issue Type: Improvement Components: Queryable State Affects Versions: 1.3.0 Reporter: Xiaogang Shi Priority: Minor Now the deserialization of list states is implemented in {{KvStateRequestSerializer}}. The serialization however is implemented individually in different backends. We should provide a method in {{KvStateRequestSerializer}} to remove the redundant code. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Closed] (FLINK-5036) Perform the grouping of keys in restoring instead of checkpointing
[ https://issues.apache.org/jira/browse/FLINK-5036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiaogang Shi closed FLINK-5036. --- Resolution: Invalid > Perform the grouping of keys in restoring instead of checkpointing > -- > > Key: FLINK-5036 > URL: https://issues.apache.org/jira/browse/FLINK-5036 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Xiaogang Shi > > Whenever taking snapshots of {{RocksDBKeyedStateBackend}}, the values in the > states will be written onto different files according to their key groups. > The procedure is very costly when the states are very big. > Given that the snapshot operations will be performed much more frequently > than restoring, we can leave the key groups as they are to improve the > overall performance. In other words, we can perform the grouping of keys in > restoring instead of in checkpointing. > I think, the implementation will be very similar to the restoring of > non-partitioned states. Each task will receive a collection of snapshots each > of which contains a set of key groups. Each task will restore its states from > the given snapshots by picking values in assigned key groups. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
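The restore-side grouping described in FLINK-5036 can be sketched as a filter over the received snapshots: each subtask keeps only the entries whose key group falls into its assigned range. The key-to-group function and all names below are illustrative stand-ins:

```java
import java.util.Map;
import java.util.stream.Collectors;

// Sketch: group keys at restore time instead of snapshot time.
public class RestoreTimeGrouping {

    /** Stand-in for assigning a key to its key group. */
    static int keyGroupOf(String key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Keep only entries belonging to key groups [start, end] (inclusive). */
    static Map<String, String> restoreForRange(
            Map<String, String> snapshot, int start, int end, int maxParallelism) {
        return snapshot.entrySet().stream()
                .filter(e -> {
                    int g = keyGroupOf(e.getKey(), maxParallelism);
                    return g >= start && g <= end;
                })
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }
}
```

The trade-off named in the issue shows up directly: snapshots stay cheap (no per-group sorting), and the scan-and-filter cost is paid only on the rare restore.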
[jira] [Closed] (FLINK-5023) Add get() method in State interface
[ https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiaogang Shi closed FLINK-5023. --- Resolution: Won't Fix The updates to the `State` interface will affect existing user code. We will not update the interface before Flink 2.0. > Add get() method in State interface > --- > > Key: FLINK-5023 > URL: https://issues.apache.org/jira/browse/FLINK-5023 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Xiaogang Shi >Assignee: Xiaogang Shi > > Currently, the only method provided by the State interface is `clear()`. I > think we should provide another method called `get()` to return the > structured value (e.g., value, list, or map) under the current key. > In fact, the functionality of `get()` has already been implemented in all > types of states: e.g., `value()` in ValueState and `get()` in ListState. The > modification to the interface can better abstract these states. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
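The FLINK-5023 proposal in interface form, as a sketch rather than Flink's API: a common `get()` pulls the per-key structured value up into the base `State` interface, which concrete states then specialize. The toy implementation exists only to make the shape concrete:

```java
// Sketch: unify value access across state types behind one get() method.
public class StateGetSketch {

    interface State<T> {
        T get();        // structured value under the current key
        void clear();
    }

    interface ValueState<T> extends State<T> {}
    interface ListState<T> extends State<java.util.List<T>> {}

    /** Toy in-memory value state, just to exercise the interface. */
    static class InMemoryValue<T> implements State<T> {
        private T value;
        InMemoryValue(T initial) { value = initial; }
        public T get() { return value; }
        public void clear() { value = null; }
    }
}
```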
[jira] [Closed] (FLINK-5024) Add SimpleStateDescriptor to clarify the concepts
[ https://issues.apache.org/jira/browse/FLINK-5024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiaogang Shi closed FLINK-5024. --- Resolution: Invalid Now we refactor the state descriptors with the introduction of composite serializers (e.g. {{ArrayListSerializer}}) > Add SimpleStateDescriptor to clarify the concepts > - > > Key: FLINK-5024 > URL: https://issues.apache.org/jira/browse/FLINK-5024 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Xiaogang Shi >Assignee: Xiaogang Shi > > Currently, StateDescriptors accept two type arguments : the first one is the > type of the created state and the second one is the type of the values in the > states. > The concept however is a little confusing here because in ListStates, the > arguments passed to the StateDescriptors are the types of the list elements > instead of the lists. It also makes the implementation of MapStates difficult. > I suggest not to put the type serializer in StateDescriptors, making > StateDescriptors independent of the data structures of the values. > A new type of StateDescriptor named SimpleStateDescriptor can be provided to > abstract those states (namely ValueState, ReducingState and FoldingState) > whose values are not composite. > The states (e.g. ListStates and MapStates) can implement their own > descriptors according to their data structures. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-5790) Use list types when ListStateDescriptor extends StateDescriptor
Xiaogang Shi created FLINK-5790: --- Summary: Use list types when ListStateDescriptor extends StateDescriptor Key: FLINK-5790 URL: https://issues.apache.org/jira/browse/FLINK-5790 Project: Flink Issue Type: Improvement Reporter: Xiaogang Shi Assignee: Xiaogang Shi Flink keeps the state serializer in {{StateDescriptor}}, but it's the serializer of list elements that is put in {{ListStateDescriptor}}. The implementation is a little confusing. Some backends need to construct the state serializer with the element serializer by themselves. We should use an {{ArrayListSerializer}}, which is composed of the serializer of the element, in the {{ListStateDescriptor}}. It helps the backend to avoid constructing the state serializer. If a backend needs customized serialization of the state (e.g. {{RocksDBStateBackend}}), it still can obtain the element serializer from the {{ArrayListSerializer}}. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-5738) Destroy created backend when task is canceled
Xiaogang Shi created FLINK-5738: --- Summary: Destroy created backend when task is canceled Key: FLINK-5738 URL: https://issues.apache.org/jira/browse/FLINK-5738 Project: Flink Issue Type: Bug Components: State Backends, Checkpointing Affects Versions: 1.2.0 Reporter: Xiaogang Shi When a task is canceled, the {{ClosableRegistry}} will be closed in the cancel thread. However, the task may still be creating the {{KeyedStateBackend}}, and it will fail to register the backend to the {{ClosableRegistry}}. Because the backend is not assigned to the operator yet (due to the exception), the backend will not be destroyed when the task thread exits. A simple solution is to catch the exception during registration and destroy the created backend in the case of failures. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
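The proposed fix can be sketched as follows. {{ClosableRegistry}} and {{KeyedStateBackend}} here are minimal stand-ins for the real Flink classes, reduced to what the cleanup pattern needs:

```java
import java.io.Closeable;
import java.io.IOException;

// Stand-in: a registry that rejects registrations once the cancel
// thread has closed it.
class ClosableRegistry {
    private boolean closed;

    synchronized void close() { closed = true; }

    synchronized void registerClosable(Closeable closable) throws IOException {
        if (closed) {
            throw new IOException("Registry closed: task is canceled");
        }
    }
}

// Stand-in: a backend holding resources that must be released explicitly.
class KeyedStateBackend implements Closeable {
    boolean disposed;

    void dispose() { disposed = true; } // release resources held by the backend

    @Override
    public void close() { dispose(); }
}

public class BackendCleanupDemo {
    // The fix: if registering the freshly created backend fails because the
    // task was canceled, destroy the backend instead of leaking it, since
    // the operator never received it and cannot clean it up later.
    static void register(KeyedStateBackend backend, ClosableRegistry registry) throws IOException {
        try {
            registry.registerClosable(backend);
        } catch (IOException e) {
            backend.dispose();
            throw e;
        }
    }

    public static void main(String[] args) {
        ClosableRegistry registry = new ClosableRegistry();
        registry.close(); // simulate cancellation racing with backend creation
        KeyedStateBackend backend = new KeyedStateBackend();
        try {
            register(backend, registry);
            throw new AssertionError("registration should have failed");
        } catch (IOException expected) {
            // fall through: the backend must have been disposed on the way out
        }
        if (!backend.disposed) throw new AssertionError();
    }
}
```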
[jira] [Commented] (FLINK-5590) Create a proper internal state hierarchy
[ https://issues.apache.org/jira/browse/FLINK-5590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15833260#comment-15833260 ] Xiaogang Shi commented on FLINK-5590: - [~StephanEwen], do you have any ideas of the solution? I think {{KvState}} already provides some needed internal methods. Maybe we can extend it to create the internal state hierarchy? > Create a proper internal state hierarchy > > > Key: FLINK-5590 > URL: https://issues.apache.org/jira/browse/FLINK-5590 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.3.0 > > > Currently, the state interfaces (like {{ListState}}, {{ValueState}}, > {{ReducingState}}) are very sparse and contain only methods exposed to the > users. That makes sense to keep the public stable API minimal > At the same time, the runtime needs more methods for its internal interaction > with state, such as: > - setting namespaces > - accessing raw values > - merging namespaces > These are currently realized by re-creating or re-obtaining the state objects > from the KeyedStateBackend. That method causes quite an overhead for each > access to the state > The KeyedStateBackend tries to do some tricks to reduce that overhead, but > does it only partially and induces other overhead in the course. > The root cause of all these issues is a problem in the design: There is no > proper "internal state abstraction" in a similar way as there is an external > state abstraction (the public state API). > We should add a similar hierarchy of states for the internal methods. 
> It would look like in the example below:
> {code}
>  *                  State
>  *                    |
>  *                    +----------------InternalKvState
>  *                    |                      |
>  *              MergingState                 |
>  *                    |                      |
>  *                    +----------------InternalMergingState
>  *                    |                      |
>  *        +-----------+-----------+          |
>  *        |                       |          |
>  *  ReducingState            ListState       |
>  *        |                       |          |
>  *        |                       +----------+---InternalListState
>  *        |                                  |
>  *        +----------------------------------+---InternalReducingState
> {code}
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
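A rough Java sketch of such a hierarchy; the interface and method names below are illustrative assumptions about what the internal abstraction could look like, not a final API:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Public (user-facing) state interfaces, kept minimal and stable.
interface State {
    void clear();
}

interface MergingState<IN, OUT> extends State {
    void add(IN value) throws Exception;
    OUT get() throws Exception;
}

interface ListState<T> extends MergingState<T, Iterable<T>> {}

// Internal interfaces mirror the public ones and add the runtime-only
// methods (namespaces, merging), so the runtime can keep one state object
// instead of re-obtaining it from the backend on every access.
interface InternalKvState<N> extends State {
    void setCurrentNamespace(N namespace);
}

interface InternalMergingState<N, IN, OUT> extends InternalKvState<N>, MergingState<IN, OUT> {
    void mergeNamespaces(N target, Collection<N> sources) throws Exception;
}

interface InternalListState<N, T> extends InternalMergingState<N, T, Iterable<T>>, ListState<T> {}

// A toy heap-backed implementation showing that one object can serve both
// the public API (ListState) and the internal one (InternalListState).
public class SimpleListState<N, T> implements InternalListState<N, T> {
    private final Map<N, List<T>> values = new HashMap<>();
    private N currentNamespace;

    public void setCurrentNamespace(N namespace) { currentNamespace = namespace; }

    public void add(T value) {
        values.computeIfAbsent(currentNamespace, k -> new ArrayList<>()).add(value);
    }

    public Iterable<T> get() {
        return values.getOrDefault(currentNamespace, Collections.emptyList());
    }

    public void clear() { values.remove(currentNamespace); }

    public void mergeNamespaces(N target, Collection<N> sources) {
        for (N source : sources) {
            List<T> merged = values.remove(source);
            if (merged != null) {
                values.computeIfAbsent(target, k -> new ArrayList<>()).addAll(merged);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        SimpleListState<String, Integer> state = new SimpleListState<>();
        state.setCurrentNamespace("window-1");
        state.add(1);
        state.setCurrentNamespace("window-2");
        state.add(2);
        // runtime-only merge, invisible to user code that only sees ListState
        state.mergeNamespaces("window-1", Arrays.asList("window-2"));
        state.setCurrentNamespace("window-1");
        List<Integer> seen = new ArrayList<>();
        for (int v : state.get()) seen.add(v);
        if (!seen.equals(Arrays.asList(1, 2))) throw new AssertionError();
    }
}
```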
[jira] [Assigned] (FLINK-5544) Implement Internal Timer Service in RocksDB
[ https://issues.apache.org/jira/browse/FLINK-5544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiaogang Shi reassigned FLINK-5544: --- Assignee: Xiaogang Shi > Implement Internal Timer Service in RocksDB > --- > > Key: FLINK-5544 > URL: https://issues.apache.org/jira/browse/FLINK-5544 > Project: Flink > Issue Type: New Feature > Components: Streaming >Reporter: Xiaogang Shi >Assignee: Xiaogang Shi > > Now the only implementation of internal timer service is > HeapInternalTimerService which stores all timers in memory. In the cases > where the number of keys is very large, the timer service will cost too much > memory. An implementation which stores timers in RocksDB seems a good fit for > these cases. > It might be a little challenging to implement a RocksDB timer service because > the timers are accessed in different ways. When timers are triggered, we need > to access timers in the order of timestamp. But when performing checkpoints, > we must have a method to obtain all timers of a given key group. > A good implementation, as suggested by [~StephanEwen], follows the idea of > merge sorting. We can store timers in RocksDB with the format > {{KEY_GROUP#TIMER#KEY}}. In this way, the timers under a key group are put > together and are sorted. > Then we can deploy an in-memory heap which keeps the first timer of each key > group to get the next timer to trigger. When a key group's first timer is > updated, we can efficiently update the heap. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5544) Implement Internal Timer Service in RocksDB
[ https://issues.apache.org/jira/browse/FLINK-5544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiaogang Shi updated FLINK-5544: Issue Type: New Feature (was: Bug) > Implement Internal Timer Service in RocksDB > --- > > Key: FLINK-5544 > URL: https://issues.apache.org/jira/browse/FLINK-5544 > Project: Flink > Issue Type: New Feature > Components: Streaming >Reporter: Xiaogang Shi > > Now the only implementation of internal timer service is > HeapInternalTimerService which stores all timers in memory. In the cases > where the number of keys is very large, the timer service will cost too much > memory. An implementation which stores timers in RocksDB seems a good fit for > these cases. > It might be a little challenging to implement a RocksDB timer service because > the timers are accessed in different ways. When timers are triggered, we need > to access timers in the order of timestamp. But when performing checkpoints, > we must have a method to obtain all timers of a given key group. > A good implementation, as suggested by [~StephanEwen], follows the idea of > merge sorting. We can store timers in RocksDB with the format > {{KEY_GROUP#TIMER#KEY}}. In this way, the timers under a key group are put > together and are sorted. > Then we can deploy an in-memory heap which keeps the first timer of each key > group to get the next timer to trigger. When a key group's first timer is > updated, we can efficiently update the heap. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5544) Implement Internal Timer Service in RocksDB
Xiaogang Shi created FLINK-5544: --- Summary: Implement Internal Timer Service in RocksDB Key: FLINK-5544 URL: https://issues.apache.org/jira/browse/FLINK-5544 Project: Flink Issue Type: Bug Components: Streaming Reporter: Xiaogang Shi Now the only implementation of internal timer service is HeapInternalTimerService which stores all timers in memory. In the cases where the number of keys is very large, the timer service will cost too much memory. An implementation which stores timers in RocksDB seems a good fit for these cases. It might be a little challenging to implement a RocksDB timer service because the timers are accessed in different ways. When timers are triggered, we need to access timers in the order of timestamp. But when performing checkpoints, we must have a method to obtain all timers of a given key group. A good implementation, as suggested by [~StephanEwen], follows the idea of merge sorting. We can store timers in RocksDB with the format {{KEY_GROUP#TIMER#KEY}}. In this way, the timers under a key group are put together and are sorted. Then we can deploy an in-memory heap which keeps the first timer of each key group to get the next timer to trigger. When a key group's first timer is updated, we can efficiently update the heap. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
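The merge-sort idea can be sketched as follows, with sorted in-memory deques standing in for the sorted {{KEY_GROUP#TIMER#KEY}} key range in RocksDB; all class and method names are illustrative, not Flink's API:

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Every key group keeps its timers sorted by timestamp (as RocksDB would
// with KEY_GROUP#TIMER#KEY keys); a small in-memory heap holds only the
// head timer of each key group, so finding the next timer to trigger is
// an O(log #keyGroups) heap operation, independent of the number of keys.
class TimerHeapSketch {
    // stand-in for RocksDB: per-key-group timestamps, sorted ascending
    private final Map<Integer, Deque<Long>> timersPerKeyGroup = new HashMap<>();
    // heap entries are {timestamp, keyGroup}, ordered by timestamp
    private final PriorityQueue<long[]> heads =
            new PriorityQueue<>(Comparator.comparingLong((long[] e) -> e[0]));

    void addKeyGroup(int keyGroup, List<Long> sortedTimestamps) {
        timersPerKeyGroup.put(keyGroup, new ArrayDeque<>(sortedTimestamps));
        if (!sortedTimestamps.isEmpty()) {
            heads.add(new long[] {sortedTimestamps.get(0), keyGroup});
        }
    }

    // Pop the globally earliest timer, then refill the heap with the owning
    // key group's next timer, keeping at most one entry per key group.
    Long pollNextTimer() {
        long[] head = heads.poll();
        if (head == null) {
            return null; // no timers left in any key group
        }
        Deque<Long> timers = timersPerKeyGroup.get((int) head[1]);
        timers.pollFirst(); // drop the triggered timer
        Long next = timers.peekFirst();
        if (next != null) {
            heads.add(new long[] {next, head[1]});
        }
        return head[0];
    }

    public static void main(String[] args) {
        TimerHeapSketch sketch = new TimerHeapSketch();
        sketch.addKeyGroup(0, Arrays.asList(3L, 7L));
        sketch.addKeyGroup(1, Arrays.asList(5L));
        if (sketch.pollNextTimer() != 3L) throw new AssertionError();
        if (sketch.pollNextTimer() != 5L) throw new AssertionError();
        if (sketch.pollNextTimer() != 7L) throw new AssertionError();
        if (sketch.pollNextTimer() != null) throw new AssertionError();
    }
}
```

Checkpointing a key group is then just a range scan over that key group's prefix, which the timestamp-ordered key format gives for free.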
[jira] [Commented] (FLINK-5398) Exclude generated files in module flink-batch-connectors in license checking
[ https://issues.apache.org/jira/browse/FLINK-5398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15786803#comment-15786803 ] Xiaogang Shi commented on FLINK-5398: - [~fhueske] Thanks for your explanation. It helps. > Exclude generated files in module flink-batch-connectors in license checking > > > Key: FLINK-5398 > URL: https://issues.apache.org/jira/browse/FLINK-5398 > Project: Flink > Issue Type: Bug >Reporter: Xiaogang Shi > > Now the master branch fails to execute {{mvn install}} due to unlicensed > files in the module flink-batch-connectors. We should exclude these generated > files in the pom file. > Unapproved licenses: > > flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/Address.java > > flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/Colors.java > > flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/Fixed16.java > > flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/User.java -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5400) Add accessor to folding states in RuntimeContext
[ https://issues.apache.org/jira/browse/FLINK-5400?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiaogang Shi updated FLINK-5400: Description: Now {{RuntimeContext}} does not provide the accessors to folding states. Therefore users cannot use folding states in their rich functions. I think we should provide the missing accessor. (was: Now {{RuntimeContext}} does provide the accessors to folding states. Therefore users cannot use folding states in their rich functions. I think we should provide the missing accessor.) > Add accessor to folding states in RuntimeContext > > > Key: FLINK-5400 > URL: https://issues.apache.org/jira/browse/FLINK-5400 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Xiaogang Shi > > Now {{RuntimeContext}} does not provide the accessors to folding states. > Therefore users cannot use folding states in their rich functions. I think we > should provide the missing accessor. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5400) Add accessor to folding states in RuntimeContext
Xiaogang Shi created FLINK-5400: --- Summary: Add accessor to folding states in RuntimeContext Key: FLINK-5400 URL: https://issues.apache.org/jira/browse/FLINK-5400 Project: Flink Issue Type: Bug Components: State Backends, Checkpointing Reporter: Xiaogang Shi Now {{RuntimeContext}} does not provide the accessors to folding states. Therefore users cannot use folding states in their rich functions. I think we should provide the missing accessor. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5398) Exclude generated files in module flink-batch-connectors in license checking
Xiaogang Shi created FLINK-5398: --- Summary: Exclude generated files in module flink-batch-connectors in license checking Key: FLINK-5398 URL: https://issues.apache.org/jira/browse/FLINK-5398 Project: Flink Issue Type: Bug Reporter: Xiaogang Shi Now the master branch fails to execute {{mvn install}} due to unlicensed files in the module flink-batch-connectors. We should exclude these generated files in the pom file. Unapproved licenses: flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/Address.java flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/Colors.java flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/Fixed16.java flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/User.java -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5397) Fail to deserialize savepoints in v1.1 when there exist missing fields in class serialization descriptors
[ https://issues.apache.org/jira/browse/FLINK-5397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15782811#comment-15782811 ] Xiaogang Shi commented on FLINK-5397: - The idea does work and is better. Many thanks for the quick fix :) > Fail to deserialize savepoints in v1.1 when there exist missing fields in > class serialization descriptors > - > > Key: FLINK-5397 > URL: https://issues.apache.org/jira/browse/FLINK-5397 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Xiaogang Shi >Assignee: Stefan Richter > > To restore from the savepoints in previous versions, Flink now keeps all > classes whose serialization is changed and put them in a separated package > ("migration"). > When deserializing the old savepoints, flink will look up correct descriptors > ({{ObjectStreamClass}}) for these classes, without using those ones written > in serialized data. The implementation however is problematic when there > exist missing field descriptors in the serialized data. > When serializing an object, Java will only write the descriptors of those > non-null fields. But when we look up class descriptors with given classes, > all fields will be put into the descriptors. As a result, we will deserialize > the savepoints with incorrect descriptors, leading to serialization > exceptions. > A simple resolution is to update the name of read descriptors using > Reflections, without using different descriptors. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5397) Fail to deserialize savepoints in v1.1 when there exist missing fields in class serialization descriptors
[ https://issues.apache.org/jira/browse/FLINK-5397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiaogang Shi updated FLINK-5397: Description: To restore from the savepoints in previous versions, Flink now keeps all classes whose serialization is changed and put them in a separated package ("migration"). When deserializing the old savepoints, flink will look up correct descriptors ({{ObjectStreamClass}}) for these classes, without using those ones written in serialized data. The implementation however is problematic when there exist missing field descriptors in the serialized data. When serializing an object, Java will only write the descriptors of those non-null fields. But when we look up class descriptors with given classes, all fields will be put into the descriptors. As a result, we will deserialize the savepoints with incorrect descriptors, leading to serialization exceptions. A simple resolution is to update the name of read descriptors using Reflections, without using different descriptors. was: To restore from the savepoints in previous versions, Flink now keeps all classes whose serialization is changed and put them in a separated package ("migration"). When deserializing the old savepoints, flink will look up correct descriptors ({{ObjectStreamClass}}) for these classes, without using those ones written in serialized data. The implementation however is problematic when there exist missing field descriptors in the serialized data. When deserializing an object, Java will only write the descriptors of those non-null fields. But when we look up class descriptors with given classes, all fields will be put into the descriptors. As a result, we will deserialize the savepoints with incorrect descriptors, leading to serialization exceptions. A simple resolution is to update the name of read descriptors using Reflections, without using a different descriptors. 
> Fail to deserialize savepoints in v1.1 when there exist missing fields in > class serialization descriptors > - > > Key: FLINK-5397 > URL: https://issues.apache.org/jira/browse/FLINK-5397 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Xiaogang Shi > > To restore from the savepoints in previous versions, Flink now keeps all > classes whose serialization is changed and put them in a separated package > ("migration"). > When deserializing the old savepoints, flink will look up correct descriptors > ({{ObjectStreamClass}}) for these classes, without using those ones written > in serialized data. The implementation however is problematic when there > exist missing field descriptors in the serialized data. > When serializing an object, Java will only write the descriptors of those > non-null fields. But when we look up class descriptors with given classes, > all fields will be put into the descriptors. As a result, we will deserialize > the savepoints with incorrect descriptors, leading to serialization > exceptions. > A simple resolution is to update the name of read descriptors using > Reflections, without using different descriptors. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5397) Fail to deserialize savepoints in v1.1 when there exist missing fields in class serialization descriptors
Xiaogang Shi created FLINK-5397: --- Summary: Fail to deserialize savepoints in v1.1 when there exist missing fields in class serialization descriptors Key: FLINK-5397 URL: https://issues.apache.org/jira/browse/FLINK-5397 Project: Flink Issue Type: Bug Components: State Backends, Checkpointing Reporter: Xiaogang Shi To restore from the savepoints in previous versions, Flink now keeps all classes whose serialization is changed and puts them in a separate package ("migration"). When deserializing the old savepoints, Flink will look up correct descriptors ({{ObjectStreamClass}}) for these classes, without using the ones written in serialized data. The implementation however is problematic when there exist missing field descriptors in the serialized data. When serializing an object, Java will only write the descriptors of those non-null fields. But when we look up class descriptors with given classes, all fields will be put into the descriptors. As a result, we will deserialize the savepoints with incorrect descriptors, leading to serialization exceptions. A simple resolution is to update the name of read descriptors using reflection, without using different descriptors. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5214) Clean up checkpoint files when failing checkpoint operation on TM
[ https://issues.apache.org/jira/browse/FLINK-5214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15710512#comment-15710512 ] Xiaogang Shi commented on FLINK-5214: - I opened FLINK-5086 to report a similar problem, but I do not have a good idea how to resolve it. Because the JM does not know of the existence of these checkpoint files, it seems only the TM can delete them. But as a failed TM may not be recovered by the JM if the number of retries exceeds the given limit, these files will not be deleted in such cases. One possible solution, I think, is to let each TM return a handler to the JM when the TM is registered. The JM can use the handler to clean the files even when the TM fails. Another solution is to recover the TM when the number of retries exceeds the limit. Once the TM is recovered, the only thing it does is to clean the checkpoint files. Do you have any better ideas? > Clean up checkpoint files when failing checkpoint operation on TM > - > > Key: FLINK-5214 > URL: https://issues.apache.org/jira/browse/FLINK-5214 > Project: Flink > Issue Type: Bug > Components: TaskManager >Affects Versions: 1.2.0, 1.1.3 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.2.0, 1.1.4 > > > When the {{StreamTask#performCheckpoint}} operation fails on a > {{TaskManager}}, potentially created checkpoint files are not cleaned up. This > should be changed. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5090) Expose optionally detailed metrics about network queue lengths
[ https://issues.apache.org/jira/browse/FLINK-5090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15679596#comment-15679596 ] Xiaogang Shi commented on FLINK-5090: - In Flink, performance bottlenecks are usually caused by (1) mismatched parallelism of the producer and consumer operators, or (2) imbalanced load across the different tasks of the same operator. Per-channel metrics help a lot in diagnosing both problems, but fixing the second one usually requires changing the application logic. Gate-wise metrics are sufficient to identify the first problem, and I think they incur little additional overhead (apart from operators with two inputs). > Expose optionally detailed metrics about network queue lengths > -- > > Key: FLINK-5090 > URL: https://issues.apache.org/jira/browse/FLINK-5090 > Project: Flink > Issue Type: New Feature > Components: Network >Affects Versions: 1.1.3 >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.2.0, 1.1.4 > > > For debugging purposes, it is important to have access to more detailed > metrics about the length of network input and output queues. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5090) Expose optionally detailed metrics about network queue lengths
[ https://issues.apache.org/jira/browse/FLINK-5090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15675401#comment-15675401 ] Xiaogang Shi commented on FLINK-5090: - I suggest the metrics to be channel-wise. With these metrics, we will easily find those hotspot channels. What do you think? > Expose optionally detailed metrics about network queue lengths > -- > > Key: FLINK-5090 > URL: https://issues.apache.org/jira/browse/FLINK-5090 > Project: Flink > Issue Type: New Feature > Components: Network >Affects Versions: 1.1.3 >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.2.0, 1.1.4 > > > For debugging purposes, it is important to have access to more detailed > metrics about the length of network input and output queues. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5086) Clean dead snapshot files produced by the tasks failing to acknowledge checkpoints
Xiaogang Shi created FLINK-5086: --- Summary: Clean dead snapshot files produced by the tasks failing to acknowledge checkpoints Key: FLINK-5086 URL: https://issues.apache.org/jira/browse/FLINK-5086 Project: Flink Issue Type: Bug Components: State Backends, Checkpointing Reporter: Xiaogang Shi A task may fail when performing checkpoints. In that case, the task may have already copied some data to external storage. But since the task fails to send the state handler to {{CheckpointCoordinator}}, the copied data will not be deleted by {{CheckpointCoordinator}}. I think we must find a method to clean such dead snapshot data to avoid unlimited usage of external storage. One possible method is to clean these dead files when the task recovers. When a task recovers, {{CheckpointCoordinator}} will tell the task all the retained checkpoints. The task then can scan the external storage to delete all the snapshots not in these retained checkpoints. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
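The recovery-time cleanup proposed above can be sketched like this. The {{chk-<id>}} directory naming below is an assumption for illustration, as is the helper's name; the real scan would list directories in the external storage:

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

public class DeadSnapshotCleanup {
    // On recovery, the CheckpointCoordinator tells the task which checkpoint
    // ids are retained; every snapshot directory found in external storage
    // that belongs to no retained checkpoint is dead and can be deleted.
    static Set<String> selectDeadSnapshots(Collection<String> directoriesInStorage,
                                           Set<Long> retainedCheckpointIds) {
        Set<String> dead = new TreeSet<>();
        for (String dir : directoriesInStorage) {
            // assumed layout: one directory per checkpoint, named "chk-<id>"
            long checkpointId = Long.parseLong(dir.substring(dir.lastIndexOf('-') + 1));
            if (!retainedCheckpointIds.contains(checkpointId)) {
                dead.add(dir); // not referenced by any retained checkpoint
            }
        }
        return dead;
    }

    public static void main(String[] args) {
        Set<Long> retained = new HashSet<>(Arrays.asList(7L, 8L));
        Set<String> dead = selectDeadSnapshots(
                Arrays.asList("chk-5", "chk-7", "chk-8", "chk-6"), retained);
        if (!dead.equals(new TreeSet<>(Arrays.asList("chk-5", "chk-6")))) {
            throw new AssertionError();
        }
    }
}
```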
[jira] [Comment Edited] (FLINK-5053) Incremental / lightweight snapshots for checkpoints
[ https://issues.apache.org/jira/browse/FLINK-5053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15673253#comment-15673253 ] Xiaogang Shi edited comment on FLINK-5053 at 11/17/16 9:41 AM: --- [~srichter] Do you have a more detailed plan about incremental checkpoints? I think much more work is needed to make it work. One big problem is the concurrent modification made by TaskExecutors and the JobMaster. Currently, the state handlers as well as the snapshot data (the files on HDFS) are both deleted by the JobMasters. In incremental checkpoints, a file may be used in different checkpoints. The concurrent access to the files may lead to incorrect results. For example, the JobMaster may delete a file which the TaskExecutor assumed was already on HDFS and therefore did not copy again. One method is to synchronize the access of JobMasters and TaskExecutors. Another solution, I think, is to let TaskExecutors delete these snapshot files. That way, all access to the snapshot data is made by TaskExecutors, hence avoiding the need for synchronization. Do you have any idea about this problem? was (Author: xiaogang.shi): [~srichter] Do you have a more detailed plan about incremental checkpoints? I think much more work is needed to make it work. One big problem is the concurrent modification made by TaskExecutors and the JobMaster. Currently, the state handlers as well as the snapshot data (the files on HDFS) are both deleted by the JobMasters. In incremental checkpoints, a file may be used in different checkpoints. One method is to synchronize the access of JobMasters and TaskExecutors. Another solution, I think, is to let TaskExecutors delete these snapshot files. Do you have any idea about this problem? 
> Incremental / lightweight snapshots for checkpoints > --- > > Key: FLINK-5053 > URL: https://issues.apache.org/jira/browse/FLINK-5053 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Stefan Richter > > There is currently basically no difference between savepoints and checkpoints > in Flink and both are created through exactly the same process. > However, savepoints and checkpoints have a slightly different meaning which > we should take into account to keep Flink efficient: > - Savepoints are (typically infrequently) triggered by the user to create a > state from which the application can be restarted, e.g. because Flink, some > code, or the parallelism needs to be changed. > - Checkpoints are (typically frequently) triggered by the System to allow for > fast recovery in case of failure, but keeping the job/system unchanged. > This means that savepoints and checkpoints can have different properties in > that: > - Savepoint should represent a state of the application, where > characteristics of the job (e.g. parallelism) can be adjusted for the next > restart. One example for things that savepoints need to be aware of are > key-groups. Savepoints can potentially be a little more expensive than > checkpoints, because they are usually created a lot less frequently through > the user. > - Checkpoints are frequently triggered by the system to allow for fast > failure recovery. However, failure recovery leaves all characteristics of the > job unchanged. This checkpoints do not have to be aware of those, e.g. think > again of key groups. Checkpoints should run faster than creating savepoints, > in particular it would be nice to have incremental checkpoints. > For a first approach, I would suggest the following steps/changes: > - In checkpoint coordination: differentiate between triggering checkpoints > and savepoints. Introduce properties for checkpoints that describe their set > of abilities, e.g. 
"is-key-group-aware", "is-incremental". > - In state handle infrastructure: introduce state handles that reflect > incremental checkpoints and drop full key-group awareness, i.e. covering > folders instead of files and not having keygroup_id -> file/offset mapping, > but keygroup_range -> folder? > - Backend side: We should start with RocksDB by reintroducing something > similar to semi-async snapshots, but using > BackupableDBOptions::setShareTableFiles(true) and transferring only new > incremental outputs to HDFS. Notice that using RocksDB's internal backup > mechanism is giving up on the information about individual key-groups. But as > explained above, this should be totally acceptable for checkpoints, while > savepoints should use the key-group-aware fully async mode. Of course we also > need to implement the ability to restore from both types of snapshots. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5053) Incremental / lightweight snapshots for checkpoints
[ https://issues.apache.org/jira/browse/FLINK-5053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15673253#comment-15673253 ] Xiaogang Shi commented on FLINK-5053: - [~srichter] Do you have a more detailed plan about incremental checkpoints? I think much more work is needed to make it work. One big problem is the concurrent modification made by TaskExecutors and the JobMaster. Currently, the state handlers as well as the snapshot data (the files on HDFS) are both deleted by the JobMasters. In incremental checkpoints, a file may be used in different checkpoints. One method is to synchronize the access of JobMasters and TaskExecutors. Another solution, I think, is to let TaskExecutors delete these snapshot files. Do you have any idea about this problem? > Incremental / lightweight snapshots for checkpoints > --- > > Key: FLINK-5053 > URL: https://issues.apache.org/jira/browse/FLINK-5053 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Stefan Richter > > There is currently basically no difference between savepoints and checkpoints > in Flink and both are created through exactly the same process. > However, savepoints and checkpoints have a slightly different meaning which > we should take into account to keep Flink efficient: > - Savepoints are (typically infrequently) triggered by the user to create a > state from which the application can be restarted, e.g. because Flink, some > code, or the parallelism needs to be changed. > - Checkpoints are (typically frequently) triggered by the System to allow for > fast recovery in case of failure, but keeping the job/system unchanged. > This means that savepoints and checkpoints can have different properties in > that: > - Savepoint should represent a state of the application, where > characteristics of the job (e.g. parallelism) can be adjusted for the next > restart. One example for things that savepoints need to be aware of are > key-groups. 
Savepoints can potentially be a little more expensive than > checkpoints, because they are usually created a lot less frequently through > the user. > - Checkpoints are frequently triggered by the system to allow for fast > failure recovery. However, failure recovery leaves all characteristics of the > job unchanged. This checkpoints do not have to be aware of those, e.g. think > again of key groups. Checkpoints should run faster than creating savepoints, > in particular it would be nice to have incremental checkpoints. > For a first approach, I would suggest the following steps/changes: > - In checkpoint coordination: differentiate between triggering checkpoints > and savepoints. Introduce properties for checkpoints that describe their set > of abilities, e.g. "is-key-group-aware", "is-incremental". > - In state handle infrastructure: introduce state handles that reflect > incremental checkpoints and drop full key-group awareness, i.e. covering > folders instead of files and not having keygroup_id -> file/offset mapping, > but keygroup_range -> folder? > - Backend side: We should start with RocksDB by reintroducing something > similar to semi-async snapshots, but using > BackupableDBOptions::setShareTableFiles(true) and transferring only new > incremental outputs to HDFS. Notice that using RocksDB's internal backup > mechanism is giving up on the information about individual key-groups. But as > explained above, this should be totally acceptable for checkpoints, while > savepoints should use the key-group-aware fully async mode. Of course we also > need to implement the ability to restore from both types of snapshots. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5085) Execute CheckpointCoodinator's state discard calls asynchronously
[ https://issues.apache.org/jira/browse/FLINK-5085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15673026#comment-15673026 ] Xiaogang Shi commented on FLINK-5085: - Great, this is what I thought of in recent days. Our states are composed of thousands of files on HDFS. It takes a long time to delete them sequentially. A dedicated executor will help improve the performance. > Execute CheckpointCoordinator's state discard calls asynchronously > - > > Key: FLINK-5085 > URL: https://issues.apache.org/jira/browse/FLINK-5085 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.2.0, 1.1.3 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.2.0, 1.1.4 > > > The {{CheckpointCoordinator}} discards under certain circumstances pending > checkpoints or state handles. These discard operations can involve a blocking > IO operation if the underlying state handle refers to a file which has to be > deleted. In order to not block the calling thread, we should execute these > calls in a dedicated IO executor. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
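The dedicated-executor idea can be sketched as below; {{StateHandle}} here is a minimal stand-in for the real handle type, reduced to its potentially blocking {{discardState}} call:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class AsyncDiscardDemo {
    // Stand-in for a state handle whose discard may block on file IO.
    interface StateHandle {
        void discardState();
    }

    // Submit each (potentially blocking) deletion to a dedicated IO executor,
    // so the coordinator thread returns immediately instead of deleting
    // thousands of HDFS files in sequence.
    static void discardAsync(Iterable<StateHandle> handles, ExecutorService ioExecutor) {
        for (StateHandle handle : handles) {
            ioExecutor.execute(handle::discardState);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger discarded = new AtomicInteger();
        List<StateHandle> handles = Arrays.asList(
                discarded::incrementAndGet, discarded::incrementAndGet);
        ExecutorService ioExecutor = Executors.newFixedThreadPool(2);
        discardAsync(handles, ioExecutor);
        ioExecutor.shutdown();
        ioExecutor.awaitTermination(10, TimeUnit.SECONDS);
        if (discarded.get() != 2) throw new AssertionError();
    }
}
```

A bounded pool also naturally limits how many concurrent delete requests hit the filesystem at once.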
[jira] [Commented] (FLINK-5053) Incremental / lightweight snapshots for checkpoints
[ https://issues.apache.org/jira/browse/FLINK-5053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15663160#comment-15663160 ] Xiaogang Shi commented on FLINK-5053: - I think it's better to use {{checkpoint}} instead of {{backup}} to perform incremental checkpointing of RocksDB. The {{checkpoint}} method will create hard links for all living files, without the need to copy files. Hence it can help reduce the time taken in the synchronous part. What do you think? > Incremental / lightweight snapshots for checkpoints > --- > > Key: FLINK-5053 > URL: https://issues.apache.org/jira/browse/FLINK-5053 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Stefan Richter > > There is currently basically no difference between savepoints and checkpoints > in Flink and both are created through exactly the same process. > However, savepoints and checkpoints have a slightly different meaning which > we should take into account to keep Flink efficient: > - Savepoints are (typically infrequently) triggered by the user to create a > state from which the application can be restarted, e.g. because Flink, some > code, or the parallelism needs to be changed. > - Checkpoints are (typically frequently) triggered by the system to allow for > fast recovery in case of failure, but keeping the job/system unchanged. > This means that savepoints and checkpoints can have different properties in > that: > - Savepoints should represent a state of the application, where > characteristics of the job (e.g. parallelism) can be adjusted for the next > restart. One example for things that savepoints need to be aware of are > key-groups. Savepoints can potentially be a little more expensive than > checkpoints, because they are usually created a lot less frequently by > the user. > - Checkpoints are frequently triggered by the system to allow for fast > failure recovery. 
However, failure recovery leaves all characteristics of the > job unchanged. Thus checkpoints do not have to be aware of those, e.g. think > again of key groups. Checkpoints should run faster than creating savepoints; > in particular, it would be nice to have incremental checkpoints. > For a first approach, I would suggest the following steps/changes: > - In checkpoint coordination: differentiate between triggering checkpoints > and savepoints. Introduce properties for checkpoints that describe their set > of abilities, e.g. "is-key-group-aware", "is-incremental". > - In state handle infrastructure: introduce state handles that reflect > incremental checkpoints and drop full key-group awareness, i.e. covering > folders instead of files and not having keygroup_id -> file/offset mapping, > but keygroup_range -> folder? > - Backend side: We should start with RocksDB by reintroducing something > similar to semi-async snapshots, but using > BackupableDBOptions::setShareTableFiles(true) and transferring only new > incremental outputs to HDFS. Notice that using RocksDB's internal backup > mechanism gives up the information about individual key-groups. But as > explained above, this should be totally acceptable for checkpoints, while > savepoints should use the key-group-aware fully async mode. Of course we also > need to implement the ability to restore from both types of snapshots. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
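For context on the {{checkpoint}}-vs-{{backup}} trade-off discussed in this thread: RocksDB's checkpoint mechanism hard-links the immutable SST files into the checkpoint directory instead of copying their bytes. A minimal stdlib-only illustration of why hard links make the synchronous part cheap (this is a sketch of the idea, not RocksDB's API; file names are made up):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class HardLinkCheckpointSketch {

    // "Checkpoints" a set of immutable data files by hard-linking them into a
    // checkpoint directory: no bytes are copied, so the synchronous part is
    // proportional to the number of files, not the size of the state.
    static void checkpoint(Path[] dataFiles, Path checkpointDir) throws IOException {
        Files.createDirectories(checkpointDir);
        for (Path file : dataFiles) {
            Files.createLink(checkpointDir.resolve(file.getFileName()), file);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("db");
        Path sst = Files.write(dir.resolve("000001.sst"), new byte[]{1, 2, 3});
        checkpoint(new Path[]{sst}, dir.resolve("chk-1"));
        // The link shares the same bytes as the original file.
        System.out.println(Files.readAllBytes(dir.resolve("chk-1").resolve("000001.sst")).length);
    }
}
```

A backup-style copy would instead re-read and re-write every byte during the snapshot, which is exactly the synchronous-part cost the comment wants to avoid.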
[jira] [Commented] (FLINK-5023) Add get() method in State interface
[ https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15653177#comment-15653177 ] Xiaogang Shi commented on FLINK-5023: - [~aljoscha] [~StephanEwen] I have updated the PR. Now, `State` only provides a read-only accessor and a new interface called `UpdatableState` is added. > Add get() method in State interface > --- > > Key: FLINK-5023 > URL: https://issues.apache.org/jira/browse/FLINK-5023 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Xiaogang Shi >Assignee: Xiaogang Shi > > Currently, the only method provided by the State interface is `clear()`. I > think we should provide another method called `get()` to return the > structured value (e.g., value, list, or map) under the current key. > In fact, the functionality of `get()` has already been implemented in all > types of states: e.g., `value()` in ValueState and `get()` in ListState. The > modification to the interface can better abstract these states. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5024) Add SimpleStateDescriptor to clarify the concepts
[ https://issues.apache.org/jira/browse/FLINK-5024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15652901#comment-15652901 ] Xiaogang Shi commented on FLINK-5024: - I am very poor at English :( But I think "Simple" is more often used as the opposite of "Compound". For example: simple interest and compound interest. "Primitive" is not that good because it is usually used to describe those BASIC elements which form the other things. Maybe we need some help from native speakers lol > Add SimpleStateDescriptor to clarify the concepts > - > > Key: FLINK-5024 > URL: https://issues.apache.org/jira/browse/FLINK-5024 > Project: Flink > Issue Type: Improvement >Reporter: Xiaogang Shi >Assignee: Xiaogang Shi > > Currently, StateDescriptors accept two type arguments: the first one is the > type of the created state and the second one is the type of the values in the > states. > The concepts, however, are a little confusing here because in ListStates, the > arguments passed to the StateDescriptors are the types of the list elements > instead of the lists. It also makes the implementation of MapStates difficult. > I suggest not to put the type serializer in StateDescriptors, making > StateDescriptors independent of the data structures of the values. > A new type of StateDescriptor named SimpleStateDescriptor can be provided to > abstract those states (namely ValueState, ReducingState and FoldingState) > whose states are not composited. > The states (e.g. ListStates and MapStates) can implement their own > descriptors according to their data structures. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5036) Perform the grouping of keys in restoring instead of checkpointing
[ https://issues.apache.org/jira/browse/FLINK-5036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15650483#comment-15650483 ] Xiaogang Shi commented on FLINK-5036: - Glad you understand my problem :) > Perform the grouping of keys in restoring instead of checkpointing > -- > > Key: FLINK-5036 > URL: https://issues.apache.org/jira/browse/FLINK-5036 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Xiaogang Shi > > Whenever taking snapshots of {{RocksDBKeyedStateBackend}}, the values in the > states will be written onto different files according to their key groups. > The procedure is very costly when the states are very big. > Given that the snapshot operations will be performed much more frequently > than restoring, we can leave the key groups as they are to improve the > overall performance. In other words, we can perform the grouping of keys in > restoring instead of in checkpointing. > I think, the implementation will be very similar to the restoring of > non-partitioned states. Each task will receive a collection of snapshots each > of which contains a set of key groups. Each task will restore its states from > the given snapshots by picking values in assigned key groups. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5036) Perform the grouping of keys in restoring instead of checkpointing
[ https://issues.apache.org/jira/browse/FLINK-5036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15650462#comment-15650462 ] Xiaogang Shi commented on FLINK-5036: - The current implementation of both checkpointing and restoring requires iterating over all key-value pairs. When the states are very big (up to multiple GBs or TBs, which is common in our daily jobs), the performance is obviously unacceptable. Let me explain my proposal in more detail. By not organizing kv pairs into key groups, we can avoid the iteration and directly copy the RocksDB files onto HDFS. Of course, we should record all the key groups contained in the produced snapshot. When restoring from snapshots, the master will give each task all the RocksDB instances that contain its assigned key groups. Tasks can pick those keys assigned to them by accessing these instances. If all the key groups in an instance are assigned to the task, then the task can avoid unnecessary picking. In most cases where the degree of parallelism is not changed, fast recovery can be achieved because states can be restored by simply copying the files from HDFS. In all cases, the performance will be much better than the existing implementation, which needs costly iteration. > Perform the grouping of keys in restoring instead of checkpointing > -- > > Key: FLINK-5036 > URL: https://issues.apache.org/jira/browse/FLINK-5036 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Xiaogang Shi > > Whenever taking snapshots of {{RocksDBKeyedStateBackend}}, the values in the > states will be written onto different files according to their key groups. > The procedure is very costly when the states are very big. > Given that the snapshot operations will be performed much more frequently > than restoring, we can leave the key groups as they are to improve the > overall performance. 
In other words, we can perform the grouping of keys in > restoring instead of in checkpointing. > I think, the implementation will be very similar to the restoring of > non-partitioned states. Each task will receive a collection of snapshots each > of which contains a set of key groups. Each task will restore its states from > the given snapshots by picking values in assigned key groups. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
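The restore-side picking described above could look roughly like this. The {{Snapshot}} descriptor, {{pickRelevant}}, and the inclusive key-group ranges are assumptions for illustration, not Flink's actual classes:

```java
import java.util.ArrayList;
import java.util.List;

public class KeyGroupRestoreSketch {

    // Hypothetical descriptor of one snapshot: the contiguous range of
    // key groups its files contain (both ends inclusive).
    static class Snapshot {
        final int startKeyGroup;
        final int endKeyGroup;

        Snapshot(int start, int end) {
            this.startKeyGroup = start;
            this.endKeyGroup = end;
        }

        boolean overlaps(int start, int end) {
            return startKeyGroup <= end && endKeyGroup >= start;
        }

        // Fully-contained snapshots can be restored by a plain file copy;
        // the others require per-key filtering during restore.
        boolean containedIn(int start, int end) {
            return startKeyGroup >= start && endKeyGroup <= end;
        }
    }

    // On restore, a task assigned key groups [start, end] receives every
    // snapshot whose range overlaps its own, and picks the assigned keys.
    static List<Snapshot> pickRelevant(List<Snapshot> all, int start, int end) {
        List<Snapshot> relevant = new ArrayList<>();
        for (Snapshot s : all) {
            if (s.overlaps(start, end)) {
                relevant.add(s);
            }
        }
        return relevant;
    }
}
```

When the parallelism is unchanged, every snapshot a task receives is fully contained in its range, so the restore degenerates to the fast file-copy path the comment describes.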
[jira] [Created] (FLINK-5036) Perform the grouping of keys in restoring instead of checkpointing
Xiaogang Shi created FLINK-5036: --- Summary: Perform the grouping of keys in restoring instead of checkpointing Key: FLINK-5036 URL: https://issues.apache.org/jira/browse/FLINK-5036 Project: Flink Issue Type: Bug Components: State Backends, Checkpointing Reporter: Xiaogang Shi Whenever taking snapshots of {{RocksDBKeyedStateBackend}}, the values in the states will be written onto different files according to their key groups. The procedure is very costly when the states are very big. Given that the snapshot operations will be performed much more frequently than restoring, we can leave the key groups as they are to improve the overall performance. In other words, we can perform the grouping of keys in restoring instead of in checkpointing. I think, the implementation will be very similar to the restoring of non-partitioned states. Each task will receive a collection of snapshots each of which contains a set of key groups. Each task will restore its states from the given snapshots by picking values in assigned key groups. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (FLINK-5023) Add get() method in State interface
[ https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15649615#comment-15649615 ] Xiaogang Shi edited comment on FLINK-5023 at 11/9/16 2:52 AM: -- [~aljoscha] In that case, I think we should provide {{UpdatableState}} instead of {{ReadableState}}. Now {{State}} is updatable because it has the method {{clear()}}. I would prefer {{ReadableState}} to inherit from {{State}}; otherwise, it's very weird that a readable state is not a state. From this point, I think the base {{State}} should be readable, and a new interface {{UpdatableState}} could be provided to abstract those states allowing both reads and writes. was (Author: xiaogang.shi): @Aljoscha Krettek In that case, I think we should provide {{UpdatableState}} instead of {{ReadableState}}. Now {{State}} is updatable because it has the method {{clear()}}. I would prefer {{ReadableState}} to inherit from {{State}}; otherwise, it's very weird that a readable state is not a state. From this point, I think the base {{State}} should be readable, and a new interface {{UpdatableState}} could be provided to abstract those states allowing both reads and writes. > Add get() method in State interface > --- > > Key: FLINK-5023 > URL: https://issues.apache.org/jira/browse/FLINK-5023 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Xiaogang Shi >Assignee: Xiaogang Shi > > Currently, the only method provided by the State interface is `clear()`. I > think we should provide another method called `get()` to return the > structured value (e.g., value, list, or map) under the current key. > In fact, the functionality of `get()` has already been implemented in all > types of states: e.g., `value()` in ValueState and `get()` in ListState. The > modification to the interface can better abstract these states. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5023) Add get() method in State interface
[ https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15649615#comment-15649615 ] Xiaogang Shi commented on FLINK-5023: - Aljoscha Krettek In that case, I think we should provide {{UpdatableState}} instead of {{ReadableState}}. Now {{State}} is updatable because it has the method {{clear()}}. If {{ReadableState}} does not inherit from {{State}}, it's very weird that a readable state is not a state, so I would prefer {{ReadableState}} to inherit from {{State}}. From this point, I think the base {{State}} should be readable, and a new interface {{UpdatableState}} could be provided to abstract those states allowing both reads and writes. > Add get() method in State interface > --- > > Key: FLINK-5023 > URL: https://issues.apache.org/jira/browse/FLINK-5023 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Xiaogang Shi >Assignee: Xiaogang Shi > > Currently, the only method provided by the State interface is `clear()`. I > think we should provide another method called `get()` to return the > structured value (e.g., value, list, or map) under the current key. > In fact, the functionality of `get()` has already been implemented in all > types of states: e.g., `value()` in ValueState and `get()` in ListState. The > modification to the interface can better abstract these states. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
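A minimal sketch of the hierarchy discussed in this thread, using the names from the comments ({{State}}, {{UpdatableState}}); the signatures and the in-memory implementation are illustrative, not Flink's actual API:

```java
public class StateHierarchySketch {

    // Base interface: read-only, as argued in the comment above, so every
    // readable state is still a State.
    interface State<T> {
        T get() throws Exception;
    }

    // States allowing both reads and writes extend the readable base.
    interface UpdatableState<T> extends State<T> {
        void update(T value) throws Exception;
        void clear();
    }

    // Minimal in-memory ValueState-like implementation for illustration.
    static class HeapValueState<T> implements UpdatableState<T> {
        private T value;

        public T get() { return value; }
        public void update(T newValue) { value = newValue; }
        public void clear() { value = null; }

        // The old accessor can simply delegate to the new get(), so existing
        // user code keeps working (the point made later in this thread).
        public T value() { return get(); }
    }

    public static void main(String[] args) {
        HeapValueState<String> state = new HeapValueState<>();
        state.update("hello");
        System.out.println(state.value());
    }
}
```

With this shape, code that only reads state can accept a {{State<T>}}, while the backend hands out {{UpdatableState}} implementations.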
[jira] [Assigned] (FLINK-5024) Add SimpleStateDescriptor to clarify the concepts
[ https://issues.apache.org/jira/browse/FLINK-5024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiaogang Shi reassigned FLINK-5024: --- Assignee: Xiaogang Shi > Add SimpleStateDescriptor to clarify the concepts > - > > Key: FLINK-5024 > URL: https://issues.apache.org/jira/browse/FLINK-5024 > Project: Flink > Issue Type: Improvement >Reporter: Xiaogang Shi >Assignee: Xiaogang Shi > > Currently, StateDescriptors accept two type arguments : the first one is the > type of the created state and the second one is the type of the values in the > states. > The concepts however is a little confusing here because in ListStates, the > arguments passed to the StateDescriptors are the types of the list elements > instead of the lists. It also makes the implementation of MapStates difficult. > I suggest not to put the type serializer in StateDescriptors, making > StateDescriptors independent of the data structures of the values. > A new type of StateDescriptor named SimpleStateDescriptor can be provided to > abstract those states (namely ValueState, ReducingState and FoldingState) > whose states are not composited. > The states (e.g. ListStates and MapStates) can implement their own > descriptors according to their data structures. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-5023) Add get() method in State interface
[ https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiaogang Shi reassigned FLINK-5023: --- Assignee: Xiaogang Shi > Add get() method in State interface > --- > > Key: FLINK-5023 > URL: https://issues.apache.org/jira/browse/FLINK-5023 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Xiaogang Shi >Assignee: Xiaogang Shi > > Currently, the only method provided by the State interface is `clear()`. I > think we should provide another method called `get()` to return the > structured value (e.g., value, list, or map) under the current key. > In fact, the functionality of `get()` has already been implemented in all > types of states: e.g., `value()` in ValueState and `get()` in ListState. The > modification to the interface can better abstract these states. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5023) Add get() method in State interface
[ https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15646299#comment-15646299 ] Xiaogang Shi commented on FLINK-5023: - I have opened a PR: https://github.com/apache/flink/pull/2768/files. Since the implementation of `State` and `StateDescriptor` is closely connected, I also put the code of FLINK-5024 in this PR. Existing code may be affected by the changes in `StateDescriptor` because now `StateDescriptor` accepts only one type argument. > Add get() method in State interface > --- > > Key: FLINK-5023 > URL: https://issues.apache.org/jira/browse/FLINK-5023 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Xiaogang Shi > > Currently, the only method provided by the State interface is `clear()`. I > think we should provide another method called `get()` to return the > structured value (e.g., value, list, or map) under the current key. > In fact, the functionality of `get()` has already been implemented in all > types of states: e.g., `value()` in ValueState and `get()` in ListState. The > modification to the interface can better abstract these states. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5023) Add get() method in State interface
[ https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15643925#comment-15643925 ] Xiaogang Shi commented on FLINK-5023: - The only old method affected is the `value()` method in ValueState. All other states have already implemented the `get()` method. We can implement `ValueState#value()` by wrapping the `get()` method to avoid any changes to existing code. The introduction of ReadableState works. But I think the additional interfaces will make the code "verbose" :) > Add get() method in State interface > --- > > Key: FLINK-5023 > URL: https://issues.apache.org/jira/browse/FLINK-5023 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Xiaogang Shi > > Currently, the only method provided by the State interface is `clear()`. I > think we should provide another method called `get()` to return the > structured value (e.g., value, list, or map) under the current key. > In fact, the functionality of `get()` has already been implemented in all > types of states: e.g., `value()` in ValueState and `get()` in ListState. The > modification to the interface can better abstract these states. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (FLINK-4856) Add MapState for keyed streams
[ https://issues.apache.org/jira/browse/FLINK-4856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15642868#comment-15642868 ] Xiaogang Shi edited comment on FLINK-4856 at 11/7/16 2:29 AM: -- I have started the implementation of MapStates. But prior to that, I think we need some modifications to the current implementation to clarify the concepts. I have opened two JIRA issues to describe these problems. You may see FLINK-5023 and FLINK-5024 for the details. was (Author: xiaogang.shi): I have started the implementation of MapStates. But at prior to that, I think we need some modification to currently framework to clarify the concepts. I have started two JIRA to state these problems. You may see FLINK-5023 and FLINK-5024 for the details. > Add MapState for keyed streams > -- > > Key: FLINK-4856 > URL: https://issues.apache.org/jira/browse/FLINK-4856 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Xiaogang Shi >Assignee: Xiaogang Shi > > Many states in keyed streams are organized as key-value pairs. Currently, > these states are implemented by storing the entire map into a ValueState or a > ListState. The implementation however is very costly because all entries have > to be serialized/deserialized when updating a single entry. To improve the > efficiency of these states, MapStates are urgently needed. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4856) Add MapState for keyed streams
[ https://issues.apache.org/jira/browse/FLINK-4856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15642868#comment-15642868 ] Xiaogang Shi commented on FLINK-4856: - I have started the implementation of MapStates. But prior to that, I think we need some modifications to the current framework to clarify the concepts. I have opened two JIRA issues to describe these problems. You may see FLINK-5023 and FLINK-5024 for the details. > Add MapState for keyed streams > -- > > Key: FLINK-4856 > URL: https://issues.apache.org/jira/browse/FLINK-4856 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Xiaogang Shi >Assignee: Xiaogang Shi > > Many states in keyed streams are organized as key-value pairs. Currently, > these states are implemented by storing the entire map into a ValueState or a > ListState. The implementation however is very costly because all entries have > to be serialized/deserialized when updating a single entry. To improve the > efficiency of these states, MapStates are urgently needed. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
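The per-entry access that motivates MapState can be sketched as follows. The interface shape and the {{CountingMapState}} helper are hypothetical, meant only to contrast per-entry updates with re-serializing a whole map stored in a ValueState:

```java
import java.util.HashMap;
import java.util.Map;

public class MapStateSketch {

    // Hypothetical MapState: entries are read and written individually, so
    // updating one key never touches the other entries.
    interface MapState<K, V> {
        V get(K key);
        void put(K key, V value);
        int size();
    }

    // In-memory stand-in that counts per-entry writes. A ValueState<Map<K, V>>
    // would instead deserialize and re-serialize the entire map on every
    // single update, which is the cost the issue wants to avoid.
    static class CountingMapState<K, V> implements MapState<K, V> {
        private final Map<K, V> entries = new HashMap<>();
        int entryWrites = 0;

        public V get(K key) { return entries.get(key); }

        public void put(K key, V value) {
            entries.put(key, value);
            entryWrites++; // one entry touched per update, independent of size()
        }

        public int size() { return entries.size(); }
    }
}
```

In a real backend, each entry would map to its own serialized key/value pair (e.g. one RocksDB entry), so an update costs one entry's serialization rather than the whole map's.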
[jira] [Created] (FLINK-5024) Add SimpleStateDescriptor to clarify the concepts
Xiaogang Shi created FLINK-5024: --- Summary: Add SimpleStateDescriptor to clarify the concepts Key: FLINK-5024 URL: https://issues.apache.org/jira/browse/FLINK-5024 Project: Flink Issue Type: Improvement Reporter: Xiaogang Shi Currently, StateDescriptors accept two type arguments : the first one is the type of the created state and the second one is the type of the values in the states. The concepts however is a little confusing here because in ListStates, the arguments passed to the StateDescriptors are the types of the list elements instead of the lists. It also makes the implementation of MapStates difficult. I suggest not to put the type serializer in StateDescriptors, making StateDescriptors independent of the data structures of the values. A new type of StateDescriptor named SimpleStateDescriptor can be provided to abstract those states (namely ValueState, ReducingState and FoldingState) whose states are not composited. The states (e.g. ListStates and MapStates) can implement their own descriptors according to their data structures. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5023) Add get() method in State interface
Xiaogang Shi created FLINK-5023: --- Summary: Add get() method in State interface Key: FLINK-5023 URL: https://issues.apache.org/jira/browse/FLINK-5023 Project: Flink Issue Type: Improvement Components: State Backends, Checkpointing Reporter: Xiaogang Shi Currently, the only method provided by the State interface is `clear()`. I think we should provide another method called `get()` to return the structured value (e.g., value, list, or map) under the current key. In fact, the functionality of `get()` has already been implemented in all types of states: e.g., `value()` in ValueState and `get()` in ListState. The modification to the interface can better abstract these states. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-4856) Add MapState for keyed streams
[ https://issues.apache.org/jira/browse/FLINK-4856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiaogang Shi reassigned FLINK-4856: --- Assignee: Xiaogang Shi > Add MapState for keyed streams > -- > > Key: FLINK-4856 > URL: https://issues.apache.org/jira/browse/FLINK-4856 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Xiaogang Shi >Assignee: Xiaogang Shi > > Many states in keyed streams are organized as key-value pairs. Currently, > these states are implemented by storing the entire map into a ValueState or a > ListState. The implementation however is very costly because all entries have > to be serialized/deserialized when updating a single entry. To improve the > efficiency of these states, MapStates are urgently needed. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4856) Add MapState for keyed streams
Xiaogang Shi created FLINK-4856: --- Summary: Add MapState for keyed streams Key: FLINK-4856 URL: https://issues.apache.org/jira/browse/FLINK-4856 Project: Flink Issue Type: New Feature Components: State Backends, Checkpointing Reporter: Xiaogang Shi Many states in keyed streams are organized as key-value pairs. Currently, these states are implemented by storing the entire map into a ValueState or a ListState. The implementation however is very costly because all entries have to be serialized/deserialized when updating a single entry. To improve the efficiency of these states, MapStates are urgently needed. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-4448) Use Listeners to monitor execution status
[ https://issues.apache.org/jira/browse/FLINK-4448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiaogang Shi closed FLINK-4448. --- Resolution: Fixed The functionality will be implemented in FLINK-4457 https://issues.apache.org/jira/browse/FLINK-4457 > Use Listeners to monitor execution status > - > > Key: FLINK-4448 > URL: https://issues.apache.org/jira/browse/FLINK-4448 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Xiaogang Shi >Assignee: Xiaogang Shi > > Currently, JobMaster monitors the ExecutionGraph's job status and execution > state through Akka. Since the dependencies on Akka should be removed in the > refactoring, JobMaster will utilize JobStatusListener and > ExecutionStateListener to receive the notifications from ExecutionGraph. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4408) Submit Job and setup ExecutionGraph
[ https://issues.apache.org/jira/browse/FLINK-4408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15434052#comment-15434052 ] Xiaogang Shi commented on FLINK-4408: - I think it's okay to pull the leadership logic out of the JobManager and let the component holding the JobManager take care of it. But I think the JobManager still needs to worry about the switches. The JobManager still needs the function to submit/execute the job when it is started by the component holding it (to be specific, the `submitJob` function in the old JobManager). The only difference made by pulling the logic out is that the function will be called by the component holding the JobManager, not by the JobManager itself. If I understand your comment correctly, I think we should revise the implementation in FLINK-4400 to let the JobManager not care about leadership. But the JobManager still needs to implement the methods to start and cancel the execution, which are implemented in this JIRA. > Submit Job and setup ExecutionGraph > --- > > Key: FLINK-4408 > URL: https://issues.apache.org/jira/browse/FLINK-4408 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Xiaogang Shi >Assignee: Xiaogang Shi > > Once granted the leadership, JM will start to execute the job. > Most code remains the same except that > (1) In old implementation where JM manages the execution of multiple jobs, JM > has to load all submitted JobGraphs from SubmittedJobGraphStore and recover > them. Now that the components creating JM will be responsible for the > recovery of JobGraphs, JM will be created with submitted/recovered JobGraph, > without the need to load the JobGraph. > (2) JM should not rely on Akka to listen on the updates of JobStatus and > Execution. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4448) Use Listeners to monitor execution status
Xiaogang Shi created FLINK-4448:
-----------------------------------

             Summary: Use Listeners to monitor execution status
                 Key: FLINK-4448
                 URL: https://issues.apache.org/jira/browse/FLINK-4448
             Project: Flink
          Issue Type: Sub-task
          Components: Cluster Management
            Reporter: Xiaogang Shi
            Assignee: Xiaogang Shi

Currently, JobMaster monitors the ExecutionGraph's job status and execution state through Akka. Since the dependencies on Akka should be removed in the refactoring, JobMaster will utilize JobStatusListener and ExecutionStateListener to receive the notifications from ExecutionGraph.
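The listener-based monitoring described above can be sketched with simplified stand-ins (the class and method names below are hypothetical simplifications, not Flink's actual API): instead of routing status updates through Akka messages, the ExecutionGraph keeps a list of registered listeners and calls them back directly on every job status transition.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for Flink's JobStatus enum.
enum JobStatus { CREATED, RUNNING, FINISHED, FAILED }

// Listener interface in the spirit of JobStatusListener: the
// ExecutionGraph invokes this callback directly, no actor message needed.
interface JobStatusListener {
    void jobStatusChanges(String jobId, JobStatus newStatus, long timestamp);
}

// Minimal ExecutionGraph stand-in that notifies registered listeners
// synchronously on every status transition.
class ExecutionGraphStub {
    private final String jobId;
    private final List<JobStatusListener> listeners = new ArrayList<>();
    private JobStatus status = JobStatus.CREATED;

    ExecutionGraphStub(String jobId) { this.jobId = jobId; }

    void registerJobStatusListener(JobStatusListener listener) {
        listeners.add(listener);
    }

    void transitionTo(JobStatus newStatus) {
        this.status = newStatus;
        long now = System.currentTimeMillis();
        for (JobStatusListener l : listeners) {
            l.jobStatusChanges(jobId, newStatus, now);
        }
    }

    JobStatus getStatus() { return status; }
}

public class ListenerDemo {
    public static void main(String[] args) {
        ExecutionGraphStub graph = new ExecutionGraphStub("job-1");
        List<JobStatus> seen = new ArrayList<>();
        // A JobMaster would register itself; here a lambda records transitions.
        graph.registerJobStatusListener((id, st, ts) -> seen.add(st));
        graph.transitionTo(JobStatus.RUNNING);
        graph.transitionTo(JobStatus.FINISHED);
        System.out.println(seen);  // prints [RUNNING, FINISHED]
    }
}
```

An ExecutionStateListener for per-task state changes would follow the same pattern, just with a task/execution identifier in the callback.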
[jira] [Updated] (FLINK-4408) Submit Job and setup ExecutionGraph
[ https://issues.apache.org/jira/browse/FLINK-4408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xiaogang Shi updated FLINK-4408:
--------------------------------
    Summary: Submit Job and setup ExecutionGraph  (was: JobSubmission)

> Submit Job and setup ExecutionGraph
> -----------------------------------
>
>                 Key: FLINK-4408
>                 URL: https://issues.apache.org/jira/browse/FLINK-4408
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Cluster Management
>            Reporter: Xiaogang Shi
>            Assignee: Xiaogang Shi
>
> Once granted the leadership, JM will start to execute the job. Most code remains the same except that:
> (1) In the old implementation, where JM manages the execution of multiple jobs, JM has to load all submitted JobGraphs from SubmittedJobGraphStore and recover them. Now that the components creating JM will be responsible for the recovery of JobGraphs, JM will be created with the submitted/recovered JobGraph, without the need to load the JobGraph.
> (2) JM should not rely on Akka to listen on the updates of JobStatus and Execution.
[jira] [Created] (FLINK-4408) JobSubmission
Xiaogang Shi created FLINK-4408:
-----------------------------------

             Summary: JobSubmission
                 Key: FLINK-4408
                 URL: https://issues.apache.org/jira/browse/FLINK-4408
             Project: Flink
          Issue Type: Sub-task
          Components: Cluster Management
            Reporter: Xiaogang Shi
            Assignee: Xiaogang Shi

Once granted the leadership, JM will start to execute the job. Most code remains the same except that:

(1) In the old implementation, where JM manages the execution of multiple jobs, JM has to load all submitted JobGraphs from SubmittedJobGraphStore and recover them. Now that the components creating JM will be responsible for the recovery of JobGraphs, JM will be created with the submitted/recovered JobGraph, without the need to load the JobGraph.

(2) JM should not rely on Akka to listen on the updates of JobStatus and Execution.
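The single-job design in point (1) can be sketched as follows (all names here are hypothetical stand-ins, not Flink's real classes): the JobManager is constructed with the one submitted/recovered JobGraph, so it never consults a SubmittedJobGraphStore itself, and the component holding it decides when to start or cancel execution.

```java
// Hypothetical stand-in for a JobGraph handed over by the holding component.
class JobGraphStub {
    final String jobName;
    JobGraphStub(String jobName) { this.jobName = jobName; }
}

// Single-job JobManager sketch: created directly with its JobGraph,
// with no store lookup or recovery logic of its own. Start/cancel are
// invoked by the component holding the JobManager, not by the JM itself.
class SingleJobManager {
    private final JobGraphStub jobGraph;
    private boolean running = false;

    SingleJobManager(JobGraphStub jobGraph) { this.jobGraph = jobGraph; }

    void startExecution() { running = true; }
    void cancelExecution() { running = false; }
    boolean isRunning() { return running; }
    String jobName() { return jobGraph.jobName; }
}

public class SubmissionDemo {
    public static void main(String[] args) {
        SingleJobManager jm = new SingleJobManager(new JobGraphStub("wordcount"));
        // The holding component triggers execution, e.g. after a leadership grant.
        jm.startExecution();
        System.out.println(jm.isRunning());  // prints true
        jm.cancelExecution();
        System.out.println(jm.isRunning());  // prints false
    }
}
```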
[jira] [Created] (FLINK-4400) Leadership Election among JobManagers
Xiaogang Shi created FLINK-4400:
-----------------------------------

             Summary: Leadership Election among JobManagers
                 Key: FLINK-4400
                 URL: https://issues.apache.org/jira/browse/FLINK-4400
             Project: Flink
          Issue Type: Sub-task
          Components: Cluster Management
            Reporter: Xiaogang Shi
            Assignee: Xiaogang Shi

* All JobMasters are LeaderContenders.
* Once a JobMaster is initialized, the very first thing it has to do is to start the leadership election service and contend for the leadership.
* A JobMaster starts to perform its functionality when it is granted the leadership.
* If a JobMaster's leadership is revoked, it will cancel all ongoing execution and release all acquired resources.
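The contender lifecycle above can be sketched with a minimal stand-in (the interface below is a simplified hypothetical version of the LeaderContender idea, not Flink's exact API): the election service invokes grant/revoke callbacks, and the JobMaster only performs work between a grant and the matching revocation.

```java
import java.util.UUID;

// Simplified contender interface: the leader election service calls
// these back when leadership changes hands.
interface LeaderContender {
    void grantLeadership(UUID leaderSessionId);
    void revokeLeadership();
}

// Minimal JobMaster stand-in following the bullet points above.
class JobMasterStub implements LeaderContender {
    private boolean executing = false;

    @Override
    public void grantLeadership(UUID leaderSessionId) {
        executing = true;   // start performing its functionality
    }

    @Override
    public void revokeLeadership() {
        executing = false;  // cancel ongoing execution, release resources
    }

    boolean isExecuting() { return executing; }
}

public class LeaderDemo {
    public static void main(String[] args) {
        JobMasterStub jm = new JobMasterStub();
        // The election service would drive these callbacks; we simulate it here.
        jm.grantLeadership(UUID.randomUUID());
        System.out.println(jm.isExecuting());  // prints true
        jm.revokeLeadership();
        System.out.println(jm.isExecuting());  // prints false
    }
}
```

In a real deployment the election service (e.g. one backed by ZooKeeper) would decide which of the contending JobMasters receives the grant.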