amogh-jahagirdar commented on code in PR #11982:
URL: https://github.com/apache/iceberg/pull/11982#discussion_r1927584179
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java:
##########
@@ -728,4 +710,13 @@ private String getMetadataLocation(Table tbl) {
!metadataDir.isEmpty(), "Failed to get the metadata file root directory");
return metadataDir;
}
+
+ @VisibleForTesting
+ Broadcast<Table> tableBroadcast() {
+ if (tblBroadcast == null) {
Review Comment:
I checked out the code locally; see my comment above. I think we can remove
the `tableBroadcast` argument from these helper functions. It's all local
state anyway, so we can call `tableBroadcast()` in the places that need it
rather than passing it through as an argument. That also solves the naming
issue.
On the testing side, my take is that we really just need separate tests for
SerializableTable in Spark (to make sure it works with things like Kryo). Once
we have confidence at that level that fields like partition specs can be
serialized, actions that depend on it don't need to explicitly test
broadcast/serialization cases; that can be safely assumed. That said, I
recognize that the test in this case was necessary to demonstrate the issue,
since we had a separate broadcast for the specs.
Again though, I'm good with keeping the test as is for now.
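To illustrate the kind of component-level check meant here, below is a minimal sketch of a serialization round-trip test. It is only a stand-in: `SerializableTableSketch` and `SerializationSketch` are hypothetical names, the class is not Iceberg's `SerializableTable`, and it uses plain Java serialization from the standard library rather than Kryo so that it runs without Spark on the classpath.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a serializable table snapshot; NOT Iceberg's SerializableTable.
class SerializableTableSketch implements Serializable {
  private static final long serialVersionUID = 1L;

  private final String name;
  // Stand-in for Map<Integer, PartitionSpec>; specs are plain strings in this sketch.
  private final Map<Integer, String> specsById;

  SerializableTableSketch(String name, Map<Integer, String> specsById) {
    this.name = name;
    this.specsById = new HashMap<>(specsById); // defensive copy; HashMap is Serializable
  }

  String name() {
    return name;
  }

  Map<Integer, String> specsById() {
    return specsById;
  }
}

// Hypothetical helper: serialize an object to bytes and read it back.
class SerializationSketch {
  static <T extends Serializable> T roundTrip(T obj) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(obj);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        @SuppressWarnings("unchecked")
        T copy = (T) in.readObject();
        return copy;
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }
}
```

A test at this level would assert that the copy still exposes the same partition specs as the original. Once that holds for every serializer in use, callers that only broadcast the table can rely on it.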
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java:
##########
@@ -572,21 +564,12 @@ private void rewritePositionDeletes(TableMetadata metadata, Set<DeleteFile> toRe
Dataset<DeleteFile> deleteFileDs =
spark().createDataset(Lists.newArrayList(toRewrite), deleteFileEncoder);
- Broadcast<Table> serializableTable = sparkContext().broadcast(SerializableTable.copyOf(table));
- Broadcast<Map<Integer, PartitionSpec>> specsById =
- sparkContext().broadcast(metadata.specsById());
-
PositionDeleteReaderWriter posDeleteReaderWriter = new SparkPositionDeleteReaderWriter();
deleteFileDs
.repartition(toRewrite.size())
.foreach(
rewritePositionDelete(
- serializableTable,
- specsById,
- sourcePrefix,
- targetPrefix,
- stagingDir,
- posDeleteReaderWriter));
+ tableBroadcast(), sourcePrefix, targetPrefix, stagingDir, posDeleteReaderWriter));
Review Comment:
`rewritePositionDelete` is a private helper function. Can we just call
`tableBroadcast()` inside that function rather than passing it in as an
argument? This will also help address the private field masking issue.
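
A rough sketch of the shape being suggested, in plain Java with no Spark dependency. Everything here is a stand-in: `RewriteActionSketch`, `rewriteAll`, and `broadcastCount` are hypothetical, strings take the place of `Table` and `Broadcast<Table>`, and it assumes the helper is an instance method (a static helper could not call `tableBroadcast()` directly).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the action class; NOT the actual RewriteTablePathSparkAction.
class RewriteActionSketch {
  private final String table;     // stand-in for the Iceberg Table
  private String tblBroadcast;    // stand-in for Broadcast<Table>, created lazily
  private int broadcastCount = 0; // only here so the sketch can show the broadcast happens once

  RewriteActionSketch(String table) {
    this.table = table;
  }

  // Creates the "broadcast" on first use and reuses it afterwards.
  String tableBroadcast() {
    if (tblBroadcast == null) {
      tblBroadcast = "broadcast(" + table + ")";
      broadcastCount++;
    }
    return tblBroadcast;
  }

  int broadcastCount() {
    return broadcastCount;
  }

  // The helper no longer takes the broadcast as a parameter. It resolves it into a
  // local variable before building the lambda, so the lambda captures only that local.
  private Consumer<String> rewritePositionDelete(
      String sourcePrefix, String targetPrefix, List<String> out) {
    String broadcast = tableBroadcast();
    return path -> {
      String rewritten =
          path.startsWith(sourcePrefix)
              ? targetPrefix + path.substring(sourcePrefix.length())
              : path;
      out.add(broadcast + ":" + rewritten);
    };
  }

  // Hypothetical driver standing in for the Dataset.foreach(...) call in the diff.
  List<String> rewriteAll(List<String> paths, String sourcePrefix, String targetPrefix) {
    List<String> out = new ArrayList<>();
    paths.forEach(rewritePositionDelete(sourcePrefix, targetPrefix, out));
    return out;
  }
}
```

With no parameter named after the field, nothing is left to shadow it, and the call site only passes the values that actually vary per invocation.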
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]