SourabhBadhya commented on code in PR #4672:
URL: https://github.com/apache/hive/pull/4672#discussion_r1335579622
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##########
@@ -1644,4 +1647,81 @@ public void validatePartSpec(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
}
}
}
+
+  /**
+   * A function to decide whether a given truncate query can perform a metadata delete or not.
+   * If it is not possible to perform a metadata delete, then try to perform a positional delete.
+   * The steps to decide whether truncate is possible are as follows - <br>
+   * a. Create an expression based on the partition spec columns and partition spec values. <br>
+   * b. Find files which match the expression using Apache Iceberg's FindFiles API. <br>
+   * c. Do strict evaluation on whether the expression can clearly match all rows in the file. <br>
+   * If the strict evaluation returns true for all files, it means that we can safely delete all files
+   * by performing a metadata delete operation. If not, we must convert the truncate to a delete query
+   * which eventually performs a positional delete.
+   * @param hmsTable A Hive table instance.
+   * @param partitionSpec Map containing the partition specification given by the user.
+   * @return true if we can perform a metadata delete, otherwise false.
+   * @throws SemanticException Exception raised when a partition transform is being used
+   *         or when a partition column is not present in the table.
+   */
+  @Override
+  public boolean shouldTruncate(org.apache.hadoop.hive.ql.metadata.Table hmsTable, Map<String, String> partitionSpec)
+      throws SemanticException {
+    Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
+    if (MapUtils.isEmpty(partitionSpec) || !isPartitionEvolution(table)) {
+      return true;
+    }
+
+    Map<String, PartitionField> partitionFieldMap = Maps.newHashMapWithExpectedSize(table.spec().fields().size());
+    table.spec().fields().forEach(partField -> partitionFieldMap.put(partField.name(), partField));
+    Expression finalExp = Expressions.alwaysTrue();
+    for (Map.Entry<String, String> entry : partitionSpec.entrySet()) {
+      String partColName = entry.getKey();
+      if (partitionFieldMap.containsKey(partColName)) {
+        PartitionField partitionField = partitionFieldMap.get(partColName);
+        Type resultType = partitionField.transform().getResultType(table.schema()
+            .findField(partitionField.sourceId()).type());
+        TransformSpec.TransformType transformType = IcebergTableUtil.getTransformType(partitionField.transform());
+        Object value = Conversions.fromPartitionString(resultType, entry.getValue());
+        Iterable iterable = () -> Collections.singletonList(value).iterator();
+        if (transformType.equals(TransformSpec.TransformType.IDENTITY)) {
+          Expression boundPredicate = Expressions.in(partitionField.name(), iterable);
+          finalExp = Expressions.and(finalExp, boundPredicate);
+        } else {
+          throw new SemanticException(
+              String.format("Partition transforms are not supported via truncate operation: %s", partColName));
+        }
+      } else {
+        throw new SemanticException(String.format("No partition column/transform by the name: %s", partColName));
+      }
+    }
+    FindFiles.Builder builder = new FindFiles.Builder(table).withRecordsMatching(finalExp).includeColumnStats();
+    Set<DataFile> dataFiles = Sets.newHashSet(Iterables.transform(builder.collect(), file -> file));
+    boolean result = true;
+    for (DataFile dataFile : dataFiles) {
+      PartitionData partitionData = (PartitionData) dataFile.partition();
+      Expression residual = ResidualEvaluator.of(table.spec(), finalExp, false)
+          .residualFor(partitionData);
+      StrictMetricsEvaluator strictMetricsEvaluator = new StrictMetricsEvaluator(table.schema(), residual);
Review Comment:
Not required, hence removed.
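
For reference, here is a minimal, self-contained sketch of the decision flow the Javadoc above describes (build a partition filter, find matching files, then strictly evaluate each file's residual). The class and method names are illustrative only, not part of this PR, and the snippet assumes identity-transform partition values are already resolved to Java objects:

```java
import java.util.Map;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FindFiles;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.expressions.StrictMetricsEvaluator;

public class MetadataDeleteCheck {

  // Illustrative helper: returns true only if every file matched by the
  // partition filter can be proven, via its column metrics, to contain
  // only matching rows, i.e. a metadata delete is safe.
  static boolean canMetadataDelete(Table table, Map<String, Object> identityPartitionValues) {
    // Step a: build a conjunction of equality predicates over identity
    // partition columns (non-identity transforms are rejected upstream).
    Expression filter = Expressions.alwaysTrue();
    for (Map.Entry<String, Object> e : identityPartitionValues.entrySet()) {
      filter = Expressions.and(filter, Expressions.equal(e.getKey(), e.getValue()));
    }

    // Step b: locate the data files that the expression may touch.
    Iterable<DataFile> files = new FindFiles.Builder(table)
        .withRecordsMatching(filter)
        .includeColumnStats()
        .collect();

    // Step c: strict evaluation - the residual of the filter for each
    // file's partition must provably match every row in the file.
    ResidualEvaluator residuals = ResidualEvaluator.of(table.spec(), filter, /* caseSensitive = */ false);
    for (DataFile file : files) {
      Expression residual = residuals.residualFor(file.partition());
      if (!new StrictMetricsEvaluator(table.schema(), residual).eval(file)) {
        return false; // at least one file may contain non-matching rows
      }
    }
    return true;
  }
}
```

StrictMetricsEvaluator returns true only when the file's metrics prove that every row matches the expression, which is exactly the condition under which dropping the whole file is equivalent to a row-level delete.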
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##########
@@ -1644,4 +1647,81 @@ public void validatePartSpec(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
}
}
}
+
+  /**
+   * A function to decide whether a given truncate query can perform a metadata delete or not.
+   * If it is not possible to perform a metadata delete, then try to perform a positional delete.
+   * The steps to decide whether truncate is possible are as follows - <br>
+   * a. Create an expression based on the partition spec columns and partition spec values. <br>
+   * b. Find files which match the expression using Apache Iceberg's FindFiles API. <br>
+   * c. Do strict evaluation on whether the expression can clearly match all rows in the file. <br>
+   * If the strict evaluation returns true for all files, it means that we can safely delete all files
+   * by performing a metadata delete operation. If not, we must convert the truncate to a delete query
+   * which eventually performs a positional delete.
+   * @param hmsTable A Hive table instance.
+   * @param partitionSpec Map containing the partition specification given by the user.
+   * @return true if we can perform a metadata delete, otherwise false.
+   * @throws SemanticException Exception raised when a partition transform is being used
+   *         or when a partition column is not present in the table.
+   */
+  @Override
+  public boolean shouldTruncate(org.apache.hadoop.hive.ql.metadata.Table hmsTable, Map<String, String> partitionSpec)
+      throws SemanticException {
+    Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
+    if (MapUtils.isEmpty(partitionSpec) || !isPartitionEvolution(table)) {
+      return true;
+    }
+
+    Map<String, PartitionField> partitionFieldMap = Maps.newHashMapWithExpectedSize(table.spec().fields().size());
+    table.spec().fields().forEach(partField -> partitionFieldMap.put(partField.name(), partField));
+    Expression finalExp = Expressions.alwaysTrue();
+    for (Map.Entry<String, String> entry : partitionSpec.entrySet()) {
+      String partColName = entry.getKey();
+      if (partitionFieldMap.containsKey(partColName)) {
+        PartitionField partitionField = partitionFieldMap.get(partColName);
+        Type resultType = partitionField.transform().getResultType(table.schema()
+            .findField(partitionField.sourceId()).type());
+        TransformSpec.TransformType transformType = IcebergTableUtil.getTransformType(partitionField.transform());
+        Object value = Conversions.fromPartitionString(resultType, entry.getValue());
+        Iterable iterable = () -> Collections.singletonList(value).iterator();
+        if (transformType.equals(TransformSpec.TransformType.IDENTITY)) {
+          Expression boundPredicate = Expressions.in(partitionField.name(), iterable);
+          finalExp = Expressions.and(finalExp, boundPredicate);
+        } else {
+          throw new SemanticException(
+              String.format("Partition transforms are not supported via truncate operation: %s", partColName));
+        }
+      } else {
+        throw new SemanticException(String.format("No partition column/transform by the name: %s", partColName));
+      }
+    }
+    FindFiles.Builder builder = new FindFiles.Builder(table).withRecordsMatching(finalExp).includeColumnStats();
+    Set<DataFile> dataFiles = Sets.newHashSet(Iterables.transform(builder.collect(), file -> file));
+    boolean result = true;
+    for (DataFile dataFile : dataFiles) {
+      PartitionData partitionData = (PartitionData) dataFile.partition();
+      Expression residual = ResidualEvaluator.of(table.spec(), finalExp, false)
+          .residualFor(partitionData);
+      StrictMetricsEvaluator strictMetricsEvaluator = new StrictMetricsEvaluator(table.schema(), residual);
+      if (!strictMetricsEvaluator.eval(dataFile)) {
+        result = false;
+      }
+    }
+
+    boolean isV2Table = hmsTable.getParameters() != null &&
Review Comment:
Added a function.
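
The quoted diff cuts off at the isV2Table condition, so the helper below is only a guess at the shape of the extracted function; the format-version property check is an assumption based on Iceberg's TableProperties constants, not the PR's actual code:

```java
import java.util.Map;
import org.apache.iceberg.TableProperties;

// Hypothetical extracted helper; the real condition is truncated in the diff above.
static boolean isV2Table(Map<String, String> parameters) {
  // TableProperties.FORMAT_VERSION is "format-version"; v2 tables support row-level deletes.
  return parameters != null && "2".equals(parameters.get(TableProperties.FORMAT_VERSION));
}
```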
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]