[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

2020-09-25 Thread GitBox


satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r495137989



##
File path: 
hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
##
@@ -586,24 +602,39 @@ public String startCommit() {
* @param instantTime Instant time to be generated
*/
   public void startCommitWithTime(String instantTime) {
+HoodieTableMetaClient metaClient = createMetaClient(true);
+startCommitWithTime(instantTime, metaClient.getCommitActionType(), 
metaClient);
+  }
+
+  /**
+   * Completes a new commit time for a write operation (insert/update/delete) 
with specified action.
+   */
+  public void startCommitWithTime(String instantTime, String actionType) {
+HoodieTableMetaClient metaClient = createMetaClient(true);
+startCommitWithTime(instantTime, actionType, metaClient);
+  }
+
+  /**
+   * Completes a new commit time for a write operation (insert/update/delete) 
with specified action.
+   */
+  private void startCommitWithTime(String instantTime, String actionType, 
HoodieTableMetaClient metaClient) {
 // NOTE : Need to ensure that rollback is done before a new commit is 
started
 if (rollbackPending) {
   // Only rollback inflight commit/delta-commits. Do not touch compaction 
commits
   rollbackPendingCommits();
 }
-startCommit(instantTime);
+startCommit(instantTime, actionType, metaClient);
   }
 
-  private void startCommit(String instantTime) {
-LOG.info("Generate a new instant time " + instantTime);
-HoodieTableMetaClient metaClient = createMetaClient(true);
+  private void startCommit(String instantTime, String actionType, 
HoodieTableMetaClient metaClient) {

Review comment:
   This is a private method. Do you want to make it public static?  
Personally, I think keeping all startCommit methods in HoodieWriteClient makes 
more sense, because the user workflow is:
   
   1) writeClient#startCommit
   2) writeClient#upsert
   3) writeClient#commit
   
   But if you have a strong preference for making this part of CommitUtils, I 
can move it. Let me know.
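
   The three-step workflow listed above can be sketched as follows. This is 
only an illustration under stated assumptions: SketchWriteClient is a 
hypothetical stand-in, not the real HoodieWriteClient, and its method 
signatures and in-memory bookkeeping are made up for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for a write client; NOT the real HoodieWriteClient API. */
final class SketchWriteClient {
  private final List<String> completed = new ArrayList<>(); // e.g. "001.commit"
  private String inflightInstant; // started but not yet committed
  private String inflightAction;

  /** Step 1: open a new instant with an explicit action type. */
  void startCommitWithTime(String instantTime, String actionType) {
    if (inflightInstant != null) {
      throw new IllegalStateException("Instant " + inflightInstant + " is still inflight");
    }
    inflightInstant = instantTime;
    inflightAction = actionType;
  }

  /** Step 2: write records against the inflight instant; returns the record count. */
  int upsert(List<String> records, String instantTime) {
    requireInflight(instantTime);
    return records.size();
  }

  /** Step 3: complete the instant so it is recorded as completed. */
  boolean commit(String instantTime) {
    requireInflight(instantTime);
    completed.add(instantTime + "." + inflightAction);
    inflightInstant = null;
    inflightAction = null;
    return true;
  }

  /** Returns a copy of the completed instants, oldest first. */
  List<String> completedInstants() {
    return new ArrayList<>(completed);
  }

  private void requireInflight(String instantTime) {
    if (!instantTime.equals(inflightInstant)) {
      throw new IllegalStateException("No inflight instant " + instantTime);
    }
  }

  public static void main(String[] args) {
    SketchWriteClient writeClient = new SketchWriteClient();
    writeClient.startCommitWithTime("001", "commit");      // 1) startCommit
    writeClient.upsert(Arrays.asList("r1", "r2"), "001");  // 2) upsert
    writeClient.commit("001");                             // 3) commit
    System.out.println(writeClient.completedInstants());
  }
}
```

   Keeping all three calls on one client object is what makes the workflow 
easy to follow; moving step 1 to a separate utility class would split it 
across two entry points.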





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

2020-09-25 Thread GitBox


satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r494727484



##
File path: 
hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
##
@@ -586,24 +602,39 @@ public String startCommit() {
* @param instantTime Instant time to be generated
*/
   public void startCommitWithTime(String instantTime) {
+HoodieTableMetaClient metaClient = createMetaClient(true);

Review comment:
   This line does not call the method defined directly below it; it calls the 
one after that (the private overload that takes the metaClient). So we only 
create the meta client once. Please double-check and let me know if I'm 
misinterpreting your suggestion.
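
   A minimal sketch of the overload chain being described, assuming the 
structure shown in the diff. The class StartCommitOverloads, the nested 
MetaClient type and the creation counter are hypothetical and exist only to 
show that each public call builds the meta client exactly once.

```java
/** Hypothetical sketch of the overload chain; method names mirror the diff, bodies are illustrative. */
final class StartCommitOverloads {

  /** Stand-in for HoodieTableMetaClient; only carries the table's default action type. */
  static final class MetaClient {
    final String commitActionType;

    MetaClient(String commitActionType) {
      this.commitActionType = commitActionType;
    }
  }

  private int metaClientCreations;                      // how often the meta client was built
  private final StringBuilder started = new StringBuilder();

  private MetaClient createMetaClient() {
    metaClientCreations++;
    return new MetaClient("commit");
  }

  /** Public overload 1: derives the action type from the table, then delegates. */
  public void startCommitWithTime(String instantTime) {
    MetaClient metaClient = createMetaClient();
    // Three arguments, so this resolves to the private overload, not to overload 2.
    startCommitWithTime(instantTime, metaClient.commitActionType, metaClient);
  }

  /** Public overload 2: the caller picks the action type. */
  public void startCommitWithTime(String instantTime, String actionType) {
    MetaClient metaClient = createMetaClient();
    startCommitWithTime(instantTime, actionType, metaClient);
  }

  /** Private overload: reuses the meta client it is handed and never creates one. */
  private void startCommitWithTime(String instantTime, String actionType, MetaClient metaClient) {
    started.append(instantTime).append(':').append(actionType).append(';');
  }

  int metaClientCreations() {
    return metaClientCreations;
  }

  String started() {
    return started.toString();
  }

  public static void main(String[] args) {
    StartCommitOverloads client = new StartCommitOverloads();
    client.startCommitWithTime("001");
    System.out.println("meta clients created: " + client.metaClientCreations());
  }
}
```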









[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

2020-09-23 Thread GitBox


satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r493882291



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java
##
@@ -251,6 +262,28 @@ private void addRollbackInstant(HoodieTimeline timeline, 
HoodieInstant instant)
 LOG.info("Done Syncing rollback instant (" + instant + ")");
   }
 
+  /**
+   * Add newly found REPLACE instant.
+   *
+   * @param timeline Hoodie Timeline
+   * @param instant REPLACE Instant
+   */
+  private void addReplaceInstant(HoodieTimeline timeline, HoodieInstant 
instant) throws IOException {

Review comment:
   @bvaradar Made the change and added a basic test. Please take a look. If 
the general approach looks good, I'll add more complex tests.









[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

2020-09-23 Thread GitBox


satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r493881151



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
##
@@ -738,7 +799,9 @@ private String formatPartitionKey(String partitionStr) {
* @param commitsToReturn Commits
*/
   Stream fetchLatestFileSliceInRange(List commitsToReturn) {
-return fetchAllStoredFileGroups().map(fileGroup -> 
fileGroup.getLatestFileSliceInRange(commitsToReturn))
+return fetchAllStoredFileGroups()
+.filter(fileGroup -> !isFileGroupReplacedBeforeAny(fileGroup, 
commitsToReturn))

Review comment:
   Changed. Please take a look.









[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

2020-09-21 Thread GitBox


satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r492303122



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java
##
@@ -251,6 +262,28 @@ private void addRollbackInstant(HoodieTimeline timeline, 
HoodieInstant instant)
 LOG.info("Done Syncing rollback instant (" + instant + ")");
   }
 
+  /**
+   * Add newly found REPLACE instant.
+   *
+   * @param timeline Hoodie Timeline
+   * @param instant REPLACE Instant
+   */
+  private void addReplaceInstant(HoodieTimeline timeline, HoodieInstant 
instant) throws IOException {

Review comment:
   I need to understand this flow a bit more, but I have a question about why 
we need to track the commit action type and timestamp. Today, 
HoodieRollbackMetadata tracks successFiles, deletedFiles, etc. Do you think we 
can also add replacedFileIds there? It would be empty for regular commits, but 
for replace commits it would have some content. If this value is present, we 
can remove the corresponding fileIds from View#replacedFileGroups. Let me know 
if I'm missing anything with this approach.
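
   The proposal above can be sketched roughly as follows. Everything here is 
an assumption made for illustration: RollbackMetadata and View are 
hypothetical stand-ins for HoodieRollbackMetadata and the file system view, 
and replacedFileIds is the proposed field, not an existing one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class RollbackReplaceSketch {

  /** Hypothetical rollback metadata; replacedFileIds is the proposed extra field. */
  static final class RollbackMetadata {
    final List<String> deletedFiles;
    final List<String> replacedFileIds; // empty when a regular commit is rolled back

    RollbackMetadata(List<String> deletedFiles, List<String> replacedFileIds) {
      this.deletedFiles = Collections.unmodifiableList(new ArrayList<>(deletedFiles));
      this.replacedFileIds = Collections.unmodifiableList(new ArrayList<>(replacedFileIds));
    }
  }

  /** Hypothetical view that remembers which file ids a replace commit has hidden. */
  static final class View {
    private final Set<String> replacedFileGroups = new HashSet<>();

    void markReplaced(String fileId) {
      replacedFileGroups.add(fileId);
    }

    /** Rolling back a replace commit makes its replaced file ids visible again. */
    void applyRollback(RollbackMetadata metadata) {
      replacedFileGroups.removeAll(metadata.replacedFileIds);
    }

    boolean isReplaced(String fileId) {
      return replacedFileGroups.contains(fileId);
    }
  }

  public static void main(String[] args) {
    View view = new View();
    view.markReplaced("file-1");
    view.markReplaced("file-2");
    // Roll back the replace commit that had replaced file-1 only.
    view.applyRollback(new RollbackMetadata(
        Arrays.asList("new-file.parquet"), Arrays.asList("file-1")));
    System.out.println("file-1 replaced? " + view.isReplaced("file-1"));
    System.out.println("file-2 replaced? " + view.isReplaced("file-2"));
  }
}
```

   With this shape the view does not need the action type or timestamp of 
the rolled-back instant; the rollback metadata alone says which file ids to 
restore.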









[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

2020-09-17 Thread GitBox


satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r490621340



##
File path: 
hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##
@@ -301,6 +304,61 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext 
jsc, HoodieInstant in
 }
   }
 
+  private void deleteReplacedFiles(HoodieInstant instant) {
+if (!instant.isCompleted()) {
+  // only delete files for completed instants
+  return;
+}
+
+TableFileSystemView fileSystemView = this.table.getFileSystemView();
+ensureReplacedPartitionsLoadedCorrectly(instant, fileSystemView);
+
+Stream fileGroupsToDelete = fileSystemView

Review comment:
   Done. PTAL.









[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

2020-09-17 Thread GitBox


satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r490491745



##
File path: 
hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##
@@ -301,6 +304,61 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext 
jsc, HoodieInstant in
 }
   }
 
+  private void deleteReplacedFiles(HoodieInstant instant) {
+if (!instant.isCompleted()) {
+  // only delete files for completed instants
+  return;
+}
+
+TableFileSystemView fileSystemView = this.table.getFileSystemView();
+ensureReplacedPartitionsLoadedCorrectly(instant, fileSystemView);
+
+Stream fileGroupsToDelete = fileSystemView

Review comment:
   This is tricky to explain. In FileSystemView, only metadata seems to be 
eagerly loaded; file groups are not, i.e. fetchAllStoredFileGroups() returns 
empty. For replace instants, we need to get the list of file slices for every 
fileId. Because fetchAllStoredFileGroups() is empty, the list of FileSlices 
also comes back empty, so we don't delete the replaced files.
   
   I think passing the HoodieTable in from callers, instead of creating a new 
one in the constructor, would help work around this problem. But that is a 
somewhat involved change because of test dependencies. Also, it might be 
better to refresh partition content in case new files are created by 
compaction or another process and are somehow not reflected in the table 
views. This might be the safer option.
   
   Let me know if you want me to work on passing HoodieTable in to the 
HoodieTimelineArchiveLog constructor.









[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

2020-09-17 Thread GitBox


satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r490491745



##
File path: 
hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##
@@ -301,6 +304,61 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext 
jsc, HoodieInstant in
 }
   }
 
+  private void deleteReplacedFiles(HoodieInstant instant) {
+if (!instant.isCompleted()) {
+  // only delete files for completed instants
+  return;
+}
+
+TableFileSystemView fileSystemView = this.table.getFileSystemView();
+ensureReplacedPartitionsLoadedCorrectly(instant, fileSystemView);
+
+Stream fileGroupsToDelete = fileSystemView

Review comment:
   This is tricky to explain. In FileSystemView, only metadata seems to be 
eagerly loaded; file groups are not, i.e. fetchAllStoredFileGroups() returns 
empty. For replace instants, we need to get the list of file slices for every 
replaced fileId. Because fetchAllStoredFileGroups() is empty, the list of 
FileSlices also comes back empty, so we don't delete the replaced files.
   
   I think passing the HoodieTable in from callers, instead of creating a new 
one in the constructor, would help work around this problem. But that is a 
somewhat involved change because of test dependencies. Also, it might be 
better to refresh partition content in case new files are created by 
compaction or another process and are somehow not reflected in the table 
views. This might be the safer option.
   
   Let me know if you want me to work on passing HoodieTable in to the 
HoodieTimelineArchiveLog constructor.
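
   The lazy-loading behaviour described above can be illustrated with a small 
sketch. LazyFileSystemViewSketch is a hypothetical model, not Hudi's 
FileSystemView: it only assumes that file groups are loaded per partition on 
first access and that fetchAllStoredFileGroups() sees loaded partitions only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical model of a view that loads file groups per partition on demand. */
final class LazyFileSystemViewSketch {
  private final Map<String, List<String>> storage;                  // partition -> file group ids "on disk"
  private final Map<String, List<String>> loaded = new TreeMap<>(); // partitions loaded so far

  LazyFileSystemViewSketch(Map<String, List<String>> storage) {
    this.storage = new HashMap<>(storage);
  }

  /** Sees only partitions that were loaded earlier, so a fresh view returns an empty list. */
  List<String> fetchAllStoredFileGroups() {
    List<String> all = new ArrayList<>();
    for (List<String> fileGroups : loaded.values()) {
      all.addAll(fileGroups);
    }
    return all;
  }

  /** Loads the partition on first access, then serves it from memory. */
  List<String> getAllFileGroups(String partition) {
    return loaded.computeIfAbsent(partition,
        p -> new ArrayList<>(storage.getOrDefault(p, Collections.<String>emptyList())));
  }

  public static void main(String[] args) {
    Map<String, List<String>> disk = new HashMap<>();
    disk.put("americas", Arrays.asList("fg-1", "fg-2"));
    LazyFileSystemViewSketch view = new LazyFileSystemViewSketch(disk);

    System.out.println("before load: " + view.fetchAllStoredFileGroups());
    view.getAllFileGroups("americas"); // force-load the partition touched by the replace
    System.out.println("after load:  " + view.fetchAllStoredFileGroups());
  }
}
```

   Touching each partition named in the replace metadata before querying, as 
ensureReplacedPartitionsLoadedCorrectly does in the diff, plays the role of 
the getAllFileGroups call in this sketch.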









[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

2020-09-17 Thread GitBox


satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r490476688



##
File path: 
hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##
@@ -301,6 +304,61 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext 
jsc, HoodieInstant in
 }
   }
 
+  private void deleteReplacedFiles(HoodieInstant instant) {
+if (!instant.isCompleted()) {
+  // only delete files for completed instants
+  return;
+}
+
+TableFileSystemView fileSystemView = this.table.getFileSystemView();
+ensureReplacedPartitionsLoadedCorrectly(instant, fileSystemView);
+
+Stream fileGroupsToDelete = fileSystemView
+.getReplacedFileGroupsBeforeOrOn(instant.getTimestamp());
+
+fileGroupsToDelete.forEach(fg -> {

Review comment:
   Implemented parallel execution. PTAL









[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

2020-09-17 Thread GitBox


satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r490461331



##
File path: 
hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##
@@ -301,6 +304,61 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext 
jsc, HoodieInstant in
 }
   }
 
+  private void deleteReplacedFiles(HoodieInstant instant) {
+if (!instant.isCompleted()) {

Review comment:
   Implemented parallel deletion. PTAL.









[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

2020-09-17 Thread GitBox


satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r490461178



##
File path: 
hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##
@@ -301,6 +304,61 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext 
jsc, HoodieInstant in
 }
   }
 
+  private void deleteReplacedFiles(HoodieInstant instant) {
+if (!instant.isCompleted()) {
+  // only delete files for completed instants
+  return;
+}
+
+TableFileSystemView fileSystemView = this.table.getFileSystemView();
+ensureReplacedPartitionsLoadedCorrectly(instant, fileSystemView);
+
+Stream fileGroupsToDelete = fileSystemView
+.getReplacedFileGroupsBeforeOrOn(instant.getTimestamp());
+
+fileGroupsToDelete.forEach(fg -> {
+  fg.getAllRawFileSlices().forEach(fileSlice -> {
+fileSlice.getBaseFile().map(baseFile -> 
deletePath(baseFile.getFileStatus().getPath(), instant));
+fileSlice.getLogFiles().forEach(logFile -> 
deletePath(logFile.getPath(), instant));
+  });
+});
+  }
+
+  /**
+   * Because we are creating new 'HoodieTable' and FileSystemView objects in 
this class constructor,
+   * partition view may not be loaded correctly.
+   * Reload all partitions modified by REPLACE action
+   *
+   * TODO find a better way to pass the FileSystemView to this class.
+   */
+  private void ensureReplacedPartitionsLoadedCorrectly(HoodieInstant instant, 
TableFileSystemView fileSystemView) {
+Option replaceInstantOption = 
metaClient.getActiveTimeline().getCompletedAndReplaceTimeline()
+.filter(replaceInstant -> 
replaceInstant.getTimestamp().equals(instant.getTimestamp())).firstInstant();
+
+replaceInstantOption.ifPresent(replaceInstant -> {
+  try {
+HoodieReplaceCommitMetadata metadata = 
HoodieReplaceCommitMetadata.fromBytes(
+
metaClient.getActiveTimeline().getInstantDetails(replaceInstant).get(),
+HoodieReplaceCommitMetadata.class);
+
+metadata.getPartitionToReplaceStats().keySet().forEach(partition -> 
fileSystemView.getAllFileGroups(partition));
+  } catch (IOException e) {
+throw new HoodieCommitException("Failed to archive because cannot 
delete replace files", e);
+  }
+});
+  }
+
+  private boolean deletePath(Path path, HoodieInstant instant) {
+try {
+  LOG.info("Deleting " + path + " before archiving " + instant);
+  metaClient.getFs().delete(path);

Review comment:
   I missed that we already have a JavaSparkContext. Implemented parallel 
cleanup. PTAL.
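
   For illustration only, the idea of deleting the replaced files in parallel 
can be sketched without Spark. The PR uses the JavaSparkContext; this sketch 
swaps in a plain Java parallel stream over local paths, and 
ParallelDeleteSketch is a hypothetical name.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: parallel file deletion with a parallel stream instead of Spark. */
final class ParallelDeleteSketch {

  /** Deletes every path in parallel and returns how many files were actually removed. */
  static long deleteAll(List<Path> paths) {
    return paths.parallelStream()
        .mapToLong(path -> deleteOne(path) ? 1L : 0L)
        .sum();
  }

  /** Returns true if the file existed and was deleted, false if it was already gone. */
  private static boolean deleteOne(Path path) {
    try {
      return Files.deleteIfExists(path);
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to delete " + path, e);
    }
  }

  public static void main(String[] args) throws IOException {
    List<Path> paths = new ArrayList<>();
    for (int i = 0; i < 4; i++) {
      paths.add(Files.createTempFile("replaced-file-", ".parquet"));
    }
    System.out.println("deleted " + deleteAll(paths) + " files");
  }
}
```

   Each deletion is independent of the others, which is why the work 
parallelizes cleanly whether the executor is a thread pool or a Spark job.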









[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

2020-09-15 Thread GitBox


satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r488990799



##
File path: 
hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##
@@ -301,6 +304,61 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext 
jsc, HoodieInstant in
 }
   }
 
+  private void deleteReplacedFiles(HoodieInstant instant) {
+if (!instant.isCompleted()) {
+  // only delete files for completed instants
+  return;
+}
+
+TableFileSystemView fileSystemView = this.table.getFileSystemView();
+ensureReplacedPartitionsLoadedCorrectly(instant, fileSystemView);
+
+Stream fileGroupsToDelete = fileSystemView
+.getReplacedFileGroupsBeforeOrOn(instant.getTimestamp());
+
+fileGroupsToDelete.forEach(fg -> {
+  fg.getAllRawFileSlices().forEach(fileSlice -> {
+fileSlice.getBaseFile().map(baseFile -> 
deletePath(baseFile.getFileStatus().getPath(), instant));
+fileSlice.getLogFiles().forEach(logFile -> 
deletePath(logFile.getPath(), instant));
+  });
+});
+  }
+
+  /**
+   * Because we are creating new 'HoodieTable' and FileSystemView objects in 
this class constructor,
+   * partition view may not be loaded correctly.
+   * Reload all partitions modified by REPLACE action
+   *
+   * TODO find a better way to pass the FileSystemView to this class.
+   */
+  private void ensureReplacedPartitionsLoadedCorrectly(HoodieInstant instant, 
TableFileSystemView fileSystemView) {
+Option replaceInstantOption = 
metaClient.getActiveTimeline().getCompletedAndReplaceTimeline()
+.filter(replaceInstant -> 
replaceInstant.getTimestamp().equals(instant.getTimestamp())).firstInstant();
+
+replaceInstantOption.ifPresent(replaceInstant -> {
+  try {
+HoodieReplaceCommitMetadata metadata = 
HoodieReplaceCommitMetadata.fromBytes(
+
metaClient.getActiveTimeline().getInstantDetails(replaceInstant).get(),
+HoodieReplaceCommitMetadata.class);
+
+metadata.getPartitionToReplaceStats().keySet().forEach(partition -> 
fileSystemView.getAllFileGroups(partition));
+  } catch (IOException e) {
+throw new HoodieCommitException("Failed to archive because cannot 
delete replace files", e);
+  }
+});
+  }
+
+  private boolean deletePath(Path path, HoodieInstant instant) {
+try {
+  LOG.info("Deleting " + path + " before archiving " + instant);
+  metaClient.getFs().delete(path);

Review comment:
   For parallel deletes, JavaSparkContext is not exposed to the archive 
process. Since we want to move this into clean anyway, is it OK if I 
defer this to https://issues.apache.org/jira/browse/HUDI-1276?









[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

2020-09-15 Thread GitBox


satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r488987937



##
File path: 
hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertOverwriteCommitActionExecutor.java
##
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.WorkloadProfile;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.util.ArrayList;
+import java.util.stream.Stream;
+
+public class InsertOverwriteCommitActionExecutor>
+extends CommitActionExecutor {
+
+  private static final Logger LOG = 
LogManager.getLogger(InsertOverwriteCommitActionExecutor.class);
+
+  private final JavaRDD> inputRecordsRDD;
+
+  public InsertOverwriteCommitActionExecutor(JavaSparkContext jsc,
+ HoodieWriteConfig config, 
HoodieTable table,
+ String instantTime, 
JavaRDD> inputRecordsRDD) {
+super(jsc, config, table, instantTime, 
WriteOperationType.INSERT_OVERWRITE);
+this.inputRecordsRDD = inputRecordsRDD;
+  }
+
+  @Override
+  public HoodieWriteMetadata execute() {
+return WriteHelper.write(instantTime, inputRecordsRDD, jsc, 
(HoodieTable) table,
+config.shouldCombineBeforeInsert(), 
config.getInsertShuffleParallelism(), this, false);
+  }
+
+  @Override
+  protected Partitioner getPartitioner(WorkloadProfile profile) {
+return new InsertOverwritePartitioner<>(profile, jsc, table, config);
+  }
+
+  @Override
+  protected String getCommitActionType() {
+return HoodieTimeline.REPLACE_COMMIT_ACTION;
+  }
+
+  @Override
+  protected JavaRDD processInputRecords(JavaRDD> 
inputRecordsRDD, WorkloadProfile profile) {
+// get all existing fileIds to mark them as replaced
+JavaRDD replaceStatuses = getAllReplaceWriteStatus(profile);

Review comment:
   I refactored it and removed the boolean from WriteStatus. PTAL.









[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

2020-09-14 Thread GitBox


satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r488346156



##
File path: 
hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
##
@@ -880,6 +880,89 @@ public void testDeletesWithDeleteApi() throws Exception {
 testDeletes(client, updateBatch3.getRight(), 10, file1, "007", 140, 
keysSoFar);
   }
 
+  /**
+   * Test scenario of writing more file groups than existing number of file 
groups in partition.
+   */
+  @Test
+  public void testInsertOverwritePartitionHandlingWithMoreRecords() throws 
Exception {
+verifyInsertOverwritePartitionHandling(1000, 3000);
+  }
+
+  /**
+   * Test scenario of writing fewer file groups than existing number of file 
groups in partition.
+   */
+  @Test
+  public void testInsertOverwritePartitionHandlingWithFewerRecords() throws 
Exception {
+verifyInsertOverwritePartitionHandling(3000, 1000);
+  }
+
+  /**
+   * Test scenario of writing similar number file groups in partition.
+   */
+  @Test
+  public void testInsertOverwritePartitionHandlinWithSimilarNumberOfRecords() 
throws Exception {
+verifyInsertOverwritePartitionHandling(3000, 3000);
+  }
+
+  /**
+   *  1) Do write1 (upsert) with 'batch1RecordsCount' number of records.
+   *  2) Do write2 (insert overwrite) with 'batch2RecordsCount' number of 
records.
+   *
+   *  Verify that all records in step1 are overwritten
+   */
+  private void verifyInsertOverwritePartitionHandling(int batch1RecordsCount, 
int batch2RecordsCount) throws Exception {
+final String testPartitionPath = "americas";
+HoodieWriteConfig config = getSmallInsertWriteConfig(2000);
+HoodieWriteClient client = getHoodieWriteClient(config, false);
+dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
+
+// Do Inserts
+String commitTime1 = "001";
+client.startCommitWithTime(commitTime1);
+List inserts1 = dataGen.generateInserts(commitTime1, 
batch1RecordsCount);
+JavaRDD insertRecordsRDD1 = jsc.parallelize(inserts1, 2);
+List statuses = client.upsert(insertRecordsRDD1, 
commitTime1).collect();
+assertNoWriteErrors(statuses);
+Set batch1Buckets = statuses.stream().map(s -> 
s.getFileId()).collect(Collectors.toSet());
+verifyParquetFileData(commitTime1, inserts1, statuses);
+
+// Do Insert Overwrite
+String commitTime2 = "002";
+client.startCommitWithTime(commitTime2, 
HoodieTimeline.REPLACE_COMMIT_ACTION);
+List inserts2 = dataGen.generateInserts(commitTime2, 
batch2RecordsCount);
+List insertsAndUpdates2 = new ArrayList<>();
+insertsAndUpdates2.addAll(inserts2);
+JavaRDD insertAndUpdatesRDD2 = 
jsc.parallelize(insertsAndUpdates2, 2);
+statuses = client.insertOverwrite(insertAndUpdatesRDD2, 
commitTime2).collect();
+assertNoWriteErrors(statuses);
+Set replacedBuckets = statuses.stream().filter(s -> 
s.isReplacedFileId())
+.map(s -> s.getFileId()).collect(Collectors.toSet());
+assertEquals(batch1Buckets, replacedBuckets);
+List newBuckets = statuses.stream().filter(s -> 
!(s.isReplacedFileId()))
+.collect(Collectors.toList());
+verifyParquetFileData(commitTime2, inserts2, newBuckets);
+  }
+
+  /**
+   * Verify data in parquet files matches expected records and commit time.
+   */
+  private void verifyParquetFileData(String commitTime, List 
expectedRecords, List allStatus) {

Review comment:
   Renamed it for now. I'll look into whether there are other helpers for 
doing this.









[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

2020-09-14 Thread GitBox


satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r488345604



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
##
@@ -133,6 +137,21 @@
*/
   HoodieTimeline getCommitsAndCompactionTimeline();
 
+  /**
+   * Timeline to just include commits (commit/deltacommit), replace and 
compaction actions.
+   *
+   * @return
+   */
+  HoodieDefaultTimeline getWriteActionTimeline();

Review comment:
   I removed this and included getCommitsAndCompactionTimeline. I think we may 
run into edge cases for MOR tables where something breaks, but I don't have 
concrete examples. We can run it for a while on a MOR table and see if it 
works.









[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

2020-09-14 Thread GitBox


satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r488344859



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieReplaceCommitMetadata.java
##
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * All the metadata that gets stored along with a commit.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class HoodieReplaceCommitMetadata extends HoodieCommitMetadata {
+  private static final Logger LOG = 
LogManager.getLogger(HoodieReplaceCommitMetadata.class);
+  protected Map> partitionToReplaceStats;

Review comment:
   I changed it to List to include only fileIds. I'm inclined 
against storing all file slices because they can evolve between metadata 
creation and archival/clean. Let me know if this understanding is incorrect.
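
A minimal sketch of the shape described in this comment, assuming a plain map from partition path to the fileIds replaced in that partition. The class and method names below are hypothetical and are not taken from the PR:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical holder that records only the fileIds replaced in each partition. */
final class ReplacedFileIdsSketch {
  // partition path -> fileIds replaced in that partition (no file slices stored)
  private final Map<String, List<String>> partitionToReplaceFileIds = new HashMap<>();

  /** Records that the given file group in the partition was replaced. */
  void addReplacedFileId(String partitionPath, String fileId) {
    partitionToReplaceFileIds
        .computeIfAbsent(partitionPath, p -> new ArrayList<>())
        .add(fileId);
  }

  /** Returns the replaced fileIds for a partition, or an empty list if none. */
  List<String> getReplacedFileIds(String partitionPath) {
    return Collections.unmodifiableList(
        partitionToReplaceFileIds.getOrDefault(partitionPath, Collections.emptyList()));
  }
}
```

Because only fileIds are kept, the concrete file slices can be resolved later, at archival or clean time, against whatever files exist then.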









[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

2020-09-14 Thread GitBox


satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r488344523



##
File path: hudi-common/src/main/avro/HoodieReplaceCommitMetadata.avsc
##
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+   "namespace":"org.apache.hudi.avro.model",
+   "type":"record",
+   "name":"HoodieReplaceCommitMetadata",
+   "fields":[
+  {
+ "name":"partitionToWriteStats",

Review comment:
   Moved partitionToReplaceStats to the end. It's also possible to make this 
reference HoodieCommitMetadata directly, i.e.,
   
   HoodieReplaceCommitMetadata contains HoodieCommitMetadata and 
replaceFileIds. But converting between the JSON replaceCommit and the Avro 
version would require another layer of transform code. So, to keep it simple, 
I copied all the fields from the JSON structure.









[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

2020-09-14 Thread GitBox


satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r488343363



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
##
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Helper class to generate compaction plan from FileGroup/FileSlice 
abstraction.
+ */
+public class CommitUtils {
+
+  private static final Logger LOG = LogManager.getLogger(CommitUtils.class);
+
+  public static HoodieCommitMetadata 
buildWriteActionMetadata(List writeStats,

Review comment:
   Added









[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

2020-09-14 Thread GitBox


satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r488343582



##
File path: hudi-common/src/main/avro/HoodieArchivedMetaEntry.avsc
##
@@ -36,6 +36,14 @@
  ],
  "default": null
   },
+  {

Review comment:
   Moved it. Will check if it's possible to add a unit test for backward 
compatibility.









[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

2020-09-14 Thread GitBox


satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r488284965



##
File path: 
hudi-spark/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java
##
@@ -102,7 +104,9 @@ public void commit(WriterCommitMessage[] messages) {
 .flatMap(m -> m.getWriteStatuses().stream().map(m2 -> 
m2.getStat())).collect(Collectors.toList());
 
 try {
-  writeClient.commitStats(instantTime, writeStatList, Option.empty());
+  writeClient.commitStats(instantTime, writeStatList, Option.empty(),

Review comment:
   Sure. Added new method









[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

2020-09-14 Thread GitBox


satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r488284655



##
File path: 
hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
##
@@ -200,6 +199,14 @@ public static void createInflightCommitFiles(String 
basePath, String... instantT
 }
   }
 
+  public static HoodieWriteStat createReplaceStat(final String partitionPath, 
final String fileId1) {

Review comment:
   Done.









[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

2020-09-14 Thread GitBox


satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r488283893



##
File path: 
hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
##
@@ -378,9 +398,9 @@ public static void createCompactionAuxiliaryMetadata(String 
basePath, HoodieInst
 new Path(basePath + "/" + HoodieTableMetaClient.AUXILIARYFOLDER_NAME + 
"/" + instant.getFileName());
 FileSystem fs = FSUtils.getFs(basePath, configuration);
 try (FSDataOutputStream os = fs.create(commitFile, true)) {
-  HoodieCompactionPlan workload = new HoodieCompactionPlan();
+  HoodieCompactionPlan workload = 
HoodieCompactionPlan.newBuilder().setVersion(1).build();

Review comment:
   Version is not set by default, so reading the test plan generated here 
fails. I therefore set the version explicitly.









[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

2020-09-14 Thread GitBox


satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r488268777



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/ActionType.java
##
@@ -22,5 +22,5 @@
  * The supported action types.
  */
 public enum ActionType {
-  commit, savepoint, compaction, clean, rollback
+  commit, savepoint, compaction, clean, rollback, replacecommit

Review comment:
   Filed HUDI-1281 









[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

2020-09-14 Thread GitBox


satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r488226577



##
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##
@@ -111,6 +111,9 @@ private[hudi] object HoodieSparkSqlWriter {
 tableConfig = tableMetaClient.getTableConfig
   }
 
+  val metaClient = new 
HoodieTableMetaClient(sparkContext.hadoopConfiguration, path.get)
+  val commitActionType = DataSourceUtils.getCommitActionType(operation, 
metaClient)

Review comment:
   We just need tableType to create commit/deltacommit in the default case. 
I moved this to a static method in a util class and removed the dependency on 
MetaClient.
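
A minimal sketch of such a static helper, assuming the action type depends only on the write operation and the table type. The enums and class below are hypothetical stand-ins rather than the PR's actual types; the action strings follow the names used in this thread:

```java
/** Hypothetical stand-in for the table type. */
enum TableType { COPY_ON_WRITE, MERGE_ON_READ }

/** Hypothetical stand-in for the write operation type. */
enum WriteOperation { INSERT, UPSERT, BULK_INSERT, DELETE, INSERT_OVERWRITE }

final class CommitActionTypeSketch {
  static final String COMMIT_ACTION = "commit";
  static final String DELTA_COMMIT_ACTION = "deltacommit";
  static final String REPLACE_COMMIT_ACTION = "replacecommit";

  private CommitActionTypeSketch() {
  }

  /** Picks the timeline action without needing a meta client instance. */
  static String getCommitActionType(WriteOperation operation, TableType tableType) {
    if (operation == WriteOperation.INSERT_OVERWRITE) {
      // insert overwrite replaces existing file groups
      return REPLACE_COMMIT_ACTION;
    }
    // default case: only the table type matters
    return tableType == TableType.MERGE_ON_READ ? DELTA_COMMIT_ACTION : COMMIT_ACTION;
  }
}
```

Since the helper takes the table type as a plain argument, callers that already know it do not have to construct a meta client just to pick the action.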









[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

2020-09-14 Thread GitBox


satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r488216199



##
File path: 
hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertOverwriteCommitActionExecutor.java
##
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.WorkloadProfile;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.util.ArrayList;
+import java.util.stream.Stream;
+
+public class InsertOverwriteCommitActionExecutor>
+extends CommitActionExecutor {
+
+  private static final Logger LOG = 
LogManager.getLogger(InsertOverwriteCommitActionExecutor.class);
+
+  private final JavaRDD> inputRecordsRDD;
+
+  public InsertOverwriteCommitActionExecutor(JavaSparkContext jsc,
+ HoodieWriteConfig config, 
HoodieTable table,
+ String instantTime, 
JavaRDD> inputRecordsRDD) {
+super(jsc, config, table, instantTime, 
WriteOperationType.INSERT_OVERWRITE);
+this.inputRecordsRDD = inputRecordsRDD;
+  }
+
+  @Override
+  public HoodieWriteMetadata execute() {
+return WriteHelper.write(instantTime, inputRecordsRDD, jsc, 
(HoodieTable) table,
+config.shouldCombineBeforeInsert(), 
config.getInsertShuffleParallelism(), this, false);
+  }
+
+  @Override
+  protected Partitioner getPartitioner(WorkloadProfile profile) {
+return new InsertOverwritePartitioner<>(profile, jsc, table, config);
+  }
+
+  @Override
+  protected String getCommitActionType() {
+return HoodieTimeline.REPLACE_COMMIT_ACTION;
+  }
+
+  @Override
+  protected JavaRDD processInputRecords(JavaRDD> 
inputRecordsRDD, WorkloadProfile profile) {
+// get all existing fileIds to mark them as replaced
+JavaRDD replaceStatuses = getAllReplaceWriteStatus(profile);
+// do necessary inserts into new file groups
+JavaRDD writeStatuses = 
super.processInputRecords(inputRecordsRDD, profile);
+return writeStatuses.union(replaceStatuses);
+  }
+
+  private JavaRDD getAllReplaceWriteStatus(WorkloadProfile 
profile) {
+JavaRDD partitions = jsc.parallelize(new 
ArrayList<>(profile.getPartitionPaths()));
+JavaRDD replaceStatuses = partitions.flatMap(partition ->
+getAllExistingFileIds(partition).map(fileId -> 
getReplaceWriteStatus(partition, fileId)).iterator());
+
+return replaceStatuses;
+  }
+
+  private Stream getAllExistingFileIds(String partitionPath) {
+// because new commit is not complete. it is safe to mark all base files 
as old files
+return 
table.getBaseFileOnlyView().getAllBaseFiles(partitionPath).map(baseFile -> 
baseFile.getFileId());

Review comment:
   My bad, I missed the insert-into-log-files case. I fixed it now. Thanks 
for finding this bug.









[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

2020-09-14 Thread GitBox


satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r488215834



##
File path: 
hudi-client/src/main/java/org/apache/hudi/table/action/HoodieWriteMetadata.java
##
@@ -94,6 +94,14 @@ public void setWriteStats(List writeStats) {
 this.writeStats = Option.of(writeStats);
   }
 
+  public Option> getReplacetats() {

Review comment:
   This is not needed anymore given we are tracking replaced files as a 
boolean in WriteStatus. I removed this.









[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

2020-09-14 Thread GitBox


satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r488212928



##
File path: 
hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
##
@@ -586,15 +603,23 @@ public String startCommit() {
* @param instantTime Instant time to be generated
*/
   public void startCommitWithTime(String instantTime) {
+HoodieTableMetaClient metaClient = createMetaClient(true);
+startCommitWithTime(instantTime, metaClient.getCommitActionType());
+  }
+
+  /**
+   * Completes a new commit time for a write operation (insert/update/delete) 
with specified action.
+   */
+  public void startCommitWithTime(String instantTime, String actionType) {
 // NOTE : Need to ensure that rollback is done before a new commit is 
started
 if (rollbackPending) {
   // Only rollback inflight commit/delta-commits. Do not touch compaction 
commits
   rollbackPendingCommits();
 }
-startCommit(instantTime);
+startCommit(instantTime, actionType);
   }
 
-  private void startCommit(String instantTime) {
+  private void startCommit(String instantTime, String actionType) {
 LOG.info("Generate a new instant time " + instantTime);
 HoodieTableMetaClient metaClient = createMetaClient(true);

Review comment:
   Good point. Will do.









[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

2020-09-14 Thread GitBox


satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r488209685



##
File path: 
hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
##
@@ -576,7 +592,8 @@ public String startCommit() {
   rollbackPendingCommits();
 }
 String instantTime = HoodieActiveTimeline.createNewInstantTime();
-startCommit(instantTime);
+HoodieTableMetaClient metaClient = createMetaClient(true);

Review comment:
   sure









[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

2020-09-14 Thread GitBox


satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r488208571



##
File path: 
hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##
@@ -301,6 +304,61 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext 
jsc, HoodieInstant in
 }
   }
 
+  private void deleteReplacedFiles(HoodieInstant instant) {
+if (!instant.isCompleted()) {
+  // only delete files for completed instants
+  return;
+}
+
+TableFileSystemView fileSystemView = this.table.getFileSystemView();
+ensureReplacedPartitionsLoadedCorrectly(instant, fileSystemView);
+
+Stream fileGroupsToDelete = fileSystemView
+.getReplacedFileGroupsBeforeOrOn(instant.getTimestamp());
+
+fileGroupsToDelete.forEach(fg -> {
+  fg.getAllRawFileSlices().forEach(fileSlice -> {
+fileSlice.getBaseFile().map(baseFile -> 
deletePath(baseFile.getFileStatus().getPath(), instant));
+fileSlice.getLogFiles().forEach(logFile -> 
deletePath(logFile.getPath(), instant));
+  });
+});
+  }
+
+  /**
+   * Because we are creating new 'HoodieTable' and FileSystemView objects in 
this class constructor,
+   * partition view may not be loaded correctly.
+   * Reload all partitions modified by REPLACE action
+   *
+   * TODO find a better way to pass the FileSystemView to this class.
+   */
+  private void ensureReplacedPartitionsLoadedCorrectly(HoodieInstant instant, 
TableFileSystemView fileSystemView) {
+Option replaceInstantOption = 
metaClient.getActiveTimeline().getCompletedAndReplaceTimeline()
+.filter(replaceInstant -> 
replaceInstant.getTimestamp().equals(instant.getTimestamp())).firstInstant();
+
+replaceInstantOption.ifPresent(replaceInstant -> {

Review comment:
   Correct, I'll restructure this code.

##
File path: 
hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##
@@ -301,6 +304,61 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext 
jsc, HoodieInstant in
 }
   }
 
+  private void deleteReplacedFiles(HoodieInstant instant) {
+if (!instant.isCompleted()) {

Review comment:
   Correct. Will add it.









[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

2020-09-14 Thread GitBox


satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r488208180



##
File path: 
hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##
@@ -301,6 +304,61 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext 
jsc, HoodieInstant in
 }
   }
 
+  private void deleteReplacedFiles(HoodieInstant instant) {
+if (!instant.isCompleted()) {
+  // only delete files for completed instants
+  return;
+}
+
+TableFileSystemView fileSystemView = this.table.getFileSystemView();
+ensureReplacedPartitionsLoadedCorrectly(instant, fileSystemView);
+
+Stream fileGroupsToDelete = fileSystemView
+.getReplacedFileGroupsBeforeOrOn(instant.getTimestamp());
+
+fileGroupsToDelete.forEach(fg -> {
+  fg.getAllRawFileSlices().forEach(fileSlice -> {
+fileSlice.getBaseFile().map(baseFile -> 
deletePath(baseFile.getFileStatus().getPath(), instant));
+fileSlice.getLogFiles().forEach(logFile -> 
deletePath(logFile.getPath(), instant));
+  });
+});
+  }
+
+  /**
+   * Because we are creating new 'HoodieTable' and FileSystemView objects in 
this class constructor,
+   * partition view may not be loaded correctly.
+   * Reload all partitions modified by REPLACE action
+   *
+   * TODO find a better way to pass the FileSystemView to this class.
+   */
+  private void ensureReplacedPartitionsLoadedCorrectly(HoodieInstant instant, 
TableFileSystemView fileSystemView) {
+Option replaceInstantOption = 
metaClient.getActiveTimeline().getCompletedAndReplaceTimeline()
+.filter(replaceInstant -> 
replaceInstant.getTimestamp().equals(instant.getTimestamp())).firstInstant();
+
+replaceInstantOption.ifPresent(replaceInstant -> {
+  try {
+HoodieReplaceCommitMetadata metadata = 
HoodieReplaceCommitMetadata.fromBytes(
+
metaClient.getActiveTimeline().getInstantDetails(replaceInstant).get(),
+HoodieReplaceCommitMetadata.class);
+
+metadata.getPartitionToReplaceStats().keySet().forEach(partition -> 
fileSystemView.getAllFileGroups(partition));
+  } catch (IOException e) {
+throw new HoodieCommitException("Failed to archive because cannot 
delete replace files", e);
+  }
+});
+  }
+
+  private boolean deletePath(Path path, HoodieInstant instant) {
+try {
+  LOG.info("Deleting " + path + " before archiving " + instant);
+  metaClient.getFs().delete(path);

Review comment:
   metaClient.getFs() seems to follow the singleton pattern, so it doesn't 
seem expensive to get this. Am I reading it incorrectly?
   
   I can work on setting up parallel deletes.
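
A rough sketch of what parallel deletes could look like. This is only an illustration: it uses `java.nio.file` and a parallel stream as stand-ins for the Hadoop `FileSystem` and Spark-based parallelism the PR would more likely use, and the class name is hypothetical:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

final class ParallelDeleteSketch {
  private ParallelDeleteSketch() {
  }

  /** Deletes the given paths in parallel and returns how many files were removed. */
  static long deleteAll(List<Path> paths) {
    return paths.parallelStream()
        .filter(ParallelDeleteSketch::deleteOne)
        .count();
  }

  /** Returns true if the file existed and was deleted, false if it was already gone. */
  private static boolean deleteOne(Path path) {
    try {
      return Files.deleteIfExists(path);
    } catch (IOException e) {
      // surface the failure so the caller can abort archiving
      throw new UncheckedIOException("Failed to delete " + path, e);
    }
  }
}
```

Treating an already-missing file as a non-error keeps the delete safe to retry if archiving is interrupted partway through.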









[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

2020-09-14 Thread GitBox


satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r488207278



##
File path: 
hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##
@@ -301,6 +304,61 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext 
jsc, HoodieInstant in
 }
   }
 
+  private void deleteReplacedFiles(HoodieInstant instant) {
+if (!instant.isCompleted()) {
+  // only delete files for completed instants
+  return;
+}
+
+TableFileSystemView fileSystemView = this.table.getFileSystemView();
+ensureReplacedPartitionsLoadedCorrectly(instant, fileSystemView);
+
+Stream fileGroupsToDelete = fileSystemView

Review comment:
   Yes, compaction is the primary reason I only recorded fileId in the 
replace metadata. When deleting, we can get all file paths (through the view 
or by listing using consolidated metadata) that have the same fileId and 
delete these files.
   
   There can be a race condition where compaction creates a new file with a 
replaced fileId after we queried for existing files. But because 
FileSystemView#get methods do not include replaced file groups, I think this is 
unlikely to happen. I'm not sure if there are edge cases with long-running 
compactions.
   
   Please suggest any other improvements.
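
To illustrate the delete-by-fileId idea, here is a minimal sketch that assumes each file in a partition can be mapped to its owning fileId. `DataFile` and `pathsToDelete` are hypothetical names, not part of the PR:

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

final class ReplacedFileSelectionSketch {
  private ReplacedFileSelectionSketch() {
  }

  /** Hypothetical minimal view of a data file: its owning fileId plus its path. */
  static final class DataFile {
    final String fileId;
    final String path;

    DataFile(String fileId, String path) {
      this.fileId = fileId;
      this.path = path;
    }
  }

  /** Selects every path (base or log file) whose fileId was replaced. */
  static List<String> pathsToDelete(List<DataFile> filesInPartition, Set<String> replacedFileIds) {
    return filesInPartition.stream()
        .filter(file -> replacedFileIds.contains(file.fileId))
        .map(file -> file.path)
        .collect(Collectors.toList());
  }
}
```

Matching on fileId at delete time means files written by compaction after the replace metadata was created are still picked up, as long as they are present when the listing is taken.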









[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

2020-09-01 Thread GitBox


satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r481365985



##
File path: 
hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##
@@ -301,6 +304,44 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext 
jsc, HoodieInstant in
 }
   }
 
+  private void deleteReplacedFiles(HoodieInstant instant) {
+if (!instant.isCompleted()) {
+  // only delete files for completed instants
+  return;
+}
+Option replaceInstantOption = 
metaClient.getActiveTimeline().getCompletedAndReplaceTimeline()
+.filter(replaceInstant -> 
replaceInstant.getTimestamp().equals(instant.getTimestamp())).firstInstant();
+
+replaceInstantOption.ifPresent(replaceInstant -> {
+  try {
+HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(
+
metaClient.getActiveTimeline().getInstantDetails(replaceInstant).get(),
+HoodieCommitMetadata.class);
+
+
metadata.getPartitionToReplaceStats().entrySet().stream().forEach(entry ->
+deleteFileGroups(entry.getKey(), entry.getValue().stream().map(e 
-> e.getFileId()).collect(Collectors.toSet()), instant)
+);
+  } catch (IOException e) {
+throw new HoodieCommitException("Failed to archive because cannot 
delete replace files", e);
+  }
+});
+  }
+
+  private void deleteFileGroups(String partitionPath, Set 
fileIdsToDelete, HoodieInstant instant) {
+try {
+  FileStatus[] statuses = 
metaClient.getFs().listStatus(FSUtils.getPartitionPath(metaClient.getBasePath(),
 partitionPath));

Review comment:
   Modified to use FileSystemViews









[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

2020-09-01 Thread GitBox


satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r481365742



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java
##
@@ -77,6 +79,13 @@ public SpillableMapBasedFileSystemView(HoodieTableMetaClient 
metaClient, HoodieT
 }
   }
 
+  @Override
+  protected Map> createPartitionToExcludeFileGroups() {
+// TODO should we create another spillable directory under baseStoreDir?
+// the exclude file group is expected to be small, so use parent class 
in-memory representation
+return super.createPartitionToExcludeFileGroups();

Review comment:
   Please take a look to see if I did it correctly. (To be honest, I don't 
fully understand the compaction implementation in great detail.)

##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
##
@@ -148,6 +148,9 @@
*/
   Stream getAllFileGroups(String partitionPath);
 
+  Stream getAllExcludeFileGroups(String partitionPath);

Review comment:
   Done









[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

2020-09-01 Thread GitBox


satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r481364995



##
File path: 
hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
##
@@ -104,4 +104,8 @@ protected SyncableFileSystemView 
getFileSystemViewWithUnCommittedSlices(HoodieTa
   protected HoodieTableType getTableType() {
 return HoodieTableType.COPY_ON_WRITE;
   }
+
+  protected boolean areTimeTravelTestsEnabled() {

Review comment:
   RocksDB and Remote FileSystemViews are not implemented yet, so I 
temporarily disabled those tests. Moved this method to the view tests instead 
of common.

##
File path: 
hudi-spark/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java
##
@@ -102,7 +102,7 @@ public void commit(WriterCommitMessage[] messages) {
 .flatMap(m -> m.getWriteStatuses().stream().map(m2 -> m2.getStat())).collect(Collectors.toList());
 
 try {
-  writeClient.commitStats(instantTime, writeStatList, Option.empty());
+  writeClient.commitStats(instantTime, writeStatList, Option.empty(), HoodieTimeline.COMMIT_ACTION); // TODO get action type from HoodieWriterCommitMessage

Review comment:
   Fixed

##
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java
##
@@ -89,6 +89,10 @@ public FileSliceHandler(Configuration conf, FileSystemViewManager viewManager) t
 .collect(Collectors.toList());
   }
 
+  public List getExcludeFileGroups(String basePath, String partitionPath) {

Review comment:
   Done









[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

2020-09-01 Thread GitBox


satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r481364462



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieReplaceStat.java
##
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Statistics about a single Hoodie replace operation.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class HoodieReplaceStat extends HoodieWriteStat {
+
+  // records from the 'getFileId()' can be written to multiple new file groups. This list tracks all new fileIds
+  private List newFileIds;

Review comment:
   It's part of HoodieWriteStat#fileId.
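
   To illustrate the relationship described here, a minimal sketch, assuming the replaced file group's ID lives in the parent stat's `fileId` and the subclass only adds the list of new file IDs. `ReplaceStatSketch`, `WriteStat`, `ReplaceStat`, and `addNewFileId` are hypothetical, simplified stand-ins and not the actual Hudi classes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified stand-ins; NOT the real HoodieWriteStat / HoodieReplaceStat.
public class ReplaceStatSketch {

  // Parent stat: carries the fileId of the file group the operation acted on.
  public static class WriteStat {
    private String fileId;

    public String getFileId() {
      return fileId;
    }

    public void setFileId(String fileId) {
      this.fileId = fileId;
    }
  }

  // Replace stat: the inherited fileId is the replaced file group;
  // newFileIds tracks every new file group its records were written to.
  public static class ReplaceStat extends WriteStat {
    private final List<String> newFileIds = new ArrayList<>();

    public void addNewFileId(String newFileId) {
      newFileIds.add(newFileId);
    }

    public List<String> getNewFileIds() {
      // Read-only view so callers cannot mutate the internal list.
      return Collections.unmodifiableList(newFileIds);
    }
  }
}
```

   With this shape no separate "replaced file ID" field is needed, since `getFileId()` already identifies the file group being replaced.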

##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
##
@@ -355,6 +357,18 @@ public RemoteHoodieTableFileSystemView(String server, int port, HoodieTableMetaC
 }
   }
 
+  @Override
+  public Stream getAllExcludeFileGroups(final String partitionPath) {

Review comment:
   changed

##
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java
##
@@ -284,6 +284,13 @@ private void registerFileSlicesAPI() {
   writeValueAsString(ctx, dtos);
 }, true));
 
+app.get(RemoteHoodieTableFileSystemView.ALL_EXCLUDE_FILEGROUPS_FOR_PARTITION_URL,
 new ViewHandler(ctx -> {

Review comment:
   changed









[GitHub] [hudi] satishkotha commented on a change in pull request #2048: [HUDI-1072][WIP] Introduce REPLACE top level action

2020-09-01 Thread GitBox


satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r481364171



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
##
@@ -173,29 +180,59 @@ protected void refreshTimeline(HoodieTimeline visibleActiveTimeline) {
 List fileGroups = new ArrayList<>();
 fileIdSet.forEach(pair -> {
   String fileId = pair.getValue();
-  HoodieFileGroup group = new HoodieFileGroup(pair.getKey(), fileId, timeline);
-  if (baseFiles.containsKey(pair)) {
-baseFiles.get(pair).forEach(group::addBaseFile);
-  }
-  if (logFiles.containsKey(pair)) {
-logFiles.get(pair).forEach(group::addLogFile);
-  }
+  String partitionPath = pair.getKey();
+  if (isExcludeFileGroup(partitionPath, fileId)) {

Review comment:
   Modified. Please take a look. RocksDB support still needs to be implemented.
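
   For context, a minimal sketch of the check this hunk adds, assuming excluded file IDs are kept per partition in a plain in-memory map (which is what the in-memory view would do; a RocksDB-backed view would need its own storage). `ExcludeFileGroupSketch`, `markExcluded`, and `visibleFileIds` are hypothetical names for illustration only; only `isExcludeFileGroup` mirrors a name from the diff, and its body here is an assumption.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the view logic; NOT the actual AbstractTableFileSystemView.
public class ExcludeFileGroupSketch {

  // partitionPath -> fileIds that were replaced in that partition.
  // Assumed to be small enough to hold in memory.
  private final Map<String, Set<String>> partitionToExcluded = new HashMap<>();

  // Records that a file group in the given partition has been replaced.
  public void markExcluded(String partitionPath, String fileId) {
    partitionToExcluded.computeIfAbsent(partitionPath, k -> new HashSet<>()).add(fileId);
  }

  // True if the file group was replaced and should not be served to readers.
  public boolean isExcludeFileGroup(String partitionPath, String fileId) {
    return partitionToExcluded
        .getOrDefault(partitionPath, Collections.emptySet())
        .contains(fileId);
  }

  // Returns only the file IDs still visible in the partition, preserving input order.
  public List<String> visibleFileIds(String partitionPath, List<String> allFileIds) {
    List<String> visible = new ArrayList<>();
    for (String fileId : allFileIds) {
      if (!isExcludeFileGroup(partitionPath, fileId)) {
        visible.add(fileId);
      }
    }
    return visible;
  }
}
```

   The exclusion is keyed by partition as well as file ID, so the same file ID in a different partition stays visible.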




