On Wed, Jun 18, 2014 at 6:19 PM, Surendranauth Hiraman
<suren.hira...@velos.io> wrote:
> Patrick,
>
> My team is using shuffle consolidation but not speculation. We are also
> using persist(DISK_ONLY) for caching.
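> (Concretely, something like rdd.persist(StorageLevel.DISK_ONLY) on those RDDs,
> rather than the default MEMORY_ONLY cache().)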


Use of shuffle consolidation is probably what is causing the issue.
It would be a good idea to try again with that turned off (which is the default).
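The driver-side change is small; a minimal sketch, assuming a SparkConf named
conf as in your snippet below:

    conf.set("spark.shuffle.consolidateFiles", "false")  // or just remove the line to keep the default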

It should most likely get fixed in the 1.1 timeframe.


Regards,
Mridul


>
> Here are some config changes that are in our work-in-progress.
>
> We've been trying for 2 weeks to get our production flow (maybe around 50-70
> stages, with a few forks and joins of up to 20 branches) to run end to end
> without any success, and we have run into other problems besides this one as
> well. For example, we have hit situations where saving to HDFS just hangs on
> a couple of tasks, which print nothing in their logs and take no CPU. For
> testing, our input data is 10 GB across 320 input splits and generates maybe
> around 200-300 GB of intermediate and final data.
>
>
>         conf.set("spark.executor.memory", "14g")     // TODO make this configurable
>
>         // shuffle configs
>         conf.set("spark.default.parallelism", "320") // TODO make this configurable
>         conf.set("spark.shuffle.consolidateFiles", "true")
>
>         conf.set("spark.shuffle.file.buffer.kb", "200")
>         conf.set("spark.reducer.maxMbInFlight", "96")
>
>         conf.set("spark.rdd.compress", "true")
>
>         // we ran into a problem with the default timeout of 60 seconds.
>         // this is also being set in the master's spark-env.sh (see the sketch
>         // after this block); not sure if it needs to be in both places.
>         conf.set("spark.worker.timeout", "180")
>
>         // akka settings
>         conf.set("spark.akka.threads", "300")
>         conf.set("spark.akka.timeout", "180")
>         conf.set("spark.akka.frameSize", "100")
>         conf.set("spark.akka.batchSize", "30")
>         conf.set("spark.akka.askTimeout", "30")
>
>         // block manager
>         conf.set("spark.storage.blockManagerTimeoutIntervalMs", "180000")
>         conf.set("spark.blockManagerHeartBeatMs", "80000")
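>
>         // rough sketch of the spark-env.sh counterpart mentioned above, from
>         // memory (the exact variable name may differ by Spark version):
>         //   SPARK_MASTER_OPTS="-Dspark.worker.timeout=180"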
>
> -Suren
>
>
>
> On Wed, Jun 18, 2014 at 1:42 AM, Patrick Wendell <pwend...@gmail.com> wrote:
>
>> Out of curiosity - are you guys using speculation, shuffle
>> consolidation, or any other non-default option? If so that would help
>> narrow down what's causing this corruption.
>>
>> On Tue, Jun 17, 2014 at 10:40 AM, Surendranauth Hiraman
>> <suren.hira...@velos.io> wrote:
>> > Matt/Ryan,
>> >
>> > Did you make any headway on this? My team is running into this also.
>> > Doesn't happen on smaller datasets. Our input set is about 10 GB but we
>> > generate 100s of GBs in the flow itself.
>> >
>> > -Suren
>> >
>> >
>> >
>> >
>> > On Fri, Jun 6, 2014 at 5:19 PM, Ryan Compton <compton.r...@gmail.com> wrote:
>> >
>> >> Just ran into this today myself. I'm on branch-1.0 using a CDH3
>> >> cluster (no modifications to Spark or its dependencies). The error
>> >> appeared trying to run GraphX's .connectedComponents() on a ~200GB
>> >> edge list (GraphX worked beautifully on smaller data).
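>> >>
>> >> The job itself is roughly the textbook pattern, something along these
>> >> lines (paths and names made up):
>> >>
>> >>     import org.apache.spark.graphx.GraphLoader
>> >>     val graph = GraphLoader.edgeListFile(sc, "hdfs:///data/edges.txt")
>> >>     val cc = graph.connectedComponents().vertices
>> >>     cc.count()  // any action that materializes the result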
>> >>
>> >> Here's the stacktrace (it's quite similar to yours
>> >> https://imgur.com/7iBA4nJ ).
>> >>
>> >> 14/06/05 20:02:28 ERROR scheduler.TaskSetManager: Task 5.599:39 failed 4 times; aborting job
>> >> 14/06/05 20:02:28 INFO scheduler.DAGScheduler: Failed to run reduce at VertexRDD.scala:100
>> >> Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 5.599:39 failed 4 times, most recent failure: Exception failure in TID 29735 on host node18: java.io.StreamCorruptedException: invalid type code: AC
>> >>         java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1355)
>> >>         java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
>> >>         org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
>> >>         org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
>> >>         org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>> >>         scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>> >>         org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
>> >>         org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>> >>         scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>> >>         scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> >>         scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> >>         org.apache.spark.graphx.impl.VertexPartitionBaseOps.innerJoinKeepLeft(VertexPartitionBaseOps.scala:192)
>> >>         org.apache.spark.graphx.impl.EdgePartition.updateVertices(EdgePartition.scala:78)
>> >>         org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:75)
>> >>         org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:73)
>> >>         scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>> >>         scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>> >>         scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> >>         scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> >>         org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
>> >>         org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>> >>         org.apache.spark.scheduler.Task.run(Task.scala:51)
>> >>         org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
>> >>         java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>> >>         java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>> >>         java.lang.Thread.run(Thread.java:662)
>> >> Driver stacktrace:
>> >> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
>> >> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
>> >> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
>> >> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> >> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> >> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
>> >> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
>> >> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
>> >> at scala.Option.foreach(Option.scala:236)
>> >> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
>> >> at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
>> >> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>> >> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>> >> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>> >> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>> >> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>> >> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> >> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> >> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> >> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> >> 14/06/05 20:02:28 INFO scheduler.TaskSchedulerImpl: Cancelling stage 5
>> >>
>> >> On Wed, Jun 4, 2014 at 7:50 AM, Sean Owen <so...@cloudera.com> wrote:
>> >> > On Wed, Jun 4, 2014 at 3:33 PM, Matt Kielo <mki...@oculusinfo.com> wrote:
>> >> >> I'm trying to run some Spark code on a cluster but I keep running into a
>> >> >> "java.io.StreamCorruptedException: invalid type code: AC" error. My task
>> >> >> involves analyzing ~50 GB of data (some operations involve sorting) and
>> >> >> then writing the results out to a JSON file. I'm running the analysis on
>> >> >> each of the data's ~10 columns and have never had a successful run. My
>> >> >> program runs for a varying amount of time each time (roughly 5-30
>> >> >> minutes) but always terminates with this error.
>> >> >
>> >> > I can tell you that this usually means that, somewhere, something wrote
>> >> > objects to the same OutputStream with multiple ObjectOutputStreams; AC is
>> >> > the first byte of the serialization stream header.
>> >> >
>> >> > I don't obviously see where or how that could happen, but maybe it rings
>> >> > a bell for someone. This could happen, for example, if an OutputStream is
>> >> > reused across object serializations but a new ObjectOutputStream is
>> >> > opened each time.
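>> >> >
>> >> > A minimal, non-Spark sketch of that pattern (plain JDK serialization in a
>> >> > Scala shell) reproduces exactly this exception:
>> >> >
>> >> >     import java.io._
>> >> >     val buf = new ByteArrayOutputStream()
>> >> >     val out1 = new ObjectOutputStream(buf)   // writes the stream header
>> >> >     out1.writeObject("first"); out1.flush()
>> >> >     val out2 = new ObjectOutputStream(buf)   // writes a second header (0xAC 0xED) mid-stream
>> >> >     out2.writeObject("second"); out2.flush()
>> >> >
>> >> >     val in = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
>> >> >     in.readObject()  // ok: "first"
>> >> >     in.readObject()  // StreamCorruptedException: invalid type code: AC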
>> >>
>> >
>> >
>> >
>> > --
>> >
>> > SUREN HIRAMAN, VP TECHNOLOGY
>> > Velos
>> > Accelerating Machine Learning
>> >
>> > 440 NINTH AVENUE, 11TH FLOOR
>> > NEW YORK, NY 10001
>> > O: (917) 525-2466 ext. 105
>> > F: 646.349.4063
>> > E: suren.hiraman@velos.io
>> > W: www.velos.io
>>
>
>
>
> --
>
> SUREN HIRAMAN, VP TECHNOLOGY
> Velos
> Accelerating Machine Learning
>
> 440 NINTH AVENUE, 11TH FLOOR
> NEW YORK, NY 10001
> O: (917) 525-2466 ext. 105
> F: 646.349.4063
> E: suren.hiraman@velos.io
> W: www.velos.io
