[GitHub] [hudi] nsivabalan commented on a diff in pull request #8978: [HUDI-6315] Optimize DELETE codepath to use meta fields instead of key generation and index lookup

2023-07-03 Thread via GitHub


nsivabalan commented on code in PR #8978:
URL: https://github.com/apache/hudi/pull/8978#discussion_r1251481229


##
hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java:
##
@@ -227,9 +231,21 @@ public static HoodieWriteResult doWriteOperation(SparkRDDWriteClient client, Jav
     }
   }
 
-  public static HoodieWriteResult doDeleteOperation(SparkRDDWriteClient client, JavaRDD<HoodieKey> hoodieKeys,
-      String instantTime) {
-    return new HoodieWriteResult(client.delete(hoodieKeys, instantTime));
+  public static HoodieWriteResult doDeleteOperation(SparkRDDWriteClient client, JavaRDD<Tuple2<HoodieKey, Option<HoodieRecordLocation>>> hoodieKeysAndLocations,
+      String instantTime, boolean isPrepped) {
+
+    if (isPrepped) {
+      JavaRDD<HoodieRecord> records = hoodieKeysAndLocations.map(tuple -> {
+        HoodieRecord record = client.getConfig().getRecordMerger().getRecordType() == HoodieRecord.HoodieRecordType.AVRO

Review Comment:
   can you declare the record type as a variable outside and then just use that 
variable. 
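
A minimal sketch of the suggestion, reusing the names from the diff above (`client`, `hoodieKeysAndLocations`); `createDeleteRecord` is a hypothetical placeholder for the record construction the lambda actually does:

```
// Sketch only: evaluate the record type once on the driver so the closure
// captures just the enum value instead of the whole client.
HoodieRecord.HoodieRecordType recordType = client.getConfig().getRecordMerger().getRecordType();
JavaRDD<HoodieRecord> records = hoodieKeysAndLocations.map(tuple -> {
  boolean useAvro = recordType == HoodieRecord.HoodieRecordType.AVRO;
  // ... build the delete record for the key (and optional location) in the tuple ...
  return createDeleteRecord(tuple, useAvro); // hypothetical helper, for illustration only
});
```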



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [hudi] nsivabalan commented on a diff in pull request #8978: [HUDI-6315] Optimize DELETE codepath to use meta fields instead of key generation and index lookup

2023-07-03 Thread via GitHub


nsivabalan commented on code in PR #8978:
URL: https://github.com/apache/hudi/pull/8978#discussion_r1251480969


##
hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java:
##
@@ -227,9 +231,21 @@ public static HoodieWriteResult doWriteOperation(SparkRDDWriteClient client, Jav
     }
   }
 
-  public static HoodieWriteResult doDeleteOperation(SparkRDDWriteClient client, JavaRDD<HoodieKey> hoodieKeys,
-      String instantTime) {
-    return new HoodieWriteResult(client.delete(hoodieKeys, instantTime));
+  public static HoodieWriteResult doDeleteOperation(SparkRDDWriteClient client, JavaRDD<Tuple2<HoodieKey, Option<HoodieRecordLocation>>> hoodieKeysAndLocations,
+      String instantTime, boolean isPrepped) {
+
+    if (isPrepped) {
+      JavaRDD<HoodieRecord> records = hoodieKeysAndLocations.map(tuple -> {
+        HoodieRecord record = client.getConfig().getRecordMerger().getRecordType() == HoodieRecord.HoodieRecordType.AVRO

Review Comment:
   "client" object is in driver. since here we are accessing it in the 
executor, we might get NotSerializableException.
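
For illustration only (not Hudi code), a minimal, self-contained sketch of the failure mode being flagged: when a lambda references a non-serializable driver-side object, Spark must serialize it with the task and fails at runtime; capturing only the extracted value avoids that.

```
import java.util.Arrays;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

class ClosureCaptureSketch {
  // Deliberately not Serializable; stands in for a driver-only object such as a write client.
  static class DriverOnlyService {
    String recordType() {
      return "AVRO";
    }
  }

  static void run(JavaSparkContext jsc) {
    DriverOnlyService service = new DriverOnlyService();
    JavaRDD<Integer> nums = jsc.parallelize(Arrays.asList(1, 2, 3));

    // Fails at runtime with "Task not serializable" (NotSerializableException),
    // because the lambda's closure captures `service`:
    // nums.map(n -> service.recordType() + n).collect();

    // Works: only the already-extracted String is captured by the closure.
    String recordType = service.recordType();
    nums.map(n -> recordType + n).collect();
  }
}
```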






[GitHub] [hudi] nsivabalan commented on a diff in pull request #8978: [HUDI-6315] Optimize DELETE codepath to use meta fields instead of key generation and index lookup

2023-06-26 Thread via GitHub


nsivabalan commented on code in PR #8978:
URL: https://github.com/apache/hudi/pull/8978#discussion_r1242926341


##
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeletePreppedCommitActionExecutor.java:
##
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.io.HoodieWriteHandle;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+
+import java.util.List;
+
+/**
+ * Flink upsert prepped commit action executor.
+ */
+public class FlinkDeletePreppedCommitActionExecutor<T> extends BaseFlinkCommitActionExecutor<T> {
+
+  private final List<HoodieRecord<T>> preppedRecords;
+
+  public FlinkDeletePreppedCommitActionExecutor(HoodieEngineContext context,

Review Comment:
   Can you file a ticket for adding tests for delete prepped for Flink? For Spark, let's add the tests in this patch itself.



##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java:
##
@@ -105,6 +106,11 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> delete(HoodieEngineContext c
     return new SparkDeleteDeltaCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, keys).execute();
   }
 
+  @Override
+  public HoodieWriteMetadata<HoodieData<WriteStatus>> deletePrepped(HoodieEngineContext context, String instantTime, HoodieData<HoodieRecord<T>> preppedRecords) {

Review Comment:
   my bad. thanks



##
hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkRecordMerger.java:
##
@@ -41,13 +42,30 @@ public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSc
     ValidationUtils.checkArgument(older.getRecordType() == HoodieRecordType.SPARK);
     ValidationUtils.checkArgument(newer.getRecordType() == HoodieRecordType.SPARK);
 
-    if (newer.getData() == null) {
-      // Delete record
-      return Option.empty();
+    if (newer instanceof HoodieSparkRecord) {
+      HoodieSparkRecord newSparkRecord = (HoodieSparkRecord) newer;
+      if (newSparkRecord.isDeleted()) {
+        // Delete record
+        return Option.empty();
+      }
+    } else {
+      if (newer.getData() == null) {

Review Comment:
   We need to understand what's going on in that test.



##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java:
##
@@ -247,6 +247,15 @@ public JavaRDD<WriteStatus> delete(JavaRDD<HoodieKey> keys, String instantTime)
     return postWrite(resultRDD, instantTime, table);
   }
 
+  @Override
+  public JavaRDD<WriteStatus> deletePrepped(JavaRDD<HoodieRecord<T>> preppedRecord, String instantTime) {

Review Comment:
   we might need to add tests for this



##
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##
@@ -349,9 +366,9 @@ object HoodieSparkSqlWriter {
     // Remove meta columns from writerSchema if isPrepped is true.
     val isPrepped = hoodieConfig.getBooleanOrDefault(DATASOURCE_WRITE_PREPPED_KEY, false)
     val processedDataSchema = if (isPrepped) {
-      HoodieAvroUtils.removeMetadataFields(writerSchema);
+      HoodieAvroUtils.removeMetadataFields(writerSchema)

Review Comment:
   guess this has to be 
   ```
   HoodieAvroUtils.removeMetadataFields(dataFileSchema) 
   ```






[GitHub] [hudi] nsivabalan commented on a diff in pull request #8978: [HUDI-6315] Optimize DELETE codepath to use meta fields instead of key generation and index lookup

2023-06-21 Thread via GitHub


nsivabalan commented on code in PR #8978:
URL: https://github.com/apache/hudi/pull/8978#discussion_r1237826497


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java:
##
@@ -83,6 +83,7 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
    */
   private final transient StructType schema;
 
+  private boolean isDeleted;

Review Comment:
   Can you add Javadocs explaining how isDeleted is deduced?
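
One possible shape for that Javadoc, purely as a sketch; the described semantics are an assumption inferred from the HoodieSparkRecordMerger change quoted elsewhere in this thread, where isDeleted() replaces the older getData() == null check:

```
/**
 * Whether this record represents a delete.
 *
 * Assumption (to be confirmed): a HoodieSparkRecord may carry a non-null
 * InternalRow even for deletes (e.g. a row holding only the key/meta fields),
 * so a null data payload alone cannot signal a delete; this flag is set
 * explicitly when the record is created for a delete operation and is checked
 * by the record merger instead of the getData() == null test.
 */
private boolean isDeleted;
```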



##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java:
##
@@ -105,6 +106,11 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> delete(HoodieEngineContext c
     return new SparkDeleteDeltaCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, keys).execute();
   }
 
+  @Override
+  public HoodieWriteMetadata<HoodieData<WriteStatus>> deletePrepped(HoodieEngineContext context, String instantTime, HoodieData<HoodieRecord<T>> preppedRecords) {

Review Comment:
   we don't need to explicitly override here if we use 
SparkDeletePreppedCommitActionExecutor for MOR as well 



##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkDeletePreppedDeltaCommitActionExecutor.java:
##
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.deltacommit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+
+public class SparkDeletePreppedDeltaCommitActionExecutor<T>
+    extends BaseSparkDeltaCommitActionExecutor<T> {
+
+  private final HoodieData<HoodieRecord<T>> preppedRecords;
+
+  public SparkDeletePreppedDeltaCommitActionExecutor(HoodieSparkEngineContext context,

Review Comment:
   Why do we need this class? Can't we use SparkDeletePreppedCommitActionExecutor for both COW and MOR tables?



##
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala:
##
@@ -38,15 +39,14 @@ case class DeleteHoodieTableCommand(dft: DeleteFromTable) extends HoodieLeafRunn
     logInfo(s"Executing 'DELETE FROM' command for $tableId")
 
     val condition = sparkAdapter.extractDeleteCondition(dft)
-
-    val targetLogicalPlan = stripMetaFieldAttributes(dft.table)
     val filteredPlan = if (condition != null) {
-      Filter(condition, targetLogicalPlan)
+      Filter(condition, dft.table)
     } else {
-      targetLogicalPlan
+      dft.table
     }
 
-    val config = buildHoodieDeleteTableConfig(catalogTable, sparkSession)
+    var config = buildHoodieDeleteTableConfig(catalogTable, sparkSession)

Review Comment:
   can we avoid var and stick w/ val



##
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala:
##
@@ -194,7 +192,7 @@ object HoodieCreateRecordUtils {
 true
   }
 
-  private def validateMetaFieldsInAvroRecords(avroRec: GenericRecord): Unit = {
+  def validateMetaFieldsInAvroRecords(avroRec: GenericRecord): Unit = {

Review Comment:
   Why was private access removed?



##
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java:
##
@@ -153,6 +154,27 @@ public HoodieWriteMetadata<List<WriteStatus>> delete(
     return new FlinkDeleteCommitActionExecutor<>(context, writeHandle, config, this, instantTime, keys).execute();
   }
 
+  /**
+   * Delete the given prepared records from the Hoodie table, at the supplied instantTime.
+   *
+   * This implementation requires that the input records are already tagged, and de-duped if needed.
+   *
+   * Specifies the write handle explicitly in order to have fine-grained control with
+   * the underneath file.
+   *
+   * @param context {@link HoodieEngineContext}
+   * @param instantTime Instant Time for the action
+   * @param preppedRecords Hoodie records to delete
+   * @return {@link HoodieWriteMetadata}