vvysotskyi commented on code in PR #2655:
URL: https://github.com/apache/drill/pull/2655#discussion_r979578252


##########
exec/java-exec/src/main/java/org/apache/calcite/jdbc/DynamicRootSchema.java:
##########
@@ -91,22 +91,69 @@ private CalciteSchema getSchema(String schemaName, boolean caseSensitive) {
 
   public SchemaPath resolveTableAlias(String alias) {
     return Optional.ofNullable(aliasRegistryProvider.getTableAliasesRegistry()
-        .getUserAliases(schemaConfig.getUserName()).get(alias))
+      .getUserAliases(schemaConfig.getUserName()).get(alias))
       .map(SchemaPath::parseFromString)
       .orElse(null);
   }
 
+  private void attemptToRegisterSchemas(StoragePlugin plugin) throws Exception {

Review Comment:
   Please rename the method to something like `registerSchemasWithRetry`.



##########
exec/java-exec/src/main/java/org/apache/calcite/jdbc/DynamicRootSchema.java:
##########
@@ -91,22 +91,69 @@ private CalciteSchema getSchema(String schemaName, boolean caseSensitive) {
 
   public SchemaPath resolveTableAlias(String alias) {
     return Optional.ofNullable(aliasRegistryProvider.getTableAliasesRegistry()
-        .getUserAliases(schemaConfig.getUserName()).get(alias))
+      .getUserAliases(schemaConfig.getUserName()).get(alias))
       .map(SchemaPath::parseFromString)
       .orElse(null);
   }
 
+  private void attemptToRegisterSchemas(StoragePlugin plugin) throws Exception {
+    long maxAttempts = schemaConfig
+      .getOption(ExecConstants.STORAGE_PLUGIN_ACCESS_ATTEMPTS)
+      .num_val;
+    long attemptDelayMs = schemaConfig
+      .getOption(ExecConstants.STORAGE_PLUGIN_ATTEMPT_DELAY)
+      .num_val;
+    int attempt = 0;
+    Exception lastAttemptEx = null;
+
+    while (attempt++ < maxAttempts) {

Review Comment:
   Could we either move retry-related logic to a utility method or use some 
library that does that, for example, https://github.com/failsafe-lib/failsafe?
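   The retry loop in the diff above could be hoisted into a helper along these lines (a plain-Java sketch of the reviewer's "utility method" suggestion; `RetryUtil` and its `retry` method are illustrative names, not existing Drill or Failsafe code):

   ```java
   import java.util.concurrent.Callable;

   // Hypothetical utility for illustration only; not part of Drill.
   public class RetryUtil {

     /**
      * Runs {@code action} up to {@code maxAttempts} times, sleeping
      * {@code delayMs} between failed attempts, and rethrows the last
      * exception if every attempt fails.
      */
     public static <T> T retry(Callable<T> action, int maxAttempts, long delayMs)
         throws Exception {
       if (maxAttempts < 1) {
         throw new IllegalArgumentException("maxAttempts must be >= 1");
       }
       Exception last = null;
       for (int attempt = 1; attempt <= maxAttempts; attempt++) {
         try {
           return action.call();
         } catch (Exception e) {
           last = e;
           if (attempt < maxAttempts) {
             Thread.sleep(delayMs);
           }
         }
       }
       throw last;
     }
   }
   ```

   The call site would then reduce to something like `RetryUtil.retry(() -> { plugin.registerSchemas(schemaConfig, plus()); return null; }, maxAttempts, attemptDelayMs)`, keeping the schema-loading code free of loop bookkeeping.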



##########
exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java:
##########
@@ -1094,6 +1095,35 @@ public static String bootDefaultFor(String name) {
      new OptionDescription("Enables recursive files listing when querying the `INFORMATION_SCHEMA.FILES` table or executing the SHOW FILES command. " +
        "Default is false. (Drill 1.15+)"));
 
+  public static final String STORAGE_PLUGIN_ACCESS_ATTEMPTS = "storage.plugin_access_attempts";
+  public static final PositiveLongValidator STORAGE_PLUGIN_ACCESS_ATTEMPTS_VALIDATOR = new PositiveLongValidator(
+    STORAGE_PLUGIN_ACCESS_ATTEMPTS,
+    10,
+    new OptionDescription(
+      "The maximum number of attempts that will be made to request metadata " +
+      "needed for query planning from a storage plugin."
+    )
+  );
+  public static final String STORAGE_PLUGIN_ATTEMPT_DELAY = "storage.plugin_access_attempt_delay";
+  public static final NonNegativeLongValidator STORAGE_PLUGIN_ATTEMPT_DELAY_VALIDATOR = new NonNegativeLongValidator(
+    STORAGE_PLUGIN_ATTEMPT_DELAY,
+    5*60*1000,
+    new OptionDescription(
+      "The delay in milliseconds between repeated attempts to request metadata " +

Review Comment:
   Please mention here and in the option above that they will be used only when 
`storage.plugin_auto_disable` is set to true.



##########
exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java:
##########
@@ -130,11 +132,18 @@ private static PhysicalPlan convertPlan(QueryContext context, String sql, Pointe
       logger.trace("There was an error during conversion into physical plan. " +
           "Will sync remote and local function registries if needed and retry " +
           "in case if issue was due to missing function implementation.", e);
-      // it is prohibited to retry query planning for ANALYZE statement since it changes
-      // query-level option values and will fail when rerunning with updated values
-      if (context.getFunctionRegistry().syncWithRemoteRegistry(
-              context.getDrillOperatorTable().getFunctionRegistryVersion())
-        && context.getSQLStatementType() != SqlStatementType.ANALYZE) {
+
+      int funcRegVer = context.getDrillOperatorTable().getFunctionRegistryVersion();
+      // We do not retry conversion if the error is a UserException of type RESOURCE
+      boolean isResourceErr = e instanceof UserException && ((UserException) e).getErrorType() == RESOURCE;

Review Comment:
   I think this could break the initial intention of retrying the query after updating the function registry. It is possible that some part of the code wraps exceptions caused by absent functions as `UserException` with `RESOURCE` type.
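   The concern can be illustrated with a self-contained sketch (the classes below are hypothetical stand-ins for illustration, not Drill's real `UserException`): when retry eligibility is decided by the wrapper's type alone, a retryable root cause is hidden once some layer rewraps it, whereas walking the cause chain still finds it.

   ```java
   // Hypothetical stand-ins for illustration only; Drill's real classes differ.
   public class WrappedRetryCheck {

     // Mimics an exception carrying an error-type tag, like a RESOURCE error.
     static class TaggedException extends RuntimeException {
       final String errorType;

       TaggedException(String errorType, Throwable cause) {
         super(cause);
         this.errorType = errorType;
       }
     }

     /** Mimics the diff: skip retry whenever the wrapper is tagged RESOURCE. */
     static boolean retryByWrapperType(Exception e) {
       return !(e instanceof TaggedException
           && ((TaggedException) e).errorType.equals("RESOURCE"));
     }

     /** Alternative: also walk the cause chain for a retryable condition. */
     static boolean retryByCause(Exception e, Class<? extends Throwable> retryable) {
       for (Throwable t = e; t != null; t = t.getCause()) {
         if (retryable.isInstance(t)) {
           return true;
         }
       }
       return false;
     }
   }
   ```

   With a "missing function" cause wrapped in a RESOURCE-tagged exception, the first check skips the retry while the second still allows it, which is exactly the failure mode the reviewer describes.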



##########
exec/java-exec/src/main/java/org/apache/calcite/jdbc/DynamicRootSchema.java:
##########
@@ -122,7 +169,8 @@ private void loadSchemaFactory(String schemaName, boolean caseSensitive) {
         SchemaPlus firstLevelSchema = schemaPlus.getSubSchema(paths.get(0));
         if (firstLevelSchema == null) {
           // register schema for this storage plugin to 'this'.
-          plugin.registerSchemas(schemaConfig, schemaPlus);
+          attemptToRegisterSchemas(plugin);
+          //plugin.registerSchemas(schemaConfig, schemaPlus);

Review Comment:
   And here also.



##########
exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java:
##########
@@ -34,12 +34,22 @@ public interface StoragePluginRegistry extends Iterable<Map.Entry<String, Storag
 
   @SuppressWarnings("serial")
   public static class PluginException extends Exception {
+
+    public final StoragePlugin plugin;

Review Comment:
   Is there any particular reason for having a plugin in the exception class?



##########
exec/java-exec/src/main/java/org/apache/calcite/jdbc/DynamicRootSchema.java:
##########
@@ -91,22 +91,69 @@ private CalciteSchema getSchema(String schemaName, boolean caseSensitive) {
 
   public SchemaPath resolveTableAlias(String alias) {
     return Optional.ofNullable(aliasRegistryProvider.getTableAliasesRegistry()
-        .getUserAliases(schemaConfig.getUserName()).get(alias))
+      .getUserAliases(schemaConfig.getUserName()).get(alias))
       .map(SchemaPath::parseFromString)
       .orElse(null);
   }
 
+  private void attemptToRegisterSchemas(StoragePlugin plugin) throws Exception {
+    long maxAttempts = schemaConfig
+      .getOption(ExecConstants.STORAGE_PLUGIN_ACCESS_ATTEMPTS)
+      .num_val;
+    long attemptDelayMs = schemaConfig
+      .getOption(ExecConstants.STORAGE_PLUGIN_ATTEMPT_DELAY)
+      .num_val;
+    int attempt = 0;
+    Exception lastAttemptEx = null;
+
+    while (attempt++ < maxAttempts) {
+      try {
+        plugin.registerSchemas(schemaConfig, plus());
+        return;
+      } catch (Exception ex) {
+        lastAttemptEx = ex;
+        logger.warn(
+          "Attempt {} of {} to register schemas for plugin {} failed.",
+          attempt, maxAttempts, plugin,
+          ex
+        );
+
+        if (attempt < maxAttempts) {
+          logger.info(
+            "Next attempt to register schemas for plugin {} will be made in {}ms.",
+            plugin,
+            attemptDelayMs
+          );
+          try {
+            Thread.sleep(attemptDelayMs);
+          } catch (InterruptedException intEx) {
+            logger.warn(
+              "Interrupted while waiting to make another attempt to register " +
+              "schemas for plugin {}.",
+              plugin,
+              intEx
+            );
+          }
+        }
+      }
+    }
+
+    throw lastAttemptEx;
+  }
+
   /**
    * Loads schema factory(storage plugin) for specified {@code schemaName}
    * @param schemaName the name of the schema
   * @param caseSensitive whether matching for the schema name is case sensitive
    */
   private void loadSchemaFactory(String schemaName, boolean caseSensitive) {
+    StoragePlugin plugin = null;
     try {
       SchemaPlus schemaPlus = this.plus();
-      StoragePlugin plugin = storages.getPlugin(schemaName);
+      plugin = storages.getPlugin(schemaName);
       if (plugin != null) {
-        plugin.registerSchemas(schemaConfig, schemaPlus);
+        attemptToRegisterSchemas(plugin);
+        // plugin.registerSchemas(schemaConfig, schemaPlus);

Review Comment:
   Please delete the commented line.



##########
exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerConfig.java:
##########
@@ -52,12 +59,81 @@ public QueryContext getContext() {
     return context;
   }
 
+  private RuleSet attemptToGetRules(PlannerPhase phase, Collection<StoragePlugin> plugins) throws PluginException {

Review Comment:
   I don't think that we should add retry logic here. Plugins will be checked 
during previous stages, at least during query validation.



##########
exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java:
##########
@@ -1094,6 +1095,35 @@ public static String bootDefaultFor(String name) {
       new OptionDescription("Enables recursive files listing when querying the 
`INFORMATION_SCHEMA.FILES` table or executing the SHOW FILES command. " +
         "Default is false. (Drill 1.15+)"));
 
+  public static final String STORAGE_PLUGIN_ACCESS_ATTEMPTS = "storage.plugin_access_attempts";
+  public static final PositiveLongValidator STORAGE_PLUGIN_ACCESS_ATTEMPTS_VALIDATOR = new PositiveLongValidator(
+    STORAGE_PLUGIN_ACCESS_ATTEMPTS,
+    10,
+    new OptionDescription(
+      "The maximum number of attempts that will be made to request metadata " +
+      "needed for query planning from a storage plugin."
+    )
+  );
+  public static final String STORAGE_PLUGIN_ATTEMPT_DELAY = "storage.plugin_access_attempt_delay";
+  public static final NonNegativeLongValidator STORAGE_PLUGIN_ATTEMPT_DELAY_VALIDATOR = new NonNegativeLongValidator(
+    STORAGE_PLUGIN_ATTEMPT_DELAY,
+    5*60*1000,

Review Comment:
   5 minutes is too long a delay. Please set it to 3 or 5 seconds, since the 
user will be waiting for its execution. Also please format the code properly.
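   For reference, a quick check of the magnitudes involved (plain JDK code, not Drill's validator; the constant names are made up for illustration): `5*60*1000` is five minutes in milliseconds, while the suggested three-to-five-second default is three orders of magnitude smaller.

   ```java
   import java.util.concurrent.TimeUnit;

   public class DelayDefaults {
     // Current default in the diff: five minutes, expressed in milliseconds.
     static final long CURRENT_DEFAULT_MS = 5 * 60 * 1000;

     // Suggested default: five seconds, written so the unit is obvious.
     static final long SUGGESTED_DEFAULT_MS = TimeUnit.SECONDS.toMillis(5);
   }
   ```

   Writing the default via `TimeUnit.SECONDS.toMillis(...)` also sidesteps the formatting complaint, since the unit conversion is explicit rather than an unspaced arithmetic expression.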



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@drill.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
