[SPARK-25299] A Discussion About Shuffle Metadata Tracking
Hi everyone,

A working group in the community has been having ongoing discussions about how we can allow for flexible storage solutions for shuffle data that are compatible with containerized systems, more resilient to node failures, and able to support disaggregated storage architectures. One of the core challenges we have been trying to overcome is navigating the space of shuffle metadata tracking, and reasoning about how we approach recomputing lost shuffle blocks when the shuffle file storage system is not resilient.

I have written a design document on the subject, along with a proposed set of APIs to address it. These should be considered as part of the APIs for SPARK-25299<https://issues.apache.org/jira/browse/SPARK-25299>. Once we have reached a common conclusion on the proper APIs to build, I can modify the original SPARK-25299 SPIP to reflect the choices we've made, but I wanted to write more extensively on this topic separately to encourage focused discussion on this subset of the problem space.

You can find the design document here: https://docs.google.com/document/d/1Aj6IyMsbS2sdIfHxLvIbHUNjHIWHTabfknIPoxOrTjk/edit?usp=sharing

If you would like to catch up on the discussions we have had so far, I give some background to the subject matter in this document and have linked to other relevant design documents and discussion threads. Feedback is definitely appreciated – I acknowledge that this is a fairly complex space with many potentially viable options, so I'm looking forward to the dialogue moving forward.

Thanks!

-Matt Cheah
Re: Thoughts on Spark 3 release, or a preview release
I don't know if it will be feasible to merge all of SPARK-25299 into Spark 3. There are a number of APIs that will be submitted for review, and I wouldn't want to block the release on negotiating these changes, as the decisions we make for each API can be pretty involved. Our original plan was to mark every API included in SPARK-25299 as private until the entirety was merged, sometime between the release of Spark 3 and Spark 3.1. Once the entire API is merged into the codebase, we'd promote all of them to Experimental status and ship them in Spark 3.1. So, I'm -1 on blocking the Spark 3 preview release specifically on SPARK-25299. -Matt Cheah

From: Xiao Li Date: Tuesday, September 17, 2019 at 12:00 AM To: Erik Erlandson Cc: Sean Owen, dev Subject: Re: Thoughts on Spark 3 release, or a preview release

https://issues.apache.org/jira/browse/SPARK-28264 SPARK-28264 Revisiting Python / pandas UDF sounds critical for the 3.0 preview. Xiao

On Mon, Sep 16, 2019 at 12:22 PM Erik Erlandson wrote: I'm in favor of adding SPARK-25299 - Use remote storage for persisting shuffle data (https://issues.apache.org/jira/browse/SPARK-25299), if that is far enough along to get onto the roadmap.

On Wed, Sep 11, 2019 at 11:37 AM Sean Owen wrote: I'm curious what current feelings are about ramping down towards a Spark 3 release. It feels close to ready. There is no fixed date, though in the past we had informally tossed around "back end of 2019". For reference, Spark 1 was May 2014, Spark 2 was July 2016. I'd expect Spark 2 to last longer, so to speak, but it feels like Spark 3 is coming due. What are the few major items that must get done for Spark 3, in your opinion? Below are all of the open JIRAs for 3.0 (which everyone should feel free to update with things that aren't really needed for Spark 3; I already triaged some). For me, it's: - DSv2? - Finishing touches on the Hive and JDK 11 updates. What about considering a preview release earlier, as happened for Spark 2, to get feedback much earlier than the RC cycle? Could that even happen ... about now? I'm also wondering what a realistic estimate of the Spark 3 release is. My guess is quite early 2020, from here.
SPARK-29014 DataSourceV2: Clean up current, default, and session catalog uses
SPARK-28900 Test Pyspark, SparkR on JDK 11 with run-tests
SPARK-28883 Fix a flaky test: ThriftServerQueryTestSuite
SPARK-28717 Update SQL ALTER TABLE RENAME to use TableCatalog API
SPARK-28588 Build a SQL reference doc
SPARK-28629 Capture the missing rules in HiveSessionStateBuilder
SPARK-28684 Hive module support JDK 11
SPARK-28548 explain() shows wrong result for persisted DataFrames after some operations
SPARK-28372 Document Spark WEB UI
SPARK-28476 Support ALTER DATABASE SET LOCATION
SPARK-28264 Revisiting Python / pandas UDF
SPARK-28301 fix the behavior of table name resolution with multi-catalog
SPARK-28155 do not leak SaveMode to file source v2
SPARK-28103 Cannot infer filters from union table with empty local relation table properly
SPARK-28024 Incorrect numeric values when out of range
SPARK-27936 Support local dependency uploading from --py-files
SPARK-27884 Deprecate Python 2 support in Spark 3.0
SPARK-27763 Port test cases from PostgreSQL to Spark SQL
SPARK-27780 Shuffle server & client should be versioned to enable smoother upgrade
SPARK-27714 Support Join Reorder based on Genetic Algorithm when the # of joined tables > 12
SPARK-27471 Reorganize public v2 catalog API
SPARK-27520 Introduce a global config system to replace hadoopConfiguration
SPARK-24625 put all the backward compatible behavior change configs under spark.sql.legacy.*
SPARK-24640 size(null) returns null
SPARK-24702 Unable to cast to calendar interval in spark sql.
SPARK-24838 Support uncorrelated IN/EXISTS subqueries for more operators
SPARK-24941 Add RDDBarrier.coalesce() function
SPARK-25017 Add test suite for ContextBarrierState
SPARK-25083 remove the type erasure hack in data source scan
SPARK-25383 Image data source supports sample pushdown
SPARK-27272 Enable blacklisting of node/executor on fetch failures by default
SPARK-27296 User Defined Aggregating Functions (UDAFs) have a major efficiency problem
SPARK-25128 multiple simultaneous job submissions against k8s backend cause driver pods to hang
SPARK-26731 remove EOLed spark jobs from jenkins
SPARK-26664 Make DecimalType's minimum adjusted scale configurable
SPARK-21559 Remove Mesos fine-grained mode
SPARK-24942 Improve cluster resource management with jobs containing barrier stage
SPARK-25914 Separate projection from grouping and aggregate in logical Aggregate
SPARK-26022 PySpark Comparison with Pandas
SPARK-20964 Make some keywords reserved along with the ANSI/SQL standard
SPARK-26221 Improve Spark SQL instrumentation and metrics
SPARK-26425 Add more constraint checks in file streaming source to avoid checkpoint corruption
SPA
Re: Thoughts on Spark 3 release, or a preview release
+1 as both a contributor and a user.

From: John Zhuge Date: Thursday, September 12, 2019 at 4:15 PM To: Jungtaek Lim Cc: Jean Georges Perrin, Hyukjin Kwon, Dongjoon Hyun, dev Subject: Re: Thoughts on Spark 3 release, or a preview release

+1 Like the idea as a user and a DSv2 contributor.

On Thu, Sep 12, 2019 at 4:10 PM Jungtaek Lim wrote: +1 (as a contributor) from me to having a preview release of Spark 3, as it would help to test the features. When to cut the preview release is an open question, since the major work should ideally be done before then: if the intent is to introduce new features before the official release, the preview should work regardless, but if the intent is to give an opportunity to test earlier, the major work should ideally land first. As one of the contributors in the structured streaming area, I'd like to add some items for Spark 3.0, both "must be done" and "better to have". For "better to have", I picked some new-feature items which committers reviewed for a couple of rounds and which were then dropped without a soft reject (no valid reason to stop). For Spark 2.4 users, the only feature added for structured streaming was Kafka delegation tokens (counting the revised Kafka consumer pool as an improvement). I hope we provide some gifts for structured streaming users in the Spark 3.0 envelope.

> must be done
* SPARK-26154 Stream-stream joins - left outer join gives inconsistent output
It's a correctness issue reported by multiple users, first reported in Nov. 2018. There's a way to reproduce it consistently, and we have a patch, submitted in Jan. 2019, to fix it.

> better to have
* SPARK-23539 Add support for Kafka headers in Structured Streaming
* SPARK-26848 Introduce new option to Kafka source - specify timestamp to start and end offset
* SPARK-20568 Delete files after processing in structured streaming

There are some more new feature/improvement items in SS, but given we're talking about ramping down, the above list is probably the realistic one.

On Thu, Sep 12, 2019 at 9:53 AM Jean Georges Perrin wrote: As a user/non-committer, +1. I love the idea of an early 3.0.0 so we can test current dev against it. I know the final 3.x will probably need another round of testing when it gets out, but less, for sure... I know I could check out and compile, but having a "packaged" preview version is great if it does not take too much of the team's time... jg

On Sep 11, 2019, at 20:40, Hyukjin Kwon wrote: +1 from me too, but I would like to know what other people think too.

On Thu, Sep 12, 2019 at 9:07 AM, Dongjoon Hyun wrote: Thank you, Sean. I'm also +1 for the following three: 1. Start to ramp down (by the official branch-3.0 cut) 2. Apache Spark 3.0.0-preview in 2019 3. Apache Spark 3.0.0 in early 2020. For the JDK 11 clean-up, it will meet the timeline, and `3.0.0-preview` helps it a lot. After this discussion, can we have some timeline for the `Spark 3.0 Release Window` on our versioning-policy page? - https://spark.apache.org/versioning-policy.html Bests, Dongjoon.

On Wed, Sep 11, 2019 at 11:54 AM Michael Heuer wrote: I would love to see Spark + Hadoop + Parquet + Avro compatibility problems resolved, e.g. https://issues.apache.org/jira/browse/SPARK-25588 https://issues.apache.org/jira/browse/SPARK-27781. Note that Avro is now at 1.9.1, which is binary-incompatible with 1.8.x. As far as I know, Parquet has not cut a release based on this new version. Then out of curiosity, are the new Spark Graph APIs targeting 3.0?
https://github.com/apache/spark/pull/24851
https://github.com/apache/spark/pull/24297

michael

On Sep 11, 2019, at 1:37 PM, Sean Owen wrote: I'm curious what current feelings are about ramping down towards a Spark 3 release. ...
Re: DataSourceV2 : Transactional Write support
There might be some help from the staging table catalog as well. -Matt Cheah

From: Wenchen Fan Date: Monday, August 5, 2019 at 7:40 PM To: Shiv Prashant Sood Cc: Ryan Blue, Jungtaek Lim, Spark Dev List Subject: Re: DataSourceV2 : Transactional Write support

I agree with the temp table approach. One idea is: maybe we only need one temp table, and each task writes to this temp table. At the end we read the data from the temp table and write it to the target table. AFAIK JDBC can handle concurrent table writing very well, and it's better than creating thousands of temp tables for one write job (assuming the input RDD has thousands of partitions).

On Tue, Aug 6, 2019 at 7:57 AM Shiv Prashant Sood wrote: Thanks all for the clarification. Regards, Shiv

On Sat, Aug 3, 2019 at 12:49 PM Ryan Blue wrote: > What you could try instead is intermediate output: inserting into a temporary > table in the executors, and moving the inserted records to the final table in the driver > (must be atomic) I think that this is the approach that other systems (maybe sqoop?) have taken: insert into independent temporary tables, which can be done quickly, then for the final commit operation, union and insert into the final table. In a lot of cases, JDBC databases can do that quickly as well, because the data is already on disk and just needs to be added to the final table.

On Fri, Aug 2, 2019 at 7:25 PM Jungtaek Lim wrote: I asked a similar question for end-to-end exactly-once with Kafka, and you're correct that distributed transactions are not supported. Introducing distributed transactions like "two-phase commit" would require huge changes to the Spark codebase, and the feedback was not positive. What you could try instead is intermediate output: inserting into a temporary table in the executors, and moving the inserted records to the final table in the driver (must be atomic). Thanks, Jungtaek Lim (HeartSaVioR)

On Sat, Aug 3, 2019 at 4:56 AM Shiv Prashant Sood wrote: All, I understood that DataSourceV2 supports transactional write and wanted to implement that in the JDBC DataSource V2 connector (PR#25211). I don't see how this is feasible for a JDBC-based connector. The framework suggests that EXECUTORS send a commit message to the DRIVER, and the actual commit should only be done by the DRIVER after receiving all commit confirmations. This will not work for JDBC, as commits have to happen on the JDBC Connection, which is maintained by the EXECUTORS; the JDBC Connection is not serializable, so it cannot be sent to the DRIVER. Am I right in thinking that this cannot be supported for JDBC? My goal is to either fully write or roll back the dataframe write operation. Thanks in advance for your help. Regards, Shiv

-- Name : Jungtaek Lim Blog : http://medium.com/@heartsavior Twitter : http://twitter.com/heartsavior LinkedIn : http://www.linkedin.com/in/heartsavior
-- Ryan Blue Software Engineer Netflix
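[Editor's note] For concreteness, here is a minimal sketch of the temp-table pattern described above: each executor task appends its partition to a staging table over its own JDBC connection, and the driver promotes the staged rows to the target table in a single transaction once all tasks report success. The table names, schema, and helper functions are hypothetical, and the final INSERT + DROP is only atomic to the extent the underlying database makes it so:

    import java.sql.DriverManager

    // Runs on each executor task: append this task's partition to the staging table.
    def writePartition(jdbcUrl: String, rows: Iterator[(Int, String)]): Unit = {
      val conn = DriverManager.getConnection(jdbcUrl)
      try {
        val stmt = conn.prepareStatement(
          "INSERT INTO target_staging (id, payload) VALUES (?, ?)")
        rows.foreach { case (id, payload) =>
          stmt.setInt(1, id)
          stmt.setString(2, payload)
          stmt.addBatch()
        }
        stmt.executeBatch()
      } finally {
        conn.close()
      }
    }

    // Runs on the driver after all commit messages arrive: one promotion step.
    def commit(jdbcUrl: String): Unit = {
      val conn = DriverManager.getConnection(jdbcUrl)
      try {
        conn.setAutoCommit(false)
        val stmt = conn.createStatement()
        stmt.executeUpdate("INSERT INTO target SELECT * FROM target_staging")
        stmt.executeUpdate("DROP TABLE target_staging")
        conn.commit() // readers see either none or all of the new rows
      } finally {
        conn.close()
      }
    }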
Re: DataSourceV2 : Transactional Write support
Can we check that the latest staging APIs work for the JDBC use case in a single transactional write? See https://github.com/apache/spark/pull/24798/files#diff-c9d2f9c9d20452939b7c28ebdae0503dR53 But we should also acknowledge that transactions in the more traditional RDBMS sense tend to have pretty specific semantics that we don't support in the V2 API. For example, one cannot commit multiple write operations in a single transaction right now. That would require changes to the DDL and a pretty substantial change to the design of Spark SQL more broadly. -Matt Cheah

From: Shiv Prashant Sood Date: Friday, August 2, 2019 at 12:56 PM To: Spark Dev List Subject: DataSourceV2 : Transactional Write support

All, I understood that DataSourceV2 supports transactional write and wanted to implement that in the JDBC DataSource V2 connector (PR#25211). I don't see how this is feasible for a JDBC-based connector. The framework suggests that EXECUTORS send a commit message to the DRIVER, and the actual commit should only be done by the DRIVER after receiving all commit confirmations. This will not work for JDBC, as commits have to happen on the JDBC Connection, which is maintained by the EXECUTORS; the JDBC Connection is not serializable, so it cannot be sent to the DRIVER. Am I right in thinking that this cannot be supported for JDBC? My goal is to either fully write or roll back the dataframe write operation. Thanks in advance for your help. Regards, Shiv
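[Editor's note] For readers catching up: the staging APIs referenced in the PR above revolve around a catalog that can stage a table write and expose it atomically on commit. A rough sketch of the shape, with approximate method names and simplified types (this paraphrases the idea and is not the exact interface in the linked PR):

    // Nothing becomes visible to readers until commitStagedChanges() runs on the driver.
    trait StagedTable {
      def commitStagedChanges(): Unit  // make the staged writes visible atomically
      def abortStagedChanges(): Unit   // discard everything written so far
    }

    trait StagingTableCatalog {
      // Begin an atomic CREATE TABLE AS SELECT: writes go to the staged table,
      // and the table only exists for readers after the commit call.
      def stageCreate(
          ident: String,
          schema: Seq[(String, String)],       // (column name, type) pairs, simplified
          properties: Map[String, String]): StagedTable
    }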
Re: [Discuss] Follow ANSI SQL on table insertion
I agree that having both modes and letting the user choose the one he/she wants is the best option (I don't see big arguments on this, honestly). Once we have this, I don't see big differences in what the default is. What I think we still have to work on is going ahead with the "strict mode" work and providing a more convenient way for users to switch between the two options. I mean: currently we have one flag for throwing an exception on overflow for operations on decimals, one for doing the same for operations on other data types, and going forward we will probably have more. I think in the end we will need to collect them all under an "umbrella" flag which lets the user simply switch between strict and non-strict mode. I also think that we will need to document this very well and give it particular attention in our docs, maybe with a dedicated section, in order to provide enough visibility on it to end users.

I'm +1 on adding a strict mode flag this way, but I'm undecided on whether or not we want a separate flag for each of the arithmetic overflow situations that could produce invalid results. My intuition is yes, because different users have different levels of tolerance for different kinds of errors. I'd expect these sorts of configurations to be set up at an infrastructure level, e.g. to maintain consistent standards throughout a whole organization.

From: Gengliang Wang Date: Thursday, August 1, 2019 at 3:07 AM To: Marco Gaido Cc: Wenchen Fan, Hyukjin Kwon, Russell Spitzer, Ryan Blue, Reynold Xin, Matt Cheah, Takeshi Yamamuro, Spark dev list Subject: Re: [Discuss] Follow ANSI SQL on table insertion

Hi all, Let me explain a little bit about the proposal. By default, we follow the store assignment rules in table insertion. On invalid casting, the result is null. It's better than the behavior in Spark 2.x while keeping backward compatibility. If users can't tolerate the silent corruption, they can enable the new mode, which throws runtime exceptions. The proposal itself is quite complete; it satisfies different users to some degree. It is hard to avoid null in data processing anyway. For example,

> select 2147483647 + 1

2147483647 is the max value of Int, and the result data type of adding two integers is supposed to be integer type. Since the value of (2147483647 + 1) can't fit into an Int, Spark has to return null or throw a runtime exception in such a case. (Someone can argue that we can always convert the result to wider types, but that's another topic about performance and DBMS behaviors.) So, given a table t with an Int column, checking data types with Up-Cast can't avoid possible null values in the following SQL, as the result data type of (int_column_a + int_column_b) is int type:

> insert into t select int_column_a + int_column_b from tbl_a, tbl_b;

Furthermore, if Spark uses Up-Cast and a user's existing ETL job fails because of that, what should he/she do then? I think he/she will try adding "cast" to the queries first. Maybe a project for unifying the data schema over all data sources has to be done later on, if he/she has enough resources. The upgrade can be painful because of the strict rules of Up-Cast, while the user's scenario might be able to tolerate converting Double to Decimal, or Timestamp to Date. Gengliang

On Thu, Aug 1, 2019 at 4:55 PM Marco Gaido wrote: Hi all, I agree that having both modes and letting the user choose the one he/she wants is the best option (I don't see big arguments on this, honestly). Once we have this, I don't see big differences in what the default is.
What I think we still have to work on is going ahead with the "strict mode" work and providing a more convenient way for users to switch between the two options. I mean: currently we have one flag for throwing an exception on overflow for operations on decimals, one for doing the same for operations on other data types, and going forward we will probably have more. I think in the end we will need to collect them all under an "umbrella" flag which lets the user simply switch between strict and non-strict mode. I also think that we will need to document this very well and give it particular attention in our docs, maybe with a dedicated section, in order to provide enough visibility on it to end users. Thanks, Marco

On Thu, Aug 1, 2019 at 09:42, Wenchen Fan wrote: Hi Hyukjin, I think no one here is against the SQL standard behavior, which is no corrupted data + runtime exceptions. IIUC the main argument here is: shall we still keep the existing "return null for invalid operations" behavior as the default? A traditional RDBMS is usually used as the final destination of CLEAN data. It's understandable that they need high data quality and try their best to avoid corrupted data at any cost. However, Spark is different. AFAIK Spark is usual
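[Editor's note] To make Gengliang's overflow example above concrete, here is roughly how the two behaviors look under the umbrella flag that eventually shipped in Spark 3.0 (spark.sql.ansi.enabled). At the time of this thread the behavior was still split across per-operation flags, so treat the flag name as illustrative:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("overflow-demo")
      .getOrCreate()

    // Legacy behavior: integer overflow is silent (the value wraps around).
    spark.conf.set("spark.sql.ansi.enabled", "false")
    spark.sql("SELECT 2147483647 + 1").show()  // prints -2147483648

    // Strict/ANSI behavior: the same expression fails at runtime instead.
    spark.conf.set("spark.sql.ansi.enabled", "true")
    spark.sql("SELECT 2147483647 + 1").show()  // throws ArithmeticException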
Re: [Discuss] Follow ANSI SQL on table insertion
Sorry, I meant the current behavior for V2, which fails the query compilation if the cast is not safe. Agreed that a separate discussion about overflow might be warranted. I'm surprised we don't throw an error now, but it might be warranted to do so. -Matt Cheah

From: Reynold Xin Date: Wednesday, July 31, 2019 at 9:58 AM To: Matt Cheah Cc: Russell Spitzer, Takeshi Yamamuro, Gengliang Wang, Ryan Blue, Spark dev list, Hyukjin Kwon, Wenchen Fan Subject: Re: [Discuss] Follow ANSI SQL on table insertion

Matt, what do you mean by maximizing 3 while also allowing errors not to be thrown when operations overflow? Those two seem contradictory.

On Wed, Jul 31, 2019 at 9:55 AM, Matt Cheah wrote: I'm -1, simply from disagreeing with the premise that we can afford not to be maximal on standard 3. The correctness of the data is non-negotiable, and whatever solution we settle on cannot silently adjust the user's data under any circumstances. I think the existing behavior is fine, or perhaps the behavior can be flagged by the destination writer at write time. -Matt Cheah

From: Hyukjin Kwon Date: Monday, July 29, 2019 at 11:33 PM To: Wenchen Fan Cc: Russell Spitzer, Takeshi Yamamuro, Gengliang Wang, Ryan Blue, Spark dev list Subject: Re: [Discuss] Follow ANSI SQL on table insertion

From my look, +1 on the proposal, considering ANSI and other DBMSes in general.

On Tue, Jul 30, 2019 at 3:21 PM, Wenchen Fan wrote: We can add a config for a certain behavior if it makes sense, but the most important thing we want to reach an agreement on here is: what should be the default behavior? Let's explore the solution space of table insertion behavior first.

At compile time:
1. always add cast
2. add cast following the ANSI SQL store assignment rule (e.g. string to int is forbidden but long to int is allowed)
3. only add cast if it's 100% safe

At runtime:
1. return null for invalid operations
2. throw exceptions at runtime for invalid operations

The standards to evaluate a solution:
1. How robust the query execution is. For example, users usually don't want to see the query fail midway.
2. How tolerant it is of user queries. For example, a user may want to write long values to an int column because he knows all the long values won't exceed the int range.
3. How clean the result is. For example, users usually don't want to see silently corrupted data (null values).

The current Spark behavior for Data Source V1 tables: always add cast and return null for invalid operations. This maximizes standards 1 and 2, but the result is least clean and users are very likely to see silently corrupted data (null values).

The current Spark behavior for Data Source V2 tables (new in Spark 3.0): only add cast if it's 100% safe. This maximizes standards 1 and 3, but many queries may fail to compile, even if those queries can run on other SQL systems. Note that people can still see silently corrupted data, because cast is not the only thing that can return corrupted data. Simple operations like ADD can also return corrupted data if overflow happens, e.g. INSERT INTO t1 (intCol) SELECT anotherIntCol + 100 FROM t2

The proposal here: add cast following the ANSI SQL store assignment rule, and return null for invalid operations. This maximizes standard 1, and also fits standard 2 well: if a query can't compile in Spark, it usually can't compile in other mainstream databases either. I think that's tolerant enough. For standard 3, this proposal doesn't maximize it but can already avoid many invalid operations.
Technically we can't make the result 100% clean at compile time; we have to handle things like overflow at runtime. I think the new proposal makes more sense as the default behavior.

On Mon, Jul 29, 2019 at 8:31 PM Russell Spitzer wrote: I understand Spark is making the decisions; I'm saying the actual final effect of the null decision would be different depending on the insertion target, if the target has different behaviors for null.

On Mon, Jul 29, 2019 at 5:26 AM Wenchen Fan wrote: > I'm a big -1 on null values for invalid casts. This is why we want to introduce the ANSI mode, so that an invalid cast fails at runtime. But we have to keep the null behavior for a while, for backward compatibility. Spark has returned null for invalid casts since the first day of Spark SQL; we can't just change it without a way to restore the old behavior. I'm OK with adding a strict mode for the upcast behavior in table insertion, but I don't agree with making it the default. The default behavior should be either the ANSI SQL behavior or the legacy Spark behavior. > other modes should be allowed only with strict warning the behavior will be > determined by the underlying sink. Seems there is some misunderstanding. The table insertion behavior is fully controlled by Spark: Spark decides when to add casts, and Spark decides whether an invalid cast should return null or fail. The sink is only responsible for writing data, not the type coercion/cast stuff.
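[Editor's note] As an illustration of the three compile-time options Wenchen enumerates, here is roughly how they map onto the store assignment flag that eventually shipped in Spark 3.0 (spark.sql.storeAssignmentPolicy); the flag name and values are those of the shipped release, not of the snapshot under discussion in this thread:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("store-assignment-demo")
      .getOrCreate()
    spark.sql("CREATE TABLE t (intCol INT) USING parquet")

    // Option 1, "always add cast" (legacy V1 behavior): invalid values become null.
    spark.conf.set("spark.sql.storeAssignmentPolicy", "LEGACY")
    spark.sql("INSERT INTO t SELECT 'not a number'")       // inserts null silently

    // Option 2, ANSI store assignment: long -> int is allowed, string -> int is not.
    spark.conf.set("spark.sql.storeAssignmentPolicy", "ANSI")
    spark.sql("INSERT INTO t SELECT CAST(1 AS BIGINT)")    // compiles: long to int is allowed
    // spark.sql("INSERT INTO t SELECT 'not a number'")    // rejected at analysis time

    // Option 3, "only add cast if it's 100% safe" (strict upcast, the V2 behavior):
    spark.conf.set("spark.sql.storeAssignmentPolicy", "STRICT")
    // spark.sql("INSERT INTO t SELECT CAST(1 AS BIGINT)") // rejected: long may overflow int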
Re: [Discuss] Follow ANSI SQL on table insertion
I'm -1, simply from disagreeing with the premise that we can afford not to be maximal on standard 3. The correctness of the data is non-negotiable, and whatever solution we settle on cannot silently adjust the user's data under any circumstances. I think the existing behavior is fine, or perhaps the behavior can be flagged by the destination writer at write time. -Matt Cheah

From: Hyukjin Kwon Date: Monday, July 29, 2019 at 11:33 PM To: Wenchen Fan Cc: Russell Spitzer, Takeshi Yamamuro, Gengliang Wang, Ryan Blue, Spark dev list Subject: Re: [Discuss] Follow ANSI SQL on table insertion

From my look, +1 on the proposal, considering ANSI and other DBMSes in general.

On Tue, Jul 30, 2019 at 3:21 PM, Wenchen Fan wrote: We can add a config for a certain behavior if it makes sense, but the most important thing we want to reach an agreement on here is: what should be the default behavior? Let's explore the solution space of table insertion behavior first.

At compile time:
1. always add cast
2. add cast following the ANSI SQL store assignment rule (e.g. string to int is forbidden but long to int is allowed)
3. only add cast if it's 100% safe

At runtime:
1. return null for invalid operations
2. throw exceptions at runtime for invalid operations

The standards to evaluate a solution:
1. How robust the query execution is. For example, users usually don't want to see the query fail midway.
2. How tolerant it is of user queries. For example, a user may want to write long values to an int column because he knows all the long values won't exceed the int range.
3. How clean the result is. For example, users usually don't want to see silently corrupted data (null values).

The current Spark behavior for Data Source V1 tables: always add cast and return null for invalid operations. This maximizes standards 1 and 2, but the result is least clean and users are very likely to see silently corrupted data (null values).

The current Spark behavior for Data Source V2 tables (new in Spark 3.0): only add cast if it's 100% safe. This maximizes standards 1 and 3, but many queries may fail to compile, even if those queries can run on other SQL systems. Note that people can still see silently corrupted data, because cast is not the only thing that can return corrupted data. Simple operations like ADD can also return corrupted data if overflow happens, e.g. INSERT INTO t1 (intCol) SELECT anotherIntCol + 100 FROM t2

The proposal here: add cast following the ANSI SQL store assignment rule, and return null for invalid operations. This maximizes standard 1, and also fits standard 2 well: if a query can't compile in Spark, it usually can't compile in other mainstream databases either. I think that's tolerant enough. For standard 3, this proposal doesn't maximize it but can already avoid many invalid operations. Technically we can't make the result 100% clean at compile time; we have to handle things like overflow at runtime. I think the new proposal makes more sense as the default behavior.

On Mon, Jul 29, 2019 at 8:31 PM Russell Spitzer wrote: I understand Spark is making the decisions; I'm saying the actual final effect of the null decision would be different depending on the insertion target, if the target has different behaviors for null.

On Mon, Jul 29, 2019 at 5:26 AM Wenchen Fan wrote: > I'm a big -1 on null values for invalid casts. This is why we want to introduce the ANSI mode, so that an invalid cast fails at runtime. But we have to keep the null behavior for a while, for backward compatibility. Spark has returned null for invalid casts since the first day of Spark SQL; we can't just change it without a way to restore the old behavior. I'm OK with adding a strict mode for the upcast behavior in table insertion, but I don't agree with making it the default. The default behavior should be either the ANSI SQL behavior or the legacy Spark behavior. > other modes should be allowed only with strict warning the behavior will be > determined by the underlying sink. Seems there is some misunderstanding. The table insertion behavior is fully controlled by Spark: Spark decides when to add casts, and Spark decides whether an invalid cast should return null or fail. The sink is only responsible for writing data, not the type coercion/cast stuff.

On Sun, Jul 28, 2019 at 12:24 AM Russell Spitzer wrote: I'm a big -1 on null values for invalid casts. This can lead to a lot of even more unexpected errors and runtime behavior, since null: 1. Is not allowed in all schemas (leading to a runtime error anyway). 2. Is the same as a delete in some systems (leading to data loss). And this would be dependent on the sink being used. Spark won't just be interacting with ANSI-compliant sinks, so I think it makes much more sense to be strict. I think Upcast mode is a sensible default, and other modes should be allowed only with a strict warning that the behavior will be determined by the underlying sink.
Re: [DISCUSS][SPARK-25299] SPIP: Shuffle storage API
We opened a thread for voting yesterday, so please participate! -Matt Cheah

From: Yue Li Date: Thursday, June 13, 2019 at 7:22 PM To: Saisai Shao, Imran Rashid Cc: Matt Cheah, "Yifei Huang (PD)", Mridul Muralidharan, Bo Yang, Ilan Filonenko, Imran Rashid, Justin Uang, Liang Tang, Marcelo Vanzin, Matei Zaharia, Min Shen, Reynold Xin, Ryan Blue, Vinoo Ganesh, Will Manning, "b...@fb.com", "dev@spark.apache.org", "fel...@uber.com", "f...@linkedin.com", "tgraves...@gmail.com", "yez...@linkedin.com", Cedric Zhuang Subject: Re: [DISCUSS][SPARK-25299] SPIP: Shuffle storage API

+ Cedric, who is the lead developer of the Splash shuffle manager at MemVerge. Fully agreed with Saisai. Thanks! Best, Yue

From: Saisai Shao Date: Thursday, June 13, 2019 at 2:52 PM To: Imran Rashid Cc: Matt Cheah, "Yifei Huang (PD)", Mridul Muralidharan, Bo Yang, Ilan Filonenko, Imran Rashid, Justin Uang, Liang Tang, Marcelo Vanzin, Matei Zaharia, Min Shen, Reynold Xin, Ryan Blue, Vinoo Ganesh, Will Manning, "b...@fb.com", "dev@spark.apache.org", "fel...@uber.com", "f...@linkedin.com", "tgraves...@gmail.com", "yez...@linkedin.com", Yue Li Subject: Re: [DISCUSS][SPARK-25299] SPIP: Shuffle storage API

I think maybe we could start a vote on this SPIP. It has been discussed for a while, and the current doc is pretty complete as of now. We have also seen lots of demand in the community for building their own shuffle storage. Thanks, Saisai

On Tue, Jun 11, 2019 at 3:27 AM, Imran Rashid wrote: I would be happy to shepherd this.

On Wed, Jun 5, 2019 at 7:33 PM Matt Cheah wrote: Hi everyone, I wanted to pick this back up again. The discussion has quieted down both on this thread and on the document. We made a few revisions to the document to hopefully make it easier to read and to clarify our criteria for success in the project. Some of the APIs have also been adjusted based on further discussion and things we've learned. I was hoping to discuss what our next steps could be here. Specifically: Would any PMC member be willing to become the shepherd for this SPIP? Is there any more feedback regarding this proposal? What would we need to do to take this to a voting phase and to begin proposing our work against upstream Spark? Thanks, -Matt Cheah

From: "Yifei Huang (PD)" Date: Monday, May 13, 2019 at 1:04 PM To: Mridul Muralidharan Cc: Bo Yang, Ilan Filonenko, Imran Rashid, Justin Uang, Liang Tang, Marcelo Vanzin, Matei Zaharia, Matt Cheah, Min Shen, Reynold Xin, Ryan Blue, Vinoo Ganesh, Will Manning, "b...@fb.com", "dev@spark.apache.org", "fel...@uber.com", "f...@linkedin.com", "tgraves...@gmail.com", "yez...@linkedin.com", "yue...@memverge.com" Subject: Re: [DISCUSS][SPARK-25299] SPIP: Shuffle storage API

Hi Mridul - thanks for taking the time to give us feedback! Thoughts on the points that you mentioned: The API is meant to work with the existing SortShuffleManager algorithm. There aren't strict requirements on how other ShuffleManager implementations must behave, so it seems impractical to design an API that could also satisfy those unknown requirements. However, we do believe that the API is rather generic, using OutputStreams for writes and InputStreams for reads, and indexing the data by a shuffleId-mapId-reduceId combo, so if other shuffle algorithms treat the data in the same chunks and want an interface for storage, then they can also use this API from within their implementations.
About speculative execution: we originally made the assumption that each shuffle task is deterministic, which meant that even if a later mapper overrode a previously committed mapper's value, it would still be the same contents. Having searched some tickets and read https://github.com/apache/spark/pull/22112/files more carefully, I think there are problems with our original thought if the writer writes all attempts of a task to the same location. One example is if the writer implementation writes each partition to the remote host in a sequence of chunks. In such a situation, a reducer might read data half written by the original task and half written by the running speculative task, which will not be the correct contents if the mapper output is unordered. Therefore, writes by a single mapper might have to be transactional, which is not clear from the API and seems rather complex to reason about, so we shouldn't expect this from the implementer. However, this doesn't affect the fundamentals of the API, since we only need to add an additional attemptId to the storage data index (which can be stored within the MapStatus) to solve the problem of concurrent writes.
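[Editor's note] A hedged sketch of the storage interface shape being described here: plain streams indexed by shuffleId-mapId-reduceId, plus the attemptId proposed above so that speculative attempts cannot collide. This paraphrases the discussion, with simplified types, and is not the exact API in the SPIP:

    import java.io.{InputStream, OutputStream}

    trait ShuffleBlockStore {
      // Each map task opens one stream per reduce partition; the attemptId keeps
      // concurrent speculative attempts from overwriting one another.
      def openForWrite(shuffleId: Int, mapId: Int, reduceId: Int, attemptId: Int): OutputStream

      // Reducers read back whichever attempt the MapStatus recorded as committed.
      def openForRead(shuffleId: Int, mapId: Int, reduceId: Int, attemptId: Int): InputStream
    }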
[VOTE][SPARK-25299] SPIP: Shuffle Storage API
Hi everyone, I would like to call a vote for the SPIP for SPARK-25299, which proposes to introduce a pluggable storage API for temporary shuffle data. You may find the SPIP document here. The discussion thread for the SPIP was conducted here. Please vote on whether or not this proposal is agreeable to you. Thanks! -Matt Cheah
Re: [DISCUSS][SPARK-25299] SPIP: Shuffle storage API
Hi everyone, I wanted to pick this back up again. The discussion has quieted down both on this thread and on the document. We made a few revisions to the document to hopefully make it easier to read and to clarify our criteria for success in the project. Some of the APIs have also been adjusted based on further discussion and things we've learned. I was hoping to discuss what our next steps could be here. Specifically: Would any PMC member be willing to become the shepherd for this SPIP? Is there any more feedback regarding this proposal? What would we need to do to take this to a voting phase and to begin proposing our work against upstream Spark? Thanks, -Matt Cheah

From: "Yifei Huang (PD)" Date: Monday, May 13, 2019 at 1:04 PM To: Mridul Muralidharan Cc: Bo Yang, Ilan Filonenko, Imran Rashid, Justin Uang, Liang Tang, Marcelo Vanzin, Matei Zaharia, Matt Cheah, Min Shen, Reynold Xin, Ryan Blue, Vinoo Ganesh, Will Manning, "b...@fb.com", "dev@spark.apache.org", "fel...@uber.com", "f...@linkedin.com", "tgraves...@gmail.com", "yez...@linkedin.com", "yue...@memverge.com" Subject: Re: [DISCUSS][SPARK-25299] SPIP: Shuffle storage API

Hi Mridul - thanks for taking the time to give us feedback! Thoughts on the points that you mentioned: The API is meant to work with the existing SortShuffleManager algorithm. There aren't strict requirements on how other ShuffleManager implementations must behave, so it seems impractical to design an API that could also satisfy those unknown requirements. However, we do believe that the API is rather generic, using OutputStreams for writes and InputStreams for reads, and indexing the data by a shuffleId-mapId-reduceId combo, so if other shuffle algorithms treat the data in the same chunks and want an interface for storage, then they can also use this API from within their implementations. About speculative execution: we originally made the assumption that each shuffle task is deterministic, which meant that even if a later mapper overrode a previously committed mapper's value, it would still be the same contents. Having searched some tickets and read https://github.com/apache/spark/pull/22112/files more carefully, I think there are problems with our original thought if the writer writes all attempts of a task to the same location. One example is if the writer implementation writes each partition to the remote host in a sequence of chunks. In such a situation, a reducer might read data half written by the original task and half written by the running speculative task, which will not be the correct contents if the mapper output is unordered. Therefore, writes by a single mapper might have to be transactional, which is not clear from the API and seems rather complex to reason about, so we shouldn't expect this from the implementer. However, this doesn't affect the fundamentals of the API, since we only need to add an additional attemptId to the storage data index (which can be stored within the MapStatus) to solve the problem of concurrent writes. This would also make it clearer that the writer should use the attempt ID as an index to ensure that writes from speculative tasks don't interfere with one another (we can add that to the API docs as well).
From: Mridul Muralidharan Date: Wednesday, May 8, 2019 at 8:18 PM To: "Yifei Huang (PD)" Cc: Bo Yang, Ilan Filonenko, Imran Rashid, Justin Uang, Liang Tang, Marcelo Vanzin, Matei Zaharia, Matt Cheah, Min Shen, Reynold Xin, Ryan Blue, Vinoo Ganesh, Will Manning, "b...@fb.com", "dev@spark.apache.org", "fel...@uber.com", "f...@linkedin.com", "tgraves...@gmail.com", "yez...@linkedin.com", "yue...@memverge.com" Subject: Re: [DISCUSS][SPARK-25299] SPIP: Shuffle storage API

Unfortunately I do not have the bandwidth to do a detailed review, but a few things come to mind after a quick read:

- While it might be tactically beneficial to align with the existing implementation, a clean design which does not tie into the existing shuffle implementation would be preferable (if it can be done without over-engineering). The shuffle implementation can change, and there are custom implementations and experiments which differ quite a bit from what comes with Apache Spark.

- Please keep speculative execution in mind while designing the interfaces: in Spark, implicitly due to the task scheduler logic, you won't have conflicts at an executor for the (shuffleId, mapId) and (shuffleId, mapId, reducerId) tuples. When you externalize it, there can be conflicts: passing a way to distinguish different tasks for the same partition would be necessary for nontrivial implementations.

This would be a welcome and much needed enhancement to Spark – looking forward to its progress!
Re: [VOTE] Functional DataSourceV2 in Spark 3.0
I want to specifically highlight and +1 a point that Ryan brought up: A commitment binds us to do this and make a reasonable attempt at finishing on time. If we choose not to commit, or if we choose to commit and don't make a reasonable attempt, then we need to ask, "what happened?" Is Spark the right place for this work? What I don't want is to work on it for 3-4 more months, miss the release, and then not have anyone take that problem seriously because we never said it was important. If we try and fail, then we need to fix what went wrong. This removes the option to pretend it wasn't a goal in the first place. That's why I think it is important that we make a statement that we, the community, intend to do it.

This is the crux of the matter we want to tackle here. Whether or not we block the release is a decision we can make when we are closer to the release date. But the fact of the matter is that Data Source V2's new APIs have not been given the prioritization and urgency that they deserve. This vote binds us to consider Data Source V2 so important that it needs to be prioritized far more highly than it is right now, to the point where we would at least consider delaying the release if it meant we could finish the work.

I also don't quite follow why we shouldn't consider features to be as important a target for major versions as API breaks. When major versions of any software product are introduced, they certainly include API breaks as necessary, but they also add new features that give users an incentive to upgrade in the first place. If all we do is introduce API breaks but no new features or critical bug fixes (and critical bug fixes are often severe enough that they're backported to earlier branches anyway), what appeal is there for users to upgrade to the latest version? -Matt Cheah

On 2/28/19, 1:37 PM, "Mridul Muralidharan" wrote: I am -1 on this vote for pretty much all the reasons that Mark mentioned. A major version change gives us an opportunity to remove deprecated interfaces, stabilize experimental/developer APIs, drop support for outdated functionality/platforms, and evolve the project with a vision for the foreseeable future. IMO the primary focus should be on interface evolution, stability, and lowering tech debt, which might result in breaking changes. Which is not to say DSv2 should not be part of 3.0. Along with a lot of other exciting features also being added, it can be one more important enhancement. But I am not for delaying the release simply to accommodate a specific feature. Features can be added in subsequent releases as well - I have yet to hear a good reason why it must make it into 3.0, to the point of needing a VOTE thread. Regards, Mridul

On Thu, Feb 28, 2019 at 10:44 AM Mark Hamstra wrote: > I agree that adding new features in a major release is not forbidden, but that is just not the primary goal of a major release. If we reach the point where we are happy with the new public API before some new features are in a satisfactory state to be merged, then I don't want there to be a prior presumption that we cannot complete the primary goal of the major release. If at that point you want to argue that it is worth waiting for some new feature, then that would be fine and may have sufficient merit to warrant some delay. > > Regardless of whether significant new public API comes into a major release or a feature release, it should come in with an experimental annotation so that we can make changes without requiring a new major release.
> > If you want to argue that some new features that are currently targeting 3.0.0 are significant enough that one or more of them should justify an accelerated 3.1.0 release schedule if it is not ready in time for the 3.0.0 release, then I can much more easily get behind that kind of commitment; but I remain opposed to the notion of promoting any new features to the status of blockers of 3.0.0 at this time. > > On Thu, Feb 28, 2019 at 10:23 AM Ryan Blue wrote: >> >> Mark, I disagree. Setting common goals is a critical part of getting things done. >> >> This doesn't commit the community to push out the release if the goals aren't met, but does mean that we will, as a community, seriously consider it. This is also an acknowledgement that this is the most important feature in the next release (whether major or minor) for many of us. This has been in limbo for a very long time, so I think it is important for the community to commit to getting it to a functional state. >> >> It sounds like your objection is to this commitment for 3.0, but remember that 3.0 is the next release so that we can remove deprecated APIs. It does not mean that we aren't a
Re: [VOTE] SPIP: Spark API for Table Metadata
+1 (non-binding) From: Jamison Bennett Date: Thursday, February 28, 2019 at 8:28 AM To: Ryan Blue, Spark Dev List Subject: Re: [VOTE] SPIP: Spark API for Table Metadata +1 (non-binding) Jamison Bennett Cloudera Software Engineer jamison.benn...@cloudera.com 515 Congress Ave, Suite 1212 | Austin, TX | 78701 On Thu, Feb 28, 2019 at 10:20 AM Ryan Blue wrote: +1 (non-binding) On Wed, Feb 27, 2019 at 8:34 PM Russell Spitzer wrote: +1 (non-binding) On Wed, Feb 27, 2019, 6:28 PM Ryan Blue wrote: Hi everyone, In the last DSv2 sync, the consensus was that the table metadata SPIP was ready to bring up for a vote. Now that the multi-catalog identifier SPIP vote has passed, I'd like to start one for the table metadata API, TableCatalog. The proposal is for adding a TableCatalog interface that will be used by v2 plans. That interface has methods to load, create, drop, alter, refresh, rename, and check existence for tables. It also specifies the set of metadata used to configure tables: schema, partitioning, and key-value properties. For more information, please read the SPIP proposal doc. Please vote in the next 3 days. [ ] +1: Accept the proposal as an official SPIP [ ] +0 [ ] -1: I don't think this is a good idea because ... Thanks! -- Ryan Blue Software Engineer Netflix -- Ryan Blue Software Engineer Netflix
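[Editor's note] For readers who don't want to open the doc, here is a hedged sketch of the surface being voted on (load, create, drop, alter, refresh, rename, existence check), with simplified types; the actual SPIP interface differs in details:

    trait Table {
      def name: String
      def schema: Seq[(String, String)]   // (column name, type) pairs, simplified
      def partitioning: Seq[String]
      def properties: Map[String, String]
    }

    trait TableCatalog {
      def loadTable(ident: String): Table
      def tableExists(ident: String): Boolean
      def createTable(
          ident: String,
          schema: Seq[(String, String)],
          partitions: Seq[String],
          properties: Map[String, String]): Table
      def alterTable(ident: String, changes: Seq[String]): Table  // changes simplified
      def renameTable(oldIdent: String, newIdent: String): Unit
      def dropTable(ident: String): Boolean
      def refreshTable(ident: String): Unit
    }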
Re: [VOTE] Functional DataSourceV2 in Spark 3.0
+1 (non-binding) Are identifiers and namespaces going to be rolled under one of those six points?

From: Ryan Blue Reply-To: "rb...@netflix.com" Date: Thursday, February 28, 2019 at 8:39 AM To: Spark Dev List Subject: [VOTE] Functional DataSourceV2 in Spark 3.0

I'd like to call a vote for committing to getting DataSourceV2 into a functional state for Spark 3.0. For more context, please see the discussion thread, but here is a quick summary of what this commitment means:

· We think that a "functional DSv2" is an achievable goal for the Spark 3.0 release
· We will consider this a blocker for Spark 3.0, and take reasonable steps to make it happen
· We will not delay the release without a community discussion

Here's what we've defined as a functional DSv2:

· Add a plugin system for catalogs
· Add an interface for table catalogs (see the ongoing SPIP vote)
· Add an implementation of the new interface that calls SessionCatalog to load v2 tables
· Add a resolution rule to load v2 tables from the v2 catalog
· Add CTAS logical and physical plan nodes
· Add conversions from SQL parsed plans to v2 logical plans (e.g., INSERT INTO support)

Please vote in the next 3 days on whether you agree with committing to this goal.

[ ] +1: Agree that we should consider a functional DSv2 implementation a blocker for Spark 3.0
[ ] +0: . . .
[ ] -1: I disagree with this goal because . . .

Thank you! -- Ryan Blue Software Engineer Netflix
Re: [DISCUSS] Spark 3.0 and DataSourceV2
Will that then require an API break down the line? Do we save that for Spark 4? -Matt Cheah

From: Ryan Blue Reply-To: "rb...@netflix.com" Date: Tuesday, February 26, 2019 at 4:53 PM To: Matt Cheah Cc: Sean Owen, Wenchen Fan, Xiao Li, Matei Zaharia, Spark Dev List Subject: Re: [DISCUSS] Spark 3.0 and DataSourceV2

That's a good question. While I'd love to have a solution for that, I don't think it is a good idea to delay DSv2 until we have one. That is going to require a lot of internal changes, and I don't see how we could make the release date if we are including an InternalRow replacement.

On Tue, Feb 26, 2019 at 4:41 PM Matt Cheah wrote: Reynold made a note earlier about a proper Row API that isn't InternalRow – is that still on the table? -Matt Cheah

From: Ryan Blue Reply-To: "rb...@netflix.com" Date: Tuesday, February 26, 2019 at 4:40 PM To: Matt Cheah Cc: Sean Owen, Wenchen Fan, Xiao Li, Matei Zaharia, Spark Dev List Subject: Re: [DISCUSS] Spark 3.0 and DataSourceV2

Thanks for bumping this, Matt. I think we can have the discussion here to clarify exactly what we're committing to and then have a vote thread once we're agreed. Getting back to the DSv2 discussion, I think we have a good handle on what would be added:

· Plugin system for catalogs
· TableCatalog interface (I'll start a vote thread for this SPIP shortly)
· TableCatalog implementation backed by SessionCatalog that can load v2 tables
· Resolution rule to load v2 tables using the new catalog
· CTAS logical and physical plan nodes
· Conversions from SQL parsed logical plans to v2 logical plans

Initially, this will always use the v2 catalog backed by SessionCatalog to avoid dependence on the multi-catalog work. All of those are already implemented and working, so I think it is reasonable that we can get them in. Then we can consider a few stretch goals:

· Get in as much DDL as we can. I think create and drop table should be easy.
· Multi-catalog identifier parsing and multi-catalog support

If we get those last two in, it would be great. We can make the call closer to release time. Does anyone want to change this set of work?

On Tue, Feb 26, 2019 at 4:23 PM Matt Cheah wrote: What would then be the next steps we'd take to collectively decide on plans and timelines moving forward? Might I suggest scheduling a conference call with the appropriate PMC members to put our ideas together? Maybe such a discussion can take place at next week's meeting? Or do we need to have a separate formalized voting thread which is guided by a PMC member? My suggestion is to make concrete steps forward and to avoid letting this slip through the cracks. I also think there would be merit in having a project plan and estimates around how long each of the features we want to complete is going to take to implement and review. -Matt Cheah

On 2/24/19, 3:05 PM, "Sean Owen" wrote: Sure, I don't read anyone making these statements though? Let's assume good intent: read "foo should happen" as "my opinion, as a member of the community, which is not solely up to me, is that foo should happen". I understand it's possible for a person to make their opinion over-weighted; this whole style of decision making assumes good actors and doesn't optimize against bad ones. Not that it can't happen, just not seeing it here. I have never seen any vote on a feature list, by a PMC or otherwise. We can do that if really needed, I guess. But that also isn't the authoritative process in play here, in contrast.
If there's not a more specific subtext or issue here, which is fine to say (on private@ if it's sensitive or something), yes, let's move on in good faith. On Sun, Feb 24, 2019 at 3:45 PM Mark Hamstra wrote: > There is nothing wrong with individuals advocating for what they think should or should not be in Spark 3.0, nor should anyone shy away from explaining why they think delaying the release for some reason is or isn't a good idea. What is a problem, or is at least something that I have a problem with, are declarative, pseudo-authoritative statements that 3.0 (or some other release) will or won't contain some feature, API, etc. or that some issue is or is not blocker or worth delaying for. When the PMC has not voted on such issues, I'm often left thinking, "Wait... what? Who decided that, or where did that decision come from?" -- Ryan Blue Software Engineer Netflix -- Ryan Blue Software Engineer Netflix
Re: [DISCUSS] Spark 3.0 and DataSourceV2
Reynold made a note earlier about a proper Row API that isn't InternalRow – is that still on the table? -Matt Cheah

From: Ryan Blue Reply-To: "rb...@netflix.com" Date: Tuesday, February 26, 2019 at 4:40 PM To: Matt Cheah Cc: Sean Owen, Wenchen Fan, Xiao Li, Matei Zaharia, Spark Dev List Subject: Re: [DISCUSS] Spark 3.0 and DataSourceV2

Thanks for bumping this, Matt. I think we can have the discussion here to clarify exactly what we're committing to and then have a vote thread once we're agreed. Getting back to the DSv2 discussion, I think we have a good handle on what would be added:

· Plugin system for catalogs
· TableCatalog interface (I'll start a vote thread for this SPIP shortly)
· TableCatalog implementation backed by SessionCatalog that can load v2 tables
· Resolution rule to load v2 tables using the new catalog
· CTAS logical and physical plan nodes
· Conversions from SQL parsed logical plans to v2 logical plans

Initially, this will always use the v2 catalog backed by SessionCatalog to avoid dependence on the multi-catalog work. All of those are already implemented and working, so I think it is reasonable that we can get them in. Then we can consider a few stretch goals:

· Get in as much DDL as we can. I think create and drop table should be easy.
· Multi-catalog identifier parsing and multi-catalog support

If we get those last two in, it would be great. We can make the call closer to release time. Does anyone want to change this set of work?

On Tue, Feb 26, 2019 at 4:23 PM Matt Cheah wrote: What would then be the next steps we'd take to collectively decide on plans and timelines moving forward? Might I suggest scheduling a conference call with the appropriate PMC members to put our ideas together? Maybe such a discussion can take place at next week's meeting? Or do we need to have a separate formalized voting thread which is guided by a PMC member? My suggestion is to make concrete steps forward and to avoid letting this slip through the cracks. I also think there would be merit in having a project plan and estimates around how long each of the features we want to complete is going to take to implement and review. -Matt Cheah

On 2/24/19, 3:05 PM, "Sean Owen" wrote: Sure, I don't read anyone making these statements though? Let's assume good intent: read "foo should happen" as "my opinion, as a member of the community, which is not solely up to me, is that foo should happen". I understand it's possible for a person to make their opinion over-weighted; this whole style of decision making assumes good actors and doesn't optimize against bad ones. Not that it can't happen, just not seeing it here. I have never seen any vote on a feature list, by a PMC or otherwise. We can do that if really needed, I guess. But that also isn't the authoritative process in play here, in contrast. If there's not a more specific subtext or issue here, which is fine to say (on private@ if it's sensitive or something), yes, let's move on in good faith. On Sun, Feb 24, 2019 at 3:45 PM Mark Hamstra wrote: > There is nothing wrong with individuals advocating for what they think should or should not be in Spark 3.0, nor should anyone shy away from explaining why they think delaying the release for some reason is or isn't a good idea. What is a problem, or is at least something that I have a problem with, are declarative, pseudo-authoritative statements that 3.0 (or some other release) will or won't contain some feature, API, etc. or that some issue is or is not blocker or worth delaying for.
When the PMC has not voted on such issues, I'm often left thinking, "Wait... what? Who decided that, or where did that decision come from?" -- Ryan Blue Software Engineer Netflix
Re: [DISCUSS] Spark 3.0 and DataSourceV2
What would then be the next steps we'd take to collectively decide on plans and timelines moving forward? Might I suggest scheduling a conference call with the appropriate PMC members to put our ideas together? Maybe such a discussion can take place at next week's meeting? Or do we need to have a separate formalized voting thread which is guided by a PMC member? My suggestion is to make concrete steps forward and to avoid letting this slip through the cracks. I also think there would be merit in having a project plan and estimates around how long each of the features we want to complete is going to take to implement and review. -Matt Cheah

On 2/24/19, 3:05 PM, "Sean Owen" wrote: Sure, I don't read anyone making these statements though? Let's assume good intent: read "foo should happen" as "my opinion, as a member of the community, which is not solely up to me, is that foo should happen". I understand it's possible for a person to make their opinion over-weighted; this whole style of decision making assumes good actors and doesn't optimize against bad ones. Not that it can't happen, just not seeing it here. I have never seen any vote on a feature list, by a PMC or otherwise. We can do that if really needed, I guess. But that also isn't the authoritative process in play here, in contrast. If there's not a more specific subtext or issue here, which is fine to say (on private@ if it's sensitive or something), yes, let's move on in good faith.

On Sun, Feb 24, 2019 at 3:45 PM Mark Hamstra wrote: > There is nothing wrong with individuals advocating for what they think should or should not be in Spark 3.0, nor should anyone shy away from explaining why they think delaying the release for some reason is or isn't a good idea. What is a problem, or is at least something that I have a problem with, are declarative, pseudo-authoritative statements that 3.0 (or some other release) will or won't contain some feature, API, etc. or that some issue is or is not blocker or worth delaying for. When the PMC has not voted on such issues, I'm often left thinking, "Wait... what? Who decided that, or where did that decision come from?"
Re: [DISCUSS] Spark 3.0 and DataSourceV2
To evaluate the amount of work required to get Data Source V2 into Spark 3.0, we should have a list of all the specific SPIPs and patches that are pending that would constitute a successful and usable revamp of that API. Here are the ones I could find and know off the top of my head:
1. Table Catalog API: https://issues.apache.org/jira/browse/SPARK-24252 - In my opinion this is by far the most important API to get in, but it's also the most important API to give thorough thought and evaluation.
2. Remaining logical plans for CTAS, RTAS, DROP / DELETE, OVERWRITE: https://issues.apache.org/jira/browse/SPARK-24923 + https://issues.apache.org/jira/browse/SPARK-24253
3. Catalogs for other entities, such as functions. Pluggable system for loading these.
4. Multi-Catalog support - https://issues.apache.org/jira/browse/SPARK-25006
5. Migration of existing sources to V2, particularly file sources like Parquet and ORC – requires #1 as discussed in yesterday's meeting
Can someone add to this list if we're missing anything? It might also make sense to either assign a JIRA label or to update JIRA umbrella issues if any. Whatever mechanism works for being able to find all of these outstanding issues in one place. My understanding is that #1 is the most critical feature we need, and the feature that will go a long way towards allowing everything else to fall into place. #2 is also critical for external implementations of Data Source V2. I think we can afford to defer 3-5 to a future point release. But #1 and #2 are also the features that have remained open for the longest time and we really need to move forward on these. Putting a target release for 3.0 will help in that regard. -Matt Cheah From: Ryan Blue Reply-To: "rb...@netflix.com" Date: Thursday, February 21, 2019 at 2:22 PM To: Matei Zaharia Cc: Spark Dev List Subject: Re: [DISCUSS] Spark 3.0 and DataSourceV2 I'm all for making releases more often if we want. But this work could really use a target release to motivate getting it done. If we agree that it will block a release, then everyone is motivated to review and get the PRs in. If this work doesn't make it in the 3.0 release, I'm not confident that it will get done. Maybe we can have a release shortly after, but the timeline for these features -- that many of us need -- is nearly creeping into years. That's when alternatives start looking more likely to deliver. I'd rather see this work get in so we don't have to consider those alternatives, which is why I think this commitment is a good idea. I also would like to see multi-catalog support, but that is more reasonable to put off for a follow-up feature release, maybe 3.1. On Thu, Feb 21, 2019 at 1:45 PM Matei Zaharia wrote: How large would the delay be? My 2 cents are that there's nothing stopping us from making feature releases more often if we want to, so we shouldn't see this as an "either delay 3.0 or release in >6 months" decision. If the work is likely to get in with a small delay and simplifies our work after 3.0 (e.g. we can get rid of older APIs), then the delay may be worth it. But if it would be a large delay, we should also weigh it against other things that are going to get delayed if 3.0 moves much later. It might also be better to propose a specific date to delay until, so people can still plan around when the release branch will likely be cut.
Matei > On Feb 21, 2019, at 1:03 PM, Ryan Blue wrote: > > Hi everyone, > > In the DSv2 sync last night, we had a discussion about roadmap and what the > goal should be for getting the main features into Spark. We all agreed that > 3.0 should be that goal, even if it means delaying the 3.0 release. > > The possibility of delaying the 3.0 release may be controversial, so I want > to bring it up to the dev list to build consensus around it. The rationale > for this is partly that much of this work has been outstanding for more than > a year now. If it doesn't make it into 3.0, then it would be another 6 months > before it would be in a release, and would be nearing 2 years to get the work > done. > > Are there any objections to targeting 3.0 for this? > > In addition, much of the planning for multi-catalog support has been done to > make v2 possible. Do we also want to include multi-catalog support? > > > rb > > -- > Ryan Blue > Software Engineer > Netflix -- Ryan Blue Software Engineer Netflix
Re: building docker images for GPU
I will reiterate some feedback I left on the PR. It's not immediately clear if we should be opinionated about supporting GPUs in the Docker image in a first-class way. For one, there's the question of how we arbitrate the kinds of customizations we support moving forward. For example, if we say we support GPUs now, what's to say that we should not also support FPGAs? Also, what kind of testing can we add to CI to ensure that what we've provided in this Dockerfile works? Instead, we can make the Spark images have bare-minimum support for basic Spark applications, and then provide detailed instructions for how to build custom Docker images (mostly just needing to make sure the custom image has the right entry point). -Matt Cheah From: Rong Ou Date: Friday, February 8, 2019 at 2:28 PM To: "dev@spark.apache.org" Subject: building docker images for GPU Hi spark dev, I created a JIRA issue a while ago (https://issues.apache.org/jira/browse/SPARK-26398 [issues.apache.org]) to add GPU support to Spark docker images, and sent a PR (https://github.com/apache/spark/pull/23347 [github.com]) that went through several iterations. It was suggested that it should be discussed on the dev mailing list, so here we are. Please chime in if you have any questions or concerns. A little more background. I mainly looked at running XGBoost on Spark using GPUs. Preliminary results have shown that there is potential for significant speedup in training time. This seems like a popular use case for Spark. In any event, it'd be nice for Spark to have better support for GPUs. Building gpu-enabled docker images seems like a useful first step. Thanks, Rong
Re: [DISCUSS] Identifiers with multi-catalog support
+1 for n-part namespace as proposed. Agree that a short SPIP would be appropriate for this. Perhaps also a JIRA ticket? -Matt Cheah From: Felix Cheung Date: Sunday, January 20, 2019 at 4:48 PM To: "rb...@netflix.com" , Spark Dev List Subject: Re: [DISCUSS] Identifiers with multi-catalog support +1 I like Ryan's last mail. Thank you for putting it clearly (should be a spec/SPIP!) I agree with and understand the need for a 3-part id. However I don't think we should make the assumption that it must be, or can only be, as long as 3 parts. Once the catalog is identified (i.e. the first part), the catalog should be responsible for resolving the namespace or schema etc. Agree also that a path is a good idea to add to support the file-based variant. Should the separator be optional (perhaps in "space") to keep this extensible (it might not always be '.')? Also this whole scheme will need to play nice with column identifiers as well. From: Ryan Blue Sent: Thursday, January 17, 2019 11:38 AM To: Spark Dev List Subject: Re: [DISCUSS] Identifiers with multi-catalog support Any discussion on how Spark should manage identifiers when multiple catalogs are supported? I know this is an area where a lot of people are interested in making progress, and it is a blocker for both multi-catalog support and CTAS in DSv2. On Sun, Jan 13, 2019 at 2:22 PM Ryan Blue wrote: I think that the solution to this problem is to mix the two approaches by supporting 3 identifier parts: catalog, namespace, and name, where namespace can be an n-part identifier:

    type Namespace = Seq[String]
    case class CatalogIdentifier(space: Namespace, name: String)

This allows catalogs to work with the hierarchy of the external store, but the catalog API only requires a few discovery methods to list namespaces and to list each type of object in a namespace.

    def listNamespaces(): Seq[Namespace]
    def listNamespaces(space: Namespace, prefix: String): Seq[Namespace]
    def listTables(space: Namespace): Seq[CatalogIdentifier]
    def listViews(space: Namespace): Seq[CatalogIdentifier]
    def listFunctions(space: Namespace): Seq[CatalogIdentifier]

The methods to list tables, views, or functions would only return identifiers for the type queried, not namespaces or the other objects. The SQL parser would be updated so that identifiers are parsed to UnresolvedIdentifier(parts: Seq[String]), and resolution would work like this pseudo-code:

    def resolveIdentifier(ident: UnresolvedIdentifier): (CatalogPlugin, CatalogIdentifier) = {
      val maybeCatalog = sparkSession.catalog(ident.parts.head)
      ident.parts match {
        case Seq(catalogName, space @ _*, name) if maybeCatalog.isDefined =>
          (maybeCatalog.get, CatalogIdentifier(space, name))
        case Seq(space @ _*, name) =>
          (sparkSession.defaultCatalog, CatalogIdentifier(space, name))
      }
    }

I think this is a good approach because it allows Spark users to reference or discover any name in the hierarchy of an external store, it uses a few well-defined methods for discovery, and makes name hierarchy a user concern.
· SHOW (DATABASES|SCHEMAS|NAMESPACES) would return the result of listNamespaces()
· SHOW NAMESPACES LIKE a.b% would return the result of listNamespaces(Seq("a"), "b")
· USE a.b would set the current namespace to Seq("a", "b")
· SHOW TABLES would return the result of listTables(currentNamespace)
Also, I think that we could generalize this a little more to support path-based tables by adding a path to CatalogIdentifier, either as a namespace or as a separate optional string.
Then, the identifier passed to a catalog would work for either a path-based table or a catalog table, without needing a path-based catalog API. Thoughts? On Sun, Jan 13, 2019 at 1:38 PM Ryan Blue wrote: In the DSv2 sync up, we tried to discuss the Table metadata proposal but were side-tracked on its use of TableIdentifier. There were good points about how Spark should identify tables, views, functions, etc, and I want to start a discussion here. Identifiers are orthogonal to the TableCatalog proposal, which can be updated to use whatever identifier class we choose. That proposal is concerned with what information should be passed to define a table, and how to pass that information. The main question for this discussion is: how should Spark identify tables, views, and functions when it supports multiple catalogs? There are two main approaches:
1. Use a 3-part identifier, catalog.database.table
2. Use an identifier with an arbitrary number of parts
Option 1: use 3-part identifiers. The argument for option #1 is that it is simple. If an external data store has additional logical hierarchy layers, then that hierarchy would be mapped to multiple catalogs in Spark. Spark can support show tables and show databases without much trouble. This is the approach used by Presto, so there is some precedent.
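To make the proposed resolution concrete, here is a small, self-contained Scala sketch of the rule from Ryan's pseudo-code above; the catalog lookup is stubbed out with a set of known catalog names, and all names here are illustrative:

    case class CatalogIdentifier(space: Seq[String], name: String)

    // Resolve a parsed multi-part identifier against the configured catalogs.
    // Assumes the identifier has at least a name part.
    def resolve(parts: Seq[String], knownCatalogs: Set[String],
        defaultCatalog: String): (String, CatalogIdentifier) =
      parts match {
        case Seq(head, rest @ _*) if knownCatalogs(head) && rest.nonEmpty =>
          (head, CatalogIdentifier(rest.init, rest.last))
        case _ =>
          (defaultCatalog, CatalogIdentifier(parts.init, parts.last))
      }

    // "prod" is a configured catalog, so the first part selects it:
    resolve(Seq("prod", "a", "b", "t"), Set("prod"), "session")
    //   => ("prod", CatalogIdentifier(Seq("a", "b"), "t"))
    // "a" is not a catalog, so the whole identifier goes to the default:
    resolve(Seq("a", "b", "t"), Set("prod"), "session")
    //   => ("session", CatalogIdentifier(Seq("a", "b"), "t"))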
SPARK-25299: Updates As Of December 19, 2018
Hi everyone, Earlier this year, we filed SPARK-25299, proposing the idea of using other storage systems for persisting shuffle files. Since that time, we have continued to work on prototypes for this project. In the interest of increasing transparency into our work, we have created a progress report document where you may find a summary of the work we have been doing, as well as links to our prototypes on Github. We would ask that anyone who is very familiar with the inner workings of Spark's shuffle provide feedback and comments on our work thus far. We welcome any further discussion in this space. You may comment in this e-mail thread or on the progress report document. Looking forward to hearing from you. Thanks, -Matt Cheah
Re: [DISCUSS] Function plugins
How would this work with:
· Codegen – how does one generate code given a user's UDF? Would the user be able to specify the code that is generated that represents their function? In practice that's pretty hard to get right.
· Row serialization and representation – will the UDF receive Catalyst rows with optimized internal representations, or will Spark have to convert to something more easily consumed by a UDF?
Otherwise +1 for trying to get this to work without Hive. I think even having something without codegen and optimized row formats is worthwhile, if only because it's easier to use than Hive UDFs. -Matt Cheah From: Reynold Xin Date: Friday, December 14, 2018 at 1:49 PM To: "rb...@netflix.com" Cc: Spark Dev List Subject: Re: [DISCUSS] Function plugins Having a way to register UDFs that are not using Hive APIs would be great! On Fri, Dec 14, 2018 at 1:30 PM, Ryan Blue wrote: Hi everyone, I've been looking into improving how users of our Spark platform register and use UDFs and I'd like to discuss a few ideas for making this easier. The motivation for this is the use case of defining a UDF from SparkSQL or PySpark. We want to make it easy to write JVM UDFs and use them from both SQL and Python. Python UDFs work great in most cases, but we occasionally don't want to pay the cost of shipping data to python and processing it there so we want to make it easy to register UDFs that will run in the JVM. There is already syntax to create a function from a JVM class [docs.databricks.com] in SQL that would work, but this option requires using the Hive UDF API instead of Spark's simpler Scala API. It also requires argument translation and doesn't support codegen. Beyond the problem of the API and performance, it is annoying to require registering every function individually with a CREATE FUNCTION statement. The alternative that I'd like to propose is to add a way to register a named group of functions using the proposed catalog plugin API. For anyone unfamiliar with the proposed catalog plugins, the basic idea is to load and configure plugins using a simple property-based scheme. Those plugins expose functionality through mix-in interfaces, like TableCatalog to create/drop/load/alter tables. Another interface could be UDFCatalog that can load UDFs.

    interface UDFCatalog extends CatalogPlugin {
      UserDefinedFunction loadUDF(String name)
    }

To use this, I would create a UDFCatalog class that returns my Scala functions as UDFs. To look up functions, we would use both the catalog name and the function name. This would allow my users to write Scala UDF instances, package them using a UDFCatalog class (provided by me), and easily use them in Spark with a few configuration options to add the catalog in their environment. This would also allow me to expose UDF libraries easily in my configuration, like brickhouse [community.cloudera.com], without users needing to ensure the Jar is loaded and register individual functions. Any thoughts on this high-level approach? I know that this ignores things like creating and storing functions in a FunctionCatalog, and we'd have to solve challenges with function naming (whether there is a db component). Right now I'd like to think through the overall idea and not get too focused on those details. Thanks, rb -- Ryan Blue Software Engineer Netflix
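For illustration, a minimal Scala sketch of what implementing such a UDFCatalog could look like, assuming the proposed loadUDF interface above; the trait stand-ins and class names here are hypothetical:

    import org.apache.spark.sql.expressions.UserDefinedFunction
    import org.apache.spark.sql.functions.udf

    // Hypothetical stand-ins for the proposed plugin interfaces.
    trait CatalogPlugin
    trait UDFCatalog extends CatalogPlugin {
      def loadUDF(name: String): UserDefinedFunction
    }

    // A catalog that exposes plain Scala functions as named UDFs.
    class MyUDFCatalog extends UDFCatalog {
      private val udfs: Map[String, UserDefinedFunction] = Map(
        "shout" -> udf((s: String) => s.toUpperCase),
        "add_one" -> udf((i: Int) => i + 1)
      )
      override def loadUDF(name: String): UserDefinedFunction =
        udfs.getOrElse(name, throw new NoSuchElementException(s"No UDF named $name"))
    }

Lookup by (catalog name, function name) would then just dispatch to loadUDF on the configured catalog, so a whole library of functions ships as one plugin instead of one CREATE FUNCTION statement per function.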
Re: time for Apache Spark 3.0?
I just added the label to https://issues.apache.org/jira/browse/SPARK-25908. Unsure if there are any others. I'll look through the tickets and see if there are any that are missing the label. -Matt Cheah From: Sean Owen Date: Tuesday, November 13, 2018 at 12:09 PM To: Matt Cheah Cc: Sean Owen , Vinoo Ganesh , dev Subject: Re: time for Apache Spark 3.0? As far as I know any JIRA that has implications for users is tagged this way but I haven't examined all of them. All that are going in for 3.0 should have it as Fix Version. Most changes won't have a user visible impact. Do you see any that seem to need the tag? Call em out or even fix them by adding the tag and proposed release notes. On Tue, Nov 13, 2018, 11:49 AM Matt Cheah wrote: My non-definitive takes -- I would personally like to remove all deprecated methods for Spark 3. I started by removing 'old' deprecated methods in that commit. Things deprecated in 2.4 are maybe less clear, whether they should be removed. Everything's fair game for removal or change in a major release. So far some items in discussion seem to be Scala 2.11 support, Python 2 support, R support before 3.4. I don't know about other APIs. Generally, take a look at JIRA for items targeted at version 3.0. Not everything targeted for 3.0 is going in, but ones from committers are more likely than others. Breaking changes ought to be tagged 'release-notes' with a description of the change. The release itself has a migration guide that's being updated as we go. On Mon, Nov 12, 2018 at 5:49 PM Matt Cheah wrote: > > I wanted to clarify what categories of APIs are eligible to be broken in Spark 3.0. Specifically: > > > > Are we removing all deprecated methods? If we're only removing some subset of deprecated methods, what is that subset? I see a bunch were removed in https://github.com/apache/spark/pull/22921 for example. Are we only committed to removing methods that were deprecated in some Spark version and earlier? > Aside from removing support for Scala 2.11, what other kinds of (non-experimental and non-evolving) APIs are eligible to be broken? > Is there going to be a way to track the current list of all proposed breaking changes / JIRA tickets? Perhaps we can include it in the JIRA ticket that can be filtered down to somehow?
Re: time for Apache Spark 3.0?
The release-notes label on JIRA sounds good. Can we make it a point to have that done retroactively now, and then moving forward? On 11/12/18, 4:01 PM, "Sean Owen" wrote: My non-definitive takes -- I would personally like to remove all deprecated methods for Spark 3. I started by removing 'old' deprecated methods in that commit. Things deprecated in 2.4 are maybe less clear, whether they should be removed. Everything's fair game for removal or change in a major release. So far some items in discussion seem to be Scala 2.11 support, Python 2 support, R support before 3.4. I don't know about other APIs. Generally, take a look at JIRA for items targeted at version 3.0. Not everything targeted for 3.0 is going in, but ones from committers are more likely than others. Breaking changes ought to be tagged 'release-notes' with a description of the change. The release itself has a migration guide that's being updated as we go. On Mon, Nov 12, 2018 at 5:49 PM Matt Cheah wrote: > > I wanted to clarify what categories of APIs are eligible to be broken in Spark 3.0. Specifically: > > > > Are we removing all deprecated methods? If we're only removing some subset of deprecated methods, what is that subset? I see a bunch were removed in https://github.com/apache/spark/pull/22921 for example. Are we only committed to removing methods that were deprecated in some Spark version and earlier? > Aside from removing support for Scala 2.11, what other kinds of (non-experimental and non-evolving) APIs are eligible to be broken? > Is there going to be a way to track the current list of all proposed breaking changes / JIRA tickets? Perhaps we can include it in the JIRA ticket that can be filtered down to somehow?
Re: time for Apache Spark 3.0?
I wanted to clarify what categories of APIs are eligible to be broken in Spark 3.0. Specifically: Are we removing all deprecated methods? If we're only removing some subset of deprecated methods, what is that subset? I see a bunch were removed in https://github.com/apache/spark/pull/22921 for example. Are we only committed to removing methods that were deprecated in some Spark version and earlier? Aside from removing support for Scala 2.11, what other kinds of (non-experimental and non-evolving) APIs are eligible to be broken? Is there going to be a way to track the current list of all proposed breaking changes / JIRA tickets? Perhaps we can include it in the JIRA ticket that can be filtered down to somehow? Thanks, -Matt Cheah From: Vinoo Ganesh Date: Monday, November 12, 2018 at 2:48 PM To: Reynold Xin Cc: Xiao Li , Matei Zaharia , Ryan Blue , Mark Hamstra , dev Subject: Re: time for Apache Spark 3.0? Makes sense, thanks Reynold. From: Reynold Xin Date: Monday, November 12, 2018 at 16:57 To: Vinoo Ganesh Cc: Xiao Li , Matei Zaharia , Ryan Blue , Mark Hamstra , dev Subject: Re: time for Apache Spark 3.0? Master branch now tracks the 3.0.0-SNAPSHOT version, so the next one will be 3.0. In terms of timeline, unless we change anything specifically, Spark feature releases are on a 6-mo cadence. Spark 2.4 was just released last week, so 3.0 will be roughly 6 months from now. On Mon, Nov 12, 2018 at 1:54 PM Vinoo Ganesh wrote: Quickly following up on this – is there a target date for when Spark 3.0 may be released and/or a list of the likely API breaks that are anticipated? From: Xiao Li Date: Saturday, September 29, 2018 at 02:09 To: Reynold Xin Cc: Matei Zaharia , Ryan Blue , Mark Hamstra , "u...@spark.apache.org" Subject: Re: time for Apache Spark 3.0? Yes. We should create a SPIP for each major breaking change. Reynold Xin wrote on Friday, September 28, 2018 at 11:05 PM: i think we should create spips for some of them, since they are pretty large ... i can create some tickets to start with -- excuse the brevity and lower case due to wrist injury On Fri, Sep 28, 2018 at 11:01 PM Xiao Li wrote: Based on the above discussions, we have a "rough consensus" that the next release will be 3.0. Now, we can start working on the API breaking changes (e.g., the ones mentioned in the original email from Reynold). Cheers, Xiao Matei Zaharia wrote on Thursday, September 6, 2018 at 2:21 PM: Yes, you can start with Unstable and move to Evolving and Stable when needed. We've definitely had experimental features that changed across maintenance releases when they were well-isolated. If your change risks breaking stuff in stable components of Spark though, then it probably won't be suitable for that. > On Sep 6, 2018, at 1:49 PM, Ryan Blue wrote: > > I meant flexibility beyond the point releases. I think what Reynold was > suggesting was getting v2 code out more often than the point releases every 6 > months. An Evolving API can change in point releases, but maybe we should > move v2 to Unstable so it can change more often? I don't really see another > way to get changes out more often. > > On Thu, Sep 6, 2018 at 11:07 AM Mark Hamstra wrote: > Yes, that is why we have these annotations in the code and the corresponding > labels appearing in the API documentation: > https://github.com/apache/spark/blob/master/common/tags/src/main/java/org/apache/spark/annotation/InterfaceStability.java > [github.com] > > As long as it is properly annotated, we can change or even eliminate an API > method before the next major release.
And frankly, we shouldn't be > contemplating bringing in the DS v2 API (and, I'd argue, any new API) without > such an annotation. There is just too much risk of not getting everything > right before we see the results of the new API being more widely used, and > too much cost in maintaining until the next major release something that we > come to regret for us to create new API in a fully frozen state. > > > On Thu, Sep 6, 2018 at 9:49 AM Ryan Blue wrote: > It would be great to get more features out incrementally. For experimental > features, do we have more relaxed constraints? > > On Thu, Sep 6, 2018 at 9:47 AM Reynold Xin wrote: > +1 on 3.0 > > Dsv2 stable can still evolve in across major releases. DataFrame, Dataset, > dsv1 and a lot of other major features all were developed throughout the 1.x > and 2.x lines. > > I do want to explore ways for us to get dsv2 incremental changes out there > more frequently, to get feedback. Maybe that means we apply additive changes > to 2.4.x; maybe that means making another 2.5 release sooner. I will start a > separate thread about it. > > > > On Thu, Sep 6, 2018 at 9:31 AM Sean Owen wrote: > I think this doesn't necessarily mean 3.0 i
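For reference, the stability annotations discussed in this thread are applied directly to API classes; a hedged sketch of how a new interface might be marked (the trait name is made up):

    import org.apache.spark.annotation.InterfaceStability

    // Unstable: may change or be removed in any release, which matches the
    // suggestion above to let v2 evolve faster than Evolving would allow.
    @InterfaceStability.Unstable
    trait ExampleV2Interface {
      def doSomething(): Unit
    }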
Re: [DISCUSS][K8S] Local dependencies with Kubernetes
Relying on kubectl exec may not be the best solution because clusters with locked down security will not grant users permissions to execute arbitrary code in pods. I can't think of a great alternative right now but I wanted to bring this to our attention for the time being. -Matt Cheah From: Rob Vesse Date: Monday, October 8, 2018 at 10:09 AM To: dev Subject: Re: [DISCUSS][K8S] Local dependencies with Kubernetes Well yes. However the submission client is already able to monitor the driver pod status so can see when it is up and running. And couldn't we potentially modify the K8S entry points e.g. KubernetesClientApplication that run inside the driver pods to wait for dependencies to be uploaded? I guess at this stage I am just throwing ideas out there and trying to figure out what's practical/reasonable Rob From: Yinan Li Date: Monday, 8 October 2018 at 17:36 To: Rob Vesse Cc: dev Subject: Re: [DISCUSS][K8S] Local dependencies with Kubernetes However, the pod must be up and running for this to work. So if you want to use this to upload dependencies to the driver pod, the driver pod must already be up and running. So you may not even have a chance to upload the dependencies at this point.
Re: [Feedback Requested] SPARK-25299: Using Distributed Storage for Persisting Shuffle Data
Yuanjian, Thanks for sharing your progress! I was wondering if there was any prototype code that we could read to get an idea of what the implementation looks like? We can evaluate the design together and also benchmark workloads from across the community – that is, we can collect more data from more Spark users. Your experience would be greatly appreciated in the discussion. -Matt Cheah From: Yuanjian Li Date: Friday, August 31, 2018 at 8:29 PM To: Matt Cheah Cc: Spark dev list Subject: Re: [Feedback Requested] SPARK-25299: Using Distributed Storage for Persisting Shuffle Data Hi Matt, Thanks for the great document and proposal, I want to +1 the idea of reliable shuffle data and give some feedback. I think a reliable shuffle service based on DFS is necessary for Spark, especially when running Spark jobs in an unstable environment. For example, when Spark is deployed alongside online services, Spark executors can be killed at any time. The current stage retry strategy can make the job many times slower than a normal run. We (Baidu Inc.) solved this problem with a stable shuffle service over Hadoop, and we are now integrating Spark with this shuffle service. We expect the POC work to be done in October. We'll post more benchmarks and detailed work at that time. I'm still reading your discussion document and happy to give more feedback in the doc. Thanks, Yuanjian Li Matt Cheah wrote on Saturday, September 1, 2018 at 8:42 AM: Hi everyone, I filed SPARK-25299 [issues.apache.org] to promote discussion on how we can improve the shuffle operation in Spark. The basic premise is to discuss the ways we can leverage distributed storage to improve the reliability and isolation of Spark's shuffle architecture. A few designs and a full problem statement are outlined in this architecture discussion document [docs.google.com]. This is a complex problem and it would be great to get feedback from the community about the right direction to take this work in. Note that we have not yet committed to a specific implementation and architecture – there's a lot that needs to be discussed for this improvement, so we hope to get as much input as possible before moving forward with a design. Please feel free to leave comments and suggestions on the JIRA ticket or on the discussion document. Thank you! -Matt Cheah
[Feedback Requested] SPARK-25299: Using Distributed Storage for Persisting Shuffle Data
Hi everyone, I filed SPARK-25299 to promote discussion on how we can improve the shuffle operation in Spark. The basic premise is to discuss the ways we can leverage distributed storage to improve the reliability and isolation of Spark's shuffle architecture. A few designs and a full problem statement are outlined in this architecture discussion document. This is a complex problem and it would be great to get feedback from the community about the right direction to take this work in. Note that we have not yet committed to a specific implementation and architecture – there's a lot that needs to be discussed for this improvement, so we hope to get as much input as possible before moving forward with a design. Please feel free to leave comments and suggestions on the JIRA ticket or on the discussion document. Thank you! -Matt Cheah
Re: [Kubernetes] Resource requests and limits for Driver and Executor Pods
The question is more so generally what an advised best practice is for setting CPU limits. It's not immediately clear what a correct value is for setting CPU limits if one wants to provide guarantees for consistent / guaranteed execution performance while also not degrading performance. Additionally, there's a question of if there exists a sane default CPU limit in the Spark pod creation code. Such a default seems difficult to set because the JVM can spawn as many threads as it likes and a single executor can end up thrashing in between its own threads as they contend for the smaller CPU share that is available. From: Yinan Li Date: Thursday, March 29, 2018 at 11:08 PM To: David Vogelbacher Cc: "dev@spark.apache.org" Subject: Re: [Kubernetes] Resource requests and limits for Driver and Executor Pods Hi David, Regarding cpu limit, in Spark 2.3, we do have the following config properties to specify cpu limit for the driver and executors. See http://spark.apache.org/docs/latest/running-on-kubernetes.html [spark.apache.org]. spark.kubernetes.driver.limit.cores spark.kubernetes.executor.limit.cores On Thu, Mar 29, 2018 at 5:14 PM, David Vogelbacher wrote: Hi, At the moment driver and executor pods are created using the following requests and limits:
- CPU: request = [driver,executor].cores; limit = unlimited (but can be specified using spark.[driver,executor].cores)
- Memory: request = [driver,executor].memory; limit = [driver,executor].memory + [driver,executor].memoryOverhead
Specifying the requests like this leads to problems if the pods only get the requested amount of resources and nothing of the optional (limit) resources, as it can happen in a fully utilized cluster. For memory: Let's say we have a node with 100GiB memory and 5 pods with 20 GiB memory and 5 GiB memoryOverhead. At the beginning all 5 pods use 20 GiB of memory and all is well. If a pod then starts using its overhead memory it will get killed as there is no more memory available, even though we told spark that it can use 25 GiB of memory. Instead of requesting `[driver,executor].memory`, we should just request `[driver,executor].memory + [driver,executor].memoryOverhead`. I think this case is a bit clearer than the CPU case, so I went ahead and filed an issue [issues.apache.org] with more details and made a PR [github.com]. For CPU: As it turns out, there can be performance problems if we only have `executor.cores` available (which means we have one core per task). This was raised here [github.com] and is the reason that the cpu limit was set to unlimited. This issue stems from the fact that in general there will be more than one thread per task, resulting in performance impacts if there is only one core available. However, I am not sure that just setting the limit to unlimited is the best solution because it means that even if the Kubernetes cluster can perfectly satisfy the resource requests, performance might be very bad. I think we should guarantee that an executor is able to do its work well (without performance issues or getting killed - as could happen in the memory case) with the resources it gets guaranteed from Kubernetes. One way to solve this could be to request more than 1 core from Kubernetes per task. The exact amount we should request is unclear to me (it largely depends on how many threads actually get spawned for a task). We would need to find a way to determine this somehow automatically or at least come up with a better default value than 1 core per task. Does somebody have ideas or thoughts on how to solve this best?
Best, David
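To make the knobs in this thread concrete, here is a hedged SparkConf sketch; the values are arbitrary, and the overhead property name follows the Spark 2.3 Kubernetes docs (later releases moved toward spark.executor.memoryOverhead):

    import org.apache.spark.SparkConf

    // Illustrative values only. With David's proposed change, the pod's
    // memory request would cover 20g + 5g instead of just 20g.
    val conf = new SparkConf()
      .set("spark.executor.cores", "4")                      // CPU request per executor
      .set("spark.kubernetes.executor.limit.cores", "8")     // hard CPU cap (otherwise unlimited)
      .set("spark.executor.memory", "20g")                   // on-heap memory request
      .set("spark.kubernetes.executor.memoryOverhead", "5g") // off-heap headroom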
Re: Toward an "API" for spark images used by the Kubernetes back-end
Re: Hadoop versioning – it seems reasonable enough for us to be publishing an image per Hadoop version. We should essentially have image configuration parity with what we publish as distributions on the Spark website. Sometimes jars need to be swapped out entirely instead of being strictly additive. An example is a user wanting to build an application that depends on a different version of an existing dependency. Instead of adding multiple jars with different versions to the classpath, they would like to put their own jars that their application has perhaps resolved via Maven. (They could use the userClassPathFirst constructs, but in practice that doesn't always work, particularly for jars that have to be present at JVM boot-time.) So having an extra image version that is "empty" without any jars is reasonable. In this case, we'd want to define the API for where the image's jars have to live – perhaps in a fixed directory like /opt/spark/jars, or specified by some environment variable that the entrypoint knows to look up. I like the idea of having that location defined by an environment variable, since it allows for more flexibility – but the tradeoff seems negligible between those two options. From: "Lalwani, Jayesh" Date: Thursday, March 22, 2018 at 10:19 AM To: Rob Vesse , "dev@spark.apache.org" Subject: Re: Toward an "API" for spark images used by the Kubernetes back-end I would like to add that many people run Spark behind corporate proxies. It's very common to add an http proxy to extraJavaOptions. Being able to provide custom extraJavaOptions should be supported. Also, Hadoop FS 2.7.3 is pretty limited wrt S3 buckets. You cannot use temporary AWS tokens. You cannot assume roles. You cannot use KMS buckets. All of this comes out of the box on EMR because EMR is built with its own customized Hadoop FS. For standalone installations, it's pretty common to "customize" your Spark installation using Hadoop 2.8.3 or higher. I don't know if a Spark container with Hadoop 2.8.3 will be a standard container. If it isn't, I see a lot of people creating a customized container with Hadoop FS 2.8.3. From: Rob Vesse Date: Thursday, March 22, 2018 at 6:11 AM To: "dev@spark.apache.org" Subject: Re: Toward an "API" for spark images used by the Kubernetes back-end The difficulty with a custom Spark config is that you need to be careful that the Spark config the user provides does not conflict with the auto-generated portions of the Spark config necessary to make Spark on K8S work. So part of any "API" definition might need to be what Spark config is considered "managed" by the Kubernetes scheduler backend. For more controlled environments - i.e. security conscious - allowing end users to provide custom images may be a non-starter so the more we can do at the "API" level without customising the containers the better. A practical example of this is managing Python dependencies: one option we're considering is having a base image with Anaconda included and then simply projecting a Conda environment spec into the containers (via volume mounts) and then having the container recreate that Conda environment on startup. That won't work for all possible environments e.g. those that use non-standard Conda channels but it would provide a lot of capability without customising the images.
Rob From: Felix Cheung Date: Thursday, 22 March 2018 at 06:21 To: Holden Karau , Erik Erlandson Cc: dev Subject: Re: Toward an "API" for spark images used by the Kubernetes back-end I like being able to customize the docker image itself - but I realize this thread is more about the "API" for the stock image. Environment is nice. Probably we need a way to set custom spark config (as a file??) From: Holden Karau Sent: Wednesday, March 21, 2018 10:44:20 PM To: Erik Erlandson Cc: dev Subject: Re: Toward an "API" for spark images used by the Kubernetes back-end I'm glad this discussion is happening on dev@ :) Personally I like customizing with shell env variables when rolling my own image, but definitely documenting the expectations/usage of the variables is needed before we can really call it an API. On the related question I suspect two of the more "common" likely customizations are adding additional jars for bootstrapping fetching from a DFS & also similarly complicated Python dependencies (although given the Python support isn't merged yet it's hard to say what exactly this would look like). I could also see some vendors wanting to add some bootstrap/setup scripts to fetch keys or other things. What other ways do folks foresee customizing their Spark docker containers? On Wed, Mar 21, 2018 at 5:04
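Pulling the customization knobs mentioned in this thread into one place, a hedged configuration sketch; the image name and proxy host are made up:

    import org.apache.spark.SparkConf

    // Custom image plus corporate-proxy settings via extraJavaOptions.
    val conf = new SparkConf()
      .set("spark.kubernetes.container.image", "registry.example.com/spark-custom:2.3.0")
      .set("spark.driver.extraJavaOptions",
        "-Dhttp.proxyHost=proxy.example.com -Dhttp.proxyPort=8080")
      .set("spark.executor.extraJavaOptions",
        "-Dhttp.proxyHost=proxy.example.com -Dhttp.proxyPort=8080")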
Re: Spark on Kubernetes Builder Pattern Design Document
I think in this case, the original design proposed before this document was implemented on the Spark on K8s fork, which we took some time to build separately before proposing that the fork be merged into the main line. Specifically, the timeline of events was:
1. We started building Spark on Kubernetes on a fork and were prepared to merge our work directly into master.
2. Discussion on https://issues.apache.org/jira/browse/SPARK-18278 led us to move down the path of working on a fork first. We would harden the fork and let it become more widely used to prove its value and robustness in practice. See https://github.com/apache-spark-on-k8s/spark
3. On said fork, we made the original design decisions to use a step-based builder pattern for the driver but not the same design for the executors. This original discussion was made among the collaborators of the fork, as much of the work on the fork in general was not done on the mailing list.
4. We eventually decided to merge the fork into the main line, and got the feedback in the corresponding PRs.
Therefore the question may be less about this specific design than about whether the overarching approach we took – building Spark on K8s on a fork first before merging into the mainline – was the correct one in the first place. There's also the issue that the work done on the fork was isolated from the dev mailing list. Moving forward as we push our work into mainline Spark, we aim to be transparent with the Spark community via the Spark mailing list and Spark JIRA tickets. We're specifically aiming to deprecate the fork and migrate all the work done on the fork into the main line. -Matt Cheah From: Mark Hamstra <m...@clearstorydata.com> Date: Monday, February 5, 2018 at 1:44 PM To: Matt Cheah <mch...@palantir.com> Cc: "dev@spark.apache.org" <dev@spark.apache.org>, "ramanath...@google.com" <ramanath...@google.com>, Ilan Filonenko <i...@cornell.edu>, Erik <e...@redhat.com>, Marcelo Vanzin <van...@cloudera.com> Subject: Re: Spark on Kubernetes Builder Pattern Design Document That's good, but you should probably stop and consider whether the discussions that led up to this document's creation could have taken place on this dev list -- because if they could have, then they probably should have as part of the whole spark-on-k8s project becoming part of mainline spark development, not a separate fork. On Mon, Feb 5, 2018 at 1:17 PM, Matt Cheah <mch...@palantir.com> wrote: Hi everyone, While we were building the Spark on Kubernetes integration, we realized that some of the abstractions we introduced for building the driver application in spark-submit, and building executor pods in the scheduler backend, could be improved for better readability and clarity. We received feedback in this pull request[github.com] in particular. In response to this feedback, we've put together a design document that proposes a possible refactor to address the given feedback. You may comment on the proposed design at this link: https://docs.google.com/document/d/1XPLh3E2JJ7yeJSDLZWXh_lUcjZ1P0dy9QeUEyxIlfak/edit#[docs.google.com] I hope that we can have a productive discussion and continue improving the Kubernetes integration further. Thanks, -Matt Cheah
Spark on Kubernetes Builder Pattern Design Document
Hi everyone, While we were building the Spark on Kubernetes integration, we realized that some of the abstractions we introduced for building the driver application in spark-submit, and building executor pods in the scheduler backend, could be improved for better readability and clarity. We received feedback in this pull request in particular. In response to this feedback, we've put together a design document that proposes a possible refactor to address the given feedback. You may comment on the proposed design at this link: https://docs.google.com/document/d/1XPLh3E2JJ7yeJSDLZWXh_lUcjZ1P0dy9QeUEyxIlfak/edit# I hope that we can have a productive discussion and continue improving the Kubernetes integration further. Thanks, -Matt Cheah
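For readers unfamiliar with the pattern under discussion, here is a minimal, self-contained Scala sketch of the step-based builder idea; the class names are illustrative, not the real classes from the design document:

    // Each step applies one orthogonal piece of pod configuration.
    case class PodSpec(labels: Map[String, String] = Map.empty,
        env: Map[String, String] = Map.empty)

    trait ConfigurationStep {
      def configure(spec: PodSpec): PodSpec
    }

    class LabelStep(appId: String) extends ConfigurationStep {
      override def configure(spec: PodSpec): PodSpec =
        spec.copy(labels = spec.labels + ("spark-app-id" -> appId))
    }

    class EnvStep(extraEnv: Map[String, String]) extends ConfigurationStep {
      override def configure(spec: PodSpec): PodSpec =
        spec.copy(env = spec.env ++ extraEnv)
    }

    // The final spec is just a left fold over the chosen steps, which keeps
    // each concern testable in isolation.
    val steps = Seq(new LabelStep("spark-app-1"), new EnvStep(Map("SPARK_MODE" -> "driver")))
    val podSpec = steps.foldLeft(PodSpec())((spec, step) => step.configure(spec))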
Re: Kubernetes: why use init containers?
With regards to separation of concerns, there's a fringe use case here – if more than one main container is on the pod, then none of them will run if the init-containers fail. A user can have a Pod Preset that attaches more sidecar containers to the driver and/or executors. In that case, those sidecars may perform side effects that are undesirable if the main Spark application failed because dependencies weren't available. Using the init-container to localize the dependencies will prevent any of these sidecars from executing at all if the dependencies can't be fetched. It's definitely a niche use case – I'm not sure how often pod presets are used in practice - but it's an example to illustrate why the separation of concerns can be beneficial. -Matt Cheah On 1/10/18, 2:36 PM, "Marcelo Vanzin" <van...@cloudera.com> wrote: On Wed, Jan 10, 2018 at 2:30 PM, Yinan Li <liyinan...@gmail.com> wrote: > 1. Retries of init-containers are automatically supported by k8s through pod > restart policies. For this point, sorry I'm not sure how spark-submit > achieves this. Great, add that feature to spark-submit, everybody benefits, not just k8s. > 2. The ability to use credentials that are not shared with the main > containers. Not sure what that achieves. > 3. Not only the user code, but Spark internal code like Executor won't be > run if the init-container fails. Not sure what that achieves. Executor will fail if dependency download fails, Spark driver will recover (and start a new executor if needed). > 4. Easier to build tooling around k8s events/status of the init-container in > case of failures as it's doing exactly one thing: downloading dependencies. Again, I don't see what is all this hoopla about fine grained control of dependency downloads. Spark solved this years ago for Spark applications. Don't reinvent the wheel. -- Marcelo
Re: Kubernetes: why use init containers?
> With a config value set by the submission code, like what I'm doing to > prevent client mode submission in my p.o.c.? The contract for what determines the appropriate scheduler backend to instantiate is then going to be different in Kubernetes versus the other cluster managers. The cluster manager typically only picks the scheduler backend implementation based on the master URL format plus the deploy mode. Perhaps this is an acceptable tradeoff for being able to leverage spark-submit in the cluster mode deployed driver container. Again though, any flag we expose in spark-submit is a user-facing option that can be set erroneously, which is a practice we shouldn't be encouraging. Taking a step back though, I think we want to use spark-submit's internals without using spark-submit itself. Any flags we add to spark-submit are user-facing. We ideally would be able to extract the dependency download + run user main class subroutines from spark-submit, and invoke that in all of the cluster managers. Perhaps this calls for a refactor in spark-submit itself to make some parts reusable in other contexts. Just an idea. On 1/10/18, 1:38 PM, "Marcelo Vanzin" <van...@cloudera.com> wrote: On Wed, Jan 10, 2018 at 1:33 PM, Matt Cheah <mch...@palantir.com> wrote: > If we use spark-submit in client mode from the driver container, how do we handle needing to switch between a cluster-mode scheduler backend and a client-mode scheduler backend in the future? With a config value set by the submission code, like what I'm doing to prevent client mode submission in my p.o.c.? There are plenty of solutions to that problem if that's what's worrying you. > Something else re: client mode accessibility – if we make client mode accessible to users even if it's behind a flag, that's a very different contract from needing to recompile spark-submit to support client mode. The amount of effort required from the user to get to client mode is very different between the two cases Yes. But if we say we don't support client mode, we don't support client mode regardless of how easy it is for the user to fool Spark into trying to run in that mode. -- Marcelo
Re: Kubernetes: why use init containers?
If we use spark-submit in client mode from the driver container, how do we handle needing to switch between a cluster-mode scheduler backend and a client-mode scheduler backend in the future? Something else re: client mode accessibility – if we make client mode accessible to users even if it's behind a flag, that's a very different contract from needing to recompile spark-submit to support client mode. The amount of effort required from the user to get to client mode is very different between the two cases, and the contract is much clearer when client mode is forbidden in all circumstances, versus client mode being allowed with a specific flag. If we're saying that we don't support client mode, we should bias towards making client mode as difficult as possible to access, i.e. impossible with a standard Spark distribution. -Matt Cheah On 1/10/18, 1:24 PM, "Marcelo Vanzin" <van...@cloudera.com> wrote: On Wed, Jan 10, 2018 at 1:10 PM, Matt Cheah <mch...@palantir.com> wrote: > I'd imagine this is a reason why YARN hasn't gone with using spark-submit from the application master... I wouldn't use YARN as a template to follow when writing a new backend. A lot of the reason why the YARN backend works the way it does is because of backwards compatibility. IMO it would be much better to change the YARN backend to use spark-submit, because it would immensely simplify the code there. It was a nightmare to get YARN to reach feature parity with other backends because it has to pretty much reimplement everything. But doing that would break pretty much every Spark-on-YARN deployment, so it's not something we can do right now. For the other backends the situation is sort of similar; it probably wouldn't be hard to change standalone's DriverWrapper to also use spark-submit. But that brings potential side effects for existing users that don't exist with spark-on-k8s, because spark-on-k8s is new (the current fork aside). > But using init-containers makes it such that we don't need to use spark-submit at all Those are actually separate concerns. There are a whole bunch of things that spark-submit provides you that you'd have to replicate in the k8s backend if not using it. Things like properly handling special characters in arguments, native library paths, "userClassPathFirst", etc. You get them almost for free with spark-submit, and using an init container does not solve any of those for you. I'd say that using spark-submit is really not up for discussion here; it saves you from re-implementing a whole bunch of code that you shouldn't even be trying to re-implement. Separately, if there is a legitimate need for an init container, then it can be added. But I don't see that legitimate need right now, so I don't see what it's bringing other than complexity. (And no, "the k8s documentation mentions that init containers are sometimes used to download dependencies" is not a legitimate need.) -- Marcelo
Re: Kubernetes: why use init containers?
A crucial point here is considering whether we want to have a separate scheduler backend code path for client mode versus cluster mode. If we need such a separation in the code paths, it would be difficult to make it possible to run spark-submit in client mode from the driver container. We discussed this already when we started to think about client mode. See https://github.com/apache-spark-on-k8s/spark/pull/456. In our initial designs for a client mode, we considered that there are some concepts that would only apply to cluster mode and not to client mode – see https://github.com/apache-spark-on-k8s/spark/pull/456#issuecomment-343007093. But we haven't worked out all of the details yet. The situation may work out such that client mode is similar enough to cluster mode that we can consider the cluster mode as being a spark-submit in client mode from a container. I'd imagine this is a reason why YARN hasn't gone with using spark-submit from the application master, because there are separate code paths for a YarnClientSchedulerBackend versus a YarnClusterSchedulerBackend, and the deploy mode serves as the switch between the two implementations. Though I am curious as to why Spark standalone isn't using spark-submit – the DriverWrapper is manually fetching the user's jars and putting them on a classloader before invoking the user's main class with that classloader. But there's only one scheduler backend for both client and cluster mode for standalone's case. The main idea here is that we need to understand if we need different code paths for a client mode scheduler backend versus a cluster mode scheduler backend, before we can know if we can use spark-submit in client mode from the driver container. But using init-containers makes it such that we don't need to use spark-submit at all, meaning that the differences can more or less be ignored at least in this particular context. -Matt Cheah On 1/10/18, 8:40 AM, "Marcelo Vanzin" <van...@cloudera.com> wrote: On a side note, while it's great that you guys have meetings to discuss things related to the project, it's general Apache practice to discuss these things in the mailing list - or at the very least send detailed info about what was discussed in these meetings to the mailing list. Not everybody can attend these meetings, and I'm not just talking about people being busy, but there are people who live in different time zones. Now that this code is moving into Spark I'd recommend getting people more involved with the Spark project to move things forward. On Tue, Jan 9, 2018 at 8:23 PM, Anirudh Ramanathan <ramanath...@google.com> wrote: > Marcelo, I can see that we might be misunderstanding what this change > implies for performance and some of the deeper implementation details here. > We have a community meeting tomorrow (at 10am PT), and we'll be sure to > explore this idea in detail, and understand the implications and then get > back to you. > > Thanks for the detailed responses here, and for spending time with the idea. > (Also, you're more than welcome to attend the meeting - there's a link here > if you're around.) > > Cheers, > Anirudh > > > On Jan 9, 2018 8:05 PM, "Marcelo Vanzin" <van...@cloudera.com> wrote: > > One thing I forgot in my previous e-mail is that if a resource is > remote I'm pretty sure (but haven't double checked the code) that > executors will download it directly from the remote server, and not > from the driver. So there, distributed download without an init > container.
> > On Tue, Jan 9, 2018 at 7:15 PM, Yinan Li <liyinan...@gmail.com> wrote: >> The init-container is required for use with the resource staging server >> >> (https://github.com/apache-spark-on-k8s/userdocs/blob/master/src/jekyll/running-on-kubernetes.md#resource-staging-server). > > If the staging server *requires* an init container you already have a > design problem right there. > >> Additionally, the init-container is a Kubernetes >> native way of making sure that the dependencies are localized > > Sorry, but the init container does not do anything by itself. You had > to add a whole bunch of code to execute the existing Spark code in an > init container, when not doing it would have achieved the exact same > goal much more easily, in a way that is consistent with how Spark > already does things. > > Matt
Re: Kubernetes: why use init containers?
A few reasons to prefer init-containers come to mind: Firstly, if we used spark-submit from within the driver container, the executors wouldn't receive the jars on their class loader until after the executor starts because the executor has to launch first before localizing resources. It is certainly possible to make the class loader work with the user's jars here, as is the case with all the client mode implementations, but it seems cleaner to have the classpath include the user's jars at executor launch time instead of needing to reason about the classloading order. We can also consider the idiomatic approach from the perspective of Kubernetes. Yinan touched on this already, but init-containers are traditionally meant to prepare the environment for the application that is to be run, which is exactly what we do here. This also makes it such that the localization process can be completely decoupled from the execution of the application itself. We can then for example detect the errors that happen on the resource localization layer, say when an HDFS cluster is down, before the application itself launches. The failure at the init-container stage is explicitly noted via the Kubernetes pod status API. Finally, running spark-submit from the container would make the SparkSubmit code inadvertently allow running client mode Kubernetes applications as well. We're not quite ready to support that. Even if we were, it's not entirely intuitive for the cluster mode code path to depend on the client mode code path. This isn't entirely without precedent though, as Mesos has a similar dependency. Essentially the semantics seem neater and the contract is very explicit when using an init-container, even though the code does end up being more complex. From: Yinan Li <liyinan...@gmail.com> Date: Tuesday, January 9, 2018 at 7:16 PM To: Nicholas Chammas <nicholas.cham...@gmail.com> Cc: Anirudh Ramanathan <ramanath...@google.com.invalid>, Marcelo Vanzin <van...@cloudera.com>, Matt Cheah <mch...@palantir.com>, Kimoon Kim <kim...@pepperdata.com>, dev <dev@spark.apache.org> Subject: Re: Kubernetes: why use init containers? The init-container is required for use with the resource staging server (https://github.com/apache-spark-on-k8s/userdocs/blob/master/src/jekyll/running-on-kubernetes.md#resource-staging-server[github.com]). The resource staging server (RSS) is a spark-on-k8s component running in a Kubernetes cluster for staging submission client local dependencies to Spark pods. The init-container is responsible for downloading the dependencies from the RSS. We haven't upstreamed the RSS code yet, but this is a value-add component for Spark on K8s as a way for users to use submission local dependencies without resorting to other mechanisms that are not immediately available on most Kubernetes clusters, e.g., HDFS. We do plan to upstream it in the 2.4 timeframe. Additionally, the init-container is a Kubernetes native way of making sure that the dependencies are localized before the main driver/executor containers are started. IMO, this guarantee is positive to have and it helps achieve separation of concerns. So IMO, I think the init-container is a valuable component and should be kept. On Tue, Jan 9, 2018 at 6:25 PM, Nicholas Chammas <nicholas.cham...@gmail.com> wrote: I'd like to point out the output of "git show --stat" for that diff: 29 files changed, 130 insertions(+), 1560 deletions(-) +1 for that and generally for the idea of leveraging spark-submit.
You can argue that executors downloading from external servers would be faster than downloading from the driver, but I’m not sure I’d agree - it can go both ways. On a tangentially related note, one of the main reasons spark-ec2 is so slow to launch clusters is that it distributes files like the Spark binaries to all the workers via the master. Because of that, the launch time scaled with the number of workers requested. When I wrote Flintrock, I got a large improvement in launch time over spark-ec2 simply by having all the workers download the installation files in parallel from an external host (typically S3 or an Apache mirror). And launch time became largely independent of the cluster size. That may or may not say anything about the driver distributing application files vs. having init containers do it in parallel, but I’d be curious to hear more. Nick On Tue, Jan 9, 2018 at 9:08 PM Anirudh Ramanathan <ramanath...@google.com.invalid> wrote: We were running a change in our fork which was similar to this at one point early on. My biggest concerns off the top of my head with this change would be localization performance with large numbers of executors, and what we lose in terms of separation of concerns. Init containers are a standard construct in k8s for resource localization. Also how thi
Re: Kubernetes backend and docker images
Think we can allow for different images and default to them being the same. Apologize if I missed that as being the original intention though. -Matt Cheah On 1/8/18, 1:45 PM, "Marcelo Vanzin" <van...@cloudera.com> wrote: On Mon, Jan 8, 2018 at 1:39 PM, Matt Cheah <mch...@palantir.com> wrote: > We would still want images to be able to be uniquely specified for the > driver vs. the executors. For example, not all of the libraries required on > the driver may be required on the executors, so the user would want to > specify a different custom driver image from their custom executor image. Are you saying that we should *require* different images for driver and executor, as is the case today, or that we should *allow* different images, but default to the same, as I'm proposing? I see zero reason to require different images. While it's true that the driver may need more libraries than the executor, 99% of the time it's ok to just have those libraries everywhere - it makes configuration easier and doesn't do any harm. -- Marcelo
Re: Kubernetes backend and docker images
// Fixing Anirudh's email address From: Matt Cheah Sent: Monday, January 8, 2018 1:39:12 PM To: Anirudh Ramanathan; Felix Cheung Cc: 蒋星博; Marcelo Vanzin; dev; Timothy Chen Subject: Re: Kubernetes backend and docker images We would still want images to be able to be uniquely specified for the driver vs. the executors. For example, not all of the libraries required on the driver may be required on the executors, so the user would want to specify a different custom driver image from their custom executor image. But the idea of the entry point script that can switch based on environment variables makes sense. I do think we want separate Python and R images, because Python and R come with non-trivial extra baggage that can make the images a lot bigger and slower to download for Scala-only users. From: Anirudh Ramanathan <ramanath...@google.com.INVALID> Date: Monday, January 8, 2018 at 9:48 AM To: Felix Cheung <felixcheun...@hotmail.com> Cc: 蒋星博 <jiangxb1...@gmail.com>, Marcelo Vanzin <van...@cloudera.com>, dev <dev@spark.apache.org>, Matt Cheah <mch...@palantir.com>, Timothy Chen <tnac...@gmail.com> Subject: Re: Kubernetes backend and docker images +matt +tim For reference - here's our previous thread on this dockerfile unification problem - https://github.com/apache-spark-on-k8s/spark/pull/60 I think this approach should be acceptable from both the customization and visibility perspectives. On Mon, Jan 8, 2018 at 9:40 AM, Anirudh Ramanathan <ramanath...@google.com> wrote: +1 We discussed some alternatives early on - including using a single dockerfile and different spec.container.command and spec.container.args from the Kubernetes driver/executor specification (which override entrypoint in docker). No reason that won't work also - except that it reduced the transparency of what was being invoked in the driver/executor/init by hiding it in the actual backend code. Putting it into a single entrypoint file and branching lets us realize the best of both worlds, I think. This is an elegant solution, thanks Marcelo. On Jan 6, 2018 10:01 AM, "Felix Cheung" <felixcheun...@hotmail.com> wrote: +1 Thanks for taking this on. That was my feedback on one of the long comment threads as well; I think we should have one docker image instead of 3 (also pending in the fork are Python and R variants - we should consider having one that we officially release instead of 9, for example) From: 蒋星博 <jiangxb1...@gmail.com> Sent: Friday, January 5, 2018 10:57:53 PM To: Marcelo Vanzin Cc: dev Subject: Re: Kubernetes backend and docker images Agreed, it would be nice to have this simplification, and users can still create their custom images by copying/modifying the default one. Thanks for bringing this out, Marcelo! 2018-01-05 17:06 GMT-08:00 Marcelo Vanzin <van...@cloudera.com>: Hey all, especially those working on the k8s stuff. Currently we have 3 docker images that need to be built and provided by the user when starting a Spark app: driver, executor, and init container. When the initial review went by, I asked why do we need 3, and I was told that's because they have different entry points.
That never really convinced me, but well, everybody wanted to get things in to get the ball rolling. But I still think that's not the best way to go. I did some pretty simple hacking and got things to work with a single image: https://github.com/vanzin/spark/commit/k8s-img Is there a reason why that approach would not work? You could still create separate images for driver and executor if wanted, but there's no reason I can see why we should need 3 images for the simple case. Note that the code there can be cleaned up still, and I don't love the idea of using env variables to propagate arguments to the container, but that works for now. -- Marcelo -- Anirudh Ramanathan
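[Editor's sketch] For readers following the thread: the branching idea being praised above is, in the actual single-image approach, a shell entrypoint script. This Scala stub, with a hypothetical SPARK_K8S_ROLE variable, only illustrates the dispatch-on-an-environment-variable idea in miniature:

object SparkK8sEntrypoint {
  // One image, one entry point: the backend sets an env var in the pod spec,
  // and the entry point branches on it to pick driver/executor/init behavior.
  def main(args: Array[String]): Unit =
    sys.env.getOrElse("SPARK_K8S_ROLE", "driver") match {
      case "driver"   => println("would launch the driver (spark-submit path)")
      case "executor" => println("would launch CoarseGrainedExecutorBackend")
      case "init"     => println("would run the dependency localizer")
      case other      => sys.error(s"unknown role: $other")
    }
}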
Re: SPIP: Spark on Kubernetes
The problem with maintaining this scheduler separately right now is that the scheduler backend is dependent upon the CoarseGrainedSchedulerBackend class, which is not as much a stable API as it is an internal class with components that currently need to be shared by all of the scheduler backends. This makes it such that maintaining this scheduler requires not just maintaining a single small module, but an entire fork of the project as well, so that the cluster manager specific scheduler backend can keep up with the changes to CoarseGrainedSchedulerBackend. If we wanted to avoid forking the entire project and only provide the scheduler backend as a pluggable module, we would need a fully pluggable scheduler backend with a stable API, as Erik mentioned. We also needed to change the spark-submit code to recognize Kubernetes mode and be able to delegate to the Kubernetes submission client, so that would need to be pluggable as well. More discussion on fully pluggable scheduler backends is at https://issues.apache.org/jira/browse/SPARK-19700. -Matt Cheah From: Erik Erlandson <eerla...@redhat.com> Date: Friday, August 18, 2017 at 8:34 AM To: "dev@spark.apache.org" <dev@spark.apache.org> Subject: Re: SPIP: Spark on Kubernetes There are a fair number of people (myself included) who have interest in making scheduler back-ends fully pluggable. That will represent a significant impact to core spark architecture, with corresponding risk. Adding the kubernetes back-end in a manner similar to the other three back-ends has had a very small impact on spark core, which allowed it to be developed in parallel and easily stay re-based on successive spark releases while we were developing it and building up community support. On Thu, Aug 17, 2017 at 7:14 PM, Mridul Muralidharan <mri...@gmail.com> wrote: While I definitely support the idea of Apache Spark being able to leverage kubernetes, IMO it is better for long term evolution of spark to expose appropriate SPI such that this support need not necessarily live within Apache Spark code base. It will allow for multiple backends to evolve, decoupled from spark core. In this case, would have made maintaining apache-spark-on-k8s repo easier; just as it would allow for supporting other backends - opensource (nomad for ex) and proprietary. In retrospect directly integrating yarn support into spark, while mirroring mesos support at that time, was probably an incorrect design choice on my part. Regards, Mridul On Tue, Aug 15, 2017 at 8:32 AM, Anirudh Ramanathan <fox...@google.com.invalid> wrote: > Spark on Kubernetes effort has been developed separately in a fork, and > linked back from the Apache Spark project as an experimental backend. We're > ~6 months in, have had 5 releases. > > 2 Spark versions maintained (2.1, and 2.2) > Extensive integration testing and refactoring efforts to maintain code > quality > Developer and user-facing documentation > 10+ consistent code contributors from different organizations involved in > actively maintaining and using the project, with several more members > involved in testing and providing feedback. > The community has delivered several talks on Spark-on-Kubernetes generating > lots of feedback from users. > In addition to these, we've seen efforts spawn off such as: > > HDFS on Kubernetes with Locality and Performance Experiments > Kerberized access to HDFS from Spark running on Kubernetes > > Following the SPIP process, I'm putting this SPIP up for a vote. > > +1: Yeah, let's go forward and implement the SPIP. 
> +0: Don't really care. > -1: I don't think this is a good idea because of the following technical > reasons. > > If there is any further clarification desired, on the design or the > implementation, please feel free to ask questions or provide feedback. > > > SPIP: Kubernetes as A Native Cluster Manager > > > Full Design Doc: link > > JIRA: https://issues.apache.org/jira/browse/SPARK-18278 > > Kubernetes Issue: > https://github.com/kubernetes/kubernetes/issues/34377 > > > Authors: Yinan Li, Anirudh Ramanathan, Erik Erlandson, Andrew Ash, Matt > Cheah, > > Ilan Filonenko, Sean Suchter, Kimoon Kim > > Background and Motivation > > Containerization and cluster management technologies are constantly evolving > in the cluster computing world. Apache Spark currently implements support > for Apache Hadoop YARN and Apache Mesos, in addition to providing its own > standalone cluster manager. In 2014, Google announced development of > Kubernetes which has its own unique feature set and differentiates itself > from YARN and Mesos. Since its debut, it has seen contributions from over > 1300 contributors with over 5 commits. Kubernetes has cemented itself as > a co
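[Editor's sketch] Matt's pointer to SPARK-19700 earlier in this thread is about exactly this kind of seam. As a rough illustration only (not a committed public API; Spark 2.x carries a private[spark] ExternalClusterManager trait of roughly this shape), a pluggable backend SPI could look like the following. It is declared inside org.apache.spark.scheduler because TaskScheduler and SchedulerBackend are private to Spark:

package org.apache.spark.scheduler

import org.apache.spark.SparkContext

// Sketch: the hooks a cluster manager plugin would need so spark-submit can
// select a backend from the master URL without the backend living in (or
// forking) the Spark code base.
trait PluggableClusterManager {
  def canCreate(masterURL: String): Boolean
  def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler
  def createSchedulerBackend(
      sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler): SchedulerBackend
  def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit
}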
Proposal for SPARK-18278
Hi everyone, Kubernetes is a technology that is a key player in the cluster computing world. Currently, running Spark applications on Kubernetes requires deploying a standalone Spark cluster on the Kubernetes cluster, and then running the jobs against the standalone Spark cluster. However, there would be many benefits to running Spark on Kubernetes natively, and so SPARK-18278<https://issues.apache.org/jira/browse/SPARK-18278> has been filed to track discussion around supporting Kubernetes as a cluster manager, in addition to the existing Mesos, YARN, and standalone cluster managers. A first draft of a proposal outlining a potential long-term plan around this feature has been attached to the JIRA ticket. Any feedback and discussion would be greatly appreciated. Thanks, -Matt Cheah
Re: [VOTE] Release Apache Spark 2.0.0 (RC1)
-1 because of SPARK-16181 which is a correctness regression from 1.6. Looks like the patch is ready though: https://github.com/apache/spark/pull/13884 – it would be ideal for this patch to make it into the release. -Matt Cheah From: Nick Pentreath <nick.pentre...@gmail.com> Date: Friday, June 24, 2016 at 4:37 AM To: "dev@spark.apache.org" <dev@spark.apache.org> Subject: Re: [VOTE] Release Apache Spark 2.0.0 (RC1) I'm getting the following when trying to run ./dev/run-tests (not happening on master) from the extracted source tar. Anyone else seeing this? error: Could not access 'fc0a1475ef' ** File "./dev/run-tests.py", line 69, in __main__.identify_changed_files_from_git_commits Failed example: [x.name for x in determine_modules_for_files( identify_changed_files_from_git_commits("fc0a1475ef", target_ref="5da21f07"))] Exception raised: Traceback (most recent call last): File "/Users/nick/miniconda2/lib/python2.7/doctest.py", line 1315, in __run compileflags, 1) in test.globs File "", line 1, in [x.name for x in determine_modules_for_files( identify_changed_files_from_git_commits("fc0a1475ef", target_ref="5da21f07"))] File "./dev/run-tests.py", line 86, in identify_changed_files_from_git_commits universal_newlines=True) File "/Users/nick/miniconda2/lib/python2.7/subprocess.py", line 573, in check_output raise CalledProcessError(retcode, cmd, output=output) CalledProcessError: Command '['git', 'diff', '--name-only', 'fc0a1475ef', '5da21f07']' returned non-zero exit status 1 error: Could not access '50a0496a43' ** File "./dev/run-tests.py", line 71, in __main__.identify_changed_files_from_git_commits Failed example: 'root' in [x.name for x in determine_modules_for_files( identify_changed_files_from_git_commits("50a0496a43", target_ref="6765ef9"))] Exception raised: Traceback (most recent call last): File "/Users/nick/miniconda2/lib/python2.7/doctest.py", line 1315, in __run compileflags, 1) in test.globs File "", line 1, in 'root' in [x.name for x in determine_modules_for_files( identify_changed_files_from_git_commits("50a0496a43", target_ref="6765ef9"))] File "./dev/run-tests.py", line 86, in identify_changed_files_from_git_commits universal_newlines=True) File "/Users/nick/miniconda2/lib/python2.7/subprocess.py", line 573, in check_output raise CalledProcessError(retcode, cmd, output=output) CalledProcessError: Command '['git', 'diff', '--name-only', '50a0496a43', '6765ef9']' returned non-zero exit status 1 ** 1 items had failures: 2 of 2 in
__main__.identify_changed_files_from_git_commits ***Test Failed*** 2 failures. On Fri, 24 Jun 2016 at 06:59 Yin Huai <yh...@databricks.com> wrote: -1 because of https://issues.apache.org/jira/browse/SPARK-16121. This jira was resolved after 2.0.0-RC1 was cut. Without the fix, Spark SQL effectively only uses the driver to list files when loading datasets and the driver-side file listing is very slow for datasets having many files and p
Re: spark 2.0 issue with yarn?
@Marcelo: Interesting - why would this manifest on the YARN-client side though (as Spark is the client to YARN in this case)? Spark as a client shouldn’t care about what auxiliary services are on the YARN cluster. @Jesse: The change I wrote excludes all artifacts from the com.sun.jersey group. So to allow usage of the timeline service, we specifically need to find the thing that we don’t want to exclude and exclude everything but that from the YARN dependencies. I think the class that’s missing in this particular stack trace comes from com.sun.jersey:jersey-client. Refactoring the exclusions in pom.xml under hadoop-yarn-api to not exclude jersey-client should fix at least this. Again – we want to exclude all the other Jersey things that YARN is pulling in though, unless further errors indicate otherwise. In general I’m wary of having both Jersey 1 and Jersey 2 jars on the class path at all. However jersey-client looks relatively harmless since it does not bundle in JAX-RS classes, nor does it appear to have anything weird in its META-INF folder. -Matt Cheah On 5/9/16, 3:10 PM, "Marcelo Vanzin" <van...@cloudera.com> wrote: >Hi Jesse, > >On Mon, May 9, 2016 at 2:52 PM, Jesse F Chen <jfc...@us.ibm.com> wrote: >> Sean - thanks. definitely related to SPARK-12154. >> Is there a way to continue use Jersey 1 for existing working >>environment? > >The error you're getting is because of a third-party extension that >tries to talk to the YARN ATS; that's not part of upstream Spark, >although I believe it's part of HDP. So you may have to talk to >Hortonworks about that, or disable that extension in Spark 2.0 for the >moment. > > >-- >Marcelo
Understanding fault tolerance in shuffle operations
Hi everyone, I have a question about the shuffle mechanisms in Spark and the fault-tolerance I should expect. Suppose I have a simple job with two stages – something like rdd.textFile().mapToPair().reduceByKey().saveAsTextFile(). The questions I have are, 1. Suppose I’m not using the external shuffle service. I’m running the job. The first stage succeeds. During the second stage, one of the executors is lost (for the simplest case, someone uses kill -9 on it and the job itself should have no problems completing otherwise). Should I expect the job to be able to recover and complete successfully? My understanding is that the lost shuffle files from that executor can still be re-computed and the job should be able to complete successfully. 2. Suppose I’m using the shuffle service. How does this change the result of question #1? 3. Suppose I’m using the shuffle service, and I’m using standalone mode. The first stage succeeds. During the second stage, I kill both the executor and the worker that spawned that executor. Now that the shuffle files associated with that worker’s shuffle service daemon have been lost, will the job be able to recompute the lost shuffle data? This is the scenario I’m running into most, where my tasks fail because they try to reach the shuffle service instead of trying to recompute the lost shuffle files. Thanks, -Matt Cheah
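[Editor's sketch] For anyone wanting to reproduce scenario 1, a minimal Scala version of the two-stage job described above; the input path, key extraction, and output path are hypothetical:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("shuffle-fault-tolerance-repro")
  // Scenario 1 in the email: external shuffle service disabled.
  .set("spark.shuffle.service.enabled", "false")
val sc = new SparkContext(conf)

sc.textFile("hdfs:///input/data.csv")        // hypothetical input
  .map(line => (line.split(",")(0), 1L))     // stage 1: map side
  .reduceByKey(_ + _)                        // shuffle boundary
  .saveAsTextFile("hdfs:///output/counts")   // stage 2: reduce side

// While stage 2 is running, `kill -9` one executor JVM; the DAGScheduler
// should resubmit stage 1 tasks to regenerate the lost shuffle output.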
Re: HashedRelation Memory Pressure on Broadcast Joins
I would expect the memory pressure to grow because not only are we storing the backing array to the iterator of the rows on the driver, but we’re also storing a copy of each of those rows in the hash table. Whereas if we didn’t do the copy on the driver side, then the hash table would only have to store pointers to those rows in the array. Perhaps we can think about whether or not we want to be using the HashedRelation constructs in broadcast join physical plans? The file isn’t compressed - it was a 150MB CSV living on HDFS. I would expect it to fit in a 1GB heap, but I agree that it is difficult to reason about dataset size on disk vs. memory. -Matt Cheah On 3/2/16, 10:15 AM, "Davies Liu" <dav...@databricks.com> wrote: >UnsafeHashedRelation and HashedRelation could also be used in Executor >(for non-broadcast hash join), then the UnsafeRow could come from >UnsafeProjection, >so we should copy the rows for safety. > >We could have a smarter copy() for UnsafeRow (avoid the copy if it's >already copied), >but I don't think this copy here will increase the memory pressure. >The total memory >will be determined by how many rows are stored in the hash tables. > >In general, if you do not have enough memory, just don't increase >autoBroadcastJoinThreshold, >or the performance could be worse because of full GC. > >Sometimes the tables look small as compressed files (for example, >parquet files); >once loaded into memory, they could require much more memory than the >size of the file on disk. > > >On Tue, Mar 1, 2016 at 5:17 PM, Matt Cheah <mch...@palantir.com> wrote: >> Hi everyone, >> >> I had a quick question regarding our implementation of >>UnsafeHashedRelation >> and HashedRelation. It appears that we copy the rows that we’ve >>collected >> into memory upon inserting them into the hash table in >> UnsafeHashedRelation#apply(). I was wondering why we are copying the >>rows >> every time? I can’t imagine these rows being mutable in this scenario. >> >> The context is that I’m looking into a case where a small data frame >>should >> fit in the driver’s memory, but my driver ran out of memory after I >> increased the autoBroadcastJoinThreshold. YourKit is indicating that >>this >> logic is consuming more memory than my driver can handle. >> >> Thanks, >> >> -Matt Cheah
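[Editor's sketch] Davies' point about UnsafeProjection reuse is the crux: the iterator may hand back the same mutable row object on every call, so inserting references instead of copies would silently alias every table entry to the last row read. A plain-Scala sketch of that hazard (a hypothetical record type, not the actual HashedRelation code):

// The "iterator" reuses one mutable record, as UnsafeProjection output can.
final class MutableRecord(var key: Int, var value: Long) {
  def copy(): MutableRecord = new MutableRecord(key, value)
}

def reusingIterator(record: MutableRecord): Iterator[MutableRecord] =
  (1 to 3).iterator.map { i =>
    record.key = i; record.value = i * 10L; record // same object every time
  }

val byRef = reusingIterator(new MutableRecord(0, 0L)).toBuffer
println(byRef.map(_.value))  // ArrayBuffer(30, 30, 30): every entry aliases the record

val byCopy = reusingIterator(new MutableRecord(0, 0L)).map(_.copy()).toBuffer
println(byCopy.map(_.value)) // ArrayBuffer(10, 20, 30): defensive copies, as HashedRelation makes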
HashedRelation Memory Pressure on Broadcast Joins
Hi everyone, I had a quick question regarding our implementation of UnsafeHashedRelation and HashedRelation<https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala>. It appears that we copy the rows that we’ve collected into memory upon inserting them into the hash table in UnsafeHashedRelation#apply(). I was wondering why we are copying the rows every time? I can’t imagine these rows being mutable in this scenario. The context is that I’m looking into a case where a small data frame should fit in the driver’s memory, but my driver ran out of memory after I increased the autoBroadcastJoinThreshold. YourKit is indicating that this logic is consuming more memory than my driver can handle. Thanks, -Matt Cheah
Re: Preserving partitioning with dataframe select
Interesting - I might be misinterpreting my Spark UI then, in terms of the number of stages I'm seeing in the job before and after I'm doing the pre-partitioning. That said, I was mostly thinking about this when reading through the code. In particular, under basicOperators.scala in org.apache.spark.sql.execution, the Project gets compiled down to child.execute().mapPartitionsInternal without passing the preservesPartitioning flag. Is this Projection being moved around in the case that the optimizer wants to take advantage of co-partitioning? Guidance on how to trace the planner's logic would be appreciated! -Matt Cheah From: Reynold Xin <r...@databricks.com> Date: Sunday, February 7, 2016 at 11:11 PM To: Matt Cheah <mch...@palantir.com> Cc: "dev@spark.apache.org" <dev@spark.apache.org>, Mingyu Kim <m...@palantir.com> Subject: Re: Preserving partitioning with dataframe select Matt, Thanks for the email. Are you just asking whether it should work, or reporting they don't work? Internally, the way we track physical data distribution should make the scenarios described work. If it doesn't, we should make them work. On Sat, Feb 6, 2016 at 6:49 AM, Matt Cheah <mch...@palantir.com> wrote: > Hi everyone, > > When using raw RDDs, it is possible to have a map() operation indicate that > the partitioning for the RDD would be preserved by the map operation. This > makes it easier to reduce the overhead of shuffles by ensuring that RDDs are > co-partitioned when they are joined. > > When I'm using Data Frames, I'm pre-partitioning the data frame by using > DataFrame.partitionBy($"X"), but I will invoke a select statement after the > partitioning before joining that dataframe with another. Roughly speaking, I'm > doing something like this pseudo-code: > > partitionedDataFrame = dataFrame.partitionBy("$X") > groupedDataFrame = partitionedDataFrame.groupBy($"X").agg(aggregations) > // Rename "X" to "Y" to make sure columns are unique > groupedDataFrameRenamed = groupedDataFrame.withColumnRenamed("X", "Y") > // Roughly speaking, join on "X == Y" to get the aggregation results onto > every row > joinedDataFrame = partitionedDataFrame.join(groupedDataFrame) > > However the renaming of the columns maps to a select statement, and to my > knowledge, selecting the columns is throwing off the partitioning, which > results in shuffling both the partitionedDataFrame and the groupedDataFrame. > > I have the following questions given this example: > > 1) Is pre-partitioning the Data Frame effective? In other words, does the > physical planner recognize when underlying RDDs are co-partitioned and compute > more efficient joins by reducing the amount of data that is shuffled? > 2) If the planner takes advantage of co-partitioning, is the renaming of the > columns invalidating the partitioning of the grouped Data Frame? When I look > at the planner's conversion from logical.Project to the physical plan, I only > see it invoking child.mapPartitions without specifying the > preservesPartitioning flag. > > Thanks, > > -Matt Cheah
Preserving partitioning with dataframe select
Hi everyone, When using raw RDDs, it is possible to have a map() operation indicate that the partitioning for the RDD would be preserved by the map operation. This makes it easier to reduce the overhead of shuffles by ensuring that RDDs are co-partitioned when they are joined. When I'm using Data Frames, I'm pre-partitioning the data frame by using DataFrame.partitionBy($"X"), but I will invoke a select statement after the partitioning before joining that dataframe with another. Roughly speaking, I'm doing something like this pseudo-code: partitionedDataFrame = dataFrame.partitionBy("$X") groupedDataFrame = partitionedDataFrame.groupBy($"X").agg(aggregations) // Rename "X" to "Y" to make sure columns are unique groupedDataFrameRenamed = groupedDataFrame.withColumnRenamed("X", "Y") // Roughly speaking, join on "X == Y" to get the aggregation results onto every row joinedDataFrame = partitionedDataFrame.join(groupedDataFrame) However the renaming of the columns maps to a select statement, and to my knowledge, selecting the columns is throwing off the partitioning, which results in shuffling both the partitionedDataFrame and the groupedDataFrame. I have the following questions given this example: 1) Is pre-partitioning the Data Frame effective? In other words, does the physical planner recognize when underlying RDDs are co-partitioned and compute more efficient joins by reducing the amount of data that is shuffled? 2) If the planner takes advantage of co-partitioning, is the renaming of the columns invalidating the partitioning of the grouped Data Frame? When I look at the planner's conversion from logical.Project to the physical plan, I only see it invoking child.mapPartitions without specifying the preservesPartitioning flag. Thanks, -Matt Cheah
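[Editor's sketch] At the RDD level, the behavior the email asks DataFrames to replicate looks like the following sketch (hypothetical data; assumes an existing SparkContext sc; the key point is the preservesPartitioning flag on mapPartitions):

import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
  .partitionBy(new HashPartitioner(8))

// mapPartitions with preservesPartitioning = true carries pairs.partitioner
// through the row-local transformation, so a later join against another RDD
// partitioned the same way avoids a shuffle.
val transformed = pairs.mapPartitions(
  iter => iter.map { case (k, v) => (k, v * 2) },
  preservesPartitioning = true)

assert(transformed.partitioner == pairs.partitioner)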
Quick question regarding Maven and Spark Assembly jar
Hi everyone, A very brief question out of curiosity: is there any particular reason why we don't publish the Spark assembly jar on the Maven repository? Thanks, -Matt Cheah
Re: DataFrames Aggregate does not spill?
I was executing on Spark 1.4 so I didn't notice the Tungsten option would make spilling happen in 1.5. I'll upgrade to 1.5 and see how that turns out. Thanks! From: Reynold Xin <r...@databricks.com> Date: Monday, September 21, 2015 at 5:36 PM To: Matt Cheah <mch...@palantir.com> Cc: "dev@spark.apache.org" <dev@spark.apache.org>, Mingyu Kim <m...@palantir.com>, Peter Faiman <peterfai...@palantir.com> Subject: Re: DataFrames Aggregate does not spill? What's the plan if you run explain? In 1.5 the default should be TungstenAggregate, which does spill (switching from hash-based aggregation to sort-based aggregation). On Mon, Sep 21, 2015 at 5:34 PM, Matt Cheah <mch...@palantir.com> wrote: > Hi everyone, > > I'm debugging some slowness and apparent memory pressure + GC issues after I > ported some workflows from raw RDDs to Data Frames. In particular, I'm looking > into an aggregation workflow that computes many aggregations per key at once. > > My workflow before was doing a fairly straightforward combineByKey call where > the aggregation would build up non-trivially sized objects in memory - I was > computing numerous sums over various fields of the data at a time. In > particular, I noticed that there was spilling to disk on the map side of the > aggregation. > > When I switched to using DataFrames aggregation - particularly > DataFrame.groupBy(some list of keys).agg(exprs), where I passed a large number > of "Sum" exprs - the execution began to choke. I saw one of my executors had a > long GC pause and my job isn't able to recover. However when I reduced the > number of Sum expressions being computed in the aggregation, the workflow > started to work normally. > > I have a hypothesis that I would like to run by everyone. In > org.apache.spark.sql.execution.Aggregate.scala, branch-1.5, I'm looking at the > execution of Aggregation > <https://github.com/apache/spark/blob/branch-1.5/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala>, which appears to build > the aggregation result in memory via updating a HashMap and iterating over the > rows. However this appears to be less robust than what would happen if > PairRDDFunctions.combineByKey were to be used. If combineByKey were used, then > instead of using two mapPartitions calls (assuming the aggregation is > partially-computable, as Sum is), it would use the ExternalSorter and > ExternalAppendOnlyMap objects to compute the aggregation. This would allow the > aggregated result to grow large as some of the aggregated result could be > spilled to disk. This especially seems bad if the aggregation reduction factor > is low; that is, if there are many unique keys in the dataset. In particular, > the Hash Map is holding O(# of keys * number of aggregated results per key) > items in memory at a time. > > I was wondering what everyone's thought on this problem is. Did we consciously > think about the robustness implications when choosing to use an in-memory Hash > Map to compute the aggregation? Is this an inherent limitation of the > aggregation implementation in Data Frames? > > Thanks, > > -Matt Cheah
DataFrames Aggregate does not spill?
Hi everyone, I'm debugging some slowness and apparent memory pressure + GC issues after I ported some workflows from raw RDDs to Data Frames. In particular, I'm looking into an aggregation workflow that computes many aggregations per key at once. My workflow before was doing a fairly straightforward combineByKey call where the aggregation would build up non-trivially sized objects in memory - I was computing numerous sums over various fields of the data at a time. In particular, I noticed that there was spilling to disk on the map side of the aggregation. When I switched to using DataFrames aggregation - particularly DataFrame.groupBy(some list of keys).agg(exprs), where I passed a large number of "Sum" exprs - the execution began to choke. I saw one of my executors had a long GC pause and my job isn't able to recover. However when I reduced the number of Sum expressions being computed in the aggregation, the workflow started to work normally. I have a hypothesis that I would like to run by everyone. In org.apache.spark.sql.execution.Aggregate.scala, branch-1.5, I'm looking at the execution of Aggregation <https://github.com/apache/spark/blob/branch-1.5/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala>, which appears to build the aggregation result in memory via updating a HashMap and iterating over the rows. However this appears to be less robust than what would happen if PairRDDFunctions.combineByKey were to be used. If combineByKey were used, then instead of using two mapPartitions calls (assuming the aggregation is partially-computable, as Sum is), it would use the ExternalSorter and ExternalAppendOnlyMap objects to compute the aggregation. This would allow the aggregated result to grow large as some of the aggregated result could be spilled to disk. This especially seems bad if the aggregation reduction factor is low; that is, if there are many unique keys in the dataset. In particular, the Hash Map is holding O(# of keys * number of aggregated results per key) items in memory at a time. I was wondering what everyone's thought on this problem is. Did we consciously think about the robustness implications when choosing to use an in-memory Hash Map to compute the aggregation? Is this an inherent limitation of the aggregation implementation in Data Frames? Thanks, -Matt Cheah
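[Editor's sketch] For reference, the RDD-level pattern the email contrasts with the DataFrame path - many sums per key via combineByKey, which gets map-side combining and disk spilling (via ExternalAppendOnlyMap / ExternalSorter) from the framework. The input shape and field count are hypothetical; assumes an existing SparkContext sc:

val rows = sc.parallelize(Seq(
  ("k1", Array(1.0, 2.0)),
  ("k2", Array(3.0, 4.0)),
  ("k1", Array(5.0, 6.0))))

// Element-wise sums over all fields at once, per key.
val sums = rows.combineByKey[Array[Double]](
  (v: Array[Double]) => v.clone(),              // createCombiner
  (acc: Array[Double], v: Array[Double]) => {   // mergeValue
    var i = 0; while (i < acc.length) { acc(i) += v(i); i += 1 }; acc
  },
  (a: Array[Double], b: Array[Double]) => {     // mergeCombiners
    var i = 0; while (i < a.length) { a(i) += b(i); i += 1 }; a
  })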
PySpark GroupByKey implementation question
Hi everyone, I was examining the PySpark implementation of groupByKey in rdd.py. I would like to submit a patch improving Scala RDD's groupByKey so that it has a similar robustness against large groups, as PySpark's implementation has logic to spill part of a single group to disk along the way. Its implementation appears to do the following: 1. Combine and group-by-key per partition locally, potentially spilling individual groups to disk 2. Shuffle the data explicitly using partitionBy 3. After the shuffle, do another local groupByKey to get the final result, again potentially spilling individual groups to disk My question is: what does the explicit map-side-combine step (#1) specifically benefit here? I was under the impression that map-side-combine for groupByKey was not optimal and is turned off in the Scala implementation (PairRDDFunctions.groupByKey calls combineByKey with map-side-combine set to false). Is it something specific to how PySpark can potentially spill the individual groups to disk? Thanks, -Matt Cheah P.S. Relevant Links: https://issues.apache.org/jira/browse/SPARK-3074 https://github.com/apache/spark/pull/1977
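[Editor's sketch] For context, the Scala call the email refers to boils down to something like this user-level sketch (hypothetical data; assumes an existing SparkContext sc). The notable part is mapSideCombine = false - pre-grouping on the map side does not shrink the bytes shuffled for a plain group-by:

import org.apache.spark.HashPartitioner
import scala.collection.mutable.ArrayBuffer

val pairs = sc.parallelize(Seq((1, "a"), (2, "b"), (1, "c")))

val grouped = pairs.combineByKey[ArrayBuffer[String]](
  (v: String) => ArrayBuffer(v),                                   // createCombiner
  (buf: ArrayBuffer[String], v: String) => buf += v,               // mergeValue
  (b1: ArrayBuffer[String], b2: ArrayBuffer[String]) => b1 ++= b2, // mergeCombiners
  new HashPartitioner(8),
  mapSideCombine = false) // grouping map-side would not reduce shuffled data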
Re: [SQL] Write parquet files under partition directories?
Excellent! Where can I find the code, pull request, and Spark ticket where this was introduced? Thanks, -Matt Cheah From: Reynold Xin r...@databricks.com Date: Monday, June 1, 2015 at 10:25 PM To: Matt Cheah mch...@palantir.com Cc: dev@spark.apache.org, Mingyu Kim m...@palantir.com, Andrew Ash a...@palantir.com Subject: Re: [SQL] Write parquet files under partition directories? There will be in 1.4. df.write.partitionBy("year", "month", "day").parquet("/path/to/output") On Mon, Jun 1, 2015 at 10:21 PM, Matt Cheah mch...@palantir.com wrote: Hi there, I noticed in the latest Spark SQL programming guide https://spark.apache.org/docs/latest/sql-programming-guide.html, there is support for optimized reading of partitioned Parquet files that have a particular directory structure (year=1/month=10/day=3, for example). However, I see no analogous way to write DataFrames as Parquet files with similar directory structures based on user-provided partitioning. Generally, is it possible to write DataFrames as partitioned Parquet files that downstream partition discovery can take advantage of later? I considered extending the Parquet output format, but it looks like ParquetTableOperations.scala has fixed the output format to AppendingParquetOutputFormat. Also, I was wondering if it would be valuable to contribute writing Parquet in partition directories as a PR. Thanks, -Matt Cheah
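[Editor's sketch] A slightly expanded version of the 1.4 API Reynold quotes, with a hypothetical DataFrame and output path (assumes an existing SQLContext named sqlContext); reading the directory back exercises the partition-discovery path from the programming guide:

import sqlContext.implicits._

val df = Seq(
  (2015, 6, 1, "a"),
  (2015, 6, 2, "b")
).toDF("year", "month", "day", "payload")

// Writes .../year=2015/month=6/day=1/... directories under the output path.
df.write.partitionBy("year", "month", "day").parquet("/path/to/output")

// Partition columns are discovered from the directory names on read, and
// filters on them can prune whole directories.
val readBack = sqlContext.read.parquet("/path/to/output")
readBack.filter($"day" === 2).show()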
[SQL] Write parquet files under partition directories?
Hi there, I noticed in the latest Spark SQL programming guide <https://spark.apache.org/docs/latest/sql-programming-guide.html>, there is support for optimized reading of partitioned Parquet files that have a particular directory structure (year=1/month=10/day=3, for example). However, I see no analogous way to write DataFrames as Parquet files with similar directory structures based on user-provided partitioning. Generally, is it possible to write DataFrames as partitioned Parquet files that downstream partition discovery can take advantage of later? I considered extending the Parquet output format, but it looks like ParquetTableOperations.scala has fixed the output format to AppendingParquetOutputFormat. Also, I was wondering if it would be valuable to contribute writing Parquet in partition directories as a PR. Thanks, -Matt Cheah
Re: JavaRDD Aggregate initial value - Closure-serialized zero value reasoning?
But RDD.aggregate() has this code: // Clone the zero value since we will also be serializing it as part of tasks var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance()) I do see the SparkEnv.get.serializer used in aggregateByKey, however. Perhaps we just missed it and need to apply the change to aggregate()? It seems appropriate to target a fix for 1.3.0. -Matt Cheah From: Josh Rosen rosenvi...@gmail.com Date: Wednesday, February 18, 2015 at 6:12 AM To: Matt Cheah mch...@palantir.com Cc: dev@spark.apache.org, Mingyu Kim m...@palantir.com, Andrew Ash a...@palantir.com Subject: Re: JavaRDD Aggregate initial value - Closure-serialized zero value reasoning? It looks like this was fixed in https://issues.apache.org/jira/browse/SPARK-4743 / https://github.com/apache/spark/pull/3605. Can you see whether that patch fixes this issue for you? On Tue, Feb 17, 2015 at 8:31 PM, Matt Cheah mch...@palantir.com wrote: Hi everyone, I was using JavaPairRDD's combineByKey() to compute all of my aggregations before, since I assumed that every aggregation required a key. However, I realized I could do my analysis using JavaRDD's aggregate() instead and not use a key. I have set spark.serializer to use Kryo. As a result, JavaRDD's combineByKey requires that a "createCombiner" function is provided, and the return value from that function must be serializable using Kryo. When I switched to using rdd.aggregate I assumed that the zero value would also be strictly Kryo serialized, as it is a data item and not part of a closure or the aggregation functions. However, I got a serialization exception as the closure serializer (only valid serializer is the Java serializer) was used instead. I was wondering the following: 1. What is the rationale for making the zero value be serialized using the closure serializer? This isn't part of the closure, but is an initial data item. 2. Would it make sense for us to perhaps write a version of rdd.aggregate() that takes a function as a parameter, that generates the zero value? This would be more intuitive to be serialized using the closure serializer. I believe aggregateByKey is also affected. Thanks, -Matt Cheah
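[Editor's sketch] Until aggregate() clones the zero value with the data serializer, one user-side workaround consistent with question #2 above is to build the zero inside each task rather than shipping it from the driver. A sketch, where Acc stands in for a hypothetical type that Kryo handles but Java serialization rejects (assumes an existing SparkContext sc with spark.serializer set to Kryo):

class Acc(var sum: Long, var count: Long) // hypothetical Kryo-only accumulator

val rdd = sc.parallelize(1L to 100L)

val total = rdd.mapPartitions { iter =>
  val acc = new Acc(0L, 0L) // the "zero value" is constructed task-side
  iter.foreach { x => acc.sum += x; acc.count += 1 }
  Iterator(acc)
}.reduce { (a, b) =>
  a.sum += b.sum; a.count += b.count; a
}

println(s"sum=${total.sum}, count=${total.count}")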
[Performance] Possible regression in rdd.take()?
Hi everyone, Between Spark 1.0.2 and Spark 1.1.1, I have noticed that rdd.take() consistently has a slower execution time on the later release. I was wondering if anyone else has had similar observations. I have two setups where this reproduces. The first is a local test. I launched a spark cluster with 4 worker JVMs on my Mac, and launched a Spark-Shell. I retrieved the text file and immediately called rdd.take(N) on it, where N varied. The RDD is a plaintext CSV, 4GB in size, split over 8 files, which ends up having 128 partitions, and a total of 8000 rows. The numbers I discovered between Spark 1.0.2 and Spark 1.1.1 are, with all numbers being in seconds: 1 items Spark 1.0.2: 0.069281, 0.012261, 0.011083 Spark 1.1.1: 0.11577, 0.097636, 0.11321 4 items Spark 1.0.2: 0.023751, 0.069365, 0.023603 Spark 1.1.1: 0.224287, 0.229651, 0.158431 10 items Spark 1.0.2: 0.047019, 0.049056, 0.042568 Spark 1.1.1: 0.353277, 0.288965, 0.281751 40 items Spark 1.0.2: 0.216048, 0.198049, 0.796037 Spark 1.1.1: 1.865622, 2.224424, 2.037672 This small test suite indicates a consistently reproducible performance regression. I also notice this on a larger scale test. The cluster used is on EC2: ec2 instance type: m2.4xlarge 10 slaves, 1 master ephemeral storage 70 cores, 50 GB/box In this case, I have a 100GB dataset split into 78 files totally 350 million items, and I take the first 50,000 items from the RDD. In this case, I have tested this on different formats of the raw data. With plaintext files: Spark 1.0.2: 0.422s, 0.363s, 0.382s Spark 1.1.1: 4.54s, 1.28s, 1.221s, 1.13s With snappy-compressed Avro files: Spark 1.0.2: 0.73s, 0.395s, 0.426s Spark 1.1.1: 4.618s, 1.81s, 1.158s, 1.333s Again demonstrating a reproducible performance regression. I was wondering if anyone else observed this regression, and if so, if anyone would have any idea what could possibly have caused it between Spark 1.0.2 and Spark 1.1.1? Thanks, -Matt Cheah
Re: [Performance] Possible regression in rdd.take()?
I actually tested Spark 1.2.0 with the code in the rdd.take() method swapped out for what was in Spark 1.0.2. The run time was still slower, which indicates to me something at work lower in the stack. -Matt Cheah On 2/18/15, 4:54 PM, Patrick Wendell pwend...@gmail.com wrote: I believe the heuristic governing the way that take() decides to fetch partitions changed between these versions. It could be that in certain cases the new heuristic is worse, but it might be good to just look at the source code and see, for your number of elements taken and number of partitions, if there was any effective change in how aggressively spark fetched partitions. This was quite a while ago, but I think the change was made because in many cases the newer code works more efficiently. - Patrick On Wed, Feb 18, 2015 at 4:47 PM, Matt Cheah mch...@palantir.com wrote: Hi everyone, Between Spark 1.0.2 and Spark 1.1.1, I have noticed that rdd.take() consistently has a slower execution time on the later release. I was wondering if anyone else has had similar observations. I have two setups where this reproduces. The first is a local test. I launched a spark cluster with 4 worker JVMs on my Mac, and launched a Spark-Shell. I retrieved the text file and immediately called rdd.take(N) on it, where N varied. The RDD is a plaintext CSV, 4GB in size, split over 8 files, which ends up having 128 partitions, and a total of 8000 rows. The numbers I discovered between Spark 1.0.2 and Spark 1.1.1 are, with all numbers being in seconds: 1 items Spark 1.0.2: 0.069281, 0.012261, 0.011083 Spark 1.1.1: 0.11577, 0.097636, 0.11321 4 items Spark 1.0.2: 0.023751, 0.069365, 0.023603 Spark 1.1.1: 0.224287, 0.229651, 0.158431 10 items Spark 1.0.2: 0.047019, 0.049056, 0.042568 Spark 1.1.1: 0.353277, 0.288965, 0.281751 40 items Spark 1.0.2: 0.216048, 0.198049, 0.796037 Spark 1.1.1: 1.865622, 2.224424, 2.037672 This small test suite indicates a consistently reproducible performance regression. I also notice this on a larger scale test. The cluster used is on EC2: ec2 instance type: m2.4xlarge 10 slaves, 1 master ephemeral storage 70 cores, 50 GB/box In this case, I have a 100GB dataset split into 78 files totally 350 million items, and I take the first 50,000 items from the RDD. In this case, I have tested this on different formats of the raw data. With plaintext files: Spark 1.0.2: 0.422s, 0.363s, 0.382s Spark 1.1.1: 4.54s, 1.28s, 1.221s, 1.13s With snappy-compressed Avro files: Spark 1.0.2: 0.73s, 0.395s, 0.426s Spark 1.1.1: 4.618s, 1.81s, 1.158s, 1.333s Again demonstrating a reproducible performance regression. I was wondering if anyone else observed this regression, and if so, if anyone would have any idea what could possibly have caused it between Spark 1.0.2 and Spark 1.1.1? Thanks, -Matt Cheah
Re: [Performance] Possible regression in rdd.take()?
Ah okay, I turned on spark.localExecution.enabled and the performance returned to what Spark 1.0.2 had. However I can see how users can inadvertently incur memory and network strain in fetching the whole partition to the driver. I'll evaluate on my side if we want to turn this on or not. Thanks for the quick and accurate response! -Matt Cheah From: Aaron Davidson ilike...@gmail.com Date: Wednesday, February 18, 2015 at 5:25 PM To: Matt Cheah mch...@palantir.com Cc: Patrick Wendell pwend...@gmail.com, dev@spark.apache.org, Mingyu Kim m...@palantir.com, Sandor Van Wassenhove sand...@palantir.com Subject: Re: [Performance] Possible regression in rdd.take()? You might be seeing the result of this patch: https://github.com/apache/spark/commit/d069c5d9d2f6ce06389ca2ddf0b3ae4db72c5797 which was introduced in 1.1.1. This patch disabled the ability for take() to run without launching a Spark job, which means that the latency is significantly increased for small jobs (but not for large ones). You can try enabling local execution and seeing if your problem goes away. On Wed, Feb 18, 2015 at 5:10 PM, Matt Cheah mch...@palantir.com wrote: I actually tested Spark 1.2.0 with the code in the rdd.take() method swapped out for what was in Spark 1.0.2. The run time was still slower, which indicates to me something at work lower in the stack. -Matt Cheah On 2/18/15, 4:54 PM, Patrick Wendell pwend...@gmail.com wrote: I believe the heuristic governing the way that take() decides to fetch partitions changed between these versions. It could be that in certain cases the new heuristic is worse, but it might be good to just look at the source code and see, for your number of elements taken and number of partitions, if there was any effective change in how aggressively spark fetched partitions. This was quite a while ago, but I think the change was made because in many cases the newer code works more efficiently. - Patrick On Wed, Feb 18, 2015 at 4:47 PM, Matt Cheah mch...@palantir.com wrote: Hi everyone, Between Spark 1.0.2 and Spark 1.1.1, I have noticed that rdd.take() consistently has a slower execution time on the later release. I was wondering if anyone else has had similar observations. I have two setups where this reproduces. The first is a local test. I launched a spark cluster with 4 worker JVMs on my Mac, and launched a Spark-Shell. I retrieved the text file and immediately called rdd.take(N) on it, where N varied. The RDD is a plaintext CSV, 4GB in size, split over 8 files, which ends up having 128 partitions, and a total of 8000 rows. The numbers I discovered between Spark 1.0.2 and Spark 1.1.1 are, with all numbers being in seconds: 1 items Spark 1.0.2: 0.069281, 0.012261, 0.011083 Spark 1.1.1: 0.11577, 0.097636, 0.11321 4 items Spark 1.0.2: 0.023751, 0.069365, 0.023603 Spark 1.1.1: 0.224287, 0.229651, 0.158431 10 items Spark 1.0.2: 0.047019, 0.049056, 0.042568 Spark 1.1.1: 0.353277, 0.288965, 0.281751 40 items Spark 1.0.2: 0.216048, 0.198049, 0.796037 Spark 1.1.1: 1.865622, 2.224424, 2.037672 This small test suite indicates a consistently reproducible performance regression. I also notice this on a larger scale test. The cluster used is on EC2: ec2 instance type: m2.4xlarge 10 slaves, 1 master ephemeral storage 70 cores, 50 GB/box In this case, I have a 100GB dataset split into 78 files totally 350 million items, and I take the first 50,000 items from the RDD. In this case, I have tested this on different formats of the raw data.
With plaintext files: Spark 1.0.2: 0.422s, 0.363s, 0.382s Spark 1.1.1: 4.54s, 1.28s, 1.221s, 1.13s With snappy-compressed Avro files: Spark 1.0.2: 0.73s, 0.395s, 0.426s Spark 1.1.1: 4.618s, 1.81s, 1.158s, 1.333s Again demonstrating a reproducible performance regression. I was wondering if anyone else observed this regression, and if so, if anyone would have any idea what could possibly have caused it between Spark 1.0.2 and Spark 1.1.1? Thanks, -Matt Cheah
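[Editor's sketch] For anyone hitting the same latency cliff, a sketch of re-enabling the pre-1.1.1 behavior Aaron describes, using the spark.localExecution.enabled flag named in the reply above (weigh the driver memory and network cost Matt notes; the input path is hypothetical):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("take-latency-check")
  // Allow take() on the first partition to run inside the driver again,
  // skipping the job launch for small requests.
  .set("spark.localExecution.enabled", "true")
val sc = new SparkContext(conf)

val rdd = sc.textFile("/data/large.csv") // hypothetical input
val start = System.nanoTime()
rdd.take(10)
println(s"take(10) took ${(System.nanoTime() - start) / 1e9} s")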
JavaRDD Aggregate initial value - Closure-serialized zero value reasoning?
Hi everyone, I was using JavaPairRDD's combineByKey() to compute all of my aggregations before, since I assumed that every aggregation required a key. However, I realized I could do my analysis using JavaRDD's aggregate() instead and not use a key. I have set spark.serializer to use Kryo. As a result, JavaRDD's combineByKey requires that a "createCombiner" function is provided, and the return value from that function must be serializable using Kryo. When I switched to using rdd.aggregate I assumed that the zero value would also be strictly Kryo serialized, as it is a data item and not part of a closure or the aggregation functions. However, I got a serialization exception as the closure serializer (only valid serializer is the Java serializer) was used instead. I was wondering the following: 1. What is the rationale for making the zero value be serialized using the closure serializer? This isn't part of the closure, but is an initial data item. 2. Would it make sense for us to perhaps write a version of rdd.aggregate() that takes a function as a parameter, that generates the zero value? This would be more intuitive to be serialized using the closure serializer. I believe aggregateByKey is also affected. Thanks, -Matt Cheah
[Classloading] Strange class loading issue
Hi everyone, I'm running into a strange class loading issue when running a Spark job, using Spark 1.0.2. I'm running a process where some Java code is compiled dynamically into a jar and added to the Spark context via addJar(). It is also added to the class loader of the thread that created the Spark context. When I try to run any job that only references the dynamically-compiled class on the workers, and then convert them to some other value (say integers) before collecting the result at the driver, the job completes successfully. We're using Kryo serialization. When I try to run a similar workflow but request for objects containing fields of the type of the dynamically compiled class at the driver (say by collect()) the job breaks with the following exception: Caused by: java.lang.RuntimeException: java.security.PrivilegedActionException: org.apache.spark.SparkException: Job aborted due to stage failure: Exception while getting task result: com.esotericsoftware.kryo.KryoException: Unable to find class: TBEjava1 Where "TBEjava1" is the name of the dynamically compiled class. Here is what I can deduce from my debugging: 1. The class is loaded on the thread that launches the Spark context and calls reduce(). To check, I put a breakpoint right before my reduce() call and used Class.forName("TBEjava1", false, Thread.currentThread().getContextClassLoader()); and got back a valid class object without ClassNotFoundException being thrown. 2. The worker threads can also refer to the class. I put breakpoints in the worker methods (using local[N] mode for the context for now) and they complete the mapping and reducing functions successfully. 3. The Kryo serializer calls readClass() and then calls Class.forName() inside Kryo, using the class loader in that Kryo object. The class not found exception is thrown there, however the stack trace doesn't appear as such. I'm wondering if we might be running into https://issues.apache.org/jira/browse/SPARK-3046 or something. I looked a bit at the Spark code, and from what I understand, the thread pool created for the task result getter does not inherit the context class loader of the thread that created the Spark Context, which would explain why the task result getter threads can't find classes even though they are available via addJar(). Any suggestions for a workaround? Feel free to correct me on any incorrect observations I've made as well. Thanks, -Matt Cheah
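[Editor's sketch] One workaround consistent with observations 1-3 above: keep the dynamically compiled class executor-side only, and project to JDK types before collect() so the driver's task-result-getter threads never need to resolve it. dynamicRdd and the projection are hypothetical stand-ins for the email's scenario (assumes an existing SparkContext sc):

// Stand-in for an RDD whose elements are instances of the dynamically
// compiled class.
val dynamicRdd = sc.parallelize(Seq[AnyRef]("stand-in instance"))

// Only JDK types cross the task-result path, so Kryo on the result-getter
// threads never has to look up the dynamically compiled class.
val summary: Array[(String, Int)] = dynamicRdd
  .map(obj => (obj.toString, obj.hashCode))
  .collect()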
Spark shuffle consolidateFiles performance degradation numbers
Hi everyone, I'm running into more and more cases where too many files are opened when spark.shuffle.consolidateFiles is turned off. I was wondering if this is a common scenario among the rest of the community, and if so, if it is worth considering the setting to be turned on by default. From the documentation, it seems like the performance could be hurt on ext3 file systems. However, what are the concrete numbers of performance degradation that is seen typically? A 2x slowdown in the average job? 3x? Also, what causes the performance degradation on ext3 file systems specifically? Thanks, -Matt Cheah
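[Editor's sketch] For reference, the setting in question as it would be flipped in this era of Spark (the hash-based shuffle; the default here is false):

import org.apache.spark.{SparkConf, SparkContext}

// With consolidation on, map tasks running on the same core append to a
// shared pool of shuffle files rather than opening one file per
// (map task, reducer) pair - the pattern that exhausts file handles when
// consolidation is off.
val conf = new SparkConf()
  .setAppName("shuffle-consolidation")
  .set("spark.shuffle.consolidateFiles", "true")
val sc = new SparkContext(conf)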