amogh-jahagirdar commented on code in PR #13310:
URL: https://github.com/apache/iceberg/pull/13310#discussion_r2193334597
##########
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -699,29 +708,41 @@ public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long e
writeSchema,
dsSchema,
targetFileSize,
- useFanoutWriter);
+ useFanoutWriter,
+ extractRowLineage);
}
}
}
private static class UnpartitionedDataWriter implements DataWriter<InternalRow> {
private final FileWriter<InternalRow, DataWriteResult> delegate;
private final FileIO io;
+ private final Function<InternalRow, InternalRow> extractRowLineageFromMetadata;
private UnpartitionedDataWriter(
SparkFileWriterFactory writerFactory,
OutputFileFactory fileFactory,
FileIO io,
PartitionSpec spec,
- long targetFileSize) {
+ long targetFileSize,
+ Function<InternalRow, InternalRow> extractRowLineageFromMetadata) {
this.delegate =
new RollingDataWriter<>(writerFactory, fileFactory, io,
targetFileSize, spec, null);
this.io = io;
+ this.extractRowLineageFromMetadata = extractRowLineageFromMetadata;
}
@Override
public void write(InternalRow record) throws IOException {
- delegate.write(record);
+ write(null, record);
+ }
+
+ @Override
+ public void write(InternalRow meta, InternalRow record) throws IOException {
+ InternalRow rowLineage = extractRowLineageFromMetadata.apply(meta);
+ InternalRow recordWithLineage =
+ rowLineage == null ? record : new JoinedRow(record, rowLineage);
Review Comment:
So yes, JoinedRow will append whatever metadata contents we extract to the end
of the overall record that gets persisted. At the moment we only extract and
persist the lineage fields, since that's what we care about, but if we ever
have a use case for persisting other metadata columns, we can just change the
extraction logic to include them and the rest should work unchanged.
Let me know if that makes sense @stevenzwu
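To illustrate the pattern described above, here is a self-contained sketch. These are hypothetical stand-in types (`Row`, `Joined`, `extractRowLineage`), not the real Spark classes; the actual code uses Spark's `InternalRow` and `JoinedRow`, where `JoinedRow` is likewise a view concatenating two rows without copying their fields.

```java
import java.util.function.Function;

// Hypothetical stand-ins for illustration only; the real implementation
// uses org.apache.spark.sql.catalyst.InternalRow and JoinedRow.
public class JoinedRowSketch {

  // A row is just an ordered list of field values.
  record Row(Object... fields) {
    Object get(int i) { return fields[i]; }
    int numFields() { return fields.length; }
  }

  // Like Spark's JoinedRow: a view over two rows, left fields first,
  // then right fields, without copying either backing array.
  record Joined(Row left, Row right) {
    Object get(int i) {
      int n = left.numFields();
      return i < n ? left.get(i) : right.get(i - n);
    }
    int numFields() { return left.numFields() + right.numFields(); }
  }

  public static void main(String[] args) {
    // Hypothetical extraction: pull the lineage columns out of the
    // metadata row; returns null when no metadata row was supplied.
    Function<Row, Row> extractRowLineage =
        meta -> meta == null ? null : new Row(meta.get(0), meta.get(1));

    Row dataRow = new Row("alice", 42);  // the user-visible data
    Row meta = new Row(1001L, 7L);       // e.g. row id, sequence number

    Row lineage = extractRowLineage.apply(meta);
    if (lineage == null) {
      // No metadata: persist the record as-is.
      System.out.println(dataRow.numFields()); // prints 2
    } else {
      // Lineage fields land at the end of the persisted record.
      Joined withLineage = new Joined(dataRow, lineage);
      System.out.println(withLineage.numFields()); // prints 4
      System.out.println(withLineage.get(2));      // prints 1001
    }
  }
}
```

The key property, as the comment notes, is that only the extraction function decides which metadata columns get appended; the join itself is agnostic to their contents.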
##########
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java:
##########
@@ -736,7 +769,14 @@ public void update(InternalRow meta, InternalRow id, InternalRow row) throws IOE
@Override
public void insert(InternalRow row) throws IOException {
- delegate.insert(row, dataSpec, null);
+ reinsert(null, row);
+ }
+
+ @Override
+ public void reinsert(InternalRow meta, InternalRow row) throws IOException {
Review Comment:
You're right; I was also looking at how we can project fields from a row
without copying, and I came across this same pattern. I will incorporate it in
my next update!
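A minimal sketch of the zero-copy projection idea referenced above, with hypothetical stand-in types (`Row`, `Projected`): rather than building a new row, wrap the original and remap field ordinals so reads delegate to the backing row. Spark's catalyst layer has projecting-row utilities that follow a similar idea, though the exact class used here is for the PR author to pick.

```java
// Hypothetical stand-ins for illustration; the real code operates on
// Spark's InternalRow.
public class ProjectionSketch {

  // A row is just an ordered list of field values.
  record Row(Object... fields) {
    Object get(int i) { return fields[i]; }
  }

  // A zero-copy view over `base` exposing only the fields at `ordinals`,
  // in that order; no field values are copied.
  record Projected(Row base, int[] ordinals) {
    Object get(int i) { return base.get(ordinals[i]); }
    int numFields() { return ordinals.length; }
  }

  public static void main(String[] args) {
    Row full = new Row("alice", 42, 1001L, 7L);
    // Project just the trailing lineage columns (ordinals 2 and 3).
    Projected lineage = new Projected(full, new int[] {2, 3});
    System.out.println(lineage.get(0));      // prints 1001
    System.out.println(lineage.numFields()); // prints 2
  }
}
```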
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]