Spark SQL JDBC Connectivity
We are planning to use the latest Spark SQL on RDDs. If a third-party application wants to connect to Spark via JDBC, does Spark SQL have support? (We want to avoid going through the Shark/Hive JDBC layer, as we need good performance.) BTW, we also want to do the same for Spark Streaming: will Spark SQL work on DStreams (since the underlying structure is an RDD anyway), and can we expose the streaming DStream RDDs through JDBC via Spark SQL for real-time analytics? Any pointers on this will greatly help. Regards, Venkat
Re: Use mvn run Spark program occur problem
That was it, thanks!
Driver OOM while using reduceByKey
Hi, I used 1g of memory for the driver Java process and got an OOM error on the driver side before reduceByKey. After analyzing the heap dump, the biggest object is org.apache.spark.MapStatus, which occupied over 900MB of memory. Here are my questions:

1. Are there any optimization switches that I can tune to avoid this? I have already enabled compression on output with spark.io.compression.codec.
2. Why do the workers send all the data back to the driver to run reduceByKey? With the current implementation, if I use reduceByKey on TBs of data, that will be a disaster for the driver. Maybe I'm wrong in my assumption about the Spark implementation.

And here's my code snippet:

```
val cntNew = spark.accumulator(0)
val cntOld = spark.accumulator(0)
val cntErr = spark.accumulator(0)
val sequenceFileUrl = args(0)
val seq = spark.sequenceFile[Text, BytesWritable](sequenceFileUrl)
val stat = seq.map(pair => convertData(pair._2, cntNew, cntOld, cntErr)).reduceByKey(_ + _)
stat.saveAsSequenceFile(args(1))
```

Thanks. -- haitao.yao@China
Re: A Standalone App in Scala: Standalone mode issues
I finally got it working. Main points:

- I had to add the hadoop-client dependency to avoid a strange EOFException.
- I had to set SPARK_MASTER_IP in conf/start-master.sh to `hostname -f` instead of `hostname`, since Akka seems not to work properly with plain host names / IPs; it requires fully qualified domain names.
- I also set SPARK_MASTER_IP in conf/spark-env.sh to `hostname -f` so that other workers can reach the master.
- Be sure that conf/slaves also contains fully qualified domain names.
- It seems that both master and workers need to have access to the driver client, and since I was within a VPN I had a lot of trouble with this. It took some time but I finally realized it.

Making these changes, everything just worked like a charm!
How can I dispose an Accumulator?
Hi, How can I dispose of an Accumulator? It has no method like 'unpersist()', which Broadcast provides. Thanks.
Re: Python, Spark and HBase
Hi Tommer, I'm working on updating and improving the PR, and will work on getting an HBase example working with it. Will feed back as soon as I have had the chance to work on this a bit more. N

On Thu, May 29, 2014 at 3:27 AM, twizansk twiza...@gmail.com wrote: The code which causes the error is:

```
sc = SparkContext("local", "My App")
rdd = sc.newAPIHadoopFile(
    name,
    'org.apache.hadoop.hbase.mapreduce.TableInputFormat',
    'org.apache.hadoop.hbase.io.ImmutableBytesWritable',
    'org.apache.hadoop.hbase.client.Result',
    conf={"hbase.zookeeper.quorum": "my-host",
          "hbase.rootdir": "hdfs://my-host:8020/hbase",
          "hbase.mapreduce.inputtable": "data"})
```

The full stack trace is:

```
Py4JError                                 Traceback (most recent call last)
<ipython-input-8-3b9a4ea2f659> in <module>()
      7         conf={"hbase.zookeeper.quorum": "my-host",
      8               "hbase.rootdir": "hdfs://my-host:8020/hbase",
      9               "hbase.mapreduce.inputtable": "data"})
     10
     11

/opt/cloudera/parcels/CDH/lib/spark/python/pyspark/context.pyc in newAPIHadoopFile(self, name, inputformat_class, key_class, value_class, key_wrapper, value_wrapper, conf)
    281         for k, v in conf.iteritems():
    282             jconf[k] = v
--> 283         jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, name, inputformat_class, key_class, value_class,
    284                                                     key_wrapper, value_wrapper, jconf)
    285         return RDD(jrdd, self, PickleSerializer())

/opt/cloudera/parcels/CDH/lib/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py in __getattr__(self, name)
    657         else:
    658             raise Py4JError('{0} does not exist in the JVM'.
--> 659                             format(self._fqn + name))
    660
    661     def __call__(self, *args):

Py4JError: org.apache.spark.api.python.PythonRDDnewAPIHadoopFile does not exist in the JVM
```
Selecting first ten values in a RDD/partition
I have a DStream which consists of RDDs batched every 2 seconds. I have sorted each RDD and want to retain only the top 10 values and discard the rest. How can I retain only the top 10 values? I am trying to get the top 10 hashtags. Instead of sorting the entire 5-minute counts (thereby incurring the cost of a data shuffle), I am trying to get the top 10 hashtags in each partition. I am stuck at how to retain the top 10 hashtags in each partition.
Is uberjar a recommended way of running Spark/Scala applications?
I'm using Spark 1.0 and the sbt assembly plugin to create an uberjar of my application. However, when I run the assembly command, I get a number of errors like this:

```
java.lang.RuntimeException: deduplicate: different file contents found in the following:
/home/username/.ivy2/cache/com.esotericsoftware.kryo/kryo/bundles/kryo-2.21.jar:com/esotericsoftware/minlog/Log$Logger.class
/home/username/.ivy2/cache/com.esotericsoftware.minlog/minlog/jars/minlog-1.2.jar:com/esotericsoftware/minlog/Log$Logger.class
...
```

As far as I can see, Spark Core depends on both Minlog and Kryo, and the latter includes Minlog classes itself. The classes are binary-different, so assembly can't combine them. And there's a number of such conflicts - I fixed some of them manually via mergeStrategy, but the list of exceptions becomes larger and larger. I could continue, but it just doesn't look like the right way. My questions are:

1. Is an uberjar a recommended way of running Spark applications?
2. If so, should I include Spark itself in this large jar?
3. If not, what is a recommended way to do both development and deployment (assuming an ordinary sbt project)?

Thanks, Andrei
Re: problem about broadcast variable in iteration
Hi, Andrew Ash, thanks for your reply. In fact, I have already used unpersist(), but it doesn't take effect. One reason I selected the 1.0.0 version is precisely that it provides the unpersist() interface.
Re: Is uberjar a recommended way of running Spark/Scala applications?
Hi Andrei, I think the preferred way to deploy Spark jobs is by using the sbt package task instead of using the sbt assembly plugin. In any case, as you comment, the mergeStrategy in combination with some dependency exclusions should fix your problems. Have a look at this gist (https://gist.github.com/JordiAranda/bdbad58d128c14277a05) for further details (I just followed some recommendations commented in the sbt assembly plugin documentation). Up to now I haven't found a proper way to combine my development/deployment phases, although I must say my experience with Spark is pretty limited (it really depends on your deployment requirements as well). In this case, I think someone else could give you some further insights. Best,
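For reference, a minimal sketch of the kind of mergeStrategy override discussed in this thread, using the old sbt-assembly 0.x build.sbt syntax from that plugin's documentation; the minlog rule is an illustrative assumption for the Kryo/Minlog conflict above, not a tested configuration:

```
import AssemblyKeys._ // from the sbt-assembly plugin

assemblySettings

mergeStrategy in assembly <<= (mergeStrategy in assembly) { old =>
  {
    // Assumption: prefer one copy of the conflicting Minlog classes.
    case PathList("com", "esotericsoftware", "minlog", xs @ _*) => MergeStrategy.first
    case x => old(x) // defer everything else to the default strategy
  }
}
```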
ClassCastExceptions when using Spark shell
Hi, I have trouble running some custom code on Spark 0.9.1 in standalone mode on a cluster. I built a fat jar (excluding Spark) that I'm adding to the classpath with ADD_JARS=... When I start the Spark shell, I can instantiate classes, but when I run Spark code, I get strange ClassCastExceptions like this: 14/05/29 14:48:10 INFO TaskSetManager: Loss was due to java.lang.ClassCastException: io.ssc.sampling.matrix.DenseBlock cannot be cast to io.ssc.sampling.matrix.DenseBlock [duplicate 1] What am I doing wrong? Thx, Sebastian
Re: Selecting first ten values in a RDD/partition
Can you clarify what you're trying to achieve here? If you want to take only the top 10 of each RDD, why not sort followed by take(10) on every RDD? Or do you want to take the top 10 over five minutes? Cheers,
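As a concrete illustration of the per-RDD approach, a minimal sketch that takes the 10 largest counts of each micro-batch without a full sort; it assumes a DStream of (hashtag, count) pairs named counts:

```
// Assumes counts: DStream[(String, Long)] of (hashtag, count) pairs.
counts.foreachRDD { rdd =>
  // top(10) avoids sorting the whole RDD; it keeps a bounded set per partition.
  val top10 = rdd.top(10)(Ordering.by[(String, Long), Long](_._2))
  top10.foreach(println)
}
```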
Re: Comprehensive Port Configuration reference?
Howdy Andrew, This is a standalone cluster. And, yes, if my understanding of Spark terminology is correct, you are correct about the port ownerships. Jacob

Jacob D. Eisinger IBM Emerging Technologies jeis...@us.ibm.com - (512) 286-6075

From: Andrew Ash and...@andrewash.com To: user@spark.apache.org Date: 05/28/2014 05:18 PM Subject: Re: Comprehensive Port Configuration reference?

Hmm, those do look like 4 listening ports to me. PID 3404 is an executor and PID 4762 is a worker? This is a standalone cluster?

On Wed, May 28, 2014 at 8:22 AM, Jacob Eisinger jeis...@us.ibm.com wrote: Howdy Andrew, Here is what I ran before an application context was created (other services have been deleted):

```
# netstat -l -t tcp -p --numeric-ports
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address        Foreign Address  State   PID/Program name
tcp6       0      0 10.90.17.100:        :::*             LISTEN  4762/java
tcp6       0      0 :::8081              :::*             LISTEN  4762/java
```

And, then while the application context is up:

```
# netstat -l -t tcp -p --numeric-ports
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address        Foreign Address  State   PID/Program name
tcp6       0      0 10.90.17.100:        :::*             LISTEN  4762/java
tcp6       0      0 :::57286             :::*             LISTEN  3404/java
tcp6       0      0 10.90.17.100:38118   :::*             LISTEN  3404/java
tcp6       0      0 10.90.17.100:35530   :::*             LISTEN  3404/java
tcp6       0      0 :::60235             :::*             LISTEN  3404/java
tcp6       0      0 :::8081              :::*             LISTEN  4762/java
```

My understanding is that this says four ports are open. Are 57286 and 60235 not being used? Jacob

From: Andrew Ash and...@andrewash.com To: user@spark.apache.org Date: 05/25/2014 06:25 PM Subject: Re: Comprehensive Port Configuration reference?

Hi Jacob, The config option spark.history.ui.port is new for 1.0. The problem that the history server solves is that in non-standalone cluster deployment modes (Mesos and YARN) there is no long-lived Spark master that can store logs and statistics about an application after it finishes. The history server is the UI that renders logged data from applications after they complete. Read more here: https://issues.apache.org/jira/browse/SPARK-1276 and https://github.com/apache/spark/pull/204

As far as the two vs. four dynamic ports: are those all listening ports? I did observe 4 ports in use, but only two of them were listening. The other two were the random ports used for responses on outbound connections, the source port of the (srcIP, srcPort, dstIP, dstPort) tuple that uniquely identifies a TCP socket. http://unix.stackexchange.com/questions/75011/how-does-the-server-find-out-what-client-port-to-send-to

Thanks for taking a look through! I also realized that I had a couple of mistakes with the 0.9 to 1.0 transition, so I appropriately documented those now as well in the updated PR. Cheers! Andrew

On Fri, May 23, 2014 at 2:43 PM, Jacob Eisinger jeis...@us.ibm.com wrote: Howdy Andrew, I noticed you have a configuration item that we were not aware of: spark.history.ui.port. Is that new for 1.0? Also, we noticed that the workers and the drivers were opening up four dynamic ports per application context. It looks like you were seeing two. Everything else looks like it aligns! Jacob

From: Andrew Ash and...@andrewash.com To: user@spark.apache.org Date: 05/23/2014 10:30 AM Subject: Re: Comprehensive Port Configuration reference?

Hi everyone, I've also been interested in better understanding what ports are used where and the direction the network connections go. I've observed a running cluster and read through code, and came up with the below documentation addition.
Re: Spark SQL JDBC Connectivity
On Wed, May 28, 2014 at 11:39 PM, Venkat Subramanian vsubr...@gmail.com wrote: If a third-party application wants to connect to Spark via JDBC, does Spark SQL have support? (We want to avoid going through the Shark/Hive JDBC layer, as we need good performance.)

We don't have a full release yet, but there is a branch on the Shark github repository that has a version of SharkServer2 that uses Spark SQL. We also plan to port the Shark CLI, but this is not yet finished. You can find this branch along with documentation here: https://github.com/amplab/shark/tree/sparkSql Note that this version has not yet received much testing (outside of the integration tests that are run on Spark SQL). That said, I would love for people to test it out and report any problems or missing features. Any help here would be greatly appreciated!

BTW, we also want to do the same for Spark Streaming: will Spark SQL work on DStreams (since the underlying structure is an RDD anyway), and can we expose the streaming DStream RDDs through JDBC via Spark SQL for real-time analytics?

We have talked about doing this, but it is not currently on the near-term road map.
Re: ClassCastExceptions when using Spark shell
Hi Sebastian, That exception generally means you have the class loaded by two different class loaders, and some code is trying to mix instances created by the two different loaded classes. Do you happen to have that class both in the Spark jars and in your app's uber-jar? That might explain the problem, although I'm not terribly familiar with Spark's class loader hierarchy. -- Marcelo
Spark hook to create external process
I have a requirement where, for every Spark executor threadpool thread, I need to launch an associated external process. My job will consist of some processing in the Spark executor thread and some processing by its associated external process, with the two communicating via some IPC mechanism. Is there a hook in Spark where I can put in my code to create / destroy these external processes corresponding to the creation / destruction of executor thread pool threads? Thanks Anand
Re: Spark hook to create external process
Hi Anand, This is probably already handled by the RDD.pipe() operation. It will spawn a process and let you feed data to it through its stdin and read data through its stdout. Matei
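A minimal sketch of what RDD.pipe() looks like in practice; the external command path is a hypothetical stand-in for the associated process described above:

```
// Each element of the RDD is written as one line to the command's stdin;
// each line the command prints to stdout becomes one element of the result.
// "/usr/local/bin/myprocess" is a hypothetical executable.
val input = sc.parallelize(1 to 100).map(_.toString)
val output = input.pipe("/usr/local/bin/myprocess") // RDD[String]
output.take(5).foreach(println)
```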
Re: Is uberjar a recommended way of running Spark/Scala applications?
Thanks, Jordi, your gist looks pretty much like what I have in my project currently (with a few exceptions that I'm going to borrow). I like the idea of using sbt package, since it doesn't require third-party plugins and, most importantly, doesn't create a mess of classes and resources. But in this case I'll have to handle the jar list manually via the Spark context. Is there a way to automate this process? E.g., when I was a Clojure guy, I could run lein deps (lein is a build tool similar to sbt) to download all dependencies and then just enumerate them from my app. Maybe you have heard of something like that for Spark/SBT? Thanks, Andrei
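For illustration, a sketch of handing the jar list to Spark yourself when going the sbt package route; the paths and master URL are hypothetical, and the four-argument SparkContext constructor shown is the 0.9/1.0-era API:

```
import org.apache.spark.SparkContext

// Hypothetical paths: the application jar produced by `sbt package`
// plus its library dependencies, enumerated by hand.
val jars = Seq(
  "target/scala-2.10/myapp_2.10-0.1.jar",
  "lib_managed/jars/joda-time/joda-time/joda-time-2.3.jar"
)
val sc = new SparkContext("spark://master:7077", "MyApp", "/opt/spark", jars)
```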
Re: Is uberjar a recommended way of running Spark/Scala applications?
The MergeStrategy combined with sbt assembly did work for me. This is not painless: some trial and error, and the assembly may take multiple minutes. You will likely want to filter out some additional classes from the generated jar file. Here is an SOF answer to explain that, with IMHO the best answer snippet included here (in this case the OP understandably did not want to include javax.servlet.Servlet): http://stackoverflow.com/questions/7819066/sbt-exclude-class-from-jar

```
mappings in (Compile, packageBin) ~= { (ms: Seq[(File, String)]) =>
  ms filter { case (file, toPath) => toPath != "javax/servlet/Servlet.class" }
}
```

There is a setting to not include the project files in the assembly, but I do not recall it at this moment.
Re: Driver OOM while using reduceByKey
That hash map is just a list of where each task ran, it’s not the actual data. How many map and reduce tasks do you have? Maybe you need to give the driver a bit more memory, or use fewer tasks (e.g. do reduceByKey(_ + _, 100) to use only 100 tasks). Matei
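Applied to the snippet from the original post, the suggestion amounts to a one-argument change (a sketch; 100 is just the example count from the reply):

```
// An explicit task count bounds the MapStatus bookkeeping on the driver,
// which grows with (number of map tasks) x (number of reduce tasks).
val stat = seq.map(pair => convertData(pair._2, cntNew, cntOld, cntErr))
              .reduceByKey(_ + _, 100) // 100 reduce tasks instead of the default
stat.saveAsSequenceFile(args(1))
```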
Re: Selecting first ten values in a RDD/partition
Try looking at the .mapPartitions( ) method implemented for RDD[T] objects. It will give you direct access to an iterator containing the member objects of each partition for doing the kind of within-partition hashtag counts you're describing.
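A minimal sketch of that approach, assuming an RDD of (hashtag, count) pairs named counts; note it materializes each partition in memory, which is fine when partitions are small:

```
// Keep only the 10 largest counts within each partition -- no shuffle involved.
// Assumes counts: RDD[(String, Long)].
val topPerPartition = counts.mapPartitions { iter =>
  iter.toArray.sortBy(-_._2).take(10).iterator
}
```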
Why Scala?
I recently discovered Hacker News and started reading through older posts about Scala (https://hn.algolia.com/?q=scala#!/story/forever/0/scala). It looks like the language is fairly controversial on there, and it got me thinking. Scala appears to be the preferred language to work with in Spark, and Spark itself is written in Scala, right? I know that oftentimes a successful project evolves gradually out of something small, and that the choice of programming language may not always have been made consciously at the outset. But pretending that it was, why is Scala the preferred language of Spark? Nick
Re: Why Scala?
Quite a few people ask this question and the answer is pretty simple. When we started Spark, we had two goals — we wanted to work with the Hadoop ecosystem, which is JVM-based, and we wanted a concise programming interface similar to Microsoft’s DryadLINQ (the first language-integrated big data framework I know of, that begat things like FlumeJava and Crunch). On the JVM, the only language that would offer that kind of API was Scala, due to its ability to capture functions and ship them across the network. Scala’s static typing also made it much easier to control performance compared to, say, Jython or Groovy.

In terms of usage, however, we see substantial usage of our other languages (Java and Python), and we’re continuing to invest in both. In a user survey we did last fall, about 25% of users used Java and 30% used Python, and I imagine these numbers are growing. With lambda expressions now added to Java 8 (http://databricks.com/blog/2014/04/14/Spark-with-Java-8.html), I think we’ll see a lot more Java. And at Databricks I’ve seen a lot of interest in Python, which is very exciting to us in terms of ease of use. Matei

On May 29, 2014, at 1:57 PM, Benjamin Black b...@b3k.us wrote: HN is a cesspool safely ignored.
Re: Shuffle file consolidation
Thanks, I missed that. One thing that's still unclear to me, even looking at that, is: does this parameter have to be set when starting up the cluster, on each of the workers, or can it be set by an individual client job?

On Fri, May 23, 2014 at 10:13 AM, Han JU ju.han.fe...@gmail.com wrote: Hi Nathan, There's some explanation in the Spark configuration section:

```
If set to true, consolidates intermediate files created during a shuffle. Creating fewer
files can improve filesystem performance for shuffles with large numbers of reduce tasks.
It is recommended to set this to true when using ext4 or xfs filesystems. On ext3, this
option might degrade performance on machines with many (>8) cores due to filesystem
limitations.
```

2014-05-23 16:00 GMT+02:00 Nathan Kronenfeld nkronenf...@oculusinfo.com: In trying to sort some largish datasets, we came across the spark.shuffle.consolidateFiles property, and I found in the source code that it is set, by default, to false, with a note to default it to true when the feature is stable. Does anyone know what is unstable about this? If we set it true, what problems should we anticipate? Thanks, -Nathan Kronenfeld

-- JU Han Data Engineer @ Botify.com +33 061960

-- Nathan Kronenfeld Senior Visualization Developer Oculus Info Inc 2 Berkeley Street, Suite 600, Toronto, Ontario M5A 4J5 Phone: +1-416-203-3003 x 238 Email: nkronenf...@oculusinfo.com
Re: Shuffle file consolidation
It can be set in an individual application. Consolidation had some issues on ext3 as mentioned there, though we might enable it by default in the future because other optimizations now made it perform on par with the non-consolidation version. It also had some bugs in 0.9.0 so I’d suggest at least 0.9.1. Matei
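A minimal sketch of setting it per application, using the SparkConf API available since 0.9:

```
import org.apache.spark.{SparkConf, SparkContext}

// Enable shuffle file consolidation for this application only.
val conf = new SparkConf()
  .setAppName("SortJob")
  .set("spark.shuffle.consolidateFiles", "true")
val sc = new SparkContext(conf)
```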
Re: Why Scala?
There are a few known concerns about Scala, and some still stand, but having done Scala professionally for over two years now, I have learned to master and appreciate the advantages. The major concern IMO is Scala in a less-than-scrupulous corporate environment. First, Scala requires significantly more discipline in commenting and style to stay painlessly readable than Java does. People with less than stellar code hygiene can easily turn a project into an unmaintainable mess. Second, from a corporate management perspective, it is (still?) much harder to staff with Scala coders as opposed to Java ones. All these things are a headache for corporate bosses, but for public and academic projects with thorough peer review and an increased desire for contributors to look clean in public it works out quite well, and the strong sides really shine.

Spark specifically builds around FP patterns -- such as monads and functors -- which were absent in Java prior to 8 (I am not sure that they are as well worked out in Java 8 collections even now, as opposed to Scala collections). So Java 8 simply comes a little late to the show in that department. Also, FP is not the only thing that is used by Spark. Spark also uses stuff like implicits and the Akka framework for IPC. Let's not forget that FP, albeit important, is only one out of many stories in Scala in the grand scale of things.
Re: Spark SQL JDBC Connectivity and more
Thanks Michael. OK, will try SharkServer2. But I have some basic questions on a related area:

1) If I have a standalone Spark application that has already built an RDD, how can SharkServer2, or for that matter Shark, access 'that' RDD and do queries on it? In all the examples I have seen for Shark, the RDDs (tables) are created within Shark's Spark context and processed. I have stylized the real problem we have, which is: we have a standalone Spark application that is processing DStreams and producing output DStreams. I want to expose that near-real-time DStream data to a 3rd-party app via JDBC and allow the SharkServer2 CLI to operate and query on the DStreams in real time, all from memory. Currently we are writing the output stream to Cassandra and exposing it to the 3rd-party app through it via JDBC, but we want to avoid that extra disk write, which increases latency.

2) I have two applications: one used for processing and computing an output RDD from an input, and another for post-processing the resultant RDD into multiple persistent stores + doing other things with it. These are split into separate processes intentionally. How do we share the output RDD from the first application with the second application without writing to disk (I'm thinking of serializing the RDD and streaming it through Kafka, but then we lose time and all the fault tolerance that RDDs bring)? Is Tachyon the only other way? Are there other models/design patterns for applications that share RDDs, as this may be a very common use case?
access hdfs file name in map()
Hello, A quick question about using Spark to parse text-format CSV files stored on HDFS. I have something very simple:

```
sc.textFile("hdfs://test/path/*").map(line => line.split(",")).map(p => ("XXX", p(0), p(2)))
```

Here, I want to replace "XXX" with a string, which is the current CSV filename for the line. This is needed since some information may be encoded in the file name, like the date. In Hive, I am able to define an external table and use INPUT__FILE__NAME as a column in queries. I wonder if Spark has something similar. Thanks! -Simon
Re: Why Scala?
Nicholas, Good question. A couple of thoughts from my practical experience:

- Coming from R, Scala feels more natural than other languages. The functional succinctness of Scala is more suited for data science than other languages. In short, Scala-Spark makes sense for data science, ML, data exploration, et al.
- Having said that, occasionally practicality does trump the choice of a language - last time I really wanted to use Scala but ended up writing in Python! Hope to get a better result this time.
- Language evolution happens at a long-term granularity - we do underestimate the velocity impact. I have seen evolutions through languages starting from Cobol, CCP/M Basic, Turbo Pascal, ... I think Scala will find its equilibrium sooner than we think ...

Cheers k/

On Thu, May 29, 2014 at 5:54 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Thank you for the specific points about the advantages Scala provides over other languages. Looking at several code samples, the reduction of boilerplate code over Java is one of the biggest plusses, to me.

On Thu, May 29, 2014 at 8:10 PM, Marek Kolodziej mkolod@gmail.com wrote: I would advise others to form their opinions based on experiencing it for themselves, rather than reading what random people say on Hacker News. :)

Just a nitpick here: What I said was "It looks like the language is fairly controversial on [Hacker News]." That was just an observation of what I saw on HN, not a statement of my opinion. I know very little about Scala (or Java, for that matter) and definitely don't have a well-formed opinion on the matter. Nick
getPreferredLocations
I am building my own custom RDD class.

1) Is there a guarantee that a partition will only be processed on a node which is in the getPreferredLocations set of nodes returned by the RDD?

2) I am implementing this custom RDD in Java and plan to extend JavaRDD. However, I don't see a getPreferredLocations method in http://spark.apache.org/docs/latest/api/core/index.html#org.apache.spark.api.java.JavaRDD Will I be able to implement my own custom RDD in Java and be able to override the getPreferredLocations method?

Thanks Anand
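For reference, getPreferredLocations is defined on the Scala RDD class (JavaRDD is only a wrapper around an underlying RDD). A minimal sketch of a custom Scala RDD overriding it; the one-partition-per-host scheme and the placeholder compute body are illustrative assumptions:

```
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// One partition per host name -- a hypothetical placement scheme.
class HostPartition(val index: Int, val host: String) extends Partition

class MyRDD(sc: SparkContext, hosts: Seq[String]) extends RDD[String](sc, Nil) {
  override def getPartitions: Array[Partition] =
    hosts.zipWithIndex.map { case (h, i) => new HostPartition(i, h) }.toArray

  override def compute(split: Partition, context: TaskContext): Iterator[String] =
    Iterator(split.asInstanceOf[HostPartition].host) // placeholder computation

  // Scheduling hint only: Spark prefers, but does not guarantee, these nodes.
  override def getPreferredLocations(split: Partition): Seq[String] =
    Seq(split.asInstanceOf[HostPartition].host)
}
```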
Re: Driver OOM while using reduceByKey
Thanks, it worked. -- haitao.yao@Beijing
Re: access hdfs file name in map()
Currently there is not a way to do this using textFile(). However, you could pretty straightforwardly define your own subclass of HadoopRDD [1] in order to get access to this information (likely using mapPartitionsWithIndex to look up the InputSplit for a particular partition). Note that sc.textFile() is just a convenience function to construct a new HadoopRDD [2].

[1] HadoopRDD: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala#L93
[2] sc.textFile(): https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L456
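As a simpler (if less efficient) workaround than subclassing HadoopRDD, one could also enumerate the matching files up front and tag each file's lines with its name; a sketch under that assumption:

```
import org.apache.hadoop.fs.{FileSystem, Path}

// List the files matching the glob, then build one tagged RDD per file
// and union them. Fine for a modest number of files; a HadoopRDD subclass
// is the better fit for many small files.
val fs = FileSystem.get(sc.hadoopConfiguration)
val files = fs.globStatus(new Path("hdfs://test/path/*")).map(_.getPath.toString)
val tagged = files.map { name =>
  sc.textFile(name).map(line => (name, line.split(",")))
}.reduceLeft(_ union _)
```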