[ https://issues.apache.org/jira/browse/HIVE-26202?focusedWorklogId=768524&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-768524 ]
ASF GitHub Bot logged work on HIVE-26202: ----------------------------------------- Author: ASF GitHub Bot Created on: 10/May/22 14:14 Start Date: 10/May/22 14:14 Worklog Time Spent: 10m Work Description: lcspinter commented on code in PR #3269: URL: https://github.com/apache/hive/pull/3269#discussion_r869294281 ########## iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java: ########## @@ -63,39 +60,19 @@ public void checkOutputSpecs(FileSystem ignored, JobConf job) { // Not doing any check. } - private static HiveIcebergWriterBase writer(JobConf jc) { + private static HiveIcebergWriter writer(JobConf jc) { TaskAttemptID taskAttemptID = TezUtil.taskAttemptWrapper(jc); // It gets the config from the FileSinkOperator which has its own config for every target table Table table = HiveIcebergStorageHandler.table(jc, jc.get(hive_metastoreConstants.META_TABLE_NAME)); - Schema schema = HiveIcebergStorageHandler.schema(jc); - FileFormat fileFormat = FileFormat.valueOf(PropertyUtil.propertyAsString(table.properties(), - TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT).toUpperCase(Locale.ENGLISH)); - long targetFileSize = PropertyUtil.propertyAsLong(table.properties(), TableProperties.WRITE_TARGET_FILE_SIZE_BYTES, - TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT); - FileIO io = table.io(); - int partitionId = taskAttemptID.getTaskID().getId(); - int taskId = taskAttemptID.getId(); - String operationId = jc.get(HiveConf.ConfVars.HIVEQUERYID.varname) + "-" + taskAttemptID.getJobID(); - OutputFileFactory outputFileFactory = OutputFileFactory.builderFor(table, partitionId, taskId) - .format(fileFormat) - .operationId(operationId) - .build(); String tableName = jc.get(Catalogs.NAME); - if (HiveIcebergStorageHandler.isDelete(jc, tableName)) { - HiveFileWriterFactory writerFactory = new HiveFileWriterFactory(table, fileFormat, schema, null, fileFormat, - null, null, null, schema); - return new 
HiveIcebergDeleteWriter(schema, table.specs(), fileFormat, writerFactory, outputFileFactory, io, - targetFileSize, taskAttemptID, tableName); - } else if (HiveIcebergStorageHandler.isUpdate(jc, tableName)) { - HiveFileWriterFactory writerFactory = new HiveFileWriterFactory(table, fileFormat, schema, null, fileFormat, - null, null, null, null); - return new HiveIcebergUpdateWriter(schema, table.specs(), table.spec().specId(), fileFormat, writerFactory, - outputFileFactory, io, targetFileSize, taskAttemptID, tableName, jc); - } else { - HiveFileWriterFactory writerFactory = new HiveFileWriterFactory(table, fileFormat, schema, null, fileFormat, - null, null, null, schema); - return new HiveIcebergRecordWriter(schema, table.specs(), table.spec().specId(), fileFormat, writerFactory, - outputFileFactory, io, targetFileSize, taskAttemptID, tableName, false); - } + int poolSize = jc.getInt(DELETE_FILE_THREAD_POOL_SIZE, DELETE_FILE_THREAD_POOL_SIZE_DEFAULT); + + return WriterBuilder.builderFor(table) + .queryId(jc.get(HiveConf.ConfVars.HIVEQUERYID.varname)) + .tableName(tableName) Review Comment: Sorry, my mistake. I confused it with `table.name()`, which returns `catalogName.dbName.tblName`. Issue Time Tracking ------------------- Worklog Id: (was: 768524) Time Spent: 1h (was: 50m) > Refactor Iceberg Writers > ------------------------ > > Key: HIVE-26202 > URL: https://issues.apache.org/jira/browse/HIVE-26202 > Project: Hive > Issue Type: Improvement > Reporter: Peter Vary > Priority: Major > Labels: pull-request-available > Time Spent: 1h > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.20.7#820007)