Re: Connection pooling in spark jobs
Out of curiosity I wanted to see what JBoss supports in terms of clustering and database connection pooling, since its implementation should suffice for your use case. I found:

*Note:* JBoss does not recommend using this feature in a production environment. It requires accessing a connection pool remotely, and this is an anti-pattern as connections are not serializable. Besides, transaction propagation is not supported and it could lead to connection leaks if the remote clients are unreliable (i.e., crashes, network failures). If you do need to access a datasource remotely, JBoss recommends accessing it via a remote session bean facade.[1]

You probably aren't worried about transactions; I gather from your use case you are just pulling this data in a read-only fashion. That being said, JBoss appears to have something.

The other thing to look for is whether or not a solution exists in Hadoop; I can't find anything for JDBC connection pools over a cluster (just pools local to a mapper, which is similar to what Cody suggested earlier for Spark and partitions).

If you were talking about a high-volume web application then I'd believe the extra effort for connection pooling [over the cluster] would be worth it. Unless you're planning on executing several hundred parallel jobs, does the small amount of overhead outweigh the time necessary to develop a solution? (I'm guessing a solution doesn't exist because the pattern where it would be an issue just isn't a common use case for Spark. I went down this path - connection pooling - myself originally and found a single connection per executor was fine for my needs. Local connection pools for the partition, as Cody said previously, would also work for my use case.)

A local connection pool shared amongst all executors on a node isn't a solution, since different jobs execute under different JVMs even when on the same worker node.[2]

1. https://developer.jboss.org/wiki/ConfigDataSources
2.
http://spark.apache.org/docs/latest/cluster-overview.html

On Fri, Apr 3, 2015 at 1:39 AM Sateesh Kavuri sateesh.kav...@gmail.com wrote:

Each executor runs for about 5 secs, until which time the db connection can potentially be open. Each executor will have 1 connection open. Connection pooling surely has its advantages of performance and not hitting the db server for every open/close. The database in question is not just used by the spark jobs, but is shared by other systems, and so the spark jobs have to be better at managing the resources. I am not really looking for a db connections counter (will let the db handle that part), but rather to have a pool of connections on the spark end so that the connections can be reused across jobs

On Fri, Apr 3, 2015 at 10:21 AM, Charles Feduke charles.fed...@gmail.com wrote:

How long does each executor keep the connection open for? How many connections does each executor open? Are you certain that connection pooling is a performant and suitable solution? Are you running out of resources on the database server and cannot tolerate each executor having a single connection?

If you need a solution that limits the number of open connections [resource starvation on the DB server] I think you'd have to fake it with a centralized counter of active connections, and logic within each executor that blocks when the counter is at a given threshold. If the counter is not at threshold, then an active connection can be created (after incrementing the shared counter). You could use something like ZooKeeper to store the counter value. This would have the overall effect of decreasing performance if your required number of connections outstrips the database's resources.

On Fri, Apr 3, 2015 at 12:22 AM Sateesh Kavuri sateesh.kav...@gmail.com wrote:

But this basically means that the pool is confined to the job (of a single app) in question, but is not sharable across multiple apps? The setup we have is a job server (the spark-jobserver) that creates jobs.
Currently, we have each job opening and closing a connection to the database. What we would like to achieve is for each of the jobs to obtain a connection from a db pool. Any directions on how this can be achieved? -- Sateesh

On Thu, Apr 2, 2015 at 7:00 PM, Cody Koeninger c...@koeninger.org wrote:

Connection pools aren't serializable, so you generally need to set them up inside of a closure. Doing that for every item is wasteful, so you typically want to use mapPartitions or foreachPartition:

rdd.mapPartitions { part =>
  setupPool
  part.map { ...

See "Design Patterns for using foreachRDD" in http://spark.apache.org/docs/latest/streaming-programming-guide.html#transformations-on-dstreams

On Thu, Apr 2, 2015 at 7:52 AM, Sateesh Kavuri sateesh.kav...@gmail.com wrote:

Right, I am aware of how to use connection pooling with Oracle, but the specific question is how to use it in the context of spark job execution

On 2 Apr 2015 17:41, Ted Yu yuzhih...@gmail.com wrote
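Cody's mapPartitions pattern - build the pool once per partition and reuse it for every element - can be sketched outside Spark with plain Python. All names below (DummyPool, map_partition) are hypothetical stand-ins for a real JDBC pool and a real RDD transformation; the point is only that setup cost is paid per partition, not per row:

```python
# Sketch of the mapPartitions pooling pattern; hypothetical names throughout.
# One pool is built per partition (not per row), then shared by every row in it.

class DummyPool:
    """Stands in for a JDBC connection pool; counts how often it is built."""
    builds = 0

    def __init__(self):
        DummyPool.builds += 1

    def query(self, row):
        return row * 2  # pretend this is a database lookup

def map_partition(rows):
    pool = DummyPool()                 # setup once per partition
    return [pool.query(r) for r in rows]

partitions = [[1, 2, 3], [4, 5]]       # stands in for an RDD's partitions
result = [map_partition(p) for p in partitions]
# Two partitions -> the pool is constructed exactly twice,
# no matter how many rows each partition holds.
```

Mapping each row individually would instead construct one pool per row, which is the wasteful case Cody warns about.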
Re: Spark Streaming Worker runs out of inodes
You could also try setting your `nofile` value in /etc/security/limits.conf for `soft` to some ridiculously high value, if you haven't done so already.

On Fri, Apr 3, 2015 at 2:09 AM Akhil Das ak...@sigmoidanalytics.com wrote:

Did you try these?

- Disable shuffle spill: spark.shuffle.spill=false
- Enable log rotation:

sparkConf.set("spark.executor.logs.rolling.strategy", "size")
  .set("spark.executor.logs.rolling.size.maxBytes", "1024")
  .set("spark.executor.logs.rolling.maxRetainedFiles", "3")

Thanks
Best Regards

On Fri, Apr 3, 2015 at 9:09 AM, a mesar amesa...@gmail.com wrote:

Yes, with spark.cleaner.ttl set there is no cleanup. We pass --properties-file spark-dev.conf to spark-submit, where spark-dev.conf contains:

spark.master spark://10.250.241.66:7077
spark.logConf true
spark.cleaner.ttl 1800
spark.executor.memory 10709m
spark.cores.max 4
spark.shuffle.consolidateFiles true

On Thu, Apr 2, 2015 at 7:12 PM, Tathagata Das t...@databricks.com wrote:

Are you saying that even with the spark.cleaner.ttl set your files are not getting cleaned up? TD

On Thu, Apr 2, 2015 at 8:23 AM, andrem amesa...@gmail.com wrote:

Apparently Spark Streaming 1.3.0 is not cleaning up its internal files, and the worker nodes eventually run out of inodes. We see tons of old shuffle_*.data and *.index files that are never deleted. How do we get Spark to remove these files? We have a simple standalone app with one RabbitMQ receiver and a two node cluster (2 x r3.large AWS instances). Batch interval is 10 minutes, after which we process data and write results to the DB. No windowing or state mgmt is used. I've pored over the documentation and tried setting the following properties, but they have not helped. As a workaround we're using a cron script that periodically cleans up old files, but this has a bad smell to it.
In SPARK_WORKER_OPTS in spark-env.sh on every worker node:

spark.worker.cleanup.enabled true
spark.worker.cleanup.interval
spark.worker.cleanup.appDataTtl

Also tried on the driver side:

spark.cleaner.ttl
spark.shuffle.consolidateFiles true

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Worker-runs-out-of-inodes-tp22355.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
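For reference, worker-side cleanup settings of the kind listed above are passed as -D flags through SPARK_WORKER_OPTS in spark-env.sh; a sketch (the values here are illustrative defaults, not recommendations - tune them for your retention needs):

```shell
# spark-env.sh on every worker node (illustrative values)
# interval is in seconds; appDataTtl is how long finished app data is kept
SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true \
  -Dspark.worker.cleanup.interval=1800 \
  -Dspark.worker.cleanup.appDataTtl=604800"
```

Note these properties clean up per-application work directories on standalone workers; they are not a substitute for spark.cleaner.ttl on the driver side.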
Re: Which OS for Spark cluster nodes?
As Akhil says, Ubuntu is a good choice if you're starting from near scratch. Cloudera CDH virtual machine images[1] include Hadoop, HDFS, Spark, and other big data tools, so you can get a cluster running with very little effort. Keep in mind Cloudera is a for-profit corporation, so they are also selling a product.

Personally I prefer the EC2 scripts[2] that ship with the downloadable Spark distribution. They provision a cluster for you on AWS, and you can easily terminate the cluster when you don't need it. Ganglia (monitoring), HDFS (ephemeral and EBS backed), Tachyon (caching), and Spark are all installed automatically. For learning, using a cluster of 4 medium machines is fairly inexpensive. (I use the EC2 scripts for both an integration and a production environment.)

1. http://www.cloudera.com/content/cloudera/en/products-and-services/cdh.html
2. https://spark.apache.org/docs/latest/ec2-scripts.html

On Fri, Apr 3, 2015 at 7:38 AM Akhil Das ak...@sigmoidanalytics.com wrote:

There isn't any specific Linux distro, but I would prefer Ubuntu for a beginner as it's very easy to apt-get install stuff on it.

Thanks
Best Regards

On Fri, Apr 3, 2015 at 4:58 PM, Horsmann, Tobias tobias.horsm...@uni-due.de wrote:

Hi, Are there any recommendations for operating systems that one should use for setting up Spark/Hadoop nodes in general? I am not familiar with the differences between the various Linux distributions or how well they are (not) suited for cluster set-ups, so I wondered if there are some preferred choices? Regards,
Re: Connection pooling in spark jobs
How long does each executor keep the connection open for? How many connections does each executor open? Are you certain that connection pooling is a performant and suitable solution? Are you running out of resources on the database server and cannot tolerate each executor having a single connection?

If you need a solution that limits the number of open connections [resource starvation on the DB server] I think you'd have to fake it with a centralized counter of active connections, and logic within each executor that blocks when the counter is at a given threshold. If the counter is not at threshold, then an active connection can be created (after incrementing the shared counter). You could use something like ZooKeeper to store the counter value. This would have the overall effect of decreasing performance if your required number of connections outstrips the database's resources.

On Fri, Apr 3, 2015 at 12:22 AM Sateesh Kavuri sateesh.kav...@gmail.com wrote:

But this basically means that the pool is confined to the job (of a single app) in question, but is not sharable across multiple apps? The setup we have is a job server (the spark-jobserver) that creates jobs. Currently, we have each job opening and closing a connection to the database. What we would like to achieve is for each of the jobs to obtain a connection from a db pool. Any directions on how this can be achieved? -- Sateesh

On Thu, Apr 2, 2015 at 7:00 PM, Cody Koeninger c...@koeninger.org wrote:

Connection pools aren't serializable, so you generally need to set them up inside of a closure. Doing that for every item is wasteful, so you typically want to use mapPartitions or foreachPartition:

rdd.mapPartitions { part =>
  setupPool
  part.map { ...
See "Design Patterns for using foreachRDD" in http://spark.apache.org/docs/latest/streaming-programming-guide.html#transformations-on-dstreams

On Thu, Apr 2, 2015 at 7:52 AM, Sateesh Kavuri sateesh.kav...@gmail.com wrote:

Right, I am aware of how to use connection pooling with Oracle, but the specific question is how to use it in the context of spark job execution

On 2 Apr 2015 17:41, Ted Yu yuzhih...@gmail.com wrote:

http://docs.oracle.com/cd/B10500_01/java.920/a96654/connpoca.htm

The question doesn't seem to be Spark specific, btw

On Apr 2, 2015, at 4:45 AM, Sateesh Kavuri sateesh.kav...@gmail.com wrote:

Hi, We have a case where we will have to run concurrent jobs (for the same algorithm) on different data sets. These jobs can run in parallel, and each one of them would be fetching the data from the database. We would like to optimize the database connections by making use of connection pooling. Any suggestions / best known ways on how to achieve this? The database in question is Oracle. Thanks, Sateesh
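Charles's centralized-counter idea - block each executor once the number of active connections hits a threshold - can be sketched with a plain in-process counter. In a real cluster this state would live in an external coordinator such as ZooKeeper, not in one process; the class and names below are hypothetical:

```python
import threading

class ConnectionGate:
    """Blocks callers once `limit` connections are active.

    In a real deployment this counter would be stored in something like
    ZooKeeper so all executors across the cluster see the same count;
    here it is in-process purely to illustrate the blocking logic.
    """
    def __init__(self, limit):
        self.limit = limit
        self.active = 0
        self.cond = threading.Condition()

    def acquire(self):
        with self.cond:
            while self.active >= self.limit:  # at threshold: block
                self.cond.wait()
            self.active += 1                  # below threshold: take a slot

    def release(self):
        with self.cond:
            self.active -= 1
            self.cond.notify()                # wake one blocked waiter

gate = ConnectionGate(limit=2)
gate.acquire()
gate.acquire()      # two executors now hold connections
# a third acquire() would block here until one of them calls release()
```

As noted in the thread, this trades throughput for a hard cap: when demand outstrips the limit, executors simply wait.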
Re: com.esotericsoftware.kryo.KryoException: java.io.IOException: File too large vs FileNotFoundException (Too many open files) on spark 1.2.1
Assuming you are on Linux, what is your /etc/security/limits.conf set to for nofile/soft (number of open file handles)?

On Fri, Mar 20, 2015 at 3:29 PM Shuai Zheng szheng.c...@gmail.com wrote:

Hi All, I try to run a simple sort by on 1.2.1, and it always gives me the two errors below:

1. 15/03/20 17:48:29 WARN TaskSetManager: Lost task 2.0 in stage 1.0 (TID 35, ip-10-169-217-47.ec2.internal): java.io.FileNotFoundException: /tmp/spark-e40bb112-3a08-4f62-9eaa-cd094fcfa624/spark-58f72d53-8afc-41c2-ad6b-e96b479b51f5/spark-fde6da79-0b51-4087-8234-2c07ac6d7586/spark-dd7d6682-19dd-4c66-8aa5-d8a4abe88ca2/16/temp_shuffle_756b59df-ef3a-4680-b3ac-437b53267826 (Too many open files)

And then I switch to:

conf.set("spark.shuffle.consolidateFiles", "true")
  .set("spark.shuffle.manager", "SORT")

Then I get the error:

Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 1.0 failed 4 times, most recent failure: Lost task 5.3 in stage 1.0 (TID 36, ip-10-169-217-47.ec2.internal): com.esotericsoftware.kryo.KryoException: java.io.IOException: File too large at com.esotericsoftware.kryo.io.Output.flush(Output.java:157)

I roughly know the first issue is because Spark shuffle creates too many local temp files (and I don't know the solution, because it looks like my solution also causes other issues), but I am not sure what the second error means. Anyone know the solution for both cases? Regards, Shuai
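For reference, raising the open-file limit that Charles asks about is done with entries along these lines in /etc/security/limits.conf (the values are illustrative, not a recommendation; a relogin is needed for them to take effect):

```
# /etc/security/limits.conf (illustrative values)
*    soft    nofile    65536
*    hard    nofile    65536
```

A too-low soft nofile limit is a common cause of the "Too many open files" FileNotFoundException during shuffles.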
Re: Writing Spark Streaming Programs
Scala is the language used to write Spark, so there's never a situation in which features introduced in a newer version of Spark cannot be taken advantage of if you write your code in Scala. (This is mostly true of Java too, but it may be a little more legwork if a Java-friendly adapter isn't available alongside new features.) Scala is also OO; it's a functional/OO hybrid language.

Although much of my organization's codebase is written in Java, and we've recently transitioned to Java 8, I still write all of my Spark code using Scala. (I also squeeze in Scala where I can in other parts of the organization.) Additionally I use both Python and R for local data analysis, though I haven't used Python with Spark in production.

On Thu, Mar 19, 2015 at 10:51 AM James King jakwebin...@gmail.com wrote:

Hello All, I'm using Spark for streaming but I'm unclear on which implementation language to use: Java, Scala or Python. I don't know anything about Python, am familiar with Scala, and have been doing Java for a long time. I think the above shouldn't influence my decision on which language to use, because I believe the tool should fit the problem. In terms of performance Java and Scala are comparable. However Java is OO and Scala is FP; no idea what Python is. If using Scala and not applying a consistent style of programming, Scala code can become unreadable, but I do like the fact that it seems to be possible to do so much work with so much less code; that's a strong selling point for me. Also it could be that the type of programming done in Spark is best implemented in Scala as an FP language, not sure though. The question I would like your good help with is: are there any other considerations I need to think about when deciding this? Are there any recommendations you can make in regards to this? Regards jk
Re: Spark History server default conf values
What I found from a quick search of the Spark source code (from my local snapshot on January 25, 2015):

// Interval between each check for event log updates
private val UPDATE_INTERVAL_MS = conf.getInt("spark.history.fs.updateInterval",
  conf.getInt("spark.history.updateInterval", 10)) * 1000

private val retainedApplications = conf.getInt("spark.history.retainedApplications", 50)

So the defaults are 10 seconds and 50 retained applications.

On Tue, Mar 10, 2015 at 12:37 AM Srini Karri skarri@gmail.com wrote:

Hi All, What are the default values for the following conf properties if we don't set them in the conf file?

# spark.history.fs.updateInterval 10
# spark.history.retainedApplications 500

Regards, Srini.
Re: Spark on EC2
This should help you understand the cost of running a Spark cluster for a short period of time: http://www.ec2instances.info/

If you run an instance for even 1 second of a single hour you are charged for that complete hour. So before you shut down your miniature cluster make sure you really are done with what you want to do, as firing up the cluster again will be like using an extra hour's worth of time.

The purpose of EC2's free tier is to get you to buy into AWS services. At the free level it's not terribly useful except for the simplest of web applications (which you could host on Heroku - which also uses AWS - for free) or simple long-running but largely dormant shell processes.

On Tue Feb 24 2015 at 10:16:56 AM Deep Pradhan pradhandeep1...@gmail.com wrote:

Thank You Sean. I was just trying to experiment with the performance of Spark Applications with various worker instances (I hope you remember that we discussed the worker instances). I thought it would be a good one to try in EC2. So, it doesn't work out, does it? Thank You

On Tue, Feb 24, 2015 at 8:40 PM, Sean Owen so...@cloudera.com wrote:

The free tier includes 750 hours of t2.micro instance time per month. http://aws.amazon.com/free/ That's basically a month of hours, so it's all free if you run one instance only at a time. If you run 4, you'll be able to run your cluster of 4 for about a week free.

A t2.micro has 1GB of memory, which is small but something you could possibly get work done with. However it provides only burst CPU. You can only use about 10% of 1 vCPU continuously due to capping. Imagine this as about 1/10th of 1 core on your laptop. It would be incredibly slow. This is not to mention the network and I/O bottlenecks you're likely to run into, as you don't get much provisioning with these free instances.

So, no, you really can't use this for anything that is at all CPU intensive. It's for, say, running a low-traffic web service.
On Tue, Feb 24, 2015 at 2:55 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: Hi, I have just signed up for Amazon AWS because I learnt that it provides service for free for the first 12 months. I want to run Spark on EC2 cluster. Will they charge me for this? Thank You
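Sean's free-tier arithmetic above, worked through with the numbers from his message (750 free t2.micro hours per month, a cluster of 4 instances):

```python
# AWS free tier: 750 t2.micro instance-hours per month (from Sean's message)
free_hours_per_month = 750
cluster_size = 4

# Four instances burn hours four times as fast
hours_free = free_hours_per_month / cluster_size   # 187.5 cluster-hours
days_free = hours_free / 24                        # ~7.8 days, "about a week"

print(round(days_free, 1))  # 7.8
```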
Re: Can't I mix non-Spark properties into a .properties file and pass it to spark-submit via --properties-file?
Emre,

As you are keeping the properties file external to the JAR, you need to make sure to submit the properties file as an additional --files (or whatever the necessary CLI switch is) so all the executors get a copy of the file along with the JAR.

If you know you are going to just put the properties file on HDFS, then why don't you define a custom system setting like properties.url and pass it along? (This is for Spark shell, the only CLI string I have available at the moment:)

spark-shell --jars $JAR_NAME \
  --conf 'properties.url=hdfs://config/stuff.properties' \
  --conf 'spark.executor.extraJavaOptions=-Dproperties.url=hdfs://config/stuff.properties'

... then load the properties file during initialization by examining the properties.url system setting.

I'd still strongly recommend Typesafe Config as it makes this a lot less painful, and I know for certain you can place your *.conf at a URL (using -Dconfig.url=), though it probably won't work with an HDFS URL.

On Tue Feb 17 2015 at 9:53:08 AM Gerard Maas gerard.m...@gmail.com wrote:

+1 for Typesafe Config

Our practice is to include all spark properties under a 'spark' entry in the config file, alongside job-specific configuration. A config file would look like:

spark {
  master =
  cleaner.ttl = 123456
  ...
}
job {
  context {
    src = "foo"
    action = "barAction"
  }
  prop1 = "val1"
}

Then, to create our Spark context, we transparently pass the spark section to a SparkConf instance. This idiom will instantiate the context with the spark-specific configuration:

sparkConfig.setAll(configToStringSeq(config.getConfig("spark").atPath("spark")))

And we can make use of the config object everywhere else. We use the override model of the Typesafe config: reasonable defaults go in the reference.conf (within the jar), environment-specific overrides go in the application.conf (alongside the job jar), and hacks are passed with -Dprop=value :-)

-kr, Gerard.
On Tue, Feb 17, 2015 at 1:45 PM, Emre Sevinc emre.sev...@gmail.com wrote:

I've decided to try

spark-submit ... --conf spark.driver.extraJavaOptions=-DpropertiesFile=/home/emre/data/myModule.properties

But when I try to retrieve the value of propertiesFile via

System.err.println("propertiesFile : " + System.getProperty("propertiesFile"));

I get NULL:

propertiesFile : null

Interestingly, when I run spark-submit with --verbose, I see that it prints:

spark.driver.extraJavaOptions -> -DpropertiesFile=/home/emre/data/belga/schemavalidator.properties

I couldn't understand why I couldn't get the value of propertiesFile by using the standard System.getProperty method. (I can use new SparkConf().get("spark.driver.extraJavaOptions") and manually parse it to retrieve the value, but I'd like to know why I cannot retrieve that value using the System.getProperty method.) Any ideas?

If I can achieve what I've described above properly, I plan to pass a properties file that resides on HDFS, so that it will be available to my driver program wherever that program runs.

-- Emre

On Mon, Feb 16, 2015 at 4:41 PM, Charles Feduke charles.fed...@gmail.com wrote:

I haven't actually tried mixing non-Spark settings into the Spark properties. Instead I package my properties into the jar and use the Typesafe Config[1] - v1.2.1 - library (along with Ficus[2] - Scala specific) to get at my properties:

Properties file: src/main/resources/integration.conf

(below $ENV might be set to either integration or prod[3])

ssh -t root@$HOST /root/spark/bin/spark-shell --jars /root/$JAR_NAME \
  --conf 'config.resource=$ENV.conf' \
  --conf 'spark.executor.extraJavaOptions=-Dconfig.resource=$ENV.conf'

Since the properties file is packaged up with the JAR I don't have to worry about sending the file separately to all of the slave nodes. Typesafe Config is written in Java so it will work if you're not using Scala.
(The Typesafe Config also has the advantage of being extremely easy to integrate with code that is using Java Properties today.)

If you instead want to send the file separately from the JAR and you use the Typesafe Config library, you can specify config.file instead of config.resource; though I'd point you to [3] below if you want to make your development life easier.

1. https://github.com/typesafehub/config
2. https://github.com/ceedubs/ficus
3. http://deploymentzone.com/2015/01/27/spark-ec2-and-easy-spark-shell-deployment/

On Mon Feb 16 2015 at 10:27:01 AM Emre Sevinc emre.sev...@gmail.com wrote:

Hello, I'm using Spark 1.2.1 and have a module.properties file, and in it I have non-Spark properties, as well as Spark properties, e.g.:

job.output.dir=file:///home/emre/data/mymodule/out

I'm trying to pass it to spark-submit via:

spark-submit --class com.myModule --master local[4] --deploy-mode client --verbose --properties-file /home/emre/data/mymodule.properties mymodule.jar

And I
Re: Spark newbie desires feedback on first program
I cannot comment about the correctness of Python code. I will assume your caper_kv is keyed on something that uniquely identifies all the rows that make up the person's record, so your group by key makes sense, as does the map. (I will also assume all of the rows that comprise a single person's record will always fit in memory. If not you will need another approach.)

You should be able to get away with removing the localhost:9000 from your HDFS URL, i.e., hdfs:///sma/processJSON/people, and let your HDFS configuration for Spark supply the missing pieces.

On Mon Feb 16 2015 at 3:38:31 PM Eric Bell e...@ericjbell.com wrote:

I'm a spark newbie working on his first attempt to write an ETL program. I could use some feedback to make sure I'm on the right path. I've written a basic proof of concept that runs without errors and seems to work, although I might be missing some issues when this is actually run on more than a single node.

I am working with data about people (actually healthcare patients). I have an RDD that contains multiple rows per person. My overall goal is to create a single Person object for each person in my data. In this example, I am serializing to JSON, mostly because this is what I know how to do at the moment.

Other than general feedback, is my use of the groupByKey() and mapValues() methods appropriate? Thanks!
import json

class Person:
    def __init__(self):
        self.mydata = {}
        self.cpts = []
        self.mydata['cpt'] = self.cpts

    def addRowData(self, dataRow):
        # Get the CPT codes
        cpt = dataRow.CPT_1
        if cpt:
            self.cpts.append(cpt)

    def serializeToJSON(self):
        return json.dumps(self.mydata)

def makeAPerson(rows):
    person = Person()
    for row in rows:
        person.addRowData(row)
    return person.serializeToJSON()

peopleRDD = caper_kv.groupByKey().mapValues(lambda personDataRows: makeAPerson(personDataRows))
peopleRDD.saveAsTextFile("hdfs://localhost:9000/sma/processJSON/people")
Re: Spark newbie desires feedback on first program
My first problem was somewhat similar to yours. You won't find a whole lot of JDBC-to-Spark examples, since I think a lot of the adoption for Spark is from teams already experienced with Hadoop that already have an established big data solution (so their data is already extracted from whatever sources, e.g., log files, Hive, other M/R jobs). JDBC support is somewhat... lacking.

Our application uses a 12 node PostgreSQL distributed RDBMS that is sharded at the application tier. I had to write my own JDBC RDD to support this logical schema. However, because you are coming from a single MySQL DB, you should be able to get away with using the JdbcRDD[1]... but I cannot find a reference to it for the Python API, so someone familiar with using Python and Spark will have to chime in on that.

You need to consider _how_ the data gets from MySQL to the workers. It might work to pull all of the data to a single node and then parallelize that data across the cluster, but it's not going to be as efficient as range querying from each worker in the cluster to the database. If you're working with TBs of data then you will see very big benefits by distributing the data across workers from the get go; if you don't, it will take however long it takes to copy all the data to a single worker and distribute it, as your startup cost for each execution. (By range querying what I mean is basically what the JdbcRDD does - it forces you to include a conditional statement like "id >= ? AND id <= ?" in your SQL, which it fills in at each worker so each worker only gets a piece of the pie.) The JdbcRDD makes assumptions about numeric keys for range querying.

The next thing to consider is: if you're going against your production database, will massive reads cause degradation for production users? I am using read replicas to mitigate this for our production installation, as copying TBs of data out of PostgreSQL would have some negative effect on our users.
Running your jobs during low traffic is obviously an option here, as is restoring a read-only version from backup and explicitly querying that instance (in which case parallelizing user IDs and querying MySQL directly might get you near to the JdbcRDD behavior). And of course if the MySQL instance is already your analytics solution, then query on.

1. https://spark.apache.org/docs/1.1.0/api/java/org/apache/spark/rdd/JdbcRDD.html

On Mon Feb 16 2015 at 4:42:30 PM Eric Bell e...@ericjbell.com wrote:

Thanks Charles. I just realized a few minutes ago that I neglected to show the step where I generated the key on the person ID. Thanks for the pointer on the HDFS URL.

Next step is to process data from multiple RDDs. My data originates from 7 tables in a MySQL database. I used sqoop to create avro files from these tables, and in turn created RDDs using SparkSQL from the avro files. Since groupByKey only operates on a single RDD, I'm not quite sure yet how I'm going to process 7 tables as a transformation to get all the data I need into my objects.

I'm vacillating on whether I should be doing it this way, or if it would be a lot simpler to query MySQL to get all the Person IDs, parallelize them, and have my Person class make queries directly to the MySQL database. Since in theory I only have to do this once, I'm not sure there's much to be gained in moving the data from MySQL to Spark first. I have yet to find any non-trivial examples of ETL logic on the web ... it seems like it's mostly word count map-reduce replacements.

On 02/16/2015 01:32 PM, Charles Feduke wrote:

I cannot comment about the correctness of Python code. I will assume your caper_kv is keyed on something that uniquely identifies all the rows that make up the person's record, so your group by key makes sense, as does the map. (I will also assume all of the rows that comprise a single person's record will always fit in memory. If not you will need another approach.)
You should be able to get away with removing the localhost:9000 from your HDFS URL, i.e., hdfs:///sma/processJSON/people and let your HDFS configuration for Spark supply the missing pieces.
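The range-query split Charles describes for the JdbcRDD is plain arithmetic: given a numeric key span and a partition count, each worker gets a disjoint (lower, upper) pair to bind into a clause like "id >= ? AND id <= ?". The sketch below mirrors that idea in plain Python; it is illustrative, not Spark's exact splitting code:

```python
def id_ranges(lower, upper, num_partitions):
    """Split the inclusive key span [lower, upper] into num_partitions
    contiguous, disjoint (start, end) ranges - one per worker."""
    length = upper - lower + 1
    ranges = []
    for i in range(num_partitions):
        start = lower + (i * length) // num_partitions
        end = lower + ((i + 1) * length) // num_partitions - 1
        ranges.append((start, end))
    return ranges

# Each worker binds its own pair into "... WHERE id >= ? AND id <= ?"
# so it fetches only its piece of the pie:
print(id_ranges(1, 100, 4))  # [(1, 25), (26, 50), (51, 75), (76, 100)]
```

This is why the JdbcRDD needs a numeric key and explicit lower/upper bounds: without them it cannot hand each partition a disjoint slice of the table.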
Re: Can't I mix non-Spark properties into a .properties file and pass it to spark-submit via --properties-file?
I haven't actually tried mixing non-Spark settings into the Spark properties. Instead I package my properties into the jar and use the Typesafe Config[1] - v1.2.1 - library (along with Ficus[2] - Scala specific) to get at my properties:

Properties file: src/main/resources/integration.conf

(below $ENV might be set to either integration or prod[3])

ssh -t root@$HOST /root/spark/bin/spark-shell --jars /root/$JAR_NAME \
  --conf 'config.resource=$ENV.conf' \
  --conf 'spark.executor.extraJavaOptions=-Dconfig.resource=$ENV.conf'

Since the properties file is packaged up with the JAR I don't have to worry about sending the file separately to all of the slave nodes. Typesafe Config is written in Java so it will work if you're not using Scala.

(The Typesafe Config also has the advantage of being extremely easy to integrate with code that is using Java Properties today.)

If you instead want to send the file separately from the JAR and you use the Typesafe Config library, you can specify config.file instead of config.resource; though I'd point you to [3] below if you want to make your development life easier.

1. https://github.com/typesafehub/config
2. https://github.com/ceedubs/ficus
3.
http://deploymentzone.com/2015/01/27/spark-ec2-and-easy-spark-shell-deployment/

On Mon Feb 16 2015 at 10:27:01 AM Emre Sevinc emre.sev...@gmail.com wrote:

Hello, I'm using Spark 1.2.1 and have a module.properties file, and in it I have non-Spark properties, as well as Spark properties, e.g.:

job.output.dir=file:///home/emre/data/mymodule/out

I'm trying to pass it to spark-submit via:

spark-submit --class com.myModule --master local[4] --deploy-mode client --verbose --properties-file /home/emre/data/mymodule.properties mymodule.jar

And I thought I could read the value of my non-Spark property, namely job.output.dir, by using:

SparkConf sparkConf = new SparkConf();
final String validatedJSONoutputDir = sparkConf.get("job.output.dir");

But it gives me an exception:

Exception in thread "main" java.util.NoSuchElementException: job.output.dir

Is it not possible to mix Spark and non-Spark properties in a single .properties file, then pass it via --properties-file, and then get the values of those non-Spark properties via SparkConf? Or is there another object / method to retrieve the values for those non-Spark properties?

-- Emre Sevinç
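The behaviour Emre ran into is consistent with spark-submit keeping only `spark.`-prefixed keys from a --properties-file and dropping the rest (this filtering claim is my reading of spark-submit's behaviour, so treat it as an assumption). A toy parser in plain Python makes the filtering visible; `split_properties` is a hypothetical helper, not a Spark API:

```python
def split_properties(text):
    """Parse simple key=value lines and mimic the filtering described
    above: only keys starting with 'spark.' would reach SparkConf."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blanks and comments
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    spark = {k: v for k, v in props.items() if k.startswith("spark.")}
    dropped = {k: v for k, v in props.items() if not k.startswith("spark.")}
    return spark, dropped

spark, dropped = split_properties(
    "spark.executor.memory=10709m\n"
    "job.output.dir=file:///home/emre/out\n")
# 'job.output.dir' lands in `dropped`, which is why sparkConf.get(...)
# throws NoSuchElementException for it.
```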
Re: SPARK_LOCAL_DIRS Issue
A central location, such as NFS? If they are temporary for the purpose of further job processing you'll want to keep them local to the nodes in the cluster, i.e., in /tmp. If they are centralized you won't be able to take advantage of data locality, and the central file store will become a bottleneck for further processing.

If /tmp isn't an option because you want to be able to monitor the file outputs as they occur, you can also use HDFS (assuming your Spark nodes are also HDFS members they will benefit from data locality).

It looks like the problem you are seeing is that a lock cannot be acquired on the output file in the central file system.

On Wed Feb 11 2015 at 11:55:55 AM TJ Klein tjkl...@gmail.com wrote:

Hi, Using Spark 1.2 I ran into issues setting SPARK_LOCAL_DIRS to a different path than the local directory. On our cluster we have a folder for temporary files (in a central file system), which is called /scratch. When setting SPARK_LOCAL_DIRS=/scratch/node name I get:

An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopFile.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, XXX): java.io.IOException: Function not implemented
    at sun.nio.ch.FileDispatcherImpl.lock0(Native Method)
    at sun.nio.ch.FileDispatcherImpl.lock(FileDispatcherImpl.java:91)
    at sun.nio.ch.FileChannelImpl.lock(FileChannelImpl.java:1022)
    at java.nio.channels.FileChannel.lock(FileChannel.java:1052)
    at org.apache.spark.util.Utils$.fetchFile(Utils.scala:379)

Using SPARK_LOCAL_DIRS=/tmp, however, works perfectly. Any idea? Best, Tassilo

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SPARK-LOCAL-DIRS-Issue-tp21602.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: SPARK_LOCAL_DIRS Issue
Take a look at this: http://wiki.lustre.org/index.php/Running_Hadoop_with_Lustre Particularly: http://wiki.lustre.org/images/1/1b/Hadoop_wp_v0.4.2.pdf (linked from that article) to get a better idea of what your options are. If it's possible to avoid writing to [any] disk I'd recommend that route, since that's the performance advantage Spark has over vanilla Hadoop. On Wed Feb 11 2015 at 2:10:36 PM Tassilo Klein tjkl...@gmail.com wrote: Thanks for the info. The file system in use is a Lustre file system. Best, Tassilo On Wed, Feb 11, 2015 at 12:15 PM, Charles Feduke charles.fed...@gmail.com wrote: A central location, such as NFS? If they are temporary for the purpose of further job processing you'll want to keep them local to the node in the cluster, i.e., in /tmp. If they are centralized you won't be able to take advantage of data locality and the central file store will become a bottleneck for further processing. If /tmp isn't an option because you want to be able to monitor the file outputs as they occur you can also use HDFS (assuming your Spark nodes are also HDFS members they will benefit from data locality). It looks like the problem you are seeing is that a lock cannot be acquired on the output file in the central file system. On Wed Feb 11 2015 at 11:55:55 AM TJ Klein tjkl...@gmail.com wrote: Hi, Using Spark 1.2 I ran into issues setting SPARK_LOCAL_DIRS to a different path than the local directory. On our cluster we have a folder for temporary files (in a central file system), which is called /scratch. When setting SPARK_LOCAL_DIRS=/scratch/node name I get: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopFile. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, XXX): java.io.IOException: Function not implemented at sun.nio.ch.FileDispatcherImpl.lock0(Native Method) at sun.nio.ch.FileDispatcherImpl.lock(FileDispatcherImpl.java: 91) at sun.nio.ch.FileChannelImpl.lock(FileChannelImpl.java:1022) at java.nio.channels.FileChannel.lock(FileChannel.java:1052) at org.apache.spark.util.Utils$.fetchFile(Utils.scala:379) Using SPARK_LOCAL_DIRS=/tmp, however, works perfectly. Any idea? Best, Tassilo -- View this message in context: http://apache-spark-user-list. 1001560.n3.nabble.com/SPARK-LOCAL-DIRS-Issue-tp21602.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
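For anyone who lands on this thread later, the usual fix is to point SPARK_LOCAL_DIRS at a node-local disk in conf/spark-env.sh on every worker. A minimal sketch (the path here is illustrative, not from Tassilo's cluster):

```shell
# conf/spark-env.sh on each worker node
# Point Spark's scratch space at a node-local disk; comma-separate several
# directories to spread shuffle I/O across multiple disks.
export SPARK_LOCAL_DIRS=/mnt/spark-local
```

The "Function not implemented" in the stack trace comes from FileChannel.lock: Spark takes a file lock on fetched files, and distributed file systems such as Lustre may not implement POSIX file locking, which is consistent with /tmp working while /scratch fails. Restart the workers after editing the file so they pick up the new value.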
Re: Parsing CSV files in Spark
I've been doing a bunch of work with CSVs in Spark, mostly saving them as a merged CSV (instead of the various part-n files). You might find the following links useful: - This article is about combining the part files and outputting a header as the first line in the merged results: http://java.dzone.com/articles/spark-write-csv-file-header - This was my take on the previous author's original article, but it doesn't yet handle the header row: http://deploymentzone.com/2015/01/30/spark-and-merged-csv-files/ spark-csv helps with reading CSV data and mapping a schema for Spark SQL, but as of now doesn't save CSV data. On Fri Feb 06 2015 at 9:49:06 AM Sean Owen so...@cloudera.com wrote: You can do this manually without much trouble: get your files on a distributed store like HDFS, read them with textFile, filter out headers, parse with a CSV library like Commons CSV, select columns, format and store the result. That's tens of lines of code. However you probably want to start by looking at https://github.com/databricks/spark-csv which may make it even easier than that and give you a richer query syntax. On Fri, Feb 6, 2015 at 8:37 AM, Spico Florin spicoflo...@gmail.com wrote: Hi! I'm new to Spark. I have a case study where the data is stored in CSV files. These files have headers with more than 1000 columns. I would like to know what are the best practices for parsing them, in particular the following points: 1. Getting and parsing all the files from a folder 2. What CSV parser do you use? 3. I would like to select just some columns whose names match a pattern and then pass the selected columns' values (plus the column names) to the processing and save the output to a CSV (preserving the selected columns). If you have any experience with some of the points above, it will be really helpful (for me and for the others that will encounter the same cases) if you can share your thoughts. Thanks. 
Regards, Florin
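To make Sean's recipe concrete for point 3 (selecting columns whose names match a pattern), here is a small sketch of just the header/column logic with the Spark wiring left out. `CsvColumnSelect`, `selectColumns`, and `project` are invented names, not from any library, and a real job would use a proper CSV parser for quoted fields rather than split:

```scala
// Given a header row, keep only the columns whose names match a regex
// pattern, and re-emit the kept columns as a CSV line.
object CsvColumnSelect {
  // Indices of the header columns whose names fully match the pattern.
  def selectColumns(header: Array[String], pattern: String): Array[Int] =
    header.zipWithIndex.collect { case (name, i) if name.matches(pattern) => i }

  // Project a row down to the selected indices, comma-joined.
  def project(row: Array[String], indices: Array[Int]): String =
    indices.map(row(_)).mkString(",")

  def main(args: Array[String]): Unit = {
    val header = "id,price_usd,price_eur,comment".split(",")
    val indices = selectColumns(header, "price_.*")
    val row = "42,10.5,9.8,hello".split(",")
    println(project(header, indices)) // the kept column names
    println(project(row, indices))    // the kept values
  }
}
```

In a job you would read the header from the first line of one file, compute the indices once on the driver, and map `project` over the remaining lines before `saveAsTextFile`.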
Re: spark streaming from kafka real time + batch processing in java
Good questions, some of which I'd like to know the answer to. Is it okay to update a NoSQL DB with aggregated counts per batch interval or is it generally stored in HDFS? This depends on how you are going to use the aggregate data. 1. Is there a lot of data? If so, and you are going to use the data as inputs to another job, it might benefit from being distributed across the cluster on HDFS (for data locality). 2. Usually when speaking about aggregates there is substantially less data, in which case storing that data in another datastore is okay. If you're talking about a few thousand rows, and having them in something like Mongo or Postgres makes your life easier (reporting software, for example) - even if you use them as inputs to another job - it's okay to just store the results in another data store. If the data will grow unbounded over time this might not be a good solution (in which case refer to #1). On Fri Feb 06 2015 at 6:16:39 AM Mohit Durgapal durgapalmo...@gmail.com wrote: I want to write a spark streaming consumer for kafka in java. I want to process the data in real-time as well as store the data in hdfs in year/month/day/hour/ format. I am not sure how to achieve this. Should I write separate kafka consumers, one for writing data to HDFS and one for spark streaming? Also I would like to ask what do people generally do with the result of spark streams after aggregating over it? Is it okay to update a NoSQL DB with aggregated counts per batch interval or is it generally stored in hdfs? Is it possible to store the mini batch data from spark streaming to HDFS in a way that the data is aggregated hourly and put into HDFS in its hour folder. I would not want a lot of small files equal to the mini batches of spark per hour, that would be inefficient for running hadoop jobs later. Is anyone working on the same problem? Any help and comments would be great. Regards Mohit
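On the year/month/day/hour layout: one way to avoid many small files is to bucket each batch's output under an hour-partitioned directory and compact closed hours later. A sketch of just the path derivation (the names and base path are illustrative, the streaming wiring is omitted, and it uses java.time, so Java 8+):

```scala
import java.time.format.DateTimeFormatter
import java.time.{Instant, ZoneOffset}

// Derive an hour-bucketed output path (year/month/day/hour) from a
// batch timestamp, e.g. for use as a saveAsTextFile target.
object HourlyPaths {
  private val fmt = DateTimeFormatter
    .ofPattern("yyyy/MM/dd/HH")
    .withZone(ZoneOffset.UTC)

  def hourlyPath(base: String, epochMillis: Long): String =
    s"$base/${fmt.format(Instant.ofEpochMilli(epochMillis))}"

  def main(args: Array[String]): Unit = {
    // 2015-02-06T12:30:00Z falls in the 12:00 hour bucket
    println(hourlyPath("hdfs:///events", 1423225800000L))
  }
}
```

Each mini-batch can write its output under `hourlyPath(base, batchTime)`, and a later compaction job can merge the per-batch files for any hour that has closed, so the downstream Hadoop jobs see a few large files per hour rather than one file per mini-batch.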
Re: How do I set spark.local.dirs?
Did you restart the slaves so they would read the settings? You don't need to start/stop the EC2 cluster, just the slaves. From the master node: $SPARK_HOME/sbin/stop-slaves.sh $SPARK_HOME/sbin/start-slaves.sh ($SPARK_HOME is probably /root/spark) On Fri Feb 06 2015 at 10:31:18 AM Joe Wass jw...@crossref.org wrote: I'm running on EC2 and I want to set the directory to use on the slaves (mounted EBS volumes). I have set: spark.local.dir /vol3/my-spark-dir in /root/spark/conf/spark-defaults.conf and replicated to all nodes. I have verified that in the console the value in the config corresponds. I have checked that these values are present in nodes. But it's still creating temp files in the wrong (default) place: /mnt2/spark How do I get my slaves to pick up this value? How can I verify that they have? Thanks! Joe
Re: spark on ec2
I don't see anything that says you must explicitly restart them to load the new settings, but usually there is some sort of signal trapped [or brute force full restart] to get a configuration reload for most daemons. I'd take a guess and use the $SPARK_HOME/sbin/{stop,start}-slaves.sh scripts on your master node and see. ( http://spark.apache.org/docs/1.2.0/spark-standalone.html#cluster-launch-scripts ) I just tested this out on my integration EC2 cluster and got odd results for stopping the workers (no workers found) but the start script... seemed to work. My integration cluster was running and functioning after executing both scripts, but I also didn't make any changes to spark-env either. On Thu Feb 05 2015 at 9:49:49 PM Kane Kim kane.ist...@gmail.com wrote: Hi, I'm trying to change setting as described here: http://spark.apache.org/docs/1.2.0/ec2-scripts.html export SPARK_WORKER_CORES=6 Then I ran ~/spark-ec2/copy-dir /root/spark/conf to distribute to slaves, but without any effect. Do I have to restart workers? How to do that with spark-ec2? Thanks.
Re: How to design a long live spark application
If you want to design something like Spark shell have a look at: http://zeppelin-project.org/ It's open source and may already do what you need. If not, its source code will be helpful in answering the questions about how to integrate with long running jobs that you have. On Thu Feb 05 2015 at 11:42:56 AM Boromir Widas vcsub...@gmail.com wrote: You can check out https://github.com/spark-jobserver/spark-jobserver - this allows several users to upload their jars and run jobs with a REST interface. However, if all users are using the same functionality, you can write a simple spray server which will act as the driver and hosts the spark context+RDDs, launched in client mode. On Thu, Feb 5, 2015 at 10:25 AM, Shuai Zheng szheng.c...@gmail.com wrote: Hi All, I want to develop a server side application: User submits request -> Server runs spark application and returns (this might take a few seconds). So I want to host the server to keep the long-lived context; I don't know whether this is reasonable or not. Basically I try to have a global JavaSparkContext instance and keep it there, and initialize some RDD. Then my java application will use it to submit the job. So now I have some questions: 1, if I don't close it, will there be any timeout I need to configure on the spark server? 2, In theory I want to design something similar to Spark shell (which also hosts a default sc there), just it is not shell based. Any suggestion? I think my request is very common for application development; there must be someone who has done this before? Regards, Shawn
Re: Writing RDD to a csv file
In case anyone needs to merge all of their part-n files (small result set only) into a single *.csv file or needs to generically flatten case classes, tuples, etc., into comma separated values: http://deploymentzone.com/2015/01/30/spark-and-merged-csv-files/ On Tue Feb 03 2015 at 8:23:59 AM kundan kumar iitr.kun...@gmail.com wrote: Thanks Gerard !! This is working. On Tue, Feb 3, 2015 at 6:44 PM, Gerard Maas gerard.m...@gmail.com wrote: this is more of a scala question, so probably next time you'd like to address a Scala forum eg. http://stackoverflow.com/questions/tagged/scala val optArrStr: Option[Array[String]] = ??? optArrStr.map(arr => arr.mkString(",")).getOrElse("") // empty string or whatever default value you have for this. kr, Gerard. On Tue, Feb 3, 2015 at 2:09 PM, kundan kumar iitr.kun...@gmail.com wrote: I have a RDD which is of type org.apache.spark.rdd.RDD[(String, (Array[String], Option[Array[String]]))] I want to write it as a csv file. Please suggest how this can be done. myrdd.map(line => line._1 + "," + line._2._1.mkString(",") + "," + line._2._2.mkString(",")).saveAsTextFile("hdfs://...") Doing mkString on line._2._1 works but does not work for the Option type. Please suggest how this can be done. Thanks Kundan
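Putting the two pieces of the thread together, here is a self-contained version of the flattening with the Option handled the way Gerard suggests. The object and method names are mine, purely illustrative:

```scala
// Flatten a (key, (required columns, optional columns)) tuple into a CSV
// line, treating a missing optional array as an empty trailing field.
object CsvRow {
  def toCsv(row: (String, (Array[String], Option[Array[String]]))): String = {
    val (key, (required, optional)) = row
    // map/getOrElse turns Some(arr) into "c,d" and None into ""
    val tail = optional.map(arr => arr.mkString(",")).getOrElse("")
    s"$key,${required.mkString(",")},$tail"
  }

  def main(args: Array[String]): Unit = {
    println(toCsv(("k1", (Array("a", "b"), Some(Array("c", "d"))))))
    println(toCsv(("k2", (Array("a", "b"), None))))
  }
}
```

Note that the None case leaves a trailing empty field ("k2,a,b,"); drop the trailing comma instead if your consumer objects. In the Spark job this becomes `myrdd.map(CsvRow.toCsv).saveAsTextFile(...)`.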
Re: groupByKey is not working
You'll still need to: import org.apache.spark.SparkContext._ Importing org.apache.spark._ does _not_ recurse into sub-objects or sub-packages, it only brings in whatever is at the level of the package or object imported. SparkContext._ has some implicits, one of them for adding groupByKey to an RDD[_] IIRC. On Fri Jan 30 2015 at 3:48:22 PM Stephen Boesch java...@gmail.com wrote: Amit - IJ will not find it until you add the import as Sean mentioned. It includes implicits that intellij will not know about otherwise. 2015-01-30 12:44 GMT-08:00 Amit Behera amit.bd...@gmail.com: I am sorry Sean. I am developing code in intelliJ Idea. so with the above dependencies I am not able to find *groupByKey* when I am searching by ctrl+space On Sat, Jan 31, 2015 at 2:04 AM, Sean Owen so...@cloudera.com wrote: When you post a question anywhere, and say it's not working, you *really* need to say what that means. On Fri, Jan 30, 2015 at 8:20 PM, Amit Behera amit.bd...@gmail.com wrote: hi all, my sbt file is like this: name := "Spark" version := "1.0" scalaVersion := "2.10.4" libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.0" libraryDependencies += "net.sf.opencsv" % "opencsv" % "2.3" code: object SparkJob { def pLines(lines: Iterator[String]) = { val parser = new CSVParser() lines.map(l => { val vs = parser.parseLine(l); (vs(0), vs(1).toInt) }) } def main(args: Array[String]) { val conf = new SparkConf().setAppName("Spark Job").setMaster("local") val sc = new SparkContext(conf) val data = sc.textFile("/home/amit/testData.csv").cache() val result = data.mapPartitions(pLines).groupByKey //val list = result.filter(x => (x._1).contains("24050881")) } } Here groupByKey is not working. But same thing is working from spark-shell. Please help me Thanks Amit
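For anyone puzzled by why an import changes what methods exist: groupByKey is not defined on RDD itself; in Spark 1.x the import brings an implicit conversion into scope that wraps pair RDDs. A pure-Scala analogue of the same pattern, with no Spark dependency and invented names:

```scala
import scala.language.implicitConversions

// An "extension" method added to sequences of pairs via an implicit
// conversion, mimicking how SparkContext._ adds groupByKey to RDD[(K, V)].
object Implicits {
  class PairOps[K, V](xs: Seq[(K, V)]) {
    def groupByKeyLike: Map[K, Seq[V]] =
      xs.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
  }
  implicit def toPairOps[K, V](xs: Seq[(K, V)]): PairOps[K, V] = new PairOps(xs)
}

object Demo {
  def main(args: Array[String]): Unit = {
    // Without `import Implicits._` the next line would not compile,
    // which is exactly what the IDE reports for groupByKey on an RDD.
    import Implicits._
    println(Seq(("a", 1), ("a", 2), ("b", 3)).groupByKeyLike)
  }
}
```

Spark 1.x's `rddToPairRDDFunctions` in the SparkContext companion object works the same way, which is also why IntelliJ's completion cannot offer groupByKey until the import is added.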
Re: Serialized task result size exceeded
Are you using the default Java object serialization, or have you tried Kryo yet? If you haven't tried Kryo please do and let me know how much it impacts the serialization size. (I know it's more efficient, I'm curious to know how much more efficient, and I'm being lazy - I don't have ~6K 500MB files on hand.) You can saveAsObjectFile on maybe a take(1) from an RDD and examine the serialized output to see if maybe a much larger graph than you expect is being output. On Fri Jan 30 2015 at 3:47:31 PM ankits ankitso...@gmail.com wrote: This is on spark 1.2 I am loading ~6k parquet files, roughly 500 MB each into a schemaRDD, and calling count() on it. After loading about 2705 tasks (there is one per file), the job crashes with this error: Total size of serialized results of 2705 tasks (1024.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB) This indicates that the results of each task are about 1024/2705 = 0.38 MB each. Is that normal? I don't know exactly what the result of each task would be, but that much per task for a count() seems too high. Can anyone offer an explanation as to what the normal size should be if this is too high, or ways to reduce this? Thanks. -- View this message in context: http://apache-spark-user-list. 1001560.n3.nabble.com/Serialized-task-result-size-exceeded-tp21449.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
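If you want to try both knobs this thread points at, a sketch of the submit-time configuration (the class and jar names are placeholders for your job):

```shell
# Try Kryo serialization, and/or raise the driver-side cap on collected
# task results that the error message names.
./bin/spark-submit \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.driver.maxResultSize=2g \
  --class your.app.Main your-app.jar
```

Setting spark.driver.maxResultSize to 0 disables the limit entirely, though that only hides the underlying question of why a count() returns so much data per task.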
Re: groupByKey is not working
Define not working. Not compiling? If so you need: import org.apache.spark.SparkContext._ On Fri Jan 30 2015 at 3:21:45 PM Amit Behera amit.bd...@gmail.com wrote: hi all, my sbt file is like this: name := "Spark" version := "1.0" scalaVersion := "2.10.4" libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.0" libraryDependencies += "net.sf.opencsv" % "opencsv" % "2.3" *code:* object SparkJob { def pLines(lines: Iterator[String]) = { val parser = new CSVParser() lines.map(l => { val vs = parser.parseLine(l); (vs(0), vs(1).toInt) }) } def main(args: Array[String]) { val conf = new SparkConf().setAppName("Spark Job").setMaster("local") val sc = new SparkContext(conf) val data = sc.textFile("/home/amit/testData.csv").cache() val result = data.mapPartitions(pLines).groupByKey //val list = result.filter(x => (x._1).contains("24050881")) } } Here groupByKey is not working. But same thing is working from *spark-shell.* Please help me Thanks Amit
Re: Exception when using HttpSolrServer (httpclient) from within Spark Streaming: java.lang.NoSuchMethodError: org.apache.http.impl.conn.SchemeRegistryFactory.createSystemDefault()Lorg/apache/http/con
I deal with problems like this so often across Java applications with large dependency trees. Add the shell function at the following link to your shell on the machine where your Spark Streaming is installed: https://gist.github.com/cfeduke/fe63b12ab07f87e76b38 Then run in the directory where your JAR files are: find-java-class SchemeRegistryFactory (I know you said HttpClient but the error seems to be an overload or method of SchemeRegistryFactory is missing from the class that is loaded by the class loader. The class loader loads the first class it finds that matches the package/class name coordinates.) You'll then be able to zero in on the JAR that is bringing in an older version of that class. Once you've done that you can exclude that JAR's older dependency in your pom. If you find out that the newer version is incompatible you'll have to perform some magic with the Maven shade plugin. On Wed Jan 28 2015 at 8:00:22 AM Emre Sevinc emre.sev...@gmail.com wrote: Hello, I'm using *Spark 1.1.0* and *Solr 4.10.3*. 
I'm getting an exception when using *HttpSolrServer* from within Spark Streaming: 15/01/28 13:42:52 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) java.lang.NoSuchMethodError: org.apache.http.impl.conn.SchemeRegistryFactory.createSystemDefault()Lorg/apache/http/conn/scheme/SchemeRegistry; at org.apache.http.impl.client.SystemDefaultHttpClient.createClientConnectionManager(SystemDefaultHttpClient.java:121) at org.apache.http.impl.client.AbstractHttpClient.getConnectionManager(AbstractHttpClient.java:445) at org.apache.solr.client.solrj.impl.HttpClientUtil.setMaxConnections(HttpClientUtil.java:206) at org.apache.solr.client.solrj.impl.HttpClientConfigurer.configure(HttpClientConfigurer.java:35) at org.apache.solr.client.solrj.impl.HttpClientUtil.configureClient(HttpClientUtil.java:142) at org.apache.solr.client.solrj.impl.HttpClientUtil.createClient(HttpClientUtil.java:118) at org.apache.solr.client.solrj.impl.HttpSolrServer.init(HttpSolrServer.java:168) at org.apache.solr.client.solrj.impl.HttpSolrServer.init(HttpSolrServer.java:141) Normally, when I use my utility class that uses SolrJ to connect to a Solr server and run it by itself (running it stand-alone without Spark), everything works as expected. But when I invoke that utility class inside a Spark Streaming application, I get the exception above as soon as it is trying to establish a connection to the Solr server. My preliminary Internet search led me to believe that some Spark or Hadoop components bring an older version of *httpclient*, so I've tried to exclude them in my pom.xml. But I still get the same exception. Any ideas why? Or how can I fix it? 
When I analyze my pom.xml dependencies, I get: $ mvn dependency:tree -Ddetail=true | grep http [INFO] | | \- org.eclipse.jetty:jetty-http:jar:8.1.14.v20131031:provided [INFO] | +- org.apache.httpcomponents:httpclient:jar:4.3.1:compile [INFO] | +- org.apache.httpcomponents:httpcore:jar:4.3:compile [INFO] | +- org.apache.httpcomponents:httpmime:jar:4.3.1:compile The whole dependency tree is: $ mvn dependency:tree -Ddetail=true [INFO] Scanning for projects... [INFO] [INFO] [INFO] Building bigcontent 1.0-SNAPSHOT [INFO] [INFO] [INFO] --- maven-dependency-plugin:2.8:tree (default-cli) @ bigcontent --- [INFO] bigcontent:bigcontent:jar:1.0-SNAPSHOT [INFO] +- org.apache.spark:spark-streaming_2.10:jar:1.1.1:provided [INFO] | +- org.apache.spark:spark-core_2.10:jar:1.1.1:provided [INFO] | | +- org.apache.curator:curator-recipes:jar:2.4.0:provided [INFO] | | | \- org.apache.curator:curator-framework:jar:2.4.0:provided [INFO] | | | \- org.apache.curator:curator-client:jar:2.4.0:provided [INFO] | | +- org.eclipse.jetty:jetty-plus:jar:8.1.14.v20131031:provided [INFO] | | | +- org.eclipse.jetty.orbit:javax.transaction:jar:1.1.1.v201105210645:provided [INFO] | | | +- org.eclipse.jetty:jetty-webapp:jar:8.1.14.v20131031:provided [INFO] | | | | +- org.eclipse.jetty:jetty-xml:jar:8.1.14.v20131031:provided [INFO] | | | | \- org.eclipse.jetty:jetty-servlet:jar:8.1.14.v20131031:provided [INFO] | | | \- org.eclipse.jetty:jetty-jndi:jar:8.1.14.v20131031:provided [INFO] | | | \- org.eclipse.jetty.orbit:javax.mail.glassfish:jar:1.4.1.v201005082020:provided [INFO] | | |\- org.eclipse.jetty.orbit:javax.activation:jar:1.1.0.v201105071233:provided [INFO] | | +- org.eclipse.jetty:jetty-security:jar:8.1.14.v20131031:provided [INFO] | | +- org.eclipse.jetty:jetty-util:jar:8.1.14.v20131031:provided [INFO] | | +- org.apache.commons:commons-lang3:jar:3.3.2:provided [INFO] | | +- org.slf4j:jul-to-slf4j:jar:1.7.5:provided [INFO] | | +-
Re: spark 1.2 ec2 launch script hang
Yeah, I agree ~ should work. And it could have been [read: probably was] the fact that one of the EC2 hosts was in my known_hosts (don't know, never saw an error message, but the behavior is no error message for that state), which I had fixed later with Pete's patch. But the second execution when things worked with an absolute path could have worked because the random hosts that came up on EC2 were never in my known_hosts. On Wed Jan 28 2015 at 3:45:36 PM Nicholas Chammas nicholas.cham...@gmail.com wrote: Hmm, I can’t see why using ~ would be problematic, especially if you confirm that echo ~/path/to/pem expands to the correct path to your identity file. If you have a simple reproduction of the problem, please send it over. I’d love to look into this. When I pass paths with ~ to spark-ec2 on my system, it works fine. I’m using bash, but zsh handles tilde expansion the same as bash. Nick On Wed Jan 28 2015 at 3:30:08 PM Charles Feduke charles.fed...@gmail.com wrote: It was only hanging when I specified the path with ~ I never tried relative. Hanging on the waiting for ssh to be ready on all hosts. I let it sit for about 10 minutes then I found the StackOverflow answer that suggested specifying an absolute path, cancelled, and re-run with --resume and the absolute path and all slaves were up in a couple minutes. (I've stood up 4 integration clusters and 2 production clusters on EC2 since with no problems.) On Wed Jan 28 2015 at 12:05:43 PM Nicholas Chammas nicholas.cham...@gmail.com wrote: Ey-chih, That makes more sense. This is a known issue that will be fixed as part of SPARK-5242 https://issues.apache.org/jira/browse/SPARK-5242. Charles, Thanks for the info. In your case, when does spark-ec2 hang? Only when the specified path to the identity file doesn't exist? Or also when you specify the path as a relative path or with ~? Nick On Wed Jan 28 2015 at 9:29:34 AM ey-chih chow eyc...@hotmail.com wrote: We found the problem and already fixed it. 
Basically, spark-ec2 requires ec2 instances to have external ip addresses. You need to specify this in the AWS console. -- From: nicholas.cham...@gmail.com Date: Tue, 27 Jan 2015 17:19:21 + Subject: Re: spark 1.2 ec2 launch script hang To: charles.fed...@gmail.com; pzybr...@gmail.com; eyc...@hotmail.com CC: user@spark.apache.org For those who found that absolute vs. relative path for the pem file mattered, what OS and shell are you using? What version of Spark are you using? ~/ vs. absolute path shouldn't matter. Your shell will expand the ~/ to the absolute path before sending it to spark-ec2. (i.e. tilde expansion.) Absolute vs. relative path (e.g. ../../path/to/pem) also shouldn't matter, since we fixed that for Spark 1.2.0 https://issues.apache.org/jira/browse/SPARK-4137. Maybe there's some case that we missed? Nick On Tue Jan 27 2015 at 10:10:29 AM Charles Feduke charles.fed...@gmail.com wrote: Absolute path means no ~ and also verify that you have the path to the file correct. For some reason the Python code does not validate that the file exists and will hang (this is the same reason why ~ hangs). On Mon, Jan 26, 2015 at 10:08 PM Pete Zybrick pzybr...@gmail.com wrote: Try using an absolute path to the pem file On Jan 26, 2015, at 8:57 PM, ey-chih chow eyc...@hotmail.com wrote: Hi, I used the spark-ec2 script of spark 1.2 to launch a cluster. I have modified the script according to https://github.com/grzegorz-dubicki/spark/commit/5dd8458d2ab 9753aae939b3bb33be953e2c13a70 But the script was still hung at the following message: Waiting for cluster to enter 'ssh-ready' state. Any additional thing I should do to make it succeed? Thanks. Ey-Chih Chow -- View this message in context: http://apache-spark-user-list. 1001560.n3.nabble.com/spark-1-2-ec2-launch-script-hang-tp21381.html Sent from the Apache Spark User List mailing list archive at Nabble.com. 
Re: spark 1.2 ec2 launch script hang
It was only hanging when I specified the path with ~ I never tried relative. Hanging on the waiting for ssh to be ready on all hosts. I let it sit for about 10 minutes then I found the StackOverflow answer that suggested specifying an absolute path, cancelled, and re-run with --resume and the absolute path and all slaves were up in a couple minutes. (I've stood up 4 integration clusters and 2 production clusters on EC2 since with no problems.) On Wed Jan 28 2015 at 12:05:43 PM Nicholas Chammas nicholas.cham...@gmail.com wrote: Ey-chih, That makes more sense. This is a known issue that will be fixed as part of SPARK-5242 https://issues.apache.org/jira/browse/SPARK-5242. Charles, Thanks for the info. In your case, when does spark-ec2 hang? Only when the specified path to the identity file doesn't exist? Or also when you specify the path as a relative path or with ~? Nick On Wed Jan 28 2015 at 9:29:34 AM ey-chih chow eyc...@hotmail.com wrote: We found the problem and already fixed it. Basically, spark-ec2 requires ec2 instances to have external ip addresses. You need to specify this in the AWS console. -- From: nicholas.cham...@gmail.com Date: Tue, 27 Jan 2015 17:19:21 + Subject: Re: spark 1.2 ec2 launch script hang To: charles.fed...@gmail.com; pzybr...@gmail.com; eyc...@hotmail.com CC: user@spark.apache.org For those who found that absolute vs. relative path for the pem file mattered, what OS and shell are you using? What version of Spark are you using? ~/ vs. absolute path shouldn't matter. Your shell will expand the ~/ to the absolute path before sending it to spark-ec2. (i.e. tilde expansion.) Absolute vs. relative path (e.g. ../../path/to/pem) also shouldn't matter, since we fixed that for Spark 1.2.0 https://issues.apache.org/jira/browse/SPARK-4137. Maybe there's some case that we missed? Nick On Tue Jan 27 2015 at 10:10:29 AM Charles Feduke charles.fed...@gmail.com wrote: Absolute path means no ~ and also verify that you have the path to the file correct. 
For some reason the Python code does not validate that the file exists and will hang (this is the same reason why ~ hangs). On Mon, Jan 26, 2015 at 10:08 PM Pete Zybrick pzybr...@gmail.com wrote: Try using an absolute path to the pem file On Jan 26, 2015, at 8:57 PM, ey-chih chow eyc...@hotmail.com wrote: Hi, I used the spark-ec2 script of spark 1.2 to launch a cluster. I have modified the script according to https://github.com/grzegorz-dubicki/spark/commit/5dd8458d2ab 9753aae939b3bb33be953e2c13a70 But the script was still hung at the following message: Waiting for cluster to enter 'ssh-ready' state. Any additional thing I should do to make it succeed? Thanks. Ey-Chih Chow -- View this message in context: http://apache-spark-user-list. 1001560.n3.nabble.com/spark-1-2-ec2-launch-script-hang-tp21381.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Exception when using HttpSolrServer (httpclient) from within Spark Streaming: java.lang.NoSuchMethodError: org.apache.http.impl.conn.SchemeRegistryFactory.createSystemDefault()Lorg/apache/http/con
It looks like you're shading in the Apache HTTP commons library and it's a different version than what is expected. (Maybe 4.6.x based on the Javadoc.) I see you are attempting to exclude commons-httpclient by using: <exclusion> <groupId>commons-httpclient</groupId> <artifactId>*</artifactId> </exclusion> in your pom. However, what I think you really want is: <exclusion> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> </exclusion> The last time the groupId was commons-httpclient was Aug 2007 as version 3.1 (search.maven.com). I hope none of your dependencies rely on that particular version. SchemeRegistryFactory was introduced in version 4.3.1 of httpcomponents so even if by chance one of them did rely on commons-httpclient there wouldn't be a class conflict. On Wed Jan 28 2015 at 9:19:20 AM Emre Sevinc emre.sev...@gmail.com wrote: This is what I get: ./bigcontent-1.0-SNAPSHOT.jar:org/apache/http/impl/conn/SchemeRegistryFactory.class (probably because I'm using a self-contained JAR). In other words, I'm still stuck. -- Emre On Wed, Jan 28, 2015 at 2:47 PM, Charles Feduke charles.fed...@gmail.com wrote: I deal with problems like this so often across Java applications with large dependency trees. Add the shell function at the following link to your shell on the machine where your Spark Streaming is installed: https://gist.github.com/cfeduke/fe63b12ab07f87e76b38 Then run in the directory where your JAR files are: find-java-class SchemeRegistryFactory (I know you said HttpClient but the error seems to be an overload or method of SchemeRegistryFactory is missing from the class that is loaded by the class loader. The class loader loads the first class it finds that matches the package/class name coordinates.) You'll then be able to zero in on the JAR that is bringing in an older version of that class. Once you've done that you can exclude that JAR's older dependency in your pom. 
If you find out that the newer version is incompatible you'll have to perform some magic with the Maven shade plugin. On Wed Jan 28 2015 at 8:00:22 AM Emre Sevinc emre.sev...@gmail.com wrote: Hello, I'm using *Spark 1.1.0* and *Solr 4.10.3*. I'm getting an exception when using *HttpSolrServer* from within Spark Streaming: 15/01/28 13:42:52 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) java.lang.NoSuchMethodError: org.apache.http.impl.conn.SchemeRegistryFactory.createSystemDefault()Lorg/apache/http/conn/scheme/SchemeRegistry; at org.apache.http.impl.client.SystemDefaultHttpClient.createClientConnectionManager(SystemDefaultHttpClient.java:121) at org.apache.http.impl.client.AbstractHttpClient.getConnectionManager(AbstractHttpClient.java:445) at org.apache.solr.client.solrj.impl.HttpClientUtil.setMaxConnections(HttpClientUtil.java:206) at org.apache.solr.client.solrj.impl.HttpClientConfigurer.configure(HttpClientConfigurer.java:35) at org.apache.solr.client.solrj.impl.HttpClientUtil.configureClient(HttpClientUtil.java:142) at org.apache.solr.client.solrj.impl.HttpClientUtil.createClient(HttpClientUtil.java:118) at org.apache.solr.client.solrj.impl.HttpSolrServer.init(HttpSolrServer.java:168) at org.apache.solr.client.solrj.impl.HttpSolrServer.init(HttpSolrServer.java:141) Normally, when I use my utility class that uses SolrJ to connect to a Solr server and run it by itself (running it stand-alone without Spark), everything works as expected. But when I invoke that utility class inside a Spark Streaming application, I get the exception above as soon as it is trying to establish a connection to the Solr server. My preliminary Internet search led me to believe that some Spark or Hadoop components bring an older version of *httpclient*, so I've tried to exclude them in my pom.xml. But I still get the same exception. Any ideas why? Or how can I fix it? 
When I analyze my pom.xml dependencies, I get: $ mvn dependency:tree -Ddetail=true | grep http [INFO] | | \- org.eclipse.jetty:jetty-http:jar:8.1.14.v20131031:provided [INFO] | +- org.apache.httpcomponents:httpclient:jar:4.3.1:compile [INFO] | +- org.apache.httpcomponents:httpcore:jar:4.3:compile [INFO] | +- org.apache.httpcomponents:httpmime:jar:4.3.1:compile The whole dependency tree is: $ mvn dependency:tree -Ddetail=true [INFO] Scanning for projects... [INFO] [INFO] [INFO] Building bigcontent 1.0-SNAPSHOT [INFO] [INFO] [INFO] --- maven-dependency-plugin:2.8:tree (default-cli) @ bigcontent --- [INFO] bigcontent:bigcontent:jar:1.0-SNAPSHOT [INFO] +- org.apache.spark:spark-streaming_2.10:jar:1.1.1:provided [INFO] | +- org.apache.spark:spark-core_2.10:jar:1.1.1:provided [INFO] | | +- org.apache.curator:curator-recipes:jar
Re: Spark and S3 server side encryption
I have been trying to work around a similar problem with my Typesafe config *.conf files seemingly not appearing on the executors. (Though now that I think about it, it's not because the files are absent in the JAR, but because the -Dconf.resource system property I pass to the master obviously doesn't get relayed to the workers.) What happens if you do something like this:

nohup ./bin/spark-submit --verbose --jars lib/app.jar \
  --master spark://master-amazonaws.com:7077 \
  --class com.elsevier.spark.SparkSync \
  --conf spark.executor.extraJavaOptions=-Ds3service.server-side-encryption=AES256 lib/app.jar out.log

(I bet this will fix my problem too.)

On Wed Jan 28 2015 at 10:17:09 AM Kohler, Curt E (ELS-STL) c.koh...@elsevier.com wrote:

So, following up on your suggestion, I'm still having some problems getting the configuration changes recognized when my jobs run. I’ve added jets3t.properties to the root of my application jar file that I submit to Spark (via spark-submit). I’ve verified that my jets3t.properties is at the root of my application jar by executing jar tf app.jar. I submit my job to the cluster with the following command:

nohup ./bin/spark-submit --verbose --jars lib/app.jar --master spark://master-amazonaws.com:7077 --class com.elsevier.spark.SparkSync lib/app.jar out.log

In the mainline of app.jar, I also added the following code:

log.info(System.getProperty("java.class.path"));
InputStream in = SparkSync.class.getClassLoader().getResourceAsStream("jets3t.properties");
log.info(getStringFromInputStream(in));

And I can see that the jets3t.properties I provided is found because it outputs: s3service.server-side-encryption=AES256

It’s almost as if the hadoop/jets3t piece has already been initialized and is ignoring my jets3t.properties. I can get this all working inside of Eclipse by including the folder containing my jets3t.properties. But I can’t get things working when trying to submit this to a Spark stand-alone cluster. Any insights would be appreciated.
--
*From:* Thomas Demoor thomas.dem...@amplidata.com
*Sent:* Tuesday, January 27, 2015 4:41 AM
*To:* Kohler, Curt E (ELS-STL)
*Cc:* user@spark.apache.org
*Subject:* Re: Spark and S3 server side encryption

Spark uses the Hadoop filesystems. I assume you are trying to use s3n://, which, under the hood, uses the 3rd-party jets3t library. It is configured through the jets3t.properties file (google "hadoop s3n jets3t"), which you should put on Spark's classpath. The setting you are looking for is s3service.server-side-encryption. The latest version of Hadoop (2.6) introduces a new and improved s3a:// filesystem which has the official SDK from Amazon under the hood.

On Mon, Jan 26, 2015 at 10:01 PM, curtkohler c.koh...@elsevier.com wrote:

We are trying to create a Spark job that writes out a file to S3 that leverages S3's server-side encryption for sensitive data. Typically this is accomplished by setting the appropriate header on the put request, but it isn't clear whether this capability is exposed in the Spark/Hadoop APIs. Does anyone have any suggestions?

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-and-S3-server-side-encryption-tp21377.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
Re: Exception when using HttpSolrServer (httpclient) from within Spark Streaming: java.lang.NoSuchMethodError: org.apache.http.impl.conn.SchemeRegistryFactory.createSystemDefault()Lorg/apache/http/con
Yeah it sounds like your original exclusion of commons-httpclient from hadoop-* was correct, but it's still coming in from somewhere. Can you try something like this?

<dependency>
  <groupId>commons-httpclient</groupId>
  <artifactId>commons-httpclient</artifactId>
  <scope>provided</scope>
</dependency>

ref: http://stackoverflow.com/questions/4716310/is-there-a-way-to-exclude-a-maven-dependency-globally

(I don't know if a provided dependency will work without a specific version number so I'm just making a guess here.)

On Wed Jan 28 2015 at 11:24:02 AM Emre Sevinc emre.sev...@gmail.com wrote:

When I examine the dependencies again, I see that the SolrJ library is using v. 4.3.1 of org.apache.httpcomponents:httpclient:

[INFO] +- org.apache.solr:solr-solrj:jar:4.10.3:compile
[INFO] | +- org.apache.httpcomponents:httpclient:jar:4.3.1:compile <==
[INFO] | +- org.apache.httpcomponents:httpcore:jar:4.3:compile
[INFO] | +- org.apache.httpcomponents:httpmime:jar:4.3.1:compile
[INFO] | +- org.codehaus.woodstox:wstx-asl:jar:3.2.7:compile
[INFO] | \- org.noggit:noggit:jar:0.5:compile

But hadoop-common 2.4.0 is using v. 3.1 of commons-httpclient:commons-httpclient:

[INFO] +- org.apache.hadoop:hadoop-common:jar:2.4.0:provided
[INFO] | +- commons-cli:commons-cli:jar:1.2:compile
[INFO] | +- org.apache.commons:commons-math3:jar:3.1.1:provided
[INFO] | +- xmlenc:xmlenc:jar:0.52:compile
[INFO] | +- commons-httpclient:commons-httpclient:jar:3.1:provided <===
[INFO] | +- commons-codec:commons-codec:jar:1.4:compile

So my reasoning was: I have to exclude v. 3.1 of commons-httpclient:commons-httpclient and force it to use the httpclient v. 4.3.1 that SolrJ declares as a dependency.
But apparently it somehow does not work. I mean, I have also tried your latest suggestion (changed the 'exclusion' to org.apache.httpcomponents and httpclient), and I'm still getting the same exception:

java.lang.NoSuchMethodError: org.apache.http.impl.conn.SchemeRegistryFactory.createSystemDefault()Lorg/apache/http/conn/scheme/SchemeRegistry;
    at org.apache.http.impl.client.SystemDefaultHttpClient.createClientConnectionManager(SystemDefaultHttpClient.java:121)
    at org.apache.http.impl.client.AbstractHttpClient.getConnectionManager(AbstractHttpClient.java:445)
    at org.apache.solr.client.solrj.impl.HttpClientUtil.setMaxConnections(HttpClientUtil.java:206)
    at org.apache.solr.client.solrj.impl.HttpClientConfigurer.configure(HttpClientConfigurer.java:35)
    at org.apache.solr.client.solrj.impl.HttpClientUtil.configureClient(HttpClientUtil.java:142)
    at org.apache.solr.client.solrj.impl.HttpClientUtil.createClient(HttpClientUtil.java:118)
    at org.apache.solr.client.solrj.impl.HttpSolrServer.<init>(HttpSolrServer.java:168)
    at org.apache.solr.client.solrj.impl.HttpSolrServer.<init>(HttpSolrServer.java:141)

Maybe it is about Hadoop 2.4.0, but I think this is what is included in the binary download of Spark. I've also tried it with the Spark 1.2.0 binary (pre-built for Hadoop 2.4 and later). Or maybe I'm totally wrong, and the problem / fix is something completely different?

-- Emre

On Wed, Jan 28, 2015 at 4:58 PM, Charles Feduke charles.fed...@gmail.com wrote:

It looks like you're shading in the Apache HTTP commons library and it's a different version than what is expected. (Maybe 4.6.x based on the Javadoc.) I see you are attempting to exclude commons-httpclient by using:

<exclusion>
  <groupId>commons-httpclient</groupId>
  <artifactId>*</artifactId>
</exclusion>

in your pom. However, what I think you really want is:

<exclusion>
  <groupId>org.apache.httpcomponents</groupId>
  <artifactId>httpclient</artifactId>
</exclusion>

The last time the groupId was commons-httpclient was Aug 2007, as version 3.1 (search.maven.com).
I hope none of your dependencies rely on that particular version. SchemeRegistryFactory was introduced in version 4.3.1 of httpcomponents, so even if by chance one of them did rely on commons-httpclient there wouldn't be a class conflict.

On Wed Jan 28 2015 at 9:19:20 AM Emre Sevinc emre.sev...@gmail.com wrote:

This is what I get:

./bigcontent-1.0-SNAPSHOT.jar:org/apache/http/impl/conn/SchemeRegistryFactory.class

(probably because I'm using a self-contained JAR). In other words, I'm still stuck.

-- Emre

On Wed, Jan 28, 2015 at 2:47 PM, Charles Feduke charles.fed...@gmail.com wrote:

I deal with problems like this so often across Java applications with large dependency trees. Add the shell function at the following link to your shell on the machine where your Spark Streaming is installed:

https://gist.github.com/cfeduke/fe63b12ab07f87e76b38

Then run in the directory where your JAR files are:

find-java-class SchemeRegistryFactory

(I know you said HttpClient, but the error seems to be that an overload or method of SchemeRegistryFactory is missing from the class that is loaded by the class loader. The class loader
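The class-scanning helper referenced in the gist above can also be sketched in a few lines of Python; this version reads each JAR's entry list directly with `zipfile` rather than grepping `unzip -l` output, and the function name simply mirrors the shell helper:

```python
import os
import zipfile

def find_java_class(fragment, directory="."):
    """Scan every JAR in `directory` and return (jar, entry) pairs for
    entries whose path contains `fragment`.  Useful for locating which
    JAR actually supplies a class like SchemeRegistryFactory."""
    hits = []
    for name in sorted(os.listdir(directory)):
        if not name.endswith(".jar"):
            continue
        with zipfile.ZipFile(os.path.join(directory, name)) as jar:
            hits.extend((name, entry)
                        for entry in jar.namelist()
                        if fragment in entry)
    return hits
```

Running it against the directory holding your assembly and Spark's lib JARs shows every copy of the class on the classpath, which is the quickest way to spot a duplicate.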
Re: spark 1.2 ec2 launch script hang
Absolute path means no ~, and also verify that you have the path to the file correct. For some reason the Python code does not validate that the file exists and will hang (this is the same reason why ~ hangs).

On Mon, Jan 26, 2015 at 10:08 PM Pete Zybrick pzybr...@gmail.com wrote: Try using an absolute path to the pem file.

On Jan 26, 2015, at 8:57 PM, ey-chih chow eyc...@hotmail.com wrote:

Hi, I used the spark-ec2 script of Spark 1.2 to launch a cluster. I have modified the script according to https://github.com/grzegorz-dubicki/spark/commit/5dd8458d2ab9753aae939b3bb33be953e2c13a70 but the script was still hung at the following message:

Waiting for cluster to enter 'ssh-ready' state.

Any additional thing I should do to make it succeed? Thanks.

Ey-Chih Chow

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-1-2-ec2-launch-script-hang-tp21381.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
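The hang could be avoided by validating the identity-file path up front instead of letting the SSH wait loop retry forever. A minimal sketch of that idea — the function name is hypothetical, not part of spark-ec2 itself:

```python
import os
import sys

def resolve_identity_file(path):
    """Expand ~ to the user's home directory and fail fast if the .pem
    file does not exist, rather than hanging on endless SSH retries."""
    expanded = os.path.abspath(os.path.expanduser(path))
    if not os.path.isfile(expanded):
        sys.exit("Identity file not found: %s" % expanded)
    return expanded
```

A check like this, called on the --identity-file argument before the "Waiting for cluster to enter 'ssh-ready' state" loop begins, would turn both failure modes (unexpanded ~ and a simple typo in the path) into an immediate, explicit error.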
Re: HW imbalance
You should look at using Mesos. It should abstract away the individual hosts into a pool of resources and make the different physical specifications manageable. I haven't tried configuring Spark standalone mode to have different specs on different machines, but based on spark-env.sh.template:

# - SPARK_WORKER_CORES, to set the number of cores to use on this machine
# - SPARK_WORKER_MEMORY, to set how much total memory workers have to give executors (e.g. 1000m, 2g)
# - SPARK_WORKER_OPTS, to set config properties only for the worker (e.g. -Dx=y)

it looks like you should be able to mix. (It's not clear to me whether SPARK_WORKER_MEMORY is uniform across the cluster or applies only to the machine where the config file resides.)

On Mon Jan 26 2015 at 8:07:51 AM Antony Mayi antonym...@yahoo.com.invalid wrote:

Hi, is it possible to mix hosts with (significantly) different specs within a cluster (without wasting the extra resources)? For example, having 10 nodes with 36GB RAM/10 CPUs and now trying to add 3 hosts with 128GB/10 CPUs - is there a way to utilize the extra memory in Spark executors (as my understanding is all Spark executors must have the same memory)?

Thanks, Antony.
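Assuming each worker daemon sources its own local copy of conf/spark-env.sh (which would make per-machine settings possible), the bigger hosts could carry a sketch like this — the values are purely illustrative:

```shell
# conf/spark-env.sh on one of the 128GB/10-CPU nodes;
# the 36GB nodes keep their own copy with smaller values.
SPARK_WORKER_CORES=10
SPARK_WORKER_MEMORY=100g   # leave headroom for the OS and daemons
```

Note that this only changes what each worker offers; executor sizing per application is still controlled separately via spark.executor.memory.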
Re: No AMI for Spark 1.2 using ec2 scripts
I definitely have Spark 1.2 running within EC2 using the spark-ec2 scripts. I downloaded Spark 1.2 with prebuilt for Hadoop 2.4 and later. What parameters are you using when you execute spark-ec2? I am launching in the us-west-1 region (ami-7a320f3f) which may explain things. On Mon Jan 26 2015 at 2:40:01 AM hajons haj...@gmail.com wrote: Hi, When I try to launch a standalone cluster on EC2 using the scripts in the ec2 directory for Spark 1.2, I get the following error: Could not resolve AMI at: https://raw.github.com/mesos/spark-ec2/v4/ami-list/us-east-1/pvm It seems there is not yet any AMI available on EC2. Any ideas when there will be one? This works without problems for version 1.1. Starting up a cluster using these scripts is so simple and straightforward, so I am really missing it on 1.2. /Håkan -- View this message in context: http://apache-spark-user-list. 1001560.n3.nabble.com/No-AMI-for-Spark-1-2-using-ec2-scripts-tp21362.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Analyzing data from non-standard data sources (e.g. AWS Redshift)
I've got my solution working: https://gist.github.com/cfeduke/3bca88ed793ddf20ea6d

I couldn't actually perform the steps I outlined in the previous message in this thread because I would ultimately be trying to serialize a SparkContext to the workers to use during the generation of 1..*n* JdbcRDDs. So I took a look at the source for JdbcRDD and it was trivial to adjust to my needs.

This got me thinking about your problem: the JdbcRDD that ships with Spark will shard the query across the cluster by a Long ID value (requiring you to put ? placeholders in your query for use as part of a range boundary), so if you've got such a key - or any series field that happens to be a Long - then you'd just need to use the PostgreSQL JDBC driver and get your JDBC URL: http://docs.aws.amazon.com/redshift/latest/mgmt/configure-jdbc-connection.html

If you have something other than Long for your primary key/series data type then you can do the same thing I did and modify a copy of JdbcRDD, though your changes would be even fewer than my own. (Though I can't see anything much different than a Long or date/time working for this, since it has to partition the full range into appropriate sub-ranges.) Because of the sub-range bucketing and cluster distribution you shouldn't run into OOM errors, assuming you provision sufficient worker nodes in the cluster.

On Sun Jan 25 2015 at 9:39:56 AM Charles Feduke charles.fed...@gmail.com wrote:

I'm facing a similar problem except my data is already pre-sharded in PostgreSQL. I'm going to attempt to solve it like this:

- Submit the shard names (database names) across the Spark cluster as a text file and partition it so workers get 0 or more - hopefully 1 - shard name. In this case you could partition ranges - if your primary key is a datetime, then a start/end datetime pair; or if it's a long then a start/end long pair. (You may need to run a separate job to get your overall start/end pair and then calculate how many partitions you need from there.)
- Write the job so that the worker loads data from its shard(s) and unions the RDDs together. In the case of pairs the concept is the same. Basically look at how the JdbcRDD constructor requires a start, end, and query (disregard numPartitions in this case since we're manually partitioning in the step above). Your query will be its initial filter conditions plus a between condition for the primary key and its pair.

- Operate on the union RDDs with other transformations or filters.

If everything works as planned then the data should be spread out across the cluster and no one node will be responsible for loading TiBs of data and then distributing it to its peers. That should help with your OOM problem. Of course this does not guarantee that the data is balanced across nodes. With a large amount of data it should balance well enough to get the job done though. (You may need to run several refinements against the complete dataset to figure out the appropriate start/end pair values to get an RDD that is partitioned and balanced across the workers. This is a task best performed using aggregate query logic or stored procedures. With my shard problem I don't have this option available.) Unless someone has a better idea, in which case I'd love to hear it.

On Sun Jan 25 2015 at 4:19:38 AM Denis Mikhalkin deni...@yahoo.com.invalid wrote:

Hi Nicholas, thanks for your reply. I checked spark-redshift - it's just for the unloaded data files stored on Hadoop, not for online result sets from the DB. Do you know of any example of a custom RDD which fetches the data on the fly (not reading from HDFS)? Thanks. Denis

--
*From:* Nicholas Chammas nicholas.cham...@gmail.com
*To:* Denis Mikhalkin deni...@yahoo.com; user@spark.apache.org user@spark.apache.org
*Sent:* Sunday, 25 January 2015, 3:06
*Subject:* Re: Analyzing data from non-standard data sources (e.g. AWS Redshift)

I believe Databricks provides an RDD interface to Redshift. Did you check spark-packages.org?
On Sat, Jan 24, 2015 at 6:45 AM Denis Mikhalkin deni...@yahoo.com.invalid wrote:

Hello, we've got some analytics data in AWS Redshift. The data is being constantly updated. I'd like to be able to write a query against Redshift which would return a subset of data, and then run a Spark job (PySpark) to do some analysis. I could not find an RDD which would let me do it OOB (Python), so I tried writing my own. For example, I tried a combination of a generator (via yield) with parallelize. It appears though that parallelize reads all the data first into memory, as I get either OOM or Python swaps as soon as I increase the number of rows beyond trivial limits. I've also looked at Java RDDs (there is an example of a MySQL RDD) but it seems that it also reads all the data into memory. So my question is - how to correctly feed Spark with huge datasets which don't initially reside in HDFS/S3 (ideally
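The range-bucketing discussed above — JdbcRDD filling its two ? placeholders with per-partition bounds — reduces to a small, Spark-free calculation. A sketch of that sub-range logic, written in plain Python for illustration (Spark's actual implementation is Scala, but the arithmetic is the same shape):

```python
def jdbc_sub_ranges(lower, upper, num_partitions):
    """Split the inclusive key range [lower, upper] into one inclusive
    (start, end) pair per partition, the way JdbcRDD fills the two '?'
    placeholders of a query like
    'SELECT ... WHERE id >= ? AND id <= ?' on each worker."""
    length = 1 + upper - lower
    return [
        (lower + (i * length) // num_partitions,
         lower + ((i + 1) * length) // num_partitions - 1)
        for i in range(num_partitions)
    ]
```

For example, keys 1..100 over 4 partitions come out as four contiguous, non-overlapping buckets, so no worker loads more than its share and no row is fetched twice.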
Re: where storagelevel DISK_ONLY persists RDD to
I think you want to instead use `.saveAsSequenceFile` to save an RDD to someplace like HDFS or NFS if you are attempting to interoperate with another system, such as Hadoop. `.persist` is for keeping the contents of an RDD around so future uses of that particular RDD don't need to recalculate its composite parts.

On Sun Jan 25 2015 at 3:36:31 AM Larry Liu larryli...@gmail.com wrote: I would like to persist an RDD to HDFS or an NFS mount. How do I change the location?
Re: Analyzing data from non-standard data sources (e.g. AWS Redshift)
I'm facing a similar problem except my data is already pre-sharded in PostgreSQL. I'm going to attempt to solve it like this:

- Submit the shard names (database names) across the Spark cluster as a text file and partition it so workers get 0 or more - hopefully 1 - shard name. In this case you could partition ranges - if your primary key is a datetime, then a start/end datetime pair; or if it's a long then a start/end long pair. (You may need to run a separate job to get your overall start/end pair and then calculate how many partitions you need from there.)

- Write the job so that the worker loads data from its shard(s) and unions the RDDs together. In the case of pairs the concept is the same. Basically look at how the JdbcRDD constructor requires a start, end, and query (disregard numPartitions in this case since we're manually partitioning in the step above). Your query will be its initial filter conditions plus a between condition for the primary key and its pair.

- Operate on the union RDDs with other transformations or filters.

If everything works as planned then the data should be spread out across the cluster and no one node will be responsible for loading TiBs of data and then distributing it to its peers. That should help with your OOM problem. Of course this does not guarantee that the data is balanced across nodes. With a large amount of data it should balance well enough to get the job done though. (You may need to run several refinements against the complete dataset to figure out the appropriate start/end pair values to get an RDD that is partitioned and balanced across the workers. This is a task best performed using aggregate query logic or stored procedures. With my shard problem I don't have this option available.) Unless someone has a better idea, in which case I'd love to hear it.

On Sun Jan 25 2015 at 4:19:38 AM Denis Mikhalkin deni...@yahoo.com.invalid wrote: Hi Nicholas, thanks for your reply.
I checked spark-redshift - it's just for the unloaded data files stored on Hadoop, not for online result sets from the DB. Do you know of any example of a custom RDD which fetches the data on the fly (not reading from HDFS)? Thanks. Denis

--
*From:* Nicholas Chammas nicholas.cham...@gmail.com
*To:* Denis Mikhalkin deni...@yahoo.com; user@spark.apache.org user@spark.apache.org
*Sent:* Sunday, 25 January 2015, 3:06
*Subject:* Re: Analyzing data from non-standard data sources (e.g. AWS Redshift)

I believe Databricks provides an RDD interface to Redshift. Did you check spark-packages.org?

On Sat, Jan 24, 2015 at 6:45 AM Denis Mikhalkin deni...@yahoo.com.invalid wrote:

Hello, we've got some analytics data in AWS Redshift. The data is being constantly updated. I'd like to be able to write a query against Redshift which would return a subset of data, and then run a Spark job (PySpark) to do some analysis. I could not find an RDD which would let me do it OOB (Python), so I tried writing my own. For example, I tried a combination of a generator (via yield) with parallelize. It appears though that parallelize reads all the data first into memory, as I get either OOM or Python swaps as soon as I increase the number of rows beyond trivial limits. I've also looked at Java RDDs (there is an example of a MySQL RDD) but it seems that it also reads all the data into memory. So my question is - how to correctly feed Spark with huge datasets which don't initially reside in HDFS/S3 (ideally for PySpark, but would appreciate any tips)? Thanks. Denis
JDBC sharded solution
I'm trying to figure out the best approach to getting sharded data from PostgreSQL into Spark. Our production PGSQL cluster has 12 shards with TiBs of data on each shard. (I won't be accessing all of the data on a shard at once, but I don't think it's feasible to use Sqoop to copy tables whose data will be out of date rather quickly.) We are using RDS replication in AWS, so read-heavy queries against the sharded data are okay.

I see that the JdbcRDD is really designed to take data in chunks from a single datasource, where partitioning will spread the chunks across the cluster. This is neat for a single JDBC datasource but inconvenient when the data is already sharded. My current plan is to create a small text file with the shard names of our cluster and partition it across the Spark cluster. From there I will use custom code to process a SQL statement in the context of a JdbcRDD, generating 1..*n* Connections (and 1..*n* JdbcRDDs) with a partition size of 1 so each worker will handle 1..*n* shards [ideally 1]; those RDDs will then be unioned together [when more than 1 RDD] to get a shard's worth of data satisfying the SQL query into the worker for further processing.

It seems like there should already be an established solution for this pattern, though, so I want to see if I am going about this entirely the wrong way and should instead be using something else. (If it matters, we also have a second-datacenter Cassandra cluster for executing analytic queries against that I could use if necessary. Originally I was going to ETL the PGSQL data into this cluster but that poses its own set of challenges.)
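The first step of the plan above - partitioning shard names across the cluster so each worker handles 1..*n* shards, ideally 1 - is just bucketing, and can be sketched without Spark at all. The shard names below are hypothetical; in the real job the distribution would come from parallelizing the text file with a matching partition count:

```python
def assign_shards(shard_names, num_partitions):
    """Group shard (database) names into one bucket per Spark partition,
    round-robin, so each worker opens 1..n JDBC connections - ideally 1
    when num_partitions matches the shard count."""
    buckets = [[] for _ in range(num_partitions)]
    for i, name in enumerate(shard_names):
        buckets[i % num_partitions].append(name)
    return buckets
```

With 12 shards and 12 partitions every bucket holds exactly one shard name, which is the ideal case described above; with fewer partitions each worker unions the RDDs for the shards in its bucket.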