rdblue commented on a change in pull request #1320:
URL: https://github.com/apache/iceberg/pull/1320#discussion_r468808965



##########
File path: flink/src/main/java/org/apache/iceberg/flink/RowDataTaskWriterFactory.java
##########
@@ -38,37 +40,38 @@
 import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.io.TaskWriter;
 import org.apache.iceberg.io.UnpartitionedWriter;
-import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 
-class RowTaskWriterFactory implements TaskWriterFactory<Row> {
+class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
   private final Schema schema;
   private final PartitionSpec spec;
   private final LocationProvider locations;
   private final FileIO io;
   private final EncryptionManager encryptionManager;
   private final long targetFileSizeBytes;
   private final FileFormat format;
-  private final FileAppenderFactory<Row> appenderFactory;
+  private final RowType flinkSchema;
+  private final FileAppenderFactory<RowData> appenderFactory;
 
   private OutputFileFactory outputFileFactory;
 
-  RowTaskWriterFactory(Schema schema,
-                       PartitionSpec spec,
-                       LocationProvider locations,
-                       FileIO io,
-                       EncryptionManager encryptionManager,
-                       long targetFileSizeBytes,
-                       FileFormat format,
-                       Map<String, String> tableProperties) {
+  RowDataTaskWriterFactory(Schema schema,
+                           PartitionSpec spec,
+                           LocationProvider locations,
+                           FileIO io,
+                           EncryptionManager encryptionManager,
+                           long targetFileSizeBytes,
+                           FileFormat format,
+                           Map<String, String> tableProperties) {
     this.schema = schema;
     this.spec = spec;
     this.locations = locations;
     this.io = io;
     this.encryptionManager = encryptionManager;
     this.targetFileSizeBytes = targetFileSizeBytes;
     this.format = format;
-    this.appenderFactory = new FlinkFileAppenderFactory(schema, tableProperties);
+    this.flinkSchema = FlinkSchemaUtil.convert(schema);

Review comment:
       In Spark, we had a bug where Spark could produce a row containing a
short for a column that Iceberg stores as an int. In a CTAS query, the data was
passed to Iceberg still typed as a short, and the write failed with a
ClassCastException. That's why we now pass the dataset schema when creating
writers. You might want to watch out for a similar bug here.
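
To make the failure mode concrete, here is a minimal sketch in plain Java. It does not use the Iceberg, Spark, or Flink APIs; the class and method names (`ShortToIntWriteSketch`, `writeAssumingTableType`, `writeUsingSourceType`) are hypothetical stand-ins. It only illustrates why a writer that assumes the table's type for a column can fail when the engine hands it a narrower type, and how knowing the source type avoids that.

```java
public class ShortToIntWriteSketch {
  // Hypothetical stand-in for a writer derived only from the table schema:
  // it assumes every value for an int column already arrives as an Integer.
  static int writeAssumingTableType(Object value) {
    return (Integer) value; // throws ClassCastException if value is a Short
  }

  // Hypothetical stand-in for a writer that is also told the engine-side
  // (source) type, so it can widen a short to an int before writing.
  static int writeUsingSourceType(Object value, Class<?> sourceType) {
    if (sourceType == Short.class) {
      return ((Short) value).intValue();
    }
    return (Integer) value;
  }

  public static void main(String[] args) {
    // The engine produces a short for a column the table stores as an int.
    Object fromEngine = Short.valueOf((short) 7);

    // Widening based on the source type succeeds.
    System.out.println(writeUsingSourceType(fromEngine, Short.class));

    // Casting based on the table type alone fails at runtime.
    try {
      writeAssumingTableType(fromEngine);
    } catch (ClassCastException e) {
      System.out.println("ClassCastException");
    }
  }
}
```

Whether the Flink path can actually receive a narrower type than the table schema implies is something to verify against the real writers; the sketch only shows the shape of the problem.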






