Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/666#discussion_r91227026 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java --- @@ -207,25 +239,165 @@ public SchemaPlus getDefaultSchema(SchemaPlus rootSchema) { return null; } - final SchemaPlus defaultSchema = SchemaUtilites.findSchema(rootSchema, defaultSchemaPath); - - if (defaultSchema == null) { - // If the current schema resolves to null, return root schema as the current default schema. - return defaultSchema; - } - - return defaultSchema; + return SchemaUtilites.findSchema(rootSchema, defaultSchemaPath); } public boolean setSessionOption(String name, String value) { return true; } + /** + * @return unique session identifier + */ + public String getUuid() { return uuid; } + + /** + * Adds temporary table to temporary tables cache. + * + * @param schema table schema + * @param tableName original table name + * @return generated temporary table name + */ + public String registerTemporaryTable(AbstractSchema schema, String tableName) { + return temporaryTablesCache.add(schema, tableName); + } + + /** + * Looks for temporary table in temporary tables cache by its name in specified schema. + * + * @param fullSchemaName table full schema name (example, dfs.tmp) + * @param tableName original table name + * @return temporary table name if found, null otherwise + */ + public String findTemporaryTable(String fullSchemaName, String tableName) { + return temporaryTablesCache.find(fullSchemaName, tableName); + } + + /** + * Before removing temporary table from temporary tables cache, + * checks if table exists physically on disk, if yes, removes it. 
+ * + * @param fullSchemaName full table schema name (example, dfs.tmp) + * @param tableName original table name + * @return true if table was physically removed, false otherwise + */ + public boolean removeTemporaryTable(String fullSchemaName, String tableName) { + final AtomicBoolean result = new AtomicBoolean(); + temporaryTablesCache.remove(fullSchemaName, tableName, new BiConsumer<AbstractSchema, String>() { + @Override + public void accept(AbstractSchema schema, String temporaryTableName) { + if (schema.getTable(temporaryTableName) != null) { + schema.dropTable(temporaryTableName); + result.set(true); + } + } + }); + return result.get(); + } + private String getProp(String key) { return properties.get(key) != null ? properties.get(key) : ""; } private void setProp(String key, String value) { properties.put(key, value); } + + /** + * Temporary tables cache stores data by full schema name (schema and workspace separated by dot + * (example: dfs.tmp)) as key, and map of generated temporary tables names + * and its schemas represented by {@link AbstractSchema} as values. + * Schemas represented by {@link AbstractSchema} are used to drop temporary tables. + * Generated temporary tables consists of original table name and unique session id. + * Cache is represented by {@link ConcurrentMap} so if is thread-safe and can be used + * in multi-threaded environment. + * + * Temporary tables cache is used to find temporary table by its name and schema, + * to drop all existing temporary tables on session close + * or remove temporary table from cache on user demand. + */ + public static class TemporaryTablesCache { + + private final String uuid; + private final ConcurrentMap<String, ConcurrentMap<String, AbstractSchema>> temporaryTables; + + public TemporaryTablesCache(String uuid) { + this.uuid = uuid; + this.temporaryTables = Maps.newConcurrentMap(); + } + + /** + * Generates temporary table name using its original table name and unique session identifier. 
+ * Caches generated table name and its schema in temporary table cache. + * + * @param schema table schema + * @param tableName original table name + * @return generated temporary table name + */ + public String add(AbstractSchema schema, String tableName) { + final String temporaryTableName = SqlHandlerUtil.generateTemporaryTableName(tableName, uuid); + final ConcurrentMap<String, AbstractSchema> newValues = Maps.newConcurrentMap(); + newValues.put(temporaryTableName, schema); + final ConcurrentMap<String, AbstractSchema> oldValues = temporaryTables.putIfAbsent(schema.getFullSchemaName(), newValues); + if (oldValues != null) { + oldValues.putAll(newValues); + } + return temporaryTableName; + } + + /** + * Looks for temporary table in temporary tables cache. + * + * @param fullSchemaName table schema name which is used as key in temporary tables cache + * @param tableName original table name + * @return generated temporary table name (original name with unique session id) if table is found, + * null otherwise + */ + public String find(String fullSchemaName, String tableName) { + final String temporaryTableName = SqlHandlerUtil.generateTemporaryTableName(tableName, uuid); + final ConcurrentMap<String, AbstractSchema> tables = temporaryTables.get(fullSchemaName); + if (tables == null || tables.get(temporaryTableName) == null) { + return null; + } + return temporaryTableName; + } + + /** + * Removes temporary table from temporary tables cache + * by temporary table original name and its schema. + * + * @param fullSchemaName table schema name which is used as key in temporary tables cache + * @param tableName original table name + * @param action action applied to temporary table and its schema before removing from cache + */ + public void remove(String fullSchemaName, String tableName, BiConsumer<AbstractSchema, String> action) { --- End diff -- Oh, clever -- now I see you added a Drill-specific version.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and would like it enabled, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---