This is an automated email from the ASF dual-hosted git repository. arina pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push: new d4a2aef DRILL-7168: Implement ALTER SCHEMA ADD / REMOVE commands d4a2aef is described below commit d4a2aefd530dae6bdb0a3f115bd8ef8643dbc364 Author: Arina Ielchiieva <arina.yelchiy...@gmail.com> AuthorDate: Thu Aug 29 16:15:43 2019 +0300 DRILL-7168: Implement ALTER SCHEMA ADD / REMOVE commands --- exec/java-exec/src/main/codegen/data/Parser.tdd | 9 +- .../src/main/codegen/includes/parserImpls.ftl | 121 +++++- .../exec/planner/sql/handlers/SchemaHandler.java | 182 ++++++++- .../exec/planner/sql/handlers/SqlHandlerUtil.java | 22 ++ .../sql/parser/CompoundIdentifierConverter.java | 2 + .../drill/exec/planner/sql/parser/SqlSchema.java | 228 +++++++++-- .../metadata/schema/InlineSchemaProvider.java | 4 +- .../record/metadata/schema/PathSchemaProvider.java | 7 +- .../record/metadata/schema/SchemaProvider.java | 8 +- .../record/metadata/schema/StorageProperties.java | 71 ++++ .../java/org/apache/drill/TestSchemaCommands.java | 415 ++++++++++++++++++++- .../exec/record/metadata/TestTupleSchema.java | 64 ++++ .../record/metadata/schema/TestSchemaProvider.java | 23 +- .../record/metadata/AbstractColumnMetadata.java | 1 + .../exec/record/metadata/AbstractPropertied.java | 5 + .../drill/exec/record/metadata/Propertied.java | 8 +- .../drill/exec/record/metadata/TupleSchema.java | 1 + 17 files changed, 1098 insertions(+), 73 deletions(-) diff --git a/exec/java-exec/src/main/codegen/data/Parser.tdd b/exec/java-exec/src/main/codegen/data/Parser.tdd index 2da27c3..3480754 100644 --- a/exec/java-exec/src/main/codegen/data/Parser.tdd +++ b/exec/java-exec/src/main/codegen/data/Parser.tdd @@ -43,7 +43,8 @@ "ESTIMATE", "STATISTICS", "SAMPLE", - "COLUMNS" + "COLUMNS", + "REMOVE" ] # List of methods for parsing custom SQL statements. 
@@ -61,7 +62,8 @@ "SqlDropFunction()", "SqlAnalyzeTable()", "DrillSqlSetOption(Span.of(), null)", - "DrillSqlResetOption(Span.of(), null)" + "DrillSqlResetOption(Span.of(), null)", + "SqlAlterSchema()" ] # List of methods for parsing custom literals. @@ -859,7 +861,8 @@ "YEAR", # "YEARS", # not a keyword in Calcite "ZONE", - "COLUMNS" + "COLUMNS", + "REMOVE" ] # List of additional join types. Each is a method with no arguments. diff --git a/exec/java-exec/src/main/codegen/includes/parserImpls.ftl b/exec/java-exec/src/main/codegen/includes/parserImpls.ftl index be30318..70e0c28 100644 --- a/exec/java-exec/src/main/codegen/includes/parserImpls.ftl +++ b/exec/java-exec/src/main/codegen/includes/parserImpls.ftl @@ -314,12 +314,12 @@ SqlNode SqlCreateSchema(SqlParserPos pos, String createType) : token_source.SwitchTo(SCH); } ( - <LOAD> + <SCH_LOAD> { load = StringLiteral(); } | - <PAREN_STRING> + <SCH_PAREN_STRING> { schema = SqlLiteral.createCharString(token.image, getPos()); } @@ -373,13 +373,13 @@ void addProperty(SqlNodeList properties) : } <SCH> TOKEN : { - < LOAD: "LOAD" > { popState(); } - | < NUM: <DIGIT> (" " | "\t" | "\n" | "\r")* > + < SCH_LOAD: "LOAD" > { popState(); } + | < SCH_NUM: <DIGIT> (" " | "\t" | "\n" | "\r")* > // once schema is found, switch back to initial lexical state // must be enclosed in the parentheses // inside may have left parenthesis only if number precedes (covers cases with varchar(10)), // if left parenthesis is present in column name, it must be escaped with backslash - | < PAREN_STRING: <LPAREN> ((~[")"]) | (<NUM> ")") | ("\\)"))* <RPAREN> > { popState(); } + | < SCH_PAREN_STRING: <LPAREN> ((~[")"]) | (<SCH_NUM> ")") | ("\\)"))* <RPAREN> > { popState(); } } /** @@ -465,7 +465,7 @@ SqlNode SqlDropSchema(SqlParserPos pos) : /** * Parse refresh table metadata statement. - * REFRESH TABLE METADATA [COLUMNS ((field1, field2,..) | NONE)] tblname + * REFRESH TABLE METADATA [COLUMNS ((field1, field2,..) 
| NONE)] table_name */ SqlNode SqlRefreshMetadata() : { @@ -535,6 +535,110 @@ SqlNode SqlDescribeSchema() : } /** +* Parses ALTER SCHEMA statements: +* +* ALTER SCHEMA +* (FOR TABLE dfs.tmp.nation | PATH '/tmp/schema.json') +* ADD [OR REPLACE] +* [COLUMNS (col1 int, col2 varchar)] +* [PROPERTIES ('prop1'='val1', 'prop2'='val2')] +* +* ALTER SCHEMA +* (FOR TABLE dfs.tmp.nation | PATH '/tmp/schema.json') +* REMOVE +* [COLUMNS (`col1`, `col2`)] +* [PROPERTIES ('prop1', 'prop2')] +*/ +SqlNode SqlAlterSchema() : +{ + SqlParserPos pos; + SqlIdentifier table = null; + SqlNode path = null; +} +{ + <ALTER> { pos = getPos(); } + <SCHEMA> + ( + <FOR> <TABLE> { table = CompoundIdentifier(); } + | + <PATH> { path = StringLiteral(); } + ) + ( + <ADD> { return SqlAlterSchemaAdd(pos, table, path); } + | + <REMOVE> { return SqlAlterSchemaRemove(pos, table, path); } + ) +} + +SqlNode SqlAlterSchemaAdd(SqlParserPos pos, SqlIdentifier table, SqlNode path) : +{ + boolean replace = false; + SqlCharStringLiteral schema = null; + SqlNodeList properties = null; +} +{ + [ <OR> <REPLACE> { replace = true; } ] + [ <COLUMNS> { schema = ParseSchema(); } ] + [ + <PROPERTIES> <LPAREN> + { + properties = new SqlNodeList(getPos()); + addProperty(properties); + } + ( + <COMMA> { addProperty(properties); } + )* + <RPAREN> + ] + { + if (schema == null && properties == null) { + throw new ParseException("ALTER SCHEMA ADD command must have at least <COLUMNS> or <PROPERTIES> keyword indicated."); + } + return new SqlSchema.Add(pos, table, path, SqlLiteral.createBoolean(replace, getPos()), schema, properties); + } +} + +SqlCharStringLiteral ParseSchema() : +{} +{ + { + token_source.pushState(); + token_source.SwitchTo(SCH); + } + <SCH_PAREN_STRING> + { + return SqlLiteral.createCharString(token.image, getPos()); + } +} + +SqlNode SqlAlterSchemaRemove(SqlParserPos pos, SqlIdentifier table, SqlNode path) : +{ + SqlNodeList columns = null; + SqlNodeList properties = null; +} +{ + [ <COLUMNS> { columns = 
ParseRequiredFieldList("Schema"); } ] + [ + <PROPERTIES> <LPAREN> + { + properties = new SqlNodeList(getPos()); + properties.add(StringLiteral()); + } + ( + <COMMA> + { properties.add(StringLiteral()); } + )* + <RPAREN> + ] + { + if (columns == null && properties == null) { + throw new ParseException("ALTER SCHEMA REMOVE command must have at least <COLUMNS> or <PROPERTIES> keyword indicated."); + } + return new SqlSchema.Remove(pos, table, path, columns, properties); + } +} + +/** * Parse create UDF statement * CREATE FUNCTION USING JAR 'jar_name' */ @@ -592,9 +696,10 @@ Pair<SqlNodeList, SqlNodeList> ParenthesizedCompoundIdentifierList() : } } </#if> + /** * Parses a analyze statement. - * ANALYZE TABLE tblname {COMPUTE | ESTIMATE} | STATISTICS + * ANALYZE TABLE table_name {COMPUTE | ESTIMATE} | STATISTICS * [(column1, column2, ...)] [ SAMPLE numeric PERCENT ] */ SqlNode SqlAnalyzeTable() : @@ -695,5 +800,3 @@ DrillSqlResetOption DrillSqlResetOption(Span s, String scope) : return new DrillSqlResetOption(s.end(name), scope, name); } } - - diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SchemaHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SchemaHandler.java index cbc311d..3487234 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SchemaHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SchemaHandler.java @@ -31,10 +31,13 @@ import org.apache.drill.exec.planner.sql.parser.SqlCreateType; import org.apache.drill.exec.planner.sql.parser.SqlSchema; import org.apache.drill.exec.record.metadata.ColumnMetadata; import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.exec.record.metadata.TupleSchema; +import org.apache.drill.exec.record.metadata.schema.InlineSchemaProvider; import org.apache.drill.exec.record.metadata.schema.PathSchemaProvider; import 
org.apache.drill.exec.record.metadata.schema.SchemaContainer; import org.apache.drill.exec.record.metadata.schema.SchemaProvider; import org.apache.drill.exec.record.metadata.schema.SchemaProviderFactory; +import org.apache.drill.exec.record.metadata.schema.StorageProperties; import org.apache.drill.exec.store.AbstractSchema; import org.apache.drill.exec.store.StorageStrategy; import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory; @@ -43,20 +46,25 @@ import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.InputStream; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.Stream; /** - * Parent class for CREATE / DROP / DESCRIBE SCHEMA handlers. + * Parent class for CREATE / DROP / DESCRIBE / ALTER SCHEMA handlers. * Contains common logic on how extract workspace, output error result. */ public abstract class SchemaHandler extends DefaultSqlHandler { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SchemaHandler.class); + static final Logger logger = LoggerFactory.getLogger(SchemaHandler.class); SchemaHandler(SqlHandlerConfig config) { super(config); @@ -94,6 +102,17 @@ public abstract class SchemaHandler extends DefaultSqlHandler { } /** + * Provides storage strategy which will create schema file + * with same permission as used for persistent tables. + * + * @return storage strategy + */ + StorageStrategy getStorageStrategy() { + return new StorageStrategy(context.getOption( + ExecConstants.PERSISTENT_TABLE_UMASK).string_val, false); + } + + /** * CREATE SCHEMA command handler. 
*/ public static class Create extends SchemaHandler { @@ -120,10 +139,13 @@ public abstract class SchemaHandler extends DefaultSqlHandler { } } - // schema file will be created with same permission as used for persistent tables - StorageStrategy storageStrategy = new StorageStrategy(context.getOption( - ExecConstants.PERSISTENT_TABLE_UMASK).string_val, false); - schemaProvider.store(schemaString, sqlCall.getProperties(), storageStrategy); + StorageProperties storageProperties = StorageProperties.builder() + .storageStrategy(getStorageStrategy()) + .overwrite(false) + .build(); + + schemaProvider.store(schemaString, sqlCall.getProperties(), storageProperties); + return DirectPlan.createDirectPlan(context, true, String.format("Created schema for [%s]", schemaSource)); } catch (IOException e) { throw UserException.resourceError(e) @@ -294,7 +316,155 @@ public abstract class SchemaHandler extends DefaultSqlHandler { this.schema = schema; } } + } + + /** + * ALTER SCHEMA ADD command handler. + */ + public static class Add extends SchemaHandler { + + public Add(SqlHandlerConfig config) { + super(config); + } + + @Override + public PhysicalPlan getPlan(SqlNode sqlNode) { + SqlSchema.Add addCall = ((SqlSchema.Add) sqlNode); + String schemaSource = addCall.hasTable() ? 
addCall.getTable().toString() : addCall.getPath(); + + try { + SchemaProvider schemaProvider = SchemaProviderFactory.create(addCall, this); + if (!schemaProvider.exists()) { + throw UserException.resourceError() + .message("Schema does not exist for [%s]", schemaSource) + .addContext("Command: ALTER SCHEMA ADD") + .build(logger); + } + + TupleMetadata currentSchema = schemaProvider.read().getSchema(); + TupleMetadata newSchema = new TupleSchema(); + + if (addCall.hasSchema()) { + InlineSchemaProvider inlineSchemaProvider = new InlineSchemaProvider(addCall.getSchema()); + TupleMetadata providedSchema = inlineSchemaProvider.read().getSchema(); + + if (addCall.isReplace()) { + Map<String, ColumnMetadata> columnsMap = Stream.concat(currentSchema.toMetadataList().stream(), providedSchema.toMetadataList().stream()) + .collect(Collectors.toMap( + ColumnMetadata::name, + Function.identity(), + (o, n) -> n, // replace existing columns + LinkedHashMap::new)); // preserve initial order of the columns + columnsMap.values().forEach(newSchema::addColumn); + } else { + Stream.concat(currentSchema.toMetadataList().stream(), providedSchema.toMetadataList().stream()) + .forEach(newSchema::addColumn); + } + } else { + currentSchema.toMetadataList().forEach(newSchema::addColumn); + } + + if (addCall.hasProperties()) { + if (addCall.isReplace()) { + newSchema.setProperties(currentSchema.properties()); + newSchema.setProperties(addCall.getProperties()); + } else { + Map<String, String> newProperties = Stream.concat(currentSchema.properties().entrySet().stream(), addCall.getProperties().entrySet().stream()) + .collect(Collectors.toMap( + Map.Entry::getKey, + Map.Entry::getValue)); // no merge strategy is provided to fail on duplicate + newSchema.setProperties(newProperties); + } + } else { + newSchema.setProperties(currentSchema.properties()); + } + + String schemaString = newSchema.toMetadataList().stream() + .map(ColumnMetadata::columnString) + .collect(Collectors.joining(", ")); + + 
StorageProperties storageProperties = StorageProperties.builder() + .storageStrategy(getStorageStrategy()) + .overwrite() + .build(); + + schemaProvider.store(schemaString, newSchema.properties(), storageProperties); + return DirectPlan.createDirectPlan(context, true, String.format("Schema for [%s] was updated", schemaSource)); + + } catch (IOException e) { + throw UserException.resourceError(e) + .message("Error while accessing / modifying schema for [%s]: %s", schemaSource, e.getMessage()) + .build(logger); + } catch (IllegalArgumentException | IllegalStateException e) { + throw UserException.validationError(e) + .message(e.getMessage()) + .addContext("Error while preparing / creating schema for [%s]", schemaSource) + .build(logger); + } + } } + /** + * ALTER SCHEMA REMOVE command handler. + */ + public static class Remove extends SchemaHandler { + + public Remove(SqlHandlerConfig config) { + super(config); + } + + @Override + public PhysicalPlan getPlan(SqlNode sqlNode) { + SqlSchema.Remove removeCall = ((SqlSchema.Remove) sqlNode); + String schemaSource = removeCall.hasTable() ? 
removeCall.getTable().toString() : removeCall.getPath(); + + try { + SchemaProvider schemaProvider = SchemaProviderFactory.create(removeCall, this); + + if (!schemaProvider.exists()) { + throw UserException.resourceError() + .message("Schema does not exist for [%s]", schemaSource) + .addContext("Command: ALTER SCHEMA REMOVE") + .build(logger); + } + + TupleMetadata currentSchema = schemaProvider.read().getSchema(); + TupleMetadata newSchema = new TupleSchema(); + + List<String> columns = removeCall.getColumns(); + + currentSchema.toMetadataList().stream() + .filter(column -> columns == null || !columns.contains(column.name())) + .forEach(newSchema::addColumn); + + newSchema.setProperties(currentSchema.properties()); + if (removeCall.hasProperties()) { + removeCall.getProperties().forEach(newSchema::removeProperty); + } + + StorageProperties storageProperties = StorageProperties.builder() + .storageStrategy(getStorageStrategy()) + .overwrite() + .build(); + + String schemaString = newSchema.toMetadataList().stream() + .map(ColumnMetadata::columnString) + .collect(Collectors.joining(", ")); + + schemaProvider.store(schemaString, newSchema.properties(), storageProperties); + return DirectPlan.createDirectPlan(context, true, String.format("Schema for [%s] was updated", schemaSource)); + + } catch (IOException e) { + throw UserException.resourceError(e) + .message("Error while accessing / modifying schema for [%s]: %s", schemaSource, e.getMessage()) + .build(logger); + } catch (IllegalArgumentException e) { + throw UserException.validationError(e) + .message(e.getMessage()) + .addContext("Error while preparing / creating schema for [%s]", schemaSource) + .build(logger); + } + } + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerUtil.java index 5981206..43d2f2a 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerUtil.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerUtil.java @@ -272,4 +272,26 @@ public class SqlHandlerUtil { } } + /** + * Unparses given {@link SqlNodeList} into key / values pairs: (k1 = v1, k2 = v2). + * + * @param writer sql writer + * @param leftPrec left precedence + * @param rightPrec right precedence + * @param list sql node list + */ + public static void unparseKeyValuePairs(SqlWriter writer, int leftPrec, int rightPrec, SqlNodeList list) { + writer.keyword("("); + + for (int i = 1; i < list.size(); i += 2) { + if (i != 1) { + writer.keyword(","); + } + list.get(i - 1).unparse(writer, leftPrec, rightPrec); + writer.keyword("="); + list.get(i).unparse(writer, leftPrec, rightPrec); + } + + writer.keyword(")"); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java index ac0d163..e9f586e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java @@ -83,6 +83,8 @@ public class CompoundIdentifierConverter extends SqlShuttle { .put(SqlSchema.Create.class, arrayOf(D, D, D, D, D, D)) .put(SqlSchema.Drop.class, arrayOf(D, D)) .put(SqlSchema.Describe.class, arrayOf(D, D)) + .put(SqlSchema.Add.class, arrayOf(D, D, D, D, D, D)) + .put(SqlSchema.Remove.class, arrayOf(D, D, D, D, D)) .build(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlSchema.java index 81e8910..44a3adb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlSchema.java +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlSchema.java @@ -32,6 +32,7 @@ import org.apache.calcite.sql.util.SqlBasicVisitor; import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler; import org.apache.drill.exec.planner.sql.handlers.SqlHandlerConfig; import org.apache.drill.exec.planner.sql.handlers.SchemaHandler; +import org.apache.drill.exec.planner.sql.handlers.SqlHandlerUtil; import org.apache.drill.exec.store.dfs.FileSelection; import java.util.Arrays; @@ -39,9 +40,10 @@ import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; /** - * Parent class for CREATE, DROP, DESCRIBE SCHEMA commands. + * Parent class for CREATE, DROP, DESCRIBE, ALTER SCHEMA commands. * Holds logic common command property: table, path. */ public abstract class SqlSchema extends DrillSqlCall { @@ -61,6 +63,11 @@ public abstract class SqlSchema extends DrillSqlCall { writer.keyword("FOR TABLE"); table.unparse(writer, leftPrec, rightPrec); } + + if (path != null) { + writer.keyword("PATH"); + path.unparse(writer, leftPrec, rightPrec); + } } public boolean hasTable() { @@ -90,6 +97,20 @@ public abstract class SqlSchema extends DrillSqlCall { return path == null ? null : path.accept(LiteralVisitor.INSTANCE); } + protected Map<String, String> getProperties(SqlNodeList properties) { + if (properties == null) { + return null; + } + + // preserve properties order + Map<String, String> map = new LinkedHashMap<>(); + for (int i = 1; i < properties.size(); i += 2) { + map.put(properties.get(i - 1).accept(LiteralVisitor.INSTANCE), + properties.get(i).accept(LiteralVisitor.INSTANCE)); + } + return map; + } + /** * Visits literal and returns bare value (i.e. single quotes). 
*/ @@ -155,35 +176,21 @@ public abstract class SqlSchema extends DrillSqlCall { writer.keyword("REPLACE"); } - writer.keyword("SCHEMA"); - writer.literal(getSchema()); - - super.unparse(writer, leftPrec, rightPrec); + if (schema != null) { + writer.keyword("SCHEMA"); + writer.literal(getSchema()); + } if (load != null) { writer.keyword("LOAD"); load.unparse(writer, leftPrec, rightPrec); } - if (path != null) { - writer.keyword("PATH"); - path.unparse(writer, leftPrec, rightPrec); - } + super.unparse(writer, leftPrec, rightPrec); if (properties != null) { writer.keyword("PROPERTIES"); - writer.keyword("("); - - for (int i = 1; i < properties.size(); i += 2) { - if (i != 1) { - writer.keyword(","); - } - properties.get(i - 1).unparse(writer, leftPrec, rightPrec); - writer.keyword("="); - properties.get(i).unparse(writer, leftPrec, rightPrec); - } - - writer.keyword(")"); + SqlHandlerUtil.unparseKeyValuePairs(writer, leftPrec, rightPrec, properties); } } @@ -205,23 +212,12 @@ public abstract class SqlSchema extends DrillSqlCall { } public Map<String, String> getProperties() { - if (properties == null) { - return null; - } - - // preserve properties order - Map<String, String> map = new LinkedHashMap<>(); - for (int i = 1; i < properties.size(); i += 2) { - map.put(properties.get(i - 1).accept(LiteralVisitor.INSTANCE), - properties.get(i).accept(LiteralVisitor.INSTANCE)); - } - return map; + return getProperties(properties); } public SqlCreateType getSqlCreateType() { return SqlCreateType.valueOf(createType.toValue()); } - } /** @@ -274,7 +270,6 @@ public abstract class SqlSchema extends DrillSqlCall { public boolean ifExists() { return existenceCheck.booleanValue(); } - } /** @@ -339,4 +334,169 @@ public abstract class SqlSchema extends DrillSqlCall { } } + public static class Add extends SqlSchema { + + private final SqlLiteral replace; + private final SqlCharStringLiteral schema; + private final SqlNodeList properties; + + public static final SqlSpecialOperator 
OPERATOR = new SqlSpecialOperator("ALTER_SCHEMA_ADD", SqlKind.OTHER_DDL) { + @Override + public SqlCall createCall(SqlLiteral functionQualifier, SqlParserPos pos, SqlNode... operands) { + return new Add(pos, (SqlIdentifier) operands[0], operands[1], (SqlLiteral) operands[2], + (SqlCharStringLiteral) operands[3], (SqlNodeList) operands[4]); + } + }; + + public Add(SqlParserPos pos, + SqlIdentifier table, + SqlNode path, + SqlLiteral replace, + SqlCharStringLiteral schema, + SqlNodeList properties) { + super(pos, table, path); + this.replace = replace; + this.schema = schema; + this.properties = properties; + } + + @Override + public SqlOperator getOperator() { + return OPERATOR; + } + + @Override + public List<SqlNode> getOperandList() { + return Arrays.asList(table, path, replace, schema, properties); + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + writer.keyword("ALTER"); + writer.keyword("SCHEMA"); + writer.keyword("ADD"); + + if (replace.booleanValue()) { + writer.keyword("OR"); + writer.keyword("REPLACE"); + } + + super.unparse(writer, leftPrec, rightPrec); + + if (schema != null) { + writer.keyword("COLUMNS"); + writer.literal(getSchema()); + } + + if (properties != null) { + writer.keyword("PROPERTIES"); + SqlHandlerUtil.unparseKeyValuePairs(writer, leftPrec, rightPrec, properties); + } + } + + @Override + public AbstractSqlHandler getSqlHandler(SqlHandlerConfig config) { + return new SchemaHandler.Add(config); + } + + public boolean isReplace() { + return replace.booleanValue(); + } + + public boolean hasSchema() { + return schema != null; + } + + public String getSchema() { + return hasSchema() ? 
schema.toValue() : null; + } + + public boolean hasProperties() { + return properties != null; + } + + public Map<String, String> getProperties() { + return getProperties(properties); + } + } + + public static class Remove extends SqlSchema { + + private final SqlNodeList columns; + private final SqlNodeList properties; + + public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("ALTER_SCHEMA_REMOVE", SqlKind.OTHER_DDL) { + @Override + public SqlCall createCall(SqlLiteral functionQualifier, SqlParserPos pos, SqlNode... operands) { + return new Remove(pos, (SqlIdentifier) operands[0], operands[1], + (SqlNodeList) operands[2], (SqlNodeList) operands[3]); + } + }; + + public Remove(SqlParserPos pos, + SqlIdentifier table, + SqlNode path, + SqlNodeList columns, + SqlNodeList properties) { + super(pos, table, path); + this.columns = columns; + this.properties = properties; + } + + @Override + public SqlOperator getOperator() { + return OPERATOR; + } + + @Override + public List<SqlNode> getOperandList() { + return Arrays.asList(table, path, columns, properties); + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + writer.keyword("ALTER"); + writer.keyword("SCHEMA"); + writer.keyword("REMOVE"); + + super.unparse(writer, leftPrec, rightPrec); + + if (columns != null) { + writer.keyword("COLUMNS"); + SqlHandlerUtil.unparseSqlNodeList(writer, leftPrec, rightPrec, columns); + } + + if (properties != null) { + writer.keyword("PROPERTIES"); + SqlHandlerUtil.unparseSqlNodeList(writer, leftPrec, rightPrec, properties); + } + } + + @Override + public AbstractSqlHandler getSqlHandler(SqlHandlerConfig config) { + return new SchemaHandler.Remove(config); + } + + public List<String> getColumns() { + if (columns == null) { + return null; + } + return columns.getList().stream() + .map(SqlNode::toString) + .collect(Collectors.toList()); + } + + public boolean hasProperties() { + return properties != null; + } + + public List<String> 
getProperties() { + if (properties == null) { + return null; + } + return properties.getList().stream() + .map(property -> property.accept(LiteralVisitor.INSTANCE)) + .collect(Collectors.toList()); + } + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/InlineSchemaProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/InlineSchemaProvider.java index a24f5a2..8678028 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/InlineSchemaProvider.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/InlineSchemaProvider.java @@ -17,8 +17,6 @@ */ package org.apache.drill.exec.record.metadata.schema; -import org.apache.drill.exec.store.StorageStrategy; - import java.io.IOException; import java.util.Map; @@ -39,7 +37,7 @@ public class InlineSchemaProvider implements SchemaProvider { } @Override - public void store(String schema, Map<String, String> properties, StorageStrategy storageStrategy) { + public void store(String schema, Map<String, String> properties, StorageProperties storageProperties) { throw new UnsupportedOperationException("Schema storage is not supported"); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/PathSchemaProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/PathSchemaProvider.java index 8a8933a..fe2d920 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/PathSchemaProvider.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/PathSchemaProvider.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.core.util.DefaultPrettyPrinter; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectReader; import com.fasterxml.jackson.databind.ObjectWriter; -import org.apache.drill.exec.store.StorageStrategy; import 
org.apache.drill.exec.util.ImpersonationUtil; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -103,13 +102,13 @@ public class PathSchemaProvider implements SchemaProvider { } @Override - public void store(String schema, Map<String, String> properties, StorageStrategy storageStrategy) throws IOException { + public void store(String schema, Map<String, String> properties, StorageProperties storageProperties) throws IOException { SchemaContainer tableSchema = createTableSchema(schema, properties); - try (OutputStream stream = fs.create(path, false)) { + try (OutputStream stream = fs.create(path, storageProperties.isOverwrite())) { WRITER.writeValue(stream, tableSchema); } - storageStrategy.applyToFile(fs, path); + storageProperties.getStorageStrategy().applyToFile(fs, path); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/SchemaProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/SchemaProvider.java index 343e0ed..66b5c56 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/SchemaProvider.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/SchemaProvider.java @@ -17,8 +17,6 @@ */ package org.apache.drill.exec.record.metadata.schema; -import org.apache.drill.exec.store.StorageStrategy; - import java.io.IOException; import java.util.Map; @@ -40,13 +38,12 @@ public interface SchemaProvider { /** * Stores given schema definition and properties. - * If schema is stored in a file, will apply certain permission using {@link StorageStrategy}. 
* * @param schema schema definition * @param properties map of properties - * @param storageStrategy storage strategy + * @param storageProperties storage properties */ - void store(String schema, Map<String, String> properties, StorageStrategy storageStrategy) throws IOException; + void store(String schema, Map<String, String> properties, StorageProperties storageProperties) throws IOException; /** * Reads schema into {@link SchemaContainer}. Depending on implementation, can read from a file @@ -62,5 +59,4 @@ public interface SchemaProvider { * @return true if schema exists, false otherwise */ boolean exists() throws IOException; - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/StorageProperties.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/StorageProperties.java new file mode 100644 index 0000000..aa8b4d6 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/StorageProperties.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.record.metadata.schema; + +import org.apache.drill.exec.store.StorageStrategy; + +/** + * Holds storage properties used when writing schema container. + */ +public class StorageProperties { + + private final StorageStrategy storageStrategy; + private final boolean overwrite; + + private StorageProperties(Builder builder) { + this.storageStrategy = builder.storageStrategy; + this.overwrite = builder.overwrite; + } + + public static Builder builder() { + return new Builder(); + } + + public StorageStrategy getStorageStrategy() { + return storageStrategy; + } + + public boolean isOverwrite() { + return overwrite; + } + + public static class Builder { + + private StorageStrategy storageStrategy = StorageStrategy.DEFAULT; + private boolean overwrite; + + public Builder storageStrategy(StorageStrategy storageStrategy) { + this.storageStrategy = storageStrategy; + return this; + } + + public Builder overwrite() { + this.overwrite = true; + return this; + } + + public Builder overwrite(boolean overwrite) { + this.overwrite = overwrite; + return this; + } + + public StorageProperties build() { + return new StorageProperties(this); + } + } +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestSchemaCommands.java b/exec/java-exec/src/test/java/org/apache/drill/TestSchemaCommands.java index f4965ca..c83850e 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestSchemaCommands.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestSchemaCommands.java @@ -19,6 +19,7 @@ package org.apache.drill; import org.apache.commons.io.FileUtils; import org.apache.drill.categories.SqlTest; +import org.apache.drill.categories.UnlikelyTest; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.exceptions.UserRemoteException; import org.apache.drill.common.types.TypeProtos; @@ -42,6 +43,7 @@ import java.io.File; import java.nio.file.Files; import java.nio.file.Paths; import java.util.Arrays; +import 
java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; @@ -51,7 +53,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -@Category(SqlTest.class) +@Category({SqlTest.class, UnlikelyTest.class}) public class TestSchemaCommands extends ClusterTest { @Rule @@ -751,4 +753,415 @@ public class TestSchemaCommands extends ClusterTest { run("drop table if exists %s", table); } } + + @Test + public void testAlterAddAbsentKeywords() throws Exception { + thrown.expect(UserException.class); + thrown.expectMessage("PARSE ERROR"); + run("alter schema for table abc add"); + } + + @Test + public void testAlterAddAbsentSchemaForTable() throws Exception { + String tableName = "table_alter_schema_add_absent_schema"; + String table = String.format("dfs.tmp.%s", tableName); + try { + run("create table %s as select 'a' as c from (values(1))", table); + + thrown.expect(UserException.class); + thrown.expectMessage("RESOURCE ERROR: Schema does not exist"); + + run("alter schema for table %s add columns (col int)", table); + } finally { + run("drop table if exists %s", table); + } + } + + @Test + public void testAlterAddAbsentSchemaPath() throws Exception { + thrown.expect(UserException.class); + thrown.expectMessage("RESOURCE ERROR: Schema does not exist"); + + run("alter schema path '%s' add columns (col int)", + new File(dirTestWatcher.getTmpDir(), "absent.schema").getPath()); + } + + @Test + public void testAlterAddDuplicateColumn() throws Exception { + File tmpDir = dirTestWatcher.getTmpDir(); + File schemaFile = new File(tmpDir, "schema_for_duplicate_column.schema"); + assertFalse(schemaFile.exists()); + try { + run("create schema (col int) path '%s'", schemaFile.getPath()); + + thrown.expect(UserException.class); + thrown.expectMessage("VALIDATION ERROR"); + + run("alter schema path '%s' add columns (col varchar)", schemaFile.getPath()); + + } finally { + 
FileUtils.deleteQuietly(schemaFile); + } + } + + @Test + public void testAlterAddDuplicateProperty() throws Exception { + File tmpDir = dirTestWatcher.getTmpDir(); + File schemaFile = new File(tmpDir, "schema_for_duplicate_property.schema"); + assertFalse(schemaFile.exists()); + try { + run("create schema (col int) path '%s' properties ('prop' = 'a')", schemaFile.getPath()); + + thrown.expect(UserException.class); + thrown.expectMessage("VALIDATION ERROR"); + + run("alter schema path '%s' add properties ('prop' = 'b')", schemaFile.getPath()); + + } finally { + FileUtils.deleteQuietly(schemaFile); + } + } + + @Test + public void testAlterAddColumns() throws Exception { + File tmpDir = dirTestWatcher.getTmpDir(); + File schemaFile = new File(tmpDir, "alter_schema_add_columns.schema"); + assertFalse(schemaFile.exists()); + try { + run("create schema (col1 int) path '%s' properties ('prop1' = 'a')", schemaFile.getPath()); + + testBuilder() + .sqlQuery("alter schema path '%s' add " + + "columns (col2 varchar) ", schemaFile.getPath()) + .unOrdered() + .baselineColumns("ok", "summary") + .baselineValues(true, String.format("Schema for [%s] was updated", schemaFile.getPath())) + .go(); + + SchemaProvider schemaProvider = new PathSchemaProvider(new Path(schemaFile.getPath())); + TupleMetadata schema = schemaProvider.read().getSchema(); + + assertEquals(2, schema.size()); + + assertEquals("col1", schema.fullName(0)); + assertEquals("col2", schema.fullName(1)); + + assertEquals(1, schema.properties().size()); + + } finally { + FileUtils.deleteQuietly(schemaFile); + } + } + + @Test + public void testAlterAddProperties() throws Exception { + File tmpDir = dirTestWatcher.getTmpDir(); + File schemaFile = new File(tmpDir, "alter_schema_add_properties.schema"); + assertFalse(schemaFile.exists()); + try { + run("create schema (col1 int) path '%s' properties ('prop1' = 'a')", schemaFile.getPath()); + + testBuilder() + .sqlQuery("alter schema path '%s' add " + + "properties ('prop2' = 
'b', 'prop3' = 'c')", schemaFile.getPath()) + .unOrdered() + .baselineColumns("ok", "summary") + .baselineValues(true, String.format("Schema for [%s] was updated", schemaFile.getPath())) + .go(); + + SchemaProvider schemaProvider = new PathSchemaProvider(new Path(schemaFile.getPath())); + TupleMetadata schema = schemaProvider.read().getSchema(); + + assertEquals(1, schema.size()); + + Map<String, String> expectedProperties = new HashMap<>(); + expectedProperties.put("prop1", "a"); + expectedProperties.put("prop2", "b"); + expectedProperties.put("prop3", "c"); + + assertEquals(expectedProperties, schema.properties()); + + } finally { + FileUtils.deleteQuietly(schemaFile); + } + } + + @Test + public void testAlterAddSuccess() throws Exception { + File tmpDir = dirTestWatcher.getTmpDir(); + File schemaFile = new File(tmpDir, "alter_schema_add_success.schema"); + assertFalse(schemaFile.exists()); + try { + run("create schema (col1 int) path '%s' properties ('prop1' = 'a')", schemaFile.getPath()); + + testBuilder() + .sqlQuery("alter schema path '%s' add " + + "columns (col2 varchar, col3 boolean) " + + "properties ('prop2' = 'b', 'prop3' = 'c')", schemaFile.getPath()) + .unOrdered() + .baselineColumns("ok", "summary") + .baselineValues(true, String.format("Schema for [%s] was updated", schemaFile.getPath())) + .go(); + + SchemaProvider schemaProvider = new PathSchemaProvider(new Path(schemaFile.getPath())); + TupleMetadata schema = schemaProvider.read().getSchema(); + + assertEquals(3, schema.size()); + + assertEquals("col1", schema.fullName(0)); + assertEquals("col2", schema.fullName(1)); + assertEquals("col3", schema.fullName(2)); + + Map<String, String> expectedProperties = new HashMap<>(); + expectedProperties.put("prop1", "a"); + expectedProperties.put("prop2", "b"); + expectedProperties.put("prop3", "c"); + + assertEquals(expectedProperties, schema.properties()); + + } finally { + FileUtils.deleteQuietly(schemaFile); + } + } + + @Test + public void 
testAlterAddForTable() throws Exception { + String tableName = "table_for_alter_add"; + String table = String.format("dfs.tmp.%s", tableName); + try { + run("create table %s as select 'a' as c from (values(1))", table); + + File schemaPath = Paths.get(dirTestWatcher.getDfsTestTmpDir().getPath(), + tableName, SchemaProvider.DEFAULT_SCHEMA_NAME).toFile(); + + run("create schema (col int) for table %s properties ('prop1' = 'a')", table); + + testBuilder() + .sqlQuery("alter schema for table %s add or replace " + + "columns (col2 varchar, col3 boolean) " + + "properties ('prop2' = 'd', 'prop3' = 'c')", table) + .unOrdered() + .baselineColumns("ok", "summary") + .baselineValues(true, String.format("Schema for [%s] was updated", table)) + .go(); + + SchemaProvider schemaProvider = new PathSchemaProvider(new Path(schemaPath.getPath())); + assertTrue(schemaProvider.exists()); + + TupleMetadata schema = schemaProvider.read().getSchema(); + assertEquals(3, schema.size()); + assertEquals(3, schema.properties().size()); + + } finally { + run("drop table if exists %s", table); + } + } + + @Test + public void testAlterReplace() throws Exception { + File tmpDir = dirTestWatcher.getTmpDir(); + File schemaFile = new File(tmpDir, "alter_schema_replace_success.schema"); + assertFalse(schemaFile.exists()); + try { + run("create schema (col1 int, col2 int) path '%s' " + + "properties ('prop1' = 'a', 'prop2' = 'b')", schemaFile.getPath()); + + testBuilder() + .sqlQuery("alter schema path '%s' add or replace " + + "columns (col2 varchar, col3 boolean) " + + "properties ('prop2' = 'd', 'prop3' = 'c')", schemaFile.getPath()) + .unOrdered() + .baselineColumns("ok", "summary") + .baselineValues(true, String.format("Schema for [%s] was updated", schemaFile.getPath())) + .go(); + + SchemaProvider schemaProvider = new PathSchemaProvider(new Path(schemaFile.getPath())); + TupleMetadata schema = schemaProvider.read().getSchema(); + + assertEquals(3, schema.size()); + + assertEquals("col1", 
schema.fullName(0)); + assertEquals("col2", schema.fullName(1)); + assertEquals(TypeProtos.MinorType.VARCHAR, schema.metadata("col2").type()); + assertEquals("col3", schema.fullName(2)); + + Map<String, String> expectedProperties = new HashMap<>(); + expectedProperties.put("prop1", "a"); + expectedProperties.put("prop2", "d"); + expectedProperties.put("prop3", "c"); + + assertEquals(expectedProperties, schema.properties()); + + } finally { + FileUtils.deleteQuietly(schemaFile); + } + } + + @Test + public void testAlterRemoveAbsentKeywords() throws Exception { + thrown.expect(UserException.class); + thrown.expectMessage("PARSE ERROR"); + run("alter schema for table abc remove"); + } + + @Test + public void testAlterRemoveAbsentSchemaForTable() throws Exception { + String tableName = "table_alter_schema_remove_absent_schema"; + String table = String.format("dfs.tmp.%s", tableName); + try { + run("create table %s as select 'a' as c from (values(1))", table); + + thrown.expect(UserException.class); + thrown.expectMessage("RESOURCE ERROR: Schema does not exist"); + + run("alter schema for table %s remove columns (col)", table); + } finally { + run("drop table if exists %s", table); + } + } + + @Test + public void testAlterRemoveAbsentSchemaPath() throws Exception { + thrown.expect(UserException.class); + thrown.expectMessage("RESOURCE ERROR: Schema does not exist"); + + run("alter schema path '%s' remove columns (col)", + new File(dirTestWatcher.getTmpDir(), "absent.schema").getPath()); + } + + @Test + public void testAlterRemoveColumns() throws Exception { + File tmpDir = dirTestWatcher.getTmpDir(); + File schemaFile = new File(tmpDir, "alter_schema_remove_columns.schema"); + assertFalse(schemaFile.exists()); + try { + run("create schema (col1 int, col2 varchar, col3 boolean, col4 int) path '%s' " + + "properties ('prop1' = 'a', 'prop2' = 'b', 'prop3' = 'c', 'prop4' = 'd')", schemaFile.getPath()); + + testBuilder() + .sqlQuery("alter schema path '%s' remove " + + 
"columns (col2, col4) ", schemaFile.getPath()) + .unOrdered() + .baselineColumns("ok", "summary") + .baselineValues(true, String.format("Schema for [%s] was updated", schemaFile.getPath())) + .go(); + + SchemaProvider schemaProvider = new PathSchemaProvider(new Path(schemaFile.getPath())); + TupleMetadata schema = schemaProvider.read().getSchema(); + + assertEquals(2, schema.size()); + + assertEquals("col1", schema.fullName(0)); + assertEquals("col3", schema.fullName(1)); + + assertEquals(4, schema.properties().size()); + + } finally { + FileUtils.deleteQuietly(schemaFile); + } + } + + @Test + public void testAlterRemoveProperties() throws Exception { + File tmpDir = dirTestWatcher.getTmpDir(); + File schemaFile = new File(tmpDir, "alter_schema_remove_success.schema"); + assertFalse(schemaFile.exists()); + try { + run("create schema (col1 int, col2 varchar, col3 boolean, col4 int) path '%s' " + + "properties ('prop1' = 'a', 'prop2' = 'b', 'prop3' = 'c', 'prop4' = 'd')", schemaFile.getPath()); + + testBuilder() + .sqlQuery("alter schema path '%s' remove " + + "properties ('prop2', 'prop4')", schemaFile.getPath()) + .unOrdered() + .baselineColumns("ok", "summary") + .baselineValues(true, String.format("Schema for [%s] was updated", schemaFile.getPath())) + .go(); + + SchemaProvider schemaProvider = new PathSchemaProvider(new Path(schemaFile.getPath())); + TupleMetadata schema = schemaProvider.read().getSchema(); + + assertEquals(4, schema.size()); + + Map<String, String> expectedProperties = new HashMap<>(); + expectedProperties.put("prop1", "a"); + expectedProperties.put("prop3", "c"); + + assertEquals(expectedProperties, schema.properties()); + + } finally { + FileUtils.deleteQuietly(schemaFile); + } + } + + @Test + public void testAlterRemoveSuccess() throws Exception { + File tmpDir = dirTestWatcher.getTmpDir(); + File schemaFile = new File(tmpDir, "alter_schema_remove_success.schema"); + assertFalse(schemaFile.exists()); + try { + run("create schema (col1 int, 
col2 varchar, col3 boolean, col4 int) path '%s' " + + "properties ('prop1' = 'a', 'prop2' = 'b', 'prop3' = 'c', 'prop4' = 'd')", schemaFile.getPath()); + + testBuilder() + .sqlQuery("alter schema path '%s' remove " + + "columns (col2, col4) " + + "properties ('prop2', 'prop4')", schemaFile.getPath()) + .unOrdered() + .baselineColumns("ok", "summary") + .baselineValues(true, String.format("Schema for [%s] was updated", schemaFile.getPath())) + .go(); + + SchemaProvider schemaProvider = new PathSchemaProvider(new Path(schemaFile.getPath())); + TupleMetadata schema = schemaProvider.read().getSchema(); + + assertEquals(2, schema.size()); + + assertEquals("col1", schema.fullName(0)); + assertEquals("col3", schema.fullName(1)); + + Map<String, String> expectedProperties = new HashMap<>(); + expectedProperties.put("prop1", "a"); + expectedProperties.put("prop3", "c"); + + assertEquals(expectedProperties, schema.properties()); + + } finally { + FileUtils.deleteQuietly(schemaFile); + } + } + + @Test + public void testAlterRemoveForTable() throws Exception { /* removes columns and properties from a schema attached to a table */ + String tableName = "table_for_alter_remove"; /* own table name; testAlterAddForTable already uses table_for_alter_add */ + String table = String.format("dfs.tmp.%s", tableName); + try { + run("create table %s as select 'a' as c from (values(1))", table); + + File schemaPath = Paths.get(dirTestWatcher.getDfsTestTmpDir().getPath(), + tableName, SchemaProvider.DEFAULT_SCHEMA_NAME).toFile(); + + run("create schema (col1 int, col2 varchar, col3 boolean, col4 int) for table %s " + + "properties ('prop1' = 'a', 'prop2' = 'b', 'prop3' = 'c', 'prop4' = 'd')", table); + + testBuilder() + .sqlQuery("alter schema for table %s remove " + + "columns (col2, col4) " + + "properties ('prop2', 'prop4')", table) + .unOrdered() + .baselineColumns("ok", "summary") + .baselineValues(true, String.format("Schema for [%s] was updated", table)) + .go(); + + SchemaProvider schemaProvider = new PathSchemaProvider(new Path(schemaPath.getPath())); + assertTrue(schemaProvider.exists()); + + TupleMetadata schema =
schemaProvider.read().getSchema(); + assertEquals(2, schema.size()); + assertEquals(2, schema.properties().size()); + + } finally { + run("drop table if exists %s", table); + } + } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/TestTupleSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/TestTupleSchema.java index 47dffac..6502d83 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/TestTupleSchema.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/TestTupleSchema.java @@ -869,4 +869,68 @@ public class TestTupleSchema extends SubOperatorTest { assertNull(TupleMetadata.of("")); assertNull(TupleMetadata.of(" ")); } + + @Test + public void testCopy() { + TupleMetadata schema = new SchemaBuilder() + .add("a", MinorType.BIGINT) + .build(); + + schema.setIntProperty("int_prop", 1); + schema.setProperty("string_prop", "A"); + + TupleMetadata copy = schema.copy(); + + assertTrue(schema.isEquivalent(copy)); + assertEquals(schema.properties(), copy.properties()); + } + + @Test + public void testAddNewColumn() { + TupleMetadata schema = new SchemaBuilder() + .add("a", MinorType.BIGINT) + .build(); + + int index = schema.addColumn( + MetadataUtils.newScalar("b", + MajorType.newBuilder() + .setMinorType(MinorType.VARCHAR) + .setMode(DataMode.OPTIONAL).build())); + + assertEquals(1, index); + assertEquals(2, schema.size()); + } + + @Test + public void testAddNewProperty() { + TupleMetadata schema = new SchemaBuilder() + .add("a", MinorType.BIGINT) + .build(); + + assertTrue(schema.properties().isEmpty()); + + schema.setIntProperty("int_prop", 1); + schema.setProperty("string_prop", "A"); + + assertEquals(2, schema.properties().size()); + } + + @Test + public void testRemoveProperty() { + TupleMetadata schema = new SchemaBuilder() + .add("a", MinorType.BIGINT) + .build(); + + schema.setIntProperty("int_prop", 1); + schema.setProperty("string_prop", "A"); + 
assertEquals(2, schema.properties().size()); + + schema.removeProperty("int_prop"); + assertEquals(1, schema.properties().size()); + assertNull(schema.property("int_prop")); + assertEquals("A", schema.property("string_prop")); + + schema.removeProperty("missing_prop"); + assertEquals(1, schema.properties().size()); + } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/schema/TestSchemaProvider.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/schema/TestSchemaProvider.java index 03dcc3c..c07610d 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/schema/TestSchemaProvider.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/schema/TestSchemaProvider.java @@ -20,7 +20,6 @@ package org.apache.drill.exec.record.metadata.schema; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.exec.record.metadata.ColumnMetadata; import org.apache.drill.exec.record.metadata.TupleMetadata; -import org.apache.drill.exec.store.StorageStrategy; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -69,7 +68,7 @@ public class TestSchemaProvider { SchemaProvider provider = new InlineSchemaProvider("(i int)"); thrown.expect(UnsupportedOperationException.class); thrown.expectMessage("Schema storage is not supported"); - provider.store("i int", null, StorageStrategy.DEFAULT); + provider.store("i int", null, StorageProperties.builder().build()); } @Test @@ -142,7 +141,7 @@ public class TestSchemaProvider { properties.put("k2", "v2"); assertFalse(provider.exists()); - provider.store("i int, v varchar(10), s struct<s1 int, s2 varchar>", properties, StorageStrategy.DEFAULT); + provider.store("i int, v varchar(10), s struct<s1 int, s2 varchar>", properties, StorageProperties.builder().build()); assertTrue(provider.exists()); String expectedContent = "{\n" @@ -186,7 +185,23 @@ public class TestSchemaProvider { 
thrown.expect(IOException.class); thrown.expectMessage("File already exists"); - provider.store("i int", null, StorageStrategy.DEFAULT); + provider.store("i int", null, StorageProperties.builder().build()); + } + + @Test + public void testPathProviderStoreInExistingFileOverwrite() throws Exception { + File schemaFile = folder.newFile("schema"); + org.apache.hadoop.fs.Path schema = new org.apache.hadoop.fs.Path(schemaFile.getPath()); + SchemaProvider provider = new PathSchemaProvider(schema); + assertTrue(provider.exists()); + + StorageProperties storageProperties = StorageProperties.builder() + .overwrite() + .build(); + provider.store("i int", null, storageProperties); + + TupleMetadata metadata = provider.read().getSchema(); + assertEquals(1, metadata.size()); } @Test diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractColumnMetadata.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractColumnMetadata.java index 69926c4..6f7fadc 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractColumnMetadata.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractColumnMetadata.java @@ -104,6 +104,7 @@ public abstract class AbstractColumnMetadata extends AbstractPropertied implemen mode = from.mode; precision = from.precision; scale = from.scale; + setProperties(from.properties()); } @Override diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractPropertied.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractPropertied.java index b86b548..3e10a1f 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractPropertied.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractPropertied.java @@ -111,4 +111,9 @@ public class AbstractPropertied implements Propertied { public void setIntProperty(String key, int value) { setProperty(key, 
Integer.toString(value)); } + + @Override + public void removeProperty(String key) { + setProperty(key, null); /* delegates to setProperty with a null value; presumably setProperty drops the entry for null - confirm against its body, which this hunk does not show */ + } } diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/Propertied.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/Propertied.java index 1597ab1..5ccbc62 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/Propertied.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/Propertied.java @@ -28,9 +28,9 @@ public interface Propertied { /** * Base name for properties which Drill itself defines. Provides a * separate "name space" from user-defined properties which should - * have some other perfix. + * have some other prefix. */ - public static final String DRILL_PROP_PREFIX = "drill."; + String DRILL_PROP_PREFIX = "drill."; /** * Sets schema properties if not null. @@ -50,6 +50,8 @@ public interface Propertied { int intProperty(String key); int intProperty(String key, int defaultValue); void setIntProperty(String key, int value); + void removeProperty(String key); + /** * Drill-wide properties are of the form:<br><tt> @@ -65,7 +67,7 @@ public interface Propertied { * @return the "drill.<plugin name>." prefix */ - public static String pluginPrefix(String pluginName) { + static String pluginPrefix(String pluginName) { return DRILL_PROP_PREFIX + pluginName + "."; } } diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/TupleSchema.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/TupleSchema.java index 97304c5..cb43c64 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/TupleSchema.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/TupleSchema.java @@ -71,6 +71,7 @@ public class TupleSchema extends AbstractPropertied implements TupleMetadata { for (ColumnMetadata md : this) { tuple.addColumn(md.copy()); } + tuple.setProperties(properties()); return tuple; }