rdblue commented on a change in pull request #1320:
URL: https://github.com/apache/iceberg/pull/1320#discussion_r469498892
##########
File path:
flink/src/main/java/org/apache/iceberg/flink/RowDataTaskWriterFactory.java
##########
@@ -38,37 +40,38 @@
 import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.io.TaskWriter;
 import org.apache.iceberg.io.UnpartitionedWriter;
-import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

-class RowTaskWriterFactory implements TaskWriterFactory<Row> {
+class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
   private final Schema schema;
   private final PartitionSpec spec;
   private final LocationProvider locations;
   private final FileIO io;
   private final EncryptionManager encryptionManager;
   private final long targetFileSizeBytes;
   private final FileFormat format;
-  private final FileAppenderFactory<Row> appenderFactory;
+  private final RowType flinkSchema;
+  private final FileAppenderFactory<RowData> appenderFactory;
   private OutputFileFactory outputFileFactory;

-  RowTaskWriterFactory(Schema schema,
-                       PartitionSpec spec,
-                       LocationProvider locations,
-                       FileIO io,
-                       EncryptionManager encryptionManager,
-                       long targetFileSizeBytes,
-                       FileFormat format,
-                       Map<String, String> tableProperties) {
+  RowDataTaskWriterFactory(Schema schema,
+                           PartitionSpec spec,
+                           LocationProvider locations,
+                           FileIO io,
+                           EncryptionManager encryptionManager,
+                           long targetFileSizeBytes,
+                           FileFormat format,
+                           Map<String, String> tableProperties) {
     this.schema = schema;
     this.spec = spec;
     this.locations = locations;
     this.io = io;
     this.encryptionManager = encryptionManager;
     this.targetFileSizeBytes = targetFileSizeBytes;
     this.format = format;
-    this.appenderFactory = new FlinkFileAppenderFactory(schema, tableProperties);
+    this.flinkSchema = FlinkSchemaUtil.convert(schema);
Review comment:
This only appears in Spark with a CTAS query, because that is the only
case where Spark doesn't get a schema back from the table. When Spark has a
table schema, it automatically inserts casts to the appropriate types, so
this problem doesn't happen. I'm not sure whether Flink does that, but if it
does, then you wouldn't need to worry about this bug.
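
The cast insertion described above can be sketched roughly as follows. This is a hedged illustration, not Iceberg, Spark, or Flink code: the class name `CastInsertionSketch` and the helper `castToDeclaredInt` are hypothetical stand-ins for the engine-inserted cast that coerces query output to a table's declared column type. In a CTAS the schema is derived from the query itself, so this coercion point does not exist.

```java
import java.util.List;
import java.util.stream.Collectors;

public class CastInsertionSketch {

  // Hypothetical stand-in for an engine-inserted cast to a declared INT column:
  // narrows the query's value (e.g. a long) to the table's declared type.
  static Integer castToDeclaredInt(Number incoming) {
    return incoming.intValue();
  }

  public static void main(String[] args) {
    // The query produced longs, but the table column is declared as int.
    List<Number> queryOutput = List.of(1L, 2L, 3L);
    List<Integer> writerInput = queryOutput.stream()
        .map(CastInsertionSketch::castToDeclaredInt)
        .collect(Collectors.toList());
    System.out.println(writerInput); // prints [1, 2, 3]
  }
}
```

With the cast in place the writer only ever sees values of the declared column type; without it, a type mismatch surfaces at write time, which is the bug being discussed.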
----------------------------------------------------------------
This is an automated message from the Apache Git Service.