[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.

2022-09-21 Thread GitBox


alexeykudinkin commented on code in PR #5629:
URL: https://github.com/apache/hudi/pull/5629#discussion_r977042462


##
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java:
##
@@ -291,59 +284,51 @@ public void checkState() {
 }
   }
 
-  
//
-
-  //
-  // NOTE: This method duplicates those ones of the HoodieRecordPayload and 
are placed here
-  //   for the duration of RFC-46 implementation, until migration off 
`HoodieRecordPayload`
-  //   is complete
-  //
-  public abstract HoodieRecord mergeWith(HoodieRecord other, Schema 
readerSchema, Schema writerSchema) throws IOException;
+  /**
+   * Get column in record to support RDDCustomColumnsSortPartitioner
+   */
+  public abstract Object getRecordColumnValues(Schema recordSchema, String[] 
columns, boolean consistentLogicalTimestampEnabled);
 
-  public abstract HoodieRecord rewriteRecord(Schema recordSchema, Schema 
targetSchema, TypedProperties props) throws IOException;
+  /**
+   * Support bootstrap.
+   */
+  public abstract HoodieRecord mergeWith(HoodieRecord other, Schema 
targetSchema) throws IOException;

Review Comment:
   Understood. Let's keep it for now, but rename it to `joinWith` to avoid confusion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.

2022-09-21 Thread GitBox


alexeykudinkin commented on code in PR #5629:
URL: https://github.com/apache/hudi/pull/5629#discussion_r977041457


##
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkStructTypeSerializer.scala:
##
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import com.esotericsoftware.kryo.Kryo
+import com.esotericsoftware.kryo.io.{Input, Output}
+import com.twitter.chill.KSerializer
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, 
ObjectInputStream, ObjectOutputStream}
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
+import org.apache.avro.SchemaNormalization
+import org.apache.commons.io.IOUtils
+import org.apache.hudi.commmon.model.HoodieSparkRecord
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.Utils
+import org.apache.spark.{SparkEnv, SparkException}
+import scala.collection.mutable
+
+/**
+ * Custom serializer used for generic spark records. If the user registers the 
schemas
+ * ahead of time, then the schema's fingerprint will be sent with each message 
instead of the actual
+ * schema, as to reduce network IO.
+ * Actions like parsing or compressing schemas are computationally expensive 
so the serializer
+ * caches all previously seen values as to reduce the amount of work needed to 
do.
+ * @param schemas a map where the keys are unique IDs for spark schemas and 
the values are the
+ *string representation of the Avro schema, used to decrease 
the amount of data
+ *that needs to be serialized.
+ */
+class SparkStructTypeSerializer(schemas: Map[Long, StructType]) extends 
KSerializer[HoodieSparkRecord] {

Review Comment:
   https://hudi.apache.org/docs/quick-start-guide






[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.

2022-09-21 Thread GitBox


alexeykudinkin commented on code in PR #5629:
URL: https://github.com/apache/hudi/pull/5629#discussion_r977040996


##
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkStructTypeSerializer.scala:
##
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import com.esotericsoftware.kryo.Kryo
+import com.esotericsoftware.kryo.io.{Input, Output}
+import com.twitter.chill.KSerializer
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, 
ObjectInputStream, ObjectOutputStream}
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
+import org.apache.avro.SchemaNormalization
+import org.apache.commons.io.IOUtils
+import org.apache.hudi.commmon.model.HoodieSparkRecord
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.Utils
+import org.apache.spark.{SparkEnv, SparkException}
+import scala.collection.mutable
+
+/**
+ * Custom serializer used for generic spark records. If the user registers the 
schemas
+ * ahead of time, then the schema's fingerprint will be sent with each message 
instead of the actual
+ * schema, as to reduce network IO.
+ * Actions like parsing or compressing schemas are computationally expensive 
so the serializer
+ * caches all previously seen values as to reduce the amount of work needed to 
do.
+ * @param schemas a map where the keys are unique IDs for spark schemas and 
the values are the
+ *string representation of the Avro schema, used to decrease 
the amount of data
+ *that needs to be serialized.
+ */
+class SparkStructTypeSerializer(schemas: Map[Long, StructType]) extends 
KSerializer[HoodieSparkRecord] {

Review Comment:
   Sorry, my bad, I wasn't clear enough -- we will have to:
   
   - Implement a Registrar to make sure our custom serializer actually gets registered
   - Make sure we update the docs to include it (and make sure to highlight it in the change-log), similarly to how we recommend including the `spark.serializer` config (see the sketch below)
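   A rough sketch of what the documented setup could look like once such a registrar exists. The registrar class name `org.apache.hudi.HoodieSparkKryoRegistrar` is a placeholder for illustration; only the two Spark config keys are real:
   
   ```
   import org.apache.spark.SparkConf;
   
   public class HudiKryoSetupSketch {
     // Builds a SparkConf with Kryo enabled and a (placeholder) Hudi registrar that
     // would register the custom record serializer with Spark's Kryo instance.
     public static SparkConf hudiKryoConf() {
       return new SparkConf()
           .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
           .set("spark.kryo.registrator", "org.apache.hudi.HoodieSparkKryoRegistrar");
     }
   }
   ```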






[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.

2022-09-20 Thread GitBox


alexeykudinkin commented on code in PR #5629:
URL: https://github.com/apache/hudi/pull/5629#discussion_r975768738


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##
@@ -505,7 +514,9 @@ public class HoodieWriteConfig extends HoodieConfig {
   private HoodieMetadataConfig metadataConfig;
   private HoodieMetastoreConfig metastoreConfig;
   private HoodieCommonConfig commonConfig;
+  private HoodieStorageConfig storageConfig;
   private EngineType engineType;
+  private HoodieRecordMerger recordMerger;

Review Comment:
   Let's avoid premature optimizations -- `RecordMerger` is specifically designed as a stateless component and as such is lightweight enough to be initialized on demand. The only thing we need to take care of is making sure we don't init it for every record (see the sketch below).
   
   The problem with caching it inside `HoodieWriteConfig` is that `HoodieWriteConfig` has nothing to do w/ `RecordMerger`, and we should not open that can of worms of using it as a state carrier for other components.
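   A minimal sketch of the "init once, not per record" point, assuming the `HoodieRecordMerger#merge(older, newer, schema, props)` signature quoted elsewhere in this PR; the surrounding loop and helper names are illustrative, not Hudi code:
   
   ```
   import java.io.IOException;
   import java.util.Iterator;
   import java.util.Map;
   import java.util.Properties;
   import org.apache.avro.Schema;
   import org.apache.hudi.common.model.HoodieRecord;
   import org.apache.hudi.common.model.HoodieRecordMerger;
   import org.apache.hudi.common.util.Option;
   
   final class MergerReuseSketch {
     // The merger is created once by the caller (on demand) and reused for every
     // record in the batch -- it is never instantiated inside the per-record loop.
     static void mergeAll(HoodieRecordMerger merger, Iterator<HoodieRecord> incoming,
                          Map<String, HoodieRecord> existing, Schema schema, Properties props)
         throws IOException {
       while (incoming.hasNext()) {
         HoodieRecord newer = incoming.next();
         String key = newer.getRecordKey();
         HoodieRecord older = existing.get(key);
         if (older == null) {
           existing.put(key, newer);
         } else {
           Option<HoodieRecord> merged = merger.merge(older, newer, schema, props);
           if (merged.isPresent()) {
             existing.put(key, merged.get());
           }
         }
       }
     }
   }
   ```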






[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.

2022-09-20 Thread GitBox


alexeykudinkin commented on code in PR #5629:
URL: https://github.com/apache/hudi/pull/5629#discussion_r975734758


##
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkStructTypeSerializer.scala:
##
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import com.esotericsoftware.kryo.Kryo
+import com.esotericsoftware.kryo.io.{Input, Output}
+import com.twitter.chill.KSerializer
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, 
ObjectInputStream, ObjectOutputStream}
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
+import org.apache.avro.SchemaNormalization
+import org.apache.commons.io.IOUtils
+import org.apache.hudi.commmon.model.HoodieSparkRecord
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.Utils
+import org.apache.spark.{SparkEnv, SparkException}
+import scala.collection.mutable
+
+/**
+ * Custom serializer used for generic spark records. If the user registers the 
schemas
+ * ahead of time, then the schema's fingerprint will be sent with each message 
instead of the actual
+ * schema, as to reduce network IO.
+ * Actions like parsing or compressing schemas are computationally expensive 
so the serializer
+ * caches all previously seen values as to reduce the amount of work needed to 
do.
+ * @param schemas a map where the keys are unique IDs for spark schemas and 
the values are the
+ *string representation of the Avro schema, used to decrease 
the amount of data
+ *that needs to be serialized.
+ */
+class SparkStructTypeSerializer(schemas: Map[Long, StructType]) extends 
KSerializer[HoodieSparkRecord] {

Review Comment:
   We can't, since we don't control the Kryo lifecycle, and I don't think that we should -- the Kryo injected into Spark's `SerializerManager` can be its own thing, and we cache the schemas for our needs separately.
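   For context on the fingerprint keys used in the serializer above, here is a hedged illustration of how the 64-bit key for a registered schema can be computed with plain Avro APIs (the struct-to-Avro conversion step is elided; the class name is illustrative, only `SchemaNormalization` is real Avro API):
   
   ```
   import org.apache.avro.Schema;
   import org.apache.avro.SchemaNormalization;
   
   public class SchemaFingerprintSketch {
     // Parses an Avro schema and computes its 64-bit Rabin "parsing fingerprint";
     // a map from this value to the corresponding StructType is what the serializer
     // is keyed by, so only the fingerprint travels with each record.
     public static long fingerprintOf(String avroSchemaJson) {
       Schema schema = new Schema.Parser().parse(avroSchemaJson);
       return SchemaNormalization.parsingFingerprint64(schema);
     }
   }
   ```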






[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.

2022-09-20 Thread GitBox


alexeykudinkin commented on code in PR #5629:
URL: https://github.com/apache/hudi/pull/5629#discussion_r975619535


##
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java:
##
@@ -291,59 +284,51 @@ public void checkState() {
 }
   }
 
-  
//
-
-  //
-  // NOTE: This method duplicates those ones of the HoodieRecordPayload and 
are placed here
-  //   for the duration of RFC-46 implementation, until migration off 
`HoodieRecordPayload`
-  //   is complete
-  //
-  public abstract HoodieRecord mergeWith(HoodieRecord other, Schema 
readerSchema, Schema writerSchema) throws IOException;
+  /**
+   * Get column in record to support RDDCustomColumnsSortPartitioner
+   */
+  public abstract Object getRecordColumnValues(Schema recordSchema, String[] 
columns, boolean consistentLogicalTimestampEnabled);
 
-  public abstract HoodieRecord rewriteRecord(Schema recordSchema, Schema 
targetSchema, TypedProperties props) throws IOException;
+  /**
+   * Support bootstrap.
+   */
+  public abstract HoodieRecord mergeWith(HoodieRecord other, Schema 
targetSchema) throws IOException;

Review Comment:
   I see what you're saying. This method is confusing though, since it's also called merge (and I think some impls are actually using `RecordMerger`).
   
   Ideally, we shouldn't even have it in the interface (since it's not generic enough). Can we try to extract it out?









[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.

2022-09-20 Thread GitBox


alexeykudinkin commented on code in PR #5629:
URL: https://github.com/apache/hudi/pull/5629#discussion_r975617746


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/HoodieSparkRecordUtils.java:
##
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.HoodieInternalRowUtils;
+import org.apache.hudi.commmon.model.HoodieSparkRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.spark.sql.HoodieCatalystExpressionUtils$;
+import org.apache.spark.sql.HoodieUnsafeRowUtils;
+import org.apache.spark.sql.HoodieUnsafeRowUtils.NestedFieldPath;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+
+public class HoodieSparkRecordUtils {
+
+  /**
+   * Utility method to convert InternalRow to HoodieRecord using schema and 
payload class.
+   */
+  public static HoodieRecord 
convertToHoodieSparkRecord(StructType structType, InternalRow data, boolean 
withOperationField) {
+return convertToHoodieSparkRecord(structType, data,
+Pair.of(HoodieRecord.RECORD_KEY_METADATA_FIELD, 
HoodieRecord.PARTITION_PATH_METADATA_FIELD),
+withOperationField, Option.empty());
+  }
+
+  public static HoodieRecord 
convertToHoodieSparkRecord(StructType structType, InternalRow data, boolean 
withOperationField,
+  Option partitionName) {
+return convertToHoodieSparkRecord(structType, data,
+Pair.of(HoodieRecord.RECORD_KEY_METADATA_FIELD, 
HoodieRecord.PARTITION_PATH_METADATA_FIELD),
+withOperationField, partitionName);
+  }
+
+  /**
+   * Utility method to convert bytes to HoodieRecord using schema and payload 
class.
+   */
+  public static HoodieRecord 
convertToHoodieSparkRecord(StructType structType, InternalRow data, 
Pair recordKeyPartitionPathFieldPair,
+  boolean withOperationField, Option partitionName) {
+final String recKey = getValue(structType, 
recordKeyPartitionPathFieldPair.getKey(), data).toString();

Review Comment:
   Interesting, this seems like a bug to me -- if `populateMetaFields` is false, this won't work for non-SimpleKeyGen key generators.






[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.

2022-09-20 Thread GitBox


alexeykudinkin commented on code in PR #5629:
URL: https://github.com/apache/hudi/pull/5629#discussion_r975617095


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/HoodieSparkRecordUtils.java:
##
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.HoodieInternalRowUtils;
+import org.apache.hudi.commmon.model.HoodieSparkRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.spark.sql.HoodieCatalystExpressionUtils$;
+import org.apache.spark.sql.HoodieUnsafeRowUtils;
+import org.apache.spark.sql.HoodieUnsafeRowUtils.NestedFieldPath;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+
+public class HoodieSparkRecordUtils {
+
+  /**
+   * Utility method to convert InternalRow to HoodieRecord using schema and 
payload class.
+   */
+  public static HoodieRecord 
convertToHoodieSparkRecord(StructType structType, InternalRow data, boolean 
withOperationField) {
+return convertToHoodieSparkRecord(structType, data,
+Pair.of(HoodieRecord.RECORD_KEY_METADATA_FIELD, 
HoodieRecord.PARTITION_PATH_METADATA_FIELD),
+withOperationField, Option.empty());
+  }
+
+  public static HoodieRecord 
convertToHoodieSparkRecord(StructType structType, InternalRow data, boolean 
withOperationField,
+  Option partitionName) {
+return convertToHoodieSparkRecord(structType, data,
+Pair.of(HoodieRecord.RECORD_KEY_METADATA_FIELD, 
HoodieRecord.PARTITION_PATH_METADATA_FIELD),
+withOperationField, partitionName);
+  }
+
+  /**
+   * Utility method to convert bytes to HoodieRecord using schema and payload 
class.
+   */
+  public static HoodieRecord 
convertToHoodieSparkRecord(StructType structType, InternalRow data, 
Pair recordKeyPartitionPathFieldPair,
+  boolean withOperationField, Option partitionName) {
+final String recKey = getValue(structType, 
recordKeyPartitionPathFieldPair.getKey(), data).toString();
+final String partitionPath = (partitionName.isPresent() ? 
partitionName.get() :
+getValue(structType, recordKeyPartitionPathFieldPair.getRight(), 
data).toString());
+
+HoodieOperation operation = withOperationField
+? HoodieOperation.fromName(getNullableValAsString(structType, data, 
HoodieRecord.OPERATION_METADATA_FIELD)) : null;
+return new HoodieSparkRecord(new HoodieKey(recKey, partitionPath), data, 
structType, operation);
+  }
+
+  private static Object getValue(StructType structType, String fieldName, 
InternalRow row) {
+NestedFieldPath posList = 
HoodieInternalRowUtils.getCachedPosList(structType, fieldName);
+return HoodieUnsafeRowUtils.getNestedInternalRowValue(row, posList);
+  }
+
+  /**
+   * Returns the string value of the given record {@code rec} and field {@code 
fieldName}. The field and value both could be missing.
+   *
+   * @param row   The record
+   * @param fieldName The field name
+   * @return the string form of the field or empty if the schema does not 
contain the field name or the value is null
+   */
+  private static Option getNullableValAsString(StructType structType, 
InternalRow row, String fieldName) {
+String fieldVal = 
!HoodieCatalystExpressionUtils$.MODULE$.existField(structType, fieldName)
+? null : StringUtils.objToString(getValue(structType, fieldName, row));
+return Option.ofNullable(fieldVal);
+  }
+
+  /**
+   * Gets record column values into one object.
+   *
+   * @param row  InternalRow.
+   * @param columns Names of the columns to get values.
+   * @param structType  {@link StructType} instance.
+   * @return Column value if a single column, or concatenated String values by 
comma.
+   */
+  public static Object getRecordColumnValues(InternalRow row,

Review Comment:
   @wzx140 ideally we should, but we don't want to make this PR fix everything either. Hence I suggest we only focus this for

[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.

2022-09-19 Thread GitBox


alexeykudinkin commented on code in PR #5629:
URL: https://github.com/apache/hudi/pull/5629#discussion_r974403023


##
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordCompatibilityInterface.java:
##
@@ -18,21 +18,28 @@
 
 package org.apache.hudi.common.model;
 
+import java.io.IOException;
+import java.util.Properties;
 import org.apache.avro.Schema;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.keygen.BaseKeyGenerator;
 
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Properties;
+public interface HoodieRecordCompatibilityInterface {
 
-/**
- * HoodieMerge defines how to merge two records. It is a stateless component.
- * It can implement the merging logic of HoodieRecord of different engines
- * and avoid the performance consumption caused by the 
serialization/deserialization of Avro payload.
- */
-public interface HoodieMerge extends Serializable {
-  
-  HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer);
+  /**
+   * This method used to extract HoodieKey not through keyGenerator.
+   */
+  HoodieRecord wrapIntoHoodieRecordPayloadWithParams(

Review Comment:
   👍 



##
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java:
##
@@ -147,19 +141,19 @@ public static HoodieMergedLogRecordScanner.Builder 
newBuilder() {
   }
 
   @Override
-  protected void processNextRecord(HoodieRecord hoodieRecord) throws 
IOException {
+  protected  void processNextRecord(HoodieRecord hoodieRecord) throws 
IOException {
 String key = hoodieRecord.getRecordKey();
 if (records.containsKey(key)) {
   // Merge and store the merged record. The HoodieRecordPayload 
implementation is free to decide what should be
   // done when a DELETE (empty payload) is encountered before or after an 
insert/update.
 
-  HoodieRecord oldRecord = records.get(key);
-  HoodieRecordPayload oldValue = oldRecord.getData();
-  HoodieRecordPayload combinedValue = (HoodieRecordPayload) 
merge.preCombine(oldRecord, hoodieRecord).getData();
+  HoodieRecord oldRecord = records.get(key);
+  T oldValue = oldRecord.getData();
+  T combinedValue = ((HoodieRecord) recordMerger.merge(oldRecord, 
hoodieRecord, readerSchema, 
this.hoodieTableMetaClient.getTableConfig().getProps()).get()).getData();
   // If combinedValue is oldValue, no need rePut oldRecord
   if (combinedValue != oldValue) {
-HoodieOperation operation = hoodieRecord.getOperation();
-records.put(key, new HoodieAvroRecord<>(new HoodieKey(key, 
hoodieRecord.getPartitionPath()), combinedValue, operation));
+hoodieRecord.setData(combinedValue);

Review Comment:
   Why are we resetting the data instead of using the new `HoodieRecord` returned by the Merger?



##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##
@@ -126,11 +129,17 @@ public class HoodieWriteConfig extends HoodieConfig {
   .withDocumentation("Payload class used. Override this, if you like to 
roll your own merge logic, when upserting/inserting. "
   + "This will render any value set for PRECOMBINE_FIELD_OPT_VAL 
in-effective");
 
-  public static final ConfigProperty MERGE_CLASS_NAME = ConfigProperty
-  .key("hoodie.datasource.write.merge.class")
-  .defaultValue(HoodieAvroRecordMerge.class.getName())
-  .withDocumentation("Merge class provide stateless component interface 
for merging records, and support various HoodieRecord "
-  + "types, such as Spark records or Flink records.");
+  public static final ConfigProperty MERGER_IMPLS = ConfigProperty
+  .key("hoodie.datasource.write.merger.impls")
+  .defaultValue(HoodieAvroRecordMerger.class.getName())
+  .withDocumentation("List of HoodieMerger implementations constituting 
Hudi's merging strategy -- based on the engine used. "
+  + "These merger impls will filter by 
hoodie.datasource.write.merger.strategy "
+  + "Hudi will pick most efficient implementation to perform 
merging/combining of the records (during update, reading MOR table, etc)");
+
+  public static final ConfigProperty MERGER_STRATEGY = ConfigProperty
+  .key("hoodie.datasource.write.merger.strategy")
+  .defaultValue(StringUtils.DEFAULT_MERGER_STRATEGY_UUID)

Review Comment:
   Let's move this to HoodieMerger, rather than `StringUtils` (we can do it in 
a follow-up)



##
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##
@@ -156,11 +155,10 @@ public class HoodieTableConfig extends HoodieConfig {
   .withDocumentation("Payload class to use for performing compactions, i.e 
merge delta logs with current base file and then "
   + " produce a new base file.");
 
-  public static final ConfigProperty MERGE_CLASS_NAME = ConfigProperty
-  .key("hoodie.compact

[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.

2022-08-31 Thread GitBox


alexeykudinkin commented on code in PR #5629:
URL: https://github.com/apache/hudi/pull/5629#discussion_r959827795


##
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java:
##
@@ -290,59 +286,59 @@ public void checkState() {
 }
   }
 
-  
//
+  /**
+   * Get column in record to support RDDCustomColumnsSortPartitioner
+   */
+  public abstract Object getRecordColumnValues(Schema recordSchema, String[] 
columns, boolean consistentLogicalTimestampEnabled);
 
-  //
-  // NOTE: This method duplicates those ones of the HoodieRecordPayload and 
are placed here
-  //   for the duration of RFC-46 implementation, until migration off 
`HoodieRecordPayload`
-  //   is complete
-  //
-  public abstract HoodieRecord mergeWith(HoodieRecord other, Schema 
readerSchema, Schema writerSchema) throws IOException;
+  /**
+   * Support bootstrap.
+   */
+  public abstract HoodieRecord mergeWith(HoodieRecord other, Schema 
targetSchema) throws IOException;
 
-  public abstract HoodieRecord rewriteRecord(Schema recordSchema, Schema 
targetSchema, TypedProperties props) throws IOException;
+  /**
+   * Rewrite record into new schema(add meta columns)
+   */
+  public abstract HoodieRecord rewriteRecord(Schema recordSchema, Properties 
props, Schema targetSchema) throws IOException;
 
   /**
-   * Rewrite the GenericRecord with the Schema containing the Hoodie Metadata 
fields.
+   * Support schema evolution.
*/
-  public abstract HoodieRecord rewriteRecord(Schema recordSchema, Properties 
prop, boolean schemaOnReadEnabled, Schema writeSchemaWithMetaFields) throws 
IOException;
+  public abstract HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, 
Properties props, Schema newSchema, Map renameCols) throws 
IOException;
 
-  public abstract HoodieRecord rewriteRecordWithMetadata(Schema recordSchema, 
Properties prop, boolean schemaOnReadEnabled, Schema writeSchemaWithMetaFields, 
String fileName) throws IOException;
+  public abstract HoodieRecord updateValues(Schema recordSchema, Properties 
props, Map metadataValues) throws IOException;

Review Comment:
   Yeah, found it already. Let's annotate this method as temporary (we'd likely revisit it in the future) -- we should be wary of exposing an API that allows mutations.






[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.

2022-08-30 Thread GitBox


alexeykudinkin commented on code in PR #5629:
URL: https://github.com/apache/hudi/pull/5629#discussion_r959063675


##
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java:
##
@@ -290,59 +286,59 @@ public void checkState() {
 }
   }
 
-  
//
+  /**
+   * Get column in record to support RDDCustomColumnsSortPartitioner
+   */
+  public abstract Object getRecordColumnValues(Schema recordSchema, String[] 
columns, boolean consistentLogicalTimestampEnabled);
 
-  //
-  // NOTE: This method duplicates those ones of the HoodieRecordPayload and 
are placed here
-  //   for the duration of RFC-46 implementation, until migration off 
`HoodieRecordPayload`
-  //   is complete
-  //
-  public abstract HoodieRecord mergeWith(HoodieRecord other, Schema 
readerSchema, Schema writerSchema) throws IOException;
+  /**
+   * Support bootstrap.
+   */
+  public abstract HoodieRecord mergeWith(HoodieRecord other, Schema 
targetSchema) throws IOException;
 
-  public abstract HoodieRecord rewriteRecord(Schema recordSchema, Schema 
targetSchema, TypedProperties props) throws IOException;
+  /**
+   * Rewrite record into new schema(add meta columns)
+   */
+  public abstract HoodieRecord rewriteRecord(Schema recordSchema, Properties 
props, Schema targetSchema) throws IOException;
 
   /**
-   * Rewrite the GenericRecord with the Schema containing the Hoodie Metadata 
fields.
+   * Support schema evolution.
*/
-  public abstract HoodieRecord rewriteRecord(Schema recordSchema, Properties 
prop, boolean schemaOnReadEnabled, Schema writeSchemaWithMetaFields) throws 
IOException;
+  public abstract HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, 
Properties props, Schema newSchema, Map renameCols) throws 
IOException;

Review Comment:
   Let's create an override for this method to avoid providing an empty map in every call:
   ```
   HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema) {
     return rewriteRecordWithNewSchema(recordSchema, props, newSchema, Collections.emptyMap());
   }
   ```



##
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java:
##
@@ -290,59 +286,59 @@ public void checkState() {
 }
   }
 
-  
//
+  /**
+   * Get column in record to support RDDCustomColumnsSortPartitioner
+   */
+  public abstract Object getRecordColumnValues(Schema recordSchema, String[] 
columns, boolean consistentLogicalTimestampEnabled);
 
-  //
-  // NOTE: This method duplicates those ones of the HoodieRecordPayload and 
are placed here
-  //   for the duration of RFC-46 implementation, until migration off 
`HoodieRecordPayload`
-  //   is complete
-  //
-  public abstract HoodieRecord mergeWith(HoodieRecord other, Schema 
readerSchema, Schema writerSchema) throws IOException;
+  /**
+   * Support bootstrap.
+   */
+  public abstract HoodieRecord mergeWith(HoodieRecord other, Schema 
targetSchema) throws IOException;
 
-  public abstract HoodieRecord rewriteRecord(Schema recordSchema, Schema 
targetSchema, TypedProperties props) throws IOException;
+  /**
+   * Rewrite record into new schema(add meta columns)
+   */
+  public abstract HoodieRecord rewriteRecord(Schema recordSchema, Properties 
props, Schema targetSchema) throws IOException;
 
   /**
-   * Rewrite the GenericRecord with the Schema containing the Hoodie Metadata 
fields.
+   * Support schema evolution.
*/
-  public abstract HoodieRecord rewriteRecord(Schema recordSchema, Properties 
prop, boolean schemaOnReadEnabled, Schema writeSchemaWithMetaFields) throws 
IOException;
+  public abstract HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, 
Properties props, Schema newSchema, Map renameCols) throws 
IOException;
 
-  public abstract HoodieRecord rewriteRecordWithMetadata(Schema recordSchema, 
Properties prop, boolean schemaOnReadEnabled, Schema writeSchemaWithMetaFields, 
String fileName) throws IOException;
+  public abstract HoodieRecord updateValues(Schema recordSchema, Properties 
props, Map metadataValues) throws IOException;

Review Comment:
   Where are we using this one (the PR is already too large for GH, so I can't search in the PR itself)? It seems like quite a dangerous method to have.



##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##
@@ -126,11 +129,11 @@ public class HoodieWriteConfig extends HoodieConfig {
   .withDocumentation("Payload class used. Override this, if you like to 
roll your own merge logic, when upserting/inserting. "
   + "This will render any value set for PRECOMBINE_FIELD_OPT_VAL 
in-effective");
 
-  public static final ConfigProperty MERGE_CLASS_NAME = ConfigProperty
-  .key("hoodie.datasource.write.merge.class")
-  .defaultValue(HoodieAvroRecordMerge.clas

[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.

2022-08-15 Thread GitBox


alexeykudinkin commented on code in PR #5629:
URL: https://github.com/apache/hudi/pull/5629#discussion_r946077270


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java:
##
@@ -20,62 +20,33 @@
 package org.apache.hudi.common.table.log;
 
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.SpillableMapUtils;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodiePayloadConfig;
-import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.io.storage.HoodieFileReader;
 
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
 
 import java.io.IOException;
 import java.util.Iterator;
-import java.util.stream.StreamSupport;
+import java.util.Properties;
 
 /**
  * Reads records from base file and merges any updates from log files and 
provides iterable over all records in the file slice.
  */
-public class HoodieFileSliceReader implements 
Iterator> {
+public class HoodieFileSliceReader implements Iterator> {
+
   private final Iterator> recordsIterator;
 
   public static HoodieFileSliceReader getFileSliceReader(
-  Option baseFileReader, HoodieMergedLogRecordScanner 
scanner, Schema schema, String payloadClass,
-  String preCombineField, Option> 
simpleKeyGenFieldsOpt) throws IOException {
+  Option baseFileReader, HoodieMergedLogRecordScanner 
scanner, Schema schema, Properties props, Option> 
simpleKeyGenFieldsOpt) throws IOException {
 if (baseFileReader.isPresent()) {
-  Iterator baseIterator = baseFileReader.get().getRecordIterator(schema);
+  Iterator baseIterator = 
baseFileReader.get().getRecordIterator(schema);
   while (baseIterator.hasNext()) {
-GenericRecord record = (GenericRecord) baseIterator.next();
-HoodieRecord hoodieRecord = transform(
-record, scanner, payloadClass, preCombineField, 
simpleKeyGenFieldsOpt);
-scanner.processNextRecord(hoodieRecord);
+scanner.processNextRecord(baseIterator.next().getKeyWithParams(schema, 
props,

Review Comment:
   @wzx140 let's sync up on Slack to speed up this conversation






[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.

2022-08-11 Thread GitBox


alexeykudinkin commented on code in PR #5629:
URL: https://github.com/apache/hudi/pull/5629#discussion_r943808765


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java:
##
@@ -20,62 +20,33 @@
 package org.apache.hudi.common.table.log;
 
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.SpillableMapUtils;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodiePayloadConfig;
-import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.io.storage.HoodieFileReader;
 
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
 
 import java.io.IOException;
 import java.util.Iterator;
-import java.util.stream.StreamSupport;
+import java.util.Properties;
 
 /**
  * Reads records from base file and merges any updates from log files and 
provides iterable over all records in the file slice.
  */
-public class HoodieFileSliceReader implements 
Iterator> {
+public class HoodieFileSliceReader implements Iterator> {
+
   private final Iterator> recordsIterator;
 
   public static HoodieFileSliceReader getFileSliceReader(
-  Option baseFileReader, HoodieMergedLogRecordScanner 
scanner, Schema schema, String payloadClass,
-  String preCombineField, Option> 
simpleKeyGenFieldsOpt) throws IOException {
+  Option baseFileReader, HoodieMergedLogRecordScanner 
scanner, Schema schema, Properties props, Option> 
simpleKeyGenFieldsOpt) throws IOException {
 if (baseFileReader.isPresent()) {
-  Iterator baseIterator = baseFileReader.get().getRecordIterator(schema);
+  Iterator baseIterator = 
baseFileReader.get().getRecordIterator(schema);
   while (baseIterator.hasNext()) {
-GenericRecord record = (GenericRecord) baseIterator.next();
-HoodieRecord hoodieRecord = transform(
-record, scanner, payloadClass, preCombineField, 
simpleKeyGenFieldsOpt);
-scanner.processNextRecord(hoodieRecord);
+scanner.processNextRecord(baseIterator.next().getKeyWithParams(schema, 
props,

Review Comment:
   > One thing needs to be added, in addition to HoodieKey, getKeyWithParams 
func also converts avro data to HoodieRecordPayload.
   
   That's exactly what i'm referring to: this method is very cryptic with an 
unclear contract and expectation what is being done by it in the end.
   
   > I think the HoodieRecord returned by FileReader should be the original 
data in file rather than the processed data. Because the callers of FileReader 
need to process the HoodieRecord differently or not need to process it. We do 
not need to converge the process logic to FileReader.
   
   Can you please elaborate on what you're referring to as "processed data"? I'm still a little bit unclear on what this "processing" means.
   






[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.

2022-08-10 Thread GitBox


alexeykudinkin commented on code in PR #5629:
URL: https://github.com/apache/hudi/pull/5629#discussion_r942820533


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java:
##
@@ -20,62 +20,33 @@
 package org.apache.hudi.common.table.log;
 
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.SpillableMapUtils;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodiePayloadConfig;
-import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.io.storage.HoodieFileReader;
 
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
 
 import java.io.IOException;
 import java.util.Iterator;
-import java.util.stream.StreamSupport;
+import java.util.Properties;
 
 /**
  * Reads records from base file and merges any updates from log files and 
provides iterable over all records in the file slice.
  */
-public class HoodieFileSliceReader implements 
Iterator> {
+public class HoodieFileSliceReader implements Iterator> {
+
   private final Iterator> recordsIterator;
 
   public static HoodieFileSliceReader getFileSliceReader(
-  Option baseFileReader, HoodieMergedLogRecordScanner 
scanner, Schema schema, String payloadClass,
-  String preCombineField, Option> 
simpleKeyGenFieldsOpt) throws IOException {
+  Option baseFileReader, HoodieMergedLogRecordScanner 
scanner, Schema schema, Properties props, Option> 
simpleKeyGenFieldsOpt) throws IOException {
 if (baseFileReader.isPresent()) {
-  Iterator baseIterator = baseFileReader.get().getRecordIterator(schema);
+  Iterator baseIterator = 
baseFileReader.get().getRecordIterator(schema);
   while (baseIterator.hasNext()) {
-GenericRecord record = (GenericRecord) baseIterator.next();
-HoodieRecord hoodieRecord = transform(
-record, scanner, payloadClass, preCombineField, 
simpleKeyGenFieldsOpt);
-scanner.processNextRecord(hoodieRecord);
+scanner.processNextRecord(baseIterator.next().getKeyWithParams(schema, 
props,

Review Comment:
   > For old records read from FileReader and passing to HoodieMergeHanle, we 
do not need to extract HoodieKey.
   HoodieMergeHanle use keyGeneratorOpt to extract recordKey directly.
   
   Sorry, I'm not sure I fully understand your point. Can you please elaborate?
   
   But even if we don't need to extract the keys in all cases, we can still solve it by configuring the `FileReader` instead of adding this method, right?






[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.

2022-08-04 Thread GitBox


alexeykudinkin commented on code in PR #5629:
URL: https://github.com/apache/hudi/pull/5629#discussion_r938253077


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java:
##
@@ -20,62 +20,33 @@
 package org.apache.hudi.common.table.log;
 
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.SpillableMapUtils;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodiePayloadConfig;
-import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.io.storage.HoodieFileReader;
 
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
 
 import java.io.IOException;
 import java.util.Iterator;
-import java.util.stream.StreamSupport;
+import java.util.Properties;
 
 /**
  * Reads records from base file and merges any updates from log files and 
provides iterable over all records in the file slice.
  */
-public class HoodieFileSliceReader implements 
Iterator> {
+public class HoodieFileSliceReader implements Iterator> {
+
   private final Iterator> recordsIterator;
 
   public static HoodieFileSliceReader getFileSliceReader(
-  Option baseFileReader, HoodieMergedLogRecordScanner 
scanner, Schema schema, String payloadClass,
-  String preCombineField, Option> 
simpleKeyGenFieldsOpt) throws IOException {
+  Option baseFileReader, HoodieMergedLogRecordScanner 
scanner, Schema schema, Properties props, Option> 
simpleKeyGenFieldsOpt) throws IOException {
 if (baseFileReader.isPresent()) {
-  Iterator baseIterator = baseFileReader.get().getRecordIterator(schema);
+  Iterator baseIterator = 
baseFileReader.get().getRecordIterator(schema);
   while (baseIterator.hasNext()) {
-GenericRecord record = (GenericRecord) baseIterator.next();
-HoodieRecord hoodieRecord = transform(
-record, scanner, payloadClass, preCombineField, 
simpleKeyGenFieldsOpt);
-scanner.processNextRecord(hoodieRecord);
+scanner.processNextRecord(baseIterator.next().getKeyWithParams(schema, 
props,

Review Comment:
   The suggested transition doesn't make sense:
   
   1. Previously, the file-reader was producing `GenericRecord`s, hence we had a `transform` method that wrapped them into `HoodieRecord`.
   2. Now, we have a file-reader instance that produces `HoodieRecord`, so calling a method `getKeyWithParams` that returns another `HoodieRecord` doesn't really make sense.
   
   Can you please explain why we need it, and why we can't return a proper record from the file-reader in the first place?






[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.

2022-08-04 Thread GitBox


alexeykudinkin commented on code in PR #5629:
URL: https://github.com/apache/hudi/pull/5629#discussion_r938247328


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/HoodieSparkRecordUtils.java:
##
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.HoodieInternalRowUtils;
+import org.apache.hudi.commmon.model.HoodieSparkRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.keygen.RowKeyGeneratorHelper;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.List;
+
+import scala.Tuple2;
+
+public class HoodieSparkRecordUtils {
+
+  /**
+   * Utility method to convert bytes to HoodieRecord using schema and payload 
class.
+   */
+  public static HoodieRecord 
convertToHoodieSparkRecord(InternalRow data, StructType structType) {
+return new HoodieSparkRecord(data, structType);
+  }
+
+  /**
+   * Utility method to convert InternalRow to HoodieRecord using schema and 
payload class.
+   */
+  public static HoodieRecord 
convertToHoodieSparkRecord(StructType structType, InternalRow data, String 
preCombineField, boolean withOperationField) {
+return convertToHoodieSparkRecord(structType, data, preCombineField,
+Pair.of(HoodieRecord.RECORD_KEY_METADATA_FIELD, 
HoodieRecord.PARTITION_PATH_METADATA_FIELD),
+withOperationField, Option.empty());
+  }
+
+  public static HoodieRecord 
convertToHoodieSparkRecord(StructType structType, InternalRow data, String 
preCombineField, boolean withOperationField,
+  Option partitionName) {
+return convertToHoodieSparkRecord(structType, data, preCombineField,
+Pair.of(HoodieRecord.RECORD_KEY_METADATA_FIELD, 
HoodieRecord.PARTITION_PATH_METADATA_FIELD),
+withOperationField, partitionName);
+  }
+
+  /**
+   * Utility method to convert bytes to HoodieRecord using schema and payload 
class.
+   */
+  public static HoodieRecord 
convertToHoodieSparkRecord(StructType structType, InternalRow data, String 
preCombineField, Pair recordKeyPartitionPathFieldPair,
+  boolean withOperationField, Option partitionName) {
+final String recKey = getValue(structType, 
recordKeyPartitionPathFieldPair.getKey(), data).toString();
+final String partitionPath = (partitionName.isPresent() ? 
partitionName.get() :
+getValue(structType, recordKeyPartitionPathFieldPair.getRight(), 
data).toString());
+
+Object preCombineVal = getPreCombineVal(structType, data, preCombineField);

Review Comment:
   @wzx140 let's pull back here a little bit:
   
   > If every SparkRecord has a schema, it will bring huge performance loss 
(shuffle schema). The current solution is that those spark records that need to 
be shuffled do not save the schema like record payload.
   
   Yeah, we already had that discussion on Slack, and it was a fair call-out that I previously had as a hunch and didn't get a chance to explore more thoroughly, which I'll try to rectify now:
   
   See, I was coming from the angle that Hudi currently passes a fully-deserialized payload (Avro) bearing native Java types across the board. Therefore, transitioning to a state where we'd instead be able to pass a *non-serialized* payload will require a non-trivial amount of effort, since assumptions that you can get any value from the record *w/o providing any schema* have been baked in pretty firmly in some areas (like the one you're referring to).
   
   In that sense, I was thinking that our v1 `HoodieRecord` impl should carry the schema as well, so that the transition from "fully-deserialized" to "non-deserialized" records being passed around would be a smoother sail.
   
   We can approach it actually in a way that is similar to how Spark is currently handling it for... Avro actually: eve

[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.

2022-08-04 Thread GitBox


alexeykudinkin commented on code in PR #5629:
URL: https://github.com/apache/hudi/pull/5629#discussion_r938157656


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##
@@ -124,12 +126,17 @@ public class HoodieWriteConfig extends HoodieConfig {
   .withDocumentation("Payload class used. Override this, if you like to 
roll your own merge logic, when upserting/inserting. "
   + "This will render any value set for PRECOMBINE_FIELD_OPT_VAL 
in-effective");
 
-  public static final ConfigProperty MERGE_CLASS_NAME = ConfigProperty
+  public static final ConfigProperty MERGER_CLASS_NAME = ConfigProperty

Review Comment:
   We're probably fine either way, but for simplicity I'd suggest we stick with our current approach of keeping it a table property:
   
   1. It's easier for users to understand (existing behavior)
   2. It stays part of the table's (immutable) configuration, avoiding the toll of needing to specify it explicitly with every write (which could be error-prone)
   






[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.

2022-08-04 Thread GitBox


alexeykudinkin commented on code in PR #5629:
URL: https://github.com/apache/hudi/pull/5629#discussion_r938154747


##
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java:
##
@@ -30,9 +34,19 @@
  * It can implement the merging logic of HoodieRecord of different engines
  * and avoid the performance consumption caused by the 
serialization/deserialization of Avro payload.
  */
-public interface HoodieMerge extends Serializable {
-  
-  HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer);
+@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
+public interface HoodieRecordMerger extends Serializable {
+
+  /**
+   * This method converges combineAndGetUpdateValue and precombine from 
HoodiePayload.
+   * It'd be associative operation: f(a, f(b, c)) = f(f(a, b), c) (which we 
can translate as having 3 versions A, B, C
+   * of the single record, both orders of operations applications have to 
yield the same result)
+   */
+  Option merge(HoodieRecord older, HoodieRecord newer, Schema 
schema, Properties props) throws IOException;
 
-  Option combineAndGetUpdateValue(HoodieRecord older, 
HoodieRecord newer, Schema schema, Properties props) throws IOException;
+  /**
+   * The record type handled by the current merger.
+   * SPARK, AVRO, FLINK
+   */
+  HoodieRecordType getRecordType();

Review Comment:
   @wzx140 we should actually do it in a similar fashion to how the `KeyGenerator` interface is currently implemented:
   
   - You have a generic API (`KeyGenerator`) accepting Avro (this is the bare minimum to be implemented and is used as a fallback).
   - You have an engine-specific API (`SparkKeyGeneratorInterface`) which provides engine-specific methods that should be implemented natively for better performance.
   
   So when Acme implements their own `RecordMerger`, they can choose to implement either:
   
   1. The bare-minimum API (Avro), which would allow it to work across engines but will lack performance.
   2. The full set (Avro, Spark, etc.), which will be performant across engines.
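
   Roughly, that layering could look like the sketch below (interface and method names are illustrative, not the PR's API): a mandatory Avro-based merge as the engine-agnostic fallback, plus an optional engine-native override.

    import java.util.Properties;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.hudi.common.util.Option;
    import org.apache.spark.sql.catalyst.InternalRow;
    import org.apache.spark.sql.types.StructType;

    // Illustrative layering only -- mirrors the KeyGenerator split described above.
    interface RecordMergerBase {
      // Bare-minimum, engine-agnostic API operating on Avro; always implemented,
      // used as the fallback when no native path exists.
      Option<GenericRecord> mergeAvro(GenericRecord older, GenericRecord newer,
                                      Schema schema, Properties props);
    }

    interface SparkRecordMergerBase extends RecordMergerBase {
      // Engine-specific API operating directly on InternalRow; implement it natively
      // for better performance, otherwise the Avro fallback above is used.
      default Option<InternalRow> mergeSparkRow(InternalRow older, InternalRow newer,
                                                StructType structType, Properties props) {
        throw new UnsupportedOperationException("No native Spark path; fall back to Avro");
      }
    }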






[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.

2022-08-04 Thread GitBox


alexeykudinkin commented on code in PR #5629:
URL: https://github.com/apache/hudi/pull/5629#discussion_r938150723


##
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java:
##
@@ -19,6 +19,10 @@
 package org.apache.hudi.common.model;
 
 import org.apache.avro.Schema;

Review Comment:
   We've discussed this before: it would be a gigantic rename across the board, and we agreed that it's probably better carried out separately.






[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.

2022-08-04 Thread GitBox


alexeykudinkin commented on code in PR #5629:
URL: https://github.com/apache/hudi/pull/5629#discussion_r938150358


##
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java:
##
@@ -30,9 +34,19 @@
  * It can implement the merging logic of HoodieRecord of different engines
  * and avoid the performance consumption caused by the 
serialization/deserialization of Avro payload.
  */
-public interface HoodieMerge extends Serializable {
-  
-  HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer);
+@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
+public interface HoodieRecordMerger extends Serializable {
+
+  /**
+   * This method converges combineAndGetUpdateValue and precombine from 
HoodiePayload.
+   * It'd be associative operation: f(a, f(b, c)) = f(f(a, b), c) (which we 
can translate as having 3 versions A, B, C
+   * of the single record, both orders of operations applications have to 
yield the same result)
+   */
+  Option merge(HoodieRecord older, HoodieRecord newer, Schema 
schema, Properties props) throws IOException;

Review Comment:
   @vinothchandar not sure I understand the original question -- we will retain the same semantics, with a tombstone value of `HoodieRecord` being passed around for deletion.
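
   For illustration, here is a minimal sketch of that semantic written against the interface as quoted in this diff (import locations and the `isDelete` helper are assumptions): returning an empty `Option` is the tombstone that tells the writer the record should be deleted.

    import java.io.IOException;
    import java.util.Properties;
    import org.apache.avro.Schema;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType; // nesting assumed
    import org.apache.hudi.common.model.HoodieRecordMerger;
    import org.apache.hudi.common.util.Option;

    // Sketch only: keep the newer record unless it is flagged as a delete.
    public class KeepNewestOrDeleteMerger implements HoodieRecordMerger {

      @Override
      public Option<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer,
                                        Schema schema, Properties props) throws IOException {
        if (isDelete(newer, schema, props)) {
          return Option.empty(); // tombstone: nothing survives the merge, so the key is deleted
        }
        return Option.of(newer);
      }

      @Override
      public HoodieRecordType getRecordType() {
        return HoodieRecordType.AVRO;
      }

      private boolean isDelete(HoodieRecord record, Schema schema, Properties props) {
        // Hypothetical helper: a real merger would inspect the payload, e.g. an
        // "_hoodie_is_deleted" flag or a sentinel payload type.
        return false;
      }
    }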






[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.

2022-07-25 Thread GitBox


alexeykudinkin commented on code in PR #5629:
URL: https://github.com/apache/hudi/pull/5629#discussion_r929142300


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java:
##
@@ -21,64 +21,46 @@
 
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.SpillableMapUtils;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodiePayloadConfig;
 import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.io.storage.HoodieAvroFileReader;
+import org.apache.hudi.io.storage.HoodieFileReader;
 
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.Properties;
 import java.util.stream.StreamSupport;
 
 /**
  * Reads records from base file and merges any updates from log files and 
provides iterable over all records in the file slice.
  */
 public class HoodieFileSliceReader implements Iterator> {
+
   private final Iterator> recordsIterator;
 
   public static HoodieFileSliceReader getFileSliceReader(
-  Option baseFileReader, 
HoodieMergedLogRecordScanner scanner, Schema schema, String payloadClass,
-  String preCombineField, Option> 
simpleKeyGenFieldsOpt) throws IOException {
+  Option baseFileReader, HoodieMergedLogRecordScanner 
scanner, Schema schema, Properties props, Option> 
simpleKeyGenFieldsOpt) throws IOException {
 if (baseFileReader.isPresent()) {
-  Iterator baseIterator = baseFileReader.get().getRecordIterator(schema);
+  Iterator baseIterator = 
baseFileReader.get().getRecordIterator(schema);
   while (baseIterator.hasNext()) {
-GenericRecord record = (GenericRecord) baseIterator.next();
-HoodieRecord hoodieRecord = transform(
-record, scanner, payloadClass, preCombineField, 
simpleKeyGenFieldsOpt);
-scanner.processNextRecord(hoodieRecord);
+scanner.processNextRecord(baseIterator.next().expansion(props, 
simpleKeyGenFieldsOpt,
+scanner.isWithOperationField(), scanner.getPartitionName(), 
false));
   }
   return new HoodieFileSliceReader(scanner.iterator());
 } else {
   Iterable iterable = () -> scanner.iterator();
-  HoodiePayloadConfig payloadConfig = 
HoodiePayloadConfig.newBuilder().withPayloadOrderingField(preCombineField).build();
   return new 
HoodieFileSliceReader(StreamSupport.stream(iterable.spliterator(), false)
   .map(e -> {
 try {
-  GenericRecord record = (GenericRecord) e.toIndexedRecord(schema, 
payloadConfig.getProps()).get();
-  return transform(record, scanner, payloadClass, preCombineField, 
simpleKeyGenFieldsOpt);
+  return e.expansion(props, simpleKeyGenFieldsOpt, 
scanner.isWithOperationField(), scanner.getPartitionName(), false);

Review Comment:
   Great! I think the new APIs are a much better option. Thank you very much!



##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java:
##
@@ -21,64 +21,46 @@
 
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.SpillableMapUtils;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodiePayloadConfig;
 import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.io.storage.HoodieAvroFileReader;
+import org.apache.hudi.io.storage.HoodieFileReader;
 
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.Properties;
 import java.util.stream.StreamSupport;
 
 /**
  * Reads records from base file and merges any updates from log files and 
provides iterable over all records in the file slice.
  */
 public class HoodieFileSliceReader implements Iterator> {
+
   private final Iterator> recordsIterator;
 
   public static HoodieFileSliceReader getFileSliceReader(
-  Option baseFileReader, 
HoodieMergedLogRecordScanner scanner, Schema schema, String payloadClass,
-  String preCombineField, Option> 
simpleKeyGenFieldsOpt) throws IOException {
+  Option baseFileReader, HoodieMergedLogRecordScanner 
scanner, Schema schema, Properties props, Option> 
simpleKeyGenFieldsOpt) throws IOException {
 if (baseFileReader.isPresent()) {
-  Iterator baseIterator = baseFileReader.get().getRecordIterator(schema);
+  Iterator baseIterator = 
baseFileReader.get().getRecordIterator(schema);
   while (baseIterator.hasNext()) {
-GenericRecord record = (GenericRecord) baseIterator.next();
-HoodieRecord hoodieRecord = transform(
-record, scanner, payloadClass, preCombineField, 
simpleKeyGenFieldsOpt);
-scanner.processNextRecord(hoodieRecord);
+scanner

[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.

2022-07-22 Thread GitBox


alexeykudinkin commented on code in PR #5629:
URL: https://github.com/apache/hudi/pull/5629#discussion_r928059133


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java:
##
@@ -21,64 +21,46 @@
 
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.SpillableMapUtils;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodiePayloadConfig;
 import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.io.storage.HoodieAvroFileReader;
+import org.apache.hudi.io.storage.HoodieFileReader;
 
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.Properties;
 import java.util.stream.StreamSupport;
 
 /**
  * Reads records from base file and merges any updates from log files and 
provides iterable over all records in the file slice.
  */
 public class HoodieFileSliceReader implements Iterator> {
+
   private final Iterator> recordsIterator;
 
   public static HoodieFileSliceReader getFileSliceReader(
-  Option baseFileReader, 
HoodieMergedLogRecordScanner scanner, Schema schema, String payloadClass,
-  String preCombineField, Option> 
simpleKeyGenFieldsOpt) throws IOException {
+  Option baseFileReader, HoodieMergedLogRecordScanner 
scanner, Schema schema, Properties props, Option> 
simpleKeyGenFieldsOpt) throws IOException {
 if (baseFileReader.isPresent()) {
-  Iterator baseIterator = baseFileReader.get().getRecordIterator(schema);
+  Iterator baseIterator = 
baseFileReader.get().getRecordIterator(schema);
   while (baseIterator.hasNext()) {
-GenericRecord record = (GenericRecord) baseIterator.next();
-HoodieRecord hoodieRecord = transform(
-record, scanner, payloadClass, preCombineField, 
simpleKeyGenFieldsOpt);
-scanner.processNextRecord(hoodieRecord);
+scanner.processNextRecord(baseIterator.next().expansion(props, 
simpleKeyGenFieldsOpt,
+scanner.isWithOperationField(), scanner.getPartitionName(), 
false));
   }
   return new HoodieFileSliceReader(scanner.iterator());
 } else {
   Iterable iterable = () -> scanner.iterator();
-  HoodiePayloadConfig payloadConfig = 
HoodiePayloadConfig.newBuilder().withPayloadOrderingField(preCombineField).build();
   return new 
HoodieFileSliceReader(StreamSupport.stream(iterable.spliterator(), false)
   .map(e -> {
 try {
-  GenericRecord record = (GenericRecord) e.toIndexedRecord(schema, 
payloadConfig.getProps()).get();
-  return transform(record, scanner, payloadClass, preCombineField, 
simpleKeyGenFieldsOpt);
+  return e.expansion(props, simpleKeyGenFieldsOpt, 
scanner.isWithOperationField(), scanner.getPartitionName(), false);

Review Comment:
   Let's pull back a little bit here: I fully appreciate that I might be missing operational context that you have acquired while performing this refactoring and seeing these nuanced interconnections. Zooming out, my primary issue with this API is that when I'm looking at this method in the interface -- I can't understand what it does, and I can't understand why it should even be here (even knowing what it does).






[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.

2022-07-20 Thread GitBox


alexeykudinkin commented on code in PR #5629:
URL: https://github.com/apache/hudi/pull/5629#discussion_r926262756


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java:
##
@@ -21,64 +21,46 @@
 
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.SpillableMapUtils;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodiePayloadConfig;
 import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.io.storage.HoodieAvroFileReader;
+import org.apache.hudi.io.storage.HoodieFileReader;
 
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.Properties;
 import java.util.stream.StreamSupport;
 
 /**
  * Reads records from base file and merges any updates from log files and 
provides iterable over all records in the file slice.
  */
 public class HoodieFileSliceReader implements Iterator> {
+
   private final Iterator> recordsIterator;
 
   public static HoodieFileSliceReader getFileSliceReader(
-  Option baseFileReader, 
HoodieMergedLogRecordScanner scanner, Schema schema, String payloadClass,
-  String preCombineField, Option> 
simpleKeyGenFieldsOpt) throws IOException {
+  Option baseFileReader, HoodieMergedLogRecordScanner 
scanner, Schema schema, Properties props, Option> 
simpleKeyGenFieldsOpt) throws IOException {
 if (baseFileReader.isPresent()) {
-  Iterator baseIterator = baseFileReader.get().getRecordIterator(schema);
+  Iterator baseIterator = 
baseFileReader.get().getRecordIterator(schema);
   while (baseIterator.hasNext()) {
-GenericRecord record = (GenericRecord) baseIterator.next();
-HoodieRecord hoodieRecord = transform(
-record, scanner, payloadClass, preCombineField, 
simpleKeyGenFieldsOpt);
-scanner.processNextRecord(hoodieRecord);
+scanner.processNextRecord(baseIterator.next().expansion(props, 
simpleKeyGenFieldsOpt,
+scanner.isWithOperationField(), scanner.getPartitionName(), 
false));
   }
   return new HoodieFileSliceReader(scanner.iterator());
 } else {
   Iterable iterable = () -> scanner.iterator();
-  HoodiePayloadConfig payloadConfig = 
HoodiePayloadConfig.newBuilder().withPayloadOrderingField(preCombineField).build();
   return new 
HoodieFileSliceReader(StreamSupport.stream(iterable.spliterator(), false)
   .map(e -> {
 try {
-  GenericRecord record = (GenericRecord) e.toIndexedRecord(schema, 
payloadConfig.getProps()).get();
-  return transform(record, scanner, payloadClass, preCombineField, 
simpleKeyGenFieldsOpt);
+  return e.expansion(props, simpleKeyGenFieldsOpt, 
scanner.isWithOperationField(), scanner.getPartitionName(), false);

Review Comment:
   I don't think we should carry over the existing `transform` into `HoodieRecord.expansion` as-is --
   while the `transform` method was appropriate (it wrapped Avro into a `HoodieRecord`), the `expansion` method by itself doesn't make much sense: we already iterate over `HoodieRecord`s, so why do we need to expand them? We should be able to appropriately initialize the `HoodieRecord` (key, partition path) when we instantiate it during iteration.
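
   A rough sketch of what that could look like (helper names and exact signatures are approximations, not the PR's code): the key and partition path are resolved while iterating and passed straight into the record's constructor, so no separate `expansion` call is needed afterwards.

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.hudi.common.model.HoodieAvroRecord;
    import org.apache.hudi.common.model.HoodieKey;
    import org.apache.hudi.common.model.RewriteAvroPayload;
    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.keygen.BaseKeyGenerator;

    // Sketch only: build fully-initialized records during iteration.
    class EagerRecordInitSketch {
      static List<HoodieAvroRecord<RewriteAvroPayload>> readRecords(
          Iterator<GenericRecord> baseIterator,
          BaseKeyGenerator keyGenerator,
          Option<String> partitionNameOpt) {
        List<HoodieAvroRecord<RewriteAvroPayload>> records = new ArrayList<>();
        while (baseIterator.hasNext()) {
          GenericRecord row = baseIterator.next();
          String recordKey = keyGenerator.getRecordKey(row);
          String partitionPath = partitionNameOpt.isPresent()
              ? partitionNameOpt.get() : keyGenerator.getPartitionPath(row);
          // Key and partition path are known here, so the record starts out complete
          // and never needs a separate "expansion" pass.
          records.add(new HoodieAvroRecord<>(
              new HoodieKey(recordKey, partitionPath), new RewriteAvroPayload(row)));
        }
        return records;
      }
    }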
   
   
   






[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.

2022-07-13 Thread GitBox


alexeykudinkin commented on code in PR #5629:
URL: https://github.com/apache/hudi/pull/5629#discussion_r920465935


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java:
##
@@ -54,42 +63,64 @@
   private final SerializableSchema schema;
   private final HoodieClusteringConfig.LayoutOptimizationStrategy 
layoutOptStrategy;
   private final HoodieClusteringConfig.SpatialCurveCompositionStrategyType 
curveCompositionStrategyType;
+  private final HoodieRecordType recordType;
 
   public RDDSpatialCurveSortPartitioner(HoodieSparkEngineContext 
sparkEngineContext,
-String[] orderByColumns,
-
HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy,
-
HoodieClusteringConfig.SpatialCurveCompositionStrategyType 
curveCompositionStrategyType,
-Schema schema) {
+  String[] orderByColumns,
+  LayoutOptimizationStrategy layoutOptStrategy,
+  SpatialCurveCompositionStrategyType curveCompositionStrategyType,
+  Schema schema, HoodieRecordType recordType) {
 this.sparkEngineContext = sparkEngineContext;
 this.orderByColumns = orderByColumns;
 this.layoutOptStrategy = layoutOptStrategy;
 this.curveCompositionStrategyType = curveCompositionStrategyType;
 this.schema = new SerializableSchema(schema);
+this.recordType = recordType;
   }
 
   @Override
   public JavaRDD> repartitionRecords(JavaRDD> 
records, int outputSparkPartitions) {
-JavaRDD genericRecordsRDD =
-records.map(f -> (GenericRecord) f.toIndexedRecord(schema.get(), new 
Properties()).get());
-
-Dataset sourceDataset =
-AvroConversionUtils.createDataFrame(
-genericRecordsRDD.rdd(),
-schema.toString(),
-sparkEngineContext.getSqlContext().sparkSession()
-);
-
-Dataset sortedDataset = reorder(sourceDataset, outputSparkPartitions);
-
-return HoodieSparkUtils.createRdd(sortedDataset, schema.get().getName(), 
schema.get().getNamespace(), false, Option.empty())
-.toJavaRDD()
-.map(record -> {
-  String key = 
record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
-  String partition = 
record.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
-  HoodieKey hoodieKey = new HoodieKey(key, partition);
-  HoodieRecord hoodieRecord = new HoodieAvroRecord(hoodieKey, new 
RewriteAvroPayload(record));
-  return hoodieRecord;
-});
+if (recordType == HoodieRecordType.AVRO) {
+  JavaRDD genericRecordsRDD =
+  records.map(f -> (GenericRecord) f.toIndexedRecord(schema.get(), new 
Properties()).get());
+
+  Dataset sourceDataset =
+  AvroConversionUtils.createDataFrame(
+  genericRecordsRDD.rdd(),
+  schema.toString(),
+  sparkEngineContext.getSqlContext().sparkSession()
+  );
+
+  Dataset sortedDataset = reorder(sourceDataset, 
outputSparkPartitions);
+
+  return HoodieSparkUtils.createRdd(sortedDataset, schema.get().getName(), 
schema.get().getNamespace(), false, Option.empty())
+  .toJavaRDD()
+  .map(record -> {
+String key = 
record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
+String partition = 
record.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
+HoodieKey hoodieKey = new HoodieKey(key, partition);
+HoodieRecord hoodieRecord = new HoodieAvroRecord(hoodieKey, new 
RewriteAvroPayload(record));
+return hoodieRecord;
+  });
+} else if (recordType == HoodieRecordType.SPARK) {
+  StructType structType = 
HoodieInternalRowUtils.getCachedSchema(schema.get());
+  Dataset sourceDataset = 
SparkConversionUtils.createDataFrame(records.rdd(), 
sparkEngineContext.getSqlContext().sparkSession(), structType);
+
+  Dataset sortedDataset = reorder(sourceDataset, 
outputSparkPartitions);
+
+  return sortedDataset.queryExecution().toRdd()
+  .toJavaRDD()
+  .map(row -> {
+InternalRow internalRow = row.copy();

Review Comment:
   @minihippo let's add a comment explaining why the copy is being made here.






[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.

2022-07-13 Thread GitBox


alexeykudinkin commented on code in PR #5629:
URL: https://github.com/apache/hudi/pull/5629#discussion_r920463727


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java:
##
@@ -54,42 +63,64 @@
   private final SerializableSchema schema;
   private final HoodieClusteringConfig.LayoutOptimizationStrategy 
layoutOptStrategy;
   private final HoodieClusteringConfig.SpatialCurveCompositionStrategyType 
curveCompositionStrategyType;
+  private final HoodieRecordType recordType;
 
   public RDDSpatialCurveSortPartitioner(HoodieSparkEngineContext 
sparkEngineContext,
-String[] orderByColumns,
-
HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy,
-
HoodieClusteringConfig.SpatialCurveCompositionStrategyType 
curveCompositionStrategyType,
-Schema schema) {
+  String[] orderByColumns,
+  LayoutOptimizationStrategy layoutOptStrategy,
+  SpatialCurveCompositionStrategyType curveCompositionStrategyType,
+  Schema schema, HoodieRecordType recordType) {
 this.sparkEngineContext = sparkEngineContext;
 this.orderByColumns = orderByColumns;
 this.layoutOptStrategy = layoutOptStrategy;
 this.curveCompositionStrategyType = curveCompositionStrategyType;
 this.schema = new SerializableSchema(schema);
+this.recordType = recordType;
   }
 
   @Override
   public JavaRDD> repartitionRecords(JavaRDD> 
records, int outputSparkPartitions) {
-JavaRDD genericRecordsRDD =
-records.map(f -> (GenericRecord) f.toIndexedRecord(schema.get(), new 
Properties()).get());
-
-Dataset sourceDataset =
-AvroConversionUtils.createDataFrame(
-genericRecordsRDD.rdd(),
-schema.toString(),
-sparkEngineContext.getSqlContext().sparkSession()
-);
-
-Dataset sortedDataset = reorder(sourceDataset, outputSparkPartitions);
-
-return HoodieSparkUtils.createRdd(sortedDataset, schema.get().getName(), 
schema.get().getNamespace(), false, Option.empty())
-.toJavaRDD()
-.map(record -> {
-  String key = 
record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
-  String partition = 
record.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
-  HoodieKey hoodieKey = new HoodieKey(key, partition);
-  HoodieRecord hoodieRecord = new HoodieAvroRecord(hoodieKey, new 
RewriteAvroPayload(record));
-  return hoodieRecord;
-});
+if (recordType == HoodieRecordType.AVRO) {
+  JavaRDD genericRecordsRDD =
+  records.map(f -> (GenericRecord) f.toIndexedRecord(schema.get(), new 
Properties()).get());
+
+  Dataset sourceDataset =
+  AvroConversionUtils.createDataFrame(
+  genericRecordsRDD.rdd(),
+  schema.toString(),
+  sparkEngineContext.getSqlContext().sparkSession()
+  );
+
+  Dataset sortedDataset = reorder(sourceDataset, 
outputSparkPartitions);
+
+  return HoodieSparkUtils.createRdd(sortedDataset, schema.get().getName(), 
schema.get().getNamespace(), false, Option.empty())
+  .toJavaRDD()
+  .map(record -> {
+String key = 
record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
+String partition = 
record.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
+HoodieKey hoodieKey = new HoodieKey(key, partition);
+HoodieRecord hoodieRecord = new HoodieAvroRecord(hoodieKey, new 
RewriteAvroPayload(record));
+return hoodieRecord;
+  });
+} else if (recordType == HoodieRecordType.SPARK) {
+  StructType structType = 
HoodieInternalRowUtils.getCachedSchema(schema.get());
+  Dataset sourceDataset = 
SparkConversionUtils.createDataFrame(records.rdd(), 
sparkEngineContext.getSqlContext().sparkSession(), structType);
+
+  Dataset sortedDataset = reorder(sourceDataset, 
outputSparkPartitions);
+
+  return sortedDataset.queryExecution().toRdd()
+  .toJavaRDD()
+  .map(row -> {
+InternalRow internalRow = row.copy();

Review Comment:
   @xushiyan in short: you can't hold a reference to the `InternalRow` outside the closure where you have access to it, since under the hood Spark uses a mutable buffer to keep the number of allocations to a minimum (in other words, you always get the same `InternalRow` object, which gets reset every time the iterator moves).
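
   To make that concrete, here is a minimal sketch mirroring the mapped RDD in the diff above (the wrapper class is illustrative) showing why the copy has to happen inside the closure:

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.catalyst.InternalRow;

    // Sketch only: detach each InternalRow from Spark's reused buffer before it escapes.
    class CopyOnIterateSketch {
      static JavaRDD<InternalRow> toStableRows(Dataset<Row> sortedDataset) {
        return sortedDataset.queryExecution().toRdd().toJavaRDD()
            .map(row -> {
              // Spark reuses a single mutable InternalRow buffer while iterating to keep
              // allocations to a minimum; the instance handed to this closure is overwritten
              // as soon as the iterator advances, so copy it before the reference escapes.
              return row.copy();
            });
      }
    }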






[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.

2022-07-13 Thread GitBox


alexeykudinkin commented on code in PR #5629:
URL: https://github.com/apache/hudi/pull/5629#discussion_r920369408


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java:
##
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.commmon.model;
+
+import org.apache.hudi.HoodieInternalRowUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+import org.apache.hudi.keygen.SparkKeyGeneratorInterface;
+import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
+import org.apache.hudi.util.HoodieSparkRecordUtils;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.spark.sql.catalyst.CatalystTypeConverters;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import scala.Tuple2;
+
+import static 
org.apache.hudi.common.table.HoodieTableConfig.POPULATE_META_FIELDS;
+import static org.apache.spark.sql.types.DataTypes.BooleanType;
+import static org.apache.spark.sql.types.DataTypes.StringType;
+
+/**
+ * Spark Engine-specific Implementations of `HoodieRecord`.
+ */
+public class HoodieSparkRecord extends HoodieRecord {

Review Comment:
   We should make `HoodieRecord` a non-generic interface that just provides the APIs, and move the base implementation into `HoodieBaseRecord`, so that code relying on `HoodieRecord` is not exposed to internal implementation details.
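
   Roughly what that split could look like (a sketch; everything except the `HoodieKey`/`HoodieOperation` types is illustrative naming):

    import org.apache.hudi.common.model.HoodieKey;
    import org.apache.hudi.common.model.HoodieOperation;

    // Sketch only: a slim, non-generic public interface...
    interface RecordApiSketch {
      HoodieKey getKey();
      String getRecordKey();
      String getPartitionPath();
      HoodieOperation getOperation();
    }

    // ...and an abstract base class that keeps the engine-specific payload and shared
    // plumbing out of the public surface, so callers only ever see the interface above.
    abstract class BaseRecordSketch<T> implements RecordApiSketch {
      protected HoodieKey key;
      protected HoodieOperation operation;
      protected T data; // engine-native representation (Avro GenericRecord, Spark InternalRow, ...)

      public HoodieKey getKey() {
        return key;
      }

      public String getRecordKey() {
        return key.getRecordKey();
      }

      public String getPartitionPath() {
        return key.getPartitionPath();
      }

      public HoodieOperation getOperation() {
        return operation;
      }
    }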



##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java:
##
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.commmon.model;
+
+import org.apache.hudi.HoodieInternalRowUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+import org.apache.hudi.keygen.SparkKeyGeneratorInterface;
+import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
+import org.apache.hudi.util.HoodieSparkRecordUtils;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.spark.sql.catalyst.CatalystTypeConverters;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import scala.Tuple2;
+
+import