how to save matrix result to file

2016-01-19 Thread zhangjp
Hi all,
 I have computed a Matrix-type result with Java, but I don't know how to save
the result to a file.
"Matrix cov = mat.computeCovariance();"
 THX.
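
One possible way to do this (a minimal Scala sketch, assuming cov is an
org.apache.spark.mllib.linalg.Matrix and that a plain text file is acceptable;
this is not from the original thread):

import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.Matrix

// Write the matrix out as one comma-separated line per row.
def saveMatrixAsText(sc: SparkContext, m: Matrix, path: String): Unit = {
  val rows = for (i <- 0 until m.numRows)
    yield (0 until m.numCols).map(j => m(i, j)).mkString(",")
  sc.parallelize(rows).saveAsTextFile(path)
}

// saveMatrixAsText(sc, cov, "hdfs:///tmp/covariance")   // hypothetical output path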

Re: Calling SparkContext methods in scala Future

2016-01-19 Thread Marco
Thank you guys for the answers.

@Ted Yu: You are right, in general the code to fetch stuff externally
should be called separately, while Spark should only access the data
written by these two services via flume/kafka/whatever. However, before I
get there, I would like to have the Spark job ready.

@Shixiong Zhu: I imagined something like that; I had suspected from the
beginning that SparkContext cannot be called from inside Futures in general,
and trying it gave me the confirmation I needed. Unfortunately, I don't have
a reproducer, but I would say that it's enough to create one Future and call
sparkContext from there.

Thanks again for the answers.

Kind regards,
Marco

2016-01-18 19:37 GMT+01:00 Shixiong(Ryan) Zhu :

> Hey Marco,
>
> Since the code in the Future runs asynchronously, you cannot call
> "sparkContext.stop" at the end of "fetch", because the code in the Future may
> not have finished yet.
>
> However, the exception seems weird. Do you have a simple reproducer?
>
>
> On Mon, Jan 18, 2016 at 9:13 AM, Ted Yu  wrote:
>
>>   externalCallTwo map { dataTwo =>
>> println("in map") // prints, so it gets here ...
>> val rddOne = sparkContext.parallelize(dataOne)
>>
>> I don't think you should call method on sparkContext in map function.
>> sparkContext lives on driver side.
>>
>> Cheers
>>
>> On Mon, Jan 18, 2016 at 6:27 AM, Marco  wrote:
>>
>>> Hello,
>>>
>>> I am using Spark 1.5.1 within SBT, and Scala 2.10.6 and I am facing an
>>> issue with the SparkContext.
>>>
>>> Basically, I have an object that needs to do several things:
>>>
>>> - call an external service One (web api)
>>> - call an external service Two (another api)
>>> - read and produce an RDD from HDFS (Spark)
>>> - parallelize the data obtained in the first two calls
>>> - join these different rdds, do stuff with them...
>>>
>>> Now, I am trying to do it in an asynchronous way. This doesn't seem to
>>> work, though. My guess is that Spark doesn't see the calls to .parallelize,
>>> as they are made in different tasks (or Futures), so this code is called
>>> earlier/later or maybe with an unset context (can that be?). I have tried
>>> different ways, one of them being a call to SparkEnv.set inside the flatMap
>>> and map calls (in the Future). However, all I get is "Cannot call methods on
>>> a stopped SparkContext". It just doesn't work - maybe I just misunderstood
>>> what it does, so I removed it.
>>>
>>> This is the code I have written so far:
>>>
>>> object Fetcher {
>>>
>>>   def fetch(name, master, ...) = {
>>> val externalCallOne: Future[WSResponse] = externalService1()
>>> val externalCallTwo: Future[String] = externalService2()
>>> // val sparkEnv = SparkEnv.get
>>> val config = new SparkConf()
>>> .setAppName(name)
>>> .set("spark.master", master)
>>> .set("spark.serializer",
>>> "org.apache.spark.serializer.KryoSerializer")
>>>
>>> val sparkContext = new SparkContext(config)
>>> //val sparkEnv = SparkEnv.get
>>>
>>> val eventuallyJoinedData = externalCallOne flatMap { dataOne =>
>>>   // SparkEnv.set(sparkEnv)
>>>   externalCallTwo map { dataTwo =>
>>> println("in map") // prints, so it gets here ...
>>> val rddOne = sparkContext.parallelize(dataOne)
>>> val rddTwo = sparkContext.parallelize(dataTwo)
>>> // do stuff here ... foreach/println, and
>>>
>>> val joinedData = rddOne leftOuterJoin (rddTwo)
>>>   }
>>> }
>>> eventuallyJoinedData onSuccess { case success => ...  }
>>> eventuallyJoinedData onFailure { case error =>
>>> println(error.getMessage) }
>>> // sparkContext.stop
>>>   }
>>>
>>> }
>>> As you can see, I have also tried to comment the line to stop the
>>> context, but then I get another issue:
>>>
>>> 13:09:14.929 [ForkJoinPool-1-worker-5] INFO
>>>  org.apache.spark.SparkContext - Starting job: count at Fetcher.scala:38
>>> 13:09:14.932 [shuffle-server-0] DEBUG io.netty.channel.nio.NioEventLoop
>>> - Selector.select() returned prematurely because
>>> Thread.currentThread().interrupt() was called. Use
>>> NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.
>>> 13:09:14.936 [Spark Context Cleaner] ERROR
>>> org.apache.spark.ContextCleaner - Error in cleaning thread
>>> java.lang.InterruptedException: null
>>> at java.lang.Object.wait(Native Method) ~[na:1.8.0_65]
>>> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
>>> ~[na:1.8.0_65]
>>> at
>>> org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:157)
>>> ~[spark-core_2.10-1.5.1.jar:1.5.1]
>>> at
>>> org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1136)
>>> [spark-core_2.10-1.5.1.jar:1.5.1]
>>> at 
>>> 
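
A sketch of how fetch() could be restructured along the lines of the advice
above: resolve the external futures first, keep all sparkContext calls on the
driver, and stop the context only after the Spark work has finished. The
externalService stubs and the element types below are placeholders, not the
real web-API calls from the quoted code.

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import org.apache.spark.{SparkConf, SparkContext}

object Fetcher {

  // Stand-ins for the two external web-API calls (assumed return types).
  def externalService1(): Future[Seq[(String, Int)]] = Future(Seq("a" -> 1))
  def externalService2(): Future[Seq[(String, Int)]] = Future(Seq("a" -> 2))

  def fetch(name: String, master: String): Unit = {
    val externalCallOne = externalService1()
    val externalCallTwo = externalService2()

    val config = new SparkConf()
      .setAppName(name)
      .set("spark.master", master)
    val sparkContext = new SparkContext(config)

    try {
      // Block until both external services have answered, then do all
      // Spark work synchronously on the driver.
      val (dataOne, dataTwo) = Await.result(externalCallOne zip externalCallTwo, 5.minutes)
      val rddOne = sparkContext.parallelize(dataOne)
      val rddTwo = sparkContext.parallelize(dataTwo)
      rddOne.leftOuterJoin(rddTwo).collect().foreach(println)
    } finally {
      sparkContext.stop()
    }
  }
}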

Re: has any one implemented TF_IDF using ML transformers?

2016-01-19 Thread Yanbo Liang
Hi Andy,

The equation to calculate IDF is:
idf = log((m + 1) / (d(t) + 1))
you can refer here:
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/feature/IDF.scala#L150

The equation to calculate TFIDF is:
TFIDF=TF * IDF
you can refer:
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/feature/IDF.scala#L226
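
As a quick sanity check against the toy corpus quoted below (4 documents,
natural log): this is my own arithmetic, not part of the original reply, and
it matches the 0.0 and 0.916... values in the idf column shown further down
the thread.

val m = 4                         // number of documents
math.log((m + 1.0) / (1 + 1))     // "Beijing" appears in 1 document -> 0.9162907318741551
math.log((m + 1.0) / (4 + 1))     // "Chinese" appears in all 4      -> 0.0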


Thanks
Yanbo

2016-01-19 7:05 GMT+08:00 Andy Davidson :

> Hi Yanbo
>
> I am using 1.6.0. I am having a hard time trying to figure out what the
> exact equation is. I do not know Scala.
>
> I took a look at the source code URL you provided:
>
> override def transform(dataset: DataFrame): DataFrame = {
> transformSchema(dataset.schema, logging = true)
> val idf = udf { vec: Vector => idfModel.transform(vec) }
> dataset.withColumn($(outputCol), idf(col($(inputCol
> }
>
>
> You mentioned the doc is out of date.
> http://spark.apache.org/docs/latest/mllib-feature-extraction.html#tf-idf
>
> Based on my understanding of the subject matter the equations in the java
> doc are correct. I could not find anything like the equations in the source
> code?
>
> IDF(t, D) = log((|D| + 1) / (DF(t, D) + 1))
>
> TFIDF(t, d, D) = TF(t, d) · IDF(t, D)
>
>
> I found the Spark unit test org.apache.spark.mllib.feature.JavaTfIdfSuite;
> the results do not match the equation. (In general the unit test asserts seem
> incomplete.)
>
>
>  I have created several small test examples to try and figure out how to
> use NaiveBayes, HashingTF, and IDF. The values of TFIDF, theta,
> probabilities, … produced by Spark do not match the published
> results at
> http://nlp.stanford.edu/IR-book/html/htmledition/naive-bayes-text-classification-1.html
>
>
> Kind regards
>
> Andy
>
> private DataFrame createTrainingData() {
>
> // make sure we only use dictionarySize words
>
> JavaRDD<Row> rdd = javaSparkContext.parallelize(Arrays.asList(
>
> // 0 is Chinese
>
> // 1 in notChinese
>
> RowFactory.create(0, 0.0, Arrays.asList("Chinese",
> "Beijing", "Chinese")),
>
> RowFactory.create(1, 0.0, Arrays.asList("Chinese",
> "Chinese", "Shanghai")),
>
> RowFactory.create(2, 0.0, Arrays.asList("Chinese", "Macao"
> )),
>
> RowFactory.create(3, 1.0, Arrays.asList("Tokyo", "Japan",
> "Chinese"))));
>
>
>
> return createData(rdd);
>
> }
>
>
> private DataFrame createData(JavaRDD<Row> rdd) {
>
> StructField id = null;
>
> id = new StructField("id", DataTypes.IntegerType, false,
> Metadata.empty());
>
>
> StructField label = null;
>
> label = new StructField("label", DataTypes.DoubleType, false,
> Metadata.empty());
>
>
>
> StructField words = null;
>
> words = new StructField("words",
> DataTypes.createArrayType(DataTypes.StringType), false, Metadata.empty());
>
>
> StructType schema = new StructType(new StructField[] { id, label,
> words });
>
> DataFrame ret = sqlContext.createDataFrame(rdd, schema);
>
>
>
> return ret;
>
> }
>
>
>private DataFrame runPipleLineTF_IDF(DataFrame rawDF) {
>
> HashingTF hashingTF = new HashingTF()
>
> .setInputCol("words")
>
> .setOutputCol("tf")
>
> .setNumFeatures(dictionarySize);
>
>
>
> DataFrame termFrequenceDF = hashingTF.transform(rawDF);
>
>
>
> termFrequenceDF.cache(); // idf needs to make 2 passes over data
> set
>
> //val idf = new IDF(minDocFreq = 2).fit(tf)
>
> IDFModel idf = new IDF()
>
> //.setMinDocFreq(1) // our vocabulary has 6 words
> we hash into 7
>
> .setInputCol(hashingTF.getOutputCol())
>
> .setOutputCol("idf")
>
> .fit(termFrequenceDF);
>
>
>
> DataFrame ret = idf.transform(termFrequenceDF);
>
>
>
> return ret;
>
> }
>
>
> |-- id: integer (nullable = false)
>
>  |-- label: double (nullable = false)
>
>  |-- words: array (nullable = false)
>
>  ||-- element: string (containsNull = true)
>
>  |-- tf: vector (nullable = true)
>
>  |-- idf: vector (nullable = true)
>
>
>
> +---+-----+----------------------------+-------------------+-----------------------------------+
> |id |label|words                       |tf                 |idf                                |
> +---+-----+----------------------------+-------------------+-----------------------------------+
> |0  |0.0  |[Chinese, Beijing, Chinese] |(7,[1,2],[2.0,1.0])|(7,[1,2],[0.0,0.9162907318741551]) |
> |1  |0.0  |[Chinese, Chinese, Shanghai]|(7,[1,4],[2.0,1.0])|(7,[1,4],[0.0,0.9162907318741551]) |
> |2  |0.0  |[Chinese, Macao]

Different executor memory for different nodes

2016-01-19 Thread hemangshah
How to set different executor memory limits for different worker nodes? 

I'm using spark 1.5.2 in standalone deployment mode and launching using
scripts. The executor memory is set via 'spark.executor.memory' in
conf/spark-defaults.conf. This sets the same memory limit for all the worker
nodes. I would like to make it such that one can set a different limit for
different nodes.

(Spark 1.5.2, ubuntu 14.04)

Thanks,



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Different-executor-memory-for-different-nodes-tp26005.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



storing query object

2016-01-19 Thread Gourav Sengupta
Hi,

I have a Spark table (created from hiveContext) with a couple of hundred
partitions and a few thousand files.

When I run a query on the table, Spark spends a lot of time (as seen in
the pyspark output) collecting these files from the several partitions.
After this the query starts running.

Is there a way to store the object which has collected all these partitions
and files, so that every time I restart the job I load this object instead
of spending 50 mins just collecting the files before starting to run the
query?


Please do let me know in case the question is not quite clear.

Regards,
Gourav Sengupta


Spark Dataset doesn't have api for changing columns

2016-01-19 Thread Milad khajavi
Hi Spark users,

When I want to map the result of count on groupBy, I need to convert the
result to a DataFrame, then change the column names and map the result to a
new case class. Why doesn't the Spark Dataset API have this functionality directly?

case class LogRow(id: String, location: String, time: Long)
case class KeyValue(key: (String, String), value: Long)

val log = LogRow("1", "a", 1) :: LogRow("1", "a", 2) :: LogRow("1", "b", 3)
:: LogRow("1", "a", 4) :: LogRow("1", "b", 5) :: LogRow("1", "b", 6) ::
LogRow("1", "c", 7) :: LogRow("2", "a", 1) :: LogRow("2", "b", 2) ::
LogRow("2", "b", 3) :: LogRow("2", "a", 4) :: LogRow("2", "a", 5) ::
LogRow("2", "a", 6) :: LogRow("2", "c", 7) :: Nil
log.toDS().groupBy(l => {
  (l.id, l.location)
}).count().toDF().toDF("key", "value").as[KeyValue].show

+-----+-----+
|  key|value|
+-----+-----+
|[1,a]|    3|
|[1,b]|    3|
|[1,c]|    1|
|[2,a]|    4|
|[2,b]|    2|
|[2,c]|    1|
+-----+-----+
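
One possible workaround (my own sketch, not from the thread, assuming the
1.6 API and that sqlContext.implicits._ is in scope): groupBy(...).count()
on a Dataset already yields Dataset[((String, String), Long)], so the pairs
can be mapped straight into the target case class without the toDF() step.

import sqlContext.implicits._

val keyValues = log.toDS()
  .groupBy(l => (l.id, l.location))
  .count()                                      // Dataset[((String, String), Long)]
  .map { case (key, value) => KeyValue(key, value) }

keyValues.show()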


-- 
Milād Khājavi
http://blog.khajavi.ir
Having the source means you can do it yourself.
I tried to change the world, but I couldn’t find the source code.


spark yarn client mode

2016-01-19 Thread Sanjeev Verma
Hi

Do I need to install Spark on all the YARN cluster nodes if I want to submit
the job in yarn-client mode?
Is there any way to spawn Spark job executors on cluster nodes where I have
not installed Spark?

Thanks
Sanjeev


RE: building spark 1.6 throws error Rscript: command not found

2016-01-19 Thread Sun, Rui
Hi, Mich,
Building Spark with the SparkR profile enabled requires R to be installed on
your build machine.

From: Ted Yu [mailto:yuzhih...@gmail.com]
Sent: Tuesday, January 19, 2016 5:27 AM
To: Mich Talebzadeh
Cc: user @spark
Subject: Re: building spark 1.6 throws error Rscript: command not found

Please see:
http://www.jason-french.com/blog/2013/03/11/installing-r-in-linux/

On Mon, Jan 18, 2016 at 1:22 PM, Mich Talebzadeh 
> wrote:
./make-distribution.sh --name custom-spark --tgz -Psparkr -Phadoop-2.6 -Phive 
-Phive-thriftserver -Pyarn


[INFO] --- exec-maven-plugin:1.4.0:exec (sparkr-pkg) @ spark-core_2.10 ---
../R/install-dev.sh: line 40: Rscript: command not found
[INFO] 
[INFO] Reactor Summary:
[INFO]
[INFO] Spark Project Parent POM ... SUCCESS [  2.921 s]
[INFO] Spark Project Test Tags  SUCCESS [  2.921 s]
[INFO] Spark Project Launcher . SUCCESS [ 17.252 s]
[INFO] Spark Project Networking ... SUCCESS [  9.237 s]
[INFO] Spark Project Shuffle Streaming Service  SUCCESS [  4.969 s]
[INFO] Spark Project Unsafe ... SUCCESS [ 13.384 s]
[INFO] Spark Project Core . FAILURE [01:34 min]


How can I resolve this by any chance?


Thanks


Dr Mich Talebzadeh

LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

Sybase ASE 15 Gold Medal Award 2008
A Winning Strategy: Running the most Critical Financial Data on ASE 15
http://login.sybase.com/files/Product_Overviews/ASE-Winning-Strategy-091908.pdf
Author of the books "A Practitioner’s Guide to Upgrading to Sybase ASE 15", 
ISBN 978-0-9563693-0-7.
co-author "Sybase Transact SQL Guidelines Best Practices", ISBN 
978-0-9759693-0-4
Publications due shortly:
Complex Event Processing in Heterogeneous Environments, ISBN: 978-0-9563693-3-8
Oracle and Sybase, Concepts and Contrasts, ISBN: 978-0-9563693-1-4, volume one 
out shortly

http://talebzadehmich.wordpress.com

NOTE: The information in this email is proprietary and confidential. This 
message is for the designated recipient only, if you are not the intended 
recipient, you should destroy it immediately. Any information in this message 
shall not be understood as given or endorsed by Peridale Technology Ltd, its 
subsidiaries or their employees, unless expressly so stated. It is the 
responsibility of the recipient to ensure that this email is virus free, 
therefore neither Peridale Technology Ltd, its subsidiaries nor their employees 
accept any responsibility.




Re: using spark context in map funciton TASk not serilizable error

2016-01-19 Thread Ricardo Paiva
Did you try SparkContext.getOrCreate() ?

You don't need to pass the sparkContext to the map function, you can
retrieve it from the SparkContext singleton.

Regards,

Ricardo


On Mon, Jan 18, 2016 at 6:29 PM, gpatcham [via Apache Spark User List] <
ml-node+s1001560n25998...@n3.nabble.com> wrote:

> Hi,
>
> I have a use case where I need to pass sparkcontext in map function
>
> reRDD.map(row =>method1(row,sc)).saveAsTextFile(outputDir)
>
> Method1 needs spark context to query cassandra. But I see below error
>
> java.io.NotSerializableException: org.apache.spark.SparkContext
>
> Is there a way we can fix this ?
>
> Thanks
>
> --
> If you reply to this email, your message will be added to the discussion
> below:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/using-spark-context-in-map-funciton-TASk-not-serilizable-error-tp25998.html
> To start a new topic under Apache Spark User List, email
> ml-node+s1001560n1...@n3.nabble.com
> To unsubscribe from Apache Spark User List, click here
> 
> .
> NAML
> 
>



-- 
Ricardo Paiva
Big Data
*globo.com* 




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/using-spark-context-in-map-funciton-TASk-not-serilizable-error-tp25998p26006.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

RDD immutablility

2016-01-19 Thread ddav
Hi,

Certain APIs (map, mapValues) give the developer access to the data stored
in RDDs.
Am I correct in saying that these APIs must never modify the data, but should
always return a new object with a copy of the data if the data needs to be
updated for the returned RDD?

Thanks,
Dave.
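
A small illustration of that convention (a sketch of my own, assuming an
existing SparkContext sc): build a new value inside map rather than changing
the input object in place.

case class Record(id: Int, score: Double)

val rdd = sc.parallelize(Seq(Record(1, 0.5), Record(2, 0.9)))

// Preferred: return a fresh object, leaving the original element untouched.
val rescaled = rdd.map(r => r.copy(score = r.score * 100))

// To be avoided: mutating the element's fields in place, which breaks the
// assumption that RDD contents are immutable (and can give surprising
// results with cached RDDs).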



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/RDD-immutablility-tp26007.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Spark SQL -Hive transactions support

2016-01-19 Thread Hemang Nagar
Do you have any plans for supporting hive transactions in Spark?

 

From: Michael Armbrust [mailto:mich...@databricks.com] 
Sent: Tuesday, January 19, 2016 3:18 PM
To: hnagar 
Cc: user 
Subject: Re: Spark SQL -Hive transactions support

 

We don't support Hive-style transactions.

 

On Tue, Jan 19, 2016 at 11:32 AM, hnagar  > wrote:

Hive has transactions support since version 0.14.

I am using Spark 1.6, and Hive 1.2.1, are transactions supported in Spark
SQL now. I tried in the Spark-Shell and it gives the following error

org.apache.spark.sql.AnalysisException:
Unsupported language features in query: insert into test values(1, 'john',
'dong')

I am wondering if its a configuration issue, or this is still not supported
in Spark.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Hive-transactions-support-tp26012.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
 
For additional commands, e-mail: user-h...@spark.apache.org 
 

 



Re: spark-1.2.0--standalone-ha-zookeeper

2016-01-19 Thread Raghvendra Singh
Hi, one question: in spark-env.sh, should I specify all masters for the
parameter SPARK_MASTER_IP? I've already set SPARK_DAEMON_JAVA_OPTS with the
ZooKeeper configuration as specified in the Spark documentation.

Thanks & Regards
Raghvendra

On Wed, Jan 20, 2016 at 1:46 AM, Raghvendra Singh <
raghvendra.ii...@gmail.com> wrote:

> Here's the complete master log on reproducing the error
> http://pastebin.com/2YJpyBiF
>
> Regards
> Raghvendra
>
> On Wed, Jan 20, 2016 at 12:38 AM, Raghvendra Singh <
> raghvendra.ii...@gmail.com> wrote:
>
>> Ok I Will try to reproduce the problem. Also I don't think this is an
>> uncommon problem I am searching for this problem on Google for many days
>> and found lots of questions but no answers.
>>
>> Do you know what kinds of settings spark and zookeeper allow for handling
>> time outs during leader election etc. When one is down.
>>
>> Regards
>> Raghvendra
>> On 20-Jan-2016 12:28 am, "Ted Yu"  wrote:
>>
>>> Perhaps I don't have enough information to make further progress.
>>>
>>> On Tue, Jan 19, 2016 at 10:55 AM, Raghvendra Singh <
>>> raghvendra.ii...@gmail.com> wrote:
>>>
 I currently do not have access to those logs but there were only about
 five lines before this error. They were the same which are present usually
 when everything works fine.

 Can you still help?

 Regards
 Raghvendra
 On 18-Jan-2016 8:50 pm, "Ted Yu"  wrote:

> Can you pastebin master log before the error showed up ?
>
> The initial message was posted for Spark 1.2.0
> Which release of Spark / zookeeper do you use ?
>
> Thanks
>
> On Mon, Jan 18, 2016 at 6:47 AM, doctorx 
> wrote:
>
>> Hi,
>> I am facing the same issue, with the given error
>>
>> ERROR Master:75 - Leadership has been revoked -- master shutting down.
>>
>> Can anybody help. Any clue will be useful. Should i change something
>> in
>> spark cluster or zookeeper. Is there any setting in spark which can
>> help me?
>>
>> Thanks & Regards
>> Raghvendra
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/spark-1-2-0-standalone-ha-zookeeper-tp21308p25994.html
>> Sent from the Apache Spark User List mailing list archive at
>> Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>>>
>


Re: Docker/Mesos with Spark

2016-01-19 Thread Sathish Kumaran Vairavelu
Hi Tim

Do you have any materials/blog posts on running Spark in a container in a Mesos
cluster environment? I have googled it but couldn't find info on it. The Spark
documentation says it is possible, but no details are provided. Please help.


Thanks

Sathish



On Mon, Sep 21, 2015 at 11:54 AM Tim Chen  wrote:

> Hi John,
>
> There is no other blog post yet, I'm thinking to do a series of posts but
> so far haven't get time to do that yet.
>
> Running Spark in docker containers makes distributing spark versions easy,
> it's simple to upgrade and automatically caches on the slaves so the same
> image just runs right away. Most of the docker perf is usually related to
> network and filesystem overheads, but I think with recent changes in Spark
> to make the Mesos sandbox the default temp dir, filesystem won't be a big
> concern as it's mostly writing to the mounted-in Mesos sandbox. Also, Mesos
> uses host network by default, so the network isn't affected much.
>
> Most of the cluster mode limitation is that you need to make the spark job
> files available somewhere that all the slaves can access remotely (http,
> s3, hdfs, etc) or available on all slaves locally by path.
>
> I'll try to make more doc efforts once I get my existing patches and
> testing infra work done.
>
> Let me know if you have more questions,
>
> Tim
>
> On Sat, Sep 19, 2015 at 5:42 AM, John Omernik  wrote:
>
>> I was searching in the 1.5.0 docs on the Docker on Mesos capabilities and
>> just found you CAN run it this way.  Are there any user posts, blog posts,
>> etc on why and how you'd do this?
>>
>> Basically, at first I was questioning why you'd run spark in a docker
>> container, i.e., if you run with tar balled executor, what are you really
>> gaining?  And in this setup, are you losing out on performance somehow? (I
>> am guessing smarter people than I have figured that out).
>>
>> Then I came along a situation where I wanted to use a python library with
>> spark, and it had to be installed on every node, and I realized one big
>> advantage of dockerized spark would be that spark apps that needed other
>> libraries could be contained and built well.
>>
>> OK, that's huge, let's do that.  For my next question there are lot of
>> "questions" have on how this actually works.  Does Clustermode/client mode
>> apply here? If so, how?  Is there a good walk through on getting this
>> setup? Limitations? Gotchas?  Should I just dive in an start working with
>> it? Has anyone done any stories/rough documentation? This seems like a
>> really helpful feature to scaling out spark, and letting developers truly
>> build what they need without tons of admin overhead, so I really want to
>> explore.
>>
>> Thanks!
>>
>> John
>>
>
>


Re: OOM on yarn-cluster mode

2016-01-19 Thread Saisai Shao
You could try increasing the driver memory via "--driver-memory"; it looks like
the OOM came from the driver side, so the simple solution is to increase the
driver's memory.

On Tue, Jan 19, 2016 at 1:15 PM, Julio Antonio Soto  wrote:

> Hi,
>
> I'm having trouble when uploading Spark jobs in yarn-cluster mode. While
> the job works and completes in yarn-client mode, I hit the following error
> when using spark-submit in yarn-cluster (simplified):
>
> 16/01/19 21:43:31 INFO hive.metastore: Connected to metastore.
> 16/01/19 21:43:32 WARN util.NativeCodeLoader: Unable to load native-hadoop 
> library for your platform... using builtin-java classes where applicable
> 16/01/19 21:43:32 INFO session.SessionState: Created local directory: 
> /yarn/nm/usercache/julio/appcache/application_1453120455858_0040/container_1453120455858_0040_01_01/tmp/77350a02-d900-4c84-9456-134305044d21_resources
> 16/01/19 21:43:32 INFO session.SessionState: Created HDFS directory: 
> /tmp/hive/nobody/77350a02-d900-4c84-9456-134305044d21
> 16/01/19 21:43:32 INFO session.SessionState: Created local directory: 
> /yarn/nm/usercache/julio/appcache/application_1453120455858_0040/container_1453120455858_0040_01_01/tmp/nobody/77350a02-d900-4c84-9456-134305044d21
> 16/01/19 21:43:32 INFO session.SessionState: Created HDFS directory: 
> /tmp/hive/nobody/77350a02-d900-4c84-9456-134305044d21/_tmp_space.db
> 16/01/19 21:43:32 INFO parquet.ParquetRelation: Listing 
> hdfs://namenode01:8020/user/julio/PFM/CDRs_parquet_np on driver
> 16/01/19 21:43:33 INFO spark.SparkContext: Starting job: table at 
> code.scala:13
> 16/01/19 21:43:33 INFO scheduler.DAGScheduler: Got job 0 (table at 
> code.scala:13) with 8 output partitions
> 16/01/19 21:43:33 INFO scheduler.DAGScheduler: Final stage: ResultStage 
> 0(table at code.scala:13)
> 16/01/19 21:43:33 INFO scheduler.DAGScheduler: Parents of final stage: List()
> 16/01/19 21:43:33 INFO scheduler.DAGScheduler: Missing parents: List()
> 16/01/19 21:43:33 INFO scheduler.DAGScheduler: Submitting ResultStage 0 
> (MapPartitionsRDD[1] at table at code.scala:13), which has no missing parents
> Exception in thread "dag-scheduler-event-loop"
> Exception: java.lang.OutOfMemoryError thrown from the 
> UncaughtExceptionHandler in thread "dag-scheduler-event-loop"
> Exception in thread "SparkListenerBus"
> Exception: java.lang.OutOfMemoryError thrown from the 
> UncaughtExceptionHandler in thread "SparkListenerBus"
>
> It happens with whatever program I build, for example:
>
> object MainClass {
> def main(args:Array[String]):Unit = {
> val conf = (new org.apache.spark.SparkConf()
>  .setAppName("test")
>  )
>
> val sc = new org.apache.spark.SparkContext(conf)
> val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
>
> val rdd = (sqlContext.read.table("cdrs_np")
> .na.drop(how="any")
> .map(_.toSeq.map(y=>y.toString))
> .map(x=>(x.head,x.tail))
> )
>
> rdd.saveAsTextFile(args(0))
> }
> }
>
> The command I'm using in spark-submit is the following:
>
> spark-submit --master yarn \
>  --deploy-mode cluster \
>  --driver-memory 1G \
>  --executor-memory 3000m \
>  --executor-cores 1 \
>  --num-executors 8 \
>  --class MainClass \
>  spark-yarn-cluster-test_2.10-0.1.jar \
>  hdfs://namenode01/etl/test
>
> I've got more than enough resources in my cluster in order to run the job
> (in fact, the exact same command works in --deploy-mode client).
>
> I tried to increase yarn.app.mapreduce.am.resource.mb to 2GB, but that
> didn't work. I guess there is another parameter I should tweak, but I have
> not found any info whatsoever in the Internet.
>
> I'm running Spark 1.5.2 and YARN from Hadoop 2.6.0-cdh5.5.1.
>
>
> Any help would be greatly appreciated!
>
> Thank you.
>
> --
> Julio Antonio Soto de Vicente
>


Re: Split columns in RDD

2016-01-19 Thread Richard Siebeling
thanks Daniel, this will certainly help,
regards, Richard

On Tue, Jan 19, 2016 at 6:35 PM, Daniel Imberman 
wrote:

> edit 2: filter should be map
>
> val numColumns = separatedInputStrings.map{ case(id, (stateList,
> numStates)) => numStates}.reduce(math.max)
>
> On Tue, Jan 19, 2016 at 8:19 AM Daniel Imberman 
> wrote:
>
>> edit: Mistake in the second code example
>>
>> val numColumns = separatedInputStrings.filter{ case(id, (stateList,
>> numStates)) => numStates}.reduce(math.max)
>>
>>
>> On Tue, Jan 19, 2016 at 8:17 AM Daniel Imberman <
>> daniel.imber...@gmail.com> wrote:
>>
>>> Hi Richard,
>>>
>>> If I understand the question correctly it sounds like you could probably
>>> do this using mapValues (I'm assuming that you want two pieces of
>>> information out of all rows, the states as individual items, and the number
>>> of states in the row)
>>>
>>>
>>> // input: RDD[(Int, String)], e.g. (1, "TX,NV,WY")
>>> val separatedInputStrings = input.mapValues { inputString =>
>>>   val stringList = inputString.split(",")
>>>   (stringList, stringList.size)
>>> }
>>>
>>> If you then wanted to find out how many state columns you should have in
>>> your table you could use a normal reduce (with a filter beforehand to
>>> reduce how much data you are shuffling)
>>>
>>> val numColumns = separatedInputStrings.filter(_._2).reduce(math.max)
>>>
>>> I hope this helps!
>>>
>>>
>>>
>>> On Tue, Jan 19, 2016 at 8:05 AM Richard Siebeling 
>>> wrote:
>>>
 that's true and that's the way we're doing it now but then we're only
 using the first row to determine the number of splitted columns.
 It could be that in the second (or last) row there are 10 new columns
 and we'd like to know that too.

 Probably a reduceby operator can be used to do that, but I'm hoping
 that there is a better or another way,

 thanks,
 Richard

 On Tue, Jan 19, 2016 at 4:22 PM, Sabarish Sasidharan <
 sabarish.sasidha...@manthan.com> wrote:

> The most efficient to determine the number of columns would be to do a
> take(1) and split in the driver.
>
> Regards
> Sab
> On 19-Jan-2016 8:48 pm, "Richard Siebeling" 
> wrote:
>
>> Hi,
>>
>> what is the most efficient way to split columns and know how many
>> columns are created.
>>
>> Here is the current RDD
>> -
>> ID   STATE
>> -
>> 1   TX, NY, FL
>> 2   CA, OH
>> -
>>
>> This is the preferred output:
>> -
>> IDSTATE_1 STATE_2  STATE_3
>> -
>> 1 TX  NY  FL
>> 2 CA  OH
>> -
>>
>> With a separated with the new columns STATE_1, STATE_2, STATE_3
>>
>>
>> It looks like the following output is feasible using a ReduceBy
>> operator
>> -
>> IDSTATE_1 STATE_2  STATE_3   NEW_COLUMNS
>> -
>> 1 TXNY   FLSTATE_1,
>> STATE_2, STATE_3
>> 2 CAOH STATE_1,
>> STATE_2
>> -
>>
>> Then in the reduce step, the distinct new columns can be calculated.
>> Is it possible to get the second output where next to the RDD the
>> new_columns are saved somewhere?
>> Or is the required to use the second approach?
>>
>> thanks in advance,
>> Richard
>>
>>



OOM on yarn-cluster mode

2016-01-19 Thread Julio Antonio Soto
Hi,

I'm having trouble when uploading Spark jobs in yarn-cluster mode. While the
job works and completes in yarn-client mode, I hit the following error when
using spark-submit in yarn-cluster (simplified):

16/01/19 21:43:31 INFO hive.metastore: Connected to metastore.
16/01/19 21:43:32 WARN util.NativeCodeLoader: Unable to load
native-hadoop library for your platform... using builtin-java classes
where applicable
16/01/19 21:43:32 INFO session.SessionState: Created local directory:
/yarn/nm/usercache/julio/appcache/application_1453120455858_0040/container_1453120455858_0040_01_01/tmp/77350a02-d900-4c84-9456-134305044d21_resources
16/01/19 21:43:32 INFO session.SessionState: Created HDFS directory:
/tmp/hive/nobody/77350a02-d900-4c84-9456-134305044d21
16/01/19 21:43:32 INFO session.SessionState: Created local directory:
/yarn/nm/usercache/julio/appcache/application_1453120455858_0040/container_1453120455858_0040_01_01/tmp/nobody/77350a02-d900-4c84-9456-134305044d21
16/01/19 21:43:32 INFO session.SessionState: Created HDFS directory:
/tmp/hive/nobody/77350a02-d900-4c84-9456-134305044d21/_tmp_space.db
16/01/19 21:43:32 INFO parquet.ParquetRelation: Listing
hdfs://namenode01:8020/user/julio/PFM/CDRs_parquet_np on driver
16/01/19 21:43:33 INFO spark.SparkContext: Starting job: table at code.scala:13
16/01/19 21:43:33 INFO scheduler.DAGScheduler: Got job 0 (table at
code.scala:13) with 8 output partitions
16/01/19 21:43:33 INFO scheduler.DAGScheduler: Final stage:
ResultStage 0(table at code.scala:13)
16/01/19 21:43:33 INFO scheduler.DAGScheduler: Parents of final stage: List()
16/01/19 21:43:33 INFO scheduler.DAGScheduler: Missing parents: List()
16/01/19 21:43:33 INFO scheduler.DAGScheduler: Submitting ResultStage
0 (MapPartitionsRDD[1] at table at code.scala:13), which has no
missing parents
Exception in thread "dag-scheduler-event-loop"
Exception: java.lang.OutOfMemoryError thrown from the
UncaughtExceptionHandler in thread "dag-scheduler-event-loop"
Exception in thread "SparkListenerBus"
Exception: java.lang.OutOfMemoryError thrown from the
UncaughtExceptionHandler in thread "SparkListenerBus"

It happens with whatever program I build, for example:

object MainClass {
def main(args:Array[String]):Unit = {
val conf = (new org.apache.spark.SparkConf()
 .setAppName("test")
 )

val sc = new org.apache.spark.SparkContext(conf)
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

val rdd = (sqlContext.read.table("cdrs_np")
.na.drop(how="any")
.map(_.toSeq.map(y=>y.toString))
.map(x=>(x.head,x.tail))
)

rdd.saveAsTextFile(args(0))
}
}

The command I'm using in spark-submit is the following:

spark-submit --master yarn \
 --deploy-mode cluster \
 --driver-memory 1G \
 --executor-memory 3000m \
 --executor-cores 1 \
 --num-executors 8 \
 --class MainClass \
 spark-yarn-cluster-test_2.10-0.1.jar \
 hdfs://namenode01/etl/test

I've got more than enough resources in my cluster in order to run the job
(in fact, the exact same command works in --deploy-mode client).

I tried to increase yarn.app.mapreduce.am.resource.mb to 2GB, but that
didn't work. I guess there is another parameter I should tweak, but I have
not found any info whatsoever in the Internet.

I'm running Spark 1.5.2 and YARN from Hadoop 2.6.0-cdh5.5.1.


Any help would be greatly appreciated!

Thank you.

-- 
Julio Antonio Soto de Vicente


GraphX: Easy way to build fully connected grid-graph

2016-01-19 Thread benjamin.naujoks
Hello,

I was wondering if there is an elegant way to build a fully connected
grid-graph.
The standard grid-graph function only creates one where a vertex is
connected to the vertices at row+1 and column+1. For my algorithm I need
every vertex to be connected to the vertices at row-1, row+1, column-1 and
column+1.

I have already searched a lot with Google, so I hope somebody can give me
a good hint.

Thank you a lot in advance.

Best regards,

Benjamin
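
In case it helps, here is one way such a 4-connected grid can be assembled by
hand with GraphX (a sketch of my own, not a library function; vertex
attributes are just the (row, column) pair and the edge attribute a dummy 1):

import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.rdd.RDD

// Every vertex gets edges to its neighbours at row-1, row+1, column-1 and
// column+1, whenever those neighbours exist.
def fourConnectedGrid(sc: SparkContext, rows: Int, cols: Int): Graph[(Int, Int), Int] = {
  def id(r: Int, c: Int): Long = r.toLong * cols + c

  val vertices: RDD[(Long, (Int, Int))] = sc.parallelize(
    for (r <- 0 until rows; c <- 0 until cols) yield (id(r, c), (r, c)))

  val edges: RDD[Edge[Int]] = sc.parallelize(
    for {
      r <- 0 until rows
      c <- 0 until cols
      (dr, dc) <- Seq((-1, 0), (1, 0), (0, -1), (0, 1))
      nr = r + dr
      nc = c + dc
      if nr >= 0 && nr < rows && nc >= 0 && nc < cols
    } yield Edge(id(r, c), id(nr, nc), 1))

  Graph(vertices, edges)
}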



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-Easy-way-to-build-fully-connected-grid-graph-tp26013.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Docker/Mesos with Spark

2016-01-19 Thread Tim Chen
Hi Sathish,

Sorry about that. I think that's a good idea and I'll write up a section in
the Spark documentation page to explain how it can work. We (Mesosphere)
have been doing this for our DCOS Spark for our past releases and it has been
working well so far.

Thanks!

Tim

On Tue, Jan 19, 2016 at 12:28 PM, Sathish Kumaran Vairavelu <
vsathishkuma...@gmail.com> wrote:

> Hi Tim
>
> Do you have any materials/blog for running Spark in a container in Mesos
> cluster environment? I have googled it but couldn't find info on it. Spark
> documentation says it is possible, but no details provided.. Please help
>
>
> Thanks
>
> Sathish
>
>
>
>
> On Mon, Sep 21, 2015 at 11:54 AM Tim Chen  wrote:
>
>> Hi John,
>>
>> There is no other blog post yet, I'm thinking to do a series of posts but
>> so far haven't get time to do that yet.
>>
>> Running Spark in docker containers makes distributing spark versions
>> easy, it's simple to upgrade and automatically caches on the slaves so the
>> same image just runs right away. Most of the docker perf is usually related
>> to network and filesystem overheads, but I think with recent changes in
>> Spark to make the Mesos sandbox the default temp dir, filesystem won't be a big
>> concern as it's mostly writing to the mounted-in Mesos sandbox. Also, Mesos
>> uses host network by default, so the network isn't affected much.
>>
>> Most of the cluster mode limitation is that you need to make the spark
>> job files available somewhere that all the slaves can access remotely
>> (http, s3, hdfs, etc) or available on all slaves locally by path.
>>
>> I'll try to make more doc efforts once I get my existing patches and
>> testing infra work done.
>>
>> Let me know if you have more questions,
>>
>> Tim
>>
>> On Sat, Sep 19, 2015 at 5:42 AM, John Omernik  wrote:
>>
>>> I was searching in the 1.5.0 docs on the Docker on Mesos capabilities
>>> and just found you CAN run it this way.  Are there any user posts, blog
>>> posts, etc on why and how you'd do this?
>>>
>>> Basically, at first I was questioning why you'd run spark in a docker
>>> container, i.e., if you run with tar balled executor, what are you really
>>> gaining?  And in this setup, are you losing out on performance somehow? (I
>>> am guessing smarter people than I have figured that out).
>>>
>>> Then I came along a situation where I wanted to use a python library
>>> with spark, and it had to be installed on every node, and I realized one
>>> big advantage of dockerized spark would be that spark apps that needed
>>> other libraries could be contained and built well.
>>>
>>> OK, that's huge, let's do that.  For my next question there are lot of
>>> "questions" have on how this actually works.  Does Clustermode/client mode
>>> apply here? If so, how?  Is there a good walk through on getting this
>>> setup? Limitations? Gotchas?  Should I just dive in an start working with
>>> it? Has anyone done any stories/rough documentation? This seems like a
>>> really helpful feature to scaling out spark, and letting developers truly
>>> build what they need without tons of admin overhead, so I really want to
>>> explore.
>>>
>>> Thanks!
>>>
>>> John
>>>
>>
>>


Appending filename information to RDD initialized by sc.textFile

2016-01-19 Thread Femi Anthony
 I have a set of log files I would like to read into an RDD. These files
are all compressed with .gz and the filenames are date-stamped. The source
of these files is the page view statistics data for wikipedia

http://dumps.wikimedia.org/other/pagecounts-raw/

The file names look like this:

pagecounts-20090501-00.gz
pagecounts-20090501-01.gz
pagecounts-20090501-02.gz

What I would like to do is read in all such files in a directory and
prepend the date from the filename (e.g. 20090501) to each row of the
resulting RDD. I first thought of using *sc.wholeTextFiles(..)* instead of
*sc.textFile(..)*, which creates a PairRDD with the key being the file name
with a path, but *sc.wholeTextFiles()* doesn't handle compressed .gz files.

Any suggestions would be welcome.
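
One approach that seems to work (a sketch under the assumption that the files
sit in a single directory and that reading them one by one and unioning is
acceptable; the directory path below is made up):

import org.apache.hadoop.fs.{FileSystem, Path}

val dir = "hdfs:///data/pagecounts"                        // hypothetical input directory
val fs = FileSystem.get(sc.hadoopConfiguration)
val files = fs.listStatus(new Path(dir)).map(_.getPath.toString).filter(_.endsWith(".gz"))

// sc.textFile handles .gz transparently, so tag each file's lines with the
// date taken from its name, then union everything into one RDD.
val tagged = files.map { file =>
  val date = new Path(file).getName.split("-")(1)          // "pagecounts-20090501-00.gz" -> "20090501"
  sc.textFile(file).map(line => s"$date\t$line")
}.reduce(_ union _)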

-- 
http://www.femibyte.com/twiki5/bin/view/Tech/
http://www.nextmatrix.com
"Great spirits have always encountered violent opposition from mediocre
minds." - Albert Einstein.


Re: Spark SQL -Hive transactions support

2016-01-19 Thread Elliot West
Hive's ACID feature (which introduces transactions) is not required for
inserts, only updates and deletes. Inserts should be supported on a vanilla
Hive shell. I'm not sure how Spark interacts with Hive in that regard but
perhaps the HiveSQLContext implementation is lacking support.

On a separate note, ACID is rarely configured out of the box and also
requires that your tables are declared with some specific attributes. This
might be worth checking but does not seem like the issue in your case:

https://cwiki.apache.org/confluence/display/Hive/Hive+Transactions#HiveTransactions-Configuration

For streaming type processing it would be nice to integrate Spark with
Hive's streaming API which uses the ACID feature:

https://cwiki.apache.org/confluence/display/Hive/Streaming+Data+Ingest

Additionally, updates and deletes could be supported by Spark by
integrating with the not yet released mutation API:

http://htmlpreview.github.io/?https://github.com/apache/hive/blob/master/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/package.html

Elliot.

On Tuesday, 19 January 2016, hnagar  wrote:

> Hive has transactions support since version 0.14.
>
> I am using Spark 1.6, and Hive 1.2.1, are transactions supported in Spark
> SQL now. I tried in the Spark-Shell and it gives the following error
>
> org.apache.spark.sql.AnalysisException:
> Unsupported language features in query: insert into test values(1, 'john',
> 'dong')
>
> I am wondering if its a configuration issue, or this is still not supported
> in Spark.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Hive-transactions-support-tp26012.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


dataframe access hive complex type

2016-01-19 Thread pth001

Hi,

How can a DataFrame (which API) access Hive complex types (Struct, Array, Map)?

Thanks,
Patcharee
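
For what it's worth, a sketch of the DataFrame operators commonly used for
this (the table and column names below are made up): dot notation for struct
fields, getItem for map keys and array indices, and explode to flatten array
elements into rows.

import org.apache.spark.sql.functions.explode

val df = sqlContext.table("my_hive_table")            // hypothetical Hive table

df.select("address.city")                             // field of a STRUCT column
df.select(df("properties").getItem("colour"))         // value of a MAP column by key
df.select(df("scores").getItem(0))                    // element of an ARRAY column by index
df.select(explode(df("tags")).as("tag"))              // one row per ARRAY element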

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark SQL -Hive transactions support

2016-01-19 Thread hnagar
Hive has transactions support since version 0.14. 

I am using Spark 1.6, and Hive 1.2.1, are transactions supported in Spark
SQL now. I tried in the Spark-Shell and it gives the following error

org.apache.spark.sql.AnalysisException:
Unsupported language features in query: insert into test values(1, 'john',
'dong')

I am wondering if its a configuration issue, or this is still not supported
in Spark.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Hive-transactions-support-tp26012.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark SQL -Hive transactions support

2016-01-19 Thread Michael Armbrust
We don't support Hive-style transactions.

On Tue, Jan 19, 2016 at 11:32 AM, hnagar  wrote:

> Hive has transactions support since version 0.14.
>
> I am using Spark 1.6, and Hive 1.2.1, are transactions supported in Spark
> SQL now. I tried in the Spark-Shell and it gives the following error
>
> org.apache.spark.sql.AnalysisException:
> Unsupported language features in query: insert into test values(1, 'john',
> 'dong')
>
> I am wondering if its a configuration issue, or this is still not supported
> in Spark.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Hive-transactions-support-tp26012.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: spark 1.6.0 on ec2 doesn't work

2016-01-19 Thread Calvin Jia
Hi Oleg,

The Tachyon related issue should be fixed.

Hope this helps,
Calvin

On Mon, Jan 18, 2016 at 2:51 AM, Oleg Ruchovets 
wrote:

> Hi,
>    I tried to follow the Spark 1.6.0 instructions to install Spark on EC2.
>
> It doesn't work properly - I got exceptions, and at the end a standalone Spark
> cluster was installed.
> Here is the log information:
>
> Any suggestions?
>
> Thanks
> Oleg.
>
> oleg@robinhood:~/install/spark-1.6.0-bin-hadoop2.6/ec2$ ./spark-ec2
> --key-pair=CC-ES-Demo
>  
> --identity-file=/home/oleg/work/entity_extraction_framework/ec2_pem_key/CC-ES-Demo.pem
> --region=us-east-1 --zone=us-east-1a --spot-price=0.05   -s 5
> --spark-version=1.6.0launch entity-extraction-spark-cluster
> Setting up security groups...
> Searching for existing cluster entity-extraction-spark-cluster in region
> us-east-1...
> Spark AMI: ami-5bb18832
> Launching instances...
> Requesting 5 slaves as spot instances with price $0.050
> Waiting for spot instances to be granted...
> 0 of 5 slaves granted, waiting longer
> 0 of 5 slaves granted, waiting longer
> 0 of 5 slaves granted, waiting longer
> 0 of 5 slaves granted, waiting longer
> 0 of 5 slaves granted, waiting longer
> 0 of 5 slaves granted, waiting longer
> 0 of 5 slaves granted, waiting longer
> 0 of 5 slaves granted, waiting longer
> 0 of 5 slaves granted, waiting longer
> All 5 slaves granted
> Launched master in us-east-1a, regid = r-9384033f
> Waiting for AWS to propagate instance metadata...
> Waiting for cluster to enter 'ssh-ready' state..
>
> Warning: SSH connection error. (This could be temporary.)
> Host: ec2-52-90-186-83.compute-1.amazonaws.com
> SSH return code: 255
> SSH output: ssh: connect to host ec2-52-90-186-83.compute-1.amazonaws.com
> port 22: Connection refused
>
> .
>
> Warning: SSH connection error. (This could be temporary.)
> Host: ec2-52-90-186-83.compute-1.amazonaws.com
> SSH return code: 255
> SSH output: ssh: connect to host ec2-52-90-186-83.compute-1.amazonaws.com
> port 22: Connection refused
>
> .
>
> Warning: SSH connection error. (This could be temporary.)
> Host: ec2-52-90-186-83.compute-1.amazonaws.com
> SSH return code: 255
> SSH output: ssh: connect to host ec2-52-90-186-83.compute-1.amazonaws.com
> port 22: Connection refused
>
> .
> Cluster is now in 'ssh-ready' state. Waited 442 seconds.
> Generating cluster's SSH key on master...
> Warning: Permanently added 
> 'ec2-52-90-186-83.compute-1.amazonaws.com,52.90.186.83'
> (ECDSA) to the list of known hosts.
> Connection to ec2-52-90-186-83.compute-1.amazonaws.com closed.
> Warning: Permanently added 
> 'ec2-52-90-186-83.compute-1.amazonaws.com,52.90.186.83'
> (ECDSA) to the list of known hosts.
> Transferring cluster's SSH key to slaves...
> ec2-54-165-243-74.compute-1.amazonaws.com
> Warning: Permanently added 
> 'ec2-54-165-243-74.compute-1.amazonaws.com,54.165.243.74'
> (ECDSA) to the list of known hosts.
> ec2-54-88-245-107.compute-1.amazonaws.com
> Warning: Permanently added 
> 'ec2-54-88-245-107.compute-1.amazonaws.com,54.88.245.107'
> (ECDSA) to the list of known hosts.
> ec2-54-172-29-47.compute-1.amazonaws.com
> Warning: Permanently added 
> 'ec2-54-172-29-47.compute-1.amazonaws.com,54.172.29.47'
> (ECDSA) to the list of known hosts.
> ec2-54-165-131-210.compute-1.amazonaws.com
> Warning: Permanently added 
> 'ec2-54-165-131-210.compute-1.amazonaws.com,54.165.131.210'
> (ECDSA) to the list of known hosts.
> ec2-54-172-46-184.compute-1.amazonaws.com
> Warning: Permanently added 
> 'ec2-54-172-46-184.compute-1.amazonaws.com,54.172.46.184'
> (ECDSA) to the list of known hosts.
> Cloning spark-ec2 scripts from
> https://github.com/amplab/spark-ec2/tree/branch-1.5 on master...
> Warning: Permanently added 
> 'ec2-52-90-186-83.compute-1.amazonaws.com,52.90.186.83'
> (ECDSA) to the list of known hosts.
> Cloning into 'spark-ec2'...
> remote: Counting objects: 2068, done.
> remote: Total 2068 (delta 0), reused 0 (delta 0), pack-reused 2068
> Receiving objects: 100% (2068/2068), 349.76 KiB, done.
> Resolving deltas: 100% (796/796), done.
> Connection to ec2-52-90-186-83.compute-1.amazonaws.com closed.
> Deploying files to master...
> Warning: Permanently added 
> 'ec2-52-90-186-83.compute-1.amazonaws.com,52.90.186.83'
> (ECDSA) to the list of known hosts.
> sending incremental file list
> root/spark-ec2/ec2-variables.sh
>
> sent 1,835 bytes  received 40 bytes  416.67 bytes/sec
> total size is 1,684  speedup is 0.90
> Running setup on master...
> Warning: Permanently added 
> 'ec2-52-90-186-83.compute-1.amazonaws.com,52.90.186.83'
> (ECDSA) to the list of known hosts.
> Connection to ec2-52-90-186-83.compute-1.amazonaws.com closed.
> Warning: Permanently added 
> 'ec2-52-90-186-83.compute-1.amazonaws.com,52.90.186.83'
> (ECDSA) to the list of known hosts.
> Setting up Spark on ip-172-31-24-124.ec2.internal...
> Setting executable permissions on scripts...
> RSYNC'ing /root/spark-ec2 to other cluster nodes...
> 

Re: spark-1.2.0--standalone-ha-zookeeper

2016-01-19 Thread Raghvendra Singh
Here's the complete master log on reproducing the error
http://pastebin.com/2YJpyBiF

Regards
Raghvendra

On Wed, Jan 20, 2016 at 12:38 AM, Raghvendra Singh <
raghvendra.ii...@gmail.com> wrote:

> Ok I Will try to reproduce the problem. Also I don't think this is an
> uncommon problem I am searching for this problem on Google for many days
> and found lots of questions but no answers.
>
> Do you know what kinds of settings spark and zookeeper allow for handling
> time outs during leader election etc. When one is down.
>
> Regards
> Raghvendra
> On 20-Jan-2016 12:28 am, "Ted Yu"  wrote:
>
>> Perhaps I don't have enough information to make further progress.
>>
>> On Tue, Jan 19, 2016 at 10:55 AM, Raghvendra Singh <
>> raghvendra.ii...@gmail.com> wrote:
>>
>>> I currently do not have access to those logs but there were only about
>>> five lines before this error. They were the same which are present usually
>>> when everything works fine.
>>>
>>> Can you still help?
>>>
>>> Regards
>>> Raghvendra
>>> On 18-Jan-2016 8:50 pm, "Ted Yu"  wrote:
>>>
 Can you pastebin master log before the error showed up ?

 The initial message was posted for Spark 1.2.0
 Which release of Spark / zookeeper do you use ?

 Thanks

 On Mon, Jan 18, 2016 at 6:47 AM, doctorx 
 wrote:

> Hi,
> I am facing the same issue, with the given error
>
> ERROR Master:75 - Leadership has been revoked -- master shutting down.
>
> Can anybody help. Any clue will be useful. Should i change something in
> spark cluster or zookeeper. Is there any setting in spark which can
> help me?
>
> Thanks & Regards
> Raghvendra
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/spark-1-2-0-standalone-ha-zookeeper-tp21308p25994.html
> Sent from the Apache Spark User List mailing list archive at
> Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>

>>


Re: Spark Dataset doesn't have api for changing columns

2016-01-19 Thread Michael Armbrust
In Spark 2.0 we are planning to combine DataFrame and Dataset so that all
the methods will be available on either class.

On Tue, Jan 19, 2016 at 3:42 AM, Milad khajavi  wrote:

> Hi Spark users,
>
> When I want to map the result of count on groupBy, I need to convert the
> result to a DataFrame, then change the column names and map the result to a
> new case class. Why doesn't the Spark Dataset API have this functionality directly?
>
> case class LogRow(id: String, location: String, time: Long)
> case class KeyValue(key: (String, String), value: Long)
>
> val log = LogRow("1", "a", 1) :: LogRow("1", "a", 2) :: LogRow("1", "b",
> 3) :: LogRow("1", "a", 4) :: LogRow("1", "b", 5) :: LogRow("1", "b", 6) ::
> LogRow("1", "c", 7) :: LogRow("2", "a", 1) :: LogRow("2", "b", 2) ::
> LogRow("2", "b", 3) :: LogRow("2", "a", 4) :: LogRow("2", "a", 5) ::
> LogRow("2", "a", 6) :: LogRow("2", "c", 7) :: Nil
> log.toDS().groupBy(l => {
>   (l.id, l.location)
> }).count().toDF().toDF("key", "value").as[KeyValue].show
>
> +-----+-----+
> |  key|value|
> +-----+-----+
> |[1,a]|    3|
> |[1,b]|    3|
> |[1,c]|    1|
> |[2,a]|    4|
> |[2,b]|    2|
> |[2,c]|    1|
> +-----+-----+
>
>
> --
> Milād Khājavi
> http://blog.khajavi.ir
> Having the source means you can do it yourself.
> I tried to change the world, but I couldn’t find the source code.
>


Re: Serializing DataSets

2016-01-19 Thread Simon Hafner
The occasional type error if the casting goes wrong for whatever reason.

2016-01-19 1:22 GMT+08:00 Michael Armbrust :
> What error?
>
> On Mon, Jan 18, 2016 at 9:01 AM, Simon Hafner  wrote:
>>
>> And for deserializing,
>> `sqlContext.read.parquet("path/to/parquet").as[T]` and catch the
>> error?
>>
>> 2016-01-14 3:43 GMT+08:00 Michael Armbrust :
>> > Yeah, thats the best way for now (note the conversion is purely logical
>> > so
>> > there is no cost of calling toDF()).  We'll likely be combining the
>> > classes
>> > in Spark 2.0 to remove this awkwardness.
>> >
>> > On Tue, Jan 12, 2016 at 11:20 PM, Simon Hafner 
>> > wrote:
>> >>
>> >> What's the proper way to write DataSets to disk? Convert them to a
>> >> DataFrame and use the writers there?
>> >>
>> >> -
>> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> >> For additional commands, e-mail: user-h...@spark.apache.org
>> >>
>> >
>
>
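
Putting the advice in this thread together, a minimal round-trip might look
like this (my own sketch; the path and case class are made up):

import sqlContext.implicits._

case class Person(name: String, age: Int)

val ds = Seq(Person("a", 1), Person("b", 2)).toDS()

// Write through the DataFrame view (a purely logical conversion)...
ds.toDF().write.parquet("/tmp/people.parquet")

// ...and read back, re-typing with as[T]; a mismatch between the stored
// schema and Person is where the occasional cast error would surface.
val restored = sqlContext.read.parquet("/tmp/people.parquet").as[Person]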

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: when enable kerberos in hdp, the spark does not work

2016-01-19 Thread Steve Loughran

On 18 Jan 2016, at 23:39, 李振 > 
wrote:

: java.io.IOException: java.net.ConnectException: Connection refused
at 
org.apache.hadoop.crypto.key.kms.KMSClientProvider.addDelegationTokens(KMSClientProvider.java:888)
at 
org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension.addDelegationTokens(KeyProviderDelegationTokenExtension.java:86)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.addDelegationTokens(DistributedFileSystem.java:2243)
at 
org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:121)
at 
org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:100)
at 
org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodes(TokenCache.java:80)
at 
org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:206)
at 
org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)

That's a connection refused

http://wiki.apache.org/hadoop/ConnectionRefused

I believe the property you'd want to look at is hadoop.kms.key.provider.uri

https://hadoop.apache.org/docs/r2.7.0/hadoop-kms/index.html



Re: SparkR with Hive integration

2016-01-19 Thread Felix Cheung
You might need hive-site.xml



_
From: Peter Zhang 
Sent: Monday, January 18, 2016 9:08 PM
Subject: Re: SparkR with Hive integration
To: Jeff Zhang 
Cc:  


Thanks, I will try.

Peter
--
Google
Sent with Airmail

On January 19, 2016 at 12:44:46, Jeff Zhang (zjf...@gmail.com) wrote:

Please make sure you export the environment variable HADOOP_CONF_DIR, which
contains the core-site.xml.

On Mon, Jan 18, 2016 at 8:23 PM, Peter Zhang  wrote:

Hi all,

http://spark.apache.org/docs/latest/sparkr.html#sparkr-dataframes

From Hive tables

You can also create SparkR DataFrames from Hive tables. To do this we will
need to create a HiveContext which can access tables in the Hive MetaStore.
Note that Spark should have been built with Hive support and more details on
the difference between SQLContext and HiveContext can be found in the SQL
programming guide.

# sc is an existing SparkContext.
hiveContext <- sparkRHive.init(sc)
sql(hiveContext, "CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql(hiveContext, "LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
# Queries can be expressed in HiveQL.
results <- sql(hiveContext, "FROM src SELECT key, value")
# results is now a DataFrame
head(results)
##  key   value
## 1 238 val_238
## 2  86  val_86
## 3 311 val_311

I use RStudio to run the commands above. When I run
sql(hiveContext, "CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
I get an exception:

16/01/19 12:11:51 INFO FileUtils: Creating directory if it doesn't exist:
file:/user/hive/warehouse/src
16/01/19 12:11:51 ERROR DDLTask:
org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:
file:/user/hive/warehouse/src is not a directory or unable to create one)

How can I use HDFS instead of the local file system (file:)? Which parameter
should I set?

Thanks a lot.

Peter Zhang
--
Google
Sent with Airmail


--
Best Regards

Jeff Zhang


  

Re: Spark Cassandra Java Connector: records missing despite consistency=ALL

2016-01-19 Thread Femi Anthony
So is the logging to Cassandra being done via Spark ?

On Wed, Jan 13, 2016 at 7:17 AM, Dennis Birkholz 
wrote:

> Hi together,
>
> we use Cassandra to log event data and process it every 15 minutes with Spark.
> We are using the Cassandra Java Connector for Spark.
>
> Randomly our Spark runs produce too few output records because no data is
> returned from Cassandra for a several minutes window of input data. When
> querying the data (with cqlsh), after multiple tries, the data eventually
> becomes available.
>
> To solve the problem, we tried to use consistency=ALL when reading the
> data in Spark. We use the
> CassandraJavaUtil.javafunctions().cassandraTable() method and have set
> "spark.cassandra.input.consistency.level"="ALL" on the config when creating
> the Spark context. The problem persists but according to
> http://stackoverflow.com/a/25043599 using a consistency level of ONE on
> the write side (which we use) and ALL on the READ side should be sufficient
> for data consistency.
>
> I would really appreciate if someone could give me a hint how to fix this
> problem, thanks!
>
> Greets,
> Dennis
>
> P.s.:
> some information about our setup:
> Cassandra 2.1.12 in a two Node configuration with replication factor=2
> Spark 1.5.1
> Cassandra Java Driver 2.2.0-rc3
> Spark Cassandra Java Connector 2.10-1.5.0-M2
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
http://www.femibyte.com/twiki5/bin/view/Tech/
http://www.nextmatrix.com
"Great spirits have always encountered violent opposition from mediocre
minds." - Albert Einstein.


Re: is Hbase Scan really need thorough Get (Hbase+solr+spark)

2016-01-19 Thread ayan guha
It is not scanning HBase. What it is doing is looping through your list
of row keys and fetching the data for each one at a time.

Ex: your Solr result has 5 records, with row keys R1...R5.
Then the list will be [R1, R2, ..., R5]

Then table.get(list) will do something like:

res = []
for k in list:
    v = getFromHbaseWithRowKey(k)  # just for illustration, there is no such function :)
    res.add(v)
return res

On Wed, Jan 20, 2016 at 10:09 AM, beeshma r  wrote:

> Hi
>
>  I trying to integrated Hbase-solr-spark.
> Solr  is indexing all the documents from Hbase through hbase-indexer .
> Through the Spark I am manipulating all datasets .Thing is after getting
> the solrdocuments from the solr query ,it has the  rowkey and rowvalues .So
> directly i got the rowkeys and corresponding values
>
> question is 'its really need once again scan Hbase table through Get with
> rowkey from solrdocument'?
>
> example code
>
> HTable table = new HTable(conf, "");
> Get get = null;
> List list = new ArrayList();
> String url =  " ";
> SolrServer server = new HttpSolrServer(url);
> SolrQuery query = new SolrQuery(" ");
> query.setStart(0);
> query.setRows(10);
> QueryResponse response = server.query(query);
> SolrDocumentList docs = response.getResults();
> for (SolrDocument doc : docs) {
> get = new Get(Bytes.toBytes((String) doc.getFieldValue("rowkey")));
>  list.add(get);
>
>   }
>
> *Result[] res = table.get(list);//This is really need? because it takes
> extra time to scan right?*
> This piece of code i got from
> http://www.programering.com/a/MTM5kDMwATI.html
>
> please correct if anything wrong :)
>
> Thanks
> Beesh
>
>


-- 
Best Regards,
Ayan Guha


Re: is Hbase Scan really need thorough Get (Hbase+solr+spark)

2016-01-19 Thread Ted Yu
get(List<Get> gets) will call:

  Object [] r1 = batch((List)gets);

where batch() would do:

AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, actions,
null, results);

ars.waitUntilDone();

multiAp is an AsyncProcess.

In short, client would access region server for the results.


FYI

On Tue, Jan 19, 2016 at 3:28 PM, ayan guha  wrote:

> It is not scanning the HBase. What it is doing is looping through your
> list of Row keys and fetching data for each 1 at a time.
>
> Ex: Your solr result has 5 records, with Row Keys R1...R5.
> Then list will be [R1,R2,...R5]
>
> Then table.get(list) will do something like:
>
> res=[]
> for k in list:
>  v = getFromHbaseWithRowKey(k)## This is just for
> illustration, there is no such function :)
>   res.add(v)
> return res
>
> On Wed, Jan 20, 2016 at 10:09 AM, beeshma r  wrote:
>
>> Hi
>>
>>  I trying to integrated Hbase-solr-spark.
>> Solr  is indexing all the documents from Hbase through hbase-indexer .
>> Through the Spark I am manipulating all datasets .Thing is after getting
>> the solrdocuments from the solr query ,it has the  rowkey and rowvalues .So
>> directly i got the rowkeys and corresponding values
>>
>> question is 'its really need once again scan Hbase table through Get with
>> rowkey from solrdocument'?
>>
>> example code
>>
>> HTable table = new HTable(conf, "");
>> Get get = null;
>> List list = new ArrayList();
>> String url =  " ";
>> SolrServer server = new HttpSolrServer(url);
>> SolrQuery query = new SolrQuery(" ");
>> query.setStart(0);
>> query.setRows(10);
>> QueryResponse response = server.query(query);
>> SolrDocumentList docs = response.getResults();
>> for (SolrDocument doc : docs) {
>> get = new Get(Bytes.toBytes((String) doc.getFieldValue("rowkey")));
>>  list.add(get);
>>
>>   }
>>
>> *Result[] res = table.get(list);//This is really need? because it takes
>> extra time to scan right?*
>> This piece of code i got from
>> http://www.programering.com/a/MTM5kDMwATI.html
>>
>> please correct if anything wrong :)
>>
>> Thanks
>> Beesh
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>


Re: OOM on yarn-cluster mode

2016-01-19 Thread Julio Antonio Soto de Vicente
Hi,

I tried with --driver-memory 16G (more than enough to read a simple parquet 
table), but the problem still persists.

Everything works fine in yarn-client.

--
Julio Antonio Soto de Vicente

> On 19 Jan 2016, at 22:18, Saisai Shao  wrote:
> 
> You could try increasing the driver memory with "--driver-memory"; it looks like the 
> OOM came from the driver side, so the simple solution is to increase the 
> memory of the driver.
> 
>> On Tue, Jan 19, 2016 at 1:15 PM, Julio Antonio Soto  wrote:
>> Hi,
>> 
>> I'm having trouble when submitting spark jobs in yarn-cluster mode. While the 
>> job works and completes in yarn-client mode, I hit the following error when 
>> using spark-submit in yarn-cluster (simplified):
>> 16/01/19 21:43:31 INFO hive.metastore: Connected to metastore.
>> 16/01/19 21:43:32 WARN util.NativeCodeLoader: Unable to load native-hadoop 
>> library for your platform... using builtin-java classes where applicable
>> 16/01/19 21:43:32 INFO session.SessionState: Created local directory: 
>> /yarn/nm/usercache/julio/appcache/application_1453120455858_0040/container_1453120455858_0040_01_01/tmp/77350a02-d900-4c84-9456-134305044d21_resources
>> 16/01/19 21:43:32 INFO session.SessionState: Created HDFS directory: 
>> /tmp/hive/nobody/77350a02-d900-4c84-9456-134305044d21
>> 16/01/19 21:43:32 INFO session.SessionState: Created local directory: 
>> /yarn/nm/usercache/julio/appcache/application_1453120455858_0040/container_1453120455858_0040_01_01/tmp/nobody/77350a02-d900-4c84-9456-134305044d21
>> 16/01/19 21:43:32 INFO session.SessionState: Created HDFS directory: 
>> /tmp/hive/nobody/77350a02-d900-4c84-9456-134305044d21/_tmp_space.db
>> 16/01/19 21:43:32 INFO parquet.ParquetRelation: Listing 
>> hdfs://namenode01:8020/user/julio/PFM/CDRs_parquet_np on driver
>> 16/01/19 21:43:33 INFO spark.SparkContext: Starting job: table at 
>> code.scala:13
>> 16/01/19 21:43:33 INFO scheduler.DAGScheduler: Got job 0 (table at 
>> code.scala:13) with 8 output partitions
>> 16/01/19 21:43:33 INFO scheduler.DAGScheduler: Final stage: ResultStage 
>> 0(table at code.scala:13)
>> 16/01/19 21:43:33 INFO scheduler.DAGScheduler: Parents of final stage: List()
>> 16/01/19 21:43:33 INFO scheduler.DAGScheduler: Missing parents: List()
>> 16/01/19 21:43:33 INFO scheduler.DAGScheduler: Submitting ResultStage 0 
>> (MapPartitionsRDD[1] at table at code.scala:13), which has no missing parents
>> Exception in thread "dag-scheduler-event-loop" 
>> Exception: java.lang.OutOfMemoryError thrown from the 
>> UncaughtExceptionHandler in thread "dag-scheduler-event-loop"
>> Exception in thread "SparkListenerBus" 
>> Exception: java.lang.OutOfMemoryError thrown from the 
>> UncaughtExceptionHandler in thread "SparkListenerBus"
>> It happens with whatever program I build, for example:
>> 
>> object MainClass {
>> def main(args:Array[String]):Unit = {
>> val conf = (new org.apache.spark.SparkConf()
>>  .setAppName("test")
>>  )
>> 
>> val sc = new org.apache.spark.SparkContext(conf)
>> val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
>> 
>> val rdd = (sqlContext.read.table("cdrs_np")
>>   .na.drop(how="any")
>>   .map(_.toSeq.map(y=>y.toString))
>>   .map(x=>(x.head,x.tail))
>>   )
>> 
>> rdd.saveAsTextFile(args(0))
>> }
>> }
>> 
>> The command I'm using in spark-submit is the following:
>> 
>> spark-submit --master yarn \
>>  --deploy-mode cluster \
>>  --driver-memory 1G \
>>  --executor-memory 3000m \
>>  --executor-cores 1 \
>>  --num-executors 8 \
>>  --class MainClass \
>>  spark-yarn-cluster-test_2.10-0.1.jar \
>>  hdfs://namenode01/etl/test
>> 
>> I've got more than enough resources in my cluster in order to run the job 
>> (in fact, the exact same command works in --deploy-mode client).
>> 
>> I tried to increase yarn.app.mapreduce.am.resource.mb to 2GB, but that 
>> didn't work. I guess there is another parameter I should tweak, but I have 
>> not found any info whatsoever in the Internet.
>> 
>> I'm running Spark 1.5.2 and YARN from Hadoop 2.6.0-cdh5.5.1.
>> 
>> 
>> Any help would be greatly appreciated!
>> 
>> Thank you.
>> 
>> -- 
>> Julio Antonio Soto de Vicente
> 


Re: Docker/Mesos with Spark

2016-01-19 Thread Sathish Kumaran Vairavelu
Thank you! Looking forward for it..


On Tue, Jan 19, 2016 at 4:03 PM Tim Chen  wrote:

> Hi Sathish,
>
> Sorry about that, I think that's a good idea and I'll write up a section
> in the Spark documentation page to explain how it can work. We (Mesosphere)
> have been doing this for our DCOS spark for our past releases and has been
> working well so far.
>
> Thanks!
>
> Tim
>
> On Tue, Jan 19, 2016 at 12:28 PM, Sathish Kumaran Vairavelu <
> vsathishkuma...@gmail.com> wrote:
>
>> Hi Tim
>>
>> Do you have any materials/blog for running Spark in a container in Mesos
>> cluster environment? I have googled it but couldn't find info on it. Spark
>> documentation says it is possible, but no details provided.. Please help
>>
>>
>> Thanks
>>
>> Sathish
>>
>>
>>
>>
>> On Mon, Sep 21, 2015 at 11:54 AM Tim Chen  wrote:
>>
>>> Hi John,
>>>
>>> There is no other blog post yet, I'm thinking to do a series of posts
>>> but so far I haven't got time to do that yet.
>>>
>>> Running Spark in docker containers makes distributing spark versions
>>> easy, it's simple to upgrade and automatically caches on the slaves so the
>>> same image just runs right away. Most of the docker perf is usually related
>>> to network and filesystem overheads, but I think with recent changes in
>>> Spark to make the Mesos sandbox the default temp dir, the filesystem won't be a
>>> big concern as it's mostly writing to the mounted-in Mesos sandbox. Also Mesos
>>> uses host networking by default, so the network isn't affected much.
>>>
>>> Most of the cluster mode limitation is that you need to make the spark
>>> job files available somewhere that all the slaves can access remotely
>>> (http, s3, hdfs, etc) or available on all slaves locally by path.
>>>
>>> I'll try to make more doc efforts once I get my existing patches and
>>> testing infra work done.
>>>
>>> Let me know if you have more questions,
>>>
>>> Tim
>>>
>>> On Sat, Sep 19, 2015 at 5:42 AM, John Omernik  wrote:
>>>
 I was searching in the 1.5.0 docs on the Docker on Mesos capabilities
 and just found you CAN run it this way.  Are there any user posts, blog
 posts, etc on why and how you'd do this?

 Basically, at first I was questioning why you'd run spark in a docker
 container, i.e., if you run with tar balled executor, what are you really
 gaining?  And in this setup, are you losing out on performance somehow? (I
 am guessing smarter people than I have figured that out).

 Then I came along a situation where I wanted to use a python library
 with spark, and it had to be installed on every node, and I realized one
 big advantage of dockerized spark would be that spark apps that needed
 other libraries could be contained and built well.

 OK, that's huge, let's do that.  For my next question there are lot of
 "questions" have on how this actually works.  Does Clustermode/client mode
 apply here? If so, how?  Is there a good walk through on getting this
 setup? Limitations? Gotchas?  Should I just dive in an start working with
 it? Has anyone done any stories/rough documentation? This seems like a
 really helpful feature to scaling out spark, and letting developers truly
 build what they need without tons of admin overhead, so I really want to
 explore.

 Thanks!

 John

>>>
>>>
>


Re: Docker/Mesos with Spark

2016-01-19 Thread Darren Govoni


I also would be interested in some best practice for making this work.
Where will the writeup be posted? On mesosphere website?


Sent from my Verizon Wireless 4G LTE smartphone

 Original message 
From: Sathish Kumaran Vairavelu  
Date: 01/19/2016  7:00 PM  (GMT-05:00) 
To: Tim Chen  
Cc: John Omernik , user  
Subject: Re: Docker/Mesos with Spark 

Thank you! Looking forward for it..

On Tue, Jan 19, 2016 at 4:03 PM Tim Chen  wrote:
Hi Sathish,
Sorry about that, I think that's a good idea and I'll write up a section in the 
Spark documentation page to explain how it can work. We (Mesosphere) have been 
doing this for our DCOS spark for our past releases and has been working well 
so far.
Thanks!
Tim
On Tue, Jan 19, 2016 at 12:28 PM, Sathish Kumaran Vairavelu 
 wrote:
Hi Tim

Do you have any materials/blog for running Spark in a container in Mesos 
cluster environment? I have googled it but couldn't find info on it. Spark 
documentation says it is possible, but no details provided.. Please help


Thanks 

Sathish



On Mon, Sep 21, 2015 at 11:54 AM Tim Chen  wrote:
Hi John,
There is no other blog post yet, I'm thinking to do a series of posts but so 
far haven't get time to do that yet.
Running Spark in docker containers makes distributing spark versions easy, it's 
simple to upgrade and automatically caches on the slaves so the same image just 
runs right away. Most of the docker perf is usually related to network and 
filesystem overheads, but I think with recent changes in Spark to make Mesos 
sandbox the default temp dir filesystem won't be a big concern as it's mostly 
writing to the mounted in Mesos sandbox. Also Mesos uses host network by 
default so network is affected much.
Most of the cluster mode limitation is that you need to make the spark job 
files available somewhere that all the slaves can access remotely (http, s3, 
hdfs, etc) or available on all slaves locally by path. 
I'll try to make more doc efforts once I get my existing patches and testing 
infra work done.
Let me know if you have more questions,
Tim
On Sat, Sep 19, 2015 at 5:42 AM, John Omernik  wrote:
I was searching in the 1.5.0 docs on the Docker on Mesos capabilities and just 
found you CAN run it this way.  Are there any user posts, blog posts, etc on 
why and how you'd do this? 
Basically, at first I was questioning why you'd run spark in a docker 
container, i.e., if you run with tar balled executor, what are you really 
gaining?  And in this setup, are you losing out on performance somehow? (I am 
guessing smarter people than I have figured that out).  
Then I came along a situation where I wanted to use a python library with 
spark, and it had to be installed on every node, and I realized one big 
advantage of dockerized spark would be that spark apps that needed other 
libraries could be contained and built well.   
OK, that's huge, let's do that.  For my next question there are lot of 
"questions" have on how this actually works.  Does Clustermode/client mode 
apply here? If so, how?  Is there a good walk through on getting this setup? 
Limitations? Gotchas?  Should I just dive in an start working with it? Has 
anyone done any stories/rough documentation? This seems like a really helpful 
feature to scaling out spark, and letting developers truly build what they need 
without tons of admin overhead, so I really want to explore. 
Thanks!
John








spark dataframe jdbc read/write using dbcp connection pool

2016-01-19 Thread fightf...@163.com
Hi , 
I want to load a really large volume of data from MySQL using the Spark DataFrame
API, and then save it as Parquet or ORC files so it can be used from Hive / Impala.
The dataset is about 1 billion records, and when I run the following naive code,
errors occur and executors are lost.

val prop = new java.util.Properties
prop.setProperty("user","test")
prop.setProperty("password", "test")

val url1 = "jdbc:mysql://172.16.54.136:3306/db1"
val url2 = "jdbc:mysql://172.16.54.138:3306/db1"
val jdbcDF1 = sqlContext.read.jdbc(url1,"video",prop)
val jdbcDF2 = sqlContext.read.jdbc(url2,"video",prop)

val jdbcDF3 = jdbcDF1.unionAll(jdbcDF2)
jdbcDF3.write.format("parquet").save("hdfs://172.16.54.138:8020/perf")

The executor log shows a message like the following. I can see from the log that
the wait_timeout threshold was reached and there is no retry mechanism in the code
path. So I am asking you experts for help on tuning this. Or should I try to use a
JDBC connection pool to increase parallelism?

   16/01/19 17:04:28 ERROR executor.Executor: Exception in task 0.0 in stage 
0.0 (TID 0)
com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link 
failure

The last packet successfully received from the server was 377,769 milliseconds 
ago.  The last packet sent successfully to the server was 377,790 milliseconds 
ago.
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)

Caused by: java.io.EOFException: Can not read response from server. Expected to 
read 4 bytes, read 1 bytes before connection was unexpectedly lost.
at com.mysql.jdbc.MysqlIO.readFully(MysqlIO.java:2914)
at com.mysql.jdbc.MysqlIO.nextRowFast(MysqlIO.java:1996)
... 22 more
16/01/19 17:10:47 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 
4
16/01/19 17:10:47 INFO jdbc.JDBCRDD: closed connection
16/01/19 17:10:47 ERROR executor.Executor: Exception in task 1.1 in stage 0.0 
(TID 2)
com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link 
failure





fightf...@163.com


is Hbase Scan really need thorough Get (Hbase+solr+spark)

2016-01-19 Thread beeshma r
Hi,

I am trying to integrate HBase, Solr and Spark.
Solr is indexing all the documents from HBase through hbase-indexer.
Through Spark I am manipulating all the datasets. The thing is, after getting
the SolrDocuments from the Solr query, each document already has the row key
and row values, so I directly get the row keys and the corresponding values.

The question is: is it really necessary to scan the HBase table once again with
a Get built from the SolrDocument's row key?

Example code:

HTable table = new HTable(conf, "");
Get get = null;
List<Get> list = new ArrayList<Get>();
String url = " ";
SolrServer server = new HttpSolrServer(url);
SolrQuery query = new SolrQuery(" ");
query.setStart(0);
query.setRows(10);
QueryResponse response = server.query(query);
SolrDocumentList docs = response.getResults();
for (SolrDocument doc : docs) {
    get = new Get(Bytes.toBytes((String) doc.getFieldValue("rowkey")));
    list.add(get);
}

*Result[] res = table.get(list); // Is this really needed? It takes extra time
to scan, right?*
This piece of code I got from
http://www.programering.com/a/MTM5kDMwATI.html

Please correct me if anything is wrong :)

Thanks
Beesh


Re: is Hbase Scan really need thorough Get (Hbase+solr+spark)

2016-01-19 Thread beeshma r
Thanks Ted :)

If everything gets indexed from HBase into Solr, then there is no need to hit
the region servers once again.


Thanks
Beesh
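A rough sketch of that idea, assuming the needed columns are stored fields in Solr (the URL, query and field names below are placeholders):

// Sketch: read row key and values straight from the Solr results and skip the
// extra HBase Get round trip.
import org.apache.solr.client.solrj.SolrQuery
import org.apache.solr.client.solrj.impl.HttpSolrServer
import scala.collection.JavaConverters._

val server = new HttpSolrServer("http://solrhost:8983/solr/hbase_core")
val query = new SolrQuery("*:*")
query.setStart(0)
query.setRows(10)

val rows = server.query(query).getResults.asScala.map { doc =>
  val rowkey = doc.getFieldValue("rowkey").toString
  val value  = doc.getFieldValue("value").toString   // assumes "value" is a stored field
  (rowkey, value)
}
rows.foreach(println)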


On Wed, Jan 20, 2016 at 5:05 AM, Ted Yu  wrote:

> get(List gets) will call:
>
>   Object [] r1 = batch((List)gets);
>
> where batch() would do:
>
> AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, actions,
> null, results);
>
> ars.waitUntilDone();
>
> multiAp is an AsyncProcess.
>
> In short, client would access region server for the results.
>
>
> FYI
>
> On Tue, Jan 19, 2016 at 3:28 PM, ayan guha  wrote:
>
>> It is not scanning the HBase. What it is doing is looping through your
>> list of Row keys and fetching data for each 1 at a time.
>>
>> Ex: Your solr result has 5 records, with Row Keys R1...R5.
>> Then list will be [R1,R2,...R5]
>>
>> Then table.get(list) will do something like:
>>
>> res=[]
>> for k in list:
>>  v = getFromHbaseWithRowKey(k)## This is just for
>> illustration, there is no such function :)
>>   res.add(v)
>> return res
>>
>> On Wed, Jan 20, 2016 at 10:09 AM, beeshma r  wrote:
>>
>>> Hi
>>>
>>>  I trying to integrated Hbase-solr-spark.
>>> Solr  is indexing all the documents from Hbase through hbase-indexer .
>>> Through the Spark I am manipulating all datasets .Thing is after getting
>>> the solrdocuments from the solr query ,it has the  rowkey and rowvalues .So
>>> directly i got the rowkeys and corresponding values
>>>
>>> question is 'its really need once again scan Hbase table through Get
>>> with rowkey from solrdocument'?
>>>
>>> example code
>>>
>>> HTable table = new HTable(conf, "");
>>> Get get = null;
>>> List list = new ArrayList();
>>> String url =  " ";
>>> SolrServer server = new HttpSolrServer(url);
>>> SolrQuery query = new SolrQuery(" ");
>>> query.setStart(0);
>>> query.setRows(10);
>>> QueryResponse response = server.query(query);
>>> SolrDocumentList docs = response.getResults();
>>> for (SolrDocument doc : docs) {
>>> get = new Get(Bytes.toBytes((String) doc.getFieldValue("rowkey")));
>>>  list.add(get);
>>>
>>>   }
>>>
>>> *Result[] res = table.get(list);//This is really need? because it takes
>>> extra time to scan right?*
>>> This piece of code i got from
>>> http://www.programering.com/a/MTM5kDMwATI.html
>>>
>>> please correct if anything wrong :)
>>>
>>> Thanks
>>> Beesh
>>>
>>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>


--


process of executing a program in a distributed environment without hadoop

2016-01-19 Thread Kamaruddin
I want to execute a program in a distributed environment without using Hadoop,
using only a Spark cluster. What is the best way to do it?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/process-of-executing-a-program-in-a-distributed-environment-without-hadoop-tp26015.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Appending filename information to RDD initialized by sc.textFile

2016-01-19 Thread Akhil Das
You can use sc.newAPIHadoopFile and pass your own InputFormat and
RecordReader which reads the compressed .gz files the way your use case needs.
For a start, you can look at:

- the wholeTextFiles implementation

- WholeTextFileInputFormat

- WholeTextFileRecordReader






Thanks
Best Regards
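Alternatively, a small sketch (an illustration, not from the thread; it assumes an existing SparkContext sc and placeholder paths) that avoids a custom InputFormat by reading each date-stamped .gz on its own and tagging its lines before a union:

// Sketch: tag each line with the date parsed from its file name, then union.
val paths = Seq(
  "hdfs:///data/pagecounts-20090501-000000.gz",
  "hdfs:///data/pagecounts-20090501-010000.gz")

val datePattern = "pagecounts-(\\d{8})".r
val tagged = paths.map { path =>
  val date = datePattern.findFirstMatchIn(path).map(_.group(1)).getOrElse("unknown")
  sc.textFile(path).map(line => s"$date $line")
}
val all = sc.union(tagged)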

On Tue, Jan 19, 2016 at 11:48 PM, Femi Anthony  wrote:

>
>
>  I  have a set of log files I would like to read into an RDD. These files
> are all compressed .gz and are the filenames are date stamped. The source
> of these files is the page view statistics data for wikipedia
>
> http://dumps.wikimedia.org/other/pagecounts-raw/
>
> The file names look like this:
>
> pagecounts-20090501-00.gz
> pagecounts-20090501-01.gz
> pagecounts-20090501-02.gz
>
> What I would like to do is read in all such files in a directory and
> prepend the date from the filename (e.g. 20090501) to each row of the
> resulting RDD. I first thought of using *sc.wholeTextFiles(..)* instead of
>  *sc.textFile(..)*, which creates a PairRDD with the key being the file
> name with a path, but*sc.wholeTextFiles()* doesn't handle compressed .gz
> files.
>
> Any suggestions would be welcome.
>
> --
> http://www.femibyte.com/twiki5/bin/view/Tech/
> http://www.nextmatrix.com
> "Great spirits have always encountered violent opposition from mediocre
> minds." - Albert Einstein.
>


Re: How to call a custom function from GroupByKey which takes Iterable[Row] as input and returns a Map[Int,String] as output in scala

2016-01-19 Thread Vishal Maru
It seems Spark is not able to serialize your function code to worker nodes.

I have tried to put together a solution as a simple set of commands. Maybe you can
combine the last four lines into a function.


val arr = Array((1,"A","<20","0"), (1,"A",">20 & <40","1"), (1,"B",">20 &
<40","0"), (1,"C",">20 & <40","0"), (1,"C",">20 & <40","0"),
(2,"A","<20","0"), (3,"B",">20 & <40","1"), (3,"B",">40","2"))

val rdd = sc.parallelize(arr)

val prdd = rdd.map(a => (a._1,a))
val totals = prdd.groupByKey.map(a => (a._1, a._2.size))

var n1 = rdd.map(a => ((a._1, a._2), 1) )
var n2 = n1.reduceByKey(_+_).map(a => (a._1._1, (a._1._2, a._2)))
var n3 = n2.join(totals).map(a => (a._1, (a._2._1._1, a._2._1._2.toDouble /
a._2._2)))
var n4 = n3.map(a => (a._1, a._2._1 + ":" +
a._2._2.toString)).reduceByKey((a, b) => a + "|" + b)

n4.collect.foreach(println)
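On the serialization error itself, a small hedged sketch of the usual pattern: keep the aggregation in a standalone serializable object so the closure does not drag in a non-serializable enclosing class. The logic below is deliberately simplified (plain concatenation, not the percentage computation) and the names are illustrative:

// Sketch: a top-level object keeps the closure free of outer references.
import org.apache.spark.sql.Row

object UserAggregator extends Serializable {
  def aggregate(rows: Iterable[Row]): Map[Int, String] =
    rows.headOption match {
      case Some(first) =>
        (0 until first.length).map { i =>
          i -> rows.map(_(i).toString).mkString("|")   // simplified: just concatenates
        }.toMap
      case None => Map.empty[Int, String]
    }
}

// usage against a pair RDD keyed by user id (hypothetical name):
// userRows.groupByKey().mapValues(UserAggregator.aggregate)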




On Mon, Jan 18, 2016 at 6:47 AM, Neha Mehta  wrote:

> Hi,
>
> I have a scenario wherein my dataset has around 30 columns. It is
> basically user activity information. I need to group the information by
> each user and then for each column/activity parameter I need to find the
> percentage affinity for each value in that column for that user. Below is
> the sample input and output.
>
> UserId C1 C2 C3
> 1 A <20 0
> 1 A >20 & <40 1
> 1 B >20 & <40 0
> 1 C >20 & <40 0
> 1 C >20 & <40 0
> 2 A <20 0
> 3 B >20 & <40 1
> 3 B >40 2
>
>
>
>
>
>
>
>
> Output
>
>
> 1 A:0.4|B:0.2|C:0.4 <20:02|>20 & <40:0.8 0:0.8|1:0.2
> 2 A:1 <20:1 0:01
> 3 B:1 >20 & <40:0.5|>40:0.5 1:0.5|2:0.5
>
> Presently this is how I am calculating these values:
> Group by UserId and C1 and compute values for C1 for all the users, then
> do a group by by Userid and C2 and find the fractions for C2 for each user
> and so on. This approach is quite slow.  Also the number of records for
> each user will be at max 30. So I would like to take a second approach
> wherein I do a groupByKey and pass the entire list of records for each key
> to a function which computes all the percentages for each column for each
> user at once. Below are the steps I am trying to follow:
>
> 1. Dataframe1 => group by UserId , find the counts of records for each
> user. Join the results back to the input so that counts are available with
> each record
> 2. Dataframe1.map(s=>s(1),s).groupByKey().map(s=>myUserAggregator(s._2))
>
> def myUserAggregator(rows: Iterable[Row]):
> scala.collection.mutable.Map[Int,String] = {
> val returnValue = scala.collection.mutable.Map[Int,String]()
> if (rows != null) {
>   val activityMap = scala.collection.mutable.Map[Int,
> scala.collection.mutable.Map[String,
> Int]]().withDefaultValue(scala.collection.mutable.Map[String,
> Int]().withDefaultValue(0))
>
>   val rowIt = rows.iterator
>   var sentCount = 1
>   for (row <- rowIt) {
> sentCount = row(29).toString().toInt
> for (i <- 0 until row.length) {
>   var m = activityMap(i)
>   if (activityMap(i) == null) {
> m = collection.mutable.Map[String,
> Int]().withDefaultValue(0)
>   }
>   m(row(i).toString()) += 1
>   activityMap.update(i, m)
> }
>   }
>   var activityPPRow: Row = Row()
>   for((k,v) <- activityMap) {
>   var rowVal:String = ""
>   for((a,b) <- v) {
> rowVal += rowVal + a + ":" + b/sentCount + "|"
>   }
>   returnValue.update(k, rowVal)
> //  activityPPRow.apply(k) = rowVal
>   }
>
> }
> return returnValue
>   }
>
> When I run step 2 I get the following error. I am new to Scala and Spark
> and am unable to figure out how to pass the Iterable[Row] to a function and
> get back the results.
>
> org.apache.spark.SparkException: Task not serializable
> at
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
> at
> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
> at
> org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
> at org.apache.spark.SparkContext.clean(SparkContext.scala:2032)
> at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:318)
> at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:317)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
> at org.apache.spark.rdd.RDD.map(RDD.scala:317)
> at
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:97)
> at
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:102)
> at
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:104)
> at
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:106)
> at
> 

Re: process of executing a program in a distributed environment without hadoop

2016-01-19 Thread Akhil Das
If you are processing a file, then you can keep the same file in all
machines in the same location and everything should work.
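A tiny sketch of that, assuming an existing SparkContext sc and a placeholder path present at the same location on every node:

// Sketch: read a local file in a standalone cluster; no HDFS involved.
val lines = sc.textFile("file:///data/input/events.log")
println(lines.count())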

Thanks
Best Regards

On Wed, Jan 20, 2016 at 11:15 AM, Kamaruddin  wrote:

> I want to execute a program in a distributed environment without using
> hadoop
> and only in spark cluster. What is the best way to use it?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/process-of-executing-a-program-in-a-distributed-environment-without-hadoop-tp26015.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Concurrent Spark jobs

2016-01-19 Thread Madabhattula Rajesh Kumar
Hi,

Just a thought: can we use the Spark Job Server and trigger jobs through REST
APIs? In this case, all jobs will share the same context and run in parallel.

If anyone has other thoughts, please share.

Regards,
Rajesh
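On the timeout itself, a hedged sketch (the property name comes from the stack trace discussion, the value is arbitrary, and an existing sqlContext is assumed) of raising the broadcast-join timeout before the concurrent saves are started:

// Sketch: give broadcast joins more than the default 300s when several saves
// run concurrently and compete for the cluster.
sqlContext.setConf("spark.sql.broadcastTimeout", "1200")
// the DataFrame.write.save calls can then be issued from their separate threads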

On Tue, Jan 19, 2016 at 10:28 PM, emlyn  wrote:

> We have a Spark application that runs a number of ETL jobs, writing the
> outputs to Redshift (using databricks/spark-redshift). This is triggered by
> calling DataFrame.write.save on the different DataFrames one after another.
> I noticed that during the Redshift load while the output of one job is
> being
> loaded into Redshift (which can take ~20 minutes for some jobs), the
> cluster
> is sitting idle.
>
> In order to maximise the use of the cluster, we tried starting a thread for
> each job so that they can all be submitted simultaneously, and therefore
> the
> cluster can be utilised by another job while one is being written to
> Redshift.
>
> However, when this is run, it fails with a TimeoutException (see stack
> trace
> below). Would it make sense to increase "spark.sql.broadcastTimeout"? I'm
> not sure that would actually solve anything. Should it not be possible to
> save multiple DataFrames simultaneously? Or any other hints on how to make
> better use of the cluster's resources?
>
> Thanks.
>
>
> Stack trace:
>
> Exception in thread "main" java.util.concurrent.ExecutionException:
> java.util.concurrent.TimeoutException: Futures timed out after [300
> seconds]
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> ...
> at
>
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
> at
> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
> ...
> Caused by: java.util.concurrent.TimeoutException: Futures timed out after
> [300 seconds]
> at
> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
> at
> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
> at
> scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
> at
>
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
> at scala.concurrent.Await$.result(package.scala:107)
> at
>
> org.apache.spark.sql.execution.joins.BroadcastHashOuterJoin.doExecute(BroadcastHashOuterJoin.scala:113)
> at
>
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
> at
>
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
> at
>
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
> at
> org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
> at
> org.apache.spark.sql.execution.Project.doExecute(basicOperators.scala:46)
> at
>
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
> at
>
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
> at
>
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
> at
> org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
> at
>
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
> at
>
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
> at
> org.apache.spark.sql.DataFrame.rdd$lzycompute(DataFrame.scala:1676)
> at org.apache.spark.sql.DataFrame.rdd(DataFrame.scala:1673)
> at
> org.apache.spark.sql.DataFrame.mapPartitions(DataFrame.scala:1465)
> at
>
> com.databricks.spark.redshift.RedshiftWriter.unloadData(RedshiftWriter.scala:264)
> at
>
> com.databricks.spark.redshift.RedshiftWriter.saveToRedshift(RedshiftWriter.scala:374)
> at
>
> com.databricks.spark.redshift.DefaultSource.createRelation(DefaultSource.scala:106)
> at
>
> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:222)
> at
> org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Concurrent-Spark-jobs-tp26011.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Re: spark dataframe jdbc read/write using dbcp connection pool

2016-01-19 Thread fightf...@163.com
Hi,
Thanks a lot for your suggestion. I then tried the following code :
val prop = new java.util.Properties
prop.setProperty("user","test")
prop.setProperty("password", "test")
prop.setProperty("partitionColumn", "added_year")
prop.setProperty("lowerBound", "1985")
prop.setProperty("upperBound","2015")
prop.setProperty("numPartitions", "200")

val url1 = "jdbc:mysql://172.16.54.136:3306/db1"
val url2 = "jdbc:mysql://172.16.54.138:3306/db1"
val jdbcDF1 = sqlContext.read.jdbc(url1,"video3",prop)
val jdbcDF2 = sqlContext.read.jdbc(url2,"video3",prop)

val jdbcDF3 = jdbcDF1.unionAll(jdbcDF2)
jdbcDF3.write.format("parquet").save("hdfs://172.16.54.138:8020/perf4")

The added_year column in the MySQL table contains values in the range 1985-2015,
and I pass the numPartitions property to get the reads partitioned. Is this what
you recommend? Can you advise a little more on the implementation?

Best,
Sun.



fightf...@163.com
 
From: 刘虓
Date: 2016-01-20 11:26
To: fightf...@163.com
CC: user
Subject: Re: spark dataframe jdbc read/write using dbcp connection pool
Hi,
I suggest you partition the JDBC reading on a indexed column of the mysql table

2016-01-20 10:11 GMT+08:00 fightf...@163.com :
Hi , 
I want to load really large volumn datasets from mysql using spark dataframe 
api. And then save as 
parquet file or orc file to facilitate that with hive / Impala. The datasets 
size is about 1 billion records and 
when I am using the following naive code to run that , Error occurs and 
executor lost failure.

val prop = new java.util.Properties
prop.setProperty("user","test")
prop.setProperty("password", "test")

val url1 = "jdbc:mysql://172.16.54.136:3306/db1"
val url2 = "jdbc:mysql://172.16.54.138:3306/db1"
val jdbcDF1 = sqlContext.read.jdbc(url1,"video",prop)
val jdbcDF2 = sqlContext.read.jdbc(url2,"video",prop)

val jdbcDF3 = jdbcDF1.unionAll(jdbcDF2)
jdbcDF3.write.format("parquet").save("hdfs://172.16.54.138:8020/perf")

I can see from the executor log and the message is like the following. I can 
see from the log that the wait_timeout threshold reached 
and there is no retry mechanism in the code process. So I am asking you experts 
to help on tuning this. Or should I try to use a jdbc
connection pool to increase parallelism ? 

   16/01/19 17:04:28 ERROR executor.Executor: Exception in task 0.0 in stage 
0.0 (TID 0)
com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link 
failure

The last packet successfully received from the server was 377,769 milliseconds 
ago.  The last packet sent successfully to the server was 377,790 milliseconds 
ago.
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)

Caused by: java.io.EOFException: Can not read response from server. Expected to 
read 4 bytes, read 1 bytes before connection was unexpectedly lost.
at com.mysql.jdbc.MysqlIO.readFully(MysqlIO.java:2914)
at com.mysql.jdbc.MysqlIO.nextRowFast(MysqlIO.java:1996)
... 22 more
16/01/19 17:10:47 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 
4
16/01/19 17:10:47 INFO jdbc.JDBCRDD: closed connection
16/01/19 17:10:47 ERROR executor.Executor: Exception in task 1.1 in stage 0.0 
(TID 2)
com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link 
failure





fightf...@163.com



Re: spark dataframe jdbc read/write using dbcp connection pool

2016-01-19 Thread 刘虓
Hi,
I suggest you partition the JDBC reading on an indexed column of the MySQL
table.
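For illustration, a minimal sketch of the explicitly partitioned read overload, reusing the column and bounds mentioned elsewhere in the thread (an existing sqlContext is assumed). If I remember correctly, in Spark 1.5 passing partitionColumn/lowerBound/upperBound only through the Properties object is not picked up by the plain read.jdbc(url, table, prop) form, so the explicit overload is the safer route:

// Sketch: 200 partitions over added_year in [1985, 2015].
import java.util.Properties

val prop = new Properties()
prop.setProperty("user", "test")
prop.setProperty("password", "test")

val url1 = "jdbc:mysql://172.16.54.136:3306/db1"
val jdbcDF1 = sqlContext.read.jdbc(url1, "video3", "added_year", 1985L, 2015L, 200, prop)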

2016-01-20 10:11 GMT+08:00 fightf...@163.com :

> Hi ,
> I want to load really large volumn datasets from mysql using spark
> dataframe api. And then save as
> parquet file or orc file to facilitate that with hive / Impala. The
> datasets size is about 1 billion records and
> when I am using the following naive code to run that , Error occurs and
> executor lost failure.
>
> val prop = new java.util.Properties
> prop.setProperty("user","test")
> prop.setProperty("password", "test")
>
> val url1 = "jdbc:mysql://172.16.54.136:3306/db1"
> val url2 = "jdbc:mysql://172.16.54.138:3306/db1"
> val jdbcDF1 = sqlContext.read.jdbc(url1,"video",prop)
> val jdbcDF2 = sqlContext.read.jdbc(url2,"video",prop)
>
> val jdbcDF3 = jdbcDF1.unionAll(jdbcDF2)
> jdbcDF3.write.format("parquet").save("hdfs://172.16.54.138:8020/perf")
>
> I can see from the executor log and the message is like the following. I
> can see from the log that the wait_timeout threshold reached
> and there is no retry mechanism in the code process. So I am asking you
> experts to help on tuning this. Or should I try to use a jdbc
> connection pool to increase parallelism ?
>
>
> 16/01/19 17:04:28 ERROR executor.Executor: Exception in task 0.0 in stage 0.0 
> (TID 0)
>
> com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link 
> failure
>
>
> The last packet successfully received from the server was 377,769 
> milliseconds ago.  The last packet sent successfully to the server was 
> 377,790 milliseconds ago.
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>
> Caused by:
> java.io.EOFException: Can not read response from server. Expected to read 4 
> bytes, read 1 bytes before connection was unexpectedly lost.
> at com.mysql.jdbc.MysqlIO.readFully(MysqlIO.java:2914)
> at com.mysql.jdbc.MysqlIO.nextRowFast(MysqlIO.java:1996)
> ... 22 more
>
> 16/01/19 17:10:47 INFO executor.CoarseGrainedExecutorBackend: Got assigned 
> task 4
> 16/01/19 17:10:47 INFO jdbc.JDBCRDD: closed connection
>
> 16/01/19 17:10:47 ERROR executor.Executor: Exception in task 1.1 in stage 0.0 
> (TID 2)
>
> com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link 
> failure
>
>
>
> --
> fightf...@163.com
>


Redundant common columns of nature full outer join

2016-01-19 Thread Zhong Wang
Hi all,

I am joining two tables with common columns using a full outer join. However,
the current DataFrame API doesn't support natural joins, so the output
contains redundant common columns from both of the tables.

Is there any way to remove these redundant columns for a "natural" full
outer join? For a left outer join or right outer join, I can select just
the common columns from the left table or the right table. However, for a
full outer join, it seems quite difficult to do that, because there
are null values in both the left and right common columns.


Thanks,
Zhong
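One hedged workaround sketch (placeholder frames, assumes an existing sqlContext): coalesce the two key columns after the outer join so a single non-null key survives, and select only the columns you want to keep.

// Sketch: emulate a natural full outer join by coalescing the duplicated key.
import org.apache.spark.sql.functions.coalesce
import sqlContext.implicits._

val left  = Seq((1, "a"), (2, "b")).toDF("k", "v1")
val right = Seq((2, "x"), (3, "y")).toDF("k", "v2")

val joined = left.join(right, left("k") === right("k"), "outer")
val natural = joined.select(
  coalesce(left("k"), right("k")).as("k"),   // a single non-null key column
  left("v1"),
  right("v2"))
natural.show()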


Re: Parquet write optimization by row group size config

2016-01-19 Thread Akhil Das
Did you try re-partitioning the data before doing the write?

Thanks
Best Regards

On Tue, Jan 19, 2016 at 6:13 PM, Pavel Plotnikov <
pavel.plotni...@team.wrike.com> wrote:

> Hello,
> I'm using spark on some machines in standalone mode, data storage is
> mounted on this machines via nfs. A have input data stream and when i'm
> trying to store all data for hour in parquet, a job executes mostly on one
> core and this hourly data are stored in 40- 50 minutes. It is very slow!
> And it is not IO problem. After research how parquet file works, i'm found
> that it can be parallelized on row group abstraction level.
> I think row group for my files is to large, and how can i change it?
> When i create to big DataFrame i devides in parts very well and writes
> quikly!
>
> Thanks,
> Pavel
>
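A hedged sketch of the two knobs discussed here, with illustrative values; as far as I understand, parquet.block.size is the row-group size the Parquet writer picks up from the Hadoop configuration, and hourlyDF is a placeholder name for the hourly DataFrame:

// Sketch: spread the hourly write over more tasks and shrink the row group.
sc.hadoopConfiguration.setInt("parquet.block.size", 64 * 1024 * 1024)  // ~64 MB row groups
hourlyDF
  .repartition(32)                         // more, smaller output files / tasks
  .write.format("parquet")
  .save("/data/parquet/hour=2016011900")   // placeholder output path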


Re: Docker/Mesos with Spark

2016-01-19 Thread Nagaraj Chandrashekar
Hi John,

I recently deployed Redis instances using the Kubernetes framework on Apache Mesos.
Kubernetes uses the pod concept: you can run your workload (Redis/Spark)
as a Docker container, and it also adds some HA capabilities to the instances.

Cheers
Nagaraj C

From: Darren Govoni
Date: Wednesday, January 20, 2016 at 8:15 AM
To: Sathish Kumaran Vairavelu, Tim Chen
Cc: John Omernik, user
Subject: Re: Docker/Mesos with Spark

I also would be interested in some best practice for making this work.

Where will the writeup be posted? On mesosphere website?



Sent from my Verizon Wireless 4G LTE smartphone


 Original message 
From: Sathish Kumaran Vairavelu
Date: 01/19/2016 7:00 PM (GMT-05:00)
To: Tim Chen
Cc: John Omernik, user
Subject: Re: Docker/Mesos with Spark

Thank you! Looking forward for it..


On Tue, Jan 19, 2016 at 4:03 PM Tim Chen wrote:
Hi Sathish,

Sorry about that, I think that's a good idea and I'll write up a section in the 
Spark documentation page to explain how it can work. We (Mesosphere) have been 
doing this for our DCOS spark for our past releases and has been working well 
so far.

Thanks!

Tim

On Tue, Jan 19, 2016 at 12:28 PM, Sathish Kumaran Vairavelu wrote:
Hi Tim

Do you have any materials/blog for running Spark in a container in Mesos 
cluster environment? I have googled it but couldn't find info on it. Spark 
documentation says it is possible, but no details provided.. Please help


Thanks

Sathish




On Mon, Sep 21, 2015 at 11:54 AM Tim Chen wrote:
Hi John,

There is no other blog post yet, I'm thinking to do a series of posts but so 
far haven't get time to do that yet.

Running Spark in docker containers makes distributing spark versions easy, it's 
simple to upgrade and automatically caches on the slaves so the same image just 
runs right away. Most of the docker perf is usually related to network and 
filesystem overheads, but I think with recent changes in Spark to make Mesos 
sandbox the default temp dir filesystem won't be a big concern as it's mostly 
writing to the mounted in Mesos sandbox. Also Mesos uses host network by 
default so network is affected much.

Most of the cluster mode limitation is that you need to make the spark job 
files available somewhere that all the slaves can access remotely (http, s3, 
hdfs, etc) or available on all slaves locally by path.

I'll try to make more doc efforts once I get my existing patches and testing 
infra work done.

Let me know if you have more questions,

Tim

On Sat, Sep 19, 2015 at 5:42 AM, John Omernik wrote:
I was searching in the 1.5.0 docs on the Docker on Mesos capabilities and just 
found you CAN run it this way.  Are there any user posts, blog posts, etc on 
why and how you'd do this?

Basically, at first I was questioning why you'd run spark in a docker 
container, i.e., if you run with tar balled executor, what are you really 
gaining?  And in this setup, are you losing out on performance somehow? (I am 
guessing smarter people than I have figured that out).

Then I came along a situation where I wanted to use a python library with 
spark, and it had to be installed on every node, and I realized one big 
advantage of dockerized spark would be that spark apps that needed other 
libraries could be contained and built well.

OK, that's huge, let's do that.  For my next question there are lot of 
"questions" have on how this actually works.  Does Clustermode/client mode 
apply here? If so, how?  Is there a good walk through on getting this setup? 
Limitations? Gotchas?  Should I just dive in an start working with it? Has 
anyone done any stories/rough documentation? This seems like a really helpful 
feature to scaling out spark, and letting developers truly build what they need 
without tons of admin overhead, so I really want to explore.

Thanks!

John




Re: SparkContext SyntaxError: invalid syntax

2016-01-19 Thread Felix Cheung

I have to run this to install the pre-reqs to get the jekyll build to work; you do
need the python pygments package (I'm on Ubuntu):

sudo apt-get install ruby ruby-dev make gcc nodejs
sudo gem install jekyll --no-rdoc --no-ri
sudo gem install jekyll-redirect-from
sudo apt-get install python-Pygments
sudo apt-get install python-sphinx
sudo gem install pygments.rb

Hope that helps! If not, I can try putting together a doc change, but I'd rather
you could make progress :)





On Mon, Jan 18, 2016 at 6:36 AM -0800, "Andrew Weiner" 
 wrote:





Hi Felix,

Yeah, when I try to build the docs using jekyll build, I get a LoadError
(cannot load such file -- pygments) and I'm having trouble getting past it
at the moment.

From what I could tell, this does not apply to YARN in client mode.  I was
able to submit jobs in client mode and they would run fine without using
the appMasterEnv property.  I even confirmed that my environment variables
persisted during the job when run in client mode.  There is something about
YARN cluster mode that uses a different environment (the YARN Application
Master environment) and requires the appMasterEnv property for setting
environment variables.

On Sun, Jan 17, 2016 at 11:37 PM, Felix Cheung 
wrote:

> Do you still need help on the PR?
> btw, does this apply to YARN client mode?
>
> --
> From: andrewweiner2...@u.northwestern.edu
> Date: Sun, 17 Jan 2016 17:00:39 -0600
> Subject: Re: SparkContext SyntaxError: invalid syntax
> To: cutl...@gmail.com
> CC: user@spark.apache.org
>
>
> Yeah, I do think it would be worth explicitly stating this in the docs.  I
> was going to try to edit the docs myself and submit a pull request, but I'm
> having trouble building the docs from github.  If anyone else wants to do
> this, here is approximately what I would say:
>
> (To be added to
> http://spark.apache.org/docs/latest/configuration.html#environment-variables
> )
> "Note: When running Spark on YARN in cluster mode, environment variables
> need to be set using the spark.yarn.appMasterEnv.[EnvironmentVariableName]
> property in your conf/spark-defaults.conf file.  Environment variables
> that are set in spark-env.sh will not be reflected in the YARN
> Application Master process in cluster mode.  See the YARN-related Spark
> Properties
> 
> for more information."
>
> I might take another crack at building the docs myself if nobody beats me
> to this.
>
> Andrew
>
>
> On Fri, Jan 15, 2016 at 5:01 PM, Bryan Cutler  wrote:
>
> Glad you got it going!  It's wasn't very obvious what needed to be set,
> maybe it is worth explicitly stating this in the docs since it seems to
> have come up a couple times before too.
>
> Bryan
>
> On Fri, Jan 15, 2016 at 12:33 PM, Andrew Weiner <
> andrewweiner2...@u.northwestern.edu> wrote:
>
> Actually, I just found this [
> https://issues.apache.org/jira/browse/SPARK-1680], which after a bit of
> googling and reading leads me to believe that the preferred way to change
> the yarn environment is to edit the spark-defaults.conf file by adding this
> line:
> spark.yarn.appMasterEnv.PYSPARK_PYTHON  /path/to/python
>
> While both this solution and the solution from my prior email work, I
> believe this is the preferred solution.
>
> Sorry for the flurry of emails.  Again, thanks for all the help!
>
> Andrew
>
> On Fri, Jan 15, 2016 at 1:47 PM, Andrew Weiner <
> andrewweiner2...@u.northwestern.edu> wrote:
>
> I finally got the pi.py example to run in yarn cluster mode.  This was the
> key insight:
> https://issues.apache.org/jira/browse/SPARK-9229
>
> I had to set SPARK_YARN_USER_ENV in spark-env.sh:
> export SPARK_YARN_USER_ENV="PYSPARK_PYTHON=/home/aqualab/local/bin/python"
>
> This caused the PYSPARK_PYTHON environment variable to be used in my yarn
> environment in cluster mode.
>
> Thank you for all your help!
>
> Best,
> Andrew
>
>
>
> On Fri, Jan 15, 2016 at 12:57 PM, Andrew Weiner <
> andrewweiner2...@u.northwestern.edu> wrote:
>
> I tried playing around with my environment variables, and here is an
> update.
>
> When I run in cluster mode, my environment variables do not persist
> throughout the entire job.
> For example, I tried creating a local copy of HADOOP_CONF_DIR in
> /home//local/etc/hadoop/conf, and then, in spark-env.sh I the
> variable:
> export HADOOP_CONF_DIR=/home//local/etc/hadoop/conf
>
> Later, when we print the environment variables in the python code, I see
> this:
>
> ('HADOOP_CONF_DIR', '/etc/hadoop/conf')
>
> However, when I run in client mode, I see this:
>
> ('HADOOP_CONF_DIR', '/home/awp066/local/etc/hadoop/conf')
>
> Furthermore, if I omit that environment variable from spark-env.sh 
> altogether, I get the expected error in both client and cluster mode:
>
> When running with master 'yarn'
> either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the 

Re: RDD immutablility

2016-01-19 Thread Marco
It depends on what you mean by "write access".  The RDDs are immutable, so
you can't really change them. When you apply a mapping/filter/groupBy
function, you are creating a new RDD starting from the original one.

Kind regards,
Marco

2016-01-19 13:27 GMT+01:00 Dave :

> Hi Marco,
>
> Yes, that answers my question. I just wanted to be sure as the API gave me
> write access to the immutable data which means its up to the developer to
> know not to modify the input parameters for these API's.
>
> Thanks for the response.
> Dave.
>
>
> On 19/01/16 12:25, Marco wrote:
>
> Hello,
>
> RDD are immutable by design. The reasons, to quote Sean Owen in this
> answer ( https://www.quora.com/Why-is-a-spark-RDD-immutable ), are the
> following :
>
> Immutability rules out a big set of potential problems due to updates from
>> multiple threads at once. Immutable data is definitely safe to share across
>> processes.
>
> They're not just immutable but a deterministic function of their input.
>> This plus immutability also means the RDD's parts can be recreated at any
>> time. This makes caching, sharing and replication easy.
>> These are significant design wins, at the cost of having to copy data
>> rather than mutate it in place. Generally, that's a decent tradeoff to
>> make: gaining the fault tolerance and correctness with no developer effort
>> is worth spending memory and CPU on, since the latter are cheap.
>> A corollary: immutable data can as easily live in memory as on disk. This
>> makes it reasonable to easily move operations that hit disk to instead use
>> data in memory, and again, adding memory is much easier than adding I/O
>> bandwidth.
>> Of course, an RDD isn't really a collection of data, but just a recipe
>> for making data from other data. It is not literally computed by
>> materializing every RDD completely. That is, a lot of the "copy" can be
>> optimized away too.
>
>
> I hope it answers your question.
>
> Kind regards,
> Marco
>
> 2016-01-19 13:14 GMT+01:00 ddav :
>
>> Hi,
>>
>> Certain API's (map, mapValues) give the developer access to the data
>> stored
>> in RDD's.
>> Am I correct in saying that these API's must never modify the data but
>> always return a new object with a copy of the data if the data needs to be
>> updated for the returned RDD.
>>
>> Thanks,
>> Dave.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/RDD-immutablility-tp26007.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>


Is there a way to co-locate partitions from two partitioned RDDs?

2016-01-19 Thread nwali
Hi,

I am working with Spark in Java on top of a HDFS cluster. In my code two
RDDs are partitioned with the same partitioner (HashPartitioner with the
same number of partitions), so they are co-partitioned.
Thus the same keys end up in partitions with the same number, but that does not
mean that both RDDs are necessarily co-located, that is to say, that matching
partitions live on the same nodes.
For example partition#1 from RDD#1 may not be on the same node as
partition#1 from RDD#2. I would like to co-locate partitioned RDDs to reduce
data transfer between nodes when applying a join operation on the RDDs. 
Is there a way to do that?

Thank you
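For reference, a minimal sketch of the co-partitioned setup described above (Spark does not expose node-placement control, but with a shared partitioner the join itself adds no extra shuffle; assumes an existing SparkContext sc):

// Sketch: both RDDs share one HashPartitioner, so join() is a narrow dependency.
import org.apache.spark.HashPartitioner

val p = new HashPartitioner(8)
val left  = sc.parallelize(Seq((1, "a"), (2, "b"))).partitionBy(p)
val right = sc.parallelize(Seq((1, "x"), (3, "y"))).partitionBy(p)
val joined = left.join(right)   // matching keys already sit in the same partition number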



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-a-way-to-co-locate-partitions-from-two-partitioned-RDDs-tp26008.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: RDD immutablility

2016-01-19 Thread Sean Owen
It's a good question. You can easily imagine an RDD of classes that
are mutable. Yes, if you modify these objects, the result is pretty
undefined, so don't do that.
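A tiny sketch of the safe pattern, with illustrative types only (assumes an existing SparkContext sc): build new objects inside transformations instead of mutating the ones handed in.

// Sketch: copy-and-return keeps the parent RDD's records untouched.
case class Counter(n: Int)

val counters = sc.parallelize(Seq(Counter(1), Counter(2)))
val bumped   = counters.map(c => c.copy(n = c.n + 1))   // new objects, new RDD
// counters itself is unchanged and can still be recomputed deterministically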

On Tue, Jan 19, 2016 at 12:27 PM, Dave  wrote:
> Hi Marco,
>
> Yes, that answers my question. I just wanted to be sure as the API gave me
> write access to the immutable data which means its up to the developer to
> know not to modify the input parameters for these API's.
>
> Thanks for the response.
> Dave.
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Parquet write optimization by row group size config

2016-01-19 Thread Pavel Plotnikov
Hello,
I'm using Spark on some machines in standalone mode; the data storage is
mounted on these machines via NFS. I have an input data stream, and when I
try to store all the data for an hour in Parquet, the job executes mostly on one
core and this hourly data takes 40-50 minutes to be stored. It is very slow!
And it is not an IO problem. After researching how a Parquet file works, I found
that it can be parallelized at the row-group abstraction level.
I think the row group for my files is too large; how can I change it?
When I create a big DataFrame it divides into parts very well and writes
quickly!

Thanks,
Pavel


Re: RDD immutablility

2016-01-19 Thread Dave

Thanks Sean.

On 19/01/16 13:36, Sean Owen wrote:

It's a good question. You can easily imagine an RDD of classes that
are mutable. Yes, if you modify these objects, the result is pretty
undefined, so don't do that.

On Tue, Jan 19, 2016 at 12:27 PM, Dave  wrote:

Hi Marco,

Yes, that answers my question. I just wanted to be sure as the API gave me
write access to the immutable data which means its up to the developer to
know not to modify the input parameters for these API's.

Thanks for the response.
Dave.





-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: RDD immutablility

2016-01-19 Thread Marco
Hello,

RDD are immutable by design. The reasons, to quote Sean Owen in this answer
( https://www.quora.com/Why-is-a-spark-RDD-immutable ), are the following :

Immutability rules out a big set of potential problems due to updates from
> multiple threads at once. Immutable data is definitely safe to share across
> processes.

They're not just immutable but a deterministic function of their input.
> This plus immutability also means the RDD's parts can be recreated at any
> time. This makes caching, sharing and replication easy.
> These are significant design wins, at the cost of having to copy data
> rather than mutate it in place. Generally, that's a decent tradeoff to
> make: gaining the fault tolerance and correctness with no developer effort
> is worth spending memory and CPU on, since the latter are cheap.
> A corollary: immutable data can as easily live in memory as on disk. This
> makes it reasonable to easily move operations that hit disk to instead use
> data in memory, and again, adding memory is much easier than adding I/O
> bandwidth.
> Of course, an RDD isn't really a collection of data, but just a recipe for
> making data from other data. It is not literally computed by materializing
> every RDD completely. That is, a lot of the "copy" can be optimized away
> too.


I hope it answers your question.

Kind regards,
Marco

2016-01-19 13:14 GMT+01:00 ddav :

> Hi,
>
> Certain API's (map, mapValues) give the developer access to the data stored
> in RDD's.
> Am I correct in saying that these API's must never modify the data but
> always return a new object with a copy of the data if the data needs to be
> updated for the returned RDD.
>
> Thanks,
> Dave.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/RDD-immutablility-tp26007.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Reuse Executor JVM across different JobContext

2016-01-19 Thread praveen S
Can you give me more details on Spark's jobserver?

Regards,
Praveen
On 18 Jan 2016 03:30, "Jia"  wrote:

> I guess all jobs submitted through JobServer are executed in the same JVM,
> so RDDs cached by one job can be visible to all other jobs executed later.
> On Jan 17, 2016, at 3:56 PM, Mark Hamstra  wrote:
>
> Yes, that is one of the basic reasons to use a
> jobserver/shared-SparkContext.  Otherwise, in order share the data in an
> RDD you have to use an external storage system, such as a distributed
> filesystem or Tachyon.
>
> On Sun, Jan 17, 2016 at 1:52 PM, Jia  wrote:
>
>> Thanks, Mark. Then, I guess JobServer can fundamentally solve my problem,
>> so that jobs can be submitted at different time and still share RDDs.
>>
>> Best Regards,
>> Jia
>>
>>
>> On Jan 17, 2016, at 3:44 PM, Mark Hamstra 
>> wrote:
>>
>> There is a 1-to-1 relationship between Spark Applications and
>> SparkContexts -- fundamentally, a Spark Applications is a program that
>> creates and uses a SparkContext, and that SparkContext is destroyed when
>> then Application ends.  A jobserver generically and the Spark JobServer
>> specifically is an Application that keeps a SparkContext open for a long
>> time and allows many Jobs to be be submitted and run using that shared
>> SparkContext.
>>
>> More than one Application/SparkContext unavoidably implies more than one
>> JVM process per Worker -- Applications/SparkContexts cannot share JVM
>> processes.
>>
>> On Sun, Jan 17, 2016 at 1:15 PM, Jia  wrote:
>>
>>> Hi, Mark, sorry for the confusion.
>>>
>>> Let me clarify, when an application is submitted, the master will tell
>>> each Spark worker to spawn an executor JVM process. All the task sets  of
>>> the application will be executed by the executor. After the application
>>> runs to completion. The executor process will be killed.
>>> But I hope that all applications submitted can run in the same executor,
>>> can JobServer do that? If so, it’s really good news!
>>>
>>> Best Regards,
>>> Jia
>>>
>>> On Jan 17, 2016, at 3:09 PM, Mark Hamstra 
>>> wrote:
>>>
>>> You've still got me confused.  The SparkContext exists at the Driver,
>>> not on an Executor.
>>>
>>> Many Jobs can be run by a SparkContext -- it is a common pattern to use
>>> something like the Spark Jobserver where all Jobs are run through a shared
>>> SparkContext.
>>>
>>> On Sun, Jan 17, 2016 at 12:57 PM, Jia Zou 
>>> wrote:
>>>
 Hi, Mark, sorry, I mean SparkContext.
 I mean to change Spark into running all submitted jobs (SparkContexts)
 in one executor JVM.

 Best Regards,
 Jia

 On Sun, Jan 17, 2016 at 2:21 PM, Mark Hamstra 
 wrote:

> -dev
>
> What do you mean by JobContext?  That is a Hadoop mapreduce concept,
> not Spark.
>
> On Sun, Jan 17, 2016 at 7:29 AM, Jia Zou 
> wrote:
>
>> Dear all,
>>
>> Is there a way to reuse executor JVM across different JobContexts?
>> Thanks.
>>
>> Best Regards,
>> Jia
>>
>
>

>>>
>>>
>>
>>
>
>


Re: RDD immutablility

2016-01-19 Thread Dave

Hi Marco,

Yes, that answers my question. I just wanted to be sure, as the API gave 
me write access to the immutable data, which means it's up to the 
developer to know not to modify the input parameters for these APIs.


Thanks for the response.
Dave.

On 19/01/16 12:25, Marco wrote:

Hello,

RDD are immutable by design. The reasons, to quote Sean Owen in this 
answer ( https://www.quora.com/Why-is-a-spark-RDD-immutable ), are the 
following :


Immutability rules out a big set of potential problems due to
updates from multiple threads at once. Immutable data is
definitely safe to share across processes.

They're not just immutable but a deterministic function of their
input. This plus immutability also means the RDD's parts can be
recreated at any time. This makes caching, sharing and replication
easy.
These are significant design wins, at the cost of having to copy
data rather than mutate it in place. Generally, that's a decent
tradeoff to make: gaining the fault tolerance and correctness with
no developer effort is worth spending memory and CPU on, since the
latter are cheap.
A corollary: immutable data can as easily live in memory as on
disk. This makes it reasonable to easily move operations that hit
disk to instead use data in memory, and again, adding memory is
much easier than adding I/O bandwidth.
Of course, an RDD isn't really a collection of data, but just a
recipe for making data from other data. It is not literally
computed by materializing every RDD completely. That is, a lot of
the "copy" can be optimized away too.


I hope it answers your question.

Kind regards,
Marco

2016-01-19 13:14 GMT+01:00 ddav >:


Hi,

Certain API's (map, mapValues) give the developer access to the
data stored
in RDD's.
Am I correct in saying that these API's must never modify the data but
always return a new object with a copy of the data if the data
needs to be
updated for the returned RDD.

Thanks,
Dave.



--
View this message in context:

http://apache-spark-user-list.1001560.n3.nabble.com/RDD-immutablility-tp26007.html
Sent from the Apache Spark User List mailing list archive at
Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org

For additional commands, e-mail: user-h...@spark.apache.org







Re: sqlContext.cacheTable("tableName") vs dataFrame.cache()

2016-01-19 Thread Jerry Lam
Is cacheTable similar to asTempTable before? 

Sent from my iPhone

> On 19 Jan, 2016, at 4:18 am, George Sigletos  wrote:
> 
> Thanks Kevin for your reply.
> 
> I was suspecting the same thing as well, although it still does not make much 
> sense to me why you would need to do both:
> myData.cache()
> sqlContext.cacheTable("myData")
> 
> in case you are using both sqlContext and dataframes to execute queries
> 
> dataframe.select(...) and sqlContext.sql("select ...") are equivalent, as far 
> as I understand
> 
> Kind regards,
> George
> 
>> On Fri, Jan 15, 2016 at 6:15 PM, Kevin Mellott  
>> wrote:
>> Hi George,
>> 
>> I believe that sqlContext.cacheTable("tableName") is to be used when you 
>> want to cache the data that is being used within a Spark SQL query. For 
>> example, take a look at the code below.
>>  
>>> val myData = sqlContext.load("com.databricks.spark.csv", Map("path" -> 
>>> "hdfs://somepath/file", "header" -> "false")).toDF("col1", "col2")
>>> myData.registerTempTable("myData")  
>> 
>> Here, the usage of cache() will affect ONLY the myData.select query. 
>>> myData.cache() 
>>> myData.select("col1", "col2").show() 
>>  
>> Here, the usage of cacheTable will affect ONLY the sqlContext.sql query.
>>> sqlContext.cacheTable("myData")
>>> sqlContext.sql("SELECT col1, col2 FROM myData").show()
>> 
>> Thanks,
>> Kevin
>> 
>>> On Fri, Jan 15, 2016 at 7:00 AM, George Sigletos  
>>> wrote:
>>> According to the documentation they are exactly the same, but in my queries 
>>> 
>>> dataFrame.cache() 
>>> 
>>> results in much faster execution times vs doing 
>>> 
>>> sqlContext.cacheTable("tableName")
>>> 
>>> Is there any explanation about this? I am not caching the RDD prior to 
>>> creating the dataframe. Using Pyspark on Spark 1.5.2
>>> 
>>> Kind regards,
>>> George
> 
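
A short Scala sketch tying the two caching paths discussed above together,
assuming Spark 1.5.x and a DataFrame myData already registered as a temp table:

myData.cache()                          // cache via the DataFrame handle
sqlContext.cacheTable("myData")         // cache via the registered table name

println(sqlContext.isCached("myData"))  // confirm what actually got cached

sqlContext.uncacheTable("myData")       // release the memory when done
myData.unpersist()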


Re: Reuse Executor JVM across different JobContext

2016-01-19 Thread Jia
Hi, Praveen, have you checked out this, which might have the details you need:
https://spark-summit.org/2014/wp-content/uploads/2014/07/Spark-Job-Server-Easy-Spark-Job-Management-Chan-Chu.pdf

Best Regards,
Jia
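
A minimal sketch of the idea a jobserver implements -- one long-lived
application holding a single SparkContext that every submitted job reuses
(all names below are hypothetical):

import org.apache.spark.{SparkConf, SparkContext}

object SharedContextService {
  // One SparkContext for the life of the application; cached RDDs created by
  // one job stay visible to every later job run through the same context.
  lazy val sc = new SparkContext(new SparkConf().setAppName("shared-context-service"))

  def runJob[T](job: SparkContext => T): T = job(sc)
}

// Two logical jobs sharing one cached RDD.
val cached = SharedContextService.runJob(sc => sc.textFile("/data/input").cache())
val total  = SharedContextService.runJob(_ => cached.count())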


On Jan 19, 2016, at 7:28 AM, praveen S  wrote:

> Can you give me more details on Spark's jobserver.
> 
> Regards, 
> Praveen
> 
> On 18 Jan 2016 03:30, "Jia"  wrote:
> I guess all jobs submitted through JobServer are executed in the same JVM, so 
> RDDs cached by one job can be visible to all other jobs executed later.
> On Jan 17, 2016, at 3:56 PM, Mark Hamstra  wrote:
> 
>> Yes, that is one of the basic reasons to use a 
>> jobserver/shared-SparkContext.  Otherwise, in order share the data in an RDD 
>> you have to use an external storage system, such as a distributed filesystem 
>> or Tachyon.
>> 
>> On Sun, Jan 17, 2016 at 1:52 PM, Jia  wrote:
>> Thanks, Mark. Then, I guess JobServer can fundamentally solve my problem, so 
>> that jobs can be submitted at different time and still share RDDs.
>> 
>> Best Regards,
>> Jia
>> 
>> 
>> On Jan 17, 2016, at 3:44 PM, Mark Hamstra  wrote:
>> 
>>> There is a 1-to-1 relationship between Spark Applications and SparkContexts 
>>> -- fundamentally, a Spark Applications is a program that creates and uses a 
>>> SparkContext, and that SparkContext is destroyed when then Application 
>>> ends.  A jobserver generically and the Spark JobServer specifically is an 
>>> Application that keeps a SparkContext open for a long time and allows many 
>>> Jobs to be be submitted and run using that shared SparkContext.
>>> 
>>> More than one Application/SparkContext unavoidably implies more than one 
>>> JVM process per Worker -- Applications/SparkContexts cannot share JVM 
>>> processes.  
>>> 
>>> On Sun, Jan 17, 2016 at 1:15 PM, Jia  wrote:
>>> Hi, Mark, sorry for the confusion.
>>> 
>>> Let me clarify, when an application is submitted, the master will tell each 
>>> Spark worker to spawn an executor JVM process. All the task sets  of the 
>>> application will be executed by the executor. After the application runs to 
>>> completion. The executor process will be killed.
>>> But I hope that all applications submitted can run in the same executor, 
>>> can JobServer do that? If so, it’s really good news!
>>> 
>>> Best Regards,
>>> Jia
>>> 
>>> On Jan 17, 2016, at 3:09 PM, Mark Hamstra  wrote:
>>> 
 You've still got me confused.  The SparkContext exists at the Driver, not 
 on an Executor.
 
 Many Jobs can be run by a SparkContext -- it is a common pattern to use 
 something like the Spark Jobserver where all Jobs are run through a shared 
 SparkContext.
 
 On Sun, Jan 17, 2016 at 12:57 PM, Jia Zou  wrote:
 Hi, Mark, sorry, I mean SparkContext.
 I mean to change Spark into running all submitted jobs (SparkContexts) in 
 one executor JVM.
 
 Best Regards,
 Jia
 
 On Sun, Jan 17, 2016 at 2:21 PM, Mark Hamstra  
 wrote:
 -dev
 
 What do you mean by JobContext?  That is a Hadoop mapreduce concept, not 
 Spark.
 
 On Sun, Jan 17, 2016 at 7:29 AM, Jia Zou  wrote:
 Dear all,
 
 Is there a way to reuse executor JVM across different JobContexts? Thanks.
 
 Best Regards,
 Jia
 
 
 
>>> 
>>> 
>> 
>> 
> 



Re: Split columns in RDD

2016-01-19 Thread Sabarish Sasidharan
The most efficient way to determine the number of columns would be to do a
take(1) and split in the driver.

Regards
Sab
On 19-Jan-2016 8:48 pm, "Richard Siebeling"  wrote:

> Hi,
>
> what is the most efficient way to split columns and know how many columns
> are created.
>
> Here is the current RDD
> -
> ID   STATE
> -
> 1   TX, NY, FL
> 2   CA, OH
> -
>
> This is the preferred output:
> -
> IDSTATE_1 STATE_2  STATE_3
> -
> 1 TX  NY  FL
> 2 CA  OH
> -
>
> With a separated with the new columns STATE_1, STATE_2, STATE_3
>
>
> It looks like the following output is feasible using a ReduceBy operator
> -
> IDSTATE_1 STATE_2  STATE_3   NEW_COLUMNS
> -
> 1 TXNY   FLSTATE_1, STATE_2,
> STATE_3
> 2 CAOH STATE_1, STATE_2
> -
>
> Then in the reduce step, the distinct new columns can be calculated.
> Is it possible to get the second output where next to the RDD the
> new_columns are saved somewhere?
> Or is the required to use the second approach?
>
> thanks in advance,
> Richard
>
>


Can I configure Spark on multiple nodes using local filesystem on each node?

2016-01-19 Thread Jia Zou
Dear all,

Can I configure Spark on multiple nodes without HDFS, so that output data
will be written to the local file system on each node?

I guess there is no such feature in Spark, but just want to confirm.

Best Regards,
Jia


can we create dummy variables from categorical variables, using sparkR

2016-01-19 Thread Devesh Raj Singh
Hi,

Can we create dummy variables for categorical variables in SparkR like we
do using the "dummies" package in R?

-- 
Warm regards,
Devesh.


Split columns in RDD

2016-01-19 Thread Richard Siebeling
Hi,

What is the most efficient way to split columns and know how many columns
are created?

Here is the current RDD
-
ID   STATE
-
1   TX, NY, FL
2   CA, OH
-

This is the preferred output:
-
IDSTATE_1 STATE_2  STATE_3
-
1 TX  NY  FL
2 CA  OH
-

With a separated with the new columns STATE_1, STATE_2, STATE_3


It looks like the following output is feasible using a ReduceBy operator
-
IDSTATE_1 STATE_2  STATE_3   NEW_COLUMNS
-
1 TXNY   FLSTATE_1, STATE_2,
STATE_3
2 CAOH STATE_1, STATE_2
-

Then in the reduce step, the distinct new columns can be calculated.
Is it possible to get the second output where next to the RDD the
new_columns are saved somewhere?
Or is it required to use the second approach?

thanks in advance,
Richard


Re: Can I configure Spark on multiple nodes using local filesystem on each node?

2016-01-19 Thread Pavel Plotnikov
Hi,

I'm using Spark in standalone mode without HDFS, and a shared folder is
mounted on the nodes via NFS. It looks like each node writes data as if to a
local file system.

Regards,
Pavel
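
A minimal sketch of what that looks like in code, assuming the shared NFS
mount is visible at the same path (here the hypothetical /mnt/shared) on
every node:

// file:// URIs work as long as every executor sees the same mount point.
val events = sc.textFile("file:///mnt/shared/input/events.log")
events.map(_.toUpperCase).saveAsTextFile("file:///mnt/shared/output/events-upper")

Without a shared mount, each node would only write its own part files to its
own local disk.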

On Tue, Jan 19, 2016 at 5:39 PM Jia Zou  wrote:

> Dear all,
>
> Can I configure Spark on multiple nodes without HDFS, so that output data
> will be written to the local file system on each node?
>
> I guess there is no such feature in Spark, but just want to confirm.
>
> Best Regards,
> Jia
>


Re: spark yarn client mode

2016-01-19 Thread 刘虓
Hi,
No, you don't need to.
However, when submitting jobs, certain resources will be uploaded to
HDFS, which could be a performance issue.
Read the log and you will understand:

15/12/29 11:10:06 INFO Client: Uploading resource
file:/data/spark/spark152/lib/spark-assembly-1.5.2-hadoop2.6.0.jar -> hdfs

15/12/29 11:10:08 INFO Client: Uploading resource
file:/data/spark/spark152/python/lib/pyspark.zip -> hdfs

15/12/29 11:10:08 INFO Client: Uploading resource
file:/data/spark/spark152/python/lib/py4j-0.8.2.1-src.zip -> hdfs

15/12/29 11:10:08 INFO Client: Uploading resource
file:/data/tmp/spark-86791975-2cef-4663-aacd-5da95e58cd91/__spark_conf__6261788210225867171.zip
-> hdfs
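
If the upload of the assembly jar becomes a bottleneck, one common workaround
(assuming Spark 1.5.x on YARN) is to copy the assembly to HDFS once and point
spark.yarn.jar at it, for example in spark-defaults.conf; the HDFS path below
is only an example:

# example location; adjust to wherever the assembly jar was copied on HDFS
spark.yarn.jar  hdfs:///user/spark/lib/spark-assembly-1.5.2-hadoop2.6.0.jar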

2016-01-19 19:43 GMT+08:00 Sanjeev Verma :

> Hi
>
> Do I need to install Spark on all the YARN cluster nodes if I want to
> submit the job in yarn-client mode?
> Is there any way in which I can spawn Spark job executors on the
> cluster nodes where I have not installed Spark?
>
> Thanks
> Sanjeev
>


Re: Split columns in RDD

2016-01-19 Thread Daniel Imberman
Hi Richard,

If I understand the question correctly it sounds like you could probably do
this using mapValues (I'm assuming that you want two pieces of information
out of all rows, the states as individual items, and the number of states
in the row)


val separatedInputStrings = input.mapValues { inputString =>  // input: RDD[(Int, String)], e.g. value "TX,NV,WY"
val stringList = inputString.split(",")
(stringList, stringList.size)
}

If you then wanted to find out how many state columns you should have in
your table you could use a normal reduce (with a filter beforehand to
reduce how much data you are shuffling)

val numColumns = separatedInputStrings.filter(_._2).reduce(math.max)

I hope this helps!



On Tue, Jan 19, 2016 at 8:05 AM Richard Siebeling 
wrote:

> that's true and that's the way we're doing it now but then we're only
> using the first row to determine the number of splitted columns.
> It could be that in the second (or last) row there are 10 new columns and
> we'd like to know that too.
>
> Probably a reduceby operator can be used to do that, but I'm hoping that
> there is a better or another way,
>
> thanks,
> Richard
>
> On Tue, Jan 19, 2016 at 4:22 PM, Sabarish Sasidharan <
> sabarish.sasidha...@manthan.com> wrote:
>
>> The most efficient to determine the number of columns would be to do a
>> take(1) and split in the driver.
>>
>> Regards
>> Sab
>> On 19-Jan-2016 8:48 pm, "Richard Siebeling"  wrote:
>>
>>> Hi,
>>>
>>> what is the most efficient way to split columns and know how many
>>> columns are created.
>>>
>>> Here is the current RDD
>>> -
>>> ID   STATE
>>> -
>>> 1   TX, NY, FL
>>> 2   CA, OH
>>> -
>>>
>>> This is the preferred output:
>>> -
>>> IDSTATE_1 STATE_2  STATE_3
>>> -
>>> 1 TX  NY  FL
>>> 2 CA  OH
>>> -
>>>
>>> With a separated with the new columns STATE_1, STATE_2, STATE_3
>>>
>>>
>>> It looks like the following output is feasible using a ReduceBy operator
>>> -
>>> IDSTATE_1 STATE_2  STATE_3   NEW_COLUMNS
>>> -
>>> 1 TXNY   FLSTATE_1, STATE_2,
>>> STATE_3
>>> 2 CAOH STATE_1, STATE_2
>>> -
>>>
>>> Then in the reduce step, the distinct new columns can be calculated.
>>> Is it possible to get the second output where next to the RDD the
>>> new_columns are saved somewhere?
>>> Or is the required to use the second approach?
>>>
>>> thanks in advance,
>>> Richard
>>>
>>>
>


Re: Split columns in RDD

2016-01-19 Thread Daniel Imberman
edit: Mistake in the second code example

val numColumns = separatedInputStrings.filter{ case(id, (stateList,
numStates)) => numStates}.reduce(math.max)


On Tue, Jan 19, 2016 at 8:17 AM Daniel Imberman 
wrote:

> Hi Richard,
>
> If I understand the question correctly it sounds like you could probably
> do this using mapValues (I'm assuming that you want two pieces of
> information out of all rows, the states as individual items, and the number
> of states in the row)
>
>
> val separatedInputStrings = input:RDD[(Int, String).mapValues{
> val inputsString = "TX,NV,WY"
> val stringList = inputString.split(",")
> (stringList, stringList.size)
> }
>
> If you then wanted to find out how many state columns you should have in
> your table you could use a normal reduce (with a filter beforehand to
> reduce how much data you are shuffling)
>
> val numColumns = separatedInputStrings.filter(_._2).reduce(math.max)
>
> I hope this helps!
>
>
>
> On Tue, Jan 19, 2016 at 8:05 AM Richard Siebeling 
> wrote:
>
>> that's true and that's the way we're doing it now but then we're only
>> using the first row to determine the number of splitted columns.
>> It could be that in the second (or last) row there are 10 new columns and
>> we'd like to know that too.
>>
>> Probably a reduceby operator can be used to do that, but I'm hoping that
>> there is a better or another way,
>>
>> thanks,
>> Richard
>>
>> On Tue, Jan 19, 2016 at 4:22 PM, Sabarish Sasidharan <
>> sabarish.sasidha...@manthan.com> wrote:
>>
>>> The most efficient to determine the number of columns would be to do a
>>> take(1) and split in the driver.
>>>
>>> Regards
>>> Sab
>>> On 19-Jan-2016 8:48 pm, "Richard Siebeling" 
>>> wrote:
>>>
 Hi,

 what is the most efficient way to split columns and know how many
 columns are created.

 Here is the current RDD
 -
 ID   STATE
 -
 1   TX, NY, FL
 2   CA, OH
 -

 This is the preferred output:
 -
 IDSTATE_1 STATE_2  STATE_3
 -
 1 TX  NY  FL
 2 CA  OH
 -

 With a separated with the new columns STATE_1, STATE_2, STATE_3


 It looks like the following output is feasible using a ReduceBy operator
 -
 IDSTATE_1 STATE_2  STATE_3   NEW_COLUMNS
 -
 1 TXNY   FLSTATE_1,
 STATE_2, STATE_3
 2 CAOH STATE_1, STATE_2
 -

 Then in the reduce step, the distinct new columns can be calculated.
 Is it possible to get the second output where next to the RDD the
 new_columns are saved somewhere?
 Or is the required to use the second approach?

 thanks in advance,
 Richard


>>


Re: Reuse Executor JVM across different JobContext

2016-01-19 Thread Gene Pang
Yes, you can share RDDs with Tachyon, while keeping the data in memory.
Spark jobs can write to a Tachyon path (tachyon://host:port/path/) and
other jobs can read from the same path.

Here is a presentation that includes that use case:
http://www.slideshare.net/TachyonNexus/tachyon-presentation-at-ampcamp-6-november-2015

Thanks,
Gene
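
A small sketch of that use case, with a hypothetical Tachyon master at
master:19998 and a hypothetical shared path:

// Job A persists its RDD to Tachyon memory by writing to a tachyon:// path.
ratings.saveAsTextFile("tachyon://master:19998/shared/ratings")

// Job B, possibly a different application started later, reads the same path.
val shared = sc.textFile("tachyon://master:19998/shared/ratings")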

On Sun, Jan 17, 2016 at 1:56 PM, Mark Hamstra 
wrote:

> Yes, that is one of the basic reasons to use a
> jobserver/shared-SparkContext.  Otherwise, in order share the data in an
> RDD you have to use an external storage system, such as a distributed
> filesystem or Tachyon.
>
> On Sun, Jan 17, 2016 at 1:52 PM, Jia  wrote:
>
>> Thanks, Mark. Then, I guess JobServer can fundamentally solve my problem,
>> so that jobs can be submitted at different time and still share RDDs.
>>
>> Best Regards,
>> Jia
>>
>>
>> On Jan 17, 2016, at 3:44 PM, Mark Hamstra 
>> wrote:
>>
>> There is a 1-to-1 relationship between Spark Applications and
>> SparkContexts -- fundamentally, a Spark Applications is a program that
>> creates and uses a SparkContext, and that SparkContext is destroyed when
>> then Application ends.  A jobserver generically and the Spark JobServer
>> specifically is an Application that keeps a SparkContext open for a long
>> time and allows many Jobs to be be submitted and run using that shared
>> SparkContext.
>>
>> More than one Application/SparkContext unavoidably implies more than one
>> JVM process per Worker -- Applications/SparkContexts cannot share JVM
>> processes.
>>
>> On Sun, Jan 17, 2016 at 1:15 PM, Jia  wrote:
>>
>>> Hi, Mark, sorry for the confusion.
>>>
>>> Let me clarify, when an application is submitted, the master will tell
>>> each Spark worker to spawn an executor JVM process. All the task sets  of
>>> the application will be executed by the executor. After the application
>>> runs to completion. The executor process will be killed.
>>> But I hope that all applications submitted can run in the same executor,
>>> can JobServer do that? If so, it’s really good news!
>>>
>>> Best Regards,
>>> Jia
>>>
>>> On Jan 17, 2016, at 3:09 PM, Mark Hamstra 
>>> wrote:
>>>
>>> You've still got me confused.  The SparkContext exists at the Driver,
>>> not on an Executor.
>>>
>>> Many Jobs can be run by a SparkContext -- it is a common pattern to use
>>> something like the Spark Jobserver where all Jobs are run through a shared
>>> SparkContext.
>>>
>>> On Sun, Jan 17, 2016 at 12:57 PM, Jia Zou 
>>> wrote:
>>>
 Hi, Mark, sorry, I mean SparkContext.
 I mean to change Spark into running all submitted jobs (SparkContexts)
 in one executor JVM.

 Best Regards,
 Jia

 On Sun, Jan 17, 2016 at 2:21 PM, Mark Hamstra 
 wrote:

> -dev
>
> What do you mean by JobContext?  That is a Hadoop mapreduce concept,
> not Spark.
>
> On Sun, Jan 17, 2016 at 7:29 AM, Jia Zou 
> wrote:
>
>> Dear all,
>>
>> Is there a way to reuse executor JVM across different JobContexts?
>> Thanks.
>>
>> Best Regards,
>> Jia
>>
>
>

>>>
>>>
>>
>>
>


Re: Split columns in RDD

2016-01-19 Thread Richard Siebeling
that's true and that's the way we're doing it now but then we're only using
the first row to determine the number of splitted columns.
It could be that in the second (or last) row there are 10 new columns and
we'd like to know that too.

Probably a reduceby operator can be used to do that, but I'm hoping that
there is a better or another way,

thanks,
Richard

On Tue, Jan 19, 2016 at 4:22 PM, Sabarish Sasidharan <
sabarish.sasidha...@manthan.com> wrote:

> The most efficient to determine the number of columns would be to do a
> take(1) and split in the driver.
>
> Regards
> Sab
> On 19-Jan-2016 8:48 pm, "Richard Siebeling"  wrote:
>
>> Hi,
>>
>> what is the most efficient way to split columns and know how many columns
>> are created.
>>
>> Here is the current RDD
>> -
>> ID   STATE
>> -
>> 1   TX, NY, FL
>> 2   CA, OH
>> -
>>
>> This is the preferred output:
>> -
>> IDSTATE_1 STATE_2  STATE_3
>> -
>> 1 TX  NY  FL
>> 2 CA  OH
>> -
>>
>> With a separated with the new columns STATE_1, STATE_2, STATE_3
>>
>>
>> It looks like the following output is feasible using a ReduceBy operator
>> -
>> IDSTATE_1 STATE_2  STATE_3   NEW_COLUMNS
>> -
>> 1 TXNY   FLSTATE_1, STATE_2,
>> STATE_3
>> 2 CAOH STATE_1, STATE_2
>> -
>>
>> Then in the reduce step, the distinct new columns can be calculated.
>> Is it possible to get the second output where next to the RDD the
>> new_columns are saved somewhere?
>> Or is the required to use the second approach?
>>
>> thanks in advance,
>> Richard
>>
>>


RangePartitioning

2016-01-19 Thread ddav
Hi,

I have the following pair RDD created in java. 

JavaPairRDD<Integer, ProgramDataRef> progRef =
        sc.textFile(programReferenceDataFile, 12).filter(
                (String s) -> !s.startsWith("#")).mapToPair(
                (String s) -> {
                    ProgramDataRef ref = new ProgramDataRef(s);
                    return new Tuple2<Integer, ProgramDataRef>(ref.startTime, ref);
                }
        );

I need to partition this RDD using the provided RangePartitioner in the
framework. However, I am unable to determine what the input parameters are
in order to create the partitioner. 

new RangePartitioner(12, ?, true, ?, ?);

I am specifically looking for info on the 2nd parameter - RDD<? extends Product2<K,V>> rdd - which doesn't match my pair RDD. How do I create
an RDD of this type from the RDD I have (or the input text file). 
Also, the 4th and 5th parameter - scala.math.Ordering evidence$1,
scala.reflect.ClassTag evidence$2. How do I create these classes based on
my key type of Integer. 

Finally once I have my RDD partitioned I want to call
OrderedRDDFunctions.filterByRange(...). I believe once I have the answers to
the above question I will have the input parameters in order to create an
instance of OrderedRDDFunctions.   

Thanks,
Dave.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/RangePartitioning-tp26010.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
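
For comparison, the same flow is much shorter from Scala, where the compiler
supplies the Ordering and ClassTag arguments implicitly; a sketch reusing the
types from the question, with hypothetical startTime/endTime bounds:

import org.apache.spark.RangePartitioner
import org.apache.spark.rdd.RDD

val progRef: RDD[(Int, ProgramDataRef)] =
  sc.textFile(programReferenceDataFile, 12)
    .filter(!_.startsWith("#"))
    .map { s => val ref = new ProgramDataRef(s); (ref.startTime, ref) }

// The RDD itself is the second constructor argument; it is sampled to choose range bounds.
val partitioned = progRef.partitionBy(new RangePartitioner(12, progRef))

// filterByRange comes from OrderedRDDFunctions via the standard pair-RDD implicits.
val slice = partitioned.filterByRange(startTime, endTime)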




Concurrent Spark jobs

2016-01-19 Thread emlyn
We have a Spark application that runs a number of ETL jobs, writing the
outputs to Redshift (using databricks/spark-redshift). This is triggered by
calling DataFrame.write.save on the different DataFrames one after another.
I noticed that during the Redshift load while the output of one job is being
loaded into Redshift (which can take ~20 minutes for some jobs), the cluster
is sitting idle.

In order to maximise the use of the cluster, we tried starting a thread for
each job so that they can all be submitted simultaneously, and therefore the
cluster can be utilised by another job while one is being written to
Redshift.

However, when this is run, it fails with a TimeoutException (see stack trace
below). Would it make sense to increase "spark.sql.broadcastTimeout"? I'm
not sure that would actually solve anything. Should it not be possible to
save multiple DataFrames simultaneously? Or any other hints on how to make
better use of the cluster's resources?

Thanks.
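
The 300 seconds in the trace below is the default of spark.sql.broadcastTimeout,
so one option to try, assuming a Spark 1.5.x SQLContext named sqlContext, is
simply raising it while investigating why the broadcast join takes that long:

// Give broadcast joins 20 minutes instead of the 300-second default.
sqlContext.setConf("spark.sql.broadcastTimeout", "1200")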


Stack trace:

Exception in thread "main" java.util.concurrent.ExecutionException:
java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
...
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
...
Caused by: java.util.concurrent.TimeoutException: Futures timed out after
[300 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at 
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at
org.apache.spark.sql.execution.joins.BroadcastHashOuterJoin.doExecute(BroadcastHashOuterJoin.scala:113)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at
org.apache.spark.sql.execution.Project.doExecute(basicOperators.scala:46)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
at
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
at org.apache.spark.sql.DataFrame.rdd$lzycompute(DataFrame.scala:1676)
at org.apache.spark.sql.DataFrame.rdd(DataFrame.scala:1673)
at org.apache.spark.sql.DataFrame.mapPartitions(DataFrame.scala:1465)
at
com.databricks.spark.redshift.RedshiftWriter.unloadData(RedshiftWriter.scala:264)
at
com.databricks.spark.redshift.RedshiftWriter.saveToRedshift(RedshiftWriter.scala:374)
at
com.databricks.spark.redshift.DefaultSource.createRelation(DefaultSource.scala:106)
at
org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:222)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Concurrent-Spark-jobs-tp26011.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: can we create dummy variables from categorical variables, using sparkR

2016-01-19 Thread Vinayak Agrawal
Yes, you can use the RFormula library. Please see
https://databricks.com/blog/2015/10/05/generalized-linear-models-in-sparkr-and-r-formula-support-in-mllib.html
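
The same formula mechanism is also exposed directly on the Scala side as
org.apache.spark.ml.feature.RFormula, where string columns on the right-hand
side are dummy/one-hot encoded into the features vector; a sketch assuming a
hypothetical DataFrame df with columns label, state and age:

import org.apache.spark.ml.feature.RFormula

val formula = new RFormula()
  .setFormula("label ~ state + age")   // "state" is categorical and gets dummy-encoded
  .setFeaturesCol("features")
  .setLabelCol("label")

val encoded = formula.fit(df).transform(df)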

On Tue, Jan 19, 2016 at 10:34 AM, Devesh Raj Singh 
wrote:

> Hi,
>
> Can we create dummy variables for categorical variables in sparkR like we
> do using "dummies" package in R
>
> --
> Warm regards,
> Devesh.
>



-- 
Vinayak Agrawal
Big Data Analytics
IBM

"To Strive, To Seek, To Find and Not to Yield!"
~Lord Alfred Tennyson


Re: Split columns in RDD

2016-01-19 Thread Daniel Imberman
edit 2: filter should be map

val numColumns = separatedInputStrings.map{ case(id, (stateList,
numStates)) => numStates}.reduce(math.max)
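
Putting both corrections together, a compilable version of the whole sketch,
assuming input: RDD[(Int, String)] as in the original question, would look
roughly like this:

// Split each comma-separated value and keep the count next to the pieces.
val separated = input.mapValues { s =>
  val states = s.split(",").map(_.trim)
  (states, states.length)
}

// The widest row tells us how many STATE_n columns the output needs.
val numColumns = separated
  .map { case (_, (_, numStates)) => numStates }
  .reduce((a, b) => math.max(a, b))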

On Tue, Jan 19, 2016 at 8:19 AM Daniel Imberman 
wrote:

> edit: Mistake in the second code example
>
> val numColumns = separatedInputStrings.filter{ case(id, (stateList,
> numStates)) => numStates}.reduce(math.max)
>
>
> On Tue, Jan 19, 2016 at 8:17 AM Daniel Imberman 
> wrote:
>
>> Hi Richard,
>>
>> If I understand the question correctly it sounds like you could probably
>> do this using mapValues (I'm assuming that you want two pieces of
>> information out of all rows, the states as individual items, and the number
>> of states in the row)
>>
>>
>> val separatedInputStrings = input:RDD[(Int, String).mapValues{
>> val inputsString = "TX,NV,WY"
>> val stringList = inputString.split(",")
>> (stringList, stringList.size)
>> }
>>
>> If you then wanted to find out how many state columns you should have in
>> your table you could use a normal reduce (with a filter beforehand to
>> reduce how much data you are shuffling)
>>
>> val numColumns = separatedInputStrings.filter(_._2).reduce(math.max)
>>
>> I hope this helps!
>>
>>
>>
>> On Tue, Jan 19, 2016 at 8:05 AM Richard Siebeling 
>> wrote:
>>
>>> that's true and that's the way we're doing it now but then we're only
>>> using the first row to determine the number of splitted columns.
>>> It could be that in the second (or last) row there are 10 new columns
>>> and we'd like to know that too.
>>>
>>> Probably a reduceby operator can be used to do that, but I'm hoping that
>>> there is a better or another way,
>>>
>>> thanks,
>>> Richard
>>>
>>> On Tue, Jan 19, 2016 at 4:22 PM, Sabarish Sasidharan <
>>> sabarish.sasidha...@manthan.com> wrote:
>>>
 The most efficient to determine the number of columns would be to do a
 take(1) and split in the driver.

 Regards
 Sab
 On 19-Jan-2016 8:48 pm, "Richard Siebeling" 
 wrote:

> Hi,
>
> what is the most efficient way to split columns and know how many
> columns are created.
>
> Here is the current RDD
> -
> ID   STATE
> -
> 1   TX, NY, FL
> 2   CA, OH
> -
>
> This is the preferred output:
> -
> IDSTATE_1 STATE_2  STATE_3
> -
> 1 TX  NY  FL
> 2 CA  OH
> -
>
> With a separated with the new columns STATE_1, STATE_2, STATE_3
>
>
> It looks like the following output is feasible using a ReduceBy
> operator
> -
> IDSTATE_1 STATE_2  STATE_3   NEW_COLUMNS
> -
> 1 TXNY   FLSTATE_1,
> STATE_2, STATE_3
> 2 CAOH STATE_1, STATE_2
> -
>
> Then in the reduce step, the distinct new columns can be calculated.
> Is it possible to get the second output where next to the RDD the
> new_columns are saved somewhere?
> Or is the required to use the second approach?
>
> thanks in advance,
> Richard
>
>
>>>