Re: spark 1.3.1 : unable to access s3n:// urls (no file system for scheme s3n:)

2015-04-23 Thread Sujee Maniyam
Thanks all...

btw, s3n load works without any issues with spark-1.3.1-built-for-hadoop
2.4

I tried this on 1.3.1-hadoop26
  sc.hadoopConfiguration.set("fs.s3n.impl",
"org.apache.hadoop.fs.s3native.NativeS3FileSystem")
 val f = sc.textFile("s3n://bucket/file")
 f.count

Now it can't find the implementation class. Looks like some jar is missing?

java.lang.RuntimeException: java.lang.ClassNotFoundException: Class
org.apache.hadoop.fs.s3native.NativeS3FileSystem not found
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2074)
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2578)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)

On Wednesday, April 22, 2015, Shuai Zheng szheng.c...@gmail.com wrote:

 Below is my code to access s3n without problem (only for 1.3.1; there is a
 bug in 1.3.0).



   Configuration hadoopConf = ctx.hadoopConfiguration();

   hadoopConf.set("fs.s3n.impl",
 "org.apache.hadoop.fs.s3native.NativeS3FileSystem");

   hadoopConf.set("fs.s3n.awsAccessKeyId", awsAccessKeyId);

   hadoopConf.set("fs.s3n.awsSecretAccessKey",
 awsSecretAccessKey);



 Regards,



 Shuai



 *From:* Sujee Maniyam [mailto:su...@sujee.net]
 *Sent:* Wednesday, April 22, 2015 12:45 PM
 *To:* Spark User List
 *Subject:* spark 1.3.1 : unable to access s3n:// urls (no file system for
 scheme s3n:)



 Hi all

 I am unable to access s3n:// urls using sc.textFile(); I get a 'no
 file system for scheme s3n' error.



 Is this a bug, or are some conf settings missing?



 See below for details:



 env variables :

 AWS_SECRET_ACCESS_KEY=set

 AWS_ACCESS_KEY_ID=set



 spark/RELEASE :

 Spark 1.3.1 (git revision 908a0bf) built for Hadoop 2.6.0

 Build flags: -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive
 -Phive-thriftserver -Pyarn -DzincPort=3034





 ./bin/spark-shell

  val f = sc.textFile("s3n://bucket/file")

  f.count



 error==

 java.io.IOException: No FileSystem for scheme: s3n
 at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2584)
 at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
 at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
 at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
 at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
 at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
 at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
 at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:256)
 at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
 at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
 at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
 at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
 at org.apache.spark.SparkContext.runJob(SparkContext.scala:1512)
 at org.apache.spark.rdd.RDD.count(RDD.scala:1006)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:24)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:29)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:31)
 at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:33)
 at $iwC$$iwC$$iwC$$iwC.<init>(<console>:35)
 at $iwC$$iwC$$iwC.<init>(<console>:37)
 at $iwC$$iwC.<init>(<console>:39)
 at $iwC.<init>(<console>:41)
 at <init>(<console>:43)
 at .<init>(<console>:47)
 at .<clinit>(<console>)
 at .<init>(<console>:7)
 at .<clinit>(<console>)
 at $print(<console>)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at

Re: Instantiating/starting Spark jobs programmatically

2015-04-23 Thread Anshul Singhle
Hi firemonk9,

What you're doing looks interesting. Can you share some more details?
Are you running the same spark context for each job, or are you running a
separate spark context for each job?
Does your system need sharing of rdd's across multiple jobs? If yes, how do
you implement that?
Also, what prompted you to run Yarn instead of standalone? Does this give
some performance benefit? Have you evaluated yarn vs mesos?
Also, have you looked at spark jobserver by ooyala? It makes doing some of
the stuff I mentioned easier. IIRC it also works with yarn. Definitely
works with Mesos. Here's the link:
https://github.com/spark-jobserver/spark-jobserver

Thanks
Anshul
On 23 Apr 2015 20:32, Dean Wampler deanwamp...@gmail.com wrote:

 I strongly recommend spawning a new process for the Spark jobs. Much
 cleaner separation. Your driver program won't be clobbered if the Spark job
 dies, etc. It can even watch for failures and restart.

 In the Scala standard library, the sys.process package has classes for
 constructing and interoperating with external processes. Perhaps Java has
 something similar these days?

 dean

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Tue, Apr 21, 2015 at 2:15 PM, Steve Loughran ste...@hortonworks.com
 wrote:


  On 21 Apr 2015, at 17:34, Richard Marscher rmarsc...@localytics.com
 wrote:

 - There are System.exit calls built into Spark as of now that could kill
 your running JVM. We have shadowed some of the most offensive bits within
 our own application to work around this. You'd likely want to do that or to
 do your own Spark fork. For example, if the SparkContext can't connect to
 your cluster master node when it is created, it will System.exit.


 people can block errant System.exit calls by running under a
 SecurityManager. Less than ideal (and there's a small performance hit) -but
 possible





RE: Map Question

2015-04-23 Thread Ganelin, Ilya
You need to expose that variable the same way you'd expose any other variable 
in Python that you wanted to see across modules. As long as you share a spark 
context all will work as expected.

http://stackoverflow.com/questions/142545/python-how-to-make-a-cross-module-variable
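
A minimal sketch of one way to do this (a hedged example, not the only
approach): pass the broadcast variable into the function explicitly instead
of relying on a module-level global. Module and function names follow the
thread.

# process_stuff.py
def myfunc(x, broadcastVar):
    metadata = broadcastVar.value  # resolvable now: it's a parameter
    # ... use metadata to enrich/transform x ...
    return x

# main.py
# from process_stuff import myfunc
# broadcastVar = sc.broadcast(mylist)
# stuff_mapped = stuff.map(lambda x: myfunc(x, broadcastVar))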



Sent with Good (www.good.com)


-Original Message-
From: Vadim Bichutskiy 
[vadim.bichuts...@gmail.com]
Sent: Thursday, April 23, 2015 12:00 PM Eastern Standard Time
To: Tathagata Das
Cc: user@spark.apache.org
Subject: Re: Map Question

Here it is. How do I access a broadcastVar in a function that's in another 
module (process_stuff.py below):

Thanks,
Vadim

main.py
---

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext
from process_stuff import myfunc
from metadata import get_metadata

conf = SparkConf().setAppName('My App').setMaster('local[4]')
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 30)
sqlContext = SQLContext(sc)

distFile = ssc.textFileStream("s3n://...")

distFile.foreachRDD(process)

mylist = get_metadata()

print 'BROADCASTING...'
broadcastVar = sc.broadcast(mylist)
print broadcastVar
print broadcastVar.value
print 'FINISHED BROADCASTING...'

## mylist and broadcastVar, broadcastVar.value print fine

def getSqlContextInstance(sparkContext):

    if ('sqlContextSingletonInstance' not in globals()):
        globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
    return globals()['sqlContextSingletonInstance']

def process(rdd):

    sqlContext = getSqlContextInstance(rdd.context)

    if rdd.take(1):

        jsondf = sqlContext.jsonRDD(rdd)

        #jsondf.printSchema()

        jsondf.registerTempTable('mytable')

        stuff = sqlContext.sql("SELECT ...")
        stuff_mapped = stuff.map(myfunc)  ## I want myfunc to see mylist from above?

        ...

process_stuff.py
--

def myfunc(x):

    metadata = broadcastVar.value  # NameError: broadcastVar not found -- HOW TO FIX?

...


metadata.py


def get_metadata():

...

return mylist


On Wed, Apr 22, 2015 at 6:47 PM, Tathagata Das t...@databricks.com wrote:
Can you give full code? especially the myfunc?

On Wed, Apr 22, 2015 at 2:20 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote:
Here's what I did:

print 'BROADCASTING...'
broadcastVar = sc.broadcast(mylist)
print broadcastVar
print broadcastVar.value
print 'FINISHED BROADCASTING...'

The above works fine,

but when I call myrdd.map(myfunc) I get NameError: global name 'broadcastVar' 
is not defined

The myfunc function is in a different module. How do I make it aware of 
broadcastVar?

On Wed, Apr 22, 2015 at 2:13 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote:
Great. Will try to modify the code. Always room to optimize!

On Wed, Apr 22, 2015 at 2:11 PM, Tathagata Das t...@databricks.com wrote:
Absolutely. The same code would work for local as well as distributed mode!

On Wed, Apr 22, 2015 at 11:08 AM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote:
Can I use broadcast vars in local mode?

On Wed, Apr 22, 2015 at 2:06 PM, Tathagata Das t...@databricks.com wrote:
Yep. Not efficient. Pretty bad actually. That's why broadcast variables were
introduced right at the very beginning of Spark.



On Wed, Apr 22, 2015 at 10:58 AM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote:
Thanks TD. I was looking into broadcast variables.

Right now I am running it locally...and I plan to move it to production on 
EC2.

The way I fixed it is by doing myrdd.map(lambda x: (x, mylist)).map(myfunc) but 
I don't think it's efficient?

mylist is filled only once at the start and never changes.

Vadim

On Wed, Apr 22, 2015 at 1:42 PM, Tathagata Das t...@databricks.com wrote:
Is the mylist present on every executor? If not, then you have to pass it on.
And broadcasts are the best way to pass them on. But note that once broadcast,
it will be immutable at the executors, and if you update the list at the driver,
you will have to broadcast it again.

TD


Spark + Hue

2015-04-23 Thread MrAsanjar .
Hi all
Is there any good documentation on how to integrate Spark with Hue 3.7.x?
Is installing Spark Job Server the only way?
Thanks in advance for your help


Re: Trouble working with Spark-CSV package (error: object databricks is not a member of package com)

2015-04-23 Thread Mohammed Omer
Hm, no I don't have that in my path.

However, someone on the spark-csv project advised that since I could not
get another package/example to work, that this might be a Spark / Yarn
issue: https://github.com/databricks/spark-csv/issues/54

Thoughts? I'll open a ticket later this afternoon if the discussion turns
that way.

Thank you, by the way, for the work on this project.

Mo

On Thu, Apr 23, 2015 at 5:17 AM, Krishna Sankar ksanka...@gmail.com wrote:

 Do you have commons-csv-1.1-bin.jar in your path somewhere? I had to
 download and add this.
 Cheers
 k/

 On Wed, Apr 22, 2015 at 11:01 AM, Mohammed Omer beancinemat...@gmail.com
 wrote:

 Afternoon all,

 I'm working with Scala 2.11.6, and Spark 1.3.1 built from source via:

 `mvn -Pyarn -Phadoop-2.4 -Dscala-2.11 -DskipTests clean package`

 The error is encountered when running spark shell via:

 `spark-shell --packages com.databricks:spark-csv_2.11:1.0.3`

 The full trace of the commands can be found at
 https://gist.github.com/momer/9d1ca583f9978ec9739d

 Not sure if I've done something wrong, or if the documentation is
 outdated, or...?

 Would appreciate any input or push in the right direction!

 Thank you,

 Mo





Re: A Spark Group by is running forever

2015-04-23 Thread ๏̯͡๏
I have seen multiple blogs stating to use reduceByKey instead of
groupByKey. Could someone please help me in converting below code to use
reduceByKey


Code

some spark processing
...

Below
val viEventsWithListingsJoinSpsLevelMetric:
 
org.apache.spark.rdd.RDD[(com.ebay.ep.poc.spark.reporting.process.detail.model.DetailInputRecord,
 com.ebay.ep.poc.spark.reporting.process.detail.viewitem.provider.VISummary,
Long)]


  val sellerSegments = viEventsWithListingsJoinSpsLevelMetric.groupBy {
    case (viDetail, vi, itemId) =>
      (viDetail.get(0), viDetail.get(1).asInstanceOf[Long],
       viDetail.get(2), viDetail.get(8).asInstanceOf[Int])
  }

We groupBy on the above key so that we get an iterable (list); with the list
we can compute .max values for powerSellerLevel and sellerStdLevel.

val powerSellerLevel = sellerSegments.map {
  case (k, v) =>
    val viGrouped = v.toList
    val viPowerSellers = viGrouped.map { viTuple =>
      Option(viTuple._2.powerSellerLevel).getOrElse("") }
    val viSellerStandardLevels = viGrouped.map { viTuple =>
      Option(viTuple._2.sellerStdLevel).getOrElse("") }
    val powerSellerLevel = viPowerSellers.max
    val sellerStandardLevel = viSellerStandardLevels.max
    val viEventDetail = viGrouped.head._1
    val viSummary = viGrouped.head._2
    viSummary.powerSellerLevel = powerSellerLevel
    viSummary.sellerStdLevel = sellerStandardLevel
    viSummary.itemId = viGrouped.head._3
    (viEventDetail, viSummary)
}


The above groupBy query ran for 6 hours and does not seem to finish. Hence I
started thinking of reduceByKey. Now reduceByKey() needs pairs, so I
modified viEventsWithListingsJoinSpsLevelMetric from (X, Y, Z) tuples to
(A, B) pairs.

I moved the key generated by the groupBy query into the processing of
viEventsWithListingsJoinSpsLevelMetric, so that
viEventsWithListingsJoinSpsLevelMetric is of type (A, B). Hence it is modified
as

(((viEventDetail.get(0), viEventDetail.get(1).asInstanceOf[Long],
viEventDetail.get(2),
viEventDetail.get(8).asInstanceOf[Int])),(viEventDetail, viSummary,
itemId)).

Now i want to compute max values, and i do the next processing using
reduceByKey

val powerSellerLevel = viEventsWithListingsJoinSpsLevelMetric.reduceByKey {
  case (k, v) =>
    val viGrouped = v.toList
    // Some code to compute max needs to go here.
}


But I get a compiler error that v.toList is not supported.

[ERROR]
/Users/dvasthimal/ebay/projects/ep-spark/ep-spark/src/main/scala/com/ebay/ep/poc/spark/reporting/process/detail/viewitem/provider/VISummaryDataProvider.scala:115:
error: value toList is not a member of
(com.ebay.ep.poc.spark.reporting.process.detail.model.DetailInputRecord,
com.ebay.ep.poc.spark.reporting.process.detail.viewitem.provider.VISummary,
Long)

[INFO] val viGrouped = v.toList

[INFO]   ^

[ERROR] one error found


Now, if you think about it, groupBy was generating (k, Iterable), so the next
map() could turn the iterable into a list and run through that list to compute
the max. How is that possible with reduceByKey, which never produces such a list?
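
One shape the reduceByKey version could take (a sketch only; field types are
assumed to be String, matching the .max over getOrElse("") above, and the RDD
is assumed already keyed as (K, (DetailInputRecord, VISummary, Long))):

val reduced = viEventsWithListingsJoinSpsLevelMetric.reduceByKey {
  case ((det1, sum1, item1), (det2, sum2, item2)) =>
    // Merge two values at a time, carrying the running maxima in one summary
    // instead of materializing a per-key list.
    sum1.powerSellerLevel = Seq(Option(sum1.powerSellerLevel).getOrElse(""),
                                Option(sum2.powerSellerLevel).getOrElse("")).max
    sum1.sellerStdLevel   = Seq(Option(sum1.sellerStdLevel).getOrElse(""),
                                Option(sum2.sellerStdLevel).getOrElse("")).max
    (det1, sum1, item1)
}

val powerSellerLevel = reduced.map { case (_, (det, sum, item)) =>
  sum.itemId = item
  (det, sum)
}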


Suggestions are appreciated.


-Deepak















On Thu, Apr 23, 2015 at 1:23 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 I have a groupBy query after a map-side join and a leftOuterJoin, and this
 query has been running for more than 2 hours.


 Task (from the Spark UI):
 Index: 0 | ID: 36 | Attempt: 0 | Status: RUNNING | Locality Level: PROCESS_LOCAL
 Executor ID / Host: 17 / phxaishdc9dn1560.stratus.phx.ebay.com
 Launch Time: 2015/04/22 23:27:00 | Duration: 1.4 h | GC Time: 29 s
 Shuffle Read Size / Records: 61.8 MB / 63144909 | Shuffle Write Size / Records: 0.0 B / 0



 The input looks to be only 60 MB.
 *Command*
 ./bin/spark-submit -v --master yarn-cluster --driver-class-path
 /apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar
 --jars
 /apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.3/spark_reporting_dep_only-1.0-SNAPSHOT.jar
  *--num-executors 36 --driver-memory 12g --driver-java-options
 -XX:MaxPermSize=8G --executor-memory 12g* *--executor-cores 6* --queue
 hdmi-express --class com.ebay.ep.poc.spark.reporting.SparkApp
 /home/dvasthimal/spark1.3/spark_reporting-1.0-SNAPSHOT.jar
 startDate=2015-04-6 endDate=2015-04-7
 input=/user/dvasthimal/epdatasets_small/exptsession subcommand=viewItem
 output=/user/dvasthimal/epdatasets/viewItem buffersize=128
 maxbuffersize=1068 maxResultSize=2G

 Queries

 1. val viEvents = details.map { vi => (vi.get(14).asInstanceOf[Long], vi) }
 2. Broadcast Map - Join

 val lstgItemMap = listings.map { lstg => (lstg.getItemId().toLong, lstg) }.collectAsMap
 val broadCastMap = sc.broadcast(lstgItemMap)

 val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary,
 Long))] = 

dynamicAllocation spark-shell

2015-04-23 Thread Michael Stone
If I enable dynamicAllocation and then use spark-shell or pyspark, 
things start out working as expected: running simple commands causes new 
executors to start and complete tasks. If the shell is left idle for a 
while, executors start getting killed off:


15/04/23 10:52:43 INFO cluster.YarnClientSchedulerBackend: Requesting to kill 
executor(s) 368
15/04/23 10:52:43 INFO spark.ExecutorAllocationManager: Removing executor 368 
because it has been idle for 600 seconds (new desired total will be 665)

That makes sense. But the action also results in error messages:

15/04/23 10:52:47 ERROR cluster.YarnScheduler: Lost executor 368 on hostname: 
remote Akka client disassociated
15/04/23 10:52:47 INFO scheduler.DAGScheduler: Executor lost: 368 (epoch 0)
15/04/23 10:52:47 INFO spark.ExecutorAllocationManager: Existing executor 368 
has been removed (new total is 665)
15/04/23 10:52:47 INFO storage.BlockManagerMasterActor: Trying to remove 
executor 368 from BlockManagerMaster.
15/04/23 10:52:47 INFO storage.BlockManagerMasterActor: Removing block manager 
BlockManagerId(368, hostname, 35877)
15/04/23 10:52:47 INFO storage.BlockManagerMaster: Removed 368 successfully in 
removeExecutor

After that, trying to run a simple command results in:

15/04/23 10:13:30 ERROR util.Utils: Uncaught exception in thread 
spark-dynamic-executor-allocation-0
java.lang.IllegalArgumentException: Attempted to request a negative number of 
executor(s) -663 from the cluster manager. Please specify a positive number!

And then only the single remaining executor attempts to complete the new 
tasks. Am I missing some kind of simple configuration item, are other 
people seeing the same behavior as a bug, or is this actually expected?


Mike Stone

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark 1.3.1 : unable to access s3n:// urls (no file system for scheme s3n:)

2015-04-23 Thread Ted Yu
The NativeS3FileSystem class is in the hadoop-aws jar.
It looks like it was not on the classpath.
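
For example, the shell can be launched with the jar supplied explicitly (paths
and versions below are illustrative, not verified):

./bin/spark-shell --jars /path/to/hadoop-aws-2.6.0.jar,/path/to/aws-java-sdk-1.7.4.jar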

Cheers

On Thu, Apr 23, 2015 at 7:30 AM, Sujee Maniyam su...@sujee.net wrote:

 Thanks all...

 btw, s3n load works without any issues with spark-1.3.1-built-for-hadoop
 2.4

 I tried this on 1.3.1-hadoop26
   sc.hadoopConfiguration.set("fs.s3n.impl",
 "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
  val f = sc.textFile("s3n://bucket/file")
  f.count

 Now it can't find the implementation class. Looks like some jar is missing?

 java.lang.RuntimeException: java.lang.ClassNotFoundException: Class
 org.apache.hadoop.fs.s3native.NativeS3FileSystem not found
 at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2074)
 at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2578)
 at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
 at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)

 On Wednesday, April 22, 2015, Shuai Zheng szheng.c...@gmail.com wrote:

 Below is my code to access s3n without problem (only for 1.3.1; there is
 a bug in 1.3.0).



   Configuration hadoopConf = ctx.hadoopConfiguration();

   hadoopConf.set("fs.s3n.impl",
 "org.apache.hadoop.fs.s3native.NativeS3FileSystem");

   hadoopConf.set("fs.s3n.awsAccessKeyId", awsAccessKeyId);

   hadoopConf.set("fs.s3n.awsSecretAccessKey",
 awsSecretAccessKey);



 Regards,



 Shuai



 *From:* Sujee Maniyam [mailto:su...@sujee.net]
 *Sent:* Wednesday, April 22, 2015 12:45 PM
 *To:* Spark User List
 *Subject:* spark 1.3.1 : unable to access s3n:// urls (no file system
 for scheme s3n:)



 Hi all

 I am unable to access s3n:// urls using sc.textFile(); I get a 'no
 file system for scheme s3n' error.



 Is this a bug, or are some conf settings missing?



 See below for details:



 env variables :

 AWS_SECRET_ACCESS_KEY=set

 AWS_ACCESS_KEY_ID=set



 spark/RELEASE :

 Spark 1.3.1 (git revision 908a0bf) built for Hadoop 2.6.0

 Build flags: -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive
 -Phive-thriftserver -Pyarn -DzincPort=3034





 ./bin/spark-shell

  val f = sc.textFile("s3n://bucket/file")

  f.count



 error==

 java.io.IOException: No FileSystem for scheme: s3n
 at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2584)
 at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
 at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
 at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
 at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
 at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
 at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
 at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:256)
 at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
 at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
 at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
 at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
 at org.apache.spark.SparkContext.runJob(SparkContext.scala:1512)
 at org.apache.spark.rdd.RDD.count(RDD.scala:1006)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:24)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:29)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:31)
 at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:33)
 at $iwC$$iwC$$iwC$$iwC.<init>(<console>:35)
 at $iwC$$iwC$$iwC.<init>(<console>:37)
 at $iwC$$iwC.<init>(<console>:39)
 at $iwC.<init>(<console>:41)
 at <init>(<console>:43)
 at .<init>(<console>:47)
 at .<clinit>(<console>)
 at .<init>(<console>:7)
 at .<clinit>(<console>)
 at $print(<console>)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at

Re: Instantiating/starting Spark jobs programmatically

2015-04-23 Thread Dean Wampler
I strongly recommend spawning a new process for the Spark jobs. Much
cleaner separation. Your driver program won't be clobbered if the Spark job
dies, etc. It can even watch for failures and restart.

In the Scala standard library, the sys.process package has classes for
constructing and interoperating with external processes. Perhaps Java has
something similar these days?
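
A minimal sketch of that approach (paths and class names are hypothetical):

import scala.sys.process._

// Build the spark-submit command line and run it as a child process.
val cmd = Seq(
  "/opt/spark/bin/spark-submit",   // assumed install location
  "--master", "yarn-cluster",
  "--class", "com.example.MyJob",  // hypothetical main class
  "/opt/jobs/my-job.jar")

val exitCode = cmd.!  // blocks until the job finishes; returns its exit code
if (exitCode != 0) {
  // The launcher JVM survives the job's failure and can retry or alert here.
  println(s"Spark job failed with exit code $exitCode")
}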

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Tue, Apr 21, 2015 at 2:15 PM, Steve Loughran ste...@hortonworks.com
wrote:


  On 21 Apr 2015, at 17:34, Richard Marscher rmarsc...@localytics.com
 wrote:

 - There are System.exit calls built into Spark as of now that could kill
 your running JVM. We have shadowed some of the most offensive bits within
 our own application to work around this. You'd likely want to do that or to
 do your own Spark fork. For example, if the SparkContext can't connect to
 your cluster master node when it is created, it will System.exit.


 people can block errant System.exit calls by running under a
 SecurityManager. Less than ideal (and there's a small performance hit) -but
 possible
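
 A minimal sketch of such a SecurityManager (not production-hardened; install
 it before creating the SparkContext):

 import java.security.Permission

 class NoExitSecurityManager extends SecurityManager {
   override def checkExit(status: Int): Unit =
     throw new SecurityException(s"Blocked System.exit($status)")
   // Permit everything else so the rest of the JVM behaves normally.
   override def checkPermission(perm: Permission): Unit = ()
 }

 System.setSecurityManager(new NoExitSecurityManager)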



Tasks run only on one machine

2015-04-23 Thread Pat Ferrel
Using Spark streaming to create a large volume of small nano-batch input files, 
~4k per file, thousands of ‘part-x’ files.  When reading the nano-batch 
files and doing a distributed calculation my tasks run only on the machine 
where it was launched. I’m launching in “yarn-client” mode. The rdd is created 
using sc.textFile(“list of thousand files”)

What would cause the read to occur only on the machine that launched the
driver?

Do I need to do something to the RDD after reading? Has some partition factor 
been applied to all derived rdds?
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



[Spark Streaming] Help with updateStateByKey()

2015-04-23 Thread allonsy
Hi everybody,

I think I could use some help with the updateStateByKey() Java method in
Spark Streaming.

*Context:*

I have a JavaReceiverInputDStream<DataUpdate> du stream, where the DataUpdate
object mainly has 2 fields of interest (in my case), namely du.personId (an
Integer) and du.cell.hashCode() (an Integer, again). Obviously, I am processing
several DataUpdate objects (coming from a log file read in microbatches), and
every personId will be 'associated' with several du.cell.hashCode() values.

What I need to do is, for every personId, to statefully count how many times
it appears with a particular du.cell.hashCode(), possibly partitioning by the
personId key.

(Long story short: an area is split into cells and I wonder how many times
every person appears in every cell.)

In a very naive way, I guess everything should look like a
HashMap<personId, HashMap<cell.hashCode(), count>>, but I am not quite sure
how to partition by personId and increase the count.

It looks like the updateStateByKey() method should do the trick (I am new to
Spark Streaming), yet I can't figure out how.

Any suggestions?

Feel free to ask anything in case I was unclear or more information is
needed. :)


Thank you!
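
A sketch of what this could look like, written in Scala for brevity (the Java
updateStateByKey version is analogous); duStream and DataUpdate are assumed
as in the question:

// Key each update by (personId, cell hash), then keep a running count per key.
val pairs = duStream.map(du => ((du.personId, du.cell.hashCode), 1L))

// For each key: add the new occurrences to the previous count, if any.
val updateFunc = (newValues: Seq[Long], state: Option[Long]) =>
  Some(state.getOrElse(0L) + newValues.sum)

val countsPerPersonCell = pairs.updateStateByKey(updateFunc)

// Stateful operations require a checkpoint directory, e.g.:
// ssc.checkpoint("hdfs://...")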



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Help-with-updateStateByKey-tp22637.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Tasks run only on one machine

2015-04-23 Thread Pat Ferrel
Sure

var columns = mc.textFile(source).map { line => line.split(delimiter) }

Here “source” is a comma delimited list of files or directories. Both the 
textFile and .map tasks happen only on the machine they were launched from.

Later, other distributed operations happen, but I suspect that if I can figure
out why the first line is run only on the client machine, the rest will clear
up too. Here are some subsequent lines.

if (filterColumn != -1) {
  columns = columns.filter { tokens => tokens(filterColumn) == filterBy }
}

val interactions = columns.map { tokens =>
  tokens(rowIDColumn) -> tokens(columnIDPosition)
}

interactions.cache()
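
For reference, one way to force a wider spread is to ask for more partitions
at read time, or to repartition before the heavy work (a sketch; the partition
count is illustrative):

var columns = mc.textFile(source, minPartitions = 64)
  .map(line => line.split(delimiter))
// or, on an existing RDD:
val spread = columns.repartition(64)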

On Apr 23, 2015, at 10:14 AM, Jeetendra Gangele gangele...@gmail.com wrote:

Will you be able to paste code here?

On 23 April 2015 at 22:21, Pat Ferrel p...@occamsmachete.com wrote:
Using Spark streaming to create a large volume of small nano-batch input files, 
~4k per file, thousands of ‘part-x’ files.  When reading the nano-batch 
files and doing a distributed calculation my tasks run only on the machine 
where it was launched. I’m launching in “yarn-client” mode. The rdd is created 
using sc.textFile(“list of thousand files”)

What would cause the read to occur only on the machine that launched the driver?

Do I need to do something to the RDD after reading? Has some partition factor 
been applied to all derived rdds?
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org







Re: Tasks run only on one machine

2015-04-23 Thread Pat Ferrel
Argh, I looked and there really isn’t that much data yet. There will be 
thousands but starting small.

I bet this is just a total data size not requiring all workers thing—sorry, 
nevermind.


On Apr 23, 2015, at 10:30 AM, Pat Ferrel p...@occamsmachete.com wrote:

They are in HDFS so available on all workers

On Apr 23, 2015, at 10:29 AM, Pat Ferrel p...@occamsmachete.com wrote:

Physically? Not sure, they were written using the nano-batch rdds in a 
streaming job that is in a separate driver. The job is a Kafka consumer. 

Would that effect all derived rdds? If so is there something I can do to mix it 
up or does Spark know best about execution speed here?


On Apr 23, 2015, at 10:23 AM, Sean Owen so...@cloudera.com wrote:

Where are the file splits? meaning is it possible they were also
(only) available on one node and that was also your driver?

On Thu, Apr 23, 2015 at 1:21 PM, Pat Ferrel p...@occamsmachete.com wrote:
 Sure
 
  var columns = mc.textFile(source).map { line => line.split(delimiter) }
 
 Here “source” is a comma delimited list of files or directories. Both the
 textFile and .map tasks happen only on the machine they were launched from.
 
 Later, other distributed operations happen, but I suspect that if I can figure out
 why the first line is run only on the client machine, the rest will clear up
 too. Here are some subsequent lines.
 
  if (filterColumn != -1) {
    columns = columns.filter { tokens => tokens(filterColumn) == filterBy }
  }

  val interactions = columns.map { tokens =>
    tokens(rowIDColumn) -> tokens(columnIDPosition)
  }

  interactions.cache()
 
 On Apr 23, 2015, at 10:14 AM, Jeetendra Gangele gangele...@gmail.com
 wrote:
 
 Will you be able to paste code here?
 
 On 23 April 2015 at 22:21, Pat Ferrel p...@occamsmachete.com wrote:
 
 Using Spark streaming to create a large volume of small nano-batch input
 files, ~4k per file, thousands of ‘part-x’ files.  When reading the
 nano-batch files and doing a distributed calculation my tasks run only on
 the machine where it was launched. I’m launching in “yarn-client” mode. The
 rdd is created using sc.textFile(“list of thousand files”)
 
 What would cause the read to occur only on the machine that launched the
 driver.
 
 Do I need to do something to the RDD after reading? Has some partition
 factor been applied to all derived rdds?
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 
 
 
 
 



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Slower performance when bigger memory?

2015-04-23 Thread Shuai Zheng
Hi All,

 

I am running some benchmarks on an r3.8xlarge instance. I have a cluster with
one master (no executor on it) and one slave (r3.8xlarge).

 

My job has 1000 tasks in stage 0.

 

r3.8xlarge has 244G memory and 32 cores.

 

If I create 4 executors, each has 8 core+50G memory, each task will take
around 320s-380s. And if I only use one big executor with 32 cores and 200G
memory, each task will take 760s-900s.

 

And I check the log, looks like the minor GC takes much longer when using
200G memory:

 

285.242: [GC [PSYoungGen: 29027310K->8646087K(31119872K)]
38810417K->19703013K(135977472K), 11.2509770 secs] [Times: user=38.95
sys=120.65, real=11.25 secs]

 

And when it uses 50G memory, the minor GC takes only less than 1s.

 

I am trying to work out the best way to configure Spark. For a particular
reason, I am tempted to use a bigger memory on a single executor if there is
no significant performance penalty. But now it looks like there is?

 

Anyone has any idea?

 

Regards,

 

Shuai



Re: Shuffle files not cleaned up (Spark 1.2.1)

2015-04-23 Thread N B
Thanks for the response, Conor. I tried with those settings and for a while
it seemed like it was cleaning up shuffle files after itself. However,
exactly 5 hours later it started throwing exceptions and eventually
stopped working again. A sample stack trace is below. What is curious about
5 hours is that I set the cleaner TTL to 5 hours after changing the max
window size to 1 hour (down from 6 hours, in order to test). It also stopped
cleaning the shuffle files after this started happening.

Any idea why this could be happening?

2015-04-22 17:39:52,040 ERROR Executor task launch worker-989
Executor.logError - Exception in task 0.0 in stage 215425.0 (TID 425147)
java.lang.Exception: Could not compute split, block input-0-1429706099000
not found
at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:198)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Thanks
NB


On Tue, Apr 21, 2015 at 5:14 AM, Conor Fennell conor.fenn...@altocloud.com
wrote:

 Hi,


 We set the spark.cleaner.ttl to some reasonable time and also
 set spark.streaming.unpersist=true.


 Those together cleaned up the shuffle files for us.
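
 For reference, a sketch of those settings in code (the TTL value is
 illustrative and must exceed the longest window):

 // import org.apache.spark.SparkConf
 val conf = new SparkConf()
   .set("spark.cleaner.ttl", "21600")         // seconds
   .set("spark.streaming.unpersist", "true")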


 -Conor

 On Tue, Apr 21, 2015 at 8:18 AM, N B nb.nos...@gmail.com wrote:

 We already do have a cron job in place to clean just the shuffle files.
 However, what I would really like to know is whether there is a proper
 way of telling spark to clean up these files once its done with them?

 Thanks
 NB


 On Mon, Apr 20, 2015 at 10:47 AM, Jeetendra Gangele gangele...@gmail.com
  wrote:

 Write a cron job for this like below

 12 * * * *  find $SPARK_HOME/work -cmin +1440 -prune -exec rm -rf {} \+
 32 * * * *  find /tmp -type d -cmin +1440 -name spark-*-*-* -prune
 -exec rm -rf {} \+
 52 * * * *  find $SPARK_LOCAL_DIR -mindepth 1 -maxdepth 1 -type d -cmin
 +1440 -name spark-*-*-* -prune -exec rm -rf {} \+


 On 20 April 2015 at 23:12, N B nb.nos...@gmail.com wrote:

 Hi all,

 I had posed this query as part of a different thread but did not get a
 response there. So creating a new thread hoping to catch someone's
 attention.

 We are experiencing this issue of shuffle files being left behind and
 not being cleaned up by Spark. Since this is a Spark streaming application,
 it is expected to stay up indefinitely, so shuffle files not being cleaned
 up is a big problem right now. Our max window size is 6 hours, so we have
 set up a cron job to clean up shuffle files older than 12 hours otherwise
 it will eat up all our disk space.

 Please see the following. It seems the non-cleaning of shuffle files is
 being documented in 1.3.1.

 https://github.com/apache/spark/pull/5074/files
 https://issues.apache.org/jira/browse/SPARK-5836


 Also, for some reason, the following JIRAs that were reported as
 functional issues were closed as Duplicates of the above Documentation bug.
 Does this mean that this issue won't be tackled at all?

 https://issues.apache.org/jira/browse/SPARK-3563
 https://issues.apache.org/jira/browse/SPARK-4796
 https://issues.apache.org/jira/browse/SPARK-6011

 Any further insight into whether this is being looked into and
 meanwhile how to handle shuffle files will be greatly appreciated.

 Thanks
 NB










Re: Bug? Can't reference to the column by name after join two DataFrame on a same name key

2015-04-23 Thread Yin Huai
Hi Shuai,

You can use as to create a table alias. For example, df1.as("df1"). Then
you can use $"df1.col" to refer to it.
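
A minimal sketch (the column name is assumed to be col, as in the question):

import sqlContext.implicits._  // for the $"..." column syntax

val joined = df1.as("t1").join(df2.as("t2"), $"t1.col" === $"t2.col")
val keyCol = joined.select($"t1.col")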

Thanks,

Yin

On Thu, Apr 23, 2015 at 11:14 AM, Shuai Zheng szheng.c...@gmail.com wrote:

 Hi All,



 I use 1.3.1



 When I have two DF and join them on a same name key, after that, I can’t
 get the common key by name.



 Basically:

 select * from t1 inner join t2 on t1.col1 = t2.col1



 And I am using purely DataFrame, spark SqlContext not HiveContext



 DataFrame df3 = df1.join(df2,
 df1.col("col").equalTo(df2.col("col"))).select("col");



 Because df1 and df2 are joined on the same key col,



 I can't reference the key col. I understand I should use a fully
 qualified name for that column (like t1.col in SQL), but I don't know
 how I should address this in Spark SQL.



 Exception in thread main org.apache.spark.sql.AnalysisException:
 Reference 'id' is ambiguous, could be: id#8L, id#0L.;



 It looks like the joined key can't be referenced by name or by the df1.col(name)
 pattern.

 The https://issues.apache.org/jira/browse/SPARK-5278 ticket refers to a Hive
 case, so I am not sure whether it is the same issue, but I still have the
 issue in the latest code.



 It looks like the result after join won't keep the parent DF information
 anywhere?



 I check the ticket: https://issues.apache.org/jira/browse/SPARK-6273



 But I am not sure whether it is the same issue. Should I open a new ticket
 for this?



 Regards,



 Shuai





Re: Tasks run only on one machine

2015-04-23 Thread Sean Owen
Where are the file splits? meaning is it possible they were also
(only) available on one node and that was also your driver?

On Thu, Apr 23, 2015 at 1:21 PM, Pat Ferrel p...@occamsmachete.com wrote:
 Sure

 var columns = mc.textFile(source).map { line => line.split(delimiter) }

 Here “source” is a comma delimited list of files or directories. Both the
 textFile and .map tasks happen only on the machine they were launched from.

 Later, other distributed operations happen, but I suspect that if I can figure out
 why the first line is run only on the client machine, the rest will clear up
 too. Here are some subsequent lines.

 if (filterColumn != -1) {
   columns = columns.filter { tokens => tokens(filterColumn) == filterBy }
 }

 val interactions = columns.map { tokens =>
   tokens(rowIDColumn) -> tokens(columnIDPosition)
 }

 interactions.cache()

 On Apr 23, 2015, at 10:14 AM, Jeetendra Gangele gangele...@gmail.com
 wrote:

 Will you be able to paste code here?

 On 23 April 2015 at 22:21, Pat Ferrel p...@occamsmachete.com wrote:

 Using Spark streaming to create a large volume of small nano-batch input
 files, ~4k per file, thousands of ‘part-x’ files.  When reading the
 nano-batch files and doing a distributed calculation my tasks run only on
 the machine where it was launched. I’m launching in “yarn-client” mode. The
 rdd is created using sc.textFile(“list of thousand files”)

 What would cause the read to occur only on the machine that launched the
 driver.

 Do I need to do something to the RDD after reading? Has some partition
 factor been applied to all derived rdds?
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Slower performance when bigger memory?

2015-04-23 Thread Ted Yu
Shuai:
Please take a look at:

http://blog.takipi.com/garbage-collectors-serial-vs-parallel-vs-cms-vs-the-g1-and-whats-new-in-java-8/
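
For example, the two executor layouts can be compared with GC logging enabled
per executor (all values below are illustrative, not recommendations):

./bin/spark-submit \
  --num-executors 4 --executor-cores 8 --executor-memory 50g \
  --conf "spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
  ...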



 On Apr 23, 2015, at 10:18 AM, Dean Wampler deanwamp...@gmail.com wrote:
 
 JVMs often have significant GC overhead with heaps bigger than 64GB. You
 might try your experiments with configurations below this threshold.
 
 dean
 
 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition (O'Reilly)
 Typesafe
 @deanwampler
 http://polyglotprogramming.com
 
 On Thu, Apr 23, 2015 at 12:14 PM, Shuai Zheng szheng.c...@gmail.com wrote:
 Hi All,
 
  
 
 I am running some benchmarks on an r3.8xlarge instance. I have a cluster with
 one master (no executor on it) and one slave (r3.8xlarge).
 
  
 
 My job has 1000 tasks in stage 0.
 
  
 
 r3.8xlarge has 244G memory and 32 cores.
 
  
 
 If I create 4 executors, each has 8 core+50G memory, each task will take 
 around 320s-380s. And if I only use one big executor with 32 cores and 200G 
 memory, each task will take 760s-900s.
 
  
 
 And I check the log, looks like the minor GC takes much longer when using 
 200G memory:
 
  
 
 285.242: [GC [PSYoungGen: 29027310K->8646087K(31119872K)]
 38810417K->19703013K(135977472K), 11.2509770 secs] [Times: user=38.95
 sys=120.65, real=11.25 secs]
 
  
 
 And when it uses 50G memory, the minor GC takes only less than 1s.
 
  
 
 I am trying to work out the best way to configure Spark. For a particular
 reason, I am tempted to use a bigger memory on a single executor if there is
 no significant performance penalty. But now it looks like there is?
 
  
 
 Anyone has any idea?
 
  
 
 Regards,
 
  
 
 Shuai
 
 


Re: Tasks run only on one machine

2015-04-23 Thread Pat Ferrel
Physically? Not sure, they were written using the nano-batch rdds in a 
streaming job that is in a separate driver. The job is a Kafka consumer. 

Would that effect all derived rdds? If so is there something I can do to mix it 
up or does Spark know best about execution speed here?


On Apr 23, 2015, at 10:23 AM, Sean Owen so...@cloudera.com wrote:

Where are the file splits? meaning is it possible they were also
(only) available on one node and that was also your driver?

On Thu, Apr 23, 2015 at 1:21 PM, Pat Ferrel p...@occamsmachete.com wrote:
 Sure
 
var columns = mc.textFile(source).map { line => line.split(delimiter) }
 
 Here “source” is a comma delimited list of files or directories. Both the
 textFile and .map tasks happen only on the machine they were launched from.
 
 Later, other distributed operations happen, but I suspect that if I can figure out
 why the first line is run only on the client machine, the rest will clear up
 too. Here are some subsequent lines.
 
if (filterColumn != -1) {
  columns = columns.filter { tokens => tokens(filterColumn) == filterBy }
}

val interactions = columns.map { tokens =>
  tokens(rowIDColumn) -> tokens(columnIDPosition)
}

interactions.cache()
 
 On Apr 23, 2015, at 10:14 AM, Jeetendra Gangele gangele...@gmail.com
 wrote:
 
 Will you be able to paste code here?
 
 On 23 April 2015 at 22:21, Pat Ferrel p...@occamsmachete.com wrote:
 
 Using Spark streaming to create a large volume of small nano-batch input
 files, ~4k per file, thousands of ‘part-x’ files.  When reading the
 nano-batch files and doing a distributed calculation my tasks run only on
 the machine where it was launched. I’m launching in “yarn-client” mode. The
 rdd is created using sc.textFile(“list of thousand files”)
 
 What would cause the read to occur only on the machine that launched the
 driver.
 
 Do I need to do something to the RDD after reading? Has some partition
 factor been applied to all derived rdds?
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 
 
 
 
 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Map Question

2015-04-23 Thread Vadim Bichutskiy
Here it is. How do I access a broadcastVar in a function that's in another
module (process_stuff.py below):

Thanks,
Vadim

main.py
---

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext
from process_stuff import myfunc
from metadata import get_metadata

conf = SparkConf().setAppName('My App').setMaster('local[4]')
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 30)
sqlContext = SQLContext(sc)

distFile = ssc.textFileStream("s3n://...")

distFile.foreachRDD(process)

mylist = get_metadata()

print 'BROADCASTING...'
broadcastVar = sc.broadcast(mylist)
print broadcastVar
print broadcastVar.value
print 'FINISHED BROADCASTING...'

## mylist and broadcastVar, broadcastVar.value print fine

def getSqlContextInstance(sparkContext):

    if ('sqlContextSingletonInstance' not in globals()):
        globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
    return globals()['sqlContextSingletonInstance']

def process(rdd):

    sqlContext = getSqlContextInstance(rdd.context)

    if rdd.take(1):

        jsondf = sqlContext.jsonRDD(rdd)

        #jsondf.printSchema()

        jsondf.registerTempTable('mytable')

        stuff = sqlContext.sql("SELECT ...")
        stuff_mapped = stuff.map(myfunc)  ## I want myfunc to see mylist from above?

        ...

process_stuff.py
--

def myfunc(x):

    metadata = broadcastVar.value  # NameError: broadcastVar not found -- HOW TO FIX?

...


metadata.py


def get_metadata():

...

return mylist

On Wed, Apr 22, 2015 at 6:47 PM, Tathagata Das t...@databricks.com wrote:

 Can you give full code? especially the myfunc?

 On Wed, Apr 22, 2015 at 2:20 PM, Vadim Bichutskiy 
 vadim.bichuts...@gmail.com wrote:

 Here's what I did:

 print 'BROADCASTING...'
 broadcastVar = sc.broadcast(mylist)
 print broadcastVar
 print broadcastVar.value
 print 'FINISHED BROADCASTING...'

 The above works fine,

 but when I call myrdd.map(myfunc) I get *NameError: global name
 'broadcastVar' is not defined*

 The myfunc function is in a different module. How do I make it aware of
 broadcastVar?

 On Wed, Apr 22, 2015 at 2:13 PM, Vadim Bichutskiy 
 vadim.bichuts...@gmail.com wrote:

 Great. Will try to modify the code. Always room to optimize!

 On Wed, Apr 22, 2015 at 2:11 PM, Tathagata Das t...@databricks.com
 wrote:

 Absolutely. The same code would work for local as well as distributed
 mode!

 On Wed, Apr 22, 2015 at 11:08 AM, Vadim Bichutskiy 
 vadim.bichuts...@gmail.com wrote:

 Can I use broadcast vars in local mode?

 On Wed, Apr 22, 2015 at 2:06 PM, Tathagata Das t...@databricks.com
 wrote:

 Yep. Not efficient. Pretty bad actually. That's why broadcast
 variables were introduced right at the very beginning of Spark.



 On Wed, Apr 22, 2015 at 10:58 AM, Vadim Bichutskiy 
 vadim.bichuts...@gmail.com wrote:

 Thanks TD. I was looking into broadcast variables.

 Right now I am running it locally...and I plan to move it to
 production on EC2.

 The way I fixed it is by doing myrdd.map(lambda x: (x,
 mylist)).map(myfunc) but I don't think it's efficient?

 mylist is filled only once at the start and never changes.

 Vadim

 On Wed, Apr 22, 2015 at 1:42 PM, Tathagata Das t...@databricks.com
 wrote:

 Is the mylist present on every executor? If not, then you have to
 pass it on. And broadcasts are the best way to pass them on. But note 
 that
 once broadcast, it will be immutable at the executors, and if you update
 the
 list at the driver, you will have to broadcast it again.

 TD

 On Wed, Apr 22, 2015 at 9:28 AM, Vadim Bichutskiy 
 vadim.bichuts...@gmail.com wrote:

 I am using Spark Streaming with Python. For each RDD, I call a
 map, i.e., myrdd.map(myfunc), myfunc is in a separate Python module. 
 In yet
 another separate Python module I have a global list, i.e. mylist,
 that's populated with metadata. I can't get myfunc to see 
 mylist...it's
 always empty. Alternatively, I guess I could pass mylist to map.

 Any suggestions?

 Thanks,
 Vadim












Re: Map Question

2015-04-23 Thread Vadim Bichutskiy
Thanks Ilya. I am having trouble doing that. Can you give me an example?

On Thu, Apr 23, 2015 at 12:06 PM, Ganelin, Ilya ilya.gane...@capitalone.com
 wrote:

  You need to expose that variable the same way you'd expose any other
 variable in Python that you wanted to see across modules. As long as you
 share a spark context all will work as expected.


 http://stackoverflow.com/questions/142545/python-how-to-make-a-cross-module-variable



 Sent with Good (www.good.com)



 -Original Message-
 *From: *Vadim Bichutskiy [vadim.bichuts...@gmail.com]
 *Sent: *Thursday, April 23, 2015 12:00 PM Eastern Standard Time
 *To: *Tathagata Das
 *Cc: *user@spark.apache.org
 *Subject: *Re: Map Question

 Here it is. How do I access a broadcastVar in a function that's in another
 module (process_stuff.py below):

 Thanks,
 Vadim

  main.py
 ---

 from pyspark import SparkContext, SparkConf
 from pyspark.streaming import StreamingContext
 from pyspark.sql import SQLContext
 from process_stuff import myfunc
 from metadata import get_metadata

 conf = SparkConf().setAppName('My App').setMaster('local[4]')
 sc = SparkContext(conf=conf)
 ssc = StreamingContext(sc, 30)
 sqlContext = SQLContext(sc)

 distFile = ssc.textFileStream("s3n://...")

 distFile.foreachRDD(process)

 mylist = get_metadata()

 print 'BROADCASTING...'
 broadcastVar = sc.broadcast(mylist)
 print broadcastVar
 print broadcastVar.value
 print 'FINISHED BROADCASTING...'

 ## mylist and broadcastVar, broadcastVar.value print fine

 def getSqlContextInstance(sparkContext):

     if ('sqlContextSingletonInstance' not in globals()):
         globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
     return globals()['sqlContextSingletonInstance']

 def process(rdd):

     sqlContext = getSqlContextInstance(rdd.context)

     if rdd.take(1):

         jsondf = sqlContext.jsonRDD(rdd)

         #jsondf.printSchema()

         jsondf.registerTempTable('mytable')

         stuff = sqlContext.sql("SELECT ...")
         stuff_mapped = stuff.map(myfunc)  ## I want myfunc to see mylist from above?

     ...

 process_stuff.py
 --

 def myfunc(x):

     metadata = broadcastVar.value  # NameError: broadcastVar not found -- HOW TO FIX?

 ...


 metadata.py
 

 def get_metadata():

 ...

 return mylist

 On Wed, Apr 22, 2015 at 6:47 PM, Tathagata Das t...@databricks.com
 wrote:

 Can you give full code? especially the myfunc?

 On Wed, Apr 22, 2015 at 2:20 PM, Vadim Bichutskiy 
 vadim.bichuts...@gmail.com wrote:

 Here's what I did:

  print 'BROADCASTING...'
 broadcastVar = sc.broadcast(mylist)
 print broadcastVar
 print broadcastVar.value
 print 'FINISHED BROADCASTING...'

 The above works fine,

 but when I call myrdd.map(myfunc) I get *NameError: global name
 'broadcastVar' is not defined*

  The myfunc function is in a different module. How do I make it aware
 of broadcastVar?

 On Wed, Apr 22, 2015 at 2:13 PM, Vadim Bichutskiy 
 vadim.bichuts...@gmail.com wrote:

 Great. Will try to modify the code. Always room to optimize!

  On Wed, Apr 22, 2015 at 2:11 PM, Tathagata Das t...@databricks.com
 wrote:

 Absolutely. The same code would work for local as well as distributed
 mode!

 On Wed, Apr 22, 2015 at 11:08 AM, Vadim Bichutskiy 
 vadim.bichuts...@gmail.com wrote:

 Can I use broadcast vars in local mode?

 On Wed, Apr 22, 2015 at 2:06 PM, Tathagata Das t...@databricks.com
 wrote:

 Yep. Not efficient. Pretty bad actually. That's why broadcast
 variables were introduced right at the very beginning of Spark.



 On Wed, Apr 22, 2015 at 10:58 AM, Vadim Bichutskiy 
 vadim.bichuts...@gmail.com wrote:

 Thanks TD. I was looking into broadcast variables.

 Right now I am running it locally...and I plan to move it to
 production on EC2.

 The way I fixed it is by doing myrdd.map(lambda x: (x,
 mylist)).map(myfunc) but I don't think it's efficient?

 mylist is filled only once at the start and never changes.

 Vadim

 On Wed, Apr 22, 2015 at 1:42 PM, Tathagata Das t...@databricks.com
  wrote:

  Is the mylist present on every executor? If not, then you have
 to pass it on. And broadcasts are the best way to pass them on. But 
 note
 that once broadcast, it will be immutable at the executors, and if you
 update
 the list at the driver, you will have to broadcast it again.

 TD

 On Wed, Apr 22, 2015 at 9:28 AM, Vadim Bichutskiy 
 vadim.bichuts...@gmail.com wrote:

 I am using Spark Streaming with Python. For each RDD, I call a
 map, i.e., myrdd.map(myfunc), myfunc is in a separate Python module. 
 In yet
 another separate Python module I have a global list, i.e. mylist,
 that's populated with metadata. I can't get myfunc to see 
 mylist...it's
 always empty. Alternatively, I guess I could pass mylist to map.

 Any suggestions?

 Thanks,
 Vadim












Re: Tasks run only on one machine

2015-04-23 Thread Jeetendra Gangele
Will you be able to paste code here?

On 23 April 2015 at 22:21, Pat Ferrel p...@occamsmachete.com wrote:

 Using Spark streaming to create a large volume of small nano-batch input
 files, ~4k per file, thousands of 'part-x' files.  When reading the
 nano-batch files and doing a distributed calculation my tasks run only on
 the machine where it was launched. I'm launching in yarn-client mode. The
 rdd is created using sc.textFile("list of thousand files")

 What would cause the read to occur only on the machine that launched the
 driver?

 Do I need to do something to the RDD after reading? Has some partition
 factor been applied to all derived rdds?
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: dynamicAllocation spark-shell

2015-04-23 Thread Cheolsoo Park
Hi,

 Attempted to request a negative number of executor(s) -663 from the
cluster manager. Please specify a positive number!

This is a bug in dynamic allocation. Here is the JIRA:
https://issues.apache.org/jira/browse/SPARK-6954

Thanks!
Cheolsoo

On Thu, Apr 23, 2015 at 7:57 AM, Michael Stone mst...@mathom.us wrote:

 If I enable dynamicAllocation and then use spark-shell or pyspark, things
 start out working as expected: running simple commands causes new executors
 to start and complete tasks. If the shell is left idle for a while,
 executors start getting killed off:

 15/04/23 10:52:43 INFO cluster.YarnClientSchedulerBackend: Requesting to
 kill executor(s) 368
 15/04/23 10:52:43 INFO spark.ExecutorAllocationManager: Removing executor
 368 because it has been idle for 600 seconds (new desired total will be 665)

 That makes sense. But the action also results in error messages:

 15/04/23 10:52:47 ERROR cluster.YarnScheduler: Lost executor 368 on
 hostname: remote Akka client disassociated
 15/04/23 10:52:47 INFO scheduler.DAGScheduler: Executor lost: 368 (epoch 0)
 15/04/23 10:52:47 INFO spark.ExecutorAllocationManager: Existing executor
 368 has been removed (new total is 665)
 15/04/23 10:52:47 INFO storage.BlockManagerMasterActor: Trying to remove
 executor 368 from BlockManagerMaster.
 15/04/23 10:52:47 INFO storage.BlockManagerMasterActor: Removing block
 manager BlockManagerId(368, hostname, 35877)
 15/04/23 10:52:47 INFO storage.BlockManagerMaster: Removed 368
 successfully in removeExecutor

 After that, trying to run a simple command results in:

 15/04/23 10:13:30 ERROR util.Utils: Uncaught exception in thread
 spark-dynamic-executor-allocation-0
 java.lang.IllegalArgumentException: Attempted to request a negative number
 of executor(s) -663 from the cluster manager. Please specify a positive
 number!

 And then only the single remaining executor attempts to complete the new
 tasks. Am I missing some kind of simple configuration item, are other
 people seeing the same behavior as a bug, or is this actually expected?

 Mike Stone

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Shuffle files not cleaned up (Spark 1.2.1)

2015-04-23 Thread Tathagata Das
What was the state of your streaming application? Was it falling behind
with a large increasing scheduling delay?

TD

On Thu, Apr 23, 2015 at 11:31 AM, N B nb.nos...@gmail.com wrote:

 Thanks for the response, Conor. I tried with those settings and for a
 while it seemed like it was cleaning up shuffle files after itself.
 However, exactly 5 hours later it started throwing exceptions and
 eventually stopped working again. A sample stack trace is below. What is
 curious about 5 hours is that I set the cleaner ttl to 5 hours after
 changing the max window size to 1 hour (down from 6 hours in order to
 test). It also stopped cleaning the shuffle files after this started
 happening.

 Any idea why this could be happening?

 2015-04-22 17:39:52,040 ERROR Executor task launch worker-989
 Executor.logError - Exception in task 0.0 in stage 215425.0 (TID 425147)
 java.lang.Exception: Could not compute split, block input-0-1429706099000
 not found
 at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
 at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
 at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:198)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)

 Thanks
 NB


 On Tue, Apr 21, 2015 at 5:14 AM, Conor Fennell 
 conor.fenn...@altocloud.com wrote:

 Hi,


 We set the spark.cleaner.ttl to some reasonable time and also
 set spark.streaming.unpersist=true.


 Those together cleaned up the shuffle files for us.


 -Conor
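
 A sketch of those two settings as they would appear in spark-defaults.conf
 (the ttl value is illustrative; it should exceed your largest window):

     spark.cleaner.ttl           21600
     spark.streaming.unpersist   true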

 On Tue, Apr 21, 2015 at 8:18 AM, N B nb.nos...@gmail.com wrote:

 We already do have a cron job in place to clean just the shuffle files.
 However, what I would really like to know is whether there is a proper
 way of telling spark to clean up these files once its done with them?

 Thanks
 NB


 On Mon, Apr 20, 2015 at 10:47 AM, Jeetendra Gangele 
 gangele...@gmail.com wrote:

 Write a cron job for this, like the one below:

 12 * * * *  find $SPARK_HOME/work -cmin +1440 -prune -exec rm -rf {} \+
 32 * * * *  find /tmp -type d -cmin +1440 -name spark-*-*-* -prune
 -exec rm -rf {} \+
 52 * * * *  find $SPARK_LOCAL_DIR -mindepth 1 -maxdepth 1 -type d -cmin
 +1440 -name spark-*-*-* -prune -exec rm -rf {} \+


 On 20 April 2015 at 23:12, N B nb.nos...@gmail.com wrote:

 Hi all,

 I had posed this query as part of a different thread but did not get a
 response there. So creating a new thread hoping to catch someone's
 attention.

 We are experiencing this issue of shuffle files being left behind and
 not being cleaned up by Spark. Since this is a Spark streaming 
 application,
 it is expected to stay up indefinitely, so shuffle files not being cleaned
 up is a big problem right now. Our max window size is 6 hours, so we have
 set up a cron job to clean up shuffle files older than 12 hours otherwise
 it will eat up all our disk space.

 Please see the following. It seems the non-cleaning of shuffle files
 is being documented in 1.3.1.

 https://github.com/apache/spark/pull/5074/files
 https://issues.apache.org/jira/browse/SPARK-5836


 Also, for some reason, the following JIRAs that were reported as
 functional issues were closed as Duplicates of the above Documentation 
 bug.
 Does this mean that this issue won't be tackled at all?

 https://issues.apache.org/jira/browse/SPARK-3563
 https://issues.apache.org/jira/browse/SPARK-4796
 https://issues.apache.org/jira/browse/SPARK-6011

 Any further insight into whether this is being looked into and
 meanwhile how to handle shuffle files will be greatly appreciated.

 Thanks
 NB











Non-Deterministic Graph Building

2015-04-23 Thread hokiegeek2
Hi Everyone,

I am running into a really weird problem that, to the best of my knowledge, only
one other person has reported (and that thread never yielded a
resolution).  I build a GraphX Graph from an input EdgeRDD and VertexRDD via
the Graph(VertexRDD,EdgeRDD) constructor. When I execute Graph.triplets on
the Graph I get wildly varying results where the triplet source and
destination vertex data are inconsistent between runs and rarely, if ever,
match what I would expect from the input edge pairs that are used to
generate the VertexRDD and EdgeRDDs.

Here's what I know for sure:

1. Consistency of Input Edge Data--I read the edges in from HBase and
generate a raw edge RDD containing tuples consisting of a source edge name
and destination edge name. I've written this RDD out to HDFS over several
runs and confirmed that generation of the raw edge RDD is deterministic.

2. Consistency of Edge and Vertex Count--the overall numbers of edges and
vertices in the EdgeRDD and VertexRDD, respectively, are consistent between
jobs.

3. Inconsistency of Triplet Data--the output from Graph.triplets varies
between jobs, where the edge pairings are different.

4. Disconnect Between Input Edge Data and Triplets--the input edge data
often does not match the corresponding triplet data for the same job, but in
some cases will.  Interestingly, while the actual edge pairings as seen in
the input edge data RDD and the triplets often don't match, the total number
of edges in the input edge RDD and triplets RDD for each edge name is the
same.

Based upon what I've seen, it seems as if the vertex ids are skewed somehow,
especially given point (4) where I noted that the total number of
appearances of an edge name is consistent between input edge RDD data and
triplet RDD data for the same job but, again, the pairings with edges on the
other end of the relationship can vary.

I will post my code later tonight/tomorrow AM, but wanted to see if this
problem description matches what anyone else has seen.

Thanks

--John



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Non-Deterministic-Graph-Building-tp22638.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Question regarding join with multiple columns with pyspark

2015-04-23 Thread Ali Bajwa
Hi experts,

Sorry if this is a n00b question or has already been answered...

Am trying to use the data frames API in python to join 2 dataframes
with more than 1 column. The example I've seen in the documentation
only shows a single column - so I tried this:

Example code

import pandas as pd
from pyspark.sql import SQLContext
hc = SQLContext(sc)
A = pd.DataFrame({'year': ['1993', '2005', '1994'], 'month': ['5',
'12', '12'], 'value': [100, 200, 300]})
a = hc.createDataFrame(A)
B = pd.DataFrame({'year': ['1993', '1993'], 'month': ['12', '12'],
'value': [101, 102]})
b = hc.createDataFrame(B)

print "Pandas"  # try with Pandas
print A
print B
print pd.merge(A, B, on=['year', 'month'], how='inner')

print "Spark"
print a.toPandas()
print b.toPandas()
print a.join(b, a.year==b.year and a.month==b.month, 'inner').toPandas()


Output

Pandas
  month  value  year
0     5    100  1993
1    12    200  2005
2    12    300  1994

  month  value  year
0    12    101  1993
1    12    102  1993

Empty DataFrame

Columns: [month, value_x, year, value_y]

Index: []

Spark
  month  value  year
0     5    100  1993
1    12    200  2005
2    12    300  1994

  month  value  year
0    12    101  1993
1    12    102  1993

  month  value  year month  value  year
0    12    200  2005    12    102  1993
1    12    200  2005    12    101  1993
2    12    300  1994    12    102  1993
3    12    300  1994    12    101  1993

It looks like Spark returns some results where an inner join should
return nothing.

Am I doing the join with two columns in the wrong way? If yes, what is
the right syntax for this?

Thanks!
Ali
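
The likely culprit is Python's `and`: it cannot be overloaded, so
`a.year == b.year and a.month == b.month` evaluates to just one of the two
Column expressions via truthiness, and the join effectively matches on a
single column. The overloadable operator is the bitwise `&`. A sketch of the
corrected call:

    # parenthesize each predicate: & binds tighter than ==
    joined = a.join(b, (a.year == b.year) & (a.month == b.month), 'inner')
    print joined.toPandas()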

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Bug? Can't reference to the column by name after join two DataFrame on a same name key

2015-04-23 Thread Shuai Zheng
Got it. Thanks! :)

From: Yin Huai [mailto:yh...@databricks.com] 
Sent: Thursday, April 23, 2015 2:35 PM
To: Shuai Zheng
Cc: user
Subject: Re: Bug? Can't reference to the column by name after join two 
DataFrame on a same name key

 

Hi Shuai,

You can use as() to create a table alias. For example, df1.as("df1"). Then you
can use $"df1.col" to refer to it.

Thanks,

Yin

On Thu, Apr 23, 2015 at 11:14 AM, Shuai Zheng szheng.c...@gmail.com wrote:

Hi All,

I use 1.3.1.

When I have two DFs and join them on a same-name key, after that, I can't get
the common key by name.

Basically:

select * from t1 inner join t2 on t1.col1 = t2.col1

And I am using purely DataFrame, spark SqlContext not HiveContext.

DataFrame df3 = df1.join(df2, df1.col("col").equalTo(df2.col("col"))).select("col");

because df1 and df2 join on the same key col,

Then I can't reference the key col. I understand I should use a fully qualified
name for that column (like in SQL, use t1.col), but I don't know how I should
address this in spark sql.

Exception in thread "main" org.apache.spark.sql.AnalysisException: Reference
'id' is ambiguous, could be: id#8L, id#0L.;

It looks like the joined key can't be referenced by name or by the df1.col(name)
pattern.

https://issues.apache.org/jira/browse/SPARK-5278 refers to a Hive case, so I
am not sure whether it is the same issue, but I still have the issue in the
latest code.

It looks like the result after the join won't keep the parent DF information
anywhere?

I checked the ticket: https://issues.apache.org/jira/browse/SPARK-6273

But I am not sure whether it is the same issue. Should I open a new ticket for
this?

Regards,

Shuai



Re: why does groupByKey return RDD[(K, Iterable[V])] not RDD[(K, CompactBuffer[V])] ?

2015-04-23 Thread Hao Ren
Should I repost this to the dev list?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/why-does-groupByKey-return-RDD-K-Iterable-V-not-RDD-K-CompactBuffer-V-tp22616p22640.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Getting error running MLlib example with new cluster

2015-04-23 Thread Su She
I had asked this question before, but wanted to ask again as I think
it is related to my pom file or project setup.

I have been trying on/off for the past month to try to run this MLlib example:

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Pyspark where do third parties libraries need to be installed under Yarn-client mode

2015-04-23 Thread dusts66
I am trying to figure out Python library management. So my question is:
where do third-party Python libraries (e.g. numpy, scipy, etc.) need to exist
if I am running a spark job via 'spark-submit' against my cluster in 'yarn
client' mode? Do the libraries need to exist only on the client (i.e. the
server executing the driver code), or do the libraries need to exist on the
datanode/worker nodes where the tasks are executed? The documentation seems
to indicate that under 'yarn client' the libraries are only needed on the
client machine, not the entire cluster. If the libraries are needed across
all cluster machines, any suggestions on a deployment strategy or dependency
management model that works well?

Thanks
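
One commonly used split, offered as a sketch rather than a definitive answer:
pure-Python dependencies can be zipped and shipped with the job via
--py-files, while C-extension libraries such as numpy and scipy generally
have to be pre-installed on every node that runs executors. The paths and
names below are hypothetical:

    # deps.zip contains pure-Python modules only
    spark-submit --master yarn-client \
      --py-files deps.zip \
      my_job.py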



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Pyspark-where-do-third-parties-libraries-need-to-be-installed-under-Yarn-client-mode-tp22639.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Getting error running MLlib example with new cluster

2015-04-23 Thread Su She
Sorry, accidentally sent the last email before finishing.

I had asked this question before, but wanted to ask again as I think
it is now related to my pom file or project setup. Really appreciate the help!

I have been trying on/off for the past month to try to run this MLlib
example: 
https://github.com/databricks/learning-spark/blob/master/src/main/scala/com/oreilly/learningsparkexamples/scala/MLlib.scala

I am able to build the project successfully. When I run it, it returns:

features in spam: 8
features in ham: 7

and then freezes. According to the UI, the description of the job is
count at DataValidators.scala:38. This corresponds to this line in
the code:

val model = lrLearner.run(trainingData)

I've tried just about everything I can think of: changed numFeatures
from 1 to 10,000, set executor memory to 1g, set up a new cluster; at
this point I think I might have missed dependencies as that has
usually been the problem in other spark apps I have tried to run. This
is my pom file, that I have used for other successful spark apps.
Please let me know if you think I need any additional dependencies or
there are incompatibility issues, or a pom.xml that is better to use.
Thank you!

Cluster information:

Spark version: 1.2.0-SNAPSHOT (in my older cluster it is 1.2.0)
java version 1.7.0_25
Scala version: 2.10.4
hadoop version: hadoop 2.5.0-cdh5.3.3 (older cluster was 5.3.0)



<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
    http://maven.apache.org/maven-v4_0_0.xsd">
    <groupId>edu.berkely</groupId>
    <artifactId>simple-project</artifactId>
    <modelVersion>4.0.0</modelVersion>
    <name>Simple Project</name>
    <packaging>jar</packaging>
    <version>1.0</version>

    <repositories>
        <repository>
            <id>cloudera</id>
            <url>http://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>

        <repository>
            <id>scala-tools.org</id>
            <name>Scala-tools Maven2 Repository</name>
            <url>http://scala-tools.org/repo-releases</url>
        </repository>
    </repositories>

    <pluginRepositories>
        <pluginRepository>
            <id>scala-tools.org</id>
            <name>Scala-tools Maven2 Repository</name>
            <url>http://scala-tools.org/repo-releases</url>
        </pluginRepository>
    </pluginRepositories>

    <build>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <executions>
                    <execution>
                        <id>compile</id>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                        <phase>compile</phase>
                    </execution>
                    <execution>
                        <id>test-compile</id>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                        <phase>test-compile</phase>
                    </execution>
                    <execution>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency> <!-- Spark dependency -->
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.2.0-cdh5.3.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.5.0-mr1-cdh5.3.0</version>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.10.4</version>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>2.10.4</version>
        </dependency>

        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.3</version>
        </dependency>

        <dependency>
            <groupId>com.yammer.metrics</groupId>
            <artifactId>metrics-core</artifactId>
            <version>2.2.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
gridsearch - python

2015-04-23 Thread Pagliari, Roberto
Can anybody point me to an example, if available, of grid search with Python?

Thank you,



Re: problem writing to s3

2015-04-23 Thread Daniel Mahler
Hi Akhil

I can confirm that the problem goes away when jsonRaw and jsonClean are in
different s3 buckets.

thanks
Daniel

On Thu, Apr 23, 2015 at 1:27 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Can you try writing to a different S3 bucket and confirm that?

 Thanks
 Best Regards

 On Thu, Apr 23, 2015 at 12:11 AM, Daniel Mahler dmah...@gmail.com wrote:

 Hi Akhil,

 It works fine when outprefix is a hdfs:///localhost/... url.

 It looks to me as if there is something about spark writing to the same
 s3 bucket it is reading from.

 That is the only real difference between the two saveAsTextFile calls when
 outprefix is on s3: inpath is also on s3 but in a different bucket, while
 jsonRaw and jsonClean are distinct directories in the same bucket.
 I do not know why that should be a problem though.

 I will rerun using s3 paths and send the log information.

 thanks
 Daniel


 On Wed, Apr 22, 2015 at 1:45 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Can you look in your worker logs and see whats happening in there? Are
 you able to write the same to your HDFS?

 Thanks
 Best Regards

 On Wed, Apr 22, 2015 at 4:45 AM, Daniel Mahler dmah...@gmail.com
 wrote:

 I am having a strange problem writing to s3 that I have distilled to
 this minimal example:

 def jsonRaw = s"${outprefix}-json-raw"
 def jsonClean = s"${outprefix}-json-clean"

 val txt = sc.textFile(inpath) //.coalesce(shards, false)
 txt.count

 val res = txt.saveAsTextFile(jsonRaw)

 val txt2 = sc.textFile(jsonRaw + "/part-*")
 txt2.count

 txt2.saveAsTextFile(jsonClean)

 This code should simply copy files from inpath to jsonRaw and then from
 jsonRaw to jsonClean.
 This code executes all the way down to the last line, where it hangs
 after creating the output directory containing a _temporary_$folder marker but
 no actual files, not even temporary ones.

 `outprefix` is an s3 bucket url; both jsonRaw and jsonClean are in
 the same bucket.
 Both .count calls succeed and return the same number. This means Spark
 can read from inpath and can both read from and write to jsonRaw. Since
 jsonClean is in the same bucket as jsonRaw and the final line does create
 the directory, I cannot think of any reason why the files should not be
 written. If there were any access or url problems they should already
 manifest when writing jsonRaw.

 This problem is completely reproducible with Spark 1.2.1 and 1.3.1.
 The console output from the last line is:

 scala> txt0.saveAsTextFile(jsonClean)
 15/04/21 22:55:48 INFO storage.BlockManager: Removing broadcast 3
 15/04/21 22:55:48 INFO storage.BlockManager: Removing block
 broadcast_3_piece0
 15/04/21 22:55:48 INFO storage.MemoryStore: Block broadcast_3_piece0 of
 size 2024 dropped from memory (free 278251716)
 15/04/21 22:55:48 INFO storage.BlockManagerInfo: Removed
 broadcast_3_piece0 on ip-10-51-181-81.ec2.internal:45199 in memory (size:
 2024.0 B, free: 265.4 MB)
 15/04/21 22:55:48 INFO storage.BlockManagerMaster: Updated info of
 block broadcast_3_piece0
 15/04/21 22:55:48 INFO storage.BlockManager: Removing block broadcast_3
 15/04/21 22:55:48 INFO storage.MemoryStore: Block broadcast_3 of size
 2728 dropped from memory (free 27825)
 15/04/21 22:55:48 INFO storage.BlockManagerInfo: Removed
 broadcast_3_piece0 on ip-10-166-129-153.ec2.internal:46671 in memory (size:
 2024.0 B, free: 13.8 GB)
 15/04/21 22:55:48 INFO storage.BlockManagerInfo: Removed
 broadcast_3_piece0 on ip-10-51-153-34.ec2.internal:51691 in memory (size:
 2024.0 B, free: 13.8 GB)
 15/04/21 22:55:48 INFO storage.BlockManagerInfo: Removed
 broadcast_3_piece0 on ip-10-158-142-155.ec2.internal:54690 in memory (size:
 2024.0 B, free: 13.8 GB)
 15/04/21 22:55:48 INFO storage.BlockManagerInfo: Removed
 broadcast_3_piece0 on ip-10-61-144-7.ec2.internal:44849 in memory (size:
 2024.0 B, free: 13.8 GB)
 15/04/21 22:55:48 INFO storage.BlockManagerInfo: Removed
 broadcast_3_piece0 on ip-10-69-77-180.ec2.internal:42417 in memory (size:
 2024.0 B, free: 13.8 GB)
 15/04/21 22:55:48 INFO spark.ContextCleaner: Cleaned broadcast 3
 15/04/21 22:55:49 INFO spark.SparkContext: Starting job: saveAsTextFile
 at console:38
 15/04/21 22:55:49 INFO scheduler.DAGScheduler: Got job 2
 (saveAsTextFile at console:38) with 96 output partitions
 (allowLocal=false)
 15/04/21 22:55:49 INFO scheduler.DAGScheduler: Final stage: Stage
 2(saveAsTextFile at console:38)
 15/04/21 22:55:49 INFO scheduler.DAGScheduler: Parents of final stage:
 List()
 15/04/21 22:55:49 INFO scheduler.DAGScheduler: Missing parents: List()
 15/04/21 22:55:49 INFO scheduler.DAGScheduler: Submitting Stage 2
 (MapPartitionsRDD[5] at saveAsTextFile at console:38), which has no
 missing parents
 15/04/21 22:55:49 INFO storage.MemoryStore: ensureFreeSpace(22248)
 called with curMem=48112, maxMem=278302556
 15/04/21 22:55:49 INFO storage.MemoryStore: Block broadcast_4 stored as
 values in memory (estimated size 21.7 KB, free 265.3 MB)
 15/04/21 22:55:49 INFO storage.MemoryStore: ensureFreeSpace(17352)

Re: Tasks run only on one machine

2015-04-23 Thread Pat Ferrel
They are in HDFS so available on all workers
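
If the data really is reachable from every worker, one way to rule out
partitioning as the cause is to force a shuffle right after the read; a
hedged pyspark sketch (the path and partition count are placeholders):

    files = sc.textFile("hdfs:///nano-batches/part-*")
    spread = files.repartition(64)  # shuffle to spread partitions across executors
    spread.map(lambda line: line.split(",")).count()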

On Apr 23, 2015, at 10:29 AM, Pat Ferrel p...@occamsmachete.com wrote:

Physically? Not sure, they were written using the nano-batch rdds in a 
streaming job that is in a separate driver. The job is a Kafka consumer. 

Would that affect all derived rdds? If so, is there something I can do to mix it
up or does Spark know best about execution speed here?


On Apr 23, 2015, at 10:23 AM, Sean Owen so...@cloudera.com wrote:

Where are the file splits? meaning is it possible they were also
(only) available on one node and that was also your driver?

On Thu, Apr 23, 2015 at 1:21 PM, Pat Ferrel p...@occamsmachete.com wrote:
 Sure

   var columns = mc.textFile(source).map { line => line.split(delimiter) }

 Here “source” is a comma delimited list of files or directories. Both the
 textFile and .map tasks happen only on the machine they were launched from.

 Later other distributed operations happen, but I suspect if I can figure out
 why the first line is run only on the client machine the rest will clear up
 too. Here are some subsequent lines.

   if (filterColumn != -1) {
     columns = columns.filter { tokens => tokens(filterColumn) == filterBy }
   }

   val interactions = columns.map { tokens =>
     tokens(rowIDColumn) -> tokens(columnIDPosition)
   }

   interactions.cache()
 
 On Apr 23, 2015, at 10:14 AM, Jeetendra Gangele gangele...@gmail.com
 wrote:
 
 Will you be able to paste code here?
 
 On 23 April 2015 at 22:21, Pat Ferrel p...@occamsmachete.com wrote:
 
 Using Spark streaming to create a large volume of small nano-batch input
 files, ~4k per file, thousands of ‘part-x’ files.  When reading the
 nano-batch files and doing a distributed calculation my tasks run only on
 the machine where it was launched. I’m launching in “yarn-client” mode. The
 rdd is created using sc.textFile(“list of thousand files”)
 
 What would cause the read to occur only on the machine that launched the
 driver?
 
 Do I need to do something to the RDD after reading? Has some partition
 factor been applied to all derived rdds?
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 
 
 
 
 



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Bug? Can't reference to the column by name after join two DataFrame on a same name key

2015-04-23 Thread Shuai Zheng
Hi All,

 

I use 1.3.1

 

When I have two DF and join them on a same name key, after that, I can't get
the common key by name.

 

Basically:

select * from t1 inner join t2 on t1.col1 = t2.col1

 

And I am using purely DataFrame, spark SqlContext not HiveContext

 

DataFrame df3 = df1.join(df2,
df1.col(col).equalTo(df2.col(col))).select(col);

 

because df1 and df2 join on the same key col,

 

Then I can't reference the key col. I understand I should use a full
qualified name for that column (like in SQL, use t1.col), but I don't know
how should I address this in spark sql.

 

Exception in thread main org.apache.spark.sql.AnalysisException: Reference
'id' is ambiguous, could be: id#8L, id#0L.;

 

It looks that joined key can't be referenced by name or by df1.col name
pattern.

The https://issues.apache.org/jira/browse/SPARK-5278 refer to a hive case,
so I am not sure whether it is the same issue, but I still have the issue in
latest code.

 

It looks like the result after join won't keep the parent DF information
anywhere?

 

I check the ticket: https://issues.apache.org/jira/browse/SPARK-6273

 

But not sure whether  it is the same issue? Should I open a new ticket for
this?

 

Regards,

 

Shuai

 



Is the Spark-1.3.1 support build with scala 2.8 ?

2015-04-23 Thread guoqing0...@yahoo.com.hk
Does Spark 1.3.1 support being built with Scala 2.8? And if built with Scala
2.10, can it integrate with kafka_2.8.0-0.8.0?

Thanks.


Re: why does groupByKey return RDD[(K, Iterable[V])] not RDD[(K, CompactBuffer[V])] ?

2015-04-23 Thread Corey Nolet
If you return an Iterable, you are not tying the API to a CompactBuffer.
Someday, the data could be fetched lazily and the API would not have to
change.
On Apr 23, 2015 6:59 PM, Dean Wampler deanwamp...@gmail.com wrote:

 I wasn't involved in this decision (I just make the fries), but
 CompactBuffer is designed for relatively small data sets that at least fit
 in memory. It's more or less an Array. In principle, returning an iterator
 could hide the actual data structure that might be needed to hold a much
 bigger data set, if necessary.

 HOWEVER, it actually returns a CompactBuffer.


 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L444


 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Thu, Apr 23, 2015 at 5:46 PM, Hao Ren inv...@gmail.com wrote:

 Should I repost this to the dev list?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/why-does-groupByKey-return-RDD-K-Iterable-V-not-RDD-K-CompactBuffer-V-tp22616p22640.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Understanding Spark/MLlib failures

2015-04-23 Thread Burak Yavuz
Hi Andrew,

I observed similar behavior under high GC pressure, when running ALS. What
happened to me was that, there would be very long Full GC pauses (over 600
seconds at times). These would prevent the executors from sending
heartbeats to the driver. Then the driver would think that the executor
died, so it would kill it. The scheduler would look at the outputs and say:
`org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
location for shuffle 1` or `Fetch Failed`, then reschedule the job at a
different executor.

Then these executors would get even more overloaded, causing them to GC
more often, and new jobs would be launched with even smaller tasks. Because
these executors were being killed by the driver, new jobs with the same
name (and less tasks) would be launched. However, it usually led to a
spiral of death, where executors were constantly being killed, and the
stage wasn't being completed, but restarted with different numbers of tasks.

Some configuration parameters that helped me through this process were:

spark.executor.memory  // decrease the executor memory so that Full GC's
take less time, however are more frequent
spark.executor.heartbeatInterval // This I set at 60 for 600 seconds
(10 minute GC!!)
spark.core.connection.ack.wait.timeout // another timeout to set

Hope these parameters help you. I haven't directly answered your questions,
but there are bits and pieces in there that are hopefully helpful.

Best,
Burak
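
For reference, a sketch of passing these settings on the command line; the
values are illustrative only (spark.executor.heartbeatInterval is in
milliseconds in 1.x):

    spark-submit \
      --conf spark.executor.memory=6g \
      --conf spark.executor.heartbeatInterval=600000 \
      --conf spark.core.connection.ack.wait.timeout=600 \
      my_job.py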


On Thu, Apr 23, 2015 at 4:11 PM, aleverentz andylevere...@fico.com wrote:

 [My apologies if this is a re-post.  I wasn't subscribed the first time I
 sent this message, and I'm hoping this second message will get through.]

 I’ve been using Spark 1.3.0 and MLlib for some machine learning tasks.  In
 a
 fit of blind optimism, I decided to try running MLlib’s Principal
 Components
 Analysis (PCA) on a dataset with approximately 10,000 columns and 200,000
 rows.

 The Spark job has been running for about 5 hours on a small cluster, and it
 has been stuck on a particular job (treeAggregate at RowMatrix.scala:119)
 for most of that time.  The treeAggregate job is now on retry 5, and
 after
 each failure it seems that the next retry uses a smaller number of tasks.
 (Initially, there were around 80 tasks; later it was down to 50, then 42;
 now it’s down to 16.)  The web UI shows the following error under failed
 stages:  org.apache.spark.shuffle.MetadataFetchFailedException: Missing
 an
 output location for shuffle 1.

 This raises a few questions:

 1. What does missing an output location for shuffle 1 mean?  I’m guessing
 this cryptic error message is indicative of some more fundamental problem
 (out of memory? out of disk space?), but I’m not sure how to diagnose it.

 2. Why do subsequent retries use fewer and fewer tasks?  Does this mean
 that
 the algorithm is actually making progress?  Or is the scheduler just
 performing some kind of repartitioning and starting over from scratch?
 (Also, If the algorithm is in fact making progress, should I expect it to
 finish eventually?  Or do repeated failures generally indicate that the
 cluster is too small to perform the given task?)

 3. Is it reasonable to expect that I could get PCA to run on this dataset
 using the same cluster simply by changing some configuration parameters?
 Or
 is a larger cluster with significantly more resources per node the only way
 around this problem?

 4. In general, are there any tips for diagnosing performance issues like
 the
 one above?  I've spent some time trying to get a few different algorithms
 to
 scale to larger and larger datasets, and whenever I run into a failure, I'd
 like to be able to identify the bottleneck that is preventing further
 scaling.  Any general advice for doing that kind of detective work would be
 much appreciated.

 Thanks,

 ~ Andrew






 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Understanding-Spark-MLlib-failures-tp22641.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: why does groupByKey return RDD[(K, Iterable[V])] not RDD[(K, CompactBuffer[V])] ?

2015-04-23 Thread Koert Kuipers
because CompactBuffer is considered an implementation detail. It is also
not public for the same reason.

On Thu, Apr 23, 2015 at 6:46 PM, Hao Ren inv...@gmail.com wrote:

 Should I repost this to the dev list?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/why-does-groupByKey-return-RDD-K-Iterable-V-not-RDD-K-CompactBuffer-V-tp22616p22640.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Understanding Spark/MLlib failures

2015-04-23 Thread aleverentz
[My apologies if this is a re-post.  I wasn't subscribed the first time I
sent this message, and I'm hoping this second message will get through.]

I’ve been using Spark 1.3.0 and MLlib for some machine learning tasks.  In a
fit of blind optimism, I decided to try running MLlib’s Principal Components
Analysis (PCA) on a dataset with approximately 10,000 columns and 200,000
rows.

The Spark job has been running for about 5 hours on a small cluster, and it
has been stuck on a particular job (treeAggregate at RowMatrix.scala:119)
for most of that time.  The treeAggregate job is now on retry 5, and after
each failure it seems that the next retry uses a smaller number of tasks. 
(Initially, there were around 80 tasks; later it was down to 50, then 42;
now it’s down to 16.)  The web UI shows the following error under failed
stages:  org.apache.spark.shuffle.MetadataFetchFailedException: Missing an
output location for shuffle 1.

This raises a few questions:

1. What does missing an output location for shuffle 1 mean?  I’m guessing
this cryptic error message is indicative of some more fundamental problem
(out of memory? out of disk space?), but I’m not sure how to diagnose it.

2. Why do subsequent retries use fewer and fewer tasks?  Does this mean that
the algorithm is actually making progress?  Or is the scheduler just
performing some kind of repartitioning and starting over from scratch? 
(Also, If the algorithm is in fact making progress, should I expect it to
finish eventually?  Or do repeated failures generally indicate that the
cluster is too small to perform the given task?)

3. Is it reasonable to expect that I could get PCA to run on this dataset
using the same cluster simply by changing some configuration parameters?  Or
is a larger cluster with significantly more resources per node the only way
around this problem?

4. In general, are there any tips for diagnosing performance issues like the
one above?  I've spent some time trying to get a few different algorithms to
scale to larger and larger datasets, and whenever I run into a failure, I'd
like to be able to identify the bottleneck that is preventing further
scaling.  Any general advice for doing that kind of detective work would be
much appreciated.

Thanks,

~ Andrew






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Understanding-Spark-MLlib-failures-tp22641.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Understanding Spark/MLlib failures

2015-04-23 Thread Reza Zadeh
Hi Andrew,

The .principalComponents feature of RowMatrix is currently constrained to
tall and skinny matrices. Your matrix is barely above the skinny
requirement (10k columns), though the number of rows is fine.

What are you looking to do with the principal components? If unnormalized
PCA is OK for your application, you can instead run RowMatrix.computeSVD,
and use the 'V' matrix, which can be used the same way as the principal
components. The computeSVD method can handle square matrices, so it should
be able to handle your matrix.

Reza

On Thu, Apr 23, 2015 at 4:11 PM, aleverentz andylevere...@fico.com wrote:

 [My apologies if this is a re-post.  I wasn't subscribed the first time I
 sent this message, and I'm hoping this second message will get through.]

 I’ve been using Spark 1.3.0 and MLlib for some machine learning tasks.  In
 a
 fit of blind optimism, I decided to try running MLlib’s Principal
 Components
 Analysis (PCA) on a dataset with approximately 10,000 columns and 200,000
 rows.

 The Spark job has been running for about 5 hours on a small cluster, and it
 has been stuck on a particular job (treeAggregate at RowMatrix.scala:119)
 for most of that time.  The treeAggregate job is now on retry 5, and
 after
 each failure it seems that the next retry uses a smaller number of tasks.
 (Initially, there were around 80 tasks; later it was down to 50, then 42;
 now it’s down to 16.)  The web UI shows the following error under failed
 stages:  org.apache.spark.shuffle.MetadataFetchFailedException: Missing
 an
 output location for shuffle 1.

 This raises a few questions:

 1. What does missing an output location for shuffle 1 mean?  I’m guessing
 this cryptic error message is indicative of some more fundamental problem
 (out of memory? out of disk space?), but I’m not sure how to diagnose it.

 2. Why do subsequent retries use fewer and fewer tasks?  Does this mean
 that
 the algorithm is actually making progress?  Or is the scheduler just
 performing some kind of repartitioning and starting over from scratch?
 (Also, If the algorithm is in fact making progress, should I expect it to
 finish eventually?  Or do repeated failures generally indicate that the
 cluster is too small to perform the given task?)

 3. Is it reasonable to expect that I could get PCA to run on this dataset
 using the same cluster simply by changing some configuration parameters?
 Or
 is a larger cluster with significantly more resources per node the only way
 around this problem?

 4. In general, are there any tips for diagnosing performance issues like
 the
 one above?  I've spent some time trying to get a few different algorithms
 to
 scale to larger and larger datasets, and whenever I run into a failure, I'd
 like to be able to identify the bottleneck that is preventing further
 scaling.  Any general advice for doing that kind of detective work would be
 much appreciated.

 Thanks,

 ~ Andrew






 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Understanding-Spark-MLlib-failures-tp22641.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: why does groupByKey return RDD[(K, Iterable[V])] not RDD[(K, CompactBuffer[V])] ?

2015-04-23 Thread Dean Wampler
I wasn't involved in this decision (I just make the fries), but
CompactBuffer is designed for relatively small data sets that at least fit
in memory. It's more or less an Array. In principle, returning an iterator
could hide the actual data structure that might be needed to hold a much
bigger data set, if necessary.

HOWEVER, it actually returns a CompactBuffer.

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L444


Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Thu, Apr 23, 2015 at 5:46 PM, Hao Ren inv...@gmail.com wrote:

 Should I repost this to the dev list?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/why-does-groupByKey-return-RDD-K-Iterable-V-not-RDD-K-CompactBuffer-V-tp22616p22640.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: setting cost in linear SVM [Python]

2015-04-23 Thread Xiangrui Meng
If by C you mean the parameter C in LIBLINEAR, the corresponding
parameter in MLlib is regParam:
https://github.com/apache/spark/blob/master/python/pyspark/mllib/classification.py#L273,
where regParam = 1/C. -Xiangrui
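
A pyspark sketch of the mapping; training_rdd is a placeholder RDD of
LabeledPoint, and C = 10 is an arbitrary example:

    from pyspark.mllib.classification import SVMWithSGD

    C = 10.0
    # LIBLINEAR's C maps to MLlib's regParam = 1/C
    model = SVMWithSGD.train(training_rdd, iterations=100, regParam=1.0 / C)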

On Wed, Apr 22, 2015 at 3:25 PM, Pagliari, Roberto
rpagli...@appcomsci.com wrote:
 Is there a way to set the cost value C when using linear SVM?

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Problem with using Spark ML

2015-04-23 Thread Staffan
So I got the tip to try reducing the step size, and that finally gave some
more decent results. I had hoped the default params would give at least OK
results, and thought the problem must be somewhere else in the code.
Problem solved!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Problem-with-using-Spark-ML-tp22591p22628.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: StackOverflow Error when run ALS with 100 iterations

2015-04-23 Thread Xiangrui Meng
ALS.setCheckpointInterval was added in Spark 1.3.1. You need to
upgrade Spark to use this feature. -Xiangrui

On Wed, Apr 22, 2015 at 9:03 PM, amghost zhengweita...@outlook.com wrote:
 Hi, could you please explain how to checkpoint the training set RDD, since
 everything is done inside the ALS.train method?



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/StackOverflow-Error-when-run-ALS-with-100-iterations-tp4296p22619.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Distinct is very slow

2015-04-23 Thread Jeetendra Gangele
Anyone, any thoughts on this?

On 22 April 2015 at 22:49, Jeetendra Gangele gangele...@gmail.com wrote:

 I made 7,000 tasks in mapToPair, and in distinct I also made the same number
 of tasks.
 Still, a lot of shuffle read and write is happening, and as a result the
 application runs for much longer.
 Any idea?
 Any idea?

 On 17 April 2015 at 11:55, Akhil Das ak...@sigmoidanalytics.com wrote:

 How many tasks are you seeing in your mapToPair stage? Is it 7000? Then I
 suggest giving a number similar/close to 7000 in your .distinct call. What is
 happening in your case is that you are repartitioning your data to a smaller
 number (32), which I believe puts a lot of load on processing; you can try
 increasing it.

 Thanks
 Best Regards

 On Fri, Apr 17, 2015 at 1:48 AM, Jeetendra Gangele gangele...@gmail.com
 wrote:

 Akhil, any thought on this?

 On 16 April 2015 at 23:07, Jeetendra Gangele gangele...@gmail.com
 wrote:

 No, I did not try the partitioning. Below is the full code:

 public static void matchAndMerge(JavaRDD<VendorRecord> matchRdd, JavaSparkContext jsc) throws IOException {
   long start = System.currentTimeMillis();
   JavaPairRDD<Long, MatcherReleventData> RddForMarch = matchRdd.zipWithIndex().mapToPair(
       new PairFunction<Tuple2<VendorRecord, Long>, Long, MatcherReleventData>() {

     @Override
     public Tuple2<Long, MatcherReleventData> call(Tuple2<VendorRecord, Long> t)
         throws Exception {
       MatcherReleventData matcherData = new MatcherReleventData();
       Tuple2<Long, MatcherReleventData> tuple = new Tuple2<Long, MatcherReleventData>(t._2,
           matcherData.convertVendorDataToMatcherData(t._1));
       return tuple;
     }

   }).cache();
   log.info("after index" + RddForMarch.take(1));
   Map<Long, MatcherReleventData> tmp = RddForMarch.collectAsMap();
   Map<Long, MatcherReleventData> matchData = new HashMap<Long, MatcherReleventData>(tmp);
   final Broadcast<Map<Long, MatcherReleventData>> dataMatchGlobal = jsc.broadcast(matchData);

   JavaPairRDD<Long, String> blockingRdd = RddForMarch.flatMapValues(
       new Function<MatcherReleventData, Iterable<String>>() {

     @Override
     public Iterable<String> call(MatcherReleventData v1) throws Exception {
       List<String> values = new ArrayList<String>();
       HelperUtilities helper1 = new HelperUtilities();
       MatcherKeys matchkeys = helper1.getBlockinkeys(v1);
       if (matchkeys.get_companyName() != null) {
         values.add(matchkeys.get_companyName());
       }
       if (matchkeys.get_phoneNumberr() != null) {
         values.add(matchkeys.get_phoneNumberr());
       }
       if (matchkeys.get_zipCode() != null) {
         values.add(matchkeys.get_zipCode());
       }
       if (matchkeys.getM_domain() != null) {
         values.add(matchkeys.getM_domain());
       }
       return values;
     }
   });
   log.info("blocking RDD is" + blockingRdd.count());
   int count = 0;
   log.info("Starting printing");
   for (Tuple2<Long, String> entry : blockingRdd.collect()) {
     log.info(entry._1() + ":" + entry._2());
     count++;
   }
   log.info("total count" + count);
   JavaPairRDD<Long, Integer> completeDataToprocess = blockingRdd.flatMapValues(
       new Function<String, Iterable<Integer>>() {

     @Override
     public Iterable<Integer> call(String v1) throws Exception {
       return ckdao.getSingelkeyresult(v1);
     }
   }).distinct(32);
   log.info("after hbase count is" + completeDataToprocess.count());
   log.info("data for process" + completeDataToprocess.take(1));
   JavaPairRDD<Long, Tuple2<Integer, Double>> withScore = completeDataToprocess.mapToPair(
       new PairFunction<Tuple2<Long, Integer>, Long, Tuple2<Integer, Double>>() {

     @Override
     public Tuple2<Long, Tuple2<Integer, Double>> call(Tuple2<Long, Integer> t)
         throws Exception {
       Scoring scoreObj = new Scoring();
       double score = scoreObj.computeMatchScore(companyDAO.get(t._2()),
           dataMatchGlobal.getValue().get(t._1()));
       Tuple2<Integer, Double> maptuple = new Tuple2<Integer, Double>(t._2(), score);
       Tuple2<Long, Tuple2<Integer, Double>> tuple = new Tuple2<Long, Tuple2<Integer, Double>>(t._1(), maptuple);
       return tuple;
     }
   });
   log.info("with score tuple is" + withScore.take(1));
   JavaPairRDD<Long, Tuple2<Integer, Double>> maxScoreRDD = withScore.reduceByKey(
       new Function2<Tuple2<Integer, Double>, Tuple2<Integer, Double>, Tuple2<Integer, Double>>() {

     @Override
     public Tuple2<Integer, Double> call(Tuple2<Integer, Double> v1,
         Tuple2<Integer, Double> v2) throws Exception {
       int res = v1._2().compareTo(v2._2());
       if (res > 0) {
         Tuple2<Integer, Double> result = new Tuple2<Integer, Double>(v1._1(), v1._2());
         return result;
       } else if (res < 0) {
         Tuple2<Integer, Double> result = new Tuple2<Integer, Double>(v2._1(), v2._2());
         return result;
       } else {
         Tuple2<Integer, Double> result = new Tuple2<Integer, Double>(v2._1(), v2._2());
         return result;
       }
     }
   });
   log.info("max score RDD" + maxScoreRDD.take(10));

   maxScoreRDD.foreach(new VoidFunction<Tuple2<Long, Tuple2<Integer, Double>>>() {

     @Override
     public void call(Tuple2<Long, Tuple2<Integer, Double>> t)
         throws Exception {
       MatcherReleventData matchedData = dataMatchGlobal.getValue().get(t._1());
       log.info("broadcast is" + dataMatchGlobal.getValue().get(t._1()));
       // Set the score for better understanding of merge
       matchedData.setScore(t._2()._2());
 

RE: Error in creating spark RDD

2015-04-23 Thread Sun, Rui
Hi, SparkContext.newAPIHadoopRDD() is for working with the new Hadoop mapreduce API.
So you should import
org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat
instead of org.apache.accumulo.core.client.mapred.AccumuloInputFormat.

-Original Message-
From: madhvi [mailto:madhvi.gu...@orkash.com] 
Sent: Wednesday, April 22, 2015 5:13 PM
To: user@spark.apache.org
Subject: Error in creating spark RDD

Hi,

I am creating a spark RDD from accumulo, like this:

JavaPairRDD<Key, Value> accumuloRDD =
sc.newAPIHadoopRDD(accumuloJob.getConfiguration(), AccumuloInputFormat.class, Key.class,
Value.class);

But I am getting the following error and it is not getting compiled:

Bound mismatch: The generic method newAPIHadoopRDD(Configuration, Class<F>,
Class<K>, Class<V>) of type JavaSparkContext is not applicable for the
arguments (Configuration, Class<AccumuloInputFormat>, Class<Key>,
Class<Value>). The inferred type AccumuloInputFormat is not a valid substitute
for the bounded parameter <F extends InputFormat<K,V>>

I am using the following import statements:

import org.apache.accumulo.core.client.mapred.AccumuloInputFormat;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;

I am not getting what is the problem in this.

Thanks
Madhvi


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark RDD Lifecycle: whether RDD will be reclaimed out of scope

2015-04-23 Thread Prannoy
Hi,

Yes, Spark automatically removes old RDDs from the cache when you make new
ones. Unpersist forces it to remove them right away.
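
A small pyspark sketch of the difference (the input path is a placeholder):

    rdd = sc.textFile("hdfs:///data/input").cache()
    rdd.count()      # materializes the cached blocks
    rdd.unpersist()  # frees them immediately instead of waiting for cleanup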

On Thu, Apr 23, 2015 at 9:28 AM, Jeffery [via Apache Spark User List] 
ml-node+s1001560n22618...@n3.nabble.com wrote:

 Hi, Dear Spark Users/Devs:

 In a method, I create a new RDD and cache it. Will Spark unpersist
 the RDD automatically after the RDD goes out of scope?

 I am thinking so, but want to make sure with you, the experts :)

 Thanks,
 Jeffery Yuan






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-RDD-Lifecycle-whether-RDD-will-be-reclaimed-out-of-scope-tp22618p22625.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Pipeline in pyspark

2015-04-23 Thread Suraj Shetiya
Hi,

I have come across ways of building input/transform/output pipelines
with Java (Google Dataflow, Spark, etc.). I also understand that
Spark itself provides a way to create a pipeline within mllib for
ML transforms (primarily fit). Both of the above are available in the
Java/Scala environment, with the latter supported on Python as well.

However, if my understanding is correct, pipelines within ML transforms
do not cover complete dataflow pipelines for non-ML scenarios (e.g. IO
transforms, dataframe/graph transforms). Correct me if otherwise. I would
like to know the best way to create a Spark dataflow pipeline in a
generic way. I have a use case where my input files arrive in different
formats; I would like to convert them to RDDs, build dataframe transforms
on top, and finally stream/store them. I hope to avoid disk
I/O between pipeline tasks (see the sketch at the end of this message).

I also came across luigi (http://luigi.readthedocs.org/en/latest/) for
Python, but I found that it stores the contents on disk and reloads them
for the next phase of the pipeline.

Appreciate if you can share your thoughts.


-- 
Regards,
Suraj
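
As a hedged illustration of keeping all stages inside one job so that only
the final save touches storage; the parser, schema, and paths are
hypothetical:

    # everything is lazy until the final save, so no intermediate disk I/O
    raw = sc.textFile("hdfs:///input/*.log")
    parsed = raw.map(parse_line)  # hypothetical parser -> (key, value) tuples
    df = sqlContext.createDataFrame(parsed, ["key", "value"])
    result = df.filter(df.value > 0).groupBy("key").count()
    result.save("hdfs:///output/result", "parquet")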


Re: IOUtils cannot write anything in Spark?

2015-04-23 Thread Holden Karau
It seems like saveAsTextFile might do what you are looking for.
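
A pyspark sketch of that suggestion; format_record and the output path are
placeholders:

    formatted = rdd.map(lambda d: format_record(d))
    # writes one plain text file per partition under the output directory
    formatted.saveAsTextFile("hdfs:///user/xi/output")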

On Wednesday, April 22, 2015, Xi Shen davidshe...@gmail.com wrote:

 Hi,

 I have a RDD of some processed data. I want to write these files to HDFS,
 but not for future M/R processing. I want to write plain old style text
 file. I tried:

  rdd foreach { d =>
   val file = // create the file using a HDFS FileSystem
   val lines = d map {
 // format data into string
   }

   IOUtils.writeLines(lines, System.separator(), file)
 }

 Note, I was using the IOUtils from common-io, not from Hadoop package.

 The results are all file are created in myHDFS, but has no data at all...



  Xi Shen
  http://about.me/davidshen



-- 
Cell : 425-233-8271


IOUtils cannot write anything in Spark?

2015-04-23 Thread Xi Shen
Hi,

I have a RDD of some processed data. I want to write these files to HDFS,
but not for future M/R processing. I want to write plain old style text
file. I tried:

rdd foreach { d =>
  val file = // create the file using a HDFS FileSystem
  val lines = d map {
// format data into string
  }

  IOUtils.writeLines(lines, System.separator(), file)
}

Note, I was using the IOUtils from common-io, not from Hadoop package.

The results are all file are created in myHDFS, but has no data at all...



Xi Shen
http://about.me/davidshen


Re: StandardScaler failing with OOM errors in PySpark

2015-04-23 Thread Rok Roskar
The feature dimension is 800k.

Yes, I believe the driver memory is likely the problem, since it doesn't crash
until the very last part of the tree aggregation.

I'm running it via pyspark through YARN -- I have to run in client mode so I 
can't set spark.driver.memory -- I've tried setting the spark.yarn.am.memory 
and overhead parameters but it doesn't seem to have an effect. 

Thanks,

Rok
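
One hedged note: in yarn-client mode the driver JVM is started by the
launcher itself, so its heap can still be set on the command line even though
spark.driver.memory set inside the program comes too late (the 8g figure is
illustrative):

    pyspark --master yarn-client --driver-memory 8g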

On Apr 23, 2015, at 7:47 AM, Xiangrui Meng men...@gmail.com wrote:

 What is the feature dimension? Did you set the driver memory? -Xiangrui
 
 On Tue, Apr 21, 2015 at 6:59 AM, rok rokros...@gmail.com wrote:
 I'm trying to use the StandardScaler in pyspark on a relatively small (a few
 hundred Mb) dataset of sparse vectors with 800k features. The fit method of
 StandardScaler crashes with Java heap space or Direct buffer memory errors.
 There should be plenty of memory around -- 10 executors with 2 cores each
 and 8 Gb per core. I'm giving the executors 9g of memory and have also tried
 lots of overhead (3g), thinking it might be the array creation in the
 aggregators that's causing issues.
 
 The bizarre thing is that this isn't always reproducible -- sometimes it
 actually works without problems. Should I be setting up executors
 differently?
 
 Thanks,
 
 Rok
 
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/StandardScaler-failing-with-OOM-errors-in-PySpark-tp22593.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



spark yarn-cluster job failing in batch processing

2015-04-23 Thread sachin Singh
Hi All,
I am trying to execute batch processing in yarn-cluster mode, i.e. I have
many SQL insert queries; based on the argument provided, the job fetches the
queries, creates the context and schema RDDs, and inserts into Hive tables.

Please note: in standalone mode it works, and in cluster mode it works if I
configure only one query. I have also configured
yarn.nodemanager.delete.debug-sec = 600

I am using below command-

spark-submit --jars
./analiticlibs/utils-common-1.0.0.jar,./analiticlibs/mysql-connector-java-5.1.17.jar,./analiticlibs/log4j-1.2.17.jar
--files datasource.properties,log4j.properties,hive-site.xml --deploy-mode
cluster --master yarn --num-executors 1 --driver-memory 2g
--driver-java-options -XX:MaxPermSize=1G --executor-memory 1g
--executor-cores 1 --class com.java.analitics.jobs.StandaloneAggregationJob
sparkanalitics-1.0.0.jar daily_agg 2015-04-21


Exception from Container log-

Exception in thread "Driver" java.lang.ArrayIndexOutOfBoundsException: 2
at
com.java.analitics.jobs.StandaloneAggregationJob.main(StandaloneAggregationJob.java:62)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:427)

exception in our exception log file-

 diagnostics: Application application_1429800386537_0001 failed 2 times due
to AM Container for appattempt_1429800386537_0001_02 exited with 
exitCode: 15 due to: Exception from container-launch.
Container id: container_1429800386537_0001_02_01
Exit code: 15
Stack trace: ExitCodeException exitCode=15: 
at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
at org.apache.hadoop.util.Shell.run(Shell.java:455)
at
org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:702)
at
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:197)
at
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:299)
at
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:81)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)


Container exited with a non-zero exit code 15
.Failing this attempt.. Failing the application.
 ApplicationMaster host: N/A
 ApplicationMaster RPC port: -1
 queue: root.hdfs
 start time: 1429800525569
 final status: FAILED
 tracking URL:
http://tejas.alcatel.com:8088/cluster/app/application_1429800386537_0001
 user: hdfs
2015-04-23 20:19:27 DEBUG Client - stopping client from cache:
org.apache.hadoop.ipc.Client@12f5f40b
2015-04-23 20:19:27 DEBUG Utils - Shutdown hook called

need urgent support,



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-yarn-cluster-job-failing-in-batch-processing-tp22626.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Is there a way to get the list of all jobs?

2015-04-23 Thread mkestemont
Hello,

I am currently trying to monitor the progression of jobs. I created a class
extending SparkListener, added a jobProgressListener to my sparkContext, and
overrided the methods OnTaskStart, OnTaskEnd, OnJobStart and OnJobEnd, which
leads to good results.

Then, I would also like to monitor the progression of one job in comparison
to the global progression of all jobs. I guess this is not directly
possible, so I would like to retrieve the list of all jobs (or at least, the
number of jobs), so that I can approximate the global progression by
dividing the progression of one job by the total number of jobs.

However, I do not find how to do this. I searched through the
JobProgressListener API, but I only found methods to get the list of active
jobs, or the list of already completed jobs. Is there a way to get the
number or the list of jobs in the current version of Spark ?
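One workaround, sketched below: count jobs yourself with a listener. This is
only a sketch - JobCountListener is a hypothetical name, and Spark does not
know the total number of jobs in advance, so the total still has to be
estimated by the application:

import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerJobStart}

// Counts how many jobs have started and finished so far.
class JobCountListener extends SparkListener {
  val started = new AtomicInteger(0)
  val ended = new AtomicInteger(0)
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = { started.incrementAndGet() }
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { ended.incrementAndGet() }
}

val listener = new JobCountListener
sc.addSparkListener(listener)  // register before running any jobs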



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-a-way-to-get-the-list-of-all-jobs-tp22635.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Pipeline in pyspark

2015-04-23 Thread ayan guha
I do not think you can share data across spark contexts. So as long as you
can pass it around you should be good.
On 23 Apr 2015 17:12, Suraj Shetiya surajshet...@gmail.com wrote:

 Hi,

 I have come across ways of building input/transform and output
 pipelines with Java (Google Dataflow/Spark etc.). I also understand that
 Spark itself provides ways of creating a pipeline within MLlib for
 ML transforms (primarily fit). Both of the above are available in the
 Java/Scala environment, with the latter also supported in Python.

 However, if my understanding is correct, pipelines within ML transforms
 do not create a complete dataflow transform for non-ML scenarios (e.g. IO
 transforms, dataframe/graph transforms). Correct me if otherwise. I would
 like to know the best way to create a Spark dataflow pipeline in a
 generic way. I have a use case where I have input files in different
 formats and would like to convert them to RDDs, build the dataframe
 transforms, and finally stream/store them. I hope not to do disk
 I/O between pipeline tasks.

  I also came across luigi(http://luigi.readthedocs.org/en/latest/) on
 Python, but I found that it stores the contents onto disc and reloads it
 for the next phase of the pipeline.

 Appreciate if you can share your thoughts.


 --
 Regards,
 Suraj



Re: Hive table creation - possible bug in Spark 1.3?

2015-04-23 Thread madhu phatak
Hi,
 Hive table creation needs an extra step from 1.3. You can follow the
following template:

 df.registerTempTable(tableName)

 hc.sql(s"create table $tableName as select * from $tableName")

this will save the table in hive with given tableName.









Regards,
Madhukara Phatak
http://datamantra.io/

On Thu, Apr 23, 2015 at 4:00 AM, Michael Armbrust mich...@databricks.com
wrote:

 Sorry for the confusion.  We should be more clear about the semantics in
 the documentation. (PRs welcome :) )

 .saveAsTable does not create a hive table, but instead creates a Spark
 Data Source table.  Here the metadata is persisted into Hive, but hive
 cannot read the tables (as this API supports MLlib vectors, schema
 discovery, and other things that hive does not).  If you want to create a
 hive table, use HiveQL and run a CREATE TABLE AS SELECT ...

 On Wed, Apr 22, 2015 at 12:50 AM, Ophir Cohen oph...@gmail.com wrote:

 I wrote few mails here regarding this issue.
 After further investigation I think there is a bug in Spark 1.3 in saving
 Hive tables.

 (hc is HiveContext)

 1. Verify the needed configuration exists:
 scala> hc.sql("set hive.exec.compress.output").collect
 res4: Array[org.apache.spark.sql.Row] =
 Array([hive.exec.compress.output=true])
 scala> hc.sql("set mapreduce.output.fileoutputformat.compress.codec").collect
 res5: Array[org.apache.spark.sql.Row] =
 Array([mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec])
 scala> hc.sql("set mapreduce.output.fileoutputformat.compress.type").collect
 res6: Array[org.apache.spark.sql.Row] =
 Array([mapreduce.output.fileoutputformat.compress.type=BLOCK])
 2. Loading a DataFrame and saving it as a table (path points to an existing file):
 val saDF = hc.parquetFile(path)
 saDF.count

 (count yields 229764, i.e. the RDD exists)
 saDF.saveAsTable("test_hive_ms")

 Now for few interesting outputs:
 1. Trying to query from the Hive CLI, the table exists but has the wrong
 output format:
 Failed with exception java.io.IOException:java.io.IOException: hdfs://
 10.166.157.97:9000/user/hive/warehouse/test_hive_ms/part-r-1.parquet
 not a SequenceFile
 2. Looking at the output files shows that the files are '.parquet' and not
 '.snappy'.
 3. The saveAsTable output shows that it actually stores the table with both
 the wrong output format and no compression:
 15/04/22 07:16:54 INFO metastore.HiveMetaStore: 0: create_table:
 Table(tableName:test_hive_ms, dbName:default, owner:hadoop,
 createTime:1429687014, lastAccessTime:0, retention:0,
 sd:StorageDescriptor(cols:[FieldSchema(name:col, type:arraystring,
 comment:from deserializer)], location:null,
 inputFormat:org.apache.hadoop.mapred.SequenceFileInputFormat,
 outputFormat:org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat,
 compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null,
 serializationLib:org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe,
 parameters:{serialization.format=1, path=hdfs://
 10.166.157.97:9000/user/hive/warehouse/test_hive_ms}
 http://10.166.157.97:9000/user/hive/warehouse/test_hive_ms%7D),
 bucketCols:[], sortCols:[], parameters:{},
 skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[],
 skewedColValueLocationMaps:{})), partitionKeys:[],
 parameters:{spark.sql.sources.schema.part.0={type:struct,fields:[{name:ADJDATE,type:long,nullable:true,metadata:{}},{name:sid,type:integer,nullable:true,metadata:{}},{name:ADJTYPE,type:integer,nullable:true,metadata:{}},{name:ENDADJDATE,type:long,nullable:true,metadata:{}},{name:ADJFACTOR,type:double,nullable:true,metadata:{}},{name:CUMADJFACTOR,type:double,nullable:true,metadata:{}}]},
 EXTERNAL=FALSE, spark.sql.sources.schema.numParts=1,
 spark.sql.sources.provider=org.apache.spark.sql.parquet},
 viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE)

 So, the question is: am I missing some configuration here, or should I open a
 bug?

 Thanks,
 Ophir





Re: Spark SQL performance issue.

2015-04-23 Thread ayan guha
Quick questions: why are you caching both the RDD and the table?
Which stage of the job is slow?
On 23 Apr 2015 17:12, Nikolay Tikhonov tikhonovnico...@gmail.com wrote:

 Hi,
 I have Spark SQL performance issue. My code contains a simple JavaBean:

 public class Person implements Externalizable {
     private int id;
     private String name;
     private double salary;
     ...
 }


 Apply a schema to an RDD and register table.

 JavaRDD<Person> rdds = ...
 rdds.cache();

 DataFrame dataFrame = sqlContext.createDataFrame(rdds, Person.class);
 dataFrame.registerTempTable("person");

 sqlContext.cacheTable("person");


 Run sql query.

 sqlContext.sql("SELECT id, name, salary FROM person WHERE salary >= YYY
 AND salary <= XXX").collectAsList()


 I launch a standalone cluster which contains 4 workers. Each node runs on a
 machine with 8 CPUs and 15 GB of memory. When I run the query in this
 environment over an RDD which contains 1 million persons, it takes 1 minute.
 Can somebody tell me how to tune the performance?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-performance-issue-tp22627.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: Spark SQL performance issue.

2015-04-23 Thread Nikolay Tikhonov
 why are you caching both the RDD and the table?
I'm trying to cache all the data to avoid bad performance for the first
query. Is that right?

 Which stage of the job is slow?
The query is run many times on one sqlContext and each query execution
takes 1 second.

2015-04-23 11:33 GMT+03:00 ayan guha guha.a...@gmail.com:

 Quick questions: why are you caching both the RDD and the table?
 Which stage of the job is slow?
 On 23 Apr 2015 17:12, Nikolay Tikhonov tikhonovnico...@gmail.com
 wrote:

 Hi,
 I have Spark SQL performance issue. My code contains a simple JavaBean:

 public class Person implements Externalizable {
     private int id;
     private String name;
     private double salary;
     ...
 }


 Apply a schema to an RDD and register table.

 JavaRDD<Person> rdds = ...
 rdds.cache();

 DataFrame dataFrame = sqlContext.createDataFrame(rdds, Person.class);
 dataFrame.registerTempTable("person");

 sqlContext.cacheTable("person");


 Run sql query.

 sqlContext.sql("SELECT id, name, salary FROM person WHERE salary >= YYY
 AND salary <= XXX").collectAsList()


 I launch a standalone cluster which contains 4 workers. Each node runs on a
 machine with 8 CPUs and 15 GB of memory. When I run the query in this
 environment over an RDD which contains 1 million persons, it takes 1 minute.
 Can somebody tell me how to tune the performance?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-performance-issue-tp22627.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





A Spark Group by is running forever

2015-04-23 Thread ๏̯͡๏
I have a groupBy query after a map-side join and a leftOuterJoin, and this
query has been running for more than 2 hours.


Task table (from the Spark UI; one running task):
Index: 0, ID: 36, Attempt: 0, Status: RUNNING, Locality Level: PROCESS_LOCAL,
Executor ID / Host: 17 / phxaishdc9dn1560.stratus.phx.ebay.com,
Launch Time: 2015/04/22 23:27:00, Duration: 1.4 h, GC Time: 29 s,
Shuffle Read Size / Records: 61.8 MB / 63144909, Shuffle Write Size / Records: 0.0 B / 0



The input looks to be only 60 MB.
Command:
./bin/spark-submit -v --master yarn-cluster --driver-class-path
/apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar
--jars
/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.3/spark_reporting_dep_only-1.0-SNAPSHOT.jar
 --num-executors 36 --driver-memory 12g --driver-java-options
-XX:MaxPermSize=8G --executor-memory 12g --executor-cores 6 --queue
hdmi-express --class com.ebay.ep.poc.spark.reporting.SparkApp
/home/dvasthimal/spark1.3/spark_reporting-1.0-SNAPSHOT.jar
startDate=2015-04-6 endDate=2015-04-7
input=/user/dvasthimal/epdatasets_small/exptsession subcommand=viewItem
output=/user/dvasthimal/epdatasets/viewItem buffersize=128
maxbuffersize=1068 maxResultSize=2G

Queries

1. val viEvents = details.map { vi => (vi.get(14).asInstanceOf[Long], vi) }

2. Broadcast map-side join:

val lstgItemMap = listings.map { lstg => (lstg.getItemId().toLong, lstg) }.collectAsMap
val broadCastMap = sc.broadcast(lstgItemMap)

val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
  viEvents.mapPartitions({
    // business logic
  })

3.

Left Outer

val spsLevelMetricSum = DataUtil.getSpsLevelMetricSum(sc, startDate)

val spsLvlMetric = spsLevelMetricSum.map { sps => (sps.getUserId.toLong, sps) }

val viEventsWithListingsJoinSpsLevelMetric = viEventsWithListings
  .leftOuterJoin(spsLvlMetric).map {
    // business logic
  }

Any thoughts ?

4. Group BY :

val sellerSegments = viEventsWithListingsJoinSpsLevelMetric.groupBy {
  case (viDetail, vi, itemId) =>
    (viDetail.get(0), viDetail.get(1).asInstanceOf[Long], viDetail.get(2),
     viDetail.get(8).asInstanceOf[Int])
}


#4 is very slow.

-- 



Deepak


Re: Custom partitioning of DStream

2015-04-23 Thread davidkl
Hello Evo, Ranjitiyer,

I am also looking for the same thing. Using foreach is not useful for me as
processing the RDD as a whole won't be distributed across workers and that
would kill performance in my application :-/

Let me know if you find a solution for this. 

Regards



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Custom-paritioning-of-DSTream-tp22574p22630.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




RE: Custom partitioning of DStream

2015-04-23 Thread Evo Eftimov
You can use transform, which yields the underlying RDDs of the DStream, and on
each of those RDDs you can then apply partitionBy - transform also returns
another DStream, while foreach doesn't.

 

Btw, what do you mean re foreach killing the performance by not distributing
the workload - every function (provided it is not an action) applied to an
RDD within foreach is distributed across the cluster, since it gets applied
to an RDD.
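
For illustration, a minimal sketch of that approach (assuming a key-value
DStream, since partitionBy is only defined on pair RDDs; the name pairDStream
and the partition count are illustrative):

import org.apache.spark.HashPartitioner

// Repartition every micro-batch RDD of the stream; returns a new DStream.
val repartitioned = pairDStream.transform { rdd =>
  rdd.partitionBy(new HashPartitioner(32))
}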

 

From: davidkl [via Apache Spark User List]
[mailto:ml-node+s1001560n22630...@n3.nabble.com] 
Sent: Thursday, April 23, 2015 10:13 AM
To: Evo Eftimov
Subject: Re: Custom partitioning of DStream

 

Hello Evo, Ranjitiyer, 

I am also looking for the same thing. Using foreach is not useful for me as
processing the RDD as a whole won't be distributed across workers and that
would kill performance in my application :-/ 

Let me know if you find a solution for this. 

Regards 






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Custom-paritioning-of-DSTream-tp22574p22631.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Hive table creation - possible bug in Spark 1.3?

2015-04-23 Thread madhu phatak
Hi Michael,
Here https://issues.apache.org/jira/browse/SPARK-7084 is the jira issue
and PR https://github.com/apache/spark/pull/5654 for the same. Please
have a look.




Regards,
Madhukara Phatak
http://datamantra.io/

On Thu, Apr 23, 2015 at 1:22 PM, madhu phatak phatak@gmail.com wrote:

 Hi,
 Hive table creation needs an extra step from 1.3. You can follow the
 following template:

  df.registerTempTable(tableName)

  hc.sql(s"create table $tableName as select * from $tableName")

 this will save the table in hive with given tableName.









 Regards,
 Madhukara Phatak
 http://datamantra.io/

 On Thu, Apr 23, 2015 at 4:00 AM, Michael Armbrust mich...@databricks.com
 wrote:

 Sorry for the confusion.  We should be more clear about the semantics in
 the documentation. (PRs welcome :) )

 .saveAsTable does not create a hive table, but instead creates a Spark
 Data Source table.  Here the metadata is persisted into Hive, but hive
 cannot read the tables (as this API supports MLlib vectors, schema
 discovery, and other things that hive does not).  If you want to create a
 hive table, use HiveQL and run a CREATE TABLE AS SELECT ...

 On Wed, Apr 22, 2015 at 12:50 AM, Ophir Cohen oph...@gmail.com wrote:

 I wrote few mails here regarding this issue.
 After further investigation I think there is a bug in Spark 1.3 in
 saving Hive tables.

 (hc is HiveContext)

 1. Verify the needed configuration exists:
 scala> hc.sql("set hive.exec.compress.output").collect
 res4: Array[org.apache.spark.sql.Row] =
 Array([hive.exec.compress.output=true])
 scala> hc.sql("set mapreduce.output.fileoutputformat.compress.codec").collect
 res5: Array[org.apache.spark.sql.Row] =
 Array([mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec])
 scala> hc.sql("set mapreduce.output.fileoutputformat.compress.type").collect
 res6: Array[org.apache.spark.sql.Row] =
 Array([mapreduce.output.fileoutputformat.compress.type=BLOCK])
 2. Loading a DataFrame and saving it as a table (path points to an existing file):
 val saDF = hc.parquetFile(path)
 saDF.count

 (count yields 229764, i.e. the RDD exists)
 saDF.saveAsTable("test_hive_ms")

 Now for few interesting outputs:
 1. Trying to query from the Hive CLI, the table exists but has the wrong
 output format:
 Failed with exception java.io.IOException:java.io.IOException: hdfs://
 10.166.157.97:9000/user/hive/warehouse/test_hive_ms/part-r-1.parquet
 not a SequenceFile
 2. Looking at the output files shows that the files are '.parquet' and not
 '.snappy'.
 3. The saveAsTable output shows that it actually stores the table with both
 the wrong output format and no compression:
 15/04/22 07:16:54 INFO metastore.HiveMetaStore: 0: create_table:
 Table(tableName:test_hive_ms, dbName:default, owner:hadoop,
 createTime:1429687014, lastAccessTime:0, retention:0,
 sd:StorageDescriptor(cols:[FieldSchema(name:col, type:arraystring,
 comment:from deserializer)], location:null,
 inputFormat:org.apache.hadoop.mapred.SequenceFileInputFormat,
 outputFormat:org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat,
 compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null,
 serializationLib:org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe,
 parameters:{serialization.format=1, path=hdfs://
 10.166.157.97:9000/user/hive/warehouse/test_hive_ms}
 http://10.166.157.97:9000/user/hive/warehouse/test_hive_ms%7D),
 bucketCols:[], sortCols:[], parameters:{},
 skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[],
 skewedColValueLocationMaps:{})), partitionKeys:[],
 parameters:{spark.sql.sources.schema.part.0={type:struct,fields:[{name:ADJDATE,type:long,nullable:true,metadata:{}},{name:sid,type:integer,nullable:true,metadata:{}},{name:ADJTYPE,type:integer,nullable:true,metadata:{}},{name:ENDADJDATE,type:long,nullable:true,metadata:{}},{name:ADJFACTOR,type:double,nullable:true,metadata:{}},{name:CUMADJFACTOR,type:double,nullable:true,metadata:{}}]},
 EXTERNAL=FALSE, spark.sql.sources.schema.numParts=1,
 spark.sql.sources.provider=org.apache.spark.sql.parquet},
 viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE)

 So, the question is: am I missing some configuration here, or should I open
 a bug?

 Thanks,
 Ophir






Re: Convert DStream to DataFrame

2015-04-23 Thread Sergio Jiménez Barrio
Thank you ver much, Tathagata!

El miércoles, 22 de abril de 2015, Tathagata Das t...@databricks.com
escribió:

 Aaah, that. That is probably a limitation of the SQLContext (cc'ing Yin
 for more information).


 On Wed, Apr 22, 2015 at 7:07 AM, Sergio Jiménez Barrio 
 drarse.a...@gmail.com
 javascript:_e(%7B%7D,'cvml','drarse.a...@gmail.com'); wrote:

 Sorry, this is the error:

 [error] /home/sergio/Escritorio/hello/streaming.scala:77: Implementation
 restriction: case classes cannot have more than 22 parameters.



 2015-04-22 16:06 GMT+02:00 Sergio Jiménez Barrio drarse.a...@gmail.com
 javascript:_e(%7B%7D,'cvml','drarse.a...@gmail.com');:

 I tried the solution from the guide, but I exceeded the size limit of the
 case class Row:


 2015-04-22 15:22 GMT+02:00 Tathagata Das tathagata.das1...@gmail.com
 javascript:_e(%7B%7D,'cvml','tathagata.das1...@gmail.com');:

 Did you checkout the latest streaming programming guide?


 http://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations

 You also need to be aware of that to convert json RDDs to dataframe,
 sqlContext has to make a pass on the data to learn the schema. This will
 fail if a batch has no data. You have to safeguard against that.
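
A minimal sketch of such a safeguard, assuming Spark 1.3 (RDD.isEmpty() is
available from 1.3.0) and that messages and sqlContext are in scope as in
Sergio's snippet below; the table name is illustrative:

val lines = messages.map(_._2)
lines.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {              // skip empty batches before schema inference
    val df = sqlContext.jsonRDD(rdd) // infer the schema from the JSON strings
    df.registerTempTable("events")
  }
}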

 On Wed, Apr 22, 2015 at 6:19 AM, ayan guha guha.a...@gmail.com
 javascript:_e(%7B%7D,'cvml','guha.a...@gmail.com'); wrote:

 What about sqlContext.createDataFrame(rdd)?
 On 22 Apr 2015 23:04, Sergio Jiménez Barrio drarse.a...@gmail.com
 javascript:_e(%7B%7D,'cvml','drarse.a...@gmail.com'); wrote:

 Hi,

 I am using Kafka with Apache Stream to send JSON to Apache Spark:

 val messages = KafkaUtils.createDirectStream[String, String, 
 StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

 Now, I want parse the DStream created to DataFrame, but I don't know
 if Spark 1.3 have some easy way for this. ¿Any suggestion? I can get the
 message with:

 val lines = messages.map(_._2)

 Thank u for all. Sergio J.








-- 
Atte. Sergio Jiménez


Re: Spark SQL performance issue.

2015-04-23 Thread Arush Kharbanda
Hi

Can you share your Web UI, depicting your task-level breakup? I can see many
things that can be improved.

1. JavaRDD<Person> rdds = ...; rdds.cache(); - this caching is not needed, as
you are not reusing the RDD in any other action.

2. Instead of collecting as a list, if you can save as a text file, it would be
better, as it would avoid moving the results to the driver.
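
A sketch of that second point (the threshold values and output path are
illustrative, not from the original message):

val result = sqlContext.sql(
  "SELECT id, name, salary FROM person WHERE salary >= 1000 AND salary <= 2000")
result.rdd.saveAsTextFile("hdfs:///tmp/salary_query_output")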

Thanks
Arush

On Thu, Apr 23, 2015 at 2:47 PM, Nikolay Tikhonov tikhonovnico...@gmail.com
 wrote:

  why are you caching both the RDD and the table?
 I'm trying to cache all the data to avoid bad performance for the first
 query. Is that right?

  Which stage of the job is slow?
 The query is run many times on one sqlContext and each query execution
 takes 1 second.

 2015-04-23 11:33 GMT+03:00 ayan guha guha.a...@gmail.com:

 Quick questions: why are you caching both the RDD and the table?
 Which stage of the job is slow?
 On 23 Apr 2015 17:12, Nikolay Tikhonov tikhonovnico...@gmail.com
 wrote:

 Hi,
 I have Spark SQL performance issue. My code contains a simple JavaBean:

 public class Person implements Externalizable {
     private int id;
     private String name;
     private double salary;
     ...
 }


 Apply a schema to an RDD and register table.

 JavaRDD<Person> rdds = ...
 rdds.cache();

 DataFrame dataFrame = sqlContext.createDataFrame(rdds, Person.class);
 dataFrame.registerTempTable("person");

 sqlContext.cacheTable("person");


 Run sql query.

 sqlContext.sql("SELECT id, name, salary FROM person WHERE salary >= YYY
 AND salary <= XXX").collectAsList()


 I launch a standalone cluster which contains 4 workers. Each node runs on a
 machine with 8 CPUs and 15 GB of memory. When I run the query in this
 environment over an RDD which contains 1 million persons, it takes 1 minute.
 Can somebody tell me how to tune the performance?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-performance-issue-tp22627.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.






-- 

[image: Sigmoid Analytics] http://htmlsig.com/www.sigmoidanalytics.com

*Arush Kharbanda* || Technical Teamlead

ar...@sigmoidanalytics.com || www.sigmoidanalytics.com


Re: Building Spark : Adding new DataType in Catalyst

2015-04-23 Thread zia_kayani
I've already tried UDT in Spark 1.2 and 1.3, but I encountered a Kryo
serialization exception on joining, as tracked here
https://datastax-oss.atlassian.net/browse/SPARKC-23 . I've talked to
Michael Armbrust https://plus.google.com/u/1/109154927192908362223/posts
about the exception; he said "I'll caution you that this is not a stable
public API." So I moved to adding a custom data type into Spark. I got the
answer to my question from Iulian Dragoș
https://plus.google.com/u/1/114220203404330592389/posts : one way
is to use export SPARK_PREPEND_CLASSES=true. This instructs the launcher
to prepend the target directories of each project to the Spark assembly,
which solved my problem. Thanks...
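
For reference, the workflow could look like this (a sketch assuming a Spark
1.3-era source checkout; the build command may differ for your setup):

export SPARK_PREPEND_CLASSES=true
build/sbt compile      # recompile the modules you changed
./bin/spark-shell      # the launcher now prepends the target/ classes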



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Building-Spark-Adding-new-DataType-in-Catalyst-tp22604p22632.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: problem with spark thrift server

2015-04-23 Thread Arush Kharbanda
Hi

What do you mean by disabling the driver? What are you trying to achieve?

Thanks
Arush

On Thu, Apr 23, 2015 at 12:29 PM, guoqing0...@yahoo.com.hk 
guoqing0...@yahoo.com.hk wrote:

 Hi,
 I have a question about the Spark Thrift server: I deployed Spark on YARN
 and found that if the Spark driver dies, the Spark application crashes on
 YARN. I'd appreciate any suggestions and ideas.

 Thank you!




-- 

[image: Sigmoid Analytics] http://htmlsig.com/www.sigmoidanalytics.com

*Arush Kharbanda* || Technical Teamlead

ar...@sigmoidanalytics.com || www.sigmoidanalytics.com


Re: Spark Streaming updateStateByKey throws OutOfMemory Error

2015-04-23 Thread Sourav Chandra
HI TD,

Some observations:

1. If I submit the application using the spark-submit tool with *client as
deploy mode*, it works fine with a single master and worker (driver, master
and worker running on the same machine).
2. If I submit the application using the spark-submit tool with client as
deploy mode, it *crashes after some time with a StackOverflowError* on a
*single master and 2 workers* (driver, master and 1 worker running on the
same machine, the other worker on a different machine):
 *15/04/23 05:42:04 Executor: Exception in task 0.0 in stage 23153.0
(TID 5412)*
*java.lang.StackOverflowError*
*at
java.io.ObjectInputStream$BlockDataInputStream.readUTF(ObjectInputStream.java:2864)*
*at java.io.ObjectInputStream.readUTF(ObjectInputStream.java:1072)*
*at
java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:671)*
*at
java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:830)*
*at
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1601)*
*at
java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)*
*at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)*
*at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)*
*at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)*
*at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)*
*at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)*
*at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)*
*at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)*
*at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)*
*at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)*
*at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)*
*at
java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)*
*at
scala.collection.immutable.$colon$colon.readObject(List.scala:362)*
*at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)*
*at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)*
*at java.lang.reflect.Method.invoke(Method.java:606)*
*at
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)*
*at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)*
*at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)*
*at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)*
*at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)*
*at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)*
*at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)*
*at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)*
*at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)*
*at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)*
*at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)*
*at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)*
*at
java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)*
*at
scala.collection.immutable.$colon$colon.readObject(List.scala:362)*
*at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)*
*at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)*
*at java.lang.reflect.Method.invoke(Method.java:606)*
*at
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)*
*at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)*
*at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)*
*at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)*
*at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)*
*at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)*
*at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)*
*at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)*
*at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)*
*at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)*
*at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)*
*at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)*
*at
java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)*
*at
scala.collection.immutable.$colon$colon.readObject(List.scala:366)*
*at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)*


3. If I submit the 

Re: Trouble working with Spark-CSV package (error: object databricks is not a member of package com)

2015-04-23 Thread Krishna Sankar
Do you have commons-csv-1.1-bin.jar on your path somewhere? I had to
download and add it.
Cheers
k/
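
For reference, adding the dependency could look like this (the version and
jar path are illustrative):

spark-shell --packages com.databricks:spark-csv_2.11:1.0.3 \
  --jars /path/to/commons-csv-1.1.jar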

On Wed, Apr 22, 2015 at 11:01 AM, Mohammed Omer beancinemat...@gmail.com
wrote:

 Afternoon all,

 I'm working with Scala 2.11.6, and Spark 1.3.1 built from source via:

 `mvn -Pyarn -Phadoop-2.4 -Dscala-2.11 -DskipTests clean package`

 The error is encountered when running spark shell via:

 `spark-shell --packages com.databricks:spark-csv_2.11:1.0.3`

 The full trace of the commands can be found at
 https://gist.github.com/momer/9d1ca583f9978ec9739d

 Not sure if I've done something wrong, or if the documentation is
 outdated, or...?

 Would appreciate any input or push in the right direction!

 Thank you,

 Mo



Re: Error in creating spark RDD

2015-04-23 Thread madhvi

On Thursday 23 April 2015 12:22 PM, Akhil Das wrote:
Here's a complete scala example 
https://github.com/bbux-proteus/spark-accumulo-examples/blob/1dace96a115f29c44325903195c8135edf828c86/src/main/scala/org/bbux/spark/AccumuloMetadataCount.scala


Thanks
Best Regards

On Thu, Apr 23, 2015 at 12:19 PM, Akhil Das 
ak...@sigmoidanalytics.com mailto:ak...@sigmoidanalytics.com wrote:


Change your import from mapred to mapreduce. like :

import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;

Thanks
Best Regards

On Wed, Apr 22, 2015 at 2:42 PM, madhvi madhvi.gu...@orkash.com
mailto:madhvi.gu...@orkash.com wrote:

Hi,

I am creating a spark RDD through accumulo writing like:

JavaPairRDD<Key, Value> accumuloRDD =
    sc.newAPIHadoopRDD(accumuloJob.getConfiguration(), AccumuloInputFormat.class,
        Key.class, Value.class);

But I am getting the following error and it is not getting
compiled:

Bound mismatch: The generic method
newAPIHadoopRDD(Configuration, Class<F>, Class<K>, Class<V>)
of type JavaSparkContext is not applicable for the arguments
(Configuration, Class<AccumuloInputFormat>, Class<Key>,
Class<Value>). The inferred type AccumuloInputFormat is not a
valid substitute for the bounded parameter F extends
InputFormat<K,V>

I am using the following import statements:

import org.apache.accumulo.core.client.mapred.AccumuloInputFormat;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;

I am not getting what is the problem in this.

Thanks
Madhvi






Hi,

Thanks. I got that solved :)

madhvi


Re: Re: HiveContext setConf seems not stable

2015-04-23 Thread guoqing0...@yahoo.com.hk
Hi all , 
My understanding of this problem is that SQLConf gets overwritten by the
Hive config during the initialization phase, when setConf(key: String, value:
String) is called for the first time (see the code snippets below), so it
behaves correctly afterwards. I'm not sure whether this is right; any points
are welcome. Thanks.
@transient protected[hive] lazy val hiveconf: HiveConf = {
  setConf(sessionState.getConf.getAllProperties)
  sessionState.getConf
}

protected def runHive(cmd: String, maxRows: Int = 1000): Seq[String] = synchronized {
  try {
    val cmd_trimmed: String = cmd.trim()
    val tokens: Array[String] = cmd_trimmed.split("\\s+")
    val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim()
    val proc: CommandProcessor = HiveShim.getCommandProcessor(Array(tokens(0)), hiveconf)
    ...
}

protected[sql] def runSqlHive(sql: String): Seq[String] = {
  val maxResults = 100000
  val results = runHive(sql, maxResults)
  // It is very confusing when you only get back some of the results...
  if (results.size == maxResults) sys.error("RESULTS POSSIBLY TRUNCATED")
  results
}

override def setConf(key: String, value: String): Unit = {
  super.setConf(key, value)
  runSqlHive(s"SET $key=$value")
}
 
From: madhu phatak
Date: 2015-04-23 02:17
To: Michael Armbrust
CC: Ophir Cohen; Hao Ren; user
Subject: Re: HiveContext setConf seems not stable
Hi,
calling getConf doesn't solve the issue. Even many Hive-specific queries are
broken. It seems like no Hive configurations are getting passed properly.




Regards,
Madhukara Phatak
http://datamantra.io/

On Wed, Apr 22, 2015 at 2:19 AM, Michael Armbrust mich...@databricks.com 
wrote:
As a workaround, can you call getConf first before any setConf?

On Tue, Apr 21, 2015 at 1:58 AM, Ophir Cohen oph...@gmail.com wrote:
I think I encountered the same problem while trying to turn on Hive output
compression.
I have the following lines:
def initHiveContext(sc: SparkContext): HiveContext = {
  val hc: HiveContext = new HiveContext(sc)
  hc.setConf("hive.exec.compress.output", "true")
  hc.setConf("mapreduce.output.fileoutputformat.compress.codec",
    "org.apache.hadoop.io.compress.SnappyCodec")
  hc.setConf("mapreduce.output.fileoutputformat.compress.type", "BLOCK")

  logger.info(hc.getConf("hive.exec.compress.output"))
  logger.info(hc.getConf("mapreduce.output.fileoutputformat.compress.codec"))
  logger.info(hc.getConf("mapreduce.output.fileoutputformat.compress.type"))

  hc
}
And the log for calling it twice:
15/04/21 08:37:39 INFO util.SchemaRDDUtils$: false
15/04/21 08:37:39 INFO util.SchemaRDDUtils$: 
org.apache.hadoop.io.compress.SnappyCodec
15/04/21 08:37:39 INFO util.SchemaRDDUtils$: BLOCK
15/04/21 08:37:39 INFO util.SchemaRDDUtils$: true
15/04/21 08:37:39 INFO util.SchemaRDDUtils$: 
org.apache.hadoop.io.compress.SnappyCodec
15/04/21 08:37:39 INFO util.SchemaRDDUtils$: BLOCK

BTW
It worked on 1.2.1...


On Thu, Apr 2, 2015 at 11:47 AM, Hao Ren inv...@gmail.com wrote:
Hi,

Jira created: https://issues.apache.org/jira/browse/SPARK-6675

Thank you.


On Wed, Apr 1, 2015 at 7:50 PM, Michael Armbrust mich...@databricks.com wrote:
Can you open a JIRA please?

On Wed, Apr 1, 2015 at 9:38 AM, Hao Ren inv...@gmail.com wrote:
Hi,

I find HiveContext.setConf does not work correctly. Here are some code snippets 
showing the problem:

snippet 1:

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

object Main extends App {

  val conf = new SparkConf()
    .setAppName("context-test")
    .setMaster("local[8]")
  val sc = new SparkContext(conf)
  val hc = new HiveContext(sc)

  hc.setConf("spark.sql.shuffle.partitions", "10")
  hc.setConf("hive.metastore.warehouse.dir", "/home/spark/hive/warehouse_test")
  hc.getAllConfs filter (_._1.contains("warehouse.dir")) foreach println
  hc.getAllConfs filter (_._1.contains("shuffle.partitions")) foreach println
}


Results:
(hive.metastore.warehouse.dir,/home/spark/hive/warehouse_test)
(spark.sql.shuffle.partitions,10)

snippet 2:

...
  hc.setConf("hive.metastore.warehouse.dir", "/home/spark/hive/warehouse_test")
  hc.setConf("spark.sql.shuffle.partitions", "10")
  hc.getAllConfs filter (_._1.contains("warehouse.dir")) foreach println
  hc.getAllConfs filter (_._1.contains("shuffle.partitions")) foreach println
...


Results:
(hive.metastore.warehouse.dir,/user/hive/warehouse)
(spark.sql.shuffle.partitions,10)

You can see that I just permuted the two setConf calls, and that leads to two
different Hive configurations.
It seems that HiveContext cannot set a new value on

Is a higher-res or vector version of Spark logo available?

2015-04-23 Thread Enno Shioji
My employer (adform.com) would like to use the Spark logo in a recruitment
event (to indicate that we are using Spark in our company). I looked in the
Spark repo (https://github.com/apache/spark/tree/master/docs/img) but
couldn't find a vector format.

Is a higher-res or vector format version available anywhere?

Enno


Streaming Kmeans usage in java

2015-04-23 Thread Jeetendra Gangele
Does anyone have a sample example of how to use streaming k-means
clustering with Java? I have seen some example usage in Scala. Can anybody
point me to a Java example?

regards
jeetendra


How to start Thrift JDBC server as part of standalone spark application?

2015-04-23 Thread Vladimir Grigor
Hello,

I would like to export RDD/DataFrames via JDBC SQL interface from the
standalone application for currently stable Spark v1.3.1.

I found one way of doing it but it requires the use of @DeveloperAPI method
HiveThriftServer2.startWithContext(sqlContext)

Is there a better, production level approach to do that?

Full code snippet is below:
// you can run it via:
// ../spark/bin/spark-submit --master local[*] --class SimpleApp
target/scala-2.10/simple-project_2.10-1.0.jar src/test/resources/1.json
tableFromJson


import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object SimpleApp {

  def main(args: Array[String]) {

if (args.length != 2) {
  Console.err.println("Usage: app source_json_file table_name")
  System.exit(1)
}
val sourceFile = args(0)
val tableName = args(1)

val sparkConf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(sparkConf)
val sqlContext = new HiveContext(sc)

val df = sqlContext.jsonFile(sourceFile)
df.registerTempTable(tableName)

println("Registered temp table %s for data source %s".format(tableName, sourceFile))

HiveThriftServer2.startWithContext(sqlContext)

  }
}





Best, Vladimir Grigor


Re: How to debug Spark on Yarn?

2015-04-23 Thread Ted Yu
For step 2, you can pipe the application log to a file instead of copy-pasting.
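
For example (the application id is taken from the quoted message; the file
name is illustrative):

yarn logs -applicationId application_1429087638744_101363 > app_logs.txt
grep -n -E 'Exception|Job aborted' app_logs.txt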

Cheers



 On Apr 22, 2015, at 10:48 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:
 
 I submit a Spark app to YARN and I get these messages:
 
 
 
 15/04/22 22:45:04 INFO yarn.Client: Application report for 
 application_1429087638744_101363 (state: RUNNING)
 
 
 15/04/22 22:45:04 INFO yarn.Client: Application report for 
 application_1429087638744_101363 (state: RUNNING).
 
 ...
 
 
 
 1) I can go to the Spark UI and see the status of the app, but cannot see the
 logs as the job progresses. How can I see the logs of executors as they
 progress?
 
 2) In case the app fails/completes, the Spark UI vanishes and I get a YARN
 job page that says the job failed; there is no link back to the Spark UI. Now I
 take the job ID and run yarn application logs appid, and my console fills up
 (with huge scrolling) with the logs of all executors. Then I copy and paste into
 a text editor and search for the keywords Exception and Job aborted due to. Is
 this the right way to view logs?
 
 
 -- 
 Deepak
 


Re: MLlib - Collaborative Filtering - trainImplicit task size

2015-04-23 Thread Christian S. Perone
All these warnings come from the ALS iterations, from flatMap and also from
aggregate. For instance, this is the origin of the stage where the flatMap is
showing these warnings (w/ Spark 1.3.0; they are also shown in Spark 1.3.1):

org.apache.spark.rdd.RDD.flatMap(RDD.scala:296)
org.apache.spark.ml.recommendation.ALS$.org$apache$spark$ml$recommendation$ALS$$computeFactors(ALS.scala:1065)
org.apache.spark.ml.recommendation.ALS$$anonfun$train$3.apply(ALS.scala:530)
org.apache.spark.ml.recommendation.ALS$$anonfun$train$3.apply(ALS.scala:527)
scala.collection.immutable.Range.foreach(Range.scala:141)
org.apache.spark.ml.recommendation.ALS$.train(ALS.scala:527)
org.apache.spark.mllib.recommendation.ALS.run(ALS.scala:203)

And from the aggregate:

org.apache.spark.rdd.RDD.aggregate(RDD.scala:968)
org.apache.spark.ml.recommendation.ALS$.computeYtY(ALS.scala:1112)
org.apache.spark.ml.recommendation.ALS$.org$apache$spark$ml$recommendation$ALS$$computeFactors(ALS.scala:1064)
org.apache.spark.ml.recommendation.ALS$$anonfun$train$3.apply(ALS.scala:538)
org.apache.spark.ml.recommendation.ALS$$anonfun$train$3.apply(ALS.scala:527)
scala.collection.immutable.Range.foreach(Range.scala:141)
org.apache.spark.ml.recommendation.ALS$.train(ALS.scala:527)
org.apache.spark.mllib.recommendation.ALS.run(ALS.scala:203)



On Thu, Apr 23, 2015 at 2:49 AM, Xiangrui Meng men...@gmail.com wrote:

 This is the size of the serialized task closure. Is stage 246 part of
 ALS iterations, or something before or after it? -Xiangrui

 On Tue, Apr 21, 2015 at 10:36 AM, Christian S. Perone
 christian.per...@gmail.com wrote:
  Hi Sean, thanks for the answer. I tried to call repartition() on the
 input
  with many different sizes and it still continues to show that warning
  message.
 
  On Tue, Apr 21, 2015 at 7:05 AM, Sean Owen so...@cloudera.com wrote:
 
  I think maybe you need more partitions in your input, which might make
  for smaller tasks?
 
  On Tue, Apr 21, 2015 at 2:56 AM, Christian S. Perone
  christian.per...@gmail.com wrote:
   I keep seeing these warnings when using trainImplicit:
  
   WARN TaskSetManager: Stage 246 contains a task of very large size (208
   KB).
   The maximum recommended task size is 100 KB.
  
   And then the task size starts to increase. Is this a known issue ?
  
   Thanks !
  
   --
   Blog | Github | Twitter
   Forgive, O Lord, my little jokes on Thee, and I'll forgive Thy great
   big
   joke on me.
 
 
 
 
  --
  Blog | Github | Twitter
  Forgive, O Lord, my little jokes on Thee, and I'll forgive Thy great big
  joke on me.




-- 
Blog http://blog.christianperone.com | Github https://github.com/perone
| Twitter https://twitter.com/tarantulae
Forgive, O Lord, my little jokes on Thee, and I'll forgive Thy great big
joke on me.


ML regression - spark context dies without error

2015-04-23 Thread jamborta
Hi all,

I have been testing Spark ML algorithms with a bigger dataset, and ran into
some problems with linear regression:

It seems the executors stop without any apparent reason:

15/04/22 20:15:05 INFO BlockManagerInfo: Added rdd_12492_80 in memory on
backend-node:48037 (size: 28.5 MB, free: 2.8 GB)
15/04/22 20:15:05 INFO BlockManagerInfo: Added rdd_12493_80 in memory on
backend-node:48037 (size: 37.6 MB, free: 2.7 GB)
15/04/22 20:15:08 INFO BlockManagerInfo: Added rdd_12489_81 in memory on
backend-node:48037 (size: 8.4 MB, free: 2.7 GB)
[E 150422 20:15:12 java_gateway:483] Error while sending or receiving.
Traceback (most recent call last):
  File
/home/azureuser/spark-1.3.0-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py,
line 479, in send_command
raise Py4JError(Answer from Java side is empty)
Py4JError: Answer from Java side is empty

Then sparkcontext stops, too :

[E 150422 20:15:12 java_gateway:431] An error occurred while trying to
connect to the Java server

The problem is that it does not happen all the time; it only fails maybe
once in every five attempts.

Any suggestions on where I can get more detailed logs?

Thanks,





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/ML-regression-spark-context-dies-without-error-tp22633.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Contributors, read me! Updated Contributing to Spark wiki

2015-04-23 Thread Sean Owen
Following several discussions about how to improve the contribution
process in Spark, I've overhauled the guide to contributing. Anyone
who is going to contribute needs to read it, as it has more formal
guidance about the process:

https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark

We may push back harder now on pull requests and JIRAs that don't
follow this guidance. It will help everyone spend less time getting
changes in, and less time on duplicated effort or changes that
won't make it in.

A summary of key points is found in CONTRIBUTING.md, a prompt
presented before opening pull requests
(https://github.com/apache/spark/blob/master/CONTRIBUTING.md):

- Is the change important and ready enough to ask the community to
spend time reviewing?
- Have you searched for existing, related JIRAs and pull requests?
- Is this a new feature that can stand alone as a package on
http://spark-packages.org ?
- Is the change being proposed clearly explained and motivated?




Re: StandardScaler failing with OOM errors in PySpark

2015-04-23 Thread Rok Roskar
ok yes, I think I have narrowed it down to being a problem with driver
memory settings. It looks like the application master/driver is not being
launched with the settings specified:

For the driver process on the main node I see -XX:MaxPermSize=128m
-Xms512m -Xmx512m as options used to start the JVM, even though I
specified

'spark.yarn.am.memory', '5g'
'spark.yarn.am.memoryOverhead', '2000'

The info shows that these options were read:

15/04/23 13:47:47 INFO yarn.Client: Will allocate AM container, with 7120
MB memory including 2000 MB overhead

Is there some reason why these options are being ignored and instead
starting the driver with just 512Mb of heap?
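
For what it's worth, a sketch of the usual workaround: in yarn-client mode the
driver JVM is already running by the time application code sets any
configuration, so its heap has to be supplied at launch time (file names are
illustrative). Note that spark.yarn.am.memory sizes the separate YARN
application master container in client mode, not the local driver JVM:

pyspark --master yarn-client --driver-memory 5g
# or, for a packaged application:
spark-submit --master yarn-client --driver-memory 5g my_app.py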

On Thu, Apr 23, 2015 at 8:06 AM, Rok Roskar rokros...@gmail.com wrote:

 the feature dimension is 800k.

 yes, I believe the driver memory is likely the problem since it doesn't
 crash until the very last part of the tree aggregation.

 I'm running it via pyspark through YARN -- I have to run in client mode so
 I can't set spark.driver.memory -- I've tried setting the
 spark.yarn.am.memory and overhead parameters but it doesn't seem to have an
 effect.

 Thanks,

 Rok

 On Apr 23, 2015, at 7:47 AM, Xiangrui Meng men...@gmail.com wrote:

  What is the feature dimension? Did you set the driver memory? -Xiangrui
 
  On Tue, Apr 21, 2015 at 6:59 AM, rok rokros...@gmail.com wrote:
  I'm trying to use the StandardScaler in pyspark on a relatively small
 (a few
  hundred Mb) dataset of sparse vectors with 800k features. The fit
 method of
  StandardScaler crashes with Java heap space or Direct buffer memory
 errors.
  There should be plenty of memory around -- 10 executors with 2 cores
 each
  and 8 Gb per core. I'm giving the executors 9g of memory and have also
 tried
  lots of overhead (3g), thinking it might be the array creation in the
  aggregators that's causing issues.
 
  The bizarre thing is that this isn't always reproducible -- sometimes it
  actually works without problems. Should I be setting up executors
  differently?
 
  Thanks,
 
  Rok
 
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/StandardScaler-failing-with-OOM-errors-in-PySpark-tp22593.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 




Re: gridsearch - python

2015-04-23 Thread Punyashloka Biswal
https://issues.apache.org/jira/browse/SPARK-7022.

Punya

On Thu, Apr 23, 2015 at 5:47 PM Pagliari, Roberto rpagli...@appcomsci.com
wrote:

 Can anybody point me to an example, if available, about gridsearch with
 python?



 Thank you,





RE: Re: problem with spark thrift server

2015-04-23 Thread Cheng, Hao
Hi, can you describe a little bit how the ThriftServer crashed, or the steps to
reproduce it? It's probably a bug in the ThriftServer.

Thanks,

From: guoqing0...@yahoo.com.hk [mailto:guoqing0...@yahoo.com.hk]
Sent: Friday, April 24, 2015 9:55 AM
To: Arush Kharbanda
Cc: user
Subject: Re: Re: problem with spark thrift server

Thanks for your reply. I would like to use the Spark Thriftserver as a JDBC SQL
interface, with the Spark application running on YARN. But the application was
marked FINISHED when the Thriftserver crashed, and all the cached tables were lost.

Thriftserver start command:
start-thriftserver.sh --master yarn --executor-memory 20480m --executor-cores 2
 --num-executors 20 --queue spark

My question is whether the Thriftserver has any other, more stable mode on YARN,
like active/standby for the Thriftserver.
I'd really appreciate any suggestions and ideas.
Thanks.

From: Arush Kharbanda ar...@sigmoidanalytics.com
Date: 2015-04-23 18:40
To: guoqing0...@yahoo.com.hk
CC: user@spark.apache.org
Subject: Re: problem with spark thrift server
Hi

What do you mean by disabling the driver? What are you trying to achieve?

Thanks
Arush

On Thu, Apr 23, 2015 at 12:29 PM, guoqing0...@yahoo.com.hk wrote:
Hi,
I have a question about the Spark Thrift server: I deployed Spark on YARN and
found that if the Spark driver dies, the Spark application crashes on YARN.
I'd appreciate any suggestions and ideas.

Thank you!



--

[Sigmoid Analytics] http://htmlsig.com/www.sigmoidanalytics.com

Arush Kharbanda || Technical Teamlead

ar...@sigmoidanalytics.com || www.sigmoidanalytics.com


Re: Spark Streaming updateStateByKey throws OutOfMemory Error

2015-04-23 Thread Sourav Chandra
*bump*

On Thu, Apr 23, 2015 at 3:46 PM, Sourav Chandra 
sourav.chan...@livestream.com wrote:

 HI TD,

 Some observations:

 1. If I submit the application using the spark-submit tool with *client as
 deploy mode*, it works fine with a single master and worker (driver, master
 and worker running on the same machine).
 2. If I submit the application using the spark-submit tool with client as
 deploy mode, it *crashes after some time with a StackOverflowError* on a
 *single master and 2 workers* (driver, master and 1 worker running on the
 same machine, the other worker on a different machine):
  *15/04/23 05:42:04 Executor: Exception in task 0.0 in stage 23153.0
 (TID 5412)*
 *java.lang.StackOverflowError*
 *at
 java.io.ObjectInputStream$BlockDataInputStream.readUTF(ObjectInputStream.java:2864)*
 *at java.io.ObjectInputStream.readUTF(ObjectInputStream.java:1072)*
 *at
 java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:671)*
 *at
 java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:830)*
 *at
 java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1601)*
 *at
 java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)*
 *at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)*
 *at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)*
 *at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)*
 *at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)*
 *at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)*
 *at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)*
 *at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)*
 *at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)*
 *at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)*
 *at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)*
 *at
 java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)*
 *at
 scala.collection.immutable.$colon$colon.readObject(List.scala:362)*
 *at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)*
 *at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)*
 *at java.lang.reflect.Method.invoke(Method.java:606)*
 *at
 java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)*
 *at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)*
 *at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)*
 *at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)*
 *at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)*
 *at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)*
 *at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)*
 *at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)*
 *at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)*
 *at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)*
 *at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)*
 *at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)*
 *at
 java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)*
 *at
 scala.collection.immutable.$colon$colon.readObject(List.scala:362)*
 *at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)*
 *at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)*
 *at java.lang.reflect.Method.invoke(Method.java:606)*
 *at
 java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)*
 *at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)*
 *at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)*
 *at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)*
 *at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)*
 *at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)*
 *at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)*
 *at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)*
 *at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)*
 *at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)*
 *at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)*
 *at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)*
 *at
 

Re: Is the Spark-1.3.1 support build with scala 2.8 ?

2015-04-23 Thread madhu phatak
Hi,
AFAIK it's only built with 2.10 and 2.11. You should integrate
kafka_2.10.0-0.8.0 to make it work.




Regards,
Madhukara Phatak
http://datamantra.io/

On Fri, Apr 24, 2015 at 9:22 AM, guoqing0...@yahoo.com.hk 
guoqing0...@yahoo.com.hk wrote:

 Does Spark 1.3.1 support building with Scala 2.8? And if it is built with
 Scala 2.10, can it be integrated with kafka_2.8.0-0.8.0?

 Thanks.



RE: gridsearch - python

2015-04-23 Thread Pagliari, Roberto
I know grid search with cross validation is not supported. However, I was
wondering if there is something available for the time being.

Thanks,


From: Punyashloka Biswal [mailto:punya.bis...@gmail.com]
Sent: Thursday, April 23, 2015 9:06 PM
To: Pagliari, Roberto; user@spark.apache.org
Subject: Re: gridsearch - python

https://issues.apache.org/jira/browse/SPARK-7022.
Punya

On Thu, Apr 23, 2015 at 5:47 PM Pagliari, Roberto
rpagli...@appcomsci.com wrote:
Can anybody point me to an example, if available, of grid search with Python?

Thank you,



Re: Re: Does Spark 1.3.1 support building with Scala 2.8?

2015-04-23 Thread guoqing0...@yahoo.com.hk
Thank you very much for your suggestion.

Regards,
 
From: madhu phatak
Date: 2015-04-24 13:06
To: guoqing0...@yahoo.com.hk
CC: user
Subject: Re: Does Spark 1.3.1 support building with Scala 2.8?
Hi,
AFAIK it's only built with Scala 2.10 and 2.11. You should integrate
kafka_2.10.0-0.8.0 to make it work.




Regards,
Madhukara Phatak
http://datamantra.io/

On Fri, Apr 24, 2015 at 9:22 AM, guoqing0...@yahoo.com.hk 
guoqing0...@yahoo.com.hk wrote:
Does Spark 1.3.1 support building with Scala 2.8? And if it is built with
Scala 2.10, can it be integrated with kafka_2.8.0-0.8.0?

Thanks.



Spark SQL - Setting YARN Classpath for primordial class loader

2015-04-23 Thread Night Wolf
Hi guys,

Having a problem building a DataFrame in Spark SQL from a JDBC data source
when running with --master yarn-client and adding the JDBC driver JAR with
--jars. If I run with a local[*] master, all works fine.

./bin/spark-shell --jars /tmp/libs/mysql-jdbc.jar --master yarn-client

sqlContext.load("jdbc", Map(
  "url" -> "jdbc:mysql://mysqlsvr:3306/MyDB;user=usr;password=pwd",
  "driver" -> "com.mysql.jdbc.Driver",
  "dbtable" -> "MY_TBL"))


This throws a class-not-found exception when running with Spark SQL. But
when I try to load the driver class directly on the workers or the driver,
the class is found without problems. So I'm guessing this is some problem
with the primordial class loader/Java security in the DriverManager and the
class loader used in Spark SQL when running on YARN.

Any ideas? The only thing I have found that works is merging my MySQL JDBC
driver into the Spark assembly JAR that's shipped to YARN, because adding
it with --jars doesn't work.

Cheers!


Re: Spark SQL - Setting YARN Classpath for primordial class loader

2015-04-23 Thread Marcelo Vanzin
You'd have to use spark.{driver,executor}.extraClassPath to modify the
system class loader. But that also means you have to manually
distribute the jar to the nodes in your cluster, into a common
location.

On Thu, Apr 23, 2015 at 6:38 PM, Night Wolf nightwolf...@gmail.com wrote:
 Hi guys,

 Having a problem building a DataFrame in Spark SQL from a JDBC data source
 when running with --master yarn-client and adding the JDBC driver JAR with
 --jars. If I run with a local[*] master, all works fine.

 ./bin/spark-shell --jars /tmp/libs/mysql-jdbc.jar --master yarn-client

 sqlContext.load("jdbc", Map(
   "url" -> "jdbc:mysql://mysqlsvr:3306/MyDB;user=usr;password=pwd",
   "driver" -> "com.mysql.jdbc.Driver",
   "dbtable" -> "MY_TBL"))


 This throws a class-not-found exception when running with Spark SQL. But
 when I try to load the driver class directly on the workers or the driver,
 the class is found without problems. So I'm guessing this is some problem
 with the primordial class loader/Java security in the DriverManager and
 the class loader used in Spark SQL when running on YARN.

 Any ideas? The only thing I have found that works is merging my MySQL JDBC
 driver into the Spark assembly JAR that's shipped to YARN, because adding
 it with --jars doesn't work.

 Cheers!



-- 
Marcelo




Re: Spark SQL - Setting YARN Classpath for primordial class loader

2015-04-23 Thread Night Wolf
Thanks Marcelo, can this be a path on HDFS?

On Fri, Apr 24, 2015 at 11:52 AM, Marcelo Vanzin van...@cloudera.com
wrote:

 You'd have to use spark.{driver,executor}.extraClassPath to modify the
 system class loader. But that also means you have to manually
 distribute the jar to the nodes in your cluster, into a common
 location.

 On Thu, Apr 23, 2015 at 6:38 PM, Night Wolf nightwolf...@gmail.com
 wrote:
  Hi guys,
 
  Having a problem building a DataFrame in Spark SQL from a JDBC data
  source when running with --master yarn-client and adding the JDBC driver
  JAR with --jars. If I run with a local[*] master, all works fine.
 
  ./bin/spark-shell --jars /tmp/libs/mysql-jdbc.jar --master yarn-client
 
  sqlContext.load("jdbc", Map(
    "url" -> "jdbc:mysql://mysqlsvr:3306/MyDB;user=usr;password=pwd",
    "driver" -> "com.mysql.jdbc.Driver",
    "dbtable" -> "MY_TBL"))
 
 
  This throws a class-not-found exception when running with Spark SQL. But
  when I try to load the driver class directly on the workers or the
  driver, the class is found without problems. So I'm guessing this is
  some problem with the primordial class loader/Java security in the
  DriverManager and the class loader used in Spark SQL when running on
  YARN.
 
  Any ideas? The only thing I have found that works is merging my MySQL
  JDBC driver into the Spark assembly JAR that's shipped to YARN, because
  adding it with --jars doesn't work.
 
  Cheers!



 --
 Marcelo



Re: Spark SQL - Setting YARN Classpath for primordial class loader

2015-04-23 Thread Marcelo Vanzin
No, those have to be local paths.

On Thu, Apr 23, 2015 at 6:53 PM, Night Wolf nightwolf...@gmail.com wrote:
 Thanks Marcelo, can this be a path on HDFS?

 On Fri, Apr 24, 2015 at 11:52 AM, Marcelo Vanzin van...@cloudera.com
 wrote:

 You'd have to use spark.{driver,executor}.extraClassPath to modify the
 system class loader. But that also means you have to manually
 distribute the jar to the nodes in your cluster, into a common
 location.

 On Thu, Apr 23, 2015 at 6:38 PM, Night Wolf nightwolf...@gmail.com
 wrote:
  Hi guys,
 
  Having a problem building a DataFrame in Spark SQL from a JDBC data
  source when running with --master yarn-client and adding the JDBC driver
  JAR with --jars. If I run with a local[*] master, all works fine.
 
  ./bin/spark-shell --jars /tmp/libs/mysql-jdbc.jar --master yarn-client
 
  sqlContext.load("jdbc", Map(
    "url" -> "jdbc:mysql://mysqlsvr:3306/MyDB;user=usr;password=pwd",
    "driver" -> "com.mysql.jdbc.Driver",
    "dbtable" -> "MY_TBL"))
 
 
  This throws a class-not-found exception when running with Spark SQL. But
  when I try to load the driver class directly on the workers or the
  driver, the class is found without problems. So I'm guessing this is
  some problem with the primordial class loader/Java security in the
  DriverManager and the class loader used in Spark SQL when running on
  YARN.
 
  Any ideas? The only thing I have found that works is merging my MySQL
  JDBC driver into the Spark assembly JAR that's shipped to YARN, because
  adding it with --jars doesn't work.
 
  Cheers!



 --
 Marcelo





-- 
Marcelo




Re: Re: problem with spark thrift server

2015-04-23 Thread guoqing0...@yahoo.com.hk
Thanks for your reply. I would like to use the Spark Thriftserver as a JDBC
SQL interface, with the Spark application running on YARN. But the
application went to FINISHED when the Thriftserver crashed, and all the
cached tables were lost.

Thriftserver start command:
start-thriftserver.sh --master yarn --executor-memory 20480m \
  --executor-cores 2 --num-executors 20 --queue spark

My question is whether the Thriftserver has any other, more stable mode on
YARN, such as an active/standby pair of Thriftservers.
I would really appreciate any suggestions and ideas.
Thanks.
 
From: Arush Kharbanda
Date: 2015-04-23 18:40
To: guoqing0...@yahoo.com.hk
CC: user
Subject: Re: problem with spark thrift server
Hi

What do you mean by "disable the driver"? What are you trying to achieve?

Thanks
Arush

On Thu, Apr 23, 2015 at 12:29 PM, guoqing0...@yahoo.com.hk 
guoqing0...@yahoo.com.hk wrote:
Hi,
I have a question about the Spark thrift server. I deployed Spark on YARN
and found that if the Spark driver goes down, the Spark application crashes
on YARN. I appreciate any suggestions and ideas.

Thank you! 



-- 
Arush Kharbanda || Technical Teamlead
ar...@sigmoidanalytics.com || www.sigmoidanalytics.com


spark 1.3.0 strange log message

2015-04-23 Thread Henry Hung
Dear All,

When using Spark 1.3.0 spark-submit and redirecting stdout and stderr to a
log file, I saw some strange lines inside that look like this:
[Stage 0:(0 + 2) / 120]
[Stage 0:(2 + 2) / 120]
[Stage 0:==  (6 + 2) / 120]
[Stage 0:=  (12 + 2) / 120]
[Stage 0:=  (20 + 2) / 120]
[Stage 0:===(24 + 2) / 120]
[Stage 0:== (32 + 2) / 120]
[Stage 0:===(42 + 2) / 120]
[Stage 0:   (52 + 2) / 120]
[Stage 0:===(59 + 2) / 120]
[Stage 0:===(68 + 2) / 120]
[Stage 0:   (78 + 3) / 120]
[Stage 0:=  (88 + 4) / 120]
[Stage 0:= (100 + 2) / 120]
[Stage 0:==(110 + 2) / 120]


Here is my log4j property:

# Set everything to be logged to the console
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: 
%m%n

# Settings to quiet third party logs that are too verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO


How can I disable this kind of stage progress message?

Best regards,
Henry




Re: spark 1.3.0 strange log message

2015-04-23 Thread Terry Hole
Use this in spark conf: spark.ui.showConsoleProgress=false

Best Regards,

On Fri, Apr 24, 2015 at 11:23 AM, Henry Hung ythu...@winbond.com wrote:

  Dear All,



 When using Spark 1.3.0 spark-submit and redirecting stdout and stderr to a
 log file, I saw some strange lines inside that look like this:

 [Stage 0:(0 + 2) / 120]
 [Stage 0:(2 + 2) / 120]
 [Stage 0:==  (6 + 2) / 120]
 [Stage 0:=  (12 + 2) / 120]
 [Stage 0:=  (20 + 2) / 120]
 [Stage 0:===(24 + 2) / 120]
 [Stage 0:== (32 + 2) / 120]
 [Stage 0:===(42 + 2) / 120]
 [Stage 0:   (52 + 2) / 120]
 [Stage 0:===(59 + 2) / 120]
 [Stage 0:===(68 + 2) / 120]
 [Stage 0:   (78 + 3) / 120]
 [Stage 0:=  (88 + 4) / 120]
 [Stage 0:= (100 + 2) / 120]
 [Stage 0:==(110 + 2) / 120]





 Here is my log4j property:



 # Set everything to be logged to the console

 log4j.rootCategory=WARN, console

 log4j.appender.console=org.apache.log4j.ConsoleAppender

 log4j.appender.console.target=System.err

 log4j.appender.console.layout=org.apache.log4j.PatternLayout

 log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p
 %c{1}: %m%n



 # Settings to quiet third party logs that are too verbose

 log4j.logger.org.eclipse.jetty=WARN

 log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR

 log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO

 log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO





 How can I disable this kind of stage progress message?



 Best regards,

 Henry
