[ 
https://issues.apache.org/jira/browse/HUDI-1104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17375320#comment-17375320
 ] 

ASF GitHub Bot commented on HUDI-1104:
--------------------------------------

vinothchandar commented on a change in pull request #3149:
URL: https://github.com/apache/hudi/pull/3149#discussion_r664297381



##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerWithRowsFactory.java
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.table.BulkInsertPartitioner;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+/**
+ * A factory to generate built-in partitioner to repartition input Rows into 
at least
+ * expected number of output spark partitions for bulk insert operation.
+ */
+public abstract class BulkInsertInternalPartitionerWithRowsFactory {
+
+  public static BulkInsertPartitioner<Dataset<Row>> get(BulkInsertSortMode 
sortMode) {
+    switch (sortMode) {
+      case NONE:
+        return new NonSortPartitionerWithRows();
+      case GLOBAL_SORT:
+        return new GlobalSortPartitionerWithRows();
+      case PARTITION_SORT:
+        return new RDDPartitionSortPartitionerWithRows();
+      default:
+        throw new HoodieException("The bulk insert sort mode \"" + 
sortMode.name() + "\" is not supported.");

Review comment:
       throw an unsupported exception?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -116,6 +116,8 @@
   public static final String BULKINSERT_SORT_MODE = 
"hoodie.bulkinsert.sort.mode";
   public static final String DEFAULT_BULKINSERT_SORT_MODE = 
BulkInsertSortMode.GLOBAL_SORT
       .toString();
+  public static final String BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED = 
"hoodie.bulkinsert.are.partitioner.records.sorted";

Review comment:
       this name needs some work.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -116,6 +116,8 @@
   public static final String BULKINSERT_SORT_MODE = 
"hoodie.bulkinsert.sort.mode";
   public static final String DEFAULT_BULKINSERT_SORT_MODE = 
BulkInsertSortMode.GLOBAL_SORT
       .toString();
+  public static final String BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED = 
"hoodie.bulkinsert.are.partitioner.records.sorted";
+  public static final String DEFAULT_BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED 
= "false";

Review comment:
       configs need to be reworked based on new framework

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerWithRowsFactory.java
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.table.BulkInsertPartitioner;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+/**
+ * A factory to generate built-in partitioner to repartition input Rows into 
at least
+ * expected number of output spark partitions for bulk insert operation.
+ */
+public abstract class BulkInsertInternalPartitionerWithRowsFactory {
+
+  public static BulkInsertPartitioner<Dataset<Row>> get(BulkInsertSortMode 
sortMode) {
+    switch (sortMode) {
+      case NONE:
+        return new NonSortPartitionerWithRows();
+      case GLOBAL_SORT:
+        return new GlobalSortPartitionerWithRows();
+      case PARTITION_SORT:
+        return new RDDPartitionSortPartitionerWithRows();

Review comment:
       why is it called RDDxxxx ?

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -338,7 +339,13 @@ object HoodieSparkSqlWriter {
     }
     val params = parameters.updated(HoodieWriteConfig.AVRO_SCHEMA, 
schema.toString)
     val writeConfig = DataSourceUtils.createHoodieConfig(schema.toString, 
path.get, tblName, mapAsJavaMap(params))
-    val hoodieDF = 
HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, 
writeConfig, df, structName, nameSpace)
+    val userDefinedBulkInsertPartitionerOpt = 
DataSourceUtils.createUserDefinedBulkInsertPartitionerWithRows(writeConfig)
+    val bulkInsertPartitionerRows : BulkInsertPartitioner[Dataset[Row]] = if 
(userDefinedBulkInsertPartitionerOpt.isPresent) 
userDefinedBulkInsertPartitionerOpt.get

Review comment:
       indent the if-else block?

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerWithRowsFactory.java
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.table.BulkInsertPartitioner;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+/**
+ * A factory to generate built-in partitioner to repartition input Rows into 
at least
+ * expected number of output spark partitions for bulk insert operation.
+ */
+public abstract class BulkInsertInternalPartitionerWithRowsFactory {

Review comment:
       drop `Internal` from the name?

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDPartitionSortPartitionerWithRows.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.table.BulkInsertPartitioner;
+
+import org.apache.spark.api.java.function.MapPartitionsFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer$;
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
+import org.apache.spark.sql.catalyst.encoders.RowEncoder;
+import org.apache.spark.sql.catalyst.expressions.Attribute;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import scala.collection.JavaConversions;
+import scala.collection.JavaConverters;
+
+/**
+ * A built-in partitioner that does local sorting for each spark partitions 
after coalesce for bulk insert operation, corresponding to the {@code 
BulkInsertSortMode.PARTITION_SORT} mode.
+ */
+public class RDDPartitionSortPartitionerWithRows implements 
BulkInsertPartitioner<Dataset<Row>> {
+
+  @Override
+  public Dataset<Row> repartitionRecords(Dataset<Row> rows, int 
outputSparkPartitions) {
+    ExpressionEncoder encoder = getEncoder(rows.schema());
+    return 
rows.coalesce(outputSparkPartitions).mapPartitions((MapPartitionsFunction<Row, 
Row>) input -> {
+      // Sort locally in partition

Review comment:
       This is a large OOM risk. Spark2.4 and 3 have 
https://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/sql/Dataset.html
 `sortWithinPartitions()` , can we leverage that

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -360,7 +367,7 @@ object HoodieSparkSqlWriter {
     val syncHiveSuccess =
       if (hiveSyncEnabled || metaSyncEnabled) {
         metaSync(sqlContext.sparkSession, parameters, basePath, df.schema)
-    } else {
+      } else {

Review comment:
       why this change?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -116,6 +116,8 @@
   public static final String BULKINSERT_SORT_MODE = 
"hoodie.bulkinsert.sort.mode";
   public static final String DEFAULT_BULKINSERT_SORT_MODE = 
BulkInsertSortMode.GLOBAL_SORT
       .toString();
+  public static final String BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED = 
"hoodie.bulkinsert.are.partitioner.records.sorted";

Review comment:
       also seems like you are deriving this value in HoodieSparkSqlWriter? I 
would like to avoid this new config if possible. can you clarify why we need 
this. it was not very clear to me 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Bulk insert Dataset - UserDefinedPartitioner
> --------------------------------------------
>
>                 Key: HUDI-1104
>                 URL: https://issues.apache.org/jira/browse/HUDI-1104
>             Project: Apache Hudi
>          Issue Type: Sub-task
>          Components: Writer Core
>            Reporter: sivabalan narayanan
>            Assignee: sivabalan narayanan
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 0.9.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to