Re: MLlib: Feature Importances API

2015-12-17 Thread Yanbo Liang
Hi Asim,

The "featureImportances" member is only exposed in ML, not MLlib.
You need to update your code to use the ML RandomForestClassifier to train
a RandomForestClassificationModel. Then you can call
RandomForestClassificationModel.featureImportances

to get the importance of each feature.

For how to use RandomForestClassifier, you can refer to this example.
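
A minimal sketch of that flow (assuming trainingData is an RDD[LabeledPoint] like the one built in your code below, that an existing sqlContext is in scope, and following the label-indexing pattern the ML examples use):

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
import org.apache.spark.ml.feature.StringIndexer

// Convert the existing RDD[LabeledPoint] into a DataFrame with "label"/"features" columns.
val trainingDF = sqlContext.createDataFrame(trainingData)

// Index the label column so the classifier knows the number of classes.
val labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")

val rf = new RandomForestClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("features")
  .setNumTrees(30)
  .setMaxDepth(4)

val pipeline = new Pipeline().setStages(Array(labelIndexer, rf))
val model = pipeline.fit(trainingDF)

// The fitted forest is the last stage of the pipeline model.
val rfModel = model.stages.last.asInstanceOf[RandomForestClassificationModel]
// Vector of per-feature importances.
println(rfModel.featureImportances)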

Yanbo

2015-12-17 13:41 GMT+08:00 Asim Jalis :

> I wanted to get the feature importances for a Random Forest as
> described in this JIRA: https://issues.apache.org/jira/browse/SPARK-5133
>
> However, I don’t see how to call this. I don't see any methods exposed on
>
> org.apache.spark.mllib.tree.RandomForest
>
> How can I get featureImportances when I generate a RandomForest model in
> this code?
>
> import org.apache.spark.mllib.linalg.Vectors
> import org.apache.spark.mllib.regression.LabeledPoint
> import org.apache.spark.mllib.tree.RandomForest
> import org.apache.spark.mllib.tree.model.RandomForestModel
> import org.apache.spark.mllib.util.MLUtils
> import org.apache.spark.rdd.RDD
> import util.Random
>
> def displayModel(model:RandomForestModel) = {
>   // Display model.
>   println("Learned classification tree model:\n" + model.toDebugString)
> }
>
> def saveModel(model:RandomForestModel,path:String) = {
>   // Save and load model.
>   model.save(sc, path)
>   val sameModel = RandomForestModel.load(sc, path)
> }
>
> def testModel(model:RandomForestModel,testData:RDD[LabeledPoint]) = {
>   // Test model.
>   val labelAndPreds = testData.map { point =>
>     val prediction = model.predict(point.features)
>     (point.label, prediction)
>   }
>   val testErr = labelAndPreds.
>     filter(r => r._1 != r._2).count.toDouble / testData.count()
>   println("Test Error = " + testErr)
> }
>
> def buildModel(trainingData:RDD[LabeledPoint],
>   numClasses:Int,categoricalFeaturesInfo:Map[Int,Int]) = {
>   val numTrees = 30
>   val featureSubsetStrategy = "auto"
>   val impurity = "gini"
>   val maxDepth = 4
>   val maxBins = 32
>
>   // Build model.
>   val model = RandomForest.trainClassifier(
>     trainingData, numClasses, categoricalFeaturesInfo,
>     numTrees, featureSubsetStrategy, impurity, maxDepth,
>     maxBins)
>
>   model
> }
>
> // Create plain RDD.
> val rdd = sc.parallelize(Range(0,1000))
>
> // Convert to LabeledPoint RDD.
> val data = rdd.
>   map(x => {
>     val label = x % 2
>     val feature1 = x % 5
>     val feature2 = x % 7
>     val features = Seq(feature1, feature2).
>       map(_.toDouble).
>       zipWithIndex.
>       map(_.swap)
>     val vector = Vectors.sparse(features.size, features)
>     val point = new LabeledPoint(label, vector)
>     point
>   })
>
> // Split data into training (70%) and test (30%).
> val splits = data.randomSplit(Array(0.7, 0.3))
> val (trainingData, testData) = (splits(0), splits(1))
>
> // Set up parameters for training.
> val numClasses = data.map(_.label).distinct.count.toInt
> val categoricalFeaturesInfo = Map[Int, Int]()
>
> val model = buildModel(
>   trainingData,
>   numClasses,
>   categoricalFeaturesInfo)
> testModel(model,testData)
>
>


Re: java.lang.NoSuchMethodError while saving a random forest model Spark version 1.5

2015-12-17 Thread Yanbo Liang
Spark 1.5 officially uses Parquet 1.7.0, but Spark 1.3 used Parquet 1.6.0.
It's better to check which version of Parquet is used in your environment.
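
One quick way to do that, sketched here under the assumption that you can open a spark-shell against the same classpath, is to ask the JVM which jar provides the Parquet classes:

// Prints the jar that provides parquet.schema.Types on the driver classpath;
// the jar file name usually encodes the Parquet version.
println(Class.forName("parquet.schema.Types")
  .getProtectionDomain.getCodeSource.getLocation)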

2015-12-17 10:26 GMT+08:00 Joseph Bradley :

> This method is tested in the Spark 1.5 unit tests, so I'd guess it's a
> problem with the Parquet dependency.  What version of Parquet are you
> building Spark 1.5 off of?  (I'm not that familiar with Parquet issues
> myself, but hopefully a SQL person can chime in.)
>
> On Tue, Dec 15, 2015 at 3:23 PM, Rachana Srivastava <
> rachana.srivast...@markmonitor.com> wrote:
>
>> I have recently upgraded the Spark version, but when I try to save a random
>> forest model using the model save command I am getting a NoSuchMethodError. My
>> code works fine with the 1.3.x version.
>>
>>
>>
>> model.save(sc.sc(), "modelsavedir");
>>
>>
>>
>>
>>
>> ERROR:
>> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation -
>> Aborting job.
>>
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
>> in stage 22.0 failed 1 times, most recent failure: Lost task 0.0 in stage
>> 22.0 (TID 230, localhost): java.lang.NoSuchMethodError:
>> parquet.schema.Types$GroupBuilder.addField(Lparquet/schema/Type;)Lparquet/schema/Types$BaseGroupBuilder;
>>
>> at
>> org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter$$anonfun$convertField$1.apply(CatalystSchemaConverter.scala:517)
>>
>> at
>> org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter$$anonfun$convertField$1.apply(CatalystSchemaConverter.scala:516)
>>
>> at
>> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>>
>> at
>> scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
>>
>> at
>> scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:108)
>>
>> at
>> org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:516)
>>
>> at
>> org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:312)
>>
>> at
>> org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter$$anonfun$convert$1.apply(CatalystSchemaConverter.scala:305)
>>
>> at
>> org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter$$anonfun$convert$1.apply(CatalystSchemaConverter.scala:305)
>>
>> at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>
>> at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>
>> at
>> scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>
>> at
>> scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>
>> at
>> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>
>> at
>> org.apache.spark.sql.types.StructType.foreach(StructType.scala:92)
>>
>> at
>> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>
>> at
>> org.apache.spark.sql.types.StructType.map(StructType.scala:92)
>>
>> at
>> org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convert(CatalystSchemaConverter.scala:305)
>>
>> at
>> org.apache.spark.sql.execution.datasources.parquet.ParquetTypesConverter$.convertFromAttributes(ParquetTypesConverter.scala:58)
>>
>> at
>> org.apache.spark.sql.execution.datasources.parquet.RowWriteSupport.init(ParquetTableSupport.scala:55)
>>
>> at
>> parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:287)
>>
>> at
>> parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:261)
>>
>> at
>> org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.(ParquetRelation.scala:94)
>>
>> at
>> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anon$3.newInstance(ParquetRelation.scala:272)
>>
>> at
>> org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:233)
>>
>> at
>> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
>>
>> at
>> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
>>
>> at
>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>
>> at org.apache.spark.scheduler.Task.run(Task.scala:88)
>>
>> at
>> 

Are there some solution to complete the transform category variables into dummy variable in scala or spark ?

2015-12-17 Thread zml张明磊
Hi ,

 I am new to Scala and Spark. Recently, I needed to write a tool that
transforms category variables to dummy/indicator variables. I want to know whether
there are tools in Scala and Spark which support this transformation,
like pandas.get_dummies in Python? Any examples or learning materials for
me?

Thanks,
Minglei.


Re: Need clarifications in Regression

2015-12-17 Thread Yanbo Liang
Hi Arunkumar,

There are two implementations of LinearRegression: one under the ml package and
another one under the mllib package.
We can ensure the LinearRegression under the ml package produces the same result
as R, so please use this one to test. If you still get a different result,
please file a JIRA to track it.

Yanbo


2015-12-16 14:35 GMT+08:00 Arunkumar Pillai :

> Hi
>
> The Regression algorithm in the MLlib is using Loss function to calculate
> the regression estimates and R is using matrix method to calculate the
> estimates.
>
> I see some difference between the results of Both Spark and R.
>
> I was using the following class
> LinearRegressionWithSGD.train(parsedData, numIterations)
>
> is it possible to get both results same.
>
> Please correct me if i'm wrong
>
>
>
> --
> Thanks and Regards
> Arun
>


Re: Linear Regression with OLS

2015-12-17 Thread Yanbo Liang
Hi Arunkumar,

You can refer to the official example of LinearRegression under the ML package (
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/ml/LinearRegressionWithElasticNetExample.scala
).

If you want to train this LinearRegressionModel with OLS, you only need to
set solver to "normal".

val lr = new LinearRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8)
  .setSolver("normal")

Yanbo

​


java.io.FileNotFoundException(Too many open files) in Spark streaming

2015-12-17 Thread Priya Ch
Hi All,


  When running streaming application, I am seeing the below error:


java.io.FileNotFoundException:
/data1/yarn/nm/usercache/root/appcache/application_1450172646510_0004/blockmgr-a81f42cd-6b52-4704-83f3-2cfc12a11b86/02/temp_shuffle_589ddccf-d436-4d2c-9935-e5f8c137b54b
(Too many open files)

at java.io.FileInputStream.open(Native Method)

at java.io.FileInputStream.(FileInputStream.java:146)

at
org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$1.apply$mcVI$sp(ExternalSorter.scala:730)

at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)

at
org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:729)

at
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:68)

at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)

at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)

at org.apache.spark.scheduler.Task.run(Task.scala:64)

at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

at java.lang.Thread.run(Thread.java:745)

It looks like the issue is that, in a multi-threaded application, there
are too many open files and the maximum number of open file
handles has been reached.

Regards,
Padma Ch


Re: Content based window operation on Time-series data

2015-12-17 Thread Sandy Ryza
Hi Arun,

A Java API was actually recently added to the library.  It will be
available in the next release.

-Sandy

On Thu, Dec 10, 2015 at 12:16 AM, Arun Verma 
wrote:

> Thank you for your reply. It is a Scala and Python library. Does a similar
> library exist for Java?
>
> On Wed, Dec 9, 2015 at 10:26 PM, Sean Owen  wrote:
>
>> CC Sandy as his https://github.com/cloudera/spark-timeseries might be
>> of use here.
>>
>> On Wed, Dec 9, 2015 at 4:54 PM, Arun Verma 
>> wrote:
>> > Hi all,
>> >
>> > We have RDD(main) of sorted time-series data. We want to split it into
>> > different RDDs according to window size and then perform some
>> aggregation
>> > operation like max, min etc. over each RDD in parallel.
>> >
>> > If the window size is w, then the ith RDD has data from (startTime + (i-1)*w) to
>> > (startTime + i*w), where startTime is the time-stamp of the 1st entry in the main
>> > RDD and (startTime + (i-1)*w) is greater than the last entry of the main RDD.
>> >
>> > For now, I am using DataFrame and Spark version 1.5.2. Below code is
>> running
>> > sequentially on the data, so execution time is high and resource
>> utilization
>> > is low. Code snippet is given below:
>> > /*
>> >  * aggregator is max
>> >  * df - DataFrame that has sorted time-series data
>> >  * start - first entry of DataFrame
>> >  * end - last entry of DataFrame df
>> >  * bucketLengthSec - window size
>> >  * stepResults - has a particular block/window output (JSON)
>> >  * appendResults - has output till this block/window (JSON)
>> >  */
>> > while (start <= end) {
>> >     row = df.filter(df.col("timeStamp")
>> >             .between(start, nextStart))
>> >         .agg(max(df.col("timeStamp")), max(df.col("value")))
>> >         .first();
>> >     if (row.get(0) != null) {
>> >         stepResults = new JSONObject();
>> >         stepResults.put("x", Long.parseLong(row.get(0).toString()));
>> >         stepResults.put("y", row.get(1));
>> >         appendResults.add(stepResults);
>> >     }
>> >     start = nextStart;
>> >     nextStart = start + bucketLengthSec;
>> > }
>> >
>> >
>> > --
>> > Thanks and Regards,
>> > Arun Verma
>>
>
>
>
> --
> Thanks and Regards,
> Arun Verma
>
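
A sketch, not from the thread, of how the per-window loop quoted above could run as a single DataFrame job, assuming the same df, startTime and bucketLengthSec as in the snippet (written in Scala rather than Java): bucket each row by its window index and aggregate all windows at once.

import org.apache.spark.sql.functions._

// Assign each row to a window index, then aggregate all windows in one job.
val bucketed = df.withColumn(
  "bucket", floor((col("timeStamp") - lit(startTime)) / lit(bucketLengthSec)))

val perWindow = bucketed
  .groupBy("bucket")
  .agg(max("timeStamp").as("x"), max("value").as("y"))
  .orderBy("bucket")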


Re: Are there some solution to complete the transform category variables into dummy variable in scala or spark ?

2015-12-17 Thread Yanbo Liang
Hi Minglei,

Spark ML provides a transformer named "OneHotEncoder" to map a column of
category indices to a column of binary vectors. It's similar to
pandas.get_dummies and sklearn's OneHotEncoder, but the output will be a
column of vector type rather than multiple columns.
You can refer to the official example.
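
For reference, a minimal sketch along the lines of that example (the column names and toy data are made up for illustration, and an existing sqlContext is assumed):

import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}

val df = sqlContext.createDataFrame(Seq(
  (0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")
)).toDF("id", "category")

// First map the string categories to indices, then one-hot encode the indices.
val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")
  .fit(df)
val indexed = indexer.transform(df)

val encoder = new OneHotEncoder()
  .setInputCol("categoryIndex")
  .setOutputCol("categoryVec")
val encoded = encoder.transform(indexed)

encoded.select("id", "categoryVec").show()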

Yanbo

2015-12-17 16:00 GMT+08:00 zml张明磊 :

> Hi ,
>
>
>
>  I am a new to scala and spark. Recently, I need to write a tool
> that transform category variables to dummy/indicator variables. I want to
> know are there some tools in scala and spark which support this
> transformation which like *pandas.get_dummies* in python ? Any example or
> study learning materials for me ?
>
>
>
> Thanks,
>
> Minglei.
>


Re: Cluster mode dependent jars not working

2015-12-17 Thread vimal dinakaran
--driver-class-path needs to include the required jars, but this is not
mentioned in the Spark documentation.

On Tue, Dec 15, 2015 at 9:13 PM, Ted Yu  wrote:

> Please use --conf spark.executor.extraClassPath=XXX to specify dependent
> jars.
>
> On Tue, Dec 15, 2015 at 3:57 AM, vimal dinakaran 
> wrote:
>
>> I am running spark using cluster mode for deployment . Below is the
>> command
>>
>>
>> JARS=$JARS_HOME/amqp-client-3.5.3.jar,$JARS_HOME/nscala-time_2.10-2.0.0.jar,\
>> $JARS_HOME/kafka_2.10-0.8.2.1.jar,$JARS_HOME/kafka-clients-0.8.2.1.jar,\
>> $JARS_HOME/spark-streaming-kafka_2.10-1.4.1.jar,\
>> $JARS_HOME/zkclient-0.3.jar,$JARS_HOME/protobuf-java-2.4.0a.jar
>>
>> dse spark-submit -v --conf
>> "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
>>  --executor-memory 512M \
>>  --total-executor-cores 3 \
>>  --deploy-mode "cluster" \
>>  --master spark://$MASTER:7077 \
>>  --jars=$JARS \
>>  --supervise \
>>  --class "com.testclass" $APP_JAR  input.json \
>>  --files "/home/test/input.json"
>>
>> The above command is working fine in client mode. But when I use it in
>> cluster mode I get class not found exception
>>
>> Exception in thread "main" java.lang.reflect.InvocationTargetException
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at
>> org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58)
>> at
>> org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
>> Caused by: java.lang.NoClassDefFoundError:
>> org/apache/spark/streaming/kafka/KafkaUtils$
>>
>> In client mode the dependent jars are getting copied to the
>> /var/lib/spark/work directory whereas in cluster mode it is not.
>> I am using nfs and I have mounted the same directory on all the spark
>> nodes under same name. Still I get the error.
>>
>> From the verbose logs of dse spark-submit, I see the classpath elements
>> are missing here .
>>
>> --
>> spark.hadoop.cassandra.input.native.ssl.trust.store.password -> cassandra
>> spark.cassandra.connection.ssl.trustStore.password -> cassandra
>> spark.ssl.keyStorePassword -> cassandra
>> spark.cassandra.auth.username -> cassandra
>> spark.hadoop.fs.har.impl -> org.apache.hadoop.fs.HarFileSystem
>> Classpath elements:
>>
>>
>> WARN  2015-12-15 17:08:48 org.apache.spark.util.Utils: Your hostname,
>> demeter-dev-node2 resolves to a loopback address: 127.0.1.1; using
>> 10.29.23.170 instead (on interface eth0)
>> WARN  2015-12-15 17:08:48 org.apache.spark.util.Utils: Set SPARK_LOCAL_IP
>> if you need to bind to another addres
>>
>> How it is able to pick the application jar which is also under same
>> directory but not the dependent jars ?
>> Please help me in getting this solved.
>>
>>
>


Dynamic jar loading

2015-12-17 Thread amarouni
Hello guys,

Do you know if the method SparkContext.addJar("file:///...") can be used
on a running context (an already started spark-shell) ?
And if so, does it add the jar to the class-path of the Spark workers
(Yarn containers in case of yarn-client) ?

Thanks,

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming: How to specify deploy mode through configuration parameter?

2015-12-17 Thread Saiph Kappa
I am not sure how the process works and if patches are applied to all
upcoming versions of spark. Is it likely that the fix is available in this
build (spark 1.6.0  17-Dec-2015 09:02)?
http://people.apache.org/~pwendell/spark-nightly/spark-master-bin/latest/

Thanks!

On Wed, Dec 16, 2015 at 9:22 PM, Ted Yu  wrote:

> Since both scala and java files are involved in the PR, I don't see an
> easy way around without building yourself.
>
> Cheers
>
> On Wed, Dec 16, 2015 at 10:18 AM, Saiph Kappa 
> wrote:
>
>> Exactly, but it's only fixed for the next spark version. Is there any
>> work around for version 1.5.2?
>>
>> On Wed, Dec 16, 2015 at 4:36 PM, Ted Yu  wrote:
>>
>>> This seems related:
>>> [SPARK-10123][DEPLOY] Support specifying deploy mode from configuration
>>>
>>> FYI
>>>
>>> On Wed, Dec 16, 2015 at 7:31 AM, Saiph Kappa 
>>> wrote:
>>>
 Hi,

 I have a client application running on host0 that is launching multiple
 drivers on multiple remote standalone spark clusters (each cluster is
 running on a single machine):

 «
 ...

 List("host1", "host2" , "host3").foreach(host => {

 val sparkConf = new SparkConf()
 sparkConf.setAppName("App")

 sparkConf.set("spark.driver.memory", "4g")
 sparkConf.set("spark.executor.memory", "4g")
 sparkConf.set("spark.driver.maxResultSize", "4g")
 sparkConf.set("spark.serializer", 
 "org.apache.spark.serializer.KryoSerializer")
 sparkConf.set("spark.executor.extraJavaOptions", " -XX:+UseCompressedOops 
 -XX:+UseConcMarkSweepGC " +
   "-XX:+AggressiveOpts -XX:FreqInlineSize=300 -XX:MaxInlineSize=300 ")

 sparkConf.setMaster(s"spark://$host:7077")

 val rawStreams = (1 to source.parallelism).map(_ => 
 ssc.textFileStream("/home/user/data/")).toArray
 val rawStream = ssc.union(rawStreams)
 rawStream.count.map(c => s"Received $c records.").print()

 }
 ...

 »

 The problem is that I'm getting an error message saying that the directory 
 "/home/user/data/" does not exist.
 In fact, this directory only exists in host1, host2 and host3 and not in 
 host0.
 But since I'm launching the driver to host1..3 I thought data would be 
 fetched from those machines.

 I'm also trying to avoid using the spark submit script, and couldn't find 
 the configuration parameter to specify the deploy mode.

 Is there any way to specify the deploy mode through configuration 
 parameter?


 Thanks.


>>>
>>
>


Re: Spark streaming: java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration ... on restart from checkpoint

2015-12-17 Thread Bartłomiej Alberski
I prepared simple example helping in reproducing problem:

https://github.com/alberskib/spark-streaming-broadcast-issue

I think that in that way it will be easier for you to understand problem
and find solution (if any exists)

Thanks
Bartek

2015-12-16 23:34 GMT+01:00 Bartłomiej Alberski :

> First of all , thanks @tdas for looking into my problem.
>
> Yes, I checked it seperately and it is working fine. For below piece of
> code there is no single exception and values are sent correctly.
>
> val reporter = new MyClassReporter(...)
> reporter.send(...)
> val out = new FileOutputStream("out123.txt")
> val outO = new ObjectOutputStream(out)
> outO.writeObject(reporter)
> outO.flush()
> outO.close()
>
> val in = new FileInputStream("out123.txt")
> val inO = new ObjectInputStream(in)
> val reporterFromFile  =
> inO.readObject().asInstanceOf[StreamingGraphiteReporter]
> reporterFromFile.send(...)
> in.close()
>
> Maybe I am wrong, but I think it would be strange if a class implementing
> Serializable and properly broadcast to executors could not be serialized and
> deserialized?
> I also prepared slightly different piece of code and I received slightly
> different exception. Right now it looks like:
> java.lang.ClassCastException: [B cannot be cast to com.example.sender.
> MyClassReporter.
>
> Maybe I am wrong, but it looks like, when restarting from the checkpoint,
> it does read the proper block of memory for the bytes of MyClassReporter.
>
> 2015-12-16 2:38 GMT+01:00 Tathagata Das :
>
>> Could you test serializing and deserializing the MyClassReporter  class
>> separately?
>>
>> On Mon, Dec 14, 2015 at 8:57 AM, Bartłomiej Alberski > > wrote:
>>
>>> Below is the full stacktrace(real names of my classes were changed) with
>>> short description of entries from my code:
>>>
>>> rdd.mapPartitions{ partition => //this is the line to which second
>>> stacktrace entry is pointing
>>>   val sender =  broadcastedValue.value // this is the maing place to
>>> which first stacktrace entry is pointing
>>> }
>>>
>>> java.lang.ClassCastException:
>>> org.apache.spark.util.SerializableConfiguration cannot be cast to
>>> com.example.sender.MyClassReporter
>>> at com.example.flow.Calculator
>>> $$anonfun$saveAndLoadHll$1$$anonfun$apply$14.apply(AnomalyDetection.scala:87)
>>> at com.example.flow.Calculator
>>> $$anonfun$saveAndLoadHll$1$$anonfun$apply$14.apply(Calculator.scala:82)
>>> at
>>> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
>>> at
>>> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
>>> at
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>> at
>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>> at
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>>> at
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:88)
>>> at
>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>> 2015-12-14 17:10 GMT+01:00 Ted Yu :
>>>
 Can you show the complete stack trace for the ClassCastException ?

 Please see the following thread:
 http://search-hadoop.com/m/q3RTtgEUHVmJA1T1

 Cheers

 On Mon, Dec 14, 2015 at 7:33 AM, alberskib  wrote:

> Hey all,
>
> When my streaming application is restarting from failure (from
> checkpoint) I
> am receiving strange error:
>
> java.lang.ClassCastException:
> org.apache.spark.util.SerializableConfiguration cannot be cast to
> com.example.sender.MyClassReporter.
>
> Instance of B class is created on driver side (with proper config
> passed as
> constructor arg) and broadcasted to the executors in order to ensure
> that on
> each worker there will be only single instance. Everything is going
> well up
> to place where I am getting value of broadcasted field and executing
> function on it i.e.
> broadcastedValue.value.send(...)
>
> Below you can find definition of MyClassReporter (with trait):
>
> trait Reporter{
>   def send(name: String, value: String, timestamp: Long) : Unit
>   def flush() : Unit
> }
>
> class MyClassReporter(config: MyClassConfig, flow: String) extends
> Reporter
> with Serializable {
>
>   val prefix = 

Some tasks take a long time to find local block

2015-12-17 Thread patrick256
I'm using Spark 1.5.2 and my RDD has 512 equally sized partitions and is 100%
cached in memory across 512 executors. 

I have a filter-map-collect job with 512 tasks. Sometimes this job completes
sub-second. On other occasions when I run it 50% of the tasks complete
sub-second, 45% of the tasks take 10 seconds and 5% of the tasks take 20
seconds. 

Here is the log from an executor where the task took 20 seconds: 

15/12/16 09:44:37 INFO executor.CoarseGrainedExecutorBackend: Got assigned
task 5312 
15/12/16 09:44:37 INFO executor.Executor: Running task 215.0 in stage 17.0
(TID 5312) 
15/12/16 09:44:37 INFO broadcast.TorrentBroadcast: Started reading broadcast
variable 10 
15/12/16 09:44:37 INFO storage.MemoryStore: ensureFreeSpace(1777) called
with curMem=908793307, maxMem=5927684014 
15/12/16 09:44:37 INFO storage.MemoryStore: Block broadcast_10_piece0 stored
as bytes in memory (estimated size 1777.0 B, free 4.7 GB) 
15/12/16 09:44:37 INFO broadcast.TorrentBroadcast: Reading broadcast
variable 10 took 186 ms 
15/12/16 09:44:37 INFO storage.MemoryStore: ensureFreeSpace(3272) called
with curMem=908795084, maxMem=5927684014 
15/12/16 *09:44:37* INFO storage.MemoryStore: Block broadcast_10 stored as
values in memory (estimated size 3.2 KB, free 4.7 GB) 
15/12/16 *09:44:57* INFO storage.BlockManager: Found block rdd_5_215 locally 
15/12/16 09:44:57 INFO executor.Executor: Finished task 215.0 in stage 17.0
(TID 5312). 2074 bytes result sent to driver 

So it appears the 20 seconds is spent finding the local block. 

Since the lag is always either exactly 10 seconds or exactly 20 seconds I
suspect it's due to a 10 second timeout on some listener, or something like
that. If that is true then I guess my options are either find out why it's
timing out and fix it or make the timeout shorter so it tries more
frequently. 

Any advice welcome. Many thanks in advance, 

Pat



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Some-tasks-take-a-long-time-to-find-local-block-tp25727.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Kafka - streaming from multiple topics

2015-12-17 Thread Cody Koeninger
Using spark.streaming.concurrentJobs for this probably isn't a good idea,
as it allows the next batch to start processing before current one is
finished, which may have unintended consequences.

Why can't you use a single stream with all the topics you care about, or
multiple streams if you're e.g. joining them?



On Wed, Dec 16, 2015 at 3:00 PM, jpocalan  wrote:

> Nevermind, I found the answer to my questions.
> The following spark configuration property will allow you to process
> multiple KafkaDirectStream in parallel:
> --conf spark.streaming.concurrentJobs=
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-streaming-from-multiple-topics-tp8678p25723.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark Streaming: How to specify deploy mode through configuration parameter?

2015-12-17 Thread Ted Yu
As far as I can tell, it is not in 1.6.0 RC.
You can comment on the JIRA, requesting backport to 1.6.1

Cheers

On Thu, Dec 17, 2015 at 5:28 AM, Saiph Kappa  wrote:

> I am not sure how the process works and if patches are applied to all
> upcoming versions of spark. Is it likely that the fix is available in this
> build (spark 1.6.0  17-Dec-2015 09:02)?
> http://people.apache.org/~pwendell/spark-nightly/spark-master-bin/latest/
>
> Thanks!
>
> On Wed, Dec 16, 2015 at 9:22 PM, Ted Yu  wrote:
>
>> Since both scala and java files are involved in the PR, I don't see an
>> easy way around without building yourself.
>>
>> Cheers
>>
>> On Wed, Dec 16, 2015 at 10:18 AM, Saiph Kappa 
>> wrote:
>>
>>> Exactly, but it's only fixed for the next spark version. Is there any
>>> work around for version 1.5.2?
>>>
>>> On Wed, Dec 16, 2015 at 4:36 PM, Ted Yu  wrote:
>>>
 This seems related:
 [SPARK-10123][DEPLOY] Support specifying deploy mode from configuration

 FYI

 On Wed, Dec 16, 2015 at 7:31 AM, Saiph Kappa 
 wrote:

> Hi,
>
> I have a client application running on host0 that is launching
> multiple drivers on multiple remote standalone spark clusters (each 
> cluster
> is running on a single machine):
>
> «
> ...
>
> List("host1", "host2" , "host3").foreach(host => {
>
> val sparkConf = new SparkConf()
> sparkConf.setAppName("App")
>
> sparkConf.set("spark.driver.memory", "4g")
> sparkConf.set("spark.executor.memory", "4g")
> sparkConf.set("spark.driver.maxResultSize", "4g")
> sparkConf.set("spark.serializer", 
> "org.apache.spark.serializer.KryoSerializer")
> sparkConf.set("spark.executor.extraJavaOptions", " -XX:+UseCompressedOops 
> -XX:+UseConcMarkSweepGC " +
>   "-XX:+AggressiveOpts -XX:FreqInlineSize=300 -XX:MaxInlineSize=300 ")
>
> sparkConf.setMaster(s"spark://$host:7077")
>
> val rawStreams = (1 to source.parallelism).map(_ => 
> ssc.textFileStream("/home/user/data/")).toArray
> val rawStream = ssc.union(rawStreams)
> rawStream.count.map(c => s"Received $c records.").print()
>
> }
> ...
>
> »
>
> The problem is that I'm getting an error message saying that the 
> directory "/home/user/data/" does not exist.
> In fact, this directory only exists in host1, host2 and host3 and not in 
> host0.
> But since I'm launching the driver to host1..3 I thought data would be 
> fetched from those machines.
>
> I'm also trying to avoid using the spark submit script, and couldn't find 
> the configuration parameter to specify the deploy mode.
>
> Is there any way to specify the deploy mode through configuration 
> parameter?
>
>
> Thanks.
>
>

>>>
>>
>


Spark streaming: Consistency of multiple streams in Spark

2015-12-17 Thread Ashwin
Hi, I have been looking into using Spark streaming for the specific use case of 
joining events of data from multiple time-series streams. 

The part that I am having a hard time understanding is the consistency 
semantics of this across multiple streams. As per [1] Section 4.3.4, I 
understand that Spark has the notion of RDD's (i.e. micro batch time) across 
multiple streams and they are synchronized. 

But these batch times probably have no relation to the actual event times
within that batch. So if I have two streams, each with 2 minutes' worth of data,
I do not yet see how these could be ingested in a synchronized manner into
Spark such that Spark can maintain alignment of these boundaries.

Or put another way, as a producer of these streams for example from Kinesis, I 
have no notion of batch times. Given that, if I had multiple streams, I do not 
see how Spark could synchronize these multiple streams. 

What am I missing?

Thanks,
Ashwin

[1] http://www.eecs.berkeley.edu/Pubs/TechRpts/2014/EECS-2014-12.pdf


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Matrix Inverse

2015-12-17 Thread Arunkumar Pillai
Hi

I want to find the matrix inverse of (X-transpose * X). PFB my code.

This code does not work for even a slightly larger dataset. Please help me check
whether the approach is correct.

val sqlQuery = "SELECT column1, column2, column3 FROM " + tableName
val matrixDF = sqlContext.sql(sqlQuery)


var identityArray = Array.fill[String](numberOfRows)("1.0") // array that contains 1 for calculating the intercept
val collectionString = Array.ofDim[String](numberOfColumns, numberOfRows) // store the collection as string
val independentCollectionDoubleType = Array.ofDim[Double](numberOfColumns, numberOfRows) // store the collection as double

// concatenate the values of all the columns as string type
for (i <- 0 to numberOfColumns - 1) {
  val matrixStringDF = matrixDF.map(_.get(i).toString)
  val matrixArray = matrixStringDF.collect()
  identityArray = Array.concat(identityArray, matrixArray)
  collectionString(i) = matrixArray
}

val valuesDouble = identityArray.map(value => value.toDouble) // array of all the values and the identity column, as doubles
val valuesMatrix = Matrices.dense(numberOfRows, numberOfColumns + 1, valuesDouble) // matrix
val transposeMatrix = valuesMatrix.transpose // transpose of matrix
val transposeMatrix_matrix = transposeMatrix.multiply(valuesMatrix.asInstanceOf[DenseMatrix]) // transpose of matrix * matrix
val transposeMatrix_matrixAsArray = transposeMatrix_matrix.values // product as array
val transposeMatrix_matrixBreeze = new linalg.DenseMatrix(transposeMatrix_matrix.numRows, transposeMatrix_matrix.numCols, transposeMatrix_matrixAsArray, 0) // Breeze dense matrix of the product
val inverse = linalg.inv(transposeMatrix_matrixBreeze)
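
For what it's worth, a sketch of a more scalable route to (X^T X)^-1, assuming the rows (including the 1.0 intercept column) are already assembled into an RDD[Vector]: let RowMatrix compute the Gramian X^T X on the cluster and invert only the small numCols x numCols result locally with Breeze.

import breeze.linalg.{inv, DenseMatrix => BDM}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.rdd.RDD

def xtxInverse(rows: RDD[Vector]): BDM[Double] = {
  val mat = new RowMatrix(rows)
  // X^T X as a small local matrix (numCols x numCols), computed distributedly.
  val gram = mat.computeGramianMatrix()
  // Both MLlib and Breeze store dense matrices in column-major order.
  val xtx = new BDM(gram.numRows, gram.numCols, gram.toArray)
  inv(xtx)
}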




-- 
Thanks and Regards
Arun


One task hangs and never finishes

2015-12-17 Thread Daniel Haviv
Hi,

I have an application running a set of transformations and finishes
with saveAsTextFile.

Out of 80 tasks all finish pretty fast but one that just hangs and
outputs these message to STDERR:

15/12/17 17:22:19 INFO collection.ExternalAppendOnlyMap: Thread 82
spilling in-memory map of 4.0 GB to disk (6 times so far)

15/12/17 17:23:41 INFO collection.ExternalAppendOnlyMap: Thread 82
spilling in-memory map of 3.8 GB to disk (7 times so far)


Inside the Web UI I can see that for some reason the shuffle spill
memory is extremely high (15 GB) compared to the others (around a few
MB to 1 GB) and as a result the GC time is extremely bad:



[Task table from the Web UI, condensed to the problematic task]
Status: RUNNING | Locality Level: PROCESS_LOCAL | Executor: 8 / impact3.indigo.co.il
Launch Time: 2015/12/17 17:18:32 | Duration: 32 min | GC Time: 25 min
Shuffle Read Size / Records: 835.8 MB / 5217833
Shuffle Spill (Memory): 15.2 GB | Shuffle Spill (Disk): 662.9 MB

I'm running with 8 executors with 8 cpus and 25GB ram each and it
seems that tasks are correctly spread across the nodes:



[Executors table from the Web UI, condensed]
ID  Address                     Storage Memory   Tasks (complete/total)  Task Time  Input   Shuffle Read  Shuffle Write
1   impact1.indigo.co.il:38120  0.0 B / 12.9 GB  25 / 25                 3.54 h     2.1 GB  377.6 MB      555.3 MB
2   impact4.indigo.co.il:40768  0.0 B / 12.9 GB  24 / 24                 4.32 h     2.0 GB  513.1 MB      495.9 MB
3   impact2.indigo.co.il:43666  0.0 B / 12.9 GB  24 / 24                 3.78 h     2.0 GB  332.7 MB      503.1 MB
4   impact3.indigo.co.il:49020  0.0 B / 12.9 GB  26 / 26                 3.39 h     2.2 GB  532.0 MB      596.1 MB
5   impact1.indigo.co.il:49068  0.0 B / 12.9 GB  24 / 24                 3.30 h     2.0 GB  187.3 MB      502.1 MB
6   impact4.indigo.co.il:50069  0.0 B / 12.9 GB  28 / 28                 3.64 h     2.4 GB  336.4 MB      498.9 MB
7   impact2.indigo.co.il:40225  0.0 B / 12.9 GB  28 / 28                 3.62 h     2.0 GB  93.6 MB       496.2 MB
8   impact3.indigo.co.il:50767  0.0 B / 12.9 GB  24 / 25 (1 active)      3.38 h     2.1 GB  336.2 MB      564.4 MB

How to submit spark job to YARN from scala code

2015-12-17 Thread Saiph Kappa
Hi,

Since it is not currently possible to submit a spark job to a spark cluster
running in standalone mode (cluster mode - it's not currently possible to
specify this deploy mode within the code), can I do it with YARN?

I tried to do something like this (but in scala):

«

... // Client object - main method
System.setProperty("SPARK_YARN_MODE", "true")
val sparkConf = new SparkConf()
try {
  val args = new ClientArguments(argStrings, sparkConf)
  new Client(args, sparkConf).run()
} catch {
  case e: Exception => {
    Console.err.println(e.getMessage)
    System.exit(1)
  }
}
System.exit(0)

» in http://blog.sequenceiq.com/blog/2014/08/22/spark-submit-in-java/

However, it is not possible to create a new instance of Client, since
org.apache.spark.deploy.yarn.Client is private.

Is there any way I can submit spark jobs from the code in cluster mode
and not using the spark-submit script?

Thanks.


[SparkML] RandomForestModel vs PipelineModel API on a Driver.

2015-12-17 Thread Eugene Morozov
Hi!

I'm looking for a way to run prediction for a learned model in the most
performant way. It might happen that some users want to predict just a
couple of samples (literally one or two), while others would run
prediction for tens of thousands. It's no surprise that there is an overhead
to load data into the cluster even for a couple of samples. So, to avoid such an
overhead, one might run prediction directly on the driver.

It's possible with the mllib API, because RandomForestModel allows me to provide
just a feature Vector instead of an RDD.
But it looks like it's not possible with the PipelineModel API, which accepts
only a DataFrame (which in turn requires an SQLContext and SparkContext
to build).

Is there any workaround for PipelineModel API?
Do you think it'd be useful for someone else besides me? If so I will file
a feature request.
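
One partial workaround, sketched below under the assumption that the pipeline uses the default "features" input and "prediction" output columns: wrap the single sample in a one-row local DataFrame and run the fitted PipelineModel on it. This avoids distributing an RDD of samples, but it still needs a SQLContext.

import org.apache.spark.ml.PipelineModel
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.sql.SQLContext

case class Sample(features: Vector)

def predictOne(model: PipelineModel, features: Vector, sqlContext: SQLContext): Double = {
  // One-row DataFrame built on the driver; no external data loading involved.
  val df = sqlContext.createDataFrame(Seq(Sample(features)))
  model.transform(df).select("prediction").head().getDouble(0)
}
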
--
Be well!
Jean Morozov


Re: Kafka - streaming from multiple topics

2015-12-17 Thread Jean-Pierre OCALAN
Hi Cody,

First of all thanks for the note about spark.streaming.concurrentJobs. I
guess this is why it's not mentioned in the actual spark streaming doc.
Since those 3 topics contain completely different data on which I need to
apply different kind of transformations, I am not sure joining them would
be really efficient, unless you know something that I don't.

As I really don't need any interaction between those streams, I think I
might end up running 3 different streaming apps instead of one.

Thanks again!

On Thu, Dec 17, 2015 at 11:43 AM, Cody Koeninger  wrote:

> Using spark.streaming.concurrentJobs for this probably isn't a good idea,
> as it allows the next batch to start processing before current one is
> finished, which may have unintended consequences.
>
> Why can't you use a single stream with all the topics you care about, or
> multiple streams if you're e.g. joining them?
>
>
>
> On Wed, Dec 16, 2015 at 3:00 PM, jpocalan  wrote:
>
>> Nevermind, I found the answer to my questions.
>> The following spark configuration property will allow you to process
>> multiple KafkaDirectStream in parallel:
>> --conf spark.streaming.concurrentJobs=
>>
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-streaming-from-multiple-topics-tp8678p25723.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


-- 
jean-pierre ocalan
jpoca...@gmail.com


Re: SparkContext.cancelJob - what part of Spark uses it? Nothing in webUI to kill jobs?

2015-12-17 Thread Mark Hamstra
Ah, sorry for leading you astray a bit.  I was working from memory instead
of looking at the code, and was probably thinking back all the way to
Reynold's initial implementation of SparkContext#killJob(), which was
public.  I'd have to do some digging to determine exactly when and why
SparkContext#cancelJob() became private[spark].  Of course, the other
problem is that more often than not I am working with custom builds of
Spark, and I'm not beyond changing selected things from private to public.
:)

When you start talking about doing some checks before killing a job, I
imagine that you are thinking about something like checking that parts of a
job are not needed by other jobs, etc.  That's a reasonable idea, but the
realization of that goal is not simple -- especially not when you start
including asynchronous execution with various timeouts or other events
requesting cancellation, or more extensive reuse functionality as in
https://issues.apache.org/jira/browse/SPARK-11838. If you don't want to
spend a lot of time looking at Job cancellation issues, best to back away
now! :)
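
For completeness, a rough sketch of the job-group approach (the group id and the toy action are made up; note that setJobGroup is per-thread, so it is called inside the thread that runs the action):

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Run the action in a separate thread, tagged with a job group.
val work = Future {
  sc.setJobGroup("my-group", "interruptible work", interruptOnCancel = true)
  sc.parallelize(1 to 1000000).map(_ * 2).count()
}

// Later, from another thread, cancel everything submitted under that group.
sc.cancelJobGroup("my-group")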

On Wed, Dec 16, 2015 at 4:26 PM, Jacek Laskowski  wrote:

> Thanks Mark for the answer! It helps, but still leaves me with few
> more questions. If you don't mind, I'd like to ask you few more
> questions.
>
> When you said "It can be used, and is used in user code, but it isn't
> always as straightforward as you might think." did you think about the
> Spark code or some other user code? Can I have a look at the code and
> the use case? The method is `private[spark]` and it's not even
> @DeveloperApi that makes using the method even more risky. I believe
> it's a very low-level ingredient of Spark that very few people use if
> at all. If I could see the code that uses the method, that could help.
>
> Following up, isn't killing a stage similar to killing a job? They can
> both be shared and I could imagine a very similar case for killing a
> job as for a stage where an implementation does some checks before
> killing the job eventually. It is possible for stages that are in a
> sense similar to jobs so...I'm still unsure why the method is not used
> by Spark itself. If it's not used by Spark why could it be useful for
> others outside Spark?
>
> Doh, why did I come across the method? It will take some time before I
> forget about it :-)
>
> Pozdrawiam,
> Jacek
>
> --
> Jacek Laskowski | https://medium.com/@jaceklaskowski/ |
> http://blog.jaceklaskowski.pl
> Mastering Spark https://jaceklaskowski.gitbooks.io/mastering-apache-spark/
> Follow me at https://twitter.com/jaceklaskowski
> Upvote at http://stackoverflow.com/users/1305344/jacek-laskowski
>
>
> On Wed, Dec 16, 2015 at 10:55 AM, Mark Hamstra 
> wrote:
> > It can be used, and is used in user code, but it isn't always as
> > straightforward as you might think.  This is mostly because a Job often
> > isn't a Job -- or rather it is more than one Job.  There are several RDD
> > transformations that aren't lazy, so they end up launching "hidden" Jobs
> > that you may not anticipate and may expect to be canceled (but won't be)
> by
> > a cancelJob() called on a later action on that transformed RDD.  It is
> also
> > possible for a single DataFrame or Spark SQL query to result in more than
> > one running Job.  The upshot of all of this is that getting cancelJob()
> to
> > work as most users would expect all the time is non-trivial, and most of
> the
> > time using a jobGroup is a better way to capture what may be more than
> one
> > Job that the user is thinking of as a single Job.
> >
> > On Wed, Dec 16, 2015 at 5:34 AM, Sean Owen  wrote:
> >>
> >> It does look like it's not actually used. It may simply be there for
> >> completeness, to match cancelStage and cancelJobGroup, which are used.
> >> I also don't know of a good reason there's no way to kill a whole job.
> >>
> >> On Wed, Dec 16, 2015 at 1:15 PM, Jacek Laskowski 
> wrote:
> >> > Hi,
> >> >
> >> > While reviewing Spark code I came across SparkContext.cancelJob. I
> >> > found no part of Spark using it. Is this a leftover after some
> >> > refactoring? Why is this part of sc?
> >> >
> >> > The reason I'm asking is another question I'm having after having
> >> > learnt about killing a stage in webUI. I noticed there is a way to
> >> > kill/cancel stages, but no corresponding feature to kill/cancel jobs.
> >> > Why? Is there a JIRA ticket to have it some day perhaps?
> >> >
> >> > Pozdrawiam,
> >> > Jacek
> >> >
> >> > --
> >> > Jacek Laskowski | https://medium.com/@jaceklaskowski/
> >> > Mastering Apache Spark
> >> > ==> https://jaceklaskowski.gitbooks.io/mastering-apache-spark/
> >> > Follow me at https://twitter.com/jaceklaskowski
> >> >
> >> > -
> >> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >> > For additional commands, e-mail: user-h...@spark.apache.org
> 

How to access resources added with SQL: ADD FILE

2015-12-17 Thread Antonio Piccolboni
Hi,
I need to access a file from a UDF. In standalone, if I add the file
/tmp/somedata, it ends up in /private/tmp/somedata, as I found out keeping
an eye on the logs. That is actually the same file because of a link
between the directories, nothing related to spark. My expectation reading
some code examples was to have access to a copy of the file in the current
working directory of the UDF, but that is clearly not the case here. I was
wondering if there is a general rule about these added files that is
independent from Spark  mode and persistence layer, say if the file comes
from HDFS vs S3. That would be very helpful for me to achieve the necessary
level of generality for the UDF. Thanks


Antonio


Re: Kafka - streaming from multiple topics

2015-12-17 Thread Cody Koeninger
You could stick them all in a single stream, and do mapPartitions, then
switch on the topic for that partition.  It's probably cleaner to do
separate jobs, just depends on how you want to organize your code.
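
A rough sketch of that pattern (the topic names and broker list are made up for illustration, ssc is an existing StreamingContext, and the per-topic branches here just tag the value where real topic-specific logic would go):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
val topics = Set("topicA", "topicB", "topicC")

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

val processed = stream.transform { rdd =>
  // Each partition of a direct-stream RDD maps 1:1 to an OffsetRange.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.mapPartitionsWithIndex { (i, iter) =>
    offsetRanges(i).topic match {
      case "topicA" => iter.map { case (_, v) => "A:" + v }               // topic-specific logic goes here
      case "topicB" => iter.map { case (_, v) => "B:" + v.toUpperCase }   // topic-specific logic goes here
      case _        => iter.map { case (_, v) => "other:" + v }
    }
  }
}
processed.print()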

On Thu, Dec 17, 2015 at 11:11 AM, Jean-Pierre OCALAN 
wrote:

> Hi Cody,
>
> First of all thanks for the note about spark.streaming.concurrentJobs. I
> guess this is why it's not mentioned in the actual spark streaming doc.
> Since those 3 topics contain completely different data on which I need to
> apply different kind of transformations, I am not sure joining them would
> be really efficient, unless you know something that I don't.
>
> As I really don't need any interaction between those streams, I think I
> might end up running 3 different streaming apps instead of one.
>
> Thanks again!
>
> On Thu, Dec 17, 2015 at 11:43 AM, Cody Koeninger 
> wrote:
>
>> Using spark.streaming.concurrentJobs for this probably isn't a good idea,
>> as it allows the next batch to start processing before current one is
>> finished, which may have unintended consequences.
>>
>> Why can't you use a single stream with all the topics you care about, or
>> multiple streams if you're e.g. joining them?
>>
>>
>>
>> On Wed, Dec 16, 2015 at 3:00 PM, jpocalan  wrote:
>>
>>> Nevermind, I found the answer to my questions.
>>> The following spark configuration property will allow you to process
>>> multiple KafkaDirectStream in parallel:
>>> --conf spark.streaming.concurrentJobs=
>>>
>>>
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-streaming-from-multiple-topics-tp8678p25723.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>
>
> --
> jean-pierre ocalan
> jpoca...@gmail.com
>


pyspark + kafka + streaming = NoSuchMethodError

2015-12-17 Thread Christos Mantas

Hello,

I am trying to set up a simple example with Spark Streaming (Python) and 
Kafka on a single machine deployment.
My Kafka broker/server is also on the same machine (localhost:1281) and 
I am using Spark Version: spark-1.5.2-bin-hadoop2.6


Python code

   ...
   ssc = StreamingContext(sc, 1)
   ...
   lines = KafkaUtils.createDirectStream(ssc, ["test"],
   {"metadata.broker.list":"localhost:1281"})


So I try

   spark-submit --jars spark-streaming-kafka-assembly_2.11-1.5.2.jar
   my_kafka_streaming_wordcount.py

OR

   spark-submit --packages
   org.apache.spark:spark-streaming-kafka_2.11:1.5.2
   my_kafka_streaming_wordcount.py
   (my kafka version is 2.11-0.9.0.0)

OR

   pyspark  --jars spark-streaming-kafka-assembly_2.11-1.5.2.jar 
   [import stuff and type those lines]


and I end up with:

   15/12/17 19:44:58 WARN NativeCodeLoader: Unable to load
   native-hadoop library for your platform... using builtin-java
   classes where applicable
   15/12/17 19:45:00 WARN MetricsSystem: Using default name
   DAGScheduler for source because spark.app.id is not set.
   Traceback (most recent call last):
  File "/I/edited/the/path/here/my_kafka_streaming_wordcount.py",
   line 80, in 
lines = KafkaUtils.createDirectStream(ssc, ["test"],
   {"metadata.broker.list":"localhost:1281"})
  File
   
"/opt/spark-1.5.2-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/kafka.py",
   line 130, in createDirectStream
   py4j.protocol.Py4JJavaError: An error occurred while calling
   o29.createDirectStream.
   : java.lang.NoSuchMethodError:
   scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
at kafka.api.RequestKeys$.(RequestKeys.scala:39)
at kafka.api.RequestKeys$.(RequestKeys.scala)
at
   kafka.api.TopicMetadataRequest.(TopicMetadataRequest.scala:53)
at
   
org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:122)
at
   
org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
at
   
org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:409)
at
   
org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:532)
at
   
org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStream(KafkaUtils.scala:614)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
   sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
   
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at
   py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at
   py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)

Am I missing something?

Thanks in advance
Chris M.







​Spark 1.6 - YARN Cluster Mode

2015-12-17 Thread syepes
Hello,

This week I have been testing 1.6 (#d509194b) on our HDP 2.3 platform and
it's been working pretty well, with the exception of the YARN cluster deployment
mode.
Note that with 1.5, using the same "spark-props.conf" and "spark-env.sh"
config files, the cluster mode works as expected.

Has anyone else also tried the cluster mode in 1.6?


Problem reproduction:

# spark-submit --master yarn --deploy-mode cluster --num-executors 1 
--properties-file $PWD/spark-props.conf --class
org.apache.spark.examples.SparkPi
/opt/spark/lib/spark-examples-1.6.0-SNAPSHOT-hadoop2.7.1.jar

Error: Could not find or load main class
org.apache.spark.deploy.yarn.ApplicationMaster

spark-props.conf
-
spark.driver.extraJavaOptions-Dhdp.version=2.3.2.0-2950
spark.driver.extraLibraryPath   
/usr/hdp/current/hadoop-client/lib/native:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64
spark.executor.extraJavaOptions  -Dhdp.version=2.3.2.0-2950
spark.executor.extraLibraryPath 
/usr/hdp/current/hadoop-client/lib/native:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64
-

I will try to do some more debugging on this issue.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-6-YARN-Cluster-Mode-tp25729.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Can't run spark on yarn

2015-12-17 Thread Eran Witkon
Hi,
I am trying to install spark 1.5.2 on Apache hadoop 2.6 and Hive and yarn

spark-env.sh
export HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop

bash_profile
#HADOOP VARIABLES START
export JAVA_HOME=/usr/lib/jvm/java-8-oracle/
export HADOOP_INSTALL=/usr/local/hadoop
export PATH=$PATH:$HADOOP_INSTALL/bin
export PATH=$PATH:$HADOOP_INSTALL/sbin
export HADOOP_MAPRED_HOME=$HADOOP_INSTALL
export HADOOP_COMMON_HOME=$HADOOP_INSTALL
export HADOOP_HDFS_HOME=$HADOOP_INSTALL
export YARN_HOME=$HADOOP_INSTALL
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_INSTALL/lib/native
export HADOOP_OPTS="-Djava.library.path=$HADOOP_INSTALL/lib"
export HADOOP_USER_CLASSPATH_FIRST=true
export HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop
export YARN_CONF_DIR=/usr/local/hadoop/etc/hadoop
#HADOOP VARIABLES END

export SPARK_HOME=/usr/local/spark
export HIVE_HOME=/usr/local/hive
export PATH=$PATH:$HIVE_HOME/bin


When I run spark-shell
./bin/spark-shell --master yarn-client

Output:
15/12/17 22:22:07 WARN util.NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
15/12/17 22:22:07 INFO spark.SecurityManager: Changing view acls to: hduser
15/12/17 22:22:07 INFO spark.SecurityManager: Changing modify acls to:
hduser
15/12/17 22:22:07 INFO spark.SecurityManager: SecurityManager:
authentication disabled; ui acls disabled; users with view permissions:
Set(hduser); users with modify permissions: Set(hduser)
15/12/17 22:22:07 INFO spark.HttpServer: Starting HTTP Server
15/12/17 22:22:07 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/12/17 22:22:08 INFO server.AbstractConnector: Started
SocketConnector@0.0.0.0:38389
15/12/17 22:22:08 INFO util.Utils: Successfully started service 'HTTP class
server' on port 38389.
Welcome to
    __
 / __/__  ___ _/ /__
_\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.5.2
  /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java
1.8.0_66)
Type in expressions to have them evaluated.
Type :help for more information.
15/12/17 22:22:11 WARN util.Utils: Your hostname, eranw-Lenovo-Yoga-2-Pro
resolves to a loopback address: 127.0.1.1; using 10.0.0.1 instead (on
interface wlp1s0)
15/12/17 22:22:11 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind
to another address
15/12/17 22:22:11 INFO spark.SparkContext: Running Spark version 1.5.2
15/12/17 22:22:11 INFO spark.SecurityManager: Changing view acls to: hduser
15/12/17 22:22:11 INFO spark.SecurityManager: Changing modify acls to:
hduser
15/12/17 22:22:11 INFO spark.SecurityManager: SecurityManager:
authentication disabled; ui acls disabled; users with view permissions:
Set(hduser); users with modify permissions: Set(hduser)
15/12/17 22:22:11 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/12/17 22:22:11 INFO Remoting: Starting remoting
15/12/17 22:22:12 INFO util.Utils: Successfully started service
'sparkDriver' on port 36381.
15/12/17 22:22:12 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://sparkDriver@10.0.0.1:36381]
15/12/17 22:22:12 INFO spark.SparkEnv: Registering MapOutputTracker
15/12/17 22:22:12 INFO spark.SparkEnv: Registering BlockManagerMaster
15/12/17 22:22:12 INFO storage.DiskBlockManager: Created local directory at
/tmp/blockmgr-139fac31-5f21-4c61-9575-3110d5205f7d
15/12/17 22:22:12 INFO storage.MemoryStore: MemoryStore started with
capacity 530.0 MB
15/12/17 22:22:12 INFO spark.HttpFileServer: HTTP File server directory is
/tmp/spark-955ef002-a802-49c6-b440-0656861f737c/httpd-2127cbe1-97d7-40a5-a96f-75216f115f00
15/12/17 22:22:12 INFO spark.HttpServer: Starting HTTP Server
15/12/17 22:22:12 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/12/17 22:22:12 INFO server.AbstractConnector: Started
SocketConnector@0.0.0.0:36760
15/12/17 22:22:12 INFO util.Utils: Successfully started service 'HTTP file
server' on port 36760.
15/12/17 22:22:12 INFO spark.SparkEnv: Registering OutputCommitCoordinator
15/12/17 22:22:12 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/12/17 22:22:12 INFO server.AbstractConnector: Started
SelectChannelConnector@0.0.0.0:4040
15/12/17 22:22:12 INFO util.Utils: Successfully started service 'SparkUI'
on port 4040.
15/12/17 22:22:12 INFO ui.SparkUI: Started SparkUI at http://10.0.0.1:4040
15/12/17 22:22:12 WARN metrics.MetricsSystem: Using default name
DAGScheduler for source because spark.app.id is not set.
15/12/17 22:22:12 INFO client.RMProxy: Connecting to ResourceManager at /
0.0.0.0:8032
15/12/17 22:22:12 INFO yarn.Client: Requesting a new application from
cluster with 1 NodeManagers
15/12/17 22:22:12 INFO yarn.Client: Verifying our application has not
requested more than the maximum memory capability of the cluster (8192 MB
per container)
15/12/17 22:22:12 INFO yarn.Client: Will allocate AM container, with 896 MB
memory including 384 MB overhead
15/12/17 22:22:12 INFO yarn.Client: Setting up container launch context for
our AM
15/12/17 22:22:12 INFO yarn.Client: Setting up the launch 

unsubscribe

2015-12-17 Thread Roman Garcia
please


Re: How to submit spark job to YARN from scala code

2015-12-17 Thread Steve Loughran

On 17 Dec 2015, at 16:50, Saiph Kappa 
> wrote:

Hi,

Since it is not currently possible to submit a Spark job from code to a Spark cluster 
running in standalone mode (there is no way to specify the cluster deploy mode within 
the code), can I do it with YARN?

I tried to do something like this (but in scala):

«

... // Client object - main method
System.setProperty("SPARK_YARN_MODE", "true")
val sparkConf = new SparkConf()

try {
  val args = new ClientArguments(argStrings, sparkConf)
  new Client(args, sparkConf).run()
} catch {
  case e: Exception => {
Console.err.println(e.getMessage)
System.exit(1)
  }
}

System.exit(0)


» in http://blog.sequenceiq.com/blog/2014/08/22/spark-submit-in-java/


However, it is not possible to create a new instance of Client, since 
org.apache.spark.deploy.yarn.Client is private to Spark.

The standard way to work around a problem like this is to place your code in a 
package which has access. File a JIRA asking for a public API too, one that 
doesn't require you to set system properties as a way of passing parameters down.
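
A minimal sketch of that package trick, reusing the snippet above (the object name here is made up, and it assumes the spark-yarn module is on the compile classpath):

// Must live under org.apache.spark to see the private[spark] Client and ClientArguments.
package org.apache.spark.deploy.yarn

import org.apache.spark.SparkConf

object YarnClientSubmitter {
  // argStrings are the arguments the yarn Client normally receives,
  // e.g. --jar, --class, --arg ...
  def submit(argStrings: Array[String]): Unit = {
    System.setProperty("SPARK_YARN_MODE", "true")
    val sparkConf = new SparkConf()
    val args = new ClientArguments(argStrings, sparkConf)
    new Client(args, sparkConf).run()
  }
}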



Is there any way I can submit spark jobs from the code in cluster mode and not 
using the spark-submit script?


Thanks.



Re: Large number of conf broadcasts

2015-12-17 Thread Prasad Ravilla
Hi Anders,

I am running into the same issue as yours. I am trying to read about 120 
thousand avro files into a single data frame.

Is your patch part of a pull request from the master branch in github?

Thanks,
Prasad.

From: Anders Arpteg
Date: Thursday, October 22, 2015 at 10:37 AM
To: Koert Kuipers
Cc: user
Subject: Re: Large number of conf broadcasts

Yes, seems unnecessary. I actually tried patching the com.databricks.spark.avro 
reader to only broadcast once per dataset, instead of every single 
file/partition. It seems to work just as fine, and there are significantly less 
broadcasts and not seeing out of memory issues any more. Strange that more 
people does not react to this, since the broadcasting seems completely 
unnecessary...

Best,
Anders

On Thu, Oct 22, 2015 at 7:03 PM Koert Kuipers 
> wrote:
i am seeing the same thing. its gona completely crazy creating broadcasts for 
the last 15 mins or so. killing it...

On Thu, Sep 24, 2015 at 1:24 PM, Anders Arpteg 
> wrote:
Hi,

Running spark 1.5.0 in yarn-client mode, and am curios in why there are so many 
broadcast being done when loading datasets with large number of 
partitions/files. Have datasets with thousands of partitions, i.e. hdfs files 
in the avro folder, and sometime loading hundreds of these large datasets. 
Believe I have located the broadcast to line SparkContext.scala:1006. It seems 
to just broadcast the hadoop configuration, and I don't see why it should be 
necessary to broadcast that for EVERY file? Wouldn't it be possible to reuse 
the same broadcast configuration? It hardly the case the the configuration 
would be different between each file in a single dataset. Seems to be wasting 
lots of memory and needs to persist unnecessarily to disk (see below again).

Thanks,
Anders

15/09/24 17:11:11 INFO BlockManager: Writing block broadcast_1871_piece0 to 
disk  [19/49086]15/09/24 17:11:11 
INFO BlockManagerInfo: Added broadcast_1871_piece0 on disk on 
10.254.35.24:49428
 (size: 23.1 KB)
15/09/24 17:11:11 INFO MemoryStore: Block broadcast_4803_piece0 stored as bytes 
in memory (estimated size 23.1 KB, free 2.4 KB)
15/09/24 17:11:11 INFO BlockManagerInfo: Added broadcast_4803_piece0 in memory 
on 
10.254.35.24:49428
 (size: 23.1 KB, free: 464.0 MB)
15/09/24 17:11:11 INFO SpotifySparkContext: Created broadcast 4803 from 
hadoopFile at AvroRelation.scala:121
15/09/24 17:11:11 WARN MemoryStore: Failed to reserve initial memory threshold 
of 1024.0 KB for computing block broadcast_4804 in memory
.
15/09/24 17:11:11 WARN MemoryStore: Not enough space to cache broadcast_4804 in 
memory! (computed 496.0 B so far)
15/09/24 17:11:11 INFO MemoryStore: Memory use = 530.3 MB (blocks) + 0.0 B 
(scratch space shared across 0 tasks(s)) = 530.3 MB. Storage
limit = 530.3 MB.
15/09/24 17:11:11 WARN MemoryStore: Persisting block broadcast_4804 to disk 
instead.
15/09/24 17:11:11 INFO MemoryStore: ensureFreeSpace(23703) called with 
curMem=556036460, maxMem=556038881
15/09/24 17:11:11 INFO MemoryStore: 1 blocks selected for dropping
15/09/24 17:11:11 INFO BlockManager: Dropping block broadcast_1872_piece0 from 
memory
15/09/24 17:11:11 INFO BlockManager: Writing block broadcast_1872_piece0 to disk




Spark Path Wildcards Question

2015-12-17 Thread Mark Vervuurt
Hi Guys,

quick verification question:

Spark’s methods like textFile(…) and sequenceFile(…) support wildcards. 
However, if I have a directory structure like “hdfs:///data/year/month/day” (e.g. 
“hdfs:///data/2015/12/17”), then it is possible to crawl a whole year of data 
consisting of sequence files with 
“sparkContext.sequenceFile('hdfs:///data/*/*/*/*.seq')”, correct?

Looking at our results it seems to be working fine, as described above.
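
For what it's worth, a minimal sketch of that layout (the String key/value types are an assumption about the sequence files):

// Crawl one year versus everything under hdfs:///data/year/month/day/*.seq
val oneYear = sc.sequenceFile[String, String]("hdfs:///data/2015/*/*/*.seq")
val allData = sc.sequenceFile[String, String]("hdfs:///data/*/*/*/*.seq")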

Thanks,
Mark

Re: Large number of conf broadcasts

2015-12-17 Thread Koert Kuipers
https://github.com/databricks/spark-avro/pull/95
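
The pull request above is the actual fix; purely as an illustration of the idea (broadcast the Hadoop Configuration once per dataset and reuse it in every task, instead of once per file), here is a rough sketch modeled on Spark's internal SerializableConfiguration wrapper. The class and method names are assumptions, not the patch itself:

import java.io.{ObjectInputStream, ObjectOutputStream}
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

// Hadoop's Configuration is not java-serializable, so wrap it and broadcast
// the wrapper a single time for the whole dataset.
class SerializableConf(@transient var value: Configuration) extends Serializable {
  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()
    value.write(out)
  }
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    value = new Configuration(false)
    value.readFields(in)
  }
}

def broadcastHadoopConf(sc: SparkContext): Broadcast[SerializableConf] =
  sc.broadcast(new SerializableConf(sc.hadoopConfiguration))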

On Thu, Dec 17, 2015 at 3:35 PM, Prasad Ravilla  wrote:

> Hi Anders,
>
> I am running into the same issue as yours. I am trying to read about 120
> thousand avro files into a single data frame.
>
> Is your patch part of a pull request from the master branch in github?
>
> Thanks,
> Prasad.
>
> From: Anders Arpteg
> Date: Thursday, October 22, 2015 at 10:37 AM
> To: Koert Kuipers
> Cc: user
> Subject: Re: Large number of conf broadcasts
>
> Yes, seems unnecessary. I actually tried patching the
> com.databricks.spark.avro reader to only broadcast once per dataset,
> instead of every single file/partition. It seems to work just as fine, and
> there are significantly less broadcasts and not seeing out of memory issues
> any more. Strange that more people does not react to this, since the
> broadcasting seems completely unnecessary...
>
> Best,
> Anders
>
> On Thu, Oct 22, 2015 at 7:03 PM Koert Kuipers  wrote:
>
>> i am seeing the same thing. its gona completely crazy creating broadcasts
>> for the last 15 mins or so. killing it...
>>
>> On Thu, Sep 24, 2015 at 1:24 PM, Anders Arpteg 
>> wrote:
>>
>>> Hi,
>>>
>>> Running spark 1.5.0 in yarn-client mode, and am curios in why there are
>>> so many broadcast being done when loading datasets with large number of
>>> partitions/files. Have datasets with thousands of partitions, i.e. hdfs
>>> files in the avro folder, and sometime loading hundreds of these large
>>> datasets. Believe I have located the broadcast to line
>>> SparkContext.scala:1006. It seems to just broadcast the hadoop
>>> configuration, and I don't see why it should be necessary to broadcast that
>>> for EVERY file? Wouldn't it be possible to reuse the same broadcast
>>> configuration? It hardly the case the the configuration would be different
>>> between each file in a single dataset. Seems to be wasting lots of memory
>>> and needs to persist unnecessarily to disk (see below again).
>>>
>>> Thanks,
>>> Anders
>>>
>>> 15/09/24 17:11:11 INFO BlockManager: Writing block broadcast_1871_piece0
>>> to disk  [19/49086]15/09/24
>>> 17:11:11 INFO BlockManagerInfo: Added broadcast_1871_piece0 on disk on
>>> 10.254.35.24:49428
>>> 
>>> (size: 23.1 KB)
>>> 15/09/24 17:11:11 INFO MemoryStore: Block broadcast_4803_piece0 stored
>>> as bytes in memory (estimated size 23.1 KB, free 2.4 KB)
>>> 15/09/24 17:11:11 INFO BlockManagerInfo: Added broadcast_4803_piece0 in
>>> memory on 10.254.35.24:49428
>>> 
>>> (size: 23.1 KB, free: 464.0 MB)
>>> 15/09/24 17:11:11 INFO SpotifySparkContext: Created broadcast 4803 from
>>> hadoopFile at AvroRelation.scala:121
>>> 15/09/24 17:11:11 WARN MemoryStore: Failed to reserve initial memory
>>> threshold of 1024.0 KB for computing block broadcast_4804 in memory
>>> .
>>> 15/09/24 17:11:11 WARN MemoryStore: Not enough space to cache
>>> broadcast_4804 in memory! (computed 496.0 B so far)
>>> 15/09/24 17:11:11 INFO MemoryStore: Memory use = 530.3 MB (blocks) + 0.0
>>> B (scratch space shared across 0 tasks(s)) = 530.3 MB. Storage
>>> limit = 530.3 MB.
>>> 15/09/24 17:11:11 WARN MemoryStore: Persisting block broadcast_4804 to
>>> disk instead.
>>> 15/09/24 17:11:11 INFO MemoryStore: ensureFreeSpace(23703) called with
>>> curMem=556036460, maxMem=556038881
>>> 15/09/24 17:11:11 INFO MemoryStore: 1 blocks selected for dropping
>>> 15/09/24 17:11:11 INFO BlockManager: Dropping block
>>> broadcast_1872_piece0 from memory
>>> 15/09/24 17:11:11 INFO BlockManager: Writing block broadcast_1872_piece0
>>> to disk
>>>
>>>
>>
>>


Re: Large number of conf broadcasts

2015-12-17 Thread Prasad Ravilla
Thanks, Koert.

Regards,
Prasad.

From: Koert Kuipers
Date: Thursday, December 17, 2015 at 1:06 PM
To: Prasad Ravilla
Cc: Anders Arpteg, user
Subject: Re: Large number of conf broadcasts

https://github.com/databricks/spark-avro/pull/95

On Thu, Dec 17, 2015 at 3:35 PM, Prasad Ravilla 
> wrote:
Hi Anders,

I am running into the same issue as yours. I am trying to read about 120 
thousand avro files into a single data frame.

Is your patch part of a pull request from the master branch in github?

Thanks,
Prasad.

From: Anders Arpteg
Date: Thursday, October 22, 2015 at 10:37 AM
To: Koert Kuipers
Cc: user
Subject: Re: Large number of conf broadcasts

Yes, seems unnecessary. I actually tried patching the com.databricks.spark.avro 
reader to only broadcast once per dataset, instead of every single 
file/partition. It seems to work just as fine, and there are significantly less 
broadcasts and not seeing out of memory issues any more. Strange that more 
people does not react to this, since the broadcasting seems completely 
unnecessary...

Best,
Anders

On Thu, Oct 22, 2015 at 7:03 PM Koert Kuipers 
> wrote:
i am seeing the same thing. its gona completely crazy creating broadcasts for 
the last 15 mins or so. killing it...

On Thu, Sep 24, 2015 at 1:24 PM, Anders Arpteg 
> wrote:
Hi,

Running spark 1.5.0 in yarn-client mode, and am curios in why there are so many 
broadcast being done when loading datasets with large number of 
partitions/files. Have datasets with thousands of partitions, i.e. hdfs files 
in the avro folder, and sometime loading hundreds of these large datasets. 
Believe I have located the broadcast to line SparkContext.scala:1006. It seems 
to just broadcast the hadoop configuration, and I don't see why it should be 
necessary to broadcast that for EVERY file? Wouldn't it be possible to reuse 
the same broadcast configuration? It hardly the case the the configuration 
would be different between each file in a single dataset. Seems to be wasting 
lots of memory and needs to persist unnecessarily to disk (see below again).

Thanks,
Anders

15/09/24 17:11:11 INFO BlockManager: Writing block broadcast_1871_piece0 to 
disk  [19/49086]15/09/24 17:11:11 
INFO BlockManagerInfo: Added broadcast_1871_piece0 on disk on 
10.254.35.24:49428
 (size: 23.1 KB)
15/09/24 17:11:11 INFO MemoryStore: Block broadcast_4803_piece0 stored as bytes 
in memory (estimated size 23.1 KB, free 2.4 KB)
15/09/24 17:11:11 INFO BlockManagerInfo: Added broadcast_4803_piece0 in memory 
on 
10.254.35.24:49428
 (size: 23.1 KB, free: 464.0 MB)
15/09/24 17:11:11 INFO SpotifySparkContext: Created broadcast 4803 from 
hadoopFile at AvroRelation.scala:121
15/09/24 17:11:11 WARN MemoryStore: Failed to reserve initial memory threshold 
of 1024.0 KB for computing block broadcast_4804 in memory
.
15/09/24 17:11:11 WARN MemoryStore: Not enough space to cache broadcast_4804 in 
memory! (computed 496.0 B so far)
15/09/24 17:11:11 INFO MemoryStore: Memory use = 530.3 MB (blocks) + 0.0 B 
(scratch space shared across 0 tasks(s)) = 530.3 MB. Storage
limit = 530.3 MB.
15/09/24 17:11:11 WARN MemoryStore: Persisting block broadcast_4804 to disk 
instead.
15/09/24 17:11:11 INFO MemoryStore: ensureFreeSpace(23703) called with 
curMem=556036460, maxMem=556038881
15/09/24 17:11:11 INFO MemoryStore: 1 blocks selected for dropping
15/09/24 17:11:11 INFO BlockManager: Dropping block broadcast_1872_piece0 from 
memory
15/09/24 17:11:11 INFO BlockManager: Writing block broadcast_1872_piece0 to disk





Re: number of blocks in ALS/recommendation API

2015-12-17 Thread Burak Yavuz
Copying the first part from the scaladoc:
"
This is a blocked implementation of the ALS factorization algorithm that
groups the two sets of factors (referred to as "users" and "products") into
blocks and reduces communication by only sending one copy of each user
vector to each product block on each iteration, and only for the product
blocks that need that user's feature vector. This is achieved by
pre-computing some information about the ratings matrix to determine the
"out-links" of each user (which blocks of products it will contribute to)
and "in-link" information for each product (which of the feature vectors it
receives from each user block it will depend on). This allows us to send
only an array of feature vectors between each user block and product block,
and have the product block find the users' ratings and update the products
based on these messages.
"

Basically, the number of blocks can be tweaked to reduce shuffling, making
the application more efficient in both run time and disk space usage. For
example, if you have a high ratio of product ratings per user (one user
rating hundreds of products), you may choose a smaller number of product
blocks so that that user's ratings get sent to fewer partitions, which
would lead to less shuffling.

However, if you choose your number of blocks to be low, then you may run
into OOMEs.
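
As a quick illustration (the rank, iteration, and lambda values below are placeholders), the block count is simply the fifth argument to mllib's ALS.train, and -1 lets MLlib auto-configure it:

import org.apache.spark.mllib.recommendation.{ALS, Rating}

// Tiny placeholder ratings RDD just to make the calls concrete.
val ratings = sc.parallelize(Seq(Rating(1, 10, 4.0), Rating(1, 20, 3.0), Rating(2, 10, 5.0)))

val modelAuto    = ALS.train(ratings, 10, 10, 0.01, -1)  // -1: let MLlib pick the block count
val modelBlocked = ALS.train(ratings, 10, 10, 0.01, 50)  // force 50 user/product blocks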

Hope that helps.

Burak

On Thu, Dec 17, 2015 at 3:17 AM, Roberto Pagliari  wrote:

> What is the meaning of the ‘blocks’ input argument in mllib ALS
> implementation, and how does that relate to the number of executors and/or
> size of the input data?
>
>
> Thank you,
>
>


Difference between Local Hive Metastore server and A Hive-based Metastore server

2015-12-17 Thread Divya Gehlot
Hi,
I am a newbie to Spark and am using 1.4.1.
I got confused between a local metastore server and a Hive-based metastore
server.
Can somebody share the use cases for when to use which one, and the pros and cons?

I am using HDP 2.3.2, in which hive-site.xml is already in the Spark
configuration directory; that means HDP 2.3.2 already uses a Hive-based
metastore server.


seriazable error in apache spark job

2015-12-17 Thread Pankaj Narang
I am encountering the error below. Can somebody guide me?

Something similar is one this link
https://github.com/elastic/elasticsearch-hadoop/issues/298


actor.MentionCrawlActor
java.io.NotSerializableException: actor.MentionCrawlActor
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
~[na:1.7.0_79]
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
~[na:1.7.0_79]
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
~[na:1.7.0_79]
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
~[na:1.7.0_79]
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
~[na:1.7.0_79]
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
~[na:1.7.0_79]
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
~[na:1.7.0_79]
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
~[na:1.7.0_79]
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
~[na:1.7.0_79]



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/seriazable-error-in-apache-spark-job-tp25732.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Is DataFrame.groupBy supposed to preserve order within groups?

2015-12-17 Thread Timothée Carayol
Hi all,

I tried to do something like the following in Spark

df.orderBy('col1, 'col2).groupBy('col1).agg(first('col3))

I was hoping to get, within each col1 value, the value for col3 that
corresponds to the highest value for col2 within that col1 group. This only
works if the order on col2 is preserved after the groupBy step.

https://bzhangusc.wordpress.com/2015/05/28/groupby-on-dataframe-is-not-the-groupby-on-rdd/
suggests that it is (unlike RDD.groupBy, DataFrame.groupBy is described as
preserving the order).

Yet in my experiments, I find that in some cases the order is not
preserved. Running the same code multiple times gives me different results.

If this is a bug, I'll happily work on a reproducible example and post to
JIRA but I thought I'd check with the mailing list first in case that is,
in fact, the expected behaviour?
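
In case it helps while this gets confirmed, a sketch of an order-independent way to get the same result with window functions (this assumes a HiveContext, which window functions require in 1.5, and a df with columns col1, col2, col3):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, desc, rowNumber}

// Rank rows within each col1 group by col2 descending, then keep the top row.
val w = Window.partitionBy("col1").orderBy(desc("col2"))
val top = df.withColumn("rn", rowNumber().over(w))
  .where(col("rn") === 1)
  .select("col1", "col3")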

Thanks
Timothée


Re: Spark streaming: java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration ... on restart from checkpoint

2015-12-17 Thread Shixiong Zhu
Streaming checkpoint doesn't support Accumulator or Broadcast. See
https://issues.apache.org/jira/browse/SPARK-5206

Here is a workaround:
https://issues.apache.org/jira/browse/SPARK-5206?focusedCommentId=14506806=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14506806
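
The workaround boils down to a lazily instantiated singleton, so the broadcast is re-created on the driver after a restart instead of being restored from checkpoint data. A rough sketch (the names and the Map payload are illustrative, not from this thread):

import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

object SettingsBroadcast {
  @volatile private var instance: Broadcast[Map[String, String]] = _

  // Created on first use after every (re)start, never read back from the checkpoint.
  def getInstance(sc: SparkContext, settings: => Map[String, String]): Broadcast[Map[String, String]] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.broadcast(settings)
        }
      }
    }
    instance
  }
}

// Inside the streaming code, look it up per batch:
// dstream.foreachRDD { rdd =>
//   val settings = SettingsBroadcast.getInstance(rdd.sparkContext, loadSettings()).value
//   ...
// }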

Best Regards,
Shixiong Zhu

2015-12-17 4:39 GMT-08:00 Bartłomiej Alberski :

> I prepared simple example helping in reproducing problem:
>
> https://github.com/alberskib/spark-streaming-broadcast-issue
>
> I think that in that way it will be easier for you to understand problem
> and find solution (if any exists)
>
> Thanks
> Bartek
>
> 2015-12-16 23:34 GMT+01:00 Bartłomiej Alberski :
>
>> First of all , thanks @tdas for looking into my problem.
>>
>> Yes, I checked it seperately and it is working fine. For below piece of
>> code there is no single exception and values are sent correctly.
>>
>> val reporter = new MyClassReporter(...)
>> reporter.send(...)
>> val out = new FileOutputStream("out123.txt")
>> val outO = new ObjectOutputStream(out)
>> outO.writeObject(reporter)
>> outO.flush()
>> outO.close()
>>
>> val in = new FileInputStream("out123.txt")
>> val inO = new ObjectInputStream(in)
>> val reporterFromFile  =
>> inO.readObject().asInstanceOf[StreamingGraphiteReporter]
>> reporterFromFile.send(...)
>> in.close()
>>
>> Maybe I am wrong but I think that it will be strange if class
>> implementing Serializable and properly broadcasted to executors cannot be
>> serialized and deserialized?
>> I also prepared slightly different piece of code and I received slightly
>> different exception. Right now it looks like:
>> java.lang.ClassCastException: [B cannot be cast to com.example.sender.
>> MyClassReporter.
>>
>> Maybe I am wrong but, it looks like that when restarting from checkpoint
>> it does read proper block of memory to read bytes for MyClassReporter.
>>
>> 2015-12-16 2:38 GMT+01:00 Tathagata Das :
>>
>>> Could you test serializing and deserializing the MyClassReporter  class
>>> separately?
>>>
>>> On Mon, Dec 14, 2015 at 8:57 AM, Bartłomiej Alberski <
>>> albers...@gmail.com> wrote:
>>>
 Below is the full stacktrace(real names of my classes were changed)
 with short description of entries from my code:

 rdd.mapPartitions{ partition => //this is the line to which second
 stacktrace entry is pointing
   val sender =  broadcastedValue.value // this is the maing place to
 which first stacktrace entry is pointing
 }

 java.lang.ClassCastException:
 org.apache.spark.util.SerializableConfiguration cannot be cast to
 com.example.sender.MyClassReporter
 at com.example.flow.Calculator
 $$anonfun$saveAndLoadHll$1$$anonfun$apply$14.apply(AnomalyDetection.scala:87)
 at com.example.flow.Calculator
 $$anonfun$saveAndLoadHll$1$$anonfun$apply$14.apply(Calculator.scala:82)
 at
 org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
 at
 org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
 at
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:88)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)

 2015-12-14 17:10 GMT+01:00 Ted Yu :

> Can you show the complete stack trace for the ClassCastException ?
>
> Please see the following thread:
> http://search-hadoop.com/m/q3RTtgEUHVmJA1T1
>
> Cheers
>
> On Mon, Dec 14, 2015 at 7:33 AM, alberskib 
> wrote:
>
>> Hey all,
>>
>> When my streaming application is restarting from failure (from
>> checkpoint) I
>> am receiving strange error:
>>
>> java.lang.ClassCastException:
>> org.apache.spark.util.SerializableConfiguration cannot be cast to
>> com.example.sender.MyClassReporter.
>>
>> Instance of B class is created on driver side (with proper config
>> passed as
>> constructor arg) and broadcasted to the executors in order to ensure
>> that on
>> each worker there will be only single instance. Everything is going

ShippableVertexPartitionOps: Joining two VertexPartitions with different indexes is slow.

2015-12-17 Thread Anderson de Andrade
Hi. The following code is raising the warning in the title:



I read a similar thread about this. However, I do not think I'm joining two
VertexRDDs. Is this the best way to go about stacking aggregateMessages
calls?

Thank you.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/ShippableVertexPartitionOps-Joining-two-VertexPartitions-with-different-indexes-is-slow-tp25730.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark REST API shows Error 503 Service Unavailable

2015-12-17 Thread prateek arora
Hi Vikram ,

As per a Cloudera person:

" There is a minor bug with the way the classpath is setup for the Spark
HistoryServer in 5.5.0 which causes the observed error when using the REST
API (as a result of bad jersey versions (1.9) being included).

This will be fixed in CDH and CM 5.5.2 (yet to be released) onwards."

On Thu, Dec 17, 2015 at 3:24 PM, Vikram Kone  wrote:

> Hi Prateek,
> Were you able to figure why this is happening? I'm seeing the same error
> on my spark standalone cluster.
>
> Any pointers anyone?
>
> On Fri, Dec 11, 2015 at 2:05 PM, prateek arora  > wrote:
>
>>
>>
>> Hi
>>
>> I am trying to access Spark Using REST API but got below error :
>>
>> Command :
>>
>> curl http://:18088/api/v1/applications
>>
>> Response:
>>
>>
>> 
>> 
>> 
>> Error 503 Service Unavailable
>> 
>> 
>> HTTP ERROR 503
>>
>> Problem accessing /api/v1/applications. Reason:
>> Service Unavailable
>> Caused by:
>> org.spark-project.jetty.servlet.ServletHolder$1:
>> java.lang.reflect.InvocationTargetException
>> at
>>
>> org.spark-project.jetty.servlet.ServletHolder.makeUnavailable(ServletHolder.java:496)
>> at
>>
>> org.spark-project.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:543)
>> at
>>
>> org.spark-project.jetty.servlet.ServletHolder.getServlet(ServletHolder.java:415)
>> at
>>
>> org.spark-project.jetty.servlet.ServletHolder.handle(ServletHolder.java:657)
>> at
>>
>> org.spark-project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:501)
>> at
>>
>> org.spark-project.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1086)
>> at
>>
>> org.spark-project.jetty.servlet.ServletHandler.doScope(ServletHandler.java:428)
>> at
>>
>> org.spark-project.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1020)
>> at
>>
>> org.spark-project.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:135)
>> at
>>
>> org.spark-project.jetty.server.handler.GzipHandler.handle(GzipHandler.java:301)
>> at
>>
>> org.spark-project.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:255)
>> at
>>
>> org.spark-project.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:116)
>> at org.spark-project.jetty.server.Server.handle(Server.java:370)
>> at
>>
>> org.spark-project.jetty.server.AbstractHttpConnection.handleRequest(AbstractHttpConnection.java:494)
>> at
>>
>> org.spark-project.jetty.server.AbstractHttpConnection.headerComplete(AbstractHttpConnection.java:971)
>> at
>>
>> org.spark-project.jetty.server.AbstractHttpConnection$RequestHandler.headerComplete(AbstractHttpConnection.java:1033)
>> at
>> org.spark-project.jetty.http.HttpParser.parseNext(HttpParser.java:644)
>> at
>>
>> org.spark-project.jetty.http.HttpParser.parseAvailable(HttpParser.java:235)
>> at
>>
>> org.spark-project.jetty.server.AsyncHttpConnection.handle(AsyncHttpConnection.java:82)
>> at
>>
>> org.spark-project.jetty.io.nio.SelectChannelEndPoint.handle(SelectChannelEndPoint.java:667)
>> at
>>
>> org.spark-project.jetty.io.nio.SelectChannelEndPoint$1.run(SelectChannelEndPoint.java:52)
>> at
>>
>> org.spark-project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
>> at
>>
>> org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.reflect.InvocationTargetException
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>> Method)
>> at
>>
>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>> at
>>
>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>> at
>>
>> com.sun.jersey.spi.container.servlet.WebComponent.createResourceConfig(WebComponent.java:728)
>> at
>>
>> com.sun.jersey.spi.container.servlet.WebComponent.createResourceConfig(WebComponent.java:678)
>> at
>>
>> com.sun.jersey.spi.container.servlet.WebComponent.init(WebComponent.java:203)
>> at
>>
>> com.sun.jersey.spi.container.servlet.ServletContainer.init(ServletContainer.java:373)
>> at
>>
>> com.sun.jersey.spi.container.servlet.ServletContainer.init(ServletContainer.java:556)
>> at javax.servlet.GenericServlet.init(GenericServlet.java:244)
>> at
>>
>> org.spark-project.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:532)
>> ... 22 more
>> Caused by: java.lang.NoSuchMethodError:
>>
>> com.sun.jersey.core.reflection.ReflectionHelper.getOsgiRegistryInstance()Lcom/sun/jersey/core/osgi/OsgiRegistry;
>> at
>>
>> 

Re: Spark REST API shows Error 503 Service Unavailable

2015-12-17 Thread Vikram Kone
Hi Prateek,
Were you able to figure why this is happening? I'm seeing the same error on
my spark standalone cluster.

Any pointers anyone?

On Fri, Dec 11, 2015 at 2:05 PM, prateek arora 
wrote:

>
>
> Hi
>
> I am trying to access Spark Using REST API but got below error :
>
> Command :
>
> curl http://:18088/api/v1/applications
>
> Response:
>
>
> 
> 
> 
> Error 503 Service Unavailable
> 
> 
> HTTP ERROR 503
>
> Problem accessing /api/v1/applications. Reason:
> Service Unavailable
> Caused by:
> org.spark-project.jetty.servlet.ServletHolder$1:
> java.lang.reflect.InvocationTargetException
> at
>
> org.spark-project.jetty.servlet.ServletHolder.makeUnavailable(ServletHolder.java:496)
> at
>
> org.spark-project.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:543)
> at
>
> org.spark-project.jetty.servlet.ServletHolder.getServlet(ServletHolder.java:415)
> at
>
> org.spark-project.jetty.servlet.ServletHolder.handle(ServletHolder.java:657)
> at
>
> org.spark-project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:501)
> at
>
> org.spark-project.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1086)
> at
>
> org.spark-project.jetty.servlet.ServletHandler.doScope(ServletHandler.java:428)
> at
>
> org.spark-project.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1020)
> at
>
> org.spark-project.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:135)
> at
>
> org.spark-project.jetty.server.handler.GzipHandler.handle(GzipHandler.java:301)
> at
>
> org.spark-project.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:255)
> at
>
> org.spark-project.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:116)
> at org.spark-project.jetty.server.Server.handle(Server.java:370)
> at
>
> org.spark-project.jetty.server.AbstractHttpConnection.handleRequest(AbstractHttpConnection.java:494)
> at
>
> org.spark-project.jetty.server.AbstractHttpConnection.headerComplete(AbstractHttpConnection.java:971)
> at
>
> org.spark-project.jetty.server.AbstractHttpConnection$RequestHandler.headerComplete(AbstractHttpConnection.java:1033)
> at
> org.spark-project.jetty.http.HttpParser.parseNext(HttpParser.java:644)
> at
> org.spark-project.jetty.http.HttpParser.parseAvailable(HttpParser.java:235)
> at
>
> org.spark-project.jetty.server.AsyncHttpConnection.handle(AsyncHttpConnection.java:82)
> at
>
> org.spark-project.jetty.io.nio.SelectChannelEndPoint.handle(SelectChannelEndPoint.java:667)
> at
>
> org.spark-project.jetty.io.nio.SelectChannelEndPoint$1.run(SelectChannelEndPoint.java:52)
> at
>
> org.spark-project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
> at
>
> org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.reflect.InvocationTargetException
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
> at
>
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> at
>
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> at
>
> com.sun.jersey.spi.container.servlet.WebComponent.createResourceConfig(WebComponent.java:728)
> at
>
> com.sun.jersey.spi.container.servlet.WebComponent.createResourceConfig(WebComponent.java:678)
> at
>
> com.sun.jersey.spi.container.servlet.WebComponent.init(WebComponent.java:203)
> at
>
> com.sun.jersey.spi.container.servlet.ServletContainer.init(ServletContainer.java:373)
> at
>
> com.sun.jersey.spi.container.servlet.ServletContainer.init(ServletContainer.java:556)
> at javax.servlet.GenericServlet.init(GenericServlet.java:244)
> at
>
> org.spark-project.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:532)
> ... 22 more
> Caused by: java.lang.NoSuchMethodError:
>
> com.sun.jersey.core.reflection.ReflectionHelper.getOsgiRegistryInstance()Lcom/sun/jersey/core/osgi/OsgiRegistry;
> at
>
> com.sun.jersey.spi.scanning.AnnotationScannerListener$AnnotatedClassVisitor.getClassForName(AnnotationScannerListener.java:217)
> at
>
> com.sun.jersey.spi.scanning.AnnotationScannerListener$AnnotatedClassVisitor.visitEnd(AnnotationScannerListener.java:186)
> at org.objectweb.asm.ClassReader.accept(Unknown Source)
> at org.objectweb.asm.ClassReader.accept(Unknown Source)
> at
>
> com.sun.jersey.spi.scanning.AnnotationScannerListener.onProcess(AnnotationScannerListener.java:136)
> at
>
> 

Re: Spark REST API shows Error 503 Service Unavailable

2015-12-17 Thread Marcelo Vanzin
Hi Prateek,

Are you using CDH 5.5 by any chance? We fixed this bug in an upcoming
patch. Unfortunately there's no workaround at the moment... it doesn't
affect upstream Spark either.

On Fri, Dec 11, 2015 at 2:05 PM, prateek arora
 wrote:
>
>
> Hi
>
> I am trying to access Spark Using REST API but got below error :
>
> Command :
>
> curl http://:18088/api/v1/applications
>
> Response:
>
>
> 
> 
> 
> Error 503 Service Unavailable
> 
> 
> HTTP ERROR 503
>
> Problem accessing /api/v1/applications. Reason:
> Service Unavailable
> Caused by:
> org.spark-project.jetty.servlet.ServletHolder$1:
> java.lang.reflect.InvocationTargetException
> at
> org.spark-project.jetty.servlet.ServletHolder.makeUnavailable(ServletHolder.java:496)
> at
> org.spark-project.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:543)
> at
> org.spark-project.jetty.servlet.ServletHolder.getServlet(ServletHolder.java:415)
> at
> org.spark-project.jetty.servlet.ServletHolder.handle(ServletHolder.java:657)
> at
> org.spark-project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:501)
> at
> org.spark-project.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1086)
> at
> org.spark-project.jetty.servlet.ServletHandler.doScope(ServletHandler.java:428)
> at
> org.spark-project.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1020)
> at
> org.spark-project.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:135)
> at
> org.spark-project.jetty.server.handler.GzipHandler.handle(GzipHandler.java:301)
> at
> org.spark-project.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:255)
> at
> org.spark-project.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:116)
> at org.spark-project.jetty.server.Server.handle(Server.java:370)
> at
> org.spark-project.jetty.server.AbstractHttpConnection.handleRequest(AbstractHttpConnection.java:494)
> at
> org.spark-project.jetty.server.AbstractHttpConnection.headerComplete(AbstractHttpConnection.java:971)
> at
> org.spark-project.jetty.server.AbstractHttpConnection$RequestHandler.headerComplete(AbstractHttpConnection.java:1033)
> at
> org.spark-project.jetty.http.HttpParser.parseNext(HttpParser.java:644)
> at
> org.spark-project.jetty.http.HttpParser.parseAvailable(HttpParser.java:235)
> at
> org.spark-project.jetty.server.AsyncHttpConnection.handle(AsyncHttpConnection.java:82)
> at
> org.spark-project.jetty.io.nio.SelectChannelEndPoint.handle(SelectChannelEndPoint.java:667)
> at
> org.spark-project.jetty.io.nio.SelectChannelEndPoint$1.run(SelectChannelEndPoint.java:52)
> at
> org.spark-project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
> at
> org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.reflect.InvocationTargetException
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> at
> com.sun.jersey.spi.container.servlet.WebComponent.createResourceConfig(WebComponent.java:728)
> at
> com.sun.jersey.spi.container.servlet.WebComponent.createResourceConfig(WebComponent.java:678)
> at
> com.sun.jersey.spi.container.servlet.WebComponent.init(WebComponent.java:203)
> at
> com.sun.jersey.spi.container.servlet.ServletContainer.init(ServletContainer.java:373)
> at
> com.sun.jersey.spi.container.servlet.ServletContainer.init(ServletContainer.java:556)
> at javax.servlet.GenericServlet.init(GenericServlet.java:244)
> at
> org.spark-project.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:532)
> ... 22 more
> Caused by: java.lang.NoSuchMethodError:
> com.sun.jersey.core.reflection.ReflectionHelper.getOsgiRegistryInstance()Lcom/sun/jersey/core/osgi/OsgiRegistry;
> at
> com.sun.jersey.spi.scanning.AnnotationScannerListener$AnnotatedClassVisitor.getClassForName(AnnotationScannerListener.java:217)
> at
> com.sun.jersey.spi.scanning.AnnotationScannerListener$AnnotatedClassVisitor.visitEnd(AnnotationScannerListener.java:186)
> at org.objectweb.asm.ClassReader.accept(Unknown Source)
> at org.objectweb.asm.ClassReader.accept(Unknown Source)
> at
> com.sun.jersey.spi.scanning.AnnotationScannerListener.onProcess(AnnotationScannerListener.java:136)
> at
> com.sun.jersey.core.spi.scanning.JarFileScanner.scan(JarFileScanner.java:97)
> at
> 

Re: Spark REST API shows Error 503 Service Unavailable

2015-12-17 Thread Vikram Kone
No we are using standard spark w/ datastax cassandra. I'm able to see some
json when I do  http://10.1.40.16:7080/json/v1/applications
but getting the following errors when I do
http://10.1.40.16:7080/api/v1/applications

HTTP ERROR 503

Problem accessing /api/v1/applications. Reason:

Service Unavailable

Caused by:

org.spark-project.jetty.servlet.ServletHolder$1:
java.lang.reflect.InvocationTargetException
at 
org.spark-project.jetty.servlet.ServletHolder.makeUnavailable(ServletHolder.java:496)
at 
org.spark-project.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:543)
at 
org.spark-project.jetty.servlet.ServletHolder.getServlet(ServletHolder.java:415)
at 
org.spark-project.jetty.servlet.ServletHolder.handle(ServletHolder.java:657)
at 
org.spark-project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:501)
at 
org.spark-project.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1086)
at 
org.spark-project.jetty.servlet.ServletHandler.doScope(ServletHandler.java:428)
at 
org.spark-project.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1020)
at 
org.spark-project.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:135)
at 
org.spark-project.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:255)
at 
org.spark-project.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:116)
at org.spark-project.jetty.server.Server.handle(Server.java:370)
at 
org.spark-project.jetty.server.AbstractHttpConnection.handleRequest(AbstractHttpConnection.java:494)
at 
org.spark-project.jetty.server.AbstractHttpConnection.headerComplete(AbstractHttpConnection.java:971)
at 
org.spark-project.jetty.server.AbstractHttpConnection$RequestHandler.headerComplete(AbstractHttpConnection.java:1033)
at 
org.spark-project.jetty.http.HttpParser.parseNext(HttpParser.java:644)
at 
org.spark-project.jetty.http.HttpParser.parseAvailable(HttpParser.java:235)
at 
org.spark-project.jetty.server.AsyncHttpConnection.handle(AsyncHttpConnection.java:82)
at 
org.spark-project.jetty.io.nio.SelectChannelEndPoint.handle(SelectChannelEndPoint.java:667)
at 
org.spark-project.jetty.io.nio.SelectChannelEndPoint$1.run(SelectChannelEndPoint.java:52)
at 
org.spark-project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
at 
org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
at 
com.sun.jersey.spi.container.servlet.WebComponent.createResourceConfig(WebComponent.java:728)
at 
com.sun.jersey.spi.container.servlet.WebComponent.createResourceConfig(WebComponent.java:678)
at 
com.sun.jersey.spi.container.servlet.WebComponent.init(WebComponent.java:203)
at 
com.sun.jersey.spi.container.servlet.ServletContainer.init(ServletContainer.java:373)
at 
com.sun.jersey.spi.container.servlet.ServletContainer.init(ServletContainer.java:556)
at javax.servlet.GenericServlet.init(GenericServlet.java:244)
at 
org.spark-project.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:532)
... 21 more
Caused by: java.lang.IncompatibleClassChangeError: Implementing class
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:760)
at 
java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at 
com.sun.jersey.api.core.ScanningResourceConfig.init(ScanningResourceConfig.java:79)
at 
com.sun.jersey.api.core.PackagesResourceConfig.init(PackagesResourceConfig.java:104)
at 
com.sun.jersey.api.core.PackagesResourceConfig.(PackagesResourceConfig.java:78)
at 

Re: Spark REST API shows Error 503 Service Unavailable

2015-12-17 Thread Marcelo Vanzin
On Thu, Dec 17, 2015 at 3:31 PM, Vikram Kone  wrote:
> No we are using standard spark w/ datastax cassandra. I'm able to see some
> json when I do  http://10.1.40.16:7080/json/v1/applications
> but getting the following errors when I do
> http://10.1.40.16:7080/api/v1/applications

That is actually a different exception from the original one posted by Prateek.

-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: looking for Spark streaming unit example written in Java

2015-12-17 Thread Andy Davidson
Hi Ted

I added the following hack to my gradle project. I am now able to run spark
streaming unit tests in my project.

Hopefully others will find this helpful

andy

dependencies {

provided group: 'commons-cli',      name: 'commons-cli',          version: '1.3+'

provided group: 'org.apache.spark', name: 'spark-sql_2.10',       version: '1.5.2'

provided group: 'org.apache.spark', name: 'spark-streaming_2.10', version: '1.5.2'

provided group: 'org.apache.spark', name: 'spark-streaming-twitter_2.10', version: '1.5.2'


testCompile 'org.apache.spark:spark-streaming_2.10:1.5.2'

testCompile group: 'org.scalatest', name: 'scalatest_2.10', version: '2.2.1'



// gradle can not find  dependencies that end in '-tests.jar' E.G.
spark-streaming_2.10-1.5.2-tests.jar

// as a work around we checked the binary into git

   

File sparkStreamingTestJar =
file('src/test/resources/sparkTestJarFiles/spark-streaming_2.10-1.5.2-tests.
jar')

testCompile files(sparkStreamingTestJar)

   

File sparkCoreTestJar =
file('src/test/resources/sparkTestJarFiles/spark-core_2.10-1.5.2-tests.jar')

testCompile files(sparkCoreTestJar)

}



From:  Andrew Davidson 
Date:  Wednesday, December 16, 2015 at 5:37 PM
To:  Ted Yu 
Cc:  "user @spark" 
Subject:  Re: looking for Spark streaming unit example written in Java

> Hi Ted
> 
> I am having a heck of a time trying use the JavaAPISuite code in my project.
> 
> I basically copied testTransform() into my local project. I can compile it
> using java 8 how ever I can not seem to get it to run.
> 
> com.pws.fantasy.ml.SparkStreamingTransformerTest > testTransform FAILED
> 
> java.lang.NoClassDefFoundError at SparkStreamingTransformerTest.java:73
> 
> Caused by: java.lang.ClassNotFoundException at
> SparkStreamingTransformerTest.java:73
> 
> 
> 
> Line 73 is
> 
>   JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc,
> inputData, 1);
> 
> 
> 
> 
> 
> This class is defined in
> spark-1.5.2/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.s
> cala
> 
> 
> 
> Do you know if the test jars are published? As a work around I downloaded the
> spark source and ran 'mvn install'
> 
> 
> 
> $ ls -l 
> ~/.m2/repository/org/apache/spark/spark-streaming_2.10/1.5.2/spark-streaming_2
> .10-1.5.2-tests.jar
> 
> -rw-r--r--  1 ~  staff   1.8M Dec 16 16:37
> ~/.m2/repository/org/apache/spark/spark-streaming_2.10/1.5.2/spark-streaming_2
> .10-1.5.2-tests.jar
> 
> $ 
> 
> 
> $ jar -tf 
> ~/.m2/repository/org/apache/spark/spark-streaming_2.10/1.5.2/spark-streaming_2
> .10-1.5.2-tests.jar | grep JavaTestUtils
> 
> org/apache/spark/streaming/JavaTestUtils$.class
> 
> org/apache/spark/streaming/JavaTestUtils.class
> 
> $ 
> 
> 
> 
> My local project builds using gradle (I have never use grade before). I tried
> lots of hacks.  Not sure why it I can not use JavaTestUtils
> 
> 
> 
> dependencies {
> 
> providedgroup: 'commons-cli',  name: 'commons-cli',
> version: '1.3+'
> 
> providedgroup: 'org.apache.spark', name: 'spark-sql_2.10',
> version: '1.5.2'
> 
> // providedgroup: 'org.apache.spark', name: 'spark-streaming_2.10',
> version: '1.5.2'
> 
> provided 'org.apache.spark:spark-streaming_2.10:1.5.2'
> 
>  
> 
> // strange works a little better if testCompile is delcared twice
> 
> // testCompile 'org.apache.spark:spark-streaming_2.10:1.5.2'
> 
> // testCompile 'org.apache.spark:spark-streaming_2.10:1.5.2-tests'
> 
> testCompile
> files('~/.m2/repository/org/apache/spark/spark-streaming_2.10/1.5.2/spark-stre
> aming_2.10-1.5.2-tests.jar')
> 
> 
> 
> testCompile
> files('~/.m2/repository/org/apache/spark/spark-streaming_2.10/1.5.2/spark-stre
> aming_2.10-1.5.2-tests.jar')
> 
> testRuntime
> files('~/.m2/repository/org/apache/spark/spark-streaming_2.10/1.5.2/spark-stre
> aming_2.10-1.5.2-tests.jar')
> 
>  }
> 
> 
> 
> Any suggestions would be greatly appreciated.
> 
> 
> 
> Andy
> 
> 
> 
> $ javap JavaTestUtils.class
> 
> Compiled from "JavaTestUtils.scala"
> 
> public final class org.apache.spark.streaming.JavaTestUtils {
> 
>   public static org.scalatest.Status run(scala.Option,
> org.scalatest.Args);
> 
>   public static org.scalatest.Status runTest(java.lang.String,
> org.scalatest.Args);
> 
>   public static void after(scala.Function0);
> 
>   public static void before(scala.Function0);
> 
>   public static  int setupStreams$default$3();
> 
>   public static  boolean testOperation$default$4();
> 
>   public static  void
> testOperation(scala.collection.Seq,
> scala.collection.Seq,
> scala.Function2 org.apache.spark.streaming.dstream.DStream,
> org.apache.spark.streaming.dstream.DStream>,
> 

Re: Download Problem with Spark 1.5.2 pre-built for Hadoop 1.X

2015-12-17 Thread Jean-Baptiste Onofré

Hi,

we have a Jira about that (let me find it): by default, a suffix is 
appended causing issue to resolve the artifact.


Let me find the Jira and the workaround.

Regards
JB

On 12/17/2015 12:48 PM, abc123 wrote:

Get error message when I try to download Spark 1.5.2 pre-built for Hadoop
1.X. Can someone help me please?

Error:
http://d3kbcqa49mib13.cloudfront.net/spark-1.5.2-bin-hadoop1.tgz

NoSuchKey
The specified key does not exist.
spark-1.5.2-bin-hadoop1.tgz
CEA88FA320236296pWBY80tIxCQw5el9YdjTKta2aKAvIuKvo51RpnaU7YtxMxu37aki32yAWWnf2Qb8Lu+2zzI4msM=




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Download-Problem-with-Spark-1-5-2-pre-built-for-Hadoop-1-X-tp25726.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: MLlib: Feature Importances API

2015-12-17 Thread Yanbo Liang
Hi Asim,

I think it's not necessary to back port featureImportances to
mllib.tree.RandomForest. You can use ml.RandomForestClassifier and
ml.RandomForestRegressor directly.
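
A minimal sketch of that route (it assumes a DataFrame named training with the standard "label" and "features" columns, e.g. built from an RDD[LabeledPoint] via toDF()):

import org.apache.spark.ml.classification.RandomForestClassifier

val rf = new RandomForestClassifier()
  .setNumTrees(30)
  .setImpurity("gini")
  .setMaxDepth(4)
  .setMaxBins(32)

val rfModel = rf.fit(training)
// Vector with one importance value per feature.
println(rfModel.featureImportances)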

Yanbo

2015-12-17 19:39 GMT+08:00 Asim Jalis :

> Yanbo,
>
> Thanks for the reply.
>
> Is there a JIRA for exposing featureImportances on
> org.apache.spark.mllib.tree.RandomForest?, or could you create one? I am
> unable to create an issue on JIRA against Spark.
>
> Thanks.
>
> Asim
>
> On Thu, Dec 17, 2015 at 12:07 AM, Yanbo Liang  wrote:
>
>> Hi Asim,
>>
>> The "featureImportances" is only exposed at ML not MLlib.
>> You need to update your code to use RandomForestClassifier of ML to train
>> and get one RandomForestClassificationModel. Then you can call
>> RandomForestClassificationModel.featureImportances
>> 
>> to get the importances of each feature.
>>
>> For how to use RandomForestClassifier, you can refer this example
>> 
>> .
>>
>> Yanbo
>>
>> 2015-12-17 13:41 GMT+08:00 Asim Jalis :
>>
>>> I wanted to use get feature importances related to a Random Forest as
>>> described in this JIRA: https://issues.apache.org/jira/browse/SPARK-5133
>>>
>>> However, I don’t see how to call this. I don't see any methods exposed
>>> on
>>>
>>> org.apache.spark.mllib.tree.RandomForest
>>>
>>> How can I get featureImportances when I generate a RandomForest model in
>>> this code?
>>>
>>> import org.apache.spark.mllib.linalg.Vectors
>>> import org.apache.spark.mllib.regression.LabeledPoint
>>> import org.apache.spark.mllib.tree.RandomForest
>>> import org.apache.spark.mllib.tree.model.RandomForestModel
>>> import org.apache.spark.mllib.util.MLUtils
>>> import org.apache.spark.rdd.RDD
>>> import util.Random
>>>
>>> def displayModel(model:RandomForestModel) = {
>>>   // Display model.
>>>   println("Learned classification tree model:\n" + model.toDebugString)
>>> }
>>>
>>> def saveModel(model:RandomForestModel,path:String) = {
>>>   // Save and load model.
>>>   model.save(sc, path)
>>>   val sameModel = DecisionTreeModel.load(sc, path)
>>> }
>>>
>>> def testModel(model:RandomForestModel,testData:RDD[LabeledPoint]) = {
>>>   // Test model.
>>>   val labelAndPreds = testData.map { point =>
>>> val prediction = model.predict(point.features)
>>> (point.label, prediction)
>>>   }
>>>   val testErr = labelAndPreds.
>>> filter(r => r._1 != r._2).count.toDouble / testData.count()
>>>   println("Test Error = " + testErr)
>>> }
>>>
>>> def buildModel(trainingData:RDD[LabeledPoint],
>>>   numClasses:Int,categoricalFeaturesInfo:Map[Int,Int]) = {
>>>   val numTrees = 30
>>>   val featureSubsetStrategy = "auto"
>>>   val impurity = "gini"
>>>   val maxDepth = 4
>>>   val maxBins = 32
>>>
>>>   // Build model.
>>>   val model = RandomForest.trainClassifier(
>>> trainingData, numClasses, categoricalFeaturesInfo,
>>> numTrees, featureSubsetStrategy, impurity, maxDepth,
>>> maxBins)
>>>
>>>   model
>>> }
>>>
>>> // Create plain RDD.
>>> val rdd = sc.parallelize(Range(0,1000))
>>>
>>> // Convert to LabeledPoint RDD.
>>> val data = rdd.
>>>   map(x => {
>>> val label = x % 2
>>> val feature1 = x % 5
>>> val feature2 = x % 7
>>> val features = Seq(feature1,feature2).
>>>   map(_.toDouble).
>>>   zipWithIndex.
>>>   map(_.swap)
>>> val vector = Vectors.sparse(features.size, features)
>>> val point = new LabeledPoint(label, vector)
>>> point })
>>>
>>> // Split data into training (70%) and test (30%).
>>> val splits = data.randomSplit(Array(0.7, 0.3))
>>> val (trainingData, testData) = (splits(0), splits(1))
>>>
>>> // Set up parameters for training.
>>> val numClasses = data.map(_.label).distinct.count.toInt
>>> val categoricalFeaturesInfo = Map[Int, Int]()
>>>
>>> val model = buildModel(
>>> trainingData,
>>> numClasses,
>>> categoricalFeaturesInfo)
>>> testModel(model,testData)
>>>
>>>
>>
>


Python 3.x support

2015-12-17 Thread YaoPau
I found the jira for Python 3 support  here
  , but it looks like
support for 3.4 was still unresolved.

Which Python 3 versions are supported by Spark 1.5?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Python-3-x-support-tp25731.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Base ERROR

2015-12-17 Thread Jeff Zhang
I believe this is hbase issue, you'd better to ask on hbase mail list.



On Fri, Dec 18, 2015 at 9:57 AM, censj  wrote:

> hi,all:
> I wirte data to hbase,but Hbase arise this ERROR,Could you help me?
>
>
> r.KeeperException$SessionExpiredException: KeeperErrorCode = Session
> expired for /hbase-unsecure/rs/byd0157,16020,1449106975377
> 2015-12-17 21:24:29,854 WARN  [regionserver/byd0157/192.168.0.157:16020]
> zookeeper.RecoverableZooKeeper: Possibly transient ZooKeeper,
> quorum=byd0151:2181,byd0150:2181,byd0152:2181,
> exception=org.apache.zookeeper.KeeperException$SessionExpiredException:
> KeeperErrorCode = Session expired for
> /hbase-unsecure/rs/byd0157,16020,1449106975377
> 2015-12-17 21:24:29,854 ERROR [regionserver/byd0157/192.168.0.157:16020]
> zookeeper.RecoverableZooKeeper: ZooKeeper delete failed after 4 attempts
> 2015-12-17 21:24:29,854 WARN  [regionserver/byd0157/192.168.0.157:16020]
> regionserver.HRegionServer: Failed deleting my ephemeral node
> org.apache.zookeeper.KeeperException$SessionExpiredException:
> KeeperErrorCode = Session expired for
> /hbase-unsecure/rs/byd0157,16020,1449106975377
>at
> org.apache.zookeeper.KeeperException.create(KeeperException.java:127)
>at
> org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
>at org.apache.zookeeper.ZooKeeper.delete(ZooKeeper.java:873)
>at
> org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper.delete(RecoverableZooKeeper.java:179)
>at
> org.apache.hadoop.hbase.zookeeper.ZKUtil.deleteNode(ZKUtil.java:1345)
>at
> org.apache.hadoop.hbase.zookeeper.ZKUtil.deleteNode(ZKUtil.java:1334)
>at
> org.apache.hadoop.hbase.regionserver.HRegionServer.deleteMyEphemeralNode(HRegionServer.java:1393)
>at
> org.apache.hadoop.hbase.regionserver.HRegionServer.run(HRegionServer.java:1076)
>at java.lang.Thread.run(Thread.java:745)
> 2015-12-17 21:24:29,855 INFO  [regionserver/byd0157/192.168.0.157:16020]
> regionserver.HRegionServer: stopping server byd0157,16020,1449106975377;
> zookeeper connection closed.
> 2015-12-17 21:24:29,855 INFO  [regionserver/byd0157/192.168.0.157:16020]
> regionserver.HRegionServer: regionserver/byd0157/192.168.0.157:16020
> exiting
> 2015-12-17 21:24:29,858 ERROR [main]
> regionserver.HRegionServerCommandLine: Region server exiting
> java.lang.RuntimeException: HRegionServer Aborted
>at
> org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine.start(HRegionServerCommandLine.java:68)
>at
> org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine.run(HRegionServerCommandLine.java:87)
>at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
>at
> org.apache.hadoop.hbase.util.ServerCommandLine.doMain(ServerCommandLine.java:126)
>at
> org.apache.hadoop.hbase.regionserver.HRegionServer.main(HRegionServer.java:2641)
> 2015-12-17 21:24:29,940 INFO  [Thread-6] regionserver.ShutdownHook:
> Shutdown hook starting; hbase.shutdown.hook=true;
> fsShutdownHook=org.apache.hadoop.fs.FileSystem$Cache$ClientFinalizer@6de54b40
> 2015-12-17 21:24:29,942 INFO  [Thread-6] regionserver.ShutdownHook:
> Starting fs shutdown hook thread.
> 2015-12-17 21:24:29,953 INFO  [Thread-6] regionserver.ShutdownHook:
> Shutdown hook finished.
>
>
>


-- 
Best Regards

Jeff Zhang


Re: HiveContext Self join not reading from cache

2015-12-17 Thread Gourav Sengupta
Hi Ted,

The self join works fine on tables where the HiveContext tables are direct
Hive tables, for example:

table1 = hiveContext.sql("select columnA, columnB from hivetable1")
table1.registerTempTable("table1")
table1.cache()
table1.count()

and if I do a self join on table1 things are quite fine

But in case we have something like this:
table1 = hiveContext.sql("select columnA, columnB from hivetable1")
table1.registerTempTable("table1")
table1.cache()
table1.count()

table2 = hiveContext.sql("select columnA, columnB from hivetable2")
table2.registerTempTable("table2")
table2.cache()
table2.count()

table3 = hiveContext.sql("select table1.* from table1, table2 where
table1.columnA = table2.columnA")
table3.registerTempTable("table3")
table3.cache()
table3.count()


then the self join on table3 does not read from the table3 cache, nor from
the table1 or table2 caches, but starts reading the data directly from S3,
which does not make any sense.


Regards,
Gourav





On Wed, Dec 16, 2015 at 7:16 PM, Ted Yu  wrote:

> I did the following exercise in spark-shell ("c" is cached table):
>
> scala> sqlContext.sql("select x.b from c x join c y on x.a = y.a").explain
> == Physical Plan ==
> Project [b#4]
> +- BroadcastHashJoin [a#3], [a#125], BuildRight
>:- InMemoryColumnarTableScan [b#4,a#3], InMemoryRelation [a#3,b#4,c#5],
> true, 1, StorageLevel(true, true, false, true, 1), ConvertToUnsafe,
> Some(c)
>+- InMemoryColumnarTableScan [a#125], InMemoryRelation
> [a#125,b#126,c#127], true, 1, StorageLevel(true, true, false, true, 1),
> ConvertToUnsafe, Some(c)
>
> sqlContext.sql("select x.b, y.c from c x join c y on x.a =
> y.a").registerTempTable("d")
> scala> sqlContext.cacheTable("d")
>
> scala> sqlContext.sql("select x.b from d x join d y on x.c = y.c").explain
> == Physical Plan ==
> Project [b#4]
> +- SortMergeJoin [c#90], [c#253]
>:- Sort [c#90 ASC], false, 0
>:  +- TungstenExchange hashpartitioning(c#90,200), None
>: +- InMemoryColumnarTableScan [b#4,c#90], InMemoryRelation
> [b#4,c#90], true, 1, StorageLevel(true, true, false, true, 1), Project
> [b#4,c#90], Some(d)
>+- Sort [c#253 ASC], false, 0
>   +- TungstenExchange hashpartitioning(c#253,200), None
>  +- InMemoryColumnarTableScan [c#253], InMemoryRelation
> [b#246,c#253], true, 1, StorageLevel(true, true, false, true, 1),
> Project [b#4,c#90], Some(d)
>
> Is the above what you observed ?
>
> Cheers
>
> On Wed, Dec 16, 2015 at 9:34 AM, Gourav Sengupta <
> gourav.sengu...@gmail.com> wrote:
>
>> Hi,
>>
>> This is how the data can be created:
>>
>> 1. TableA : cached()
>> 2. TableB : cached()
>> 3. TableC: TableA inner join TableB cached()
>> 4. TableC join TableC does not take the data from cache but starts
>> reading the data for TableA and TableB from disk.
>>
>> Does this sound like a bug? The self join between TableA and TableB is
>> working fine and taking data from cache.
>>
>>
>> Regards,
>> Gourav
>>
>
>


number of blocks in ALS/recommendation API

2015-12-17 Thread Roberto Pagliari
What is the meaning of the 'blocks' input argument in the MLlib ALS 
implementation, and how does it relate to the number of executors and/or the 
size of the input data?
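
For reference, a minimal Scala sketch of where the blocks argument appears in the MLlib ALS API (assuming a SparkContext named sc as in spark-shell; the ratings data below is made up, and passing -1 lets MLlib choose the number of blocks automatically):

import org.apache.spark.mllib.recommendation.{ALS, Rating}

// blocks controls how users/products are partitioned during the alternating
// least squares computation; -1 means auto-configure.
val ratings = sc.parallelize(Seq(Rating(1, 1, 5.0), Rating(1, 2, 3.0), Rating(2, 1, 4.0)))
val model = ALS.train(ratings, 10 /* rank */, 10 /* iterations */, 0.01 /* lambda */, -1 /* blocks */)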


Thank you,



Re: MLlib: Feature Importances API

2015-12-17 Thread Asim Jalis
Yanbo,

Thanks for the reply.

Is there a JIRA for exposing featureImportances on
org.apache.spark.mllib.tree.RandomForest, or could you create one? I am
unable to create a JIRA issue against Spark.

Thanks.

Asim

On Thu, Dec 17, 2015 at 12:07 AM, Yanbo Liang  wrote:

> Hi Asim,
>
> The "featureImportances" is only exposed at ML not MLlib.
> You need to update your code to use RandomForestClassifier of ML to train
> and get one RandomForestClassificationModel. Then you can call
> RandomForestClassificationModel.featureImportances
> 
> to get the importances of each feature.
>
> For how to use RandomForestClassifier, you can refer this example
> 
> .
>
> Yanbo
>
> 2015-12-17 13:41 GMT+08:00 Asim Jalis :
>
>> I wanted to use get feature importances related to a Random Forest as
>> described in this JIRA: https://issues.apache.org/jira/browse/SPARK-5133
>>
>> However, I don’t see how to call this. I don't see any methods exposed on
>>
>> org.apache.spark.mllib.tree.RandomForest
>>
>> How can I get featureImportances when I generate a RandomForest model in
>> this code?
>>
>> import org.apache.spark.mllib.linalg.Vectors
>> import org.apache.spark.mllib.regression.LabeledPoint
>> import org.apache.spark.mllib.tree.RandomForest
>> import org.apache.spark.mllib.tree.model.RandomForestModel
>> import org.apache.spark.mllib.util.MLUtils
>> import org.apache.spark.rdd.RDD
>> import util.Random
>>
>> def displayModel(model:RandomForestModel) = {
>>   // Display model.
>>   println("Learned classification tree model:\n" + model.toDebugString)
>> }
>>
>> def saveModel(model:RandomForestModel,path:String) = {
>>   // Save and load model.
>>   model.save(sc, path)
>>   val sameModel = DecisionTreeModel.load(sc, path)
>> }
>>
>> def testModel(model:RandomForestModel,testData:RDD[LabeledPoint]) = {
>>   // Test model.
>>   val labelAndPreds = testData.map { point =>
>> val prediction = model.predict(point.features)
>> (point.label, prediction)
>>   }
>>   val testErr = labelAndPreds.
>> filter(r => r._1 != r._2).count.toDouble / testData.count()
>>   println("Test Error = " + testErr)
>> }
>>
>> def buildModel(trainingData:RDD[LabeledPoint],
>>   numClasses:Int,categoricalFeaturesInfo:Map[Int,Int]) = {
>>   val numTrees = 30
>>   val featureSubsetStrategy = "auto"
>>   val impurity = "gini"
>>   val maxDepth = 4
>>   val maxBins = 32
>>
>>   // Build model.
>>   val model = RandomForest.trainClassifier(
>> trainingData, numClasses, categoricalFeaturesInfo,
>> numTrees, featureSubsetStrategy, impurity, maxDepth,
>> maxBins)
>>
>>   model
>> }
>>
>> // Create plain RDD.
>> val rdd = sc.parallelize(Range(0,1000))
>>
>> // Convert to LabeledPoint RDD.
>> val data = rdd.
>>   map(x => {
>> val label = x % 2
>> val feature1 = x % 5
>> val feature2 = x % 7
>> val features = Seq(feature1,feature2).
>>   map(_.toDouble).
>>   zipWithIndex.
>>   map(_.swap)
>> val vector = Vectors.sparse(features.size, features)
>> val point = new LabeledPoint(label, vector)
>> point })
>>
>> // Split data into training (70%) and test (30%).
>> val splits = data.randomSplit(Array(0.7, 0.3))
>> val (trainingData, testData) = (splits(0), splits(1))
>>
>> // Set up parameters for training.
>> val numClasses = data.map(_.label).distinct.count.toInt
>> val categoricalFeaturesInfo = Map[Int, Int]()
>>
>> val model = buildModel(
>> trainingData,
>> numClasses,
>> categoricalFeaturesInfo)
>> testModel(model,testData)
>>
>>
>


Download Problem with Spark 1.5.2 pre-built for Hadoop 1.X

2015-12-17 Thread abc123
I get an error message when I try to download Spark 1.5.2 pre-built for Hadoop
1.X. Can someone please help me?

Error:
http://d3kbcqa49mib13.cloudfront.net/spark-1.5.2-bin-hadoop1.tgz

NoSuchKey
The specified key does not exist.
spark-1.5.2-bin-hadoop1.tgz
CEA88FA320236296pWBY80tIxCQw5el9YdjTKta2aKAvIuKvo51RpnaU7YtxMxu37aki32yAWWnf2Qb8Lu+2zzI4msM=




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Download-Problem-with-Spark-1-5-2-pre-built-for-Hadoop-1-X-tp25726.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: java.io.FileNotFoundException(Too many open files) in Spark streaming

2015-12-17 Thread Jakob Odersky
It might be a good idea to see how many files are open and try increasing
the open file limit (this is done at the OS level). In some application
use-cases it is actually a legitimate need.

If that doesn't help, make sure you close any unused files and streams in
your code. It will also be easier to help diagnose the issue if you send an
error-reproducing snippet.
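
To illustrate the second point, a minimal sketch (with a hypothetical output path and an assumed RDD named rdd) of closing a per-partition writer in a finally block so handles are not leaked:

import java.io.{BufferedWriter, FileWriter}

// Write each partition to its own local file and always release the handle,
// even if writing a record throws.
rdd.foreachPartition { records =>
  val path = s"/tmp/out-${java.util.UUID.randomUUID()}"   // hypothetical path
  val writer = new BufferedWriter(new FileWriter(path))
  try {
    records.foreach(r => writer.write(r.toString + "\n"))
  } finally {
    writer.close()   // leaked handles are a common cause of "Too many open files"
  }
}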


Re: SparkContext.cancelJob - what part of Spark uses it? Nothing in webUI to kill jobs?

2015-12-17 Thread Jacek Laskowski
Thanks Mark! That helped a lot, and my takeaway from it is to...back
away now! :) I'm following the advice as there's simply too much at
the moment to learn in Spark.
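
For anyone else who comes across this, a minimal sketch of the job-group approach Mark recommends in the quoted discussion below (assuming an existing SparkContext named sc; the group id, delay, and workload are illustrative only):

// Tag jobs launched from this thread with a group id; cancelJobGroup then
// cancels every job in the group, even if one logical "job" spawned several.
sc.setJobGroup("my-group", "long running work", interruptOnCancel = true)

// Cancel the group from another thread after a few seconds (hypothetical trigger).
new Thread(new Runnable {
  def run(): Unit = { Thread.sleep(5000); sc.cancelJobGroup("my-group") }
}).start()

// This action is interrupted once the group is cancelled.
val n = sc.parallelize(1 to 100000000, 100).map(_ * 2).count()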

Pozdrawiam,
Jacek

Jacek Laskowski | https://medium.com/@jaceklaskowski/
Mastering Apache Spark
==> https://jaceklaskowski.gitbooks.io/mastering-apache-spark/
Follow me at https://twitter.com/jaceklaskowski


On Thu, Dec 17, 2015 at 1:13 PM, Mark Hamstra  wrote:
> Ah, sorry for leading you astray a bit.  I was working from memory instead
> of looking at the code, and was probably thinking back all the way to
> Reynold's initial implementation of SparkContext#killJob(), which was
> public.  I'd have to do some digging to determine exactly when and why
> SparkContext#cancelJob() became private[spark].  Of course, the other
> problem is that more often than not I am working with custom builds of
> Spark, and I'm not beyond changing selected things from private to public.
> :)
>
> When you start talking about doing some checks before killing a job, I
> imagine that you are thinking about something like checking that parts of a
> job are not needed by other jobs, etc.  That's a reasonable idea, but the
> realization of that goal is not simple -- especially not when you start
> including asynchronous execution with various timeouts or other events
> requesting cancellation, or more extensive reuse functionality as in
> https://issues.apache.org/jira/browse/SPARK-11838 . If you don't want to
> spend a lot of time looking at Job cancellation issues, best to back away
> now! :)
>
> On Wed, Dec 16, 2015 at 4:26 PM, Jacek Laskowski  wrote:
>>
>> Thanks Mark for the answer! It helps, but still leaves me with few
>> more questions. If you don't mind, I'd like to ask you few more
>> questions.
>>
>> When you said "It can be used, and is used in user code, but it isn't
>> always as straightforward as you might think", did you mean the Spark
>> code or some other user code? Can I have a look at the code and the use
>> case? The method is `private[spark]` and it's not even @DeveloperApi,
>> which makes using the method even more risky. I believe it's a very
>> low-level ingredient of Spark that very few people use, if at all. If I
>> could see the code that uses the method, that could help.
>>
>> Following up, isn't killing a stage similar to killing a job? They can
>> both be shared, and I could imagine a very similar case for killing a
>> job as for a stage, where an implementation does some checks before
>> killing the job eventually. That is possible for stages, which are in a
>> sense similar to jobs, so I'm still unsure why the method is not used
>> by Spark itself. If it's not used by Spark, why would it be useful for
>> others outside Spark?
>>
>> Doh, why did I come across the method? It will take some time before I
>> forget about it :-)
>>
>> Pozdrawiam,
>> Jacek
>>
>> --
>> Jacek Laskowski | https://medium.com/@jaceklaskowski/ |
>> http://blog.jaceklaskowski.pl
>> Mastering Spark https://jaceklaskowski.gitbooks.io/mastering-apache-spark/
>> Follow me at https://twitter.com/jaceklaskowski
>> Upvote at http://stackoverflow.com/users/1305344/jacek-laskowski
>>
>>
>> On Wed, Dec 16, 2015 at 10:55 AM, Mark Hamstra 
>> wrote:
>> > It can be used, and is used in user code, but it isn't always as
>> > straightforward as you might think.  This is mostly because a Job often
>> > isn't a Job -- or rather it is more than one Job.  There are several RDD
>> > transformations that aren't lazy, so they end up launching "hidden" Jobs
>> > that you may not anticipate and may expect to be canceled (but won't be)
>> > by
>> > a cancelJob() called on a later action on that transformed RDD.  It is
>> > also
>> > possible for a single DataFrame or Spark SQL query to result in more
>> > than
>> > one running Job.  The upshot of all of this is that getting cancelJob()
>> > to
>> > work as most users would expect all the time is non-trivial, and most of
>> > the
>> > time using a jobGroup is a better way to capture what may be more than
>> > one
>> > Job that the user is thinking of as a single Job.
>> >
>> > On Wed, Dec 16, 2015 at 5:34 AM, Sean Owen  wrote:
>> >>
>> >> It does look like it's not actually used. It may simply be there for
>> >> completeness, to match cancelStage and cancelJobGroup, which are used.
>> >> I also don't know of a good reason there's no way to kill a whole job.
>> >>
>> >> On Wed, Dec 16, 2015 at 1:15 PM, Jacek Laskowski 
>> >> wrote:
>> >> > Hi,
>> >> >
>> >> > While reviewing Spark code I came across SparkContext.cancelJob. I
>> >> > found no part of Spark using it. Is this a leftover after some
>> >> > refactoring? Why is this part of sc?
>> >> >
>> >> > The reason I'm asking is another question I'm having after having
>> >> > learnt about killing a stage in webUI. I noticed there is a way to
>> >> > kill/cancel stages, but no corresponding 

Base ERROR

2015-12-17 Thread censj
Hi all,
I am writing data to HBase, but HBase raises this error. Could you help me?
> 
> r.KeeperException$SessionExpiredException: KeeperErrorCode = Session expired 
> for /hbase-unsecure/rs/byd0157,16020,1449106975377
> 2015-12-17 21:24:29,854 WARN  [regionserver/byd0157/192.168.0.157:16020] 
> zookeeper.RecoverableZooKeeper: Possibly transient ZooKeeper, 
> quorum=byd0151:2181,byd0150:2181,byd0152:2181, 
> exception=org.apache.zookeeper.KeeperException$SessionExpiredException: 
> KeeperErrorCode = Session expired for 
> /hbase-unsecure/rs/byd0157,16020,1449106975377
> 2015-12-17 21:24:29,854 ERROR [regionserver/byd0157/192.168.0.157:16020] 
> zookeeper.RecoverableZooKeeper: ZooKeeper delete failed after 4 attempts
> 2015-12-17 21:24:29,854 WARN  [regionserver/byd0157/192.168.0.157:16020] 
> regionserver.HRegionServer: Failed deleting my ephemeral node
> org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode 
> = Session expired for /hbase-unsecure/rs/byd0157,16020,1449106975377
>at 
> org.apache.zookeeper.KeeperException.create(KeeperException.java:127)
>at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
>at org.apache.zookeeper.ZooKeeper.delete(ZooKeeper.java:873)
>at 
> org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper.delete(RecoverableZooKeeper.java:179)
>at 
> org.apache.hadoop.hbase.zookeeper.ZKUtil.deleteNode(ZKUtil.java:1345)
>at 
> org.apache.hadoop.hbase.zookeeper.ZKUtil.deleteNode(ZKUtil.java:1334)
>at 
> org.apache.hadoop.hbase.regionserver.HRegionServer.deleteMyEphemeralNode(HRegionServer.java:1393)
>at 
> org.apache.hadoop.hbase.regionserver.HRegionServer.run(HRegionServer.java:1076)
>at java.lang.Thread.run(Thread.java:745)
> 2015-12-17 21:24:29,855 INFO  [regionserver/byd0157/192.168.0.157:16020] 
> regionserver.HRegionServer: stopping server byd0157,16020,1449106975377; 
> zookeeper connection closed.
> 2015-12-17 21:24:29,855 INFO  [regionserver/byd0157/192.168.0.157:16020] 
> regionserver.HRegionServer: regionserver/byd0157/192.168.0.157:16020 exiting
> 2015-12-17 21:24:29,858 ERROR [main] regionserver.HRegionServerCommandLine: 
> Region server exiting
> java.lang.RuntimeException: HRegionServer Aborted
>at 
> org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine.start(HRegionServerCommandLine.java:68)
>at 
> org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine.run(HRegionServerCommandLine.java:87)
>at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
>at 
> org.apache.hadoop.hbase.util.ServerCommandLine.doMain(ServerCommandLine.java:126)
>at 
> org.apache.hadoop.hbase.regionserver.HRegionServer.main(HRegionServer.java:2641)
> 2015-12-17 21:24:29,940 INFO  [Thread-6] regionserver.ShutdownHook: Shutdown 
> hook starting; hbase.shutdown.hook=true; 
> fsShutdownHook=org.apache.hadoop.fs.FileSystem$Cache$ClientFinalizer@6de54b40
> 2015-12-17 21:24:29,942 INFO  [Thread-6] regionserver.ShutdownHook: Starting 
> fs shutdown hook thread.
> 2015-12-17 21:24:29,953 INFO  [Thread-6] regionserver.ShutdownHook: Shutdown 
> hook finished.



Re: Can't run spark on yarn

2015-12-17 Thread Saisai Shao
Please check the YARN AM log to see why the AM failed to start. That's why
using `sc` produces such a complaint.

On Fri, Dec 18, 2015 at 4:25 AM, Eran Witkon  wrote:

> Hi,
> I am trying to install spark 1.5.2 on Apache hadoop 2.6 and Hive and yarn
>
> spark-env.sh
> export HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop
>
> bash_profile
> #HADOOP VARIABLES START
> export JAVA_HOME=/usr/lib/jvm/java-8-oracle/
> export HADOOP_INSTALL=/usr/local/hadoop
> export PATH=$PATH:$HADOOP_INSTALL/bin
> export PATH=$PATH:$HADOOP_INSTALL/sbin
> export HADOOP_MAPRED_HOME=$HADOOP_INSTALL
> export HADOOP_COMMON_HOME=$HADOOP_INSTALL
> export HADOOP_HDFS_HOME=$HADOOP_INSTALL
> export YARN_HOME=$HADOOP_INSTALL
> export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_INSTALL/lib/native
> export HADOOP_OPTS="-Djava.library.path=$HADOOP_INSTALL/lib"
> export HADOOP_USER_CLASSPATH_FIRST=true
> export HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop
> export YARN_CONF_DIR=/usr/local/hadoop/etc/hadoop
> #HADOOP VARIABLES END
>
> export SPARK_HOME=/usr/local/spark
> export HIVE_HOME=/usr/local/hive
> export PATH=$PATH:$HIVE_HOME/bin
>
>
> When I run spark-shell
> ./bin/spark-shell --master yarn-client
>
> Output:
> 15/12/17 22:22:07 WARN util.NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
> 15/12/17 22:22:07 INFO spark.SecurityManager: Changing view acls to: hduser
> 15/12/17 22:22:07 INFO spark.SecurityManager: Changing modify acls to:
> hduser
> 15/12/17 22:22:07 INFO spark.SecurityManager: SecurityManager:
> authentication disabled; ui acls disabled; users with view permissions:
> Set(hduser); users with modify permissions: Set(hduser)
> 15/12/17 22:22:07 INFO spark.HttpServer: Starting HTTP Server
> 15/12/17 22:22:07 INFO server.Server: jetty-8.y.z-SNAPSHOT
> 15/12/17 22:22:08 INFO server.AbstractConnector: Started
> SocketConnector@0.0.0.0:38389
> 15/12/17 22:22:08 INFO util.Utils: Successfully started service 'HTTP
> class server' on port 38389.
> Welcome to
>     __
>  / __/__  ___ _/ /__
> _\ \/ _ \/ _ `/ __/  '_/
>/___/ .__/\_,_/_/ /_/\_\   version 1.5.2
>   /_/
>
> Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java
> 1.8.0_66)
> Type in expressions to have them evaluated.
> Type :help for more information.
> 15/12/17 22:22:11 WARN util.Utils: Your hostname, eranw-Lenovo-Yoga-2-Pro
> resolves to a loopback address: 127.0.1.1; using 10.0.0.1 instead (on
> interface wlp1s0)
> 15/12/17 22:22:11 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind
> to another address
> 15/12/17 22:22:11 INFO spark.SparkContext: Running Spark version 1.5.2
> 15/12/17 22:22:11 INFO spark.SecurityManager: Changing view acls to: hduser
> 15/12/17 22:22:11 INFO spark.SecurityManager: Changing modify acls to:
> hduser
> 15/12/17 22:22:11 INFO spark.SecurityManager: SecurityManager:
> authentication disabled; ui acls disabled; users with view permissions:
> Set(hduser); users with modify permissions: Set(hduser)
> 15/12/17 22:22:11 INFO slf4j.Slf4jLogger: Slf4jLogger started
> 15/12/17 22:22:11 INFO Remoting: Starting remoting
> 15/12/17 22:22:12 INFO util.Utils: Successfully started service
> 'sparkDriver' on port 36381.
> 15/12/17 22:22:12 INFO Remoting: Remoting started; listening on addresses
> :[akka.tcp://sparkDriver@10.0.0.1:36381]
> 15/12/17 22:22:12 INFO spark.SparkEnv: Registering MapOutputTracker
> 15/12/17 22:22:12 INFO spark.SparkEnv: Registering BlockManagerMaster
> 15/12/17 22:22:12 INFO storage.DiskBlockManager: Created local directory
> at /tmp/blockmgr-139fac31-5f21-4c61-9575-3110d5205f7d
> 15/12/17 22:22:12 INFO storage.MemoryStore: MemoryStore started with
> capacity 530.0 MB
> 15/12/17 22:22:12 INFO spark.HttpFileServer: HTTP File server directory is
> /tmp/spark-955ef002-a802-49c6-b440-0656861f737c/httpd-2127cbe1-97d7-40a5-a96f-75216f115f00
> 15/12/17 22:22:12 INFO spark.HttpServer: Starting HTTP Server
> 15/12/17 22:22:12 INFO server.Server: jetty-8.y.z-SNAPSHOT
> 15/12/17 22:22:12 INFO server.AbstractConnector: Started
> SocketConnector@0.0.0.0:36760
> 15/12/17 22:22:12 INFO util.Utils: Successfully started service 'HTTP file
> server' on port 36760.
> 15/12/17 22:22:12 INFO spark.SparkEnv: Registering OutputCommitCoordinator
> 15/12/17 22:22:12 INFO server.Server: jetty-8.y.z-SNAPSHOT
> 15/12/17 22:22:12 INFO server.AbstractConnector: Started
> SelectChannelConnector@0.0.0.0:4040
> 15/12/17 22:22:12 INFO util.Utils: Successfully started service 'SparkUI'
> on port 4040.
> 15/12/17 22:22:12 INFO ui.SparkUI: Started SparkUI at http://10.0.0.1:4040
> 15/12/17 22:22:12 WARN metrics.MetricsSystem: Using default name
> DAGScheduler for source because spark.app.id is not set.
> 15/12/17 22:22:12 INFO client.RMProxy: Connecting to ResourceManager at /
> 0.0.0.0:8032
> 15/12/17 22:22:12 INFO yarn.Client: Requesting a new application from
> cluster with 1 NodeManagers
> 15/12/17 22:22:12 

Re: pyspark + kafka + streaming = NoSuchMethodError

2015-12-17 Thread Shixiong Zhu
What's the Scala version of your Spark? Is it 2.10?

Best Regards,
Shixiong Zhu

2015-12-17 10:10 GMT-08:00 Christos Mantas :

> Hello,
>
> I am trying to set up a simple example with Spark Streaming (Python) and
> Kafka on a single machine deployment.
> My Kafka broker/server is also on the same machine (localhost:1281) and I
> am using Spark Version: spark-1.5.2-bin-hadoop2.6
>
> Python code
>
> ...
> ssc = StreamingContext(sc, 1)
> ...
> lines = KafkaUtils.createDirectStream(ssc, ["test"],
> {"metadata.broker.list":"localhost:1281"})
>
>
> So I try
>
> spark-submit --jars spark-streaming-kafka-assembly_2.11-1.5.2.jar
> my_kafka_streaming_wordcount.py
>
> OR
>
> spark-submit --packages  org.apache.spark:spark-streaming-kafka_2.11:1.5.2
> my_kafka_streaming_wordcount.py
> (my kafka version is 2.11-0.9.0.0)
>
> OR
>
> pyspark  --jars spark-streaming-kafka-assembly_2.11-1.5.2.jar  [import
> stuff and type those lines]
>
>
> and I end up with:
>
> 15/12/17 19:44:58 WARN NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
> 15/12/17 19:45:00 WARN MetricsSystem: Using default name DAGScheduler for
> source because spark.app.id is not set.
> Traceback (most recent call last):
>   File "/I/edited/the/path/here/my_kafka_streaming_wordcount.py", line 80,
> in 
> lines = KafkaUtils.createDirectStream(ssc, ["test"],
> {"metadata.broker.list":"localhost:1281"})
>   File
> "/opt/spark-1.5.2-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/kafka.py",
> line 130, in createDirectStream
> py4j.protocol.Py4JJavaError: An error occurred while calling
> o29.createDirectStream.
> : java.lang.NoSuchMethodError:
> scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
> at kafka.api.RequestKeys$.(RequestKeys.scala:39)
> at kafka.api.RequestKeys$.(RequestKeys.scala)
> at kafka.api.TopicMetadataRequest.(TopicMetadataRequest.scala:53)
> at
> org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:122)
> at
> org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
> at
> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:409)
> at
> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:532)
> at
> org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStream(KafkaUtils.scala:614)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
> at py4j.Gateway.invoke(Gateway.java:259)
> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
> at py4j.commands.CallCommand.execute(CallCommand.java:79)
> at py4j.GatewayConnection.run(GatewayConnection.java:207)
> at java.lang.Thread.run(Thread.java:745)
>
> Am I missing something?
>
> Thanks in advance
> Chris M.
>
>
>
>
>
>


Re: pyspark + kafka + streaming = NoSuchMethodError

2015-12-17 Thread Luciano Resende
Unless you built your own Spark distribution with Scala 2.11, you want to
use the 2.10 dependency:

   --packages org.apache.spark:spark-streaming-kafka_2.10:1.5.2

On Thu, Dec 17, 2015 at 10:10 AM, Christos Mantas  wrote:

> Hello,
>
> I am trying to set up a simple example with Spark Streaming (Python) and
> Kafka on a single machine deployment.
> My Kafka broker/server is also on the same machine (localhost:1281) and I
> am using Spark Version: spark-1.5.2-bin-hadoop2.6
>
> Python code
>
> ...
> ssc = StreamingContext(sc, 1)
> ...
> lines = KafkaUtils.createDirectStream(ssc, ["test"],
> {"metadata.broker.list":"localhost:1281"})
>
>
> So I try
>
> spark-submit --jars spark-streaming-kafka-assembly_2.11-1.5.2.jar
> my_kafka_streaming_wordcount.py
>
> OR
>
> spark-submit --packages  org.apache.spark:spark-streaming-kafka_2.11:1.5.2
> my_kafka_streaming_wordcount.py
> (my kafka version is 2.11-0.9.0.0)
>
> OR
>
> pyspark  --jars spark-streaming-kafka-assembly_2.11-1.5.2.jar  [import
> stuff and type those lines]
>
>
> and I end up with:
>
> 15/12/17 19:44:58 WARN NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
> 15/12/17 19:45:00 WARN MetricsSystem: Using default name DAGScheduler for
> source because spark.app.id is not set.
> Traceback (most recent call last):
>   File "/I/edited/the/path/here/my_kafka_streaming_wordcount.py", line 80,
> in 
> lines = KafkaUtils.createDirectStream(ssc, ["test"],
> {"metadata.broker.list":"localhost:1281"})
>   File
> "/opt/spark-1.5.2-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/kafka.py",
> line 130, in createDirectStream
> py4j.protocol.Py4JJavaError: An error occurred while calling
> o29.createDirectStream.
> : java.lang.NoSuchMethodError:
> scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
> at kafka.api.RequestKeys$.(RequestKeys.scala:39)
> at kafka.api.RequestKeys$.(RequestKeys.scala)
> at kafka.api.TopicMetadataRequest.(TopicMetadataRequest.scala:53)
> at
> org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:122)
> at
> org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
> at
> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:409)
> at
> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:532)
> at
> org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStream(KafkaUtils.scala:614)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
> at py4j.Gateway.invoke(Gateway.java:259)
> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
> at py4j.commands.CallCommand.execute(CallCommand.java:79)
> at py4j.GatewayConnection.run(GatewayConnection.java:207)
> at java.lang.Thread.run(Thread.java:745)
>
> Am I missing something?
>
> Thanks in advance
> Chris M.
>
>
>
>
>
>


-- 
Luciano Resende
http://people.apache.org/~lresende
http://twitter.com/lresende1975
http://lresende.blogspot.com/


Re: Can't run spark on yarn

2015-12-17 Thread Alexander Pivovarov
Try starting AWS EMR 4.2.0 with the Hadoop and Spark applications on spot
instances. Then look at how Hadoop and Spark are configured there, and try to
configure your Hadoop and Spark in a similar way.
On Dec 17, 2015 6:09 PM, "Saisai Shao"  wrote:

> Please check the YARN AM log to see why the AM failed to start. That's why
> using `sc` produces such a complaint.
>
> On Fri, Dec 18, 2015 at 4:25 AM, Eran Witkon  wrote:
>
>> Hi,
>> I am trying to install spark 1.5.2 on Apache hadoop 2.6 and Hive and yarn
>>
>> spark-env.sh
>> export HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop
>>
>> bash_profile
>> #HADOOP VARIABLES START
>> export JAVA_HOME=/usr/lib/jvm/java-8-oracle/
>> export HADOOP_INSTALL=/usr/local/hadoop
>> export PATH=$PATH:$HADOOP_INSTALL/bin
>> export PATH=$PATH:$HADOOP_INSTALL/sbin
>> export HADOOP_MAPRED_HOME=$HADOOP_INSTALL
>> export HADOOP_COMMON_HOME=$HADOOP_INSTALL
>> export HADOOP_HDFS_HOME=$HADOOP_INSTALL
>> export YARN_HOME=$HADOOP_INSTALL
>> export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_INSTALL/lib/native
>> export HADOOP_OPTS="-Djava.library.path=$HADOOP_INSTALL/lib"
>> export HADOOP_USER_CLASSPATH_FIRST=true
>> export HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop
>> export YARN_CONF_DIR=/usr/local/hadoop/etc/hadoop
>> #HADOOP VARIABLES END
>>
>> export SPARK_HOME=/usr/local/spark
>> export HIVE_HOME=/usr/local/hive
>> export PATH=$PATH:$HIVE_HOME/bin
>>
>>
>> When I run spark-shell
>> ./bin/spark-shell --master yarn-client
>>
>> Output:
>> 15/12/17 22:22:07 WARN util.NativeCodeLoader: Unable to load
>> native-hadoop library for your platform... using builtin-java classes where
>> applicable
>> 15/12/17 22:22:07 INFO spark.SecurityManager: Changing view acls to:
>> hduser
>> 15/12/17 22:22:07 INFO spark.SecurityManager: Changing modify acls to:
>> hduser
>> 15/12/17 22:22:07 INFO spark.SecurityManager: SecurityManager:
>> authentication disabled; ui acls disabled; users with view permissions:
>> Set(hduser); users with modify permissions: Set(hduser)
>> 15/12/17 22:22:07 INFO spark.HttpServer: Starting HTTP Server
>> 15/12/17 22:22:07 INFO server.Server: jetty-8.y.z-SNAPSHOT
>> 15/12/17 22:22:08 INFO server.AbstractConnector: Started
>> SocketConnector@0.0.0.0:38389
>> 15/12/17 22:22:08 INFO util.Utils: Successfully started service 'HTTP
>> class server' on port 38389.
>> Welcome to
>>     __
>>  / __/__  ___ _/ /__
>> _\ \/ _ \/ _ `/ __/  '_/
>>/___/ .__/\_,_/_/ /_/\_\   version 1.5.2
>>   /_/
>>
>> Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java
>> 1.8.0_66)
>> Type in expressions to have them evaluated.
>> Type :help for more information.
>> 15/12/17 22:22:11 WARN util.Utils: Your hostname, eranw-Lenovo-Yoga-2-Pro
>> resolves to a loopback address: 127.0.1.1; using 10.0.0.1 instead (on
>> interface wlp1s0)
>> 15/12/17 22:22:11 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind
>> to another address
>> 15/12/17 22:22:11 INFO spark.SparkContext: Running Spark version 1.5.2
>> 15/12/17 22:22:11 INFO spark.SecurityManager: Changing view acls to:
>> hduser
>> 15/12/17 22:22:11 INFO spark.SecurityManager: Changing modify acls to:
>> hduser
>> 15/12/17 22:22:11 INFO spark.SecurityManager: SecurityManager:
>> authentication disabled; ui acls disabled; users with view permissions:
>> Set(hduser); users with modify permissions: Set(hduser)
>> 15/12/17 22:22:11 INFO slf4j.Slf4jLogger: Slf4jLogger started
>> 15/12/17 22:22:11 INFO Remoting: Starting remoting
>> 15/12/17 22:22:12 INFO util.Utils: Successfully started service
>> 'sparkDriver' on port 36381.
>> 15/12/17 22:22:12 INFO Remoting: Remoting started; listening on addresses
>> :[akka.tcp://sparkDriver@10.0.0.1:36381]
>> 15/12/17 22:22:12 INFO spark.SparkEnv: Registering MapOutputTracker
>> 15/12/17 22:22:12 INFO spark.SparkEnv: Registering BlockManagerMaster
>> 15/12/17 22:22:12 INFO storage.DiskBlockManager: Created local directory
>> at /tmp/blockmgr-139fac31-5f21-4c61-9575-3110d5205f7d
>> 15/12/17 22:22:12 INFO storage.MemoryStore: MemoryStore started with
>> capacity 530.0 MB
>> 15/12/17 22:22:12 INFO spark.HttpFileServer: HTTP File server directory
>> is
>> /tmp/spark-955ef002-a802-49c6-b440-0656861f737c/httpd-2127cbe1-97d7-40a5-a96f-75216f115f00
>> 15/12/17 22:22:12 INFO spark.HttpServer: Starting HTTP Server
>> 15/12/17 22:22:12 INFO server.Server: jetty-8.y.z-SNAPSHOT
>> 15/12/17 22:22:12 INFO server.AbstractConnector: Started
>> SocketConnector@0.0.0.0:36760
>> 15/12/17 22:22:12 INFO util.Utils: Successfully started service 'HTTP
>> file server' on port 36760.
>> 15/12/17 22:22:12 INFO spark.SparkEnv: Registering OutputCommitCoordinator
>> 15/12/17 22:22:12 INFO server.Server: jetty-8.y.z-SNAPSHOT
>> 15/12/17 22:22:12 INFO server.AbstractConnector: Started
>> SelectChannelConnector@0.0.0.0:4040
>> 15/12/17 22:22:12 INFO util.Utils: Successfully started service 'SparkUI'
>> on port 4040.
>> 15/12/17 22:22:12 INFO ui.SparkUI: Started 

RE: How to submit spark job to YARN from scala code

2015-12-17 Thread Alexander Pivovarov
spark-submit --master yarn-cluster

See the docs for more details.
On Dec 17, 2015 5:00 PM, "Forest Fang"  wrote:

> Maybe I'm not understanding your question correctly, but would it be
> possible for you to piece together your job submission information as if
> you were invoking spark-submit? If so, you could just call
> org.apache.spark.deploy.SparkSubmit and pass your regular spark-submit
> arguments.
>
> This is how I do it with my sbt plugin which allows you to codify a
> spark-submit command in sbt build so the JAR gets automatically rebuilt and
> potentially redeployed every time you submit a Spark job using a custom sbt
> task:
> https://github.com/saurfang/sbt-spark-submit/blob/master/src/main/scala/sbtsparksubmit/SparkSubmitPlugin.scala#L85
>
>
> --
> Subject: Re: How to submit spark job to YARN from scala code
> From: ste...@hortonworks.com
> CC: user@spark.apache.org
> Date: Thu, 17 Dec 2015 19:45:16 +
>
>
> On 17 Dec 2015, at 16:50, Saiph Kappa  wrote:
>
> Hi,
>
> Since it is not currently possible to submit a spark job to a spark
> cluster running in standalone mode (cluster mode - it's not currently
> possible to specify this deploy mode within the code), can I do it with
> YARN?
>
> I tried to do something like this (but in scala):
>
> «
>
> ... // Client object - main method
> System.setProperty("SPARK_YARN_MODE", "true")
> val sparkConf = new SparkConf()
> try {
>   val args = new ClientArguments(argStrings, sparkConf)
>   new Client(args, sparkConf).run()
> } catch {
>   case e: Exception => {
>     Console.err.println(e.getMessage)
>     System.exit(1)
>   }
> }
> System.exit(0)
>
> » in http://blog.sequenceiq.com/blog/2014/08/22/spark-submit-in-java/
>
> However it is not possible to create a new instance of Client since import 
> org.apache.spark.deploy.yarn.Client is private
>
>
> the standard way to work around a problem like this is to place your code
> in a package which has access. File a JIRA asking for a public API too, one
> that doesn't require you to set system properties as a way of passing
> parameters down.
>
>
> Is there any way I can submit spark jobs from the code in cluster mode and 
> not using the spark-submit script?
>
> Thanks.
>
>
>
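
A minimal sketch of the package-placement workaround Steve describes above (an illustration only: MyYarnLauncher is a hypothetical name, the argument handling is simplified, and this relies on the YARN Client being private[spark] rather than fully private, compiled against the matching spark-yarn artifact):

// Declaring the launcher inside the org.apache.spark.* package tree gives it
// access to the private[spark] YARN Client used by spark-submit.
package org.apache.spark.deploy.yarn

import org.apache.spark.SparkConf

object MyYarnLauncher {
  def main(argStrings: Array[String]): Unit = {
    System.setProperty("SPARK_YARN_MODE", "true")
    val sparkConf = new SparkConf()
    val args = new ClientArguments(argStrings, sparkConf)
    new Client(args, sparkConf).run()
  }
}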


Re: Content based window operation on Time-series data

2015-12-17 Thread Davies Liu
Could you try this?

df.groupBy(cast(col("timeStamp") - start) / bucketLengthSec,
IntegerType)).agg(max("timestamp"), max("value")).collect()
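
Spelled out a little more (a sketch, assuming a DataFrame df with timeStamp and value columns and plain numeric start / bucketLengthSec values):

import org.apache.spark.sql.functions.{col, lit, max}
import org.apache.spark.sql.types.IntegerType

// Assign each row to a window index, then aggregate all windows in one
// distributed pass instead of filtering the DataFrame once per window.
val bucket = ((col("timeStamp") - lit(start)) / lit(bucketLengthSec)).cast(IntegerType)
val perWindow = df.groupBy(bucket.as("bucket"))
  .agg(max("timeStamp").as("x"), max("value").as("y"))
  .collect()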

On Wed, Dec 9, 2015 at 8:54 AM, Arun Verma  wrote:
> Hi all,
>
> We have RDD(main) of sorted time-series data. We want to split it into
> different RDDs according to window size and then perform some aggregation
> operation like max, min etc. over each RDD in parallel.
>
> If the window size is w, then the ith RDD has data from (startTime + (i-1)*w) to
> (startTime + i*w), where startTime is the time-stamp of the 1st entry in the main
> RDD, and windows continue until (startTime + (i-1)*w) is greater than the last entry of the main RDD.
>
> For now, I am using DataFrame and Spark version 1.5.2. Below code is running
> sequentially on the data, so execution time is high and resource utilization
> is low. Code snippet is given below:
> /*
> * aggragator is max
> * df - Dataframe has sorted timeseries data
> * start - first entry of DataFrame
> * end - last entry of DataFrame df
> * bucketLengthSec - window size
> * stepResults - has particular block/window output(JSON)
> * appendResults - has output till this block/window(JSON)
> */
> while (start <= end) {
> row = df.filter(df.col("timeStamp")
> .between(start, nextStart))
> .agg(max(df.col("timeStamp")), max(df.col("value")))
> .first();
> if (row.get(0) != null) {
> stepResults = new JSONObject();
> stepResults.put("x", Long.parseLong(row.get(0).toString()));
> stepResults.put("y", row.get(1));
> appendResults.add(stepResults);
> }
> start = nextStart;
> nextStart = start + bucketLengthSec;
> }
>
>
> --
> Thanks and Regards,
> Arun Verma

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: How to submit spark job to YARN from scala code

2015-12-17 Thread Forest Fang
Maybe I'm not understanding your question correctly, but would it be possible 
for you to piece together your job submission information as if you were 
invoking spark-submit? If so, you could just call org.apache.spark.deploy.SparkSubmit 
and pass your regular spark-submit arguments.
This is how I do it with my sbt plugin, which allows you to codify a 
spark-submit command in the sbt build so the JAR gets automatically rebuilt and 
potentially redeployed every time you submit a Spark job using a custom sbt 
task: 
https://github.com/saurfang/sbt-spark-submit/blob/master/src/main/scala/sbtsparksubmit/SparkSubmitPlugin.scala#L85
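
A minimal sketch of that approach (the master, jar path, main class, and application arguments below are hypothetical placeholders):

import org.apache.spark.deploy.SparkSubmit

object SubmitFromCode {
  def main(args: Array[String]): Unit = {
    // Same arguments you would pass on the spark-submit command line.
    val submitArgs = Array(
      "--master", "yarn-cluster",
      "--class", "com.example.MyJob",        // hypothetical main class
      "/path/to/my-job-assembly.jar",        // hypothetical application jar
      "appArg1", "appArg2")
    // Delegates to the same entry point the spark-submit script invokes.
    SparkSubmit.main(submitArgs)
  }
}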

Subject: Re: How to submit spark job to YARN from scala code
From: ste...@hortonworks.com
CC: user@spark.apache.org
Date: Thu, 17 Dec 2015 19:45:16 +

On 17 Dec 2015, at 16:50, Saiph Kappa  wrote:

Hi,

Since it is not currently possible to submit a spark job to a spark cluster 
running in standalone mode (cluster mode - it's not currently possible to 
specify this deploy mode within the code), can I do it with YARN?

I tried to do something like this (but in scala): 

«

... // Client object - main method
System.setProperty("SPARK_YARN_MODE", "true")
val sparkConf = new SparkConf()

try {
  val args = new ClientArguments(argStrings, sparkConf)
  new Client(args, sparkConf).run()
} catch {
  case e: Exception => {
    Console.err.println(e.getMessage)
    System.exit(1)
  }
}

System.exit(0)

» in http://blog.sequenceiq.com/blog/2014/08/22/spark-submit-in-java/

However it is not possible to create a new instance of Client since import 
org.apache.spark.deploy.yarn.Client is private

the standard way to work around a problem like this is to place your code in a 
package which has access. File a JIRA asking for a public API too, one that 
doesn't require you to set system properties as a way of passing parameters 
down.

Is there any way I can submit spark jobs from the code in cluster mode and not 
using the spark-submit script?

Thanks.

Writing output fails when spark.unsafe.offHeap is enabled

2015-12-17 Thread Mayuresh Kunjir
I am testing a simple Sort program written using the DataFrame API. When I
enable spark.unsafe.offHeap, the output stage fails with an NPE. The
exception when run on spark-1.5.1 is copied below.

Job aborted due to stage failure: Task 23 in stage 3.0 failed 4 times, most
recent failure: Lost task 23.3 in stage 3.0 (TID 667, xeno-40):
java.lang.NullPointerException

at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificOrdering.compare(Unknown
Source)
at 
org.apache.spark.sql.catalyst.expressions.codegen.BaseOrdering.compare(GenerateOrdering.scala:28)
at 
org.apache.spark.sql.execution.UnsafeExternalRowSorter$RowComparator.compare(UnsafeExternalRowSorter.java:202)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter$SortComparator.compare(UnsafeInMemorySorter.java:58)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter$SortComparator.compare(UnsafeInMemorySorter.java:35)
at org.apache.spark.util.collection.TimSort.binarySort(TimSort.java:191)
at org.apache.spark.util.collection.TimSort.sort(TimSort.java:129)
at org.apache.spark.util.collection.Sorter.sort(Sorter.scala:37)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.getSortedIterator(UnsafeInMemorySorter.java:190)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:202)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:347)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:332)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:399)
at 
org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:92)
at 
org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:174)
at 
org.apache.spark.sql.execution.TungstenSort.org$apache$spark$sql$execution$TungstenSort$$executePartition$1(sort.scala:160)
at 
org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$4.apply(sort.scala:169)
at 
org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$4.apply(sort.scala:169)
at 
org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:64)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)



My program looks as follows:

case class Data(key: String, value: String)

// Read whitespace-separated (key, value) pairs and convert them to a DataFrame.
val lines = sc.textFile(args(0), 1)
val data = lines.map(_.split(" ")).map(t => Data(t(0), t(1))).toDF()
data.registerTempTable("data")

// Sort by key and write the sorted output out.
val sorted = data.sort("key")
sorted.save(args(1))

I am running the program on YARN v2.6 and have tried spark-1.5.1 as well
as the current snapshot of spark-1.6.0.
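
For reference, a minimal sketch of how the setting in question can be enabled programmatically (an assumption about how it is being set here; it could equally be passed with --conf on the spark-submit command line):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Enable Tungsten's off-heap mode (the setting under discussion) when
// building the context; everything else is a plain 1.5.x setup.
val conf = new SparkConf()
  .setAppName("OffHeapSortRepro")
  .set("spark.unsafe.offHeap", "true")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)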

Thanks and regards,
~Mayuresh