maytasm opened a new pull request #10948:
URL: https://github.com/apache/druid/pull/10948
Fixes #XXXX.
### Description
**When does this problem happen:**
This can happen when the input data has sparse columns and the
dimensionsSpec specifies a dimension ordering that is not lexicographic. Note that
this is not limited to compaction; it can also occur during initial batch ingestion
of data (index task).
**Root cause:**
This bug occurs when the intermediate segments do not share a common
dimension ordering.
For example, suppose the dimensions of the data are “dimA”, “dimB”, and “dimC”,
where both “dimA” and “dimC” are sparse. During ingestion, the following
intermediate segments are created:
- Intermediate segment 1 has columns “dimA” and “dimB”. It does not have “dimC”,
since “dimC” is sparse and does not appear in any row of intermediate segment 1.
- Intermediate segment 2 has columns “dimB” and “dimC”. It does not have “dimA”,
since “dimA” is sparse and does not appear in any row of intermediate segment 2.
When we try to merge the intermediate segments, no common shared dimension
ordering can be found across them (no intermediate segment has all of “dimA”,
“dimB”, and “dimC” in its index). The current behavior then falls back to
merging all the intermediate segments with a lexicographic ordering of all the
dimensions, so the final merged segment has lexicographic ordering. This can
differ from the ordering specified in the ingestionSpec whenever that ordering
is not lexicographic.
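As a hypothetical sketch (not Druid's actual implementation; the class and method names here are illustrative only), the shared-ordering check can be thought of as requiring every segment's dimension list to be a subsequence of the longest one; when no segment contains all dimensions, the check fails and the caller falls back to lexicographic order:

```java
import java.util.*;

public class SharedDimOrder
{
  // Returns a shared ordering covering all segments, or null if none exists.
  static List<String> longestSharedDimOrder(List<List<String>> segmentDims)
  {
    // Pick the longest dimension list among the segments as the candidate.
    List<String> candidate = Collections.emptyList();
    for (List<String> dims : segmentDims) {
      if (dims.size() > candidate.size()) {
        candidate = dims;
      }
    }
    // Every segment's dimensions must appear, in order, within the candidate.
    for (List<String> dims : segmentDims) {
      if (!isSubsequence(dims, candidate)) {
        return null; // no shared ordering -> caller falls back to lexicographic
      }
    }
    return candidate;
  }

  static boolean isSubsequence(List<String> small, List<String> big)
  {
    int matched = 0;
    for (String s : big) {
      if (matched < small.size() && small.get(matched).equals(s)) {
        matched++;
      }
    }
    return matched == small.size();
  }

  public static void main(String[] args)
  {
    // Segments 1-3 have (dimB, dimA); segment 4 has (dimB, dimC).
    // No segment contains all three dimensions, so no shared ordering exists.
    List<List<String>> dims = Arrays.asList(
        Arrays.asList("dimB", "dimA"),
        Arrays.asList("dimB", "dimA"),
        Arrays.asList("dimB", "dimA"),
        Arrays.asList("dimB", "dimC")
    );
    System.out.println(longestSharedDimOrder(dims)); // prints null
  }
}
```

If instead one segment had contained all three dimensions in spec order, the other lists would be subsequences of it and that ordering would be returned.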
**How this impacts users:**
**Impact 1: Segment size.** When a user specifies a dimensionsSpec with a
dimension ordering in the ingestionSpec, the ingestion task should create
segments with that ordering. The ordering matters because it affects segment
size. This bug instead produces a final segment with lexicographic ordering,
ignoring the ordering given in the dimensionsSpec.
**Impact 2: Rollup.** When a user specifies forceGuaranteedRollup=true (with
hash or single-dim partitioning), the user expects perfect rollup. If the
ingestionSpec contains a dimensionsSpec with a dimension ordering, that
ordering is used to create the intermediate segments. Because of this bug,
however, the merging of the intermediate segments is done with a different
(lexicographic) ordering whenever the dimensionsSpec ordering is not itself
lexicographic. This results in imperfect rollup.
**More detailed example + walkthrough of how the bug occurs:**
Let’s say this is our input data:
```
{"time":"2015-09-12T00:46:58.771Z","dimA":"C","dimB":"F","metA":1}
{"time":"2015-09-12T00:46:58.771Z","dimA":"C","dimB":"J","metA":1}
{"time":"2015-09-12T00:46:58.771Z","dimA":"H","dimB":"X","metA":1} **
{"time":"2015-09-12T00:46:58.771Z","dimA":"Z","dimB":"S","metA":1}
{"time":"2015-09-12T00:46:58.771Z","dimA":"H","dimB":"X","metA":1} **
{"time":"2015-09-12T00:46:58.771Z","dimA":"H","dimB":"Z","metA":1}
{"time":"2015-09-12T00:46:58.771Z","dimA":"J","dimB":"R","metA":1}
{"time":"2015-09-12T00:46:58.771Z","dimA":"H","dimB":"T","metA":1}
{"time":"2015-09-12T00:46:58.771Z","dimA":"H","dimB":"X","metA":1} **
{"time":"2015-09-12T00:46:58.771Z","dimC":"A","dimB":"X","metA":1}
```
The `**` marks rows that should be rolled up together.
We will now ingest/compact this with the `dimB, dimA, dimC` order in the
dimensionsSpec.
Let’s say the intermediate segments generated are:
```
Segment 1: {"time":"2015-09-12T00:46:58.771Z","dimA":"C","dimB":"F","metA":1}
Segment 1: {"time":"2015-09-12T00:46:58.771Z","dimA":"C","dimB":"J","metA":1}
Segment 1: {"time":"2015-09-12T00:46:58.771Z","dimA":"H","dimB":"X","metA":1}
Segment 2: {"time":"2015-09-12T00:46:58.771Z","dimA":"Z","dimB":"S","metA":1}
Segment 2: {"time":"2015-09-12T00:46:58.771Z","dimA":"H","dimB":"X","metA":1}
Segment 2: {"time":"2015-09-12T00:46:58.771Z","dimA":"H","dimB":"Z","metA":1}
Segment 3: {"time":"2015-09-12T00:46:58.771Z","dimA":"J","dimB":"R","metA":1}
Segment 3: {"time":"2015-09-12T00:46:58.771Z","dimA":"H","dimB":"T","metA":1}
Segment 3: {"time":"2015-09-12T00:46:58.771Z","dimA":"H","dimB":"X","metA":1}
Segment 4: {"time":"2015-09-12T00:46:58.771Z","dimC":"A","dimB":"X","metA":1}
```
and the indexes for these intermediate segments are:
- intermediate segment 1: dimB, dimA
- intermediate segment 2: dimB, dimA
- intermediate segment 3: dimB, dimA
- intermediate segment 4: dimB, dimC

Note that the ordering of dimensions here is important: it follows the
ordering of the dimensions in the dimensionsSpec of the ingestionSpec
(`dimB, dimA, dimC`). The rows of the generated intermediate segments are
sorted by dimension in that same ordering.
When the intermediate segments are merged, `getLongestSharedDimOrder` returns
`null` because there is no common shared dimension ordering across all four
intermediate segments (none of the indexes has all of dimA, dimB, and dimC).
This causes the merging to be done with lexicographic ordering (`dimA, dimB,
dimC`).
Merging of the intermediate segments iterates through all the indexes in a
BFS-like way. Essentially, it tries to collect everything for a given (time,
dims) tuple across all segments before moving on to the next tuple. Once it
moves on, it assumes we are now on a different (time, dims) tuple. Hence, it
is important that the intermediate segments are sorted with the same dimension
ordering that is used for merging.
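This invariant can be illustrated with a small self-contained sketch (not Druid code; the row keys are reduced to simple strings): a streaming merge only rolls up *adjacent* rows with equal keys, so if the inputs were sorted under a different ordering than the one the merge uses, equal keys stop being adjacent and rollup opportunities are lost:

```java
import java.util.*;

public class StreamingRollup
{
  // Returns the number of output rows after rolling up adjacent equal keys,
  // mimicking a streaming merge that only combines consecutive duplicates.
  static int mergeAndRollup(List<String> mergedKeys)
  {
    int outputRows = 0;
    String prev = null;
    for (String key : mergedKeys) {
      if (!key.equals(prev)) {
        outputRows++;  // new (time, dims) tuple -> new output row
        prev = key;
      }                // equal adjacent key -> rolled up into previous row
    }
    return outputRows;
  }

  public static void main(String[] args)
  {
    // Three "H|X" rows (from segments 1, 2, and 3) plus one "J|R" row.
    // Properly sorted, the three "H|X" rows are adjacent and roll up to one:
    System.out.println(mergeAndRollup(Arrays.asList("H|X", "H|X", "H|X", "J|R"))); // 2
    // With "J|R" interleaved between them, rollup is broken: 3 rows instead of 2.
    System.out.println(mergeAndRollup(Arrays.asList("H|X", "J|R", "H|X", "H|X"))); // 3
  }
}
```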
Back to our example: since we are now merging with lexicographic ordering
(dimA, dimB, dimC), the first row to be pulled is
`{"time":"2015-09-12T00:46:58.771Z","dimC":"A","dimB":"X","metA":1}`
since the merge looks for the smallest value in dimA, followed by dimB,
followed by dimC (where null is treated as smallest), across the first rows
of all the intermediate segments.
It then moves on to
`{"time":"2015-09-12T00:46:58.771Z","dimA":"C","dimB":"F","metA":1}` (from
intermediate segment 1), then
`{"time":"2015-09-12T00:46:58.771Z","dimA":"C","dimB":"J","metA":1}` (from
intermediate segment 1), then
`{"time":"2015-09-12T00:46:58.771Z","dimA":"H","dimB":"X","metA":1}` (from
intermediate segment 1).
So far nothing has been rolled up, as we have not seen any identical (time,
dims) tuples. We have now iterated through all the rows in intermediate
segment 4 (first iteration) and intermediate segment 1 (second to fourth
iterations).
Now the next row to be iterated is either the first row of intermediate
segment 2
(`{"time":"2015-09-12T00:46:58.771Z","dimA":"Z","dimB":"S","metA":1}`) or the
first row of intermediate segment 3
(`{"time":"2015-09-12T00:46:58.771Z","dimA":"J","dimB":"R","metA":1}`). The
problem is that the merge should have iterated through the rows with
`"dimA":"H","dimB":"X"` in intermediate segments 2 and 3 first, so they could
be combined with the `"dimA":"H","dimB":"X"` row already iterated from
intermediate segment 1. To complete the example: we next iterate
`{"time":"2015-09-12T00:46:58.771Z","dimA":"J","dimB":"R","metA":1}` (from
intermediate segment 3), since the dimA value J is smaller than the dimA value
Z (from intermediate segment 2). Then we iterate
`{"time":"2015-09-12T00:46:58.771Z","dimA":"H","dimB":"T","metA":1}` (from
intermediate segment 3), since the dimA value H is smaller than the dimA value
Z (from intermediate segment 2). However, we can no longer roll up the row
(from intermediate segment 3) with the previously iterated row from
intermediate segment 1, because
`{"time":"2015-09-12T00:46:58.771Z","dimA":"J","dimB":"R","metA":1}` came in
between, and it has dimA != H and dimB != X.
**Proposed fix:**
High level:
We make the dimension names given in the ingestionSpec available to
`getLongestSharedDimOrder` (as an additional argument).
We pass `schema.getDimensionsSpec().getDimensionNames()` (the dimension names
from the ingestionSpec) from within `AppenderatorImpl#mergeAndPush` to the
`indexMerger.mergeQueryableIndex(...)` method, which then passes the dimension
names on to `IndexMergerV9.merge`. `IndexMergerV9.merge` passes them to
`IndexMerger.getMergedDimensions(indexes)`, and finally we pass them to
`getLongestSharedDimOrder(...)`.
In `getLongestSharedDimOrder(indexes, specDimensionNames)`, we first try to
find the longest shared dimension ordering as usual. However, if we cannot
find one, then before returning null we do the following in sequence:
1. Check that `specDimensionNames` is not null and not empty. → If it is null
or empty, we return null.
2. From `specDimensionNames`, remove all dimensions that do not exist in any
of the `indexes`' dimension names.
3. Check that `specDimensionNames` has no extra or missing columns relative to
the set of all `indexes`' dimension names. → If it has extra or missing
columns, we return null.
4. Check that every index's dimension name ordering matches the ordering in
`specDimensionNames`. → If any index's dimension names have a different
ordering than `specDimensionNames`, we return null. For example, if
`specDimensionNames` is A, B, C and one index's dimension names are A, C, that
is valid; however, if one index's dimension names are C, A, that is invalid
and we return null.
5. Return `specDimensionNames`.
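The steps above can be sketched as follows. This is a simplified model, not the actual Druid signature: the real method works on `QueryableIndex` objects, while here the indexes are reduced to plain lists of dimension names, and `fallbackToSpecOrder` is a hypothetical helper name:

```java
import java.util.*;

public class SpecOrderFallback
{
  // Given each index's dimension list and the spec's dimension names, decide
  // whether the spec ordering can safely be used as the merge ordering.
  static List<String> fallbackToSpecOrder(
      List<List<String>> indexDims,
      List<String> specDimensionNames
  )
  {
    // Step 1: a null or empty spec gives us nothing to fall back on.
    if (specDimensionNames == null || specDimensionNames.isEmpty()) {
      return null;
    }
    // Union of dimension names across all indexes.
    Set<String> allIndexDims = new HashSet<>();
    for (List<String> dims : indexDims) {
      allIndexDims.addAll(dims);
    }
    // Step 2: drop spec dimensions that appear in no index.
    List<String> pruned = new ArrayList<>();
    for (String d : specDimensionNames) {
      if (allIndexDims.contains(d)) {
        pruned.add(d);
      }
    }
    // Step 3: the pruned spec must now cover exactly the index dimensions.
    if (!new HashSet<>(pruned).equals(allIndexDims)) {
      return null;
    }
    // Step 4: each index's ordering must be a subsequence of the spec's.
    for (List<String> dims : indexDims) {
      int matched = 0;
      for (String d : pruned) {
        if (matched < dims.size() && dims.get(matched).equals(d)) {
          matched++;
        }
      }
      if (matched != dims.size()) {
        return null; // e.g. an index ordered (C, A) against spec (A, B, C)
      }
    }
    // Step 5: the spec ordering is safe to use for merging.
    return pruned;
  }

  public static void main(String[] args)
  {
    // The four intermediate segments from the example above.
    List<List<String>> indexDims = Arrays.asList(
        Arrays.asList("dimB", "dimA"),
        Arrays.asList("dimB", "dimA"),
        Arrays.asList("dimB", "dimA"),
        Arrays.asList("dimB", "dimC")
    );
    List<String> spec = Arrays.asList("dimB", "dimA", "dimC");
    // The spec covers all index dimensions and agrees with each index's
    // ordering, so it is used for the merge instead of lexicographic order.
    System.out.println(fallbackToSpecOrder(indexDims, spec)); // [dimB, dimA, dimC]
  }
}
```

With this fallback, the example above merges with the `dimB, dimA, dimC` ordering that the intermediate segments were actually sorted by, so the three `"dimA":"H","dimB":"X"` rows become adjacent and roll up correctly.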
This PR has:
- [ ] been self-reviewed.
- [ ] using the [concurrency
checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md)
(Remove this item if the PR doesn't have any relation to concurrency.)
- [ ] added documentation for new or modified features or behaviors.
- [ ] added Javadocs for most classes and all non-trivial methods. Linked
related entities via Javadoc links.
- [ ] added or updated version, license, or notice information in
[licenses.yaml](https://github.com/apache/druid/blob/master/dev/license.md)
- [ ] added comments explaining the "why" and the intent of the code
wherever would not be obvious for an unfamiliar reader.
- [ ] added unit tests or modified existing tests to cover new code paths,
ensuring the threshold for [code
coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md)
is met.
- [ ] added integration tests.
- [ ] been tested in a test Druid cluster.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]