This proposal is to fix https://github.com/apache/incubator-druid/issues/6136.
## Background

### Druid's versioning system & segment timeline

Currently in Druid, a segment is tightly coupled with its _interval_: all timestamps stored in a segment fall into the segment's interval. The segment version is associated with the segment interval; that is, all _queryable_ segments in an interval have the same version. With this, we can imagine that a set of segments composes a [_segment timeline_](http://druid.io/docs/latest/ingestion/index.html). The segment timeline can be thought of as an interval space in which segments are lined up by their interval, and segments of the same interval fall into the same interval space. This can be represented by a sorted map of intervals to their corresponding segments, and is implemented in `VersionedIntervalTimeline`.

`VersionedIntervalTimeline` is responsible for finding the segments of the highest version for a given interval (`lookup()`). If there are two versions of segments for an overlapping interval, the segments of the older version are _overshadowed_ by the ones of the higher version and are not returned as the result. This timeline is used primarily by coordinators and brokers, to overshadow old segments and to find queryable segments, respectively. We also use the timeline in many places to figure out the latest segments among a given set of segments.

### Interval space locking

In Druid, tasks must get a lock when reading or writing data. A lock is created for a pair of dataSource and interval, which means two or more tasks cannot run for the same dataSource and the same interval simultaneously. If two or more tasks are submitted for the same dataSource and interval, the task of the highest priority gets a lock, and the other tasks either wait for that task to complete or fail because their lock is revoked (see https://github.com/apache/incubator-druid/issues/4479 for more details).
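The version-based overshadowing performed by `VersionedIntervalTimeline.lookup()` described above can be illustrated with a minimal sketch. This is not Druid's actual implementation; the class name `SimpleTimeline` and the string-keyed interval representation are simplifications for illustration only.

```java
import java.util.*;

// Hypothetical sketch: a timeline mapping each time chunk to its segments,
// keyed by version. lookup() returns only the segments of the highest
// version in the chunk, overshadowing all older versions.
public class SimpleTimeline {
    // interval (e.g. "2018-01-01/2018-02-01") -> version -> segment ids
    private final Map<String, TreeMap<String, List<String>>> chunks = new HashMap<>();

    public void add(String interval, String version, String segmentId) {
        chunks.computeIfAbsent(interval, k -> new TreeMap<>())
              .computeIfAbsent(version, k -> new ArrayList<>())
              .add(segmentId);
    }

    // Returns queryable segments: for a given time chunk, only the highest version.
    public List<String> lookup(String interval) {
        TreeMap<String, List<String>> byVersion = chunks.get(interval);
        if (byVersion == null) {
            return Collections.emptyList();
        }
        return byVersion.lastEntry().getValue(); // highest version overshadows the rest
    }
}
```

Note that this per-chunk, version-only comparison is exactly what the proposal below extends: it has no way to express that one segment is newer than another *within* the same version and time chunk.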
<img width="1115" alt="interval space lock" src="https://user-images.githubusercontent.com/2322288/45274547-d5215800-b46c-11e8-9d43-060dadf6ed0b.png">

This figure shows an example of interval space locking. `IndexTask` and `KafkaIndexTask` got locks for the intervals `2018-01-01/2018-04-01` and `2018-08-01/2018-09-01`, respectively.

A lock is also associated with a version. If a task creates multiple segments using the same lock, the created segments have the same version. New locks usually have a higher version than old ones; for example, overwriting tasks always get a lock of a higher version than that of the existing segments. However, we sometimes need to use the same version across different locks. The well-known use case is tasks in appending mode, like the Kafka index task. Since these tasks append new segments to intervals even if segments already exist there, the new segments shouldn't overshadow the existing ones. So, appending tasks get a lock of the same version as the existing segments. This is implemented in `TaskLockBox`.

## Problem & motivation

Since https://github.com/apache/incubator-druid/issues/6136 describes the problem and the motivation well, I'm just summarizing the key points here.

### Late data problem

The kafkaIndexTask has a higher priority than the compactionTask by default. If some data arrives late, the kafkaIndexTasks spawned to handle the late data can revoke the locks of running compactionTasks. If late data arrives too frequently, compaction might never complete. So, compaction should remain available while new data is being ingested into the same dataSource and the same interval. From the perspective of optimization, this is also good because more recent data becomes available for compaction as soon as it is ingested into Druid.

## Requirements for compaction

- A segment should be available for compaction immediately once it is published.
- Overwriting tasks (including the compactionTask) and appending tasks shouldn't interfere with each other. More generally, two tasks shouldn't block each other if they read/write from/to different segments.
  - This implies the segments generated by appending tasks should always be queryable before/after overwriting tasks complete.
- Overwriting tasks can change the segment granularity. This means segments of different time chunks can have an overshadow relation.

## Proposed solution

In this proposal, I propose to add a new locking algorithm, _segment locking_. In segment locking, a task must get a lock for each segment it reads or writes.

<img width="944" alt="segment lock" src="https://user-images.githubusercontent.com/2322288/45274914-7826a180-b46e-11e8-8dd7-b1faa314e3a4.png">

This figure shows an example of segment locking. A compactionTask must get locks for all input segments as well as output segments. A kafkaIndexTask gets a new lock whenever it reads an event with a new timestamp that is not covered by its current locks. Since each task has a separate lock for each individual segment, there is no lock contention between such tasks, even when they run for the same dataSource and the same interval, as long as they read/write different segments.

Please note that segment locking doesn't replace the existing interval space locking. Instead, both can be used in different cases. Details are described in the [Changes in indexing service](#changes-in-indexing-service) section below.

Segment locking breaks the current timeline implementation because we need to be able to figure out which segments are newer than others even within the same segment interval (time chunk in the timeline). As a result, we need a new mechanism that improves the timeline to support this new requirement of segment locking. This section describes the details of the new segment locking algorithm and the new timeline management algorithm.
### Changes for `DataSegment`

- `DataSegment` has an `interval`, a `version`, and a `partitionId`, as it does now.
- No two segments have the same `(interval, version, partitionId)`, as is the case now.
- `DataSegment` has two new sets of partitionIds:
  - Overshadowing segments: the segments overshadowed by this segment. This represents the overshadow relation between segments of the _same version_ in the _same time chunk_.
  - Atomic update group: the segments which should be atomically updated all together in the timeline. This will be further explained in the [Timeline construction algorithm](#timeline-construction-algorithm) section below.

The `DataSegment` class would look like the below:

```java
public class DataSegment
{
  private final String dataSource;
  private final Interval interval;
  private final String version;
  private final Map<String, Object> loadSpec;
  private final List<String> dimensions;
  private final List<String> metrics;
  private final ShardSpec shardSpec;
  private final long size;
  private final String identifier;
  // the below two sets can be improved by encoding consecutive partitionIds as a range
  private final Set<Integer> overshadowingSegments;
  private final Set<Integer> atomicUpdateGroup;
}
```

### Changes in indexing service

Tasks have two options for locking, i.e., the interval lock and the segment lock. Whenever a task publishes a segment, it releases the corresponding locks no matter what the lock type is.

#### Interval lock

The interval lock is used when the granularity of output segments is different from that of input segments. For example, overwriting tasks with a new segment granularity, or the compactionTask with `keepSegmentGranularity = false`, can use this lock. The interval lock is the same as the current lock. When a task requests an interval lock, the overlord creates a new version for the requested lock interval. The new segments can have the same `partitionId`s as the old segments because they would have a higher version.
As a result, the new segments overshadow all existing segments no matter what their `partitionId`s are. The `IntervalLock` would look like the below:

```java
public class IntervalLock implements TaskLock
{
  private final TaskLockType type;
  private final String groupId;
  private final String dataSource;
  private final Interval interval;
  private final String version;
  private final Integer priority;
  private final boolean revoked;
}
```

#### Segment lock

The segment lock is used in all other cases. All appending tasks, and overwriting tasks that don't change the segment granularity, use the segment lock.

In segment locking, a task first gets locks for all input segments, if any exist. Whenever the task writes a new row with a timestamp that is not covered by its current locks for output segments, it requests a new segmentId and gets a lock for that segmentId. Requesting a new segmentId and getting a lock are done together atomically in `SegmentAllocateAction`. The returned lock has the same `version` as the segment of the highest `version` in the same `interval`. The returned segmentId has a unique `partitionId` among the segments of the same `version` in the same `interval`. Once a task gets a segmentId and a lock, it creates a new `DataSegment` and sets the `overshadowing segments` of the new `DataSegment` to the `partitionId`s of the input segments of the same interval.

The `SegmentLock` would look like:

```java
public class SegmentLock implements TaskLock
{
  private final TaskLockType type;
  private final String groupId;
  private final String dataSource;
  private final Interval interval;
  private final String version;
  private final Integer priority;
  private final boolean revoked;
  // this can be improved by encoding consecutive partitionIds as a range
  private final Set<Integer> partitionIds;
}
```

With this change, a giant batch task can get many segment locks instead of a single interval lock.
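The allocation rule above (same `version` as the highest existing version in the interval, and a `partitionId` unique within that version) can be sketched as follows. This is a hypothetical illustration, not the real `SegmentAllocateAction`; the class `SegmentAllocation` and its representation of existing segments are assumptions for the example.

```java
import java.util.*;

// Hypothetical sketch of the segmentId allocation rule: the new segmentId
// takes the highest existing version in the interval, and the next unused
// partitionId among segments of that version in that interval.
public class SegmentAllocation {
    // existing: version -> partitionIds already allocated in this interval
    public static Map.Entry<String, Integer> allocate(SortedMap<String, Set<Integer>> existing) {
        String version = existing.lastKey();                 // highest existing version
        int nextPartitionId = existing.get(version).stream()
                                      .mapToInt(Integer::intValue)
                                      .max()
                                      .orElse(-1) + 1;       // unique within (interval, version)
        return Map.entry(version, nextPartitionId);
    }
}
```

In the real system this lookup-and-increment must happen atomically on the overlord together with acquiring the lock, since two tasks appending to the same interval must never receive the same `(version, partitionId)`.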
### Changes in `TaskLockBox`

`TaskLockBox` should support both interval locks and segment locks. It checks the following rules to detect lock collisions:

- A single exclusive interval lock can exist for the same interval.
- Two or more shared interval locks can exist for the same interval.
- Two or more mixed interval locks can't exist for the same interval.
- Interval locks and segment locks can't exist for the same interval.
- A single exclusive segment lock can exist for the same interval and the same partitionId.
- Two or more shared segment locks can exist for the same interval and the same partitionId.
- Two or more mixed segment locks can't exist for the same interval and the same partitionId.

### Timeline management

The new timeline extends the capability of the existing timeline algorithm based on versioned intervals: the timeline first compares the segment versions and then their `overshadowingSegments` lists to find queryable segments. Since `overshadowingSegments` stores only the one-hop overshadowing relation, we need to track the `overshadowingSegments` of multiple segments to figure out all queryable segments. This is _traversing the `overshadowingSegments` graph_, in which each segment is a node and each overshadowing relation is an edge.

<img width="981" alt="overshadow graph" src="https://user-images.githubusercontent.com/2322288/45275474-e6b92e80-b471-11e8-9150-21b48c70b03a.png">

Let me imagine the below scenario.

1) A batch task created segments 1, 2, and 3.
2) A compaction task merged segments 2 and 3 into segment 4. At the same time, a kafkaIndexTask created segment 5.
3) A compaction task created segments 6 and 7 by merging and splitting segments 4 and 5. At the same time, a kafkaIndexTask created segment 8.

The created segments make up the overshadowing graph shown in the above figure. After 3), segments 1, 6, 7, and 8 are the latest ones. This works well only if there is no missing node in the graph.
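The "latest segments" result of this scenario can be checked with a minimal sketch of the one-hop overshadowing relation. The class `OvershadowGraph` is hypothetical, and the sketch assumes a single interval, a single version, and that all segments are announced; a segment is a candidate only if no known segment lists it in `overshadowingSegments`.

```java
import java.util.*;

// Hypothetical sketch: segments keyed by partitionId, each mapped to the
// one-hop set of partitionIds it overshadows. The latest segments are those
// that no other known segment overshadows.
public class OvershadowGraph {
    public static Set<Integer> latestSegments(Map<Integer, Set<Integer>> segments) {
        Set<Integer> overshadowed = new HashSet<>();
        segments.values().forEach(overshadowed::addAll);   // everything some segment overshadows
        Set<Integer> latest = new TreeSet<>(segments.keySet());
        latest.removeAll(overshadowed);                    // keep only non-overshadowed segments
        return latest;
    }
}
```

Running this on the scenario above (4 overshadows {2, 3}; 6 and 7 overshadow {4, 5}) yields segments 1, 6, 7, and 8, as stated. If segment 7 were missing from the input, 4 and 5 would still be correctly excluded only as long as 6 is present, which is why the missing-node problem below matters.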
To guarantee this, we need to add a new assumption: _the timeline users (brokers, coordinators) are aware of all active segments_, i.e., the segments whose `used` flag is set to `true` in the metastore. The coordinator already syncs active segments from the metastore periodically (`SQLMetadataSegmentManager`). Brokers can do a similar sync to the coordinator's: each broker periodically reads all active segments of all or some dataSources (depending on `druid.broker.segment.watchedDataSources`) and keeps them in memory.

#### Timeline construction algorithm

1. Given a set of segments, group them by interval.
2. Per interval, create an `overshadowingMap` and an `overshadowedMap` based on `DataSegment.overshadowingSegments`. `overshadowingMap` is a directed graph representing which segment _overshadows_ which segments in an interval. Similarly, `overshadowedMap` is a directed graph representing which segment _is overshadowed by_ which segments in an interval.
   - The key set of `overshadowingMap` should be all input segments.
3. Per interval, find the segments of the highest version.
4. Among the segments found in 3., find the `queryableCandidates` for active segments as (found segments - key set of `overshadowedMap`).
5. Check the `atomicUpdateGroup` of all `queryableCandidates`. All segments of each `atomicUpdateGroup` should be in `queryableCandidates`.
6. If some candidates have segments in their `atomicUpdateGroup` that are not in `queryableCandidates`, they are not queryable yet. In this case, replace those candidates with all the segments overshadowed by them.
7. Repeat 5 and 6 until
   1. all segments of each `atomicUpdateGroup` are in `queryableCandidates`. Return the found timeline.
   2. or, there are no segments which can replace the incomplete atomic update group. An incomplete atomic update group means that some segments are missing. Even in this case, return the found timeline, including the incomplete atomic update groups.
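Steps 4 through 7 above can be sketched for a single interval and version as follows. The class `TimelineConstruction` and its map-based inputs are hypothetical simplifications: `overshadowing` carries the one-hop `overshadowingSegments` relation, and `atomicGroups` carries each segment's `atomicUpdateGroup`.

```java
import java.util.*;

// Hypothetical sketch of steps 4-7: start from the non-overshadowed segments,
// then demote any candidate whose atomicUpdateGroup is not fully covered by
// the candidate set, replacing it with the segments it overshadows.
public class TimelineConstruction {
    public static Set<Integer> queryable(Map<Integer, Set<Integer>> overshadowing,
                                         Map<Integer, Set<Integer>> atomicGroups) {
        Set<Integer> overshadowed = new HashSet<>();
        overshadowing.values().forEach(overshadowed::addAll);
        Set<Integer> candidates = new TreeSet<>(overshadowing.keySet());
        candidates.removeAll(overshadowed);                    // step 4

        boolean changed = true;
        while (changed) {                                      // step 7: repeat until stable
            changed = false;
            for (Integer seg : new ArrayList<>(candidates)) {
                Set<Integer> group = atomicGroups.getOrDefault(seg, Set.of());
                if (!candidates.containsAll(group)) {          // step 5: group incomplete
                    candidates.remove(seg);                    // step 6: fall back to the
                    candidates.addAll(overshadowing.get(seg)); //         overshadowed segments
                    changed = true;
                }
            }
        }
        return candidates;
    }
}
```

For example, if segments 6 and 7 form an atomic update group but segment 7 has not been announced yet, segment 6 is demoted and the segments it overshadows (4 and 5) remain queryable until the whole group arrives.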
We can improve this later by making the behavior in this case configurable.

In addition to bulk construction, brokers should be able to update their timeline incrementally whenever a historical announces/unannounces a segment. The below two sections describe the algorithms for these cases.

#### Incremental update for new segments

1. Given a new segment, compare its version with the segments of overlapping intervals.
2. If it has the same version and interval, update `overshadowingMap` and `overshadowedMap` for the new segment.
3. Check its `atomicUpdateGroup` to see if it's ready to update the timeline. If all segments in the `atomicUpdateGroup` are in `overshadowingMap` as well, replace the segments overshadowed by the segments in the `atomicUpdateGroup` with the segments in the `atomicUpdateGroup`.

#### Incremental update for removed segments

When removing a segment, we should consider falling back to the second latest segments.

1. Given a removed segment, remove it from `overshadowingMap` and `overshadowedMap`.
2. Check that all segments overshadowed by the removed segment are still active by checking that they are in `overshadowingMap`.
3. If all of them are active, replace the segments in the `atomicUpdateGroup` of the removed segment with the segments overshadowed by the removed segment.
4. If not, check one more hop in the overshadowing graph: check that all overshadowed segments of the overshadowed segments of the removed segment are active, and repeat 3.
5. Repeat 3 and 4 until
   1. 3. succeeds.
   2. or, it reaches the leaves of the overshadowing graph. In this case, return the latest segments, treating the removed segment as a missing one.

[ Full content available at: https://github.com/apache/incubator-druid/issues/6319 ]
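The one-hop case of the removal fallback above can be sketched as follows. The class `RemovalFallback` is hypothetical and deliberately simplified: it handles a single fallback hop only (steps 1-3), while the real algorithm would repeat the check hop by hop down the overshadowing graph.

```java
import java.util.*;

// Hypothetical one-hop sketch of the removal fallback: when a segment is
// unannounced, the segments it overshadowed become queryable again, but only
// if all of them are still present.
public class RemovalFallback {
    public static Set<Integer> fallback(Set<Integer> queryable,
                                        int removed,
                                        Set<Integer> overshadowedByRemoved,
                                        Set<Integer> stillPresent) {
        Set<Integer> result = new TreeSet<>(queryable);
        result.remove(removed);                              // step 1: drop the removed segment
        if (stillPresent.containsAll(overshadowedByRemoved)) {
            result.addAll(overshadowedByRemoved);            // step 3: fall back to the second latest
        }
        return result;
    }
}
```

For example, if segment 4 (which overshadows segments 2 and 3) is unannounced while 2 and 3 are still present, the timeline falls back to serving 2 and 3 for that part of the interval.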
