Error running sbt package on Windows 7 for Spark 1.3.1 and SimpleApp.scala

2015-06-04 Thread Joseph Washington
Hi all,
I'm trying to run the standalone application SimpleApp.scala following the
instructions on the
http://spark.apache.org/docs/latest/quick-start.html#a-standalone-app-in-scala
I was able to create a .jar file by doing sbt package. However when I tried
to do

$ YOUR_SPARK_HOME/bin/spark-submit --class SimpleApp --master local[4]
c:/myproject/target/scala-2.10/simple-project_2.10-1.0.jar

I didn't get the desired result. There is a lot of output, and in a few places
it said: ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
[image: Inline image 2]

Furthermore, trying sbt run and sbt compile from the myproject folder gives
this error:


[image: Inline image 1]

Any ideas?


SparkSQL : using Hive UDF returning Map throws error: scala.MatchError: interface java.util.Map (of class java.lang.Class) (state=,code=0)

2015-06-04 Thread ogoh

Hello,
I tested some custom UDFs on Spark SQL's ThriftServer / Beeline (Spark 1.3.1).
Some UDFs work fine (taking an array parameter and returning an int or string
type).
But my UDF returning a map type throws an error:
Error: scala.MatchError: interface java.util.Map (of class java.lang.Class)
(state=,code=0)

I converted the code into Hive's GenericUDF since I suspected that complex-type
parameters (an array of maps) and complex return types (a map) might only be
supported by Hive's GenericUDF rather than a simple UDF.
But SparkSQL doesn't seem to support GenericUDF (error message: Error:
java.lang.IllegalAccessException: Class
org.apache.spark.sql.hive.HiveFunctionWrapper can not access ..).

Below is my example udf code returning MAP type.
I appreciate any advice.
Thanks

--

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.hive.ql.exec.UDF;

public final class ArrayToMap extends UDF {

    public Map<String, String> evaluate(ArrayList<String> arrayOfString) {
        // add code to handle all index problems

        Map<String, String> map = new HashMap<String, String>();

        int count = 0;
        for (String element : arrayOfString) {
            map.put(count + "", element);
            count++;
        }
        return map;
    }
}






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-using-Hive-UDF-returning-Map-throws-rror-scala-MatchError-interface-java-util-Map-of-class--tp23164.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark terminology

2015-06-04 Thread ๏̯͡๏
I see these in my mapper-only task:

   - Input Size / Records: 68.0 GB / 577195178
   - Shuffle write: 95.1 GB / 282559291
   - Shuffle spill (memory): 2.8 TB
   - Shuffle spill (disk): 90.3 GB


I understand the first one; can someone give one- or two-line explanations for
the next three? Also, are these numbers good or bad?
-- 
Deepak


RE: SparkSQL : using Hive UDF returning Map throws error: scala.MatchError: interface java.util.Map (of class java.lang.Class) (state=,code=0)

2015-06-04 Thread Cheng, Hao
Which version of Hive jar are you using? Hive 0.13.1 or Hive 0.12.0?

-Original Message-
From: ogoh [mailto:oke...@gmail.com] 
Sent: Friday, June 5, 2015 10:10 AM
To: user@spark.apache.org
Subject: SparkSQL : using Hive UDF returning Map throws error: 
scala.MatchError: interface java.util.Map (of class java.lang.Class) 
(state=,code=0)


Hello,
I tested some custom UDFs on Spark SQL's ThriftServer / Beeline (Spark 1.3.1).
Some UDFs work fine (taking an array parameter and returning an int or string type). 
But my UDF returning a map type throws an error:
Error: scala.MatchError: interface java.util.Map (of class java.lang.Class) 
(state=,code=0)

I converted the code into Hive's GenericUDF since I suspected that complex-type 
parameters (an array of maps) and complex return types (a map) might only be supported 
by Hive's GenericUDF rather than a simple UDF.
But SparkSQL doesn't seem to support GenericUDF (error message: Error:
java.lang.IllegalAccessException: Class
org.apache.spark.sql.hive.HiveFunctionWrapper can not access ..).

Below is my example udf code returning MAP type.
I appreciate any advice.
Thanks

--

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.hive.ql.exec.UDF;

public final class ArrayToMap extends UDF {

    public Map<String, String> evaluate(ArrayList<String> arrayOfString) {
        // add code to handle all index problems

        Map<String, String> map = new HashMap<String, String>();

        int count = 0;
        for (String element : arrayOfString) {
            map.put(count + "", element);
            count++;
        }
        return map;
    }
}






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-using-Hive-UDF-returning-Map-throws-rror-scala-MatchError-interface-java-util-Map-of-class--tp23164.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: TF-IDF Question

2015-06-04 Thread Somnath Pandeya
Hi,

org.apache.spark.mllib.linalg.Vector = 
(1048576,[35587,884670],[3.458767233,3.458767233])
It is the sparse vector representation of the document's terms:
the first number (1048576) is the length (dimension) of the vector,
[35587,884670] are the indices of the terms, and
[3.458767233,3.458767233] are the tf-idf values of those terms.
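For example, here is a minimal Scala sketch (assuming the mllib HashingTF used
in the linked feature-extraction guide) of how to match a known term against
those indices:

import org.apache.spark.mllib.feature.HashingTF

// 1048576 (= 2^20) is HashingTF's default feature dimension, which is why the
// first number is always the same. Hashing is one-way, so index -> term cannot
// be recovered, but you can hash any known term and compare it to the indices.
val hashingTF = new HashingTF()              // default numFeatures = 1 << 20
val holaIndex  = hashingTF.indexOf("Hola")   // index that "Hola" hashes to
val sparkIndex = hashingTF.indexOf("spark")  // index that "spark" hashes to
// If holaIndex == 35587, then the tf-idf value at that position belongs to "Hola".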

Thanks
Somnath



From: franco barrientos [mailto:franco.barrien...@exalitica.com]
Sent: Thursday, June 04, 2015 11:17 PM
To: user@spark.apache.org
Subject: TF-IDF Question

Hi all!,

I have a .txt file where each row is a collection of terms of a
document separated by spaces. For example:

1 Hola spark
2 ..

I followed this example from the Spark site 
https://spark.apache.org/docs/latest/mllib-feature-extraction.html and I get 
something like this:

tfidf.first()
org.apache.spark.mllib.linalg.Vector = 
(1048576,[35587,884670],[3.458767233,3.458767233])

This is what I think:


  1.  I don't know what the first parameter (1048576) is, but it is always the
same number (maybe the number of terms).
  2.  I think the second parameter [35587,884670] contains the terms of the first
line in my .txt file.
  3.  I think the third parameter [3.458767233,3.458767233] contains the tf-idf
values for my terms.
Does anyone know the exact interpretation of this? And, regarding the second point,
if these values are the terms, how can I match them to the original term
values ([35587=Hola, 884670=spark])?

Regards and thanks in advance.

Franco Barrientos
Data Scientist
Málaga #115, Of. 1003, Las Condes.
Santiago, Chile.
(+562)-29699649
(+569)-76347893
franco.barrien...@exalitica.com
www.exalitica.com


FetchFailed Exception

2015-06-04 Thread ๏̯͡๏
I see the error below.

Is this a problem with my code or with the cluster? Is there any way to fix it?

FetchFailed(BlockManagerId(2, phxdpehdc9dn2441.stratus.phx.ebay.com,
59574), shuffleId=1, mapId=80, reduceId=20, message=
org.apache.spark.shuffle.FetchFailedException: Failed to connect to
phxdpehdc9dn2441.stratus.phx.ebay.com/10.115.81.47:59574
at
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
at
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
at
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
at
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160)
at
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
at
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Failed to connect to
phxdpehdc9dn2441.stratus.phx.ebay.com/10.115.81.47:59574
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
at
org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
... 3 more
Caused by: java.net.ConnectException: Connection refused:
phxdpehdc9dn2441.stratus.phx.ebay.com/10.115.81.47:59574
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
at
io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208)
at
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287)
at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
... 1 more

)

-- 
Deepak


Why the default Params.copy doesn't work for Model.copy?

2015-06-04 Thread Justin Yip
Hello,

I have a question about the Spark 1.4 ml library. In the copy function, it is
stated that the default implementation of Params doesn't work
for models. (
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/Model.scala#L49
)

As a result, some feature generation transformer like StringIndexerModel
cannot be used in Pipeline.

Maybe it is due to my limited knowledge of the ML pipeline, but can anyone give me
some hints on why Model.copy behaves differently from other Params?

Thanks!

Justin




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Why-the-default-Params-copy-doesn-t-work-for-Model-copy-tp23169.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Column operation on Spark RDDs.

2015-06-04 Thread Carter
Hi, I have an RDD with MANY columns (e.g., hundreds), and most of my operations
are on columns; e.g., I need to create many intermediate variables from
different columns. What is the most efficient way to do this?

For example, if my dataRDD[Array[String]] is like below: 

123, 523, 534, ..., 893 
536, 98, 1623, ..., 98472 
537, 89, 83640, ..., 9265 
7297, 98364, 9, ..., 735 
.. 
29, 94, 956, ..., 758 

I will need to create a new column or variable as newCol1 =
2ndCol + 19thCol, and another new column based on newCol1 and the existing
columns: newCol2 = function(newCol1, 34thCol). What is the best way of doing
this?

I have been thinking using index for the intermediate variables and the
dataRDD, and then join them together on the index to do my calculation:
var dataRDD = sc.textFile("/test.csv").map(_.split(","))
val dt = dataRDD.zipWithIndex.map(_.swap)
val newCol1 = dataRDD.map(x => x(1) + x(18)).zipWithIndex.map(_.swap)
val newCol2 = newCol1.join(dt).map(x => function(...))

Is there a better way of doing this?

Thank you very much!
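For illustration, here is a rough single-pass sketch: since newCol1 and newCol2
depend only on values within the same row, the zipWithIndex/join round-trip can
be avoided by deriving both inside one map. The function f below is a
hypothetical stand-in for function(...), the columns are assumed numeric, and
sc is the usual SparkContext.

// Hypothetical stand-in for function(newCol1, 34thCol).
def f(a: Double, b: Double): Double = a + b

val dataRDD = sc.textFile("/test.csv").map(_.split(","))

// Derive both new columns row by row in one pass; no join or index needed.
val withNewCols = dataRDD.map { row =>
  val newCol1 = row(1).toDouble + row(18).toDouble   // 2ndCol + 19thCol
  val newCol2 = f(newCol1, row(33).toDouble)         // function(newCol1, 34thCol)
  (row, newCol1, newCol2)
}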












--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Column-operation-on-Spark-RDDs-tp23165.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Deduping events using Spark

2015-06-04 Thread William Briggs
Hi Lee,

You should be able to create a PairRDD using the Nonce as the key, and the
AnalyticsEvent as the value. I'm very new to Spark, but here is some
uncompilable pseudo code that may or may not help:

events.map(event => (event.getNonce, event)).reduceByKey((a, b) =>
a).map(_._2)

The above code is more Scala-like, since that's the syntax with which I
have more familiarity - it looks like the Spark Java 8 API is similar, but
you won't get an implicit conversion to a PairRDD when you use a 2-tuple as
the mapped value. Instead, you will need to use the mapToPair function -
there's a good example in the Spark Programming Guide under "Working With
Key-Value Pairs":
https://spark.apache.org/docs/latest/programming-guide.html#working-with-key-value-pairs
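For reference, a self-contained Scala sketch of the same idea, using a
simplified stand-in for AnalyticsEvent (the real class is Avro-generated and
exposes getNonce()):

// Simplified stand-in for the Avro-generated AnalyticsEvent.
case class AnalyticsEvent(nonce: String, payload: String)

val events = sc.parallelize(Seq(
  AnalyticsEvent("n1", "first"),
  AnalyticsEvent("n1", "duplicate"),
  AnalyticsEvent("n2", "second")))

val deduped = events
  .map(event => (event.nonce, event))   // key each event by its nonce
  .reduceByKey((a, b) => a)             // keep an arbitrary event per nonce
  .values                               // drop the key, back to events

// deduped.count() == 2: one event for "n1" and one for "n2"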

Hope this helps!

Regards,
Will

On Thu, Jun 4, 2015 at 1:10 PM, lbierman leebier...@gmail.com wrote:

 I'm still a bit new to Spark and am struggling to figure out the best way
 to dedupe my events.

 I load my Avro files from HDFS and then I want to dedupe events that have
 the same nonce.

 For example my code so far:

 JavaRDD<AnalyticsEvent> events = ((JavaRDD<AvroKey<AnalyticsEvent>>)
     context.newAPIHadoopRDD(
         context.hadoopConfiguration(),
         AvroKeyInputFormat.class,
         AvroKey.class,
         NullWritable.class
     ).keys())
     .map(event -> AnalyticsEvent.newBuilder(event.datum()).build())
     .filter(key -> { return
         Optional.ofNullable(key.getStepEventKey()).isPresent(); });

 Now I want to get back an RDD of AnalyticsEvents that are unique. So I
 basically want to do:
 if AnalyticsEvent.getNonce() == AnalyticsEvent2.getNonce() only return 1 of
 them.

 I'm not sure how to do this? If I do reduceByKey it reduces by
 AnalyticsEvent not by the values inside?

 Any guidance on how I can walk this list of events and only return a
 filtered version with unique nonces would be much appreciated.






 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Deduping-events-using-Spark-tp23153.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Compute Median in Spark Dataframe

2015-06-04 Thread Holden Karau
My current example doesn't use a Hive UDAF, but you would  do something
pretty similar (it calls a new user defined UDAF, and there are wrappers to
make Spark SQL UDAFs from Hive UDAFs but they are private). So this is
doable, but since it pokes at internals it will likely break between
versions of Spark. If you want to see the WIP PR I have with Sparkling
Pandas its at
https://github.com/sparklingpandas/sparklingpandas/pull/90/files . If you're
doing this in the JVM and just want to know how to wrap the Hive UDAF, you can
grep/look in sql/hive/ in Spark, but I'd encourage you to see if there is
another way to accomplish what you want (since poking at the internals is
kind of dangerous).
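As one example of another way, here is a rough RDD-based sketch of an exact
median that avoids Hive internals entirely, assuming a DataFrame df with an
integer column named "value" (for an even row count it simply takes the lower
middle element):

// Sort the column, index it, and look up the middle element.
val sorted = df.select("value")
  .map(_.getInt(0))                   // DataFrame => RDD[Int]
  .sortBy(identity)
  .zipWithIndex()
  .map { case (v, idx) => (idx, v) }  // key by position so we can look it up

val count = sorted.count()
val median = sorted.lookup((count - 1) / 2).head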

On Thu, Jun 4, 2015 at 6:28 AM, Deenar Toraskar deenar.toras...@gmail.com
wrote:

 Hi Holden, Olivier


 So for column you need to pass in a Java function, I have some sample
 code which does this but it does terrible things to access Spark internals.
 I also need to call a Hive UDAF in a dataframe agg function. Are there any
 examples of what Column expects?

 Deenar

 On 2 June 2015 at 21:13, Holden Karau hol...@pigscanfly.ca wrote:

 So for column you need to pass in a Java function, I have some sample
 code which does this but it does terrible things to access Spark internals.


 On Tuesday, June 2, 2015, Olivier Girardot 
 o.girar...@lateral-thoughts.com wrote:

 Nice to hear from you Holden ! I ended up trying exactly that (Column) -
 but I may have done it wrong :

 In [5]: g.agg(Column("percentile(value, 0.5)"))
 Py4JError: An error occurred while calling o97.agg. Trace:
 py4j.Py4JException: Method agg([class java.lang.String, class
 scala.collection.immutable.Nil$]) does not exist
 at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)

 Any idea ?

 Olivier.
 Le mar. 2 juin 2015 à 18:02, Holden Karau hol...@pigscanfly.ca a
 écrit :

 Not super easily: the GroupedData class uses a strToExpr function which
 has a pretty limited set of functions, so we can't pass in the name of an
 arbitrary Hive UDAF (unless I'm missing something). We can instead
 construct a column with the expression you want and then pass it in to
 agg() that way (although then you need to call the Hive UDAF there). There
 are some private classes in hiveUdfs.scala which expose Hive UDAFs as Spark
 SQL AggregateExpressions, but they are private.

 On Tue, Jun 2, 2015 at 8:28 AM, Olivier Girardot 
 o.girar...@lateral-thoughts.com wrote:

 I've finally come to the same conclusion, but isn't there any way to
 call these Hive UDAFs from agg("percentile(key,0.5)")?

 Le mar. 2 juin 2015 à 15:37, Yana Kadiyska yana.kadiy...@gmail.com
 a écrit :

 Like this...sqlContext should be a HiveContext instance

 case class KeyValue(key: Int, value: String)
 val df = sc.parallelize(1 to 50).map(i => KeyValue(i, i.toString)).toDF
 df.registerTempTable("table")
 sqlContext.sql("select percentile(key,0.5) from table").show()

 ​

 On Tue, Jun 2, 2015 at 8:07 AM, Olivier Girardot 
 o.girar...@lateral-thoughts.com wrote:

 Hi everyone,
 Is there any way to compute a median on a column using Spark's
 Dataframe. I know you can use stats in a RDD but I'd rather stay within 
 a
 dataframe.
 Hive seems to imply that using ntile one can compute percentiles,
 quartiles and therefore a median.
 Does anyone have experience with this ?

 Regards,

 Olivier.





 --
 Cell : 425-233-8271
 Twitter: https://twitter.com/holdenkarau
 Linked In: https://www.linkedin.com/in/holdenkarau



 --
 Cell : 425-233-8271
 Twitter: https://twitter.com/holdenkarau
 Linked In: https://www.linkedin.com/in/holdenkarau





-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau
Linked In: https://www.linkedin.com/in/holdenkarau


Re: Roadmap for Spark with Kafka on Scala 2.11?

2015-06-04 Thread Tathagata Das
But compile scope is supposed to be added to the assembly.
https://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html#Dependency_Scope
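For the sbt-assembly conflicts described further down in this thread, one common
pattern (a sketch, with 1.4.0 assumed as the version under discussion) is to mark
the Spark artifacts as "provided" in build.sbt so they stay out of the fat jar:

// build.sbt sketch, assuming the sbt-assembly plugin is already configured.
// Spark and its transitive dependencies are on the cluster at runtime, so
// marking them "provided" keeps them out of the assembly and avoids most
// merge conflicts.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"            % "1.4.0" % "provided",
  "org.apache.spark" %% "spark-streaming"       % "1.4.0" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.4.0"  // bundled into the fat jar
)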



On Thu, Jun 4, 2015 at 1:24 PM, algermissen1971 algermissen1...@icloud.com
wrote:

 Hi Iulian,

 On 26 May 2015, at 13:04, Iulian Dragoș iulian.dra...@typesafe.com
 wrote:

 
  On Tue, May 26, 2015 at 10:09 AM, algermissen1971 
 algermissen1...@icloud.com wrote:
  Hi,
 
  I am setting up a project that requires Kafka support and I wonder what
 the roadmap is for Scala 2.11 Support (including Kafka).
 
  Can we expect to see 2.11 support anytime soon?
 
  The upcoming 1.4 release (now at RC2) includes support for Kafka and
 Scala 2.11.6. It'd be great if you could give it a try. You can find the
 binaries (and staging repository including 2.11 artifacts) here:
 
   https://www.mail-archive.com/dev@spark.apache.org/msg09347.html
 

 Feedback after a couple of days:

 - I am using 1.4.0-rc4 now without problems
 - Not used Kafka support yet
 - I am using this with akka-2.3.11 and akka-http 1.0-RC3 (and
 sbt-assembly) and this has produced a dependency nightmare. I am even
 adding guava manually to the assembly because I just could not get
 sbt-assembly to not complain.

 I am far from a good understanding of sbt / maven internals, but it seems
 that the ‘compile’ scope set in the spark POM for a lot of dependencies is
 somehow not honored and the libs end up causing conflicts in sbt-assembly.

 (I am writing this to share experience, not to complain. Thanks for the
 great work!!)

 onward...

 Jan





  iulian
 
 
  Jan
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 
 
 
 
  --
 
  --
  Iulian Dragos
 
  --
  Reactive Apps on the JVM
  www.typesafe.com
 


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: How to share large resources like dictionaries while processing data with Spark ?

2015-06-04 Thread Olivier Girardot
You can use it as a broadcast variable, but if it's too large (more than
1 GB, I guess), you may need to share it by joining it to the other RDDs
using some kind of key.
But this is the kind of thing broadcast variables were designed for.
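A minimal sketch of the broadcast approach (loadDictionary and inputRdd are
hypothetical placeholders):

// Load the dictionary once on the driver; it must fit in driver and executor memory.
val dictionary: Map[String, String] = loadDictionary()   // hypothetical loader
val dictBroadcast = sc.broadcast(dictionary)

val enriched = inputRdd.map { word =>
  // dictBroadcast.value is read on the executors; the dictionary is shipped
  // once per node rather than once per task or per record.
  (word, dictBroadcast.value.getOrElse(word, "UNKNOWN"))
}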

Regards,

Olivier.

Le jeu. 4 juin 2015 à 23:50, dgoldenberg dgoldenberg...@gmail.com a
écrit :

 We have some pipelines defined where sometimes we need to load potentially
 large resources such as dictionaries.

 What would be the best strategy for sharing such resources among the
 transformations/actions within a consumer?  Can they be shared somehow
 across the RDD's?

 I'm looking for a way to load such a resource once into the cluster memory
 and have it be available throughout the lifecycle of a consumer...

 Thanks.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-share-large-resources-like-dictionaries-while-processing-data-with-Spark-tp23162.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




sqlCtx.load a single big csv file from s3 in parallel

2015-06-04 Thread gy8
Hi there!

I'm trying to read a large .csv file (14GB) into a dataframe from S3 via the
spark-csv package. I want to load this data in parallel utilizing all 20
executors that I have, however by default only 3 executors are being used
(which downloaded 5gb/5gb/4gb).

Here is my script (im using pyspark):

lol_file = sqlCtx.load(source="com.databricks.spark.csv",
                       header="false",
                       path=lol_file_path)

I have tried adding the option flags 1) minSplits=120 and 2) minPartitions=120, but
neither worked. I tried reading the source code, but I'm a noob at Scala and
could not figure out how the options are being used :(

Thank you for reading and any help is much appreciated!

Guang



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/sqlCtx-load-a-single-big-csv-file-from-s3-in-parallel-tp23163.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: How to share large resources like dictionaries while processing data with Spark ?

2015-06-04 Thread Huang, Roger
Is the dictionary read-only?
Did you look at 
http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables ?


-Original Message-
From: dgoldenberg [mailto:dgoldenberg...@gmail.com] 
Sent: Thursday, June 04, 2015 4:50 PM
To: user@spark.apache.org
Subject: How to share large resources like dictionaries while processing data 
with Spark ?

We have some pipelines defined where sometimes we need to load potentially 
large resources such as dictionaries.

What would be the best strategy for sharing such resources among the 
transformations/actions within a consumer?  Can they be shared somehow across 
the RDD's?

I'm looking for a way to load such a resource once into the cluster memory and 
have it be available throughout the lifecycle of a consumer...

Thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-share-large-resources-like-dictionaries-while-processing-data-with-Spark-tp23162.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: TreeReduce Functionality in Spark

2015-06-04 Thread DB Tsai
For the first round, you will have 16 reducers working since you have
32 partitions: every two of the 32 partitions share the same key in
reduceByKey, so they go to the same reducer.

After this step is done, you will have 16 partitions, so the next
round will use 8 reducers.
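To make the depth knob concrete, here is a small sketch (the shrink factor per
round, per the code quoted further down in this thread, is
scale = max(ceil(numPartitions^(1.0/depth)), 2)):

val rdd = sc.parallelize(1 to 1000000, numSlices = 32)

// Each intermediate round keys partition i by i % curNumPartitions and
// combines with reduceByKey, shrinking the partition count by the scale
// factor until an extra level would no longer help.
val sumDefault = rdd.treeReduce(_ + _)             // default depth = 2
val sumDeeper  = rdd.treeReduce(_ + _, depth = 4)  // more rounds, smaller fan-in per round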

Sincerely,

DB Tsai
---
Blog: https://www.dbtsai.com


On Thu, Jun 4, 2015 at 12:06 PM, Raghav Shankar raghav0110...@gmail.com wrote:
 Hey DB,

 Thanks for the reply!

 I still don't think this answers my question. For example, if I have a top()
 action being executed and I have 32 workers(32 partitions), and I choose a
 depth of 4, what does the overlay of intermediate reducers look like? How
 many reducers are there excluding the master and the worker? How many
 partitions get sent to each of these intermediate reducers? Does this number
 vary at each level?

 Thanks!


 On Thursday, June 4, 2015, DB Tsai dbt...@dbtsai.com wrote:

 By default, the depth of the tree is 2. Each partition will be one node.

 Sincerely,

 DB Tsai
 ---
 Blog: https://www.dbtsai.com


 On Thu, Jun 4, 2015 at 10:46 AM, Raghav Shankar raghav0110...@gmail.com
 wrote:
  Hey Reza,
 
  Thanks for your response!
 
  Your response clarifies some of my initial thoughts. However, what I
  don't
  understand is how the depth of the tree is used to identify how many
  intermediate reducers there will be, and how many partitions are sent to
  the
  intermediate reducers. Could you provide some insight into this?
 
  Thanks,
  Raghav
 
  On Thursday, June 4, 2015, Reza Zadeh r...@databricks.com wrote:
 
  In a regular reduce, all partitions have to send their reduced value to
  a
  single machine, and that machine can become a bottleneck.
 
  In a treeReduce, the partitions talk to each other in a logarithmic
  number
  of rounds. Imagine a binary tree that has all the partitions at its
  leaves
  and the root will contain the final reduced value. This way there is no
  single bottleneck machine.
 
  It remains to decide the number of children each node should have and
  how
  deep the tree should be, which is some of the logic in the method you
  pasted.
 
  On Wed, Jun 3, 2015 at 7:10 PM, raggy raghav0110...@gmail.com wrote:
 
  I am trying to understand what the treeReduce function for an RDD
  does,
  and
  how it is different from the normal reduce function. My current
  understanding is that treeReduce tries to split up the reduce into
  multiple
  steps. We do a partial reduce on different nodes, and then a final
  reduce
  is
  done to get the final result. Is this correct? If so, I guess what I
  am
  curious about is, how does spark decide how many nodes will be on each
  level, and how many partitions will be sent to a given node?
 
  The bulk of the implementation is within this function:
 
  partiallyReduced.treeAggregate(Option.empty[T])(op, op, depth)
    .getOrElse(throw new UnsupportedOperationException("empty collection"))
 
  The above function is expanded to
 
  val cleanSeqOp = context.clean(seqOp)
  val cleanCombOp = context.clean(combOp)
  val aggregatePartition =
    (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
  var partiallyAggregated = mapPartitions(it => Iterator(aggregatePartition(it)))
  var numPartitions = partiallyAggregated.partitions.length
  val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
  // If creating an extra level doesn't help reduce
  // the wall-clock time, we stop tree aggregation.
  while (numPartitions > scale + numPartitions / scale) {
    numPartitions /= scale
    val curNumPartitions = numPartitions
    partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
      (i, iter) => iter.map((i % curNumPartitions, _))
    }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values
  }
  partiallyAggregated.reduce(cleanCombOp)
 
  I am completely lost about what is happening in this function. I would
  greatly appreciate some sort of explanation.
 
 
 
 
  --
  View this message in context:
 
  http://apache-spark-user-list.1001560.n3.nabble.com/TreeReduce-Functionality-in-Spark-tp23147.html
  Sent from the Apache Spark User List mailing list archive at
  Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 
 
 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to share large resources like dictionaries while processing data with Spark ?

2015-06-04 Thread Yiannis Gkoufas
Hi there,

I would recommend checking out
https://github.com/spark-jobserver/spark-jobserver which I think gives the
functionality you are looking for.
I haven't tested it though.

BR

On 5 June 2015 at 01:35, Olivier Girardot ssab...@gmail.com wrote:

 You can use it as a broadcast variable, but if it's too large (more than
 1Gb I guess), you may need to share it joining this using some kind of key
 to the other RDDs.
 But this is the kind of thing broadcast variables were designed for.

 Regards,

 Olivier.

 Le jeu. 4 juin 2015 à 23:50, dgoldenberg dgoldenberg...@gmail.com a
 écrit :

 We have some pipelines defined where sometimes we need to load potentially
 large resources such as dictionaries.

 What would be the best strategy for sharing such resources among the
 transformations/actions within a consumer?  Can they be shared somehow
 across the RDD's?

 I'm looking for a way to load such a resource once into the cluster memory
 and have it be available throughout the lifecycle of a consumer...

 Thanks.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-share-large-resources-like-dictionaries-while-processing-data-with-Spark-tp23162.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: How to share large resources like dictionaries while processing data with Spark ?

2015-06-04 Thread Dmitry Goldenberg
Thanks so much, Yiannis, Olivier, Huang!

On Thu, Jun 4, 2015 at 6:44 PM, Yiannis Gkoufas johngou...@gmail.com
wrote:

 Hi there,

 I would recommend checking out
 https://github.com/spark-jobserver/spark-jobserver which I think gives
 the functionality you are looking for.
 I haven't tested it though.

 BR

 On 5 June 2015 at 01:35, Olivier Girardot ssab...@gmail.com wrote:

 You can use it as a broadcast variable, but if it's too large (more
 than 1Gb I guess), you may need to share it joining this using some kind of
 key to the other RDDs.
 But this is the kind of thing broadcast variables were designed for.

 Regards,

 Olivier.

 Le jeu. 4 juin 2015 à 23:50, dgoldenberg dgoldenberg...@gmail.com a
 écrit :

 We have some pipelines defined where sometimes we need to load
 potentially
 large resources such as dictionaries.

 What would be the best strategy for sharing such resources among the
 transformations/actions within a consumer?  Can they be shared somehow
 across the RDD's?

 I'm looking for a way to load such a resource once into the cluster
 memory
 and have it be available throughout the lifecycle of a consumer...

 Thanks.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-share-large-resources-like-dictionaries-while-processing-data-with-Spark-tp23162.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





How to share large resources like dictionaries while processing data with Spark ?

2015-06-04 Thread dgoldenberg
We have some pipelines defined where sometimes we need to load potentially
large resources such as dictionaries.

What would be the best strategy for sharing such resources among the
transformations/actions within a consumer?  Can they be shared somehow
across the RDD's?

I'm looking for a way to load such a resource once into the cluster memory
and have it be available throughout the lifecycle of a consumer...

Thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-share-large-resources-like-dictionaries-while-processing-data-with-Spark-tp23162.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Deduping events using Spark

2015-06-04 Thread Richard Marscher
I think if you create a bidirectional mapping from AnalyticsEvent to
another type that would wrap it and use the nonce as its equality, you
could then do something like reduceByKey to group by nonce and map back to
AnalyticsEvent after.

On Thu, Jun 4, 2015 at 1:10 PM, lbierman leebier...@gmail.com wrote:

 I'm still a bit new to Spark and am struggling to figure out the best way
 to dedupe my events.

 I load my Avro files from HDFS and then I want to dedupe events that have
 the same nonce.

 For example my code so far:

 JavaRDD<AnalyticsEvent> events = ((JavaRDD<AvroKey<AnalyticsEvent>>)
     context.newAPIHadoopRDD(
         context.hadoopConfiguration(),
         AvroKeyInputFormat.class,
         AvroKey.class,
         NullWritable.class
     ).keys())
     .map(event -> AnalyticsEvent.newBuilder(event.datum()).build())
     .filter(key -> { return
         Optional.ofNullable(key.getStepEventKey()).isPresent(); });

 Now I want to get back an RDD of AnalyticsEvents that are unique. So I
 basically want to do:
 if AnalyticsEvent.getNonce() == AnalyticsEvent2.getNonce() only return 1 of
 them.

 I'm not sure how to do this? If I do reduceByKey it reduces by
 AnalyticsEvent not by the values inside?

 Any guidance on how I can walk this list of events and only return a
 filtered version with unique nonces would be much appreciated.






 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Deduping-events-using-Spark-tp23153.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: includePackage() deprecated?

2015-06-04 Thread Shivaram Venkataraman
Yeah - We don't have support for running UDFs on DataFrames yet. There is
an open issue to track this https://issues.apache.org/jira/browse/SPARK-6817

Thanks
Shivaram

On Thu, Jun 4, 2015 at 3:10 AM, Daniel Emaasit daniel.emaa...@gmail.com
wrote:

 Hello Shivaram,
 Was the includePackage() function deprecated in SparkR 1.4.0?
 I don't see it in the documentation. If it was, does that mean that we can
 use R packages on Spark DataFrames the usual way we do for local R
 data frames?

 Daniel

 --
 Daniel Emaasit
 Ph.D. Research Assistant
 Transportation Research Center (TRC)
 University of Nevada, Las Vegas
 Las Vegas, NV 89154-4015
 Cell: 615-649-2489
 www.danielemaasit.com  http://www.danielemaasit.com/






Re: TreeReduce Functionality in Spark

2015-06-04 Thread DB Tsai
By default, the depth of the tree is 2. Each partition will be one node.

Sincerely,

DB Tsai
---
Blog: https://www.dbtsai.com


On Thu, Jun 4, 2015 at 10:46 AM, Raghav Shankar raghav0110...@gmail.com wrote:
 Hey Reza,

 Thanks for your response!

 Your response clarifies some of my initial thoughts. However, what I don't
 understand is how the depth of the tree is used to identify how many
 intermediate reducers there will be, and how many partitions are sent to the
 intermediate reducers. Could you provide some insight into this?

 Thanks,
 Raghav

 On Thursday, June 4, 2015, Reza Zadeh r...@databricks.com wrote:

 In a regular reduce, all partitions have to send their reduced value to a
 single machine, and that machine can become a bottleneck.

 In a treeReduce, the partitions talk to each other in a logarithmic number
 of rounds. Imagine a binary tree that has all the partitions at its leaves
 and the root will contain the final reduced value. This way there is no
 single bottleneck machine.

 It remains to decide the number of children each node should have and how
 deep the tree should be, which is some of the logic in the method you
 pasted.

 On Wed, Jun 3, 2015 at 7:10 PM, raggy raghav0110...@gmail.com wrote:

 I am trying to understand what the treeReduce function for an RDD does,
 and
 how it is different from the normal reduce function. My current
 understanding is that treeReduce tries to split up the reduce into
 multiple
 steps. We do a partial reduce on different nodes, and then a final reduce
 is
 done to get the final result. Is this correct? If so, I guess what I am
 curious about is, how does spark decide how many nodes will be on each
 level, and how many partitions will be sent to a given node?

 The bulk of the implementation is within this function:

  partiallyReduced.treeAggregate(Option.empty[T])(op, op, depth)
    .getOrElse(throw new UnsupportedOperationException("empty collection"))

 The above function is expanded to

  val cleanSeqOp = context.clean(seqOp)
  val cleanCombOp = context.clean(combOp)
  val aggregatePartition =
    (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
  var partiallyAggregated = mapPartitions(it => Iterator(aggregatePartition(it)))
  var numPartitions = partiallyAggregated.partitions.length
  val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
  // If creating an extra level doesn't help reduce
  // the wall-clock time, we stop tree aggregation.
  while (numPartitions > scale + numPartitions / scale) {
    numPartitions /= scale
    val curNumPartitions = numPartitions
    partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
      (i, iter) => iter.map((i % curNumPartitions, _))
    }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values
  }
  partiallyAggregated.reduce(cleanCombOp)

 I am completely lost about what is happening in this function. I would
 greatly appreciate some sort of explanation.




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/TreeReduce-Functionality-in-Spark-tp23147.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Anybody using Spark SQL JDBC server with DSE Cassandra?

2015-06-04 Thread Mohammed Guller
Deenar,
Thanks for the suggestion.
That is one of the ideas that I have, but I didn't get a chance to try it out yet.
One of the things that could potentially cause problems is that we use wide
rows. In addition, the schema is dynamic, with new columns getting added on a
regular basis. That is why I am considering DSE, which has an integrated Spark SQL
Thrift/JDBC server for Cassandra.

Mohammed

From: Deenar Toraskar [mailto:deenar.toras...@gmail.com]
Sent: Thursday, June 4, 2015 7:42 AM
To: Mohammed Guller
Cc: user@spark.apache.org
Subject: Re: Anybody using Spark SQL JDBC server with DSE Cassandra?

Mohammed

Have you tried registering your Cassandra tables in Hive/Spark SQL using the 
data frames API. These should be then available to query via the Spark 
SQL/Thrift JDBC Server.
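A rough sketch of that approach (assuming the open-source spark-cassandra-connector
is on the classpath; the exact format string and option names may vary with the
connector version, and the keyspace/table names are placeholders):

// Load the Cassandra table through the DataFrame API and register it as a
// temp table on the same HiveContext the Thrift server uses.
val df = sqlContext.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "my_keyspace", "table" -> "my_table"))
  .load()

df.registerTempTable("my_table")
// JDBC/ODBC clients connected to the Thrift server can now query my_table.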

Deenar

On 1 June 2015 at 19:33, Mohammed Guller moham...@glassbeam.com wrote:
Nobody using Spark SQL JDBC/Thrift server with DSE Cassandra?

Mohammed

From: Mohammed Guller [mailto:moham...@glassbeam.com]
Sent: Friday, May 29, 2015 11:49 AM
To: user@spark.apache.org
Subject: Anybody using Spark SQL JDBC server with DSE Cassandra?

Hi –

We have successfully integrated Spark SQL with Cassandra. We have a backend 
that provides a REST API that allows users to execute SQL queries on data in 
C*. Now we would like to also support JDBC/ODBC connectivity, so that users can 
use tools like Tableau to query data in C* through the Spark SQL JDBC server.

However, I have been unable to find a driver that would allow the Spark SQL 
Thrift/JDBC server to connect with Cassandra. DataStax provides a closed-source 
driver that comes only with the DSE version of Cassandra.

I would like to find out how many people are using the Spark SQL JDBC server + 
DSE Cassandra combination. If you do use Spark SQL JDBC server + DSE, I would 
appreciate if you could share your experience. For example, what kind of issues 
you have run into? How is the performance? What reporting tools you are using?

Thank  you.

Mohammed




Re: TreeReduce Functionality in Spark

2015-06-04 Thread Raghav Shankar
Hey Reza,

Thanks for your response!

Your response clarifies some of my initial thoughts. However, what I don't
understand is how the depth of the tree is used to identify how many
intermediate reducers there will be, and how many partitions are sent to
the intermediate reducers. Could you provide some insight into this?

Thanks,
Raghav

On Thursday, June 4, 2015, Reza Zadeh r...@databricks.com wrote:

 In a regular reduce, all partitions have to send their reduced value to a
 single machine, and that machine can become a bottleneck.

 In a treeReduce, the partitions talk to each other in a logarithmic number
 of rounds. Imagine a binary tree that has all the partitions at its leaves
 and the root will contain the final reduced value. This way there is no
 single bottleneck machine.

 It remains to decide the number of children each node should have and how
 deep the tree should be, which is some of the logic in the method you
 pasted.

 On Wed, Jun 3, 2015 at 7:10 PM, raggy raghav0110...@gmail.com
 javascript:_e(%7B%7D,'cvml','raghav0110...@gmail.com'); wrote:

 I am trying to understand what the treeReduce function for an RDD does,
 and
 how it is different from the normal reduce function. My current
 understanding is that treeReduce tries to split up the reduce into
 multiple
 steps. We do a partial reduce on different nodes, and then a final reduce
 is
 done to get the final result. Is this correct? If so, I guess what I am
 curious about is, how does spark decide how many nodes will be on each
 level, and how many partitions will be sent to a given node?

 The bulk of the implementation is within this function:

  partiallyReduced.treeAggregate(Option.empty[T])(op, op, depth)
    .getOrElse(throw new UnsupportedOperationException("empty collection"))

 The above function is expanded to

  val cleanSeqOp = context.clean(seqOp)
  val cleanCombOp = context.clean(combOp)
  val aggregatePartition =
    (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
  var partiallyAggregated = mapPartitions(it => Iterator(aggregatePartition(it)))
  var numPartitions = partiallyAggregated.partitions.length
  val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
  // If creating an extra level doesn't help reduce
  // the wall-clock time, we stop tree aggregation.
  while (numPartitions > scale + numPartitions / scale) {
    numPartitions /= scale
    val curNumPartitions = numPartitions
    partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
      (i, iter) => iter.map((i % curNumPartitions, _))
    }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values
  }
  partiallyAggregated.reduce(cleanCombOp)

 I am completely lost about what is happening in this function. I would
 greatly appreciate some sort of explanation.




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/TreeReduce-Functionality-in-Spark-tp23147.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 javascript:_e(%7B%7D,'cvml','user-unsubscr...@spark.apache.org');
 For additional commands, e-mail: user-h...@spark.apache.org
 javascript:_e(%7B%7D,'cvml','user-h...@spark.apache.org');





Re: Spark Job always cause a node to reboot

2015-06-04 Thread Ruslan Dautkhanov
Is vm.swappiness=0? Some vendors recommend setting this to 0 (zero), although I've
seen this cause even the kernel to fail to allocate memory.
That may cause a node reboot. If that's the case, set vm.swappiness to 5-10 and
decrease spark.*.memory. Your spark.driver.memory +
spark.executor.memory + OS + etc. must be less than the amount of memory the node has.



-- 
Ruslan Dautkhanov

On Thu, Jun 4, 2015 at 8:59 AM, Chao Chen kandy...@gmail.com wrote:

 Hi all,
 I am new to spark. I am trying to deploy HDFS (hadoop-2.6.0) and
 Spark-1.3.1 with four nodes, and each node has 8-cores and 8GB memory.
 One is configured as headnode running masters, and 3 others are workers

 But when I try to run the PageRank from HiBench, it always causes a node to
 reboot in the middle of the job, for the Scala, Java, and Python
 versions alike. It works fine
 with the MapReduce version of the same benchmark.

 I also tried standalone deployment, got the same issue.

 My spark-defaults.conf
 spark.master            yarn-client
 spark.driver.memory 4g
 spark.executor.memory   4g
 spark.rdd.compress  false


 The job submit script is:

 bin/spark-submit  --properties-file
 HiBench/report/pagerank/spark/scala/conf/sparkbench/spark.conf --class
 org.apache.spark.examples.SparkPageRank --master yarn-client
 --num-executors 2 --executor-cores 4 --executor-memory 4G --driver-memory
 4G
 HiBench/src/sparkbench/target/sparkbench-4.0-SNAPSHOT-MR2-spark1.3-jar-with-dependencies.jar
 hdfs://discfarm:9000/HiBench/Pagerank/Input/edges
 hdfs://discfarm:9000/HiBench/Pagerank/Output 3

 What is problem with my configuration ? and How can I find the cause ?

 any help is welcome !











 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Adding new Spark workers on AWS EC2 - access error

2015-06-04 Thread barmaley
The issue was that SSH key generated on Spark Master was not transferred to
this new slave. Spark-ec2 script with `start` command omits this step. The
solution is to use `launch` command with `--resume` options. Then the SSH
key is transferred to the new slave and everything goes smooth.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Adding-new-Spark-workers-on-AWS-EC2-access-error-tp23143p23155.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to run spark streaming application on YARN?

2015-06-04 Thread Saiph Kappa
Thanks! It is working fine now with spark-submit. Just out of curiosity,
how would you use org.apache.spark.deploy.yarn.Client? Adding that
spark_yarn jar to the configuration inside the application?

On Thu, Jun 4, 2015 at 6:37 PM, Vova Shelgunov vvs...@gmail.com wrote:

 You should run it with spark-submit or using org
 .apache.spark.deploy.yarn.Client.

 2015-06-04 20:30 GMT+03:00 Saiph Kappa saiph.ka...@gmail.com:

 No, I am not. I run it with sbt «sbt run-main Branchmark». I thought it
 was the same thing since I am passing all the configurations through the
 application code. Is that the problem?

 On Thu, Jun 4, 2015 at 6:26 PM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Hi Saiph,

 Are you launching using spark-submit?

 -Sandy

 On Thu, Jun 4, 2015 at 10:20 AM, Saiph Kappa saiph.ka...@gmail.com
 wrote:

 Hi,

 I've been running my spark streaming application in standalone mode
 without any worries. Now, I've been trying to run it on YARN (hadoop 2.7.0)
 but I am having some problems.

 Here are the config parameters of my application:
 «
 val sparkConf = new SparkConf()

 sparkConf.setMaster("yarn-client")
 sparkConf.set("spark.yarn.am.memory", "2g")
 sparkConf.set("spark.executor.instances", "2")

 sparkConf.setAppName("Benchmark")

 sparkConf.setJars(Array("target/scala-2.10/benchmark-app_2.10-0.1-SNAPSHOT.jar"))
 sparkConf.set("spark.executor.memory", "4g")
 sparkConf.set("spark.serializer",
   "org.apache.spark.serializer.KryoSerializer")
 sparkConf.set("spark.executor.extraJavaOptions",
   "-XX:+UseCompressedOops -XX:+UseConcMarkSweepGC " +
   "-XX:+AggressiveOpts -XX:FreqInlineSize=300 -XX:MaxInlineSize=300 ")
 if (sparkConf.getOption("spark.master") == None) {
   sparkConf.setMaster("local[*]")
 }
 »

 The jar I'm including there only contains the application classes.


 Here is the log of the application: http://pastebin.com/7RSktezA

 Here is the userlog on hadoop/YARN:
 «
 Exception in thread main java.lang.NoClassDefFoundError:
 org/apache/spark/Logging
 at java.lang.ClassLoader.defineClass1(Native Method)
 at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
 at
 java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
 at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
 at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 at
 org.apache.spark.deploy.yarn.ExecutorLauncher$.main(ApplicationMaster.scala:596)
 at
 org.apache.spark.deploy.yarn.ExecutorLauncher.main(ApplicationMaster.scala)
 Caused by: java.lang.ClassNotFoundException: org.apache.spark.Logging
 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 ... 14 more
 »

 I tried to add the spark core jar to ${HADOOP_HOME}/lib but the error
 persists. Am I doing something wrong?

 Thanks.







Re: How to run spark streaming application on YARN?

2015-06-04 Thread Sandy Ryza
That might work, but there might also be other steps that are required.

-Sandy

On Thu, Jun 4, 2015 at 11:13 AM, Saiph Kappa saiph.ka...@gmail.com wrote:

 Thanks! It is working fine now with spark-submit. Just out of curiosity,
 how would you use org.apache.spark.deploy.yarn.Client? Adding that
 spark_yarn jar to the configuration inside the application?

 On Thu, Jun 4, 2015 at 6:37 PM, Vova Shelgunov vvs...@gmail.com wrote:

 You should run it with spark-submit or using org
 .apache.spark.deploy.yarn.Client.

 2015-06-04 20:30 GMT+03:00 Saiph Kappa saiph.ka...@gmail.com:

 No, I am not. I run it with sbt «sbt run-main Branchmark». I thought
 it was the same thing since I am passing all the configurations through the
 application code. Is that the problem?

 On Thu, Jun 4, 2015 at 6:26 PM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Hi Saiph,

 Are you launching using spark-submit?

 -Sandy

 On Thu, Jun 4, 2015 at 10:20 AM, Saiph Kappa saiph.ka...@gmail.com
 wrote:

 Hi,

 I've been running my spark streaming application in standalone mode
 without any worries. Now, I've been trying to run it on YARN (hadoop 
 2.7.0)
 but I am having some problems.

 Here are the config parameters of my application:
 «
 val sparkConf = new SparkConf()

 sparkConf.setMaster(yarn-client)
 sparkConf.set(spark.yarn.am.memory, 2g)
 sparkConf.set(spark.executor.instances, 2)

 sparkConf.setAppName(Benchmark)

 sparkConf.setJars(Array(target/scala-2.10/benchmark-app_2.10-0.1-SNAPSHOT.jar))
 sparkConf.set(spark.executor.memory, 4g)
 sparkConf.set(spark.serializer,
 org.apache.spark.serializer.KryoSerializer)
 sparkConf.set(spark.executor.extraJavaOptions, 
 -XX:+UseCompressedOops -XX:+UseConcMarkSweepGC  +
   -XX:+AggressiveOpts -XX:FreqInlineSize=300
 -XX:MaxInlineSize=300 )
 if (sparkConf.getOption(spark.master) == None) {
   sparkConf.setMaster(local[*])
 }
 »

 The jar I'm including there only contains the application classes.


 Here is the log of the application: http://pastebin.com/7RSktezA

 Here is the userlog on hadoop/YARN:
 «
 Exception in thread main java.lang.NoClassDefFoundError:
 org/apache/spark/Logging
 at java.lang.ClassLoader.defineClass1(Native Method)
 at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
 at
 java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
 at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
 at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 at
 org.apache.spark.deploy.yarn.ExecutorLauncher$.main(ApplicationMaster.scala:596)
 at
 org.apache.spark.deploy.yarn.ExecutorLauncher.main(ApplicationMaster.scala)
 Caused by: java.lang.ClassNotFoundException: org.apache.spark.Logging
 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 ... 14 more
 »

 I tried to add the spark core jar to ${HADOOP_HOME}/lib but the error
 persists. Am I doing something wrong?

 Thanks.








Re: How to run spark streaming application on YARN?

2015-06-04 Thread Saiph Kappa
Additionally, I think this document (
https://spark.apache.org/docs/latest/building-spark.html ) should mention
that the protobuf.version might need to be changed to match the one used in
the chosen hadoop version. For instance, with hadoop 2.7.0 I had to change
protobuf.version to 1.5.0 to be able to run my application.

On Thu, Jun 4, 2015 at 7:14 PM, Sandy Ryza sandy.r...@cloudera.com wrote:

 That might work, but there might also be other steps that are required.

 -Sandy

 On Thu, Jun 4, 2015 at 11:13 AM, Saiph Kappa saiph.ka...@gmail.com
 wrote:

 Thanks! It is working fine now with spark-submit. Just out of curiosity,
 how would you use org.apache.spark.deploy.yarn.Client? Adding that
 spark_yarn jar to the configuration inside the application?

 On Thu, Jun 4, 2015 at 6:37 PM, Vova Shelgunov vvs...@gmail.com wrote:

 You should run it with spark-submit or using org
 .apache.spark.deploy.yarn.Client.

 2015-06-04 20:30 GMT+03:00 Saiph Kappa saiph.ka...@gmail.com:

 No, I am not. I run it with sbt «sbt run-main Branchmark». I thought
 it was the same thing since I am passing all the configurations through the
 application code. Is that the problem?
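For reference, one of the things spark-submit normally handles in yarn-client mode is telling YARN where to find the Spark assembly for the ApplicationMaster/ExecutorLauncher, which is consistent with the NoClassDefFoundError on org/apache/spark/Logging in the stack trace in this thread. A minimal, hedged sketch of doing that by hand (the HDFS path is an assumption, and other steps may still be needed, as noted above):

    // Sketch only: spark-submit usually sets this automatically.
    val sparkConf = new org.apache.spark.SparkConf()
      .setMaster("yarn-client")
      .set("spark.yarn.jar", "hdfs:///user/saiph/jars/spark-assembly-1.3.1.jar")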

 On Thu, Jun 4, 2015 at 6:26 PM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Hi Saiph,

 Are you launching using spark-submit?

 -Sandy

 On Thu, Jun 4, 2015 at 10:20 AM, Saiph Kappa saiph.ka...@gmail.com
 wrote:

 Hi,

 I've been running my spark streaming application in standalone mode
 without any worries. Now, I've been trying to run it on YARN (hadoop 
 2.7.0)
 but I am having some problems.

 Here are the config parameters of my application:
 «
 val sparkConf = new SparkConf()

 sparkConf.setMaster("yarn-client")
 sparkConf.set("spark.yarn.am.memory", "2g")
 sparkConf.set("spark.executor.instances", "2")

 sparkConf.setAppName("Benchmark")

 sparkConf.setJars(Array("target/scala-2.10/benchmark-app_2.10-0.1-SNAPSHOT.jar"))
 sparkConf.set("spark.executor.memory", "4g")
 sparkConf.set("spark.serializer",
   "org.apache.spark.serializer.KryoSerializer")
 sparkConf.set("spark.executor.extraJavaOptions",
   "-XX:+UseCompressedOops -XX:+UseConcMarkSweepGC " +
   "-XX:+AggressiveOpts -XX:FreqInlineSize=300 " +
   "-XX:MaxInlineSize=300 ")
 if (sparkConf.getOption("spark.master") == None) {
   sparkConf.setMaster("local[*]")
 }
 »

 The jar I'm including there only contains the application classes.


 Here is the log of the application: http://pastebin.com/7RSktezA

 Here is the userlog on hadoop/YARN:
 «
 Exception in thread main java.lang.NoClassDefFoundError:
 org/apache/spark/Logging
 at java.lang.ClassLoader.defineClass1(Native Method)
 at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
 at
 java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
 at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
 at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 at
 org.apache.spark.deploy.yarn.ExecutorLauncher$.main(ApplicationMaster.scala:596)
 at
 org.apache.spark.deploy.yarn.ExecutorLauncher.main(ApplicationMaster.scala)
 Caused by: java.lang.ClassNotFoundException: org.apache.spark.Logging
 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 ... 14 more
 »

 I tried to add the spark core jar to ${HADOOP_HOME}/lib but the error
 persists. Am I doing something wrong?

 Thanks.









Re: Required settings for permanent HDFS Spark on EC2

2015-06-04 Thread barmaley
Hi - I'm having a similar problem with switching from ephemeral to persistent
HDFS - it always looks for port 9000 regardless of the options I set for port 9010
for persistent HDFS. Have you figured out a solution? Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Required-settings-for-permanent-HDFS-Spark-on-EC2-tp22860p23157.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Deduping events using Spark

2015-06-04 Thread lbierman
I'm still a bit new to Spark and am struggling to figure out the best way to
Dedupe my events.

I load my Avro files from HDFS and then I want to dedupe events that have
the same nonce. 

For example my code so far:

 JavaRDD<AnalyticsEvent> events = ((JavaRDD<AvroKey<AnalyticsEvent>>)
     context.newAPIHadoopRDD(
         context.hadoopConfiguration(),
         AvroKeyInputFormat.class,
         AvroKey.class,
         NullWritable.class
     ).keys())
     .map(event -> AnalyticsEvent.newBuilder(event.datum()).build())
     .filter(key -> { return
         Optional.ofNullable(key.getStepEventKey()).isPresent(); })

Now I want to get back an RDD of AnalyticsEvents that are unique. So I
basically want to do:
if AnalyticsEvent.getNonce() == AnalyticsEvent2.getNonce() only return 1 of
them.

I'm not sure how to do this? If I do reduceByKey it reduces by
AnalyticsEvent not by the values inside?

Any guidance would be much appreciated on how I can walk this list of events
and only return a filtered version with unique nonces.
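For what it's worth, a minimal sketch of the usual pattern, shown with the Scala RDD API and assuming getNonce() returns the value to dedupe on: key each event by its nonce and keep one event per key.

    // Keep exactly one AnalyticsEvent per nonce; which duplicate survives is arbitrary.
    val deduped = events
      .keyBy(event => event.getNonce)      // (nonce, event) pairs
      .reduceByKey((first, _) => first)    // collapse duplicates per nonce
      .values                              // back to an RDD of events

In the Java API the same idea is mapToPair followed by reduceByKey and values.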






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Deduping-events-using-Spark-tp23153.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Standard Scaler taking 1.5hrs

2015-06-04 Thread Piero Cinquegrana
Hi DB,

Yes I am running count() operations on the previous steps and it appears that 
something is slow prior to the scaler. I thought that running take(5) and print 
the results would execute the command at each step and materialize the RDD, but 
is that not the case? That’s how I was testing each step.

Thanks,

Piero

From: DB Tsai [mailto:dbt...@dbtsai.com]
Sent: Wednesday, June 03, 2015 10:33 PM
To: Piero Cinquegrana
Cc: user@spark.apache.org
Subject: Re: Standard Scaler taking 1.5hrs

Can you do count() before fit to force materialize the RDD? I think something 
before fit is slow.

On Wednesday, June 3, 2015, Piero Cinquegrana 
pcinquegr...@marketshare.com wrote:
The fit part is very slow, transform not at all.

The number of partitions was 210 vs number of executors 80.

Spark 1.4 sounds great but as my company is using Qubole we are dependent upon 
them to upgrade from version 1.3.1. Until that happens, can you think of any 
other reasons as to why it could be slow. Sparse vectors? Excessive number of 
columns?

Sent from my mobile device. Please excuse any typos.

On Jun 3, 2015, at 9:53 PM, DB Tsai 
dbt...@dbtsai.com wrote:
Which part of StandardScaler is slow? Fit or transform? Fit has shuffle but 
very small, and transform doesn't do shuffle. I guess you don't have enough 
partition, so please repartition your input dataset to a number at least larger 
than the # of executors you have.

In Spark 1.4's new ML pipeline api, we have Linear Regression with elastic net, 
and in that version, we use quasi newton for optimization, so it will be a way 
faster than SGD implementation. Also, in that implementation, StandardScaler is 
not required since in computing the loss function, we implicitly do this for 
you.

https://github.com/apache/spark/commit/6a827d5d1ec520f129e42c3818fe7d0d870dcbef

Please try this out and give us feedback. Thanks.

On Wednesday, June 3, 2015, Piero Cinquegrana 
pcinquegr...@marketshare.com wrote:
Hello User group,

I have a RDD of LabeledPoint composed of sparse vectors like showing below. In 
the next step, I am standardizing the columns with the Standard Scaler. The 
data has 2450 columns and ~110M rows. It took 1.5hrs to complete the 
standardization with 10 nodes and 80 executors. The spark.executor.memory was 
set to 2g and the driver memory to 5g.

scala> val parsedData = stack_sorted.mapPartitions( partition =>
partition.map{row =>
LabeledPoint(row._2._1.getDouble(4), sparseVectorCat(row._2, CategoriesIdx,
InteractionIds, tupleMap, vecLength))
 },
preservesPartitioning=true).cache()

CategoriesIdx: Array[Int] = Array(3, 8, 12)
InteractionIds: Array[(Int, Int)] = Array((13,12))
vecLength: Int = 2450
parsedData: 
org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = 
MapPartitionsRDD[93] at mapPartitions at console:111
(1.0,(2450,[1322,1361,2430],[1.0,1.0,1.0]))
(0.0,(2450,[1322,1781,2430],[1.0,1.0,1.0]))
(2.0,(2450,[1322,2193,2430],[1.0,1.0,1.0]))
(1.0,(2450,[297,1322,2430],[1.0,1.0,1.0]))
(0.0,(2450,[898,1322,2430],[1.0,1.0,1.0]))


My suspicion is that, because the data is partitioned using a custom
partitioner, the Standard Scaler is causing a major shuffle operation. Any
suggestions on how to improve the performance of this step, and of the
LinearRegressionWithSGD() step, which is also taking a very long time?

scala> parsedData.partitioner
res72: Option[org.apache.spark.Partitioner] =
Some(org.apache.spark.HashPartitioner@d2)

scala> val scaler = new StandardScaler(withMean = false, withStd =
true).fit(parsedData.map( row => row.features))
scala> val scaledData = parsedData.mapPartitions(partition => partition.map{row
=> LabeledPoint(row.label, scaler.transform(row.features))}).cache()

scala> val numIterations = 100
scala> val stepSize = 0.1
scala> val miniBatchFraction = 0.1
scala> val algorithm = new LinearRegressionWithSGD()

scala> algorithm.setIntercept(false)
scala> algorithm.optimizer.setNumIterations(numIterations)
scala> algorithm.optimizer.setStepSize(stepSize)
scala> algorithm.optimizer.setMiniBatchFraction(miniBatchFraction)

scala> val model = algorithm.run(scaledData)

Best,

Piero Cinquegrana
Marketing Scientist | MarketShare
11150 Santa Monica Blvd, 5th Floor, Los Angeles, CA 90025
P: 310.914.5677 x242 M: 323.377.9197
www.marketshare.com
twitter.com/marketsharep



--
- DB

Sent from my iPhone


Re: importerror using external library with pyspark

2015-06-04 Thread Don Drake
I would try setting PYSPARK_DRIVER_PYTHON environment variable to the
location of your python binary, especially if you are using a virtual
environment.

-Don

On Wed, Jun 3, 2015 at 8:24 PM, AlexG swift...@gmail.com wrote:

 I have libskylark installed on both machines in my two node cluster in the
 same locations, and checked that the following code, which calls
 libskylark,
 works on both nodes with 'pyspark rfmtest.py':

 import re
 import numpy
 import skylark.ml.kernels
 import random
 import os

 from pyspark import SparkContext
  sc = SparkContext(appName="test")

 SIGMA = 10
 NUM_RF = 500
 numfeatures = 100
 numpoints = 1000
 kernel = skylark.ml.kernels.Gaussian(numfeatures, SIGMA)
 S = kernel.rft(NUM_RF)

 rows = sc.parallelize(numpy.random.rand(numpoints, numfeatures).tolist(),
 6)
 sketched_rows = rows.map(lambda row : S /
 numpy.ndarray(shape=(1,numfeatures), buffer=numpy.array(row)).copy())

  os.system("rm -rf spark_out")
 sketched_rows.saveAsTextFile('spark_out')

 However, when I try to run the same code on the cluster with 'spark-submit
 --master spark://master:7077 rfmtest.py', I get an ImportError saying that
 skylark.sketch does not exist:

 15/06/04 01:21:50 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory
 on master:40244 (size: 67.5 KB, free: 265.3 MB)
 15/06/04 01:21:51 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory
 on node001:45690 (size: 67.5 KB, free: 265.3 MB)
 15/06/04 01:21:51 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
 master): org.apache.spark.api.python.PythonException: Traceback (most
 recent
 call last):
   File /opt/Spark/python/pyspark/worker.py, line 88, in main
 command = pickleSer._read_with_length(infile)
   File /opt/Spark/python/pyspark/serializers.py, line 156, in
 _read_with_length
 return self.loads(obj)
   File /opt/Spark/python/pyspark/serializers.py, line 405, in loads
 return cPickle.loads(obj)
 ImportError: No module named skylark.sketch

 at
 org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:135)
 at
 org.apache.spark.api.python.PythonRDD$$anon$1.init(PythonRDD.scala:176)
 at
 org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:94)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:64)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)

 Any ideas what might be going on?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/importerror-using-external-library-with-pyspark-tp23145.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
http://www.MailLaunder.com/
800-733-2143


Re: Standard Scaler taking 1.5hrs

2015-06-04 Thread Holden Karau
take(5) will only evaluate enough partitions to provide 5 elements
(sometimes a few more but you get the idea), so it won't trigger a full
evaluation of all partitions unlike count().
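A minimal sketch of how to use that here, assuming parsedData is the RDD[LabeledPoint] built in the quoted message below: force a full materialization with count() first, so the cost of building the input is separated from the cost of StandardScaler.fit().

    import org.apache.spark.mllib.feature.StandardScaler

    parsedData.cache()
    val rows = parsedData.count()   // touches every partition, unlike take(5)

    // fit() is now timed against an already-materialized, cached input.
    val scaler = new StandardScaler(withMean = false, withStd = true)
      .fit(parsedData.map(_.features))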

On Thursday, June 4, 2015, Piero Cinquegrana pcinquegr...@marketshare.com
wrote:

  Hi DB,



 Yes I am running count() operations on the previous steps and it appears
 that something is slow prior to the scaler. I thought that running take(5)
 and print the results would execute the command at each step and
 materialize the RDD, but is that not the case? That’s how I was testing
 each step.



 Thanks,



 Piero



  *From:* DB Tsai [mailto:dbt...@dbtsai.com]
 *Sent:* Wednesday, June 03, 2015 10:33 PM
 *To:* Piero Cinquegrana
  *Cc:* user@spark.apache.org
 *Subject:* Re: Standard Scaler taking 1.5hrs



 Can you do count() before fit to force materialize the RDD? I think
 something before fit is slow.

 On Wednesday, June 3, 2015, Piero Cinquegrana 
  pcinquegr...@marketshare.com wrote:

 The fit part is very slow, transform not at all.



 The number of partitions was 210 vs number of executors 80.



 Spark 1.4 sounds great but as my company is using Qubole we are dependent
 upon them to upgrade from version 1.3.1. Until that happens, can you think
 of any other reasons as to why it could be slow. Sparse vectors? Excessive
 number of columns?


 Sent from my mobile device. Please excuse any typos.


 On Jun 3, 2015, at 9:53 PM, DB Tsai dbt...@dbtsai.com wrote:

  Which part of StandardScaler is slow? Fit or transform? Fit has shuffle
 but very small, and transform doesn't do shuffle. I guess you don't have
 enough partition, so please repartition your input dataset to a number at
 least larger than the # of executors you have.



 In Spark 1.4's new ML pipeline api, we have Linear Regression with elastic
 net, and in that version, we use quasi newton for optimization, so it will
 be a way faster than SGD implementation. Also, in that
 implementation, StandardScaler is not required since in computing the loss
 function, we implicitly do this for you.




 https://github.com/apache/spark/commit/6a827d5d1ec520f129e42c3818fe7d0d870dcbef



 Please try this out and give us feedback. Thanks.

 On Wednesday, June 3, 2015, Piero Cinquegrana 
 pcinquegr...@marketshare.com wrote:

 Hello User group,



 I have a RDD of LabeledPoint composed of sparse vectors like showing
 below. In the next step, I am standardizing the columns with the Standard
 Scaler. The data has 2450 columns and ~110M rows. It took 1.5hrs to
 complete the standardization with 10 nodes and 80 executors. The
 spark.executor.memory was set to 2g and the driver memory to 5g.



 scala val parsedData = stack_sorted.mapPartitions( partition =

 partition.map{row
 = LabeledPoint(row._2._1.getDouble(4), sparseVectorCat(row._2,
 CategoriesIdx, InteractionIds, tupleMap, vecLength))

  },
 preservesPartitioning=true).cache()


 CategoriesIdx: Array[Int] = Array(3, 8, 12)

 InteractionIds: Array[(Int, Int)] = Array((13,12))

 vecLength: Int = 2450

 parsedData:
 org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] =
 MapPartitionsRDD[93] at mapPartitions at console:111

 (1.0,(2450,[1322,1361,2430],[1.0,1.0,1.0]))

 (0.0,(2450,[1322,1781,2430],[1.0,1.0,1.0]))

 (2.0,(2450,[1322,2193,2430],[1.0,1.0,1.0]))

 (1.0,(2450,[297,1322,2430],[1.0,1.0,1.0]))

 (0.0,(2450,[898,1322,2430],[1.0,1.0,1.0]))





  My suspicion is that, because the data is partitioned using a custom
  partitioner, the Standard Scaler is causing a major shuffle operation. Any
 suggestion on how to improve the performance this step and a
 LinearRegressionWithSGD() which is also taking a very long time?



 scala parsedData.partitioner

  res72: Option[org.apache.spark.Partitioner] = Some(
  org.apache.spark.HashPartitioner@d2)



 scala val scaler = new StandardScaler(withMean = false, withStd =
 true).fit(parsedData.map( row =  row.features))

 scala val scaledData = parsedData.mapPartitions(partition =
 partition.map{row = LabeledPoint(row.label,
 scaler.transform(row.features))}).cache()



 scala val numIterations = 100

 scala val stepSize = 0.1

 scala val miniBatchFraction = 0.1

 scala val algorithm = new LinearRegressionWithSGD()



 scala algorithm.setIntercept(false)

 scala algorithm.optimizer.setNumIterations(numIterations)

 scala algorithm.optimizer.setStepSize(stepSize)

 scala algorithm.optimizer.setMiniBatchFraction(miniBatchFraction)



 scala val model = algorithm.run(scaledData)



 Best,



 Piero Cinquegrana

 Marketing Scientist | MarketShare
 11150 Santa Monica Blvd, 5th Floor, Los Angeles, CA 90025
 P: 310.914.5677 

Re: Scaling spark jobs returning large amount of data

2015-06-04 Thread Richard Marscher
It is possible to start multiple concurrent drivers, Spark dynamically
allocates ports per spark application on driver, master, and workers from
a port range. When you collect results back to the driver, they do not go
through the master. The master is mostly there as a coordinator between the
driver and the cluster of worker nodes, but otherwise the workers and
driver communicate directly for the underlying workload.

A spark application relates to one instance of a SparkContext
programmatically or to one call to one of the spark submit scripts.
Assuming you don't have dynamic resource allocation setup, each application
takes a fixed amount of the cluster resources to run. So as long as you
subdivide your cluster resources properly you can run multiple concurrent
applications against it. We are doing this in production presently.

Alternately, as Igor suggests, you can share a spark application and launch
different jobs within it. They will share the resources allocated to the
application in this case. An effect of this is you will only have a finite
amount of concurrent spark tasks (roughly translates to 1 task can execute
1 partition of a job at a time). If you launch multiple independent jobs
within the same application you will likely want to enable fair job
scheduling, otherwise stages between independent jobs will run in a FIFO
order instead of interleaving execution.
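A minimal sketch of that setup (the app and pool names are illustrative):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("shared-app")
      .set("spark.scheduler.mode", "FAIR")   // interleave stages of independent jobs
    val sc = new SparkContext(conf)

    // Each thread that submits an independent job can assign itself to a pool.
    sc.setLocalProperty("spark.scheduler.pool", "reports")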

Hope this helps,
Richard

On Thu, Jun 4, 2015 at 11:20 AM, Igor Berman igor.ber...@gmail.com wrote:

 Hi,
  as far as I understand, you shouldn't send the data to the driver. Suppose your
  files are in HDFS/S3 or partitioned in Cassandra; you should create your job such
  that every Spark executor/worker handles part of your input, transforms and
  filters it, and at the end writes back to Cassandra as output (once again, every
  executor/core inside a worker will write part of the output, in your case part
  of the report).

  In general I find that submitting multiple jobs in the same Spark context (aka
  driver) is more performant (you don't pay the startup/shutdown time); for this,
  some people use a REST server for submitting jobs to a long-running Spark
  context (driver)

 I'm not sure you can run multiple concurrent drivers because of ports

 On 4 June 2015 at 17:30, Giuseppe Sarno giuseppesa...@fico.com wrote:

  Hello,

 I am relatively new to spark and I am currently trying to understand how
 to scale large numbers of jobs with spark.

 I understand that spark architecture is split in “Driver”, “Master” and
 “Workers”. Master has a standby node in case of failure and workers can
 scale out.

 All the examples I have seen show Spark been able to distribute the load
 to the workers and returning small amount of data to the Driver. In my case
 I would like to explore the scenario where I need to generate a large
 report on data stored on Cassandra and understand how Spark architecture
 will handle this case when multiple report jobs will be running in parallel.

 According to this  presentation
 https://trongkhoanguyenblog.wordpress.com/2015/01/07/understand-the-spark-deployment-modes/
 responses from workers go through the Master and finally to the Driver.
 Does this mean that the Driver and/ or Master is a single point for all the
 responses coming back from workers ?

 Is it possible to start multiple concurrent Drivers ?



 Regards,

 Giuseppe.



 Fair Isaac Services Limited (Co. No. 01998476) and Fair Isaac (Adeptra)
 Limited (Co. No. 03295455) are registered in England and Wales and have a
 registered office address of Cottons Centre, 5th Floor, Hays Lane, London,
 SE1 2QP.

 This email and any files transmitted with it are confidential,
 proprietary and intended solely for the individual or entity to whom they
 are addressed. If you have received this email in error please delete it
 immediately.





Re: Problem reading Parquet from 1.2 to 1.3

2015-06-04 Thread Marcelo Vanzin
I talked to Don outside the list and he says that he's seeing this issue
with Apache Spark 1.3 too (not just CDH Spark), so it seems like there is a
real issue here.

On Wed, Jun 3, 2015 at 1:39 PM, Don Drake dondr...@gmail.com wrote:

 As part of upgrading a cluster from CDH 5.3.x to CDH 5.4.x I noticed that
 Spark is behaving differently when reading Parquet directories that contain
 a .metadata directory.

 It seems that in spark 1.2.x, it would just ignore the .metadata
 directory, but now that I'm using Spark 1.3, reading these files causes the
 following exceptions:

  scala> val d = sqlContext.parquetFile("/user/ddrak/parq_dir")

 SLF4J: Failed to load class org.slf4j.impl.StaticLoggerBinder.

 SLF4J: Defaulting to no-operation (NOP) logger implementation

 SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further
 details.

 scala.collection.parallel.CompositeThrowable: Multiple exceptions thrown
 during a parallel computation: java.lang.RuntimeException:
 hdfs://nameservice1/user/ddrak/parq_dir/.metadata/schema.avsc is not a
 Parquet file. expected magic number at tail [80, 65, 82, 49] but found
 [116, 34, 10, 125]

 parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:427)

 parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:398)


 org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:276)


 org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:275)

 scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)


 scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)

 scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)

 scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)

 scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)

 scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)

 .

 .

 .



 java.lang.RuntimeException:
 hdfs://nameservice1/user/ddrak/parq_dir/.metadata/schemas/1.avsc is not a
 Parquet file. expected magic number at tail [80, 65, 82, 49] but found
 [116, 34, 10, 125]

 parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:427)

 parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:398)


 org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:276)


 org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:275)

 scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)


 scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)

 scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)

 scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)

 scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)

 scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)

 .

 .

 .



 java.lang.RuntimeException:
 hdfs://nameservice1/user/ddrak/parq_dir/.metadata/descriptor.properties
 is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but
 found [117, 101, 116, 10]

 parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:427)

 parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:398)


 org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:276)


 org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:275)

 scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)


 scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)

 scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)

 scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)

 scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)

 scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)

 .

 .

 .

 at
 scala.collection.parallel.package$$anon$1.alongWith(package.scala:87)

 at
 scala.collection.parallel.Task$class.mergeThrowables(Tasks.scala:86)

 at
 scala.collection.parallel.mutable.ParArray$Map.mergeThrowables(ParArray.scala:650)

 at scala.collection.parallel.Task$class.tryMerge(Tasks.scala:72)

 at
 scala.collection.parallel.mutable.ParArray$Map.tryMerge(ParArray.scala:650)

 at
 scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:190)

 at
 scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:514)

 at
 scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:162)

 at
 scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)

 at
 scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)


TF-IDF Question

2015-06-04 Thread franco barrientos
Hi all!,

I have a .txt file where each row is a collection of terms of a
document, separated by spaces. For example:

1 "Hola spark"
2 ..

I followed this example of spark site
https://spark.apache.org/docs/latest/mllib-feature-extraction.html and i get
something like this:

tfidf.first()
org.apache.spark.mllib.linalg.Vector =
(1048576,[35587,884670],[3.458767233,3.458767233])

I think this:

1. First parameter "1048576": I don't know what it is, but it's always the
same number (maybe the number of terms).
2. Second parameter "[35587,884670]": I think these are the terms of the first line
in my .txt file.
3. Third parameter "[3.458767233,3.458767233]": I think these are the tf-idf values
for my terms.
Does anyone know the exact interpretation of this? And, regarding the second point, if
these values are the terms, how can I match them back to the original
terms ("[35587=Hola, 884670=spark]")?
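For reference, 1048576 is 2^20, the default feature dimension of MLlib's HashingTF used in that example, so the three parts are the vector size, the hashed term indices, and the corresponding tf-idf values. A small sketch of mapping a term to its index, assuming the default HashingTF settings:

    import org.apache.spark.mllib.feature.HashingTF

    val hashingTF = new HashingTF()   // numFeatures defaults to 1 << 20 = 1048576

    // Print the index each original term hashes to, e.g. for the first document.
    Seq("Hola", "spark").foreach { term =>
      println(s"$term -> ${hashingTF.indexOf(term)}")
    }

Because the hashing trick is one-way, going from an index back to a term requires keeping such a term-to-index map yourself.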

Regards and thanks in advance.

Franco Barrientos
Data Scientist
Málaga #115, Of. 1003, Las Condes.
Santiago, Chile.
(+562)-29699649
(+569)-76347893
franco.barrien...@exalitica.com
www.exalitica.com




How to speed up Spark Job?

2015-06-04 Thread ๏̯͡๏
I have a Spark app that reads Avro and sequence file data and performs join
and reduceByKey operations.

Results:
Command for all runs:
./bin/spark-submit -v --master yarn-cluster --driver-class-path
/apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar
--jars
/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.3/1.3.1.lib/spark_reporting_dep_only-1.0-SNAPSHOT-jar-with-dependencies.jar
 *--num-executors 9973 --driver-memory 14g --driver-java-options
-XX:MaxPermSize=512M --executor-memory 14g --executor-cores 1 --queue xy*
--class com.ebay.ep.poc.spark.reporting.SparkApp
/home/dvasthimal/spark1.3/1.3.1.lib/spark_reporting-1.0-SNAPSHOT.jar
startDate=2015-04-29 endDate=2015-04-29
input=/apps/hdmi-prod/b_um/epdatasets/exptsession subcommand=viewItem
output=/user/dvasthimal/epdatasets/viewItem buffersize=128
maxbuffersize=1068 maxResultSize=200G

I)
Input:
(View) RDD1: 2 Days = 20 Files = 17,328,796,820 bytes = PARTIAL
(Listing) RDD2: 100 Files = 267,748,253,700 bytes  = PARTIAL
(SPS) RDD3: 63 Files = 142,687,611,360 bytes = FULL

Output:
hadoop fs -count epdatasets/viewItem
   1  101  342246603 epdatasets/viewItem
Runtime: 26mins, 36sec


II)
Input:
(View) RDD1: 2 Days = 40 Files = 34,657,593,640 bytes = PARTIAL
(Listing) RDD2: 100 Files = 267,748,253,700 bytes  = PARTIAL
(SPS) RDD3: 63 Files = 142,687,611,360 bytes = FULL

Output:
hadoop fs -count epdatasets/viewItem
   1  101  667790782 epdatasets/viewItem
Runtime: 40mins, 49sec


I cannot increase memory as 14G is the limit.
I can increase number of executors and cores. Please suggest how to make
this app run faster.
-- 
Deepak


Re: Optimisation advice for Avro-Parquet merge job

2015-06-04 Thread James Aley
Thanks for the confirmation! We're quite new to Spark, so a little
reassurance is a good thing to have sometimes :-)

The thing that's concerning me at the moment is that my job doesn't seem to
run any faster with more compute resources added to the cluster, and this
is proving a little tricky to debug. There are a lot of variables, so
here's what we've tried already and the apparent impact. If anyone has any
further suggestions, we'd love to hear!

* Increase the minimum number of output files (targetPartitions above),
so that input groups smaller than our minimum chunk size can still be
worked on by more than one executor. This does measurably speed things up,
but obviously it's a trade-off, as the original goal for this job is to
merge our data into fewer, larger files.

* Submit many jobs in parallel, by running the above code in a Callable, on
an executor pool. This seems to help, to some extent, but I'm not sure what
else needs to be configured alongside it -- driver threads, scheduling
policy, etc. We set scheduling to FAIR when doing this, as that seemed
like the right approach, but we're not 100% confident. It seemed to help
quite substantially anyway, so perhaps this just needs further tuning?

* Increasing executors, RAM, etc. This doesn't make a difference by itself
for this job, so I'm thinking we're already not fully utilising the
resources we have in a smaller cluster.

Again, any recommendations appreciated. Thanks for the help!


James.

On 4 June 2015 at 15:00, Eugen Cepoi cepoi.eu...@gmail.com wrote:

 Hi

 2015-06-04 15:29 GMT+02:00 James Aley james.a...@swiftkey.com:

 Hi,

 We have a load of Avro data coming into our data systems in the form of
 relatively small files, which we're merging into larger Parquet files with
 Spark. I've been following the docs and the approach I'm taking seemed
 fairly obvious, and pleasingly simple, but I'm wondering if perhaps it's
 not the most optimal approach.

 I was wondering if anyone on this list might have some advice to make to
 make this job as efficient as possible. Here's some code:

  DataFrame dfInput = sqlContext.load(inputPaths.get(0),
      "com.databricks.spark.avro");
  long totalSize = getDirSize(inputPaths.get(0));

  for (int i = 1; i < inputs.size(); ++i) {
      dfInput = dfInput.unionAll(sqlContext.load(inputPaths.get(i),
          "com.databricks.spark.avro"));
      totalSize += getDirSize(inputPaths.get(i));
  }

  int targetPartitions = (int) Math.max(2L, totalSize / TARGET_SIZE_BYTES);
  DataFrame outputFrame;

  // Note: HADOOP-10456 impacts us, as we're stuck on 2.4.0 in EMR, hence
  // the synchronize block below. Suggestions welcome here too! :-)
  synchronized (this) {
      RDD<Row> inputRdd = dfInput.rdd().coalesce(targetPartitions, false, null);
      outputFrame = sqlContext.createDataFrame(inputRdd, dfInput.schema());
  }

  outputFrame.save(outputPath, "parquet", SaveMode.Overwrite);

 Here are some things bothering me:

- Conversion to an RDD and back again so that we can use coalesce()
to reduce the number of partitions. This is because we read that
repartition() is not as efficient as coalesce(), and local micro 
 benchmarks
seemed to somewhat confirm that this was faster. Is this really a good 
 idea
though? Should we be doing something else?

 Repartition uses coalesce but with a forced shuffle step. Its just a
 shortcut for coalesce(xxx, true)
 Doing a coalesce sounds correct, I'd do the same :) Note that if you add
 the shuffle step, then your partitions should be better balanced.


- Usage of unionAll() - this is the only way I could find to join the
separate data sets into a single data frame to save as Parquet. Is there a
better way?

 When using directly the inputformats you can do this
 FileInputFormat.addInputPath, it should perform at least as good as union.


- Do I need to be using the DataFrame API at all? I'm not querying
any data here, so the nice API for SQL-like transformations of the data
isn't being used. The DataFrame API just seemed like the path of least
resistance for working with Avro and Parquet. Would there be any advantage
to using hadoopRDD() with the appropriate Input/Output formats?



 Using directly the input/outputformats sounds viable. But the snippet you
 show seems clean enough and I am not sure there would be much value in
 making something (maybe) slightly faster but harder to understand.


 Eugen

 Any advice or tips greatly appreciated!


 James.






Re: Spark 1.3.1 On Mesos Issues.

2015-06-04 Thread John Omernik
So a few updates.  When I run local as stated before, it works fine. When I
run in Yarn (via Apache Myriad on Mesos) it also runs fine. The only issue
is specifically with Mesos. I wonder if there is some sort of class path
goodness I need to fix or something along that lines.  Any tips would be
appreciated.

Thanks!

John

On Mon, Jun 1, 2015 at 6:14 PM, Dean Wampler deanwamp...@gmail.com wrote:

 It would be nice to see the code for MapR FS Java API, but my google foo
 failed me (assuming it's open source)...

 So, shooting in the dark ;) there are a few things I would check, if you
 haven't already:

 1. Could there be 1.2 versions of some Spark jars that get picked up at
 run time (but apparently not in local mode) on one or more nodes? (Side
 question: Does your node experiment fail on all nodes?) Put another way,
 are the classpaths good for all JVM tasks?
 2. Can you use just MapR and Spark 1.3.1 successfully, bypassing Mesos?

 Incidentally, how are you combining Mesos and MapR? Are you running Spark
 in Mesos, but accessing data in MapR-FS?

 Perhaps the MapR shim library doesn't support Spark 1.3.1.

 HTH,

 dean

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Mon, Jun 1, 2015 at 2:49 PM, John Omernik j...@omernik.com wrote:

 All -

 I am facing and odd issue and I am not really sure where to go for
 support at this point.  I am running MapR which complicates things as it
 relates to Mesos, however this HAS worked in the past with no issues so I
 am stumped here.

 So for starters, here is what I am trying to run. This is a simple show
 tables using the Hive Context:

 from pyspark import SparkContext, SparkConf
 from pyspark.sql import SQLContext, Row, HiveContext
 sparkhc = HiveContext(sc)
  test = sparkhc.sql("show tables")
 for r in test.collect():
   print r

 When I run it on 1.3.1 using ./bin/pyspark --master local  This works
 with no issues.

 When I run it using Mesos with all the settings configured (as they had
 worked in the past) I get lost tasks and when I zoom in them, the error
 that is being reported is below.  Basically it's a NullPointerException on
 the com.mapr.fs.ShimLoader.  What's weird to me is is I took each instance
 and compared both together, the class path, everything is exactly the same.
 Yet running in local mode works, and running in mesos fails.  Also of note,
 when the task is scheduled to run on the same node as when I run locally,
 that fails too! (Baffling).

 Ok, for comparison, how I configured Mesos was to download the mapr4
 package from spark.apache.org.  Using the exact same configuration file
 (except for changing the executor tgz from 1.2.0 to 1.3.1) from the 1.2.0.
 When I run this example with the mapr4 for 1.2.0 there is no issue in
 Mesos, everything runs as intended. Using the same package for 1.3.1 then
 it fails.

 (Also of note, 1.2.1 gives a 404 error, 1.2.2 fails, and 1.3.0 fails as
 well).

  So basically, when I used 1.2.0 and followed a set of steps, it worked on
  Mesos, and 1.3.1 fails.  Since this is a current version of Spark, MapR
  supports 1.2.1 only.  (Still working on that).

 I guess I am at a loss right now on why this would be happening, any
 pointers on where I could look or what I could tweak would be greatly
 appreciated. Additionally, if there is something I could specifically draw
 to the attention of MapR on this problem please let me know, I am perplexed
 on the change from 1.2.0 to 1.3.1.

 Thank you,

 John




 Full Error on 1.3.1 on Mesos:
 15/05/19 09:31:26 INFO MemoryStore: MemoryStore started with capacity
 1060.3 MB java.lang.NullPointerException at
 com.mapr.fs.ShimLoader.getRootClassLoader(ShimLoader.java:96) at
 com.mapr.fs.ShimLoader.injectNativeLoader(ShimLoader.java:232) at
 com.mapr.fs.ShimLoader.load(ShimLoader.java:194) at
 org.apache.hadoop.conf.CoreDefaultProperties.(CoreDefaultProperties.java:60)
 at java.lang.Class.forName0(Native Method) at
 java.lang.Class.forName(Class.java:274) at
 org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:1847)
 at
 org.apache.hadoop.conf.Configuration.getProperties(Configuration.java:2062)
 at
 org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2272)
 at
 org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2224)
 at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2141)
 at org.apache.hadoop.conf.Configuration.set(Configuration.java:992) at
 org.apache.hadoop.conf.Configuration.set(Configuration.java:966) at
 org.apache.spark.deploy.SparkHadoopUtil.newConfiguration(SparkHadoopUtil.scala:98)
 at org.apache.spark.deploy.SparkHadoopUtil.(SparkHadoopUtil.scala:43) at
 org.apache.spark.deploy.SparkHadoopUtil$.(SparkHadoopUtil.scala:220) at
 org.apache.spark.deploy.SparkHadoopUtil$.(SparkHadoopUtil.scala) at
 

Re: Optimisation advice for Avro-Parquet merge job

2015-06-04 Thread Eugen Cepoi
Hi

2015-06-04 15:29 GMT+02:00 James Aley james.a...@swiftkey.com:

 Hi,

 We have a load of Avro data coming into our data systems in the form of
 relatively small files, which we're merging into larger Parquet files with
 Spark. I've been following the docs and the approach I'm taking seemed
 fairly obvious, and pleasingly simple, but I'm wondering if perhaps it's
 not the most optimal approach.

 I was wondering if anyone on this list might have some advice to make to
 make this job as efficient as possible. Here's some code:

  DataFrame dfInput = sqlContext.load(inputPaths.get(0),
      "com.databricks.spark.avro");
  long totalSize = getDirSize(inputPaths.get(0));

  for (int i = 1; i < inputs.size(); ++i) {
      dfInput = dfInput.unionAll(sqlContext.load(inputPaths.get(i),
          "com.databricks.spark.avro"));
      totalSize += getDirSize(inputPaths.get(i));
  }

  int targetPartitions = (int) Math.max(2L, totalSize / TARGET_SIZE_BYTES);
  DataFrame outputFrame;

  // Note: HADOOP-10456 impacts us, as we're stuck on 2.4.0 in EMR, hence
  // the synchronize block below. Suggestions welcome here too! :-)
  synchronized (this) {
      RDD<Row> inputRdd = dfInput.rdd().coalesce(targetPartitions, false, null);
      outputFrame = sqlContext.createDataFrame(inputRdd, dfInput.schema());
  }

  outputFrame.save(outputPath, "parquet", SaveMode.Overwrite);

 Here are some things bothering me:

- Conversion to an RDD and back again so that we can use coalesce() to
reduce the number of partitions. This is because we read that repartition()
is not as efficient as coalesce(), and local micro benchmarks seemed to
somewhat confirm that this was faster. Is this really a good idea though?
Should we be doing something else?

 Repartition uses coalesce but with a forced shuffle step. It's just a
shortcut for coalesce(xxx, true).
Doing a coalesce sounds correct, I'd do the same :) Note that if you add
the shuffle step, then your partitions should be better balanced.
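A tiny sketch of that equivalence in Scala, reusing dfInput and targetPartitions from the snippet above:

    // No shuffle: cheap, but partition sizes depend on how the inputs were laid out.
    val narrow = dfInput.rdd.coalesce(targetPartitions)

    // Forced shuffle: equivalent to dfInput.rdd.repartition(targetPartitions),
    // usually better balanced at the cost of a shuffle.
    val shuffled = dfInput.rdd.coalesce(targetPartitions, shuffle = true)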


- Usage of unionAll() - this is the only way I could find to join the
separate data sets into a single data frame to save as Parquet. Is there a
better way?

 When using directly the inputformats you can do this
FileInputFormat.addInputPath, it should perform at least as good as union.


- Do I need to be using the DataFrame API at all? I'm not querying any
data here, so the nice API for SQL-like transformations of the data isn't
being used. The DataFrame API just seemed like the path of least resistance
for working with Avro and Parquet. Would there be any advantage to using
hadoopRDD() with the appropriate Input/Output formats?



Using directly the input/outputformats sounds viable. But the snippet you
show seems clean enough and I am not sure there would be much value in
making something (maybe) slightly faster but harder to understand.


Eugen

Any advice or tips greatly appreciated!


 James.





Big performance difference when joining 3 tables in different order

2015-06-04 Thread Hao Ren
Hi,

I encountered a performance issue when join 3 tables in sparkSQL.

Here is the query:

SELECT g.period, c.categoryName, z.regionName, action, list_id, cnt
FROM t_category c, t_zipcode z, click_meter_site_grouped g
WHERE c.refCategoryID = g.category AND z.regionCode = g.region

I need to pay a lot of attention to the table order in the FROM clause:
some orderings break the driver,
some make the job extremely slow,
and only one ordering lets the job finish quickly.

For the slow one, I noticed a table is loaded 56 times !!! from its CSV
file.

I would like to know more about the join implementation in SparkSQL to understand
the issue (auto broadcast, etc.).
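Two things that are easy to check here, sketched below in Scala (the threshold value is only an example): the automatic broadcast threshold, and the physical plan Spark SQL actually chose.

    // Tables whose statistics fall below this size (in bytes) are broadcast instead of shuffled.
    sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (10 * 1024 * 1024).toString)

    // Printing the physical plan shows which join strategy was picked for each table pair.
    sqlContext.sql("""
      SELECT g.period, c.categoryName, z.regionName, action, list_id, cnt
      FROM t_category c, t_zipcode z, click_meter_site_grouped g
      WHERE c.refCategoryID = g.category AND z.regionCode = g.region
    """).explain(true)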

For ones want to know more about the details, here is the jira:
https://issues.apache.org/jira/browse/SPARK-8102

Any help is welcome. =) Thx

Hao



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Big-performance-difference-when-joining-3-tables-in-different-order-tp23150.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Scaling spark jobs returning large amount of data

2015-06-04 Thread Giuseppe Sarno
Hello,
I am relatively new to spark and I am currently trying to understand how to 
scale large numbers of jobs with spark.
I understand that spark architecture is split in Driver, Master and 
Workers. Master has a standby node in case of failure and workers can scale 
out.
All the examples I have seen show Spark been able to distribute the load to the 
workers and returning small amount of data to the Driver. In my case I would 
like to explore the scenario where I need to generate a large report on data 
stored on Cassandra and understand how Spark architecture will handle this case 
when multiple report jobs will be running in parallel.
According to this  presentation 
https://trongkhoanguyenblog.wordpress.com/2015/01/07/understand-the-spark-deployment-modes/
 responses from workers go through the Master and finally to the Driver. Does 
this mean that the Driver and/ or Master is a single point for all the 
responses coming back from workers ?
Is it possible to start multiple concurrent Drivers ?

Regards,
Giuseppe.


Fair Isaac Services Limited (Co. No. 01998476) and Fair Isaac (Adeptra) Limited 
(Co. No. 03295455) are registered in England and Wales and have a registered 
office address of Cottons Centre, 5th Floor, Hays Lane, London, SE1 2QP.

This email and any files transmitted with it are confidential, proprietary and 
intended solely for the individual or entity to whom they are addressed. If you 
have received this email in error please delete it immediately.


Re: Anybody using Spark SQL JDBC server with DSE Cassandra?

2015-06-04 Thread Deenar Toraskar
Mohammed

Have you tried registering your Cassandra tables in Hive/Spark SQL using
the data frames API. These should be then available to query via the Spark
SQL/Thrift JDBC Server.
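A rough sketch of what that registration can look like with the open-source spark-cassandra-connector (keyspace and table names are placeholders; whether the Thrift server sees the table depends on it sharing the same context):

    // Spark 1.3-style data source load; on 1.4 the equivalent is sqlContext.read.format(...).load().
    val users = sqlContext.load(
      "org.apache.spark.sql.cassandra",
      Map("keyspace" -> "my_keyspace", "table" -> "users"))

    // Register it so SQL clients on the same context can query it by name.
    users.registerTempTable("users")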

Deenar

On 1 June 2015 at 19:33, Mohammed Guller moham...@glassbeam.com wrote:

  Nobody using Spark SQL JDBC/Thrift server with DSE Cassandra?



 Mohammed



 *From:* Mohammed Guller [mailto:moham...@glassbeam.com]
 *Sent:* Friday, May 29, 2015 11:49 AM
 *To:* user@spark.apache.org
 *Subject:* Anybody using Spark SQL JDBC server with DSE Cassandra?



 Hi –



 We have successfully integrated Spark SQL with Cassandra. We have a
 backend that provides a REST API that allows users to execute SQL queries
 on data in C*. Now we would like to also support JDBC/ODBC connectivity ,
 so that user can use tools like Tableau to query data in C* through the
 Spark SQL JDBC server.



 However, I have been unable to find a driver that would allow the Spark
 SQL Thrift/JDBC server to connect with Cassandra. DataStax provides a
 closed-source driver that comes only with the DSE version of Cassandra.



 I would like to find out how many people are using the Spark SQL JDBC
 server + DSE Cassandra combination. If you do use Spark SQL JDBC server +
 DSE, I would appreciate if you could share your experience. For example,
 what kind of issues you have run into? How is the performance? What
 reporting tools you are using?



 Thank  you.



 Mohammed





Re: Spark 1.4.0-rc4 HiveContext.table(db.tbl) NoSuchTableException

2015-06-04 Thread Doug Balog
Hi Yin,
 I’m very surprised to hear that its not supported in 1.3 because I’ve been 
using it since 1.3.0.
It worked great up until  SPARK-6908 was merged into master.

What is the supported way to get a DF for a table that is not in the default
database?

IMHO, If you are not going to support “databaseName.tableName”, 
sqlContext.table() should have a version that takes a database and a table, ie

def table(databaseName: String, tableName: String): DataFrame =
  DataFrame(this, catalog.lookupRelation(Seq(databaseName,tableName)))

The handling of databases in Spark(sqlContext, hiveContext, Catalog) could be 
better.

Thanks,

Doug

 On Jun 3, 2015, at 8:21 PM, Yin Huai yh...@databricks.com wrote:
 
 Hi Doug,
 
 Actually, sqlContext.table does not support database name in both Spark 1.3 
 and Spark 1.4. We will support it in future version. 
 
 Thanks,
 
 Yin
 
  
 
 On Wed, Jun 3, 2015 at 10:45 AM, Doug Balog doug.sparku...@dugos.com wrote:
 Hi,
 
 sqlContext.table(“db.tbl”) isn’t working for me, I get a NoSuchTableException.
 
 But I can access the table via
 
 sqlContext.sql(“select * from db.tbl”)
 
 So I know it has the table info from the metastore.
 
 Anyone else see this ?
 
 I’ll keep digging.
 I compiled via make-distribution  -Pyarn -phadoop-2.4 -Phive 
 -Phive-thriftserver
 It worked for me in 1.3.1
 
 Cheers,
 
 Doug
 
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 
 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Setting S3 output file grantees for spark output files

2015-06-04 Thread Justin Steigel
Hi all,

I'm running Spark on AWS EMR and I'm having some issues getting the correct
permissions on the output files using
rdd.saveAsTextFile('file_dir_name').  In hive, I would add a line in the
beginning of the script with

set fs.s3.canned.acl=BucketOwnerFullControl

and that would set the correct grantees for the files. For Spark, I tried
adding the permissions as a --conf option:

hadoop jar /mnt/var/lib/hadoop/steps/s-3HIRLHJJXV3SJ/script-runner.jar \
/home/hadoop/spark/bin/spark-submit --deploy-mode cluster --master
yarn-cluster \
--conf spark.driver.extraJavaOptions
-Dfs.s3.canned.acl=BucketOwnerFullControl \
hdfs:///user/hadoop/spark.py

But the permissions do not get set properly on the output files. What is
the proper way to pass in the 'fs.s3.canned.acl=BucketOwnerFullControl' or
any of the S3 canned permissions to the spark job?

Thanks in advance
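One hedged possibility (not verified on EMR): Spark copies any configuration key prefixed with spark.hadoop. into the Hadoop Configuration it uses for output, so the Hive-style property can be passed that way instead of through driver JVM options; the spark-submit form would be --conf spark.hadoop.fs.s3.canned.acl=BucketOwnerFullControl. Sketched with the Scala API:

    // Assumption: fs.s3.canned.acl is honoured by the EMR S3 filesystem used by saveAsTextFile.
    val conf = new org.apache.spark.SparkConf()
      .set("spark.hadoop.fs.s3.canned.acl", "BucketOwnerFullControl")

    // Or set it directly on an existing context's Hadoop configuration:
    sc.hadoopConfiguration.set("fs.s3.canned.acl", "BucketOwnerFullControl")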


Re: Spark 1.4 HiveContext fails to initialise with native libs

2015-06-04 Thread Yin Huai
Are you using RC4?

On Wed, Jun 3, 2015 at 10:58 PM, Night Wolf nightwolf...@gmail.com wrote:

 Thanks Yin, that seems to work with the Shell. But on a compiled
 application with Spark-submit it still fails with the same exception.
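One way to rule out configuration-file differences between the shell and the packaged job is to set the suggested property explicitly in the application itself; whether that resolves the spark-submit case is an assumption. A sketch, using the value quoted below:

    val conf = new org.apache.spark.SparkConf()
      .set("spark.sql.hive.metastore.sharedPrefixes",
           "com.mysql.jdbc,org.postgresql,com.microsoft.sqlserver,oracle.jdbc," +
           "com.mapr.fs.shim.LibraryLoader,com.mapr.security.JNISecurity,com.mapr.fs.jni")
    val sc = new org.apache.spark.SparkContext(conf)
    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)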

 On Thu, Jun 4, 2015 at 2:46 PM, Yin Huai yh...@databricks.com wrote:

 Can you put the following setting in spark-defaults.conf and try again?

 spark.sql.hive.metastore.sharedPrefixes
 com.mysql.jdbc,org.postgresql,com.microsoft.sqlserver,oracle.jdbc,com.mapr.fs.shim.LibraryLoader,com.mapr.security.JNISecurity,com.mapr.fs.jni

 https://issues.apache.org/jira/browse/SPARK-7819 has more context about
 it.

 On Wed, Jun 3, 2015 at 9:38 PM, Night Wolf nightwolf...@gmail.com
 wrote:

 Hi all,

 Trying out Spark 1.4 RC4 on MapR4/Hadoop 2.5.1 running in yarn-client
 mode with Hive support.

 *Build command;*
 ./make-distribution.sh --name mapr4.0.2_yarn_j6_2.10 --tgz -Pyarn
 -Pmapr4 -Phadoop-2.4 -Pmapr4 -Phive -Phadoop-provided
 -Dhadoop.version=2.5.1-mapr-1501 -Dyarn.version=2.5.1-mapr-1501 -DskipTests
 -e -X


  When trying to run a hive query in the spark shell *sqlContext.sql("show
  tables")* I get the following exception;

  scala> sqlContext.sql("show tables")
 15/06/04 04:33:16 INFO hive.HiveContext: Initializing
 HiveMetastoreConnection version 0.13.1 using Spark classes.
 java.lang.reflect.InvocationTargetException
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at com.mapr.fs.ShimLoader.loadNativeLibrary(ShimLoader.java:323)
 at com.mapr.fs.ShimLoader.load(ShimLoader.java:198)
 at
 org.apache.hadoop.conf.CoreDefaultProperties.clinit(CoreDefaultProperties.java:59)
 at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:274)
 at
 org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:1857)
 at
 org.apache.hadoop.conf.Configuration.getProperties(Configuration.java:2072)
 at
 org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2282)
 at
 org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2234)
 at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2151)
 at org.apache.hadoop.conf.Configuration.set(Configuration.java:1002)
 at org.apache.hadoop.conf.Configuration.set(Configuration.java:974)
 at org.apache.hadoop.mapred.JobConf.setJar(JobConf.java:518)
 at org.apache.hadoop.mapred.JobConf.setJarByClass(JobConf.java:536)
 at org.apache.hadoop.mapred.JobConf.init(JobConf.java:430)
 at org.apache.hadoop.hive.conf.HiveConf.initialize(HiveConf.java:1366)
 at org.apache.hadoop.hive.conf.HiveConf.init(HiveConf.java:1332)
 at
 org.apache.spark.sql.hive.client.ClientWrapper.init(ClientWrapper.scala:99)
 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
 at
 sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
 at
 sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
 at
 org.apache.spark.sql.hive.client.IsolatedClientLoader.liftedTree1$1(IsolatedClientLoader.scala:170)
 at
 org.apache.spark.sql.hive.client.IsolatedClientLoader.init(IsolatedClientLoader.scala:166)
 at
 org.apache.spark.sql.hive.HiveContext.metadataHive$lzycompute(HiveContext.scala:212)
 at
 org.apache.spark.sql.hive.HiveContext.metadataHive(HiveContext.scala:175)
 at
 org.apache.spark.sql.hive.HiveContext$$anon$2.init(HiveContext.scala:370)
 at
 org.apache.spark.sql.hive.HiveContext.catalog$lzycompute(HiveContext.scala:370)
 at org.apache.spark.sql.hive.HiveContext.catalog(HiveContext.scala:369)
 at
 org.apache.spark.sql.hive.HiveContext$$anon$1.init(HiveContext.scala:382)
 at
 org.apache.spark.sql.hive.HiveContext.analyzer$lzycompute(HiveContext.scala:382)
 at org.apache.spark.sql.hive.HiveContext.analyzer(HiveContext.scala:381)
 at
 org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:901)
 at org.apache.spark.sql.DataFrame.init(DataFrame.scala:131)
 at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51)
 at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:725)
 at
 $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:21)
 at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:26)
 at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:28)
 at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:30)
 at $line37.$read$$iwC$$iwC$$iwC$$iwC.init(console:32)
 at $line37.$read$$iwC$$iwC$$iwC.init(console:34)
 at $line37.$read$$iwC$$iwC.init(console:36)
 at $line37.$read$$iwC.init(console:38)
 at $line37.$read.init(console:40)
 at $line37.$read$.init(console:44)
 at $line37.$read$.clinit(console)
 at $line37.$eval$.init(console:7)
 at 

Spark Job always cause a node to reboot

2015-06-04 Thread Chao Chen
Hi all, 
I am new to spark. I am trying to deploy HDFS (hadoop-2.6.0) and Spark-1.3.1 
with four nodes, and each node has 8-cores and 8GB memory.
One is configured as headnode running masters, and 3 others are workers 

But when I try to run the PageRank workload from HiBench, it always causes a node to
reboot in the middle of the job, for the Scala, Java, and Python versions alike.
The MapReduce version from the same benchmark works fine.

I also tried standalone deployment, got the same issue. 

My spark-defaults.conf
spark.master            yarn-client
spark.driver.memory 4g
spark.executor.memory   4g
spark.rdd.compress  false   


The job submit script is:

bin/spark-submit  --properties-file 
HiBench/report/pagerank/spark/scala/conf/sparkbench/spark.conf --class 
org.apache.spark.examples.SparkPageRank --master yarn-client --num-executors 2 
--executor-cores 4 --executor-memory 4G --driver-memory 4G 
HiBench/src/sparkbench/target/sparkbench-4.0-SNAPSHOT-MR2-spark1.3-jar-with-dependencies.jar
 hdfs://discfarm:9000/HiBench/Pagerank/Input/edges 
hdfs://discfarm:9000/HiBench/Pagerank/Output 3

What is the problem with my configuration, and how can I find the cause?

any help is welcome !











-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: StreamingListener, anyone?

2015-06-04 Thread Akhil Das
Hi

Here's a working example: https://gist.github.com/akhld/b10dc491aad1a2007183


Thanks
Best Regards

On Wed, Jun 3, 2015 at 10:09 PM, dgoldenberg dgoldenberg...@gmail.com
wrote:

 Hi,

 I've got a Spark Streaming driver job implemented and in it, I register a
 streaming listener, like so:

 JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
Durations.milliseconds(params.getBatchDurationMillis()));
 jssc.addStreamingListener(new JobListener(jssc));

 where JobListener is defined like so
 private static class JobListener implements StreamingListener {

 private JavaStreamingContext jssc;

 JobListener(JavaStreamingContext jssc) {
 this.jssc = jssc;
 }

 @Override
 public void
 onBatchCompleted(StreamingListenerBatchCompleted
 batchCompleted) {
 System.out.println( Batch completed.);
 jssc.stop(true);
 System.out.println( The job has been stopped.);
 }
 

 I do not seem to be seeing onBatchCompleted being triggered.  Am I doing
 something wrong?

 In this particular case, I was trying to implement a bulk ingest type of
 logic where the first batch is all we're interested in (reading out of a
 Kafka topic with offset reset set to smallest).




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/StreamingListener-anyone-tp23140.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark Streming yarn-cluster Mode Off-heap Memory Is Constantly Growing

2015-06-04 Thread Ji ZHANG
Hi,

I set spark.shuffle.io.preferDirectBufs to false in SparkConf and this
setting can be seen in web ui's environment tab. But, it still eats memory,
i.e. -Xmx set to 512M but RES grows to 1.5G in half a day.


On Wed, Jun 3, 2015 at 12:02 PM, Shixiong Zhu zsxw...@gmail.com wrote:

 Could you set spark.shuffle.io.preferDirectBufs to false to turn off the
 off-heap allocation of netty?

 Best Regards,
 Shixiong Zhu

 2015-06-03 11:58 GMT+08:00 Ji ZHANG zhangj...@gmail.com:

 Hi,

 Thanks for you information. I'll give spark1.4 a try when it's released.

 On Wed, Jun 3, 2015 at 11:31 AM, Tathagata Das t...@databricks.com
 wrote:

 Could you try it out with Spark 1.4 RC3?

 Also pinging, Cloudera folks, they may be aware of something.

 BTW, the way I have debugged memory leaks in the past is as follows.

 Run with a small driver memory, say 1 GB. Periodically (maybe a script),
 take snapshots of histogram and also do memory dumps. Say every hour. And
 then compare the difference between two histo/dumps that are few hours
 separated (more the better). Diffing histo is easy. Diff two dumps can be
 done in JVisualVM, it will show the diff in the objects that got added in
 the later dump. That makes it easy to debug what is not getting cleaned.

 TD


 On Tue, Jun 2, 2015 at 7:33 PM, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 Thanks for you reply. Here's the top 30 entries of jmap -histo:live
 result:

  num #instances #bytes  class name
 --
1: 40802  145083848  [B
2: 99264   12716112  methodKlass
3: 99264   12291480  constMethodKlass
4:  84729144816  constantPoolKlass
5:  84727625192  instanceKlassKlass
6:   1866097824
  [Lscala.concurrent.forkjoin.ForkJoinTask;
7:  70454804832  constantPoolCacheKlass
8:1391684453376  java.util.HashMap$Entry
9:  94273542512  methodDataKlass
   10:1413123391488
  io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
   11:1354913251784  java.lang.Long
   12: 261922765496  [C
   13:   8131140560  [Ljava.util.HashMap$Entry;
   14:  89971061936  java.lang.Class
   15: 16022 851384  [[I
   16: 16447 789456  java.util.zip.Inflater
   17: 13855 723376  [S
   18: 17282 691280  java.lang.ref.Finalizer
   19: 25725 617400  java.lang.String
   20:   320 570368
  [Lio.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry;
   21: 16066 514112
  java.util.concurrent.ConcurrentHashMap$HashEntry
   22: 12288 491520
  org.jboss.netty.util.internal.ConcurrentIdentityHashMap$Segment
   23: 13343 426976
  java.util.concurrent.locks.ReentrantLock$NonfairSync
   24: 12288 396416
  [Lorg.jboss.netty.util.internal.ConcurrentIdentityHashMap$HashEntry;
   25: 16447 394728  java.util.zip.ZStreamRef
   26:   565 370080  [I
   27:   508 272288  objArrayKlassKlass
   28: 16233 259728  java.lang.Object
   29:   771 209232
  [Ljava.util.concurrent.ConcurrentHashMap$HashEntry;
   30:  2524 192312  [Ljava.lang.Object;

 But as I mentioned above, the heap memory seems OK, the extra memory is
 consumed by some off-heap data. I can't find a way to figure out what is in
 there.

 Besides, I did some extra experiments, i.e. run the same program in
 difference environments to test whether it has off-heap memory issue:

 spark1.0 + standalone = no
 spark1.0 + yarn = no
 spark1.3 + standalone = no
 spark1.3 + yarn = yes

 I'm using CDH5.1, so the spark1.0 is provided by cdh, and
 spark-1.3.1-bin-hadoop2.3 is downloaded from the official website.

 I could use spark1.0 + yarn, but I can't find a way to handle the logs,
 level and rolling, so it'll explode the harddrive.

 Currently I'll stick to spark1.0 + standalone, until our ops team
 decides to upgrade cdh.



 On Tue, Jun 2, 2015 at 2:58 PM, Tathagata Das t...@databricks.com
 wrote:

 While you are running is it possible for you login into the YARN node
 and get histograms of live objects using jmap -histo:live. That may
 reveal something.


 On Thursday, May 28, 2015, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 Unfortunately, they're still growing, both driver and executors.

 I run the same job with local mode, everything is fine.

 On Thu, May 28, 2015 at 5:26 PM, Akhil Das 
 ak...@sigmoidanalytics.com wrote:

 Can you replace your counting part with this?

 logs.filter(_.s_id > 0).foreachRDD(rdd => logger.info(rdd.count()))



 Thanks
 Best Regards

 On Thu, May 28, 2015 at 1:02 PM, Ji ZHANG zhangj...@gmail.com
 wrote:

 Hi,

 I wrote a simple test job, it only does very basic operations. for
 example:

 

Re: StreamingListener, anyone?

2015-06-04 Thread Shixiong Zhu
You should not call `jssc.stop(true);` in a StreamingListener. It will
cause a dead-lock: `jssc.stop` won't return until `listenerBus` exits. But
since `jssc.stop` blocks `StreamingListener`, `listenerBus` cannot exit.

Best Regards,
Shixiong Zhu

2015-06-04 0:39 GMT+08:00 dgoldenberg dgoldenberg...@gmail.com:

 Hi,

 I've got a Spark Streaming driver job implemented and in it, I register a
 streaming listener, like so:

 JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
Durations.milliseconds(params.getBatchDurationMillis()));
 jssc.addStreamingListener(new JobListener(jssc));

 where JobListener is defined like so
 private static class JobListener implements StreamingListener {

 private JavaStreamingContext jssc;

 JobListener(JavaStreamingContext jssc) {
 this.jssc = jssc;
 }

 @Override
 public void
 onBatchCompleted(StreamingListenerBatchCompleted
 batchCompleted) {
 System.out.println( Batch completed.);
 jssc.stop(true);
 System.out.println( The job has been stopped.);
 }
 

 I do not seem to be seeing onBatchCompleted being triggered.  Am I doing
 something wrong?

 In this particular case, I was trying to implement a bulk ingest type of
 logic where the first batch is all we're interested in (reading out of a
 Kafka topic with offset reset set to smallest).




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/StreamingListener-anyone-tp23140.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




NullPointerException SQLConf.setConf

2015-06-04 Thread patcharee

Hi,

I am using Hive 0.14 and spark 0.13. I got a 
java.lang.NullPointerException when inserting into Hive. Any suggestions, 
please.


hiveContext.sql("INSERT OVERWRITE table 4dim partition (zone=" + ZONE +
",z=" + zz + ",year=" + YEAR + ",month=" + MONTH + ") " +
"select date, hh, x, y, height, u, v, w, ph, phb, t, p, pb, " +
"qvapor, qgraup, qnice, qnrain, tke_pbl, el_pbl from table_4Dim where z=" +
zz);


java.lang.NullPointerException
at org.apache.spark.sql.SQLConf.setConf(SQLConf.scala:196)
at org.apache.spark.sql.SQLContext.setConf(SQLContext.scala:74)
at 
org.apache.spark.sql.hive.HiveContext.hiveconf$lzycompute(HiveContext.scala:251)
at 
org.apache.spark.sql.hive.HiveContext.hiveconf(HiveContext.scala:250)

at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:95)
at 
no.uni.computing.etl.LoadWrfIntoHiveOptReduce1$$anonfun$main$3$$anonfun$apply$1.apply(LoadWrfIntoHiveOptReduce1.scala:110)
at 
no.uni.computing.etl.LoadWrfIntoHiveOptReduce1$$anonfun$main$3$$anonfun$apply$1.apply(LoadWrfIntoHiveOptReduce1.scala:107)

at scala.collection.immutable.Range.foreach(Range.scala:141)
at 
no.uni.computing.etl.LoadWrfIntoHiveOptReduce1$$anonfun$main$3.apply(LoadWrfIntoHiveOptReduce1.scala:107)
at 
no.uni.computing.etl.LoadWrfIntoHiveOptReduce1$$anonfun$main$3.apply(LoadWrfIntoHiveOptReduce1.scala:107)
at 
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:806)
at 
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:806)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1511)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1511)

at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

at java.lang.Thread.run(Thread.java:744)

Best,
Patcharee

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark 1.4 HiveContext fails to initialise with native libs

2015-06-04 Thread Night Wolf
Thanks Yin, that seems to work with the Shell. But on a compiled
application with Spark-submit it still fails with the same exception.
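
In case it helps with the spark-submit case, the same property can also be passed on
the command line instead of spark-defaults.conf (a sketch only; the value is simply the
prefix list Yin quoted below):

spark-submit --conf "spark.sql.hive.metastore.sharedPrefixes=com.mysql.jdbc,org.postgresql,com.microsoft.sqlserver,oracle.jdbc,com.mapr.fs.shim.LibraryLoader,com.mapr.security.JNISecurity,com.mapr.fs.jni" ...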

On Thu, Jun 4, 2015 at 2:46 PM, Yin Huai yh...@databricks.com wrote:

 Can you put the following setting in spark-defaults.conf and try again?

 spark.sql.hive.metastore.sharedPrefixes
 com.mysql.jdbc,org.postgresql,com.microsoft.sqlserver,oracle.jdbc,com.mapr.fs.shim.LibraryLoader,com.mapr.security.JNISecurity,com.mapr.fs.jni

 https://issues.apache.org/jira/browse/SPARK-7819 has more context about
 it.

 On Wed, Jun 3, 2015 at 9:38 PM, Night Wolf nightwolf...@gmail.com wrote:

 Hi all,

 Trying out Spark 1.4 RC4 on MapR4/Hadoop 2.5.1 running in yarn-client
 mode with Hive support.

 *Build command;*
 ./make-distribution.sh --name mapr4.0.2_yarn_j6_2.10 --tgz -Pyarn -Pmapr4
 -Phadoop-2.4 -Pmapr4 -Phive -Phadoop-provided
 -Dhadoop.version=2.5.1-mapr-1501 -Dyarn.version=2.5.1-mapr-1501 -DskipTests
 -e -X


 When trying to run a hive query in the spark shell *sqlContext.sql(show
 tables)* I get the following exception;

 scala sqlContext.sql(show tables)
 15/06/04 04:33:16 INFO hive.HiveContext: Initializing
 HiveMetastoreConnection version 0.13.1 using Spark classes.
 java.lang.reflect.InvocationTargetException
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at com.mapr.fs.ShimLoader.loadNativeLibrary(ShimLoader.java:323)
 at com.mapr.fs.ShimLoader.load(ShimLoader.java:198)
 at
 org.apache.hadoop.conf.CoreDefaultProperties.clinit(CoreDefaultProperties.java:59)
 at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:274)
 at
 org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:1857)
 at
 org.apache.hadoop.conf.Configuration.getProperties(Configuration.java:2072)
 at
 org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2282)
 at
 org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2234)
 at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2151)
 at org.apache.hadoop.conf.Configuration.set(Configuration.java:1002)
 at org.apache.hadoop.conf.Configuration.set(Configuration.java:974)
 at org.apache.hadoop.mapred.JobConf.setJar(JobConf.java:518)
 at org.apache.hadoop.mapred.JobConf.setJarByClass(JobConf.java:536)
 at org.apache.hadoop.mapred.JobConf.init(JobConf.java:430)
 at org.apache.hadoop.hive.conf.HiveConf.initialize(HiveConf.java:1366)
 at org.apache.hadoop.hive.conf.HiveConf.init(HiveConf.java:1332)
 at
 org.apache.spark.sql.hive.client.ClientWrapper.init(ClientWrapper.scala:99)
 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
 at
 sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
 at
 sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
 at
 org.apache.spark.sql.hive.client.IsolatedClientLoader.liftedTree1$1(IsolatedClientLoader.scala:170)
 at
 org.apache.spark.sql.hive.client.IsolatedClientLoader.init(IsolatedClientLoader.scala:166)
 at
 org.apache.spark.sql.hive.HiveContext.metadataHive$lzycompute(HiveContext.scala:212)
 at
 org.apache.spark.sql.hive.HiveContext.metadataHive(HiveContext.scala:175)
 at
 org.apache.spark.sql.hive.HiveContext$$anon$2.init(HiveContext.scala:370)
 at
 org.apache.spark.sql.hive.HiveContext.catalog$lzycompute(HiveContext.scala:370)
 at org.apache.spark.sql.hive.HiveContext.catalog(HiveContext.scala:369)
 at
 org.apache.spark.sql.hive.HiveContext$$anon$1.init(HiveContext.scala:382)
 at
 org.apache.spark.sql.hive.HiveContext.analyzer$lzycompute(HiveContext.scala:382)
 at org.apache.spark.sql.hive.HiveContext.analyzer(HiveContext.scala:381)
 at
 org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:901)
 at org.apache.spark.sql.DataFrame.init(DataFrame.scala:131)
 at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51)
 at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:725)
 at
 $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:21)
 at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:26)
 at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:28)
 at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:30)
 at $line37.$read$$iwC$$iwC$$iwC$$iwC.init(console:32)
 at $line37.$read$$iwC$$iwC$$iwC.init(console:34)
 at $line37.$read$$iwC$$iwC.init(console:36)
 at $line37.$read$$iwC.init(console:38)
 at $line37.$read.init(console:40)
 at $line37.$read$.init(console:44)
 at $line37.$read$.clinit(console)
 at $line37.$eval$.init(console:7)
 at $line37.$eval$.clinit(console)
 at $line37.$eval.$print(console)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 

SparkSQL DF.explode with Nulls

2015-06-04 Thread Tom Seddon
Hi,

I've worked out how to use explode on my input avro dataset with the
following structure
root
 |-- pageViewId: string (nullable = false)
 |-- components: array (nullable = true)
 ||-- element: struct (containsNull = false)
 |||-- name: string (nullable = false)
 |||-- loadTimeMs: long (nullable = true)


I'm trying to turn this into this layout with repeated pageViewIds for each
row of my components:
root
 |-- pageViewId: string (nullable = false)
 |-- name: string (nullable = false)
 |-- loadTimeMs: long (nullable = true)

Explode words fine for the first 10 records using this bit of code, but my
big problem is that loadTimeMs has nulls in it, which I think is causing
the error.  Any ideas how I can trap those nulls?  Perhaps by converting to
zeros and then I can deal with them later?  I tried writing a udf which
just takes the loadTimeMs column and swaps nulls for zeros, but this
separates the struct and then I don't know how to use explode.

avroFile.filter($"lazyComponents.components".isNotNull)
.explode($"lazyComponents.components")
{ case Row(lazyComponents: Seq[Row]) => lazyComponents
.map(x => x.getString(0) -> x.getLong(1))}
.select('pageViewId, '_1, '_2)
.take(10).foreach(println)

15/06/04 12:01:21 ERROR Executor: Exception in task 0.0 in stage 19.0 (TID
65)
java.lang.RuntimeException: Failed to check null bit for primitive long
value.
at scala.sys.package$.error(package.scala:27)
at
org.apache.spark.sql.catalyst.expressions.GenericRow.getLong(rows.scala:87)
at
$line127.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1$$anonfun$apply$1.apply(console:33)
at
$line127.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1$$anonfun$apply$1.apply(console:33)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at
$line127.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(console:33)
at
$line127.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(console:33)
at scala.Function1$$anonfun$andThen$1.apply(Function1.scala:55)
at
org.apache.spark.sql.catalyst.expressions.UserDefinedGenerator.eval(generators.scala:89)
at
org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:71)
at
org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:70)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:122)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:122)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
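
One possible way to trap the nulls, sketched against the snippet above (untested; it
relies on Row.isNullAt and substitutes zero for a missing loadTimeMs so those rows can
be filtered out or fixed up later):

avroFile.filter($"lazyComponents.components".isNotNull)
  .explode($"lazyComponents.components") {
    case Row(lazyComponents: Seq[Row]) => lazyComponents.map { x =>
      // check the null bit before asking for a primitive long
      val loadTimeMs = if (x.isNullAt(1)) 0L else x.getLong(1)
      x.getString(0) -> loadTimeMs
    }
  }
  .select('pageViewId, '_1, '_2)
  .take(10).foreach(println)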


Re: Embedding your own transformer in Spark.ml Pipleline

2015-06-04 Thread Peter Rudenko
Hi Brandon, they are available, but private to the ml package. They are now 
public in 1.4. For 1.3.1 you can define your transformer in the 
org.apache.spark.ml package - then you can use these traits.


Thanks,
Peter Rudenko

On 2015-06-04 20:28, Brandon Plaster wrote:
Is HasInputCol and HasOutputCol available in 1.3.1? I'm getting 
the following message when I'm trying to implement a Transformer and 
importing org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol}:


error: object shared is not a member of package org.apache.spark.ml.param


and

error: trait HasInputCol in package param cannot be accessed in 
package org.apache.spark.ml.param



On Tue, Jun 2, 2015 at 1:51 PM, Peter Rudenko petro.rude...@gmail.com 
mailto:petro.rude...@gmail.com wrote:


Hi Dimple,
take a look to existing transformers:

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/HashingTF.scala
(*it's for spark-1.4)

The idea is just to implement a class that extends Transformer
with HasInputCol with HasOutputCol (if your transformer is a 1:1 column
transformer) and has a

def transform(dataset: DataFrame): DataFrame

method.
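
For example, a rough 1.4-style skeleton (the class name, the column handling and the toy
upper-casing logic are all made up for illustration; exact signatures shift slightly
between 1.3.x and 1.4, and on 1.3.1 the class also has to live in the
org.apache.spark.ml package as noted above):

import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol}
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Illustrative 1:1 column transformer that upper-cases a string column.
class UpperCaser(override val uid: String) extends Transformer
    with HasInputCol with HasOutputCol {

  def this() = this(Identifiable.randomUID("upperCaser"))

  def setInputCol(value: String): this.type = set(inputCol, value)
  def setOutputCol(value: String): this.type = set(outputCol, value)

  override def transform(dataset: DataFrame): DataFrame = {
    val toUpper = udf { s: String => if (s == null) null else s.toUpperCase }
    dataset.withColumn($(outputCol), toUpper(dataset($(inputCol))))
  }

  override def transformSchema(schema: StructType): StructType =
    StructType(schema.fields :+ StructField($(outputCol), StringType, nullable = true))
}

It can then be dropped into a Pipeline like any built-in stage, e.g.
new UpperCaser().setInputCol("text").setOutputCol("textUpper").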

Thanks,
Peter

On 2015-06-02 20:19, dimple wrote:

Hi,
I would like to embed my own transformer in the Spark.ml Pipleline but do
not see an example of it. Can someone share an example of which
classes/interfaces I need to extend/implement in order to do so. Thanks.

Dimple



--
View this message in 
context:http://apache-spark-user-list.1001560.n3.nabble.com/Embedding-your-own-transformer-in-Spark-ml-Pipleline-tp23112.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail:user-unsubscr...@spark.apache.org
mailto:user-unsubscr...@spark.apache.org
For additional commands, e-mail:user-h...@spark.apache.org 
mailto:user-h...@spark.apache.org








Re: Spark 1.4.0-rc4 HiveContext.table(db.tbl) NoSuchTableException

2015-06-04 Thread Yin Huai
Hi Doug,

sqlContext.table does not officially support database name. It only
supports table name as the parameter. We will add a method to support
database name in future.

Thanks,

Yin

On Thu, Jun 4, 2015 at 8:10 AM, Doug Balog doug.sparku...@dugos.com wrote:

 Hi Yin,
  I’m very surprised to hear that its not supported in 1.3 because I’ve
 been using it since 1.3.0.
 It worked great up until  SPARK-6908 was merged into master.

 What is the supported way to get  DF for a table that is not in the
 default database ?

 IMHO, If you are not going to support “databaseName.tableName”,
 sqlContext.table() should have a version that takes a database and a table,
 ie

 def table(databaseName: String, tableName: String): DataFrame =
   DataFrame(this, catalog.lookupRelation(Seq(databaseName,tableName)))

 The handling of databases in Spark(sqlContext, hiveContext, Catalog) could
 be better.

 Thanks,

 Doug

  On Jun 3, 2015, at 8:21 PM, Yin Huai yh...@databricks.com wrote:
 
  Hi Doug,
 
  Actually, sqlContext.table does not support database name in both Spark
 1.3 and Spark 1.4. We will support it in future version.
 
  Thanks,
 
  Yin
 
 
 
  On Wed, Jun 3, 2015 at 10:45 AM, Doug Balog doug.sparku...@dugos.com
 wrote:
  Hi,
 
  sqlContext.table(“db.tbl”) isn’t working for me, I get a
 NoSuchTableException.
 
  But I can access the table via
 
  sqlContext.sql(“select * from db.tbl”)
 
  So I know it has the table info from the metastore.
 
  Anyone else see this ?
 
  I’ll keep digging.
  I compiled via make-distribution  -Pyarn -phadoop-2.4 -Phive
 -Phive-thriftserver
  It worked for me in 1.3.1
 
  Cheers,
 
  Doug
 
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 
 




Re: inlcudePackage() deprecated?

2015-06-04 Thread Daniel Emaasit
Got it. Ignore my similar question on Github comments.

On Thu, Jun 4, 2015 at 11:48 AM, Shivaram Venkataraman 
shiva...@eecs.berkeley.edu wrote:

 Yeah - We don't have support for running UDFs on DataFrames yet. There is
 an open issue to track this
 https://issues.apache.org/jira/browse/SPARK-6817

 Thanks
 Shivaram

 On Thu, Jun 4, 2015 at 3:10 AM, Daniel Emaasit daniel.emaa...@gmail.com
 wrote:

 Hello Shivaram,
 Was the includePackage() function deprecated in SparkR 1.4.0?
 I don't see it in the documentation? If it was, does that mean that we
 can use R packages on Spark DataFrames the usual way we do for local R
 dataframes?

 Daniel

 --
 Daniel Emaasit
 Ph.D. Research Assistant
 Transportation Research Center (TRC)
 University of Nevada, Las Vegas
 Las Vegas, NV 89154-4015
 Cell: 615-649-2489
 www.danielemaasit.com  http://www.danielemaasit.com/







-- 
Daniel Emaasit
Ph.D. Research Assistant
Transportation Research Center (TRC)
University of Nevada, Las Vegas
Las Vegas, NV 89154-4015
Cell: 615-649-2489
www.danielemaasit.com  http://www.danielemaasit.com/


Re: TreeReduce Functionality in Spark

2015-06-04 Thread Raghav Shankar
Hey DB,

Thanks for the reply!

I still don't think this answers my question. For example, if I have a
top() action being executed and I have 32 workers(32 partitions), and I
choose a depth of 4, what does the overlay of intermediate reducers look
like? How many reducers are there excluding the master and the worker? How
many partitions get sent to each of these intermediate reducers? Does this
number vary at each level?

Thanks!
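
For a concrete feel of the numbers, here is a rough worked example based only on the
treeAggregate code quoted at the bottom of this message (so treat it as an
approximation): with 32 partitions and depth 4, scale = max(ceil(32^(1/4)), 2) = 3.
The loop then collapses 32 partitions to 32/3 = 10, then 10 to 10/3 = 3, and stops
because 3 > 3 + 3/3 no longer holds; the final 3 partially aggregated values are
combined by a plain reduce on the driver. So there is no dedicated set of intermediate
reducer machines: each level is just another shuffled RDD (a reduceByKey with a
HashPartitioner), with 10 tasks at the first level and 3 at the second, each task
combining roughly scale partitions from the level below.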

On Thursday, June 4, 2015, DB Tsai dbt...@dbtsai.com wrote:

 By default, the depth of the tree is 2. Each partition will be one node.

 Sincerely,

 DB Tsai
 ---
 Blog: https://www.dbtsai.com


 On Thu, Jun 4, 2015 at 10:46 AM, Raghav Shankar raghav0110...@gmail.com
 javascript:; wrote:
  Hey Reza,
 
  Thanks for your response!
 
  Your response clarifies some of my initial thoughts. However, what I
 don't
  understand is how the depth of the tree is used to identify how many
  intermediate reducers there will be, and how many partitions are sent to
 the
  intermediate reducers. Could you provide some insight into this?
 
  Thanks,
  Raghav
 
  On Thursday, June 4, 2015, Reza Zadeh r...@databricks.com
 javascript:; wrote:
 
  In a regular reduce, all partitions have to send their reduced value to
 a
  single machine, and that machine can become a bottleneck.
 
  In a treeReduce, the partitions talk to each other in a logarithmic
 number
  of rounds. Imagine a binary tree that has all the partitions at its
 leaves
  and the root will contain the final reduced value. This way there is no
  single bottleneck machine.
 
  It remains to decide the number of children each node should have and
 how
  deep the tree should be, which is some of the logic in the method you
  pasted.
 
  On Wed, Jun 3, 2015 at 7:10 PM, raggy raghav0110...@gmail.com
 javascript:; wrote:
 
  I am trying to understand what the treeReduce function for an RDD does,
  and
  how it is different from the normal reduce function. My current
  understanding is that treeReduce tries to split up the reduce into
  multiple
  steps. We do a partial reduce on different nodes, and then a final
 reduce
  is
  done to get the final result. Is this correct? If so, I guess what I am
  curious about is, how does spark decide how many nodes will be on each
  level, and how many partitions will be sent to a given node?
 
  The bulk of the implementation is within this function:
 
  partiallyReduced.treeAggregate(Option.empty[T])(op, op, depth)
    .getOrElse(throw new UnsupportedOperationException("empty collection"))
 
  The above function is expanded to
 
  val cleanSeqOp = context.clean(seqOp)
  val cleanCombOp = context.clean(combOp)
  val aggregatePartition =
    (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
  var partiallyAggregated = mapPartitions(it => Iterator(aggregatePartition(it)))
  var numPartitions = partiallyAggregated.partitions.length
  val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
  // If creating an extra level doesn't help reduce
  // the wall-clock time, we stop tree aggregation.
  while (numPartitions > scale + numPartitions / scale) {
    numPartitions /= scale
    val curNumPartitions = numPartitions
    partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
      (i, iter) => iter.map((i % curNumPartitions, _))
    }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values
  }
  partiallyAggregated.reduce(cleanCombOp)
 
  I am completely lost about what is happening in this function. I would
  greatly appreciate some sort of explanation.
 
 
 
 
  --
  View this message in context:
 
 http://apache-spark-user-list.1001560.n3.nabble.com/TreeReduce-Functionality-in-Spark-tp23147.html
  Sent from the Apache Spark User List mailing list archive at
 Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 javascript:;
  For additional commands, e-mail: user-h...@spark.apache.org
 javascript:;
 
 
 



Re: Python Image Library and Spark

2015-06-04 Thread Akhil Das
Replace this line:

 img_data = sc.parallelize( list(im.getdata()) )

With:

 img_data = sc.parallelize( list(im.getdata()), 3 * <number of cores you have> )


Thanks
Best Regards

On Thu, Jun 4, 2015 at 1:57 AM, Justin Spargur jmspar...@gmail.com wrote:

 Hi all,

  I'm playing around with manipulating images via Python and want to
 utilize Spark for scalability. That said, I'm just learing Spark and my
 Python is a bit rusty (been doing PHP coding for the last few years). I
 think I have most of the process figured out. However, the script fails on
 larger images and Spark is sending out the following warning for smaller
 images:

 Stage 0 contains a task of very large size (1151 KB). The maximum
 recommended task size is 100 KB.

 My code is as follows:

 import Image
 from pyspark import SparkContext

 if __name__ == __main__:

 imageFile = sample.jpg
 outFile   = sample.gray.jpg

 sc = SparkContext(appName=Grayscale)
 im = Image.open(imageFile)

 # Create an RDD for the data from the image file
 img_data = sc.parallelize( list(im.getdata()) )

 # Create an RDD for the grayscale value
 gValue = img_data.map( lambda x: int(x[0]*0.21 + x[1]*0.72 +
 x[2]*0.07) )

 # Put our grayscale value into the RGR channels
 grayscale = gValue.map( lambda x: (x,x,x)  )

 # Save the output in a new image.
 im.putdata( grayscale.collect() )

 im.save(outFile)

 Obviously, something is amiss. However, I can't figure out where I'm off
 track with this. Any help is appreciated! Thanks in advance!!!



Spark ML decision list

2015-06-04 Thread Sateesh Kavuri
Hi,

I have used weka machine learning library for generating a model for my
training set. I have used the PART algorithm (decision lists) from weka.

Now, I would like to use spark ML for the PART algo for my training set and
could not seem to find a parallel. Could anyone point out the corresponding
algorithm or even if its available in Spark ML?

Thanks,
Sateesh


Re: Adding new Spark workers on AWS EC2 - access error

2015-06-04 Thread Akhil Das
That's because you need to add the master's public key (~/.ssh/id_rsa.pub)
to the newly added slaves ~/.ssh/authorized_keys.

I add slaves this way:

- Launch a new instance by clicking on the slave instance and choose *launch
more like this *
*- *Once its launched, ssh into it and add the master public key to
.ssh/authorized_keys
- Add the slaves internal IP to the master's conf/slaves file

- Rsync spark directory to the slave machine (*rsync -za ~/spark SLAVES-IP:*
)

- do sbin/start-all.sh and it will show up along with other slaves.


Thanks
Best Regards

On Thu, Jun 4, 2015 at 6:45 AM, barmaley o...@solver.com wrote:

 I have the existing operating Spark cluster that was launched with
 spark-ec2
 script. I'm trying to add new slave by following the instructions:

 Stop the cluster
 On AWS console launch more like this on one of the slaves
 Start the cluster
 Although the new instance is added to the same security group and I can
 successfully SSH to it with the same private key, spark-ec2 ... start call
 can't access this machine for some reason:

 Running setup-slave on all cluster nodes to mount filesystems, etc...
 [1] 00:59:59 [FAILURE] ec2-52-25-53-64.us-west-2.compute.amazonaws.com
 Exited with error code 255 Stderr: Permission denied (publickey).

 , obviously, followed by tons of other errors while trying to deploy Spark
 stuff on this instance.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Adding-new-Spark-workers-on-AWS-EC2-access-error-tp23143.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Equivalent to Storm's 'field grouping' in Spark.

2015-06-04 Thread luke89
Hi Matei,

thank you for answering.

According to what you said, am I mistaken when I say that tuples with the
same key might eventually be spread across more than one node in case an
overloaded worker can no longer accept tuples?
In other words, suppose a worker (processing key K) cannot accept more
tuples: how does Spark Streaming handle the other K-keyed tuples? Systems
like Storm do not provide any mechanism to handle such a situation.

I am pretty new to Spark, and I apologize if the question sounds too naive,
but I am having some trouble understanding Spark internals!

Thank you, again!
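
For reference, here is a minimal sketch of the keyed-state pattern described in the
quoted reply below (the socket source, checkpoint path and counting logic are just
placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KeyedStateSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("keyed-state"), Seconds(5))
    ssc.checkpoint("/tmp/keyed-state-checkpoint") // required by updateStateByKey

    // One (key, 1) pair per incoming line; all updates for a given key are
    // handled together, and its state stays on the same node between batches.
    val pairs = ssc.socketTextStream("localhost", 9999).map(line => (line, 1L))

    val totals = pairs.updateStateByKey[Long] { (newCounts: Seq[Long], state: Option[Long]) =>
      Some(state.getOrElse(0L) + newCounts.sum)
    }
    totals.print()

    ssc.start()
    ssc.awaitTermination()
  }
}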



2015-06-03 19:34 GMT+02:00 Matei Zaharia matei.zaha...@gmail.com:

 This happens automatically when you use the byKey operations, e.g.
 reduceByKey, updateStateByKey, etc. Spark Streaming keeps the state for a
 given set of keys on a specific node and sends new tuples with that key to
 that.

 Matei

  On Jun 3, 2015, at 6:31 AM, allonsy luke1...@gmail.com wrote:
 
  Hi everybody,
  is there in Spark anything sharing the philosophy of Storm's field
 grouping?
 
  I'd like to manage data partitioning across the workers by sending tuples
  sharing the same key to the very same worker in the cluster, but I did
 not
  find any method to do that.
 
  Suggestions?
 
  :)
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Equivalent-to-Storm-s-field-grouping-in-Spark-tp23135.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 




Difference bewteen library dependencies version

2015-06-04 Thread Jean-Charles RISCH
Hello,
*(Before everything : I use IntellijIdea 14.0.1, SBT and Scala 2.11.6)*

This morning, I was looking to resolve the "Failed to locate the winutils
binary in the hadoop binary path" error.


I noticed that I can solve it by configuring my build.sbt as follows:

...

libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "1.0.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.1" excludeAll(
  ExclusionRule(organization = "org.apache.hadoop")
  )

libraryDependencies += "org.apache.spark" %% "spark-mllib" % "1.3.1" excludeAll(
  ExclusionRule(organization = "org.apache.hadoop")
  )




But if I change the line

libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "1.0.4"

to

libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.7.0"

the error is back.


What does it mean? Is Spark built for an old version of Hadoop? I really
want to understand.

*Also, a bonus question : *
As you can see, I am using the spark 1.3.1 and spark-mllib APIs. I am using the
latest version, but my APIs do not correspond to the latest official APIs
(https://spark.apache.org/docs/*latest*/api/scala/#package)

For example, to run a KMeans algo, I have to use KMeans.train() whereas it
does not exist in the latest API.

This is the first time I ask something on the mailing list; I hope I am using it correctly.
Sorry for my bad English.

Thank you and have a good day,

JC


large shuffling = executor lost?

2015-06-04 Thread Yifan LI
Hi,

I am running my graphx application with Spark 1.3.1 on a small cluster. It then 
failed with this exception:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output 
location for shuffle 1
 
But actually I found it is indeed caused by “ExecutorLostFailure”, and someone 
told me it might be because there was a large amount of shuffling…

Does anyone have an idea how to fix it? Thanks in advance!




Best,
Yifan LI







Re: How to create fewer output files for Spark job ?

2015-06-04 Thread ๏̯͡๏
It worked.
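
For the archives, the change amounts to coalescing before the save, roughly like this
(based on the code quoted below; 500 is an arbitrary target partition count and should
be sized to the data volume):

protected def writeOutputRecords(detailRecords: RDD[(AvroKey[DetailOutputRecord], NullWritable)], outputDir: String) {
    val writeJob = new Job()
    val schema = SchemaUtil.outputSchema(_detail)
    AvroJob.setOutputKeySchema(writeJob, schema)
    // coalesce to a small, fixed number of partitions so only that many files are written
    detailRecords.coalesce(500).saveAsNewAPIHadoopFile(outputDir,
      classOf[AvroKey[GenericRecord]],
      classOf[org.apache.hadoop.io.NullWritable],
      classOf[AvroKeyOutputFormat[GenericRecord]],
      writeJob.getConfiguration)
}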

On Thu, Jun 4, 2015 at 5:14 PM, MEETHU MATHEW meethu2...@yahoo.co.in
wrote:

 Try using coalesce

 Thanks  Regards,
 Meethu M



   On Wednesday, 3 June 2015 11:26 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:


 I am running a series of spark functions with 9000 executors and it is
 resulting in 9000+ files, which is exceeding the namespace file count quota.

 How can Spark be configured to use CombinedOutputFormat.
 {code}
 protected def writeOutputRecords(detailRecords:
 RDD[(AvroKey[DetailOutputRecord], NullWritable)], outputDir: String) {
 val writeJob = new Job()
 val schema = SchemaUtil.outputSchema(_detail)
 AvroJob.setOutputKeySchema(writeJob, schema)
 detailRecords.saveAsNewAPIHadoopFile(outputDir,
   classOf[AvroKey[GenericRecord]],
   classOf[org.apache.hadoop.io.NullWritable],
   classOf[AvroKeyOutputFormat[GenericRecord]],
   writeJob.getConfiguration)
   }
 {code}

 --
 Deepak






-- 
Deepak


Re: How to create fewer output files for Spark job ?

2015-06-04 Thread MEETHU MATHEW
Try using coalesce

Thanks & Regards,
Meethu M 


 On Wednesday, 3 June 2015 11:26 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com 
wrote:
   

 I am running a series of spark functions with 9000 executors and it is resulting 
in 9000+ files, which is exceeding the namespace file count quota.
How can Spark be configured to use CombinedOutputFormat?
{code}
protected def writeOutputRecords(detailRecords: RDD[(AvroKey[DetailOutputRecord], NullWritable)], outputDir: String) {
    val writeJob = new Job()
    val schema = SchemaUtil.outputSchema(_detail)
    AvroJob.setOutputKeySchema(writeJob, schema)
    detailRecords.saveAsNewAPIHadoopFile(outputDir,
      classOf[AvroKey[GenericRecord]],
      classOf[org.apache.hadoop.io.NullWritable],
      classOf[AvroKeyOutputFormat[GenericRecord]],
      writeJob.getConfiguration)
}
{code}

-- 
Deepak


  

Re: StreamingListener, anyone?

2015-06-04 Thread Dmitry Goldenberg
Shixiong,

Thanks, interesting point. So if we want to only process one batch then
terminate the consumer, what's the best way to achieve that? Presumably the
listener could set a flag on the driver notifying it that it can terminate.
But the driver is not in a loop, it's basically blocked in
awaitTermination.  So what would be a way to trigger the termination in the
driver?

context.awaitTermination() allows the current thread to wait for the
termination of a context by stop() or by an exception - presumably, we
need to call stop() somewhere or perhaps throw.
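
For what it's worth, one shape that avoids the dead-lock is to have the listener only
flip a flag, and let the driver poll and call stop() itself. A sketch in Scala
(untested; the same idea applies from Java via awaitTerminationOrTimeout):

import java.util.concurrent.atomic.AtomicBoolean
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

class FirstBatchListener(done: AtomicBoolean) extends StreamingListener {
  // Only record the event; never call stop() from inside the listener.
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit =
    done.set(true)
}

def runSingleBatch(ssc: StreamingContext): Unit = {
  val done = new AtomicBoolean(false)
  ssc.addStreamingListener(new FirstBatchListener(done))
  ssc.start()
  // Poll in one-second slices instead of blocking forever in awaitTermination().
  while (!done.get() && !ssc.awaitTerminationOrTimeout(1000)) {}
  // stop() is called from the driver thread, outside the listener bus.
  ssc.stop(stopSparkContext = true, stopGracefully = true)
}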

Cheers,
- Dmitry

On Thu, Jun 4, 2015 at 3:55 AM, Shixiong Zhu zsxw...@gmail.com wrote:

 You should not call `jssc.stop(true);` in a StreamingListener. It will
 cause a dead-lock: `jssc.stop` won't return until `listenerBus` exits. But
 since `jssc.stop` blocks `StreamingListener`, `listenerBus` cannot exit.

 Best Regards,
 Shixiong Zhu

 2015-06-04 0:39 GMT+08:00 dgoldenberg dgoldenberg...@gmail.com:

 Hi,

 I've got a Spark Streaming driver job implemented and in it, I register a
 streaming listener, like so:

 JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
Durations.milliseconds(params.getBatchDurationMillis()));
 jssc.addStreamingListener(new JobListener(jssc));

 where JobListener is defined like so
 private static class JobListener implements StreamingListener {

 private JavaStreamingContext jssc;

 JobListener(JavaStreamingContext jssc) {
 this.jssc = jssc;
 }

 @Override
 public void
 onBatchCompleted(StreamingListenerBatchCompleted
 batchCompleted) {
 System.out.println( Batch completed.);
 jssc.stop(true);
 System.out.println( The job has been
 stopped.);
 }
 

 I do not seem to be seeing onBatchCompleted being triggered.  Am I doing
 something wrong?

 In this particular case, I was trying to implement a bulk ingest type of
 logic where the first batch is all we're interested in (reading out of a
 Kafka topic with offset reset set to smallest).




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/StreamingListener-anyone-tp23140.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Roadmap for Spark with Kafka on Scala 2.11?

2015-06-04 Thread algermissen1971
Hi Iulian,

On 26 May 2015, at 13:04, Iulian Dragoș iulian.dra...@typesafe.com wrote:

 
 On Tue, May 26, 2015 at 10:09 AM, algermissen1971 
 algermissen1...@icloud.com wrote:
 Hi,
 
 I am setting up a project that requires Kafka support and I wonder what the 
 roadmap is for Scala 2.11 Support (including Kafka).
 
 Can we expect to see 2.11 support anytime soon?
 
 The upcoming 1.4 release (now at RC2) includes support for Kafka and Scala 
 2.11.6. It'd be great if you could give it a try. You can find the binaries 
 (and staging repository including 2.11 artifacts) here:
 
  https://www.mail-archive.com/dev@spark.apache.org/msg09347.html
 

Feedback after a couple of days:

- I am using 1.4.0-rc4 now without problems
- Not used Kafka support yet
- I am using this with akka-2.3.11 and akka-http 1.0-RC3 (and sbt-assembly) and 
this has produced a dependency nightmare. I am even adding guava manually to 
the assembly because I just could not get sbt-assembly to not complain.

I am far from a good understanding of sbt / maven internals, but it seems that 
the ‘compile’ scope set in the spark POM for a lot of dependencies is somehow 
not honored and the libs end up causing conflicts in sbt-assembly.

(I am writing this to share experience, not to complain. Thanks for the great 
work!!)

onward...

Jan





 iulian
  
 
 Jan
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 
 
 
 
 -- 
 
 --
 Iulian Dragos
 
 --
 Reactive Apps on the JVM
 www.typesafe.com
 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Transactional guarantee while saving DataFrame into a DB

2015-06-04 Thread Deenar Toraskar
Hi Tariq

You need to handle the transaction semantics yourself. You could for
example save from the dataframe to a staging table and then write to the
final table using a single atomic INSERT INTO finalTable from
stagingTable call. Remember to clear the staging table first to recover
from previous failures if any.
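
A minimal sketch of that pattern (the table names are placeholders, and the statement
used to clear the staging table depends on the target database):

import java.sql.DriverManager
import org.apache.spark.sql.DataFrame

def saveAtomically(df: DataFrame, url: String): Unit = {
  def runSql(sql: String): Unit = {
    val conn = DriverManager.getConnection(url)
    try { conn.createStatement().execute(sql) } finally { conn.close() }
  }

  // 1. Clear the staging table to recover from any previous partial load.
  runSql("DELETE FROM staging_table")

  // 2. Bulk load into staging; a failure here leaves the final table untouched.
  df.insertIntoJDBC(url, "staging_table", false)

  // 3. A single atomic statement moves the rows into the final table.
  runSql("INSERT INTO final_table SELECT * FROM staging_table")
}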

Deenar

On 2 June 2015 at 16:01, Mohammad Tariq donta...@gmail.com wrote:

 Hi list,

 With the help of Spark DataFrame API we can save a DataFrame into a
 database table through insertIntoJDBC() call. However, I could not find any
 info about how it handles the transactional guarantee. What if my program
 gets killed during the processing? Would it end up in partial load?

 Is it somehow possible to handle these kind of scenarios? Rollback or
 something of that sort?

 Many thanks.

 P.S : I am using spark-1.3.1-bin-hadoop2.4 with java 1.7

 [image: http://]
 Tariq, Mohammad
 about.me/mti
 [image: http://]
 http://about.me/mti




Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-06-04 Thread Dmitry Goldenberg
set the storage policy for the DStream RDDs to MEMORY AND DISK - it
appears the storage level can be specified in the createStream methods but
not createDirectStream...


On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov evo.efti...@isecc.com wrote:

 You can also try Dynamic Resource Allocation




 https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation



 Also re the Feedback Loop for automatic message consumption rate
 adjustment – there is a “dumb” solution option – simply set the storage
 policy for the DStream RDDs to MEMORY AND DISK – when the memory gets
 exhausted spark streaming will resort to keeping new RDDs on disk which
 will prevent it from crashing and hence loosing them. Then some memory will
 get freed and it will resort back to RAM and so on and so forth





 Sent from Samsung Mobile

  Original message 

 From: Evo Eftimov

 Date:2015/05/28 13:22 (GMT+00:00)

 To: Dmitry Goldenberg

 Cc: Gerard Maas ,spark users

 Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth
 in Kafka or Spark's metrics?



 You can always spin new boxes in the background and bring them into the
 cluster fold when fully operational and time that with job relaunch and
 param change



 Kafka offsets are mabaged automatically for you by the kafka clients which
 keep them in zoomeeper dont worry about that ad long as you shut down your
 job gracefuly. Besides msnaging the offsets explicitly is not a big deal if
 necessary





 Sent from Samsung Mobile



  Original message 

 From: Dmitry Goldenberg

 Date:2015/05/28 13:16 (GMT+00:00)

 To: Evo Eftimov

 Cc: Gerard Maas ,spark users

 Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth
 in Kafka or Spark's metrics?



 Thanks, Evo.  Per the last part of your comment, it sounds like we will
 need to implement a job manager which will be in control of starting the
 jobs, monitoring the status of the Kafka topic(s), shutting jobs down and
 marking them as ones to relaunch, scaling the cluster up/down by
 adding/removing machines, and relaunching the 'suspended' (shut down) jobs.



 I suspect that relaunching the jobs may be tricky since that means keeping
 track of the starter offsets in Kafka topic(s) from which the jobs started
 working on.



 Ideally, we'd want to avoid a re-launch.  The 'suspension' and relaunching
 of jobs, coupled with the wait for the new machines to come online may turn
 out quite time-consuming which will make for lengthy request times, and our
 requests are not asynchronous.  Ideally, the currently running jobs would
 continue to run on the machines currently available in the cluster.



 In the scale-down case, the job manager would want to signal to Spark's
 job scheduler not to send work to the node being taken out, find out when
 the last job has finished running on the node, then take the node out.



 This is somewhat like changing the number of cylinders in a car engine
 while the car is running...



 Sounds like a great candidate for a set of enhancements in Spark...



 On Thu, May 28, 2015 at 7:52 AM, Evo Eftimov evo.efti...@isecc.com
 wrote:

 @DG; The key metrics should be



 -  Scheduling delay – its ideal state is to remain constant over
 time and ideally be less than the time of the microbatch window

 -  The average job processing time should remain less than the
 micro-batch window

 -  Number of Lost Jobs – even if there is a single Job lost that
 means that you have lost all messages for the DStream RDD processed by that
 job due to the previously described spark streaming memory leak condition
 and subsequent crash – described in previous postings submitted by me



 You can even go one step further and periodically issue “get/check free
 memory” to see whether it is decreasing relentlessly at a constant rate –
 if it touches a predetermined RAM threshold that should be your third
 metric



 Re the “back pressure” mechanism – this is a Feedback Loop mechanism and
 you can implement one on your own without waiting for Jiras and new
 features whenever they might be implemented by the Spark dev team –
 moreover you can avoid using slow mechanisms such as ZooKeeper and even
 incorporate some Machine Learning in your Feedback Loop to make it handle
 the message consumption rate more intelligently and benefit from ongoing
 online learning – BUT this is STILL about voluntarily sacrificing your
 performance in the name of keeping your system stable – it is not about
 scaling your system/solution



 In terms of how to scale the Spark Framework Dynamically – even though
 this is not supported at the moment out of the box I guess you can have a
 sys management framework spin dynamically a few more boxes (spark worker
 nodes), stop dynamically your currently running Spark Streaming Job,
 relaunch it with new params e.g. more Receivers, larger number of
 Partitions (hence tasks), more RAM per executor 

Re: Adding an indexed column

2015-06-04 Thread Deenar Toraskar
or you could

1) convert the dataframe to an RDD
2) use mapPartitions and zipWithIndex within each partition
3) convert the RDD back to a dataframe; you will need to make sure you preserve
partitioning (rough sketch below)
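
Rough sketch of those three steps (Spark 1.3-style API; it assumes the dataframe has
already been partitioned, and sorted within partitions, by the grouping column such as
product):

import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

def addIndexPerPartition(sqlContext: SQLContext, df: DataFrame): DataFrame = {
  // 1) drop down to the RDD level
  val rdd = df.rdd

  // 2) number the rows within each partition; no shuffle, so partitioning is preserved
  val indexed = rdd.mapPartitions { rows =>
    rows.zipWithIndex.map { case (row, i) =>
      Row.fromSeq(row.toSeq :+ i.toLong)
    }
  }

  // 3) back to a dataframe with an extra index column
  val schema = StructType(df.schema.fields :+ StructField("index", LongType, nullable = false))
  sqlContext.createDataFrame(indexed, schema)
}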

Deenar

On 1 June 2015 at 02:23, ayan guha guha.a...@gmail.com wrote:

 If you are on spark 1.3, use repartitionandSort followed by mappartition.
 In 1.4, window functions will be supported, it seems
 On 1 Jun 2015 04:10, Ricardo Almeida ricardo.alme...@actnowib.com
 wrote:

 That's great and how would you create an ordered index by partition (by
 product in this example)?

 Assuming now a dataframe like:

 flag | product | price
 --
 1|   a |47.808764653746
 1|   b |47.808764653746
 1|   a |31.9869279512204
 1|   b |47.7907893713564
 1|   a |16.7599200038239
 1|   b |16.7599200038239
 1|   b |20.3916014172137


 get a new dataframe such as:

 flag | product | price | index
 --
 1|   a |47.808764653746  | 0
 1|   a |31.9869279512204 | 1
 1|   a |16.7599200038239 | 2
 1|   b |47.808764653746  | 0
 1|   b |47.7907893713564 | 1
 1|   b |20.3916014172137 | 2
 1|   b |16.7599200038239 | 3








 On 29 May 2015 at 12:25, Wesley Miao wesley.mi...@gmail.com wrote:

 One way I can see is to -

 1. get rdd from your df
 2. call rdd.zipWithIndex to get a new rdd
 3. turn your new rdd to a new df

 On Fri, May 29, 2015 at 5:43 AM, Cesar Flores ces...@gmail.com wrote:


 Assuming that I have the next data frame:

 flag | price
 --
 1|47.808764653746
 1|47.808764653746
 1|31.9869279512204
 1|47.7907893713564
 1|16.7599200038239
 1|16.7599200038239
 1|20.3916014172137

 How can I create a data frame with an extra indexed column as the next
 one:

 flag | price  | index
 --|---
 1|47.808764653746 | 0
 1|47.808764653746 | 1
 1|31.9869279512204| 2
 1|47.7907893713564| 3
 1|16.7599200038239| 4
 1|16.7599200038239| 5
 1|20.3916014172137| 6

 --
 Cesar Flores






Re: Compute Median in Spark Dataframe

2015-06-04 Thread Deenar Toraskar
Hi Holden, Olivier


So for column you need to pass in a Java function, I have some sample
code which does this but it does terrible things to access Spark internals.
I also need to call a Hive UDAF in a dataframe agg function. Are there any
examples of what Column expects?

Deenar

On 2 June 2015 at 21:13, Holden Karau hol...@pigscanfly.ca wrote:

 So for column you need to pass in a Java function, I have some sample code
 which does this but it does terrible things to access Spark internals.


 On Tuesday, June 2, 2015, Olivier Girardot 
 o.girar...@lateral-thoughts.com wrote:

 Nice to hear from you Holden ! I ended up trying exactly that (Column) -
 but I may have done it wrong :

 In [*5*]: g.agg(Column(percentile(value, 0.5)))
 Py4JError: An error occurred while calling o97.agg. Trace:
 py4j.Py4JException: Method agg([class java.lang.String, class
 scala.collection.immutable.Nil$]) does not exist
 at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)

 Any idea ?

 Olivier.
 Le mar. 2 juin 2015 à 18:02, Holden Karau hol...@pigscanfly.ca a
 écrit :

 Not super easily, the GroupedData class uses a strToExpr function which
 has a pretty limited set of functions so we cant pass in the name of an
 arbitrary hive UDAF (unless I'm missing something). We can instead
 construct an column with the expression you want and then pass it in to
 agg() that way (although then you need to call the hive UDAF there). There
 are some private classes in hiveUdfs.scala which expose hiveUdaf's as Spark
 SQL AggregateExpressions, but they are private.

 On Tue, Jun 2, 2015 at 8:28 AM, Olivier Girardot 
 o.girar...@lateral-thoughts.com wrote:

 I've finally come to the same conclusion, but isn't there any way to
 call this Hive UDAFs from the agg(percentile(key,0.5)) ??

 Le mar. 2 juin 2015 à 15:37, Yana Kadiyska yana.kadiy...@gmail.com a
 écrit :

 Like this...sqlContext should be a HiveContext instance

 case class KeyValue(key: Int, value: String)
 val df = sc.parallelize(1 to 50).map(i => KeyValue(i, i.toString)).toDF
 df.registerTempTable("table")
 sqlContext.sql("select percentile(key,0.5) from table").show()

 ​

 On Tue, Jun 2, 2015 at 8:07 AM, Olivier Girardot 
 o.girar...@lateral-thoughts.com wrote:

 Hi everyone,
 Is there any way to compute a median on a column using Spark's
 Dataframe. I know you can use stats in a RDD but I'd rather stay within a
 dataframe.
 Hive seems to imply that using ntile one can compute percentiles,
 quartiles and therefore a median.
 Does anyone have experience with this ?

 Regards,

 Olivier.





 --
 Cell : 425-233-8271
 Twitter: https://twitter.com/holdenkarau
 Linked In: https://www.linkedin.com/in/holdenkarau



 --
 Cell : 425-233-8271
 Twitter: https://twitter.com/holdenkarau
 Linked In: https://www.linkedin.com/in/holdenkarau




Optimisation advice for Avro-Parquet merge job

2015-06-04 Thread James Aley
Hi,

We have a load of Avro data coming into our data systems in the form of
relatively small files, which we're merging into larger Parquet files with
Spark. I've been following the docs and the approach I'm taking seemed
fairly obvious, and pleasingly simple, but I'm wondering if perhaps it's
not the most optimal approach.

I was wondering if anyone on this list might have some advice to make to
make this job as efficient as possible. Here's some code:

DataFrame dfInput = sqlContext.load(inputPaths.get(0),
    "com.databricks.spark.avro");
long totalSize = getDirSize(inputPaths.get(0));

for (int i = 1; i < inputs.size(); ++i) {
    dfInput = dfInput.unionAll(sqlContext.load(inputPaths.get(i),
        "com.databricks.spark.avro"));
    totalSize += getDirSize(inputPaths.get(i));
}

int targetPartitions = (int) Math.max(2L, totalSize / TARGET_SIZE_BYTES);
DataFrame outputFrame;

// Note: HADOOP-10456 impacts us, as we're stuck on 2.4.0 in EMR, hence
// the synchronize block below. Suggestions welcome here too! :-)
synchronized (this) {
    RDD<Row> inputRdd = dfInput.rdd().coalesce(targetPartitions, false, null);
    outputFrame = sqlContext.createDataFrame(inputRdd, dfInput.schema());
}

outputFrame.save(outputPath, "parquet", SaveMode.Overwrite);

Here are some things bothering me:

   - Conversion to an RDD and back again so that we can use coalesce() to
   reduce the number of partitions. This is because we read that repartition()
   is not as efficient as coalesce(), and local micro benchmarks seemed to
   somewhat confirm that this was faster. Is this really a good idea though?
   Should we be doing something else?
   - Usage of unionAll() - this is the only way I could find to join the
   separate data sets into a single data frame to save as Parquet. Is there a
   better way?
   - Do I need to be using the DataFrame API at all? I'm not querying any
   data here, so the nice API for SQL-like transformations of the data isn't
   being used. The DataFrame API just seemed like the path of least resistance
   for working with Avro and Parquet. Would there be any advantage to using
   hadoopRDD() with the appropriate Input/Output formats?


Any advice or tips greatly appreciated!


James.


Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-06-04 Thread Cody Koeninger
direct stream isn't a receiver, it isn't required to cache data anywhere
unless you want it to.

If you want it, just call cache.
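
For example, something along these lines (a sketch only; the brokers, topic and the
exact storage level are placeholders):

import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils

def directStreamWithSpill(ssc: StreamingContext): Unit = {
  val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
  val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, Set("mytopic"))

  // No receiver, so nothing is cached unless you ask for it; persisting with
  // MEMORY_AND_DISK_SER lets batches spill to disk under memory pressure.
  stream.persist(StorageLevel.MEMORY_AND_DISK_SER)

  stream.foreachRDD(rdd => println("batch size: " + rdd.count()))
}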

On Thu, Jun 4, 2015 at 8:20 AM, Dmitry Goldenberg dgoldenberg...@gmail.com
wrote:

 set the storage policy for the DStream RDDs to MEMORY AND DISK - it
 appears the storage level can be specified in the createStream methods but
 not createDirectStream...


 On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov evo.efti...@isecc.com
 wrote:

 You can also try Dynamic Resource Allocation




 https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation



 Also re the Feedback Loop for automatic message consumption rate
 adjustment – there is a “dumb” solution option – simply set the storage
 policy for the DStream RDDs to MEMORY AND DISK – when the memory gets
 exhausted spark streaming will resort to keeping new RDDs on disk which
 will prevent it from crashing and hence loosing them. Then some memory will
 get freed and it will resort back to RAM and so on and so forth





 Sent from Samsung Mobile

  Original message 

 From: Evo Eftimov

 Date:2015/05/28 13:22 (GMT+00:00)

 To: Dmitry Goldenberg

 Cc: Gerard Maas ,spark users

 Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of
 growth in Kafka or Spark's metrics?



 You can always spin new boxes in the background and bring them into the
 cluster fold when fully operational and time that with job relaunch and
 param change



 Kafka offsets are managed automatically for you by the Kafka clients,
 which keep them in ZooKeeper – don't worry about that as long as you shut
 down your job gracefully. Besides, managing the offsets explicitly is not a
 big deal if necessary.





 Sent from Samsung Mobile



  Original message 

 From: Dmitry Goldenberg

 Date:2015/05/28 13:16 (GMT+00:00)

 To: Evo Eftimov

 Cc: Gerard Maas ,spark users

 Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of
 growth in Kafka or Spark's metrics?



 Thanks, Evo.  Per the last part of your comment, it sounds like we will
 need to implement a job manager which will be in control of starting the
 jobs, monitoring the status of the Kafka topic(s), shutting jobs down and
 marking them as ones to relaunch, scaling the cluster up/down by
 adding/removing machines, and relaunching the 'suspended' (shut down) jobs.



 I suspect that relaunching the jobs may be tricky since that means
 keeping track of the starting offsets in the Kafka topic(s) that the jobs
 had been working from.
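
 As a rough illustration, the direct Kafka API does expose the per-batch
 offsets that such a job manager would need to record in order to relaunch
 from a known position (an illustrative sketch, assuming the Spark 1.3
 kafka-direct module):

 import org.apache.spark.streaming.dstream.InputDStream
 import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

 // 'stream' would come from KafkaUtils.createDirectStream, as in the sketch
 // further up this thread.
 def recordOffsets(stream: InputDStream[(String, String)]): Unit = {
   stream.foreachRDD { rdd =>
     val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
     ranges.foreach { r =>
       // Persist these somewhere durable (ZooKeeper, a database, ...) so a
       // relaunched job can resume from r.untilOffset via the fromOffsets
       // variant of createDirectStream.
       println(s"${r.topic} partition ${r.partition}: ${r.fromOffset} -> ${r.untilOffset}")
     }
   }
 }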



 Ideally, we'd want to avoid a re-launch.  The 'suspension' and
 relaunching of jobs, coupled with the wait for the new machines to come
 online may turn out quite time-consuming which will make for lengthy
 request times, and our requests are not asynchronous.  Ideally, the
 currently running jobs would continue to run on the machines currently
 available in the cluster.



 In the scale-down case, the job manager would want to signal to Spark's
 job scheduler not to send work to the node being taken out, find out when
 the last job has finished running on the node, then take the node out.



 This is somewhat like changing the number of cylinders in a car engine
 while the car is running...



 Sounds like a great candidate for a set of enhancements in Spark...



 On Thu, May 28, 2015 at 7:52 AM, Evo Eftimov evo.efti...@isecc.com
 wrote:

 @DG; The key metrics should be



 -  Scheduling delay – its ideal state is to remain constant over
 time and ideally be less than the time of the microbatch window

 -  The average job processing time should remain less than the
 micro-batch window

 -  Number of Lost Jobs – even a single lost Job means that you have
 lost all messages for the DStream RDD processed by that job, due to the
 previously described Spark Streaming memory-leak condition and subsequent
 crash (described in previous postings submitted by me)


 You can even go one step further and periodically issue “get/check free
 memory” to see whether it is decreasing relentlessly at a constant rate –
 if it touches a predetermined RAM threshold that should be your third
 metric
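
 As a rough sketch, the first two metrics can be watched from inside the
 application with a StreamingListener (the threshold and batch window below
 are placeholders):

 import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

 class DelayMonitor(batchWindowMs: Long) extends StreamingListener {
   override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
     val info = batch.batchInfo
     val schedulingMs = info.schedulingDelay.getOrElse(0L)
     val processingMs = info.processingDelay.getOrElse(0L)
     // If either delay keeps exceeding the batch window, the job is falling
     // behind and it is time to throttle intake or add capacity.
     if (schedulingMs > batchWindowMs || processingMs > batchWindowMs) {
       println(s"Falling behind: scheduling=${schedulingMs}ms processing=${processingMs}ms")
     }
   }
 }

 // Registration, assuming an existing StreamingContext 'ssc' and a 10s window:
 // ssc.addStreamingListener(new DelayMonitor(10000L))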



 Re the “back pressure” mechanism – this is a Feedback Loop mechanism and
 you can implement one on your own without waiting for Jiras and new
 features whenever they might be implemented by the Spark dev team –
 moreover you can avoid using slow mechanisms such as ZooKeeper and even
 incorporate some Machine Learning in your Feedback Loop to make it handle
 the message consumption rate more intelligently and benefit from ongoing
 online learning – BUT this is STILL about voluntarily sacrificing your
 performance in the name of keeping your system stable – it is not about
 scaling your system/solution



 In terms of how to scale the Spark Framework Dynamically – even though
 this is not supported at the moment out of the box I guess you can have a
 sys management framework spin 

Re: Difference between library dependencies version

2015-06-04 Thread Ted Yu
For your first question, please take a look at HADOOP-9922.
The fix is in the hadoop-common module.

Cheers

On Thu, Jun 4, 2015 at 2:53 AM, Jean-Charles RISCH 
risch.jeanchar...@gmail.com wrote:

 Hello,
 *(Before everything : I use IntellijIdea 14.0.1, SBT and Scala 2.11.6)*

 This morning, I was looking to resolve the "Failed to locate the winutils
 binary in the hadoop binary path" error.


 I noticed that I can solve it by configuring my build.sbt like this:

 ...

 libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "1.0.4"

 libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.1" excludeAll(
   ExclusionRule(organization = "org.apache.hadoop")
 )

 libraryDependencies += "org.apache.spark" %% "spark-mllib" % "1.3.1" excludeAll(
   ExclusionRule(organization = "org.apache.hadoop")
 )

 


 but if I change the line

 libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "1.0.4"

 to

 libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.7.0"

 the error is back.


 What does it mean? Is Spark built for an old version of Hadoop? I really
 want to understand.

 *Also, a bonus question:*
 As you can see, I am using Spark 1.3.1 and the spark-mllib APIs. I am using
 the latest version, but my APIs do not correspond to the latest official APIs
 (https://spark.apache.org/docs/latest/api/scala/#package).

 For example, to run a KMeans algo, I have to use KMeans.train() whereas it
 does not exist in the latest API.
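
 For reference, the MLlib 1.3.1 usage in question looks roughly like this
 (a sketch adapted from the MLlib clustering guide; the data path is a
 placeholder):

 import org.apache.spark.SparkContext
 import org.apache.spark.mllib.clustering.KMeans
 import org.apache.spark.mllib.linalg.Vectors

 def runKMeans(sc: SparkContext): Unit = {
   // Each line is expected to hold whitespace-separated numeric features.
   val data = sc.textFile("data/mllib/kmeans_data.txt")  // placeholder path
   val parsed = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()

   val numClusters = 2
   val numIterations = 20
   val model = KMeans.train(parsed, numClusters, numIterations)

   println(s"Within-set sum of squared errors = ${model.computeCost(parsed)}")
 }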

 This is the first time I have asked something on the mailing list; I hope I
 am using it correctly. Sorry for my bad English.

 Thank you and have a good day,

 JC
 ᐧ