[jira] [Updated] (SPARK-21513) SQL to_json should support all column types

2017-07-23 Thread Aaron Davidson (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aaron Davidson updated SPARK-21513:
---
Labels: Starter  (was: )

> SQL to_json should support all column types
> ---
>
> Key: SPARK-21513
> URL: https://issues.apache.org/jira/browse/SPARK-21513
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Aaron Davidson
>  Labels: Starter
>
> The built-in SQL UDF "to_json" currently supports serializing StructType 
> columns, as well as Arrays of StructType columns. If you attempt to use it on 
> a different type, for example a map, you get an error like this:
> {code}
> AnalysisException: cannot resolve 'structstojson(`tags`)' due to data type 
> mismatch: Input type map must be a struct or array of 
> structs.;;
> {code}
> This limitation seems arbitrary; if I were to go through the effort of 
> enclosing my map in a struct, it would be serializable. Same thing with any 
> other non-struct type.
> Therefore the desired improvement is to allow to_json to operate directly on 
> any column type. The associated code is 
> [here|https://github.com/apache/spark/blob/86174ea89b39a300caaba6baffac70f3dc702788/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala#L653].






[jira] [Updated] (SPARK-21513) SQL to_json should support all column types

2017-07-23 Thread Aaron Davidson (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aaron Davidson updated SPARK-21513:
---
Description: 
The built-in SQL UDF "to_json" currently supports serializing StructType 
columns, as well as Arrays of StructType columns. If you attempt to use it on a 
different type, for example a map, you get an error like this:

{code}
AnalysisException: cannot resolve 'structstojson(`tags`)' due to data type 
mismatch: Input type map must be a struct or array of structs.;;
{code}

This limitation seems arbitrary; if I were to go through the effort of 
enclosing my map in a struct, it would be serializable. Same thing with any 
other non-struct type.

Therefore the desired improvement is to allow to_json to operate directly on 
any column type. The associated code is 
[here|https://github.com/apache/spark/blob/86174ea89b39a300caaba6baffac70f3dc702788/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala#L653].

  was:
The built-in SQL UDF "to_json" currently supports serializing StructType 
columns, as well as Arrays of StructType columns. If you attempt to use it on a 
different type, for example a map, you get an error like this:

{code}
AnalysisException: cannot resolve 'structstojson(`tags`)' due to data type 
mismatch: Input type map must be a struct or array of structs.;;
{code}

This limitation seems arbitrary; if I were to go through the effort of 
enclosing my map in a struct, it would be serializable. Same thing with any 
other non-struct type.

Therefore the desired improvement is to allow to_json to operate directly on 
any column type. The associated code is 
[here](https://github.com/apache/spark/blob/86174ea89b39a300caaba6baffac70f3dc702788/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala#L653).


> SQL to_json should support all column types
> ---
>
> Key: SPARK-21513
> URL: https://issues.apache.org/jira/browse/SPARK-21513
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Aaron Davidson
>
> The built-in SQL UDF "to_json" currently supports serializing StructType 
> columns, as well as Arrays of StructType columns. If you attempt to use it on 
> a different type, for example a map, you get an error like this:
> {code}
> AnalysisException: cannot resolve 'structstojson(`tags`)' due to data type 
> mismatch: Input type map must be a struct or array of 
> structs.;;
> {code}
> This limitation seems arbitrary; if I were to go through the effort of 
> enclosing my map in a struct, it would be serializable. Same thing with any 
> other non-struct type.
> Therefore the desired improvement is to allow to_json to operate directly on 
> any column type. The associated code is 
> [here|https://github.com/apache/spark/blob/86174ea89b39a300caaba6baffac70f3dc702788/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala#L653].






[jira] [Created] (SPARK-21513) SQL to_json should support all column types

2017-07-23 Thread Aaron Davidson (JIRA)
Aaron Davidson created SPARK-21513:
--

 Summary: SQL to_json should support all column types
 Key: SPARK-21513
 URL: https://issues.apache.org/jira/browse/SPARK-21513
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.2.0
Reporter: Aaron Davidson


The built-in SQL UDF "to_json" currently supports serializing StructType 
columns, as well as Arrays of StructType columns. If you attempt to use it on a 
different type, for example a map, you get an error like this:

{code}
AnalysisException: cannot resolve 'structstojson(`tags`)' due to data type 
mismatch: Input type map must be a struct or array of structs.;;
{code}

This limitation seems arbitrary; if I were to go through the effort of 
enclosing my map in a struct, it would be serializable. Same thing with any 
other non-struct type.

Therefore the desired improvement is to allow to_json to operate directly on 
any column type. The associated code is 
[here](https://github.com/apache/spark/blob/86174ea89b39a300caaba6baffac70f3dc702788/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala#L653).
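
As a concrete illustration (not part of the original report), here is a minimal
spark-shell style sketch of the limitation and of the workaround the description
mentions, assuming a DataFrame with a map column named "tags":

{code}
import org.apache.spark.sql.functions.{col, struct, to_json}

// A DataFrame with a MapType column (illustrative data).
val df = Seq(
  (1, Map("env" -> "prod", "team" -> "data")),
  (2, Map("env" -> "dev"))
).toDF("id", "tags")

// As of 2.2.0 this fails with the AnalysisException quoted above,
// because `tags` is a map rather than a struct:
// df.select(to_json(col("tags")))

// Workaround: wrap the map in a single-field struct first. This serializes
// fine, but nests the output one level deeper than desired, e.g.
// {"tags":{"env":"prod","team":"data"}}.
df.select(to_json(struct(col("tags"))).as("tags_json")).show(false)
{code}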






[jira] [Created] (SPARK-18497) ForeachSink fails with "assertion failed: No plan for EventTimeWatermark"

2016-11-17 Thread Aaron Davidson (JIRA)
Aaron Davidson created SPARK-18497:
--

 Summary: ForeachSink fails with "assertion failed: No plan for 
EventTimeWatermark"
 Key: SPARK-18497
 URL: https://issues.apache.org/jira/browse/SPARK-18497
 Project: Spark
  Issue Type: Bug
  Components: Structured Streaming
Affects Versions: 2.1.0
Reporter: Aaron Davidson


I have a pretty standard stream. I call ".writeStream.foreach(...).start()" and 
get

{code}
java.lang.AssertionError: assertion failed: No plan for EventTimeWatermark 
timestamp#39688: timestamp, interval 1 days
at scala.Predef$.assert(Predef.scala:170)
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:77)
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:74)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:74)
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:66)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:77)
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:74)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:74)
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:66)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
at 
org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:79)
at 
org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75)
at 
org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:84)
at 
org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:84)
at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$toString$3.apply(QueryExecution.scala:232)
at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$toString$3.apply(QueryExecution.scala:232)
at 
org.apache.spark.sql.execution.QueryExecution.stringOrError(QueryExecution.scala:107)
at 
org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:232)
at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:54)
at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2751)
at org.apache.spark.sql.Dataset.foreachPartition(Dataset.scala:2290)
at 
org.apache.spark.sql.execution.streaming.ForeachSink.addBatch(ForeachSink.scala:70)
at 
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:449)
at 
org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply$mcZ$sp(StreamExecution.scala:227)
at 
org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:215)
at 
org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sq
{code}
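
For context, a minimal sketch of the kind of query that hits this, assuming a
streaming source with a {{timestamp}} column and the 1-day watermark implied by
the plan above; the source, host, and port here are purely illustrative:

{code}
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}

val spark = SparkSession.builder().appName("foreach-watermark").getOrCreate()

// Illustrative source: the socket source with includeTimestamp yields
// (value: string, timestamp: timestamp); any streaming source with an
// event-time column would do.
val events = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", "9999")
  .option("includeTimestamp", "true")
  .load()
  .withWatermark("timestamp", "1 day")   // matches "interval 1 days" in the plan

// Writing through the foreach sink plans the watermarked query inside
// ForeachSink.addBatch, which is where the assertion in the stack trace fires.
val query = events.writeStream
  .foreach(new ForeachWriter[Row] {
    def open(partitionId: Long, version: Long): Boolean = true
    def process(value: Row): Unit = ()   // no-op; the failure happens before this runs
    def close(errorOrException: Throwable): Unit = ()
  })
  .start()

query.awaitTermination()
{code}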

[jira] [Created] (SPARK-9397) DataFrame should provide an API to find source data files if applicable

2015-07-27 Thread Aaron Davidson (JIRA)
Aaron Davidson created SPARK-9397:
-

 Summary: DataFrame should provide an API to find source data files 
if applicable
 Key: SPARK-9397
 URL: https://issues.apache.org/jira/browse/SPARK-9397
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Reporter: Aaron Davidson
Assignee: Aaron Davidson


Certain applications would benefit from being able to inspect DataFrames that 
are produced directly by file-based data sources and find out the files backing 
them. For example, one might want to display to a user the size of the data 
underlying a table, or to copy or mutate it.

Currently, there is not a good way to get this information in a public API.
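
To make this concrete, here is a rough spark-shell style sketch of the kind of
usage being asked for; the {{inputFiles}} method name and the path are
assumptions rather than something the ticket prescribes:

{code}
// A DataFrame read straightforwardly from a file-based data source.
val df = sqlContext.read.parquet("/path/to/table")   // path is illustrative

// Hypothetical API along the lines requested here: list the files backing df.
val files: Array[String] = df.inputFiles

// Example use: show a user how much data underlies the table.
val fs = org.apache.hadoop.fs.FileSystem.get(sc.hadoopConfiguration)
val totalBytes = files.map(f => fs.getFileStatus(new org.apache.hadoop.fs.Path(f)).getLen).sum
println(s"${files.length} files, $totalBytes bytes")
{code}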






[jira] [Commented] (SPARK-8644) SparkException thrown due to Executor exceptions should include caller site in stack trace

2015-07-16 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14630645#comment-14630645
 ] 

Aaron Davidson commented on SPARK-8644:
---

These are actually complementary. SPARK-8625 makes sure the Executor exception 
is properly preserved, but that exception may not include user code either (or 
may not help indicate which driver code caused the job). Now we will properly 
include all relevant stacks.
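
As a rough illustration of the general technique (a sketch, not necessarily the
exact Spark change): capture the caller's stack at job-submission time and
attach it to the exception that surfaces later, since the failure itself is
raised from the DAGScheduler event loop on a different thread.

{code}
// Sketch only: capture the submission-site stack eagerly, then re-throw with
// those frames so both executor-side (via the cause) and driver-side user code
// are visible in the trace.
def withCallerSite[T](body: => T): T = {
  // Where the user called into the framework (dropping these helper frames).
  val callerStack = Thread.currentThread().getStackTrace.drop(2)
  try {
    body
  } catch {
    case e: Exception =>
      val wrapped = new RuntimeException("Job aborted: " + e.getMessage, e)
      // The wrapper carries the caller's frames; the original failure frames
      // remain on the cause.
      wrapped.setStackTrace(callerStack)
      throw wrapped
  }
}
{code}

In Spark's case the call site would be captured when the job is submitted (e.g.
around SparkContext.runJob) rather than around an arbitrary body as above.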

> SparkException thrown due to Executor exceptions should include caller site 
> in stack trace
> --
>
> Key: SPARK-8644
> URL: https://issues.apache.org/jira/browse/SPARK-8644
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.4.1
>Reporter: Aaron Davidson
>Assignee: Aaron Davidson
>
> Currently when a job fails due to executor (or other) issues, the exception 
> thrown by Spark has a stack trace which stops at the DAGScheduler EventLoop, 
> which makes it hard to trace back to the user code which submitted the job. 
> It should try to include the user submission stack trace.
> Example exception today:
> {code}
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
> stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 
> (TID 0, localhost): java.lang.RuntimeException: uh-oh!
>   at 
> org.apache.spark.scheduler.DAGSchedulerSuite$$anonfun$33$$anonfun$34$$anonfun$apply$mcJ$sp$1.apply(DAGSchedulerSuite.scala:851)
>   at 
> org.apache.spark.scheduler.DAGSchedulerSuite$$anonfun$33$$anonfun$34$$anonfun$apply$mcJ$sp$1.apply(DAGSchedulerSuite.scala:851)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>   at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1637)
>   at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1095)
>   at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1095)
>   at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
>   at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>   at org.apache.spark.scheduler.Task.run(Task.scala:70)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:744)
> Driver stacktrace:
>   at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1285)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1276)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1275)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1275)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:749)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:749)
>   at scala.Option.foreach(Option.scala:236)
>   at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:749)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1486)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
> {code}
> Here is the part I want to include:
> {code}
>   at org.apache.spark.rdd.RDD.count(RDD.scala:1095)
>   at 
> org.apache.spark.scheduler.DAGSchedulerSuite$$anonfun$33$$anonfun$34.apply$mcJ$sp(DAGSchedulerSuite.scala:851)
>   at 
> org.apache.spark.scheduler.DAGSchedulerSuite$$anonfun$33$$anonfun$34.apply(DAGSchedulerSuite.scala:851)
>   at 
> org.apache.spark.scheduler.DAGSchedulerSuite$$anonfun$33$$anonfun$34.apply(DAGSchedulerSuite.scala:851)
>   at org.scalatest.Assertions$class.intercept(Assertions.scala:997)
>   at org.scalatest.FunSuite.intercept(FunSuite.scala:1555)
>   at 
> org.apache.spark.scheduler.DAGSchedulerSuite$$anonfun$33.apply$mcV$sp(DAGSchedulerSuite.scala:850)
>   at 
> org.apache.spark.scheduler.DAGSchedulerSuite$$anonfun$33.apply(DAGSchedulerSuite.scala:849)
>   at 
> org.apache.spark.scheduler.DAGSchedulerSuite$$anonfun$33.apply(DAGSchedulerSuite.scala:849)
>   at 
> org.scalatest.Transforme

[jira] [Updated] (SPARK-8707) RDD#toDebugString fails if any cached RDD has invalid partitions

2015-06-29 Thread Aaron Davidson (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-8707?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aaron Davidson updated SPARK-8707:
--
Description: 
Repro:

{code}
sc.textFile("/ThisFileDoesNotExist").cache()
sc.parallelize(0 until 100).toDebugString
{code}

Output:

{code}
java.io.IOException: Not a file: /ThisFileDoesNotExist
at 
org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:215)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.storage.RDDInfo$.fromRdd(RDDInfo.scala:59)
at 
org.apache.spark.SparkContext$$anonfun$34.apply(SparkContext.scala:1455)
at 
org.apache.spark.SparkContext$$anonfun$34.apply(SparkContext.scala:1455)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at 
org.apache.spark.SparkContext.getRDDStorageInfo(SparkContext.scala:1455)
at org.apache.spark.rdd.RDD.debugSelf$1(RDD.scala:1573)
at org.apache.spark.rdd.RDD.firstDebugString$1(RDD.scala:1607)
at org.apache.spark.rdd.RDD.toDebugString(RDD.scala:1637
{code}

This is because toDebugString gets all the partitions from all RDDs, which 
fails (via SparkContext#getRDDStorageInfo). This pathway should definitely be 
resilient to other RDDs being invalid (and getRDDStorageInfo should probably 
also be).

  was:
Repro:

{code}
sc.parallelize(0 until 100).toDebugString
sc.textFile("/ThisFileDoesNotExist").cache()
sc.parallelize(0 until 100).toDebugString
{code}

Output:

{code}
java.io.IOException: Not a file: /ThisFileDoesNotExist
at 
org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:215)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.storage.RDDInfo$.fromRdd(RDDInfo.scala:59)
at 
org.apache.spark.SparkContext$$anonfun$34.apply(SparkContext.scala:1455)
at 
org.apache.spark.SparkContext$$anonfun$34.apply(SparkContext.scala:1455)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at 
org.apache.spark.SparkContext.getRDDStorageInfo(SparkContext.scala:1455)
at org.apache.spark.rdd.RDD.debugSelf$1(RDD.scala:1573)
at org.apache.spark.rdd.RDD.firstDebugString$1(RDD.scala:1607)
at org.apache.spark.rdd.RDD.toDebugString(RDD.scala:1637
{code}

This is because toDebugString gets all the partitions from all RDDs, which 
fails (via SparkContext#getRDDStorageInfo). This pathway should definitely be 
resilient to other RDDs being invalid (and getRDDStorageInfo should probably 
also be).


> RDD#toDebugString fails if any cached RDD has invalid partitions

[jira] [Updated] (SPARK-8707) RDD#toDebugString fails if any cached RDD has invalid partitions

2015-06-29 Thread Aaron Davidson (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-8707?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aaron Davidson updated SPARK-8707:
--
Summary: RDD#toDebugString fails if any cached RDD has invalid partitions  
(was: RDD#toDebugString fails if any cached RDD is invalid)

> RDD#toDebugString fails if any cached RDD has invalid partitions
> 
>
> Key: SPARK-8707
> URL: https://issues.apache.org/jira/browse/SPARK-8707
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.4.0, 1.4.1
>Reporter: Aaron Davidson
>  Labels: starter
>
> Repro:
> {code}
> sc.parallelize(0 until 100).toDebugString
> sc.textFile("/ThisFileDoesNotExist").cache()
> sc.parallelize(0 until 100).toDebugString
> {code}
> Output:
> {code}
> java.io.IOException: Not a file: /ThisFileDoesNotExist
>   at 
> org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:215)
>   at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
>   at scala.Option.getOrElse(Option.scala:120)
>   at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
>   at scala.Option.getOrElse(Option.scala:120)
>   at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
>   at org.apache.spark.storage.RDDInfo$.fromRdd(RDDInfo.scala:59)
>   at 
> org.apache.spark.SparkContext$$anonfun$34.apply(SparkContext.scala:1455)
>   at 
> org.apache.spark.SparkContext$$anonfun$34.apply(SparkContext.scala:1455)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>   at 
> scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>   at 
> org.apache.spark.SparkContext.getRDDStorageInfo(SparkContext.scala:1455)
>   at org.apache.spark.rdd.RDD.debugSelf$1(RDD.scala:1573)
>   at org.apache.spark.rdd.RDD.firstDebugString$1(RDD.scala:1607)
>   at org.apache.spark.rdd.RDD.toDebugString(RDD.scala:1637
> {code}
> This is because toDebugString gets all the partitions from all RDDs, which 
> fails (via SparkContext#getRDDStorageInfo). This pathway should definitely be 
> resilient to other RDDs being invalid (and getRDDStorageInfo should probably 
> also be).






[jira] [Created] (SPARK-8707) RDD#toDebugString fails if any cached RDD is invalid

2015-06-29 Thread Aaron Davidson (JIRA)
Aaron Davidson created SPARK-8707:
-

 Summary: RDD#toDebugString fails if any cached RDD is invalid
 Key: SPARK-8707
 URL: https://issues.apache.org/jira/browse/SPARK-8707
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.4.0, 1.4.1
Reporter: Aaron Davidson


Repro:

{code}
sc.parallelize(0 until 100).toDebugString
sc.textFile("/ThisFileDoesNotExist").cache()
sc.parallelize(0 until 100).toDebugString
{code}

Output:

{code}
java.io.IOException: Not a file: /ThisFileDoesNotExist
at 
org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:215)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.storage.RDDInfo$.fromRdd(RDDInfo.scala:59)
at 
org.apache.spark.SparkContext$$anonfun$34.apply(SparkContext.scala:1455)
at 
org.apache.spark.SparkContext$$anonfun$34.apply(SparkContext.scala:1455)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at 
org.apache.spark.SparkContext.getRDDStorageInfo(SparkContext.scala:1455)
at org.apache.spark.rdd.RDD.debugSelf$1(RDD.scala:1573)
at org.apache.spark.rdd.RDD.firstDebugString$1(RDD.scala:1607)
at org.apache.spark.rdd.RDD.toDebugString(RDD.scala:1637
{code}

This is because toDebugString gets all the partitions from all RDDs, which 
fails (via SparkContext#getRDDStorageInfo). This pathway should definitely be 
resilient to other RDDs being invalid (and getRDDStorageInfo should probably 
also be).
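
A rough sketch of the kind of defensiveness being asked for (not the actual
fix): when collecting storage info for other RDDs, tolerate RDDs whose
partitions cannot be computed instead of letting the IOException escape from an
unrelated toDebugString call.

{code}
import scala.util.control.NonFatal
import org.apache.spark.rdd.RDD

// Sketch: return None instead of throwing when an RDD's partitions cannot be
// resolved (e.g. a cached RDD over a nonexistent path), so callers such as
// getRDDStorageInfo can simply skip it.
def safeNumPartitions(rdd: RDD[_]): Option[Int] =
  try {
    Some(rdd.partitions.length)
  } catch {
    case NonFatal(_) => None
  }
{code}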






[jira] [Created] (SPARK-8644) SparkException thrown due to Executor exceptions should include caller site in stack trace

2015-06-25 Thread Aaron Davidson (JIRA)
Aaron Davidson created SPARK-8644:
-

 Summary: SparkException thrown due to Executor exceptions should 
include caller site in stack trace
 Key: SPARK-8644
 URL: https://issues.apache.org/jira/browse/SPARK-8644
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.4.1
Reporter: Aaron Davidson
Assignee: Aaron Davidson


Currently when a job fails due to executor (or other) issues, the exception 
thrown by Spark has a stack trace which stops at the DAGScheduler EventLoop, 
which makes it hard to trace back to the user code which submitted the job. It 
should try to include the user submission stack trace.

Example exception today:

{code}
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 
0, localhost): java.lang.RuntimeException: uh-oh!
at 
org.apache.spark.scheduler.DAGSchedulerSuite$$anonfun$33$$anonfun$34$$anonfun$apply$mcJ$sp$1.apply(DAGSchedulerSuite.scala:851)
at 
org.apache.spark.scheduler.DAGSchedulerSuite$$anonfun$33$$anonfun$34$$anonfun$apply$mcJ$sp$1.apply(DAGSchedulerSuite.scala:851)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1637)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1095)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1095)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)

Driver stacktrace:
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1285)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1276)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1275)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1275)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:749)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:749)
at scala.Option.foreach(Option.scala:236)
at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:749)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1486)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
{code}

Here is the part I want to include:

{code}
at org.apache.spark.rdd.RDD.count(RDD.scala:1095)
at 
org.apache.spark.scheduler.DAGSchedulerSuite$$anonfun$33$$anonfun$34.apply$mcJ$sp(DAGSchedulerSuite.scala:851)
at 
org.apache.spark.scheduler.DAGSchedulerSuite$$anonfun$33$$anonfun$34.apply(DAGSchedulerSuite.scala:851)
at 
org.apache.spark.scheduler.DAGSchedulerSuite$$anonfun$33$$anonfun$34.apply(DAGSchedulerSuite.scala:851)
at org.scalatest.Assertions$class.intercept(Assertions.scala:997)
at org.scalatest.FunSuite.intercept(FunSuite.scala:1555)
at 
org.apache.spark.scheduler.DAGSchedulerSuite$$anonfun$33.apply$mcV$sp(DAGSchedulerSuite.scala:850)
at 
org.apache.spark.scheduler.DAGSchedulerSuite$$anonfun$33.apply(DAGSchedulerSuite.scala:849)
at 
org.apache.spark.scheduler.DAGSchedulerSuite$$anonfun$33.apply(DAGSchedulerSuite.scala:849)
at 
org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166)
at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:42)
at 
org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163)
at 
org.scalatest.FunSuiteLike

[jira] [Commented] (SPARK-7183) Memory leak in netty shuffle with spark standalone cluster

2015-05-13 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-7183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14543226#comment-14543226
 ] 

Aaron Davidson commented on SPARK-7183:
---

Sorry for the delay; this should be fine to backport, as it's a relatively 
straightforward fix.

> Memory leak in netty shuffle with spark standalone cluster
> --
>
> Key: SPARK-7183
> URL: https://issues.apache.org/jira/browse/SPARK-7183
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 1.3.0
>Reporter: Jack Hu
>Assignee: Liang-Chi Hsieh
>  Labels: memory-leak, netty, shuffle
> Fix For: 1.4.0
>
>
> There is a slow leak in the netty shuffle with a spark standalone cluster, in 
> {{TransportRequestHandler.streamIds}}.
> In a spark cluster, there are reusable netty connections between two block 
> managers, used to get/send blocks between workers/drivers. These connections 
> are handled by {{org.apache.spark.network.server.TransportRequestHandler}} on 
> the server side. This handler keeps track of all the streamIds negotiated by 
> RPC whenever shuffle data needs to be transferred between the two block 
> managers; the set of streamIds keeps growing and never gets a chance to be 
> cleaned up unless the connection is dropped (which seems to never happen in 
> normal running).
> Here are some detailed logs of this {{TransportRequestHandler}} (note: we added 
> a log to print the total size of {{TransportRequestHandler.streamIds}}; the 
> log line is "Current set size is N of 
> org.apache.spark.network.server.TransportRequestHandler@ADDRESS", and this set 
> size keeps increasing in our test)
> {quote}
> 15/04/22 21:00:16 DEBUG TransportServer: Shuffle server started on port :46288
> 15/04/22 21:00:16 INFO NettyBlockTransferService: Server created on 46288
> 15/04/22 21:00:31 INFO TransportRequestHandler: Created 
> TransportRequestHandler 
> org.apache.spark.network.server.TransportRequestHandler@29a4f3e7
> 15/04/22 21:00:32 TRACE MessageDecoder: Received message RpcRequest: 
> RpcRequest\{requestId=6655045571437304938, message=[B@59778678\}
> 15/04/22 21:00:32 TRACE NettyBlockRpcServer: Received request: 
> OpenBlocks\{appId=app-20150422210016-, execId=, 
> blockIds=[broadcast_1_piece0]}
> 15/04/22 21:00:32 TRACE NettyBlockRpcServer: Registered streamId 
> 1387459488000 with 1 buffers
> 15/04/22 21:00:33 TRACE TransportRequestHandler: Sent result 
> RpcResponse\{requestId=6655045571437304938, response=[B@d2840b\} to client 
> /10.111.7.150:33802
> 15/04/22 21:00:33 TRACE MessageDecoder: Received message ChunkFetchRequest: 
> ChunkFetchRequest\{streamChunkId=StreamChunkId\{streamId=1387459488000, 
> chunkIndex=0}}
> 15/04/22 21:00:33 TRACE TransportRequestHandler: Received req from 
> /10.111.7.150:33802 to fetch block StreamChunkId\{streamId=1387459488000, 
> chunkIndex=0\}
> 15/04/22 21:00:33 INFO TransportRequestHandler: Current set size is 1 of 
> org.apache.spark.network.server.TransportRequestHandler@29a4f3e7
> 15/04/22 21:00:33 TRACE OneForOneStreamManager: Removing stream id 
> 1387459488000
> 15/04/22 21:00:33 TRACE TransportRequestHandler: Sent result 
> ChunkFetchSuccess\{streamChunkId=StreamChunkId\{streamId=1387459488000, 
> chunkIndex=0}, buffer=NioManagedBuffer\{buf=java.nio.HeapByteBuffer[pos=0 
> lim=3839 cap=3839]}} to client /10.111.7.150:33802
> 15/04/22 21:00:34 TRACE MessageDecoder: Received message RpcRequest: 
> RpcRequest\{requestId=6660601528868866371, message=[B@42bed1b8\}
> 15/04/22 21:00:34 TRACE NettyBlockRpcServer: Received request: 
> OpenBlocks\{appId=app-20150422210016-, execId=, 
> blockIds=[broadcast_3_piece0]}
> 15/04/22 21:00:34 TRACE NettyBlockRpcServer: Registered streamId 
> 1387459488001 with 1 buffers
> 15/04/22 21:00:34 TRACE TransportRequestHandler: Sent result 
> RpcResponse\{requestId=6660601528868866371, response=[B@7fa3fb60\} to client 
> /10.111.7.150:33802
> 15/04/22 21:00:34 TRACE MessageDecoder: Received message ChunkFetchRequest: 
> ChunkFetchRequest\{streamChunkId=StreamChunkId\{streamId=1387459488001, 
> chunkIndex=0}}
> 15/04/22 21:00:34 TRACE TransportRequestHandler: Received req from 
> /10.111.7.150:33802 to fetch block StreamChunkId\{streamId=1387459488001, 
> chunkIndex=0\}
> 15/04/22 21:00:34 INFO TransportRequestHandler: Current set size is 2 of 
> org.apache.spark.network.server.TransportRequestHandler@29a4f3e7
> 15/04/22 21:00:34 TRACE OneForOneStreamManager: Removing stream id 
> 1387459488001
> 15/04/22 21:00:34 TRACE TransportRequestHandler: Sent result 
> ChunkFetchSuccess\{streamChunkId=StreamChunkId\{streamId=1387459488001, 
> chunkIndex=0}, buffer=NioManagedBuffer\{buf=java.nio.HeapByteBuffer[pos=0 
> lim=4277 cap=4277]}} to client /10.111.7.150:33802
> 15/04/22 21:00:34 TRACE MessageDecoder: Received message RpcRequest: 
> RpcRequest\{requestId=8

[jira] [Resolved] (SPARK-7183) Memory leak in netty shuffle with spark standalone cluster

2015-05-01 Thread Aaron Davidson (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-7183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aaron Davidson resolved SPARK-7183.
---
   Resolution: Fixed
Fix Version/s: 1.5.0

> Memory leak in netty shuffle with spark standalone cluster
> --
>
> Key: SPARK-7183
> URL: https://issues.apache.org/jira/browse/SPARK-7183
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 1.3.0
>Reporter: Jack Hu
>  Labels: memory-leak, netty, shuffle
> Fix For: 1.5.0
>
>
> There is a slow leak in the netty shuffle with a spark standalone cluster, in 
> {{TransportRequestHandler.streamIds}}.
> In a spark cluster, there are reusable netty connections between two block 
> managers, used to get/send blocks between workers/drivers. These connections 
> are handled by {{org.apache.spark.network.server.TransportRequestHandler}} on 
> the server side. This handler keeps track of all the streamIds negotiated by 
> RPC whenever shuffle data needs to be transferred between the two block 
> managers; the set of streamIds keeps growing and never gets a chance to be 
> cleaned up unless the connection is dropped (which seems to never happen in 
> normal running).
> Here are some detailed logs of this {{TransportRequestHandler}} (note: we added 
> a log to print the total size of {{TransportRequestHandler.streamIds}}; the 
> log line is "Current set size is N of 
> org.apache.spark.network.server.TransportRequestHandler@ADDRESS", and this set 
> size keeps increasing in our test)
> {quote}
> 15/04/22 21:00:16 DEBUG TransportServer: Shuffle server started on port :46288
> 15/04/22 21:00:16 INFO NettyBlockTransferService: Server created on 46288
> 15/04/22 21:00:31 INFO TransportRequestHandler: Created 
> TransportRequestHandler 
> org.apache.spark.network.server.TransportRequestHandler@29a4f3e7
> 15/04/22 21:00:32 TRACE MessageDecoder: Received message RpcRequest: 
> RpcRequest\{requestId=6655045571437304938, message=[B@59778678\}
> 15/04/22 21:00:32 TRACE NettyBlockRpcServer: Received request: 
> OpenBlocks\{appId=app-20150422210016-, execId=, 
> blockIds=[broadcast_1_piece0]}
> 15/04/22 21:00:32 TRACE NettyBlockRpcServer: Registered streamId 
> 1387459488000 with 1 buffers
> 15/04/22 21:00:33 TRACE TransportRequestHandler: Sent result 
> RpcResponse\{requestId=6655045571437304938, response=[B@d2840b\} to client 
> /10.111.7.150:33802
> 15/04/22 21:00:33 TRACE MessageDecoder: Received message ChunkFetchRequest: 
> ChunkFetchRequest\{streamChunkId=StreamChunkId\{streamId=1387459488000, 
> chunkIndex=0}}
> 15/04/22 21:00:33 TRACE TransportRequestHandler: Received req from 
> /10.111.7.150:33802 to fetch block StreamChunkId\{streamId=1387459488000, 
> chunkIndex=0\}
> 15/04/22 21:00:33 INFO TransportRequestHandler: Current set size is 1 of 
> org.apache.spark.network.server.TransportRequestHandler@29a4f3e7
> 15/04/22 21:00:33 TRACE OneForOneStreamManager: Removing stream id 
> 1387459488000
> 15/04/22 21:00:33 TRACE TransportRequestHandler: Sent result 
> ChunkFetchSuccess\{streamChunkId=StreamChunkId\{streamId=1387459488000, 
> chunkIndex=0}, buffer=NioManagedBuffer\{buf=java.nio.HeapByteBuffer[pos=0 
> lim=3839 cap=3839]}} to client /10.111.7.150:33802
> 15/04/22 21:00:34 TRACE MessageDecoder: Received message RpcRequest: 
> RpcRequest\{requestId=6660601528868866371, message=[B@42bed1b8\}
> 15/04/22 21:00:34 TRACE NettyBlockRpcServer: Received request: 
> OpenBlocks\{appId=app-20150422210016-, execId=, 
> blockIds=[broadcast_3_piece0]}
> 15/04/22 21:00:34 TRACE NettyBlockRpcServer: Registered streamId 
> 1387459488001 with 1 buffers
> 15/04/22 21:00:34 TRACE TransportRequestHandler: Sent result 
> RpcResponse\{requestId=6660601528868866371, response=[B@7fa3fb60\} to client 
> /10.111.7.150:33802
> 15/04/22 21:00:34 TRACE MessageDecoder: Received message ChunkFetchRequest: 
> ChunkFetchRequest\{streamChunkId=StreamChunkId\{streamId=1387459488001, 
> chunkIndex=0}}
> 15/04/22 21:00:34 TRACE TransportRequestHandler: Received req from 
> /10.111.7.150:33802 to fetch block StreamChunkId\{streamId=1387459488001, 
> chunkIndex=0\}
> 15/04/22 21:00:34 INFO TransportRequestHandler: Current set size is 2 of 
> org.apache.spark.network.server.TransportRequestHandler@29a4f3e7
> 15/04/22 21:00:34 TRACE OneForOneStreamManager: Removing stream id 
> 1387459488001
> 15/04/22 21:00:34 TRACE TransportRequestHandler: Sent result 
> ChunkFetchSuccess\{streamChunkId=StreamChunkId\{streamId=1387459488001, 
> chunkIndex=0}, buffer=NioManagedBuffer\{buf=java.nio.HeapByteBuffer[pos=0 
> lim=4277 cap=4277]}} to client /10.111.7.150:33802
> 15/04/22 21:00:34 TRACE MessageDecoder: Received message RpcRequest: 
> RpcRequest\{requestId=8454597410163901330, message=[B@19c673d1\}
> 15/04/22 21:00:34 TRACE NettyBlockRpcServer: Received request: 
> OpenBlocks\{appId=app-201

[jira] [Resolved] (SPARK-7003) Improve reliability of connection failure detection between Netty block transfer service endpoints

2015-04-20 Thread Aaron Davidson (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-7003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aaron Davidson resolved SPARK-7003.
---
   Resolution: Fixed
Fix Version/s: 1.4.0

> Improve reliability of connection failure detection between Netty block 
> transfer service endpoints
> --
>
> Key: SPARK-7003
> URL: https://issues.apache.org/jira/browse/SPARK-7003
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.3.1
>Reporter: Aaron Davidson
>Assignee: Aaron Davidson
> Fix For: 1.4.0
>
>
> Currently we rely on the assumption that an exception will be raised and the 
> channel closed if two endpoints cannot communicate over a Netty TCP channel. 
> However, this guarantee does not hold in all network environments, and 
> SPARK-6962 seems to point to a case where only the server side of the 
> connection detected a fault.
> We should improve robustness of fetch/rpc requests by having an explicit 
> timeout in the transport layer which closes the connection if there is a 
> period of inactivity while there are outstanding requests.






[jira] [Commented] (SPARK-6962) Netty BlockTransferService hangs in the middle of SQL query

2015-04-19 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14502270#comment-14502270
 ] 

Aaron Davidson commented on SPARK-6962:
---

I created SPARK-7003 to track a fix for the potential problem I noted, with a PR 
to follow: https://github.com/apache/spark/pull/5584

If you are able to pull in that patch, it may either fix the issue you're seeing 
(by performing retries in the event of network faults) or at least fail after a 
few minutes rather than hanging indefinitely -- either result would be 
interesting.

> Netty BlockTransferService hangs in the middle of SQL query
> ---
>
> Key: SPARK-6962
> URL: https://issues.apache.org/jira/browse/SPARK-6962
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.2.0, 1.2.1, 1.3.0
>Reporter: Jon Chase
> Attachments: jstacks.txt
>
>
> Spark SQL queries (though this seems to be a Spark Core issue - I'm just 
> using queries in the REPL to surface this, so I mention Spark SQL) hang 
> indefinitely under certain (not totally understood) circumstances.  
> This is resolved by setting spark.shuffle.blockTransferService=nio, which 
> seems to point to netty as the issue.  Netty was set as the default for the 
> block transport layer in 1.2.0, which is when this issue started.  Setting 
> the service to nio allows queries to complete normally.
> I do not see this problem when running queries over smaller (~20 5MB files) 
> datasets.  When I increase the scope to include more data (several hundred 
> ~5MB files), the queries will get through several steps but eventually hang 
> indefinitely.
> Here's the email chain regarding this issue, including stack traces:
> http://mail-archives.apache.org/mod_mbox/spark-user/201503.mbox/
> For context, here's the announcement regarding the block transfer service 
> change: 
> http://mail-archives.apache.org/mod_mbox/spark-dev/201411.mbox/






[jira] [Updated] (SPARK-7003) Improve reliability of connection failure detection between Netty block transfer service endpoints

2015-04-19 Thread Aaron Davidson (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-7003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aaron Davidson updated SPARK-7003:
--
Summary: Improve reliability of connection failure detection between Netty 
block transfer service endpoints  (was: Improve reliability of connection 
failure between Netty block transfer service endpoints)

> Improve reliability of connection failure detection between Netty block 
> transfer service endpoints
> --
>
> Key: SPARK-7003
> URL: https://issues.apache.org/jira/browse/SPARK-7003
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 1.3.1
>Reporter: Aaron Davidson
>Assignee: Aaron Davidson
>
> Currently we rely on the assumption that an exception will be raised and the 
> channel closed if two endpoints cannot communicate over a Netty TCP channel. 
> However, this guarantee does not hold in all network environments, and 
> SPARK-6962 seems to point to a case where only the server side of the 
> connection detected a fault.
> We should improve robustness of fetch/rpc requests by having an explicit 
> timeout in the transport layer which closes the connection if there is a 
> period of inactivity while there are outstanding requests.






[jira] [Created] (SPARK-7003) Improve reliability of connection failure between Netty block transfer service endpoints

2015-04-19 Thread Aaron Davidson (JIRA)
Aaron Davidson created SPARK-7003:
-

 Summary: Improve reliability of connection failure between Netty 
block transfer service endpoints
 Key: SPARK-7003
 URL: https://issues.apache.org/jira/browse/SPARK-7003
 Project: Spark
  Issue Type: Bug
Affects Versions: 1.3.1
Reporter: Aaron Davidson
Assignee: Aaron Davidson


Currently we rely on the assumption that an exception will be raised and the 
channel closed if two endpoints cannot communicate over a Netty TCP channel. 
However, this guarantee does not hold in all network environments, and 
SPARK-6962 seems to point to a case where only the server side of the 
connection detected a fault.

We should improve robustness of fetch/rpc requests by having an explicit 
timeout in the transport layer which closes the connection if there is a period 
of inactivity while there are outstanding requests.
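
For illustration only (a sketch of the mechanism described above, not the actual
Spark patch), stock Netty already provides the building block: an
IdleStateHandler that fires an event after a period of inactivity, at which
point the connection can be closed if requests are still outstanding. The
outstanding-request counter here is an assumption.

{code}
import java.util.concurrent.atomic.AtomicInteger
import io.netty.channel.{ChannelDuplexHandler, ChannelHandlerContext}
import io.netty.handler.timeout.{IdleState, IdleStateEvent, IdleStateHandler}

class RequestTimeoutHandler(outstandingRequests: AtomicInteger) extends ChannelDuplexHandler {
  override def userEventTriggered(ctx: ChannelHandlerContext, evt: AnyRef): Unit = evt match {
    case e: IdleStateEvent if e.state() == IdleState.ALL_IDLE =>
      // No traffic for the configured period; if requests are still outstanding,
      // assume the remote side is gone and fail fast instead of hanging forever.
      if (outstandingRequests.get() > 0) {
        ctx.close()
      }
    case _ =>
      ctx.fireUserEventTriggered(evt)
  }
}

// Pipeline wiring (sketch): fire an ALL_IDLE event after 120s of no reads/writes.
//   pipeline.addLast("idleStateHandler", new IdleStateHandler(0, 0, 120))
//   pipeline.addLast("requestTimeout", new RequestTimeoutHandler(outstanding))
{code}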






[jira] [Commented] (SPARK-6962) Netty BlockTransferService hangs in the middle of SQL query

2015-04-17 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14501063#comment-14501063
 ] 

Aaron Davidson commented on SPARK-6962:
---

I have a hypothesis that the above is caused by assumptions we make about the 
eventuality/symmetry of socket timeouts that are not guaranteed in arbitrary 
network topologies. If this is the case, though, then I would also expect nio 
to have intermittent failures, though it could at least recover from them.

A potential fix would be a timer thread in 
[TransportResponseHandler|https://github.com/apache/spark/blob/master/network/common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java]
 which would require that there is some message received every N seconds (the 
default network timeout in Spark is 120 seconds) as long as there is some 
outstanding request. This should be fairly robust due to our use of retries on 
IOExceptions in 
[RetryingBlockFetcher|https://github.com/apache/spark/blob/master/network/shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java].

> Netty BlockTransferService hangs in the middle of SQL query
> ---
>
> Key: SPARK-6962
> URL: https://issues.apache.org/jira/browse/SPARK-6962
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.2.0, 1.2.1, 1.3.0
>Reporter: Jon Chase
> Attachments: jstacks.txt
>
>
> Spark SQL queries (though this seems to be a Spark Core issue - I'm just 
> using queries in the REPL to surface this, so I mention Spark SQL) hang 
> indefinitely under certain (not totally understood) circumstances.  
> This is resolved by setting spark.shuffle.blockTransferService=nio, which 
> seems to point to netty as the issue.  Netty was set as the default for the 
> block transport layer in 1.2.0, which is when this issue started.  Setting 
> the service to nio allows queries to complete normally.
> I do not see this problem when running queries over smaller (~20 5MB files) 
> datasets.  When I increase the scope to include more data (several hundred 
> ~5MB files), the queries will get through several steps but eventually hang 
> indefinitely.
> Here's the email chain regarding this issue, including stack traces:
> http://mail-archives.apache.org/mod_mbox/spark-user/201503.mbox/
> For context, here's the announcement regarding the block transfer service 
> change: 
> http://mail-archives.apache.org/mod_mbox/spark-dev/201411.mbox/






[jira] [Commented] (SPARK-6962) Netty BlockTransferService hangs in the middle of SQL query

2015-04-17 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14501054#comment-14501054
 ] 

Aaron Davidson commented on SPARK-6962:
---

Thanks for those log excerpts. It is likely significant that each IP appeared 
exactly once in a connection exception among the executors. Given this warning, 
but no corresponding error "Still have X requests outstanding when connection 
from 10.106.143.39 is closed", I also would be inclined to deduce that only the 
TransportServer-side of the socket is timing out, and that for some reason the 
connection exception is not reaching the client side of the socket (which would 
have caused the outstanding fetch requests to fail promptly).

If this situation could arise, then each client could be waiting indefinitely 
for some other server to respond, which it will not. Is your cluster in any 
sort of unusual network configuration?

Even so, this only could explain why the hang is indefinite, not why all 
communication is paused for 20 minutes leading up to it.

To further diagnose this, it would actually be very useful if you could turn on 
TRACE level debugging for org.apache.spark.storage.ShuffleBlockFetcherIterator 
and org.apache.spark.network (this should look like 
{{log4j.logger.org.apache.spark.network=TRACE}} in the log4j.properties).
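
For reference, the corresponding log4j.properties entries would be along these 
lines:

{code}
log4j.logger.org.apache.spark.storage.ShuffleBlockFetcherIterator=TRACE
log4j.logger.org.apache.spark.network=TRACE
{code}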

> Netty BlockTransferService hangs in the middle of SQL query
> ---
>
> Key: SPARK-6962
> URL: https://issues.apache.org/jira/browse/SPARK-6962
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.2.0, 1.2.1, 1.3.0
>Reporter: Jon Chase
> Attachments: jstacks.txt
>
>
> Spark SQL queries (though this seems to be a Spark Core issue - I'm just 
> using queries in the REPL to surface this, so I mention Spark SQL) hang 
> indefinitely under certain (not totally understood) circumstances.  
> This is resolved by setting spark.shuffle.blockTransferService=nio, which 
> seems to point to netty as the issue.  Netty was set as the default for the 
> block transport layer in 1.2.0, which is when this issue started.  Setting 
> the service to nio allows queries to complete normally.
> I do not see this problem when running queries over smaller (~20 5MB files) 
> datasets.  When I increase the scope to include more data (several hundred 
> ~5MB files), the queries will get through several steps but eventually hang 
> indefinitely.
> Here's the email chain regarding this issue, including stack traces:
> http://mail-archives.apache.org/mod_mbox/spark-user/201503.mbox/
> For context, here's the announcement regarding the block transfer service 
> change: 
> http://mail-archives.apache.org/mod_mbox/spark-dev/201411.mbox/






[jira] [Commented] (SPARK-6962) Netty BlockTransferService hangs in the middle of SQL query

2015-04-17 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14499308#comment-14499308
 ] 

Aaron Davidson commented on SPARK-6962:
---

Executor logs in particular. Are all remaining tasks hanging, and on all 
different machines? Similar to what Patrick said, if there's an asymmetry across 
the machines, it could suggest that one has stopped responding and everyone is 
waiting on it. It's possible that only one Executor is behaving erratically, 
though it's also abnormal that the connection didn't just time out after some 
time and the task get retried.

> Netty BlockTransferService hangs in the middle of SQL query
> ---
>
> Key: SPARK-6962
> URL: https://issues.apache.org/jira/browse/SPARK-6962
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.2.0, 1.2.1, 1.3.0
>Reporter: Jon Chase
> Attachments: jstacks.txt
>
>
> Spark SQL queries (though this seems to be a Spark Core issue - I'm just 
> using queries in the REPL to surface this, so I mention Spark SQL) hang 
> indefinitely under certain (not totally understood) circumstances.  
> This is resolved by setting spark.shuffle.blockTransferService=nio, which 
> seems to point to netty as the issue.  Netty was set as the default for the 
> block transport layer in 1.2.0, which is when this issue started.  Setting 
> the service to nio allows queries to complete normally.
> I do not see this problem when running queries over smaller (~20 5MB files) 
> datasets.  When I increase the scope to include more data (several hundred 
> ~5MB files), the queries will get through several steps but eventually hang 
> indefinitely.
> Here's the email chain regarding this issue, including stack traces:
> http://mail-archives.apache.org/mod_mbox/spark-user/201503.mbox/
> For context, here's the announcement regarding the block transfer service 
> change: 
> http://mail-archives.apache.org/mod_mbox/spark-dev/201411.mbox/






[jira] [Commented] (SPARK-6962) Netty BlockTransferService hangs in the middle of SQL query

2015-04-16 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14498570#comment-14498570
 ] 

Aaron Davidson commented on SPARK-6962:
---

Renamed the JIRA to avoid having everyone with any sort of hanging-related 
problem thinking this is the cause.

Thanks for the info, and this seems like it could be a very serious issue. 
Would you mind additionally attaching some logs, or at least scanning through 
them for errors? This doesn't look like a synchronization-related issue, so I 
would expect some other exceptions to have caused the behavior.

> Netty BlockTransferService hangs in the middle of SQL query
> ---
>
> Key: SPARK-6962
> URL: https://issues.apache.org/jira/browse/SPARK-6962
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.2.0, 1.2.1, 1.3.0
>Reporter: Jon Chase
> Attachments: jstacks.txt
>
>
> Spark SQL queries (though this seems to be a Spark Core issue - I'm just 
> using queries in the REPL to surface this, so I mention Spark SQL) hang 
> indefinitely under certain (not totally understood) circumstances.  
> This is resolved by setting spark.shuffle.blockTransferService=nio, which 
> seems to point to netty as the issue.  Netty was set as the default for the 
> block transport layer in 1.2.0, which is when this issue started.  Setting 
> the service to nio allows queries to complete normally.
> I do not see this problem when running queries over smaller (~20 5MB files) 
> datasets.  When I increase the scope to include more data (several hundred 
> ~5MB files), the queries will get through several steps but eventually hang 
> indefinitely.
> Here's the email chain regarding this issue, including stack traces:
> http://mail-archives.apache.org/mod_mbox/spark-user/201503.mbox/
> For context, here's the announcement regarding the block transfer service 
> change: 
> http://mail-archives.apache.org/mod_mbox/spark-dev/201411.mbox/



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-6962) Netty BlockTransferService hangs in the middle of SQL query

2015-04-16 Thread Aaron Davidson (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-6962?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aaron Davidson updated SPARK-6962:
--
Summary: Netty BlockTransferService hangs in the middle of SQL query  (was: 
Spark gets stuck on a step, hangs forever - jobs do not complete)

> Netty BlockTransferService hangs in the middle of SQL query
> ---
>
> Key: SPARK-6962
> URL: https://issues.apache.org/jira/browse/SPARK-6962
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.2.0, 1.2.1, 1.3.0
>Reporter: Jon Chase
> Attachments: jstacks.txt
>
>
> Spark SQL queries (though this seems to be a Spark Core issue - I'm just 
> using queries in the REPL to surface this, so I mention Spark SQL) hang 
> indefinitely under certain (not totally understood) circumstances.  
> This is resolved by setting spark.shuffle.blockTransferService=nio, which 
> seems to point to netty as the issue.  Netty was set as the default for the 
> block transport layer in 1.2.0, which is when this issue started.  Setting 
> the service to nio allows queries to complete normally.
> I do not see this problem when running queries over smaller (~20 5MB files) 
> datasets.  When I increase the scope to include more data (several hundred 
> ~5MB files), the queries will get through several steps but eventually hang 
> indefinitely.
> Here's the email chain regarding this issue, including stack traces:
> http://mail-archives.apache.org/mod_mbox/spark-user/201503.mbox/
> For context, here's the announcement regarding the block transfer service 
> change: 
> http://mail-archives.apache.org/mod_mbox/spark-dev/201411.mbox/



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-6729) DriverQuirks get can get OutOfBounds exception in some cases

2015-04-06 Thread Aaron Davidson (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-6729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aaron Davidson updated SPARK-6729:
--
Assignee: Volodymyr Lyubinets

> DriverQuirks get can get OutOfBounds exception in some cases
> 
>
> Key: SPARK-6729
> URL: https://issues.apache.org/jira/browse/SPARK-6729
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Reporter: Volodymyr Lyubinets
>Assignee: Volodymyr Lyubinets
>Priority: Minor
> Fix For: 1.4.0
>
>
> The function uses .substring(0, X), which will trigger OutOfBoundsException 
> if string length is less than X. A better way to do this is to use 
> startsWith, which won't error out in this case. I'll propose a patch shortly.
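
For illustration, a hedged sketch of the startsWith approach; the JDBC prefixes and 
return values below are made-up placeholders, not the actual DriverQuirks table:

{code}
// Hypothetical sketch: prefix matching cannot throw, unlike url.substring(0, N)
def quirksFor(url: String): String = {
  if (url.startsWith("jdbc:mysql:")) "mysql"                // safe even for short urls
  else if (url.startsWith("jdbc:postgresql:")) "postgresql"
  else "no-op"
}
{code}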



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-6729) DriverQuirks get can get OutOfBounds exception in some cases

2015-04-06 Thread Aaron Davidson (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-6729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aaron Davidson resolved SPARK-6729.
---
   Resolution: Fixed
Fix Version/s: 1.4.0

> DriverQuirks get can get OutOfBounds exception in some cases
> 
>
> Key: SPARK-6729
> URL: https://issues.apache.org/jira/browse/SPARK-6729
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Reporter: Volodymyr Lyubinets
>Assignee: Volodymyr Lyubinets
>Priority: Minor
> Fix For: 1.4.0
>
>
> The function uses .substring(0, X), which will trigger OutOfBoundsException 
> if string length is less than X. A better way to do this is to use 
> startsWith, which won't error out in this case. I'll propose a patch shortly.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-6646) Spark 2.0: Rearchitecting Spark for Mobile Platforms

2015-04-01 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14390205#comment-14390205
 ] 

Aaron Davidson commented on SPARK-6646:
---

Please help, I tried putting spark on iphone but it ignited and now no phone.

> Spark 2.0: Rearchitecting Spark for Mobile Platforms
> 
>
> Key: SPARK-6646
> URL: https://issues.apache.org/jira/browse/SPARK-6646
> Project: Spark
>  Issue Type: Improvement
>  Components: Project Infra
>Reporter: Reynold Xin
>Assignee: Reynold Xin
>Priority: Blocker
> Attachments: Spark on Mobile - Design Doc - v1.pdf
>
>
> Mobile computing is quickly rising to dominance, and by the end of 2017, it 
> is estimated that 90% of CPU cycles will be devoted to mobile hardware. 
> Spark’s project goal can be accomplished only when Spark runs efficiently for 
> the growing population of mobile users.
> Designed and optimized for modern data centers and Big Data applications, 
> Spark is unfortunately not a good fit for mobile computing today. In the past 
> few months, we have been prototyping the feasibility of a mobile-first Spark 
> architecture, and today we would like to share with you our findings. This 
> ticket outlines the technical design of Spark’s mobile support, and shares 
> results from several early prototypes.
> Mobile friendly version of the design doc: 
> https://databricks.com/blog/2015/04/01/spark-2-rearchitecting-spark-for-mobile.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-6373) Add SSL/TLS for the Netty based BlockTransferService

2015-03-24 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14378084#comment-14378084
 ] 

Aaron Davidson commented on SPARK-6373:
---

From my brief look at your work, it seemed pretty close to what I was 
mentioning, and overall seemed to be in a very workable state (with tests!). I 
think it would be great if you could ready the patch for review, and hopefully 
it won't be too much more work from your end.

(Largely unrelated note, but on your patch, an auto-formatter seems to have run 
which increased the size of the diff; additionally, the linter seems to have 
been configured with an 80-char limit rather than Spark's 100-char.)

> Add SSL/TLS for the Netty based BlockTransferService 
> -
>
> Key: SPARK-6373
> URL: https://issues.apache.org/jira/browse/SPARK-6373
> Project: Spark
>  Issue Type: New Feature
>  Components: Block Manager, Shuffle
>Affects Versions: 1.2.1
>Reporter: Jeffrey Turpin
>
> Add the ability to allow for secure communications (SSL/TLS) for the Netty 
> based BlockTransferService and the ExternalShuffleClient. This ticket will 
> hopefully start the conversation around potential designs... Below is a 
> reference to a WIP prototype which implements this functionality 
> (prototype)... I have attempted to disrupt as little code as possible and 
> tried to follow the current code structure (for the most part) in the areas I 
> modified. I also studied how Hadoop achieves encrypted shuffle 
> (http://hadoop.apache.org/docs/current/hadoop-mapreduce-client/hadoop-mapreduce-client-core/EncryptedShuffle.html)
> https://github.com/turp1twin/spark/commit/024b559f27945eb63068d1badf7f82e4e7c3621c



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-6229) Support SASL encryption in network/common module

2015-03-22 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14375197#comment-14375197
 ] 

Aaron Davidson commented on SPARK-6229:
---

Two points:

1. network/common as written is intended to be independent of Spark; it's just 
a small, low-level transport layer which was intended to be easy to make stable 
and fast, while exposing a much narrower API than Netty. In this case, it *is* 
useful to have separation of concerns between the low-level transport and the 
application-level concerns like encryption or authentication, which depend at 
least partially on Spark's semantics (i.e., SecurityManager).

2. Even if this were not the case, it is useful to expose minimal APIs between 
components, for the sake of unit testing and clearly demarcating different 
components. For this alone I would favor having an EncryptionHandler of some 
sort rather than an "if (sasl) { pipeline.addLast(new SaslHandler()) }". 
Someone reading the code only has to understand what's needed by the high-level 
"encryption" API, rather than needing to know what SASL is or even having the 
option of reaching into the SASL internals. The separation is clearer.

These are why I would prefer to have a layer of indirection (a factory, some 
might say) between SASL or SSL and the TransportContext itself. I want to keep 
the TransportContext simple and easy to verify, without embedded notions of 
either SASL or SSL.

> Support SASL encryption in network/common module
> 
>
> Key: SPARK-6229
> URL: https://issues.apache.org/jira/browse/SPARK-6229
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Reporter: Marcelo Vanzin
>
> After SASL support has been added to network/common, supporting encryption 
> should be rather simple. Encryption is supported for DIGEST-MD5 and GSSAPI. 
> Since the latter requires a valid kerberos login to work (and so doesn't 
> really work with executors), encryption would require the use of DIGEST-MD5.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-6373) Add SSL/TLS for the Netty based BlockTransferService

2015-03-22 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14375186#comment-14375186
 ] 

Aaron Davidson commented on SPARK-6373:
---

Thanks for taking a look at this. The API and modifications needed for this are 
indeed similar to SPARK-6229, and so I'd like ideally to find a common way to 
support both of them. My high level proposal on that thread was to add an 
"EncryptionHandler" interface which is implemented by both SASL and SSL, which 
returns a Netty Handler which will be attached to an appropriate point in the 
pipeline. Additionally, the blocking part of the setup can be implemented using 
the TransportClientBootstrap mechanism, where either SSL or SASL simply adds a 
new bootstrap which waits for the Handler to enter the final state of the 
initialization.

We could additionally always add the ChunkedWriteHandler; I suspect the 
overhead is pretty minimal.

So from your perspective, would such an API be tenable? Looking at your 
implementation, it seems like we could avoid the different pipelines for Client 
vs Server if we make the above changes, and would take the SSLFactory via the 
TransportContext constructor, wrapped inside an EncryptionHandler. Otherwise, 
your implementation would remain the same, though.
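
For concreteness, a minimal Scala sketch of the shape being discussed; every name 
below is an illustrative placeholder, not an existing Spark or Netty-facing API:

{code}
import io.netty.channel.ChannelHandler

// Placeholder interface: one implementation could wrap an SSLEngine, another SASL.
trait EncryptionHandler {
  // Handlers to splice into the channel pipeline on initialization.
  def createChannelHandlers(): Seq[ChannelHandler]
  // Blocking client-side setup, run as a connection bootstrap before first use.
  def waitForHandshake(): Unit
}
{code}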

> Add SSL/TLS for the Netty based BlockTransferService 
> -
>
> Key: SPARK-6373
> URL: https://issues.apache.org/jira/browse/SPARK-6373
> Project: Spark
>  Issue Type: New Feature
>  Components: Block Manager, Shuffle
>Affects Versions: 1.2.1
>Reporter: Jeffrey Turpin
>
> Add the ability to allow for secure communications (SSL/TLS) for the Netty 
> based BlockTransferService and the ExternalShuffleClient. This ticket will 
> hopefully start the conversation around potential designs... Below is a 
> reference to a WIP prototype which implements this functionality 
> (prototype)... I have attempted to disrupt as little code as possible and 
> tried to follow the current code structure (for the most part) in the areas I 
> modified. I also studied how Hadoop achieves encrypted shuffle 
> (http://hadoop.apache.org/docs/current/hadoop-mapreduce-client/hadoop-mapreduce-client-core/EncryptedShuffle.html)
> https://github.com/turp1twin/spark/commit/024b559f27945eb63068d1badf7f82e4e7c3621c



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-6122) Upgrade Tachyon dependency to 0.6.0

2015-03-22 Thread Aaron Davidson (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-6122?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aaron Davidson resolved SPARK-6122.
---
Resolution: Fixed

> Upgrade Tachyon dependency to 0.6.0
> ---
>
> Key: SPARK-6122
> URL: https://issues.apache.org/jira/browse/SPARK-6122
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 1.2.1
>Reporter: Haoyuan Li
>Assignee: Calvin Jia
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-4012) Uncaught OOM in ContextCleaner

2015-03-18 Thread Aaron Davidson (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-4012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aaron Davidson resolved SPARK-4012.
---
   Resolution: Fixed
Fix Version/s: 1.4.0

> Uncaught OOM in ContextCleaner
> --
>
> Key: SPARK-4012
> URL: https://issues.apache.org/jira/browse/SPARK-4012
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Reporter: Nan Zhu
>Assignee: Nan Zhu
> Fix For: 1.4.0
>
>
> When running a "might-be-memory-intensive" application locally, I received 
> the following exception
> Exception: java.lang.OutOfMemoryError thrown from the 
> UncaughtExceptionHandler in thread "Spark Context Cleaner"
> Java HotSpot(TM) 64-Bit Server VM warning: Exception 
> java.lang.OutOfMemoryError occurred dispatching signal SIGINT to handler- the 
> VM may need to be forcibly terminated
> Exception: java.lang.OutOfMemoryError thrown from the 
> UncaughtExceptionHandler in thread "Driver Heartbeater"
> Java HotSpot(TM) 64-Bit Server VM warning: Exception 
> java.lang.OutOfMemoryError occurred dispatching signal SIGINT to handler- 
> the VM may need to be forcibly terminated
> Java HotSpot(TM) 64-Bit Server VM warning: Exception 
> java.lang.OutOfMemoryError occurred dispatching signal SIGINT to handler- the 
> VM may need to be forcibly terminated
> Java HotSpot(TM) 64-Bit Server VM warning: Exception 
> java.lang.OutOfMemoryError occurred dispatching signal SIGINT to handler- the 
> VM may need to be forcibly terminated
> Java HotSpot(TM) 64-Bit Server VM warning: Exception 
> java.lang.OutOfMemoryError occurred dispatching signal SIGINT to handler- the 
> VM may need to be forcibly terminated
> Java HotSpot(TM) 64-Bit Server VM warning: Exception 
> java.lang.OutOfMemoryError occurred dispatching signal SIGINT to handler- the 
> VM may need to be forcibly terminated
> Java HotSpot(TM) 64-Bit Server VM warning: Exception 
> java.lang.OutOfMemoryError occurred dispatching signal SIGINT to handler- the 
> VM may need to be forcibly terminated
> I looked at the code; we might want to call Utils.tryOrExit instead of 
> Utils.logUncaughtExceptions.
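
A hedged sketch of what the suggested change could look like; the thread setup below 
is illustrative and the cleaning-loop body is elided, not the actual ContextCleaner code:

{code}
import org.apache.spark.util.Utils

// Run the cleaning loop via Utils.tryOrExit so a fatal error (e.g. OutOfMemoryError)
// terminates the JVM instead of only being logged by the uncaught-exception handler.
val cleaningThread = new Thread("Spark Context Cleaner") {
  override def run(): Unit = Utils.tryOrExit {
    // ... existing cleaning loop ...
  }
}
cleaningThread.setDaemon(true)
cleaningThread.start()
{code}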



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-6330) newParquetRelation gets incorrect FileSystem

2015-03-16 Thread Aaron Davidson (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-6330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aaron Davidson updated SPARK-6330:
--
Fix Version/s: 1.4.0

> newParquetRelation gets incorrect FileSystem
> 
>
> Key: SPARK-6330
> URL: https://issues.apache.org/jira/browse/SPARK-6330
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.3.0
>Reporter: Volodymyr Lyubinets
>Assignee: Volodymyr Lyubinets
> Fix For: 1.4.0, 1.3.1
>
>
> Here's a snippet from newParquet.scala:
> def refresh(): Unit = {
>   val fs = FileSystem.get(sparkContext.hadoopConfiguration)
>   // Support either reading a collection of raw Parquet part-files, or a 
> collection of folders
>   // containing Parquet files (e.g. partitioned Parquet table).
>   val baseStatuses = paths.distinct.map { p =>
> val qualified = fs.makeQualified(new Path(p))
> if (!fs.exists(qualified) && maybeSchema.isDefined) {
>   fs.mkdirs(qualified)
>   prepareMetadata(qualified, maybeSchema.get, 
> sparkContext.hadoopConfiguration)
> }
> fs.getFileStatus(qualified)
>   }.toArray
> If we are running this locally and path points to S3, fs would be incorrect. 
> A fix is to construct fs for each file separately.
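
A hedged sketch of the per-path fix described above, resolving the FileSystem from 
each path rather than once from the default configuration (variable names follow the 
quoted snippet):

{code}
import org.apache.hadoop.fs.Path

def refresh(): Unit = {
  val hadoopConf = sparkContext.hadoopConfiguration
  val baseStatuses = paths.distinct.map { p =>
    val path = new Path(p)
    val fs = path.getFileSystem(hadoopConf)   // honors s3n://, hdfs://, file://, ...
    val qualified = fs.makeQualified(path)
    if (!fs.exists(qualified) && maybeSchema.isDefined) {
      fs.mkdirs(qualified)
      prepareMetadata(qualified, maybeSchema.get, hadoopConf)
    }
    fs.getFileStatus(qualified)
  }.toArray
}
{code}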



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-6330) newParquetRelation gets incorrect FileSystem

2015-03-16 Thread Aaron Davidson (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-6330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aaron Davidson resolved SPARK-6330.
---
   Resolution: Fixed
Fix Version/s: 1.3.1
 Assignee: Volodymyr Lyubinets

> newParquetRelation gets incorrect FileSystem
> 
>
> Key: SPARK-6330
> URL: https://issues.apache.org/jira/browse/SPARK-6330
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.3.0
>Reporter: Volodymyr Lyubinets
>Assignee: Volodymyr Lyubinets
> Fix For: 1.3.1
>
>
> Here's a snippet from newParquet.scala:
> def refresh(): Unit = {
>   val fs = FileSystem.get(sparkContext.hadoopConfiguration)
>   // Support either reading a collection of raw Parquet part-files, or a 
> collection of folders
>   // containing Parquet files (e.g. partitioned Parquet table).
>   val baseStatuses = paths.distinct.map { p =>
> val qualified = fs.makeQualified(new Path(p))
> if (!fs.exists(qualified) && maybeSchema.isDefined) {
>   fs.mkdirs(qualified)
>   prepareMetadata(qualified, maybeSchema.get, 
> sparkContext.hadoopConfiguration)
> }
> fs.getFileStatus(qualified)
>   }.toArray
> If we are running this locally and path points to S3, fs would be incorrect. 
> A fix is to construct fs for each file separately.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-6351) ParquetRelation2 does not support paths for different file systems

2015-03-16 Thread Aaron Davidson (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-6351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aaron Davidson resolved SPARK-6351.
---
   Resolution: Duplicate
Fix Version/s: 1.3.1
 Assignee: Volodymyr Lyubinets

> ParquetRelation2 does not support paths for different file systems
> --
>
> Key: SPARK-6351
> URL: https://issues.apache.org/jira/browse/SPARK-6351
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.3.0
>Reporter: Pei-Lun Lee
>Assignee: Volodymyr Lyubinets
>Priority: Critical
>  Labels: backport-needed
> Fix For: 1.3.1
>
>
> With new parquet API, if multiple parquet paths with different file systems 
> (e.g. s3n and hdfs) are given, the error "IllegalArgumentException: Wrong FS" 
> will be raised. Due to the FileSystem is created from hadoop configuration, 
> which only allow one default file system. This behavior was supposed to work 
> in old API.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-6229) Support encryption in network/common module

2015-03-13 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14360808#comment-14360808
 ] 

Aaron Davidson commented on SPARK-6229:
---

The reason we did not originally put the SASL stuff into the middle of the 
TransportContext was to keep the latter simple, allowing SASL to effectively be 
a plugin on top. This is pretty critical to the design of not pushing 
application-level concerns into the transport layer whenever possible.

That said, clearly encryption cannot work without another layer in the 
pipeline. My inclination would be to add a pluggable encryption handler as an 
argument to TransportContext. The interface could be something simple like:

{code}
trait EncryptionHandler {
  def createChannelHandlers(): List[ChannelHandler]
}
{code}

which are then attached appropriately to the pipeline on channel initialization.

In particular, in my imagination, a user would either want to use SSL 
encryption or SASL-based encryption, but not both, so making a single, 
optional, EncryptionHandler is sufficient. Second, a user may want to use SSL 
and SASL authentication together (maybe not, but I could imagine SSL only being 
used for encryption over the wire and SASL being used for more fine-grained 
access control), so ideally this could keep the SASL bootstrap, but the SASL 
encryption mechanism has a handle to the SASL RPC Handler and waits for it to 
say OK before beginning encryption.

If, however, there is no good use-case for SSL + SASL auth, then you could 
still do the refactoring you had in mind of putting the SASL authentication as 
part of the handler instead of a bootstrap.

> Support encryption in network/common module
> ---
>
> Key: SPARK-6229
> URL: https://issues.apache.org/jira/browse/SPARK-6229
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Reporter: Marcelo Vanzin
>
> After SASL support has been added to network/common, supporting encryption 
> should be rather simple. Encryption is supported for DIGEST-MD5 and GSSAPI. 
> Since the latter requires a valid kerberos login to work (and so doesn't 
> really work with executors), encryption would require the use of DIGEST-MD5.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2545) Add a diagnosis mode for closures to figure out what they're bringing in

2015-03-02 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14344502#comment-14344502
 ] 

Aaron Davidson commented on SPARK-2545:
---

Probably especially useful there, yes.

> Add a diagnosis mode for closures to figure out what they're bringing in
> 
>
> Key: SPARK-2545
> URL: https://issues.apache.org/jira/browse/SPARK-2545
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Reporter: Aaron Davidson
>
> Today, it's pretty hard to figure out why your closure is bigger than 
> expected, because it's not obvious what objects are being included or who is 
> including them. We should have some sort of diagnosis available to users with 
> very large closures that displays the contents of the closure.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-6056) Unlimit offHeap memory use cause RM killing the container

2015-02-28 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14341970#comment-14341970
 ] 

Aaron Davidson commented on SPARK-6056:
---

It's possible that it's actually the shuffle read that's doing the memory 
mapping -- please try setting spark.storage.memoryMapThreshold to around 
1073741824 (1 GB) to disable this form of memory mapping for the test.
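
For example, one way to apply this for a test run, assuming you build the context 
yourself (master and app name are placeholder values; the threshold is in bytes):

{code}
import org.apache.spark.{SparkConf, SparkContext}

// Raise the memory-map threshold to 1 GB so shuffle reads of smaller blocks use
// regular reads instead of mmap.
val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("mmap-threshold-test")
  .set("spark.storage.memoryMapThreshold", (1024L * 1024 * 1024).toString)
val sc = new SparkContext(conf)
{code}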

> Unlimit offHeap memory use cause RM killing the container
> -
>
> Key: SPARK-6056
> URL: https://issues.apache.org/jira/browse/SPARK-6056
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle, Spark Core
>Affects Versions: 1.2.1
>Reporter: SaintBacchus
>
> No matter whether `preferDirectBufs` is set or the number of threads is limited, 
> Spark cannot limit the use of off-heap memory.
> At line 269 of the class 'AbstractNioByteChannel' in netty-4.0.23.Final, Netty 
> allocates an off-heap memory buffer of the same size as the on-heap one.
> So however many buffers you want to transfer, the same amount of off-heap memory 
> will be allocated.
> But once the allocated memory reaches the overhead capacity set in YARN, the 
> executor will be killed.
> I wrote a simple piece of code to test it:
> {code:title=test.scala|borderStyle=solid}
> import org.apache.spark.storage._
> import org.apache.spark._
> val bufferRdd = sc.makeRDD(0 to 10, 10).map(x=>new 
> Array[Byte](10*1024*1024)).persist
> bufferRdd.count
> val part =  bufferRdd.partitions(0)
> val sparkEnv = SparkEnv.get
> val blockMgr = sparkEnv.blockManager
> def test = {
> val blockOption = blockMgr.get(RDDBlockId(bufferRdd.id, part.index))
> val resultIt = 
> blockOption.get.data.asInstanceOf[Iterator[Array[Byte]]]
> val len = resultIt.map(_.length).sum
> println(s"[${Thread.currentThread.getId}] get block length = $len")
> }
> def test_driver(count:Int, parallel:Int)(f: => Unit) = {
> val tpool = new scala.concurrent.forkjoin.ForkJoinPool(parallel)
> val taskSupport  = new 
> scala.collection.parallel.ForkJoinTaskSupport(tpool)
> val parseq = (1 to count).par
> parseq.tasksupport = taskSupport
> parseq.foreach(x=>f)
> tpool.shutdown
> tpool.awaitTermination(100, java.util.concurrent.TimeUnit.SECONDS)
> }
> {code}
> progress:
> 1. bin/spark-shell --master yarn-cilent --executor-cores 40 --num-executors 1
> 2. :load test.scala in spark-shell
> 3. use such comman to catch executor on slave node
> {code}
> pid=$(jps|grep CoarseGrainedExecutorBackend |awk '{print $1}');top -b -p 
> $pid|grep $pid
> {code}
> 4. test_driver(20,100)(test) in spark-shell
> 5. watch the output of the command on slave node
> If multiple threads call `test` to get the length, the physical memory will soon 
> exceed the limit set by spark.yarn.executor.memoryOverhead.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3889) JVM dies with SIGBUS, resulting in ConnectionManager failed ACK

2015-02-28 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14341962#comment-14341962
 ] 

Aaron Davidson commented on SPARK-3889:
---

This may be a new issue. I would open a new ticket, especially because the 
"ConnectionManager failed ACK" thing shouldn't be happening in 1.2.1; there 
should be different symptoms and perhaps a different cause as well.

A last ditch thing to try, by the way, is to up 
spark.storage.memoryMapThreshold to a very large number (e.g., 1 GB in bytes) 
and see if it still occurs -- if so, then please report more details about your 
workload and any other possible symptoms you see.

> JVM dies with SIGBUS, resulting in ConnectionManager failed ACK
> ---
>
> Key: SPARK-3889
> URL: https://issues.apache.org/jira/browse/SPARK-3889
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.2.0
>Reporter: Aaron Davidson
>Assignee: Aaron Davidson
>Priority: Critical
> Fix For: 1.2.0
>
>
> Here's the first part of the core dump, possibly caused by a job which 
> shuffles a lot of very small partitions.
> {code}
> #
> # A fatal error has been detected by the Java Runtime Environment:
> #
> #  SIGBUS (0x7) at pc=0x7fa5885fcdb0, pid=488, tid=140343502632704
> #
> # JRE version: 7.0_25-b30
> # Java VM: OpenJDK 64-Bit Server VM (23.7-b01 mixed mode linux-amd64 
> compressed oops)
> # Problematic frame:
> # v  ~StubRoutines::jbyte_disjoint_arraycopy
> #
> # Failed to write core dump. Core dumps have been disabled. To enable core 
> dumping, try "ulimit -c unlimited" before starting Java again
> #
> # If you would like to submit a bug report, please include
> # instructions on how to reproduce the bug and visit:
> #   https://bugs.launchpad.net/ubuntu/+source/openjdk-7/
> #
> ---  T H R E A D  ---
> Current thread (0x7fa4b0631000):  JavaThread "Executor task launch 
> worker-170" daemon [_thread_in_Java, id=6783, 
> stack(0x7fa4448ef000,0x7fa4449f)]
> siginfo:si_signo=SIGBUS: si_errno=0, si_code=2 (BUS_ADRERR), 
> si_addr=0x7fa428f79000
> {code}
> Here is the only useful content I can find related to JVM and SIGBUS from 
> Google: https://bugzilla.redhat.com/show_bug.cgi?format=multiple&id=976664
> It appears it may be related to disposing byte buffers, which we do in the 
> ConnectionManager -- we mmap shuffle files via ManagedBuffer and dispose of 
> them in BufferMessage.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Issue Comment Deleted] (SPARK-3889) JVM dies with SIGBUS, resulting in ConnectionManager failed ACK

2015-02-28 Thread Aaron Davidson (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-3889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aaron Davidson updated SPARK-3889:
--
Comment: was deleted

(was: The only place we memory map in 1.1 is this method: 
https://github.com/apache/spark/blob/branch-1.1/core/src/main/scala/org/apache/spark/storage/DiskStore.scala#L106

This threshold is configurable with "spark.storage.memoryMapThreshold" -- we 
upped the default from 2 KB to 2 MB in 1.2, which you could try here as well.)

> JVM dies with SIGBUS, resulting in ConnectionManager failed ACK
> ---
>
> Key: SPARK-3889
> URL: https://issues.apache.org/jira/browse/SPARK-3889
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.2.0
>Reporter: Aaron Davidson
>Assignee: Aaron Davidson
>Priority: Critical
> Fix For: 1.2.0
>
>
> Here's the first part of the core dump, possibly caused by a job which 
> shuffles a lot of very small partitions.
> {code}
> #
> # A fatal error has been detected by the Java Runtime Environment:
> #
> #  SIGBUS (0x7) at pc=0x7fa5885fcdb0, pid=488, tid=140343502632704
> #
> # JRE version: 7.0_25-b30
> # Java VM: OpenJDK 64-Bit Server VM (23.7-b01 mixed mode linux-amd64 
> compressed oops)
> # Problematic frame:
> # v  ~StubRoutines::jbyte_disjoint_arraycopy
> #
> # Failed to write core dump. Core dumps have been disabled. To enable core 
> dumping, try "ulimit -c unlimited" before starting Java again
> #
> # If you would like to submit a bug report, please include
> # instructions on how to reproduce the bug and visit:
> #   https://bugs.launchpad.net/ubuntu/+source/openjdk-7/
> #
> ---  T H R E A D  ---
> Current thread (0x7fa4b0631000):  JavaThread "Executor task launch 
> worker-170" daemon [_thread_in_Java, id=6783, 
> stack(0x7fa4448ef000,0x7fa4449f)]
> siginfo:si_signo=SIGBUS: si_errno=0, si_code=2 (BUS_ADRERR), 
> si_addr=0x7fa428f79000
> {code}
> Here is the only useful content I can find related to JVM and SIGBUS from 
> Google: https://bugzilla.redhat.com/show_bug.cgi?format=multiple&id=976664
> It appears it may be related to disposing byte buffers, which we do in the 
> ConnectionManager -- we mmap shuffle files via ManagedBuffer and dispose of 
> them in BufferMessage.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-6056) Unlimit offHeap memory use cause RM killing the container

2015-02-27 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14341344#comment-14341344
 ] 

Aaron Davidson commented on SPARK-6056:
---

This should already be the case:
https://github.com/apache/spark/blob/master/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java#L126
(allowDirectBufs is set to false based on preferDirectBufs 
[here|https://github.com/apache/spark/blob/2b9b72682e587909a84d3ace214c22cec830eeaf/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java#L105])

> Unlimit offHeap memory use cause RM killing the container
> -
>
> Key: SPARK-6056
> URL: https://issues.apache.org/jira/browse/SPARK-6056
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle, Spark Core
>Affects Versions: 1.2.1
>Reporter: SaintBacchus
>
> No matter whether `preferDirectBufs` is set or the number of threads is limited, 
> Spark cannot limit the use of off-heap memory.
> At line 269 of the class 'AbstractNioByteChannel' in netty-4.0.23.Final, Netty 
> allocates an off-heap memory buffer of the same size as the on-heap one.
> So however many buffers you want to transfer, the same amount of off-heap memory 
> will be allocated.
> But once the allocated memory reaches the overhead capacity set in YARN, the 
> executor will be killed.
> I wrote a simple piece of code to test it:
> ```scala
> val bufferRdd = sc.makeRDD(0 to 10, 10).map(x=>new 
> Array[Byte](10*1024*1024)).persist
> bufferRdd.count
> val part =  bufferRdd.partitions(0)
> val sparkEnv = SparkEnv.get
> val blockMgr = sparkEnv.blockManager
> val blockOption = blockMgr.get(RDDBlockId(bufferRdd.id, part.index))
> val resultIt = blockOption.get.data.asInstanceOf[Iterator[Array[Byte]]]
> val len = resultIt.map(_.length).sum
> ```
> If multiple threads fetch `len` concurrently, the physical memory will soon exceed 
> the limit set by spark.yarn.executor.memoryOverhead.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-6056) Unlimit offHeap memory use cause RM killing the container

2015-02-27 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14340858#comment-14340858
 ] 

Aaron Davidson commented on SPARK-6056:
---

Actually, that line just calls newDirectBuf, but the implementation of 
newDirectBuf only actually creates a new direct byte buffer if it's 
"cheap" (see 
[here|https://github.com/netty/netty/blob/netty-4.0.23.Final/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java#L397])
 -- i.e., if the current pool allows direct bufs or if there's a threadlocal 
one available already.

In our case, when preferDirectBufs is set to false, we set the number of direct 
arenas (Netty's DEFAULT_NUM_DIRECT_ARENA) to 0:
https://github.com/netty/netty/blob/netty-4.0.23.Final/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java#L76

When nDirectArena == 0, directArenas is set to null in PooledByteBufAllocator, 
which means that isDirectBufferPooled should return false.

So if everything is hooked up as it should be, the code looks like it should 
not do direct allocation. It's possible there's a bug somewhere in the path 
between nDirectArenas and newDirectBuf that causes this not to be the case, 
though.
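
For reference, a hedged sketch of how the settings discussed in this report would be 
applied (values are illustrative only):

{code}
import org.apache.spark.SparkConf

// Ask Spark's Netty transport not to prefer direct (off-heap) buffers, and give the
// YARN containers explicit overhead headroom while investigating.
val conf = new SparkConf()
  .set("spark.shuffle.io.preferDirectBufs", "false")
  .set("spark.yarn.executor.memoryOverhead", "1024")   // MB
{code}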

> Unlimit offHeap memory use cause RM killing the container
> -
>
> Key: SPARK-6056
> URL: https://issues.apache.org/jira/browse/SPARK-6056
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle, Spark Core
>Affects Versions: 1.2.1
>Reporter: SaintBacchus
>
> No matter whether `preferDirectBufs` is set or the number of threads is limited, 
> Spark cannot limit the use of off-heap memory.
> At line 269 of the class 'AbstractNioByteChannel' in netty-4.0.23.Final, Netty 
> allocates an off-heap memory buffer of the same size as the on-heap one.
> So however many buffers you want to transfer, the same amount of off-heap memory 
> will be allocated.
> But once the allocated memory reaches the overhead capacity set in YARN, the 
> executor will be killed.
> I wrote a simple piece of code to test it:
> ```scala
> val bufferRdd = sc.makeRDD(0 to 10, 10).map(x=>new 
> Array[Byte](10*1024*1024)).persist
> bufferRdd.count
> val part =  bufferRdd.partitions(0)
> val sparkEnv = SparkEnv.get
> val blockMgr = sparkEnv.blockManager
> val blockOption = blockMgr.get(RDDBlockId(bufferRdd.id, part.index))
> val resultIt = blockOption.get.data.asInstanceOf[Iterator[Array[Byte]]]
> val len = resultIt.map(_.length).sum
> ```
> If multiple threads fetch `len` concurrently, the physical memory will soon exceed 
> the limit set by spark.yarn.executor.memoryOverhead.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-6056) Unlimit offHeap memory use cause RM killing the container

2015-02-27 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14340834#comment-14340834
 ] 

Aaron Davidson commented on SPARK-6056:
---

cc [~rxin]

> Unlimit offHeap memory use cause RM killing the container
> -
>
> Key: SPARK-6056
> URL: https://issues.apache.org/jira/browse/SPARK-6056
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle, Spark Core
>Affects Versions: 1.2.1
>Reporter: SaintBacchus
>
> No matter whether `preferDirectBufs` is set or the number of threads is limited, 
> Spark cannot limit the use of off-heap memory.
> At line 269 of the class 'AbstractNioByteChannel' in netty-4.0.23.Final, Netty 
> allocates an off-heap memory buffer of the same size as the on-heap one.
> So however many buffers you want to transfer, the same amount of off-heap memory 
> will be allocated.
> But once the allocated memory reaches the overhead capacity set in YARN, the 
> executor will be killed.
> I wrote a simple piece of code to test it:
> ```scala
> val bufferRdd = sc.makeRDD(0 to 10, 10).map(x=>new 
> Array[Byte](10*1024*1024)).persist
> bufferRdd.count
> val part =  bufferRdd.partitions(0)
> val sparkEnv = SparkEnv.get
> val blockMgr = sparkEnv.blockManager
> val blockOption = blockMgr.get(RDDBlockId(bufferRdd.id, part.index))
> val resultIt = blockOption.get.data.asInstanceOf[Iterator[Array[Byte]]]
> val len = resultIt.map(_.length).sum
> ```
> If multiple threads fetch `len` concurrently, the physical memory will soon exceed 
> the limit set by spark.yarn.executor.memoryOverhead.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-6026) Eliminate the bypassMergeThreshold parameter and associated hash-ish shuffle within the Sort shuffle code

2015-02-25 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14338002#comment-14338002
 ] 

Aaron Davidson commented on SPARK-6026:
---

Interesting -- what was the setup in which you observed this behavior? Perhaps 
my intuition on the relative performance at small partitions is incorrect.

> Eliminate the bypassMergeThreshold parameter and associated hash-ish shuffle 
> within the Sort shuffle code
> -
>
> Key: SPARK-6026
> URL: https://issues.apache.org/jira/browse/SPARK-6026
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 1.3.0
>Reporter: Kay Ousterhout
>
> The bypassMergeThreshold parameter (and associated use of a hash-ish shuffle 
> when the number of partitions is less than this) is basically a workaround 
> for SparkSQL, because the fact that the sort-based shuffle stores 
> non-serialized objects is a deal-breaker for SparkSQL, which re-uses objects. 
>  Once the sort-based shuffle is changed to store serialized objects, we 
> should never be secretly doing hash-ish shuffle even when the user has 
> specified to use sort-based shuffle (because of its otherwise worse 
> performance).
> [~rxin][~adav], masters of shuffle, it would be helpful to get agreement from 
> you on this proposal (and also a sanity check that I've correctly 
> characterized the issue).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-5124) Standardize internal RPC interface

2015-02-25 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14337992#comment-14337992
 ] 

Aaron Davidson commented on SPARK-5124:
---

As Reynold mentioned, I am partial towards something like 
receiveAndReply(msg: Any, response: RpcResponseCallback). This makes the 
difference in parameters between the two more obvious (more so than having one 
take an Any and the other take an RpcMessage).
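
A hedged sketch of the signature shape being argued for; the trait and callback below 
are illustrative placeholders, not the final interface:

{code}
// Illustrative only: the reply-carrying variant takes an explicit callback, while the
// fire-and-forget variant takes just the message.
trait RpcResponseCallback {
  def onSuccess(response: Array[Byte]): Unit
  def onFailure(e: Throwable): Unit
}

trait RpcEndpoint {
  def receive(msg: Any): Unit
  def receiveAndReply(msg: Any, response: RpcResponseCallback): Unit
}
{code}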

> Standardize internal RPC interface
> --
>
> Key: SPARK-5124
> URL: https://issues.apache.org/jira/browse/SPARK-5124
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Reporter: Reynold Xin
>Assignee: Shixiong Zhu
> Attachments: Pluggable RPC - draft 1.pdf, Pluggable RPC - draft 2.pdf
>
>
> In Spark we use Akka as the RPC layer. It would be great if we can 
> standardize the internal RPC interface to facilitate testing. This will also 
> provide the foundation to try other RPC implementations in the future.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-6026) Eliminate the bypassMergeThreshold parameter and associated hash-ish shuffle within the Sort shuffle code

2015-02-25 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14337870#comment-14337870
 ] 

Aaron Davidson commented on SPARK-6026:
---

What is the main motivation for this change, out of curiosity? I do not think 
that the hash-ish shuffle is slower for a small number of partitions. 
(Simplifying the code is a good argument, though.)

> Eliminate the bypassMergeThreshold parameter and associated hash-ish shuffle 
> within the Sort shuffle code
> -
>
> Key: SPARK-6026
> URL: https://issues.apache.org/jira/browse/SPARK-6026
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 1.3.0
>Reporter: Kay Ousterhout
>
> The bypassMergeThreshold parameter (and associated use of a hash-ish shuffle 
> when the number of partitions is less than this) is basically a workaround 
> for SparkSQL, because the fact that the sort-based shuffle stores 
> non-serialized objects is a deal-breaker for SparkSQL, which re-uses objects. 
>  Once the sort-based shuffle is changed to store serialized objects, we 
> should never be secretly doing hash-ish shuffle even when the user has 
> specified to use sort-based shuffle (because of its otherwise worse 
> performance).
> [~rxin][~adav], masters of shuffle, it would be helpful to get agreement from 
> you on this proposal (and also a sanity check that I've correctly 
> characterized the issue).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-5124) Standardize internal RPC interface

2015-02-25 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14337745#comment-14337745
 ] 

Aaron Davidson commented on SPARK-5124:
---

[~zsxwing] For receiveAndReply, I think the intention is message.reply(), so it 
can be performed in a separate thread.

My comment on failure was so that exceptions can be relayed during 
sendWithReply() (this is natural anyway if sendWithReply returns a Future: it 
can just be completed with the exception).

> Standardize internal RPC interface
> --
>
> Key: SPARK-5124
> URL: https://issues.apache.org/jira/browse/SPARK-5124
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Reporter: Reynold Xin
>Assignee: Shixiong Zhu
> Attachments: Pluggable RPC - draft 1.pdf, Pluggable RPC - draft 2.pdf
>
>
> In Spark we use Akka as the RPC layer. It would be great if we can 
> standardize the internal RPC interface to facilitate testing. This will also 
> provide the foundation to try other RPC implementations in the future.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-5124) Standardize internal RPC interface

2015-02-25 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14336974#comment-14336974
 ] 

Aaron Davidson commented on SPARK-5124:
---

I tend to prefer having explicit message.reply() semantics over Akka's weird 
ask. This is how the Transport layer implements RPCs, for instance: 
https://github.com/apache/spark/blob/master/network/common/src/main/java/org/apache/spark/network/server/RpcHandler.java#L26

I might even supplement the message with a reply-with-failure method, so exceptions 
can be relayed.

I have not found this mechanism to be a bottleneck at many thousands of 
requests per second.

> Standardize internal RPC interface
> --
>
> Key: SPARK-5124
> URL: https://issues.apache.org/jira/browse/SPARK-5124
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Reporter: Reynold Xin
>Assignee: Shixiong Zhu
> Attachments: Pluggable RPC - draft 1.pdf, Pluggable RPC - draft 2.pdf
>
>
> In Spark we use Akka as the RPC layer. It would be great if we can 
> standardize the internal RPC interface to facilitate testing. This will also 
> provide the foundation to try other RPC implementations in the future.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3889) JVM dies with SIGBUS, resulting in ConnectionManager failed ACK

2015-02-10 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14314650#comment-14314650
 ] 

Aaron Davidson commented on SPARK-3889:
---

The only place we memory map in 1.1 is this method: 
https://github.com/apache/spark/blob/branch-1.1/core/src/main/scala/org/apache/spark/storage/DiskStore.scala#L106

This threshold is configurable with "spark.storage.memoryMapThreshold" -- we 
upped the default from 2 KB to 2 MB in 1.2, which you could try here as well.

> JVM dies with SIGBUS, resulting in ConnectionManager failed ACK
> ---
>
> Key: SPARK-3889
> URL: https://issues.apache.org/jira/browse/SPARK-3889
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.2.0
>Reporter: Aaron Davidson
>Assignee: Aaron Davidson
>Priority: Critical
> Fix For: 1.2.0
>
>
> Here's the first part of the core dump, possibly caused by a job which 
> shuffles a lot of very small partitions.
> {code}
> #
> # A fatal error has been detected by the Java Runtime Environment:
> #
> #  SIGBUS (0x7) at pc=0x7fa5885fcdb0, pid=488, tid=140343502632704
> #
> # JRE version: 7.0_25-b30
> # Java VM: OpenJDK 64-Bit Server VM (23.7-b01 mixed mode linux-amd64 
> compressed oops)
> # Problematic frame:
> # v  ~StubRoutines::jbyte_disjoint_arraycopy
> #
> # Failed to write core dump. Core dumps have been disabled. To enable core 
> dumping, try "ulimit -c unlimited" before starting Java again
> #
> # If you would like to submit a bug report, please include
> # instructions on how to reproduce the bug and visit:
> #   https://bugs.launchpad.net/ubuntu/+source/openjdk-7/
> #
> ---  T H R E A D  ---
> Current thread (0x7fa4b0631000):  JavaThread "Executor task launch 
> worker-170" daemon [_thread_in_Java, id=6783, 
> stack(0x7fa4448ef000,0x7fa4449f)]
> siginfo:si_signo=SIGBUS: si_errno=0, si_code=2 (BUS_ADRERR), 
> si_addr=0x7fa428f79000
> {code}
> Here is the only useful content I can find related to JVM and SIGBUS from 
> Google: https://bugzilla.redhat.com/show_bug.cgi?format=multiple&id=976664
> It appears it may be related to disposing byte buffers, which we do in the 
> ConnectionManager -- we mmap shuffle files via ManagedBuffer and dispose of 
> them in BufferMessage.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3889) JVM dies with SIGBUS, resulting in ConnectionManager failed ACK

2015-02-10 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14314651#comment-14314651
 ] 

Aaron Davidson commented on SPARK-3889:
---

The only place we memory map in 1.1 is this method: 
https://github.com/apache/spark/blob/branch-1.1/core/src/main/scala/org/apache/spark/storage/DiskStore.scala#L106

This threshold is configurable with "spark.storage.memoryMapThreshold" -- we 
upped the default from 2 KB to 2 MB in 1.2, which you could try here as well.

> JVM dies with SIGBUS, resulting in ConnectionManager failed ACK
> ---
>
> Key: SPARK-3889
> URL: https://issues.apache.org/jira/browse/SPARK-3889
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.2.0
>Reporter: Aaron Davidson
>Assignee: Aaron Davidson
>Priority: Critical
> Fix For: 1.2.0
>
>
> Here's the first part of the core dump, possibly caused by a job which 
> shuffles a lot of very small partitions.
> {code}
> #
> # A fatal error has been detected by the Java Runtime Environment:
> #
> #  SIGBUS (0x7) at pc=0x7fa5885fcdb0, pid=488, tid=140343502632704
> #
> # JRE version: 7.0_25-b30
> # Java VM: OpenJDK 64-Bit Server VM (23.7-b01 mixed mode linux-amd64 
> compressed oops)
> # Problematic frame:
> # v  ~StubRoutines::jbyte_disjoint_arraycopy
> #
> # Failed to write core dump. Core dumps have been disabled. To enable core 
> dumping, try "ulimit -c unlimited" before starting Java again
> #
> # If you would like to submit a bug report, please include
> # instructions on how to reproduce the bug and visit:
> #   https://bugs.launchpad.net/ubuntu/+source/openjdk-7/
> #
> ---  T H R E A D  ---
> Current thread (0x7fa4b0631000):  JavaThread "Executor task launch 
> worker-170" daemon [_thread_in_Java, id=6783, 
> stack(0x7fa4448ef000,0x7fa4449f)]
> siginfo:si_signo=SIGBUS: si_errno=0, si_code=2 (BUS_ADRERR), 
> si_addr=0x7fa428f79000
> {code}
> Here is the only useful content I can find related to JVM and SIGBUS from 
> Google: https://bugzilla.redhat.com/show_bug.cgi?format=multiple&id=976664
> It appears it may be related to disposing byte buffers, which we do in the 
> ConnectionManager -- we mmap shuffle files via ManagedBuffer and dispose of 
> them in BufferMessage.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2958) FileClientHandler should not be shared in the pipeline

2015-02-08 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2958?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14311796#comment-14311796
 ] 

Aaron Davidson commented on SPARK-2958:
---

This class no longer exists (I think we removed the old netty-based code path), 
so I think we can close this.

> FileClientHandler should not be shared in the pipeline
> --
>
> Key: SPARK-2958
> URL: https://issues.apache.org/jira/browse/SPARK-2958
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Reporter: Reynold Xin
>
> Netty module creates a single FileClientHandler and shares it in all threads. 
> We should create a new one for each pipeline thread.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Closed] (SPARK-2958) FileClientHandler should not be shared in the pipeline

2015-02-08 Thread Aaron Davidson (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-2958?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aaron Davidson closed SPARK-2958.
-
Resolution: Done

> FileClientHandler should not be shared in the pipeline
> --
>
> Key: SPARK-2958
> URL: https://issues.apache.org/jira/browse/SPARK-2958
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Reporter: Reynold Xin
>
> Netty module creates a single FileClientHandler and shares it in all threads. 
> We should create a new one for each pipeline thread.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-1180) Allow to provide a custom persistence engine

2015-02-01 Thread Aaron Davidson (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-1180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aaron Davidson resolved SPARK-1180.
---
   Resolution: Duplicate
Fix Version/s: 1.3.0
 Assignee: Prashant Sharma

> Allow to provide a custom persistence engine
> 
>
> Key: SPARK-1180
> URL: https://issues.apache.org/jira/browse/SPARK-1180
> Project: Spark
>  Issue Type: Improvement
>Reporter: Jacek Lewandowski
>Assignee: Prashant Sharma
>Priority: Minor
> Fix For: 1.3.0
>
>
> Currently Spark supports only the predefined ZOOKEEPER and FILESYSTEM persistence 
> engines. It would be nice to allow providing a custom persistence engine by 
> specifying its class name in {{spark.deploy.recoveryMode}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-5073) "spark.storage.memoryMapThreshold" has two default values

2015-01-11 Thread Aaron Davidson (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-5073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aaron Davidson resolved SPARK-5073.
---
Resolution: Fixed

> "spark.storage.memoryMapThreshold" has two default values
> -
>
> Key: SPARK-5073
> URL: https://issues.apache.org/jira/browse/SPARK-5073
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.2.0
>Reporter: Jianhui Yuan
>Priority: Minor
>
> In org.apache.spark.storage.DiskStore:
> {code}
> val minMemoryMapBytes =
>   blockManager.conf.getLong("spark.storage.memoryMapThreshold", 2 * 4096L)
> {code}
> In org.apache.spark.network.util.TransportConf:
> {code}
> public int memoryMapBytes() {
>   return conf.getInt("spark.storage.memoryMapThreshold", 2 * 1024 * 1024);
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-4868) Twitter DStream.map() throws "Task not serializable"

2014-12-16 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14249525#comment-14249525
 ] 

Aaron Davidson commented on SPARK-4868:
---

The repl has some funny namespacing going on. What happens if you put your no-op 
function inside of an object, a la
{code}
object Test {
  def noop(a: Any) = {}
}
{code}
and then invoke it statically? Make sure to restart the repl between 
attempts to avoid polluted namespaces as well.

If that doesn't work you can always mark the streaming context as transient, 
but that never feels quite right.
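
For concreteness, a minimal sketch of the statically-invoked suggestion, building on 
the code from the description (the identity-returning {{noop}} mirrors the reporter's 
version; {{Test}} is just an illustrative name):

{code}
object Test {
  // Defined inside a stable top-level object, so the map closure only captures
  // the Test singleton rather than the repl's line-wrapper objects.
  def noop(a: Any): Any = a
}

// ssc, getAuth() and the imports come from the snippet in the description below.
val liveTweets = TwitterUtils.createStream(ssc, getAuth()).map(_.getText)
liveTweets.map(t => Test.noop(t)).print()
{code}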

> Twitter DStream.map() throws "Task not serializable"
> 
>
> Key: SPARK-4868
> URL: https://issues.apache.org/jira/browse/SPARK-4868
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Shell, Streaming
>Affects Versions: 1.1.1
> Environment: * Spark 1.1.1
> * EC2 cluster with 1 slave spun up using {{spark-ec2}}
> * twitter4j 3.0.3
> * {{spark-shell}} called with {{--jars}} argument to load 
> {{spark-streaming-twitter_2.10-1.0.0.jar}} as well as all the twitter4j jars.
>Reporter: Nicholas Chammas
>Priority: Minor
>
> _(Continuing the discussion [started here on the Spark user 
> list|http://apache-spark-user-list.1001560.n3.nabble.com/NotSerializableException-in-Spark-Streaming-td5725.html].)_
> The following Spark Streaming code throws a serialization exception I do not 
> understand.
> {code}
> import twitter4j.auth.{Authorization, OAuthAuthorization}
> import twitter4j.conf.ConfigurationBuilder 
> import org.apache.spark.streaming.{Seconds, Minutes, StreamingContext}
> import org.apache.spark.streaming.twitter.TwitterUtils
> def getAuth(): Option[Authorization] = {
>   System.setProperty("twitter4j.oauth.consumerKey", "consumerKey")
>   System.setProperty("twitter4j.oauth.consumerSecret", "consumerSecret")
>   System.setProperty("twitter4j.oauth.accessToken", "accessToken") 
>   System.setProperty("twitter4j.oauth.accessTokenSecret", "accessTokenSecret")
>   Some(new OAuthAuthorization(new ConfigurationBuilder().build()))
> } 
> def noop(a: Any): Any = {
>   a
> }
> val ssc = new StreamingContext(sc, Seconds(5))
> val liveTweetObjects = TwitterUtils.createStream(ssc, getAuth())
> val liveTweets = liveTweetObjects.map(_.getText)
> liveTweets.map(t => noop(t)).print()  // exception here
> ssc.start()
> {code}
> So before I even start the StreamingContext, I get the following stack trace:
> {code}
> scala> liveTweets.map(t => noop(t)).print()
> org.apache.spark.SparkException: Task not serializable
>   at 
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
>   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>   at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
>   at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:438)
>   at $iwC$$iwC$$iwC$$iwC.<init>(<console>:27)
>   at $iwC$$iwC$$iwC.<init>(<console>:32)
>   at $iwC$$iwC.<init>(<console>:34)
>   at $iwC.<init>(<console>:36)
>   at <init>(<console>:38)
>   at .<init>(<console>:42)
>   at .<clinit>(<console>)
>   at .<init>(<console>:7)
>   at .<clinit>(<console>)
>   at $print(<console>)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
>   at 
> org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
>   at 
> org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
>   at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
>   at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
>   at 
> org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:823)
>   at 
> org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:868)
>   at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:780)
>   at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:625)
>   at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:633)
>   at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:638)
>   at 
> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:963)
>   at 
> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:911)
>   at 
> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:911)
>   at 
> scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
>   at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:911)
>   at org.apache.spark.repl.SparkILoo

[jira] [Created] (SPARK-4864) Add documentation to Netty-based configs

2014-12-16 Thread Aaron Davidson (JIRA)
Aaron Davidson created SPARK-4864:
-

 Summary: Add documentation to Netty-based configs
 Key: SPARK-4864
 URL: https://issues.apache.org/jira/browse/SPARK-4864
 Project: Spark
  Issue Type: Bug
  Components: Documentation
Reporter: Aaron Davidson
Assignee: Aaron Davidson


Currently there is no public documentation for the NettyBlockTransferService or 
various configuration options of the network package. We should add some.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-4837) NettyBlockTransferService does not abide by spark.blockManager.port config option

2014-12-12 Thread Aaron Davidson (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-4837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aaron Davidson updated SPARK-4837:
--
Summary: NettyBlockTransferService does not abide by 
spark.blockManager.port config option  (was: NettyBlockTransferService does not 
abide by spark.blockManager.port config optino)

> NettyBlockTransferService does not abide by spark.blockManager.port config 
> option
> -
>
> Key: SPARK-4837
> URL: https://issues.apache.org/jira/browse/SPARK-4837
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.2.0
>Reporter: Aaron Davidson
>Assignee: Aaron Davidson
>Priority: Critical
>
> The NettyBlockTransferService always binds to a random port, and does not use 
> the spark.blockManager.port config as specified.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-4837) NettyBlockTransferService does not abide by spark.blockManager.port config optino

2014-12-12 Thread Aaron Davidson (JIRA)
Aaron Davidson created SPARK-4837:
-

 Summary: NettyBlockTransferService does not abide by 
spark.blockManager.port config optino
 Key: SPARK-4837
 URL: https://issues.apache.org/jira/browse/SPARK-4837
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.2.0
Reporter: Aaron Davidson
Assignee: Aaron Davidson
Priority: Critical


The NettyBlockTransferService always binds to a random port, and does not use 
the spark.blockManager.port config as specified.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-4740) Netty's network throughput is about 1/2 of NIO's in spark-perf sortByKey

2014-12-12 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14244710#comment-14244710
 ] 

Aaron Davidson commented on SPARK-4740:
---

The thing is, the decision about which IO requests to make is made at a higher 
level -- individual blocks are requested by the ShuffleBlockFetcherIterator -- so 
it does not change based on Netty vs NIO. The only difference that I can think 
of is the concurrency of the requests: in Netty we make only 1 concurrent 
request per machine in the cluster (so in the 4-node case, 3 concurrent 
requests to disk), while in NIO we saw 20 concurrent threads reading from disk 
in the same environment. It's possible that the disk controllers handle the 
increased request parallelism by correctly merging IO requests at the hardware 
level.

To this end, we added the numConnectionsPerPeer option to allow users 
to make multiple concurrent requests per machine. However, testing on the HDD 
cluster did not seem to show that this resolved the problem. I would be 
curious what the results would be for the powerful-CPU test with 
numConnectionsPerPeer set to around 6 (which should enable up to 18 concurrent 
requests from disk, similar to the 20 observed with NIO).
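
For reference, a sketch of how that test could be configured when constructing the 
context (assuming the option is exposed as {{spark.shuffle.io.numConnectionsPerPeer}}; 
the app name is illustrative):

{code}
import org.apache.spark.{SparkConf, SparkContext}

// Sketch only: allow up to 6 shuffle connections per remote executor, so the
// 3 remote peers in the 4-node cluster can serve ~18 concurrent disk reads.
val conf = new SparkConf()
  .setAppName("sort-by-key-netty")
  .set("spark.shuffle.blockTransferService", "netty")
  .set("spark.shuffle.io.numConnectionsPerPeer", "6")
val sc = new SparkContext(conf)
{code}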

Another possibility is that while the requests in both the Netty and NIO cases 
are the same, we receive all the IO requests from a single reduce task before 
moving on to the next. Two reduce tasks are going to read through all the local 
files, so if we asked the disk to read, say, 10 KB from 200 files, and 
subsequently did another sweep of 10 KB from the same 200 files, this could be 
less efficient than if we interleaved the requests such that we ask for 10 KB 
from file0 for reduce0, then 10 KB from file0 for reduce1, then move on to 
file1. It is possible that somehow NIO is interleaving these requests more 
naturally than Netty, though this problem is not fundamental to either system 
and is probably a happy coincidence if it is the case. We could try, for 
instance, handling the responses to block requests on a different event loop 
than the one receiving the requests, and randomly choosing from any buffered 
requests to serve IO.

> Netty's network throughput is about 1/2 of NIO's in spark-perf sortByKey
> 
>
> Key: SPARK-4740
> URL: https://issues.apache.org/jira/browse/SPARK-4740
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 1.2.0
>Reporter: Zhang, Liye
>Assignee: Reynold Xin
> Attachments: (rxin patch better executor)TestRunner  sort-by-key - 
> Thread dump for executor 3_files.zip, (rxin patch normal executor)TestRunner  
> sort-by-key - Thread dump for executor 0 _files.zip, Spark-perf Test Report 
> 16 Cores per Executor.pdf, Spark-perf Test Report.pdf, TestRunner  
> sort-by-key - Thread dump for executor 1_files (Netty-48 Cores per node).zip, 
> TestRunner  sort-by-key - Thread dump for executor 1_files (Nio-48 cores per 
> node).zip, rxin_patch-on_4_node_cluster_48CoresPerNode(Unbalance).7z
>
>
> When testing current spark master (1.3.0-snapshot) with spark-perf 
> (sort-by-key, aggregate-by-key, etc), Netty based shuffle transferService 
> takes much longer time than NIO based shuffle transferService. The network 
> throughput of Netty is only about half of that of NIO. 
> We tested with standalone mode, and the data set we used for test is 20 
> billion records, and the total size is about 400GB. Spark-perf test is 
> Running on a 4 node cluster with 10G NIC, 48 cpu cores per node and each 
> executor memory is 64GB. The reduce tasks number is set to 1000. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-4740) Netty's network throughput is about 1/2 of NIO's in spark-perf sortByKey

2014-12-10 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14241463#comment-14241463
 ] 

Aaron Davidson commented on SPARK-4740:
---

Clarification: The merged version of Reynold's patch has connectionsPerPeer set 
to 1, since we could not demonstrate a significant improvement with other 
values. In your test with HDDs, did you have it set, or was it using the 
default value?

> Netty's network throughput is about 1/2 of NIO's in spark-perf sortByKey
> 
>
> Key: SPARK-4740
> URL: https://issues.apache.org/jira/browse/SPARK-4740
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 1.2.0
>Reporter: Zhang, Liye
>Assignee: Reynold Xin
> Attachments: (rxin patch better executor)TestRunner  sort-by-key - 
> Thread dump for executor 3_files.zip, (rxin patch normal executor)TestRunner  
> sort-by-key - Thread dump for executor 0 _files.zip, Spark-perf Test Report 
> 16 Cores per Executor.pdf, Spark-perf Test Report.pdf, TestRunner  
> sort-by-key - Thread dump for executor 1_files (Netty-48 Cores per node).zip, 
> TestRunner  sort-by-key - Thread dump for executor 1_files (Nio-48 cores per 
> node).zip, rxin_patch-on_4_node_cluster_48CoresPerNode(Unbalance).7z
>
>
> When testing current spark master (1.3.0-snapshot) with spark-perf 
> (sort-by-key, aggregate-by-key, etc), Netty based shuffle transferService 
> takes much longer time than NIO based shuffle transferService. The network 
> throughput of Netty is only about half of that of NIO. 
> We tested with standalone mode, and the data set we used for test is 20 
> billion records, and the total size is about 400GB. Spark-perf test is 
> Running on a 4 node cluster with 10G NIC, 48 cpu cores per node and each 
> executor memory is 64GB. The reduce tasks number is set to 1000. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-4740) Netty's network throughput is about 1/2 of NIO's in spark-perf sortByKey

2014-12-09 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14240704#comment-14240704
 ] 

Aaron Davidson commented on SPARK-4740:
---

We had 4 i2.8xlarge executors each with 32 vCPUs, 244 GB RAM (64 GB executor 
memory), 8x800 GB SSD (~400MB/s max throughput each), 10GigE.
Our driver was an i2.xlarge with 4 vCPUs, 30.5 GB RAM (20 GB driver memory), 
1x800 GB SSD, 10GigE.

Ran master spark-perf and spark with a modification to allow it to take a Long for 
the number of records; no change in base configuration except netty vs nio and 
numConnectionsPerPeer when specified above.

> Netty's network throughput is about 1/2 of NIO's in spark-perf sortByKey
> 
>
> Key: SPARK-4740
> URL: https://issues.apache.org/jira/browse/SPARK-4740
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 1.2.0
>Reporter: Zhang, Liye
>Assignee: Reynold Xin
> Attachments: (rxin patch better executor)TestRunner  sort-by-key - 
> Thread dump for executor 3_files.zip, (rxin patch normal executor)TestRunner  
> sort-by-key - Thread dump for executor 0 _files.zip, Spark-perf Test Report 
> 16 Cores per Executor.pdf, Spark-perf Test Report.pdf, TestRunner  
> sort-by-key - Thread dump for executor 1_files (Netty-48 Cores per node).zip, 
> TestRunner  sort-by-key - Thread dump for executor 1_files (Nio-48 cores per 
> node).zip, rxin_patch-on_4_node_cluster_48CoresPerNode(Unbalance).7z
>
>
> When testing current spark master (1.3.0-snapshot) with spark-perf 
> (sort-by-key, aggregate-by-key, etc), Netty based shuffle transferService 
> takes much longer time than NIO based shuffle transferService. The network 
> throughput of Netty is only about half of that of NIO. 
> We tested with standalone mode, and the data set we used for test is 20 
> billion records, and the total size is about 400GB. Spark-perf test is 
> Running on a 4 node cluster with 10G NIC, 48 cpu cores per node and each 
> executor memory is 64GB. The reduce tasks number is set to 1000. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-4740) Netty's network throughput is about 1/2 of NIO's in spark-perf sortByKey

2014-12-09 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14240579#comment-14240579
 ] 

Aaron Davidson edited comment on SPARK-4740 at 12/10/14 3:22 AM:
-

I spent another day trying to reproduce the issue. Unfortunately, I was again 
unable to reproduce it, even without Reynold's patch in play.

NIO: 35 minutes in reduce
Netty: 34 minutes in reduce

I ended up getting 10GigE, but it was not utilized, as the cores were maxed out 
with only around 40MB/s of read throughput. I realized at the end of testing 
that the SSDs were not being used because we had 160 GB of buffer cache outside 
of the JVMs which was able to cache all the data. I then disabled use of the 
majority of the buffer cache (down to 30GB outside of the JVM) and saw the 
above number of 40 MB/s throughput from the SSDs. Performance did not decrease 
significantly when we switched from buffer cache to SSDs, which is not 
surprising given the relatively low throughput requirements.

The issue may be prominent when there is sufficient CPU capacity to exceed a 
few disks' worth of throughput, but that was not nearly the case here; on 
the other hand, you guys are using HDDs and more cores. Unfortunately, no EC2 
instance type has 8 HDDs except hs1, which has 24 but only 16 cores, and it may 
take significantly more time to get a spark cluster started on that since we 
have not tested the scripts on the hs1 instance type.

I am going to downgrade this from blocker to critical for now. We should still 
try to diagnose and fix the issue, but the final fix may go into 1.2.1 instead 
of blocking 1.2 from being released. We will feature a notice prominently in 
the release notes that netty may cause a regression for certain cluster 
configurations.

By the way, I also ran the full data set with 2 connections per peer, but I did 
not see any sort of the strange asymmetry that you guys reported from your 
cluster. All machines ran at around the same rate, completing very nearly the 
same number of tasks each time.


was (Author: ilikerps):
I spent another day trying to reproduce the issue. Unfortunately, I was again 
unable to reproduce it, even without Reynold's patch in play.

NIO: 35 minutes in reduce
Netty: 34 minutes in reduce

40 minutes

> Netty's network throughput is about 1/2 of NIO's in spark-perf sortByKey
> 
>
> Key: SPARK-4740
> URL: https://issues.apache.org/jira/browse/SPARK-4740
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 1.2.0
>Reporter: Zhang, Liye
>Assignee: Reynold Xin
> Attachments: (rxin patch better executor)TestRunner  sort-by-key - 
> Thread dump for executor 3_files.zip, (rxin patch normal executor)TestRunner  
> sort-by-key - Thread dump for executor 0 _files.zip, Spark-perf Test Report 
> 16 Cores per Executor.pdf, Spark-perf Test Report.pdf, TestRunner  
> sort-by-key - Thread dump for executor 1_files (Netty-48 Cores per node).zip, 
> TestRunner  sort-by-key - Thread dump for executor 1_files (Nio-48 cores per 
> node).zip, rxin_patch-on_4_node_cluster_48CoresPerNode(Unbalance).7z
>
>
> When testing current spark master (1.3.0-snapshot) with spark-perf 
> (sort-by-key, aggregate-by-key, etc), Netty based shuffle transferService 
> takes much longer time than NIO based shuffle transferService. The network 
> throughput of Netty is only about half of that of NIO. 
> We tested with standalone mode, and the data set we used for test is 20 
> billion records, and the total size is about 400GB. Spark-perf test is 
> Running on a 4 node cluster with 10G NIC, 48 cpu cores per node and each 
> executor memory is 64GB. The reduce tasks number is set to 1000. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-4740) Netty's network throughput is about 1/2 of NIO's in spark-perf sortByKey

2014-12-09 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14240579#comment-14240579
 ] 

Aaron Davidson commented on SPARK-4740:
---

I spent another day trying to reproduce the issue. Unfortunately, I was again 
unable to reproduce it, even without Reynold's patch in play.

NIO: 35 minutes in reduce
Netty: 34 minutes in reduce

40 minutes

> Netty's network throughput is about 1/2 of NIO's in spark-perf sortByKey
> 
>
> Key: SPARK-4740
> URL: https://issues.apache.org/jira/browse/SPARK-4740
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 1.2.0
>Reporter: Zhang, Liye
>Assignee: Reynold Xin
>Priority: Blocker
> Attachments: (rxin patch better executor)TestRunner  sort-by-key - 
> Thread dump for executor 3_files.zip, (rxin patch normal executor)TestRunner  
> sort-by-key - Thread dump for executor 0 _files.zip, Spark-perf Test Report 
> 16 Cores per Executor.pdf, Spark-perf Test Report.pdf, TestRunner  
> sort-by-key - Thread dump for executor 1_files (Netty-48 Cores per node).zip, 
> TestRunner  sort-by-key - Thread dump for executor 1_files (Nio-48 cores per 
> node).zip, rxin_patch-on_4_node_cluster_48CoresPerNode(Unbalance).7z
>
>
> When testing current spark master (1.3.0-snapshot) with spark-perf 
> (sort-by-key, aggregate-by-key, etc), Netty based shuffle transferService 
> takes much longer time than NIO based shuffle transferService. The network 
> throughput of Netty is only about half of that of NIO. 
> We tested with standalone mode, and the data set we used for test is 20 
> billion records, and the total size is about 400GB. Spark-perf test is 
> Running on a 4 node cluster with 10G NIC, 48 cpu cores per node and each 
> executor memory is 64GB. The reduce tasks number is set to 1000. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-4805) BlockTransferMessage.toByteArray() trips assertion

2014-12-09 Thread Aaron Davidson (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-4805?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aaron Davidson resolved SPARK-4805.
---
Resolution: Fixed
  Assignee: Sean Owen

> BlockTransferMessage.toByteArray() trips assertion
> --
>
> Key: SPARK-4805
> URL: https://issues.apache.org/jira/browse/SPARK-4805
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 1.1.0
>Reporter: Sean Owen
>Assignee: Sean Owen
>
> The {{ByteBuf}} below is allocated to have enough room to encode just the 
> message, but a type byte is written too. The buffer resizes and is left with 
> capacity, which is a minor waste. But when assertions are on, it fails the 
> assertion below. This is triggered by unit tests, but the Java test for this 
> wasn't running. You can verify that a simple "+1" in the allocated size fixes 
> it.
> {code}
>   public byte[] toByteArray() {
> ByteBuf buf = Unpooled.buffer(encodedLength());
> buf.writeByte(type().id);
> encode(buf);
> assert buf.writableBytes() == 0 : "Writable bytes remain: " + 
> buf.writableBytes();
> return buf.array();
>   }
> {code}
> [~ilikerps]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-4740) Netty's network throughput is about 1/2 of NIO's in spark-perf sortByKey

2014-12-08 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14239102#comment-14239102
 ] 

Aaron Davidson commented on SPARK-4740:
---

I tried to reproduce this on an EC2 cluster of 4 i2.8xlarge instances. These have 32 
vCPUs and 8 SSDs, but my interconnect is probably only 1GigE. I ran with the same 
parameters, except 10x fewer values, to iterate more quickly. Unfortunately, I 
was unable to reproduce the issue -- Netty ran with all nodes at the same speed 
and slightly faster than NIO.

However, looking at your graphs, the 1GigE may be the bottleneck here, so 
tomorrow morning I will try to run with 10GigE and the normal data size.

> Netty's network throughput is about 1/2 of NIO's in spark-perf sortByKey
> 
>
> Key: SPARK-4740
> URL: https://issues.apache.org/jira/browse/SPARK-4740
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 1.2.0
>Reporter: Zhang, Liye
>Assignee: Reynold Xin
>Priority: Blocker
> Attachments: (rxin patch better executor)TestRunner  sort-by-key - 
> Thread dump for executor 3_files.zip, (rxin patch normal executor)TestRunner  
> sort-by-key - Thread dump for executor 0 _files.zip, Spark-perf Test Report 
> 16 Cores per Executor.pdf, Spark-perf Test Report.pdf, TestRunner  
> sort-by-key - Thread dump for executor 1_files (Netty-48 Cores per node).zip, 
> TestRunner  sort-by-key - Thread dump for executor 1_files (Nio-48 cores per 
> node).zip, rxin_patch-on_4_node_cluster_48CoresPerNode(Unbalance).7z
>
>
> When testing current spark master (1.3.0-snapshot) with spark-perf 
> (sort-by-key, aggregate-by-key, etc), Netty based shuffle transferService 
> takes much longer time than NIO based shuffle transferService. The network 
> throughput of Netty is only about half of that of NIO. 
> We tested with standalone mode, and the data set we used for test is 20 
> billion records, and the total size is about 400GB. Spark-perf test is 
> Running on a 4 node cluster with 10G NIC, 48 cpu cores per node and each 
> executor memory is 64GB. The reduce tasks number is set to 1000. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-4740) Netty's network throughput is about 1/2 of NIO's in spark-perf sortByKey

2014-12-06 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14237034#comment-14237034
 ] 

Aaron Davidson commented on SPARK-4740:
---

Do you have speculation enabled, by the way? I'm wondering if some tasks get 
stuck indefinitely and never complete.

> Netty's network throughput is about 1/2 of NIO's in spark-perf sortByKey
> 
>
> Key: SPARK-4740
> URL: https://issues.apache.org/jira/browse/SPARK-4740
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 1.2.0
>Reporter: Zhang, Liye
>Assignee: Reynold Xin
>Priority: Blocker
> Attachments: (rxin patch better executor)TestRunner  sort-by-key - 
> Thread dump for executor 3_files.zip, (rxin patch normal executor)TestRunner  
> sort-by-key - Thread dump for executor 0 _files.zip, Spark-perf Test Report 
> 16 Cores per Executor.pdf, Spark-perf Test Report.pdf, TestRunner  
> sort-by-key - Thread dump for executor 1_files (Netty-48 Cores per node).zip, 
> TestRunner  sort-by-key - Thread dump for executor 1_files (Nio-48 cores per 
> node).zip
>
>
> When testing current spark master (1.3.0-snapshot) with spark-perf 
> (sort-by-key, aggregate-by-key, etc), Netty based shuffle transferService 
> takes much longer time than NIO based shuffle transferService. The network 
> throughput of Netty is only about half of that of NIO. 
> We tested with standalone mode, and the data set we used for test is 20 
> billion records, and the total size is about 400GB. Spark-perf test is 
> Running on a 4 node cluster with 10G NIC, 48 cpu cores per node and each 
> executor memory is 64GB. The reduce tasks number is set to 1000. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-4740) Netty's network throughput is about 1/2 of NIO's in spark-perf sortByKey

2014-12-06 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14237026#comment-14237026
 ] 

Aaron Davidson commented on SPARK-4740:
---

Could we get logs from the good/bad executors? I'm curious which nodes each 
one has connected to. (Ideally the logs would be at the DEBUG level; otherwise they 
probably would not have enough information.) We can also work on trying to 
repro on our own setup, though this looks like we're pretty close.

> Netty's network throughput is about 1/2 of NIO's in spark-perf sortByKey
> 
>
> Key: SPARK-4740
> URL: https://issues.apache.org/jira/browse/SPARK-4740
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 1.2.0
>Reporter: Zhang, Liye
>Assignee: Reynold Xin
>Priority: Blocker
> Attachments: (rxin patch better executor)TestRunner  sort-by-key - 
> Thread dump for executor 3_files.zip, (rxin patch normal executor)TestRunner  
> sort-by-key - Thread dump for executor 0 _files.zip, Spark-perf Test Report 
> 16 Cores per Executor.pdf, Spark-perf Test Report.pdf, TestRunner  
> sort-by-key - Thread dump for executor 1_files (Netty-48 Cores per node).zip, 
> TestRunner  sort-by-key - Thread dump for executor 1_files (Nio-48 cores per 
> node).zip
>
>
> When testing current spark master (1.3.0-snapshot) with spark-perf 
> (sort-by-key, aggregate-by-key, etc), Netty based shuffle transferService 
> takes much longer time than NIO based shuffle transferService. The network 
> throughput of Netty is only about half of that of NIO. 
> We tested with standalone mode, and the data set we used for test is 20 
> billion records, and the total size is about 400GB. Spark-perf test is 
> Running on a 4 node cluster with 10G NIC, 48 cpu cores per node and each 
> executor memory is 64GB. The reduce tasks number is set to 1000. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-4740) Netty's network throughput is about 1/2 of NIO's in spark-perf sortByKey

2014-12-06 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14237018#comment-14237018
 ] 

Aaron Davidson commented on SPARK-4740:
---

Very interesting -- "good" executor is using 6 cores for reading (expected for 
numConnectionsPerPeer = 2), and "bad" one is only using 2. Something about the 
patch may be nondeterministically failing to work...

> Netty's network throughput is about 1/2 of NIO's in spark-perf sortByKey
> 
>
> Key: SPARK-4740
> URL: https://issues.apache.org/jira/browse/SPARK-4740
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 1.2.0
>Reporter: Zhang, Liye
>Assignee: Reynold Xin
>Priority: Blocker
> Attachments: (rxin patch better executor)TestRunner  sort-by-key - 
> Thread dump for executor 3_files.zip, (rxin patch normal executor)TestRunner  
> sort-by-key - Thread dump for executor 0 _files.zip, Spark-perf Test Report 
> 16 Cores per Executor.pdf, Spark-perf Test Report.pdf, TestRunner  
> sort-by-key - Thread dump for executor 1_files (Netty-48 Cores per node).zip, 
> TestRunner  sort-by-key - Thread dump for executor 1_files (Nio-48 cores per 
> node).zip
>
>
> When testing current spark master (1.3.0-snapshot) with spark-perf 
> (sort-by-key, aggregate-by-key, etc), Netty based shuffle transferService 
> takes much longer time than NIO based shuffle transferService. The network 
> throughput of Netty is only about half of that of NIO. 
> We tested with standalone mode, and the data set we used for test is 20 
> billion records, and the total size is about 400GB. Spark-perf test is 
> Running on a 4 node cluster with 10G NIC, 48 cpu cores per node and each 
> executor memory is 64GB. The reduce tasks number is set to 1000. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-4740) Netty's network throughput is about 1/2 of NIO's in spark-perf sortByKey

2014-12-06 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14236898#comment-14236898
 ] 

Aaron Davidson commented on SPARK-4740:
---

Thanks for testing out the patch. Could we get a jstack, perhaps on both the 
fast and slow executors?

> Netty's network throughput is about 1/2 of NIO's in spark-perf sortByKey
> 
>
> Key: SPARK-4740
> URL: https://issues.apache.org/jira/browse/SPARK-4740
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 1.2.0
>Reporter: Zhang, Liye
>Assignee: Reynold Xin
>Priority: Blocker
> Attachments: Spark-perf Test Report 16 Cores per Executor.pdf, 
> Spark-perf Test Report.pdf, TestRunner  sort-by-key - Thread dump for 
> executor 1_files (Netty-48 Cores per node).zip, TestRunner  sort-by-key - 
> Thread dump for executor 1_files (Nio-48 cores per node).zip
>
>
> When testing current spark master (1.3.0-snapshot) with spark-perf 
> (sort-by-key, aggregate-by-key, etc), Netty based shuffle transferService 
> takes much longer time than NIO based shuffle transferService. The network 
> throughput of Netty is only about half of that of NIO. 
> We tested with standalone mode, and the data set we used for test is 20 
> billion records, and the total size is about 400GB. Spark-perf test is 
> Running on a 4 node cluster with 10G NIC, 48 cpu cores per node and each 
> executor memory is 64GB. The reduce tasks number is set to 1000. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-4740) Netty's network throughput is about 1/2 of NIO's in spark-perf sortByKey

2014-12-05 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14235813#comment-14235813
 ] 

Aaron Davidson commented on SPARK-4740:
---

I think we have ourselves a winner. NIO is reading from disk with 20 threads 
(handle-message-executor-*), so it's certainly saturating all disks. Netty is 
only reading with 3 threads.

It's kind of a silly idea, but at the top of 
TransportClientFactory#createClient, you could change this code (which reuses 
existing clients):

{code}
  if (cachedClient.isActive()) {
logger.trace("Returning cached connection to {}: {}", address, 
cachedClient);
return cachedClient;
  } else {
logger.info("Found inactive connection to {}, closing it.", address);
connectionPool.remove(address, cachedClient); // Remove inactive 
clients.
  }
{code}

to randomly discard the existing client, by simply changing it to this:

{code}
  if (cachedClient.isActive() && Math.random() > 0.1) {
logger.trace("Returning cached connection to {}: {}", address, 
cachedClient);
return cachedClient;
  } else {
logger.info("Found inactive connection to {}, closing it.", address);
connectionPool.remove(address, cachedClient); // Remove inactive 
clients.
  }
{code}

This would cause us to create a new connection (and not close the old one) 1/10 
of the time, hopefully causing us to use ~5 concurrent clients per host rather 
than 1. Not a real solution, of course, but could demonstrate very clearly what 
the issue is if it works.

> Netty's network throughput is about 1/2 of NIO's in spark-perf sortByKey
> 
>
> Key: SPARK-4740
> URL: https://issues.apache.org/jira/browse/SPARK-4740
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 1.2.0
>Reporter: Zhang, Liye
> Attachments: Spark-perf Test Report 16 Cores per Executor.pdf, 
> Spark-perf Test Report.pdf, TestRunner  sort-by-key - Thread dump for 
> executor 1_files (Netty-48 Cores per node).zip, TestRunner  sort-by-key - 
> Thread dump for executor 1_files (Nio-48 cores per node).zip
>
>
> When testing current spark master (1.3.0-snapshot) with spark-perf 
> (sort-by-key, aggregate-by-key, etc), Netty based shuffle transferService 
> takes much longer time than NIO based shuffle transferService. The network 
> throughput of Netty is only about half of that of NIO. 
> We tested with standalone mode, and the data set we used for test is 20 
> billion records, and the total size is about 400GB. Spark-perf test is 
> Running on a 4 node cluster with 10G NIC, 48 cpu cores per node and each 
> executor memory is 64GB. The reduce tasks number is set to 1000. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-4740) Netty's network throughput is about 1/2 of NIO's in spark-perf sortByKey

2014-12-04 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14235241#comment-14235241
 ] 

Aaron Davidson commented on SPARK-4740:
---

I could believe that the result is no better. Could you provide a jstack from 
the NIO run?

> Netty's network throughput is about 1/2 of NIO's in spark-perf sortByKey
> 
>
> Key: SPARK-4740
> URL: https://issues.apache.org/jira/browse/SPARK-4740
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 1.2.0
>Reporter: Zhang, Liye
> Attachments: Spark-perf Test Report.pdf, TestRunner  sort-by-key - 
> Thread dump for executor 1_files (Netty-48 Cores per node).zip
>
>
> When testing current spark master (1.3.0-snapshot) with spark-perf 
> (sort-by-key, aggregate-by-key, etc), Netty based shuffle transferService 
> takes much longer time than NIO based shuffle transferService. The network 
> throughput of Netty is only about half of that of NIO. 
> We tested with standalone mode, and the data set we used for test is 20 
> billion records, and the total size is about 400GB. Spark-perf test is 
> Running on a 4 node cluster with 10G NIC, 48 cpu cores per node and each 
> executor memory is 64GB. The reduce tasks number is set to 1000. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-4740) Netty's network throughput is about 1/2 of NIO's in spark-perf sortByKey

2014-12-04 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14235015#comment-14235015
 ] 

Aaron Davidson commented on SPARK-4740:
---

To clarify, we have two hypotheses currently:

1. Something is weird about transferTo that actually makes it less efficient 
than reading the whole thing into memory in this situation.
2. The fact that we only have 1 connection (and thus serving thread) per peer 
is causing us to access at most 3 disks concurrently, though we're 
not sure whether NIO is using more than 3 threads to serve either.

If we rule out transferTo and it turns out NIO is using more than 3 threads to 
serve, it is likely that we should try making TransportClientFactory able to 
produce more than 1 TransportClient per host for situations where the number of 
Executors is much less than the number of cores per Executor.
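
To make hypothesis 1 concrete, here is a small standalone sketch (not Spark code; the 
function names are illustrative) of the two serving strategies being compared -- 
zero-copy {{transferTo}} versus reading the segment onto the heap before writing it out:

{code}
import java.io.{File, RandomAccessFile}
import java.nio.ByteBuffer
import java.nio.channels.WritableByteChannel

// Zero-copy path: hand the file segment directly to the destination channel.
def serveWithTransferTo(file: File, offset: Long, length: Long,
    out: WritableByteChannel): Long = {
  val channel = new RandomAccessFile(file, "r").getChannel
  try channel.transferTo(offset, length, out)
  finally channel.close()
}

// Heap-copy path: read the segment into an on-heap buffer first, then write it.
def serveWithHeapCopy(file: File, offset: Long, length: Long,
    out: WritableByteChannel): Long = {
  val channel = new RandomAccessFile(file, "r").getChannel
  try {
    val buf = ByteBuffer.allocate(length.toInt)
    channel.position(offset)
    while (buf.hasRemaining && channel.read(buf) != -1) {}
    buf.flip()
    var written = 0L
    while (buf.hasRemaining) written += out.write(buf)
    written
  } finally channel.close()
}
{code}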

> Netty's network throughput is about 1/2 of NIO's in spark-perf sortByKey
> 
>
> Key: SPARK-4740
> URL: https://issues.apache.org/jira/browse/SPARK-4740
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 1.2.0
>Reporter: Zhang, Liye
> Attachments: Spark-perf Test Report.pdf, TestRunner  sort-by-key - 
> Thread dump for executor 1_files (48 Cores per node).zip
>
>
> When testing current spark master (1.3.0-snapshot) with spark-perf 
> (sort-by-key, aggregate-by-key, etc), Netty based shuffle transferService 
> takes much longer time than NIO based shuffle transferService. The network 
> throughput of Netty is only about half of that of NIO. 
> We tested with standalone mode, and the data set we used for test is 20 
> billion records, and the total size is about 400GB. Spark-perf test is 
> Running on a 4 node cluster with 10G NIC, 48 cpu cores per node and each 
> executor memory is 64GB. The reduce tasks number is set to 1000. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-4740) Netty's network bandwidth is much lower than NIO in spark-perf and Netty takes longer running time

2014-12-04 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14234758#comment-14234758
 ] 

Aaron Davidson commented on SPARK-4740:
---

Could you try to set "spark.shuffle.io.serverThreads" and 
"spark.shuffle.io.clientThreads" to 48? We have an artificial max default of 8 
to limit off-heap memory usage, but it's possible this is not sufficient to 
saturate 10GB/s.
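
For clarity, a sketch of what that configuration would look like (values chosen for the 
48-core nodes described above; this is just an illustration, not a recommendation):

{code}
import org.apache.spark.{SparkConf, SparkContext}

// Raise the Netty transfer service thread pools from the default cap of 8 to
// one thread per core, to see whether the 10G link can be saturated.
val conf = new SparkConf()
  .setAppName("sort-by-key-netty")
  .set("spark.shuffle.io.serverThreads", "48")
  .set("spark.shuffle.io.clientThreads", "48")
val sc = new SparkContext(conf)
{code}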

> Netty's network bandwidth is much lower than NIO in spark-perf and Netty 
> takes longer running time
> --
>
> Key: SPARK-4740
> URL: https://issues.apache.org/jira/browse/SPARK-4740
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 1.2.0
>Reporter: Zhang, Liye
> Attachments: Spark-perf Test Report.pdf
>
>
> When testing current spark master (1.3.0-snapshot) with spark-perf 
> (sort-by-key, aggregate-by-key, etc), Netty based shuffle transferService 
> takes much longer time than NIO based shuffle transferService. The network 
> throughput of Netty is only about half of that of NIO. 
> We tested with standalone mode, and the data set we used for test is 20 
> billion records, and the total size is about 400GB. Spark-perf test is 
> Running on a 4 node cluster with 10G NIC, 48 cpu cores per node and each 
> executor memory is 64GB. The reduce tasks number is set to 1000. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-4189) FileSegmentManagedBuffer should have a configurable memory map threshold

2014-12-01 Thread Aaron Davidson (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-4189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aaron Davidson resolved SPARK-4189.
---
Resolution: Fixed
  Assignee: Reynold Xin

> FileSegmentManagedBuffer should have a configurable memory map threshold
> 
>
> Key: SPARK-4189
> URL: https://issues.apache.org/jira/browse/SPARK-4189
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 1.2.0
>Reporter: Aaron Davidson
>Assignee: Reynold Xin
>
> One size does not fit all, it would be useful if there was a configuration to 
> change the threshold at which we memory map shuffle files.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-4189) FileSegmentManagedBuffer should have a configurable memory map threshold

2014-12-01 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14231035#comment-14231035
 ] 

Aaron Davidson commented on SPARK-4189:
---

This was actually filed before https://github.com/apache/spark/pull/3172 and 
was fixed by it.

> FileSegmentManagedBuffer should have a configurable memory map threshold
> 
>
> Key: SPARK-4189
> URL: https://issues.apache.org/jira/browse/SPARK-4189
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 1.2.0
>Reporter: Aaron Davidson
>
> One size does not fit all, it would be useful if there was a configuration to 
> change the threshold at which we memory map shuffle files.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-4644) Implement skewed join

2014-11-29 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14228839#comment-14228839
 ] 

Aaron Davidson commented on SPARK-4644:
---

[~zsxwing] I believe that this problem is related more fundamentally to the 
problem that Spark currently requires that all values for the same key remain 
in memory. Your solution aims to fix this for the specific case of joins, but I 
wonder if we generalize it, if we could solve this for things like groupBy as 
well.

I don't have a fully fleshed out idea yet, but I was considering a model where 
there are 2 types of shuffles: aggregation-based and rearrangement-based. 
Aggregation-based shuffles use partial aggregation and combiners to form and 
merge (K, C) pairs. Rearrangement-based shuffles, however, do not expect a 
decrease in the total amount of data, so my thought is that the aggregation 
model does not make sense for them.

Instead, we could provide an interface similar to ExternalAppendOnlyMap but 
which returns an Iterator[(K, Iterable[V])] pairs, with some extra semantics 
related to the Iterable[V]s (such as having a .chunkedIterator() method which 
enables block nested loops join).

In this model, join could be implemented by mapping the left side's key to (K, 
1) and the right side to (K, 2) and having logic which reads from two adjacent 
value-iterables simultaneously -- e.g.,

{code}
val ((k, 1), left: Iterable[V]) = map.next()
val ((k, 2), right: Iterable[V]) = map.next()
// perform merge using the left and right iterators.
{code}
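
To sketch the idea a bit further, the merge logic could look roughly like the following, 
assuming a hypothetical external map that hands back ((key, tag), values) groups sorted 
by (key, tag), with tag 1 for the left relation and tag 2 for the right (all names here 
are illustrative, not an existing API):

{code}
// Rough sketch of an inner join over tagged, sorted (key, tag) groups.
def innerJoin[K](grouped: Iterator[((K, Int), Iterable[Any])]): Iterator[(K, (Any, Any))] = {
  val it = grouped.buffered
  new Iterator[(K, (Any, Any))] {
    private var current: Iterator[(K, (Any, Any))] = Iterator.empty

    private def refill(): Unit = {
      while (!current.hasNext && it.hasNext) {
        val ((k, tag), left) = it.next()
        if (tag == 1 && it.hasNext && it.head._1 == ((k, 2))) {
          val (_, right) = it.next()
          // Cross product of the two value-iterables for this key; right.iterator
          // is re-created for every left value, i.e. a naive nested loops join.
          current = left.iterator.flatMap(l => right.iterator.map(r => (k, (l, r))))
        }
        // Keys present on only one side contribute nothing to an inner join.
      }
    }

    override def hasNext: Boolean = { refill(); current.hasNext }
    override def next(): (K, (Any, Any)) = { refill(); current.next() }
  }
}
{code}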


> Implement skewed join
> -
>
> Key: SPARK-4644
> URL: https://issues.apache.org/jira/browse/SPARK-4644
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Reporter: Shixiong Zhu
> Attachments: Skewed Join Design Doc.pdf
>
>
> Skewed data is not rare. For example, a book recommendation site may have 
> several books which are liked by most of the users. Running ALS on such 
> skewed data will raise a OutOfMemory error, if some book has too many users 
> which cannot be fit into memory. To solve it, we propose a skewed join 
> implementation.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-4170) Closure problems when running Scala app that "extends App"

2014-11-27 Thread Aaron Davidson (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-4170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aaron Davidson resolved SPARK-4170.
---
Resolution: Fixed
  Assignee: Sean Owen

> Closure problems when running Scala app that "extends App"
> --
>
> Key: SPARK-4170
> URL: https://issues.apache.org/jira/browse/SPARK-4170
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.1.0
>Reporter: Sean Owen
>Assignee: Sean Owen
>Priority: Minor
>
> Michael Albert noted this problem on the mailing list 
> (http://apache-spark-user-list.1001560.n3.nabble.com/BUG-when-running-as-quot-extends-App-quot-closures-don-t-capture-variables-td17675.html):
> {code}
> object DemoBug extends App {
> val conf = new SparkConf()
> val sc = new SparkContext(conf)
> val rdd = sc.parallelize(List("A","B","C","D"))
> val str1 = "A"
> val rslt1 = rdd.filter(x => { x != "A" }).count
> val rslt2 = rdd.filter(x => { str1 != null && x != "A" }).count
> 
> println("DemoBug: rslt1 = " + rslt1 + " rslt2 = " + rslt2)
> }
> {code}
> This produces the output:
> {code}
> DemoBug: rslt1 = 3 rslt2 = 0
> {code}
> If instead there is a proper "main()", it works as expected.
> I also this week noticed that in a program which "extends App", some values 
> were inexplicably null in a closure. When changing to use main(), it was fine.
> I assume there is a problem with variables not being added to the closure 
> when main() doesn't appear in the standard way.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-4516) Netty off-heap memory use causes executors to be killed by OS

2014-11-25 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14225743#comment-14225743
 ] 

Aaron Davidson commented on SPARK-4516:
---

About my last point, [~rxin], [~pwendell], and I decided it may be better if we 
just cap the number of threads we use by default at 8, to try to avoid issues 
for people who use executors with a very large number of cores and were already on 
the edge of their off-heap limits. #3469 implements this, which may cause a 
performance regression if we're wrong about the magic number 8 being an upper 
bound on the useful number of cores. It can be overridden via the 
serverThreads/clientThreads properties, but if anyone sees this as an issue, 
please let me know.

> Netty off-heap memory use causes executors to be killed by OS
> -
>
> Key: SPARK-4516
> URL: https://issues.apache.org/jira/browse/SPARK-4516
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 1.2.0
> Environment: Linux, Mesos
>Reporter: Hector Yee
>Assignee: Aaron Davidson
>Priority: Critical
>  Labels: netty, shuffle
> Fix For: 1.2.0
>
>
> The netty block transfer manager has a race condition where it closes an 
> active connection resulting in the error below. Switching to nio seems to 
> alleviate the problem.
> {code}
> 14/11/20 18:53:43 INFO TransportClientFactory: Found inactive connection to 
> i-974cd879.inst.aws.airbnb.com/10.154.228.43:57773, closing it.
> 14/11/20 18:53:43 ERROR RetryingBlockFetcher: Exception while beginning fetch 
> of 1 outstanding blocks 
> java.io.IOException: Failed to connect to 
> i-974cd879.inst.aws.airbnb.com/10.154.228.43:57773
> at 
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:141)
> at 
> org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
> at 
> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
> at 
> org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
> at 
> org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:87)
> at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:148)
> at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:288)
> at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:52)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
> at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at 
> com.airbnb.common.ml.training.LinearRankerTrainer$$anonfun$7.apply(LinearRankerTrainer.scala:246)
> at 
> com.airbnb.common.ml.training.LinearRankerTrainer$$anonfun$7.apply(LinearRankerTrainer.scala:235)
> at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:601)
> at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:601)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:56)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.net.ConnectException: Connection refused: 
> i-974cd879.inst.aws.airbnb.com/10.154.228.43:57773
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208)
> at 
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287)
> at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
> at 
> io.netty.channel.nio.NioEv

[jira] [Commented] (SPARK-4516) Netty off-heap memory use causes executors to be killed by OS

2014-11-25 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14225700#comment-14225700
 ] 

Aaron Davidson commented on SPARK-4516:
---

It turns out there was a real bug which caused us to allocate memory 
proportional to both number of cores and number of _executors_ in the cluster. 
PR [#3465|https://github.com/apache/spark/pull/3465] should remove the latter 
factor, which should greatly decrease the amount of off-heap memory allocated.

Do note that even with this patch, one key feature of the Netty transport 
service is that we do allocate and reuse significant off-heap buffer space 
rather than on-heap, which helps reduce GC pauses. So it's possible that 
certain environments which previously heavily constrained off-heap memory (by 
giving almost all of the container/cgroup's memory to the Spark heap) may have 
to be modified to ensure that at least 32 * (number of cores) MB is available 
to be allocated off-JVM heap.

If this is not possible, you can either disable direct byte buffer usage via 
"spark.shuffle.io.preferDirectBufs" or set "spark.shuffle.io.serverThreads" and 
"spark.shuffle.io.clientThreads" to something smaller than the number of 
executor cores. Typically we find that a 10Gb/s network cannot saturate more 
than, say, 8 cores on a machine (in practice I've never seen even that many 
required), so we would expect no performance degradation from setting these 
parameters accordingly on beefier machines, and it should cap off-heap allocation 
to on the order of 256 MB.
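
For illustration only, a minimal sketch of what those two mitigations look like when 
set programmatically (the config keys are the ones named above; the thread counts are 
example values, not recommendations):

{code}
import org.apache.spark.SparkConf

val conf = new SparkConf()
  // Option 1: avoid direct (off-JVM-heap) byte buffers entirely.
  .set("spark.shuffle.io.preferDirectBufs", "false")
  // Option 2: cap the transport thread pools below the executor's core count,
  // which in turn caps the number of off-heap buffer arenas.
  .set("spark.shuffle.io.serverThreads", "4")
  .set("spark.shuffle.io.clientThreads", "4")
{code}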

> Netty off-heap memory use causes executors to be killed by OS
> -
>
> Key: SPARK-4516
> URL: https://issues.apache.org/jira/browse/SPARK-4516
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 1.2.0
> Environment: Linux, Mesos
>Reporter: Hector Yee
>Priority: Critical
>  Labels: netty, shuffle
>
> The netty block transfer manager has a race condition where it closes an 
> active connection resulting in the error below. Switching to nio seems to 
> alleviate the problem.
> {code}
> 14/11/20 18:53:43 INFO TransportClientFactory: Found inactive connection to 
> i-974cd879.inst.aws.airbnb.com/10.154.228.43:57773, closing it.
> 14/11/20 18:53:43 ERROR RetryingBlockFetcher: Exception while beginning fetch 
> of 1 outstanding blocks 
> java.io.IOException: Failed to connect to 
> i-974cd879.inst.aws.airbnb.com/10.154.228.43:57773
> at 
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:141)
> at 
> org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
> at 
> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
> at 
> org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
> at 
> org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:87)
> at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:148)
> at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:288)
> at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:52)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
> at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at 
> com.airbnb.common.ml.training.LinearRankerTrainer$$anonfun$7.apply(LinearRankerTrainer.scala:246)
> at 
> com.airbnb.common.ml.training.LinearRankerTrainer$$anonfun$7.apply(LinearRankerTrainer.scala:235)
> at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:601)
> at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:601)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:56)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(Threa

[jira] [Commented] (SPARK-2468) Netty-based block server / client module

2014-11-25 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14225681#comment-14225681
 ] 

Aaron Davidson commented on SPARK-2468:
---

Hey guys, I finally got a chance to run a more comprehensive set of tests with 
constrained containers. In doing so, I found a critical issue which caused us 
to allocate direct byte buffers proportional to the number of executors times 
the number of cores, rather than just proportional to the number of cores. With 
patch [#3465|https://github.com/apache/spark/pull/3465], I was able to run a 
shuffle with [~lianhuiwang]'s configuration of 7GB container with 6GB heap and 
2 cores -- prior to the patch, it exceeded the container's limits. 

If you guys get a chance, please let me know if this is sufficient to fix your 
issues with your initial overhead configurations. (Note that while the memory 
usage was greatly decreased, we still allocate a significant amount of off-heap 
memory, so it's possible you need to shift some of the heap to off-heap if your 
off-heap was previously very constrained.)
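
For anyone reproducing this, here is a rough sketch of that sizing in SparkConf form 
(the "spark.yarn.executor.memoryOverhead" key is an assumption about the YARN overhead 
setting available around this release, and the values are just the ones quoted above):

{code}
import org.apache.spark.SparkConf

// ~7GB YARN container = 6GB executor heap + ~1GB of headroom for off-heap allocations.
val conf = new SparkConf()
  .set("spark.executor.memory", "6g")                 // JVM heap
  .set("spark.executor.cores", "2")
  .set("spark.yarn.executor.memoryOverhead", "1024")  // MB of off-heap headroom (assumed key name)
{code}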

> Netty-based block server / client module
> 
>
> Key: SPARK-2468
> URL: https://issues.apache.org/jira/browse/SPARK-2468
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Reporter: Reynold Xin
>Assignee: Reynold Xin
>Priority: Critical
> Fix For: 1.2.0
>
>
> Right now shuffle send goes through the block manager. This is inefficient 
> because it requires loading a block from disk into a kernel buffer, then into 
> a user space buffer, and then back to a kernel send buffer before it reaches 
> the NIC. It does multiple copies of the data and context switching between 
> kernel/user. It also creates unnecessary buffer in the JVM that increases GC
> Instead, we should use FileChannel.transferTo, which handles this in the 
> kernel space with zero-copy. See 
> http://www.ibm.com/developerworks/library/j-zerocopy/
> One potential solution is to use Netty.  Spark already has a Netty based 
> network module implemented (org.apache.spark.network.netty). However, it 
> lacks some functionality and is turned off by default. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-4516) Race condition in netty

2014-11-20 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14220114#comment-14220114
 ] 

Aaron Davidson commented on SPARK-4516:
---

Also ideally there would be some other exception earlier on the client that 
indicates when/why the connection became inactive, like a 
ClosedChannelException. If not, we may be silently ignoring an exception that 
should be logged.
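
Purely as an illustration (this is not Spark's actual handler), surfacing that 
information on the client could look roughly like a Netty handler that logs lifecycle 
events instead of swallowing them:

{code}
import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter}

// Sketch: log channel-inactive events and exceptions rather than dropping them.
class LoggingClientHandler extends ChannelInboundHandlerAdapter {
  override def channelInactive(ctx: ChannelHandlerContext): Unit = {
    System.err.println(s"Connection to ${ctx.channel.remoteAddress} became inactive")
    ctx.fireChannelInactive()
  }

  override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = {
    cause.printStackTrace() // e.g. a ClosedChannelException explaining the inactivity
    ctx.fireExceptionCaught(cause)
  }
}
{code}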

> Race condition in netty
> ---
>
> Key: SPARK-4516
> URL: https://issues.apache.org/jira/browse/SPARK-4516
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 1.2.0
> Environment: Linux, Mesos
>Reporter: Hector Yee
>Priority: Critical
>  Labels: netty, shuffle
>
> The netty block transfer manager has a race condition where it closes an 
> active connection resulting in the error below. Switching to nio seems to 
> alleviate the problem.
> {code}
> 14/11/20 18:53:43 INFO TransportClientFactory: Found inactive connection to 
> i-974cd879.inst.aws.airbnb.com/10.154.228.43:57773, closing it.
> 14/11/20 18:53:43 ERROR RetryingBlockFetcher: Exception while beginning fetch 
> of 1 outstanding blocks 
> java.io.IOException: Failed to connect to 
> i-974cd879.inst.aws.airbnb.com/10.154.228.43:57773
> at 
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:141)
> at 
> org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
> at 
> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
> at 
> org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
> at 
> org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:87)
> at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:148)
> at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:288)
> at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:52)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
> at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at 
> com.airbnb.common.ml.training.LinearRankerTrainer$$anonfun$7.apply(LinearRankerTrainer.scala:246)
> at 
> com.airbnb.common.ml.training.LinearRankerTrainer$$anonfun$7.apply(LinearRankerTrainer.scala:235)
> at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:601)
> at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:601)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:56)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.net.ConnectException: Connection refused: 
> i-974cd879.inst.aws.airbnb.com/10.154.228.43:57773
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208)
> at 
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287)
> at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
> at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail:

[jira] [Commented] (SPARK-4516) Race condition in netty

2014-11-20 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14220106#comment-14220106
 ] 

Aaron Davidson commented on SPARK-4516:
---

The code in question, despite the log message, does not actually close the 
socket, but just forgets about it. It seems at least somewhat likely that the 
client is in fact inactive because the port on the other side went down.

Note that the second error is from us trying to create a new connection, not 
from reusing an old connection that was closed. This suggests that the server 
is in fact in a bad state. If you ever get the chance to reproduce this and 
look at the server's logs with netty on, that would be much appreciated.

> Race condition in netty
> ---
>
> Key: SPARK-4516
> URL: https://issues.apache.org/jira/browse/SPARK-4516
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 1.2.0
> Environment: Linux, Mesos
>Reporter: Hector Yee
>Priority: Critical
>  Labels: netty, shuffle
>
> The netty block transfer manager has a race condition where it closes an 
> active connection resulting in the error below. Switching to nio seems to 
> alleviate the problem.
> {code}
> 14/11/20 18:53:43 INFO TransportClientFactory: Found inactive connection to 
> i-974cd879.inst.aws.airbnb.com/10.154.228.43:57773, closing it.
> 14/11/20 18:53:43 ERROR RetryingBlockFetcher: Exception while beginning fetch 
> of 1 outstanding blocks 
> java.io.IOException: Failed to connect to 
> i-974cd879.inst.aws.airbnb.com/10.154.228.43:57773
> at 
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:141)
> at 
> org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
> at 
> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
> at 
> org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
> at 
> org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:87)
> at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:148)
> at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:288)
> at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:52)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
> at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at 
> com.airbnb.common.ml.training.LinearRankerTrainer$$anonfun$7.apply(LinearRankerTrainer.scala:246)
> at 
> com.airbnb.common.ml.training.LinearRankerTrainer$$anonfun$7.apply(LinearRankerTrainer.scala:235)
> at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:601)
> at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:601)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:56)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.net.ConnectException: Connection refused: 
> i-974cd879.inst.aws.airbnb.com/10.154.228.43:57773
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208)
> at 
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287)
> at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
> at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> at io.netty.channel.nio.N

[jira] [Commented] (SPARK-4434) spark-submit cluster deploy mode JAR URLs are broken in 1.1.1

2014-11-16 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14214166#comment-14214166
 ] 

Aaron Davidson commented on SPARK-4434:
---

Side note: the error message about "file://", which was not introduced in the 
patch you reverted, is incorrect. A "file://XX.jar" URI is never valid. One or 
three slashes must be used; two slashes indicate that a hostname follows.
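
A quick illustration of the slash rule using plain java.net.URI (the paths are made up):

{code}
// Two slashes: the component after them is parsed as a host, not part of the path.
val twoSlashes = new java.net.URI("file://Users/me/app.jar")
twoSlashes.getHost   // "Users" -- interpreted as a hostname
twoSlashes.getPath   // "/me/app.jar"

// One or three slashes: no host, the whole remainder is a local path.
val oneSlash = new java.net.URI("file:/Users/me/app.jar")
oneSlash.getPath     // "/Users/me/app.jar"
val threeSlashes = new java.net.URI("file:///Users/me/app.jar")
threeSlashes.getHost // null (empty authority)
threeSlashes.getPath // "/Users/me/app.jar"
{code}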

> spark-submit cluster deploy mode JAR URLs are broken in 1.1.1
> -
>
> Key: SPARK-4434
> URL: https://issues.apache.org/jira/browse/SPARK-4434
> Project: Spark
>  Issue Type: Bug
>  Components: Deploy, Spark Core
>Affects Versions: 1.1.1, 1.2.0
>Reporter: Josh Rosen
>Assignee: Andrew Or
>Priority: Blocker
>
> When submitting a driver using {{spark-submit}} in cluster mode, Spark 1.1.0 
> allowed you to omit the {{file://}} or {{hdfs://}} prefix from the 
> application JAR URL, e.g.
> {code}
> ./bin/spark-submit --deploy-mode cluster --master 
> spark://joshs-mbp.att.net:7077 --class org.apache.spark.examples.SparkPi 
> /Users/joshrosen/Documents/old-spark-releases/spark-1.1.0-bin-hadoop1/lib/spark-examples-1.1.0-hadoop1.0.4.jar
> {code}
> In Spark 1.1.1 and 1.2.0, this same command now fails with an error:
> {code}
> ./bin/spark-submit --deploy-mode cluster --master 
> spark://joshs-mbp.att.net:7077 --class org.apache.spark.examples.SparkPi 
> /Users/joshrosen/Documents/Spark/examples/target/scala-2.10/spark-examples_2.10-1.1.2-SNAPSHOT.jar
> Jar url 
> 'file:/Users/joshrosen/Documents/Spark/examples/target/scala-2.10/spark-examples_2.10-1.1.2-SNAPSHOT.jar'
>  is not in valid format.
> Must be a jar file path in URL format (e.g. hdfs://XX.jar, file://XX.jar)
> Usage: DriverClient [options] launch
> [driver options]
> Usage: DriverClient kill  
> {code}
> I tried changing my URL to conform to the new format, but this either 
> resulted in an error or a job that failed:
> {code}
> ./bin/spark-submit --deploy-mode cluster --master 
> spark://joshs-mbp.att.net:7077 --class org.apache.spark.examples.SparkPi 
> file:///Users/joshrosen/Documents/Spark/examples/target/scala-2.10/spark-examples_2.10-1.1.2-SNAPSHOT.jar
> Jar url 
> 'file:///Users/joshrosen/Documents/Spark/examples/target/scala-2.10/spark-examples_2.10-1.1.2-SNAPSHOT.jar'
>  is not in valid format.
> Must be a jar file path in URL format (e.g. hdfs://XX.jar, file://XX.jar)
> {code}
> If I omit the extra slash:
> {code}
> ./bin/spark-submit --deploy-mode cluster --master 
> spark://joshs-mbp.att.net:7077 --class org.apache.spark.examples.SparkPi 
> file://Users/joshrosen/Documents/Spark/examples/target/scala-2.10/spark-examples_2.10-1.1.2-SNAPSHOT.jar
> Sending launch command to spark://joshs-mbp.att.net:7077
> Driver successfully submitted as driver-20141116143235-0002
> ... waiting before polling master for driver state
> ... polling master for driver state
> State of driver-20141116143235-0002 is ERROR
> Exception from cluster was: java.lang.IllegalArgumentException: Wrong FS: 
> file://Users/joshrosen/Documents/Spark/examples/target/scala-2.10/spark-examples_2.10-1.1.2-SNAPSHOT.jar,
>  expected: file:///
> java.lang.IllegalArgumentException: Wrong FS: 
> file://Users/joshrosen/Documents/Spark/examples/target/scala-2.10/spark-examples_2.10-1.1.2-SNAPSHOT.jar,
>  expected: file:///
>   at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:381)
>   at 
> org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:55)
>   at 
> org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:393)
>   at 
> org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:251)
>   at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:329)
>   at 
> org.apache.spark.deploy.worker.DriverRunner.org$apache$spark$deploy$worker$DriverRunner$$downloadUserJar(DriverRunner.scala:157)
>   at 
> org.apache.spark.deploy.worker.DriverRunner$$anon$1.run(DriverRunner.scala:74)
> {code}
> This bug effectively prevents users from using {{spark-submit}} in cluster 
> mode to run drivers whose JARs are stored on shared cluster filesystems.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-4326) unidoc is broken on master

2014-11-13 Thread Aaron Davidson (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-4326?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aaron Davidson resolved SPARK-4326.
---
   Resolution: Fixed
Fix Version/s: 1.2.0
 Assignee: Xiangrui Meng

> unidoc is broken on master
> --
>
> Key: SPARK-4326
> URL: https://issues.apache.org/jira/browse/SPARK-4326
> Project: Spark
>  Issue Type: Bug
>  Components: Build, Documentation
>Affects Versions: 1.3.0
>Reporter: Xiangrui Meng
>Assignee: Xiangrui Meng
>Priority: Critical
> Fix For: 1.2.0
>
>
> On master, `jekyll build` throws the following error:
> {code}
> [error] 
> /Users/meng/src/spark/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala:205:
>  value hashInt is not a member of com.google.common.hash.HashFunction
> [error]   private def rehash(h: Int): Int = 
> Hashing.murmur3_32().hashInt(h).asInt()
> [error]  ^
> [error] 
> /Users/meng/src/spark/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala:426:
>  value limit is not a member of object com.google.common.io.ByteStreams
> [error] val bufferedStream = new 
> BufferedInputStream(ByteStreams.limit(fileStream, end - start))
> [error]  ^
> [error] 
> /Users/meng/src/spark/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala:558:
>  value limit is not a member of object com.google.common.io.ByteStreams
> [error] val bufferedStream = new 
> BufferedInputStream(ByteStreams.limit(fileStream, end - start))
> [error]  ^
> [error] 
> /Users/meng/src/spark/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala:261:
>  value hashInt is not a member of com.google.common.hash.HashFunction
> [error]   private def hashcode(h: Int): Int = 
> Hashing.murmur3_32().hashInt(h).asInt()
> [error]^
> [error] 
> /Users/meng/src/spark/core/src/main/scala/org/apache/spark/util/collection/Utils.scala:37:
>  type mismatch;
> [error]  found   : java.util.Iterator[T]
> [error]  required: Iterable[?]
> [error] collectionAsScalaIterable(ordering.leastOf(asJavaIterator(input), 
> num)).iterator
> [error]  ^
> [error] 
> /Users/meng/src/spark/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala:421:
>  value putAll is not a member of 
> com.google.common.cache.Cache[org.apache.hadoop.fs.FileStatus,parquet.hadoop.Footer]
> [error]   footerCache.putAll(newFooters)
> [error]   ^
> [warn] 
> /Users/meng/src/spark/sql/hive/src/main/scala/org/apache/spark/sql/hive/parquet/FakeParquetSerDe.scala:34:
>  @deprecated now takes two arguments; see the scaladoc.
> [warn] @deprecated("No code should depend on FakeParquetHiveSerDe as it is 
> only intended as a " +
> [warn]  ^
> [info] No documentation generated with unsucessful compiler run
> [warn] two warnings found
> [error] 6 errors found
> [error] (spark/scalaunidoc:doc) Scaladoc generation failed
> [error] Total time: 48 s, completed Nov 10, 2014 1:31:01 PM
> {code}
> It doesn't happen on branch-1.2.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-2468) Netty-based block server / client module

2014-11-13 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14210074#comment-14210074
 ] 

Aaron Davidson edited comment on SPARK-2468 at 11/13/14 5:48 PM:
-

Here is my Spark configuration for the test, 32 cores total (note that this is a 
test-only configuration to maximize throughput; I would not recommend these 
settings for real workloads):

spark.shuffle.io.clientThreads =16,
spark.shuffle.io.serverThreads =16,
spark.serializer = "org.apache.spark.serializer.KryoSerializer",
spark.shuffle.blockTransferService = "netty",
spark.shuffle.compress = false,
spark.shuffle.io.maxRetries = 0,
spark.reducer.maxMbInFlight = 512

Forgot to mention, but #3155 now automatically sets 
spark.shuffle.io.clientThreads and spark.shuffle.io.serverThreads based on the 
number of cores allotted to the executor. You can override them by 
setting those properties by hand, but ideally the default behavior is 
sufficient.


was (Author: ilikerps):
Here is my spark configuration (note 32 cores total):
spark.shuffle.io.clientThreads =16,
spark.shuffle.io.serverThreads =16,
spark.serializer = "org.apache.spark.serializer.KryoSerializer",
spark.shuffle.blockTransferService = "netty",
spark.shuffle.compress = false,
spark.shuffle.io.maxRetries = 0,
spark.reducer.maxMbInFlight = 512

Forgot to mention, but #3155 now automatically sets 
spark.shuffle.io.clientThreads and spark.shuffle.io.serverThreads based on the 
number of cores the Executor has allotted to it. You can  override it by 
setting those properties by hand, but ideally the default behavior is 
sufficient.

> Netty-based block server / client module
> 
>
> Key: SPARK-2468
> URL: https://issues.apache.org/jira/browse/SPARK-2468
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Reporter: Reynold Xin
>Assignee: Reynold Xin
>Priority: Critical
> Fix For: 1.2.0
>
>
> Right now shuffle send goes through the block manager. This is inefficient 
> because it requires loading a block from disk into a kernel buffer, then into 
> a user space buffer, and then back to a kernel send buffer before it reaches 
> the NIC. It does multiple copies of the data and context switching between 
> kernel/user. It also creates unnecessary buffer in the JVM that increases GC
> Instead, we should use FileChannel.transferTo, which handles this in the 
> kernel space with zero-copy. See 
> http://www.ibm.com/developerworks/library/j-zerocopy/
> One potential solution is to use Netty.  Spark already has a Netty based 
> network module implemented (org.apache.spark.network.netty). However, it 
> lacks some functionality and is turned off by default. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2468) Netty-based block server / client module

2014-11-13 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14210074#comment-14210074
 ] 

Aaron Davidson commented on SPARK-2468:
---

Here is my spark configuration (note 32 cores total):
spark.shuffle.io.clientThreads =16,
spark.shuffle.io.serverThreads =16,
spark.serializer = "org.apache.spark.serializer.KryoSerializer",
spark.shuffle.blockTransferService = "netty",
spark.shuffle.compress = false,
spark.shuffle.io.maxRetries = 0,
spark.reducer.maxMbInFlight = 512

Forgot to mention, but #3155 now automatically sets 
spark.shuffle.io.clientThreads and spark.shuffle.io.serverThreads based on the 
number of cores allotted to the executor. You can override them by 
setting those properties by hand, but ideally the default behavior is 
sufficient.

> Netty-based block server / client module
> 
>
> Key: SPARK-2468
> URL: https://issues.apache.org/jira/browse/SPARK-2468
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Reporter: Reynold Xin
>Assignee: Reynold Xin
>Priority: Critical
> Fix For: 1.2.0
>
>
> Right now shuffle send goes through the block manager. This is inefficient 
> because it requires loading a block from disk into a kernel buffer, then into 
> a user space buffer, and then back to a kernel send buffer before it reaches 
> the NIC. It does multiple copies of the data and context switching between 
> kernel/user. It also creates unnecessary buffer in the JVM that increases GC
> Instead, we should use FileChannel.transferTo, which handles this in the 
> kernel space with zero-copy. See 
> http://www.ibm.com/developerworks/library/j-zerocopy/
> One potential solution is to use Netty.  Spark already has a Netty based 
> network module implemented (org.apache.spark.network.netty). However, it 
> lacks some functionality and is turned off by default. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2468) Netty-based block server / client module

2014-11-12 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14209407#comment-14209407
 ] 

Aaron Davidson commented on SPARK-2468:
---

My test was significantly less strict in its memory requirements, which may be 
the difference with respect to OOMs. I used two 28GB containers on different 
machines, with 24GB of that given to Spark's heap. Due to the networking of the 
containers, the maximum throughput was around 7Gb/s (combined directionally), 
which I was able to saturate using Netty but could only achieve around 3.5Gb/s 
(combined) using Nio.

My test sorted 50GB of generated data, shuffled between the two machines. I 
tested the sort as a whole as well as a different version where I injected a 
deserializer which immediately EOFs (this causes us to still read all data but 
do no computation on the reducer side, maximizing network throughput).

Here is my full test, including the no-op deserializer:

{code}
import org.apache.spark.SparkConf
import org.apache.spark.serializer.{Serializer, SerializerInstance, 
SerializationStream, DeserializationStream}
import java.io._
import java.nio.ByteBuffer
import scala.reflect.ClassTag

class NoOpReadSerializer(conf: SparkConf) extends Serializer with Serializable {
  override def newInstance(): SerializerInstance = {
new NoOpReadSerializerInstance()
  }
}

class NoOpReadSerializerInstance()
  extends SerializerInstance {

  override def serialize[T: ClassTag](t: T): ByteBuffer = {
val bos = new ByteArrayOutputStream()
val out = serializeStream(bos)
out.writeObject(t)
out.close()
ByteBuffer.wrap(bos.toByteArray)
  }

  override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {
null.asInstanceOf[T]
  }

  override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: 
ClassLoader): T = {
null.asInstanceOf[T]
  }

  override def serializeStream(s: OutputStream): SerializationStream = {
new NoOpSerializationStream(s, 100)
  }

  override def deserializeStream(s: InputStream): DeserializationStream = {
new NoOpDeserializationStream(s, 
Thread.currentThread().getContextClassLoader)
  }

  def deserializeStream(s: InputStream, loader: ClassLoader): 
DeserializationStream = {
new NoOpDeserializationStream(s, loader)
  }
}

class NoOpDeserializationStream(in: InputStream, loader: ClassLoader)
  extends DeserializationStream {
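  // Throwing EOFException on the first read makes the reducer fetch every shuffle byte
  // but skip all deserialization and downstream computation.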
  def readObject[T: ClassTag](): T = throw new EOFException()
  def close() { }
}

class NoOpSerializationStream(out: OutputStream, counterReset: Int) extends 
SerializationStream {
  private val objOut = new ObjectOutputStream(out)
  private var counter = 0

  def writeObject[T: ClassTag](t: T): SerializationStream = {
objOut.writeObject(t)
counter += 1
if (counterReset > 0 && counter >= counterReset) {
  objOut.reset()
  counter = 0
}
this
  }

  def flush() { objOut.flush() }
  def close() { objOut.close() }
}


// Test code below:
implicit val arrayOrdering = Ordering.by((_: Array[Byte]).toIterable)
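// Generates records with random single-byte keys, then sorts them into 333 reduce
// partitions, forcing a full shuffle between the machines.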
def createSort() = sc.parallelize( 0 until 500, 320).map { x : Int =>
  val rand = new scala.util.Random(System.nanoTime())
  val bytes = new Array[Byte](1)
  rand.nextBytes(bytes)
  (bytes, 1)
}.sortByKey(true, 333)

val x = createSort()
x.count() // does shuffle + sorting on reduce side

val y = createSort().asInstanceOf[org.apache.spark.rdd.ShuffledRDD[_, _, 
_]].setSerializer(new NoOpReadSerializer(sc.getConf))
y.count() // does shuffle with no read-side computation (warning: causes FD 
leak in Spark!)
{code}

Note that if you run this with less memory, you may have to tune the number of 
partitions or the size of the data to avoid invoking the ExternalSorter. I observed 
very little GC and no significant heap or process memory growth after the first 
run.

I will try another test where the memory is more constrained to further 
investigate the OOM problem.

> Netty-based block server / client module
> 
>
> Key: SPARK-2468
> URL: https://issues.apache.org/jira/browse/SPARK-2468
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Reporter: Reynold Xin
>Assignee: Reynold Xin
>Priority: Critical
> Fix For: 1.2.0
>
>
> Right now shuffle send goes through the block manager. This is inefficient 
> because it requires loading a block from disk into a kernel buffer, then into 
> a user space buffer, and then back to a kernel send buffer before it reaches 
> the NIC. It does multiple copies of the data and context switching between 
> kernel/user. It also creates unnecessary buffer in the JVM that increases GC
> Instead, we should use FileChannel.transferTo, which handles this in the 
> kernel space with zero-copy. See 
> http://www.ibm.com/developerworks/library/j-zerocopy/
> One po

[jira] [Commented] (SPARK-4326) unidoc is broken on master

2014-11-12 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4326?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14209242#comment-14209242
 ] 

Aaron Davidson commented on SPARK-4326:
---

[~mengxr] perhaps you pointed to the wrong commit; here I added guava 11.0.2 
specifically:
https://github.com/apache/spark/commit/4c42986cc070d9c5c55c7bf8a2a67585967b1082#diff-e1e0dc857976f8b64c5f3a99435ff947R53

I can't explain why this would be causing this issue, as the dependency should 
be overridden as Marcelo said. However, this dependency is actually not 
necessary; it exists only to prevent people from writing code that won't compile 
against YARN or other Hadoop versions. We could just remove it and find some other 
way to deal with people possibly writing invalid code.

The second issue regarding compile errors in the shuffle protocol is even 
weirder, as that shouldn't require any dependencies and is simply a java 
compiler thing. Maybe it doesn't compile on a sufficiently old JDK? Perhaps an 
incorrect version of SBT?

> unidoc is broken on master
> --
>
> Key: SPARK-4326
> URL: https://issues.apache.org/jira/browse/SPARK-4326
> Project: Spark
>  Issue Type: Bug
>  Components: Build, Documentation
>Affects Versions: 1.3.0
>Reporter: Xiangrui Meng
>Priority: Critical
>
> On master, `jekyll build` throws the following error:
> {code}
> [error] 
> /Users/meng/src/spark/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala:205:
>  value hashInt is not a member of com.google.common.hash.HashFunction
> [error]   private def rehash(h: Int): Int = 
> Hashing.murmur3_32().hashInt(h).asInt()
> [error]  ^
> [error] 
> /Users/meng/src/spark/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala:426:
>  value limit is not a member of object com.google.common.io.ByteStreams
> [error] val bufferedStream = new 
> BufferedInputStream(ByteStreams.limit(fileStream, end - start))
> [error]  ^
> [error] 
> /Users/meng/src/spark/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala:558:
>  value limit is not a member of object com.google.common.io.ByteStreams
> [error] val bufferedStream = new 
> BufferedInputStream(ByteStreams.limit(fileStream, end - start))
> [error]  ^
> [error] 
> /Users/meng/src/spark/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala:261:
>  value hashInt is not a member of com.google.common.hash.HashFunction
> [error]   private def hashcode(h: Int): Int = 
> Hashing.murmur3_32().hashInt(h).asInt()
> [error]^
> [error] 
> /Users/meng/src/spark/core/src/main/scala/org/apache/spark/util/collection/Utils.scala:37:
>  type mismatch;
> [error]  found   : java.util.Iterator[T]
> [error]  required: Iterable[?]
> [error] collectionAsScalaIterable(ordering.leastOf(asJavaIterator(input), 
> num)).iterator
> [error]  ^
> [error] 
> /Users/meng/src/spark/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala:421:
>  value putAll is not a member of 
> com.google.common.cache.Cache[org.apache.hadoop.fs.FileStatus,parquet.hadoop.Footer]
> [error]   footerCache.putAll(newFooters)
> [error]   ^
> [warn] 
> /Users/meng/src/spark/sql/hive/src/main/scala/org/apache/spark/sql/hive/parquet/FakeParquetSerDe.scala:34:
>  @deprecated now takes two arguments; see the scaladoc.
> [warn] @deprecated("No code should depend on FakeParquetHiveSerDe as it is 
> only intended as a " +
> [warn]  ^
> [info] No documentation generated with unsucessful compiler run
> [warn] two warnings found
> [error] 6 errors found
> [error] (spark/scalaunidoc:doc) Scaladoc generation failed
> [error] Total time: 48 s, completed Nov 10, 2014 1:31:01 PM
> {code}
> It doesn't happen on branch-1.2.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-4370) Limit cores used by Netty transfer service based on executor size

2014-11-12 Thread Aaron Davidson (JIRA)
Aaron Davidson created SPARK-4370:
-

 Summary: Limit cores used by Netty transfer service based on 
executor size
 Key: SPARK-4370
 URL: https://issues.apache.org/jira/browse/SPARK-4370
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.2.0
Reporter: Aaron Davidson
Assignee: Aaron Davidson
Priority: Critical


Right now, the NettyBlockTransferService uses the total number of cores on the 
system as the number of threads and buffer arenas to create. The latter is more 
troubling -- this can lead to significant allocation of extra heap and direct 
memory in situations where executors are relatively small compared to the whole 
machine. For instance, on a machine with 32 cores, we will allocate (32 cores * 
16MB per arena = 512MB) * 2 for client and server = 1GB direct and heap memory. 
This can be a huge overhead if you're only using, say, 8 of those cores.
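
Written out as a rough estimate (the figures are the ones quoted above, not measurements):

{code}
val coresOnMachine = 32
val mbPerArena     = 16   // per-core buffer arena size
val pools          = 2    // independent client and server pools in the same JVM
val estimatedMb    = coresOnMachine * mbPerArena * pools  // = 1024 MB of extra heap + direct memory
{code}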



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2468) Netty-based block server / client module

2014-11-11 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14207774#comment-14207774
 ] 

Aaron Davidson commented on SPARK-2468:
---

[~zzcclp] Ah, man, that's not good. Just to be certain, you ran your last test 
with the patch #3155 applied? We shouldn't be able to allocate more than 96MB 
of off-heap memory if so, which should be well within the 1GB you had left over 
between the 12GB Spark heap and 13GB YARN container.

[~lianhuiwang] Were you able to re-run the test at any point? I ran a simple 
benchmark on ec2 and did not see any regressions from earlier, so if you're 
still seeing perf being worse than NIO, that suggests it may be 
workload-specific, making it harder to reproduce.

> Netty-based block server / client module
> 
>
> Key: SPARK-2468
> URL: https://issues.apache.org/jira/browse/SPARK-2468
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Reporter: Reynold Xin
>Assignee: Reynold Xin
>Priority: Critical
> Fix For: 1.2.0
>
>
> Right now shuffle send goes through the block manager. This is inefficient 
> because it requires loading a block from disk into a kernel buffer, then into 
> a user space buffer, and then back to a kernel send buffer before it reaches 
> the NIC. It does multiple copies of the data and context switching between 
> kernel/user. It also creates unnecessary buffer in the JVM that increases GC
> Instead, we should use FileChannel.transferTo, which handles this in the 
> kernel space with zero-copy. See 
> http://www.ibm.com/developerworks/library/j-zerocopy/
> One potential solution is to use Netty.  Spark already has a Netty based 
> network module implemented (org.apache.spark.network.netty). However, it 
> lacks some functionality and is turned off by default. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-1830) Deploy failover, Make Persistence engine and LeaderAgent Pluggable.

2014-11-11 Thread Aaron Davidson (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-1830?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aaron Davidson resolved SPARK-1830.
---
   Resolution: Fixed
Fix Version/s: (was: 1.2.0)
   (was: 1.0.1)
   1.3.0
 Assignee: Prashant Sharma

> Deploy failover, Make Persistence engine and LeaderAgent Pluggable.
> ---
>
> Key: SPARK-1830
> URL: https://issues.apache.org/jira/browse/SPARK-1830
> Project: Spark
>  Issue Type: New Feature
>  Components: Deploy
>Reporter: Prashant Sharma
>Assignee: Prashant Sharma
> Fix For: 1.3.0
>
>
> With current code base it is difficult to plugin an external user specified 
> "Persistence Engine" or "Election Agent". It would be good to expose this as 
> a pluggable API.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-4307) Initialize FileDescriptor lazily in FileRegion

2014-11-11 Thread Aaron Davidson (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-4307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aaron Davidson resolved SPARK-4307.
---
   Resolution: Fixed
Fix Version/s: 1.2.0

> Initialize FileDescriptor lazily in FileRegion
> --
>
> Key: SPARK-4307
> URL: https://issues.apache.org/jira/browse/SPARK-4307
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle
>Reporter: Reynold Xin
>Assignee: Reynold Xin
> Fix For: 1.2.0
>
>
> We use Netty's DefaultFileRegion to do zero copy send. However, 
> DefaultFileRegion requires a FileDescriptor, which results in a large number 
> of opened files in larger workloads.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2468) Netty-based block server / client module

2014-11-07 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14202490#comment-14202490
 ] 

Aaron Davidson commented on SPARK-2468:
---

[~lianhuiwang] Can you try again with preferDirectBufs set to true, and just 
setting maxUsableCores down to the number of cores each container actually has? 
It's possible the performance discrepancy you're seeing is simply due to heap 
byte buffers not being as fast as direct ones. You might also decrease the Java 
heap size a bit while keeping the container size the same, if _any_ direct 
memory allocation is causing the container to be killed.

[~zzcclp] Same suggestion for you about setting preferDirectBufs to true and 
setting maxUsableCores down, but I will also perform another round of 
benchmarking -- it's possible we accidentally introduced a performance 
regression in the last few patches. 

Comparing Hadoop vs Spark performance is a different matter. A few suggestions 
on your setup: You should set executor-cores to 5, so that each executor is 
actually using 5 cores instead of just 1. You're losing significant parallelism 
because of this setting, as Spark will only launch 1 task per core on an 
executor at any given time. Second, groupBy() is inefficient (its doc was 
changed recently to reflect this) and should be avoided. I would recommend 
changing your job to sort the whole RDD using something similar to 
{code}mapR.map { x => ((x._1._1, x._2._1), x) }.sortByKey(){code}, which would 
not require that all values for a single group fit in memory. This would still 
effectively group by x._1._1, but would sort within each group by x._2._1, and 
would utilize Spark's efficient sorting machinery.
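
To make the shape of that suggestion concrete, here is a small spark-shell-style sketch 
(the element types and sample values of mapR are made up purely for illustration):

{code}
import org.apache.spark.SparkContext._   // pair/ordered RDD implicits

val mapR = sc.parallelize(Seq(
  (("a", 1), (2L, 0.5)),
  (("a", 1), (1L, 0.25)),
  (("b", 2), (3L, 0.75))))

val sorted = mapR
  .map { x => ((x._1._1, x._2._1), x) }  // key by (group key x._1._1, secondary key x._2._1)
  .sortByKey()                           // each group's records arrive contiguously, sorted by x._2._1,
                                         // without needing a whole group in memory at once
{code}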

> Netty-based block server / client module
> 
>
> Key: SPARK-2468
> URL: https://issues.apache.org/jira/browse/SPARK-2468
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Reporter: Reynold Xin
>Assignee: Reynold Xin
>Priority: Critical
> Fix For: 1.2.0
>
>
> Right now shuffle send goes through the block manager. This is inefficient 
> because it requires loading a block from disk into a kernel buffer, then into 
> a user space buffer, and then back to a kernel send buffer before it reaches 
> the NIC. It does multiple copies of the data and context switching between 
> kernel/user. It also creates unnecessary buffer in the JVM that increases GC
> Instead, we should use FileChannel.transferTo, which handles this in the 
> kernel space with zero-copy. See 
> http://www.ibm.com/developerworks/library/j-zerocopy/
> One potential solution is to use Netty.  Spark already has a Netty based 
> network module implemented (org.apache.spark.network.netty). However, it 
> lacks some functionality and is turned off by default. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2468) Netty-based block server / client module

2014-11-07 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14201849#comment-14201849
 ] 

Aaron Davidson commented on SPARK-2468:
---

[~zzcclp] Thank you for the writeup. Is it really the case that each of your 
executors is only using 1 core for its 20GB of RAM? It seems like 5 would be in 
line with the portion of memory you're using. Also, the sum of your storage and 
shuffle memory fractions exceeds 1, so if you're caching any data and then performing a 
reduction/groupBy, you could actually see an OOM even without this other issue. 
I would recommend keeping shuffle fraction relatively low unless you have a 
good reason not to, as it can lead to increased instability.
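
For context, a sketch of the fraction budgeting being referred to (Spark 1.x-era keys; 
the values are examples only, not recommendations):

{code}
// The two fractions carve up the executor heap; their sum plus general headroom
// should stay at or below 1.0.
val conf = new org.apache.spark.SparkConf()
  .set("spark.storage.memoryFraction", "0.5")   // budget for cached RDD blocks
  .set("spark.shuffle.memoryFraction", "0.3")   // budget for in-memory shuffle/aggregation buffers
{code}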

The numbers are relatively close to my expectations, which would estimate netty 
allocating around 750MB of direct buffer space, thinking that it has 24 cores. 
With #3155 and maxUsableCores set to 1 (or 5), I hope this issue may be 
resolved.

> Netty-based block server / client module
> 
>
> Key: SPARK-2468
> URL: https://issues.apache.org/jira/browse/SPARK-2468
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Reporter: Reynold Xin
>Assignee: Reynold Xin
>Priority: Critical
> Fix For: 1.2.0
>
>
> Right now shuffle send goes through the block manager. This is inefficient 
> because it requires loading a block from disk into a kernel buffer, then into 
> a user space buffer, and then back to a kernel send buffer before it reaches 
> the NIC. It does multiple copies of the data and context switching between 
> kernel/user. It also creates unnecessary buffer in the JVM that increases GC
> Instead, we should use FileChannel.transferTo, which handles this in the 
> kernel space with zero-copy. See 
> http://www.ibm.com/developerworks/library/j-zerocopy/
> One potential solution is to use Netty.  Spark already has a Netty based 
> network module implemented (org.apache.spark.network.netty). However, it 
> lacks some functionality and is turned off by default. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2468) Netty-based block server / client module

2014-11-07 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14201832#comment-14201832
 ] 

Aaron Davidson commented on SPARK-2468:
---

[~lianhuiwang] I have created 
[#3155|https://github.com/apache/spark/pull/3155/files], which I will clean up 
and try to get in tomorrow; it makes the preferDirectBufs config forcefully 
disable direct byte buffers in both the server and client pools. 
Additionally, I have added the conf "spark.shuffle.io.maxUsableCores" which 
should allow you to inform the executor how many cores you're actually using, 
so it will avoid allocating enough memory for all the machine's cores. 

I hope that simply specifying the maxUsableCores is sufficient to actually fix 
this issue for you, but the combination should give a higher chance of success.
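
A sketch of using both knobs together, assuming the config names as proposed in #3155 
and described above (they could still change before merging; the values are examples):

{code}
val conf = new org.apache.spark.SparkConf()
  .set("spark.shuffle.io.preferDirectBufs", "false")  // forcefully disable direct buffers in both pools
  .set("spark.shuffle.io.maxUsableCores", "8")        // cores actually allotted to this executor
{code}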

> Netty-based block server / client module
> 
>
> Key: SPARK-2468
> URL: https://issues.apache.org/jira/browse/SPARK-2468
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Reporter: Reynold Xin
>Assignee: Reynold Xin
>Priority: Critical
> Fix For: 1.2.0
>
>
> Right now shuffle send goes through the block manager. This is inefficient 
> because it requires loading a block from disk into a kernel buffer, then into 
> a user space buffer, and then back to a kernel send buffer before it reaches 
> the NIC. It does multiple copies of the data and context switching between 
> kernel/user. It also creates unnecessary buffer in the JVM that increases GC
> Instead, we should use FileChannel.transferTo, which handles this in the 
> kernel space with zero-copy. See 
> http://www.ibm.com/developerworks/library/j-zerocopy/
> One potential solution is to use Netty.  Spark already has a Netty based 
> network module implemented (org.apache.spark.network.netty). However, it 
> lacks some functionality and is turned off by default. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2468) Netty-based block server / client module

2014-11-07 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14201812#comment-14201812
 ] 

Aaron Davidson commented on SPARK-2468:
---

Looking at the netty code a bit more, it seems that they might unconditionally 
allocate direct buffers for IO, whether or not direct is "preferred". 
Additionally, they allocate more memory based on the number of cores in your 
system. The default settings would be roughly 16MB per core, and this might be 
multiplied by 2 in our current setup since we have independent client and 
server pools in the same JVM. I'm not certain how executors running in YARN 
report "availableProcessors", but is it possible your machines have 32 or 
greater cores? This could cause an extra allocation of around 1GB of direct 
(off-heap) buffers.

> Netty-based block server / client module
> 
>
> Key: SPARK-2468
> URL: https://issues.apache.org/jira/browse/SPARK-2468
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Reporter: Reynold Xin
>Assignee: Reynold Xin
>Priority: Critical
> Fix For: 1.2.0
>
>
> Right now shuffle send goes through the block manager. This is inefficient 
> because it requires loading a block from disk into a kernel buffer, then into 
> a user space buffer, and then back to a kernel send buffer before it reaches 
> the NIC. It does multiple copies of the data and context switching between 
> kernel/user. It also creates unnecessary buffer in the JVM that increases GC
> Instead, we should use FileChannel.transferTo, which handles this in the 
> kernel space with zero-copy. See 
> http://www.ibm.com/developerworks/library/j-zerocopy/
> One potential solution is to use Netty.  Spark already has a Netty based 
> network module implemented (org.apache.spark.network.netty). However, it 
> lacks some functionality and is turned off by default. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2468) Netty-based block server / client module

2014-11-07 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14201783#comment-14201783
 ] 

Aaron Davidson commented on SPARK-2468:
---

Thanks a lot for those diagnostics. Can you confirm that 
"spark.shuffle.io.preferDirectBufs" does show up in the UI as being set 
properly? Does your workload mainly involve a large shuffle? How big is each 
partition/how many are there? In addition to the netty buffers (which _should_ 
be disabled by the config), we also memory map shuffle blocks larger than 2MB.

> Netty-based block server / client module
> 
>
> Key: SPARK-2468
> URL: https://issues.apache.org/jira/browse/SPARK-2468
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Reporter: Reynold Xin
>Assignee: Reynold Xin
>Priority: Critical
> Fix For: 1.2.0
>
>
> Right now shuffle send goes through the block manager. This is inefficient 
> because it requires loading a block from disk into a kernel buffer, then into 
> a user space buffer, and then back to a kernel send buffer before it reaches 
> the NIC. It does multiple copies of the data and context switching between 
> kernel/user. It also creates unnecessary buffer in the JVM that increases GC
> Instead, we should use FileChannel.transferTo, which handles this in the 
> kernel space with zero-copy. See 
> http://www.ibm.com/developerworks/library/j-zerocopy/
> One potential solution is to use Netty.  Spark already has a Netty based 
> network module implemented (org.apache.spark.network.netty). However, it 
> lacks some functionality and is turned off by default. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2468) Netty-based block server / client module

2014-11-07 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14201774#comment-14201774
 ] 

Aaron Davidson commented on SPARK-2468:
---

Yup, that would work.

> Netty-based block server / client module
> 
>
> Key: SPARK-2468
> URL: https://issues.apache.org/jira/browse/SPARK-2468
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Reporter: Reynold Xin
>Assignee: Reynold Xin
>Priority: Critical
> Fix For: 1.2.0
>
>
> Right now shuffle send goes through the block manager. This is inefficient 
> because it requires loading a block from disk into a kernel buffer, then into 
> a user space buffer, and then back to a kernel send buffer before it reaches 
> the NIC. It does multiple copies of the data and context switching between 
> kernel/user. It also creates unnecessary buffer in the JVM that increases GC
> Instead, we should use FileChannel.transferTo, which handles this in the 
> kernel space with zero-copy. See 
> http://www.ibm.com/developerworks/library/j-zerocopy/
> One potential solution is to use Netty.  Spark already has a Netty based 
> network module implemented (org.apache.spark.network.netty). However, it 
> lacks some functionality and is turned off by default. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2468) Netty-based block server / client module

2014-11-07 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14201766#comment-14201766
 ] 

Aaron Davidson commented on SPARK-2468:
---

[~zzcclp] Yes, please do. What's the memory of your YARN executors/containers? 
With preferDirectBufs off, we should allocate little to no off-heap memory, so 
these results are surprising.
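
For context on container sizing, a hedged sketch (numbers are hypothetical): on YARN 
the container limit is roughly the executor heap plus the memory overhead, and 
off-heap allocations such as Netty direct buffers or memory-mapped blocks count 
against that overhead.

{code}
import org.apache.spark.SparkConf

// Hypothetical values for illustration only.
val conf = new SparkConf()
  .set("spark.executor.memory", "4g")               // JVM heap per executor
  .set("spark.yarn.executor.memoryOverhead", "768") // extra MB the container reserves for off-heap use
{code}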

> Netty-based block server / client module
> 
>
> Key: SPARK-2468
> URL: https://issues.apache.org/jira/browse/SPARK-2468
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Reporter: Reynold Xin
>Assignee: Reynold Xin
>Priority: Critical
> Fix For: 1.2.0
>
>
> Right now shuffle send goes through the block manager. This is inefficient 
> because it requires loading a block from disk into a kernel buffer, then into 
> a user space buffer, and then back to a kernel send buffer before it reaches 
> the NIC. It does multiple copies of the data and context switching between 
> kernel/user. It also creates unnecessary buffers in the JVM, which increase GC pressure.
> Instead, we should use FileChannel.transferTo, which handles this in the 
> kernel space with zero-copy. See 
> http://www.ibm.com/developerworks/library/j-zerocopy/
> One potential solution is to use Netty. Spark already has a Netty-based 
> network module implemented (org.apache.spark.network.netty). However, it 
> lacks some functionality and is turned off by default. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-4242) Add SASL to external shuffle service

2014-11-06 Thread Aaron Davidson (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-4242?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aaron Davidson resolved SPARK-4242.
---
   Resolution: Fixed
Fix Version/s: 1.2.0

> Add SASL to external shuffle service
> 
>
> Key: SPARK-4242
> URL: https://issues.apache.org/jira/browse/SPARK-4242
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.2.0
>Reporter: Aaron Davidson
>Assignee: Aaron Davidson
> Fix For: 1.2.0
>
>
> It's already added to NettyBlockTransferService; let's just add it to 
> ExternalShuffleClient as well.
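
For the record, a minimal sketch of the user-facing configuration this implies, 
assuming 1.2-era property names (secret handling differs per cluster manager):

{code}
import org.apache.spark.SparkConf

// Hedged sketch: with authentication on, executors and the external shuffle
// service perform a SASL handshake using a shared secret.
val conf = new SparkConf()
  .set("spark.shuffle.service.enabled", "true")  // fetch shuffle blocks via the external service
  .set("spark.authenticate", "true")             // enable SASL authentication
  .set("spark.authenticate.secret", "change-me") // shared secret (standalone; YARN generates one)
{code}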



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-4236) External shuffle service must cleanup its shuffle files

2014-11-06 Thread Aaron Davidson (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-4236?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aaron Davidson resolved SPARK-4236.
---
   Resolution: Fixed
Fix Version/s: 1.2.0
 Assignee: Aaron Davidson

> External shuffle service must cleanup its shuffle files
> ---
>
> Key: SPARK-4236
> URL: https://issues.apache.org/jira/browse/SPARK-4236
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.2.0
>Reporter: Aaron Davidson
>Assignee: Aaron Davidson
>Priority: Critical
> Fix For: 1.2.0
>
>
> When the external shuffle service is enabled, executors no longer clean up their 
> own shuffle files, as the external server can continue to serve them. The 
> external server must clean these files up once it knows that they will never 
> be used again (i.e., when the application terminates or when Spark's 
> ContextCleaner requests that they be deleted).
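
Illustrative only (hypothetical names, not the code that resolved this issue): the 
cleanup amounts to recursively deleting the terminated application's directories 
under each of the service's local dirs.

{code}
import java.io.File

// Hedged sketch with made-up method names.
def deleteRecursively(f: File): Unit = {
  Option(f.listFiles()).getOrElse(Array.empty[File]).foreach(deleteRecursively)
  f.delete()
}

def applicationRemoved(localDirs: Seq[String], appId: String): Unit =
  localDirs.map(d => new File(d, appId)).filter(_.exists()).foreach(deleteRecursively)
{code}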



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2468) Netty-based block server / client module

2014-11-06 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14201432#comment-14201432
 ] 

Aaron Davidson commented on SPARK-2468:
---

[~zzcclp] I believe it is close to merging. [~rxin] is finishing up his review.

> Netty-based block server / client module
> 
>
> Key: SPARK-2468
> URL: https://issues.apache.org/jira/browse/SPARK-2468
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Reporter: Reynold Xin
>Assignee: Reynold Xin
>Priority: Critical
> Fix For: 1.2.0
>
>
> Right now shuffle send goes through the block manager. This is inefficient 
> because it requires loading a block from disk into a kernel buffer, then into 
> a user space buffer, and then back to a kernel send buffer before it reaches 
> the NIC. It does multiple copies of the data and context switching between 
> kernel/user. It also creates unnecessary buffers in the JVM, which increase GC pressure.
> Instead, we should use FileChannel.transferTo, which handles this in the 
> kernel space with zero-copy. See 
> http://www.ibm.com/developerworks/library/j-zerocopy/
> One potential solution is to use Netty. Spark already has a Netty-based 
> network module implemented (org.apache.spark.network.netty). However, it 
> lacks some functionality and is turned off by default. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-4287) Add TTL-based cleanup in external shuffle service

2014-11-06 Thread Aaron Davidson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4287?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14201428#comment-14201428
 ] 

Aaron Davidson commented on SPARK-4287:
---

I think the right solution here might be to make the ContextCleaner (or 
BlockManagerMaster) aware of the external shuffle service, and have it send a 
request to the external shuffle service to clean up the appropriate shuffle 
files. This should hopefully be a relatively small change.
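
A rough sketch of that idea with entirely hypothetical types (not Spark's API): on 
removing a shuffle, the driver-side cleaner would also notify each external shuffle 
service so it can delete the matching files.

{code}
// Hedged sketch; the real change would live in ContextCleaner / BlockManagerMaster.
trait ShuffleServiceClient {
  def removeShuffle(appId: String, shuffleId: Int): Unit
}

class ShuffleServiceAwareCleaner(services: Seq[ShuffleServiceClient], appId: String) {
  def doCleanupShuffle(shuffleId: Int): Unit = {
    // ... existing driver-side cleanup (map output tracker, block manager) ...
    services.foreach(_.removeShuffle(appId, shuffleId)) // ask each service to drop its files
  }
}
{code}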

> Add TTL-based cleanup in external shuffle service
> -
>
> Key: SPARK-4287
> URL: https://issues.apache.org/jira/browse/SPARK-4287
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.2.0
>Reporter: Andrew Or
>
> A problem with long-running SparkContexts using the external shuffle service 
> is that their shuffle files may never be cleaned. This is because the 
> ContextCleaner may no longer have any executors to go through to clean up these 
> files (they may have been killed intentionally). We should have a TTL-based 
> cleanup that acts as a backup, defaulting to a timeout of about a week.
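
And a sketch of the TTL backup itself (hypothetical; the actual deletion is left as 
a callback): scan the registered application directories on a schedule and drop 
anything untouched for longer than the TTL.

{code}
import java.io.File
import java.util.concurrent.{Executors, TimeUnit}

// Hedged sketch: scan hourly, delete anything older than the TTL (one week by default).
class TtlShuffleFileCleaner(appDirs: () => Seq[File],
                            delete: File => Unit,
                            ttlMillis: Long = TimeUnit.DAYS.toMillis(7)) {
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  def start(): Unit = {
    val task = new Runnable {
      def run(): Unit = {
        val cutoff = System.currentTimeMillis() - ttlMillis
        appDirs().filter(_.lastModified() < cutoff).foreach(delete)
      }
    }
    scheduler.scheduleAtFixedRate(task, 1, 1, TimeUnit.HOURS)
  }
}
{code}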



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org


