rdblue commented on a change in pull request #2038:
URL: https://github.com/apache/iceberg/pull/2038#discussion_r559047177



##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
##########
@@ -19,21 +19,63 @@
 
 package org.apache.iceberg.mr.hive;
 
+import java.util.Properties;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
-import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.hadoop.util.Progressable;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.mapred.Container;
 
-public class HiveIcebergOutputFormat<T> implements OutputFormat<Void, T> {
+public class HiveIcebergOutputFormat<T> implements OutputFormat<NullWritable, 
Container>,
+    HiveOutputFormat<NullWritable, Container> {
+
+  private static final String TASK_ATTEMPT_ID_KEY = "mapred.task.id";
+
+  @Override
+  @SuppressWarnings("rawtypes")
+  public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jc, Path 
finalOutPath, Class valueClass,
+      boolean isCompressed, Properties tableAndSerDeProperties, Progressable 
progress) {
+
+    TaskAttemptID taskAttemptID = 
TaskAttemptID.forName(jc.get(TASK_ATTEMPT_ID_KEY));
+    Schema schema = HiveIcebergStorageHandler.schema(jc);
+    PartitionSpec spec = HiveIcebergStorageHandler.spec(jc);
+    FileFormat fileFormat = 
FileFormat.valueOf(jc.get(InputFormatConfig.WRITE_FILE_FORMAT));
+    long targetFileSize = jc.getLong(InputFormatConfig.WRITE_TARGET_FILE_SIZE, 
Long.MAX_VALUE);
+    FileIO io = HiveIcebergStorageHandler.io(jc);
+    LocationProvider location = HiveIcebergStorageHandler.location(jc);
+    EncryptionManager encryption = HiveIcebergStorageHandler.encryption(jc);
+    OutputFileFactory outputFileFactory =
+        new OutputFileFactory(spec, FileFormat.PARQUET, location, io, 
encryption, taskAttemptID.getTaskID().getId(),
+            taskAttemptID.getId(), 
jc.get(HiveConf.ConfVars.HIVEQUERYID.varname) + "-" + taskAttemptID.getJobID());
+    HiveIcebergRecordWriter writer = new HiveIcebergRecordWriter(schema, spec, 
fileFormat,
+        new GenericAppenderFactory(schema), outputFileFactory, io, 
targetFileSize, taskAttemptID);
+
+    return writer;
+  }
 
   @Override
-  public RecordWriter<Void, T> getRecordWriter(FileSystem ignored, JobConf 
job, String name, Progressable progress) {
-    throw new UnsupportedOperationException("Writing to an Iceberg table with 
Hive is not supported");
+  public org.apache.hadoop.mapred.RecordWriter<NullWritable, Container> 
getRecordWriter(FileSystem ignored,
+      JobConf job, String name, Progressable progress) {
+    throw new UnsupportedOperationException("Please implement if needed");

Review comment:
       Is there a reason not to implement this?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to