pvary commented on a change in pull request #2701:
URL: https://github.com/apache/hive/pull/2701#discussion_r724382379
##########
File path:
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
##########
@@ -120,16 +124,23 @@ public void commitTask(TaskAttemptContext
originalContext) throws IOException {
if (table != null) {
HiveIcebergRecordWriter writer = writers.get(output);
DataFile[] closedFiles;
+ List<DeleteFile> deleteFiles = Collections.emptyList();
if (writer != null) {
closedFiles = writer.dataFiles();
+ deleteFiles = writer.getDeleteFiles();
Review comment:
nit: rename this to `writer.deleteFiles()`, to match `writer.dataFiles()`
##########
File path:
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
##########
@@ -120,16 +124,23 @@ public void commitTask(TaskAttemptContext
originalContext) throws IOException {
if (table != null) {
HiveIcebergRecordWriter writer = writers.get(output);
DataFile[] closedFiles;
+ List<DeleteFile> deleteFiles = Collections.emptyList();
if (writer != null) {
closedFiles = writer.dataFiles();
+ deleteFiles = writer.getDeleteFiles();
} else {
LOG.info("CommitTask found no writer for specific table: {},
attemptID: {}", output, attemptID);
closedFiles = new DataFile[0];
}
String fileForCommitLocation =
generateFileForCommitLocation(table.location(), jobConf,
attemptID.getJobID(), attemptID.getTaskID().getId());
// Creating the file containing the data files generated by this
task for this table
- createFileForCommit(closedFiles, fileForCommitLocation,
table.io());
+ if (!deleteFiles.isEmpty()) {
Review comment:
Are these mutually exclusive?
##########
File path:
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
##########
@@ -491,11 +534,29 @@ private static void createFileForCommit(DataFile[]
closedFiles, String location,
LOG.debug("Iceberg committed file is created {}", fileForCommit);
}
+ private static void createDeleteFileForCommit(DeleteFile[] closedFiles,
String location, FileIO io)
+ throws IOException {
+
+ OutputFile fileForCommit = io.newOutputFile(location);
+ try (ObjectOutputStream oos = new
ObjectOutputStream(fileForCommit.createOrOverwrite())) {
+ oos.writeObject(closedFiles);
+ }
+ LOG.debug("Iceberg committed file is created {}", fileForCommit);
Review comment:
nit: maybe use a different log message here, so it can be told apart from the one in `createFileForCommit`
##########
File path:
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
##########
@@ -80,13 +80,13 @@ private static HiveIcebergRecordWriter writer(JobConf jc) {
FileIO io = table.io();
LocationProvider location = table.locationProvider();
EncryptionManager encryption = table.encryption();
+ boolean isDelete = jc.get(HiveIcebergStorageHandler.DELETE_KEY) != null;
Review comment:
nit: add a TODO comment to remove this once we have ORC delete support
##########
File path:
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java
##########
@@ -83,7 +102,29 @@ protected PartitionKey partition(Record row) {
@Override
public void write(Writable row) throws IOException {
- super.write(((Container<Record>) row).get());
+ if (!isDelete) {
+ super.write(((Container<Record>) row).get());
+ } else {
+ Record rec = ((Container<Record>) row).get();
+ Record actualRow = GenericRecord.create(schema);
Review comment:
This should be done outside of the loop
##########
File path:
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java
##########
@@ -83,7 +102,29 @@ protected PartitionKey partition(Record row) {
@Override
public void write(Writable row) throws IOException {
- super.write(((Container<Record>) row).get());
+ if (!isDelete) {
+ super.write(((Container<Record>) row).get());
+ } else {
+ Record rec = ((Container<Record>) row).get();
+ Record actualRow = GenericRecord.create(schema);
+ for (int i = 2; i < rec.size(); ++i) {
+ actualRow.set(i - 2, rec.get(i));
+ }
+ if (!spec.isUnpartitioned()) {
+ currentKey.partition(actualRow);
+ }
+ // for now, we always create parquet delete writer
+ PositionDeleteWriter<Record> deleteWriter =
Review comment:
Is the `BaseFileWriterFactory` not available for us yet?
##########
File path:
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java
##########
@@ -83,7 +102,29 @@ protected PartitionKey partition(Record row) {
@Override
public void write(Writable row) throws IOException {
- super.write(((Container<Record>) row).get());
+ if (!isDelete) {
+ super.write(((Container<Record>) row).get());
+ } else {
+ Record rec = ((Container<Record>) row).get();
+ Record actualRow = GenericRecord.create(schema);
+ for (int i = 2; i < rec.size(); ++i) {
+ actualRow.set(i - 2, rec.get(i));
+ }
+ if (!spec.isUnpartitioned()) {
+ currentKey.partition(actualRow);
+ }
+ // for now, we always create parquet delete writer
+ PositionDeleteWriter<Record> deleteWriter =
Review comment:
Is the `BaseFileWriterFactory` not available for us yet?
##########
File path:
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java
##########
@@ -83,7 +102,29 @@ protected PartitionKey partition(Record row) {
@Override
public void write(Writable row) throws IOException {
- super.write(((Container<Record>) row).get());
+ if (!isDelete) {
+ super.write(((Container<Record>) row).get());
+ } else {
+ Record rec = ((Container<Record>) row).get();
+ Record actualRow = GenericRecord.create(schema);
+ for (int i = 2; i < rec.size(); ++i) {
+ actualRow.set(i - 2, rec.get(i));
+ }
+ if (!spec.isUnpartitioned()) {
+ currentKey.partition(actualRow);
+ }
+ // for now, we always create parquet delete writer
+ PositionDeleteWriter<Record> deleteWriter =
Review comment:
Is the `BaseFileWriterFactory` not available for us yet?
We might want to create a `GenericFileWriterFactory`, and might want to use
`BaseFileWriterFactory.newPositionDeleteWriter` to get the writer
##########
File path:
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java
##########
@@ -83,7 +102,29 @@ protected PartitionKey partition(Record row) {
@Override
public void write(Writable row) throws IOException {
- super.write(((Container<Record>) row).get());
+ if (!isDelete) {
+ super.write(((Container<Record>) row).get());
+ } else {
+ Record rec = ((Container<Record>) row).get();
+ Record actualRow = GenericRecord.create(schema);
+ for (int i = 2; i < rec.size(); ++i) {
+ actualRow.set(i - 2, rec.get(i));
+ }
+ if (!spec.isUnpartitioned()) {
+ currentKey.partition(actualRow);
+ }
+ // for now, we always create parquet delete writer
+ PositionDeleteWriter<Record> deleteWriter =
+ appender.newPosDeleteWriter(fileFactory.newOutputFile(currentKey),
FileFormat.PARQUET, currentKey);
+ // TODO: refactor not to write 1 delete file per row (use some rolling
positional delete writer)
Review comment:
We should open the writer in the constructor
##########
File path:
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
##########
@@ -129,7 +132,14 @@ public void initialize(@Nullable Configuration
configuration, Properties serDePr
}
Schema projectedSchema;
- if (serDeProperties.get(HiveIcebergStorageHandler.WRITE_KEY) != null) {
+ if (serDeProperties.get(HiveIcebergStorageHandler.DELETE_KEY) != null) {
+ // when writing out data, we should not do projection pushdown
+ List<Types.NestedField> cols = new ArrayList<>(tableSchema.columns());
Review comment:
We should use `DeleteSchemaUtil.pathPosSchema()` for the schema
##########
File path:
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
##########
@@ -132,11 +133,13 @@ public void configureOutputJobProperties(TableDesc
tableDesc, Map<String, String
// For Tez, setting the committer here is enough to make sure it'll be
part of the jobConf
map.put("mapred.output.committer.class",
HiveIcebergNoJobCommitter.class.getName());
// For MR, the jobConf is set only in configureJobConf, so we're setting
the write key here to detect it over there
- map.put(WRITE_KEY, "true");
+ boolean isDelete = Optional.ofNullable(conf).map(c ->
c.get("hive.query.string"))
Review comment:
this is hacky 😺
##########
File path:
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
##########
@@ -214,18 +218,27 @@ public void initialize(InputSplit split,
TaskAttemptContext newContext) {
this.reuseContainers =
conf.getBoolean(InputFormatConfig.REUSE_CONTAINERS, false);
this.inMemoryDataModel =
conf.getEnum(InputFormatConfig.IN_MEMORY_DATA_MODEL,
InputFormatConfig.InMemoryDataModel.GENERIC);
- this.currentIterator = open(tasks.next(), expectedSchema).iterator();
+ // save the scanTask to retrieve the file path
+ this.currentScanTask = tasks.next();
+ this.currentIterator = open(currentScanTask, expectedSchema).iterator();
}
@Override
public boolean nextKeyValue() throws IOException {
while (true) {
if (currentIterator.hasNext()) {
current = currentIterator.next();
+ Object position = ((GenericRecord)
current).getField(MetadataColumns.ROW_POSITION.name());
+ if (position != null) {
+ // hacky way to propagate the data to
MapOperator#populateVirtualColumnValues
+ System.setProperty("delete_file_path",
currentScanTask.file().path().toString());
Review comment:
This is terrible 🗡️
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]