ayushtkn commented on code in PR #5192: URL: https://github.com/apache/hive/pull/5192#discussion_r1600619259
########## ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilder.java: ########## @@ -17,72 +17,45 @@ */ package org.apache.hadoop.hive.ql.txn.compactor; -import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.metastore.ColumnType; -import org.apache.hadoop.hive.metastore.api.CompactionType; -import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.Order; -import org.apache.hadoop.hive.metastore.api.Partition; -import org.apache.hadoop.hive.metastore.api.SerDeInfo; -import org.apache.hadoop.hive.metastore.api.SkewedInfo; -import org.apache.hadoop.hive.metastore.api.StorageDescriptor; -import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.metastore.api.*; Review Comment: Avoid using this *, expand the imports ########## ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilder.java: ########## @@ -419,265 +362,45 @@ private void buildWhereClauseForInsert(StringBuilder query) { } } } - - if (CompactionType.MINOR.equals(compactionType) && !insertOnly && validWriteIdList != null) { - long[] invalidWriteIds = validWriteIdList.getInvalidWriteIds(); - if (invalidWriteIds.length > 0) { - query.append(" where `originalTransaction` not in (").append( - StringUtils.join(ArrayUtils.toObject(invalidWriteIds), ",")) - .append(")"); - } - } } - private void getDdlForCreate(StringBuilder query) { - defineColumns(query); - - // PARTITIONED BY. Used for parts of minor compaction. - if (isPartitioned) { - query.append(" PARTITIONED BY (`file_name` STRING) "); - } - - // CLUSTERED BY. 
(bucketing) - int bucketingVersion = 0; - if (!insertOnly && CompactionType.MINOR.equals(compactionType)) { - bucketingVersion = getMinorCrudBucketing(query, bucketingVersion); - } else if (insertOnly) { - getMmBucketing(query); - } - - // SKEWED BY - if (insertOnly) { - getSkewedByClause(query); - } - - // STORED AS / ROW FORMAT SERDE + INPUTFORMAT + OUTPUTFORMAT - if (!insertOnly) { - query.append(" stored as orc"); - } else { - copySerdeFromSourceTable(query); + protected void appendColumns(StringBuilder query, List<FieldSchema> cols, boolean alias) { + if (cols == null) { + throw new IllegalStateException("Query could not be created: Source columns are unknown"); } - - // LOCATION - if (location != null) { - query.append(" LOCATION '").append(HiveStringUtils.escapeHiveCommand(location)).append("'"); + for (int i = 0; i < cols.size(); ++i) { + if (alias) { + query.append(i == 0 ? "'" : ", '").append(cols.get(i).getName()).append("', `").append(cols.get(i).getName()) + .append("`"); + } else { + query.append(i == 0 ? "`" : ", `").append(cols.get(i).getName()).append("`"); + } } - - // TBLPROPERTIES - addTblProperties(query, bucketingVersion); } /** * Define columns of the create query. */ - private void defineColumns(StringBuilder query) { - if (sourceTab == null) { - return; // avoid NPEs, don't throw an exception but skip this part of the query - } - query.append("("); - if (!insertOnly) { + protected void defineColumns(StringBuilder query) { + if (sourceTab != null) { + query.append("("); query.append( - "`operation` int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, " - + "`currentTransaction` bigint, `row` struct<"); + "`operation` int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, " + "`currentTransaction` bigint, `row` struct<"); + List<String> columnDescs = getColumnDescs(); + query.append(StringUtils.join(columnDescs, ',')); + query.append(">) "); } Review Comment: I don't think we need to do query. 
everywhere, something like this can be done ``` query.append("(") .append("`operation` int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, `currentTransaction` bigint, `row` struct<") .append(StringUtils.join(getColumnDescs(), ',')) .append(">) "); ``` ########## ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionQueryBuilderForMmCompaction.java: ########## @@ -0,0 +1,475 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import org.apache.hadoop.hive.common.ValidCompactorWriteIdList; +import org.apache.hadoop.hive.metastore.api.*; +import org.apache.hadoop.hive.ql.io.AcidDirectory; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.junit.Assert; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; Review Comment: should be ``import static org.junit.Assert.assertTrue;`` ########## ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilder.java: ########## @@ -419,265 +362,45 @@ private void buildWhereClauseForInsert(StringBuilder query) { } } } - - if (CompactionType.MINOR.equals(compactionType) && !insertOnly && validWriteIdList != null) { - long[] invalidWriteIds = validWriteIdList.getInvalidWriteIds(); - if (invalidWriteIds.length > 0) { - query.append(" where `originalTransaction` not in (").append( - StringUtils.join(ArrayUtils.toObject(invalidWriteIds), ",")) - .append(")"); - } - } } - private void getDdlForCreate(StringBuilder query) { - defineColumns(query); - - // PARTITIONED BY. Used for parts of minor compaction. - if (isPartitioned) { - query.append(" PARTITIONED BY (`file_name` STRING) "); - } - - // CLUSTERED BY. 
(bucketing) - int bucketingVersion = 0; - if (!insertOnly && CompactionType.MINOR.equals(compactionType)) { - bucketingVersion = getMinorCrudBucketing(query, bucketingVersion); - } else if (insertOnly) { - getMmBucketing(query); - } - - // SKEWED BY - if (insertOnly) { - getSkewedByClause(query); - } - - // STORED AS / ROW FORMAT SERDE + INPUTFORMAT + OUTPUTFORMAT - if (!insertOnly) { - query.append(" stored as orc"); - } else { - copySerdeFromSourceTable(query); + protected void appendColumns(StringBuilder query, List<FieldSchema> cols, boolean alias) { + if (cols == null) { + throw new IllegalStateException("Query could not be created: Source columns are unknown"); } - - // LOCATION - if (location != null) { - query.append(" LOCATION '").append(HiveStringUtils.escapeHiveCommand(location)).append("'"); + for (int i = 0; i < cols.size(); ++i) { + if (alias) { + query.append(i == 0 ? "'" : ", '").append(cols.get(i).getName()).append("', `").append(cols.get(i).getName()) + .append("`"); + } else { + query.append(i == 0 ? "`" : ", `").append(cols.get(i).getName()).append("`"); + } } - - // TBLPROPERTIES - addTblProperties(query, bucketingVersion); } /** * Define columns of the create query. 
*/ - private void defineColumns(StringBuilder query) { - if (sourceTab == null) { - return; // avoid NPEs, don't throw an exception but skip this part of the query - } - query.append("("); - if (!insertOnly) { + protected void defineColumns(StringBuilder query) { + if (sourceTab != null) { + query.append("("); query.append( - "`operation` int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, " - + "`currentTransaction` bigint, `row` struct<"); + "`operation` int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, " + "`currentTransaction` bigint, `row` struct<"); + List<String> columnDescs = getColumnDescs(); + query.append(StringUtils.join(columnDescs, ',')); + query.append(">) "); Review Comment: Can we just do ``` query.append(StringUtils.join(getColumnDescs(), ',')); ``` ########## ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilder.java: ########## @@ -275,137 +250,105 @@ String build() { case DROP: default: } - return query.toString(); } + protected void getDdlForCreate(StringBuilder query) { + defineColumns(query); + + // PARTITIONED BY. Used for parts of minor compaction. + if (isPartitioned) { + query.append(" PARTITIONED BY (`file_name` STRING) "); + } + + // STORED AS / ROW FORMAT SERDE + INPUTFORMAT + OUTPUTFORMAT + query.append(" stored as orc"); + + // LOCATION + if (location != null) { + query.append(" LOCATION '").append(HiveStringUtils.escapeHiveCommand(location)).append("'"); + } + + addTblProperties(query, false, 0); + } + + /** + * Part of Create operation. All tmp tables are not transactional and are marked as + * compaction tables. Additionally... + * - Crud compaction temp tables need tblproperty, "compactiontable." + * - Minor crud compaction temp tables need bucketing version tblproperty, if table is bucketed. 
+ */ + protected void addTblProperties(StringBuilder query, boolean addBucketingVersion, int bucketingVersion) { + Map<String, String> tblProperties = new HashMap<>(); + tblProperties.put("transactional", "false"); + tblProperties.put(AcidUtils.COMPACTOR_TABLE_PROPERTY, compactionType.name()); + if (addBucketingVersion) { + tblProperties.put("bucketing_version", String.valueOf(bucketingVersion)); + } + if (sourceTab != null) { // to avoid NPEs, skip this part if sourceTab is null + for (Map.Entry<String, String> e : sourceTab.getParameters().entrySet()) { + if (e.getKey().startsWith("orc.")) { + tblProperties.put(e.getKey(), HiveStringUtils.escapeHiveCommand(e.getValue())); + } + } + } + addTblProperties(query, tblProperties); + } + + protected void addTblProperties(StringBuilder query, Map<String, String> tblProperties) { + // add TBLPROPERTIES clause to query + boolean isFirst; + query.append(" TBLPROPERTIES ("); Review Comment: if ``tblProperties`` is empty, in that case I think it will append something like ``TBLPROPERTIES ("")`` not sure if the query compiles or not, or maybe we can try to avoid such a situation ########## ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilder.java: ########## @@ -419,265 +362,45 @@ private void buildWhereClauseForInsert(StringBuilder query) { } } } - - if (CompactionType.MINOR.equals(compactionType) && !insertOnly && validWriteIdList != null) { - long[] invalidWriteIds = validWriteIdList.getInvalidWriteIds(); - if (invalidWriteIds.length > 0) { - query.append(" where `originalTransaction` not in (").append( - StringUtils.join(ArrayUtils.toObject(invalidWriteIds), ",")) - .append(")"); - } - } } - private void getDdlForCreate(StringBuilder query) { - defineColumns(query); - - // PARTITIONED BY. Used for parts of minor compaction. - if (isPartitioned) { - query.append(" PARTITIONED BY (`file_name` STRING) "); - } - - // CLUSTERED BY. 
(bucketing) - int bucketingVersion = 0; - if (!insertOnly && CompactionType.MINOR.equals(compactionType)) { - bucketingVersion = getMinorCrudBucketing(query, bucketingVersion); - } else if (insertOnly) { - getMmBucketing(query); - } - - // SKEWED BY - if (insertOnly) { - getSkewedByClause(query); - } - - // STORED AS / ROW FORMAT SERDE + INPUTFORMAT + OUTPUTFORMAT - if (!insertOnly) { - query.append(" stored as orc"); - } else { - copySerdeFromSourceTable(query); + protected void appendColumns(StringBuilder query, List<FieldSchema> cols, boolean alias) { + if (cols == null) { + throw new IllegalStateException("Query could not be created: Source columns are unknown"); } - - // LOCATION - if (location != null) { - query.append(" LOCATION '").append(HiveStringUtils.escapeHiveCommand(location)).append("'"); + for (int i = 0; i < cols.size(); ++i) { + if (alias) { + query.append(i == 0 ? "'" : ", '").append(cols.get(i).getName()).append("', `").append(cols.get(i).getName()) + .append("`"); + } else { + query.append(i == 0 ? "`" : ", `").append(cols.get(i).getName()).append("`"); + } } - - // TBLPROPERTIES - addTblProperties(query, bucketingVersion); } /** * Define columns of the create query. 
*/ - private void defineColumns(StringBuilder query) { - if (sourceTab == null) { - return; // avoid NPEs, don't throw an exception but skip this part of the query - } - query.append("("); - if (!insertOnly) { + protected void defineColumns(StringBuilder query) { + if (sourceTab != null) { + query.append("("); query.append( - "`operation` int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, " - + "`currentTransaction` bigint, `row` struct<"); + "`operation` int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, " + "`currentTransaction` bigint, `row` struct<"); Review Comment: I don't think you need to break strings & then concat here, it can be one single string
org.apache.hadoop.hive.ql.util.DirectionUtils; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hive.common.util.HiveStringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.lang.reflect.Field; -import java.lang.reflect.Modifier; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; Review Comment: same avoid *, we don't do that usually ########## ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilderFactory.java: ########## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import org.apache.hadoop.hive.metastore.api.CompactionType; + +class CompactionQueryBuilderFactory { + public CompactionQueryBuilder getCompactionQueryBuilder(CompactionType compactionType, + boolean insertOnly){ Review Comment: nit it can be one line, we have 120 line limit ``` public CompactionQueryBuilder getCompactionQueryBuilder(CompactionType compactionType, boolean insertOnly) { ``` ########## ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilder.java: ########## @@ -275,137 +250,105 @@ String build() { case DROP: default: } - return query.toString(); } + protected void getDdlForCreate(StringBuilder query) { + defineColumns(query); + + // PARTITIONED BY. Used for parts of minor compaction. + if (isPartitioned) { + query.append(" PARTITIONED BY (`file_name` STRING) "); + } + + // STORED AS / ROW FORMAT SERDE + INPUTFORMAT + OUTPUTFORMAT + query.append(" stored as orc"); + + // LOCATION + if (location != null) { + query.append(" LOCATION '").append(HiveStringUtils.escapeHiveCommand(location)).append("'"); + } + + addTblProperties(query, false, 0); + } + + /** + * Part of Create operation. All tmp tables are not transactional and are marked as + * compaction tables. Additionally... + * - Crud compaction temp tables need tblproperty, "compactiontable." + * - Minor crud compaction temp tables need bucketing version tblproperty, if table is bucketed. 
+ */ + protected void addTblProperties(StringBuilder query, boolean addBucketingVersion, int bucketingVersion) { + Map<String, String> tblProperties = new HashMap<>(); + tblProperties.put("transactional", "false"); + tblProperties.put(AcidUtils.COMPACTOR_TABLE_PROPERTY, compactionType.name()); + if (addBucketingVersion) { + tblProperties.put("bucketing_version", String.valueOf(bucketingVersion)); + } + if (sourceTab != null) { // to avoid NPEs, skip this part if sourceTab is null + for (Map.Entry<String, String> e : sourceTab.getParameters().entrySet()) { + if (e.getKey().startsWith("orc.")) { + tblProperties.put(e.getKey(), HiveStringUtils.escapeHiveCommand(e.getValue())); + } + } Review Comment: can we try some stream magic like ``` sourceTab.getParameters().entrySet().stream().filter(e -> e.getKey().startsWith("orc.")) .forEach(e -> tblProperties.put(e.getKey(), HiveStringUtils.escapeHiveCommand(e.getValue()))); ``` ########## ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionQueryBuilderForMajorCompaction.java: ########## @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import org.apache.hadoop.hive.common.ValidCompactorWriteIdList; +import org.apache.hadoop.hive.metastore.api.CompactionType; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.io.AcidDirectory; +import org.junit.Assert; +import org.junit.jupiter.api.Test; Review Comment: check if the tests are working. it should be ``` import org.junit.Test; ``` else your assertEquals should be from Assertions. I think as of now this test has both JUnit 4 & 5 together ########## ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java: ########## @@ -54,6 +54,8 @@ public abstract class QueryCompactor implements Compactor { private static final Logger LOG = LoggerFactory.getLogger(QueryCompactor.class.getName()); private static final String COMPACTOR_PREFIX = "compactor."; + protected CompactionQueryBuilderFactory compactionQueryBuilderFactory = new CompactionQueryBuilderFactory(); Review Comment: Why is this ``protected``?
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import org.apache.hadoop.hive.common.ValidCompactorWriteIdList; +import org.apache.hadoop.hive.metastore.api.CompactionType; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.io.AcidDirectory; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.junit.Assert; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import static org.mockito.Mockito.when; + +public class TestCompactionQueryBuilderForMinorCompaction extends CompactionQueryBuilderTest { + + class CompactionQueryBuilderForMinorMock extends CompactionQueryBuilderForMinor { Review Comment: maybe the class can be static ########## ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilderTest.java: ########## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.ValidCompactorWriteIdList; +import org.apache.hadoop.hive.metastore.api.*; +import org.apache.hadoop.hive.ql.io.AcidDirectory; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.mockito.Mockito; + +import java.util.*; + +import static org.mockito.Mockito.when; + +public class CompactionQueryBuilderTest { + + public static final String DB_NAME = "comp_test_db"; + public static final String SOURCE_TABLE_NAME = "comp_test_source_table"; + public static final String RESULT_TABLE_NAME = "comp_test_result_table"; + public static final String COLUMN_1_NAME = "column_1"; + public static final String COLUMN_2_NAME = "column_2"; + public static final String COLUMN_3_NAME = "column_3"; + public static final String SOME_TEST_LOCATION = "some_test_path"; + public static final String COMP_TEST_SOURCE_TABLE_FOR_INSERT = "comp_test_db.comp_test_insert_table"; + + public Table createSourceTable() { + return createSourceTable(false, false, false); + } + + public Table createSourceTableWithProperties() { + return createSourceTable(true, false, false); + } + + public Table createSourceTableBucketed() { + return createSourceTable(false, true, false); + } + + public Table createSourceTableBucketedSorted() { + return createSourceTable(false, true, true); + } + + private Table createSourceTable(boolean addTableProperties, boolean bucketed, boolean sorted) { + Table sourceTable = new Table(); + sourceTable.setDbName(DB_NAME); + sourceTable.setTableName(SOURCE_TABLE_NAME); + + StorageDescriptor sd = new StorageDescriptor(); + List<FieldSchema> columns = new ArrayList<>(); + FieldSchema col1 = new FieldSchema(COLUMN_1_NAME, "string", "First column"); + FieldSchema col2 = new FieldSchema(COLUMN_2_NAME, "int", null); + FieldSchema col3 = new FieldSchema(COLUMN_3_NAME, "boolean", "Third column"); + columns.add(col1); + columns.add(col2); + columns.add(col3); 
+ sd.setCols(columns); + + if (bucketed) { + sd.addToBucketCols(COLUMN_1_NAME); + sd.addToBucketCols(COLUMN_3_NAME); + sd.setNumBuckets(4); + } else { + sd.setBucketCols(Collections.emptyList()); + } + + if (sorted) { + sd.addToSortCols(new Order(COLUMN_1_NAME, 0)); + sd.addToSortCols(new Order(COLUMN_2_NAME, 1)); + } else { + sd.setSortCols(Collections.emptyList()); + } + + Map<String, String> parameters = new HashMap<>(); + if (addTableProperties) { + parameters.put("property_1", "true"); + parameters.put("orc.property_2", "44"); + parameters.put("COLUMN_STATS_ACCURATE", "false"); + parameters.put("columns.types", "test"); + } + sourceTable.setParameters(parameters); + sourceTable.setSd(sd); + return sourceTable; + } + + protected AcidDirectory createAcidDirectory() { + AcidDirectory dir = Mockito.mock(AcidDirectory.class); + AcidUtils.ParsedDelta d1 = Mockito.mock(AcidUtils.ParsedDelta.class); + AcidUtils.ParsedDelta d2 = Mockito.mock(AcidUtils.ParsedDelta.class); + AcidUtils.ParsedDelta d3 = Mockito.mock(AcidUtils.ParsedDelta.class); + AcidUtils.ParsedDelta d4 = Mockito.mock(AcidUtils.ParsedDelta.class); + AcidUtils.ParsedDelta d5 = Mockito.mock(AcidUtils.ParsedDelta.class); + + List<AcidUtils.ParsedDelta> dirs = new ArrayList<>(); + dirs.add(d1); + dirs.add(d2); + dirs.add(d3); + dirs.add(d4); + dirs.add(d5); + + when(dir.getCurrentDirectories()).thenReturn(dirs); + when(d1.isDeleteDelta()).thenReturn(true); + when(d1.getMinWriteId()).thenReturn(7L); + when(d1.getMaxWriteId()).thenReturn(11L); + when(d1.getPath()).thenReturn(new Path("/compaction/test/table", "test_delta_1")); + when(d2.isDeleteDelta()).thenReturn(true); + when(d2.getMinWriteId()).thenReturn(1L); + when(d2.getMaxWriteId()).thenReturn(11L); + when(d2.getPath()).thenReturn(new Path("/compaction/test/table", "test_delta_2")); + when(d3.isDeleteDelta()).thenReturn(true); + when(d3.getMinWriteId()).thenReturn(5L); + when(d3.getMaxWriteId()).thenReturn(15L); + when(d3.getPath()).thenReturn(new 
Path("/compaction/test/table", "test_delta_3")); + when(d4.isDeleteDelta()).thenReturn(true); + when(d4.getMinWriteId()).thenReturn(7L); + when(d4.getMaxWriteId()).thenReturn(20L); + when(d4.getPath()).thenReturn(new Path("/compaction/test/table", "test_delta_4")); + when(d5.isDeleteDelta()).thenReturn(false); + when(d5.getMinWriteId()).thenReturn(6L); + when(d5.getMaxWriteId()).thenReturn(11L); + when(d5.getPath()).thenReturn(new Path("/compaction/test/table", "test_delta_5")); + return dir; + } + + protected ValidCompactorWriteIdList createWriteId(long minWriteId) { + long[] abortedWriteIdList = { 1111L }; + ValidCompactorWriteIdList writeIds = + new ValidCompactorWriteIdList("comp_test_source_table", abortedWriteIdList, null, 15L, minWriteId); + return writeIds; Review Comment: maybe we can return directly ``` return new ValidCompactorWriteIdList("comp_test_source_table", abortedWriteIdList, null, 15L, minWriteId); ``` ########## ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilderForMinor.java: ########## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.hive.metastore.api.CompactionType; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hive.common.util.HiveStringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Builds query strings that help with query-based MINOR compaction of CRUD. + */ +class CompactionQueryBuilderForMinor extends CompactionQueryBuilder { + + private static final Logger LOG = LoggerFactory.getLogger(CompactionQueryBuilderForMinor.class.getName()); + + CompactionQueryBuilderForMinor() { + super(CompactionType.MINOR, false); + } + + @Override + protected void buildSelectClauseForInsert(StringBuilder query) { + query.append("`operation`, `originalTransaction`, `bucket`, `rowId`, `currentTransaction`, `row`"); + } + + @Override + protected void buildWhereClauseForInsert(StringBuilder query) { + if (validWriteIdList != null) { + long[] invalidWriteIds = validWriteIdList.getInvalidWriteIds(); + if (invalidWriteIds.length > 0) { + query.append(" where `originalTransaction` not in (") + .append(StringUtils.join(ArrayUtils.toObject(invalidWriteIds), ",")).append(")"); + } + } + } + + @Override + protected void getDdlForCreate(StringBuilder query) { + defineColumns(query); + + // PARTITIONED BY. Used for parts of minor compaction. + if (isPartitioned) { + query.append(" PARTITIONED BY (`file_name` STRING) "); + } + + // CLUSTERED BY. 
(bucketing) + int bucketingVersion = getMinorCrudBucketing(query); + + // STORED AS / ROW FORMAT SERDE + INPUTFORMAT + OUTPUTFORMAT + query.append(" stored as orc"); + + // LOCATION + if (location != null) { + query.append(" LOCATION '").append(HiveStringUtils.escapeHiveCommand(location)).append("'"); + } + + // TBLPROPERTIES + addTblProperties(query, isBucketed, bucketingVersion); + } + + /** + * Part of Create operation. Minor crud compaction uses its own bucketing system. + */ + private int getMinorCrudBucketing(StringBuilder query) { + int bucketingVersion = 0; + if (isBucketed && sourceTab != null) { // skip if sourceTab is null to avoid NPEs + int numBuckets = 1; + try { + org.apache.hadoop.hive.ql.metadata.Table t = getTable(); + numBuckets = Math.max(t.getNumBuckets(), numBuckets); + bucketingVersion = t.getBucketingVersion(); + } catch (HiveException e) { + LOG.info("Error finding table {}. Minor compaction result will use 0 buckets.", sourceTab.getTableName()); + } finally { + query.append(" clustered by (`bucket`)").append(" sorted by (`originalTransaction`, `bucket`, `rowId`)") + .append(" into ").append(numBuckets).append(" buckets"); + } + } + return bucketingVersion; + } + + protected org.apache.hadoop.hive.ql.metadata.Table getTable() throws HiveException { + org.apache.hadoop.hive.ql.metadata.Table t = Hive.get().getTable(sourceTab.getDbName(), sourceTab.getTableName()); + return t; + } Review Comment: nit can we return directly? ``` protected org.apache.hadoop.hive.ql.metadata.Table getTable() throws HiveException { return Hive.get().getTable(sourceTab.getDbName(), sourceTab.getTableName()); ``` ########## ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilderTest.java: ########## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.ValidCompactorWriteIdList; +import org.apache.hadoop.hive.metastore.api.*; +import org.apache.hadoop.hive.ql.io.AcidDirectory; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.mockito.Mockito; + +import java.util.*; + +import static org.mockito.Mockito.when; + +public class CompactionQueryBuilderTest { + + public static final String DB_NAME = "comp_test_db"; + public static final String SOURCE_TABLE_NAME = "comp_test_source_table"; + public static final String RESULT_TABLE_NAME = "comp_test_result_table"; + public static final String COLUMN_1_NAME = "column_1"; + public static final String COLUMN_2_NAME = "column_2"; + public static final String COLUMN_3_NAME = "column_3"; + public static final String SOME_TEST_LOCATION = "some_test_path"; + public static final String COMP_TEST_SOURCE_TABLE_FOR_INSERT = "comp_test_db.comp_test_insert_table"; + + public Table createSourceTable() { + return createSourceTable(false, false, false); + } + + public Table createSourceTableWithProperties() { + return createSourceTable(true, false, false); + } + + public Table createSourceTableBucketed() { + return createSourceTable(false, true, false); + } + + public Table createSourceTableBucketedSorted() { + return createSourceTable(false, true, true); + } + + private Table 
createSourceTable(boolean addTableProperties, boolean bucketed, boolean sorted) { + Table sourceTable = new Table(); + sourceTable.setDbName(DB_NAME); + sourceTable.setTableName(SOURCE_TABLE_NAME); + + StorageDescriptor sd = new StorageDescriptor(); + List<FieldSchema> columns = new ArrayList<>(); + FieldSchema col1 = new FieldSchema(COLUMN_1_NAME, "string", "First column"); + FieldSchema col2 = new FieldSchema(COLUMN_2_NAME, "int", null); + FieldSchema col3 = new FieldSchema(COLUMN_3_NAME, "boolean", "Third column"); + columns.add(col1); + columns.add(col2); + columns.add(col3); + sd.setCols(columns); + + if (bucketed) { + sd.addToBucketCols(COLUMN_1_NAME); + sd.addToBucketCols(COLUMN_3_NAME); + sd.setNumBuckets(4); + } else { + sd.setBucketCols(Collections.emptyList()); + } + + if (sorted) { + sd.addToSortCols(new Order(COLUMN_1_NAME, 0)); + sd.addToSortCols(new Order(COLUMN_2_NAME, 1)); + } else { + sd.setSortCols(Collections.emptyList()); + } + + Map<String, String> parameters = new HashMap<>(); + if (addTableProperties) { + parameters.put("property_1", "true"); + parameters.put("orc.property_2", "44"); + parameters.put("COLUMN_STATS_ACCURATE", "false"); + parameters.put("columns.types", "test"); + } + sourceTable.setParameters(parameters); + sourceTable.setSd(sd); + return sourceTable; + } + + protected AcidDirectory createAcidDirectory() { + AcidDirectory dir = Mockito.mock(AcidDirectory.class); + AcidUtils.ParsedDelta d1 = Mockito.mock(AcidUtils.ParsedDelta.class); + AcidUtils.ParsedDelta d2 = Mockito.mock(AcidUtils.ParsedDelta.class); + AcidUtils.ParsedDelta d3 = Mockito.mock(AcidUtils.ParsedDelta.class); + AcidUtils.ParsedDelta d4 = Mockito.mock(AcidUtils.ParsedDelta.class); + AcidUtils.ParsedDelta d5 = Mockito.mock(AcidUtils.ParsedDelta.class); + + List<AcidUtils.ParsedDelta> dirs = new ArrayList<>(); + dirs.add(d1); + dirs.add(d2); + dirs.add(d3); + dirs.add(d4); + dirs.add(d5); + + when(dir.getCurrentDirectories()).thenReturn(dirs); + 
when(d1.isDeleteDelta()).thenReturn(true); + when(d1.getMinWriteId()).thenReturn(7L); + when(d1.getMaxWriteId()).thenReturn(11L); + when(d1.getPath()).thenReturn(new Path("/compaction/test/table", "test_delta_1")); + when(d2.isDeleteDelta()).thenReturn(true); + when(d2.getMinWriteId()).thenReturn(1L); + when(d2.getMaxWriteId()).thenReturn(11L); + when(d2.getPath()).thenReturn(new Path("/compaction/test/table", "test_delta_2")); + when(d3.isDeleteDelta()).thenReturn(true); + when(d3.getMinWriteId()).thenReturn(5L); + when(d3.getMaxWriteId()).thenReturn(15L); + when(d3.getPath()).thenReturn(new Path("/compaction/test/table", "test_delta_3")); + when(d4.isDeleteDelta()).thenReturn(true); + when(d4.getMinWriteId()).thenReturn(7L); + when(d4.getMaxWriteId()).thenReturn(20L); + when(d4.getPath()).thenReturn(new Path("/compaction/test/table", "test_delta_4")); + when(d5.isDeleteDelta()).thenReturn(false); + when(d5.getMinWriteId()).thenReturn(6L); + when(d5.getMaxWriteId()).thenReturn(11L); + when(d5.getPath()).thenReturn(new Path("/compaction/test/table", "test_delta_5")); + return dir; + } + + protected ValidCompactorWriteIdList createWriteId(long minWriteId) { + long[] abortedWriteIdList = { 1111L }; + ValidCompactorWriteIdList writeIds = + new ValidCompactorWriteIdList("comp_test_source_table", abortedWriteIdList, null, 15L, minWriteId); + return writeIds; + } + Review Comment: super nit avoid this empty line ########## ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionQueryBuilderForMajorCompaction.java: ########## @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import org.apache.hadoop.hive.common.ValidCompactorWriteIdList; +import org.apache.hadoop.hive.metastore.api.CompactionType; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.io.AcidDirectory; +import org.junit.Assert; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.util.Collections; + +import static org.junit.Assert.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.when; + +public class TestCompactionQueryBuilderForMajorCompaction extends CompactionQueryBuilderTest { + + @Test + public void testCreateNoSourceTable() { + CompactionQueryBuilder queryBuilder = getMajorCompactionQueryBuilderForCreate(); + String query = queryBuilder.build(); + String expectedQuery = + "CREATE temporary external table comp_test_result_table stored as orc TBLPROPERTIES ('compactiontable'='MAJOR', 'transactional'='false')"; + Assert.assertEquals(expectedQuery, query); + } + + @Test + public void testCreate() { + CompactionQueryBuilder queryBuilder = getMajorCompactionQueryBuilderForCreate(); + Table sourceTable = createSourceTable(); + queryBuilder.setSourceTab(sourceTable); + String query = queryBuilder.build(); + String expectedQuery = + "CREATE temporary external table comp_test_result_table(`operation` int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, 
`currentTransaction` bigint, `row` struct<`column_1` :string,`column_2` :int,`column_3` :boolean>) stored as orc TBLPROPERTIES ('compactiontable'='MAJOR', 'transactional'='false')"; + Assert.assertEquals(expectedQuery, query); + } + + @Test + public void testCreateWithSourceTableProperties() { + CompactionQueryBuilder queryBuilder = getMajorCompactionQueryBuilderForCreate(); + Table sourceTable = createSourceTableWithProperties(); + queryBuilder.setSourceTab(sourceTable); + String query = queryBuilder.build(); + String expectedQuery = + "CREATE temporary external table comp_test_result_table(`operation` int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, `currentTransaction` bigint, `row` struct<`column_1` :string,`column_2` :int,`column_3` :boolean>) stored as orc TBLPROPERTIES ('compactiontable'='MAJOR', 'orc.property_2'='44', 'transactional'='false')"; + Assert.assertEquals(expectedQuery, query); + } + + @Test + public void testCreateWithSourceTableLocation() { + CompactionQueryBuilder queryBuilder = getMajorCompactionQueryBuilderForCreate(); + Table sourceTable = createSourceTable(); + queryBuilder.setSourceTab(sourceTable); + queryBuilder.setLocation(SOME_TEST_LOCATION); + String query = queryBuilder.build(); + String expectedQuery = + "CREATE temporary external table comp_test_result_table(`operation` int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, `currentTransaction` bigint, `row` struct<`column_1` :string,`column_2` :int,`column_3` :boolean>) stored as orc LOCATION 'some_test_path' TBLPROPERTIES ('compactiontable'='MAJOR', 'transactional'='false')"; + Assert.assertEquals(expectedQuery, query); + } + + @Test + public void testCreateWithPartitionedSourceTable() { + CompactionQueryBuilder queryBuilder = getMajorCompactionQueryBuilderForCreate(); + Table sourceTable = createSourceTable(); + queryBuilder.setSourceTab(sourceTable); + queryBuilder.setPartitioned(true); + String query = queryBuilder.build(); + String expectedQuery = + 
"CREATE temporary external table comp_test_result_table(`operation` int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, `currentTransaction` bigint, `row` struct<`column_1` :string,`column_2` :int,`column_3` :boolean>) PARTITIONED BY (`file_name` STRING) stored as orc TBLPROPERTIES ('compactiontable'='MAJOR', 'transactional'='false')"; + Assert.assertEquals(expectedQuery, query); + } + + @Test + public void testInsert() { + CompactionQueryBuilder queryBuilder = getMajorCompactionQueryBuilderForInsert(); + Table sourceTable = createSourceTable(); + queryBuilder.setSourceTabForInsert(COMP_TEST_SOURCE_TABLE_FOR_INSERT); + Partition sourcePartition = new Partition(); + sourcePartition.addToValues("source_part_1"); + sourcePartition.addToValues("true"); + sourcePartition.addToValues("4444"); + + sourceTable.addToPartitionKeys(new FieldSchema("source_part_1", "string", "comment 1")); + sourceTable.addToPartitionKeys(new FieldSchema("source_part_2", "boolean", "comment 2")); + sourceTable.addToPartitionKeys(new FieldSchema("source_part_3", "int", "comment 3")); + queryBuilder.setSourceTab(sourceTable); + queryBuilder.setSourcePartition(sourcePartition); + + String query = queryBuilder.build(); + String expectedQuery = + "INSERT into table comp_test_result_table select validate_acid_sort_order(ROW__ID.writeId, ROW__ID.bucketId, ROW__ID.rowId), ROW__ID.writeId, ROW__ID.bucketId, ROW__ID.rowId, ROW__ID.writeId, NAMED_STRUCT('column_1', `column_1`, 'column_2', `column_2`, 'column_3', `column_3`) from comp_test_db.comp_test_insert_table where `source_part_1`='source_part_1' and `source_part_2`=true and `source_part_3`='4444'"; + Assert.assertEquals(expectedQuery, query); + } + + @Test + public void testInsertPartitionMismatch() { + CompactionQueryBuilder queryBuilder = getMajorCompactionQueryBuilderForInsert(); + Table sourceTable = createSourceTable(); + queryBuilder.setSourceTabForInsert(COMP_TEST_SOURCE_TABLE_FOR_INSERT); + Partition sourcePartition = new 
Partition(); + sourcePartition.addToValues("source_part_1"); + sourcePartition.addToValues("true"); + sourcePartition.addToValues("4444"); + + sourceTable.addToPartitionKeys(new FieldSchema("source_part_1", "string", "comment 1")); + sourceTable.addToPartitionKeys(new FieldSchema("source_part_2", "boolean", "comment 2")); + queryBuilder.setSourceTab(sourceTable); + queryBuilder.setSourcePartition(sourcePartition); + + Exception exception = assertThrows(IllegalStateException.class, () -> { + queryBuilder.build(); + }); Review Comment: Can we change to ``` Exception exception = assertThrows(IllegalStateException.class, queryBuilder::build); ``` ########## ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionQueryBuilderForMmCompaction.java: ########## @@ -0,0 +1,475 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import org.apache.hadoop.hive.common.ValidCompactorWriteIdList; +import org.apache.hadoop.hive.metastore.api.*; +import org.apache.hadoop.hive.ql.io.AcidDirectory; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.junit.Assert; +import org.junit.jupiter.api.Test; Review Comment: ``import org.junit.Test;`` ########## ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionQueryBuilderForMajorCompaction.java: ########## @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import org.apache.hadoop.hive.common.ValidCompactorWriteIdList; +import org.apache.hadoop.hive.metastore.api.CompactionType; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.io.AcidDirectory; +import org.junit.Assert; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.util.Collections; + +import static org.junit.Assert.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; Review Comment: Not sure if you need static import, for assertEquals we didn't have, anyway this is junit-5 ``` import static org.junit.Assert.assertTrue; ``` ########## ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionQueryBuilderForMmCompaction.java: ########## @@ -0,0 +1,475 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import org.apache.hadoop.hive.common.ValidCompactorWriteIdList; +import org.apache.hadoop.hive.metastore.api.*; +import org.apache.hadoop.hive.ql.io.AcidDirectory; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.junit.Assert; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TestCompactionQueryBuilderForMmCompaction extends CompactionQueryBuilderTest { + + @Test + public void testMajorCompactionCreateWithoutSourceTable() { + CompactionQueryBuilder queryBuilder = getMmMajorCompactionQueryBuilderForCreate(); + String query = queryBuilder.build(); + String expectedQuery = + "CREATE temporary external table comp_test_result_table TBLPROPERTIES ('transactional'='false')"; + Assert.assertEquals(expectedQuery, query); + } + + @Test + public void testMajorCompactionCreateWithTablePropertiesWithLocation() { + CompactionQueryBuilder queryBuilder = getMmMajorCompactionQueryBuilderForCreate(); + Table sourceTable = createSourceTableWithProperties(); + queryBuilder.setSourceTab(sourceTable); + queryBuilder.setLocation(SOME_TEST_LOCATION); + String query = queryBuilder.build(); + String expectedQuery = + "CREATE temporary external table comp_test_result_table(`column_1` string,`column_2` int,`column_3` boolean) LOCATION 'some_test_path' TBLPROPERTIES ('property_1'='true', 'orc.property_2'='44', 'transactional'='false')"; + Assert.assertEquals(expectedQuery, query); + } + + @Test + public void testMajorCompactionCreatePartitioned() { + CompactionQueryBuilder queryBuilder = getMmMajorCompactionQueryBuilderForCreate(); + Table sourceTable = createSourceTable(); + queryBuilder.setSourceTab(sourceTable); + queryBuilder.setPartitioned(true); + queryBuilder.setLocation(SOME_TEST_LOCATION); + 
String query = queryBuilder.build(); + String expectedQuery = + "CREATE temporary external table comp_test_result_table(`column_1` string,`column_2` int,`column_3` boolean) PARTITIONED BY (`file_name` STRING) LOCATION 'some_test_path' TBLPROPERTIES ('transactional'='false')"; + Assert.assertEquals(expectedQuery, query); + } + + @Test + public void testMajorCompactionCreateWithBucketedSourceTable() throws HiveException { + CompactionQueryBuilder queryBuilder = getMmMajorCompactionQueryBuilderForCreate(); + Table sourceTable = createSourceTableBucketed(); + queryBuilder.setSourceTab(sourceTable); + String query = queryBuilder.build(); + String expectedQuery = + "CREATE temporary external table comp_test_result_table(`column_1` string,`column_2` int,`column_3` boolean) CLUSTERED BY (column_1,column_3) INTO 4 BUCKETS TBLPROPERTIES ('transactional'='false')"; + Assert.assertEquals(expectedQuery, query); + } + + @Test + public void testMajorCompactionCreateWithBucketedSortedSourceTable() throws HiveException { + CompactionQueryBuilder queryBuilder = getMmMajorCompactionQueryBuilderForCreate(); + Table sourceTable = createSourceTableBucketedSorted(); + queryBuilder.setSourceTab(sourceTable); + String query = queryBuilder.build(); + String expectedQuery = + "CREATE temporary external table comp_test_result_table(`column_1` string,`column_2` int,`column_3` boolean) CLUSTERED BY (column_1,column_3) SORTED BY (column_1 DESC, column_2 ASC) INTO 4 BUCKETS TBLPROPERTIES ('transactional'='false')"; + Assert.assertEquals(expectedQuery, query); + } + + @Test + public void testMajorCompactionCreateWithStorageDescriptor() throws HiveException { + CompactionQueryBuilder queryBuilder = getMmMajorCompactionQueryBuilderForCreate(); + Table sourceTable = createSourceTable(); + queryBuilder.setSourceTab(sourceTable); + StorageDescriptor storageDescriptor = createStorageDescriptor(); + queryBuilder.setStorageDescriptor(storageDescriptor); + String query = queryBuilder.build(); + String 
expectedQuery = + "CREATE temporary external table comp_test_result_table(`column_1` string,`column_2` int,`column_3` boolean) ROW FORMAT SERDE '/some/test/serialization_lib'WITH SERDEPROPERTIES ( \n" + " 'test_param_1'='test_value', \n" + " 'test_param_2'='test_value')STORED AS INPUTFORMAT 'some.test.InputFormat' OUTPUTFORMAT 'some.test.OutputFormat' TBLPROPERTIES ('transactional'='false')"; + Assert.assertEquals(expectedQuery, query); + } + + @Test + public void testMajorCompactionCreateWithSkewedByClause() throws HiveException { + CompactionQueryBuilder queryBuilder = getMmMajorCompactionQueryBuilderForCreate(); + Table sourceTable = createSourceTable(); + StorageDescriptor storageDescriptor = sourceTable.getSd(); + SkewedInfo skewedInfo = new SkewedInfo(); + skewedInfo.addToSkewedColNames("column_1"); + List<String> skewedColValues = new ArrayList<>(); + skewedColValues.add("value1"); + skewedColValues.add("value2"); + skewedColValues.add("value3"); + skewedInfo.addToSkewedColValues(skewedColValues); + storageDescriptor.setSkewedInfo(skewedInfo); + storageDescriptor.setStoredAsSubDirectories(true); + sourceTable.setSd(storageDescriptor); + queryBuilder.setSourceTab(sourceTable); + String query = queryBuilder.build(); + String expectedQuery = + "CREATE temporary external table comp_test_result_table(`column_1` string,`column_2` int,`column_3` boolean) SKEWED BY (column_1) ON ('value1','value2','value3')) STORED AS DIRECTORIES TBLPROPERTIES ('transactional'='false')"; + Assert.assertEquals(expectedQuery, query); + } + + @Test + public void testMajorCompactionCreateWithNonNativeTable() throws HiveException { + CompactionQueryBuilder queryBuilder = getMmMajorCompactionQueryBuilderForCreate(); + Table sourceTable = createSourceTable(); + Map<String, String> parameters = new HashMap<>(); + parameters.put("storage_handler", "test_storage_handler"); + sourceTable.setParameters(parameters); + queryBuilder.setSourceTab(sourceTable); + StorageDescriptor storageDescriptor 
= createStorageDescriptor(); + queryBuilder.setStorageDescriptor(storageDescriptor); + Exception exception = assertThrows(RuntimeException.class, () -> { + queryBuilder.build(); + }); Review Comment: Can replace with ``` Exception exception = assertThrows(RuntimeException.class, queryBuilder::build); ``` ########## ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionQueryBuilderForMinorCompaction.java: ########## @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import org.apache.hadoop.hive.common.ValidCompactorWriteIdList; +import org.apache.hadoop.hive.metastore.api.CompactionType; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.io.AcidDirectory; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.junit.Assert; +import org.junit.jupiter.api.Test; Review Comment: use the same as the assert ``` import org.junit.Test; ``` ########## ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionQueryBuilderForRebalanceCompaction.java: ########## @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import org.apache.hadoop.hive.common.ValidCompactorWriteIdList; +import org.apache.hadoop.hive.metastore.api.CompactionType; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.io.AcidDirectory; +import org.junit.Assert; +import org.junit.jupiter.api.Test; + +import static org.junit.Assert.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TestCompactionQueryBuilderForRebalanceCompaction extends CompactionQueryBuilderTest { + + @Test + public void testCreate() { + CompactionQueryBuilder queryBuilder = getRebalanceCompactionQueryBuilderForCreate(); + Table sourceTable = createSourceTableWithProperties(); + queryBuilder.setSourceTab(sourceTable); + queryBuilder.setLocation(SOME_TEST_LOCATION); + queryBuilder.setPartitioned(true); + String query = queryBuilder.build(); + String expectedQuery = + "CREATE temporary external table comp_test_result_table(`operation` int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, `currentTransaction` bigint, `row` struct<`column_1` :string,`column_2` :int,`column_3` :boolean>) PARTITIONED BY 
(`file_name` STRING) stored as orc LOCATION 'some_test_path' TBLPROPERTIES ('compactiontable'='REBALANCE', 'orc.property_2'='44', 'transactional'='false')"; + Assert.assertEquals(expectedQuery, query); + } + + @Test + public void testCreateWithNonPartitionedSourceTable() { + CompactionQueryBuilder queryBuilder = getRebalanceCompactionQueryBuilderForCreate(); + Table sourceTable = createSourceTableWithProperties(); + queryBuilder.setSourceTab(sourceTable); + queryBuilder.setLocation(SOME_TEST_LOCATION); + String query = queryBuilder.build(); + String expectedQuery = + "CREATE temporary external table comp_test_result_table(`operation` int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, `currentTransaction` bigint, `row` struct<`column_1` :string,`column_2` :int,`column_3` :boolean>) stored as orc LOCATION 'some_test_path' TBLPROPERTIES ('compactiontable'='REBALANCE', 'orc.property_2'='44', 'transactional'='false')"; + Assert.assertEquals(expectedQuery, query); + } + + @Test + public void testCreateWithNoLocationAndNoTableProperties() { + CompactionQueryBuilder queryBuilder = getRebalanceCompactionQueryBuilderForCreate(); + Table sourceTable = createSourceTable(); + queryBuilder.setSourceTab(sourceTable); + String query = queryBuilder.build(); + String expectedQuery = + "CREATE temporary external table comp_test_result_table(`operation` int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, `currentTransaction` bigint, `row` struct<`column_1` :string,`column_2` :int,`column_3` :boolean>) stored as orc TBLPROPERTIES ('compactiontable'='REBALANCE', 'transactional'='false')"; + Assert.assertEquals(expectedQuery, query); + } + + @Test + public void testInsert() { + CompactionQueryBuilder queryBuilder = getRebalanceCompactionQueryBuilderForInsert(); + Table sourceTable = createSourceTable(); + queryBuilder.setSourceTabForInsert(COMP_TEST_SOURCE_TABLE_FOR_INSERT); + queryBuilder.setSourceTab(sourceTable); + queryBuilder.setNumberOfBuckets(5); + 
queryBuilder.setOrderByClause("ORDER BY column_1 ASC, column_3 DESC"); + String query = queryBuilder.build(); + String expectedQuery = + "INSERT overwrite table comp_test_result_table select 0, t2.writeId, t2.rowId DIV CEIL(numRows / 5), t2.rowId, t2.writeId, t2.data from (select count(ROW__ID.writeId) over() as numRows, MAX(ROW__ID.writeId) over() as writeId, row_number() OVER (ORDER BY column_1 ASC, column_3 DESC) - 1 AS rowId, NAMED_STRUCT('column_1', `column_1`, 'column_2', `column_2`, 'column_3', `column_3`) as data from comp_test_db.comp_test_insert_table ORDER BY column_1 ASC, column_3 DESC) t2"; + Assert.assertEquals(expectedQuery, query); + } + + @Test + public void testInsertWithPartitionedTable() { + CompactionQueryBuilder queryBuilder = getRebalanceCompactionQueryBuilderForInsert(); + Table sourceTable = createSourceTable(); + queryBuilder.setSourceTabForInsert(COMP_TEST_SOURCE_TABLE_FOR_INSERT); + Partition sourcePartition = new Partition(); + sourcePartition.addToValues("source_part_1"); + sourceTable.addToPartitionKeys(new FieldSchema("source_part_1", "string", "comment 1")); + queryBuilder.setSourceTab(sourceTable); + queryBuilder.setSourcePartition(sourcePartition); + + String query = queryBuilder.build(); + String expectedQuery = + "INSERT overwrite table comp_test_result_table select 0, t2.writeId, t2.rowId DIV CEIL(numRows / 0), t2.rowId, t2.writeId, t2.data from (select count(ROW__ID.writeId) over() as numRows, ROW__ID.writeId as writeId, row_number() OVER (order by ROW__ID.writeId ASC, ROW__ID.bucketId ASC, ROW__ID.rowId ASC) - 1 AS rowId, NAMED_STRUCT('column_1', `column_1`, 'column_2', `column_2`, 'column_3', `column_3`) as data from comp_test_db.comp_test_insert_table order by ROW__ID.writeId ASC, ROW__ID.bucketId ASC, ROW__ID.rowId ASC) t2"; + Assert.assertEquals(expectedQuery, query); + } + + @Test + public void testInsertOnlySourceTableIsSet() { + CompactionQueryBuilder queryBuilder = getRebalanceCompactionQueryBuilderForInsert(); + 
Table sourceTable = createSourceTable(); + queryBuilder.setSourceTab(sourceTable); + String query = queryBuilder.build(); + String expectedQuery = + "INSERT overwrite table comp_test_result_table select 0, t2.writeId, t2.rowId DIV CEIL(numRows / 0), t2.rowId, t2.writeId, t2.data from (select count(ROW__ID.writeId) over() as numRows, ROW__ID.writeId as writeId, row_number() OVER (order by ROW__ID.writeId ASC, ROW__ID.bucketId ASC, ROW__ID.rowId ASC) - 1 AS rowId, NAMED_STRUCT('column_1', `column_1`, 'column_2', `column_2`, 'column_3', `column_3`) as data from comp_test_db.comp_test_source_table order by ROW__ID.writeId ASC, ROW__ID.bucketId ASC, ROW__ID.rowId ASC) t2"; + Assert.assertEquals(expectedQuery, query); + } + + @Test + public void testInsertNoSourceTable() { + CompactionQueryBuilder queryBuilder = getRebalanceCompactionQueryBuilderForInsert(); + queryBuilder.setSourceTabForInsert(COMP_TEST_SOURCE_TABLE_FOR_INSERT); + String query = queryBuilder.build(); + String expectedQuery = + "INSERT overwrite table comp_test_result_table select from comp_test_db.comp_test_insert_table order by ROW__ID.writeId ASC, ROW__ID.bucketId ASC, ROW__ID.rowId ASC) t2"; + Assert.assertEquals(expectedQuery, query); + } + + public void testAlter() { Review Comment: Misses test annotation ########## ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionQueryBuilderForRebalanceCompaction.java: ########## @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import org.apache.hadoop.hive.common.ValidCompactorWriteIdList; +import org.apache.hadoop.hive.metastore.api.CompactionType; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.io.AcidDirectory; +import org.junit.Assert; +import org.junit.jupiter.api.Test; + +import static org.junit.Assert.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; Review Comment: Same import issues as above -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org For additional commands, e-mail: gitbox-h...@hive.apache.org