Spark SQL JDBC Connectivity

2014-05-29 Thread Venkat Subramanian
We are planning to use the latest Spark SQL on RDDs. If a third-party
application wants to connect to Spark via JDBC, does Spark SQL have support?
(We want to avoid going through the Shark/Hive JDBC layer as we need good
performance).

BTW, we also want to do the same for Spark Streaming - will Spark SQL work
on DStreams (since the underlying structure is an RDD anyway), and can we expose
the streaming DStream RDDs through JDBC via Spark SQL for real-time analytics?

Any pointers on this will greatly help.

Regards,

Venkat



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-JDBC-Connectivity-tp6511.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Use mvn run Spark program occur problem

2014-05-29 Thread jaranda
That was it, thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Use-mvn-run-Spark-program-occur-problem-tp1751p6512.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Driver OOM while using reduceByKey

2014-05-29 Thread haitao .yao
Hi,

 I used 1g of memory for the driver Java process and got an OOM error on the
driver side before reduceByKey. After analyzing the heap dump, the biggest
object is org.apache.spark.MapStatus, which occupied over 900MB of memory.

Here's my question:


1. Are there any optimization switches I can tune to avoid this? I have
already enabled output compression with spark.io.compression.codec.

2. Why do the workers send all the data back to the driver to run reduceByKey?
With the current implementation, if I use reduceByKey on TBs of data, that
will be a disaster for the driver. Maybe my assumption about the Spark
implementation is wrong.


And here's my code snippet:


```

val cntNew = spark.accumulator(0)
val cntOld = spark.accumulator(0)
val cntErr = spark.accumulator(0)

val sequenceFileUrl = args(0)
val seq = spark.sequenceFile[Text, BytesWritable](sequenceFileUrl)

val stat = seq.map(pair => convertData(
  pair._2, cntNew, cntOld, cntErr
)).reduceByKey(_ + _)

stat.saveAsSequenceFile(args(1))

```


Thanks.


-- 

haitao.yao@China


Re: A Standalone App in Scala: Standalone mode issues

2014-05-29 Thread jaranda
I finally got it working. Main points:

- I had to add the hadoop-client dependency to avoid a strange EOFException.
- I had to set SPARK_MASTER_IP in conf/start-master.sh to hostname -f
instead of hostname, since Akka does not seem to work properly with plain host
names / IPs; it requires fully qualified domain names.
- I also set SPARK_MASTER_IP in conf/spark-env.sh to hostname -f so that
the workers can reach the master.
- Be sure that conf/slaves also contains fully qualified domain names.
- It seems that both the master and the workers need to have access to the
driver client, and since I was within a VPN I had a lot of trouble with this.
It took some time, but I finally figured it out.

Making these changes, everything just worked like a charm!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/A-Standalone-App-in-Scala-Standalone-mode-issues-tp6493p6514.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


How can I dispose an Accumulator?

2014-05-29 Thread innowireless TaeYun Kim
Hi,

 

How can I dispose of an Accumulator?

It has no method like the 'unpersist()' that Broadcast provides.

 

Thanks.

 



Re: Python, Spark and HBase

2014-05-29 Thread Nick Pentreath
Hi Tommer,

I'm working on updating and improving the PR, and will work on getting an
HBase example working with it. Will feed back as soon as I have had the
chance to work on this a bit more.

N


On Thu, May 29, 2014 at 3:27 AM, twizansk twiza...@gmail.com wrote:

 The code which causes the error is:

 sc = SparkContext("local", "My App")
 rdd = sc.newAPIHadoopFile(
     name,
     'org.apache.hadoop.hbase.mapreduce.TableInputFormat',
     'org.apache.hadoop.hbase.io.ImmutableBytesWritable',
     'org.apache.hadoop.hbase.client.Result',
     conf={"hbase.zookeeper.quorum": "my-host",
           "hbase.rootdir": "hdfs://my-host:8020/hbase",
           "hbase.mapreduce.inputtable": "data"})

 The full stack trace is:



 Py4JError Traceback (most recent call last)
 <ipython-input-8-3b9a4ea2f659> in <module>()
       7 conf={"hbase.zookeeper.quorum": "my-host",
       8       "hbase.rootdir": "hdfs://my-host:8020/hbase",
       9       "hbase.mapreduce.inputtable": "data"})
  10
  11

 /opt/cloudera/parcels/CDH/lib/spark/python/pyspark/context.pyc in
 newAPIHadoopFile(self, name, inputformat_class, key_class, value_class,
 key_wrapper, value_wrapper, conf)
 281 for k, v in conf.iteritems():
 282 jconf[k] = v
 --> 283 jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, name,
             inputformat_class, key_class, value_class,
     284     key_wrapper, value_wrapper, jconf)
 285 return RDD(jrdd, self, PickleSerializer())


 /opt/cloudera/parcels/CDH/lib/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py
 in __getattr__(self, name)
 657 else:
 658 raise Py4JError('{0} does not exist in the JVM'.
 --> 659 format(self._fqn + name))
 660
 661 def __call__(self, *args):

 Py4JError: org.apache.spark.api.python.PythonRDDnewAPIHadoopFile does not
 exist in the JVM



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Python-Spark-and-HBase-tp6142p6507.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Selecting first ten values in a RDD/partition

2014-05-29 Thread nilmish
I have a DStream which produces an RDD every 2 seconds. I have sorted
each RDD and want to retain only the top 10 values and discard the rest.
How can I retain only the top 10 values?

I am trying to get the top 10 hashtags. Instead of sorting the entire
5-minute counts (thereby incurring the cost of a data shuffle), I am trying
to get the top 10 hashtags in each partition. I am stuck at how to retain the
top 10 hashtags in each partition.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Selecting-first-ten-values-in-a-RDD-partition-tp6517.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Is uberjar a recommended way of running Spark/Scala applications?

2014-05-29 Thread Andrei
I'm using Spark 1.0 and the sbt assembly plugin to create an uberjar of my
application. However, when I run the assembly command, I get a number of errors
like this:

java.lang.RuntimeException: deduplicate: different file contents found in
the following:
/home/username/.ivy2/cache/com.esotericsoftware.kryo/kryo/bundles/kryo-2.21.jar:com/esotericsoftware/minlog/Log$Logger.class
/home/username/.ivy2/cache/com.esotericsoftware.minlog/minlog/jars/minlog-1.2.jar:com/esotericsoftware/minlog/Log$Logger.class
...

As far as I can see, Spark Core depends on both Minlog and Kryo, and the
latter bundles Minlog classes itself. The classes are binary-different, so
assembly can't combine them. And there are a number of such conflicts - I
fixed some of them manually via mergeStrategy, but the list of exceptions
keeps growing larger and larger. I could continue, but it just doesn't look like
the right way.

My questions are:

1. Is an uberjar a recommended way of running Spark applications?
2. If so, should I include Spark itself in this large jar?
3. If not, what is a recommended way to handle both development and
deployment (assuming an ordinary sbt project)?

Thanks,
Andrei


Re: problem about broadcast variable in iteration

2014-05-29 Thread randylu
Hi Andrew Ash, thanks for your reply.
In fact, I have already used unpersist(), but it doesn't take effect.
One reason I selected the 1.0.0 version is precisely that it provides the
unpersist() interface.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/problem-about-broadcast-variable-in-iteration-tp5479p6519.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Is uberjar a recommended way of running Spark/Scala applications?

2014-05-29 Thread jaranda
Hi Andrei,

I think the preferred way to deploy Spark jobs is by using the sbt package
task instead of the sbt assembly plugin. In any case, as you comment,
the mergeStrategy in combination with some dependency exclusions should fix
your problems. Have a look at this gist
https://gist.github.com/JordiAranda/bdbad58d128c14277a05 for further
details (I just followed some recommendations from the sbt assembly
plugin documentation).
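
For concreteness, here is a hedged sketch of the kind of mergeStrategy override meant
above, written against the sbt-assembly 0.x syntax of that era (the exact keys and
imports depend on your plugin version, and the Minlog path is taken from the conflict
in the original post, so treat it as an illustration rather than a drop-in fix):

```
// build.sbt (assumes assemblySettings from sbt-assembly 0.x are already applied)
import sbtassembly.Plugin._
import AssemblyKeys._

mergeStrategy in assembly <<= (mergeStrategy in assembly) { old =>
  {
    // Keep a single copy of the duplicated Minlog classes instead of failing.
    case PathList("com", "esotericsoftware", "minlog", xs @ _*) => MergeStrategy.first
    // Everything else falls back to the plugin's default strategy.
    case x => old(x)
  }
}
```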

Up to now I haven't found a proper way to combine my development/deployment
phases, although I must say my experience with Spark is pretty limited (it really
depends on your deployment requirements as well). In this case, I think
someone else could give you some further insights.

Best,



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-uberjar-a-recommended-way-of-running-Spark-Scala-applications-tp6518p6520.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


ClassCastExceptions when using Spark shell

2014-05-29 Thread Sebastian Schelter

Hi,

I have trouble running some custom code on Spark 0.9.1 in standalone 
mode on a cluster. I built a fat jar (excluding Spark) that I'm adding 
to the classpath with ADD_JARS=... When I start the Spark shell, I can 
instantiate classes, but when I run Spark code, I get strange 
ClassCastExceptions like this:


14/05/29 14:48:10 INFO TaskSetManager: Loss was due to 
java.lang.ClassCastException: io.ssc.sampling.matrix.DenseBlock cannot 
be cast to io.ssc.sampling.matrix.DenseBlock [duplicate 1]


What am I doing wrong?

Thx,
Sebastian


Re: Selecting first ten values in a RDD/partition

2014-05-29 Thread Anwar Rizal
Can you clarify what you're trying to achieve here?

If you want to take only the top 10 of each RDD, why not sort followed by
take(10) on every RDD?

Or do you want the top 10 over the whole five minutes?

Cheers,



On Thu, May 29, 2014 at 2:04 PM, nilmish nilmish@gmail.com wrote:

 I have a DStream which produces an RDD every 2 seconds. I have sorted
 each RDD and want to retain only the top 10 values and discard the rest.
 How can I retain only the top 10 values?

 I am trying to get the top 10 hashtags. Instead of sorting the entire
 5-minute counts (thereby incurring the cost of a data shuffle), I am trying
 to get the top 10 hashtags in each partition. I am stuck at how to retain the
 top 10 hashtags in each partition.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Selecting-first-ten-values-in-a-RDD-partition-tp6517.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Comprehensive Port Configuration reference?

2014-05-29 Thread Jacob Eisinger

Howdy Andrew,

This is a standalone cluster.  And, yes, if my understanding of Spark
terminology is correct, you are correct about the port ownerships.

Jacob

Jacob D. Eisinger
IBM Emerging Technologies
jeis...@us.ibm.com - (512) 286-6075



From:   Andrew Ash and...@andrewash.com
To: user@spark.apache.org
Date:   05/28/2014 05:18 PM
Subject:Re: Comprehensive Port Configuration reference?



Hmm, those do look like 4 listening ports to me.  PID 3404 is an executor
and PID 4762 is a worker?  This is a standalone cluster?


On Wed, May 28, 2014 at 8:22 AM, Jacob Eisinger jeis...@us.ibm.com wrote:
  Howdy Andrew,

  Here is what I ran before an application context was created (other
  services have been deleted):


# netstat -l -t tcp -p  --numeric-ports

Active Internet connections (only servers)

Proto Recv-Q Send-Q Local Address           Foreign Address
State       PID/Program name
tcp6       0      0 10.90.17.100:       :::*
LISTEN      4762/java
tcp6       0      0 :::8081                 :::*
LISTEN      4762/java

  And, then while the application context is up:
# netstat -l -t tcp -p  --numeric-ports

Active Internet connections (only servers)

Proto Recv-Q Send-Q Local Address           Foreign Address
State       PID/Program name
tcp6       0      0 10.90.17.100:       :::*
LISTEN      4762/java
tcp6       0      0 :::57286                :::*
LISTEN      3404/java
tcp6       0      0 10.90.17.100:38118      :::*
LISTEN      3404/java
tcp6       0      0 10.90.17.100:35530      :::*
LISTEN      3404/java
tcp6       0      0 :::60235                :::*
LISTEN      3404/java
tcp6       0      0 :::8081                 :::*
LISTEN      4762/java

  My understanding is that this says four ports are open.  Is 57286 and
  60235 not being used?


  Jacob

  Jacob D. Eisinger
  IBM Emerging Technologies
  jeis...@us.ibm.com - (512) 286-6075



  From: Andrew Ash and...@andrewash.com
  To: user@spark.apache.org
  Date: 05/25/2014 06:25 PM

  Subject: Re: Comprehensive Port Configuration reference?



  Hi Jacob,

  The config option spark.history.ui.port is new for 1.0. The problem that
  the History Server solves is that in non-Standalone cluster deployment modes
  (Mesos and YARN) there is no long-lived Spark Master that can store logs
  and statistics about an application after it finishes. The History Server is
  the UI that renders logged data from applications after they complete.

  Read more here: https://issues.apache.org/jira/browse/SPARK-1276 and
  https://github.com/apache/spark/pull/204

  As far as the two vs four dynamic ports, are those all listening ports?
  I did observe 4 ports in use, but only two of them were listening.  The
  other two were the random ports used for responses on outbound
  connections, the source port of the (srcIP, srcPort, dstIP, dstPort)
  tuple that uniquely identifies a TCP socket.

  
http://unix.stackexchange.com/questions/75011/how-does-the-server-find-out-what-client-port-to-send-to


  Thanks for taking a look through!

  I also realized that I had a couple of mistakes in the 0.9 to 1.0
  transition, so I have now documented those as well in the updated
  PR.

  Cheers!
  Andrew



  On Fri, May 23, 2014 at 2:43 PM, Jacob Eisinger jeis...@us.ibm.com
  wrote:
Howdy Andrew,

I noticed you have a configuration item that we were not aware of:
spark.history.ui.port .  Is that new for 1.0?

Also, we noticed that the Workers and the Drivers were opening up
four dynamic ports per application context.  It looks like you were
seeing two.

Everything else looks like it aligns!
Jacob




Jacob D. Eisinger
IBM Emerging Technologies
jeis...@us.ibm.com - (512) 286-6075


From: Andrew Ash and...@andrewash.com
To: user@spark.apache.org
Date: 05/23/2014 10:30 AM
Subject: Re: Comprehensive Port Configuration reference?





Hi everyone,

I've also been interested in better understanding what ports are
used where and the direction the network connections go.  I've
observed a running cluster and read through code, and came up with
the below documentation addition.


Re: Spark SQL JDBC Connectivity

2014-05-29 Thread Michael Armbrust
On Wed, May 28, 2014 at 11:39 PM, Venkat Subramanian vsubr...@gmail.comwrote:

 We are planning to use the latest Spark SQL on RDDs. If a third-party
 application wants to connect to Spark via JDBC, does Spark SQL have
 support?
 (We want to avoid going through the Shark/Hive JDBC layer as we need good
 performance).


 We don't have a full release yet, but there is a branch on the Shark
github repository that has a version of SharkServer2 that uses Spark SQL.
 We also plan to port the Shark CLI, but this is not yet finished.  You can
find this branch along with documentation here:
https://github.com/amplab/shark/tree/sparkSql

Note that this version has not yet received much testing (outside of the
integration tests that are run on Spark SQL).  That said, I would love for
people to test it out and report any problems or missing features.  Any
help here would be greatly appreciated!


 BTW, we also want to do the same for Spark Streaming - will Spark SQL work
 on DStreams (since the underlying structure is an RDD anyway), and can we
 expose
 the streaming DStream RDDs through JDBC via Spark SQL for real-time
 analytics?


 We have talked about doing this, but this is not currently on the near
term road map.


Re: ClassCastExceptions when using Spark shell

2014-05-29 Thread Marcelo Vanzin
Hi Sebastian,

That exception generally means you have the class loaded by two
different class loaders, and some code is trying to mix instances
created by the two different loaded classes.

Do you happen to have that class both in the spark jars and in your
app's uber-jar? That might explain the problem, although I'm not
terribly familiar with Spark's class loader hierarchy.


On Thu, May 29, 2014 at 5:51 AM, Sebastian Schelter s...@apache.org wrote:
 Hi,

 I have trouble running some custom code on Spark 0.9.1 in standalone mode on
 a cluster. I built a fat jar (excluding Spark) that I'm adding to the
 classpath with ADD_JARS=... When I start the Spark shell, I can instantiate
 classes, but when I run Spark code, I get strange ClassCastExceptions like
 this:

 14/05/29 14:48:10 INFO TaskSetManager: Loss was due to
 java.lang.ClassCastException: io.ssc.sampling.matrix.DenseBlock cannot be
 cast to io.ssc.sampling.matrix.DenseBlock [duplicate 1]

 What am I doing wrong?

 Thx,
 Sebastian



-- 
Marcelo


Spark hook to create external process

2014-05-29 Thread ansriniv
I have a requirement where, for every Spark executor threadpool thread, I need
to launch an associated external process.

My job will consist of some processing in the Spark executor thread and some
processing by its associated external process, with the two communicating via
some IPC mechanism.

Is there a hook in Spark where I can put my code to create/destroy
these external processes, corresponding to the creation/destruction of
executor thread pool threads?

Thanks
Anand



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-hook-to-create-external-process-tp6526.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark hook to create external process

2014-05-29 Thread Matei Zaharia
Hi Anand,

This is probably already handled by the RDD.pipe() operation. It will spawn a 
process and let you feed data to it through its stdin and read data through 
stdout.
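
For illustration, a minimal hedged sketch (the command and data are made up): each
element of a partition is written to the external process's stdin, one per line, and
every line the process prints to stdout comes back as a String element of the new RDD.

```
// "tr" here is just a stand-in for any executable available on the worker nodes.
val upper = sc.parallelize(Seq("a", "b", "c"), 2).pipe("tr a-z A-Z")
upper.collect()  // Array(A, B, C)
```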

Matei

On May 29, 2014, at 9:39 AM, ansriniv ansri...@gmail.com wrote:

 I have a requirement where, for every Spark executor threadpool thread, I need
 to launch an associated external process.
 
 My job will consist of some processing in the Spark executor thread and some
 processing by its associated external process, with the two communicating via
 some IPC mechanism.
 
 Is there a hook in Spark where I can put my code to create/destroy
 these external processes, corresponding to the creation/destruction of
 executor thread pool threads?
 
 Thanks
 Anand
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-hook-to-create-external-process-tp6526.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Is uberjar a recommended way of running Spark/Scala applications?

2014-05-29 Thread Andrei
Thanks, Jordi, your gist looks pretty much like what I have in my project
currently (with a few exceptions that I'm going to borrow).

I like the idea of using sbt package, since it doesn't require third-party
plugins and, most importantly, doesn't create a mess of classes and
resources. But in this case I'll have to handle the jar list manually via the
Spark context. Is there a way to automate this process? E.g., when I was a
Clojure guy, I could run lein deps (lein is a build tool similar to sbt) to
download all dependencies and then just enumerate them from my app. Maybe
you have heard of something like that for Spark/sbt?

Thanks,
Andrei


On Thu, May 29, 2014 at 3:48 PM, jaranda jordi.ara...@bsc.es wrote:

 Hi Andrei,

 I think the preferred way to deploy Spark jobs is by using the sbt package
 task instead of the sbt assembly plugin. In any case, as you comment,
 the mergeStrategy in combination with some dependency exclusions should fix
 your problems. Have a look at this gist
 https://gist.github.com/JordiAranda/bdbad58d128c14277a05 for further
 details (I just followed some recommendations from the sbt assembly
 plugin documentation).

 Up to now I haven't found a proper way to combine my development/deployment
 phases, although I must say my experience with Spark is pretty limited (it
 really
 depends on your deployment requirements as well). In this case, I think
 someone else could give you some further insights.

 Best,



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Is-uberjar-a-recommended-way-of-running-Spark-Scala-applications-tp6518p6520.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Is uberjar a recommended way of running Spark/Scala applications?

2014-05-29 Thread Stephen Boesch
The MergeStrategy combined with sbt assembly did work for me. This is not
painless: it takes some trial and error, and the assembly may take multiple minutes.

You will likely want to filter out some additional classes from the
generated jar file. Here is a Stack Overflow answer explaining that, with IMHO
the best answer's snippet included here (in that case the OP understandably
did not want to include javax.servlet.Servlet):

http://stackoverflow.com/questions/7819066/sbt-exclude-class-from-jar


mappings in (Compile, packageBin) ~= { (ms: Seq[(File, String)]) => ms
filter { case (file, toPath) => toPath != "javax/servlet/Servlet.class" } }

There is a setting to not include the project files in the assembly but I
do not recall it at this moment.



2014-05-29 10:13 GMT-07:00 Andrei faithlessfri...@gmail.com:

 Thanks, Jordi, your gist looks pretty much like what I have in my project
 currently (with a few exceptions that I'm going to borrow).

 I like the idea of using sbt package, since it doesn't require third-party
 plugins and, most importantly, doesn't create a mess of classes and
 resources. But in this case I'll have to handle the jar list manually via the
 Spark context. Is there a way to automate this process? E.g., when I was a
 Clojure guy, I could run lein deps (lein is a build tool similar to sbt) to
 download all dependencies and then just enumerate them from my app. Maybe
 you have heard of something like that for Spark/sbt?

 Thanks,
 Andrei


 On Thu, May 29, 2014 at 3:48 PM, jaranda jordi.ara...@bsc.es wrote:

 Hi Andrei,

 I think the preferred way to deploy Spark jobs is by using the sbt package
 task instead of the sbt assembly plugin. In any case, as you
 comment,
 the mergeStrategy in combination with some dependency exclusions should fix
 your problems. Have a look at this gist
 https://gist.github.com/JordiAranda/bdbad58d128c14277a05 for further
 details (I just followed some recommendations from the sbt
 assembly
 plugin documentation).

 Up to now I haven't found a proper way to combine my
 development/deployment
 phases, although I must say my experience with Spark is pretty limited (it
 really
 depends on your deployment requirements as well). In this case, I think
 someone else could give you some further insights.

 Best,



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Is-uberjar-a-recommended-way-of-running-Spark-Scala-applications-tp6518p6520.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: Driver OOM while using reduceByKey

2014-05-29 Thread Matei Zaharia
That hash map is just a list of where each task ran, it’s not the actual data. 
How many map and reduce tasks do you have? Maybe you need to give the driver a 
bit more memory, or use fewer tasks (e.g. do reduceByKey(_ + _, 100) to use 
only 100 tasks).
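
For example, applied to the snippet from the original post (convertData and the
accumulators are the poster's own definitions, so this is only a sketch):

```
// Cap the number of reduce tasks at 100 so the driver tracks far fewer MapStatus entries.
val stat = seq.map(pair => convertData(pair._2, cntNew, cntOld, cntErr))
  .reduceByKey(_ + _, 100)  // the second argument is the number of reduce partitions
stat.saveAsSequenceFile(args(1))
```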

Matei

On May 29, 2014, at 2:03 AM, haitao .yao yao.e...@gmail.com wrote:

 Hi,
 
  I used 1g of memory for the driver Java process and got an OOM error on the driver
 side before reduceByKey. After analyzing the heap dump, the biggest object is
 org.apache.spark.MapStatus, which occupied over 900MB of memory.
 
 Here's my question:
 
 
 1. Are there any optimization switches I can tune to avoid this? I have
 already enabled output compression with spark.io.compression.codec.
 
 2. Why do the workers send all the data back to the driver to run reduceByKey? With
 the current implementation, if I use reduceByKey on TBs of data, that will be
 a disaster for the driver. Maybe my assumption about the Spark
 implementation is wrong.
 
 
 And here's my code snippet:
 
 
 ```
 
 val cntNew = spark.accumulator(0)
 val cntOld = spark.accumulator(0)
 val cntErr = spark.accumulator(0)
 
 val sequenceFileUrl = args(0)
 val seq = spark.sequenceFile[Text, BytesWritable](sequenceFileUrl)
 
 val stat = seq.map(pair => convertData(
   pair._2, cntNew, cntOld, cntErr
 )).reduceByKey(_ + _)
 
 stat.saveAsSequenceFile(args(1))
 
 ```
 
 
 Thanks.
 
 
 -- 
 
 haitao.yao@China



Re: Selecting first ten values in a RDD/partition

2014-05-29 Thread Brian Gawalt
Try looking at the .mapPartitions( ) method implemented for RDD[T] objects.
It will give you direct access to an iterator containing the member objects
of each partition for doing the kind of within-partition hashtag counts
you're describing.
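
For example, a minimal sketch assuming an RDD[(String, Int)] of (hashtag, count)
pairs named counts (a hypothetical name):

```
// Keep only the 10 largest entries of each partition without shuffling the whole
// dataset; the small per-partition results can then be combined on the driver.
val topPerPartition = counts.mapPartitions { iter =>
  iter.toSeq.sortBy(-_._2).take(10).iterator
}
```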



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Selecting-first-ten-values-in-a-RDD-partition-tp6517p6534.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Why Scala?

2014-05-29 Thread Nick Chammas
I recently discovered Hacker News and started reading through older posts
about Scala https://hn.algolia.com/?q=scala#!/story/forever/0/scala. It
looks like the language is fairly controversial on there, and it got me
thinking.

Scala appears to be the preferred language to work with in Spark, and Spark
itself is written in Scala, right?

I know that often times a successful project evolves gradually out of
something small, and that the choice of programming language may not always
have been made consciously at the outset.

But pretending that it was, why is Scala the preferred language of Spark?

Nick




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Why-Scala-tp6536.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Why Scala?

2014-05-29 Thread Matei Zaharia
Quite a few people ask this question and the answer is pretty simple. When we 
started Spark, we had two goals — we wanted to work with the Hadoop ecosystem, 
which is JVM-based, and we wanted a concise programming interface similar to 
Microsoft’s DryadLINQ (the first language-integrated big data framework I know 
of, that begat things like FlumeJava and Crunch). On the JVM, the only language 
that would offer that kind of API was Scala, due to its ability to capture 
functions and ship them across the network. Scala’s static typing also made it 
much easier to control performance compared to, say, Jython or Groovy.

In terms of usage, however, we see substantial usage of our other languages 
(Java and Python), and we’re continuing to invest in both. In a user survey we 
did last fall, about 25% of users used Java and 30% used Python, and I imagine 
these numbers are growing. With lambda expressions now added to Java 8 
(http://databricks.com/blog/2014/04/14/Spark-with-Java-8.html), I think we’ll 
see a lot more Java. And at Databricks I’ve seen a lot of interest in Python, 
which is very exciting to us in terms of ease of use.

Matei

On May 29, 2014, at 1:57 PM, Benjamin Black b...@b3k.us wrote:

 HN is a cesspool safely ignored.
 
 
 On Thu, May 29, 2014 at 1:55 PM, Nick Chammas nicholas.cham...@gmail.com 
 wrote:
 I recently discovered Hacker News and started reading through older posts 
 about Scala. It looks like the language is fairly controversial on there, and 
 it got me thinking.
 
 Scala appears to be the preferred language to work with in Spark, and Spark 
 itself is written in Scala, right?
 
 I know that often times a successful project evolves gradually out of 
 something small, and that the choice of programming language may not always 
 have been made consciously at the outset.
 
 But pretending that it was, why is Scala the preferred language of Spark?
 
 Nick
 
 
 View this message in context: Why Scala?
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 



Re: Shuffle file consolidation

2014-05-29 Thread Nathan Kronenfeld
Thanks, I missed that.

One thing that's still unclear to me, even looking at that, is - does this
parameter have to be set when starting up the cluster, on each of the
workers, or can it be set by an individual client job?


On Fri, May 23, 2014 at 10:13 AM, Han JU ju.han.fe...@gmail.com wrote:

 Hi Nathan,

 There's some explanation in the spark configuration section:

 ```
 If set to true, consolidates intermediate files created during a
 shuffle. Creating fewer files can improve filesystem performance for
 shuffles with large numbers of reduce tasks. It is recommended to set this
 to true when using ext4 or xfs filesystems. On ext3, this option might
 degrade performance on machines with many (>8) cores due to filesystem
 limitations.
 ```


 2014-05-23 16:00 GMT+02:00 Nathan Kronenfeld nkronenf...@oculusinfo.com:

 In trying to sort some largish datasets, we came across the
 spark.shuffle.consolidateFiles property, and I found in the source code
 that it is set, by default, to false, with a note to default it to true
 when the feature is stable.

 Does anyone know what is unstable about this? If we set it true, what
 problems should we anticipate?

 Thanks,
 -Nathan Kronenfeld


 --
 Nathan Kronenfeld
 Senior Visualization Developer
 Oculus Info Inc
 2 Berkeley Street, Suite 600,
 Toronto, Ontario M5A 4J5
 Phone:  +1-416-203-3003 x 238
 Email:  nkronenf...@oculusinfo.com




 --
 *JU Han*

 Data Engineer @ Botify.com

 +33 061960




-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com


Re: Shuffle file consolidation

2014-05-29 Thread Matei Zaharia
It can be set in an individual application.

Consolidation had some issues on ext3 as mentioned there, though we might 
enable it by default in the future because other optimizations now made it 
perform on par with the non-consolidation version. It also had some bugs in 
0.9.0 so I’d suggest at least 0.9.1.
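
For example, a minimal sketch of setting it per application through SparkConf (the
app name is a placeholder):

```
import org.apache.spark.{SparkConf, SparkContext}

// Enable shuffle file consolidation for this application only.
val conf = new SparkConf()
  .setAppName("consolidated-shuffle-job")
  .set("spark.shuffle.consolidateFiles", "true")
val sc = new SparkContext(conf)
```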

Matei

On May 29, 2014, at 2:21 PM, Nathan Kronenfeld nkronenf...@oculusinfo.com 
wrote:

 Thanks, I missed that.
 
 One thing that's still unclear to me, even looking at that, is - does this 
 parameter have to be set when starting up the cluster, on each of the 
 workers, or can it be set by an individual client job?
 
 
 On Fri, May 23, 2014 at 10:13 AM, Han JU ju.han.fe...@gmail.com wrote:
 Hi Nathan,
 
 There's some explanation in the spark configuration section:
 
 ```
 If set to true, consolidates intermediate files created during a shuffle. 
 Creating fewer files can improve filesystem performance for shuffles with 
 large numbers of reduce tasks. It is recommended to set this to true when 
 using ext4 or xfs filesystems. On ext3, this option might degrade performance 
 on machines with many (>8) cores due to filesystem limitations.
 ```
 
 
 2014-05-23 16:00 GMT+02:00 Nathan Kronenfeld nkronenf...@oculusinfo.com:
 
 In trying to sort some largish datasets, we came across the 
 spark.shuffle.consolidateFiles property, and I found in the source code that 
 it is set, by default, to false, with a note to default it to true when the 
 feature is stable.
 
 Does anyone know what is unstable about this? If we set it true, what 
 problems should we anticipate?
 
 Thanks,
 -Nathan Kronenfeld
 
 
 -- 
 Nathan Kronenfeld
 Senior Visualization Developer
 Oculus Info Inc
 2 Berkeley Street, Suite 600,
 Toronto, Ontario M5A 4J5
 Phone:  +1-416-203-3003 x 238
 Email:  nkronenf...@oculusinfo.com
 
 
 
 -- 
 JU Han
 
 Data Engineer @ Botify.com
 
 +33 061960
 
 
 
 -- 
 Nathan Kronenfeld
 Senior Visualization Developer
 Oculus Info Inc
 2 Berkeley Street, Suite 600,
 Toronto, Ontario M5A 4J5
 Phone:  +1-416-203-3003 x 238
 Email:  nkronenf...@oculusinfo.com



Re: Why Scala?

2014-05-29 Thread Dmitriy Lyubimov
There were a few known concerns about Scala, and some still remain, but having
been doing Scala professionally for over two years now, I have learned to master
and appreciate the advantages.

The major concern IMO is Scala in a less-than-scrupulous corporate environment.

First, Scala requires significantly more discipline in commenting and style
than Java to stay painlessly readable. People with less-than-stellar
code hygiene can easily turn a project into an unmaintainable mess.

Second, from a corporate management perspective, it is (still?) much harder
to staff with Scala coders as opposed to Java ones.

All these things are a headache for corporate bosses, but for public and
academic projects with thorough peer review and an increased desire for
contributors to look clean in public, it works out quite well, and the strong
sides really shine.

Spark specifically builds around FP patterns -- such as monads and functors
-- which were absent in Java prior to 8 (I am not sure that they are as
well worked out in Java 8 collections even now, as opposed to Scala
collections). So Java 8 simply comes a little late to the show in that
department.

Also, FP is not the only thing used by Spark. Spark also uses stuff
like implicits and the akka/agent framework for IPC. Let's not forget that FP,
albeit important, is only one out of many stories in Scala in the grand
scale of things.


On Thu, May 29, 2014 at 1:55 PM, Nick Chammas nicholas.cham...@gmail.comwrote:

 I recently discovered Hacker News and started reading through older posts
 about Scala https://hn.algolia.com/?q=scala#!/story/forever/0/scala. It
 looks like the language is fairly controversial on there, and it got me
 thinking.

 Scala appears to be the preferred language to work with in Spark, and
 Spark itself is written in Scala, right?

 I know that often times a successful project evolves gradually out of
 something small, and that the choice of programming language may not always
 have been made consciously at the outset.

 But pretending that it was, why is Scala the preferred language of Spark?

 Nick


 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Why-Scala-tp6536.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Spark SQL JDBC Connectivity and more

2014-05-29 Thread Venkat Subramanian
Thanks, Michael.
OK, will try SharkServer2.

But I have some basic questions on a related area:

1) If I have a standalone Spark application that has already built an RDD,
how can SharkServer2, or for that matter Shark, access 'that' RDD and run
queries on it? In all the examples I have seen for Shark, the RDDs (tables) are
created within Shark's Spark context and processed there.

I have stylized the real problem we have, which is: we have a standalone
Spark application that is processing DStreams and producing output DStreams.
I want to expose that near-real-time DStream data to a third-party app via
JDBC and allow the SharkServer2 CLI to operate and query on the DStreams in
real time, all from memory. Currently we write the output stream to
Cassandra and expose it to the third-party app through it via JDBC, but we want
to avoid that extra disk write, which increases latency.

2) I have two applications: one for processing and computing an output RDD
from an input, and another for post-processing the resultant RDD into
multiple persistent stores plus doing other things with it. These are
intentionally split into separate processes. How do we share the output RDD
from the first application with the second application without writing to disk
(I'm thinking of serializing the RDD and streaming it through Kafka, but then
we lose time and all the fault tolerance that RDDs bring)? Is Tachyon the only
other way? Are there other models/design patterns for applications that share
RDDs, as this may be a very common use case?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-JDBC-Connectivity-tp6511p6543.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


access hdfs file name in map()

2014-05-29 Thread Xu (Simon) Chen
Hello,

A quick question about using Spark to parse text-format CSV files stored on
HDFS.

I have something very simple:
sc.textFile("hdfs://test/path/*").map(line => line.split(",")).map(p =>
("XXX", p(0), p(2)))

Here, I want to replace "XXX" with a string, namely the current CSV
file name for the line. This is needed since some information, like the date,
may be encoded in the file name.

In Hive, I am able to define an external table and use INPUT__FILE__NAME as
a column in queries. I wonder if Spark has something similar.

Thanks!
-Simon


Re: Why Scala?

2014-05-29 Thread Krishna Sankar
Nicholas,
   Good question. A couple of thoughts from my practical experience:

   - Coming from R, Scala feels more natural than other languages. The
   functional succinctness of Scala is better suited for Data Science than
   other languages. In short, Scala-Spark makes sense for Data Science, ML,
   and Data Exploration et al.
   - Having said that, occasionally practicality does trump the choice of a
   language - last time I really wanted to use Scala but ended up writing
   in Python! I hope to get a better result this time.
   - Language evolution happens at a longer-term granularity - we do
   underestimate the velocity impact. I have seen evolution through languages
   starting from Cobol, CCP/M Basic, Turbo Pascal, ... I think Scala will find
   its equilibrium sooner than we think ...

Cheers
k/


On Thu, May 29, 2014 at 5:54 PM, Nicholas Chammas 
nicholas.cham...@gmail.com wrote:

 Thank you for the specific points about the advantages Scala provides over
 other languages. Looking at several code samples, the reduction of
 boilerplate code over Java is one of the biggest plusses, to me.

 On Thu, May 29, 2014 at 8:10 PM, Marek Kolodziej mkolod@gmail.com
 wrote:

 I would advise others to form their opinions based on experiencing it for
 themselves, rather than reading what random people say on Hacker News. :)


 Just a nitpick here: What I said was "It looks like the language is fairly
 controversial on [Hacker News]." That was just an observation of what I saw
 on HN, not a statement of my opinion. I know very little about Scala (or
 Java, for that matter) and definitely don't have a well-formed opinion on
 the matter.

 Nick



getPreferredLocations

2014-05-29 Thread ansriniv
I am building my own custom RDD class.

1) Is there a guarantee that a partition will only be processed on a node
which is in the getPreferredLocations set of nodes returned by the RDD?

2) I am implementing this custom RDD in Java and plan to extend JavaRDD.
However, I don't see a getPreferredLocations method in
http://spark.apache.org/docs/latest/api/core/index.html#org.apache.spark.api.java.JavaRDD

Will I be able to implement my own custom RDD in Java and be able to
override the getPreferredLocations method?
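
As a purely illustrative aside, here is a minimal Scala sketch (the Java-facing API
differs) of a custom RDD overriding getPreferredLocations; the host list and the
one-partition-per-host layout are hypothetical, and the returned locations are a
scheduling preference rather than a hard placement guarantee:

```
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// One partition per (hypothetical) host name.
case class HostPartition(index: Int, host: String) extends Partition

class HostPinnedRDD(sc: SparkContext, hosts: Seq[String]) extends RDD[String](sc, Nil) {
  override protected def getPartitions: Array[Partition] =
    Array.tabulate[Partition](hosts.size)(i => HostPartition(i, hosts(i)))

  // A locality hint for the scheduler, not a strict placement constraint.
  override protected def getPreferredLocations(split: Partition): Seq[String] =
    Seq(split.asInstanceOf[HostPartition].host)

  override def compute(split: Partition, context: TaskContext): Iterator[String] =
    Iterator(s"computed partition ${split.index}")
}
```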

Thanks
Anand



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/getPreferredLocations-tp6554.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Driver OOM while using reduceByKey

2014-05-29 Thread haitao .yao
Thanks, it worked.


2014-05-30 1:53 GMT+08:00 Matei Zaharia matei.zaha...@gmail.com:

 That hash map is just a list of where each task ran, it’s not the actual
 data. How many map and reduce tasks do you have? Maybe you need to give the
 driver a bit more memory, or use fewer tasks (e.g. do reduceByKey(_ + _,
 100) to use only 100 tasks).

 Matei

 On May 29, 2014, at 2:03 AM, haitao .yao yao.e...@gmail.com wrote:

  Hi,
 
   I used 1g of memory for the driver Java process and got an OOM error on the
  driver side before reduceByKey. After analyzing the heap dump, the biggest
  object is org.apache.spark.MapStatus, which occupied over 900MB of memory.
  
  Here's my question:
  
  
  1. Are there any optimization switches I can tune to avoid this? I
  have already enabled output compression with spark.io.compression.codec.
  
  2. Why do the workers send all the data back to the driver to run reduceByKey?
  With the current implementation, if I use reduceByKey on TBs of data, that
  will be a disaster for the driver. Maybe my assumption about the Spark
  implementation is wrong.
 
 
  And here's my code snippet:
 
 
  ```
 
  val cntNew = spark.accumulator(0)
  val cntOld = spark.accumulator(0)
  val cntErr = spark.accumulator(0)
  
  val sequenceFileUrl = args(0)
  val seq = spark.sequenceFile[Text, BytesWritable](sequenceFileUrl)
  
  val stat = seq.map(pair => convertData(
    pair._2, cntNew, cntOld, cntErr
  )).reduceByKey(_ + _)
  
  stat.saveAsSequenceFile(args(1))
 
  ```
 
 
  Thanks.
 
 
  --
 
  haitao.yao@China




-- 
haitao.yao@Beijing


Re: access hdfs file name in map()

2014-05-29 Thread Aaron Davidson
Currently there is not a way to do this using textFile(). However, you
could pretty straightforwardly define your own subclass of HadoopRDD [1] in
order to get access to this information (likely using
mapPartitionsWithIndex to look up the InputSplit for a particular
partition).

Note that sc.textFile() is just a convenience function to construct a new
HadoopRDD [2].

[1] HadoopRDD:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala#L93
[2] sc.textFile():
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L456
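
As a hedged aside, and not the HadoopRDD subclass route described above: if the
individual CSV files are small enough to read whole, sc.wholeTextFiles (available
since Spark 1.0) already returns (fileName, fileContent) pairs, so the file name can
be carried along directly. A minimal sketch reusing the path from the original question:

```
// Each record is (fullFilePath, entireFileContent); split the content into CSV lines
// and tag every row with the file it came from.
val rows = sc.wholeTextFiles("hdfs://test/path/*").flatMap { case (fileName, content) =>
  content.split("\n").map(_.split(",")).map(p => (fileName, p(0), p(2)))
}
```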


On Thu, May 29, 2014 at 7:49 PM, Xu (Simon) Chen xche...@gmail.com wrote:

 Hello,

 A quick question about using Spark to parse text-format CSV files stored
 on HDFS.

 I have something very simple:
 sc.textFile("hdfs://test/path/*").map(line => line.split(",")).map(p =>
 ("XXX", p(0), p(2)))

 Here, I want to replace "XXX" with a string, namely the current CSV
 file name for the line. This is needed since some information, like the date,
 may be encoded in the file name.

 In Hive, I am able to define an external table and use INPUT__FILE__NAME
 as a column in queries. I wonder if Spark has something similar.

 Thanks!
 -Simon