arina-ielchiieva commented on a change in pull request #1988: DRILL-7590:
Refactor plugin registry
URL: https://github.com/apache/drill/pull/1988#discussion_r384437532
##########
File path:
exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
##########
@@ -36,608 +30,616 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import org.apache.drill.exec.store.dfs.FileSystemConfig;
-import
org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
+import javax.validation.constraints.NotNull;
+
import org.apache.calcite.schema.SchemaPlus;
-import org.apache.drill.common.config.LogicalPlanPersistence;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.collections.ImmutableEntry;
import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.common.map.CaseInsensitiveMap;
-import org.apache.drill.common.scanner.ClassPathScanner;
-import org.apache.drill.common.scanner.persistence.AnnotatedClassDescriptor;
-import org.apache.drill.common.scanner.persistence.ScanResult;
-import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.exception.StoreException;
import org.apache.drill.exec.planner.logical.StoragePlugins;
import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.PluginHandle.PluginType;
import org.apache.drill.exec.store.dfs.FormatPlugin;
-import org.apache.drill.exec.store.sys.CaseInsensitivePersistentStore;
-import org.apache.drill.exec.store.sys.PersistentStore;
-import org.apache.drill.exec.store.sys.PersistentStoreConfig;
-
-import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
-import org.apache.drill.shaded.guava.com.google.common.base.Joiner;
-import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
import org.apache.drill.shaded.guava.com.google.common.cache.CacheBuilder;
import org.apache.drill.shaded.guava.com.google.common.cache.CacheLoader;
import org.apache.drill.shaded.guava.com.google.common.cache.LoadingCache;
import org.apache.drill.shaded.guava.com.google.common.cache.RemovalListener;
-import org.apache.drill.shaded.guava.com.google.common.io.Resources;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Plugin registry. Caches plugin instances which correspond to configurations
+ * stored in persistent storage. Synchronizes the instances and storage.
+ * <p>
+ * Allows multiple "locators" to provide plugin classes such as the "classic"
+ * version for classes in the same class loader, the "system" version for
+ * system-defined plugins.
+ * <p>
+ * The registry provides multiple layers of abstraction:
+ * <ul>
+ * <li>A plugin config/implementation pair (called a "connector" here)
+ * is located by</li>
+ * <li>A connector locator, which also provides bootstrap plugins and can
+ * create a plugin instance from a configuration, which are cached in</li>
+ * <li>The plugin cache, which holds stored, system and ad-hoc plugins. The
+ * stored plugins are backed by</li>
+ * <li>A persistent store: the file system for tests and embedded, ZK for
+ * a distributed server, or</li>
+ * <li>An ephemeral cache for unnamed configs, such as those created by
+ * a table function.</li>
+ * </ul>
+ * <p>
+ * The idea is to push most functionality into the above abstractions,
+ * leaving overall coordination here.
+ * <p>
+ * Plugins themselves have multiple levels of definitions:
+ * <ul>
+ * <li>The config and plugin classes, provided by the locator.</li>
+ * <li>The {@link ConnectorHandle} which defines the config class and
+ * the locator which can create instances of that class.</li>
+ * <li>A config instance which is typically deserialized from JSON
+ * independent of the implementation class.</li>
+ * <li>A {@link PluginHandle} which pairs the config with a name as
+ * the unit that the user thinks of as a "plugin." The plugin handle
+ * links to the {@link ConnectorHandle} to create the instance lazily
+ * when first requested.</li>
+ * <li>The plugin class instance, which provides long-term state and
+ * which provides the logic for the plugin.</li>
+ * </ul>
+ *
+ * <h4>Concurrency</h4>
+ *
+ * Drill is a concurrent system; multiple users can attempt to add, remove
+ * and update plugin configurations at the same time. The only good
+ * solution would be to version the plugin configs. Instead, we rely on
+ * the fact that configs change infrequently.
+ * <p>
+ * The code syncs the in-memory cache with the persistent store on each
+ * access (which is inefficient and should be reviewed).
+ * <p>
+ * During refresh, it could be that another thread is doing exactly
+ * the same thing, or even fighting us by changing the config. It is
+ * impossible to ensure a totally consistent answer. The goal is to
+ * make sure that the cache ends up agreeing with the persistent store
+ * as it was at some point in time.
+ * <p>
+ * The {@link StoragePluginMap} class provides in-memory synchronization of the
+ * name and config maps. Careful coding is needed when handling refresh
+ * since another thread could make the same changes.
+ * <p>
+ * Once the planner obtains a plugin, another user could come along and
+ * change the config for that plugin. Drill treats that change as another
+ * plugin: the original one continues to be used by the planner (but see
+ * below), while new queries use the new version.
+ * <p>
+ * Since the config on remote servers may have changed relative to the one
+ * this Foreman used for planning, the plan includes the plugin config
+ * itself (not just a reference to the config.) This works because the
+ * config is usually small.
+ *
+ * <h4>Ephemeral Plugins</h4>
+ *
+ * An ephemeral plugin handles table functions which create a temporary,
+ * unnamed configuration that is needed only for the execution of a
+ * single query, but which may be used across many threads. If the same
+ * table function is used multiple times, then the same ephemeral plugin
+ * will be used across queries. Ephemeral plugins are based on the
+ * same connectors as stored plugins, but are not visible to the planner.
+ * They expire after a period without access, and are evicted once the
+ * number of cached ephemeral plugins reaches a limit.
+ * <p>
+ * The ephemeral store also acts as a graveyard for deleted or changed
+ * plugins. When removing a plugin, the old plugin is moved to ephemeral
+ * storage to allow running queries to locate it. Similarly, when a
+ * new configuration is stored, the corresponding plugin is retrieved
+ * from ephemeral storage, if it exists. This avoids odd cases where
+ * the same plugin exists in both normal and ephemeral storage.
+ *
+ * <h4>Caveats</h4>
+ *
+ * The main problem with synchronization at present is that plugins
+ * provide a {@code close()} method that, if used, could render the
+ * plugin unusable. Suppose a Cassandra plugin, say, maintains a connection
+ * to a server used across multiple queries and threads. Any change to
+ * the config immediately calls {@code close()} on the plugin, even though
+ * it may be in use in planning a query on another thread. Random failures
+ * will result.
+ * <p>
+ * The same issue can affect ephemeral plugins: if the number in the cache
+ * reaches the limit, the registry will start closing old ones, without
+ * knowing if that plugin is actually in use.
+ * <p>
+ * The workaround is to not actually honor the {@code close()} call. Longer
+ * term, a reference count is needed.
+ *
+ * <h4>Error Handling</h4>
+ *
+ * Error handling needs review. Those problems that result from user actions
+ * should be raised as a {@code UserException}. Those that violate invariants
+ * should be raised as other forms of exception.
+ */
public class StoragePluginRegistryImpl implements StoragePluginRegistry {
+ private static final Logger logger =
LoggerFactory.getLogger(StoragePluginRegistryImpl.class);
- private static final org.slf4j.Logger logger =
org.slf4j.LoggerFactory.getLogger(StoragePluginRegistryImpl.class);
+ private final PluginRegistryContext context;
- private final StoragePluginMap enabledPlugins;
+ /**
+ * Cache of enabled, stored plugins, as well as system and ad-hoc
+ * plugins. Plugins live in the cache until the Drillbit exits or,
+ * except for system plugins, until they are explicitly removed.
+ */
+ private final StoragePluginMap pluginCache;
private final DrillSchemaFactory schemaFactory;
- private final DrillbitContext context;
- private final LogicalPlanPersistence lpPersistence;
- private final ScanResult classpathScan;
- private final PersistentStore<StoragePluginConfig> pluginSystemTable;
- private final LoadingCache<StoragePluginConfig, StoragePlugin>
ephemeralPlugins;
+ private final StoragePluginStore pluginStore;
- private Map<Object, Constructor<? extends StoragePlugin>> availablePlugins =
Collections.emptyMap();
- private Map<String, StoragePlugin> systemPlugins = Collections.emptyMap();
+ /**
+ * Cache of unnamed plugins typically resulting from table functions.
+ * Ephemeral plugins time out after a period without access, and the
+ * cache evicts old entries once it reaches its maximum size.
+ */
+ private final LoadingCache<StoragePluginConfig, PluginHandle>
ephemeralPlugins;
+
+ /**
+ * Set of locators which provide connector implementations.
+ */
+ private final List<ConnectorLocator> locators = new ArrayList<>();
+
+ /**
+ * Map of config class (as deserialized from the persistent store or UI)
+ * to the connector which can instantiate a plugin for that config.
+ */
+ private final Map<Class<? extends StoragePluginConfig>, ConnectorHandle>
connectors =
+ new IdentityHashMap<>();
public StoragePluginRegistryImpl(DrillbitContext context) {
- this.enabledPlugins = new StoragePluginMap();
- this.schemaFactory = new DrillSchemaFactory(null);
- this.context = checkNotNull(context);
- this.lpPersistence = checkNotNull(context.getLpPersistence());
- this.classpathScan = checkNotNull(context.getClasspathScan());
- this.pluginSystemTable = initPluginsSystemTable(context, lpPersistence);
+ this.context = new DrillbitPluginRegistryContext(context);
+ this.pluginCache = new StoragePluginMap();
+ this.schemaFactory = new DrillSchemaFactory(null, this);
+ locators.add(new ClassicConnectorLocator(this.context));
+ locators.add(new SystemPluginLocator(this.context));
+ this.pluginStore = new StoragePluginStoreImpl(context);
this.ephemeralPlugins = CacheBuilder.newBuilder()
.expireAfterAccess(24, TimeUnit.HOURS)
.maximumSize(250)
.removalListener(
- (RemovalListener<StoragePluginConfig, StoragePlugin>) notification
-> closePlugin(notification.getValue()))
- .build(new CacheLoader<StoragePluginConfig, StoragePlugin>() {
+ (RemovalListener<StoragePluginConfig, PluginHandle>) notification
-> notification.getValue().close())
+ .build(new CacheLoader<StoragePluginConfig, PluginHandle>() {
@Override
- public StoragePlugin load(StoragePluginConfig config) throws
Exception {
- return create(null, config);
+ public PluginHandle load(StoragePluginConfig config) throws
Exception {
+ return createPluginEntry("$$ephemeral$$", config,
PluginType.EPHEMERAL);
}
});
}
@Override
public void init() {
- availablePlugins = findAvailablePlugins(classpathScan);
- systemPlugins = initSystemPlugins(classpathScan, context);
- try {
- StoragePlugins bootstrapPlugins = pluginSystemTable.getAll().hasNext() ?
null : loadBootstrapPlugins(lpPersistence);
-
- StoragePluginsHandler storagePluginsHandler = new
StoragePluginsHandlerService(context);
- storagePluginsHandler.loadPlugins(pluginSystemTable, bootstrapPlugins);
-
- defineEnabledPlugins();
- } catch (IOException e) {
- logger.error("Failure setting up storage enabledPlugins. Drillbit
exiting.", e);
- throw new IllegalStateException(e);
- }
- }
-
- @Override
- public void deletePlugin(String name) {
- StoragePlugin plugin = enabledPlugins.remove(name);
- closePlugin(plugin);
- pluginSystemTable.delete(name);
+ locators.stream().forEach(loc -> loc.init());
+ loadIntrinsicPlugins();
+ defineConnectors();
+ prepareStore();
}
- @Override
- public StoragePlugin createOrUpdate(String name, StoragePluginConfig config,
boolean persist) throws ExecutionSetupException {
- for (;;) {
- StoragePlugin oldPlugin = enabledPlugins.get(name);
- StoragePlugin newPlugin = create(name, config);
- boolean done = false;
- try {
- if (oldPlugin != null) {
- done = newPlugin == null
- ? enabledPlugins.remove(name, oldPlugin)
- : enabledPlugins.replace(name, oldPlugin, newPlugin);
- } else if (newPlugin != null) {
- done = (null == enabledPlugins.putIfAbsent(name, newPlugin));
- } else {
- done = true;
- }
- } finally {
- StoragePlugin pluginToClose = done ? oldPlugin : newPlugin;
- closePlugin(pluginToClose);
+ private void loadIntrinsicPlugins() {
+ for (ConnectorLocator locator : locators) {
+ Collection<StoragePlugin> intrinsicPlugins = locator.intrinsicPlugins();
+ if (intrinsicPlugins == null) {
+ continue;
}
-
- if (done) {
- if (persist) {
- pluginSystemTable.put(name, config);
- }
-
- return newPlugin;
+ for (StoragePlugin sysPlugin : intrinsicPlugins) {
+ ConnectorHandle connector = new ConnectorHandle(locator, sysPlugin);
+ defineConnector(connector);
+ pluginCache.put(new PluginHandle(sysPlugin, connector,
PluginType.INTRINSIC));
}
}
}
- @Override
- public StoragePlugin getPlugin(String name) throws ExecutionSetupException {
- StoragePlugin plugin = enabledPlugins.get(name);
- if (systemPlugins.get(name) != null) {
- return plugin;
+ private void defineConnector(ConnectorHandle connector) {
+ ConnectorHandle prev = connectors.put(connector.configClass(), connector);
+ if (prev != null) {
+ String msg = String.format("Two connectors defined for the same config:
" +
+ "%s -> %s and %s -> %s",
+ connector.configClass().getName(),
connector.locator().getClass().getName(),
+ prev.configClass().getName(), prev.locator().getClass().getName());
+ logger.error(msg);
+ throw new IllegalStateException(msg);
}
+ }
- // since we lazily manage the list of plugins per server, we need to
update this once we know that it is time.
- StoragePluginConfig config = pluginSystemTable.get(name);
- if (config == null) {
- if (plugin != null) {
- enabledPlugins.remove(name);
+ private void defineConnectors() {
+ for (ConnectorLocator locator : locators) {
+ Set<Class<? extends StoragePluginConfig>> nonIntrinsicConfigs =
locator.configClasses();
+ if (nonIntrinsicConfigs == null) {
+ continue;
}
- return null;
- } else {
- if (plugin == null
- || !plugin.getConfig().equals(config)
- || plugin.getConfig().isEnabled() != config.isEnabled()) {
- plugin = createOrUpdate(name, config, false);
+ for (Class<? extends StoragePluginConfig> configClass :
nonIntrinsicConfigs) {
+ defineConnector(new ConnectorHandle(locator, configClass));
}
- return plugin;
}
}
- @Override
- public StoragePlugin getPlugin(StoragePluginConfig config) throws
ExecutionSetupException {
- if (config instanceof NamedStoragePluginConfig) {
- return getPlugin(((NamedStoragePluginConfig) config).getName());
+ private void prepareStore() {
+ if (pluginStore.isInitialized()) {
+ upgradeStore();
} else {
- // try to lookup plugin by configuration
- StoragePlugin plugin = enabledPlugins.get(config);
- if (plugin != null) {
- return plugin;
- }
-
- // no named plugin matches the desired configuration, let's create an
- // ephemeral storage plugin (or get one from the cache)
- try {
- return ephemeralPlugins.get(config);
- } catch (ExecutionException e) {
- Throwable cause = e.getCause();
- if (cause instanceof ExecutionSetupException) {
- throw (ExecutionSetupException) cause;
- } else {
- // this shouldn't happen. here for completeness.
- throw new ExecutionSetupException("Failure while trying to create
ephemeral plugin.", cause);
- }
- }
+ initStore();
}
+ loadEnabledPlugins();
}
- @Override
- public void addEnabledPlugin(String name, StoragePlugin plugin) {
- enabledPlugins.put(name, plugin);
- }
-
- @Override
- public FormatPlugin getFormatPlugin(StoragePluginConfig storageConfig,
FormatPluginConfig formatConfig) throws ExecutionSetupException {
- StoragePlugin storagePlugin = getPlugin(storageConfig);
- return storagePlugin.getFormatPlugin(formatConfig);
- }
-
- @Override
- public PersistentStore<StoragePluginConfig> getStore() {
- return pluginSystemTable;
- }
-
- @Override
- public SchemaFactory getSchemaFactory() {
- return schemaFactory;
- }
-
- @Override
- public Iterator<Entry<String, StoragePlugin>> iterator() {
- return enabledPlugins.iterator();
- }
-
- @Override
- public synchronized void close() throws Exception {
- ephemeralPlugins.invalidateAll();
- enabledPlugins.close();
- pluginSystemTable.close();
- }
-
- /**
- * Add a plugin and configuration. Assumes neither exists. Primarily for
testing.
- *
- * @param config plugin config
- * @param plugin plugin implementation
- */
- @VisibleForTesting
- public void addPluginToPersistentStoreIfAbsent(String name,
StoragePluginConfig config, StoragePlugin plugin) {
- addEnabledPlugin(name, plugin);
- pluginSystemTable.putIfAbsent(name, config);
- }
-
- /**
- * <ol>
- * <li>Initializes persistent store for storage plugins.</li>
- * <li>Since storage plugins names are case-insensitive in Drill, to
ensure backward compatibility,
- * re-writes those not stored in lower case with lower case names, for
duplicates issues warning. </li>
- * <li>Wraps plugin system table into case insensitive wrapper.</li>
- * </ol>
- *
- * @param context drillbit context
- * @param lpPersistence deserialization mapper provider
- * @return persistent store for storage plugins
- */
- private PersistentStore<StoragePluginConfig>
initPluginsSystemTable(DrillbitContext context, LogicalPlanPersistence
lpPersistence) {
-
+ private void initStore() {
+ StoragePlugins bootstrapPlugins = new StoragePlugins();
try {
- PersistentStore<StoragePluginConfig> pluginSystemTable = context
- .getStoreProvider()
- .getOrCreateStore(PersistentStoreConfig
- .newJacksonBuilder(lpPersistence.getMapper(),
StoragePluginConfig.class)
- .name(PSTORE_NAME)
- .build());
-
- Iterator<Entry<String, StoragePluginConfig>> storedPlugins =
pluginSystemTable.getAll();
- while (storedPlugins.hasNext()) {
- Entry<String, StoragePluginConfig> entry = storedPlugins.next();
- String pluginName = entry.getKey();
- if (!pluginName.equals(pluginName.toLowerCase())) {
- logger.debug("Replacing plugin name {} with its lower case
equivalent.", pluginName);
- pluginSystemTable.delete(pluginName);
- if (!pluginSystemTable.putIfAbsent(pluginName.toLowerCase(),
entry.getValue())) {
- logger.warn("Duplicated storage plugin name [{}] is found.
Duplicate is deleted from persistent storage.", pluginName);
- }
+ for (ConnectorLocator locator : locators) {
+ StoragePlugins locatorPlugins = locator.bootstrapPlugins();
+ if (locatorPlugins != null) {
+ bootstrapPlugins.putAll(locatorPlugins);
}
}
-
- return new CaseInsensitivePersistentStore<>(pluginSystemTable);
- } catch (StoreException e) {
- logger.error("Failure while loading storage plugin registry.", e);
- throw new DrillRuntimeException("Failure while reading and loading
storage plugin configuration.", e);
+ } catch (IOException e) {
+ String msg = "Failure initializing the plugin store. Drillbit exiting.";
+ logger.error(msg, e);
Review comment:
Same here.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services