jon-wei commented on a change in pull request #11025:
URL: https://github.com/apache/druid/pull/11025#discussion_r603768694
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
##########
@@ -301,11 +317,12 @@ public boolean isAudited()
public String toString()
{
return "SegmentTransactionalInsertAction{" +
- "segmentsToBeOverwritten=" +
SegmentUtils.commaSeparatedIdentifiers(segmentsToBeOverwritten) +
- ", segments=" + SegmentUtils.commaSeparatedIdentifiers(segments) +
+ "segmentsToBeOverwritten=" + segmentsToBeOverwritten +
Review comment:
Should this preserve the `SegmentUtils.commaSeparatedIdentifiers(` and
apply it to `segmentsToBeDropped` as well?
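For illustration, a sketch of the shape that suggestion implies, using a stand-in join helper in place of Druid's `SegmentUtils.commaSeparatedIdentifiers` (the class and method names below are hypothetical stand-ins, not the PR's code):

```java
import java.util.List;

public class ToStringSketch
{
  // Stand-in for SegmentUtils.commaSeparatedIdentifiers; the real helper
  // formats DataSegment identifiers, this one just joins plain strings.
  static String commaSeparatedIdentifiers(List<String> ids)
  {
    return ids == null ? "null" : String.join(", ", ids);
  }

  // Shape the review asks for: the helper applied uniformly to all three
  // segment collections, including the new segmentsToBeDropped field.
  static String describe(List<String> toOverwrite, List<String> segments, List<String> toDrop)
  {
    return "SegmentTransactionalInsertAction{" +
           "segmentsToBeOverwritten=" + commaSeparatedIdentifiers(toOverwrite) +
           ", segments=" + commaSeparatedIdentifiers(segments) +
           ", segmentsToBeDropped=" + commaSeparatedIdentifiers(toDrop) +
           '}';
  }
}
```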
##########
File path: docs/ingestion/native-batch.md
##########
@@ -719,6 +723,7 @@ that range if there's some stray data with unexpected timestamps.
|type|The task type, this should always be "index".|none|yes|
|inputFormat|[`inputFormat`](./data-formats.md#input-format) to specify how to parse input data.|none|yes|
|appendToExisting|Creates segments as additional shards of the latest version, effectively appending to the segment set instead of replacing it. This means that you can append new segments to any datasource regardless of its original partitioning scheme. You must use the `dynamic` partitioning type for the appended segments. If you specify a different partitioning type, the task fails with an error.|false|no|
+|dropExisting|If set to true (and `appendToExisting` set to false and `interval` specified in `granularitySpec`), then the ingestion task would drop (mark unused) all existing segments that are fully contain by the `interval` in the `granularitySpec` when the task publishes new segments (no segments would be drop (mark unused) if the ingestion fails). Note that if either `appendToExisting` is `true` or `interval` is not specified in `granularitySpec` then no segments would be drop even if `dropExisting` is set to `true`.|false|no|
Review comment:
```suggestion
|dropExisting|If set to true (and `appendToExisting` is set to false and `interval` is specified in `granularitySpec`), then the ingestion task would drop (mark unused) all existing segments that are fully contained by the `interval` in the `granularitySpec` when the task publishes new segments (no segments would be dropped (marked unused) if the ingestion fails). Note that if either `appendToExisting` is `true` or `interval` is not specified in `granularitySpec` then no segments would be dropped even if `dropExisting` is set to `true`.|false|no|
```
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
##########
@@ -63,13 +62,16 @@
private final DataSourceMetadata endMetadata;
@Nullable
private final String dataSource;
+ @Nullable
+ private final Set<DataSegment> segmentsToBeDropped;
Review comment:
Can you add some javadocs to this class that explain the difference
between `segmentsToBeOverwritten` and `segmentsToBeDropped`?
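One possible shape for those javadocs (the wording below is my reading of this PR's semantics, not text from the patch, so treat it purely as a sketch):

```java
/**
 * Sketch of the distinction the review asks to document (wording is an
 * assumption based on this PR, not the actual javadoc):
 *
 * - segmentsToBeOverwritten: existing segments that the newly published
 *   segments replace with a newer version; used when validating the publish.
 * - segmentsToBeDropped: existing segments to mark as unused in the same
 *   transaction as the publish, e.g. when `dropExisting` is set in the
 *   ioConfig and the segments are fully contained by the task's interval.
 */
```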
##########
File path: docs/ingestion/native-batch.md
##########
@@ -89,6 +89,8 @@ You may want to consider the below things:
data in segments where it actively adds data: if there are segments in your `granularitySpec`'s intervals that have no data written by this task, they will be left alone. If any existing segments partially overlap with the `granularitySpec`'s intervals, the portion of those segments outside the new segments' intervals will still be visible.
+ You can set `dropExisting` flag in the `ioConfig` to true if you want the ingestion task to drop all existing data
Review comment:
Suggest adding an example where the user would want to enable it, such
as the YEAR/MONTH example in the PR description
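Concretely, the doc example might look like this (an illustrative fragment, not taken from the PR; the spec layout and field values are assumptions): a datasource previously ingested at YEAR `segmentGranularity` is re-indexed at MONTH granularity for 2020. With `dropExisting` set to true, the old YEAR segment, which is fully contained by the 2020 interval, is marked unused when the task publishes, so months with no rows in the new data do not keep showing the old data.

```json
{
  "ioConfig": {
    "type": "index_parallel",
    "appendToExisting": false,
    "dropExisting": true
  },
  "granularitySpec": {
    "segmentGranularity": "MONTH",
    "intervals": ["2020-01-01/2021-01-01"]
  }
}
```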
##########
File path: docs/ingestion/native-batch.md
##########
@@ -193,6 +195,7 @@ that range if there's some stray data with unexpected timestamps.
|type|The task type, this should always be `index_parallel`.|none|yes|
|inputFormat|[`inputFormat`](./data-formats.md#input-format) to specify how to parse input data.|none|yes|
|appendToExisting|Creates segments as additional shards of the latest version, effectively appending to the segment set instead of replacing it. This means that you can append new segments to any datasource regardless of its original partitioning scheme. You must use the `dynamic` partitioning type for the appended segments. If you specify a different partitioning type, the task fails with an error.|false|no|
+|dropExisting|If set to true (and `appendToExisting` set to false and `interval` specified in `granularitySpec`), then the ingestion task would drop (mark unused) all existing segments that are fully contain by the `interval` in the `granularitySpec` when the task publishes new segments (no segments would be drop (mark unused) if the ingestion fails). Note that if either `appendToExisting` is `true` or `interval` is not specified in `granularitySpec` then no segments would be drop even if `dropExisting` is set to `true`.|false|no|
Review comment:
```suggestion
|dropExisting|If set to true (and `appendToExisting` is set to false and `interval` is specified in `granularitySpec`), then the ingestion task would drop (mark unused) all existing segments that are fully contained by the `interval` in the `granularitySpec` when the task publishes new segments (no segments would be dropped (marked unused) if the ingestion fails). Note that if either `appendToExisting` is `true` or `interval` is not specified in `granularitySpec` then no segments would be dropped even if `dropExisting` is set to `true`.|false|no|
```
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
##########
@@ -490,6 +491,27 @@ public static boolean isGuaranteedRollup(IndexIOConfig ioConfig, IndexTuningConf
}
}
+  public static Set<DataSegment> getUsedSegmentsWithinInterval(
+      TaskToolbox toolbox,
+      String dataSource,
+      List<Interval> intervals
+  ) throws IOException
+  {
+    Set<DataSegment> segmentsFoundForDrop = new HashSet<>();
+    List<Interval> condensedIntervals = JodaUtils.condenseIntervals(intervals);
+    if (!intervals.isEmpty()) {
+      Collection<DataSegment> usedSegment = toolbox.getTaskActionClient().submit(new RetrieveUsedSegmentsAction(dataSource, null, condensedIntervals, Segments.ONLY_VISIBLE));
+      for (DataSegment segment : usedSegment) {
+        for (Interval interval : condensedIntervals) {
+          if (interval.contains(segment.getInterval())) {
+            segmentsFoundForDrop.add(segment);
Review comment:
Could add a `continue` here once a segment is found for efficiency
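Sketched with stand-in types (the real code uses Joda's `Interval` and Druid's `DataSegment`, which are not reproduced here), the early exit the review has in mind amounts to leaving the inner interval loop once the segment is matched:

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ContainmentSketch
{
  // Minimal stand-ins for Joda's Interval and Druid's DataSegment.
  record Interval(long start, long end)
  {
    boolean contains(Interval other)
    {
      return start <= other.start && other.end <= end;
    }
  }

  record Segment(String id, Interval interval) {}

  static Set<Segment> segmentsFullyContained(Collection<Segment> usedSegments, List<Interval> condensedIntervals)
  {
    Set<Segment> found = new HashSet<>();
    for (Segment segment : usedSegments) {
      for (Interval interval : condensedIntervals) {
        if (interval.contains(segment.interval())) {
          found.add(segment);
          // Early exit: once this segment is known to be contained, the
          // remaining condensed intervals need not be scanned for it.
          break;
        }
      }
    }
    return found;
  }
}
```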
##########
File path:
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
##########
@@ -108,7 +110,7 @@ public IndexerSQLMetadataStorageCoordinator(
this.connector = connector;
}
- enum DataSourceMetadataUpdateResult
+ enum DataStoreMetadataUpdateResult
Review comment:
What's the reasoning behind the rename?
##########
File path: docs/ingestion/native-batch.md
##########
@@ -193,6 +195,7 @@ that range if there's some stray data with unexpected timestamps.
|type|The task type, this should always be `index_parallel`.|none|yes|
|inputFormat|[`inputFormat`](./data-formats.md#input-format) to specify how to parse input data.|none|yes|
|appendToExisting|Creates segments as additional shards of the latest version, effectively appending to the segment set instead of replacing it. This means that you can append new segments to any datasource regardless of its original partitioning scheme. You must use the `dynamic` partitioning type for the appended segments. If you specify a different partitioning type, the task fails with an error.|false|no|
+|dropExisting|If set to true (and `appendToExisting` set to false and `interval` specified in `granularitySpec`), then the ingestion task would drop (mark unused) all existing segments that are fully contain by the `interval` in the `granularitySpec` when the task publishes new segments (no segments would be drop (mark unused) if the ingestion fails). Note that if either `appendToExisting` is `true` or `interval` is not specified in `granularitySpec` then no segments would be drop even if `dropExisting` is set to `true`.|false|no|
Review comment:
Suggest noting somewhere that compaction tasks will have this enabled
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]