[ 
https://issues.apache.org/jira/browse/HIVE-25195?focusedWorklogId=606893&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-606893
 ]

ASF GitHub Bot logged work on HIVE-25195:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 04/Jun/21 08:36
            Start Date: 04/Jun/21 08:36
    Worklog Time Spent: 10m 
      Work Description: marton-bod commented on a change in pull request #2347:
URL: https://github.com/apache/hive/pull/2347#discussion_r645354564



##########
File path: iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
##########
@@ -459,35 +454,35 @@ public void rollbackInsertTable(org.apache.hadoop.hive.metastore.api.Table table
       throws MetaException {
     String tableName = TableIdentifier.of(table.getDbName(), table.getTableName()).toString();
     JobContext jobContext = getJobContextForCommitOrAbort(tableName, overwrite);
-    OutputCommitter committer = new HiveIcebergOutputCommitter();
-    try {
-      LOG.info("rollbackInsertTable: Aborting job for jobID: {} and table: {}", jobContext.getJobID(), tableName);
-      committer.abortJob(jobContext, JobStatus.State.FAILED);
-    } catch (IOException e) {
-      LOG.error("Error while trying to abort failed job. There might be uncleaned data files.", e);
-      // no throwing here because the original commitInsertTable exception should be propagated
-    } finally {
-      // avoid config pollution with prefixed/suffixed keys
-      cleanCommitConfig(tableName);
+    if (jobContext != null) {
+      OutputCommitter committer = new HiveIcebergOutputCommitter();
+      try {
+        LOG.info("rollbackInsertTable: Aborting job for jobID: {} and table: {}", jobContext.getJobID(), tableName);
+        committer.abortJob(jobContext, JobStatus.State.FAILED);
+      } catch (IOException e) {
+        LOG.error("Error while trying to abort failed job. There might be uncleaned data files.", e);
+        // no throwing here because the original commitInsertTable exception should be propagated
+      }
     }
   }
 
-  private void cleanCommitConfig(String tableName) {
-    conf.unset(TezTask.HIVE_TEZ_COMMIT_JOB_ID_PREFIX + tableName);
-    conf.unset(TezTask.HIVE_TEZ_COMMIT_TASK_COUNT_PREFIX + tableName);
-    conf.unset(InputFormatConfig.SERIALIZED_TABLE_PREFIX + tableName);
-    conf.unset(InputFormatConfig.OUTPUT_TABLES);
-  }
-
   private JobContext getJobContextForCommitOrAbort(String tableName, boolean overwrite) {

Review comment:
       Done

##########
File path: iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
##########
@@ -183,10 +183,9 @@ private void createTableForCTAS(Configuration configuration, Properties serDePro
         serDeProperties.get(Catalogs.NAME), tableSchema, serDeProperties.get(InputFormatConfig.PARTITION_SPEC));
     Catalogs.createTable(configuration, serDeProperties);
 
-    // set these in the global conf so that we can rollback the table in the lifecycle hook in case of failures
-    String queryId = configuration.get(HiveConf.ConfVars.HIVEQUERYID.varname);
-    configuration.set(String.format(InputFormatConfig.IS_CTAS_QUERY_TEMPLATE, queryId), "true");
-    configuration.set(String.format(InputFormatConfig.CTAS_TABLE_NAME_TEMPLATE, queryId),
+    // set these in the query state so that we can rollback the table in the lifecycle hook in case of failures
+    SessionStateUtil.addResource(configuration, InputFormatConfig.IS_CTAS_QUERY, "true");

Review comment:
       Yes, true, CTAS_TABLE_NAME should be enough

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
##########
@@ -384,24 +382,28 @@ private void collectCommitInformation(TezWork work) throws IOException, TezExcep
               // get all target tables this vertex wrote to
               List<String> tables = new ArrayList<>();
               for (Map.Entry<String, String> entry : jobConf) {

Review comment:
       Good question. I don't think there's a faster way if we want to keep this 
generic and have it work for all existing and future Iceberg properties. I did 
find a convenience method that can do this for us without an explicit loop:
   `public Map<String, String> getPropsWithPrefix(String confPrefix)`, but it 
also loops through the keys internally.
   
   If we are concerned about performance, we can find out exactly which 
properties need to be propagated, hardcode those, and pass only those few 
without looping.
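
   For illustration, here is a minimal sketch of the prefix filtering discussed 
above, written against a plain `Map` instead of the Hadoop `Configuration` so 
that it runs standalone. The class and method names (`PrefixProps`, 
`propsWithPrefix`) and the sample keys are hypothetical. It strips the prefix 
from the returned keys, which is how I understand `getPropsWithPrefix` to 
behave; please treat that as an assumption.

```java
import java.util.HashMap;
import java.util.Map;

public class PrefixProps {

  // Hypothetical helper: collect every entry whose key starts with the given
  // prefix, returning the keys with the prefix removed.
  public static Map<String, String> propsWithPrefix(Map<String, String> conf, String prefix) {
    Map<String, String> result = new HashMap<>();
    // Every key is visited once, so this is linear in the size of the conf.
    for (Map.Entry<String, String> entry : conf.entrySet()) {
      if (entry.getKey().startsWith(prefix)) {
        result.put(entry.getKey().substring(prefix.length()), entry.getValue());
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // Made-up sample configuration, for demonstration only.
    Map<String, String> conf = new HashMap<>();
    conf.put("iceberg.mr.serialized.table.db.tbl", "<serialized table>");
    conf.put("hive.query.id", "q1");
    System.out.println(propsWithPrefix(conf, "iceberg.mr.serialized.table."));
  }
}
```

   Whether the loop is written by hand or hidden behind a helper, the cost stays 
proportional to the number of keys; only hardcoding the exact keys would avoid 
the scan.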

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
##########
@@ -384,24 +382,28 @@ private void collectCommitInformation(TezWork work) throws IOException, TezExcep
               // get all target tables this vertex wrote to
               List<String> tables = new ArrayList<>();
               for (Map.Entry<String, String> entry : jobConf) {
-                if (entry.getKey().startsWith("iceberg.mr.serialized.table.")) {
-                  tables.add(entry.getKey().substring("iceberg.mr.serialized.table.".length()));
+                if (entry.getKey().startsWith(ICEBERG_SERIALIZED_TABLE_PREFIX)) {
+                  tables.add(entry.getKey().substring(ICEBERG_SERIALIZED_TABLE_PREFIX.length()));
                 }
               }
-              // save information for each target table (jobID, task num, query state)
+              // find iceberg props in jobConf as they can be needed, but not available, during job commit
+              Map<String, String> icebergProperties = new HashMap<>();
+              jobConf.forEach(e -> {

Review comment:
       You're right, will definitely merge them
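
   A rough sketch of what the merged loop could look like, collecting the target 
table names and the Iceberg properties in a single pass. This is not the code 
from the PR: `CommitInfoScan`, `Result` and `scan` are hypothetical names, and 
treating every key that starts with `iceberg.` as an Iceberg property is an 
assumption. The serialized-table prefix value is the one shown in the diff.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CommitInfoScan {
  // Prefix value taken from the diff above.
  static final String SERIALIZED_TABLE_PREFIX = "iceberg.mr.serialized.table.";
  // Assumption: any key starting with "iceberg." counts as an Iceberg property.
  static final String ICEBERG_PREFIX = "iceberg.";

  // Hypothetical holder for the two results of the scan.
  public static final class Result {
    public final List<String> tables = new ArrayList<>();
    public final Map<String, String> icebergProps = new HashMap<>();
  }

  // Single pass over the conf entries instead of two separate loops.
  public static Result scan(Iterable<Map.Entry<String, String>> jobConf) {
    Result result = new Result();
    for (Map.Entry<String, String> entry : jobConf) {
      String key = entry.getKey();
      if (!key.startsWith(ICEBERG_PREFIX)) {
        continue;
      }
      result.icebergProps.put(key, entry.getValue());
      if (key.startsWith(SERIALIZED_TABLE_PREFIX)) {
        // The remainder of the key after the prefix is the table name.
        result.tables.add(key.substring(SERIALIZED_TABLE_PREFIX.length()));
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // Made-up sample configuration, for demonstration only.
    Map<String, String> conf = new HashMap<>();
    conf.put("iceberg.mr.serialized.table.db.tbl", "<serialized table>");
    conf.put("iceberg.catalog", "hive");
    conf.put("hive.query.id", "q1");
    Result r = scan(conf.entrySet());
    System.out.println(r.tables + " " + r.icebergProps.keySet());
  }
}
```

   The method takes an `Iterable<Map.Entry<String, String>>` because the 
existing `for` loop in the diff already iterates `jobConf` that way.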




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 606893)
    Time Spent: 2h 20m  (was: 2h 10m)

> Store Iceberg write commit and ctas information in QueryState 
> --------------------------------------------------------------
>
>                 Key: HIVE-25195
>                 URL: https://issues.apache.org/jira/browse/HIVE-25195
>             Project: Hive
>          Issue Type: Improvement
>            Reporter: Marton Bod
>            Assignee: Marton Bod
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> We should replace the current method of passing Iceberg write commit-related 
> information (jobID, task num) and CTAS info via the session conf using 
> prefixed keys. We have a new way of doing that more cleanly, using the 
> QueryState object. This should make the code easier to maintain and guard 
> against accidental session conf pollution.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
