Re: Spark distributed SQL: JSON Data set on all worker node
Yes, it is possible. You need to use the jsonFile method on the SQLContext and then create a DataFrame from the RDD. Then register it as a table. It should be about three lines of code, thanks to Spark. You may want to watch a few YouTube videos, especially on unifying pipelines.

On 3 May 2015 19:02, Jai jai4l...@gmail.com wrote: Hi, I am new to Spark and related technology. I have JSON stored at the same location on all worker nodes (Spark cluster). I am looking to load this JSON data set on those nodes and run SQL queries against it, like distributed SQL. Is this possible to achieve? Right now, the master submits the task to one node only. Thanks and regards, Mrityunjay
Hardware requirements
I need to use Spark to load 500 GB of data from Hadoop on a standalone-mode cluster. What are the minimum hardware requirements, given that the cluster will be used for advanced analysis (social network analysis)?
Spark distributed SQL: JSON Data set on all worker node
Hi, I am new to Spark and related technology. I have JSON stored at the same location on all worker nodes (Spark cluster). I am looking to load this JSON data set on those nodes and run SQL queries against it, like distributed SQL. Is this possible to achieve? Right now, the master submits the task to one node only. Thanks and regards, Mrityunjay
Re: Spark - Timeout Issues - OutOfMemoryError
I don't know the full context of what you're doing, but serialization errors usually mean you're attempting to serialize something that can't be serialized, like the SparkContext. Kryo won't help there. The arguments to spark-submit you posted previously look good: --num-executors 96 --driver-memory 12g --driver-java-options -XX:MaxPermSize=10G --executor-memory 12g --executor-cores 4

I suspect you aren't getting the parallelism you need. For partitioning, if your data is in HDFS and your block size is 128MB, then you'll get ~195 partitions anyway. If it takes 7 hours to do a join over 25GB of data, you have some other serious bottleneck. You should examine the web console and the logs to determine where all the time is going. Questions you might pursue:
- How long does each task take to complete?
- How many of those 195 partitions/tasks are processed at the same time? That is, how many slots are available? Maybe you need more nodes if the number of slots is too low. Based on your command arguments, you should be able to process half of them at a time, unless the cluster is busy.
- Is the cluster swamped with other work?
- How much data does each task process? Is the data roughly the same from one task to the next? If not, you might have serious key skew.

You may also need to research the details of how joins are implemented and some of the common tricks for organizing the data to minimize having to shuffle all N by M records.

Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com

On Sun, May 3, 2015 at 11:02 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: Hello Dean, if I don't use the Kryo serializer I get a serialization error, hence I am using it. If I don't use partitionBy/repartition then the simple join never completed, even after 7 hours; and in fact as the next step I need to run it against 250G, as that is my full dataset size. Someone here suggested I use repartition. Assuming repartition is mandatory, how do I decide what the right number is? When I am using 400 I do not get the NullPointerException that I talked about, which is strange. I never saw that exception against a small random dataset, but I see it with 25G; and again, with 400 partitions, I do not see it.
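A quick way to check the parallelism questions above from the driver (a minimal sketch; the RDD names and numbers are illustrative, not from this thread):

val joined = bigRdd.join(otherRdd)                    // hypothetical equi-join
println("partitions: " + joined.partitions.size)      // compare against the ~195 expected from HDFS blocks
println("default parallelism: " + sc.defaultParallelism)
// Total task slots = executors * cores per executor, e.g. 96 * 4 = 384 with the arguments above.
// If the partition count itself is the bottleneck, resize without a custom partitioner:
val resized = joined.repartition(sc.defaultParallelism)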
Long GC pauses with Spark SQL 1.3.0 and billion row tables
I'm currently trying to join two large tables (on the order of 1B rows each) using Spark SQL (1.3.0) and am running into long GC pauses which bring the job to a halt. I'm reading in both tables using a HiveContext, with the underlying files stored as Parquet files. I'm using something along the lines of HiveContext.sql("SELECT a.col1, b.col2 FROM a JOIN b ON a.col1 = b.col1") to set up the join. When I execute this (with an action such as .count) I see the first few stages complete, but the job eventually stalls. The GC counts keep increasing for each executor. I'm running with 6 workers, each with 2T disk and 100GB RAM. Has anyone else run into this issue? I'm thinking I might be running into issues with the shuffling of the data, but I'm unsure how to get around this. Is there a way to redistribute the rows based on the join key first, and then do the join? Thanks in advance.
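Two 1.3-era knobs that match the "redistribute by the join key first" idea (a hedged sketch; the table and column names come from the question, everything else is an assumption):

import org.apache.spark.sql.hive.HiveContext

val hc = new HiveContext(sc)
// More, smaller shuffle partitions keep each task's hash table smaller,
// which can reduce GC pressure (the 1.3 default is 200):
hc.setConf("spark.sql.shuffle.partitions", "2000")
// HiveQL's DISTRIBUTE BY redistributes rows by the join key before the join:
hc.sql("SELECT * FROM a DISTRIBUTE BY col1").registerTempTable("a_clustered")
val joined = hc.sql("SELECT a.col1, b.col2 FROM a_clustered a JOIN b ON a.col1 = b.col1")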
Re: Drop a column from the DataFrame.
Great, thanks!

On Sat, May 2, 2015 at 23:58, Ted Yu yuzhih...@gmail.com wrote: This is coming in 1.4.0: https://issues.apache.org/jira/browse/SPARK-7280

On May 2, 2015, at 2:27 PM, Olivier Girardot ssab...@gmail.com wrote: Sounds like a patch for a drop method...

On Sat, May 2, 2015 at 21:03, dsgriffin dsgrif...@gmail.com wrote: Just use select() to create a new DataFrame with only the columns you want -- sort of the opposite of what you asked for, but you can select all of the columns minus the one you don't want. You could even use a filter to remove just that one column on the fly:

myDF.select(myDF.columns.filter(_ != "column_you_do_not_want").map(colname => new Column(colname)).toList : _*)
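For reference, both approaches side by side (a hedged sketch; myDF and the column name are illustrative):

import org.apache.spark.sql.functions.col

// Spark 1.3: select every column except the unwanted one.
val pruned = myDF.select(myDF.columns.filter(_ != "unwanted").map(col): _*)

// Spark 1.4+ (SPARK-7280): the same thing in one call.
// val pruned = myDF.drop("unwanted")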
Re: Questions about Accumulators
Yes, that's it. If a partition is lost, some steps will need to be re-executed to recompute it -- perhaps the map function in which you update the accumulator. I think you can do it more safely in a transformation near the action, where it is less likely that an error will occur (not always true...). You can also checkpoint the RDD after the step that updates the accumulator, so your transformation doesn't get applied again if some task fails. But this is kind of expensive considering you only want to update some counter... Another idea could be to implement a custom accumulator that holds a map of partition index -> value and then merge the values in the map on the driver side, but I never tried this and am not sure whether it would really work. Cheers, Eugen

2015-05-03 14:08 GMT+02:00 xiazhuchang hk8...@163.com: The official documentation says "In transformations, users should be aware of that each task's update may be applied more than once if tasks or job stages are re-executed." I don't quite understand what this means. Does it mean that if I use the accumulator in transformations (e.g. a map() operation), the operation may be executed more than once if the task restarts? And then the final result will be many times the real result?
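A minimal sketch of the checkpoint idea (the accumulator, RDD, and directory are illustrative); note the checkpoint is only materialized by the first action, so it protects later actions rather than the first one:

sc.setCheckpointDir("/tmp/spark-checkpoints")
val acc = sc.accumulator(0L)
val counted = data.map { x => acc += 1; x }
counted.checkpoint()   // written out during the first action on `counted`
counted.count()
// Later failures recompute from the checkpoint, not from data.map(...),
// so the accumulator update is not replayed.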
Re: Questions about Accumulators
Yes, correct. However, note that when an accumulator operation is *idempotent*, meaning that repeated application for the same data behaves exactly like one application, then that accumulator can be safely used in transformation steps (non-actions), too. For example, max and min tracking. Just last week I wrote one that used a hash map to track the latest timestamps seen for specific keys. dean

Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com

On Sun, May 3, 2015 at 8:07 AM, xiazhuchang hk8...@163.com wrote: "For accumulator updates performed inside actions only, Spark guarantees that each task's update to the accumulator will only be applied once, i.e. restarted tasks will not update the value. In transformations, users should be aware of that each task's update may be applied more than once if tasks or job stages are re-executed." Does this mean the guarantee (the accumulator only being updated once) holds only in actions? That is to say, one should use the accumulator only in actions, or else there may be errors (updates applied more than once) if it is used in transformations? E.g. map(x => accumulator += x): after execution, the correct result of the accumulator should be 1; unfortunately, some error happens, the task restarts, the map() operation is re-executed, and then the final result of the accumulator will be 2, twice the correct result?
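A hedged sketch of that kind of accumulator -- merging per-key maxima, so replaying the same update is harmless (all names and types here are assumptions, not Dean's actual code):

import org.apache.spark.AccumulatorParam

// Merges two key->timestamp maps, keeping the larger timestamp per key.
object LatestTimestampParam extends AccumulatorParam[Map[String, Long]] {
  def zero(initial: Map[String, Long]): Map[String, Long] = Map.empty
  def addInPlace(m1: Map[String, Long], m2: Map[String, Long]): Map[String, Long] =
    (m1.keySet ++ m2.keySet).map { k =>
      k -> math.max(m1.getOrElse(k, Long.MinValue), m2.getOrElse(k, Long.MinValue))
    }.toMap
}

val latest = sc.accumulator(Map.empty[String, Long])(LatestTimestampParam)
events.foreach { case (key, ts) => latest += Map(key -> ts) }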
Re: Spark - Timeout Issues - OutOfMemoryError
How big is the data you're returning to the driver with collectAsMap? You are probably running out of memory trying to copy too much data back to it. If you're trying to force a map-side join, Spark can do that for you in some cases within the regular DataFrame/RDD context. See http://spark.apache.org/docs/latest/sql-programming-guide.html#performance-tuning and this talk by Michael Armbrust for example, http://spark-summit.org/wp-content/uploads/2014/07/Performing-Advanced-Analytics-on-Relational-Data-with-Spark-SQL-Michael-Armbrust.pdf. dean

Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com

On Thu, Apr 30, 2015 at 12:40 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: Full exception:

15/04/30 09:59:49 INFO scheduler.DAGScheduler: Stage 1 (collectAsMap at VISummaryDataProvider.scala:37) failed in 884.087 s
15/04/30 09:59:49 INFO scheduler.DAGScheduler: Job 0 failed: collectAsMap at VISummaryDataProvider.scala:37, took 1093.418249 s
15/04/30 09:59:49 ERROR yarn.ApplicationMaster: User class threw exception: Job aborted due to stage failure: Exception while getting task result: org.apache.spark.SparkException: Error sending message [message = GetLocations(taskresult_112)]
org.apache.spark.SparkException: Job aborted due to stage failure: Exception while getting task result: org.apache.spark.SparkException: Error sending message [message = GetLocations(taskresult_112)]
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
15/04/30 09:59:49 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: Job aborted due to stage failure: Exception while getting task result: org.apache.spark.SparkException: Error sending message [message = GetLocations(taskresult_112)])

Code at line 37:

val lstgItemMap = listings.map { lstg => (lstg.getItemId().toLong, lstg) }.collectAsMap

The listing data set size is 26G (10 files) and my driver memory is 12G (I can't go beyond it). The reason I do collectAsMap is to broadcast it and do a map-side join instead of a regular join. Please suggest?

On Thu, Apr 30, 2015 at 10:52 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: My Spark job is failing and I see:

15/04/30 09:59:49 ERROR yarn.ApplicationMaster: User class threw exception: Job aborted due to stage failure: Exception while getting task result: org.apache.spark.SparkException: Error sending message [message = GetLocations(taskresult_112)]
org.apache.spark.SparkException: Job aborted due to stage failure: Exception while getting task result: org.apache.spark.SparkException: Error sending message [message = GetLocations(taskresult_112)]
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
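If the collected map does fit in memory, the broadcast variant Dean alludes to looks roughly like this (a sketch with illustrative names and types, not the poster's actual classes):

// Build the small side on the driver, then ship it to every executor once.
val itemMap: Map[Long, AnyRef] = smallRdd.collectAsMap().toMap
val itemsBc = sc.broadcast(itemMap)

// Map-side join: only rows with a matching key survive; the big side is never shuffled.
val joined = bigRdd.flatMap { case (itemId, rec) =>
  itemsBc.value.get(itemId).map(item => (itemId, (rec, item)))
}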
Re: Spark - Timeout Issues - OutOfMemoryError
IMHO, you are trying waaay too hard to optimize work on what is really a small data set. 25G, even 250G, is not that much data, especially if you've spent a month trying to get something to work that should be simple. All these errors are from optimization attempts. Kryo is great, but if it's not working reliably for some reason, then don't use it. Rather than force 200 partitions, let Spark try to figure out a good-enough number. (If you really need to force a partition count, use the repartition method instead, unless you're overriding the partitioner.) So, I recommend that you eliminate all the optimizations: Kryo, partitionBy, etc. Just use the simplest code you can. Make it work first. Then, if it really isn't fast enough, look for actual evidence of bottlenecks and optimize those.

Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com

On Sun, May 3, 2015 at 10:22 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: Hello Dean and others, thanks for your suggestions. I have two data sets, and all I want to do is a simple equi-join. I have a 10G limit, and as my dataset_1 exceeded that it was throwing an OOM error. Hence I switched back to using the .join() API instead of a map-side broadcast join. I am repartitioning the data with 100, 200, and I see a NullPointerException now. When I run against 25G of each input and with .partitionBy(new org.apache.spark.HashPartitioner(200)), I see a NullPointerException; the trace does not include a line from my code, hence I do not know what is causing the error. I do have the Kryo serializer registered:

val conf = new SparkConf()
  .setAppName(detail)
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get)
  .set("spark.kryoserializer.buffer.max.mb", arguments.get("maxbuffersize").get)
  .set("spark.driver.maxResultSize", arguments.get("maxResultSize").get)
  .set("spark.yarn.maxAppAttempts", "0")
  .registerKryoClasses(Array(classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum]))
val sc = new SparkContext(conf)

I see the exception when this task runs:

val viEvents = details.map { vi => (vi.get(14).asInstanceOf[Long], vi) }

It's a simple mapping of input records to (itemId, record). I found this http://stackoverflow.com/questions/23962796/kryo-readobject-cause-nullpointerexception-with-arraylist and http://apache-spark-user-list.1001560.n3.nabble.com/Kryo-NPE-with-Array-td19797.html. It looks like Kryo (2.21v) changed something to stop using default constructors:

((Kryo.DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy()).setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());

Please suggest. Trace:

15/05/01 03:02:15 ERROR executor.Executor: Exception in task 110.1 in stage 2.0 (TID 774)
com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
values (org.apache.avro.generic.GenericData$Record)
datum (org.apache.avro.mapred.AvroKey)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
    at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:41)
    at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)

Regards. Any suggestions? I have not been able to get this thing to work for over a month now; it's kind of getting frustrating.
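The linked workaround -- falling back to Objenesis when a class lacks a default constructor -- can be wired into Spark through a custom KryoRegistrator; a hedged sketch (the class name is an assumption):

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator
import org.objenesis.strategy.StdInstantiatorStrategy

class AvroKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    // Try default constructors first; fall back to Objenesis for classes
    // (such as some Avro records) that have none.
    kryo.setInstantiatorStrategy(
      new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()))
  }
}
// In SparkConf: .set("spark.kryo.registrator", classOf[AvroKryoRegistrator].getName)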
Re: Selecting download for 'hadoop 2.4 and later
See https://issues.apache.org/jira/browse/SPARK-5492, but I think you'll need to share the stack trace, as I'm not sure how this can happen: the NoSuchMethodError (not NoSuchMethodException) indicates a call in the bytecode failed to link, but there is only a call by reflection.

On Fri, May 1, 2015 at 9:30 PM, Stephen Boesch java...@gmail.com wrote: What is the correct procedure for downloading a Spark 1.2.x release for use with Hadoop 2.4? The existing download page has a link for "hadoop 2.4+". However, when using that with Hadoop 2.4, a NoSuchMethodError is thrown for Statistics.getThreadStatistics. Upon brief investigation: the SparkHadoopUtil class invokes Hadoop's FileSystem.Statistics.getThreadStatistics, which only exists in Hadoop 2.5+.
Questions about Accumulators
The official documentation says "In transformations, users should be aware of that each task's update may be applied more than once if tasks or job stages are re-executed." I don't quite understand what this means. Does it mean that if I use the accumulator in transformations (e.g. a map() operation), the operation may be executed more than once if the task restarts? And then the final result will be many times the real result?
Re: Questions about Accumulators
Given the lazy nature of an RDD, if you update an accumulator inside a map() and then call both count() and saveAsTextFile() on that RDD, the accumulator updates will be applied twice. IMHO, accumulators are a bit nondeterministic; you need to be sure when to read them to avoid unexpected re-executions.

On 3/5/2015 2:09 PM, xiazhuchang hk8...@163.com wrote: The official documentation says "In transformations, users should be aware of that each task's update may be applied more than once if tasks or job stages are re-executed." I don't quite understand what this means. Does it mean that if I use the accumulator in transformations (e.g. a map() operation), the operation may be executed more than once if the task restarts? And then the final result will be many times the real result?
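A minimal illustration of the re-execution being described (names illustrative); caching is one way to stop the second action from replaying the map:

val acc = sc.accumulator(0L)
val mapped = data.map { x => acc += 1; x }
mapped.count()                 // acc is now data.count()
mapped.saveAsTextFile(out)     // recomputes the map: acc roughly doubles
// Caching lets the second action reuse the computed partitions instead:
// val mapped = data.map { x => acc += 1; x }.cache()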
Re: Spark distributed SQL: JSON Data set on all worker node
Looking at SQLContext.scala (in the master branch), jsonFile() returns a DataFrame directly: def jsonFile(path: String, samplingRatio: Double): DataFrame = ... FYI

On Sun, May 3, 2015 at 2:14 AM, ayan guha guha.a...@gmail.com wrote: Yes, it is possible. You need to use the jsonFile method on the SQLContext and then create a DataFrame from the RDD. Then register it as a table. It should be about three lines of code, thanks to Spark. You may want to watch a few YouTube videos, especially on unifying pipelines. On 3 May 2015 19:02, Jai jai4l...@gmail.com wrote: Hi, I am new to Spark and related technology. I have JSON stored at the same location on all worker nodes (Spark cluster). I am looking to load this JSON data set on those nodes and run SQL queries against it, like distributed SQL. Is this possible to achieve? Right now, the master submits the task to one node only. Thanks and regards, Mrityunjay
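Putting that together with the original question, the whole flow in Spark 1.3 is roughly (a sketch; the path and table name are illustrative, and the file must exist at the same path on every worker):

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
val df = sqlContext.jsonFile("file:///data/events.json")  // returns a DataFrame directly
df.registerTempTable("events")
sqlContext.sql("SELECT count(*) FROM events").show()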
PriviledgedActionException- Executor error
Hi, I am running several jobs in standalone mode and I notice this error in the log files on some of my nodes at the start of my jobs:

INFO executor.CoarseGrainedExecutorBackend: Registered signal handlers for [TERM, HUP, INT]
INFO spark.SecurityManager: Changing view acls to: root
INFO spark.SecurityManager: Changing modify acls to: root
INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions:
INFO slf4j.Slf4jLogger: Slf4jLogger started
INFO Remoting: Starting remoting
ERROR security.UserGroupInformation: PriviledgedActionException as:root cause:java.util.concurrent.TimeoutException: Futures timed out after [1 milliseconds]
Exception in thread "main" java.lang.reflect.UndeclaredThrowableException: Unknown exception in doAs
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1134)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:59)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:115)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:163)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
Caused by: java.security.PrivilegedActionException: java.util.concurrent.TimeoutException: Futures timed out after [1 milliseconds]
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
    ... 4 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [1 milliseconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:107)
    at akka.remote.Remoting.start(Remoting.scala:180)
    at akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)
    at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:618)
    at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:615)
    at akka.actor.ActorSystemImpl._start(ActorSystem.scala:615)
    at akka.actor.ActorSystemImpl.start(ActorSystem.scala:632)
    at akka.actor.ActorSystem$.apply(ActorSystem.scala:141)
    at akka.actor.ActorSystem$.apply(ActorSystem.scala:118)
    at org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121)
    at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)
    at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)
    at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1676)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
    at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1667)
    at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:56)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:122)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:60)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:59)
    ... 7 more
INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
These errors result in executor losses at the beginning, and I have been trying to find a way to solve this with no success, so if anyone has a clue please let me know. Thank you
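If the timeouts come from slow executor registration rather than a hard failure, the 1.x-era settings below are sometimes raised; this is only a hedged guess at a mitigation, since the root cause (network, DNS, overload) isn't visible from the log:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.akka.timeout", "300")                      // seconds; the 1.x default is 100
  .set("spark.core.connection.ack.wait.timeout", "600")  // seconds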
Re: Questions about Accumulators
"For accumulator updates performed inside actions only, Spark guarantees that each task's update to the accumulator will only be applied once, i.e. restarted tasks will not update the value. In transformations, users should be aware of that each task's update may be applied more than once if tasks or job stages are re-executed." Does this mean the guarantee (the accumulator only being updated once) holds only in actions? That is to say, one should use the accumulator only in actions, or else there may be errors (updates applied more than once) if it is used in transformations? E.g. map(x => accumulator += x): after execution, the correct result of the accumulator should be 1; unfortunately, some error happens, the task restarts, the map() operation is re-executed, and then the final result of the accumulator will be 2, twice the correct result?
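A minimal sketch of the two cases in the quoted guarantee (names illustrative):

val acc = sc.accumulator(0L)

// Inside an action: each task's update is applied exactly once, even on retries.
data.foreach(x => acc += 1)

// Inside a transformation: the update is replayed whenever the stage is recomputed,
// so the count can exceed the true value after failures or repeated actions.
val mapped = data.map { x => acc += 1; x }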
Re: Spark SQL ThriftServer Impersonation Support
Thanks Andrew. What version of HS2 is the SparkSQL thrift server using? What would be involved in updating? Is it a simple case of bumping the dependency version in one of the project POMs? Cheers, ~N

On Sat, May 2, 2015 at 11:38 AM, Andrew Lee alee...@hotmail.com wrote: Hi N, see https://issues.apache.org/jira/browse/SPARK-5159. I don't think it is yet supported, until the HS2 code base is updated in the Spark hive-thriftserver project.

Date: Fri, 1 May 2015 15:56:30 +1000 Subject: Spark SQL ThriftServer Impersonation Support From: nightwolf...@gmail.com To: user@spark.apache.org

Hi guys, I'm trying to use the SparkSQL Thriftserver with the Hive metastore. It seems that Hive metastore impersonation works fine (when running Hive tasks). However, when spinning up the SparkSQL thrift server, impersonation doesn't seem to work. What settings do I need to enable impersonation? I've copied the same config as in my hive-site. Here are the impersonation flags in my launch command for the Spark thrift server:

--hiveconf hive.server2.enable.impersonation=true --hiveconf hive.server2.enable.doAs=true --hiveconf hive.metastore.execute.setugi=true

Here is my full run script:

export HIVE_SERVER2_THRIFT_BIND_HOST=0.0.0.0
export HIVE_SERVER2_THRIFT_PORT=1
export HIVE_CONF_DIR=/opt/mapr/hive/hive-0.13/conf/
export HIVE_HOME=/opt/mapr/hive/hive-0.13/
export HADOOP_HOME=/opt/mapr/hadoop/hadoop-2.5.1/
export HADOOP_CONF_DIR=/opt/mapr/hadoop/hadoop-2.5.1/etc/hadoop
export EXECUTOR_MEMORY=30g
export DRIVER_MEMORY=4g
export EXECUTOR_CORES=15
export NUM_EXECUTORS=20
export KRYO_BUFFER=512
export SPARK_DRIVER_MAXRESULTSIZE=4096
export HIVE_METASTORE_URIS=thrift://localhost:9083
export HIVE_METASTORE_WAREHOUSE_DIR=/user/hive/warehouse
export SPARK_DIST_CLASSPATH=/opt/mapr/lib/*:/opt/mapr/hadoop/hadoop-2.5.1/share/hadoop/yarn/*:/opt/mapr/hadoop/hadoop-2.5.1/share/hadoop/common/lib/*:/opt/mapr/hive/hive-current/lib/*
export SPARK_LOG_DIR=/tmp/spark-log
export SPARK_HISTORY_OPTS=-Dspark.history.fs.logDirectory=hdfs:///log/spark-events
export SPARK_CONF_DIR=/apps/spark/global-conf/
export SPARK_HOME=/apps/spark/spark-1.3.1-bin-mapr4.0.2_yarn_j6_2.10
export SPARK_LIBRARY_PATH=/opt/mapr/lib/*
export SPARK_JAVA_OPTS=-Djava.library.path=/opt/mapr/lib

$SPARK_HOME/sbin/start-thriftserver.sh --master yarn-client \
  --jars /opt/mapr/lib/libjpam.so \
  --executor-memory $EXECUTOR_MEMORY \
  --driver-memory $DRIVER_MEMORY \
  --executor-cores $EXECUTOR_CORES \
  --num-executors $NUM_EXECUTORS \
  --conf spark.scheduler.mode=FAIR \
  --conf spark.kryoserializer.buffer.mb=$KRYO_BUFFER \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.files.useFetchCache=false \
  --conf spark.driver.maxResultSize=$SPARK_DRIVER_MAXRESULTSIZE \
  --hiveconf hive.metastore.uris=$HIVE_METASTORE_URIS \
  --hiveconf hive.metastore.warehouse.dir=$HIVE_METASTORE_WAREHOUSE_DIR \
  --hiveconf hive.server2.enable.impersonation=true \
  --hiveconf hive.server2.enable.doAs=true \
  --hiveconf hive.metastore.execute.setugi=true

Cheers, N
Re: Spark - Timeout Issues - OutOfMemoryError
Hello Dean and others, thanks for your suggestions. I have two data sets, and all I want to do is a simple equi-join. I have a 10G limit, and as my dataset_1 exceeded that it was throwing an OOM error. Hence I switched back to using the .join() API instead of a map-side broadcast join. I am repartitioning the data with 100, 200, and I see a NullPointerException now. When I run against 25G of each input and with .partitionBy(new org.apache.spark.HashPartitioner(200)), I see a NullPointerException; the trace does not include a line from my code, hence I do not know what is causing the error. I do have the Kryo serializer registered:

val conf = new SparkConf()
  .setAppName(detail)
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get)
  .set("spark.kryoserializer.buffer.max.mb", arguments.get("maxbuffersize").get)
  .set("spark.driver.maxResultSize", arguments.get("maxResultSize").get)
  .set("spark.yarn.maxAppAttempts", "0")
  .registerKryoClasses(Array(classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum]))
val sc = new SparkContext(conf)

I see the exception when this task runs:

val viEvents = details.map { vi => (vi.get(14).asInstanceOf[Long], vi) }

It's a simple mapping of input records to (itemId, record). I found this http://stackoverflow.com/questions/23962796/kryo-readobject-cause-nullpointerexception-with-arraylist and http://apache-spark-user-list.1001560.n3.nabble.com/Kryo-NPE-with-Array-td19797.html. It looks like Kryo (2.21v) changed something to stop using default constructors:

((Kryo.DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy()).setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());

Please suggest. Trace:

15/05/01 03:02:15 ERROR executor.Executor: Exception in task 110.1 in stage 2.0 (TID 774)
com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
values (org.apache.avro.generic.GenericData$Record)
datum (org.apache.avro.mapred.AvroKey)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
    at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:41)
    at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)

Regards. Any suggestions? I have not been able to get this thing to work for over a month now; it's kind of getting frustrating.

On Sun, May 3, 2015 at 8:03 PM, Dean Wampler deanwamp...@gmail.com wrote: How big is the data you're returning to the driver with collectAsMap? You are probably running out of memory trying to copy too much data back to it. If you're trying to force a map-side join, Spark can do that for you in some cases within the regular DataFrame/RDD context. See http://spark.apache.org/docs/latest/sql-programming-guide.html#performance-tuning and this talk by Michael Armbrust for example, http://spark-summit.org/wp-content/uploads/2014/07/Performing-Advanced-Analytics-on-Relational-Data-with-Spark-SQL-Michael-Armbrust.pdf. dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com
Re: Spark distributed SQL: JSON Data set on all worker node
Note that each JSON object has to be on a single line in the files.

Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com

On Sun, May 3, 2015 at 4:14 AM, ayan guha guha.a...@gmail.com wrote: Yes, it is possible. You need to use the jsonFile method on the SQLContext and then create a DataFrame from the RDD. Then register it as a table. It should be about three lines of code, thanks to Spark. You may want to watch a few YouTube videos, especially on unifying pipelines. On 3 May 2015 19:02, Jai jai4l...@gmail.com wrote: Hi, I am new to Spark and related technology. I have JSON stored at the same location on all worker nodes (Spark cluster). I am looking to load this JSON data set on those nodes and run SQL queries against it, like distributed SQL. Is this possible to achieve? Right now, the master submits the task to one node only. Thanks and regards, Mrityunjay
Re: Spark - Timeout Issues - OutOfMemoryError
Hello Dean, if I don't use the Kryo serializer I get a serialization error, hence I am using it. If I don't use partitionBy/repartition then the simple join never completed, even after 7 hours; and in fact as the next step I need to run it against 250G, as that is my full dataset size. Someone here suggested I use repartition. Assuming repartition is mandatory, how do I decide what the right number is? When I am using 400 I do not get the NullPointerException that I talked about, which is strange. I never saw that exception against a small random dataset, but I see it with 25G; and again, with 400 partitions, I do not see it.

On Sun, May 3, 2015 at 9:15 PM, Dean Wampler deanwamp...@gmail.com wrote: IMHO, you are trying waaay too hard to optimize work on what is really a small data set. 25G, even 250G, is not that much data, especially if you've spent a month trying to get something to work that should be simple. All these errors are from optimization attempts. Kryo is great, but if it's not working reliably for some reason, then don't use it. Rather than force 200 partitions, let Spark try to figure out a good-enough number. (If you really need to force a partition count, use the repartition method instead, unless you're overriding the partitioner.) So, I recommend that you eliminate all the optimizations: Kryo, partitionBy, etc. Just use the simplest code you can. Make it work first. Then, if it really isn't fast enough, look for actual evidence of bottlenecks and optimize those.

Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com
Re: PySpark: slicing issue with dataframes
Friendly reminder on this one. Just wanted to get a confirmation that this is not by design before I logged a JIRA. Thanks! Ali

On Tue, Apr 28, 2015 at 9:53 AM, Ali Bajwa ali.ba...@gmail.com wrote: Hi experts, trying to use the slicing functionality in strings as part of a Spark program (PySpark), I get this error:

Code:

import pandas as pd
from pyspark.sql import SQLContext
hc = SQLContext(sc)
A = pd.DataFrame({'Firstname': ['James', 'Ali', 'Daniel'], 'Lastname': ['Jones', 'Bajwa', 'Day']})
a = hc.createDataFrame(A)
print A
b = a.select(a.Firstname[:2])
print b.toPandas()
c = a.select(a.Lastname[2:])
print c.toPandas()

Output:

  Firstname Lastname
0     James    Jones
1       Ali    Bajwa
2    Daniel      Day
  SUBSTR(Firstname, 0, 2)
0                      Ja
1                      Al
2                      Da

---------------------------------------------------------------------------
Py4JError                                 Traceback (most recent call last)
<ipython-input-17-6ee5d7d069ce> in <module>()
     10 b = a.select(a.Firstname[:2])
     11 print b.toPandas()
---> 12 c = a.select(a.Lastname[2:])
     13 print c.toPandas()

/home/jupyter/spark-1.3.1/python/pyspark/sql/dataframe.pyc in substr(self, startPos, length)
   1089             raise TypeError("Can not mix the type")
   1090         if isinstance(startPos, (int, long)):
-> 1091             jc = self._jc.substr(startPos, length)
   1092         elif isinstance(startPos, Column):
   1093             jc = self._jc.substr(startPos._jc, length._jc)

/home/jupyter/spark-1.3.1/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                 self.target_id, self.name)
    539
    540         for temp_arg in temp_args:

/home/jupyter/spark-1.3.1/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    302                 raise Py4JError(
    303                     'An error occurred while calling {0}{1}{2}. Trace:\n{3}\n'.
--> 304                     format(target_id, '.', name, value))
    305             else:
    306                 raise Py4JError(

Py4JError: An error occurred while calling o1887.substr. Trace:
py4j.Py4JException: Method substr([class java.lang.Integer, class java.lang.Long]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
    at py4j.Gateway.invoke(Gateway.java:252)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:207)
    at java.lang.Thread.run(Thread.java:745)

Looks like X[:2] works but X[2:] fails with the error above. Anyone else have this issue? Clearly I can use substr() to work around this, but if this is a confirmed bug we should open a JIRA. Thanks, Ali
How to skip corrupted avro files
Hi, I am using Spark 1.3.1 to read a directory of about 2000 avro files. The avro files are from a third party and a few of them are corrupted.

val path = "{my directory of avro files}"
val sparkConf = new SparkConf().setAppName("avroDemo").setMaster("local")
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
val data = sqlContext.avroFile(path)
data.select(...)

When I run the above code, I get the following exception:

org.apache.avro.AvroRuntimeException: java.io.IOException: Invalid sync!
    at org.apache.avro.file.DataFileStream.hasNext(DataFileStream.java:222) ~[classes/:1.7.7]
    at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:64) ~[avro-mapred-1.7.7-hadoop2.jar:1.7.7]
    at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:32) ~[avro-mapred-1.7.7-hadoop2.jar:1.7.7]
    at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:245) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:212) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library.jar:na]
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library.jar:na]
    at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$6.apply(Aggregate.scala:129) ~[spark-sql_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$6.apply(Aggregate.scala:126) ~[spark-sql_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.scheduler.Task.run(Task.scala:64) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
Caused by: java.io.IOException: Invalid sync!
    at org.apache.avro.file.DataFileStream.nextRawBlock(DataFileStream.java:314) ~[classes/:1.7.7]
    at org.apache.avro.file.DataFileStream.hasNext(DataFileStream.java:209) ~[classes/:1.7.7]
    ... 25 common frames omitted

Is there an easy way to skip a corrupted avro file without reading the files one by one using sqlContext.avroFile(file)?
At present, my solution (hack) is to have my own version of org.apache.avro.file.DataFileStream whose hasNext method returns false (to signal the end of the file) when "java.io.IOException: Invalid sync!" is thrown. Please see line 210 in https://github.com/apache/avro/blob/branch-1.7/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java Thanks in advance for any assistance! Shing
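Short of patching DataFileStream, one alternative is to pre-validate each file with plain Avro and only hand the readable ones to spark-avro. A hedged sketch, assuming the ~2000 files are reachable from the driver (scanning every record is slow, but it runs once):

import java.io.File
import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.GenericDatumReader

// Returns false for files that throw (e.g. "Invalid sync!") anywhere in the stream.
def isReadable(f: File): Boolean =
  try {
    val reader = DataFileReader.openReader(f, new GenericDatumReader[AnyRef]())
    try { while (reader.hasNext) reader.next() } finally { reader.close() }
    true
  } catch { case _: Exception => false }

val good = new File(path).listFiles.filter(_.getName.endsWith(".avro")).filter(isReadable)
val data = good.map(f => sqlContext.avroFile(f.getPath)).reduce(_ unionAll _)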