Re: spark cluster performance decreases by adding more nodes
Maybe your master or Zeppelin server is running out of memory, and the more data it receives the more memory swapping it has to do. Something to check. Get Outlook for Android

On Wed, May 17, 2017 at 11:14 AM -0400, "Junaid Nasir" wrote:

I have a large data set of 1B records and want to run analytics using Apache Spark because of the scaling it provides, but I am seeing an anti-pattern here: the more nodes I add to the Spark cluster, the longer completion times get. The data store is Cassandra, and queries are run by Zeppelin. I have tried many different queries, but even a simple `dataframe.count()` behaves like this. Here is the Zeppelin notebook; the temp table has 18M records.

val df = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "temp", "keyspace" -> "mykeyspace"))
  .load()
  .cache()
df.registerTempTable("table")

%sql
SELECT first(devid),date,count(1) FROM table group by date,rtu order by date

When tested against different numbers of Spark worker nodes, these were the results:

Spark nodes | Time
4 nodes     | 22 min 58 sec
3 nodes     | 15 min 49 sec
2 nodes     | 12 min 51 sec
1 node      | 17 min 59 sec

Increasing the number of nodes decreases performance, which should not happen, as it defeats the purpose of using Spark. If you want me to run any query or provide further info about the setup, please ask. Any cues on why this is happening would be very helpful; I have been stuck on this for two days now. Thank you for your time.

**versions**
Zeppelin: 0.7.1
Spark: 2.1.0
Cassandra: 2.2.9
Connector: datastax:spark-cassandra-connector:2.0.1-s_2.11

Spark cluster specs: 6 vCPUs, 32 GB memory = 1 node
Cassandra + Zeppelin server specs: 8 vCPUs, 52 GB memory
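[Editor's note] One thing worth checking in a setup like this, besides memory on the master/Zeppelin host, is whether the Cassandra scan is actually split into enough partitions to keep all workers busy; if the whole table lands in a handful of partitions, extra nodes only add scheduling and shuffle overhead. A minimal Scala sketch along those lines (the keyspace, table, and split size shown are placeholders, not recommendations, and in Zeppelin the session already exists):

import org.apache.spark.sql.SparkSession

// Hypothetical session; in a Zeppelin notebook this already exists as spark/sqlContext.
val spark = SparkSession.builder().appName("cassandra-count-check").getOrCreate()

val df = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map(
    "table" -> "temp",            // placeholder table
    "keyspace" -> "mykeyspace"))  // placeholder keyspace
  .load()

// How many partitions did the connector create? If this number is much smaller
// than the total executor core count, most of the cluster sits idle no matter
// how many nodes are added.
println(s"input partitions    = ${df.rdd.getNumPartitions}")
println(s"default parallelism = ${spark.sparkContext.defaultParallelism}")

// The connector's input split size is one knob that controls the partition
// count; smaller splits mean more (and smaller) Cassandra scan partitions.
// (Set before the read, e.g. in spark-defaults or the interpreter settings.)
// spark.conf.set("spark.cassandra.input.split.size_in_mb", "64")

val start = System.nanoTime()
println(s"count = ${df.count()}, took ${(System.nanoTime() - start) / 1e9} s")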
Re: Where is release 2.1.1?
Thanks. It looks like they posted the release just now, because it wasn't showing before. Get Outlook for Android

On Fri, May 5, 2017 at 11:04 AM -0400, "Jules Damji" wrote: Go to this link http://spark.apache.org/downloads.html Cheers, Jules Sent from my iPhone. Pardon the dumb thumb typos :)

On May 5, 2017, at 7:40 AM, dar...@ontrenet.com wrote: Hi, the website says it is released. Where can it be downloaded? Thanks Get Outlook for Android
Where is release 2.1.1?
Hi, the website says it is released. Where can it be downloaded? Thanks Get Outlook for Android
Re: Running Spark on EMR
So what was the answer? Sent from my Verizon, Samsung Galaxy smartphone

Original message From: Andrew Holway Date: 1/15/17 11:37 AM (GMT-05:00) To: Marco Mistroni Cc: Neil Jonkers, User Subject: Re: Running Spark on EMR

Darn. I didn't respond to the list. Sorry.

On Sun, Jan 15, 2017 at 5:29 PM, Marco Mistroni wrote: Thanks Neil. I followed the original suggestion from Andrew and everything is working fine now. kr

On Sun, Jan 15, 2017 at 4:27 PM, Neil Jonkers wrote: Hello, Can you drop the url: spark://master:7077 The url is used when running Spark in standalone mode. Regards

Original message From: Marco Mistroni Date: 15/01/2017 16:34 (GMT+02:00) To: User Subject: Running Spark on EMR

Hi all, could anyone assist here? I am trying to run Spark 2.0.0 on an EMR cluster, but I am having issues connecting to the master node. So, below is a snippet of what I am doing:

sc = SparkSession.builder.master(sparkHost).appName("DataProcess").getOrCreate()

sparkHost is passed as an input parameter, the idea being that I can run the script locally on my local Spark instance as well as submit scripts to any cluster I want. Now I have:
1 - set up a cluster on EMR
2 - connected to the master node
3 - launched the command spark-submit myscripts.py spark://master:7077

But that results in a connection refused exception. Then I tried to remove the .master call above and launch the script with the following command:

spark-submit --master spark://master:7077 myscript.py

but I am still getting a connection refused exception. I am using Spark 2.0.0. Could anyone advise on how I should build the Spark session and how I can submit a python script to the cluster? kr marco -- Otter Networks UG http://otternetworks.de Gotenstraße 17 10829 Berlin
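[Editor's note] On EMR, Spark runs on YARN rather than on a standalone master, so nothing listens at spark://master:7077; as Neil suggests, the usual fix is to drop the hard-coded master and let spark-submit supply it (e.g. --master yarn on the cluster, --master local[*] on a laptop). A minimal Scala sketch of the same pattern (the original script is Python, but the idea is identical; the app name is a placeholder):

import org.apache.spark.sql.SparkSession

// Note: no .master(...) here. The master is supplied externally, e.g.
//   spark-submit --master yarn --deploy-mode client myapp.jar   (on EMR)
//   spark-submit --master local[*] myapp.jar                    (locally)
val spark = SparkSession.builder()
  .appName("DataProcess") // placeholder name
  .getOrCreate()

// ... job logic ...

spark.stop()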
Spark in docker over EC2
Anyone got a good guide for getting a Spark master to talk to remote workers inside Docker containers? I followed the tips found by searching, but it still doesn't work. Spark 1.6.2. I exposed all the ports and tried to set the local IP inside the container to the host IP, but Spark complains it can't bind the UI ports. Thanks in advance! Sent from my Verizon, Samsung Galaxy smartphone
Re: Dependency Injection and Microservice development with Spark
We've been able to use ipopo dependency injection framework in our pyspark system and deploy .egg pyspark apps that resolve and wire up all the components (like a kernel architecture. Also similar to spring) during an initial bootstrap sequence; then invoke those components across spark. Just replying for info since it's not identical to your request but in the same spirit. Darren Sent from my Verizon, Samsung Galaxy smartphone Original message From: Chetan Khatri <chetan.opensou...@gmail.com> Date: 1/4/17 6:34 AM (GMT-05:00) To: Lars Albertsson <la...@mapflat.com> Cc: user <user@spark.apache.org>, Spark Dev List <d...@spark.apache.org> Subject: Re: Dependency Injection and Microservice development with Spark Lars, Thank you, I want to use DI for configuring all the properties (wiring) for below architectural approach. Oracle -> Kafka Batch (Event Queuing) -> Spark Jobs( Incremental load from HBase -> Hive with Transformation) -> Spark Transformation -> PostgreSQL Thanks. On Thu, Dec 29, 2016 at 3:25 AM, Lars Albertsson <la...@mapflat.com> wrote: Do you really need dependency injection? DI is often used for testing purposes. Data processing jobs are easy to test without DI, however, due to their functional and synchronous nature. Hence, DI is often unnecessary for testing data processing jobs, whether they are batch or streaming jobs. Or do you want to use DI for other reasons? Lars Albertsson Data engineering consultant www.mapflat.com https://twitter.com/lalleal +46 70 7687109 Calendar: https://goo.gl/6FBtlS, https://freebusy.io/la...@mapflat.com On Fri, Dec 23, 2016 at 11:56 AM, Chetan Khatri <chetan.opensou...@gmail.com> wrote: > Hello Community, > > Current approach I am using for Spark Job Development with Scala + SBT and > Uber Jar with yml properties file to pass configuration parameters. But If i > would like to use Dependency Injection and MicroService Development like > Spring Boot feature in Scala then what would be the standard approach. > > Thanks > > Chetan
Re: Scala Vs Python
I politely disagree. The jvm is one vm. Python has another. It's less about preference and more about where the skills as an industry is going for data analysis and BI etc. No cares about jvm vs. Pvm. They do care about time. So if the time to prototype is 10x faster (in calendar days) but the VM is slower in cpu cycles, the greater benefit decides what's best. The industry trend is clear now. And seemingly spark is moving in its own direction. In my opinion of course. Sent from my Verizon, Samsung Galaxy smartphone Original message From: Sivakumaran S <siva.kuma...@me.com> Date: 9/2/16 4:03 AM (GMT-05:00) To: Mich Talebzadeh <mich.talebza...@gmail.com> Cc: Jakob Odersky <ja...@odersky.com>, ayan guha <guha.a...@gmail.com>, Tal Grynbaum <tal.grynb...@gmail.com>, darren <dar...@ontrenet.com>, kant kodali <kanth...@gmail.com>, AssafMendelson <assaf.mendel...@rsa.com>, user <user@spark.apache.org> Subject: Re: Scala Vs Python Whatever benefits you may accrue from the rapid prototyping and coding in Python, it will be offset against the time taken to convert it to run inside the JVM. This of course depends on the complexity of the DAG. I guess it is a matter of language preference. Regards, Sivakumaran S On 02-Sep-2016, at 8:58 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote: From an outsider point of view nobody likes change :) However, it appears to me that Scala is a rising star and if one learns it, it is another iron in the fire so to speak. I believe as we progress in time Spark is going to move away from Python. If you look at 2014 Databricks code examples, they were mostly in Python. Now they are mostly in Scala for a reason. HTH Dr Mich Talebzadeh LinkedIn https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw http://talebzadehmich.wordpress.com Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. On 2 September 2016 at 08:23, Jakob Odersky <ja...@odersky.com> wrote: Forgot to answer your question about feature parity of Python w.r.t. Spark's different components I mostly work with scala so I can't say for sure but I think that all pre-2.0 features (that's basically everything except Structured Streaming) are on par. Structured Streaming is a pretty new feature and Python support is currently not available. The API is not final however and I reckon that Python support will arrive once it gets finalized, probably in the next version.
Re: Scala Vs Python
This topic is a concern for us as well. In the data science world no one uses native Scala or Java by choice. It's R and Python. And Python is growing. Yet in Spark, Python is 3rd in line for feature support, if at all. This is why we have decoupled from Spark in our project. It's really unfortunate the Spark team has invested so heavily in Scala. As for speed, it comes from horizontal scaling and throughput. When you can scale outward, individual VM performance is less of an issue. Basic HPC principles. Sent from my Verizon, Samsung Galaxy smartphone

Original message From: Mich Talebzadeh Date: 9/1/16 6:01 PM (GMT-05:00) To: Jakob Odersky Cc: ayan guha, kant kodali, AssafMendelson, user Subject: Re: Scala Vs Python

Hi Jakob. My understanding of Dataset is that it is basically an RDD with some optimization gone into it. RDD is meant to deal with unstructured data? Now DataFrame is the tabular format of RDD designed for tabular work, csv, SQL stuff etc. When you mention DataFrame is just an alias for Dataset[Row] does that mean that it converts an RDD to DataSet thus producing a tabular format? Thanks Dr Mich Talebzadeh LinkedIn https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw http://talebzadehmich.wordpress.com Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

On 1 September 2016 at 22:49, Jakob Odersky wrote: > However, what really worries me is not having Dataset APIs at all in Python. > I think thats a deal breaker. What is the functionality you are missing? In Spark 2.0 a DataFrame is just an alias for Dataset[Row] ("type DataFrame = Dataset[Row]" in core/.../o/a/s/sql/package.scala). Since python is dynamically typed, you wouldn't really gain anything by using Datasets anyway.

On Thu, Sep 1, 2016 at 2:20 PM, ayan guha wrote: Thanks All for your replies. Feature Parity: MLLib, RDD and dataframes features are totally comparable. Streaming is now at par in functionality too, I believe. However, what really worries me is not having Dataset APIs at all in Python. I think thats a deal breaker. Performance: I do get this bit when RDDs are involved, but not when Data frame is the only construct I am operating on. Dataframe supposed to be language-agnostic in terms of performance. So why people think python is slower? is it because of using UDF? Any other reason? Is there any kind of benchmarking/stats around Python UDF vs Scala UDF comparison? like the one out there b/w RDDs. @Kant: I am not comparing ANY applications. I am comparing SPARK applications only. I would be glad to hear your opinion on why pyspark applications will not work, if you have any benchmarks please share if possible.

On Fri, Sep 2, 2016 at 12:57 AM, kant kodali wrote: c'mon man this is no Brainer..Dynamic Typed Languages for Large Code Bases or Large Scale Distributed Systems makes absolutely no sense. I can write a 10 page essay on why that wouldn't work so great. you might be wondering why would spark have it then? well probably because its ease of use for ML (that would be my best guess).

On Wed, Aug 31, 2016 11:45 PM, AssafMendelson assaf.mendel...@rsa.com wrote: I believe this would greatly depend on your use case and your familiarity with the languages.
In general, scala would have a much better performance than python and not all interfaces are available in python. That said, if you are planning to use dataframes without any UDF then the performance hit is practically nonexistent. Even if you need UDF, it is possible to write those in scala and wrap them for python and still get away without the performance hit. Python does not have interfaces for UDAFs. I believe that if you have large structured data and do not generally need UDF/UDAF you can certainly work in python without losing too much. From: ayan guha [mailto:[hidden email]] Sent: Thursday, September 01, 2016 5:03 AM To: user Subject: Scala Vs Python Hi Users Thought to ask (again and again) the question: While I am building any production application, should I use Scala or Python? I have read many if not most articles but all seems pre-Spark 2. Anything changed with Spark 2? Either pro-scala way or pro-python way? I am
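[Editor's note] The suggestion above about writing UDFs in Scala and still using them from Python usually comes down to registering the function with the SQL engine, so that any language binding can call it through SQL. A minimal Scala sketch of that registration (the function name and logic are placeholders; this illustrates the general approach, not an official wrapping mechanism):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("udf-registration").getOrCreate()

// Register a Scala function under a SQL name. Once registered, it can be called
// from Spark SQL in any language binding, e.g. from PySpark via
//   spark.sql("SELECT normalize_name(name) FROM people")
spark.udf.register("normalize_name", (s: String) =>
  if (s == null) null else s.trim.toLowerCase)

spark.sql("SELECT normalize_name(' Alice ') AS n").show()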
RE: AMQP extension for Apache Spark Streaming (messaging/IoT)
This is fantastic news. Sent from my Verizon 4G LTE smartphone

Original message From: Paolo Patierno Date: 7/3/16 4:41 AM (GMT-05:00) To: user@spark.apache.org Subject: AMQP extension for Apache Spark Streaming (messaging/IoT)

Hi all, I'm working on an AMQP extension for Apache Spark Streaming, developing a reliable receiver for it. After MQTT support (I see it in the Apache Bahir repository), another messaging/IoT protocol could be very useful for the Apache Spark Streaming ecosystem. A lot of brokers out there (with a "store and forward" mechanism) support AMQP as a first-class protocol, as does the Apache Qpid Dispatch Router, which is based on it for message routing. Currently the source code is in my own GitHub account and it's at an early stage; the first step was just having something working end-to-end. I'm going to add features like QoS and flow control in AMQP terms very soon. I was inspired by the spark-packages directory structure, using Scala (as the main language) and SBT (as the build tool). https://github.com/ppatierno/dstream-amqp What do you think about that? Looking forward to hearing from you. Thanks, Paolo.
Re: Spark + Kafka processing trouble
Well, that could be the problem. A SQL database is essentially a big synchronizer. If you have a lot of Spark tasks all bottlenecking on a single database socket (is the database clustered or colocated with the Spark workers?) then you will have blocked threads on the database server. Sent from my Verizon Wireless 4G LTE smartphone

Original message From: Malcolm Lockyer <malcolm.lock...@hapara.com> Date: 05/30/2016 10:40 PM (GMT-05:00) To: user@spark.apache.org Subject: Re: Spark + Kafka processing trouble

On Tue, May 31, 2016 at 1:56 PM, Darren Govoni <dar...@ontrenet.com> wrote: > So you are calling a SQL query (to a single database) within a spark > operation distributed across your workers? Yes, but currently with very small sets of data (1-10,000) and on a single (dev) machine right now. (sorry didn't reply to the list) - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
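[Editor's note] A common way to relieve that kind of per-record database pressure is to open one connection per partition and reuse it for every element, rather than issuing a fresh round trip per record. A minimal Scala sketch of the pattern (the input RDD, JDBC URL, credentials, and query are all placeholders; this is the general shape, not the poster's actual job):

import java.sql.DriverManager

// records: RDD[String] of keys arriving from Kafka (hypothetical input)
val enriched = records.mapPartitions { keys =>
  // One connection and one prepared statement per partition, reused for every
  // element in that partition.
  val conn = DriverManager.getConnection(
    "jdbc:postgresql://localhost/mydb", "user", "password") // placeholder URL/credentials
  val stmt = conn.prepareStatement("SELECT value FROM lookup WHERE key = ?")
  val results = keys.map { k =>
    stmt.setString(1, k)
    val rs = stmt.executeQuery()
    val v = if (rs.next()) rs.getString(1) else null
    rs.close()
    (k, v)
  }.toList // materialize before closing the connection, since the iterator is lazy
  stmt.close()
  conn.close()
  results.iterator
}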
RE: Spark + Kafka processing trouble
So you are calling a SQL query (to a single database) within a spark operation distributed across your workers? Sent from my Verizon Wireless 4G LTE smartphone Original message From: Malcolm LockyerDate: 05/30/2016 9:45 PM (GMT-05:00) To: user@spark.apache.org Subject: Spark + Kafka processing trouble Hopefully this is not off topic for this list, but I am hoping to reach some people who have used Kafka + Spark before. We are new to Spark and are setting up our first production environment and hitting a speed issue that maybe configuration related - and we have little experience in configuring Spark environments. So we've got a Spark streaming job that seems to take an inordinate amount of time to process. I realize that without specifics, it is difficult to trace - however the most basic primitives in Spark are performing horribly. The lazy nature of Spark is making it difficult for me to understand what is happening - any suggestions are very much appreciated. Environment is MBP 2.2 i7. Spark master is "local[*]". We are using Kafka and PostgreSQL, both local. The job is designed to: a) grab some data from Kafka b) correlate with existing data in PostgreSQL c) output data to Kafka I am isolating timings by calling System.nanoTime() before and after something that forces calculation, for example .count() on a DataFrame. It seems like every operation has a MASSIVE fixed overhead and that is stacking up making each iteration on the RDD extremely slow. Slow operations include pulling a single item from the Kafka queue, running a simple query against PostgresSQL, and running a Spark aggregation on a RDD with a handful of rows. The machine is not maxing out on memory, disk or CPU. The machine seems to be doing nothing for a high percentage of the execution time. We have reproduced this behavior on two other machines. So we're suspecting a configuration issue As a concrete example, we have a DataFrame produced by running a JDBC query by mapping over an RDD from Kafka. Calling count() (I guess forcing execution) on this DataFrame when there is *1* item/row (Note: SQL database is EMPTY at this point so this is not a factor) takes 4.5 seconds, calling count when there are 10,000 items takes 7 seconds. Can anybody offer experience of something like this happening for them? Any suggestions on how to understand what is going wrong? I have tried tuning the number of Kafka partitions - increasing this seems to increase the concurrency and ultimately number of things processed per minute, but to get something half decent, I'm going to need running with 1024 or more partitions. Is 1024 partitions a reasonable number? What do you use in you environments? I've tried different options for batchDuration. The calculation seems to be batchDuration * Kafka partitions for number of items per iteration, but this is always still extremely slow (many per iteration vs. very few doesn't seem to really improve things). Can you suggest a list of the Spark configuration parameters related to speed that you think are key - preferably with the values you use for those parameters? I'd really really appreciate any help or suggestions as I've been working on this speed issue for 3 days without success and my head is starting to hurt. Thanks in advance. Thanks, -- Malcolm Lockyer - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
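[Editor's note] Since much of the confusion described above comes from Spark's laziness, it can help to time individual actions explicitly and to cache inputs so that repeated timings measure the operation itself rather than re-reading from Kafka or Postgres each time. A small Scala sketch of that idea (the JDBC URL, table name, credentials, and `sqlContext` are placeholders for whatever the job already has):

import java.util.Properties

// Tiny helper: only actions (count, collect, ...) trigger work in Spark,
// so wrap the action you want to measure.
def timed[T](label: String)(block: => T): T = {
  val start = System.nanoTime()
  val result = block
  val elapsedMs = (System.nanoTime() - start) / 1e6
  println(f"$label took $elapsedMs%.1f ms")
  result
}

val props = new Properties()
props.setProperty("user", "user")         // placeholder credentials
props.setProperty("password", "password")

// Cache and materialize the input once, so later timings are not dominated
// by re-reading the source on every action.
val df = sqlContext.read
  .jdbc("jdbc:postgresql://localhost/mydb", "some_table", props) // placeholders
  .cache()

timed("initial materialization")(df.count())
timed("count again, from cache")(df.count())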
Submit python egg?
Hi I have a python egg with a __main__.py in it. I am able to execute the egg by itself fine. Is there a way to just submit the egg to spark and have it run? It seems an external .py script is needed which would be unfortunate if true. Thanks Sent from my Verizon Wireless 4G LTE smartphone
Re: Does pyspark still lag far behind the Scala API in terms of features
Our data is made up of single text documents scraped off the web. We store these in a RDD. A Dataframe or similar structure makes no sense at that point. And the RDD is transient. So my point is. Dataframes should not replace plain old rdd since rdds allow for more flexibility and sql etc is not even usable on our data while in rdd. So all those nice dataframe apis aren't usable until it's structured. Which is the core problem anyway. Sent from my Verizon Wireless 4G LTE smartphone Original message From: Nicholas Chammas <nicholas.cham...@gmail.com> Date: 03/02/2016 5:43 PM (GMT-05:00) To: Darren Govoni <dar...@ontrenet.com>, Jules Damji <dmat...@comcast.net>, Joshua Sorrell <jsor...@gmail.com> Cc: user@spark.apache.org Subject: Re: Does pyspark still lag far behind the Scala API in terms of features Plenty of people get their data in Parquet, Avro, or ORC files; or from a database; or do their initial loading of un- or semi-structured data using one of the various data source libraries which help with type-/schema-inference. All of these paths help you get to a DataFrame very quickly. Nick On Wed, Mar 2, 2016 at 5:22 PM Darren Govoni <dar...@ontrenet.com> wrote: Dataframes are essentially structured tables with schemas. So where does the non typed data sit before it becomes structured if not in a traditional RDD? For us almost all the processing comes before there is structure to it. Sent from my Verizon Wireless 4G LTE smartphone Original message From: Nicholas Chammas <nicholas.cham...@gmail.com> Date: 03/02/2016 5:13 PM (GMT-05:00) To: Jules Damji <dmat...@comcast.net>, Joshua Sorrell <jsor...@gmail.com> Cc: user@spark.apache.org Subject: Re: Does pyspark still lag far behind the Scala API in terms of features > However, I believe, investing (or having some members of your group) learn >and invest in Scala is worthwhile for few reasons. One, you will get the >performance gain, especially now with Tungsten (not sure how it relates to >Python, but some other knowledgeable people on the list, please chime in). The more your workload uses DataFrames, the less of a difference there will be between the languages (Scala, Java, Python, or R) in terms of performance. One of the main benefits of Catalyst (which DFs enable) is that it automatically optimizes DataFrame operations, letting you focus on _what_ you want while Spark will take care of figuring out _how_. Tungsten takes things further by tightly managing memory using the type information made available to it via DataFrames. This benefit comes into play regardless of the language used. So in short, DataFrames are the "new RDD"--i.e. the new base structure you should be using in your Spark programs wherever possible. And with DataFrames, what language you use matters much less in terms of performance. Nick On Tue, Mar 1, 2016 at 12:07 PM Jules Damji <dmat...@comcast.net> wrote: Hello Joshua, comments are inline... On Mar 1, 2016, at 5:03 AM, Joshua Sorrell <jsor...@gmail.com> wrote: I haven't used Spark in the last year and a half. I am about to start a project with a new team, and we need to decide whether to use pyspark or Scala. Indeed, good questions, and they do come up lot in trainings that I have attended, where this inevitable question is raised.I believe, it depends on your level of comfort zone or adventure into newer things. 
True, for the most part that Apache Spark committers have been committed to keep the APIs at parity across all the language offerings, even though in some cases, in particular Python, they have lagged by a minor release. To the the extent that they’re committed to level-parity is a good sign. It might to be the case with some experimental APIs, where they lag behind, but for the most part, they have been admirably consistent. With Python there’s a minor performance hit, since there’s an extra level of indirection in the architecture and an additional Python PID that the executors launch to execute your pickled Python lambdas. Other than that it boils down to your comfort zone. I recommend looking at Sameer’s slides on (Advanced Spark for DevOps Training) where he walks through the pySpark and Python architecture. We are NOT a java shop. So some of the build tools/procedures will require some learning overhead if we go the Scala route. What I want to know is: is the Scala version of Spark still far enough ahead of pyspark to be well worth any initial training overhead? If you are a very advanced Python shop and if you’ve in-house libraries that you have written in Python that don’t exist in Scala or some ML libs that don’t exist in the Scala version and will require fair amount of porting and gap is too large, then perhaps it makes sense to stay put with Python. However, I believe, investing (or having some members of your
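[Editor's note] For the situation described earlier in this thread (raw scraped text with no schema yet), the two views are not mutually exclusive: documents can stay in a plain RDD while they are unstructured, and be promoted to a DataFrame once some structure has been extracted. A minimal Scala sketch of that hand-off (Spark 2.x style; the path and the extracted fields are invented for illustration):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("text-to-df").getOrCreate()
import spark.implicits._

// Unstructured stage: one raw document per element, plain RDD[String].
val docs = spark.sparkContext.textFile("scraped-pages/*.txt") // placeholder path

// Extraction stage: pull out whatever structure the job actually needs
// (here just a made-up word count and character count per document).
val features = docs.map { text =>
  (text.split("\\s+").length, text.length)
}

// Structured stage: once rows have a fixed shape, a DataFrame makes SQL,
// Catalyst optimization, and the DataFrame API available.
val df = features.toDF("wordCount", "charCount")
df.createOrReplaceTempView("docs")
spark.sql("SELECT avg(wordCount) FROM docs").show()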
Re: Does pyspark still lag far behind the Scala API in terms of features
Dataframes are essentially structured tables with schemas. So where does the non typed data sit before it becomes structured if not in a traditional RDD? For us almost all the processing comes before there is structure to it. Sent from my Verizon Wireless 4G LTE smartphone Original message From: Nicholas ChammasDate: 03/02/2016 5:13 PM (GMT-05:00) To: Jules Damji , Joshua Sorrell Cc: user@spark.apache.org Subject: Re: Does pyspark still lag far behind the Scala API in terms of features > However, I believe, investing (or having some members of your group) learn >and invest in Scala is worthwhile for few reasons. One, you will get the >performance gain, especially now with Tungsten (not sure how it relates to >Python, but some other knowledgeable people on the list, please chime in). The more your workload uses DataFrames, the less of a difference there will be between the languages (Scala, Java, Python, or R) in terms of performance. One of the main benefits of Catalyst (which DFs enable) is that it automatically optimizes DataFrame operations, letting you focus on _what_ you want while Spark will take care of figuring out _how_. Tungsten takes things further by tightly managing memory using the type information made available to it via DataFrames. This benefit comes into play regardless of the language used. So in short, DataFrames are the "new RDD"--i.e. the new base structure you should be using in your Spark programs wherever possible. And with DataFrames, what language you use matters much less in terms of performance. Nick On Tue, Mar 1, 2016 at 12:07 PM Jules Damji wrote: Hello Joshua, comments are inline... On Mar 1, 2016, at 5:03 AM, Joshua Sorrell wrote: I haven't used Spark in the last year and a half. I am about to start a project with a new team, and we need to decide whether to use pyspark or Scala. Indeed, good questions, and they do come up lot in trainings that I have attended, where this inevitable question is raised.I believe, it depends on your level of comfort zone or adventure into newer things. True, for the most part that Apache Spark committers have been committed to keep the APIs at parity across all the language offerings, even though in some cases, in particular Python, they have lagged by a minor release. To the the extent that they’re committed to level-parity is a good sign. It might to be the case with some experimental APIs, where they lag behind, but for the most part, they have been admirably consistent. With Python there’s a minor performance hit, since there’s an extra level of indirection in the architecture and an additional Python PID that the executors launch to execute your pickled Python lambdas. Other than that it boils down to your comfort zone. I recommend looking at Sameer’s slides on (Advanced Spark for DevOps Training) where he walks through the pySpark and Python architecture. We are NOT a java shop. So some of the build tools/procedures will require some learning overhead if we go the Scala route. What I want to know is: is the Scala version of Spark still far enough ahead of pyspark to be well worth any initial training overhead? If you are a very advanced Python shop and if you’ve in-house libraries that you have written in Python that don’t exist in Scala or some ML libs that don’t exist in the Scala version and will require fair amount of porting and gap is too large, then perhaps it makes sense to stay put with Python. 
However, I believe, investing (or having some members of your group) learn and invest in Scala is worthwhile for few reasons. One, you will get the performance gain, especially now with Tungsten (not sure how it relates to Python, but some other knowledgeable people on the list, please chime in). Two, since Spark is written in Scala, it gives you an enormous advantage to read sources (which are well documented and highly readable) should you have to consult or learn nuances of certain API method or action not covered comprehensively in the docs. And finally, there’s a long term benefit in learning Scala for reasons other than Spark. For example, writing other scalable and distributed applications. Particularly, we will be using Spark Streaming. I know a couple of years ago that practically forced the decision to use Scala. Is this still the case? You’ll notice that certain APIs call are not available, at least for now, in Python. http://spark.apache.org/docs/latest/streaming-programming-guide.html CheersJules -- The Best Ideas Are Simple Jules S. Damji e-mail:dmat...@comcast.net e-mail:jules.da...@gmail.com
RE: How could I do this algorithm in Spark?
This might be hard to do. One generalization of this problem is https://en.m.wikipedia.org/wiki/Longest_path_problem Given a node (e.g. A), find the longest path. All interior relations are transitive and can be inferred. But finding a distributed Spark way of doing it in P time would be interesting. Sent from my Verizon Wireless 4G LTE smartphone

Original message From: Guillermo Ortiz Date: 02/24/2016 5:26 PM (GMT-05:00) To: user Subject: How could I do this algorithm in Spark?

I want to do some algorithm in Spark. I know how to do it on a single machine where all the data are together, but I don't know a good way to do it in Spark. If someone has an idea... I have some data like this:

a , b
x , y
b , c
y , y
c , d

I want something like:

a , d
b , d
c , d
x , y
y , y

I need to know that a->b->c->d, so a->d, b->d and c->d. I don't want the code, just an idea of how I could deal with it. Any idea?
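[Editor's note] One idea that fits Spark's model here is to treat the pairs as edges and compute the transitive closure by repeatedly joining the current set of reachable pairs with the original edges until nothing new appears; from the closure one can then keep, for each start node, the endpoint of its longest chain. A minimal Scala sketch of the closure step (the edge data is hard-coded to the example above; whether this stays tractable at scale is a separate question, since the closure can grow large):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("transitive-closure"))

// The example edges: a->b, x->y, b->c, y->y, c->d
val edges = sc.parallelize(Seq(
  ("a", "b"), ("x", "y"), ("b", "c"), ("y", "y"), ("c", "d")))

var closure = edges
var previousCount = 0L
var currentCount = closure.count()

while (currentCount != previousCount) {
  previousCount = currentCount
  // (x, y) joined with (y, z) yields the derived pair (x, z).
  val derived = closure.map(_.swap)  // (y, x)
    .join(edges)                     // (y, (x, z))
    .map { case (_, (x, z)) => (x, z) }
  closure = closure.union(derived).distinct().cache()
  currentCount = closure.count()
}

// closure now contains a->b, a->c, a->d, b->c, b->d, c->d, x->y, y->y
closure.collect().foreach(println)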
RE: Unusually large deserialisation time
I meant to write 'last task in stage'. Sent from my Verizon Wireless 4G LTE smartphone

Original message From: Darren Govoni <dar...@ontrenet.com> Date: 02/16/2016 6:55 AM (GMT-05:00) To: Abhishek Modi <abshkm...@gmail.com>, user@spark.apache.org Subject: RE: Unusually large deserialisation time

I think this is part of the bigger issue of serious deadlock conditions occurring in spark many of us have posted on. Would the task in question be the past task of a stage by chance? Sent from my Verizon Wireless 4G LTE smartphone

Original message From: Abhishek Modi <abshkm...@gmail.com> Date: 02/16/2016 4:12 AM (GMT-05:00) To: user@spark.apache.org Subject: Unusually large deserialisation time

I'm doing a mapPartitions on an RDD cached in memory followed by a reduce. Here is my code snippet:

// myRdd is an RDD consisting of Tuple2[Int,Long]
myRdd.mapPartitions(rangify).reduce( (x, y) => (x._1 + y._1, x._2 ++ y._2) )

// The rangify function
import scala.collection.mutable.ArrayBuffer

def rangify(l: Iterator[Tuple2[Int, Long]]): Iterator[Tuple2[Long, List[ArrayBuffer[Tuple2[Long, Long]]]]] = {
  var sum = 0L
  val mylist = ArrayBuffer[Tuple2[Long, Long]]()
  if (l.isEmpty)
    return List((0L, List[ArrayBuffer[Tuple2[Long, Long]]]())).toIterator
  var prev = -1000L
  var begin = -1000L
  for (x <- l) {
    sum += x._1
    if (prev < 0) {
      prev = x._2
      begin = x._2
    } else if (x._2 == prev + 1)
      prev = x._2
    else {
      // close the current run of consecutive values and start a new one
      mylist += ((begin, prev))
      prev = x._2
      begin = x._2
    }
  }
  mylist += ((begin, prev))
  List((sum, List(mylist))).toIterator
}

The RDD is cached in memory. I'm using 20 executors with 1 core for each executor. The cached RDD has 60 blocks. The problem is that for every 2-3 runs of the job, there is a task which has an abnormally large deserialisation time. Screenshot attached. Thank you, Abhishek
RE: Unusually large deserialisation time
I think this is part of the bigger issue of serious deadlock conditions occurring in spark many of us have posted on. Would the task in question be the past task of a stage by chance? Sent from my Verizon Wireless 4G LTE smartphone

Original message From: Abhishek Modi Date: 02/16/2016 4:12 AM (GMT-05:00) To: user@spark.apache.org Subject: Unusually large deserialisation time

I'm doing a mapPartitions on an RDD cached in memory followed by a reduce. Here is my code snippet:

// myRdd is an RDD consisting of Tuple2[Int,Long]
myRdd.mapPartitions(rangify).reduce( (x, y) => (x._1 + y._1, x._2 ++ y._2) )

// The rangify function
import scala.collection.mutable.ArrayBuffer

def rangify(l: Iterator[Tuple2[Int, Long]]): Iterator[Tuple2[Long, List[ArrayBuffer[Tuple2[Long, Long]]]]] = {
  var sum = 0L
  val mylist = ArrayBuffer[Tuple2[Long, Long]]()
  if (l.isEmpty)
    return List((0L, List[ArrayBuffer[Tuple2[Long, Long]]]())).toIterator
  var prev = -1000L
  var begin = -1000L
  for (x <- l) {
    sum += x._1
    if (prev < 0) {
      prev = x._2
      begin = x._2
    } else if (x._2 == prev + 1)
      prev = x._2
    else {
      // close the current run of consecutive values and start a new one
      mylist += ((begin, prev))
      prev = x._2
      begin = x._2
    }
  }
  mylist += ((begin, prev))
  List((sum, List(mylist))).toIterator
}

The RDD is cached in memory. I'm using 20 executors with 1 core for each executor. The cached RDD has 60 blocks. The problem is that for every 2-3 runs of the job, there is a task which has an abnormally large deserialisation time. Screenshot attached. Thank you, Abhishek
Re: Launching EC2 instances with Spark compiled for Scala 2.11
Why not deploy it, then build a custom distribution with Scala 2.11 and just overlay it? Sent from my Verizon Wireless 4G LTE smartphone

Original message From: Nuno Santos Date: 01/25/2016 7:38 AM (GMT-05:00) To: user@spark.apache.org Subject: Re: Launching EC2 instances with Spark compiled for Scala 2.11

Hello, Any updates on this question? I'm also very interested in a solution, as I'm trying to use Spark on EC2 but need Scala 2.11 support. The scripts in the ec2 directory of the Spark distribution use Scala 2.10 by default, and I can't see any obvious option to change to Scala 2.11. Regards, Nuno -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Launching-EC2-instances-with-Spark-compiled-for-Scala-2-11-tp24979p26059.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: 10hrs of Scheduler Delay
Yeah. I have screenshots and stack traces. I will post them to the ticket. Nothing informative. I should also mention I'm using pyspark but I think the deadlock is inside the Java scheduler code. Sent from my Verizon Wireless 4G LTE smartphone Original message From: "Sanders, Isaac B" <sande...@rose-hulman.edu> Date: 01/25/2016 8:59 AM (GMT-05:00) To: Ted Yu <yuzhih...@gmail.com> Cc: Darren Govoni <dar...@ontrenet.com>, Renu Yadav <yren...@gmail.com>, Muthu Jayakumar <bablo...@gmail.com>, user@spark.apache.org Subject: Re: 10hrs of Scheduler Delay Is the thread dump the stack trace you are talking about? If so, I will see if I can capture the few different stages I have seen it in. Thanks for the help, I was able to do it for 0.1% of my data. I will create the JIRA. Thanks, Isaac On Jan 25, 2016, at 8:51 AM, Ted Yu <yuzhih...@gmail.com> wrote: Opening a JIRA is fine. See if you can capture stack trace during the hung stage and attach to JIRA so that we have more clue. Thanks On Jan 25, 2016, at 4:25 AM, Darren Govoni <dar...@ontrenet.com> wrote: Probably we should open a ticket for this. There's definitely a deadlock situation occurring in spark under certain conditions. The only clue I have is it always happens on the last stage. And it does seem sensitive to scale. If my job has 300mb of data I'll see the deadlock. But if I only run 10mb of it it will succeed. This suggest a serious fundamental scaling problem. Workers have plenty of resources. Sent from my Verizon Wireless 4G LTE smartphone Original message From: "Sanders, Isaac B" <sande...@rose-hulman.edu> Date: 01/24/2016 2:54 PM (GMT-05:00) To: Renu Yadav <yren...@gmail.com> Cc: Darren Govoni <dar...@ontrenet.com>, Muthu Jayakumar <bablo...@gmail.com>, Ted Yu <yuzhih...@gmail.com>, user@spark.apache.org Subject: Re: 10hrs of Scheduler Delay I am not getting anywhere with any of the suggestions so far. :( Trying some more outlets, I will share any solution I find. - Isaac On Jan 23, 2016, at 1:48 AM, Renu Yadav <yren...@gmail.com> wrote: If you turn on spark.speculation on then that might help. it worked for me On Sat, Jan 23, 2016 at 3:21 AM, Darren Govoni <dar...@ontrenet.com> wrote: Thanks for the tip. I will try it. But this is the kind of thing spark is supposed to figure out and handle. Or at least not get stuck forever. Sent from my Verizon Wireless 4G LTE smartphone Original message ---- From: Muthu Jayakumar <bablo...@gmail.com> Date: 01/22/2016 3:50 PM (GMT-05:00) To: Darren Govoni <dar...@ontrenet.com>, "Sanders, Isaac B" <sande...@rose-hulman.edu>, Ted Yu <yuzhih...@gmail.com> Cc: user@spark.apache.org Subject: Re: 10hrs of Scheduler Delay Does increasing the number of partition helps? You could try out something 3 times what you currently have. Another trick i used was to partition the problem into multiple dataframes and run them sequentially and persistent the result and then run a union on the results. Hope this helps. On Fri, Jan 22, 2016, 3:48 AM Darren Govoni <dar...@ontrenet.com> wrote: Me too. I had to shrink my dataset to get it to work. For us at least Spark seems to have scaling issues. Sent from my Verizon Wireless 4G LTE smartphone Original message From: "Sanders, Isaac B" <sande...@rose-hulman.edu> Date: 01/21/2016 11:18 PM (GMT-05:00) To: Ted Yu <yuzhih...@gmail.com> Cc: user@spark.apache.org Subject: Re: 10hrs of Scheduler Delay I have run the driver on a smaller dataset (k=2, n=5000) and it worked quickly and didn’t hang like this. 
This dataset is closer to k=10, n=4.4m, but I am using more resources on this one. - Isaac On Jan 21, 2016, at 11:06 PM, Ted Yu <yuzhih...@gmail.com> wrote: You may have seen the following on github page: Latest commit 50fdf0e on Feb 22, 2015 That was 11 months ago. Can you search for similar algorithm which runs on Spark and is newer ? If nothing found, consider running the tests coming from the project to determine whether the delay is intrinsic. Cheers On Thu, Jan 21, 2016 at 7:46 PM, Sanders, Isaac B <sande...@rose-hulman.edu> wrote: That thread seems to be moving, it oscillates between a few different traces… Maybe it is working. It seems odd that it would take that long. This is 3rd party code, and after looking at some of it, I think it might not be as Spark-y as it could be. I linked it below. I don’t know a lot about spark, so it might be fine, but I have my suspicions. https://github.com/alitouka/spark_dbscan/blob/master/src/src/main/scala/org/alitouka/spark/dbscan/exploratoryAnalysis/Distance
Re: 10hrs of Scheduler Delay
Probably we should open a ticket for this.There's definitely a deadlock situation occurring in spark under certain conditions. The only clue I have is it always happens on the last stage. And it does seem sensitive to scale. If my job has 300mb of data I'll see the deadlock. But if I only run 10mb of it it will succeed. This suggest a serious fundamental scaling problem. Workers have plenty of resources. Sent from my Verizon Wireless 4G LTE smartphone Original message From: "Sanders, Isaac B" <sande...@rose-hulman.edu> Date: 01/24/2016 2:54 PM (GMT-05:00) To: Renu Yadav <yren...@gmail.com> Cc: Darren Govoni <dar...@ontrenet.com>, Muthu Jayakumar <bablo...@gmail.com>, Ted Yu <yuzhih...@gmail.com>, user@spark.apache.org Subject: Re: 10hrs of Scheduler Delay I am not getting anywhere with any of the suggestions so far. :( Trying some more outlets, I will share any solution I find. - Isaac On Jan 23, 2016, at 1:48 AM, Renu Yadav <yren...@gmail.com> wrote: If you turn on spark.speculation on then that might help. it worked for me On Sat, Jan 23, 2016 at 3:21 AM, Darren Govoni <dar...@ontrenet.com> wrote: Thanks for the tip. I will try it. But this is the kind of thing spark is supposed to figure out and handle. Or at least not get stuck forever. Sent from my Verizon Wireless 4G LTE smartphone Original message From: Muthu Jayakumar <bablo...@gmail.com> Date: 01/22/2016 3:50 PM (GMT-05:00) To: Darren Govoni <dar...@ontrenet.com>, "Sanders, Isaac B" <sande...@rose-hulman.edu>, Ted Yu <yuzhih...@gmail.com> Cc: user@spark.apache.org Subject: Re: 10hrs of Scheduler Delay Does increasing the number of partition helps? You could try out something 3 times what you currently have. Another trick i used was to partition the problem into multiple dataframes and run them sequentially and persistent the result and then run a union on the results. Hope this helps. On Fri, Jan 22, 2016, 3:48 AM Darren Govoni <dar...@ontrenet.com> wrote: Me too. I had to shrink my dataset to get it to work. For us at least Spark seems to have scaling issues. Sent from my Verizon Wireless 4G LTE smartphone Original message From: "Sanders, Isaac B" <sande...@rose-hulman.edu> Date: 01/21/2016 11:18 PM (GMT-05:00) To: Ted Yu <yuzhih...@gmail.com> Cc: user@spark.apache.org Subject: Re: 10hrs of Scheduler Delay I have run the driver on a smaller dataset (k=2, n=5000) and it worked quickly and didn’t hang like this. This dataset is closer to k=10, n=4.4m, but I am using more resources on this one. - Isaac On Jan 21, 2016, at 11:06 PM, Ted Yu <yuzhih...@gmail.com> wrote: You may have seen the following on github page: Latest commit 50fdf0e on Feb 22, 2015 That was 11 months ago. Can you search for similar algorithm which runs on Spark and is newer ? If nothing found, consider running the tests coming from the project to determine whether the delay is intrinsic. Cheers On Thu, Jan 21, 2016 at 7:46 PM, Sanders, Isaac B <sande...@rose-hulman.edu> wrote: That thread seems to be moving, it oscillates between a few different traces… Maybe it is working. It seems odd that it would take that long. This is 3rd party code, and after looking at some of it, I think it might not be as Spark-y as it could be. I linked it below. I don’t know a lot about spark, so it might be fine, but I have my suspicions. 
https://github.com/alitouka/spark_dbscan/blob/master/src/src/main/scala/org/alitouka/spark/dbscan/exploratoryAnalysis/DistanceToNearestNeighborDriver.scala - Isaac On Jan 21, 2016, at 10:08 PM, Ted Yu <yuzhih...@gmail.com> wrote: You may have noticed the following - did this indicate prolonged computation in your code ?
Re: 10hrs of Scheduler Delay
Thanks for the tip. I will try it. But this is the kind of thing spark is supposed to figure out and handle. Or at least not get stuck forever. Sent from my Verizon Wireless 4G LTE smartphone Original message From: Muthu Jayakumar <bablo...@gmail.com> Date: 01/22/2016 3:50 PM (GMT-05:00) To: Darren Govoni <dar...@ontrenet.com>, "Sanders, Isaac B" <sande...@rose-hulman.edu>, Ted Yu <yuzhih...@gmail.com> Cc: user@spark.apache.org Subject: Re: 10hrs of Scheduler Delay Does increasing the number of partition helps? You could try out something 3 times what you currently have. Another trick i used was to partition the problem into multiple dataframes and run them sequentially and persistent the result and then run a union on the results. Hope this helps. On Fri, Jan 22, 2016, 3:48 AM Darren Govoni <dar...@ontrenet.com> wrote: Me too. I had to shrink my dataset to get it to work. For us at least Spark seems to have scaling issues. Sent from my Verizon Wireless 4G LTE smartphone Original message From: "Sanders, Isaac B" <sande...@rose-hulman.edu> Date: 01/21/2016 11:18 PM (GMT-05:00) To: Ted Yu <yuzhih...@gmail.com> Cc: user@spark.apache.org Subject: Re: 10hrs of Scheduler Delay I have run the driver on a smaller dataset (k=2, n=5000) and it worked quickly and didn’t hang like this. This dataset is closer to k=10, n=4.4m, but I am using more resources on this one. - Isaac On Jan 21, 2016, at 11:06 PM, Ted Yu <yuzhih...@gmail.com> wrote: You may have seen the following on github page: Latest commit 50fdf0e on Feb 22, 2015 That was 11 months ago. Can you search for similar algorithm which runs on Spark and is newer ? If nothing found, consider running the tests coming from the project to determine whether the delay is intrinsic. Cheers On Thu, Jan 21, 2016 at 7:46 PM, Sanders, Isaac B <sande...@rose-hulman.edu> wrote: That thread seems to be moving, it oscillates between a few different traces… Maybe it is working. It seems odd that it would take that long. This is 3rd party code, and after looking at some of it, I think it might not be as Spark-y as it could be. I linked it below. I don’t know a lot about spark, so it might be fine, but I have my suspicions. https://github.com/alitouka/spark_dbscan/blob/master/src/src/main/scala/org/alitouka/spark/dbscan/exploratoryAnalysis/DistanceToNearestNeighborDriver.scala - Isaac On Jan 21, 2016, at 10:08 PM, Ted Yu <yuzhih...@gmail.com> wrote: You may have noticed the following - did this indicate prolonged computation in your code ? org.apache.commons.math3.util.MathArrays.distance(MathArrays.java:205) org.apache.commons.math3.ml.distance.EuclideanDistance.compute(EuclideanDistance.java:34) org.alitouka.spark.dbscan.spatial.DistanceCalculation$class.calculateDistance(DistanceCalculation.scala:15) org.alitouka.spark.dbscan.exploratoryAnalysis.DistanceToNearestNeighborDriver$.calculateDistance(DistanceToNearestNeighborDriver.scala:16) On Thu, Jan 21, 2016 at 5:13 PM, Sanders, Isaac B <sande...@rose-hulman.edu> wrote: Hadoop is: HDP 2.3.2.0-2950 Here is a gist (pastebin) of my versions en masse and a stacktrace: https://gist.github.com/isaacsanders/2e59131758469097651b Thanks On Jan 21, 2016, at 7:44 PM, Ted Yu <yuzhih...@gmail.com> wrote: Looks like you were running on YARN. What hadoop version are you using ? Can you capture a few stack traces of the AppMaster during the delay and pastebin them ? 
Thanks On Thu, Jan 21, 2016 at 8:08 AM, Sanders, Isaac B <sande...@rose-hulman.edu> wrote: The Spark version is 1.4.1. The logs are full of standard fare, nothing like an exception or even an interesting [INFO] line. Here is the script I am using: https://gist.github.com/isaacsanders/660f480810fbc07d4df2 Thanks Isaac

On Jan 21, 2016, at 11:03 AM, Ted Yu <yuzhih...@gmail.com> wrote: Can you provide a bit more information?
- command line for submitting the Spark job
- version of Spark
- anything interesting from driver / executor logs?
Thanks

On Thu, Jan 21, 2016 at 7:35 AM, Sanders, Isaac B <sande...@rose-hulman.edu> wrote: Hey all, I am a CS student in the United States working on my senior thesis. My thesis uses Spark, and I am encountering some trouble. I am using https://github.com/alitouka/spark_dbscan, and to determine parameters, I am using the utility class they supply, org.alitouka.spark.dbscan.exploratoryAnalysis.DistanceToNearestNeighborDriver. I am on a 10 node cluster with one machine with 8 cores and 32G of memory and nine machines with 6 cores and 16G of memory. I have 442M of data, which seems like it would be a joke, but the job stalls at the last stage. It was stuck in Scheduler Delay for 10 hours overnight, and I have tried
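[Editor's note] The two concrete suggestions in this thread, turning on speculative execution and increasing the partition count well above the core count, are both quick to try. A minimal Scala sketch of where those knobs live (the app name, path, and multipliers are placeholders, not recommendations):

import org.apache.spark.{SparkConf, SparkContext}

// Speculative execution re-launches suspiciously slow tasks on other executors,
// which can mask a single straggler holding up the last stage.
val conf = new SparkConf()
  .setAppName("dbscan-tuning")              // placeholder name
  .set("spark.speculation", "true")
  .set("spark.speculation.multiplier", "2") // re-launch tasks 2x slower than the median

val sc = new SparkContext(conf)

// More, smaller partitions (e.g. 3x the total core count) spread the work of the
// final stage across the cluster instead of concentrating it in a few long tasks.
val data = sc.textFile("hdfs:///path/to/input") // placeholder path
val repartitioned = data.repartition(sc.defaultParallelism * 3)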
Re: 10hrs of Scheduler Delay
Me too. I had to shrink my dataset to get it to work. For us at least Spark seems to have scaling issues. Sent from my Verizon Wireless 4G LTE smartphone Original message From: "Sanders, Isaac B"Date: 01/21/2016 11:18 PM (GMT-05:00) To: Ted Yu Cc: user@spark.apache.org Subject: Re: 10hrs of Scheduler Delay I have run the driver on a smaller dataset (k=2, n=5000) and it worked quickly and didn’t hang like this. This dataset is closer to k=10, n=4.4m, but I am using more resources on this one. - Isaac On Jan 21, 2016, at 11:06 PM, Ted Yu wrote: You may have seen the following on github page: Latest commit 50fdf0e on Feb 22, 2015 That was 11 months ago. Can you search for similar algorithm which runs on Spark and is newer ? If nothing found, consider running the tests coming from the project to determine whether the delay is intrinsic. Cheers On Thu, Jan 21, 2016 at 7:46 PM, Sanders, Isaac B wrote: That thread seems to be moving, it oscillates between a few different traces… Maybe it is working. It seems odd that it would take that long. This is 3rd party code, and after looking at some of it, I think it might not be as Spark-y as it could be. I linked it below. I don’t know a lot about spark, so it might be fine, but I have my suspicions. https://github.com/alitouka/spark_dbscan/blob/master/src/src/main/scala/org/alitouka/spark/dbscan/exploratoryAnalysis/DistanceToNearestNeighborDriver.scala - Isaac On Jan 21, 2016, at 10:08 PM, Ted Yu wrote: You may have noticed the following - did this indicate prolonged computation in your code ? org.apache.commons.math3.util.MathArrays.distance(MathArrays.java:205) org.apache.commons.math3.ml.distance.EuclideanDistance.compute(EuclideanDistance.java:34) org.alitouka.spark.dbscan.spatial.DistanceCalculation$class.calculateDistance(DistanceCalculation.scala:15) org.alitouka.spark.dbscan.exploratoryAnalysis.DistanceToNearestNeighborDriver$.calculateDistance(DistanceToNearestNeighborDriver.scala:16) On Thu, Jan 21, 2016 at 5:13 PM, Sanders, Isaac B wrote: Hadoop is: HDP 2.3.2.0-2950 Here is a gist (pastebin) of my versions en masse and a stacktrace: https://gist.github.com/isaacsanders/2e59131758469097651b Thanks On Jan 21, 2016, at 7:44 PM, Ted Yu wrote: Looks like you were running on YARN. What hadoop version are you using ? Can you capture a few stack traces of the AppMaster during the delay and pastebin them ? Thanks On Thu, Jan 21, 2016 at 8:08 AM, Sanders, Isaac B wrote: The Spark Version is 1.4.1 The logs are full of standard fair, nothing like an exception or even interesting [INFO] lines. Here is the script I am using: https://gist.github.com/isaacsanders/660f480810fbc07d4df2 Thanks Isaac On Jan 21, 2016, at 11:03 AM, Ted Yu wrote: Can you provide a bit more information ? command line for submitting Spark job version of Spark anything interesting from driver / executor logs ? Thanks On Thu, Jan 21, 2016 at 7:35 AM, Sanders, Isaac B wrote: Hey all, I am a CS student in the United States working on my senior thesis. My thesis uses Spark, and I am encountering some trouble. I am using https://github.com/alitouka/spark_dbscan, and to determine parameters, I am using the utility class they supply, org.alitouka.spark.dbscan.exploratoryAnalysis.DistanceToNearestNeighborDriver. I am on a 10 node cluster with one machine with 8 cores and 32G of memory and nine machines with 6 cores and 16G of memory. I have 442M of data, which seems like it would be a joke, but the job stalls at the last stage. 
It was stuck in Scheduler Delay for 10 hours overnight, and I have tried a number of things for the last couple of days, but nothing seems to be helping. I have tried:
- Increasing heap sizes and numbers of cores
- More/fewer executors with different amounts of resources
- Kryo serialization
- FAIR scheduling
It doesn't seem like it should require this much. Any ideas? - Isaac
Re: 10hrs of Scheduler Delay
I've experienced this same problem. Always the last stage hangs. Indeterminant. No errors in logs. I run spark 1.5.2. Can't find an explanation. But it's definitely a showstopper. Sent from my Verizon Wireless 4G LTE smartphone Original message From: Ted YuDate: 01/21/2016 7:44 PM (GMT-05:00) To: "Sanders, Isaac B" Cc: user@spark.apache.org Subject: Re: 10hrs of Scheduler Delay Looks like you were running on YARN. What hadoop version are you using ? Can you capture a few stack traces of the AppMaster during the delay and pastebin them ? Thanks On Thu, Jan 21, 2016 at 8:08 AM, Sanders, Isaac B wrote: The Spark Version is 1.4.1 The logs are full of standard fair, nothing like an exception or even interesting [INFO] lines. Here is the script I am using: https://gist.github.com/isaacsanders/660f480810fbc07d4df2 Thanks Isaac On Jan 21, 2016, at 11:03 AM, Ted Yu wrote: Can you provide a bit more information ? command line for submitting Spark job version of Spark anything interesting from driver / executor logs ? Thanks On Thu, Jan 21, 2016 at 7:35 AM, Sanders, Isaac B wrote: Hey all, I am a CS student in the United States working on my senior thesis. My thesis uses Spark, and I am encountering some trouble. I am using https://github.com/alitouka/spark_dbscan, and to determine parameters, I am using the utility class they supply, org.alitouka.spark.dbscan.exploratoryAnalysis.DistanceToNearestNeighborDriver. I am on a 10 node cluster with one machine with 8 cores and 32G of memory and nine machines with 6 cores and 16G of memory. I have 442M of data, which seems like it would be a joke, but the job stalls at the last stage. It was stuck in Scheduler Delay for 10 hours overnight, and I have tried a number of things for the last couple days, but nothing seems to be helping. I have tried: - Increasing heap sizes and numbers of cores - More/less executors with different amounts of resources. - Kyro Serialization - FAIR Scheduling It doesn’t seem like it should require this much. Any ideas? - Isaac
Re: Docker/Mesos with Spark
I also would be interested in some best practice for making this work. Where will the writeup be posted? On mesosphere website? Sent from my Verizon Wireless 4G LTE smartphone Original message From: Sathish Kumaran VairaveluDate: 01/19/2016 7:00 PM (GMT-05:00) To: Tim Chen Cc: John Omernik , user Subject: Re: Docker/Mesos with Spark Thank you! Looking forward for it.. On Tue, Jan 19, 2016 at 4:03 PM Tim Chen wrote: Hi Sathish, Sorry about that, I think that's a good idea and I'll write up a section in the Spark documentation page to explain how it can work. We (Mesosphere) have been doing this for our DCOS spark for our past releases and has been working well so far. Thanks! Tim On Tue, Jan 19, 2016 at 12:28 PM, Sathish Kumaran Vairavelu wrote: Hi Tim Do you have any materials/blog for running Spark in a container in Mesos cluster environment? I have googled it but couldn't find info on it. Spark documentation says it is possible, but no details provided.. Please help Thanks Sathish On Mon, Sep 21, 2015 at 11:54 AM Tim Chen wrote: Hi John, There is no other blog post yet, I'm thinking to do a series of posts but so far haven't get time to do that yet. Running Spark in docker containers makes distributing spark versions easy, it's simple to upgrade and automatically caches on the slaves so the same image just runs right away. Most of the docker perf is usually related to network and filesystem overheads, but I think with recent changes in Spark to make Mesos sandbox the default temp dir filesystem won't be a big concern as it's mostly writing to the mounted in Mesos sandbox. Also Mesos uses host network by default so network is affected much. Most of the cluster mode limitation is that you need to make the spark job files available somewhere that all the slaves can access remotely (http, s3, hdfs, etc) or available on all slaves locally by path. I'll try to make more doc efforts once I get my existing patches and testing infra work done. Let me know if you have more questions, Tim On Sat, Sep 19, 2015 at 5:42 AM, John Omernik wrote: I was searching in the 1.5.0 docs on the Docker on Mesos capabilities and just found you CAN run it this way. Are there any user posts, blog posts, etc on why and how you'd do this? Basically, at first I was questioning why you'd run spark in a docker container, i.e., if you run with tar balled executor, what are you really gaining? And in this setup, are you losing out on performance somehow? (I am guessing smarter people than I have figured that out). Then I came along a situation where I wanted to use a python library with spark, and it had to be installed on every node, and I realized one big advantage of dockerized spark would be that spark apps that needed other libraries could be contained and built well. OK, that's huge, let's do that. For my next question there are lot of "questions" have on how this actually works. Does Clustermode/client mode apply here? If so, how? Is there a good walk through on getting this setup? Limitations? Gotchas? Should I just dive in an start working with it? Has anyone done any stories/rough documentation? This seems like a really helpful feature to scaling out spark, and letting developers truly build what they need without tons of admin overhead, so I really want to explore. Thanks! John
Re: rdd.foreach return value
What's the rationale behind that? It certainly limits the kind of flow logic we can do in one statement. Sent from my Verizon Wireless 4G LTE smartphone

Original message From: David Russell Date: 01/18/2016 10:44 PM (GMT-05:00) To: charles li Cc: user@spark.apache.org Subject: Re: rdd.foreach return value

The foreach operation on RDD has a void (Unit) return type. See attached. So there is no return value to the driver. David "All that is gold does not glitter, Not all those who wander are lost."

Original Message Subject: rdd.foreach return value Local Time: January 18 2016 10:34 pm UTC Time: January 19 2016 3:34 am From: charles.up...@gmail.com To: user@spark.apache.org

code snippet: the 'print' actually prints info on the worker node, but I feel confused about where the 'return' value goes, for I get nothing on the driver node. -- -- a spark lover, a quant, a developer and a good man. http://github.com/litaotao
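[Editor's note] Since foreach returns Unit and runs on the executors, anything the supplied function "returns" is simply discarded there; to get values back to the driver you use a transformation followed by an action instead. A minimal Scala sketch of the distinction (the data is made up):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("foreach-vs-collect"))
val nums = sc.parallelize(1 to 5)

// foreach: runs on the executors for its side effects only.
// The println output lands in executor logs, and the whole expression is Unit.
nums.foreach(n => println(s"worker saw $n"))

// To get results back on the driver, transform and then use an action:
val doubled: Array[Int] = nums.map(_ * 2).collect() // Array(2, 4, 6, 8, 10)
val total: Int          = nums.reduce(_ + _)        // 15
println(doubled.mkString(", "))
println(total)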
Task hang problem
Hi, I've had this nagging problem where a task will hang and the entire job hangs. Using pyspark, Spark 1.5.1. The job output looks like this, and hangs after the last task:

..
15/12/29 17:00:38 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.65.143.174:34385 (size: 5.8 KB, free: 2.1 GB)
15/12/29 17:00:39 INFO TaskSetManager: Finished task 15.0 in stage 0.0 (TID 15) in 11668 ms on 10.65.143.174 (29/32)
15/12/29 17:00:39 INFO TaskSetManager: Finished task 23.0 in stage 0.0 (TID 23) in 11684 ms on 10.65.143.174 (30/32)
15/12/29 17:00:39 INFO TaskSetManager: Finished task 7.0 in stage 0.0 (TID 7) in 11717 ms on 10.65.143.174 (31/32)
{nothing here for a while, ~6mins}

Here is the executor status, from the UI:

31 31 0 RUNNING PROCESS_LOCAL 2 / 10.65.143.174 2015/12/29 17:00:28 6.8 min 0 ms 0 ms 60 ms 0 ms 0 ms 0.0 B

Here is executor 2 from 10.65.143.174. I never see task 31 get to the executor. Any ideas?

.
15/12/29 17:00:38 INFO TorrentBroadcast: Started reading broadcast variable 0
15/12/29 17:00:38 INFO MemoryStore: ensureFreeSpace(5979) called with curMem=0, maxMem=2223023063
15/12/29 17:00:38 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 5.8 KB, free 2.1 GB)
15/12/29 17:00:38 INFO TorrentBroadcast: Reading broadcast variable 0 took 208 ms
15/12/29 17:00:38 INFO MemoryStore: ensureFreeSpace(8544) called with curMem=5979, maxMem=2223023063
15/12/29 17:00:38 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 8.3 KB, free 2.1 GB)
15/12/29 17:00:39 INFO PythonRunner: Times: total = 913, boot = 747, init = 166, finish = 0
15/12/29 17:00:39 INFO Executor: Finished task 15.0 in stage 0.0 (TID 15). 967 bytes result sent to driver
15/12/29 17:00:39 INFO PythonRunner: Times: total = 955, boot = 735, init = 220, finish = 0
15/12/29 17:00:39 INFO Executor: Finished task 23.0 in stage 0.0 (TID 23). 967 bytes result sent to driver
15/12/29 17:00:39 INFO PythonRunner: Times: total = 970, boot = 812, init = 158, finish = 0
15/12/29 17:00:39 INFO Executor: Finished task 7.0 in stage 0.0 (TID 7). 967 bytes result sent to driver

Sent from my Verizon Wireless 4G LTE smartphone
Re: Task hang problem
here's executor trace. Thread 58: Executor task launch worker-3 (RUNNABLE) java.net.SocketInputStream.socketRead0(Native Method) java.net.SocketInputStream.read(SocketInputStream.java:152) java.net.SocketInputStream.read(SocketInputStream.java:122) java.io.BufferedInputStream.fill(BufferedInputStream.java:235) java.io.BufferedInputStream.read(BufferedInputStream.java:254) java.io.DataInputStream.readInt(DataInputStream.java:387) org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:139) org.apache.spark.api.python.PythonRunner$$anon$1.(PythonRDD.scala:207) org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125) org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) org.apache.spark.rdd.RDD.iterator(RDD.scala:264) org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) org.apache.spark.scheduler.Task.run(Task.scala:88) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:745) Thread 41: BLOCK_MANAGER cleanup timer (WAITING) java.lang.Object.wait(Native Method) java.lang.Object.wait(Object.java:503) java.util.TimerThread.mainLoop(Timer.java:526) java.util.TimerThread.run(Timer.java:505) Thread 42: BROADCAST_VARS cleanup timer (WAITING) java.lang.Object.wait(Native Method) java.lang.Object.wait(Object.java:503) java.util.TimerThread.mainLoop(Timer.java:526) java.util.TimerThread.run(Timer.java:505) Thread 54: driver-heartbeater (TIMED_WAITING) sun.misc.Unsafe.park(Native Method) java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226) java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082) java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1090) java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:807) java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:745) Thread 3: Finalizer (WAITING) java.lang.Object.wait(Native Method) java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:135) java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:151) java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209) Thread 25: ForkJoinPool-3-worker-15 (WAITING) sun.misc.Unsafe.park(Native Method) scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075) scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Thread 35: Hashed wheel timer #2 (TIMED_WAITING) java.lang.Thread.sleep(Native Method) org.jboss.netty.util.HashedWheelTimer$Worker.waitForNextTick(HashedWheelTimer.java:483) org.jboss.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:392) org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) java.lang.Thread.run(Thread.java:745) Thread 68: Idle Worker Monitor for /usr/bin/python2.7 (TIMED_WAITING) java.lang.Thread.sleep(Native Method) org.apache.spark.api.python.PythonWorkerFactory$MonitorThread.run(PythonWorkerFactory.scala:229) Thread 1: 
main (WAITING) sun.misc.Unsafe.park(Native Method) java.util.concurrent.locks.LockSupport.park(LockSupport.java:186) java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834) java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994) java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303) java.util.concurrent.CountDownLatch.await(CountDownLatch.java:236) akka.actor.ActorSystemImpl$TerminationCallbacks.ready(ActorSystem.scala:819)
Re: DataFrame Vs RDDs ... Which one to use When ?
I'll throw a thought in here. DataFrames are nice if your data is uniform and clean with a consistent schema. However, in many big data problems this is seldom the case.

Sent from my Verizon Wireless 4G LTE smartphone

Original message From: Chris Fregly Date: 12/28/2015 5:22 PM (GMT-05:00) To: Richard Eggert Cc: Daniel Siegmann , Divya Gehlot , "user @spark" Subject: Re: DataFrame Vs RDDs ... Which one to use When ?

here's a good article that sums it up, in my opinion: https://ogirardot.wordpress.com/2015/05/29/rdds-are-the-new-bytecode-of-apache-spark/ basically, building apps with RDDs is like building apps with primitive JVM bytecode. haha. @richard: remember that even if you're currently writing RDDs in Java/Scala, you're not gaining the code gen/rewrite performance benefits of the Catalyst optimizer. i agree with @daniel who suggested that you start with DataFrames and revert to RDDs only when DataFrames don't give you what you need. the only time i use RDDs directly these days is when i'm dealing with a Spark library that has not yet moved to DataFrames - i.e. GraphX - and it's kind of annoying switching back and forth. almost everything you need should be in the DataFrame API. Datasets are similar to RDDs, but give you strong compile-time typing, tabular structure, and Catalyst optimizations. hopefully Datasets is the last API we see from Spark SQL... i'm getting tired of re-writing slides and book chapters! :)

On Mon, Dec 28, 2015 at 4:55 PM, Richard Eggert wrote:

One advantage of RDDs over DataFrames is that RDDs allow you to use your own data types, whereas DataFrames are backed by RDDs of Row objects, which are pretty flexible but don't give you much in the way of compile-time type checking. If you have an RDD of case class elements or JSON, then Spark SQL can automatically figure out how to convert it into an RDD of Row objects (and therefore a DataFrame), but there's no way to automatically go the other way (from DataFrame/Row back to custom types). In general, you can ultimately do more with RDDs than DataFrames, but DataFrames give you a lot of niceties (automatic query optimization, table joins, SQL-like syntax, etc.) for free, and can avoid some of the runtime overhead associated with writing RDD code in a non-JVM language (such as Python or R), since the query optimizer is effectively creating the required JVM code under the hood. There's little to no performance benefit if you're already writing Java or Scala code, however (and RDD-based code may actually perform better in some cases, if you're willing to carefully tune your code).

On Mon, Dec 28, 2015 at 3:05 PM, Daniel Siegmann wrote:

DataFrames are a higher-level API for working with tabular data - RDDs are used underneath. You can use either and easily convert between them in your code as necessary. DataFrames provide a nice abstraction for many cases, so it may be easier to code against them. Though if you're used to thinking in terms of collections rather than tables, you may find RDDs more natural. Data frames can also be faster, since Spark will do some optimizations under the hood - if you are using PySpark, this will avoid much of the Python serialization overhead. Data frames may also perform better if you're reading structured data, such as a Hive table or Parquet files. I recommend you prefer data frames, switching over to RDDs as necessary (when you need to perform an operation not supported by data frames / Spark SQL). HOWEVER (and this is a big one), Spark 1.6 will have yet another API - datasets.
The release of Spark 1.6 is currently being finalized and I would expect it in the next few days. You will probably want to use the new API once it's available.

On Sun, Dec 27, 2015 at 9:18 PM, Divya Gehlot wrote:

Hi, I am a newbie to Spark and a bit confused about RDDs and DataFrames in Spark. Can somebody explain to me, with use cases, which one to use when? Would really appreciate the clarification. Thanks, Divya

-- Rich

-- Chris Fregly Principal Data Solutions Engineer IBM Spark Technology Center, San Francisco, CA http://spark.tc | http://advancedspark.com
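A quick sketch of the round trip Richard describes, not from the thread itself: a case class RDD is converted to a DataFrame via the implicits, and going back from Rows to the typed class is done by hand. The case class and field names are made up for illustration.

    // Hedged sketch of RDD <-> DataFrame conversion with the 1.5/1.6 API.
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    case class Click(url: String, userId: Int)   // made-up schema for illustration

    object RddDataFrameRoundTrip {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("rdd-df").setMaster("local[2]"))
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._

        val typed = sc.parallelize(Seq(Click("/a", 1), Click("/a", 2), Click("/b", 1)))

        // Case class -> DataFrame: the schema is inferred from the case class fields.
        val df = typed.toDF()
        df.groupBy("url").count().show()

        // DataFrame -> RDD gives back untyped Rows; restoring the case class is manual.
        val back = df.rdd.map(r => Click(r.getAs[String]("url"), r.getAs[Int]("userId")))
        println(back.count())

        sc.stop()
      }
    }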
Re: Scala VS Java VS Python
I use Python too. I'm actually surprised it's not the primary language, since it is by far more used in data science than Java and Scala combined. If I had a second choice of scripting language for general apps I'd want Groovy over Scala.

Sent from my Verizon Wireless 4G LTE smartphone

Original message From: Daniel Lopes Date: 12/16/2015 4:16 PM (GMT-05:00) To: Daniel Valdivia Cc: user Subject: Re: Scala VS Java VS Python

For me Scala is better since Spark is written in Scala, and I like Python because I have always used Python for data science. :)

On Wed, Dec 16, 2015 at 5:54 PM, Daniel Valdivia wrote:

Hello, This is more of a "survey" question for the community; you can reply to me directly so we don't flood the mailing list. I'm having a hard time learning Spark using Python since the API seems to be slightly incomplete, so I'm looking at my options to start doing all my apps in either Scala or Java. Being a Java developer, Java 1.8 looks like the logical way; however, I'd like to ask here what's the most common (Scala or Java), since I'm observing mixed results in the social documentation, though Scala seems to be the predominant language for Spark examples. Thanks for the advice

-- Daniel Lopes, B.Eng Data Scientist - BankFacil CREA/SP 5069410560 Mob +55 (18) 99764-2733 Ph +55 (11) 3522-8009 http://about.me/dannyeuu Av. Nova Independência, 956, São Paulo, SP Bairro Brooklin Paulista CEP 04570-001 https://www.bankfacil.com.br
Re: Pyspark submitted app just hangs
The pyspark app stdout/err log shows this oddity. Traceback (most recent call last): File "/root/spark/notebooks/ingest/XXX.py", line 86, in print pdfRDD.collect()[:5] File "/root/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 773, in collect File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 536, in __call__ File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 364, in send_command File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 473, in send_command File "/usr/lib64/python2.7/socket.py", line 430, in readline data = recv(1) KeyboardInterrupt On 12/02/2015 08:57 PM, Jim Lohse wrote: Is this the stderr output from a woker? Are any files being written? Can you run in debug and see how far it's getting? This to me doesn't give me a direction to look without the actual logs from $SPARK_HOME or the stderr from the worker UI. Just imho maybe someone know what this means but it seems like it could be caused by a lot of things. On 12/2/2015 6:48 PM, Darren Govoni wrote: Hi all, Wondering if someone can provide some insight why this pyspark app is just hanging. Here is output. ... 15/12/03 01:47:05 INFO TaskSetManager: Starting task 21.0 in stage 0.0 (TID 21, 10.65.143.174, PROCESS_LOCAL, 1794787 bytes) 15/12/03 01:47:05 INFO TaskSetManager: Starting task 22.0 in stage 0.0 (TID 22, 10.97.144.52, PROCESS_LOCAL, 1801814 bytes) 15/12/03 01:47:05 INFO TaskSetManager: Starting task 23.0 in stage 0.0 (TID 23, 10.65.67.146, PROCESS_LOCAL, 1823921 bytes) 15/12/03 01:47:05 INFO TaskSetManager: Starting task 24.0 in stage 0.0 (TID 24, 10.144.176.22, PROCESS_LOCAL, 1820713 bytes) 15/12/03 01:47:05 INFO TaskSetManager: Starting task 25.0 in stage 0.0 (TID 25, 10.65.143.174, PROCESS_LOCAL, 1850492 bytes) 15/12/03 01:47:05 INFO TaskSetManager: Starting task 26.0 in stage 0.0 (TID 26, 10.97.144.52, PROCESS_LOCAL, 1845557 bytes) 15/12/03 01:47:05 INFO TaskSetManager: Starting task 27.0 in stage 0.0 (TID 27, 10.65.67.146, PROCESS_LOCAL, 1876187 bytes) 15/12/03 01:47:05 INFO TaskSetManager: Starting task 28.0 in stage 0.0 (TID 28, 10.144.176.22, PROCESS_LOCAL, 2054748 bytes) 15/12/03 01:47:05 INFO TaskSetManager: Starting task 29.0 in stage 0.0 (TID 29, 10.65.143.174, PROCESS_LOCAL, 1967659 bytes) 15/12/03 01:47:05 INFO TaskSetManager: Starting task 30.0 in stage 0.0 (TID 30, 10.97.144.52, PROCESS_LOCAL, 1977909 bytes) 15/12/03 01:47:05 INFO TaskSetManager: Starting task 31.0 in stage 0.0 (TID 31, 10.65.67.146, PROCESS_LOCAL, 2084044 bytes) 15/12/03 01:47:06 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.65.143.174:39356 (size: 5.2 KB, free: 4.1 GB) 15/12/03 01:47:06 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.144.176.22:40904 (size: 5.2 KB, free: 4.1 GB) 15/12/03 01:47:06 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.97.144.52:35646 (size: 5.2 KB, free: 4.1 GB) 15/12/03 01:47:06 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.65.67.146:44110 (size: 5.2 KB, free: 4.1 GB) ... In the spark console, it says 0/32 tasks and just sits there. No movement. Thanks in advance, D - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Pyspark submitted app just hangs
Hi all, Wondering if someone can provide some insight why this pyspark app is just hanging. Here is output. ... 15/12/03 01:47:05 INFO TaskSetManager: Starting task 21.0 in stage 0.0 (TID 21, 10.65.143.174, PROCESS_LOCAL, 1794787 bytes) 15/12/03 01:47:05 INFO TaskSetManager: Starting task 22.0 in stage 0.0 (TID 22, 10.97.144.52, PROCESS_LOCAL, 1801814 bytes) 15/12/03 01:47:05 INFO TaskSetManager: Starting task 23.0 in stage 0.0 (TID 23, 10.65.67.146, PROCESS_LOCAL, 1823921 bytes) 15/12/03 01:47:05 INFO TaskSetManager: Starting task 24.0 in stage 0.0 (TID 24, 10.144.176.22, PROCESS_LOCAL, 1820713 bytes) 15/12/03 01:47:05 INFO TaskSetManager: Starting task 25.0 in stage 0.0 (TID 25, 10.65.143.174, PROCESS_LOCAL, 1850492 bytes) 15/12/03 01:47:05 INFO TaskSetManager: Starting task 26.0 in stage 0.0 (TID 26, 10.97.144.52, PROCESS_LOCAL, 1845557 bytes) 15/12/03 01:47:05 INFO TaskSetManager: Starting task 27.0 in stage 0.0 (TID 27, 10.65.67.146, PROCESS_LOCAL, 1876187 bytes) 15/12/03 01:47:05 INFO TaskSetManager: Starting task 28.0 in stage 0.0 (TID 28, 10.144.176.22, PROCESS_LOCAL, 2054748 bytes) 15/12/03 01:47:05 INFO TaskSetManager: Starting task 29.0 in stage 0.0 (TID 29, 10.65.143.174, PROCESS_LOCAL, 1967659 bytes) 15/12/03 01:47:05 INFO TaskSetManager: Starting task 30.0 in stage 0.0 (TID 30, 10.97.144.52, PROCESS_LOCAL, 1977909 bytes) 15/12/03 01:47:05 INFO TaskSetManager: Starting task 31.0 in stage 0.0 (TID 31, 10.65.67.146, PROCESS_LOCAL, 2084044 bytes) 15/12/03 01:47:06 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.65.143.174:39356 (size: 5.2 KB, free: 4.1 GB) 15/12/03 01:47:06 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.144.176.22:40904 (size: 5.2 KB, free: 4.1 GB) 15/12/03 01:47:06 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.97.144.52:35646 (size: 5.2 KB, free: 4.1 GB) 15/12/03 01:47:06 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.65.67.146:44110 (size: 5.2 KB, free: 4.1 GB) ... In the spark console, it says 0/32 tasks and just sits there. No movement. Thanks in advance, D - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: thought experiment: use spark ML to real time prediction
I agree 100%. Making the model requires large data and many CPUs. Using it does not. This is a very useful side effect of ML models. If MLlib models can't be used outside Spark, that's a real shame.

Sent from my Verizon Wireless 4G LTE smartphone

Original message From: "Kothuvatiparambil, Viju" Date: 11/12/2015 3:09 PM (GMT-05:00) To: DB Tsai , Sean Owen Cc: Felix Cheung , Nirmal Fernando , Andy Davidson , Adrian Tanase , "user @spark" , Xiangrui Meng , hol...@pigscanfly.ca Subject: RE: thought experiment: use spark ML to real time prediction

I am glad to see DB's comments; it makes me feel I am not the only one facing these issues. If we were able to use MLlib to load the model in web applications (outside the Spark cluster), that would solve the issue. I understand Spark is mainly for processing big data in a distributed mode. But there is no purpose in training a model using MLlib if we are not able to use it in the applications that need to access the model. Thanks Viju

From: DB Tsai [mailto:dbt...@dbtsai.com] Sent: Thursday, November 12, 2015 11:04 AM To: Sean Owen Cc: Felix Cheung; Nirmal Fernando; Andy Davidson; Adrian Tanase; user @spark; Xiangrui Meng; hol...@pigscanfly.ca Subject: Re: thought experiment: use spark ML to real time prediction

I think the use case can be quite different from PMML. Having a Spark-platform-independent ML jar can empower users to do the following:

1) PMML doesn't contain all the models we have in MLlib. Also, for an ML pipeline trained by Spark, most of the time PMML is not expressive enough to do all the transformations we have in Spark ML. As a result, if we are able to serialize the entire Spark ML pipeline after training, and then load it back in an app without any Spark platform for production scoring, this will be very useful for production deployment of Spark ML models. The only issue will be if a transformer involves a shuffle; we need to figure out a way to handle it. When I chatted with Xiangrui about this, he suggested that we may tag whether a transformer is shuffle-ready. Currently, at Netflix, we are not able to use ML pipelines because of those issues, and we have to write our own scorers for production, which is quite a lot of duplicated work.

2) If users can use Spark's linear algebra code, like vector or matrix, in their applications, this will be very useful. This can help share code between the Spark training pipeline and production deployment. Also, lots of good stuff in Spark's MLlib doesn't depend on the Spark platform, and people could use it in their applications without pulling in lots of dependencies. In fact, in my project, I have to copy & paste code from MLlib into my project to use those goodies in apps.

3) Currently, MLlib depends on GraphX, which means in GraphX there is no way to use MLlib's vector or matrix. And
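As an illustration of the "write our own scorers" workaround DB mentions, here is a hedged sketch: train a model with MLlib, copy its coefficients out into plain numbers, and score later with no SparkContext at all. The model type, feature layout, and the tiny training set are all made up; persisting the weights (JSON, a database, etc.) is left out.

    // Hedged sketch: export MLlib model coefficients for Spark-free scoring.
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.LabeledPoint

    object StandaloneScorerSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("export-weights").setMaster("local[2]"))

        val training = sc.parallelize(Seq(          // toy training data
          LabeledPoint(1.0, Vectors.dense(1.0, 0.1)),
          LabeledPoint(0.0, Vectors.dense(0.0, 2.0))))

        val model = new LogisticRegressionWithLBFGS().run(training)

        // Pull the learned parameters out of the Spark model into plain numbers.
        val weights: Array[Double] = model.weights.toArray
        val intercept: Double = model.intercept
        sc.stop()

        // Scoring side: no Spark dependency, just a dot product and a sigmoid.
        def score(features: Array[Double]): Double = {
          val margin = intercept + weights.zip(features).map { case (w, x) => w * x }.sum
          1.0 / (1.0 + math.exp(-margin))
        }
        println(score(Array(0.8, 0.2)))
      }
    }

This obviously only covers simple linear models; the point of DB's proposal is to make the same thing possible for whole pipelines without hand-rolling it.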
Python Kafka support?
Hi, I read on this page http://spark.apache.org/docs/latest/streaming-kafka-integration.html about Python support for "receiverless" Kafka integration (Approach 2), but it says it's incomplete as of version 1.4. Has this been updated in version 1.5.1? Darren
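For reference, and not as an answer to the Python question above, this is roughly what the "receiverless" direct approach looks like in the Scala API of the 1.x spark-streaming-kafka module; the broker address and topic name are placeholders.

    // Hedged sketch of Approach 2 (direct stream, no receiver) in Scala.
    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    object DirectKafkaSketch {
      def main(args: Array[String]): Unit = {
        val ssc = new StreamingContext(
          new SparkConf().setAppName("direct-kafka-sketch").setMaster("local[2]"),
          Seconds(5))

        val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")  // placeholder broker
        val topics = Set("clicks")                                       // placeholder topic

        // No receiver: each batch reads its Kafka offset ranges directly.
        val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams, topics)

        stream.map(_._2).count().print()

        ssc.start()
        ssc.awaitTermination()
      }
    }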
can distinct transform applied on DStream?
val aDstream = ...
val distinctStream = aDstream.transform(_.distinct())

but the elements in distinctStream are not distinct. Did I use it wrong?
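One thing worth noting (an editorial aside, not a reply from the thread): transform applies the function to each batch RDD separately, so _.distinct() deduplicates within a batch only, and the same element can still appear in later batches. A small sketch that makes the per-batch behaviour visible using a queue stream:

    // Hedged sketch: distinct inside transform is per batch, not across batches.
    import scala.collection.mutable
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object DistinctPerBatchSketch {
      def main(args: Array[String]): Unit = {
        val ssc = new StreamingContext(
          new SparkConf().setAppName("distinct-per-batch").setMaster("local[2]"), Seconds(1))

        // Two consecutive batches that share the value 2.
        val q = mutable.Queue(
          ssc.sparkContext.parallelize(Seq(1, 1, 2)),
          ssc.sparkContext.parallelize(Seq(2, 3, 3)))
        val aDstream = ssc.queueStream(q)

        // Each printed batch is internally distinct, but 2 shows up in both batches.
        aDstream.transform(_.distinct()).print()

        ssc.start()
        ssc.awaitTermination()
      }
    }

Deduplicating across batches needs state, for example updateStateByKey or a windowed aggregation.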
Re: [spark-streaming] can shuffle write to disk be disabled?
Thanks, Shao

On Wed, Mar 18, 2015 at 3:34 PM, Shao, Saisai saisai.s...@intel.com wrote:

Yeah, as I said, your job processing time is much larger than the sliding window, and streaming jobs are executed one by one in sequence, so the next job will wait until the first job is finished, and the total latency will accumulate. I think you need to identify the bottleneck of your job first. If the shuffle is so slow, you could enlarge the shuffle fraction of memory to reduce the spill, but in the end the shuffle data will be written to disk; this cannot be disabled, unless you mount your spark.tmp.dir on a ramdisk.

I have increased spark.shuffle.memoryFraction to 0.8, which I can see in the Spark UI's environment variables. But the spill always happens, even from the start when latency is less than the slide window (I changed it to 10 seconds); the shuffle data written to disk really is a snowball effect, it slows down eventually. I noticed that the files spilled to disk are all very small in size but huge in number:

total 344K
drwxr-xr-x 2 root root 4.0K Mar 18 16:55 .
drwxr-xr-x 66 root root 4.0K Mar 18 16:39 ..
-rw-r--r-- 1 root root 80K Mar 18 16:54 shuffle_47_519_0.data
-rw-r--r-- 1 root root 75K Mar 18 16:54 shuffle_48_419_0.data
-rw-r--r-- 1 root root 36K Mar 18 16:54 shuffle_48_518_0.data
-rw-r--r-- 1 root root 69K Mar 18 16:55 shuffle_49_319_0.data
-rw-r--r-- 1 root root 330 Mar 18 16:55 shuffle_49_418_0.data
-rw-r--r-- 1 root root 65K Mar 18 16:55 shuffle_49_517_0.data

MemoryStore says:

15/03/18 17:59:43 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KB for computing block rdd_1338_2 in memory.
15/03/18 17:59:43 WARN MemoryStore: Not enough space to cache rdd_1338_2 in memory! (computed 512.0 B so far)
15/03/18 17:59:43 INFO MemoryStore: Memory use = 529.0 MB (blocks) + 0.0 B (scratch space shared across 0 thread(s)) = 529.0 MB. Storage limit = 529.9 MB.

Not enough space even for 512 bytes?? The executors still have plenty of free memory:

0 slave1:40778 0 0.0 B / 529.9 MB 0.0 B 16 0 15047 15063 2.17 h 0.0 B 402.3 MB 768.0 B
1 slave2:50452 0 0.0 B / 529.9 MB 0.0 B 16 0 14447 14463 2.17 h 0.0 B 388.8 MB 1248.0 B
1 lvs02:47325 116 27.6 MB / 529.9 MB 0.0 B 8 0 58169 58177 3.16 h 893.5 MB 624.0 B 1189.9 MB
driver lvs02:47041 0 0.0 B / 529.9 MB 0.0 B 0 0 0 0 0 ms 0.0 B 0.0 B 0.0 B

Besides, if CPU or network is the bottleneck, you might need to add more resources to your cluster.

3 dedicated servers, each with 16 CPU cores + 16 GB memory and a Gigabit network. CPU load is quite low, about 1~3 from top, and network usage is far from saturated. I don't even do any useful complex calculations in this small Simple App yet.
Re: [spark-streaming] can shuffle write to disk be disabled?
I've already done that: From SparkUI Environment Spark properties has: spark.shuffle.spillfalse On Wed, Mar 18, 2015 at 6:34 PM, Akhil Das ak...@sigmoidanalytics.com wrote: I think you can disable it with spark.shuffle.spill=false Thanks Best Regards On Wed, Mar 18, 2015 at 3:39 PM, Darren Hoo darren@gmail.com wrote: Thanks, Shao On Wed, Mar 18, 2015 at 3:34 PM, Shao, Saisai saisai.s...@intel.com wrote: Yeah, as I said your job processing time is much larger than the sliding window, and streaming job is executed one by one in sequence, so the next job will wait until the first job is finished, so the total latency will be accumulated. I think you need to identify the bottleneck of your job at first. If the shuffle is so slow, you could enlarge the shuffle fraction of memory to reduce the spill, but finally the shuffle data will be written to disk, this cannot be disabled, unless you mount your spark.tmp.dir on ramdisk. I have increased spark.shuffle.memoryFraction to 0.8 which I can see from SparKUI's environment variables But spill always happens even from start when latency is less than slide window(I changed it to 10 seconds), the shuflle data disk written is really a snow ball effect, it slows down eventually. I noticed that the files spilled to disk are all very small in size but huge in numbers: total 344K drwxr-xr-x 2 root root 4.0K Mar 18 16:55 . drwxr-xr-x 66 root root 4.0K Mar 18 16:39 .. -rw-r--r-- 1 root root 80K Mar 18 16:54 shuffle_47_519_0.data -rw-r--r-- 1 root root 75K Mar 18 16:54 shuffle_48_419_0.data -rw-r--r-- 1 root root 36K Mar 18 16:54 shuffle_48_518_0.data -rw-r--r-- 1 root root 69K Mar 18 16:55 shuffle_49_319_0.data -rw-r--r-- 1 root root 330 Mar 18 16:55 shuffle_49_418_0.data -rw-r--r-- 1 root root 65K Mar 18 16:55 shuffle_49_517_0.data MemStore says: 15/03/18 17:59:43 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KB for computing block rdd_1338_2 in memory. 15/03/18 17:59:43 WARN MemoryStore: Not enough space to cache rdd_1338_2 in memory! (computed 512.0 B so far) 15/03/18 17:59:43 INFO MemoryStore: Memory use = 529.0 MB (blocks) + 0.0 B (scratch space shared across 0 thread(s)) = 529.0 MB. Storage limit = 529.9 MB. Not enough space even for 512 byte?? The executors still has plenty free memory: 0slave1:40778 0 0.0 B / 529.9 MB 0.0 B 16 0 15047 15063 2.17 h 0.0 B 402.3 MB 768.0 B 1 slave2:50452 0 0.0 B / 529.9 MB 0.0 B 16 0 14447 14463 2.17 h 0.0 B 388.8 MB 1248.0 B 1 lvs02:47325116 27.6 MB / 529.9 MB 0.0 B 8 0 58169 58177 3.16 h 893.5 MB 624.0 B 1189.9 MB driver lvs02:47041 0 0.0 B / 529.9 MB 0.0 B 0 0 0 0 0 ms 0.0 B 0.0 B 0.0 B Besides if CPU or network is the bottleneck, you might need to add more resources to your cluster. 3 dedicated servers each with CPU 16 cores + 16GB memory and Gigabyte network. CPU load is quite low , about 1~3 from top, and network usage is far from saturated. I don't even do any usefull complex calculations in this small Simple App yet.
Re: [spark-streaming] can shuffle write to disk be disabled?
On Wed, Mar 18, 2015 at 8:31 PM, Shao, Saisai saisai.s...@intel.com wrote:

From the log you pasted I think this (-rw-r--r-- 1 root root 80K Mar 18 16:54 shuffle_47_519_0.data) is not shuffle spilled data, but the final shuffle result.

Why is the shuffle result written to disk?

As I said, did you think shuffle is the bottleneck which makes your job run slowly?

I am quite new to Spark, so I am just making wild guesses. What further information should I provide to help find the real bottleneck?

Maybe you should identify the cause at first. Besides, from the log it looks like your memory is not enough to cache the data; maybe you should increase the memory size of the executor.

Running two executors, the memory usage is quite low:

executor 0 8.6 MB / 4.1 GB
executor 1 23.9 MB / 4.1 GB
driver 0.0 B / 529.9 MB

Submitted with args: --executor-memory 8G --num-executors 2 --driver-memory 1G
Re: [spark-streaming] can shuffle write to disk be disabled?
: Finished task 179.0 in stage 392.0 (TID 100613) in 292 ms on lvs02 (181/291)
15/03/18 15:16:41 INFO TaskSetManager: Starting task 188.0 in stage 392.0 (TID 100622, lvs02, PROCESS_LOCAL, 1122 bytes)
15/03/18 15:16:41 INFO TaskSetManager: Finished task 182.0 in stage 392.0 (TID 100616) in 213 ms on lvs02 (182/291)
15/03/18 15:16:41 INFO BlockManagerInfo: Added input-0-1426663001400 in memory on lvs02:38954 (size: 24.4 KB, free: 1068.1 MB)
15/03/18 15:16:41 INFO TaskSetManager: Starting task 189.0 in stage 392.0 (TID 100623, lvs02, PROCESS_LOCAL, 1122 bytes)
15/03/18 15:16:41 INFO TaskSetManager: Finished task 181.0 in stage 392.0 (TID 100615) in 286 ms on lvs02 (183/291)
15/03/18 15:16:41 INFO TaskSetManager: Starting task 190.0 in stage 392.0 (TID 100624, lvs02, PROCESS_LOCAL, 1122 bytes)
15/03/18 15:16:41 INFO TaskSetManager: Finished task 183.0 in stage 392.0 (TID 100617) in 261 ms on lvs02 (184/291)

Any hints? Thanks!

On Wed, Mar 18, 2015 at 2:19 PM, Shao, Saisai saisai.s...@intel.com wrote:

Would you please check your driver log or streaming web UI to see each job's latency, including processing latency and total latency? It seems from your code that the sliding window is just 3 seconds, so you will process each 60 seconds' worth of data every 3 seconds; if the processing latency is larger than the sliding window, maybe your computation power cannot reach the qps you want. I think you need to identify the bottleneck first, and then try to tune your code, balance the data, or add more computation resources. Thanks Jerry

*From:* Darren Hoo [mailto:darren@gmail.com] *Sent:* Wednesday, March 18, 2015 1:39 PM *To:* user@spark.apache.org *Subject:* [spark-streaming] can shuffle write to disk be disabled?

I use Spark Streaming to read messages from Kafka; the producer creates about 1500 messages per second.

def hash(x: String): Int = {
  MurmurHash3.stringHash(x)
}

val stream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap, StorageLevel.MEMORY_ONLY_SER).map(_._2)

val clickstream = stream.map(log => {
  // parse log ...
  (hash(log.url), HashSet(hash(log.userid)))
}).window(Seconds(60), Seconds(3))

val upv = clickstream.transform(rdd => rdd.reduceByKey(_ ++ _).map { case (url, visits) => {
  val uv = visits.size
  (uv, url)
}})

upv.foreach(rdd => println(new Date() + "\n---\n" + rdd.top(20).mkString("\n") + "\n"))

It is quite quick upon startup, but after running for a few minutes it goes slower and slower and the latency can reach minutes. I found a lot of shuffle writes at /tmp/spark-, several gigabytes in total. With 1500 messages per second and a window size of 60 seconds, I think it should be done in memory without writing to disk at all. I've set executor-memory to 8G, so there is plenty of memory.

$SPARK_HOME/bin/spark-submit \
  --class SimpleApp \
  --master spark://localhost:7077 \
  --driver-memory 16G \
  --executor-memory 8G \
  target/scala-2.10/simple-assembly-1.0.jar

I also tried these settings, but it still spills to disk.

spark.master spark://localhost:7077
#spark.driver.memory 4g
#spark.shuffle.file.buffer.kb 4096
#spark.shuffle.memoryFraction 0.8
#spark.storage.unrollFraction 0.8
#spark.storage.unrollMemoryThreshold 1073741824
spark.io.compression.codec lz4
spark.shuffle.spill false
spark.serializer org.apache.spark.serializer.KryoSerializer

Where am I wrong?
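An editorial footnote rather than a reply from the thread: for this particular pattern (a keyed reduce over a sliding window), reduceByKeyAndWindow pre-aggregates each 3-second batch before the window is assembled, which typically cuts the amount of data shuffled per slide compared with windowing the raw stream and re-reducing all 60 seconds of raw records every 3 seconds. A hedged sketch under the same assumptions as the code above: the hash function and HashSet payload are carried over, while the ZooKeeper address, group, and topic names are placeholders and log parsing is stubbed out.

    // Hedged sketch: the same windowed unique-visitor count, expressed with
    // reduceByKeyAndWindow so each batch is reduced once and the window merges
    // per-key partial results rather than raw records.
    import java.util.Date
    import scala.collection.immutable.HashSet
    import scala.util.hashing.MurmurHash3
    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    object WindowedUvSketch {
      case class Log(url: String, userid: String)   // stand-in for the real parsed record
      def hash(x: String): Int = MurmurHash3.stringHash(x)

      def main(args: Array[String]): Unit = {
        val ssc = new StreamingContext(new SparkConf().setAppName("uv-sketch"), Seconds(3))

        val stream = KafkaUtils.createStream(ssc, "zk1:2181", "uv-group",
          Map("clicks" -> 1), StorageLevel.MEMORY_ONLY_SER).map(_._2)

        val perUrl = stream.map { line =>
          val log = Log(line, line)                 // placeholder parse
          (hash(log.url), HashSet(hash(log.userid)))
        }

        // Merge per-key sets over a 60s window sliding every 3s.
        val windowed = perUrl.reduceByKeyAndWindow(_ ++ _, Seconds(60), Seconds(3))

        val upv = windowed.map { case (url, visits) => (visits.size, url) }
        upv.foreachRDD(rdd => println(new Date() + "\n---\n" + rdd.top(20).mkString("\n") + "\n"))

        ssc.start()
        ssc.awaitTermination()
      }
    }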