[jira] [Commented] (SPARK-23128) A new approach to do adaptive execution in Spark SQL

2018-01-27 Thread Hyukjin Kwon (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16342491#comment-16342491
 ] 

Hyukjin Kwon commented on SPARK-23128:
--

I think we need a SPIP too.

> A new approach to do adaptive execution in Spark SQL
> 
>
> Key: SPARK-23128
> URL: https://issues.apache.org/jira/browse/SPARK-23128
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Carson Wang
>Priority: Major
>
> SPARK-9850 proposed the basic idea of adaptive execution in Spark. In 
> DAGScheduler, a new API was added to support submitting a single map stage. 
> The current implementation of adaptive execution in Spark SQL supports 
> changing the reducer number at runtime. An ExchangeCoordinator is used to 
> determine the number of post-shuffle partitions for a stage that needs to 
> fetch shuffle data from one or multiple stages. The current implementation 
> adds the ExchangeCoordinator while we are adding Exchanges. However, there are 
> some limitations. First, it may cause additional shuffles that decrease 
> performance; we can see this in the EnsureRequirements rule when it adds the 
> ExchangeCoordinator. Secondly, it is not a good idea to add 
> ExchangeCoordinators while we are adding Exchanges, because we don’t have a 
> global picture of all shuffle dependencies of a post-shuffle stage. E.g., for 
> a three-table join in a single stage, the same ExchangeCoordinator should be 
> used across the three Exchanges, but currently two separate 
> ExchangeCoordinators are added. Thirdly, with the current framework it is not 
> easy to flexibly implement other adaptive-execution features, such as 
> changing the execution plan or handling skewed joins at runtime.
> We'd like to introduce a new way to do adaptive execution in Spark SQL and 
> address the limitations. The idea is described at 
> [https://docs.google.com/document/d/1mpVjvQZRAkD-Ggy6-hcjXtBPiQoVbZGe3dLnAKgtJ4k/edit?usp=sharing]
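
For readers unfamiliar with the feature, a minimal sketch of how the existing (2.x) coordinator-based adaptive execution described above is switched on; the configuration keys are the current ones, but the values are illustrative only:

{code:java}
// Illustrative settings only: enable the existing ExchangeCoordinator-based
// adaptive execution that the description refers to.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("adaptive-execution-demo")
  .config("spark.sql.adaptive.enabled", "true")
  // target size of a post-shuffle partition used by the coordinator
  .config("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", "67108864")
  .getOrCreate()
{code}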



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21727) Operating on an ArrayType in a SparkR DataFrame throws error

2018-01-27 Thread Hyukjin Kwon (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16342487#comment-16342487
 ] 

Hyukjin Kwon commented on SPARK-21727:
--

(y)

> Operating on an ArrayType in a SparkR DataFrame throws error
> 
>
> Key: SPARK-21727
> URL: https://issues.apache.org/jira/browse/SPARK-21727
> Project: Spark
>  Issue Type: Bug
>  Components: SparkR
>Affects Versions: 2.2.0
>Reporter: Neil Alexander McQuarrie
>Assignee: Neil Alexander McQuarrie
>Priority: Major
> Fix For: 2.3.0, 2.4.0
>
>
> Previously 
> [posted|https://stackoverflow.com/questions/45056973/sparkr-dataframe-with-r-lists-as-elements]
>  this as a Stack Overflow question, but it seems to be a bug.
> If I have an R data.frame where one of the column data types is an integer 
> *list* -- i.e., each of the elements in the column embeds an entire R list of 
> integers -- then it seems I can convert this data.frame to a SparkR DataFrame 
> just fine... SparkR treats the column as ArrayType(Double). 
> However, any subsequent operation on this SparkR DataFrame appears to throw 
> an error.
> Create an example R data.frame:
> {code}
> indices <- 1:4
> myDf <- data.frame(indices)
> myDf$data <- list(rep(0, 20))
> {code}
> Examine it to make sure it looks okay:
> {code}
> > str(myDf) 
> 'data.frame':   4 obs. of  2 variables:  
>  $ indices: int  1 2 3 4  
>  $ data   :List of 4
>..$ : num  0 0 0 0 0 0 0 0 0 0 ...
>..$ : num  0 0 0 0 0 0 0 0 0 0 ...
>..$ : num  0 0 0 0 0 0 0 0 0 0 ...
>..$ : num  0 0 0 0 0 0 0 0 0 0 ...
> > head(myDf)   
>   indices   data 
> 1   1 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 
> 2   2 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 
> 3   3 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 
> 4   4 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
> {code}
> Convert it to a SparkR DataFrame:
> {code}
> library(SparkR, lib.loc=paste0(Sys.getenv("SPARK_HOME"),"/R/lib"))
> sparkR.session(master = "local[*]")
> mySparkDf <- as.DataFrame(myDf)
> {code}
> Examine the SparkR DataFrame schema; notice that the list column was 
> successfully converted to ArrayType:
> {code}
> > schema(mySparkDf)
> StructType
> |-name = "indices", type = "IntegerType", nullable = TRUE
> |-name = "data", type = "ArrayType(DoubleType,true)", nullable = TRUE
> {code}
> However, operating on the SparkR DataFrame throws an error:
> {code}
> > collect(mySparkDf)
> 17/07/13 17:23:00 ERROR executor.Executor: Exception in task 0.0 in stage 1.0 
> (TID 1)
> java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: 
> java.lang.Double is not a valid external type for schema of array
> if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null 
> else validateexternaltype(getexternalrowfield(assertnotnull(input[0, 
> org.apache.spark.sql.Row, true]), 0, indices), IntegerType) AS indices#0
> ... long stack trace ...
> {code}
> Using Spark 2.2.0, R 3.4.0, Java 1.8.0_131, Windows 10.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22947) SPIP: as-of join in Spark SQL

2018-01-27 Thread Hyukjin Kwon (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16342485#comment-16342485
 ] 

Hyukjin Kwon commented on SPARK-22947:
--

cc [~hvanhovell] too for your information.

> SPIP: as-of join in Spark SQL
> -
>
> Key: SPARK-22947
> URL: https://issues.apache.org/jira/browse/SPARK-22947
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.2.1
>Reporter: Li Jin
>Priority: Major
> Attachments: SPIP_ as-of join in Spark SQL (1).pdf
>
>
> h2. Background and Motivation
> Time series analysis is one of the most common analyses performed on 
> financial data. In time series analysis, as-of join is a very common 
> operation. Supporting as-of join in Spark SQL will enable many use cases of 
> using Spark SQL for time series analysis.
> As-of join is “join on time” with inexact time matching criteria. Various 
> libraries have implemented as-of join or similar functionality:
> Kdb: https://code.kx.com/wiki/Reference/aj
> Pandas: 
> http://pandas.pydata.org/pandas-docs/version/0.19.0/merging.html#merging-merge-asof
> R: This functionality is called “Last Observation Carried Forward”
> https://www.rdocumentation.org/packages/zoo/versions/1.8-0/topics/na.locf
> JuliaDB: http://juliadb.org/latest/api/joins.html#IndexedTables.asofjoin
> Flint: https://github.com/twosigma/flint#temporal-join-functions
> This proposal advocates introducing a new API in Spark SQL to support as-of 
> join.
> h2. Target Personas
> Data scientists, data engineers
> h2. Goals
> * New API in Spark SQL that allows as-of join
> * As-of join of multiple tables (>2) should be performant, because it is very 
> common for users to need to join multiple data sources together for further 
> analysis.
> * Define Distribution, Partitioning and shuffle strategy for ordered time 
> series data
> h2. Non-Goals
> These are out of scope for this SPIP and should be considered in future 
> SPIPs as improvements to Spark’s time series analysis ability:
> * Utilize partition information from the data source, i.e., the begin/end of 
> each partition, to reduce sorting/shuffling
> * Define an API for users to implement an as-of join time spec in a business 
> calendar (e.g., look back one business day; this is very common in financial 
> data analysis because of market calendars)
> * Support broadcast join
> h2. Proposed API Changes
> h3. TimeContext
> TimeContext is an object that defines the time scope of the analysis; it has 
> a begin time (inclusive) and an end time (exclusive). Users should be able to 
> change the time scope of the analysis (e.g., from one month to five years) by 
> just changing the TimeContext.
> To the Spark engine, TimeContext is a hint that:
> * can be used to repartition data for the join
> * serves as a predicate that can be pushed down to the storage layer
> A time context is similar to filtering time by begin/end; the main difference 
> is that a time context can be expanded based on the operation taken (see the 
> example in as-of join).
> Time context example:
> {code:java}
> TimeContext timeContext = TimeContext("20160101", "20170101")
> {code}
> h3. asofJoin
> h4. Use Case A (join without key)
> Join two DataFrames on time, with one day lookback:
> {code:java}
> TimeContext timeContext = TimeContext("20160101", "20170101")
> dfA = ...
> dfB = ...
> JoinSpec joinSpec = JoinSpec(timeContext).on("time").tolerance("-1day")
> result = dfA.asofJoin(dfB, joinSpec)
> {code}
> Example input/output:
> {code:java}
> dfA:
> time, quantity
> 20160101, 100
> 20160102, 50
> 20160104, -50
> 20160105, 100
> dfB:
> time, price
> 20151231, 100.0
> 20160104, 105.0
> 20160105, 102.0
> output:
> time, quantity, price
> 20160101, 100, 100.0
> 20160102, 50, null
> 20160104, -50, 105.0
> 20160105, 100, 102.0
> {code}
> Note that row (20160101, 100) of dfA is joined with (20151231, 100.0) of dfB. 
> This is an important illustration of the time context: it is able to expand 
> the context to 20151231 on dfB because of the one-day lookback.
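
For readers without access to the design doc, here is a rough sketch (an editor's illustration, not part of the proposal) of how the one-day-lookback semantics of Use Case A can be approximated with the existing DataFrame API; it assumes the dfA/dfB names from the example, a DateType time column, and unique times per dfA row:

{code:java}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// For each dfA row, keep the latest dfB row whose time falls inside the
// one-day lookback window; unmatched rows keep a null price.
val candidates = dfA.as("a").join(dfB.as("b"),
  col("b.time") <= col("a.time") && col("b.time") >= date_sub(col("a.time"), 1),
  "left_outer")
val latestPerRow = Window.partitionBy(col("a.time")).orderBy(col("b.time").desc)
val result = candidates
  .withColumn("rank", row_number().over(latestPerRow))
  .where(col("rank") === 1)
  .select(col("a.time"), col("a.quantity"), col("b.price"))
{code}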
> h4. Use Case B (join with key)
> To join on time and another key (for instance, id), we use “by” to specify 
> the key.
> {code:java}
> TimeContext timeContext = TimeContext("20160101", "20170101")
> dfA = ...
> dfB = ...
> JoinSpec joinSpec = 
> JoinSpec(timeContext).on("time").by("id").tolerance("-1day")
> result = dfA.asofJoin(dfB, joinSpec)
> {code}
> Example input/output:
> {code:java}
> dfA:
> time, id, quantity
> 20160101, 1, 100
> 20160101, 2, 50
> 20160102, 1, -50
> 20160102, 2, 50
> dfB:
> time, id, price
> 20151231, 1, 100.0
> 20150102, 1, 105.0
> 20150102, 2, 195.0
> Output:
> time, id, quantity, price
> 20160101, 1, 100, 100.0
> 20160101, 2, 50, null
> 20160102, 1, -50, 105.0
> 20160102, 2, 50, 195.0
> {code}
> h2. Optional Design Sketch
> h3. Implementation A
> (This is just an initial thought on how to implement this.)
> 

[jira] [Resolved] (SPARK-23201) Cannot create view when duplicate columns exist in subquery

2018-01-27 Thread Hyukjin Kwon (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-23201.
--
Resolution: Cannot Reproduce

> Cannot create view when duplicate columns exist in subquery
> ---
>
> Key: SPARK-23201
> URL: https://issues.apache.org/jira/browse/SPARK-23201
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.0, 1.6.3
>Reporter: Johannes Mayer
>Priority: Critical
>
> I have two tables: A(colA, col2, col3) and B(colB, col3, col5).
> If I join them in a subquery on A.colA = B.colB, I can select the 
> non-duplicate columns, but I cannot create a view (col3 is duplicated, but 
> not selected).
>  
> {code:java}
> create view testview as select
> tmp.colA, tmp.col2, tmp.colB, tmp.col5
> from (
> select * from A left join B
> on (A.colA = B.colB)
> )
> {code}
>  
>  
> This works:
>  
> {code:java}
> select
> tmp.colA, tmp.col2, tmp.colB, tmp.col5
> from (
> select * from A left join B
> on (A.colA = B.colB)
> )
> {code}
>  
>  
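
A possible workaround sketch (editor's note, not from the report): alias the subquery and disambiguate the duplicate col3 explicitly so it never reaches the view definition. Table and column names follow the example above:

{code:java}
// Sketch only: select and alias the conflicting columns inside the subquery.
spark.sql("""
  CREATE VIEW testview AS
  SELECT tmp.colA, tmp.col2, tmp.colB, tmp.col5
  FROM (
    SELECT A.colA, A.col2, A.col3 AS a_col3, B.colB, B.col3 AS b_col3, B.col5
    FROM A LEFT JOIN B ON (A.colA = B.colB)
  ) tmp
""")
{code}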



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-23114) Spark R 2.3 QA umbrella

2018-01-27 Thread Hyukjin Kwon (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-23114.
--
Resolution: Done

Thanks for bearing with me.

> Spark R 2.3 QA umbrella
> ---
>
> Key: SPARK-23114
> URL: https://issues.apache.org/jira/browse/SPARK-23114
> Project: Spark
>  Issue Type: Umbrella
>  Components: Documentation, SparkR
>Reporter: Joseph K. Bradley
>Assignee: Felix Cheung
>Priority: Critical
>
> This JIRA lists tasks for the next Spark release's QA period for SparkR.
> The list below gives an overview of what is involved, and the corresponding 
> JIRA issues are linked below that.
> h2. API
> * Audit new public APIs (from the generated html doc)
> ** relative to Spark Scala/Java APIs
> ** relative to popular R libraries
> h2. Documentation and example code
> * For new algorithms, create JIRAs for updating the user guide sections & 
> examples
> * Update Programming Guide
> * Update website



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Reopened] (SPARK-23114) Spark R 2.3 QA umbrella

2018-01-27 Thread Hyukjin Kwon (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon reopened SPARK-23114:
--

> Spark R 2.3 QA umbrella
> ---
>
> Key: SPARK-23114
> URL: https://issues.apache.org/jira/browse/SPARK-23114
> Project: Spark
>  Issue Type: Umbrella
>  Components: Documentation, SparkR
>Reporter: Joseph K. Bradley
>Assignee: Felix Cheung
>Priority: Critical
>
> This JIRA lists tasks for the next Spark release's QA period for SparkR.
> The list below gives an overview of what is involved, and the corresponding 
> JIRA issues are linked below that.
> h2. API
> * Audit new public APIs (from the generated html doc)
> ** relative to Spark Scala/Java APIs
> ** relative to popular R libraries
> h2. Documentation and example code
> * For new algorithms, create JIRAs for updating the user guide sections & 
> examples
> * Update Programming Guide
> * Update website



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-23187) Accumulator object can not be sent from Executor to Driver

2018-01-27 Thread Lantao Jin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16342463#comment-16342463
 ] 

Lantao Jin edited comment on SPARK-23187 at 1/28/18 6:15 AM:
-

[~jerryshao]
{quote}
So I guess it is because UI doesn't display your registered accumulator in time
{quote}
I don't think so. I hard-coded some logs in the driver instead of checking the UI.
Please check the value of accumUpdates in the executorHeartbeatReceived() method 
of TaskSchedulerImpl. The value set in reportHeartBeat() cannot be received there.


was (Author: cltlfcjin):
[~jerryshao]
{quota}
So I guess it is because UI doesn't display your registered accumulator in time
{quota}
I don't think so. I hard code some logs in driver instead of checking UI.
Please check the value of accumUpdates in method executorHeartbeatReceived() of 
TaskSchedulerImpl. The value set in reportHeartBeat() can't be received here.

> Accumulator object can not be sent from Executor to Driver
> --
>
> Key: SPARK-23187
> URL: https://issues.apache.org/jira/browse/SPARK-23187
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.1, 2.3.0
>Reporter: Lantao Jin
>Priority: Major
>
> In Executor.scala's reportHeartBeat(), task metrics values cannot be sent to 
> the Driver (on the receiving side all values are zero).
> I wrote a UT to illustrate this.
> {code}
> diff --git 
> a/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala 
> b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala
> index f9481f8..57fb096 100644
> --- a/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala
> +++ b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala
> @@ -17,11 +17,16 @@
>  package org.apache.spark.rpc.netty
> +import scala.collection.mutable.ArrayBuffer
> +
>  import org.scalatest.mockito.MockitoSugar
>  import org.apache.spark._
>  import org.apache.spark.network.client.TransportClient
>  import org.apache.spark.rpc._
> +import org.apache.spark.util.AccumulatorContext
> +import org.apache.spark.util.AccumulatorV2
> +import org.apache.spark.util.LongAccumulator
>  class NettyRpcEnvSuite extends RpcEnvSuite with MockitoSugar {
> @@ -83,5 +88,21 @@ class NettyRpcEnvSuite extends RpcEnvSuite with 
> MockitoSugar {
>  assertRequestMessageEquals(
>msg3,
>RequestMessage(nettyEnv, client, msg3.serialize(nettyEnv)))
> +
> +val acc = new LongAccumulator
> +val sc = SparkContext.getOrCreate(new 
> SparkConf().setMaster("local").setAppName("testAcc"));
> +sc.register(acc, "testAcc")
> +acc.setValue(1)
> +//val msg4 = new RequestMessage(senderAddress, receiver, acc)
> +//assertRequestMessageEquals(
> +//  msg4,
> +//  RequestMessage(nettyEnv, client, msg4.serialize(nettyEnv)))
> +
> +val accbuf = new ArrayBuffer[AccumulatorV2[_, _]]()
> +accbuf += acc
> +val msg5 = new RequestMessage(senderAddress, receiver, accbuf)
> +assertRequestMessageEquals(
> +  msg5,
> +  RequestMessage(nettyEnv, client, msg5.serialize(nettyEnv)))
>}
>  }
> {code}
> msg4 and msg5 both fail.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23187) Accumulator object can not be sent from Executor to Driver

2018-01-27 Thread Lantao Jin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16342463#comment-16342463
 ] 

Lantao Jin commented on SPARK-23187:


[~jerryshao]
{quote}
So I guess it is because UI doesn't display your registered accumulator in time
{quote}
I don't think so. I hard-coded some logs in the driver instead of checking the UI.
Please check the value of accumUpdates in the executorHeartbeatReceived() method 
of TaskSchedulerImpl. The value set in reportHeartBeat() cannot be received there.
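
For context, a minimal sketch of the normal accumulator round trip that the heartbeat path above is expected to preserve (editor's illustration; it assumes an existing SparkSession named spark):

{code:java}
// Register a named accumulator, update it in tasks, and read it on the driver.
val acc = spark.sparkContext.longAccumulator("testAcc")
spark.sparkContext.parallelize(1 to 100).foreach(_ => acc.add(1))
// Driver-side value, propagated back via task completion / heartbeats.
println(acc.value)
{code}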

> Accumulator object can not be sent from Executor to Driver
> --
>
> Key: SPARK-23187
> URL: https://issues.apache.org/jira/browse/SPARK-23187
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.1, 2.3.0
>Reporter: Lantao Jin
>Priority: Major
>
> In Executor.scala's reportHeartBeat(), task metrics values cannot be sent to 
> the Driver (on the receiving side all values are zero).
> I wrote a UT to illustrate this.
> {code}
> diff --git 
> a/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala 
> b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala
> index f9481f8..57fb096 100644
> --- a/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala
> +++ b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala
> @@ -17,11 +17,16 @@
>  package org.apache.spark.rpc.netty
> +import scala.collection.mutable.ArrayBuffer
> +
>  import org.scalatest.mockito.MockitoSugar
>  import org.apache.spark._
>  import org.apache.spark.network.client.TransportClient
>  import org.apache.spark.rpc._
> +import org.apache.spark.util.AccumulatorContext
> +import org.apache.spark.util.AccumulatorV2
> +import org.apache.spark.util.LongAccumulator
>  class NettyRpcEnvSuite extends RpcEnvSuite with MockitoSugar {
> @@ -83,5 +88,21 @@ class NettyRpcEnvSuite extends RpcEnvSuite with 
> MockitoSugar {
>  assertRequestMessageEquals(
>msg3,
>RequestMessage(nettyEnv, client, msg3.serialize(nettyEnv)))
> +
> +val acc = new LongAccumulator
> +val sc = SparkContext.getOrCreate(new 
> SparkConf().setMaster("local").setAppName("testAcc"));
> +sc.register(acc, "testAcc")
> +acc.setValue(1)
> +//val msg4 = new RequestMessage(senderAddress, receiver, acc)
> +//assertRequestMessageEquals(
> +//  msg4,
> +//  RequestMessage(nettyEnv, client, msg4.serialize(nettyEnv)))
> +
> +val accbuf = new ArrayBuffer[AccumulatorV2[_, _]]()
> +accbuf += acc
> +val msg5 = new RequestMessage(senderAddress, receiver, accbuf)
> +assertRequestMessageEquals(
> +  msg5,
> +  RequestMessage(nettyEnv, client, msg5.serialize(nettyEnv)))
>}
>  }
> {code}
> msg4 and msg5 both fail.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23114) Spark R 2.3 QA umbrella

2018-01-27 Thread Felix Cheung (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16342441#comment-16342441
 ] 

Felix Cheung commented on SPARK-23114:
--

Sure!


> Spark R 2.3 QA umbrella
> ---
>
> Key: SPARK-23114
> URL: https://issues.apache.org/jira/browse/SPARK-23114
> Project: Spark
>  Issue Type: Umbrella
>  Components: Documentation, SparkR
>Reporter: Joseph K. Bradley
>Assignee: Felix Cheung
>Priority: Critical
>
> This JIRA lists tasks for the next Spark release's QA period for SparkR.
> The list below gives an overview of what is involved, and the corresponding 
> JIRA issues are linked below that.
> h2. API
> * Audit new public APIs (from the generated html doc)
> ** relative to Spark Scala/Java APIs
> ** relative to popular R libraries
> h2. Documentation and example code
> * For new algorithms, create JIRAs for updating the user guide sections & 
> examples
> * Update Programming Guide
> * Update website



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-23226) Cannot read CSV file from Windows UNC network path

2018-01-27 Thread Hyukjin Kwon (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23226?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-23226.
--
Resolution: Not A Problem

> Cannot read CSV file from Windows UNC network path
> --
>
> Key: SPARK-23226
> URL: https://issues.apache.org/jira/browse/SPARK-23226
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.0
> Environment: Windows 10
>Reporter: Colin Breame
>Priority: Major
>
> The CSV reader cannot read a file using a Windows UNC path.
> {code:java}
> val df = spark.read.csv(raw"""windows\\unc\\path\\to\\file.csv"""){code}
> Results in:
> {code:java}
> org.apache.spark.sql.AnalysisException: Path does not exist: 
> file:/windows/unc/path/to/file.csv;
> at 
> org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:382)
> at 
> org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:370)
> at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
> at scala.collection.immutable.List.flatMap(List.scala:344)
> at 
> org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:370)
> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152)
> at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:415)
> at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:352)
> ... 48 elided{code}
> The following also results in the same message:
> {code:java}
> val df = spark.read.csv(raw"""\\windows\\unc\\path\\to\\file.csv"""){code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23240) PythonWorkerFactory issues unhelpful message when pyspark.daemon produces bogus stdout

2018-01-27 Thread Hyukjin Kwon (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16342432#comment-16342432
 ] 

Hyukjin Kwon commented on SPARK-23240:
--

Actually [~bersprockets], there is now a workaround: a custom daemon and worker 
module can be plugged in - https://github.com/apache/spark/pull/20151, from 2.4.0.
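
For reference, a hedged sketch of what that workaround looks like; the module names below are placeholders for site-local Python modules that must be importable on the executors, and the configuration keys are the ones added by the PR above (available from 2.4.0):

{code:java}
// Placeholders only: plug in custom daemon/worker modules that sanitize stdout
// before handing control to the stock PySpark daemon logic.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .config("spark.python.daemon.module", "my_clean_daemon")
  .config("spark.python.worker.module", "my_clean_worker")
  .getOrCreate()
{code}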

> PythonWorkerFactory issues unhelpful message when pyspark.daemon produces 
> bogus stdout
> --
>
> Key: SPARK-23240
> URL: https://issues.apache.org/jira/browse/SPARK-23240
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.2.1
>Reporter: Bruce Robbins
>Priority: Minor
>
> Environmental issues or site-local customizations (e.g., a sitecustomize.py 
> present in the python install directory) can interfere with daemon.py’s 
> output to stdout. PythonWorkerFactory produces unhelpful messages when this 
> happens, causing some head scratching before the actual issue is determined.
> Case #1: Extraneous data in pyspark.daemon’s stdout. In this case, 
> PythonWorkerFactory uses the output as the daemon’s port number and ends up 
> throwing an exception when creating the socket:
> {noformat}
> java.lang.IllegalArgumentException: port out of range:1819239265
>   at java.net.InetSocketAddress.checkPort(InetSocketAddress.java:143)
>   at java.net.InetSocketAddress.(InetSocketAddress.java:188)
>   at java.net.Socket.(Socket.java:244)
>   at 
> org.apache.spark.api.python.PythonWorkerFactory.createSocket$1(PythonWorkerFactory.scala:78)
> {noformat}
> Case #2: No data in pyspark.daemon’s stdout. In this case, 
> PythonWorkerFactory throws an EOFException while reading from the Process 
> input stream.
> The second case is somewhat less mysterious than the first, because 
> PythonWorkerFactory also displays the stderr from the python process.
> When there is unexpected or missing output in pyspark.daemon’s stdout, 
> PythonWorkerFactory should say so.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23114) Spark R 2.3 QA umbrella

2018-01-27 Thread Hyukjin Kwon (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16342431#comment-16342431
 ] 

Hyukjin Kwon commented on SPARK-23114:
--

Sorry for nitpicking [~felixcheung]. Shall we leave this {{Done}} if you 
wouldn't mind?

> Spark R 2.3 QA umbrella
> ---
>
> Key: SPARK-23114
> URL: https://issues.apache.org/jira/browse/SPARK-23114
> Project: Spark
>  Issue Type: Umbrella
>  Components: Documentation, SparkR
>Reporter: Joseph K. Bradley
>Assignee: Felix Cheung
>Priority: Critical
>
> This JIRA lists tasks for the next Spark release's QA period for SparkR.
> The list below gives an overview of what is involved, and the corresponding 
> JIRA issues are linked below that.
> h2. API
> * Audit new public APIs (from the generated html doc)
> ** relative to Spark Scala/Java APIs
> ** relative to popular R libraries
> h2. Documentation and example code
> * For new algorithms, create JIRAs for updating the user guide sections & 
> examples
> * Update Programming Guide
> * Update website



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-16026) Cost-based Optimizer Framework

2018-01-27 Thread Hyukjin Kwon (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-16026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-16026.
--
Resolution: Done

> Cost-based Optimizer Framework
> --
>
> Key: SPARK-16026
> URL: https://issues.apache.org/jira/browse/SPARK-16026
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Reporter: Reynold Xin
>Assignee: Zhenhua Wang
>Priority: Major
>  Labels: releasenotes
> Fix For: 2.3.0
>
> Attachments: Spark_CBO_Design_Spec.pdf
>
>
> This is an umbrella ticket to implement a cost-based optimizer framework 
> beyond broadcast join selection. This framework can be used to implement some 
> useful optimizations such as join reordering.
> The design should discuss how to break the work down into multiple, smaller 
> logical units. For example, changes to statistics class, system catalog, cost 
> estimation/propagation in expressions, cost estimation/propagation in 
> operators can be done in decoupled pull requests.
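
For context, a hedged usage sketch (editor's note, not part of the ticket) of how the framework is driven once available; the table and column names are placeholders:

{code:java}
// CBO consumes table/column statistics gathered via ANALYZE TABLE and is
// gated by the flags below; "sales", "customer_id", "amount" are placeholders.
spark.conf.set("spark.sql.cbo.enabled", "true")
spark.conf.set("spark.sql.cbo.joinReorder.enabled", "true")
spark.sql("ANALYZE TABLE sales COMPUTE STATISTICS")
spark.sql("ANALYZE TABLE sales COMPUTE STATISTICS FOR COLUMNS customer_id, amount")
{code}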



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Reopened] (SPARK-16026) Cost-based Optimizer Framework

2018-01-27 Thread Hyukjin Kwon (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-16026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon reopened SPARK-16026:
--

> Cost-based Optimizer Framework
> --
>
> Key: SPARK-16026
> URL: https://issues.apache.org/jira/browse/SPARK-16026
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Reporter: Reynold Xin
>Assignee: Zhenhua Wang
>Priority: Major
>  Labels: releasenotes
> Fix For: 2.3.0
>
> Attachments: Spark_CBO_Design_Spec.pdf
>
>
> This is an umbrella ticket to implement a cost-based optimizer framework 
> beyond broadcast join selection. This framework can be used to implement some 
> useful optimizations such as join reordering.
> The design should discuss how to break the work down into multiple, smaller 
> logical units. For example, changes to statistics class, system catalog, cost 
> estimation/propagation in expressions, cost estimation/propagation in 
> operators can be done in decoupled pull requests.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-23251) ClassNotFoundException: scala.Any when there's a missing implicit Map encoder

2018-01-27 Thread Bruce Robbins (JIRA)
Bruce Robbins created SPARK-23251:
-

 Summary: ClassNotFoundException: scala.Any when there's a missing 
implicit Map encoder
 Key: SPARK-23251
 URL: https://issues.apache.org/jira/browse/SPARK-23251
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.3.1
 Environment: mac os high sierra, centos 7
Reporter: Bruce Robbins


In branch-2.2, when you attempt to use row.getValuesMap[Any] without an 
implicit Map encoder, you get a nice descriptive compile-time error:
{noformat}
scala> df.map(row => row.getValuesMap[Any](List("stationName", "year"))).collect
:26: error: Unable to find encoder for type stored in a Dataset.  
Primitive types (Int, String, etc) and Product types (case classes) are 
supported by importing spark.implicits._  Support for serializing other types 
will be added in future releases.
       df.map(row => row.getValuesMap[Any](List("stationName", "year"))).collect
             ^
scala> implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, 
Any]]
mapEncoder: org.apache.spark.sql.Encoder[Map[String,Any]] = class[value[0]: 
binary]
scala> df.map(row => row.getValuesMap[Any](List("stationName", "year"))).collect
res1: Array[Map[String,Any]] = Array(Map(stationName -> 007026 9, year -> 
2014), Map(stationName -> 007026 9, year -> 2014), Map(stationName -> 
007026 9, year -> 2014),
etc...
{noformat}
 
On the latest master and also on branch-2.3, the transformation compiles (at 
least in spark-shell), but throws a ClassNotFoundException:

 
{noformat}
scala> df.map(row => row.getValuesMap[Any](List("stationName", "year"))).collect
java.lang.ClassNotFoundException: scala.Any
 at 
scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:62)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
 at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:348)
 at 
scala.reflect.runtime.JavaMirrors$JavaMirror.javaClass(JavaMirrors.scala:555)
 at 
scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1211)
 at 
scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1203)
 at 
scala.reflect.runtime.TwoWayCaches$TwoWayCache$$anonfun$toJava$1.apply(TwoWayCaches.scala:49)
 at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19)
 at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16)
 at scala.reflect.runtime.TwoWayCaches$TwoWayCache.toJava(TwoWayCaches.scala:44)
 at 
scala.reflect.runtime.JavaMirrors$JavaMirror.classToJava(JavaMirrors.scala:1203)
 at 
scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:194)
 at 
scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:54)
 at 
org.apache.spark.sql.catalyst.ScalaReflection$.getClassFromType(ScalaReflection.scala:700)
 at 
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor$1.apply(ScalaReflection.scala:84)
 at 
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor$1.apply(ScalaReflection.scala:65)
 at 
scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
 at 
org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:824)
 at 
org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39)
 at 
org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor(ScalaReflection.scala:64)
 at 
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:512)
 at 
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:445)
 at 
scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
 at 
org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:824)
 at 
org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39)
 at 
org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:445)
 at 
org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:434)
 at 
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:71)
 at org.apache.spark.sql.SQLImplicits.newMapEncoder(SQLImplicits.scala:172)
 ... 49 elided
scala> implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, 
Any]]
mapEncoder: org.apache.spark.sql.Encoder[Map[String,Any]] = 

[jira] [Resolved] (SPARK-23248) Relocate module docstrings to the top in PySpark examples

2018-01-27 Thread Hyukjin Kwon (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23248?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-23248.
--
   Resolution: Fixed
Fix Version/s: 2.3.0

Issue resolved by pull request 20416
[https://github.com/apache/spark/pull/20416]

> Relocate module docstrings to the top in PySpark examples
> -
>
> Key: SPARK-23248
> URL: https://issues.apache.org/jira/browse/SPARK-23248
> Project: Spark
>  Issue Type: Improvement
>  Components: Examples, PySpark
>Affects Versions: 2.3.0
>Reporter: Hyukjin Kwon
>Assignee: Hyukjin Kwon
>Priority: Minor
> Fix For: 2.3.0
>
>
> Docstrings for some examples look misplaced (they appear under the imports). 
> For example, if I run
> {code}
> >>> help(aft_survival_regression)
> {code}
> It shows as below:
> {code}
> Help on module aft_survival_regression:
> NAME
> aft_survival_regression
> ...
> DESCRIPTION
> # Licensed to the Apache Software Foundation (ASF) under one or more
> # contributor license agreements.  See the NOTICE file distributed with
> # this work for additional information regarding copyright ownership.
> # The ASF licenses this file to You under the Apache License, Version 2.0
> # (the "License"); you may not use this file except in compliance with
> # the License.  You may obtain a copy of the License at
> #
> #http://www.apache.org/licenses/LICENSE-2.0
> #
> # Unless required by applicable law or agreed to in writing, software
> # distributed under the License is distributed on an "AS IS" BASIS,
> # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> # See the License for the specific language governing permissions and
> # limitations under the License.
> #
> ...
> (END)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-23248) Relocate module docstrings to the top in PySpark examples

2018-01-27 Thread Hyukjin Kwon (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23248?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon reassigned SPARK-23248:


Assignee: Hyukjin Kwon

> Relocate module docstrings to the top in PySpark examples
> -
>
> Key: SPARK-23248
> URL: https://issues.apache.org/jira/browse/SPARK-23248
> Project: Spark
>  Issue Type: Improvement
>  Components: Examples, PySpark
>Affects Versions: 2.3.0
>Reporter: Hyukjin Kwon
>Assignee: Hyukjin Kwon
>Priority: Minor
> Fix For: 2.3.0
>
>
> Docstrings for some examples look misplaced (they appear under the imports). 
> For example, if I run
> {code}
> >>> help(aft_survival_regression)
> {code}
> It shows as below:
> {code}
> Help on module aft_survival_regression:
> NAME
> aft_survival_regression
> ...
> DESCRIPTION
> # Licensed to the Apache Software Foundation (ASF) under one or more
> # contributor license agreements.  See the NOTICE file distributed with
> # this work for additional information regarding copyright ownership.
> # The ASF licenses this file to You under the Apache License, Version 2.0
> # (the "License"); you may not use this file except in compliance with
> # the License.  You may obtain a copy of the License at
> #
> #http://www.apache.org/licenses/LICENSE-2.0
> #
> # Unless required by applicable law or agreed to in writing, software
> # distributed under the License is distributed on an "AS IS" BASIS,
> # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> # See the License for the specific language governing permissions and
> # limitations under the License.
> #
> ...
> (END)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-23250) Typo in JavaDoc/ScalaDoc for DataFrameWriter

2018-01-27 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-23250:


Assignee: Apache Spark

> Typo in JavaDoc/ScalaDoc for DataFrameWriter
> 
>
> Key: SPARK-23250
> URL: https://issues.apache.org/jira/browse/SPARK-23250
> Project: Spark
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 2.1.1, 2.1.2, 2.2.0, 2.2.1
>Reporter: Charles Chen
>Assignee: Apache Spark
>Priority: Trivial
>
> JavaDoc/ScalaDoc for several methods in the DataFrameWriter class denote 
> "This is applicable for all file-based data sources (e.g. Parquet, JSON) 
> *staring* Spark 2.1.0" - should be "starting Spark 2.1.0". This typo is not 
> present in the Spark 2.1.0 docs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-23250) Typo in JavaDoc/ScalaDoc for DataFrameWriter

2018-01-27 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-23250:


Assignee: (was: Apache Spark)

> Typo in JavaDoc/ScalaDoc for DataFrameWriter
> 
>
> Key: SPARK-23250
> URL: https://issues.apache.org/jira/browse/SPARK-23250
> Project: Spark
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 2.1.1, 2.1.2, 2.2.0, 2.2.1
>Reporter: Charles Chen
>Priority: Trivial
>
> JavaDoc/ScalaDoc for several methods in the DataFrameWriter class denote 
> "This is applicable for all file-based data sources (e.g. Parquet, JSON) 
> *staring* Spark 2.1.0" - should be "starting Spark 2.1.0". This typo is not 
> present in the Spark 2.1.0 docs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23250) Typo in JavaDoc/ScalaDoc for DataFrameWriter

2018-01-27 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16342397#comment-16342397
 ] 

Apache Spark commented on SPARK-23250:
--

User 'CCInCharge' has created a pull request for this issue:
https://github.com/apache/spark/pull/20417

> Typo in JavaDoc/ScalaDoc for DataFrameWriter
> 
>
> Key: SPARK-23250
> URL: https://issues.apache.org/jira/browse/SPARK-23250
> Project: Spark
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 2.1.1, 2.1.2, 2.2.0, 2.2.1
>Reporter: Charles Chen
>Priority: Trivial
>
> JavaDoc/ScalaDoc for several methods in the DataFrameWriter class denote 
> "This is applicable for all file-based data sources (e.g. Parquet, JSON) 
> *staring* Spark 2.1.0" - should be "starting Spark 2.1.0". This typo is not 
> present in the Spark 2.1.0 docs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-23114) Spark R 2.3 QA umbrella

2018-01-27 Thread Felix Cheung (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Felix Cheung resolved SPARK-23114.
--
Resolution: Fixed

> Spark R 2.3 QA umbrella
> ---
>
> Key: SPARK-23114
> URL: https://issues.apache.org/jira/browse/SPARK-23114
> Project: Spark
>  Issue Type: Umbrella
>  Components: Documentation, SparkR
>Reporter: Joseph K. Bradley
>Assignee: Felix Cheung
>Priority: Critical
>
> This JIRA lists tasks for the next Spark release's QA period for SparkR.
> The list below gives an overview of what is involved, and the corresponding 
> JIRA issues are linked below that.
> h2. API
> * Audit new public APIs (from the generated html doc)
> ** relative to Spark Scala/Java APIs
> ** relative to popular R libraries
> h2. Documentation and example code
> * For new algorithms, create JIRAs for updating the user guide sections & 
> examples
> * Update Programming Guide
> * Update website



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23114) Spark R 2.3 QA umbrella

2018-01-27 Thread Felix Cheung (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16342394#comment-16342394
 ] 

Felix Cheung commented on SPARK-23114:
--

Resolving.

[~sameerag] please see release note above.

> Spark R 2.3 QA umbrella
> ---
>
> Key: SPARK-23114
> URL: https://issues.apache.org/jira/browse/SPARK-23114
> Project: Spark
>  Issue Type: Umbrella
>  Components: Documentation, SparkR
>Reporter: Joseph K. Bradley
>Assignee: Felix Cheung
>Priority: Critical
>
> This JIRA lists tasks for the next Spark release's QA period for SparkR.
> The list below gives an overview of what is involved, and the corresponding 
> JIRA issues are linked below that.
> h2. API
> * Audit new public APIs (from the generated html doc)
> ** relative to Spark Scala/Java APIs
> ** relative to popular R libraries
> h2. Documentation and example code
> * For new algorithms, create JIRAs for updating the user guide sections & 
> examples
> * Update Programming Guide
> * Update website



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-21975) Histogram support in cost-based optimizer

2018-01-27 Thread Xiao Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li updated SPARK-21975:

Labels: releasenotes  (was: )

> Histogram support in cost-based optimizer
> -
>
> Key: SPARK-21975
> URL: https://issues.apache.org/jira/browse/SPARK-21975
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Zhenhua Wang
>Priority: Major
>  Labels: releasenotes
> Fix For: 2.3.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-21975) Histogram support in cost-based optimizer

2018-01-27 Thread Xiao Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li reassigned SPARK-21975:
---

Assignee: Zhenhua Wang

> Histogram support in cost-based optimizer
> -
>
> Key: SPARK-21975
> URL: https://issues.apache.org/jira/browse/SPARK-21975
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Zhenhua Wang
>Assignee: Zhenhua Wang
>Priority: Major
>  Labels: releasenotes
> Fix For: 2.3.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-23117) SparkR 2.3 QA: Check for new R APIs requiring example code

2018-01-27 Thread Felix Cheung (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23117?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Felix Cheung resolved SPARK-23117.
--
Resolution: Won't Fix
  Assignee: Felix Cheung

> SparkR 2.3 QA: Check for new R APIs requiring example code
> --
>
> Key: SPARK-23117
> URL: https://issues.apache.org/jira/browse/SPARK-23117
> Project: Spark
>  Issue Type: Sub-task
>  Components: Documentation, SparkR
>Reporter: Joseph K. Bradley
>Assignee: Felix Cheung
>Priority: Major
>
> Audit list of new features added to MLlib's R API, and see which major items 
> are missing example code (in the examples folder).  We do not need examples 
> for everything, only for major items such as new algorithms.
> For any such items:
> * Create a JIRA for that feature, and assign it to the author of the feature 
> (or yourself if interested).
> * Link it to (a) the original JIRA which introduced that feature ("related 
> to") and (b) to this JIRA ("requires").



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Reopened] (SPARK-16026) Cost-based Optimizer Framework

2018-01-27 Thread Xiao Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-16026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li reopened SPARK-16026:
-

> Cost-based Optimizer Framework
> --
>
> Key: SPARK-16026
> URL: https://issues.apache.org/jira/browse/SPARK-16026
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Reporter: Reynold Xin
>Assignee: Zhenhua Wang
>Priority: Major
>  Labels: releasenotes
> Fix For: 2.3.0
>
> Attachments: Spark_CBO_Design_Spec.pdf
>
>
> This is an umbrella ticket to implement a cost-based optimizer framework 
> beyond broadcast join selection. This framework can be used to implement some 
> useful optimizations such as join reordering.
> The design should discuss how to break the work down into multiple, smaller 
> logical units. For example, changes to statistics class, system catalog, cost 
> estimation/propagation in expressions, cost estimation/propagation in 
> operators can be done in decoupled pull requests.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-16026) Cost-based Optimizer Framework

2018-01-27 Thread Xiao Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-16026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li resolved SPARK-16026.
-
Resolution: Fixed

> Cost-based Optimizer Framework
> --
>
> Key: SPARK-16026
> URL: https://issues.apache.org/jira/browse/SPARK-16026
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Reporter: Reynold Xin
>Assignee: Zhenhua Wang
>Priority: Major
>  Labels: releasenotes
> Fix For: 2.3.0
>
> Attachments: Spark_CBO_Design_Spec.pdf
>
>
> This is an umbrella ticket to implement a cost-based optimizer framework 
> beyond broadcast join selection. This framework can be used to implement some 
> useful optimizations such as join reordering.
> The design should discuss how to break the work down into multiple, smaller 
> logical units. For example, changes to statistics class, system catalog, cost 
> estimation/propagation in expressions, cost estimation/propagation in 
> operators can be done in decoupled pull requests.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23250) Typo in JavaDoc/ScalaDoc for DataFrameWriter

2018-01-27 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16342392#comment-16342392
 ] 

Sean Owen commented on SPARK-23250:
---

It should be "starting with Spark 2.1.0" even. We don't make JIRAs for these 
but go ahead with a PR now.

> Typo in JavaDoc/ScalaDoc for DataFrameWriter
> 
>
> Key: SPARK-23250
> URL: https://issues.apache.org/jira/browse/SPARK-23250
> Project: Spark
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 2.1.1, 2.1.2, 2.2.0, 2.2.1
>Reporter: Charles Chen
>Priority: Trivial
>
> JavaDoc/ScalaDoc for several methods in the DataFrameWriter class denote 
> "This is applicable for all file-based data sources (e.g. Parquet, JSON) 
> *staring* Spark 2.1.0" - should be "starting Spark 2.1.0". This typo is not 
> present in the Spark 2.1.0 docs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17282) Implement ALTER TABLE UPDATE STATISTICS SET

2018-01-27 Thread Xiao Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16342388#comment-16342388
 ] 

Xiao Li commented on SPARK-17282:
-

This should still be needed. 

> Implement ALTER TABLE UPDATE STATISTICS SET
> ---
>
> Key: SPARK-17282
> URL: https://issues.apache.org/jira/browse/SPARK-17282
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: Xiao Li
>Priority: Major
>
> Users can change the statistics by the DDL statement:
> {noformat}
> ALTER TABLE UPDATE STATISTICS SET
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-23250) Typo in JavaDoc/ScalaDoc for DataFrameWriter

2018-01-27 Thread Charles Chen (JIRA)
Charles Chen created SPARK-23250:


 Summary: Typo in JavaDoc/ScalaDoc for DataFrameWriter
 Key: SPARK-23250
 URL: https://issues.apache.org/jira/browse/SPARK-23250
 Project: Spark
  Issue Type: Bug
  Components: Documentation
Affects Versions: 2.2.1, 2.2.0, 2.1.2, 2.1.1
Reporter: Charles Chen


JavaDoc/ScalaDoc for several methods in the DataFrameWriter class denote "This 
is applicable for all file-based data sources (e.g. Parquet, JSON) *staring* 
Spark 2.1.0" - should be "starting Spark 2.1.0". This typo is not present in 
the Spark 2.1.0 docs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-23220) broadcast hint not applied in a streaming left anti join

2018-01-27 Thread Mathieu DESPRIEE (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mathieu DESPRIEE resolved SPARK-23220.
--
   Resolution: Resolved
Fix Version/s: 2.3.0

> broadcast hint not applied in a streaming left anti join
> 
>
> Key: SPARK-23220
> URL: https://issues.apache.org/jira/browse/SPARK-23220
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.2.1
>Reporter: Mathieu DESPRIEE
>Priority: Major
> Fix For: 2.3.0
>
> Attachments: Screenshot from 2018-01-25 17-32-45.png
>
>
> We have a structured streaming app doing a left anti-join between a stream 
> and a static dataframe. The static dataframe is quite small (a few hundred 
> rows), but by default the query plan is a sort-merge join.
> Sometimes we need to re-process some historical data, so we feed the same app 
> with a FileSource pointing to our S3 storage with all archives. In that 
> situation, the first mini-batch is quite heavy (several hundred thousand 
> input files), and the time spent in the sort-merge join is unacceptable. 
> Additionally, the data is highly skewed, so partition sizes are completely 
> uneven, and executors tend to crash with OOMs.
> I tried to switch to a broadcast join, but Spark still applies a sort-merge join.
> {noformat}
> ds.join(broadcast(hostnames), Seq("hostname"), "leftanti")
> {noformat}
> !Screenshot from 2018-01-25 17-32-45.png!
> The logical plan is :
> {noformat}
> Project [app_id#5203, <--- snip ---> ... 18 more fields]
> +- Project ...
> <-- snip -->
>  +- Join LeftAnti, (hostname#3584 = hostname#190)
> :- Project [app_id, ...
> <-- snip -->
>+- StreamingExecutionRelation 
> FileStreamSource[s3://{/2018/{01,02}/*/*/,/2017/{08,09,10,11,12}/*/*/}], 
> [app_id
>  <--snip--> ... 62 more fields]
> +- ResolvedHint isBroadcastable=true
>+- Relation[hostname#190,descr#191] 
> RedshiftRelation("PUBLIC"."hostname_filter")
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23220) broadcast hint not applied in a streaming left anti join

2018-01-27 Thread Mathieu DESPRIEE (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16342351#comment-16342351
 ] 

Mathieu DESPRIEE commented on SPARK-23220:
--

I confirm this is fixed in 2.3.0

> broadcast hint not applied in a streaming left anti join
> 
>
> Key: SPARK-23220
> URL: https://issues.apache.org/jira/browse/SPARK-23220
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.2.1
>Reporter: Mathieu DESPRIEE
>Priority: Major
> Attachments: Screenshot from 2018-01-25 17-32-45.png
>
>
> We have a structured streaming app doing a left anti-join between a stream 
> and a static dataframe. The static dataframe is quite small (a few hundred 
> rows), but by default the query plan is a sort-merge join.
> Sometimes we need to re-process some historical data, so we feed the same app 
> with a FileSource pointing to our S3 storage with all archives. In that 
> situation, the first mini-batch is quite heavy (several hundred thousand 
> input files), and the time spent in the sort-merge join is unacceptable. 
> Additionally, the data is highly skewed, so partition sizes are completely 
> uneven, and executors tend to crash with OOMs.
> I tried to switch to a broadcast join, but Spark still applies a sort-merge join.
> {noformat}
> ds.join(broadcast(hostnames), Seq("hostname"), "leftanti")
> {noformat}
> !Screenshot from 2018-01-25 17-32-45.png!
> The logical plan is :
> {noformat}
> Project [app_id#5203, <--- snip ---> ... 18 more fields]
> +- Project ...
> <-- snip -->
>  +- Join LeftAnti, (hostname#3584 = hostname#190)
> :- Project [app_id, ...
> <-- snip -->
>+- StreamingExecutionRelation 
> FileStreamSource[s3://{/2018/{01,02}/*/*/,/2017/{08,09,10,11,12}/*/*/}], 
> [app_id
>  <--snip--> ... 62 more fields]
> +- ResolvedHint isBroadcastable=true
>+- Relation[hostname#190,descr#191] 
> RedshiftRelation("PUBLIC"."hostname_filter")
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-23220) broadcast hint not applied in a streaming left anti join

2018-01-27 Thread Mathieu DESPRIEE (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16342339#comment-16342339
 ] 

Mathieu DESPRIEE edited comment on SPARK-23220 at 1/27/18 10:05 PM:


I found that the problem is related to the persist() operation. When the right-hand 
side of the join is cached, the plan contains an InMemoryRelation, and the 
sizeInBytes of InMemoryRelation defaults to spark.sql.defaultSizeInBytes, which is 
Long.MaxValue.
In turn, canBroadcast() in the JoinSelection strategy returns false, which prevents 
the broadcast join.

Without the persist(), the plan is completely different:
{noformat}
== Optimized Logical Plan ==
Project [hostname#44, app_id#40, event#41, event_id#42, timestamp#43]
+- Join LeftAnti, (hostname#44 = hostname#13)
   :- Relation[app_id#40,event#41,event_id#42,timestamp#43,hostname#44] json
   +- ResolvedHint isBroadcastable=true
  +- LocalRelation [hostname#13]

== Physical Plan ==
*Project [hostname#44, app_id#40, event#41, event_id#42, timestamp#43]
+- *BroadcastHashJoin [hostname#44], [hostname#13], LeftAnti, BuildRight
   :- *FileScan json [app_id#40,event#41,event_id#42,timestamp#43,hostname#44] 
Batched: false, Format: JSON, Location: 
InMemoryFileIndex[file:/tmp/events628533427690545694/data], PartitionFilters: 
[], PushedFilters: [], ReadSchema: 
struct
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, 
true]))
  +- LocalTableScan [hostname#13]
{noformat}

The problem of stats in InMemoryRelation is addressed by SPARK-22673. I'll 
check if it fixes this bug as well.
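
As a PySpark sketch of the behaviour above (assuming a SparkSession named spark, any 
events DataFrame, and a small static hostnames DataFrame with a single hostname 
column; the names and paths are illustrative only), comparing the two explain() 
outputs shows the plan switching between a broadcast hash join and a sort-merge join:
{code:python}
from pyspark.sql.functions import broadcast

# assumes a SparkSession `spark` is already available (as in the pyspark shell)
events = spark.read.json('/tmp/events')        # left-hand side, illustrative path
hostnames = spark.read.json('/tmp/hostnames')  # small static right-hand side

# Without persist(): the hint is honoured and the physical plan shows a BroadcastHashJoin.
events.join(broadcast(hostnames), ['hostname'], 'leftanti').explain()

# With persist(): the right-hand side becomes an InMemoryRelation whose sizeInBytes
# defaults to spark.sql.defaultSizeInBytes (Long.MaxValue), so canBroadcast() is
# false and the planner falls back to a sort-merge join, as described above.
events.join(broadcast(hostnames.persist()), ['hostname'], 'leftanti').explain()
{code}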


was (Author: mathieude):
Found that the problem is related to the persist() operation. When the 
right-hand side of the join is cached, we have a InMemoryRelation in the plan. 
The sizeInBytes of InMemoryRelation defaults to `spark.sql.defaultSizeInBytes` 
which is Long.MaxValue.
In turn, the canBroadcast() in JoinSelection strategy is false, which prevent 
the BroadcastJoin.

Without the persist(), the plan is completely different :
{noformat}
== Optimized Logical Plan ==
Project [hostname#44, app_id#40, event#41, event_id#42, timestamp#43]
+- Join LeftAnti, (hostname#44 = hostname#13)
   :- Relation[app_id#40,event#41,event_id#42,timestamp#43,hostname#44] json
   +- ResolvedHint isBroadcastable=true
  +- LocalRelation [hostname#13]

== Physical Plan ==
*Project [hostname#44, app_id#40, event#41, event_id#42, timestamp#43]
+- *BroadcastHashJoin [hostname#44], [hostname#13], LeftAnti, BuildRight
   :- *FileScan json [app_id#40,event#41,event_id#42,timestamp#43,hostname#44] 
Batched: false, Format: JSON, Location: 
InMemoryFileIndex[file:/tmp/events628533427690545694/data], PartitionFilters: 
[], PushedFilters: [], ReadSchema: 
struct
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, 
true]))
  +- LocalTableScan [hostname#13]
{noformat}

The problem of stats in InMemoryRelation is addressed by SPARK-22673. I'll 
check if it fixes this bug as well.

> broadcast hint not applied in a streaming left anti join
> 
>
> Key: SPARK-23220
> URL: https://issues.apache.org/jira/browse/SPARK-23220
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.2.1
>Reporter: Mathieu DESPRIEE
>Priority: Major
> Attachments: Screenshot from 2018-01-25 17-32-45.png
>
>
> We have a structured streaming app doing a left anti-join between a stream, 
> and a static dataframe. This one is quite small (a few 100s of rows), but the 
> query plan by default is a sort merge join.
>   
>  It happens sometimes we need to re-process some historical data, so we feed 
> the same app with a FileSource pointing to our S3 storage with all archives. 
> In that situation, the first mini-batch is quite heavy (several 100'000s of 
> input files), and the time spent in sort-merge join is non-acceptable. 
> Additionally it's highly skewed, so partition sizes are completely uneven, 
> and executors tend to crash with OOMs.
> I tried to switch to a broadcast join, but Spark still applies a sort-merge.
> {noformat}
> ds.join(broadcast(hostnames), Seq("hostname"), "leftanti")
> {noformat}
> !Screenshot from 2018-01-25 17-32-45.png!
> The logical plan is :
> {noformat}
> Project [app_id#5203, <--- snip ---> ... 18 more fields]
> +- Project ...
> <-- snip -->
>  +- Join LeftAnti, (hostname#3584 = hostname#190)
> :- Project [app_id, ...
> <-- snip -->
>+- StreamingExecutionRelation 
> FileStreamSource[s3://{/2018/{01,02}/*/*/,/2017/{08,09,10,11,12}/*/*/}], 
> [app_id
>  <--snip--> ... 62 

[jira] [Comment Edited] (SPARK-23220) broadcast hint not applied in a streaming left anti join

2018-01-27 Thread Mathieu DESPRIEE (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16342339#comment-16342339
 ] 

Mathieu DESPRIEE edited comment on SPARK-23220 at 1/27/18 10:04 PM:


I found that the problem is related to the persist() operation. When the right-hand 
side of the join is cached, the plan contains an InMemoryRelation, and the 
sizeInBytes of InMemoryRelation defaults to `spark.sql.defaultSizeInBytes`, which is 
Long.MaxValue.
In turn, canBroadcast() in the JoinSelection strategy returns false, which prevents 
the broadcast join.

Without the persist(), the plan is completely different:
{noformat}
== Optimized Logical Plan ==
Project [hostname#44, app_id#40, event#41, event_id#42, timestamp#43]
+- Join LeftAnti, (hostname#44 = hostname#13)
   :- Relation[app_id#40,event#41,event_id#42,timestamp#43,hostname#44] json
   +- ResolvedHint isBroadcastable=true
  +- LocalRelation [hostname#13]

== Physical Plan ==
*Project [hostname#44, app_id#40, event#41, event_id#42, timestamp#43]
+- *BroadcastHashJoin [hostname#44], [hostname#13], LeftAnti, BuildRight
   :- *FileScan json [app_id#40,event#41,event_id#42,timestamp#43,hostname#44] 
Batched: false, Format: JSON, Location: 
InMemoryFileIndex[file:/tmp/events628533427690545694/data], PartitionFilters: 
[], PushedFilters: [], ReadSchema: 
struct
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, 
true]))
  +- LocalTableScan [hostname#13]
{noformat}

The problem of stats in InMemoryRelation is addressed by SPARK-22673. I'll 
check if it fixes this bug as well.


was (Author: mathieude):
Found that the problem is related to the persist() operation. When the 
right-hand side of the join is cached, we have a InMemoryRelation in the plan. 
The sizeInBytes of InMemoryRelation defaults to spark.sql.defaultSizeInBytes 
which is Long.MaxValue.
In turn, the canBroadcast() in JoinSelection strategy is false, which prevent 
the BroadcastJoin.

Without the persist(), the plan is completely different :
{noformat}
== Optimized Logical Plan ==
Project [hostname#44, app_id#40, event#41, event_id#42, timestamp#43]
+- Join LeftAnti, (hostname#44 = hostname#13)
   :- Relation[app_id#40,event#41,event_id#42,timestamp#43,hostname#44] json
   +- ResolvedHint isBroadcastable=true
  +- LocalRelation [hostname#13]

== Physical Plan ==
*Project [hostname#44, app_id#40, event#41, event_id#42, timestamp#43]
+- *BroadcastHashJoin [hostname#44], [hostname#13], LeftAnti, BuildRight
   :- *FileScan json [app_id#40,event#41,event_id#42,timestamp#43,hostname#44] 
Batched: false, Format: JSON, Location: 
InMemoryFileIndex[file:/tmp/events628533427690545694/data], PartitionFilters: 
[], PushedFilters: [], ReadSchema: 
struct
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, 
true]))
  +- LocalTableScan [hostname#13]
{noformat}

The problem of stats in InMemoryRelation is addressed by SPARK-22673. I'll 
check if it fixes this bug as well.

> broadcast hint not applied in a streaming left anti join
> 
>
> Key: SPARK-23220
> URL: https://issues.apache.org/jira/browse/SPARK-23220
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.2.1
>Reporter: Mathieu DESPRIEE
>Priority: Major
> Attachments: Screenshot from 2018-01-25 17-32-45.png
>
>
> We have a structured streaming app doing a left anti-join between a stream, 
> and a static dataframe. This one is quite small (a few 100s of rows), but the 
> query plan by default is a sort merge join.
>   
>  It happens sometimes we need to re-process some historical data, so we feed 
> the same app with a FileSource pointing to our S3 storage with all archives. 
> In that situation, the first mini-batch is quite heavy (several 100'000s of 
> input files), and the time spent in sort-merge join is non-acceptable. 
> Additionally it's highly skewed, so partition sizes are completely uneven, 
> and executors tend to crash with OOMs.
> I tried to switch to a broadcast join, but Spark still applies a sort-merge.
> {noformat}
> ds.join(broadcast(hostnames), Seq("hostname"), "leftanti")
> {noformat}
> !Screenshot from 2018-01-25 17-32-45.png!
> The logical plan is :
> {noformat}
> Project [app_id#5203, <--- snip ---> ... 18 more fields]
> +- Project ...
> <-- snip -->
>  +- Join LeftAnti, (hostname#3584 = hostname#190)
> :- Project [app_id, ...
> <-- snip -->
>+- StreamingExecutionRelation 
> FileStreamSource[s3://{/2018/{01,02}/*/*/,/2017/{08,09,10,11,12}/*/*/}], 
> [app_id
>  <--snip--> ... 62 

[jira] [Updated] (SPARK-23220) broadcast hint not applied in a streaming left anti join

2018-01-27 Thread Mathieu DESPRIEE (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mathieu DESPRIEE updated SPARK-23220:
-
Attachment: (was: Screenshot from 2018-01-27 22-32-30.png)

> broadcast hint not applied in a streaming left anti join
> 
>
> Key: SPARK-23220
> URL: https://issues.apache.org/jira/browse/SPARK-23220
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.2.1
>Reporter: Mathieu DESPRIEE
>Priority: Major
> Attachments: Screenshot from 2018-01-25 17-32-45.png
>
>
> We have a structured streaming app doing a left anti-join between a stream, 
> and a static dataframe. This one is quite small (a few 100s of rows), but the 
> query plan by default is a sort merge join.
>   
>  It happens sometimes we need to re-process some historical data, so we feed 
> the same app with a FileSource pointing to our S3 storage with all archives. 
> In that situation, the first mini-batch is quite heavy (several 100'000s of 
> input files), and the time spent in sort-merge join is non-acceptable. 
> Additionally it's highly skewed, so partition sizes are completely uneven, 
> and executors tend to crash with OOMs.
> I tried to switch to a broadcast join, but Spark still applies a sort-merge.
> {noformat}
> ds.join(broadcast(hostnames), Seq("hostname"), "leftanti")
> {noformat}
> !Screenshot from 2018-01-25 17-32-45.png!
> The logical plan is :
> {noformat}
> Project [app_id#5203, <--- snip ---> ... 18 more fields]
> +- Project ...
> <-- snip -->
>  +- Join LeftAnti, (hostname#3584 = hostname#190)
> :- Project [app_id, ...
> <-- snip -->
>+- StreamingExecutionRelation 
> FileStreamSource[s3://{/2018/{01,02}/*/*/,/2017/{08,09,10,11,12}/*/*/}], 
> [app_id
>  <--snip--> ... 62 more fields]
> +- ResolvedHint isBroadcastable=true
>+- Relation[hostname#190,descr#191] 
> RedshiftRelation("PUBLIC"."hostname_filter")
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23220) broadcast hint not applied in a streaming left anti join

2018-01-27 Thread Mathieu DESPRIEE (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16342339#comment-16342339
 ] 

Mathieu DESPRIEE commented on SPARK-23220:
--

I found that the problem is related to the persist() operation. When the right-hand 
side of the join is cached, the plan contains an InMemoryRelation, and the 
sizeInBytes of InMemoryRelation defaults to spark.sql.defaultSizeInBytes, which is 
Long.MaxValue.
In turn, canBroadcast() in the JoinSelection strategy returns false, which prevents 
the broadcast join.

Without the persist(), the plan is completely different:
{noformat}
== Optimized Logical Plan ==
Project [hostname#44, app_id#40, event#41, event_id#42, timestamp#43]
+- Join LeftAnti, (hostname#44 = hostname#13)
   :- Relation[app_id#40,event#41,event_id#42,timestamp#43,hostname#44] json
   +- ResolvedHint isBroadcastable=true
  +- LocalRelation [hostname#13]

== Physical Plan ==
*Project [hostname#44, app_id#40, event#41, event_id#42, timestamp#43]
+- *BroadcastHashJoin [hostname#44], [hostname#13], LeftAnti, BuildRight
   :- *FileScan json [app_id#40,event#41,event_id#42,timestamp#43,hostname#44] 
Batched: false, Format: JSON, Location: 
InMemoryFileIndex[file:/tmp/events628533427690545694/data], PartitionFilters: 
[], PushedFilters: [], ReadSchema: 
struct
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, 
true]))
  +- LocalTableScan [hostname#13]
{noformat}

The problem of stats in InMemoryRelation is addressed by SPARK-22673. I'll 
check if it fixes this bug as well.

> broadcast hint not applied in a streaming left anti join
> 
>
> Key: SPARK-23220
> URL: https://issues.apache.org/jira/browse/SPARK-23220
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.2.1
>Reporter: Mathieu DESPRIEE
>Priority: Major
> Attachments: Screenshot from 2018-01-25 17-32-45.png
>
>
> We have a structured streaming app doing a left anti-join between a stream, 
> and a static dataframe. This one is quite small (a few 100s of rows), but the 
> query plan by default is a sort merge join.
>   
>  It happens sometimes we need to re-process some historical data, so we feed 
> the same app with a FileSource pointing to our S3 storage with all archives. 
> In that situation, the first mini-batch is quite heavy (several 100'000s of 
> input files), and the time spent in sort-merge join is non-acceptable. 
> Additionally it's highly skewed, so partition sizes are completely uneven, 
> and executors tend to crash with OOMs.
> I tried to switch to a broadcast join, but Spark still applies a sort-merge.
> {noformat}
> ds.join(broadcast(hostnames), Seq("hostname"), "leftanti")
> {noformat}
> !Screenshot from 2018-01-25 17-32-45.png!
> The logical plan is :
> {noformat}
> Project [app_id#5203, <--- snip ---> ... 18 more fields]
> +- Project ...
> <-- snip -->
>  +- Join LeftAnti, (hostname#3584 = hostname#190)
> :- Project [app_id, ...
> <-- snip -->
>+- StreamingExecutionRelation 
> FileStreamSource[s3://{/2018/{01,02}/*/*/,/2017/{08,09,10,11,12}/*/*/}], 
> [app_id
>  <--snip--> ... 62 more fields]
> +- ResolvedHint isBroadcastable=true
>+- Relation[hostname#190,descr#191] 
> RedshiftRelation("PUBLIC"."hostname_filter")
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23220) broadcast hint not applied in a streaming left anti join

2018-01-27 Thread Mathieu DESPRIEE (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mathieu DESPRIEE updated SPARK-23220:
-
Attachment: Screenshot from 2018-01-27 22-32-30.png

> broadcast hint not applied in a streaming left anti join
> 
>
> Key: SPARK-23220
> URL: https://issues.apache.org/jira/browse/SPARK-23220
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.2.1
>Reporter: Mathieu DESPRIEE
>Priority: Major
> Attachments: Screenshot from 2018-01-25 17-32-45.png, Screenshot from 
> 2018-01-27 22-32-30.png
>
>
> We have a structured streaming app doing a left anti-join between a stream, 
> and a static dataframe. This one is quite small (a few 100s of rows), but the 
> query plan by default is a sort merge join.
>   
>  It happens sometimes we need to re-process some historical data, so we feed 
> the same app with a FileSource pointing to our S3 storage with all archives. 
> In that situation, the first mini-batch is quite heavy (several 100'000s of 
> input files), and the time spent in sort-merge join is non-acceptable. 
> Additionally it's highly skewed, so partition sizes are completely uneven, 
> and executors tend to crash with OOMs.
> I tried to switch to a broadcast join, but Spark still applies a sort-merge.
> {noformat}
> ds.join(broadcast(hostnames), Seq("hostname"), "leftanti")
> {noformat}
> !Screenshot from 2018-01-25 17-32-45.png!
> The logical plan is :
> {noformat}
> Project [app_id#5203, <--- snip ---> ... 18 more fields]
> +- Project ...
> <-- snip -->
>  +- Join LeftAnti, (hostname#3584 = hostname#190)
> :- Project [app_id, ...
> <-- snip -->
>+- StreamingExecutionRelation 
> FileStreamSource[s3://{/2018/{01,02}/*/*/,/2017/{08,09,10,11,12}/*/*/}], 
> [app_id
>  <--snip--> ... 62 more fields]
> +- ResolvedHint isBroadcastable=true
>+- Relation[hostname#190,descr#191] 
> RedshiftRelation("PUBLIC"."hostname_filter")
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-23233) asNondeterministic in Python UDF not being set when the UDF is called at least once

2018-01-27 Thread Xiao Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li resolved SPARK-23233.
-
   Resolution: Fixed
 Assignee: Hyukjin Kwon
Fix Version/s: 2.3.0

> asNondeterministic in Python UDF not being set when the UDF is called at 
> least once
> ---
>
> Key: SPARK-23233
> URL: https://issues.apache.org/jira/browse/SPARK-23233
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark, SQL
>Affects Versions: 2.3.0
>Reporter: Hyukjin Kwon
>Assignee: Hyukjin Kwon
>Priority: Major
> Fix For: 2.3.0
>
>
> With this diff
> {code}
> diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py
> index de96846c5c7..026a78bf547 100644
> --- a/python/pyspark/sql/udf.py
> +++ b/python/pyspark/sql/udf.py
> @@ -180,6 +180,7 @@ class UserDefinedFunction(object):
>  wrapper.deterministic = self.deterministic
>  wrapper.asNondeterministic = functools.wraps(
>  self.asNondeterministic)(lambda: 
> self.asNondeterministic()._wrapped())
> +wrapper._unwrapped = lambda: self
>  return wrapper
>  def asNondeterministic(self):
> {code}
> {code:java}
> >>> from pyspark.sql.functions import udf
> >>> f = udf(lambda x: x)
> >>> spark.range(1).select(f("id"))
> DataFrame[(id): string]
> >>> f._unwrapped()._judf_placeholder.udfDeterministic()
> True
> >>> ndf = f.asNondeterministic()
> >>> ndf.deterministic
> False
> >>> spark.range(1).select(ndf("id"))
> DataFrame[(id): string]
> >>> ndf._unwrapped()._judf_placeholder.udfDeterministic()
> True
> {code}
> Seems we don't actually update the {{deterministic}} once it's called due to 
> cache in Python side. 
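
A toy, plain-Python illustration of the caching behaviour described above (the class 
and attribute names here are illustrative, not the real pyspark.sql.udf internals): 
once the JVM-side handle has been created lazily, flipping the flag afterwards has 
no effect on the already-cached handle.
{code:python}
class FakeUDF(object):
    """Toy stand-in for the lazy _judf caching in the Python UDF wrapper."""
    def __init__(self):
        self.deterministic = True
        self._judf_cache = None

    @property
    def _judf(self):
        if self._judf_cache is None:
            # the flag is captured once, when the handle is first created
            self._judf_cache = {'udfDeterministic': self.deterministic}
        return self._judf_cache

f = FakeUDF()
f._judf                    # first use caches {'udfDeterministic': True}
f.deterministic = False    # flipping the Python-side flag afterwards...
print(f._judf)             # ...still prints {'udfDeterministic': True}
{code}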



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-23246) (Py)Spark OOM because of iteratively accumulated metadata that cannot be cleared

2018-01-27 Thread MBA Learns to Code (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16342220#comment-16342220
 ] 

MBA Learns to Code edited comment on SPARK-23246 at 1/27/18 6:38 PM:
-

[~srowen] thanks for the follow-up. I've updated the description to include 
more details about the error messages. 

I get your point that data about completed jobs also builds up naturally as the 
session goes on. But I feel it is unlikely to be as big a culprit as table/RDD 
metadata: as I increase the complexity of the temporary DataFrames in each iteration 
(illustrated by a bigger --n-partitions in the example script above), the OOM occurs 
faster. For example, if the temporary DataFrames are reasonably complex 
(--n-partitions = 1,000, say), OOM occurs after about 350-400 iterations in the 
above setting with spark.driver.memory = 512m. In more complex real iterative 
programs that I run for ML/DL-related workloads, even when spark.driver.memory is 
large (e.g., 6g), we also hit OOM after a few hundred or a few thousand iterations.
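
One way to check how fast the driver heap actually grows is to sample the JVM 
Runtime that PySpark exposes through Py4J, once per iteration of the script above 
(a sketch; call it from inside the while loop):
{code:python}
def driver_heap_used_mb(spark):
    """Rough driver-side JVM heap usage in MB, read via the Runtime exposed through Py4J."""
    rt = spark.sparkContext._jvm.java.lang.Runtime.getRuntime()
    return (rt.totalMemory() - rt.freeMemory()) / (1024.0 * 1024.0)

# inside the while loop of the script above:
#     print('iteration {}: driver heap ~{:.0f} MB'.format(i, driver_heap_used_mb(spark)))
{code}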


was (Author: mbalearnstocode):
[~srowen] thanks for the follow-up. I've updated the description to include 
more details about the error messages. 

I get your point that data about jobs completed also build up naturally as the 
session goes on. But I feel that it is unlikely as big of a culprit as 
table/RDD metadata: as I increase the complexity of the temporary DataFrames 
for each iteration (illustrated by a bigger --n-partitions in the example 
script above), the OOM occurs faster. For example, if the temporary DataFrames 
are reasonably complex (--n-partitions = 1,000, say), OOM would occur after 
about 150 iterations in the above setting if spark.driver.memory = 512m. In 
more complex real iterative programs that I run for ML/DL-related workloads, 
even when spark.driver.memory is big (e.g., 6g), we also have OOM trouble after 
a few hundred or a few thousand iterations.

> (Py)Spark OOM because of iteratively accumulated metadata that cannot be 
> cleared
> 
>
> Key: SPARK-23246
> URL: https://issues.apache.org/jira/browse/SPARK-23246
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark, Spark Core, SQL
>Affects Versions: 2.2.1
>Reporter: MBA Learns to Code
>Priority: Critical
>
> I am having consistent OOM crashes when trying to use PySpark for iterative 
> algorithms in which I create new DataFrames per iteration (e.g. by sampling 
> from a "mother" DataFrame), do something with such DataFrames, and never need 
> such DataFrames ever in future iterations.
> The below script simulates such OOM failures. Even when one tries explicitly 
> .unpersist() the temporary DataFrames (by using the --unpersist flag below) 
> and/or deleting and garbage-collecting the Python objects (by using the 
> --py-gc flag below), the Java objects seem to stay on and accumulate until 
> they exceed the JVM/driver memory.
> The more complex the temporary DataFrames in each iteration (illustrated by 
> the --n-partitions flag below), the faster OOM occurs.
> The typical error messages include:
>  - "java.lang.OutOfMemoryError : GC overhead limit exceeded"
>  - "Java heap space"
>  - "ERROR TransportRequestHandler: Error sending result 
> RpcResponse{requestId=6053742323219781
>  161, body=NioManagedBuffer{buf=java.nio.HeapByteBuffer[pos=0 lim=47 
> cap=64]}} to /; closing connection"
> Please suggest how I may overcome this so that we can have long-running 
> iterative programs using Spark that uses resources only up to a bounded, 
> controllable limit.
>  
> {code:java}
> from __future__ import print_function
> import argparse
> import gc
> import pandas
> import pyspark
> arg_parser = argparse.ArgumentParser()
> arg_parser.add_argument('--unpersist', action='store_true')
> arg_parser.add_argument('--py-gc', action='store_true')
> arg_parser.add_argument('--n-partitions', type=int, default=1000)
> args = arg_parser.parse_args()
> # create SparkSession (*** set spark.driver.memory to 512m in 
> spark-defaults.conf ***)
> spark = pyspark.sql.SparkSession.builder \
> .config('spark.executor.instances', 2) \
> .config('spark.executor.cores', 2) \
> .config('spark.executor.memory', '512m') \
> .config('spark.ui.enabled', False) \
> .config('spark.ui.retainedJobs', 10) \
> .enableHiveSupport() \
> .getOrCreate()
> # create Parquet file for subsequent repeated loading
> df = spark.createDataFrame(
> pandas.DataFrame(
> dict(
> row=range(args.n_partitions),
> x=args.n_partitions * [0]
> )
> )
> )
> parquet_path = '/tmp/TestOOM-{}Partitions.parquet'.format(args.n_partitions)
> df.write.parquet(
> path=parquet_path,
> 

[jira] [Comment Edited] (SPARK-23246) (Py)Spark OOM because of iteratively accumulated metadata that cannot be cleared

2018-01-27 Thread MBA Learns to Code (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16342220#comment-16342220
 ] 

MBA Learns to Code edited comment on SPARK-23246 at 1/27/18 6:15 PM:
-

[~srowen] thanks for the follow-up. I've updated the description to include 
more details about the error messages. 

I get your point that data about completed jobs also builds up naturally as the 
session goes on. But I feel it is unlikely to be as big a culprit as table/RDD 
metadata: as I increase the complexity of the temporary DataFrames in each iteration 
(illustrated by a bigger --n-partitions in the example script above), the OOM occurs 
faster. For example, if the temporary DataFrames are reasonably complex 
(--n-partitions = 1,000, say), OOM occurs after about 150 iterations in the above 
setting with spark.driver.memory = 512m. In more complex real iterative programs 
that I run for ML/DL-related workloads, even when spark.driver.memory is large 
(e.g., 6g), we also hit OOM after a few hundred or a few thousand iterations.


was (Author: mbalearnstocode):
[~srowen] thanks for the follow-up. I've updated the description to include 
more details about the error messages.

 

I get your point that data about jobs completed also build up naturally as the 
session goes on. But I feel that it is unlikely as big of a culprit as 
table/RDD metadata: as I increase the complexity of the temporary DataFrames 
for each iteration (illustrated by a bigger --n-partitions in the example 
script above), the OOM occurs faster. For example, if the temporary DataFrames 
are reasonably complex (--n-partitions = 1,000, say), OOM would occur after 
about 150 iterations in the above setting if spark.driver.memory = 512m. In 
more complex real iterative programs that I run for ML/DL-related workloads, 
even when spark.driver.memory is big (e.g., 6g), we also have OOM trouble after 
a few hundred or a few thousand iterations.

> (Py)Spark OOM because of iteratively accumulated metadata that cannot be 
> cleared
> 
>
> Key: SPARK-23246
> URL: https://issues.apache.org/jira/browse/SPARK-23246
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark, Spark Core, SQL
>Affects Versions: 2.2.1
>Reporter: MBA Learns to Code
>Priority: Critical
>
> I am having consistent OOM crashes when trying to use PySpark for iterative 
> algorithms in which I create new DataFrames per iteration (e.g. by sampling 
> from a "mother" DataFrame), do something with such DataFrames, and never need 
> such DataFrames ever in future iterations.
> The below script simulates such OOM failures. Even when one tries explicitly 
> .unpersist() the temporary DataFrames (by using the --unpersist flag below) 
> and/or deleting and garbage-collecting the Python objects (by using the 
> --py-gc flag below), the Java objects seem to stay on and accumulate until 
> they exceed the JVM/driver memory.
> The more complex the temporary DataFrames in each iteration (illustrated by 
> the --n-partitions flag below), the faster OOM occurs.
> The typical error messages include:
>  - "java.lang.OutOfMemoryError : GC overhead limit exceeded"
>  - "Java heap space"
>  - "ERROR TransportRequestHandler: Error sending result 
> RpcResponse{requestId=6053742323219781
>  161, body=NioManagedBuffer{buf=java.nio.HeapByteBuffer[pos=0 lim=47 
> cap=64]}} to /; closing connection"
> Please suggest how I may overcome this so that we can have long-running 
> iterative programs using Spark that uses resources only up to a bounded, 
> controllable limit.
>  
> {code:java}
> from __future__ import print_function
> import argparse
> import gc
> import pandas
> import pyspark
> arg_parser = argparse.ArgumentParser()
> arg_parser.add_argument('--unpersist', action='store_true')
> arg_parser.add_argument('--py-gc', action='store_true')
> arg_parser.add_argument('--n-partitions', type=int, default=1000)
> args = arg_parser.parse_args()
> # create SparkSession (*** set spark.driver.memory to 512m in 
> spark-defaults.conf ***)
> spark = pyspark.sql.SparkSession.builder \
> .config('spark.executor.instances', 2) \
> .config('spark.executor.cores', 2) \
> .config('spark.executor.memory', '512m') \
> .config('spark.ui.enabled', False) \
> .config('spark.ui.retainedJobs', 10) \
> .enableHiveSupport() \
> .getOrCreate()
> # create Parquet file for subsequent repeated loading
> df = spark.createDataFrame(
> pandas.DataFrame(
> dict(
> row=range(args.n_partitions),
> x=args.n_partitions * [0]
> )
> )
> )
> parquet_path = '/tmp/TestOOM-{}Partitions.parquet'.format(args.n_partitions)
> df.write.parquet(
> path=parquet_path,
> partitionBy='row',

[jira] [Updated] (SPARK-23246) (Py)Spark OOM because of iteratively accumulated metadata that cannot be cleared

2018-01-27 Thread MBA Learns to Code (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23246?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

MBA Learns to Code updated SPARK-23246:
---
Description: 
I am having consistent OOM crashes when trying to use PySpark for iterative 
algorithms in which I create new DataFrames per iteration (e.g. by sampling 
from a "mother" DataFrame), do something with such DataFrames, and never need 
such DataFrames ever in future iterations.

The below script simulates such OOM failures. Even when one tries explicitly 
.unpersist() the temporary DataFrames (by using the --unpersist flag below) 
and/or deleting and garbage-collecting the Python objects (by using the --py-gc 
flag below), the Java objects seem to stay on and accumulate until they exceed 
the JVM/driver memory.

The more complex the temporary DataFrames in each iteration (illustrated by the 
--n-partitions flag below), the faster OOM occurs.

The typical error messages include:
 - "java.lang.OutOfMemoryError : GC overhead limit exceeded"

 - "Java heap space"
 - "ERROR TransportRequestHandler: Error sending result 
RpcResponse{requestId=6053742323219781
 161, body=NioManagedBuffer{buf=java.nio.HeapByteBuffer[pos=0 lim=47 cap=64]}} 
to /; closing connection"

Please suggest how I may overcome this so that we can have long-running 
iterative programs using Spark that uses resources only up to a bounded, 
controllable limit.

 
{code:java}
from __future__ import print_function

import argparse
import gc
import pandas

import pyspark


arg_parser = argparse.ArgumentParser()
arg_parser.add_argument('--unpersist', action='store_true')
arg_parser.add_argument('--py-gc', action='store_true')
arg_parser.add_argument('--n-partitions', type=int, default=1000)
args = arg_parser.parse_args()


# create SparkSession (*** set spark.driver.memory to 512m in 
spark-defaults.conf ***)
spark = pyspark.sql.SparkSession.builder \
.config('spark.executor.instances', 2) \
.config('spark.executor.cores', 2) \
.config('spark.executor.memory', '512m') \
.config('spark.ui.enabled', False) \
.config('spark.ui.retainedJobs', 10) \
.enableHiveSupport() \
.getOrCreate()


# create Parquet file for subsequent repeated loading
df = spark.createDataFrame(
pandas.DataFrame(
dict(
row=range(args.n_partitions),
x=args.n_partitions * [0]
)
)
)

parquet_path = '/tmp/TestOOM-{}Partitions.parquet'.format(args.n_partitions)

df.write.parquet(
path=parquet_path,
partitionBy='row',
mode='overwrite'
)


i = 0


# the below loop simulates an iterative algorithm that creates new DataFrames 
in each iteration (e.g. sampling from a "mother" DataFrame), do something, and 
never need those DataFrames again in future iterations
# we are having a problem cleaning up the built-up metadata
# hence the program will crash after while because of OOM
while True:
_df = spark.read.parquet(parquet_path)

if args.unpersist:
_df.unpersist()

if args.py_gc:
del _df
gc.collect()

i += 1; print('COMPLETED READ ITERATION #{}\n'.format(i))
{code}
 

  was:
I am having consistent OOM crashes when trying to use PySpark for iterative 
algorithms in which I create new DataFrames per iteration (e.g. by sampling 
from a "mother" DataFrame), do something with such DataFrames, and never need 
such DataFrames ever in future iterations.

The below script simulates such OOM failures. Even when one tries explicitly 
.unpersist() the temporary DataFrames (by using the --unpersist flag below) 
and/or deleting and garbage-collecting the Python objects (by using the --py-gc 
flag below), the Java objects seem to stay on and accumulate until they exceed 
the JVM/driver memory.

The more complex the temporary DataFrames in each iteration (illustrated by the 
--n-partitions flag below), the faster OOM occurs.

The typical error messages include:
 - "java.lang.OutOfMemoryError : GC overhead limit exceeded"

 - "Java heap space"
 - "ERROR TransportRequestHandler: Error sending result 
RpcResponse{requestId=6053742323219781
 161, body=NioManagedBuffer{buf=java.nio.HeapByteBuffer[pos=0 lim=47 cap=64]}} 
to /; closing connection"

Please suggest how I may overcome this so that we can have long-running 
iterative programs using Spark that uses resources only up to a bounded, 
controllable limit.

 
{code:java}
from __future__ import print_function

import argparse
import gc
import pandas

import pyspark


arg_parser = argparse.ArgumentParser()
arg_parser.add_argument('--unpersist', action='store_true')
arg_parser.add_argument('--py-gc', action='store_true')
arg_parser.add_argument('--n-partitions', type=int, default=1000)
args = arg_parser.parse_args()


# create SparkSession (*** set spark.driver.memory to 512m in 
spark-defaults.conf ***)
spark = pyspark.sql.SparkSession.builder \
.config('spark.executor.instances', 2) \

[jira] [Updated] (SPARK-23246) (Py)Spark OOM because of iteratively accumulated metadata that cannot be cleared

2018-01-27 Thread MBA Learns to Code (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23246?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

MBA Learns to Code updated SPARK-23246:
---
Description: 
I am having consistent OOM crashes when trying to use PySpark for iterative 
algorithms in which I create new DataFrames per iteration (e.g. by sampling 
from a "mother" DataFrame), do something with such DataFrames, and never need 
such DataFrames ever in future iterations.

The below script simulates such OOM failures. Even when one tries explicitly 
.unpersist() the temporary DataFrames (by using the --unpersist flag below) 
and/or deleting and garbage-collecting the Python objects (by using the --py-gc 
flag below), the Java objects seem to stay on and accumulate until they exceed 
the JVM/driver memory.

The more complex the temporary DataFrames in each iteration (illustrated by the 
--n-partitions flag below), the faster OOM occurs.

The typical error messages include:
 - "java.lang.OutOfMemoryError : GC overhead limit exceeded"

 - "Java heap space"
 - "ERROR TransportRequestHandler: Error sending result 
RpcResponse{requestId=6053742323219781
 161, body=NioManagedBuffer{buf=java.nio.HeapByteBuffer[pos=0 lim=47 cap=64]}} 
to /; closing connection"

Please suggest how I may overcome this so that we can have long-running 
iterative programs using Spark that uses resources only up to a bounded, 
controllable limit.

 
{code:java}
from __future__ import print_function

import argparse
import gc
import pandas

import pyspark


arg_parser = argparse.ArgumentParser()
arg_parser.add_argument('--unpersist', action='store_true')
arg_parser.add_argument('--py-gc', action='store_true')
arg_parser.add_argument('--n-partitions', type=int, default=1000)
args = arg_parser.parse_args()


# create SparkSession (*** set spark.driver.memory to 512m in 
spark-defaults.conf ***)
spark = pyspark.sql.SparkSession.builder \
.config('spark.executor.instances', 2) \
.config('spark.executor.cores', 2) \
.config('spark.executor.memory', '512m') \
.config('spark.ui.enabled', False) \
.enableHiveSupport() \
.getOrCreate()


# create Parquet file for subsequent repeated loading
df = spark.createDataFrame(
pandas.DataFrame(
dict(
row=range(args.n_partitions),
x=args.n_partitions * [0]
)
)
)

parquet_path = '/tmp/TestOOM-{}Partitions.parquet'.format(args.n_partitions)

df.write.parquet(
path=parquet_path,
partitionBy='row',
mode='overwrite'
)


i = 0


# the below loop simulates an iterative algorithm that creates new DataFrames 
in each iteration (e.g. sampling from a "mother" DataFrame), do something, and 
never need those DataFrames again in future iterations
# we are having a problem cleaning up the built-up metadata
# hence the program will crash after while because of OOM
while True:
_df = spark.read.parquet(parquet_path)

if args.unpersist:
_df.unpersist()

if args.py_gc:
del _df
gc.collect()

i += 1; print('COMPLETED READ ITERATION #{}\n'.format(i))
{code}
 

  was:
I am having consistent OOM crashes when trying to use PySpark for iterative 
algorithms in which I create new DataFrames per iteration (e.g. by sampling 
from a "mother" DataFrame), do something with such DataFrames, and never need 
such DataFrames ever in future iterations.

The below script simulates such OOM failures. Even when one tries explicitly 
.unpersist() the temporary DataFrames (by using the --unpersist flag below) 
and/or deleting and garbage-collecting the Python objects (by using the --py-gc 
flag below), the Java objects seem to stay on and accumulate until they exceed 
the JVM/driver memory.

The more complex the temporary DataFrames in each iteration (illustrated by the 
--n-partitions flag below), the faster OOM occurs.

The typical error messages include:
 - "java.lang.OutOfMemoryError : GC overhead limit exceeded"

 - "Java heap space"
 - "ERROR TransportRequestHandler: Error sending result 
RpcResponse{requestId=6053742323219781
 161, body=NioManagedBuffer{buf=java.nio.HeapByteBuffer[pos=0 lim=47 cap=64]}} 
to /; closing connection"

Please suggest how I may overcome this so that we can have long-running 
iterative programs using Spark that uses resources only up to a bounded, 
controllable limit.

 
{code:java}
from __future__ import print_function

import argparse
import gc
import pandas

import pyspark


arg_parser = argparse.ArgumentParser()
arg_parser.add_argument('--unpersist', action='store_true')
arg_parser.add_argument('--py-gc', action='store_true')
arg_parser.add_argument('--n-partitions', type=int, default=1000)
args = arg_parser.parse_args()


# create SparkSession (*** set spark.driver.memory to 512m in 
spark-defaults.conf ***)
spark = pyspark.sql.SparkSession.builder \
.config('spark.executor.instances', '2') \
.config('spark.executor.cores', '2') \

[jira] [Updated] (SPARK-21866) SPIP: Image support in Spark

2018-01-27 Thread Denny Lee (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Denny Lee updated SPARK-21866:
--
Description: 
h2. Background and motivation

As Apache Spark is being used more and more in the industry, some new use cases 
are emerging for different data formats beyond the traditional SQL types or the 
numerical types (vectors and matrices). Deep Learning applications commonly 
deal with image processing. A number of projects add some Deep Learning 
capabilities to Spark (see list below), but they struggle to communicate with 
each other or with MLlib pipelines because there is no standard way to 
represent an image in Spark DataFrames. We propose to federate efforts for 
representing images in Spark by defining a representation that caters to the 
most common needs of users and library developers.

This SPIP proposes a specification to represent images in Spark DataFrames and 
Datasets (based on existing industrial standards), and an interface for loading 
sources of images. It is not meant to be a full-fledged image processing 
library, but rather the core description that other libraries and users can 
rely on. Several packages already offer various processing facilities for 
transforming images or doing more complex operations, and each has various 
design tradeoffs that make them better as standalone solutions.

This project is a joint collaboration between Microsoft and Databricks, which 
have been testing this design in two open source packages: MMLSpark and Deep 
Learning Pipelines.

The proposed image format is an in-memory, decompressed representation that 
targets low-level applications. It is significantly more liberal in memory 
usage than compressed image representations such as JPEG, PNG, etc., but it 
allows easy communication with popular image processing libraries and has no 
decoding overhead.
h2. Targets users and personas:

Data scientists, data engineers, library developers.
The following libraries define primitives for loading and representing images, 
and will gain from a common interchange format (in alphabetical order):
 * BigDL
 * DeepLearning4J
 * Deep Learning Pipelines
 * MMLSpark
 * TensorFlow (Spark connector)
 * TensorFlowOnSpark
 * TensorFrames
 * Thunder

h2. Goals:
 * Simple representation of images in Spark DataFrames, based on pre-existing 
industrial standards (OpenCV)
 * This format should eventually allow the development of high-performance 
integration points with image processing libraries such as libOpenCV, Google 
TensorFlow, CNTK, and other C libraries.
 * The reader should be able to read popular formats of images from distributed 
sources.

h2. Non-Goals:

Images are a versatile medium and encompass a very wide range of formats and 
representations. This SPIP explicitly aims at the most common use case in the 
industry currently: multi-channel matrices of binary, int32, int64, float or 
double data that can fit comfortably in the heap of the JVM:
 * the total size of an image should be restricted to less than 2GB (roughly)
 * the meaning of color channels is application-specific and is not mandated by 
the standard (in line with the OpenCV standard)
 * specialized formats used in meteorology, the medical field, etc. are not 
supported
 * this format is specialized to images and does not attempt to solve the more 
general problem of representing n-dimensional tensors in Spark

h2. Proposed API changes

We propose to add a new package in the package structure, under the MLlib 
project:
 {{org.apache.spark.image}}
h3. Data format

We propose to add the following structure:

imageSchema = StructType([
 * StructField("mode", StringType(), False),
 ** The exact representation of the data.
 ** The values are described in the following OpenCV convention. Basically, the 
type has both "depth" and "number of channels" info: in particular, type 
"CV_8UC3" means "3 channel unsigned bytes". BGRA format would be CV_8UC4 (value 
32 in the table) with the channel order specified by convention.
 ** The exact channel ordering and meaning of each channel is dictated by 
convention. By default, the order is RGB (3 channels) and BGRA (4 channels).
If the image failed to load, the value is the empty string "".

 * StructField("origin", StringType(), True),
 ** Some information about the origin of the image. The content of this is 
application-specific.
 ** When the image is loaded from files, users should expect to find the file 
name in this field.

 * StructField("height", IntegerType(), False),
 ** the height of the image, pixels
 ** If the image fails to load, the value is -1.

 * StructField("width", IntegerType(), False),
 ** the width of the image, pixels
 ** If the image fails to load, the value is -1.

 * StructField("nChannels", IntegerType(), False),
 ** The number of channels in this image: it is typically a value of 1 (B), 3 
(RGB), or 4 (BGRA)
 ** If the image fails to 
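
Written out as a PySpark StructType, the fields listed so far would look roughly 
like this (a sketch that follows the field list above; the description is cut off 
before the remaining fields, so they are omitted here):
{code:python}
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

image_schema = StructType([
    StructField("mode", StringType(), False),        # OpenCV-style type string, e.g. "CV_8UC3"
    StructField("origin", StringType(), True),       # application-specific origin, e.g. file name
    StructField("height", IntegerType(), False),     # pixels; -1 if the image failed to load
    StructField("width", IntegerType(), False),      # pixels; -1 if the image failed to load
    StructField("nChannels", IntegerType(), False),  # typically 1 (B), 3 (RGB) or 4 (BGRA)
    # further fields are cut off in the description above and not reproduced here
])
{code}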

[jira] [Commented] (SPARK-23246) (Py)Spark OOM because of iteratively accumulated metadata that cannot be cleared

2018-01-27 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16342237#comment-16342237
 ] 

Sean Owen commented on SPARK-23246:
---

It is easy to test: do a simple heap dump on the driver. The UI history info would 
also increase in size with the number of partitions. Your driver memory is small. 
Try reducing spark.ui.retainedJobs and similar params. Without evidence that it is 
anything more than this, I'd have to close this.
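
For reference, a sketch of the retention settings being suggested (the property 
names are standard Spark configuration keys; the values are illustrative only), 
together with the usual jmap invocation for a driver heap dump:
{code:python}
import pyspark

spark = (pyspark.sql.SparkSession.builder
         .config('spark.ui.retainedJobs', 10)            # default 1000
         .config('spark.ui.retainedStages', 10)          # default 1000
         .config('spark.ui.retainedTasks', 1000)         # default 100000
         .config('spark.sql.ui.retainedExecutions', 10)  # default 1000
         .getOrCreate())

# A driver heap dump can be taken with jmap from a shell on the driver host:
#   jmap -dump:live,format=b,file=driver.hprof <driver-pid>
{code}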

> (Py)Spark OOM because of iteratively accumulated metadata that cannot be 
> cleared
> 
>
> Key: SPARK-23246
> URL: https://issues.apache.org/jira/browse/SPARK-23246
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark, Spark Core, SQL
>Affects Versions: 2.2.1
>Reporter: MBA Learns to Code
>Priority: Critical
>
> I am having consistent OOM crashes when trying to use PySpark for iterative 
> algorithms in which I create new DataFrames per iteration (e.g. by sampling 
> from a "mother" DataFrame), do something with such DataFrames, and never need 
> such DataFrames ever in future iterations.
> The below script simulates such OOM failures. Even when one tries explicitly 
> .unpersist() the temporary DataFrames (by using the --unpersist flag below) 
> and/or deleting and garbage-collecting the Python objects (by using the 
> --py-gc flag below), the Java objects seem to stay on and accumulate until 
> they exceed the JVM/driver memory.
> The more complex the temporary DataFrames in each iteration (illustrated by 
> the --n-partitions flag below), the faster OOM occurs.
> The typical error messages include:
>  - "java.lang.OutOfMemoryError : GC overhead limit exceeded"
>  - "Java heap space"
>  - "ERROR TransportRequestHandler: Error sending result 
> RpcResponse{requestId=6053742323219781
>  161, body=NioManagedBuffer{buf=java.nio.HeapByteBuffer[pos=0 lim=47 
> cap=64]}} to /; closing connection"
> Please suggest how I may overcome this so that we can have long-running 
> iterative programs using Spark that uses resources only up to a bounded, 
> controllable limit.
>  
> {code:java}
> from __future__ import print_function
> import argparse
> import gc
> import pandas
> import pyspark
> arg_parser = argparse.ArgumentParser()
> arg_parser.add_argument('--unpersist', action='store_true')
> arg_parser.add_argument('--py-gc', action='store_true')
> arg_parser.add_argument('--n-partitions', type=int, default=1000)
> args = arg_parser.parse_args()
> # create SparkSession (*** set spark.driver.memory to 512m in 
> spark-defaults.conf ***)
> spark = pyspark.sql.SparkSession.builder \
> .config('spark.executor.instances', '2') \
> .config('spark.executor.cores', '2') \
> .config('spark.executor.memory', '512m') \
> .enableHiveSupport() \
> .getOrCreate()
> # create Parquet file for subsequent repeated loading
> df = spark.createDataFrame(
> pandas.DataFrame(
> dict(
> row=range(args.n_partitions),
> x=args.n_partitions * [0]
> )
> )
> )
> parquet_path = '/tmp/TestOOM-{}Partitions.parquet'.format(args.n_partitions)
> df.write.parquet(
> path=parquet_path,
> partitionBy='row',
> mode='overwrite'
> )
> i = 0
> # the below loop simulates an iterative algorithm that creates new DataFrames 
> in each iteration (e.g. sampling from a "mother" DataFrame), do something, 
> and never need those DataFrames again in future iterations
> # we are having a problem cleaning up the built-up metadata
> # hence the program will crash after while because of OOM
> while True:
> _df = spark.read.parquet(parquet_path)
> if args.unpersist:
> _df.unpersist()
> if args.py_gc:
> del _df
> gc.collect()
> i += 1; print('COMPLETED READ ITERATION #{}\n'.format(i))
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-23246) (Py)Spark OOM because of iteratively accumulated metadata that cannot be cleared

2018-01-27 Thread MBA Learns to Code (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16342220#comment-16342220
 ] 

MBA Learns to Code edited comment on SPARK-23246 at 1/27/18 4:49 PM:
-

[~srowen] thanks for the follow-up. I've updated the description to include 
more details about the error messages.

 

I get your point that data about completed jobs also builds up naturally as the 
session goes on. But I feel it is unlikely to be as big a culprit as table/RDD 
metadata: as I increase the complexity of the temporary DataFrames in each iteration 
(illustrated by a bigger --n-partitions in the example script above), the OOM occurs 
faster. For example, if the temporary DataFrames are reasonably complex 
(--n-partitions = 1,000, say), OOM occurs after about 150 iterations in the above 
setting with spark.driver.memory = 512m. In more complex real iterative programs 
that I run for ML/DL-related workloads, even when spark.driver.memory is large 
(e.g., 6g), we also hit OOM after a few hundred or a few thousand iterations.


was (Author: mbalearnstocode):
[~srowen] thanks for the follow-up. I've updated the description to include 
more details about the error messages.

 

I get your point that data about jobs completed also build up naturally as the 
session goes on. But I feel that it is unlikely as big of a culprit as 
table/RDD metadata: as I increase the complexity of the temporary DataFrames 
for each iteration (illustrated by a bigger --n-partitions in the example 
script above), the OOM occurs faster. For example, if the temporary DataFrames 
are reasonably complex (--n-partitions = 1,000, say), OOM would occur after 
about 150 iterations in the above setting if spark.driver.memory = 512m. In 
more complex real iterative programs that I run for ML/DL-related workloads, 
even when spark.driver.memory is big (e.g., 6g), we have OOM trouble after a 
few thousand iterations.

> (Py)Spark OOM because of iteratively accumulated metadata that cannot be 
> cleared
> 
>
> Key: SPARK-23246
> URL: https://issues.apache.org/jira/browse/SPARK-23246
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark, Spark Core, SQL
>Affects Versions: 2.2.1
>Reporter: MBA Learns to Code
>Priority: Critical
>
> I am having consistent OOM crashes when trying to use PySpark for iterative 
> algorithms in which I create new DataFrames per iteration (e.g. by sampling 
> from a "mother" DataFrame), do something with such DataFrames, and never need 
> such DataFrames ever in future iterations.
> The below script simulates such OOM failures. Even when one tries explicitly 
> .unpersist() the temporary DataFrames (by using the --unpersist flag below) 
> and/or deleting and garbage-collecting the Python objects (by using the 
> --py-gc flag below), the Java objects seem to stay on and accumulate until 
> they exceed the JVM/driver memory.
> The more complex the temporary DataFrames in each iteration (illustrated by 
> the --n-partitions flag below), the faster OOM occurs.
> The typical error messages include:
>  - "java.lang.OutOfMemoryError : GC overhead limit exceeded"
>  - "Java heap space"
>  - "ERROR TransportRequestHandler: Error sending result 
> RpcResponse{requestId=6053742323219781
>  161, body=NioManagedBuffer{buf=java.nio.HeapByteBuffer[pos=0 lim=47 
> cap=64]}} to /; closing connection"
> Please suggest how I may overcome this so that we can have long-running 
> iterative programs using Spark that uses resources only up to a bounded, 
> controllable limit.
>  
> {code:java}
> from __future__ import print_function
> import argparse
> import gc
> import pandas
> import pyspark
> arg_parser = argparse.ArgumentParser()
> arg_parser.add_argument('--unpersist', action='store_true')
> arg_parser.add_argument('--py-gc', action='store_true')
> arg_parser.add_argument('--n-partitions', type=int, default=1000)
> args = arg_parser.parse_args()
> # create SparkSession (*** set spark.driver.memory to 512m in 
> spark-defaults.conf ***)
> spark = pyspark.sql.SparkSession.builder \
> .config('spark.executor.instances', '2') \
> .config('spark.executor.cores', '2') \
> .config('spark.executor.memory', '512m') \
> .enableHiveSupport() \
> .getOrCreate()
> # create Parquet file for subsequent repeated loading
> df = spark.createDataFrame(
> pandas.DataFrame(
> dict(
> row=range(args.n_partitions),
> x=args.n_partitions * [0]
> )
> )
> )
> parquet_path = '/tmp/TestOOM-{}Partitions.parquet'.format(args.n_partitions)
> df.write.parquet(
> path=parquet_path,
> partitionBy='row',
> mode='overwrite'
> )
> i = 0
> # the below loop simulates an iterative algorithm that creates new 

[jira] [Commented] (SPARK-23246) (Py)Spark OOM because of iteratively accumulated metadata that cannot be cleared

2018-01-27 Thread MBA Learns to Code (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16342220#comment-16342220
 ] 

MBA Learns to Code commented on SPARK-23246:


[~srowen] thanks for the follow-up. I've updated the description to include 
more details about the error messages.

 

I get your point that data about completed jobs also builds up naturally as the 
session goes on. But I feel it is unlikely to be as big a culprit as table/RDD 
metadata: as I increase the complexity of the temporary DataFrames in each iteration 
(illustrated by a bigger --n-partitions in the example script above), the OOM occurs 
faster. For example, if the temporary DataFrames are reasonably complex 
(--n-partitions = 1,000, say), OOM occurs after about 150 iterations in the above 
setting with spark.driver.memory = 512m. In more complex real iterative programs 
that I run for ML/DL-related workloads, even when spark.driver.memory is large 
(e.g., 6g), we hit OOM after a few thousand iterations.

> (Py)Spark OOM because of iteratively accumulated metadata that cannot be 
> cleared
> 
>
> Key: SPARK-23246
> URL: https://issues.apache.org/jira/browse/SPARK-23246
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark, Spark Core, SQL
>Affects Versions: 2.2.1
>Reporter: MBA Learns to Code
>Priority: Critical
>
> I am having consistent OOM crashes when trying to use PySpark for iterative 
> algorithms in which I create new DataFrames per iteration (e.g. by sampling 
> from a "mother" DataFrame), do something with them, and never need them again 
> in later iterations.
> The script below simulates such OOM failures. Even when one explicitly tries 
> to .unpersist() the temporary DataFrames (by using the --unpersist flag 
> below) and/or to delete and garbage-collect the Python objects (by using the 
> --py-gc flag below), the Java objects seem to stay around and accumulate 
> until they exceed the JVM/driver memory.
> The more complex the temporary DataFrames in each iteration (illustrated by 
> the --n-partitions flag below), the sooner OOM occurs.
> The typical error messages include:
>  - "java.lang.OutOfMemoryError : GC overhead limit exceeded"
>  - "Java heap space"
>  - "ERROR TransportRequestHandler: Error sending result 
> RpcResponse{requestId=6053742323219781161, 
> body=NioManagedBuffer{buf=java.nio.HeapByteBuffer[pos=0 lim=47 cap=64]}} 
> to /; closing connection"
> Please suggest how I may overcome this so that we can have long-running 
> iterative programs in Spark that use resources only up to a bounded, 
> controllable limit.
>  
> {code:java}
> from __future__ import print_function
> import argparse
> import gc
> import pandas
> import pyspark
> arg_parser = argparse.ArgumentParser()
> arg_parser.add_argument('--unpersist', action='store_true')
> arg_parser.add_argument('--py-gc', action='store_true')
> arg_parser.add_argument('--n-partitions', type=int, default=1000)
> args = arg_parser.parse_args()
> # create SparkSession (*** set spark.driver.memory to 512m in spark-defaults.conf ***)
> spark = pyspark.sql.SparkSession.builder \
>     .config('spark.executor.instances', '2') \
>     .config('spark.executor.cores', '2') \
>     .config('spark.executor.memory', '512m') \
>     .enableHiveSupport() \
>     .getOrCreate()
> # create Parquet file for subsequent repeated loading
> df = spark.createDataFrame(
>     pandas.DataFrame(
>         dict(
>             row=range(args.n_partitions),
>             x=args.n_partitions * [0]
>         )
>     )
> )
> parquet_path = '/tmp/TestOOM-{}Partitions.parquet'.format(args.n_partitions)
> df.write.parquet(
>     path=parquet_path,
>     partitionBy='row',
>     mode='overwrite'
> )
> i = 0
> # the loop below simulates an iterative algorithm that creates new DataFrames
> # in each iteration (e.g. by sampling from a "mother" DataFrame), does
> # something with them, and never needs them again in later iterations.
> # We have trouble cleaning up the metadata that builds up, so the program
> # eventually crashes with OOM.
> while True:
>     _df = spark.read.parquet(parquet_path)
>     if args.unpersist:
>         _df.unpersist()
>     if args.py_gc:
>         del _df
>         gc.collect()
>     i += 1; print('COMPLETED READ ITERATION #{}\n'.format(i))
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23246) (Py)Spark OOM because of iteratively accumulated metadata that cannot be cleared

2018-01-27 Thread MBA Learns to Code (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23246?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

MBA Learns to Code updated SPARK-23246:
---
Description: 
I am having consistent OOM crashes when trying to use PySpark for iterative 
algorithms in which I create new DataFrames per iteration (e.g. by sampling 
from a "mother" DataFrame), do something with them, and never need them again 
in later iterations.

The script below simulates such OOM failures. Even when one explicitly tries to 
.unpersist() the temporary DataFrames (by using the --unpersist flag below) 
and/or to delete and garbage-collect the Python objects (by using the --py-gc 
flag below), the Java objects seem to stay around and accumulate until they 
exceed the JVM/driver memory.

The more complex the temporary DataFrames in each iteration (illustrated by the 
--n-partitions flag below), the sooner OOM occurs.

The typical error messages include:
 - "java.lang.OutOfMemoryError : GC overhead limit exceeded"
 - "Java heap space"
 - "ERROR TransportRequestHandler: Error sending result 
RpcResponse{requestId=6053742323219781161, 
body=NioManagedBuffer{buf=java.nio.HeapByteBuffer[pos=0 lim=47 cap=64]}} to /; 
closing connection"

Please suggest how I may overcome this so that we can have long-running 
iterative programs in Spark that use resources only up to a bounded, 
controllable limit.

 
{code:java}
from __future__ import print_function

import argparse
import gc
import pandas

import pyspark


arg_parser = argparse.ArgumentParser()
arg_parser.add_argument('--unpersist', action='store_true')
arg_parser.add_argument('--py-gc', action='store_true')
arg_parser.add_argument('--n-partitions', type=int, default=1000)
args = arg_parser.parse_args()


# create SparkSession (*** set spark.driver.memory to 512m in spark-defaults.conf ***)
spark = pyspark.sql.SparkSession.builder \
    .config('spark.executor.instances', '2') \
    .config('spark.executor.cores', '2') \
    .config('spark.executor.memory', '512m') \
    .enableHiveSupport() \
    .getOrCreate()


# create Parquet file for subsequent repeated loading
df = spark.createDataFrame(
    pandas.DataFrame(
        dict(
            row=range(args.n_partitions),
            x=args.n_partitions * [0]
        )
    )
)

parquet_path = '/tmp/TestOOM-{}Partitions.parquet'.format(args.n_partitions)

df.write.parquet(
    path=parquet_path,
    partitionBy='row',
    mode='overwrite'
)


i = 0


# the loop below simulates an iterative algorithm that creates new DataFrames
# in each iteration (e.g. by sampling from a "mother" DataFrame), does
# something with them, and never needs them again in later iterations.
# We have trouble cleaning up the metadata that builds up, so the program
# eventually crashes with OOM.
while True:
    _df = spark.read.parquet(parquet_path)

    if args.unpersist:
        _df.unpersist()

    if args.py_gc:
        del _df
        gc.collect()

    i += 1; print('COMPLETED READ ITERATION #{}\n'.format(i))
{code}
 

  was:
I am having consistent OOM crashes when trying to use PySpark for iterative 
algorithms in which I create new DataFrames per iteration (e.g. by sampling 
from a "mother" DataFrame), do something with such DataFrames, and never need 
such DataFrames ever in future iterations.

The below script simulates such OOM failures. Even when one tries explicitly 
.unpersist() the temporary DataFrames (by using the --unpersist flag below) 
and/or deleting and garbage-collecting the Python objects (by using the --py-gc 
flag below), the Java objects seem to stay on and accumulate until they exceed 
the JVM/driver memory.

The more complex the temporary DataFrames in each iteration (illustrated by the 
--n-partitions flag below), the faster OOM occurs.

The typical error messages include:

- "java.lang.OutOfMemoryError : GC overhead limit exceeded"

- "Java heap space"
- "ERROR TransportRequestHandler: Error sending result 
RpcResponse{requestId=6053742323219781
161, body=NioManagedBuffer\{buf=java.nio.HeapByteBuffer[pos=0 lim=47 cap=64]}} 
to /; closing
connection"

Please suggest how I may overcome this so that we can have long-running 
iterative programs using Spark that uses resources only up to a bounded, 
controllable limit.

 
{code:java}
from __future__ import print_function

import argparse
import gc
import pandas

import pyspark


arg_parser = argparse.ArgumentParser()
arg_parser.add_argument('--unpersist', action='store_true')
arg_parser.add_argument('--py-gc', action='store_true')
arg_parser.add_argument('--n-partitions', type=int, default=1000)
args = arg_parser.parse_args()


# create SparkSession (*** set spark.driver.memory to 512m in 
spark-defaults.conf ***)
spark = pyspark.sql.SparkSession.builder \
.config('spark.executor.instances', '2') \
.config('spark.executor.cores', '2') \
.config('spark.executor.memory', '512m') \

[jira] [Updated] (SPARK-23246) (Py)Spark OOM because of iteratively accumulated metadata that cannot be cleared

2018-01-27 Thread MBA Learns to Code (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23246?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

MBA Learns to Code updated SPARK-23246:
---
Description: 
I am having consistent OOM crashes when trying to use PySpark for iterative 
algorithms in which I create new DataFrames per iteration (e.g. by sampling 
from a "mother" DataFrame), do something with them, and never need them again 
in later iterations.

The script below simulates such OOM failures. Even when one explicitly tries to 
.unpersist() the temporary DataFrames (by using the --unpersist flag below) 
and/or to delete and garbage-collect the Python objects (by using the --py-gc 
flag below), the Java objects seem to stay around and accumulate until they 
exceed the JVM/driver memory.

The more complex the temporary DataFrames in each iteration (illustrated by the 
--n-partitions flag below), the sooner OOM occurs.

The typical error messages include:
 - "java.lang.OutOfMemoryError : GC overhead limit exceeded"
 - "Java heap space"
 - "ERROR TransportRequestHandler: Error sending result 
RpcResponse{requestId=6053742323219781161, 
body=NioManagedBuffer{buf=java.nio.HeapByteBuffer[pos=0 lim=47 cap=64]}} to /; 
closing connection"

Please suggest how I may overcome this so that we can have long-running 
iterative programs in Spark that use resources only up to a bounded, 
controllable limit.

 
{code:java}
from __future__ import print_function

import argparse
import gc
import pandas

import pyspark


arg_parser = argparse.ArgumentParser()
arg_parser.add_argument('--unpersist', action='store_true')
arg_parser.add_argument('--py-gc', action='store_true')
arg_parser.add_argument('--n-partitions', type=int, default=1000)
args = arg_parser.parse_args()


# create SparkSession (*** set spark.driver.memory to 512m in spark-defaults.conf ***)
spark = pyspark.sql.SparkSession.builder \
    .config('spark.executor.instances', '2') \
    .config('spark.executor.cores', '2') \
    .config('spark.executor.memory', '512m') \
    .enableHiveSupport() \
    .getOrCreate()


# create Parquet file for subsequent repeated loading
df = spark.createDataFrame(
    pandas.DataFrame(
        dict(
            row=range(args.n_partitions),
            x=args.n_partitions * [0]
        )
    )
)

parquet_path = '/tmp/TestOOM-{}Partitions.parquet'.format(args.n_partitions)

df.write.parquet(
    path=parquet_path,
    partitionBy='row',
    mode='overwrite'
)


i = 0


# the loop below simulates an iterative algorithm that creates new DataFrames
# in each iteration (e.g. by sampling from a "mother" DataFrame), does
# something with them, and never needs them again in later iterations.
# We have trouble cleaning up the metadata that builds up, so the program
# eventually crashes with OOM.
while True:
    _df = spark.read.parquet(parquet_path)

    if args.unpersist:
        _df.unpersist()

    if args.py_gc:
        del _df
        gc.collect()

    i += 1; print('COMPLETED READ ITERATION #{}\n'.format(i))
{code}
 

  was:
I am having consistent OOM crashes when trying to use PySpark for iterative 
algorithms in which I create new DataFrames per iteration (e.g. by sampling 
from a "mother" DataFrame), do something with such DataFrames, and never need 
such DataFrames ever in future iterations.

The below script simulates such OOM failures. Even when one tries explicitly 
.unpersist() the temporary DataFrames (by using the --unpersist flag below) 
and/or deleting and garbage-collecting the Python objects (by using the --py-gc 
flag below), the Java objects seem to stay on and accumulate until they exceed 
the JVM/driver memory.

The more complex the temporary DataFrames in each iteration (illustrated by the 
--n-partitions flag below), the faster OOM occurs.

Please suggest how I may overcome this so that we can have long-running 
iterative programs using Spark that uses resources only up to a bounded, 
controllable limit.

 
{code:java}
from __future__ import print_function

import argparse
import gc
import pandas

import pyspark


arg_parser = argparse.ArgumentParser()
arg_parser.add_argument('--unpersist', action='store_true')
arg_parser.add_argument('--py-gc', action='store_true')
arg_parser.add_argument('--n-partitions', type=int, default=1000)
args = arg_parser.parse_args()


# create SparkSession (*** set spark.driver.memory to 512m in 
spark-defaults.conf ***)
spark = pyspark.sql.SparkSession.builder \
.config('spark.executor.instances', '2') \
.config('spark.executor.cores', '2') \
.config('spark.executor.memory', '512m') \
.enableHiveSupport() \
.getOrCreate()


# create Parquet file for subsequent repeated loading
df = spark.createDataFrame(
pandas.DataFrame(
dict(
row=range(args.n_partitions),
x=args.n_partitions * [0]
)
)
)

parquet_path = 

[jira] [Updated] (SPARK-23246) (Py)Spark OOM because of iteratively accumulated metadata that cannot be cleared

2018-01-27 Thread MBA Learns to Code (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23246?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

MBA Learns to Code updated SPARK-23246:
---
Description: 
I am having consistent OOM crashes when trying to use PySpark for iterative 
algorithms in which I create new DataFrames per iteration (e.g. by sampling 
from a "mother" DataFrame), do something with them, and never need them again 
in later iterations.

The script below simulates such OOM failures. Even when one explicitly tries to 
.unpersist() the temporary DataFrames (by using the --unpersist flag below) 
and/or to delete and garbage-collect the Python objects (by using the --py-gc 
flag below), the Java objects seem to stay around and accumulate until they 
exceed the JVM/driver memory.

The more complex the temporary DataFrames in each iteration (illustrated by the 
--n-partitions flag below), the sooner OOM occurs.

Please suggest how I may overcome this so that we can have long-running 
iterative programs in Spark that use resources only up to a bounded, 
controllable limit.

 
{code:java}
from __future__ import print_function

import argparse
import gc
import pandas

import pyspark


arg_parser = argparse.ArgumentParser()
arg_parser.add_argument('--unpersist', action='store_true')
arg_parser.add_argument('--py-gc', action='store_true')
arg_parser.add_argument('--n-partitions', type=int, default=1000)
args = arg_parser.parse_args()


# create SparkSession (*** set spark.driver.memory to 512m in spark-defaults.conf ***)
spark = pyspark.sql.SparkSession.builder \
    .config('spark.executor.instances', '2') \
    .config('spark.executor.cores', '2') \
    .config('spark.executor.memory', '512m') \
    .enableHiveSupport() \
    .getOrCreate()


# create Parquet file for subsequent repeated loading
df = spark.createDataFrame(
    pandas.DataFrame(
        dict(
            row=range(args.n_partitions),
            x=args.n_partitions * [0]
        )
    )
)

parquet_path = '/tmp/TestOOM-{}Partitions.parquet'.format(args.n_partitions)

df.write.parquet(
    path=parquet_path,
    partitionBy='row',
    mode='overwrite'
)


i = 0


# the loop below simulates an iterative algorithm that creates new DataFrames
# in each iteration (e.g. by sampling from a "mother" DataFrame), does
# something with them, and never needs them again in later iterations.
# We have trouble cleaning up the metadata that builds up, so the program
# eventually crashes with OOM.
while True:
    _df = spark.read.parquet(parquet_path)

    if args.unpersist:
        _df.unpersist()

    if args.py_gc:
        del _df
        gc.collect()

    i += 1; print('COMPLETED READ ITERATION #{}\n'.format(i))
{code}
 

  was:
I am having consistent OOM crashes when trying to use PySpark for iterative 
algorithms in which I create new DataFrames per iteration (e.g. by sampling 
from a "mother" DataFrame), do something with such DataFrames, and never need 
such DataFrames ever in future iterations.

The below script simulates such OOM failures. Even when one tries explicitly 
.unpersist() the temporary DataFrames (by using the --unpersist flag below) 
and/or deleting and garbage-collecting the Python objects (by using the --py-gc 
flag below), the Java objects seem to stay on and accumulate until they exceed 
the JVM/driver memory.

Please suggest how I may overcome this so that we can have long-running 
iterative programs using Spark that uses resources only up to a bounded, 
controllable limit.

 
{code:java}
from __future__ import print_function

import argparse
import gc
import pandas

import pyspark


arg_parser = argparse.ArgumentParser()
arg_parser.add_argument('--unpersist', action='store_true')
arg_parser.add_argument('--py-gc', action='store_true')
arg_parser.add_argument('--n-partitions', type=int, default=1000)
args = arg_parser.parse_args()


# create SparkSession (*** set spark.driver.memory to 512m in 
spark-defaults.conf ***)
spark = pyspark.sql.SparkSession.builder \
.config('spark.executor.instances', '2') \
.config('spark.executor.cores', '2') \
.config('spark.executor.memory', '512m') \
.enableHiveSupport() \
.getOrCreate()


# create Parquet file for subsequent repeated loading
df = spark.createDataFrame(
pandas.DataFrame(
dict(
row=range(args.n_partitions),
x=args.n_partitions * [0]
)
)
)

parquet_path = '/tmp/TestOOM-{}Partitions.parquet'.format(args.n_partitions)

df.write.parquet(
path=parquet_path,
partitionBy='row',
mode='overwrite'
)


i = 0


# the below loop simulates an iterative algorithm that creates new DataFrames 
in each iteration (e.g. sampling from a "mother" DataFrame), do something, and 
never need those DataFrames again in future iterations
# we are having a problem cleaning up the built-up metadata
# hence the program will crash after while because of OOM
while True:

[jira] [Commented] (SPARK-23249) Improve partition bin-filling algorithm to have less skew and fewer partitions

2018-01-27 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16342207#comment-16342207
 ] 

Apache Spark commented on SPARK-23249:
--

User 'glentakahashi' has created a pull request for this issue:
https://github.com/apache/spark/pull/20372

> Improve partition bin-filling algorithm to have less skew and fewer partitions
> --
>
> Key: SPARK-23249
> URL: https://issues.apache.org/jira/browse/SPARK-23249
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.2.1
>Reporter: Glen Takahashi
>Priority: Major
>
> Change DataSourceScanExec so that, when grouping blocks together into 
> partitions, it also checks the end of the sorted list of splits in order to 
> fill out partitions more efficiently.
>  
> h2. Rationale
> The current bin-packing method, next-fit descending (NFD), is sub-optimal in 
> many cases: it results in extra partitions, an uneven distribution of block 
> counts across partitions, and an uneven distribution of partition sizes.
> As an example, 128 files of 1MB, 2MB, ..., 127MB, 128MB result in 82 
> partitions with the current algorithm, but only 64 with this algorithm. In 
> the same example, the maximum number of blocks per partition is 13 with NFD 
> but only 2 with this algorithm.
> More generally, a simulation of 1000 runs with a 128MB block size and 1-1000 
> files whose sizes are normally distributed between 1MB and 500MB shows a 
> reduction of roughly 5% in partition counts and a large reduction in the 
> standard deviation of blocks per partition.
> This algorithm also runs in O(n) time, as NFD does, and in every case gives 
> strictly better results than NFD.
> Overall, the more even distribution of blocks across partitions and the 
> reduced partition counts should result in a small but significant performance 
> improvement across the board.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-23249) Improve partition bin-filling algorithm to have less skew and fewer partitions

2018-01-27 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-23249:


Assignee: (was: Apache Spark)

> Improve partition bin-filling algorithm to have less skew and fewer partitions
> --
>
> Key: SPARK-23249
> URL: https://issues.apache.org/jira/browse/SPARK-23249
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.2.1
>Reporter: Glen Takahashi
>Priority: Major
>
> Change DataSourceScanExec so that, when grouping blocks together into 
> partitions, it also checks the end of the sorted list of splits in order to 
> fill out partitions more efficiently.
>  
> h2. Rationale
> The current bin-packing method, next-fit descending (NFD), is sub-optimal in 
> many cases: it results in extra partitions, an uneven distribution of block 
> counts across partitions, and an uneven distribution of partition sizes.
> As an example, 128 files of 1MB, 2MB, ..., 127MB, 128MB result in 82 
> partitions with the current algorithm, but only 64 with this algorithm. In 
> the same example, the maximum number of blocks per partition is 13 with NFD 
> but only 2 with this algorithm.
> More generally, a simulation of 1000 runs with a 128MB block size and 1-1000 
> files whose sizes are normally distributed between 1MB and 500MB shows a 
> reduction of roughly 5% in partition counts and a large reduction in the 
> standard deviation of blocks per partition.
> This algorithm also runs in O(n) time, as NFD does, and in every case gives 
> strictly better results than NFD.
> Overall, the more even distribution of blocks across partitions and the 
> reduced partition counts should result in a small but significant performance 
> improvement across the board.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-23249) Improve partition bin-filling algorithm to have less skew and fewer partitions

2018-01-27 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-23249:


Assignee: Apache Spark

> Improve partition bin-filling algorithm to have less skew and fewer partitions
> --
>
> Key: SPARK-23249
> URL: https://issues.apache.org/jira/browse/SPARK-23249
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.2.1
>Reporter: Glen Takahashi
>Assignee: Apache Spark
>Priority: Major
>
> Change DataSourceScanExec so that, when grouping blocks together into 
> partitions, it also checks the end of the sorted list of splits in order to 
> fill out partitions more efficiently.
>  
> h2. Rationale
> The current bin-packing method, next-fit descending (NFD), is sub-optimal in 
> many cases: it results in extra partitions, an uneven distribution of block 
> counts across partitions, and an uneven distribution of partition sizes.
> As an example, 128 files of 1MB, 2MB, ..., 127MB, 128MB result in 82 
> partitions with the current algorithm, but only 64 with this algorithm. In 
> the same example, the maximum number of blocks per partition is 13 with NFD 
> but only 2 with this algorithm.
> More generally, a simulation of 1000 runs with a 128MB block size and 1-1000 
> files whose sizes are normally distributed between 1MB and 500MB shows a 
> reduction of roughly 5% in partition counts and a large reduction in the 
> standard deviation of blocks per partition.
> This algorithm also runs in O(n) time, as NFD does, and in every case gives 
> strictly better results than NFD.
> Overall, the more even distribution of blocks across partitions and the 
> reduced partition counts should result in a small but significant performance 
> improvement across the board.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23249) Improve partition bin-filling algorithm to have less skew and fewer partitions

2018-01-27 Thread Glen Takahashi (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Glen Takahashi updated SPARK-23249:
---
Description: 
Change DataSourceScanExec so that, when grouping blocks together into 
partitions, it also checks the end of the sorted list of splits in order to 
fill out partitions more efficiently.

 
h2. Rationale

The current bin-packing method, next-fit descending (NFD), is sub-optimal in 
many cases: it results in extra partitions, an uneven distribution of block 
counts across partitions, and an uneven distribution of partition sizes.

As an example, 128 files of 1MB, 2MB, ..., 127MB, 128MB result in 82 partitions 
with the current algorithm, but only 64 with this algorithm. In the same 
example, the maximum number of blocks per partition is 13 with NFD but only 2 
with this algorithm.

More generally, a simulation of 1000 runs with a 128MB block size and 1-1000 
files whose sizes are normally distributed between 1MB and 500MB shows a 
reduction of roughly 5% in partition counts and a large reduction in the 
standard deviation of blocks per partition.

This algorithm also runs in O(n) time, as NFD does, and in every case gives 
strictly better results than NFD.

Overall, the more even distribution of blocks across partitions and the reduced 
partition counts should result in a small but significant performance 
improvement across the board.

  was:
Change DataSourceScanExec so that when grouping blocks together into 
partitions, also checks the end of the sorted list of splits to more 
efficiently fill out partitions.

 
h2. Rationale

The current bin-packing method of next-fit descending for blocks into 
partitions is sub-optimal in a lot of cases and will result in extra 
partitions, un-even distribution of block-counts across partitions, and un-even 
distribution of partition sizes.

As an example, 128 files ranging from 1MB, 2MB,...127MB,128MB. will result in 
82 partitions with the current algorithm, but only 64 using this algorithm. 
Also in this example, the max # of blocks per partition in NFD is 13, while in 
this algorithm is is 2.

More generally, running a simulation of 1000 runs using 128MB blocksize, 
between 1-1000 normally distributed file sizes between 1-500Mb, you can see an 
improvement of approx 5% reduction of partition counts, and a large reduction 
in standard deviation of blocks per partition.

This algorithm also runs in O(n) time as NFD does, and in every case is 
strictly better results than NFD.

Overall, the more even distribution of blocks across partitions and therefore 
reduced partition counts should result in a small but significant performance 
increase across the board


> Improve partition bin-filling algorithm to have less skew and fewer partitions
> --
>
> Key: SPARK-23249
> URL: https://issues.apache.org/jira/browse/SPARK-23249
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.2.1
>Reporter: Glen Takahashi
>Priority: Major
>
> Change DataSourceScanExec so that, when grouping blocks together into 
> partitions, it also checks the end of the sorted list of splits in order to 
> fill out partitions more efficiently.
>  
> h2. Rationale
> The current bin-packing method, next-fit descending (NFD), is sub-optimal in 
> many cases: it results in extra partitions, an uneven distribution of block 
> counts across partitions, and an uneven distribution of partition sizes.
> As an example, 128 files of 1MB, 2MB, ..., 127MB, 128MB result in 82 
> partitions with the current algorithm, but only 64 with this algorithm. In 
> the same example, the maximum number of blocks per partition is 13 with NFD 
> but only 2 with this algorithm.
> More generally, a simulation of 1000 runs with a 128MB block size and 1-1000 
> files whose sizes are normally distributed between 1MB and 500MB shows a 
> reduction of roughly 5% in partition counts and a large reduction in the 
> standard deviation of blocks per partition.
> This algorithm also runs in O(n) time, as NFD does, and in every case gives 
> strictly better results than NFD.
> Overall, the more even distribution of blocks across partitions and the 
> reduced partition counts should result in a small but significant performance 
> improvement across the board.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-23249) Improve partition bin-filling algorithm to have less skew and fewer partitions

2018-01-27 Thread Glen Takahashi (JIRA)
Glen Takahashi created SPARK-23249:
--

 Summary: Improve partition bin-filling algorithm to have less skew 
and fewer partitions
 Key: SPARK-23249
 URL: https://issues.apache.org/jira/browse/SPARK-23249
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.2.1
Reporter: Glen Takahashi


Change DataSourceScanExec so that, when grouping blocks together into 
partitions, it also checks the end of the sorted list of splits in order to 
fill out partitions more efficiently.

 
h2. Rationale

The current bin-packing method, next-fit descending (NFD), is sub-optimal in 
many cases: it results in extra partitions, an uneven distribution of block 
counts across partitions, and an uneven distribution of partition sizes.

As an example, 128 files of 1MB, 2MB, ..., 127MB, 128MB result in 82 partitions 
with the current algorithm, but only 64 with this algorithm. In the same 
example, the maximum number of blocks per partition is 13 with NFD but only 2 
with this algorithm.

More generally, a simulation of 1000 runs with a 128MB block size and 1-1000 
files whose sizes are normally distributed between 1MB and 500MB shows a 
reduction of roughly 5% in partition counts and a large reduction in the 
standard deviation of blocks per partition.

This algorithm also runs in O(n) time, as NFD does, and in every case gives 
strictly better results than NFD.

Overall, the more even distribution of blocks across partitions and the reduced 
partition counts should result in a small but significant performance 
improvement across the board.
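
To make the comparison concrete, the toy simulation below contrasts plain 
next-fit descending with a variant that also fills each partition from the 
small end of the sorted split list. It ignores openCostInBytes and the other 
details of DataSourceScanExec, so the absolute counts will not match the 
82-vs-64 numbers above; it only illustrates the two strategies.

{code}
# Toy comparison of the two bin-filling strategies on the 1MB..128MB example
# above, with a 128MB target partition size.

def next_fit_descending(sizes, capacity):
    # plain NFD: walk the files in descending size order and open a new
    # partition whenever the next file does not fit into the current one
    bins, current, free = [], [], capacity
    for s in sorted(sizes, reverse=True):
        if current and s > free:
            bins.append(current)
            current, free = [], capacity
        current.append(s)
        free -= s
    if current:
        bins.append(current)
    return bins

def fill_from_both_ends(sizes, capacity):
    # variant sketched here: seed each partition with the largest remaining
    # file, then top it up with the smallest remaining files that still fit
    ordered = sorted(sizes)
    bins, lo, hi = [], 0, len(ordered) - 1
    while lo <= hi:
        current, free = [ordered[hi]], capacity - ordered[hi]
        hi -= 1
        while lo <= hi and ordered[lo] <= free:
            current.append(ordered[lo])
            free -= ordered[lo]
            lo += 1
        bins.append(current)
    return bins

if __name__ == '__main__':
    sizes = list(range(1, 129))  # 1MB, 2MB, ..., 128MB
    for name, algo in (('next-fit descending', next_fit_descending),
                       ('fill from both ends', fill_from_both_ends)):
        bins = algo(sizes, 128)
        print('{}: {} partitions, at most {} blocks per partition'.format(
            name, len(bins), max(len(b) for b in bins)))
{code}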



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23246) (Py)Spark OOM because of iteratively accumulated metadata that cannot be cleared

2018-01-27 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16342166#comment-16342166
 ] 

Sean Owen commented on SPARK-23246:
---

What's the memory leak -- in a heap dump, which objects are you saying are 
retained indefinitely?
You may be legitimately using that memory, for example in the data about 
completed stages kept for the UI. You may just need more driver memory, or to 
turn down the number of remembered jobs, etc. That is, I'm not sure this 
establishes a memory leak by itself.
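
If it is the retained job/stage/query bookkeeping, one quick experiment is to 
shrink those retention limits and re-run the loop. A minimal sketch (the values 
are arbitrary, chosen only for the experiment; the builder otherwise mirrors 
the reproduction script above):

{code}
# Sketch: rebuild the reproduction script's SparkSession with the UI/listener
# retention limits turned down, to test whether the growth is dominated by
# retained job/stage/query metadata rather than a leak.
import pyspark

spark = pyspark.sql.SparkSession.builder \
    .config('spark.ui.retainedJobs', '50') \
    .config('spark.ui.retainedStages', '50') \
    .config('spark.ui.retainedTasks', '1000') \
    .config('spark.sql.ui.retainedExecutions', '50') \
    .enableHiveSupport() \
    .getOrCreate()
{code}

If driver memory then plateaus, the buildup is bookkeeping for the UI; if it 
still grows without bound, a heap dump would be the next step.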

> (Py)Spark OOM because of iteratively accumulated metadata that cannot be 
> cleared
> 
>
> Key: SPARK-23246
> URL: https://issues.apache.org/jira/browse/SPARK-23246
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark, Spark Core, SQL
>Affects Versions: 2.2.1
>Reporter: MBA Learns to Code
>Priority: Critical
>
> I am having consistent OOM crashes when trying to use PySpark for iterative 
> algorithms in which I create new DataFrames per iteration (e.g. by sampling 
> from a "mother" DataFrame), do something with them, and never need them again 
> in later iterations.
> The script below simulates such OOM failures. Even when one explicitly tries 
> to .unpersist() the temporary DataFrames (by using the --unpersist flag 
> below) and/or to delete and garbage-collect the Python objects (by using the 
> --py-gc flag below), the Java objects seem to stay around and accumulate 
> until they exceed the JVM/driver memory.
> Please suggest how I may overcome this so that we can have long-running 
> iterative programs in Spark that use resources only up to a bounded, 
> controllable limit.
>  
> {code:java}
> from __future__ import print_function
> import argparse
> import gc
> import pandas
> import pyspark
> arg_parser = argparse.ArgumentParser()
> arg_parser.add_argument('--unpersist', action='store_true')
> arg_parser.add_argument('--py-gc', action='store_true')
> arg_parser.add_argument('--n-partitions', type=int, default=1000)
> args = arg_parser.parse_args()
> # create SparkSession (*** set spark.driver.memory to 512m in spark-defaults.conf ***)
> spark = pyspark.sql.SparkSession.builder \
>     .config('spark.executor.instances', '2') \
>     .config('spark.executor.cores', '2') \
>     .config('spark.executor.memory', '512m') \
>     .enableHiveSupport() \
>     .getOrCreate()
> # create Parquet file for subsequent repeated loading
> df = spark.createDataFrame(
>     pandas.DataFrame(
>         dict(
>             row=range(args.n_partitions),
>             x=args.n_partitions * [0]
>         )
>     )
> )
> parquet_path = '/tmp/TestOOM-{}Partitions.parquet'.format(args.n_partitions)
> df.write.parquet(
>     path=parquet_path,
>     partitionBy='row',
>     mode='overwrite'
> )
> i = 0
> # the loop below simulates an iterative algorithm that creates new DataFrames
> # in each iteration (e.g. by sampling from a "mother" DataFrame), does
> # something with them, and never needs them again in later iterations.
> # We have trouble cleaning up the metadata that builds up, so the program
> # eventually crashes with OOM.
> while True:
>     _df = spark.read.parquet(parquet_path)
>     if args.unpersist:
>         _df.unpersist()
>     if args.py_gc:
>         del _df
>         gc.collect()
>     i += 1; print('COMPLETED READ ITERATION #{}\n'.format(i))
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-23190) Error when infering date columns

2018-01-27 Thread Sean Owen (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Owen resolved SPARK-23190.
---
Resolution: Not A Problem

> Error when infering date columns
> 
>
> Key: SPARK-23190
> URL: https://issues.apache.org/jira/browse/SPARK-23190
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.1, 2.1.2, 2.2.1
>Reporter: Nacho García Fernández
>Priority: Major
>
> Hi.
> I'm trying to read the following file using the spark.sql read utility:
>  c1;c2;c3;c4;c5
> "+000.";"2";"x";"20001122";2000
> "-010.21";"2";"x";"20001122";2000
> "+113.34";"00";"v";"20001122";2000
> "+000.";"0";"a";"20001122";2000
>  
> I'm doing this in the spark-shell using the following command: 
>  
> {code:java}
> spark.sqlContext.read.option("inferSchema", "true").option("header", 
> "true").option("delimiter", 
> ";").option("timestampFormat","MMdd").csv("myfile.csv").printSchema
> {code}
> and I'm getting the following schema:
>  
> {code:java}
> root
>  |-- c1: double (nullable = true)
>  |-- c2: integer (nullable = true)
>  |-- c3: string (nullable = true)
>  |-- c4: integer (nullable = true)
>  |-- c5: integer (nullable = true)
> {code}
>  
> As you can see, column c4 is inferred as Integer instead of Timestamp. I 
> think this is due to the order used in the following match clause: 
> [https://github.com/apache/spark/blob/1c9f95cb771ac78775a77edd1abfeb2d8ae2a124/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala#L87]
>  Since my date consists only of digits, it is inferred as Integer. Would it 
> be correct to change the order in the match clause so that timestamps are 
> preferred? That may not be good for performance, since every integer-looking 
> value would first be tried as a timestamp, but the current implementation 
> does not work for date formats that are made up only of digits.
>  
>  
> Thanks in advance.
>  
>  
>  
>  
>  
>  
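
For the file above, a workaround that does not depend on inference order is to 
read the column as a string and convert it explicitly after the read. A minimal 
sketch in PySpark (the same idea works in the Scala shell), assuming Spark 
2.2+, where to_date accepts a format, and the yyyyMMdd pattern implied by the 
sample data:

{code}
# Sketch of a workaround: read the CSV without inferring c4 as a date, then
# convert it explicitly after the read.
from pyspark.sql import functions as F

df = (spark.read
      .option('header', 'true')
      .option('delimiter', ';')
      .csv('myfile.csv')
      .withColumn('c4', F.to_date(F.col('c4'), 'yyyyMMdd')))
df.printSchema()  # c4 is now a date column
{code}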



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-23248) Relocate module docstrings to the top in PySpark examples

2018-01-27 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23248?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-23248:


Assignee: (was: Apache Spark)

> Relocate module docstrings to the top in PySpark examples
> -
>
> Key: SPARK-23248
> URL: https://issues.apache.org/jira/browse/SPARK-23248
> Project: Spark
>  Issue Type: Improvement
>  Components: Examples, PySpark
>Affects Versions: 2.3.0
>Reporter: Hyukjin Kwon
>Priority: Minor
>
> The docstrings of some examples are placed wrongly (under the imports). For 
> example, if I run
> {code}
> >>> help(aft_survival_regression)
> {code}
> It shows as below:
> {code}
> Help on module aft_survival_regression:
> NAME
> aft_survival_regression
> ...
> DESCRIPTION
> # Licensed to the Apache Software Foundation (ASF) under one or more
> # contributor license agreements.  See the NOTICE file distributed with
> # this work for additional information regarding copyright ownership.
> # The ASF licenses this file to You under the Apache License, Version 2.0
> # (the "License"); you may not use this file except in compliance with
> # the License.  You may obtain a copy of the License at
> #
> #http://www.apache.org/licenses/LICENSE-2.0
> #
> # Unless required by applicable law or agreed to in writing, software
> # distributed under the License is distributed on an "AS IS" BASIS,
> # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> # See the License for the specific language governing permissions and
> # limitations under the License.
> #
> ...
> (END)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-23248) Relocate module docstrings to the top in PySpark examples

2018-01-27 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23248?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-23248:


Assignee: Apache Spark

> Relocate module docstrings to the top in PySpark examples
> -
>
> Key: SPARK-23248
> URL: https://issues.apache.org/jira/browse/SPARK-23248
> Project: Spark
>  Issue Type: Improvement
>  Components: Examples, PySpark
>Affects Versions: 2.3.0
>Reporter: Hyukjin Kwon
>Assignee: Apache Spark
>Priority: Minor
>
> The docstrings of some examples are placed wrongly (under the imports). For 
> example, if I run
> {code}
> >>> help(aft_survival_regression)
> {code}
> It shows as below:
> {code}
> Help on module aft_survival_regression:
> NAME
> aft_survival_regression
> ...
> DESCRIPTION
> # Licensed to the Apache Software Foundation (ASF) under one or more
> # contributor license agreements.  See the NOTICE file distributed with
> # this work for additional information regarding copyright ownership.
> # The ASF licenses this file to You under the Apache License, Version 2.0
> # (the "License"); you may not use this file except in compliance with
> # the License.  You may obtain a copy of the License at
> #
> #http://www.apache.org/licenses/LICENSE-2.0
> #
> # Unless required by applicable law or agreed to in writing, software
> # distributed under the License is distributed on an "AS IS" BASIS,
> # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> # See the License for the specific language governing permissions and
> # limitations under the License.
> #
> ...
> (END)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23248) Relocate module docstrings to the top in PySpark examples

2018-01-27 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16342110#comment-16342110
 ] 

Apache Spark commented on SPARK-23248:
--

User 'HyukjinKwon' has created a pull request for this issue:
https://github.com/apache/spark/pull/20416

> Relocate module docstrings to the top in PySpark examples
> -
>
> Key: SPARK-23248
> URL: https://issues.apache.org/jira/browse/SPARK-23248
> Project: Spark
>  Issue Type: Improvement
>  Components: Examples, PySpark
>Affects Versions: 2.3.0
>Reporter: Hyukjin Kwon
>Priority: Minor
>
> The docstrings of some examples are placed wrongly (under the imports). For 
> example, if I run
> {code}
> >>> help(aft_survival_regression)
> {code}
> It shows as below:
> {code}
> Help on module aft_survival_regression:
> NAME
> aft_survival_regression
> ...
> DESCRIPTION
> # Licensed to the Apache Software Foundation (ASF) under one or more
> # contributor license agreements.  See the NOTICE file distributed with
> # this work for additional information regarding copyright ownership.
> # The ASF licenses this file to You under the Apache License, Version 2.0
> # (the "License"); you may not use this file except in compliance with
> # the License.  You may obtain a copy of the License at
> #
> #http://www.apache.org/licenses/LICENSE-2.0
> #
> # Unless required by applicable law or agreed to in writing, software
> # distributed under the License is distributed on an "AS IS" BASIS,
> # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> # See the License for the specific language governing permissions and
> # limitations under the License.
> #
> ...
> (END)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-23248) Relocate module docstrings to the top in PySpark examples

2018-01-27 Thread Hyukjin Kwon (JIRA)
Hyukjin Kwon created SPARK-23248:


 Summary: Relocate module docstrings to the top in PySpark examples
 Key: SPARK-23248
 URL: https://issues.apache.org/jira/browse/SPARK-23248
 Project: Spark
  Issue Type: Improvement
  Components: Examples, PySpark
Affects Versions: 2.3.0
Reporter: Hyukjin Kwon


The docstrings of some examples are placed wrongly (under the imports). For 
example, if I run

{code}
>>> help(aft_survival_regression)
{code}

It shows as below:

{code}
Help on module aft_survival_regression:

NAME
aft_survival_regression

...

DESCRIPTION
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

...

(END)
{code}
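
The fix is mechanical: the docstring has to be the first statement in the 
module (the ASF license header can stay as comments above it); otherwise Python 
never assigns it to __doc__ and pydoc falls back to showing the comment block. 
A minimal sketch of the intended layout, using a hypothetical example module:

{code}
# hypothetical_example.py -- sketch of the intended layout; the license header
# comments stay above, and the docstring is the first statement.
"""
An example demonstrating AFT survival regression.
"""
from __future__ import print_function

from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession.builder.appName("AFTSurvivalRegressionExample").getOrCreate()
    # ... example body ...
    spark.stop()
{code}

With that layout, help(hypothetical_example) shows the one-line description 
instead of the license text.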





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21866) SPIP: Image support in Spark

2018-01-27 Thread Hyukjin Kwon (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16342096#comment-16342096
 ] 

Hyukjin Kwon commented on SPARK-21866:
--

I don't think the user guide is done, unless I have missed some changes. +1 for 
the guide!

> SPIP: Image support in Spark
> 
>
> Key: SPARK-21866
> URL: https://issues.apache.org/jira/browse/SPARK-21866
> Project: Spark
>  Issue Type: Improvement
>  Components: ML
>Affects Versions: 2.2.0
>Reporter: Timothy Hunter
>Assignee: Ilya Matiach
>Priority: Major
>  Labels: SPIP
> Fix For: 2.3.0
>
> Attachments: SPIP - Image support for Apache Spark V1.1.pdf
>
>
> h2. Background and motivation
> As Apache Spark is being used more and more in the industry, some new use 
> cases are emerging for different data formats beyond the traditional SQL 
> types or the numerical types (vectors and matrices). Deep Learning 
> applications commonly deal with image processing. A number of projects add 
> some Deep Learning capabilities to Spark (see list below), but they struggle 
> to  communicate with each other or with MLlib pipelines because there is no 
> standard way to represent an image in Spark DataFrames. We propose to 
> federate efforts for representing images in Spark by defining a 
> representation that caters to the most common needs of users and library 
> developers.
> This SPIP proposes a specification to represent images in Spark DataFrames 
> and Datasets (based on existing industrial standards), and an interface for 
> loading sources of images. It is not meant to be a full-fledged image 
> processing library, but rather the core description that other libraries and 
> users can rely on. Several packages already offer various processing 
> facilities for transforming images or doing more complex operations, and each 
> has various design tradeoffs that make them better as standalone solutions.
> This project is a joint collaboration between Microsoft and Databricks, which 
> have been testing this design in two open source packages: MMLSpark and Deep 
> Learning Pipelines.
> The proposed image format is an in-memory, decompressed representation that 
> targets low-level applications. It is significantly more liberal in memory 
> usage than compressed image representations such as JPEG, PNG, etc., but it 
> allows easy communication with popular image processing libraries and has no 
> decoding overhead.
> h2. Targets users and personas:
> Data scientists, data engineers, library developers.
> The following libraries define primitives for loading and representing 
> images, and will gain from a common interchange format (in alphabetical 
> order):
> * BigDL
> * DeepLearning4J
> * Deep Learning Pipelines
> * MMLSpark
> * TensorFlow (Spark connector)
> * TensorFlowOnSpark
> * TensorFrames
> * Thunder
> h2. Goals:
> * Simple representation of images in Spark DataFrames, based on pre-existing 
> industrial standards (OpenCV)
> * This format should eventually allow the development of high-performance 
> integration points with image processing libraries such as libOpenCV, Google 
> TensorFlow, CNTK, and other C libraries.
> * The reader should be able to read popular formats of images from 
> distributed sources.
> h2. Non-Goals:
> Images are a versatile medium and encompass a very wide range of formats and 
> representations. This SPIP explicitly aims at the most common use case in the 
> industry currently: multi-channel matrices of binary, int32, int64, float or 
> double data that can fit comfortably in the heap of the JVM:
> * the total size of an image should be restricted to less than 2GB (roughly)
> * the meaning of color channels is application-specific and is not mandated 
> by the standard (in line with the OpenCV standard)
> * specialized formats used in meteorology, the medical field, etc. are not 
> supported
> * this format is specialized to images and does not attempt to solve the more 
> general problem of representing n-dimensional tensors in Spark
> h2. Proposed API changes
> We propose to add a new package in the package structure, under the MLlib 
> project:
> {{org.apache.spark.image}}
> h3. Data format
> We propose to add the following structure:
> imageSchema = StructType([
> * StructField("mode", StringType(), False),
> ** The exact representation of the data.
> ** The values are described in the following OpenCV convention. Basically, 
> the type has both "depth" and "number of channels" info: in particular, type 
> "CV_8UC3" means "3 channel unsigned bytes". BGRA format would be CV_8UC4 
> (value 32 in the table) with the channel order specified by convention.
> ** The exact channel ordering and meaning of each channel is dictated by 
> convention. By default, the order is RGB (3 channels) and