[jira] [Created] (FLINK-12285) Memory leak in SavepointITCase and SavepointMigrationTestBase
Xiaogang Shi created FLINK-12285:

Summary: Memory leak in SavepointITCase and SavepointMigrationTestBase
Key: FLINK-12285
URL: https://issues.apache.org/jira/browse/FLINK-12285
Project: Flink
Issue Type: Bug
Components: Tests
Reporter: Xiaogang Shi
Assignee: Biao Liu

The tests in {{SavepointITCase}} and {{SavepointMigrationTestBase}} do not cancel running jobs before exiting. This causes exceptions in the {{TaskExecutor}}s and leaves memory segments unreleased, so succeeding tests may fail due to an insufficient amount of memory. The problem is caused by shutting down {{TaskExecutor}}s that still have running tasks. Another issue caused by the same problem can be seen in FLINK-11343. Maybe we can find a more dedicated method to shut down those {{TaskExecutor}}s that still have running tasks.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12284) InputBufferPoolUsage is incorrect in credit-based network control flow
Xiaogang Shi created FLINK-12284:

Summary: InputBufferPoolUsage is incorrect in credit-based network control flow
Key: FLINK-12284
URL: https://issues.apache.org/jira/browse/FLINK-12284
Project: Flink
Issue Type: Bug
Components: Runtime / Network
Affects Versions: 1.8.0, 1.7.2, 1.6.4, 1.6.3
Reporter: Xiaogang Shi

When using credit-based flow control, exclusive buffers are assigned directly to the {{RemoteInputChannel}}s and are not counted in the {{LocalBufferPool}}, leading to an incorrect InputBufferPoolUsage metric.
[jira] [Created] (FLINK-6364) Implement incremental checkpointing in RocksDBStateBackend
Xiaogang Shi created FLINK-6364:
---
Summary: Implement incremental checkpointing in RocksDBStateBackend
Key: FLINK-6364
URL: https://issues.apache.org/jira/browse/FLINK-6364
Project: Flink
Issue Type: Sub-task
Reporter: Xiaogang Shi
Assignee: Xiaogang Shi

{{RocksDBStateBackend}} is well suited for incremental checkpointing because RocksDB is based on LSM trees, which record updates in new SST files, and all SST files are immutable. By materializing only the new SST files, we can significantly improve the performance of checkpointing.
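The core of the idea is a file-level diff against the previous checkpoint. A minimal sketch, assuming immutable SST files (class and method names here are illustrative, not Flink's actual API):

```java
import java.util.*;

// Hypothetical sketch: because SST files are immutable, an incremental
// checkpoint only needs to upload the files that were not already part of
// the previous checkpoint's file set.
public class IncrementalSketch {
    static Set<String> filesToUpload(Set<String> current, Set<String> previous) {
        Set<String> diff = new TreeSet<>(current);
        diff.removeAll(previous); // files already materialized can be reused
        return diff;
    }

    public static void main(String[] args) {
        Set<String> prev = new HashSet<>(Arrays.asList("001.sst", "002.sst"));
        Set<String> curr = new HashSet<>(Arrays.asList("001.sst", "002.sst", "003.sst"));
        System.out.println(filesToUpload(curr, prev)); // only the new file
    }
}
```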
[jira] [Created] (FLINK-6284) Incorrect sorting of completed checkpoints in ZooKeeperCompletedCheckpointStore
Xiaogang Shi created FLINK-6284:
---
Summary: Incorrect sorting of completed checkpoints in ZooKeeperCompletedCheckpointStore
Key: FLINK-6284
URL: https://issues.apache.org/jira/browse/FLINK-6284
Project: Flink
Issue Type: Bug
Reporter: Xiaogang Shi

Now all completed checkpoints are sorted by their paths when they are recovered in {{ZooKeeperCompletedCheckpointStore}}. In cases where the latest checkpoint's id is not the largest in lexical order (e.g., "100" is smaller than "99" in lexical order), Flink will not recover from the latest completed checkpoint. The problem can easily be observed by setting the checkpoint ids in {{ZooKeeperCompletedCheckpointStoreITCase#testRecover()}} to 99, 100 and 101. To fix the problem, we should explicitly sort the found checkpoints by their checkpoint ids instead of using {{ZooKeeperStateHandleStore#getAllSortedByName()}}.
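The bug and the fix can be demonstrated with the exact ids mentioned above; this is a standalone sketch, not the store's actual code:

```java
import java.util.*;

// Sketch of the bug: sorting checkpoint paths lexically puts "99" after
// "100"; sorting by the numeric checkpoint id finds the true latest one.
public class CheckpointSortSketch {
    static String latestByName(List<String> ids) {
        return Collections.max(ids); // lexical order: wrong
    }

    static String latestById(List<String> ids) {
        return Collections.max(ids, Comparator.comparingLong(Long::parseLong));
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("99", "100", "101");
        System.out.println(latestByName(ids)); // "99": lexically largest
        System.out.println(latestById(ids));   // "101": the actual latest
    }
}
```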
[jira] [Created] (FLINK-6096) Refactor the migration of old versioned savepoints
Xiaogang Shi created FLINK-6096:
---
Summary: Refactor the migration of old versioned savepoints
Key: FLINK-6096
URL: https://issues.apache.org/jira/browse/FLINK-6096
Project: Flink
Issue Type: Improvement
Reporter: Xiaogang Shi

The existing code for migrating old-versioned savepoints cannot correctly deserialize classes that changed between versions. I think we should create a migration package for each old savepoint version and put all the classes migrated from that savepoint there. A mapping can be maintained to record the migrated classes in each savepoint so that we can deserialize them correctly.
[jira] [Created] (FLINK-6034) Add KeyedStateHandle for the snapshots in keyed streams
Xiaogang Shi created FLINK-6034:
---
Summary: Add KeyedStateHandle for the snapshots in keyed streams
Key: FLINK-6034
URL: https://issues.apache.org/jira/browse/FLINK-6034
Project: Flink
Issue Type: Sub-task
Reporter: Xiaogang Shi
Assignee: Xiaogang Shi

Currently, the only type of snapshot in keyed streams is {{KeyGroupsStateHandle}}, which is a full snapshot and stores the states one key group after another. With the introduction of incremental checkpointing, we need a higher-level abstraction of keyed snapshots to allow flexible snapshot formats. The implementation of {{KeyedStateHandle}}s may vary a lot across backends. The only information needed from a {{KeyedStateHandle}} is its key group range. When recovering a job with a different degree of parallelism, {{KeyedStateHandle}}s will be assigned to those subtasks whose key group ranges overlap with their own.
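The assignment rule in the last sentence reduces to an interval-overlap check. A minimal sketch under the assumption of inclusive key-group ranges (not Flink's actual KeyGroupRange class):

```java
// Hypothetical sketch of the key-group range idea: when parallelism
// changes, a state handle is assigned to every subtask whose key group
// range overlaps the handle's own range.
public class KeyGroupRangeSketch {
    static boolean overlaps(int start1, int end1, int start2, int end2) {
        return start1 <= end2 && start2 <= end1; // inclusive ranges
    }

    public static void main(String[] args) {
        // handle covering key groups [0, 63]; new subtasks own [0, 31] and [32, 63]
        System.out.println(overlaps(0, 63, 0, 31));  // true: assign to subtask 0
        System.out.println(overlaps(0, 63, 32, 63)); // true: assign to subtask 1
        System.out.println(overlaps(0, 31, 32, 63)); // false: no key groups shared
    }
}
```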
[jira] [Created] (FLINK-6027) Ignore the exception thrown by the subsuming of old completed checkpoints
Xiaogang Shi created FLINK-6027:
---
Summary: Ignore the exception thrown by the subsuming of old completed checkpoints
Key: FLINK-6027
URL: https://issues.apache.org/jira/browse/FLINK-6027
Project: Flink
Issue Type: Bug
Components: State Backends, Checkpointing
Reporter: Xiaogang Shi
Assignee: Xiaogang Shi

When a checkpoint is added to the {{CompletedCheckpointStore}} via {{addCheckpoint()}}, the oldest checkpoints are removed from the store if the number of stored checkpoints exceeds the given limit. Subsuming the old checkpoints may fail and make {{addCheckpoint()}} throw an exception, which is caught by {{CheckpointCoordinator}}. Finally, the states of the new checkpoint will be deleted by {{CheckpointCoordinator}}. Because the new checkpoint is still in the store, we may recover the job from it, but the recovery will fail because all the states of the checkpoint have been deleted. We should ignore the exceptions thrown while subsuming old checkpoints because we can always recover from the new checkpoint once it is successfully added to the store. Ignoring these exceptions may leave some dirty data behind, but that is acceptable because it can be cleaned up by the cleanup hook to be introduced in the near future.
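The proposed fix amounts to catching the subsume failure inside the add path. A standalone sketch with illustrative names (not Flink's actual store implementation):

```java
import java.util.*;

// Sketch of the proposed fix: a failure while subsuming old checkpoints
// must not fail addCheckpoint(), since the new checkpoint is already
// safely in the store and recovery can use it.
public class CheckpointStoreSketch {
    private final Deque<Long> checkpoints = new ArrayDeque<>();
    private final int maxRetained;

    CheckpointStoreSketch(int maxRetained) { this.maxRetained = maxRetained; }

    void addCheckpoint(long id) {
        checkpoints.addLast(id); // keep the new checkpoint first
        while (checkpoints.size() > maxRetained) {
            long oldest = checkpoints.removeFirst();
            try {
                discard(oldest); // may touch external storage and fail
            } catch (Exception e) {
                // ignored: recovery can always use the newly added checkpoint
            }
        }
    }

    /** Simulates a subsume failure against external storage. */
    void discard(long id) throws Exception { throw new Exception("storage error"); }

    long latest() { return checkpoints.peekLast(); }

    public static void main(String[] args) {
        CheckpointStoreSketch store = new CheckpointStoreSketch(1);
        store.addCheckpoint(1);
        store.addCheckpoint(2); // subsuming checkpoint 1 fails, but is ignored
        System.out.println(store.latest()); // 2
    }
}
```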
[jira] [Created] (FLINK-6014) Allow the registration of state objects in checkpoints
Xiaogang Shi created FLINK-6014:
---
Summary: Allow the registration of state objects in checkpoints
Key: FLINK-6014
URL: https://issues.apache.org/jira/browse/FLINK-6014
Project: Flink
Issue Type: Sub-task
Components: State Backends, Checkpointing
Reporter: Xiaogang Shi
Assignee: Xiaogang Shi

This issue is the very first step towards incremental checkpointing. We introduce a new state handle named {{CompositeStateHandle}} as the base of the snapshots taken by task components. Known implementations include {{KeyedStateHandle}} (for {{KeyedStateBackend}}s), {{SubtaskState}} (for subtasks, i.e., splits of a {{JobVertex}}) and {{TaskState}} (for {{JobVertex}}s). Each {{CompositeStateHandle}} is composed of a collection of {{StateObject}}s. It should register all its state objects in the {{StateRegistry}} when its checkpoint is added to the {{CompletedCheckpointStore}} (i.e., when a pending checkpoint completes or a completed checkpoint is reloaded during recovery). When a completed checkpoint is moved out of the {{CompletedCheckpointStore}}, we should not simply discard all state objects in the checkpoint. With the introduction of incremental checkpointing, a {{StateObject}} may be referenced by multiple checkpoints. We should first unregister all the checkpoint's state objects from the {{StateRegistry}}; only those state objects no longer referenced by any checkpoint can be deleted.
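The register/unregister protocol described above is essentially reference counting. A minimal sketch, with state objects modeled as plain strings (the real registry would track handles, not names):

```java
import java.util.*;

// Hypothetical sketch of the registration idea: shared state objects are
// reference-counted; a state object may only be deleted once no retained
// checkpoint references it any more.
public class StateRegistrySketch {
    private final Map<String, Integer> refCounts = new HashMap<>();

    void register(String stateObject) {
        refCounts.merge(stateObject, 1, Integer::sum);
    }

    /** Returns true if the object is now unreferenced and may be deleted. */
    boolean unregister(String stateObject) {
        int remaining = refCounts.merge(stateObject, -1, Integer::sum);
        if (remaining <= 0) {
            refCounts.remove(stateObject);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        StateRegistrySketch registry = new StateRegistrySketch();
        registry.register("001.sst"); // referenced by checkpoint 1
        registry.register("001.sst"); // shared with incremental checkpoint 2
        System.out.println(registry.unregister("001.sst")); // false: still shared
        System.out.println(registry.unregister("001.sst")); // true: safe to delete
    }
}
```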
[jira] [Created] (FLINK-5865) Throw original exception in RocksDB states
Xiaogang Shi created FLINK-5865:
---
Summary: Throw original exception in RocksDB states
Key: FLINK-5865
URL: https://issues.apache.org/jira/browse/FLINK-5865
Project: Flink
Issue Type: Improvement
Components: State Backends, Checkpointing
Affects Versions: 1.3.0
Reporter: Xiaogang Shi
Assignee: Xiaogang Shi

Now all exceptions thrown in RocksDB states are wrapped in {{RuntimeException}}s. This is unnecessary and prints useless stack traces in the log. I think it's better to throw the original exception, without any wrapping.
[jira] [Created] (FLINK-5863) Unify the serialization of queryable list states in different backends
Xiaogang Shi created FLINK-5863:
---
Summary: Unify the serialization of queryable list states in different backends
Key: FLINK-5863
URL: https://issues.apache.org/jira/browse/FLINK-5863
Project: Flink
Issue Type: Improvement
Components: Queryable State
Affects Versions: 1.3.0
Reporter: Xiaogang Shi
Priority: Minor

Now the deserialization of list states is implemented in {{KvStateRequestSerializer}}, but the serialization is implemented individually in each backend. We should provide a serialization method in {{KvStateRequestSerializer}} to remove the redundant code.
[jira] [Created] (FLINK-5790) Use list types when ListStateDescriptor extends StateDescriptor
Xiaogang Shi created FLINK-5790:
---
Summary: Use list types when ListStateDescriptor extends StateDescriptor
Key: FLINK-5790
URL: https://issues.apache.org/jira/browse/FLINK-5790
Project: Flink
Issue Type: Improvement
Reporter: Xiaogang Shi
Assignee: Xiaogang Shi

Flink keeps the state serializer in {{StateDescriptor}}, but in {{ListStateDescriptor}} it is the serializer of the list elements that is stored. The implementation is a little confusing, and some backends need to construct the state serializer from the element serializer themselves. We should put an {{ArrayListSerializer}}, composed of the element serializer, into the {{ListStateDescriptor}}. This saves backends from constructing the state serializer. If a backend needs customized serialization of the state (e.g., {{RocksDBStateBackend}}), it can still obtain the element serializer from the {{ArrayListSerializer}}.
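The composition idea can be sketched with a toy serializer interface; this is illustrative only and does not mirror Flink's real {{TypeSerializer}} API:

```java
import java.util.*;

// Illustrative sketch: the list serializer is built from the element
// serializer, and a backend that needs element-wise serialization can
// still get the element serializer back from it.
public class ArrayListSerializerSketch<T> {
    interface Ser<E> { String serialize(E value); }

    private final Ser<T> elementSerializer;

    ArrayListSerializerSketch(Ser<T> elementSerializer) {
        this.elementSerializer = elementSerializer;
    }

    String serialize(List<T> list) {
        StringJoiner out = new StringJoiner(",");
        for (T element : list) out.add(elementSerializer.serialize(element));
        return out.toString();
    }

    Ser<T> getElementSerializer() { return elementSerializer; }

    public static void main(String[] args) {
        ArrayListSerializerSketch<Integer> ser =
            new ArrayListSerializerSketch<>(String::valueOf);
        System.out.println(ser.serialize(Arrays.asList(1, 2, 3)));    // 1,2,3
        System.out.println(ser.getElementSerializer().serialize(42)); // 42
    }
}
```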
[jira] [Created] (FLINK-5738) Destroy created backend when task is canceled
Xiaogang Shi created FLINK-5738:
---
Summary: Destroy created backend when task is canceled
Key: FLINK-5738
URL: https://issues.apache.org/jira/browse/FLINK-5738
Project: Flink
Issue Type: Bug
Components: State Backends, Checkpointing
Affects Versions: 1.2.0
Reporter: Xiaogang Shi

When a task is canceled, the {{ClosableRegistry}} is closed by the cancel thread. However, the task may still be creating the {{KeyedStateBackend}}, and registering the backend with the {{ClosableRegistry}} will then fail. Because the backend has not been assigned to the operator yet (due to the exception), it will not be destroyed when the task thread exits. A simple solution is to catch the exception thrown during registration and destroy the created backend in case of failure.
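The suggested solution can be sketched as follows; the class names here are placeholders, not Flink's actual task or backend classes:

```java
// Sketch of the proposed fix: if registering the newly created backend with
// the already-closed registry fails, dispose the backend right away instead
// of leaking it.
public class BackendCreationSketch {
    static class Backend {
        boolean disposed = false;
        void dispose() { disposed = true; }
    }

    static class ClosedRegistryException extends Exception {}

    static Backend lastCreated; // only for observing the sketch from outside

    /** Simulates a registry that the cancel thread has already closed. */
    static void register(Backend backend) throws ClosedRegistryException {
        throw new ClosedRegistryException();
    }

    static Backend createBackend() throws ClosedRegistryException {
        Backend backend = new Backend();
        lastCreated = backend;
        try {
            register(backend);
        } catch (ClosedRegistryException e) {
            backend.dispose(); // nothing else references the backend yet
            throw e;
        }
        return backend;
    }

    public static void main(String[] args) {
        try {
            createBackend();
        } catch (ClosedRegistryException expected) {
            System.out.println(lastCreated.disposed); // true: no leaked backend
        }
    }
}
```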
[jira] [Created] (FLINK-5544) Implement Internal Timer Service in RocksDB
Xiaogang Shi created FLINK-5544:
---
Summary: Implement Internal Timer Service in RocksDB
Key: FLINK-5544
URL: https://issues.apache.org/jira/browse/FLINK-5544
Project: Flink
Issue Type: Bug
Components: Streaming
Reporter: Xiaogang Shi

Now the only implementation of the internal timer service is {{HeapInternalTimerService}}, which stores all timers in memory. In cases where the number of keys is very large, the timer service costs too much memory. An implementation that stores timers in RocksDB seems well suited to these cases.

It might be a little challenging to implement a RocksDB timer service because the timers are accessed in different ways: when timers are triggered, we need to access them in timestamp order, but when performing checkpoints, we need a method to obtain all timers of a given key group.

A good implementation, as suggested by [~StephanEwen], follows the idea of merge sorting. We can store timers in RocksDB with the key format {{KEY_GROUP#TIMER#KEY}}. In this way, the timers of a key group are stored together and sorted. We can then maintain an in-memory heap that keeps the first timer of each key group to obtain the next timer to trigger. When a key group's first timer is updated, we can efficiently update the heap.
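The merge-sort idea can be sketched with per-key-group sorted sets standing in for the sorted RocksDB key space (a simplification: real timers also carry keys and namespaces):

```java
import java.util.*;

// Sketch of the suggested layout: per key group, timers are kept sorted by
// timestamp (as RocksDB would keep KEY_GROUP#TIMER#KEY entries sorted); a
// small heap over each group's head timer yields the next timer to fire.
public class TimerMergeSketch {
    static long nextTimerToFire(Map<Integer, TreeSet<Long>> timersPerGroup) {
        PriorityQueue<Long> heads = new PriorityQueue<>();
        for (TreeSet<Long> groupTimers : timersPerGroup.values()) {
            if (!groupTimers.isEmpty()) {
                heads.add(groupTimers.first()); // one head timer per key group
            }
        }
        return heads.peek();
    }

    public static void main(String[] args) {
        Map<Integer, TreeSet<Long>> timers = new HashMap<>();
        timers.put(0, new TreeSet<>(Arrays.asList(50L, 90L)));
        timers.put(1, new TreeSet<>(Arrays.asList(30L, 70L)));
        System.out.println(nextTimerToFire(timers)); // 30: smallest head across groups
    }
}
```

Checkpointing a single key group then reduces to iterating one contiguous, already-sorted range of the key space.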
[jira] [Created] (FLINK-5400) Add accessor to folding states in RuntimeContext
Xiaogang Shi created FLINK-5400:
---
Summary: Add accessor to folding states in RuntimeContext
Key: FLINK-5400
URL: https://issues.apache.org/jira/browse/FLINK-5400
Project: Flink
Issue Type: Bug
Components: State Backends, Checkpointing
Reporter: Xiaogang Shi

Now {{RuntimeContext}} does not provide accessors to folding states, so users cannot use folding states in their rich functions. I think we should provide the missing accessor.
[jira] [Created] (FLINK-5398) Exclude generated files in module flink-batch-connectors in license checking
Xiaogang Shi created FLINK-5398:
---
Summary: Exclude generated files in module flink-batch-connectors in license checking
Key: FLINK-5398
URL: https://issues.apache.org/jira/browse/FLINK-5398
Project: Flink
Issue Type: Bug
Reporter: Xiaogang Shi

Now the master branch fails to execute {{mvn install}} due to unlicensed files in the flink-batch-connectors module. We should exclude these generated files from license checking in the pom file.

Unapproved licenses:
flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/Address.java
flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/Colors.java
flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/Fixed16.java
flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/User.java
[jira] [Created] (FLINK-5397) Fail to deserialize savepoints in v1.1 when there exist missing fields in class serialization descriptors
Xiaogang Shi created FLINK-5397:
---
Summary: Fail to deserialize savepoints in v1.1 when there exist missing fields in class serialization descriptors
Key: FLINK-5397
URL: https://issues.apache.org/jira/browse/FLINK-5397
Project: Flink
Issue Type: Bug
Components: State Backends, Checkpointing
Reporter: Xiaogang Shi

To restore savepoints from previous versions, Flink now keeps all classes whose serialization has changed in a separate package ("migration"). When deserializing old savepoints, Flink looks up the correct descriptors ({{ObjectStreamClass}}) for these classes instead of using the ones written in the serialized data.

The implementation, however, is problematic when field descriptors are missing from the serialized data. When serializing an object, Java only writes the descriptors of the non-null fields, but when we look up class descriptors from the given classes, all fields are put into the descriptors. As a result, we deserialize the savepoints with incorrect descriptors, leading to serialization exceptions.

A simple resolution is to update the name of the read descriptors using reflection, instead of substituting different descriptors.
[jira] [Created] (FLINK-5086) Clean dead snapshot files produced by the tasks failing to acknowledge checkpoints
Xiaogang Shi created FLINK-5086:
---
Summary: Clean dead snapshot files produced by the tasks failing to acknowledge checkpoints
Key: FLINK-5086
URL: https://issues.apache.org/jira/browse/FLINK-5086
Project: Flink
Issue Type: Bug
Components: State Backends, Checkpointing
Reporter: Xiaogang Shi

A task may fail while performing a checkpoint. In that case, the task may already have copied some data to external storage, but since it fails to send the state handle to the {{CheckpointCoordinator}}, the copied data will never be deleted by the {{CheckpointCoordinator}}. I think we must find a method to clean up such dead snapshot data to avoid unbounded usage of external storage.

One possible method is to clean these dead files when the task recovers: upon recovery, the {{CheckpointCoordinator}} tells the task all the retained checkpoints, and the task can then scan the external storage and delete all snapshots not belonging to these retained checkpoints.
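The scan-and-delete step above can be sketched as a pure set computation; the file-to-checkpoint mapping is a stand-in for whatever the task can infer from the storage layout:

```java
import java.util.*;

// Hypothetical sketch of the cleanup idea: given the retained checkpoint ids
// reported by the coordinator, a recovering task deletes every snapshot file
// that does not belong to one of them.
public class DeadSnapshotSketch {
    static List<String> deadFiles(Map<String, Long> fileToCheckpoint, Set<Long> retained) {
        List<String> dead = new ArrayList<>();
        for (Map.Entry<String, Long> e : fileToCheckpoint.entrySet()) {
            if (!retained.contains(e.getValue())) {
                dead.add(e.getKey()); // orphaned: its checkpoint was never retained
            }
        }
        Collections.sort(dead);
        return dead;
    }

    public static void main(String[] args) {
        Map<String, Long> files = new HashMap<>();
        files.put("chk-7/part-0", 7L); // written by a checkpoint that failed
        files.put("chk-8/part-0", 8L); // belongs to a retained checkpoint
        System.out.println(deadFiles(files, new HashSet<>(Collections.singleton(8L))));
    }
}
```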
[jira] [Created] (FLINK-5036) Perform the grouping of keys in restoring instead of checkpointing
Xiaogang Shi created FLINK-5036:
---
Summary: Perform the grouping of keys in restoring instead of checkpointing
Key: FLINK-5036
URL: https://issues.apache.org/jira/browse/FLINK-5036
Project: Flink
Issue Type: Bug
Components: State Backends, Checkpointing
Reporter: Xiaogang Shi

Whenever a snapshot of {{RocksDBKeyedStateBackend}} is taken, the values in the states are written to different files according to their key groups. This procedure is very costly when the states are very big. Given that snapshot operations are performed much more frequently than restores, we can leave the key groups as they are to improve overall performance. In other words, we can perform the grouping of keys during restoring instead of during checkpointing.

I think the implementation will be very similar to the restoring of non-partitioned states: each task receives a collection of snapshots, each of which contains a set of key groups, and restores its states from the given snapshots by picking the values in its assigned key groups.
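The restore-side filtering can be sketched as follows; the key-group function is a simplified stand-in for Flink's actual key-group assignment:

```java
import java.util.*;

// Sketch of the proposal: snapshots are written without grouping, and on
// restore each subtask keeps only the entries whose key group it was assigned.
public class RestoreGroupingSketch {
    static int keyGroupOf(String key, int maxParallelism) {
        return Math.abs(key.hashCode() % maxParallelism); // simplified assignment
    }

    static List<String> restore(List<String> snapshotKeys,
                                Set<Integer> assignedGroups,
                                int maxParallelism) {
        List<String> restored = new ArrayList<>();
        for (String key : snapshotKeys) {
            if (assignedGroups.contains(keyGroupOf(key, maxParallelism))) {
                restored.add(key); // grouping work happens here, not at snapshot time
            }
        }
        return restored;
    }

    public static void main(String[] args) {
        List<String> snapshot = Arrays.asList("a", "b", "c", "d"); // ungrouped snapshot
        // a subtask owning key groups {1, 2} of maxParallelism 4 keeps only its keys
        System.out.println(restore(snapshot, new HashSet<>(Arrays.asList(1, 2)), 4));
    }
}
```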
[jira] [Created] (FLINK-5024) Add SimpleStateDescriptor to clarify the concepts
Xiaogang Shi created FLINK-5024:
---
Summary: Add SimpleStateDescriptor to clarify the concepts
Key: FLINK-5024
URL: https://issues.apache.org/jira/browse/FLINK-5024
Project: Flink
Issue Type: Improvement
Reporter: Xiaogang Shi

Currently, StateDescriptors accept two type arguments: the first is the type of the created state and the second is the type of the values in the state. The concepts are a little confusing here because in ListStates, the argument passed to the StateDescriptor is the type of the list elements instead of the list itself. It also makes the implementation of MapStates difficult.

I suggest not putting the type serializer in StateDescriptors, making StateDescriptors independent of the data structures of the values. A new type of StateDescriptor named SimpleStateDescriptor can be provided to abstract those states (namely ValueState, ReducingState and FoldingState) whose values are not composite. Other states (e.g., ListStates and MapStates) can implement their own descriptors according to their data structures.
[jira] [Created] (FLINK-5023) Add get() method in State interface
Xiaogang Shi created FLINK-5023:
---
Summary: Add get() method in State interface
Key: FLINK-5023
URL: https://issues.apache.org/jira/browse/FLINK-5023
Project: Flink
Issue Type: Improvement
Components: State Backends, Checkpointing
Reporter: Xiaogang Shi

Currently, the only method provided by the State interface is {{clear()}}. I think we should provide another method called {{get()}} to return the structured value (e.g., value, list, or map) under the current key. In fact, the functionality of {{get()}} has already been implemented in all types of states, e.g., {{value()}} in ValueState and {{get()}} in ListState. The modification to the interface allows a better abstraction of these states.
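The proposed abstraction can be sketched with toy state classes (illustrative only, not Flink's final API):

```java
import java.util.*;

// Sketch of the proposal: a common get() on the State interface that each
// concrete state specializes to its own value type.
public class StateGetSketch {
    interface State<T> {
        T get();       // the structured value under the current key
        void clear();
    }

    static class ValueState implements State<Integer> {
        private Integer value = 7;
        public Integer get() { return value; }
        public void clear() { value = null; }
    }

    static class ListState implements State<List<String>> {
        private final List<String> list = new ArrayList<>(Arrays.asList("a", "b"));
        public List<String> get() { return list; }
        public void clear() { list.clear(); }
    }

    public static void main(String[] args) {
        State<?> value = new ValueState();
        State<?> list = new ListState();
        System.out.println(value.get()); // 7
        System.out.println(list.get());  // [a, b]
    }
}
```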
[jira] [Created] (FLINK-4856) Add MapState for keyed streams
Xiaogang Shi created FLINK-4856:
---
Summary: Add MapState for keyed streams
Key: FLINK-4856
URL: https://issues.apache.org/jira/browse/FLINK-4856
Project: Flink
Issue Type: New Feature
Components: State Backends, Checkpointing
Reporter: Xiaogang Shi

Many states in keyed streams are organized as key-value pairs. Currently, such states are implemented by storing the entire map in a ValueState or a ListState. This implementation is very costly because all entries have to be serialized/deserialized when a single entry is updated. To improve the efficiency of these states, MapStates are urgently needed.
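The idea can be sketched with a toy MapState whose entries are stored independently under a composite key; a plain HashMap stands in for the backend's key-value store:

```java
import java.util.*;

// Hypothetical sketch of a MapState: each entry lives under a composite
// (currentKey, userKey) key, so a single put() touches only one entry
// instead of rewriting the whole map.
public class MapStateSketch {
    private final Map<String, String> store = new HashMap<>(); // stands in for the backend
    private String currentKey = "";

    void setCurrentKey(String key) { currentKey = key; }

    void put(String userKey, String value) {
        store.put(currentKey + "#" + userKey, value); // only this entry is written
    }

    String get(String userKey) {
        return store.get(currentKey + "#" + userKey);
    }

    public static void main(String[] args) {
        MapStateSketch state = new MapStateSketch();
        state.setCurrentKey("user-1");
        state.put("clicks", "3");
        state.setCurrentKey("user-2");
        state.put("clicks", "5");
        state.setCurrentKey("user-1");
        System.out.println(state.get("clicks")); // 3: scoped to the current key
    }
}
```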
[jira] [Created] (FLINK-4448) Use Listeners to monitor execution status
Xiaogang Shi created FLINK-4448:
---
Summary: Use Listeners to monitor execution status
Key: FLINK-4448
URL: https://issues.apache.org/jira/browse/FLINK-4448
Project: Flink
Issue Type: Sub-task
Components: Cluster Management
Reporter: Xiaogang Shi
Assignee: Xiaogang Shi

Currently, the JobMaster monitors the ExecutionGraph's job status and execution states through Akka. Since the dependencies on Akka are to be removed in the refactoring, the JobMaster will use a JobStatusListener and an ExecutionStateListener to receive notifications from the ExecutionGraph.
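The listener pattern described above can be sketched as direct callbacks in place of Akka messages; the types here are illustrative, not the actual Flink interfaces:

```java
import java.util.*;

// Sketch of the listener idea: the ExecutionGraph invokes registered
// listeners directly instead of sending actor messages.
public class ListenerSketch {
    interface JobStatusListener {
        void jobStatusChanged(String newStatus);
    }

    static class ExecutionGraph {
        private final List<JobStatusListener> listeners = new ArrayList<>();

        void registerListener(JobStatusListener listener) { listeners.add(listener); }

        void transitionTo(String status) {
            for (JobStatusListener listener : listeners) {
                listener.jobStatusChanged(status); // synchronous callback, no Akka
            }
        }
    }

    public static void main(String[] args) {
        ExecutionGraph graph = new ExecutionGraph();
        List<String> observed = new ArrayList<>();
        graph.registerListener(observed::add);
        graph.transitionTo("RUNNING");
        graph.transitionTo("FINISHED");
        System.out.println(observed); // [RUNNING, FINISHED]
    }
}
```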
[jira] [Created] (FLINK-4408) JobSubmission
Xiaogang Shi created FLINK-4408:
---
Summary: JobSubmission
Key: FLINK-4408
URL: https://issues.apache.org/jira/browse/FLINK-4408
Project: Flink
Issue Type: Sub-task
Components: Cluster Management
Reporter: Xiaogang Shi
Assignee: Xiaogang Shi

Once granted the leadership, the JM starts to execute the job. Most code remains the same, except that:
(1) In the old implementation, where the JM manages the execution of multiple jobs, the JM has to load all submitted JobGraphs from the SubmittedJobGraphStore and recover them. Now that the components creating the JM are responsible for the recovery of JobGraphs, the JM is created with the submitted/recovered JobGraph and does not need to load it.
(2) The JM should not rely on Akka to listen for updates of the JobStatus and Executions.
[jira] [Created] (FLINK-4400) Leadership Election among JobManagers
Xiaogang Shi created FLINK-4400:
---
Summary: Leadership Election among JobManagers
Key: FLINK-4400
URL: https://issues.apache.org/jira/browse/FLINK-4400
Project: Flink
Issue Type: Sub-task
Components: Cluster Management
Reporter: Xiaogang Shi
Assignee: Xiaogang Shi

* All JobMasters are LeaderContenders.
* Once a JobMaster is initialized, the very first thing it does is start the leader election service and contend for the leadership.
* A JobMaster starts to perform its functions once it is granted the leadership.
* If a JobMaster's leadership is revoked, it cancels all running executions and releases all acquired resources.