Thanks. Are there commits on the timeline? if so, then it should return set
of valid data files..
This API is very commonly used across the code, so that's my best guess.
Happy to drill into this more.

On Wed, Apr 24, 2019 at 6:33 PM Jack Wang <[email protected]>
wrote:

> Sorry, I didn't figure out how to use the gist to share code yet, just
> paste the code below. As I mentioned in below email, the line in function
> planDuplicateFix() returns empty file list that results in exception of
> dupeDataSql.
>
> val latestFiles: java.util.List[HoodieDataFile] =
> fsView.getLatestDataFiles().collect(Collectors.toList[HoodieDataFile]())
>
>
> -----------------------------------------------------------------------------------
>   private def planDuplicateFix(): HashMap[String, HashSet[String]] = {
>
>     val tmpTableName = s"htbl_${System.currentTimeMillis()}"
>     val dedupeTblName = s"${tmpTableName}_dupeKeys"
>
>     val metadata = new HoodieTableMetaClient(fs.getConf, basePath)
>
>     val allFiles = fs.listStatus(new
> org.apache.hadoop.fs.Path(s"$basePath/$duplicatedPartitionPath"))
>     LOG.info(s"count of file status ${allFiles.length}")
>     LOG.info(s"jack: $basePath/$duplicatedPartitionPath")
>
>     val fsView = new HoodieTableFileSystemView(metadata,
> metadata.getActiveTimeline.getCommitTimeline.filterCompletedInstants(),
> allFiles)
>     val latestFiles: java.util.List[HoodieDataFile] =
> fsView.getLatestDataFiles().collect(Collectors.toList[HoodieDataFile]())
>     val filteredStatuses = latestFiles.map(f => f.getPath)
>
>     LOG.info(s"file count: ${latestFiles.length}")
>     LOG.info(s" List of files under partition: ${} =>
> ${filteredStatuses.mkString(" ")}")
>
>     val df = sqlContext.parquetFile(filteredStatuses: _*)
>     df.registerTempTable(tmpTableName)
>     val dupeKeyDF = getDupeKeyDF(tmpTableName)
>     dupeKeyDF.registerTempTable(dedupeTblName)
>
>     // Obtain necessary satellite information for duplicate rows
>     val dupeDataSql =
>       s"""
>         SELECT `_hoodie_record_key`, `_hoodie_partition_path`,
> `_hoodie_file_name`, `_hoodie_commit_time`
>         FROM $tmpTableName h
>         JOIN $dedupeTblName d
>         ON h.`_hoodie_record_key` = d.dupe_key
>                       """
>     val dupeMap = sqlContext.sql(dupeDataSql).collectAsList().groupBy(r =>
> r.getString(0))
>     val fileToDeleteKeyMap = new HashMap[String, HashSet[String]]()
>
>
>
>
>
>
>
> On Thu, Apr 25, 2019 at 8:25 AM Vinoth Chandar <[email protected]> wrote:
>
> > Hi Jack,
> >
> > Trying to understand what your goal is. DeDupeSparkJob is used for
> > repairing datasets with repairs.. Is this your intention?
> > Mailing list does not show images. :( Can you please post the code here
> or
> > in a gist? I can take a look.
> >
> > Thanks
> > Vinoth
> >
> > On Wed, Apr 24, 2019 at 3:57 AM Jack Wang <[email protected]>
> > wrote:
> >
> > > Hi forks,
> > >
> > > I have some issue when using Hudi to dedup, seems it doesn't work.
> Below
> > > is the command I tried with Spark:
> > >
> > > spark-submit --master "spark://XXX.XXX.XXX.39:6066" \
> > > --deploy-mode cluster \
> > > --conf spark.sql.warehouse.dir=s3a://vungle2-dataeng/temp/dw/ \
> > > --conf spark.eventLog.enabled=false \
> > > --conf spark.hadoop.fs.s3a.secret.key=XXXXXXXX \
> > > --conf spark.hadoop.fs.s3a.access.key=XXXXXXXX \
> > > --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
> \
> > > --total-executor-cores 6 --executor-memory 1g  \
> > > --jars
> > >
> >
> s3a://vungle2-dataeng/hudi-demo/jack-test/config/hoodie-client-0.4.6-SNAPSHOT.jar,s3a://vungle2-dataeng/hudi-demo/jack-test/config/hoodie-common-0.4.6-SNAPSHOT.jar
> > > --class com.uber.hoodie.cli.commands.SparkMain  \
> > >
> >
> s3a://vungle2-dataeng/hudi-demo/jack-test/config/hoodie-cli-0.4.6-SNAPSHOT.jar
> > > DEDUPLICATE 2019-04-22_07/
> > > s3a://vungle2-dataeng/temp/stage20190422/2019-04-22_07
> > > s3a://vungle2-dataeng/jun-test/stage20190422
> > >
> > > It always throws exception like below, from the error message, I
> checked
> > > the parquet files on partition 2019-04-22_07, and I found there are 43
> > > files, but Hudi CLI cannot enumerate files, and just returns empty,
> this
> > > results in failure on spark sql used to find duplicated record keys.
> > >
> > > Lastly, I located the code which rises the exception:
> > > [image: Screen Shot 2019-04-24 at 6.48.49 PM.png]
> > >
> > > The highlighted line returns empty file list. Could anyone help to
> > explain
> > > the potential reason? Thanks very much.
> > >
> > >
> > > Below is the exception message for diagnose:
> > >
> > >
> >
> =============================================================================
> > > 19/04/24 09:50:11 INFO DedupeSparkJob:  List of files under partition:
> ()
> > > =>
> > > 19/04/24 09:50:11 INFO CoarseGrainedSchedulerBackend$DriverEndpoint:
> > > Registered executor NettyRpcEndpointRef(spark-client://Executor) (
> > > 172.19.100.10:57930) with ID 4
> > > 19/04/24 09:50:11 INFO BlockManagerMasterEndpoint: Registering block
> > > manager ip-172-19-106-60:46877 with 366.3 MB RAM, BlockManagerId(5,
> > > ip-172-19-106-60, 46877, None)
> > > 19/04/24 09:50:11 INFO BlockManagerMasterEndpoint: Registering block
> > > manager ip-172-19-100-10:32772 with 366.3 MB RAM, BlockManagerId(4,
> > > ip-172-19-100-10, 32772, None)
> > > 19/04/24 09:50:11 INFO CoarseGrainedSchedulerBackend$DriverEndpoint:
> > > Registered executor NettyRpcEndpointRef(spark-client://Executor) (
> > > 172.19.104.28:46117) with ID 0
> > > 19/04/24 09:50:11 INFO CoarseGrainedSchedulerBackend$DriverEndpoint:
> > > Registered executor NettyRpcEndpointRef(spark-client://Executor) (
> > > 172.19.102.90:32967) with ID 2
> > > 19/04/24 09:50:11 INFO BlockManagerMasterEndpoint: Registering block
> > > manager ip-172-19-104-28:33312 with 366.3 MB RAM, BlockManagerId(0,
> > > ip-172-19-104-28, 33312, None)
> > > 19/04/24 09:50:11 INFO CoarseGrainedSchedulerBackend$DriverEndpoint:
> > > Registered executor NettyRpcEndpointRef(spark-client://Executor) (
> > > 172.19.111.216:58010) with ID 1
> > > 19/04/24 09:50:11 INFO BlockManagerMasterEndpoint: Registering block
> > > manager ip-172-19-102-90:35450 with 366.3 MB RAM, BlockManagerId(2,
> > > ip-172-19-102-90, 35450, None)
> > > 19/04/24 09:50:11 INFO BlockManagerMasterEndpoint: Registering block
> > > manager ip-172-19-111-216:34525 with 366.3 MB RAM, BlockManagerId(1,
> > > ip-172-19-111-216, 34525, None)
> > > 19/04/24 09:50:12 INFO SharedState: Setting
> hive.metastore.warehouse.dir
> > > ('null') to the value of spark.sql.warehouse.dir ('s3
> > > a://vungle2-dataeng/temp/dw/').
> > > 19/04/24 09:50:12 INFO SharedState: Warehouse path is 's3
> > > a://vungle2-dataeng/temp/dw/'.
> > > 19/04/24 09:50:13 INFO StateStoreCoordinatorRef: Registered
> > > StateStoreCoordinator endpoint
> > > Exception in thread "main" java.lang.reflect.InvocationTargetException
> > >         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > >         at
> > >
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> > >         at
> > >
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > >         at java.lang.reflect.Method.invoke(Method.java:498)
> > >         at
> > >
> >
> org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:65)
> > >         at
> > > org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
> > > Caused by: org.apache.spark.sql.AnalysisException: cannot resolve
> > > '`_hoodie_record_key`' given input columns: []; line 5 pos 15;
> > > 'Filter ('dupe_cnt > 1)
> > > +- 'Aggregate ['_hoodie_record_key], ['_hoodie_record_key AS
> dupe_key#0,
> > > count(1) AS dupe_cnt#1L]
> > >    +- SubqueryAlias htbl_1556099410774
> > >       +- LogicalRDD false
> > >
> > >         at
> > >
> >
> org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
> > >         at
> > >
> >
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:88)
> > >         at
> > >
> >
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:85)
> > >         at
> > >
> >
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
> > >         at
> > >
> >
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
> > >         at
> > >
> >
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
> > >         at
> > >
> >
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
> > >         at
> > >
> >
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
> > >         at
> > >
> >
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
> > >         at
> > >
> >
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:107)
> > >         at
> > >
> >
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:107)
> > >         at
> > >
> >
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
> > >         at
> > >
> >
> org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:106)
> > >         at
> > > org.apache.spark.sql.catalyst.plans.QueryPlan.org
> >
> $apache$spark$sql$catalyst$plans$QueryPlan$recursiveTransform$1(QueryPlan.scala:118
> > > )
> > >         at
> > >
> >
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$1.apply(QueryPlan.scala:122)
> > >         at
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> > >         at
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> > >         at
> > >
> >
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> > >         at
> > > scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> > >         at
> > > scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> > >         at
> > scala.collection.AbstractTraversable.map(Traversable.scala:104)
> > >         at
> > > org.apache.spark.sql.catalyst.plans.QueryPlan.org
> >
> $apache$spark$sql$catalyst$plans$QueryPlan$recursiveTransform$1(QueryPlan.scala:122
> > > )
> > >         at
> > >
> >
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:127)
> > >         at
> > >
> >
> org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
> > >         at
> > >
> >
> org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:127)
> > >         at
> > >
> >
> org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:95)
> > >         at
> > >
> >
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:85)
> > >         at
> > >
> >
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:80)
> > >         at
> > >
> >
> org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
> > >         at
> > >
> >
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
> > >         at
> > >
> >
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
> > >         at scala.collection.immutable.List.foreach(List.scala:381)
> > >         at
> > >
> >
> org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
> > >         at
> > >
> >
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:80)
> > >         at
> > >
> >
> org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:92)
> > >         at
> > >
> >
> org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
> > >         at
> > >
> >
> org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
> > >         at
> > >
> >
> org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
> > >         at
> > >
> >
> org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
> > >         at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
> > >         at
> org.apache.spark.sql.SparkSession.sql(SparkSession.scala:641)
> > >         at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:694)
> > >         at
> > >
> com.uber.hoodie.cli.DedupeSparkJob.getDupeKeyDF(DedupeSparkJob.scala:62)
> > >         at
> > >
> >
> com.uber.hoodie.cli.DedupeSparkJob.planDuplicateFix(DedupeSparkJob.scala:93)
> > >         at
> > >
> >
> com.uber.hoodie.cli.DedupeSparkJob.fixDuplicates(DedupeSparkJob.scala:142)
> > >         at
> > >
> >
> com.uber.hoodie.cli.commands.SparkMain.deduplicatePartitionPath(SparkMain.java:217
> > > )
> > >         at
> com.uber.hoodie.cli.commands.SparkMain.main(SparkMain.java:62)
> > >         ... 6 more
> > >
> > > --
> > > [image: vshapesaqua11553186012.gif] <https://vungle.com/>   *Jianbin
> > Wang*
> > > Sr. Engineer II, Data
> > > +86 18633600964
> > >
> > > [image: in1552694272.png] <https://www.linkedin.com/company/vungle>
> > [image:
> > > fb1552694203.png] <https://facebook.com/vungle>      [image:
> > > tw1552694330.png] <https://twitter.com/vungle>      [image:
> > > ig1552694392.png] <https://www.instagram.com/vungle>
> > > Units 3801, 3804, 38F, C Block, Beijing Yintai Center, Beijing, China
> > >
> > >
> >
>
>
> --
> [image: vshapesaqua11553186012.gif] <https://vungle.com/>   *Jianbin Wang*
> Sr. Engineer II, Data
> +86 18633600964
>
> [image: in1552694272.png] <https://www.linkedin.com/company/vungle>
> [image:
> fb1552694203.png] <https://facebook.com/vungle>      [image:
> tw1552694330.png] <https://twitter.com/vungle>      [image:
> ig1552694392.png] <https://www.instagram.com/vungle>
> Units 3801, 3804, 38F, C Block, Beijing Yintai Center, Beijing, China
>

Reply via email to