[GitHub] spark issue #16685: [SPARK-19335] Introduce insert, update, and upsert comma...
Github user ilganeli commented on the issue: https://github.com/apache/spark/pull/16685 @gatorsmile I'll submit a PR with just the UPDATE functionality, how do you suggest proceeding on the UPSERT front? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16685: [SPARK-19335] Introduce insert, update, and upsert comma...
Github user ilganeli commented on the issue: https://github.com/apache/spark/pull/16685 @xwu0226 The target table may be created and maintained outside of the Spark application. The only restriction is that, for inserts to be efficient, the table must not enforce a uniqueness constraint.
[GitHub] spark issue #16685: [SPARK-19335] Introduce insert, update, and upsert comma...
Github user ilganeli commented on the issue: https://github.com/apache/spark/pull/16685 @xwu0226 Thanks for the comments. I've reviewed your submission and commented at https://github.com/apache/spark/pull/16692. Specifically, in response to your comments:
1) We did not find the join to be a limiting factor in our tests. Granted, this is very dataset-specific, but conceptually Spark can do distributed joins very effectively, and extracting the data from the database is an O(n) operation. The main cost of this approach is the additional copy of data out of the database and then back in as an INSERT + UPDATE. However, an UPSERT operation is equivalent to a DELETE and INSERT operation. I think there may be a slight horse race between copy-out/INSERT/UPDATE and UPSERT, but I'm not convinced there's a dramatic performance cost in this step, particularly considering the dramatic cost of enforcing the uniqueness constraint for UPSERT.
2) This is indeed a valid concern. This approach requires the Spark programmer to enforce and maintain the uniqueness constraints on the table, rather than the other way around. This is a conceptual shift from how things are usually implemented (where the DB admin is king), but in our case this choice was justified by massive performance improvements.
3) I agree that using a PreparedStatement would be better. I tried that initially and ran into issues with certain data types (particularly timestamps). I haven't yet tried it with wildcards, the way the insert statement in JdbcUtils is currently implemented, but I think it's definitely doable that way. This might also help to boost performance.
4) I like the approach that you guys took to expand JDBCDialect in https://github.com/apache/spark/pull/16692. It's a well-modularized approach. I agree that something similar could be done here.
[GitHub] spark issue #16692: [SPARK-19335] Introduce UPSERT feature to SPARK
Github user ilganeli commented on the issue: https://github.com/apache/spark/pull/16692 Hi, all - thanks for this submission. Overall it's a very clean implementation and I like it a lot. There's obviously a large amount of effort that went into developing this. The main issue with this approach however is that the Upsert statement itself is an extremely expensive operation. Depending on how your uniqueness condition is defined, validating against the uniqueness constraint proves to be the most expensive part of this whole sequence. In https://github.com/apache/spark/pull/16685 I chose to implement this by reading in the existing table and doing a join operation to identify conflicts. The reason for this is that operation is easily distributed across the entire dataset. In contrast, the implementation as it stands in this PR ultimately depends entirely on the database to enforce the uniqueness constraint, something that in fact can ONLY be executed serially and requires a full traversal of the index created on the uniqueness constraint. Furthermore, this index, in both MySQL and Postgres (the examples you've provided) cannot be implemented as a Hash index. Unless the owner of the database manually computes and enforces hashes on individual rows, this approach instead relies on btree indices to do this lookup. This is a marginal cost when the btree is on a single field but if the uniqueness constraint spans multiple columns, this index is implemented as nested btrees. This, in turn, proves to be an extraordinarily costly update with non-linear performance degradation as both the size of the database and the size of the table being upserted increase. This approach mirrors our initial approach to the problem but we ultimately moved away from this approach in favor of the one in https://github.com/apache/spark/pull/16685 for performance reasons. We were able to achieve a more than 10x performance increase, even taking into account the cost of the additional joins. 
Our tests were not massive: we tested against a roughly 10 GB Postgres database with approximately 10 million rows, on a fairly mid-range machine. I would love to know whether you guys have done any performance benchmarks with this approach, and whether you could try out the approach in https://github.com/apache/spark/pull/16685 and let me know how it performs. Thanks!
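The join-based conflict detection described in this comment can be sketched without Spark, using plain Scala collections as a stand-in for DataFrames. This is only an illustration: the `Record` type, the `UpsertSplit.splitForUpsert` name, and the single `id` key are assumptions, not code from the PR, which operates on DataFrames and JDBC.

```scala
// Hypothetical row type; the PR itself works on Spark Rows with a schema.
final case class Record(id: Long, name: String)

object UpsertSplit {
  /**
   * Split incoming records into those whose key already exists in the
   * target table (to be written with UPDATE) and those whose key does
   * not (to be written with INSERT). In Spark this lookup would be a
   * distributed join against the keys read from the table; here a Set
   * membership test plays that role.
   *
   * @return (updates, inserts)
   */
  def splitForUpsert(incoming: Seq[Record], existingKeys: Set[Long]): (Seq[Record], Seq[Record]) =
    incoming.partition(r => existingKeys.contains(r.id))
}
```

For example, `UpsertSplit.splitForUpsert(Seq(Record(2L, "changed"), Record(3L, "new")), Set(1L, 2L))` puts the record with id 2 in the update set and the record with id 3 in the insert set.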
[GitHub] spark issue #16685: [SPARK-19335] Introduce insert, update, and upsert comma...
Github user ilganeli commented on the issue: https://github.com/apache/spark/pull/16685 It sounds like you consider there to be too many caveats and assumptions made in this patch for it to be a worthwhile code contribution. Given the numerous assumptions made in this PR, how would you instead feel about converting this into a documentation patch and providing it as example code for users? I'm not sure whether there is currently any official documentation on doing an UPDATE from Spark, so this could instead become a source of helpful information for others.
[GitHub] spark issue #16685: [SPARK-19335] Introduce insert, update, and upsert comma...
Github user ilganeli commented on the issue: https://github.com/apache/spark/pull/16685 @gatorsmile That makes a lot of sense. Here is a code snippet that relies on the database to do the UPSERT:
```
/**
 * Generate the SQL statement to perform an upsert (UPDATE OR INSERT) of a given row into a specific table
 *
 * @param row The row to insert into the table
 * @param schema The table schema
 * @param tableName The table name in the database
 * @param primaryKeys The unique constraint imposed on the database
 * @return
 */
@transient
def genUpsertScript(row: Row, schema: StructType, tableName: String, primaryKeys: Set[String]): String = {
  val primaryKeyString: String = getKeyString(primaryKeys)
  val schemaString = schema.map(s => s.name).reduce(_ + ", " + _)
  val valString = row.toSeq.map(v => "'" + v.toString.replaceAll("'", "''") + "'").reduce(_ + "," + _)
  val withExcluded = {
    schema.map(_.name)
      .filterNot(primaryKeys.contains)
      .map(s => s + " = EXCLUDED." + s) //EXCLUDED is a magic internal Postgres table
      .reduce(_ + ",\n" + _)
  }
  val upsert = {
    s"INSERT INTO $tableName ($schemaString)\n VALUES ($valString)\n" +
      s"ON CONFLICT ($primaryKeyString) DO UPDATE\n" +
      s"SET\n" + withExcluded + ";"
  }
  logS("Generated SQL: " + upsert, Level.DEBUG)
  upsert
}
```
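The snippet above inlines every value as a quoted string, so it throws on null values (`v.toString`) and bypasses JDBC type handling. One possible variant, sketched here under the assumption of PostgreSQL 9.5 or later (where `ON CONFLICT` is available), emits `?` placeholders so that values can be bound through a `PreparedStatement`. The `UpsertSql` object and its plain `Seq[String]` column lists are hypothetical stand-ins for the `StructType`-based signature in the comment.

```scala
object UpsertSql {
  /**
   * Build a parameterized Postgres upsert statement.
   * Assumes `columns` is non-empty and `keyColumns` is a non-empty subset of it.
   * Values are bound later, in the order given by `columns`.
   */
  def genUpsertSql(table: String, columns: Seq[String], keyColumns: Seq[String]): String = {
    require(columns.nonEmpty, "at least one column is required")
    require(keyColumns.nonEmpty && keyColumns.forall(k => columns.contains(k)),
      "key columns must be a non-empty subset of the table columns")
    val keySet = keyColumns.toSet
    val columnList = columns.mkString(", ")
    val placeholders = columns.map(_ => "?").mkString(", ")
    val conflictTarget = keyColumns.mkString(", ")
    // EXCLUDED refers to the row that was proposed for insertion.
    val assignments = columns.filterNot(c => keySet.contains(c)).map(c => s"$c = EXCLUDED.$c")
    val action =
      if (assignments.isEmpty) "DO NOTHING" // every column is part of the key
      else "DO UPDATE SET " + assignments.mkString(", ")
    s"INSERT INTO $table ($columnList) VALUES ($placeholders) ON CONFLICT ($conflictTarget) $action"
  }
}
```

Because the statement text depends only on the table and column names, it can be prepared once per partition and reused for every row in a batch.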
[GitHub] spark issue #16685: [SPARK-19335] Introduce insert, update, and upsert comma...
Github user ilganeli commented on the issue: https://github.com/apache/spark/pull/16685 I recognize that this is not an optimal solution, but Spark has historically contained multiple sub-optimal operations that are nonetheless useful in certain contexts and it's left to the user to understand and use things correctly. A few examples off the top of my head include collectPartitions, zipWithIndex, and repartition - all of which may be expensive operations but are nonetheless useful when used appropriately. I believe there's value in introducing this as a starting point which works in most scenarios and is more efficient than relying on the database to handle the uniqueness constraint and be responsible for a mass update, with the expectation of future improvement.
[GitHub] spark issue #16685: [SPARK-19335] Introduce insert, update, and upsert comma...
Github user ilganeli commented on the issue: https://github.com/apache/spark/pull/16685 @gatorsmile What is a "key" update, and in what context would that sort of operation be needed? I don't think a secondary index on the table prevents this method from working; the main issue is that it makes the operation more expensive. The database still enforces any existing constraints. If the ask is to support a "uniqueness" constraint on multiple columns, that is already supported via ```primaryKeys``` passed to the upsert function. The update uses the "id" column not as a uniqueness constraint, but as a simple and efficient way to identify a given row to update. A future improvement would be to support using multiple columns to identify the row to update.
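The future improvement mentioned in this comment, identifying the row to update by several columns instead of a single "id", might look roughly like the following. This is a sketch only: `UpdateSql.genUpdateSql` is a hypothetical helper, not code from the PR, and it emits `?` placeholders intended for a JDBC `PreparedStatement`.

```scala
object UpdateSql {
  /**
   * Build a parameterized UPDATE whose WHERE clause matches on all of
   * `idColumns`. Bind order: the SET values first (in `columns` order,
   * skipping the identifying columns), then the identifying values.
   */
  def genUpdateSql(table: String, columns: Seq[String], idColumns: Seq[String]): String = {
    require(idColumns.nonEmpty, "at least one identifying column is required")
    val idSet = idColumns.toSet
    val setColumns = columns.filterNot(c => idSet.contains(c))
    require(setColumns.nonEmpty, "nothing to update besides the identifying columns")
    val setClause = setColumns.map(c => s"$c = ?").mkString(", ")
    val whereClause = idColumns.map(c => s"$c = ?").mkString(" AND ")
    s"UPDATE $table SET $setClause WHERE $whereClause"
  }
}
```

With a single identifying column this reduces to the "id"-based form described above, so the same helper would cover both cases.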
[GitHub] spark pull request #16685: [SPARK-19335] Introduce insert, update, and upser...
Github user ilganeli commented on a diff in the pull request: https://github.com/apache/spark/pull/16685#discussion_r97623026
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala ---
@@ -722,14 +724,246 @@ object JdbcUtils extends Logging {
   }

   /**
+   * Check whether a table exists in a given database
+   *
+   * @return True if the table exists.
+   */
+  @transient
+  def checkTableExists(targetDb: String, tableName: String): Boolean = {
+    val dbc: Connection = DriverManager.getConnection(targetDb)
+    val dbm = dbc.getMetaData()
+    // Check if the table exists. If it exists, perform an upsert.
+    // Otherwise, do a simple dataframe write to the DB
+    val tables = dbm.getTables(null, null, tableName, null)
+    val exists = tables.next() // Returns false if next does not exist
+    dbc.close()
+    exists
+  }
+
+  // Provide a reasonable starting batch size for database operations.
+  private val DEFAULT_BATCH_SIZE: Int = 200
+
+  // Limit the number of database connections. Some DBs suffer when there are many open
+  // connections.
+  private val DEFAULT_MAX_CONNECTIONS: Int = 50
--- End diff --

Got it, thanks!
[GitHub] spark issue #16685: [SPARK-19335] Introduce insert, update, and upsert comma...
Github user ilganeli commented on the issue: https://github.com/apache/spark/pull/16685 @gatorsmile That's exactly right - in our testing, writing to databases with unique constraints proved to be extremely difficult to do efficiently. That's why this approach moves the maintenance of the unique constraint into the application (in this case the upsert function). This is obviously suboptimal because it weakens the database to some degree but I think there's still enough utility in allowing an update operation. I know there's certainly been enough demand for it online and it's a common use case. I think with proper documentation of the challenges of doing an upsert and the necessary considerations for the table being updated, this can be a very welcome addition to Spark.
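One consequence of moving the unique constraint into the application is that the incoming batch itself must not contain two rows with the same key, since the database will no longer reject the second one. A minimal sketch of that pre-processing step follows; the `KeyDedup.dedupeByKey` helper and its last-write-wins policy are assumptions made for illustration and do not come from the PR.

```scala
import scala.collection.mutable

object KeyDedup {
  /**
   * Keep one record per key, preferring the last occurrence, while
   * preserving the order in which keys were first seen.
   */
  def dedupeByKey[K, V](records: Seq[V])(key: V => K): Seq[V] = {
    // LinkedHashMap keeps first-insertion order; updating an existing
    // key replaces its value without moving it.
    val latest = mutable.LinkedHashMap.empty[K, V]
    records.foreach(r => latest.update(key(r), r))
    latest.values.toSeq
  }
}
```

Whether the first or the last duplicate should win is a policy decision that depends on the dataset; the sketch simply picks one.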
[GitHub] spark pull request #16685: [SPARK-19335] Introduce insert, update, and upser...
Github user ilganeli commented on a diff in the pull request: https://github.com/apache/spark/pull/16685#discussion_r97455966 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala --- @@ -17,20 +17,22 @@ package org.apache.spark.sql.execution.datasources.jdbc --- End diff -- Oh heck - this did seem like the appropriate place to put this though. Any thoughts on where it could live instead?
[GitHub] spark issue #16685: [SPARK-19935] Introduce insert, update, and upsert comma...
Github user ilganeli commented on the issue: https://github.com/apache/spark/pull/16685 retest this please
[GitHub] spark pull request #16685: [SPARK-19935] Introduce insert, update, and upser...
GitHub user ilganeli opened a pull request: https://github.com/apache/spark/pull/16685 [SPARK-19935] Introduce insert, update, and upsert commands to the JdbcUtils class
## What changes were proposed in this pull request?
Adds the ability to perform an insert, update, or upsert command to the JdbcUtils class, which supports writing DataFrames to databases via JDBC. This functionality has not previously existed within Spark, and doing an upsert efficiently is generally difficult. The method presented here strikes a reasonable balance between simplicity and performance and has shown reasonably efficient scaling. The insert operation, while already existing, is implemented slightly differently in this approach to be consistent with how update is implemented.
## How was this patch tested?
This functionality has been tested through extensive manual testing and tuning while developing this patch. If the committers believe that this is a valuable addition, I will be happy to add additional unit tests around this feature.
You can merge this pull request into a Git repository by running:
    $ git pull https://github.com/ilganeli/spark SPARK-19935
Alternatively you can review and apply these changes as the patch at:
    https://github.com/apache/spark/pull/16685.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
    This closes #16685
commit 8d499fe908c29f3b84236315a65e9221ae08cb14
Author: Ilya Ganelin
Date: 2017-01-24T00:28:16Z
    Introduce insert, update, and upsert commands to the JdbcUtils class
commit 89cef373077283627cc896dce4ab95c9d5aa41de
Author: Ilya Ganelin
Date: 2017-01-24T00:32:49Z
    Extra line
commit e1fc6f6697a00567015c47d13173ec4976e7cbb3
Author: Ilya Ganelin
Date: 2017-01-24T00:37:12Z
    Fixed merge conflicts
commit a64719b2c0b687cbe0b854d4a0c5e6e02f75a0bc
Author: Ilya Ganelin
Date: 2017-01-24T00:39:13Z
    Reverted changes to df writer
commit ca494ebdf9110b67c96fc1c3df8463a4d63a56da
Author: Ilya Ganelin
Date: 2017-01-24T00:39:46Z
    Reverted changes to savemode
[GitHub] spark pull request: [SPARK-8890][SQL][WIP] Reduce memory consumpti...
Github user ilganeli closed the pull request at: https://github.com/apache/spark/pull/7514
[GitHub] spark pull request: [SPARK-8464][Core][Shuffle] Consider separatin...
Github user ilganeli commented on the pull request: https://github.com/apache/spark/pull/7129#issuecomment-137578014 That's fine, go for it.

From: Michael Armbrust <notificati...@github.com>
Reply-To: apache/spark <re...@reply.github.com>
Date: Thursday, September 3, 2015 at 2:18 PM
To: apache/spark <sp...@noreply.github.com>
Cc: "Ganelin, Ilya" <ilya.gane...@capitalone.com>
Subject: Re: [spark] [SPARK-8464][Core][Shuffle] Consider separating aggregator and non-aggregator paths in ExternalSorter (#7129)

@ilganeli thanks a lot for working on this, but we've decided this probably isn't the right thing to do. Do you mind if we close this issue?
[GitHub] spark pull request: [SPARK-5945] Spark should not retry a stage in...
Github user ilganeli commented on a diff in the pull request: https://github.com/apache/spark/pull/5636#discussion_r38597940 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -473,6 +473,283 @@ class DAGSchedulerSuite assertDataStructuresEmpty() } + + // Helper function to validate state when creating tests for task failures + def checkStageId(stageId: Int, attempt: Int, stageAttempt: TaskSet) { +assert(stageAttempt.stageId === stageId) +assert(stageAttempt.stageAttemptId == attempt) + } + + def makeCompletions(stageAttempt: TaskSet, reduceParts: Int): Seq[(Success.type, MapStatus)] = { +stageAttempt.tasks.zipWithIndex.map { case (task, idx) => + (Success, makeMapStatus("host" + ('A' + idx).toChar, reduceParts)) +}.toSeq + } + + // Helper functions to extract commonly used code in Fetch Failure test cases + def setupStageAbortTest(sc: SparkContext) { +sc.listenerBus.addListener(new EndListener()) +ended = false +jobResult = null + } + + // Create a new Listener to confirm that the listenerBus sees the JobEnd message + // when we abort the stage. This message will also be consumed by the EventLoggingListener + // so this will propagate up to the user. + var ended = false + var jobResult : JobResult = null + + class EndListener extends SparkListener { +override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { + jobResult = jobEnd.jobResult + ended = true +} + } + + /** + * Common code to get the next stage attempt, confirm it's the one we expect, and complete it + * succesfullly. 
+ * + * @param stageId - The current stageId + * @param attemptIdx - The current attempt count + * @param numShufflePartitions - The number of partitions in the next stage + */ + def completeNextShuffleMapSuccesfully( + stageId: Int, + attemptIdx: Int, + numShufflePartitions: Int): Unit = { +val stageAttempt = taskSets.last +checkStageId(stageId, attemptIdx, stageAttempt) +complete(stageAttempt, makeCompletions(stageAttempt, numShufflePartitions)) + } + + /** + * Common code to get the next stage attempt, confirm it's the one we expect, and complete it + * with all FetchFailure. + * + * @param stageId - The current stageId + * @param attemptIdx - The current attempt count + * @param shuffleDep - The shuffle dependency of the stage with a fetch failure + */ + def completeNextStageWithFetchFailure( + stageId: Int, + attemptIdx: Int, + shuffleDep: ShuffleDependency[_, _, _]): Unit = { +val stageAttempt = taskSets.last +checkStageId(stageId, attemptIdx, stageAttempt) + +complete(stageAttempt, stageAttempt.tasks.zipWithIndex.map{ case (task, idx) => + (FetchFailed(makeBlockManagerId("hostA"), shuffleDep.shuffleId, 0, idx, "ignored"), null) +}.toSeq) + } + + /** + * Common code to get the next result stage attempt, confirm it's the one we expect, and + * complete it with a success where we return 42. + * + * @param stageId - The current stageId + * @param attemptIdx - The current attempt count + */ + def completeNextResultStageWithSuccess (stageId: Int, attemptIdx: Int): Unit = { +val stageAttempt = taskSets.last +checkStageId(stageId, attemptIdx, stageAttempt) +assert(scheduler.stageIdToStage(stageId).isInstanceOf[ResultStage]) +complete(stageAttempt, stageAttempt.tasks.zipWithIndex.map(_ => (Success, 42)).toSeq) + } + + /** + * In this test, we simulate a job where many tasks in the same stage fail. We want to show + * that many fetch failures inside a single stage attempt do not trigger an abort + * on their own, but only when there are enough failing stage attempts. 
+ */ + test("Multiple tasks w/ fetch failures in same stage attempt should not abort the stage.") { +setupStageAbortTest(sc) + +val parts = 8 +val shuffleMapRdd = new MyRDD(sc, parts, Nil) +val shuffleDep = new ShuffleDependency(shuffleMapRdd, null) +val shuffleId = shuffleDep.shuffleId +val reduceRdd = new MyRDD(sc, parts, List(shuffleDep)) +submit(reduceRdd, (0 until parts).toArray) + +completeNextShuffleMapSuccesfully(0, 0, numShufflePartitions = parts) + +completeNextStageWithFetchFailure(1, 0, shuffleDep) + +// Resubmit and confirm that now all is well +scheduler.resubmitFailedStages() + +assert(scheduler.runningStages.nonEmpty)
[GitHub] spark pull request: [SPARK-5945] Spark should not retry a stage in...
Github user ilganeli commented on a diff in the pull request: https://github.com/apache/spark/pull/5636#discussion_r38597257
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -473,6 +473,283 @@ class DAGSchedulerSuite
     assertDataStructuresEmpty()
   }

+
+  // Helper function to validate state when creating tests for task failures
+  def checkStageId(stageId: Int, attempt: Int, stageAttempt: TaskSet) {
+    assert(stageAttempt.stageId === stageId)
+    assert(stageAttempt.stageAttemptId == attempt)
+  }
+
+  def makeCompletions(stageAttempt: TaskSet, reduceParts: Int): Seq[(Success.type, MapStatus)] = {
+    stageAttempt.tasks.zipWithIndex.map { case (task, idx) =>
+      (Success, makeMapStatus("host" + ('A' + idx).toChar, reduceParts))
+    }.toSeq
+  }
+
+  // Helper functions to extract commonly used code in Fetch Failure test cases
+  def setupStageAbortTest(sc: SparkContext) {
+    sc.listenerBus.addListener(new EndListener())
+    ended = false
+    jobResult = null
+  }
+
+  // Create a new Listener to confirm that the listenerBus sees the JobEnd message
+  // when we abort the stage. This message will also be consumed by the EventLoggingListener
+  // so this will propagate up to the user.
+  var ended = false
+  var jobResult : JobResult = null
+
+  class EndListener extends SparkListener {
+    override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+      jobResult = jobEnd.jobResult
+      ended = true
+    }
+  }
+
+  /**
+   * Common code to get the next stage attempt, confirm it's the one we expect, and complete it
+   * succesfullly.
+   *
+   * @param stageId - The current stageId
+   * @param attemptIdx - The current attempt count
+   * @param numShufflePartitions - The number of partitions in the next stage
+   */
+  def completeNextShuffleMapSuccesfully(
--- End diff --

@kayousterhout Yes, I can - I always do this :(
[GitHub] spark pull request: [SPARK-5945] Spark should not retry a stage in...
Github user ilganeli commented on a diff in the pull request: https://github.com/apache/spark/pull/5636#discussion_r38566621 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -473,6 +473,280 @@ class DAGSchedulerSuite assertDataStructuresEmpty() } + // Helper function to validate state when creating tests for task failures + def checkStageId(stageId: Int, attempt: Int, stageAttempt: TaskSet) { +assert(stageAttempt.stageId === stageId) +assert(stageAttempt.stageAttemptId == attempt) + } + + def makeCompletions(stageAttempt: TaskSet, reduceParts: Int): Seq[(Success.type, MapStatus)] = { +stageAttempt.tasks.zipWithIndex.map { case (task, idx) => + (Success, makeMapStatus("host" + ('A' + idx).toChar, reduceParts)) +}.toSeq + } + + def setupStageAbortTest(sc: SparkContext) { +sc.listenerBus.addListener(new EndListener()) +ended = false +jobResult = null + } + + // Create a new Listener to confirm that the listenerBus sees the JobEnd message + // when we abort the stage. This message will also be consumed by the EventLoggingListener + // so this will propagate up to the user. + var ended = false + var jobResult : JobResult = null + + class EndListener extends SparkListener { +override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { + jobResult = jobEnd.jobResult + ended = true +} + } + + // Helper functions to extract commonly used code in Fetch Failure test cases + /** + * Common code to get the next stage attempt, confirm it's the one we expect, and complete it + * succesfullly. 
+ * + * @param stageId - The current stageId + * @param attemptIdx - The current attempt count + * @param numShufflePartitions - The number of partitions in the next stage + */ + def completeNextShuffleMapSuccesfully(stageId: Int, attemptIdx: Int, + numShufflePartitions: Int): Unit = { +val stageAttempt = taskSets.last +checkStageId(stageId, attemptIdx, stageAttempt) +complete(stageAttempt, makeCompletions(stageAttempt, numShufflePartitions)) + } + + /** + * Common code to get the next stage attempt, confirm it's the one we expect, and complete it + * with all FetchFailure. --- End diff -- Imran â I donât have cycles to do a significant refactor at the moment. I would suggest we merge and follow up later. From: Imran Rashid mailto:notificati...@github.com>> Reply-To: apache/spark mailto:re...@reply.github.com>> Date: Wednesday, September 2, 2015 at 11:24 AM To: apache/spark mailto:sp...@noreply.github.com>> Cc: "Ganelin, Ilya" mailto:ilya.gane...@capitalone.com>> Subject: Re: [spark] [SPARK-5945] Spark should not retry a stage infinitely on a FetchFailedException (#5636) In core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala<https://github.com/apache/spark/pull/5636#discussion_r38566341>: > + * succesfullly. > + * > + * @param stageId - The current stageId > + * @param attemptIdx - The current attempt count > + * @param numShufflePartitions - The number of partitions in the next stage > + */ > + def completeNextShuffleMapSuccesfully(stageId: Int, attemptIdx: Int, > + numShufflePartitions: Int): Unit = { > +val stageAttempt = taskSets.last > +checkStageId(stageId, attemptIdx, stageAttempt) > +complete(stageAttempt, makeCompletions(stageAttempt, numShufflePartitions)) > + } > + > + /** > + * Common code to get the next stage attempt, confirm it's the one we expect, and complete it > + * with all FetchFailure. yeah, agree that as is, that test isn't really adding anything over the other tests as you've noted. 
I certainly don't think I'd say "too hard to fix" -- I suppose it's just my antsy-ness to get this in, but objectively, it probably makes sense to fix. All you are really asking is to change completeNextStageWithFetchFailure to oneFetchFailureInNextStage and change "Multiple tasks w/ fetch failures..." to just directly do what this method is doing now, a pretty minor change. How about this: wait a day for @ilganeli<https://github.com/ilganeli> to update, and if he doesn't get to it we merge as-is and I do a simple follow-up PR?
[GitHub] spark pull request: [SPARK-5945] Spark should not retry a stage in...
Github user ilganeli commented on the pull request: https://github.com/apache/spark/pull/5636#issuecomment-137184487 Updated based on comments. Thanks for the review @kayousterhout --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-5945] Spark should not retry a stage in...
Github user ilganeli commented on a diff in the pull request: https://github.com/apache/spark/pull/5636#discussion_r38479539 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -473,6 +473,280 @@ class DAGSchedulerSuite assertDataStructuresEmpty() } + // Helper function to validate state when creating tests for task failures + def checkStageId(stageId: Int, attempt: Int, stageAttempt: TaskSet) { +assert(stageAttempt.stageId === stageId) +assert(stageAttempt.stageAttemptId == attempt) + } + + def makeCompletions(stageAttempt: TaskSet, reduceParts: Int): Seq[(Success.type, MapStatus)] = { +stageAttempt.tasks.zipWithIndex.map { case (task, idx) => + (Success, makeMapStatus("host" + ('A' + idx).toChar, reduceParts)) +}.toSeq + } + + def setupStageAbortTest(sc: SparkContext) { +sc.listenerBus.addListener(new EndListener()) +ended = false +jobResult = null + } + + // Create a new Listener to confirm that the listenerBus sees the JobEnd message + // when we abort the stage. This message will also be consumed by the EventLoggingListener + // so this will propagate up to the user. + var ended = false + var jobResult : JobResult = null + + class EndListener extends SparkListener { +override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { + jobResult = jobEnd.jobResult + ended = true +} + } + + // Helper functions to extract commonly used code in Fetch Failure test cases + /** + * Common code to get the next stage attempt, confirm it's the one we expect, and complete it + * succesfullly. 
+ * + * @param stageId - The current stageId + * @param attemptIdx - The current attempt count + * @param numShufflePartitions - The number of partitions in the next stage + */ + def completeNextShuffleMapSuccesfully(stageId: Int, attemptIdx: Int, + numShufflePartitions: Int): Unit = { +val stageAttempt = taskSets.last +checkStageId(stageId, attemptIdx, stageAttempt) +complete(stageAttempt, makeCompletions(stageAttempt, numShufflePartitions)) + } + + /** + * Common code to get the next stage attempt, confirm it's the one we expect, and complete it + * with all FetchFailure. --- End diff -- This is primarily for convenience when automatically generating failing tasks. Thank you, Ilya Ganelin -Original Message- From: Kay Ousterhout [notificati...@github.com<mailto:notificati...@github.com>] Sent: Tuesday, September 01, 2015 06:19 PM Eastern Standard Time To: apache/spark Cc: Ganelin, Ilya Subject: Re: [spark] [SPARK-5945] Spark should not retry a stage infinitely on a FetchFailedException (#5636) In core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala<https://github.com/apache/spark/pull/5636#discussion_r38479198>: > + * succesfullly. > + * > + * @param stageId - The current stageId > + * @param attemptIdx - The current attempt count > + * @param numShufflePartitions - The number of partitions in the next stage > + */ > + def completeNextShuffleMapSuccesfully(stageId: Int, attemptIdx: Int, > + numShufflePartitions: Int): Unit = { > +val stageAttempt = taskSets.last > +checkStageId(stageId, attemptIdx, stageAttempt) > +complete(stageAttempt, makeCompletions(stageAttempt, numShufflePartitions)) > + } > + > + /** > + * Common code to get the next stage attempt, confirm it's the one we expect, and complete it > + * with all FetchFailure. Is there a reason all of the tasks need to end in Failure? Can you just have one task end in a failure? â Reply to this email directly or view it on GitHub<https://github.com/apache/spark/pull/5636/files#r38479198>. 
The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates and may only be used solely in performance of work or services for Capital One. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your comp
[GitHub] spark pull request: [SPARK-5945] Spark should not retry a stage in...
Github user ilganeli commented on the pull request: https://github.com/apache/spark/pull/5636#issuecomment-136836260 Sure man - I'll do this shortly. Thanks. Thank you, Ilya Ganelin -Original Message- From: Imran Rashid [notificati...@github.com<mailto:notificati...@github.com>] Sent: Tuesday, September 01, 2015 03:08 PM Eastern Standard Time To: apache/spark Cc: Ganelin, Ilya Subject: Re: [spark] [SPARK-5945] Spark should not retry a stage infinitely on a FetchFailedException (#5636) hey @ilganeli<https://github.com/ilganeli> just one minor book-keeping thing -- can you update the PR description, since that becomes the commit msg? The original description is no longer accurate, can you change to something like: To avoid an infinite loop of stage retries, we abort the job completely after 4 consecutive stage failures for one stage. We still allow more than 4 stage failures if there is an intervening successful attempt for the stage, so that in very long-lived applications, where a stage may get reused many times, we don't abort the job after failures that have been recovered from successfully.
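The retry policy proposed for the PR description can be sketched as a small counter. This is only an illustration under stated assumptions: `StageRetrySketch`, `StageFailureCounter`, and `MaxConsecutiveFailures` are hypothetical names, the limit of 4 is taken from the wording above, and none of this is Spark's actual `Stage` code.

```scala
// Hypothetical stand-in for the per-stage bookkeeping described above; not Spark's Stage class.
object StageRetrySketch {
  // Assumed limit, taken from the "4 consecutive stage failures" wording in the description.
  val MaxConsecutiveFailures = 4

  final class StageFailureCounter {
    private var consecutiveFailures = 0

    /** Record one failed attempt; returns true once the job should be aborted. */
    def failAndShouldAbort(): Boolean = {
      consecutiveFailures += 1
      consecutiveFailures >= MaxConsecutiveFailures
    }

    /** An intervening successful attempt resets the running count. */
    def clearFailures(): Unit = {
      consecutiveFailures = 0
    }
  }
}
```

With this shape, a long-lived application that reuses a stage many times is not penalized for failures it has already recovered from, because each success starts the count over.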
[GitHub] spark pull request: [SPARK-5945] Spark should not retry a stage in...
Github user ilganeli commented on the pull request: https://github.com/apache/spark/pull/5636#issuecomment-132799248 retest this please
[GitHub] spark pull request: [SPARK-5945] Spark should not retry a stage in...
Github user ilganeli commented on the pull request: https://github.com/apache/spark/pull/5636#issuecomment-128505348 retest this please
[GitHub] spark pull request: [SPARK-5945] Spark should not retry a stage in...
Github user ilganeli commented on a diff in the pull request: https://github.com/apache/spark/pull/5636#discussion_r36226584 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -473,6 +473,324 @@ class DAGSchedulerSuite assertDataStructuresEmpty() } + // Helper function to validate state when creating tests for task failures + def checkStageId(stageId: Int, attempt: Int, stageAttempt: TaskSet) { +assert(stageAttempt.stageId === stageId) +assert(stageAttempt.stageAttemptId == attempt) + } + + def makeCompletions(stageAttempt: TaskSet, reduceParts: Int): Seq[(Success.type, MapStatus)] = { +stageAttempt.tasks.zipWithIndex.map { case (task, idx) => + (Success, makeMapStatus("host" + ('A' + idx).toChar, reduceParts)) +}.toSeq + } + + def setupStageAbortTest(sc: SparkContext) { +sc.listenerBus.addListener(new EndListener()) +ended = false +jobResult = null + } + + // Create a new Listener to confirm that the listenerBus sees the JobEnd message + // when we abort the stage. This message will also be consumed by the EventLoggingListener + // so this will propagate up to the user. + var ended = false + var jobResult : JobResult = null + + class EndListener extends SparkListener { +override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { + jobResult = jobEnd.jobResult + ended = true +} + } + + /** + * In this test we simulate a job failure where the first stage completes successfully and + * the second stage fails due to a fetch failure. Multiple successive fetch failures of a stage + * trigger an overall stage abort to avoid endless retries. 
+ */ + test("Multiple consecutive stage failures should lead to task being aborted.") { +setupStageAbortTest(sc) + +val shuffleMapRdd = new MyRDD(sc, 2, Nil) +val shuffleDep = new ShuffleDependency(shuffleMapRdd, null) +val shuffleId = shuffleDep.shuffleId +val reduceRdd = new MyRDD(sc, 2, List(shuffleDep)) +submit(reduceRdd, Array(0, 1)) + +for (attempt <- 0 until Stage.MAX_STAGE_FAILURES) { + // Complete all the tasks for the current attempt of stage 0 successfully + val stage0Attempt = taskSets.last + checkStageId(0, attempt, stage0Attempt) + + // Run stage 0 + complete(stage0Attempt, makeCompletions(stage0Attempt, 2)) --- End diff -- I made the above update (just testing and will commit shortly). Grabbing the number of partitions from the scheduler was a no-go since that was throwing NullPointerExceptions. I just pass it as a parameter instead. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-8890][SQL][WIP] Reduce memory consumpti...
Github user ilganeli commented on a diff in the pull request: https://github.com/apache/spark/pull/7514#discussion_r36017849 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/commands.scala --- @@ -244,34 +244,65 @@ private[sql] case class InsertIntoHadoopFsRelation( } def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = { + // Track which rows have been output to disk so that if a data sort is necessary mid-write, + // we don't end up outputting the same data twice + val writtenRows: HashSet[InternalRow] = new HashSet[InternalRow] --- End diff -- Got it, so just use an `ExternalSorter` based off that iterator to do the sort to avoid potential memory problems.
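The idea behind sorting before writing can be sketched without Spark: once rows are ordered by partition key, only one writer needs to be open at a time, and no set of already-written rows is required. Everything below is hypothetical (`SortedWriteSketch`, `Row`, `writeSorted`), and the sort here is done in memory purely for illustration; the thread's suggestion is to use a spilling sorter instead so large inputs do not exhaust memory.

```scala
object SortedWriteSketch {
  // Hypothetical row type: a partition key plus a payload value.
  final case class Row(partition: String, value: Int)

  /** Returns the values written per partition and how many writers were opened in total. */
  def writeSorted(rows: Iterator[Row]): (Map[String, Vector[Int]], Int) = {
    // In-memory sort for illustration only; a real implementation would spill to disk.
    val sorted = rows.toVector.sortBy(_.partition)
    val out = scala.collection.mutable.LinkedHashMap.empty[String, Vector[Int]]
    var writersOpened = 0
    var current: Option[String] = None
    for (row <- sorted) {
      if (!current.contains(row.partition)) {
        // Partition changed: the previous writer can be closed before the next one opens.
        current = Some(row.partition)
        writersOpened += 1
      }
      out.update(row.partition, out.getOrElse(row.partition, Vector.empty[Int]) :+ row.value)
    }
    (out.toMap, writersOpened)
  }
}
```

Because each partition's rows are contiguous after the sort, the number of writers opened equals the number of distinct partitions, regardless of how interleaved the input was.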
[GitHub] spark pull request: [SPARK-8890][SQL][WIP] Reduce memory consumpti...
Github user ilganeli commented on a diff in the pull request: https://github.com/apache/spark/pull/7514#discussion_r36006570 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/commands.scala --- @@ -244,34 +244,65 @@ private[sql] case class InsertIntoHadoopFsRelation( } def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = { + // Track which rows have been output to disk so that if a data sort is necessary mid-write, + // we don't end up outputting the same data twice + val writtenRows: HashSet[InternalRow] = new HashSet[InternalRow] --- End diff -- The point is that after a sort, everything is reorganized so we may end up traversing some elements that have already been processed, no? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-8890][SQL][WIP] Reduce memory consumpti...
Github user ilganeli commented on a diff in the pull request: https://github.com/apache/spark/pull/7514#discussion_r35994814 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/commands.scala --- @@ -244,34 +244,65 @@ private[sql] case class InsertIntoHadoopFsRelation( } def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = { + // Track which rows have been output to disk so that if a data sort is necessary mid-write, + // we don't end up outputting the same data twice + val writtenRows: HashSet[InternalRow] = new HashSet[InternalRow] --- End diff -- Is there a preferred way to do this? I could have the HashSet be created once to avoid creating it every time and clear it between calls? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-8890][SQL][WIP] Reduce memory consumpti...
Github user ilganeli commented on a diff in the pull request: https://github.com/apache/spark/pull/7514#discussion_r35994765 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/commands.scala --- @@ -244,34 +244,65 @@ private[sql] case class InsertIntoHadoopFsRelation( } def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = { + // Track which rows have been output to disk so that if a data sort is necessary mid-write, + // we don't end up outputting the same data twice + val writtenRows: HashSet[InternalRow] = new HashSet[InternalRow] + + // Flag to track whether data has been sorted in which case it's safe to close previously + // used outputWriters + var sorted: Boolean = false + // If anything below fails, we should abort the task. try { writerContainer.executorSideSetup(taskContext) -// Projects all partition columns and casts them to strings to build partition directories. -val partitionCasts = partitionOutput.map(Cast(_, StringType)) -val partitionProj = newProjection(codegenEnabled, partitionCasts, output) -val dataProj = newProjection(codegenEnabled, dataOutput, output) +// Sort the data by partition so that it's possible to use a single outputWriter at a +// time to process the incoming data +def sortRows(iterator: Iterator[InternalRow]): Iterator[InternalRow] = { + // Sort by the same key used to look up the outputWriter to allow us to recyle the writer + iterator.toArray.sortBy(writerContainer.computePartitionPath).toIterator --- End diff -- Isn't the iterator object already in memory? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
[GitHub] spark pull request: [SPARK-8890][SQL][WIP] Reduce memory consumpti...
Github user ilganeli commented on the pull request: https://github.com/apache/spark/pull/7514#issuecomment-126751779 @rxin @davies @JoshRosen Hey all, could I please get a review of these updates? I'd love to get this fix in.
[GitHub] spark pull request: [SPARK-8464][Core][Shuffle] Consider separatin...
Github user ilganeli commented on the pull request: https://github.com/apache/spark/pull/7129#issuecomment-126736990 @rxin @JoshRosen @davies Could I please get a review for this patch? Thanks.
[GitHub] spark pull request: [SPARK-5945] Spark should not retry a stage in...
Github user ilganeli commented on a diff in the pull request: https://github.com/apache/spark/pull/5636#discussion_r35719942 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -473,6 +473,322 @@ class DAGSchedulerSuite assertDataStructuresEmpty() } + // Helper function to validate state when creating tests for task failures + def checkStageId(stageId: Int, attempt: Int, stageAttempt: TaskSet) { +assert(stageAttempt.stageId === stageId) +assert(stageAttempt.stageAttemptId == attempt) + } + + def makeCompletions(stageAttempt: TaskSet): Seq[(Success.type, MapStatus)] = { +stageAttempt.tasks.zipWithIndex.map { case (task, idx) => + (Success, makeMapStatus("host" + ('A' + idx).toChar, stageAttempt.tasks.size)) --- End diff -- Ahh makes sense. Thank you, Ilya Ganelin -Original Message- From: Imran Rashid [notificati...@github.com<mailto:notificati...@github.com>] Sent: Tuesday, July 28, 2015 09:09 PM Eastern Standard Time To: apache/spark Cc: Ganelin, Ilya Subject: Re: [spark] [SPARK-5945] Spark should not retry a stage infinitely on a FetchFailedException (#5636) In core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala<https://github.com/apache/spark/pull/5636#discussion_r35719826>: > @@ -473,6 +473,322 @@ class DAGSchedulerSuite > assertDataStructuresEmpty() >} > > + // Helper function to validate state when creating tests for task failures > + def checkStageId(stageId: Int, attempt: Int, stageAttempt: TaskSet) { > +assert(stageAttempt.stageId === stageId) > +assert(stageAttempt.stageAttemptId == attempt) > + } > + > + def makeCompletions(stageAttempt: TaskSet): Seq[(Success.type, MapStatus)] = { > +stageAttempt.tasks.zipWithIndex.map { case (task, idx) => > + (Success, makeMapStatus("host" + ('A' + idx).toChar, stageAttempt.tasks.size)) the last arg to makeMapStatus is actually the number of partitions for the next stage, so you can't just use stageAttempt.tasks.size. 
You need to add a reduceParts arg to makeCompletions.
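A minimal sketch of the suggested signature change follows. The types `FakeTaskSet` and `FakeMapStatus` are hypothetical stand-ins so the snippet runs without Spark; the real helper in the test suite builds `MapStatus` values through `makeMapStatus`. The point is only that the partition count of the next stage is passed in explicitly rather than derived from the current stage's task count.

```scala
object MakeCompletionsSketch {
  // Hypothetical stand-ins for the test-suite types; the real TaskSet and MapStatus live in Spark.
  final case class FakeTaskSet(tasks: Seq[String])
  final case class FakeMapStatus(host: String, reduceParts: Int)

  /** reduceParts is the number of partitions in the NEXT stage, not the size of this one. */
  def makeCompletions(stageAttempt: FakeTaskSet, reduceParts: Int): Seq[FakeMapStatus] =
    stageAttempt.tasks.zipWithIndex.map { case (_, idx) =>
      // Same host-naming scheme as the diff: hostA, hostB, ...
      FakeMapStatus("host" + ('A' + idx).toChar, reduceParts)
    }
}
```

Here a map stage with three tasks feeding a two-partition reduce stage would be called as `makeCompletions(taskSet, 2)`, which keeps the two counts independent.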
[GitHub] spark pull request: [SPARK-5945] Spark should not retry a stage in...
Github user ilganeli commented on a diff in the pull request: https://github.com/apache/spark/pull/5636#discussion_r35695602 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -473,6 +473,319 @@ class DAGSchedulerSuite assertDataStructuresEmpty() } + // Helper function to validate state when creating tests for task failures + def checkStageId(stageId: Int, attempt: Int, stageAttempt: TaskSet) { +assert(stageAttempt.stageId === stageId) +assert(stageAttempt.stageAttemptId == attempt-1) + } + + /** + * In this test we simulate a job failure where the first stage completes successfully and + * the second stage fails due to a fetch failure. Multiple successive fetch failures of a stage + * trigger an overall stage abort to avoid endless retries. + */ + test("Multiple consecutive stage failures should lead to task being aborted.") { +// Create a new Listener to confirm that the listenerBus sees the JobEnd message +// when we abort the stage. This message will also be consumed by the EventLoggingListener +// so this will propagate up to the user. 
+var ended = false +var jobResult : JobResult = null +class EndListener extends SparkListener { + override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { +jobResult = jobEnd.jobResult +ended = true + } +} + +sc.listenerBus.addListener(new EndListener()) + +val shuffleMapRdd = new MyRDD(sc, 2, Nil) +val shuffleDep = new ShuffleDependency(shuffleMapRdd, null) +val shuffleId = shuffleDep.shuffleId +val reduceRdd = new MyRDD(sc, 2, List(shuffleDep)) +submit(reduceRdd, Array(0, 1)) + +for (attempt <- 1 to Stage.MAX_STAGE_FAILURES) { + // Complete all the tasks for the current attempt of stage 0 successfully + val stage0Attempt = taskSets.last + + // Confirm that this is the first attempt for stage 0 + checkStageId(0, attempt, stage0Attempt) + + // Make each task in stage 0 success + val completions = stage0Attempt.tasks.zipWithIndex.map{ case (task, idx) => +(Success, makeMapStatus("host" + ('A' + idx).toChar, 2)) + }.toSeq + + // Run stage 0 + complete(stage0Attempt, completions) + + // Now we should have a new taskSet, for a new attempt of stage 1. 
+ // We will have one fetch failure for this task set + val stage1Attempt = taskSets.last + checkStageId(1, attempt, stage1Attempt) + + val stage1Successes = stage1Attempt.tasks.tail.map { _ => (Success, 42)} + + // Run Stage 1, this time with a task failure + complete(stage1Attempt, +Seq((FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"), null)) + ++ stage1Successes + ) + + // this will (potentially) trigger a resubmission of stage 0, since we've lost some of its + // map output, for the next iteration through the loop + scheduler.resubmitFailedStages() + + if (attempt < Stage.MAX_STAGE_FAILURES) { +assert(scheduler.runningStages.nonEmpty) +assert(!ended) + } else { +// Stage has been aborted and removed from running stages +assertDataStructuresEmpty() +sc.listenerBus.waitUntilEmpty(1000) +assert(ended) +assert(jobResult.isInstanceOf[JobFailed]) + } +} + } + + /** + * In this test we simulate a job failure where there are two failures in two different stages. + * Specifically, stage0 fails twice, and then stage1 twice. In total, the job has had four + * failures overall but not four failures for a particular stage, and as such should not be + * aborted. + */ + test("Failures in different stages should not trigger an overall abort") { +val shuffleMapRdd = new MyRDD(sc, 2, Nil) +val shuffleDep = new ShuffleDependency(shuffleMapRdd, null) +val shuffleId = shuffleDep.shuffleId +val reduceRdd = new MyRDD(sc, 2, List(shuffleDep)) +submit(reduceRdd, Array(0, 1)) + +// In the first two iterations, Stage 0 succeeds and stage 1 fails. In the next two iterations, +// stage 0 fails. +for (attempt <- 1 to Stage.MAX_STAGE_FAILURES) { + // Complete all the tasks for the current attempt of stage 0 successfully + val stage0Attempt = taskSets.last + + // Confirm that this is the first attempt for stage 0 + checkStageId(0, attempt, stage0Attempt) + + if (attempt < Stage.MAX_STAGE_FAILURES/
[GitHub] spark pull request: [SPARK-5945] Spark should not retry a stage in...
Github user ilganeli commented on the pull request: https://github.com/apache/spark/pull/5636#issuecomment-125678382 @squito I submitted updates based on our discussion. The one open question is whether I'm verifying the success of stages correctly in the second test, or whether there's a better way. The other tests are coming shortly.
[GitHub] spark pull request: [SPARK-5945] Spark should not retry a stage in...
Github user ilganeli commented on a diff in the pull request: https://github.com/apache/spark/pull/5636#discussion_r35605388 --- Diff: core/src/main/scala/org/apache/spark/scheduler/Stage.scala --- @@ -76,6 +76,33 @@ private[spark] abstract class Stage( */ private var _latestInfo: StageInfo = StageInfo.fromStage(this, nextAttemptId) + /** + * Spark is resilient to executors dying by retrying stages on FetchFailures. Here, we keep track + * of the number of stage failures to prevent endless stage retries. + */ + private var failedStageCount = 0 + + private[scheduler] def clearFailures() : Unit = { +failedStageCount = 0 + } + + /** + * Check whether we should abort the failedStage due to multiple failures. + * This method updates the running count of failures for a particular stage and returns + * true if the number of failures exceeds the allowable number of failures. + */ + private[scheduler] def failAndShouldAbort(): Boolean = { +// We increment the failure count on the first attempt for a particular Stage +if (_latestInfo.attemptId == 0) +{ + failedStageCount += 1 +} --- End diff -- That's what I had a few commits prior, as I understood it, we wanted a set since at that time we were concerned about multiple concurrent attempts (and thus failures) per stage. From the test harness though, I guess what we really need is to track failures per stageId. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
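One way to reconcile the two concerns raised in this comment (concurrent attempts reporting the same failure, and counting failures per stageId) is to keep a set of failed attempt IDs for each stage. This is a sketch under assumptions, not the code that was merged: `PerStageFailureSketch`, `FailureTracker`, `recordFailure`, and `MaxStageFailures` are hypothetical names, and the limit of 4 is assumed.

```scala
import scala.collection.mutable

object PerStageFailureSketch {
  // Assumed limit; the real constant in the PR is Stage.MAX_STAGE_FAILURES.
  val MaxStageFailures = 4

  final class FailureTracker {
    // stageId -> distinct attempt IDs that have failed for that stage
    private val failedAttempts = mutable.Map.empty[Int, mutable.Set[Int]]

    /** Record a failure; returns true once this stage has reached the limit. */
    def recordFailure(stageId: Int, attemptId: Int): Boolean = {
      val attempts = failedAttempts.getOrElseUpdate(stageId, mutable.Set.empty[Int])
      // Duplicate reports from the same attempt are absorbed by the set.
      attempts += attemptId
      attempts.size >= MaxStageFailures
    }

    /** Forget a stage's failures, for example after it completes successfully. */
    def clearFailures(stageId: Int): Unit = {
      failedAttempts.remove(stageId)
      ()
    }
  }
}
```

Keying by stageId means failures spread across different stages never add up to an abort, which matches the intent of the "Failures in different stages" test discussed in this thread.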
[GitHub] spark pull request: [SPARK-5945] Spark should not retry a stage in...
Github user ilganeli commented on a diff in the pull request: https://github.com/apache/spark/pull/5636#discussion_r35598677

--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -473,6 +473,232 @@ class DAGSchedulerSuite
     assertDataStructuresEmpty()
   }
 
+  // Helper function to validate state and print output when creating tests for task failures
+  def checkStageIdAndPrint(stageId: Int, attempt: Int, stageAttempt: TaskSet) {
+    println(s"$attempt($attempt): taskSets = $taskSets : ${
+      taskSets.map{_.tasks.mkString(",")}.mkString(";")}")
+
+    assert(stageAttempt.stageId === stageId)
+    assert(stageAttempt.stageAttemptId == attempt)
+    println(s"tasks for $stageAttempt : ${stageAttempt.tasks.mkString(",")}")
+  }
+
+  /**
+   * In this test we simulate a job failure where the first stage completes successfully and
+   * the second stage fails due to a fetch failure. Multiple successive fetch failures of a stage
+   * trigger an overall stage abort to avoid endless retries.
+   */
+  test("Multiple consecutive stage failures should lead to task being aborted.") {
+    // Create a new Listener to confirm that the listenerBus sees the JobEnd message
+    // when we abort the stage. This message will also be consumed by the EventLoggingListener
+    // so this will propagate up to the user.
+    var ended = false
+    var jobResult : JobResult = null
+    class EndListener extends SparkListener {
+      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+        jobResult = jobEnd.jobResult
+        ended = true
+      }
+    }
+
+    sc.listenerBus.addListener(new EndListener())
+
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
+    val shuffleId = shuffleDep.shuffleId
+    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
+    submit(reduceRdd, Array(0, 1))
+
+    for (attempt <- 0 to Stage.MAX_STAGE_FAILURES) {
+      // Complete all the tasks for the current attempt of stage 0 successfully
+      val stage0Attempt = taskSets.last
+
+      // Confirm that this is the first attempt for stage 0
+      checkStageIdAndPrint(0, attempt, stage0Attempt)
+
+      // Make each task in stage 0 success
+      val completions = stage0Attempt.tasks.zipWithIndex.map{ case (task, idx) =>
+        (Success, makeMapStatus("host" + ('A' + idx).toChar, 2))
+      }.toSeq
+
+      // Run stage 0
+      complete(stage0Attempt, completions)
+
+      // Now we should have a new taskSet, for a new attempt of stage 1.
+      // We will have one fetch failure for this task set
+      val stage1Attempt = taskSets.last
+      checkStageIdAndPrint(1, attempt, stage1Attempt)
+
+      val stage1Successes = stage1Attempt.tasks.tail.map { _ => (Success, 42)}
+
+      // Run Stage 1, this time with a task failure
+      complete(stage1Attempt,
+        Seq((FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"), null))
+          ++ stage1Successes
+      )
+
+      // this will (potentially) trigger a resubmission of stage 0, since we've lost some of its
+      // map output, for the next iteration through the loop
+      scheduler.resubmitFailedStages()
+
+      if (attempt < Stage.MAX_STAGE_FAILURES) {
+        assert(scheduler.runningStages.nonEmpty)
+        assert(!ended)
+      } else {
+        // Stage has been aborted and removed from running stages
+        assertDataStructuresEmpty()
+        sc.listenerBus.waitUntilEmpty(1000)
+        assert(ended)
+        assert(jobResult.isInstanceOf[JobFailed])
+      }
+    }
+  }
+
+  /**
+   * In this test we simulate a job failure where there are two failures in two different stages.
+   * Specifically, stage0 fails twice, and then stage1 twice. In total, the job has had four
+   * failures overall but not four failures for a particular stage, and as such should not be
+   * aborted.
+   */
+  test("Failures in different stages should not trigger an overall abort") {
+    // Create a new Listener to confirm that the listenerBus sees the JobEnd message
+    // when we abort the stage. This message will also be consumed by the EventLoggingListener
+    // so this will propagate up to the user.
+    var ended = false
+    var jobResult : JobResult = null
+    class EndListener extends SparkListener {
+      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+        jobResult
[GitHub] spark pull request: [SPARK-5945] Spark should not retry a stage in...
Github user ilganeli commented on the pull request: https://github.com/apache/spark/pull/5636#issuecomment-125338587 @squito Thanks for the complete example, this really helped clarify what's going on. I've updated the three test cases I had based on this code and added a bunch of comments.
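The rule these tests exercise (abort after repeated failures of the same stage, but not when failures are spread across stages) can be sketched outside Spark as a per-stage counter. This is only an illustrative Python sketch, not the DAGScheduler code: `StageFailureTracker`, `record_failure`, `record_success`, and `run_attempts` are hypothetical names, and the limit of 4 is an assumption standing in for `Stage.MAX_STAGE_FAILURES`.

```python
from collections import defaultdict

MAX_STAGE_FAILURES = 4  # assumed value, for illustration only


class StageFailureTracker:
    """Counts consecutive failures per stage and reports when a stage should abort."""

    def __init__(self, max_failures=MAX_STAGE_FAILURES):
        self.max_failures = max_failures
        self._consecutive = defaultdict(int)  # stage id -> consecutive failure count

    def record_failure(self, stage_id):
        """Record one failed attempt; return True if the stage should now be aborted."""
        self._consecutive[stage_id] += 1
        return self._consecutive[stage_id] >= self.max_failures

    def record_success(self, stage_id):
        """A successful attempt resets the count for that stage."""
        self._consecutive[stage_id] = 0


def run_attempts(tracker, events):
    """Feed (stage_id, succeeded) events to the tracker.

    Returns the id of the first stage that hits the limit, or None.
    """
    for stage_id, succeeded in events:
        if succeeded:
            tracker.record_success(stage_id)
        elif tracker.record_failure(stage_id):
            return stage_id
    return None
```

Keeping the count per stage rather than per job is what lets two failures in stage 0 followed by two in stage 1 pass without an abort, which is the second scenario described in the test comments.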
[GitHub] spark pull request: [SPARK-5945] Spark should not retry a stage in...
Github user ilganeli commented on a diff in the pull request: https://github.com/apache/spark/pull/5636#discussion_r35374507

--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -475,7 +475,148 @@ class DAGSchedulerSuite
     assert(results === Map(0 -> 42, 1 -> 43))
     assertDataStructuresEmpty()
   }
+
+  test("Multiple consecutive stage failures should lead to stage being aborted.") {
+    // Create a new Listener to confirm that the listenerBus sees the JobEnd message
+    // when we abort the stage. This message will also be consumed by the EventLoggingListener
+    // so this will propagate up to the user.
+    var ended = false
+    var jobResult : JobResult = null
+    class EndListener extends SparkListener {
+      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+        jobResult = jobEnd.jobResult
+        ended = true
+      }
+    }
+
+    sc.listenerBus.addListener(new EndListener())
+
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
+    val shuffleId = shuffleDep.shuffleId
+    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
+    submit(reduceRdd, Array(0, 1))
+
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 1)),
+      (Success, makeMapStatus("hostB", 1))))
+
+    for (x <- 1 to Stage.MAX_STAGE_FAILURES) {
+      // the 2nd ResultTask failed
+      complete(taskSets(1), Seq(
--- End diff --

@squito What would be the correct way to have multiple attempts versus what I'm doing? This framework is still somewhat confusing.
[GitHub] spark pull request: [SPARK-8890][SQL][WIP] Reduce memory consumpti...
Github user ilganeli commented on the pull request: https://github.com/apache/spark/pull/7514#issuecomment-122916688 @rxin Where would be the best place to add a test for this functionality?
[GitHub] spark pull request: [SPARK-8464][Core][Shuffle] Consider separatin...
Github user ilganeli commented on the pull request: https://github.com/apache/spark/pull/7129#issuecomment-122069744 @JoshRosen @rxin Hi folks - any chance of getting a review? Thanks!
[GitHub] spark pull request: [SPARK-5945] Spark should not retry a stage in...
Github user ilganeli commented on the pull request: https://github.com/apache/spark/pull/5636#issuecomment-121082174 Awesome, thanks Imran. I'll jump on this once things are merged in! Thank you, Ilya Ganelin

-----Original Message-----
From: Imran Rashid [notificati...@github.com]
Sent: Monday, July 13, 2015 05:57 PM Eastern Standard Time
To: apache/spark
Cc: Ganelin, Ilya
Subject: Re: [spark] [SPARK-5945] Spark should not retry a stage infinitely on a FetchFailedException (#5636)

Hi @ilganeli, I think #6750 <https://github.com/apache/spark/pull/6750> is getting close to being merged -- that will give you what you need to pick this back up (specifically, each task knows its stage attempt). Just wanted to give you a heads up if you want to try to bring this one up to date and address the other issues.

Reply to this email directly or view it on GitHub <https://github.com/apache/spark/pull/5636#issuecomment-121074092>.

The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates and may only be used solely in performance of work or services for Capital One. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your computer.
[GitHub] spark pull request: [SPARK-8464][Core][Shuffle] Consider separatin...
Github user ilganeli commented on the pull request: https://github.com/apache/spark/pull/7129#issuecomment-120462843 @rxin @joshrosen Could I please get a review of this PR? Thanks!
[GitHub] spark pull request: [SPARK-8464][Core][Shuffle] Consider separatin...
Github user ilganeli commented on the pull request: https://github.com/apache/spark/pull/7129#issuecomment-119810173 retest this please
[GitHub] spark pull request: [SPARK-8464][Core][Shuffle] Consider separatin...
Github user ilganeli commented on the pull request: https://github.com/apache/spark/pull/7129#issuecomment-119777478 retest this please
[GitHub] spark pull request: [SPARK-8733][MLLIB] ML RDD.unpersist calls sho...
Github user ilganeli closed the pull request at: https://github.com/apache/spark/pull/7160
[GitHub] spark pull request: [SPARK-8733][MLLIB] ML RDD.unpersist calls sho...
Github user ilganeli commented on the pull request: https://github.com/apache/spark/pull/7160#issuecomment-118211797 Will do once I have access to a PC. No trouble at all. Thank you, Ilya Ganelin

-----Original Message-----
From: jkbradley [notificati...@github.com]
Sent: Thursday, July 02, 2015 06:58 PM Eastern Standard Time
To: apache/spark
Cc: Ganelin, Ilya
Subject: Re: [spark] [SPARK-8733][MLLIB] ML RDD.unpersist calls should use blocking = false (#7160)

OK, sorry for the trouble! Can you please close this PR? (We can't)

Reply to this email directly or view it on GitHub <https://github.com/apache/spark/pull/7160#issuecomment-118188231>.
[GitHub] spark pull request: [SPARK-8733][MLLIB] ML RDD.unpersist calls sho...
Github user ilganeli commented on the pull request: https://github.com/apache/spark/pull/7160#issuecomment-118187601 All - thanks for the clarification and weigh in. I don't have any pipelines set up that would exercise this with the level of rigor necessary. I think it's fine to close it for now. Thanks. Thank you, Ilya Ganelin

-----Original Message-----
From: jkbradley [notificati...@github.com]
Sent: Thursday, July 02, 2015 06:09 PM Eastern Standard Time
To: apache/spark
Cc: Ganelin, Ilya
Subject: Re: [spark] [SPARK-8733][MLLIB] ML RDD.unpersist calls should use blocking = false (#7160)

Good points; I suppose unpersisting without blocking might cause other slowdowns which could cause other timeouts to occur. This does seem like the kind of change which might require running all of the MLlib tests in spark-perf on a cluster to test for speed changes. Perhaps a better solution in the meantime is simply to adjust timeout settings as needed.

@mengxr <https://github.com/mengxr> I'm OK with closing the JIRA if it does not seem worth the trouble of testing.

@ilganeli <https://github.com/ilganeli> I doubt we have the bandwidth to test right now, but do let us know if you'd like to keep this open & run tests.

Reply to this email directly or view it on GitHub <https://github.com/apache/spark/pull/7160#issuecomment-118181016>.
[GitHub] spark pull request: [SPARK-8464][Core][Shuffle] Consider separatin...
Github user ilganeli commented on the pull request: https://github.com/apache/spark/pull/7129#issuecomment-117793402 @JoshRosen Any chance I could get a second pair of eyes on this? I'm wary of changes to ExternalSorter introducing complicated merge conflicts. Would love your help, thanks!
[GitHub] spark pull request: [SPARK-8733][MLLIB] ML RDD.unpersist calls sho...
Github user ilganeli commented on the pull request: https://github.com/apache/spark/pull/7160#issuecomment-117792531 @mengxr, @jkbradley initially reported this; perhaps he should weigh in here?
[GitHub] spark pull request: [SPARK-3071] Increase default driver memory
Github user ilganeli commented on a diff in the pull request: https://github.com/apache/spark/pull/7132#discussion_r33709898

--- Diff: network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java ---
@@ -25,13 +32,6 @@
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import com.google.common.base.Charsets;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
-import io.netty.buffer.Unpooled;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
--- End diff --

Idea has its own ideas about how things should be ordered :-0
[GitHub] spark pull request: [SPARK-8733][MLLIB] ML RDD.unpersist calls sho...
GitHub user ilganeli opened a pull request: https://github.com/apache/spark/pull/7160

[SPARK-8733][MLLIB] ML RDD.unpersist calls should use blocking = false

Updated all usages within the ML Lib module to use blocking = false.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ilganeli/spark SPARK-8733

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/7160.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #7160

commit f31f0c120b7b2243a4fad6f1e612bba001228a8a
Author: Ilya Ganelin
Date: 2015-07-01T14:56:34Z

    Updated all usages of unpersist within ML Lib to use blocking=false
[GitHub] spark pull request: [SPARK-3071] Increase default driver memory
GitHub user ilganeli opened a pull request: https://github.com/apache/spark/pull/7132

[SPARK-3071] Increase default driver memory

I've updated default values in comments, documentation, and in the command line builder to be 1g based on comments in the JIRA. Please let me know if I've missed anything. Will the spark-shell use the value within the command line builder during instantiation?

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ilganeli/spark SPARK-3071

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/7132.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #7132

commit 2698a3d72a87b7eaef59a64227b4a5bfbca5da10
Author: Ilya Ganelin
Date: 2015-06-30T19:19:30Z

    Updated default value for driver memory
[GitHub] spark pull request: [SPARK-8464][Core][Shuffle] Consider separatin...
Github user ilganeli commented on the pull request: https://github.com/apache/spark/pull/7129#issuecomment-117279578 retest this please
[GitHub] spark pull request: [SPARK-8464][Core][Shuffle][WIP] Consider sepa...
GitHub user ilganeli opened a pull request: https://github.com/apache/spark/pull/7129

[SPARK-8464][Core][Shuffle][WIP] Consider separating aggregator and non-aggregator paths in ExternalSorter

I've started by separating ExternalSorter into two classes, one which assumes an aggregator is defined and one which does not. There is a substantial amount of code overlap, so the next step is to extract common code into a parent class.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ilganeli/spark SPARK-8464

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/7129.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #7129

commit d0024efb43501f2d0a02c74b7beb1dcc0970c834
Author: Ilya Ganelin
Date: 2015-06-30T14:14:47Z

    [SPARK-8464] As a starting point, refactored ExternalSorter into two class instances, one which uses Aggregator and one which does not. Next step is to extract out common code
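The refactoring described in this pull request (one class per path, with shared code pulled into a parent) can be pictured with a small sketch. The Python below is only an illustration of that shape under simplifying assumptions: `BaseSorter`, `PlainSorter`, and `AggregatingSorter` are hypothetical names, everything stays in memory, and nothing here reflects the actual spilling logic of Spark's ExternalSorter.

```python
class BaseSorter:
    """Common code shared by both paths: bulk insertion and sorted output."""

    def insert_all(self, records):
        for key, value in records:
            self.insert(key, value)

    def sorted_items(self):
        # Sort by key only, so values never need to be comparable.
        return sorted(self.items(), key=lambda kv: kv[0])

    def insert(self, key, value):
        raise NotImplementedError

    def items(self):
        raise NotImplementedError


class PlainSorter(BaseSorter):
    """Path with no aggregator: every record is kept as-is."""

    def __init__(self):
        self._records = []

    def insert(self, key, value):
        self._records.append((key, value))

    def items(self):
        return list(self._records)


class AggregatingSorter(BaseSorter):
    """Path with an aggregator: values sharing a key are merged on insert."""

    def __init__(self, merge_value):
        self._merge_value = merge_value  # function (old, new) -> combined
        self._combined = {}

    def insert(self, key, value):
        if key in self._combined:
            self._combined[key] = self._merge_value(self._combined[key], value)
        else:
            self._combined[key] = value

    def items(self):
        return list(self._combined.items())
```

With this split, neither subclass has to branch on whether an aggregator is defined; the only per-path code is how a record is stored and enumerated.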
[GitHub] spark pull request: [SPARK-4666] Improve YarnAllocator's parsing o...
Github user ilganeli commented on the pull request: https://github.com/apache/spark/pull/3525#issuecomment-116662486 @srowen @JoshRosen I think this should be refactored to use the updates from #5574 but I don't think #5574 resolves this on its own because of the need to handle the min/max allocation - my 2c.
[GitHub] spark pull request: [SPARK-8056][SQL] Design an easier way to cons...
Github user ilganeli commented on the pull request: https://github.com/apache/spark/pull/6686#issuecomment-114973836 @davies @rxin Are there any further suggestions? Would love to get this into 1.4.1, thanks!
[GitHub] spark pull request: [SPARK-8056][SQL] Design an easier way to cons...
Github user ilganeli commented on a diff in the pull request: https://github.com/apache/spark/pull/6686#discussion_r32958201

--- Diff: python/pyspark/sql/types.py ---
@@ -56,6 +56,32 @@ def __eq__(self, other):
     def __ne__(self, other):
         return not self.__eq__(other)
 
+    def from_string(self, data_type):
--- End diff --

Is it ok to leave it as is to be specific?
[GitHub] spark pull request: [SPARK-8056][SQL] Design an easier way to cons...
Github user ilganeli commented on a diff in the pull request: https://github.com/apache/spark/pull/6686#discussion_r32785670

--- Diff: python/pyspark/sql/types.py ---
@@ -368,8 +367,43 @@ def __init__(self, fields):
         >>> struct1 == struct2
         False
         """
-        assert all(isinstance(f, DataType) for f in fields), "fields should be a list of DataType"
-        self.fields = fields
+        if not fields:
+            self.fields = []
+        else:
+            self.fields = fields
+            assert all(isinstance(f, StructField) for f in fields),\
+                "fields should be a list of StructField"
+
+    def add(self, name_or_struct_field, data_type=None, nullable=True, metadata=None):
+        """
+        Construct a StructType by adding new elements to it to define the schema
+
+        >>> struct1 = StructType().add("f1", StringType(), True).add("f2", StringType(), True, None)
+        >>> struct2 = StructType([StructField("f1", StringType(), True),\
+         StructField("f2", StringType(), True, None)])
+        >>> struct1 == struct2
+        True
+        >>> struct1 = (StructType().add(StructField("f1", StringType(), True))
+        ...     .add(StructField("f2", StringType(), True, None)))
+        >>> struct2 = StructType([StructField("f1", StringType(), True),
+        ...     StructField("f2", StringType(), True, None)])
+        >>> struct1 == struct2
+        True
+
+        :param nameOrStructField: Either the name of the field or a StructField object
+        :param data_type: If present, the DataType of the StructField to create
+        :param nullable: Whether the field to add should be nullable (default True)
+        :param metadata: Any additional metadata (default None)
+        :return: a new updated StructType
+        """
+        if isinstance(name_or_struct_field, StructField):
+            self.fields.append(name_or_struct_field)
+            return self
+        else:
+            if isinstance(name_or_struct_field, str) and data_type is None:
+                raise ValueError("Must specify DataType if passing name of struct_field to create.")
+            self.fields.append(StructField(name_or_struct_field, data_type, nullable, metadata))
--- End diff --

@davies Is there an equivalent of the DataTypeParser in Python?
[GitHub] spark pull request: [SPARK-8056][SQL] Design an easier way to cons...
Github user ilganeli commented on the pull request: https://github.com/apache/spark/pull/6686#issuecomment-113062905 @davies Made the changes you suggested. Only thing I didn't know how to do is to intercept an exception in Python. Is there a cleaner way of doing it than I added? The way I'm doing it right now seems hacky but I didn't see a better example in the other cases.
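On the question of intercepting an exception in a Python test: one widely used option in the standard library is `unittest.TestCase.assertRaises` used as a context manager. The sketch below shows that pattern only; `add_field`, `AddFieldTest`, and `run_tests` are hypothetical stand-ins written for this example, not code from the pull request, and whether this matches the conventions of the pyspark test suite is not established in this thread.

```python
import unittest


def add_field(fields, name, data_type=None):
    """Hypothetical stand-in: append (name, data_type), rejecting a missing type."""
    if data_type is None:
        raise ValueError("Must specify DataType if passing name of struct_field to create.")
    fields.append((name, data_type))
    return fields


class AddFieldTest(unittest.TestCase):
    def test_missing_type_raises(self):
        # The block passes only if ValueError is raised inside it.
        with self.assertRaises(ValueError):
            add_field([], "f1")

    def test_valid_field_is_appended(self):
        self.assertEqual(add_field([], "f1", "string"), [("f1", "string")])


def run_tests():
    """Run the test case programmatically and return the unittest result object."""
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(AddFieldTest)
    return unittest.TextTestRunner(verbosity=0).run(suite)
```

Compared with a hand-written try/except that sets a flag, the context manager fails the test automatically when no exception is raised, so the check cannot pass silently.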
[GitHub] spark pull request: [SPARK-8056][SQL] Design an easier way to cons...
Github user ilganeli commented on a diff in the pull request: https://github.com/apache/spark/pull/6686#discussion_r32656127

--- Diff: python/pyspark/sql/types.py ---
@@ -368,8 +367,49 @@ def __init__(self, fields):
         >>> struct1 == struct2
         False
         """
-        assert all(isinstance(f, DataType) for f in fields), "fields should be a list of DataType"
-        self.fields = fields
+        if not fields:
+            self.fields = []
+        else:
+            self.fields = fields
+            assert all(isinstance(f, StructField) for f in fields),\
+                "fields should be a list of StructField"
+
+    def add(self, name_or_struct_field, data_type=NullType(), nullable=True, metadata=None):
--- End diff --

Davies - totally agree. This was changed specifically to consolidate to a single method as suggested by Reynold. I initially had separate add methods - one which accepted a StructField and one which accepted the 4 parameters, the first two of which were defined. What would you suggest? My preference is to break this out into two methods for clarity and to avoid the problem you mention.

Thank you, Ilya Ganelin

-----Original Message-----
From: Davies Liu [notificati...@github.com]
Sent: Wednesday, June 17, 2015 01:18 PM Eastern Standard Time
To: apache/spark
Cc: Ganelin, Ilya
Subject: Re: [spark] [SPARK-8056][SQL] Design an easier way to construct schema for both Scala and Python (#6686)

In python/pyspark/sql/types.py <https://github.com/apache/spark/pull/6686#discussion_r32650869>:

> @@ -368,8 +367,49 @@ def __init__(self, fields):
>          >>> struct1 == struct2
>          False
>          """
> -        assert all(isinstance(f, DataType) for f in fields), "fields should be a list of DataType"
> -        self.fields = fields
> +        if not fields:
> +            self.fields = []
> +        else:
> +            self.fields = fields
> +            assert all(isinstance(f, StructField) for f in fields),\
> +                "fields should be a list of StructField"
> +
> +    def add(self, name_or_struct_field, data_type=NullType(), nullable=True, metadata=None):

What are the use cases in which we should have a StructType without specifying the dataType of each column? In createDataFrame, if a schema of StructType is provided, it will not try to infer the data types, so it does not work with a StructType with NoneType in it.

Reply to this email directly or view it on GitHub <https://github.com/apache/spark/pull/6686/files#r32650869>.
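The single-method design under discussion (one `add` that accepts either a ready-made field or a name plus type, and refuses a name with no type) can be sketched with plain Python classes. This is a simplified illustration, not pyspark: the `StructField` and `StructType` classes below are hypothetical stand-ins that model types as plain strings, and they only mirror the dispatch logic quoted in the diffs of this thread.

```python
class StructField:
    """Hypothetical stand-in for a schema field (name, type, nullable, metadata)."""

    def __init__(self, name, data_type, nullable=True, metadata=None):
        self.name = name
        self.data_type = data_type
        self.nullable = nullable
        self.metadata = metadata

    def __eq__(self, other):
        return isinstance(other, StructField) and self.__dict__ == other.__dict__


class StructType:
    """Hypothetical stand-in for a schema built up field by field."""

    def __init__(self, fields=None):
        self.fields = list(fields) if fields else []

    def add(self, field, data_type=None, nullable=True, metadata=None):
        """Accept either a StructField or a field name plus its data type."""
        if isinstance(field, StructField):
            self.fields.append(field)
        else:
            # A bare name is not enough: the type must be given explicitly,
            # so no field is ever created with an unspecified type.
            if data_type is None:
                raise ValueError("Must specify DataType if passing name of field to create.")
            self.fields.append(StructField(field, data_type, nullable, metadata))
        return self  # returning self allows chained .add(...) calls

    def __eq__(self, other):
        return isinstance(other, StructType) and self.fields == other.fields
```

Defaulting `data_type` to `None` and rejecting it for the name form addresses the concern raised in the quoted review: a schema can no longer be built with columns whose type was silently left unspecified.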
[GitHub] spark pull request: [SPARK-8056][SQL] Design an easier way to cons...
Github user ilganeli commented on the pull request: https://github.com/apache/spark/pull/6686#issuecomment-112480475 @rxin Is this good to go? Looking forward to getting this merged!
[GitHub] spark pull request: [SPARK-8056][SQL] Design an easier way to cons...
Github user ilganeli commented on the pull request: https://github.com/apache/spark/pull/6686#issuecomment-111956199 retest this please
[GitHub] spark pull request: [SPARK-8056][SQL] Design an easier way to cons...
Github user ilganeli commented on the pull request: https://github.com/apache/spark/pull/6686#issuecomment-111793926 @rxin Removing the second add method doesn't quite work. A single method signature can't cover both cases while keeping the same parameter order as StructField (which I don't want to change, for the sake of consistency). It's possible to re-use the first parameter ```name``` of the ```add``` method to also accept a StructField, but that seems wrong. I don't think it's terrible to have two methods in this case that clearly distinguish the provided types.
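For readers following the trade-off discussed here, the single-method alternative (one ```add``` whose first parameter takes either a name or a StructField) can be sketched roughly as below. This is an illustration only, not the code from the PR: ```StructField``` and ```StructType``` are minimal stand-ins rather than the real pyspark classes, data types are plain strings, and the name ```name_or_field``` is an assumption.

```python
from dataclasses import dataclass
from typing import List, Optional, Union


@dataclass
class StructField:
    """Stand-in for pyspark's StructField (hypothetical, simplified)."""
    name: str
    data_type: str
    nullable: bool = True
    metadata: Optional[dict] = None


class StructType:
    """Stand-in for pyspark's StructType supporting builder-style add()."""

    def __init__(self, fields: Optional[List[StructField]] = None):
        fields = list(fields) if fields else []
        assert all(isinstance(f, StructField) for f in fields), \
            "fields should be a list of StructField"
        self.fields = fields

    def add(self, name_or_field: Union[str, StructField],
            data_type: Optional[str] = None,
            nullable: bool = True,
            metadata: Optional[dict] = None) -> "StructType":
        # The first parameter is overloaded: a ready-made StructField is
        # appended as-is, otherwise a field is built from the arguments.
        if isinstance(name_or_field, StructField):
            self.fields.append(name_or_field)
        else:
            if data_type is None:
                raise ValueError("data_type is required when adding by name")
            self.fields.append(
                StructField(name_or_field, data_type, nullable, metadata))
        return self  # returning self allows chained calls

    def __eq__(self, other: object) -> bool:
        return isinstance(other, StructType) and self.fields == other.fields


schema = StructType().add("f1", "string").add(StructField("f2", "string"))
```

The overloaded first parameter is what the comment above calls "wrong"; keeping two separately named methods avoids the type check at the cost of a larger API surface.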
[GitHub] spark pull request: [SPARK-8056][SQL] Design an easier way to cons...
Github user ilganeli commented on a diff in the pull request: https://github.com/apache/spark/pull/6686#discussion_r32376192
--- Diff: python/pyspark/sql/types.py ---
@@ -367,9 +378,54 @@ def __init__(self, fields):
         >>> struct1 == struct2
         False
         """
-        assert all(isinstance(f, DataType) for f in fields), "fields should be a list of DataType"
+        if not fields:
+            fields = []
+        assert all(isinstance(f, StructField) for f in fields),\
+            "fields should be a list of StructField"
         self.fields = fields
 
+    def add_field(self, data_type):
+        """
+        Construct a StructType by adding new elements to it to define the schema
+        >>> struct1 = StructType().add_field(StructField("f1", StringType(), True))\
+            .add_field(StructField("f2", StringType(), True, None))
+        >>> struct2 = StructType([StructField("f1", StringType(), True),\
+            StructField("f2", StringType(), True, None)])
+        >>> struct1 == struct2
+        True
+        >>> struct1 = StructType().add_field(StructField("f1", StringType(), True))\
+            .add_field(StructField("f2", StringType(), True, None))
+        >>> struct2 = StructType([StructField("f1", StringType(), True)])
+        >>> struct1 == struct2
+        False
+
+        :param data_type: A StructField object to be added to the StructType
+        :return: a new updated StructType
+        """
+        assert isinstance(data_type, StructField)
+        self.fields.append(data_type)
+        return self
+
+    def add(self, name, data_type, nullable=True, metadata=None):
--- End diff --
I wanted to support adding a StructField since the constructor accepts an array of StructFields. I thought it would be more intuitive for backwards compatibility.
[GitHub] spark pull request: [SPARK-8056][SQL] Design an easier way to cons...
Github user ilganeli commented on a diff in the pull request: https://github.com/apache/spark/pull/6686#discussion_r32376189
--- Diff: python/pyspark/sql/types.py ---
@@ -349,13 +349,24 @@ def fromJson(cls, json):
                    json["metadata"])
 
+class Foo:
--- End diff --
Stale
[GitHub] spark pull request: [SPARK-8056][SQL] Design an easier way to cons...
Github user ilganeli commented on the pull request: https://github.com/apache/spark/pull/6686#issuecomment-111737128 @rxin I've added python bindings, is there any place for python unit tests?
[GitHub] spark pull request: [SPARK-8056][SQL] Design an easier way to cons...
Github user ilganeli commented on the pull request: https://github.com/apache/spark/pull/6686#issuecomment-111584715 @rxin Can do - where should I add the python methods? I'm not too familiar with the Python code base.
[GitHub] spark pull request: [SPARK-7996] Deprecate the developer api Spark...
GitHub user ilganeli opened a pull request: https://github.com/apache/spark/pull/6731
[SPARK-7996] Deprecate the developer api SparkEnv.actorSystem
Changed ```SparkEnv.actorSystem``` to be a function so that we can use the deprecated flag with it, and added a deprecation message.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/ilganeli/spark SPARK-7996
Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/6731.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6731
commit 9610b086fbb9d60de1cf08dbe1e7cb7c714d771b
Author: Ilya Ganelin
Date: 2015-06-09T22:29:18Z
Converted actorSystem to function and added deprecated flag
[GitHub] spark pull request: [SPARK-8056][SQL][WIP] Design an easier way to...
Github user ilganeli commented on the pull request: https://github.com/apache/spark/pull/6686#issuecomment-110466888 @rxin That does admittedly make more sense. At first glance it looked like the limitation was within the underlying Scala library which was outside our purview. I think the latest change should be all we need. Thanks!
[GitHub] spark pull request: [SPARK-8056][SQL][WIP] Design an easier way to...
Github user ilganeli commented on the pull request: https://github.com/apache/spark/pull/6686#issuecomment-110439765 @rxin I reverted to using ```fromString``` because ```DataTypeParser.parse``` does not support ```LongType```. This means it's not a strictly correct conversion, and using it instead would actually invalidate a number of existing tests. I think the ```parse()``` function has a different purpose. I restored its use within ```Column.cast()``` but use ```fromString``` elsewhere.
[GitHub] spark pull request: [SPARK-8056][SQL][WIP] Design an easier way to...
Github user ilganeli commented on a diff in the pull request: https://github.com/apache/spark/pull/6686#discussion_r32038192
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Column.scala ---
@@ -822,7 +822,7 @@ class Column(protected[sql] val expr: Expression) extends Logging {
    * @group expr_ops
    * @since 1.3.0
    */
-  def cast(to: String): Column = cast(DataTypeParser.parse(to))
--- End diff --
DataTypeParser.parse does not support LongType. I don't think their functionality quite overlaps, so I'd propose to leave cast as it is and to use fromString everywhere else. Does that work for you?
[GitHub] spark pull request: [SPARK-3533][Core] Add saveAsTextFileByKey() m...
Github user ilganeli commented on the pull request: https://github.com/apache/spark/pull/4895#issuecomment-109762391
It did not - I can port it over if you need it. Thanks!
Thank you, Ilya Ganelin
-Original Message-
From: brendancol [notificati...@github.com]
Sent: Sunday, June 07, 2015 08:33 AM Eastern Standard Time
To: apache/spark
Cc: Ganelin, Ilya
Subject: Re: [spark] [SPARK-3533][Core] Add saveAsTextFileByKey() method to RDDs (#4895)
hey did this ever make it into Packages? Great idea and would love to use it.
Reply to this email directly or view it on GitHub<https://github.com/apache/spark/pull/4895#issuecomment-109747790>.
[GitHub] spark pull request: [SPARK-8056][SQL][WIP] Design an easier way to...
Github user ilganeli closed the pull request at: https://github.com/apache/spark/pull/6678
[GitHub] spark pull request: [SPARK-8056][SQL][WIP] Design an easier way to...
Github user ilganeli commented on the pull request: https://github.com/apache/spark/pull/6678#issuecomment-109607026 Opened a new PR here for simplicity: https://github.com/apache/spark/pull/6686
[GitHub] spark pull request: [SPARK-8056][SQL][WIP] Design an easier way to...
GitHub user ilganeli opened a pull request: https://github.com/apache/spark/pull/6686
[SPARK-8056][SQL][WIP] Design an easier way to construct schema for both Scala and Python
I've added functionality to build up a new StructType incrementally, similar to how we add parameters to a new SparkContext. I've updated most of the StructType class so that the underlying data store is a map of fields instead of the array used to construct it. This enables the above functionality, clarifies how the data is used internally, and guards against a previously present issue where duplicate fields in the schema would cause problems.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/ilganeli/spark SPARK-8056B
Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/6686.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6686
commit fdf7e9fe1d40789498dbec57edae61279d769325
Author: Ilya Ganelin
Date: 2015-06-06T15:49:16Z
[SPARK-8056] Created add methods to facilitate building new StructType objects.
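As a rough illustration of the map-backed idea described in this PR message, the sketch below keys fields by name so that a duplicate is rejected at insertion time. It is a simplified stand-in, not the Scala implementation from the PR: the class name ```FieldMapSchema```, its methods, and the choice to raise on duplicates are all assumptions made for the example.

```python
class FieldMapSchema:
    """Hypothetical schema builder backed by a name -> field map."""

    def __init__(self):
        # Python dicts preserve insertion order, so column order is kept
        # even though lookups are by name.
        self._fields = {}

    def add(self, name, data_type, nullable=True):
        # Keying by name is what makes the duplicate check cheap.
        if name in self._fields:
            raise ValueError(f"duplicate field name: {name}")
        self._fields[name] = (data_type, nullable)
        return self  # allow chaining, as with builder-style configuration

    def field_names(self):
        return list(self._fields)


schema = FieldMapSchema().add("id", "long").add("name", "string")
```

An array-backed store would need a linear scan (or a second index) for the same check, which is the trade-off weighed later in this thread.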
[GitHub] spark pull request: [SPARK-8056][SQL][WIP] Design an easier way to...
Github user ilganeli commented on the pull request: https://github.com/apache/spark/pull/6678#issuecomment-109514465
@rxin I'm not super gung-ho about having the map. I figured it would be cleaner than just expanding an ArrayBuffer. If you don't like this approach I can implement the alternative.
Thank you, Ilya Ganelin
-Original Message-
From: Reynold Xin [notificati...@github.com]
Sent: Friday, June 05, 2015 10:42 PM Eastern Standard Time
To: apache/spark
Cc: Ganelin, Ilya
Subject: Re: [spark] [SPARK-8056][SQL][WIP] Design an easier way to construct schema for both Scala and Python (#6678)
@ilganeli<https://github.com/ilganeli> can we simplify this and get rid of the map? Basically just add the "add" function. The map also increases serialization size (even though you could make it transient).
Reply to this email directly or view it on GitHub<https://github.com/apache/spark/pull/6678#issuecomment-109506678>.
[GitHub] spark pull request: [SPARK-8056][SQL][WIP] Design an easier way to...
GitHub user ilganeli opened a pull request: https://github.com/apache/spark/pull/6678
[SPARK-8056][SQL][WIP] Design an easier way to construct schema for both Scala and Python
I've added functionality to build up a new ```StructType``` incrementally, similar to how we add parameters to a new ```SparkContext```. I've updated most of the ```StructType``` class so that the underlying data store is a map of fields instead of the array used to construct it. This enables the above functionality, clarifies how the data is used internally, and guards against a previously present issue where duplicate fields in the schema would cause problems.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/ilganeli/spark SPARK-8056
Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/6678.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6678
commit 22fbb6d57b0f54a7d86ae8caae886439c7db5600
Author: Ilya Ganelin
Date: 2015-06-05T23:06:02Z
Comitting initial revision of StructType code
[GitHub] spark pull request: [SPARK-7672][CORE] Use int conversion in trans...
Github user ilganeli commented on the pull request: https://github.com/apache/spark/pull/6198#issuecomment-102522874 @andrewor14 @nishkamravi2 Good catch. The fix looks good. Thanks!
[GitHub] spark pull request: [SPARK-5945] Spark should not retry a stage in...
Github user ilganeli commented on a diff in the pull request: https://github.com/apache/spark/pull/5636#discussion_r30068215
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -475,7 +475,148 @@ class DAGSchedulerSuite
     assert(results === Map(0 -> 42, 1 -> 43))
     assertDataStructuresEmpty()
   }
+
+  test("Multiple consecutive stage failures should lead to stage being aborted.") {
+    // Create a new Listener to confirm that the listenerBus sees the JobEnd message
+    // when we abort the stage. This message will also be consumed by the EventLoggingListener
+    // so this will propagate up to the user.
+    var ended = false
+    var jobResult : JobResult = null
+    class EndListener extends SparkListener {
+      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+        jobResult = jobEnd.jobResult
+        ended = true
+      }
+    }
+
+    sc.listenerBus.addListener(new EndListener())
+
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
+    val shuffleId = shuffleDep.shuffleId
+    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
+    submit(reduceRdd, Array(0, 1))
+
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 1)),
+      (Success, makeMapStatus("hostB", 1))))
+
+    for (x <- 1 to Stage.MAX_STAGE_FAILURES) {
+      // the 2nd ResultTask failed
+      complete(taskSets(1), Seq(
+        (Success, 42),
+        (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"), null)))
+
+      scheduler.resubmitFailedStages()
+      if (x < Stage.MAX_STAGE_FAILURES) {
+        assert(scheduler.runningStages.nonEmpty)
+        assert(!ended)
+      } else {
+        // Stage has been aborted and removed from running stages
+        assertDataStructuresEmpty()
+        sc.listenerBus.waitUntilEmpty(1000)
+        assert(ended)
+        assert(jobResult.isInstanceOf[JobFailed])
+      }
+    }
+  }
+
+
+  test("Multiple consecutive Fetch failures in a stage triggers an abort.") {
+    // Create a new Listener to confirm that the listenerBus sees the JobEnd message
+    // when we abort the stage. This message will also be consumed by the EventLoggingListener
+    // so this will propagate up to the user.
+    var ended = false
+    var jobResult : JobResult = null
+    class EndListener extends SparkListener {
+      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+        jobResult = jobEnd.jobResult
+        ended = true
+      }
+    }
+
+    sc.listenerBus.addListener(new EndListener())
+
+    val shuffleMapRdd = new MyRDD(sc, 8, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
+    val shuffleId = shuffleDep.shuffleId
+    val reduceRdd = new MyRDD(sc, 8, List(shuffleDep))
+    submit(reduceRdd, Array(0, 1, 2, 3, 4, 5, 6, 7))
+
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 1)),
+      (Success, makeMapStatus("hostA", 1)),
+      (Success, makeMapStatus("hostA", 1)),
+      (Success, makeMapStatus("hostA", 1)),
+      (Success, makeMapStatus("hostA", 1)),
+      (Success, makeMapStatus("hostA", 1)),
+      (Success, makeMapStatus("hostA", 1)),
+      (Success, makeMapStatus("hostB", 1))))
+
+    complete(taskSets(1), Seq(
+      (Success, 42),
+      (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"), null),
+      (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored1"), null),
+      (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored2"), null),
+      (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored3"), null),
+      (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored4"), null),
+      (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored5"), null),
+      (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored6"), null)))
+
+    scheduler.resubmitFailedStages()
+    assertDataStructuresEmpty()
+    sc.listenerBus.waitUntilEmpty(1000)
+    assert(ended)
+    assert(jobResult.isInstanceOf[JobFailed])
+  }
+
+  test("Multiple consecutive task failures (not FetchFailures) in a stage should not " +
+    "trigger an abort.") {
[GitHub] spark pull request: [SPARK-7392][Core] bugfix: Kryo buffer size ca...
Github user ilganeli commented on a diff in the pull request: https://github.com/apache/spark/pull/5934#discussion_r29751570
--- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
@@ -51,9 +51,9 @@ class KryoSerializer(conf: SparkConf)
 
   private val bufferSizeKb = conf.getSizeAsKb("spark.kryoserializer.buffer", "64k")
 
-  if (bufferSizeKb >= 2048) {
+  if (bufferSizeKb >= 2048 * 1024) {
     throw new IllegalArgumentException("spark.kryoserializer.buffer must be less than " +
-      s"2048 mb, got: + $bufferSizeKb mb.")
+      s"2048 mb, got: + ${bufferSizeKb/1024} mb.")
--- End diff --
ByteUnit.KiB.toMiB(bufferSizeKb)
[GitHub] spark pull request: [SPARK-7392][Core] bugfix: Kryo buffer size ca...
Github user ilganeli commented on a diff in the pull request: https://github.com/apache/spark/pull/5934#discussion_r29751370
--- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
@@ -51,9 +51,9 @@ class KryoSerializer(conf: SparkConf)
 
   private val bufferSizeKb = conf.getSizeAsKb("spark.kryoserializer.buffer", "64k")
 
-  if (bufferSizeKb >= 2048) {
+  if (bufferSizeKb >= 2048 * 1024) {
--- End diff --
You could use ByteUnit.MiB.toKiB(2) for clarity here.
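The unit arithmetic behind this check can be spelled out in a small sketch. The buffer size is read in KiB, the limit is 2048 MiB, so the comparison has to be against 2048 * 1024 KiB, and the error message should convert back to MiB. The helper names below (```mib_to_kib```, ```kib_to_mib```, ```validate_buffer_kib```) are hypothetical and only mirror the intent of the diff; they are not Spark's ```ByteUnit``` API.

```python
KIB_PER_MIB = 1024
MAX_BUFFER_MIB = 2048  # limit quoted in the error message of the diff


def mib_to_kib(mib: int) -> int:
    """Convert mebibytes to kibibytes."""
    return mib * KIB_PER_MIB


def kib_to_mib(kib: int) -> int:
    """Convert kibibytes to whole mebibytes (rounding down)."""
    return kib // KIB_PER_MIB


def validate_buffer_kib(buffer_size_kib: int) -> int:
    # Compare like with like: both sides of the check are in KiB.
    if buffer_size_kib >= mib_to_kib(MAX_BUFFER_MIB):
        raise ValueError(
            "spark.kryoserializer.buffer must be less than "
            f"{MAX_BUFFER_MIB} mb, got: {kib_to_mib(buffer_size_kib)} mb.")
    return buffer_size_kib
```

With the pre-fix comparison (```>= 2048``` against a KiB value), any buffer of 2 MiB or more would have been rejected, which is the bug the diff addresses.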
[GitHub] spark pull request: [SPARK-5945] Spark should not retry a stage in...
Github user ilganeli commented on the pull request: https://github.com/apache/spark/pull/5636#issuecomment-98855534 retest this please
[GitHub] spark pull request: [SPARK-5945] Spark should not retry a stage in...
Github user ilganeli commented on the pull request: https://github.com/apache/spark/pull/5636#issuecomment-98852897 All - I've updated the tests and code to (I believe) reflect the discussion here. Please let me know if I correctly understood the intent. Thanks!
[GitHub] spark pull request: [SPARK-5945] Spark should not retry a stage in...
Github user ilganeli commented on a diff in the pull request: https://github.com/apache/spark/pull/5636#discussion_r29615914
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1085,6 +1085,10 @@ class DAGScheduler(
         if (disallowStageRetryForTest) {
           abortStage(failedStage, "Fetch failure will not retry stage due to testing config")
+        } else if (failedStage.failAndShouldAbort()) {
--- End diff --
All - I realized that simply counting attemptIds will not be enough. There are two scenarios:
1) Concurrent failures of a FetchFailed task in a stage
2) Sequential failures of a stage due to a single task failing in sequence.
If all we cared about was counting the number of distinct concurrent failures, keeping a Set would suffice. However, we can't use attemptId because it's reset between sequential stage executions, e.g. between attempt 1 and attempt 2. Thus, I think the solution is to have a ```HashMap[StageFailureCount, StageAttemptIds] hashMap```. The logic for determining whether to abort is then a) ```hashMap.size() > 4``` OR b) ```hashMap(i).size() > 4```. Does this seem reasonable? The above scenario came up when I was running my two tests (which simulate conditions (1) and (2)).
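The proposal in this comment can be sketched as follows, purely as an illustration of the bookkeeping and not as the code that ended up in DAGScheduler. The class name ```StageFailureTracker``` and its methods are hypothetical; the threshold of 4 is taken from the comment, and the map goes from the sequential stage failure count to the set of attempt IDs that failed during that execution.

```python
from collections import defaultdict

MAX_FAILURES = 4  # threshold used in the comment above


class StageFailureTracker:
    """Hypothetical tracker: stage failure count -> set of failed attempt IDs."""

    def __init__(self, max_failures: int = MAX_FAILURES):
        self.max_failures = max_failures
        self._attempts_by_failure = defaultdict(set)

    def record_failure(self, stage_failure_count: int, attempt_id: int) -> None:
        # A set de-duplicates repeated reports of the same attempt, so
        # duplicate concurrent failures are only counted once.
        self._attempts_by_failure[stage_failure_count].add(attempt_id)

    def should_abort(self) -> bool:
        # (a) too many sequential stage failures (number of map entries)
        too_many_sequential = len(self._attempts_by_failure) > self.max_failures
        # (b) too many distinct failed attempts within one stage execution
        too_many_concurrent = any(
            len(ids) > self.max_failures
            for ids in self._attempts_by_failure.values())
        return too_many_sequential or too_many_concurrent
```

Keying on the failure count sidesteps the problem that attempt IDs restart between sequential executions: the same attempt ID under two different keys is counted as two separate failures.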
[GitHub] spark pull request: [SPARK-5945] Spark should not retry a stage in...
Github user ilganeli commented on a diff in the pull request: https://github.com/apache/spark/pull/5636#discussion_r29489052
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -475,7 +475,52 @@ class DAGSchedulerSuite
     assert(results === Map(0 -> 42, 1 -> 43))
     assertDataStructuresEmpty()
   }
+
+  test("Test taskAbort after multiple stage failures.") {
--- End diff --
Doh - there are no stupid questions - only stupid people.
[GitHub] spark pull request: [SPARK-5945] Spark should not retry a stage in...
Github user ilganeli commented on a diff in the pull request: https://github.com/apache/spark/pull/5636#discussion_r29488736
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -475,7 +475,52 @@ class DAGSchedulerSuite
     assert(results === Map(0 -> 42, 1 -> 43))
     assertDataStructuresEmpty()
   }
+
+  test("Test taskAbort after multiple stage failures.") {
--- End diff --
Totally right - I meant to say for more than two tasks. Will the number of tasks be a function of the RDD dependencies?
[GitHub] spark pull request: [SPARK-5945] Spark should not retry a stage in...
Github user ilganeli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5636#discussion_r29488689

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1085,6 +1085,10 @@ class DAGScheduler(
             if (disallowStageRetryForTest) {
               abortStage(failedStage, "Fetch failure will not retry stage due to testing config")
    +        } else if (failedStage.failAndShouldAbort()) {
    --- End diff --

    Kay - are you proposing to maintain a Set of stage attempt IDs for failed stages instead of maintaining a count? That way we would eliminate duplicate concurrent stage failures?
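The Set-based idea raised in this comment can be sketched roughly as follows. This is only an illustration under stated assumptions: `StageFailureTracker` is a hypothetical stand-in rather than Spark's `Stage` class, and `failAndShouldAbort` here takes the attempt ID as a parameter, which the method in the diff does not. The point it shows is that several fetch failures reported by the same stage attempt count once.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a stage's failure bookkeeping; not Spark's Stage class.
class StageFailureTracker(maxStageFailures: Int) {
  // IDs of the stage attempts that have already reported a fetch failure.
  private val failedAttemptIds = mutable.HashSet.empty[Int]

  // Records a failure for the given attempt. Returns true once the number of
  // distinct failed attempts reaches the limit, i.e. the stage should be aborted.
  def failAndShouldAbort(stageAttemptId: Int): Boolean = {
    failedAttemptIds += stageAttemptId
    failedAttemptIds.size >= maxStageFailures
  }

  // Forget earlier failures, e.g. after the stage eventually succeeds.
  def clear(): Unit = failedAttemptIds.clear()
}

object StageFailureTrackerDemo {
  def main(args: Array[String]): Unit = {
    val tracker = new StageFailureTracker(maxStageFailures = 2)
    // Two tasks of attempt 0 fail concurrently: the attempt is counted once.
    assert(!tracker.failAndShouldAbort(0))
    assert(!tracker.failAndShouldAbort(0))
    // A failure in a second, distinct attempt reaches the limit.
    assert(tracker.failAndShouldAbort(1))
  }
}
```

With a plain counter, the two failures from attempt 0 would already have reached the limit of 2; keying on the attempt ID is what removes the duplicates.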
[GitHub] spark pull request: [SPARK-5945] Spark should not retry a stage in...
Github user ilganeli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5636#discussion_r29488653

    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -475,7 +475,52 @@ class DAGSchedulerSuite
         assert(results === Map(0 -> 42, 1 -> 43))
         assertDataStructuresEmpty()
       }
    +
    +  test("Test taskAbort after multiple stage failures.") {
    --- End diff --

    Kay - could you please point me at an example of a stage with multiple tasks in one of the tests, just to help provide a baseline? There are no examples of this within the DAGSchedulerSuite and I'm having a bit of trouble figuring out how to code this up.
[GitHub] spark pull request: [SPARK-5945] Spark should not retry a stage in...
Github user ilganeli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5636#discussion_r29474028

    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -475,7 +475,58 @@ class DAGSchedulerSuite
         assert(results === Map(0 -> 42, 1 -> 43))
         assertDataStructuresEmpty()
       }
    +
    +  test("Test taskAbort after multiple stage failures.") {
    +    // Create a new Listener to confirm that the listenerBus sees the JobEnd message
    +    // when we abort the stage. This message will also be consumed by the EventLoggingListener
    +    // so this will propagate up to the user.
    +    var ended = false
    +    var jobResult : JobResult = null
    +    class EndListener extends SparkListener {
    +      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    +        jobResult = jobEnd.jobResult
    +        ended = true
    +      }
    +    }
    +    sc.listenerBus.addListener(new EndListener())
    +
    +    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
    +    val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
    +    val shuffleId = shuffleDep.shuffleId
    +    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
    +    submit(reduceRdd, Array(0, 1))
    +    sparkListener.failedStages.clear()
    +    scheduler.resubmitFailedStages()
    +
    +    complete(taskSets(0), Seq(
    +      (Success, makeMapStatus("hostA", 1)),
    +      (Success, makeMapStatus("hostB", 1))))
    +
    +    // Create stage object to get maxStageFailures
    +    val stage = new ResultStage(0, reduceRdd, 0, null, 0, new CallSite("blah","blah"))
    +    for (x <- 1 to stage.maxStageFailures) {
    +      // the 2nd ResultTask failed
    +      complete(taskSets(1), Seq(
    +        (Success, 42),
    +        (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"), null)))
    +
    +      scheduler.resubmitFailedStages()
    +      if (x < stage.maxStageFailures) {
    +        assert(scheduler.runningStages.nonEmpty)
    +        assert(!ended)
    +        assert(!jobResult.isInstanceOf[JobFailed])
    --- End diff --

    Imran - I wanted to specifically check that it's an instance of ```JobFailed``` since that's the ```EventType``` that's created in the ```DAGScheduler``` when a stage is aborted.
[GitHub] spark pull request: [SPARK-5945] Spark should not retry a stage in...
Github user ilganeli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5636#discussion_r29473972

    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -475,7 +475,58 @@ class DAGSchedulerSuite
         assert(results === Map(0 -> 42, 1 -> 43))
         assertDataStructuresEmpty()
       }
    +
    +  test("Test taskAbort after multiple stage failures.") {
    +    // Create a new Listener to confirm that the listenerBus sees the JobEnd message
    +    // when we abort the stage. This message will also be consumed by the EventLoggingListener
    +    // so this will propagate up to the user.
    +    var ended = false
    +    var jobResult : JobResult = null
    +    class EndListener extends SparkListener {
    +      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    +        jobResult = jobEnd.jobResult
    +        ended = true
    +      }
    +    }
    +    sc.listenerBus.addListener(new EndListener())
    +
    +    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
    +    val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
    +    val shuffleId = shuffleDep.shuffleId
    +    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
    +    submit(reduceRdd, Array(0, 1))
    +    sparkListener.failedStages.clear()
    +    scheduler.resubmitFailedStages()
    --- End diff --

    Correct on both counts.
[GitHub] spark pull request: [SPARK-5945] Spark should not retry a stage in...
Github user ilganeli commented on the pull request: https://github.com/apache/spark/pull/5636#issuecomment-97618227 Sigh. Streaming tests. Retest this please
[GitHub] spark pull request: [SPARK-5945] Spark should not retry a stage in...
Github user ilganeli commented on the pull request: https://github.com/apache/spark/pull/5636#issuecomment-97609602 Retest this please
[GitHub] spark pull request: [SPARK-5945] Spark should not retry a stage in...
Github user ilganeli commented on the pull request: https://github.com/apache/spark/pull/5636#issuecomment-97592794 Imran - I think an easy check for this is to use a Spark listener, as I've added to the test. Because failJobAndIndependentStages posts to the listenerBus when a stage fails, we just need to confirm that the listenerBus sees this message, since it's then propagated up to the user via the Log listeners. Please see the updates to the TestSuite.
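The listener-based check described in this comment can be illustrated with a small self-contained sketch. Everything here is a hypothetical, simplified stand-in: `MiniListenerBus`, `MiniListener`, `MiniJobResult`, and `MiniJobFailed` are not Spark's classes, and the toy bus delivers events synchronously. The only idea carried over is that a test registers a listener, triggers the failure, and then asserts on what the listener observed.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified stand-ins for job results; not Spark's types.
sealed trait MiniJobResult
case object MiniJobSucceeded extends MiniJobResult
final case class MiniJobFailed(reason: String) extends MiniJobResult

trait MiniListener {
  def onJobEnd(result: MiniJobResult): Unit
}

// A toy synchronous bus: post() delivers the event to every registered listener.
class MiniListenerBus {
  private val listeners = ArrayBuffer.empty[MiniListener]
  def addListener(listener: MiniListener): Unit = listeners += listener
  def post(result: MiniJobResult): Unit = listeners.foreach(_.onJobEnd(result))
}

// Test-side listener that records whether the job ended and with what result.
class EndListener extends MiniListener {
  var ended = false
  var jobResult: Option[MiniJobResult] = None
  override def onJobEnd(result: MiniJobResult): Unit = {
    jobResult = Some(result)
    ended = true
  }
}

object ListenerCheckDemo {
  def main(args: Array[String]): Unit = {
    val bus = new MiniListenerBus
    val listener = new EndListener
    bus.addListener(listener)

    // Nothing has been observed before the abort.
    assert(!listener.ended)

    // Simulate the scheduler aborting a stage and failing the job.
    bus.post(MiniJobFailed("stage aborted after repeated fetch failures"))

    assert(listener.ended)
    assert(listener.jobResult.exists(_.isInstanceOf[MiniJobFailed]))
  }
}
```

Storing the result in an `Option` rather than a nullable `var` is a choice made for this sketch only; the test in the pull request uses a `null`-initialized `jobResult`.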
[GitHub] spark pull request: [SPARK-5945] Spark should not retry a stage in...
Github user ilganeli commented on the pull request:

    https://github.com/apache/spark/pull/5636#issuecomment-97494755

    Imran - I believe they would get the Exception since the call to abortStage throws a SparkException. I believe I can check for that to confirm the end to end behavior.

    Sent with Good (www.good.com)

    -----Original Message-----
    From: Imran Rashid [notificati...@github.com<mailto:notificati...@github.com>]
    Sent: Wednesday, April 29, 2015 02:35 AM Eastern Standard Time
    To: apache/spark
    Cc: Ganelin, Ilya
    Subject: Re: [spark] [SPARK-5945] Spark should not retry a stage infinitely on a FetchFailedException (#5636)

    Thanks for the update @ilganeli<https://github.com/ilganeli> ! my comments are mostly minor.

    The only thing which is bugging me is that the tests don't really show how the stage failure gets pushed up to the user code. E.g., do they get a SparkException with a good message -- or does the DAGScheduler end up in some weird state where it stops running any additional jobs? I think it should work, but the DAGScheduler code is hairy enough that I'd really prefer a test.

    But I can't come up with a good way to write a unit test (or test manually for that matter). Maybe something like this test in ShuffleSuite?
    https://github.com/apache/spark/blob/master/core/src/test/scala/org/apache/spark/ShuffleSuite.scala#L264

    The problem is you don't have a good way to delete the shuffle files between stage attempts ... but maybe we could swap-in a different diskBlockManager that always fails to find the files or something. I'll think about it a little more.

    Reply to this email directly or view it on GitHub<https://github.com/apache/spark/pull/5636#issuecomment-97323086>.

    The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your computer.
[GitHub] spark pull request: [SPARK-5931][CORE] Use consistent naming for t...
Github user ilganeli commented on the pull request:

    https://github.com/apache/spark/pull/5236#issuecomment-97313146

    Hah. Thanks.

    -----Original Message-----
    From: Reynold Xin [notificati...@github.com<mailto:notificati...@github.com>]
    Sent: Wednesday, April 29, 2015 01:36 AM Eastern Standard Time
    To: apache/spark
    Cc: Ganelin, Ilya
    Subject: Re: [spark] [SPARK-5931][CORE] Use consistent naming for time properties (#5236)

    I'm late to the party, but just want to say -- this is super cool!

    Reply to this email directly or view it on GitHub<https://github.com/apache/spark/pull/5236#issuecomment-97312212>.
[GitHub] spark pull request: [SPARK-5932][CORE] Use consistent naming for s...
Github user ilganeli commented on the pull request: https://github.com/apache/spark/pull/5574#issuecomment-97111691 @andrewor14 How does this look?
[GitHub] spark pull request: [SPARK-5945] Spark should not retry a stage in...
Github user ilganeli commented on the pull request: https://github.com/apache/spark/pull/5636#issuecomment-97111636 @squito I've added a unit test to confirm this code is working as expected (and fixed a bug that was there). Please let me know if this is what you had in mind or if this needs anything else. Cheers!
[GitHub] spark pull request: [SPARK-5945] Spark should not retry a stage in...
Github user ilganeli commented on the pull request: https://github.com/apache/spark/pull/5636#issuecomment-96897660 retest this please
[GitHub] spark pull request: [SPARK-6746B] Refactor large functions in DAGS...
Github user ilganeli closed the pull request at: https://github.com/apache/spark/pull/5396
[GitHub] spark pull request: [SPARK-3533][Core] Add saveAsTextFileByKey() m...
Github user ilganeli commented on the pull request: https://github.com/apache/spark/pull/4895#issuecomment-96772751 Closing this PR, I'll move it to Packages at some point.
[GitHub] spark pull request: [SPARK-3533][Core] Add saveAsTextFileByKey() m...
Github user ilganeli closed the pull request at: https://github.com/apache/spark/pull/4895