[ 
https://issues.apache.org/jira/browse/HIVE-26202?focusedWorklogId=768356&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-768356
 ]

ASF GitHub Bot logged work on HIVE-26202:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 10/May/22 08:11
            Start Date: 10/May/22 08:11
    Worklog Time Spent: 10m 
      Work Description: lcspinter commented on code in PR #3269:
URL: https://github.com/apache/hive/pull/3269#discussion_r868905820


##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##########
@@ -571,6 +571,15 @@ public static boolean isUpdate(Configuration conf, String 
tableName) {
         conf.get(InputFormatConfig.OPERATION_TYPE_PREFIX + tableName));
   }
 
+  public static Operation operation(Configuration conf, String tableName) {

Review Comment:
   nit: can we use this new function in `isWrite`, `isDelete` and `isUpdate` 
functions to get the operation type? 



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java:
##########
@@ -63,39 +60,19 @@ public void checkOutputSpecs(FileSystem ignored, JobConf 
job) {
     // Not doing any check.
   }
 
-  private static HiveIcebergWriterBase writer(JobConf jc) {
+  private static HiveIcebergWriter writer(JobConf jc) {
     TaskAttemptID taskAttemptID = TezUtil.taskAttemptWrapper(jc);
     // It gets the config from the FileSinkOperator which has its own config 
for every target table
     Table table = HiveIcebergStorageHandler.table(jc, 
jc.get(hive_metastoreConstants.META_TABLE_NAME));
-    Schema schema = HiveIcebergStorageHandler.schema(jc);
-    FileFormat fileFormat = 
FileFormat.valueOf(PropertyUtil.propertyAsString(table.properties(),
-        TableProperties.DEFAULT_FILE_FORMAT, 
TableProperties.DEFAULT_FILE_FORMAT_DEFAULT).toUpperCase(Locale.ENGLISH));
-    long targetFileSize = PropertyUtil.propertyAsLong(table.properties(), 
TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
-        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
-    FileIO io = table.io();
-    int partitionId = taskAttemptID.getTaskID().getId();
-    int taskId = taskAttemptID.getId();
-    String operationId = jc.get(HiveConf.ConfVars.HIVEQUERYID.varname) + "-" + 
taskAttemptID.getJobID();
-    OutputFileFactory outputFileFactory = OutputFileFactory.builderFor(table, 
partitionId, taskId)
-        .format(fileFormat)
-        .operationId(operationId)
-        .build();
     String tableName = jc.get(Catalogs.NAME);
-    if (HiveIcebergStorageHandler.isDelete(jc, tableName)) {
-      HiveFileWriterFactory writerFactory = new HiveFileWriterFactory(table, 
fileFormat, schema, null, fileFormat,
-          null, null, null, schema);
-      return new HiveIcebergDeleteWriter(schema, table.specs(), fileFormat, 
writerFactory, outputFileFactory, io,
-          targetFileSize, taskAttemptID, tableName);
-    } else if (HiveIcebergStorageHandler.isUpdate(jc, tableName)) {
-      HiveFileWriterFactory writerFactory = new HiveFileWriterFactory(table, 
fileFormat, schema, null, fileFormat,
-          null, null, null, null);
-      return new HiveIcebergUpdateWriter(schema, table.specs(), 
table.spec().specId(), fileFormat, writerFactory,
-          outputFileFactory, io, targetFileSize, taskAttemptID, tableName, jc);
-    } else {
-      HiveFileWriterFactory writerFactory = new HiveFileWriterFactory(table, 
fileFormat, schema, null, fileFormat,
-          null, null, null, schema);
-      return new HiveIcebergRecordWriter(schema, table.specs(), 
table.spec().specId(), fileFormat, writerFactory,
-          outputFileFactory, io, targetFileSize, taskAttemptID, tableName, 
false);
-    }
+    int poolSize = jc.getInt(DELETE_FILE_THREAD_POOL_SIZE, 
DELETE_FILE_THREAD_POOL_SIZE_DEFAULT);
+
+    return WriterBuilder.builderFor(table)
+        .queryId(jc.get(HiveConf.ConfVars.HIVEQUERYID.varname))
+        .tableName(tableName)

Review Comment:
   Why do we pass the tableName separately? Is it not possible to reuse the 
Iceberg table's name? 



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java:
##########
@@ -32,19 +31,17 @@
 import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.hadoop.util.Progressable;
-import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.mr.Catalogs;
+import org.apache.iceberg.mr.hive.writer.HiveIcebergWriter;
+import org.apache.iceberg.mr.hive.writer.WriterBuilder;
 import org.apache.iceberg.mr.mapred.Container;
-import org.apache.iceberg.util.PropertyUtil;
 
 public class HiveIcebergOutputFormat<T> implements OutputFormat<NullWritable, 
Container<Record>>,
     HiveOutputFormat<NullWritable, Container<Record>> {
+  private static final String DELETE_FILE_THREAD_POOL_SIZE = 
"iceberg.delete.file.thread.pool.size";

Review Comment:
   We have many Iceberg config parameters that are hidden from the end user. Do 
we want to keep all these params internal, or are we collecting them somewhere? 
   If for some reason a customer has an issue with the delete thread pool and 
would like to increase its size, it's not clear which parameter they should 
touch, unless a developer or someone who knows the Hive code tells them. 



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/WriterBuilder.java:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive.writer;
+
+import java.util.Locale;
+import java.util.Map;
+import org.apache.hadoop.hive.ql.Context.Operation;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.util.PropertyUtil;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.DELETE_DEFAULT_FILE_FORMAT;
+
+public class WriterBuilder {
+  private Table table;

Review Comment:
   nit: can be final





Issue Time Tracking
-------------------

    Worklog Id:     (was: 768356)
    Time Spent: 20m  (was: 10m)

> Refactor Iceberg Writers
> ------------------------
>
>                 Key: HIVE-26202
>                 URL: https://issues.apache.org/jira/browse/HIVE-26202
>             Project: Hive
>          Issue Type: Improvement
>            Reporter: Peter Vary
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to