[GitHub] [hudi] nsivabalan commented on a diff in pull request #8978: [HUDI-6315] Optimize DELETE codepath to use meta fields instead of key generation and index lookup
nsivabalan commented on code in PR #8978: URL: https://github.com/apache/hudi/pull/8978#discussion_r1251481229 ## hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java: ## @@ -227,9 +231,21 @@ public static HoodieWriteResult doWriteOperation(SparkRDDWriteClient client, Jav } } - public static HoodieWriteResult doDeleteOperation(SparkRDDWriteClient client, JavaRDD hoodieKeys, - String instantTime) { -return new HoodieWriteResult(client.delete(hoodieKeys, instantTime)); + public static HoodieWriteResult doDeleteOperation(SparkRDDWriteClient client, JavaRDD>> hoodieKeysAndLocations, + String instantTime, boolean isPrepped) { + +if (isPrepped) { + JavaRDD records = hoodieKeysAndLocations.map(tuple -> { +HoodieRecord record = client.getConfig().getRecordMerger().getRecordType() == HoodieRecord.HoodieRecordType.AVRO Review Comment: can you declare the record type as a variable outside and then just use that variable. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] nsivabalan commented on a diff in pull request #8978: [HUDI-6315] Optimize DELETE codepath to use meta fields instead of key generation and index lookup
nsivabalan commented on code in PR #8978: URL: https://github.com/apache/hudi/pull/8978#discussion_r1251480969 ## hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java: ## @@ -227,9 +231,21 @@ public static HoodieWriteResult doWriteOperation(SparkRDDWriteClient client, Jav } } - public static HoodieWriteResult doDeleteOperation(SparkRDDWriteClient client, JavaRDD hoodieKeys, - String instantTime) { -return new HoodieWriteResult(client.delete(hoodieKeys, instantTime)); + public static HoodieWriteResult doDeleteOperation(SparkRDDWriteClient client, JavaRDD>> hoodieKeysAndLocations, + String instantTime, boolean isPrepped) { + +if (isPrepped) { + JavaRDD records = hoodieKeysAndLocations.map(tuple -> { +HoodieRecord record = client.getConfig().getRecordMerger().getRecordType() == HoodieRecord.HoodieRecordType.AVRO Review Comment: "client" object is in driver. since here we are accessing it in the executor, we might get NotSerializableException. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] nsivabalan commented on a diff in pull request #8978: [HUDI-6315] Optimize DELETE codepath to use meta fields instead of key generation and index lookup
nsivabalan commented on code in PR #8978: URL: https://github.com/apache/hudi/pull/8978#discussion_r1242926341 ## hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeletePreppedCommitActionExecutor.java: ## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.action.commit; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.io.HoodieWriteHandle; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.HoodieWriteMetadata; + +import java.util.List; + +/** + * Flink upsert prepped commit action executor. + */ +public class FlinkDeletePreppedCommitActionExecutor extends BaseFlinkCommitActionExecutor { + + private final List> preppedRecords; + + public FlinkDeletePreppedCommitActionExecutor(HoodieEngineContext context, Review Comment: Can you file a ticket or adding tests for delete prepped for flink. for spark, lets add tests in this patch only. ## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java: ## @@ -105,6 +106,11 @@ public HoodieWriteMetadata> delete(HoodieEngineContext c return new SparkDeleteDeltaCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, keys).execute(); } + @Override + public HoodieWriteMetadata> deletePrepped(HoodieEngineContext context, String instantTime, HoodieData> preppedRecords) { Review Comment: my bad. thanks ## hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkRecordMerger.java: ## @@ -41,13 +42,30 @@ public Option> merge(HoodieRecord older, Schema oldSc ValidationUtils.checkArgument(older.getRecordType() == HoodieRecordType.SPARK); ValidationUtils.checkArgument(newer.getRecordType() == HoodieRecordType.SPARK); -if (newer.getData() == null) { - // Delete record - return Option.empty(); +if (newer instanceof HoodieSparkRecord) { + HoodieSparkRecord newSparkRecord = (HoodieSparkRecord) newer; + if (newSparkRecord.isDeleted()) { +// Delete record +return Option.empty(); + } +} else { + if (newer.getData() == null) { Review Comment: we need to understand whats going on in that test ## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java: ## @@ -247,6 +247,15 @@ public JavaRDD delete(JavaRDD keys, String instantTime) return postWrite(resultRDD, instantTime, table); } + @Override + public JavaRDD deletePrepped(JavaRDD> preppedRecord, String instantTime) { Review Comment: we might need to add tests for this ## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala: ## @@ -349,9 +366,9 @@ object HoodieSparkSqlWriter { // Remove meta columns from writerSchema if isPrepped is true. val isPrepped = hoodieConfig.getBooleanOrDefault(DATASOURCE_WRITE_PREPPED_KEY, false) val processedDataSchema = if (isPrepped) { - HoodieAvroUtils.removeMetadataFields(writerSchema); + HoodieAvroUtils.removeMetadataFields(writerSchema) Review Comment: guess this has to be ``` HoodieAvroUtils.removeMetadataFields(dataFileSchema) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] nsivabalan commented on a diff in pull request #8978: [HUDI-6315] Optimize DELETE codepath to use meta fields instead of key generation and index lookup
nsivabalan commented on code in PR #8978: URL: https://github.com/apache/hudi/pull/8978#discussion_r1237826497 ## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java: ## @@ -83,6 +83,7 @@ public class HoodieSparkRecord extends HoodieRecord { */ private final transient StructType schema; + private boolean isDeleted; Review Comment: can you add java docs as to how to deduce isDeleted ## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java: ## @@ -105,6 +106,11 @@ public HoodieWriteMetadata> delete(HoodieEngineContext c return new SparkDeleteDeltaCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, keys).execute(); } + @Override + public HoodieWriteMetadata> deletePrepped(HoodieEngineContext context, String instantTime, HoodieData> preppedRecords) { Review Comment: we don't need to explicitly override here if we use SparkDeletePreppedCommitActionExecutor for MOR as well ## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkDeletePreppedDeltaCommitActionExecutor.java: ## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.action.deltacommit; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.HoodieWriteMetadata; + +public class SparkDeletePreppedDeltaCommitActionExecutor +extends BaseSparkDeltaCommitActionExecutor { + + private final HoodieData> preppedRecords; + + public SparkDeletePreppedDeltaCommitActionExecutor(HoodieSparkEngineContext context, Review Comment: Why do we need this class? can't we use SparkDeletePreppedCommitActionExecutor for both COW and MOR table? ## hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala: ## @@ -38,15 +39,14 @@ case class DeleteHoodieTableCommand(dft: DeleteFromTable) extends HoodieLeafRunn logInfo(s"Executing 'DELETE FROM' command for $tableId") val condition = sparkAdapter.extractDeleteCondition(dft) - -val targetLogicalPlan = stripMetaFieldAttributes(dft.table) val filteredPlan = if (condition != null) { - Filter(condition, targetLogicalPlan) + Filter(condition, dft.table) } else { - targetLogicalPlan + dft.table } -val config = buildHoodieDeleteTableConfig(catalogTable, sparkSession) +var config = buildHoodieDeleteTableConfig(catalogTable, sparkSession) Review Comment: can we avoid var and stick w/ val ## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala: ## @@ -194,7 +192,7 @@ object HoodieCreateRecordUtils { true } - private def validateMetaFieldsInAvroRecords(avroRec: GenericRecord): Unit = { + def validateMetaFieldsInAvroRecords(avroRec: GenericRecord): Unit = { Review Comment: why removed private access? ## hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java: ## @@ -153,6 +154,27 @@ public HoodieWriteMetadata> delete( return new FlinkDeleteCommitActionExecutor<>(context, writeHandle, config, this, instantTime, keys).execute(); } + /** + * Delete the given prepared records from the Hoodie table, at the supplied instantTime. + * + * This implementation requires that the input records are already tagged, and de-duped if needed. + * + * Specifies the write handle explicitly in order to have fine-grained control with + * the underneath file. + * + * @param context {@link HoodieEngineContext} + * @param instantTime Instant Time for the action + * @param preppedRecords Hoodie records to delete + * @return {@link HoodieWriteMetadata}