Data from PostgreSQL to Spark
Hi All, I have a use case where I am consuming events from RabbitMQ using Spark Streaming. Each event has some fields on which I want to query PostgreSQL, bring back the matching data, join it with the event data, and put the aggregated data into HDFS so that I can run analytics queries over it using Spark SQL. My question: the PostgreSQL data is production data, so I don't want to hit it too many times. At any given second I may have 3000 events, which means I would need to fire 3000 parallel queries against PostgreSQL, and this load keeps growing, so my database will go down. I can't migrate the PostgreSQL data since lots of systems use it, but I could take this data to some NoSQL store such as HBase and query HBase instead; the issue there is, how can I make sure that HBase has up-to-date data? Can anyone suggest the best approach/method to handle this case? Regards, Jeetendra
Re: Comparison between Standalone mode and YARN mode
YARN and Mesos are better for production clusters of non-trivial size that have mixed job kinds and multiple users, as they manage resources more intelligently and dynamically. They also support other services you probably need, like HDFS, databases, workflow tools, etc. Standalone is fine, though, if you have a limited number of jobs competing for resources, for example a small cluster dedicated to ingesting or processing a specific kind of data, or for a dev/QA cluster. Standalone mode has much lower overhead, but you have to manage the daemon services yourself, including configuration of Zookeeper if you need master failover. Hence, you don't see it often in production scenarios. The Spark page on cluster deployments has more details: http://spark.apache.org/docs/latest/cluster-overview.html dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Wed, Jul 22, 2015 at 6:56 PM, Dogtail Ray spark.ru...@gmail.com wrote: Hi, I am very curious about the differences between Standalone mode and YARN mode. According to http://blog.cloudera.com/blog/2014/05/apache-spark-resource-management-and-yarn-app-models/, it seems that YARN mode is always better than Standalone mode. Is that the case? Or I should choose different modes according to my specific requirements? Thanks!
Re: Performance issue with Spark's foreachPartition method
Bagavath, sometimes we need to merge existing records, due to recomputations of the whole data. I don't think we could achieve this with pure inserts, or is there a way? On 24 July 2015 at 08:53, Bagavath bagav...@gmail.com wrote: Try using insert instead of merge. Typically we use insert append to do bulk inserts to Oracle. On Thu, Jul 23, 2015 at 1:12 AM, diplomatic Guru diplomaticg...@gmail.com wrote: Thanks Robin for your reply. I'm pretty sure that writing to Oracle is what takes longer, as writing to HDFS takes only ~5 minutes. The job writes about ~5 million records. I've set the job to call executeBatch() when the batch size reaches 200,000 records, so I assume that commit is invoked for every 200K batch. In this case it should only call commit 25 times; is this too much? I wouldn't want to increase the batch size any further as it may cause Java heap issues. I do not have much knowledge on the Oracle side, so any advice on the configuration would be appreciated. Thanks, Raj On 22 July 2015 at 20:20, Robin East robin.e...@xense.co.uk wrote: The first question I would ask is: have you determined whether you have a performance issue writing to Oracle? In particular, how many commits are you making? If you are issuing a lot of commits, that would be a performance problem. Robin On 22 Jul 2015, at 19:11, diplomatic Guru diplomaticg...@gmail.com wrote: Hello all, We are having a major performance issue with Spark, which is holding us back from going live. We have a job that carries out computation on log files and writes the results into an Oracle DB. The reducer 'reduceByKey' has been set to a parallelism of 4 as we don't want to establish too many DB connections. We then call foreachPartition on the RDD pairs that were reduced by key. Within this foreachPartition method we establish a DB connection, iterate over the results, prepare the Oracle statement for batch insertion, then commit the batch and close the connection. All of this works fine. However, when we execute the job to process 12GB of data, it takes forever to complete, especially at the foreachPartition stage. We submitted the job with 6 executors, 2 cores, and 6GB memory, of which 0.3 is assigned to spark.storage.memoryFraction. The job takes about 50 minutes to complete, which is not ideal. I'm not sure how we could improve the performance. I've provided the main body of the code; please take a look and advise:

From the driver:

reduceResultsRDD.foreachPartition(new DB.InsertFunction(dbuser, dbpass, batchsize));

DB class:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Iterator;
import org.apache.spark.api.java.function.VoidFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

public class DB {
    private static final Logger logger = LoggerFactory.getLogger(DB.class);

    public static class InsertFunction implements VoidFunction<Iterator<Tuple2<String, String>>> {
        private static final long serialVersionUID = 55766876878L;
        private String dbuser = "";
        private String dbpass = "";
        private int batchsize;

        public InsertFunction(String dbuser, String dbpass, int batchsize) {
            super();
            this.dbuser = dbuser;
            this.dbpass = dbpass;
            this.batchsize = batchsize;
        }

        @Override
        public void call(Iterator<Tuple2<String, String>> results) {
            Connection connect = null;
            PreparedStatement pstmt = null;
            try {
                connect = getDBConnection(dbuser, dbpass);
                int count = 0;
                if (batchsize <= 0) {
                    batchsize = 1;
                }
                pstmt = connect.prepareStatement("MERGE INTO SOME TABLE ... IF RECORD FOUND UPDATE, IF NOT INSERT ...");
                while (results.hasNext()) {
                    Tuple2<String, String> kv = results.next();
                    String[] data = kv._1.concat("," + kv._2).split(",");
                    pstmt.setString(1, data[0].toString());
                    pstmt.setString(2, data[1].toString());
                    // ... remaining columns ...
                    pstmt.addBatch();
                    count++;
                    if (count == batchsize) {
                        logger.info("BulkCount : " + count);
                        pstmt.executeBatch();
                        connect.commit();
                        count = 0;
                    }
                    pstmt.executeBatch();
                    connect.commit();
                }
                pstmt.executeBatch();
                connect.commit();
            } catch (Exception e) {
                logger.error("InsertFunction error: " + e.getMessage());
            } finally {
                if (pstmt != null) {
                    try {
                        pstmt.close();
                    } catch (SQLException e) {
                        logger.error("InsertFunction Statement Close error: " + e.getMessage());
                    }
                }
                try {
                    connect.close();
                } catch (SQLException e) {
                    logger.error("InsertFunction Connection Close error: " + e.getMessage());
                }
            }
        }
    }
}
Spark - Serialization with Kryo
Hello, I've got a problem using Spark with GeoMesa. I'm not quite sure where the error comes from, but I assume it's a problem with Spark. A ClassNotFoundException is thrown with the following message: "Failed to register classes with Kryo." Please have a look at https://github.com/apache/spark/pull/4258. A solution is described there, but I'm not sure how to use this patch. I'm using Spark version 1.3.0 and it's not possible for me to upgrade, because I use GeoMesa. Thanks in advance.
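For reference, class registration with Kryo in Spark generally follows the pattern sketched below. This is a generic sketch, not the GeoMesa-specific fix from the pull request above; the Point class and the registrator are placeholders standing in for whatever classes fail to register.

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.serializer.KryoRegistrator

// Stand-in for whatever class fails to register in your job.
case class Point(x: Double, y: Double)

// A custom registrator; it must be on the executor classpath under its fully qualified name.
class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[Point])
  }
}

val conf = new SparkConf()
  .setAppName("KryoExample")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", classOf[MyRegistrator].getName)
val sc = new SparkContext(conf)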
Fwd: Performance questions regarding Spark 1.3 standalone mode
Hi all, I wonder if anyone has an explanation for this behavior. Thank you, -Khaled -- Forwarded message -- From: Khaled Ammar khaled.am...@gmail.com Date: Fri, Jul 24, 2015 at 9:35 AM Subject: Performance questions regarding Spark 1.3 standalone mode To: user@spark.apache.org Hi all, I have a standalone Spark cluster set up on EC2 machines. I did the setup manually without the ec2 scripts. I have two questions about Spark/GraphX performance: 1) When I run the PageRank example, the storage tab does not show that all RDDs are cached. Only one RDD is 100% cached; the remaining ones range from 25% to 97%. Kindly note there is enough memory to cache all RDDs. 2) I noticed that loading the dataset partitions, a total of 25 GB, is not always evenly distributed across executors. Occasionally, one or two executors become responsible for loading several partitions, while others load only one partition. Does anyone know the reason behind this behavior? Is it a bug, or is it possible to fix it using configuration parameters? -- Thanks, -Khaled -- Thanks, -Khaled
Re: suggest coding platform
Hi Saif: There is also the Spark Kernel which provides you the auto-complete, logs and syntax highlighting for scala on the notebook (ex. jupyter) https://github.com/ibm-et/spark-kernel There was a recent meetup that talked about it in case you are interested in the technical details: https://www.youtube.com/watch?v=2AX6g0tK-us All the best, Guillermo On Mon, Jul 27, 2015 at 3:25 AM, Akhil Das ak...@sigmoidanalytics.com wrote: How about IntelliJ? It also has a Terminal tab. Thanks Best Regards On Fri, Jul 24, 2015 at 6:06 PM, saif.a.ell...@wellsfargo.com wrote: Hi all, I tried Notebook Incubator Zeppelin, but I am not completely happy with it. What do you people use for coding? Anything with auto-complete, proper warning logs and perhaps some colored syntax. My platform is on linux, so anything with some notebook studio, or perhaps a windows IDE with remote ssh capabilities? Thanks, Saif - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Data from PostgreSQL to Spark
You can have Spark reading from PostgreSQL through the data access API. Do you have any concern with that approach since you mention copying that data into HBase. From: Jeetendra Gangele Sent: Monday, July 27, 6:00 AM Subject: Data from PostgreSQL to Spark To: user Hi All I have a use case where where I am consuming the Events from RabbitMQ using spark streaming.This event has some fields on which I want to query the PostgreSQL and bring the data and then do the join between event data and PostgreSQl data and put the aggregated data into HDFS, so that I run run analytics query over this data using SparkSQL. my question is PostgreSQL data in production data so i don't want to hit so many times. at any given 1 seconds time I may have 3000 events,that means I need to fire 3000 parallel query to my PostGreSQl and this data keeps on growing, so my database will go down. I can't migrate this PostgreSQL data since lots of system using it,but I can take this data to some NOSQL like base and query the Hbase, but here issue is How can I make sure that Hbase has upto date data? Any anyone suggest me best approach/ method to handle this case? Regards Jeetendra
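For what it's worth, reading a PostgreSQL table through that API might look roughly like the sketch below (Spark 1.4 syntax; the URL, table name, and credentials are placeholders, and the PostgreSQL JDBC driver jar has to be on the classpath):

val eventsLookup = sqlContext.read.format("jdbc").options(Map(
  "url"      -> "jdbc:postgresql://dbhost:5432/mydb",
  "dbtable"  -> "public.events_lookup",
  "user"     -> "dbuser",
  "password" -> "dbpass"
)).load()

// Register it so the streaming batches can be joined against it with Spark SQL.
eventsLookup.registerTempTable("events_lookup")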
Re: spark spark-ec2 credentials using aws_security_token
You refer to `aws_security_token`, but I'm not sure where you're specifying it. Can you elaborate? Is it an environment variable? On Mon, Jul 27, 2015 at 4:21 AM Jan Zikeš jan.zi...@centrum.cz wrote: Hi, I would like to ask if it is currently possible to use spark-ec2 script together with credentials that are consisting not only from: aws_access_key_id and aws_secret_access_key, but it also contains aws_security_token. When I try to run the script I am getting following error message: ERROR:boto:Caught exception reading instance data Traceback (most recent call last): File /Users/zikes/opensource/spark/ec2/lib/boto-2.34.0/boto/utils.py, line 210, in retry_url r = opener.open(req, timeout=timeout) File /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/urllib2.py, line 404, in open response = self._open(req, data) File /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/urllib2.py, line 422, in _open '_open', req) File /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/urllib2.py, line 382, in _call_chain result = func(*args) File /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/urllib2.py, line 1214, in http_open return self.do_open(httplib.HTTPConnection, req) File /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/urllib2.py, line 1184, in do_open raise URLError(err) URLError: urlopen error [Errno 64] Host is down ERROR:boto:Unable to read instance data, giving up No handler was ready to authenticate. 1 handlers were checked. ['QuerySignatureV2AuthHandler'] Check your credentials Does anyone has some idea what can be possibly wrong? Is aws_security_token the problem? I know that it seems more like a boto problem, but still I would like to ask if anybody has some experience with this? My launch command is: ./spark-ec2 -k my_key -i my_key.pem --additional-tags mytag:tag1,mytag2:tag2 --instance-profile-name profile1 -s 1 launch test Thank you in advance for any help. Best regards, Jan Note: I have also asked at http://stackoverflow.com/questions/31583513/spark-spark-ec2-credentials-using-aws-security-token?noredirect=1#comment51151822_31583513 without any success. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-spark-ec2-credentials-using-aws-security-token-tp24007.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Getting java.net.BindException when attempting to start Spark master on EC2 node with public IP
Hello, I am trying to start a Spark master for a standalone cluster on an EC2 node. The CLI command I'm using looks like this: Note that I'm specifying the --host argument; I want my Spark master to be listening on a specific IP address. The host that I'm specifying (i.e. 54.xx.xx.xx) is the public IP for my EC2 node; I've confirmed that nothing else is listening on port 7077 and that my EC2 security group has all ports open. I've also double-checked that the public IP is correct. When I use --host 54.xx.xx.xx, I get the following error message: This does not occur if I leave out the --host argument and it doesn't occur if I use --host 10.0.xx.xx, where 10.0.xx.xx is my private EC2 IP address. Why would Spark fail to bind to a public EC2 address? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Getting-java-net-BindException-when-attempting-to-start-Spark-master-on-EC2-node-with-public-IP-tp24011.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: [ Potential bug ] Spark terminal logs say that job has succeeded even though job has failed in Yarn cluster mode
Any updates on this bug ? Why Spark log results Job final status does not match ? (one saying that job has failed, another stating that job has succeeded) Thanks. On Thu, Jul 23, 2015 at 4:43 PM, Elkhan Dadashov elkhan8...@gmail.com wrote: Hi all, While running Spark Word count python example with intentional mistake in *Yarn cluster mode*, Spark terminal states final status as SUCCEEDED, but log files state correct results indicating that the job failed. Why terminal log output application log output contradict each other ? If i run same job on *local mode* then terminal logs and application logs match, where both state that job has failed to expected error in python script. More details: Scenario While running Spark Word count python example on *Yarn cluster mode*, if I make intentional error in wordcount.py by changing this line (I'm using Spark 1.4.1, but this problem exists in Spark 1.4.0 and in 1.3.0 versions - which i tested): lines = sc.textFile(sys.argv[1], 1) into this line: lines = sc.textFile(*nonExistentVariable*,1) where nonExistentVariable variable was never created and initialized. then i run that example with this command (I put README.md into HDFS before running this command): *./bin/spark-submit --master yarn-cluster wordcount.py /README.md* The job runs and finishes successfully according the log printed in the terminal : *Terminal logs*: ... 15/07/23 16:19:17 INFO yarn.Client: Application report for application_1437612288327_0013 (state: RUNNING) 15/07/23 16:19:18 INFO yarn.Client: Application report for application_1437612288327_0013 (state: RUNNING) 15/07/23 16:19:19 INFO yarn.Client: Application report for application_1437612288327_0013 (state: RUNNING) 15/07/23 16:19:20 INFO yarn.Client: Application report for application_1437612288327_0013 (state: RUNNING) 15/07/23 16:19:21 INFO yarn.Client: Application report for application_1437612288327_0013 (state: FINISHED) 15/07/23 16:19:21 INFO yarn.Client: client token: N/A diagnostics: Shutdown hook called before final status was reported. ApplicationMaster host: 10.0.53.59 ApplicationMaster RPC port: 0 queue: default start time: 1437693551439 final status: *SUCCEEDED* tracking URL: http://localhost:8088/proxy/application_1437612288327_0013/history/application_1437612288327_0013/1 user: edadashov 15/07/23 16:19:21 INFO util.Utils: Shutdown hook called 15/07/23 16:19:21 INFO util.Utils: Deleting directory /tmp/spark-eba0a1b5-a216-4afa-9c54-a3cb67b16444 But if look at log files generated for this application in HDFS - it indicates failure of the job with correct reason: *Application log files*: ... \00 stdout\00 179Traceback (most recent call last): File wordcount.py, line 32, in module lines = sc.textFile(nonExistentVariable,1) *NameError: name 'nonExistentVariable' is not defined* Why terminal output - final status: *SUCCEEDED , *is not matching application log results - failure of the job (NameError: name 'nonExistentVariable' is not defined) ? Is this bug ? Is there Jira ticket related to this issue ? (Is someone assigned to this issue ?) If i run this wordcount .py example (with mistake line) in local mode, then terminal log states that the job has failed in terminal logs too. *./bin/spark-submit wordcount.py /README.md* *Terminal logs*: ... 
15/07/23 16:31:55 INFO scheduler.EventLoggingListener: Logging events to hdfs:///app-logs/local-1437694314943 Traceback (most recent call last): File /home/edadashov/tools/myspark/spark/wordcount.py, line 32, in module lines = sc.textFile(nonExistentVariable,1) NameError: name 'nonExistentVariable' is not defined 15/07/23 16:31:55 INFO spark.SparkContext: Invoking stop() from shutdown hook Thanks. -- Best regards, Elkhan Dadashov
SparkR
Does SparkR support all the algorithms that the R libraries support?
Re: PYSPARK_DRIVER_PYTHON=ipython spark/bin/pyspark Does not create SparkContext
Hmm, it should work with you run `PYSPARK_DRIVER_PYTHON=ipython spark/bin/pyspark` PYTHONSTARTUP is a PYTHON environment variable https://docs.python.org/2/using/cmdline.html#envvar-PYTHONSTARTUP On Sun, Jul 26, 2015 at 4:06 PM -0700, Zerony Zhao bw.li...@gmail.com wrote: Hello everyone, I have a newbie question. $SPARK_HOME/bin/pyspark will create SparkContext automatically. Welcome to __ / __/__ ___ _/ /__ _\ \/ _ \/ _ `/ __/ '_/ /__ / .__/\_,_/_/ /_/\_\ version 1.4.1 /_/ Using Python version 2.7.3 (default, Jun 22 2015 19:33:41) SparkContext available as sc, HiveContext available as sqlContext. But When using ipython as a driver, PYSPARK_DRIVER_PYTHON=ipython spark/bin/pyspark , does not create SparkContext automatically. I have to execute execfile('spark_home/python/pyspark/shell.py') is it by design? I read the bash script bin/pyspark, I noticed the line: export PYTHONSTARTUP=$SPARK_HOME/python/pyspark/shell.py But I searched the whole spark source code, the variable PYTHONSTARTUP is never used, I could not understand when PYTHONSTARTUP is executed. Thank you.
Unexpected performance issues with Spark SQL using Parquet
Hi spark users and developers, I have been trying to understand how Spark SQL works with Parquet for the past couple of days. There is an unexpected performance problem with column pruning. Here is a dummy example: The parquet file has these 3 fields:

root
 |-- customer_id: string (nullable = true)
 |-- type: string (nullable = true)
 |-- mapping: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (nullable = true)

Note that mapping is just a field with a lot of key-value pairs. I created a parquet file with 1 billion entries, each entry having 10 key-value pairs in the mapping. After I generated this parquet file, I generated another parquet file without the mapping field, that is:

root
 |-- customer_id: string (nullable = true)
 |-- type: string (nullable = true)

Let's call the first parquet file data-with-mapping and the second parquet file data-without-mapping. Then I ran a very simple query over the two parquet files:

val df = sqlContext.read.parquet(path)
df.select(df("type")).count

The run on the data-with-mapping takes 34 seconds with an input size of 11.7 MB. The run on the data-without-mapping takes 8 seconds with an input size of 7.6 MB. They both ran on the same cluster with Spark 1.4.1. What bothers me the most is the input size, because I assumed column pruning would only deserialize the columns that are relevant to the query (in this case the field type), yet it clearly reads more data on the data-with-mapping than on the data-without-mapping. The run is 4x faster on the data-without-mapping, which means the more columns a parquet file has, the slower it is, even when only a specific column is needed. Does anyone have an explanation for this? I was expecting both of them to finish in approximately the same time. Best Regards, Jerry
Re: Spark build/sbt assembly
bq. on one node it works but on the other it gives me the above error. Can you tell us the difference between the environments on the two nodes ? Does the other node use Java 8 ? Cheers On Mon, Jul 27, 2015 at 11:38 AM, Rahul Palamuttam rahulpala...@gmail.com wrote: Hi All, I hope this is the right place to post troubleshooting questions. I've been following the install instructions and I get the following error when running the following from Spark home directory $./build/sbt Using /usr/java/jdk1.8.0_20/ as default JAVA_HOME. Note, this will be overridden by -java-home if it is set. Attempting to fetch sbt Launching sbt from build/sbt-launch-0.13.7.jar Error: Invalid or corrupt jarfile build/sbt-launch-0.13.7.jar However when I run sbt assembly it compiles, with a couple of warnings, but it works none-the less. Is the build/sbt script deprecated? I do notice on one node it works but on the other it gives me the above error. Thanks, Rahul P -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-build-sbt-assembly-tp24012.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark build/sbt assembly
So just to clarify, I have 4 nodes, all of which use Java 8. Only one of them is able to successfully execute the build/sbt assembly command. However on the 3 others I get the error. If I run sbt assembly in Spark Home, it works and I'm able to launch the master and worker processes. On Mon, Jul 27, 2015 at 11:48 AM, Rahul Palamuttam rahulpala...@gmail.com wrote: All nodes are using java 8. I've tried to mimic the environments as much as possible among all nodes. On Mon, Jul 27, 2015 at 11:44 AM, Ted Yu yuzhih...@gmail.com wrote: bq. on one node it works but on the other it gives me the above error. Can you tell us the difference between the environments on the two nodes ? Does the other node use Java 8 ? Cheers On Mon, Jul 27, 2015 at 11:38 AM, Rahul Palamuttam rahulpala...@gmail.com wrote: Hi All, I hope this is the right place to post troubleshooting questions. I've been following the install instructions and I get the following error when running the following from Spark home directory $./build/sbt Using /usr/java/jdk1.8.0_20/ as default JAVA_HOME. Note, this will be overridden by -java-home if it is set. Attempting to fetch sbt Launching sbt from build/sbt-launch-0.13.7.jar Error: Invalid or corrupt jarfile build/sbt-launch-0.13.7.jar However when I run sbt assembly it compiles, with a couple of warnings, but it works none-the less. Is the build/sbt script deprecated? I do notice on one node it works but on the other it gives me the above error. Thanks, Rahul P -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-build-sbt-assembly-tp24012.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark build/sbt assembly
All nodes are using java 8. I've tried to mimic the environments as much as possible among all nodes. On Mon, Jul 27, 2015 at 11:44 AM, Ted Yu yuzhih...@gmail.com wrote: bq. on one node it works but on the other it gives me the above error. Can you tell us the difference between the environments on the two nodes ? Does the other node use Java 8 ? Cheers On Mon, Jul 27, 2015 at 11:38 AM, Rahul Palamuttam rahulpala...@gmail.com wrote: Hi All, I hope this is the right place to post troubleshooting questions. I've been following the install instructions and I get the following error when running the following from Spark home directory $./build/sbt Using /usr/java/jdk1.8.0_20/ as default JAVA_HOME. Note, this will be overridden by -java-home if it is set. Attempting to fetch sbt Launching sbt from build/sbt-launch-0.13.7.jar Error: Invalid or corrupt jarfile build/sbt-launch-0.13.7.jar However when I run sbt assembly it compiles, with a couple of warnings, but it works none-the less. Is the build/sbt script deprecated? I do notice on one node it works but on the other it gives me the above error. Thanks, Rahul P -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-build-sbt-assembly-tp24012.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: java.lang.NoSuchMethodError for list.toMap.
Hi, Akhil, Yes, in the build.sbt I wrongly set it to the Scala version 2.11.6 installed on the cluster; fixed now. Thanks! Cheers, Dan 2015-07-27 2:29 GMT-05:00 Akhil Das ak...@sigmoidanalytics.com: What's in your build.sbt? It seems you could be mixing up the Scala version. Thanks Best Regards On Fri, Jul 24, 2015 at 2:15 AM, Dan Dong dongda...@gmail.com wrote: Hi, When I ran the following simple Spark program with spark-submit:

import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import org.apache.spark._
import SparkContext._

object TEST2 {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("TEST")
    val sc = new SparkContext(conf)
    val list = List(("aa", 1), ("bb", 2), ("cc", 3))
    val maps = list.toMap
  }
}

I got a java.lang.NoSuchMethodError for the line val maps = list.toMap. But in spark-shell, or in plain scala, there is no problem:

scala> val list = List(("aa", 1), ("bb", 2), ("cc", 3))
list: List[(String, Int)] = List((aa,1), (bb,2), (cc,3))
scala> val maps = list.toMap
maps: scala.collection.immutable.Map[String,Int] = Map(aa -> 1, bb -> 2, cc -> 3)

So to use the toMap method, what am I missing in spark-submit? I use sbt package to compile the program, without problems. Thanks! Cheers, Dan
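For anyone hitting the same NoSuchMethodError: the stock pre-built Spark 1.x binaries are compiled against Scala 2.10, so the application's build.sbt has to use a matching Scala version. A minimal build.sbt sketch (version numbers are illustrative; adjust them to your cluster):

name := "test2"

version := "0.1"

scalaVersion := "2.10.4"  // must match the Scala major version Spark itself was built with

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.1" % "provided"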
CPU Parallelization not being used (local mode)
Hi all, I would like some insight. I am currently processing huge databases and playing with monitoring and tuning. When monitoring the multiple cores I have, I see that even when RDDs are parallelized, computation on the RDD jumps from core to core sporadically (I guess depending on where the chunk is). So I see one core at 100% usage and the other ones sitting idle; after some time, when the task is complete, the processing jumps to another core, and so on. Can you share any general insight into this situation? Does it depend on the computation? I have tried serialization and different setups, but I never see more than one core working per spark-submit run. Note: this is not cluster mode, just local processors. Thanks, Saif
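One thing worth checking (an assumption about the setup, since the master URL isn't shown): a master of plain "local" runs a single task thread, while "local[N]" or "local[*]" can use more cores, and there also have to be enough partitions to keep them busy. A minimal sketch:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("LocalParallelism")
  .setMaster("local[*]")  // plain "local" would run only one task thread

val sc = new SparkContext(conf)

// 16 partitions -> up to 16 tasks can run concurrently on local cores.
val rdd = sc.parallelize(1 to 1000000, 16)
println(rdd.map(_.toLong * 2).reduce(_ + _))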
Spark build/sbt assembly
Hi All, I hope this is the right place to post troubleshooting questions. I've been following the install instructions and I get the following error when running the following from Spark home directory $./build/sbt Using /usr/java/jdk1.8.0_20/ as default JAVA_HOME. Note, this will be overridden by -java-home if it is set. Attempting to fetch sbt Launching sbt from build/sbt-launch-0.13.7.jar Error: Invalid or corrupt jarfile build/sbt-launch-0.13.7.jar However when I run sbt assembly it compiles, with a couple of warnings, but it works none-the less. Is the build/sbt script deprecated? I do notice on one node it works but on the other it gives me the above error. Thanks, Rahul P -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-build-sbt-assembly-tp24012.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Data from PostgreSQL to Spark
Thanks for your reply. In parallel I will be hitting around 6000 calls to PostgreSQL, which is not good; my database will die. These calls to the database will keep on increasing. Handling millions of requests is not an issue with HBase/NoSQL. Is there any other alternative? On 27 July 2015 at 23:18, felixcheun...@hotmail.com wrote: You can have Spark reading from PostgreSQL through the data access API. Do you have any concern with that approach since you mention copying that data into HBase. From: Jeetendra Gangele Sent: Monday, July 27, 6:00 AM Subject: Data from PostgreSQL to Spark To: user Hi All, I have a use case where I am consuming events from RabbitMQ using Spark Streaming. Each event has some fields on which I want to query PostgreSQL, bring back the matching data, join it with the event data, and put the aggregated data into HDFS so that I can run analytics queries over it using Spark SQL. My question: the PostgreSQL data is production data, so I don't want to hit it too many times. At any given second I may have 3000 events, which means I would need to fire 3000 parallel queries against PostgreSQL, and this load keeps growing, so my database will go down. I can't migrate the PostgreSQL data since lots of systems use it, but I could take this data to some NoSQL store such as HBase and query HBase instead; the issue there is, how can I make sure that HBase has up-to-date data? Can anyone suggest the best approach/method to handle this case? Regards, Jeetendra
Re: Data from PostgreSQL to Spark
You can open the DB connection once per partition. Please have a look at the design patterns for the foreachRDD construct in the documentation (a rough sketch follows this message). How big is your data in the DB? How often does that data change? You would be better off if the data were already in Spark. On 28 Jul 2015 04:48, Jeetendra Gangele gangele...@gmail.com wrote: Thanks for your reply. In parallel I will be hitting around 6000 calls to PostgreSQL, which is not good; my database will die. These calls to the database will keep on increasing. Handling millions of requests is not an issue with HBase/NoSQL. Is there any other alternative? On 27 July 2015 at 23:18, felixcheun...@hotmail.com wrote: You can have Spark reading from PostgreSQL through the data access API. Do you have any concern with that approach since you mention copying that data into HBase. From: Jeetendra Gangele Sent: Monday, July 27, 6:00 AM Subject: Data from PostgreSQL to Spark To: user Hi All, I have a use case where I am consuming events from RabbitMQ using Spark Streaming. Each event has some fields on which I want to query PostgreSQL, bring back the matching data, join it with the event data, and put the aggregated data into HDFS so that I can run analytics queries over it using Spark SQL. My question: the PostgreSQL data is production data, so I don't want to hit it too many times. At any given second I may have 3000 events, which means I would need to fire 3000 parallel queries against PostgreSQL, and this load keeps growing, so my database will go down. I can't migrate the PostgreSQL data since lots of systems use it, but I could take this data to some NoSQL store such as HBase and query HBase instead; the issue there is, how can I make sure that HBase has up-to-date data? Can anyone suggest the best approach/method to handle this case? Regards, Jeetendra
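The pattern being referred to (see the "Design Patterns for using foreachRDD" section of the Spark Streaming guide) is sketched below. The JDBC URL, the query, and the name of the input DStream are placeholders; the point is simply that one connection serves a whole partition rather than one connection per record:

import java.sql.DriverManager

// eventStream is assumed to be a DStream[String] of event keys from RabbitMQ.
eventStream.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    // One connection (and one prepared statement) per partition, not per event.
    val conn = DriverManager.getConnection("jdbc:postgresql://dbhost:5432/mydb", "dbuser", "dbpass")
    try {
      val stmt = conn.prepareStatement("SELECT payload FROM lookup WHERE id = ?")
      records.foreach { id =>
        stmt.setString(1, id)
        val rs = stmt.executeQuery()
        // ... enrich the event with the result and collect it for writing to HDFS ...
        rs.close()
      }
      stmt.close()
    } finally {
      conn.close()
    }
  }
}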
Re: Data from PostgreSQL to Spark
I can't migrate this PostgreSQL data since lots of system using it,but I can take this data to some NOSQL like base and query the Hbase, but here issue is How can I make sure that Hbase has upto date data? Is velocity an issue in Postgres that your data would become stale as soon as it reaches Big data cluster? If your concern is that Datastore (hbase etc) is not current in Big Data cluster, can the source write to other stores (like Kafka/Hbase etc/Flume) as well when it writes to Postgres? Sent from Windows Mail From: santosh...@gmail.com Sent: Monday, July 27, 2015 5:22 PM To: ayan guha, Jeetendra Gangele Cc: felixcheun...@hotmail.com, user@spark.apache.org Why cant you bulk pre-fetch the data to HDFS (like using Sqoop) instead of hitting Postgres multiple times? Sent from Windows Mail From: ayan guha Sent: Monday, July 27, 2015 4:41 PM To: Jeetendra Gangele Cc: felixcheun...@hotmail.com, user@spark.apache.org You can call dB connect once per partition. Please have a look at design patterns of for each construct in document. How big is your data in dB? How soon that data changes? You would be better off if data is in spark already On 28 Jul 2015 04:48, Jeetendra Gangele gangele...@gmail.com wrote: Thanks for your reply. Parallel i will be hitting around 6000 call to postgreSQl which is not good my database will die. these calls to database will keeps on increasing. Handling millions on request is not an issue with Hbase/NOSQL any other alternative? On 27 July 2015 at 23:18, felixcheun...@hotmail.com wrote: You can have Spark reading from PostgreSQL through the data access API. Do you have any concern with that approach since you mention copying that data into HBase. From: Jeetendra Gangele Sent: Monday, July 27, 6:00 AM Subject: Data from PostgreSQL to Spark To: user Hi All I have a use case where where I am consuming the Events from RabbitMQ using spark streaming.This event has some fields on which I want to query the PostgreSQL and bring the data and then do the join between event data and PostgreSQl data and put the aggregated data into HDFS, so that I run run analytics query over this data using SparkSQL. my question is PostgreSQL data in production data so i don't want to hit so many times. at any given 1 seconds time I may have 3000 events,that means I need to fire 3000 parallel query to my PostGreSQl and this data keeps on growing, so my database will go down. I can't migrate this PostgreSQL data since lots of system using it,but I can take this data to some NOSQL like base and query the Hbase, but here issue is How can I make sure that Hbase has upto date data? Any anyone suggest me best approach/ method to handle this case? Regards Jeetendra
pyspark issue
Hi, I am running pyspark on Windows and I am seeing an error while adding pyFiles to the SparkContext. Below is the example:

sc = SparkContext("local", "Sample", pyFiles="C:/sample/yattag.zip")

This fails with a "no file found" error for "C". The logic below is treating the path as individual files like "C", ":", "/", etc.: https://github.com/apache/spark/blob/master/python/pyspark/context.py#L195 It works if I use SparkConf:

sparkConf.set("spark.submit.pyFiles", "C:/sample/yattag.zip")
sc = SparkContext("local", "Sample", conf=sparkConf)

Is this an existing issue, or am I not including the files in the correct way in the SparkContext? Thanks.
Re: PYSPARK_DRIVER_PYTHON=ipython spark/bin/pyspark Does not create SparkContext
Thank you so much. I found the issue. My fault, the stock ipython version 0.12.1 is too old, which does not support PYTHONSTARTUP. Upgrading ipython solved the issue. On Mon, Jul 27, 2015 at 12:43 PM, felixcheun...@hotmail.com wrote: Hmm, it should work with you run `PYSPARK_DRIVER_PYTHON=ipython spark/bin/pyspark` PYTHONSTARTUP is a PYTHON environment variable: https://docs.python.org/2/using/cmdline.html#envvar-PYTHONSTARTUP On Sun, Jul 26, 2015 at 4:06 PM -0700, Zerony Zhao bw.li...@gmail.com wrote: Hello everyone, I have a newbie question. $SPARK_HOME/bin/pyspark will create SparkContext automatically. Welcome to __ / __/__ ___ _/ /__ _\ \/ _ \/ _ `/ __/ '_/ /__ / .__/\_,_/_/ /_/\_\ version 1.4.1 /_/ Using Python version 2.7.3 (default, Jun 22 2015 19:33:41) SparkContext available as sc, HiveContext available as sqlContext. But When using ipython as a driver, PYSPARK_DRIVER_PYTHON=ipython spark/bin/pyspark , does not create SparkContext automatically. I have to execute execfile('spark_home/python/pyspark/shell.py') is it by design? I read the bash script bin/pyspark, I noticed the line: export PYTHONSTARTUP=$SPARK_HOME/python/pyspark/shell.py But I searched the whole spark source code, the variable PYTHONSTARTUP is never used, I could not understand when PYTHONSTARTUP is executed. Thank you.
Re: pyspark issue
It expects an iterable, and if you iterate over a string, you get the individual characters. Use a list instead: pyfiles=['/path/to/file'] On Mon, Jul 27, 2015 at 2:40 PM, Naveen Madhire vmadh...@umail.iu.edu wrote: Hi, I am running pyspark in windows and I am seeing an error while adding pyfiles to the sparkcontext. below is the example, sc = SparkContext(local,Sample,pyFiles=C:/sample/yattag.zip) this fails with no file found error for C The below logic is treating the path as individual files like C, : / etc. https://github.com/apache/spark/blob/master/python/pyspark/context.py#l195 It works if I use Spark Conf, sparkConf.set(spark.submit.pyFiles,***C:/sample/yattag.zip***) sc = SparkContext(local,Sample,conf=sparkConf) Is this an existing issue or I am not including the files in correct way in Spark Context? Thanks. when I run this, I am getting -- www.skrasser.com http://www.skrasser.com/?utm_source=sig
Do I really need to build Spark for Hive/Thrift Server support?
I'm a bit confused about the documentation in the area of Hive support. I want to use a remote Hive metastore/HDFS server, and the documentation says that we need to build Spark from source due to the large number of dependencies Hive requires. Specifically, the documentation says: "Hive has a large number of dependencies, it is not included in the default Spark assembly ... This command builds a new assembly jar that includes Hive." So I downloaded the source distribution of Spark 1.4.1 and executed the following build command: ./make-distribution.sh --name spark-1.4.1-hadoop-2.6-hive --tgz -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver -DskipTests Inspecting the size of the resulting spark-assembly-1.4.1-hadoop2.6.0.jar, it is only a few bytes different: the pre-built jar is 162976273 bytes and my custom-built jar is 162976444 bytes. I don't see any new Hive jar file either. Can someone please help me understand what is going on here? Cheers, Reece -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Do-I-really-need-to-build-Spark-for-Hive-Thrift-Server-support-tp24013.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spree: a live-updating web UI for Spark
Probably relevant to people on this list: on Friday I released a clone of the Spark web UI built using Meteor https://www.meteor.com/ so that everything updates in real-time, saving you from endlessly refreshing the page while jobs are running :) It can also serve as the UI for running as well as completed applications, so you don't have to mess with the separate history-server process if you don't want to. *This blog post* http://www.hammerlab.org/2015/07/25/spree-58-a-live-updating-web-ui-for-spark/ and *the github repo* https://github.com/hammerlab/spree have lots of information on how to use it. It has two sub-components, JsonRelay https://github.com/hammerlab/spark-json-relay and Slim https://github.com/hammerlab/slim; the former sends SparkListenerEvent https://github.com/apache/spark/blob/v1.4.1/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala#L33s out of Spark via a network socket, while the latter receives those events and writes various stats about them to Mongo (like an external JobProgressListener https://github.com/apache/spark/blob/v1.4.1/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala that persists info to a database). You might find them to offer a better way of storing information about running and completed Spark applications than the event log files that Spark uses, and they can be used with or without the real-time web UI. Give them a try if they sound useful to you, and let me know if you have questions or comments! -Ryan
Controlling output fileSize in SparkSQL
Hi, I am using Spark 1.3 (CDH 5.4.4). What's the recipe for setting a minimum output file size when writing out from SparkSQL? So far, I have tried:

import sqlContext.implicits._
sc.hadoopConfiguration.setBoolean("fs.hdfs.impl.disable.cache", true)
sc.hadoopConfiguration.setLong("fs.local.block.size", 1073741824)
sc.hadoopConfiguration.setLong("dfs.blocksize", 1073741824)
sqlContext.sql("SET spark.sql.shuffle.partitions=2")
val df = sqlContext.jsonFile("hdfs://nameservice1/user/joe/samplejson/*")
df.saveAsParquetFile("hdfs://nameservice1/user/joe/data/reduceFiles-Parquet")

But my output still isn't aggregated into 1+GB files. Thanks, - Siddhartha
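The block-size settings above generally don't change how many files come out; each partition of the DataFrame becomes one output file, so the usual approach is to reduce the partition count just before the save. A rough sketch for Spark 1.3 (2 partitions is just an example target):

val df = sqlContext.jsonFile("hdfs://nameservice1/user/joe/samplejson/*")

// Fewer partitions -> fewer, larger part-files in the Parquet output.
df.repartition(2).saveAsParquetFile("hdfs://nameservice1/user/joe/data/reduceFiles-Parquet")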
RE: SparkR
Simply, no. Currently SparkR is the R API for Spark DataFrames; no existing R algorithms can benefit from it unless they are rewritten on top of that API. There is ongoing development on supporting MLlib and ML Pipelines in SparkR: https://issues.apache.org/jira/browse/SPARK-6805 From: Mohit Anchlia [mailto:mohitanch...@gmail.com] Sent: Tuesday, July 28, 2015 1:08 AM To: user@spark.apache.org Subject: SparkR Does SparkR support all the algorithms that the R libraries support?
Weird error using absolute path to run pyspark when using ipython driver
Hello everyone, Another newbie question. PYSPARK_DRIVER_PYTHON=ipython ./bin/pyspark runs fine, (in $SPARK_HOME) Python 2.7.10 (default, Jul 3 2015, 01:26:20) Type copyright, credits or license for more information. IPython 3.2.1 -- An enhanced Interactive Python. ? - Introduction and overview of IPython's features. %quickref - Quick reference. help - Python's own help system. object? - Details about 'object', use 'object??' for extra details. 15/07/27 17:16:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Welcome to __ / __/__ ___ _/ /__ _\ \/ _ \/ _ `/ __/ '_/ /__ / .__/\_,_/_/ /_/\_\ version 1.4.1 /_/ Using Python version 2.7.10 (default, Jul 3 2015 01:26:20) SparkContext available as sc, HiveContext available as sqlContext. But PYSPARK_DRIVER_PYTHON=ipython ./spark-1.4.1-bin-hadoop2.6/bin/pyspark will throw errors: Traceback (most recent call last): File /usr/local/bin/ipython, line 7, in module from IPython import start_ipython File /usr/local/lib/python2.7/site-packages/IPython/__init__.py, line 45, in module from .config.loader import Config File /usr/local/lib/python2.7/site-packages/IPython/config/__init__.py, line 6, in module from .application import * File /usr/local/lib/python2.7/site-packages/IPython/config/application.py, line 19, in module from IPython.config.configurable import SingletonConfigurable File /usr/local/lib/python2.7/site-packages/IPython/config/configurable.py, line 12, in module from .loader import Config, LazyConfigValue File /usr/local/lib/python2.7/site-packages/IPython/config/loader.py, line 14, in module from ast import literal_eval ImportError: cannot import name literal_eval Note: running `from ast import literal_eval` within ipython is successful. The only different is that I run the command in the SPARK_HOME directory or not. What causes the problem? Or something is wrong with the compiled python and ipython? Thank you very much.
Re: Why the length of each task varies
Hi. Have you ruled out that this may just be I/O time? Word count is a very lightweight task for the CPU, but you will need to read the initial data from whatever storage device you have your HDFS running on. As you have 3 machines with 22 cores each, but perhaps just one or a few HDDs / SSDs / NAS devices, the 22 cores may be saturating your I/O capacity, and thus I/O determines the running time of your task. If it is some form of NAS storage, you may be saturating the network capacity. If this is the case, that would explain fluctuations in the observed running times: a given map task may have been lucky, and the data was read when the I/O was not busy, or unlucky, with many machine cores (map tasks) starting a new block at about the same time. Also, 22 * 256 MB = 5632 MB: this is the RAM you need to cache a block of data for each map task running in parallel on the same machine. Depending on how much RAM you have per node, you may want to re-block the data on HDFS for optimal performance. Hope this helps, Gylfi. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Why-the-length-of-each-task-varies-tp24008p24014.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark SQL Error
Hello all, I am currently having an error with Spark SQL access Elasticsearch using Elasticsearch Spark integration. Below is the series of command I issued along with the stacktrace. I am unclear what the error could mean. I can print the schema correctly but error out if i try and display a few results. Can you guys point me in the right direction? scala sqlContext.read.format(org.elasticsearch.spark.sql).options(esOptions).load(reddit_comment_public-201507-v3/default).registerTempTable(reddit_comment) scala reddit_comment_df.printSchema root |-- data: struct (nullable = true) ||-- archived: boolean (nullable = true) ||-- author: string (nullable = true) ||-- author_flair_css_class: string (nullable = true) ||-- author_flair_text: string (nullable = true) ||-- body: string (nullable = true) ||-- body_html: string (nullable = true) ||-- controversiality: long (nullable = true) ||-- created: long (nullable = true) ||-- created_utc: long (nullable = true) ||-- distinguished: string (nullable = true) ||-- downs: long (nullable = true) ||-- edited: long (nullable = true) ||-- gilded: long (nullable = true) ||-- id: string (nullable = true) ||-- link_author: string (nullable = true) ||-- link_id: string (nullable = true) ||-- link_title: string (nullable = true) ||-- link_url: string (nullable = true) ||-- name: string (nullable = true) ||-- parent_id: string (nullable = true) ||-- replies: string (nullable = true) ||-- saved: boolean (nullable = true) ||-- score: long (nullable = true) ||-- score_hidden: boolean (nullable = true) ||-- subreddit: string (nullable = true) ||-- subreddit_id: string (nullable = true) ||-- ups: long (nullable = true) scala reddit_comment_df.show 15/07/27 20:38:31 INFO ScalaEsRowRDD: Reading from [reddit_comment_public-201507-v3/default] 15/07/27 20:38:31 INFO ScalaEsRowRDD: Discovered mapping {reddit_comment_public-201507-v3=[mappings=[default=[acquire_date=DATE, elasticsearch_date_partition_index=STRING, elasticsearch_language_partition_index=STRING, elasticsearch_type=STRING, source=[data=[archived=BOOLEAN, author=STRING, author_flair_css_class=STRING, author_flair_text=STRING, body=STRING, body_html=STRING, controversiality=LONG, created=LONG, created_utc=LONG, distinguished=STRING, downs=LONG, edited=LONG, gilded=LONG, id=STRING, link_author=STRING, link_id=STRING, link_title=STRING, link_url=STRING, name=STRING, parent_id=STRING, replies=STRING, saved=BOOLEAN, score=LONG, score_hidden=BOOLEAN, subreddit=STRING, subreddit_id=STRING, ups=LONG], kind=STRING], source_geo_location=GEO_POINT, source_id=STRING, source_language=STRING, source_time=DATE]]]} for [reddit_comment_public-201507-v3/default] 15/07/27 20:38:31 INFO SparkContext: Starting job: show at console:26 15/07/27 20:38:31 INFO DAGScheduler: Got job 13 (show at console:26) with 1 output partitions (allowLocal=false) 15/07/27 20:38:31 INFO DAGScheduler: Final stage: ResultStage 16(show at console:26) 15/07/27 20:38:31 INFO DAGScheduler: Parents of final stage: List() 15/07/27 20:38:31 INFO DAGScheduler: Missing parents: List() 15/07/27 20:38:31 INFO DAGScheduler: Submitting ResultStage 16 (MapPartitionsRDD[65] at show at console:26), which has no missing parents 15/07/27 20:38:31 INFO MemoryStore: ensureFreeSpace(7520) called with curMem=71364, maxMem=2778778828 15/07/27 20:38:31 INFO MemoryStore: Block broadcast_13 stored as values in memory (estimated size 7.3 KB, free 2.6 GB) 15/07/27 20:38:31 INFO MemoryStore: ensureFreeSpace(3804) called with curMem=78884, maxMem=2778778828 15/07/27 20:38:31 INFO 
MemoryStore: Block broadcast_13_piece0 stored as bytes in memory (estimated size 3.7 KB, free 2.6 GB) 15/07/27 20:38:31 INFO BlockManagerInfo: Added broadcast_13_piece0 in memory on 172.25.185.239:58296 (size: 3.7 KB, free: 2.6 GB) 15/07/27 20:38:31 INFO SparkContext: Created broadcast 13 from broadcast at DAGScheduler.scala:874 15/07/27 20:38:31 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 16 (MapPartitionsRDD[65] at show at console:26) 15/07/27 20:38:31 INFO TaskSchedulerImpl: Adding task set 16.0 with 1 tasks 15/07/27 20:38:31 INFO FairSchedulableBuilder: Added task set TaskSet_16 tasks to pool default 15/07/27 20:38:31 INFO TaskSetManager: Starting task 0.0 in stage 16.0 (TID 172, 172.25.185.164, ANY, 5085 bytes) 15/07/27 20:38:31 INFO BlockManagerInfo: Added broadcast_13_piece0 in memory on 172.25.185.164:50275 (size: 3.7 KB, free: 3.6 GB) 15/07/27 20:38:31 WARN TaskSetManager: Lost task 0.0 in stage 16.0 (TID 172, 172.25.185.164): java.lang.ArrayIndexOutOfBoundsException: -1 at scala.collection.mutable.ResizableArray$class.update(ResizableArray.scala:49) at scala.collection.mutable.ArrayBuffer.update(ArrayBuffer.scala:47) at
Json parsing library for Spark Streaming?
Hi, What is the proper JSON parsing library to use in Spark Streaming? Currently I am trying to use the Gson library in a Java class and calling the Java method from a Scala class as shown below. What are the advantages of using Json4s as opposed to using the Gson library in a Java class and calling it from Scala?

val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
  .map { case (x, y) =>
    (x.toString, Utils.toJsonObject(y.toString).get("request").getAsJsonObject().get("queryString").toString)
  }

Thanks, Swetha -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Json-parsing-library-for-Spark-Streaming-tp24016.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Json parsing library for Spark Streaming?
json4s is used by https://github.com/hammerlab/spark-json-relay See the other thread on 'Spree' FYI On Mon, Jul 27, 2015 at 6:07 PM, swetha swethakasire...@gmail.com wrote: Hi, What is the proper Json parsing library to use in Spark Streaming? Currently I am trying to use Gson library in a Java class and calling the Java method from a Scala class as shown below: What are the advantages of using Json4S as against using Gson library in a Java class and calling it from Scala? val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap). map{case (x, y) = ((x.toString, Utils.toJsonObject(y.toString).get(request).getAsJsonObject().get(queryString).toString))} Thanks, Swetha -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Json-parsing-library-for-Spark-Streaming-tp24016.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
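For comparison, the equivalent extraction with json4s might look roughly like the sketch below, assuming the same request.queryString shape as the Gson snippet in the question, and reusing the ssc/zkQuorum/group/topicMap variables from that snippet:

import org.apache.spark.streaming.kafka.KafkaUtils
import org.json4s._
import org.json4s.jackson.JsonMethods._

// Returns the JSON text of request.queryString, mirroring .toString on the Gson element.
def queryString(json: String): String =
  compact(render(parse(json) \ "request" \ "queryString"))

val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
  .map { case (x, y) => (x.toString, queryString(y.toString)) }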
Re: use S3-Compatible Storage with spark
No with s3a, I have the following error : java.lang.NoSuchMethodError: com.amazonaws.services.s3.transfer.TransferManagerConfiguration.setMultipartUploadThreshold(I)V at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:285) 2015-07-27 11:17 GMT+02:00 Akhil Das ak...@sigmoidanalytics.com: So you are able to access your AWS S3 with s3a now? What is the error that you are getting when you try to access the custom storage with fs.s3a.endpoint? Thanks Best Regards On Mon, Jul 27, 2015 at 2:44 PM, Schmirr Wurst schmirrwu...@gmail.com wrote: I was able to access Amazon S3, but for some reason, the Endpoint parameter is ignored, and I'm not able to access to storage from my provider... : sc.hadoopConfiguration.set(fs.s3a.endpoint,test) sc.hadoopConfiguration.set(fs.s3a.awsAccessKeyId,) sc.hadoopConfiguration.set(fs.s3a.awsSecretAccessKey,) Any Idea why it doesn't work ? 2015-07-20 18:11 GMT+02:00 Schmirr Wurst schmirrwu...@gmail.com: Thanks, that is what I was looking for... Any Idea where I have to store and reference the corresponding hadoop-aws-2.6.0.jar ?: java.io.IOException: No FileSystem for scheme: s3n 2015-07-20 8:33 GMT+02:00 Akhil Das ak...@sigmoidanalytics.com: Not in the uri, but in the hadoop configuration you can specify it. property namefs.s3a.endpoint/name descriptionAWS S3 endpoint to connect to. An up-to-date list is provided in the AWS Documentation: regions and endpoints. Without this property, the standard region (s3.amazonaws.com) is assumed. /description /property Thanks Best Regards On Sun, Jul 19, 2015 at 9:13 PM, Schmirr Wurst schmirrwu...@gmail.com wrote: I want to use pithos, were do I can specify that endpoint, is it possible in the url ? 2015-07-19 17:22 GMT+02:00 Akhil Das ak...@sigmoidanalytics.com: Could you name the Storage service that you are using? Most of them provides a S3 like RestAPI endpoint for you to hit. Thanks Best Regards On Fri, Jul 17, 2015 at 2:06 PM, Schmirr Wurst schmirrwu...@gmail.com wrote: Hi, I wonder how to use S3 compatible Storage in Spark ? If I'm using s3n:// url schema, the it will point to amazon, is there a way I can specify the host somewhere ? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: use S3-Compatible Storage with spark
That error is a jar conflict, you must be having multiple versions of hadoop jar in the classpath. First you make sure you are able to access your AWS S3 with s3a, then you give the endpoint configuration and try to access the custom storage. Thanks Best Regards On Mon, Jul 27, 2015 at 4:02 PM, Schmirr Wurst schmirrwu...@gmail.com wrote: No with s3a, I have the following error : java.lang.NoSuchMethodError: com.amazonaws.services.s3.transfer.TransferManagerConfiguration.setMultipartUploadThreshold(I)V at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:285) 2015-07-27 11:17 GMT+02:00 Akhil Das ak...@sigmoidanalytics.com: So you are able to access your AWS S3 with s3a now? What is the error that you are getting when you try to access the custom storage with fs.s3a.endpoint? Thanks Best Regards On Mon, Jul 27, 2015 at 2:44 PM, Schmirr Wurst schmirrwu...@gmail.com wrote: I was able to access Amazon S3, but for some reason, the Endpoint parameter is ignored, and I'm not able to access to storage from my provider... : sc.hadoopConfiguration.set(fs.s3a.endpoint,test) sc.hadoopConfiguration.set(fs.s3a.awsAccessKeyId,) sc.hadoopConfiguration.set(fs.s3a.awsSecretAccessKey,) Any Idea why it doesn't work ? 2015-07-20 18:11 GMT+02:00 Schmirr Wurst schmirrwu...@gmail.com: Thanks, that is what I was looking for... Any Idea where I have to store and reference the corresponding hadoop-aws-2.6.0.jar ?: java.io.IOException: No FileSystem for scheme: s3n 2015-07-20 8:33 GMT+02:00 Akhil Das ak...@sigmoidanalytics.com: Not in the uri, but in the hadoop configuration you can specify it. property namefs.s3a.endpoint/name descriptionAWS S3 endpoint to connect to. An up-to-date list is provided in the AWS Documentation: regions and endpoints. Without this property, the standard region (s3.amazonaws.com) is assumed. /description /property Thanks Best Regards On Sun, Jul 19, 2015 at 9:13 PM, Schmirr Wurst schmirrwu...@gmail.com wrote: I want to use pithos, were do I can specify that endpoint, is it possible in the url ? 2015-07-19 17:22 GMT+02:00 Akhil Das ak...@sigmoidanalytics.com: Could you name the Storage service that you are using? Most of them provides a S3 like RestAPI endpoint for you to hit. Thanks Best Regards On Fri, Jul 17, 2015 at 2:06 PM, Schmirr Wurst schmirrwu...@gmail.com wrote: Hi, I wonder how to use S3 compatible Storage in Spark ? If I'm using s3n:// url schema, the it will point to amazon, is there a way I can specify the host somewhere ? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
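Once the jar conflict is sorted out, note that the endpoint and credential keys for s3a differ from the s3n-style names used earlier in this thread. To the best of my knowledge the Hadoop 2.6 property names are the ones below (worth double-checking against your hadoop-aws version, and hadoop-aws 2.6.0 is normally paired with aws-java-sdk 1.7.4); the endpoint, keys, and bucket are placeholders:

sc.hadoopConfiguration.set("fs.s3a.endpoint", "storage.example-provider.com")
sc.hadoopConfiguration.set("fs.s3a.access.key", "YOUR_ACCESS_KEY")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "YOUR_SECRET_KEY")

val rdd = sc.textFile("s3a://my-bucket/some/path/*")
println(rdd.count())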
Re: GenericRowWithSchema is too heavy
Internally I believe we only actually create one schema (StructType) object, which all rows of a DataFrame share, so in most use cases you are really only paying the cost of a pointer per row (as shown below).

scala> val df = Seq((1,2), (3,4)).toDF("a", "b")
df: org.apache.spark.sql.DataFrame = [a: int, b: int]

scala> df.collect()
res1: Array[org.apache.spark.sql.Row] = Array([1,2], [3,4])

scala> res1(0).schema eq res1(1).schema
res3: Boolean = true

I'd strongly suggest that you use something like parquet (https://spark.apache.org/docs/latest/sql-programming-guide.html#parquet-files) or avro (http://spark-packages.org/package/databricks/spark-avro) to store DataFrames, as it is likely much more space efficient and faster than generic serialization. Michael

On Mon, Jul 27, 2015 at 9:02 PM, Kevin Jung itsjb.j...@samsung.com wrote: Hi all, SparkSQL usually creates DataFrames with GenericRowWithSchema (is that right?). 'Row' is a superclass of both GenericRow and GenericRowWithSchema; the only difference is that GenericRowWithSchema also carries its schema as a StructType. But one DataFrame has only one schema, so each row should not have to store the schema in it, because StructType is very heavy and most RDDs have many rows. To test this, 1) create a DataFrame and call rdd ( RDD[Row] ) => GenericRowWithSchema 2) dataframe.map( row => Row(row.toSeq)) => GenericRow 3) dataframe.map( row => row.toSeq) => underlying sequence of a row 4) saveAsObjectFile or use org.apache.spark.util.SizeEstimator.estimate And my result is (dataframe with 5 columns): GenericRowWithSchema = 13 GB, GenericRow = 8.2 GB, Seq = 7 GB. Best regards Kevin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/GenericRowWithSchema-is-too-heavy-tp24018.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
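For reference, a minimal sketch of the Parquet round trip Michael suggests, assuming Spark 1.4's DataFrameWriter (on 1.3 the equivalent calls are saveAsParquetFile and parquetFile); the path is a placeholder:

  import sqlContext.implicits._
  val df = Seq((1, 2), (3, 4)).toDF("a", "b")
  df.write.parquet("/tmp/df.parquet")              // columnar; the schema is stored once in the file footer
  val restored = sqlContext.read.parquet("/tmp/df.parquet")
  restored.collect()

Because the schema lives in the file metadata rather than in every serialized row, this sidesteps the per-row overhead measured in the quoted message, in addition to being faster to scan.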
RE: Package Release Annoucement: Spark SQL on HBase Astro
Hi Yan, Is it possible to access the hbase table through spark sql jdbc layer ? Thanks. Deb On Jul 22, 2015 9:03 PM, Yan Zhou.sc yan.zhou...@huawei.com wrote: Yes, but not all SQL-standard insert variants . *From:* Debasish Das [mailto:debasish.da...@gmail.com] *Sent:* Wednesday, July 22, 2015 7:36 PM *To:* Bing Xiao (Bing) *Cc:* user; dev; Yan Zhou.sc *Subject:* Re: Package Release Annoucement: Spark SQL on HBase Astro Does it also support insert operations ? On Jul 22, 2015 4:53 PM, Bing Xiao (Bing) bing.x...@huawei.com wrote: We are happy to announce the availability of the Spark SQL on HBase 1.0.0 release. http://spark-packages.org/package/Huawei-Spark/Spark-SQL-on-HBase The main features in this package, dubbed “Astro”, include: · Systematic and powerful handling of data pruning and intelligent scan, based on partial evaluation technique · HBase pushdown capabilities like custom filters and coprocessor to support ultra low latency processing · SQL, Data Frame support · More SQL capabilities made possible (Secondary index, bloom filter, Primary Key, Bulk load, Update) · Joins with data from other sources · Python/Java/Scala support · Support latest Spark 1.4.0 release The tests by Huawei team and community contributors covered the areas: bulk load; projection pruning; partition pruning; partial evaluation; code generation; coprocessor; customer filtering; DML; complex filtering on keys and non-keys; Join/union with non-Hbase data; Data Frame; multi-column family test. We will post the test results including performance tests the middle of August. You are very welcomed to try out or deploy the package, and help improve the integration tests with various combinations of the settings, extensive Data Frame tests, complex join/union test and extensive performance tests. Please use the “Issues” “Pull Requests” links at this package homepage, if you want to report bugs, improvement or feature requests. Special thanks to project owner and technical leader Yan Zhou, Huawei global team, community contributors and Databricks. Databricks has been providing great assistance from the design to the release. “Astro”, the Spark SQL on HBase package will be useful for ultra low latency* query and analytics of large scale data sets in vertical enterprises**.* We will continue to work with the community to develop new features and improve code base. Your comments and suggestions are greatly appreciated. Yan Zhou / Bing Xiao Huawei Big Data team
Which directory contains third party libraries for Spark
When using spark-submit: which directory contains third-party libraries that will be loaded on each of the slaves? I would like to scp one or more libraries to each of the slaves instead of shipping the contents in the application uber-jar. Note: I did try adding them to $SPARK_HOME/lib_managed/jars, but spark-submit still results in a ClassNotFoundException for classes included in the added library.
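As far as I know there is no directory that spark-submit picks up automatically for user libraries; the usual options are --jars (which ships the jars for you) or, if the jars have already been scp'd to the same path on every slave, pointing the extra classpath properties at that path. A sketch of the latter, with an illustrative path, in conf/spark-defaults.conf:

  spark.executor.extraClassPath   /opt/extra-libs/*
  spark.driver.extraClassPath     /opt/extra-libs/*

Note that the driver-side property has to be supplied before the driver JVM starts, i.e. via spark-defaults.conf or --conf on the spark-submit command line rather than in code.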
Hive Session gets overwritten in ClientWrapper
I'm currently using Spark 1.4 in standalone mode. I've forked the Apache Hive branch from https://github.com/pwendell/hive and customised it in the following way: I added a thread-local variable in the SessionManager class, and I'm setting the session variable in my custom Authenticator class. For achieving the above, I've built the necessary jars (hive-common-0.13.1c.jar, hive-exec-0.13.1c.jar, hive-metastore-0.13.1c.jar, hive-serde-0.13.1c.jar, hive-service-0.13.1c.jar) from https://github.com/pwendell/hive and added them to Spark's classpath. The above feature works in Spark 1.3.1, but is broken in Spark 1.4. When I looked into it, I found out that the ClientWrapper class is creating a new SessionState and using it thereafter. As a result I'm not able to retrieve the info which I had stored earlier in the session. I'm also not able to retrieve a value from hiveconf which was set earlier. When I looked into the source code for ClientWrapper.scala, I found the following: // Create an internal session state for this ClientWrapper. val state = { val original = Thread.currentThread().getContextClassLoader // Switch to the initClassLoader. Thread.currentThread().setContextClassLoader(initClassLoader) From what I can understand, the above tries to use the existing Hive session, else it creates its own session. Am I right? If so, is there a bug causing the ClientWrapper not to use the existing session? Or should I implement my requirement in a different way? My requirement is to have a custom session variable and use it throughout the session. My usage is as follows: To set the value: SessionManager.setSessionVar(value); To retrieve the value: SessionManager.getSessionVar(); To set a hiveconf value: hiveConf.set(conf, ConfVars.VAR, val); To retrieve: hiveConf.get(ConfVars.VAR); SessionState.get().getConf().getVar(ConfVars.VAR) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Hive-Session-gets-overwritten-in-ClientWrapper-tp24020.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
streaming issue
Hi, I got a error when running spark streaming as below . java.lang.reflect.InvocationTargetException at sun.reflect.GeneratedMethodAccessor13.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144) at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:144) at org.apache.spark.scheduler.EventLoggingListener.onUnpersistRDD(EventLoggingListener.scala:175) at org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:50) at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31) at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31) at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:53) at org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:36) at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:76) at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply(AsynchronousListenerBus.scala:61) at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply(AsynchronousListenerBus.scala:61) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618) at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:60) Caused by: java.io.IOException: All datanodes 10.153.192.159:50010 are bad. Aborting... at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1137) at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:933) at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:487) 15/07/28 02:01:10 ERROR LiveListenerBus: Listener EventLoggingListener threw an exception java.lang.reflect.InvocationTargetException at sun.reflect.GeneratedMethodAccessor13.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144) at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144) at scala.Option.foreach(Option.scala:236) I had set the ulimit in /etc/security/limits.conf , but still get the same exception . can please some body help me to resolved this issue ? core file size (blocks, -c) 0 data seg size (kbytes, -d) unlimited scheduling priority (-e) 0 file size (blocks, -f) unlimited pending signals (-i) 264192 max locked memory (kbytes, -l) 32 max memory size (kbytes, -m) unlimited open files (-n) 65535 pipe size(512 bytes, -p) 8 POSIX message queues (bytes, -q) 819200 real-time priority (-r) 0 stack size (kbytes, -s) 10240 cpu time (seconds, -t) unlimited max user processes (-u) 34816 virtual memory (kbytes, -v) unlimited file locks (-x) unlimited Thanks .
Spark on Mesos - Shut down failed while running spark-shell
Hi all, I am running Spark 1.4.1 on mesos 0.23.0 While I am able to start spark-shell on the node with mesos-master running, it works fine. But when I try to start spark-shell on mesos-slave nodes, I'm encounter this error. I greatly appreciate any help. 15/07/27 22:14:44 INFO Utils: Successfully started service 'SparkUI' on port 4040. 15/07/27 22:14:44 INFO SparkUI: Started SparkUI at http://10.142.0.140:4040 Warning: MESOS_NATIVE_LIBRARY is deprecated, use MESOS_NATIVE_JAVA_LIBRARY instead. Future releases will not support JNI bindings via MESOS_NATIVE_LIBRARY. Warning: MESOS_NATIVE_LIBRARY is deprecated, use MESOS_NATIVE_JAVA_LIBRARY instead. Future releases will not support JNI bindings via MESOS_NATIVE_LIBRARY. WARNING: Logging before InitGoogleLogging() is written to STDERR W0727 22:14:45.091286 33441 sched.cpp:1326] ** Scheduler driver bound to loopback interface! Cannot communicate with remote master(s). You might want to set 'LIBPROCESS_IP' environment variable to use a routable IP address. ** 2015-07-27 22:14:45,091:33222(0x7fff9e1fc700):ZOO_INFO@log_env@712: Client environment:zookeeper.version=zookeeper C client 3.4.5 2015-07-27 22:14:45,091:33222(0x7fff9e1fc700):ZOO_INFO@log_env@716: Client environment:host.name=nid00011 I0727 22:14:45.091995 33441 sched.cpp:157] Version: 0.23.0 2015-07-27 22:14:45,092:33222(0x7fff9e1fc700):ZOO_INFO@log_env@723: Client environment:os.name=Linux 2015-07-27 22:14:45,092:33222(0x7fff9e1fc700):ZOO_INFO@log_env@724: Client environment:os.arch=2.6.32-431.el6_1..8785-cray_ari_athena_c_cos 2015-07-27 22:14:45,092:33222(0x7fff9e1fc700):ZOO_INFO@log_env@725: Client environment:os.version=#1 SMP Wed Jun 24 19:34:50 UTC 2015 2015-07-27 22:14:45,092:33222(0x7fff9e1fc700):ZOO_INFO@log_env@733: Client environment:user.name=root 2015-07-27 22:14:45,092:33222(0x7fff9e1fc700):ZOO_INFO@log_env@741: Client environment:user.home=/root 2015-07-27 22:14:45,092:33222(0x7fff9e1fc700):ZOO_INFO@log_env@753: Client environment:user.dir=/opt/spark-1.4.1/spark-source 2015-07-27 22:14:45,092:33222(0x7fff9e1fc700):ZOO_INFO@zookeeper_init@786: Initiating client connection, host=192.168.0.10:2181 sessionTimeout=1 watcher=0x7fffb561a8e0 sessionId=0 sessionPasswd=nullcontext=0x7ffdd930 flags=0 2015-07-27 22:14:45,092:33222(0x7fff6ebfd700):ZOO_INFO@check_events@1703: initiated connection to server [192.168.0.10:2181] 2015-07-27 22:14:45,096:33222(0x7fff6ebfd700):ZOO_INFO@check_events@1750: session establishment complete on server [192.168.0.10:2181], sessionId=0x14ed296a0fd000a, negotiated timeout=1 I0727 22:14:45.096891 33479 group.cpp:313] Group process (group(1)@ 127.0.0.1:45546) connected to ZooKeeper I0727 22:14:45.096914 33479 group.cpp:787] Syncing group operations: queue size (joins, cancels, datas) = (0, 0, 0) I0727 22:14:45.096923 33479 group.cpp:385] Trying to create path '/mesos' in ZooKeeper I0727 22:14:45.099181 33471 detector.cpp:138] Detected a new leader: (id='4') I0727 22:14:45.099298 33483 group.cpp:656] Trying to get '/mesos/info_04' in ZooKeeper W0727 22:14:45.100443 33453 detector.cpp:444] Leading master master@127.0.0.1:5050 is using a Protobuf binary format when registering with ZooKeeper (info): this will be deprecated as of Mesos 0.24 (see MESOS-2340) I0727 22:14:45.100544 33453 detector.cpp:481] A new leading master (UPID= master@127.0.0.1:5050) is detected I0727 22:14:45.100739 33478 sched.cpp:254] New master detected at master@127.0.0.1:5050 I0727 22:14:45.101104 33478 sched.cpp:264] No credentials provided. 
Attempting to register without authentication E0727 22:14:45.101210 33490 socket.hpp:107] Shutdown failed on fd=88: Transport endpoint is not connected [107] E0727 22:14:45.101380 33490 socket.hpp:107] Shutdown failed on fd=89: Transport endpoint is not connected [107] E0727 22:14:46.643348 33490 socket.hpp:107] Shutdown failed on fd=88: Transport endpoint is not connected [107] E0727 22:14:47.111336 33490 socket.hpp:107] Shutdown failed on fd=88: Transport endpoint is not connected [107] 15/07/27 22:14:50 INFO DiskBlockManager: Shutdown hook called 15/07/27 22:14:50 INFO Utils: path = /tmp/spark-3f94442b-7873-463f-91dd-3ee62ed5b263/blockmgr-74a5ed25-025b-4186-b1d8-dc395f287a8f, already present as root for deletion. 15/07/27 22:14:50 INFO Utils: Shutdown hook called 15/07/27 22:14:50 INFO Utils: Deleting directory /tmp/spark-3f94442b-7873-463f-91dd-3ee62ed5b263/httpd-5d2a71e5-1d36-47f7-b122-31f1dd12a0f0 15/07/27 22:14:50 INFO Utils: Deleting directory /tmp/spark-3f94442b-7873-463f-91dd-3ee62ed5b263 15/07/27 22:14:50 INFO Utils: Deleting directory /tmp/spark-bfd6c444-5346-4315-9501-1baed4d500de -- Regards, Haripriya Ayyalasomayajula
NO Cygwin Support in bin/spark-class in Spark 1.4.0
Hi Spark Users, It looks like Spark 1.4.0 cannot work with Cygwin due to the removal of Cygwin support in bin/spark-class. The changeset is https://github.com/apache/spark/commit/517975d89d40a77c7186f488547eed11f79c1e97#diff-fdf4d3e600042c63ffa17b692c4372a3 The changeset says "Add a library for launching Spark jobs programmatically", but how can that be used from Cygwin? I'm wondering whether any solutions are available to make this work on Windows. Thanks Proust
Re: Unexpected performance issues with Spark SQL using Parquet
Hi Jerry, Thanks for the detailed report! I haven't investigate this issue in detail. But for the input size issue, I believe this is due to a limitation of HDFS API. It seems that Hadoop FileSystem adds the size of a whole block to the metrics even if you only touch a fraction of that block. In Parquet, all columns within a single row group are stored in a single HDFS block. This is probably the reason why you observed weird task input size. You may find more information in one of my earlier posts http://mail-archives.us.apache.org/mod_mbox/spark-user/201501.mbox/%3c54c9899e.2030...@gmail.com%3E For the performance issue, I don't have a proper explanation yet. Need further investigation. Cheng On 7/28/15 2:37 AM, Jerry Lam wrote: Hi spark users and developers, I have been trying to understand how Spark SQL works with Parquet for the couple of days. There is a performance problem that is unexpected using the column pruning. Here is a dummy example: The parquet file has the 3 fields: |-- customer_id: string (nullable = true) |-- type: string (nullable = true) |-- mapping: map (nullable = true) ||-- key: string ||-- value: string (nullable = true) Note that mapping is just a field with a lot of key value pairs. I just created a parquet files with 1 billion entries with each entry having 10 key-value pairs in the mapping. After I generate this parquet file, I generate another parquet without the mapping field that is: |-- customer_id: string (nullable = true) |-- type: string (nullable = true) Let call the first parquet file data-with-mapping and the second parquet file data-without-mapping. Then I ran a very simple query over two parquet files: val df = sqlContext.read.parquet(path) df.select(df(type)).count The run on the data-with-mapping takes 34 seconds with the input size of 11.7 MB. The run on the data-without-mapping takes 8 seconds with the input size of 7.6 MB. They all ran on the same cluster with spark 1.4.1. What bothers me the most is the input size because I supposed column pruning will only deserialize columns that are relevant to the query (in this case the field type) but for sure, it reads more data on the data-with-mapping than the data-without-mapping. The speed is 4x faster in the data-without-mapping that means that the more columns a parquet file has the slower it is even only a specific column is needed. Anyone has an explanation on this? I was expecting both of them will finish approximate the same time. Best Regards, Jerry - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
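If the heavy map column is rarely queried, one workaround (just a sketch, not a fix for the metrics behaviour Cheng describes) is to keep it in a separate Parquet dataset keyed by customer_id, so that queries touching only the narrow columns never open row groups that contain the map; the paths and the split are illustrative:

  val full = sqlContext.read.parquet("data-with-mapping")
  full.select("customer_id", "type").write.parquet("data-narrow")
  full.select("customer_id", "mapping").write.parquet("data-mapping")
  // analytics on type hit only data-narrow; a join brings the mapping back when it is really needed

Whether the extra join pays off depends on how often the mapping column is actually used.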
Re: [ Potential bug ] Spark terminal logs say that job has succeeded even though job has failed in Yarn cluster mode
Elkhan, What does the ResourceManager say about the final status of the job? Spark jobs that run as Yarn applications can fail but still successfully clean up their resources and give them back to the Yarn cluster. Because of this, there's a difference between your code throwing an exception in an executor/driver and the Yarn application failing. Generally you'll see a yarn application fail when there's a memory problem (too much memory being allocated or not enough causing executors to fail multiple times not allowing your job to finish). What I'm seeing from your post is that you had an exception in your application which was caught by the Spark framework which then proceeded to clean up the job and shut itself down- which it did successfully. When you aren't running in the Yarn modes, you aren't seeing any Yarn status that's telling you the Yarn application was successfully shut down, you are just seeing the failure(s) from your drivers/executors. On Mon, Jul 27, 2015 at 2:11 PM, Elkhan Dadashov elkhan8...@gmail.com wrote: Any updates on this bug ? Why Spark log results Job final status does not match ? (one saying that job has failed, another stating that job has succeeded) Thanks. On Thu, Jul 23, 2015 at 4:43 PM, Elkhan Dadashov elkhan8...@gmail.com wrote: Hi all, While running Spark Word count python example with intentional mistake in *Yarn cluster mode*, Spark terminal states final status as SUCCEEDED, but log files state correct results indicating that the job failed. Why terminal log output application log output contradict each other ? If i run same job on *local mode* then terminal logs and application logs match, where both state that job has failed to expected error in python script. More details: Scenario While running Spark Word count python example on *Yarn cluster mode*, if I make intentional error in wordcount.py by changing this line (I'm using Spark 1.4.1, but this problem exists in Spark 1.4.0 and in 1.3.0 versions - which i tested): lines = sc.textFile(sys.argv[1], 1) into this line: lines = sc.textFile(*nonExistentVariable*,1) where nonExistentVariable variable was never created and initialized. then i run that example with this command (I put README.md into HDFS before running this command): *./bin/spark-submit --master yarn-cluster wordcount.py /README.md* The job runs and finishes successfully according the log printed in the terminal : *Terminal logs*: ... 15/07/23 16:19:17 INFO yarn.Client: Application report for application_1437612288327_0013 (state: RUNNING) 15/07/23 16:19:18 INFO yarn.Client: Application report for application_1437612288327_0013 (state: RUNNING) 15/07/23 16:19:19 INFO yarn.Client: Application report for application_1437612288327_0013 (state: RUNNING) 15/07/23 16:19:20 INFO yarn.Client: Application report for application_1437612288327_0013 (state: RUNNING) 15/07/23 16:19:21 INFO yarn.Client: Application report for application_1437612288327_0013 (state: FINISHED) 15/07/23 16:19:21 INFO yarn.Client: client token: N/A diagnostics: Shutdown hook called before final status was reported. 
ApplicationMaster host: 10.0.53.59 ApplicationMaster RPC port: 0 queue: default start time: 1437693551439 final status: *SUCCEEDED* tracking URL: http://localhost:8088/proxy/application_1437612288327_0013/history/application_1437612288327_0013/1 user: edadashov 15/07/23 16:19:21 INFO util.Utils: Shutdown hook called 15/07/23 16:19:21 INFO util.Utils: Deleting directory /tmp/spark-eba0a1b5-a216-4afa-9c54-a3cb67b16444 But if look at log files generated for this application in HDFS - it indicates failure of the job with correct reason: *Application log files*: ... \00 stdout\00 179Traceback (most recent call last): File wordcount.py, line 32, in module lines = sc.textFile(nonExistentVariable,1) *NameError: name 'nonExistentVariable' is not defined* Why terminal output - final status: *SUCCEEDED , *is not matching application log results - failure of the job (NameError: name 'nonExistentVariable' is not defined) ? Is this bug ? Is there Jira ticket related to this issue ? (Is someone assigned to this issue ?) If i run this wordcount .py example (with mistake line) in local mode, then terminal log states that the job has failed in terminal logs too. *./bin/spark-submit wordcount.py /README.md* *Terminal logs*: ... 15/07/23 16:31:55 INFO scheduler.EventLoggingListener: Logging events to hdfs:///app-logs/local-1437694314943 Traceback (most recent call last): File /home/edadashov/tools/myspark/spark/wordcount.py, line 32, in module lines = sc.textFile(nonExistentVariable,1) NameError: name 'nonExistentVariable' is not defined 15/07/23 16:31:55 INFO spark.SparkContext: Invoking stop() from shutdown hook Thanks. -- Best regards, Elkhan Dadashov
GenericRowWithSchema is too heavy
Hi all, SparkSQL usually creates DataFrames with GenericRowWithSchema (is that right?). 'Row' is a superclass of both GenericRow and GenericRowWithSchema; the only difference is that GenericRowWithSchema also carries its schema as a StructType. But one DataFrame has only one schema, so each row should not have to store the schema in it, because StructType is very heavy and most RDDs have many rows. To test this, 1) create a DataFrame and call rdd ( RDD[Row] ) => GenericRowWithSchema 2) dataframe.map( row => Row(row.toSeq)) => GenericRow 3) dataframe.map( row => row.toSeq) => underlying sequence of a row 4) saveAsObjectFile or use org.apache.spark.util.SizeEstimator.estimate And my result is (dataframe with 5 columns): GenericRowWithSchema = 13 GB, GenericRow = 8.2 GB, Seq = 7 GB. Best regards Kevin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/GenericRowWithSchema-is-too-heavy-tp24018.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
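A sketch of the measurement steps above, assuming a DataFrame named dataframe and spark-shell (SizeEstimator.estimate is exposed as a DeveloperApi in recent releases; if it is not accessible in your build, comparing saveAsObjectFile output sizes works too). Collecting only makes sense for a small sample, of course:

  import org.apache.spark.sql.Row
  import org.apache.spark.util.SizeEstimator

  val withSchema = dataframe.rdd                                   // RDD[Row], GenericRowWithSchema
  val noSchema   = dataframe.map(row => Row.fromSeq(row.toSeq))    // GenericRow, schema dropped
  val plainSeqs  = dataframe.map(row => row.toSeq)                 // bare Seq per row

  println(SizeEstimator.estimate(withSchema.collect()))
  println(SizeEstimator.estimate(noSchema.collect()))
  println(SizeEstimator.estimate(plainSeqs.collect()))

For the full data set, writing each variant with saveAsObjectFile and comparing the on-disk sizes, as described above, is the safer route.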
Create StructType column in data frame
Hello, I would like to add a column of StructType to a DataFrame. What would be the best way to do it? I am not sure whether it is possible using withColumn. A possible way is to convert the dataframe into an RDD[Row], add the struct and then convert it back to a dataframe, but that seems like overkill. Please note that I don't know the StructType beforehand; I am creating it based on some configuration, so using case classes is out of the picture. Thanks.
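One possible approach, assuming the struct function from org.apache.spark.sql.functions that appeared in Spark 1.4; it builds a StructType column from existing columns without a detour through RDD[Row]. The DataFrame df and its columns a and b below are hypothetical:

  import org.apache.spark.sql.functions.{col, lit, struct}
  val withStruct = df.withColumn("nested", struct(col("a"), col("b")))
  withStruct.printSchema()   // nested is now a struct column containing a and b

If the struct's fields come from configuration rather than from existing columns, they can first be added as ordinary columns (with lit or a UDF) and then wrapped with struct in the same way.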
Functions in Spark SQL
Hi, May I know how to use the functions mentioned in http://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.sql.functions$ in Spark SQL? When I use a query like "Select last(column) from tablename" I get an error like: 15/07/27 03:00:00 INFO exec.FunctionRegistry: Unable to lookup UDF in metastore: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:NoSuchObjectException(message:Function default.last does not exist)) java.lang.RuntimeException: Couldn't find function last Thanks, Vinod
spark spark-ec2 credentials using aws_security_token
Hi, I would like to ask if it is currently possible to use spark-ec2 script together with credentials that are consisting not only from: aws_access_key_id and aws_secret_access_key, but it also contains aws_security_token. When I try to run the script I am getting following error message: ERROR:boto:Caught exception reading instance data Traceback (most recent call last): File /Users/zikes/opensource/spark/ec2/lib/boto-2.34.0/boto/utils.py, line 210, in retry_url r = opener.open(req, timeout=timeout) File /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/urllib2.py, line 404, in open response = self._open(req, data) File /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/urllib2.py, line 422, in _open '_open', req) File /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/urllib2.py, line 382, in _call_chain result = func(*args) File /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/urllib2.py, line 1214, in http_open return self.do_open(httplib.HTTPConnection, req) File /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/urllib2.py, line 1184, in do_open raise URLError(err) URLError: urlopen error [Errno 64] Host is down ERROR:boto:Unable to read instance data, giving up No handler was ready to authenticate. 1 handlers were checked. ['QuerySignatureV2AuthHandler'] Check your credentials Does anyone has some idea what can be possibly wrong? Is aws_security_token the problem? I know that it seems more like a boto problem, but still I would like to ask if anybody has some experience with this? My launch command is: ./spark-ec2 -k my_key -i my_key.pem --additional-tags mytag:tag1,mytag2:tag2 --instance-profile-name profile1 -s 1 launch test Thank you in advance for any help. Best regards, Jan Note: I have also asked at http://stackoverflow.com/questions/31583513/spark-spark-ec2-credentials-using-aws-security-token?noredirect=1#comment51151822_31583513 without any success. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-spark-ec2-credentials-using-aws-security-token-tp24007.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: suggest coding platform
How about IntelliJ? It also has a Terminal tab. Thanks Best Regards On Fri, Jul 24, 2015 at 6:06 PM, saif.a.ell...@wellsfargo.com wrote: Hi all, I tried Notebook Incubator Zeppelin, but I am not completely happy with it. What do you people use for coding? Anything with auto-complete, proper warning logs and perhaps some colored syntax. My platform is on linux, so anything with some notebook studio, or perhaps a windows IDE with remote ssh capabilities? Thanks, Saif
Re: RDD[Future[T]] = Future[RDD[T]]
do you mean something like this ? val values = rdd.mapPartitions{ i: Iterator[Future[T]] = val future: Future[Iterator[T]] = Future sequence i Await result (future, someTimeout) } Where is the blocking happening in this case? It seems to me that all the workers will be blocked until the future is completed, no ? 2015-07-27 7:24 GMT+02:00 Nick Pentreath nick.pentre...@gmail.com: You could use Iterator.single on the future[iterator]. However if you collect all the partitions I'm not sure if it will work across executor boundaries. Perhaps you may need to await the sequence of futures in each partition and return the resulting iterator. — Sent from Mailbox https://www.dropbox.com/mailbox On Sun, Jul 26, 2015 at 10:43 PM, Ayoub Benali benali.ayoub.i...@gmail.com wrote: It doesn't work because mapPartitions expects a function f:(Iterator[T]) ⇒ Iterator[U] while .sequence wraps the iterator in a Future 2015-07-26 22:25 GMT+02:00 Ignacio Blasco elnopin...@gmail.com: Maybe using mapPartitions and .sequence inside it? El 26/7/2015 10:22 p. m., Ayoub benali.ayoub.i...@gmail.com escribió: Hello, I am trying to convert the result I get after doing some async IO : val rdd: RDD[T] = // some rdd val result: RDD[Future[T]] = rdd.map(httpCall) Is there a way collect all futures once they are completed in a *non blocking* (i.e. without scala.concurrent Await) and lazy way? If the RDD was a standard scala collection then calling scala.concurrent.Future.sequence would have resolved the issue but RDD is not a TraversableOnce (which is required by the method). Is there a way to do this kind of transformation with an RDD[Future[T]] ? Thanks, Ayoub. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/RDD-Future-T-Future-RDD-T-tp24000.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Re-RDD-Future-T-Future-RDD-T-tp24005.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Encryption on RDDs or in-memory on Apache Spark
Have a look at the current security support https://spark.apache.org/docs/latest/security.html, Spark does not have any encryption support for objects in memory out of the box. But if your concern is to protect the data being cached in memory, then you can easily encrypt your objects in memory with SealedObject http://docs.oracle.com/javase/7/docs/api/javax/crypto/SealedObject.html, Here's one simple example http://stackoverflow.com/questions/16950833/is-there-an-easy-way-to-encrypt-a-java-object#answers-header which you can make use of. Thanks Best Regards On Fri, Jul 24, 2015 at 2:12 PM, IASIB1 moreill...@qub.ac.uk wrote: I am currently working on the latest version of Apache Spark (1.4.1), pre-built package for Hadoop 2.6+. Is there any feature in Spark/Hadoop to encrypt RDDs or in-memory (similarly to Altibase's HDB: http://altibase.com/in-memory-database-computing-solutions/security/ http://altibase.com/in-memory-database-computing-solutions/security/ ) when running applications in Spark? Or is there an external library/framework which could be used to encrypt RDDs or in-memory in Spark? Any help would be appreciated. Many thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Encryption-on-RDDs-or-in-memory-on-Apache-Spark-tp23982.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
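A minimal, purely illustrative sketch of the SealedObject idea (key management, cipher mode and padding are left out, and the payload must be Serializable):

  import javax.crypto.{Cipher, KeyGenerator, SealedObject}

  val key = KeyGenerator.getInstance("AES").generateKey()

  // encrypt before caching the value anywhere
  val enc = Cipher.getInstance("AES")
  enc.init(Cipher.ENCRYPT_MODE, key)
  val sealedRecord = new SealedObject("sensitive value", enc)   // any Serializable payload

  // decrypt only when the value is actually needed
  val dec = Cipher.getInstance("AES")
  dec.init(Cipher.DECRYPT_MODE, key)
  val recovered = sealedRecord.getObject(dec).asInstanceOf[String]

Inside a Spark job the same idea would be applied per element, e.g. creating the Cipher inside mapPartitions and sealing each element before persist(), at the cost of extra CPU and of getting the key to the executors safely.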
Re: Functions in Spark SQL
Hi there, I tested with sqlContext.sql("select funcName(param1, param2, ...) from tableName") and it just worked fine. Would you like to paste your test code here? And which version of Spark are you using? Best, Sun. fightf...@163.com From: vinod kumar Date: 2015-07-27 15:04 To: User Subject: Functions in Spark SQL Hi, May I know how to use the functions mentioned in http://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.sql.functions$ in spark sql? when I use like Select last(column) from tablename I am getting error like 15/07/27 03:00:00 INFO exec.FunctionRegistry: Unable to lookup UDF in metastore: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:NoSuchObjectException(message:Function default.last does not exist)) java.lang.RuntimeException: Couldn't find function last Thanks, Vinod
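For what it is worth, the same aggregate is also reachable through the DataFrame API, which bypasses the SQL parser and the Hive UDF lookup entirely. A sketch, assuming a registered table named sampleTable and that functions.last is available in the Spark version in use (it is in the 1.4.0 scaladoc linked above):

  import org.apache.spark.sql.functions.last
  val df = sqlContext.table("sampleTable")
  df.agg(last(df("product"))).show()

The error quoted above suggests the query was resolved against Hive's function registry, which has no UDF named last in Hive 0.13, whereas the DataFrame function is resolved by Spark itself.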
Re: spark as a lookup engine for dedup
its for 1 day events in range of 1 billions and processing is in streaming application of ~10-15 sec interval so lookup should be fast. RDD need to be updated with new events and old events of current time-24 hours back should be removed at each processing. So is spark RDD not fit for this requirement? On Mon, Jul 27, 2015 at 1:08 PM, Romi Kuntsman r...@totango.com wrote: What the throughput of processing and for how long do you need to remember duplicates? You can take all the events, put them in an RDD, group by the key, and then process each key only once. But if you have a long running application where you want to check that you didn't see the same value before, and check that for every value, you probably need a key-value store, not RDD. On Sun, Jul 26, 2015 at 7:38 PM Shushant Arora shushantaror...@gmail.com wrote: Hi I have a requirement for processing large events but ignoring duplicate at the same time. Events are consumed from kafka and each event has a eventid. It may happen that an event is already processed and came again at some other offset. 1.Can I use Spark RDD to persist processed events and then lookup with this rdd (How to do lookup inside a RDD ?I have a JavaPairRDDeventid,timestamp ) while processing new events and if event is present in persisted rdd ignore it , else process the even. Does rdd.lookup(key) on billion of events will be efficient ? 2. update the rdd (Since RDD is immutable how to update it)? Thanks
Re: Spark - Eclipse IDE - Maven
You can follow this doc https://cwiki.apache.org/confluence/display/SPARK/Useful+Developer+Tools#UsefulDeveloperTools-IDESetup Thanks Best Regards On Fri, Jul 24, 2015 at 10:56 AM, Siva Reddy ksiv...@gmail.com wrote: Hi All, I am trying to setup the Eclipse (LUNA) with Maven so that I create a maven projects for developing spark programs. I am having some issues and I am not sure what is the issue. Can Anyone share a nice step-step document to configure eclipse with maven for spark development. Thanks Siva -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Eclipse-IDE-Maven-tp23977.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: spark as a lookup engine for dedup
RDD is immutable, it cannot be changed, you can only create a new one from data or from transformation. It sounds inefficient to create one each 15 sec for the last 24 hours. I think a key-value store will be much more fitted for this purpose. On Mon, Jul 27, 2015 at 11:21 AM Shushant Arora shushantaror...@gmail.com wrote: its for 1 day events in range of 1 billions and processing is in streaming application of ~10-15 sec interval so lookup should be fast. RDD need to be updated with new events and old events of current time-24 hours back should be removed at each processing. So is spark RDD not fit for this requirement? On Mon, Jul 27, 2015 at 1:08 PM, Romi Kuntsman r...@totango.com wrote: What the throughput of processing and for how long do you need to remember duplicates? You can take all the events, put them in an RDD, group by the key, and then process each key only once. But if you have a long running application where you want to check that you didn't see the same value before, and check that for every value, you probably need a key-value store, not RDD. On Sun, Jul 26, 2015 at 7:38 PM Shushant Arora shushantaror...@gmail.com wrote: Hi I have a requirement for processing large events but ignoring duplicate at the same time. Events are consumed from kafka and each event has a eventid. It may happen that an event is already processed and came again at some other offset. 1.Can I use Spark RDD to persist processed events and then lookup with this rdd (How to do lookup inside a RDD ?I have a JavaPairRDDeventid,timestamp ) while processing new events and if event is present in persisted rdd ignore it , else process the even. Does rdd.lookup(key) on billion of events will be efficient ? 2. update the rdd (Since RDD is immutable how to update it)? Thanks
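To make the key-value-store suggestion concrete, a rough sketch of the streaming side; the KVStore client is hypothetical (stand in HBase, Redis, Cassandra or similar), event.id and process are placeholders, and connection handling is simplified:

  eventsStream.foreachRDD { rdd =>
    rdd.foreachPartition { events =>
      val kv = KVStore.connect()                 // hypothetical client, one connection per partition
      events.foreach { event =>
        // putIfAbsent-style check: only handle ids not seen in the last 24 hours
        if (kv.putIfAbsent(event.id, event.timestamp, ttlSeconds = 24 * 3600)) {
          process(event)
        }
      }
      kv.close()
    }
  }

The 24-hour expiry is handled by the store's TTL instead of rebuilding an RDD every batch, which is what makes this cheaper than the RDD-lookup approach discussed above.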
Re: ERROR TaskResultGetter: Exception while getting task result when reading avro files that contain arrays
Its a serialization error with nested schema i guess. You can look at the twitters chill avro serializer library. Here's two discussion on the same: - https://issues.apache.org/jira/browse/SPARK-3447 - http://apache-spark-user-list.1001560.n3.nabble.com/Kryo-fails-with-avro-having-Arrays-and-unions-but-succeeds-with-simple-avro-td14549.html Thanks Best Regards On Thu, Jul 23, 2015 at 9:32 PM, Arbi Akhina arbi.akh...@gmail.com wrote: Hi, I'm trying to read an avro file into a spark RDD, but I'm having an Exception while getting task result. The avro schema file has the following content: { type : record, name : sample_schema, namespace : com.adomik.avro, fields : [ { name : username, type : string, doc : Name of the user account }, { name : events, type : { type : array, items : { name:Event, type:record, fields:[ {name:action, type:string}, {name:value, type:long} ] } }, doc : The content of the user's Events message } ], doc: : A basic schema for storing Events messages } I create the avro file using avro-tools.jar file from the following json file: {username:miguno,events: [{action:signed, value: 1}, {action: loged, value:1}] } {username:blizzard,events: [{action:logout, value: 2}, {action: visited, value:3}] } $ java -jar avro-tools-1.7.7.jar fromjson --schema-file myschema.avsc data.json data.avro I can correctly read the generated avro file with the avro-tools.jar as follows: $ java -jar avro-tools-1.7.7.jar tojson data.avro However I'm having an exception when I try to read this generated avro file into a Spark RDD from spark shell as follows: import org.apache.avro.mapred.AvroInputFormat import org.apache.avro.mapred.AvroWrapper import org.apache.hadoop.io.NullWritable import org.apache.hadoop.io.Text import org.apache.avro.generic.GenericRecord val input = /home/arbi/avro/data.avro val rdd = sc.hadoopFile( input, classOf[AvroInputFormat[GenericRecord]], classOf[AvroWrapper[GenericRecord]], classOf[NullWritable] ) Then when I call rdd.next, I see the following exception: 15/07/23 14:30:48 ERROR TaskResultGetter: Exception while getting task result com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException Serialization trace: values (org.apache.avro.generic.GenericData$Record) datum (org.apache.avro.mapred.AvroWrapper) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:41) at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:173) at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79) at 
org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:621) at org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:379) at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:82) at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51) at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618) at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.NullPointerException at org.apache.avro.generic.GenericData$Array.add(GenericData.java:200) at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109) at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:18) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
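One workaround that keeps Kryo away from Avro's GenericRecord entirely is to map each record to plain Scala types in the same stage that reads the file, before anything is shuffled or collected. A sketch against the schema above; the Event case class is introduced here purely for illustration:

  import scala.collection.JavaConverters._
  import org.apache.avro.generic.GenericRecord

  case class Event(action: String, value: Long)

  val users = rdd.map { case (wrapper, _) =>
    val record = wrapper.datum()
    val name = record.get("username").toString
    val rawEvents = record.get("events").asInstanceOf[java.util.List[GenericRecord]]
    val events = rawEvents.asScala.map(e => Event(e.get("action").toString, e.get("value").asInstanceOf[Long])).toList
    (name, events)
  }
  users.collect()   // only Strings, Longs and case classes cross the wire now

Registering chill-avro's serializers, as in the links above, is the cleaner fix if GenericRecord itself really has to be shuffled or returned to the driver.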
Re: java.lang.NoSuchMethodError for list.toMap.
Whats in your build.sbt? You could be messing with the scala version it seems. Thanks Best Regards On Fri, Jul 24, 2015 at 2:15 AM, Dan Dong dongda...@gmail.com wrote: Hi, When I ran with spark-submit the following simple Spark program of: import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext import org.apache.spark._ import SparkContext._ object TEST2{ def main(args:Array[String]) { val conf = new SparkConf().setAppName(TEST) val sc=new SparkContext(conf) val list=List((aa,1),(bb,2),(cc,3)) val maps=list.toMap } } I got java.lang.NoSuchMethodError for the line of val maps=list.toMap. But in a spark-shell or simply scala, it has no problem: scala val list=List((aa,1),(bb,2),(cc,3)) list: List[(String, Int)] = List((aa,1), (bb,2), (cc,3)) scala val maps=list.toMap maps: scala.collection.immutable.Map[String,Int] = Map(aa - 1, bb - 2, cc - 3) So to use toMap method, what am I missing in spark-submit? I use sbt package to compile the program and without problem. Thanks! Cheers, Dan
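For reference, a minimal build.sbt that avoids the most common cause of this error, compiling against a different Scala major version than the one the Spark binaries were built with; the versions are illustrative and should match the cluster:

  name := "test2"

  version := "0.1"

  scalaVersion := "2.10.4"   // must match the Scala version of the Spark build (2.10.x for the stock 1.x binaries)

  libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.1" % "provided"

If scalaVersion here says 2.11 while the cluster runs the default 2.10 build (or the other way around), perfectly ordinary collection methods such as toMap can fail at run time with exactly this kind of NoSuchMethodError, even though sbt package compiles cleanly.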
hive.contrib.serde2.RegexSerDe not found
Hi all:I am testing the performance of hive on spark sql.The existing table is created with ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.RegexSerDe' WITH SERDEPROPERTIES ( 'input.regex' = '(.*?)\\|\\^\\|(.*?)\\|\\^\\|(.*?)\\|\\^\\|(.*?)\\|\\^\\|(.*?)\\|\\^\\|(.*?)\\|\\^\\|(.*?)\\|\\^\\|(.*?)\\|\\^\\|(.*?)\\|\\^\\|(.*?)\\|\\^\\|(.*?)\\|\\^\\|(.*?)\\|\\^\\|(.*?)\\|\\^\\|(.*?)\\|\\^\\|(.*?)\\|\\^\\|(.*?)\\|\\^\\|(.*?)','output.format.string' = '%1$s %2$s %3$s %4$s %5$s %16$s %7$s %8$s %9$s %10$s %11$s %12$s %13$s %14$s %15$s %16$s %17$s ')STORED AS TEXTFILElocation '/data/BaseData/wx/xx/xx/xx/xx'; When i use spark sql(spark-shell) to query the existing table, got exception like this:Caused by: MetaException(message:java.lang.ClassNotFoundException Class org.apache.hadoop.hive.contrib.serde2.RegexSerDe not found) at org.apache.hadoop.hive.metastore.MetaStoreUtils.getDeserializer(MetaStoreUtils.java:382) at org.apache.hadoop.hive.ql.metadata.Partition.getDeserializer(Partition.java:249) I add the jar dependency in the spark-shell command, still do not work.SPARK_SUBMIT_OPTS=-XX:MaxPermSize=256m ./bin/spark-shell --jars /data/dbcenter/cdh5/spark-1.4.0-bin-hadoop2.4/hive-contrib-0.13.1-cdh5.2.0.jar,postgresql-9.2-1004-jdbc41.jar How should i fix the problem?Cheers
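A hedged suggestion, not verified against this exact CDH build: Hive SerDes are also resolved by the Hive client on the driver side, so the jar may need to be on the driver classpath in addition to --jars, e.g.:

  SPARK_SUBMIT_OPTS=-XX:MaxPermSize=256m ./bin/spark-shell \
    --jars /data/dbcenter/cdh5/spark-1.4.0-bin-hadoop2.4/hive-contrib-0.13.1-cdh5.2.0.jar,postgresql-9.2-1004-jdbc41.jar \
    --driver-class-path /data/dbcenter/cdh5/spark-1.4.0-bin-hadoop2.4/hive-contrib-0.13.1-cdh5.2.0.jar

Another option, if redefining the table is acceptable, is the non-contrib org.apache.hadoop.hive.serde2.RegexSerDe that ships with hive-exec, which understands the same input.regex property.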
Re: spark as a lookup engine for dedup
What the throughput of processing and for how long do you need to remember duplicates? You can take all the events, put them in an RDD, group by the key, and then process each key only once. But if you have a long running application where you want to check that you didn't see the same value before, and check that for every value, you probably need a key-value store, not RDD. On Sun, Jul 26, 2015 at 7:38 PM Shushant Arora shushantaror...@gmail.com wrote: Hi I have a requirement for processing large events but ignoring duplicate at the same time. Events are consumed from kafka and each event has a eventid. It may happen that an event is already processed and came again at some other offset. 1.Can I use Spark RDD to persist processed events and then lookup with this rdd (How to do lookup inside a RDD ?I have a JavaPairRDDeventid,timestamp ) while processing new events and if event is present in persisted rdd ignore it , else process the even. Does rdd.lookup(key) on billion of events will be efficient ? 2. update the rdd (Since RDD is immutable how to update it)? Thanks
Re: spark dataframe gc
This spark.shuffle.sort.bypassMergeThreshold might help, You could also try setting the shuffle manager to hash from sort. You can see more configuration options from here https://spark.apache.org/docs/latest/configuration.html#shuffle-behavior. Thanks Best Regards On Fri, Jul 24, 2015 at 3:33 AM, Mohit Jaggi mohitja...@gmail.com wrote: Hi There, I am testing Spark DataFrame and havn't been able to get my code to finish due to what I suspect are GC issues. My guess is that GC interferes with heartbeating and executors are detected as failed. The data is ~50 numeric columns, ~100million rows in a CSV file. We are doing a groupBy using one of the columns and trying to calculate the average of each of the other columns. The groupBy key has about 250k unique values. It seems that Spark is creating a lot of temp objects (see jmap output below) while calculating the average which I am surprised to see. Why doesn't it use the same temp variable? Am I missing something? Do I need to specify a config flag to enable code generation and not do this? Mohit. [x app-20150723142604-0002]$ jmap -histo 12209 num #instances #bytes class name -- 1: 258615458 8275694656 scala.collection.immutable.$colon$colon 2: 103435856 7447381632 org.apache.spark.sql.catalyst.expressions.Cast 3: 103435856 4964921088 org.apache.spark.sql.catalyst.expressions.Coalesce 4: 1158643 4257400112 [B 5: 51717929 4137434320 org.apache.spark.sql.catalyst.expressions.SumFunction 6: 51717928 3723690816 org.apache.spark.sql.catalyst.expressions.Add 7: 51717929 2896204024 org.apache.spark.sql.catalyst.expressions.CountFunction 8: 51717928 2896203968 org.apache.spark.sql.catalyst.expressions.MutableLiteral 9: 51717928 2482460544 org.apache.spark.sql.catalyst.expressions.Literal 10: 51803728 1243289472 java.lang.Double 11: 51717755 1241226120 org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToDouble$5 12:975810 850906320 [Lorg.apache.spark.sql.catalyst.expressions.AggregateFunction; 13: 51717754 827484064 org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$org$apache$spark$sql$catalyst$expressions$Cast$$cast$1 14:982451 47157648 java.util.HashMap$Entry 15:981132 34981720 [Ljava.lang.Object; 16: 1049984 25199616 org.apache.spark.sql.types.UTF8String 17:978296 23479104 org.apache.spark.sql.catalyst.expressions.GenericRow 18:117166 15944560 methodKlass 19:117166 14986224 constMethodKlass 20: 1567 12891952 [Ljava.util.HashMap$Entry; 21: 9103 10249728 constantPoolKlass 22: 91039278592 instanceKlassKlass 23: 50725691320 [I 24: 72815335040 constantPoolCacheKlass 25: 464204769600 [C 26:1059843391488 io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
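A sketch of the two settings mentioned, with illustrative values (in 1.4 the sort-based manager already bypasses the merge-sort when the number of reduce partitions is at or below this threshold):

  import org.apache.spark.SparkConf
  val conf = new SparkConf()
    .setAppName("groupby-avg")
    .set("spark.shuffle.manager", "hash")                      // default is "sort"
    .set("spark.shuffle.sort.bypassMergeThreshold", "400")     // default is 200

Independently of the shuffle manager, increasing the number of partitions, so that each task aggregates fewer of the 250k groups at a time, is another lever worth trying against the GC pressure visible in the histogram.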
Re: RDD[Future[T]] = Future[RDD[T]]
In this case, each partition will block until the futures in that partition are completed. If you are in the end collecting all the Futures to the driver, what is the reasoning behind using an RDD? You could just use a bunch of Futures directly. If you want to do some processing on the results of the futures, then I'd say you would need to block in each partition until the Futures' results are completed, as I'm not at all sure whether Futures would be composable across stage / task boundaries. On Mon, Jul 27, 2015 at 9:33 AM, Ayoub benali.ayoub.i...@gmail.com wrote: do you mean something like this ? val values = rdd.mapPartitions{ i: Iterator[Future[T]] = val future: Future[Iterator[T]] = Future sequence i Await result (future, someTimeout) } Where is the blocking happening in this case? It seems to me that all the workers will be blocked until the future is completed, no ? 2015-07-27 7:24 GMT+02:00 Nick Pentreath [hidden email] http:///user/SendEmail.jtp?type=nodenode=24005i=0: You could use Iterator.single on the future[iterator]. However if you collect all the partitions I'm not sure if it will work across executor boundaries. Perhaps you may need to await the sequence of futures in each partition and return the resulting iterator. — Sent from Mailbox https://www.dropbox.com/mailbox On Sun, Jul 26, 2015 at 10:43 PM, Ayoub Benali [hidden email] http:///user/SendEmail.jtp?type=nodenode=24005i=1 wrote: It doesn't work because mapPartitions expects a function f:(Iterator[T]) ⇒ Iterator[U] while .sequence wraps the iterator in a Future 2015-07-26 22:25 GMT+02:00 Ignacio Blasco [hidden email] http:///user/SendEmail.jtp?type=nodenode=24005i=2: Maybe using mapPartitions and .sequence inside it? El 26/7/2015 10:22 p. m., Ayoub [hidden email] http:///user/SendEmail.jtp?type=nodenode=24005i=3 escribió: Hello, I am trying to convert the result I get after doing some async IO : val rdd: RDD[T] = // some rdd val result: RDD[Future[T]] = rdd.map(httpCall) Is there a way collect all futures once they are completed in a *non blocking* (i.e. without scala.concurrent Await) and lazy way? If the RDD was a standard scala collection then calling scala.concurrent.Future.sequence would have resolved the issue but RDD is not a TraversableOnce (which is required by the method). Is there a way to do this kind of transformation with an RDD[Future[T]] ? Thanks, Ayoub. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/RDD-Future-T-Future-RDD-T-tp24000.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: [hidden email] http:///user/SendEmail.jtp?type=nodenode=24005i=4 For additional commands, e-mail: [hidden email] http:///user/SendEmail.jtp?type=nodenode=24005i=5 -- View this message in context: Re: RDD[Future[T]] = Future[RDD[T]] http://apache-spark-user-list.1001560.n3.nabble.com/Re-RDD-Future-T-Future-RDD-T-tp24005.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.
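To make the per-partition blocking explicit, a sketch with the element type concretised to String (say, the body of an HTTP response); the timeout is illustrative and httpCall is the function from the original question:

  import scala.concurrent.{Await, Future}
  import scala.concurrent.duration._
  import scala.concurrent.ExecutionContext.Implicits.global
  import org.apache.spark.rdd.RDD

  val rddOfFutures: RDD[Future[String]] = rdd.map(httpCall)

  val resolved: RDD[String] = rddOfFutures.mapPartitions { iter =>
    val futures = iter.toVector                      // materialise this partition's futures
    val all: Future[Vector[String]] = Future.sequence(futures)
    Await.result(all, 10.minutes).iterator           // block once per partition, not once per element
  }

Each task blocks until the slowest future of its own partition completes, but different partitions still run concurrently across the cluster and nothing is collected to the driver.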
Re: Functions in Spark SQL
Hi, Select last(product) from sampleTable Spark Version 1.3 -Vinod On Mon, Jul 27, 2015 at 3:48 AM, fightf...@163.com fightf...@163.com wrote: Hi, there I test with sqlContext.sql(select funcName(param1,param2,...) from tableName ) just worked fine. Would you like to paste your test code here ? And which version of Spark are u using ? Best, Sun. -- fightf...@163.com *From:* vinod kumar vinodsachin...@gmail.com *Date:* 2015-07-27 15:04 *To:* User user@spark.apache.org *Subject:* Functions in Spark SQL Hi, May I know how to use the functions mentioned in http://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.sql.functions$ in spark sql? when I use like Select last(column) from tablename I am getting error like 15/07/27 03:00:00 INFO exec.FunctionRegistry: Unable to lookup UDF in metastore: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:NoSuchO bjectException(message:Function default.last does not exist)) java.lang.RuntimeException: Couldn't find function last Thanks, Vinod
Re: ERROR SparkUI: Failed to bind SparkUI java.net.BindException: Address already in use: Service 'SparkUI' failed after 16 retries!
For each of your job, you can pass spark.ui.port to bind to a different port. Thanks Best Regards On Fri, Jul 24, 2015 at 7:49 PM, Joji John jj...@ebates.com wrote: Thanks Ajay. The way we wrote our spark application is that we have a generic python code, multiple instances of which can be called using different parameters. Does spark offer any function to bind it to a available port? I guess the other option is to define a function to find open port and use that. Thanks Joji John -- *From:* Ajay Singal asinga...@gmail.com *Sent:* Friday, July 24, 2015 6:59 AM *To:* Joji John *Cc:* user@spark.apache.org *Subject:* Re: ERROR SparkUI: Failed to bind SparkUI java.net.BindException: Address already in use: Service 'SparkUI' failed after 16 retries! Hi Jodi, I guess, there is no hard limit on number of Spark applications running in parallel. However, you need to ensure that you do not use the same (e.g., default) port numbers for each application. In your specific case, for example, if you try using default SparkUI port 4040 for more than one Spark applications, the first application you start will bind to port 4040. So, this port becomes unavailable (at this moment). Therefore, all subsequent applications you start will get SparkUI BindException. To solve this issue, simply use non-competing port numbers, e.g., 4040, 4041, 4042... Thanks, Ajay On Fri, Jul 24, 2015 at 6:21 AM, Joji John jj...@ebates.com wrote: *HI,* *I am getting this error for some of spark applications. I have multiple spark applications running in parallel. Is there a limit in the number of spark applications that I can run in parallel.* *ERROR SparkUI: Failed to bind SparkUI* *java.net.BindException: Address already in use: Service 'SparkUI' failed after 16 retries!* *Thanks* *Joji john*
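A sketch of the per-application override (the port number is arbitrary; Spark still probes upwards from the given value if that port is also taken):

  import org.apache.spark.{SparkConf, SparkContext}
  val conf = new SparkConf()
    .setAppName("app-two")
    .set("spark.ui.port", "4050")
  val sc = new SparkContext(conf)

The same property can be passed as --conf spark.ui.port=4050 on spark-submit, which is convenient when the same generic script is launched several times with different parameters.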
Re: use S3-Compatible Storage with spark
So you are able to access your AWS S3 with s3a now? What is the error that you are getting when you try to access the custom storage with fs.s3a.endpoint? Thanks Best Regards On Mon, Jul 27, 2015 at 2:44 PM, Schmirr Wurst schmirrwu...@gmail.com wrote: I was able to access Amazon S3, but for some reason, the Endpoint parameter is ignored, and I'm not able to access to storage from my provider... : sc.hadoopConfiguration.set(fs.s3a.endpoint,test) sc.hadoopConfiguration.set(fs.s3a.awsAccessKeyId,) sc.hadoopConfiguration.set(fs.s3a.awsSecretAccessKey,) Any Idea why it doesn't work ? 2015-07-20 18:11 GMT+02:00 Schmirr Wurst schmirrwu...@gmail.com: Thanks, that is what I was looking for... Any Idea where I have to store and reference the corresponding hadoop-aws-2.6.0.jar ?: java.io.IOException: No FileSystem for scheme: s3n 2015-07-20 8:33 GMT+02:00 Akhil Das ak...@sigmoidanalytics.com: Not in the uri, but in the hadoop configuration you can specify it. property namefs.s3a.endpoint/name descriptionAWS S3 endpoint to connect to. An up-to-date list is provided in the AWS Documentation: regions and endpoints. Without this property, the standard region (s3.amazonaws.com) is assumed. /description /property Thanks Best Regards On Sun, Jul 19, 2015 at 9:13 PM, Schmirr Wurst schmirrwu...@gmail.com wrote: I want to use pithos, were do I can specify that endpoint, is it possible in the url ? 2015-07-19 17:22 GMT+02:00 Akhil Das ak...@sigmoidanalytics.com: Could you name the Storage service that you are using? Most of them provides a S3 like RestAPI endpoint for you to hit. Thanks Best Regards On Fri, Jul 17, 2015 at 2:06 PM, Schmirr Wurst schmirrwu...@gmail.com wrote: Hi, I wonder how to use S3 compatible Storage in Spark ? If I'm using s3n:// url schema, the it will point to amazon, is there a way I can specify the host somewhere ? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Why the length of each task varies
I am implementing wordcount on a Spark cluster (1 master, 3 slaves) in standalone mode. I have 546 GB of data, and the dfs.blocksize I set is 256 MB; therefore, the number of tasks is 2186. Each of my 3 slaves uses 22 cores and 72 GB of memory for the processing, so the computing ability of each slave should be the same. Since wordcount has just two parts, map and reduce, I think that in each stage each task takes care of one partition, so the length of each task should be nearly the same, right? However, from the event timeline in the job UI, I found that the length of each task in the mapToPair stage varies a lot and there were many small tasks. I don't know whether this is normal or a problem on my side. Here is the picture of the event timeline: http://apache-spark-user-list.1001560.n3.nabble.com/file/n24008/QQ%E6%88%AA%E5%9B%BE20150727172511.png And the number of tasks assigned to each slave also differs: http://apache-spark-user-list.1001560.n3.nabble.com/file/n24008/QQ%E6%88%AA%E5%9B%BE20150727172739.png Does anybody have any idea about this? Thanks in advance. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Why-the-length-of-each-task-varies-tp24008.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
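One way to check whether the variation simply reflects uneven input partitions is to count the records per partition before drawing conclusions from the timeline. A rough sketch in Scala follows; the input path is a placeholder.

// Count records in each input partition; a large spread here would explain
// why some map tasks finish much faster than others.
val lines = sc.textFile("hdfs:///path/to/input")   // placeholder path

val partitionSizes = lines
  .mapPartitionsWithIndex { (idx, it) => Iterator((idx, it.size)) }
  .collect()
  .sortBy(_._2)

partitionSizes.foreach { case (idx, n) => println(s"partition $idx: $n lines") }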
RE: unserialize error in sparkR
Hi, Do you mean you are running the script with https://github.com/amplab-extras/SparkR-pkg and Spark 1.2? I am afraid that currently there is no development effort or support for the SparkR-pkg, since it has been integrated into Spark as of Spark 1.4. Unfortunately, the RDD API and the RDD-like API of the SparkR DataFrame are not exposed in Spark 1.4 for various considerations. Although not exposed, some of the RDD-like DataFrame APIs are actually implemented, which you can find in the SparkR source code, including lapply/lapplyPartition/flatMap/foreach/foreachPartition. Though not recommended, if you really want to use them, you can use SparkR::: to access them as a temporary workaround. There is ongoing investigation and discussion on whether to expose a subset of the RDD API; you can refer to https://issues.apache.org/jira/browse/SPARK-7264 if you are interested. -----Original Message----- From: Jennifer15 [mailto:bsabe...@purdue.edu] Sent: Monday, July 27, 2015 1:47 PM To: user@spark.apache.org Subject: unserialize error in sparkR Hi, I have a newbie question; I get the following error when I increase the number of samples in my sample script samplescript.R http://apache-spark-user-list.1001560.n3.nabble.com/file/n24002/samplescript.R , which is written for Spark 1.2 (there is no error for a small sample): Error in unserialize(obj) : ReadItem: unknown type 0, perhaps written by later version of R Calls: assetForecast ... convertJListToRList -> lapply -> lapply -> FUN -> unserialize Execution halted I tried using Spark 1.4, though I could not find lapply or any similar function for DataFrames. I am not sure whether this error is caused by using Spark 1.2; if it is, what is the equivalent of lapply/map to work on DataFrames? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/unserialize-error-in-sparkR-tp24002.html Sent from the Apache Spark User List mailing list archive at Nabble.com.