Sorry, I haven't figured out how to share the code via a gist yet, so I have
pasted it below. As I mentioned in the email below, the following line in
planDuplicateFix() returns an empty file list, which then causes the exception
when dupeDataSql is executed.

val latestFiles: java.util.List[HoodieDataFile] =
  fsView.getLatestDataFiles().collect(Collectors.toList[HoodieDataFile]())

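In case it helps with the diagnosis, below is a quick check I intend to run
from inside DedupeSparkJob (it reuses the fs, basePath, duplicatedPartitionPath
and LOG fields already used in the method). It compares the completed commit
instants on the timeline with the commit times encoded in the parquet file
names under the partition, since the file system view is built only from
completed instants. This is just a sketch: I am assuming getInstants() /
getTimestamp() on the timeline and FSUtils.getCommitTime() are available in
this 0.4.6 build, so the exact names may need adjusting.

    import scala.collection.JavaConverters._
    import com.uber.hoodie.common.util.FSUtils

    // Sketch only: list completed commit instants and check whether each parquet
    // file's commit time (parsed from its file name) is part of that timeline.
    val metadata = new HoodieTableMetaClient(fs.getConf, basePath)
    val completedCommits = metadata.getActiveTimeline.getCommitTimeline
      .filterCompletedInstants().getInstants.iterator().asScala
      .map(_.getTimestamp).toSet
    LOG.info(s"Completed commit instants: ${completedCommits.mkString(", ")}")

    fs.listStatus(new org.apache.hadoop.fs.Path(s"$basePath/$duplicatedPartitionPath"))
      .filter(_.getPath.getName.endsWith(".parquet"))
      .foreach { st =>
        // getCommitTime() is assumed to parse the commit time out of the data file name
        val commitTime = FSUtils.getCommitTime(st.getPath.getName)
        LOG.info(s"${st.getPath.getName} -> commit $commitTime, " +
          s"in completed timeline: ${completedCommits.contains(commitTime)}")
      }
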
-----------------------------------------------------------------------------------
  private def planDuplicateFix(): HashMap[String, HashSet[String]] = {

    val tmpTableName = s"htbl_${System.currentTimeMillis()}"
    val dedupeTblName = s"${tmpTableName}_dupeKeys"

    val metadata = new HoodieTableMetaClient(fs.getConf, basePath)

    val allFiles = fs.listStatus(new org.apache.hadoop.fs.Path(s"$basePath/$duplicatedPartitionPath"))
    LOG.info(s"count of file status ${allFiles.length}")
    LOG.info(s"jack: $basePath/$duplicatedPartitionPath")

    val fsView = new HoodieTableFileSystemView(metadata,
      metadata.getActiveTimeline.getCommitTimeline.filterCompletedInstants(), allFiles)
    val latestFiles: java.util.List[HoodieDataFile] =
      fsView.getLatestDataFiles().collect(Collectors.toList[HoodieDataFile]())
    val filteredStatuses = latestFiles.map(f => f.getPath)

    LOG.info(s"file count: ${latestFiles.length}")
    LOG.info(s" List of files under partition: ${} => ${filteredStatuses.mkString(" ")}")

    val df = sqlContext.parquetFile(filteredStatuses: _*)
    df.registerTempTable(tmpTableName)
    val dupeKeyDF = getDupeKeyDF(tmpTableName)
    dupeKeyDF.registerTempTable(dedupeTblName)

    // Obtain necessary satellite information for duplicate rows
    val dupeDataSql =
      s"""
        SELECT `_hoodie_record_key`, `_hoodie_partition_path`, `_hoodie_file_name`, `_hoodie_commit_time`
        FROM $tmpTableName h
        JOIN $dedupeTblName d
        ON h.`_hoodie_record_key` = d.dupe_key
      """
    val dupeMap = sqlContext.sql(dupeDataSql).collectAsList().groupBy(r => r.getString(0))
    val fileToDeleteKeyMap = new HashMap[String, HashSet[String]]()
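
For what it is worth, the failure mode is consistent with the file list being
empty: the temp table registered from the resulting DataFrame apparently has
no columns, which matches the "given input columns: []" in the analysis error
further below. A defensive check along these lines (again just a sketch; the
exception type and message are my own) would surface the real problem earlier:

    if (filteredStatuses.isEmpty) {
      // Sketch: fail fast with a clear message instead of letting the later SQL
      // analysis fail with "cannot resolve '_hoodie_record_key' given input columns: []"
      throw new IllegalStateException(
        s"No latest data files found under $basePath/$duplicatedPartitionPath; " +
          "check that the partition's files belong to completed commits on the timeline.")
    }
    val df = sqlContext.parquetFile(filteredStatuses: _*)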







On Thu, Apr 25, 2019 at 8:25 AM Vinoth Chandar <[email protected]> wrote:

> Hi Jack,
>
> Trying to understand what your goal is. DedupeSparkJob is used for
> repairing datasets that contain duplicates. Is this your intention?
> The mailing list does not show images. :( Can you please post the code here
> or in a gist? I can take a look.
>
> Thanks
> Vinoth
>
> On Wed, Apr 24, 2019 at 3:57 AM Jack Wang <[email protected]>
> wrote:
>
> > Hi folks,
> >
> > I have an issue when using Hudi to dedupe; it doesn't seem to work. Below
> > is the command I tried with Spark:
> >
> > spark-submit --master "spark://XXX.XXX.XXX.39:6066" \
> >   --deploy-mode cluster \
> >   --conf spark.sql.warehouse.dir=s3a://vungle2-dataeng/temp/dw/ \
> >   --conf spark.eventLog.enabled=false \
> >   --conf spark.hadoop.fs.s3a.secret.key=XXXXXXXX \
> >   --conf spark.hadoop.fs.s3a.access.key=XXXXXXXX \
> >   --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
> >   --total-executor-cores 6 --executor-memory 1g \
> >   --jars s3a://vungle2-dataeng/hudi-demo/jack-test/config/hoodie-client-0.4.6-SNAPSHOT.jar,s3a://vungle2-dataeng/hudi-demo/jack-test/config/hoodie-common-0.4.6-SNAPSHOT.jar \
> >   --class com.uber.hoodie.cli.commands.SparkMain \
> >   s3a://vungle2-dataeng/hudi-demo/jack-test/config/hoodie-cli-0.4.6-SNAPSHOT.jar \
> >   DEDUPLICATE 2019-04-22_07/ \
> >   s3a://vungle2-dataeng/temp/stage20190422/2019-04-22_07 \
> >   s3a://vungle2-dataeng/jun-test/stage20190422
> >
> > It always throws an exception like the one below. Based on the error
> > message, I checked the parquet files in partition 2019-04-22_07 and found
> > 43 files there, but the Hudi CLI cannot enumerate them and just returns an
> > empty list, which causes the Spark SQL used to find duplicated record keys
> > to fail.
> >
> > Lastly, I located the code which raises the exception:
> > [image: Screen Shot 2019-04-24 at 6.48.49 PM.png]
> >
> > The highlighted line returns an empty file list. Could anyone help explain
> > the potential reason? Thanks very much.
> >
> >
> > Below is the exception message for diagnosis:
> >
> >
> =============================================================================
> > 19/04/24 09:50:11 INFO DedupeSparkJob:  List of files under partition: ()
> > =>
> > 19/04/24 09:50:11 INFO CoarseGrainedSchedulerBackend$DriverEndpoint:
> > Registered executor NettyRpcEndpointRef(spark-client://Executor) (
> > 172.19.100.10:57930) with ID 4
> > 19/04/24 09:50:11 INFO BlockManagerMasterEndpoint: Registering block
> > manager ip-172-19-106-60:46877 with 366.3 MB RAM, BlockManagerId(5,
> > ip-172-19-106-60, 46877, None)
> > 19/04/24 09:50:11 INFO BlockManagerMasterEndpoint: Registering block
> > manager ip-172-19-100-10:32772 with 366.3 MB RAM, BlockManagerId(4,
> > ip-172-19-100-10, 32772, None)
> > 19/04/24 09:50:11 INFO CoarseGrainedSchedulerBackend$DriverEndpoint:
> > Registered executor NettyRpcEndpointRef(spark-client://Executor) (
> > 172.19.104.28:46117) with ID 0
> > 19/04/24 09:50:11 INFO CoarseGrainedSchedulerBackend$DriverEndpoint:
> > Registered executor NettyRpcEndpointRef(spark-client://Executor) (
> > 172.19.102.90:32967) with ID 2
> > 19/04/24 09:50:11 INFO BlockManagerMasterEndpoint: Registering block
> > manager ip-172-19-104-28:33312 with 366.3 MB RAM, BlockManagerId(0,
> > ip-172-19-104-28, 33312, None)
> > 19/04/24 09:50:11 INFO CoarseGrainedSchedulerBackend$DriverEndpoint:
> > Registered executor NettyRpcEndpointRef(spark-client://Executor) (
> > 172.19.111.216:58010) with ID 1
> > 19/04/24 09:50:11 INFO BlockManagerMasterEndpoint: Registering block
> > manager ip-172-19-102-90:35450 with 366.3 MB RAM, BlockManagerId(2,
> > ip-172-19-102-90, 35450, None)
> > 19/04/24 09:50:11 INFO BlockManagerMasterEndpoint: Registering block
> > manager ip-172-19-111-216:34525 with 366.3 MB RAM, BlockManagerId(1,
> > ip-172-19-111-216, 34525, None)
> > 19/04/24 09:50:12 INFO SharedState: Setting hive.metastore.warehouse.dir
> > ('null') to the value of spark.sql.warehouse.dir
> > ('s3a://vungle2-dataeng/temp/dw/').
> > 19/04/24 09:50:12 INFO SharedState: Warehouse path is
> > 's3a://vungle2-dataeng/temp/dw/'.
> > 19/04/24 09:50:13 INFO StateStoreCoordinatorRef: Registered
> > StateStoreCoordinator endpoint
> > Exception in thread "main" java.lang.reflect.InvocationTargetException
> >         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >         at
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >         at
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >         at java.lang.reflect.Method.invoke(Method.java:498)
> >         at
> >
> org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:65)
> >         at
> > org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
> > Caused by: org.apache.spark.sql.AnalysisException: cannot resolve
> > '`_hoodie_record_key`' given input columns: []; line 5 pos 15;
> > 'Filter ('dupe_cnt > 1)
> > +- 'Aggregate ['_hoodie_record_key], ['_hoodie_record_key AS dupe_key#0,
> > count(1) AS dupe_cnt#1L]
> >    +- SubqueryAlias htbl_1556099410774
> >       +- LogicalRDD false
> >
> >         at
> >
> org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
> >         at
> >
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:88)
> >         at
> >
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:85)
> >         at
> >
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
> >         at
> >
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
> >         at
> >
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
> >         at
> >
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
> >         at
> >
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
> >         at
> >
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
> >         at
> >
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:107)
> >         at
> >
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:107)
> >         at
> >
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
> >         at
> >
> org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:106)
> >         at
> > org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$recursiveTransform$1(QueryPlan.scala:118)
> >         at
> >
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$1.apply(QueryPlan.scala:122)
> >         at
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> >         at
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> >         at
> >
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> >         at
> > scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> >         at
> > scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> >         at
> scala.collection.AbstractTraversable.map(Traversable.scala:104)
> >         at
> > org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$recursiveTransform$1(QueryPlan.scala:122)
> >         at
> >
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:127)
> >         at
> >
> org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
> >         at
> >
> org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:127)
> >         at
> >
> org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:95)
> >         at
> >
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:85)
> >         at
> >
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:80)
> >         at
> >
> org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
> >         at
> >
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
> >         at
> >
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
> >         at scala.collection.immutable.List.foreach(List.scala:381)
> >         at
> >
> org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
> >         at
> >
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:80)
> >         at
> >
> org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:92)
> >         at
> >
> org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
> >         at
> >
> org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
> >         at
> >
> org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
> >         at
> >
> org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
> >         at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
> >         at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:641)
> >         at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:694)
> >         at
> > com.uber.hoodie.cli.DedupeSparkJob.getDupeKeyDF(DedupeSparkJob.scala:62)
> >         at
> >
> com.uber.hoodie.cli.DedupeSparkJob.planDuplicateFix(DedupeSparkJob.scala:93)
> >         at
> >
> com.uber.hoodie.cli.DedupeSparkJob.fixDuplicates(DedupeSparkJob.scala:142)
> >         at
> > com.uber.hoodie.cli.commands.SparkMain.deduplicatePartitionPath(SparkMain.java:217)
> >         at com.uber.hoodie.cli.commands.SparkMain.main(SparkMain.java:62)
> >         ... 6 more
> >


-- 
*Jianbin Wang*
Sr. Engineer II, Data
+86 18633600964
Units 3801, 3804, 38F, C Block, Beijing Yintai Center, Beijing, China
