Could this be related to the size of the lookup result ?

I tried to recreate a similar scenario on the spark shell which causes an exception:

scala> val rdd = org.apache.spark.api.java.JavaPairRDD.fromRDD(sc.makeRDD(0 until 4, 3).map(x => ( ( 0,"52fb9b1a3004f07d1a87c8f3" ), Seq.fill(400000)(Random.nextFloat) )) ) rdd: org.apache.spark.api.java.JavaPairRDD[(Int, String),Seq[Float]] = org.apache.spark.api.java.JavaPairRDD@1481cb6e

scala> rdd.count()
res53: Long = 4

scala> rdd.lookup((0,"52fb9b1a3004f07d1a87c8f3"))
org.apache.spark.SparkException: Job aborted: Task 39.0:2 failed 4 times (most recent failure: Exception failure: com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 3, required: 4) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018) at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604) at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
    at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604) at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



On 4/28/14 11:28 AM, Yadid Ayzenberg wrote:
Thanks for your answer.
I tried running on a single machine - master and worker on one host. I get exactly the same results. Very little CPU activity on the machine in question. The web UI shows a single task and its state is RUNNING. it will remain so indefinitely.
I have a single partition, and its size is 1626.2 MB

Currently the RDD has 200 elements, but I have tried it with 20 and the behavior is the same.
The key is of the form:  (0,52fb9aff3004f07d1a87c8ea)
Where the first number in the tuple is always 0, and the second one is some string that can appear more than once.

The RDD is created by using the newAPIHadoopRDD.

Any additional info I can provide?

Yadid




On 4/28/14 10:46 AM, Daniel Darabos wrote:
That is quite mysterious, and I do not think we have enough information to answer. JavaPairRDD<String, Tuple2>.lookup() works fine on a remote Spark cluster:

$ MASTER=spark://localhost:7077 bin/spark-shell
scala> val rdd = org.apache.spark.api.java.JavaPairRDD.fromRDD(sc.makeRDD(0 until 10, 3).map(x => ((x%3).toString, (x, x%3))))
scala> rdd.lookup("1")
res0: java.util.List[(Int, Int)] = [(1,1), (4,1), (7,1)]

You suggest maybe the driver does not receive a message from an executor. I guess it is likely possible, though it has not happened to me. I would recommend running on a single machine in the standalone setup. Start the master and worker on the same machine, run the application there too. This should eliminate network configuration problems.

If you still see the issue, I'd check whether the task has really completed. What do you see on the web UI? Is the executor using CPU?

Good luck.




On Mon, Apr 28, 2014 at 2:35 AM, Yadid Ayzenberg <ya...@media.mit.edu <mailto:ya...@media.mit.edu>> wrote:

    Can someone please suggest how I can move forward with this?
    My spark version is 0.9.1.
    The big challenge is that this issue is not recreated when
    running in local mode. What could be the difference?

    I would really appreciate any pointers, as currently the the job
    just hangs.




    On 4/25/14, 7:37 PM, Yadid Ayzenberg wrote:

        Some additional information - maybe this rings a bell with
        someone:

        I suspect this happens when the lookup returns more than one
        value.
        For 0 and 1 values, the function behaves as you would expect.

        Anyone ?



        On 4/25/14, 1:55 PM, Yadid Ayzenberg wrote:

            Hi All,

            Im running a lookup on a JavaPairRDD<String, Tuple2>.
            When running on local machine - the lookup is
            successfull. However, when running a standalone cluster
            with the exact same dataset - one of the tasks never ends
            (constantly in RUNNING status).
            When viewing the worker log, it seems that the task has
            finished successfully:

            14/04/25 13:40:38 INFO BlockManager: Found block rdd_2_0
            locally
            14/04/25 13:40:38 INFO Executor: Serialized size of
            result for 2 is 10896794
            14/04/25 13:40:38 INFO Executor: Sending result for 2
            directly to driver
            14/04/25 13:40:38 INFO Executor: Finished task ID 2

            But it seems the driver is not aware of this, and hangs
            indefinitely.

            If I execute a count priot to the lookup - I get the
            correct number which suggests that the cluster is
            operating as expected.

            The exact same scenario works with a different type of
            key (Tuple2): JavaPairRDD<Tuple2, Tuple2>.

            Any ideas on how to debug this problem ?

            Thanks,

            Yadid






Reply via email to