Spark streaming - S3 textFileStream - How to get the current file name?

2016-04-01 Thread Natu Lauchande
Hi,

I am using Spark Streaming with the input strategy of watching for
files in S3 directories, using the textFileStream method on the streaming
context.

The filename contains information relevant to my pipeline manipulation. I
wonder if there is a more robust way to get this name other than capturing
RDD debug information and parsing the logs.
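One workaround I have seen (a sketch that leans on Spark 1.6 internals, so treat it accordingly; the bucket path is a placeholder) is to switch from textFileStream to fileStream and pull the path out of each partition's input split:

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.{FileSplit, TextInputFormat}
import org.apache.spark.rdd.{NewHadoopRDD, UnionRDD}

val stream = ssc.fileStream[LongWritable, Text, TextInputFormat]("s3n://bucket/dir")

// Each batch RDD is a union of one NewHadoopRDD per new file; ask each
// partition for its input split to recover the file name.
val linesWithFile = stream.transform { rdd =>
  val perFile = rdd.dependencies.map(_.rdd).collect {
    case hrdd: NewHadoopRDD[LongWritable @unchecked, Text @unchecked] =>
      hrdd.mapPartitionsWithInputSplit { (split, iter) =>
        val file = split.asInstanceOf[FileSplit].getPath.toString
        iter.map { case (_, line) => (file, line.toString) }
      }
  }
  new UnionRDD(rdd.context, perFile)
}
```

NewHadoopRDD.mapPartitionsWithInputSplit and UnionRDD are DeveloperApi, so this may break across Spark versions.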

Thanks,
Natu


partitioned parquet tables

2016-04-01 Thread Imran Akbar
Hi,

I'm reading in a CSV file, and I would like to write it back as a permanent
table, but with partitioning by year, etc.
Currently I do this:

from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)
df =
sqlContext.read.format('com.databricks.spark.csv').options(header='true',
inferschema='true').load('/Users/imran/Downloads/intermediate.csv')
df.saveAsTable("intermediate")

Which works great.

I also know I can do this:
df.write.partitionBy("year").parquet("path/to/output")

But how do I combine the two, to save a permanent table with partitioning,
in Parquet format?
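For the record, the two can be combined through the DataFrameWriter chain (a sketch against the Spark 1.6 PySpark API, untested here; it needs the HiveContext you already have for a persistent partitioned table):

```python
from pyspark.sql import HiveContext

sqlContext = HiveContext(sc)
df = (sqlContext.read.format('com.databricks.spark.csv')
      .options(header='true', inferschema='true')
      .load('/Users/imran/Downloads/intermediate.csv'))

# Chain the writer: partition columns first, then the storage format,
# then persist as a managed table in the metastore.
(df.write
   .partitionBy("year")
   .format("parquet")
   .saveAsTable("intermediate"))
```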

thanks,
imran


Re: Problem with jackson lib running on spark

2016-04-01 Thread Ted Yu
Thanks for sharing the workaround.

Perhaps send a PR to the tranquility GitHub :-)
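For reference, the kind of exclusion discussed below looks like this in a pom (the tranquility coordinates and version here are assumptions, not taken from Marcelo's build):

```xml
<dependency>
  <groupId>io.druid</groupId>
  <artifactId>tranquility-core_2.10</artifactId>
  <version>0.7.4</version>
  <exclusions>
    <!-- Drop the bundled jackson-databind so a single copy is on the classpath -->
    <exclusion>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
    </exclusion>
  </exclusions>
</dependency>
```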

On Fri, Apr 1, 2016 at 12:50 PM, Marcelo Oikawa  wrote:

> Hi, list.
>
> Just to close the thread. Unfortunately, I didn't solve the jackson lib
> problem, but I found a workaround that works fine for me. Perhaps this
> helps someone else.
>
> The problem arose from this line, when I tried to create the tranquilizer
> object (used to connect to Druid) using the *fromConfig* utility.
>
> tranquilizer = 
> DruidBeams.fromConfig(dataSourceConfig).buildTranquilizer(tranquilizerBuider);
>
> Instead, I create the tranquilizer object by object, as shown below:
>
> DruidDimensions dimensions = getDimensions(props);
> List<AggregatorFactory> aggregators = getAggregations(props);
>
> TimestampSpec timestampSpec = new TimestampSpec("timestamp", "auto", null);
> Timestamper<Map<String, Object>> timestamper = map -> new 
> DateTime(map.get("timestamp"));
> DruidLocation druidLocation = DruidLocation.create("overlord", 
> "druid:firehose:%s", dataSource);
> DruidRollup druidRollup = DruidRollup.create(dimensions, aggregators, 
> QueryGranularity.ALL);
>
>
> ClusteredBeamTuning clusteredBeamTuning = ClusteredBeamTuning.builder()
> 
> .segmentGranularity(Granularity.FIVE_MINUTE)
> .windowPeriod(new 
> Period("PT60m"))
> .partitions(1)
> .replicants(1)
> .build();
>
> tranquilizer = DruidBeams.builder(timestamper)
>.curator(buildCurator(props))
>.discoveryPath("/druid/discovery")
>.location(druidLocation)
>.timestampSpec(timestampSpec)
>.rollup(druidRollup)
>.tuning(clusteredBeamTuning)
>.buildTranquilizer();
>
> tranquilizer.start();
>
> That worked for me. Thank you Ted, Alonso and other users.
>
>
> On Thu, Mar 31, 2016 at 4:08 PM, Marcelo Oikawa <
> marcelo.oik...@webradar.com> wrote:
>
>>
>> Please exclude jackson-databind - that was where the AnnotationMap class
>>> comes from.
>>>
>>
>> I tried as you suggested but I'm getting the same error. It seems strange
>> because when I inspect the generated jar there is nothing related to
>> AnnotationMap, but there is a databind there.
>>
>>
>>
>>
>>
>>>
>>> On Thu, Mar 31, 2016 at 11:37 AM, Marcelo Oikawa <
>>> marcelo.oik...@webradar.com> wrote:
>>>
 Hi, Alonso.

 As you can see jackson-core is provided by several libraries, try to
> exclude it from spark-core, i think the minor version is included within
> it.
>

 There is no more than one jackson-core provided by spark-core. There
 are jackson-core and jackson-core-asl, but those are different artifacts. BTW,
 I tried to exclude them, but with no success. Same error:

 java.lang.IllegalAccessError: tried to access method
 com.fasterxml.jackson.databind.introspect.AnnotatedMember.getAllAnnotations()Lcom/fasterxml/jackson/databind/introspect/AnnotationMap;
 from class
 com.fasterxml.jackson.databind.introspect.GuiceAnnotationIntrospector
 at
 com.fasterxml.jackson.databind.introspect.GuiceAnnotationIntrospector.findInjectableValueId(GuiceAnnotationIntrospector.java:39)
 at
 com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair.findInjectableValueId(AnnotationIntrospectorPair.java:269)
 at
 com.fasterxml.jackson.databind.deser.BasicDeserializerFactory._addDeserializerConstructors(BasicDeserializerFactory.java:433)
 ...

 I guess the problem is an incompatibility between the jackson artifacts
 that come from the tranquility dependency vs. those provided by Spark, but I
 also tried to find the same jackson artifacts in different versions, and
 there are none. What is missing?


 Use this guide to see how to do it:
>
>
> https://maven.apache.org/guides/introduction/introduction-to-optional-and-excludes-dependencies.html
>
>
>
> Alonso Isidoro Roman.
>
> Mis citas preferidas (de hoy) :
> "Si depurar es el proceso de quitar los errores de software, entonces
> programar debe ser el proceso de introducirlos..."
>  -  Edsger Dijkstra
>
> My favorite quotes (today):
> "If debugging is the process of removing software bugs, then
> programming must be the process of putting ..."
>   - Edsger Dijkstra
>
> "If you pay peanuts you get monkeys"
>
>
> 2016-03-31 20:01 GMT+02:00 Marcelo Oikawa  >:
>
>> Hey, Alonso.
>>
>> here is the output:
>>
>> [INFO] spark-processor:spark-processor-druid:jar:1.0-SNAPSHOT
>> [INFO] +- org.apache.spark:spark-streaming_2.10:jar:1.6.1:provided
>> [INFO] |  +- 

Introducing Spark User Group in Korea & Question on creating non-software goods (stickers)

2016-04-01 Thread Kevin (Sangwoo) Kim
Hi all!

I'm Kevin, one of the contributors of Spark, and I'm organizing the Spark
User Group in Korea. We have 2,500 members in the community, and it's
growing even faster today.
https://www.facebook.com/groups/sparkkoreauser/
(Sorry, it's all Korean.)

My co-organizer Sanghoon Lee (phoenixl...@gmail.com) is managing a big
event on April 14th. It will be an event of 300+ people, and we will have
Doug Cutting (creator of Hadoop) at the event, as well as Dr. Kim, author of
DeepSpark: Spark-Based Deep Learning.
This is the page for the event (sorry, it's all Korean again):
http://onoffmix.com/event/65057

My one question is: are we able to use the Spark logo for printing some
stickers? (And where can we get the logo file if we can?)
I imagine it would be a simple small logo sticker, and of course it would be
free for participants of the event and Spark user groups.
The cost of the stickers will be sponsored by SK C, one of the big companies
in Korea, which Mr. Lee works for, and the sponsorship is totally
non-commercial.

Any feedback is welcome for a successful community and event!

Regards,
Kevin


spark-shell with different username

2016-04-01 Thread Matt Tenenbaum
Hello all —

tl;dr: I’m having an issue running spark-shell from my laptop (or other
non-cluster-affiliated machine), and I think the issue boils down to
usernames. Can I convince spark/scala that I’m someone other than $USER?

A bit of background: our cluster is CDH 5.4.8, installed with Cloudera
Manager 5.5. We use LDAP, and my login on all hadoop-affiliated machines
(including the gateway boxes we use for running scheduled work) is
‘matt.tenenbaum’. When I run spark-shell on one of those machines,
everything is fine:

[matt.tenenbaum@remote-machine ~]$ HADOOP_CONF_DIR=/etc/hadoop/conf
SPARK_HOME=spark-1.6.0-bin-hadoop2.6
spark-1.6.0-bin-hadoop2.6/bin/spark-shell --master yarn --deploy-mode
client

Everything starts up correctly, I get a scala prompt, the SparkContext and
SQL context are correctly initialized, and I’m off to the races:

16/04/01 23:27:00 INFO session.SessionState: Created local directory:
/tmp/35b58974-dad5-43c6-9864-43815d101ca0_resources
16/04/01 23:27:00 INFO session.SessionState: Created HDFS directory:
/tmp/hive/matt.tenenbaum/35b58974-dad5-43c6-9864-43815d101ca0
16/04/01 23:27:00 INFO session.SessionState: Created local directory:
/tmp/matt.tenenbaum/35b58974-dad5-43c6-9864-43815d101ca0
16/04/01 23:27:00 INFO session.SessionState: Created HDFS directory:
/tmp/hive/matt.tenenbaum/35b58974-dad5-43c6-9864-43815d101ca0/_tmp_space.db
16/04/01 23:27:00 INFO repl.SparkILoop: Created sql context (with Hive
support)..
SQL context available as sqlContext.

scala> 1 + 41
res0: Int = 42

scala> sc
res1: org.apache.spark.SparkContext = org.apache.spark.SparkContext@4e9bd2c8

I am running 1.6 from a downloaded tgz file, rather than the spark-shell
made available to the cluster from CDH. I can copy that tgz to my laptop,
grab a copy of the cluster configurations, and in a perfect world I would
then be able to run everything in the same way:

[matt@laptop ~]$ HADOOP_CONF_DIR=path/to/hadoop/conf
SPARK_HOME=spark-1.6.0-bin-hadoop2.6
spark-1.6.0-bin-hadoop2.6/bin/spark-shell --master yarn --deploy-mode
client

Notice there are two things that are different:

   1. My local username on my laptop is ‘matt’, which does not match my
   name on the remote machine.
   2. The Hadoop configs live somewhere other than /etc/hadoop/conf

Alas, #1 proves fatal because of cluster permissions (there is no
/user/matt/ in HDFS, and ‘matt’ is not a valid LDAP user). In the
initialization logging output, I can see it fail in the expected way:

16/04/01 16:37:19 INFO yarn.Client: Setting up container launch
context for our AM
16/04/01 16:37:19 INFO yarn.Client: Setting up the launch environment
for our AM container
16/04/01 16:37:19 INFO yarn.Client: Preparing resources for our AM container
16/04/01 16:37:20 WARN util.NativeCodeLoader: Unable to load
native-hadoop library for your platform... using builtin-java classes
where applicable
16/04/01 16:37:21 ERROR spark.SparkContext: Error initializing SparkContext.
org.apache.hadoop.security.AccessControlException: Permission denied:
user=matt, access=WRITE, inode="/user":hdfs:supergroup:drwxr-xr-x
at 
org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:257)
at 
org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:238)
at (... etc ...)

Fine. In other circumstances I’ve told Hadoop explicitly who I am by
setting HADOOP_USER_NAME. Maybe that works here?

[matt@laptop ~]$ HADOOP_USER_NAME=matt.tenenbaum
HADOOP_CONF_DIR=soma-conf SPARK_HOME=spark-1.6.0-bin-hadoop2.6
spark-1.6.0-bin-hadoop2.6/bin/spark-shell --master yarn --deploy-mode
client

Eventually that fails too, but not for the same reason. Setting
HADOOP_USER_NAME is sufficient to allow initialization to get past the
access-control problems, and I can see it request a new application from
the cluster

16/04/01 16:43:08 INFO yarn.Client: Will allocate AM container, with
896 MB memory including 384 MB overhead
16/04/01 16:43:08 INFO yarn.Client: Setting up container launch
context for our AM
16/04/01 16:43:08 INFO yarn.Client: Setting up the launch environment
for our AM container
16/04/01 16:43:08 INFO yarn.Client: Preparing resources for our AM container
... [resource uploads happen here] ...
16/04/01 16:46:16 INFO spark.SecurityManager: Changing view acls to:
matt,matt.tenenbaum
16/04/01 16:46:16 INFO spark.SecurityManager: Changing modify acls to:
matt,matt.tenenbaum
16/04/01 16:46:16 INFO spark.SecurityManager: SecurityManager:
authentication disabled; ui acls disabled; users with view
permissions: Set(matt, matt.tenenbaum); users with modify permissions:
Set(matt, matt.tenenbaum)
16/04/01 16:46:16 INFO yarn.Client: Submitting application 30965 to
ResourceManager
16/04/01 16:46:16 INFO impl.YarnClientImpl: Submitted application
application_1451332794331_30965
16/04/01 16:46:17 INFO yarn.Client: Application report for
application_1451332794331_30965 (state: ACCEPTED)
16/04/01 

Re: [Spark SQL]: UDF with Array[Double] as input

2016-04-01 Thread Michael Armbrust
What error are you getting?  Here is an example.

External types are documented here:
http://spark.apache.org/docs/latest/sql-programming-guide.html#data-types
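In case it helps the archive: the usual trip-up is that an ArrayType(DoubleType) column reaches a Scala UDF as Seq[Double], not Array[Double]. A minimal sketch (`df` and its `values` column are assumed names):

```scala
import org.apache.spark.sql.functions.udf

// An ArrayType(DoubleType) column arrives in a Scala UDF as Seq[Double];
// declaring the parameter as Array[Double] is what typically fails.
val sumValues = udf((xs: Seq[Double]) => xs.sum)

val withTotal = df.withColumn("total", sumValues(df("values")))
```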

On Fri, Apr 1, 2016 at 1:59 PM, Jerry Lam  wrote:

> Hi spark users and developers,
>
> Has anyone tried to pass an Array[Double] as an input to a UDF? I tried
> for many hours reading the Spark SQL code, but I still couldn't figure out
> a way to do this.
>
> Best Regards,
>
> Jerry
>


Re: Eliminating shuffle write and spill disk IO reads/writes in Spark

2016-04-01 Thread Saisai Shao
So I think a ramdisk is a simple way to try.

Besides, I think Reynold's suggestion is quite valid: with such a high-end
machine, putting everything in memory might not improve performance as much
as assumed, since the bottleneck will shift elsewhere, e.g. memory bandwidth,
NUMA, CPU efficiency (serialization-deserialization, data processing...).
The code design should also take such a usage scenario into account, to use
resources more efficiently.

Thanks
Saisai
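For the archive, a ramdisk setup along the lines suggested might look like this (mount point and size are placeholders, not taken from the thread):

```shell
# On each worker node: create a tmpfs-backed ramdisk (size is a placeholder)
sudo mkdir -p /mnt/spark-ramdisk
sudo mount -t tmpfs -o size=64g tmpfs /mnt/spark-ramdisk

# Then point Spark's scratch space at it, e.g. in conf/spark-defaults.conf:
#   spark.local.dir   /mnt/spark-ramdisk
```

Shuffle files then live in RAM at the cost of counting against system memory.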

On Sat, Apr 2, 2016 at 7:27 AM, Michael Slavitch  wrote:

> Yes we see it on final write.  Our preference is to eliminate this.
>
>
> On Fri, Apr 1, 2016, 7:25 PM Saisai Shao  wrote:
>
>> Hi Michael, shuffle data (mapper output) have to be materialized into
>> disk finally, no matter how large memory you have, it is the design purpose
>> of Spark. In you scenario, since you have a big memory, shuffle spill
>> should not happen frequently, most of the disk IO you see might be final
>> shuffle file write.
>>
>> So if you want to avoid this disk IO, you could use ramdisk as Reynold
>> suggested. If you want to avoid FS overhead of ramdisk, you could try to
>> hack a new shuffle implementation, since shuffle framework is pluggable.
>>
>>
>> On Sat, Apr 2, 2016 at 6:48 AM, Michael Slavitch 
>> wrote:
>>
>>> As I mentioned earlier this flag is now ignored.
>>>
>>>
>>> On Fri, Apr 1, 2016, 6:39 PM Michael Slavitch 
>>> wrote:
>>>
 Shuffling a 1tb set of keys and values (aka sort by key)  results in
 about 500gb of io to disk if compression is enabled. Is there any way to
 eliminate shuffling causing io?

 On Fri, Apr 1, 2016, 6:32 PM Reynold Xin  wrote:

> Michael - I'm not sure if you actually read my email, but spill has
> nothing to do with the shuffle files on disk. It was for the partitioning
> (i.e. sorting) process. If that flag is off, Spark will just run out of
> memory when data doesn't fit in memory.
>
>
> On Fri, Apr 1, 2016 at 3:28 PM, Michael Slavitch 
> wrote:
>
>> RAMdisk is a fine interim step but there is a lot of layers
>> eliminated by keeping things in memory unless there is need for 
>> spillover.
>>   At one time there was support for turning off spilling.  That was
>> eliminated.  Why?
>>
>>
>> On Fri, Apr 1, 2016, 6:05 PM Mridul Muralidharan 
>> wrote:
>>
>>> I think Reynold's suggestion of using ram disk would be a good way to
>>> test if these are the bottlenecks or something else is.
>>> For most practical purposes, pointing local dir to ramdisk should
>>> effectively give you 'similar' performance as shuffling from memory.
>>>
>>> Are there concerns with taking that approach to test ? (I dont see
>>> any, but I am not sure if I missed something).
>>>
>>>
>>> Regards,
>>> Mridul
>>>
>>>
>>>
>>>
>>> On Fri, Apr 1, 2016 at 2:10 PM, Michael Slavitch 
>>> wrote:
>>> > I totally disagree that it’s not a problem.
>>> >
>>> > - Network fetch throughput on 40G Ethernet exceeds the throughput
>>> of NVME
>>> > drives.
>>> > - What Spark is depending on is Linux’s IO cache as an effective
>>> buffer pool
>>> > This is fine for small jobs but not for jobs with datasets in the
>>> TB/node
>>> > range.
>>> > - On larger jobs flushing the cache causes Linux to block.
>>> > - On a modern 56-hyperthread 2-socket host the latency caused by
>>> multiple
>>> > executors writing out to disk increases greatly.
>>> >
>>> > I thought the whole point of Spark was in-memory computing?  It’s
>>> in fact
>>> > in-memory for some things but  use spark.local.dir as a buffer
>>> pool of
>>> > others.
>>> >
>>> > Hence, the performance of  Spark is gated by the performance of
>>> > spark.local.dir, even on large memory systems.
>>> >
>>> > "Currently it is not possible to not write shuffle files to disk.”
>>> >
>>> > What changes >would< make it possible?
>>> >
>>> > The only one that seems possible is to clone the shuffle service
>>> and make it
>>> > in-memory.
>>> >
>>> >
>>> >
>>> >
>>> >
>>> > On Apr 1, 2016, at 4:57 PM, Reynold Xin 
>>> wrote:
>>> >
>>> > spark.shuffle.spill actually has nothing to do with whether we
>>> write shuffle
>>> > files to disk. Currently it is not possible to not write shuffle
>>> files to
>>> > disk, and typically it is not a problem because the network fetch
>>> throughput
>>> > is lower than what disks can sustain. In most cases, especially
>>> with SSDs,
>>> > there is little difference between putting all of those in memory
>>> and on
>>> > disk.
>>> >
>>> > However, it is becoming more 

Re: Spark streaming issue

2016-04-01 Thread Mich Talebzadeh
OK, I managed to make this work.

All I am interested in is receiving messages from the topic every minute. No
filtering yet, just the full text:

import _root_.kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka.KafkaUtils
//
val sparkConf = new SparkConf().
 setAppName("StreamTest").
 setMaster("local[12]").
 set("spark.driver.allowMultipleContexts", "true").
 set("spark.hadoop.validateOutputSpecs", "false")
val ssc = new StreamingContext(sparkConf, Seconds(60))
val kafkaParams = Map[String, String]("bootstrap.servers" ->
"rhes564:9092", "schema.registry.url" -> "http://rhes564:8081",
"zookeeper.connect" -> "rhes564:2181", "group.id" -> "StreamTest" )
val topic = Set("newtopic")
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder,
StringDecoder](ssc, kafkaParams, topic)
messages.print()
ssc.start()


---
Time: 145955454 ms
---
(null,Sat Apr 2 00:33:01 BST 2016  === Sending messages from rhes5)
(null,1,'a7UkW5ZRaI_V8oRiPUNx0on6E06Ikr8_ILOxhVpgt6IoXXq2fF9ssYuJYcr49Cj4yp3nY9k8sHtIi_7XjltTVzqJ33beV2hIaqAj',101)
(null,2,'dnFxOkOibbKLR5m3CIeS3rhwn8hCiaZAfEaD7yXi6M7jXcvaFYBjClLDoNMEVgfLZVgJ9tXchqlGX44FmvhnarLFrtJNbTb1C6j4',102)
(null,3,'M9pvIOKMhaI_mSE3ExlovZWIxBE66KNEWGIGtCJF1qr_dGJX5sFKqLLa3Qv8aN2lCLi3lnGnMtqeZYBqE5YD586Vw50WWjL7ncZA',103)
(null,4,'9EROPf_dJZpdAHmBubTRxEUkvC9S_Xnll5bWmX0xcOPk7l4TGXPgEqxpUP52QG6pUIn74mvwWqF9vzZ2ZhsmV6WPOmUAw4Ub_nFU',104)
(null,5,'BLIi9a_n7Pfyc7r3nfzKfaNRa4Hmd9NlHEVDPkQS4xbgUWqU2bJeI6b8b1IMoStnmjMHhYLtFf4TQyJcpn85PSwFksggNVnQl1oL',105)
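Since createDirectStream yields (key, value) pairs, filtering from here is just a matter of mapping out the payload before the context is started (a sketch continuing the session above; the predicate is only an illustration):

```scala
// Declare these before ssc.start(): keep only the payloads (keys are null)
val lines = messages.map(_._2)

// Then any DStream operation applies, e.g. a simple substring filter:
lines.filter(_.contains("Sending messages")).print()
```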



Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 1 April 2016 at 23:26, Mich Talebzadeh  wrote:

> I adopted this approach
>
> scala> val conf = new SparkConf().
>  |  setAppName("StreamTest").
>  |  setMaster("local[12]").
>  |  set("spark.driver.allowMultipleContexts", "true").
>  |  set("spark.hadoop.validateOutputSpecs", "false")
> conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@321d96f7
> scala> val ssc = new StreamingContext(conf, Seconds(60))
> ssc: org.apache.spark.streaming.StreamingContext =
> org.apache.spark.streaming.StreamingContext@5dbae9eb
> scala> val kafkaParams = Map("metadata.broker.list" -> "rhes564:9092")
> kafkaParams: scala.collection.immutable.Map[String,String] =
> Map(metadata.broker.list -> rhes564:9092)
> scala> val topics = Set("newtopic")
> topics: scala.collection.immutable.Set[String] = Set(newtopic)
> scala> val stream = KafkaUtils.createDirectStream(ssc, kafkaParams, topics)
> stream: org.apache.spark.streaming.dstream.InputDStream[(Nothing,
> Nothing)] =
> org.apache.spark.streaming.kafka.DirectKafkaInputDStream@6d2d3b21
>
> So that opens data stream. What next?
>
> Thanks
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 1 April 2016 at 22:37, Mich Talebzadeh 
> wrote:
>
>> yes I noticed that
>>
>> scala> val kafkaStream = KafkaUtils.createStream(ssc, "rhes564:2181",
>> "rhes564:9092", "newtopic", 1)
>>
>> :52: error: overloaded method value createStream with
>> alternatives:
>>   (jssc:
>> org.apache.spark.streaming.api.java.JavaStreamingContext,zkQuorum:
>> String,groupId: String,topics: java.util.Map[String,Integer],storageLevel:
>> org.apache.spark.storage.StorageLevel)org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream[String,String]
>> 
>>   (ssc: org.apache.spark.streaming.StreamingContext,zkQuorum:
>> String,groupId: String,topics:
>> scala.collection.immutable.Map[String,Int],storageLevel:
>> org.apache.spark.storage.StorageLevel)org.apache.spark.streaming.dstream.ReceiverInputDStream[(String,
>> String)]
>>  cannot be applied to (org.apache.spark.streaming.StreamingContext,
>> String, String, String, Int)
>>  val kafkaStream = KafkaUtils.createStream(ssc, "rhes564:2181",
>> "rhes564:9092", "newtopic", 1)
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 1 April 2016 at 22:25, Cody Koeninger  wrote:
>>
>>> You're not passing valid Scala values.  rhes564:2181  without quotes
>>> isn't a valid literal, newtopic isn't a list of strings, etc.
>>>
>>> On Fri, Apr 

Re: Eliminating shuffle write and spill disk IO reads/writes in Spark

2016-04-01 Thread Michael Slavitch
Yes we see it on final write.  Our preference is to eliminate this.

On Fri, Apr 1, 2016, 7:25 PM Saisai Shao  wrote:

> Hi Michael, shuffle data (mapper output) have to be materialized into disk
> finally, no matter how large memory you have, it is the design purpose of
> Spark. In you scenario, since you have a big memory, shuffle spill should
> not happen frequently, most of the disk IO you see might be final shuffle
> file write.
>
> So if you want to avoid this disk IO, you could use ramdisk as Reynold
> suggested. If you want to avoid FS overhead of ramdisk, you could try to
> hack a new shuffle implementation, since shuffle framework is pluggable.
>
>
> On Sat, Apr 2, 2016 at 6:48 AM, Michael Slavitch 
> wrote:
>
>> As I mentioned earlier this flag is now ignored.
>>
>>
>> On Fri, Apr 1, 2016, 6:39 PM Michael Slavitch  wrote:
>>
>>> Shuffling a 1tb set of keys and values (aka sort by key)  results in
>>> about 500gb of io to disk if compression is enabled. Is there any way to
>>> eliminate shuffling causing io?
>>>
>>> On Fri, Apr 1, 2016, 6:32 PM Reynold Xin  wrote:
>>>
 Michael - I'm not sure if you actually read my email, but spill has
 nothing to do with the shuffle files on disk. It was for the partitioning
 (i.e. sorting) process. If that flag is off, Spark will just run out of
 memory when data doesn't fit in memory.


 On Fri, Apr 1, 2016 at 3:28 PM, Michael Slavitch 
 wrote:

> RAMdisk is a fine interim step but there is a lot of layers eliminated
> by keeping things in memory unless there is need for spillover.   At one
> time there was support for turning off spilling.  That was eliminated.
> Why?
>
>
> On Fri, Apr 1, 2016, 6:05 PM Mridul Muralidharan 
> wrote:
>
>> I think Reynold's suggestion of using ram disk would be a good way to
>> test if these are the bottlenecks or something else is.
>> For most practical purposes, pointing local dir to ramdisk should
>> effectively give you 'similar' performance as shuffling from memory.
>>
>> Are there concerns with taking that approach to test ? (I dont see
>> any, but I am not sure if I missed something).
>>
>>
>> Regards,
>> Mridul
>>
>>
>>
>>
>> On Fri, Apr 1, 2016 at 2:10 PM, Michael Slavitch 
>> wrote:
>> > I totally disagree that it’s not a problem.
>> >
>> > - Network fetch throughput on 40G Ethernet exceeds the throughput
>> of NVME
>> > drives.
>> > - What Spark is depending on is Linux’s IO cache as an effective
>> buffer pool
>> > This is fine for small jobs but not for jobs with datasets in the
>> TB/node
>> > range.
>> > - On larger jobs flushing the cache causes Linux to block.
>> > - On a modern 56-hyperthread 2-socket host the latency caused by
>> multiple
>> > executors writing out to disk increases greatly.
>> >
>> > I thought the whole point of Spark was in-memory computing?  It’s
>> in fact
>> > in-memory for some things but  use spark.local.dir as a buffer pool
>> of
>> > others.
>> >
>> > Hence, the performance of  Spark is gated by the performance of
>> > spark.local.dir, even on large memory systems.
>> >
>> > "Currently it is not possible to not write shuffle files to disk.”
>> >
>> > What changes >would< make it possible?
>> >
>> > The only one that seems possible is to clone the shuffle service
>> and make it
>> > in-memory.
>> >
>> >
>> >
>> >
>> >
>> > On Apr 1, 2016, at 4:57 PM, Reynold Xin 
>> wrote:
>> >
>> > spark.shuffle.spill actually has nothing to do with whether we
>> write shuffle
>> > files to disk. Currently it is not possible to not write shuffle
>> files to
>> > disk, and typically it is not a problem because the network fetch
>> throughput
>> > is lower than what disks can sustain. In most cases, especially
>> with SSDs,
>> > there is little difference between putting all of those in memory
>> and on
>> > disk.
>> >
>> > However, it is becoming more common to run Spark on a few number of
>> beefy
>> > nodes (e.g. 2 nodes each with 1TB of RAM). We do want to look into
>> improving
>> > performance for those. Meantime, you can setup local ramdisks on
>> each node
>> > for shuffle writes.
>> >
>> >
>> >
>> > On Fri, Apr 1, 2016 at 11:32 AM, Michael Slavitch <
>> slavi...@gmail.com>
>> > wrote:
>> >>
>> >> Hello;
>> >>
>> >> I’m working on spark with very large memory systems (2TB+) and
>> notice that
>> >> Spark spills to disk in shuffle.  Is there a way to force spark to
>> stay in
>> >> memory when doing shuffle 

Re: Eliminating shuffle write and spill disk IO reads/writes in Spark

2016-04-01 Thread Saisai Shao
Hi Michael, shuffle data (mapper output) has to be materialized to disk
eventually, no matter how much memory you have; that is part of Spark's
design. In your scenario, since you have a lot of memory, shuffle spill
should not happen frequently, and most of the disk IO you see is probably
the final shuffle file write.

So if you want to avoid this disk IO, you could use a ramdisk as Reynold
suggested. If you want to avoid the FS overhead of a ramdisk, you could try
to write a new shuffle implementation, since the shuffle framework is
pluggable.


On Sat, Apr 2, 2016 at 6:48 AM, Michael Slavitch  wrote:

> As I mentioned earlier this flag is now ignored.
>
>
> On Fri, Apr 1, 2016, 6:39 PM Michael Slavitch  wrote:
>
>> Shuffling a 1tb set of keys and values (aka sort by key)  results in
>> about 500gb of io to disk if compression is enabled. Is there any way to
>> eliminate shuffling causing io?
>>
>> On Fri, Apr 1, 2016, 6:32 PM Reynold Xin  wrote:
>>
>>> Michael - I'm not sure if you actually read my email, but spill has
>>> nothing to do with the shuffle files on disk. It was for the partitioning
>>> (i.e. sorting) process. If that flag is off, Spark will just run out of
>>> memory when data doesn't fit in memory.
>>>
>>>
>>> On Fri, Apr 1, 2016 at 3:28 PM, Michael Slavitch 
>>> wrote:
>>>
 RAMdisk is a fine interim step but there is a lot of layers eliminated
 by keeping things in memory unless there is need for spillover.   At one
 time there was support for turning off spilling.  That was eliminated.
 Why?


 On Fri, Apr 1, 2016, 6:05 PM Mridul Muralidharan 
 wrote:

> I think Reynold's suggestion of using ram disk would be a good way to
> test if these are the bottlenecks or something else is.
> For most practical purposes, pointing local dir to ramdisk should
> effectively give you 'similar' performance as shuffling from memory.
>
> Are there concerns with taking that approach to test ? (I dont see
> any, but I am not sure if I missed something).
>
>
> Regards,
> Mridul
>
>
>
>
> On Fri, Apr 1, 2016 at 2:10 PM, Michael Slavitch 
> wrote:
> > I totally disagree that it’s not a problem.
> >
> > - Network fetch throughput on 40G Ethernet exceeds the throughput of
> NVME
> > drives.
> > - What Spark is depending on is Linux’s IO cache as an effective
> buffer pool
> > This is fine for small jobs but not for jobs with datasets in the
> TB/node
> > range.
> > - On larger jobs flushing the cache causes Linux to block.
> > - On a modern 56-hyperthread 2-socket host the latency caused by
> multiple
> > executors writing out to disk increases greatly.
> >
> > I thought the whole point of Spark was in-memory computing?  It’s in
> fact
> > in-memory for some things but  use spark.local.dir as a buffer pool
> of
> > others.
> >
> > Hence, the performance of  Spark is gated by the performance of
> > spark.local.dir, even on large memory systems.
> >
> > "Currently it is not possible to not write shuffle files to disk.”
> >
> > What changes >would< make it possible?
> >
> > The only one that seems possible is to clone the shuffle service and
> make it
> > in-memory.
> >
> >
> >
> >
> >
> > On Apr 1, 2016, at 4:57 PM, Reynold Xin  wrote:
> >
> > spark.shuffle.spill actually has nothing to do with whether we write
> shuffle
> > files to disk. Currently it is not possible to not write shuffle
> files to
> > disk, and typically it is not a problem because the network fetch
> throughput
> > is lower than what disks can sustain. In most cases, especially with
> SSDs,
> > there is little difference between putting all of those in memory
> and on
> > disk.
> >
> > However, it is becoming more common to run Spark on a few number of
> beefy
> > nodes (e.g. 2 nodes each with 1TB of RAM). We do want to look into
> improving
> > performance for those. Meantime, you can setup local ramdisks on
> each node
> > for shuffle writes.
> >
> >
> >
> > On Fri, Apr 1, 2016 at 11:32 AM, Michael Slavitch <
> slavi...@gmail.com>
> > wrote:
> >>
> >> Hello;
> >>
> >> I’m working on spark with very large memory systems (2TB+) and
> notice that
> >> Spark spills to disk in shuffle.  Is there a way to force spark to
> stay in
> >> memory when doing shuffle operations?   The goal is to keep the
> shuffle data
> >> either in the heap or in off-heap memory (in 1.6.x) and never touch
> the IO
> >> subsystem.  I am willing to have the job fail if it runs out of RAM.
> >>
> >> spark.shuffle.spill true  is deprecated in 1.6 and does 

Re: [SQL] A bug with withColumn?

2016-04-01 Thread Jacek Laskowski
On Thu, Mar 31, 2016 at 5:47 PM, Jacek Laskowski  wrote:

> It means that it's not only possible to rename a column using
> withColumnRenamed, but also replace the content of a column (in one
> shot) using withColumn with an existing column name. I can live with
> that :)

Hi,

Not only can I live with that, but I used it a few times today
because it was so nice! I used it when I needed to `cast` a column,
and I could do it with just `withColumn`. Awesome! You should've seen
the faces of the people I presented the solution to :)
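For the archive, the pattern being described is simply (column name and target type assumed):

```scala
// withColumn with an existing column name replaces that column,
// so this casts "amount" to double in one shot.
val casted = df.withColumn("amount", df("amount").cast("double"))
```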

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Metrics : Why is the Sink class declared private[spark] ?

2016-04-01 Thread Saisai Shao
There's a JIRA (https://issues.apache.org/jira/browse/SPARK-14151) about
it, please take a look.

Thanks
Saisai
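Until that change lands, a workaround people use (unsupported, since it leans on Spark internals) is to declare the sink inside Spark's own package so that the private[spark] trait is visible; the class below is a do-nothing sketch:

```scala
// Piggyback on Spark's package so private[spark] members are accessible.
package org.apache.spark.metrics.sink

import java.util.Properties
import com.codahale.metrics.MetricRegistry
import org.apache.spark.SecurityManager

// Sinks are constructed reflectively with exactly these three arguments.
class MyCustomSink(
    val property: Properties,
    val registry: MetricRegistry,
    securityMgr: SecurityManager) extends Sink {

  override def start(): Unit = ()   // schedule a reporter here
  override def stop(): Unit = ()    // tear the reporter down
  override def report(): Unit = ()  // one-shot flush, used on shutdown
}
```

It would then be wired up in metrics.properties as, e.g., `*.sink.custom.class=org.apache.spark.metrics.sink.MyCustomSink`.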

On Sat, Apr 2, 2016 at 6:48 AM, Walid Lezzar  wrote:

> Hi,
>
> I looked into the Spark code to see how Spark reports metrics using the
> MetricsSystem class. I've seen that the MetricsSystem class, when
> instantiated, parses the metrics.properties file, tries to find the sink
> class names, and loads them dynamically. It would be great to implement my
> own sink by inheriting from the org.apache.spark.metrics.sink.Sink class,
> but unfortunately this class has been declared private[spark], so it is
> not possible to inherit from it! Why is that? Is this going to change in
> future Spark versions?
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Spark Metrics : Why is the Sink class declared private[spark] ?

2016-04-01 Thread Walid Lezzar
Hi,

I looked into the Spark code to see how Spark reports metrics using the
MetricsSystem class. I've seen that the MetricsSystem class, when
instantiated, parses the metrics.properties file, tries to find the sink
class names, and loads them dynamically. It would be great to implement my
own sink by inheriting from the org.apache.spark.metrics.sinks.Sink class,
but unfortunately this class has been declared private[spark], so it is not
possible to inherit from it! Why is that? Is this going to change in future
Spark versions?
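
Until the Sink trait is opened up, the built-in sinks remain reachable
through conf/metrics.properties; a minimal sketch (directory and period are
assumptions):

```properties
# Route metrics from all instances to the built-in CSV sink.
*.sink.csv.class=org.apache.spark.metrics.sink.CsvSink
*.sink.csv.period=10
*.sink.csv.unit=seconds
*.sink.csv.directory=/tmp/spark-metrics
```

A commonly cited workaround is to declare a custom sink inside the
org.apache.spark.metrics.sink package so the private[spark] restriction does
not apply, though that obviously leans on Spark internals and may break
across versions.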
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Eliminating shuffle write and spill disk IO reads/writes in Spark

2016-04-01 Thread Michael Slavitch
Shuffling a 1tb set of keys and values (aka sort by key)  results in about
500gb of io to disk if compression is enabled. Is there any way to
eliminate shuffling causing io?

On Fri, Apr 1, 2016, 6:32 PM Reynold Xin  wrote:

> Michael - I'm not sure if you actually read my email, but spill has
> nothing to do with the shuffle files on disk. It was for the partitioning
> (i.e. sorting) process. If that flag is off, Spark will just run out of
> memory when data doesn't fit in memory.
>
>
> On Fri, Apr 1, 2016 at 3:28 PM, Michael Slavitch 
> wrote:
>
>> RAMdisk is a fine interim step but there is a lot of layers eliminated by
>> keeping things in memory unless there is need for spillover.   At one time
>> there was support for turning off spilling.  That was eliminated.  Why?
>>
>>
>> On Fri, Apr 1, 2016, 6:05 PM Mridul Muralidharan 
>> wrote:
>>
>>> I think Reynold's suggestion of using ram disk would be a good way to
>>> test if these are the bottlenecks or something else is.
>>> For most practical purposes, pointing local dir to ramdisk should
>>> effectively give you 'similar' performance as shuffling from memory.
>>>
>>> Are there concerns with taking that approach to test ? (I dont see
>>> any, but I am not sure if I missed something).
>>>
>>>
>>> Regards,
>>> Mridul
>>>
>>>
>>>
>>>
>>> On Fri, Apr 1, 2016 at 2:10 PM, Michael Slavitch 
>>> wrote:
>>> > I totally disagree that it’s not a problem.
>>> >
>>> > - Network fetch throughput on 40G Ethernet exceeds the throughput of
>>> NVME
>>> > drives.
>>> > - What Spark is depending on is Linux’s IO cache as an effective
>>> buffer pool
>>> > This is fine for small jobs but not for jobs with datasets in the
>>> TB/node
>>> > range.
>>> > - On larger jobs flushing the cache causes Linux to block.
>>> > - On a modern 56-hyperthread 2-socket host the latency caused by
>>> multiple
>>> > executors writing out to disk increases greatly.
>>> >
>>> > I thought the whole point of Spark was in-memory computing?  It’s in
>>> fact
>>> > in-memory for some things but  use spark.local.dir as a buffer pool of
>>> > others.
>>> >
>>> > Hence, the performance of  Spark is gated by the performance of
>>> > spark.local.dir, even on large memory systems.
>>> >
>>> > "Currently it is not possible to not write shuffle files to disk.”
>>> >
>>> > What changes >would< make it possible?
>>> >
>>> > The only one that seems possible is to clone the shuffle service and
>>> make it
>>> > in-memory.
>>> >
>>> >
>>> >
>>> >
>>> >
>>> > On Apr 1, 2016, at 4:57 PM, Reynold Xin  wrote:
>>> >
>>> > spark.shuffle.spill actually has nothing to do with whether we write
>>> shuffle
>>> > files to disk. Currently it is not possible to not write shuffle files
>>> to
>>> > disk, and typically it is not a problem because the network fetch
>>> throughput
>>> > is lower than what disks can sustain. In most cases, especially with
>>> SSDs,
>>> > there is little difference between putting all of those in memory and
>>> on
>>> > disk.
>>> >
>>> > However, it is becoming more common to run Spark on a few number of
>>> beefy
>>> > nodes (e.g. 2 nodes each with 1TB of RAM). We do want to look into
>>> improving
>>> > performance for those. Meantime, you can setup local ramdisks on each
>>> node
>>> > for shuffle writes.
>>> >
>>> >
>>> >
>>> > On Fri, Apr 1, 2016 at 11:32 AM, Michael Slavitch 
>>> > wrote:
>>> >>
>>> >> Hello;
>>> >>
>>> >> I’m working on spark with very large memory systems (2TB+) and notice
>>> that
>>> >> Spark spills to disk in shuffle.  Is there a way to force spark to
>>> stay in
>>> >> memory when doing shuffle operations?   The goal is to keep the
>>> shuffle data
>>> >> either in the heap or in off-heap memory (in 1.6.x) and never touch
>>> the IO
>>> >> subsystem.  I am willing to have the job fail if it runs out of RAM.
>>> >>
>>> >> spark.shuffle.spill true  is deprecated in 1.6 and does not work in
>>> >> Tungsten sort in 1.5.x
>>> >>
>>> >> "WARN UnsafeShuffleManager: spark.shuffle.spill was set to false, but
>>> this
>>> >> is ignored by the tungsten-sort shuffle manager; its optimized
>>> shuffles will
>>> >> continue to spill to disk when necessary.”
>>> >>
>>> >> If this is impossible via configuration changes what code changes
>>> would be
>>> >> needed to accomplish this?
>>> >>
>>> >>
>>> >>
>>> >>
>>> >>
>>> >> -
>>> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> >> For additional commands, e-mail: user-h...@spark.apache.org
>>> >>
>>> >
>>> >
>>>
>> --
>> Michael Slavitch
>> 62 Renfrew Ave.
>> Ottawa Ontario
>> K1S 1Z5
>>
>
> --
Michael Slavitch
62 Renfrew Ave.
Ottawa Ontario
K1S 1Z5


Re: Eliminating shuffle write and spill disk IO reads/writes in Spark

2016-04-01 Thread Reynold Xin
It's spark.local.dir.
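
For reference, pointing it at a tmpfs mount looks roughly like this (mount
point and size are assumptions, and the mount needs root):

```shell
# Create and mount a RAM-backed filesystem for shuffle files.
sudo mkdir -p /mnt/spark-ramdisk
sudo mount -t tmpfs -o size=200g tmpfs /mnt/spark-ramdisk

# Then point Spark at it, e.g. in conf/spark-defaults.conf:
#   spark.local.dir  /mnt/spark-ramdisk
```

Size the tmpfs to comfortably hold the shuffle working set; tmpfs pages
that don't fit will themselves swap or fail allocation.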


On Fri, Apr 1, 2016 at 3:37 PM, Yong Zhang  wrote:

> Is there a configuration in the Spark of location of "shuffle spilling"? I
> didn't recall ever see that one. Can you share it out?
>
> It will be good for a test writing to RAM Disk if that configuration is
> available.
>
> Thanks
>
> Yong
>
> --
> From: r...@databricks.com
> Date: Fri, 1 Apr 2016 15:32:23 -0700
> Subject: Re: Eliminating shuffle write and spill disk IO reads/writes in
> Spark
> To: slavi...@gmail.com
> CC: mri...@gmail.com; d...@spark.apache.org; user@spark.apache.org
>
>
> Michael - I'm not sure if you actually read my email, but spill has
> nothing to do with the shuffle files on disk. It was for the partitioning
> (i.e. sorting) process. If that flag is off, Spark will just run out of
> memory when data doesn't fit in memory.
>
>
> On Fri, Apr 1, 2016 at 3:28 PM, Michael Slavitch 
> wrote:
>
> RAMdisk is a fine interim step but there is a lot of layers eliminated by
> keeping things in memory unless there is need for spillover.   At one time
> there was support for turning off spilling.  That was eliminated.  Why?
>
>
> On Fri, Apr 1, 2016, 6:05 PM Mridul Muralidharan  wrote:
>
> I think Reynold's suggestion of using ram disk would be a good way to
> test if these are the bottlenecks or something else is.
> For most practical purposes, pointing local dir to ramdisk should
> effectively give you 'similar' performance as shuffling from memory.
>
> Are there concerns with taking that approach to test ? (I dont see
> any, but I am not sure if I missed something).
>
>
> Regards,
> Mridul
>
>
>
>
> On Fri, Apr 1, 2016 at 2:10 PM, Michael Slavitch 
> wrote:
> > I totally disagree that it’s not a problem.
> >
> > - Network fetch throughput on 40G Ethernet exceeds the throughput of NVME
> > drives.
> > - What Spark is depending on is Linux’s IO cache as an effective buffer
> pool
> > This is fine for small jobs but not for jobs with datasets in the TB/node
> > range.
> > - On larger jobs flushing the cache causes Linux to block.
> > - On a modern 56-hyperthread 2-socket host the latency caused by multiple
> > executors writing out to disk increases greatly.
> >
> > I thought the whole point of Spark was in-memory computing?  It’s in fact
> > in-memory for some things but  use spark.local.dir as a buffer pool of
> > others.
> >
> > Hence, the performance of  Spark is gated by the performance of
> > spark.local.dir, even on large memory systems.
> >
> > "Currently it is not possible to not write shuffle files to disk.”
> >
> > What changes >would< make it possible?
> >
> > The only one that seems possible is to clone the shuffle service and
> make it
> > in-memory.
> >
> >
> >
> >
> >
> > On Apr 1, 2016, at 4:57 PM, Reynold Xin  wrote:
> >
> > spark.shuffle.spill actually has nothing to do with whether we write
> shuffle
> > files to disk. Currently it is not possible to not write shuffle files to
> > disk, and typically it is not a problem because the network fetch
> throughput
> > is lower than what disks can sustain. In most cases, especially with
> SSDs,
> > there is little difference between putting all of those in memory and on
> > disk.
> >
> > However, it is becoming more common to run Spark on a few number of beefy
> > nodes (e.g. 2 nodes each with 1TB of RAM). We do want to look into
> improving
> > performance for those. Meantime, you can setup local ramdisks on each
> node
> > for shuffle writes.
> >
> >
> >
> > On Fri, Apr 1, 2016 at 11:32 AM, Michael Slavitch 
> > wrote:
> >>
> >> Hello;
> >>
> >> I’m working on spark with very large memory systems (2TB+) and notice
> that
> >> Spark spills to disk in shuffle.  Is there a way to force spark to stay
> in
> >> memory when doing shuffle operations?   The goal is to keep the shuffle
> data
> >> either in the heap or in off-heap memory (in 1.6.x) and never touch the
> IO
> >> subsystem.  I am willing to have the job fail if it runs out of RAM.
> >>
> >> spark.shuffle.spill true  is deprecated in 1.6 and does not work in
> >> Tungsten sort in 1.5.x
> >>
> >> "WARN UnsafeShuffleManager: spark.shuffle.spill was set to false, but
> this
> >> is ignored by the tungsten-sort shuffle manager; its optimized shuffles
> will
> >> continue to spill to disk when necessary.”
> >>
> >> If this is impossible via configuration changes what code changes would
> be
> >> needed to accomplish this?
> >>
> >>
> >>
> >>
> >>
> >> -
> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >> For additional commands, e-mail: user-h...@spark.apache.org
> >>
> >
> >
>
> --
> Michael Slavitch
> 62 Renfrew Ave.
> Ottawa Ontario
> K1S 1Z5
>
>
>


RE: Eliminating shuffle write and spill disk IO reads/writes in Spark

2016-04-01 Thread Yong Zhang
Is there a configuration in the Spark of location of "shuffle spilling"? I 
didn't recall ever see that one. Can you share it out?
It will be good for a test writing to RAM Disk if that configuration is 
available.
Thanks
Yong

From: r...@databricks.com
Date: Fri, 1 Apr 2016 15:32:23 -0700
Subject: Re: Eliminating shuffle write and spill disk IO reads/writes in Spark
To: slavi...@gmail.com
CC: mri...@gmail.com; d...@spark.apache.org; user@spark.apache.org

Michael - I'm not sure if you actually read my email, but spill has nothing to 
do with the shuffle files on disk. It was for the partitioning (i.e. sorting) 
process. If that flag is off, Spark will just run out of memory when data 
doesn't fit in memory. 

On Fri, Apr 1, 2016 at 3:28 PM, Michael Slavitch  wrote:
RAMdisk is a fine interim step but there is a lot of layers eliminated by 
keeping things in memory unless there is need for spillover.   At one time 
there was support for turning off spilling.  That was eliminated.  Why? 

On Fri, Apr 1, 2016, 6:05 PM Mridul Muralidharan  wrote:
I think Reynold's suggestion of using ram disk would be a good way to
test if these are the bottlenecks or something else is.
For most practical purposes, pointing local dir to ramdisk should
effectively give you 'similar' performance as shuffling from memory.

Are there concerns with taking that approach to test ? (I dont see
any, but I am not sure if I missed something).


Regards,
Mridul




On Fri, Apr 1, 2016 at 2:10 PM, Michael Slavitch  wrote:
> I totally disagree that it’s not a problem.
>
> - Network fetch throughput on 40G Ethernet exceeds the throughput of NVME
> drives.
> - What Spark is depending on is Linux’s IO cache as an effective buffer pool
> This is fine for small jobs but not for jobs with datasets in the TB/node
> range.
> - On larger jobs flushing the cache causes Linux to block.
> - On a modern 56-hyperthread 2-socket host the latency caused by multiple
> executors writing out to disk increases greatly.
>
> I thought the whole point of Spark was in-memory computing?  It’s in fact
> in-memory for some things but  use spark.local.dir as a buffer pool of
> others.
>
> Hence, the performance of  Spark is gated by the performance of
> spark.local.dir, even on large memory systems.
>
> "Currently it is not possible to not write shuffle files to disk.”
>
> What changes >would< make it possible?
>
> The only one that seems possible is to clone the shuffle service and make it
> in-memory.
>
>
>
>
>
> On Apr 1, 2016, at 4:57 PM, Reynold Xin  wrote:
>
> spark.shuffle.spill actually has nothing to do with whether we write shuffle
> files to disk. Currently it is not possible to not write shuffle files to
> disk, and typically it is not a problem because the network fetch throughput
> is lower than what disks can sustain. In most cases, especially with SSDs,
> there is little difference between putting all of those in memory and on
> disk.
>
> However, it is becoming more common to run Spark on a few number of beefy
> nodes (e.g. 2 nodes each with 1TB of RAM). We do want to look into improving
> performance for those. Meantime, you can setup local ramdisks on each node
> for shuffle writes.
>
>
>
> On Fri, Apr 1, 2016 at 11:32 AM, Michael Slavitch 
> wrote:
>>
>> Hello;
>>
>> I’m working on spark with very large memory systems (2TB+) and notice that
>> Spark spills to disk in shuffle.  Is there a way to force spark to stay in
>> memory when doing shuffle operations?   The goal is to keep the shuffle data
>> either in the heap or in off-heap memory (in 1.6.x) and never touch the IO
>> subsystem.  I am willing to have the job fail if it runs out of RAM.
>>
>> spark.shuffle.spill true  is deprecated in 1.6 and does not work in
>> Tungsten sort in 1.5.x
>>
>> "WARN UnsafeShuffleManager: spark.shuffle.spill was set to false, but this
>> is ignored by the tungsten-sort shuffle manager; its optimized shuffles will
>> continue to spill to disk when necessary.”
>>
>> If this is impossible via configuration changes what code changes would be
>> needed to accomplish this?
>>
>>
>>
>>
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>
>

-- 
Michael Slavitch
62 Renfrew Ave.
Ottawa Ontario
K1S 1Z5

Re: Scala: Perform Unit Testing in spark

2016-04-01 Thread Shishir Anshuman
When I added *"org.apache.spark" % "spark-core_2.10" % "1.6.0"*, shouldn't it
already include spark-core_2.10-1.6.1-tests.jar?
Why do I need to add the jar file explicitly?

And how do I use the jars for compiling with *sbt* and running the tests on
spark?
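
For what it's worth, the published spark-core artifact is the main jar only;
the test classes ship as separately published "tests" artifacts that sbt can
fetch with a classifier. Roughly (versions and the scalatest line are
assumptions):

```scala
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"  % "1.6.1",
  "org.apache.spark" %% "spark-mllib" % "1.6.1",
  // Test utilities (SparkFunSuite, MLlibTestSparkContext, ...) live in
  // the separately published tests jars:
  "org.apache.spark" %% "spark-core"  % "1.6.1" % "test" classifier "tests",
  "org.apache.spark" %% "spark-mllib" % "1.6.1" % "test" classifier "tests",
  "org.scalatest"    %% "scalatest"   % "2.2.6" % "test"
)
```

With those in place, `sbt test` compiles everything under src/test/scala and
runs the suites.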


On Sat, Apr 2, 2016 at 3:46 AM, Ted Yu  wrote:

> You need to include the following jars:
>
> jar tvf ./core/target/spark-core_2.10-1.6.1-tests.jar | grep SparkFunSuite
>   1787 Thu Mar 03 09:06:14 PST 2016
> org/apache/spark/SparkFunSuite$$anonfun$withFixture$1.class
>   1780 Thu Mar 03 09:06:14 PST 2016
> org/apache/spark/SparkFunSuite$$anonfun$withFixture$2.class
>   3982 Thu Mar 03 09:06:14 PST 2016 org/apache/spark/SparkFunSuite.class
>
> jar tvf ./mllib/target/spark-mllib_2.10-1.6.1-tests.jar | grep
> MLlibTestSparkContext
>   1447 Thu Mar 03 09:53:54 PST 2016
> org/apache/spark/mllib/util/MLlibTestSparkContext.class
>   1704 Thu Mar 03 09:53:54 PST 2016
> org/apache/spark/mllib/util/MLlibTestSparkContext$class.class
>
> On Fri, Apr 1, 2016 at 3:07 PM, Shishir Anshuman <
> shishiranshu...@gmail.com> wrote:
>
>> I got the file ALSSuite.scala and am trying to run it. I have copied the
>> file under *src/test/scala *in my project folder. When I run *sbt test*,
>> I get errors. I have attached a screenshot of the errors. Before *sbt
>> test*, I am building the package with *sbt package*.
>>
>> Dependencies of *simple.sbt*:
>>
>>>
>>>
>>>
>>>
>>> *libraryDependencies ++= Seq( "org.apache.spark" % "spark-core_2.10" %
>>> "1.6.0", "org.apache.spark" % "spark-mllib_2.10" % "1.6.0" )*
>>
>>
>>
>>
>> On Sat, Apr 2, 2016 at 2:21 AM, Ted Yu  wrote:
>>
>>> Assuming your code is written in Scala, I would suggest using ScalaTest.
>>>
>>> Please take a look at the XXSuite.scala files under mllib/
>>>
>>> On Fri, Apr 1, 2016 at 1:31 PM, Shishir Anshuman <
>>> shishiranshu...@gmail.com> wrote:
>>>
 Hello,

 I have a code written in scala using Mllib. I want to perform unit
 testing it. I cant decide between Junit 4 and ScalaTest.
 I am new to Spark. Please guide me how to proceed with the testing.

 Thank you.

>>>
>>>
>>
>


Re: Eliminating shuffle write and spill disk IO reads/writes in Spark

2016-04-01 Thread Reynold Xin
Michael - I'm not sure if you actually read my email, but spill has nothing
to do with the shuffle files on disk. It was for the partitioning (i.e.
sorting) process. If that flag is off, Spark will just run out of memory
when data doesn't fit in memory.


On Fri, Apr 1, 2016 at 3:28 PM, Michael Slavitch  wrote:

> RAMdisk is a fine interim step but there is a lot of layers eliminated by
> keeping things in memory unless there is need for spillover.   At one time
> there was support for turning off spilling.  That was eliminated.  Why?
>
>
> On Fri, Apr 1, 2016, 6:05 PM Mridul Muralidharan  wrote:
>
>> I think Reynold's suggestion of using ram disk would be a good way to
>> test if these are the bottlenecks or something else is.
>> For most practical purposes, pointing local dir to ramdisk should
>> effectively give you 'similar' performance as shuffling from memory.
>>
>> Are there concerns with taking that approach to test ? (I dont see
>> any, but I am not sure if I missed something).
>>
>>
>> Regards,
>> Mridul
>>
>>
>>
>>
>> On Fri, Apr 1, 2016 at 2:10 PM, Michael Slavitch 
>> wrote:
>> > I totally disagree that it’s not a problem.
>> >
>> > - Network fetch throughput on 40G Ethernet exceeds the throughput of
>> NVME
>> > drives.
>> > - What Spark is depending on is Linux’s IO cache as an effective buffer
>> pool
>> > This is fine for small jobs but not for jobs with datasets in the
>> TB/node
>> > range.
>> > - On larger jobs flushing the cache causes Linux to block.
>> > - On a modern 56-hyperthread 2-socket host the latency caused by
>> multiple
>> > executors writing out to disk increases greatly.
>> >
>> > I thought the whole point of Spark was in-memory computing?  It’s in
>> fact
>> > in-memory for some things but  use spark.local.dir as a buffer pool of
>> > others.
>> >
>> > Hence, the performance of  Spark is gated by the performance of
>> > spark.local.dir, even on large memory systems.
>> >
>> > "Currently it is not possible to not write shuffle files to disk.”
>> >
>> > What changes >would< make it possible?
>> >
>> > The only one that seems possible is to clone the shuffle service and
>> make it
>> > in-memory.
>> >
>> >
>> >
>> >
>> >
>> > On Apr 1, 2016, at 4:57 PM, Reynold Xin  wrote:
>> >
>> > spark.shuffle.spill actually has nothing to do with whether we write
>> shuffle
>> > files to disk. Currently it is not possible to not write shuffle files
>> to
>> > disk, and typically it is not a problem because the network fetch
>> throughput
>> > is lower than what disks can sustain. In most cases, especially with
>> SSDs,
>> > there is little difference between putting all of those in memory and on
>> > disk.
>> >
>> > However, it is becoming more common to run Spark on a few number of
>> beefy
>> > nodes (e.g. 2 nodes each with 1TB of RAM). We do want to look into
>> improving
>> > performance for those. Meantime, you can setup local ramdisks on each
>> node
>> > for shuffle writes.
>> >
>> >
>> >
>> > On Fri, Apr 1, 2016 at 11:32 AM, Michael Slavitch 
>> > wrote:
>> >>
>> >> Hello;
>> >>
>> >> I’m working on spark with very large memory systems (2TB+) and notice
>> that
>> >> Spark spills to disk in shuffle.  Is there a way to force spark to
>> stay in
>> >> memory when doing shuffle operations?   The goal is to keep the
>> shuffle data
>> >> either in the heap or in off-heap memory (in 1.6.x) and never touch
>> the IO
>> >> subsystem.  I am willing to have the job fail if it runs out of RAM.
>> >>
>> >> spark.shuffle.spill true  is deprecated in 1.6 and does not work in
>> >> Tungsten sort in 1.5.x
>> >>
>> >> "WARN UnsafeShuffleManager: spark.shuffle.spill was set to false, but
>> this
>> >> is ignored by the tungsten-sort shuffle manager; its optimized
>> shuffles will
>> >> continue to spill to disk when necessary.”
>> >>
>> >> If this is impossible via configuration changes what code changes
>> would be
>> >> needed to accomplish this?
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> -
>> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> >> For additional commands, e-mail: user-h...@spark.apache.org
>> >>
>> >
>> >
>>
> --
> Michael Slavitch
> 62 Renfrew Ave.
> Ottawa Ontario
> K1S 1Z5
>


Re: Eliminating shuffle write and spill disk IO reads/writes in Spark

2016-04-01 Thread Michael Slavitch
RAMdisk is a fine interim step but there is a lot of layers eliminated by
keeping things in memory unless there is need for spillover.   At one time
there was support for turning off spilling.  That was eliminated.  Why?

On Fri, Apr 1, 2016, 6:05 PM Mridul Muralidharan  wrote:

> I think Reynold's suggestion of using ram disk would be a good way to
> test if these are the bottlenecks or something else is.
> For most practical purposes, pointing local dir to ramdisk should
> effectively give you 'similar' performance as shuffling from memory.
>
> Are there concerns with taking that approach to test ? (I dont see
> any, but I am not sure if I missed something).
>
>
> Regards,
> Mridul
>
>
>
>
> On Fri, Apr 1, 2016 at 2:10 PM, Michael Slavitch 
> wrote:
> > I totally disagree that it’s not a problem.
> >
> > - Network fetch throughput on 40G Ethernet exceeds the throughput of NVME
> > drives.
> > - What Spark is depending on is Linux’s IO cache as an effective buffer
> pool
> > This is fine for small jobs but not for jobs with datasets in the TB/node
> > range.
> > - On larger jobs flushing the cache causes Linux to block.
> > - On a modern 56-hyperthread 2-socket host the latency caused by multiple
> > executors writing out to disk increases greatly.
> >
> > I thought the whole point of Spark was in-memory computing?  It’s in fact
> > in-memory for some things but  use spark.local.dir as a buffer pool of
> > others.
> >
> > Hence, the performance of  Spark is gated by the performance of
> > spark.local.dir, even on large memory systems.
> >
> > "Currently it is not possible to not write shuffle files to disk.”
> >
> > What changes >would< make it possible?
> >
> > The only one that seems possible is to clone the shuffle service and
> make it
> > in-memory.
> >
> >
> >
> >
> >
> > On Apr 1, 2016, at 4:57 PM, Reynold Xin  wrote:
> >
> > spark.shuffle.spill actually has nothing to do with whether we write
> shuffle
> > files to disk. Currently it is not possible to not write shuffle files to
> > disk, and typically it is not a problem because the network fetch
> throughput
> > is lower than what disks can sustain. In most cases, especially with
> SSDs,
> > there is little difference between putting all of those in memory and on
> > disk.
> >
> > However, it is becoming more common to run Spark on a few number of beefy
> > nodes (e.g. 2 nodes each with 1TB of RAM). We do want to look into
> improving
> > performance for those. Meantime, you can setup local ramdisks on each
> node
> > for shuffle writes.
> >
> >
> >
> > On Fri, Apr 1, 2016 at 11:32 AM, Michael Slavitch 
> > wrote:
> >>
> >> Hello;
> >>
> >> I’m working on spark with very large memory systems (2TB+) and notice
> that
> >> Spark spills to disk in shuffle.  Is there a way to force spark to stay
> in
> >> memory when doing shuffle operations?   The goal is to keep the shuffle
> data
> >> either in the heap or in off-heap memory (in 1.6.x) and never touch the
> IO
> >> subsystem.  I am willing to have the job fail if it runs out of RAM.
> >>
> >> spark.shuffle.spill true  is deprecated in 1.6 and does not work in
> >> Tungsten sort in 1.5.x
> >>
> >> "WARN UnsafeShuffleManager: spark.shuffle.spill was set to false, but
> this
> >> is ignored by the tungsten-sort shuffle manager; its optimized shuffles
> will
> >> continue to spill to disk when necessary.”
> >>
> >> If this is impossible via configuration changes what code changes would
> be
> >> needed to accomplish this?
> >>
> >>
> >>
> >>
> >>
> >> -
> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >> For additional commands, e-mail: user-h...@spark.apache.org
> >>
> >
> >
>
-- 
Michael Slavitch
62 Renfrew Ave.
Ottawa Ontario
K1S 1Z5


Re: Spark streaming issue

2016-04-01 Thread Mich Talebzadeh
I adopted this approach

scala> val conf = new SparkConf().
 |  setAppName("StreamTest").
 |  setMaster("local[12]").
 |  set("spark.driver.allowMultipleContexts", "true").
 |  set("spark.hadoop.validateOutputSpecs", "false")
conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@321d96f7
scala> val ssc = new StreamingContext(conf, Seconds(60))
ssc: org.apache.spark.streaming.StreamingContext =
org.apache.spark.streaming.StreamingContext@5dbae9eb
scala> val kafkaParams = Map("metadata.broker.list" -> "rhes564:9092")
kafkaParams: scala.collection.immutable.Map[String,String] =
Map(metadata.broker.list -> rhes564:9092)
scala> val topics = Set("newtopic")
topics: scala.collection.immutable.Set[String] = Set(newtopic)
scala> val stream = KafkaUtils.createDirectStream(ssc, kafkaParams, topics)
stream: org.apache.spark.streaming.dstream.InputDStream[(Nothing, Nothing)]
= org.apache.spark.streaming.kafka.DirectKafkaInputDStream@6d2d3b21

So that opens data stream. What next?
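
One sketch of what usually comes next. Note the stream above came back typed
[(Nothing, Nothing)] because no type parameters were supplied; with explicit
decoders it becomes (String, String). The per-batch action is just an
example:

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Supply key/value types and decoders so the stream is (String, String).
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

// Each record is a (key, value) pair; take the values and act per batch.
stream.map(_._2).foreachRDD { rdd =>
  println(s"Batch size: ${rdd.count()}")
}

ssc.start()
ssc.awaitTermination()
```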

Thanks



Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 1 April 2016 at 22:37, Mich Talebzadeh  wrote:

> yes I noticed that
>
> scala> val kafkaStream = KafkaUtils.createStream(ssc, "rhes564:2181",
> "rhes564:9092", "newtopic", 1)
>
> <console>:52: error: overloaded method value createStream with
> alternatives:
>   (jssc:
> org.apache.spark.streaming.api.java.JavaStreamingContext,zkQuorum:
> String,groupId: String,topics: java.util.Map[String,Integer],storageLevel:
> org.apache.spark.storage.StorageLevel)org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream[String,String]
> 
>   (ssc: org.apache.spark.streaming.StreamingContext,zkQuorum:
> String,groupId: String,topics:
> scala.collection.immutable.Map[String,Int],storageLevel:
> org.apache.spark.storage.StorageLevel)org.apache.spark.streaming.dstream.ReceiverInputDStream[(String,
> String)]
>  cannot be applied to (org.apache.spark.streaming.StreamingContext,
> String, String, String, Int)
>  val kafkaStream = KafkaUtils.createStream(ssc, "rhes564:2181",
> "rhes564:9092", "newtopic", 1)
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 1 April 2016 at 22:25, Cody Koeninger  wrote:
>
>> You're not passing valid Scala values.  rhes564:2181  without quotes
>> isn't a valid literal, newtopic isn't a list of strings, etc.
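
A call with valid literals would look something like this (the group id here
is an assumption):

```scala
// zkQuorum and groupId are Strings; topics is a Map of topic -> thread count.
val kafkaStream = KafkaUtils.createStream(
  ssc, "rhes564:2181", "my-consumer-group", Map("newtopic" -> 1))
```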
>>
>> On Fri, Apr 1, 2016 at 4:04 PM, Mich Talebzadeh
>>  wrote:
>> > Thanks Cody.
>> >
>> > Can I use Receiver-based Approach here?
>> >
>> > I have created the topic newtopic as below
>> >
>> > ${KAFKA_HOME}/bin/kafka-topics.sh --create --zookeeper rhes564:2181
>> > --replication-factor 1 --partitions 1 --topic newtopic
>> >
>> >
>> > This is basically what I am doing the Spark
>> >
>> > val lines = ssc.socketTextStream("rhes564", 2181)
>> >
>> > Which obviously not working
>> >
>> > This is what is suggested in the doc
>> >
>> > import org.apache.spark.streaming.kafka._
>> >
>> > val kafkaStream = KafkaUtils.createStream(streamingContext,
>> >  [ZK quorum], [consumer group id], [per-topic number of Kafka
>> partitions
>> > to consume])
>> >
> >> > *[ZK quorum]* is a list of one or more zookeeper servers that make
> >> quorum
> >> > *[consumer group id]* is the name of the kafka consumer group
> >> > *[topics]* is a list of one or more kafka topics to consume from
> >> > *[number of threads]* is the number of threads the kafka consumer should use
>> >
>> > Now this comes back with error. onviously not passing parameters
>> correctly!
>> >
>> > scala> val kafkaStream = KafkaUtils.createStream(streamingContext,
>> > rhes564:2181, rhes564:9092, newtopic 1)
> > <console>:1: error: identifier expected but integer literal found.
>> >val kafkaStream = KafkaUtils.createStream(streamingContext,
>> > rhes564:2181, rhes564:9092, newtopic 1)
>> >
>> >
>> >
>> >
>> >
>> >
>> > Dr Mich Talebzadeh
>> >
>> >
>> >
>> > LinkedIn
>> >
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> >
>> >
>> >
>> > http://talebzadehmich.wordpress.com
>> >
>> >
>> >
>> >
>> > On 1 April 2016 at 21:13, Cody Koeninger  wrote:
>> >>
>> >> It looks like you're using a plain socket stream to connect to a
>> >> zookeeper port, which won't work.
>> >>
>> >>   Look at
>> spark.apache.org/docs/latest/streaming-kafka-integration.html
>> >>
>> >> On Fri, Apr 1, 2016 at 3:03 PM, Mich Talebzadeh
>> >>  wrote:
>> >> >
>> >> > Hi,
>> >> >
>> >> > I am just testing Spark streaming with Kafka.
>> >> >
>> >> > Basically I am broadcasting 

Re: Scala: Perform Unit Testing in spark

2016-04-01 Thread Ted Yu
You need to include the following jars:

jar tvf ./core/target/spark-core_2.10-1.6.1-tests.jar | grep SparkFunSuite
  1787 Thu Mar 03 09:06:14 PST 2016
org/apache/spark/SparkFunSuite$$anonfun$withFixture$1.class
  1780 Thu Mar 03 09:06:14 PST 2016
org/apache/spark/SparkFunSuite$$anonfun$withFixture$2.class
  3982 Thu Mar 03 09:06:14 PST 2016 org/apache/spark/SparkFunSuite.class

jar tvf ./mllib/target/spark-mllib_2.10-1.6.1-tests.jar | grep
MLlibTestSparkContext
  1447 Thu Mar 03 09:53:54 PST 2016
org/apache/spark/mllib/util/MLlibTestSparkContext.class
  1704 Thu Mar 03 09:53:54 PST 2016
org/apache/spark/mllib/util/MLlibTestSparkContext$class.class
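
A minimal ScalaTest suite that runs under `sbt test` against a local
SparkContext might look like this (class name and data are illustrative):

```scala
import org.scalatest.FunSuite
import org.apache.spark.{SparkConf, SparkContext}

class SimpleRddSuite extends FunSuite {
  test("counting a small RDD") {
    val conf = new SparkConf().setMaster("local[2]").setAppName("unit-test")
    val sc = new SparkContext(conf)
    try {
      val rdd = sc.parallelize(Seq(1, 2, 3))
      assert(rdd.count() === 3)
    } finally {
      sc.stop() // release the context so later suites can create their own
    }
  }
}
```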

On Fri, Apr 1, 2016 at 3:07 PM, Shishir Anshuman 
wrote:

> I got the file ALSSuite.scala and am trying to run it. I have copied the file
> under *src/test/scala *in my project folder. When I run *sbt test*, I get
> errors. I have attached a screenshot of the errors. Before *sbt test*, I
> am building the package with *sbt package*.
>
> Dependencies of *simple.sbt*:
>
>>
>>
>>
>>
>> *libraryDependencies ++= Seq( "org.apache.spark" % "spark-core_2.10" %
>> "1.6.0", "org.apache.spark" % "spark-mllib_2.10" % "1.6.0" )*
>
>
>
>
> On Sat, Apr 2, 2016 at 2:21 AM, Ted Yu  wrote:
>
>> Assuming your code is written in Scala, I would suggest using ScalaTest.
>>
>> Please take a look at the XXSuite.scala files under mllib/
>>
>> On Fri, Apr 1, 2016 at 1:31 PM, Shishir Anshuman <
>> shishiranshu...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I have a code written in scala using Mllib. I want to perform unit
>>> testing it. I cant decide between Junit 4 and ScalaTest.
>>> I am new to Spark. Please guide me how to proceed with the testing.
>>>
>>> Thank you.
>>>
>>
>>
>


Re: Eliminating shuffle write and spill disk IO reads/writes in Spark

2016-04-01 Thread Mridul Muralidharan
I think Reynold's suggestion of using ram disk would be a good way to
test if these are the bottlenecks or something else is.
For most practical purposes, pointing local dir to ramdisk should
effectively give you 'similar' performance as shuffling from memory.

Are there concerns with taking that approach to test? (I don't see
any, but I am not sure if I missed something.)


Regards,
Mridul




On Fri, Apr 1, 2016 at 2:10 PM, Michael Slavitch  wrote:
> I totally disagree that it’s not a problem.
>
> - Network fetch throughput on 40G Ethernet exceeds the throughput of NVME
> drives.
> - What Spark is depending on is Linux’s IO cache as an effective buffer pool
> This is fine for small jobs but not for jobs with datasets in the TB/node
> range.
> - On larger jobs flushing the cache causes Linux to block.
> - On a modern 56-hyperthread 2-socket host the latency caused by multiple
> executors writing out to disk increases greatly.
>
> I thought the whole point of Spark was in-memory computing?  It’s in fact
> in-memory for some things, but it uses spark.local.dir as a buffer pool for
> others.
>
> Hence, the performance of  Spark is gated by the performance of
> spark.local.dir, even on large memory systems.
>
> "Currently it is not possible to not write shuffle files to disk.”
>
> What changes >would< make it possible?
>
> The only one that seems possible is to clone the shuffle service and make it
> in-memory.
>
>
>
>
>
> On Apr 1, 2016, at 4:57 PM, Reynold Xin  wrote:
>
> spark.shuffle.spill actually has nothing to do with whether we write shuffle
> files to disk. Currently it is not possible to not write shuffle files to
> disk, and typically it is not a problem because the network fetch throughput
> is lower than what disks can sustain. In most cases, especially with SSDs,
> there is little difference between putting all of those in memory and on
> disk.
>
> However, it is becoming more common to run Spark on a small number of beefy
> nodes (e.g. 2 nodes each with 1TB of RAM). We do want to look into improving
> performance for those. Meantime, you can setup local ramdisks on each node
> for shuffle writes.
>
>
>
> On Fri, Apr 1, 2016 at 11:32 AM, Michael Slavitch 
> wrote:
>>
>> Hello;
>>
>> I’m working on spark with very large memory systems (2TB+) and notice that
>> Spark spills to disk in shuffle.  Is there a way to force spark to stay in
>> memory when doing shuffle operations?   The goal is to keep the shuffle data
>> either in the heap or in off-heap memory (in 1.6.x) and never touch the IO
>> subsystem.  I am willing to have the job fail if it runs out of RAM.
>>
>> spark.shuffle.spill true  is deprecated in 1.6 and does not work in
>> Tungsten sort in 1.5.x
>>
>> "WARN UnsafeShuffleManager: spark.shuffle.spill was set to false, but this
>> is ignored by the tungsten-sort shuffle manager; its optimized shuffles will
>> continue to spill to disk when necessary.”
>>
>> If this is impossible via configuration changes what code changes would be
>> needed to accomplish this?
>>
>>
>>
>>
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>
>




Re: Spark streaming issue

2016-04-01 Thread Mich Talebzadeh
yes I noticed that

scala> val kafkaStream = KafkaUtils.createStream(ssc, "rhes564:2181",
"rhes564:9092", "newtopic", 1)

:52: error: overloaded method value createStream with alternatives:
  (jssc: org.apache.spark.streaming.api.java.JavaStreamingContext,zkQuorum:
String,groupId: String,topics: java.util.Map[String,Integer],storageLevel:
org.apache.spark.storage.StorageLevel)org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream[String,String]

  (ssc: org.apache.spark.streaming.StreamingContext,zkQuorum:
String,groupId: String,topics:
scala.collection.immutable.Map[String,Int],storageLevel:
org.apache.spark.storage.StorageLevel)org.apache.spark.streaming.dstream.ReceiverInputDStream[(String,
String)]
 cannot be applied to (org.apache.spark.streaming.StreamingContext, String,
String, String, Int)
 val kafkaStream = KafkaUtils.createStream(ssc, "rhes564:2181",
"rhes564:9092", "newtopic", 1)


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 1 April 2016 at 22:25, Cody Koeninger  wrote:

> You're not passing valid Scala values.  rhes564:2181  without quotes
> isn't a valid literal, newtopic isn't a list of strings, etc.
>
> On Fri, Apr 1, 2016 at 4:04 PM, Mich Talebzadeh
>  wrote:
> > Thanks Cody.
> >
> > Can I use Receiver-based Approach here?
> >
> > I have created the topic newtopic as below
> >
> > ${KAFKA_HOME}/bin/kafka-topics.sh --create --zookeeper rhes564:2181
> > --replication-factor 1 --partitions 1 --topic newtopic
> >
> >
> > This is basically what I am doing in Spark
> >
> > val lines = ssc.socketTextStream("rhes564", 2181)
> >
> > Which obviously is not working
> >
> > This is what is suggested in the doc
> >
> > import org.apache.spark.streaming.kafka._
> >
> > val kafkaStream = KafkaUtils.createStream(streamingContext,
> >  [ZK quorum], [consumer group id], [per-topic number of Kafka
> partitions
> > to consume])
> >
> > [ZK quorum] is a list of one or more zookeeper servers that make
> quorum
> > [consumer group id] is the name of the kafka consumer group
> > [topics] is a list of one or more kafka topics to consume from
> > [number of threads] is the number of threads the kafka consumer should use
> >
> > Now this comes back with an error. Obviously not passing parameters
> correctly!
> >
> > scala> val kafkaStream = KafkaUtils.createStream(streamingContext,
> > rhes564:2181, rhes564:9092, newtopic 1)
> > :1: error: identifier expected but integer literal found.
> >val kafkaStream = KafkaUtils.createStream(streamingContext,
> > rhes564:2181, rhes564:9092, newtopic 1)
> >
> >
> >
> >
> >
> >
> > Dr Mich Talebzadeh
> >
> >
> >
> > LinkedIn
> >
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> >
> >
> >
> > http://talebzadehmich.wordpress.com
> >
> >
> >
> >
> > On 1 April 2016 at 21:13, Cody Koeninger  wrote:
> >>
> >> It looks like you're using a plain socket stream to connect to a
> >> zookeeper port, which won't work.
> >>
> >>   Look at spark.apache.org/docs/latest/streaming-kafka-integration.html
> >>
> >> On Fri, Apr 1, 2016 at 3:03 PM, Mich Talebzadeh
> >>  wrote:
> >> >
> >> > Hi,
> >> >
> >> > I am just testing Spark streaming with Kafka.
> >> >
> >> > Basically I am broadcasting topic every minute to Host:port ->
> >> > rhes564:2181.
> >> > This is sending few lines through a shell script as follows:
> >> >
> >> > cat ${IN_FILE} | ${KAFKA_HOME}/bin/kafka-console-producer.sh
> >> > --broker-list
> >> > rhes564:9092 --topic newtopic
> >> >
> >> > That works fine and I can see the messages in
> >> >
> >> > ${KAFKA_HOME}/bin/kafka-console-consumer.sh --zookeeper rhes564:2181
> >> > --topic
> >> > newtopic
> >> >
> >> > Fri Apr 1 21:00:01 BST 2016  === Sending messages from rhes5
> >> >
> >> >
> 1,'OZ9062Cx22qAHo8m_fsZb16Etlq5eTnL4jYPKmgPQPyQB7Kk5IMt2xQN3yy1Qb1O3Qph16TGlHzixw02mRLAiagU0Wh17fHi5dOQ',101
> >> >
> >> >
> 2,'Py_xzno6MEWPz1bp5Cc0JBPfX90mz2uVMLPBJUWucvNPlPnVMMm81PExZ5uM0K9iEdKmleY7XFsn8O3Oxr6e07qdycvuk_lR84vI',102
> >> >
> >> >
> 3,'i2FS2ODjRBdaIpyE362JVPu4KEYSHDNTjPh46YFANquxNRK9JQT8h1W4Tph9DqGfwIgQG5ZJ8BCBklRQreyJhoLIPMbJQeH_rhN1',103
> >> >
> >> >
> 4,'Yp_q_uyH16UPTRvPdeKaslw8bhheFqqdwWaG_e8TZZ6jyscyQN556jJMxYOZjx5Zv7GV6zoa2ORsTEGcAKbKUChPFfuGAujgDkjT',104
> >> >
> >> >
> 5,'t3uuFOkNEjDE_7rc9cLbgT1o0B_jZXWsWNtmBgiC4ACffzTHUGRkl5YIZSUXB3kew2yytvB8nbCklImDa0BWxYseSbMWiKg1R9ae',105
> >> >
> >> > Now I try to see the topic in spark streaming as follows:
> >> >
> >> > val conf = new SparkConf().
> >> >  setAppName("StreamTest").
> >> >  setMaster("local[12]").
> >> >  set("spark.driver.allowMultipleContexts", "true").
> >> >  set("spark.hadoop.validateOutputSpecs", "false")
> >> > val sc = new SparkContext(conf)
> >> > // 

Re: Spark streaming issue

2016-04-01 Thread Cody Koeninger
You're not passing valid Scala values.  rhes564:2181  without quotes
isn't a valid literal, newtopic isn't a list of strings, etc.
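To spell out the fix: every String argument must be a quoted literal, and the topics argument is a Map from topic name to consumer-thread count. A sketch only (assumes the spark-streaming-kafka artifact is on the classpath and `conf` is the SparkConf built earlier in the thread; the consumer group name is made up):

```scala
import org.apache.spark.streaming.{Minutes, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(conf, Minutes(1))
// zkQuorum and groupId are quoted String literals; topics maps each
// topic name to the number of consumer threads for it.
val kafkaStream = KafkaUtils.createStream(
  ssc,
  "rhes564:2181",          // ZK quorum, not the broker port 9092
  "rhes_consumer_group",   // hypothetical consumer group id
  Map("newtopic" -> 1))
kafkaStream.map(_._2).print() // the message value is the second tuple element
```

This sketch cannot run outside a cluster with ZooKeeper and Kafka up, but it compiles against the Spark 1.x streaming-kafka API.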

On Fri, Apr 1, 2016 at 4:04 PM, Mich Talebzadeh
 wrote:
> Thanks Cody.
>
> Can I use Receiver-based Approach here?
>
> I have created the topic newtopic as below
>
> ${KAFKA_HOME}/bin/kafka-topics.sh --create --zookeeper rhes564:2181
> --replication-factor 1 --partitions 1 --topic newtopic
>
>
> This is basically what I am doing in Spark
>
> val lines = ssc.socketTextStream("rhes564", 2181)
>
> Which obviously is not working
>
> This is what is suggested in the doc
>
> import org.apache.spark.streaming.kafka._
>
> val kafkaStream = KafkaUtils.createStream(streamingContext,
>  [ZK quorum], [consumer group id], [per-topic number of Kafka partitions
> to consume])
>
> [ZK quorum] is a list of one or more zookeeper servers that make quorum
> [consumer group id] is the name of the kafka consumer group
> [topics] is a list of one or more kafka topics to consume from
> [number of threads] is the number of threads the kafka consumer should use
>
> Now this comes back with an error. Obviously not passing parameters correctly!
>
> scala> val kafkaStream = KafkaUtils.createStream(streamingContext,
> rhes564:2181, rhes564:9092, newtopic 1)
> :1: error: identifier expected but integer literal found.
>val kafkaStream = KafkaUtils.createStream(streamingContext,
> rhes564:2181, rhes564:9092, newtopic 1)
>
>
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
>
> On 1 April 2016 at 21:13, Cody Koeninger  wrote:
>>
>> It looks like you're using a plain socket stream to connect to a
>> zookeeper port, which won't work.
>>
>>   Look at spark.apache.org/docs/latest/streaming-kafka-integration.html
>>
>> On Fri, Apr 1, 2016 at 3:03 PM, Mich Talebzadeh
>>  wrote:
>> >
>> > Hi,
>> >
>> > I am just testing Spark streaming with Kafka.
>> >
>> > Basically I am broadcasting topic every minute to Host:port ->
>> > rhes564:2181.
>> > This is sending few lines through a shell script as follows:
>> >
>> > cat ${IN_FILE} | ${KAFKA_HOME}/bin/kafka-console-producer.sh
>> > --broker-list
>> > rhes564:9092 --topic newtopic
>> >
>> > That works fine and I can see the messages in
>> >
>> > ${KAFKA_HOME}/bin/kafka-console-consumer.sh --zookeeper rhes564:2181
>> > --topic
>> > newtopic
>> >
>> > Fri Apr 1 21:00:01 BST 2016  === Sending messages from rhes5
>> >
>> > 1,'OZ9062Cx22qAHo8m_fsZb16Etlq5eTnL4jYPKmgPQPyQB7Kk5IMt2xQN3yy1Qb1O3Qph16TGlHzixw02mRLAiagU0Wh17fHi5dOQ',101
>> >
>> > 2,'Py_xzno6MEWPz1bp5Cc0JBPfX90mz2uVMLPBJUWucvNPlPnVMMm81PExZ5uM0K9iEdKmleY7XFsn8O3Oxr6e07qdycvuk_lR84vI',102
>> >
>> > 3,'i2FS2ODjRBdaIpyE362JVPu4KEYSHDNTjPh46YFANquxNRK9JQT8h1W4Tph9DqGfwIgQG5ZJ8BCBklRQreyJhoLIPMbJQeH_rhN1',103
>> >
>> > 4,'Yp_q_uyH16UPTRvPdeKaslw8bhheFqqdwWaG_e8TZZ6jyscyQN556jJMxYOZjx5Zv7GV6zoa2ORsTEGcAKbKUChPFfuGAujgDkjT',104
>> >
>> > 5,'t3uuFOkNEjDE_7rc9cLbgT1o0B_jZXWsWNtmBgiC4ACffzTHUGRkl5YIZSUXB3kew2yytvB8nbCklImDa0BWxYseSbMWiKg1R9ae',105
>> >
>> > Now I try to see the topic in spark streaming as follows:
>> >
>> > val conf = new SparkConf().
>> >  setAppName("StreamTest").
>> >  setMaster("local[12]").
>> >  set("spark.driver.allowMultipleContexts", "true").
>> >  set("spark.hadoop.validateOutputSpecs", "false")
>> > val sc = new SparkContext(conf)
>> > // Create sqlContext based on HiveContext
>> > val sqlContext = new HiveContext(sc)
>> > val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>> > //
>> > // Create a local StreamingContext with two working thread and batch
>> > interval of 1 second.
>> > // The master requires 2 cores to prevent from a starvation scenario.
>> > val ssc = new StreamingContext(conf, Minutes(1))
>> > // Create a DStream that will connect to hostname:port, like
>> > localhost:
>> > //val lines = ssc.socketTextStream("rhes564", 9092)
>> > val lines = ssc.socketTextStream("rhes564", 2181)
>> > // Split each line into words
>> > val words = lines.flatMap(_.split(" "))
>> > val pairs = words.map(word => (word, 1))
>> > val wordCounts = pairs.reduceByKey(_ + _)
>> > // Print the first ten elements of each RDD generated in this DStream to
>> > the
>> > console
>> > wordCounts.print()
>> > ssc.start()
>> >
>> > This is what I am getting:
>> >
>> >
>> > scala> ---
>> > Time: 145954176 ms
>> > ---
>> >
>> > But no values
>> >
>> > Have I got the port wrong in this case or the set up is incorrect?
>> >
>> >
>> > Thanks
>> >
>> > Dr Mich Talebzadeh
>> >
>> >
>> >
>> > LinkedIn
>> >
>> > https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> >
>> >
>> >
>> > http://talebzadehmich.wordpress.com
>> >
>> >
>
>


Spark Text Streaming Does not Recognize Folder using RegEx

2016-04-01 Thread Rachana Srivastava
Hello All,

I have written a simple program to get data from a directory:

JavaDStream<String> textStream = jssc.textFileStream(directory);
JavaDStream<String> ceRDD = textStream.map(
    new Function<String, String>() {
        public String call(String ceData) throws Exception {
            System.out.println(ceData);
            return ceData; // return the line so the mapped stream carries it
        }
    });

My code works fine when we pass the complete path of the input directory:
directory =
hdfs://quickstart.cloudera:8020//user/cloudera/CE/Output/OUTPUTYarnClusterCEQ/2016-04-01/4489867359541/
 WORKS fine.
But
directory =
hdfs://quickstart.cloudera:8020/user/cloudera/CE/Output/OUTPUTYarnClusterCEQ/2016-04-01/*/
 DOES NOT WORK
When we pass the folder name using a regex, I get the exception below.

Exception

16/04/01 13:48:40 WARN FileInputDStream: Error finding new files
java.io.FileNotFoundException: File 
hdfs://quickstart.cloudera:8020/user/cloudera/CE/Output/OUTPUTYarnClusterCEQ/2016-04-01/*
 does not exist.
at 
org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:704)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:105)
at 
org.apache.hadoop.hdfs.DistributedFileSystem$15.doCall(DistributedFileSystem.java:762)
at 
org.apache.hadoop.hdfs.DistributedFileSystem$15.doCall(DistributedFileSystem.java:758)
at 
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:758)


Re: Eliminating shuffle write and spill disk IO reads/writes in Spark

2016-04-01 Thread Michael Slavitch
I totally disagree that it’s not a problem.

- Network fetch throughput on 40G Ethernet exceeds the throughput of NVME 
drives.
- What Spark is depending on is Linux’s IO cache as an effective buffer pool  
This is fine for small jobs but not for jobs with datasets in the TB/node range.
- On larger jobs flushing the cache causes Linux to block.
- On a modern 56-hyperthread 2-socket host the latency caused by multiple 
executors writing out to disk increases greatly. 

I thought the whole point of Spark was in-memory computing?  It’s in fact
in-memory for some things, but it uses spark.local.dir as a buffer pool for others.

Hence, the performance of  Spark is gated by the performance of 
spark.local.dir, even on large memory systems.

"Currently it is not possible to not write shuffle files to disk.”

What changes >would< make it possible?

The only one that seems possible is to clone the shuffle service and make it 
in-memory.





> On Apr 1, 2016, at 4:57 PM, Reynold Xin  wrote:
> 
> spark.shuffle.spill actually has nothing to do with whether we write shuffle 
> files to disk. Currently it is not possible to not write shuffle files to 
> disk, and typically it is not a problem because the network fetch throughput 
> is lower than what disks can sustain. In most cases, especially with SSDs, 
> there is little difference between putting all of those in memory and on disk.
> 
> However, it is becoming more common to run Spark on a small number of beefy
> nodes (e.g. 2 nodes each with 1TB of RAM). We do want to look into improving 
> performance for those. Meantime, you can setup local ramdisks on each node 
> for shuffle writes.
> 
> 
> 
> On Fri, Apr 1, 2016 at 11:32 AM, Michael Slavitch  > wrote:
> Hello;
> 
> I’m working on spark with very large memory systems (2TB+) and notice that 
> Spark spills to disk in shuffle.  Is there a way to force spark to stay in 
> memory when doing shuffle operations?   The goal is to keep the shuffle data 
> either in the heap or in off-heap memory (in 1.6.x) and never touch the IO 
> subsystem.  I am willing to have the job fail if it runs out of RAM.
> 
> spark.shuffle.spill true  is deprecated in 1.6 and does not work in Tungsten 
> sort in 1.5.x
> 
> "WARN UnsafeShuffleManager: spark.shuffle.spill was set to false, but this is 
> ignored by the tungsten-sort shuffle manager; its optimized shuffles will 
> continue to spill to disk when necessary.”
> 
> If this is impossible via configuration changes what code changes would be 
> needed to accomplish this?
> 
> 
> 
> 
> 
> 
> 



Re: Spark streaming issue

2016-04-01 Thread Mich Talebzadeh
Thanks Cody.

Can I use Receiver-based Approach here?

I have created the topic newtopic as below

${KAFKA_HOME}/bin/kafka-topics.sh --create --zookeeper rhes564:2181
--replication-factor 1 --partitions 1 --topic newtopic

This is basically what I am doing in Spark

val lines = ssc.socketTextStream("rhes564", 2181)

Which obviously is not working

This is what is suggested in the doc

import org.apache.spark.streaming.kafka._

val kafkaStream = KafkaUtils.createStream(streamingContext,
 [ZK quorum], [consumer group id], [per-topic number of Kafka
partitions to consume])

[ZK quorum] is a list of one or more zookeeper servers that make quorum
[consumer group id] is the name of the kafka consumer group
[topics] is a list of one or more kafka topics to consume from
[number of threads] is the number of threads the kafka consumer should use

Now this comes back with an error. Obviously not passing parameters correctly!

scala> val kafkaStream = KafkaUtils.createStream(streamingContext,
rhes564:2181, rhes564:9092, newtopic 1)
:1: error: identifier expected but integer literal found.
   val kafkaStream = KafkaUtils.createStream(streamingContext,
rhes564:2181, rhes564:9092, newtopic 1)






Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 1 April 2016 at 21:13, Cody Koeninger  wrote:

> It looks like you're using a plain socket stream to connect to a
> zookeeper port, which won't work.
>
>   Look at spark.apache.org/docs/latest/streaming-kafka-integration.html
>
> On Fri, Apr 1, 2016 at 3:03 PM, Mich Talebzadeh
>  wrote:
> >
> > Hi,
> >
> > I am just testing Spark streaming with Kafka.
> >
> > Basically I am broadcasting a topic every minute to Host:port ->
> rhes564:2181.
> > This is sending a few lines through a shell script as follows:
> >
> > cat ${IN_FILE} | ${KAFKA_HOME}/bin/kafka-console-producer.sh
> --broker-list
> > rhes564:9092 --topic newtopic
> >
> > That works fine and I can see the messages in
> >
> > ${KAFKA_HOME}/bin/kafka-console-consumer.sh --zookeeper rhes564:2181
> --topic
> > newtopic
> >
> > Fri Apr 1 21:00:01 BST 2016  === Sending messages from rhes5
> >
> 1,'OZ9062Cx22qAHo8m_fsZb16Etlq5eTnL4jYPKmgPQPyQB7Kk5IMt2xQN3yy1Qb1O3Qph16TGlHzixw02mRLAiagU0Wh17fHi5dOQ',101
> >
> 2,'Py_xzno6MEWPz1bp5Cc0JBPfX90mz2uVMLPBJUWucvNPlPnVMMm81PExZ5uM0K9iEdKmleY7XFsn8O3Oxr6e07qdycvuk_lR84vI',102
> >
> 3,'i2FS2ODjRBdaIpyE362JVPu4KEYSHDNTjPh46YFANquxNRK9JQT8h1W4Tph9DqGfwIgQG5ZJ8BCBklRQreyJhoLIPMbJQeH_rhN1',103
> >
> 4,'Yp_q_uyH16UPTRvPdeKaslw8bhheFqqdwWaG_e8TZZ6jyscyQN556jJMxYOZjx5Zv7GV6zoa2ORsTEGcAKbKUChPFfuGAujgDkjT',104
> >
> 5,'t3uuFOkNEjDE_7rc9cLbgT1o0B_jZXWsWNtmBgiC4ACffzTHUGRkl5YIZSUXB3kew2yytvB8nbCklImDa0BWxYseSbMWiKg1R9ae',105
> >
> > Now I try to see the topic in spark streaming as follows:
> >
> > val conf = new SparkConf().
> >  setAppName("StreamTest").
> >  setMaster("local[12]").
> >  set("spark.driver.allowMultipleContexts", "true").
> >  set("spark.hadoop.validateOutputSpecs", "false")
> > val sc = new SparkContext(conf)
> > // Create sqlContext based on HiveContext
> > val sqlContext = new HiveContext(sc)
> > val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> > //
> > // Create a local StreamingContext with two working thread and batch
> > interval of 1 second.
> > // The master requires 2 cores to prevent from a starvation scenario.
> > val ssc = new StreamingContext(conf, Minutes(1))
> > // Create a DStream that will connect to hostname:port, like
> localhost:
> > //val lines = ssc.socketTextStream("rhes564", 9092)
> > val lines = ssc.socketTextStream("rhes564", 2181)
> > // Split each line into words
> > val words = lines.flatMap(_.split(" "))
> > val pairs = words.map(word => (word, 1))
> > val wordCounts = pairs.reduceByKey(_ + _)
> > // Print the first ten elements of each RDD generated in this DStream to
> the
> > console
> > wordCounts.print()
> > ssc.start()
> >
> > This is what I am getting:
> >
> >
> > scala> ---
> > Time: 145954176 ms
> > ---
> >
> > But no values
> >
> > Have I got the port wrong in this case or the set up is incorrect?
> >
> >
> > Thanks
> >
> > Dr Mich Talebzadeh
> >
> >
> >
> > LinkedIn
> >
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> >
> >
> >
> > http://talebzadehmich.wordpress.com
> >
> >
>


[Spark SQL]: UDF with Array[Double] as input

2016-04-01 Thread Jerry Lam
Hi spark users and developers,

Has anyone tried to pass an Array[Double] as an input to a UDF? I tried
for many hours reading the Spark SQL code but I still couldn't figure out a
way to do this.

Best Regards,

Jerry


Re: Eliminating shuffle write and spill disk IO reads/writes in Spark

2016-04-01 Thread Reynold Xin
spark.shuffle.spill actually has nothing to do with whether we write
shuffle files to disk. Currently it is not possible to not write shuffle
files to disk, and typically it is not a problem because the network fetch
throughput is lower than what disks can sustain. In most cases, especially
with SSDs, there is little difference between putting all of those in
memory and on disk.

However, it is becoming more common to run Spark on a small number of beefy
nodes (e.g. 2 nodes each with 1TB of RAM). We do want to look into
improving performance for those. Meantime, you can setup local ramdisks on
each node for shuffle writes.
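One way to try the ramdisk suggestion is to mount a tmpfs and point spark.local.dir at it. A sketch, assuming root access and that the size fits comfortably in RAM (the path and size here are examples, not from the thread):

```shell
# Create a ramdisk-backed directory for shuffle files (example size).
sudo mkdir -p /mnt/spark-ramdisk
sudo mount -t tmpfs -o size=64g tmpfs /mnt/spark-ramdisk

# Then point Spark at it, e.g. in conf/spark-defaults.conf:
#   spark.local.dir  /mnt/spark-ramdisk
```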



On Fri, Apr 1, 2016 at 11:32 AM, Michael Slavitch 
wrote:

> Hello;
>
> I’m working on spark with very large memory systems (2TB+) and notice that
> Spark spills to disk in shuffle.  Is there a way to force spark to stay in
> memory when doing shuffle operations?   The goal is to keep the shuffle
> data either in the heap or in off-heap memory (in 1.6.x) and never touch
> the IO subsystem.  I am willing to have the job fail if it runs out of RAM.
>
> spark.shuffle.spill true  is deprecated in 1.6 and does not work in
> Tungsten sort in 1.5.x
>
> "WARN UnsafeShuffleManager: spark.shuffle.spill was set to false, but this
> is ignored by the tungsten-sort shuffle manager; its optimized shuffles
> will continue to spill to disk when necessary.”
>
> If this is impossible via configuration changes what code changes would be
> needed to accomplish this?
>
>
>
>
>
>
>


Re: Scala: Perform Unit Testing in spark

2016-04-01 Thread Holden Karau
You can also look at spark-testing-base which works in both Scalatest and
Junit and see if that works for your use case.
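For reference, wiring spark-testing-base into an sbt build looks roughly like the following; the exact version string pairs a Spark release with a spark-testing-base release, so treat these coordinates as an assumption and check the project's README for your Spark version:

```scala
// Hypothetical build.sbt line; verify the "<spark version>_<library version>"
// string against the spark-testing-base README before using it.
libraryDependencies += "com.holdenkarau" %% "spark-testing-base" % "1.6.0_0.3.3" % Test
```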

On Friday, April 1, 2016, Ted Yu  wrote:

> Assuming your code is written in Scala, I would suggest using ScalaTest.
>
> Please take a look at the XXSuite.scala files under mllib/
>
> On Fri, Apr 1, 2016 at 1:31 PM, Shishir Anshuman <
> shishiranshu...@gmail.com
> > wrote:
>
>> Hello,
>>
>> I have a code written in Scala using MLlib. I want to perform unit
>> testing on it. I can't decide between JUnit 4 and ScalaTest.
>> I am new to Spark. Please guide me how to proceed with the testing.
>>
>> Thank you.
>>
>
>

-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau


Re: Scala: Perform Unit Testing in spark

2016-04-01 Thread Ted Yu
Assuming your code is written in Scala, I would suggest using ScalaTest.

Please take a look at the XXSuite.scala files under mllib/
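To give a feel for the style those suites use, here is a minimal ScalaTest example; it deliberately avoids a SparkContext so it stands alone, and it assumes ScalaTest is on the test classpath:

```scala
import org.scalatest.FunSuite

// Minimal FunSuite sketch in the style of the mllib XXSuite.scala files,
// testing plain vector math rather than a Spark job.
class VectorMathSuite extends FunSuite {
  test("dot product of two small vectors") {
    val a = Array(1.0, 2.0, 3.0)
    val b = Array(4.0, 5.0, 6.0)
    val dot = a.zip(b).map { case (x, y) => x * y }.sum
    assert(dot === 32.0) // 1*4 + 2*5 + 3*6
  }
}
```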

On Fri, Apr 1, 2016 at 1:31 PM, Shishir Anshuman 
wrote:

> Hello,
>
> I have a code written in Scala using MLlib. I want to perform unit testing
> on it. I can't decide between JUnit 4 and ScalaTest.
> I am new to Spark. Please guide me how to proceed with the testing.
>
> Thank you.
>


Re: Support for time column type?

2016-04-01 Thread Michael Armbrust
There is also CalendarIntervalType.  Is that what you are looking for?

On Fri, Apr 1, 2016 at 1:11 PM, Philip Weaver 
wrote:

> Hi, I don't see any mention of a time type in the documentation (there is
> DateType and TimestampType, but not TimeType), and have been unable to find
> any documentation about whether this will be supported in the future. Does
> anyone know if this is currently supported or will be supported in the
> future?
>


Scala: Perform Unit Testing in spark

2016-04-01 Thread Shishir Anshuman
Hello,

I have a code written in Scala using MLlib. I want to perform unit testing
on it. I can't decide between JUnit 4 and ScalaTest.
I am new to Spark. Please guide me how to proceed with the testing.

Thank you.


Re: Spark streaming issue

2016-04-01 Thread Cody Koeninger
It looks like you're using a plain socket stream to connect to a
zookeeper port, which won't work.

  Look at spark.apache.org/docs/latest/streaming-kafka-integration.html

On Fri, Apr 1, 2016 at 3:03 PM, Mich Talebzadeh
 wrote:
>
> Hi,
>
> I am just testing Spark streaming with Kafka.
>
> Basically I am broadcasting a topic every minute to Host:port -> rhes564:2181.
> This is sending a few lines through a shell script as follows:
>
> cat ${IN_FILE} | ${KAFKA_HOME}/bin/kafka-console-producer.sh --broker-list
> rhes564:9092 --topic newtopic
>
> That works fine and I can see the messages in
>
> ${KAFKA_HOME}/bin/kafka-console-consumer.sh --zookeeper rhes564:2181 --topic
> newtopic
>
> Fri Apr 1 21:00:01 BST 2016  === Sending messages from rhes5
> 1,'OZ9062Cx22qAHo8m_fsZb16Etlq5eTnL4jYPKmgPQPyQB7Kk5IMt2xQN3yy1Qb1O3Qph16TGlHzixw02mRLAiagU0Wh17fHi5dOQ',101
> 2,'Py_xzno6MEWPz1bp5Cc0JBPfX90mz2uVMLPBJUWucvNPlPnVMMm81PExZ5uM0K9iEdKmleY7XFsn8O3Oxr6e07qdycvuk_lR84vI',102
> 3,'i2FS2ODjRBdaIpyE362JVPu4KEYSHDNTjPh46YFANquxNRK9JQT8h1W4Tph9DqGfwIgQG5ZJ8BCBklRQreyJhoLIPMbJQeH_rhN1',103
> 4,'Yp_q_uyH16UPTRvPdeKaslw8bhheFqqdwWaG_e8TZZ6jyscyQN556jJMxYOZjx5Zv7GV6zoa2ORsTEGcAKbKUChPFfuGAujgDkjT',104
> 5,'t3uuFOkNEjDE_7rc9cLbgT1o0B_jZXWsWNtmBgiC4ACffzTHUGRkl5YIZSUXB3kew2yytvB8nbCklImDa0BWxYseSbMWiKg1R9ae',105
>
> Now I try to see the topic in spark streaming as follows:
>
> val conf = new SparkConf().
>  setAppName("StreamTest").
>  setMaster("local[12]").
>  set("spark.driver.allowMultipleContexts", "true").
>  set("spark.hadoop.validateOutputSpecs", "false")
> val sc = new SparkContext(conf)
> // Create sqlContext based on HiveContext
> val sqlContext = new HiveContext(sc)
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> //
> // Create a local StreamingContext with two working thread and batch
> interval of 1 second.
> // The master requires 2 cores to prevent from a starvation scenario.
> val ssc = new StreamingContext(conf, Minutes(1))
> // Create a DStream that will connect to hostname:port, like localhost:
> //val lines = ssc.socketTextStream("rhes564", 9092)
> val lines = ssc.socketTextStream("rhes564", 2181)
> // Split each line into words
> val words = lines.flatMap(_.split(" "))
> val pairs = words.map(word => (word, 1))
> val wordCounts = pairs.reduceByKey(_ + _)
> // Print the first ten elements of each RDD generated in this DStream to the
> console
> wordCounts.print()
> ssc.start()
>
> This is what I am getting:
>
>
> scala> ---
> Time: 145954176 ms
> ---
>
> But no values
>
> Have I got the port wrong in this case or the set up is incorrect?
>
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>




Support for time column type?

2016-04-01 Thread Philip Weaver
Hi, I don't see any mention of a time type in the documentation (there is
DateType and TimestampType, but not TimeType), and have been unable to find
any documentation about whether this will be supported in the future. Does
anyone know if this is currently supported or will be supported in the
future?


Spark streaming issue

2016-04-01 Thread Mich Talebzadeh
Hi,

I am just testing Spark streaming with Kafka.

Basically I am broadcasting a topic every minute to Host:port
-> rhes564:2181. This is sending a few lines through a shell script as
follows:

cat ${IN_FILE} | ${KAFKA_HOME}/bin/kafka-console-producer.sh --broker-list
rhes564:9092 --topic newtopic

That works fine and I can see the messages in

${KAFKA_HOME}/bin/kafka-console-consumer.sh --zookeeper rhes564:2181
--topic newtopic

Fri Apr 1 21:00:01 BST 2016  === Sending messages from rhes5
1,'OZ9062Cx22qAHo8m_fsZb16Etlq5eTnL4jYPKmgPQPyQB7Kk5IMt2xQN3yy1Qb1O3Qph16TGlHzixw02mRLAiagU0Wh17fHi5dOQ',101
2,'Py_xzno6MEWPz1bp5Cc0JBPfX90mz2uVMLPBJUWucvNPlPnVMMm81PExZ5uM0K9iEdKmleY7XFsn8O3Oxr6e07qdycvuk_lR84vI',102
3,'i2FS2ODjRBdaIpyE362JVPu4KEYSHDNTjPh46YFANquxNRK9JQT8h1W4Tph9DqGfwIgQG5ZJ8BCBklRQreyJhoLIPMbJQeH_rhN1',103
4,'Yp_q_uyH16UPTRvPdeKaslw8bhheFqqdwWaG_e8TZZ6jyscyQN556jJMxYOZjx5Zv7GV6zoa2ORsTEGcAKbKUChPFfuGAujgDkjT',104
5,'t3uuFOkNEjDE_7rc9cLbgT1o0B_jZXWsWNtmBgiC4ACffzTHUGRkl5YIZSUXB3kew2yytvB8nbCklImDa0BWxYseSbMWiKg1R9ae',105

Now I try to see the topic in spark streaming as follows:

val conf = new SparkConf().
 setAppName("StreamTest").
 setMaster("local[12]").
 set("spark.driver.allowMultipleContexts", "true").
 set("spark.hadoop.validateOutputSpecs", "false")
val sc = new SparkContext(conf)
// Create sqlContext based on HiveContext
val sqlContext = new HiveContext(sc)
val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
//
// Create a local StreamingContext with two working thread and batch
interval of 1 second.
// The master requires 2 cores to prevent from a starvation scenario.
val ssc = new StreamingContext(conf, Minutes(1))
// Create a DStream that will connect to hostname:port, like localhost:
//val lines = ssc.socketTextStream("rhes564", 9092)
val lines = ssc.socketTextStream("rhes564", 2181)
// Split each line into words
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print the first ten elements of each RDD generated in this DStream to
the console
wordCounts.print()
ssc.start()

This is what I am getting:


scala> ---
Time: 145954176 ms
---

But no values

Have I got the port wrong in this case or the set up is incorrect?


Thanks

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


Re: Problem with jackson lib running on spark

2016-04-01 Thread Marcelo Oikawa
Hi, list.

Just to close the thread. Unfortunately, I didn't solve the jackson lib
problem, but I did a workaround that works fine for me. Perhaps this helps
someone else.

The problem arose from this line when I tried to create the tranquilizer object
(used to connect to Druid) using the *fromConfig* utility.

tranquilizer = 
DruidBeams.fromConfig(dataSourceConfig).buildTranquilizer(tranquilizerBuider);

Instead, I create the tranquilizer object-by-object as shown below:

DruidDimensions dimentions = getDimensions(props);
List<AggregatorFactory> aggregators = getAggregations(props);

TimestampSpec timestampSpec = new TimestampSpec("timestamp", "auto", null);
Timestamper<Map<String, Object>> timestamper = map -> new
DateTime(map.get("timestamp"));
DruidLocation druidLocation = DruidLocation.create("overlord",
"druid:firehose:%s", dataSource);
DruidRollup druidRollup = DruidRollup.create(dimentions, aggregators,
QueryGranularity.ALL);


ClusteredBeamTuning clusteredBeamTuning = ClusteredBeamTuning.builder()
        .segmentGranularity(Granularity.FIVE_MINUTE)
        .windowPeriod(new Period("PT60m"))
        .partitions(1)
        .replicants(1)
        .build();

tranquilizer = DruidBeams.builder(timestamper)
   .curator(buildCurator(props))
   .discoveryPath("/druid/discovery")
   .location(druidLocation)
   .timestampSpec(timestampSpec)
   .rollup(druidRollup)
   .tuning(clusteredBeamTuning)
   .buildTranquilizer();

tranquilizer.start();

That worked for me. Thank you Ted, Alonso and other users.
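An alternative to rebuilding the Beam object by object would be to shade and relocate the conflicting Jackson packages in the assembly jar, so the bundled Jackson never collides on class names with the one Spark provides. A maven-shade-plugin sketch (untested against this project; the relocated package name is arbitrary):

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <relocations>
          <!-- rename the bundled Jackson so Spark's copy wins on its own name -->
          <relocation>
            <pattern>com.fasterxml.jackson</pattern>
            <shadedPattern>shaded.com.fasterxml.jackson</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>
```

With the relocation in place, Tranquility's code is rewritten at package time to call the shaded copy, and Spark's provided Jackson is left alone.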


On Thu, Mar 31, 2016 at 4:08 PM, Marcelo Oikawa  wrote:

>
> Please exclude jackson-databind - that was where the AnnotationMap class
>> comes from.
>>
>
> I tried as you suggested but I am getting the same error. It seems strange
> because when I inspect the generated jar there is nothing named
> AnnotationMap, but there is a databind there.
>
>
>
>
>
>>
>> On Thu, Mar 31, 2016 at 11:37 AM, Marcelo Oikawa <
>> marcelo.oik...@webradar.com> wrote:
>>
>>> Hi, Alonso.
>>>
>>> As you can see jackson-core is provided by several libraries, try to
 exclude it from spark-core, i think the minor version is included within
 it.

>>>
>>> There is no more than one jackson-core provided by spark-core. There are
>>> jackson-core and jackson-core-asl, but those are different artifacts. BTW,
>>> I tried to exclude them but had no success. Same error:
>>>
>>> java.lang.IllegalAccessError: tried to access method
>>> com.fasterxml.jackson.databind.introspect.AnnotatedMember.getAllAnnotations()Lcom/fasterxml/jackson/databind/introspect/AnnotationMap;
>>> from class
>>> com.fasterxml.jackson.databind.introspect.GuiceAnnotationIntrospector
>>> at
>>> com.fasterxml.jackson.databind.introspect.GuiceAnnotationIntrospector.findInjectableValueId(GuiceAnnotationIntrospector.java:39)
>>> at
>>> com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair.findInjectableValueId(AnnotationIntrospectorPair.java:269)
>>> at
>>> com.fasterxml.jackson.databind.deser.BasicDeserializerFactory._addDeserializerConstructors(BasicDeserializerFactory.java:433)
>>> ...
>>>
>>> I guess the problem is an incompatibility between the Jackson artifacts
>>> that come with the tranquility dependency and the Spark-provided ones, but
>>> I also looked for the same Jackson artifacts in different versions and
>>> there are none. What is missing?
>>>
>>>
>>> Use this guide to see how to do it:


 https://maven.apache.org/guides/introduction/introduction-to-optional-and-excludes-dependencies.html



 Alonso Isidoro Roman.

 Mis citas preferidas (de hoy) :
 "Si depurar es el proceso de quitar los errores de software, entonces
 programar debe ser el proceso de introducirlos..."
  -  Edsger Dijkstra

 My favorite quotes (today):
 "If debugging is the process of removing software bugs, then
 programming must be the process of putting ..."
   - Edsger Dijkstra

 "If you pay peanuts you get monkeys"


 2016-03-31 20:01 GMT+02:00 Marcelo Oikawa 
 :

> Hey, Alonso.
>
> here is the output:
>
> [INFO] spark-processor:spark-processor-druid:jar:1.0-SNAPSHOT
> [INFO] +- org.apache.spark:spark-streaming_2.10:jar:1.6.1:provided
> [INFO] |  +- org.apache.spark:spark-core_2.10:jar:1.6.1:provided
> [INFO] |  |  +- org.apache.avro:avro-mapred:jar:hadoop2:1.7.7:provided
> [INFO] |  |  |  +- org.apache.avro:avro-ipc:jar:1.7.7:provided
> [INFO] |  |  |  \- org.apache.avro:avro-ipc:jar:tests:1.7.7:provided
> [INFO] |  |  +- com.twitter:chill_2.10:jar:0.5.0:provided
> [INFO] |  |  |  \- com.esotericsoftware.kryo:kryo:jar:2.21:provided
> [INFO] |  |  | +-

Re: Where to set properties for the retainedJobs/Stages?

2016-04-01 Thread Ted Yu
Please read
https://spark.apache.org/docs/latest/configuration.html#dynamically-loading-spark-properties
w.r.t. spark-defaults.conf
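To tie the three properties to processes: spark.ui.retainedJobs and spark.ui.retainedStages are read by the driver that serves the live UI, while spark.history.retainedApplications is read by the history server. A spark-defaults.conf sketch (the numbers are placeholders, not recommendations):

```
spark.ui.retainedJobs                 500
spark.ui.retainedStages               500
spark.history.retainedApplications    50
```

For jobs launched through the Java API rather than spark-submit, the driver-side keys can be set programmatically, e.g. new SparkConf().set("spark.ui.retainedJobs", "500"), before the context is created.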

On Fri, Apr 1, 2016 at 12:06 PM, Max Schmidt  wrote:

> Yes, but the doc doesn't say which process the configs apply to, so do I
> have to set them for the history-server? The daemon? The workers?
>
> And what if I use the Java API instead of spark-submit for the jobs?
>
> I guess that spark-defaults.conf is obsolete for the Java API?
>
>
> Am 2016-04-01 18:58, schrieb Ted Yu:
>
>> You can set them in spark-defaults.conf
>>
>> See also https://spark.apache.org/docs/latest/configuration.html#spark-ui
>> [1]
>>
>> On Fri, Apr 1, 2016 at 8:26 AM, Max Schmidt  wrote:
>>
>> Can somebody tell me the interaction between the properties:
>>>
>>> spark.ui.retainedJobs
>>> spark.ui.retainedStages
>>> spark.history.retainedApplications
>>>
>>> I know from the bugtracker, that the last one describes the number of
>>> applications the history-server holds in memory.
>>>
>>> Can I set the properties in the spark-env.sh? And where?
>>>
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>
>>
>>
>> Links:
>> --
>> [1] https://spark.apache.org/docs/latest/configuration.html#spark-ui
>>
>
>
>
>


Re: Spark Metrics Framework?

2016-04-01 Thread Yiannis Gkoufas
Hi Mike,

I am forwarding you a mail I sent a while ago regarding some related work I
did, hope you find it useful

Hi all,

I recently wrote to the dev mailing list about this contribution, but I
thought it might be useful to post it here, since I have seen a lot of
people asking about OS-level metrics for Spark. This is the result of the
work we have been doing recently at IBM Research around Spark.

Essentially, we have extended the Spark metrics system to utilize the Hyperic
Sigar library to capture OS-level metrics, and modified the Web UI to
visualize those metrics per application.

The above functionalities can be configured in the metrics.properties and
spark-defaults.conf files.

We have recorded a small demo that shows those capabilities, which you can
find here: https://ibm.app.box.com/s/vyaedlyb444a4zna1215c7puhxliqxdg

There is a blog post which gives more details on the functionality here:
www.spark.tc/sparkoscope-enabling-spark-optimization-through-cross-stack-monitoring-and-visualization-2/

and there is also a public repo where anyone can try it:
https://github.com/ibm-research-ireland/sparkoscope

Hope someone finds it useful!

Thanks a lot!

Yiannis
On 1 Apr 2016 19:10, "Mike Sukmanowsky"  wrote:

> Thanks Silvio, JIRA submitted
> https://issues.apache.org/jira/browse/SPARK-14332.
>
> On Fri, 25 Mar 2016 at 12:46 Silvio Fiorito 
> wrote:
>
>> Hi Mike,
>>
>> Sorry got swamped with work and didn’t get a chance to reply.
>>
>> I misunderstood what you were trying to do. I thought you were just
>> looking to create custom metrics vs looking for the existing Hadoop Output
>> Format counters.
>>
>> I’m not familiar enough with the Hadoop APIs but I think it would require
>> a change to the SparkHadoopWriter
>> 
>> class since it generates the JobContext which is required to read the
>> counters. Then it could publish the counters to the Spark metrics system.
>>
>> I would suggest going ahead and submitting a JIRA request if there isn’t
>> one already.
>>
>> Thanks,
>> Silvio
>>
>> From: Mike Sukmanowsky 
>> Date: Friday, March 25, 2016 at 10:48 AM
>>
>> To: Silvio Fiorito , "
>> user@spark.apache.org" 
>> Subject: Re: Spark Metrics Framework?
>>
>> Pinging again - any thoughts?
>>
>> On Wed, 23 Mar 2016 at 09:17 Mike Sukmanowsky 
>> wrote:
>>
>>> Thanks Ted and Silvio. I think I'll need a bit more hand holding here,
>>> sorry. The way we use ES Hadoop is in pyspark via
>>> org.elasticsearch.hadoop.mr.EsOutputFormat in a saveAsNewAPIHadoopFile
>>> call. Given the Hadoop interop, I wouldn't assume that the EsOutputFormat
>>> class
>>> 
>>> could be modified to define a new Source and register it via
>>> MetricsSystem.createMetricsSystem. This feels like a good feature request
>>> for Spark actually: "Support Hadoop Counters in Input/OutputFormats as
>>> Spark metrics" but I wanted some feedback first to see if that makes sense.
>>>
>>> That said, some of the custom RDD classes
>>> 
>>>  could
>>> probably be modified to register a new Source when they perform
>>> reading/writing from/to Elasticsearch.
>>>
>>> On Tue, 22 Mar 2016 at 15:17 Silvio Fiorito <
>>> silvio.fior...@granturing.com> wrote:
>>>
 Hi Mike,

 It’s been a while since I worked on a custom Source but I think all you
 need to do is make your Source 

In-Memory Only Spark Shuffle

2016-04-01 Thread slavitch
Hello;

I’m working on spark with very large memory systems (2TB+) and notice that
Spark spills to disk in shuffle.  Is there a way to force spark to stay
exclusively in memory when doing shuffle operations?   The goal is to keep
the shuffle data either in the heap or in off-heap memory (in 1.6.x) and
never touch the IO subsystem.  I am willing to have the job fail if it runs
out of RAM.

spark.shuffle.spill true  is deprecated in 1.6 and does not work in Tungsten
sort in 1.5.x

"WARN UnsafeShuffleManager: spark.shuffle.spill was set to false, but this
is ignored by the tungsten-sort shuffle manager; its optimized shuffles will
continue to spill to disk when necessary.”

If this is impossible via configuration changes, what code changes would be
needed to accomplish this?
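One workaround sometimes suggested for this (an assumption on my part, not something verified here on 1.6) is to leave spilling enabled but point Spark's scratch directories at a RAM-backed filesystem, so spill files never reach the storage subsystem:

```
# mount a tmpfs scratch area (the size is a placeholder; note it competes
# with executor heaps for the same physical RAM)
#   mount -t tmpfs -o size=500g tmpfs /mnt/spark-local
# then in spark-defaults.conf:
spark.local.dir    /mnt/spark-local
```

The spill code path still runs, but the "disk" writes land in memory-backed storage.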



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/In-Memory-Only-Spark-Shuffle-tp26661.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: OutOfMemory with wide (289 column) dataframe

2016-04-01 Thread Ted Yu
bq. This was a big help!

The email (maybe only addressed to you) didn't come with your latest reply.

Do you mind sharing it ?

Thanks

On Fri, Apr 1, 2016 at 11:37 AM, ludflu  wrote:

> This was a big help! For the benefit of my fellow travelers running spark
> on
> EMR:
>
> I made a json file with the following:
>
> [ { "Classification": "yarn-site", "Properties": {
> "yarn.nodemanager.pmem-check-enabled": "false",
> "yarn.nodemanager.vmem-check-enabled": "false" } } ]
>
> and then I created my cluster like so:
>
> aws emr create-cluster --configurations
> file:///Users/jsnavely/project/frick/spark_config/nomem.json
> ...
>
> The other thing I noticed was that one of the dataframes I was joining
> against was actually coming from a gzip'd json file. gzip files are NOT
> splittable, so it wasn't properly parallelized, which means the join was
> causing a lot of memory pressure. I recompressed it with bzip2 and my job
> has been running with no errors.
>
> Thanks again!
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/OutOfMemory-with-wide-289-column-dataframe-tp26651p26660.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


writing partitioned parquet files

2016-04-01 Thread Imran Akbar
Hi,

I'm reading in a CSV file, and I would like to write it back as a permanent
table, but partitioned by year, etc.

Currently I do this:

from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)
df =
sqlContext.read.format('com.databricks.spark.csv').options(header='true',
inferschema='true').load('/Users/imran/Downloads/intermediate.csv')
df.saveAsTable("intermediate")

Which works great.

I also know I can do this:
df.write.partitionBy("year").parquet("path/to/output")

But how do I combine the two, to save a permanent table with partitioning,
in Parquet format?
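For what it's worth, the DataFrameWriter calls chain, so the usual form is df.write.partitionBy("year").format("parquet").saveAsTable("intermediate"), though support for partitioned saveAsTable varies across Spark versions, so verify on yours. The Hive-style directory layout that partitionBy produces can be sketched without Spark at all (CSV instead of Parquet, purely for illustration):

```python
import csv
import os
import tempfile
from collections import defaultdict

# Sketch of the layout partitionBy("year") writes: one column=value
# subdirectory per distinct value, with the partition column dropped
# from the data files themselves.
rows = [
    {"year": "2014", "product": "a"},
    {"year": "2015", "product": "b"},
    {"year": "2014", "product": "c"},
]

out = tempfile.mkdtemp()
groups = defaultdict(list)
for row in rows:
    groups[row["year"]].append(row)

for year, part in sorted(groups.items()):
    part_dir = os.path.join(out, "year=%s" % year)
    os.makedirs(part_dir)
    with open(os.path.join(part_dir, "part-00000.csv"), "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=["product"])
        writer.writeheader()
        writer.writerows({"product": r["product"]} for r in part)

print(sorted(os.listdir(out)))  # ['year=2014', 'year=2015']
```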

thanks,
imran


Re: Where to set properties for the retainedJobs/Stages?

2016-04-01 Thread Max Schmidt
Yes, but the doc doesn't say which process the configs apply to, so do I
have to set them for the history-server? The daemon? The workers?


And what if I use the Java API instead of spark-submit for the jobs?

I guess that spark-defaults.conf is obsolete for the Java API?


Am 2016-04-01 18:58, schrieb Ted Yu:

You can set them in spark-defaults.conf

See 
also https://spark.apache.org/docs/latest/configuration.html#spark-ui 
[1]


On Fri, Apr 1, 2016 at 8:26 AM, Max Schmidt  wrote:


Can somebody tell me the interaction between the properties:

spark.ui.retainedJobs
spark.ui.retainedStages
spark.history.retainedApplications

I know from the bugtracker that the last one describes the number of
applications the history-server holds in memory.

Can I set the properties in the spark-env.sh? And where?


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




Links:
--
[1] https://spark.apache.org/docs/latest/configuration.html#spark-ui





-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Disk Full on one Worker is leading to Job Stuck and Executor Unresponsive

2016-04-01 Thread Abhishek Anand
Hi Ted,

Any thoughts on this ???

I am getting the same kind of error when I kill a worker on one of the
machines.
Even after killing the worker with kill -9, the executor still shows up
on the Spark UI with negative active tasks.

All the tasks on that worker starts to fail with the following exception.


16/04/01 23:54:20 WARN TaskSetManager: Lost task 141.0 in stage 19859.0
(TID 190333, 192.168.33.96): java.io.IOException: Failed to connect to /
192.168.33.97:63276
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
at
org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused: /
192.168.33.97:63276
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:716)
at
io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208)
at
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287)
at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
... 1 more




Cheers !!
Abhi
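A side note on the stack trace above: "Failed to connect / Connection refused" is not Spark-specific. It is a bare TCP connect to an executor whose BlockManager port is no longer listening, which is why every shuffle fetch aimed at the killed worker fails until the driver notices it is gone. The raw failure mode, without Spark:

```python
import socket

# Find a port with nothing listening on it: bind an ephemeral port,
# note the number, then close the socket.
probe = socket.socket()
probe.bind(("127.0.0.1", 0))
dead_port = probe.getsockname()[1]
probe.close()

# Connecting to a peer that is no longer listening produces the same
# "Connection refused" root cause seen in the executor logs.
s = socket.socket()
refused = False
try:
    s.connect(("127.0.0.1", dead_port))
except OSError:
    refused = True
finally:
    s.close()

print("connect refused:", refused)
```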

On Fri, Apr 1, 2016 at 9:04 AM, Abhishek Anand 
wrote:

> This is what I am getting in the executor logs
>
> 16/03/29 10:49:00 ERROR DiskBlockObjectWriter: Uncaught exception while
> reverting partial writes to file
> /data/spark-e2fc248f-a212-4a99-9d6c-4e52d6a69070/executor-37679a6c-cb96-451e-a284-64d6b4fe9910/blockmgr-f8ca72f4-f329-468b-8e65-ef97f8fb285c/38/temp_shuffle_8f266d70-3fc6-41e5-bbaa-c413a7b08ea4
> java.io.IOException: No space left on device
> at java.io.FileOutputStream.writeBytes(Native Method)
> at java.io.FileOutputStream.write(FileOutputStream.java:315)
> at
> org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:58)
> at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
> at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
> at org.xerial.snappy.SnappyOutputStream.flush(SnappyOutputStream.java:274)
>
>
>
> It happens every time the disk is full.
>
> On Fri, Apr 1, 2016 at 2:18 AM, Ted Yu  wrote:
>
>> Can you show the stack trace ?
>>
>> The log message came from
>> DiskBlockObjectWriter#revertPartialWritesAndClose().
>> Unfortunately, the method doesn't throw exception, making it a bit hard
>> for caller to know of the disk full condition.
>>
>> On Thu, Mar 31, 2016 at 11:32 AM, Abhishek Anand wrote:
>>
>>>
>>> Hi,
>>>
>>> Why is it so that when my disk space is full on one of the workers then
>>> the executor on that worker becomes unresponsive and the jobs on that
>>> worker fails with the exception
>>>
>>>
>>> 16/03/29 10:49:00 ERROR DiskBlockObjectWriter: Uncaught exception while
>>> reverting partial writes to file
>>> /data/spark-e2fc248f-a212-4a99-9d6c-4e52d6a69070/executor-37679a6c-cb96-451e-a284-64d6b4fe9910/blockmgr-f8ca72f4-f329-468b-8e65-ef97f8fb285c/38/temp_shuffle_8f266d70-3fc6-41e5-bbaa-c413a7b08ea4
>>> java.io.IOException: No space left on device
>>>
>>>
>>> This is leading to my job getting stuck.
>>>
>>> As a workaround I have to kill the executor, clear the space on disk and
>>> new executor  relaunched by the worker and the failed stages are recomputed.
>>>
>>>
>>> How can I get rid of this problem i.e why my job get stuck on disk full
>>> issue on one of the workers ?
>>>
>>>
>>> Cheers !!!
>>> Abhi
>>>
>>>
>>
>


Eliminating shuffle write and spill disk IO reads/writes in Spark

2016-04-01 Thread Michael Slavitch
Hello;

I’m working on spark with very large memory systems (2TB+) and notice that 
Spark spills to disk in shuffle.  Is there a way to force spark to stay in 
memory when doing shuffle operations?   The goal is to keep the shuffle data 
either in the heap or in off-heap memory (in 1.6.x) and never touch the IO 
subsystem.  I am willing to have the job fail if it runs out of RAM.

spark.shuffle.spill true  is deprecated in 1.6 and does not work in Tungsten 
sort in 1.5.x

"WARN UnsafeShuffleManager: spark.shuffle.spill was set to false, but this is 
ignored by the tungsten-sort shuffle manager; its optimized shuffles will 
continue to spill to disk when necessary.”

If this is impossible via configuration changes, what code changes would be 
needed to accomplish this?





-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: OutOfMemory with wide (289 column) dataframe

2016-04-01 Thread ludflu
This was a big help! For the benefit of my fellow travelers running spark on
EMR:

I made a json file with the following:

[ { "Classification": "yarn-site", "Properties": {
"yarn.nodemanager.pmem-check-enabled": "false",
"yarn.nodemanager.vmem-check-enabled": "false" } } ]

and then I created my cluster like so:

aws emr create-cluster --configurations
file:///Users/jsnavely/project/frick/spark_config/nomem.json
...

The other thing I noticed was that one of the dataframes I was joining
against was actually coming from a gzip'd json file. gzip files are NOT
splittable, so it wasn't properly parallelized, which means the join was
causing a lot of memory pressure. I recompressed it with bzip2 and my job
has been running with no errors.

Thanks again!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/OutOfMemory-with-wide-289-column-dataframe-tp26651p26660.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: SparkML RandomForest java.lang.StackOverflowError

2016-04-01 Thread Joseph Bradley
Can you try reducing maxBins?  That reduces communication (at the cost of
coarser discretization of continuous features).

On Fri, Apr 1, 2016 at 11:32 AM, Joseph Bradley 
wrote:

> In my experience, 20K is a lot but often doable; 2K is easy; 200 is
> small.  Communication scales linearly in the number of features.
>
> On Thu, Mar 31, 2016 at 6:12 AM, Eugene Morozov <
> evgeny.a.moro...@gmail.com> wrote:
>
>> Joseph,
>>
>> Correction, there are 20k features. Is it still a lot?
>> What number of features can be considered as normal?
>>
>> --
>> Be well!
>> Jean Morozov
>>
>> On Tue, Mar 29, 2016 at 10:09 PM, Joseph Bradley 
>> wrote:
>>
>>> First thought: 70K features is *a lot* for the MLlib implementation (and
>>> any PLANET-like implementation)
>>>
>>> Using fewer partitions is a good idea.
>>>
>>> Which Spark version was this on?
>>>
>>> On Tue, Mar 29, 2016 at 5:21 AM, Eugene Morozov <
>>> evgeny.a.moro...@gmail.com> wrote:
>>>
 The questions I have in mind:

 Is it something that one might expect? From the stack trace itself it's
 not clear where it comes from.
 Is it an already known bug? Although I haven't found anything like that.
 Is it possible to configure something to workaround / avoid this?

 I'm not sure it's the right thing to do, but I've
 increased thread stack size 10 times (to 80MB)
 reduced default parallelism 10 times (only 20 cores are available)

 Thank you in advance.

 --
 Be well!
 Jean Morozov

 On Tue, Mar 29, 2016 at 1:12 PM, Eugene Morozov <
 evgeny.a.moro...@gmail.com> wrote:

> Hi,
>
> I have a web service that provides rest api to train random forest
> algo.
> I train random forest on a 5 nodes spark cluster with enough memory -
> everything is cached (~22 GB).
> On a small datasets up to 100k samples everything is fine, but with
> the biggest one (400k samples and ~70k features) I'm stuck with
> StackOverflowError.
>
> Additional options for my web service
> spark.executor.extraJavaOptions="-XX:ThreadStackSize=8192"
> spark.default.parallelism = 200.
>
> On a 400k samples dataset
> - (with default thread stack size) it took 4 hours of training to get
> the error.
> - with increased stack size it took 60 hours to hit it.
> I can increase it, but it's hard to say what amount of memory it needs
> and it's applied to all of the treads and might waste a lot of memory.
>
> I'm looking at different stages at event timeline now and see that
> task deserialization time gradually increases. And at the end task
> deserialization time is roughly same as executor computing time.
>
> Code I use to train model:
>
> int MAX_BINS = 16;
> int NUM_CLASSES = 0;
> double MIN_INFO_GAIN = 0.0;
> int MAX_MEMORY_IN_MB = 256;
> double SUBSAMPLING_RATE = 1.0;
> boolean USE_NODEID_CACHE = true;
> int CHECKPOINT_INTERVAL = 10;
> int RANDOM_SEED = 12345;
>
> int NODE_SIZE = 5;
> int maxDepth = 30;
> int numTrees = 50;
> Strategy strategy = new Strategy(Algo.Regression(), Variance.instance(), 
> maxDepth, NUM_CLASSES, MAX_BINS,
> QuantileStrategy.Sort(), new 
> scala.collection.immutable.HashMap<>(), nodeSize, MIN_INFO_GAIN,
> MAX_MEMORY_IN_MB, SUBSAMPLING_RATE, USE_NODEID_CACHE, 
> CHECKPOINT_INTERVAL);
> RandomForestModel model = 
> RandomForest.trainRegressor(labeledPoints.rdd(), strategy, numTrees, 
> "auto", RANDOM_SEED);
>
>
> Any advice would be highly appreciated.
>
> The exception (~3000 lines long):
>  java.lang.StackOverflowError
> at
> java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2320)
> at
> java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2333)
> at
> java.io.ObjectInputStream$BlockDataInputStream.readInt(ObjectInputStream.java:2828)
> at
> java.io.ObjectInputStream.readHandle(ObjectInputStream.java:1453)
> at
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1512)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at
> java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> at
> 

Re: SparkML RandomForest java.lang.StackOverflowError

2016-04-01 Thread Joseph Bradley
In my experience, 20K is a lot but often doable; 2K is easy; 200 is small.
Communication scales linearly in the number of features.

On Thu, Mar 31, 2016 at 6:12 AM, Eugene Morozov 
wrote:

> Joseph,
>
> Correction, there are 20k features. Is it still a lot?
> What number of features can be considered as normal?
>
> --
> Be well!
> Jean Morozov
>
> On Tue, Mar 29, 2016 at 10:09 PM, Joseph Bradley 
> wrote:
>
>> First thought: 70K features is *a lot* for the MLlib implementation (and
>> any PLANET-like implementation)
>>
>> Using fewer partitions is a good idea.
>>
>> Which Spark version was this on?
>>
>> On Tue, Mar 29, 2016 at 5:21 AM, Eugene Morozov <
>> evgeny.a.moro...@gmail.com> wrote:
>>
>>> The questions I have in mind:
>>>
>>> Is it something that one might expect? From the stack trace itself it's
>>> not clear where it comes from.
>>> Is it an already known bug? Although I haven't found anything like that.
>>> Is it possible to configure something to workaround / avoid this?
>>>
>>> I'm not sure it's the right thing to do, but I've
>>> increased thread stack size 10 times (to 80MB)
>>> reduced default parallelism 10 times (only 20 cores are available)
>>>
>>> Thank you in advance.
>>>
>>> --
>>> Be well!
>>> Jean Morozov
>>>
>>> On Tue, Mar 29, 2016 at 1:12 PM, Eugene Morozov <
>>> evgeny.a.moro...@gmail.com> wrote:
>>>
 Hi,

 I have a web service that provides rest api to train random forest
 algo.
 I train random forest on a 5 nodes spark cluster with enough memory -
 everything is cached (~22 GB).
 On a small datasets up to 100k samples everything is fine, but with the
 biggest one (400k samples and ~70k features) I'm stuck with
 StackOverflowError.

 Additional options for my web service
 spark.executor.extraJavaOptions="-XX:ThreadStackSize=8192"
 spark.default.parallelism = 200.

 On a 400k samples dataset
 - (with default thread stack size) it took 4 hours of training to get
 the error.
 - with increased stack size it took 60 hours to hit it.
 I can increase it, but it's hard to say what amount of memory it needs
 and it's applied to all of the treads and might waste a lot of memory.

 I'm looking at different stages at event timeline now and see that task
 deserialization time gradually increases. And at the end task
 deserialization time is roughly same as executor computing time.

 Code I use to train model:

 int MAX_BINS = 16;
 int NUM_CLASSES = 0;
 double MIN_INFO_GAIN = 0.0;
 int MAX_MEMORY_IN_MB = 256;
 double SUBSAMPLING_RATE = 1.0;
 boolean USE_NODEID_CACHE = true;
 int CHECKPOINT_INTERVAL = 10;
 int RANDOM_SEED = 12345;

 int NODE_SIZE = 5;
 int maxDepth = 30;
 int numTrees = 50;
 Strategy strategy = new Strategy(Algo.Regression(), Variance.instance(), 
 maxDepth, NUM_CLASSES, MAX_BINS,
 QuantileStrategy.Sort(), new 
 scala.collection.immutable.HashMap<>(), nodeSize, MIN_INFO_GAIN,
 MAX_MEMORY_IN_MB, SUBSAMPLING_RATE, USE_NODEID_CACHE, 
 CHECKPOINT_INTERVAL);
 RandomForestModel model = RandomForest.trainRegressor(labeledPoints.rdd(), 
 strategy, numTrees, "auto", RANDOM_SEED);


 Any advice would be highly appreciated.

 The exception (~3000 lines long):
  java.lang.StackOverflowError
 at
 java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2320)
 at
 java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2333)
 at
 java.io.ObjectInputStream$BlockDataInputStream.readInt(ObjectInputStream.java:2828)
 at
 java.io.ObjectInputStream.readHandle(ObjectInputStream.java:1453)
 at
 java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1512)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
 at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
 at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
 at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
 at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
 at
 java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
 at
 scala.collection.immutable.$colon$colon.readObject(List.scala:366)
 at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:497)
 at
 
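The repeating readObject frames in the dump, ending in scala.collection.immutable.$colon$colon.readObject, suggest the JVM is recursively deserializing a very deep structure: one stack frame per list cell or tree node, so depth-30 forests over huge feature sets can exhaust a thread stack regardless of heap size. The same failure mode can be shown in plain Python (illustration only, no Spark):

```python
# Walk a cons-style linked list recursively: one stack frame per cell,
# just like Java serialization walking nested objects.
def depth_of(node):
    if node is None:
        return 0
    return 1 + depth_of(node[1])

# Build a list far deeper than the default recursion limit (~1000 frames).
lst = None
for i in range(5000):
    lst = (i, lst)

overflowed = False
try:
    depth_of(lst)
except RecursionError:
    overflowed = True

print("overflowed:", overflowed)  # overflowed: True
```

That is why raising ThreadStackSize only delays the error: the stack demand grows with the depth of what is being deserialized.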



SocketTimeoutException

2016-04-01 Thread Sergey
Hi!


I get a SocketTimeoutException when I execute this piece of code for the first time.

When I re-run it, it works fine. The code just reads a csv file and
transforms it to a dataframe. Any ideas about the reason?


import pyspark_csv as pycsv
plaintext_rdd = sc.textFile(r'file:///c:\data\sample.csv')
dataframe = pycsv.csvToDataFrame(sql, plaintext_rdd)


csv file size is 50MB.



I use Spark 1.6.1 on Windows locally.


Regards,

Sergey.
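The "Accept timed out" at the bottom of the traceback below means the JVM opened a server socket for the Python worker and the worker never connected back within the timeout window (roughly 10 seconds in Spark 1.6; an assumption worth verifying for your version, and on Windows a firewall prompt on first run can easily eat that window). The mechanism can be reproduced in plain Python:

```python
import socket

# A server socket waits for a client that never connects -- the analogue
# of Spark's PythonWorkerFactory waiting for the Python worker.
srv = socket.socket()
srv.bind(("127.0.0.1", 0))   # any free port
srv.listen(1)
srv.settimeout(0.5)          # shortened so the demo finishes quickly

timed_out = False
try:
    srv.accept()             # no client ever connects
except socket.timeout:
    timed_out = True
finally:
    srv.close()

print("accept timed out:", timed_out)  # accept timed out: True
```

That would also explain why the second run works: once the interpreter is warm (and any firewall prompt answered), the worker connects back in time.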




Py4JJavaError                             Traceback (most recent call last)
<ipython-input> in <module>()
      2 plaintext_rdd = sc.textFile(r'file:///c:\data\sample.csv')
      3 #plaintext_rdd.count()
----> 4 dataframe = pycsv.csvToDataFrame(sql, plaintext_rdd)

C:\Users\sergun\AppData\Local\Temp\spark-f4884acc-f8fd-416b-836b-2adebfd8e027\userFiles-02d6b0ee-569a-41ee-bd80-ccff2e09725e\pyspark_csv.py in csvToDataFrame(sqlCtx, rdd, columns, sep, parseDate)

c:\spark\python\pyspark\rdd.py in first(self)
   1313             ValueError: RDD is empty
   1314         """
-> 1315         rs = self.take(1)
   1316         if rs:
   1317             return rs[0]

c:\spark\python\pyspark\rdd.py in take(self, num)
   1295
   1296             p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1297             res = self.context.runJob(self, takeUpToNumLeft, p)
   1298
   1299             items += res

c:\spark\python\pyspark\context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
    937         # SparkContext#runJob.
    938         mappedRDD = rdd.mapPartitions(partitionFunc)
--> 939         port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
    940         return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
    941

c:\spark\python\lib\py4j-0.9-src.zip\py4j\java_gateway.py in __call__(self, *args)
    811         answer = self.gateway_client.send_command(command)
    812         return_value = get_return_value(
--> 813             answer, self.gateway_client, self.target_id, self.name)
    814
    815         for temp_arg in temp_args:

c:\spark\python\pyspark\sql\utils.py in deco(*a, **kw)
     43     def deco(*a, **kw):
     44         try:
---> 45             return f(*a, **kw)
     46         except py4j.protocol.Py4JJavaError as e:
     47             s = e.java_exception.toString()

c:\spark\python\lib\py4j-0.9-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
    306                 raise Py4JJavaError(
    307                     "An error occurred while calling {0}{1}{2}.\n".
--> 308                     format(target_id, ".", name), value)
    309             else:
    310                 raise Py4JError(
Py4JJavaError: An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure:
Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0
in stage 0.0 (TID 0, localhost): org.apache.spark.SparkException:
Python worker did not connect back in time
at 
org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:136)
at 
org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:65)
at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:134)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:101)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.SocketTimeoutException: Accept timed out
at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
at 
java.net.DualStackPlainSocketImpl.socketAccept(DualStackPlainSocketImpl.java:135)
at 
java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:199)
at java.net.ServerSocket.implAccept(ServerSocket.java:545)
at java.net.ServerSocket.accept(ServerSocket.java:513)
at 
org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:131)
... 12 more

Driver stacktrace:
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
at 

Re: Spark Metrics Framework?

2016-04-01 Thread Mike Sukmanowsky
Thanks Silvio, JIRA submitted
https://issues.apache.org/jira/browse/SPARK-14332.

On Fri, 25 Mar 2016 at 12:46 Silvio Fiorito 
wrote:

> Hi Mike,
>
> Sorry got swamped with work and didn’t get a chance to reply.
>
> I misunderstood what you were trying to do. I thought you were just
> looking to create custom metrics vs looking for the existing Hadoop Output
> Format counters.
>
> I’m not familiar enough with the Hadoop APIs but I think it would require
> a change to the SparkHadoopWriter
> 
> class since it generates the JobContext which is required to read the
> counters. Then it could publish the counters to the Spark metrics system.
>
> I would suggest going ahead and submitting a JIRA request if there isn’t
> one already.
>
> Thanks,
> Silvio
>
> From: Mike Sukmanowsky 
> Date: Friday, March 25, 2016 at 10:48 AM
>
> To: Silvio Fiorito , "user@spark.apache.org"
> 
> Subject: Re: Spark Metrics Framework?
>
> Pinging again - any thoughts?
>
> On Wed, 23 Mar 2016 at 09:17 Mike Sukmanowsky 
> wrote:
>
>> Thanks Ted and Silvio. I think I'll need a bit more hand holding here,
>> sorry. The way we use ES Hadoop is in pyspark via
>> org.elasticsearch.hadoop.mr.EsOutputFormat in a saveAsNewAPIHadoopFile
>> call. Given the Hadoop interop, I wouldn't assume that the EsOutputFormat
>> class
>> 
>> could be modified to define a new Source and register it via
>> MetricsSystem.createMetricsSystem. This feels like a good feature request
>> for Spark actually: "Support Hadoop Counters in Input/OutputFormats as
>> Spark metrics" but I wanted some feedback first to see if that makes sense.
>>
>> That said, some of the custom RDD classes
>> 
>>  could
>> probably be modified to register a new Source when they perform
>> reading/writing from/to Elasticsearch.
>>
>> On Tue, 22 Mar 2016 at 15:17 Silvio Fiorito <
>> silvio.fior...@granturing.com> wrote:
>>
>>> Hi Mike,
>>>
>>> It’s been a while since I worked on a custom Source but I think all you
>>> need to do is make your Source in the org.apache.spark package.
>>>
>>> Thanks,
>>> Silvio
>>>
>>> From: Mike Sukmanowsky 
>>> Date: Tuesday, March 22, 2016 at 3:13 PM
>>> To: Silvio Fiorito , "
>>> user@spark.apache.org" 
>>> Subject: Re: Spark Metrics Framework?
>>>
>>> The Source class is private
>>> 
>>> to the spark package and any new Sources added to the metrics registry must
>>> be of type Source
>>> .
>>> So unless I'm mistaken, we can't define a custom source. I linked to 1.4.1
>>> code, but the same is true in 1.6.1.
>>>
>>> On Mon, 21 Mar 2016 at 12:05 Silvio Fiorito <
>>> silvio.fior...@granturing.com> wrote:
>>>
 You could use the metric sources and sinks described here:
 http://spark.apache.org/docs/latest/monitoring.html#metrics

 If you want to push the metrics to another system you can define a
 custom sink. You can also extend the metrics by defining a custom source.
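
 As a reference point, a sink is wired up in conf/metrics.properties. A minimal
 sketch (host and port are placeholders, not from this thread) that ships all
 instances' metrics to a Graphite sink:

```properties
# conf/metrics.properties -- illustrative only; host/port are placeholders
*.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink
*.sink.graphite.host=graphite.example.com
*.sink.graphite.port=2003
*.sink.graphite.period=10
*.sink.graphite.unit=seconds
```

 The "*" prefix applies the sink to every instance (master, worker, driver,
 executor); it can be narrowed per instance as described in the monitoring docs.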

 From: Mike Sukmanowsky 
 Date: Monday, March 21, 2016 at 11:54 AM
 To: "user@spark.apache.org" 
 Subject: Spark Metrics Framework?

 We make extensive use of the elasticsearch-hadoop library for
 Hadoop/Spark. In trying to troubleshoot our Spark applications, it'd be
 very handy to have access to some of the many metrics
 
 that the library makes available when running in map reduce mode. The 
 library's
 author noted
 
 that Spark doesn't offer any kind of similar metrics API whereby these
 metrics could be reported or aggregated.

 Are there any plans to bring a metrics framework similar to Hadoop's
 Counter system to Spark or is there an alternative means for us to grab
 metrics exposed when using Hadoop APIs to load/save RDDs?

 Thanks,
 Mike

>>>


strange behavior of pyspark RDD zip

2016-04-01 Thread Sergey
Hi!

I'm on Spark 1.6.1 in local mode on Windows.

I have an issue when zipping two RDDs of __equal__ size and __equal__
partition count (I also tried repartitioning both RDDs to one partition).
I get this exception when I do rdd1.zip(rdd2).count():

File "c:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 111, in main
  File "c:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 106, in process
  File "c:\spark\python\lib\pyspark.zip\pyspark\serializers.py", line
263, in dump_stream
vs = list(itertools.islice(iterator, batch))
  File "c:\spark\python\pyspark\rddsampler.py", line 95, in func
for obj in iterator:
  File "c:\spark\python\lib\pyspark.zip\pyspark\serializers.py", line
322, in load_stream
" in pair: (%d, %d)" % (len(keys), len(vals)))
ValueError: Can not deserialize RDD with different number of items in
pair: (256, 512)
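
A commonly suggested workaround is to key both RDDs by index and join instead
of zipping, which sidesteps the per-partition batch-serialization mismatch the
error reports. A plain-Python sketch of that rewrite (the comments show the
corresponding pyspark RDD calls; the data is a stand-in):

```python
# Stand-ins for the two RDDs of equal size.
rdd1 = ["a", "b", "c"]
rdd2 = [1, 2, 3]

# idx1 = rdd1.zipWithIndex().map(lambda kv: (kv[1], kv[0]))
idx1 = {i: v for i, v in enumerate(rdd1)}
# idx2 = rdd2.zipWithIndex().map(lambda kv: (kv[1], kv[0]))
idx2 = {i: v for i, v in enumerate(rdd2)}

# joined = idx1.join(idx2).map(lambda kv: kv[1])
joined = [(idx1[i], idx2[i]) for i in sorted(idx1) if i in idx2]
print(joined)  # [('a', 1), ('b', 2), ('c', 3)]
```

The join-by-index version is slower than zip but does not require the two
RDDs to have identical batch sizes within each partition.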


can spark-csv package accept strings instead of files?

2016-04-01 Thread Benjamin Kim
Does anyone know if this is possible? I have an RDD loaded with rows of CSV 
data strings. Each string representing the header row and multiple rows of data 
along with delimiters. I would like to feed each thru a CSV parser to convert 
the data into a dataframe and, ultimately, UPSERT a Hive/HBase table with this 
data.
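
One approach (a sketch only, with hypothetical column names) is to parse the
strings per partition with Python's csv module and then build a DataFrame from
the parsed rows. Outside Spark, the per-partition parsing step looks like:

```python
import csv

def parse_partition(lines, fieldnames=("id", "name", "amount")):
    # Hypothetical column names; in practice take them from the header row.
    # Inside Spark this would be used as rdd.mapPartitions(parse_partition)
    # followed by sqlContext.createDataFrame(...) on the result.
    for row in csv.DictReader(lines, fieldnames=fieldnames):
        yield row

records = list(parse_partition(["1,Alice,10.5", "2,Bob,3.2"]))
print(records[0]["name"])  # Alice
```

This avoids writing the strings back out to files just so a file-based CSV
reader can pick them up.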

Please let me know if you have any ideas.

Thanks,
Ben
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Logistic regression throwing errors

2016-04-01 Thread James Hammerton
Hi,

On a particular .csv data set, which I can use in WEKA's logistic
regression implementation without any trouble, I'm getting errors like the
following:

16/04/01 18:04:18 ERROR LBFGS: Failure! Resetting history:
> breeze.optimize.FirstOrderException: Line search failed

These errors cause the learning to fail - f1 = 0.

Anyone got any idea why this might happen?

Regards,

James


Re: Where to set properties for the retainedJobs/Stages?

2016-04-01 Thread Ted Yu
You can set them in spark-defaults.conf

See also https://spark.apache.org/docs/latest/configuration.html#spark-ui
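
For example, in conf/spark-defaults.conf (the values below are illustrative
only; check the defaults for your version):

```properties
# spark-defaults.conf -- example values, not recommendations
spark.ui.retainedJobs                 500
spark.ui.retainedStages               500
spark.history.retainedApplications    25
```

The first two bound what a running application's UI keeps in memory; the last
bounds how many completed applications the history server caches.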

On Fri, Apr 1, 2016 at 8:26 AM, Max Schmidt  wrote:

> Can somebody tell me the interaction between the properties:
>
> spark.ui.retainedJobs
> spark.ui.retainedStages
> spark.history.retainedApplications
>
> I know from the bugtracker, that the last one describes the number of
> applications the history-server holds in memory.
>
> Can I set the properties in the spark-env.sh? And where?
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Thread-safety of a SparkListener

2016-04-01 Thread Marcelo Vanzin
On Fri, Apr 1, 2016 at 9:23 AM, Truong Duc Kien  wrote:
> I need to gather some metrics using a SparkListener. Do the callback
> methods need to be thread-safe, or are they always called from the same thread?

The callbacks are all fired on the same thread. Just be careful not to
block that thread for too long or Spark will start to drop events.
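
The usual pattern for heavier processing, then, is to have the callback only
enqueue the event and do the expensive work on a separate thread. A minimal
plain-Python sketch of that hand-off (the listener API itself is Scala/Java;
the event values here are stand-ins):

```python
import queue
import threading

events = queue.Queue()
results = []

def on_task_end(event):
    # Runs on the single listener thread: enqueue and return quickly,
    # so the event bus is never blocked and no events are dropped.
    events.put(event)

def worker():
    while True:
        event = events.get()
        if event is None:  # sentinel to shut down
            break
        results.append(event * 2)  # stand-in for expensive metric work

t = threading.Thread(target=worker)
t.start()
for i in range(3):
    on_task_end(i)
events.put(None)
t.join()
print(results)  # [0, 2, 4]
```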

-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Thread-safety of a SparkListener

2016-04-01 Thread Truong Duc Kien
I need to process the events related to Task, Stage and Executor.

Regards,
Kien Truong

On Fri, Apr 1, 2016 at 5:34 PM, Ted Yu  wrote:

> In general, you should implement thread-safety in your code.
>
> Which set of events are you interested in ?
>
> Cheers
>
> On Fri, Apr 1, 2016 at 9:23 AM, Truong Duc Kien 
> wrote:
>
>> Hi,
>>
>> I need to gather some metrics using a SparkListener. Do the callback
>> methods need to be thread-safe, or are they always called from the same thread?
>>
>> Thanks,
>> Kien Truong
>>
>
>


Re: Thread-safety of a SparkListener

2016-04-01 Thread Ted Yu
In general, you should implement thread-safety in your code.

Which set of events are you interested in ?

Cheers

On Fri, Apr 1, 2016 at 9:23 AM, Truong Duc Kien 
wrote:

> Hi,
>
> I need to gather some metrics using a SparkListener. Do the callback
> methods need to be thread-safe, or are they always called from the same thread?
>
> Thanks,
> Kien Truong
>


Thread-safety of a SparkListener

2016-04-01 Thread Truong Duc Kien
Hi,

I need to gather some metrics using a SparkListener. Do the callback
methods need to be thread-safe, or are they always called from the same thread?

Thanks,
Kien Truong


Re: Spark process creating and writing to a Hive ORC table

2016-04-01 Thread Mich Talebzadeh
yes this is feasible.

You can use the Databricks spark-csv jar file to load CSV files from a
staging directory. This is pretty standard:

val df =
sqlContext.read.format("com.databricks.spark.csv").option("inferSchema",
"true").option("header", "true").load("hdfs://xx:9000/data/stg/")

You can then create an ORC table from Spark


// Specify the Hive DB name
sql("use accounts")
//
// Drop and create table ll_18740868. Prefix the table name with the Hive
// database name
//
sql("DROP TABLE IF EXISTS accounts.ll_18740868")
var sqltext : String = ""
sqltext = """
CREATE TABLE accounts.ll_18740868 (
 TransactionDate        DATE
,TransactionType        String
,SortCode               String
)
COMMENT 'from csv file from excel sheet'
STORED AS ORC
TBLPROPERTIES ( "orc.compress"="ZLIB" )
"""
sql(sqltext)

//
// Put data in the Hive table, say from a Spark temporary table called "tmp"
//
sqltext = """
INSERT INTO TABLE accounts.ll_18740868
SELECT
  TO_DATE(FROM_UNIXTIME(UNIX_TIMESTAMP(TransactionDate,'dd/MM/yyyy'),'yyyy-MM-dd')) AS TransactionDate
, TransactionType
, SortCode
FROM tmp
"""
sql(sqltext)

That is it. The above will create and populate an ORC table for you in the
Hive database.

With regard to the use of Parquet as the optimum file format for Spark, well
I am not sure; it depends on the use case. I personally prefer an ORC file,
though Parquet seems to be common in Spark. I would rather store the data as
a Hive table for better manageability.

HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 31 March 2016 at 22:00, Ashok Kumar  wrote:

> Hello,
>
> How feasible is it to use Spark to extract CSV files and create and write
> the content to an ORC table in a Hive database?
>
> Is a Parquet file the best (optimum) format to write to HDFS from a Spark app?
>
> Thanks
>


Where to set properties for the retainedJobs/Stages?

2016-04-01 Thread Max Schmidt
Can somebody tell me the interaction between the properties:

spark.ui.retainedJobs
spark.ui.retainedStages
spark.history.retainedApplications

I know from the bugtracker, that the last one describes the number of
applications the history-server holds in memory.

Can I set the properties in the spark-env.sh? And where?

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Execution error during ALS execution in spark

2016-04-01 Thread pankajrawat
Thanks for the suggestion, but our application is still crashing.

*Description: * flatMap at MatrixFactorizationModel.scala:278

*Failure Reason: * Job aborted due to stage failure: Task 1 in stage 6.0
failed 4 times, most recent failure: Lost task 1.3 in stage 6.0 (TID 116,
dev.local): ExecutorLostFailure (executor 11 lost)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Execution-error-during-ALS-execution-in-spark-tp26644p26659.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: SPARK-13900 - Join with simple OR conditions take too long

2016-04-01 Thread ashokkumar rajendran
I agree with Hemant's comment, but it does not give good results for simple
use cases like 2 OR conditions. Ultimately we need good results from Spark
for end users. Shall we consider this a request to support SQL hints,
then? Is there any plan to support SQL hints in Spark in an upcoming release?

Regards
Ashok

On Fri, Apr 1, 2016 at 5:04 PM, Robin East  wrote:

> Yes and even today CBO (e.g. in Oracle) will still require hints in some
> cases so I think it is more like:
>
> RBO -> RBO + Hints -> CBO + Hints. Most relational databases meet
> significant numbers of corner cases where CBO plans simply don’t do what
> you would want. I don’t know enough about Spark SQL to comment on whether
> the same problems would afflict Spark.
>
>
>
>
> On 31 Mar 2016, at 15:54, Yong Zhang  wrote:
>
> I agree that there won't be a generic solution for these kind of cases.
>
> Without the CBO from Spark or Hadoop ecosystem in short future, maybe
> Spark DataFrame/SQL should support more hints from the end user, as in
> these cases, end users will be smart enough to tell the engine what is the
> correct way to do.
>
> Weren't the relational DBs doing exactly same path? RBO -> RBO + Hints ->
> CBO?
>
> Yong
>
> --
> Date: Thu, 31 Mar 2016 16:07:14 +0530
> Subject: Re: SPARK-13900 - Join with simple OR conditions take too long
> From: hemant9...@gmail.com
> To: ashokkumar.rajend...@gmail.com
> CC: user@spark.apache.org
>
> Hi Ashok,
>
> That's interesting.
>
> As I understand, on tables A and B, a nested loop join (that will produce m
> X n rows) is performed and then each row is evaluated to see if any of the
> conditions is met. You are asking that Spark should instead do a
> BroadcastHashJoin on the equality conditions in parallel and then union the
> results like you are doing in a different query.
>
> If we leave aside parallelism for a moment, theoretically, time taken for
> nested loop join would vary little when the number of conditions are
> increased while the time taken for the solution that you are suggesting
> would increase linearly with number of conditions. So, when number of
> conditions are too many, nested loop join would be faster than the solution
> that you suggest. Now the question is, how should Spark decide when to do
> what?
>
>
> Hemant Bhanawat 
> www.snappydata.io
>
> On Thu, Mar 31, 2016 at 2:28 PM, ashokkumar rajendran <
> ashokkumar.rajend...@gmail.com> wrote:
>
> Hi,
>
> I have filed ticket SPARK-13900. There was an initial reply from a
> developer but did not get any reply on this. How can we do multiple hash
> joins together for OR conditions based joins? Could someone please guide on
> how can we fix this?
>
> Regards
> Ashok
>
>
>


Re: SPARK-13900 - Join with simple OR conditions take too long

2016-04-01 Thread Robin East
Yes and even today CBO (e.g. in Oracle) will still require hints in some cases 
so I think it is more like:

RBO -> RBO + Hints -> CBO + Hints. Most relational databases meet significant 
numbers of corner cases where CBO plans simply don’t do what you would want. I 
don’t know enough about Spark SQL to comment on whether the same problems would 
afflict Spark.




> On 31 Mar 2016, at 15:54, Yong Zhang  wrote:
> 
> I agree that there won't be a generic solution for these kind of cases.
> 
> Without the CBO from Spark or Hadoop ecosystem in short future, maybe Spark 
> DataFrame/SQL should support more hints from the end user, as in these cases, 
> end users will be smart enough to tell the engine what is the correct way to 
> do.
> 
> Weren't the relational DBs doing exactly same path? RBO -> RBO + Hints -> CBO?
> 
> Yong
> 
> Date: Thu, 31 Mar 2016 16:07:14 +0530
> Subject: Re: SPARK-13900 - Join with simple OR conditions take too long
> From: hemant9...@gmail.com 
> To: ashokkumar.rajend...@gmail.com 
> CC: user@spark.apache.org 
> 
> Hi Ashok,
> 
> That's interesting. 
> 
> As I understand, on tables A and B, a nested loop join (that will produce m X 
> n rows) is performed and then each row is evaluated to see if any of the 
> conditions is met. You are asking that Spark should instead do a 
> BroadcastHashJoin on the equality conditions in parallel and then union the 
> results like you are doing in a different query. 
> 
> If we leave aside parallelism for a moment, theoretically, time taken for 
> nested loop join would vary little when the number of conditions are 
> increased while the time taken for the solution that you are suggesting would 
> increase linearly with number of conditions. So, when number of conditions 
> are too many, nested loop join would be faster than the solution that you 
> suggest. Now the question is, how should Spark decide when to do what? 
> 
> 
> Hemant Bhanawat 
> www.snappydata.io  
> 
> On Thu, Mar 31, 2016 at 2:28 PM, ashokkumar rajendran 
> > 
> wrote:
> Hi,
> 
> I have filed ticket SPARK-13900. There was an initial reply from a developer 
> but did not get any reply on this. How can we do multiple hash joins together 
> for OR conditions based joins? Could someone please guide on how can we fix 
> this? 
> 
> Regards
> Ashok



[Spark-1.5.2]Spark Memory Issue while Saving to HDFS and Pheonix both

2016-04-01 Thread Divya Gehlot
Forgot to mention
I am using all DataFrame API instead of sqls to the operations

-- Forwarded message --
From: Divya Gehlot 
Date: 1 April 2016 at 18:35
Subject: [Spark-1.5.2]Spark Memory Issue while Saving to HDFS and Pheonix
both
To: "user @spark" 


Hi,
I have a 3-node Hadoop Hortonworks cluster on EC2 with
Hadoop version 2.7.x
Spark version 1.5.2
Phoenix version 4.4
HBase version 1.1.x

Cluster Statistics

Data Node 1
OS: redhat7 (x86_64)
Cores (CPU): 2 (2)
Disk: 20.69GB/99.99GB (20.69% used)
Memory: 7.39GB

Data Node 2
Cores (CPU): 2 (2)
Disk: 20.73GB/99.99GB (20.73% used)
Memory: 7.39GB
Load Avg: 0.00
Heartbeat: a moment ago
Current Version: 2.3.4.0-3485

NameNode
Rack: /default-rack
OS: redhat7 (x86_64)
Cores (CPU): 4 (4)
Disk: 32.4GB/99.99GB (32.4% used)
Memory: 15.26GB
Load Avg: 0.78
Heartbeat: a moment ago
Current Version: 2.3.4.0-3485

Spark Queue Statistics

> Queue State: RUNNING
> Used Capacity: 0.0%
> Configured Capacity: 100.0%
> Configured Max Capacity: 100.0%
> Absolute Used Capacity: 0.0%
> Absolute Configured Capacity: 100.0%
> Absolute Configured Max Capacity: 100.0%
> Used Resources: 
> Num Schedulable Applications: 0
> Num Non-Schedulable Applications: 0
> Num Containers: 0
> Max Applications: 1
> Max Applications Per User: 1
> Max Application Master Resources: 
> Used Application Master Resources: 
> Max Application Master Resources Per User: 
> Configured Minimum User Limit Percent: 100%
> Configured User Limit Factor: 1.0
> Accessible Node Labels: *
> Ordering Policy: FifoOrderingPolicy
> Preemption: disabled



I have a Spark Scala script which does many operations: reading from a
DB (Phoenix), inner and left-outer joins, unionAll, and finally a groupBy,
then saving the result set to Phoenix/HDFS.
I have created more than 20 DataFrames for the operations mentioned above.

stackTrace :

> 16/04/01 10:11:49 WARN TaskSetManager: Lost task 3.0 in stage 132.4 (TID
> 18401, ip-xxx-xx-xx-xxx.ap-southeast-1.compute.internal):
> java.lang.OutOfMemoryError: PermGen space
> at sun.misc.Unsafe.defineClass(Native Method)
> at sun.reflect.ClassDefiner.defineClass(ClassDefiner.java:63)
> at
> sun.reflect.MethodAccessorGenerator$1.run(MethodAccessorGenerator.java:399)
> at
> sun.reflect.MethodAccessorGenerator$1.run(MethodAccessorGenerator.java:396)
> at java.security.AccessController.doPrivileged(Native Method)
> at
> sun.reflect.MethodAccessorGenerator.generate(MethodAccessorGenerator.java:395)
> at
> sun.reflect.MethodAccessorGenerator.generateSerializationConstructor(MethodAccessorGenerator.java:113)
> at
> sun.reflect.ReflectionFactory.newConstructorForSerialization(ReflectionFactory.java:331)
> at
> java.io.ObjectStreamClass.getSerializableConstructor(ObjectStreamClass.java:1376)
> at java.io.ObjectStreamClass.access$1500(ObjectStreamClass.java:72)
> at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:493)
> at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:468)
> at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365)
> at
> java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:602)
> at
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
> at
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)




For Phoenix, I am getting an error similar to the one below in my stack trace:

> $provider.DefaultSource does not allow user-specified schemas

The whole job is taking almost 3-4 minutes, and the save itself is taking
3-4 minutes, whether it is to Phoenix or HDFS.

Could somebody help me resolve the above-mentioned issue?

Would really appreciate the help.


Thanks,

Divya


[Spark-1.5.2]Spark Memory Issue while Saving to HDFS and Pheonix both

2016-04-01 Thread Divya Gehlot
Hi,
I have a 3-node Hadoop Hortonworks cluster on EC2 with
Hadoop version 2.7.x
Spark version 1.5.2
Phoenix version 4.4
HBase version 1.1.x

Cluster Statistics

Data Node 1
OS: redhat7 (x86_64)
Cores (CPU): 2 (2)
Disk: 20.69GB/99.99GB (20.69% used)
Memory: 7.39GB

Data Node 2
Cores (CPU): 2 (2)
Disk: 20.73GB/99.99GB (20.73% used)
Memory: 7.39GB
Load Avg: 0.00
Heartbeat: a moment ago
Current Version: 2.3.4.0-3485

NameNode
Rack: /default-rack
OS: redhat7 (x86_64)
Cores (CPU): 4 (4)
Disk: 32.4GB/99.99GB (32.4% used)
Memory: 15.26GB
Load Avg: 0.78
Heartbeat: a moment ago
Current Version: 2.3.4.0-3485

Spark Queue Statistics

> Queue State: RUNNING
> Used Capacity: 0.0%
> Configured Capacity: 100.0%
> Configured Max Capacity: 100.0%
> Absolute Used Capacity: 0.0%
> Absolute Configured Capacity: 100.0%
> Absolute Configured Max Capacity: 100.0%
> Used Resources: 
> Num Schedulable Applications: 0
> Num Non-Schedulable Applications: 0
> Num Containers: 0
> Max Applications: 1
> Max Applications Per User: 1
> Max Application Master Resources: 
> Used Application Master Resources: 
> Max Application Master Resources Per User: 
> Configured Minimum User Limit Percent: 100%
> Configured User Limit Factor: 1.0
> Accessible Node Labels: *
> Ordering Policy: FifoOrderingPolicy
> Preemption: disabled



I have a Spark Scala script which does many operations: reading from a
DB (Phoenix), inner and left-outer joins, unionAll, and finally a groupBy,
then saving the result set to Phoenix/HDFS.
I have created more than 20 DataFrames for the operations mentioned above.

stackTrace :

> 16/04/01 10:11:49 WARN TaskSetManager: Lost task 3.0 in stage 132.4 (TID
> 18401, ip-xxx-xx-xx-xxx.ap-southeast-1.compute.internal):
> java.lang.OutOfMemoryError: PermGen space
> at sun.misc.Unsafe.defineClass(Native Method)
> at sun.reflect.ClassDefiner.defineClass(ClassDefiner.java:63)
> at
> sun.reflect.MethodAccessorGenerator$1.run(MethodAccessorGenerator.java:399)
> at
> sun.reflect.MethodAccessorGenerator$1.run(MethodAccessorGenerator.java:396)
> at java.security.AccessController.doPrivileged(Native Method)
> at
> sun.reflect.MethodAccessorGenerator.generate(MethodAccessorGenerator.java:395)
> at
> sun.reflect.MethodAccessorGenerator.generateSerializationConstructor(MethodAccessorGenerator.java:113)
> at
> sun.reflect.ReflectionFactory.newConstructorForSerialization(ReflectionFactory.java:331)
> at
> java.io.ObjectStreamClass.getSerializableConstructor(ObjectStreamClass.java:1376)
> at java.io.ObjectStreamClass.access$1500(ObjectStreamClass.java:72)
> at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:493)
> at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:468)
> at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365)
> at
> java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:602)
> at
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
> at
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)




For Phoenix, I am getting an error similar to the one below in my stack trace:

> $provider.DefaultSource does not allow user-specified schemas

The whole job is taking almost 3-4 minutes, and the save itself is taking
3-4 minutes, whether it is to Phoenix or HDFS.

Could somebody help me resolve the above-mentioned issue?

Would really appreciate the help.


Thanks,

Divya


Join FetchFailedException

2016-04-01 Thread nihed mbarek
Hi,
I have a big dataframe (100 GB) that I need to join with 3 other dataframes.

For the first join, it's ok
For the second, it's ok
But for the third, just after the big shuffle, before the execution of the
stage, I have an exception

org.apache.spark.shuffle.FetchFailedException:
java.io.FileNotFoundException:
/DEVD/data/fs1/hadoop/yarn/log/usercache//appcache/application_1459412930417_0309/blockmgr-ba9d4936-1de0-4205-9913-a579b520ab1f/1a/shuffle_5_65_0.index
(No such file or directory)


Any idea ??


Thank you



-- 

M'BAREK Med Nihed,
Fedora Ambassador, TUNISIA, Northern Africa
http://www.nihed.com




Re: SPARK-13900 - Join with simple OR conditions take too long

2016-04-01 Thread Hemant Bhanawat
As Mich has already noticed, Spark defaults to an NL join if there is more
than one condition. Oracle is probably doing cost-based optimization in
this scenario. You can call it a bug, but in my opinion it is an area where
Spark is still evolving.

>> Hemant has mentioned the nested loop time will be very little.
I had mentioned that NL time will *vary* little with a greater number of
conditions. What I meant was that instead of 3 conditions, if you had
15 conditions, the NL join would still take 13-15 mins while the hash
join would take more than that.

Hemant
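
The rewrite discussed in this thread, a union of per-condition equality joins
with de-duplication, can be sketched in plain Python (a hash join per
condition; the rows and key names here are hypothetical):

```python
def equi_join(left, right, key):
    # Hash join on a single equality condition: index the small side,
    # then probe it once per left row.
    index = {}
    for r in right:
        index.setdefault(r[key], []).append(r)
    return [(l, r) for l in left for r in index.get(l[key], [])]

def or_join(left, right, keys):
    # Union of the per-key hash joins, de-duplicated, instead of the
    # nested-loop join Spark falls back to for OR conditions.
    seen, out = set(), []
    for key in keys:
        for l, r in equi_join(left, right, key):
            pair = (id(l), id(r))
            if pair not in seen:
                seen.add(pair)
                out.append((l, r))
    return out

A = [{"d1": 1, "d2": "x"}, {"d1": 2, "d2": "y"}]
B = [{"d1": 1, "d2": "y"}]
print(or_join(A, B, ["d1", "d2"]))
```

As noted above, this approach scales linearly with the number of conditions,
so it only wins over the nested loop when the condition count is small.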

Hemant Bhanawat 
www.snappydata.io

On Fri, Apr 1, 2016 at 3:08 PM, ashokkumar rajendran <
ashokkumar.rajend...@gmail.com> wrote:

> Hi Mich,
>
> Thanks for the input.
>
> Yes, it seems to be a bug. Is it possible to fix this in next release?
>
> Regards
> Ashok
>
> On Fri, Apr 1, 2016 at 2:06 PM, Mich Talebzadeh  > wrote:
>
>> hm.
>>
>> Sounds like it ends up in Nested Loop Join (NLJ) as opposed to Hash Join
>> (HJ) when OR  is used for more than one predicate comparison.
>>
>> In below I have a table dummy created as ORC with 1 billion rows. Just
>> created another one called dummy1 with 60K rows
>>
>> A simple join results in Hash Join good!
>>
>> scala> sql("select d.id, d1.id from dummy d, dummy2 d1 where
>> d.random_string = d1.random_string").explain(true)
>>
>> == Physical Plan ==
>> Project [id#212,id#219]
>>
>> +- BroadcastHashJoin [random_string#216], [random_string#223], BuildRight
>>    :- ConvertToUnsafe
>>:  +- HiveTableScan [id#212,random_string#216], MetastoreRelation
>> test, dummy, Some(d)
>>+- ConvertToUnsafe
>>   +- HiveTableScan [id#219,random_string#223], MetastoreRelation
>> test, dummy2, Some(d1)
>>
>> When the join is done using OR on other predicates I see it starts doing
>> NLJ
>>
>> scala> sql("select d.id, d1.id from dummy d, dummy2 d1 where
>> d.random_string = d1.random_string OR d.small_vc =
>> d1.small_vc").explain(true)
>>
>> == Physical Plan ==
>> Project [id#241,id#248]
>> +- BroadcastNestedLoopJoin BuildRight, Inner, Some(((random_string#245
>> = random_string#252) || (small_vc#246 = small_vc#253)))
>>:- HiveTableScan [small_vc#246,id#241,random_string#245],
>> MetastoreRelation test, dummy, Some(d)
>>+- HiveTableScan [id#248,random_string#252,small_vc#253],
>> MetastoreRelation test, dummy2, Some(d1)
>>
>> in contrast the same identical tables in Oracle use Hash Join with OR
>> which is expected
>>
>> scratch...@mydb.mich.LOCAL> select d.id, d1.id from dummy d, dummy2 d1
>> where d.random_string = d1.random_string OR d.small_vc = d1.small_vc;
>>
>> Execution Plan
>> --
>> Plan hash value: 4163534687
>>
>> --
>> | Id  | Operation   | Name   | Rows  | Bytes |TempSpc| Cost
>> (%CPU)| Time |
>>
>> --
>> |   0 | SELECT STATEMENT|| 63207 |  8332K|   |  1280K
>> (1)| 04:16:05 |
>> |   1 |  CONCATENATION  ||   |   |   |
>> |  |
>> |*  2 |   HASH JOIN || 60183 |  7934K|  4632K|   640K
>> (1)| 02:08:03 |
>> |   3 |TABLE ACCESS FULL| DUMMY2 | 6 |  3925K|   |   157
>> (1)| 00:00:02 |
>> |   4 |TABLE ACCESS FULL| DUMMY  |   100M|  6484M|   |   261K
>> (1)| 00:52:13 |
>> |*  5 |   HASH JOIN ||  3024 |   398K|  4632K|   640K
>> (1)| 02:08:03 |
>> |   6 |TABLE ACCESS FULL| DUMMY2 | 6 |  3925K|   |   157
>> (1)| 00:00:02 |
>> |   7 |TABLE ACCESS FULL| DUMMY  |   100M|  6484M|   |   261K
>> (1)| 00:52:13 |
>>
>> --
>>
>> So this looks like a bug!
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 1 April 2016 at 04:53, ashokkumar rajendran <
>> ashokkumar.rajend...@gmail.com> wrote:
>>
>>> Thanks for the reply everyone.
>>>
>>> Let me provide more detail on the dataset as well.
>>> 1. The big table (A) contains more than 3 billion records in parquet
>>> format, which is few TBs.
>>> 2. The second table (B) is only of 60K rows which is less than 10MB.
>>> 3. The column on which I perform JOIN is mostly on the String datatype
>>> columns.
>>> 4. I used 20 machines that were of 16 cores each and 120GB RAMs for
>>> testing this.
>>>
>>> The pseudo OR query is as below.
>>>
>>> sql(Select field1, field2 from A, B where A.dimension1 = B.dimension1 OR 
>>> A.dimension2 = B.dimension2 OR A.dimension3 = B.dimension3 OR A.dimension4 
>>> = 

Re: SPARK-13900 - Join with simple OR conditions take too long

2016-04-01 Thread ashokkumar rajendran
Hi Mich,

Thanks for the input.

Yes, it seems to be a bug. Is it possible to fix this in next release?

Regards
Ashok

On Fri, Apr 1, 2016 at 2:06 PM, Mich Talebzadeh 
wrote:

> hm.
>
> Sounds like it ends up in a Nested Loop Join (NLJ) as opposed to a Hash
> Join (HJ) when OR is used for more than one predicate comparison.
>
> Below I have a table, dummy, created as ORC with 1 billion rows; I just
> created another one, dummy2, with 60K rows.
>
> A simple join results in a Hash Join. Good!
>
> scala> sql("select d.id, d1.id from dummy d, dummy2 d1 where
> d.random_string = d1.random_string").explain(true)
>
> == Physical Plan ==
> Project [id#212,id#219]
> +- BroadcastHashJoin [random_string#216], [random_string#223], BuildRight
>    :- ConvertToUnsafe
>    :  +- HiveTableScan [id#212,random_string#216], MetastoreRelation test, dummy, Some(d)
>    +- ConvertToUnsafe
>       +- HiveTableScan [id#219,random_string#223], MetastoreRelation test, dummy2, Some(d1)
>
> When the join is done using OR on other predicates I see it starts doing
> NLJ
>
> scala> sql("select d.id, d1.id from dummy d, dummy2 d1 where
> d.random_string = d1.random_string OR d.small_vc =
> d1.small_vc").explain(true)
>
> == Physical Plan ==
> Project [id#241,id#248]
> +- BroadcastNestedLoopJoin BuildRight, Inner, Some(((random_string#245 = random_string#252) || (small_vc#246 = small_vc#253)))
>    :- HiveTableScan [small_vc#246,id#241,random_string#245], MetastoreRelation test, dummy, Some(d)
>    +- HiveTableScan [id#248,random_string#252,small_vc#253], MetastoreRelation test, dummy2, Some(d1)
>
> in contrast the same identical tables in Oracle use Hash Join with OR
> which is expected
>
> scratch...@mydb.mich.LOCAL> select d.id, d1.id from dummy d, dummy2 d1
> where d.random_string = d1.random_string OR d.small_vc = d1.small_vc;
>
> Execution Plan
> ----------------------------------------------------------
> Plan hash value: 4163534687
>
> | Id  | Operation            | Name   | Rows  | Bytes | TempSpc | Cost (%CPU) | Time     |
> -----------------------------------------------------------------------------------------
> |   0 | SELECT STATEMENT     |        | 63207 | 8332K |         | 1280K   (1) | 04:16:05 |
> |   1 |  CONCATENATION       |        |       |       |         |             |          |
> |*  2 |   HASH JOIN          |        | 60183 | 7934K | 4632K   |  640K   (1) | 02:08:03 |
> |   3 |    TABLE ACCESS FULL | DUMMY2 |     6 | 3925K |         |   157   (1) | 00:00:02 |
> |   4 |    TABLE ACCESS FULL | DUMMY  |  100M | 6484M |         |  261K   (1) | 00:52:13 |
> |*  5 |   HASH JOIN          |        |  3024 |  398K | 4632K   |  640K   (1) | 02:08:03 |
> |   6 |    TABLE ACCESS FULL | DUMMY2 |     6 | 3925K |         |   157   (1) | 00:00:02 |
> |   7 |    TABLE ACCESS FULL | DUMMY  |  100M | 6484M |         |  261K   (1) | 00:52:13 |
> -----------------------------------------------------------------------------------------
>
> So this looks like a bug!
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn:
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 1 April 2016 at 04:53, ashokkumar rajendran <
> ashokkumar.rajend...@gmail.com> wrote:
>
>> Thanks for the reply everyone.
>>
>> Let me provide more detail on the dataset as well.
>> 1. The big table (A) contains more than 3 billion records in parquet
>> format, which is a few TBs.
>> 2. The second table (B) is only of 60K rows which is less than 10MB.
>> 3. The column on which I perform JOIN is mostly on the String datatype
>> columns.
>> 4. I used 20 machines that were of 16 cores each and 120GB RAMs for
>> testing this.
>>
>> The pseudo OR query is as below.
>>
>> sql(Select field1, field2 from A, B where A.dimension1 = B.dimension1 OR 
>> A.dimension2 = B.dimension2 OR A.dimension3 = B.dimension3 OR A.dimension4 = 
>> B.dimension4).explain(true)
>>
>>
>> Pseudo union query is as below.
>>
>> sql{Select field1, field2 from A, B where A.dimension1 = B.dimension1
>> UNION ALL
>> Select field1, field2 from A, B where A.dimension2 = B.dimension2
>> UNION ALL
>> Select field1, field2 from A, B where A.dimension3 = B.dimension3
>> UNION ALL
>> Select field1, field2 from A, B where A.dimension4 = 
>> B.dimension4}.explain(true)
>>
>> You can look at the explain plan in the ticket
>> https://issues.apache.org/jira/browse/SPARK-13900
>>
>> Hemant has mentioned the nested loop time will be very little. But the
>> time taken by the inner query for this kind of join is too long. A union of 3
>> HashJoins takes only 3 minutes (each dimension hash join takes 1 minute),
>> while the nested loop join takes nearly 13 mins. I agree that the performance
>> of HashedJoin on an OR condition will be linear but that will be very
>> optimized comparing to the nested loop join.

Relation between number of partitions and cores.

2016-04-01 Thread vaibhavrtk
The Spark programming guide says "we should have 2-4 partitions for each
CPU in your cluster." In that case, how does 1 CPU core process 2-4
partitions at the same time?

Does it do context switching between tasks, or run them in parallel? If it
does context switching, how is that efficient compared to a 1:1 mapping of
partitions to cores?

PS: If we are using the Kafka direct API, in which Kafka partitions = RDD
partitions, does that mean we should give 40 Kafka partitions for 10 CPU
cores?
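For what it's worth, the usual mental model is that each partition becomes one task, and when there are more tasks than cores the scheduler runs them in successive "waves": a core picks up the next pending task as soon as it finishes one, rather than time-slicing between tasks. A minimal sketch of that arithmetic (the numbers are the ones from the question):

```python
import math

def num_waves(num_partitions: int, num_cores: int) -> int:
    """One task per partition; at most num_cores tasks run concurrently."""
    return math.ceil(num_partitions / num_cores)

# The guide's 2-4 partitions per core means 2-4 waves of tasks, which helps
# smooth over skew: a core that finishes a small task picks up another one.
print(num_waves(40, 10))  # 4 waves for 40 Kafka partitions on 10 cores
print(num_waves(10, 10))  # 1 wave, but one slow task leaves the other 9 cores idle
```

This is why 2-4x oversubscription can beat a 1:1 mapping: with only one wave, the whole stage waits for the slowest partition.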




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Relation-between-number-of-partitions-and-cores-tp26658.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spatial Spark Library on 1.6

2016-04-01 Thread Jorge Machado
Hi guys,

Does someone know a good library for geospatial operations?

Magellan and Spatial Spark are broken or do not work properly on 1.6.

Regards

Jorge Machado
www.jmachado.me







Re: SPARK-13900 - Join with simple OR conditions take too long

2016-04-01 Thread Mich Talebzadeh
hm.

Sounds like it ends up in a Nested Loop Join (NLJ) as opposed to a Hash
Join (HJ) when OR is used for more than one predicate comparison.

Below I have a table, dummy, created as ORC with 1 billion rows; I just
created another one, dummy2, with 60K rows.

A simple join results in a Hash Join. Good!

scala> sql("select d.id, d1.id from dummy d, dummy2 d1 where
d.random_string = d1.random_string").explain(true)

== Physical Plan ==
Project [id#212,id#219]
+- BroadcastHashJoin [random_string#216], [random_string#223], BuildRight
   :- ConvertToUnsafe
   :  +- HiveTableScan [id#212,random_string#216], MetastoreRelation test, dummy, Some(d)
   +- ConvertToUnsafe
      +- HiveTableScan [id#219,random_string#223], MetastoreRelation test, dummy2, Some(d1)

When the join is done using OR on other predicates, I see it starts doing an NLJ:

scala> sql("select d.id, d1.id from dummy d, dummy2 d1 where
d.random_string = d1.random_string OR d.small_vc =
d1.small_vc").explain(true)

== Physical Plan ==
Project [id#241,id#248]
+- BroadcastNestedLoopJoin BuildRight, Inner, Some(((random_string#245 = random_string#252) || (small_vc#246 = small_vc#253)))
   :- HiveTableScan [small_vc#246,id#241,random_string#245], MetastoreRelation test, dummy, Some(d)
   +- HiveTableScan [id#248,random_string#252,small_vc#253], MetastoreRelation test, dummy2, Some(d1)

in contrast the same identical tables in Oracle use Hash Join with OR which
is expected

scratch...@mydb.mich.LOCAL> select d.id, d1.id from dummy d, dummy2 d1
where d.random_string = d1.random_string OR d.small_vc = d1.small_vc;

Execution Plan
----------------------------------------------------------
Plan hash value: 4163534687

| Id  | Operation            | Name   | Rows  | Bytes | TempSpc | Cost (%CPU) | Time     |
-----------------------------------------------------------------------------------------
|   0 | SELECT STATEMENT     |        | 63207 | 8332K |         | 1280K   (1) | 04:16:05 |
|   1 |  CONCATENATION       |        |       |       |         |             |          |
|*  2 |   HASH JOIN          |        | 60183 | 7934K | 4632K   |  640K   (1) | 02:08:03 |
|   3 |    TABLE ACCESS FULL | DUMMY2 |     6 | 3925K |         |   157   (1) | 00:00:02 |
|   4 |    TABLE ACCESS FULL | DUMMY  |  100M | 6484M |         |  261K   (1) | 00:52:13 |
|*  5 |   HASH JOIN          |        |  3024 |  398K | 4632K   |  640K   (1) | 02:08:03 |
|   6 |    TABLE ACCESS FULL | DUMMY2 |     6 | 3925K |         |   157   (1) | 00:00:02 |
|   7 |    TABLE ACCESS FULL | DUMMY  |  100M | 6484M |         |  261K   (1) | 00:52:13 |
-----------------------------------------------------------------------------------------

So this looks like a bug!
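A back-of-the-envelope comparison, using the row counts from this thread (3 billion rows in A, 60K rows in B), of why falling back to a nested loop join hurts so much; the arithmetic counts only comparisons and ignores constants, so it is a rough asymptotic sketch, not a cost model:

```python
rows_a = 3_000_000_000   # the big parquet table A from the thread
rows_b = 60_000          # the small table B

# Nested loop join: every row of A is compared against every row of B.
nlj_comparisons = rows_a * rows_b

# Hash join: roughly linear -- build a hash table on B, probe once per row of A.
hash_join_work = rows_a + rows_b

print(f"nested loop does ~{nlj_comparisons // hash_join_work:,}x more work")
```

Even a union of four such hash joins stays roughly 4 * (|A| + |B|) work, which is why the UNION ALL rewrite wins by such a wide margin at this scale.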



Dr Mich Talebzadeh



LinkedIn:
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 1 April 2016 at 04:53, ashokkumar rajendran <
ashokkumar.rajend...@gmail.com> wrote:

> Thanks for the reply everyone.
>
> Let me provide more detail on the dataset as well.
> 1. The big table (A) contains more than 3 billion records in parquet
> format, which is a few TBs.
> 2. The second table (B) is only of 60K rows which is less than 10MB.
> 3. The column on which I perform JOIN is mostly on the String datatype
> columns.
> 4. I used 20 machines that were of 16 cores each and 120GB RAMs for
> testing this.
>
> The pseudo OR query is as below.
>
> sql(Select field1, field2 from A, B where A.dimension1 = B.dimension1 OR 
> A.dimension2 = B.dimension2 OR A.dimension3 = B.dimension3 OR A.dimension4 = 
> B.dimension4).explain(true)
>
>
> Pseudo union query is as below.
>
> sql{Select field1, field2 from A, B where A.dimension1 = B.dimension1
> UNION ALL
> Select field1, field2 from A, B where A.dimension2 = B.dimension2
> UNION ALL
> Select field1, field2 from A, B where A.dimension3 = B.dimension3
> UNION ALL
> Select field1, field2 from A, B where A.dimension4 = 
> B.dimension4}.explain(true)
>
> You can look at the explain plan in the ticket
> https://issues.apache.org/jira/browse/SPARK-13900
>
> Hemant has mentioned the nested loop time will be very little. But the
> time taken by the inner query for this kind of join is too long. A union of 3
> HashJoins takes only 3 minutes (each dimension hash join takes 1 minute),
> while the nested loop join takes nearly 13 mins. I agree that the performance of
> HashedJoin on OR condition will be linear but that will be very optimized
> comparing to the nested loop join.
>
> As Yong pointed out, if we can provide hints in SQL engine, it will be
> awesome for these kind of cases.
>
> Any idea on how we can optimize this will be helpful. Please let me know
> if any other detail is needed to provide input.
>
> Regards
> Ashok
>
> On Thu, Mar 31, 2016 at 8:32 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> a hash join come into play