Hi Dhiren,

How many resources are you using when you submit this job?
For large-scale data, the simple solution is to give the calculation job
more resources. The timeout in your trace is also thrown while Spark
broadcasts one side of the join (see the BroadcastNestedLoopJoinExec
frame), so tuning the broadcast settings may help as well.
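
For example (a minimal sketch, not Griffin-specific; the values are only
examples, and the same settings can be passed through spark-submit --conf):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      // Allow the broadcast exchange more time than the 3600s in your log.
      .config("spark.sql.broadcastTimeout", "7200")
      // Or disable automatic broadcast joins, so Spark falls back to a
      // shuffle-based join instead of broadcasting a large table.
      .config("spark.sql.autoBroadcastJoinThreshold", "-1")
      .enableHiveSupport()
      .getOrCreate()
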
With limited resources, a common solution is to partition your data by
date or hour so that each partition stays small, then run the calculation
on one partition at a time, along the lines of the sketch below.
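
Here is a rough sketch of that per-partition idea in plain Spark; the
source/target table names and the dt partition column are placeholders
for your own schema, not Griffin's generated code:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

    // One slice per run; loop over whatever partition values you have.
    Seq("2018-11-28", "2018-11-29", "2018-11-30").foreach { dt =>
      val src = spark.table("source").where(s"dt = '$dt'")
      val tgt = spark.table("target").where(s"dt = '$dt'")

      // Rows in source with no match in target under your rule:
      // source.column1 = target.column1 AND source.column2 = target.column2
      val miss  = src.join(tgt, Seq("column1", "column2"), "left_anti").count()
      val total = src.count()
      println(s"$dt: total=$total, miss=$miss, matched=${total - miss}")
    }

Each run then joins far less than the full 1B rows, which should keep it
well away from the broadcast timeout.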

Thanks,
Lionel

On Mon, Dec 3, 2018 at 3:33 PM Dhiren Sangani <dhiren.sang...@enquero.com>
wrote:

> Hi Dev Team,
>
> I am facing an issue with the accuracy measure while running a job on a
> larger dataset (1B records).
> I have source and target tables in Hive, and I am using a simple accuracy
> rule as below:
>
> Source.Column1 = target.column1 AND source.column2 = target.column2
>
> I am getting the exception below while running the job:
>
> 18/11/30 21:44:12 ERROR MetricWriteStep: get metric accuracy fails
> java.util.concurrent.TimeoutException: Futures timed out after [3600 seconds]
>         at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>         at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>         at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201)
>         at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:136)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:144)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:140)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>         at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>         at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:140)
>         at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec.doExecute(BroadcastNestedLoopJoinExec.scala:343)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>         at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>         at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>         at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
>         at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
>         at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>         at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>         at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>         at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
>         at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
>         at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>         at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>         at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>         at org.apache.spark.sql.execution.DeserializeToObjectExec.doExecute(objects.scala:89)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>         at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>         at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>         at org.apache.spark.sql.execution.MapPartitionsExec.doExecute(objects.scala:185)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>         at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>         at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>         at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
>         at org.apache.spark.sql.execution.SerializeFromObjectExec.inputRDDs(objects.scala:110)
>         at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>         at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>         at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>         at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
>         at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:294)
>         at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3278)
>         at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2727)
>         at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2727)
>         at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3259)
>         at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
>         at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3258)
>         at org.apache.spark.sql.Dataset.collect(Dataset.scala:2727)
>         at org.apache.griffin.measure.step.write.MetricWriteStep.getMetricMaps(MetricWriteStep.scala:86)
>         at org.apache.griffin.measure.step.write.MetricWriteStep.execute(MetricWriteStep.scala:46)
>         at org.apache.griffin.measure.step.SeqDQStep$$anonfun$execute$1.apply(SeqDQStep.scala:37)
>         at org.apache.griffin.measure.step.SeqDQStep$$anonfun$execute$1.apply(SeqDQStep.scala:36)
>         at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
>         at scala.collection.immutable.List.foldLeft(List.scala:84)
>         at org.apache.griffin.measure.step.SeqDQStep.execute(SeqDQStep.scala:36)
>         at org.apache.griffin.measure.job.DQJob$$anonfun$execute$1.apply(DQJob.scala:31)
>         at org.apache.griffin.measure.job.DQJob$$anonfun$execute$1.apply(DQJob.scala:30)
>         at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
>         at scala.collection.immutable.List.foldLeft(List.scala:84)
>         at org.apache.griffin.measure.job.DQJob.execute(DQJob.scala:30)
>         at org.apache.griffin.measure.launch.batch.BatchDQApp$$anonfun$run$1.apply$mcZ$sp(BatchDQApp.scala:103)
>         at org.apache.griffin.measure.launch.batch.BatchDQApp$$anonfun$run$1.apply(BatchDQApp.scala:67)
>         at org.apache.griffin.measure.launch.batch.BatchDQApp$$anonfun$run$1.apply(BatchDQApp.scala:67)
>         at scala.util.Try$.apply(Try.scala:192)
>         at org.apache.griffin.measure.launch.batch.BatchDQApp.run(BatchDQApp.scala:67)
>         at org.apache.griffin.measure.Application$.main(Application.scala:88)
>         at org.apache.griffin.measure.Application.main(Application.scala)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$4.run(ApplicationMaster.scala:721)
> 18/11/30 21:44:12 INFO MetricWriteStep: metricMaps => List()
>
> I tried to run the job with half of the dataset (500M records), and it
> ran successfully.
> Below is the metric log:
> 18/11/30 20:37:31 INFO MetricWriteStep: metricMaps => WrappedArray(Map(total -> 508880897, miss -> 0, matched -> 508880897, matchedFraction -> 1.0))
>
> But if I try to run it with even 700M records, it fails:
> 18/11/30 21:44:12 ERROR MetricWriteStep: get metric accuracy fails
> 18/11/30 21:44:12 INFO MetricWriteStep: metricMaps => List()
>
> Is this related to the join query used inside AccuracyExpr2DQSteps.scala
> to calculate the miss records?
>
> Any pointers would be appreciated.
>
> Thanks,
> Dhiren
>
>
