vvysotskyi commented on code in PR #2655: URL: https://github.com/apache/drill/pull/2655#discussion_r979578252
########## exec/java-exec/src/main/java/org/apache/calcite/jdbc/DynamicRootSchema.java: ########## @@ -91,22 +91,69 @@ private CalciteSchema getSchema(String schemaName, boolean caseSensitive) { public SchemaPath resolveTableAlias(String alias) { return Optional.ofNullable(aliasRegistryProvider.getTableAliasesRegistry() - .getUserAliases(schemaConfig.getUserName()).get(alias)) + .getUserAliases(schemaConfig.getUserName()).get(alias)) .map(SchemaPath::parseFromString) .orElse(null); } + private void attemptToRegisterSchemas(StoragePlugin plugin) throws Exception { Review Comment: Please rename the method to something like register with retry. ########## exec/java-exec/src/main/java/org/apache/calcite/jdbc/DynamicRootSchema.java: ########## @@ -91,22 +91,69 @@ private CalciteSchema getSchema(String schemaName, boolean caseSensitive) { public SchemaPath resolveTableAlias(String alias) { return Optional.ofNullable(aliasRegistryProvider.getTableAliasesRegistry() - .getUserAliases(schemaConfig.getUserName()).get(alias)) + .getUserAliases(schemaConfig.getUserName()).get(alias)) .map(SchemaPath::parseFromString) .orElse(null); } + private void attemptToRegisterSchemas(StoragePlugin plugin) throws Exception { + long maxAttempts = schemaConfig + .getOption(ExecConstants.STORAGE_PLUGIN_ACCESS_ATTEMPTS) + .num_val; + long attemptDelayMs = schemaConfig + .getOption(ExecConstants.STORAGE_PLUGIN_ATTEMPT_DELAY) + .num_val; + int attempt=0; + Exception lastAttemptEx = null; + + while (attempt++ < maxAttempts) { Review Comment: Could we either move retry-related logic to a utility method or use some library that does that, for example, https://github.com/failsafe-lib/failsafe? ########## exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java: ########## @@ -1094,6 +1095,35 @@ public static String bootDefaultFor(String name) { new OptionDescription("Enables recursive files listing when querying the `INFORMATION_SCHEMA.FILES` table or executing the SHOW FILES command. 
" + "Default is false. (Drill 1.15+)")); + public static final String STORAGE_PLUGIN_ACCESS_ATTEMPTS = "storage.plugin_access_attempts"; + public static final PositiveLongValidator STORAGE_PLUGIN_ACCESS_ATTEMPTS_VALIDATOR = new PositiveLongValidator( + STORAGE_PLUGIN_ACCESS_ATTEMPTS, + 10, + new OptionDescription( + "The maximum number of attempts that will be made to request metadata " + + "needed for query planning from a storage plugin." + ) + ); + public static final String STORAGE_PLUGIN_ATTEMPT_DELAY = "storage.plugin_access_attempt_delay"; + public static final NonNegativeLongValidator STORAGE_PLUGIN_ATTEMPT_DELAY_VALIDATOR = new NonNegativeLongValidator( + STORAGE_PLUGIN_ATTEMPT_DELAY, + 5*60*1000, + new OptionDescription( + "The delay in milliseconds between repeated attempts to request metadata " + Review Comment: Please mention here and in the option above that they will be used only when `storage.plugin_auto_disable` is set to true. ########## exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java: ########## @@ -130,11 +132,18 @@ private static PhysicalPlan convertPlan(QueryContext context, String sql, Pointe logger.trace("There was an error during conversion into physical plan. 
" + "Will sync remote and local function registries if needed and retry " + "in case if issue was due to missing function implementation.", e); - // it is prohibited to retry query planning for ANALYZE statement since it changes - // query-level option values and will fail when rerunning with updated values - if (context.getFunctionRegistry().syncWithRemoteRegistry( - context.getDrillOperatorTable().getFunctionRegistryVersion()) - && context.getSQLStatementType() != SqlStatementType.ANALYZE) { + + int funcRegVer = context.getDrillOperatorTable().getFunctionRegistryVersion(); + // We do not retry conversion if the error is a UserException of type RESOURCE + boolean isResourceErr = e instanceof UserException && ((UserException) e).getErrorType() == RESOURCE; Review Comment: I think this could break the original intention of retrying the query after updating the function registry. It is possible that some part of the code wraps exceptions caused by absent functions as `UserException` with `RESOURCE` type. ########## exec/java-exec/src/main/java/org/apache/calcite/jdbc/DynamicRootSchema.java: ########## @@ -122,7 +169,8 @@ private void loadSchemaFactory(String schemaName, boolean caseSensitive) { SchemaPlus firstLevelSchema = schemaPlus.getSubSchema(paths.get(0)); if (firstLevelSchema == null) { // register schema for this storage plugin to 'this'. - plugin.registerSchemas(schemaConfig, schemaPlus); + attemptToRegisterSchemas(plugin); + //plugin.registerSchemas(schemaConfig, schemaPlus); Review Comment: And here also. ########## exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java: ########## @@ -34,12 +34,22 @@ public interface StoragePluginRegistry extends Iterable<Map.Entry<String, Storag @SuppressWarnings("serial") public static class PluginException extends Exception { + + public final StoragePlugin plugin; Review Comment: Is there any particular reason for having a plugin in the exception class?
########## exec/java-exec/src/main/java/org/apache/calcite/jdbc/DynamicRootSchema.java: ########## @@ -91,22 +91,69 @@ private CalciteSchema getSchema(String schemaName, boolean caseSensitive) { public SchemaPath resolveTableAlias(String alias) { return Optional.ofNullable(aliasRegistryProvider.getTableAliasesRegistry() - .getUserAliases(schemaConfig.getUserName()).get(alias)) + .getUserAliases(schemaConfig.getUserName()).get(alias)) .map(SchemaPath::parseFromString) .orElse(null); } + private void attemptToRegisterSchemas(StoragePlugin plugin) throws Exception { + long maxAttempts = schemaConfig + .getOption(ExecConstants.STORAGE_PLUGIN_ACCESS_ATTEMPTS) + .num_val; + long attemptDelayMs = schemaConfig + .getOption(ExecConstants.STORAGE_PLUGIN_ATTEMPT_DELAY) + .num_val; + int attempt=0; + Exception lastAttemptEx = null; + + while (attempt++ < maxAttempts) { + try { + plugin.registerSchemas(schemaConfig, plus()); + return; + } catch (Exception ex) { + lastAttemptEx = ex; + logger.warn( + "Attempt {} of {} to register schemas for plugin {} failed.", + attempt, maxAttempts, plugin, + ex + ); + + if (attempt < maxAttempts) { + logger.info( + "Next attempt to register schemas for plugin {} will be made in {}ms.", + plugin, + attemptDelayMs + ); + try { + Thread.sleep(attemptDelayMs); + } catch (InterruptedException intEx) { + logger.warn( + "Interrupted while waiting to make another attempt to register " + + "chemas for plugin {}.", + plugin, + intEx + ); + } + } + } + } + + throw lastAttemptEx; + } + /** * Loads schema factory(storage plugin) for specified {@code schemaName} * @param schemaName the name of the schema * @param caseSensitive whether matching for the schema name is case sensitive */ private void loadSchemaFactory(String schemaName, boolean caseSensitive) { + StoragePlugin plugin = null; try { SchemaPlus schemaPlus = this.plus(); - StoragePlugin plugin = storages.getPlugin(schemaName); + plugin = storages.getPlugin(schemaName); if (plugin != null) { - 
plugin.registerSchemas(schemaConfig, schemaPlus); + attemptToRegisterSchemas(plugin); + // plugin.registerSchemas(schemaConfig, schemaPlus); Review Comment: Please delete the commented line. ########## exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerConfig.java: ########## @@ -52,12 +59,81 @@ public QueryContext getContext() { return context; } + private RuleSet attemptToGetRules(PlannerPhase phase, Collection<StoragePlugin> plugins) throws PluginException{ Review Comment: I don't think that we should add retry logic here. Plugins will be checked during previous stages, at least during query validation. ########## exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java: ########## @@ -1094,6 +1095,35 @@ public static String bootDefaultFor(String name) { new OptionDescription("Enables recursive files listing when querying the `INFORMATION_SCHEMA.FILES` table or executing the SHOW FILES command. " + "Default is false. (Drill 1.15+)")); + public static final String STORAGE_PLUGIN_ACCESS_ATTEMPTS = "storage.plugin_access_attempts"; + public static final PositiveLongValidator STORAGE_PLUGIN_ACCESS_ATTEMPTS_VALIDATOR = new PositiveLongValidator( + STORAGE_PLUGIN_ACCESS_ATTEMPTS, + 10, + new OptionDescription( + "The maximum number of attempts that will be made to request metadata " + + "needed for query planning from a storage plugin." + ) + ); + public static final String STORAGE_PLUGIN_ATTEMPT_DELAY = "storage.plugin_access_attempt_delay"; + public static final NonNegativeLongValidator STORAGE_PLUGIN_ATTEMPT_DELAY_VALIDATOR = new NonNegativeLongValidator( + STORAGE_PLUGIN_ATTEMPT_DELAY, + 5*60*1000, Review Comment: 5 minutes is too long a delay. Please set it to 3 or 5 seconds, since the user will be waiting for its execution. Also please format the code properly. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@drill.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org