[ https://issues.apache.org/jira/browse/HUDI-5678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexey Kudinkin updated HUDI-5678:
----------------------------------
    Priority: Blocker  (was: Major)

> deduceShuffleParallelism Returns 0 when that should never happen
> ----------------------------------------------------------------
>
>                 Key: HUDI-5678
>                 URL: https://issues.apache.org/jira/browse/HUDI-5678
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: Jonathan Vexler
>            Assignee: Alexey Kudinkin
>            Priority: Blocker
>              Labels: pull-request-available
>         Attachments: image (1).png
>
>
> This test
> {code:scala}
> forAll(BulkInsertSortMode.values().toList) { (sortMode: BulkInsertSortMode) =>
>   val sortModeName = sortMode.name()
>   test(s"Test Bulk Insert with BulkInsertSortMode: '$sortModeName'") {
>     withTempDir { basePath =>
>       testBulkInsertPartitioner(basePath, sortModeName)
>     }
>   }
> }
> 
> def testBulkInsertPartitioner(basePath: File, sortModeName: String): Unit = {
>   val tableName = generateTableName
>   // Remove these with [HUDI-5419]
>   spark.sessionState.conf.unsetConf("hoodie.datasource.write.operation")
>   spark.sessionState.conf.unsetConf("hoodie.datasource.write.insert.drop.duplicates")
>   spark.sessionState.conf.unsetConf("hoodie.merge.allow.duplicate.on.inserts")
>   spark.sessionState.conf.unsetConf("hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled")
>   // Default parallelism is 200, which means that with global sort each record
>   // ends up in a different Spark partition, so 9 files would be created.
>   // Setting parallelism to 3 so that each Spark partition will contain a Hudi partition.
>   val parallelism = if (sortModeName.equals(BulkInsertSortMode.GLOBAL_SORT.name())) {
>     "hoodie.bulkinsert.shuffle.parallelism = 3,"
>   } else {
>     ""
>   }
>   spark.sql(
>     s"""
>        |create table $tableName (
>        |  id int,
>        |  name string,
>        |  price double,
>        |  dt string
>        |) using hudi
>        | tblproperties (
>        |  primaryKey = 'id',
>        |  preCombineField = 'name',
>        |  type = 'cow',
>        |  $parallelism
>        |  hoodie.bulkinsert.sort.mode = '$sortModeName'
>        | )
>        | partitioned by (dt)
>        | location '${basePath.getCanonicalPath}/$tableName'
>        """.stripMargin)
>   spark.sql("set hoodie.sql.bulk.insert.enable = true")
>   spark.sql("set hoodie.sql.insert.mode = non-strict")
>   spark.sql(
>     s"""insert into $tableName values
>        |(5, 'a', 35, '2021-05-21'),
>        |(1, 'a', 31, '2021-01-21'),
>        |(3, 'a', 33, '2021-03-21'),
>        |(4, 'b', 16, '2021-05-21'),
>        |(2, 'b', 18, '2021-01-21'),
>        |(6, 'b', 17, '2021-03-21'),
>        |(8, 'a', 21, '2021-05-21'),
>        |(9, 'a', 22, '2021-01-21'),
>        |(7, 'a', 23, '2021-03-21')
>        |""".stripMargin)
>   assertResult(3)(spark.sql(s"select distinct _hoodie_file_name from $tableName").count())
> }
> {code}
> fails due to:
> {code:java}
> requirement failed: Number of partitions (0) must be positive.
> java.lang.IllegalArgumentException: requirement failed: Number of partitions (0) must be positive.
>       at scala.Predef$.require(Predef.scala:224)
>       at org.apache.spark.sql.catalyst.plans.logical.Repartition.<init>(basicLogicalOperators.scala:951)
>       at org.apache.spark.sql.Dataset.coalesce(Dataset.scala:2946)
>       at org.apache.hudi.execution.bulkinsert.PartitionSortPartitionerWithRows.repartitionRecords(PartitionSortPartitionerWithRows.java:48)
>       at org.apache.hudi.execution.bulkinsert.PartitionSortPartitionerWithRows.repartitionRecords(PartitionSortPartitionerWithRows.java:34)
>       at org.apache.hudi.HoodieDatasetBulkInsertHelper$.prepareForBulkInsert(HoodieDatasetBulkInsertHelper.scala:124)
>       at org.apache.hudi.HoodieSparkSqlWriter$.bulkInsertAsRow(HoodieSparkSqlWriter.scala:763)
>       at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:239)
>       at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand$.run(InsertIntoHoodieTableCommand.scala:107)
>       at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand.run(InsertIntoHoodieTableCommand.scala:60)
>       at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
>       at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
>       at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
>       at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:194)
>       at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:194)
>       at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3369)
>       at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
>       at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
>       at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
>       at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withAction(Dataset.scala:3368)
>       at org.apache.spark.sql.Dataset.<init>(Dataset.scala:194)
>       at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:79)
>       at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:643)
>       at org.apache.spark.sql.hudi.TestInsertTable.testBulkInsertPartitioner(TestInsertTable.scala:1204)
> {code}
> !image (1).png!
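> Per the stack trace, the immediate trigger is {{PartitionSortPartitionerWithRows#repartitionRecords}} passing the deduced parallelism straight into {{Dataset.coalesce}}, and Spark's {{Repartition}} operator rejects any non-positive partition count. Below is a minimal sketch of the failure mode plus a defensive clamp; the {{repartitionRecords}} signature here is illustrative only, and the clamp is an assumption about one possible guard, not the agreed fix:
> {code:scala}
> import org.apache.spark.sql.{Dataset, Row}
> 
> // Spark enforces a positive partition count at plan construction time, so
> // this one-liner reproduces the same error seen in the trace:
> //   spark.range(1).coalesce(0)
> //   => java.lang.IllegalArgumentException: requirement failed:
> //      Number of partitions (0) must be positive.
> 
> // Hypothetical guard (illustration only, not the actual Hudi fix): clamp the
> // deduced parallelism to at least 1 before handing it to coalesce.
> def repartitionRecords(rows: Dataset[Row], deducedParallelism: Int): Dataset[Row] = {
>   val parallelism = math.max(1, deducedParallelism) // never pass 0 downstream
>   rows.coalesce(parallelism)
> }
> {code}
> Whether to clamp at the partitioner or to fix the deduction upstream so {{deduceShuffleParallelism}} can never return 0 is the real question for the fix; the sketch only pins down where the requirement is enforced.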



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
