aokolnychyi commented on code in PR #8803:
URL: https://github.com/apache/iceberg/pull/8803#discussion_r1391932246
##########
core/src/main/java/org/apache/iceberg/BaseDistributedDataScan.java:
##########
@@ -368,7 +369,9 @@ private CloseableIterable<ScanTask> toFileTasks(
ScanMetricsUtil.fileTask(scanMetrics(), dataFile, deleteFiles);
return new BaseFileScanTask(
- copyDataFiles ? dataFile.copy(shouldReturnColumnStats()) : dataFile,
+ copyDataFiles
Review Comment:
Optional: I am not a big fan of using ternary operators when they are split
across multiple lines. To me, this makes the logic harder to follow. I'd
consider adding a method like below to `BaseScan`.
```
protected <F extends ContentFile<F>> F copy(F file) {
  return ContentFileUtil.copy(file, shouldReturnColumnStats(), columnsToKeepStats());
}
```
This will make the invocation shorter and we may reuse this method in other
scans in the future.
##########
core/src/main/java/org/apache/iceberg/DataScan.java:
##########
@@ -55,7 +55,8 @@ protected ManifestGroup newManifestGroup(
.filterData(filter())
.specsById(table().specs())
.scanMetrics(scanMetrics())
- .ignoreDeleted();
+ .ignoreDeleted()
+ .columnsToKeepStats(context().columnsToKeepStats());
Review Comment:
Here as well.
##########
.palantir/revapi.yml:
##########
@@ -866,6 +866,11 @@ acceptedBreaks:
old: "method void org.apache.iceberg.encryption.Ciphers::<init>()"
new: "method void org.apache.iceberg.encryption.Ciphers::<init>()"
justification: "Static utility class - should not have public constructor"
+ "1.4.0":
+ org.apache.iceberg:iceberg-core:
+ - code: "java.field.serialVersionUIDChanged"
Review Comment:
While I think it should be fine, here is an idea. Java ships with the
`serialver` utility, which lets us generate the default version UID of the class
as it was before the change in this PR. We can use that value instead of `1L`
to be fully compatible. We don't modify the serialization of this class; we just
forgot to assign `serialVersionUID`. If we can recover the default value, we
shouldn't have to worry about compatibility.
Here is the value I got locally:
```
cd core/build/classes/java/main
serialver org.apache.iceberg.util.SerializableMap
org.apache.iceberg.util.SerializableMap: private static final long serialVersionUID = -3377238354349859240L;
```
Could you double check, @pvary? If not, we can keep it as is.
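To cross-check the `serialver` output from inside the JVM, the UID can also be read through `java.io.ObjectStreamClass`. This is a minimal sketch, not part of the PR: `LegacyMap` and `PinnedMap` are hypothetical stand-ins for a class without and with a declared `serialVersionUID`, and `uidOf` is an illustrative helper name.

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

class SerialVersionCheck {
  // Hypothetical stand-in for a class that never declared serialVersionUID.
  static class LegacyMap implements Serializable {
    private int size;
  }

  // Hypothetical stand-in for a class that pins the UID explicitly.
  static class PinnedMap implements Serializable {
    private static final long serialVersionUID = 42L;
    private int size;
  }

  // Returns the UID the serialization runtime uses for the class: the declared
  // value if present, otherwise the default computed from the class structure.
  static long uidOf(Class<? extends Serializable> cls) {
    return ObjectStreamClass.lookup(cls).getSerialVersionUID();
  }

  public static void main(String[] args) {
    System.out.println("LegacyMap default UID: " + uidOf(LegacyMap.class));
    System.out.println("PinnedMap declared UID: " + uidOf(PinnedMap.class));
  }
}
```

Pointing the same lookup at `org.apache.iceberg.util.SerializableMap` from a build that predates this PR should, as far as I know, report the same default value that `serialver` prints, which could then be assigned explicitly in place of `1L`.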
##########
core/src/main/java/org/apache/iceberg/BaseIncrementalAppendScan.java:
##########
@@ -83,7 +83,8 @@ private CloseableIterable<FileScanTask> appendFilesFromSnapshots(List<Snapshot>
snapshotIds.contains(manifestEntry.snapshotId())
&& manifestEntry.status() == ManifestEntry.Status.ADDED)
.specsById(table().specs())
- .ignoreDeleted();
+ .ignoreDeleted()
+ .columnsToKeepStats(context().columnsToKeepStats());
Review Comment:
I think you have already defined a `columnsToKeepStats()` method in `BaseScan`,
which you can call directly here instead of going through `context()`.
##########
core/src/main/java/org/apache/iceberg/GenericDataFile.java:
##########
@@ -66,23 +67,31 @@ class GenericDataFile extends BaseFile<DataFile> implements DataFile {
* Copy constructor.
*
* @param toCopy a generic data file to copy.
- * @param fullCopy whether to copy all fields or to drop column-level stats
+ * @param copyStats whether to copy all fields or to drop column-level stats.
+ * @param requestedColumnIds column ids for which to keep stats. If <code>null</code> then every
+ * column stat is kept.
*/
- private GenericDataFile(GenericDataFile toCopy, boolean fullCopy) {
- super(toCopy, fullCopy);
+ private GenericDataFile(
+ GenericDataFile toCopy, boolean copyStats, Set<Integer> requestedColumnIds) {
+ super(toCopy, copyStats, requestedColumnIds);
}
/** Constructor for Java serialization. */
GenericDataFile() {}
@Override
public DataFile copyWithoutStats() {
- return new GenericDataFile(this, false /* drop stats */);
+ return new GenericDataFile(this, false /* drop stats */, null);
+ }
+
+ @Override
+ public DataFile copyWithStats(Set<Integer> requestedColumnIds) {
+ return new GenericDataFile(this, true, requestedColumnIds);
}
@Override
public DataFile copy() {
- return new GenericDataFile(this, true /* full copy */);
+ return new GenericDataFile(this, true /* full copy */, null);
Review Comment:
You may consider overloading the constructor so that you don't have to pass
an extra `null` here, or adding a comment for the `null` argument (we have a
comment for `true` but not for `null`).
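A minimal sketch of the overloading option, assuming a simplified class: `FileSketch` is a hypothetical stand-in for `GenericDataFile`, and its fields exist only to make the constructor shapes visible. It is not the code in this PR.

```java
import java.util.Set;

// Hypothetical stand-in for GenericDataFile; only the constructor shape matters here.
class FileSketch {
  final String path;
  final boolean statsCopied;
  final Set<Integer> requestedColumnIds; // null means every column stat is kept

  FileSketch(String path) {
    this.path = path;
    this.statsCopied = true;
    this.requestedColumnIds = null;
  }

  // Two-argument overload: callers that do not filter columns never pass null.
  private FileSketch(FileSketch toCopy, boolean copyStats) {
    this(toCopy, copyStats, null /* keep stats for all columns */);
  }

  private FileSketch(FileSketch toCopy, boolean copyStats, Set<Integer> requestedColumnIds) {
    this.path = toCopy.path;
    this.statsCopied = copyStats;
    // The column filter is only meaningful when stats are copied at all.
    this.requestedColumnIds = copyStats ? requestedColumnIds : null;
  }

  FileSketch copy() {
    return new FileSketch(this, true /* full copy */);
  }

  FileSketch copyWithoutStats() {
    return new FileSketch(this, false /* drop stats */);
  }

  FileSketch copyWithStats(Set<Integer> columnIds) {
    return new FileSketch(this, true, columnIds);
  }
}
```

With the overload in place, the single `null` lives in one commented spot and the `copy()` and `copyWithoutStats()` call sites stay as they were before the change.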
##########
core/src/main/java/org/apache/iceberg/GenericDeleteFile.java:
##########
@@ -67,23 +68,31 @@ class GenericDeleteFile extends BaseFile<DeleteFile> implements DeleteFile {
* Copy constructor.
*
* @param toCopy a generic data file to copy.
- * @param fullCopy whether to copy all fields or to drop column-level stats
+ * @param copyStats whether to copy all fields or to drop column-level stats.
+ * @param requestedColumnIds column ids for which to keep stats. If <code>null</code> then every
+ * column stat is kept.
*/
- private GenericDeleteFile(GenericDeleteFile toCopy, boolean fullCopy) {
- super(toCopy, fullCopy);
+ private GenericDeleteFile(
+ GenericDeleteFile toCopy, boolean copyStats, Set<Integer> requestedColumnIds) {
+ super(toCopy, copyStats, requestedColumnIds);
}
/** Constructor for Java serialization. */
GenericDeleteFile() {}
@Override
public DeleteFile copyWithoutStats() {
- return new GenericDeleteFile(this, false /* drop stats */);
+ return new GenericDeleteFile(this, false /* drop stats */, null);
+ }
+
+ @Override
+ public DeleteFile copyWithStats(Set<Integer> requestedColumnIds) {
+ return new GenericDeleteFile(this, true, requestedColumnIds);
}
@Override
public DeleteFile copy() {
- return new GenericDeleteFile(this, true /* full copy */);
+ return new GenericDeleteFile(this, true /* full copy */, null);
Review Comment:
Same as in `GenericDataFile`.
##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -78,7 +78,8 @@ protected CloseableIterable<ChangelogScanTask> doPlanFiles(
.select(scanColumns())
.filterData(filter())
.filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId()))
- .ignoreExisting();
+ .ignoreExisting()
+ .columnsToKeepStats(context().columnsToKeepStats());
Review Comment:
Same here.
##########
core/src/main/java/org/apache/iceberg/BaseFile.java:
##########
@@ -185,13 +188,30 @@ public PartitionData copy() {
this.partitionType = toCopy.partitionType;
this.recordCount = toCopy.recordCount;
this.fileSizeInBytes = toCopy.fileSizeInBytes;
- if (fullCopy) {
- this.columnSizes = SerializableMap.copyOf(toCopy.columnSizes);
- this.valueCounts = SerializableMap.copyOf(toCopy.valueCounts);
- this.nullValueCounts = SerializableMap.copyOf(toCopy.nullValueCounts);
- this.nanValueCounts = SerializableMap.copyOf(toCopy.nanValueCounts);
- this.lowerBounds = SerializableByteBufferMap.wrap(SerializableMap.copyOf(toCopy.lowerBounds));
- this.upperBounds = SerializableByteBufferMap.wrap(SerializableMap.copyOf(toCopy.upperBounds));
+ if (copyStats) {
+ if (requestedColumnIds == null) {
Review Comment:
Optional: I'd consider adding a few helper methods to simplify this block,
since we will now have three fairly large branches. I understand this adds a few
more null checks, but I doubt it will have any noticeable performance impact.
```
private static <K, V> Map<K, V> copyMap(Map<K, V> map, Set<K> keys) {
  return keys == null ? SerializableMap.copyOf(map) : SerializableMap.filteredCopyOf(map, keys);
}

private static Map<Integer, ByteBuffer> copyByteBufferMap(
    Map<Integer, ByteBuffer> map, Set<Integer> keys) {
  return SerializableByteBufferMap.wrap(copyMap(map, keys));
}

...

if (copyStats) {
  this.columnSizes = copyMap(toCopy.columnSizes, requestedColumnIds);
  this.valueCounts = copyMap(toCopy.valueCounts, requestedColumnIds);
  this.nullValueCounts = copyMap(toCopy.nullValueCounts, requestedColumnIds);
  this.nanValueCounts = copyMap(toCopy.nanValueCounts, requestedColumnIds);
  this.lowerBounds = copyByteBufferMap(toCopy.lowerBounds, requestedColumnIds);
  this.upperBounds = copyByteBufferMap(toCopy.upperBounds, requestedColumnIds);
}
```
Up to you here, @pvary. Keep it as is if you want.
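For anyone who wants to try the null-means-all behaviour in isolation, here is a self-contained sketch of the `copyMap` idea. It is an approximation under stated assumptions: a plain `HashMap` stands in for `SerializableMap.copyOf` and `SerializableMap.filteredCopyOf`, and returning `null` for a `null` input map is my assumption rather than something confirmed in this thread.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class StatsCopySketch {
  // Stand-in for SerializableMap.copyOf / filteredCopyOf, using a plain HashMap.
  // A null key set means "keep every entry"; a null map stays null (assumption).
  static <K, V> Map<K, V> copyMap(Map<K, V> map, Set<K> keys) {
    if (map == null) {
      return null;
    }
    Map<K, V> copy = new HashMap<>();
    for (Map.Entry<K, V> entry : map.entrySet()) {
      if (keys == null || keys.contains(entry.getKey())) {
        copy.put(entry.getKey(), entry.getValue());
      }
    }
    return copy;
  }

  public static void main(String[] args) {
    Map<Integer, Long> valueCounts = new HashMap<>();
    valueCounts.put(1, 100L);
    valueCounts.put(2, 200L);
    valueCounts.put(3, 300L);

    Set<Integer> requested = new HashSet<>(Arrays.asList(1, 3));
    System.out.println("filtered: " + copyMap(valueCounts, requested));
    System.out.println("full:     " + copyMap(valueCounts, null));
  }
}
```

The single helper replaces the separate "all columns" and "requested columns" branches, so each stats field needs only one assignment inside `if (copyStats)`.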
##########
core/src/main/java/org/apache/iceberg/DataTableScan.java:
##########
@@ -76,7 +76,8 @@ public CloseableIterable<FileScanTask> doPlanFiles() {
.filterData(filter())
.specsById(table().specs())
.scanMetrics(scanMetrics())
- .ignoreDeleted();
+ .ignoreDeleted()
+ .columnsToKeepStats(context().columnsToKeepStats());
Review Comment:
Here too.
##########
core/src/main/java/org/apache/iceberg/ManifestGroup.java:
##########
@@ -154,6 +156,12 @@ ManifestGroup caseSensitive(boolean newCaseSensitive) {
return this;
}
+ ManifestGroup columnsToKeepStats(Set<Integer> newColumnsToKeepStats) {
+ this.columnsToKeepStats =
+ newColumnsToKeepStats == null ? null : Sets.newHashSet(newColumnsToKeepStats);
Review Comment:
This copy seems redundant but up to you.
##########
core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java:
##########
@@ -102,7 +102,8 @@ public CloseableIterable<FileScanTask> planFiles() {
snapshotIds.contains(manifestEntry.snapshotId())
&& manifestEntry.status() == ManifestEntry.Status.ADDED)
.specsById(table().specs())
- .ignoreDeleted();
+ .ignoreDeleted()
+ .columnsToKeepStats(context().columnsToKeepStats());
Review Comment:
`context().columnsToKeepStats()` -> `columnsToKeepStats()`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]