This proposal is to fix https://github.com/apache/incubator-druid/issues/6136.

## Background

### Druid's versioning system & segment timeline

Currently in Druid, a segment is tightly coupled with its _interval_: all 
timestamps stored in a segment fall within the segment's interval. The segment 
version is associated with the segment interval. That is, all _queryable_ 
segments in an interval have the same version.

With this, we can imagine that a set of segments composes a [_segment 
timeline_](http://druid.io/docs/latest/ingestion/index.html). The segment 
timeline can be thought of as an interval space. In this space, the segments 
are lined up by their interval. The segments of the same interval fall into the 
same interval space.

This can be represented as a sorted map from intervals to their corresponding 
segments, and is implemented in `VersionedIntervalTimeline`. The 
`VersionedIntervalTimeline` is responsible for finding the segments of the 
highest version for a given interval (`lookup()`). If two versions of segments 
exist for overlapping intervals, the segments of the older version are 
_overshadowed_ by those of the higher version and are not returned in the 
result. This timeline is used primarily by coordinators and brokers, to 
overshadow old segments and to find queryable segments, respectively. We also 
use the timeline in many places to figure out the latest segments among a 
given set of segments.
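As an illustration, the per-time-chunk version comparison that `lookup()` performs can be sketched as below. This is a minimal sketch with simplified types; `Seg` and `latestPerInterval` are illustrative names, not Druid's actual API.

```java
import java.util.*;

public class VersionLookupSketch
{
  static class Seg
  {
    final String interval;   // time chunk, e.g. "2018-01-01/2018-02-01"
    final String version;    // comparable version string

    Seg(String interval, String version)
    {
      this.interval = interval;
      this.version = version;
    }
  }

  // For each interval, keep only the segments of the highest version;
  // lower-versioned segments in the same time chunk are overshadowed.
  static Map<String, List<Seg>> latestPerInterval(List<Seg> segments)
  {
    Map<String, List<Seg>> result = new HashMap<>();
    for (Seg s : segments) {
      List<Seg> current = result.get(s.interval);
      if (current == null || current.get(0).version.compareTo(s.version) < 0) {
        result.put(s.interval, new ArrayList<>(List.of(s)));
      } else if (current.get(0).version.equals(s.version)) {
        current.add(s);   // same version: both queryable (different partitions)
      }
      // otherwise the segment has a lower version and is overshadowed
    }
    return result;
  }

  public static void main(String[] args)
  {
    List<Seg> segs = List.of(
        new Seg("2018-01-01/2018-02-01", "v1"),
        new Seg("2018-01-01/2018-02-01", "v2"),   // overshadows v1 in this chunk
        new Seg("2018-02-01/2018-03-01", "v1")
    );
    Map<String, List<Seg>> latest = latestPerInterval(segs);
    System.out.println(latest.get("2018-01-01/2018-02-01").get(0).version);   // v2
    System.out.println(latest.get("2018-02-01/2018-03-01").size());           // 1
  }
}
```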

### Interval space locking

In Druid, a task must get a lock when reading or writing data. A lock is 
created for a pair of dataSource and interval. This means two or more tasks 
cannot run for the same dataSource and the same interval simultaneously. If 
two or more tasks are submitted for the same dataSource and interval, the task 
of the highest priority gets the lock, and the other tasks either wait for that 
task to complete or fail because their locks are revoked (see 
https://github.com/apache/incubator-druid/issues/4479 for more details).

<img width="1115" alt="interval space lock" src="https://user-images.githubusercontent.com/2322288/45274547-d5215800-b46c-11e8-9d43-060dadf6ed0b.png">

This figure shows an example of interval space locking. `IndexTask` and 
`KafkaIndexTask` got locks for the interval `2018-01-01/2018-04-01` and 
`2018-08-01/2018-09-01`, respectively. 

The lock is also associated with a version. If a task creates multiple segments 
using the same lock, the created segments have the same version.

New locks usually have a higher version than old ones. For example, 
overwriting tasks always get a lock of a higher version than that of the 
existing segments. Sometimes, however, we need to use the same version across 
different locks. The well-known use case is tasks in appending mode, like the 
Kafka index task. Since these tasks append new segments to intervals even if 
segments already exist there, the new segments shouldn't overshadow the 
existing ones. So, appending tasks get a lock of the same version as the 
existing segments. This is implemented in `TaskLockBox`.

## Problem & motivation

Since https://github.com/apache/incubator-druid/issues/6136 describes the 
problem and motivation well, I'm just summarizing the key problems and 
motivations here.

### Late data problem
The kafkaIndexTask has a higher priority than the compactionTask by default. 
If some data arrives late, the kafkaIndexTasks spawned to handle the late data 
can revoke the locks of running compactionTasks. If late data arrives too 
frequently, compaction might never complete.

So, the compaction should be available while new data is being ingested into 
the same dataSource and the same interval. From the perspective of 
optimization, this is also good because more recent data becomes available for 
compaction as soon as it is ingested into Druid.

## Requirements for compaction

- A segment should be available for compaction immediately once it is published.
- Overwriting tasks (including compactionTask) and appending tasks shouldn't 
interfere with each other. More generally, two tasks shouldn't block each 
other if they read/write from/to different segments.
  - This implies the segments generated by appending tasks should always be 
queryable before/after overwriting tasks complete.
- Overwriting tasks can change the segment granularity. This means segments of 
different time chunks can have an overshadow relation.

## Proposed solution

In this proposal, I propose to add a new locking algorithm, _segment locking_. 
In segment locking, a task should get a lock for each reading/writing segment. 

<img width="944" alt="segment lock" src="https://user-images.githubusercontent.com/2322288/45274914-7826a180-b46e-11e8-8dd7-b1faa314e3a4.png">

This figure shows an example of segment locking. A compactionTask should get 
locks for all input segments as well as output segments. A kafkaIndexTask gets 
a new lock whenever it reads an event with a new timestamp which is not covered 
by its current locks. Since each task has a separate lock per segment, there's 
no lock contention between those tasks, even though they are running for the 
same dataSource and the same interval, as long as they read/write different 
segments.

Please note that segment locking doesn't replace the existing interval 
locking. Instead, both can be used in different cases. Details are described 
in the [Changes in indexing service](#changes-in-indexing-service) section 
below.

Segment locking breaks the current timeline implementation because we must be 
able to figure out which segments are newer than others even within the same 
segment interval (time chunk in the timeline). As a result, we need a new 
mechanism that extends the timeline to support this new requirement of segment 
locking.

This section describes details of the new segment locking algorithm and the 
new timeline management algorithm.

### Changes for `DataSegment`

- `DataSegment` keeps its `interval`, `version`, and `partitionId` as before. 
  - As before, no two segments have the same `(interval, version, partitionId)`.
- `DataSegment` gets two new sets of partitionIds:
  - Overshadowing segments: the segments overshadowed by this segment. This 
represents the overshadow relation between segments of the _same version_ in 
the _same time chunk_.
  - Atomic update group: the segments which should be atomically updated all 
together in the timeline. This will be further explained later in the [Timeline 
construction algorithm](#timeline-construction-algorithm) section.
  
The `DataSegment` class would look like below:

```java
public class DataSegment
{
  private final String dataSource;
  private final Interval interval;
  private final String version;
  private final Map<String, Object> loadSpec;
  private final List<String> dimensions;
  private final List<String> metrics;
  private final ShardSpec shardSpec;
  private final long size;
  private final String identifier;
  // the below two sets can be improved by encoding consecutive partitionIds as a range
  private final Set<Integer> overshadowingSegments;
  private final Set<Integer> atomicUpdateGroup;
}
```

### Changes in indexing service

Tasks have two options for locking, i.e., interval lock and segment lock. 
Whenever tasks publish a segment, they release the corresponding locks no 
matter what the lock type is.

#### Interval lock

The interval lock is used when the granularity of output segments is different 
from that of input segments. For example, overwriting tasks with a new segment 
granularity or the compactionTask with `keepSegmentGranularity = false` can use 
this lock.

The interval lock is the same as the current lock. When a task requests an 
interval lock, the overlord creates a new version for the requested lock 
interval. The new segments can have the same `partitionId`s as the old 
segments because they have a higher version. As a result, the new segments 
overshadow all existing segments no matter what their `partitionId`s are.

The `IntervalLock` would look like below:

```java
public class IntervalLock implements TaskLock
{
  private final TaskLockType type;
  private final String groupId;
  private final String dataSource;
  private final Interval interval;
  private final String version;
  private final Integer priority;
  private final boolean revoked;
}
```

#### Segment lock

The segment lock is used in all other cases. All appending tasks and 
overwriting tasks without segment granularity change use the segment lock.

In segment locking, a task first gets locks for all input segments, if any 
exist. Whenever the task writes a new row with a timestamp that is not covered 
by its current locks for output segments, it requests a new segmentId and gets 
a lock for that segmentId. Requesting a new segmentId and getting a lock are 
done atomically in `SegmentAllocateAction`. The returned lock has the same 
`version` as the segment of the highest `version` in the same `interval`. The 
returned segmentId has a `partitionId` unique among the segments of the same 
`version` in the same `interval`.
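The version/partitionId assignment described above can be sketched as follows. This is only an illustration: `allocate` and the `"version:partitionId"` result string are hypothetical, and the real logic lives in `SegmentAllocateAction` on the overlord.

```java
import java.util.*;

public class SegmentAllocationSketch
{
  // existing: version -> partitionIds already allocated in the target interval
  static String allocate(SortedMap<String, Set<Integer>> existing)
  {
    if (existing.isEmpty()) {
      return "v0:0";   // hypothetical initial version for an empty interval
    }
    // the new lock reuses the highest existing version in the interval
    String version = existing.lastKey();
    // the new segmentId gets a partitionId unique within that version
    Set<Integer> used = existing.get(version);
    int partitionId = 0;
    while (used.contains(partitionId)) {
      partitionId++;
    }
    return version + ":" + partitionId;
  }

  public static void main(String[] args)
  {
    SortedMap<String, Set<Integer>> existing = new TreeMap<>();
    existing.put("v1", Set.of(0, 1));
    existing.put("v2", Set.of(0));
    System.out.println(allocate(existing));   // v2:1
  }
}
```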

Once a task gets a segmentId and a lock, it creates a new `DataSegment` and 
sets the `overshadowingSegments` of the new `DataSegment` to the 
`partitionId`s of the input segments of the same interval.

The `SegmentLock` would be like:

```java
public class SegmentLock implements TaskLock
{
  private final TaskLockType type;
  private final String groupId;
  private final String dataSource;
  private final Interval interval;
  private final String version;
  private final Integer priority;
  private final boolean revoked;
  // this can be improved by encoding consecutive partitionIds as a range
  private final Set<Integer> partitionIds;
}
```

With this change, a giant batch task can get a lot of segment locks instead of 
a single interval lock.

### Changes in `TaskLockBox`

`TaskLockBox` should support both interval locks and segment locks. It checks 
the following rules to detect lock collisions.

- Only a single exclusive interval lock can exist for a given interval.
- Two or more shared interval locks can coexist for the same interval.
- Mixed shared and exclusive interval locks can't coexist for the same 
interval.
- Interval locks and segment locks can't coexist for the same interval.
- Only a single exclusive segment lock can exist for a given interval and 
partitionId.
- Two or more shared segment locks can coexist for the same interval and 
partitionId.
- Mixed shared and exclusive segment locks can't coexist for the same interval 
and partitionId.
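These rules can be sketched with simplified lock descriptors. `LockReq` and `conflicts` are illustrative names, not `TaskLockBox`'s actual API, and the interval-overlap check is reduced to string equality.

```java
public class LockCollisionSketch
{
  enum Scope { INTERVAL, SEGMENT }
  enum Type { SHARED, EXCLUSIVE }

  static class LockReq
  {
    final Scope scope;
    final Type type;
    final String interval;
    final int partitionId;   // only meaningful for SEGMENT locks

    LockReq(Scope scope, Type type, String interval, int partitionId)
    {
      this.scope = scope;
      this.type = type;
      this.interval = interval;
      this.partitionId = partitionId;
    }
  }

  static boolean conflicts(LockReq a, LockReq b)
  {
    if (!a.interval.equals(b.interval)) {
      return false;  // different intervals never collide (overlap check elided)
    }
    if (a.scope != b.scope) {
      return true;   // interval locks and segment locks can't coexist
    }
    if (a.scope == Scope.SEGMENT && a.partitionId != b.partitionId) {
      return false;  // segment locks on different partitionIds don't collide
    }
    // same scope and same target: only shared + shared is compatible
    return a.type == Type.EXCLUSIVE || b.type == Type.EXCLUSIVE;
  }

  public static void main(String[] args)
  {
    String itv = "2018-01-01/2018-02-01";
    LockReq sharedSeg0 = new LockReq(Scope.SEGMENT, Type.SHARED, itv, 0);
    LockReq exclSeg1 = new LockReq(Scope.SEGMENT, Type.EXCLUSIVE, itv, 1);
    LockReq exclItv = new LockReq(Scope.INTERVAL, Type.EXCLUSIVE, itv, -1);

    System.out.println(conflicts(sharedSeg0, exclSeg1));   // false: different segments
    System.out.println(conflicts(sharedSeg0, exclItv));    // true: mixed scopes
  }
}
```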

### Timeline management

The new timeline extends the capability of the existing versioned-interval 
timeline algorithm: the timeline first compares segment versions and then 
their `overshadowingSegments` sets to find queryable segments.

Since `overshadowingSegments` stores only one-hop overshadow relations, we 
need to track the `overshadowingSegments` of multiple segments to figure out 
all queryable segments. This amounts to _traversing the 
`overshadowingSegments` graph_, where segments are the nodes and overshadow 
relations are the edges.

<img width="981" alt="overshadow graph" src="https://user-images.githubusercontent.com/2322288/45275474-e6b92e80-b471-11e8-9150-21b48c70b03a.png">

Consider the following scenario.

1) A batch task created segments 1, 2, and 3.
2) A compaction task merged segments 2 and 3 into segment 4. At the same time, 
a kafkaIndexTask created segment 5.
3) A compaction task created segments 6 and 7 by merging and splitting segments 
4 and 5. At the same time, a kafkaIndexTask created segment 8.

The created segments form the overshadowing graph shown in the figure above. 
After 3), segments 1, 6, 7, and 8 are the latest ones.
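Assuming all of these segments share a single version and time chunk, the latest segments of this scenario can be computed from the one-hop `overshadowingSegments` sets alone. The following is only a sketch with hypothetical names (`findLatest`, integer segment ids).

```java
import java.util.*;

public class OvershadowGraphSketch
{
  // Latest segments = present segments minus every segment that appears in
  // some present segment's overshadowingSegments set.
  static Set<Integer> findLatest(Map<Integer, Set<Integer>> overshadowing)
  {
    Set<Integer> latest = new TreeSet<>(overshadowing.keySet());
    for (Set<Integer> overshadowed : overshadowing.values()) {
      latest.removeAll(overshadowed);
    }
    return latest;
  }

  public static void main(String[] args)
  {
    // segmentId -> segments it overshadows (one hop), per the scenario above
    Map<Integer, Set<Integer>> graph = new HashMap<>();
    graph.put(1, Set.of());
    graph.put(2, Set.of());
    graph.put(3, Set.of());
    graph.put(4, Set.of(2, 3));   // compaction of segments 2 and 3
    graph.put(5, Set.of());
    graph.put(6, Set.of(4, 5));   // merge-and-split of segments 4 and 5
    graph.put(7, Set.of(4, 5));
    graph.put(8, Set.of());
    System.out.println(findLatest(graph));   // [1, 6, 7, 8]
  }
}
```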

This works well only if there is no missing node in the graph. To guarantee 
this, we need to add a new assumption: _the timeline users (brokers, 
coordinators) are aware of all active segments_, i.e., those whose `used` flag 
is set to `true` in the metadata store.

The coordinator already syncs active segments from the metadata store 
periodically (`SQLMetadataSegmentManager`). 

Brokers can do a similar sync. Each broker periodically reads all active 
segments of all or some dataSources (depending on 
`druid.broker.segment.watchedDataSources`) and keeps them in memory.

#### Timeline construction algorithm

1. Given a set of segments, groups those segments by their interval.
2. Creates `overshadowingMap` and `overshadowedMap` per interval based on 
`DataSegment.overshadowingSegments`. `overshadowingMap` is a directed graph 
representing which segment _overshadows_ which segments in an interval. 
Similarly, `overshadowedMap` is a directed graph representing which segment 
_is overshadowed by_ which segments in an interval.
   - The key set of `overshadowingMap` should be all input segments.
3. Per interval, finds the segments of the highest version.
4. Among the segments found in 3., finds `queryableCandidates` for active 
segments as (found segments - key set of `overshadowedMap`).
5. Checks the `atomicUpdateGroup` of every segment in `queryableCandidates`. 
All segments of each `atomicUpdateGroup` should be in `queryableCandidates`.
6. If some candidates have segments in their `atomicUpdateGroup` which are not 
in `queryableCandidates`, they are not queryable yet. In this case, replaces 
those candidates with all the segments overshadowed by them.
7. Repeats 5 and 6 until
   1. all segments of each `atomicUpdateGroup` are in `queryableCandidates`. 
Returns the found timeline.
   2. or, there are no segments which can replace the incomplete atomic update 
groups. An incomplete atomic update group means that some segments are 
missing. Even in this case, returns the found timeline including the 
incomplete atomic update groups. We can improve this later by making the 
behavior in this case configurable.
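Steps 5 and 6 above can be sketched as a single refinement pass over the candidates. This is an illustrative simplification: `refine` and the map names are hypothetical, and step 7's repetition and the missing-segment fallback are elided.

```java
import java.util.*;

public class AtomicUpdateGroupSketch
{
  // One refinement pass: a candidate whose atomicUpdateGroup is not fully
  // present among the candidates is replaced by the segments it overshadows.
  static Set<Integer> refine(
      Set<Integer> candidates,
      Map<Integer, Set<Integer>> atomicGroup,   // segmentId -> its atomic update group
      Map<Integer, Set<Integer>> overshadows    // segmentId -> segments it overshadows
  )
  {
    Set<Integer> result = new TreeSet<>();
    for (Integer candidate : candidates) {
      Set<Integer> group = atomicGroup.getOrDefault(candidate, Set.of(candidate));
      if (candidates.containsAll(group)) {
        result.add(candidate);   // atomic update group is complete: queryable
      } else {
        // not queryable yet: fall back to the segments it overshadows
        result.addAll(overshadows.getOrDefault(candidate, Set.of()));
      }
    }
    return result;
  }

  public static void main(String[] args)
  {
    // segments 6 and 7 form one atomic update group, but only 6 is visible;
    // both overshadow segments 4 and 5
    Map<Integer, Set<Integer>> atomicGroup = Map.of(6, Set.of(6, 7), 7, Set.of(6, 7));
    Map<Integer, Set<Integer>> overshadows = Map.of(6, Set.of(4, 5), 7, Set.of(4, 5));
    System.out.println(refine(Set.of(6), atomicGroup, overshadows));   // [4, 5]
  }
}
```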
   
In addition to bulk construction, brokers should be able to update their 
timeline incrementally whenever a historical announces/unannounces a segment. 
The two sections below describe the algorithms for these cases.

#### Incremental update for new segments

1. Given a new segment, compares its version with the segments of overlapping 
intervals.
2. If it has the same version and interval, updates `overshadowingMap` and 
`overshadowedMap` for the new segment.
3. Checks its `atomicUpdateGroup` to see whether it's ready to update the 
timeline. If all segments of the `atomicUpdateGroup` are in `overshadowingMap` 
as well, replaces the segments overshadowed by the segments of the 
`atomicUpdateGroup` with the segments of the `atomicUpdateGroup`.

#### Incremental update for removed segments

When removing a segment, we should consider falling back to the second-latest 
segments.

1. Given a removed segment, removes it from `overshadowingMap` and 
`overshadowedMap`.
2. Checks that all segments overshadowed by the removed segment are still 
active by checking that they are in `overshadowingMap`.
3. If all of them are active, replaces the segments of the `atomicUpdateGroup` 
of the removed segment with the segments overshadowed by the removed segment.
4. If not, checks one more hop down the overshadowing graph: checks that all 
segments overshadowed by the overshadowed segments of the removed segment are 
active, and repeats 3.
5. Repeats 3 and 4 until
   1. `3.` succeeds,
   2. or it reaches the leaves of the overshadowing graph. In this case, 
returns the latest segments with the removed segment treated as missing.
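The fallback walk in steps 3-5 can be sketched as below, assuming each segment is identified by an integer id. `fallback` and the map names are hypothetical, not the actual timeline API.

```java
import java.util.*;

public class RemovalFallbackSketch
{
  // Walk down the overshadowing graph from the removed segment until every
  // segment in the frontier is present (active), or the leaves are reached.
  static Set<Integer> fallback(
      int removed,
      Set<Integer> present,                     // segments still active
      Map<Integer, Set<Integer>> overshadows    // segmentId -> segments it overshadows
  )
  {
    Set<Integer> frontier = new TreeSet<>(overshadows.getOrDefault(removed, Set.of()));
    while (!present.containsAll(frontier)) {
      Set<Integer> next = new TreeSet<>();
      for (Integer s : frontier) {
        if (present.contains(s)) {
          next.add(s);                                         // keep active segments
        } else {
          next.addAll(overshadows.getOrDefault(s, Set.of()));  // one more hop down
        }
      }
      if (next.equals(frontier)) {
        break;   // reached the leaves: the rest is simply missing
      }
      frontier = next;
    }
    return frontier;
  }

  public static void main(String[] args)
  {
    // segment 6 overshadows 4 and 5, and 4 overshadows 2 and 3;
    // 4 is already gone, so removing 6 must fall back one more hop for it
    Map<Integer, Set<Integer>> overshadows =
        Map.of(6, Set.of(4, 5), 4, Set.of(2, 3));
    Set<Integer> present = Set.of(2, 3, 5);
    System.out.println(fallback(6, present, overshadows));   // [2, 3, 5]
  }
}
```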

[ Full content available at: 
https://github.com/apache/incubator-druid/issues/6319 ]