amogh-jahagirdar commented on code in PR #13310:
URL: https://github.com/apache/iceberg/pull/13310#discussion_r2151310129
##########
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -699,29 +706,83 @@ public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long e
writeSchema,
dsSchema,
targetFileSize,
- useFanoutWriter);
+ useFanoutWriter,
+ persistLineageFields);
}
}
}
- private static class UnpartitionedDataWriter implements DataWriter<InternalRow> {
+ private abstract static class DataWriterWithLineage implements DataWriter<InternalRow> {
+ private Integer rowIdOrdinal = null;
+ private Integer lastUpdatedOrdinal = null;
+
+ protected InternalRow lineageRow(InternalRow meta) {
+ GenericInternalRow row = new GenericInternalRow(2);
+ row.setNullAt(0);
+ row.setNullAt(1);
+
+ if (meta == null) {
+ return row;
+ }
+
+ // Ordinals are cached
+ if (rowIdOrdinal != null && lastUpdatedOrdinal != null) {
+ setIfNotNull(row, 0, meta, rowIdOrdinal);
+ setIfNotNull(row, 1, meta, lastUpdatedOrdinal);
Review Comment:
OK, I think the right thing to do is to pass a `Function<InternalRow, InternalRow> rowLineageExtractor` through to the writers; then we can have a common implementation of that function across the CoW/MoR writers.
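The suggested design can be sketched roughly as below. This is a hypothetical, self-contained illustration under stated assumptions, not Iceberg's code: `Row` is a minimal stand-in for Spark's `InternalRow` (so it compiles without Spark), `SketchWriter` stands in for the CoW/MoR data writers, and the lineage fields are assumed to appear in the metadata row as `_row_id` and `_last_updated_sequence_number`. The extractor resolves both ordinals once, when it is built, so individual writers would no longer need lazily cached `rowIdOrdinal`/`lastUpdatedOrdinal` fields.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class RowLineageExtractorSketch {

  // Hypothetical stand-in for Spark's InternalRow: a fixed-width row of nullable values.
  public record Row(Object[] values) {
    public boolean isNullAt(int ordinal) {
      return values[ordinal] == null;
    }

    public Object get(int ordinal) {
      return values[ordinal];
    }
  }

  // Assumed names of the row lineage fields in the metadata row.
  static final String ROW_ID = "_row_id";
  static final String LAST_UPDATED_SEQ = "_last_updated_sequence_number";

  // Builds one extractor that every writer can share. Ordinals are resolved once, here.
  public static Function<Row, Row> rowLineageExtractor(List<String> metadataFieldNames) {
    int rowIdOrdinal = metadataFieldNames.indexOf(ROW_ID);
    int lastUpdatedOrdinal = metadataFieldNames.indexOf(LAST_UPDATED_SEQ);
    return meta -> {
      Object[] lineage = new Object[2]; // both lineage fields default to null
      if (meta != null) {
        copyIfPresent(meta, rowIdOrdinal, lineage, 0);
        copyIfPresent(meta, lastUpdatedOrdinal, lineage, 1);
      }
      return new Row(lineage);
    };
  }

  private static void copyIfPresent(Row meta, int ordinal, Object[] target, int pos) {
    // A negative ordinal means the field is absent from the metadata schema.
    if (ordinal >= 0 && !meta.isNullAt(ordinal)) {
      target[pos] = meta.get(ordinal);
    }
  }

  // Hypothetical writer: it receives the extractor instead of owning the lineage logic.
  public static final class SketchWriter {
    private final Function<Row, Row> rowLineageExtractor;
    private final List<Row> lineageRows = new ArrayList<>();

    public SketchWriter(Function<Row, Row> rowLineageExtractor) {
      this.rowLineageExtractor = rowLineageExtractor;
    }

    public void write(Row meta, Row data) {
      // A real writer would write `data` together with its lineage; this sketch keeps only lineage.
      lineageRows.add(rowLineageExtractor.apply(meta));
    }

    public List<Row> lineageRows() {
      return lineageRows;
    }
  }

  public static void main(String[] args) {
    Function<Row, Row> extractor =
        rowLineageExtractor(List.of("_file", ROW_ID, LAST_UPDATED_SEQ));
    // The same function instance is handed to both writers.
    SketchWriter cow = new SketchWriter(extractor);
    SketchWriter mor = new SketchWriter(extractor);
    cow.write(new Row(new Object[] {"a.parquet", 7L, 3L}), new Row(new Object[] {"data"}));
    mor.write(null, new Row(new Object[] {"data"}));
    System.out.println(cow.lineageRows().get(0).get(0)); // prints 7
    System.out.println(mor.lineageRows().get(0).isNullAt(0)); // prints true
  }
}
```

Because each writer only depends on a `Function`, the null handling and ordinal lookup live in one place, and a CoW or MoR writer can be constructed with whichever extractor (or a no-op one) fits the write.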
--
This is an automated message from the Apache Git Service.