[Community] Python support added to Spark Job Server
Hi folks, Just a friendly message that we have added Python support to the REST Spark Job Server project. If you are a Python user looking for a RESTful way to manage your Spark jobs, please come have a look at our project! https://github.com/spark-jobserver/spark-jobserver -Evan - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Re: Can we use spark inside a web service?
Andres, A couple of points: 1) If you look at my post, you can see that you can use Spark for low latency - many queries can be executed at sub-second latencies with the right technology. It really depends on your definition of "real time", but I believe low latency is definitely possible. 2) Akka-http over a SparkContext - this is essentially what Spark Job Server does. (It uses Spray, which is the predecessor to akka-http; we will upgrade once Spark 2.0 is incorporated.) 3) Someone else can probably talk about Ignite, but it is based on a distributed object cache. So you define your objects in Java as POJOs, annotate which ones you want indexed, upload your jars, and then you can execute queries. It's a different use case than typical OLAP. There is some Spark integration, but then you would have the same bottlenecks going through Spark. On Fri, Mar 11, 2016 at 5:02 AM, Andrés Ivaldi <iaiva...@gmail.com> wrote: > Nice discussion; I have a question about web services with Spark. > > What could be the problem with using Akka-http as a web service (like Play does), > with one SparkContext created, and the queries over akka-http using only > the instance of that SparkContext? > > Also, about analytics: we are working on real-time analytics, and as Hemant > said, Spark is not a solution for low-latency queries. What about using > Ignite for that? > > > On Fri, Mar 11, 2016 at 6:52 AM, Hemant Bhanawat <hemant9...@gmail.com> > wrote: >> >> Spark-jobserver is an elegant product that builds concurrency on top of >> Spark. But the current design of the DAGScheduler prevents Spark from becoming a >> truly concurrent solution for low-latency queries. The DAGScheduler will turn >> out to be a bottleneck for low-latency queries. The Sparrow project was an >> effort to make Spark more suitable for such scenarios, but it never made it >> into the Spark codebase. If Spark is to become a highly concurrent solution, >> scheduling has to be distributed. 
>> >> Hemant Bhanawat >> www.snappydata.io >> >> On Fri, Mar 11, 2016 at 7:02 AM, Chris Fregly <ch...@fregly.com> wrote: >>> >>> great discussion, indeed. >>> >>> Mark Hamstra and i spoke offline just now. >>> >>> Below is a quick recap of our discussion on how they've achieved >>> acceptable performance from Spark on the user request/response path (@mark- >>> feel free to correct/comment). >>> >>> 1) there is a big difference in request/response latency between >>> submitting a full Spark Application (heavyweight) versus having a >>> long-running Spark Application (like Spark Job Server) that submits >>> lighter-weight Jobs using a shared SparkContext. mark is obviously using >>> the latter - a long-running Spark App. >>> >>> 2) there are some enhancements to Spark that are required to achieve >>> acceptable user request/response times. some links that Mark provided are >>> as follows: >>> >>> https://issues.apache.org/jira/browse/SPARK-11838 >>> https://github.com/apache/spark/pull/11036 >>> https://github.com/apache/spark/pull/11403 >>> https://issues.apache.org/jira/browse/SPARK-13523 >>> https://issues.apache.org/jira/browse/SPARK-13756 >>> >>> Essentially, a deeper level of caching at the shuffle file layer to >>> reduce compute and memory between queries. >>> >>> Note that Mark is running a slightly-modified version of stock Spark. >>> (He's mentioned this in prior posts, as well.) >>> >>> And I have to say that I'm, personally, seeing more and more >>> slightly-modified versions of Spark being deployed to production to >>> work around outstanding PRs and JIRAs. >>> >>> this may not be what people want to hear, but it's a trend that i'm >>> seeing lately as more and more users customize Spark to their specific use cases. >>> >>> Anyway, thanks for the good discussion, everyone! This is why we have >>> these lists, right! 
:)
Re: Can we use spark inside a web service?
At least for simple queries, the DAGScheduler does not appear to be the bottleneck - since we are able to schedule 700 queries, and all the scheduling is probably done from the main application thread. However, I did have high hopes for Sparrow. What was the reason they decided not to include that?
Re: Can we use spark inside a web service?
One of the premises here is that if you can restrict your workload to fewer cores - which is easier with FiloDB and careful data modeling - you can make this work for much higher concurrency and lower latency than most typical Spark use cases. The reason why it typically does not work in production is that most people are using HDFS and files. These data sources are designed for running queries and workloads on all your cores across many workers, and not for filtering your workload down to only one or two cores. There is actually nothing inherent in Spark that prevents people from using it as an app server. However, the insistence on using it with HDFS is what kills concurrency. This is why FiloDB is important. I agree there are more optimized stacks for running app servers, but the choices that you mentioned: ES is targeted at text search; Cass and HBase by themselves are not fast enough for the analytical queries that the OP wants; and MySQL is great but not scalable. Probably something like VectorWise, HANA, or Vertica would work well, but those are mostly not free solutions. Druid could work too if the use case is right. Anyways, great discussion! On Thu, Mar 10, 2016 at 2:46 PM, Chris Fregly wrote: > you are correct, mark. i misspoke. apologies for the confusion. > > so the problem is even worse given that a typical job requires multiple > tasks/cores. > > i have yet to see this particular architecture work in production. i would > love for someone to prove otherwise. > > On Thu, Mar 10, 2016 at 5:44 PM, Mark Hamstra > wrote: >>> >>> For example, if you're looking to scale out to 1000 concurrent requests, >>> this is 1000 concurrent Spark jobs. This would require a cluster with 1000 >>> cores. >> >> >> This doesn't make sense. A Spark Job is a driver/DAGScheduler concept >> without any 1:1 correspondence between Worker cores and Jobs. Cores are >> used to run Tasks, not Jobs. 
So, yes, a 1000 core cluster can run at most >> 1000 simultaneous Tasks, but that doesn't really tell you anything about how >> many Jobs are or can be concurrently tracked by the DAGScheduler, which will >> be apportioning the Tasks from those concurrent Jobs across the available >> Executor cores. >> >> On Thu, Mar 10, 2016 at 2:00 PM, Chris Fregly wrote: >>> >>> Good stuff, Evan. Looks like this is utilizing the in-memory >>> capabilities of FiloDB which is pretty cool. looking forward to the webcast >>> as I don't know much about FiloDB. >>> >>> My personal thought here is to remove Spark from the user >>> request/response hot path. >>> >>> I can't tell you how many times i've had to unroll that architecture at >>> clients - and replace it with a real database like Cassandra, ElasticSearch, >>> HBase, MySQL. >>> >>> Unfortunately, Spark - and Spark Streaming, especially - lead you to >>> believe that Spark could be used as an application server. This is not a >>> good use case for Spark. >>> >>> Remember that every job that is launched by Spark requires 1 CPU core, >>> some memory, and an available Executor JVM to provide the CPU and memory. >>> >>> Yes, you can horizontally scale this because of the distributed nature of >>> Spark, however it is not an efficient scaling strategy. >>> >>> For example, if you're looking to scale out to 1000 concurrent requests, >>> this is 1000 concurrent Spark jobs. This would require a cluster with 1000 >>> cores. this is just not cost effective. >>> >>> Use Spark for what it's good for - ad-hoc, interactive, and iterative >>> (machine learning, graph) analytics. Use an application server for what >>> it's good at - managing a large number of concurrent requests. And use a >>> database for what it's good for - storing/retrieving data. >>> >>> And any serious production deployment will need failover, throttling, >>> back pressure, auto-scaling, and service discovery. 
>>> >>> While Spark supports these to varying levels of production-readiness, >>> Spark is a batch-oriented system and not meant to be put on the user >>> request/response hot path. >>> >>> For the failover, throttling, back pressure, and auto-scaling that i mentioned >>> above, it's worth checking out the suite of Netflix OSS - particularly >>> Hystrix, Eureka, Zuul, Karyon, etc: http://netflix.github.io/ >>> >>> Here's my GitHub project that incorporates a lot of these: >>> https://github.com/cfregly/fluxcapacitor >>> >>> Here's a Netflix Skunkworks GitHub project that packages these up in >>> Docker images: https://github.com/Netflix-Skunkworks/zerotodocker >>> >>> >>> On Thu, Mar 10, 2016 at 1:40 PM, velvia.github >>> wrote: Hi, I just wrote a blog post which might be really useful to you -- I have just benchmarked being able to achieve 700 queries per second in Spark. So, yes, web-speed SQL queries are definitely possible. Read my new blog post:
Achieving 700 Spark SQL Queries Per Second
Hey folks, I just saw a recent thread on here (but can't find it anymore) on using Spark as a web-speed query engine. I want to let you guys know that this is definitely possible! Most folks don't realize how low-latency Spark can actually be. Please check out my blog post below on achieving 700 queries per second in Spark: http://velvia.github.io/Spark-Concurrent-Fast-Queries/ Would love your feedback. thanks, Evan
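The concurrency arithmetic running through this thread (queries per second vs. cores per query) can be sanity-checked with Little's law. A minimal sketch - all numbers below are hypothetical, chosen only to illustrate the argument, not taken from the benchmark in the post:

```python
# Little's law sketch: sustainable throughput = concurrency / latency.
def max_qps(total_cores, cores_per_query, query_latency_s):
    concurrent_queries = total_cores // cores_per_query
    return concurrent_queries / query_latency_s

# A query pinned to one core, finishing in 20 ms: a 16-core cluster
# can run many such queries side by side.
qps_single_core = max_qps(16, 1, 0.02)
# The same query fanned out over all 16 cores serializes everything.
qps_fan_out = max_qps(16, 16, 0.02)
```

This is why restricting each query to fewer cores, as discussed with FiloDB and careful data modeling, matters so much for concurrency.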
Re: SparkSQL - can we add new column(s) to parquet files
I would expect a SQL query on c to fail, because c would not be known in the schema of the older Parquet file. What I'd be very interested in is how to add a new column as an incremental new Parquet file, and be able to somehow join the existing and new files in an efficient way. I.e., somehow guarantee that for every row in the old Parquet file, the corresponding rows in the new file would be stored on the same node, so that joins are local. On Fri, Nov 21, 2014 at 10:03 AM, Sadhan Sood sadhan.s...@gmail.com wrote: We create the table definition by reading the Parquet file for its schema and store it in the Hive metastore. But if someone adds a new column to the schema, and we rescan the schema from the new Parquet files and update the table definition, would it still work if we run queries on the table? So, the old table has Int a, Int b; the new table has Int a, Int b, String c. But the older Parquet files don't have String c, so on querying the table, would it return null for column c from older files and data from newer files, or fail?
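The null-filling behavior being asked about can be sketched in plain Python. This only simulates the semantics of scanning old and new files through one merged schema; it does not use Spark or Parquet, and the column names are illustrative:

```python
# Plain-Python stand-in for scanning old and new "files" through one merged
# schema: a column missing from an older file surfaces as None (i.e., SQL NULL).
def scan(files, schema):
    return [{col: row.get(col) for col in schema} for f in files for row in f]

old_file = [{"a": 1, "b": 2}]                # written before column c existed
new_file = [{"a": 3, "b": 4, "c": "hello"}]  # written with the new schema
rows = scan([old_file, new_file], schema=["a", "b", "c"])
```

Whether Spark SQL actually returns nulls or fails depends on how the metastore schema is reconciled with each file's own schema; later Spark versions expose this reconciliation explicitly as Parquet schema merging.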
Re: Multitenancy in Spark - within/across spark context
Ashwin, I would say the strategies in general are: 1) Have each user submit a separate Spark app (each with its own SparkContext), with its own resource settings, and share data through HDFS or something like Tachyon for speed. 2) Share a single SparkContext amongst multiple users, using the fair scheduler. This is sort of like having a Hadoop resource pool. It has some obvious HA/SPOF issues, namely that if the context dies then every user using it is also dead. Also, sharing RDDs in cached memory has the same resiliency problems, namely that if any executor dies then Spark must recompute / rebuild the RDD (it tries to only rebuild the missing part, but sometimes it must rebuild everything). Job server can help with 1 or 2, 2 in particular. If you have any questions about job server, feel free to ask at the spark-jobserver google group. I am the maintainer. -Evan On Thu, Oct 23, 2014 at 1:06 PM, Marcelo Vanzin van...@cloudera.com wrote: You may want to take a look at https://issues.apache.org/jira/browse/SPARK-3174. On Thu, Oct 23, 2014 at 2:56 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Upvote for the multitenancy requirement. I'm also building a data analytics platform and there'll be multiple users running queries and computations simultaneously. One of the pain points is control of resource size. Users don't really know how many nodes they need; they always use as much as possible... The result is lots of wasted resources in our Yarn cluster. A way to 1) allow multiple Spark contexts to share the same resources or 2) add dynamic resource management for Yarn mode is very much wanted. Jianshi On Thu, Oct 23, 2014 at 5:36 AM, Marcelo Vanzin van...@cloudera.com wrote: On Wed, Oct 22, 2014 at 2:17 PM, Ashwin Shankar ashwinshanka...@gmail.com wrote: That's not something you might want to do usually. In general, a SparkContext maps to a user application My question was basically this. 
In this page in the official doc, under the "Scheduling within an application" section, it talks about multiuser and fair sharing within an app. How does multiuser within an application work (how do users connect to an app, run their stuff)? When would I want to use this? I see. The way I read that page is that Spark supports all those scheduling options; but Spark doesn't give you the means to actually submit jobs from different users to a running SparkContext hosted in a different process. For that, you'll need something like the job server that I referenced before, or write your own framework for supporting that. Personally, I'd use the information on that page when dealing with concurrent jobs in the same SparkContext, but still restricted to the same user. I'd avoid trying to create any application where a single SparkContext is shared by multiple users in any way. As far as I understand, this will cause executors to be killed, which means that Spark will start retrying tasks to rebuild the data that was held by those executors when needed. I basically wanted to find out if there were any gotchas related to preemption on Spark. Things like: say half of an application's executors get preempted, say while doing a reduceByKey - will the application progress with the remaining resources/fair share? Jobs should still make progress as long as at least one executor is available. The gotcha would be the one I mentioned, where Spark will fail your job after x executors have failed, which might be a common occurrence when preemption is enabled. That being said, it's a configurable option, so you can set x to a very large value and your job should keep on chugging along. 
The options you'd want to take a look at are: spark.task.maxFailures and spark.yarn.max.executor.failures -- Marcelo -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
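For reference, the two failure-tolerance settings Marcelo mentions, together with the fair-scheduler settings discussed earlier in the thread, would look something like this in spark-defaults.conf (the values and the allocation-file path are illustrative, not recommendations):

```
spark.scheduler.mode                 FAIR
spark.scheduler.allocation.file      /etc/spark/fairscheduler.xml
spark.task.maxFailures               16
spark.yarn.max.executor.failures     64
```

Raising the two failure limits is the "set x to a very large value" workaround for preemption-heavy YARN queues.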
Re: Use Case of mutable RDD - any ideas around will help.
Sweet, that's probably it. Too bad it didn't seem to make 1.1? On Wed, Sep 17, 2014 at 5:32 PM, Michael Armbrust mich...@databricks.com wrote: The unknown slowdown might be addressed by https://github.com/apache/spark/commit/f858f466862541c3faad76a1fa2391f1c17ec9dd On Sun, Sep 14, 2014 at 10:40 PM, Evan Chan velvia.git...@gmail.com wrote: SPARK-1671 looks really promising. Note that even right now, you don't need to un-cache the existing table. You can do something like this: newAdditionRdd.registerTempTable("table2") sqlContext.cacheTable("table2") val unionedRdd = sqlContext.table("table1").unionAll(sqlContext.table("table2")) When you use table, it will return the cached representation, so the union executes much faster. However, there is some unknown slowdown; it's not quite as fast as you would expect. On Fri, Sep 12, 2014 at 2:09 PM, Cheng Lian lian.cs@gmail.com wrote: Ah, I see. So basically what you need is something like cache write-through support, which exists in Shark but is not implemented in Spark SQL yet. In Shark, when inserting data into a table that has already been cached, the newly inserted data will be automatically cached and “union”-ed with the existing table content. SPARK-1671 was created to track this feature. We’ll work on that. Currently, as a workaround, instead of doing the union at the RDD level, you may try caching the new table, unioning it with the old table, and then querying the union-ed table. The drawback is higher code complexity, and you end up with lots of temporary tables. But the performance should be reasonable. On Fri, Sep 12, 2014 at 1:19 PM, Archit Thakur archit279tha...@gmail.com wrote: Little code snippet: line1: cacheTable("existingRDDTableName") line2: // some operations which will materialize the existingRDD dataset. line3: existingRDD.union(newRDD).registerAsTable("new_existingRDDTableName") line4: cacheTable("new_existingRDDTableName") line5: // some operation that will materialize the new existingRDD. 
Now, what we expect is that in line4, rather than caching both existingRDDTableName and new_existingRDDTableName, it should cache only new_existingRDDTableName. But we cannot explicitly uncache existingRDDTableName, because we want the union to use the cached existingRDDTableName. Since it is lazy, new_existingRDDTableName could be materialized later, and by then we can't lose existingRDDTableName from the cache. What if we keep the same name for the new table? So: cacheTable("existingRDDTableName") existingRDD.union(newRDD).registerAsTable("existingRDDTableName") cacheTable("existingRDDTableName") // might not be needed again. Will both our cases be satisfied - that it uses existingRDDTableName from the cache for the union, and doesn't duplicate the data in the cache but somehow appends to the older cached table? Thanks and Regards, Archit Thakur. Sr Software Developer, Guavus, Inc. On Sat, Sep 13, 2014 at 12:01 AM, pankaj arora pankajarora.n...@gmail.com wrote: I think I should elaborate on the use case a little more. So we have a UI dashboard whose response time is quite fast, as all the data is cached. Users query data based on time range, and there is always new data coming into the system at a predefined frequency, let's say every hour. As you said, I can uncache tables, but that will basically drop all data from memory. I cannot afford to lose my cache even for a short interval, as all queries from the UI will get slow until the cache loads again. UI response time needs to be predictable and should be fast enough that the user does not get irritated. Also, I cannot keep two copies of the data (till the new RDD materializes) in memory, as it would surpass the total available memory in the system. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Re-Use-Case-of-mutable-RDD-any-ideas-around-will-help-tp14095p14112.html Sent from the Apache Spark User List mailing list archive at Nabble.com. 
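The append-by-union pattern in this thread can be sketched in plain Python. This is a toy stand-in for the cached tables (no Spark involved; the table names are illustrative):

```python
# Toy model of the workaround: keep the old cached "table" warm, cache the
# new batch under its own name, and serve queries from the union of the two.
cached = {}

def cache_table(name, rows):
    cached[name] = list(rows)

def query_union(*names):
    # Both inputs are already cached, so no reload of the old data occurs.
    return [row for name in names for row in cached[name]]

cache_table("existing", [{"t": 1, "v": 10}])
cache_table("new_batch", [{"t": 2, "v": 20}])  # the hourly arrival
rows = query_union("existing", "new_batch")
```

This mirrors the suggestion of unioning cached tables rather than uncaching and rebuilding, at the cost of accumulating temporary tables until they are compacted.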
Re: Example of Geoprocessing with Spark
Hi Abel, Pretty interesting. May I ask how big your point CSV dataset is? It seems you are relying on searching through the FeatureCollection of polygons for the one that intersects your point. This is going to be extremely slow. I highly recommend using a SpatialIndex, such as the many that exist in the JTS library itself, to speed things up. Also, note that the geoscript library is not really maintained anymore. I forked it with the intention of maintaining it some more, but I've decided this is not really a good direction. On Thu, Sep 18, 2014 at 7:02 PM, Abel Coronado Iruegas acoronadoirue...@gmail.com wrote: Now I have a better version, but the problem is that saveAsTextFile does not finish the job; in the HDFS repository only a partial temporary file exists. Can someone tell me what is wrong? Thanks!! object SimpleApp { def main(args: Array[String]) { val conf = new SparkConf().setAppName("Csv Clipper") val sc = new SparkContext(conf) val csvPath = "hdfs://m01/user/acoronado/mov/movilidad_64mb.csv" val csv = sc.textFile(csvPath) csv.cache() val clipPoints = csv.map({ line: String => val Array(usuario, lat, lon, date) = line.split(",").map(_.trim) val punto = Point(lon.toDouble, lat.toDouble) val internal = geoDataExternal.get.find(f => f.geometry intersects punto) val (cve_est, cve_mun) = internal match { case Some(f: org.geoscript.feature.Feature) => { val index = f.getAttribute(1).toString() val existe = geoDataMun.get(index).find(f => f.geometry intersects punto) existe match { case Some(f) => (f.getAttribute(1).toString, f.getAttribute(2).toString) case None => ("0", "0") } } case None => ("0", "0") } val time = try { (new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ")).parse(date.replaceAll("Z$", "+")).getTime().toString() } catch { case e: Exception => "0" } line + "," + time + "," + cve_est + "," + cve_mun }) clipPoints.saveAsTextFile("hdfs://m01/user/acoronado/mov/resultados_movilidad_60.csv") println("Spark Clip Exito!!!") 
} object geoDataMun { private val shp = Shapefile("/geoData/MunicipiosLatLon.shp") val features = shp.getFeatures.toIterable val result = scala.io.Source.fromFile("/geoData/indice_espacial.csv") .getLines() .toList map { line: String => val campos = line.split(",").map(_.trim) val cve_edo = campos(0) val cve_mun = campos(1) val index = campos(2) scala.collection.immutable.List(index.toInt, (cve_edo, cve_mun)) } val mapaIx = result.groupBy(x => x(0)).mapValues(cves => cves.map(x => x(1))) def get(index: String) = { features.filter(f => mapaIx(index.toInt).contains((f.getAttribute(1).toString, f.getAttribute(2).toString))) } } object geoDataExternal { private val shp = Shapefile("/geoData/IndiceRecortado.shp") val features = shp.getFeatures def get: FeatureCollection[org.geoscript.feature.Schema, org.geoscript.feature.Feature] = features } } The log of the driver is: 14/09/18 19:27:55 ERROR EndpointWriter: AssociationError [akka.tcp://sparkwor...@axaxaxa-cloudera-s05.xxxnetworks.com:44895] -> [akka.tcp://sparkexecu...@axaxaxa-cloudera-s05.xxxnetworks.com:43942]: Error [Association failed with [akka.tcp://sparkexecu...@axaxaxa-cloudera-s05.xxxnetworks.com:43942]] [ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkexecu...@axaxaxa-cloudera-s05.xxxnetworks.com:43942] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: axaxaxa-cloudera-s05.xxxnetworks.com/10.5.96.42:43942 ]
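To make the SpatialIndex suggestion concrete, here is a minimal grid-based index in Python. It is an illustrative stand-in for JTS structures such as STRtree or Quadtree, not their actual API; the polygon names and coordinates are made up:

```python
class GridIndex:
    """Bucket geometries into square cells by their bounding boxes."""
    def __init__(self, cell=1.0):
        self.cell = cell
        self.cells = {}

    def _key(self, x, y):
        return (int(x // self.cell), int(y // self.cell))

    def insert(self, item, bbox):
        # Register the item in every cell its bounding box overlaps.
        (minx, miny, maxx, maxy) = bbox
        kx0, ky0 = self._key(minx, miny)
        kx1, ky1 = self._key(maxx, maxy)
        for kx in range(kx0, kx1 + 1):
            for ky in range(ky0, ky1 + 1):
                self.cells.setdefault((kx, ky), []).append(item)

    def candidates(self, x, y):
        # Only geometries sharing the point's cell need an exact test.
        return self.cells.get(self._key(x, y), [])

idx = GridIndex(cell=10.0)
idx.insert("near_poly", (0, 0, 5, 5))
idx.insert("far_poly", (100, 100, 105, 105))
cands = idx.candidates(2, 3)  # only "near_poly" needs an intersects test
```

Each point then runs the exact intersects test only against the handful of candidate polygons in its cell, instead of scanning the whole FeatureCollection.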
Re: Better way to process large image data set ?
What Sean said. You should also definitely turn on Kryo serialization. The default Java serialization is really, really slow if you're going to move around lots of data. Also make sure you use a cluster with high network bandwidth. On Thu, Sep 18, 2014 at 3:06 AM, Sean Owen so...@cloudera.com wrote: Base64 is an inefficient encoding for binary data, by about 2.6x. You could use byte[] directly. But you would still be storing and potentially shuffling lots of data in your RDDs. If the files exist separately on HDFS, perhaps you can just send around the file location and load it directly using the HDFS APIs in the function that needs it. On Sep 18, 2014 9:51 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi all, I'm trying to process a large image data set and need some way to optimize my implementation, since it's very slow as of now. In my current implementation I store my images in an object file with the following fields: case class Image(groupId: String, imageId: String, buffer: String) Images belong to groups and have an id; the buffer is the image file (jpg, png) encoded as a base-64 string. Before running an image processing algorithm on the image buffer, I have a lot of jobs that filter, group, and join images in my data set based on groupId or imageId, and these steps are relatively slow. I suspect that Spark moves around my image buffer even when it's not necessary for these specific jobs, and then a lot of time is wasted in communication. Is there a better way to optimize my implementation? Regards, Jaonary
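Sean's ~2.6x figure is plausibly the combination of two overheads: base64 itself inflates binary by 4/3, and holding the result as a Java String (2 bytes per char in UTF-16) roughly doubles that again. The encoding part can be checked directly:

```python
import base64

raw = bytes(range(256)) * 4            # 1 KiB of arbitrary binary data
encoded = base64.b64encode(raw)

b64_ratio = len(encoded) / len(raw)         # ~1.33x as bytes on the wire
string_ratio = 2 * len(encoded) / len(raw)  # ~2.67x if held as a UTF-16 String
```

Which is why storing the buffer as byte[] (or, better, just a file location, as Sean suggests) saves so much shuffle volume.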
Re: Use Case of mutable RDD - any ideas around will help.
SPARK-1671 looks really promising. Note that even right now, you don't need to un-cache the existing table. You can do something like this: newAdditionRdd.registerTempTable("table2") sqlContext.cacheTable("table2") val unionedRdd = sqlContext.table("table1").unionAll(sqlContext.table("table2")) When you use table, it will return the cached representation, so the union executes much faster. However, there is some unknown slowdown; it's not quite as fast as you would expect. On Fri, Sep 12, 2014 at 2:09 PM, Cheng Lian lian.cs@gmail.com wrote: Ah, I see. So basically what you need is something like cache write-through support, which exists in Shark but is not implemented in Spark SQL yet. In Shark, when inserting data into a table that has already been cached, the newly inserted data will be automatically cached and “union”-ed with the existing table content. SPARK-1671 was created to track this feature. We’ll work on that. Currently, as a workaround, instead of doing the union at the RDD level, you may try caching the new table, unioning it with the old table, and then querying the union-ed table. The drawback is higher code complexity, and you end up with lots of temporary tables. But the performance should be reasonable. On Fri, Sep 12, 2014 at 1:19 PM, Archit Thakur archit279tha...@gmail.com wrote: Little code snippet: line1: cacheTable("existingRDDTableName") line2: // some operations which will materialize the existingRDD dataset. line3: existingRDD.union(newRDD).registerAsTable("new_existingRDDTableName") line4: cacheTable("new_existingRDDTableName") line5: // some operation that will materialize the new existingRDD. Now, what we expect is that in line4, rather than caching both existingRDDTableName and new_existingRDDTableName, it should cache only new_existingRDDTableName. But we cannot explicitly uncache existingRDDTableName, because we want the union to use the cached existingRDDTableName. 
Since it is lazy, new_existingRDDTableName could be materialized later, and by then we can't lose existingRDDTableName from the cache. What if we keep the same name for the new table? So: cacheTable("existingRDDTableName") existingRDD.union(newRDD).registerAsTable("existingRDDTableName") cacheTable("existingRDDTableName") // might not be needed again. Will both our cases be satisfied - that it uses existingRDDTableName from the cache for the union, and doesn't duplicate the data in the cache but somehow appends to the older cached table? Thanks and Regards, Archit Thakur. Sr Software Developer, Guavus, Inc. On Sat, Sep 13, 2014 at 12:01 AM, pankaj arora pankajarora.n...@gmail.com wrote: I think I should elaborate on the use case a little more. So we have a UI dashboard whose response time is quite fast, as all the data is cached. Users query data based on time range, and there is always new data coming into the system at a predefined frequency, let's say every hour. As you said, I can uncache tables, but that will basically drop all data from memory. I cannot afford to lose my cache even for a short interval, as all queries from the UI will get slow until the cache loads again. UI response time needs to be predictable and should be fast enough that the user does not get irritated. Also, I cannot keep two copies of the data (till the new RDD materializes) in memory, as it would surpass the total available memory in the system. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Re-Use-Case-of-mutable-RDD-any-ideas-around-will-help-tp14095p14112.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Finding previous and next element in a sorted RDD
There's no way to avoid a shuffle entirely, since the first and last elements of each partition need to be paired with elements in the neighboring partitions, but I wonder if there is a way to do a minimal shuffle.

On Thu, Aug 21, 2014 at 6:13 PM, cjwang c...@cjwang.us wrote: One way is to do zipWithIndex on the RDD, then use the index as a key. Add or subtract 1 for the previous or next element, then use cogroup or join to bind them together:

  val idx = input.zipWithIndex
  val previous = idx.map(x => (x._2 + 1, x._1))
  val current = idx.map(x => (x._2, x._1))
  val next = idx.map(x => (x._2 - 1, x._1))
  val joined = current leftOuterJoin previous leftOuterJoin next

The code looks clean to me, but I feel uneasy about the performance of the joins.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Finding-previous-and-next-element-in-a-sorted-RDD-tp12621p12623.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
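For reference, cjwang's join-based approach can be checked on a plain Scala collection. This is only a local sketch: a Seq stands in for the sorted RDD, and Map lookups stand in for leftOuterJoin (faithful here because the zipWithIndex keys are unique). The `prevNext` helper and its names are illustrative, not from the original post.

```scala
// Local sketch of the zipWithIndex + leftOuterJoin approach:
// key each element by its own index, and shift the index by +/- 1
// to line up previous and next neighbors.
object PrevNextSketch {
  def prevNext[A](input: Seq[A]): Seq[(A, Option[A], Option[A])] = {
    val idx = input.zipWithIndex                               // (value, index)
    val previous = idx.map { case (v, i) => (i + 1, v) }.toMap // v is "previous" of index i+1
    val next     = idx.map { case (v, i) => (i - 1, v) }.toMap // v is "next" of index i-1
    idx.map { case (v, i) => (v, previous.get(i), next.get(i)) }
  }

  def main(args: Array[String]): Unit =
    println(prevNext(Seq(10, 20, 30)))
    // → List((10,None,Some(20)), (20,Some(10),Some(30)), (30,Some(20),None))
}
```

The Option results at the boundaries mirror what the RDD leftOuterJoin produces for the first and last elements.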
Merging two Spark SQL tables?
Is it possible to merge two cached Spark SQL tables into a single table so it can be queried with one SQL statement? I.e., can you do schemaRdd1.union(schemaRdd2), then register the new schemaRdd and run a query over it? Ideally, both schemaRdd1 and schemaRdd2 would be cached, so the union should run against the cache too. thanks, Evan
Re: Merging two Spark SQL tables?
So I tried the above (why don't union or ++ have the same behavior, btw?) and it works, but it is slow, because the original RDDs are not cached and the files must be read from disk. I also discovered that you can recover the in-memory cached versions of the RDDs using sqlContext.table(table1). Thus you can do sqlContext.table(table1).unionAll(sqlContext.table(table2)), but this is about 10x slower than running the query on table1 alone, which is cached using sqlContext.cacheTable(). (At least on Spark 1.0.2; I haven't tried the 1.1.0 snapshot yet.)

On Thu, Aug 21, 2014 at 12:17 AM, Michael Armbrust mich...@databricks.com wrote: I believe this should work if you run srdd1.unionAll(srdd2). Both RDDs must have the same schema.

On Wed, Aug 20, 2014 at 11:30 PM, Evan Chan velvia.git...@gmail.com wrote: Is it possible to merge two cached Spark SQL tables into a single table so it can be queried with one SQL statement? I.e., can you do schemaRdd1.union(schemaRdd2), then register the new schemaRdd and run a query over it? Ideally, both schemaRdd1 and schemaRdd2 would be cached, so the union should run against the cache too. thanks, Evan
Writeup on Spark SQL with GDELT
I just put up a repo with a write-up on how to import the GDELT public dataset into Spark SQL and play around. It has a lot of notes on different import methods and observations about Spark SQL. Feel free to have a look and comment. http://www.github.com/velvia/spark-sql-gdelt
[Tachyon] Error reading from Parquet files in HDFS
Spark 1.0.2, Tachyon 0.4.1, Hadoop 1.0 (standard EC2 config) scala val gdeltT = sqlContext.parquetFile(tachyon://172.31.42.40:19998/gdelt-parquet/1979-2005/) 14/08/21 19:07:14 INFO : initialize(tachyon://172.31.42.40:19998/gdelt-parquet/1979-2005, Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, hdfs-default.xml, hdfs-site.xml). Connecting to Tachyon: tachyon://172.31.42.40:19998/gdelt-parquet/1979-2005 14/08/21 19:07:14 INFO : Trying to connect master @ /172.31.42.40:19998 14/08/21 19:07:14 INFO : User registered at the master ip-172-31-42-40.us-west-2.compute.internal/172.31.42.40:19998 got UserId 14 14/08/21 19:07:14 INFO : Trying to get local worker host : ip-172-31-42-40.us-west-2.compute.internal 14/08/21 19:07:14 INFO : No local worker on ip-172-31-42-40.us-west-2.compute.internal 14/08/21 19:07:14 INFO : Connecting remote worker @ ip-172-31-47-74/172.31.47.74:29998 14/08/21 19:07:14 INFO : tachyon://172.31.42.40:19998 tachyon://172.31.42.40:19998 hdfs://ec2-54-213-113-173.us-west-2.compute.amazonaws.com:9000 14/08/21 19:07:14 INFO : getFileStatus(tachyon://172.31.42.40:19998/gdelt-parquet/1979-2005): HDFS Path: hdfs://ec2-54-213-113-173.us-west-2.compute.amazonaws.com:9000/gdelt-parquet/1979-2005 TPath: tachyon://172.31.42.40:19998/gdelt-parquet/1979-2005 14/08/21 19:07:14 INFO : tachyon.client.TachyonFS@4b05b3ff hdfs://ec2-54-213-113-173.us-west-2.compute.amazonaws.com:9000 /gdelt-parquet/1979-2005 tachyon.PrefixList@636c50d3 14/08/21 19:07:14 WARN : tachyon.home is not set. Using /mnt/tachyon_default_home as the default value. 
14/08/21 19:07:14 INFO : Get: /gdelt-parquet/1979-2005/_SUCCESS 14/08/21 19:07:14 INFO : Get: /gdelt-parquet/1979-2005/_metadata 14/08/21 19:07:14 INFO : Get: /gdelt-parquet/1979-2005/part-r-1.parquet 14/08/21 19:07:14 INFO : getFileStatus(tachyon://172.31.42.40:19998/gdelt-parquet/1979-2005/_metadata): HDFS Path: hdfs://ec2-54-213-113-173.us-west-2.compute.amazonaws.com:9000/gdelt-parquet/1979-2005/_metadata TPath: tachyon://172.31.42.40:19998/gdelt-parquet/1979-2005/_metadata 14/08/21 19:07:14 INFO : open(tachyon://172.31.42.40:19998/gdelt-parquet/1979-2005/_metadata, 65536) 14/08/21 19:07:14 ERROR : The machine does not have any local worker. 14/08/21 19:07:14 ERROR : Reading from HDFS directly 14/08/21 19:07:14 ERROR : Reading from HDFS directly java.io.IOException: can not read class parquet.format.FileMetaData: null at parquet.format.Util.read(Util.java:50) at parquet.format.Util.readFileMetaData(Util.java:34) at parquet.format.converter.ParquetMetadataConverter.readParquetMetadata(ParquetMetadataConverter.java:310) at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:296) I'm not sure why this is saying that, as the Tachyon UI reports all 8 nodes being up? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: [Tachyon] Error reading from Parquet files in HDFS
The underFS is HDFS btw.

On Thu, Aug 21, 2014 at 12:22 PM, Evan Chan velvia.git...@gmail.com wrote: [quoted log output snipped]
Re: [Tachyon] Error reading from Parquet files in HDFS
And it worked earlier with a non-parquet directory.

On Thu, Aug 21, 2014 at 12:22 PM, Evan Chan velvia.git...@gmail.com wrote: The underFS is HDFS btw.

On Thu, Aug 21, 2014 at 12:22 PM, Evan Chan velvia.git...@gmail.com wrote: [quoted log output snipped]
Spark-JobServer moving to a new location
Dear community, Wow, I remember when we first open sourced the job server at the first Spark Summit in December. Since then, more and more of you have started using it and contributing to it. It is awesome to see! If you are not familiar with the Spark Job Server, it is a REST API for managing your Spark jobs, job history, and status. In order to make sure the project can continue to move forward independently, with new features developed and contributions merged, we are moving the project to a new GitHub organization. The new location is: https://github.com/spark-jobserver/spark-jobserver The git commit history is still there, but unfortunately the pull requests don't migrate over. I'll be contacting each of you with open PRs to move them over to the new location. Happy Hacking! Evan (@velvia) Kelvin (@kelvinchu) Daniel (@dan-null)
Re: type issue: found RDD[T] expected RDD[A]
That might not be enough. Reflection is used to determine what the fields are, so your class might actually need to have members corresponding to the fields in the table. I heard that a more generic method of inputting data is coming.

On Tue, Aug 19, 2014 at 6:43 PM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, On Tue, Aug 19, 2014 at 7:01 PM, Patrick McGloin mcgloin.patr...@gmail.com wrote: I think the type of the data contained in your RDD needs to be a known case class and not abstract for createSchemaRDD. This makes sense when you consider that it needs to know about the fields in the object to create the schema. Exactly this. The actual message pointing to that is: inferred type arguments [T] do not conform to method createSchemaRDD's type parameter bounds [A <: Product]. All case classes are automatically subclasses of Product, but otherwise you will have to extend Product and add the required methods yourself. Tobias
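To make Tobias's point concrete, here is a minimal sketch of the difference between the two options: a case class satisfies the [A <: Product] bound automatically, while an ordinary class must implement Product by hand. The class names Person and Record are hypothetical, chosen only for illustration; this is plain Scala and does not touch Spark itself.

```scala
// A case class: the compiler generates the Product implementation,
// so it already satisfies the [A <: Product] bound.
case class Person(name: String, age: Int)

// An ordinary class must extend Product and supply the required
// methods itself (productArity, productElement, canEqual).
class Record(val name: String, val age: Int) extends Product {
  def productArity: Int = 2
  def productElement(n: Int): Any = n match {
    case 0 => name
    case 1 => age
    case _ => throw new IndexOutOfBoundsException(n.toString)
  }
  def canEqual(that: Any): Boolean = that.isInstanceOf[Record]
}

object ProductSketch {
  def main(args: Array[String]): Unit = {
    println(Person("Ann", 30).productArity)          // → 2
    println(new Record("Ann", 30).productElement(0)) // → Ann
  }
}
```

Whether a hand-written Product like this is enough for createSchemaRDD depends on the reflection caveat above: the schema inference may still expect named members matching the table's fields, which Record does provide here.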