cryptoe commented on code in PR #13269:
URL: https://github.com/apache/druid/pull/13269#discussion_r1012070101


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/DurableStorageCleanerConfig.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.indexing;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+
+public class DurableStorageCleanerConfig
+{
+
+  private static final long DEFAULT_INITIAL_DELAY_SECONDS = 86400L;
+  private static final long DEFAULT_DELAY_SECONDS = 86400L;
+
+  /**
+   * Whether the {@link DurableStorageCleaner} helper should be enabled or not
+   */
+  @JsonProperty
+  private final boolean enabled;
+
+  /**
+   * Initial delay in seconds post which the durable storage cleaner should run
+   */
+  @JsonProperty
+  private final long initialDelaySeconds;
+
+  /**
+   * The delay (in seconds) after the last run post which the durable storage cleaner would clean the outputs
+   */
+  @JsonProperty
+  private final long delaySeconds;
+
+  @JsonCreator
+  public DurableStorageCleanerConfig(
+      @JsonProperty("enabled") final Boolean enabled,
+      @JsonProperty("initialDelay") final Long initialDelaySeconds,
+      @JsonProperty("delay") final Long delaySeconds
+  )
+  {
+    this.enabled = enabled != null && enabled;
+    this.initialDelaySeconds = initialDelaySeconds != null ? initialDelaySeconds : DEFAULT_INITIAL_DELAY_SECONDS;

Review Comment:
   Nit: I'm curious why an `initialDelay` is required. Maybe I am missing a use case?
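For context on the question, here is a minimal sketch of what `initialDelay` controls under a fixed-delay schedule. The first run starts `initialDelay` seconds after scheduling, and each later run starts `delay` seconds after the previous one finishes. The class and `startTimes` helper are hypothetical, and run duration is passed in as an assumption.

```java
import java.util.ArrayList;
import java.util.List;

public class FixedDelayTimeline
{
  /**
   * Hypothetical helper: approximate start times (seconds after scheduling) of the first
   * {@code runs} executions of a fixed-delay task, assuming each run takes {@code runSeconds}.
   */
  public static List<Long> startTimes(long initialDelaySeconds, long delaySeconds, long runSeconds, int runs)
  {
    List<Long> starts = new ArrayList<>();
    long next = initialDelaySeconds;
    for (int i = 0; i < runs; i++) {
      starts.add(next);
      // Fixed delay: the next run is measured from the end of the current run.
      next += runSeconds + delaySeconds;
    }
    return starts;
  }

  public static void main(String[] args)
  {
    // With the defaults in this PR (86400 s for both), the first cleanup happens a day after scheduling.
    System.out.println(startTimes(86400L, 86400L, 0L, 3));
    // A short initial delay would make the first cleanup happen soon after startup instead.
    System.out.println(startTimes(60L, 86400L, 0L, 3));
  }
}
```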
   



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/DurableStorageCleanerConfig.java:
##########
@@ -0,0 +1,89 @@
+    this.enabled = enabled != null && enabled;

Review Comment:
   Nit: Shouldn't this be enabled by default?
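For reference, a small sketch of how the nullable `Boolean` bound by Jackson resolves under the current expression versus an enabled-by-default variant. `resolveCurrent` mirrors the `enabled != null && enabled` line above; `resolveDefaultTrue` is a hypothetical illustration of the alternative, not code from the PR.

```java
public class EnabledDefault
{
  // Mirrors the PR: a missing ("enabled" absent, so null) or false value disables the cleaner.
  public static boolean resolveCurrent(Boolean enabled)
  {
    return enabled != null && enabled;
  }

  // Hypothetical alternative: a missing value enables the cleaner; only an explicit false disables it.
  public static boolean resolveDefaultTrue(Boolean enabled)
  {
    return enabled == null || enabled;
  }

  public static void main(String[] args)
  {
    Boolean[] inputs = {null, Boolean.TRUE, Boolean.FALSE};
    for (Boolean input : inputs) {
      System.out.println(input + " -> current=" + resolveCurrent(input)
                         + ", defaultTrue=" + resolveDefaultTrue(input));
    }
  }
}
```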



##########
docs/multi-stage-query/reference.md:
##########
@@ -205,6 +205,16 @@ The following table lists the context parameters for the MSQ task engine:
 | sqlTimeZone | Sets the time zone for this connection, which affects how time functions and timestamp literals behave. Use a time zone name like "America/Los_Angeles" or offset like "-08:00".| `druid.sql.planner.sqlTimeZone` on the Broker (default: UTC)|
 | useApproximateCountDistinct | Whether to use an approximate cardinality algorithm for `COUNT(DISTINCT foo)`.| `druid.sql.planner.useApproximateCountDistinct` on the Broker (default: true)|
 
+## Durable Storage
+This section enumerates the advantages and performance implications of enabling durable storage while executing MSQ tasks.
+
+| Parameter | Default | Description |
+|-----------|---------|-------------|
+| `druid.msq.intermediate.storage.enable` | false | Whether to enable durable storage for the cluster. |
+| `druid.msq.intermediate.storage.cleaner.enabled` | false | Whether to enable the durable storage cleaner for the cluster. |

Review Comment:
   We might want to mention that these properties are Overlord-only.
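To make the Overlord-only point concrete, here is a hedged sketch of the two flags as they might appear in an Overlord runtime properties file. It reads them with plain `java.util.Properties` and applies the `false` defaults from the table above. Druid binds these properties through its own configuration system, so the class and the `readFlag` helper are illustrative only.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class DurableStorageProperties
{
  public static final String STORAGE_ENABLE = "druid.msq.intermediate.storage.enable";
  public static final String CLEANER_ENABLED = "druid.msq.intermediate.storage.cleaner.enabled";

  /** Reads a boolean flag, falling back to the documented default (false) when it is absent. */
  public static boolean readFlag(Properties props, String key)
  {
    return Boolean.parseBoolean(props.getProperty(key, "false"));
  }

  public static void main(String[] args) throws IOException
  {
    // Hypothetical Overlord runtime properties that enable durable storage and its cleaner.
    String overlordProperties = String.join(
        "\n",
        STORAGE_ENABLE + "=true",
        CLEANER_ENABLED + "=true"
    );
    Properties props = new Properties();
    props.load(new StringReader(overlordProperties));

    System.out.println(STORAGE_ENABLE + " = " + readFlag(props, STORAGE_ENABLE));
    System.out.println(CLEANER_ENABLED + " = " + readFlag(props, CLEANER_ENABLED));
  }
}
```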



##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java:
##########
@@ -127,10 +129,43 @@ public void deleteRecursively(String dirName)
     }
   }
 
+  @Override
+  public List<String> listDir(String dirName)
+  {
+    ListObjectsV2Request listObjectsRequest = new ListObjectsV2Request()
+        .withBucketName(config.getBucket())
+        .withPrefix(objectPath(dirName))
+        .withDelimiter(DELIM);
+
+    List<String> lsResult = new ArrayList<>();
+
+    ListObjectsV2Result objectListing = s3Client.listObjectsV2(listObjectsRequest);

Review Comment:
   Nit: can we add some test cases here?
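As a starting point for such tests, here is a hypothetical in-memory model of a prefix + delimiter listing with a few candidate cases. It is not the AWS SDK. It assumes `listDir` returns child names relative to the prefix, without the trailing delimiter, which the hunk does not show. Because ListObjectsV2 returns at most 1,000 keys per response, a case with more entries than one page would also be worth covering if the implementation follows continuation tokens.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

public class DelimiterListingSketch
{
  /**
   * Hypothetical model: returns the immediate children of {@code prefix}. Keys with no further
   * delimiter are listed as-is; deeper keys roll up to their first component (like S3 common prefixes).
   */
  public static List<String> listDir(List<String> keys, String prefix, String delimiter)
  {
    TreeSet<String> children = new TreeSet<>();
    for (String key : keys) {
      if (!key.startsWith(prefix)) {
        continue; // Keys outside the prefix are ignored.
      }
      String rest = key.substring(prefix.length());
      int idx = rest.indexOf(delimiter);
      // Keep only the first path component below the prefix.
      children.add(idx < 0 ? rest : rest.substring(0, idx));
    }
    return new ArrayList<>(children);
  }

  public static void main(String[] args)
  {
    List<String> keys = List.of(
        "base/controller_a/worker0/output",
        "base/controller_a/worker1/output",
        "base/controller_b/results",
        "base/top-level-file",
        "other/controller_c/results"
    );
    // Candidate cases: nested keys collapse to one entry, files at this level are listed,
    // keys under other prefixes are ignored, and an unknown prefix yields an empty list.
    System.out.println(listDir(keys, "base/", "/"));
    System.out.println(listDir(keys, "base/controller_a/", "/"));
    System.out.println(listDir(keys, "missing/", "/"));
  }
}
```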



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/DurableStorageCleaner.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.indexing;
+
+import com.google.common.collect.Sets;
+import com.google.inject.Inject;
+import org.apache.druid.indexing.overlord.TaskRunner;
+import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
+import org.apache.druid.indexing.overlord.helpers.OverlordHelper;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.msq.shuffle.DurableStorageOutputChannelFactory;
+import org.apache.druid.storage.StorageConnector;
+import org.joda.time.Duration;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.stream.Collectors;
+
+/**
+ * This class polls the durable storage for any stray directories, i.e. the ones that do not have a controller task
+ * associated with them, and cleans them up periodically.
+ * This ensures that tasks that have exited abruptly or have failed to clean up the durable storage themselves
+ * do not pollute it with worker outputs and temporary files. See {@link DurableStorageCleanerConfig} for the configs.
+ */
+public class DurableStorageCleaner implements OverlordHelper
+{
+
+  private static final Logger LOG = new Logger(DurableStorageCleaner.class);
+
+  private final DurableStorageCleanerConfig config;
+  private final StorageConnector storageConnector;
+  private final TaskRunner taskRunner;
+
+  @Inject
+  public DurableStorageCleaner(
+      final DurableStorageCleanerConfig config,
+      final StorageConnector storageConnector,
+      final TaskRunner taskRunner
+  )
+  {
+    this.config = config;
+    this.storageConnector = storageConnector;
+    this.taskRunner = taskRunner;
+  }
+
+  @Override
+  public boolean isEnabled()
+  {
+    return config.isEnabled();
+  }
+
+  @Override
+  public void schedule(ScheduledExecutorService exec)
+  {
+    LOG.info("Starting the DurableStorageCleaner with the config [%s]", config);
+
+    ScheduledExecutors.scheduleWithFixedDelay(
+        exec,
+        Duration.standardSeconds(config.getInitialDelaySeconds()),
+        Duration.standardSeconds(config.getDelaySeconds()),
+        () -> {
+          try {
+            Set<String> allDirectories = new HashSet<>(storageConnector.listDir(""));
+            Set<String> runningTaskIds = taskRunner.getRunningTasks()
+                                                   .stream()
+                                                   .map(TaskRunnerWorkItem::getTaskId)
+                                                   .map(DurableStorageOutputChannelFactory::getControllerDirectory)
+                                                   .collect(Collectors.toSet());
+            Set<String> unknownDirectories = Sets.difference(allDirectories, runningTaskIds);
+            LOG.info(
+                "The following directories do not have a corresponding MSQ task associated with them:\n%s\nThese will get cleaned up.",
+                unknownDirectories
+            );
+            for (String unknownDirectory : unknownDirectories) {
+              storageConnector.deleteRecursively(unknownDirectory);
+            }
+          }
+          catch (IOException e) {
+            throw new RuntimeException("Error while running the scheduled durable storage cleanup helper", e);

Review Comment:
   Does the Overlord fail if this exception is thrown?
   
   If yes, then we might want to change this behavior so that it does not cause an Overlord switch.
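One possible direction, sketched with the JDK's `ScheduledExecutorService` directly rather than Druid's `ScheduledExecutors` (whose exception handling is not shown in this hunk): catch failures inside each pass, log them, and let the next pass retry. With `scheduleWithFixedDelay`, an exception escaping the task suppresses all later executions, so the catch matters even if the Overlord itself survives. The `listDirs`, `controllerDirsOfRunningTasks`, and `delete` hooks are hypothetical stand-ins for `StorageConnector.listDir`, the running tasks' controller directories, and `StorageConnector.deleteRecursively`. Delays are in milliseconds here only to keep the demo fast.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class StrayDirectoryCleanerSketch
{
  /** One cleanup pass: every listed directory that no running controller owns is deleted. */
  public static Set<String> cleanOnce(
      Supplier<List<String>> listDirs,
      Supplier<Set<String>> controllerDirsOfRunningTasks,
      Consumer<String> delete
  )
  {
    Set<String> stray = new HashSet<>(listDirs.get());
    stray.removeAll(controllerDirsOfRunningTasks.get());
    for (String dir : stray) {
      delete.accept(dir);
    }
    return stray;
  }

  /** Runs {@code pass} with a fixed delay; a failing pass is logged and the next pass still runs. */
  public static void schedule(ScheduledExecutorService exec, long initialDelayMillis, long delayMillis, Runnable pass)
  {
    exec.scheduleWithFixedDelay(
        () -> {
          try {
            pass.run();
          }
          catch (RuntimeException e) {
            // Swallow and log: an escaping exception would cancel all future executions.
            System.err.println("Durable storage cleanup pass failed; retrying on the next run: " + e);
          }
        },
        initialDelayMillis,
        delayMillis,
        TimeUnit.MILLISECONDS
    );
  }

  public static void main(String[] args) throws InterruptedException
  {
    Set<String> deleted = new HashSet<>();
    Set<String> stray = cleanOnce(
        () -> List.of("controller_a", "controller_b", "controller_c"),
        () -> Set.of("controller_b"),
        deleted::add
    );
    System.out.println("deleted " + new TreeSet<>(stray));

    // Simulate a first pass that fails (e.g. a storage error) and check that a second pass still runs.
    ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
    AtomicInteger attempts = new AtomicInteger();
    CountDownLatch twoPasses = new CountDownLatch(2);
    schedule(exec, 0, 10, () -> {
      twoPasses.countDown();
      if (attempts.incrementAndGet() == 1) {
        throw new IllegalStateException("simulated storage error");
      }
    });
    System.out.println("second pass ran: " + twoPasses.await(5, TimeUnit.SECONDS));
    exec.shutdownNow();
  }
}
```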
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
