[GitHub] [hudi] nsivabalan commented on a change in pull request #4739: [HUDI-3365] Make sure Metadata Table records are updated appropriately on HDFS

2022-03-05 Thread GitBox


nsivabalan commented on a change in pull request #4739:
URL: https://github.com/apache/hudi/pull/4739#discussion_r820157441



##
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
##
@@ -1301,4 +1369,33 @@ public void close() {
     this.heartbeatClient.stop();
     this.txnManager.close();
   }
+
+  private void setWriteTimer(HoodieTable table) {
+    String commitType = table.getMetaClient().getCommitActionType();
+    if (commitType.equals(HoodieTimeline.COMMIT_ACTION)) {
+      writeTimer = metrics.getCommitCtx();
+    } else if (commitType.equals(HoodieTimeline.DELTA_COMMIT_ACTION)) {
+      writeTimer = metrics.getDeltaCommitCtx();
+    }
+  }
+
+  private void tryUpgrade(HoodieTableMetaClient metaClient, Option<String> instantTime) {
+    UpgradeDowngrade upgradeDowngrade =
+        new UpgradeDowngrade(metaClient, config, context, upgradeDowngradeHelper);
+
+    if (upgradeDowngrade.needsUpgradeOrDowngrade(HoodieTableVersion.current())) {

Review comment:
   @yihua : we are doing some minor refactoring of the upgrade/downgrade invocation 
for the Java engine, and just wanted to keep you in the loop. Prior to this patch, the 
Java engine did not have any explicit upgrade/downgrade step. We just want to ensure 
the unification does not cause any issues. It would be nice if you could review it 
once and stamp it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] nsivabalan commented on a change in pull request #4739: [HUDI-3365] Make sure Metadata Table records are updated appropriately on HDFS

2022-03-05 Thread GitBox


nsivabalan commented on a change in pull request #4739:
URL: https://github.com/apache/hudi/pull/4739#discussion_r820157190



##
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
##
@@ -1301,4 +1369,33 @@ public void close() {
     this.heartbeatClient.stop();
     this.txnManager.close();
   }
+
+  private void setWriteTimer(HoodieTable table) {
+    String commitType = table.getMetaClient().getCommitActionType();
+    if (commitType.equals(HoodieTimeline.COMMIT_ACTION)) {
+      writeTimer = metrics.getCommitCtx();
+    } else if (commitType.equals(HoodieTimeline.DELTA_COMMIT_ACTION)) {
+      writeTimer = metrics.getDeltaCommitCtx();
+    }
+  }
+
+  private void tryUpgrade(HoodieTableMetaClient metaClient, Option<String> instantTime) {
+    UpgradeDowngrade upgradeDowngrade =
+        new UpgradeDowngrade(metaClient, config, context, upgradeDowngradeHelper);
+
+    if (upgradeDowngrade.needsUpgradeOrDowngrade(HoodieTableVersion.current())) {

Review comment:
   I am fine with it, but I want to get a stamp from the devs who work on the Flink 
and Java engines. I just don't want to miss anything, because the engines do not 
always follow the same logic. I checked the code and the refactoring once again, and 
it seems to be fine.
   @danny0405 : we are doing some minor refactoring of the upgrade/downgrade 
invocation for the Flink engine, and just wanted to keep you in the loop.








[GitHub] [hudi] nsivabalan commented on a change in pull request #4739: [HUDI-3365] Make sure Metadata Table records are updated appropriately on HDFS

2022-03-02 Thread GitBox


nsivabalan commented on a change in pull request #4739:
URL: https://github.com/apache/hudi/pull/4739#discussion_r818036625



##
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
##
@@ -1301,4 +1369,33 @@ public void close() {
     this.heartbeatClient.stop();
     this.txnManager.close();
   }
+
+  private void setWriteTimer(HoodieTable table) {
+    String commitType = table.getMetaClient().getCommitActionType();
+    if (commitType.equals(HoodieTimeline.COMMIT_ACTION)) {
+      writeTimer = metrics.getCommitCtx();
+    } else if (commitType.equals(HoodieTimeline.DELTA_COMMIT_ACTION)) {
+      writeTimer = metrics.getDeltaCommitCtx();
+    }
+  }
+
+  private void tryUpgrade(HoodieTableMetaClient metaClient, Option<String> instantTime) {
+    UpgradeDowngrade upgradeDowngrade =
+        new UpgradeDowngrade(metaClient, config, context, upgradeDowngradeHelper);
+
+    if (upgradeDowngrade.needsUpgradeOrDowngrade(HoodieTableVersion.current())) {

Review comment:
   As per master, this code is available only in the Spark engine; Flink and Java do 
not have it. Even if we wish to unify, I would do it in a separate patch and get it 
reviewed by experts who have worked on it. Can you move this to SparkRDDWriteClient 
for now?

##
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
##
@@ -1246,17 +1243,88 @@ public HoodieMetrics getMetrics() {
   }
 
   /**
-   * Get HoodieTable and init {@link Timer.Context}.
+   * Instantiates engine-specific instance of {@link HoodieTable} as well as performs necessary
+   * bootstrapping operations (for ex, validating whether Metadata Table has to be bootstrapped)
    *
-   * @param operationType write operation type
+   * NOTE: THIS OPERATION IS EXECUTED UNDER LOCK, THEREFORE SHOULD AVOID ANY OPERATIONS
+   *       NOT REQUIRING EXTERNAL SYNCHRONIZATION
+   *
+   * @param metaClient instance of {@link HoodieTableMetaClient}
    * @param instantTime current inflight instant time
-   * @return HoodieTable
+   * @return instantiated {@link HoodieTable}
    */
-  protected abstract HoodieTable getTableAndInitCtx(WriteOperationType operationType, String instantTime);
+  protected abstract HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime);
 
   /**
-   * Sets write schema from last instant since deletes may not have schema set in the config.
+   * Instantiates and initializes instance of {@link HoodieTable}, performing crucial bootstrapping
+   * operations such as:
+   *
+   * NOTE: This method is engine-agnostic and SHOULD NOT be overloaded, please check on
+   * {@link #doInitTable(HoodieTableMetaClient, Option)} instead
+   *
+   *
+   *   Checking whether upgrade/downgrade is required
+   *   Bootstrapping Metadata Table (if required)
+   *   Initializing metrics contexts
+   *
    */
+  protected final HoodieTable initTable(WriteOperationType operationType, Option<String> instantTime) {
+    HoodieTableMetaClient metaClient = createMetaClient(true);
+    // Setup write schemas for deletes
+    if (operationType == WriteOperationType.DELETE) {
+      setWriteSchemaForDeletes(metaClient);
+    }
+
+    HoodieTable table;
+
+    this.txnManager.beginTransaction();
+    try {
+      tryUpgrade(metaClient, instantTime);
+      table = doInitTable(metaClient, instantTime);
+    } finally {
+      this.txnManager.endTransaction();
+    }
+
+    // Validate table properties
+    metaClient.validateTableProperties(config.getProps(), operationType);
+    // Make sure that FS View is in sync
+    table.getHoodieView().sync();
+
+    switch (operationType) {
+      case INSERT:
+      case INSERT_PREPPED:
+      case UPSERT:
+      case UPSERT_PREPPED:
+      case BULK_INSERT:
+      case BULK_INSERT_PREPPED:
+      case INSERT_OVERWRITE:
+      case INSERT_OVERWRITE_TABLE:
+        setWriteTimer(table);
+        break;
+      case CLUSTER:
+        clusteringTimer = metrics.getClusteringCtx();
+        break;
+      case COMPACT:
+        compactionTimer = metrics.getCompactionCtx();
+        break;
+      default:
+    }
+
+    return table;
+  }
+
+  protected <R> R withLock(Supplier<R> s) {

Review comment:
   Is this used anywhere?

##
File path: hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/upgrade/JavaUpgradeDowngradeHelper.java
##
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law

[GitHub] [hudi] nsivabalan commented on a change in pull request #4739: [HUDI-3365] Make sure Metadata Table records are updated appropriately on HDFS

2022-02-10 Thread GitBox


nsivabalan commented on a change in pull request #4739:
URL: https://github.com/apache/hudi/pull/4739#discussion_r80407



##
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
##
@@ -362,28 +361,33 @@ private HoodieMetadataColumnStats combineColumnStatsMetadatat(HoodieMetadataPayl
     return filesystemMetadata.entrySet().stream().filter(e -> e.getValue().getIsDeleted() == isDeleted);
   }
 
-  private Map<String, HoodieMetadataFileInfo> combineFilesystemMetadata(HoodieMetadataPayload previousRecord) {
+  private Map<String, HoodieMetadataFileInfo> combineFileSystemMetadata(HoodieMetadataPayload previousRecord) {
     Map<String, HoodieMetadataFileInfo> combinedFileInfo = new HashMap<>();
+
+    // First, add all files listed in the previous record
     if (previousRecord.filesystemMetadata != null) {
      combinedFileInfo.putAll(previousRecord.filesystemMetadata);
     }
 
+    // Second, merge in the files listed in the new record
     if (filesystemMetadata != null) {
-      filesystemMetadata.forEach((filename, fileInfo) -> {
-        // If the filename wasnt present then we carry it forward
-        if (!combinedFileInfo.containsKey(filename)) {
-          combinedFileInfo.put(filename, fileInfo);
-        } else {
-          if (fileInfo.getIsDeleted()) {
-            // file deletion
-            combinedFileInfo.remove(filename);
-          } else {
-            // file appends.
-            combinedFileInfo.merge(filename, fileInfo, (oldFileInfo, newFileInfo) -> {
-              return new HoodieMetadataFileInfo(oldFileInfo.getSize() + newFileInfo.getSize(), false);
-            });
-          }
-        }
+      validatePayload(type, filesystemMetadata);
+
+      filesystemMetadata.forEach((key, fileInfo) -> {
+        combinedFileInfo.merge(key, fileInfo,
+            // Combine previous record w/ the new one, new records taking precedence over
+            // the old one
+            //
+            // NOTE: That if previous listing contains the file that is being deleted by the tombstone
+            //       record (`IsDeleted` = true) in the new one, we simply delete the file from the resulting
+            //       listing as well as drop the tombstone itself.
+            //       However, if file is not present in the previous record we have to persist tombstone
+            //       record in the listing to make sure we carry forward information that this file
+            //       was deleted. This special case could occur since the merging flow is 2-stage:
+            //         - First we merge records from all of the delta log-files
+            //         - Then we merge records from base-files with the delta ones (coming as a result
+            //           of the previous step)
+            (oldFileInfo, newFileInfo) -> newFileInfo.getIsDeleted() ? null : newFileInfo);

Review comment:
   Thanks a lot for capturing the gist. So I assume you plan to fix the merge 
function to take max(old size, new size)?
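   For illustration, a max-based combiner could look roughly like the following. This 
is only a sketch against the classes quoted in the diff above (reusing the 
HoodieMetadataFileInfo constructor and getters shown there), not the PR's final code:

     // Sketch: keep the tombstone semantics as-is, but resolve sizes via max(old, new)
     // instead of "new record wins", relying on log-file sizes growing monotonically.
     (oldFileInfo, newFileInfo) ->
         newFileInfo.getIsDeleted()
             ? null
             : new HoodieMetadataFileInfo(
                 Math.max(oldFileInfo.getSize(), newFileInfo.getSize()), false);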








[GitHub] [hudi] nsivabalan commented on a change in pull request #4739: [HUDI-3365] Make sure Metadata Table records are updated appropriately on HDFS

2022-02-08 Thread GitBox


nsivabalan commented on a change in pull request #4739:
URL: https://github.com/apache/hudi/pull/4739#discussion_r802253573



##
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
##
@@ -362,28 +361,33 @@ private HoodieMetadataColumnStats combineColumnStatsMetadatat(HoodieMetadataPayl
     return filesystemMetadata.entrySet().stream().filter(e -> e.getValue().getIsDeleted() == isDeleted);
   }
 
-  private Map<String, HoodieMetadataFileInfo> combineFilesystemMetadata(HoodieMetadataPayload previousRecord) {
+  private Map<String, HoodieMetadataFileInfo> combineFileSystemMetadata(HoodieMetadataPayload previousRecord) {
     Map<String, HoodieMetadataFileInfo> combinedFileInfo = new HashMap<>();
+
+    // First, add all files listed in the previous record
     if (previousRecord.filesystemMetadata != null) {
      combinedFileInfo.putAll(previousRecord.filesystemMetadata);
     }
 
+    // Second, merge in the files listed in the new record
     if (filesystemMetadata != null) {
-      filesystemMetadata.forEach((filename, fileInfo) -> {
-        // If the filename wasnt present then we carry it forward
-        if (!combinedFileInfo.containsKey(filename)) {
-          combinedFileInfo.put(filename, fileInfo);
-        } else {
-          if (fileInfo.getIsDeleted()) {
-            // file deletion
-            combinedFileInfo.remove(filename);
-          } else {
-            // file appends.
-            combinedFileInfo.merge(filename, fileInfo, (oldFileInfo, newFileInfo) -> {
-              return new HoodieMetadataFileInfo(oldFileInfo.getSize() + newFileInfo.getSize(), false);
-            });
-          }
-        }
+      validatePayload(type, filesystemMetadata);
+
+      filesystemMetadata.forEach((key, fileInfo) -> {
+        combinedFileInfo.merge(key, fileInfo,
+            // Combine previous record w/ the new one, new records taking precedence over
+            // the old one
+            //
+            // NOTE: That if previous listing contains the file that is being deleted by the tombstone
+            //       record (`IsDeleted` = true) in the new one, we simply delete the file from the resulting
+            //       listing as well as drop the tombstone itself.
+            //       However, if file is not present in the previous record we have to persist tombstone
+            //       record in the listing to make sure we carry forward information that this file
+            //       was deleted. This special case could occur since the merging flow is 2-stage:
+            //         - First we merge records from all of the delta log-files
+            //         - Then we merge records from base-files with the delta ones (coming as a result
+            //           of the previous step)
+            (oldFileInfo, newFileInfo) -> newFileInfo.getIsDeleted() ? null : newFileInfo);

Review comment:
   I agree, but the scenarios you quoted are unlikely to happen in general:
   a. The cleaner will not touch the latest file slice, which is where regular writes 
go (MOR table).
   b. In case of COW, there is no log-file concept, and hence a rollback and a commit 
touching the same file is very unlikely.
   So, for two concurrent writes where one of them deletes a file while the other 
updates it or adds a new one, I can't think of a valid scenario. Maybe there is one, 
but we would need to think hard to come up with it.
   
   But the scenario of a rollback and concurrent updates to the same file slice might 
happen in a MOR table.
   
   Also, the fix being considered should not cause any regression and could help 
thwart some of these scenarios, i.e. taking the max file size rather than the latest 
file size. Do you see any issues specifically with going with the max of the file 
sizes in the merge function? At least for the rollback plus regular writer use-case 
in MOR, we have to make the fix; I don't think we can ignore that use-case.
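   As a quick, self-contained sanity check (sizes modeled as plain longs rather than 
HoodieMetadataFileInfo, so this is a simplification), a max-based combiner resolves to 
the same size regardless of the order in which the two commits reach the metadata 
table:

     import java.util.HashMap;
     import java.util.Map;

     public class MaxMergeOrderCheck {
       public static void main(String[] args) {
         // e.g. writer1 (rollback) observes the log file at 200 bytes, writer2 (append) at 300 bytes
         long order1 = apply(apply(new HashMap<>(), 200L), 300L).get("log.1");
         long order2 = apply(apply(new HashMap<>(), 300L), 200L).get("log.1");
         System.out.println(order1 + " / " + order2); // prints "300 / 300" for both apply orders
       }

       // Merge one commit's reported size into the listing, keeping the max of the sizes
       static Map<String, Long> apply(Map<String, Long> listing, long reportedSize) {
         listing.merge("log.1", reportedSize, Math::max);
         return listing;
       }
     }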
   
   








[GitHub] [hudi] nsivabalan commented on a change in pull request #4739: [HUDI-3365] Make sure Metadata Table records are updated appropriately on HDFS

2022-02-08 Thread GitBox


nsivabalan commented on a change in pull request #4739:
URL: https://github.com/apache/hudi/pull/4739#discussion_r802159013



##
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
##
@@ -362,28 +361,33 @@ private HoodieMetadataColumnStats combineColumnStatsMetadatat(HoodieMetadataPayl
     return filesystemMetadata.entrySet().stream().filter(e -> e.getValue().getIsDeleted() == isDeleted);
   }
 
-  private Map<String, HoodieMetadataFileInfo> combineFilesystemMetadata(HoodieMetadataPayload previousRecord) {
+  private Map<String, HoodieMetadataFileInfo> combineFileSystemMetadata(HoodieMetadataPayload previousRecord) {
     Map<String, HoodieMetadataFileInfo> combinedFileInfo = new HashMap<>();
+
+    // First, add all files listed in the previous record
     if (previousRecord.filesystemMetadata != null) {
      combinedFileInfo.putAll(previousRecord.filesystemMetadata);
     }
 
+    // Second, merge in the files listed in the new record
     if (filesystemMetadata != null) {
-      filesystemMetadata.forEach((filename, fileInfo) -> {
-        // If the filename wasnt present then we carry it forward
-        if (!combinedFileInfo.containsKey(filename)) {
-          combinedFileInfo.put(filename, fileInfo);
-        } else {
-          if (fileInfo.getIsDeleted()) {
-            // file deletion
-            combinedFileInfo.remove(filename);
-          } else {
-            // file appends.
-            combinedFileInfo.merge(filename, fileInfo, (oldFileInfo, newFileInfo) -> {
-              return new HoodieMetadataFileInfo(oldFileInfo.getSize() + newFileInfo.getSize(), false);
-            });
-          }
-        }
+      validatePayload(type, filesystemMetadata);
+
+      filesystemMetadata.forEach((key, fileInfo) -> {
+        combinedFileInfo.merge(key, fileInfo,
+            // Combine previous record w/ the new one, new records taking precedence over
+            // the old one
+            //
+            // NOTE: That if previous listing contains the file that is being deleted by the tombstone
+            //       record (`IsDeleted` = true) in the new one, we simply delete the file from the resulting
+            //       listing as well as drop the tombstone itself.
+            //       However, if file is not present in the previous record we have to persist tombstone
+            //       record in the listing to make sure we carry forward information that this file
+            //       was deleted. This special case could occur since the merging flow is 2-stage:
+            //         - First we merge records from all of the delta log-files
+            //         - Then we merge records from base-files with the delta ones (coming as a result
+            //           of the previous step)
+            (oldFileInfo, newFileInfo) -> newFileInfo.getIsDeleted() ? null : newFileInfo);

Review comment:
   OK, let me take a stab at whether this is a valid scenario. It would be applicable 
only to HDFS.
   
   Let's say there are concurrent writes to a log file by two different writers: one 
of them is doing a rollback and the other is appending a log block. Say writer1 (the 
one doing the rollback) updates the log file first and observes its size as, say, 200, 
and writer2 then appends to the same log file and observes the size as 300. Even 
though the order in which these writers appended to the file was writer1 followed by 
writer2, it is not guaranteed that the same order will be maintained when their 
records reach the metadata table. For various reasons, writer2 could complete its 
write earlier (for example, writer2 is updating only one file group, whereas writer1 
is updating some 100 file groups) and apply its changes to the metadata table before 
writer1 does.
   
   Wouldn't the final resolved size from the metadata table be 200 in this case, once 
both writers are done updating the MDT?
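   Spelling that out with the current "new record wins" combiner (sizes modeled as 
plain longs for brevity; the file name is just a placeholder):

     import java.util.HashMap;
     import java.util.Map;

     public class LatestWinsAnomaly {
       public static void main(String[] args) {
         Map<String, Long> listing = new HashMap<>();
         // writer2's delta commit reaches the metadata table first, reporting 300 bytes
         listing.merge("log.1", 300L, (oldSize, newSize) -> newSize);
         // writer1's rollback is applied later, but it observed the file at only 200 bytes
         listing.merge("log.1", 200L, (oldSize, newSize) -> newSize);
         System.out.println(listing.get("log.1")); // prints 200 -- the larger, more recent size is lost
       }
     }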








[GitHub] [hudi] nsivabalan commented on a change in pull request #4739: [HUDI-3365] Make sure Metadata Table records are updated appropriately on HDFS

2022-02-08 Thread GitBox


nsivabalan commented on a change in pull request #4739:
URL: https://github.com/apache/hudi/pull/4739#discussion_r802157120



##
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
##
@@ -362,28 +375,33 @@ private HoodieMetadataColumnStats combineColumnStatsMetadatat(HoodieMetadataPayl
     return filesystemMetadata.entrySet().stream().filter(e -> e.getValue().getIsDeleted() == isDeleted);
   }
 
-  private Map<String, HoodieMetadataFileInfo> combineFilesystemMetadata(HoodieMetadataPayload previousRecord) {
+  private Map<String, HoodieMetadataFileInfo> combineFileSystemMetadata(HoodieMetadataPayload previousRecord) {
     Map<String, HoodieMetadataFileInfo> combinedFileInfo = new HashMap<>();
+
+    // First, add all files listed in the previous record
     if (previousRecord.filesystemMetadata != null) {
      combinedFileInfo.putAll(previousRecord.filesystemMetadata);
     }
 
+    // Second, merge in the files listed in the new record
     if (filesystemMetadata != null) {
-      filesystemMetadata.forEach((filename, fileInfo) -> {
-        // If the filename wasnt present then we carry it forward
-        if (!combinedFileInfo.containsKey(filename)) {
-          combinedFileInfo.put(filename, fileInfo);
-        } else {
-          if (fileInfo.getIsDeleted()) {
-            // file deletion
-            combinedFileInfo.remove(filename);
-          } else {
-            // file appends.
-            combinedFileInfo.merge(filename, fileInfo, (oldFileInfo, newFileInfo) -> {
-              return new HoodieMetadataFileInfo(oldFileInfo.getSize() + newFileInfo.getSize(), false);
-            });
-          }
-        }
+      validatePayload(type, filesystemMetadata);
+
+      filesystemMetadata.forEach((key, fileInfo) -> {
+        combinedFileInfo.merge(key, fileInfo,
+            // Combine previous record w/ the new one, new records taking precedence over
+            // the old one
+            //
+            // NOTE: That if previous listing contains the file that is being deleted by the tombstone
+            //       record (`IsDeleted` = true) in the new one, we simply delete the file from the resulting
+            //       listing as well as drop the tombstone itself.
+            //       However, if file is not present in the previous record we have to persist tombstone
+            //       record in the listing to make sure we carry forward information that this file
+            //       was deleted. This special case could occur since the merging flow is 2-stage:
+            //         - First we merge records from all of the delta log-files
+            //         - Then we merge records from base-files with the delta ones (coming as a result
+            //           of the previous step)
+            (oldFileInfo, newFileInfo) -> newFileInfo.getIsDeleted() ? null : newFileInfo);

Review comment:
   I guess Prashanth meant checking the oldFileInfo; the validation you are referring 
to validates the new incoming filesystemMetadata.

##
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
##
@@ -362,28 +361,33 @@ private HoodieMetadataColumnStats combineColumnStatsMetadatat(HoodieMetadataPayl
     return filesystemMetadata.entrySet().stream().filter(e -> e.getValue().getIsDeleted() == isDeleted);
   }
 
-  private Map<String, HoodieMetadataFileInfo> combineFilesystemMetadata(HoodieMetadataPayload previousRecord) {
+  private Map<String, HoodieMetadataFileInfo> combineFileSystemMetadata(HoodieMetadataPayload previousRecord) {
     Map<String, HoodieMetadataFileInfo> combinedFileInfo = new HashMap<>();
+
+    // First, add all files listed in the previous record
     if (previousRecord.filesystemMetadata != null) {
      combinedFileInfo.putAll(previousRecord.filesystemMetadata);
     }
 
+    // Second, merge in the files listed in the new record
     if (filesystemMetadata != null) {
-      filesystemMetadata.forEach((filename, fileInfo) -> {
-        // If the filename wasnt present then we carry it forward
-        if (!combinedFileInfo.containsKey(filename)) {
-          combinedFileInfo.put(filename, fileInfo);
-        } else {
-          if (fileInfo.getIsDeleted()) {
-            // file deletion
-            combinedFileInfo.remove(filename);
-          } else {
-            // file appends.
-            combinedFileInfo.merge(filename, fileInfo, (oldFileInfo, newFileInfo) -> {
-              return new HoodieMetadataFileInfo(oldFileInfo.getSize() + newFileInfo.getSize(), false);
-            });
-          }
-        }
+      validatePayload(type, filesystemMetadata);
+
+      filesystemMetadata.forEach((key, fileInfo) -> {
+        combinedFileInfo.merge(key, fileInfo,
+            // Combine previous record w/ the new one, new records taking precedence over
+            // the old one
+            //
+            // NOTE: That if previous listing contains the file that is being deleted by the tombstone
+            //       record (`IsDeleted` = true) in the new one, we simply delete the file from the resulting
+            //

[GitHub] [hudi] nsivabalan commented on a change in pull request #4739: [HUDI-3365] Make sure Metadata Table records are updated appropriately on HDFS

2022-02-06 Thread GitBox


nsivabalan commented on a change in pull request #4739:
URL: https://github.com/apache/hudi/pull/4739#discussion_r800214841



##
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
##
@@ -362,28 +361,33 @@ private HoodieMetadataColumnStats combineColumnStatsMetadatat(HoodieMetadataPayl
     return filesystemMetadata.entrySet().stream().filter(e -> e.getValue().getIsDeleted() == isDeleted);
   }
 
-  private Map<String, HoodieMetadataFileInfo> combineFilesystemMetadata(HoodieMetadataPayload previousRecord) {
+  private Map<String, HoodieMetadataFileInfo> combineFileSystemMetadata(HoodieMetadataPayload previousRecord) {
     Map<String, HoodieMetadataFileInfo> combinedFileInfo = new HashMap<>();
+
+    // First, add all files listed in the previous record
     if (previousRecord.filesystemMetadata != null) {
      combinedFileInfo.putAll(previousRecord.filesystemMetadata);
     }
 
+    // Second, merge in the files listed in the new record
    if (filesystemMetadata != null) {
-      filesystemMetadata.forEach((filename, fileInfo) -> {
-        // If the filename wasnt present then we carry it forward
-        if (!combinedFileInfo.containsKey(filename)) {
-          combinedFileInfo.put(filename, fileInfo);
-        } else {
-          if (fileInfo.getIsDeleted()) {
-            // file deletion
-            combinedFileInfo.remove(filename);
-          } else {
-            // file appends.
-            combinedFileInfo.merge(filename, fileInfo, (oldFileInfo, newFileInfo) -> {
-              return new HoodieMetadataFileInfo(oldFileInfo.getSize() + newFileInfo.getSize(), false);
-            });
-          }
-        }
+      validatePayload(type, filesystemMetadata);
+
+      filesystemMetadata.forEach((key, fileInfo) -> {
+        combinedFileInfo.merge(key, fileInfo,
+            // Combine previous record w/ the new one, new records taking precedence over
+            // the old one
+            //
+            // NOTE: That if previous listing contains the file that is being deleted by the tombstone
+            //       record (`IsDeleted` = true) in the new one, we simply delete the file from the resulting
+            //       listing as well as drop the tombstone itself.
+            //       However, if file is not present in the previous record we have to persist tombstone
+            //       record in the listing to make sure we carry forward information that this file
+            //       was deleted. This special case could occur since the merging flow is 2-stage:
+            //         - First we merge records from all of the delta log-files
+            //         - Then we merge records from base-files with the delta ones (coming as a result
+            //           of the previous step)
+            (oldFileInfo, newFileInfo) -> newFileInfo.getIsDeleted() ? null : newFileInfo);

Review comment:
   @prashantwason : aren't there any multi-writer scenarios where a new commit could 
report a file with a lower size compared to a previous commit?








[GitHub] [hudi] nsivabalan commented on a change in pull request #4739: [HUDI-3365] Make sure Metadata Table records are updated appropriately on HDFS

2022-02-06 Thread GitBox


nsivabalan commented on a change in pull request #4739:
URL: https://github.com/apache/hudi/pull/4739#discussion_r800214756



##
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
##
@@ -362,28 +361,33 @@ private HoodieMetadataColumnStats combineColumnStatsMetadatat(HoodieMetadataPayl
     return filesystemMetadata.entrySet().stream().filter(e -> e.getValue().getIsDeleted() == isDeleted);
   }
 
-  private Map<String, HoodieMetadataFileInfo> combineFilesystemMetadata(HoodieMetadataPayload previousRecord) {
+  private Map<String, HoodieMetadataFileInfo> combineFileSystemMetadata(HoodieMetadataPayload previousRecord) {
     Map<String, HoodieMetadataFileInfo> combinedFileInfo = new HashMap<>();
+
+    // First, add all files listed in the previous record
     if (previousRecord.filesystemMetadata != null) {
      combinedFileInfo.putAll(previousRecord.filesystemMetadata);
     }
 
+    // Second, merge in the files listed in the new record
     if (filesystemMetadata != null) {
-      filesystemMetadata.forEach((filename, fileInfo) -> {
-        // If the filename wasnt present then we carry it forward
-        if (!combinedFileInfo.containsKey(filename)) {
-          combinedFileInfo.put(filename, fileInfo);
-        } else {
-          if (fileInfo.getIsDeleted()) {
-            // file deletion
-            combinedFileInfo.remove(filename);
-          } else {
-            // file appends.
-            combinedFileInfo.merge(filename, fileInfo, (oldFileInfo, newFileInfo) -> {
-              return new HoodieMetadataFileInfo(oldFileInfo.getSize() + newFileInfo.getSize(), false);
-            });
-          }
-        }
+      validatePayload(type, filesystemMetadata);
+
+      filesystemMetadata.forEach((key, fileInfo) -> {
+        combinedFileInfo.merge(key, fileInfo,
+            // Combine previous record w/ the new one, new records taking precedence over
+            // the old one
+            //
+            // NOTE: That if previous listing contains the file that is being deleted by the tombstone
+            //       record (`IsDeleted` = true) in the new one, we simply delete the file from the resulting
+            //       listing as well as drop the tombstone itself.
+            //       However, if file is not present in the previous record we have to persist tombstone
+            //       record in the listing to make sure we carry forward information that this file
+            //       was deleted. This special case could occur since the merging flow is 2-stage:
+            //         - First we merge records from all of the delta log-files
+            //         - Then we merge records from base-files with the delta ones (coming as a result
+            //           of the previous step)
+            (oldFileInfo, newFileInfo) -> newFileInfo.getIsDeleted() ? null : newFileInfo);

Review comment:
   Don't we need to take the max of the two sizes?








[GitHub] [hudi] nsivabalan commented on a change in pull request #4739: [HUDI-3365] Make sure Metadata Table records are updated appropriately on HDFS

2022-02-02 Thread GitBox


nsivabalan commented on a change in pull request #4739:
URL: https://github.com/apache/hudi/pull/4739#discussion_r798098988



##
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
##
@@ -215,19 +215,16 @@ public HoodieMetadataPayload preCombine(HoodieMetadataPayload previousRecord) {
 
     if (filesystemMetadata != null) {
       filesystemMetadata.forEach((filename, fileInfo) -> {
-        // If the filename wasnt present then we carry it forward
-        if (!combinedFileInfo.containsKey(filename)) {
-          combinedFileInfo.put(filename, fileInfo);
+        if (fileInfo.getIsDeleted()) {
+          combinedFileInfo.remove(filename);
         } else {
-          if (fileInfo.getIsDeleted()) {
-            // file deletion
-            combinedFileInfo.remove(filename);
-          } else {
-            // file appends.
-            combinedFileInfo.merge(filename, fileInfo, (oldFileInfo, newFileInfo) -> {
-              return new HoodieMetadataFileInfo(oldFileInfo.getSize() + newFileInfo.getSize(), false);
-            });
-          }
+          // NOTE: There are 2 possible cases here:
+          //       - New file is created: in that case we're simply adding its info
+          //       - File is appended to (only log-files of MOR tables on supported FS): in that case
+          //         we simply pick the info w/ largest file-size as the most recent one, since file's
+          //         sizes are increasing monotonically (meaning that the larger file-size is more recent one)
+          combinedFileInfo.merge(filename, fileInfo, (oldFileInfo, newFileInfo) ->

Review comment:
   The merge function takes care of adding an entry for the first time, so can we 
remove L219 and L220?
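   A tiny demonstration of that Map#merge behaviour (plain longs standing in for 
HoodieMetadataFileInfo):

     import java.util.HashMap;
     import java.util.Map;

     public class MergeInsertsWhenAbsent {
       public static void main(String[] args) {
         Map<String, Long> combined = new HashMap<>();
         // The key is absent, so merge() simply inserts the value; the remapping function is not invoked
         combined.merge("new-file", 100L, (oldSize, newSize) -> {
           throw new AssertionError("remapping function should not be called");
         });
         System.out.println(combined); // prints {new-file=100}
       }
     }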

##
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##
@@ -87,40 +89,58 @@ public static void deleteMetadataTable(String basePath, HoodieEngineContext cont
    * @return a list of metadata table records
    */
   public static List<HoodieRecord> convertMetadataToRecords(HoodieCommitMetadata commitMetadata, String instantTime) {
-    List<HoodieRecord> records = new LinkedList<>();
-    List<String> allPartitions = new LinkedList<>();
-    commitMetadata.getPartitionToWriteStats().forEach((partitionStatName, writeStats) -> {
-      final String partition = partitionStatName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionStatName;
-      allPartitions.add(partition);
-
-      Map<String, Long> newFiles = new HashMap<>(writeStats.size());
-      writeStats.forEach(hoodieWriteStat -> {
-        String pathWithPartition = hoodieWriteStat.getPath();
-        if (pathWithPartition == null) {
-          // Empty partition
-          LOG.warn("Unable to find path in write stat to update metadata table " + hoodieWriteStat);
-          return;
-        }
-
-        int offset = partition.equals(NON_PARTITIONED_NAME) ? (pathWithPartition.startsWith("/") ? 1 : 0) : partition.length() + 1;
-        String filename = pathWithPartition.substring(offset);
-        long totalWriteBytes = newFiles.containsKey(filename)
-            ? newFiles.get(filename) + hoodieWriteStat.getTotalWriteBytes()
-            : hoodieWriteStat.getTotalWriteBytes();
-        newFiles.put(filename, totalWriteBytes);
-      });
-      // New files added to a partition
-      HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(
-          partition, Option.of(newFiles), Option.empty());
-      records.add(record);
-    });
+    List<HoodieRecord> records = new ArrayList<>(commitMetadata.getPartitionToWriteStats().size());
+
+    // Add record bearing partitions list
+    ArrayList<String> partitionsList = new ArrayList<>(commitMetadata.getPartitionToWriteStats().keySet());
+
+    records.add(HoodieMetadataPayload.createPartitionListRecord(partitionsList));
+
+    // New files added to a partition
+    List<HoodieRecord<HoodieMetadataPayload>> updatedFilesRecords =
+        commitMetadata.getPartitionToWriteStats().entrySet()
+            .stream()
+            .map(entry -> {
+              String partitionStatName = entry.getKey();
+              List<HoodieWriteStat> writeStats = entry.getValue();
+
+              String partition = partitionStatName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionStatName;
+
+              HashMap<String, Long> updatedFilesToSizesMapping =
+                  writeStats.stream().reduce(new HashMap<>(writeStats.size()),
+                      (map, stat) -> {
+                        String pathWithPartition = stat.getPath();
+                        if (pathWithPartition == null) {
+                          // Empty partition
+                          LOG.warn("Unable to find path in write stat to update metadata table " + stat);
+                          return map;
+                        }
+
+                        int offset = partition.equals(NON_PARTITIONED_NAME)
+                            ? (pathWithPartition.startsWith("/") ? 1 : 0)
+                            : partition.len