Building RDD for a Custom MPP Database

2015-10-05 Thread VJ
Hi
I have to build an RDD for a custom MPP database, which is shared across
several nodes. I would like to do this using Java. Can I extend JavaRDD and
override the specific methods? Also, can I override the getPreferredLocations
method as well? Are there any other alternatives where I can leverage an
existing RDD?

Any pointers appreciated 

Thanks
VJ



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Building-RDD-for-a-Custom-MPP-Database-tp24934.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Pyspark 1.5.1: Error when using findSynonyms after loading Word2VecModel

2015-10-05 Thread evg952
After loading a Word2Vec model that I have trained and saved, I get the
following error when trying to use the findSynonyms function:

synonyms = model.findSynonyms("pope", 20)
  File
"/Users/edgarvelasco/minhash/spark-1.5.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/mllib/feature.py",
line 450, in findSynonyms
ValueError: too many values to unpack

I do not understand why this is occurring, since it works perfectly fine
when I use this function directly after training a model, without saving.
I am running this locally on my Mac.

Thank you for the help.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Pyspark-1-5-1-Error-when-using-findSynonyms-after-loading-Word2VecModel-tp24933.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: RDD of ImmutableList

2015-10-05 Thread Igor Berman
Kryo doesn't support Guava's collections by default.
I remember encountering a project on GitHub that fixes this (not sure though).
I've ended up avoiding Guava collections as far as Spark RDDs are concerned.
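
A sketch of one possible workaround along those lines (not necessarily the
GitHub project mentioned above): register Guava's ImmutableList classes with
Kryo and fall back to plain Java serialization for them, since they are
java.io.Serializable, instead of Kryo's generic CollectionSerializer that
calls the unsupported add(). The class and config names below are only
illustrative.

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.serializers.JavaSerializer
import com.google.common.collect.ImmutableList
import org.apache.spark.serializer.KryoRegistrator

class GuavaRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    val javaSer = new JavaSerializer()
    // ImmutableList.of(...) returns package-private subclasses, so register the
    // concrete runtime classes as well as ImmutableList itself.
    val classes: Seq[Class[_]] = Seq(
      classOf[ImmutableList[_]],
      ImmutableList.of().getClass,
      ImmutableList.of(1).getClass,
      ImmutableList.of(1, 2, 3).getClass)
    classes.foreach(cls => kryo.register(cls, javaSer))
  }
}

// Enable it with:
//   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer
//   --conf spark.kryo.registrator=your.package.GuavaRegistrator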

On 5 October 2015 at 21:04, Jakub Dubovsky 
wrote:

> Hi all,
>
>   I would like advice on how to use ImmutableList with RDDs. A small
> demonstration of the essence of my problem in spark-shell, with the Guava jar
> added:
>
> scala> import com.google.common.collect.ImmutableList
> import com.google.common.collect.ImmutableList
>
> scala> val arr = Array(ImmutableList.of(1,2), ImmutableList.of(2,4),
> ImmutableList.of(3,6))
> arr: Array[com.google.common.collect.ImmutableList[Int]] = Array([1, 2],
> [2, 4], [3, 6])
>
> scala> val rdd = sc.parallelize(arr)
> rdd:
> org.apache.spark.rdd.RDD[com.google.common.collect.ImmutableList[Int]] =
> ParallelCollectionRDD[0] at parallelize at :24
>
> scala> rdd.count
>
>  This results in a Kryo exception saying that it cannot add a new element
> to a list instance during deserialization:
>
> java.io.IOException: java.lang.UnsupportedOperationException
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163)
> at
> org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70)
> ...
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.UnsupportedOperationException
> at
> com.google.common.collect.ImmutableCollection.add(ImmutableCollection.java:91)
> at
> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
> at
> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:18)
> ...
>
>   It somehow makes sense. But I cannot think of a workaround, and I do not
> believe that using ImmutableList with RDDs is impossible. How is this
> solved?
>
>   Thank you in advance!
>
>Jakub Dubovsky
>
>


RE: Building RDD for a Custom MPP Database

2015-10-05 Thread java8964
You want to implement a custom InputFormat for your MPP, which can provide the 
location preference information to Spark.
Yong
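
Either route can work. Since the original question asks about overriding the
location-preference hook directly, here is a minimal, hedged Scala sketch of a
custom RDD that does so (MppPartition, shardHosts and the shard-query logic
are placeholders, not a real connector; from Java you would extend RDD in the
same way rather than JavaRDD, which is only a thin wrapper):

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// One partition per MPP shard; 'host' is whichever node owns that shard.
class MppPartition(val index: Int, val host: String) extends Partition

class MppRDD(sc: SparkContext, shardHosts: Seq[String]) extends RDD[String](sc, Nil) {

  override protected def getPartitions: Array[Partition] =
    shardHosts.zipWithIndex
      .map { case (host, i) => new MppPartition(i, host): Partition }
      .toArray

  override def compute(split: Partition, context: TaskContext): Iterator[String] = {
    val p = split.asInstanceOf[MppPartition]
    // A real implementation would open a connection to the shard on p.host
    // and stream its rows; this sketch returns nothing.
    Iterator.empty
  }

  // The locality hook: Spark tries to schedule each task on the host returned here.
  override protected def getPreferredLocations(split: Partition): Seq[String] =
    Seq(split.asInstanceOf[MppPartition].host)
}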

> Date: Mon, 5 Oct 2015 10:53:27 -0700
> From: vjan...@sankia.com
> To: user@spark.apache.org
> Subject: Building RDD for a Custom MPP Database
> 
> Hi
> I have to build an RDD for a custom MPP database, which is shared across
> several nodes. I would like to do this using Java. Can I extend JavaRDD and
> override the specific methods? Also, can I override the getPreferredLocations
> method as well? Are there any other alternatives where I can leverage an
> existing RDD?
> 
> Any pointers appreciated 
> 
> Thanks
> VJ
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Building-RDD-for-a-Custom-MPP-Database-tp24934.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 
  

Re: "Method json([class java.util.HashMap]) does not exist" when reading JSON on PySpark

2015-10-05 Thread Fernando Paladini
Thank you for the replies and sorry about the delay; my e-mail client sent
this conversation to Spam (??).

I'll take a look at your tips and come back later to post my questions /
progress. Again, thank you so much!

2015-09-30 18:37 GMT-03:00 Michael Armbrust :

> I think the problem here is that you are passing in parsed JSON that is
> stored as a dictionary (which is converted to a hashmap when going into the
> JVM).  You should instead be passing in the path to the JSON file
> (formatted as Akhil suggests) so that Spark can do the parsing in
> parallel.  The other option would be to construct an RDD of JSON strings
> and pass that to the json method.
>
> On Wed, Sep 30, 2015 at 2:28 AM, Akhil Das 
> wrote:
>
>> Each Json Doc should be in a single line i guess.
>> http://spark.apache.org/docs/latest/sql-programming-guide.html#json-datasets
>>
>> Note that the file that is offered as *a json file* is not a typical
>> JSON file. Each line must contain a separate, self-contained valid JSON
>> object. As a consequence, a regular multi-line JSON file will most often
>> fail.
>>
>> Thanks
>> Best Regards
>>
>> On Tue, Sep 29, 2015 at 11:07 AM, Fernando Paladini > > wrote:
>>
>>> Hello guys,
>>>
>>> I'm very new to Spark and I'm having some troubles when reading a JSON
>>> to dataframe on PySpark.
>>>
>>> I'm getting a JSON object from an API response and I would like to store
>>> it in Spark as a DataFrame (I've read that DataFrame is better than RDD,
>>> that's accurate?). For what I've read
>>> 
>>> on documentation, I just need to call the method sqlContext.read.json in
>>> order to do what I want.
>>>
>>> *Following is the code from my test application:*
>>> json_object = json.loads(response.text)
>>> sc = SparkContext("local", appName="JSON to RDD")
>>> sqlContext = SQLContext(sc)
>>> dataframe = sqlContext.read.json(json_object)
>>> dataframe.show()
>>>
>>> *The problem is that when I run **"spark-submit myExample.py" I got the
>>> following error:*
>>> 15/09/29 01:18:54 INFO BlockManagerMasterEndpoint: Registering block
>>> manager localhost:48634 with 530.0 MB RAM, BlockManagerId(driver,
>>> localhost, 48634)
>>> 15/09/29 01:18:54 INFO BlockManagerMaster: Registered BlockManager
>>> Traceback (most recent call last):
>>>   File "/home/paladini/ufxc/lisha/learning/spark-api-kairos/test1.py",
>>> line 35, in 
>>> dataframe = sqlContext.read.json(json_object)
>>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py",
>>> line 144, in json
>>>   File
>>> "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line
>>> 538, in __call__
>>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line
>>> 36, in deco
>>>   File "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py",
>>> line 304, in get_return_value
>>> py4j.protocol.Py4JError: An error occurred while calling o21.json. Trace:
>>> py4j.Py4JException: Method json([class java.util.HashMap]) does not exist
>>> at
>>> py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
>>> at
>>> py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
>>> at py4j.Gateway.invoke(Gateway.java:252)
>>> at
>>> py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
>>> at py4j.commands.CallCommand.execute(CallCommand.java:79)
>>> at py4j.GatewayConnection.run(GatewayConnection.java:207)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>> *What I'm doing wrong? *
>>> Check out this gist
>>>  to see the JSON
>>> I'm trying to load.
>>>
>>> Thanks!
>>> Fernando Paladini
>>>
>>
>>
>


-- 
Fernando Paladini


Please help: Processes with HiveContext slower in cluster

2015-10-05 Thread Saif.A.Ellafi
Hi,

I have a HiveContext job which takes less than 1 minute to complete in local
mode with 16 cores.
However, when I launch it over a standalone cluster, it takes forever and
probably can't even finish, even when the cluster consists of the same single
node on which I run it locally.

How can I diagnose this issue? Where should I start?

Thanks!
Saif



Re: How to optimize group by query fired using hiveContext.sql?

2015-10-05 Thread Umesh Kacha
Hi, thanks. I usually see the following errors in the Spark logs, and because
of them I think the executor gets lost. All of the following happens because
of a huge data shuffle that I can't avoid; I don't know what to do, please
guide me.

15/08/16 12:26:46 WARN spark.HeartbeatReceiver: Removing executor 10
with no recent heartbeats:

1051638 ms exceeds timeout 100 ms

Or

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an
output location for shuffle 0
at 
org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:384)
at 
org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:381)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at 
org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:380)
at 
org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:176)
at 
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
at 
org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)



OR YARN kills container because of

Container [pid=26783,containerID=container_1389136889967_0009_01_02]
is running beyond physical memory limits. Current usage: 30.2 GB of 30
GB physical memory used; Killing container.
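
For that last "running beyond physical memory limits" kill specifically, the
settings people usually adjust (a hedged suggestion; the values below are
illustrative and not tuned for this job) are the off-heap headroom YARN
accounts for on top of the executor heap, and the heap size itself:

  --conf spark.yarn.executor.memoryOverhead=3072   # MB of off-heap headroom per executor
  --executor-memory 24G   # leave room so heap + overhead stays under the 30 GB container limit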


On Mon, Oct 5, 2015 at 8:00 AM, Alex Rovner 
wrote:

> Can you at least copy paste the error(s) you are seeing when the job
> fails? Without the error message(s), it's hard to even suggest anything.
>
> *Alex Rovner*
> *Director, Data Engineering *
> *o:* 646.759.0052
>
> * *
>
> On Sat, Oct 3, 2015 at 9:50 AM, Umesh Kacha  wrote:
>
>> Hi, thanks. I can't share the YARN logs because of privacy at my company,
>> but I can tell you I have looked at them and found nothing except YARN
>> killing the container because it exceeds physical memory capacity.
>>
>> I am using the following command-line script. The job launches around
>> 1500 ExecutorService threads from a driver with a thread pool of 15, so
>> 15 jobs run at a time, as shown in the UI.
>>
>> ./spark-submit --class com.xyz.abc.MySparkJob
>>   --conf "spark.executor.extraJavaOptions=-XX:MaxPermSize=512M"
>>   --driver-java-options -XX:MaxPermSize=512m
>>   --driver-memory 4g --master yarn-client
>>   --executor-memory 27G --executor-cores 2
>>   --num-executors 40
>>   --jars /path/to/others-jars
>>   /path/to/spark-job.jar
>>
>>
>> On Sat, Oct 3, 2015 at 7:11 PM, Alex Rovner 
>> wrote:
>>
>>> Can you send over your yarn logs along with the command you are using to
>>> submit your job?
>>>
>>> *Alex Rovner*
>>> *Director, Data Engineering *
>>> *o:* 646.759.0052
>>>
>>> * *
>>>
>>> On Sat, Oct 3, 2015 at 9:07 AM, Umesh Kacha 
>>> wrote:
>>>
 Hi Alex thanks much for the reply. Please read the following for more
 details about my problem.


 http://stackoverflow.com/questions/32317285/spark-executor-oom-issue-on-yarn

 Each of my containers has 8 cores and 30 GB max memory. So I 

Re: GraphX: How can I tell if 2 nodes are connected?

2015-10-05 Thread Dino Fancellu
Ah thanks, got it working with that.

e.g.

val (_,smap)=shortest.vertices.filter(_._1==src).first
smap.contains(dest)
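
For readers following the thread, `shortest` above is presumably built along
these lines (a hedged sketch; graph, src and dest are assumed to be defined
elsewhere, with dest passed as the only landmark):

import org.apache.spark.graphx._
import org.apache.spark.graphx.lib.ShortestPaths

// graph: Graph[Int, Int] (for example); src, dest: VertexId
val shortest = ShortestPaths.run(graph, Seq(dest))
val (_, smap) = shortest.vertices.filter(_._1 == src).first()
val connected = smap.contains(dest)      // true if any path exists
val hops = smap.getOrElse(dest, -1)      // path length in hops, or -1 if unreachable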

Is there anything a little less eager?

i.e. something that doesn't compute all the distances from all source nodes,
where I can supply the source vertex id and the dest vertex id and just get an
int back.

Thanks 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-How-can-I-tell-if-2-nodes-are-connected-tp24926p24935.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Exception: "You must build Spark with Hive. Export 'SPARK_HIVE=true' and run build/sbt assembly"

2015-10-05 Thread Ted Yu
In the tar ball, do you see any class(es) from spark-hive module ?

From the error message, I don't think so.

Cheers

On Mon, Oct 5, 2015 at 11:16 AM, Ahmed Cheriat 
wrote:

> Thanks Ted for your reply.
> Well, it's a standalone Spark version, "spark-1.5.0-bin-hadoop2.6" (Windows
> 7).
> To launch Spark I use the command prompt (DOS):
> bin\pyspark --jars "my_path_to_mysql_jdbc.jar"
>
> This command starts a PySpark notebook without errors.
>
>
> 2015-10-05 18:29 GMT+02:00 Ted Yu :
>
>> What command did you use to build Spark 1.5.0 ?
>>
>> bq. Export 'SPARK_HIVE=true' and run build/sbt assembly
>>
>> Please follow the above.
>>
>> BTW 1.5.1 has been released which is more stable.
>>
>> Please use 1.5.1
>>
>> Cheers
>>
>> On Mon, Oct 5, 2015 at 9:25 AM, cherah30  wrote:
>>
>>> I work with Spark 1.5 on Windows 7, with Anaconda and PySpark. Everything
>>> worked fine until I wanted to test the connection to my MySQL database.
>>> So I started looking at
>>>
>>> https://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases
>>> <
>>> https://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases
>>> >
>>> .
>>> Everything is set (jdbc, ... Etc).
>>>
>>> To start playing with it, I just wanted to connect to my MySQL database to
>>> retrieve data from a table.
>>>
>>> Here is my code
>>>
>>> from pyspark.sql import HiveContext
>>> df_mysql = sqlHiveContext.read.format("jdbc").options(url =
>>> "jdbc:mysql://localhost:3306/my_bdd_name", driver =
>>> "com.mysql.jdbc.Driver",
>>> dbtable="bdd_My_table_nameXX",  user ="my_id", password="my_pw").load()
>>>
>>> And here is the exception message :
>>> Exception: ("You must build Spark with Hive. Export 'SPARK_HIVE=true' and
>>> run build/sbt assembly", Py4JJavaError(u'An error occurred while calling
>>> None.org.apache.spark.sql.hive.HiveContext.\n', JavaObject id=o28)).
>>>
>>> You get an idea of what to do?
>>>
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Exception-You-must-build-Spark-with-Hive-Export-SPARK-HIVE-true-and-run-build-sbt-assembly-tp24928.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>
>
> --
> Ahmed Cheriat
>


Re: "Method json([class java.util.HashMap]) does not exist" when reading JSON on PySpark

2015-10-05 Thread Fernando Paladini
Update:

I've updated my code and now I have the following JSON:
https://gist.github.com/paladini/27bb5636d91dec79bd56
In the same link you can check the output from "spark-submit
myPythonScript.py", where I call "myDataframe.show()". The following is
printed by Spark (among other useless debug information):


[inline screenshot of the show() output omitted from the archive]
Is that correct for the given JSON input (gist link above)?
How can I test whether Spark understands this DataFrame and can perform
complex manipulations on it?

Thank you! Hope you can help me soon :3
Fernando Paladini.

2015-10-05 15:23 GMT-03:00 Fernando Paladini :

> Thank you for the replies and sorry about the delay, my e-mail client send
> this conversation to Spam (??).
>
> I'll take a look in your tips and come back later to post my questions /
> progress. Again, thank you so much!
>
> 2015-09-30 18:37 GMT-03:00 Michael Armbrust :
>
>> I think the problem here is that you are passing in parsed JSON that is
>> stored as a dictionary (which is converted to a hashmap when going into the
>> JVM).  You should instead be passing in the path to the JSON file
>> (formatted as Akhil suggests) so that Spark can do the parsing in
>> parallel.  The other option would be to construct an RDD of JSON strings
>> and pass that to the json method.
>>
>> On Wed, Sep 30, 2015 at 2:28 AM, Akhil Das 
>> wrote:
>>
>>> Each Json Doc should be in a single line i guess.
>>> http://spark.apache.org/docs/latest/sql-programming-guide.html#json-datasets
>>>
>>> Note that the file that is offered as *a json file* is not a typical
>>> JSON file. Each line must contain a separate, self-contained valid JSON
>>> object. As a consequence, a regular multi-line JSON file will most often
>>> fail.
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Tue, Sep 29, 2015 at 11:07 AM, Fernando Paladini <
>>> fnpalad...@gmail.com> wrote:
>>>
 Hello guys,

 I'm very new to Spark and I'm having some troubles when reading a JSON
 to dataframe on PySpark.

 I'm getting a JSON object from an API response and I would like to
 store it in Spark as a DataFrame (I've read that DataFrame is better than
 RDD, that's accurate?). For what I've read
 
 on documentation, I just need to call the method sqlContext.read.json in
 order to do what I want.

 *Following is the code from my test application:*
 json_object = json.loads(response.text)
 sc = SparkContext("local", appName="JSON to RDD")
 sqlContext = SQLContext(sc)
 dataframe = sqlContext.read.json(json_object)
 dataframe.show()

 *The problem is that when I run **"spark-submit myExample.py" I got
 the following error:*
 15/09/29 01:18:54 INFO BlockManagerMasterEndpoint: Registering block
 manager localhost:48634 with 530.0 MB RAM, BlockManagerId(driver,
 localhost, 48634)
 15/09/29 01:18:54 INFO BlockManagerMaster: Registered BlockManager
 Traceback (most recent call last):
   File "/home/paladini/ufxc/lisha/learning/spark-api-kairos/test1.py",
 line 35, in 
 dataframe = sqlContext.read.json(json_object)
   File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py",
 line 144, in json
   File
 "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line
 538, in __call__
   File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line
 36, in deco
   File "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py",
 line 304, in get_return_value
 py4j.protocol.Py4JError: An error occurred while calling o21.json.
 Trace:
 py4j.Py4JException: Method json([class java.util.HashMap]) does not
 exist
 at
 py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
 at
 py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
 at py4j.Gateway.invoke(Gateway.java:252)
 at
 py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
 at py4j.commands.CallCommand.execute(CallCommand.java:79)
 at py4j.GatewayConnection.run(GatewayConnection.java:207)
 at java.lang.Thread.run(Thread.java:745)

 *What I'm doing wrong? *
 Check out this gist
  to see the
 JSON I'm trying to load.

 Thanks!
 Fernando Paladini

>>>
>>>
>>
>
>
> --
> Fernando Paladini
>



-- 
Fernando Paladini


Re: GraphX: How can I tell if 2 nodes are connected?

2015-10-05 Thread Anwar Rizal
Maybe connected components are what you need?
On Oct 5, 2015 19:02, "Robineast"  wrote:

> GraphX has a Shortest Paths algorithm implementation which will tell you,
> for all vertices in the graph, the shortest distance to a specific
> ('landmark') vertex. The returned value is 'a graph where each vertex
> attribute is a map containing the shortest-path distance to each reachable
> landmark vertex'. If there is no path to the landmark vertex then the map
> for the source vertex is empty.
>
>
>
> -
> Robin East
> Spark GraphX in Action Michael Malak and Robin East
> Manning Publications Co.
> http://www.manning.com/books/spark-graphx-in-action
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-How-can-I-tell-if-2-nodes-are-connected-tp24926p24930.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


RDD of ImmutableList

2015-10-05 Thread Jakub Dubovsky
Hi all,

  I would like advice on how to use ImmutableList with RDDs. A small
demonstration of the essence of my problem in spark-shell, with the Guava jar
added:

scala> import com.google.common.collect.ImmutableList
import com.google.common.collect.ImmutableList

scala> val arr = Array(ImmutableList.of(1,2), ImmutableList.of(2,4),
ImmutableList.of(3,6))
arr: Array[com.google.common.collect.ImmutableList[Int]] = Array([1, 2], [2,
4], [3, 6])

scala> val rdd = sc.parallelize(arr)
rdd: org.apache.spark.rdd.RDD[com.google.common.collect.ImmutableList[Int]]
= ParallelCollectionRDD[0] at parallelize at :24

scala> rdd.count

  This results in a Kryo exception saying that it cannot add a new element to
a list instance during deserialization:

java.io.IOException: java.lang.UnsupportedOperationException
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163)
        at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70)
        ...
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.UnsupportedOperationException
        at com.google.common.collect.ImmutableCollection.add(ImmutableCollection.java:91)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:18)
        ...

  It somehow makes sense. But I cannot think of a workaround, and I do not
believe that using ImmutableList with RDDs is impossible. How is this solved?

  Thank you in advance!

   Jakub Dubovsky






Re: RDD of ImmutableList

2015-10-05 Thread Jakub Dubovsky

Thank you for the quick reaction.

I have to say this is very surprising to me. I have never received advice to
stop using an immutable approach. The whole RDD is designed to be immutable
(which is sort of sabotaged by not being able to (de)serialize immutable
classes properly). I will ask on the dev list whether this is to be changed.

OK, I have let go of my initial feelings and now let's be pragmatic. And this
is still for everyone, not just Igor:

I use a class from a library which is immutable. Now I want to use this class
to represent my data in an RDD because this saves me a huge amount of work.
The class uses ImmutableList as one of its fields. That's why it fails. But
isn't there a way to work around this? I ask because I have exactly zero
knowledge about Kryo and the way it works. So, for example, would either of
these two work?

1) Change the external class so that it implements the writeObject and
readObject methods (it's Java). Will these methods be used by Kryo? (I can ask
the maintainers of the library to change the class if the change is
reasonable. Adding these methods would be; dropping immutability certainly
wouldn't.)

2) Wrap the class in a Scala class which would translate the data during
(de)serialization?

  Thanks!

  Jakub Dubovsky


-- Original message --
From: Igor Berman 
To: Jakub Dubovsky 
Date: 5. 10. 2015 20:11:35
Subject: Re: RDD of ImmutableList

"

Kryo doesn't support Guava's collections by default.
I remember encountering a project on GitHub that fixes this (not sure though).
I've ended up avoiding Guava collections as far as Spark RDDs are concerned.

On 5 October 2015 at 21:04, Jakub Dubovsky  wrote:
"
Hi all,

  I would like advice on how to use ImmutableList with RDDs. A small
demonstration of the essence of my problem in spark-shell, with the Guava jar
added:

scala> import com.google.common.collect.ImmutableList
import com.google.common.collect.ImmutableList

scala> val arr = Array(ImmutableList.of(1,2), ImmutableList.of(2,4),
ImmutableList.of(3,6))
arr: Array[com.google.common.collect.ImmutableList[Int]] = Array([1, 2], [2,
4], [3, 6])

scala> val rdd = sc.parallelize(arr)
rdd: org.apache.spark.rdd.RDD[com.google.common.collect.ImmutableList[Int]]
= ParallelCollectionRDD[0] at parallelize at :24

scala> rdd.count

  This results in a Kryo exception saying that it cannot add a new element to
a list instance during deserialization:

java.io.IOException: java.lang.UnsupportedOperationException
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163)
        at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70)
        ...
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.UnsupportedOperationException
        at com.google.common.collect.ImmutableCollection.add(ImmutableCollection.java:91)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:18)
        ...

  It somehow makes sense. But I cannot think of a workaround, and I do not
believe that using ImmutableList with RDDs is impossible. How is this solved?

  Thank you in advance!

   Jakub Dubovsky
"



"

Re: Reading JSON in Pyspark throws scala.MatchError

2015-10-05 Thread Davies Liu
Could you create a JIRA to track this bug?

On Fri, Oct 2, 2015 at 1:42 PM, balajikvijayan
 wrote:
> Running Windows 8.1, Python 2.7.x, Scala 2.10.5, Spark 1.4.1.
>
> I'm trying to read in a large quantity of json data in a couple of files and
> I receive a scala.MatchError when I do so. Json, Python and stack trace all
> shown below.
>
> Json:
>
> {
> "dataunit": {
> "page_view": {
> "nonce": 438058072,
> "person": {
> "user_id": 5846
> },
> "page": {
> "url": "http://mysite.com/blog;
> }
> }
> },
> "pedigree": {
> "true_as_of_secs": 1438627992
> }
> }
>
> Python:
>
> import pyspark
> sc = pyspark.SparkContext()
> sqlContext = pyspark.SQLContext(sc)
> pageviews = sqlContext.read.json("[Path to folder containing file with above
> json]")
> pageviews.collect()
>
> Stack Trace:
> Py4JJavaError: An error occurred while calling
> z:org.apache.spark.api.python.PythonRDD.collectAndServe.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 1
> in stage 32.0 failed 1 times, most recent failure: Lost task 1.0 in stage
> 32.0 (TID 133, localhost): scala.MatchError:
> (VALUE_STRING,ArrayType(StructType(),true)) (of class scala.Tuple2)
> at
> org.apache.spark.sql.json.JacksonParser$.convertField(JacksonParser.scala:49)
> at
> org.apache.spark.sql.json.JacksonParser$$anonfun$parseJson$1$$anonfun$apply$1.apply(JacksonParser.scala:201)
> at
> org.apache.spark.sql.json.JacksonParser$$anonfun$parseJson$1$$anonfun$apply$1.apply(JacksonParser.scala:193)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at
> org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:116)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at
> org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:111)
> at 
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
> at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
> at 
> scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
> at
> org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.to(SerDeUtil.scala:111)
> at
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
> at
> org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toBuffer(SerDeUtil.scala:111)
> at
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
> at
> org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toArray(SerDeUtil.scala:111)
> at
> org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:885)
> at
> org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:885)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
> at org.apache.spark.scheduler.Task.run(Task.scala:70)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
> at
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
> at scala.Option.foreach(Option.scala:236)
> at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
> at

Re: GraphX: How can I tell if 2 nodes are connected?

2015-10-05 Thread Robineast
GraphX doesn't implement Tinkerpop functionality but there is an external
effort to provide an implementation. See
https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-4279



-
Robin East 
Spark GraphX in Action Michael Malak and Robin East 
Manning Publications Co. 
http://www.manning.com/books/spark-graphx-in-action

--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-How-can-I-tell-if-2-nodes-are-connected-tp24926p24941.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Spark SQL "SELECT ... LIMIT" scans the entire Hive table?

2015-10-05 Thread YaoPau
I'm using SqlCtx connected to Hive in CDH 5.4.4.  When I run "SELECT * FROM
my_db.my_tbl LIMIT 5", it scans the entire table like Hive would instead of
doing a .take(5) on it and returning results immediately.

Is there a way to get Spark SQL to use .take(5) instead of the Hive logic of
scanning the full table when running a SELECT?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-SELECT-LIMIT-scans-the-entire-Hive-table-tp24938.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: String operation in filter with a special character

2015-10-05 Thread Michael Armbrust
Double quotes (") are used to create string literals in HiveQL / Spark
SQL.  So you are asking if the string A+B equals the number 2.0.

You should use backticks (`) to escape weird characters in column names.
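
For example, a hedged illustration using the dataFrame from the quoted
question below:

dataFrame.filter("`A+B` = 2.0").show()
// or, equivalently, via the Column API (no string parsing involved):
dataFrame.filter(dataFrame("A+B") === 2.0).show()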

On Mon, Oct 5, 2015 at 12:59 AM, Hemminger Jeff  wrote:

> I have a rather odd use case. I have a DataFrame column name with a +
> value in it.
> The app performs some processing steps before determining the column name,
> and it
> would be much easier to code if I could use the DataFrame filter
> operations with a String.
>
> This demonstrates the issue I am having:
>
> dataFrame.filter(renamed("A+B").equalTo(2.0)).show()
>
> This will return all rows with the column value matching 2.0, as expected.
>
> dataFrame.filter("\"A+B\"=2.0").show()
>
> This executes but does not return the correct results. It returns an empty
> result.
>
> dataFrame.filter("\"A+C\"=2.0").show()
>
> Referencing a non-existent column name returns the same empty result.
>
> Any suggestions?
>
> Jeff
>


Re: Secondary Sorting in Spark

2015-10-05 Thread Adrian Tanase
Great article, especially the use of a custom partitioner.

Also, sorting by multiple fields by creating a tuple out of them is an awesome,
easy-to-miss Scala feature.
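
A tiny illustration of that tuple trick (the case class is made up): tuples of
orderable elements are themselves ordered, so "sort by user, then by
timestamp" is just a tuple key.

case class Click(user: String, ts: Long, url: String)
val clicks = Seq(Click("bob", 2L, "/b"), Click("ann", 9L, "/a"), Click("bob", 1L, "/c"))
val sorted = clicks.sortBy(c => (c.user, c.ts))
// The same composite key works for rdd.sortBy(...) or as the key in a secondary sort.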

Sent from my iPhone

On 04 Oct 2015, at 21:41, Bill Bejeck 
> wrote:

I've written blog post on secondary sorting in Spark and I'd thought I'd share 
it with the group

http://codingjunkie.net/spark-secondary-sort/

Thanks,
Bill


Re: Broadcast var is null

2015-10-05 Thread Adrian Tanase
FYI the same happens with accumulators when recovering from checkpoint. I'd 
love to see this fixed somehow as the workaround (using a singleton factory in 
foreachRdd to make sure the accumulators are initialized instead of null) is 
really intrusive...
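
For reference, a hedged sketch of the singleton-factory workaround being
described (the object and accumulator names are illustrative): the accumulator
is lazily (re)created from the RDD's SparkContext inside foreachRDD, so it is
available again after the StreamingContext is recovered from a checkpoint
instead of coming back as null.

import org.apache.spark.{Accumulator, SparkContext}

object DroppedEventsCounter {
  @volatile private var instance: Accumulator[Long] = _

  def getInstance(sc: SparkContext): Accumulator[Long] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.accumulator(0L, "DroppedEvents")
        }
      }
    }
    instance
  }
}

// stream.foreachRDD { rdd =>
//   val dropped = DroppedEventsCounter.getInstance(rdd.sparkContext)
//   ...
// }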

Sent from my iPhone

On 05 Oct 2015, at 22:52, Tathagata Das 
> wrote:

Make sure the broadcast variable works independently of the streaming
application. Then make sure it works without StreamingContext.getOrCreate().
That will disambiguate whether the error is thrown when starting a new
context or when recovering a context from a checkpoint (as getOrCreate is
supposed to do).

On Mon, Oct 5, 2015 at 9:23 AM, dpristin 
> wrote:
Hi,

Can anyone point out what I'm doing wrong? I've implemented a very basic
Spark Streaming app that uses a single broadcast variable. When it runs
locally it produces the proper output (the array I broadcast), but when
deployed on the cluster I get "broadcastVar is null". We use v1.4.1. Here
is the code:

--- imports go here

object BroadcastTest extends App {
  val logger = LoggerFactory.getLogger("OinkSparkMain")
  logger.info("OinkSparkMain - Setup Logger")

// This is our custom context setup code; nothing fancy goes on here
  val config = Configuration(args)
  val ssc: StreamingContext =
StreamingContext.getOrCreate(config.checkpointDirectory, () => {
SparkStreamingContextFactory.Create(config, timeWindow = Seconds(10))})


  val kafkaStreamFactory = new KafkaStreamFactory(config, ssc)
  val messages = kafkaStreamFactory.Create

  // Grab the value data above kafka input dstream as a string
  val events = messages.map( s => s._2 )

  //Create a broadcast variable - straight from the dev guide
  val broadcastVar = ssc.sparkContext.broadcast(Array(1, 2, 3))

  //Try to print out the value of the broadcast var here
  val transformed = events.transform(rdd => {
rdd.map(x => {
  if(broadcastVar == null) {
println("broadcastVar is null")
  }  else {
println("broadcastVar value: " + broadcastVar.value.mkString("|"))
  }
  x
})
  })

  transformed.foreachRDD(x => logger.info("Data: " +
x.collect.mkString("|")))

  ssc.start()
  ssc.awaitTermination()
}

Any input is very much appreciated!

Regards,
Dmitry.







--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Broadcast-var-is-null-tp24927.html
Sent from the Apache Spark User List mailing list archive at 
Nabble.com.





Re: Spark context on thrift server

2015-10-05 Thread Michael Armbrust
Isolation for different sessions will hopefully be fixed by
https://github.com/apache/spark/pull/8909

On Mon, Oct 5, 2015 at 8:38 AM, Younes Naguib <
younes.nag...@tritondigital.com> wrote:

> Hi,
>
>
>
> We’re using a spark thrift server and we connect using jdbc to run queries.
>
> Every time we run a set query, like “set schema”, it seems to affect the
> server, and not the session only.
>
>
>
> Is that an expected behavior? Or am I missing something.
>
>
>
>
>
> *Younes Naguib*
>
> Triton Digital | 1440 Ste-Catherine W., Suite 1200 | Montreal, QC  H3G 1R8
>
> Tel.: +1 514 448 4037 x2688 | Tel.: +1 866 448 4037 x2688 | younes.naguib
> @tritondigital.com 
>
>
>


Re: "Method json([class java.util.HashMap]) does not exist" when reading JSON on PySpark

2015-10-05 Thread Michael Armbrust
Looks correct to me.  Try for example:

from pyspark.sql.functions import *
df.withColumn("value", explode(df['values'])).show()

On Mon, Oct 5, 2015 at 2:15 PM, Fernando Paladini 
wrote:

> Update:
>
> I've updated my code and now I have the following JSON:
> https://gist.github.com/paladini/27bb5636d91dec79bd56
> In the same link you can check the output from "spark-submit
> myPythonScript.py", where I call "myDataframe.show()". The following is
> printed by Spark (among other useless debug information):
>
>
> [inline screenshot of the show() output omitted from the archive]
> That's correct for the given JSON input
>  (gist link
> above)? How can I test if Spark can understand this DataFrame and make
> complex manipulations with that?
>
> Thank you! Hope you can help me soon :3
> Fernando Paladini.
>
> 2015-10-05 15:23 GMT-03:00 Fernando Paladini :
>
>> Thank you for the replies and sorry about the delay, my e-mail client
>> send this conversation to Spam (??).
>>
>> I'll take a look in your tips and come back later to post my questions /
>> progress. Again, thank you so much!
>>
>> 2015-09-30 18:37 GMT-03:00 Michael Armbrust :
>>
>>> I think the problem here is that you are passing in parsed JSON that is
>>> stored as a dictionary (which is converted to a hashmap when going into the
>>> JVM).  You should instead be passing in the path to the JSON file
>>> (formatted as Akhil suggests) so that Spark can do the parsing in
>>> parallel.  The other option would be to construct an RDD of JSON strings
>>> and pass that to the json method.
>>>
>>> On Wed, Sep 30, 2015 at 2:28 AM, Akhil Das 
>>> wrote:
>>>
 Each Json Doc should be in a single line i guess.
 http://spark.apache.org/docs/latest/sql-programming-guide.html#json-datasets

 Note that the file that is offered as *a json file* is not a typical
 JSON file. Each line must contain a separate, self-contained valid JSON
 object. As a consequence, a regular multi-line JSON file will most often
 fail.

 Thanks
 Best Regards

 On Tue, Sep 29, 2015 at 11:07 AM, Fernando Paladini <
 fnpalad...@gmail.com> wrote:

> Hello guys,
>
> I'm very new to Spark and I'm having some troubles when reading a JSON
> to dataframe on PySpark.
>
> I'm getting a JSON object from an API response and I would like to
> store it in Spark as a DataFrame (I've read that DataFrame is better than
> RDD, that's accurate?). For what I've read
> 
> on documentation, I just need to call the method sqlContext.read.json in
> order to do what I want.
>
> *Following is the code from my test application:*
> json_object = json.loads(response.text)
> sc = SparkContext("local", appName="JSON to RDD")
> sqlContext = SQLContext(sc)
> dataframe = sqlContext.read.json(json_object)
> dataframe.show()
>
> *The problem is that when I run **"spark-submit myExample.py" I got
> the following error:*
> 15/09/29 01:18:54 INFO BlockManagerMasterEndpoint: Registering block
> manager localhost:48634 with 530.0 MB RAM, BlockManagerId(driver,
> localhost, 48634)
> 15/09/29 01:18:54 INFO BlockManagerMaster: Registered BlockManager
> Traceback (most recent call last):
>   File "/home/paladini/ufxc/lisha/learning/spark-api-kairos/test1.py",
> line 35, in 
> dataframe = sqlContext.read.json(json_object)
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py",
> line 144, in json
>   File
> "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line
> 538, in __call__
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line
> 36, in deco
>   File "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py",
> line 304, in get_return_value
> py4j.protocol.Py4JError: An error occurred while calling o21.json.
> Trace:
> py4j.Py4JException: Method json([class java.util.HashMap]) does not
> exist
> at
> py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
> at
> py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
> at py4j.Gateway.invoke(Gateway.java:252)
> at
> py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
> at py4j.commands.CallCommand.execute(CallCommand.java:79)
> at py4j.GatewayConnection.run(GatewayConnection.java:207)
> at java.lang.Thread.run(Thread.java:745)
>
> *What I'm doing wrong? *
> Check out this gist
>  to see the
> JSON I'm trying to load.
>
> Thanks!
> Fernando Paladini
>


>>>
>>
>>
>> --
>> Fernando Paladini
>>
>
>
>
> --
> Fernando Paladini
>


Re: Broadcast var is null

2015-10-05 Thread Tathagata Das
Make sure the broadcast variable works independently of the streaming
application. Then make sure it works without StreamingContext.getOrCreate().
That will disambiguate whether the error is thrown when starting a new
context or when recovering a context from a checkpoint (as getOrCreate is
supposed to do).

On Mon, Oct 5, 2015 at 9:23 AM, dpristin  wrote:

> Hi,
>
> Can anyone point out what I'm doing wrong? I've implemented a very basic
> Spark Streaming app that uses a single broadcast variable. When it runs
> locally it produces the proper output (the array I broadcast), but when
> deployed on the cluster I get "broadcastVar is null". We use v1.4.1. Here
> is the code:
>
> --- imports go here
>
> object BroadcastTest extends App {
>   val logger = LoggerFactory.getLogger("OinkSparkMain")
>   logger.info("OinkSparkMain - Setup Logger")
>
> // This is our custom context setup code; nothing fancy goes on here
>   val config = Configuration(args)
>   val ssc: StreamingContext =
> StreamingContext.getOrCreate(config.checkpointDirectory, () => {
> SparkStreamingContextFactory.Create(config, timeWindow = Seconds(10))})
>
>
>   val kafkaStreamFactory = new KafkaStreamFactory(config, ssc)
>   val messages = kafkaStreamFactory.Create
>
>   // Grab the value data above kafka input dstream as a string
>   val events = messages.map( s => s._2 )
>
>   //Create a broadcast variable - straight from the dev guide
>   val broadcastVar = ssc.sparkContext.broadcast(Array(1, 2, 3))
>
>   //Try to print out the value of the broadcast var here
>   val transformed = events.transform(rdd => {
> rdd.map(x => {
>   if(broadcastVar == null) {
> println("broadcastVar is null")
>   }  else {
> println("broadcastVar value: " + broadcastVar.value.mkString("|"))
>   }
>   x
> })
>   })
>
>   transformed.foreachRDD(x => logger.info("Data: " +
> x.collect.mkString("|")))
>
>   ssc.start()
>   ssc.awaitTermination()
> }
>
> Any input is very much appreciated!
>
> Regards,
> Dmitry.
>
>
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Broadcast-var-is-null-tp24927.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Usage of transform for code reuse between Streaming and Batch job affects the performance ?

2015-10-05 Thread Adrian Tanase
It shouldn't, as lots of the streaming operations delegate to transform under 
the hood. Easiest way to make sure is to look at the source code - with a 
decent IDE navigating around should be a breeze.

As a matter of fact, for more advanced operations where you may want to control 
the partitioning (e.g. unioning 2 DStreams or a simple flatMap) you will be 
forced to use transform as the DStreams hide away some of the control.
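
As a small sketch of the reuse pattern from the quoted question below (the
types are made up for illustration), the shared logic is written once against
RDDs and the streaming job simply applies it per micro-batch via transform:

import org.apache.spark.rdd.RDD

def getGroupedAndSortedSessions(
    rdd: RDD[(String, (Long, String))]): RDD[(String, List[(Long, String)])] =
  rdd.groupByKey().mapValues(iter => iter.toList.sortBy(_._1))

// batch job:      val out = getGroupedAndSortedSessions(sessionsRdd)
// streaming job:  val out = sessions.transform(rdd => getGroupedAndSortedSessions(rdd))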

-adrian

Sent from my iPhone

> On 05 Oct 2015, at 03:59, swetha  wrote:
> 
> Hi,
> 
> I have the following code for code reuse between the batch and the streaming
> job
> 
> *  val groupedAndSortedSessions =
> sessions.transform(rdd=>JobCommon.getGroupedAndSortedSessions(rdd))*
> 
> The same code without code reuse between the batch and the streaming has the
> following. 
> 
> * val groupedSessions = sessions.groupByKey();
> 
>val sortedSessions  = groupedSessions.mapValues[(List[(Long,
> String)])](iter => iter.toList.sortBy(_._1))
> *
> 
> Does use of transform for code reuse affect groupByKey performance?
> 
> 
> Thanks,
> Swetha
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Usage-of-transform-for-code-reuse-between-Streaming-and-Batch-job-affects-the-performance-tp24920.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 




OutOfMemoryError

2015-10-05 Thread Ramkumar V
Hi,

When I submit a Java Spark job in cluster mode, I'm getting the following
exception.

*LOG TRACE :*

INFO yarn.ExecutorRunnable: Setting up executor with commands:
List({{JAVA_HOME}}/bin/java, -server, -XX:OnOutOfMemoryError='kill
 %p', -Xms1024m, -Xmx1024m, -Djava.io.tmpdir={{PWD}}/tmp,
'-Dspark.ui.port=0', '-Dspark.driver.port=48309',
-Dspark.yarn.app.container.log.dir=, org.apache.spark.executor.CoarseGrainedExecutorBackend,
--driver-url, akka.tcp://sparkDriver@ip:port/user/CoarseGrainedScheduler,
 --executor-id, 2, --hostname, hostname , --cores, 1, --app-id,
application_1441965028669_9009, --user-class-path, file:$PWD
/__app__.jar, --user-class-path, file:$PWD/json-20090211.jar, 1>,
/stdout, 2>, /stderr).

I have a cluster of 11 machines (9 with 64 GB memory and 2 with 32 GB memory).
My input data is 128 GB in size.

How do I solve this exception? Does it depend on the driver.memory and
executor.memory settings?


*Thanks*,



looking for HDP users

2015-10-05 Thread Tamas Szuromi
Hello,

I'm looking for someone who is using Hortonworks Data Platform, especially
2.3, and also using Spark 1.5.x.

I have the following issue with HDP and I wanted to know whether it is a
general bug with HDP or just a local issue.

https://issues.apache.org/jira/browse/SPARK-10896

Thanks in advance!


*Tamas*


K-Means seems biased to one center

2015-10-05 Thread Justin Pihony
(Cross post with
http://stackoverflow.com/questions/32936380/k-means-clustering-is-biased-to-one-center)


I have a corpus of wiki pages (baseball, hockey, music, football) which I'm
running through TF-IDF and then through k-means. After a couple of issues to
start (you can see my previous questions), I'm finally getting a
KMeansModel... but when I try to predict, I keep getting the same center. Is
this because of the small dataset, or because I'm comparing multi-word
documents against a much shorter query (1-20 words)? Or is there something
else I'm doing wrong? See the code below:

//Preprocessing of data includes splitting into words
//and removing words with only 1 or 2 characters
val corpus: RDD[Seq[String]]
val hashingTF = new HashingTF(10)
val tf = hashingTF.transform(corpus)
val idf = new IDF().fit(tf)
val tfidf = idf.transform(tf).cache
val kMeansModel = KMeans.train(tfidf, 3, 10)

val queryTf = hashingTF.transform(List("music"))
val queryTfidf = idf.transform(queryTf)
kMeansModel.predict(queryTfidf) //Always the same, no matter the term supplied


String operation in filter with a special character

2015-10-05 Thread Hemminger Jeff
I have a rather odd use case: a DataFrame column name with a '+' character
in it.
The app performs some processing steps before determining the column name,
and it would be much easier to code if I could use the DataFrame filter
operations with a String.

This demonstrates the issue I am having:

dataFrame.filter(renamed("A+B").equalTo(2.0)).show()

This will return all rows with the column value matching 2.0, as expected.

dataFrame.filter("\"A+B\"=2.0").show()

This executes but does not return the correct results. It returns an empty
result.

dataFrame.filter("\"A+C\"=2.0").show()

Referencing a non-existent column name returns the same empty result.

Any suggestions?

Jeff


Re: performance difference between Thrift server and SparkSQL?

2015-10-05 Thread Jeff Thompson
Thanks for the suggestion.  The output from EXPLAIN is indeed equivalent in
both sparkSQL and via the Thrift server.  I did some more testing.  The
source of the performance difference is in the way I was triggering the
sparkSQL query.  I was using .count() instead of .collect().  When I use
.collect() I get the same performance as the Thrift server.  My table has
28 columns.  I guess that .count() only required one column to be loaded
into memory, whereas .collect() required all columns to be loaded?
Curiously, it doesn't appear to matter how many rows are returned.  The
speed is the same even if I adjust the query to return 0 rows.  Anyway,
looks like it was a poor comparison on my part.  No real performance
difference between Thrift and SparkSQL.  Thanks for the help.

-Jeff

On Sat, Oct 3, 2015 at 1:26 PM, Michael Armbrust 
wrote:

> Underneath the covers, the thrift server is just calling
> 
> hiveContext.sql(...) so this is surprising.  Maybe running EXPLAIN or
> EXPLAIN EXTENDED in both modes would be helpful in debugging?
>
>
>
> On Sat, Oct 3, 2015 at 1:08 PM, Jeff Thompson <
> jeffreykeatingthomp...@gmail.com> wrote:
>
>> Hi,
>>
>> I'm running a simple SQL query over a ~700 million row table of the form:
>>
>> SELECT * FROM my_table WHERE id = '12345';
>>
>> When I submit the query via beeline & the JDBC thrift server it returns
>> in 35s
>> When I submit the exact same query using sparkSQL from a pyspark shell
>> (sqlContex.sql("SELECT * FROM ")) it returns in 3s.
>>
>> Both times are obtained from the spark web UI.  The query only returns 43
>> rows, a small amount of data.
>>
>> The table was created by saving a sparkSQL dataframe as a parquet file
>> and then calling createExternalTable.
>>
>> I have tried to ensure that all relevant cluster parameters are
>> equivalent across the two queries:
>> spark.executor.memory = 6g
>> spark.executor.instances = 100
>> no explicit caching (storage tab in web UI is empty)
>> spark version: 1.4.1
>> Hadoop v2.5.0-cdh5.3.0, running spark on top of YARN
>> jobs run on the same physical cluster (on-site harware)
>>
>> From the web UIs, I can see that the query plans are clearly different,
>> and I think this may be the source of the performance difference.
>>
>> Thrift server job:
>> 1 stage only, stage 1 (35s) map -> Filter -> mapPartitions
>>
>> SparkSQL job:
>> 2 stages, stage 1 (2s): map -> filter -> Project -> Aggregate ->
>> Exchange, stage 2 (0.4s): Exchange -> Aggregate -> mapPartitions
>>
>> Is this a know issue?  Is there anything I can do to get the Thrift
>> server to use the same query optimizer as the one used by sparkSQL?  I'd
>> love to pick up a ~10x performance gain for my jobs submitted via the
>> Thrift server.
>>
>> Best regards,
>>
>> Jeff
>>
>
>


Re: OutOfMemoryError

2015-10-05 Thread Jean-Baptiste Onofré

Hi Ramkumar,

Did you try to increase the Xmx of the workers?

Regards
JB
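
For reference: the -Xms1024m/-Xmx1024m in the executor launch command from the
quoted log is the 1g default executor heap, which on YARN is set through
Spark's own options rather than a raw Xmx flag. A hedged example of raising
the relevant knobs (values are illustrative, not tuned for a 128 GB input):

./spark-submit --class your.main.Class --master yarn-cluster \
  --driver-memory 4g \
  --executor-memory 8g \
  --num-executors 20 \
  --conf spark.yarn.executor.memoryOverhead=1024 \
  your-job.jar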

On 10/05/2015 08:56 AM, Ramkumar V wrote:

Hi,

When i submit java spark job in cluster mode, i'm getting following
exception.

*LOG TRACE :*

INFO yarn.ExecutorRunnable: Setting up executor with commands:
List({{JAVA_HOME}}/bin/java, -server, -XX:OnOutOfMemoryError='kill
  %p', -Xms1024m, -Xmx1024m, -Djava.io.tmpdir={{PWD}}/tmp,
'-Dspark.ui.port=0', '-Dspark.driver.port=48309',
-Dspark.yarn.app.container.log.dir=, org.apache.spark.executor.CoarseGrainedExecutorBackend,
--driver-url, akka.tcp://sparkDriver@ip:port/user/CoarseGrainedScheduler,
  --executor-id, 2, --hostname, hostname , --cores, 1, --app-id,
application_1441965028669_9009, --user-class-path, file:$PWD
/__app__.jar, --user-class-path, file:$PWD/json-20090211.jar, 1>,
/stdout, 2>, /stderr).

I have a cluster of 11 machines (9 - 64 GB memory and 2 - 32 GB memory
). my input data of size 128 GB.

How to solve this exception ? is it depends on driver.memory and
execuitor.memory setting ?


*Thanks*,




--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com




Graphx hangs and crashes on EdgeRDD creation

2015-10-05 Thread William Saar
Hi,
I am trying to run a GraphX job on 20 million edges with Spark 1.5.1, but the 
job seems to hang for 30 minutes on a single executor when creating the graph 
and eventually crashes with "IllegalArgumentException: Size exceeds 
Integer.MAX_VALUE"

I suspect this is because of a partitioning problem, but how can I control
the partitioning when the EdgeRDD is created?

My graph code only does the following:
val graph = Graph.fromEdgeTuples(indexedEdges, 0, None, 
StorageLevel.MEMORY_AND_DISK_SER, StorageLevel.MEMORY_AND_DISK_SER);
graph.connectedComponents().vertices
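
A hedged variant of the snippet above: repartitioning the edge tuples before
building the graph produces many smaller edge partitions, which helps keep
each cached or shuffled block under the 2 GB limit that shows up as "Size
exceeds Integer.MAX_VALUE" (the partition count of 400 is purely illustrative):

val graph = Graph.fromEdgeTuples(
  indexedEdges.repartition(400),
  defaultValue = 0,
  uniqueEdges = None,
  edgeStorageLevel = StorageLevel.MEMORY_AND_DISK_SER,
  vertexStorageLevel = StorageLevel.MEMORY_AND_DISK_SER)
graph.connectedComponents().vertices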

The web UI shows the following while the job is hanging (I am running this 
inside a transform operation on spark streaming)
transform at 
MyJob.scala:62+details
RDD: EdgeRDD, 
EdgeRDD

org.apache.spark.streaming.dstream.DStream.transform(DStream.scala:649)

com.example.MyJob$.main(MyJob.scala:62)

com.example.MyJob.main(MyJob.scala)

sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

The executor thread dump while the job is hanging is the following
Thread 66: Executor task launch worker-1 (RUNNABLE)
java.lang.System.identityHashCode(Native Method)
com.esotericsoftware.kryo.util.IdentityObjectIntMap.get(IdentityObjectIntMap.java:241)
com.esotericsoftware.kryo.util.MapReferenceResolver.getWrittenId(MapReferenceResolver.java:28)
com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:588)
com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:566)
org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:158)
org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:153)
org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1190)
org.apache.spark.storage.DiskStore$$anonfun$putIterator$1.apply$mcV$sp(DiskStore.scala:81)
org.apache.spark.storage.DiskStore$$anonfun$putIterator$1.apply(DiskStore.scala:81)
org.apache.spark.storage.DiskStore$$anonfun$putIterator$1.apply(DiskStore.scala:81)
org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1206)
org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:82)
org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:791)
org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638)
org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:153)
org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
org.apache.spark.scheduler.Task.run(Task.scala:88)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)

The failure stack trace is as follows:
15/10/02 17:09:54 ERROR JobScheduler: Error generating jobs for time 
144379620 ms
org.apache.spark.SparkException: Job aborted due to stage failure: Task 39 in 
stage 10.0 failed 4 times, most recent failure: Lost task 39.3 in stage 10.0 
(TID 168, 172.26.88.66): java.lang.RuntimeException: 
java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836)
at 
org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
at 
org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1206)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
at 
org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:512)
at 
org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:302)
at 
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
at 
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
at 

Re: Store DStreams into Hive using Hive Streaming

2015-10-05 Thread Krzysztof Zarzycki
I'm also interested in this feature. Did you guys found some information
about how to use Hive Streaming with Spark Streaming?

Thanks,
Krzysiek

2015-07-17 20:16 GMT+02:00 unk1102 :

> Hi I have similar use case did you found solution for this problem of
> loading
> DStreams in Hive using Spark Streaming. Please guide. Thanks.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Store-DStreams-into-Hive-using-Hive-Streaming-tp18307p23885.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: spark.streaming.kafka.maxRatePerPartition for direct stream

2015-10-05 Thread Tathagata Das
Also, the backpressure configuration only applies to Spark 1.5 and above.
Just making that clear.
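
(For reference, a sketch of the relevant settings; the values are arbitrary
examples, and as noted above the backpressure flag only takes effect on 1.5+.)

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.streaming.backpressure.enabled", "true")        // Spark 1.5+ only
  .set("spark.streaming.kafka.maxRatePerPartition", "10000")  // cap: records/sec/partition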

On Fri, Oct 2, 2015 at 6:55 AM, Cody Koeninger  wrote:

> But turning backpressure on won't stop you from choking on the first batch
> if you're doing e.g. some kind of in-memory aggregate that can't handle
> that many records at once.
>
> On Fri, Oct 2, 2015 at 1:10 AM, Sourabh Chandak 
> wrote:
>
>> Thanks Cody, will try to do some estimation.
>>
>> Thanks Nicolae, will try out this config.
>>
>> Thanks,
>> Sourabh
>>
>> On Thu, Oct 1, 2015 at 11:01 PM, Nicolae Marasoiu <
>> nicolae.maras...@adswizz.com> wrote:
>>
>>> Hi,
>>>
>>>
>>> Set 10ms and spark.streaming.backpressure.enabled=true
>>>
>>>
>>> This should automatically delay the next batch until the current one is
>>> processed, or at least create that balance over a few batches/periods
>>> between the consume/process rate vs ingestion rate.
>>>
>>>
>>> Nicu
>>>
>>> --
>>> *From:* Cody Koeninger 
>>> *Sent:* Thursday, October 1, 2015 11:46 PM
>>> *To:* Sourabh Chandak
>>> *Cc:* user
>>> *Subject:* Re: spark.streaming.kafka.maxRatePerPartition for direct
>>> stream
>>>
>>> That depends on your job, your cluster resources, the number of seconds
>>> per batch...
>>>
>>> You'll need to do some empirical work to figure out how many messages
>>> per batch a given executor can handle.  Divide that by the number of
>>> seconds per batch.
>>>
>>>
>>>
>>> On Thu, Oct 1, 2015 at 3:39 PM, Sourabh Chandak 
>>> wrote:
>>>
 Hi,

 I am writing a spark streaming job using the direct stream method for
 kafka and wanted to handle the case of checkpoint failure when we'll have
 to reprocess the entire data from starting. By default for every new
 checkpoint it tries to load everything from each partition and that takes a
 lot of time for processing. After some searching found out that there
 exists a config spark.streaming.kafka.maxRatePerPartition which can be used
 to tackle this. My question is what will be a suitable range for this
 config if we have ~12 million messages in kafka with maximum message size
 ~10 MB.

 Thanks,
 Sourabh

>>>
>>>
>>
>


Re: question on make multiple external calls within each partition

2015-10-05 Thread Ashish Soni
Need more details but you might want to filter the data first ( create multiple 
RDD) and then process.


> On Oct 5, 2015, at 8:35 PM, Chen Song  wrote:
> 
> We have a use case with the following design in Spark Streaming.
> 
> Within each batch,
> * data is read and partitioned by some key
> * forEachPartition is used to process the entire partition
> * within each partition, there are several REST clients created to connect to 
> different REST services
> * for the list of records within each partition, it will call these services, 
> each service call is independent of others; records are just pre-partitioned 
> to make these calls more efficiently.
> 
> I have a question
> * Since each call is time taking and to prevent the calls to be executed 
> sequentially, how can I parallelize the service calls within processing of 
> each partition? Can I just use Scala future within forEachPartition(or 
> mapPartitions)?
> 
> Any suggestions greatly appreciated.
> 
> Chen
> 
> 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark streaming job filling a lot of data in local spark nodes

2015-10-05 Thread Tathagata Das
You could use it. But do remember that it is a brute-force blunt hammer that
forcefully deletes everything older than the TTL. So if you are using some
broadcast variable across streaming batches, that broadcasted data will get
deleted as well, and jobs will start failing. You can get around that by
rebroadcasting periodically, and using the newly broadcasted object rather
than the older one.
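
(A minimal sketch of the two settings discussed in this thread; the directory is
a placeholder and the TTL carries the caveat above.)

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.local.dir", "/data/spark-tmp")  // hypothetical disk with more free space
  .set("spark.cleaner.ttl", "3600")           // seconds; brute-force cleanup, see caveat above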

On Thu, Oct 1, 2015 at 5:59 PM, swetha kasireddy 
wrote:

> We have limited disk space. So, can we have spark.cleaner.ttl to clean up
> the files? Or is there any setting that can cleanup old temp files?
>
> On Mon, Sep 28, 2015 at 7:02 PM, Shixiong Zhu  wrote:
>
>> These files are created by shuffle and just some temp files. They are not
>> necessary for checkpointing and only stored in your local temp directory.
>> They will be stored in "/tmp" by default. You can use `spark.local.dir` to
>> set the path if you find your "/tmp" doesn't have enough space.
>>
>> Best Regards,
>> Shixiong Zhu
>>
>> 2015-09-29 1:04 GMT+08:00 swetha :
>>
>>>
>>> Hi,
>>>
>>> I see a lot of data getting filled locally as shown below from my
>>> streaming
>>> job. I have my checkpoint set to hdfs. But, I still see the following
>>> data
>>> filling my local nodes. Any idea if I can make this stored in hdfs
>>> instead
>>> of storing the data locally?
>>>
>>> -rw-r--r--  1520 Sep 17 18:43 shuffle_23119_5_0.index
>>> -rw-r--r--  1 180564255 Sep 17 18:43 shuffle_23129_2_0.data
>>> -rw-r--r--  1 364850277 Sep 17 18:45 shuffle_23145_8_0.data
>>> -rw-r--r--  1  267583750 Sep 17 18:46 shuffle_23105_4_0.data
>>> -rw-r--r--  1  136178819 Sep 17 18:48 shuffle_23123_8_0.data
>>> -rw-r--r--  1  159931184 Sep 17 18:48 shuffle_23167_8_0.data
>>> -rw-r--r--  1520 Sep 17 18:49 shuffle_23315_7_0.index
>>> -rw-r--r--  1520 Sep 17 18:50 shuffle_23319_3_0.index
>>> -rw-r--r--  1   92240350 Sep 17 18:51 shuffle_23305_2_0.data
>>> -rw-r--r--  1   40380158 Sep 17 18:51 shuffle_23323_6_0.data
>>> -rw-r--r--  1  369653284 Sep 17 18:52 shuffle_23103_6_0.data
>>> -rw-r--r--  1  371932812 Sep 17 18:52 shuffle_23125_6_0.data
>>> -rw-r--r--  1   19857974 Sep 17 18:53 shuffle_23291_19_0.data
>>> -rw-r--r--  1  55342005 Sep 17 18:53 shuffle_23305_8_0.data
>>> -rw-r--r--  1   92920590 Sep 17 18:53 shuffle_23303_4_0.data
>>>
>>>
>>> Thanks,
>>> Swetha
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-job-filling-a-lot-of-data-in-local-spark-nodes-tp24846.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


Re: question on make multiple external calls within each partition

2015-10-05 Thread Tathagata Das
You could create a threadpool on demand within the foreachPartitoin
function, then handoff the REST calls to that threadpool, get back the
futures and wait for them to finish. Should be pretty straightforward. Make
sure that your foreachPartition function cleans up the threadpool before
finishing. Alternatively, you can create an on-demand singleton threadpool
that is reused across batches, which will reduce the cost of creating
threadpools every time.
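
(A sketch of the Future-based variant; dstream, callRestService and the pool
size of 8 are assumptions taken from the use case, not working code from this
thread.)

import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    val pool = Executors.newFixedThreadPool(8)
    implicit val ec = ExecutionContext.fromExecutorService(pool)
    try {
      // Submit all calls first so they run concurrently, then wait for them.
      val futures = records.map(r => Future { callRestService(r) }).toList
      futures.foreach(f => Await.result(f, 30.seconds))
    } finally {
      pool.shutdown()  // clean up the pool before the partition task finishes
    }
  }
}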

On Mon, Oct 5, 2015 at 6:07 PM, Ashish Soni  wrote:

> Need more details but you might want to filter the data first ( create
> multiple RDD) and then process.
>
>
> > On Oct 5, 2015, at 8:35 PM, Chen Song  wrote:
> >
> > We have a use case with the following design in Spark Streaming.
> >
> > Within each batch,
> > * data is read and partitioned by some key
> > * forEachPartition is used to process the entire partition
> > * within each partition, there are several REST clients created to
> connect to different REST services
> > * for the list of records within each partition, it will call these
> services, each service call is independent of others; records are just
> pre-partitioned to make these calls more efficiently.
> >
> > I have a question
> > * Since each call is time taking and to prevent the calls to be executed
> sequentially, how can I parallelize the service calls within processing of
> each partition? Can I just use Scala future within forEachPartition(or
> mapPartitions)?
> >
> > Any suggestions greatly appreciated.
> >
> > Chen
> >
> >
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


RE: laziness in textFile reading from HDFS?

2015-10-05 Thread Mohammed Guller
Is there any specific reason for caching the RDD? How many passes do you make
over the dataset?

Mohammed

-Original Message-
From: Matt Narrell [mailto:matt.narr...@gmail.com] 
Sent: Saturday, October 3, 2015 9:50 PM
To: Mohammed Guller
Cc: davidkl; user@spark.apache.org
Subject: Re: laziness in textFile reading from HDFS?

Is there any more information or best practices here?  I have the exact same 
issues when reading large data sets from HDFS (larger than available RAM) and I 
cannot run without setting the RDD persistence level to MEMORY_AND_DISK_SER, 
and using nearly all the cluster resources.

Should I repartition this RDD to be equal to the number of cores?  

I notice that the job duration on the YARN UI is about 30 minutes longer than 
the Spark UI.  When the job initially starts, there are no tasks shown in the
Spark UI...?

All I'm doing is reading records from HDFS text files with sc.textFile, and
rewriting them back to HDFS grouped by a timestamp.
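
(For reference, a sketch of reading with more partitions up front and persisting
at the serialized level mentioned above; the path and partition count are
placeholders.)

import org.apache.spark.storage.StorageLevel

val records = sc.textFile("hdfs:///data/input", minPartitions = 2000)
  .persist(StorageLevel.MEMORY_AND_DISK_SER)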

Thanks,
mn

> On Sep 29, 2015, at 8:06 PM, Mohammed Guller  wrote:
> 
> 1) It is not required to have the same amount of memory as data. 
> 2) By default the # of partitions are equal to the number of HDFS 
> blocks
> 3) Yes, the read operation is lazy
> 4) It is okay to have more number of partitions than number of cores. 
> 
> Mohammed
> 
> -Original Message-
> From: davidkl [mailto:davidkl...@hotmail.com]
> Sent: Monday, September 28, 2015 1:40 AM
> To: user@spark.apache.org
> Subject: laziness in textFile reading from HDFS?
> 
> Hello,
> 
> I need to process a significant amount of data every day, about 4TB. This 
> will be processed in batches of about 140GB. The cluster this will be running 
> on doesn't have enough memory to hold the dataset at once, so I am trying to 
> understand how this works internally.
> 
> When using textFile to read an HDFS folder (containing multiple files), I 
> understand that the number of partitions created are equal to the number of 
> HDFS blocks, correct? Are those created in a lazy way? I mean, if the number 
> of blocks/partitions is larger than the number of cores/threads the Spark 
> driver was launched with (N), are N partitions created initially and then the 
> rest when required? Or are all those partitions created up front?
> 
> I want to avoid reading the whole data into memory just to spill it out to 
> disk if there is no enough memory.
> 
> Thanks! 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/laziness-in-textFi
> le-reading-from-HDFS-tp24837.html Sent from the Apache Spark User List 
> mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For 
> additional commands, e-mail: user-h...@spark.apache.org
> 
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For 
> additional commands, e-mail: user-h...@spark.apache.org
> 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Writing UDF with variable number of arguments

2015-10-05 Thread tridib
Hi Friends,
I want to write a UDF which takes variable number of arguments with varying
type.

myudf(String key1, String value1, String key2, int value2, ...)

What is the best way to do it in Spark?
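
(As far as I know there is no true varargs UDF API; a common workaround,
sketched below with hypothetical column names, is to stringify the arguments
and pass them through array().)

import org.apache.spark.sql.functions._

// The UDF receives one array column; key/value pairs are assumed, i.e. an
// even number of elements.
sqlContext.udf.register("myudf", (kv: Seq[String]) =>
  kv.grouped(2).map { case Seq(k, v) => s"$k=$v" }.mkString(","))

df.select(callUDF("myudf",
  array(col("key1").cast("string"), col("value1").cast("string"),
        col("key2").cast("string"), col("value2").cast("string"))))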

Thanks
Tridib



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Writing-UDF-with-variable-number-of-arguments-tp24940.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Streaming Performance w/ UpdateStateByKey

2015-10-05 Thread Jeff Nadler
While investigating performance challenges in a Streaming application using
UpdateStateByKey, I found that serialization of state was a meaningful (not
dominant) portion of our execution time.

In StateDStream.scala, serialized persistence is required:

 super.persist(StorageLevel.MEMORY_ONLY_SER)

I can see why that might be a good choice for a default.  For our
workload, I made a clone that uses StorageLevel.MEMORY_ONLY.   I've just
completed some tests and it is indeed faster, with the expected cost of
greater memory usage.   For us that would be a good tradeoff.

I'm not taking any particular extra risks by doing this, am I?

Should this be configurable?  Perhaps yet another signature for
PairDStreamFunctions.updateStateByKey?

Thanks for sharing any thoughts-

Jef


Re: Lookup / Access of master data in spark streaming

2015-10-05 Thread Tathagata Das
Yes, when old broadcast objects are not referenced any more in the driver,
then associated data in the driver AND the executors will get cleared.

On Mon, Oct 5, 2015 at 1:40 PM, Olivier Girardot <
o.girar...@lateral-thoughts.com> wrote:

> @td does that mean that the "old" broadcasted data will in any way be
> "garbage collected" at some point if no RDD or transformation is using it
> anymore ?
>
> Regards,
>
> Olivier.
>
> 2015-04-09 21:49 GMT+02:00 Amit Assudani :
>
>> Thanks a lot TD for detailed answers. The answers lead to few more
>> questions,
>>
>>
>>1. "the transform RDD-to-RDD function runs on the driver “ - I didn’t
>>understand this, does it mean when I use transform function on DStream, it
>>is not parallelized, surely I m missing something here.
>>2.  updateStateByKey I think won’t work in this use case,  I have
>>three separate attribute streams ( with different frequencies ) make up 
>> the
>>combined state ( i.e. Entity ) at point in time on which I want to do some
>>processing. Do you think otherwise ?
>>3. transform+join seems only option so far, but any guestimate how
>>would this perform/ react on cluster ? Assuming, master data in 100s of
>>Gbs, and join is based on some row key. We are talking about slice of
>>stream data to be joined with 100s of Gbs of master data continuously. Is
>>it something can be done but should not be done ?
>>
>> Regards,
>> Amit
>>
>> From: Tathagata Das 
>> Date: Thursday, April 9, 2015 at 3:13 PM
>> To: amit assudani 
>> Cc: "user@spark.apache.org" 
>> Subject: Re: Lookup / Access of master data in spark streaming
>>
>> Responses inline. Hope they help.
>>
>> On Thu, Apr 9, 2015 at 8:20 AM, Amit Assudani 
>> wrote:
>>
>>> Hi Friends,
>>>
>>> I am trying to solve a use case in spark streaming, I need help on
>>> getting to right approach on lookup / update the master data.
>>>
>>> Use case ( simplified )
>>> I’ve a dataset of entity with three attributes and identifier/row key in
>>> a persistent store.
>>>
>>> Each attribute along with row key come from a different stream let’s
>>> say, effectively 3 source streams.
>>>
>>> Now whenever any attribute comes up, I want to update/sync the
>>> persistent store and do some processing, but the processing would require
>>> the latest state of entity with latest values of three attributes.
>>>
>>> I wish if I have the all the entities cached in some sort of centralized
>>> cache ( like we have data in hdfs ) within spark streaming which may be
>>> used for data local processing. But I assume there is no such thing.
>>>
>>> potential approaches I m thinking of, I suspect first two are not
>>> feasible, but I want to confirm,
>>>   1.  Is Broadcast Variables mutable ? If yes, can I use it as cache
>>> for all entities sizing  around 100s of GBs provided i have a cluster with
>>> enough RAM.
>>>
>>
>> Broadcast variables are not mutable. But you can always create a new
>> broadcast variable when you want and use the "latest" broadcast variable in
>> your computation.
>>
>> dstream.transform { rdd =>
>>
>>val latestBroacast = getLatestBroadcastVariable()  // fetch existing
>> or update+create new and return
>>val transformedRDD = rdd. ..  // use  latestBroacast in RDD
>> tranformations
>>transformedRDD
>> }
>>
>> Since the transform RDD-to-RDD function runs on the driver every batch
>> interval, it will always use the latest broadcast variable that you want.
>> Though note that whenever you create a new broadcast, the next batch may
>> take a little longer as the data needs to be actually broadcasted out.
>> That can also be made asynchronous by running a simple task (to force the
>> broadcasting out) on any new broadcast variable in a different thread as
>> Spark Streaming batch schedule, but using the same underlying Spark Context.
>>
>>
>>
>>>
>>>1. Is there any kind of sticky partition possible, so that I route
>>>my stream data to go through the same node where I've the corresponding
>>>entities, subset of entire store, cached in memory within JVM / off heap 
>>> on
>>>the node, this would avoid lookups from store.
>>>
>>> You could use updateStateByKey. That is quite sticky, but does not
>> eliminate the possibility that it can run on a different node. In fact this
>> is necessary for fault-tolerance - what if the node it was supposed to run
>> goes down? The task will be run on a different node, and you have to
>>  design your application such that it can handle that.
>>
>>
>>>1. If I stream the entities from persistent store into engine, this
>>>becomes 4th stream - the entity stream, how do i use join / merge to 
>>> enable
>>>stream 1,2,3 to lookup and update the data from stream 4. Would
>>>DStream.join work for few seconds worth of data in attribute streams with
>>>all data in entity 

Re: Broadcast var is null

2015-10-05 Thread Dmitry Pristin
Hi guys,
thanks a lot for responding so quickly!

I've reduced the code to the code below - no streaming, no Kafka, no
checkpoint. Unfortunately the end result is the same. Any suggestion to
where I'm messing up would be very much appreciated !

import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory

object BroadcastTest extends App {
  val logger = LoggerFactory.getLogger("OinkSparkMain")
  logger.info("OinkSparkMain - Setup Logger")

  val sparkConf = new SparkConf().setAppName("OinkSparkMain")
  val sc : SparkContext = new SparkContext(sparkConf)

  val rdd = sc.parallelize(Array(1,2,3));

  val arr = Array(1, 2, 3)
  val broadcastVar = sc.broadcast(arr)

  val mappedEvents =  rdd.map(e => {
val l = LoggerFactory.getLogger("OinkSparkMain1")

if (broadcastVar == null) {
  l.info("broadcastVar is null")
  (e, "empty")
}
else {
  val str = broadcastVar.value.mkString(" | ")
  l.info("broadcastVar is " + str)
  (e, str)
}
  })

  logger.info("** Total reduced count: " + mappedEvents.collect().length)
}
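
(For reference, a rough sketch of the singleton-factory workaround Adrian
mentions below, used when the variable is lost on recovery from a checkpoint;
this is not the original code.)

import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

object BroadcastHolder {
  @volatile private var instance: Broadcast[Array[Int]] = null

  // Lazily (re)create the broadcast on the driver after a restart.
  def getInstance(sc: SparkContext): Broadcast[Array[Int]] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.broadcast(Array(1, 2, 3))
        }
      }
    }
    instance
  }
}

// inside map / foreachRDD, fetch it from the RDD's context:
// val broadcastVar = BroadcastHolder.getInstance(rdd.sparkContext)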


On Mon, Oct 5, 2015 at 4:14 PM, Adrian Tanase  wrote:

> FYI the same happens with accumulators when recovering from checkpoint.
> I'd love to see this fixed somehow as the workaround (using a singleton
> factory in foreachRdd to make sure the accumulators are initialized instead
> of null) is really intrusive...
>
> Sent from my iPhone
>
> On 05 Oct 2015, at 22:52, Tathagata Das  wrote:
>
> Make sure the broadcast variable works independent of the streaming
> application. Then make sure it work without have
> StreamingContext.getOrCreate(). That will disambiguate whether that error
> is thrown when starting a new context, or when recovering a context from
> checkpoint (as getOrCreate is supposed to do).
>
> On Mon, Oct 5, 2015 at 9:23 AM, dpristin  wrote:
>
>> Hi,
>>
>> Can anyone point me out to what I'm doing wrong? I've implemented a very
>> basic spark streaming app that uses a single broadcast variable. When it
>> runs locally it produces a proper output (the array I broadcast). But when
>> deployed on the cluster I get "broadcastVar is null". We use v 1.4.1. Here
>> is the code:
>>
>> --- imports go here
>>
>> object BroadcastTest extends App {
>>   val logger = LoggerFactory.getLogger("OinkSparkMain")
>>   logger.info("OinkSparkMain - Setup Logger")
>>
>> // This is our custom context setup code; nothing fancy goes on here
>>   val config = Configuration(args)
>>   val ssc: StreamingContext =
>> StreamingContext.getOrCreate(config.checkpointDirectory, () => {
>> SparkStreamingContextFactory.Create(config, timeWindow = Seconds(10))})
>>
>>
>>   val kafkaStreamFactory = new KafkaStreamFactory(config, ssc)
>>   val messages = kafkaStreamFactory.Create
>>
>>   // Grab the value data above kafka input dstream as a string
>>   val events = messages.map( s => s._2 )
>>
>>   //Create a broadcast variable - straight from the dev guide
>>   val broadcastVar = ssc.sparkContext.broadcast(Array(1, 2, 3))
>>
>>   //Try to print out the value of the broadcast var here
>>   val transformed = events.transform(rdd => {
>> rdd.map(x => {
>>   if(broadcastVar == null) {
>> println("broadcastVar is null")
>>   }  else {
>> println("broadcastVar value: " + broadcastVar.value.mkString("|"))
>>   }
>>   x
>> })
>>   })
>>
>>   transformed.foreachRDD(x => logger.info("Data: " +
>> x.collect.mkString("|")))
>>
>>   ssc.start()
>>   ssc.awaitTermination()
>> }
>>
>> Any input is very much appreciated!
>>
>> Regards,
>> Dmitry.
>>
>>
>>
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Broadcast-var-is-null-tp24927.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com
>> .
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


save DF to JDBC

2015-10-05 Thread Ruslan Dautkhanov
http://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases


Spark JDBC can read data from JDBC, but can it save back to JDBC?
Like to an Oracle database through its jdbc driver.

Also looked at SQL Context documentation
https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/SQLContext.html
and can't find anything relevant.

Thanks!


-- 
Ruslan Dautkhanov


Re: Spark thrift service and Hive impersonation.

2015-10-05 Thread Jagat Singh
Hello Steve,

Thanks for the confirmation.

Is there any planned work on this?

Thanks,

Jagat Singh



On Wed, Sep 30, 2015 at 9:37 PM, Vinay Shukla  wrote:

> Steve is right,
>  The Spark thrift server does not propagate end-user identity downstream
> yet.
>
>
>
> On Wednesday, September 30, 2015, Steve Loughran 
> wrote:
>
>>
>> On 30 Sep 2015, at 03:24, Mohammed Guller  wrote:
>>
>> Does each user needs to start own thrift server to use it?
>>
>>
>>
>> No. One of the benefits of the Spark Thrift Server is that it allows
>> multiple users to share a single SparkContext.
>>
>>
>>
>> Most likely, you have file permissions issue.
>>
>>
>>
>>
>> I don't think the spark hive thrift server does the multi-user stuff (yet)
>>
>> Mohammed
>>
>>
>>
>> *From:* Jagat Singh [mailto:jagatsi...@gmail.com]
>> *Sent:* Tuesday, September 29, 2015 5:30 PM
>> *To:* SparkUser
>> *Subject:* Spark thrift service and Hive impersonation.
>>
>>
>>
>> Hi,
>>
>>
>>
>> I have started the Spark thrift service using spark user.
>>
>>
>>
>> Does each user needs to start own thrift server to use it?
>>
>>
>>
>> Using beeline i am able to connect to server and execute show tables;
>>
>>
>>
>> However when we try to execute some real query it runs as spark user and
>> HDFS permissions does not allow them to be read.
>>
>>
>>
>> The query fails with error
>>
>>
>>
>> 0: jdbc:hive2://localhost:1> select count(*) from mytable;
>>
>> Error: org.apache.hadoop.hive.ql.metadata.HiveException: Unable to fetch
>> table mytable. java.security.AccessControlException: Permission denied:
>> user=spark, access=READ, inode="/data/mytable":tdcprdr:tdcprdr:drwxr-x--x
>>
>> at
>> org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkFsPermission(FSPermissionChecker.java:271)
>>
>>
>>
>>
>>
>> And in thrift server we get log.
>>
>>
>>
>>
>>
>> In the hive-site.xml we have impersonation enabled.
>>
>>
>> <property>
>>   <name>hive.server2.enable.doAs</name>
>>   <value>true</value>
>> </property>
>>
>> <property>
>>   <name>hive.server2.enable.impersonation</name>
>>   <value>true</value>
>> </property>
>>
>>
>>
>> Is there any other configuration to be done for it to work like the normal
>> Hive thrift server?
>>
>>
>>
>> Thanks
>>
>>
>>


ERROR: "Size exceeds Integer.MAX_VALUE" Spark 1.5

2015-10-05 Thread Muhammad Ahsan
Hello Everyone !

I am working with spark 1.5 over YARN. I am trying something like

val results = sqlContext.sql("SELECT guid FROM clickstream group by guid")

results.take(10).foreach(println)


But I am getting the following error. I am using data frames and am unable to
resolve this error; please help.


org.apache.spark.SparkException: Job aborted due to stage failure: Task 815
in stage 13.0 failed 4 times, most recent failure: Lost task 815.3 in stage
13.0 (TID 3612, amd-014.test.com): java.lang.RuntimeException:
java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
at
org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
at
org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1206)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:512)
at
org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:302)
at
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
at
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at
org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
at
org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
at
org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87)
at
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101)
at
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
at
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
at
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:745)

at
org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:162)
at
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:103)
at
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
at
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
at

Re: Spark SQL "SELECT ... LIMIT" scans the entire Hive table?

2015-10-05 Thread Michael Armbrust
It does do a take.  Run explain to make sure that is the case.  Why do you
think it's reading the whole table?
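
(For example, something along these lines shows the physical plan; a Limit /
take-style operator rather than a full scan is what to look for.)

sqlContext.sql("SELECT * FROM my_db.my_tbl LIMIT 5").explain(true)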

On Mon, Oct 5, 2015 at 1:53 PM, YaoPau  wrote:

> I'm using SqlCtx connected to Hive in CDH 5.4.4.  When I run "SELECT * FROM
> my_db.my_tbl LIMIT 5", it scans the entire table like Hive would instead of
> doing a .take(5) on it and returning results immediately.
>
> Is there a way to get Spark SQL to use .take(5) instead of the Hive logic
> of
> scanning the full table when running a SELECT?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-SELECT-LIMIT-scans-the-entire-Hive-table-tp24938.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Streaming Performance w/ UpdateStateByKey

2015-10-05 Thread Tathagata Das
You could call DStream.persist(StorageLevel.MEMORY_ONLY) on the
stateDStream returned by updateStateByKey to achieve the same. As you have
seen, the downside is greater memory usage, and also higher GC overheads
(that's the main one usually). So I suggest you run your benchmarks for a
long enough time to see what the GC overheads are. If it turns out that some
batches are randomly taking longer because of some task in some executor
being stuck in GC, then it's going to be bad.
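
(A minimal sketch, assuming the keyed stream and updateFunc are defined
elsewhere:)

import org.apache.spark.storage.StorageLevel

val stateDStream = keyedDStream.updateStateByKey(updateFunc)
stateDStream.persist(StorageLevel.MEMORY_ONLY)  // overrides the MEMORY_ONLY_SER default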

Alternatively, you could also start playing with CMS GC, etc.

BTW, it would be amazing if you could share the numbers from your benchmarks:
the number of states, how complex the objects in state are, what the
processing time is, and what the improvements are.

TD


On Mon, Oct 5, 2015 at 2:28 PM, Jeff Nadler  wrote:

>
> While investigating performance challenges in a Streaming application
> using UpdateStateByKey, I found that serialization of state was a
> meaningful (not dominant) portion of our execution time.
>
> In StateDStream.scala, serialized persistence is required:
>
>  super.persist(StorageLevel.MEMORY_ONLY_SER)
>
> I can see why that might be a good choice for a default.For our
> workload, I made a clone that uses StorageLevel.MEMORY_ONLY.   I've just
> completed some tests and it is indeed faster, with the expected cost of
> greater memory usage.   For us that would be a good tradeoff.
>
> I'm not taking any particular extra risks by doing this, am I?
>
> Should this be configurable?  Perhaps yet another signature for
> PairDStreamFunctions.updateStateByKey?
>
> Thanks for sharing any thoughts-
>
> Jef
>
>
>
>


RE: save DF to JDBC

2015-10-05 Thread Young, Matthew T
I’ve gotten it to work with SQL Server (with limitations; it’s buggy and 
doesn’t work with some types/operations).

https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/DataFrameWriter.html
 is the Java API you are looking for; the JDBC method lets you write to JDBC 
databases.

I haven’t tried Oracle database, but I would expect it to work at least 
somewhat.
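
(Untested against Oracle, but a sketch would look roughly like this; the URL,
table and credentials are placeholders and df is whatever DataFrame you want to
save.)

val props = new java.util.Properties()
props.setProperty("user", "scott")      // placeholder credentials
props.setProperty("password", "tiger")

df.write
  .mode("append")
  .jdbc("jdbc:oracle:thin:@//dbhost:1521/ORCL", "MY_TABLE", props)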

From: Ruslan Dautkhanov [mailto:dautkha...@gmail.com]
Sent: Monday, October 05, 2015 2:44 PM
To: user 
Subject: save DF to JDBC

http://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases

Spark JDBC can read data from JDBC, but can it save back to JDBC?
Like to an Oracle database through its jdbc driver.

Also looked at SQL Context documentation
https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/SQLContext.html
and can't find anything relevant.

Thanks!


--
Ruslan Dautkhanov


Re: save DF to JDBC

2015-10-05 Thread Richard Hillegas

Hi Ruslan,

Here is some sample code which writes a DataFrame to a table in a Derby
database:

import org.apache.spark.sql._
import org.apache.spark.sql.types._

val binaryVal = Array[Byte] ( 1, 2, 3, 4 )
val timestampVal = java.sql.Timestamp.valueOf("1996-01-01 03:30:36")
val dateVal = java.sql.Date.valueOf("1996-01-01")

val allTypes = sc.parallelize(
Array(
  (1,
  1.toLong,
  1.toDouble,
  1.toFloat,
  1.toShort,
  1.toByte,
  "true".toBoolean,
  "one ring to rule them all",
  binaryVal,
  timestampVal,
  dateVal,
  BigDecimal.valueOf(42549.12)
  )
)).toDF(
  "int_col",
  "long_col",
  "double_col",
  "float_col",
  "short_col",
  "byte_col",
  "boolean_col",
  "string_col",
  "binary_col",
  "timestamp_col",
  "date_col",
  "decimal_col"
  )

val properties = new java.util.Properties()

allTypes.write.jdbc("jdbc:derby:/Users/rhillegas/derby/databases/derby1",
"all_spark_types", properties)

Hope this helps,

Rick Hillegas
STSM, IBM Analytics, Platform - IBM USA


Ruslan Dautkhanov  wrote on 10/05/2015 02:44:20 PM:

> From: Ruslan Dautkhanov 
> To: user 
> Date: 10/05/2015 02:45 PM
> Subject: save DF to JDBC
>
> http://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-
> to-other-databases
>
> Spark JDBC can read data from JDBC, but can it save back to JDBC?
> Like to an Oracle database through its jdbc driver.
>
> Also looked at SQL Context documentation
> https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/
> SQLContext.html
> and can't find anything relevant.
>
> Thanks!
>
>
> --
> Ruslan Dautkhanov

Re: spark-ec2 config files.

2015-10-05 Thread Hemminger Jeff
The spark-ec2 script generates spark config files from templates. Those are
located here:
https://github.com/amplab/spark-ec2/tree/branch-1.5/templates/root/spark/conf
Note the link is referring to the 1.5 branch.
Is this what you are looking for?
Jeff

On Mon, Oct 5, 2015 at 8:56 AM, Renato Perini 
wrote:

> Can someone provide the relevant config files generated by Spark EC2
> script?
> I'm configuring a Spark cluster on EC2 manually, and I would like to
> compare my config files (spark-defaults.conf, spark-env.sh) with those
> generated by the spark-ec2 script.
> Of course, hide your sensitive informations.
>
> Thank you.
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Store DStreams into Hive using Hive Streaming

2015-10-05 Thread Tathagata Das
Hive is not designed for OLTP workloads like data insertions and updates
you want to do with Spark Streaming. Hive is mainly for OLAP workloads
where you already have data and you want to run bulk queries on the data.
Other systems like HBase and Cassandra are more designed for OLTP. Please
think about your system architecture based on how each of these are
designed.

On Mon, Oct 5, 2015 at 3:07 AM, Umesh Kacha  wrote:

> Hi, no, I didn't find any solution; I still need that feature of Hive streaming
> using Spark. Please let me know if you get something. An alternative solution
> is to use Storm for Hive processing. I would like to stick to Spark, so I'm
> still searching.
> On Oct 5, 2015 2:51 PM, "Krzysztof Zarzycki"  wrote:
>
>> I'm also interested in this feature. Did you guys found some information
>> about how to use Hive Streaming with Spark Streaming?
>>
>> Thanks,
>> Krzysiek
>>
>> 2015-07-17 20:16 GMT+02:00 unk1102 :
>>
>>> Hi I have similar use case did you found solution for this problem of
>>> loading
>>> DStreams in Hive using Spark Streaming. Please guide. Thanks.
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Store-DStreams-into-Hive-using-Hive-Streaming-tp18307p23885.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>


Re: [Spark on YARN] Multiple Auxiliary Shuffle Service Versions

2015-10-05 Thread Andrew Or
Hi all,

Both the history server and the shuffle service are backward compatible,
but not forward compatible. This means that as long as you have the latest
version of the history server / shuffle service running in your cluster,
you're fine (you don't need multiple of them).

That said, an old shuffle service (e.g. 1.2) also happens to work with say
Spark 1.4 because the shuffle file formats haven't changed. However, there
are no guarantees that this will remain the case.

-Andrew

2015-10-05 16:37 GMT-07:00 Alex Rovner :

> We are running CDH 5.4 with Spark 1.3 as our main version and that version
> is configured to use the external shuffling service. We have also installed
> Spark 1.5 and have configured it not to use the external shuffling service
> and that works well for us so far. I would be interested myself how to
> configure multiple versions to use the same shuffling service.
>
> *Alex Rovner*
> *Director, Data Engineering *
> *o:* 646.759.0052
>
>
> On Mon, Oct 5, 2015 at 11:06 AM, Andreas Fritzler <
> andreas.fritz...@gmail.com> wrote:
>
>> Hi Steve, Alex,
>>
>> how do you handle the distribution and configuration of
>> the spark-*-yarn-shuffle.jar on your NodeManagers if you want to use 2
>> different Spark versions?
>>
>> Regards,
>> Andreas
>>
>> On Mon, Oct 5, 2015 at 4:54 PM, Steve Loughran 
>> wrote:
>>
>>>
>>> > On 5 Oct 2015, at 16:48, Alex Rovner  wrote:
>>> >
>>> > Hey Steve,
>>> >
>>> > Are you referring to the 1.5 version of the history server?
>>> >
>>>
>>>
>>> Yes. I should warn, however, that there's no guarantee that a history
>>> server running the 1.4 code will handle the histories of a 1.5+ job. In
>>> fact, I'm fairly confident it won't, as the events to get replayed are
>>> different.
>>>
>>
>>
>


question on make multiple external calls within each partition

2015-10-05 Thread Chen Song
We have a use case with the following design in Spark Streaming.

Within each batch,
* data is read and partitioned by some key
* forEachPartition is used to process the entire partition
* within each partition, there are several REST clients created to connect
to different REST services
* for the list of records within each partition, it will call these
services, each service call is independent of others; records are just
pre-partitioned to make these calls more efficiently.

I have a question
* Since each call is time taking and to prevent the calls to be executed
sequentially, how can I parallelize the service calls within processing of
each partition? Can I just use Scala future within forEachPartition(or
mapPartitions)?

Any suggestions greatly appreciated.

Chen


Re: RDD of ImmutableList

2015-10-05 Thread Adrian Tanase
If you don't need to write data back using that library I'd say go for #2. 
Convert to a scala class and standard lists, should be easier down the line. 
That being said, you may end up writing custom code if you stick with kryo 
anyway...

Sent from my iPhone

On 05 Oct 2015, at 21:42, Jakub Dubovsky 
> wrote:

Thank you for quick reaction.

I have to say this is very surprising to me. I never received advice to stop 
using an immutable approach. The whole RDD is designed to be immutable (which is 
sort of sabotaged by not being able to (de)serialize immutable classes 
properly). I will ask on the dev list whether this is to be changed or not.

Ok, I have let go initial feelings and now let's be pragmatic. And this is 
still for everyone not just Igor:

I use a class from a library which is immutable. Now I want to use this class 
to represent my data in RDD because this saves me a huge amount of work. The 
class uses ImmutableList as one of its fields. That's why it fails. But isn't 
there a way to work around this? I ask this because I have exactly zero 
knowledge about kryo and how it works. So, for example, would either of 
these two approaches work?

1) Change the external class so that it implements writeObject, readObject 
methods (it's java). Will these methods be used by kryo? (I can ask the 
maintainers of a library to change the class if the change is reasonable. 
Adding these methods would be reasonable, while dropping immutability certainly wouldn't be.)

2) Wrap the class in a Scala class which would translate the data during 
(de)serialization?
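
(For what it's worth, a rough and untested sketch of option 2 expressed instead
as a custom Kryo serializer that copies through a plain ArrayList; depending on
the Kryo version, the concrete ImmutableList subclasses may also need handling.)

import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import com.google.common.collect.ImmutableList
import org.apache.spark.serializer.KryoRegistrator

class GuavaImmutableListSerializer extends Serializer[ImmutableList[Any]] {
  override def write(kryo: Kryo, output: Output, list: ImmutableList[Any]): Unit =
    kryo.writeObject(output, new java.util.ArrayList[Any](list))

  override def read(kryo: Kryo, input: Input,
                    clazz: Class[ImmutableList[Any]]): ImmutableList[Any] = {
    val copy = kryo.readObject(input, classOf[java.util.ArrayList[Any]])
    ImmutableList.copyOf(copy: java.util.Collection[Any])
  }
}

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit =
    // addDefaultSerializer applies to ImmutableList and its subclasses
    kryo.addDefaultSerializer(classOf[ImmutableList[Any]], new GuavaImmutableListSerializer)
}

// then: sparkConf.set("spark.kryo.registrator", "<fully.qualified>.MyRegistrator")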

  Thanks!
  Jakub Dubovsky


-- Original message --
From: Igor Berman >
To: Jakub Dubovsky 
>
Date: 5. 10. 2015 20:11:35
Subject: Re: RDD of ImmutableList

kryo doesn't support guava's collections by default
I remember encountered project in github that fixes this(not sure though). I've 
ended to stop using guava collections as soon as spark rdds are concerned.

On 5 October 2015 at 21:04, Jakub Dubovsky 
> wrote:
Hi all,

  I would like to have an advice on how to use ImmutableList with RDD. Small 
presentation of an essence of my problem in spark-shell with guava jar added:

scala> import com.google.common.collect.ImmutableList
import com.google.common.collect.ImmutableList

scala> val arr = Array(ImmutableList.of(1,2), ImmutableList.of(2,4), 
ImmutableList.of(3,6))
arr: Array[com.google.common.collect.ImmutableList[Int]] = Array([1, 2], [2, 
4], [3, 6])

scala> val rdd = sc.parallelize(arr)
rdd: org.apache.spark.rdd.RDD[com.google.common.collect.ImmutableList[Int]] = 
ParallelCollectionRDD[0] at parallelize at :24

scala> rdd.count

 This results in kryo exception saying that it cannot add a new element to list 
instance while deserialization:

java.io.IOException: java.lang.UnsupportedOperationException
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163)
at 
org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70)
...
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.UnsupportedOperationException
at 
com.google.common.collect.ImmutableCollection.add(ImmutableCollection.java:91)
at 
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
at 
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:18)
...

  It somehow makes sense. But I cannot think of a workaround and I do not 
believe that using ImmutableList with RDD is not possible. How this is solved?

  Thank you in advance!

   Jakub Dubovsky




Re: StructType has more rows, than corresponding Row has objects.

2015-10-05 Thread Davies Liu
Could you tell us a way to reproduce this failure? Reading from JSON or Parquet?

On Mon, Oct 5, 2015 at 4:28 AM, Eugene Morozov
 wrote:
> Hi,
>
> We're building our own framework on top of spark and we give users pretty
> complex schema to work with. That requires from us to build dataframes by
> ourselves: we transform business objects to rows and struct types and uses
> these two to create dataframe.
>
> Everything was fine until I started to upgrade to spark 1.5.0 (from 1.3.1).
> Seems to be catalyst engine has been changed and now using almost the same
> code to produce rows and struct types I have the following:
> http://ibin.co/2HzUsoe9O96l, some of rows in the end result have different
> number of values and corresponding struct types.
>
> I'm almost sure it's my own fault, but there is always a small chance, that
> something is wrong in spark codebase. If you've seen something similar or if
> there is a jira for smth similar, I'd be glad to know. Thanks.
> --
> Be well!
> Jean Morozov

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: [Spark on YARN] Multiple Auxiliary Shuffle Service Versions

2015-10-05 Thread Alex Rovner
We are running CDH 5.4 with Spark 1.3 as our main version and that version
is configured to use the external shuffling service. We have also installed
Spark 1.5 and have configured it not to use the external shuffling service
and that works well for us so far. I would be interested myself how to
configure multiple versions to use the same shuffling service.

*Alex Rovner*
*Director, Data Engineering *
*o:* 646.759.0052


On Mon, Oct 5, 2015 at 11:06 AM, Andreas Fritzler <
andreas.fritz...@gmail.com> wrote:

> Hi Steve, Alex,
>
> how do you handle the distribution and configuration of
> the spark-*-yarn-shuffle.jar on your NodeManagers if you want to use 2
> different Spark versions?
>
> Regards,
> Andreas
>
> On Mon, Oct 5, 2015 at 4:54 PM, Steve Loughran 
> wrote:
>
>>
>> > On 5 Oct 2015, at 16:48, Alex Rovner  wrote:
>> >
>> > Hey Steve,
>> >
>> > Are you referring to the 1.5 version of the history server?
>> >
>>
>>
>> Yes. I should warn, however, that there's no guarantee that a history
>> server running the 1.4 code will handle the histories of a 1.5+ job. In
>> fact, I'm fairly confident it won't, as the events to get replayed are
>> different.
>>
>
>


RE: No space left on device when running graphx job

2015-10-05 Thread Jack Yang
Just the usual checks, as below:



1.  Check the physical disk volume (particularly /tmp folder)

2.  Use spark.local.dir to check the size of the temp files

3.  Add more workers

4.  Decrease partitions (in code)

From: Robin East [mailto:robin.e...@xense.co.uk]
Sent: Saturday, 26 September 2015 12:27 AM
To: Jack Yang
Cc: Ted Yu; Andy Huang; user@spark.apache.org
Subject: Re: No space left on device when running graphx job

Would you mind sharing what your solution was? It would help those on the forum 
who might run into the same problem. Even if it's a silly ‘gotcha’ it would 
help to know what it was and how you spotted the source of the issue.

Robin



On 25 Sep 2015, at 05:34, Jack Yang > 
wrote:

Hi all,
I resolved the problems.
Thanks folk.
Jack

From: Jack Yang [mailto:j...@uow.edu.au]
Sent: Friday, 25 September 2015 9:57 AM
To: Ted Yu; Andy Huang
Cc: user@spark.apache.org
Subject: RE: No space left on device when running graphx job

Also, please see the screenshot below from spark web ui:
This is the snapshot just 5 seconds (I guess) before the job crashed.



From: Jack Yang [mailto:j...@uow.edu.au]
Sent: Friday, 25 September 2015 9:55 AM
To: Ted Yu; Andy Huang
Cc: user@spark.apache.org
Subject: RE: No space left on device when running graphx job

Hi, here is the full stack trace:

15/09/25 09:50:14 WARN scheduler.TaskSetManager: Lost task 21088.0 in stage 6.0 
(TID 62230, 192.168.70.129): java.io.IOException: No space left on device
at java.io.FileOutputStream.writeBytes(Native Method)
at java.io.FileOutputStream.write(FileOutputStream.java:345)
at 
java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
at java.io.DataOutputStream.writeLong(DataOutputStream.java:224)
at 
org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFile$1$$anonfun$apply$mcV$sp$1.apply$mcVJ$sp(IndexShuffleBlockResolver.scala:86)
at 
org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFile$1$$anonfun$apply$mcV$sp$1.apply(IndexShuffleBlockResolver.scala:84)
at 
org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFile$1$$anonfun$apply$mcV$sp$1.apply(IndexShuffleBlockResolver.scala:84)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofLong.foreach(ArrayOps.scala:168)
at 
org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFile$1.apply$mcV$sp(IndexShuffleBlockResolver.scala:84)
at 
org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFile$1.apply(IndexShuffleBlockResolver.scala:80)
at 
org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFile$1.apply(IndexShuffleBlockResolver.scala:80)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)
at 
org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFile(IndexShuffleBlockResolver.scala:88)
at 
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:71)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)


I am using the df -i command to monitor the inode usage, which shows the
following all the time:

Filesystem      Inodes  IUsed   IFree IUse% Mounted on
/dev/sda1      1245184 275424  969760   23% /
udev            382148    484  381664    1% /dev
tmpfs           384505    366  384139    1% /run
none            384505      3  384502    1% /run/lock
none            384505      1  384504    1% /run/shm



From: Ted Yu [mailto:yuzhih...@gmail.com]
Sent: Thursday, 24 September 2015 9:12 PM
To: Andy Huang
Cc: Jack Yang; user@spark.apache.org
Subject: Re: No space left on device when running graphx job

Andy:
Can you show complete stack trace ?

Have you checked there are enough free inode on the .129 machine ?

Cheers

On Sep 23, 2015, at 11:43 PM, Andy Huang 
> wrote:
Hi Jack,

Are you writing out to disk? Or it sounds like Spark is spilling to disk (RAM 
filled up) and it's running out of disk space.

Cheers
Andy

On Thu, Sep 24, 2015 at 4:29 PM, Jack Yang 
> wrote:
Hi folk,

I have an issue of graphx. (spark: 1.4.0 + 4 machines + 4G 

Re: spark-ec2 config files.

2015-10-05 Thread Renato Perini

Yes. Thank you Jeff, really appreciated the help.

Renato.

On 06/10/2015 00:06, Hemminger Jeff wrote:
The spark-ec2 script generates spark config files from templates. 
Those are located here:

https://github.com/amplab/spark-ec2/tree/branch-1.5/templates/root/spark/conf
Note the link is referring to the 1.5 branch.
Is this what you are looking for?
Jeff

On Mon, Oct 5, 2015 at 8:56 AM, Renato Perini > wrote:


Can someone provide the relevant config files generated by Spark
EC2 script?
I'm configuring a Spark cluster on EC2 manually, and I would like
to compare my config files (spark-defaults.conf, spark-env.sh)
with those generated by the spark-ec2 script.
Of course, hide your sensitive informations.

Thank you.


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org

For additional commands, e-mail: user-h...@spark.apache.org







Re: String operation in filter with a special character

2015-10-05 Thread Hemminger Jeff
Thank you!
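
(For reference, the backtick form suggested below looks like this with the
column from the earlier message:)

dataFrame.filter("`A+B` = 2.0").show()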

On Tue, Oct 6, 2015 at 4:50 AM, Michael Armbrust 
wrote:

> Double quotes (") are used to create string literals in HiveQL / Spark
> SQL.  So you are asking if the string A+B equals the number 2.0.
>
> You should use backticks (`) to escape weird characters in column names.
>
> On Mon, Oct 5, 2015 at 12:59 AM, Hemminger Jeff  wrote:
>
>> I have a rather odd use case. I have a DataFrame column name with a +
>> value in it.
>> The app performs some processing steps before determining the column
>> name, and it
>> would be much easier to code if I could use the DataFrame filter
>> operations with a String.
>>
>> This demonstrates the issue I am having:
>>
>> dataFrame.filter(renamed("A+B").equalTo(2.0)).show()
>>
>> This will return all rows with the column value matching 2.0, as expected.
>>
>> dataFrame.filter("\"A+B\"=2.0").show()
>>
>> This executes but does not return the correct results. It returns an
>> empty result.
>>
>> dataFrame.filter("\"A+C\"=2.0").show()
>>
>> Referencing a non-existent column name returns the same empty result.
>>
>> Any suggestions?
>>
>> Jeff
>>
>
>


How can I disable logging when running local[*]?

2015-10-05 Thread Jeff Jones
I’ve written an application that hosts the Spark driver in-process using 
“local[*]”. I’ve turned off logging in my conf/log4j.properties file. I’ve also 
tried putting the following code prior to creating my SparkContext. These were 
coupled together from various posts I’ve seen. None of these steps have worked. I’m 
still getting a ton of logging to the console. Anything else I can try?

Thanks,
Jeff

private def disableLogging(): Unit = {
  import org.apache.log4j.{Level, Logger, PropertyConfigurator}

  PropertyConfigurator.configure("conf/log4j.properties")
  Logger.getRootLogger().setLevel(Level.OFF)
  Logger.getLogger("org").setLevel(Level.OFF)
  Logger.getLogger("akka").setLevel(Level.OFF)
}


This message (and any attachments) is intended only for the designated 
recipient(s). It
may contain confidential or proprietary information, or have other limitations 
on use as
indicated by the sender. If you are not a designated recipient, you may not 
review, use,
copy or distribute this message. If you received this in error, please notify 
the sender by
reply e-mail and delete this message.


Re: ERROR: "Size exceeds Integer.MAX_VALUE" Spark 1.5

2015-10-05 Thread Anuj Kumar
You may be hitting the 2GB limit. See-
https://issues.apache.org/jira/browse/SPARK-5928
https://issues.apache.org/jira/browse/SPARK-6190
https://issues.apache.org/jira/browse/SPARK-6235

Increasing the number of partitions might help-
http://spark.apache.org/docs/latest/sql-programming-guide.html#other-configuration-options
but I have not tried this configuration parameter myself.
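
A sketch of that suggestion; the value is illustrative only, and it applies to the
shuffle that the GROUP BY triggers:

// raise the number of shuffle partitions so no single shuffle block approaches 2GB
sqlContext.setConf("spark.sql.shuffle.partitions", "2000")
val results = sqlContext.sql("SELECT guid FROM clickstream group by guid")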

OTOH, I didn't understand the motive of the query. What exactly is the
purpose? - Are you looking for distinct guids?

Regards,
Anuj

On Tue, Oct 6, 2015 at 3:42 AM, Muhammad Ahsan 
wrote:

> Hello Everyone !
>
> I am working with spark 1.5 over YARN. I am trying something like
>
> val results = sqlContext.sql("SELECT guid FROM clickstream group by guid")
>
> results.take(10).foreach(println)
>
>
> But I am getting the following error. I am using data frames and unable to
> resolve this error, please help
>
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task
> 815 in stage 13.0 failed 4 times, most recent failure: Lost task 815.3 in
> stage 13.0 (TID 3612, amd-014.test.com): java.lang.RuntimeException:
> java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
> at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
> at
> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
> at
> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1206)
> at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127)
> at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
> at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:512)
> at
> org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:302)
> at
> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
> at
> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
> at
> org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
> at
> org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
> at
> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87)
> at
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101)
> at
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
> at
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
> at
> io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
> at
> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
> at
> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
> at
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
> at
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> at
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> at java.lang.Thread.run(Thread.java:745)
>
> at

Re: How can I disable logging when running local[*]?

2015-10-05 Thread Alex Kozlov
Did you try “--driver-java-options
'-Dlog4j.configuration=file:/'” and setting the
log4j.rootLogger=FATAL,console?
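
The same idea can be done from code for an in-process driver; a sketch, assuming the
log4j 1.x API that Spark 1.x ships with (the properties path is a placeholder):

import org.apache.log4j.{Level, Logger}

// equivalent of --driver-java-options '-Dlog4j.configuration=file:/path/to/log4j.properties';
// this must run before any Spark or log4j class initializes logging
System.setProperty("log4j.configuration", "file:/path/to/log4j.properties")

// and silence the noisiest loggers explicitly
Logger.getRootLogger().setLevel(Level.FATAL)
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("akka").setLevel(Level.WARN)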

On Mon, Oct 5, 2015 at 8:19 PM, Jeff Jones 
wrote:

> I’ve written an application that hosts the Spark driver in-process using
> “local[*]”. I’ve turned off logging in my conf/log4j.properties file. I’ve
> also tried putting the following code prior to creating my SparkContext.
> These were cobbled together from various posts I’ve found. None of these steps
> have worked. I’m still getting a ton of logging to the console. Anything
> else I can try?
>
> Thanks,
> Jeff
>
> private def disableLogging(): Unit = {
>   import org.apache.log4j.PropertyConfigurator
>
>   PropertyConfigurator.configure("conf/log4j.properties")
>   Logger.getRootLogger().setLevel(Level.OFF)
>   Logger.getLogger("org").setLevel(Level.OFF)
>   Logger.getLogger("akka").setLevel(Level.OFF)
> }
>
>
>
> This message (and any attachments) is intended only for the designated
> recipient(s). It
> may contain confidential or proprietary information, or have other
> limitations on use as
> indicated by the sender. If you are not a designated recipient, you may
> not review, use,
> copy or distribute this message. If you received this in error, please
> notify the sender by
> reply e-mail and delete this message.
>



-- 
Alex Kozlov
(408) 507-4987
(408) 830-9982 fax
(650) 887-2135 efax
ale...@gmail.com


Re: save DF to JDBC

2015-10-05 Thread Ruslan Dautkhanov
Thank you Richard and Matthew.

DataFrameWriter first appeared in Spark 1.4. Sorry, I should have mentioned
earlier, we're on CDH 5.4 / Spark 1.3. No options for this version?


Best regards,
Ruslan Dautkhanov
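
A sketch of what the 1.3 API offers, assuming DataFrame.createJDBCTable and
insertIntoJDBC are available there (both superseded by write.jdbc in 1.4); the
connection URL is a placeholder and credentials go into the URL itself:

val url = "jdbc:oracle:thin:user/password@//dbhost:1521/service"   // placeholder
// create the table and write the DataFrame's rows
df.createJDBCTable(url, "my_table", allowExisting = false)
// or append into an existing table
df.insertIntoJDBC(url, "my_table", overwrite = false)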

On Mon, Oct 5, 2015 at 4:00 PM, Richard Hillegas  wrote:

> Hi Ruslan,
>
> Here is some sample code which writes a DataFrame to a table in a Derby
> database:
>
> import org.apache.spark.sql._
> import org.apache.spark.sql.types._
>
> val binaryVal = Array[Byte] ( 1, 2, 3, 4 )
> val timestampVal = java.sql.Timestamp.valueOf("1996-01-01 03:30:36")
> val dateVal = java.sql.Date.valueOf("1996-01-01")
>
> val allTypes = sc.parallelize(
> Array(
>   (1,
>   1.toLong,
>   1.toDouble,
>   1.toFloat,
>   1.toShort,
>   1.toByte,
>   "true".toBoolean,
>   "one ring to rule them all",
>   binaryVal,
>   timestampVal,
>   dateVal,
>   BigDecimal.valueOf(42549.12)
>   )
> )).toDF(
>   "int_col",
>   "long_col",
>   "double_col",
>   "float_col",
>   "short_col",
>   "byte_col",
>   "boolean_col",
>   "string_col",
>   "binary_col",
>   "timestamp_col",
>   "date_col",
>   "decimal_col"
>   )
>
> val properties = new java.util.Properties()
>
> allTypes.write.jdbc("jdbc:derby:/Users/rhillegas/derby/databases/derby1",
> "all_spark_types", properties)
>
> Hope this helps,
>
> Rick Hillegas
> STSM, IBM Analytics, Platform - IBM USA
>
>
> Ruslan Dautkhanov  wrote on 10/05/2015 02:44:20 PM:
>
> > From: Ruslan Dautkhanov 
> > To: user 
> > Date: 10/05/2015 02:45 PM
> > Subject: save DF to JDBC
> >
> > http://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-
> > to-other-databases
> >
> > Spark JDBC can read data from JDBC, but can it save back to JDBC?
> > Like to an Oracle database through its jdbc driver.
> >
> > Also looked at SQL Context documentation
> > https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/
> > SQLContext.html
> > and can't find anything relevant.
> >
> > Thanks!
> >
> >
> > --
> > Ruslan Dautkhanov
>


Re: Store DStreams into Hive using Hive Streaming

2015-10-05 Thread Umesh Kacha
Hi, no, I didn't find any solution; I still need that Hive streaming feature
in Spark, so please let me know if you find something. An alternative solution
is to use Storm for the Hive processing, but I would like to stick to Spark, so
I'm still searching.
On Oct 5, 2015 2:51 PM, "Krzysztof Zarzycki"  wrote:

> I'm also interested in this feature. Did you guys found some information
> about how to use Hive Streaming with Spark Streaming?
>
> Thanks,
> Krzysiek
>
> 2015-07-17 20:16 GMT+02:00 unk1102 :
>
>> Hi I have similar use case did you found solution for this problem of
>> loading
>> DStreams in Hive using Spark Streaming. Please guide. Thanks.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Store-DStreams-into-Hive-using-Hive-Streaming-tp18307p23885.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Spark 1.5.0 Error on startup

2015-10-05 Thread Steve Loughran
this is windows, isn't it? you're missing a winutils.exe in the right place, 
and hadoop core isn't providing anything meaningful. Hadoop 2.8 will provide 
more diagnostics, including a link to this wiki page:


https://wiki.apache.org/hadoop/WindowsProblems
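
If HADOOP_HOME cannot be set system-wide, the same hint can be given from code before
the SparkContext/HiveContext is created; a sketch (the path is an assumption and must
contain bin\winutils.exe):

// Hadoop's Shell utility resolves winutils.exe via the hadoop.home.dir system
// property (or the HADOOP_HOME environment variable)
System.setProperty("hadoop.home.dir", "C:\\hadoop")   // hypothetical install location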



On 5 Oct 2015, at 05:26, Julius Fernandes wrote:


Caused by: java.lang.NullPointerException
at java.lang.ProcessBuilder.start(ProcessBuilder.java:1010)
at org.apache.hadoop.util.Shell.runCommand(Shell.java:482)
at org.apache.hadoop.util.Shell.run(Shell.java:455)
at 
org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
at org.apache.hadoop.util.Shell.execCommand(Shell.java:808)
at org.apache.hadoop.util.Shell.execCommand(Shell.java:791)
at org.apache.hadoop.fs.FileUtil.execCommand(FileUtil.java:1097)
at 
org.apache.hadoop.fs.RawLocalFileSystem$DeprecatedRawLocalFileStatus.loadPermissionInfo(RawLocalFileSystem.java:582)
at 
org.apache.hadoop.fs.RawLocalFileSystem$DeprecatedRawLocalFileStatus.getPermission(RawLocalFileSystem.java:557)
at 
org.apache.hadoop.hive.ql.session.SessionState.createRootHDFSDir(SessionState.java:599)
at 
org.apache.hadoop.hive.ql.session.SessionState.createSessionDirs(SessionState.java:554)
at 
org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:508)
... 56 more



StructType has more rows, than corresponding Row has objects.

2015-10-05 Thread Eugene Morozov
Hi,

We're building our own framework on top of Spark and we give users a pretty
complex schema to work with. That requires us to build dataframes ourselves:
we transform business objects into rows and struct types and use these two to
create the dataframe.

Everything was fine until I started to upgrade to Spark 1.5.0 (from 1.3.1).
The Catalyst engine seems to have changed, and now, using almost the same code
to produce rows and struct types, I get the following:
http://ibin.co/2HzUsoe9O96l; some rows in the end result have a different
number of values than their corresponding struct types.
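
For comparison, a minimal sketch of the invariant Spark expects when a dataframe is
assembled by hand: every Row must carry exactly as many values as its StructType has
fields, in the same order.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

val schema = StructType(Seq(
  StructField("id", StringType, nullable = false),
  StructField("score", DoubleType, nullable = true)))

// two fields in the schema, so exactly two values per Row; a mismatch only
// surfaces later, inside the Catalyst conversion
val rows = sc.parallelize(Seq(Row("a", 1.0), Row("b", 2.0)))
val df = sqlContext.createDataFrame(rows, schema)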

I'm almost sure it's my own fault, but there is always a small chance that
something is wrong in the Spark codebase. If you've seen something similar, or
if there is a JIRA for something similar, I'd be glad to know. Thanks.
--
Be well!
Jean Morozov


Spark handling parallel requests

2015-10-05 Thread tarek.abouzeid91
Hi,
I am using Scala and have written a socket program to catch multiple requests at the
same time and then call a function which uses Spark to handle each one. I have a
multi-threaded server to handle the multiple requests and pass each to Spark, but
there's a bottleneck: Spark doesn't start a new sub-task for the new request. Is it
even possible to do parallel processing using a single Spark job?

Best Regards,
Tarek Abouzeid
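
A sketch of one way this is commonly handled (the thread-pool size and the work inside
each job are placeholders): a single SparkContext accepts jobs submitted from several
threads at once, and the FAIR scheduler keeps one long request from starving the rest.

import java.util.concurrent.Executors
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("request-server")
  .set("spark.scheduler.mode", "FAIR")   // fair sharing between concurrently submitted jobs
val sc = new SparkContext(conf)

val pool = Executors.newFixedThreadPool(8)

def handleRequest(path: String): Unit = {
  pool.submit(new Runnable {
    override def run(): Unit = {
      // each incoming request becomes its own Spark job on the shared context
      val count = sc.textFile(path).count()   // placeholder work
      println(s"$path -> $count")
    }
  })
}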

Re: Spark 1.5.0 Error on startup

2015-10-05 Thread Julius Fernandes
You are right, this is related to Windows. I am using Windows 8.

Placed the WinUtils.exe in HADOOP_HOME/bin.

Issue does not occur anymore.



On Mon, Oct 5, 2015 at 3:30 PM, Steve Loughran 
wrote:

> this is windows, isn't it? you're missing a winutils.exe in the right
> place, and hadoop core isn't providing anything meaningful. Hadoop 2.8 will
> provide more diagnostics, including a link to this wiki page:
>
>
> https://wiki.apache.org/hadoop/WindowsProblems
>
>
>
> On 5 Oct 2015, at 05:26, Julius Fernandes  wrote:
>
>
> Caused by: java.lang.NullPointerException
> at java.lang.ProcessBuilder.start(ProcessBuilder.java:1010)
> at org.apache.hadoop.util.Shell.runCommand(Shell.java:482)
> at org.apache.hadoop.util.Shell.run(Shell.java:455)
> at
> org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
> at org.apache.hadoop.util.Shell.execCommand(Shell.java:808)
> at org.apache.hadoop.util.Shell.execCommand(Shell.java:791)
> at org.apache.hadoop.fs.FileUtil.execCommand(FileUtil.java:1097)
> at
> org.apache.hadoop.fs.RawLocalFileSystem$DeprecatedRawLocalFileStatus.loadPermissionInfo(RawLocalFileSystem.java:582)
> at
> org.apache.hadoop.fs.RawLocalFileSystem$DeprecatedRawLocalFileStatus.getPermission(RawLocalFileSystem.java:557)
> at
> org.apache.hadoop.hive.ql.session.SessionState.createRootHDFSDir(SessionState.java:599)
> at
> org.apache.hadoop.hive.ql.session.SessionState.createSessionDirs(SessionState.java:554)
> at
> org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:508)
> ... 56 more
>
>
>


[Spark on YARN] Multiple Auxiliary Shuffle Service Versions

2015-10-05 Thread Andreas Fritzler
Hi,

I was just wondering, if it is possible to register multiple versions of
the aux-services with YARN as described in the documentation:



   1. In the yarn-site.xml on each node, add spark_shuffle to
   yarn.nodemanager.aux-services, then set
   yarn.nodemanager.aux-services.spark_shuffle.class to
   org.apache.spark.network.yarn.YarnShuffleService. Additionally, set all
   relevant spark.shuffle.service.* configurations.

The reason for the question is: I am trying to run multiple versions of
Spark in parallel. Does anybody have any experience on how such a dual
version operation holds up in terms of downward-compatibility?

Maybe sticking to the latest version of the aux-service will do the trick?

Regards,
Andreas

[1]
http://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation


Job on Yarn not using all given capacity ends up failing

2015-10-05 Thread Cesar Berezowski
Hi, 

I recently upgraded from 1.2.1 to 1.3.1 (through HDP). 

I have a job that does a cartesian product on two datasets (2K and 500K lines 
minimum) to do string matching.

I updated it to use Dataframes because the old code wouldn’t run anymore 
(deprecated RDD functions).

It used to run very well and use all allocated memory/cores but doesn’t anymore 
and I can’t figure out why.

My cluster has 4 workers each with 14 available vCores and 40 GB of RAM

I run the job with the following properties: 
--master yarn-client
--num-executors 12
--executor-cores 4
--executor-memory 12G

That should give me 3 executors per worker; however, I always end up with ~3 
executors (total) and 2 tasks that end up failing on « OutOfMemoryError: GC 
overhead limit exceeded », restarting and failing again because the output 
directory already exists.

Any idea ?

Thanks !

César.




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: OutOfMemoryError

2015-10-05 Thread Ramkumar V
No. I didn't try to increase xmx.

*Thanks*,



On Mon, Oct 5, 2015 at 1:36 PM, Jean-Baptiste Onofré 
wrote:

> Hi Ramkumar,
>
> did you try to increase Xmx of the workers ?
>
> Regards
> JB
>
> On 10/05/2015 08:56 AM, Ramkumar V wrote:
>
>> Hi,
>>
>> When i submit java spark job in cluster mode, i'm getting following
>> exception.
>>
>> *LOG TRACE :*
>>
>> INFO yarn.ExecutorRunnable: Setting up executor with commands:
>> List({{JAVA_HOME}}/bin/java, -server, -XX:OnOutOfMemoryError='kill
>>   %p', -Xms1024m, -Xmx1024m, -Djava.io.tmpdir={{PWD}}/tmp,
>> '-Dspark.ui.port=0', '-Dspark.driver.port=48309',
>> -Dspark.yarn.app.container.log.dir=> _DIR>, org.apache.spark.executor.CoarseGrainedExecutorBackend,
>> --driver-url, akka.tcp://sparkDriver@ip:port/user/CoarseGrainedScheduler,
>>   --executor-id, 2, --hostname, hostname , --cores, 1, --app-id,
>> application_1441965028669_9009, --user-class-path, file:$PWD
>> /__app__.jar, --user-class-path, file:$PWD/json-20090211.jar, 1>,
>> /stdout, 2>, /stderr).
>>
>> I have a cluster of 11 machines (9 - 64 GB memory and 2 - 32 GB memory
>> ). my input data of size 128 GB.
>>
>> How to solve this exception ? is it depends on driver.memory and
>> execuitor.memory setting ?
>>
>>
>> *Thanks*,
>> 
>>
>>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
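
For reference, the -Xmx1024m in the log above is Spark's 1 GB default executor heap; it
is raised through Spark's own settings rather than a raw JVM flag. A sketch with
illustrative values only (they must fit within the YARN container limits):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("my-job")
  .set("spark.executor.memory", "4g")                 // heap per executor
  .set("spark.yarn.executor.memoryOverhead", "512")   // off-heap headroom, in MB
// spark.driver.memory must be set before the driver JVM starts, i.e. via
// spark-submit --driver-memory, not from inside the application.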
>


Error: could not find function "includePackage"

2015-10-05 Thread jayendra.par...@yahoo.in
As mentioned on the website, the “includePackage” command can be used to
include existing R packages, but when I use this command R gives
this error:

Error: could not find function "includePackage"

There is no function called includePackage in the SparkR package version
1.5.0, so how can I import other R packages in SparkR?

Please guide me

Thanks 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Error-could-not-find-function-includePackage-tp24924.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: [Spark on YARN] Multiple Auxiliary Shuffle Service Versions

2015-10-05 Thread Steve Loughran

> On 5 Oct 2015, at 15:59, Alex Rovner  wrote:
> 
> I have the same question about the history server. We are trying to run 
> multiple versions of Spark and are wondering if the history server is 
> backwards compatible.

yes, it supports the pre-1.4 "Single attempt" logs as well as the 1.4+ multiple 
attempt model.


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: DStream Transformation to save JSON in Cassandra 2.1

2015-10-05 Thread Ashish Soni
Try this:

You can use dstream.map to convert it to a JavaDStream with only the data you
are interested in (probably returning a POJO of your JSON),

and then call foreachRDD and inside that call the line below:

javaFunctions(rdd).writerBuilder("table", "keyspace",
mapToRow(Class.class)).saveToCassandra();
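
In Scala the connector's native API does the same in one step; a sketch, assuming the
DataStax spark-cassandra-connector is on the classpath, spark.cassandra.connection.host
is set on the SparkConf, and the keyspace/table from the question (the stream's "og"
field lands in the oz column):

import com.datastax.spark.connector._
import com.datastax.spark.connector.streaming._

case class Coordinate(id: String, ax: Double, ay: Double, az: Double,
                      oa: Double, ob: Double, oz: Double)

val coordinates = jsonf.map { m =>
  val c = m("coordinate").asInstanceOf[Map[String, Any]]
  def d(k: String): Double = c(k).toString.toDouble
  Coordinate(m("id").toString, d("ax"), d("ay"), d("az"), d("oa"), d("ob"), d("og"))
}

// each micro-batch is written straight into iotdata.coordinate
coordinates.saveToCassandra("iotdata", "coordinate")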

On Mon, Oct 5, 2015 at 10:14 AM, Prateek .  wrote:

> Hi,
>
> I am beginner in Spark , this is sample data I get from Kafka stream:
>
> {"id":
> "9f5ccb3d5f4f421392fb98978a6b368f","coordinate":{"ax":"1.20","ay":"3.80","az":"9.90","oa":"8.03","ob":"8.8","og":"9.97"}}
>
>   val lines = KafkaUtils.createStream(ssc, zkQuorum, group,
> topicMap).map(_._2)
>   val jsonf =
> lines.map(JSON.parseFull(_)).map(_.get.asInstanceOf[scala.collection.immutable.Map[String,
> Any]])
>
>   I am getting a, DSTream[Map[String,Any]]. I need to store each
> coordinate values in the below Cassandra schema
>
> CREATE TABLE iotdata.coordinate (
> id text PRIMARY KEY, ax double, ay double, az double, oa double, ob
> double, oz double
> )
>
> For this what transformations I need to apply before I execute
> saveToCassandra().
>
> Thank You,
> Prateek
>
>
> "DISCLAIMER: This message is proprietary to Aricent and is intended solely
> for the use of the individual to whom it is addressed. It may contain
> privileged or confidential information and should not be circulated or used
> for any purpose other than for what it is intended. If you have received
> this message in error, please notify the originator immediately. If you are
> not the intended recipient, you are notified that you are strictly
> prohibited from using, copying, altering, or disclosing the contents of
> this message. Aricent accepts no responsibility for loss or damage arising
> from the use of the information transmitted by this email including damage
> from virus."
>


Re: DStream Transformation to save JSON in Cassandra 2.1

2015-10-05 Thread Jean-Baptiste Onofré

Hi Prateek,

I see two ways:

- using Cassandra CQL to adapt the RDD in the DStream to Cassandra
- using a Cassandra converter

You have a couple of code snippets in the examples. Let me know if you 
need a code sample.


Regards
JB

On 10/05/2015 04:14 PM, Prateek . wrote:

Hi,

I am beginner in Spark , this is sample data I get from Kafka stream:

{"id": 
"9f5ccb3d5f4f421392fb98978a6b368f","coordinate":{"ax":"1.20","ay":"3.80","az":"9.90","oa":"8.03","ob":"8.8","og":"9.97"}}

   val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
   val jsonf = 
lines.map(JSON.parseFull(_)).map(_.get.asInstanceOf[scala.collection.immutable.Map[String,
 Any]])

   I am getting a, DSTream[Map[String,Any]]. I need to store each coordinate 
values in the below Cassandra schema

CREATE TABLE iotdata.coordinate (
 id text PRIMARY KEY, ax double, ay double, az double, oa double, ob 
double, oz double
)

For this what transformations I need to apply before I execute 
saveToCassandra().

Thank You,
Prateek


"DISCLAIMER: This message is proprietary to Aricent and is intended solely for the 
use of the individual to whom it is addressed. It may contain privileged or confidential 
information and should not be circulated or used for any purpose other than for what it 
is intended. If you have received this message in error, please notify the originator 
immediately. If you are not the intended recipient, you are notified that you are 
strictly prohibited from using, copying, altering, or disclosing the contents of this 
message. Aricent accepts no responsibility for loss or damage arising from the use of the 
information transmitted by this email including damage from virus."

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Error: could not find function "includePackage"

2015-10-05 Thread Ted Yu
includePackage is defined in R/pkg/R/context.R

FYI

On Mon, Oct 5, 2015 at 6:46 AM, jayendra.par...@yahoo.in <
jayendra.par...@yahoo.in> wrote:

> As mentioned on the website that “includePackage” command can be used to
> include existing R packages, but when I am using this command R is giving
> this error :-
>
> Error: could not find function "includePackage"
>
> And there is no function called  includePackage in sparkR package version
> 1.5.0, so how can I import other R packages in sparkR.
>
> Please guide me
>
> Thanks
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Error-could-not-find-function-includePackage-tp24924.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


save checkpoint during dataframe row iteration

2015-10-05 Thread Justin Permar
Good morning,

I have a typical iterator loop on a DataFrame loaded from a parquet data
source:

import org.apache.spark.SparkConf
import org.apache.spark.api.java.JavaSparkContext

val conf = new SparkConf().setAppName("Simple Application").setMaster("local")
val sc = new JavaSparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val parquetDataFrame = sqlContext.read.parquet(parquetFilename.getAbsolutePath)
parquetDataFrame.foreachPartition { rowIterator =>
  rowIterator.foreach { row =>
    // ... do work
  }
}

My use case is quite simple: I would like to save a checkpoint during
processing, and if the driver program fails, skip over the initial records
in the parquet file, and continue from the checkpoint. This would be
analogous to storing a loop iterator value in a standard C++/Java for loop.

My question is: are there any guarantees about the ordering of rows in the
"foreach" closure? Even if there are not guarantees in general (i.e., for
DataFrame from any source), considering that the data frame is created from
a parquet file, are there any guarantees?

Is it possible to implement my use case?

Thanks for your time.


Re: [Spark on YARN] Multiple Auxiliary Shuffle Service Versions

2015-10-05 Thread Andreas Fritzler
Hi Steve, Alex,

how do you handle the distribution and configuration of
the spark-*-yarn-shuffle.jar on your NodeManagers if you want to use 2
different Spark versions?

Regards,
Andreas

On Mon, Oct 5, 2015 at 4:54 PM, Steve Loughran 
wrote:

>
> > On 5 Oct 2015, at 16:48, Alex Rovner  wrote:
> >
> > Hey Steve,
> >
> > Are you referring to the 1.5 version of the history server?
> >
>
>
> Yes. I should warn, however, that there's no guarantee that a history
> server running the 1.4 code will handle the histories of a 1.5+ job. In
> fact, I'm fairly confident it won't, as the events to get replayed are
> different.
>


RE: Error: could not find function "includePackage"

2015-10-05 Thread Koen Vantomme
 

Sent from my Sony Xperia™ smartphone

jayendra.par...@yahoo.in wrote:

>As mentioned on the website that “includePackage” command can be used to
>include existing R packages, but when I am using this command R is giving
>this error :-
>
>Error: could not find function "includePackage"
>
>And there is no function called  includePackage in sparkR package version
>1.5.0, so how can I import other R packages in sparkR.
>
>Please guide me
>
>Thanks 
>
>
>
>--
>View this message in context: 
>http://apache-spark-user-list.1001560.n3.nabble.com/Error-could-not-find-function-includePackage-tp24924.html
>Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>-
>To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>For additional commands, e-mail: user-h...@spark.apache.org
>


DStream Transformation to save JSON in Cassandra 2.1

2015-10-05 Thread Prateek .
Hi,

I am a beginner in Spark; this is sample data I get from a Kafka stream:

{"id": 
"9f5ccb3d5f4f421392fb98978a6b368f","coordinate":{"ax":"1.20","ay":"3.80","az":"9.90","oa":"8.03","ob":"8.8","og":"9.97"}}

  val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
  val jsonf = 
lines.map(JSON.parseFull(_)).map(_.get.asInstanceOf[scala.collection.immutable.Map[String,
 Any]])

  I am getting a DStream[Map[String, Any]]. I need to store each coordinate 
value in the Cassandra schema below:

CREATE TABLE iotdata.coordinate (
id text PRIMARY KEY, ax double, ay double, az double, oa double, ob double, 
oz double
)

For this what transformations I need to apply before I execute 
saveToCassandra().

Thank You,
Prateek


"DISCLAIMER: This message is proprietary to Aricent and is intended solely for 
the use of the individual to whom it is addressed. It may contain privileged or 
confidential information and should not be circulated or used for any purpose 
other than for what it is intended. If you have received this message in error, 
please notify the originator immediately. If you are not the intended 
recipient, you are notified that you are strictly prohibited from using, 
copying, altering, or disclosing the contents of this message. Aricent accepts 
no responsibility for loss or damage arising from the use of the information 
transmitted by this email including damage from virus."


Re: [Spark on YARN] Multiple Auxiliary Shuffle Service Versions

2015-10-05 Thread Alex Rovner
I have the same question about the history server. We are trying to run
multiple versions of Spark and are wondering if the history server is
backwards compatible.

*Alex Rovner*
*Director, Data Engineering *
*o:* 646.759.0052


On Mon, Oct 5, 2015 at 9:22 AM, Andreas Fritzler  wrote:

> Hi,
>
> I was just wondering, if it is possible to register multiple versions of
> the aux-services with YARN as described in the documentation:
>
>
>
>1. In the yarn-site.xml on each node, add spark_shuffle to
>yarn.nodemanager.aux-services, then set
>yarn.nodemanager.aux-services.spark_shuffle.class to
>org.apache.spark.network.yarn.YarnShuffleService. Additionally, set
>    all relevant spark.shuffle.service.* configurations.
>
> The reason for the question is: I am trying to run multiple versions of
> Spark in parallel. Does anybody have any experience on how such a dual
> version operation holds up in terms of downward-compatibility?
>
> Maybe sticking to the latest version of the aux-service will do the trick?
>
> Regards,
> Andreas
>
> [1]
> http://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation
>
>


RE: How to install a Spark Package?

2015-10-05 Thread jeff saremi
Yes, those. Here's Avro's, for example:

spark-shell --packages com.databricks:spark-avro_2.10:2.0.1

The way I read this is that this line would instruct Spark to go and get the package.
But does that mean installation? Is this permanent? Do I need to specify it
once, or each time? Will it be downloaded each time?
What if the environment that I'm running my Spark in does not allow such a
connection?
If I have downloaded these packages, is there a way to install them permanently?

Subject: Re: How to install a Spark Package?
From: yuzhih...@gmail.com
Date: Sun, 4 Oct 2015 21:05:44 -0700
CC: user@spark.apache.org
To: jeffsar...@hotmail.com

Are you talking about a package which is listed on http://spark-packages.org ?
The package should come with installation instructions, right?
On Oct 4, 2015, at 8:55 PM, jeff saremi wrote:

So that it is available even in offline mode? I can't seem to be able to find
any notes on that.
thanks
jeff

Spark context on thrift server

2015-10-05 Thread Younes Naguib
Hi,

We're using a spark thrift server and we connect using jdbc to run queries.
Every time we run a set query, like "set schema", it seems to affect the 
server, and not the session only.

Is that an expected behavior? Or am I missing something.


Younes Naguib
Triton Digital | 1440 Ste-Catherine W., Suite 1200 | Montreal, QC  H3G 1R8
Tel.: +1 514 448 4037 x2688 | Tel.: +1 866 448 4037 x2688 | 
younes.nag...@tritondigital.com 



Re: [Spark on YARN] Multiple Auxiliary Shuffle Service Versions

2015-10-05 Thread Alex Rovner
Hey Steve,

Are you referring to the 1.5 version of the history server?

*Alex Rovner*
*Director, Data Engineering *
*o:* 646.759.0052


On Mon, Oct 5, 2015 at 10:18 AM, Steve Loughran 
wrote:

>
> > On 5 Oct 2015, at 15:59, Alex Rovner  wrote:
> >
> > I have the same question about the history server. We are trying to run
> multiple versions of Spark and are wondering if the history server is
> backwards compatible.
>
> yes, it supports the pre-1.4 "Single attempt" logs as well as the 1.4+
> multiple attempt model.
>
>


Re: [Spark on YARN] Multiple Auxiliary Shuffle Service Versions

2015-10-05 Thread Steve Loughran

> On 5 Oct 2015, at 16:48, Alex Rovner  wrote:
> 
> Hey Steve,
> 
> Are you referring to the 1.5 version of the history server?
> 


Yes. I should warn, however, that there's no guarantee that a history server 
running the 1.4 code will handle the histories of a 1.5+ job. In fact, I'm 
fairly confident it won't, as the events to get replayed are different.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: "java.io.IOException: Filesystem closed" on executors

2015-10-05 Thread Lan Jiang
I am still facing this issue. Executor dies due to

org.apache.avro.AvroRuntimeException: java.io.IOException: Filesystem closed
at org.apache.avro.file.DataFileStream.hasNextBlock(DataFileStream.java:278)
at org.apache.avro.file.DataFileStream.hasNext(DataFileStream.java:197)
at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:64)
...
Caused by: java.io.IOException: Filesystem closed
at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:794)
at
org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:833)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:897)
at java.io.DataInputStream.read(DataInputStream.java:149)

Spark automatically launched new executors and the whole job completed
fine. Does anyone have a clue what's going on?

The spark job reads avro files from a directory, do some basic map/filter
and then repartition to 1, write the result to HDFS. I use spark 1.3 with
spark-avro (1.0.0). The error only happens when running on the whole
dataset. When running on 1/3 of the files, the same job completes without
error.


On Thu, Oct 1, 2015 at 2:41 PM, Lan Jiang  wrote:

> Hi, there
>
> Here is the problem I ran into when executing a Spark Job (Spark 1.3). The
> spark job is loading a bunch of avro files using Spark SQL spark-avro 1.0.0
> library. Then it does some filter/map transformation, repartition to 1
> partition and then write to HDFS. It creates 2 stages. The total HDFS block
> number is around 12000, thus it creates 12000 partitions, thus 12000 tasks
> for the first stage. I have total 9 executors launched with 5 thread for
> each. The job has run fine until the very end.  When it reaches 19980/2
> tasks succeeded, it suddenly failed the last 20 tasks and I lost 2
> executors. The spark did launched 2 new executors and finishes the job
> eventually by reprocessing the 20 tasks.
>
> I only ran into this issue when I run the spark application on the full
> dataset. When I run the 1/3 of the dataset, everything finishes fine
> without error.
>
> Question 1: What is the root cause of this issue? It is simiar to
> http://stackoverflow.com/questions/24038908/spark-fails-on-big-shuffle-jobs-with-java-io-ioexception-filesystem-closed
> and https://issues.apache.org/jira/browse/SPARK-3052, but it says the
> issue has been fixed since 1.2
> Question 2: I am a little surprised that after the 2 new executors were
> launched, replacing the two failed executors, they simply reprocessed the
> failed 20 tasks/partitions. What about the results for the other partitions
> processed by the 2 failed executors before? I assumed the results of these
> partitions are stored on the local disk and thus do not need to be computed
> by the new executors? When is the data stored locally? Is it
> configuration? This question is for my own understanding about the Spark
> framework.
>
> The exception causing the exectuor failure is below
>
> org.apache.avro.AvroRuntimeException: java.io.IOException: Filesystem
> closed
> at
> org.apache.avro.file.DataFileStream.hasNextBlock(DataFileStream.java:278)
> at org.apache.avro.file.DataFileStream.hasNext(DataFileStream.java:197)
> at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:64)
> at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:32)
> at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:245)
> at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:212)
> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
> at
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
> at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at
> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:210)
> at
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:64)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Filesystem closed
> at 

Custom RDD for Proprietary MPP database

2015-10-05 Thread VJ Anand
Hi,

I need to build an RDD that supports a custom-built database which is
sharded across several nodes, and that can provide the partitions specific
to this database.
I would like to do this in Java. I see there are JavaRDD and other specific
RDDs available; my question is, if I subclass or extend this RDD, can I
override getPartitions and the other methods? Or is there any other
alternative? Any help or pointers much appreciated.

Thanks
VJ
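
A minimal sketch of such a subclass in Scala, for reference (JavaRDD.fromRDD can wrap
it for the Java API); MppPartition and the client calls are hypothetical stand-ins for
the proprietary database's driver:

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

class MppPartition(val index: Int, val shardHost: String) extends Partition

class MppRDD(sc: SparkContext, shards: Seq[String]) extends RDD[String](sc, Nil) {

  // one Spark partition per database shard
  override protected def getPartitions: Array[Partition] =
    shards.zipWithIndex.map { case (host, i) => new MppPartition(i, host): Partition }.toArray

  // locality hook: prefer scheduling the task on the node that holds the shard
  override protected def getPreferredLocations(split: Partition): Seq[String] =
    Seq(split.asInstanceOf[MppPartition].shardHost)

  override def compute(split: Partition, context: TaskContext): Iterator[String] = {
    val p = split.asInstanceOf[MppPartition]
    // placeholder: open a connection to p.shardHost, run the shard-local query,
    // and return an iterator over its rows
    Iterator.empty
  }
}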


Exception: "You must build Spark with Hive. Export 'SPARK_HIVE=true' and run build/sbt assembly"

2015-10-05 Thread cherah30
I work with Spark 1.5 on Windows 7, with Anaconda and PySpark. Everything
worked fine until I wanted to test the connection to my MySQL database, so I
started from
https://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases
Everything is set up (jdbc, etc.).

To start playing with it, I just wanted to connect to my MySQL database to
retrieve data from a table.

Here is my code

from pyspark.sql import HiveContext

sqlHiveContext = HiveContext(sc)  # sc is the SparkContext provided by pyspark
df_mysql = sqlHiveContext.read.format("jdbc").options(
    url="jdbc:mysql://localhost:3306/my_bdd_name",
    driver="com.mysql.jdbc.Driver",
    dbtable="bdd_My_table_nameXX",
    user="my_id",
    password="my_pw").load()

And here is the exception message : 
Exception: ("You must build Spark with Hive. Export 'SPARK_HIVE=true' and
run build/sbt assembly", Py4JJavaError(u'An error occurred while calling
None.org.apache.spark.sql.hive.HiveContext.\n', JavaObject id=o28)).

Do you have an idea of what to do?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Exception-You-must-build-Spark-with-Hive-Export-SPARK-HIVE-true-and-run-build-sbt-assembly-tp24928.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: GraphX: How can I tell if 2 nodes are connected?

2015-10-05 Thread Robineast
GraphX has a Shortest Paths algorithm implementation which will tell you, for
all vertices in the graph, the shortest distance to a specific ('landmark')
vertex. The returned value is 'a graph where each vertex attribute is a map
containing the shortest-path distance to each reachable landmark vertex'.
If there is no path to the landmark vertex, then the map for the source
vertex is empty.
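
A sketch of the connectivity check this enables ('graph' is the already-built Graph;
the vertex ids are assumptions matching Dino's a..d example):

import org.apache.spark.graphx._
import org.apache.spark.graphx.lib.ShortestPaths

val source: VertexId = 1L      // "a"
val landmark: VertexId = 4L    // "d"

val result = ShortestPaths.run(graph, Seq(landmark))

// a non-empty map for the source vertex means some path from a to d exists
val connected = result.vertices
  .filter { case (id, _) => id == source }
  .first()._2.contains(landmark)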



-
Robin East 
Spark GraphX in Action Michael Malak and Robin East 
Manning Publications Co. 
http://www.manning.com/books/spark-graphx-in-action

--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-How-can-I-tell-if-2-nodes-are-connected-tp24926p24930.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark metrics cpu/memory

2015-10-05 Thread gtanguy
I would like to monitor cpu/memory usage.

I read the section Metrics on :
http://spark.apache.org/docs/1.3.1/monitoring.html.

Here my $SPARK_HOME/conf/metrics.properties

# Enable CsvSink for all instances
*.sink.csv.class=org.apache.spark.metrics.sink.CsvSink

# Polling period for CsvSink
*.sink.csv.period=1

*.sink.csv.unit=seconds

# Polling directory for CsvSink
*.sink.csv.directory=/home/spark/Documents/test/

In my Spark application I add this to the SparkConf:
.set("spark.metrics.conf",
"/home/spark/development/spark/conf/metrics.properties")

What I tried :

1/ spark@cv-local:~$ $SPARK_HOME/sbin/stop-all.sh
2/ spark@cv-local:~$ ls ~/Documents/test/   //--> EMPTY
3/ spark@cv-local:~$ $SPARK_HOME/sbin/start-all.sh
4/ spark@cv-local:~$ ls ~/Documents/test/
master.apps.csv master.workers.csvworker.coresUsed.csv 
worker.memFree_MB.csv
master.waitingApps.csv  worker.coresFree.csv  worker.executors.csv 
worker.memUsed_MB.csv

5/ Start my spark application (sbt "run-main ..")
6/ spark@cv-local:~/Documents/test$ ls
local-1444064889008..BlockManager.disk.diskSpaceUsed_MB.csv  
master.apps.csv
local-1444064889008..BlockManager.memory.maxMem_MB.csv   
master.waitingApps.csv
local-1444064889008..BlockManager.memory.memUsed_MB.csv  
master.workers.csv
local-1444064889008..BlockManager.memory.remainingMem_MB.csv 
worker.coresFree.csv
local-1444064889008..DAGScheduler.job.activeJobs.csv 
worker.coresUsed.csv
local-1444064889008..DAGScheduler.job.allJobs.csv
worker.executors.csv
local-1444064889008..DAGScheduler.stage.failedStages.csv 
worker.memFree_MB.csv
local-1444064889008..DAGScheduler.stage.runningStages.csv
worker.memUsed_MB.csv
local-1444064889008..DAGScheduler.stage.waitingStages.csv

I did this 6 steps on local and on a cluster. The result is same expect on
the cluster there is not the files worker.*.csv.

When I run my application, only the driver seems to be monitored. Why?

How can I get the memory/cpu usage of the master and slaves?








--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-metrics-cpu-memory-tp24932.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Utility for PySpark DataFrames - smartframes

2015-10-05 Thread Don Drake
I would like to announce a Python package that makes creating rows in
DataFrames in PySpark as easy as creating an object.

Code is available on GitHub, PyPi, and soon to be on spark-packages.org.


https://github.com/dondrake/smartframes

Motivation

Spark DataFrames provide a nice interface to datasets that have a schema.
Getting data from your code into a DataFrame in Python means creating a
Row() object with field names and respective values. Given that you already
have a schema with data types per field, it would be nice to easily take an
object that represents the row and create the Row() object automatically.

Smartframes allow you to define a class by just creating the schema that
represents the fields and datatypes. You can then create an object and set
the values like any other Python class. When you are ready to store that in
a DataFrame, just call the createRow() method.

The createRow() method will coerce any values into the correct data types,
for example, if a field is defined as an IntegerType and the value set in
the class is a String, it will attempt to convert the string to an Integer
before creating the Row().

This was written when creating Row()'s with Long datatypes and finding that
Spark did not handle converting integers as longs when passing values to
the JVM. I needed a consistent manner to create Row() for all of my
DataFrames.
Installation

pip install smartframes


Any feedback is appreciated.

-Don

-- 
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
800-733-2143


Re: Spark on YARN using Java 1.8 fails

2015-10-05 Thread Ted Yu
YARN 2.7.1 (running on the cluster) was built with Java 1.8, I assume.

Have you used the following command to retrieve / inspect logs ?
yarn logs -applicationId

Cheers

On Mon, Oct 5, 2015 at 8:41 AM, mvle  wrote:

> Hi,
>
> I have successfully run pyspark on Spark 1.5.1 on YARN 2.7.1 with Java
> OpenJDK 1.7.
> However, when I run the same test on Java OpenJDK 1.8 (or Oracle Java 1.8),
> I cannot start up pyspark.
> Has anyone been able to run Spark on YARN with Java 1.8?
>
> I get ApplicationMaster disassociated messages...
>
> 15/10/05 09:55:05 INFO cluster.YarnClientSchedulerBackend: Application
> application_1444055784612_0003 has started running.
> 15/10/05 09:55:05 INFO util.Utils: Successfully started service
> 'org.apache.spark.network.netty.NettyBlockTransferService' on port 53518.
> 15/10/05 09:55:05 INFO netty.NettyBlockTransferService: Server created on
> 53518
> 15/10/05 09:55:05 INFO storage.BlockManagerMaster: Trying to register
> BlockManager
> 15/10/05 09:55:05 INFO storage.BlockManagerMasterEndpoint: Registering
> block
> manager xxx.172.232.xx:53518 with 530.0 MB RAM, BlockManagerId(driver,
> xxx.172.232.xx, 53518)
> 15/10/05 09:55:05 INFO storage.BlockManagerMaster: Registered BlockManager
> 15/10/05 09:55:05 INFO scheduler.EventLoggingListener: Logging events to
> hdfs://h-all1-001:9000/user/hadoop/sparklogs/application_1444055784612_0003
> 15/10/05 09:55:07 WARN cluster.YarnSchedulerBackend$YarnSchedulerEndpoint:
> ApplicationMaster has disassociated: xxx.172.232.xx:42628
> 15/10/05 09:55:07 WARN remote.ReliableDeliverySupervisor: Association with
> remote system [akka.tcp://sparkyar...@xxx.172.232.xx:42628] has failed,
> address is now gated for [5000] ms. Reason: [Disassociated]
> 15/10/05 09:55:07 WARN cluster.YarnSchedulerBackend$YarnSchedulerEndpoint:
> ApplicationMaster has disassociated: xxx.172.232.xx:42628
> 15/10/05 09:55:09 INFO cluster.YarnSchedulerBackend$YarnSchedulerEndpoint:
> ApplicationMaster registered as
> AkkaRpcEndpointRef(Actor[akka.tcp://sparkyar...@xxx.172.232.xx
> :60077/user/YarnAM#-560267402])
> 15/10/05 09:55:09 INFO cluster.YarnClientSchedulerBackend: Add WebUI
> Filter.
> org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter, Map(PROXY_HOSTS
> -> h-all1-001, PROXY_URI_BASES ->
> http://h-all1-001:8088/proxy/application_1444055784612_0003),
> /proxy/application_1444055784612_0003
> 15/10/05 09:55:09 INFO ui.JettyUtils: Adding filter:
> org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
> 15/10/05 09:55:13 WARN cluster.YarnSchedulerBackend$YarnSchedulerEndpoint:
> ApplicationMaster has disassociated: xxx.172.232.xx:60077
> 15/10/05 09:55:13 WARN cluster.YarnSchedulerBackend$YarnSchedulerEndpoint:
> ApplicationMaster has disassociated: xxx.172.232.xx:60077
> 15/10/05 09:55:13 WARN remote.ReliableDeliverySupervisor: Association with
> remote system [akka.tcp://sparkyar...@xxx.172.232.xx:60077] has failed,
> address is now gated for [5000] ms. Reason: [Disassociated]
> 15/10/05 09:55:13 ERROR cluster.YarnClientSchedulerBackend: Yarn
> application
> has already exited with state FINISHED!
> 15/10/05 09:55:13 INFO handler.ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/metrics/json,null}
> 15/10/05 09:55:13 INFO handler.ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
> 15/10/05 09:55:13 INFO handler.ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/api,null}
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-on-YARN-using-Java-1-8-fails-tp24925.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


GraphX: How can I tell if 2 nodes are connected?

2015-10-05 Thread Dino Fancellu
Is there an existing api to see if 2 nodes in a graph are connected?

e.g. a->b, b->c, c->d

can I get to d, starting from a? (yes I hope!)

I'm not asking the route, just want to know if there is a route.

Thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-How-can-I-tell-if-2-nodes-are-connected-tp24926.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Exception: "You must build Spark with Hive. Export 'SPARK_HIVE=true' and run build/sbt assembly"

2015-10-05 Thread Ted Yu
What command did you use to build Spark 1.5.0 ?

bq. Export 'SPARK_HIVE=true' and run build/sbt assembly

Please following the above.

BTW 1.5.1 has been released which is more stable.

Please use 1.5.1

Cheers

On Mon, Oct 5, 2015 at 9:25 AM, cherah30  wrote:

> I work with Spark 1.5 on windows 7, with anacond and pyspark. everything
> works fine until I wanted to test the connection to my MySQL database. So I
> started watching it
>
> https://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases
> <
> https://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases
> >
> .
> Everything is set (jdbc, ... Etc).
>
> To start playing with, I just wanted to connect to my Mysql database to
> retrieve data from a table.
>
> Here is my code
>
> from pyspark.sql import HiveContext
> df_mysql = sqlHiveContext.read.format("jdbc").options(url =
> "jdbc:mysql://localhost:3306/my_bdd_name", driver =
> "com.mysql.jdbc.Driver",
> dbtable="bdd_My_table_nameXX",  user ="my_id", password="my_pw").load()
>
> And here is the exception message :
> Exception: ("You must build Spark with Hive. Export 'SPARK_HIVE=true' and
> run build/sbt assembly", Py4JJavaError(u'An error occurred while calling
> None.org.apache.spark.sql.hive.HiveContext.\n', JavaObject id=o28)).
>
> You get an idea of what to do?
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Exception-You-must-build-Spark-with-Hive-Export-SPARK-HIVE-true-and-run-build-sbt-assembly-tp24928.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Where to put import sqlContext.implicits._ to be able to work on DataFrames in another file?

2015-10-05 Thread Kristina Rogale Plazonic
Hi all,

I have a Scala project with multiple files: a main file and a file with
utility functions on DataFrames. However, using $"colname" to refer to a
column of the DataFrame in the utils file (see code below) produces a
compile-time error as follows:

"value $ is not a member of StringContext"

My utils code works fine if I work in spark-shell, or if:
-  I pass sqlContext as a parameter to each util function
-  I do import sqlContext.implicits._ inside each util function (as below)
(that solution seems ugly and onerous to me)

My questions:

1. Shouldn't implicits be part of a companion object (e.g. of the object
SQLContext), rather than (singleton) class instance sqlContext? If
implicits are part of the companion object, they could be defined as
imports at the top of each file?

2. Where can I put import sqlContext.implicits._ in order not to invoke it
in every function?

3. Googling, I saw Scala 2.11 might solve this problem? But won't that
cause possible compatibility problems with different jars in 2.10? (I'd
rather stick with 2.10)

Many thanks for any suggestions and insights!
Kristina

My toy code (my use case is data munging in preparation for ml/mllib and I
wanted to separate preprocessing of data to another file):

Hello.scala:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import HelloUtils._

object HelloWorld {
val conf = new SparkConf().setAppName("HelloDataFrames")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

import sqlContext.implicits._

case class RecordTest(name:String, category:String, age:Int)
val adf = sc.parallelize(Seq(
 RecordTest("a", "cat1", 1),
 RecordTest("b", "cat2", 5)   )).toDF

test(adf, sqlContext)  // calling function in HelloUtils
}

HelloUtils.scala:

import org.apache.spark.sql.{DataFrame,SQLContext}

object HelloUtils{

 def test(adf:DataFrame, sqlContext:SQLContext) =  {
import sqlContext.implicits._   // I want to get rid of this line
adf.filter( $"name" === "a").show()
 }

 /// desired way of writing test() function
   def testDesired(adf:DataFrame) =
 adf.filter( $"name" === "a").show()
}
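
One workaround that sidesteps the implicits entirely (a sketch, not the only option):
org.apache.spark.sql.functions.col builds a Column without needing
sqlContext.implicits._, so the utils file compiles with no SQLContext in scope.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

object HelloUtils {
  // col("name") replaces $"name"; no sqlContext or its implicits required
  def test(adf: DataFrame): Unit =
    adf.filter(col("name") === "a").show()
}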


How to change verbosity level and redirect verbosity to file?

2015-10-05 Thread Saif.A.Ellafi
Hi,

I would like to read the full spark-submit log once a job has completed. Since
the output is not on stdout, how could I redirect Spark's output to a file on Linux?

Thanks,
Saif



Spark Survey Results 2015 are now available

2015-10-05 Thread Denny Lee
Thanks to all of you who provided valuable feedback in our Spark Survey
2015.  Because of the survey, we have a better picture of who’s using
Spark, how they’re using it, and what they’re using it to build–insights
that will guide major updates to the Spark platform as we move into Spark’s
next phase of growth. The results are summarized in an infographic
(Spark Survey Results 2015).
Thank you to everyone who participated in Spark Survey 2015 and for your
help in shaping Spark’s future!


Spark on YARN using Java 1.8 fails

2015-10-05 Thread mvle
Hi,

I have successfully run pyspark on Spark 1.5.1 on YARN 2.7.1 with Java
OpenJDK 1.7.
However, when I run the same test on Java OpenJDK 1.8 (or Oracle Java 1.8),
I cannot start up pyspark.
Has anyone been able to run Spark on YARN with Java 1.8?

I get ApplicationMaster disassociated messages...

15/10/05 09:55:05 INFO cluster.YarnClientSchedulerBackend: Application
application_1444055784612_0003 has started running.
15/10/05 09:55:05 INFO util.Utils: Successfully started service
'org.apache.spark.network.netty.NettyBlockTransferService' on port 53518.
15/10/05 09:55:05 INFO netty.NettyBlockTransferService: Server created on
53518
15/10/05 09:55:05 INFO storage.BlockManagerMaster: Trying to register
BlockManager
15/10/05 09:55:05 INFO storage.BlockManagerMasterEndpoint: Registering block
manager xxx.172.232.xx:53518 with 530.0 MB RAM, BlockManagerId(driver,
xxx.172.232.xx, 53518)
15/10/05 09:55:05 INFO storage.BlockManagerMaster: Registered BlockManager
15/10/05 09:55:05 INFO scheduler.EventLoggingListener: Logging events to
hdfs://h-all1-001:9000/user/hadoop/sparklogs/application_1444055784612_0003
15/10/05 09:55:07 WARN cluster.YarnSchedulerBackend$YarnSchedulerEndpoint:
ApplicationMaster has disassociated: xxx.172.232.xx:42628
15/10/05 09:55:07 WARN remote.ReliableDeliverySupervisor: Association with
remote system [akka.tcp://sparkyar...@xxx.172.232.xx:42628] has failed,
address is now gated for [5000] ms. Reason: [Disassociated]
15/10/05 09:55:07 WARN cluster.YarnSchedulerBackend$YarnSchedulerEndpoint:
ApplicationMaster has disassociated: xxx.172.232.xx:42628
15/10/05 09:55:09 INFO cluster.YarnSchedulerBackend$YarnSchedulerEndpoint:
ApplicationMaster registered as
AkkaRpcEndpointRef(Actor[akka.tcp://sparkyar...@xxx.172.232.xx:60077/user/YarnAM#-560267402])
15/10/05 09:55:09 INFO cluster.YarnClientSchedulerBackend: Add WebUI Filter.
org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter, Map(PROXY_HOSTS
-> h-all1-001, PROXY_URI_BASES ->
http://h-all1-001:8088/proxy/application_1444055784612_0003),
/proxy/application_1444055784612_0003
15/10/05 09:55:09 INFO ui.JettyUtils: Adding filter:
org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
15/10/05 09:55:13 WARN cluster.YarnSchedulerBackend$YarnSchedulerEndpoint:
ApplicationMaster has disassociated: xxx.172.232.xx:60077
15/10/05 09:55:13 WARN cluster.YarnSchedulerBackend$YarnSchedulerEndpoint:
ApplicationMaster has disassociated: xxx.172.232.xx:60077
15/10/05 09:55:13 WARN remote.ReliableDeliverySupervisor: Association with
remote system [akka.tcp://sparkyar...@xxx.172.232.xx:60077] has failed,
address is now gated for [5000] ms. Reason: [Disassociated]
15/10/05 09:55:13 ERROR cluster.YarnClientSchedulerBackend: Yarn application
has already exited with state FINISHED!
15/10/05 09:55:13 INFO handler.ContextHandler: stopped
o.s.j.s.ServletContextHandler{/metrics/json,null}
15/10/05 09:55:13 INFO handler.ContextHandler: stopped
o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
15/10/05 09:55:13 INFO handler.ContextHandler: stopped
o.s.j.s.ServletContextHandler{/api,null}




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-on-YARN-using-Java-1-8-fails-tp24925.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org