amogh-jahagirdar commented on code in PR #13310:
URL: https://github.com/apache/iceberg/pull/13310#discussion_r2192811369
##########
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -699,29 +708,41 @@ public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long e
           writeSchema,
           dsSchema,
           targetFileSize,
-          useFanoutWriter);
+          useFanoutWriter,
+          extractRowLineage);
     }
   }
 }

 private static class UnpartitionedDataWriter implements DataWriter<InternalRow> {
   private final FileWriter<InternalRow, DataWriteResult> delegate;
   private final FileIO io;
+  private final Function<InternalRow, InternalRow> extractRowLineageFromMetadata;

   private UnpartitionedDataWriter(
       SparkFileWriterFactory writerFactory,
       OutputFileFactory fileFactory,
       FileIO io,
       PartitionSpec spec,
-      long targetFileSize) {
+      long targetFileSize,
+      Function<InternalRow, InternalRow> extractRowLineageFromMetadata) {
     this.delegate =
         new RollingDataWriter<>(writerFactory, fileFactory, io, targetFileSize, spec, null);
     this.io = io;
+    this.extractRowLineageFromMetadata = extractRowLineageFromMetadata;
   }

   @Override
   public void write(InternalRow record) throws IOException {
-    delegate.write(record);
+    write(null, record);
Review Comment:
Yeah, basically what @nastra said: this delegates to the new Spark DSV2 writer `write(InternalRow meta, InternalRow data)` API, where the first row is the metadata row. I pass `null` through to indicate that there is no metadata for the existing `write(record)` API.
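
A minimal sketch of that delegation pattern, using a placeholder `Row` type standing in for Spark's `InternalRow` (these are not the actual Iceberg/Spark classes, just an illustration of the legacy one-argument call forwarding to the two-argument API with a `null` metadata row):

```java
import java.util.ArrayList;
import java.util.List;

public class DelegatingWriterSketch {

  // Placeholder standing in for Spark's InternalRow.
  static final class Row {
    final String value;

    Row(String value) {
      this.value = value;
    }
  }

  static final class UnpartitionedDataWriter {
    final List<String> written = new ArrayList<>();

    // Legacy single-argument API: no metadata row is available, so
    // delegate with null metadata.
    void write(Row record) {
      write(null, record);
    }

    // New DSV2-style API: the first argument is the metadata row (may be
    // null), the second is the data row.
    void write(Row meta, Row data) {
      String lineage = (meta == null) ? "<no-metadata>" : meta.value;
      written.add(lineage + ":" + data.value);
    }
  }

  public static void main(String[] args) {
    UnpartitionedDataWriter writer = new UnpartitionedDataWriter();
    writer.write(new Row("a"));                     // legacy path, null metadata
    writer.write(new Row("rowId=7"), new Row("b")); // metadata-aware path
    System.out.println(writer.written);
  }
}
```

Running this prints `[<no-metadata>:a, rowId=7:b]`, showing both entry points landing in the same two-argument method.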
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]