kbendick commented on a change in pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213#discussion_r460577287



##########
File path: core/src/main/java/org/apache/iceberg/io/PartitionedFanoutWriter.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+public abstract class PartitionedFanoutWriter<T> extends BaseTaskWriter<T> {
+  private final Map<PartitionKey, RollingFileAppender> writers = 
Maps.newHashMap();
+
+  public PartitionedFanoutWriter(PartitionSpec spec, FileFormat format, 
FileAppenderFactory<T> appenderFactory,
+                                 OutputFileFactory fileFactory, FileIO io, 
long targetFileSize) {
+    super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+  }
+
+  /**
+   * Create a PartitionKey from the values in row.
+   * <p>
+   * Any PartitionKey returned by this method can be reused by the 
implementation.
+   *
+   * @param row a data row
+   */
+  protected abstract PartitionKey partition(T row);
+
+  @Override
+  public void write(T row) throws IOException {
+    PartitionKey partitionKey = partition(row);
+
+    RollingFileAppender writer = writers.get(partitionKey);
+    if (writer == null) {
+      // NOTICE: we need to copy a new partition key here, in case of messing 
up the keys in writers.
+      PartitionKey copiedKey = partitionKey.copy();
+      writer = new RollingFileAppender(copiedKey);
+      writers.put(copiedKey, writer);

Review comment:
       This code here is handling the case where we've not seen this partition 
key yet. This is especially likely to happen when users did not `keyBy` or 
otherwise pre-shuffle the data according to the partition key.
   
   Is pre-shuffling something that the users should be doing before writing to 
the table (either `keyBy` or `ORDER BY` in Flink SQL)? I understand that this 
is specifically a `PartitionedFanoutWriter`, and so it makes sense that keys 
might not always come together (and even in the case where users did `keyBy` 
the partition key, if the number of TaskManager slots that are writing does not 
equal the cardinality of the partition key you'll still wind up with multiple 
RollingFileAppenders in a single Flink writing task and thus fanout). However, 
for long running streaming queries, it's possible that this TaskManager doesn't 
see this partition key again for days or even weeks (especially at a high 
enough volume to emit a complete file of the given target file size). 
   
   I guess my concern is that users wind up with a very high cardinality of 
keys on a single TaskManager. Either because they didn't pre-shuffle their data 
or perhaps they have an imbalance between the cardinality on the partition key 
and the parallelism at the write stage such that records might not naturally 
group together enough to emit an entire file. Or,  as another edge case, one 
partition key value is simply not common enough to emit an entire file from 
this `PartitionedFanoutWriter`.
   
   IIUC, if the `PartitionedFanoutWriter` does not see this partition key 
enough times in this TaskManager again to emit a full file for quite some time, 
a file containing this data won't be written until `close` is called. For very 
long running streaming jobs, this could be days or even weeks in my experience. 
This could also lead to small files upon `close`. Is this a concern that 
Iceberg should take into consideration or is this left to the users in their 
Flink query to determine when tuning their queries? 
   
   I imagine with S3, data locality of a file written much later than its 
timestamp of when the data was received is not a major concern, as the manifest 
file will tell whatever query engine reads this table which keys in their S3 
bucket to grab and the locality issue is relatively abstracted away from the 
user, but what about if the user is using HDFS? Could this lead to performance 
issues (or even correctness issues) on read if records with relatively similar 
timestamps at their RollingFileAppender are scattered across a potentially 
large number of files?
   
   I suppose this amounts to three concerns (and forgive me if these are 
non-issues as I am still new to the project, but not new to Flink so partially 
this is for helping me understand, as well as reviewing my concerns when 
reading this code):
   1) Should we be concerned that a writer won't emit a file until a streaming 
query is closed due to the previously mentioned case? Possibly tracking the 
time that each writer has existed and then emitting a file if it has been far 
too long (however that could be determined).
   2) If a record comes in at some time, and then the file containing that 
record isn't written for a much greater period of time (on the order of days or 
weeks), could this lead to correctness problems or very large performance 
problems when any query engine reads this table?
   3) Would it be beneficial to at least emit a warning or info level log to 
the user that it might be beneficial to pre-partition their data according to 
the partition key spec if perhaps the number of unique `RollingFileAppender` 
writers gets too high for one given Flink writer slot / TaskManager? 
Admittedly, it might be difficult to determine a heuristic of when this might 
be a problem vs just the natural difference in the parallelism of writing task 
slots vs the cardinality of the partition key.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to