[GitHub] [hudi] leesf commented on a diff in pull request #9199: [HUDI-6534]Support consistent hashing row writer

2023-08-03 Thread via GitHub


leesf commented on code in PR #9199:
URL: https://github.com/apache/hudi/pull/9199#discussion_r1283870818


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/ConsistentBucketBulkInsertDataInternalWriterHelper.java:
##
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.ConsistentHashingNode;
+import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.bucket.ConsistentBucketIdentifier;
+import org.apache.hudi.index.bucket.ConsistentBucketIndexUtils;
+import org.apache.hudi.io.storage.row.HoodieRowCreateHandle;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.cluster.util.ConsistentHashingUpdateStrategyUtils;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Helper class for native row writer for bulk_insert with consistent hashing bucket index.
+ */
+public class ConsistentBucketBulkInsertDataInternalWriterHelper extends BucketBulkInsertDataInternalWriterHelper {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ConsistentBucketBulkInsertDataInternalWriterHelper.class);
+
+  public ConsistentBucketBulkInsertDataInternalWriterHelper(HoodieTable hoodieTable, HoodieWriteConfig writeConfig,

Review Comment:
   the constructor has too many args; can they be wrapped in a class?
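
   For illustration, a parameter-object refactor along these lines is presumably what is meant. This is only a sketch; `WriterHelperConfig` and its accessors are hypothetical names derived from the arguments visible in the diff, not part of the PR:

    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.hudi.table.HoodieTable;

    // Hypothetical parameter object bundling the constructor arguments.
    public class WriterHelperConfig {
      private final HoodieTable hoodieTable;
      private final HoodieWriteConfig writeConfig;
      // ... remaining writer arguments (instant time, task ids, schema, etc.)

      public WriterHelperConfig(HoodieTable hoodieTable, HoodieWriteConfig writeConfig) {
        this.hoodieTable = hoodieTable;
        this.writeConfig = writeConfig;
      }

      public HoodieTable getHoodieTable() {
        return hoodieTable;
      }

      public HoodieWriteConfig getWriteConfig() {
        return writeConfig;
      }
    }

    // The helper constructor would then take a single argument:
    // public ConsistentBucketBulkInsertDataInternalWriterHelper(WriterHelperConfig config) { ... }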






[GitHub] [hudi] leesf commented on a diff in pull request #9199: [HUDI-6534]Support consistent hashing row writer

2023-08-03 Thread via GitHub


leesf commented on code in PR #9199:
URL: https://github.com/apache/hudi/pull/9199#discussion_r1283870066


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java:
##
@@ -65,7 +71,6 @@ public void write(InternalRow row) throws IOException {
    int bucketId = BucketIdentifier.getBucketId(String.valueOf(recordKey), indexKeyFields, bucketNum);
    Pair fileId = Pair.of(partitionPath, bucketId);
    if (lastFileId == null || !lastFileId.equals(fileId)) {
-     LOG.info("Creating new file for partition path " + partitionPath);

Review Comment:
   is this log useless then?
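
   If the log is kept instead of dropped, a parameterized debug-level message carrying the file id would be more informative. A sketch, not from the PR (the message text is an assumption):

    // Sketch: debug-level, parameterized alternative to the removed info log.
    LOG.debug("Creating new handle for fileId {} in partition path {}", fileId, partitionPath);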






[GitHub] [hudi] leesf commented on a diff in pull request #9199: [HUDI-6534]Support consistent hashing row writer

2023-08-03 Thread via GitHub


leesf commented on code in PR #9199:
URL: https://github.com/apache/hudi/pull/9199#discussion_r1283867267


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/ConsistentBucketIndexBulkInsertPartitionerWithRows.java:
##
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.common.model.ConsistentHashingNode;
+import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.index.bucket.ConsistentBucketIdentifier;
+import org.apache.hudi.index.bucket.ConsistentBucketIndexUtils;
+import org.apache.hudi.index.bucket.HoodieSparkConsistentBucketIndex;
+import org.apache.hudi.keygen.BuiltinKeyGenerator;
+import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
+import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.hudi.table.ConsistentHashingBucketInsertPartitioner;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import scala.Tuple2;
+
+/**
+ * Bulk_insert partitioner of Spark row using consistent hashing bucket index.
+ */
+public class ConsistentBucketIndexBulkInsertPartitionerWithRows
+    implements BulkInsertPartitioner<Dataset<Row>>, ConsistentHashingBucketInsertPartitioner {
+
+  private final HoodieTable table;
+
+  private final String indexKeyFields;
+
+  private final List<String> fileIdPfxList = new ArrayList<>();
+  private final Map<String, List<ConsistentHashingNode>> hashingChildrenNodes;
+
+  private Map<String, ConsistentBucketIdentifier> partitionToIdentifier;
+
+  private final Option<BuiltinKeyGenerator> keyGeneratorOpt;
+
+  private Map<String, Map<String, Integer>> partitionToFileIdPfxIdxMap;
+
+  private final RowRecordKeyExtractor extractor;
+
+  public ConsistentBucketIndexBulkInsertPartitionerWithRows(HoodieTable table, boolean populateMetaFields) {
+    this.indexKeyFields = table.getConfig().getBucketIndexHashField();
+    this.table = table;
+    this.hashingChildrenNodes = new HashMap<>();
+    if (!populateMetaFields) {
+      this.keyGeneratorOpt = HoodieSparkKeyGeneratorFactory.getKeyGenerator(table.getConfig().getProps());
+    } else {
+      this.keyGeneratorOpt = Option.empty();
+    }
+    this.extractor = RowRecordKeyExtractor.getRowRecordKeyExtractor(populateMetaFields, keyGeneratorOpt);
+    ValidationUtils.checkArgument(table.getMetaClient().getTableType().equals(HoodieTableType.MERGE_ON_READ),
+        "Consistent hash bucket index doesn't support CoW table");
+  }
+
+  private ConsistentBucketIdentifier getBucketIdentifier(String partition) {
+    HoodieSparkConsistentBucketIndex index = (HoodieSparkConsistentBucketIndex) table.getIndex();
+    HoodieConsistentHashingMetadata metadata = ConsistentBucketIndexUtils.loadOrCreateMetadata(this.table, partition, index.getNumBuckets());
+    if (hashingChildrenNodes.containsKey(partition)) {
+      metadata.setChildrenNodes(hashingChildrenNodes.get(partition));
+    }
+    return new ConsistentBucketIdentifier(metadata);
+  }
+
+  @Override
+  public Dataset<Row> repartitionRecords(Dataset<Row> rows, int outputPartitions) {
+    JavaRDD<Row> rowJavaRDD = rows.toJavaRDD();
+    prepareRepartition(rowJavaRDD);
+    Partitioner partitioner = new Partitioner() {
+      @Override
+      public int getPartition(Object key) {
+        return (int) key;
+      }
+
+      @Override
+      public int numPartitions() {
+        return fileIdPfxList.size();
+      }
+    };
+
+    return rows.sparkSession().createDataFrame(rowJavaRDD
+        .mapToPair(row -> new Tuple2<>(getBucketId(row), row))
+        .partitionBy(partitioner)
+        .values(), rows.schema());
+  }
+
+  /**
+   * Prepare consistent hashing metadata for repartition.
+   *
+   * @param rows input records
+   */
+  private void prepareRepartition(JavaRDD<Row> rows) {
+    this.partitionToIdentifier = initializeBucketIdentifier(rows);

[GitHub] [hudi] leesf commented on a diff in pull request #9199: [HUDI-6534]Support consistent hashing row writer

2023-08-03 Thread via GitHub


leesf commented on code in PR #9199:
URL: https://github.com/apache/hudi/pull/9199#discussion_r1283865112


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkConsistentBucketClusteringExecutionStrategy.java:
##
@@ -79,15 +94,19 @@ public HoodieData<WriteStatus> performClusteringWithRecordsRDD(HoodieData<HoodieRecord<T>> inputRecords,
     RDDConsistentBucketBulkInsertPartitioner<T> partitioner = new RDDConsistentBucketBulkInsertPartitioner<>(getHoodieTable(), strategyParams, preserveHoodieMetadata);
+    addHashingChildNodes(partitioner, extraMetadata);
+
+    return (HoodieData<WriteStatus>) SparkBulkInsertHelper.newInstance()
+        .bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig, false, partitioner, true, numOutputGroups);
+  }
+
+  private void addHashingChildNodes(ConsistentHashingBucketInsertPartitioner partitioner, Map<String, String> extraMetadata) {
     try {
       List<ConsistentHashingNode> nodes = ConsistentHashingNode.fromJsonString(extraMetadata.get(BaseConsistentHashingBucketClusteringPlanStrategy.METADATA_CHILD_NODE_KEY));

Review Comment:
   will `nodes` be null here and need a check in `addHashingChildrenNodes`?
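
   A defensive guard along these lines would address it. A sketch only; whether `fromJsonString` can return null when the metadata key is absent is an assumption to verify:

    // Sketch: guard against missing child-node metadata before registering it.
    List<ConsistentHashingNode> nodes = ConsistentHashingNode.fromJsonString(
        extraMetadata.get(BaseConsistentHashingBucketClusteringPlanStrategy.METADATA_CHILD_NODE_KEY));
    if (nodes == null || nodes.isEmpty()) {
      return; // nothing to register for this clustering plan
    }
    // ... then register the nodes with the partitioner as the PR does.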






[GitHub] [hudi] leesf commented on a diff in pull request #9199: [HUDI-6534]Support consistent hashing row writer

2023-07-27 Thread via GitHub


leesf commented on code in PR #9199:
URL: https://github.com/apache/hudi/pull/9199#discussion_r1277121112


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java:
##
@@ -270,4 +273,24 @@ private static boolean recommitMetadataFile(HoodieTable table, FileStatus metaFile
     }
     return false;
   }
+
+  /**
+   * Initialize fileIdPfx for each data partition. Specifically, the following fields are constructed:
+   * - fileIdPfxList: the Nth element corresponds to the Nth data partition, indicating its fileIdPfx
+   * - partitionToFileIdPfxIdxMap (return value): (table partition) -> (fileIdPfx -> idx) mapping
+   *
+   * @param partitionToIdentifier Mapping from table partition to bucket identifier
+   */
+  public static Map<String, Map<String, Integer>> generatePartitionToFileIdPfxIdxMap(Map<String, ConsistentBucketIdentifier> partitionToIdentifier) {

Review Comment:
   can we also add UTs for this method?
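
   A unit test could look roughly like the sketch below. It assumes `HoodieConsistentHashingMetadata` has a `(partitionPath, numBuckets)` constructor and that the produced indexes are globally unique across partitions; both assumptions should be checked against the actual code:

    import static org.junit.jupiter.api.Assertions.assertEquals;

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
    import org.apache.hudi.index.bucket.ConsistentBucketIdentifier;
    import org.apache.hudi.index.bucket.ConsistentBucketIndexUtils;
    import org.junit.jupiter.api.Test;

    public class TestGeneratePartitionToFileIdPfxIdxMap {

      @Test
      public void testTwoPartitionsWithFourBucketsEach() {
        // Two partitions with 4 buckets each (constructor shape is an assumption).
        Map<String, ConsistentBucketIdentifier> partitionToIdentifier = new HashMap<>();
        partitionToIdentifier.put("p1", new ConsistentBucketIdentifier(new HoodieConsistentHashingMetadata("p1", 4)));
        partitionToIdentifier.put("p2", new ConsistentBucketIdentifier(new HoodieConsistentHashingMetadata("p2", 4)));

        Map<String, Map<String, Integer>> result =
            ConsistentBucketIndexUtils.generatePartitionToFileIdPfxIdxMap(partitionToIdentifier);

        // Each partition maps all of its fileIdPfx values to an index.
        assertEquals(2, result.size());
        assertEquals(4, result.get("p1").size());
        assertEquals(4, result.get("p2").size());

        // Indexes should be globally unique: 8 buckets -> 8 distinct indexes.
        long distinctIdx = result.values().stream()
            .flatMap(m -> m.values().stream())
            .distinct()
            .count();
        assertEquals(8, distinctIdx);
      }
    }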






[GitHub] [hudi] leesf commented on a diff in pull request #9199: [HUDI-6534]Support consistent hashing row writer

2023-07-27 Thread via GitHub


leesf commented on code in PR #9199:
URL: https://github.com/apache/hudi/pull/9199#discussion_r1277119817


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java:
##
@@ -117,6 +119,29 @@ public static String getKeyGeneratorClassNameFromType(KeyGeneratorType type) {
 }
   }
 
+  /**
+   * Instantiate {@link BuiltinKeyGenerator}.
+   *
+   * @param properties properties map.
+   * @return the key generator thus instantiated.
+   */
+  public static Option<BuiltinKeyGenerator> getKeyGenerator(Properties properties) {
+    TypedProperties typedProperties = new TypedProperties();
+    typedProperties.putAll(properties);
+    if (Option.ofNullable(properties.get(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key()))
+        .map(v -> v.equals(NonpartitionedKeyGenerator.class.getName())).orElse(false)) {
+      return Option.empty(); // Do not instantiate NonPartitionKeyGen
+    } else {
+      try {
+        return Option.of((BuiltinKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(typedProperties));
+      } catch (ClassCastException cce) {
+        throw new HoodieIOException("Only those key generators implementing BuiltInKeyGenerator interface is supported with virtual keys");

Review Comment:
   should we also throw the `cce` here (as the cause)?
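
   I.e., preserve the original exception as the cause so its stack trace is not lost. A sketch; note that `HoodieIOException` may only accept an `IOException` cause, in which case a type such as `HoodieException` (whose constructors accept a `Throwable`) would be needed:

    try {
      return Option.of((BuiltinKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(typedProperties));
    } catch (ClassCastException cce) {
      // Chain the cause instead of discarding it (assumes a (String, Throwable) constructor).
      throw new HoodieException("Only key generators implementing BuiltinKeyGenerator are supported with virtual keys", cce);
    }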






[GitHub] [hudi] leesf commented on a diff in pull request #9199: [HUDI-6534]Support consistent hashing row writer

2023-07-27 Thread via GitHub


leesf commented on code in PR #9199:
URL: https://github.com/apache/hudi/pull/9199#discussion_r1276987981


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkConsistentBucketClusteringExecutionStrategy.java:
##
@@ -79,15 +94,19 @@ public HoodieData<WriteStatus> performClusteringWithRecordsRDD(HoodieData<HoodieRecord<T>> inputRecords,
     RDDConsistentBucketBulkInsertPartitioner<T> partitioner = new RDDConsistentBucketBulkInsertPartitioner<>(getHoodieTable(), strategyParams, preserveHoodieMetadata);
+    addHashingChildNodes(partitioner, extraMetadata);

Review Comment:
   I am a little confused by the naming: `addHashingChildNodes` here vs `ConsistentHashingBucketInsertPartitioner#addHashingChildrenNodes`.






[GitHub] [hudi] leesf commented on a diff in pull request #9199: [HUDI-6534]Support consistent hashing row writer

2023-07-27 Thread via GitHub


leesf commented on code in PR #9199:
URL: https://github.com/apache/hudi/pull/9199#discussion_r1276987330


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/ConsistentHashingBucketInsertPartitioner.java:
##
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table;
+
+import org.apache.hudi.common.model.ConsistentHashingNode;
+
+import java.util.List;
+
+public interface ConsistentHashingBucketInsertPartitioner {
+  /**
+   * Set consistent hashing for partition, used in clustering
+   *
+   * @param partition partition to set Consistent Hashing nodes

Review Comment:
   `Consistent Hashing` -> `consistent hashing`






[GitHub] [hudi] leesf commented on a diff in pull request #9199: [HUDI-6534]Support consistent hashing row writer

2023-07-27 Thread via GitHub


leesf commented on code in PR #9199:
URL: https://github.com/apache/hudi/pull/9199#discussion_r1276987211


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/util/ConsistentHashingUpdateStrategyUtils.java:
##
@@ -83,7 +83,7 @@ private static void extractHashingMetadataFromClusteringPlan(String instant, Hoo
      ValidationUtils.checkState(p != null, "Clustering plan does not has partition info, plan: " + plan);
      // Skip unrelated clustering group
      if (!recordPartitions.contains(p)) {
-       return;
+       continue;

Review Comment:
would you please clarify why the logic changed here (from `return` to `continue`)?
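
For context, the difference the question is about: inside a loop over clustering groups, `return` aborts metadata extraction for all remaining groups, while `continue` only skips the current, unrelated group. A minimal standalone illustration (hypothetical data, not the actual Hudi code):

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class ReturnVsContinueDemo {
      public static void main(String[] args) {
        List<String> clusteringGroupPartitions = Arrays.asList("p1", "p2", "p3");
        Set<String> recordPartitions = new HashSet<>(Arrays.asList("p1", "p3"));
        for (String p : clusteringGroupPartitions) {
          if (!recordPartitions.contains(p)) {
            // return;  // old behavior: would silently drop p3's metadata too
            continue;   // new behavior: only skips the unrelated p2
          }
          System.out.println("extracting hashing metadata for " + p);
        }
      }
    }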






[GitHub] [hudi] leesf commented on a diff in pull request #9199: [HUDI-6534]Support consistent hashing row writer

2023-07-24 Thread via GitHub


leesf commented on code in PR #9199:
URL: https://github.com/apache/hudi/pull/9199#discussion_r1273032213


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/ConsistentBucketIndexBulkInsertPartitionerWithRows.java:
##
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.common.model.ConsistentHashingNode;
+import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.index.bucket.ConsistentBucketIdentifier;
+import org.apache.hudi.index.bucket.ConsistentBucketIndexUtils;
+import org.apache.hudi.index.bucket.HoodieSparkConsistentBucketIndex;
+import org.apache.hudi.keygen.BuiltinKeyGenerator;
+import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
+import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.hudi.table.ConsistentHashingBucketInsertPartitioner;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import scala.Tuple2;
+
+/**
+ * Bulk_insert partitioner of Spark row using consistent hashing bucket index.
+ */
+public class ConsistentBucketIndexBulkInsertPartitionerWithRows
+    implements BulkInsertPartitioner<Dataset<Row>>, ConsistentHashingBucketInsertPartitioner {
+
+  private final HoodieTable table;
+
+  private final String indexKeyFields;
+
+  private final List<String> fileIdPfxList = new ArrayList<>();
+  private final Map<String, List<ConsistentHashingNode>> hashingChildrenNodes;
+
+  private Map<String, ConsistentBucketIdentifier> partitionToIdentifier;
+
+  private final Option<BuiltinKeyGenerator> keyGeneratorOpt;
+
+  private Map<String, Map<String, Integer>> partitionToFileIdPfxIdxMap;
+
+  private final RowRecordKeyExtractor extractor;
+
+  public ConsistentBucketIndexBulkInsertPartitionerWithRows(HoodieTable table, boolean populateMetaFields) {
+    this.indexKeyFields = table.getConfig().getBucketIndexHashField();
+    this.table = table;
+    this.hashingChildrenNodes = new HashMap<>();
+    if (!populateMetaFields) {
+      this.keyGeneratorOpt = HoodieSparkKeyGeneratorFactory.getKeyGenerator(table.getConfig().getProps());
+    } else {
+      this.keyGeneratorOpt = Option.empty();
+    }
+    this.extractor = RowRecordKeyExtractor.getRowRecordKeyExtractor(populateMetaFields, keyGeneratorOpt);
+    ValidationUtils.checkArgument(table.getMetaClient().getTableType().equals(HoodieTableType.MERGE_ON_READ),

Review Comment:
   is this check by design for the consistent hashing index? if yes, could we move the check to the parent class?


