SourabhBadhya commented on code in PR #5961:
URL: https://github.com/apache/hive/pull/5961#discussion_r2217639549
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##########

@@ -1641,6 +1645,12 @@ static void overlayTableProperties(Configuration configuration, TableDesc tableD
     // serialize table object into config
     Table serializableTable = SerializableTable.copyOf(table);

+    // set table format-version and write-mode information from tableDesc
+    List<String> writeConfigList = ImmutableList.of(
+        FORMAT_VERSION, DELETE_MODE, UPDATE_MODE, MERGE_MODE);
+    if (IcebergTableUtil.isV2Table(props::getProperty)) {

Review Comment:
   Is it necessary to create this function? Can't we simply pass a map, for which a util function is already defined, instead of passing a BinaryOperator? Even if we want to use a BinaryOperator, can we remove the isV2Table() that takes a map? Or is this a publicly defined function that can't be removed?

##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java:
##########

@@ -392,27 +396,47 @@ public static void cherryPick(Table table, long snapshotId) {
   }

   public static boolean isV2Table(Map<String, String> props) {

Review Comment:
   Maybe you can rename the function as `isAtleastV2Table()` or something along these lines.

##########
iceberg/iceberg-handler/src/test/queries/positive/iceberg_v3_deletion_vectors.q:
##########

@@ -0,0 +1,75 @@
+-- Mask random uuid
+--! qt:replace:/(\s+'uuid'=')\S+('\s*)/$1#Masked#$2/
+-- Mask a random snapshot id
+--! qt:replace:/(\s\'current-snapshot-id\'=\')(\d+)(\')/$1#Masked#$3/
+-- Mask added file size
+--! qt:replace:/(\S+\"added-files-size\":\")(\d+)(\")/$1#Masked#$3/
+-- Mask total file size
+--! qt:replace:/(\S+\"total-files-size\":\")(\d+)(\")/$1#Masked#$3/
+-- Mask current-snapshot-timestamp-ms
+--! qt:replace:/(\s\'current-snapshot-timestamp-ms\'=\')(\d+)(\')/$1#Masked#$3/
+-- Mask iceberg version
+--! qt:replace:/("iceberg-version":")(\w+\s\w+\s\d+\.\d+\.\d+\s\(\w+\s\w+\))/$1#Masked#/
+
+-- create an unpartitioned table
+create table ice01 (id int) Stored by Iceberg stored as ORC
+  TBLPROPERTIES('format-version'='3');
+
+-- check the property value
+show create table ice01;
+
+-- insert some values
+insert into ice01 values (1),(2),(3),(4);
+
+-- check the inserted values
+select * from ice01;
+
+-- delete some values
+delete from ice01 where id>2;
+
+-- check the values, the deleted values shouldn't be there
+select * from ice01 order by id;
+
+-- insert some more data
+insert into ice01 values (5),(6),(7),(8);
+
+-- check the values, only the deleted values shouldn't be there
+select * from ice01 order by id;
+
+-- delete one value
+delete from ice01 where id=7;
+
+-- check the entries, the deleted entries shouldn't be there.
+select * from ice01 order by id;

Review Comment:
   Can we also display the content offset for each of these delete tests using the position delete metadata tables, so that we are sure that deletion vectors are actually being written?
   Reference on deletion vectors metadata columns: https://github.com/apache/iceberg/commit/026a9b00459ae458eb4c6c620c78742dc43a00e8
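   For instance, a minimal sketch of such a check (hypothetical, not part of the PR): it assumes Hive exposes Iceberg's POSITION_DELETES metadata table through the usual <db>.<table>.<metadata-table> naming, and takes the content_offset / content_size_in_bytes column names from the Iceberg commit referenced above, so the exact columns and syntax would need to be verified against the PR.

   -- hypothetical qtest addition: surface DV metadata after each delete
   select file_path, pos, content_offset, content_size_in_bytes
   from default.ice01.position_deletes
   order by file_path, pos;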
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/WriterBuilder.java:
##########

@@ -120,27 +132,69 @@ public HiveIcebergWriter build() {
     HiveIcebergWriter writer;
     boolean isCOW = IcebergTableUtil.isCopyOnWriteMode(operation, table.properties()::getOrDefault);
+
     if (isCOW) {
       writer = new HiveIcebergCopyOnWriteRecordWriter(table, writerFactory, dataFileFactory, context);
     } else {
-      switch (operation) {
-        case DELETE:
-          writer = new HiveIcebergDeleteWriter(table, writerFactory, deleteFileFactory, context);
-          break;
-        case OTHER:
-          writer = new HiveIcebergRecordWriter(table, writerFactory, dataFileFactory, context);
-          break;
-        default:
-          // Update and Merge should be splitted to inserts and deletes
-          throw new IllegalArgumentException("Unsupported operation when creating IcebergRecordWriter: " +
-              operation.name());
-      }
+      writer = switch (operation) {
+        case DELETE ->
+            new HiveIcebergDeleteWriter(table, rewritableDeletes, writerFactory, deleteFileFactory, context);
+        case OTHER ->
+            new HiveIcebergRecordWriter(table, writerFactory, dataFileFactory, context);
+        default ->
+            // Update and Merge should be splitted to inserts and deletes
+            throw new IllegalArgumentException("Unsupported operation when creating IcebergRecordWriter: " +
+                operation.name());
+      };
     }

     WriterRegistry.registerWriter(attemptID, tableName, writer);
     return writer;
   }

+  private Map<String, DeleteFileSet> rewritableDeletes() {
+    TableScan scan = table.newScan().caseSensitive(false).ignoreResiduals();

Review Comment:
   Is it right that we are ignoring residual filters? Is the idea here just to use all the rows applicable for deletion?

##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java:
##########

@@ -392,27 +396,47 @@ public static void cherryPick(Table table, long snapshotId) {
   }

   public static boolean isV2Table(Map<String, String> props) {
-    return props == null ||
-        "2".equals(props.get(TableProperties.FORMAT_VERSION)) || props.get(TableProperties.FORMAT_VERSION) == null;
+    return IcebergTableUtil.formatVersion(props) >= 2;
   }

-  public static boolean isCopyOnWriteMode(Context.Operation operation, BinaryOperator<String> props) {
-    String mode = null;
-    switch (operation) {
-      case DELETE:
-        mode = props.apply(TableProperties.DELETE_MODE,
-            TableProperties.DELETE_MODE_DEFAULT);
-        break;
-      case UPDATE:
-        mode = props.apply(TableProperties.UPDATE_MODE,
-            TableProperties.UPDATE_MODE_DEFAULT);
-        break;
-      case MERGE:
-        mode = props.apply(TableProperties.MERGE_MODE,
-            TableProperties.MERGE_MODE_DEFAULT);
-        break;
+  public static boolean isV2Table(BinaryOperator<String> props) {
+    return IcebergTableUtil.formatVersion(props) >= 2;
+  }
+
+  public static Integer formatVersion(Map<String, String> props) {
+    if (props == null) {
+      return 2; // default to V2

Review Comment:
   nit: Maybe you can add a TODO stating that we have to move the default to 3 whenever applicable?
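   As illustration of both the Map-only suggestion and this TODO nit, a minimal self-contained sketch (the class name and inlined constant are invented so the sketch compiles on its own; the real code would use TableProperties.FORMAT_VERSION and live in IcebergTableUtil):

   import java.util.Map;

   public final class FormatVersionSketch {
     // Stand-in for TableProperties.FORMAT_VERSION, inlined to keep the sketch self-contained.
     private static final String FORMAT_VERSION = "format-version";

     private FormatVersionSketch() {
     }

     public static int formatVersion(Map<String, String> props) {
       if (props == null) {
         // TODO: move the default to 3 once V3 becomes the default format version.
         return 2;
       }
       return Integer.parseInt(props.getOrDefault(FORMAT_VERSION, "2"));
     }

     // Reviewer-suggested rename of isV2Table(); the semantics stay ">= 2".
     public static boolean isAtleastV2Table(Map<String, String> props) {
       return formatVersion(props) >= 2;
     }
   }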
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java:
##########

@@ -392,27 +396,47 @@ public static void cherryPick(Table table, long snapshotId) {
(continuation of the IcebergTableUtil hunk shown above)
+  public static Integer formatVersion(Map<String, String> props) {
+    if (props == null) {
+      return 2; // default to V2
     }
-    return RowLevelOperationMode.COPY_ON_WRITE.modeName().equalsIgnoreCase(mode);
+    return IcebergTableUtil.formatVersion(props::getOrDefault);
+  }
+
+  private static Integer formatVersion(BinaryOperator<String> props) {

Review Comment:
   Same argument here: should we really create another function that takes a BinaryOperator?

##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergWriterBase.java:
##########

@@ -108,19 +115,53 @@ static PartitioningWriter<Record, DataWriteResult> newDataWriter(
   // use a fanout writer if the input is unordered no matter whether fanout writers are enabled
   // clustered writers assume that the position deletes are already ordered by file and position
   static PartitioningWriter<PositionDelete<Record>, DeleteWriteResult> newDeleteWriter(
-      Table table, HiveFileWriterFactory writers, OutputFileFactory files, Context context) {
+      Table table, Map<String, DeleteFileSet> rewritableDeletes, HiveFileWriterFactory writers,
+      OutputFileFactory files, Context context) {
+    Function<CharSequence, PositionDeleteIndex> previousDeleteLoader =
+        PreviousDeleteLoader.create(table, rewritableDeletes);
     FileIO io = table.io();
     boolean inputOrdered = context.inputOrdered();
     long targetFileSize = context.targetDeleteFileSize();
     DeleteGranularity deleteGranularity = context.deleteGranularity();

-    if (inputOrdered) {
+    if (context.useDVs()) {
+      return new PartitioningDVWriter<>(files, previousDeleteLoader);
+    } else if (inputOrdered && rewritableDeletes == null) {

Review Comment:
   Is it because ClusteredDeleteWriter doesn't support passing deletion vectors?
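   To make the branch being questioned easier to follow, here is a toy model of the selection order the diff appears to implement. The comments reflect one reading of the diff (the ordering assumption is quoted from the code comment above); they are not a confirmed answer to the question:

   import java.util.Map;

   public final class DeleteWriterSelectionSketch {
     // Toy stand-ins for the three writer shapes in the diff.
     enum WriterKind { PARTITIONING_DV, CLUSTERED_POSITION_DELETE, FANOUT_POSITION_DELETE }

     static WriterKind select(boolean useDVs, boolean inputOrdered, Map<String, ?> rewritableDeletes) {
       if (useDVs) {
         // V3 deletion-vector path: DVs keep a per-file position index, so input order doesn't matter.
         return WriterKind.PARTITIONING_DV;
       }
       if (inputOrdered && rewritableDeletes == null) {
         // Clustered writers assume position deletes arrive already ordered by file and position,
         // and (assumed reading of the diff) have no hook for merging previously written deletes,
         // hence the extra rewritableDeletes == null guard.
         return WriterKind.CLUSTERED_POSITION_DELETE;
       }
       return WriterKind.FANOUT_POSITION_DELETE;
     }

     public static void main(String[] args) {
       System.out.println(select(true, false, null));    // PARTITIONING_DV
       System.out.println(select(false, true, null));    // CLUSTERED_POSITION_DELETE
       System.out.println(select(false, true, Map.of())); // FANOUT_POSITION_DELETE
     }
   }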