[PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-02-08 Thread via GitHub


SourabhBadhya opened a new pull request, #5076:
URL: https://github.com/apache/hive/pull/5076

   
   
   ### What changes were proposed in this pull request?
   
   HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables
   
   ### Why are the changes needed?
   
   To minimise the number of small files.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No
   
   ### Is the change a dependency upgrade?
   
   No
   
   ### How was this patch tested?
   
   QTest


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-02-09 Thread via GitHub


sonarcloud[bot] commented on PR #5076:
URL: https://github.com/apache/hive/pull/5076#issuecomment-1935669476

   ## [Quality Gate passed](https://sonarcloud.io/dashboard?id=apache_hive&pullRequest=5076)
   
   Issues: [12 New issues](https://sonarcloud.io/project/issues?id=apache_hive&pullRequest=5076&resolved=false&inNewCodePeriod=true)
   
   Measures: [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=apache_hive&pullRequest=5076&resolved=false&inNewCodePeriod=true)
   No data about Coverage
   No data about Duplication
   
   [See analysis details on SonarCloud](https://sonarcloud.io/dashboard?id=apache_hive&pullRequest=5076)
   
   





Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-02-09 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1484411532


##
iceberg/iceberg-handler/src/test/queries/positive/iceberg_merge_files.q:
##
@@ -0,0 +1,97 @@
+--! qt:dataset:src
+set hive.mapred.mode=nonstrict;
+set hive.explain.user=false;
+set hive.merge.mapredfiles=true;

Review Comment:
   isn't that a legacy config?






Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-02-09 Thread via GitHub


sonarcloud[bot] commented on PR #5076:
URL: https://github.com/apache/hive/pull/5076#issuecomment-1936551525

   ## [Quality Gate passed](https://sonarcloud.io/dashboard?id=apache_hive&pullRequest=5076)
   
   Issues: [7 New issues](https://sonarcloud.io/project/issues?id=apache_hive&pullRequest=5076&resolved=false&inNewCodePeriod=true)
   
   Measures: [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=apache_hive&pullRequest=5076&resolved=false&inNewCodePeriod=true)
   No data about Coverage
   No data about Duplication
   
   [See analysis details on SonarCloud](https://sonarcloud.io/dashboard?id=apache_hive&pullRequest=5076)
   
   





Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-02-11 Thread via GitHub


SourabhBadhya commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1485790998


##
iceberg/iceberg-handler/src/test/queries/positive/iceberg_merge_files.q:
##
@@ -0,0 +1,97 @@
+--! qt:dataset:src
+set hive.mapred.mode=nonstrict;
+set hive.explain.user=false;
+set hive.merge.mapredfiles=true;

Review Comment:
   Yeah, but this is still being used in the tests.






Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-02-12 Thread via GitHub


sonarcloud[bot] commented on PR #5076:
URL: https://github.com/apache/hive/pull/5076#issuecomment-1938623423

   ## [Quality Gate passed](https://sonarcloud.io/dashboard?id=apache_hive&pullRequest=5076)
   
   Issues: [5 New issues](https://sonarcloud.io/project/issues?id=apache_hive&pullRequest=5076&resolved=false&inNewCodePeriod=true)
   
   Measures: [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=apache_hive&pullRequest=5076&resolved=false&inNewCodePeriod=true)
   No data about Coverage
   No data about Duplication
   
   [See analysis details on SonarCloud](https://sonarcloud.io/dashboard?id=apache_hive&pullRequest=5076)
   
   





Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-02-14 Thread via GitHub


sonarcloud[bot] commented on PR #5076:
URL: https://github.com/apache/hive/pull/5076#issuecomment-1943314721

   ## [Quality Gate passed](https://sonarcloud.io/dashboard?id=apache_hive&pullRequest=5076)
   
   Issues: [5 New issues](https://sonarcloud.io/project/issues?id=apache_hive&pullRequest=5076&resolved=false&inNewCodePeriod=true)
   
   Measures: [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=apache_hive&pullRequest=5076&resolved=false&inNewCodePeriod=true)
   No data about Coverage
   No data about Duplication
   
   [See analysis details on SonarCloud](https://sonarcloud.io/dashboard?id=apache_hive&pullRequest=5076)
   
   





Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-02-19 Thread via GitHub


sonarcloud[bot] commented on PR #5076:
URL: https://github.com/apache/hive/pull/5076#issuecomment-1953585615

   ## [Quality Gate passed](https://sonarcloud.io/dashboard?id=apache_hive&pullRequest=5076)
   
   Issues: [5 New issues](https://sonarcloud.io/project/issues?id=apache_hive&pullRequest=5076&resolved=false&inNewCodePeriod=true)
   
   Measures: [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=apache_hive&pullRequest=5076&resolved=false&inNewCodePeriod=true)
   No data about Coverage
   No data about Duplication
   
   [See analysis details on SonarCloud](https://sonarcloud.io/dashboard?id=apache_hive&pullRequest=5076)
   
   





Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-02-21 Thread via GitHub


sonarcloud[bot] commented on PR #5076:
URL: https://github.com/apache/hive/pull/5076#issuecomment-1956444102

   ## [Quality Gate passed](https://sonarcloud.io/dashboard?id=apache_hive&pullRequest=5076)
   
   Issues: [5 New issues](https://sonarcloud.io/project/issues?id=apache_hive&pullRequest=5076&resolved=false&inNewCodePeriod=true)
   
   Measures: [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=apache_hive&pullRequest=5076&resolved=false&inNewCodePeriod=true)
   No data about Coverage
   No data about Duplication
   
   [See analysis details on SonarCloud](https://sonarcloud.io/dashboard?id=apache_hive&pullRequest=5076)
   
   





Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-02-21 Thread via GitHub


sonarcloud[bot] commented on PR #5076:
URL: https://github.com/apache/hive/pull/5076#issuecomment-1957732267

   ## [Quality Gate passed](https://sonarcloud.io/dashboard?id=apache_hive&pullRequest=5076)
   
   Issues: [8 New issues](https://sonarcloud.io/project/issues?id=apache_hive&pullRequest=5076&resolved=false&inNewCodePeriod=true)
   
   Measures: [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=apache_hive&pullRequest=5076&resolved=false&inNewCodePeriod=true)
   No data about Coverage
   No data about Duplication
   
   [See analysis details on SonarCloud](https://sonarcloud.io/dashboard?id=apache_hive&pullRequest=5076)
   
   





Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-02-23 Thread via GitHub


kasakrisz commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1500474777


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##
@@ -163,6 +167,26 @@ public void commitTask(TaskAttemptContext originalContext) throws IOException {
   LOG.info("CommitTask found no serialized table in config for table: {}.", output);
 }
   }, IOException.class);
+
+  // Merge task has merged several files into one. Hence we need to remove the stale files.
+  // At this stage the file is written and task-committed, but the old files are still present.
+  if (CombineHiveInputFormat.class.equals(jobConf.getInputFormat().getClass())) {

Review Comment:
   how about
   ```
   jobConf.getInputFormat().getClass().isAssignableFrom(CombineHiveInputFormat.class)
   ```
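
   For reference, a small standalone illustration of how `isAssignableFrom` differs from `Class.equals` (plain JDK types, not the PR's classes):
   ```
   // isAssignableFrom(X) asks: "can a value of type X be assigned to this type?"
   // It is true when X is the same class as, or a subclass of, the receiver.
   boolean a = Number.class.isAssignableFrom(Integer.class); // true  (Integer IS-A Number)
   boolean b = Integer.class.isAssignableFrom(Number.class); // false (a Number need not be an Integer)
   boolean c = Number.class.equals(Integer.class);           // false (equals matches only the exact class)
   ```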



##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##
@@ -163,6 +167,26 @@ public void commitTask(TaskAttemptContext originalContext) throws IOException {
   LOG.info("CommitTask found no serialized table in config for table: {}.", output);
 }
   }, IOException.class);
+
+  // Merge task has merged several files into one. Hence we need to remove the stale files.
+  // At this stage the file is written and task-committed, but the old files are still present.
+  if (CombineHiveInputFormat.class.equals(jobConf.getInputFormat().getClass())) {
+    MapWork mrwork = Utilities.getMapWork(jobConf);
+    if (mrwork != null) {
+      List<Path> mergedPaths = mrwork.getInputPaths();
+      if (CollectionUtils.isNotEmpty(mergedPaths)) {
+        Tasks.foreach(mergedPaths)

Review Comment:
   Is the empty check necessary? What happens when an empty collection is passed to `Tasks.foreach`?
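
   For what it's worth, a minimal sketch of the behaviour in question, assuming `Tasks.foreach` simply iterates whatever it is given (the delete body is illustrative, not the PR's code):
   ```
   List<Path> mergedPaths = Collections.emptyList();

   // With no elements there is nothing to iterate, so run(...) does no work;
   // the isNotEmpty guard would only avoid building the Tasks chain itself.
   Tasks.foreach(mergedPaths)
       .retry(3)
       .run(path -> path.getFileSystem(jobConf).delete(path, true), IOException.class);
   ```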



##
ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java:
##
@@ -512,7 +555,30 @@ private void setupWorkWhenUsingManifestFile(MapWork mapWork, List fi
 mapWork.setUseInputPathsDirectly(true);
   }
 
-  private Map> getManifestDirs(FileSystem inpFs, List fileStatuses)
+  private void setupWorkWhenUsingCustomHandler(MapWork mapWork, Path dirPath,
+      StorageHandlerMergeProperties mergeProperties) throws ClassNotFoundException {
+    Map<String, Operator<? extends OperatorDesc>> aliasToWork = mapWork.getAliasToWork();
+    Map<Path, PartitionDesc> pathToPartitionInfo = mapWork.getPathToPartitionInfo();
+    Operator<? extends OperatorDesc> op = aliasToWork.get(dirPath.toString());
+    PartitionDesc partitionDesc = pathToPartitionInfo.get(dirPath);
+    Path tmpDir = mergeProperties.getTmpLocation();

Review Comment:
   Can `mergeProperties` be null?
   
   I saw earlier that sometimes we set it to null:
   ```
   StorageHandlerMergeProperties mergeProperties = useCustomStorageHandler ?
       storageHandler.getStorageHandlerMergeProperties(ctx.getCustomStorageHandlerProps()) : null;
   ```
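
   A minimal sketch of the kind of guard being asked about (the early return and its comment are an assumption, not the PR's code):
   ```
   // mergeProperties is null when no custom storage handler is involved (see the ternary above),
   // so check it before dereferencing.
   if (mergeProperties == null) {
     return;
   }
   Path tmpDir = mergeProperties.getTmpLocation();
   ```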



##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergStorageHandlerMergeProperties.java:
##
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.util.Properties;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat;
+import org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
+import org.apache.hadoop.hive.ql.plan.StorageHandlerMergeProperties;
+import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.mr.Catalogs;
+
+public class IcebergStorageHandlerMergeProperties implements StorageHandlerMergeProperties {
+
+  

Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-02-23 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1500798174


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java:
##
@@ -178,7 +178,7 @@ public RecordReader> getRecordReader(InputSplit split, J
 
   @Override
   public boolean shouldSkipCombine(Path path, Configuration conf) {
-    return true;
+    return false;

Review Comment:
   to disable the combine functionality we could use `hive.merge.tezfiles`=`false`, right?
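
   For reference, a one-line sketch of that toggle applied programmatically (the property name is the one mentioned above; where it is set is an assumption):
   ```
   // Disable the Tez small-file merge, and with it the combine path discussed here.
   jobConf.setBoolean("hive.merge.tezfiles", false);
   ```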






Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-02-23 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1500801678


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##
@@ -163,6 +167,26 @@ public void commitTask(TaskAttemptContext originalContext) throws IOException {
   LOG.info("CommitTask found no serialized table in config for table: {}.", output);
 }
   }, IOException.class);
+
+  // Merge task has merged several files into one. Hence we need to remove the stale files.

Review Comment:
   could we extract this part into some cleaner method?
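
   A minimal sketch of what such an extraction could look like (the method name and the delete body are assumptions, not the PR's code):
   ```
   private void cleanMergedInputFiles(JobConf jobConf) throws IOException {
     // Only relevant when the merge task read its input through CombineHiveInputFormat.
     if (!CombineHiveInputFormat.class.equals(jobConf.getInputFormat().getClass())) {
       return;
     }
     MapWork mapWork = Utilities.getMapWork(jobConf);
     if (mapWork == null) {
       return;
     }
     List<Path> mergedPaths = mapWork.getInputPaths();
     if (CollectionUtils.isNotEmpty(mergedPaths)) {
       Tasks.foreach(mergedPaths)
           .retry(3)
           .stopOnFailure()
           .throwFailureWhenFinished()
           .run(path -> path.getFileSystem(jobConf).delete(path, true), IOException.class);
     }
   }
   ```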






Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-02-23 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1500803369


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##
@@ -163,6 +167,26 @@ public void commitTask(TaskAttemptContext originalContext) throws IOException {
   LOG.info("CommitTask found no serialized table in config for table: {}.", output);
 }
   }, IOException.class);
+
+  // Merge task has merged several files into one. Hence we need to remove the stale files.
+  // At this stage the file is written and task-committed, but the old files are still present.
+  if (CombineHiveInputFormat.class.equals(jobConf.getInputFormat().getClass())) {
+    MapWork mrwork = Utilities.getMapWork(jobConf);
+    if (mrwork != null) {
+      List<Path> mergedPaths = mrwork.getInputPaths();
+      if (CollectionUtils.isNotEmpty(mergedPaths)) {
+        Tasks.foreach(mergedPaths)
+            .retry(3)
+            .stopOnFailure()
+            .throwFailureWhenFinished()

Review Comment:
   should we fail if cleanup throws an exception?
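
   For reference, a minimal sketch of the non-failing alternative, reusing the Tasks calls that already appear elsewhere in this PR (the log message and delete body are illustrative):
   ```
   Tasks.foreach(mergedPaths)
       .retry(3)
       .suppressFailureWhenFinished()   // keep going and do not fail the task on cleanup errors
       .onFailure((path, exc) -> LOG.warn("Failed to remove merged input file {}", path, exc))
       .run(path -> path.getFileSystem(jobConf).delete(path, true), IOException.class);
   ```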






Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-02-23 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1500813178


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##
@@ -720,4 +744,42 @@ private static FilesForCommit readFileForCommit(String fileForCommitLocation, Fi
   throw new NotFoundException("Can not read or parse committed file: %s", fileForCommitLocation);
 }
   }
+
+  public List<DataFile> getWrittenFiles(List<JobContext> jobContexts) throws IOException {
+    List<JobContext> jobContextList = jobContexts.stream()
+        .map(TezUtil::enrichContextWithVertexId)
+        .collect(Collectors.toList());
+    List outputs = collectOutputs(jobContextList);
+    ExecutorService fileExecutor = fileExecutor(jobContextList.get(0).getJobConf());
+    ExecutorService tableExecutor = tableExecutor(jobContextList.get(0).getJobConf(), outputs.size());
+    Collection<DataFile> dataFiles = new ConcurrentLinkedQueue<>();
+    try {
+      Tasks.foreach(outputs.stream().flatMap(kv -> kv.jobContexts.stream()
+              .map(jobContext -> new SimpleImmutableEntry<>(kv.table, jobContext))))
+          .suppressFailureWhenFinished()
+          .executeWith(tableExecutor)
+          .onFailure((output, exc) -> LOG.warn("Failed to retrieve merge input file for the table {}", output, exc))
+          .run(output -> {
+            JobContext jobContext = output.getValue();
+            JobConf jobConf = jobContext.getJobConf();
+            LOG.info("Cleaning job for jobID: {}, table: {}", jobContext.getJobID(), output);
+
+            Table table = output.getKey();
+            String jobLocation = generateJobLocation(table.location(), jobConf, jobContext.getJobID());
+            // list jobLocation to get number of forCommit files
+            // we do this because map/reduce num in jobConf is unreliable
+            // and we have no access to vertex status info
+            int numTasks = listForCommits(jobConf, jobLocation).size();
+            FilesForCommit results = collectResults(numTasks, fileExecutor, table.location(), jobContext,
+                table.io(), false);
+            dataFiles.addAll(results.dataFiles());
+          }, IOException.class);
+    } finally {
+      fileExecutor.shutdown();
+      if (tableExecutor != null) {

Review Comment:
   can it be null? If not, why don't we have the same check for the fileExecutor?






Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-02-23 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1500818017


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##
@@ -720,4 +744,42 @@ private static FilesForCommit readFileForCommit(String fileForCommitLocation, Fi
   throw new NotFoundException("Can not read or parse committed file: %s", fileForCommitLocation);
 }
   }
+
+  public List<DataFile> getWrittenFiles(List<JobContext> jobContexts) throws IOException {
+    List<JobContext> jobContextList = jobContexts.stream()
+        .map(TezUtil::enrichContextWithVertexId)
+        .collect(Collectors.toList());
+    List outputs = collectOutputs(jobContextList);
+    ExecutorService fileExecutor = fileExecutor(jobContextList.get(0).getJobConf());
+    ExecutorService tableExecutor = tableExecutor(jobContextList.get(0).getJobConf(), outputs.size());
+    Collection<DataFile> dataFiles = new ConcurrentLinkedQueue<>();
+    try {
+      Tasks.foreach(outputs.stream().flatMap(kv -> kv.jobContexts.stream()
+              .map(jobContext -> new SimpleImmutableEntry<>(kv.table, jobContext))))
+          .suppressFailureWhenFinished()
+          .executeWith(tableExecutor)
+          .onFailure((output, exc) -> LOG.warn("Failed to retrieve merge input file for the table {}", output, exc))
+          .run(output -> {
+            JobContext jobContext = output.getValue();
+            JobConf jobConf = jobContext.getJobConf();
+            LOG.info("Cleaning job for jobID: {}, table: {}", jobContext.getJobID(), output);
+
+            Table table = output.getKey();
+            String jobLocation = generateJobLocation(table.location(), jobConf, jobContext.getJobID());
+            // list jobLocation to get number of forCommit files
+            // we do this because map/reduce num in jobConf is unreliable
+            // and we have no access to vertex status info
+            int numTasks = listForCommits(jobConf, jobLocation).size();
+            FilesForCommit results = collectResults(numTasks, fileExecutor, table.location(), jobContext,
+                table.io(), false);
+            dataFiles.addAll(results.dataFiles());
+          }, IOException.class);
+    } finally {
+      fileExecutor.shutdown();
+      if (tableExecutor != null) {
+        tableExecutor.shutdown();
+      }
+    }
+    return Lists.newArrayList(dataFiles);

Review Comment:
   why do we need an extra copy, to return List<>?






Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-02-23 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1500832533


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##
@@ -2021,4 +2023,34 @@ public List getPartitionsByExpr(org.apache.hadoop.hive.ql.metadata.Ta
   throw new SemanticException(String.format("Error while fetching the partitions due to: %s", e));
 }
   }
+
+  @Override
+  public boolean supportsMergeFiles() {
+    return true;
+  }
+
+  @Override
+  public List getMergeInputFiles(Properties properties) throws IOException {
+    String tableName = properties.getProperty(Catalogs.NAME);
+    String snapshotRef = properties.getProperty(Catalogs.SNAPSHOT_REF);
+    Configuration configuration = SessionState.getSessionConf();
+    List<JobContext> originalContextList = generateJobContext(configuration, tableName, snapshotRef);
+    List<JobContext> jobContextList = originalContextList.stream()
+        .map(TezUtil::enrichContextWithVertexId)
+        .collect(Collectors.toList());
+    if (originalContextList.isEmpty()) {
+      return Collections.emptyList();
+    }
+    List dataFiles = Lists.newArrayList();
+    for (DataFile dataFile : new HiveIcebergOutputCommitter().getWrittenFiles(jobContextList)) {

Review Comment:
   1. Why do you need a new instance of HiveIcebergOutputCommitter, can't you make the method static? Maybe we should move it to an Iceberg util class.
   2. Why do you need to enrichContextWithVertexId again inside of `getWrittenFiles`?
   3. Why don't we return `List dataFiles` right away, instead of adding one more foreach with a transformation?






Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-02-23 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1500848177


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergStorageHandlerMergeProperties.java:
##
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.util.Properties;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat;
+import org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
+import org.apache.hadoop.hive.ql.plan.StorageHandlerMergeProperties;
+import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.mr.Catalogs;
+
+public class IcebergStorageHandlerMergeProperties implements StorageHandlerMergeProperties {
+
+  private final Properties properties;
+
+  IcebergStorageHandlerMergeProperties(Properties properties) {
+    this.properties = properties;
+  }
+
+  public Path getTmpLocation() {
+    String location = properties.getProperty(Catalogs.LOCATION);
+    return new Path(location + "/data/");
+  }
+
+  public String getInputFileFormatClassName() {
+    FileFormat fileFormat = FileFormat.fromString(properties.getProperty(TableProperties.DEFAULT_FILE_FORMAT));
+    if (fileFormat == FileFormat.ORC) {

Review Comment:
   can we use
   
   StorageFormatDescriptor ss = storageHandler.getStorageFormatDescriptor(..);
   if (ss != null) {
     inputFormatClassName = ss.getInputFormat();
   }

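   For reference, a minimal sketch of that idea using Hive's existing StorageFormatFactory/StorageFormatDescriptor (the lookup key and the null handling are assumptions, not the PR's code):
   ```
   // Look up the descriptor for the table's default file format name ("ORC", "PARQUET", "AVRO");
   // the exact key casing expected by the factory may need adjusting.
   StorageFormatDescriptor descriptor = new StorageFormatFactory().get(fileFormat.name());
   if (descriptor != null) {
     String inputFormatClassName = descriptor.getInputFormat();
     String outputFormatClassName = descriptor.getOutputFormat();
     String serdeClassName = descriptor.getSerde();
   }
   ```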





Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-02-23 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1500851164


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergStorageHandlerMergeProperties.java:
##
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.util.Properties;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat;
+import org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
+import org.apache.hadoop.hive.ql.plan.StorageHandlerMergeProperties;
+import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.mr.Catalogs;
+
+public class IcebergStorageHandlerMergeProperties implements StorageHandlerMergeProperties {
+
+  private final Properties properties;
+
+  IcebergStorageHandlerMergeProperties(Properties properties) {
+    this.properties = properties;
+  }
+
+  public Path getTmpLocation() {
+    String location = properties.getProperty(Catalogs.LOCATION);
+    return new Path(location + "/data/");
+  }
+
+  public String getInputFileFormatClassName() {
+    FileFormat fileFormat = FileFormat.fromString(properties.getProperty(TableProperties.DEFAULT_FILE_FORMAT));
+    if (fileFormat == FileFormat.ORC) {
+      return OrcInputFormat.class.getName();
+    } else if (fileFormat == FileFormat.PARQUET) {
+      return MapredParquetInputFormat.class.getName();
+    } else if (fileFormat == FileFormat.AVRO) {
+      return AvroContainerInputFormat.class.getName();
+    }
+    return null;
+  }
+
+  public String getOutputFileFormatClassName() {
+    FileFormat fileFormat = FileFormat.fromString(properties.getProperty(TableProperties.DEFAULT_FILE_FORMAT));
+    if (fileFormat == FileFormat.ORC) {

Review Comment:
   this looks like a copy of the above `getInputFileFormatClassName`
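
   A minimal sketch of one way to fold the two getters into a single format mapping (illustrative only, not the PR's code; the class names are the ones already imported in this file):
   ```
   private String[] formatClasses() {
     FileFormat fileFormat = FileFormat.fromString(properties.getProperty(TableProperties.DEFAULT_FILE_FORMAT));
     switch (fileFormat) {
       case ORC:
         return new String[] {OrcInputFormat.class.getName(), OrcOutputFormat.class.getName(), OrcSerde.class.getName()};
       case PARQUET:
         return new String[] {MapredParquetInputFormat.class.getName(), MapredParquetOutputFormat.class.getName(), ParquetHiveSerDe.class.getName()};
       case AVRO:
         return new String[] {AvroContainerInputFormat.class.getName(), AvroContainerOutputFormat.class.getName(), AvroSerDe.class.getName()};
       default:
         return null;
     }
   }

   public String getInputFileFormatClassName() {
     String[] classes = formatClasses();
     return classes == null ? null : classes[0];
   }

   public String getOutputFileFormatClassName() {
     String[] classes = formatClasses();
     return classes == null ? null : classes[1];
   }
   ```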






Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-02-23 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1500852533


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergStorageHandlerMergeProperties.java:
##
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.util.Properties;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat;
+import org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
+import org.apache.hadoop.hive.ql.plan.StorageHandlerMergeProperties;
+import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.mr.Catalogs;
+
+public class IcebergStorageHandlerMergeProperties implements StorageHandlerMergeProperties {

Review Comment:
   it looks more like StorageFormatDescriptor 






Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-02-23 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1500854772


##
ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java:
##
@@ -370,7 +371,8 @@ private InputSplit[] getCombineSplits(JobConf job, int numSplits,
   PartitionDesc part = HiveFileFormatUtils.getFromPathRecursively(
       pathToPartitionInfo, path, IOPrepareCache.get().allocatePartitionDescMap());
   TableDesc tableDesc = part.getTableDesc();
-  if ((tableDesc != null) && tableDesc.isNonNative()) {
+  boolean useDefaultFileFormat = part.getInputFileFormatClass().equals(tableDesc.getInputFileFormatClass());
+  if ((tableDesc != null) && tableDesc.isNonNative() && useDefaultFileFormat) {

Review Comment:
   why do we need to wrap tableDesc?






Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-02-23 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1500857349


##
ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java:
##
@@ -691,6 +693,18 @@ Map> removeScheme(Map> pathToAliases) {
   public RecordReader getRecordReader(InputSplit split, JobConf job,
       Reporter reporter) throws IOException {
     if (!(split instanceof CombineHiveInputSplit)) {
+      if (split instanceof FileSplit) {
+        Map<Path, PartitionDesc> pathToPartitionInfo = Utilities.getMapWork(job).getPathToPartitionInfo();
+        Path path = ((FileSplit) split).getPath();
+        PartitionDesc partitionDesc = HiveFileFormatUtils.getFromPathRecursively(pathToPartitionInfo, path,
+            IOPrepareCache.get().getPartitionDescMap());
+        boolean useDefaultFileFormat = partitionDesc.getInputFileFormatClass()
+            .equals(partitionDesc.getTableDesc().getInputFileFormatClass());

Review Comment:
   isAssignableFrom?






Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-02-23 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1500862337


##
ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java:
##
@@ -691,6 +693,18 @@ Map> removeScheme(Map> pathToAliases) {
   public RecordReader getRecordReader(InputSplit split, JobConf job,
       Reporter reporter) throws IOException {
     if (!(split instanceof CombineHiveInputSplit)) {
+      if (split instanceof FileSplit) {

Review Comment:
   won't we break legacy code here? Previously, if `!(split instanceof CombineHiveInputSplit)`, we fell back to HiveInputFormat#getRecordReader.






Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-02-23 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1500868679


##
ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java:
##
@@ -747,4 +750,18 @@ default List getPartitionsByExpr(org.apache.hadoop.hive.ql.metadata.T
 throw new UnsupportedOperationException("Storage handler does not support getting partitions by expression " +
 "for a table.");
   }
+
+  default boolean supportsMergeFiles() {

Review Comment:
   supportsMergeSmallFiles






Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-02-23 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1500871964


##
ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java:
##
@@ -747,4 +750,18 @@ default List 
getPartitionsByExpr(org.apache.hadoop.hive.ql.metadata.T
 throw new UnsupportedOperationException("Storage handler does not support 
getting partitions by expression " +
 "for a table.");
   }
+
+  default boolean supportsMergeFiles() {
+return false;
+  }
+
+  default List getMergeInputFiles(Properties properties) throws 
IOException {

Review Comment:
   how about getOutputFiles?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-02-23 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1500872749


##
ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java:
##
@@ -747,4 +750,18 @@ default List 
getPartitionsByExpr(org.apache.hadoop.hive.ql.metadata.T
 throw new UnsupportedOperationException("Storage handler does not support 
getting partitions by expression " +
 "for a table.");
   }
+
+  default boolean supportsMergeFiles() {
+return false;
+  }
+
+  default List getMergeInputFiles(Properties properties) throws 
IOException {
+throw new UnsupportedOperationException("Storage handler does not support 
getting merge input files " +
+"for a table.");
+  }
+
+  default StorageHandlerMergeProperties 
getStorageHandlerMergeProperties(Properties properties) {

Review Comment:
   not sure about that class, could we get same information from 
StorageDescriptor?



##
ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java:
##
@@ -747,4 +750,18 @@ default List 
getPartitionsByExpr(org.apache.hadoop.hive.ql.metadata.T
 throw new UnsupportedOperationException("Storage handler does not support 
getting partitions by expression " +
 "for a table.");
   }
+
+  default boolean supportsMergeFiles() {
+return false;
+  }
+
+  default List getMergeInputFiles(Properties properties) throws 
IOException {
+throw new UnsupportedOperationException("Storage handler does not support 
getting merge input files " +
+"for a table.");
+  }
+
+  default StorageHandlerMergeProperties 
getStorageHandlerMergeProperties(Properties properties) {

Review Comment:
   not sure about this class, could we get same information from 
StorageDescriptor?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-02-23 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1500878277


##
ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java:
##
@@ -1720,6 +1723,41 @@ protected static MoveWork mergeMovePaths(Path 
condInputPath, MoveWork linkedMove
 return newWork;
   }
 
+  private static void 
setCustomStorageHandlerPropertiesForMerge(ConditionalResolverMergeFilesCtx 
mrCtx, MoveWork work) {

Review Comment:
   that method sets both the storageHandler and the properties. By the way,
what does "custom StorageHandler" mean here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-02-23 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1500880192


##
ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java:
##
@@ -125,6 +133,22 @@ public ListBucketingCtx getLbCtx() {
 public void setLbCtx(ListBucketingCtx lbCtx) {
   this.lbCtx = lbCtx;
 }
+
+public void setCustomStorageHandlerProps(Properties properties) {

Review Comment:
   do we need custom in a method name?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-02-23 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1500884580


##
ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java:
##
@@ -147,18 +171,23 @@ public List> getTasks(HiveConf conf, Object 
objCtx) {
   Path dirPath = new Path(dirName);
   FileSystem inpFs = dirPath.getFileSystem(conf);
   DynamicPartitionCtx dpCtx = ctx.getDPCtx();
+  HiveStorageHandler storageHandler = HiveUtils.getStorageHandler(conf, 
ctx.getStorageHandlerClass());
+  boolean dirExists = inpFs.exists(dirPath);
+  boolean useCustomStorageHandler = storageHandler != null && 
storageHandler.supportsMergeFiles();
 
-  if (inpFs.exists(dirPath)) {
+  MapWork work = null;
+  if (dirExists || useCustomStorageHandler) {

Review Comment:
   not sure if we actually need any checks here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-02-23 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1500889203


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##
@@ -2021,4 +2023,34 @@ public List 
getPartitionsByExpr(org.apache.hadoop.hive.ql.metadata.Ta
   throw new SemanticException(String.format("Error while fetching the 
partitions due to: %s", e));
 }
   }
+
+  @Override
+  public boolean supportsMergeFiles() {
+return true;
+  }
+
+  @Override
+  public List getMergeInputFiles(Properties properties) throws 
IOException {
+String tableName = properties.getProperty(Catalogs.NAME);
+String snapshotRef = properties.getProperty(Catalogs.SNAPSHOT_REF);
+Configuration configuration = SessionState.getSessionConf();
+List originalContextList = generateJobContext(configuration, 
tableName, snapshotRef);
+List jobContextList = originalContextList.stream()
+.map(TezUtil::enrichContextWithVertexId)
+.collect(Collectors.toList());
+if (originalContextList.isEmpty()) {
+  return Collections.emptyList();
+}
+List dataFiles = Lists.newArrayList();
+for (DataFile dataFile : new 
HiveIcebergOutputCommitter().getWrittenFiles(jobContextList)) {
+  FileSystem fs = new Path(dataFile.path().toString()).getFileSystem(conf);
+  dataFiles.add(fs.getFileStatus(new Path(dataFile.path().toString(;
+}
+return dataFiles;
+  }
+
+  @Override
+  public StorageHandlerMergeProperties 
getStorageHandlerMergeProperties(Properties properties) {

Review Comment:
   why not simply `getMergeProperties`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-02-23 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1500872749


##
ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java:
##
@@ -747,4 +750,18 @@ default List 
getPartitionsByExpr(org.apache.hadoop.hive.ql.metadata.T
 throw new UnsupportedOperationException("Storage handler does not support 
getting partitions by expression " +
 "for a table.");
   }
+
+  default boolean supportsMergeFiles() {
+return false;
+  }
+
+  default List getMergeInputFiles(Properties properties) throws 
IOException {
+throw new UnsupportedOperationException("Storage handler does not support 
getting merge input files " +
+"for a table.");
+  }
+
+  default StorageHandlerMergeProperties 
getStorageHandlerMergeProperties(Properties properties) {

Review Comment:
   not sure about this class, could we get same information from 
StorageDescriptor? if not could we rename to `getMergeProperties`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-02-23 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1500895017


##
ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java:
##
@@ -254,18 +286,26 @@ private void generateActualTasks(HiveConf conf, 
List> resTsks,
   long trgtSize, long avgConditionSize, Task mvTask,
   Task mrTask, Task mrAndMvTask, Path dirPath,
   FileSystem inpFs, ConditionalResolverMergeFilesCtx ctx, MapWork work, 
int dpLbLevel,
-  boolean manifestFilePresent)
-  throws IOException {
+  boolean manifestFilePresent, HiveStorageHandler storageHandler)
+  throws IOException, ClassNotFoundException {
 DynamicPartitionCtx dpCtx = ctx.getDPCtx();
 List statusList;
-Map> manifestDirToFile = new HashMap<>();
+Map> parentDirToFile = new HashMap<>();
+boolean useCustomStorageHandler = storageHandler != null && 
storageHandler.supportsMergeFiles();
+StorageHandlerMergeProperties mergeProperties = useCustomStorageHandler ?
+
storageHandler.getStorageHandlerMergeProperties(ctx.getCustomStorageHandlerProps())
 : null;
 if (manifestFilePresent) {
   // Get the list of files from manifest file.
   List fileStatuses = getManifestFilePaths(conf, dirPath);
   // Setup the work to include all the files present in the manifest.
   setupWorkWhenUsingManifestFile(work, fileStatuses, dirPath, false);
-  manifestDirToFile = getManifestDirs(inpFs, fileStatuses);
-  statusList = new ArrayList<>(manifestDirToFile.keySet());
+  parentDirToFile = getParentDirToFileMap(inpFs, fileStatuses);
+  statusList = new ArrayList<>(parentDirToFile.keySet());
+} else if (useCustomStorageHandler) {
+  List fileStatuses = 
storageHandler.getMergeInputFiles(ctx.getCustomStorageHandlerProps());
+  setupWorkWhenUsingCustomHandler(work, dirPath, mergeProperties);
+  parentDirToFile = getParentDirToFileMap(inpFs, fileStatuses);
+  statusList = new ArrayList<>(parentDirToFile.keySet());

Review Comment:
   Lists.newArrayList(



##
ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java:
##
@@ -254,18 +286,26 @@ private void generateActualTasks(HiveConf conf, 
List> resTsks,
   long trgtSize, long avgConditionSize, Task mvTask,
   Task mrTask, Task mrAndMvTask, Path dirPath,
   FileSystem inpFs, ConditionalResolverMergeFilesCtx ctx, MapWork work, 
int dpLbLevel,
-  boolean manifestFilePresent)
-  throws IOException {
+  boolean manifestFilePresent, HiveStorageHandler storageHandler)
+  throws IOException, ClassNotFoundException {
 DynamicPartitionCtx dpCtx = ctx.getDPCtx();
 List statusList;
-Map> manifestDirToFile = new HashMap<>();
+Map> parentDirToFile = new HashMap<>();
+boolean useCustomStorageHandler = storageHandler != null && 
storageHandler.supportsMergeFiles();
+StorageHandlerMergeProperties mergeProperties = useCustomStorageHandler ?
+
storageHandler.getStorageHandlerMergeProperties(ctx.getCustomStorageHandlerProps())
 : null;
 if (manifestFilePresent) {
   // Get the list of files from manifest file.
   List fileStatuses = getManifestFilePaths(conf, dirPath);
   // Setup the work to include all the files present in the manifest.
   setupWorkWhenUsingManifestFile(work, fileStatuses, dirPath, false);
-  manifestDirToFile = getManifestDirs(inpFs, fileStatuses);
-  statusList = new ArrayList<>(manifestDirToFile.keySet());
+  parentDirToFile = getParentDirToFileMap(inpFs, fileStatuses);
+  statusList = new ArrayList<>(parentDirToFile.keySet());

Review Comment:
   Lists.newArrayList(



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-02-23 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1500818017


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##
@@ -720,4 +744,42 @@ private static FilesForCommit readFileForCommit(String 
fileForCommitLocation, Fi
   throw new NotFoundException("Can not read or parse committed file: %s", 
fileForCommitLocation);
 }
   }
+
+  public List getWrittenFiles(List jobContexts) throws 
IOException {
+List jobContextList = jobContexts.stream()
+.map(TezUtil::enrichContextWithVertexId)
+.collect(Collectors.toList());
+List outputs = collectOutputs(jobContextList);
+ExecutorService fileExecutor = 
fileExecutor(jobContextList.get(0).getJobConf());
+ExecutorService tableExecutor = 
tableExecutor(jobContextList.get(0).getJobConf(), outputs.size());
+Collection dataFiles = new ConcurrentLinkedQueue<>();
+try {
+  Tasks.foreach(outputs.stream().flatMap(kv -> kv.jobContexts.stream()
+  .map(jobContext -> new SimpleImmutableEntry<>(kv.table, 
jobContext
+  .suppressFailureWhenFinished()
+  .executeWith(tableExecutor)
+  .onFailure((output, exc) -> LOG.warn("Failed to retrieve merge 
input file for the table {}", output, exc))
+  .run(output -> {
+JobContext jobContext = output.getValue();
+JobConf jobConf = jobContext.getJobConf();
+LOG.info("Cleaning job for jobID: {}, table: {}", 
jobContext.getJobID(), output);
+
+Table table = output.getKey();
+String jobLocation = generateJobLocation(table.location(), 
jobConf, jobContext.getJobID());
+// list jobLocation to get number of forCommit files
+// we do this because map/reduce num in jobConf is unreliable
+// and we have no access to vertex status info
+int numTasks = listForCommits(jobConf, jobLocation).size();
+FilesForCommit results = collectResults(numTasks, 
fileExecutor, table.location(), jobContext,
+table.io(), false);
+dataFiles.addAll(results.dataFiles());
+  }, IOException.class);
+} finally {
+  fileExecutor.shutdown();
+  if (tableExecutor != null) {
+tableExecutor.shutdown();
+  }
+}
+return Lists.newArrayList(dataFiles);

Review Comment:
   why do we need an extra copy, to return List<>?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-02-23 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1500852533


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergStorageHandlerMergeProperties.java:
##
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.util.Properties;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat;
+import org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
+import org.apache.hadoop.hive.ql.plan.StorageHandlerMergeProperties;
+import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.mr.Catalogs;
+
+public class IcebergStorageHandlerMergeProperties implements 
StorageHandlerMergeProperties {

Review Comment:
   it looks more like StorageFormatDescriptor 
   
   public interface StorageFormatDescriptor {
     Set<String> getNames();
     String getInputFormat();
     String getOutputFormat();
     String getSerde();
   }
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-02-28 Thread via GitHub


SourabhBadhya commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1505586021


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##
@@ -163,6 +167,26 @@ public void commitTask(TaskAttemptContext originalContext) 
throws IOException {
   LOG.info("CommitTask found no serialized table in config for 
table: {}.", output);
 }
   }, IOException.class);
+
+  // Merge task has merged several files into one. Hence we need to remove 
the stale files.
+  // At this stage the file is written and task-committed, but the old 
files are still present.
+  if 
(CombineHiveInputFormat.class.equals(jobConf.getInputFormat().getClass())) {

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-02-28 Thread via GitHub


SourabhBadhya commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1505587079


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##
@@ -163,6 +167,26 @@ public void commitTask(TaskAttemptContext originalContext) 
throws IOException {
   LOG.info("CommitTask found no serialized table in config for 
table: {}.", output);
 }
   }, IOException.class);
+
+  // Merge task has merged several files into one. Hence we need to remove 
the stale files.
+  // At this stage the file is written and task-committed, but the old 
files are still present.
+  if 
(CombineHiveInputFormat.class.equals(jobConf.getInputFormat().getClass())) {
+MapWork mrwork = Utilities.getMapWork(jobConf);
+if (mrwork != null) {
+  List mergedPaths = mrwork.getInputPaths();
+  if (CollectionUtils.isNotEmpty(mergedPaths)) {
+Tasks.foreach(mergedPaths)

Review Comment:
   Removed the empty-collection check and replaced it with the necessary null
check. Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-02-28 Thread via GitHub


SourabhBadhya commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1505587606


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergStorageHandlerMergeProperties.java:
##
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.util.Properties;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat;
+import org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
+import org.apache.hadoop.hive.ql.plan.StorageHandlerMergeProperties;
+import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.mr.Catalogs;
+
+public class IcebergStorageHandlerMergeProperties implements 
StorageHandlerMergeProperties {
+
+  private final Properties properties;
+
+  IcebergStorageHandlerMergeProperties(Properties properties) {
+this.properties = properties;
+  }
+
+  public Path getTmpLocation() {
+String location = properties.getProperty(Catalogs.LOCATION);
+return new Path(location + "/data/");
+  }
+
+  public String getInputFileFormatClassName() {
+FileFormat fileFormat = 
FileFormat.fromString(properties.getProperty(TableProperties.DEFAULT_FILE_FORMAT));
+if (fileFormat == FileFormat.ORC) {
+  return OrcInputFormat.class.getName();
+} else if (fileFormat == FileFormat.PARQUET) {
+  return MapredParquetInputFormat.class.getName();
+} else if (fileFormat == FileFormat.AVRO) {
+  return AvroContainerInputFormat.class.getName();
+}
+return null;
+  }
+
+  public String getOutputFileFormatClassName() {
+FileFormat fileFormat = 
FileFormat.fromString(properties.getProperty(TableProperties.DEFAULT_FILE_FORMAT));
+if (fileFormat == FileFormat.ORC) {
+  return OrcOutputFormat.class.getName();
+} else if (fileFormat == FileFormat.PARQUET) {
+  return MapredParquetOutputFormat.class.getName();
+} else if (fileFormat == FileFormat.AVRO) {
+  return AvroContainerOutputFormat.class.getName();
+}
+return null;
+  }
+
+  public String getFileSerdeClassName() {
+FileFormat fileFormat = 
FileFormat.fromString(properties.getProperty(TableProperties.DEFAULT_FILE_FORMAT));
+if (fileFormat == FileFormat.ORC) {
+  return OrcSerde.class.getName();
+} else if (fileFormat == FileFormat.PARQUET) {
+  return ParquetHiveSerDe.class.getName();
+} else if (fileFormat == FileFormat.AVRO) {
+  return AvroSerDe.class.getName();
+}
+return null;
+  }

Review Comment:
   Replaced with existing StorageFormatDescriptor. Done.



##
ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java:
##
@@ -527,4 +593,20 @@ private Map> 
getManifestDirs(FileSystem inpFs, List
 }
 return manifestDirsToPaths;
   }
+
+  private void setMergePropertiesToPartDesc(PartitionDesc partitionDesc,
+StorageHandlerMergeProperties 
mergeProperties) throws ClassNotFoundException{
+String inputFileFormatClassName = 
mergeProperties.getInputFileFormatClassName();
+String outputFileFormatClassName = 
mergeProperties.getOutputFileFormatClassName();
+String serdeClassName = mergeProperties.getFileSerdeClassName();

Review Comment:
   Added a null check.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org

Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-02-28 Thread via GitHub


SourabhBadhya commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1505590111


##
ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java:
##
@@ -512,7 +555,30 @@ private void setupWorkWhenUsingManifestFile(MapWork 
mapWork, List fi
 mapWork.setUseInputPathsDirectly(true);
   }
 
-  private Map> getManifestDirs(FileSystem inpFs, 
List fileStatuses)
+  private void setupWorkWhenUsingCustomHandler(MapWork mapWork, Path dirPath,
+   StorageHandlerMergeProperties 
mergeProperties) throws ClassNotFoundException {
+Map> aliasToWork = 
mapWork.getAliasToWork();
+Map pathToPartitionInfo = 
mapWork.getPathToPartitionInfo();
+Operator op = aliasToWork.get(dirPath.toString());
+PartitionDesc partitionDesc = pathToPartitionInfo.get(dirPath);
+Path tmpDir = mergeProperties.getTmpLocation();

Review Comment:
   `mergeProperties` is present when it is used with a storage handler that
supports it. The boolean `useCustomStorageHandler`, if true, ensures that
`mergeProperties` is not null.
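
   In other words, the guard looks roughly like this (simplified sketch; names
as in the patch):
   
       boolean useCustomStorageHandler =
           storageHandler != null && storageHandler.supportsMergeFiles();
       StorageHandlerMergeProperties mergeProperties = useCustomStorageHandler
           ? storageHandler.getStorageHandlerMergeProperties(
               ctx.getCustomStorageHandlerProps())
           : null;
       if (useCustomStorageHandler) {
         // only reached when mergeProperties was initialised above, so no NPE here
         setupWorkWhenUsingCustomHandler(work, dirPath, mergeProperties);
       }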



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-02-28 Thread via GitHub


SourabhBadhya commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1505593646


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java:
##
@@ -178,7 +178,7 @@ public RecordReader> 
getRecordReader(InputSplit split, J
 
   @Override
   public boolean shouldSkipCombine(Path path, Configuration conf) {
-return true;
+return false;

Review Comment:
   This function is used during generation of splits in CombineRecordReader 
(during generation of merge tasks). It is a flag function which tells whether 
merging is supported.
   `hive.merge.tezfiles=false` will still work to disable merge functionality.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-02-28 Thread via GitHub


SourabhBadhya commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1505593646


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java:
##
@@ -178,7 +178,7 @@ public RecordReader> 
getRecordReader(InputSplit split, J
 
   @Override
   public boolean shouldSkipCombine(Path path, Configuration conf) {
-return true;
+return false;

Review Comment:
   This function is used during generation of splits in CombineRecordReader 
(during execution of merge tasks). It is a flag function which tells whether 
merging is supported by the file format.
   `hive.merge.tezfiles=false` will still work to disable merge functionality.
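
   So the override itself stays trivial; a sketch with the behaviour spelled
out in comments:
   
       @Override
       public boolean shouldSkipCombine(Path path, Configuration conf) {
         // Returning false lets CombineHiveInputFormat build combined splits over
         // the freshly written data files, which is what the small-file merge
         // task relies on. Whether a merge task is generated at all is still
         // controlled by hive.merge.tezfiles, so "set hive.merge.tezfiles=false;"
         // disables the feature.
         return false;
       }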



##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##
@@ -163,6 +167,26 @@ public void commitTask(TaskAttemptContext originalContext) 
throws IOException {
   LOG.info("CommitTask found no serialized table in config for 
table: {}.", output);
 }
   }, IOException.class);
+
+  // Merge task has merged several files into one. Hence we need to remove 
the stale files.

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-02-28 Thread via GitHub


SourabhBadhya commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1505595228


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##
@@ -720,4 +744,42 @@ private static FilesForCommit readFileForCommit(String 
fileForCommitLocation, Fi
   throw new NotFoundException("Can not read or parse committed file: %s", 
fileForCommitLocation);
 }
   }
+
+  public List getWrittenFiles(List jobContexts) throws 
IOException {
+List jobContextList = jobContexts.stream()
+.map(TezUtil::enrichContextWithVertexId)
+.collect(Collectors.toList());
+List outputs = collectOutputs(jobContextList);
+ExecutorService fileExecutor = 
fileExecutor(jobContextList.get(0).getJobConf());
+ExecutorService tableExecutor = 
tableExecutor(jobContextList.get(0).getJobConf(), outputs.size());
+Collection dataFiles = new ConcurrentLinkedQueue<>();
+try {
+  Tasks.foreach(outputs.stream().flatMap(kv -> kv.jobContexts.stream()
+  .map(jobContext -> new SimpleImmutableEntry<>(kv.table, 
jobContext
+  .suppressFailureWhenFinished()
+  .executeWith(tableExecutor)
+  .onFailure((output, exc) -> LOG.warn("Failed to retrieve merge 
input file for the table {}", output, exc))
+  .run(output -> {
+JobContext jobContext = output.getValue();
+JobConf jobConf = jobContext.getJobConf();
+LOG.info("Cleaning job for jobID: {}, table: {}", 
jobContext.getJobID(), output);
+
+Table table = output.getKey();
+String jobLocation = generateJobLocation(table.location(), 
jobConf, jobContext.getJobID());
+// list jobLocation to get number of forCommit files
+// we do this because map/reduce num in jobConf is unreliable
+// and we have no access to vertex status info
+int numTasks = listForCommits(jobConf, jobLocation).size();
+FilesForCommit results = collectResults(numTasks, 
fileExecutor, table.location(), jobContext,
+table.io(), false);
+dataFiles.addAll(results.dataFiles());
+  }, IOException.class);
+} finally {
+  fileExecutor.shutdown();
+  if (tableExecutor != null) {

Review Comment:
   `fileExecutor` is always created, whereas `tableExecutor` can be null.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-02-28 Thread via GitHub


SourabhBadhya commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1505599549


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##
@@ -2021,4 +2023,34 @@ public List 
getPartitionsByExpr(org.apache.hadoop.hive.ql.metadata.Ta
   throw new SemanticException(String.format("Error while fetching the 
partitions due to: %s", e));
 }
   }
+
+  @Override
+  public boolean supportsMergeFiles() {
+return true;
+  }
+
+  @Override
+  public List getMergeInputFiles(Properties properties) throws 
IOException {
+String tableName = properties.getProperty(Catalogs.NAME);
+String snapshotRef = properties.getProperty(Catalogs.SNAPSHOT_REF);
+Configuration configuration = SessionState.getSessionConf();
+List originalContextList = generateJobContext(configuration, 
tableName, snapshotRef);
+List jobContextList = originalContextList.stream()
+.map(TezUtil::enrichContextWithVertexId)
+.collect(Collectors.toList());
+if (originalContextList.isEmpty()) {
+  return Collections.emptyList();
+}
+List dataFiles = Lists.newArrayList();
+for (DataFile dataFile : new 
HiveIcebergOutputCommitter().getWrittenFiles(jobContextList)) {

Review Comment:
   > why do you need new instance of HiveIcebergOutputCommitter, can't you make 
the method static? Maybe we should move it to Iceberg util class
   
   Kept it in the same place so that we could make use of existing helper 
functions already present in HiveIcebergOutputCommitter.
   
   > Why do you need to enrichContextWithVertexId again inside of 
getWrittenFiles?
   
   Removed; it is now done only once.
   
   >  Why don't we return List dataFiles right away, but add 1 more 
foreach with transformation?
   
   Added functionality to directly convert it into `FileStatus` objects.
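
   i.e. one possible shape of the reworked handler method (sketch, simplified):
   
       // vertex-id enrichment now happens once, here, and not again inside
       // getWrittenFiles()
       List<JobContext> jobContextList = originalContextList.stream()
           .map(TezUtil::enrichContextWithVertexId)
           .collect(Collectors.toList());
       List<FileStatus> dataFiles = Lists.newArrayList();
       List<DataFile> writtenFiles =
           new HiveIcebergOutputCommitter().getWrittenFiles(jobContextList);
       for (DataFile dataFile : writtenFiles) {
         Path path = new Path(dataFile.path().toString());
         dataFiles.add(path.getFileSystem(conf).getFileStatus(path));
       }
       return dataFiles;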



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-02-28 Thread via GitHub


SourabhBadhya commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1505601391


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergStorageHandlerMergeProperties.java:
##
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.util.Properties;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat;
+import org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
+import org.apache.hadoop.hive.ql.plan.StorageHandlerMergeProperties;
+import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.mr.Catalogs;
+
+public class IcebergStorageHandlerMergeProperties implements 
StorageHandlerMergeProperties {
+
+  private final Properties properties;
+
+  IcebergStorageHandlerMergeProperties(Properties properties) {
+this.properties = properties;
+  }
+
+  public Path getTmpLocation() {
+String location = properties.getProperty(Catalogs.LOCATION);
+return new Path(location + "/data/");
+  }
+
+  public String getInputFileFormatClassName() {
+FileFormat fileFormat = 
FileFormat.fromString(properties.getProperty(TableProperties.DEFAULT_FILE_FORMAT));
+if (fileFormat == FileFormat.ORC) {

Review Comment:
   Using `StorageFormatDescriptor` in the `StorageHandlerMergeProperties` 
itself. Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-02-28 Thread via GitHub


SourabhBadhya commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1505602116


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergStorageHandlerMergeProperties.java:
##
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.util.Properties;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat;
+import org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
+import org.apache.hadoop.hive.ql.plan.StorageHandlerMergeProperties;
+import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.mr.Catalogs;
+
+public class IcebergStorageHandlerMergeProperties implements 
StorageHandlerMergeProperties {

Review Comment:
   Thanks for the info. Added code to use it now.



##
ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java:
##
@@ -370,7 +371,8 @@ private InputSplit[] getCombineSplits(JobConf job, int 
numSplits,
   PartitionDesc part = HiveFileFormatUtils.getFromPathRecursively(
   pathToPartitionInfo, path, 
IOPrepareCache.get().allocatePartitionDescMap());
   TableDesc tableDesc = part.getTableDesc();
-  if ((tableDesc != null) && tableDesc.isNonNative()) {
+  boolean useDefaultFileFormat = 
part.getInputFileFormatClass().equals(tableDesc.getInputFileFormatClass());
+  if ((tableDesc != null) && tableDesc.isNonNative() && 
useDefaultFileFormat) {

Review Comment:
   Removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-02-28 Thread via GitHub


SourabhBadhya commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1505602500


##
ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java:
##
@@ -691,6 +693,18 @@ Map> removeScheme(Map> pathToAliases) {
   public RecordReader getRecordReader(InputSplit split, JobConf job,
   Reporter reporter) throws IOException {
 if (!(split instanceof CombineHiveInputSplit)) {
+  if (split instanceof FileSplit) {
+Map pathToPartitionInfo = 
Utilities.getMapWork(job).getPathToPartitionInfo();
+Path path = ((FileSplit) split).getPath();
+PartitionDesc partitionDesc = 
HiveFileFormatUtils.getFromPathRecursively(pathToPartitionInfo, path,
+IOPrepareCache.get().getPartitionDescMap());
+boolean useDefaultFileFormat = partitionDesc.getInputFileFormatClass()
+
.equals(partitionDesc.getTableDesc().getInputFileFormatClass());

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-02-28 Thread via GitHub


SourabhBadhya commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1505603811


##
ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java:
##
@@ -691,6 +693,18 @@ Map> removeScheme(Map> pathToAliases) {
   public RecordReader getRecordReader(InputSplit split, JobConf job,
   Reporter reporter) throws IOException {
 if (!(split instanceof CombineHiveInputSplit)) {
+  if (split instanceof FileSplit) {

Review Comment:
   If the conditions are satisfied, we use the partition's `inputFormat` to get
the record reader. Otherwise we fall back as previously done.
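
   Roughly (sketch of the intended flow; the input-format lookup shown here is
illustrative and may differ from the actual patch):
   
       if (!(split instanceof CombineHiveInputSplit)) {
         if (split instanceof FileSplit) {
           Path path = ((FileSplit) split).getPath();
           PartitionDesc partitionDesc = HiveFileFormatUtils.getFromPathRecursively(
               Utilities.getMapWork(job).getPathToPartitionInfo(), path,
               IOPrepareCache.get().getPartitionDescMap());
           boolean useDefaultFileFormat = partitionDesc.getInputFileFormatClass()
               .equals(partitionDesc.getTableDesc().getInputFileFormatClass());
           if (!useDefaultFileFormat) {
             // merge task over Iceberg data files: hand the plain FileSplit to the
             // partition-level input format (ORC/Parquet/Avro) instead of combining
             return HiveInputFormat.getInputFormatFromCache(   // illustrative lookup
                 partitionDesc.getInputFileFormatClass(), job)
                 .getRecordReader(split, job, reporter);
           }
         }
         // everything else keeps the legacy fallback to HiveInputFormat
         return super.getRecordReader(split, job, reporter);
       }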



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-02-28 Thread via GitHub


SourabhBadhya commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1505604310


##
ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java:
##
@@ -747,4 +750,18 @@ default List 
getPartitionsByExpr(org.apache.hadoop.hive.ql.metadata.T
 throw new UnsupportedOperationException("Storage handler does not support 
getting partitions by expression " +
 "for a table.");
   }
+
+  default boolean supportsMergeFiles() {
+return false;
+  }
+
+  default List getMergeInputFiles(Properties properties) throws 
IOException {
+throw new UnsupportedOperationException("Storage handler does not support 
getting merge input files " +
+"for a table.");
+  }
+
+  default StorageHandlerMergeProperties 
getStorageHandlerMergeProperties(Properties properties) {

Review Comment:
   Renamed it to `getMergeProperties`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-02-28 Thread via GitHub


SourabhBadhya commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1505606712


##
ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java:
##
@@ -254,18 +286,26 @@ private void generateActualTasks(HiveConf conf, 
List> resTsks,
   long trgtSize, long avgConditionSize, Task mvTask,
   Task mrTask, Task mrAndMvTask, Path dirPath,
   FileSystem inpFs, ConditionalResolverMergeFilesCtx ctx, MapWork work, 
int dpLbLevel,
-  boolean manifestFilePresent)
-  throws IOException {
+  boolean manifestFilePresent, HiveStorageHandler storageHandler)
+  throws IOException, ClassNotFoundException {
 DynamicPartitionCtx dpCtx = ctx.getDPCtx();
 List statusList;
-Map> manifestDirToFile = new HashMap<>();
+Map> parentDirToFile = new HashMap<>();
+boolean useCustomStorageHandler = storageHandler != null && 
storageHandler.supportsMergeFiles();
+StorageHandlerMergeProperties mergeProperties = useCustomStorageHandler ?
+
storageHandler.getStorageHandlerMergeProperties(ctx.getCustomStorageHandlerProps())
 : null;
 if (manifestFilePresent) {
   // Get the list of files from manifest file.
   List fileStatuses = getManifestFilePaths(conf, dirPath);
   // Setup the work to include all the files present in the manifest.
   setupWorkWhenUsingManifestFile(work, fileStatuses, dirPath, false);
-  manifestDirToFile = getManifestDirs(inpFs, fileStatuses);
-  statusList = new ArrayList<>(manifestDirToFile.keySet());
+  parentDirToFile = getParentDirToFileMap(inpFs, fileStatuses);
+  statusList = new ArrayList<>(parentDirToFile.keySet());
+} else if (useCustomStorageHandler) {
+  List fileStatuses = 
storageHandler.getMergeInputFiles(ctx.getCustomStorageHandlerProps());
+  setupWorkWhenUsingCustomHandler(work, dirPath, mergeProperties);
+  parentDirToFile = getParentDirToFileMap(inpFs, fileStatuses);
+  statusList = new ArrayList<>(parentDirToFile.keySet());

Review Comment:
   Done.



##
ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java:
##
@@ -254,18 +286,26 @@ private void generateActualTasks(HiveConf conf, 
List> resTsks,
   long trgtSize, long avgConditionSize, Task mvTask,
   Task mrTask, Task mrAndMvTask, Path dirPath,
   FileSystem inpFs, ConditionalResolverMergeFilesCtx ctx, MapWork work, 
int dpLbLevel,
-  boolean manifestFilePresent)
-  throws IOException {
+  boolean manifestFilePresent, HiveStorageHandler storageHandler)
+  throws IOException, ClassNotFoundException {
 DynamicPartitionCtx dpCtx = ctx.getDPCtx();
 List statusList;
-Map> manifestDirToFile = new HashMap<>();
+Map> parentDirToFile = new HashMap<>();
+boolean useCustomStorageHandler = storageHandler != null && 
storageHandler.supportsMergeFiles();
+StorageHandlerMergeProperties mergeProperties = useCustomStorageHandler ?
+
storageHandler.getStorageHandlerMergeProperties(ctx.getCustomStorageHandlerProps())
 : null;
 if (manifestFilePresent) {
   // Get the list of files from manifest file.
   List fileStatuses = getManifestFilePaths(conf, dirPath);
   // Setup the work to include all the files present in the manifest.
   setupWorkWhenUsingManifestFile(work, fileStatuses, dirPath, false);
-  manifestDirToFile = getManifestDirs(inpFs, fileStatuses);
-  statusList = new ArrayList<>(manifestDirToFile.keySet());
+  parentDirToFile = getParentDirToFileMap(inpFs, fileStatuses);
+  statusList = new ArrayList<>(parentDirToFile.keySet());

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-02-28 Thread via GitHub


SourabhBadhya commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1505607087


##
ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java:
##
@@ -147,18 +171,23 @@ public List> getTasks(HiveConf conf, Object 
objCtx) {
   Path dirPath = new Path(dirName);
   FileSystem inpFs = dirPath.getFileSystem(conf);
   DynamicPartitionCtx dpCtx = ctx.getDPCtx();
+  HiveStorageHandler storageHandler = HiveUtils.getStorageHandler(conf, 
ctx.getStorageHandlerClass());
+  boolean dirExists = inpFs.exists(dirPath);
+  boolean useCustomStorageHandler = storageHandler != null && 
storageHandler.supportsMergeFiles();
 
-  if (inpFs.exists(dirPath)) {
+  MapWork work = null;
+  if (dirExists || useCustomStorageHandler) {

Review Comment:
   Removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-02-28 Thread via GitHub


SourabhBadhya commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1505607838


##
ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java:
##
@@ -1720,6 +1723,41 @@ protected static MoveWork mergeMovePaths(Path 
condInputPath, MoveWork linkedMove
 return newWork;
   }
 
+  private static void 
setCustomStorageHandlerPropertiesForMerge(ConditionalResolverMergeFilesCtx 
mrCtx, MoveWork work) {

Review Comment:
   Renamed it to `setCustomStorageHandlerAndPropertiesForMerge`.



##
ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java:
##
@@ -1720,6 +1723,41 @@ protected static MoveWork mergeMovePaths(Path 
condInputPath, MoveWork linkedMove
 return newWork;
   }
 
+  private static void 
setCustomStorageHandlerPropertiesForMerge(ConditionalResolverMergeFilesCtx 
mrCtx, MoveWork work) {

Review Comment:
   Renamed it to `setStorageHandlerAndPropertiesForMerge`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-02-28 Thread via GitHub


SourabhBadhya commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1505606288


##
ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java:
##
@@ -125,6 +133,22 @@ public ListBucketingCtx getLbCtx() {
 public void setLbCtx(ListBucketingCtx lbCtx) {
   this.lbCtx = lbCtx;
 }
+
+public void setCustomStorageHandlerProps(Properties properties) {

Review Comment:
   Removed.



##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##
@@ -2021,4 +2023,34 @@ public List 
getPartitionsByExpr(org.apache.hadoop.hive.ql.metadata.Ta
   throw new SemanticException(String.format("Error while fetching the 
partitions due to: %s", e));
 }
   }
+
+  @Override
+  public boolean supportsMergeFiles() {
+return true;
+  }
+
+  @Override
+  public List getMergeInputFiles(Properties properties) throws 
IOException {
+String tableName = properties.getProperty(Catalogs.NAME);
+String snapshotRef = properties.getProperty(Catalogs.SNAPSHOT_REF);
+Configuration configuration = SessionState.getSessionConf();
+List originalContextList = generateJobContext(configuration, 
tableName, snapshotRef);
+List jobContextList = originalContextList.stream()
+.map(TezUtil::enrichContextWithVertexId)
+.collect(Collectors.toList());
+if (originalContextList.isEmpty()) {
+  return Collections.emptyList();
+}
+List dataFiles = Lists.newArrayList();
+for (DataFile dataFile : new 
HiveIcebergOutputCommitter().getWrittenFiles(jobContextList)) {
+  FileSystem fs = new Path(dataFile.path().toString()).getFileSystem(conf);
+  dataFiles.add(fs.getFileStatus(new Path(dataFile.path().toString())));
+}
+return dataFiles;
+  }
+
+  @Override
+  public StorageHandlerMergeProperties 
getStorageHandlerMergeProperties(Properties properties) {

Review Comment:
   Renamed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-02-28 Thread via GitHub


SourabhBadhya commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1505609448


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##
@@ -163,6 +167,26 @@ public void commitTask(TaskAttemptContext originalContext) 
throws IOException {
   LOG.info("CommitTask found no serialized table in config for 
table: {}.", output);
 }
   }, IOException.class);
+
+  // Merge task has merged several files into one. Hence we need to remove 
the stale files.
+  // At this stage the file is written and task-committed, but the old 
files are still present.
+  if 
(CombineHiveInputFormat.class.equals(jobConf.getInputFormat().getClass())) {
+MapWork mrwork = Utilities.getMapWork(jobConf);
+if (mrwork != null) {
+  List mergedPaths = mrwork.getInputPaths();
+  if (CollectionUtils.isNotEmpty(mergedPaths)) {
+Tasks.foreach(mergedPaths)
+.retry(3)
+.stopOnFailure()
+.throwFailureWhenFinished()

Review Comment:
   I think we should not fail here. Removed `stopOnFailure` and `throwFailureWhenFinished` so that cleanup failures do not fail the task.

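   For context, a minimal sketch of the suppressed-failure cleanup described above, using the Iceberg `Tasks` utility. `mergedPaths`, `jobConf` and `LOG` are taken from the surrounding diff; the delete call itself is an assumption about what the cleanup body does:

      Tasks.foreach(mergedPaths)
          .retry(3)
          .suppressFailureWhenFinished()
          .onFailure((path, exc) -> LOG.warn("Failed to remove merge input file {}", path, exc))
          .run(path -> {
            // best-effort removal of an input file that has already been merged away
            path.getFileSystem(jobConf).delete(path, true);
          }, IOException.class);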


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-02-28 Thread via GitHub


SourabhBadhya commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1505610108


##
ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java:
##
@@ -370,7 +371,8 @@ private InputSplit[] getCombineSplits(JobConf job, int 
numSplits,
   PartitionDesc part = HiveFileFormatUtils.getFromPathRecursively(
   pathToPartitionInfo, path, 
IOPrepareCache.get().allocatePartitionDescMap());
   TableDesc tableDesc = part.getTableDesc();
-  if ((tableDesc != null) && tableDesc.isNonNative()) {
+  boolean useDefaultFileFormat = 
part.getInputFileFormatClass().equals(tableDesc.getInputFileFormatClass());

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-02-28 Thread via GitHub


sonarcloud[bot] commented on PR #5076:
URL: https://github.com/apache/hive/pull/5076#issuecomment-1969109963

   ## [![Quality Gate 
Passed](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/QualityGateBadge/qg-passed-20px.png
 'Quality Gate 
Passed')](https://sonarcloud.io/dashboard?id=apache_hive&pullRequest=5076) 
**Quality Gate passed**  
   Issues  
   
![](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/passed-16px.png
 '') [5 New 
issues](https://sonarcloud.io/project/issues?id=apache_hive&pullRequest=5076&resolved=false&inNewCodePeriod=true)
  
   
![](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/accepted-16px.png
 '') [0 Accepted 
issues](https://sonarcloud.io/component_measures?id=apache_hive&pullRequest=5076&metric=new_accepted_issues&view=list)
   
   Measures  
   
![](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/passed-16px.png
 '') [0 Security 
Hotspots](https://sonarcloud.io/project/security_hotspots?id=apache_hive&pullRequest=5076&resolved=false&inNewCodePeriod=true)
  
   
![](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/no-data-16px.png
 '') No data about Coverage  
   
![](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/no-data-16px.png
 '') No data about Duplication  
 
   [See analysis details on 
SonarCloud](https://sonarcloud.io/dashboard?id=apache_hive&pullRequest=5076)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-02-29 Thread via GitHub


sonarcloud[bot] commented on PR #5076:
URL: https://github.com/apache/hive/pull/5076#issuecomment-1972537661

   ## [![Quality Gate 
Passed](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/QualityGateBadge/qg-passed-20px.png
 'Quality Gate 
Passed')](https://sonarcloud.io/dashboard?id=apache_hive&pullRequest=5076) 
**Quality Gate passed**  
   Issues  
   
![](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/passed-16px.png
 '') [4 New 
issues](https://sonarcloud.io/project/issues?id=apache_hive&pullRequest=5076&resolved=false&inNewCodePeriod=true)
  
   
![](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/accepted-16px.png
 '') [0 Accepted 
issues](https://sonarcloud.io/component_measures?id=apache_hive&pullRequest=5076&metric=new_accepted_issues&view=list)
   
   Measures  
   
![](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/passed-16px.png
 '') [0 Security 
Hotspots](https://sonarcloud.io/project/security_hotspots?id=apache_hive&pullRequest=5076&resolved=false&inNewCodePeriod=true)
  
   
![](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/no-data-16px.png
 '') No data about Coverage  
   
![](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/no-data-16px.png
 '') No data about Duplication  
 
   [See analysis details on 
SonarCloud](https://sonarcloud.io/dashboard?id=apache_hive&pullRequest=5076)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-03-05 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1512384810


##
ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java:
##
@@ -691,6 +693,18 @@ Map> removeScheme(Map> pathToAliases) {
   public RecordReader getRecordReader(InputSplit split, JobConf job,
   Reporter reporter) throws IOException {
 if (!(split instanceof CombineHiveInputSplit)) {
+  if (split instanceof FileSplit) {

Review Comment:
   👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-03-05 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1512388735


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java:
##
@@ -178,7 +178,7 @@ public RecordReader> 
getRecordReader(InputSplit split, J
 
   @Override
   public boolean shouldSkipCombine(Path path, Configuration conf) {
-return true;
+return false;

Review Comment:
   👍 I was just asking whether we have a flag to disable the merge functionality.
   

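   A hypothetical guard, in case such a flag is wanted later. The property name `hive.merge.tezfiles` is an existing Hive setting, but wiring it into `shouldSkipCombine` like this is purely an illustration and not something this PR does:

      @Override
      public boolean shouldSkipCombine(Path path, Configuration conf) {
        // illustrative only: keep the old "skip combine" behaviour when file merging is disabled
        boolean mergeEnabled = conf.getBoolean("hive.merge.tezfiles", false);
        return !mergeEnabled;
      }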


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-03-05 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1512393832


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergStorageHandlerMergeProperties.java:
##
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.util.Properties;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.AvroStorageFormatDescriptor;
+import org.apache.hadoop.hive.ql.io.ORCFileStorageFormatDescriptor;
+import org.apache.hadoop.hive.ql.io.ParquetFileStorageFormatDescriptor;
+import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor;
+import org.apache.hadoop.hive.ql.plan.StorageHandlerMergeProperties;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.mr.Catalogs;
+
+public class IcebergStorageHandlerMergeProperties implements 
StorageHandlerMergeProperties {
+
+  private final Properties properties;
+
+  IcebergStorageHandlerMergeProperties(Properties properties) {
+this.properties = properties;
+  }
+
+  public Path getTmpLocation() {
+String location = properties.getProperty(Catalogs.LOCATION);
+return new Path(location + "/data/");
+  }
+
+  public StorageFormatDescriptor getStorageFormatDescriptor() {
+FileFormat fileFormat = 
FileFormat.fromString(properties.getProperty(TableProperties.DEFAULT_FILE_FORMAT));
+if (fileFormat == FileFormat.ORC) {
+  return new ORCFileStorageFormatDescriptor();
+} else if (fileFormat == FileFormat.PARQUET) {
+  return new ParquetFileStorageFormatDescriptor();
+} else if (fileFormat == FileFormat.AVRO) {
+  return new AvroStorageFormatDescriptor();
+}
+return null;

Review Comment:
   should we throw an exception - Unsupported Storage Format Descriptor?

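   If the unsupported case should fail fast, a possible shape is sketched below, reusing the fields and imports of the quoted class; the exception type and message are assumptions:

      public StorageFormatDescriptor getStorageFormatDescriptor() throws IOException {
        FileFormat fileFormat = FileFormat.fromString(properties.getProperty(TableProperties.DEFAULT_FILE_FORMAT));
        if (fileFormat == FileFormat.ORC) {
          return new ORCFileStorageFormatDescriptor();
        } else if (fileFormat == FileFormat.PARQUET) {
          return new ParquetFileStorageFormatDescriptor();
        } else if (fileFormat == FileFormat.AVRO) {
          return new AvroStorageFormatDescriptor();
        }
        // surface the unsupported format instead of silently returning null
        throw new IOException("Unsupported storage format descriptor for file format: " + fileFormat);
      }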


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-03-11 Thread via GitHub


sonarcloud[bot] commented on PR #5076:
URL: https://github.com/apache/hive/pull/5076#issuecomment-1988200535

   ## [![Quality Gate 
Passed](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/QualityGateBadge/qg-passed-20px.png
 'Quality Gate 
Passed')](https://sonarcloud.io/dashboard?id=apache_hive&pullRequest=5076) 
**Quality Gate passed**  
   Issues  
   
![](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/passed-16px.png
 '') [5 New 
issues](https://sonarcloud.io/project/issues?id=apache_hive&pullRequest=5076&resolved=false&inNewCodePeriod=true)
  
   
![](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/accepted-16px.png
 '') [0 Accepted 
issues](https://sonarcloud.io/component_measures?id=apache_hive&pullRequest=5076&metric=new_accepted_issues&view=list)
   
   Measures  
   
![](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/passed-16px.png
 '') [0 Security 
Hotspots](https://sonarcloud.io/project/security_hotspots?id=apache_hive&pullRequest=5076&resolved=false&inNewCodePeriod=true)
  
   
![](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/no-data-16px.png
 '') No data about Coverage  
   
![](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/no-data-16px.png
 '') No data about Duplication  
 
   [See analysis details on 
SonarCloud](https://sonarcloud.io/dashboard?id=apache_hive&pullRequest=5076)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-03-12 Thread via GitHub


SourabhBadhya commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1522434597


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergStorageHandlerMergeProperties.java:
##
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.util.Properties;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.AvroStorageFormatDescriptor;
+import org.apache.hadoop.hive.ql.io.ORCFileStorageFormatDescriptor;
+import org.apache.hadoop.hive.ql.io.ParquetFileStorageFormatDescriptor;
+import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor;
+import org.apache.hadoop.hive.ql.plan.StorageHandlerMergeProperties;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.mr.Catalogs;
+
+public class IcebergStorageHandlerMergeProperties implements 
StorageHandlerMergeProperties {
+
+  private final Properties properties;
+
+  IcebergStorageHandlerMergeProperties(Properties properties) {
+this.properties = properties;
+  }
+
+  public Path getTmpLocation() {
+String location = properties.getProperty(Catalogs.LOCATION);
+return new Path(location + "/data/");
+  }
+
+  public StorageFormatDescriptor getStorageFormatDescriptor() {
+FileFormat fileFormat = 
FileFormat.fromString(properties.getProperty(TableProperties.DEFAULT_FILE_FORMAT));
+if (fileFormat == FileFormat.ORC) {
+  return new ORCFileStorageFormatDescriptor();
+} else if (fileFormat == FileFormat.PARQUET) {
+  return new ParquetFileStorageFormatDescriptor();
+} else if (fileFormat == FileFormat.AVRO) {
+  return new AvroStorageFormatDescriptor();
+}
+return null;

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-03-13 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1522765893


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##
@@ -720,4 +725,65 @@ private static FilesForCommit readFileForCommit(String 
fileForCommitLocation, Fi
   throw new NotFoundException("Can not read or parse committed file: %s", 
fileForCommitLocation);
 }
   }
+
+  public List getWrittenFiles(List jobContexts) throws 
IOException {
+List outputs = collectOutputs(jobContexts);
+ExecutorService fileExecutor = 
fileExecutor(jobContexts.get(0).getJobConf());
+ExecutorService tableExecutor = 
tableExecutor(jobContexts.get(0).getJobConf(), outputs.size());
+Collection dataFiles = new ConcurrentLinkedQueue<>();
+try {
+  Tasks.foreach(outputs.stream().flatMap(kv -> kv.jobContexts.stream()
+  .map(jobContext -> new SimpleImmutableEntry<>(kv.table, 
jobContext
+  .suppressFailureWhenFinished()
+  .executeWith(tableExecutor)
+  .onFailure((output, exc) -> LOG.warn("Failed to retrieve merge 
input file for the table {}", output, exc))
+  .run(output -> {
+JobContext jobContext = output.getValue();
+JobConf jobConf = jobContext.getJobConf();
+LOG.info("Cleaning job for jobID: {}, table: {}", 
jobContext.getJobID(), output);
+
+Table table = output.getKey();
+FileSystem fileSystem = new 
Path(table.location()).getFileSystem(jobConf);
+String jobLocation = generateJobLocation(table.location(), 
jobConf, jobContext.getJobID());
+// list jobLocation to get number of forCommit files
+// we do this because map/reduce num in jobConf is unreliable
+// and we have no access to vertex status info
+int numTasks = listForCommits(jobConf, jobLocation).size();
+FilesForCommit results = collectResults(numTasks, 
fileExecutor, table.location(), jobContext,
+table.io(), false);
+for (DataFile dataFile : results.dataFiles()) {
+  FileStatus fileStatus = fileSystem.getFileStatus(new 
Path(dataFile.path().toString()));
+  dataFiles.add(fileStatus);
+}
+  }, IOException.class);
+} finally {
+  fileExecutor.shutdown();
+  if (tableExecutor != null) {
+tableExecutor.shutdown();
+  }
+}
+return Lists.newArrayList(dataFiles);
+  }
+
+  private void cleanMergeTaskInputFiles(JobConf jobConf,

Review Comment:
   why not simply `mergeTaskInputFiles`? what does `clean` mean here?



##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##
@@ -720,4 +725,65 @@ private static FilesForCommit readFileForCommit(String 
fileForCommitLocation, Fi
   throw new NotFoundException("Can not read or parse committed file: %s", 
fileForCommitLocation);
 }
   }
+
+  public List getWrittenFiles(List jobContexts) throws 
IOException {
+List outputs = collectOutputs(jobContexts);
+ExecutorService fileExecutor = 
fileExecutor(jobContexts.get(0).getJobConf());
+ExecutorService tableExecutor = 
tableExecutor(jobContexts.get(0).getJobConf(), outputs.size());
+Collection dataFiles = new ConcurrentLinkedQueue<>();
+try {
+  Tasks.foreach(outputs.stream().flatMap(kv -> kv.jobContexts.stream()
+  .map(jobContext -> new SimpleImmutableEntry<>(kv.table, 
jobContext
+  .suppressFailureWhenFinished()
+  .executeWith(tableExecutor)
+  .onFailure((output, exc) -> LOG.warn("Failed to retrieve merge 
input file for the table {}", output, exc))
+  .run(output -> {
+JobContext jobContext = output.getValue();
+JobConf jobConf = jobContext.getJobConf();
+LOG.info("Cleaning job for jobID: {}, table: {}", 
jobContext.getJobID(), output);
+
+Table table = output.getKey();
+FileSystem fileSystem = new 
Path(table.location()).getFileSystem(jobConf);
+String jobLocation = generateJobLocation(table.location(), 
jobConf, jobContext.getJobID());
+// list jobLocation to get number of forCommit files
+// we do this because map/reduce num in jobConf is unreliable
+// and we have no access to vertex status info
+int numTasks = listForCommits(jobConf, jobLocation).size();
+FilesForCommit results = collectResults(numTasks, 
fileExecutor, table.location(), jobContext,
+table.io(), false);
+for (DataFile dataFile : results.dataFiles()) {
+  FileStatus fileStatus = fileSystem.getFileStatus(new 
Path(dataFile.path().toString()));
+ 

Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-03-13 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1522770882


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##
@@ -720,4 +725,65 @@ private static FilesForCommit readFileForCommit(String 
fileForCommitLocation, Fi
   throw new NotFoundException("Can not read or parse committed file: %s", 
fileForCommitLocation);
 }
   }
+
+  public List getWrittenFiles(List jobContexts) throws 
IOException {
+List outputs = collectOutputs(jobContexts);
+ExecutorService fileExecutor = 
fileExecutor(jobContexts.get(0).getJobConf());
+ExecutorService tableExecutor = 
tableExecutor(jobContexts.get(0).getJobConf(), outputs.size());
+Collection dataFiles = new ConcurrentLinkedQueue<>();
+try {
+  Tasks.foreach(outputs.stream().flatMap(kv -> kv.jobContexts.stream()
+  .map(jobContext -> new SimpleImmutableEntry<>(kv.table, 
jobContext
+  .suppressFailureWhenFinished()
+  .executeWith(tableExecutor)
+  .onFailure((output, exc) -> LOG.warn("Failed to retrieve merge 
input file for the table {}", output, exc))
+  .run(output -> {
+JobContext jobContext = output.getValue();
+JobConf jobConf = jobContext.getJobConf();
+LOG.info("Cleaning job for jobID: {}, table: {}", 
jobContext.getJobID(), output);
+
+Table table = output.getKey();
+FileSystem fileSystem = new 
Path(table.location()).getFileSystem(jobConf);
+String jobLocation = generateJobLocation(table.location(), 
jobConf, jobContext.getJobID());
+// list jobLocation to get number of forCommit files
+// we do this because map/reduce num in jobConf is unreliable
+// and we have no access to vertex status info
+int numTasks = listForCommits(jobConf, jobLocation).size();
+FilesForCommit results = collectResults(numTasks, 
fileExecutor, table.location(), jobContext,
+table.io(), false);
+for (DataFile dataFile : results.dataFiles()) {
+  FileStatus fileStatus = fileSystem.getFileStatus(new 
Path(dataFile.path().toString()));
+  dataFiles.add(fileStatus);
+}
+  }, IOException.class);
+} finally {
+  fileExecutor.shutdown();
+  if (tableExecutor != null) {
+tableExecutor.shutdown();
+  }
+}
+return Lists.newArrayList(dataFiles);
+  }
+
+  private void cleanMergeTaskInputFiles(JobConf jobConf,
+ExecutorService tableExecutor,
+TaskAttemptContext context) throws 
IOException {
+// Merge task has merged several files into one. Hence we need to remove 
the stale files.
+// At this stage the file is written and task-committed, but the old files 
are still present.
+if 
(jobConf.getInputFormat().getClass().isAssignableFrom(CombineHiveInputFormat.class))
 {
+  MapWork mrwork = Utilities.getMapWork(jobConf);
+  if (mrwork != null) {
+List mergedPaths = mrwork.getInputPaths();
+if (mergedPaths != null) {
+  Tasks.foreach(mergedPaths)
+  .retry(3)
+  .executeWith(tableExecutor)

Review Comment:
   should we add
   
 .stopOnFailure()
 .throwFailureWhenFinished()
   

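   For comparison, the strict variant suggested here would look roughly as below (a sketch only, reusing names from the quoted diff); the trade-off is that a failed delete then fails the task, instead of only logging and leaving an already-unreferenced file behind:

      Tasks.foreach(mergedPaths)
          .retry(3)
          .stopOnFailure()
          .throwFailureWhenFinished()
          .executeWith(tableExecutor)
          .run(path -> path.getFileSystem(jobConf).delete(path, true), IOException.class);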


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-03-13 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1522765893


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##
@@ -720,4 +725,65 @@ private static FilesForCommit readFileForCommit(String 
fileForCommitLocation, Fi
   throw new NotFoundException("Can not read or parse committed file: %s", 
fileForCommitLocation);
 }
   }
+
+  public List getWrittenFiles(List jobContexts) throws 
IOException {
+List outputs = collectOutputs(jobContexts);
+ExecutorService fileExecutor = 
fileExecutor(jobContexts.get(0).getJobConf());
+ExecutorService tableExecutor = 
tableExecutor(jobContexts.get(0).getJobConf(), outputs.size());
+Collection dataFiles = new ConcurrentLinkedQueue<>();
+try {
+  Tasks.foreach(outputs.stream().flatMap(kv -> kv.jobContexts.stream()
+  .map(jobContext -> new SimpleImmutableEntry<>(kv.table, 
jobContext
+  .suppressFailureWhenFinished()
+  .executeWith(tableExecutor)
+  .onFailure((output, exc) -> LOG.warn("Failed to retrieve merge 
input file for the table {}", output, exc))
+  .run(output -> {
+JobContext jobContext = output.getValue();
+JobConf jobConf = jobContext.getJobConf();
+LOG.info("Cleaning job for jobID: {}, table: {}", 
jobContext.getJobID(), output);
+
+Table table = output.getKey();
+FileSystem fileSystem = new 
Path(table.location()).getFileSystem(jobConf);
+String jobLocation = generateJobLocation(table.location(), 
jobConf, jobContext.getJobID());
+// list jobLocation to get number of forCommit files
+// we do this because map/reduce num in jobConf is unreliable
+// and we have no access to vertex status info
+int numTasks = listForCommits(jobConf, jobLocation).size();
+FilesForCommit results = collectResults(numTasks, 
fileExecutor, table.location(), jobContext,
+table.io(), false);
+for (DataFile dataFile : results.dataFiles()) {
+  FileStatus fileStatus = fileSystem.getFileStatus(new 
Path(dataFile.path().toString()));
+  dataFiles.add(fileStatus);
+}
+  }, IOException.class);
+} finally {
+  fileExecutor.shutdown();
+  if (tableExecutor != null) {
+tableExecutor.shutdown();
+  }
+}
+return Lists.newArrayList(dataFiles);
+  }
+
+  private void cleanMergeTaskInputFiles(JobConf jobConf,

Review Comment:
   why not simply `mergeTaskInputFiles`? what does `clean` mean here? also, why TaskInput? aren't those output files?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-03-13 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1522765893


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##
@@ -720,4 +725,65 @@ private static FilesForCommit readFileForCommit(String 
fileForCommitLocation, Fi
   throw new NotFoundException("Can not read or parse committed file: %s", 
fileForCommitLocation);
 }
   }
+
+  public List getWrittenFiles(List jobContexts) throws 
IOException {
+List outputs = collectOutputs(jobContexts);
+ExecutorService fileExecutor = 
fileExecutor(jobContexts.get(0).getJobConf());
+ExecutorService tableExecutor = 
tableExecutor(jobContexts.get(0).getJobConf(), outputs.size());
+Collection dataFiles = new ConcurrentLinkedQueue<>();
+try {
+  Tasks.foreach(outputs.stream().flatMap(kv -> kv.jobContexts.stream()
+  .map(jobContext -> new SimpleImmutableEntry<>(kv.table, 
jobContext
+  .suppressFailureWhenFinished()
+  .executeWith(tableExecutor)
+  .onFailure((output, exc) -> LOG.warn("Failed to retrieve merge 
input file for the table {}", output, exc))
+  .run(output -> {
+JobContext jobContext = output.getValue();
+JobConf jobConf = jobContext.getJobConf();
+LOG.info("Cleaning job for jobID: {}, table: {}", 
jobContext.getJobID(), output);
+
+Table table = output.getKey();
+FileSystem fileSystem = new 
Path(table.location()).getFileSystem(jobConf);
+String jobLocation = generateJobLocation(table.location(), 
jobConf, jobContext.getJobID());
+// list jobLocation to get number of forCommit files
+// we do this because map/reduce num in jobConf is unreliable
+// and we have no access to vertex status info
+int numTasks = listForCommits(jobConf, jobLocation).size();
+FilesForCommit results = collectResults(numTasks, 
fileExecutor, table.location(), jobContext,
+table.io(), false);
+for (DataFile dataFile : results.dataFiles()) {
+  FileStatus fileStatus = fileSystem.getFileStatus(new 
Path(dataFile.path().toString()));
+  dataFiles.add(fileStatus);
+}
+  }, IOException.class);
+} finally {
+  fileExecutor.shutdown();
+  if (tableExecutor != null) {
+tableExecutor.shutdown();
+  }
+}
+return Lists.newArrayList(dataFiles);
+  }
+
+  private void cleanMergeTaskInputFiles(JobConf jobConf,

Review Comment:
   why TaskInput? aren't those output files?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org





Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-03-13 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1522779763


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##
@@ -720,4 +725,65 @@ private static FilesForCommit readFileForCommit(String 
fileForCommitLocation, Fi
   throw new NotFoundException("Can not read or parse committed file: %s", 
fileForCommitLocation);
 }
   }
+
+  public List getWrittenFiles(List jobContexts) throws 
IOException {

Review Comment:
   `getOutputFiles`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-03-13 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1522783053


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##
@@ -2021,4 +2023,29 @@ public List 
getPartitionsByExpr(org.apache.hadoop.hive.ql.metadata.Ta
   throw new SemanticException(String.format("Error while fetching the 
partitions due to: %s", e));
 }
   }
+
+  @Override
+  public boolean supportsMergeFiles() {
+return true;
+  }
+
+  @Override
+  public List getMergeInputFiles(Properties properties) throws 
IOException {

Review Comment:
   getMergeTaskInputFiles



##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##
@@ -2021,4 +2023,29 @@ public List 
getPartitionsByExpr(org.apache.hadoop.hive.ql.metadata.Ta
   throw new SemanticException(String.format("Error while fetching the 
partitions due to: %s", e));
 }
   }
+
+  @Override
+  public boolean supportsMergeFiles() {
+return true;
+  }
+
+  @Override
+  public List getMergeInputFiles(Properties properties) throws 
IOException {

Review Comment:
   `getMergeTaskInputFiles`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-03-13 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1522787165


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##
@@ -2021,4 +2023,29 @@ public List 
getPartitionsByExpr(org.apache.hadoop.hive.ql.metadata.Ta
   throw new SemanticException(String.format("Error while fetching the 
partitions due to: %s", e));
 }
   }
+
+  @Override
+  public boolean supportsMergeFiles() {
+return true;
+  }
+
+  @Override
+  public List getMergeInputFiles(Properties properties) throws 
IOException {
+String tableName = properties.getProperty(Catalogs.NAME);
+String snapshotRef = properties.getProperty(Catalogs.SNAPSHOT_REF);
+Configuration configuration = SessionState.getSessionConf();
+List originalContextList = generateJobContext(configuration, 
tableName, snapshotRef);
+List jobContextList = originalContextList.stream()
+.map(TezUtil::enrichContextWithVertexId)
+.collect(Collectors.toList());
+if (originalContextList.isEmpty()) {

Review Comment:
check `jobContextList`

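   A minimal sketch of the adjusted guard; only the emptiness check moves to the enriched list, the rest is as in the quoted diff. Since the mapped list always has the same size as the original, this is a readability fix rather than a behaviour change:

      List<JobContext> jobContextList = originalContextList.stream()
          .map(TezUtil::enrichContextWithVertexId)
          .collect(Collectors.toList());
      if (jobContextList.isEmpty()) {
        return Collections.emptyList();
      }
      return new HiveIcebergOutputCommitter().getWrittenFiles(jobContextList);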


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-03-13 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1522789753


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##
@@ -2021,4 +2023,29 @@ public List 
getPartitionsByExpr(org.apache.hadoop.hive.ql.metadata.Ta
   throw new SemanticException(String.format("Error while fetching the 
partitions due to: %s", e));
 }
   }
+
+  @Override
+  public boolean supportsMergeFiles() {
+return true;
+  }
+
+  @Override
+  public List getMergeInputFiles(Properties properties) throws 
IOException {
+String tableName = properties.getProperty(Catalogs.NAME);
+String snapshotRef = properties.getProperty(Catalogs.SNAPSHOT_REF);
+Configuration configuration = SessionState.getSessionConf();
+List originalContextList = generateJobContext(configuration, 
tableName, snapshotRef);
+List jobContextList = originalContextList.stream()
+.map(TezUtil::enrichContextWithVertexId)
+.collect(Collectors.toList());
+if (originalContextList.isEmpty()) {
+  return Collections.emptyList();
+}
+return new HiveIcebergOutputCommitter().getWrittenFiles(jobContextList);
+  }
+
+  @Override
+  public StorageHandlerMergeProperties getMergeProperties(Properties 
properties) {

Review Comment:
   `getMergeTaskProperties`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-03-13 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1522790927


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergStorageHandlerMergeProperties.java:
##
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.IOException;
+import java.util.Properties;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.AvroStorageFormatDescriptor;
+import org.apache.hadoop.hive.ql.io.ORCFileStorageFormatDescriptor;
+import org.apache.hadoop.hive.ql.io.ParquetFileStorageFormatDescriptor;
+import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor;
+import org.apache.hadoop.hive.ql.plan.StorageHandlerMergeProperties;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.mr.Catalogs;
+
+public class IcebergStorageHandlerMergeProperties implements 
StorageHandlerMergeProperties {

Review Comment:
   IcebergMergeTaskProperties



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-03-13 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1522793217


##
ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java:
##
@@ -747,4 +750,18 @@ default List 
getPartitionsByExpr(org.apache.hadoop.hive.ql.metadata.T
 throw new UnsupportedOperationException("Storage handler does not support 
getting partitions by expression " +
 "for a table.");
   }
+
+  default boolean supportsMergeFiles() {
+return false;
+  }
+
+  default List getMergeInputFiles(Properties properties) throws 
IOException {
+throw new UnsupportedOperationException("Storage handler does not support 
getting merge input files " +
+"for a table.");
+  }
+
+  default StorageHandlerMergeProperties getMergeProperties(Properties 
properties) {

Review Comment:
   getMergeTaskProperties



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-03-13 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1522792841


##
ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java:
##
@@ -747,4 +750,18 @@ default List 
getPartitionsByExpr(org.apache.hadoop.hive.ql.metadata.T
 throw new UnsupportedOperationException("Storage handler does not support 
getting partitions by expression " +
 "for a table.");
   }
+
+  default boolean supportsMergeFiles() {
+return false;
+  }
+
+  default List getMergeInputFiles(Properties properties) throws 
IOException {

Review Comment:
   getMergeTaskInputFiles



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-03-13 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1522794964


##
ql/src/java/org/apache/hadoop/hive/ql/plan/StorageHandlerMergeProperties.java:
##
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+import java.io.IOException;
+
+public interface StorageHandlerMergeProperties {

Review Comment:
   MergeTaskProperties



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-03-13 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1523018546


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergStorageHandlerMergeProperties.java:
##
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.IOException;
+import java.util.Properties;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.AvroStorageFormatDescriptor;
+import org.apache.hadoop.hive.ql.io.ORCFileStorageFormatDescriptor;
+import org.apache.hadoop.hive.ql.io.ParquetFileStorageFormatDescriptor;
+import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor;
+import org.apache.hadoop.hive.ql.plan.StorageHandlerMergeProperties;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.mr.Catalogs;
+
+public class IcebergStorageHandlerMergeProperties implements 
StorageHandlerMergeProperties {
+
+  private final Properties properties;
+
+  IcebergStorageHandlerMergeProperties(Properties properties) {
+this.properties = properties;
+  }
+
+  public Path getTmpLocation() {
+String location = properties.getProperty(Catalogs.LOCATION);
+return new Path(location + "/data/");
+  }
+
+  public StorageFormatDescriptor getStorageFormatDescriptor() throws 
IOException {
+FileFormat fileFormat = 
FileFormat.fromString(properties.getProperty(TableProperties.DEFAULT_FILE_FORMAT,
+TableProperties.DEFAULT_FILE_FORMAT_DEFAULT));
+if (fileFormat == FileFormat.ORC) {

Review Comment:
   that is enum, use switch instead

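   A sketch of the switch-based version; the default branch assumes the unsupported case should fail, which matches the earlier thread on this method:

      switch (fileFormat) {
        case ORC:
          return new ORCFileStorageFormatDescriptor();
        case PARQUET:
          return new ParquetFileStorageFormatDescriptor();
        case AVRO:
          return new AvroStorageFormatDescriptor();
        default:
          throw new IOException("Unsupported file format for merge: " + fileFormat);
      }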


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-03-13 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1523022157


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergStorageHandlerMergeProperties.java:
##
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.IOException;
+import java.util.Properties;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.AvroStorageFormatDescriptor;
+import org.apache.hadoop.hive.ql.io.ORCFileStorageFormatDescriptor;
+import org.apache.hadoop.hive.ql.io.ParquetFileStorageFormatDescriptor;
+import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor;
+import org.apache.hadoop.hive.ql.plan.StorageHandlerMergeProperties;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.mr.Catalogs;
+
+public class IcebergStorageHandlerMergeProperties implements 
StorageHandlerMergeProperties {
+
+  private final Properties properties;
+
+  IcebergStorageHandlerMergeProperties(Properties properties) {
+this.properties = properties;
+  }
+
+  public Path getTmpLocation() {
+String location = properties.getProperty(Catalogs.LOCATION);
+return new Path(location + "/data/");
+  }
+
+  public StorageFormatDescriptor getStorageFormatDescriptor() throws 
IOException {
+FileFormat fileFormat = 
FileFormat.fromString(properties.getProperty(TableProperties.DEFAULT_FILE_FORMAT,

Review Comment:
   can't we use 
   
   StorageFormatFactory storageFormatFactory = new StorageFormatFactory();
   StorageFormatDescriptor descriptor = storageFormatFactory.get(fileFormat);
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-03-13 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1523022157


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergStorageHandlerMergeProperties.java:
##
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.IOException;
+import java.util.Properties;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.AvroStorageFormatDescriptor;
+import org.apache.hadoop.hive.ql.io.ORCFileStorageFormatDescriptor;
+import org.apache.hadoop.hive.ql.io.ParquetFileStorageFormatDescriptor;
+import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor;
+import org.apache.hadoop.hive.ql.plan.StorageHandlerMergeProperties;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.mr.Catalogs;
+
+public class IcebergStorageHandlerMergeProperties implements 
StorageHandlerMergeProperties {
+
+  private final Properties properties;
+
+  IcebergStorageHandlerMergeProperties(Properties properties) {
+this.properties = properties;
+  }
+
+  public Path getTmpLocation() {
+String location = properties.getProperty(Catalogs.LOCATION);
+return new Path(location + "/data/");
+  }
+
+  public StorageFormatDescriptor getStorageFormatDescriptor() throws 
IOException {
+FileFormat fileFormat = 
FileFormat.fromString(properties.getProperty(TableProperties.DEFAULT_FILE_FORMAT,

Review Comment:
   can't we use 
   
   StorageFormatDescriptor descriptor = storageFormatFactory.get(format);
   if (descriptor == null) {
     throw new SemanticException("Unrecognized file format in " + clause + ":" + " '" + format + "'");
   }
   return descriptor;
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-03-13 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1523022157


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergStorageHandlerMergeProperties.java:
##
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.IOException;
+import java.util.Properties;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.AvroStorageFormatDescriptor;
+import org.apache.hadoop.hive.ql.io.ORCFileStorageFormatDescriptor;
+import org.apache.hadoop.hive.ql.io.ParquetFileStorageFormatDescriptor;
+import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor;
+import org.apache.hadoop.hive.ql.plan.StorageHandlerMergeProperties;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.mr.Catalogs;
+
+public class IcebergStorageHandlerMergeProperties implements 
StorageHandlerMergeProperties {
+
+  private final Properties properties;
+
+  IcebergStorageHandlerMergeProperties(Properties properties) {
+this.properties = properties;
+  }
+
+  public Path getTmpLocation() {
+String location = properties.getProperty(Catalogs.LOCATION);
+return new Path(location + "/data/");
+  }
+
+  public StorageFormatDescriptor getStorageFormatDescriptor() throws 
IOException {
+FileFormat fileFormat = 
FileFormat.fromString(properties.getProperty(TableProperties.DEFAULT_FILE_FORMAT,

Review Comment:
   can't we use 
   
   StorageFormatDescriptor descriptor = storageFormatFactory.get(format);
   if (descriptor == null) {
     throw new SemanticException("Unrecognized file format in " + clause + ":" + " '" + format + "'");
   }
   return descriptor;
   
   private static final StorageFormatFactory storageFormatFactory = new StorageFormatFactory();
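   
   Roughly, with the factory the whole method could shrink to something like this (a sketch only; it assumes the factory resolves the Iceberg format names ORC / PARQUET / AVRO regardless of case):
   
   private static final StorageFormatFactory STORAGE_FORMAT_FACTORY = new StorageFormatFactory();
   
   public StorageFormatDescriptor getStorageFormatDescriptor() throws IOException {
     FileFormat fileFormat = FileFormat.fromString(properties.getProperty(
         TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT));
     // assumption: the factory's lookup accepts the enum name ("ORC", "PARQUET", "AVRO")
     StorageFormatDescriptor descriptor = STORAGE_FORMAT_FACTORY.get(fileFormat.name());
     if (descriptor == null) {
       throw new IOException("Unsupported default file format: " + fileFormat);
     }
     return descriptor;
   }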



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-03-13 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1523034205


##
ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java:
##
@@ -370,7 +371,8 @@ private InputSplit[] getCombineSplits(JobConf job, int 
numSplits,
   PartitionDesc part = HiveFileFormatUtils.getFromPathRecursively(
   pathToPartitionInfo, path, 
IOPrepareCache.get().allocatePartitionDescMap());
   TableDesc tableDesc = part.getTableDesc();
-  if ((tableDesc != null) && tableDesc.isNonNative()) {
+  boolean useDefaultFileFormat = 
part.getInputFileFormatClass().isAssignableFrom(tableDesc.getInputFileFormatClass());
+  if (tableDesc != null && tableDesc.isNonNative() && 
useDefaultFileFormat) {

Review Comment:
   tableDesc can't be null here; one line above you already use it without any null check
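   
   As written the null check is dead code; the condition reduces to the following (sketch, existing body omitted):
   
   boolean useDefaultFileFormat =
       part.getInputFileFormatClass().isAssignableFrom(tableDesc.getInputFileFormatClass());
   if (tableDesc.isNonNative() && useDefaultFileFormat) {
     // existing handling for non-native tables, unchanged
   }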



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-03-13 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1523047140


##
ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java:
##
@@ -1720,6 +1723,41 @@ protected static MoveWork mergeMovePaths(Path 
condInputPath, MoveWork linkedMove
 return newWork;
   }
 
+  private static void 
setStorageHandlerAndPropertiesForMerge(ConditionalResolverMergeFilesCtx mrCtx, 
MoveWork work) {

Review Comment:
   could we follow the single responsibility principle here?
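   
   For instance, the property lookup could be split into one small helper per metadata source (a sketch; helper names are illustrative and the storage handler class lookup is left out for brevity):
   
   private static Properties propertiesFromTargetTable(TableDesc tableDesc) {
     // plain INSERT: the target table descriptor already carries the needed properties
     return new Properties(tableDesc.getProperties());
   }
   
   private static Properties propertiesFromCtasOrView(MoveWork work) {
     Properties props = new Properties();
     CreateTableDesc ctasDesc = work.getLoadFileWork().getCtasCreateTableDesc();
     if (ctasDesc != null) {
       props.put(hive_metastoreConstants.META_TABLE_NAME, ctasDesc.getDbTableName());
       if (ctasDesc.getLocation() != null) {
         props.put(hive_metastoreConstants.META_TABLE_LOCATION, ctasDesc.getLocation());
       }
     }
     // the CREATE MATERIALIZED VIEW branch would mirror this via getCreateViewDesc()
     return props;
   }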



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-03-13 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1523052133


##
ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java:
##
@@ -1720,6 +1723,41 @@ protected static MoveWork mergeMovePaths(Path 
condInputPath, MoveWork linkedMove
 return newWork;
   }
 
+  private static void 
setStorageHandlerAndPropertiesForMerge(ConditionalResolverMergeFilesCtx mrCtx, 
MoveWork work) {

Review Comment:
   `setStorageHandlerAndProperties`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-03-13 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1523054231


##
ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java:
##
@@ -1720,6 +1723,41 @@ protected static MoveWork mergeMovePaths(Path 
condInputPath, MoveWork linkedMove
 return newWork;
   }
 
+  private static void 
setStorageHandlerAndPropertiesForMerge(ConditionalResolverMergeFilesCtx mrCtx, 
MoveWork work) {
+Properties storageHandlerProperties = null;
+String storageHandlerClass = null;
+if (work.getLoadTableWork() != null) {
+  // Get the info from the table data
+  TableDesc tableDesc = work.getLoadTableWork().getTable();
+  storageHandlerClass = tableDesc.getProperties().getProperty(
+  
org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE);
+  storageHandlerProperties = new Properties(tableDesc.getProperties());
+} else {
+  // Get the info from the create table data
+  CreateTableDesc createTableDesc = 
work.getLoadFileWork().getCtasCreateTableDesc();
+  String location = null;
+  if (createTableDesc != null) {
+storageHandlerClass = createTableDesc.getStorageHandler();
+storageHandlerProperties = new Properties();
+storageHandlerProperties.put(hive_metastoreConstants.META_TABLE_NAME, 
createTableDesc.getDbTableName());
+location = createTableDesc.getLocation();
+  } else {
+CreateMaterializedViewDesc createViewDesc = 
work.getLoadFileWork().getCreateViewDesc();
+if (createViewDesc != null) {
+  storageHandlerClass = createViewDesc.getStorageHandler();
+  storageHandlerProperties = new Properties();
+  
storageHandlerProperties.put(hive_metastoreConstants.META_TABLE_NAME, 
createViewDesc.getViewName());
+  location = createViewDesc.getLocation();
+}
+  }
+  if (location != null) {
+
storageHandlerProperties.put(hive_metastoreConstants.META_TABLE_LOCATION, 
location);
+  }
+}
+mrCtx.setStorageHandlerProps(storageHandlerProperties);

Review Comment:
   `setTaskProperties`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-03-13 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1523054231


##
ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java:
##
@@ -1720,6 +1723,41 @@ protected static MoveWork mergeMovePaths(Path 
condInputPath, MoveWork linkedMove
 return newWork;
   }
 
+  private static void 
setStorageHandlerAndPropertiesForMerge(ConditionalResolverMergeFilesCtx mrCtx, 
MoveWork work) {
+Properties storageHandlerProperties = null;
+String storageHandlerClass = null;
+if (work.getLoadTableWork() != null) {
+  // Get the info from the table data
+  TableDesc tableDesc = work.getLoadTableWork().getTable();
+  storageHandlerClass = tableDesc.getProperties().getProperty(
+  
org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE);
+  storageHandlerProperties = new Properties(tableDesc.getProperties());
+} else {
+  // Get the info from the create table data
+  CreateTableDesc createTableDesc = 
work.getLoadFileWork().getCtasCreateTableDesc();
+  String location = null;
+  if (createTableDesc != null) {
+storageHandlerClass = createTableDesc.getStorageHandler();
+storageHandlerProperties = new Properties();
+storageHandlerProperties.put(hive_metastoreConstants.META_TABLE_NAME, 
createTableDesc.getDbTableName());
+location = createTableDesc.getLocation();
+  } else {
+CreateMaterializedViewDesc createViewDesc = 
work.getLoadFileWork().getCreateViewDesc();
+if (createViewDesc != null) {
+  storageHandlerClass = createViewDesc.getStorageHandler();
+  storageHandlerProperties = new Properties();
+  
storageHandlerProperties.put(hive_metastoreConstants.META_TABLE_NAME, 
createViewDesc.getViewName());
+  location = createViewDesc.getLocation();
+}
+  }
+  if (location != null) {
+
storageHandlerProperties.put(hive_metastoreConstants.META_TABLE_LOCATION, 
location);
+  }
+}
+mrCtx.setStorageHandlerProps(storageHandlerProperties);

Review Comment:
   mrCtx.`setTaskProperties`(mergeTaskProperties)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-03-13 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1523057516


##
ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java:
##
@@ -125,6 +133,22 @@ public ListBucketingCtx getLbCtx() {
 public void setLbCtx(ListBucketingCtx lbCtx) {
   this.lbCtx = lbCtx;
 }
+
+public void setStorageHandlerProps(Properties properties) {

Review Comment:
   `setTaskProperties()`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-03-13 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1523058156


##
ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java:
##
@@ -125,6 +133,22 @@ public ListBucketingCtx getLbCtx() {
 public void setLbCtx(ListBucketingCtx lbCtx) {
   this.lbCtx = lbCtx;
 }
+
+public void setStorageHandlerProps(Properties properties) {
+  this.properties = properties;
+}
+
+public Properties getStorageHandlerProps() {

Review Comment:
   `getTaskProperties`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-03-13 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1523065083


##
ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java:
##
@@ -208,14 +235,17 @@ public List> getTasks(HiveConf conf, Object 
objCtx) {
   } else {
 // static partition and list bucketing
 generateActualTasks(conf, resTsks, trgtSize, avgConditionSize, 
mvTask, mrTask,
-mrAndMvTask, dirPath, inpFs, ctx, work, lbLevel, 
manifestFilePresent);
+mrAndMvTask, dirPath, inpFs, ctx, work, lbLevel, 
manifestFilePresent, storageHandler);
   }
 }
+  } else if (useCustomStorageHandler) {
+generateActualTasks(conf, resTsks, trgtSize, avgConditionSize, mvTask, 
mrTask,
+mrAndMvTask, dirPath, inpFs, ctx, work, 0, false, 
storageHandler);
   } else {
 Utilities.FILE_OP_LOGGER.info("Resolver returning movetask for " + 
dirPath);
 resTsks.add(mvTask);
   }
-} catch (IOException e) {
+} catch (ClassNotFoundException | HiveException | IOException e) {

Review Comment:
   should we handle ClassNotFoundException | HiveException in 
generateActualTasks?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-03-13 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1523065083


##
ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java:
##
@@ -208,14 +235,17 @@ public List> getTasks(HiveConf conf, Object 
objCtx) {
   } else {
 // static partition and list bucketing
 generateActualTasks(conf, resTsks, trgtSize, avgConditionSize, 
mvTask, mrTask,
-mrAndMvTask, dirPath, inpFs, ctx, work, lbLevel, 
manifestFilePresent);
+mrAndMvTask, dirPath, inpFs, ctx, work, lbLevel, 
manifestFilePresent, storageHandler);
   }
 }
+  } else if (useCustomStorageHandler) {
+generateActualTasks(conf, resTsks, trgtSize, avgConditionSize, mvTask, 
mrTask,
+mrAndMvTask, dirPath, inpFs, ctx, work, 0, false, 
storageHandler);
   } else {
 Utilities.FILE_OP_LOGGER.info("Resolver returning movetask for " + 
dirPath);
 resTsks.add(mvTask);
   }
-} catch (IOException e) {
+} catch (ClassNotFoundException | HiveException | IOException e) {

Review Comment:
   should we handle ClassNotFoundException | HiveException outside of this try?
   
   try {
     handler = HiveUtils.getStorageHandler(conf, storageHandler);
   } catch (HiveException e) {
     throw new SemanticException("Failed to load storage handler: " + e.getMessage());
   }
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-03-13 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1523065083


##
ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java:
##
@@ -208,14 +235,17 @@ public List> getTasks(HiveConf conf, Object 
objCtx) {
   } else {
 // static partition and list bucketing
 generateActualTasks(conf, resTsks, trgtSize, avgConditionSize, 
mvTask, mrTask,
-mrAndMvTask, dirPath, inpFs, ctx, work, lbLevel, 
manifestFilePresent);
+mrAndMvTask, dirPath, inpFs, ctx, work, lbLevel, 
manifestFilePresent, storageHandler);
   }
 }
+  } else if (useCustomStorageHandler) {
+generateActualTasks(conf, resTsks, trgtSize, avgConditionSize, mvTask, 
mrTask,
+mrAndMvTask, dirPath, inpFs, ctx, work, 0, false, 
storageHandler);
   } else {
 Utilities.FILE_OP_LOGGER.info("Resolver returning movetask for " + 
dirPath);
 resTsks.add(mvTask);
   }
-} catch (IOException e) {
+} catch (ClassNotFoundException | HiveException | IOException e) {

Review Comment:
   should we handle ClassNotFoundException | HiveException outside of this try?
   
   try {
     handler = HiveUtils.getStorageHandler(conf, storageHandler);
   } catch (HiveException e) {
     throw new SomeException("Failed to load storage handler: " + e.getMessage());
   }
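   
   i.e. the handler lookup could live in its own small helper, so the resolver's try/catch stays limited to IOException (a sketch; the helper name and the unchecked wrapper exception are illustrative):
   
   private HiveStorageHandler loadStorageHandler(HiveConf conf, String storageHandlerClass) {
     if (storageHandlerClass == null) {
       return null;
     }
     try {
       return HiveUtils.getStorageHandler(conf, storageHandlerClass);
     } catch (HiveException e) {
       // surfaces handler loading problems separately from the IO errors handled later
       throw new IllegalStateException("Failed to load storage handler: " + storageHandlerClass, e);
     }
   }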
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-03-13 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1523074119


##
ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java:
##
@@ -512,7 +553,30 @@ private void setupWorkWhenUsingManifestFile(MapWork 
mapWork, List fi
 mapWork.setUseInputPathsDirectly(true);
   }
 
-  private Map> getManifestDirs(FileSystem inpFs, 
List fileStatuses)
+  private void setupWorkWhenUsingCustomHandler(MapWork mapWork, Path dirPath,

Review Comment:
   `setupWorkUsingCustomHandler`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-03-13 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1523076497


##
ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java:
##
@@ -527,4 +591,22 @@ private Map> 
getManifestDirs(FileSystem inpFs, List
 }
 return manifestDirsToPaths;
   }
+
+  private void setMergePropertiesToPartDesc(PartitionDesc partitionDesc,

Review Comment:
   `updatePartDescProperties`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-03-13 Thread via GitHub


deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1523074119


##
ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java:
##
@@ -512,7 +553,30 @@ private void setupWorkWhenUsingManifestFile(MapWork 
mapWork, List fi
 mapWork.setUseInputPathsDirectly(true);
   }
 
-  private Map> getManifestDirs(FileSystem inpFs, 
List fileStatuses)
+  private void setupWorkWhenUsingCustomHandler(MapWork mapWork, Path dirPath,

Review Comment:
   `setupWorkWithCustomHandler`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-03-14 Thread via GitHub


SourabhBadhya commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1525735283


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##
@@ -720,4 +725,65 @@ private static FilesForCommit readFileForCommit(String 
fileForCommitLocation, Fi
   throw new NotFoundException("Can not read or parse committed file: %s", 
fileForCommitLocation);
 }
   }
+
+  public List getWrittenFiles(List jobContexts) throws 
IOException {
+List outputs = collectOutputs(jobContexts);
+ExecutorService fileExecutor = 
fileExecutor(jobContexts.get(0).getJobConf());
+ExecutorService tableExecutor = 
tableExecutor(jobContexts.get(0).getJobConf(), outputs.size());
+Collection dataFiles = new ConcurrentLinkedQueue<>();
+try {
+  Tasks.foreach(outputs.stream().flatMap(kv -> kv.jobContexts.stream()
+  .map(jobContext -> new SimpleImmutableEntry<>(kv.table, 
jobContext
+  .suppressFailureWhenFinished()
+  .executeWith(tableExecutor)
+  .onFailure((output, exc) -> LOG.warn("Failed to retrieve merge 
input file for the table {}", output, exc))
+  .run(output -> {
+JobContext jobContext = output.getValue();
+JobConf jobConf = jobContext.getJobConf();
+LOG.info("Cleaning job for jobID: {}, table: {}", 
jobContext.getJobID(), output);
+
+Table table = output.getKey();
+FileSystem fileSystem = new 
Path(table.location()).getFileSystem(jobConf);
+String jobLocation = generateJobLocation(table.location(), 
jobConf, jobContext.getJobID());
+// list jobLocation to get number of forCommit files
+// we do this because map/reduce num in jobConf is unreliable
+// and we have no access to vertex status info
+int numTasks = listForCommits(jobConf, jobLocation).size();
+FilesForCommit results = collectResults(numTasks, 
fileExecutor, table.location(), jobContext,
+table.io(), false);
+for (DataFile dataFile : results.dataFiles()) {
+  FileStatus fileStatus = fileSystem.getFileStatus(new 
Path(dataFile.path().toString()));
+  dataFiles.add(fileStatus);
+}
+  }, IOException.class);
+} finally {
+  fileExecutor.shutdown();
+  if (tableExecutor != null) {
+tableExecutor.shutdown();
+  }
+}
+return Lists.newArrayList(dataFiles);
+  }
+
+  private void cleanMergeTaskInputFiles(JobConf jobConf,
+ExecutorService tableExecutor,
+TaskAttemptContext context) throws 
IOException {
+// Merge task has merged several files into one. Hence we need to remove 
the stale files.
+// At this stage the file is written and task-committed, but the old files 
are still present.
+if 
(jobConf.getInputFormat().getClass().isAssignableFrom(CombineHiveInputFormat.class))
 {
+  MapWork mrwork = Utilities.getMapWork(jobConf);
+  if (mrwork != null) {
+List mergedPaths = mrwork.getInputPaths();
+if (mergedPaths != null) {
+  Tasks.foreach(mergedPaths)
+  .retry(3)
+  .executeWith(tableExecutor)

Review Comment:
   Same as this comment, 
https://github.com/apache/hive/pull/5076#discussion_r1505609448
   I decided not to add this since we are trying to delete data files which are stale. If we are not able to delete them, does this become an orphan file?
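   
   For reference, adding the retry / suppression discussed in that earlier comment would look roughly like the sketch below; whether suppressing a failed delete is acceptable is exactly the open question, since a file that cannot be removed here is left behind until some orphan-file cleanup picks it up (assumption):
   
   Tasks.foreach(mergedPaths)
       .retry(3)
       .suppressFailureWhenFinished()
       .executeWith(tableExecutor)
       .onFailure((path, exc) -> LOG.warn("Failed to delete merged input file {}, it may be left as an orphan", path, exc))
       .run(path -> {
         FileSystem fs = path.getFileSystem(jobConf);
         fs.delete(path, true);
       }, IOException.class);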



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org



Re: [PR] HIVE-28069: Iceberg: Implement Merge task functionality for Iceberg tables [hive]

2024-03-15 Thread via GitHub


SourabhBadhya commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1525841834


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##
@@ -720,4 +725,65 @@ private static FilesForCommit readFileForCommit(String 
fileForCommitLocation, Fi
   throw new NotFoundException("Can not read or parse committed file: %s", 
fileForCommitLocation);
 }
   }
+
+  public List getWrittenFiles(List jobContexts) throws 
IOException {

Review Comment:
   Done.



##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##
@@ -2021,4 +2023,29 @@ public List 
getPartitionsByExpr(org.apache.hadoop.hive.ql.metadata.Ta
   throw new SemanticException(String.format("Error while fetching the 
partitions due to: %s", e));
 }
   }
+
+  @Override
+  public boolean supportsMergeFiles() {
+return true;
+  }
+
+  @Override
+  public List getMergeInputFiles(Properties properties) throws 
IOException {

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org


