vinothchandar commented on a change in pull request #3306:
URL: https://github.com/apache/hudi/pull/3306#discussion_r675205989



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
##########
@@ -39,18 +41,27 @@
 public class HoodieKeyLocationFetchHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieReadHandle<T, I, K, O> {
 
   private final Pair<String, HoodieBaseFile> partitionPathBaseFilePair;
+  private final Option<BaseKeyGenerator> keyGeneratorOpt;
 
   public HoodieKeyLocationFetchHandle(HoodieWriteConfig config, HoodieTable<T, I, K, O> hoodieTable,
-                                      Pair<String, HoodieBaseFile> partitionPathBaseFilePair) {
+                                      Pair<String, HoodieBaseFile> partitionPathBaseFilePair, Option<BaseKeyGenerator> keyGeneratorOpt) {
     super(config, null, hoodieTable, Pair.of(partitionPathBaseFilePair.getLeft(), partitionPathBaseFilePair.getRight().getFileId()));
     this.partitionPathBaseFilePair = partitionPathBaseFilePair;
+    this.keyGeneratorOpt = keyGeneratorOpt;
   }
 
   public Stream<Pair<HoodieKey, HoodieRecordLocation>> locations() {
     HoodieBaseFile baseFile = partitionPathBaseFilePair.getRight();
-    return BaseFileUtils.getInstance(baseFile.getPath()).fetchRecordKeyPartitionPath(
-        hoodieTable.getHadoopConf(), new Path(baseFile.getPath())).stream()
-        .map(entry -> Pair.of(entry,
-            new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId())));
+    if (config.populateMetaFields()) {

Review comment:
       You already decide in an upper layer to pass in `Option.empty` when `config.populateMetaFields() == true`, right? In that case, it is advisable to just use `keyGeneratorOpt.map(keyGen -> /* else block call */).orElse(/* if block */)` rather than checking the config again and again.
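
       For illustration, a sketch of `locations()` driven off the Option itself instead of the config (this also avoids duplicating the fetch/map calls between the two branches; all names are taken from the diff above, and the Option is only inspected with `isPresent()`/`get()`):

```java
public Stream<Pair<HoodieKey, HoodieRecordLocation>> locations() {
  HoodieBaseFile baseFile = partitionPathBaseFilePair.getRight();
  BaseFileUtils baseFileUtils = BaseFileUtils.getInstance(baseFile.getPath());
  // Branch on the Option: empty => meta fields are populated, so read the meta columns directly.
  List<HoodieKey> hoodieKeys = keyGeneratorOpt.isPresent()
      ? baseFileUtils.fetchRecordKeyPartitionPath(hoodieTable.getHadoopConf(), new Path(baseFile.getPath()), keyGeneratorOpt.get())
      : baseFileUtils.fetchRecordKeyPartitionPath(hoodieTable.getHadoopConf(), new Path(baseFile.getPath()));
  // The record location is the same in both branches, so build it once.
  HoodieRecordLocation location = new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId());
  return hoodieKeys.stream().map(key -> Pair.of(key, location));
}
```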

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
##########
@@ -101,33 +103,44 @@
   protected long updatedRecordsWritten = 0;
   protected long insertRecordsWritten = 0;
   protected boolean useWriterSchema;
+  protected boolean populateMetaFields;
+  protected Option<BaseKeyGenerator> keyGeneratorOpt;
   private HoodieBaseFile baseFileToMerge;
 
   public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
                            Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
-                           TaskContextSupplier taskContextSupplier) {
+                           TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) {
     this(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier,
-        hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId).get());
+        hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId).get(), keyGeneratorOpt);
   }
 
   public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
                            Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
-                           TaskContextSupplier taskContextSupplier, HoodieBaseFile baseFile) {
+                           TaskContextSupplier taskContextSupplier, HoodieBaseFile baseFile, Option<BaseKeyGenerator> keyGeneratorOpt) {
     super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier);
     init(fileId, recordItr);
     init(fileId, partitionPath, baseFile);
+    this.populateMetaFields = config.populateMetaFields();

Review comment:
       Same question: can we just work off `keyGeneratorOpt.isEmpty()`?
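
       i.e. roughly this (sketch only; fields as in the diff above):

```java
// Derive the flag from the Option instead of consulting the config a second time.
this.keyGeneratorOpt = keyGeneratorOpt;
this.populateMetaFields = !keyGeneratorOpt.isPresent();
```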

##########
File path: hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
##########
@@ -244,6 +244,22 @@ public static Schema getRecordKeyPartitionPathSchema() {
     return recordSchema;
   }
 
+  /**
+   * Fetch schema for record key and partition path.
+   */
+  public static Schema getRecordKeyPartitionPathSchema(Schema fileSchema, List<String> recordKeyFields, List<String> partitionPathFields) {

Review comment:
       Any reason why we can't just merge the lists outside and keep this method simpler, i.e. take a single list of fields and return a subschema? In fact, we may already have a method like that which we can reuse.
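
       For illustration, the simpler shape could look like this (hypothetical helper, not existing Hudi API; callers would concatenate the record-key and partition-path field lists themselves, and the record name is arbitrary):

```java
// Builds a subschema containing only the requested fields from the file schema.
// Assumes org.apache.avro.Schema and java.util.* imports as elsewhere in HoodieAvroUtils.
public static Schema getSchemaForFields(Schema fileSchema, List<String> fields) {
  List<Schema.Field> toBeAdded = new ArrayList<>();
  for (Schema.Field field : fileSchema.getFields()) {
    if (fields.contains(field.name())) {
      // Schema.Field instances cannot be reused across schemas, so copy them.
      toBeAdded.add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal()));
    }
  }
  Schema recordSchema = Schema.createRecord("HoodieRecordKeyPartitionPath", "", "", false);
  recordSchema.setFields(toBeAdded);
  return recordSchema;
}
```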

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
##########
@@ -101,33 +103,44 @@
   protected long updatedRecordsWritten = 0;
   protected long insertRecordsWritten = 0;
   protected boolean useWriterSchema;
+  protected boolean populateMetaFields;
+  protected Option<BaseKeyGenerator> keyGeneratorOpt;
   private HoodieBaseFile baseFileToMerge;
 
   public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
                            Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
-                           TaskContextSupplier taskContextSupplier) {
+                           TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) {
     this(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier,
-        hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId).get());
+        hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId).get(), keyGeneratorOpt);
   }
 
   public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
                            Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
-                           TaskContextSupplier taskContextSupplier, HoodieBaseFile baseFile) {
+                           TaskContextSupplier taskContextSupplier, HoodieBaseFile baseFile, Option<BaseKeyGenerator> keyGeneratorOpt) {
     super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier);
     init(fileId, recordItr);
     init(fileId, partitionPath, baseFile);
+    this.populateMetaFields = config.populateMetaFields();
+    setAndValidateKeyGenProps(keyGeneratorOpt);
   }
 
   /**
    * Called by compactor code path.
    */
   public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
                            Map<String, HoodieRecord<T>> keyToNewRecords, String partitionPath, String fileId,
-                           HoodieBaseFile dataFileToBeMerged, TaskContextSupplier taskContextSupplier) {
+                           HoodieBaseFile dataFileToBeMerged, TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) {
     super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier);
     this.keyToNewRecords = keyToNewRecords;
     this.useWriterSchema = true;
     init(fileId, this.partitionPath, dataFileToBeMerged);
+    this.populateMetaFields = config.populateMetaFields();
+    setAndValidateKeyGenProps(keyGeneratorOpt);
+  }
+
+  private void setAndValidateKeyGenProps(Option<BaseKeyGenerator> keyGeneratorOpt) {

Review comment:
       Validate and then set? (i.e. the name should read `validateAndSetKeyGenProps`.)

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
##########
@@ -78,13 +82,16 @@
     BaseCommitActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, HoodieWriteMetadata> {
 
   private static final Logger LOG = LogManager.getLogger(BaseSparkCommitActionExecutor.class);
+  protected boolean populateMetaFields;

Review comment:
       do we need both?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
##########
@@ -45,7 +45,8 @@
       TaskContextSupplier taskContextSupplier) throws IOException {
     final String extension = FSUtils.getFileExtension(path.getName());
     if (PARQUET.getFileExtension().equals(extension)) {
-      return newParquetFileWriter(instantTime, path, config, schema, hoodieTable, taskContextSupplier);
+      return newParquetFileWriter(instantTime, path, config, schema, hoodieTable, taskContextSupplier, config.populateMetaFields(),

Review comment:
       why would we need to pass the same value twice?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
##########
@@ -39,18 +41,27 @@
 public class HoodieKeyLocationFetchHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieReadHandle<T, I, K, O> {
 
   private final Pair<String, HoodieBaseFile> partitionPathBaseFilePair;
+  private final Option<BaseKeyGenerator> keyGeneratorOpt;
 
   public HoodieKeyLocationFetchHandle(HoodieWriteConfig config, HoodieTable<T, I, K, O> hoodieTable,
-                                      Pair<String, HoodieBaseFile> partitionPathBaseFilePair) {
+                                      Pair<String, HoodieBaseFile> partitionPathBaseFilePair, Option<BaseKeyGenerator> keyGeneratorOpt) {
     super(config, null, hoodieTable, Pair.of(partitionPathBaseFilePair.getLeft(), partitionPathBaseFilePair.getRight().getFileId()));
     this.partitionPathBaseFilePair = partitionPathBaseFilePair;
+    this.keyGeneratorOpt = keyGeneratorOpt;
   }
 
   public Stream<Pair<HoodieKey, HoodieRecordLocation>> locations() {
     HoodieBaseFile baseFile = partitionPathBaseFilePair.getRight();
-    return BaseFileUtils.getInstance(baseFile.getPath()).fetchRecordKeyPartitionPath(
-        hoodieTable.getHadoopConf(), new Path(baseFile.getPath())).stream()
-        .map(entry -> Pair.of(entry,
-            new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId())));
+    if (config.populateMetaFields()) {
+      return BaseFileUtils.getInstance(baseFile.getPath()).fetchRecordKeyPartitionPath(
+          hoodieTable.getHadoopConf(), new Path(baseFile.getPath())).stream()
+          .map(entry -> Pair.of(entry,
+              new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId())));
+    } else {
+      return BaseFileUtils.getInstance(baseFile.getPath()).fetchRecordKeyPartitionPath(
+          hoodieTable.getHadoopConf(), new Path(baseFile.getPath()), keyGeneratorOpt.get()).stream()
+          .map(entry -> Pair.of(entry,

Review comment:
       Can we avoid repeating lines 63, 58 across the two branches? (See the sketch under the earlier comment on this method.)

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/simple/SparkHoodieSimpleIndex.java
##########
@@ -146,8 +151,15 @@ public boolean isImplicitWithStorage() {
                                                                                List<Pair<String, HoodieBaseFile>> baseFiles) {
     JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
     int fetchParallelism = Math.max(1, Math.max(baseFiles.size(), parallelism));
-    return jsc.parallelize(baseFiles, fetchParallelism)
-        .flatMapToPair(partitionPathBaseFile -> new HoodieKeyLocationFetchHandle(config, hoodieTable, partitionPathBaseFile)
+
+    try {
+      Option<BaseKeyGenerator> keyGeneratorOpt = config.populateMetaFields() ? Option.empty()
+          : Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(config.getProps())));
+      return jsc.parallelize(baseFiles, fetchParallelism)
+        .flatMapToPair(partitionPathBaseFile -> new HoodieKeyLocationFetchHandle(config, hoodieTable, partitionPathBaseFile, keyGeneratorOpt)
                 .locations().map(x -> Tuple2.apply(((Pair)x).getLeft(), ((Pair)x).getRight())).iterator());
+    } catch (IOException e) {
+      throw new HoodieIOException("KeyGenerator instantiation throwed exception " + e);

Review comment:
       Use `", e)` instead of `+ e`? i.e. pass the exception as the cause rather than concatenating it into the message.

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SparkKeyGeneratorInterface.java
##########
@@ -28,4 +30,6 @@
   String getRecordKey(Row row);
 
   String getPartitionPath(Row row);
+
+  String getPartitionPath(InternalRow internalRow, StructType structType);

Review comment:
       Should we add a default implementation, so that not everyone is forced to implement this?
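
       One possible shape (sketch only; whether to throw or to fall back to another code path is a design choice):

```java
// Sketch of a default method on SparkKeyGeneratorInterface. Key generators that
// support the row-writer path override it; others get a clear error instead of a
// compile-time obligation. Assumes the existing InternalRow/StructType imports.
default String getPartitionPath(InternalRow internalRow, StructType structType) {
  throw new UnsupportedOperationException(
      "getPartitionPath(InternalRow, StructType) is not implemented by " + getClass().getName());
}
```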

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
##########
@@ -94,6 +101,18 @@ public BaseSparkCommitActionExecutor(HoodieEngineContext context,
                                        WriteOperationType operationType,
                                        Option extraMetadata) {
     super(context, config, table, instantTime, operationType, extraMetadata);
+    initKeyGenIfNeeded();
+  }
+
+  private void initKeyGenIfNeeded() {
+    this.populateMetaFields = config.populateMetaFields();

Review comment:
       move this to constructor?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieConcatHandle.java
##########
@@ -66,21 +68,22 @@
   private static final Logger LOG = LogManager.getLogger(HoodieConcatHandle.class);
 
   public HoodieConcatHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, Iterator recordItr,
-      String partitionPath, String fileId, TaskContextSupplier taskContextSupplier) {
-    super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier);
+                            String partitionPath, String fileId, TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) {
+    super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier, keyGeneratorOpt);
   }
 
   public HoodieConcatHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, Map keyToNewRecords, String partitionPath, String fileId,
       HoodieBaseFile dataFileToBeMerged, TaskContextSupplier taskContextSupplier) {
-    super(config, instantTime, hoodieTable, keyToNewRecords, partitionPath, fileId, dataFileToBeMerged, taskContextSupplier);
+    super(config, instantTime, hoodieTable, keyToNewRecords, partitionPath, fileId, dataFileToBeMerged, taskContextSupplier,
+        Option.empty());
   }
 
   /**
    * Write old record as is w/o merging with incoming record.
    */
   @Override
   public void write(GenericRecord oldRecord) {
-    String key = oldRecord.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
+    String key = populateMetaFields ? oldRecord.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString() : keyGeneratorOpt.get().getKey(oldRecord).getRecordKey();

Review comment:
       can this code be shared?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
##########
@@ -45,7 +45,8 @@
       TaskContextSupplier taskContextSupplier) throws IOException {
     final String extension = FSUtils.getFileExtension(path.getName());
     if (PARQUET.getFileExtension().equals(extension)) {
-      return newParquetFileWriter(instantTime, path, config, schema, hoodieTable, taskContextSupplier);
+      return newParquetFileWriter(instantTime, path, config, schema, hoodieTable, taskContextSupplier, config.populateMetaFields(),

Review comment:
       If you want this control for testing, let's add a new overload for `newParquetFileWriter()`?
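
       For illustration, the overload could simply delegate (sketch only; the long signature is the one already shown further down in this diff, nothing here is existing API):

```java
// Production call sites keep the short signature; tests use the longer one to control
// populateMetaFields / bloom-filter behaviour explicitly.
private static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFileWriter<R> newParquetFileWriter(
    String instantTime, Path path, HoodieWriteConfig config, Schema schema, HoodieTable hoodieTable,
    TaskContextSupplier taskContextSupplier) throws IOException {
  return newParquetFileWriter(instantTime, path, config, schema, hoodieTable, taskContextSupplier,
      config.populateMetaFields(), config.populateMetaFields());
}
```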

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java
##########
@@ -247,8 +247,8 @@ protected String getCommitActionType() {
    */
   private HoodieRecord<? extends HoodieRecordPayload> transform(IndexedRecord indexedRecord) {
     GenericRecord record = (GenericRecord) indexedRecord;
-    String key = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
-    String partition = record.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
+    String key = populateMetaFields ? record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString() : keyGeneratorOpt.get().getRecordKey(record);

Review comment:
       This ternary switch is kind of everywhere. A single method call like `fetchKey(populateMetaFields, record, keyGeneratorOpt)` would be nice.
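
       Something along these lines, for illustration (hypothetical helper on some shared utils class; `getRecordKey(GenericRecord)` is the key-generator call already used in this diff):

```java
// Resolves the record key either from the _hoodie_record_key meta column or via the key generator.
public static String fetchRecordKey(boolean populateMetaFields, GenericRecord record,
                                    Option<BaseKeyGenerator> keyGeneratorOpt) {
  if (populateMetaFields) {
    return record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
  }
  return keyGeneratorOpt.get().getRecordKey(record);
}
```

       The same helper (plus a partition-path twin) could also back the `write(oldRecord)` paths in the handles above.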

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
##########
@@ -210,12 +214,20 @@ public void rollbackBootstrap(HoodieEngineContext context, String instantTime) {
 
   protected HoodieMergeHandle getUpdateHandle(String instantTime, String partitionPath, String fileId,
       Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile dataFileToBeMerged) {
+    Option<BaseKeyGenerator> keyGeneratorOpt = Option.empty();
+    if (!config.populateMetaFields()) {
+      try {
+        keyGeneratorOpt = Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(config.getProps())));
+      } catch (IOException e) {
+        throw new HoodieIOException("Only BaseKyGenerators are supported when meta columns are disabled ", e);

Review comment:
       Would a user understand `BaseKeyGenerators are supported`? (Also note the typo: `BaseKyGenerators`.) Can we make the exception messages more user-friendly?

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
##########
@@ -94,6 +101,18 @@ public BaseSparkCommitActionExecutor(HoodieEngineContext context,
                                        WriteOperationType operationType,
                                        Option extraMetadata) {
     super(context, config, table, instantTime, operationType, extraMetadata);
+    initKeyGenIfNeeded();
+  }
+
+  private void initKeyGenIfNeeded() {
+    this.populateMetaFields = config.populateMetaFields();
+    if (!populateMetaFields) {
+      try {
+        keyGeneratorOpt = Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(config.getProps())));
+      } catch (IOException e) {
+        throw new HoodieIOException("Only BaseKeyGenerators are supported when meta columns are disabled ", e);

Review comment:
       Move this exception handling into the method itself? It's an unchecked exception anyway, so we can save some lines.
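
       One way this could read (sketch; assumes a new or changed factory-side helper rather than existing API):

```java
// Hypothetical factory-side wrapper: callers no longer need their own try/catch,
// since HoodieIOException is unchecked.
public static BaseKeyGenerator createBaseKeyGenerator(TypedProperties props) {
  try {
    return (BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
  } catch (IOException e) {
    throw new HoodieIOException("Key generator could not be instantiated from the write config", e);
  }
}
```

       The executor/table call sites then collapse to a one-line ternary on `populateMetaFields`.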

##########
File path: hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
##########
@@ -244,6 +244,22 @@ public static Schema getRecordKeyPartitionPathSchema() {
     return recordSchema;
   }
 
+  /**
+   * Fetch schema for record key and partition path.
+   */
+  public static Schema getRecordKeyPartitionPathSchema(Schema fileSchema, List<String> recordKeyFields, List<String> partitionPathFields) {

Review comment:
       unit test for this?
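
       A test could be as small as this (illustrative sketch only; field names are hypothetical and it assumes the subschema keeps exactly the record-key and partition-path fields):

```java
@Test
public void testGetRecordKeyPartitionPathSchema() {
  // File schema with three columns; only "key" and "dt" should survive in the subschema.
  Schema fileSchema = SchemaBuilder.record("rec").fields()
      .requiredString("key").requiredString("dt").requiredLong("val").endRecord();
  Schema subSchema = HoodieAvroUtils.getRecordKeyPartitionPathSchema(
      fileSchema, Collections.singletonList("key"), Collections.singletonList("dt"));
  assertNotNull(subSchema.getField("key"));
  assertNotNull(subSchema.getField("dt"));
  assertNull(subSchema.getField("val"));
}
```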

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
##########
@@ -142,6 +143,43 @@
     return hoodieKeys;
   }
 
+  /**
+   * Fetch {@link HoodieKey}s from the given parquet file.
+   *
+   * @param filePath      The parquet file path.
+   * @param configuration configuration to build fs object
+   * @return {@link List} of {@link HoodieKey}s fetched from the parquet file
+   */
+  @Override
+  public List<HoodieKey> fetchRecordKeyPartitionPath(Configuration configuration, Path filePath, BaseKeyGenerator keyGenerator) {

Review comment:
       Probably. But this method has a lot of code duplication at the moment; can we reduce that?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
##########
@@ -58,16 +59,15 @@
 
   private static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFileWriter<R> newParquetFileWriter(
       String instantTime, Path path, HoodieWriteConfig config, Schema schema, HoodieTable hoodieTable,
-      TaskContextSupplier taskContextSupplier) throws IOException {
-    BloomFilter filter = createBloomFilter(config);
-    HoodieAvroWriteSupport writeSupport =
-        new HoodieAvroWriteSupport(new AvroSchemaConverter(hoodieTable.getHadoopConf()).convert(schema), schema, filter);
+      TaskContextSupplier taskContextSupplier, boolean populateMetaFields, boolean enableBloomFilter) throws IOException {
+    BloomFilter filter = enableBloomFilter ? createBloomFilter(config) : null;

Review comment:
       Yes, let's fix this to not do nulls, if it's not a lot of change.
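
       e.g. something like this (sketch only; it assumes `HoodieAvroWriteSupport` gains an `Option`-based constructor, which is not the existing signature):

```java
// Carry the bloom filter as an Option instead of a nullable reference.
Option<BloomFilter> filter = enableBloomFilter ? Option.of(createBloomFilter(config)) : Option.empty();
HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(
    new AvroSchemaConverter(hoodieTable.getHadoopConf()).convert(schema), schema, filter);
```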

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -1588,6 +1588,11 @@ public Builder withWriteMetaKeyPrefixes(String writeMetaKeyPrefixes) {
       return this;
     }
 
+    public Builder withPopulateMetaFields(boolean populateMetaFields) {

Review comment:
       Yeah, composing it like that is probably the right way: separate out the table configs from the write configs.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
##########
@@ -142,6 +143,43 @@
     return hoodieKeys;
   }
 
+  /**
+   * Fetch {@link HoodieKey}s from the given parquet file.
+   *
+   * @param filePath      The parquet file path.
+   * @param configuration configuration to build fs object
+   * @return {@link List} of {@link HoodieKey}s fetched from the parquet file
+   */
+  @Override
+  public List<HoodieKey> fetchRecordKeyPartitionPath(Configuration configuration, Path filePath, BaseKeyGenerator keyGenerator) {
+    List<HoodieKey> hoodieKeys = new ArrayList<>();
+    try {
+      if (!filePath.getFileSystem(configuration).exists(filePath)) {

Review comment:
       Let's avoid this call, and have it error out if the file does not exist?
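
       Roughly like this, for illustration (sketch only; it reads whole records instead of the projected key/partition columns the real method would use, and relies on the reader surfacing the missing-file error):

```java
// No exists() pre-check: a missing file surfaces as an IOException from the parquet reader.
List<HoodieKey> hoodieKeys = new ArrayList<>();
try (ParquetReader<GenericRecord> reader =
         AvroParquetReader.<GenericRecord>builder(filePath).withConf(configuration).build()) {
  GenericRecord record;
  while ((record = reader.read()) != null) {
    // getKey(GenericRecord) is the BaseKeyGenerator call already used elsewhere in this PR.
    hoodieKeys.add(keyGenerator.getKey(record));
  }
} catch (IOException e) {
  throw new HoodieIOException("Failed to read from Parquet file " + filePath, e);
}
return hoodieKeys;
```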



