Re: --packages configuration equivalent item name?

2016-04-04 Thread Saisai Shao
spark.jars.ivy, spark.jars.packages, and spark.jars.excludes are the
configurations you can use.
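
For example, putting something like the following in conf/spark-defaults.conf
should be equivalent to passing --packages on the command line (the coordinate
below is the one from the original question):

  spark.jars.packages  com.databricks:spark-csv_2.10:1.4.0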

Thanks
Saisai

On Sun, Apr 3, 2016 at 1:59 AM, Russell Jurney 
wrote:

> Thanks, Andy!
>
> On Mon, Mar 28, 2016 at 8:44 AM, Andy Davidson <
> a...@santacruzintegration.com> wrote:
>
>> Hi Russell
>>
>> I use Jupyter python notebooks a lot. Here is how I start the server
>>
>> set -x # turn debugging on
>>
>> #set +x # turn debugging off
>>
>>
>> # https://github.com/databricks/spark-csv
>>
>> # http://spark-packages.org/package/datastax/spark-cassandra-connector
>>
>> #
>> https://github.com/datastax/spark-cassandra-connector/blob/master/doc/15_python.md
>>
>> #
>> https://github.com/datastax/spark-cassandra-connector/blob/master/doc/15_python.md#pyspark-with-data-frames
>>
>>
>> # packages are ',' separated with no whitespace
>>
>> extraPkgs="--packages
>> com.databricks:spark-csv_2.11:1.3.0,datastax:spark-cassandra-connector:1.6.0-M1-s_2.10"
>>
>>
>> export PYSPARK_PYTHON=python3
>>
>> export PYSPARK_DRIVER_PYTHON=python3
>>
>> IPYTHON_OPTS=notebook $SPARK_ROOT/bin/pyspark $extraPkgs --conf
>> spark.cassandra.connection.host=
>> ec2-54-153-102-232.us-west-1.compute.amazonaws.com $*
>>
>>
>>
>> From: Russell Jurney 
>> Date: Sunday, March 27, 2016 at 7:22 PM
>> To: "user @spark" 
>> Subject: --packages configuration equivalent item name?
>>
>> I run PySpark with CSV support like so: IPYTHON=1 pyspark --packages
>> com.databricks:spark-csv_2.10:1.4.0
>>
>> I don't want to type this --packages argument each time. Is there a
>> config item for --packages? I can't find one in the reference at
>> http://spark.apache.org/docs/latest/configuration.html
>>
>> If there is no way to do this, please let me know so I can make a JIRA
>> for this feature.
>>
>> Thanks!
>> --
>> Russell Jurney twitter.com/rjurney russell.jur...@gmail.com relato.io
>>
>>
>
>
> --
> Russell Jurney twitter.com/rjurney russell.jur...@gmail.com relato.io
>


Re: RDD Partitions not distributed evenly to executors

2016-04-04 Thread Mike Hynes
Dear all,

Thank you for your responses.

Michael Slavitch:
> Just to be sure: have spark-env.sh and spark-defaults.conf been correctly
> propagated to all nodes? Are they identical?
Yes; these files are stored on a shared memory directory accessible to
all nodes.

Koert Kuipers:
> we ran into similar issues and it seems related to the new memory
> management. can you try:
> spark.memory.useLegacyMode = true
I reran the exact same code with a restarted cluster using this
modification, and did not observe any difference. The partitioning is
still imbalanced.

Ted Yu:
> If the changes can be ported over to 1.6.1, do you mind reproducing the issue 
> there ?
Since the spark.memory.useLegacyMode setting did not impact my code
execution, I will have to change the Spark dependency back to earlier
versions to see if the issue persists and get back to you.

Meanwhile, if anyone else has any other ideas or experience, please let me know.
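
For reference, the same per-executor tally can also be produced from inside a
job rather than by grepping the scheduler logs. A hedged sketch (not from this
thread), using a 64-partition RDD as in the test above:

import org.apache.spark.SparkEnv

// Tag each partition with the executor that processes it, then tally.
val rdd = sc.parallelize(1 to 64 * 1000, 64)
val partitionsPerExecutor = rdd
  .mapPartitions { _ => Iterator(SparkEnv.get.executorId -> 1) }
  .reduceByKey(_ + _)
  .collect()
partitionsPerExecutor.foreach { case (exec, n) =>
  println(s"executor $exec -> $n partitions")
}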

Mike

On 4/4/16, Koert Kuipers  wrote:
> we ran into similar issues and it seems related to the new memory
> management. can you try:
> spark.memory.useLegacyMode = true
>
> On Mon, Apr 4, 2016 at 9:12 AM, Mike Hynes <91m...@gmail.com> wrote:
>
>> [ CC'ing dev list since nearly identical questions have occurred in
>> user list recently w/o resolution;
>> c.f.:
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-work-distribution-among-execs-tt26502.html
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Partitions-are-get-placed-on-the-single-node-tt26597.html
>> ]
>>
>> Hello,
>>
>> In short, I'm reporting a problem concerning load imbalance of RDD
>> partitions across a standalone cluster. Though there are 16 cores
>> available per node, certain nodes will have >16 partitions, and some
>> will correspondingly have <16 (and even 0).
>>
>> In more detail: I am running some scalability/performance tests for
>> vector-type operations. The RDDs I'm considering are simple block
>> vectors of type RDD[(Int,Vector)] for a Breeze vector type. The RDDs
>> are generated with a fixed number of elements given by some multiple
>> of the available cores, and subsequently hash-partitioned by their
>> integer block index.
>>
>> I have verified that the hash partitioning key distribution, as well
>> as the keys themselves, are both correct; the problem is truly that
>> the partitions are *not* evenly distributed across the nodes.
>>
>> For instance, here is a representative output for some stages and
>> tasks in an iterative program. This is a very simple test with 2
>> nodes, 64 partitions, 32 cores (16 per node), and 2 executors. Two
>> example stages from the stderr log are stages 7 and 9:
>> 7,mapPartitions at DummyVector.scala:113,64,1459771364404,1459771365272
>> 9,mapPartitions at DummyVector.scala:113,64,1459771364431,1459771365639
>>
>> When counting the location of the partitions on the compute nodes from
>> the stderr logs, however, you can clearly see the imbalance. Example
>> lines are:
>> 13627 task 0.0 in stage 7.0 (TID 196,
>> himrod-2, partition 0,PROCESS_LOCAL, 3987 bytes)&
>> 13628 task 1.0 in stage 7.0 (TID 197,
>> himrod-2, partition 1,PROCESS_LOCAL, 3987 bytes)&
>> 13629 task 2.0 in stage 7.0 (TID 198,
>> himrod-2, partition 2,PROCESS_LOCAL, 3987 bytes)&
>>
>> Grep'ing the full set of above lines for each hostname, himrod-?,
>> shows the problem occurs in each stage. Below is the output, where the
>> number of partitions stored on each node is given alongside its
>> hostname as in (himrod-?,num_partitions):
>> Stage 7: (himrod-1,0) (himrod-2,64)
>> Stage 9: (himrod-1,16) (himrod-2,48)
>> Stage 12: (himrod-1,0) (himrod-2,64)
>> Stage 14: (himrod-1,16) (himrod-2,48)
>> The imbalance is also visible when the executor ID is used to count
>> the partitions operated on by executors.
>>
>> I am working off a fairly recent modification of 2.0.0-SNAPSHOT branch
>> (but the modifications do not touch the scheduler, and are irrelevant
>> for these particular tests). Has something changed radically in 1.6+
>> that would make a previously (<=1.5) correct configuration go haywire?
>> Have new configuration settings been added of which I'm unaware that
>> could lead to this problem?
>>
>> Please let me know if others in the community have observed this, and
>> thank you for your time,
>> Mike
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


-- 
Thanks,
Mike

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Standard Deviation in Hive 2 is still incorrect

2016-04-04 Thread Mich Talebzadeh
Hi,

I reported back in April 2015 that what Hive calls the Standard Deviation
function, STDDEV, is a pointer to STDDEV_POP. This is incorrect and has not
been rectified in Hive 2.

Both Oracle and Sybase point STDDEV to STDDEV_SAMP, not STDDEV_POP. I also
ran tests with Spark 1.6, and Spark correctly points STDDEV to
STDDEV_SAMP.

The following query was used

SELECT
  SQRT((SUM(POWER(AMOUNT_SOLD,2)) - (COUNT(1)*POWER(AVG(AMOUNT_SOLD),2))) / (COUNT(1)-1)) AS MYSTDDEV,
  STDDEV(amount_sold)      AS STDDEV,
  STDDEV_SAMP(amount_sold) AS STDDEV_SAMP,
  STDDEV_POP(amount_sold)  AS STDDEV_POP
FROM sales;

The following is from running the above query on Hive, where STDDEV -->
STDDEV_POP, which is incorrect:

+--------------------+--------------------+--------------------+--------------------+
|      mystddev      |       stddev       |    stddev_samp     |     stddev_pop     |
+--------------------+--------------------+--------------------+--------------------+
| 260.7270919450411  | 260.72704617040444 | 260.7270722861465  | 260.72704617040444 |
+--------------------+--------------------+--------------------+--------------------+

The following is from Spark SQL, where STDDEV --> STDDEV_SAMP, which is
correct:

+--------------------+--------------------+--------------------+--------------------+
|      mystddev      |       stddev       |    stddev_samp     |     stddev_pop     |
+--------------------+--------------------+--------------------+--------------------+
| 260.7270919450411  | 260.7270722861637  | 260.7270722861637  | 260.72704617042166 |
+--------------------+--------------------+--------------------+--------------------+

Hopefully the Hive one will be corrected.

Thanks


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


Re: spark-shell failing but pyspark works

2016-04-04 Thread Cyril Scetbon
The only way I've found to make it work now is by using the current Spark
context and changing its configuration using spark-shell options, which is
really different from pyspark where you can't instantiate a new one,
initialize it, etc.
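
A hedged sketch of that approach, for reference: let spark-shell own the single
SparkContext, pass any settings on its command line, and build the
StreamingContext on top of the existing sc rather than from a new SparkConf:

// launched as, e.g.:
//   spark-shell --master yarn-client --conf spark.hadoop.validateOutputSpecs=false -i test.scala
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(3))   // reuses spark-shell's context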
> On Apr 4, 2016, at 18:16, Cyril Scetbon  wrote:
> 
> It doesn't as you can see : http://pastebin.com/nKcMCtGb 
> 
> 
> I don't need to set the master as I'm using Yarn and I'm on one of the yarn 
> nodes. When I instantiate the Spark Streaming Context with the spark conf, it 
> tries to create a new Spark Context but even with 
> .set("spark.driver.allowMultipleContexts", "true") it doesn't work and 
> complains at line 956 that the Spark Context created by spark-shell was not 
> initialized with allowMultipleContexts ...
> 
> 
>> On Apr 4, 2016, at 16:29, Mich Talebzadeh > > wrote:
>> 
>> Hi Cyril,
>> 
>> You can connect to Spark shell from any node. The connection is made to 
>> master through --master IP Address like below:
>> 
>> spark-shell --master spark://50.140.197.217:7077 
>> 
>> 
>> Now in the Scala code you can specify something like below:
>> 
>> val sparkConf = new SparkConf().
>>  setAppName("StreamTest").
>>  setMaster("local").
>>  set("spark.cores.max", "2").
>>  set("spark.driver.allowMultipleContexts", "true").
>>  set("spark.hadoop.validateOutputSpecs", "false")
>> 
>> And that will work
>> 
>> Have you tried it?
>> 
>> HTH
>> 
>> Dr Mich Talebzadeh
>>  
>> LinkedIn  
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>  
>> 
>>  
>> http://talebzadehmich.wordpress.com 
>>  
>> 
>> On 4 April 2016 at 21:11, Cyril Scetbon > > wrote:
>> I suppose it doesn't work using spark-shell too ? If you can confirm
>> 
>> Thanks
>> 
>>> On Apr 3, 2016, at 03:39, Mich Talebzadeh >> > wrote:
>>> 
>>> This works fine for me
>>> 
>>> val sparkConf = new SparkConf().
>>>  setAppName("StreamTest").
>>>  setMaster("yarn-client").
>>>  set("spark.cores.max", "12").
>>>  set("spark.driver.allowMultipleContexts", "true").
>>>  set("spark.hadoop.validateOutputSpecs", "false")
>>> 
>>> Time: 1459669805000 ms
>>> ---
>>> ---
>>> Time: 145966986 ms
>>> ---
>>> (Sun Apr 3 08:35:01 BST 2016  === Sending messages from rhes5)
>>> 
>>> 
>>> 
>>> 
>>> Dr Mich Talebzadeh
>>>  
>>> LinkedIn  
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>  
>>> 
>>>  
>>> http://talebzadehmich.wordpress.com 
>>>  
>>> 
>>> On 3 April 2016 at 03:34, Cyril Scetbon >> > wrote:
>>> Nobody has any idea ?
>>> 
>>> > On Mar 31, 2016, at 23:22, Cyril Scetbon >> > > wrote:
>>> >
>>> > Hi,
>>> >
>>> > I'm having issues to create a StreamingContext with Scala using 
>>> > spark-shell. It tries to access the localhost interface and the 
>>> > Application Master is not running on this interface :
>>> >
>>> > ERROR ApplicationMaster: Failed to connect to driver at localhost:47257, 
>>> > retrying ...
>>> >
>>> > I don't have the issue with Python and pyspark which works fine (you can 
>>> > see it uses the ip address) :
>>> >
>>> > ApplicationMaster: Driver now available: 192.168.10.100:43290 
>>> > 
>>> >
>>> > I use similar codes though :
>>> >
>>> > test.scala :
>>> > --
>>> >
>>> > import org.apache.spark._
>>> > import org.apache.spark.streaming._
>>> > val app = "test-scala"
>>> > val conf = new SparkConf().setAppName(app).setMaster("yarn-client")
>>> > val ssc = new StreamingContext(conf, Seconds(3))
>>> >
>>> > command used : spark-shell -i test.scala
>>> >
>>> > test.py :
>>> > ---
>>> >
>>> > from pyspark import SparkConf, SparkContext
>>> > from pyspark.streaming import StreamingContext
>>> > app = "test-python"
>>> > conf = SparkConf().setAppName(app).setMaster("yarn-client")
>>> > sc = SparkContext(conf=conf)
>>> > ssc = StreamingContext(sc, 3)
>>> >
>>> > command used : pyspark test.py
>>> >
>>> > Any idea why scala can't instantiate it ? I thought python was barely 
>>> > using scala under the hood, but it seems there are differences. Are there 
>>> > any parameters set using Scala but not Python ?
>>> >
>>> > Thanks
>>> > --

Re: Spark Streaming - NotSerializableException: Methods & Closures:

2016-04-04 Thread Ted Yu
bq. I'm on version 2.10 for spark

The above is the Scala version.
Can you give us the Spark version?

Thanks

On Mon, Apr 4, 2016 at 2:36 PM, mpawashe  wrote:

> Hi all,
>
> I am using Spark Streaming API (I'm on version 2.10 for spark and
> streaming), and I am running into a function serialization issue that I do
> not run into when using Spark in batch (non-streaming) mode.
>
> If I wrote code like this:
>
> def run(): Unit = {
> val newStream = stream.map(x => { x + " foo " })
> // ...
> }
>
> everything works fine.. But if I try it like this:
>
> def transform(x: String): String = { x + " foo " }
>
> def run(): Unit = {
> val newStream = stream.map(transform)
> // ...
> }
>
> ..the program fails being unable to serialize the closure (which when
> passing a method to a function that expects a closure, it should be
> auto-converted to my understanding).
>
> However it works fine if I declare a closure inside run() and use that like
> so:
>
> val transform = (x: String) => { x + " foo " }
>
> If it's declared outside of run(), however, it will also crash.
>
> This is an example stack trace of the error I'm running into. This can be a
> hassle to debug so I hope I wouldn't have to get around this by having to
> use a local closure/function every time. Thanks for any help in advance.
>
> org.apache.spark.SparkException: Task not serializable
> at
>
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
> at
>
> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
> at
> org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
> at org.apache.spark.SparkContext.clean(SparkContext.scala:2030)
> at
>
> org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:528)
> at
>
> org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:528)
> at
>
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
> at
>
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
> at org.apache.spark.SparkContext.withScope(SparkContext.scala:709)
> at
>
> org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:266)
> at
> org.apache.spark.streaming.dstream.DStream.map(DStream.scala:527)
> at com.my.cool.app.MyClass.run(MyClass.scala:90)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
> at java.lang.reflect.Method.invoke(Unknown Source)
> at
> org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58)
> at
> org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
> Caused by: java.io.NotSerializableException: Graph is unexpectedly null
> when
> DStream is being serialized.
> Serialization stack:
>
> at
>
> org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
> at
>
> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
> at
>
> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
> at
>
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
> ... 20 more
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-NotSerializableException-Methods-Closures-tp26672.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: spark-shell failing but pyspark works

2016-04-04 Thread Cyril Scetbon
It doesn't as you can see : http://pastebin.com/nKcMCtGb

I don't need to set the master as I'm using Yarn and I'm on one of the yarn 
nodes. When I instantiate the Spark Streaming Context with the spark conf, it 
tries to create a new Spark Context but even with 
.set("spark.driver.allowMultipleContexts", "true") it doesn't work and 
complains at line 956 that the Spark Context created by spark-shell was not 
initialized with allowMultipleContexts ...


> On Apr 4, 2016, at 16:29, Mich Talebzadeh  wrote:
> 
> Hi Cyril,
> 
> You can connect to Spark shell from any node. The connection is made to 
> master through --master IP Address like below:
> 
> spark-shell --master spark://50.140.197.217:7077 
> 
> Now in the Scala code you can specify something like below:
> 
> val sparkConf = new SparkConf().
>  setAppName("StreamTest").
>  setMaster("local").
>  set("spark.cores.max", "2").
>  set("spark.driver.allowMultipleContexts", "true").
>  set("spark.hadoop.validateOutputSpecs", "false")
> 
> And that will work
> 
> Have you tried it?
> 
> HTH
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> 
>  
> http://talebzadehmich.wordpress.com 
>  
> 
> On 4 April 2016 at 21:11, Cyril Scetbon  > wrote:
> I suppose it doesn't work using spark-shell too ? If you can confirm
> 
> Thanks
> 
>> On Apr 3, 2016, at 03:39, Mich Talebzadeh > > wrote:
>> 
>> This works fine for me
>> 
>> val sparkConf = new SparkConf().
>>  setAppName("StreamTest").
>>  setMaster("yarn-client").
>>  set("spark.cores.max", "12").
>>  set("spark.driver.allowMultipleContexts", "true").
>>  set("spark.hadoop.validateOutputSpecs", "false")
>> 
>> Time: 1459669805000 ms
>> ---
>> ---
>> Time: 145966986 ms
>> ---
>> (Sun Apr 3 08:35:01 BST 2016  === Sending messages from rhes5)
>> 
>> 
>> 
>> 
>> Dr Mich Talebzadeh
>>  
>> LinkedIn  
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>  
>> 
>>  
>> http://talebzadehmich.wordpress.com 
>>  
>> 
>> On 3 April 2016 at 03:34, Cyril Scetbon > > wrote:
>> Nobody has any idea ?
>> 
>> > On Mar 31, 2016, at 23:22, Cyril Scetbon > > > wrote:
>> >
>> > Hi,
>> >
>> > I'm having issues to create a StreamingContext with Scala using 
>> > spark-shell. It tries to access the localhost interface and the 
>> > Application Master is not running on this interface :
>> >
>> > ERROR ApplicationMaster: Failed to connect to driver at localhost:47257, 
>> > retrying ...
>> >
>> > I don't have the issue with Python and pyspark which works fine (you can 
>> > see it uses the ip address) :
>> >
>> > ApplicationMaster: Driver now available: 192.168.10.100:43290 
>> > 
>> >
>> > I use similar codes though :
>> >
>> > test.scala :
>> > --
>> >
>> > import org.apache.spark._
>> > import org.apache.spark.streaming._
>> > val app = "test-scala"
>> > val conf = new SparkConf().setAppName(app).setMaster("yarn-client")
>> > val ssc = new StreamingContext(conf, Seconds(3))
>> >
>> > command used : spark-shell -i test.scala
>> >
>> > test.py :
>> > ---
>> >
>> > from pyspark import SparkConf, SparkContext
>> > from pyspark.streaming import StreamingContext
>> > app = "test-python"
>> > conf = SparkConf().setAppName(app).setMaster("yarn-client")
>> > sc = SparkContext(conf=conf)
>> > ssc = StreamingContext(sc, 3)
>> >
>> > command used : pyspark test.py
>> >
>> > Any idea why scala can't instantiate it ? I thought python was barely 
>> > using scala under the hood, but it seems there are differences. Are there 
>> > any parameters set using Scala but not Python ?
>> >
>> > Thanks
>> > --
>> > Cyril SCETBON
>> >
>> >
>> > -
>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
>> > 
>> > For additional commands, e-mail: user-h...@spark.apache.org 
>> > 
>> >
>> 
>> 
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
>> 
>> For additional 

Spark Streaming and Performance

2016-04-04 Thread Mich Talebzadeh
Hi,

Having started using Spark Streaming with Kafka, it seems to offer a number
of good opportunities. I am considering using it for Complex Event
Processing (CEP) by building CEP adaptors and Spark transformers.

Anyway, before going there I would like to see if anyone has done benchmarks
on Spark Streaming, specifically with regard to the volume and velocity of
data being streamed in.

To clarify, one of the parameters I found is the calibration of Spark
Streaming by setting StreamingContext(sparkConf, Seconds(nn)), where nn is
the batch interval in seconds, to an optimum value based on the frequency of
the data streaming in. For example, if the data is streaming in at the rate
of one tick every 60 seconds, would you put nn = 55 seconds?
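
As a hedged sketch of that calibration (the 55-second figure is only the value
hypothesised above, not a recommendation):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName("CEPStreamTest")
val ssc = new StreamingContext(sparkConf, Seconds(55))  // nn = 55 for a ~60s tick rate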

Also, what are the dependencies on the volume of data coming in? Are there
certain parameters to scale the performance of Spark Streaming, and how is
caching, i.e.

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder,
StringDecoder](ssc, kafkaParams, topic)

messages.cache()

going to improve the performance?

Thanks

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


Spark Streaming - NotSerializableException: Methods & Closures:

2016-04-04 Thread mpawashe
Hi all,

I am using Spark Streaming API (I'm on version 2.10 for spark and
streaming), and I am running into a function serialization issue that I do
not run into when using Spark in batch (non-streaming) mode.

If I wrote code like this:

def run(): Unit = {
val newStream = stream.map(x => { x + " foo " })
// ...
}

everything works fine. But if I try it like this:

def transform(x: String): String = { x + " foo " }

def run(): Unit = {
val newStream = stream.map(transform)
// ...
}

...the program fails, being unable to serialize the closure (even though, to my
understanding, a method passed where a closure is expected should be
auto-converted to one).

However it works fine if I declare a closure inside run() and use that like
so:

val transform = (x: String) => { x + " foo " }

If it's declared outside of run(), however, it will also crash.

This is an example stack trace of the error I'm running into. This can be a
hassle to debug, so I hope I won't have to work around it by using a local
closure/function every time. Thanks for any help in advance.
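
For what it's worth, one common workaround (a sketch only, not verified against
this code) is to move the method into a standalone object, so that the generated
closure references the object rather than the enclosing class instance and
therefore does not pull the DStream-holding class into serialization; "stream"
below is the DStream from the snippets above:

object Transforms extends Serializable {
  def transform(x: String): String = x + " foo "
}

def run(): Unit = {
  val newStream = stream.map(Transforms.transform _)
  // ...
}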

org.apache.spark.SparkException: Task not serializable
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2030)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:528)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:528)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.SparkContext.withScope(SparkContext.scala:709)
at
org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:266)
at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:527)
at com.my.cool.app.MyClass.run(MyClass.scala:90)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at
org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58)
at 
org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: java.io.NotSerializableException: Graph is unexpectedly null when
DStream is being serialized.
Serialization stack:

at
org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
... 20 more



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-NotSerializableException-Methods-Closures-tp26672.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: pyspark unable to convert dataframe column to a vector: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient

2016-04-04 Thread Andy Davidson
Hi Jeff

Sorry I did not respond sooner. I was out of town

Here is the code I use to initialize the HiveContext

# load data set
from pyspark.sql import HiveContext #,SQLContext, Row

# window functions require HiveContext (spark 2.x will not require hive)
#sqlContext = SQLContext(sc)
hiveSqlContext = HiveContext(sc)


Here is the complete stack trace. Could the problem be due to the size of
numDimensions?

numDimensions = 713912692155621377

The indices are sorted; I'm not sure why this exception is raised:

p/pyspark/mllib/linalg/__init__.py", line 531, in __init__
raise TypeError("indices array must be sorted")
TypeError: indices array must be sorted


import numpy as np
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg import VectorUDT
​
#sv1 = Vectors.sparse(3, [0, 2], [1.0, 3.0])
# = 3 = size
# [0,1] int indices
#[1.0, 3.0] values
​
​
"""
root
 |-- id: string (nullable = true)
 |-- follows: array (nullable = true)
 ||-- element: struct (containsNull = true)
 |||-- id: long (nullable = false)
 |||-- screenName: string (nullable = false)
​
"""
​
def toSparseVector(pojoList) :
indices = []
for pojo in pojoList :
indices.append(pojo.id)

sortedIndices = sorted(indices)
logical = np.ones(len(sortedIndices))
vec = Vectors.sparse(numDimensions, sortedIndices,  logical)
return vec

#myUDF = udf(lambda pojoList: labelStr if (labelStr == "noise") else
"injury", StringType())
​
newColName = "features"
myUDF = udf(toSparseVector, VectorUDT())
featuresDF = df.withColumn(newColName, myUDF(df["follows"]))
In [16]:
featuresDF.printSchema()
featuresDF.show()
root
 |-- id: string (nullable = true)
 |-- follows: array (nullable = true)
 ||-- element: struct (containsNull = true)
 |||-- id: long (nullable = false)
 |||-- screenName: string (nullable = false)
 |-- features: vector (nullable = true)

---
Py4JJavaError Traceback (most recent call last)
 in ()
  1 featuresDF.printSchema()
> 2 featuresDF.show()

/Users/f/workSpace/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/sql/datafr
ame.py in show(self, n, truncate)
255 +---+-+
256 """
--> 257 print(self._jdf.showString(n, truncate))
258 
259 def __repr__(self):

/Users/f/workSpace/spark/spark-1.6.1-bin-hadoop2.6/python/lib/py4j-0.9-src.z
ip/py4j/java_gateway.py in __call__(self, *args)
811 answer = self.gateway_client.send_command(command)
812 return_value = get_return_value(
--> 813 answer, self.gateway_client, self.target_id, self.name)
814 
815 for temp_arg in temp_args:

/Users/f/workSpace/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/sql/utils.
py in deco(*a, **kw)
 43 def deco(*a, **kw):
 44 try:
---> 45 return f(*a, **kw)
 46 except py4j.protocol.Py4JJavaError as e:
 47 s = e.java_exception.toString()

/Users/f/workSpace/spark/spark-1.6.1-bin-hadoop2.6/python/lib/py4j-0.9-src.z
ip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id,
name)
306 raise Py4JJavaError(
307 "An error occurred while calling {0}{1}{2}.\n".
--> 308 format(target_id, ".", name), value)
309 else:
310 raise Py4JError(

Py4JJavaError: An error occurred while calling o128.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
in stage 16.0 failed 1 times, most recent failure: Lost task 0.0 in stage
16.0 (TID 219, localhost): org.apache.spark.api.python.PythonException:
Traceback (most recent call last):
  File 
"/Users/f/workSpace/spark/spark-1.6.1-bin-hadoop2.6/python/lib/pyspark.zip/p
yspark/worker.py", line 111, in main
process()
  File 
"/Users/f/workSpace/spark/spark-1.6.1-bin-hadoop2.6/python/lib/pyspark.zip/p
yspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
  File 
"/Users/andrewdavidson/workSpace/spark/spark-1.6.1-bin-hadoop2.6/python/lib/
pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
vs = list(itertools.islice(iterator, batch))
  File 
"/Users/f/workSpace/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/sql/funct
ions.py", line 1563, in 
func = lambda _, it: map(lambda x: returnType.toInternal(f(*x)), it)
  File "", line 28, in toSparseVector
  File 
"/Users/andrewdavidson/workSpace/spark/spark-1.6.1-bin-hadoop2.6/python/lib/
pyspark.zip/pyspark/mllib/linalg/__init__.py", line 827, in sparse
return SparseVector(size, *args)
  File 
"/Users/andrewdavidson/workSpace/spark/spark-1.6.1-bin-hadoop2.6/python/lib/
pyspark.zip/pyspark/mllib/linalg/__init__.py", line 531, in __init__
raise TypeError("indices array must be sorted")
TypeError: indices array must be sorted

at 

Re: RDDs caching in typical machine learning use cases

2016-04-04 Thread Eugene Morozov
Hi,

Yes, I believe people do that. I also believe that Spark ML is able to
figure out when to cache some internal RDDs itself; that's definitely true
for the random forest algorithm. It doesn't hurt to cache the same RDD
twice, either.

But it's not clear what you'd like to know...
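
A minimal sketch of the pattern Sergey described (split, train, evaluate),
assuming a libsvm-format input file; the explicit cache() on the training split
is the part that usually matters for iterative algorithms such as random forest:

import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.util.MLUtils

val data = MLUtils.loadLibSVMFile(sc, "data/sample_libsvm_data.txt")  // hypothetical path
val Array(training, test) = data.randomSplit(Array(0.7, 0.3), seed = 11L)
training.cache()   // reused across many passes while the trees are built

val model = RandomForest.trainClassifier(training, 2, Map[Int, Int](),
  50, "auto", "gini", 5, 32)

val testErr = test.map(p => if (model.predict(p.features) != p.label) 1.0 else 0.0).mean()
println(s"Test error = $testErr")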

--
Be well!
Jean Morozov

On Sun, Apr 3, 2016 at 11:34 AM, Sergey  wrote:

> Hi Spark ML experts!
>
> Do you use RDDs caching somewhere together with ML lib to speed up
> calculation?
> I mean typical machine learning use cases.
> Train-test split, train, evaluate, apply model.
>
> Sergey.
>


Re: spark-shell failing but pyspark works

2016-04-04 Thread Mich Talebzadeh
Hi Cyril,

You can connect to Spark shell from any node. The connection is made to
master through --master IP Address like below:

spark-shell --master spark://50.140.197.217:7077

Now in the Scala code you can specify something like below:

val sparkConf = new SparkConf().
 setAppName("StreamTest").
 setMaster("local").
 set("spark.cores.max", "2").
 set("spark.driver.allowMultipleContexts", "true").
 set("spark.hadoop.validateOutputSpecs", "false")

And that will work

Have you tried it?

HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 4 April 2016 at 21:11, Cyril Scetbon  wrote:

> I suppose it doesn't work using spark-shell too ? If you can confirm
>
> Thanks
>
> On Apr 3, 2016, at 03:39, Mich Talebzadeh 
> wrote:
>
> This works fine for me
>
> val sparkConf = new SparkConf().
>  setAppName("StreamTest").
>  setMaster("yarn-client").
>  set("spark.cores.max", "12").
>  set("spark.driver.allowMultipleContexts", "true").
>  set("spark.hadoop.validateOutputSpecs", "false")
>
> Time: 1459669805000 ms
> ---
> ---
> Time: 145966986 ms
> ---
> (Sun Apr 3 08:35:01 BST 2016  === Sending messages from rhes5)
>
>
>
>
> Dr Mich Talebzadeh
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 3 April 2016 at 03:34, Cyril Scetbon  wrote:
>
>> Nobody has any idea ?
>>
>> > On Mar 31, 2016, at 23:22, Cyril Scetbon  wrote:
>> >
>> > Hi,
>> >
>> > I'm having issues to create a StreamingContext with Scala using
>> spark-shell. It tries to access the localhost interface and the Application
>> Master is not running on this interface :
>> >
>> > ERROR ApplicationMaster: Failed to connect to driver at
>> localhost:47257, retrying ...
>> >
>> > I don't have the issue with Python and pyspark which works fine (you
>> can see it uses the ip address) :
>> >
>> > ApplicationMaster: Driver now available: 192.168.10.100:43290
>> >
>> > I use similar codes though :
>> >
>> > test.scala :
>> > --
>> >
>> > import org.apache.spark._
>> > import org.apache.spark.streaming._
>> > val app = "test-scala"
>> > val conf = new SparkConf().setAppName(app).setMaster("yarn-client")
>> > val ssc = new StreamingContext(conf, Seconds(3))
>> >
>> > command used : spark-shell -i test.scala
>> >
>> > test.py :
>> > ---
>> >
>> > from pyspark import SparkConf, SparkContext
>> > from pyspark.streaming import StreamingContext
>> > app = "test-python"
>> > conf = SparkConf().setAppName(app).setMaster("yarn-client")
>> > sc = SparkContext(conf=conf)
>> > ssc = StreamingContext(sc, 3)
>> >
>> > command used : pyspark test.py
>> >
>> > Any idea why scala can't instantiate it ? I thought python was barely
>> using scala under the hood, but it seems there are differences. Are there
>> any parameters set using Scala but not Python ?
>> >
>> > Thanks
>> > --
>> > Cyril SCETBON
>> >
>> >
>> > -
>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> > For additional commands, e-mail: user-h...@spark.apache.org
>> >
>>
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>


Re: spark-shell failing but pyspark works

2016-04-04 Thread Cyril Scetbon
I suppose it doesn't work using spark-shell too ? If you can confirm

Thanks
> On Apr 3, 2016, at 03:39, Mich Talebzadeh  wrote:
> 
> This works fine for me
> 
> val sparkConf = new SparkConf().
>  setAppName("StreamTest").
>  setMaster("yarn-client").
>  set("spark.cores.max", "12").
>  set("spark.driver.allowMultipleContexts", "true").
>  set("spark.hadoop.validateOutputSpecs", "false")
> 
> Time: 1459669805000 ms
> ---
> ---
> Time: 145966986 ms
> ---
> (Sun Apr 3 08:35:01 BST 2016  === Sending messages from rhes5)
> 
> 
> 
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> 
>  
> http://talebzadehmich.wordpress.com 
>  
> 
> On 3 April 2016 at 03:34, Cyril Scetbon  > wrote:
> Nobody has any idea ?
> 
> > On Mar 31, 2016, at 23:22, Cyril Scetbon  > > wrote:
> >
> > Hi,
> >
> > I'm having issues to create a StreamingContext with Scala using 
> > spark-shell. It tries to access the localhost interface and the Application 
> > Master is not running on this interface :
> >
> > ERROR ApplicationMaster: Failed to connect to driver at localhost:47257, 
> > retrying ...
> >
> > I don't have the issue with Python and pyspark which works fine (you can 
> > see it uses the ip address) :
> >
> > ApplicationMaster: Driver now available: 192.168.10.100:43290 
> > 
> >
> > I use similar codes though :
> >
> > test.scala :
> > --
> >
> > import org.apache.spark._
> > import org.apache.spark.streaming._
> > val app = "test-scala"
> > val conf = new SparkConf().setAppName(app).setMaster("yarn-client")
> > val ssc = new StreamingContext(conf, Seconds(3))
> >
> > command used : spark-shell -i test.scala
> >
> > test.py :
> > ---
> >
> > from pyspark import SparkConf, SparkContext
> > from pyspark.streaming import StreamingContext
> > app = "test-python"
> > conf = SparkConf().setAppName(app).setMaster("yarn-client")
> > sc = SparkContext(conf=conf)
> > ssc = StreamingContext(sc, 3)
> >
> > command used : pyspark test.py
> >
> > Any idea why scala can't instantiate it ? I thought python was barely using 
> > scala under the hood, but it seems there are differences. Are there any 
> > parameters set using Scala but not Python ?
> >
> > Thanks
> > --
> > Cyril SCETBON
> >
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> > 
> > For additional commands, e-mail: user-h...@spark.apache.org 
> > 
> >
> 
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> 
> For additional commands, e-mail: user-h...@spark.apache.org 
> 
> 
> 



Re: SPARK-13900 - Join with simple OR conditions take too long

2016-04-04 Thread Mich Talebzadeh
Actually this may not be a bug. It is just that the optimizer decides to do a
nested loop join over a hash join when more than one OR'ed join condition is
involved.

With one equality predicate, a hash join is chosen:

4> SELECT COUNT(SALES.PROD_ID) from SALES, SALES2
5> WHERE SALES.CUST_ID = SALES2.CUST_ID
6> go
QUERY PLAN FOR STATEMENT 1 (at line 4).
Optimized using Parallel Mode

STEP 1
The type of query is SELECT.
4 operator(s) under root
   |ROOT:EMIT Operator (VA = 4)
   |
   |   |SCALAR AGGREGATE Operator (VA = 3)
   |   |  Evaluate Ungrouped COUNT AGGREGATE.
   |   |
   |   |   |*HASH JOIN Operator* (VA = 2) (Join Type: Inner Join)
   |   |   | Using Worktable1 for internal storage.
   |   |   |  Key Count: 1
   |   |   |
   |   |   |   |SCAN Operator (VA = 0)
   |   |   |   |  FROM TABLE
   |   |   |   |  SALES2
   |   |   |   |  Table Scan.
   |   |   |   |  Forward Scan.
   |   |   |   |  Positioning at start of table.
   |   |   |   |  Using I/O Size 64 Kbytes for data pages.
   |   |   |   |  With LRU Buffer Replacement Strategy for data pages.
   |   |   |
   |   |   |   |SCAN Operator (VA = 1)
   |   |   |   |  FROM TABLE
   |   |   |   |  SALES
   |   |   |   |  Table Scan.
   |   |   |   |  Forward Scan.
   |   |   |   |  Positioning at start of table.
   |   |   |   |  Using I/O Size 64 Kbytes for data pages.
   |   |   |   |  With MRU Buffer Replacement Strategy for data pages.

Total estimated I/O cost for statement 1 (at line 4): 783206.

Now if I choose two predicates OR'ed together, it reverts to a nested loop join:

4> SELECT COUNT(SALES.PROD_ID) from SALES, SALES2
5> WHERE SALES.CUST_ID = SALES2.CUST_ID
6> OR
7> SALES.TIME_ID = SALES2.TIME_ID
8> go

STEP 1
The type of query is SET OPTION ON.
Total estimated I/O cost for statement 3 (at line 3): 0.

QUERY PLAN FOR STATEMENT 4 (at line 4).
Optimized using Parallel Mode

STEP 1
The type of query is SELECT.
5 operator(s) under root
   |ROOT:EMIT Operator (VA = 5)
   |
   |   |SCALAR AGGREGATE Operator (VA = 4)
   |   |  Evaluate Ungrouped COUNT AGGREGATE.
   |   |
   |   |   |*NESTED LOOP JOIN* Operator (VA = 3) (Join Type: Inner Join)
   |   |   |
   |   |   |   |SCAN Operator (VA = 0)
   |   |   |   |  FROM TABLE
   |   |   |   |  SALES
   |   |   |   |  Table Scan.
   |   |   |   |  Forward Scan.
   |   |   |   |  Positioning at start of table.
   |   |   |   |  Using I/O Size 64 Kbytes for data pages.
   |   |   |   |  With MRU Buffer Replacement Strategy for data pages.
   |   |   |
   |   |   |   |RESTRICT Operator (VA = 2)(0)(0)(0)(7)(0)
   |   |   |   |
   |   |   |   |   |SCAN Operator (VA = 1)
   |   |   |   |   |  FROM TABLE
   |   |   |   |   |  SALES2
   |   |   |   |   |  Table Scan.
   |   |   |   |   |  Forward Scan.
   |   |   |   |   |  Positioning at start of table.
   |   |   |   |   |  Using I/O Size 64 Kbytes for data pages.
   |   |   |   |   |  With LRU Buffer Replacement Strategy for data
pages.

Total estimated I/O cost for statement 4 (at line 4): 2147483647.
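
As an aside (not shown in the plans above), the usual way to keep hash joins in
Spark SQL when the join condition is an OR of equality predicates is to rewrite
it as a union of equi-joins. A hedged sketch, assuming CUST_ID and TIME_ID are
not null:

// Each branch is a plain equi-join, so the optimizer can use a hash/sort-merge
// join; the second branch excludes rows already matched on CUST_ID so that
// pairs satisfying both predicates are not counted twice.
val byCust = sqlContext.sql(
  "SELECT s.PROD_ID FROM SALES s JOIN SALES2 t ON s.CUST_ID = t.CUST_ID")
val byTime = sqlContext.sql(
  "SELECT s.PROD_ID FROM SALES s JOIN SALES2 t " +
  "ON s.TIME_ID = t.TIME_ID AND s.CUST_ID <> t.CUST_ID")
println(byCust.unionAll(byTime).count())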

HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 1 April 2016 at 13:19, ashokkumar rajendran <
ashokkumar.rajend...@gmail.com> wrote:

> I agree with Hemant's comment. But it does not give good results for
> simple usecases like 2 OR conditions. Ultimately we need good results from
> Spark for end users. shall we consider this as a request to support SQL
> hints then? Is there any plan to support SQL hint in Spark in upcoming
> release?
>
> Regards
> Ashok
>
> On Fri, Apr 1, 2016 at 5:04 PM, Robin East  wrote:
>
>> Yes and even today CBO (e.g. in Oracle) will still require hints in some
>> cases so I think it is more like:
>>
>> RBO -> RBO + Hints -> CBO + Hints. Most relational databases meet
>> significant numbers of corner cases where CBO plans simply don’t do what
>> you would want. I don’t know enough about Spark SQL to comment on whether
>> the same problems would afflict Spark.
>>
>>
>>
>>
>> On 31 Mar 2016, at 15:54, Yong Zhang  wrote:
>>
>> I agree that there won't be a generic solution for these kind of cases.
>>
>> Without the CBO from Spark or Hadoop ecosystem in short future, maybe
>> Spark DataFrame/SQL should support more hints from the end user, as in
>> these cases, end users will be smart enough to tell the engine what is the
>> correct way to do.
>>
>> Weren't the relational DBs doing exactly same path? RBO -> RBO + Hints ->
>> CBO?
>>
>> Yong
>>
>> --
>> Date: Thu, 31 Mar 2016 16:07:14 +0530
>> Subject: Re: SPARK-13900 - Join with simple OR conditions 

Re: Support for time column type?

2016-04-04 Thread Philip Weaver
Hmm, yeah it looks like I could use that to represent time since start of
day. I'm porting existing large SQL queries from Postgres to Spark SQL for
a quick POC, so I'd prefer not to have to make many changes to it. I'm not
sure if the CalendarIntervalType can be used as a drop-in replacement (i.e.
if all the same operators are defined for it), but I'll give it a try.

- Philip


On Fri, Apr 1, 2016 at 1:33 PM, Michael Armbrust 
wrote:

> There is also CalendarIntervalType.  Is that what you are looking for?
>
> On Fri, Apr 1, 2016 at 1:11 PM, Philip Weaver 
> wrote:
>
>> Hi, I don't see any mention of a time type in the documentation (there is
>> DateType and TimestampType, but not TimeType), and have been unable to find
>> any documentation about whether this will be supported in the future. Does
>> anyone know if this is currently supported or will be supported in the
>> future?
>>
>
>


Re: All inclusive uber-jar

2016-04-04 Thread Haroon Rasheed
To add to Mich, I put the build.sbt under the MyProject root folder:
MyProject/build.sbt
and the assembly.sbt is placed in the folder called "project" under the
MyProject folder:
MyProject/project/assembly.sbt

Also, the first line in build.sbt imports the assembly keys, as
below:

import AssemblyKeys._

assemblySettings

name := "tripdata"

version := "1.0"

scalaVersion := "2.10.4"

resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"

libraryDependencies += "com.typesafe.akka" % "akka-actor" % "2.0.2"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" %
"provided"

libraryDependencies += "com.databricks" % "spark-csv_2.10" % "1.3.0"

libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.3.0" %
"provided"

scalacOptions += "-deprecation"

// The following is the class that will run when the jar is executed!

mainClass in assembly := Some("TripData")

Run the command from the MyProject folder as below:
MyProject$ sbt assembly


Regards,
Haroon Syed.

On 4 April 2016 at 11:42, Mich Talebzadeh  wrote:

> Thanks all
>
> Actually I have a generic shell script that for a given Scala program
> creates jar file using Maven or SBT.
>
> I modified that one to create a uber jar file as well using SBT assembly.
> The root directory structure in uber has one more sub-directory.
>
> In addition SBT relies on a single sbt file. Maven relies on pom.xml. In
> contrast, SBT assembly requires a build.sbt file plus an assembly.sbt file.
>
> The assembly sbt file assembly.sbt simply has this one line:
>
> addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.12.0")
>
> The build.sbt has the following in my case (a generic one):
>
> lazy val root = (project in file(".")).
>   settings(
> name := "${APPLICATION}",
> version := "1.0",
> scalaVersion := "2.10.4",
> mainClass in Compile := Some("myPackage.${APPLICATION}")
>   )
> libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.1" %
> "provided"
> libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.5.1"  %
> "provided"
> libraryDependencies += "org.apache.spark" %% "spark-hive" % "1.5.1" %
> "provided"
> libraryDependencies += "junit" % "junit" % "4.12"
> libraryDependencies += "org.scala-sbt" % "test-interface" % "1.0"
> libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.1" %
> "provided"
> libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" %
> "1.6.1"
> // META-INF discarding
> mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
>{
> case PathList("META-INF", xs @ _*) => MergeStrategy.discard
> case x => MergeStrategy.first
>}
> }
>
> So if I want to create an sbt assembly jar file and assuming that the
> Scala file is called TestStream_assembly then I just run the shell script
> passing the Application 'A' and type 'T' to the shell
>
> ./generic.ksh -A TestStream_assembly -T assembly
>
> For simple sbt that will be
>
> ./generic.ksh -A TestStream -T sbt
>
> and for maven it will be
>
> ./generic.ksh -A TestStream -T mvn
>
>
> Cheers,
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 4 April 2016 at 16:30, vetal king  wrote:
>
>>
>> -- Forwarded message --
>> From: vetal king 
>> Date: Mon, Apr 4, 2016 at 8:59 PM
>> Subject: Re: All inclusive uber-jar
>> To: Mich Talebzadeh 
>>
>>
>> Not sure how to create uber jar using sbt, but this is how you can do it
>> using maven.
>>
>> 
>>  
>>   
>> org.apache.maven.plugins
>> maven-shade-plugin
>> 2.4.3
>> 
>>   
>> package
>> 
>>   shade
>> 
>> 
>>   
>> 
>> **:**
>> 
>>   
>> 
>>   
>> 
>>   
>>
>> 
>>
>> But instead of creating an uber jar, consider providing maven coordinates
>> with the help of spark-submit's --packages and --repositories options
>>
>>
>>
>> On Mon, Apr 4, 2016 at 7:06 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>>
>>> When one builds a project for Spark in this case Spark streaming with
>>> SBT, as usual I add dependencies as follows:
>>>
>>> libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.1"
>>> libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" %
>>> "1.6.1"
>>>
>>> However when I submit it through spark-submit I need to put the package
>>> containing KafkaUtils the same way I do it in spark-shell
>>>
>>> ${SPARK_HOME}/bin/spark-submit \
>>>  --jars

Re: All inclusive uber-jar

2016-04-04 Thread Mich Talebzadeh
Thanks all

Actually I have a generic shell script that for a given Scala program
creates jar file using Maven or SBT.

I modified that one to create a uber jar file as well using SBT assembly.
The root directory structure in uber has one more sub-directory.

In addition SBT relies on a single sbt file. Maven relies on pom.xml. In
contrast, SBT assembly requires a build.sbt file plus an assembly.sbt file.

The assembly sbt file assembly.sbt simply has this one line:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.12.0")

The build.sbt has the following in my case (a generic one):

lazy val root = (project in file(".")).
  settings(
name := "${APPLICATION}",
version := "1.0",
scalaVersion := "2.10.4",
mainClass in Compile := Some("myPackage.${APPLICATION}")
  )
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.1" %
"provided"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.5.1"  %
"provided"
libraryDependencies += "org.apache.spark" %% "spark-hive" % "1.5.1" %
"provided"
libraryDependencies += "junit" % "junit" % "4.12"
libraryDependencies += "org.scala-sbt" % "test-interface" % "1.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.1" %
"provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" %
"1.6.1"
// META-INF discarding
mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
   {
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case x => MergeStrategy.first
   }
}

So if I want to create an sbt assembly jar file, and assuming that the Scala
file is called TestStream_assembly, then I just run the shell script, passing
the application name via -A and the build type via -T:

./generic.ksh -A TestStream_assembly -T assembly

For simple sbt that will be

./generic.ksh -A TestStream -T sbt

and for maven it will be

./generic.ksh -A TestStream -T mvn
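
Once the assembly jar is built it can be submitted on its own, with no --jars
needed; a hedged example assuming sbt-assembly's default output path and the
package/class names used above:

${SPARK_HOME}/bin/spark-submit \
  --class myPackage.TestStream_assembly \
  target/scala-2.10/TestStream_assembly-assembly-1.0.jar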


Cheers,



Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 4 April 2016 at 16:30, vetal king  wrote:

>
> -- Forwarded message --
> From: vetal king 
> Date: Mon, Apr 4, 2016 at 8:59 PM
> Subject: Re: All inclusive uber-jar
> To: Mich Talebzadeh 
>
>
> Not sure how to create uber jar using sbt, but this is how you can do it
> using maven.
>
> 
>  
>   
> org.apache.maven.plugins
> maven-shade-plugin
> 2.4.3
> 
>   
> package
> 
>   shade
> 
> 
>   
> 
> **:**
> 
>   
> 
>   
> 
>   
>
> 
>
> But instead of creating an uber jar, consider providing maven coordinates
> with the help of spark-submit's --packages and --repositories options
>
>
>
> On Mon, Apr 4, 2016 at 7:06 PM, Mich Talebzadeh  > wrote:
>
>> Hi,
>>
>>
>> When one builds a project for Spark in this case Spark streaming with
>> SBT, as usual I add dependencies as follows:
>>
>> libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.1"
>> libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" %
>> "1.6.1"
>>
>> However when I submit it through spark-submit I need to put the package
>> containing KafkaUtils the same way I do it in spark-shell
>>
>> ${SPARK_HOME}/bin/spark-submit \
>>  --jars
>> /home/hduser/jars/spark-streaming-kafka-assembly_2.10-1.6.1.jar \
>> .
>>
>> Now if I want to distribute this as all-in-one package so that it can be
>> run from any node, I have been told  that I need to create an uber-jar. I
>> have not done this before so I assume an uber-jar will be totally self
>> contained with all the classes etc.
>>
>> Can someone elaborate on this please?
>>
>> Thanks
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>
>
>


Spark SQL(Hive query through HiveContext) always creating 31 partitions

2016-04-04 Thread nitinkak001
I am running Hive queries using HiveContext from my Spark code. No matter
which query I run and how much data it is, it always generates 31
partitions. Does anybody know the reason? Is there a predefined/configurable
setting for it? I essentially need more partitions.

I am using this code snippet to execute the Hive query:

var pairedRDD = hqlContext.sql(hql).rdd.map(...)
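
Whatever is fixing the count at 31 (for a table scan it generally follows the
underlying input splits), an explicit repartition() right after the query is one
way to get more parallelism downstream. A hedged sketch; the table and column
names are hypothetical:

val hqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
val hql = "SELECT key, value FROM some_table"
val pairedRDD = hqlContext.sql(hql)
  .rdd
  .repartition(128)                       // pick the parallelism you need
  .map(row => (row.getString(0), row.getString(1)))
println(pairedRDD.partitions.length)      // now 128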

Thanks,
Nitin



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Hive-query-through-HiveContext-always-creating-31-partitions-tp26671.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



SparkDriver throwing java.lang.OutOfMemoryError: Java heap space

2016-04-04 Thread Nirav Patel
Hi,

We are using Spark 1.5.2 and recently started hitting this issue after our
dataset grew from 140GB to 160GB. The error is thrown during the shuffle
fetch on the reduce side, which should all happen on the executors, and the
executors should report it; however, it gets reported only on the driver.
The SparkContext gets shut down from the driver side after this error occurs.

Here's what I see in driver logs.



2016-04-04 03:51:32,889 INFO [sparkDriver-akka.actor.default-dispatcher-17]
org.apache.spark.MapOutputTrackerMasterEndpoint: Asked to send map output
locations for shuffle 3 to hdn3.mycomp:37339
2016-04-04 03:51:32,890 INFO [sparkDriver-akka.actor.default-dispatcher-17]
org.apache.spark.MapOutputTrackerMasterEndpoint: Asked to send map output
locations for shuffle 3 to hdn3.mycomp:57666
2016-04-04 03:51:33,133 INFO [sparkDriver-akka.actor.default-dispatcher-21]
org.apache.spark.storage.BlockManagerInfo: Removed broadcast_12_piece0 on
10.250.70.117:42566 in memory (size: 1939.0 B, free: 232.5 MB)
2016-04-04 03:51:38,432 ERROR
[sparkDriver-akka.actor.default-dispatcher-14]
org.apache.spark.rpc.akka.ErrorMonitor: Uncaught fatal error from thread
[sparkDriver-akka.remote.default-remote-dispatcher-6] shutting down
ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: Java heap space
at
com.google.protobuf.AbstractMessageLite.toByteArray(AbstractMessageLite.java:62)
at
akka.remote.transport.AkkaPduProtobufCodec$.constructMessage(AkkaPduCodec.scala:138)
at akka.remote.EndpointWriter.writeSend(Endpoint.scala:740)
at akka.remote.EndpointWriter$$anonfun$2.applyOrElse(Endpoint.scala:718)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:411)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2016-04-04 03:51:38,432 ERROR
[sparkDriver-akka.actor.default-dispatcher-21] akka.actor.ActorSystemImpl:
Uncaught fatal error from thread
[sparkDriver-akka.remote.default-remote-dispatcher-6] shutting down
ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: Java heap space
at
com.google.protobuf.AbstractMessageLite.toByteArray(AbstractMessageLite.java:62)
at
akka.remote.transport.AkkaPduProtobufCodec$.constructMessage(AkkaPduCodec.scala:138)
at akka.remote.EndpointWriter.writeSend(Endpoint.scala:740)
at akka.remote.EndpointWriter$$anonfun$2.applyOrElse(Endpoint.scala:718)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:411)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2016-04-04 03:51:40,246 ERROR [sparkDriver-akka.actor.default-dispatcher-4]
akka.actor.ActorSystemImpl: Uncaught fatal error from thread
[sparkDriver-akka.remote.default-remote-dispatcher-6] shutting down
ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:2271)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
at
java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
at
java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
at
java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at
akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129)
at
akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
at
akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129)
at 

Fwd: All inclusive uber-jar

2016-04-04 Thread vetal king
-- Forwarded message --
From: vetal king 
Date: Mon, Apr 4, 2016 at 8:59 PM
Subject: Re: All inclusive uber-jar
To: Mich Talebzadeh 


Not sure how to create uber jar using sbt, but this is how you can do it
using maven.


 
  
org.apache.maven.plugins
maven-shade-plugin
2.4.3

  
package

  shade


  

**:**

  

  

  



But instead of creating an uber jar, consider providing maven coordinates
with the help of spark-submit's --packages and --repositories options



On Mon, Apr 4, 2016 at 7:06 PM, Mich Talebzadeh 
wrote:

> Hi,
>
>
> When one builds a project for Spark in this case Spark streaming with SBT,
> as usual I add dependencies as follows:
>
> libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.1"
> libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" %
> "1.6.1"
>
> However when I submit it through spark-submit I need to put the package
> containing KafkaUtils the same way I do it in spark-shell
>
> ${SPARK_HOME}/bin/spark-submit \
>  --jars
> /home/hduser/jars/spark-streaming-kafka-assembly_2.10-1.6.1.jar \
> .
>
> Now if I want to distribute this as all-in-one package so that it can be
> run from any node, I have been told  that I need to create an uber-jar. I
> have not done this before so I assume an uber-jar will be totally self
> contained with all the classes etc.
>
> Can someone elaborate on this please?
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>


Re: RDD Partitions not distributed evenly to executors

2016-04-04 Thread Ted Yu
bq. the modifications do not touch the scheduler

If the changes can be ported over to 1.6.1, do you mind reproducing the
issue there ?

I ask because master branch changes very fast. It would be good to narrow
the scope where the behavior you observed started showing.

On Mon, Apr 4, 2016 at 6:12 AM, Mike Hynes <91m...@gmail.com> wrote:

> [ CC'ing dev list since nearly identical questions have occurred in
> user list recently w/o resolution;
> c.f.:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-work-distribution-among-execs-tt26502.html
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Partitions-are-get-placed-on-the-single-node-tt26597.html
> ]
>
> Hello,
>
> In short, I'm reporting a problem concerning load imbalance of RDD
> partitions across a standalone cluster. Though there are 16 cores
> available per node, certain nodes will have >16 partitions, and some
> will correspondingly have <16 (and even 0).
>
> In more detail: I am running some scalability/performance tests for
> vector-type operations. The RDDs I'm considering are simple block
> vectors of type RDD[(Int,Vector)] for a Breeze vector type. The RDDs
> are generated with a fixed number of elements given by some multiple
> of the available cores, and subsequently hash-partitioned by their
> integer block index.
>
> I have verified that the hash partitioning key distribution, as well
> as the keys themselves, are both correct; the problem is truly that
> the partitions are *not* evenly distributed across the nodes.
>
> For instance, here is a representative output for some stages and
> tasks in an iterative program. This is a very simple test with 2
> nodes, 64 partitions, 32 cores (16 per node), and 2 executors. Two
> examples stages from the stderr log are stages 7 and 9:
> 7,mapPartitions at DummyVector.scala:113,64,1459771364404,1459771365272
> 9,mapPartitions at DummyVector.scala:113,64,1459771364431,1459771365639
>
> When counting the location of the partitions on the compute nodes from
> the stderr logs, however, you can clearly see the imbalance. Examples
> lines are:
> 13627 task 0.0 in stage 7.0 (TID 196,
> himrod-2, partition 0,PROCESS_LOCAL, 3987 bytes)&
> 13628 task 1.0 in stage 7.0 (TID 197,
> himrod-2, partition 1,PROCESS_LOCAL, 3987 bytes)&
> 13629 task 2.0 in stage 7.0 (TID 198,
> himrod-2, partition 2,PROCESS_LOCAL, 3987 bytes)&
>
> Grep'ing the full set of above lines for each hostname, himrod-?,
> shows the problem occurs in each stage. Below is the output, where the
> number of partitions stored on each node is given alongside its
> hostname as in (himrod-?,num_partitions):
> Stage 7: (himrod-1,0) (himrod-2,64)
> Stage 9: (himrod-1,16) (himrod-2,48)
> Stage 12: (himrod-1,0) (himrod-2,64)
> Stage 14: (himrod-1,16) (himrod-2,48)
> The imbalance is also visible when the executor ID is used to count
> the partitions operated on by executors.
>
> I am working off a fairly recent modification of 2.0.0-SNAPSHOT branch
> (but the modifications do not touch the scheduler, and are irrelevant
> for these particular tests). Has something changed radically in 1.6+
> that would make a previously (<=1.5) correct configuration go haywire?
> Have new configuration settings been added of which I'm unaware that
> could lead to this problem?
>
> Please let me know if others in the community have observed this, and
> thank you for your time,
> Mike
>
> -
> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
> For additional commands, e-mail: dev-h...@spark.apache.org
>
>


Re: RDD Partitions not distributed evenly to executors

2016-04-04 Thread Michael Slavitch
Just to be sure:  Has spark-env.sh and spark-defaults.conf been correctly 
propagated to all nodes?  Are they identical?


> On Apr 4, 2016, at 9:12 AM, Mike Hynes <91m...@gmail.com> wrote:
> 
> [ CC'ing dev list since nearly identical questions have occurred in
> user list recently w/o resolution;
> c.f.:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-work-distribution-among-execs-tt26502.html
> http://apache-spark-user-list.1001560.n3.nabble.com/Partitions-are-get-placed-on-the-single-node-tt26597.html
> ]
> 
> Hello,
> 
> In short, I'm reporting a problem concerning load imbalance of RDD
> partitions across a standalone cluster. Though there are 16 cores
> available per node, certain nodes will have >16 partitions, and some
> will correspondingly have <16 (and even 0).
> 
> In more detail: I am running some scalability/performance tests for
> vector-type operations. The RDDs I'm considering are simple block
> vectors of type RDD[(Int,Vector)] for a Breeze vector type. The RDDs
> are generated with a fixed number of elements given by some multiple
> of the available cores, and subsequently hash-partitioned by their
> integer block index.
> 
> I have verified that the hash partitioning key distribution, as well
> as the keys themselves, are both correct; the problem is truly that
> the partitions are *not* evenly distributed across the nodes.
> 
> For instance, here is a representative output for some stages and
> tasks in an iterative program. This is a very simple test with 2
> nodes, 64 partitions, 32 cores (16 per node), and 2 executors. Two
> examples stages from the stderr log are stages 7 and 9:
> 7,mapPartitions at DummyVector.scala:113,64,1459771364404,1459771365272
> 9,mapPartitions at DummyVector.scala:113,64,1459771364431,1459771365639
> 
> When counting the location of the partitions on the compute nodes from
> the stderr logs, however, you can clearly see the imbalance. Examples
> lines are:
> 13627 task 0.0 in stage 7.0 (TID 196,
> himrod-2, partition 0,PROCESS_LOCAL, 3987 bytes)&
> 13628 task 1.0 in stage 7.0 (TID 197,
> himrod-2, partition 1,PROCESS_LOCAL, 3987 bytes)&
> 13629 task 2.0 in stage 7.0 (TID 198,
> himrod-2, partition 2,PROCESS_LOCAL, 3987 bytes)&
> 
> Grep'ing the full set of above lines for each hostname, himrod-?,
> shows the problem occurs in each stage. Below is the output, where the
> number of partitions stored on each node is given alongside its
> hostname as in (himrod-?,num_partitions):
> Stage 7: (himrod-1,0) (himrod-2,64)
> Stage 9: (himrod-1,16) (himrod-2,48)
> Stage 12: (himrod-1,0) (himrod-2,64)
> Stage 14: (himrod-1,16) (himrod-2,48)
> The imbalance is also visible when the executor ID is used to count
> the partitions operated on by executors.
> 
> I am working off a fairly recent modification of 2.0.0-SNAPSHOT branch
> (but the modifications do not touch the scheduler, and are irrelevant
> for these particular tests). Has something changed radically in 1.6+
> that would make a previously (<=1.5) correct configuration go haywire?
> Have new configuration settings been added of which I'm unaware that
> could lead to this problem?
> 
> Please let me know if others in the community have observed this, and
> thank you for your time,
> Mike
> 
> -
> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
> For additional commands, e-mail: dev-h...@spark.apache.org
> 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: All inclusive uber-jar

2016-04-04 Thread Marco Mistroni
Hi
U can use SBT assembly to create uber jar. U should set spark libraries as
'provided' in ur SBT
Hth
Marco
Ps apologies if by any chances I m telling u something u already know
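
A minimal sketch of that setup (the sbt-assembly version is illustrative; the
Spark coordinates are the ones from the quoted message below):

// build.sbt: Spark itself is "provided" so it is not bundled into the fat jar,
// while spark-streaming-kafka is kept so it ends up inside the assembly
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.1" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.1"

// project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")

Running "sbt assembly" then produces a single jar under target/ that can be
handed to spark-submit without the extra --jars flag.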
On 4 Apr 2016 2:36 pm, "Mich Talebzadeh"  wrote:

> Hi,
>
>
> When one builds a project for Spark in this case Spark streaming with SBT,
> as usual I add dependencies as follows:
>
> libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.1"
> libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" %
> "1.6.1"
>
> However when I submit it through spark-submit I need to put the package
> containing KafkaUtils the same way I do it in spark-shell
>
> ${SPARK_HOME}/bin/spark-submit \
>  --jars
> /home/hduser/jars/spark-streaming-kafka-assembly_2.10-1.6.1.jar \
> .
>
> Now if I want to distribute this as all-in-one package so that it can be
> run from any node, I have been told  that I need to create an uber-jar. I
> have not done this before so I assume an uber-jar will be totally self
> contained with all the classes etc.
>
> Can someone elaborate on this please?
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>


All inclusive uber-jar

2016-04-04 Thread Mich Talebzadeh
Hi,


When one builds a project for Spark in this case Spark streaming with SBT,
as usual I add dependencies as follows:

libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.1"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" %
"1.6.1"

However when I submit it through spark-submit I need to put the package
containing KafkaUtils the same way I do it in spark-shell

${SPARK_HOME}/bin/spark-submit \
 --jars
/home/hduser/jars/spark-streaming-kafka-assembly_2.10-1.6.1.jar \
.

Now if I want to distribute this as all-in-one package so that it can be
run from any node, I have been told  that I need to create an uber-jar. I
have not done this before so I assume an uber-jar will be totally self
contained with all the classes etc.

Can someone elaborate on this please?

Thanks

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


RDD Partitions not distributed evenly to executors

2016-04-04 Thread Mike Hynes
[ CC'ing dev list since nearly identical questions have occurred in
user list recently w/o resolution;
c.f.:
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-work-distribution-among-execs-tt26502.html
http://apache-spark-user-list.1001560.n3.nabble.com/Partitions-are-get-placed-on-the-single-node-tt26597.html
]

Hello,

In short, I'm reporting a problem concerning load imbalance of RDD
partitions across a standalone cluster. Though there are 16 cores
available per node, certain nodes will have >16 partitions, and some
will correspondingly have <16 (and even 0).

In more detail: I am running some scalability/performance tests for
vector-type operations. The RDDs I'm considering are simple block
vectors of type RDD[(Int,Vector)] for a Breeze vector type. The RDDs
are generated with a fixed number of elements given by some multiple
of the available cores, and subsequently hash-partitioned by their
integer block index.
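
(For concreteness, a minimal sketch of that kind of setup, assuming a
SparkContext named sc and Breeze dense vectors; this is not the actual test
code:)

import breeze.linalg.DenseVector
import org.apache.spark.HashPartitioner

val numBlocks = 64       // e.g. 2 blocks per core on 32 cores
val blockLen  = 1000     // hypothetical block length
val v = sc.parallelize(0 until numBlocks, numBlocks)
  .map(i => (i, DenseVector.zeros[Double](blockLen)))   // one block per index
  .partitionBy(new HashPartitioner(numBlocks))          // hash on the block index
v.count()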

I have verified that the hash partitioning key distribution, as well
as the keys themselves, are both correct; the problem is truly that
the partitions are *not* evenly distributed across the nodes.

For instance, here is a representative output for some stages and
tasks in an iterative program. This is a very simple test with 2
nodes, 64 partitions, 32 cores (16 per node), and 2 executors. Two
example stages from the stderr log are stages 7 and 9:
7,mapPartitions at DummyVector.scala:113,64,1459771364404,1459771365272
9,mapPartitions at DummyVector.scala:113,64,1459771364431,1459771365639

When counting the location of the partitions on the compute nodes from
the stderr logs, however, you can clearly see the imbalance. Example
lines are:
13627 task 0.0 in stage 7.0 (TID 196,
himrod-2, partition 0,PROCESS_LOCAL, 3987 bytes)&
13628 task 1.0 in stage 7.0 (TID 197,
himrod-2, partition 1,PROCESS_LOCAL, 3987 bytes)&
13629 task 2.0 in stage 7.0 (TID 198,
himrod-2, partition 2,PROCESS_LOCAL, 3987 bytes)&

Grep'ing the full set of above lines for each hostname, himrod-?,
shows the problem occurs in each stage. Below is the output, where the
number of partitions stored on each node is given alongside its
hostname as in (himrod-?,num_partitions):
Stage 7: (himrod-1,0) (himrod-2,64)
Stage 9: (himrod-1,16) (himrod-2,48)
Stage 12: (himrod-1,0) (himrod-2,64)
Stage 14: (himrod-1,16) (himrod-2,48)
The imbalance is also visible when the executor ID is used to count
the partitions operated on by executors.

I am working off a fairly recent modification of the 2.0.0-SNAPSHOT branch
(but the modifications do not touch the scheduler, and are irrelevant
for these particular tests). Has something changed radically in 1.6+
that would make a previously (<=1.5) correct configuration go haywire?
Have new configuration settings been added of which I'm unaware that
could lead to this problem?

Please let me know if others in the community have observed this, and
thank you for your time,
Mike

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Failed to get broadcast_1_piece0 of broadcast_1

2016-04-04 Thread Akhilesh Pathodia
Hi,

I am running Spark jobs on YARN in cluster mode. The job gets its messages
from a Kafka direct stream. I am using broadcast variables and checkpointing
every 30 seconds. When I start the job the first time it runs fine without any
issue. If I kill the job and restart it, it throws the below exception in the
executor upon receiving a message from Kafka:

java.io.IOException: org.apache.spark.SparkException: Failed to get
broadcast_1_piece0 of broadcast_1
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1178)
at 
org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
at 
org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
at 
org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
at 
org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at 
net.juniper.spark.stream.LogDataStreamProcessor$2.call(LogDataStreamProcessor.java:177)
at 
net.juniper.spark.stream.LogDataStreamProcessor$2.call(LogDataStreamProcessor.java:1)
at 
org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$1$1.apply(JavaDStreamLike.scala:172)
at 
org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$1$1.apply(JavaDStreamLike.scala:172)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at 
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at 
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at 
org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1298)
at 
org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1298)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1850)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1850)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Does anyone have an idea how to resolve this error?

Spark version: 1.5.0
CDH 5.5.1

Thanks,
AKhilesh


Spark 1.6.1 binary pre-built for Hadoop 2.6 may be broken

2016-04-04 Thread Kousuke Saruta
Hi all,

I noticed the binary pre-built for Hadoop 2.6 which we can download from
spark.apache.org/downloads.html (Direct Download) may be broken.
I couldn't decompress at least the following 4 tgzs with the "tar xfzv" command,
and the md5 checksums didn't match.

* spark-1.6.1-bin-hadoop2.6.tgz
* spark-1.6.1-bin-hadoop2.4.tgz
* spark-1.6.1-bin-hadoop2.3.tgz
* spark-1.6.1-bin-cdh4.tgz

The following 3 tgzs were decompressed successfully.

* spark-1.6.1-bin-hadoop1.tgz
* spark-1.6.1-bin-without-hadoop.tgz
* spark-1.6.1.tgz

Regards,
Kousuke


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re:

2016-04-04 Thread Akhil Das
1 core with 4 partitions means it executes them one by one, not in parallel. For
the Kafka question, if you don't have a higher data volume then you may not
need 40 partitions.
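
(A small illustration, not taken from the thread: with a single local core the
four tasks of the stage run one after another, one task per partition.)

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(
  new SparkConf().setMaster("local[1]").setAppName("partition-demo"))
sc.parallelize(1 to 8, 4)
  .mapPartitionsWithIndex { (idx, it) =>
    // all four partitions print from the same task thread, one by one
    println(s"partition $idx on ${Thread.currentThread().getName}")
    it
  }
  .count()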

Thanks
Best Regards

On Sat, Apr 2, 2016 at 7:35 PM, Hemalatha A <
hemalatha.amru...@googlemail.com> wrote:

> Hello,
>
> As per Spark programming guide, it says "we should have 2-4 partitions for
> each CPU in your cluster.". In this case how does 1 CPU core process 2-4
> partitions at the same time?
> Link - http://spark.apache.org/docs/latest/programming-guide.html (under
> Rdd section)
>
> Does it do context switching between tasks or run them in parallel? If it
> does context switching how is it efficient compared to 1:1 partition vs
> Core?
>
> PS: If we are using Kafka direct API  in which kafka partitions=  Rdd
> partitions. Does that mean we should give 40 kafka partitions for 10 CPU
> Cores?
>
> --
>
>
> Regards
> Hemalatha
>


Re: Read Parquet in Java Spark

2016-04-04 Thread Akhil Das
I didn't know you had a parquet file containing json data.

Thanks
Best Regards

On Mon, Apr 4, 2016 at 2:44 PM, Ramkumar V  wrote:

> Hi Akhil,
>
> Thanks for your help. Why do you put separator as "," ?
>
> I have a parquet file which contains only json in each line.
>
> *Thanks*,
> 
>
>
> On Mon, Apr 4, 2016 at 2:34 PM, Akhil Das 
> wrote:
>
>> Something like this (in scala):
>>
>> val rdd = parquetFile.javaRDD().map(row => row.mkString(","))
>>
>> You can create a map operation over your javaRDD to convert the
>> org.apache.spark.sql.Row
>> 
>> to String (the Row.mkString() operation)
>>
>> Thanks
>> Best Regards
>>
>> On Mon, Apr 4, 2016 at 12:02 PM, Ramkumar V 
>> wrote:
>>
>>> Any idea on this ? How to convert parquet file into JavaRDD ?
>>>
>>> *Thanks*,
>>> 
>>>
>>>
>>> On Thu, Mar 31, 2016 at 4:30 PM, Ramkumar V 
>>> wrote:
>>>
 Hi,

 Thanks for the reply.  I tried this. It's returning JavaRDD
 instead of JavaRDD. How to get JavaRDD ?

 Error :
 incompatible types:
 org.apache.spark.api.java.JavaRDD cannot be
 converted to org.apache.spark.api.java.JavaRDD





 *Thanks*,
 


 On Thu, Mar 31, 2016 at 2:57 PM, UMESH CHAUDHARY 
 wrote:

> From Spark Documentation:
>
> DataFrame parquetFile = sqlContext.read().parquet("people.parquet");
>
> JavaRDD jRDD= parquetFile.javaRDD()
>
> javaRDD() method will convert the DF to RDD
>
> On Thu, Mar 31, 2016 at 2:51 PM, Ramkumar V 
> wrote:
>
>> Hi,
>>
>> I'm trying to read parquet log files in Java Spark. Parquet log files
>> are stored in hdfs. I want to read and convert that parquet file into
>> JavaRDD. I could able to find Sqlcontext dataframe api. How can I read if
>> it is sparkcontext and rdd ? what is the best way to read it ?
>>
>> *Thanks*,
>> 
>>
>>
>

>>>
>>
>


Re: Read Parquet in Java Spark

2016-04-04 Thread Ramkumar V
Hi Akhil,

Thanks for your help. Why do you use "," as the separator?

I have a parquet file which contains only json in each line.

*Thanks*,



On Mon, Apr 4, 2016 at 2:34 PM, Akhil Das 
wrote:

> Something like this (in scala):
>
> val rdd = parquetFile.javaRDD().map(row => row.mkString(","))
>
> You can create a map operation over your javaRDD to convert the
> org.apache.spark.sql.Row
> 
> to String (the Row.mkString() operation)
>
> Thanks
> Best Regards
>
> On Mon, Apr 4, 2016 at 12:02 PM, Ramkumar V 
> wrote:
>
>> Any idea on this ? How to convert parquet file into JavaRDD ?
>>
>> *Thanks*,
>> 
>>
>>
>> On Thu, Mar 31, 2016 at 4:30 PM, Ramkumar V 
>> wrote:
>>
>>> Hi,
>>>
>>> Thanks for the reply.  I tried this. It's returning JavaRDD instead
>>> of JavaRDD. How to get JavaRDD ?
>>>
>>> Error :
>>> incompatible types:
>>> org.apache.spark.api.java.JavaRDD cannot be
>>> converted to org.apache.spark.api.java.JavaRDD
>>>
>>>
>>>
>>>
>>>
>>> *Thanks*,
>>> 
>>>
>>>
>>> On Thu, Mar 31, 2016 at 2:57 PM, UMESH CHAUDHARY 
>>> wrote:
>>>
 From Spark Documentation:

 DataFrame parquetFile = sqlContext.read().parquet("people.parquet");

 JavaRDD jRDD= parquetFile.javaRDD()

 javaRDD() method will convert the DF to RDD

 On Thu, Mar 31, 2016 at 2:51 PM, Ramkumar V 
 wrote:

> Hi,
>
> I'm trying to read parquet log files in Java Spark. Parquet log files
> are stored in hdfs. I want to read and convert that parquet file into
> JavaRDD. I could able to find Sqlcontext dataframe api. How can I read if
> it is sparkcontext and rdd ? what is the best way to read it ?
>
> *Thanks*,
> 
>
>

>>>
>>
>


Re: Read Parquet in Java Spark

2016-04-04 Thread Akhil Das
Something like this (in scala):

val rdd = parquetFile.javaRDD().map(row => row.mkString(","))

You can create a map operation over your javaRDD to convert the
org.apache.spark.sql.Row to String (the Row.mkString() operation).
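
A slightly fuller sketch (the path and column layout are hypothetical) that
ends up with the JavaRDD[String] asked about:

import org.apache.spark.api.java.JavaRDD

val df = sqlContext.read.parquet("hdfs:///path/to/logs.parquet")
// convert each Row to a String, then wrap the RDD for the Java API
val strings: JavaRDD[String] = df.rdd.map(row => row.mkString(",")).toJavaRDD()

If each row really holds a single JSON string column, row.getString(0) (or
simply df.toJSON) may be closer to what you want than joining columns with ",".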

Thanks
Best Regards

On Mon, Apr 4, 2016 at 12:02 PM, Ramkumar V  wrote:

> Any idea on this ? How to convert parquet file into JavaRDD ?
>
> *Thanks*,
> 
>
>
> On Thu, Mar 31, 2016 at 4:30 PM, Ramkumar V 
> wrote:
>
>> Hi,
>>
>> Thanks for the reply.  I tried this. It's returning JavaRDD instead
>> of JavaRDD. How to get JavaRDD ?
>>
>> Error :
>> incompatible types:
>> org.apache.spark.api.java.JavaRDD cannot be
>> converted to org.apache.spark.api.java.JavaRDD
>>
>>
>>
>>
>>
>> *Thanks*,
>> 
>>
>>
>> On Thu, Mar 31, 2016 at 2:57 PM, UMESH CHAUDHARY 
>> wrote:
>>
>>> From Spark Documentation:
>>>
>>> DataFrame parquetFile = sqlContext.read().parquet("people.parquet");
>>>
>>> JavaRDD jRDD= parquetFile.javaRDD()
>>>
>>> javaRDD() method will convert the DF to RDD
>>>
>>> On Thu, Mar 31, 2016 at 2:51 PM, Ramkumar V 
>>> wrote:
>>>
 Hi,

 I'm trying to read parquet log files in Java Spark. Parquet log files
 are stored in hdfs. I want to read and convert that parquet file into
 JavaRDD. I could able to find Sqlcontext dataframe api. How can I read if
 it is sparkcontext and rdd ? what is the best way to read it ?

 *Thanks*,
 


>>>
>>
>


Re: How many threads will be used to read RDBMS after set numPartitions =10 in Spark JDBC

2016-04-04 Thread Mich Talebzadeh
This all depends on whether you provide information to the driver about the
underlying RDBMS table; assuming there is a unique ID column on the underlying
table, you can use it to partition the load.

Have a look at this

http://metricbrew.com/get-data-from-databases-with-apache-spark-jdbc/
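
A minimal sketch (connection details are made up): numPartitions only leads to
parallel reads when a partition column and its bounds are supplied as well, in
which case Spark opens up to numPartitions concurrent JDBC connections, one
per partition:

val props = new java.util.Properties()
props.setProperty("user", "dbuser")          // hypothetical credentials
props.setProperty("password", "secret")

val df = sqlContext.read.jdbc(
  "jdbc:mysql://dbhost:3306/mydb",           // hypothetical URL
  "orders",                                  // hypothetical table
  "id",                                      // partitionColumn: numeric and evenly spread
  1L,                                        // lowerBound
  1000000L,                                  // upperBound
  10,                                        // numPartitions
  props)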

HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 4 April 2016 at 09:09, Zhang, Jingyu  wrote:

> Hi All,
>
> I want read Mysql from Spark. Please let me know how many threads will be
> used to read the RDBMS after set numPartitions =10 in Spark JDBC. What is
> the best practice to read large dataset from RDBMS to Spark?
>
> Thanks,
>
> Jingyu
>


How many threads will be used to read RDBMS after set numPartitions =10 in Spark JDBC

2016-04-04 Thread Zhang, Jingyu
Hi All,

I want to read MySQL from Spark. Please let me know how many threads will be
used to read the RDBMS after setting numPartitions = 10 in Spark JDBC. What is
the best practice to read a large dataset from an RDBMS into Spark?

Thanks,

Jingyu

-- 
This message and its attachments may contain legally privileged or 
confidential information. It is intended solely for the named addressee. If 
you are not the addressee indicated in this message or responsible for 
delivery of the message to the addressee, you may not copy or deliver this 
message or its attachments to anyone. Rather, you should permanently delete 
this message and its attachments and kindly notify the sender by reply 
e-mail. Any content of this message and its attachments which does not 
relate to the official business of the sending company must be taken not to 
have been sent or endorsed by that company or any of its related entities. 
No warranty is made that the e-mail or attachments are free from computer 
virus or other defect.


Re: Where to set properties for the retainedJobs/Stages?

2016-04-04 Thread Max Schmidt
Okay, I put the props in spark-defaults.conf, but they are not recognized,
as they don't appear in the 'Environment' tab during an application
execution.

spark.eventLog.enabled for example.
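
(For reference, a minimal sketch assuming the context is created directly from
code: when the application is not launched through spark-submit,
spark-defaults.conf is not read automatically, so the properties have to be
set on the SparkConf itself; the Java SparkConf API is analogous.)

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("my-app")                              // hypothetical app name
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", "hdfs:///spark-logs")   // hypothetical log location
  .set("spark.ui.retainedJobs", "1000")
  .set("spark.ui.retainedStages", "1000")
val sc = new SparkContext(conf)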

Am 01.04.2016 um 21:22 schrieb Ted Yu:
> Please
> read 
> https://spark.apache.org/docs/latest/configuration.html#dynamically-loading-spark-properties
> w.r.t. spark-defaults.conf
>
> On Fri, Apr 1, 2016 at 12:06 PM, Max Schmidt  > wrote:
>
> Yes, but the doc doesn't say for which process the configs
> are valid, so do I have to set them for the history-server? The
> daemon? The workers?
>
> And what if I use the java API instead of spark-submit for the jobs?
>
> I guess that the spark-defaults.conf are obsolete for the java API?
>
>
> Am 2016-04-01 18:58, schrieb Ted Yu:
>
> You can set them in spark-defaults.conf
>
> See
> also https://spark.apache.org/docs/latest/configuration.html#spark-ui
> [1]
>
> On Fri, Apr 1, 2016 at 8:26 AM, Max Schmidt  > wrote:
>
> Can somebody tell me the interaction between the properties:
>
> spark.ui.retainedJobs
> spark.ui.retainedStages
> spark.history.retainedApplications
>
> I know from the bugtracker, that the last one describes
> the number of
> applications the history-server holds in memory.
>
> Can I set the properties in the spark-env.sh? And where?
>
>
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> 
> For additional commands, e-mail:
> user-h...@spark.apache.org 
>
>
>
>
> Links:
> --
> [1]
> https://spark.apache.org/docs/latest/configuration.html#spark-ui
>
>
>
>
>

-- 
*Max Schmidt, Senior Java Developer* | m...@datapath.io
 | LinkedIn

Datapath.io
 
Decreasing AWS latency.
Your traffic optimized.

Datapath.io GmbH
Mainz | HRB Nr. 46222
Sebastian Spies, CEO



Re: Read Parquet in Java Spark

2016-04-04 Thread Ramkumar V
Any idea on this? How to convert the parquet file into JavaRDD<String>?

*Thanks*,



On Thu, Mar 31, 2016 at 4:30 PM, Ramkumar V  wrote:

> Hi,
>
> Thanks for the reply. I tried this. It's returning JavaRDD<Row> instead
> of JavaRDD<String>. How to get JavaRDD<String>?
>
> Error :
> incompatible types:
> org.apache.spark.api.java.JavaRDD<org.apache.spark.sql.Row> cannot be
> converted to org.apache.spark.api.java.JavaRDD<java.lang.String>
>
>
>
>
>
> *Thanks*,
> 
>
>
> On Thu, Mar 31, 2016 at 2:57 PM, UMESH CHAUDHARY 
> wrote:
>
>> From Spark Documentation:
>>
>> DataFrame parquetFile = sqlContext.read().parquet("people.parquet");
>>
>> JavaRDD<Row> jRDD = parquetFile.javaRDD()
>>
>> javaRDD() method will convert the DF to RDD
>>
>> On Thu, Mar 31, 2016 at 2:51 PM, Ramkumar V 
>> wrote:
>>
>>> Hi,
>>>
>>> I'm trying to read parquet log files in Java Spark. Parquet log files
>>> are stored in HDFS. I want to read and convert that parquet file into
>>> JavaRDD<String>. I was only able to find the SQLContext DataFrame API. How can
>>> I read it if I only have a SparkContext and RDDs? What is the best way to read it?
>>>
>>> *Thanks*,
>>> 
>>>
>>>
>>
>