This is an automated email from the ASF dual-hosted git repository.

lpinter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
     new dd424d77230 HIVE-26397: Honour Iceberg sort orders when writing a table. (#3445) (Laszlo Pinter, reviewed by Adam Szita and Peter Vary)
dd424d77230 is described below

commit dd424d77230f275b78d4465548cad5b57aca66e9
Author: László Pintér <47777102+lcspin...@users.noreply.github.com>
AuthorDate: Thu Jul 21 13:23:43 2022 +0200

    HIVE-26397: Honour Iceberg sort orders when writing a table. (#3445) (Laszlo Pinter, reviewed by Adam Szita and Peter Vary)
---
 .../iceberg/mr/hive/HiveIcebergMetaHook.java       |   4 +-
 .../iceberg/mr/hive/HiveIcebergStorageHandler.java | 116 +++++++++++++++------
 .../apache/iceberg/mr/hive/IcebergTableUtil.java   |  12 +--
 .../java/org/apache/iceberg/mr/TestHelper.java     |  19 +++-
 .../iceberg/mr/hive/HiveIcebergTestUtils.java      |  13 ++-
 .../iceberg/mr/hive/TestHiveIcebergInserts.java    |  86 +++++++++++++++
 .../org/apache/iceberg/mr/hive/TestTables.java     |  49 ++++++++-
 .../hive/ql/ddl/table/create/CreateTableDesc.java  |   6 +-
 .../desc/formatter/JsonDescTableFormatter.java     |   4 +-
 .../desc/formatter/TextDescTableFormatter.java     |  15 ++-
 .../set/AlterTableSetPartitionSpecAnalyzer.java    |   4 +-
 .../apache/hadoop/hive/ql/exec/DDLPlanUtils.java   |   8 +-
 .../hive/ql/metadata/HiveStorageHandler.java       |  23 +++-
 .../ql/optimizer/SortedDynPartitionOptimizer.java  |  38 +++++-
 .../hadoop/hive/ql/parse/PartitionTransform.java   |  13 ++-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java     |   2 +-
 ...titionTransformSpec.java => TransformSpec.java} |   6 +-
 .../hadoop/hive/ql/plan/DynamicPartitionCtx.java   |  24 +++++
 18 files changed, 362 insertions(+), 80 deletions(-)
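The patch wires an Iceberg table's declared sort order into Hive's dynamic-partition sort machinery, so rows reach the writers already ordered. For readers coming from the Hive side, the Iceberg half of the contract is a SortOrder attached to the table metadata; declaring one looks roughly like the following sketch (not part of the patch; the schema and column names are made up, but the API calls are the same ones the new tests below use):

    import org.apache.iceberg.Schema;
    import org.apache.iceberg.SortOrder;
    import org.apache.iceberg.types.Types;

    import static org.apache.iceberg.NullOrder.NULLS_FIRST;
    import static org.apache.iceberg.NullOrder.NULLS_LAST;
    import static org.apache.iceberg.expressions.Expressions.truncate;
    import static org.apache.iceberg.types.Types.NestedField.optional;

    public class SortOrderDeclarationSketch {
      public static void main(String[] args) {
        Schema schema = new Schema(
            optional(1, "id", Types.IntegerType.get()),
            optional(2, "data", Types.StringType.get()));

        // ascending on id with NULLs first, then descending on a 4-character prefix of data
        SortOrder order = SortOrder.builderFor(schema)
            .asc("id", NULLS_FIRST)
            .desc(truncate("data", 4), NULLS_LAST)
            .build();
        System.out.println(order);
      }
    }

Once such an order exists on the table, the storage handler changes below translate it into the sort positions, directions and null orderings for the ReduceSink that feeds the writers.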
" + "Partition transform metadata cannot be saved."); diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java index 25881408a63..51e3d60ad9d 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java @@ -57,8 +57,8 @@ import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec; -import org.apache.hadoop.hive.ql.parse.PartitionTransformSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.TransformSpec; import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; @@ -84,11 +84,15 @@ import org.apache.hadoop.mapred.OutputFormat; import org.apache.iceberg.BaseTable; import org.apache.iceberg.FileFormat; import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.NullOrder; import org.apache.iceberg.PartitionSpecParser; import org.apache.iceberg.Schema; import org.apache.iceberg.SchemaParser; import org.apache.iceberg.SerializableTable; import org.apache.iceberg.SnapshotSummary; +import org.apache.iceberg.SortDirection; +import org.apache.iceberg.SortField; +import org.apache.iceberg.SortOrder; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.hadoop.HadoopConfigurable; @@ -332,27 +336,35 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H } @Override - public List<PartitionTransformSpec> getPartitionTransformSpec(org.apache.hadoop.hive.ql.metadata.Table hmsTable) { + public List<TransformSpec> getPartitionTransformSpec(org.apache.hadoop.hive.ql.metadata.Table hmsTable) { TableDesc tableDesc = Utilities.getTableDesc(hmsTable); Table table = IcebergTableUtil.getTable(conf, tableDesc.getProperties()); - return table.spec().fields().stream().map(f -> { - PartitionTransformSpec spec = new PartitionTransformSpec(); - spec.setColumnName(table.schema().findColumnName(f.sourceId())); - // right now the only way to fetch the transform type and its params is through the toString() call - String transformName = f.transform().toString().toUpperCase(); - // if the transform name contains '[' it means it has some config params - if (transformName.contains("[")) { - spec.setTransformType(PartitionTransformSpec.TransformType - .valueOf(transformName.substring(0, transformName.indexOf("[")))); - spec.setTransformParam(Optional.of(Integer - .valueOf(transformName.substring(transformName.indexOf("[") + 1, transformName.indexOf("]"))))); - } else { - spec.setTransformType(PartitionTransformSpec.TransformType.valueOf(transformName)); - spec.setTransformParam(Optional.empty()); - } + return table.spec().fields().stream().map(f -> + getTransformSpec(table, f.transform().toString().toUpperCase(), f.sourceId()) + ).collect(Collectors.toList()); + } + + private List<TransformSpec> getSortTransformSpec(Table table) { + return table.sortOrder().fields().stream().map(s -> + getTransformSpec(table, s.transform().toString().toUpperCase(), s.sourceId()) + ).collect(Collectors.toList()); + 
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 25881408a63..51e3d60ad9d 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -57,8 +57,8 @@ import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
 import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec;
-import org.apache.hadoop.hive.ql.parse.PartitionTransformSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.TransformSpec;
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
@@ -84,11 +84,15 @@ import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.NullOrder;
 import org.apache.iceberg.PartitionSpecParser;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.SortDirection;
+import org.apache.iceberg.SortField;
+import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.hadoop.HadoopConfigurable;
@@ -332,27 +336,35 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
   }

   @Override
-  public List<PartitionTransformSpec> getPartitionTransformSpec(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+  public List<TransformSpec> getPartitionTransformSpec(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
     TableDesc tableDesc = Utilities.getTableDesc(hmsTable);
     Table table = IcebergTableUtil.getTable(conf, tableDesc.getProperties());
-    return table.spec().fields().stream().map(f -> {
-      PartitionTransformSpec spec = new PartitionTransformSpec();
-      spec.setColumnName(table.schema().findColumnName(f.sourceId()));
-      // right now the only way to fetch the transform type and its params is through the toString() call
-      String transformName = f.transform().toString().toUpperCase();
-      // if the transform name contains '[' it means it has some config params
-      if (transformName.contains("[")) {
-        spec.setTransformType(PartitionTransformSpec.TransformType
-            .valueOf(transformName.substring(0, transformName.indexOf("["))));
-        spec.setTransformParam(Optional.of(Integer
-            .valueOf(transformName.substring(transformName.indexOf("[") + 1, transformName.indexOf("]")))));
-      } else {
-        spec.setTransformType(PartitionTransformSpec.TransformType.valueOf(transformName));
-        spec.setTransformParam(Optional.empty());
-      }
+    return table.spec().fields().stream().map(f ->
+        getTransformSpec(table, f.transform().toString().toUpperCase(), f.sourceId())
+    ).collect(Collectors.toList());
+  }
+
+  private List<TransformSpec> getSortTransformSpec(Table table) {
+    return table.sortOrder().fields().stream().map(s ->
+        getTransformSpec(table, s.transform().toString().toUpperCase(), s.sourceId())
+    ).collect(Collectors.toList());
+  }
+
+  private TransformSpec getTransformSpec(Table table, String transformName, int sourceId) {
+    TransformSpec spec = new TransformSpec();
+    spec.setColumnName(table.schema().findColumnName(sourceId));
+    // if the transform name contains '[' it means it has some config params
+    if (transformName.contains("[")) {
+      spec.setTransformType(TransformSpec.TransformType
+          .valueOf(transformName.substring(0, transformName.indexOf("["))));
+      spec.setTransformParam(Optional.of(Integer
+          .valueOf(transformName.substring(transformName.indexOf("[") + 1, transformName.indexOf("]")))));
+    } else {
+      spec.setTransformType(TransformSpec.TransformType.valueOf(transformName));
+      spec.setTransformParam(Optional.empty());
+    }

-    return spec;
-    }).collect(Collectors.toList());
+    return spec;
   }
@@ -367,12 +379,6 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
     TableDesc tableDesc = Utilities.getTableDesc(hmsTable);
     Table table = IcebergTableUtil.getTable(conf, tableDesc.getProperties());

-    if (table.spec().isUnpartitioned()) {
-      return null;
-    }
-
-    // Iceberg currently doesn't have publicly accessible partition transform information, hence use above string parse
-    List<PartitionTransformSpec> partitionTransformSpecs = getPartitionTransformSpec(hmsTable);

     DynamicPartitionCtx dpCtx = new DynamicPartitionCtx(Maps.newLinkedHashMap(),
         hiveConf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME),
@@ -380,6 +386,39 @@
     List<Function<List<ExprNodeDesc>, ExprNodeDesc>> customSortExprs = Lists.newLinkedList();
     dpCtx.setCustomSortExpressions(customSortExprs);

+    if (table.spec().isPartitioned()) {
+      addCustomSortExpr(table, hmsTable, writeOperation, customSortExprs, getPartitionTransformSpec(hmsTable));
+    }
+
+    SortOrder sortOrder = table.sortOrder();
+    if (sortOrder.isSorted()) {
+      List<Integer> customSortPositions = Lists.newLinkedList();
+      List<Integer> customSortOrder = Lists.newLinkedList();
+      dpCtx.setCustomSortOrder(customSortOrder);
+      List<Integer> customSortNullOrder = Lists.newLinkedList();
+      dpCtx.setCustomSortNullOrder(customSortNullOrder);
+      for (SortField sortField : sortOrder.fields()) {
+        int pos = 0;
+        for (Types.NestedField field : table.schema().columns()) {
+          if (sortField.sourceId() == field.fieldId()) {
+            customSortPositions.add(pos);
+            customSortOrder.add(sortField.direction() == SortDirection.ASC ? 1 : 0);
+            customSortNullOrder.add(sortField.nullOrder() == NullOrder.NULLS_FIRST ? 0 : 1);
+            break;
+          }
+          pos++;
+        }
+      }
+
+      addCustomSortExpr(table, hmsTable, writeOperation, customSortExprs, getSortTransformSpec(table));
+    }
+
+    return dpCtx;
+  }
+
+  private void addCustomSortExpr(Table table, org.apache.hadoop.hive.ql.metadata.Table hmsTable,
+      Operation writeOperation, List<Function<List<ExprNodeDesc>, ExprNodeDesc>> customSortExprs,
+      List<TransformSpec> transformSpecs) {
     Map<String, Integer> fieldOrderMap = Maps.newHashMap();
     List<Types.NestedField> fields = table.schema().columns();
     for (int i = 0; i < fields.size(); ++i) {
@@ -387,16 +426,15 @@
     }

     int offset = acidSelectColumns(hmsTable, writeOperation).size();
-    for (PartitionTransformSpec spec : partitionTransformSpecs) {
+
+    for (TransformSpec spec : transformSpecs) {
       int order = fieldOrderMap.get(spec.getColumnName());
-      if (PartitionTransformSpec.TransformType.BUCKET.equals(spec.getTransformType())) {
+      if (TransformSpec.TransformType.BUCKET.equals(spec.getTransformType())) {
         customSortExprs.add(BUCKET_SORT_EXPR.apply(order + offset, spec.getTransformParam().get()));
       } else {
         customSortExprs.add(cols -> cols.get(order + offset).clone());
       }
     }
-
-    return dpCtx;
   }
@@ -570,6 +608,26 @@
     }
   }

+  @Override
+  public boolean supportsSortColumns() {
+    return true;
+  }
+
+  @Override
+  public List<FieldSchema> sortColumns(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+    TableDesc tableDesc = Utilities.getTableDesc(hmsTable);
+    Table table = IcebergTableUtil.getTable(conf, tableDesc.getProperties());
+    if (table.sortOrder().isUnsorted()) {
+      return Collections.emptyList();
+    }
+
+    Schema schema = table.schema();
+    return table.sortOrder().fields().stream().map(s -> new FieldSchema(schema.findColumnName(s.sourceId()),
+        schema.findType(s.sourceId()).toString(),
+        String.format("Transform: %s, Sort direction: %s, Null sort order: %s",
+            s.transform().toString(), s.direction().name(), s.nullOrder().name()))).collect(Collectors.toList());
+  }
+
   private void setCommonJobConf(JobConf jobConf) {
     jobConf.set("tez.mrreader.config.update.properties", "hive.io.file.readcolumn.names,hive.io.file.readcolumn.ids");
   }
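As the comment removed above notes, Iceberg does not expose the transform type and its parameter in a structured way here, so getTransformSpec() recovers them by parsing Transform.toString(), for example "BUCKET[16]" or "TRUNCATE[4]". Isolated from the handler, the parsing step amounts to this sketch (illustrative class and method names, not part of the patch):

    import java.util.Optional;

    public class TransformNameParsingSketch {
      // "BUCKET[16]" -> type BUCKET, param 16; "IDENTITY" -> no param
      static void parse(String transformName) {
        String type;
        Optional<Integer> param;
        if (transformName.contains("[")) {
          type = transformName.substring(0, transformName.indexOf("["));
          param = Optional.of(Integer.valueOf(
              transformName.substring(transformName.indexOf("[") + 1, transformName.indexOf("]"))));
        } else {
          type = transformName;
          param = Optional.empty();
        }
        System.out.println(type + " " + param);
      }

      public static void main(String[] args) {
        parse("BUCKET[16]");   // prints: BUCKET Optional[16]
        parse("TRUNCATE[4]");  // prints: TRUNCATE Optional[4]
        parse("IDENTITY");     // prints: IDENTITY Optional.empty
      }
    }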
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
index 3fe2eee39df..344834ec62e 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec;
-import org.apache.hadoop.hive.ql.parse.PartitionTransformSpec;
+import org.apache.hadoop.hive.ql.parse.TransformSpec;
 import org.apache.hadoop.hive.ql.session.SessionStateUtil;
 import org.apache.iceberg.ManageSnapshots;
 import org.apache.iceberg.PartitionSpec;
@@ -98,15 +98,15 @@ public class IcebergTableUtil {

   /**
    * Create {@link PartitionSpec} based on the partition information stored in
-   * {@link PartitionTransformSpec}.
+   * {@link TransformSpec}.
    * @param configuration a Hadoop configuration
    * @param schema iceberg table schema
    * @return iceberg partition spec, always non-null
    */
   public static PartitionSpec spec(Configuration configuration, Schema schema) {
-    List<PartitionTransformSpec> partitionTransformSpecList = SessionStateUtil
+    List<TransformSpec> partitionTransformSpecList = SessionStateUtil
         .getResource(configuration, hive_metastoreConstants.PARTITION_TRANSFORM_SPEC)
-        .map(o -> (List<PartitionTransformSpec>) o).orElseGet(() -> null);
+        .map(o -> (List<TransformSpec>) o).orElseGet(() -> null);

     if (partitionTransformSpecList == null) {
       LOG.debug("Iceberg partition transform spec is not found in QueryState.");
@@ -154,9 +154,9 @@ public class IcebergTableUtil {
     UpdatePartitionSpec updatePartitionSpec = table.updateSpec().caseSensitive(false);
     table.spec().fields().forEach(field -> updatePartitionSpec.removeField(field.name()));

-    List<PartitionTransformSpec> partitionTransformSpecList = SessionStateUtil
+    List<TransformSpec> partitionTransformSpecList = SessionStateUtil
         .getResource(configuration, hive_metastoreConstants.PARTITION_TRANSFORM_SPEC)
-        .map(o -> (List<PartitionTransformSpec>) o).orElseGet(() -> null);
+        .map(o -> (List<TransformSpec>) o).orElseGet(() -> null);

     partitionTransformSpecList.forEach(spec -> {
       switch (spec.getTransformType()) {
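The switch over spec.getTransformType() that follows (its body lies outside this hunk) turns each TransformSpec back into a call on Iceberg's PartitionSpec.Builder. For a table partitioned by an identity column plus a two-way bucket, the net effect is equivalent to this standalone sketch (illustrative schema and names, not patch code):

    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.types.Types;

    import static org.apache.iceberg.types.Types.NestedField.optional;

    public class SpecFromTransformsSketch {
      public static void main(String[] args) {
        Schema schema = new Schema(
            optional(1, "category", Types.StringType.get()),
            optional(2, "b", Types.StringType.get()));

        // what IDENTITY(category) plus BUCKET[2](b) resolve to on the builder
        PartitionSpec spec = PartitionSpec.builderFor(schema)
            .identity("category")
            .bucket("b", 2)
            .build();
        System.out.println(spec);
      }
    }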
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestHelper.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestHelper.java
index df61cfd8c37..5614281a731 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestHelper.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestHelper.java
@@ -31,6 +31,7 @@ import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
@@ -54,6 +55,7 @@ public class TestHelper {
   private final FileFormat fileFormat;
   private final TemporaryFolder tmp;
   private final Map<String, String> tblProps;
+  private SortOrder order;

   private Table table;

@@ -79,6 +81,10 @@ public class TestHelper {
     conf.set(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(table.schema()));
   }

+  public void setOrder(SortOrder order) {
+    this.order = order;
+  }
+
   public Table table() {
     return table;
   }
@@ -91,13 +97,22 @@
   }

   public Table createTable(Schema theSchema, PartitionSpec theSpec) {
-    Table tbl = tables.create(theSchema, theSpec, properties(), tableIdentifier);
+    return createTable(theSchema, theSpec, null);
+  }
+
+  public Table createTable(Schema theSchema, PartitionSpec theSpec, SortOrder theOrder) {
+    Table tbl;
+    if (theOrder != null) {
+      tbl = tables.create(theSchema, theSpec, theOrder, properties(), tableIdentifier);
+    } else {
+      tbl = tables.create(theSchema, theSpec, properties(), tableIdentifier);
+    }
     setTable(tbl);
     return tbl;
   }

   public Table createTable() {
-    return createTable(schema, spec);
+    return createTable(schema, spec, order);
   }

   public Table createUnpartitionedTable() {
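With setOrder() in place, TestHelper.createTable() picks the Tables.create() overload that takes a SortOrder. The same call against a plain HadoopTables catalog looks like this (a standalone illustration under the assumption that the five-argument create() overload used in the diff is available on HadoopTables; the warehouse path is made up):

    import java.util.Collections;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.SortOrder;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.hadoop.HadoopTables;
    import org.apache.iceberg.types.Types;

    import static org.apache.iceberg.types.Types.NestedField.optional;

    public class CreateSortedTableSketch {
      public static void main(String[] args) {
        Schema schema = new Schema(
            optional(1, "id", Types.IntegerType.get()),
            optional(2, "data", Types.StringType.get()));
        SortOrder order = SortOrder.builderFor(schema).asc("id").build();

        Table table = new HadoopTables(new Configuration()).create(
            schema, PartitionSpec.unpartitioned(), order,
            Collections.emptyMap(), "/tmp/warehouse/sorted_demo");
        System.out.println(table.sortOrder());
      }
    }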
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java
index 5e217acc82a..dc68ce9a80b 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java
@@ -261,8 +261,17 @@ public class HiveIcebergTestUtils {
     sortedActual.sort(Comparator.comparingInt(record -> record.get(sortBy).hashCode()));

     Assert.assertEquals(sortedExpected.size(), sortedActual.size());
-    for (int i = 0; i < sortedExpected.size(); ++i) {
-      assertEquals(sortedExpected.get(i), sortedActual.get(i));
+    validateData(sortedExpected, sortedActual);
+  }
+
+  /**
+   * Validates whether the 2 sets of records are the same.
+   * @param expected The expected list of Records
+   * @param actual The actual list of Records
+   */
+  public static void validateData(List<Record> expected, List<Record> actual) {
+    for (int i = 0; i < expected.size(); ++i) {
+      assertEquals(expected.get(i), actual.get(i));
     }
   }
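Unlike the hash-sorting validator above it, validateData() compares the two lists index by index, so it asserts the physical row order, which is exactly what the new sorted-insert tests need. Reduced to its essence (a standalone rendering for illustration, not the helper itself):

    import java.util.List;

    public class OrderSensitiveCheckSketch {
      static void validate(List<?> expected, List<?> actual) {
        if (expected.size() != actual.size()) {
          throw new AssertionError("size mismatch");
        }
        for (int i = 0; i < expected.size(); ++i) {
          if (!expected.get(i).equals(actual.get(i))) {  // index by index: order matters
            throw new AssertionError("mismatch at row " + i);
          }
        }
      }

      public static void main(String[] args) {
        validate(List.of(1, 2, 3), List.of(1, 2, 3));  // passes
        validate(List.of(1, 2, 3), List.of(3, 2, 1));  // throws: same elements, different order
      }
    }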
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java
index 31a589a7c96..0c15ba4e430 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java
@@ -25,6 +25,7 @@ import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.data.Record;
@@ -38,6 +39,10 @@ import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.Test;

+import static org.apache.iceberg.NullOrder.NULLS_FIRST;
+import static org.apache.iceberg.NullOrder.NULLS_LAST;
+import static org.apache.iceberg.expressions.Expressions.bucket;
+import static org.apache.iceberg.expressions.Expressions.truncate;
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;

@@ -47,6 +52,87 @@ import static org.apache.iceberg.types.Types.NestedField.required;
  */
 public class TestHiveIcebergInserts extends HiveIcebergStorageHandlerWithEngineBase {

+  @Test
+  public void testSortedInsert() throws IOException {
+    TableIdentifier identifier = TableIdentifier.of("default", "sort_table");
+
+    Schema schema = new Schema(
+        optional(1, "id", Types.IntegerType.get(), "unique ID"),
+        optional(2, "data", Types.StringType.get())
+    );
+    SortOrder order = SortOrder.builderFor(schema)
+        .asc("id", NULLS_FIRST)
+        .desc("data", NULLS_LAST)
+        .build();
+
+    testTables.createTable(shell, identifier.name(), schema, order, PartitionSpec.unpartitioned(), fileFormat,
+        ImmutableList.of(), 1, ImmutableMap.of());
+    shell.executeStatement(String.format("INSERT INTO TABLE %s VALUES (4, 'a'), (1, 'a'), (3, 'a'), (2, 'a'), " +
+        "(null, 'a'), (3, 'b'), (3, null)", identifier.name()));
+
+    List<Record> expected = TestHelper.RecordsBuilder.newInstance(schema)
+        .add(null, "a").add(1, "a").add(2, "a").add(3, "b").add(3, "a").add(3, null).add(4, "a")
+        .build();
+    List<Object[]> result = shell.executeStatement(String.format("SELECT * FROM %s", identifier.name()));
+    HiveIcebergTestUtils.validateData(expected, HiveIcebergTestUtils.valueForRow(schema, result));
+  }
+
+  @Test
+  public void testSortedAndTransformedInsert() throws IOException {
+    TableIdentifier identifier = TableIdentifier.of("default", "sort_table");
+
+    SortOrder order = SortOrder.builderFor(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
+        .asc(bucket("customer_id", 2), NULLS_FIRST)
+        .desc(truncate("first_name", 4), NULLS_LAST)
+        .asc("last_name", NULLS_LAST)
+        .build();
+
+    testTables.createTable(shell, identifier.name(), HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, order,
+        PartitionSpec.unpartitioned(), fileFormat, ImmutableList.of(), 1, ImmutableMap.of());
+
+    StringBuilder insertQuery = new StringBuilder().append(String.format("INSERT INTO %s VALUES ", identifier.name()));
+    HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_2.forEach(record -> insertQuery.append("(")
+        .append(record.get(0)).append(",'")
+        .append(record.get(1)).append("','")
+        .append(record.get(2)).append("'),"));
+    insertQuery.setLength(insertQuery.length() - 1);
+
+    shell.executeStatement(insertQuery.toString());
+
+    List<Record> expected = TestHelper.RecordsBuilder.newInstance(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
+        .add(2L, "Susan", "Morrison").add(1L, "Sharon", "Taylor").add(1L, "Joanna", "Pierce")
+        .add(2L, "Joanna", "Silver").add(2L, "Jake", "Donnel").add(2L, "Bob", "Silver").add(3L, "Trudy", "Henderson")
+        .add(3L, "Trudy", "Johnson").add(3L, "Blake", "Burr").build();
+    List<Object[]> result = shell.executeStatement(String.format("SELECT * FROM %s", identifier.name()));
+    HiveIcebergTestUtils.validateData(expected,
+        HiveIcebergTestUtils.valueForRow(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, result));
+  }
+
+  @Test
+  public void testSortedAndTransformedInsertIntoPartitionedTable() throws IOException {
+    TableIdentifier identifier = TableIdentifier.of("default", "tbl_bucketed");
+    Schema schema = new Schema(
+        optional(1, "a", Types.IntegerType.get()),
+        optional(2, "b", Types.StringType.get()),
+        optional(3, "c", Types.IntegerType.get())
+    );
+    SortOrder order = SortOrder.builderFor(schema)
+        .desc("c", NULLS_FIRST)
+        .asc(truncate("b", 1))
+        .build();
+    PartitionSpec partitionSpec = PartitionSpec.builderFor(schema)
+        .bucket("b", 2)
+        .build();
+    testTables.createTable(shell, identifier.name(), schema, order, partitionSpec, fileFormat, ImmutableList.of(), 1,
+        ImmutableMap.of());
+    shell.executeStatement(String.format("INSERT INTO %s VALUES (1, 'EUR', 10), (5, 'HUF', 30), (2, 'EUR', 10), " +
+        "(8, 'PLN', 20), (6, 'USD', null)", identifier.name()));
+    List<Object[]> result = shell.executeStatement(String.format("SELECT * FROM %s", identifier.name()));
+    List<Record> expected =
+        TestHelper.RecordsBuilder.newInstance(schema).add(1, "EUR", 10).add(2, "EUR", 10).add(6, "USD", null)
+            .add(5, "HUF", 30).add(8, "PLN", 20).build();
+    HiveIcebergTestUtils.validateData(expected, HiveIcebergTestUtils.valueForRow(schema, result));
+  }
+
   @Test
   public void testInsert() throws IOException {
     Table table = testTables.createTable(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
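The expected list in testSortedInsert() encodes "id ASC NULLS FIRST, data DESC NULLS LAST". Spelled out as a plain java.util.Comparator over (id, data) rows, the same ordering reproduces that list (an illustrative sketch only, not test code):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.List;

    public class ExpectedOrderSketch {
      public static void main(String[] args) {
        Comparator<Object[]> expectedOrder = Comparator
            .comparing((Object[] r) -> (Integer) r[0], Comparator.nullsFirst(Comparator.naturalOrder()))
            .thenComparing(r -> (String) r[1], Comparator.nullsLast(Comparator.<String>reverseOrder()));

        List<Object[]> rows = new ArrayList<>(Arrays.asList(
            new Object[] {4, "a"}, new Object[] {1, "a"}, new Object[] {3, "a"},
            new Object[] {2, "a"}, new Object[] {null, "a"}, new Object[] {3, "b"}, new Object[] {3, null}));
        rows.sort(expectedOrder);
        // yields: null a | 1 a | 2 a | 3 b | 3 a | 3 null | 4 a, matching the expected records
        rows.forEach(r -> System.out.println(r[0] + " " + r[1]));
      }
    }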
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
index f2b58d776c5..efaea9a8a01 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
@@ -251,9 +251,33 @@ abstract class TestTables {
    */
   public Table createTable(TestHiveShell shell, String tableName, Schema schema, FileFormat fileFormat,
       List<Record> records, int formatVersion, Map<String, String> tblProperties) throws IOException {
+    return createTable(shell, tableName, schema, SortOrder.unsorted(), PartitionSpec.unpartitioned(), fileFormat,
+        records, formatVersion, tblProperties);
+  }
+
+  /**
+   * Creates a non partitioned Hive test table. Creates the Iceberg table/data and creates the corresponding Hive
+   * table as well when needed. The table will be in the 'default' database. The table will be populated with the
+   * provided List of {@link Record}s.
+   * @param shell The HiveShell used for Hive table creation
+   * @param tableName The name of the test table
+   * @param schema The schema used for the table creation
+   * @param order The sort order used for the table creation
+   * @param partSpec The partition spec used for the table creation
+   * @param fileFormat The file format used for writing the data
+   * @param records The records with which the table is populated
+   * @param formatVersion The version of the spec the table should use (format-version)
+   * @param tblProperties Additional table properties
+   * @return The created table
+   * @throws IOException If there is an error writing data
+   */
+  public Table createTable(TestHiveShell shell, String tableName, Schema schema, SortOrder order,
+      PartitionSpec partSpec, FileFormat fileFormat, List<Record> records, int formatVersion,
+      Map<String, String> tblProperties) throws IOException {
     ImmutableMap<String, String> tblProps = ImmutableMap.<String, String>builder().putAll(tblProperties)
         .put(TableProperties.FORMAT_VERSION, Integer.toString(formatVersion)).build();
-    Table table = createIcebergTable(shell.getHiveConf(), tableName, schema, fileFormat, tblProps, records);
+    Table table = createIcebergTable(shell.getHiveConf(), tableName, schema, order, partSpec, fileFormat, tblProps,
+        records);
     String createHiveSQL = createHiveTableSQL(TableIdentifier.of("default", tableName), tblProps);
     if (createHiveSQL != null) {
       shell.executeStatement(createHiveSQL);
@@ -386,9 +410,30 @@ abstract class TestTables {
    */
   public Table createIcebergTable(Configuration configuration, String tableName, Schema schema, FileFormat fileFormat,
       Map<String, String> additionalTableProps, List<Record> records) throws IOException {
+    return createIcebergTable(configuration, tableName, schema, SortOrder.unsorted(), PartitionSpec.unpartitioned(),
+        fileFormat, additionalTableProps, records);
+  }
+
+  /**
+   * Creates an Iceberg table/data without creating the corresponding Hive table. The table will be in the 'default'
+   * namespace.
+   * @param configuration The configuration used during the table creation
+   * @param tableName The name of the test table
+   * @param schema The schema used for the table creation
+   * @param order The sort order used for the table creation
+   * @param partSpec The partition spec used for the table creation
+   * @param fileFormat The file format used for writing the data
+   * @param records The records with which the table is populated
+   * @return The create table
+   * @throws IOException If there is an error writing data
+   */
+  public Table createIcebergTable(Configuration configuration, String tableName, Schema schema, SortOrder order,
+      PartitionSpec partSpec, FileFormat fileFormat, Map<String, String> additionalTableProps, List<Record> records)
+      throws IOException {
     String identifier = identifier("default." + tableName);

     TestHelper helper = new TestHelper(new Configuration(configuration), tables(), identifier, schema,
-        PartitionSpec.unpartitioned(), fileFormat, additionalTableProps, temp);
+        partSpec, fileFormat, additionalTableProps, temp);
+    helper.setOrder(order);

     Table table = helper.createTable();

     if (records != null && !records.isEmpty()) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableDesc.java
index b484428cc07..eeac3a6d339 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableDesc.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.common.TableName;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.PartitionManagementTask;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
@@ -45,7 +44,6 @@ import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
 import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint;
 import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -59,7 +57,7 @@ import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.ParseUtils;
 import org.apache.hadoop.hive.ql.parse.PartitionTransform;
-import org.apache.hadoop.hive.ql.parse.PartitionTransformSpec;
+import org.apache.hadoop.hive.ql.parse.TransformSpec;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.Explain;
@@ -833,7 +831,7 @@ public class CreateTableDesc implements DDLDesc, Serializable {
       if (partCols.isPresent() && !partCols.get().isEmpty()) {
         // Add the partition columns to the normal columns and save the transform to the session state
         tbl.getSd().getCols().addAll(partCols.get());
-        List<PartitionTransformSpec> spec = PartitionTransform.getPartitionTransformSpec(partCols.get());
+        List<TransformSpec> spec = PartitionTransform.getPartitionTransformSpec(partCols.get());
         if (!SessionStateUtil.addResource(conf, hive_metastoreConstants.PARTITION_TRANSFORM_SPEC, spec)) {
           throw new HiveException("Query state attached to Session state must be not null. " +
               "Partition transform metadata cannot be saved.");
" + "Partition transform metadata cannot be saved."); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/info/desc/formatter/JsonDescTableFormatter.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/info/desc/formatter/JsonDescTableFormatter.java index 1444eb23263..d115935b4ca 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/info/desc/formatter/JsonDescTableFormatter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/info/desc/formatter/JsonDescTableFormatter.java @@ -33,7 +33,7 @@ import org.apache.hadoop.hive.ql.metadata.PrimaryKeyInfo; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.metadata.UniqueConstraint; import org.apache.hadoop.hive.ql.metadata.formatting.MapBuilder; -import org.apache.hadoop.hive.ql.parse.PartitionTransformSpec; +import org.apache.hadoop.hive.ql.parse.TransformSpec; import java.io.DataOutputStream; import java.util.ArrayList; @@ -245,7 +245,7 @@ public class JsonDescTableFormatter extends DescTableFormatter { } if (table.isNonNative() && table.getStorageHandler() != null && table.getStorageHandler().supportsPartitionTransform()) { - List<PartitionTransformSpec> specs = table.getStorageHandler().getPartitionTransformSpec(table); + List<TransformSpec> specs = table.getStorageHandler().getPartitionTransformSpec(table); if (!specs.isEmpty()) { builder.put("partitionSpecInfo", specs.stream().map(s -> { Map<String, String> result = new LinkedHashMap<>(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/info/desc/formatter/TextDescTableFormatter.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/info/desc/formatter/TextDescTableFormatter.java index 53273b9810c..5583337ce20 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/info/desc/formatter/TextDescTableFormatter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/info/desc/formatter/TextDescTableFormatter.java @@ -47,7 +47,7 @@ import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.metadata.UniqueConstraint; import org.apache.hadoop.hive.ql.metadata.ForeignKeyInfo.ForeignKeyCol; import org.apache.hadoop.hive.ql.metadata.UniqueConstraint.UniqueConstraintCol; -import org.apache.hadoop.hive.ql.parse.PartitionTransformSpec; +import org.apache.hadoop.hive.ql.parse.TransformSpec; import org.apache.hadoop.hive.ql.plan.PlanUtils; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hive.common.util.HiveStringUtils; @@ -110,12 +110,12 @@ class TextDescTableFormatter extends DescTableFormatter { if (table.isNonNative() && table.getStorageHandler() != null && table.getStorageHandler().supportsPartitionTransform()) { - List<PartitionTransformSpec> partSpecs = table.getStorageHandler().getPartitionTransformSpec(table); + List<TransformSpec> partSpecs = table.getStorageHandler().getPartitionTransformSpec(table); if (partSpecs != null && !partSpecs.isEmpty()) { TextMetaDataTable metaDataTable = new TextMetaDataTable(); partitionTransformOutput += LINE_DELIM + "# Partition Transform Information" + LINE_DELIM + "# "; metaDataTable.addRow(DescTableDesc.PARTITION_TRANSFORM_SPEC_SCHEMA.split("#")[0].split(",")); - for (PartitionTransformSpec spec : partSpecs) { + for (TransformSpec spec : partSpecs) { String[] row = new String[2]; row[0] = spec.getColumnName(); if (spec.getTransformType() != null) { @@ -278,7 +278,14 @@ class TextDescTableFormatter extends DescTableFormatter { formatOutput("Num Buckets:", String.valueOf(storageDesc.getNumBuckets()), tableInfo); formatOutput("Bucket 
Columns:", storageDesc.getBucketCols().toString(), tableInfo); } - formatOutput("Sort Columns:", storageDesc.getSortCols().toString(), tableInfo); + + String sortColumnsInfo; + if (table.isNonNative() && table.getStorageHandler() != null && table.getStorageHandler().supportsSortColumns()) { + sortColumnsInfo = table.getStorageHandler().sortColumns(table).toString(); + } else { + sortColumnsInfo = storageDesc.getSortCols().toString(); + } + formatOutput("Sort Columns:", sortColumnsInfo, tableInfo); if (storageDesc.isStoredAsSubDirectories()) { formatOutput("Stored As SubDirectories:", "Yes", tableInfo); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/set/AlterTableSetPartitionSpecAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/set/AlterTableSetPartitionSpecAnalyzer.java index 442d927b1b7..4764eedf561 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/set/AlterTableSetPartitionSpecAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/set/AlterTableSetPartitionSpecAnalyzer.java @@ -31,7 +31,7 @@ import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.ASTNode; import org.apache.hadoop.hive.ql.parse.HiveParser; import org.apache.hadoop.hive.ql.parse.PartitionTransform; -import org.apache.hadoop.hive.ql.parse.PartitionTransformSpec; +import org.apache.hadoop.hive.ql.parse.TransformSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.session.SessionStateUtil; @@ -54,7 +54,7 @@ public class AlterTableSetPartitionSpecAnalyzer extends AbstractAlterTableAnalyz Table table = getTable(tableName); validateAlterTableType(table, AlterTableType.SETPARTITIONSPEC, false); inputs.add(new ReadEntity(table)); - List<PartitionTransformSpec> partitionTransformSpec = + List<TransformSpec> partitionTransformSpec = PartitionTransform.getPartitionTransformSpec(command); if (!SessionStateUtil.addResource(conf, hive_metastoreConstants.PARTITION_TRANSFORM_SPEC, partitionTransformSpec)) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLPlanUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLPlanUtils.java index 62248ff4f71..649455163fe 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLPlanUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLPlanUtils.java @@ -57,7 +57,7 @@ import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.PrimaryKeyInfo; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.metadata.UniqueConstraint; -import org.apache.hadoop.hive.ql.parse.PartitionTransformSpec; +import org.apache.hadoop.hive.ql.parse.TransformSpec; import org.apache.hadoop.hive.ql.util.DirectionUtils; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; @@ -920,13 +920,13 @@ public class DDLPlanUtils { private String getPartitionsBySpec(Table table) { if (table.isNonNative() && table.getStorageHandler() != null && table.getStorageHandler().supportsPartitionTransform()) { - List<PartitionTransformSpec> specs = table.getStorageHandler().getPartitionTransformSpec(table); + List<TransformSpec> specs = table.getStorageHandler().getPartitionTransformSpec(table); if (specs.isEmpty()) { return ""; } List<String> partitionTransforms = new ArrayList<>(); - for (PartitionTransformSpec spec : specs) { - if (spec.getTransformType() == PartitionTransformSpec.TransformType.IDENTITY) { + for (TransformSpec spec : specs) 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
index b4b3cfca9d8..b67d5d83476 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
@@ -36,7 +36,7 @@ import org.apache.hadoop.hive.ql.Context.Operation;
 import org.apache.hadoop.hive.ql.ddl.table.AlterTableType;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec;
-import org.apache.hadoop.hive.ql.parse.PartitionTransformSpec;
+import org.apache.hadoop.hive.ql.parse.TransformSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -346,6 +346,25 @@ public interface HiveStorageHandler extends Configurable {
     return Collections.emptyList();
   }

+  /**
+   * Check if the underlying storage handler implementation supports sort columns.
+   * @return true if the storage handler can support it
+   */
+  default boolean supportsSortColumns() {
+    return false;
+  }
+
+  /**
+   * Collect the columns that are used to sort the content of the data files
+   * @param table the table which is being sorted
+   * @return the list of columns that are used during data sorting
+   */
+  default List<FieldSchema> sortColumns(org.apache.hadoop.hive.ql.metadata.Table table) {
+    Preconditions.checkState(supportsSortColumns(), "Should only be called for table formats where data sorting " +
+        "is supported");
+    return Collections.emptyList();
+  }
+
   /**
    * Check if the underlying storage handler implementation support partition transformations.
    * @return true if the storage handler can support it
@@ -360,7 +379,7 @@ public interface HiveStorageHandler extends Configurable {
    * @param table the HMS table, must be non-null
    * @return partition transform specification, can be null.
    */
-  default List<PartitionTransformSpec> getPartitionTransformSpec(org.apache.hadoop.hive.ql.metadata.Table table) {
+  default List<TransformSpec> getPartitionTransformSpec(org.apache.hadoop.hive.ql.metadata.Table table) {
     return null;
   }
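The new supportsSortColumns()/sortColumns() pair follows the same capability-then-accessor pattern as supportsPartitionTransform(): a caller must check the capability before asking for the columns, otherwise the default implementation fails its precondition. A toy stand-in for that contract (simplified to String columns; not Hive code):

    import java.util.Collections;
    import java.util.List;

    public class SortColumnsContractSketch {
      interface StorageHandler {
        default boolean supportsSortColumns() {
          return false;
        }

        default List<String> sortColumns() {
          if (!supportsSortColumns()) {
            throw new IllegalStateException("only valid for formats that support data sorting");
          }
          return Collections.emptyList();
        }
      }

      // a handler that opts in, the way HiveIcebergStorageHandler now does
      static class SortingHandler implements StorageHandler {
        @Override
        public boolean supportsSortColumns() {
          return true;
        }

        @Override
        public List<String> sortColumns() {
          return List.of("id ASC NULLS FIRST", "data DESC NULLS LAST");
        }
      }

      public static void main(String[] args) {
        StorageHandler handler = new SortingHandler();
        if (handler.supportsSortColumns()) {  // capability check first
          System.out.println(handler.sortColumns());
        }
      }
    }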
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
index 2668f269b5f..b57ddd8e6c4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hive.ql.exec.SelectOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.Utilities.ReduceField;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
 import org.apache.hadoop.hive.ql.lib.SemanticDispatcher;
@@ -61,7 +62,7 @@ import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.parse.type.*;
+import org.apache.hadoop.hive.ql.parse.type.ExprNodeTypeCheck;
 import org.apache.hadoop.hive.ql.plan.ColStatistics;
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
@@ -200,6 +201,8 @@ public class SortedDynPartitionOptimizer extends Transform {
       List<Integer> partitionPositions = getPartitionPositions(dpCtx, fsParent.getSchema());
       LinkedList<Function<List<ExprNodeDesc>, ExprNodeDesc>> customSortExprs =
           new LinkedList<>(dpCtx.getCustomSortExpressions());
+      LinkedList<Integer> customSortOrder = new LinkedList<>(dpCtx.getCustomSortOrder());
+      LinkedList<Integer> customNullOrder = new LinkedList<>(dpCtx.getCustomSortNullOrder());

       // If custom sort expressions are present, there is an explicit requirement to do sorting
       if (customSortExprs.isEmpty() && !shouldDo(partitionPositions, fsParent)) {
@@ -301,8 +304,9 @@ public class SortedDynPartitionOptimizer extends Transform {
       fsOp.getConf().setTotalFiles(1);

       // Create ReduceSink operator
-      ReduceSinkOperator rsOp = getReduceSinkOp(partitionPositions, sortPositions, customSortExprs, sortOrder,
-          sortNullOrder, allRSCols, bucketColumns, numBuckets, fsParent, fsOp.getConf().getWriteType());
+      ReduceSinkOperator rsOp = getReduceSinkOp(partitionPositions, sortPositions, sortOrder,
+          sortNullOrder, customSortExprs, customSortOrder, customNullOrder, allRSCols, bucketColumns, numBuckets,
+          fsParent, fsOp.getConf().getWriteType());

       // we have to make sure not to reorder the child operators as it might cause weird behavior in the tasks at
       // the same level. when there is auto stats gather at the same level as another operation then it might
       // cause unnecessary preemption. Maintaining the order here to avoid such preemption and possible errors
@@ -572,8 +576,10 @@ public class SortedDynPartitionOptimizer extends Transform {
     }

     public ReduceSinkOperator getReduceSinkOp(List<Integer> partitionPositions, List<Integer> sortPositions,
-        List<Function<List<ExprNodeDesc>, ExprNodeDesc>> customSortExprs, List<Integer> sortOrder,
-        List<Integer> sortNullOrder, ArrayList<ExprNodeDesc> allCols, ArrayList<ExprNodeDesc> bucketColumns,
+        List<Integer> sortOrder, List<Integer> sortNullOrder,
+        List<Function<List<ExprNodeDesc>, ExprNodeDesc>> customSortExprs,
+        List<Integer> customSortOrder, List<Integer> customSortNullOrder,
+        ArrayList<ExprNodeDesc> allCols, ArrayList<ExprNodeDesc> bucketColumns,
         int numBuckets, Operator<? extends OperatorDesc> parent, AcidUtils.Operation writeType) {

       // Order of KEY columns, if custom sort is present partition and bucket columns are disregarded:
@@ -601,17 +607,25 @@ public class SortedDynPartitionOptimizer extends Transform {
       }
       keyColsPosInVal.addAll(sortPositions);

-      // by default partition and bucket columns are sorted in ascending order
       Integer order = 1;
+      // by default partition and bucket columns are sorted in ascending order
       if (sortOrder != null && !sortOrder.isEmpty()) {
         if (sortOrder.get(0) == 0) {
           order = 0;
         }
       }
-      for (int i = 0; i < keyColsPosInVal.size() + customSortExprs.size(); i++) {
+
+      for (Integer ignored : keyColsPosInVal) {
         newSortOrder.add(order);
       }

+      if (customSortExprPresent) {
+        for (int i = 0; i < customSortExprs.size() - customSortOrder.size(); i++) {
+          newSortOrder.add(order);
+        }
+        newSortOrder.addAll(customSortOrder);
+      }
+
       String orderStr = "";
       for (Integer i : newSortOrder) {
         if (i == 1) {
@@ -631,10 +645,18 @@ public class SortedDynPartitionOptimizer extends Transform {
           nullOrder = 1;
         }
       }
-      for (int i = 0; i < keyColsPosInVal.size() + customSortExprs.size(); i++) {
+
+      for (Integer ignored : keyColsPosInVal) {
         newSortNullOrder.add(nullOrder);
       }

+      if (customSortExprPresent) {
+        for (int i = 0; i < customSortExprs.size() - customSortNullOrder.size(); i++) {
+          newSortNullOrder.add(nullOrder);
+        }
+        newSortNullOrder.addAll(customSortNullOrder);
+      }
+
       String nullOrderStr = "";
       for (Integer i : newSortNullOrder) {
         if (i == 0) {
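The reworked getReduceSinkOp() assembles the key sort-order list in three parts: the default direction for every regular key column, default-padded slots for custom sort expressions that carry no explicit direction (the partition-transform expressions the storage handler adds first), and finally the explicit per-expression directions (1 = ascending, 0 = descending; the null-order list is built the same way with 0 = NULLS FIRST, 1 = NULLS LAST). In isolation the assembly looks like this (an illustrative sketch, not patch code):

    import java.util.ArrayList;
    import java.util.List;

    public class SortOrderAssemblySketch {
      static List<Integer> assemble(int keyCols, int customExprs, List<Integer> customSortOrder, int defaultOrder) {
        List<Integer> newSortOrder = new ArrayList<>();
        for (int i = 0; i < keyCols; i++) {
          newSortOrder.add(defaultOrder);                      // regular key columns
        }
        for (int i = 0; i < customExprs - customSortOrder.size(); i++) {
          newSortOrder.add(defaultOrder);                      // custom exprs without an explicit direction
        }
        newSortOrder.addAll(customSortOrder);                  // explicit directions, e.g. from Iceberg sort fields
        return newSortOrder;
      }

      public static void main(String[] args) {
        // one bucket expression without an explicit direction plus ASC and DESC sort fields
        System.out.println(assemble(0, 3, List.of(1, 0), 1));  // prints [1, 1, 0]
      }
    }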
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/PartitionTransform.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/PartitionTransform.java
index 50a6371a1bd..dbc40e8bd11 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/PartitionTransform.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/PartitionTransform.java
@@ -18,11 +18,10 @@
 package org.apache.hadoop.hive.ql.parse;

 import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.ql.parse.PartitionTransformSpec.TransformType;
+import org.apache.hadoop.hive.ql.parse.TransformSpec.TransformType;

 import java.util.ArrayList;
 import java.util.List;
-import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
@@ -45,9 +44,9 @@ public class PartitionTransform {
    * @param fields The partition column fields
    * @return list of partition transforms
    */
-  public static List<PartitionTransformSpec> getPartitionTransformSpec(List<FieldSchema> fields) {
+  public static List<TransformSpec> getPartitionTransformSpec(List<FieldSchema> fields) {
     return fields.stream()
-        .map(field -> new PartitionTransformSpec(field.getName(), TransformType.IDENTITY, Optional.empty()))
+        .map(field -> new TransformSpec(field.getName(), TransformType.IDENTITY, Optional.empty()))
         .collect(Collectors.toList());
   }

@@ -56,10 +55,10 @@ public class PartitionTransform {
    * @param node AST Tree node, must be not null
    * @return list of partition transforms
    */
-  public static List<PartitionTransformSpec> getPartitionTransformSpec(ASTNode node) {
-    List<PartitionTransformSpec> partSpecList = new ArrayList<>();
+  public static List<TransformSpec> getPartitionTransformSpec(ASTNode node) {
+    List<TransformSpec> partSpecList = new ArrayList<>();
     for (int i = 0; i < node.getChildCount(); i++) {
-      PartitionTransformSpec spec = new PartitionTransformSpec();
+      TransformSpec spec = new TransformSpec();
       ASTNode child = (ASTNode) node.getChild(i);
       for (int j = 0; j < child.getChildCount(); j++) {
         ASTNode grandChild = (ASTNode) child.getChild(j);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 485089e4ad3..1e51d51caf2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -13645,7 +13645,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         }
         break;
       case HiveParser.TOK_TABLEPARTCOLSBYSPEC:
-        List<PartitionTransformSpec> partitionTransformSpec =
+        List<TransformSpec> partitionTransformSpec =
             PartitionTransform.getPartitionTransformSpec(child);
         if (!SessionStateUtil.addResource(conf, hive_metastoreConstants.PARTITION_TRANSFORM_SPEC,
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/PartitionTransformSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TransformSpec.java
similarity index 90%
rename from ql/src/java/org/apache/hadoop/hive/ql/parse/PartitionTransformSpec.java
rename to ql/src/java/org/apache/hadoop/hive/ql/parse/TransformSpec.java
index 268660f8efa..f25cbda5af8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/PartitionTransformSpec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TransformSpec.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.hive.ql.parse;

 import java.util.Optional;

-public class PartitionTransformSpec {
+public class TransformSpec {

   public enum TransformType {
     IDENTITY, YEAR, MONTH, DAY, HOUR, TRUNCATE, BUCKET, VOID
@@ -29,10 +29,10 @@ public class PartitionTransformSpec {
   private TransformType transformType;
   private Optional<Integer> transformParam;

-  public PartitionTransformSpec() {
+  public TransformSpec() {
   }

-  public PartitionTransformSpec(String columnName, TransformType transformType, Optional<Integer> transformParam) {
+  public TransformSpec(String columnName, TransformType transformType, Optional<Integer> transformParam) {
     this.columnName = columnName;
     this.transformType = transformType;
     this.transformParam = transformParam;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
index 4acc5406fb0..3497f3120cd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
@@ -60,6 +60,8 @@ public class DynamicPartitionCtx implements Serializable {
    * schema and returns a single expression. Example for simply just referencing column 3: cols -> cols.get(3).clone()
    */
   private transient List<Function<List<ExprNodeDesc>, ExprNodeDesc>> customSortExpressions;
+  private transient List<Integer> customSortOrder;
+  private transient List<Integer> customSortNullOrder;

   public DynamicPartitionCtx() {
   }
@@ -93,6 +95,8 @@ public class DynamicPartitionCtx implements Serializable {
     }
     this.whiteListPattern = confVal == null || confVal.isEmpty() ? null : Pattern.compile(confVal);
     this.customSortExpressions = new LinkedList<>();
+    this.customSortOrder = new LinkedList<>();
+    this.customSortNullOrder = new LinkedList<>();
   }

   public DynamicPartitionCtx(Map<String, String> partSpec, String defaultPartName,
@@ -126,6 +130,8 @@ public class DynamicPartitionCtx implements Serializable {
     }
     this.whiteListPattern = confVal == null || confVal.isEmpty() ? null : Pattern.compile(confVal);
     this.customSortExpressions = new LinkedList<>();
+    this.customSortOrder = new LinkedList<>();
+    this.customSortNullOrder = new LinkedList<>();
   }

   public DynamicPartitionCtx(DynamicPartitionCtx dp) {
@@ -141,6 +147,8 @@ public class DynamicPartitionCtx implements Serializable {
     this.maxPartsPerNode = dp.maxPartsPerNode;
     this.whiteListPattern = dp.whiteListPattern;
     this.customSortExpressions = dp.customSortExpressions;
+    this.customSortOrder = dp.customSortOrder;
+    this.customSortNullOrder = dp.customSortNullOrder;
   }

   public Pattern getWhiteListPattern() {
@@ -234,4 +242,20 @@ public class DynamicPartitionCtx implements Serializable {
   public void setCustomSortExpressions(List<Function<List<ExprNodeDesc>, ExprNodeDesc>> customSortExpressions) {
     this.customSortExpressions = customSortExpressions;
   }
+
+  public List<Integer> getCustomSortOrder() {
+    return customSortOrder;
+  }
+
+  public void setCustomSortOrder(List<Integer> customSortOrder) {
+    this.customSortOrder = customSortOrder;
+  }
+
+  public List<Integer> getCustomSortNullOrder() {
+    return customSortNullOrder;
+  }
+
+  public void setCustomSortNullOrder(List<Integer> customSortNullOrder) {
+    this.customSortNullOrder = customSortNullOrder;
+  }
 }
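A producer such as HiveIcebergStorageHandler.createDPContext() fills these three transient lists in lockstep: the n-th custom sort expression is paired with the n-th direction and the n-th null order. A toy sketch of that wiring (simplified types, not Hive code):

    import java.util.LinkedList;
    import java.util.List;
    import java.util.function.Function;

    public class CustomSortWiringSketch {
      // stand-in for the three parallel lists DynamicPartitionCtx now carries
      static class Ctx {
        List<Function<List<String>, String>> customSortExpressions = new LinkedList<>();
        List<Integer> customSortOrder = new LinkedList<>();
        List<Integer> customSortNullOrder = new LinkedList<>();
      }

      public static void main(String[] args) {
        Ctx ctx = new Ctx();
        // one entry per sort key, kept aligned across the three lists
        ctx.customSortExpressions.add(cols -> cols.get(0));  // sort key: column 0
        ctx.customSortOrder.add(1);                          // 1 = ascending
        ctx.customSortNullOrder.add(0);                      // 0 = nulls first
        System.out.println(ctx.customSortOrder + " " + ctx.customSortNullOrder);
      }
    }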