Re: Spark distributed SQL: JSON Data set on all worker node

2015-05-03 Thread ayan guha
Yes, it is possible. You need to use the jsonFile method on the SQL context and
then create a DataFrame from the RDD. Then register it as a table. It should be
about three lines of code, thanks to Spark.

You may also want to watch a few YouTube videos, especially the ones on
unifying pipelines.
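
For readers following along, a minimal sketch of those three lines against the
Spark 1.3 API (the path, table name, and query are made-up placeholders; as Ted
notes further down in this digest, jsonFile already returns a DataFrame):

  // jsonFile reads newline-delimited JSON from a path visible to all workers
  val df = sqlContext.jsonFile("hdfs:///data/events.json")
  df.registerTempTable("events")
  val adults = sqlContext.sql("SELECT name, age FROM events WHERE age > 21")
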
On 3 May 2015 19:02, Jai jai4l...@gmail.com wrote:

 Hi,

 I am new to Spark and related technology.

 I have JSON stored at the same location on all worker nodes of the Spark
 cluster. I am looking to load this JSON data set on these nodes and run SQL
 queries against it, like distributed SQL.

 Is it possible to achieve this?

 Right now, the master submits the task to one node only.

 Thanks and regards
 Mrityunjay



Hardware requirements

2015-05-03 Thread sherine ahmed
I need to use Spark to load 500 GB of data from Hadoop on a standalone-mode
cluster. What are the minimum hardware requirements, given that it will be used
for advanced analysis (social network analysis)?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Hardware-requirements-tp22744.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Spark distributed SQL: JSON Data set on all worker node

2015-05-03 Thread Jai
Hi,

I am new to Spark and related technology.

I have JSON stored at the same location on all worker nodes of the Spark
cluster. I am looking to load this JSON data set on these nodes and run SQL
queries against it, like distributed SQL.

Is it possible to achieve this?

Right now, the master submits the task to one node only.

Thanks and regards
Mrityunjay


Re: Spark - Timeout Issues - OutOfMemoryError

2015-05-03 Thread Dean Wampler
I don't know the full context of what you're doing, but serialization
errors usually mean you're attempting to serialize something that can't be
serialized, like the SparkContext. Kryo won't help there.

The arguments to spark-submit you posted previously look good:

2)  --num-executors 96 --driver-memory 12g --driver-java-options
-XX:MaxPermSize=10G --executor-memory 12g --executor-cores 4

I suspect you aren't getting the parallelism you need. For partitioning, if
your data is in HDFS and your block size is 128MB, then you'll get ~195
partitions anyway. If it takes 7 hours to do a join over 25GB of data, you
have some other serious bottleneck. You should examine the web console and
the logs to determine where all the time is going. Questions you might
pursue:

   - How long does each task take to complete?
   - How many of those 195 partitions/tasks are processed at the same time?
   That is, how many slots are available?  Maybe you need more nodes if the
   number of slots is too low. Based on your command arguments, you should be
   able to process 1/2 of them at a time, unless the cluster is busy.
   - Is the cluster swamped with other work?
   - How much data does each task process? Is the data roughly the same
   from one task to the next? If not, you might have serious key skew.

You may also need to research the details of how joins are implemented and
some of the common tricks for organizing data to minimize having to shuffle
all N by M records.
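
One such trick, sketched here purely as an illustration (leftRdd, rightRdd and
the partition count are hypothetical): co-partition and persist both pair RDDs
with the same partitioner before joining, so the join can reuse that
partitioning instead of shuffling both sides again.

  import org.apache.spark.HashPartitioner

  val part   = new HashPartitioner(200)            // partition count is illustrative
  val left   = leftRdd.partitionBy(part).persist() // (K, V) pairs
  val right  = rightRdd.partitionBy(part).persist()
  val joined = left.join(right)                    // co-partitioned, so no extra shuffle of either side
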



Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Sun, May 3, 2015 at 11:02 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 Hello Dean,
 If I don't use the Kryo serializer I get a serialization error, hence I am
 using it.
 If I don't use partitionBy/repartition then the simple join never completed
 even after 7 hours, and in fact as the next step I need to run it against
 250G, as that is my full dataset size. Someone here suggested that I use
 repartition.

 Assuming repartition is mandatory, how do I decide what the right number is?
 When I use 400 I do not get the NullPointerException that I talked about,
 which is strange. I never saw that exception against a small random dataset,
 but I see it with 25G; and again, with 400 partitions, I do not see it.


 On Sun, May 3, 2015 at 9:15 PM, Dean Wampler deanwamp...@gmail.com
 wrote:

 IMHO, you are trying way too hard to optimize work on what is really a
 small data set. 25G, even 250G, is not that much data, especially if you've
 spent a month trying to get something to work that should be simple. All
 these errors are from optimization attempts.

 Kryo is great, but if it's not working reliably for some reason, then
 don't use it. Rather than force 200 partitions, let Spark try to figure out
 a good-enough number. (If you really need to force a partition count, use
 the repartition method instead, unless you're overriding the partitioner.)

 So. I recommend that you eliminate all the optimizations: Kryo,
 partitionBy, etc. Just use the simplest code you can. Make it work first.
 Then, if it really isn't fast enough, look for actual evidence of
 bottlenecks and optimize those.



 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Sun, May 3, 2015 at 10:22 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 Hello Dean & Others,
 Thanks for your suggestions.
 I have two data sets and all I want to do is a simple equi-join. I have a
 10G limit, and as my dataset_1 exceeded that it was throwing an OOM error.
 Hence I switched back to the .join() API instead of a map-side broadcast
 join.
 I am repartitioning the data with 100, 200 partitions and I see a
 NullPointerException now.

 When I run against 25G of each input and with .partitionBy(new
 org.apache.spark.HashPartitioner(200)), I see a NullPointerException.


 The trace does not include a line from my code, hence I do not know what is
 causing the error.
 I do have the Kryo serializer registered.

 val conf = new SparkConf()
   .setAppName("detail")
   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
   .set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get)
   .set("spark.kryoserializer.buffer.max.mb", arguments.get("maxbuffersize").get)
   .set("spark.driver.maxResultSize", arguments.get("maxResultSize").get)
   .set("spark.yarn.maxAppAttempts", "0")
   .registerKryoClasses(Array(
     classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum]))
 val sc = new SparkContext(conf)

 I see the exception when this task runs:

 val viEvents = details.map { vi => (vi.get(14).asInstanceOf[Long], vi) }

 It is a simple mapping of input records to (itemId, record).

 I found this

 

Long GC pauses with Spark SQL 1.3.0 and billion row tables

2015-05-03 Thread Nick Travers
I'm currently trying to join two large tables (on the order of 1B rows each)
using Spark SQL (1.3.0) and am running into long GC pauses which bring the job
to a halt.

I'm reading in both tables using a HiveContext, with the underlying files
stored as Parquet files. I'm using something along the lines of
HiveContext.sql("SELECT a.col1, b.col2 FROM a JOIN b ON a.col1 = b.col1") to
set up the join.

When I execute this (with an action such as .count) I see the first few
stages complete, but the job eventually stalls. The GC counts keep
increasing for each executor.

Running with 6 workers, each with 2T disk and 100GB RAM.

Has anyone else run into this issue? I'm thinking I might be running into
issues with the shuffling of the data, but I'm unsure of how to get around
this. Is there a way to redistribute the rows based on the join key first,
and then do the join?
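
Not from the original thread, but one knob worth checking for a join of this
size: Spark SQL 1.3 uses spark.sql.shuffle.partitions (200 by default) for the
shuffle behind the join, which is often far too few partitions for billions of
rows. A minimal sketch, with an illustrative value and the same placeholder
table/column names as above:

  val hc = new org.apache.spark.sql.hive.HiveContext(sc)
  hc.setConf("spark.sql.shuffle.partitions", "2000")   // default is 200
  val joined = hc.sql("SELECT a.col1, b.col2 FROM a JOIN b ON a.col1 = b.col1")
  joined.count()   // the action that triggers the shuffle stages
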

Thanks in advance.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Long-GC-pauses-with-Spark-SQL-1-3-0-and-billion-row-tables-tp22750.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Drop a column from the DataFrame.

2015-05-03 Thread Olivier Girardot
great thx

On Sat, 2 May 2015 at 23:58, Ted Yu yuzhih...@gmail.com wrote:

 This is coming in 1.4.0
 https://issues.apache.org/jira/browse/SPARK-7280



 On May 2, 2015, at 2:27 PM, Olivier Girardot ssab...@gmail.com wrote:

 Sounds like a patch for a drop method...

 On Sat, 2 May 2015 at 21:03, dsgriffin dsgrif...@gmail.com wrote:

 Just use select() to create a new DataFrame with only the columns you
 want.
 Sort of the opposite of what you want -- but you can select all of the
 columns minus the one you don't want. You could even use a filter to
 remove just that one column on the fly:

 myDF.select(myDF.columns.filter(_ != "column_you_do_not_want")
   .map(colname => new Column(colname)).toList : _*)



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Drop-a-column-from-the-DataFrame-tp22711p22737.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: Questions about Accumulators

2015-05-03 Thread Eugen Cepoi
Yes that's it. If a partition is lost, to recompute it, some steps will
need to be re-executed. Perhaps the map function in which you update the
accumulator.

I think you can do it more safely in a transformation near the action,
where it is less likely that an error will occur (not always true...). You
can also checkpoint the RDD after the step that updates the accumulator, so
your transformation doesn't get applied again if some task fails. But this
is kind of expensive considering you only want to update some counter...

Another idea could be to implement a custom accumulator that holds a map of
partition index -> value and then merge the values in the map on the driver
side, but I never tried this, so I am not sure whether it would really work.
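
For what it's worth, here is an untested sketch of that idea against the Spark
1.x accumulator API (the RDD name and the choice of a per-partition record
count are illustrative, not from the thread):

  import org.apache.spark.{AccumulableParam, TaskContext}

  // Value: partition index -> record count. Within a task we add 1 per record;
  // on the driver we keep the max per partition index, so a re-executed task
  // replaces its own entry instead of doubling it (assuming every attempt of a
  // partition sees the same records).
  object PerPartitionCounts extends AccumulableParam[Map[Int, Long], Int] {
    def zero(init: Map[Int, Long]): Map[Int, Long] = Map.empty
    def addAccumulator(acc: Map[Int, Long], partition: Int): Map[Int, Long] =
      acc.updated(partition, acc.getOrElse(partition, 0L) + 1L)
    def addInPlace(a: Map[Int, Long], b: Map[Int, Long]): Map[Int, Long] =
      (a.keySet ++ b.keySet).map { i =>
        i -> math.max(a.getOrElse(i, 0L), b.getOrElse(i, 0L))
      }.toMap
  }

  val counts = sc.accumulable(Map.empty[Int, Long])(PerPartitionCounts)
  val mapped = rdd.map { x => counts += TaskContext.get().partitionId(); x }
  mapped.count()
  // counts.value: Map(partitionIndex -> records seen), tolerant of task retries
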

Cheers,
Eugen

2015-05-03 14:08 GMT+02:00 xiazhuchang hk8...@163.com:

 The official document said: "In transformations, users should be aware of
 that each task’s update may be applied more than once if tasks or job stages
 are re-executed."
 I don't quite understand what this means. Does it mean that if I use the
 accumulator in a transformation (i.e. a map() operation), this operation will
 be executed more than once if the task restarts? And then the final result
 will be many times the real result?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Questions-about-Accumulators-tp22746.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: Questions about Accumulators

2015-05-03 Thread Dean Wampler
Yes, correct.

However, note that when an accumulator operation is *idempotent*, meaning
that repeated application for the same data behaves exactly like one
application, then that accumulator can be safely called in transformation
steps (non-actions), too.

For example, max and min tracking. Just last week I wrote one that used a
hash map to track the latest timestamps seen for specific keys.
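
A sketch of that kind of idempotent accumulator (not Dean's actual code; the
RDD `events` of (key, timestamp) pairs is hypothetical). Because max is
idempotent, applying the same update twice yields the same value, so updating
it from inside a transformation is safe:

  import org.apache.spark.AccumulatorParam

  object LatestTimestampParam extends AccumulatorParam[Map[String, Long]] {
    def zero(init: Map[String, Long]): Map[String, Long] = Map.empty
    def addInPlace(a: Map[String, Long], b: Map[String, Long]): Map[String, Long] =
      (a.keySet ++ b.keySet).map { k =>
        k -> math.max(a.getOrElse(k, Long.MinValue), b.getOrElse(k, Long.MinValue))
      }.toMap
  }

  val latest = sc.accumulator(Map.empty[String, Long])(LatestTimestampParam)
  val tagged = events.map { case (key, ts) => latest += Map(key -> ts); (key, ts) }
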

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Sun, May 3, 2015 at 8:07 AM, xiazhuchang hk8...@163.com wrote:

 “For accumulator updates performed inside actions only, Spark guarantees that
 each task’s update to the accumulator will only be applied once, i.e.
 restarted tasks will not update the value. In transformations, users should
 be aware of that each task’s update may be applied more than once if tasks
 or job stages are re-executed.”
 Does this mean the guarantee (that the accumulator is only updated once)
 holds only in actions? That is to say, should one use the accumulator only in
 actions, or else there may be errors (updates applied more than once) if it
 is used in transformations?
 e.g. map(x => accumulator += x)
 After execution, the correct result of the accumulator should be 1.
 Unfortunately, if some error happens and the task restarts, the map()
 operation is re-executed (map(x => accumulator += x) re-executed), and then
 the final result of the accumulator will be 2, twice the correct result?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Questions-about-Accumulators-tp22746p22747.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: Spark - Timeout Issues - OutOfMemoryError

2015-05-03 Thread Dean Wampler
How big is the data you're returning to the driver with collectAsMap? You
are probably running out of memory trying to copy too much data back to it.

If you're trying to force a map-side join, Spark can do that for you in
some cases within the regular DataFrame/RDD context. See
http://spark.apache.org/docs/latest/sql-programming-guide.html#performance-tuning
and this talk by Michael Armbrust for example,
http://spark-summit.org/wp-content/uploads/2014/07/Performing-Advanced-Analytics-on-Relational-Data-with-Spark-SQL-Michael-Armbrust.pdf.
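
For reference, a hand-rolled sketch of the map-side join pattern being
discussed (smallRdd/bigRdd are placeholders; this assumes the small side
genuinely fits in memory on the driver and executors):

  val smallMap = sc.broadcast(smallRdd.collectAsMap())   // collect only the small side
  val joined = bigRdd.mapPartitions { iter =>
    val lookup = smallMap.value
    // inner-join semantics: drop keys missing from the broadcast map
    iter.flatMap { case (k, v) => lookup.get(k).map(small => (k, (v, small))) }
  }
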


dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Thu, Apr 30, 2015 at 12:40 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 Full Exception
 *15/04/30 09:59:49 INFO scheduler.DAGScheduler: Stage 1 (collectAsMap at
 VISummaryDataProvider.scala:37) failed in 884.087 s*
 *15/04/30 09:59:49 INFO scheduler.DAGScheduler: Job 0 failed: collectAsMap
 at VISummaryDataProvider.scala:37, took 1093.418249 s*
 15/04/30 09:59:49 ERROR yarn.ApplicationMaster: User class threw
 exception: Job aborted due to stage failure: Exception while getting task
 result: org.apache.spark.SparkException: Error sending message [message =
 GetLocations(taskresult_112)]
 org.apache.spark.SparkException: Job aborted due to stage failure:
 Exception while getting task result: org.apache.spark.SparkException: Error
 sending message [message = GetLocations(taskresult_112)]
 at org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
 at scala.Option.foreach(Option.scala:236)
 at
 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
 at
 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
 at
 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
 at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
 15/04/30 09:59:49 INFO yarn.ApplicationMaster: Final app status: FAILED,
 exitCode: 15, (reason: User class threw exception: Job aborted due to stage
 failure: Exception while getting task result:
 org.apache.spark.SparkException: Error sending message [message =
 GetLocations(taskresult_112)])


 *Code at line 37*

 val lstgItemMap = listings.map { lstg => (lstg.getItemId().toLong, lstg) }
   .collectAsMap

 The listing data set size is 26G (10 files) and my driver memory is 12G (I
 can't go beyond it). The reason I do collectAsMap is to broadcast it and do a
 map-side join instead of a regular join.


 Please suggest ?


 On Thu, Apr 30, 2015 at 10:52 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 My Spark job is failing and I see

 ==

 15/04/30 09:59:49 ERROR yarn.ApplicationMaster: User class threw
 exception: Job aborted due to stage failure: Exception while getting task
 result: org.apache.spark.SparkException: Error sending message [message =
 GetLocations(taskresult_112)]

 org.apache.spark.SparkException: Job aborted due to stage failure:
 Exception while getting task result: org.apache.spark.SparkException: Error
 sending message [message = GetLocations(taskresult_112)]

 at org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)

 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)

 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)

 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)

 at
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)

 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)

 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)

 at scala.Option.foreach(Option.scala:236)

 at
 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)


 java.util.concurrent.TimeoutException: Futures timed out after [30
 seconds]


 I see 

Re: Spark - Timeout Issues - OutOfMemoryError

2015-05-03 Thread Dean Wampler
IMHO, you are trying way too hard to optimize work on what is really a
small data set. 25G, even 250G, is not that much data, especially if you've
spent a month trying to get something to work that should be simple. All
these errors are from optimization attempts.

Kryo is great, but if it's not working reliably for some reason, then don't
use it. Rather than force 200 partitions, let Spark try to figure out a
good-enough number. (If you really need to force a partition count, use the
repartition method instead, unless you're overriding the partitioner.)

So. I recommend that you eliminate all the optimizations: Kryo,
partitionBy, etc. Just use the simplest code you can. Make it work first.
Then, if it really isn't fast enough, look for actual evidence of
bottlenecks and optimize those.
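
Put concretely, a "simplest thing that could work" sketch of the equi-join
under discussion (identifiers like details, listings and the field accessors
are copied from elsewhere in this thread; no Kryo, no partitionBy, no forced
partition count):

  val viEvents   = details.map(vi => (vi.get(14).asInstanceOf[Long], vi))
  val lstgByItem = listings.map(lstg => (lstg.getItemId().toLong, lstg))
  val joined     = viEvents.join(lstgByItem)   // plain shuffle join; let Spark pick defaults
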



Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Sun, May 3, 2015 at 10:22 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 Hello Dean & Others,
 Thanks for your suggestions.
 I have two data sets and all I want to do is a simple equi-join. I have a
 10G limit, and as my dataset_1 exceeded that it was throwing an OOM error.
 Hence I switched back to the .join() API instead of a map-side broadcast
 join.
 I am repartitioning the data with 100, 200 partitions and I see a
 NullPointerException now.

 When I run against 25G of each input and with .partitionBy(new
 org.apache.spark.HashPartitioner(200)), I see a NullPointerException.

 The trace does not include a line from my code, hence I do not know what is
 causing the error.
 I do have the Kryo serializer registered.

 val conf = new SparkConf()
   .setAppName("detail")
   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
   .set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get)
   .set("spark.kryoserializer.buffer.max.mb", arguments.get("maxbuffersize").get)
   .set("spark.driver.maxResultSize", arguments.get("maxResultSize").get)
   .set("spark.yarn.maxAppAttempts", "0")
   .registerKryoClasses(Array(
     classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum]))
 val sc = new SparkContext(conf)

 I see the exception when this task runs:

 val viEvents = details.map { vi => (vi.get(14).asInstanceOf[Long], vi) }

 It is a simple mapping of input records to (itemId, record).

 I found this

 http://stackoverflow.com/questions/23962796/kryo-readobject-cause-nullpointerexception-with-arraylist
 and

 http://apache-spark-user-list.1001560.n3.nabble.com/Kryo-NPE-with-Array-td19797.html

 Looks like Kryo (v2.21) changed something to stop using default
 constructors.

 ((Kryo.DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy())
   .setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());


 Please suggest


 Trace:
 15/05/01 03:02:15 ERROR executor.Executor: Exception in task 110.1 in
 stage 2.0 (TID 774)
 com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
 Serialization trace:
 values (org.apache.avro.generic.GenericData$Record)
 datum (org.apache.avro.mapred.AvroKey)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
 at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
 at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
 at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:41)
 at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
 at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
 Regards,


 Any suggestions?
 I have not been able to get this thing to work for over a month now; it's
 kind of getting frustrating.

 On Sun, May 3, 2015 at 8:03 PM, Dean Wampler deanwamp...@gmail.com
 wrote:

 How big is the data you're returning to the driver with collectAsMap? You
 are probably running out of memory trying to copy too much data back to it.

 If you're trying to force a map-side join, Spark can do that for you in
 some cases within the regular DataFrame/RDD context. See
 http://spark.apache.org/docs/latest/sql-programming-guide.html#performance-tuning
 and this talk by Michael Armbrust for example,
 http://spark-summit.org/wp-content/uploads/2014/07/Performing-Advanced-Analytics-on-Relational-Data-with-Spark-SQL-Michael-Armbrust.pdf.


 dean

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Thu, Apr 30, 2015 at 12:40 PM, ÐΞ€ρ@Ҝ 

Re: Selecting download for 'hadoop 2.4 and later

2015-05-03 Thread Sean Owen
See https://issues.apache.org/jira/browse/SPARK-5492 but I think
you'll need to share the stack trace as I'm not sure how this can
happen since the NoSuchMethodError (not NoSuchMethodException)
indicates a call in the bytecode failed to link but there is only a
call by reflection.

On Fri, May 1, 2015 at 9:30 PM, Stephen Boesch java...@gmail.com wrote:

 What is the correct procedure for downloading a Spark 1.2.x release for use
 with Hadoop 2.4?

 The existing download page has a link for Hadoop 2.4+. However, when using
 that with Hadoop 2.4, a NoSuchMethodError is thrown for
 Statistics.getThreadStatistics.

 Upon brief investigation: the SparkHadoopUtil class invokes the Hadoop
 FileSystem.Statistics.getThreadStatistics, which only exists in Hadoop 2.5+.






Questions about Accumulators

2015-05-03 Thread xiazhuchang
The official document said: "In transformations, users should be aware of
that each task’s update may be applied more than once if tasks or job stages
are re-executed."
I don't quite understand what this means. Does it mean that if I use the
accumulator in a transformation (i.e. a map() operation), this operation will
be executed more than once if the task restarts? And then the final result
will be many times the real result?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Questions-about-Accumulators-tp22746.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Questions about Accumulators

2015-05-03 Thread Ignacio Blasco
Given the lazy nature of an RDD, if you update an accumulator inside a map()
and then call both count and saveAsTextFile on that RDD, the accumulator will
be updated twice. IMHO, accumulators are a bit nondeterministic; you need to be
sure when to read them and to avoid unexpected re-executions.
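
A small sketch of that scenario and the usual mitigation: cache the RDD before
running the two actions, so the map is not recomputed. (The RDD, counter, and
output path are made up; caching makes recomputation unlikely but is not an
absolute guarantee, since cached blocks can be evicted.)

  val acc = sc.accumulator(0L)
  val mapped = rdd.map { x => acc += 1L; x }.cache()   // without cache(), the second action re-runs the map
  mapped.count()                        // acc updated once per record here
  mapped.saveAsTextFile("/tmp/out")     // served from cache, so acc is not incremented again
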
On 3 May 2015 at 2:09 p.m., xiazhuchang hk8...@163.com wrote:

 The official document said: "In transformations, users should be aware of
 that each task’s update may be applied more than once if tasks or job stages
 are re-executed."
 I don't quite understand what this means. Does it mean that if I use the
 accumulator in a transformation (i.e. a map() operation), this operation will
 be executed more than once if the task restarts? And then the final result
 will be many times the real result?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Questions-about-Accumulators-tp22746.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: Spark distributed SQL: JSON Data set on all worker node

2015-05-03 Thread Ted Yu
Looking at SQLContext.scala (in master branch), jsonFile() returns
DataFrame directly:
  def jsonFile(path: String, samplingRatio: Double): DataFrame =

FYI

On Sun, May 3, 2015 at 2:14 AM, ayan guha guha.a...@gmail.com wrote:

 Yes, it is possible. You need to use the jsonFile method on the SQL context
 and then create a DataFrame from the RDD. Then register it as a table. It
 should be about three lines of code, thanks to Spark.

 You may also want to watch a few YouTube videos, especially the ones on
 unifying pipelines.
 On 3 May 2015 19:02, Jai jai4l...@gmail.com wrote:

 Hi,

 I am new to Spark and related technology.

 I have JSON stored at the same location on all worker nodes of the Spark
 cluster. I am looking to load this JSON data set on these nodes and run SQL
 queries against it, like distributed SQL.

 Is it possible to achieve this?

 Right now, the master submits the task to one node only.

 Thanks and regards
 Mrityunjay




PriviledgedActionException- Executor error

2015-05-03 Thread podioss
Hi,
I am running several jobs in standalone mode and I notice this error in the
log files on some of my nodes at the start of my jobs:

INFO executor.CoarseGrainedExecutorBackend: Registered signal handlers for
[TERM, HUP, INT]
INFO spark.SecurityManager: Changing view acls to: root
INFO spark.SecurityManager: Changing modify acls to: root
INFO spark.SecurityManager: SecurityManager: authentication disabled; ui
acls disabled; users with view permissions:
INFO slf4j.Slf4jLogger: Slf4jLogger started
INFO Remoting: Starting remoting
ERROR security.UserGroupInformation: PriviledgedActionException as:root
cause:java.util.concurrent.TimeoutException: Futures timed out after [1
milliseconds]
Exception in thread main java.lang.reflect.UndeclaredThrowableException:
Unknown exception in doAs
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1134)
at
org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:59)
at
org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:115)
at
org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:163)
at
org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
Caused by: java.security.PrivilegedActionException:
java.util.concurrent.TimeoutException: Futures timed out after [1
milliseconds]
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
... 4 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after
[1 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at 
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at akka.remote.Remoting.start(Remoting.scala:180)
at
akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)
at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:618)
at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:615)
at akka.actor.ActorSystemImpl._start(ActorSystem.scala:615)
at akka.actor.ActorSystemImpl.start(ActorSystem.scala:632)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:141)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:118)
at
org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121)
at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)
at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)
at
org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1676)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1667)
at 
org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:56)
at
org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:122)
at
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:60)
at
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:59)
... 7 more
INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote
daemon.

These errors result in executor losses at the beginning, and I have been
trying to find a way to solve this with no success, so if anyone has a clue
please let me know.

Thank you   



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/PriviledgedActionException-Executor-error-tp22745.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Questions about Accumulators

2015-05-03 Thread xiazhuchang
“For accumulator updates performed inside actions only, Spark guarantees that
each task’s update to the accumulator will only be applied once, i.e.
restarted tasks will not update the value. In transformations, users should
be aware of that each task’s update may be applied more than once if tasks
or job stages are re-executed.”
Does this mean the guarantee (that the accumulator is only updated once)
holds only in actions? That is to say, should one use the accumulator only in
actions, or else there may be errors (updates applied more than once) if it
is used in transformations?
e.g. map(x => accumulator += x)
After execution, the correct result of the accumulator should be 1.
Unfortunately, if some error happens and the task restarts, the map()
operation is re-executed (map(x => accumulator += x) re-executed), and then
the final result of the accumulator will be 2, twice the correct result?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Questions-about-Accumulators-tp22746p22747.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Spark SQL ThriftServer Impersonation Support

2015-05-03 Thread Night Wolf
Thanks Andrew. What version of HS2 is the SparkSQL Thrift server using?
What would be involved in updating? Is it a simple case of bumping the
dependency version in one of the project POMs?

Cheers,
~N

On Sat, May 2, 2015 at 11:38 AM, Andrew Lee alee...@hotmail.com wrote:

 Hi N,

 See: https://issues.apache.org/jira/browse/SPARK-5159

 I don't think it is yet supported until the HS2 code base is updated in
 Spark hive-thriftserver project.

 --
 Date: Fri, 1 May 2015 15:56:30 +1000
 Subject: Spark SQL ThriftServer Impersonation Support
 From: nightwolf...@gmail.com
 To: user@spark.apache.org


 Hi guys,


 Trying to use the SparkSQL Thrift server with the Hive metastore. It seems
 that Hive metastore impersonation works fine (when running Hive tasks).
 However, when spinning up the SparkSQL Thrift server, impersonation doesn't
 seem to work...

 What settings do I need to enable impersonation?

 I've copied the same config as in my hive-site. Here is my launch command
 for the spark thrift server;

 --hiveconf hive.server2.enable.impersonation=true --hiveconf
 hive.server2.enable.doAs=true --hiveconf hive.metastore.execute.setugi=true

 Here is my full run script:

 export HIVE_SERVER2_THRIFT_BIND_HOST=0.0.0.0
 export HIVE_SERVER2_THRIFT_PORT=1

 export HIVE_CONF_DIR=/opt/mapr/hive/hive-0.13/conf/
 export HIVE_HOME=/opt/mapr/hive/hive-0.13/
 export HADOOP_HOME=/opt/mapr/hadoop/hadoop-2.5.1/
 export HADOOP_CONF_DIR=/opt/mapr/hadoop/hadoop-2.5.1/etc/hadoop

 export EXECUTOR_MEMORY=30g
 export DRIVER_MEMORY=4g
 export EXECUTOR_CORES=15
 export NUM_EXECUTORS=20
 export KRYO_BUFFER=512
 export SPARK_DRIVER_MAXRESULTSIZE=4096

 export HIVE_METASTORE_URIS=thrift://localhost:9083
 export HIVE_METASTORE_WAREHOUSE_DIR=/user/hive/warehouse

 export
 SPARK_DIST_CLASSPATH=/opt/mapr/lib/*:/opt/mapr/hadoop/hadoop-2.5.1/share/hadoop/yarn/*:/opt/mapr/hadoop/hadoop-2.5.1/share/hadoop/common/lib/*:/opt/mapr/hive/hive-current/lib/*
 export SPARK_LOG_DIR=/tmp/spark-log
 export
 SPARK_HISTORY_OPTS=-Dspark.history.fs.logDirectory=hdfs:///log/spark-events
 export SPARK_CONF_DIR=/apps/spark/global-conf/

 export SPARK_HOME=/apps/spark/spark-1.3.1-bin-mapr4.0.2_yarn_j6_2.10

 export SPARK_LIBRARY_PATH=/opt/mapr/lib/*
 export SPARK_JAVA_OPTS=-Djava.library.path=/opt/mapr/lib


 $SPARK_HOME/sbin/start-thriftserver.sh --master yarn-client --jars
 /opt/mapr/lib/libjpam.so --executor-memory $EXECUTOR_MEMORY --driver-memory
 $DRIVER_MEMORY --executor-cores $EXECUTOR_CORES --num-executors
 $NUM_EXECUTORS --conf spark.scheduler.mode=FAIR --conf
 spark.kryoserializer.buffer.mb=$KRYO_BUFFER --conf
 spark.serializer=org.apache.spark.serializer.KryoSerializer --conf
 spark.files.useFetchCache=false --conf
 spark.driver.maxResultSize=$SPARK_DRIVER_MAXRESULTSIZE --hiveconf
 hive.metastore.uris=$HIVE_METASTORE_URIS --hiveconf
 hive.metastore.warehouse.dir=$HIVE_METASTORE_WAREHOUSE_DIR --hiveconf
 hive.server2.enable.impersonation=true --hiveconf
 hive.server2.enable.doAs=true --hiveconf hive.metastore.execute.setugi=true


 Cheers,
 N



Re: Spark - Timeout Issues - OutOfMemoryError

2015-05-03 Thread ๏̯͡๏
Hello Dean & Others,
Thanks for your suggestions.
I have two data sets and all I want to do is a simple equi-join. I have a 10G
limit, and as my dataset_1 exceeded that it was throwing an OOM error. Hence I
switched back to the .join() API instead of a map-side broadcast join.
I am repartitioning the data with 100, 200 partitions and I see a
NullPointerException now.

When I run against 25G of each input and with .partitionBy(new
org.apache.spark.HashPartitioner(200)), I see a NullPointerException.


The trace does not include a line from my code, hence I do not know what is
causing the error.
I do have the Kryo serializer registered.

val conf = new SparkConf()
  .setAppName("detail")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get)
  .set("spark.kryoserializer.buffer.max.mb", arguments.get("maxbuffersize").get)
  .set("spark.driver.maxResultSize", arguments.get("maxResultSize").get)
  .set("spark.yarn.maxAppAttempts", "0")
  .registerKryoClasses(Array(
    classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum]))
val sc = new SparkContext(conf)

I see the exception when this task runs:

val viEvents = details.map { vi => (vi.get(14).asInstanceOf[Long], vi) }

It is a simple mapping of input records to (itemId, record).

I found this
http://stackoverflow.com/questions/23962796/kryo-readobject-cause-nullpointerexception-with-arraylist
and
http://apache-spark-user-list.1001560.n3.nabble.com/Kryo-NPE-with-Array-td19797.html

Looks like Kryo (v2.21) changed something to stop using default
constructors.

((Kryo.DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy())
  .setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
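
If dropping Kryo is not an option, one commonly suggested workaround for the
no-default-constructor issue is to install Objenesis's StdInstantiatorStrategy
through a custom KryoRegistrator and point spark.kryo.registrator at it. This
is only a sketch; whether it actually helps with these Avro-generated classes
is untested:

  import com.esotericsoftware.kryo.Kryo
  import org.apache.spark.serializer.KryoRegistrator
  import org.objenesis.strategy.StdInstantiatorStrategy

  class AvroFriendlyRegistrator extends KryoRegistrator {
    override def registerClasses(kryo: Kryo): Unit = {
      // Let Kryo create instances of classes that lack a no-arg constructor
      kryo.setInstantiatorStrategy(new StdInstantiatorStrategy())
      kryo.register(classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum])
    }
  }
  // and in SparkConf:
  //   .set("spark.kryo.registrator", classOf[AvroFriendlyRegistrator].getName)
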


Please suggest


Trace:
15/05/01 03:02:15 ERROR executor.Executor: Exception in task 110.1 in stage
2.0 (TID 774)
com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
values (org.apache.avro.generic.GenericData$Record)
datum (org.apache.avro.mapred.AvroKey)
at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:41)
at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
Regards,


Any suggestions?
I have not been able to get this thing to work for over a month now; it's
kind of getting frustrating.

On Sun, May 3, 2015 at 8:03 PM, Dean Wampler deanwamp...@gmail.com wrote:

 How big is the data you're returning to the driver with collectAsMap? You
 are probably running out of memory trying to copy too much data back to it.

 If you're trying to force a map-side join, Spark can do that for you in
 some cases within the regular DataFrame/RDD context. See
 http://spark.apache.org/docs/latest/sql-programming-guide.html#performance-tuning
 and this talk by Michael Armbrust for example,
 http://spark-summit.org/wp-content/uploads/2014/07/Performing-Advanced-Analytics-on-Relational-Data-with-Spark-SQL-Michael-Armbrust.pdf.


 dean

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Thu, Apr 30, 2015 at 12:40 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 Full Exception
 *15/04/30 09:59:49 INFO scheduler.DAGScheduler: Stage 1 (collectAsMap at
 VISummaryDataProvider.scala:37) failed in 884.087 s*
 *15/04/30 09:59:49 INFO scheduler.DAGScheduler: Job 0 failed:
 collectAsMap at VISummaryDataProvider.scala:37, took 1093.418249 s*
 15/04/30 09:59:49 ERROR yarn.ApplicationMaster: User class threw
 exception: Job aborted due to stage failure: Exception while getting task
 result: org.apache.spark.SparkException: Error sending message [message =
 GetLocations(taskresult_112)]
 org.apache.spark.SparkException: Job aborted due to stage failure:
 Exception while getting task result: org.apache.spark.SparkException: Error
 sending message [message = GetLocations(taskresult_112)]
 at org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
 at
 

Re: Spark distributed SQL: JSON Data set on all worker node

2015-05-03 Thread Dean Wampler
Note that each JSON object has to be on a single line in the files.
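
i.e. the input should look like this (hypothetical fields), one complete JSON
object per line rather than a single pretty-printed array:

  {"name": "alice", "age": 30}
  {"name": "bob", "age": 25}
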

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Sun, May 3, 2015 at 4:14 AM, ayan guha guha.a...@gmail.com wrote:

 Yes, it is possible. You need to use the jsonFile method on the SQL context
 and then create a DataFrame from the RDD. Then register it as a table. It
 should be about three lines of code, thanks to Spark.

 You may also want to watch a few YouTube videos, especially the ones on
 unifying pipelines.
 On 3 May 2015 19:02, Jai jai4l...@gmail.com wrote:

 Hi,

 I am new to Spark and related technology.

 I have JSON stored at the same location on all worker nodes of the Spark
 cluster. I am looking to load this JSON data set on these nodes and run SQL
 queries against it, like distributed SQL.

 Is it possible to achieve this?

 Right now, the master submits the task to one node only.

 Thanks and regards
 Mrityunjay




Re: Spark - Timeout Issues - OutOfMemoryError

2015-05-03 Thread ๏̯͡๏
Hello Dean,
If I don't use the Kryo serializer I get a serialization error, hence I am
using it.
If I don't use partitionBy/repartition then the simple join never completed
even after 7 hours, and in fact as the next step I need to run it against 250G,
as that is my full dataset size. Someone here suggested that I use
repartition.

Assuming repartition is mandatory, how do I decide what the right number is?
When I use 400 I do not get the NullPointerException that I talked about,
which is strange. I never saw that exception against a small random dataset,
but I see it with 25G; and again, with 400 partitions, I do not see it.


On Sun, May 3, 2015 at 9:15 PM, Dean Wampler deanwamp...@gmail.com wrote:

 IMHO, you are trying way too hard to optimize work on what is really a
 small data set. 25G, even 250G, is not that much data, especially if you've
 spent a month trying to get something to work that should be simple. All
 these errors are from optimization attempts.

 Kryo is great, but if it's not working reliably for some reason, then
 don't use it. Rather than force 200 partitions, let Spark try to figure out
 a good-enough number. (If you really need to force a partition count, use
 the repartition method instead, unless you're overriding the partitioner.)

 So. I recommend that you eliminate all the optimizations: Kryo,
 partitionBy, etc. Just use the simplest code you can. Make it work first.
 Then, if it really isn't fast enough, look for actual evidence of
 bottlenecks and optimize those.



 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Sun, May 3, 2015 at 10:22 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 Hello Dean & Others,
 Thanks for your suggestions.
 I have two data sets and all I want to do is a simple equi-join. I have a
 10G limit, and as my dataset_1 exceeded that it was throwing an OOM error.
 Hence I switched back to the .join() API instead of a map-side broadcast
 join.
 I am repartitioning the data with 100, 200 partitions and I see a
 NullPointerException now.

 When I run against 25G of each input and with .partitionBy(new
 org.apache.spark.HashPartitioner(200)), I see a NullPointerException.


 The trace does not include a line from my code, hence I do not know what is
 causing the error.
 I do have the Kryo serializer registered.

 val conf = new SparkConf()
   .setAppName("detail")
   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
   .set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get)
   .set("spark.kryoserializer.buffer.max.mb", arguments.get("maxbuffersize").get)
   .set("spark.driver.maxResultSize", arguments.get("maxResultSize").get)
   .set("spark.yarn.maxAppAttempts", "0")
   .registerKryoClasses(Array(
     classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum]))
 val sc = new SparkContext(conf)

 I see the exception when this task runs:

 val viEvents = details.map { vi => (vi.get(14).asInstanceOf[Long], vi) }

 It is a simple mapping of input records to (itemId, record).

 I found this

 http://stackoverflow.com/questions/23962796/kryo-readobject-cause-nullpointerexception-with-arraylist
 and

 http://apache-spark-user-list.1001560.n3.nabble.com/Kryo-NPE-with-Array-td19797.html

 Looks like Kryo (v2.21) changed something to stop using default
 constructors.

 ((Kryo.DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy())
   .setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());


 Please suggest


 Trace:
 15/05/01 03:02:15 ERROR executor.Executor: Exception in task 110.1 in
 stage 2.0 (TID 774)
 com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
 Serialization trace:
 values (org.apache.avro.generic.GenericData$Record)
 datum (org.apache.avro.mapred.AvroKey)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
 at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
 at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
 at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:41)
 at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
 at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
 Regards,


 Any suggestions?
 I have not been able to get this thing to work for over a month now; it's
 kind of getting frustrating.

 On Sun, May 3, 2015 at 8:03 PM, Dean Wampler deanwamp...@gmail.com
 wrote:

 How big is the data you're returning to the driver with collectAsMap?
 You are probably running out of memory trying to 

Re: PySpark: slicing issue with dataframes

2015-05-03 Thread Ali Bajwa
Friendly reminder on this one. Just wanted to get confirmation that this
is not by design before I log a JIRA.

Thanks!
Ali


On Tue, Apr 28, 2015 at 9:53 AM, Ali Bajwa ali.ba...@gmail.com wrote:

 Hi experts,

 Trying to use the slicing functionality in strings as part of a Spark
 program (PySpark) I get this error:

  Code 

 import pandas as pd
 from pyspark.sql import SQLContext
 hc = SQLContext(sc)
 A = pd.DataFrame({'Firstname': ['James', 'Ali', 'Daniel'], 'Lastname':
 ['Jones', 'Bajwa', 'Day']})
 a = hc.createDataFrame(A)
 print A

 b = a.select(a.Firstname[:2])
 print b.toPandas()
 c = a.select(a.Lastname[2:])
 print c.toPandas()

 Output:

  Firstname Lastname
 0 JamesJones
 1   AliBajwa
 2Daniel  Day
   SUBSTR(Firstname, 0, 2)
 0  Ja
 1  Al
 2  Da

 ---
 Py4JError Traceback (most recent call last)
 <ipython-input-17-6ee5d7d069ce> in <module>()
      10 b = a.select(a.Firstname[:2])
      11 print b.toPandas()
 ---> 12 c = a.select(a.Lastname[2:])
  13 print c.toPandas()

 /home/jupyter/spark-1.3.1/python/pyspark/sql/dataframe.pyc in substr(self,
 startPos, length)
 1089 raise TypeError("Can not mix the type")
 1090 if isinstance(startPos, (int, long)):
 -> 1091 jc = self._jc.substr(startPos, length)
1092 elif isinstance(startPos, Column):
1093 jc = self._jc.substr(startPos._jc, length._jc)

 /home/jupyter/spark-1.3.1/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py
 in __call__(self, *args)
 536 answer = self.gateway_client.send_command(command)
 537 return_value = get_return_value(answer,
 self.gateway_client,
 -- 538 self.target_id, self.name)
 539
 540 for temp_arg in temp_args:

 /home/jupyter/spark-1.3.1/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py
 in get_return_value(answer, gateway_client, target_id, name)
 302 raise Py4JError(
 303 'An error occurred while calling {0}{1}{2}.
 Trace:\n{3}\n'.
 -- 304 format(target_id, '.', name, value))
 305 else:
 306 raise Py4JError(

 Py4JError: An error occurred while calling o1887.substr. Trace:
 py4j.Py4JException: Method substr([class java.lang.Integer, class
 java.lang.Long]) does not exist
 at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
 at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
 at py4j.Gateway.invoke(Gateway.java:252)
 at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
 at py4j.commands.CallCommand.execute(CallCommand.java:79)
 at py4j.GatewayConnection.run(GatewayConnection.java:207)
 at java.lang.Thread.run(Thread.java:745)

 Looks like X[:2] works, but X[2:] fails with the error above.
 Has anyone else had this issue?

 Clearly I can use substr() to work around this, but if this is a confirmed
 bug we should open a JIRA.

 Thanks,
 Ali



How to skip corrupted avro files

2015-05-03 Thread Shing Hing Man

Hi, I am using Spark 1.3.1 to read a directory of about 2000 Avro files. The
Avro files are from a third party and a few of them are corrupted.

  val path = {my directory of avro files}
  val sparkConf = new SparkConf().setAppName("avroDemo").setMaster("local")
  val sc = new SparkContext(sparkConf)
  val sqlContext = new SQLContext(sc)
  val data = sqlContext.avroFile(path); data.select(...)

When I run the above code, I get the following exception:
org.apache.avro.AvroRuntimeException: java.io.IOException: Invalid sync!
    at org.apache.avro.file.DataFileStream.hasNext(DataFileStream.java:222) ~[classes/:1.7.7]
    at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:64) ~[avro-mapred-1.7.7-hadoop2.jar:1.7.7]
    at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:32) ~[avro-mapred-1.7.7-hadoop2.jar:1.7.7]
    at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:245) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:212) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library.jar:na]
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library.jar:na]
    at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$6.apply(Aggregate.scala:129) ~[spark-sql_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$6.apply(Aggregate.scala:126) ~[spark-sql_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.scheduler.Task.run(Task.scala:64) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
Caused by: java.io.IOException: Invalid sync!
    at org.apache.avro.file.DataFileStream.nextRawBlock(DataFileStream.java:314) ~[classes/:1.7.7]
    at org.apache.avro.file.DataFileStream.hasNext(DataFileStream.java:209) ~[classes/:1.7.7]
    ... 25 common frames omitted
Is there an easy way to skip a corrupted Avro file without reading the files
one by one using sqlContext.avroFile(file)? At present, my solution (hack) is
to have my own version of org.apache.avro.file.DataFileStream whose hasNext
method returns false (to signal the end of the file) when java.io.IOException:
"Invalid sync!" is thrown. Please see line 210 in
https://github.com/apache/avro/blob/branch-1.7/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java

Thanks in advance for any assistance!
Shing
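
For anyone hitting the same problem, here is an untested sketch of another way
to skip bad files without patching Avro: read each file separately with the
same sqlContext.avroFile call, drop the ones that fail a probe read, and union
the rest. It only catches corruption that surfaces when the first record is
read, and listing files this way assumes a local path; for HDFS you would list
via the Hadoop FileSystem API instead.

  import scala.util.Try

  val files = new java.io.File(path).listFiles.filter(_.getName.endsWith(".avro"))
  val good = files.flatMap { f =>
    Try {
      val df = sqlContext.avroFile(f.getPath)
      df.first()   // probe read: throws "Invalid sync!" for files corrupted near the start
      df
    }.toOption
  }
  val data = good.reduce(_ unionAll _)
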