[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.
alexeykudinkin commented on code in PR #5629: URL: https://github.com/apache/hudi/pull/5629#discussion_r977042462 ## hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java: ## @@ -291,59 +284,51 @@ public void checkState() { } } - // - - // - // NOTE: This method duplicates those ones of the HoodieRecordPayload and are placed here - // for the duration of RFC-46 implementation, until migration off `HoodieRecordPayload` - // is complete - // - public abstract HoodieRecord mergeWith(HoodieRecord other, Schema readerSchema, Schema writerSchema) throws IOException; + /** + * Get column in record to support RDDCustomColumnsSortPartitioner + */ + public abstract Object getRecordColumnValues(Schema recordSchema, String[] columns, boolean consistentLogicalTimestampEnabled); - public abstract HoodieRecord rewriteRecord(Schema recordSchema, Schema targetSchema, TypedProperties props) throws IOException; + /** + * Support bootstrap. + */ + public abstract HoodieRecord mergeWith(HoodieRecord other, Schema targetSchema) throws IOException; Review Comment: Understood. Let's keep it for now, but just rename it to `joinWith` to avoid confusion -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
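A minimal sketch of the rename being suggested (the class name is hypothetical; the signature follows the diff above):

```java
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.hudi.common.model.HoodieRecord;

// Illustrative only (not the PR's final API): how the bootstrap-oriented method could read
// after the rename from the second mergeWith overload to joinWith.
abstract class BootstrapJoinSketch {
  // Stitches this (skeleton) record together with the corresponding bootstrap source record,
  // producing a record in the given target schema.
  public abstract HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) throws IOException;
}
```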
[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.
alexeykudinkin commented on code in PR #5629: URL: https://github.com/apache/hudi/pull/5629#discussion_r977041457 ## hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkStructTypeSerializer.scala: ## @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi + +import com.esotericsoftware.kryo.Kryo +import com.esotericsoftware.kryo.io.{Input, Output} +import com.twitter.chill.KSerializer +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream} +import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets +import org.apache.avro.SchemaNormalization +import org.apache.commons.io.IOUtils +import org.apache.hudi.commmon.model.HoodieSparkRecord +import org.apache.spark.io.CompressionCodec +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.Utils +import org.apache.spark.{SparkEnv, SparkException} +import scala.collection.mutable + +/** + * Custom serializer used for generic spark records. If the user registers the schemas + * ahead of time, then the schema's fingerprint will be sent with each message instead of the actual + * schema, as to reduce network IO. + * Actions like parsing or compressing schemas are computationally expensive so the serializer + * caches all previously seen values as to reduce the amount of work needed to do. + * @param schemas a map where the keys are unique IDs for spark schemas and the values are the + *string representation of the Avro schema, used to decrease the amount of data + *that needs to be serialized. + */ +class SparkStructTypeSerializer(schemas: Map[Long, StructType]) extends KSerializer[HoodieSparkRecord] { Review Comment: https://hudi.apache.org/docs/quick-start-guide -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
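For context on the fingerprint mechanism described in the serializer's javadoc above, a minimal sketch (helper name is illustrative; the call is standard Avro) of how the 64-bit schema fingerprint would be computed, so that only 8 bytes travel with each record for pre-registered schemas:

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaNormalization;

// Helper name is illustrative; SchemaNormalization is the standard Avro utility.
final class SchemaFingerprintSketch {
  // Returns the 64-bit Rabin fingerprint of the schema's canonical ("parsing") form.
  static long fingerprintOf(Schema avroSchema) {
    return SchemaNormalization.parsingFingerprint64(avroSchema);
  }
}
```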
[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.
alexeykudinkin commented on code in PR #5629: URL: https://github.com/apache/hudi/pull/5629#discussion_r977040996 ## hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkStructTypeSerializer.scala: ## @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi + +import com.esotericsoftware.kryo.Kryo +import com.esotericsoftware.kryo.io.{Input, Output} +import com.twitter.chill.KSerializer +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream} +import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets +import org.apache.avro.SchemaNormalization +import org.apache.commons.io.IOUtils +import org.apache.hudi.commmon.model.HoodieSparkRecord +import org.apache.spark.io.CompressionCodec +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.Utils +import org.apache.spark.{SparkEnv, SparkException} +import scala.collection.mutable + +/** + * Custom serializer used for generic spark records. If the user registers the schemas + * ahead of time, then the schema's fingerprint will be sent with each message instead of the actual + * schema, as to reduce network IO. + * Actions like parsing or compressing schemas are computationally expensive so the serializer + * caches all previously seen values as to reduce the amount of work needed to do. + * @param schemas a map where the keys are unique IDs for spark schemas and the values are the + *string representation of the Avro schema, used to decrease the amount of data + *that needs to be serialized. + */ +class SparkStructTypeSerializer(schemas: Map[Long, StructType]) extends KSerializer[HoodieSparkRecord] { Review Comment: Sorry, my bad i wasn't clear enough -- we will have to - Implement Registrar to make sure it does register our custom serializer - Make sure we update the docs to include it (and make sure to highlight it in the change-log), similarly to how we recommend including `spark.serializer` config -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
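A minimal sketch of what such a registrar could look like (class name and wiring are assumptions, not the PR's final implementation):

```java
import com.esotericsoftware.kryo.Kryo;
import org.apache.spark.serializer.KryoRegistrator;

// Hypothetical registrar: ensures the Spark-specific record class is registered with Kryo on
// every executor. Package name of HoodieSparkRecord is kept as imported in this PR.
public class HoodieSparkKryoRegistrarSketch implements KryoRegistrator {
  @Override
  public void registerClasses(Kryo kryo) {
    // In the PR this is where HoodieSparkRecord would be bound to SparkStructTypeSerializer;
    // the plain registration below just keeps the sketch self-contained.
    kryo.register(org.apache.hudi.commmon.model.HoodieSparkRecord.class);
  }
}
```

Users would then point `spark.kryo.registrator` at such a class, alongside the `spark.serializer` setting the quick-start guide already recommends.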
[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.
alexeykudinkin commented on code in PR #5629: URL: https://github.com/apache/hudi/pull/5629#discussion_r975768738 ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java: ## @@ -505,7 +514,9 @@ public class HoodieWriteConfig extends HoodieConfig { private HoodieMetadataConfig metadataConfig; private HoodieMetastoreConfig metastoreConfig; private HoodieCommonConfig commonConfig; + private HoodieStorageConfig storageConfig; private EngineType engineType; + private HoodieRecordMerger recordMerger; Review Comment: Let's avoid premature optimizations -- `RecordMerger` is specifically designed as a stateless component and as such is lightweight enough to be initialized on-demand. The only thing we need to take care of is making sure we don't init it for every record. The problem with caching it inside `HoodieWriteConfig` is that HoodieWriteConfig has nothing to do w/ RecordMerger, and we should not open that can of worms by using it as a state carrier for other components. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
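As a rough illustration of "on-demand, but not per record", a holder like the following (illustrative, not the PR's code) could be owned by a writer/handle and lazily instantiate the merger on first use:

```java
import java.util.function.Supplier;

import org.apache.hudi.common.model.HoodieRecordMerger;

// Illustrative holder: the stateless merger is created lazily on first use and then reused for
// every record processed by the owning writer/handle, without being cached on HoodieWriteConfig.
final class RecordMergerHolderSketch {
  private final Supplier<HoodieRecordMerger> factory;
  private volatile HoodieRecordMerger merger;

  RecordMergerHolderSketch(Supplier<HoodieRecordMerger> factory) {
    this.factory = factory;
  }

  HoodieRecordMerger get() {
    HoodieRecordMerger local = merger;
    if (local == null) {
      synchronized (this) {
        if (merger == null) {
          merger = factory.get(); // instantiated once, not for every record
        }
        local = merger;
      }
    }
    return local;
  }
}
```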
[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.
alexeykudinkin commented on code in PR #5629: URL: https://github.com/apache/hudi/pull/5629#discussion_r975734758 ## hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkStructTypeSerializer.scala: ## @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi + +import com.esotericsoftware.kryo.Kryo +import com.esotericsoftware.kryo.io.{Input, Output} +import com.twitter.chill.KSerializer +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream} +import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets +import org.apache.avro.SchemaNormalization +import org.apache.commons.io.IOUtils +import org.apache.hudi.commmon.model.HoodieSparkRecord +import org.apache.spark.io.CompressionCodec +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.Utils +import org.apache.spark.{SparkEnv, SparkException} +import scala.collection.mutable + +/** + * Custom serializer used for generic spark records. If the user registers the schemas + * ahead of time, then the schema's fingerprint will be sent with each message instead of the actual + * schema, as to reduce network IO. + * Actions like parsing or compressing schemas are computationally expensive so the serializer + * caches all previously seen values as to reduce the amount of work needed to do. + * @param schemas a map where the keys are unique IDs for spark schemas and the values are the + *string representation of the Avro schema, used to decrease the amount of data + *that needs to be serialized. + */ +class SparkStructTypeSerializer(schemas: Map[Long, StructType]) extends KSerializer[HoodieSparkRecord] { Review Comment: We can't since we don't control Kryo lifecycle, and i don't think that we should -- Kryo injected into Spark's `SerializerManager` could be its own thing and we cache the schemas for our needs separately -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
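A sketch of the separation described here (names are illustrative): the schema cache is owned by Hudi and keyed by fingerprint, independent of whatever Kryo instances Spark's `SerializerManager` creates:

```java
import java.util.concurrent.ConcurrentHashMap;

import org.apache.spark.sql.types.StructType;

// Illustrative, process-wide cache of previously seen StructTypes keyed by schema fingerprint,
// owned by Hudi rather than by any particular Kryo instance.
final class StructTypeCacheSketch {
  private static final ConcurrentHashMap<Long, StructType> BY_FINGERPRINT = new ConcurrentHashMap<>();

  static void register(long fingerprint, StructType schema) {
    BY_FINGERPRINT.putIfAbsent(fingerprint, schema);
  }

  static StructType lookup(long fingerprint) {
    return BY_FINGERPRINT.get(fingerprint);
  }
}
```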
[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.
alexeykudinkin commented on code in PR #5629: URL: https://github.com/apache/hudi/pull/5629#discussion_r975619535 ## hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java: ## @@ -291,59 +284,51 @@ public void checkState() { } } - // - - // - // NOTE: This method duplicates those ones of the HoodieRecordPayload and are placed here - // for the duration of RFC-46 implementation, until migration off `HoodieRecordPayload` - // is complete - // - public abstract HoodieRecord mergeWith(HoodieRecord other, Schema readerSchema, Schema writerSchema) throws IOException; + /** + * Get column in record to support RDDCustomColumnsSortPartitioner + */ + public abstract Object getRecordColumnValues(Schema recordSchema, String[] columns, boolean consistentLogicalTimestampEnabled); - public abstract HoodieRecord rewriteRecord(Schema recordSchema, Schema targetSchema, TypedProperties props) throws IOException; + /** + * Support bootstrap. + */ + public abstract HoodieRecord mergeWith(HoodieRecord other, Schema targetSchema) throws IOException; Review Comment: I see what you're saying. This method is confusing though, since it's also called merge (and i think some impls are actually using RecordMerger) Ideally, we shouldn't even have it in the interface (since it's not generic enough). Can we try to extract it out? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.
alexeykudinkin commented on code in PR #5629: URL: https://github.com/apache/hudi/pull/5629#discussion_r975619535 ## hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java: ## @@ -291,59 +284,51 @@ public void checkState() { } } - // - - // - // NOTE: This method duplicates those ones of the HoodieRecordPayload and are placed here - // for the duration of RFC-46 implementation, until migration off `HoodieRecordPayload` - // is complete - // - public abstract HoodieRecord mergeWith(HoodieRecord other, Schema readerSchema, Schema writerSchema) throws IOException; + /** + * Get column in record to support RDDCustomColumnsSortPartitioner + */ + public abstract Object getRecordColumnValues(Schema recordSchema, String[] columns, boolean consistentLogicalTimestampEnabled); - public abstract HoodieRecord rewriteRecord(Schema recordSchema, Schema targetSchema, TypedProperties props) throws IOException; + /** + * Support bootstrap. + */ + public abstract HoodieRecord mergeWith(HoodieRecord other, Schema targetSchema) throws IOException; Review Comment: I see what you're saying. This method is confusing though, since it's also called merge. Ideally, we shouldn't even have it in the interface (since it's not generic enough). Can we try to extract it out? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.
alexeykudinkin commented on code in PR #5629: URL: https://github.com/apache/hudi/pull/5629#discussion_r975617746 ## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/HoodieSparkRecordUtils.java: ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.util; + +import org.apache.hudi.HoodieInternalRowUtils; +import org.apache.hudi.commmon.model.HoodieSparkRecord; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieOperation; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.collection.Pair; + +import org.apache.spark.sql.HoodieCatalystExpressionUtils$; +import org.apache.spark.sql.HoodieUnsafeRowUtils; +import org.apache.spark.sql.HoodieUnsafeRowUtils.NestedFieldPath; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.types.StructType; + +public class HoodieSparkRecordUtils { + + /** + * Utility method to convert InternalRow to HoodieRecord using schema and payload class. + */ + public static HoodieRecord convertToHoodieSparkRecord(StructType structType, InternalRow data, boolean withOperationField) { +return convertToHoodieSparkRecord(structType, data, +Pair.of(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD), +withOperationField, Option.empty()); + } + + public static HoodieRecord convertToHoodieSparkRecord(StructType structType, InternalRow data, boolean withOperationField, + Option partitionName) { +return convertToHoodieSparkRecord(structType, data, +Pair.of(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD), +withOperationField, partitionName); + } + + /** + * Utility method to convert bytes to HoodieRecord using schema and payload class. + */ + public static HoodieRecord convertToHoodieSparkRecord(StructType structType, InternalRow data, Pair recordKeyPartitionPathFieldPair, + boolean withOperationField, Option partitionName) { +final String recKey = getValue(structType, recordKeyPartitionPathFieldPair.getKey(), data).toString(); Review Comment: Interesting, this seems like a bug to me -- if `populateMetaFields` is false this won't work for non-SimpleKeyGen -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
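To make the concern concrete, a guard along these lines (purely illustrative; the extractor arguments stand in for whatever key-generator hook an eventual fix would use) would avoid reading the meta column when `populateMetaFields` is false:

```java
import java.util.function.Function;

import org.apache.spark.sql.catalyst.InternalRow;

// Illustrative helper: read the record key from _hoodie_record_key only when meta fields are
// populated; otherwise fall back to the configured (possibly non-simple) key generator.
final class RecordKeyLookupSketch {
  static String recordKeyOf(InternalRow row,
                            boolean populateMetaFields,
                            Function<InternalRow, String> metaFieldReader,
                            Function<InternalRow, String> keyGeneratorFallback) {
    return populateMetaFields ? metaFieldReader.apply(row) : keyGeneratorFallback.apply(row);
  }
}
```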
[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.
alexeykudinkin commented on code in PR #5629: URL: https://github.com/apache/hudi/pull/5629#discussion_r975617095 ## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/HoodieSparkRecordUtils.java: ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.util; + +import org.apache.hudi.HoodieInternalRowUtils; +import org.apache.hudi.commmon.model.HoodieSparkRecord; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieOperation; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.collection.Pair; + +import org.apache.spark.sql.HoodieCatalystExpressionUtils$; +import org.apache.spark.sql.HoodieUnsafeRowUtils; +import org.apache.spark.sql.HoodieUnsafeRowUtils.NestedFieldPath; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.types.StructType; + +public class HoodieSparkRecordUtils { + + /** + * Utility method to convert InternalRow to HoodieRecord using schema and payload class. + */ + public static HoodieRecord convertToHoodieSparkRecord(StructType structType, InternalRow data, boolean withOperationField) { +return convertToHoodieSparkRecord(structType, data, +Pair.of(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD), +withOperationField, Option.empty()); + } + + public static HoodieRecord convertToHoodieSparkRecord(StructType structType, InternalRow data, boolean withOperationField, + Option partitionName) { +return convertToHoodieSparkRecord(structType, data, +Pair.of(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD), +withOperationField, partitionName); + } + + /** + * Utility method to convert bytes to HoodieRecord using schema and payload class. + */ + public static HoodieRecord convertToHoodieSparkRecord(StructType structType, InternalRow data, Pair recordKeyPartitionPathFieldPair, + boolean withOperationField, Option partitionName) { +final String recKey = getValue(structType, recordKeyPartitionPathFieldPair.getKey(), data).toString(); +final String partitionPath = (partitionName.isPresent() ? partitionName.get() : +getValue(structType, recordKeyPartitionPathFieldPair.getRight(), data).toString()); + +HoodieOperation operation = withOperationField +? 
HoodieOperation.fromName(getNullableValAsString(structType, data, HoodieRecord.OPERATION_METADATA_FIELD)) : null; +return new HoodieSparkRecord(new HoodieKey(recKey, partitionPath), data, structType, operation); + } + + private static Object getValue(StructType structType, String fieldName, InternalRow row) { +NestedFieldPath posList = HoodieInternalRowUtils.getCachedPosList(structType, fieldName); +return HoodieUnsafeRowUtils.getNestedInternalRowValue(row, posList); + } + + /** + * Returns the string value of the given record {@code rec} and field {@code fieldName}. The field and value both could be missing. + * + * @param row The record + * @param fieldName The field name + * @return the string form of the field or empty if the schema does not contain the field name or the value is null + */ + private static Option getNullableValAsString(StructType structType, InternalRow row, String fieldName) { +String fieldVal = !HoodieCatalystExpressionUtils$.MODULE$.existField(structType, fieldName) +? null : StringUtils.objToString(getValue(structType, fieldName, row)); +return Option.ofNullable(fieldVal); + } + + /** + * Gets record column values into one object. + * + * @param row InternalRow. + * @param columns Names of the columns to get values. + * @param structType {@link StructType} instance. + * @return Column value if a single column, or concatenated String values by comma. + */ + public static Object getRecordColumnValues(InternalRow row, Review Comment: @wzx140 ideally we should, but we don't want to make this PR to fix everything either. Hence i suggest to only focus this for
[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.
alexeykudinkin commented on code in PR #5629: URL: https://github.com/apache/hudi/pull/5629#discussion_r974403023 ## hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordCompatibilityInterface.java: ## @@ -18,21 +18,28 @@ package org.apache.hudi.common.model; +import java.io.IOException; +import java.util.Properties; import org.apache.avro.Schema; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.keygen.BaseKeyGenerator; -import java.io.IOException; -import java.io.Serializable; -import java.util.Properties; +public interface HoodieRecordCompatibilityInterface { -/** - * HoodieMerge defines how to merge two records. It is a stateless component. - * It can implement the merging logic of HoodieRecord of different engines - * and avoid the performance consumption caused by the serialization/deserialization of Avro payload. - */ -public interface HoodieMerge extends Serializable { - - HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer); + /** + * This method used to extract HoodieKey not through keyGenerator. + */ + HoodieRecord wrapIntoHoodieRecordPayloadWithParams( Review Comment: 👍 ## hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java: ## @@ -147,19 +141,19 @@ public static HoodieMergedLogRecordScanner.Builder newBuilder() { } @Override - protected void processNextRecord(HoodieRecord hoodieRecord) throws IOException { + protected void processNextRecord(HoodieRecord hoodieRecord) throws IOException { String key = hoodieRecord.getRecordKey(); if (records.containsKey(key)) { // Merge and store the merged record. The HoodieRecordPayload implementation is free to decide what should be // done when a DELETE (empty payload) is encountered before or after an insert/update. - HoodieRecord oldRecord = records.get(key); - HoodieRecordPayload oldValue = oldRecord.getData(); - HoodieRecordPayload combinedValue = (HoodieRecordPayload) merge.preCombine(oldRecord, hoodieRecord).getData(); + HoodieRecord oldRecord = records.get(key); + T oldValue = oldRecord.getData(); + T combinedValue = ((HoodieRecord) recordMerger.merge(oldRecord, hoodieRecord, readerSchema, this.hoodieTableMetaClient.getTableConfig().getProps()).get()).getData(); // If combinedValue is oldValue, no need rePut oldRecord if (combinedValue != oldValue) { -HoodieOperation operation = hoodieRecord.getOperation(); -records.put(key, new HoodieAvroRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()), combinedValue, operation)); +hoodieRecord.setData(combinedValue); Review Comment: Why are we resetting the data instead of using new `HoodieRecord` returned by the Merger? ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java: ## @@ -126,11 +129,17 @@ public class HoodieWriteConfig extends HoodieConfig { .withDocumentation("Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting. 
" + "This will render any value set for PRECOMBINE_FIELD_OPT_VAL in-effective"); - public static final ConfigProperty MERGE_CLASS_NAME = ConfigProperty - .key("hoodie.datasource.write.merge.class") - .defaultValue(HoodieAvroRecordMerge.class.getName()) - .withDocumentation("Merge class provide stateless component interface for merging records, and support various HoodieRecord " - + "types, such as Spark records or Flink records."); + public static final ConfigProperty MERGER_IMPLS = ConfigProperty + .key("hoodie.datasource.write.merger.impls") + .defaultValue(HoodieAvroRecordMerger.class.getName()) + .withDocumentation("List of HoodieMerger implementations constituting Hudi's merging strategy -- based on the engine used. " + + "These merger impls will filter by hoodie.datasource.write.merger.strategy " + + "Hudi will pick most efficient implementation to perform merging/combining of the records (during update, reading MOR table, etc)"); + + public static final ConfigProperty MERGER_STRATEGY = ConfigProperty + .key("hoodie.datasource.write.merger.strategy") + .defaultValue(StringUtils.DEFAULT_MERGER_STRATEGY_UUID) Review Comment: Let's move this to HoodieMerger, rather than `StringUtils` (we can do it in a follow-up) ## hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java: ## @@ -156,11 +155,10 @@ public class HoodieTableConfig extends HoodieConfig { .withDocumentation("Payload class to use for performing compactions, i.e merge delta logs with current base file and then " + " produce a new base file."); - public static final ConfigProperty MERGE_CLASS_NAME = ConfigProperty - .key("hoodie.compact
[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.
alexeykudinkin commented on code in PR #5629: URL: https://github.com/apache/hudi/pull/5629#discussion_r959827795 ## hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java: ## @@ -290,59 +286,59 @@ public void checkState() { } } - // + /** + * Get column in record to support RDDCustomColumnsSortPartitioner + */ + public abstract Object getRecordColumnValues(Schema recordSchema, String[] columns, boolean consistentLogicalTimestampEnabled); - // - // NOTE: This method duplicates those ones of the HoodieRecordPayload and are placed here - // for the duration of RFC-46 implementation, until migration off `HoodieRecordPayload` - // is complete - // - public abstract HoodieRecord mergeWith(HoodieRecord other, Schema readerSchema, Schema writerSchema) throws IOException; + /** + * Support bootstrap. + */ + public abstract HoodieRecord mergeWith(HoodieRecord other, Schema targetSchema) throws IOException; - public abstract HoodieRecord rewriteRecord(Schema recordSchema, Schema targetSchema, TypedProperties props) throws IOException; + /** + * Rewrite record into new schema(add meta columns) + */ + public abstract HoodieRecord rewriteRecord(Schema recordSchema, Properties props, Schema targetSchema) throws IOException; /** - * Rewrite the GenericRecord with the Schema containing the Hoodie Metadata fields. + * Support schema evolution. */ - public abstract HoodieRecord rewriteRecord(Schema recordSchema, Properties prop, boolean schemaOnReadEnabled, Schema writeSchemaWithMetaFields) throws IOException; + public abstract HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema, Map renameCols) throws IOException; - public abstract HoodieRecord rewriteRecordWithMetadata(Schema recordSchema, Properties prop, boolean schemaOnReadEnabled, Schema writeSchemaWithMetaFields, String fileName) throws IOException; + public abstract HoodieRecord updateValues(Schema recordSchema, Properties props, Map metadataValues) throws IOException; Review Comment: Yeah, found it already. Let's annotate this method as temporary (we'd likely revisit it in the future) -- we should be wary exposing API that allows mutations. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.
alexeykudinkin commented on code in PR #5629: URL: https://github.com/apache/hudi/pull/5629#discussion_r959063675 ## hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java: ## @@ -290,59 +286,59 @@ public void checkState() { } } - // + /** + * Get column in record to support RDDCustomColumnsSortPartitioner + */ + public abstract Object getRecordColumnValues(Schema recordSchema, String[] columns, boolean consistentLogicalTimestampEnabled); - // - // NOTE: This method duplicates those ones of the HoodieRecordPayload and are placed here - // for the duration of RFC-46 implementation, until migration off `HoodieRecordPayload` - // is complete - // - public abstract HoodieRecord mergeWith(HoodieRecord other, Schema readerSchema, Schema writerSchema) throws IOException; + /** + * Support bootstrap. + */ + public abstract HoodieRecord mergeWith(HoodieRecord other, Schema targetSchema) throws IOException; - public abstract HoodieRecord rewriteRecord(Schema recordSchema, Schema targetSchema, TypedProperties props) throws IOException; + /** + * Rewrite record into new schema(add meta columns) + */ + public abstract HoodieRecord rewriteRecord(Schema recordSchema, Properties props, Schema targetSchema) throws IOException; /** - * Rewrite the GenericRecord with the Schema containing the Hoodie Metadata fields. + * Support schema evolution. */ - public abstract HoodieRecord rewriteRecord(Schema recordSchema, Properties prop, boolean schemaOnReadEnabled, Schema writeSchemaWithMetaFields) throws IOException; + public abstract HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema, Map renameCols) throws IOException; Review Comment: Let's create an override for this method to avoid providing empty-map in every call: ``` HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema) { rewriteRecordWithNewSchema(recordSchema. props, newSchema, Collections.emptyMap()); } ``` ## hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java: ## @@ -290,59 +286,59 @@ public void checkState() { } } - // + /** + * Get column in record to support RDDCustomColumnsSortPartitioner + */ + public abstract Object getRecordColumnValues(Schema recordSchema, String[] columns, boolean consistentLogicalTimestampEnabled); - // - // NOTE: This method duplicates those ones of the HoodieRecordPayload and are placed here - // for the duration of RFC-46 implementation, until migration off `HoodieRecordPayload` - // is complete - // - public abstract HoodieRecord mergeWith(HoodieRecord other, Schema readerSchema, Schema writerSchema) throws IOException; + /** + * Support bootstrap. + */ + public abstract HoodieRecord mergeWith(HoodieRecord other, Schema targetSchema) throws IOException; - public abstract HoodieRecord rewriteRecord(Schema recordSchema, Schema targetSchema, TypedProperties props) throws IOException; + /** + * Rewrite record into new schema(add meta columns) + */ + public abstract HoodieRecord rewriteRecord(Schema recordSchema, Properties props, Schema targetSchema) throws IOException; /** - * Rewrite the GenericRecord with the Schema containing the Hoodie Metadata fields. + * Support schema evolution. 
*/ - public abstract HoodieRecord rewriteRecord(Schema recordSchema, Properties prop, boolean schemaOnReadEnabled, Schema writeSchemaWithMetaFields) throws IOException; + public abstract HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema, Map renameCols) throws IOException; - public abstract HoodieRecord rewriteRecordWithMetadata(Schema recordSchema, Properties prop, boolean schemaOnReadEnabled, Schema writeSchemaWithMetaFields, String fileName) throws IOException; + public abstract HoodieRecord updateValues(Schema recordSchema, Properties props, Map metadataValues) throws IOException; Review Comment: Where are we using this one (PR is already too large for GH, so can't search in the PR itself)? Seems quite dangerous method to have. ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java: ## @@ -126,11 +129,11 @@ public class HoodieWriteConfig extends HoodieConfig { .withDocumentation("Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting. " + "This will render any value set for PRECOMBINE_FIELD_OPT_VAL in-effective"); - public static final ConfigProperty MERGE_CLASS_NAME = ConfigProperty - .key("hoodie.datasource.write.merge.class") - .defaultValue(HoodieAvroRecordMerge.clas
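A compilable version of the overload suggested in the comment above (the inline snippet has a stray `.` where a `,` should be and omits the `return`); the class below is only a stand-in for HoodieRecord:

```java
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.hudi.common.model.HoodieRecord;

// Stand-in for HoodieRecord, showing only the suggested convenience overload.
abstract class RewriteWithNewSchemaSketch {
  // Existing API: rewrites the record into newSchema, applying the given column renames.
  public abstract HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props,
      Schema newSchema, Map<String, String> renameCols) throws IOException;

  // Suggested overload: callers with no renames no longer pass an empty map explicitly.
  public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema) throws IOException {
    return rewriteRecordWithNewSchema(recordSchema, props, newSchema, Collections.emptyMap());
  }
}
```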
[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.
alexeykudinkin commented on code in PR #5629: URL: https://github.com/apache/hudi/pull/5629#discussion_r946077270 ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java: ## @@ -20,62 +20,33 @@ package org.apache.hudi.common.table.log; import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.SpillableMapUtils; import org.apache.hudi.common.util.collection.Pair; -import org.apache.hudi.config.HoodiePayloadConfig; -import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; import java.io.IOException; import java.util.Iterator; -import java.util.stream.StreamSupport; +import java.util.Properties; /** * Reads records from base file and merges any updates from log files and provides iterable over all records in the file slice. */ -public class HoodieFileSliceReader implements Iterator> { +public class HoodieFileSliceReader implements Iterator> { + private final Iterator> recordsIterator; public static HoodieFileSliceReader getFileSliceReader( - Option baseFileReader, HoodieMergedLogRecordScanner scanner, Schema schema, String payloadClass, - String preCombineField, Option> simpleKeyGenFieldsOpt) throws IOException { + Option baseFileReader, HoodieMergedLogRecordScanner scanner, Schema schema, Properties props, Option> simpleKeyGenFieldsOpt) throws IOException { if (baseFileReader.isPresent()) { - Iterator baseIterator = baseFileReader.get().getRecordIterator(schema); + Iterator baseIterator = baseFileReader.get().getRecordIterator(schema); while (baseIterator.hasNext()) { -GenericRecord record = (GenericRecord) baseIterator.next(); -HoodieRecord hoodieRecord = transform( -record, scanner, payloadClass, preCombineField, simpleKeyGenFieldsOpt); -scanner.processNextRecord(hoodieRecord); +scanner.processNextRecord(baseIterator.next().getKeyWithParams(schema, props, Review Comment: @wzx140 let's sync up on Slack to speed up this conversation -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.
alexeykudinkin commented on code in PR #5629: URL: https://github.com/apache/hudi/pull/5629#discussion_r943808765 ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java: ## @@ -20,62 +20,33 @@ package org.apache.hudi.common.table.log; import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.SpillableMapUtils; import org.apache.hudi.common.util.collection.Pair; -import org.apache.hudi.config.HoodiePayloadConfig; -import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; import java.io.IOException; import java.util.Iterator; -import java.util.stream.StreamSupport; +import java.util.Properties; /** * Reads records from base file and merges any updates from log files and provides iterable over all records in the file slice. */ -public class HoodieFileSliceReader implements Iterator> { +public class HoodieFileSliceReader implements Iterator> { + private final Iterator> recordsIterator; public static HoodieFileSliceReader getFileSliceReader( - Option baseFileReader, HoodieMergedLogRecordScanner scanner, Schema schema, String payloadClass, - String preCombineField, Option> simpleKeyGenFieldsOpt) throws IOException { + Option baseFileReader, HoodieMergedLogRecordScanner scanner, Schema schema, Properties props, Option> simpleKeyGenFieldsOpt) throws IOException { if (baseFileReader.isPresent()) { - Iterator baseIterator = baseFileReader.get().getRecordIterator(schema); + Iterator baseIterator = baseFileReader.get().getRecordIterator(schema); while (baseIterator.hasNext()) { -GenericRecord record = (GenericRecord) baseIterator.next(); -HoodieRecord hoodieRecord = transform( -record, scanner, payloadClass, preCombineField, simpleKeyGenFieldsOpt); -scanner.processNextRecord(hoodieRecord); +scanner.processNextRecord(baseIterator.next().getKeyWithParams(schema, props, Review Comment: > One thing needs to be added, in addition to HoodieKey, getKeyWithParams func also converts avro data to HoodieRecordPayload. That's exactly what i'm referring to: this method is very cryptic with an unclear contract and expectation what is being done by it in the end. > I think the HoodieRecord returned by FileReader should be the original data in file rather than the processed data. Because the callers of FileReader need to process the HoodieRecord differently or not need to process it. We do not need to converge the process logic to FileReader. Can you please elaborate what you're referring to as "processed data"? I'm still a little bit unclear what this "processing" means. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.
alexeykudinkin commented on code in PR #5629: URL: https://github.com/apache/hudi/pull/5629#discussion_r942820533 ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java: ## @@ -20,62 +20,33 @@ package org.apache.hudi.common.table.log; import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.SpillableMapUtils; import org.apache.hudi.common.util.collection.Pair; -import org.apache.hudi.config.HoodiePayloadConfig; -import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; import java.io.IOException; import java.util.Iterator; -import java.util.stream.StreamSupport; +import java.util.Properties; /** * Reads records from base file and merges any updates from log files and provides iterable over all records in the file slice. */ -public class HoodieFileSliceReader implements Iterator> { +public class HoodieFileSliceReader implements Iterator> { + private final Iterator> recordsIterator; public static HoodieFileSliceReader getFileSliceReader( - Option baseFileReader, HoodieMergedLogRecordScanner scanner, Schema schema, String payloadClass, - String preCombineField, Option> simpleKeyGenFieldsOpt) throws IOException { + Option baseFileReader, HoodieMergedLogRecordScanner scanner, Schema schema, Properties props, Option> simpleKeyGenFieldsOpt) throws IOException { if (baseFileReader.isPresent()) { - Iterator baseIterator = baseFileReader.get().getRecordIterator(schema); + Iterator baseIterator = baseFileReader.get().getRecordIterator(schema); while (baseIterator.hasNext()) { -GenericRecord record = (GenericRecord) baseIterator.next(); -HoodieRecord hoodieRecord = transform( -record, scanner, payloadClass, preCombineField, simpleKeyGenFieldsOpt); -scanner.processNextRecord(hoodieRecord); +scanner.processNextRecord(baseIterator.next().getKeyWithParams(schema, props, Review Comment: > For old records read from FileReader and passed to HoodieMergeHandle, we do not need to extract HoodieKey. HoodieMergeHandle uses keyGeneratorOpt to extract recordKey directly. Sorry, not sure I fully understand your point. Can you please elaborate? But even if we don't need to extract the keys in all cases, we can still solve it by configuring the `FileReader` instead of adding this method, right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.
alexeykudinkin commented on code in PR #5629: URL: https://github.com/apache/hudi/pull/5629#discussion_r938253077 ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java: ## @@ -20,62 +20,33 @@ package org.apache.hudi.common.table.log; import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.SpillableMapUtils; import org.apache.hudi.common.util.collection.Pair; -import org.apache.hudi.config.HoodiePayloadConfig; -import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; import java.io.IOException; import java.util.Iterator; -import java.util.stream.StreamSupport; +import java.util.Properties; /** * Reads records from base file and merges any updates from log files and provides iterable over all records in the file slice. */ -public class HoodieFileSliceReader implements Iterator> { +public class HoodieFileSliceReader implements Iterator> { + private final Iterator> recordsIterator; public static HoodieFileSliceReader getFileSliceReader( - Option baseFileReader, HoodieMergedLogRecordScanner scanner, Schema schema, String payloadClass, - String preCombineField, Option> simpleKeyGenFieldsOpt) throws IOException { + Option baseFileReader, HoodieMergedLogRecordScanner scanner, Schema schema, Properties props, Option> simpleKeyGenFieldsOpt) throws IOException { if (baseFileReader.isPresent()) { - Iterator baseIterator = baseFileReader.get().getRecordIterator(schema); + Iterator baseIterator = baseFileReader.get().getRecordIterator(schema); while (baseIterator.hasNext()) { -GenericRecord record = (GenericRecord) baseIterator.next(); -HoodieRecord hoodieRecord = transform( -record, scanner, payloadClass, preCombineField, simpleKeyGenFieldsOpt); -scanner.processNextRecord(hoodieRecord); +scanner.processNextRecord(baseIterator.next().getKeyWithParams(schema, props, Review Comment: Suggested transition doesn't make sense: 1. Previously, file-reader was producing `GenericRecord`s hence we had `transform` method that was wrapping them into `HoodieRecord` 2. Now, we have file-reader instance that produces `HoodieRecord`, and so calling the method `getKeyWithParams` that returns another `HoodieRecord` doesn't really make sense. Can you please explain why do we need it and why can't we return proper record from the file-reader in the first place? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.
alexeykudinkin commented on code in PR #5629: URL: https://github.com/apache/hudi/pull/5629#discussion_r938247328 ## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/HoodieSparkRecordUtils.java: ## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.util; + +import org.apache.hudi.HoodieInternalRowUtils; +import org.apache.hudi.commmon.model.HoodieSparkRecord; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieOperation; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.keygen.RowKeyGeneratorHelper; + +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; + +import java.util.List; + +import scala.Tuple2; + +public class HoodieSparkRecordUtils { + + /** + * Utility method to convert bytes to HoodieRecord using schema and payload class. + */ + public static HoodieRecord convertToHoodieSparkRecord(InternalRow data, StructType structType) { +return new HoodieSparkRecord(data, structType); + } + + /** + * Utility method to convert InternalRow to HoodieRecord using schema and payload class. + */ + public static HoodieRecord convertToHoodieSparkRecord(StructType structType, InternalRow data, String preCombineField, boolean withOperationField) { +return convertToHoodieSparkRecord(structType, data, preCombineField, +Pair.of(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD), +withOperationField, Option.empty()); + } + + public static HoodieRecord convertToHoodieSparkRecord(StructType structType, InternalRow data, String preCombineField, boolean withOperationField, + Option partitionName) { +return convertToHoodieSparkRecord(structType, data, preCombineField, +Pair.of(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD), +withOperationField, partitionName); + } + + /** + * Utility method to convert bytes to HoodieRecord using schema and payload class. + */ + public static HoodieRecord convertToHoodieSparkRecord(StructType structType, InternalRow data, String preCombineField, Pair recordKeyPartitionPathFieldPair, + boolean withOperationField, Option partitionName) { +final String recKey = getValue(structType, recordKeyPartitionPathFieldPair.getKey(), data).toString(); +final String partitionPath = (partitionName.isPresent() ? 
partitionName.get() : +getValue(structType, recordKeyPartitionPathFieldPair.getRight(), data).toString()); + +Object preCombineVal = getPreCombineVal(structType, data, preCombineField); Review Comment: @wzx140 let's pull back here a little bit: > If every SparkRecord has a schema, it will bring huge performance loss (shuffle schema). The current solution is that those spark records that need to be shuffled do not save the schema like record payload. Yeah, we already had that discussion on Slack and it was a fair call-out that i previously had as a hunch and didn't get a chance to explore more thoroughly, which i'll try to rectify now: See, i was coming from an angle that Hudi currently passes fully-deserialized payload (Avro) bearing native Java types across the board. Therefore transitioning to a state where we'd be able to instead pass a *non-serialized* payload will require non-trivial amount of effort, since assumptions that you can get any value from the record *w/o providing any schema* have been baked in pretty firmly in some areas (like the one you're referring to). In that sense, i was thinking that our v1 `HoodieRecord` impl should carry schema as well so that transition from "fully-deserialized" to "non-deserialized" records being passed around would be a smoother sail. We can approach it actually in a way that is similar to how Spark is currently handling it for... Avro actually: eve
[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.
alexeykudinkin commented on code in PR #5629: URL: https://github.com/apache/hudi/pull/5629#discussion_r938157656 ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java: ## @@ -124,12 +126,17 @@ public class HoodieWriteConfig extends HoodieConfig { .withDocumentation("Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting. " + "This will render any value set for PRECOMBINE_FIELD_OPT_VAL in-effective"); - public static final ConfigProperty MERGE_CLASS_NAME = ConfigProperty + public static final ConfigProperty MERGER_CLASS_NAME = ConfigProperty Review Comment: We probably fine either way, but for simplicity i'd suggest we stick with our current approach of keeping it table-property: 1. It's easier for users to understand (existing behavior) 2. It stays part of the table's (immutable) configuration, avoiding the toll of needing to specify it explicitly with every write (could be error-prone) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.
alexeykudinkin commented on code in PR #5629: URL: https://github.com/apache/hudi/pull/5629#discussion_r938154747 ## hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java: ## @@ -30,9 +34,19 @@ * It can implement the merging logic of HoodieRecord of different engines * and avoid the performance consumption caused by the serialization/deserialization of Avro payload. */ -public interface HoodieMerge extends Serializable { - - HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer); +@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING) +public interface HoodieRecordMerger extends Serializable { + + /** + * This method converges combineAndGetUpdateValue and precombine from HoodiePayload. + * It'd be associative operation: f(a, f(b, c)) = f(f(a, b), c) (which we can translate as having 3 versions A, B, C + * of the single record, both orders of operations applications have to yield the same result) + */ + Option merge(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException; - Option combineAndGetUpdateValue(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException; + /** + * The record type handled by the current merger. + * SPARK, AVRO, FLINK + */ + HoodieRecordType getRecordType(); Review Comment: @wzx140 we should actually do it in a similar fashion to how `KeyGenerator` interface is currently implemented: - You have generic API (`KeyGenerator`) accepting Avro (this is a bare minimum to be implemented, is used as a fallback) - You have engine specific API (`SparkKeyGeneratorInterface`) which provides for engine-specific APIs that you should implement natively for better performance. So when Acme implements their own `RecordMerger` they can choose to implement either 1. A bare-minimum API (Avro), that would allow it to work across the engines but will lack performance 2. Or fully (Avro, Spark, etc) which will be performant across engines -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
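A sketch of that layering (interface names are illustrative, mirroring the KeyGenerator / SparkKeyGeneratorInterface split; not the PR's final API):

```java
import java.io.IOException;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;

// Bare-minimum, engine-agnostic contract (Avro-backed records); always implemented, used as the fallback.
interface BaseRecordMergerSketch {
  Option<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException;
}

// Engine-specific extension point: a Spark-native implementation would also implement this to
// merge InternalRow-backed records without round-tripping through Avro.
interface SparkRecordMergerSketch extends BaseRecordMergerSketch {
  Option<HoodieRecord> mergeSparkRecords(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException;
}
```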
[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.
alexeykudinkin commented on code in PR #5629: URL: https://github.com/apache/hudi/pull/5629#discussion_r938150723 ## hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java: ## @@ -19,6 +19,10 @@ package org.apache.hudi.common.model; import org.apache.avro.Schema; Review Comment: We've discussed this before: it would be a gigantic rename across the board, and we agreed that it's probably better carried out separately -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.
alexeykudinkin commented on code in PR #5629: URL: https://github.com/apache/hudi/pull/5629#discussion_r938150358 ## hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java: ## @@ -30,9 +34,19 @@ * It can implement the merging logic of HoodieRecord of different engines * and avoid the performance consumption caused by the serialization/deserialization of Avro payload. */ -public interface HoodieMerge extends Serializable { - - HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer); +@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING) +public interface HoodieRecordMerger extends Serializable { + + /** + * This method converges combineAndGetUpdateValue and precombine from HoodiePayload. + * It'd be associative operation: f(a, f(b, c)) = f(f(a, b), c) (which we can translate as having 3 versions A, B, C + * of the single record, both orders of operations applications have to yield the same result) + */ + Option merge(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException; Review Comment: @vinothchandar I'm not sure I understand the original question. We will retain the same semantics, with a tombstone value of `HoodieRecord` being passed around for deletion -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
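Because the associativity requirement quoted above (f(a, f(b, c)) = f(f(a, b), c)) and the delete/tombstone semantics are easy to get wrong, here is a toy, self-contained sketch of how the two interact under a simple latest-wins rule. The `Rec` type, the separate `emit` step, and the class name are assumptions made purely for the example; they do not mirror the real Hudi types or the exact tombstone mechanism:

```java
import java.util.Optional;

// Toy record: key, ordering value, and a deletion flag. Not the real HoodieRecord.
final class Rec {
  final String key;
  final long orderingVal;
  final boolean deleted;
  Rec(String key, long orderingVal, boolean deleted) {
    this.key = key;
    this.orderingVal = orderingVal;
    this.deleted = deleted;
  }
}

public class AssociativeMergeSketch {
  // Latest-wins: pick the record with the greater ordering value. "Max by orderingVal"
  // is associative, so merge(merge(a, b), c) and merge(a, merge(b, c)) agree.
  static Rec merge(Rec older, Rec newer) {
    return newer.orderingVal >= older.orderingVal ? newer : older;
  }

  // Emit step: if the winner is flagged as a delete, return an empty Optional (the
  // tombstone), meaning no record should be written out for this key.
  static Optional<Rec> emit(Rec merged) {
    return merged.deleted ? Optional.empty() : Optional.of(merged);
  }

  public static void main(String[] args) {
    Rec a = new Rec("k", 1L, false);
    Rec b = new Rec("k", 2L, false);
    Rec c = new Rec("k", 3L, true); // a delete carrying the highest ordering value

    Rec left = merge(merge(a, b), c);  // ((a, b), c)
    Rec right = merge(a, merge(b, c)); // (a, (b, c))
    // Both association orders pick the same winner (c), and both emit a tombstone.
    System.out.println(emit(left).isPresent() + " " + emit(right).isPresent()); // false false
  }
}
```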
[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.
alexeykudinkin commented on code in PR #5629: URL: https://github.com/apache/hudi/pull/5629#discussion_r929142300 ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java: ## @@ -21,64 +21,46 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.SpillableMapUtils; import org.apache.hudi.common.util.collection.Pair; -import org.apache.hudi.config.HoodiePayloadConfig; import org.apache.hudi.exception.HoodieIOException; -import org.apache.hudi.io.storage.HoodieAvroFileReader; +import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; import java.io.IOException; import java.util.Iterator; +import java.util.Properties; import java.util.stream.StreamSupport; /** * Reads records from base file and merges any updates from log files and provides iterable over all records in the file slice. */ public class HoodieFileSliceReader implements Iterator> { + private final Iterator> recordsIterator; public static HoodieFileSliceReader getFileSliceReader( - Option baseFileReader, HoodieMergedLogRecordScanner scanner, Schema schema, String payloadClass, - String preCombineField, Option> simpleKeyGenFieldsOpt) throws IOException { + Option baseFileReader, HoodieMergedLogRecordScanner scanner, Schema schema, Properties props, Option> simpleKeyGenFieldsOpt) throws IOException { if (baseFileReader.isPresent()) { - Iterator baseIterator = baseFileReader.get().getRecordIterator(schema); + Iterator baseIterator = baseFileReader.get().getRecordIterator(schema); while (baseIterator.hasNext()) { -GenericRecord record = (GenericRecord) baseIterator.next(); -HoodieRecord hoodieRecord = transform( -record, scanner, payloadClass, preCombineField, simpleKeyGenFieldsOpt); -scanner.processNextRecord(hoodieRecord); +scanner.processNextRecord(baseIterator.next().expansion(props, simpleKeyGenFieldsOpt, +scanner.isWithOperationField(), scanner.getPartitionName(), false)); } return new HoodieFileSliceReader(scanner.iterator()); } else { Iterable iterable = () -> scanner.iterator(); - HoodiePayloadConfig payloadConfig = HoodiePayloadConfig.newBuilder().withPayloadOrderingField(preCombineField).build(); return new HoodieFileSliceReader(StreamSupport.stream(iterable.spliterator(), false) .map(e -> { try { - GenericRecord record = (GenericRecord) e.toIndexedRecord(schema, payloadConfig.getProps()).get(); - return transform(record, scanner, payloadClass, preCombineField, simpleKeyGenFieldsOpt); + return e.expansion(props, simpleKeyGenFieldsOpt, scanner.isWithOperationField(), scanner.getPartitionName(), false); Review Comment: Great! The new APIs are, I think, a much better option. Thank you very much! 
## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java: ## @@ -21,64 +21,46 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.SpillableMapUtils; import org.apache.hudi.common.util.collection.Pair; -import org.apache.hudi.config.HoodiePayloadConfig; import org.apache.hudi.exception.HoodieIOException; -import org.apache.hudi.io.storage.HoodieAvroFileReader; +import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; import java.io.IOException; import java.util.Iterator; +import java.util.Properties; import java.util.stream.StreamSupport; /** * Reads records from base file and merges any updates from log files and provides iterable over all records in the file slice. */ public class HoodieFileSliceReader implements Iterator> { + private final Iterator> recordsIterator; public static HoodieFileSliceReader getFileSliceReader( - Option baseFileReader, HoodieMergedLogRecordScanner scanner, Schema schema, String payloadClass, - String preCombineField, Option> simpleKeyGenFieldsOpt) throws IOException { + Option baseFileReader, HoodieMergedLogRecordScanner scanner, Schema schema, Properties props, Option> simpleKeyGenFieldsOpt) throws IOException { if (baseFileReader.isPresent()) { - Iterator baseIterator = baseFileReader.get().getRecordIterator(schema); + Iterator baseIterator = baseFileReader.get().getRecordIterator(schema); while (baseIterator.hasNext()) { -GenericRecord record = (GenericRecord) baseIterator.next(); -HoodieRecord hoodieRecord = transform( -record, scanner, payloadClass, preCombineField, simpleKeyGenFieldsOpt); -scanner.processNextRecord(hoodieRecord); +scanner
[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.
alexeykudinkin commented on code in PR #5629: URL: https://github.com/apache/hudi/pull/5629#discussion_r928059133 ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java: ## @@ -21,64 +21,46 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.SpillableMapUtils; import org.apache.hudi.common.util.collection.Pair; -import org.apache.hudi.config.HoodiePayloadConfig; import org.apache.hudi.exception.HoodieIOException; -import org.apache.hudi.io.storage.HoodieAvroFileReader; +import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; import java.io.IOException; import java.util.Iterator; +import java.util.Properties; import java.util.stream.StreamSupport; /** * Reads records from base file and merges any updates from log files and provides iterable over all records in the file slice. */ public class HoodieFileSliceReader implements Iterator> { + private final Iterator> recordsIterator; public static HoodieFileSliceReader getFileSliceReader( - Option baseFileReader, HoodieMergedLogRecordScanner scanner, Schema schema, String payloadClass, - String preCombineField, Option> simpleKeyGenFieldsOpt) throws IOException { + Option baseFileReader, HoodieMergedLogRecordScanner scanner, Schema schema, Properties props, Option> simpleKeyGenFieldsOpt) throws IOException { if (baseFileReader.isPresent()) { - Iterator baseIterator = baseFileReader.get().getRecordIterator(schema); + Iterator baseIterator = baseFileReader.get().getRecordIterator(schema); while (baseIterator.hasNext()) { -GenericRecord record = (GenericRecord) baseIterator.next(); -HoodieRecord hoodieRecord = transform( -record, scanner, payloadClass, preCombineField, simpleKeyGenFieldsOpt); -scanner.processNextRecord(hoodieRecord); +scanner.processNextRecord(baseIterator.next().expansion(props, simpleKeyGenFieldsOpt, +scanner.isWithOperationField(), scanner.getPartitionName(), false)); } return new HoodieFileSliceReader(scanner.iterator()); } else { Iterable iterable = () -> scanner.iterator(); - HoodiePayloadConfig payloadConfig = HoodiePayloadConfig.newBuilder().withPayloadOrderingField(preCombineField).build(); return new HoodieFileSliceReader(StreamSupport.stream(iterable.spliterator(), false) .map(e -> { try { - GenericRecord record = (GenericRecord) e.toIndexedRecord(schema, payloadConfig.getProps()).get(); - return transform(record, scanner, payloadClass, preCombineField, simpleKeyGenFieldsOpt); + return e.expansion(props, simpleKeyGenFieldsOpt, scanner.isWithOperationField(), scanner.getPartitionName(), false); Review Comment: Let's pull back a little bit here: I fully appreciate that I might be missing operational context that you have acquired while performing this refactoring and seeing these nuanced interconnections. Zooming out, my primary issue with this API is that when I'm looking at this method in the interface, I can't understand what it does and can't understand why it should even be here (even knowing what it does) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.
alexeykudinkin commented on code in PR #5629: URL: https://github.com/apache/hudi/pull/5629#discussion_r926262756 ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java: ## @@ -21,64 +21,46 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.SpillableMapUtils; import org.apache.hudi.common.util.collection.Pair; -import org.apache.hudi.config.HoodiePayloadConfig; import org.apache.hudi.exception.HoodieIOException; -import org.apache.hudi.io.storage.HoodieAvroFileReader; +import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; import java.io.IOException; import java.util.Iterator; +import java.util.Properties; import java.util.stream.StreamSupport; /** * Reads records from base file and merges any updates from log files and provides iterable over all records in the file slice. */ public class HoodieFileSliceReader implements Iterator> { + private final Iterator> recordsIterator; public static HoodieFileSliceReader getFileSliceReader( - Option baseFileReader, HoodieMergedLogRecordScanner scanner, Schema schema, String payloadClass, - String preCombineField, Option> simpleKeyGenFieldsOpt) throws IOException { + Option baseFileReader, HoodieMergedLogRecordScanner scanner, Schema schema, Properties props, Option> simpleKeyGenFieldsOpt) throws IOException { if (baseFileReader.isPresent()) { - Iterator baseIterator = baseFileReader.get().getRecordIterator(schema); + Iterator baseIterator = baseFileReader.get().getRecordIterator(schema); while (baseIterator.hasNext()) { -GenericRecord record = (GenericRecord) baseIterator.next(); -HoodieRecord hoodieRecord = transform( -record, scanner, payloadClass, preCombineField, simpleKeyGenFieldsOpt); -scanner.processNextRecord(hoodieRecord); +scanner.processNextRecord(baseIterator.next().expansion(props, simpleKeyGenFieldsOpt, +scanner.isWithOperationField(), scanner.getPartitionName(), false)); } return new HoodieFileSliceReader(scanner.iterator()); } else { Iterable iterable = () -> scanner.iterator(); - HoodiePayloadConfig payloadConfig = HoodiePayloadConfig.newBuilder().withPayloadOrderingField(preCombineField).build(); return new HoodieFileSliceReader(StreamSupport.stream(iterable.spliterator(), false) .map(e -> { try { - GenericRecord record = (GenericRecord) e.toIndexedRecord(schema, payloadConfig.getProps()).get(); - return transform(record, scanner, payloadClass, preCombineField, simpleKeyGenFieldsOpt); + return e.expansion(props, simpleKeyGenFieldsOpt, scanner.isWithOperationField(), scanner.getPartitionName(), false); Review Comment: I don't think we should carry over the existing `transform` into `HoodieRecord.expansion` as is: while the `transform` method was appropriate (it was wrapping Avro into a `HoodieRecord`), the `expansion` method by itself doesn't make much sense. We already iterate over `HoodieRecord`s, so why do we need to expand them? We should be able to appropriately initialize the `HoodieRecord` (key, partition path) when we instantiate it during iteration -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
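To illustrate the alternative being argued for (fully initializing each record when it is materialized, instead of exposing a post-hoc `expansion` call), here is a minimal sketch with simplified stand-in types; `RawRow`, `SliceRecord`, and `recordIterator` are invented for the example and are not the actual Hudi reader APIs:

```java
import java.util.Iterator;
import java.util.List;

// Simplified stand-ins for the real types.
final class RawRow {
  final String key; final String partition; final String value;
  RawRow(String key, String partition, String value) { this.key = key; this.partition = partition; this.value = value; }
}

final class SliceRecord {
  final String key; final String partition; final String value;
  SliceRecord(String key, String partition, String value) { this.key = key; this.partition = partition; this.value = value; }
}

public class SliceReaderSketch {

  // Style being argued against: the iterator hands out half-initialized records that the
  // caller must "expand" afterwards to fill in key / partition path.
  //
  // Style being argued for: the reader fully initializes each record (key, partition path)
  // at the moment it materializes it, so no post-hoc expansion API is needed.
  static Iterator<SliceRecord> recordIterator(List<RawRow> rows) {
    Iterator<RawRow> raw = rows.iterator();
    return new Iterator<SliceRecord>() {
      @Override public boolean hasNext() { return raw.hasNext(); }
      @Override public SliceRecord next() {
        RawRow row = raw.next();
        // key and partition path are resolved right here, during instantiation
        return new SliceRecord(row.key, row.partition, row.value);
      }
    };
  }

  public static void main(String[] args) {
    Iterator<SliceRecord> it = recordIterator(List.of(new RawRow("k1", "2022/09/21", "v1")));
    while (it.hasNext()) {
      SliceRecord r = it.next();
      System.out.println(r.key + " @ " + r.partition);
    }
  }
}
```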
[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.
alexeykudinkin commented on code in PR #5629: URL: https://github.com/apache/hudi/pull/5629#discussion_r920465935 ## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java: ## @@ -54,42 +63,64 @@ private final SerializableSchema schema; private final HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy; private final HoodieClusteringConfig.SpatialCurveCompositionStrategyType curveCompositionStrategyType; + private final HoodieRecordType recordType; public RDDSpatialCurveSortPartitioner(HoodieSparkEngineContext sparkEngineContext, -String[] orderByColumns, - HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy, - HoodieClusteringConfig.SpatialCurveCompositionStrategyType curveCompositionStrategyType, -Schema schema) { + String[] orderByColumns, + LayoutOptimizationStrategy layoutOptStrategy, + SpatialCurveCompositionStrategyType curveCompositionStrategyType, + Schema schema, HoodieRecordType recordType) { this.sparkEngineContext = sparkEngineContext; this.orderByColumns = orderByColumns; this.layoutOptStrategy = layoutOptStrategy; this.curveCompositionStrategyType = curveCompositionStrategyType; this.schema = new SerializableSchema(schema); +this.recordType = recordType; } @Override public JavaRDD> repartitionRecords(JavaRDD> records, int outputSparkPartitions) { -JavaRDD genericRecordsRDD = -records.map(f -> (GenericRecord) f.toIndexedRecord(schema.get(), new Properties()).get()); - -Dataset sourceDataset = -AvroConversionUtils.createDataFrame( -genericRecordsRDD.rdd(), -schema.toString(), -sparkEngineContext.getSqlContext().sparkSession() -); - -Dataset sortedDataset = reorder(sourceDataset, outputSparkPartitions); - -return HoodieSparkUtils.createRdd(sortedDataset, schema.get().getName(), schema.get().getNamespace(), false, Option.empty()) -.toJavaRDD() -.map(record -> { - String key = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); - String partition = record.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(); - HoodieKey hoodieKey = new HoodieKey(key, partition); - HoodieRecord hoodieRecord = new HoodieAvroRecord(hoodieKey, new RewriteAvroPayload(record)); - return hoodieRecord; -}); +if (recordType == HoodieRecordType.AVRO) { + JavaRDD genericRecordsRDD = + records.map(f -> (GenericRecord) f.toIndexedRecord(schema.get(), new Properties()).get()); + + Dataset sourceDataset = + AvroConversionUtils.createDataFrame( + genericRecordsRDD.rdd(), + schema.toString(), + sparkEngineContext.getSqlContext().sparkSession() + ); + + Dataset sortedDataset = reorder(sourceDataset, outputSparkPartitions); + + return HoodieSparkUtils.createRdd(sortedDataset, schema.get().getName(), schema.get().getNamespace(), false, Option.empty()) + .toJavaRDD() + .map(record -> { +String key = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); +String partition = record.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(); +HoodieKey hoodieKey = new HoodieKey(key, partition); +HoodieRecord hoodieRecord = new HoodieAvroRecord(hoodieKey, new RewriteAvroPayload(record)); +return hoodieRecord; + }); +} else if (recordType == HoodieRecordType.SPARK) { + StructType structType = HoodieInternalRowUtils.getCachedSchema(schema.get()); + Dataset sourceDataset = SparkConversionUtils.createDataFrame(records.rdd(), sparkEngineContext.getSqlContext().sparkSession(), structType); + + Dataset sortedDataset = reorder(sourceDataset, outputSparkPartitions); + + return 
sortedDataset.queryExecution().toRdd() + .toJavaRDD() + .map(row -> { +InternalRow internalRow = row.copy(); Review Comment: @minihippo let's add a comment explaining why the copy is being made here -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.
alexeykudinkin commented on code in PR #5629: URL: https://github.com/apache/hudi/pull/5629#discussion_r920463727 ## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java: ## @@ -54,42 +63,64 @@ private final SerializableSchema schema; private final HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy; private final HoodieClusteringConfig.SpatialCurveCompositionStrategyType curveCompositionStrategyType; + private final HoodieRecordType recordType; public RDDSpatialCurveSortPartitioner(HoodieSparkEngineContext sparkEngineContext, -String[] orderByColumns, - HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy, - HoodieClusteringConfig.SpatialCurveCompositionStrategyType curveCompositionStrategyType, -Schema schema) { + String[] orderByColumns, + LayoutOptimizationStrategy layoutOptStrategy, + SpatialCurveCompositionStrategyType curveCompositionStrategyType, + Schema schema, HoodieRecordType recordType) { this.sparkEngineContext = sparkEngineContext; this.orderByColumns = orderByColumns; this.layoutOptStrategy = layoutOptStrategy; this.curveCompositionStrategyType = curveCompositionStrategyType; this.schema = new SerializableSchema(schema); +this.recordType = recordType; } @Override public JavaRDD> repartitionRecords(JavaRDD> records, int outputSparkPartitions) { -JavaRDD genericRecordsRDD = -records.map(f -> (GenericRecord) f.toIndexedRecord(schema.get(), new Properties()).get()); - -Dataset sourceDataset = -AvroConversionUtils.createDataFrame( -genericRecordsRDD.rdd(), -schema.toString(), -sparkEngineContext.getSqlContext().sparkSession() -); - -Dataset sortedDataset = reorder(sourceDataset, outputSparkPartitions); - -return HoodieSparkUtils.createRdd(sortedDataset, schema.get().getName(), schema.get().getNamespace(), false, Option.empty()) -.toJavaRDD() -.map(record -> { - String key = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); - String partition = record.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(); - HoodieKey hoodieKey = new HoodieKey(key, partition); - HoodieRecord hoodieRecord = new HoodieAvroRecord(hoodieKey, new RewriteAvroPayload(record)); - return hoodieRecord; -}); +if (recordType == HoodieRecordType.AVRO) { + JavaRDD genericRecordsRDD = + records.map(f -> (GenericRecord) f.toIndexedRecord(schema.get(), new Properties()).get()); + + Dataset sourceDataset = + AvroConversionUtils.createDataFrame( + genericRecordsRDD.rdd(), + schema.toString(), + sparkEngineContext.getSqlContext().sparkSession() + ); + + Dataset sortedDataset = reorder(sourceDataset, outputSparkPartitions); + + return HoodieSparkUtils.createRdd(sortedDataset, schema.get().getName(), schema.get().getNamespace(), false, Option.empty()) + .toJavaRDD() + .map(record -> { +String key = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); +String partition = record.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(); +HoodieKey hoodieKey = new HoodieKey(key, partition); +HoodieRecord hoodieRecord = new HoodieAvroRecord(hoodieKey, new RewriteAvroPayload(record)); +return hoodieRecord; + }); +} else if (recordType == HoodieRecordType.SPARK) { + StructType structType = HoodieInternalRowUtils.getCachedSchema(schema.get()); + Dataset sourceDataset = SparkConversionUtils.createDataFrame(records.rdd(), sparkEngineContext.getSqlContext().sparkSession(), structType); + + Dataset sortedDataset = reorder(sourceDataset, outputSparkPartitions); + + return 
sortedDataset.queryExecution().toRdd() + .toJavaRDD() + .map(row -> { +InternalRow internalRow = row.copy(); Review Comment: @xushiyan in short: you can't hold the reference to `InternalRow` outside the closure where you have access to it, since under the hood Spark uses a mutable buffer to keep the number of allocations to a minimum (in other words, you always get the same `InternalRow` object, which gets reset every time the iterator moves) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
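Concretely, the copy plus the explanatory comment requested earlier could look like the sketch below. It mirrors the snippet quoted from the diff; `copyRows` and its enclosing class are just a wrapper invented so the example compiles on its own, and the record-wrapping that follows in the real partitioner is omitted:

```java
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;

public class CopyInternalRowSketch {

  static JavaRDD<InternalRow> copyRows(Dataset<Row> sortedDataset) {
    return sortedDataset.queryExecution().toRdd()
        .toJavaRDD()
        .map(row -> {
          // NOTE: Spark's scan operators reuse a single mutable InternalRow buffer across
          // the whole iterator to minimize allocations, so the reference received here is
          // only valid until the iterator advances. We must copy the row before letting it
          // escape this closure (e.g. before wrapping it into a HoodieRecord).
          InternalRow internalRow = row.copy();
          return internalRow;
        });
  }
}
```

If the copy were skipped, every element of the resulting RDD would end up pointing at the same reused buffer, i.e. at whatever the last row happened to be.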
[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.
alexeykudinkin commented on code in PR #5629: URL: https://github.com/apache/hudi/pull/5629#discussion_r920369408 ## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java: ## @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.commmon.model; + +import org.apache.hudi.HoodieInternalRowUtils; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieOperation; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.keygen.BaseKeyGenerator; +import org.apache.hudi.keygen.SparkKeyGeneratorInterface; +import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory; +import org.apache.hudi.util.HoodieSparkRecordUtils; + +import org.apache.avro.Schema; +import org.apache.avro.generic.IndexedRecord; +import org.apache.spark.sql.catalyst.CatalystTypeConverters; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import scala.Tuple2; + +import static org.apache.hudi.common.table.HoodieTableConfig.POPULATE_META_FIELDS; +import static org.apache.spark.sql.types.DataTypes.BooleanType; +import static org.apache.spark.sql.types.DataTypes.StringType; + +/** + * Spark Engine-specific Implementations of `HoodieRecord`. + */ +public class HoodieSparkRecord extends HoodieRecord { Review Comment: We should make `HoodieRecord` a non-generic interface just providing the APIs and move base implementation into `HoodieBaseRecord` so that all the code relying on `HoodieRecord` is not exposed to internal implementation details ## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java: ## @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.commmon.model; + +import org.apache.hudi.HoodieInternalRowUtils; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieOperation; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.keygen.BaseKeyGenerator; +import org.apache.hudi.keygen.SparkKeyGeneratorInterface; +import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory; +import org.apache.hudi.util.HoodieSparkRecordUtils; + +import org.apache.avro.Schema; +import org.apache.avro.generic.IndexedRecord; +import org.apache.spark.sql.catalyst.CatalystTypeConverters; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import scala.Tuple2; + +import
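As a rough illustration of the `HoodieRecord` / `HoodieBaseRecord` split proposed in the comment above, here is a minimal sketch. The method set is deliberately tiny and the bodies are assumptions for illustration only; the real `HoodieRecord` carries far more API surface than shown here:

```java
import java.io.Serializable;
import org.apache.spark.sql.catalyst.InternalRow;

// The public-facing contract: a plain (non-generic) interface that only exposes APIs,
// so callers depending on HoodieRecord never see engine- or storage-specific internals.
interface HoodieRecord extends Serializable {
  String getRecordKey();
  String getPartitionPath();
}

// The shared implementation detail lives one level down, in a base class that concrete
// engine records (Avro, Spark, Flink, ...) extend. The generic parameter for the
// underlying data representation is confined to this layer.
abstract class HoodieBaseRecord<T> implements HoodieRecord {
  private final String recordKey;
  private final String partitionPath;
  protected final T data;

  protected HoodieBaseRecord(String recordKey, String partitionPath, T data) {
    this.recordKey = recordKey;
    this.partitionPath = partitionPath;
    this.data = data;
  }

  @Override public String getRecordKey() { return recordKey; }
  @Override public String getPartitionPath() { return partitionPath; }
}

// Engine-specific record: the InternalRow payload is an implementation detail of this
// subclass and never leaks through the HoodieRecord interface.
final class SparkRecordSketch extends HoodieBaseRecord<InternalRow> {
  SparkRecordSketch(String recordKey, String partitionPath, InternalRow row) {
    super(recordKey, partitionPath, row);
  }
}
```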