I also noticed that there are 8 "dispatcher-event-loop-0 ... 7" threads and 8
"map-output-dispatcher-0 ... 7" threads, all waiting at the same location in
the code:
*sun.misc.Unsafe.park(Native Method)*
*java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)*
*java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)*
*java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)*
org.apache.spark.MapOutputTrackerMaster$MessageLoop.run(MapOutputTracker.scala:338)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
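
For context, the frames above are what a pool worker looks like while it is blocked in `LinkedBlockingQueue.take()` on an empty queue. The sketch below is a minimal illustration using only the JDK. The class `IdleMessageLoopDemo`, its method, and the thread names are hypothetical and not Spark code. It starts 8 workers that loop on `take()` and counts how many are in the WAITING state. Under that assumption, parked dispatcher threads on their own may simply be idle rather than stuck.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not Spark code.
class IdleMessageLoopDemo {

    // Starts n workers that loop on queue.take() and returns how many of
    // them are observed in the WAITING state while the queue is empty.
    static int countWaitingWorkers(int n) {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            Thread t = new Thread(() -> {
                try {
                    // take() parks the thread until a message arrives.
                    while (!queue.take().equals("STOP")) {
                        // A real loop would handle the message here.
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "demo-dispatcher-" + i);
            t.setDaemon(true);
            t.start();
            workers.add(t);
        }

        int waiting = 0;
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        try {
            // Poll until every worker has parked, or give up after 5 seconds.
            while (System.nanoTime() < deadline) {
                waiting = 0;
                for (Thread t : workers) {
                    if (t.getState() == Thread.State.WAITING) waiting++;
                }
                if (waiting == n) break;
                Thread.sleep(10);
            }
            // Wake each worker with a stop message so the demo exits cleanly.
            for (int i = 0; i < n; i++) queue.put("STOP");
            for (Thread t : workers) t.join(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return waiting;
    }

    public static void main(String[] args) {
        System.out.println("workers parked in take(): " + countWaitingWorkers(8));
    }
}
```

A thread dump taken while these workers are parked would show the same `Unsafe.park` / `LinkedBlockingQueue.take` frames for each of them.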

So clearly there is a race condition. Maybe the only option is to avoid it,
but how?
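
One way to gather more evidence might be to group the live threads by their top stack frame, so that pool threads parked at one shared location can be told apart from the thread that is actually blocking the job. This is a minimal sketch using only the JDK. The class `ThreadDumpSummary` and its method are hypothetical names, and it would have to run inside the driver JVM to see Spark's threads.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper; not part of Spark.
class ThreadDumpSummary {

    // Counts live threads in this JVM by their top-most stack frame.
    static Map<String, Integer> countByTopFrame() {
        Map<String, Integer> counts = new TreeMap<>();
        for (Map.Entry<Thread, StackTraceElement[]> e
                : Thread.getAllStackTraces().entrySet()) {
            StackTraceElement[] frames = e.getValue();
            // Some threads can report no frames at all.
            String top = frames.length == 0
                    ? "<no frames>"
                    : frames[0].getClassName() + "." + frames[0].getMethodName();
            counts.merge(top, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        countByTopFrame().forEach(
                (frame, n) -> System.out.println(n + " x " + frame));
    }
}
```

A location shared by many threads from the same pool is a candidate for idle workers, while a location held by a single thread (for example the one calling `count`) is more likely where the job itself is waiting.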


On Fri, Mar 24, 2017 at 5:40 PM Ravindra <ravindra.baj...@gmail.com> wrote:

> Hi All,
>
> My Spark job hangs. Looking into the thread dump, I noticed that it hangs
> on the count action on a dataframe (stack trace and plan given below). The
> data is very small: no more than 10 rows.
>
> I noticed some JIRAs about this issue, but all are resolved and closed in
> previous versions.
>
> It's running with 1 executor. I also noticed that the Storage tab is empty,
> so no dataframe is cached.
>
> Looking into the DAGScheduler, I notice it's stuck at runJob; it is probably
> trying to run tasks concurrently and waiting here:
>
> sun.misc.Unsafe.park(Native Method)
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303)
> scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202)
> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:153)
> *org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:623)*
> org.apache.spark.SparkContext.runJob(SparkContext.scala:1873)
> org.apache.spark.SparkContext.runJob(SparkContext.scala:1886)
> org.apache.spark.SparkContext.runJob(SparkContext.scala:1899)
> org.apache.spark.SparkContext.runJob(SparkContext.scala:1913)
> org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:912)
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
> org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
> org.apache.spark.rdd.RDD.collect(RDD.scala:911)
> org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:290)
> org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2193)
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
> org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2546)
> org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2192)
> org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2199)
> org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2227)
> org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2226)
> org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2559)
> org.apache.spark.sql.Dataset.count(Dataset.scala:2226)
>
>
> Spark Properties
> Name Value
> spark.app.id local-1490350724879
> spark.app.name TestJob
> spark.default.parallelism 1
> spark.driver.allowMultipleContexts true
> spark.driver.host localhost
> spark.driver.memory 4g
> spark.driver.port 63405
> spark.executor.id driver
> spark.executor.memory 4g
> spark.hadoop.validateOutputSpecs false
> spark.master local[2]
> spark.scheduler.mode FIFO
> spark.sql.catalogImplementation hive
> spark.sql.crossJoin.enabled true
> spark.sql.shuffle.partitions 1
> spark.sql.warehouse.dir /tmp/hive/spark-warehouse
> spark.ui.enabled true
> spark.yarn.executor.memoryOverhead 2048
>
> It is a count action on the dataframe below:
> == Parsed Logical Plan ==
> Project [hash_composite_keys#27440L, id#27441, name#27442, master#27439, created_time#27448]
> +- Filter (rn#27493 = 1)
>    +- Project [hash_composite_keys#27440L, id#27441, name#27442, master#27439, created_time#27448, rn#27493]
>       +- Project [hash_composite_keys#27440L, id#27441, name#27442, master#27439, created_time#27448, rn#27493, rn#27493]
>          +- Window [rownumber() windowspecdefinition(hash_composite_keys#27440L, created_time#27448 ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rn#27493], [hash_composite_keys#27440L], [created_time#27448 ASC]
>             +- Project [hash_composite_keys#27440L, id#27441, name#27442, master#27439, created_time#27448]
>                +- Union
>                   :- Project [hash_composite_keys#27440L, id#27441, name#27442, master#27439, cast(0 as timestamp) AS created_time#27448]
>                   :  +- Project [hash_composite_keys#27440L, id#27441, name#27442, master#27439]
>                   :     +- SubqueryAlias table1
>                   :        +- Relation[hash_composite_keys#27440L,id#27441,name#27442,master#27439] parquet
>                   +- Project [hash_composite_keys#27391L, id#27397, name#27399, master#27401, created_time#27414]
>                      +- Project [id#27397, name#27399, hash_composite_keys#27391L, master#27401, cast(if (isnull(created_time#27407L)) null else UDF(created_time#27407L) as timestamp) AS created_time#27414]
>                         +- Project [id#27397, name#27399, hash_composite_keys#27391L, master#27401, 1490350732895 AS created_time#27407L]
>                            +- Project [id#27397, name#27399, hash_composite_keys#27391L, master AS master#27401]
>                               +- Aggregate [hash_composite_keys#27391L], [first(id#27389, false) AS id#27397, first(name#27390, false) AS name#27399, hash_composite_keys#27391L]
>                                  +- Relation[id#27389,name#27390,hash_composite_keys#27391L] parquet
>
>
>
>
