jon-wei commented on a change in pull request #11025:
URL: https://github.com/apache/druid/pull/11025#discussion_r603768694



##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
##########
@@ -301,11 +317,12 @@ public boolean isAudited()
   public String toString()
   {
     return "SegmentTransactionalInsertAction{" +
-           "segmentsToBeOverwritten=" + 
SegmentUtils.commaSeparatedIdentifiers(segmentsToBeOverwritten) +
-           ", segments=" + SegmentUtils.commaSeparatedIdentifiers(segments) +
+           "segmentsToBeOverwritten=" + segmentsToBeOverwritten +

Review comment:
       Should this preserve the `SegmentUtils.commaSeparatedIdentifiers(` and 
apply it to `segmentsToBeDropped` as well?

##########
File path: docs/ingestion/native-batch.md
##########
@@ -719,6 +723,7 @@ that range if there's some stray data with unexpected 
timestamps.
 |type|The task type, this should always be "index".|none|yes|
 |inputFormat|[`inputFormat`](./data-formats.md#input-format) to specify how to 
parse input data.|none|yes|
 |appendToExisting|Creates segments as additional shards of the latest version, 
effectively appending to the segment set instead of replacing it. This means 
that you can append new segments to any datasource regardless of its original 
partitioning scheme. You must use the `dynamic` partitioning type for the 
appended segments. If you specify a different partitioning type, the task fails 
with an error.|false|no|
+|dropExisting|If set to true (and `appendToExisting` set to false and 
`interval` specified in `granularitySpec`), then the ingestion task would drop 
(mark unused) all existing segments that are fully contain by the `interval` in 
the `granularitySpec` when the task publishes new segments (no segments would 
be drop (mark unused) if the ingestion fails). Note that if either 
`appendToExisting` is `true` or `interval` is not specified in 
`granularitySpec` then no segments would be drop even if `dropExisting` is set 
to `true`.|false|no|

Review comment:
       ```suggestion
   |dropExisting|If set to true (and `appendToExisting` is set to false and 
`interval` is specified in `granularitySpec`), then the ingestion task would 
drop (mark unused) all existing segments that are fully contained by the 
`interval` in the `granularitySpec` when the task publishes new segments (no 
segments would be dropped (marked unused) if the ingestion fails). Note that if 
either `appendToExisting` is `true` or `interval` is not specified in 
`granularitySpec` then no segments would be dropped even if `dropExisting` is 
set to `true`.|false|no|
   ```

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
##########
@@ -63,13 +62,16 @@
   private final DataSourceMetadata endMetadata;
   @Nullable
   private final String dataSource;
+  @Nullable
+  private final Set<DataSegment> segmentsToBeDropped;

Review comment:
       Can you add some javadocs to this class that explain the difference 
between `segmentsToBeOverwritten` and `segmentsToBeDropped`?

##########
File path: docs/ingestion/native-batch.md
##########
@@ -89,6 +89,8 @@ You may want to consider the below things:
   data in segments where it actively adds data: if there are segments in your 
`granularitySpec`'s intervals that have
   no data written by this task, they will be left alone. If any existing 
segments partially overlap with the
   `granularitySpec`'s intervals, the portion of those segments outside the new 
segments' intervals will still be visible.
+  You can set `dropExisting` flag in the `ioConfig` to true if you want the 
ingestion task to drop all existing data 

Review comment:
       Suggest adding an example where the user would want to enable it, such 
as the YEAR/MONTH example in the PR description

##########
File path: docs/ingestion/native-batch.md
##########
@@ -193,6 +195,7 @@ that range if there's some stray data with unexpected 
timestamps.
 |type|The task type, this should always be `index_parallel`.|none|yes|
 |inputFormat|[`inputFormat`](./data-formats.md#input-format) to specify how to 
parse input data.|none|yes|
 |appendToExisting|Creates segments as additional shards of the latest version, 
effectively appending to the segment set instead of replacing it. This means 
that you can append new segments to any datasource regardless of its original 
partitioning scheme. You must use the `dynamic` partitioning type for the 
appended segments. If you specify a different partitioning type, the task fails 
with an error.|false|no|
+|dropExisting|If set to true (and `appendToExisting` set to false and 
`interval` specified in `granularitySpec`), then the ingestion task would drop 
(mark unused) all existing segments that are fully contain by the `interval` in 
the `granularitySpec` when the task publishes new segments (no segments would 
be drop (mark unused) if the ingestion fails). Note that if either 
`appendToExisting` is `true` or `interval` is not specified in 
`granularitySpec` then no segments would be drop even if `dropExisting` is set 
to `true`.|false|no|

Review comment:
       ```suggestion
   |dropExisting|If set to true (and `appendToExisting` is set to false and 
`interval` is specified in `granularitySpec`), then the ingestion task would 
drop (mark unused) all existing segments that are fully contained by the 
`interval` in the `granularitySpec` when the task publishes new segments (no 
segments would be dropped (marked unused) if the ingestion fails). Note that if 
either `appendToExisting` is `true` or `interval` is not specified in 
`granularitySpec` then no segments would be dropped even if `dropExisting` is 
set to `true`.|false|no|
   ```

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
##########
@@ -490,6 +491,27 @@ public static boolean isGuaranteedRollup(IndexIOConfig 
ioConfig, IndexTuningConf
     }
   }
 
+  public static Set<DataSegment> getUsedSegmentsWithinInterval(
+      TaskToolbox toolbox,
+      String dataSource,
+      List<Interval> intervals
+  ) throws IOException
+  {
+    Set<DataSegment> segmentsFoundForDrop = new HashSet<>();
+    List<Interval> condensedIntervals = JodaUtils.condenseIntervals(intervals);
+    if (!intervals.isEmpty()) {
+      Collection<DataSegment> usedSegment = 
toolbox.getTaskActionClient().submit(new RetrieveUsedSegmentsAction(dataSource, 
null, condensedIntervals, Segments.ONLY_VISIBLE));
+      for (DataSegment segment : usedSegment) {
+        for (Interval interval : condensedIntervals) {
+          if (interval.contains(segment.getInterval())) {
+            segmentsFoundForDrop.add(segment);

Review comment:
       Could add a `continue` here once a segment is found for efficiency

##########
File path: 
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
##########
@@ -108,7 +110,7 @@ public IndexerSQLMetadataStorageCoordinator(
     this.connector = connector;
   }
 
-  enum DataSourceMetadataUpdateResult
+  enum DataStoreMetadataUpdateResult

Review comment:
       What's the reasoning behind the rename?

##########
File path: docs/ingestion/native-batch.md
##########
@@ -193,6 +195,7 @@ that range if there's some stray data with unexpected 
timestamps.
 |type|The task type, this should always be `index_parallel`.|none|yes|
 |inputFormat|[`inputFormat`](./data-formats.md#input-format) to specify how to 
parse input data.|none|yes|
 |appendToExisting|Creates segments as additional shards of the latest version, 
effectively appending to the segment set instead of replacing it. This means 
that you can append new segments to any datasource regardless of its original 
partitioning scheme. You must use the `dynamic` partitioning type for the 
appended segments. If you specify a different partitioning type, the task fails 
with an error.|false|no|
+|dropExisting|If set to true (and `appendToExisting` set to false and 
`interval` specified in `granularitySpec`), then the ingestion task would drop 
(mark unused) all existing segments that are fully contain by the `interval` in 
the `granularitySpec` when the task publishes new segments (no segments would 
be drop (mark unused) if the ingestion fails). Note that if either 
`appendToExisting` is `true` or `interval` is not specified in 
`granularitySpec` then no segments would be drop even if `dropExisting` is set 
to `true`.|false|no|

Review comment:
       Suggest noting somewhere that compaction tasks will have this enabled




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to