Hi Jack,

Trying to understand what your goal is. DedupeSparkJob is used for
repairing datasets that contain duplicates. Is this your intention?
The mailing list does not show images. :( Can you please post the code here
or in a gist? I can take a look.

Thanks
Vinoth

On Wed, Apr 24, 2019 at 3:57 AM Jack Wang <[email protected]>
wrote:

> Hi folks,
>
> I have an issue when using Hudi to deduplicate a partition; it does not seem
> to work. Below is the command I ran with Spark:
>
> spark-submit --master "spark://XXX.XXX.XXX.39:6066" \
> --deploy-mode cluster \
> --conf spark.sql.warehouse.dir=s3a://vungle2-dataeng/temp/dw/ \
> --conf spark.eventLog.enabled=false \
> --conf spark.hadoop.fs.s3a.secret.key=XXXXXXXX \
> --conf spark.hadoop.fs.s3a.access.key=XXXXXXXX \
> --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
> --total-executor-cores 6 --executor-memory 1g  \
> --jars s3a://vungle2-dataeng/hudi-demo/jack-test/config/hoodie-client-0.4.6-SNAPSHOT.jar,s3a://vungle2-dataeng/hudi-demo/jack-test/config/hoodie-common-0.4.6-SNAPSHOT.jar \
> --class com.uber.hoodie.cli.commands.SparkMain \
> s3a://vungle2-dataeng/hudi-demo/jack-test/config/hoodie-cli-0.4.6-SNAPSHOT.jar \
> DEDUPLICATE 2019-04-22_07/ \
> s3a://vungle2-dataeng/temp/stage20190422/2019-04-22_07 \
> s3a://vungle2-dataeng/jun-test/stage20190422
>
> It always throws the exception below. Based on the error message, I checked
> the parquet files in partition 2019-04-22_07 and found 43 files there, but
> the Hudi CLI fails to enumerate them and returns an empty list. As a result,
> the Spark SQL query used to find duplicated record keys fails.
>
> Lastly, I located the code which raises the exception:
> [image: Screen Shot 2019-04-24 at 6.48.49 PM.png]
>
> The highlighted line returns an empty file list. Could anyone help explain
> the potential reason? Thanks very much.
>
>
> Below is the exception message for diagnosis:
>
> =============================================================================
> 19/04/24 09:50:11 INFO DedupeSparkJob:  List of files under partition: () =>
> 19/04/24 09:50:11 INFO CoarseGrainedSchedulerBackend$DriverEndpoint:
> Registered executor NettyRpcEndpointRef(spark-client://Executor) (
> 172.19.100.10:57930) with ID 4
> 19/04/24 09:50:11 INFO BlockManagerMasterEndpoint: Registering block
> manager ip-172-19-106-60:46877 with 366.3 MB RAM, BlockManagerId(5,
> ip-172-19-106-60, 46877, None)
> 19/04/24 09:50:11 INFO BlockManagerMasterEndpoint: Registering block
> manager ip-172-19-100-10:32772 with 366.3 MB RAM, BlockManagerId(4,
> ip-172-19-100-10, 32772, None)
> 19/04/24 09:50:11 INFO CoarseGrainedSchedulerBackend$DriverEndpoint:
> Registered executor NettyRpcEndpointRef(spark-client://Executor) (
> 172.19.104.28:46117) with ID 0
> 19/04/24 09:50:11 INFO CoarseGrainedSchedulerBackend$DriverEndpoint:
> Registered executor NettyRpcEndpointRef(spark-client://Executor) (
> 172.19.102.90:32967) with ID 2
> 19/04/24 09:50:11 INFO BlockManagerMasterEndpoint: Registering block
> manager ip-172-19-104-28:33312 with 366.3 MB RAM, BlockManagerId(0,
> ip-172-19-104-28, 33312, None)
> 19/04/24 09:50:11 INFO CoarseGrainedSchedulerBackend$DriverEndpoint:
> Registered executor NettyRpcEndpointRef(spark-client://Executor) (
> 172.19.111.216:58010) with ID 1
> 19/04/24 09:50:11 INFO BlockManagerMasterEndpoint: Registering block
> manager ip-172-19-102-90:35450 with 366.3 MB RAM, BlockManagerId(2,
> ip-172-19-102-90, 35450, None)
> 19/04/24 09:50:11 INFO BlockManagerMasterEndpoint: Registering block
> manager ip-172-19-111-216:34525 with 366.3 MB RAM, BlockManagerId(1,
> ip-172-19-111-216, 34525, None)
> 19/04/24 09:50:12 INFO SharedState: Setting hive.metastore.warehouse.dir
> ('null') to the value of spark.sql.warehouse.dir
> ('s3a://vungle2-dataeng/temp/dw/').
> 19/04/24 09:50:12 INFO SharedState: Warehouse path is
> 's3a://vungle2-dataeng/temp/dw/'.
> 19/04/24 09:50:13 INFO StateStoreCoordinatorRef: Registered
> StateStoreCoordinator endpoint
> Exception in thread "main" java.lang.reflect.InvocationTargetException
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at
> org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:65)
>         at
> org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
> Caused by: org.apache.spark.sql.AnalysisException: cannot resolve
> '`_hoodie_record_key`' given input columns: []; line 5 pos 15;
> 'Filter ('dupe_cnt > 1)
> +- 'Aggregate ['_hoodie_record_key], ['_hoodie_record_key AS dupe_key#0,
> count(1) AS dupe_cnt#1L]
>    +- SubqueryAlias htbl_1556099410774
>       +- LogicalRDD false
>
>         at
> org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
>         at
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:88)
>         at
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:85)
>         at
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
>         at
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
>         at
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>         at
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
>         at
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
>         at
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
>         at
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:107)
>         at
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:107)
>         at
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>         at
> org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:106)
>         at
> org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$recursiveTransform$1(QueryPlan.scala:118)
>         at
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$1.apply(QueryPlan.scala:122)
>         at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>         at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>         at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>         at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>         at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>         at
> org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$recursiveTransform$1(QueryPlan.scala:122)
>         at
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:127)
>         at
> org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
>         at
> org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:127)
>         at
> org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:95)
>         at
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:85)
>         at
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:80)
>         at
> org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
>         at
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
>         at
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
>         at scala.collection.immutable.List.foreach(List.scala:381)
>         at
> org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
>         at
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:80)
>         at
> org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:92)
>         at
> org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
>         at
> org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
>         at
> org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
>         at
> org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
>         at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
>         at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:641)
>         at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:694)
>         at
> com.uber.hoodie.cli.DedupeSparkJob.getDupeKeyDF(DedupeSparkJob.scala:62)
>         at
> com.uber.hoodie.cli.DedupeSparkJob.planDuplicateFix(DedupeSparkJob.scala:93)
>         at
> com.uber.hoodie.cli.DedupeSparkJob.fixDuplicates(DedupeSparkJob.scala:142)
>         at
> com.uber.hoodie.cli.commands.SparkMain.deduplicatePartitionPath(SparkMain.java:217)
>         at com.uber.hoodie.cli.commands.SparkMain.main(SparkMain.java:62)
>         ... 6 more
>
> --
> *Jianbin Wang*
> Sr. Engineer II, Data
> +86 18633600964
>
> Units 3801, 3804, 38F, C Block, Beijing Yintai Center, Beijing, China
>
>
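
A note on the quoted trace: the logical plan groups rows by
`_hoodie_record_key`, counts each group, and keeps the keys whose count is
greater than 1. The error reports `input columns: []`, so the table that query
ran against had no columns at all, which matches the empty file list logged
just before it. The sketch below restates that grouping in plain Python to
make the failure mode easier to see. It is not Hudi or Spark code: the
function name `find_duplicate_keys` and the explicit empty-input check are
assumptions made for illustration only.

```python
from collections import Counter

# Metadata column named in the AnalysisException from the quoted trace.
RECORD_KEY = "_hoodie_record_key"


def find_duplicate_keys(rows):
    """Return {record_key: count} for keys that occur more than once.

    `rows` is a list of dicts standing in for the records read from the
    partition's parquet files (hypothetical stand-in, not a Hudi type).
    """
    # Illustrative guard: fail early with a clear message when nothing was
    # loaded, instead of failing later on a missing column.
    if not rows:
        raise ValueError("no rows loaded: the partition file listing was empty")

    # Same shape as the plan: GROUP BY key, COUNT, keep count > 1.
    counts = Counter(row[RECORD_KEY] for row in rows)
    return {key: n for key, n in counts.items() if n > 1}


if __name__ == "__main__":
    sample = [
        {RECORD_KEY: "a"},
        {RECORD_KEY: "b"},
        {RECORD_KEY: "a"},
    ]
    print(find_duplicate_keys(sample))
```

With an empty input, the sketch stops at the guard with a message that points
at the file listing, which is the step worth checking first.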
