kasakrisz commented on code in PR #4852:
URL: https://github.com/apache/hive/pull/4852#discussion_r1398782142
##########
iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergV2.java:
##########
@@ -587,31 +602,13 @@ public void testUpdateForSupportedTypes() throws
IOException {
}
@Test
- public void testDeleteStatementFormatV1() {
- // create and insert an initial batch of records
- testTables.createTable(shell, "customers",
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
- PartitionSpec.unpartitioned(), fileFormat,
HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_2);
- // insert one more batch so that we have multiple data files within the
same partition
-
shell.executeStatement(testTables.getInsertQuery(HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_1,
- TableIdentifier.of("default", "customers"), false));
- AssertHelpers.assertThrows("should throw exception",
IllegalArgumentException.class,
- "Attempt to do update or delete on table", () -> {
- shell.executeStatement("DELETE FROM customers WHERE customer_id=3 or
first_name='Joanna'");
- });
+ public void testDeleteStatementFormatV1() throws TException,
InterruptedException {
Review Comment:
Testing V1 tables in a test class named `TestHiveIcebergV2.java`?
##########
iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergV2.java:
##########
@@ -587,31 +602,13 @@ public void testUpdateForSupportedTypes() throws
IOException {
}
@Test
- public void testDeleteStatementFormatV1() {
- // create and insert an initial batch of records
- testTables.createTable(shell, "customers",
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
- PartitionSpec.unpartitioned(), fileFormat,
HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_2);
- // insert one more batch so that we have multiple data files within the
same partition
-
shell.executeStatement(testTables.getInsertQuery(HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_1,
- TableIdentifier.of("default", "customers"), false));
- AssertHelpers.assertThrows("should throw exception",
IllegalArgumentException.class,
- "Attempt to do update or delete on table", () -> {
- shell.executeStatement("DELETE FROM customers WHERE customer_id=3 or
first_name='Joanna'");
- });
+ public void testDeleteStatementFormatV1() throws TException,
InterruptedException {
+ executeDeleteAndValidateDataUnpartitioned(1);
Review Comment:
Are there any tests that expect an exception to be thrown?
This seems to have been removed:
```
AssertHelpers.assertThrows("should throw exception",
IllegalArgumentException.class,
"Attempt to do update or delete on table", () -> {
shell.executeStatement("DELETE FROM customers WHERE customer_id=3
or first_name='Joanna'");
});
```
##########
iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergV2.java:
##########
@@ -403,10 +407,21 @@ public void testDeleteForSupportedTypes() throws
IOException {
}
@Test
- public void testUpdateStatementUnpartitioned() {
+ public void testUpdateStatementUnpartitioned() throws TException,
InterruptedException {
+ executeUpdateAndValidateDataUnpartitioned(2);
+ }
+
+ private void executeUpdateAndValidateDataUnpartitioned(int formatVersion)
throws TException, InterruptedException {
// create and insert an initial batch of records
testTables.createTable(shell, "customers",
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
- PartitionSpec.unpartitioned(), fileFormat,
HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_2, 2);
+ PartitionSpec.unpartitioned(), fileFormat,
HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_2,
+ formatVersion);
+
+ // verify update mode set to merge-on-read
+ Assert.assertEquals(formatVersion == 2 ?
HiveIcebergStorageHandler.MERGE_ON_READ : null,
+ shell.metastore().getTable("default", "customers")
+ .getParameters().get(TableProperties.UPDATE_MODE));
Review Comment:
Are there any tests for copy-on-write?
##########
ql/src/java/org/apache/hadoop/hive/ql/parse/rewrite/CopyOnWriteMergeRewriter.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse.rewrite;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
+import org.apache.hadoop.hive.ql.parse.ParseUtils;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.rewrite.sql.COWWithClauseBuilder;
+import org.apache.hadoop.hive.ql.parse.rewrite.sql.MultiInsertSqlGenerator;
+import org.apache.hadoop.hive.ql.parse.rewrite.sql.SqlGeneratorFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.UnaryOperator;
+
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static
org.apache.hadoop.hive.ql.parse.rewrite.sql.SqlGeneratorFactory.TARGET_PREFIX;
+
+public class CopyOnWriteMergeRewriter extends MergeRewriter {
+
+ public CopyOnWriteMergeRewriter(Hive db, HiveConf conf, SqlGeneratorFactory
sqlGeneratorFactory) {
+ super(db, conf, sqlGeneratorFactory);
+ }
+
+ @Override
+ public ParseUtils.ReparseResult rewrite(Context ctx, MergeStatement
mergeStatement) throws SemanticException {
+
+ setOperation(ctx);
+ MultiInsertSqlGenerator sqlGenerator =
sqlGeneratorFactory.createSqlGenerator();
+ handleSource(mergeStatement, sqlGenerator);
+
+ sqlGenerator.append('\n');
+ sqlGenerator.append("INSERT INTO ").appendTargetTableName();
+ sqlGenerator.append('\n');
+
+ List<MergeStatement.WhenClause> whenClauses =
Lists.newArrayList(mergeStatement.getWhenClauses());
+
+ Optional<String> extraPredicate = whenClauses.stream()
+ .filter(whenClause -> !(whenClause instanceof
MergeStatement.InsertClause))
+ .map(MergeStatement.WhenClause::getExtraPredicate)
+ .map(Strings::nullToEmpty)
+ .reduce((p1, p2) -> isNotBlank(p2) ? p1 + " OR " + p2 : p2);
+
+ whenClauses.removeIf(whenClause -> whenClause instanceof
MergeStatement.DeleteClause);
+ extraPredicate.ifPresent(p -> whenClauses.add(new
MergeStatement.DeleteClause(p, null)));
+
+ MergeStatement.MergeSqlGenerator mergeSqlGenerator =
createMergeSqlGenerator(mergeStatement, sqlGenerator);
+
+ for (MergeStatement.WhenClause whenClause : whenClauses) {
+ whenClause.toSql(mergeSqlGenerator);
+ }
+
+ // TODO: handleCardinalityViolation;
+
+ ParseUtils.ReparseResult rr = ParseUtils.parseRewrittenQuery(ctx,
sqlGenerator.toString());
+ Context rewrittenCtx = rr.rewrittenCtx;
+ setOperation(rewrittenCtx);
+
+ //set dest name mapping on new context; 1st child is TOK_FROM
+ rewrittenCtx.addDestNamePrefix(1, Context.DestClausePrefix.MERGE);
+ return rr;
+ }
+
+ @Override
+ protected CopyOnWriteMergeWhenClauseSqlGenerator createMergeSqlGenerator(
+ MergeStatement mergeStatement, MultiInsertSqlGenerator sqlGenerator) {
+ return new CopyOnWriteMergeWhenClauseSqlGenerator(conf, sqlGenerator,
mergeStatement);
+ }
+
+ private void handleSource(MergeStatement mergeStatement,
MultiInsertSqlGenerator sqlGenerator) {
+ boolean hasWhenNotMatchedInsertClause =
mergeStatement.hasWhenNotMatchedInsertClause();
+
+ String sourceName = mergeStatement.getSourceName();
+ String sourceAlias = mergeStatement.getSourceAlias();
+
+ String targetAlias = mergeStatement.getTargetAlias();
+ String onClauseAsString = replaceColumnRefsWithTargetPrefix(targetAlias,
mergeStatement.getOnClauseAsText());
+
+ sqlGenerator.newCteExpr();
+
+ sqlGenerator.append(sourceName + " AS ( SELECT * FROM\n");
+ sqlGenerator.append("(SELECT ");
+ sqlGenerator.appendAcidSelectColumns(Context.Operation.MERGE);
+ sqlGenerator.appendAllColsOfTargetTable(TARGET_PREFIX);
+ sqlGenerator.append(" FROM ").appendTargetTableName().append(") ");
+ sqlGenerator.append(targetAlias);
+ sqlGenerator.append('\n');
+ sqlGenerator.indent().append(hasWhenNotMatchedInsertClause ? "FULL OUTER
JOIN" : "LEFT OUTER JOIN").append("\n");
+ sqlGenerator.indent().append(sourceAlias);
+ sqlGenerator.append('\n');
+ sqlGenerator.indent().append("ON ").append(onClauseAsString);
+ sqlGenerator.append('\n');
+ sqlGenerator.append(")");
+
+ sqlGenerator.addCteExpr();
+ }
+
+ private static String replaceColumnRefsWithTargetPrefix(String columnRef,
String strValue) {
+ return strValue.replaceAll(columnRef + "\\.(`?)", "$1" + TARGET_PREFIX);
+ }
+
+ static class CopyOnWriteMergeWhenClauseSqlGenerator extends
MergeRewriter.MergeWhenClauseSqlGenerator {
+
+ private final COWWithClauseBuilder cowWithClauseBuilder;
+
+ CopyOnWriteMergeWhenClauseSqlGenerator(
+ HiveConf conf, MultiInsertSqlGenerator sqlGenerator, MergeStatement
mergeStatement) {
+ super(conf, sqlGenerator, mergeStatement);
+ this.cowWithClauseBuilder = new COWWithClauseBuilder();
+ }
+
+ @Override
+ public void appendWhenNotMatchedInsertClause(MergeStatement.InsertClause
insertClause) {
+ String targetAlias = mergeStatement.getTargetAlias();
+
+ if (mergeStatement.getWhenClauses().size() > 1) {
+ sqlGenerator.append("union all\n");
+ }
+ sqlGenerator.append(" -- insert clause\n").append("SELECT ");
+
+ if (isNotBlank(hintStr)) {
+ sqlGenerator.append(hintStr);
+ hintStr = null;
+ }
+ List<String> values =
sqlGenerator.getDeleteValues(Context.Operation.MERGE);
+ values.add(insertClause.getValuesClause());
+
+ sqlGenerator.append(StringUtils.join(values, ","));
+ sqlGenerator.append("\nFROM " + mergeStatement.getSourceName());
+ sqlGenerator.append("\n WHERE ");
+
+ StringBuilder whereClause = new
StringBuilder(insertClause.getPredicate());
+
+ if (insertClause.getExtraPredicate() != null) {
+ //we have WHEN NOT MATCHED AND <boolean expr> THEN INSERT
+ whereClause.append(" AND ").append(insertClause.getExtraPredicate());
+ }
+ sqlGenerator.append(
+ replaceColumnRefsWithTargetPrefix(targetAlias,
whereClause.toString()));
+ sqlGenerator.append('\n');
+ }
+
+ @Override
+ public void appendWhenMatchedUpdateClause(MergeStatement.UpdateClause
updateClause) {
+ Table targetTable = mergeStatement.getTargetTable();
+ String targetAlias = mergeStatement.getTargetAlias();
+ String onClauseAsString = mergeStatement.getOnClauseAsText();
+
+ UnaryOperator<String> columnRefsFunc = value ->
replaceColumnRefsWithTargetPrefix(targetAlias, value);
Review Comment:
Can this be extracted as a constant?
##########
iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergV2.java:
##########
@@ -587,31 +602,13 @@ public void testUpdateForSupportedTypes() throws
IOException {
}
@Test
- public void testDeleteStatementFormatV1() {
- // create and insert an initial batch of records
- testTables.createTable(shell, "customers",
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
- PartitionSpec.unpartitioned(), fileFormat,
HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_2);
- // insert one more batch so that we have multiple data files within the
same partition
-
shell.executeStatement(testTables.getInsertQuery(HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_1,
- TableIdentifier.of("default", "customers"), false));
- AssertHelpers.assertThrows("should throw exception",
IllegalArgumentException.class,
- "Attempt to do update or delete on table", () -> {
- shell.executeStatement("DELETE FROM customers WHERE customer_id=3 or
first_name='Joanna'");
- });
+ public void testDeleteStatementFormatV1() throws TException,
InterruptedException {
+ executeDeleteAndValidateDataUnpartitioned(1);
}
@Test
- public void testUpdateStatementFormatV1() {
- // create and insert an initial batch of records
- testTables.createTable(shell, "customers",
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
- PartitionSpec.unpartitioned(), fileFormat,
HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_2);
- // insert one more batch so that we have multiple data files within the
same partition
-
shell.executeStatement(testTables.getInsertQuery(HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_1,
- TableIdentifier.of("default", "customers"), false));
- AssertHelpers.assertThrows("should throw exception",
IllegalArgumentException.class,
- "Attempt to do update or delete on table", () -> {
- shell.executeStatement("UPDATE customers SET last_name='Changed'
WHERE customer_id=3 or first_name='Joanna'");
- });
+ public void testUpdateStatementFormatV1() throws TException,
InterruptedException {
+ executeUpdateAndValidateDataUnpartitioned(1);
Review Comment:
`AssertHelpers.assertThrows` was removed; is this intentional?
##########
ql/src/java/org/apache/hadoop/hive/ql/parse/rewrite/CopyOnWriteMergeRewriter.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse.rewrite;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
+import org.apache.hadoop.hive.ql.parse.ParseUtils;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.rewrite.sql.COWWithClauseBuilder;
+import org.apache.hadoop.hive.ql.parse.rewrite.sql.MultiInsertSqlGenerator;
+import org.apache.hadoop.hive.ql.parse.rewrite.sql.SqlGeneratorFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.UnaryOperator;
+
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static
org.apache.hadoop.hive.ql.parse.rewrite.sql.SqlGeneratorFactory.TARGET_PREFIX;
+
+public class CopyOnWriteMergeRewriter extends MergeRewriter {
Review Comment:
What is used from `MergeRewriter`? Wouldn't it be better to implement
the `Rewriter<MergeStatement>` interface?
##########
ql/src/java/org/apache/hadoop/hive/ql/parse/rewrite/MergeRewriter.java:
##########
@@ -217,13 +218,17 @@ public void
appendWhenMatchedUpdateClause(MergeStatement.UpdateClause updateClau
}
protected void addValues(Table targetTable, String targetAlias,
Map<String, String> newValues,
- List<String> values) {
+ List<String> values, boolean aliasRhsExpr) {
for (FieldSchema fieldSchema : targetTable.getCols()) {
+ String value = String.format("%s.%s", targetAlias,
HiveUtils.unparseIdentifier(fieldSchema.getName(), conf));
if (newValues.containsKey(fieldSchema.getName())) {
- values.add(newValues.get(fieldSchema.getName()));
+ String rhsExp = newValues.get(fieldSchema.getName());
+ if (aliasRhsExpr){
+ rhsExp += String.format(" AS %s", value);
+ }
+ values.add(rhsExp);
Review Comment:
How about extracting this into a method to get rid of the boolean flag
`aliasRhsExpr`, since IIUC its value is implementation-specific?
```
getNewValue(...) {
return fieldSchema.getName();
}
```
and override it in `CopyOnWriteMergeWhenClauseSqlGenerator`
```
@Override
getNewValue(...) {
String value = String.format("%s.%s", targetAlias,
HiveUtils.unparseIdentifier(fieldSchema.getName(), conf));
return String.format("%s AS %s", fieldSchema.getName(), value);
}
```
##########
iceberg/iceberg-handler/src/test/queries/positive/merge_iceberg_copy_on_write_partitioned.q:
##########
@@ -0,0 +1,25 @@
+-- SORT_QUERY_RESULTS
+set hive.explain.user=false;
+
+drop table if exists target_ice;
+drop table if exists source;
+
+create external table target_ice(a int, b string, c int) partitioned by spec
(bucket(16, a), truncate(3, b)) stored by iceberg tblproperties
('format-version'='2', 'write.merge.mode'='copy-on-write');
+create table source(a int, b string, c int);
+
+insert into target_ice values (1, 'one', 50), (2, 'two', 51), (111, 'one',
55), (333, 'two', 56);
+insert into source values (1, 'one', 50), (2, 'two', 51), (3, 'three', 52),
(4, 'four', 53), (5, 'five', 54), (111, 'one', 55);
+
+-- merge
+explain
+merge into target_ice as t using source src ON t.a = src.a
+when matched and t.a > 100 THEN DELETE
+when matched then update set b = 'Merged', c = t.c + 10
+when not matched then insert values (src.a, src.b, src.c);
+
+merge into target_ice as t using source src ON t.a = src.a
+when matched and t.a > 100 THEN DELETE
+when matched then update set b = 'Merged', c = t.c + 10
+when not matched then insert values (src.a, src.b, src.c);
+
Review Comment:
It would be worth adding some tests where the merge statement does not have all of the
insert, delete, and update clauses, for example:
```
merge into target_ice as t using source src ON t.a = src.a
when matched and t.a > 100 THEN DELETE
when matched then update set b = 'Merged', c = t.c + 10;
```
```
merge into target_ice as t using source src ON t.a = src.a
when not matched then insert values (src.a, src.b, src.c);
```
WDYT?
##########
ql/src/java/org/apache/hadoop/hive/ql/parse/rewrite/CopyOnWriteUpdateRewriter.java:
##########
@@ -45,40 +45,51 @@ public class CopyOnWriteUpdateRewriter implements
Rewriter<UpdateStatement> {
private final SetClausePatcher setClausePatcher;
- public CopyOnWriteUpdateRewriter(HiveConf conf, SqlGeneratorFactory
sqlGeneratorFactory,
- COWWithClauseBuilder cowWithClauseBuilder,
SetClausePatcher setClausePatcher) {
+ public CopyOnWriteUpdateRewriter(HiveConf conf, SqlGeneratorFactory
sqlGeneratorFactory) {
this.conf = conf;
this.sqlGeneratorFactory = sqlGeneratorFactory;
- this.cowWithClauseBuilder = cowWithClauseBuilder;
- this.setClausePatcher = setClausePatcher;
+ this.cowWithClauseBuilder = new COWWithClauseBuilder();
+ this.setClausePatcher = new SetClausePatcher();
}
@Override
public ParseUtils.ReparseResult rewrite(Context context, UpdateStatement
updateBlock)
throws SemanticException {
- Tree wherePredicateNode = updateBlock.getWhereTree().getChild(0);
- String whereClause = context.getTokenRewriteStream().toString(
- wherePredicateNode.getTokenStartIndex(),
wherePredicateNode.getTokenStopIndex());
String filePathCol =
HiveUtils.unparseIdentifier(VirtualColumn.FILE_PATH.getName(), conf);
-
MultiInsertSqlGenerator sqlGenerator =
sqlGeneratorFactory.createSqlGenerator();
- cowWithClauseBuilder.appendWith(sqlGenerator, filePathCol, whereClause);
-
- sqlGenerator.append("insert into table ");
+ String whereClause = null;
+ int columnOffset = 0;
+
+ boolean shouldOverwrite = updateBlock.getWhereTree() == null;
Review Comment:
1. I think it would be better to move this to a separate Jira and patch, since
this isn't really about Merge.
2. I would like to avoid this type of flag in the implementation. Why
don't we provide another implementation of `Rewriter<UpdateStatement>`? It can
extend `CopyOnWriteUpdateRewriter` if that makes sense, or vice versa. Alternatively, the common
parts could be extracted into an abstract base class, or into a helper class used via composition
(similar to how `COWWithClauseBuilder` is used) but with a different name, such as
`CopyOnWriteUpdaterBase`.
##########
ql/src/java/org/apache/hadoop/hive/ql/parse/rewrite/CopyOnWriteMergeRewriter.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse.rewrite;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
+import org.apache.hadoop.hive.ql.parse.ParseUtils;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.rewrite.sql.COWWithClauseBuilder;
+import org.apache.hadoop.hive.ql.parse.rewrite.sql.MultiInsertSqlGenerator;
+import org.apache.hadoop.hive.ql.parse.rewrite.sql.SqlGeneratorFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.UnaryOperator;
+
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static
org.apache.hadoop.hive.ql.parse.rewrite.sql.SqlGeneratorFactory.TARGET_PREFIX;
+
+public class CopyOnWriteMergeRewriter extends MergeRewriter {
+
+ public CopyOnWriteMergeRewriter(Hive db, HiveConf conf, SqlGeneratorFactory
sqlGeneratorFactory) {
+ super(db, conf, sqlGeneratorFactory);
+ }
+
+ @Override
+ public ParseUtils.ReparseResult rewrite(Context ctx, MergeStatement
mergeStatement) throws SemanticException {
+
+ setOperation(ctx);
+ MultiInsertSqlGenerator sqlGenerator =
sqlGeneratorFactory.createSqlGenerator();
+ handleSource(mergeStatement, sqlGenerator);
+
+ sqlGenerator.append('\n');
+ sqlGenerator.append("INSERT INTO ").appendTargetTableName();
+ sqlGenerator.append('\n');
+
+ List<MergeStatement.WhenClause> whenClauses =
Lists.newArrayList(mergeStatement.getWhenClauses());
+
+ Optional<String> extraPredicate = whenClauses.stream()
+ .filter(whenClause -> !(whenClause instanceof
MergeStatement.InsertClause))
+ .map(MergeStatement.WhenClause::getExtraPredicate)
+ .map(Strings::nullToEmpty)
+ .reduce((p1, p2) -> isNotBlank(p2) ? p1 + " OR " + p2 : p2);
+
+ whenClauses.removeIf(whenClause -> whenClause instanceof
MergeStatement.DeleteClause);
+ extraPredicate.ifPresent(p -> whenClauses.add(new
MergeStatement.DeleteClause(p, null)));
+
+ MergeStatement.MergeSqlGenerator mergeSqlGenerator =
createMergeSqlGenerator(mergeStatement, sqlGenerator);
+
+ for (MergeStatement.WhenClause whenClause : whenClauses) {
+ whenClause.toSql(mergeSqlGenerator);
+ }
+
+ // TODO: handleCardinalityViolation;
+
+ ParseUtils.ReparseResult rr = ParseUtils.parseRewrittenQuery(ctx,
sqlGenerator.toString());
+ Context rewrittenCtx = rr.rewrittenCtx;
+ setOperation(rewrittenCtx);
+
+ //set dest name mapping on new context; 1st child is TOK_FROM
+ rewrittenCtx.addDestNamePrefix(1, Context.DestClausePrefix.MERGE);
+ return rr;
+ }
+
+ @Override
+ protected CopyOnWriteMergeWhenClauseSqlGenerator createMergeSqlGenerator(
+ MergeStatement mergeStatement, MultiInsertSqlGenerator sqlGenerator) {
+ return new CopyOnWriteMergeWhenClauseSqlGenerator(conf, sqlGenerator,
mergeStatement);
+ }
+
+ private void handleSource(MergeStatement mergeStatement,
MultiInsertSqlGenerator sqlGenerator) {
+ boolean hasWhenNotMatchedInsertClause =
mergeStatement.hasWhenNotMatchedInsertClause();
+
+ String sourceName = mergeStatement.getSourceName();
+ String sourceAlias = mergeStatement.getSourceAlias();
+
+ String targetAlias = mergeStatement.getTargetAlias();
+ String onClauseAsString = replaceColumnRefsWithTargetPrefix(targetAlias,
mergeStatement.getOnClauseAsText());
+
+ sqlGenerator.newCteExpr();
+
+ sqlGenerator.append(sourceName + " AS ( SELECT * FROM\n");
+ sqlGenerator.append("(SELECT ");
+ sqlGenerator.appendAcidSelectColumns(Context.Operation.MERGE);
+ sqlGenerator.appendAllColsOfTargetTable(TARGET_PREFIX);
+ sqlGenerator.append(" FROM ").appendTargetTableName().append(") ");
+ sqlGenerator.append(targetAlias);
+ sqlGenerator.append('\n');
+ sqlGenerator.indent().append(hasWhenNotMatchedInsertClause ? "FULL OUTER
JOIN" : "LEFT OUTER JOIN").append("\n");
+ sqlGenerator.indent().append(sourceAlias);
+ sqlGenerator.append('\n');
+ sqlGenerator.indent().append("ON ").append(onClauseAsString);
+ sqlGenerator.append('\n');
+ sqlGenerator.append(")");
+
+ sqlGenerator.addCteExpr();
+ }
+
+ private static String replaceColumnRefsWithTargetPrefix(String columnRef,
String strValue) {
+ return strValue.replaceAll(columnRef + "\\.(`?)", "$1" + TARGET_PREFIX);
+ }
Review Comment:
What does this method do? Could you please share an example? Does it handle
quoted identifiers too?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]