kasakrisz commented on code in PR #5251:
URL: https://github.com/apache/hive/pull/5251#discussion_r1609663752
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java:
##########
@@ -75,8 +76,13 @@ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
   @Override
   public RecordReader<Void, Container<T>> getRecordReader(InputSplit split,
                                                           JobConf job,
                                                           Reporter reporter) throws IOException {
-    IcebergSplit icebergSplit = ((IcebergSplitContainer) split).icebergSplit();
-    return new MapredIcebergRecordReader<>(innerInputFormat, icebergSplit, job, reporter);
+    if (split instanceof IcebergMergeSplit) {
+      IcebergMergeSplit mergeSplit = (IcebergMergeSplit) split;
+      return new MapredIcebergRecordReader<>(innerInputFormat, mergeSplit, job, reporter);
Review Comment:
I haven't found any difference between creating a `MapredIcebergRecordReader` for an `IcebergMergeSplit` versus an `IcebergSplit`.
How about
```
return new MapredIcebergRecordReader<>(innerInputFormat, (InputSplit) split, job, reporter);
```
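For illustration, the whole method could then collapse to a single code path, keeping the existing container unwrapping for regular splits (untested sketch; assumes both split classes extend `org.apache.hadoop.mapreduce.InputSplit`, which their shared ancestry suggests):
```
@Override
public RecordReader<Void, Container<T>> getRecordReader(InputSplit split,
                                                        JobConf job,
                                                        Reporter reporter) throws IOException {
  // a merge split is passed through as-is, a regular split is unwrapped from its container
  org.apache.hadoop.mapreduce.InputSplit actualSplit = split instanceof IcebergMergeSplit
      ? (IcebergMergeSplit) split
      : ((IcebergSplitContainer) split).icebergSplit();
  return new MapredIcebergRecordReader<>(innerInputFormat, actualSplit, job, reporter);
}
```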
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java:
##########
@@ -312,33 +314,43 @@ private static final class IcebergRecordReader<T> extends RecordReader<Void, T>
     private CloseableIterator<T> currentIterator;
     private Table table;
     private boolean fetchVirtualColumns;
+    private boolean isMerge = false;
+    private IcebergMergeSplit mergeSplit;

     @Override
     public void initialize(InputSplit split, TaskAttemptContext newContext) {
       // For now IcebergInputFormat does its own split planning and does not accept FileSplit instances
-      CombinedScanTask task = ((IcebergSplit) split).task();
       this.context = newContext;
       this.conf = newContext.getConfiguration();
-      this.table = SerializationUtil.deserializeFromBase64(
-          conf.get(InputFormatConfig.SERIALIZED_TABLE_PREFIX + conf.get(InputFormatConfig.TABLE_IDENTIFIER)));
+      this.table = HiveIcebergStorageHandler.table(conf, conf.get(InputFormatConfig.TABLE_IDENTIFIER));
       HiveIcebergStorageHandler.checkAndSetIoConfig(conf, table);
-      this.tasks = task.files().iterator();
       this.nameMapping = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
       this.caseSensitive = conf.getBoolean(InputFormatConfig.CASE_SENSITIVE, InputFormatConfig.CASE_SENSITIVE_DEFAULT);
       this.expectedSchema = readSchema(conf, table, caseSensitive);
       this.reuseContainers = conf.getBoolean(InputFormatConfig.REUSE_CONTAINERS, false);
       this.inMemoryDataModel = conf.getEnum(InputFormatConfig.IN_MEMORY_DATA_MODEL, InputFormatConfig.InMemoryDataModel.GENERIC);
       this.fetchVirtualColumns = InputFormatConfig.fetchVirtualColumns(conf);
+      if (split instanceof IcebergMergeSplit) {
+        this.isMerge = true;
+        this.mergeSplit = (IcebergMergeSplit) split;
+      } else {
+        CombinedScanTask task = ((IcebergSplit) split).task();
+        this.tasks = task.files().iterator();
+      }
       this.currentIterator = nextTask();
     }

     private CloseableIterator<T> nextTask() {
-      CloseableIterator<T> closeableIterator = open(tasks.next(), expectedSchema).iterator();
-      if (!fetchVirtualColumns || Utilities.getIsVectorized(conf)) {
-        return closeableIterator;
+      if (isMerge) {
+        return open(mergeSplit.getContentFile(), table.schema()).iterator();
Review Comment:
Please move this into a new class, e.g. `MergeIcebergRecordReader`.
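In that class the merge branch would simply become the body of `nextTask()`, roughly like this (untested sketch; assumes `nextTask()`, `open(...)` and `table` are made accessible to the subclass):
```
@Override
CloseableIterator<T> nextTask() {
  // a merge split carries a single content file rather than a CombinedScanTask,
  // and is read with the full table schema
  return open(mergeSplit.getContentFile(), table.schema()).iterator();
}
```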
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java:
##########
@@ -312,33 +314,43 @@ private static final class IcebergRecordReader<T> extends RecordReader<Void, T>
     private CloseableIterator<T> currentIterator;
     private Table table;
     private boolean fetchVirtualColumns;
+    private boolean isMerge = false;
+    private IcebergMergeSplit mergeSplit;

     @Override
     public void initialize(InputSplit split, TaskAttemptContext newContext) {
       // For now IcebergInputFormat does its own split planning and does not accept FileSplit instances
-      CombinedScanTask task = ((IcebergSplit) split).task();
       this.context = newContext;
       this.conf = newContext.getConfiguration();
-      this.table = SerializationUtil.deserializeFromBase64(
-          conf.get(InputFormatConfig.SERIALIZED_TABLE_PREFIX + conf.get(InputFormatConfig.TABLE_IDENTIFIER)));
+      this.table = HiveIcebergStorageHandler.table(conf, conf.get(InputFormatConfig.TABLE_IDENTIFIER));
       HiveIcebergStorageHandler.checkAndSetIoConfig(conf, table);
-      this.tasks = task.files().iterator();
       this.nameMapping = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
       this.caseSensitive = conf.getBoolean(InputFormatConfig.CASE_SENSITIVE, InputFormatConfig.CASE_SENSITIVE_DEFAULT);
       this.expectedSchema = readSchema(conf, table, caseSensitive);
       this.reuseContainers = conf.getBoolean(InputFormatConfig.REUSE_CONTAINERS, false);
       this.inMemoryDataModel = conf.getEnum(InputFormatConfig.IN_MEMORY_DATA_MODEL, InputFormatConfig.InMemoryDataModel.GENERIC);
       this.fetchVirtualColumns = InputFormatConfig.fetchVirtualColumns(conf);
+      if (split instanceof IcebergMergeSplit) {
+        this.isMerge = true;
+        this.mergeSplit = (IcebergMergeSplit) split;
+      } else {
+        CombinedScanTask task = ((IcebergSplit) split).task();
+        this.tasks = task.files().iterator();
+      }
Review Comment:
Please move this check into
https://github.com/apache/hive/blob/18c434f346dc590201afa4159aeec62b7dd5e2cf/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java#L277-L279
and create the proper `RecordReader` instance based on the decision (see the sketch below):
* `IcebergMergeRecordReader`
* or the original `IcebergRecordReader`
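Something like this, assuming the linked lines are the `createRecordReader` method (untested sketch):
```
@Override
public RecordReader<Void, T> createRecordReader(InputSplit split, TaskAttemptContext context) {
  // dispatch on the split type instead of branching inside the reader
  return split instanceof IcebergMergeSplit
      ? new IcebergMergeRecordReader<>()
      : new IcebergRecordReader<>();
}
```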
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java:
##########
@@ -312,33 +314,43 @@ private static final class IcebergRecordReader<T> extends RecordReader<Void, T>
     private CloseableIterator<T> currentIterator;
     private Table table;
     private boolean fetchVirtualColumns;
+    private boolean isMerge = false;
+    private IcebergMergeSplit mergeSplit;
Review Comment:
Please create a new class like `IcebergMergeRecordReader` and move everything merge-related into that class.
You can extend the existing `IcebergRecordReader` to reuse its code, and override methods where necessary.
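A rough skeleton of what I mean (untested; assumes `final` is removed from `IcebergRecordReader` and the `IcebergSplit`-specific parts of `initialize` are factored out of the base class):
```
private static class IcebergMergeRecordReader<T> extends IcebergRecordReader<T> {
  private IcebergMergeSplit mergeSplit;

  @Override
  public void initialize(InputSplit split, TaskAttemptContext newContext) {
    // only the merge-specific state lives here; shared setup stays in the base class
    this.mergeSplit = (IcebergMergeSplit) split;
    super.initialize(split, newContext);
  }

  @Override
  CloseableIterator<T> nextTask() {
    return open(mergeSplit.getContentFile(), table.schema()).iterator();
  }
}
```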
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java:
##########
@@ -90,6 +96,12 @@ private static final class MapredIcebergRecordReader<T> extends AbstractMapredIc
       splitLength = split.getLength();
     }
+
+    MapredIcebergRecordReader(org.apache.iceberg.mr.mapreduce.IcebergInputFormat<T> mapreduceInputFormat,
+                              IcebergMergeSplit split, JobConf job, Reporter reporter) throws IOException {
+      super(mapreduceInputFormat, split, job, reporter);
+      splitLength = split.getLength();
+    }
+
Review Comment:
Why is this constructor necessary? It does exactly the same as the existing one:
https://github.com/apache/hive/blob/18c434f346dc590201afa4159aeec62b7dd5e2cf/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java#L87-L91
`IcebergMergeSplit` and `IcebergSplit` have the same ancestor (`InputSplit`).
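A single constructor taking the common ancestor would cover both (sketch only; note that the generic `org.apache.hadoop.mapreduce.InputSplit.getLength()` also declares `InterruptedException`, unlike the concrete split classes, so it has to be handled):
```
MapredIcebergRecordReader(org.apache.iceberg.mr.mapreduce.IcebergInputFormat<T> mapreduceInputFormat,
                          InputSplit split, JobConf job, Reporter reporter) throws IOException {
  super(mapreduceInputFormat, split, job, reporter);
  try {
    splitLength = split.getLength();
  } catch (InterruptedException e) {
    // the abstract InputSplit.getLength() declares InterruptedException
    Thread.currentThread().interrupt();
    throw new IOException(e);
  }
}
```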
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.