[jira] [Commented] (SPARK-28025) HDFSBackedStateStoreProvider should not leak .crc files

2019-06-12 Thread Gerard Maas (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-28025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16861991#comment-16861991
 ] 

Gerard Maas commented on SPARK-28025:
-

I reproduced the issue in a spark-shell session:
{code:java}
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.3
      /_/

scala> import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming._

scala> val hadoopConf = spark.sparkContext.hadoopConfiguration

scala> import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf

scala> SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key
res1: String = spark.sql.streaming.checkpointFileManagerClass

scala> hadoopConf.get(SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key)
res2: String = null

// mount point for the shared PVC: /storage
scala> val glusterCpfm = new org.apache.hadoop.fs.Path("/storage/crc-store")
glusterCpfm: org.apache.hadoop.fs.Path = /storage/crc-store

scala> val glusterfm = CheckpointFileManager.create(glusterCpfm, hadoopConf)
glusterfm: org.apache.spark.sql.execution.streaming.CheckpointFileManager = 
org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager@28d00f54

scala> glusterfm.isLocal
res17: Boolean = true

scala> glusterfm.mkdirs(glusterCpfm)

scala> val atomicFile = glusterfm.createAtomic(new 
org.apache.hadoop.fs.Path("/storage/crc-store/file.log"), false)
atomicFile: 
org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream
 = 
org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream@1c6e065

scala> atomicFile.writeChars("Hello, World")

scala> atomicFile.close

/**
* Inspect the file system
*
* $ cat file.log
* Hello, World
* $ ls -al
* total 5
* drwxr-sr-x. 2 jboss 2000 85 Jun 12 09:44 .
* drwxrwsr-x. 8 root 2000 4096 Jun 12 09:42 ..
* -rw-r--r--. 1 jboss 2000 12 Jun 12 09:44 
..file.log.c6f90863-77d2-494e-b1cc-0d0ed1344f74.tmp.crc
* -rw-r--r--. 1 jboss 2000 24 Jun 12 09:44 file.log
**/

// Delete the file -- simulate the operation done by HDFSBackedStateStoreProvider#cleanup

scala> glusterfm.delete(new 
org.apache.hadoop.fs.Path("/storage/crc-store/file.log"))

/**
* Inspect the file system -> .crc file left behind
* $ ls -al
* total 9
* drwxr-sr-x. 2 jboss 2000 4096 Jun 12 09:46 .
* drwxrwsr-x. 8 root 2000 4096 Jun 12 09:42 ..
* -rw-r--r--. 1 jboss 2000 12 Jun 12 09:44 
..file.log.c6f90863-77d2-494e-b1cc-0d0ed1344f74.tmp.crc
**/
{code}
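As a stopgap, the orphaned checksum files can be cleaned up out-of-band. The following is a minimal sketch that uses only the standard Hadoop FileSystem API; `cleanOrphanedCrcFiles` is a hypothetical helper written for this comment, not part of Spark:
{code:java}
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical helper (not Spark API): remove ".<name>.crc" checksum files
// whose corresponding data file no longer exists in the given directory.
def cleanOrphanedCrcFiles(fs: FileSystem, dir: Path): Unit = {
  fs.listStatus(dir).map(_.getPath).filter(_.getName.endsWith(".crc")).foreach { crc =>
    // The checksum file for "foo" is named ".foo.crc", so stripping the leading
    // dot and the ".crc" suffix recovers the data file name.
    val dataName = crc.getName.stripPrefix(".").stripSuffix(".crc")
    if (!fs.exists(new Path(dir, dataName))) fs.delete(crc, false)
  }
}

// e.g. cleanOrphanedCrcFiles(glusterCpfm.getFileSystem(hadoopConf), glusterCpfm)
{code}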

> HDFSBackedStateStoreProvider should not leak .crc files 
> 
>
> Key: SPARK-28025
> URL: https://issues.apache.org/jira/browse/SPARK-28025
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.3
> Environment: Spark 2.4.3
> Kubernetes 1.11(?) (OpenShift)
> StateStore storage on a mounted PVC. Viewed as a local filesystem by the 
> `FileContextBasedCheckpointFileManager` : 
> {noformat}
> scala> glusterfm.isLocal
> res17: Boolean = true{noformat}
>Reporter: Gerard Maas
>Priority: Major
>
> When using the default CheckpointFileManager, the HDFSBackedStateStoreProvider 
> leaves '.crc' files behind: a .crc file is created for each `createAtomic` 
> operation of the CheckpointFileManager.
> Over time, the number of files becomes very large. It makes the state store 
> file system constantly increase in size and, in our case, deteriorates the 
> file system performance.
> Here's a sample of one of our spark storage volumes after 2 days of execution 
> (4 stateful streaming jobs, each on a different sub-dir):
> {noformat}
> Total files in PVC (used for checkpoints and state store)
> $find . | wc -l
> 431796
> # .crc files
> $find . -name "*.crc" | wc -l
> 418053{noformat}
> With each .crc file taking one storage block, the used storage runs into the 
> GBs of data.
> These jobs are running on Kubernetes. Our shared storage provider, GlusterFS, 
> shows serious performance deterioration with this large number of files:
> {noformat}
> DEBUG HDFSBackedStateStoreProvider: fetchFiles() took 29164ms{noformat}
>  






[jira] [Updated] (SPARK-28025) HDFSBackedStateStoreProvider should not leak .crc files

2019-06-12 Thread Gerard Maas (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gerard Maas updated SPARK-28025:

Summary: HDFSBackedStateStoreProvider should not leak .crc files   (was: 
HDFSBackedStateStoreProvider leaks .crc files )

> HDFSBackedStateStoreProvider should not leak .crc files 
> 
>
> Key: SPARK-28025
> URL: https://issues.apache.org/jira/browse/SPARK-28025
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.3
> Environment: Spark 2.4.3
> Kubernetes 1.11(?) (OpenShift)
> StateStore storage on a mounted PVC. Viewed as a local filesystem by the 
> `FileContextBasedCheckpointFileManager` : 
> {noformat}
> scala> glusterfm.isLocal
> res17: Boolean = true{noformat}
>Reporter: Gerard Maas
>Priority: Major
>
> When using the default CheckpointFileManager, the HDFSBackedStateStoreProvider 
> leaves '.crc' files behind: a .crc file is created for each `createAtomic` 
> operation of the CheckpointFileManager.
> Over time, the number of files becomes very large. It makes the state store 
> file system constantly increase in size and, in our case, deteriorates the 
> file system performance.
> Here's a sample of one of our spark storage volumes after 2 days of execution 
> (4 stateful streaming jobs, each on a different sub-dir):
> {noformat}
> Total files in PVC (used for checkpoints and state store)
> $find . | wc -l
> 431796
> # .crc files
> $find . -name "*.crc" | wc -l
> 418053{noformat}
> With each .crc file taking one storage block, the used storage runs into the 
> GBs of data.
> These jobs are running on Kubernetes. Our shared storage provider, GlusterFS, 
> shows serious performance deterioration with this large number of files:
> {noformat}
> DEBUG HDFSBackedStateStoreProvider: fetchFiles() took 29164ms{noformat}
>  






[jira] [Commented] (SPARK-28025) HDFSBackedStateStoreProvider leaks .crc files

2019-06-12 Thread Gerard Maas (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-28025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16861983#comment-16861983
 ] 

Gerard Maas commented on SPARK-28025:
-

Same problem. A different part of the code.

> HDFSBackedStateStoreProvider leaks .crc files 
> --
>
> Key: SPARK-28025
> URL: https://issues.apache.org/jira/browse/SPARK-28025
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.3
> Environment: Spark 2.4.3
> Kubernetes 1.11(?) (OpenShift)
> StateStore storage on a mounted PVC. Viewed as a local filesystem by the 
> `FileContextBasedCheckpointFileManager` : 
> {noformat}
> scala> glusterfm.isLocal
> res17: Boolean = true{noformat}
>Reporter: Gerard Maas
>Priority: Major
>
> When using the default CheckpointFileManager, the HDFSBackedStateStoreProvider 
> leaves '.crc' files behind: a .crc file is created for each `createAtomic` 
> operation of the CheckpointFileManager.
> Over time, the number of files becomes very large. It makes the state store 
> file system constantly increase in size and, in our case, deteriorates the 
> file system performance.
> Here's a sample of one of our spark storage volumes after 2 days of execution 
> (4 stateful streaming jobs, each on a different sub-dir):
> {noformat}
> Total files in PVC (used for checkpoints and state store)
> $find . | wc -l
> 431796
> # .crc files
> $find . -name "*.crc" | wc -l
> 418053{noformat}
> With each .crc file taking one storage block, the used storage runs into the 
> GBs of data.
> These jobs are running on Kubernetes. Our shared storage provider, GlusterFS, 
> shows serious performance deterioration with this large number of files:
> {noformat}
> DEBUG HDFSBackedStateStoreProvider: fetchFiles() took 29164ms{noformat}
>  






[jira] [Created] (SPARK-28025) HDFSBackedStateStoreProvider leaks .crc files

2019-06-12 Thread Gerard Maas (JIRA)
Gerard Maas created SPARK-28025:
---

 Summary: HDFSBackedStateStoreProvider leaks .crc files 
 Key: SPARK-28025
 URL: https://issues.apache.org/jira/browse/SPARK-28025
 Project: Spark
  Issue Type: Bug
  Components: Structured Streaming
Affects Versions: 2.4.3
 Environment: Spark 2.4.3

Kubernetes 1.11(?) (OpenShift)

StateStore storage on a mounted PVC. Viewed as a local filesystem by the 
`FileContextBasedCheckpointFileManager` : 
{noformat}
scala> glusterfm.isLocal
res17: Boolean = true{noformat}
Reporter: Gerard Maas


When using the default CheckpointFileManager, the HDFSBackedStateStoreProvider 
leaves '.crc' files behind: a .crc file is created for each `createAtomic` 
operation of the CheckpointFileManager.

Over time, the number of files becomes very large. It makes the state store 
file system constantly increase in size and, in our case, deteriorates the file 
system performance.

Here's a sample of one of our spark storage volumes after 2 days of execution 
(4 stateful streaming jobs, each on a different sub-dir):
{noformat}
Total files in PVC (used for checkpoints and state store)
$find . | wc -l
431796

# .crc files
$find . -name "*.crc" | wc -l
418053{noformat}

With each .crc file taking one storage block, the used storage runs into the 
GBs of data.

These jobs are running on Kubernetes. Our shared storage provider, GlusterFS, 
shows serious performance deterioration with this large number of files:
{noformat}
DEBUG HDFSBackedStateStoreProvider: fetchFiles() took 29164ms{noformat}
 






[jira] [Commented] (SPARK-24264) [Structured Streaming] Remove 'mergeSchema' option from Parquet source configuration

2018-05-27 Thread Gerard Maas (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492046#comment-16492046
 ] 

Gerard Maas commented on SPARK-24264:
-

This scenario demonstrates the claims in this ticket:
 
Given the case classes PickUp and Cashier:
{{case class PickUp(orderId: String, waitDuration: Long)}}

{{case class Cashier(orderId: String, totalPrice: Float)}}
 
I created 2 Parquet files with some random data following these schemas.
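For reference, here is a minimal sketch of how such files could be produced. The /tmp/data/batch path, the append-mode writes and the random values are assumptions rather than the exact setup used; `session` is the same SparkSession used in the snippets below.
{code:java}
import scala.util.Random
import session.implicits._

// Two append-mode writes into the same directory, so the resulting part files
// carry different Parquet schemas (PickUp-only and Cashier-only).
val pickUps  = (1 to 10).map(i => PickUp(s"order_$i", Random.nextInt(1000).toLong))
val cashiers = (1 to 10).map(i => Cashier(s"order_$i", Random.nextFloat() * 100))

pickUps.toDS.write.mode("append").parquet("/tmp/data/batch/")
cashiers.toDS.write.mode("append").parquet("/tmp/data/batch/")
{code}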
 
When I read the two files at the same time using the batch API, I get:
 
{{val batchMerged = 
session.read.option("mergeSchema","true").parquet("/tmp/data/batch/")}}

{{batchMerged.schema}}

{{>StructType(StructField(orderId,StringType,true), 
StructField(waitDuration,LongType,true), 
StructField(totalPrice,FloatType,true))}}
 
Data in the resulting dataframe is null depending on the source file:
 
{code:java}
>batchMerged.show
+--------+------------+----------+
| orderId|waitDuration|totalPrice|
+--------+------------+----------+
| order_1|         498|      null|
| order_2|         819|      null|
| order_3|         576|      null|
| order_4|         741|      null|
{code}
 
The schema is merged as expected; the resulting dataset contains nulls when the 
source file does not contain the field, e.g.:
{code:java}
batchMerged.where(!$"totalPrice".isNull).show
+--------+------------+----------+
| orderId|waitDuration|totalPrice|
+--------+------------+----------+
| order_1|        null|  66.91808|
| order_2|        null| 21.761215|
| order_3|        null|    1.8776|
| order_4|        null| 45.613895|
| order_5|        null| 62.664383|
| order_6|        null|  78.24584|

{code}
 
In streaming mode, we must provide the schema at creation time. There's no 
schema inference:
 
{{val streamingMerged = 
session.readStream.option("mergeSchema","true").schema(schema).parquet("/tmp/data/stream")}}
When I materialize this stream to a table, I get:
 
{code:java}
val query = 
streamingMerged.writeStream.format("memory").queryName("parquet_merged").start()
val queryResultMerged = session.sql("select * from parquet_merged")
queryResultMerged.show
+--------+------------+----------+
| orderId|waitDuration|totalPrice|
+--------+------------+----------+
| order_1|         498|      null|
| order_2|         819|      null|
| order_3|         576|      null|
| order_4|         741|      null|
| order_5|         844|      null|

{code}
 
Then I set that flag to 'false':
 
{code:java}
val streamingNotMerged = 
session.readStream.option("mergeSchema","false").schema(schema).parquet("/tmp/data/stream")
// ... same materialization process ...
+--------+------------+----------+
| orderId|waitDuration|totalPrice|
+--------+------------+----------+
| order_1|         498|      null|
| order_2|         819|      null|
| order_3|         576|      null|
| order_4|         741|      null|
{code}
When comparing the two results using `except` (set subtraction):
{code:java}
val leftDiff = queryResultsNotMerged.except(queryResultMerged)
 
leftDiff.count
res58: Long = 0

val rightDiff = queryResultMerged.except(queryResultsNotMerged)
rightDiff.count
res61: Long = 0

{code}
 
There is no visible difference, except that Spark needs to perform the schema 
merge when this flag is set to true, which, as I understand it, comes at an 
additional cost.

> [Structured Streaming] Remove 'mergeSchema' option from Parquet source 
> configuration
> 
>
> Key: SPARK-24264
> URL: https://issues.apache.org/jira/browse/SPARK-24264
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.0
>Reporter: Gerard Maas
>Priority: Major
>  Labels: features, usability
>
> Looking into the Parquet format support for the File source in Structured 
> Streaming, the docs mention the use of the option 'mergeSchema' to merge the 
> schemas of the part files found.[1]
>  
> There seems to be no practical use of that configuration in a streaming 
> context.
>  
> In its batch counterpart, `mergeSchema` would infer the schema superset of 
> the part-files found. 
>  
>  When using the File source + parquet format in streaming mode, we must 
> provide a schema to the readStream.schema(...) builder and that schema is 
> fixed for the duration of the stream.
>  
> My current understanding is that:
>  
> - Files containing a subset of the fields declared in the schema will render 
> null values for the non-existing fields.
> - For files containing a superset of the fields, the additional data fields 
> will be lost. 
> - Files not matching the schema set on the streaming source will render all 
> fields null for each record in the file.
>  
> It looks like 'mergeSchema' has no practical effect, although enabling it 
> might lead to additional processing to actually merge the Parquet schema of 
> the input files. 
>  

[jira] [Updated] (SPARK-24264) [Structured Streaming] Remove 'mergeSchema' option from Parquet source configuration

2018-05-13 Thread Gerard Maas (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24264?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gerard Maas updated SPARK-24264:

Issue Type: Bug  (was: Improvement)

> [Structured Streaming] Remove 'mergeSchema' option from Parquet source 
> configuration
> 
>
> Key: SPARK-24264
> URL: https://issues.apache.org/jira/browse/SPARK-24264
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.0
>Reporter: Gerard Maas
>Priority: Major
>  Labels: features, usability
>
> Looking into the Parquet format support for the File source in Structured 
> Streaming, the docs mention the use of the option 'mergeSchema' to merge the 
> schemas of the part files found.[1]
>  
> There seems to be no practical use of that configuration in a streaming 
> context.
>  
> In its batch counterpart, `mergeSchema` would infer the schema superset of 
> the part-files found. 
>  
>  When using the File source + parquet format in streaming mode, we must 
> provide a schema to the readStream.schema(...) builder and that schema is 
> fixed for the duration of the stream.
>  
> My current understanding is that:
>  
> - Files containing a subset of the fields declared in the schema will render 
> null values for the non-existing fields.
> - For files containing a superset of the fields, the additional data fields 
> will be lost. 
> - Files not matching the schema set on the streaming source will render all 
> fields null for each record in the file.
>  
> It looks like 'mergeSchema' has no practical effect, although enabling it 
> might lead to additional processing to actually merge the Parquet schema of 
> the input files. 
>  
> I inquired on the dev+user mailing lists about any other behavior but I got 
> no responses.
>  
> From the user perspective, they may think that this option would help their 
> job cope with schema evolution at runtime, but that is also not the case. 
>  
> Looks like removing this option and leaving the value always set to false is 
> the reasonable thing to do.  
>  
> [1] 
> [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala#L376]






[jira] [Created] (SPARK-24264) [Structured Streaming] Remove 'mergeSchema' option from Parquet source configuration

2018-05-13 Thread Gerard Maas (JIRA)
Gerard Maas created SPARK-24264:
---

 Summary: [Structured Streaming] Remove 'mergeSchema' option from 
Parquet source configuration
 Key: SPARK-24264
 URL: https://issues.apache.org/jira/browse/SPARK-24264
 Project: Spark
  Issue Type: Improvement
  Components: Structured Streaming
Affects Versions: 2.3.0
Reporter: Gerard Maas


Looking into the Parquet format support for the File source in Structured 
Streaming, the docs mention the use of the option 'mergeSchema' to merge the 
schemas of the part files found.[1]
 
There seems to be no practical use of that configuration in a streaming context.
 
In its batch counterpart, `mergeSchema` would infer the schema superset of the 
part-files found. 
 
 When using the File source + parquet format in streaming mode, we must provide 
a schema to the readStream.schema(...) builder and that schema is fixed for the 
duration of the stream.
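For instance (a sketch only; the path and the field names are illustrative assumptions):
{code:java}
import org.apache.spark.sql.types._

// The streaming file source needs the schema up front; it stays fixed for the query.
val schema = StructType(Seq(
  StructField("orderId", StringType),
  StructField("waitDuration", LongType),
  StructField("totalPrice", FloatType)))

val streamingMerged = spark.readStream
  .schema(schema)                  // mandatory for file-based streaming sources
  .option("mergeSchema", "true")   // accepted, but with no practical effect here
  .parquet("/tmp/data/stream")
{code}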
 
My current understanding is that:
 
- Files containing a subset of the fields declared in the schema will render 
null values for the non-existing fields.
- For files containing a superset of the fields, the additional data fields 
will be lost. 
- Files not matching the schema set on the streaming source will render all 
fields null for each record in the file.
 
It looks like 'mergeSchema' has no practical effect, although enabling it might 
lead to additional processing to actually merge the Parquet schema of the input 
files. 
 
I inquired on the dev+user mailing lists about any other behavior but I got no 
responses.
 
From the user perspective, they may think that this option would help their 
job cope with schema evolution at runtime, but that is also not the case. 
 
Looks like removing this option and leaving the value always set to false is 
the reasonable thing to do.  
 
[1] 
[https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala#L376]






[jira] [Updated] (SPARK-24202) Separate SQLContext dependency from SparkSession.implicits

2018-05-08 Thread Gerard Maas (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gerard Maas updated SPARK-24202:

Description: 
The current implementation of the implicits in SparkSession passes the current 
active SQLContext to the SQLImplicits class. This implies that all usage of 
these (extremely helpful) implicits requires the prior creation of a 
SparkSession instance.

Usage is typically done as follows:

 
{code:java}
val sparkSession = SparkSession.builder()
  .getOrCreate()
import sparkSession.implicits._
{code}
 

This is OK in user code, but it burdens the creation of library code that uses 
Spark, where static imports for _Encoder_ support are required.

A simple example would be:

 
{code:java}
class SparkTransformation[In: Encoder, Out: Encoder] {
    def transform(ds: Dataset[In]): Dataset[Out]
}
{code}
 

Attempting to compile such code would result in the following exception:
{code:java}
Unable to find encoder for type stored in a Dataset.  Primitive types (Int, 
String, etc) and Product types (case classes) are supported by importing 
spark.implicits._  Support for serializing other types will be added in future 
releases.{code}
The usage of the _SQLContext_ instance in _SQLImplicits_ is limited to two 
utilities to transform _RDD_ and local collections into a _Dataset_.

These are 2 methods of the 46 implicit conversions offered by this class.

The request is to separate the two implicit methods that depend on the 
SQLContext instance creation into a separate class:
{code:java}
SQLImplicits#214-229
/**
 * Creates a [[Dataset]] from an RDD.
 *
 * @since 1.6.0
 */
implicit def rddToDatasetHolder[T : Encoder](rdd: RDD[T]): DatasetHolder[T] = {
 DatasetHolder(_sqlContext.createDataset(rdd))
}

/**
 * Creates a [[Dataset]] from a local Seq.
 * @since 1.6.0
 */
implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]): DatasetHolder[T] 
= {
 DatasetHolder(_sqlContext.createDataset(s))
}{code}
By separating the static methods from these two methods that depend on 
_sqlContext_ into separate classes, we could provide static imports for all 
the other functionality and only require the instance-bound implicits for the 
RDD and collection support (which is an uncommon use case these days).

As this is potentially breaking the current interface, this might be a 
candidate for Spark 3.0. Although there's nothing stopping us from creating a 
separate hierarchy for the static encoders already. 

  was:
The current implementation of the implicits in SparkSession passes the current 
active SQLContext to the SQLImplicits class. This implies that all usage of 
these (extremely helpful) implicits require the prior creation of a Spark 
Session instance.

Usage is typically done as follows:

 
{code:java}
val sparkSession = SparkSession.builder()
  .getOrCreate()
import sparkSession.implicits._
{code}
 

This is OK in user code, but it burdens the creation of library code that uses 
Spark, where  static imports for _Encoder_ support is required.

A simple example would be:

 
{code:java}
class SparkTransformation[In: Encoder, Out: Encoder] {
    def transform(ds: Dataset[In]): Dataset[Out]
}
{code}
 

Attempting to compile such code would result in the following exception:

Unable to find encoder for type stored in a Dataset.  Primitive types (Int, 
String, etc) and Product types (case classes) are supported by importing 
spark.implicits._  Support for serializing other types will be added in future 
releases.

The usage of the _SQLContext_ instance in _SQLImplicits_ is limited to two 
utilities to transform _RDD_ and local collections into a _Dataset_.

These are 2 methods of the 46 implicit conversions offered by this class.

The request is to separate the two implicit methods that depend on the instance 
creation into a separate class:
{code:java}
SQLImplicits#214-229
/**
 * Creates a [[Dataset]] from an RDD.
 *
 * @since 1.6.0
 */
implicit def rddToDatasetHolder[T : Encoder](rdd: RDD[T]): DatasetHolder[T] = {
 DatasetHolder(_sqlContext.createDataset(rdd))
}

/**
 * Creates a [[Dataset]] from a local Seq.
 * @since 1.6.0
 */
implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]): DatasetHolder[T] 
= {
 DatasetHolder(_sqlContext.createDataset(s))
}{code}
By separating the static methods from these two methods that depend on 
_sqlContext_ into  separate classes, we could provide static imports for all 
the other functionality and only require the instance-bound  implicits for the 
RDD and collection support (Which is an uncommon use case these days)

As this is potentially breaking the current interface, this might be a 
candidate for Spark 3.0. Although there's nothing stopping us from creating a 
separate hierarchy for the static encoders already. 


> Separate SQLContext dependency from SparkSession.implicits
> --
>
> Key: SPARK-24202
> 

[jira] [Updated] (SPARK-24202) Separate SQLContext dependency from SparkSession.implicits

2018-05-08 Thread Gerard Maas (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gerard Maas updated SPARK-24202:

Summary: Separate SQLContext dependency from SparkSession.implicits  (was: 
Separate SQLContext dependencies from SparkSession.implicits)

> Separate SQLContext dependency from SparkSession.implicits
> --
>
> Key: SPARK-24202
> URL: https://issues.apache.org/jira/browse/SPARK-24202
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Gerard Maas
>Priority: Major
>
> The current implementation of the implicits in SparkSession passes the 
> current active SQLContext to the SQLImplicits class. This implies that all 
> usage of these (extremely helpful) implicits require the prior creation of a 
> Spark Session instance.
> Usage is typically done as follows:
>  
> {code:java}
> val sparkSession = SparkSession.builder()
>   .getOrCreate()
> import sparkSession.implicits._
> {code}
>  
> This is OK in user code, but it burdens the creation of library code that 
> uses Spark, where static imports for _Encoder_ support are required.
> A simple example would be:
>  
> {code:java}
> class SparkTransformation[In: Encoder, Out: Encoder] {
>     def transform(ds: Dataset[In]): Dataset[Out]
> }
> {code}
>  
> Attempting to compile such code would result in the following exception:
> Unable to find encoder for type stored in a Dataset.  Primitive types (Int, 
> String, etc) and Product types (case classes) are supported by importing 
> spark.implicits._  Support for serializing other types will be added in 
> future releases.
> The usage of the _SQLContext_ instance in _SQLImplicits_ is limited to two 
> utilities to transform _RDD_ and local collections into a _Dataset_.
> These are 2 methods of the 46 implicit conversions offered by this class.
> The request is to separate the two implicit methods that depend on the 
> instance creation into a separate class:
> {code:java}
> SQLImplicits#214-229
> /**
>  * Creates a [[Dataset]] from an RDD.
>  *
>  * @since 1.6.0
>  */
> implicit def rddToDatasetHolder[T : Encoder](rdd: RDD[T]): DatasetHolder[T] = 
> {
>  DatasetHolder(_sqlContext.createDataset(rdd))
> }
> /**
>  * Creates a [[Dataset]] from a local Seq.
>  * @since 1.6.0
>  */
> implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]): 
> DatasetHolder[T] = {
>  DatasetHolder(_sqlContext.createDataset(s))
> }{code}
> By separating the static methods from these two methods that depend on 
> _sqlContext_ into separate classes, we could provide static imports for all 
> the other functionality and only require the instance-bound implicits for 
> the RDD and collection support (which is an uncommon use case these days).
> As this is potentially breaking the current interface, this might be a 
> candidate for Spark 3.0. Although there's nothing stopping us from creating a 
> separate hierarchy for the static encoders already. 






[jira] [Updated] (SPARK-24202) Separate SQLContext dependencies from SparkSession.implicits

2018-05-07 Thread Gerard Maas (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gerard Maas updated SPARK-24202:

Description: 
The current implementation of the implicits in SparkSession passes the current 
active SQLContext to the SQLImplicits class. This implies that all usage of 
these (extremely helpful) implicits require the prior creation of a Spark 
Session instance.

Usage is typically done as follows:

 
{code:java}
val sparkSession = SparkSession.builder()
  .build()
import sparkSession.implicits._
{code}
 

This is OK in user code, but it burdens the creation of library code that uses 
Spark, where  static imports for _Encoder_ support is required.

A simple example would be:

 
{code:java}
class SparkTransformation[In: Encoder, Out: Encoder] {
    def transform(ds: Dataset[In]): Dataset[Out]
}
{code}
 

Attempting to compile such code would result in the following exception:

Unable to find encoder for type stored in a Dataset.  Primitive types (Int, 
String, etc) and Product types (case classes) are supported by importing 
spark.implicits._  Support for serializing other types will be added in future 
releases.

The usage of the _SQLContext_ instance in _SQLImplicits_ is limited to two 
utilities to transform _RDD_ and local collections into a _Dataset_.

These are 2 methods of the 46 implicit conversions offered by this class.

The request is to separate the two implicit methods that depend on the instance 
creation into a separate class:
{code:java}
SQLImplicits#214-229
/**
 * Creates a [[Dataset]] from an RDD.
 *
 * @since 1.6.0
 */
implicit def rddToDatasetHolder[T : Encoder](rdd: RDD[T]): DatasetHolder[T] = {
 DatasetHolder(_sqlContext.createDataset(rdd))
}

/**
 * Creates a [[Dataset]] from a local Seq.
 * @since 1.6.0
 */
implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]): DatasetHolder[T] 
= {
 DatasetHolder(_sqlContext.createDataset(s))
}{code}
By separating the static methods from these two methods that depend on 
_sqlContext_ into  separate classes, we could provide static imports for all 
the other functionality and only require the instance-bound  implicits for the 
RDD and collection support (Which is an uncommon use case these days)

As this is potentially breaking the current interface, this might be a 
candidate for Spark 3.0. Although there's nothing stopping us from creating a 
separate hierarchy for the static encoders already. 

  was:
The current implementation of the implicits in SparkSession passes the current 
active SQLContext to the SQLImplicits class. This implies that all usage of 
these (extremely helpful) implicits require the prior creation of a Spark 
Session instance.

Usage is typically done as follows:

 
{code:java}
val sparkSession = SessionBuilder.build()
import sparkSession.implicits._
{code}
 

This is OK in user code, but it burdens the creation of library code that uses 
Spark, where  static imports for _Encoder_ support is required.

A simple example would be:

 
{code:java}
class SparkTransformation[In: Encoder, Out: Encoder] {
    def transform(ds: Dataset[In]): Dataset[Out]
}
{code}
 

Attempting to compile such code would result in the following exception:

Unable to find encoder for type stored in a Dataset.  Primitive types (Int, 
String, etc) and Product types (case classes) are supported by importing 
spark.implicits._  Support for serializing other types will be added in future 
releases.

The usage of the _SQLContext_ instance in _SQLImplicits_ is limited to two 
utilities to transform _RDD_ and local collections into a _Dataset_.

These are 2 methods of the 46 implicit conversions offered by this class.

The request is to separate the two implicit methods that depend on the instance 
creation into a separate class:
{code:java}
SQLImplicits#214-229
/**
 * Creates a [[Dataset]] from an RDD.
 *
 * @since 1.6.0
 */
implicit def rddToDatasetHolder[T : Encoder](rdd: RDD[T]): DatasetHolder[T] = {
 DatasetHolder(_sqlContext.createDataset(rdd))
}

/**
 * Creates a [[Dataset]] from a local Seq.
 * @since 1.6.0
 */
implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]): DatasetHolder[T] 
= {
 DatasetHolder(_sqlContext.createDataset(s))
}{code}
By separating the static methods from these two methods that depend on 
_sqlContext_ into  separate classes, we could provide static imports for all 
the other functionality and only require the instance-bound  implicits for the 
RDD and collection support (Which is an uncommon use case these days)

As this is potentially breaking the current interface, this might be a 
candidate for Spark 3.0. Although there's nothing stopping us from creating a 
separate hierarchy for the static encoders already. 


> Separate SQLContext dependencies from SparkSession.implicits
> 
>
> Key: SPARK-24202
> URL: 

[jira] [Updated] (SPARK-24202) Separate SQLContext dependencies from SparkSession.implicits

2018-05-07 Thread Gerard Maas (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gerard Maas updated SPARK-24202:

Description: 
The current implementation of the implicits in SparkSession passes the current 
active SQLContext to the SQLImplicits class. This implies that all usage of 
these (extremely helpful) implicits require the prior creation of a Spark 
Session instance.

Usage is typically done as follows:

 
{code:java}
val sparkSession = SparkSession.builder()
  .getOrCreate()
import sparkSession.implicits._
{code}
 

This is OK in user code, but it burdens the creation of library code that uses 
Spark, where  static imports for _Encoder_ support is required.

A simple example would be:

 
{code:java}
class SparkTransformation[In: Encoder, Out: Encoder] {
    def transform(ds: Dataset[In]): Dataset[Out]
}
{code}
 

Attempting to compile such code would result in the following exception:

Unable to find encoder for type stored in a Dataset.  Primitive types (Int, 
String, etc) and Product types (case classes) are supported by importing 
spark.implicits._  Support for serializing other types will be added in future 
releases.

The usage of the _SQLContext_ instance in _SQLImplicits_ is limited to two 
utilities to transform _RDD_ and local collections into a _Dataset_.

These are 2 methods of the 46 implicit conversions offered by this class.

The request is to separate the two implicit methods that depend on the instance 
creation into a separate class:
{code:java}
SQLImplicits#214-229
/**
 * Creates a [[Dataset]] from an RDD.
 *
 * @since 1.6.0
 */
implicit def rddToDatasetHolder[T : Encoder](rdd: RDD[T]): DatasetHolder[T] = {
 DatasetHolder(_sqlContext.createDataset(rdd))
}

/**
 * Creates a [[Dataset]] from a local Seq.
 * @since 1.6.0
 */
implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]): DatasetHolder[T] 
= {
 DatasetHolder(_sqlContext.createDataset(s))
}{code}
By separating the static methods from these two methods that depend on 
_sqlContext_ into  separate classes, we could provide static imports for all 
the other functionality and only require the instance-bound  implicits for the 
RDD and collection support (Which is an uncommon use case these days)

As this is potentially breaking the current interface, this might be a 
candidate for Spark 3.0. Although there's nothing stopping us from creating a 
separate hierarchy for the static encoders already. 

  was:
The current implementation of the implicits in SparkSession passes the current 
active SQLContext to the SQLImplicits class. This implies that all usage of 
these (extremely helpful) implicits require the prior creation of a Spark 
Session instance.

Usage is typically done as follows:

 
{code:java}
val sparkSession = SparkSession.builder()
  .build()
import sparkSession.implicits._
{code}
 

This is OK in user code, but it burdens the creation of library code that uses 
Spark, where  static imports for _Encoder_ support is required.

A simple example would be:

 
{code:java}
class SparkTransformation[In: Encoder, Out: Encoder] {
    def transform(ds: Dataset[In]): Dataset[Out]
}
{code}
 

Attempting to compile such code would result in the following exception:

Unable to find encoder for type stored in a Dataset.  Primitive types (Int, 
String, etc) and Product types (case classes) are supported by importing 
spark.implicits._  Support for serializing other types will be added in future 
releases.

The usage of the _SQLContext_ instance in _SQLImplicits_ is limited to two 
utilities to transform _RDD_ and local collections into a _Dataset_.

These are 2 methods of the 46 implicit conversions offered by this class.

The request is to separate the two implicit methods that depend on the instance 
creation into a separate class:
{code:java}
SQLImplicits#214-229
/**
 * Creates a [[Dataset]] from an RDD.
 *
 * @since 1.6.0
 */
implicit def rddToDatasetHolder[T : Encoder](rdd: RDD[T]): DatasetHolder[T] = {
 DatasetHolder(_sqlContext.createDataset(rdd))
}

/**
 * Creates a [[Dataset]] from a local Seq.
 * @since 1.6.0
 */
implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]): DatasetHolder[T] 
= {
 DatasetHolder(_sqlContext.createDataset(s))
}{code}
By separating the static methods from these two methods that depend on 
_sqlContext_ into  separate classes, we could provide static imports for all 
the other functionality and only require the instance-bound  implicits for the 
RDD and collection support (Which is an uncommon use case these days)

As this is potentially breaking the current interface, this might be a 
candidate for Spark 3.0. Although there's nothing stopping us from creating a 
separate hierarchy for the static encoders already. 


> Separate SQLContext dependencies from SparkSession.implicits
> 
>
> Key: SPARK-24202
> URL: 

[jira] [Created] (SPARK-24202) Separate SQLContext dependencies from SparkSession.implicits

2018-05-07 Thread Gerard Maas (JIRA)
Gerard Maas created SPARK-24202:
---

 Summary: Separate SQLContext dependencies from 
SparkSession.implicits
 Key: SPARK-24202
 URL: https://issues.apache.org/jira/browse/SPARK-24202
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.3.0
Reporter: Gerard Maas


The current implementation of the implicits in SparkSession passes the current 
active SQLContext to the SQLImplicits class. This implies that all usage of 
these (extremely helpful) implicits require the prior creation of a Spark 
Session instance.

Usage is typically done as follows:

 
{code:java}
val sparkSession = SessionBuilder.build()
import sparkSession.implicits._
{code}
 

This is OK in user code, but it burdens the creation of library code that uses 
Spark, where static imports for _Encoder_ support are required.

A simple example would be:

 
{code:java}
class SparkTransformation[In: Encoder, Out: Encoder] {
    def transform(ds: Dataset[In]): Dataset[Out]
}
{code}
 

Attempting to compile such code would result in the following exception:

Unable to find encoder for type stored in a Dataset.  Primitive types (Int, 
String, etc) and Product types (case classes) are supported by importing 
spark.implicits._  Support for serializing other types will be added in future 
releases.

The usage of the _SQLContext_ instance in _SQLImplicits_ is limited to two 
utilities to transform _RDD_ and local collections into a _Dataset_.

These are 2 methods of the 46 implicit conversions offered by this class.

The request is to separate the two implicit methods that depend on the instance 
creation into a separate class:
{code:java}
SQLImplicits#214-229
/**
 * Creates a [[Dataset]] from an RDD.
 *
 * @since 1.6.0
 */
implicit def rddToDatasetHolder[T : Encoder](rdd: RDD[T]): DatasetHolder[T] = {
 DatasetHolder(_sqlContext.createDataset(rdd))
}

/**
 * Creates a [[Dataset]] from a local Seq.
 * @since 1.6.0
 */
implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]): DatasetHolder[T] 
= {
 DatasetHolder(_sqlContext.createDataset(s))
}{code}
By separating the static methods from these two methods that depend on 
_sqlContext_ into separate classes, we could provide static imports for all 
the other functionality and only require the instance-bound implicits for the 
RDD and collection support (which is an uncommon use case these days).

As this is potentially breaking the current interface, this might be a 
candidate for Spark 3.0. Although there's nothing stopping us from creating a 
separate hierarchy for the static encoders already. 
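In the meantime, library code that only needs _Encoder_ support (and not the RDD/Seq conversions) can already obtain encoders statically through Encoders. A minimal, hedged sketch with illustrative names:
{code:java}
import org.apache.spark.sql.{Dataset, Encoder, Encoders}

// Illustrative only: a library-side transformation that derives its Encoder
// statically, without importing sparkSession.implicits._ (no session in scope).
case class Order(orderId: String, totalPrice: Float)

object OrderTransformation {
  private implicit val orderEncoder: Encoder[Order] = Encoders.product[Order]

  def applyDiscount(ds: Dataset[Order]): Dataset[Order] =
    ds.map(o => o.copy(totalPrice = o.totalPrice * 0.9f)) // map needs Encoder[Order]
}
{code}
This does not cover rddToDatasetHolder/localSeqToDatasetHolder, which still need a SQLContext; that is precisely the split proposed above.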






[jira] [Commented] (SPARK-24046) Rate Source doesn't gradually increase rate when rampUpTime>=RowsPerSecond

2018-04-22 Thread Gerard Maas (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16447349#comment-16447349
 ] 

Gerard Maas commented on SPARK-24046:
-

In graphical form, this is what happens when rampUpTime >= rowsPerSecond

(rampUpTime=10, rowsPerSecond=10) in this case:

!image-2018-04-22-22-03-03-945.png!

(rampUpTime=60, rowsPerSecond=10)

 

!image-2018-04-22-22-06-49-202.png!

 

> Rate Source doesn't gradually increase rate when rampUpTime>=RowsPerSecond
> --
>
> Key: SPARK-24046
> URL: https://issues.apache.org/jira/browse/SPARK-24046
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.0
> Environment: Spark 2.3.0 using Spark Shell on Ubuntu 17.4
> (Environment is not important, the issue lies in the rate calculation)
>Reporter: Gerard Maas
>Priority: Major
>  Labels: RateSource
> Attachments: image-2018-04-22-22-03-03-945.png, 
> image-2018-04-22-22-06-49-202.png
>
>
> When using the rate source in Structured Streaming, the `rampUpTime` feature 
> fails to gradually increase the stream rate when the `rampUpTime` option is 
> equal to or greater than `rowsPerSecond`. 
> When `rampUpTime >= rowsPerSecond`, all batches at `time < rampUpTime` contain 
> no rows. The rate jumps to `rowsPerSecond` once `time > rampUpTime`.
> The following scenario, executed in the `spark-shell` demonstrates this issue:
> {code:java}
> // Using rampUpTime(10) > rowsPerSecond(5)  
> {code}
> {code:java}
> val stream = spark.readStream
> .format("rate")
> .option("rowsPerSecond", 5)
> .option("rampUpTime", 10)
> .load()
> val query = stream.writeStream.format("console").start()
> // Exiting paste mode, now interpreting.
> stream: org.apache.spark.sql.DataFrame = [timestamp: timestamp, value: bigint]
> query: org.apache.spark.sql.streaming.StreamingQuery = 
> org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@cf82c58
> ---
> Batch: 0
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 1
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 2
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 3
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 4
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 5
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 6
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 7
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 8
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 9
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 10
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 11
> ---
> ++-+
> | timestamp|value|
> ++-+
> |2018-04-22 17:08:...| 0|
> |2018-04-22 17:08:...| 1|
> |2018-04-22 17:08:...| 2|
> |2018-04-22 17:08:...| 3|
> |2018-04-22 17:08:...| 4|
> ++-+
> ---
> Batch: 12
> ---
> ++-+
> | timestamp|value|
> ++-+
> |2018-04-22 17:08:...| 5|
> |2018-04-22 17:08:...| 6|
> |2018-04-22 17:08:...| 7|
> |2018-04-22 17:08:...| 8|
> |2018-04-22 17:08:...| 9|
> ++-+
> {code}
>  
> This scenario shows rowsPerSecond == rampUpTime,  which also fails
> {code:java}
> val stream 

[jira] [Updated] (SPARK-24046) Rate Source doesn't gradually increase rate when rampUpTime>=RowsPerSecond

2018-04-22 Thread Gerard Maas (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gerard Maas updated SPARK-24046:

Attachment: image-2018-04-22-22-06-49-202.png

> Rate Source doesn't gradually increase rate when rampUpTime>=RowsPerSecond
> --
>
> Key: SPARK-24046
> URL: https://issues.apache.org/jira/browse/SPARK-24046
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.0
> Environment: Spark 2.3.0 using Spark Shell on Ubuntu 17.4
> (Environment is not important, the issue lies in the rate calculation)
>Reporter: Gerard Maas
>Priority: Major
>  Labels: RateSource
> Attachments: image-2018-04-22-22-03-03-945.png, 
> image-2018-04-22-22-06-49-202.png
>
>
> When using the rate source in Structured Streaming, the `rampUpTime` feature 
> fails to gradually increase the stream rate when the `rampUpTime` option is 
> equal to or greater than `rowsPerSecond`. 
> When `rampUpTime >= rowsPerSecond`, all batches at `time < rampUpTime` contain 
> no rows. The rate jumps to `rowsPerSecond` once `time > rampUpTime`.
> The following scenario, executed in the `spark-shell` demonstrates this issue:
> {code:java}
> // Using rampUpTime(10) > rowsPerSecond(5)  
> {code}
> {code:java}
> val stream = spark.readStream
> .format("rate")
> .option("rowsPerSecond", 5)
> .option("rampUpTime", 10)
> .load()
> val query = stream.writeStream.format("console").start()
> // Exiting paste mode, now interpreting.
> stream: org.apache.spark.sql.DataFrame = [timestamp: timestamp, value: bigint]
> query: org.apache.spark.sql.streaming.StreamingQuery = 
> org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@cf82c58
> ---
> Batch: 0
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 1
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 2
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 3
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 4
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 5
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 6
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 7
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 8
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 9
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 10
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 11
> ---
> ++-+
> | timestamp|value|
> ++-+
> |2018-04-22 17:08:...| 0|
> |2018-04-22 17:08:...| 1|
> |2018-04-22 17:08:...| 2|
> |2018-04-22 17:08:...| 3|
> |2018-04-22 17:08:...| 4|
> ++-+
> ---
> Batch: 12
> ---
> ++-+
> | timestamp|value|
> ++-+
> |2018-04-22 17:08:...| 5|
> |2018-04-22 17:08:...| 6|
> |2018-04-22 17:08:...| 7|
> |2018-04-22 17:08:...| 8|
> |2018-04-22 17:08:...| 9|
> ++-+
> {code}
>  
> This scenario shows rowsPerSecond == rampUpTime,  which also fails
> {code:java}
> val stream = spark.readStream
> .format("rate")
> .option("rowsPerSecond", 10)
> .option("rampUpTime", 10)
> .load()
> val query = stream.writeStream.format("console").start()
> // Exiting paste mode, now interpreting.
> stream: 

[jira] [Updated] (SPARK-24046) Rate Source doesn't gradually increase rate when rampUpTime>=RowsPerSecond

2018-04-22 Thread Gerard Maas (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gerard Maas updated SPARK-24046:

Attachment: image-2018-04-22-22-03-03-945.png

> Rate Source doesn't gradually increase rate when rampUpTime>=RowsPerSecond
> --
>
> Key: SPARK-24046
> URL: https://issues.apache.org/jira/browse/SPARK-24046
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.0
> Environment: Spark 2.3.0 using Spark Shell on Ubuntu 17.4
> (Environment is not important, the issue lies in the rate calculation)
>Reporter: Gerard Maas
>Priority: Major
>  Labels: RateSource
> Attachments: image-2018-04-22-22-03-03-945.png
>
>
> When using the rate source in Structured Streaming, the `rampUpTime` feature 
> fails to gradually increase the stream rate when the `rampUpTime` option is 
> equal to or greater than `rowsPerSecond`. 
> When `rampUpTime >= rowsPerSecond`, all batches at `time < rampUpTime` contain 
> no rows. The rate jumps to `rowsPerSecond` once `time > rampUpTime`.
> The following scenario, executed in the `spark-shell` demonstrates this issue:
> {code:java}
> // Using rampUpTime(10) > rowsPerSecond(5)  
> {code}
> {code:java}
> val stream = spark.readStream
> .format("rate")
> .option("rowsPerSecond", 5)
> .option("rampUpTime", 10)
> .load()
> val query = stream.writeStream.format("console").start()
> // Exiting paste mode, now interpreting.
> stream: org.apache.spark.sql.DataFrame = [timestamp: timestamp, value: bigint]
> query: org.apache.spark.sql.streaming.StreamingQuery = 
> org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@cf82c58
> ---
> Batch: 0
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 1
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 2
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 3
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 4
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 5
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 6
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 7
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 8
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 9
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 10
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 11
> ---
> ++-+
> | timestamp|value|
> ++-+
> |2018-04-22 17:08:...| 0|
> |2018-04-22 17:08:...| 1|
> |2018-04-22 17:08:...| 2|
> |2018-04-22 17:08:...| 3|
> |2018-04-22 17:08:...| 4|
> ++-+
> ---
> Batch: 12
> ---
> ++-+
> | timestamp|value|
> ++-+
> |2018-04-22 17:08:...| 5|
> |2018-04-22 17:08:...| 6|
> |2018-04-22 17:08:...| 7|
> |2018-04-22 17:08:...| 8|
> |2018-04-22 17:08:...| 9|
> ++-+
> {code}
>  
> This scenario shows rowsPerSecond == rampUpTime,  which also fails
> {code:java}
> val stream = spark.readStream
> .format("rate")
> .option("rowsPerSecond", 10)
> .option("rampUpTime", 10)
> .load()
> val query = stream.writeStream.format("console").start()
> // Exiting paste mode, now interpreting.
> stream: org.apache.spark.sql.DataFrame = [timestamp: 

[jira] [Commented] (SPARK-24046) Rate Source doesn't gradually increase rate when rampUpTime>=RowsPerSecond

2018-04-22 Thread Gerard Maas (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16447347#comment-16447347
 ] 

Gerard Maas commented on SPARK-24046:
-

The problem seems to come from this integer division:

https://github.com/apache/spark/blame/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala#L110
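A hedged illustration of why that division collapses to zero whenever rampUpTime >= rowsPerSecond; the names mirror the linked code, but this is a simplified sketch, not the actual implementation:
{code:java}
val rowsPerSecond = 10L
val rampUpTimeSeconds = 10L

// Long division truncates toward zero: 10 / 11 == 0.
val speedDeltaPerSecond = rowsPerSecond / (rampUpTimeSeconds + 1)  // == 0

// With a zero per-second delta, the ramp contributes no rows at all while
// time <= rampUpTime, which matches the empty batches in the console output
// of the issue description.
{code}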

> Rate Source doesn't gradually increase rate when rampUpTime>=RowsPerSecond
> --
>
> Key: SPARK-24046
> URL: https://issues.apache.org/jira/browse/SPARK-24046
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.0
> Environment: Spark 2.3.0 using Spark Shell on Ubuntu 17.4
> (Environment is not important, the issue lies in the rate calculation)
>Reporter: Gerard Maas
>Priority: Major
>  Labels: RateSource
>
> When using the rate source in Structured Streaming, the `rampUpTime` feature 
> fails to gradually increase the stream rate when the `rampUpTime` option is 
> equal to or greater than `rowsPerSecond`. 
> When `rampUpTime >= rowsPerSecond`, all batches at `time < rampUpTime` contain 
> no rows. The rate jumps to `rowsPerSecond` once `time > rampUpTime`.
> The following scenario, executed in the `spark-shell` demonstrates this issue:
> {code:java}
> // Using rampUpTime(10) > rowsPerSecond(5)  
> {code}
> {code:java}
> val stream = spark.readStream
> .format("rate")
> .option("rowsPerSecond", 5)
> .option("rampUpTime", 10)
> .load()
> val query = stream.writeStream.format("console").start()
> // Exiting paste mode, now interpreting.
> stream: org.apache.spark.sql.DataFrame = [timestamp: timestamp, value: bigint]
> query: org.apache.spark.sql.streaming.StreamingQuery = 
> org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@cf82c58
> ---
> Batch: 0
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 1
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 2
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 3
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 4
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 5
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 6
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 7
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 8
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 9
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 10
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 11
> ---
> ++-+
> | timestamp|value|
> ++-+
> |2018-04-22 17:08:...| 0|
> |2018-04-22 17:08:...| 1|
> |2018-04-22 17:08:...| 2|
> |2018-04-22 17:08:...| 3|
> |2018-04-22 17:08:...| 4|
> ++-+
> ---
> Batch: 12
> ---
> ++-+
> | timestamp|value|
> ++-+
> |2018-04-22 17:08:...| 5|
> |2018-04-22 17:08:...| 6|
> |2018-04-22 17:08:...| 7|
> |2018-04-22 17:08:...| 8|
> |2018-04-22 17:08:...| 9|
> ++-+
> {code}
>  
> This scenario shows rowsPerSecond == rampUpTime,  which also fails
> {code:java}
> val stream = spark.readStream
> .format("rate")
> .option("rowsPerSecond", 10)
> .option("rampUpTime", 10)
> .load()
> val query = 

[jira] [Updated] (SPARK-24046) Rate Source doesn't gradually increase rate when rampUpTime>=RowsPerSecond

2018-04-22 Thread Gerard Maas (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gerard Maas updated SPARK-24046:

Summary: Rate Source doesn't gradually increase rate when 
rampUpTime>=RowsPerSecond  (was: Rate Source does gradually increase rate when 
rampUpTime>=RowsPerSecond)

> Rate Source doesn't gradually increase rate when rampUpTime>=RowsPerSecond
> --
>
> Key: SPARK-24046
> URL: https://issues.apache.org/jira/browse/SPARK-24046
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.0
> Environment: Spark 2.3.0 using Spark Shell on Ubuntu 17.4
> (Environment is not important, the issue lies in the rate calculation)
>Reporter: Gerard Maas
>Priority: Major
>  Labels: RateSource
>
> When using the rate source in Structured Streaming, the `rampUpTime` feature 
> fails to gradually increase the stream rate when the `rampUpTime` option is 
> equal to or greater than `rowsPerSecond`. 
> When `rampUpTime >= rowsPerSecond`, all batches at `time < rampUpTime` contain 
> no rows. The rate jumps to `rowsPerSecond` once `time > rampUpTime`.
> The following scenario, executed in the `spark-shell` demonstrates this issue:
> {code:java}
> // Using rampUpTime(10) > rowsPerSecond(5)  
> {code}
> {code:java}
> val stream = spark.readStream
> .format("rate")
> .option("rowsPerSecond", 5)
> .option("rampUpTime", 10)
> .load()
> val query = stream.writeStream.format("console").start()
> // Exiting paste mode, now interpreting.
> stream: org.apache.spark.sql.DataFrame = [timestamp: timestamp, value: bigint]
> query: org.apache.spark.sql.streaming.StreamingQuery = 
> org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@cf82c58
> ---
> Batch: 0
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 1
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 2
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 3
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 4
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 5
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 6
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 7
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 8
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 9
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 10
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 11
> ---
> ++-+
> | timestamp|value|
> ++-+
> |2018-04-22 17:08:...| 0|
> |2018-04-22 17:08:...| 1|
> |2018-04-22 17:08:...| 2|
> |2018-04-22 17:08:...| 3|
> |2018-04-22 17:08:...| 4|
> ++-+
> ---
> Batch: 12
> ---
> ++-+
> | timestamp|value|
> ++-+
> |2018-04-22 17:08:...| 5|
> |2018-04-22 17:08:...| 6|
> |2018-04-22 17:08:...| 7|
> |2018-04-22 17:08:...| 8|
> |2018-04-22 17:08:...| 9|
> ++-+
> {code}
>  
> This scenario shows rowsPerSecond == rampUpTime, which also fails:
> {code:java}
> val stream = spark.readStream
> .format("rate")
> .option("rowsPerSecond", 10)
> .option("rampUpTime", 10)
> .load()
> val query = stream.writeStream.format("console").start()
> // Exiting paste mode, now interpreting.
> 

[jira] [Updated] (SPARK-24046) Rate Source does gradually increase rate when rampUpTime>=RowsPerSecond

2018-04-22 Thread Gerard Maas (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gerard Maas updated SPARK-24046:

Summary: Rate Source does gradually increase rate when 
rampUpTime>=RowsPerSecond  (was: rampUpTime in Rate Source does not work for 
rampUpTime>=RowsPerSecond)

> Rate Source does gradually increase rate when rampUpTime>=RowsPerSecond
> ---
>
> Key: SPARK-24046
> URL: https://issues.apache.org/jira/browse/SPARK-24046
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.0
> Environment: Spark 2.3.0 using Spark Shell on Ubuntu 17.4
> (Environment is not important, the issue lies in the rate calculation)
>Reporter: Gerard Maas
>Priority: Major
>  Labels: RateSource
>
> When using the rate source in Structured Streaming, the `rampUpTime` feature 
> fails to gradually increase the stream rate when the `rampUpTime` option is 
> equal to or greater than `rowsPerSecond`. 
> When `rampUpTime >= rowsPerSecond`, all batches at `time < rampUpTime` contain 
> 0 values. The rate jumps to `rowsPerSecond` when `time > rampUpTime`.
> The following scenario, executed in the `spark-shell`, demonstrates this issue:
> {code:java}
> // Using rampUpTime(10) > rowsPerSecond(5)  
> {code}
> {code:java}
> val stream = spark.readStream
> .format("rate")
> .option("rowsPerSecond", 5)
> .option("rampUpTime", 10)
> .load()
> val query = stream.writeStream.format("console").start()
> // Exiting paste mode, now interpreting.
> stream: org.apache.spark.sql.DataFrame = [timestamp: timestamp, value: bigint]
> query: org.apache.spark.sql.streaming.StreamingQuery = 
> org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@cf82c58
> ---
> Batch: 0
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 1
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 2
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 3
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 4
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 5
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 6
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 7
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 8
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 9
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 10
> ---
> +-+-+
> |timestamp|value|
> +-+-+
> +-+-+
> ---
> Batch: 11
> ---
> ++-+
> | timestamp|value|
> ++-+
> |2018-04-22 17:08:...| 0|
> |2018-04-22 17:08:...| 1|
> |2018-04-22 17:08:...| 2|
> |2018-04-22 17:08:...| 3|
> |2018-04-22 17:08:...| 4|
> ++-+
> ---
> Batch: 12
> ---
> ++-+
> | timestamp|value|
> ++-+
> |2018-04-22 17:08:...| 5|
> |2018-04-22 17:08:...| 6|
> |2018-04-22 17:08:...| 7|
> |2018-04-22 17:08:...| 8|
> |2018-04-22 17:08:...| 9|
> ++-+
> {code}
>  
> This scenario shows rowsPerSecond == rampUpTime, which also fails:
> {code:java}
> val stream = spark.readStream
> .format("rate")
> .option("rowsPerSecond", 10)
> .option("rampUpTime", 10)
> .load()
> val query = stream.writeStream.format("console").start()
> // Exiting paste mode, now interpreting.
> stream: 

[jira] [Created] (SPARK-24046) rampUpTime in Rate Source does not work for rampUpTime>=RowsPerSecond

2018-04-22 Thread Gerard Maas (JIRA)
Gerard Maas created SPARK-24046:
---

 Summary: rampUpTime in Rate Source does not work for 
rampUpTime>=RowsPerSecond
 Key: SPARK-24046
 URL: https://issues.apache.org/jira/browse/SPARK-24046
 Project: Spark
  Issue Type: Bug
  Components: Structured Streaming
Affects Versions: 2.3.0
 Environment: Spark 2.3.0 using Spark Shell on Ubuntu 17.4

(Environment is not important, the issue lies in the rate calculation)
Reporter: Gerard Maas


When using the rate source in Structured Streaming, the `rampUpTime` feature 
fails to gradually increase the stream rate when the `rampUpTime` option is 
equal to or greater than `rowsPerSecond`. 

When `rampUpTime >= rowsPerSecond`, all batches at `time < rampUpTime` contain 0 
values. The rate jumps to `rowsPerSecond` when `time > rampUpTime`.
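
A plausible cause (this is only a sketch of the arithmetic, not the actual 
RateStreamSource code) is an integer division when deriving the per-second 
increment during ramp-up: `rowsPerSecond / rampUpTime` truncates to 0 whenever 
`rampUpTime >= rowsPerSecond`, which reproduces the empty batches shown in the 
transcript below:
{code:java}
// Hypothetical sketch of a linear ramp-up driven by an integer per-second
// increment. The function name and formula are illustrative only; they are
// not taken from the Spark sources.
def cumulativeRowsAt(second: Long, rowsPerSecond: Long, rampUpTime: Long): Long = {
  val increment = rowsPerSecond / rampUpTime   // integer division: 5 / 10 == 0
  if (second <= rampUpTime) increment * second * (second + 1) / 2
  else cumulativeRowsAt(rampUpTime, rowsPerSecond, rampUpTime) +
       (second - rampUpTime) * rowsPerSecond
}

// With rowsPerSecond = 5 and rampUpTime = 10, the increment is 0, so the
// cumulative count stays at 0 for seconds 0..10 and then grows by 5 per
// second, matching the empty batches 0..10 and the 5-row batches afterwards.
(0L to 12L).foreach { s =>
  println(s"second $s -> cumulative rows ${cumulativeRowsAt(s, 5, 10)}")
}
{code}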

The following scenario, executed in the `spark-shell`, demonstrates this issue:
{code:java}
// Using rampUpTime(10) > rowsPerSecond(5)  
{code}
{code:java}
val stream = spark.readStream
.format("rate")
.option("rowsPerSecond", 5)
.option("rampUpTime", 10)
.load()

val query = stream.writeStream.format("console").start()

// Exiting paste mode, now interpreting.

stream: org.apache.spark.sql.DataFrame = [timestamp: timestamp, value: bigint]
query: org.apache.spark.sql.streaming.StreamingQuery = 
org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@cf82c58


---
Batch: 0
---
+-+-+
|timestamp|value|
+-+-+
+-+-+

---
Batch: 1
---
+-+-+
|timestamp|value|
+-+-+
+-+-+

---
Batch: 2
---
+-+-+
|timestamp|value|
+-+-+
+-+-+

---
Batch: 3
---
+-+-+
|timestamp|value|
+-+-+
+-+-+

---
Batch: 4
---
+-+-+
|timestamp|value|
+-+-+
+-+-+

---
Batch: 5
---
+-+-+
|timestamp|value|
+-+-+
+-+-+

---
Batch: 6
---
+-+-+
|timestamp|value|
+-+-+
+-+-+

---
Batch: 7
---
+-+-+
|timestamp|value|
+-+-+
+-+-+

---
Batch: 8
---
+-+-+
|timestamp|value|
+-+-+
+-+-+

---
Batch: 9
---
+-+-+
|timestamp|value|
+-+-+
+-+-+

---
Batch: 10
---
+-+-+
|timestamp|value|
+-+-+
+-+-+

---
Batch: 11
---
++-+
| timestamp|value|
++-+
|2018-04-22 17:08:...| 0|
|2018-04-22 17:08:...| 1|
|2018-04-22 17:08:...| 2|
|2018-04-22 17:08:...| 3|
|2018-04-22 17:08:...| 4|
++-+

---
Batch: 12
---
++-+
| timestamp|value|
++-+
|2018-04-22 17:08:...| 5|
|2018-04-22 17:08:...| 6|
|2018-04-22 17:08:...| 7|
|2018-04-22 17:08:...| 8|
|2018-04-22 17:08:...| 9|
++-+

{code}
 

This scenario shows rowsPerSecond == rampUpTime, which also fails:
{code:java}
val stream = spark.readStream
.format("rate")
.option("rowsPerSecond", 10)
.option("rampUpTime", 10)
.load()

val query = stream.writeStream.format("console").start()

// Exiting paste mode, now interpreting.

stream: org.apache.spark.sql.DataFrame = [timestamp: timestamp, value: bigint]
query: org.apache.spark.sql.streaming.StreamingQuery = 
org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@149ef64a

scala> ---
Batch: 0
---
+-+-+
|timestamp|value|
+-+-+
+-+-+

---
Batch: 1
---
+-+-+
|timestamp|value|
+-+-+
+-+-+

---
Batch: 2
---
+-+-+

[jira] [Commented] (SPARK-21710) ConsoleSink causes OOM crashes with large inputs.

2017-08-11 Thread Gerard Maas (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16123485#comment-16123485
 ] 

Gerard Maas commented on SPARK-21710:
-

PR: https://github.com/apache/spark/pull/18923

> ConsoleSink causes OOM crashes with large inputs.
> -
>
> Key: SPARK-21710
> URL: https://issues.apache.org/jira/browse/SPARK-21710
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.2.0
> Environment: affects all environments
>Reporter: Gerard Maas
>  Labels: easyfix
>
> ConsoleSink does a full collect of the streaming dataset in order to show a few 
> lines on screen. This is problematic with large inputs, like a Kafka backlog 
> or a file source with files larger than the driver's memory.
> Here's an example:
> {code:java}
> import spark.implicits._
> import org.apache.spark.sql.functions
> import org.apache.spark.sql.types.StructType
> import org.apache.spark.sql.types._
> val schema = StructType(StructField("text", StringType, true) :: Nil)
> val lines = spark
>   .readStream
>   .format("text")
>   .option("path", "/tmp/data")
>   .schema(schema)
>   .load()
> val base = lines.writeStream
>   .outputMode("append")
>   .format("console")
>   .start()
> {code}
> When a file larger than the available driver memory is fed through this 
> streaming job, we get:
> {code:java}
> ---
> Batch: 0
> ---
> [Stage 0:>(0 + 8) / 
> 111]17/08/11 15:10:45 ERROR Executor: Exception in task 6.0 in stage 0.0 (TID 
> 6)
> java.lang.OutOfMemoryError: Java heap space
>   at java.util.Arrays.copyOf(Arrays.java:3236)
>   at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
>   at 
> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>   at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
>   at 
> net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:205)
>   at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:158)
>   at java.io.DataOutputStream.write(DataOutputStream.java:107)
>   at 
> org.apache.spark.sql.catalyst.expressions.UnsafeRow.writeToStream(UnsafeRow.java:554)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:237)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>   at org.apache.spark.scheduler.Task.run(Task.scala:108)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:748)
> 17/08/11 15:10:45 ERROR SparkUncaughtExceptionHandler: Uncaught exception in 
> thread Thread[Executor task launch worker for task 6,5,main]
> java.lang.OutOfMemoryError: Java heap space
> {code}
> This issue can be traced back to a `collect` on the source `DataFrame`:
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala#L52
> A fairly simple solution would be to do a `take(numRows)` instead of the 
> collect. (PR in progress)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-21710) ConsoleSink causes OOM crashes with large inputs.

2017-08-11 Thread Gerard Maas (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21710?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gerard Maas updated SPARK-21710:

Description: 
ConsoleSink does a full collect of the streaming dataset in order to show a few 
lines on screen. This is problematic with large inputs, like a Kafka backlog or 
a file source with files larger than the driver's memory.

Here's an example:

{code:java}
import spark.implicits._
import org.apache.spark.sql.functions
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types._

val schema = StructType(StructField("text", StringType, true) :: Nil)

val lines = spark
  .readStream
  .format("text")
  .option("path", "/tmp/data")
  .schema(schema)
  .load()

val base = lines.writeStream
  .outputMode("append")
  .format("console")
  .start()
{code}

When a file larger than the available driver memory is fed through this 
streaming job, we get:

{code:java}
---
Batch: 0
---

[Stage 0:>(0 + 8) / 
111]17/08/11 15:10:45 ERROR Executor: Exception in task 6.0 in stage 0.0 (TID 6)
java.lang.OutOfMemoryError: Java heap space
  at java.util.Arrays.copyOf(Arrays.java:3236)
  at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
  at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
  at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
  at 
net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:205)
  at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:158)
  at java.io.DataOutputStream.write(DataOutputStream.java:107)
  at 
org.apache.spark.sql.catalyst.expressions.UnsafeRow.writeToStream(UnsafeRow.java:554)
  at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:237)
  at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
  at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
  at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:108)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
  at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:748)
17/08/11 15:10:45 ERROR SparkUncaughtExceptionHandler: Uncaught exception in 
thread Thread[Executor task launch worker for task 6,5,main]
java.lang.OutOfMemoryError: Java heap space
{code}

This issue can be traced back to a `collect` on the source `DataFrame`:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala#L52

A fairly simple solution would be to do a `take(numRows)` instead of the 
collect. (PR in progress)

  was:
ConsoleSink does a full collect of the streaming dataset in order to show few 
lines on screen. This is problematic with large inputs, like a kafka backlog or 
a file source with files larger than the driver's memory.

Here's an example:

{code:scala}
import spark.implicits._
import org.apache.spark.sql.functions
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types._

val schema = StructType(StructField("text", StringType, true) :: Nil)

val lines = spark
  .readStream
  .format("text")
  .option("path", "/tmp/data")
  .schema(schema)
  .load()

val base = lines.writeStream
  .outputMode("append")
  .format("console")
  .start()
{code}

When a large file larger than the available driver memory is fed through this 
streaming job, we get:

{code:java}
---
Batch: 0
---

[Stage 0:>(0 + 8) / 
111]17/08/11 15:10:45 ERROR Executor: Exception in task 6.0 in stage 0.0 (TID 6)
java.lang.OutOfMemoryError: Java heap space
  at java.util.Arrays.copyOf(Arrays.java:3236)
  at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
  at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
  at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
  at 
net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:205)
  at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:158)
  at java.io.DataOutputStream.write(DataOutputStream.java:107)
  at 

[jira] [Created] (SPARK-21710) ConsoleSink causes OOM crashes with large inputs.

2017-08-11 Thread Gerard Maas (JIRA)
Gerard Maas created SPARK-21710:
---

 Summary: ConsoleSink causes OOM crashes with large inputs.
 Key: SPARK-21710
 URL: https://issues.apache.org/jira/browse/SPARK-21710
 Project: Spark
  Issue Type: Bug
  Components: Structured Streaming
Affects Versions: 2.2.0
 Environment: affects all environments
Reporter: Gerard Maas


ConsoleSink does a full collect of the streaming dataset in order to show a few 
lines on screen. This is problematic with large inputs, like a Kafka backlog or 
a file source with files larger than the driver's memory.

Here's an example:

{code:scala}
import spark.implicits._
import org.apache.spark.sql.functions
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types._

val schema = StructType(StructField("text", StringType, true) :: Nil)

val lines = spark
  .readStream
  .format("text")
  .option("path", "/tmp/data")
  .schema(schema)
  .load()

val base = lines.writeStream
  .outputMode("append")
  .format("console")
  .start()
{code}

When a file larger than the available driver memory is fed through this 
streaming job, we get:

{code:java}
---
Batch: 0
---

[Stage 0:>(0 + 8) / 
111]17/08/11 15:10:45 ERROR Executor: Exception in task 6.0 in stage 0.0 (TID 6)
java.lang.OutOfMemoryError: Java heap space
  at java.util.Arrays.copyOf(Arrays.java:3236)
  at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
  at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
  at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
  at 
net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:205)
  at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:158)
  at java.io.DataOutputStream.write(DataOutputStream.java:107)
  at 
org.apache.spark.sql.catalyst.expressions.UnsafeRow.writeToStream(UnsafeRow.java:554)
  at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:237)
  at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
  at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
  at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:108)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
  at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:748)
17/08/11 15:10:45 ERROR SparkUncaughtExceptionHandler: Uncaught exception in 
thread Thread[Executor task launch worker for task 6,5,main]
java.lang.OutOfMemoryError: Java heap space
{code}

This issue can be traced back to a `collect` on the source `DataFrame`:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala#L52

A fairly simple solution would be to do a `take(numRows)` instead of the 
collect. (PR in progress)
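
As a rough illustration of that direction (a sketch only, not the actual PR), 
the sink would bring only the rows it is going to print to the driver. The 
names `data` and `numRows` below are assumptions standing in for the batch 
DataFrame handed to the sink and its configured row limit:

{code:java}
import org.apache.spark.sql.DataFrame

// Sketch only: `data` is the batch DataFrame handed to the sink and
// `numRows` its configured row limit; both names are illustrative.
def printBatch(data: DataFrame, numRows: Int): Unit = {
  // collect() materializes every row of the batch in driver memory,
  // while take(numRows) transfers only the rows that will be printed.
  data.take(numRows).foreach(row => println(row))
}
{code}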



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3958) Possible stream-corruption issues in TorrentBroadcast

2016-01-25 Thread Gerard Maas (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3958?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15114906#comment-15114906
 ] 

Gerard Maas commented on SPARK-3958:


[~pwendell] [~joshrosen] We just hit this bug in one of our production jobs 
using Spark Streaming 1.4.1. Each task spawned by the streaming job fails down 
the road.
This job has been working fine for months, so I'm not sure whether we can 
narrow down the conditions to reproduce it.

Here's the exception:

{code}
[Stage 16049:(0 + 0) / 24][Stage 16056:(0 + 0) / 24][Stage 16058:(0 + 0) / 
24]Exception in thread "main" org.apache.spark.SparkException: Job aborted due 
to stage failure: Task 23 in stage 17478.0 failed 6 times, most recent failure: 
Lost task 23.5 in stage 17478.0 (TID 172352, dnode-6.hdfs.private): 
java.io.IOException: PARSING_ERROR(2)
at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
at org.xerial.snappy.SnappyNative.uncompressedLength(Native Method)
at org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:594)
at 
org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:358)
at org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:387)
at 
java.io.ObjectInputStream$PeekInputStream.peek(ObjectInputStream.java:2296)
at 
java.io.ObjectInputStream$BlockDataInputStream.peek(ObjectInputStream.java:2589)
at 
java.io.ObjectInputStream$BlockDataInputStream.peekByte(ObjectInputStream.java:2599)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1319)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at 
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69)
at 
org.apache.spark.serializer.DeserializationStream.readKey(Serializer.scala:169)
at 
org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:200)
at 
org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:197)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at 
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:127)
at 
org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:91)
at 
org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:44)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at scala.Option.foreach(Option.scala:236)
at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
at 

[jira] [Created] (SPARK-8009) [Mesos] Allow provisioning of executor logging configuration

2015-06-01 Thread Gerard Maas (JIRA)
Gerard Maas created SPARK-8009:
--

 Summary: [Mesos] Allow provisioning of executor logging 
configuration 
 Key: SPARK-8009
 URL: https://issues.apache.org/jira/browse/SPARK-8009
 Project: Spark
  Issue Type: Improvement
  Components: Mesos
Affects Versions: 1.3.1
 Environment: Mesos executor
Reporter: Gerard Maas


It's currently not possible to provide a custom logging configuration for the 
Mesos executors. 
Upon startup, the executor JVM loads a default config file from the Spark 
assembly, as shown by this line in stderr: 

 Using Spark's default log4j profile: 
 org/apache/spark/log4j-defaults.properties

That line comes from Logging.scala [1], where a default config is loaded if none 
is found in the classpath when the Spark Mesos executor starts up in the 
Mesos sandbox. At that point in time, none of the application-specific 
resources have been shipped yet, as the executor JVM is just starting up.

To load a custom configuration file, it must already be in the sandbox 
before the executor JVM starts, and it must be added to the classpath in the 
startup command.

For the classpath customization, it looks like it should be possible to pass a 
-Dlog4j.configuration property by using 'spark.executor.extraClassPath', which 
is picked up at [2] and added to the command that starts the executor JVM, but 
the resource must already be on the host before we can do that. Therefore we 
need some means of 'shipping' the log4j.configuration file to the allocated 
executor.

This all boils down to the need to ship extra files to the sandbox.

There's a workaround: open up the Spark assembly, replace the 
log4j-default.properties, and pack it up again. That would work, although it is 
kind of rudimentary, as people may use the same assembly for many jobs. 
Accessing the log4j API programmatically would probably also work (we didn't 
try that yet).

[1] 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Logging.scala#L128
[2] 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala#L77
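
For illustration only: assuming the log4j.properties file were already present 
in the sandbox, the configuration could take roughly this shape. Both keys are 
standard Spark settings, but the sandbox path is hypothetical and the file must 
already exist on the executor host before the executor JVM starts:

{code:java}
import org.apache.spark.SparkConf

// Sketch only: the sandbox path is hypothetical; the file has to be in place
// on the executor host before the executor JVM is launched.
val conf = new SparkConf()
  .set("spark.executor.extraClassPath", "/path/to/sandbox")
  .set("spark.executor.extraJavaOptions",
    "-Dlog4j.configuration=file:/path/to/sandbox/log4j.properties")
{code}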




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-5095) Support launching multiple mesos executors in coarse grained mesos mode

2015-01-14 Thread Gerard Maas (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14276934#comment-14276934
 ] 

Gerard Maas commented on SPARK-5095:


Great. I left you some comments on the docs. I'll try to give it a spin. 
Probably on Friday.

 Support launching multiple mesos executors in coarse grained mesos mode
 ---

 Key: SPARK-5095
 URL: https://issues.apache.org/jira/browse/SPARK-5095
 Project: Spark
  Issue Type: Improvement
  Components: Mesos
Reporter: Timothy Chen

 Currently, in coarse-grained Mesos mode, it's expected that we only launch one 
 Mesos executor that launches one JVM process to launch multiple Spark 
 executors.
 However, this becomes a problem when the JVM process launched is larger than 
 an ideal size (30 GB is the recommended value from Databricks), which causes the 
 GC problems reported on the mailing list.
 We should support launching multiple executors when large enough resources 
 are available for Spark to use and these resources are still under the 
 configured limit.
 This is also applicable when users want to specify the number of executors to 
 be launched on each node.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-5095) Support launching multiple mesos executors in coarse grained mesos mode

2015-01-09 Thread Gerard Maas (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14271508#comment-14271508
 ] 

Gerard Maas commented on SPARK-5095:


If I understand correctly, setting `spark.mesos.coarse.executors.max = 1` will 
restrict allocation to a single executor per node, effectively forcing an 
allocation of spark.cores.max / spark.mesos.coarse.cores.max nodes. This indeed 
improves https://issues.apache.org/jira/browse/SPARK-4940


 Support launching multiple mesos executors in coarse grained mesos mode
 ---

 Key: SPARK-5095
 URL: https://issues.apache.org/jira/browse/SPARK-5095
 Project: Spark
  Issue Type: Improvement
  Components: Mesos
Reporter: Timothy Chen

 Currently, in coarse-grained Mesos mode, it's expected that we only launch one 
 Mesos executor that launches one JVM process to launch multiple Spark 
 executors.
 However, this becomes a problem when the JVM process launched is larger than 
 an ideal size (30 GB is the recommended value from Databricks), which causes the 
 GC problems reported on the mailing list.
 We should support launching multiple executors when large enough resources 
 are available for Spark to use and these resources are still under the 
 configured limit.
 This is also applicable when users want to specify the number of executors to 
 be launched on each node.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-4940) Support more evenly distributing cores for Mesos mode

2015-01-08 Thread Gerard Maas (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14268440#comment-14268440
 ] 

Gerard Maas edited comment on SPARK-4940 at 1/8/15 9:21 AM:


Hi Tim,

We are indeed using Coarse Grain mode. I'm not sure fine-grained mode makes 
much sense for Spark Streaming.

Here are a few examples of resource allocation, taken from several runs 
of the same job with identical configuration:
Job config:
spark.cores.max = 18
spark.mesos.coarse = true
spark.executor.memory = 4g

The job logic will start 6 Kafka receivers.

#1
--
|| Node || Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers ||
| 1 | 4 |  4GB | 3  | 2  |
| 2 | 6 |  4GB | 2  | 1  | 
| 3 | 7 | 4GB  | 3  | 2  |
| 4 | 1 | 4GB | 1 | 1 |

Total mem: 16 GB
Total CPUs: 18

Observations: 
Node #4, with only 1 CPU and 1 Kafka receiver, does not have the capacity to process 
the received data, so all received data needs to be sent to another node for 
non-local processing (not sure whether replication helps in this case; the 
blocks of data are processed on other nodes). Also, the nodes with 2 streaming 
receivers have a higher load than the node with 1 receiver.

#2
--
|| Node || Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers ||
| 1 | 7 |  4GB | 7  | 4  |
| 2 | 2 |  4GB | 2  | 2  | 

Total mem: 8 GB
Total CPUs: 9

Observations: 
This is the worst configuration of the day: totally unbalanced (4 vs 2 
receivers) and, for some reason, the job didn't get all the resources assigned 
in the configuration. The job processing time is also slower, as there are fewer 
cores to handle the data and less overall memory.

#3
--
|| Node || Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers ||
| 1 | 3 |  4GB | 3  | 2  |
| 2 | 8 |  4GB | 2  | 2  | 
| 3 | 7 | 4GB  | 3  | 2  |

Total mem: 12GB
Total CPU: 18

Observations: 
This is a fairly good configuration, with more evenly distributed receivers 
and CPUs, although there's one considerably smaller node in terms of CPU 
assignment.
 
We can observe that the current resource assignment policy results in 
less-than-ideal and, in particular, random assignments that have a strong impact 
on job execution and performance. Given that CPU allocation is per executor (and 
not per job), the total memory for the job is variable, as it can get 2 to 4 
executors assigned. It's also weird and unexpected to observe less than the max 
CPU allocation.
Here's a performance chart of the same job jumping from one config to another 
(*): 3 nodes (left of the spike) and 2 nodes (right):
 
!mesos-config-difference-3nodes-vs-2nodes.png!
(chart line: processing time in ms; load is fairly constant; higher is worse. 
Note how the job performance is degraded.)

(*) For a reason we haven't found yet, Mesos often kills the job. When 
Marathon relaunches it, this results in a different resource assignment.


was (Author: gmaas):
Hi Tim,

We are indeed using Coarse Grain mode. I'm not sure fine-grained mode makes 
much sense for Spark Streaming.

Here're few examples of resource allocation. They are taken from several runs 
of the same job with identical configuration:
Job config:
spark.cores.max = 18
spark.mesos.coarse = true
spark.executor.memory = 4g

The job logic will start 6 Kafka receivers.

#1
--
|| Node || Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers ||
| 1 | 4 |  4GB | 3  | 2  |
| 2 | 6 |  4GB | 2  | 1  | 
| 3 | 7 | 4GB  | 3  | 2  |
| 4 | 1 | 4GB | 1 | 1 |

Total mem: 16 GB
Total CPUs: 18

Observations: 
Node#4 with only 1 CPU and 1 Kafka receiver does not have capacity to process 
the received data, so all data received needs to be sent to other node for 
non-local processing  (not sure how replication helps or not in this case, the 
blocks of data are processed on other nodes). Also the nodes with 2 streaming 
receivers have higher load that the node with 1 receiver.

#2
--
|| Node || Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers ||
| 1 | 7 |  4GB | 7  | 4  |
| 2 | 2 |  4GB | 2  | 2  | 

Total mem: 8 GB
Total CPUs: 9

Observations: 
This is the worst configuration of the day. Totally unbalanced (4 vs 2 
receivers) and for some reason, the job didn't get all the resources assigned 
in the configuration. The job processing time is also slower as there're less 
cores to handle the data and less overall memory.

#3
--
|| Node || Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers ||
| 1 | 3 |  4GB | 3  | 2  |
| 2 | 8 |  4GB | 2  | 2  | 
| 3 | 7 | 4GB  | 3  | 2  |

Total mem: 12GB
Total CPU: 18

Observations: 
This is a fairly good configuration with a more evenly distributed receivers 
and CPUs although there's one  considerable smaller node in terms of CPU 
assignment.
 
We can observe that the current resource assignment policy results in less than 
ideal and in particular random assignments that have a strong impact on the job 

[jira] [Comment Edited] (SPARK-4940) Support more evenly distributing cores for Mesos mode

2015-01-08 Thread Gerard Maas (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14269038#comment-14269038
 ] 

Gerard Maas edited comment on SPARK-4940 at 1/8/15 9:28 AM:


I forgot to mention that, in the previous example, the ideal resource 
assignment would be:

|| Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers ||
| 3 |  4GB | 2  | 1  |
| 3 |  4GB | 2  | 1  | 
| 3 | 4GB  | 2  | 1  |
| 3 | 4GB | 2 | 1 |
| 3 | 4GB | 2 | 1 |
| 3 | 4GB | 2 | 1 |

Total resources: 18 CPUs, 24GB

This is an even distribution of CPUs and receivers over the physical nodes, 
maximizing the spread of network receivers over nodes.


was (Author: gmaas):
I forgot to mention that in the previous example, the ideal resource assignment 
would be:

|| Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers ||
| 3 |  4GB | 2  | 1  |
| 3 |  4GB | 2  | 1  | 
| 3 | 4GB  | 2  | 1  |
| 3 | 4GB | 2 | 1 |
| 3 | 4GB | 2 | 1 |
| 3 | 4GB | 2 | 1 |

Total resources: 18 CPUs, 24GB

This is an evenly distributed number CPUs and receivers over physical nodes to 
maximize the spread of network receivers over nodes.

 Support more evenly distributing cores for Mesos mode
 -

 Key: SPARK-4940
 URL: https://issues.apache.org/jira/browse/SPARK-4940
 Project: Spark
  Issue Type: Improvement
  Components: Mesos
Reporter: Timothy Chen
 Attachments: mesos-config-difference-3nodes-vs-2nodes.png


 Currently in Coarse grain mode the spark scheduler simply takes all the 
 resources it can on each node, but can cause uneven distribution based on 
 resources available on each slave.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-4940) Support more evenly distributing cores for Mesos mode

2015-01-08 Thread Gerard Maas (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14269038#comment-14269038
 ] 

Gerard Maas commented on SPARK-4940:


I forgot to mention that in the previous example, the ideal resource assignment 
would be:

|| Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers ||
| 3 |  4GB | 2  | 1  |
| 3 |  4GB | 2  | 1  | 
| 3 | 4GB  | 2  | 1  |
| 3 | 4GB | 2 | 1 |
| 3 | 4GB | 2 | 1 |
| 3 | 4GB | 2 | 1 |

Total resources: 18 CPUs, 24GB

This is an even distribution of CPUs and receivers over the physical nodes, 
maximizing the spread of network receivers over nodes.

 Support more evenly distributing cores for Mesos mode
 -

 Key: SPARK-4940
 URL: https://issues.apache.org/jira/browse/SPARK-4940
 Project: Spark
  Issue Type: Improvement
  Components: Mesos
Reporter: Timothy Chen
 Attachments: mesos-config-difference-3nodes-vs-2nodes.png


 Currently in Coarse grain mode the spark scheduler simply takes all the 
 resources it can on each node, but can cause uneven distribution based on 
 resources available on each slave.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-4940) Support more evenly distributing cores for Mesos mode

2015-01-08 Thread Gerard Maas (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-4940?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gerard Maas updated SPARK-4940:
---
Attachment: mesos-config-difference-3nodes-vs-2nodes.png

Difference of job performance due to changing Mesos resource allocation.

 Support more evenly distributing cores for Mesos mode
 -

 Key: SPARK-4940
 URL: https://issues.apache.org/jira/browse/SPARK-4940
 Project: Spark
  Issue Type: Improvement
  Components: Mesos
Reporter: Timothy Chen
 Attachments: mesos-config-difference-3nodes-vs-2nodes.png


 Currently in Coarse grain mode the spark scheduler simply takes all the 
 resources it can on each node, but can cause uneven distribution based on 
 resources available on each slave.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-4940) Support more evenly distributing cores for Mesos mode

2015-01-07 Thread Gerard Maas (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14268440#comment-14268440
 ] 

Gerard Maas commented on SPARK-4940:


Hi Tim,

We are indeed using Coarse Grain mode. I'm not sure fine-grained mode makes 
much sense for Spark Streaming.

Here are a few examples of resource allocation, taken from several runs 
of the same job with identical configuration:
Job config:
spark.cores.max = 18
spark.mesos.coarse = true
spark.executor.memory = 4g

The job logic will start 6 Kafka receivers.

#1
--
|| Node || Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers ||
| 1 | 4 |  4GB | 3  | 2  |
| 2 | 6 |  4GB | 2  | 1  | 
| 3 | 7 | 4GB  | 3  | 2  |
| 4 | 1 | 4GB | 1 | 1 |

Total mem: 16 GB
Total CPUs: 18

Observations: 
Node #4, with only 1 CPU and 1 Kafka receiver, does not have the capacity to process 
the received data, so all received data needs to be sent to another node for 
non-local processing (not sure whether replication helps in this case; the 
blocks of data are processed on other nodes). Also, the nodes with 2 streaming 
receivers have a higher load than the node with 1 receiver.

#2
--
|| Node || Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers ||
| 1 | 7 |  4GB | 7  | 4  |
| 2 | 2 |  4GB | 2  | 2  | 

Total mem: 8 GB
Total CPUs: 9

Observations: 
This is the worst configuration of the day: totally unbalanced (4 vs 2 
receivers) and, for some reason, the job didn't get all the resources assigned 
in the configuration. The job processing time is also slower, as there are fewer 
cores to handle the data and less overall memory.

#3
--
|| Node || Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers ||
| 1 | 3 |  4GB | 3  | 2  |
| 2 | 8 |  4GB | 2  | 2  | 
| 3 | 7 | 4GB  | 3  | 2  |

Total mem: 12GB
Total CPU: 18

Observations: 
This is a fairly good configuration, with more evenly distributed receivers 
and CPUs, although there's one considerably smaller node in terms of CPU 
assignment.
 
We can observe that the current resource assignment policy results in 
less-than-ideal and, in particular, random assignments that have a strong impact 
on job execution and performance. Given that CPU allocation is per executor (and 
not per job), the total memory for the job is variable, as it can get 2 to 4 
executors assigned. It's also weird and unexpected to observe less than the max 
CPU allocation.
Here's a performance chart of the same job across two configurations, one with 
3 (left) nodes and one with 2 (right): 
!https://lh3.googleusercontent.com/Z1C71OKoQzGA13uNJ8Yvf_xz_glRUqU_IGGvLsfkPvUPK2lahrEatweiWl-PDDfysjXtbs1Sl_k=w1682-h689!
(chart line: processing time in ms, load is fairly constant)

 Support more evenly distributing cores for Mesos mode
 -

 Key: SPARK-4940
 URL: https://issues.apache.org/jira/browse/SPARK-4940
 Project: Spark
  Issue Type: Improvement
  Components: Mesos
Reporter: Timothy Chen

 Currently in Coarse grain mode the spark scheduler simply takes all the 
 resources it can on each node, but can cause uneven distribution based on 
 resources available on each slave.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-4940) Support more evenly distributing cores for Mesos mode

2015-01-07 Thread Gerard Maas (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14268440#comment-14268440
 ] 

Gerard Maas edited comment on SPARK-4940 at 1/7/15 10:54 PM:
-

Hi Tim,

We are indeed using Coarse Grain mode. I'm not sure fine-grained mode makes 
much sense for Spark Streaming.

Here're few examples of resource allocation. They are taken from several runs 
of the same job with identical configuration:
Job config:
spark.cores.max = 18
spark.mesos.coarse = true
spark.executor.memory = 4g

The job logic will start 6 Kafka receivers.

#1
--
|| Node || Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers ||
| 1 | 4 |  4GB | 3  | 2  |
| 2 | 6 |  4GB | 2  | 1  | 
| 3 | 7 | 4GB  | 3  | 2  |
| 4 | 1 | 4GB | 1 | 1 |

Total mem: 16 GB
Total CPUs: 18

Observations: 
Node#4 with only 1 CPU and 1 Kafka receiver does not have capacity to process 
the received data, so all data received needs to be sent to other node for 
non-local processing  (not sure how replication helps or not in this case, the 
blocks of data are processed on other nodes). Also the nodes with 2 streaming 
receivers have higher load that the node with 1 receiver.

#2
--
|| Node || Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers ||
| 1 | 7 |  4GB | 7  | 4  |
| 2 | 2 |  4GB | 2  | 2  | 

Total mem: 8 GB
Total CPUs: 9

Observations: 
This is the worst configuration of the day. Totally unbalanced (4 vs 2 
receivers) and for some reason, the job didn't get all the resources assigned 
in the configuration. The job processing time is also slower as there're less 
cores to handle the data and less overall memory.

#3
--
|| Node || Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers ||
| 1 | 3 |  4GB | 3  | 2  |
| 2 | 8 |  4GB | 2  | 2  | 
| 3 | 7 | 4GB  | 3  | 2  |

Total mem: 12GB
Total CPU: 18

Observations: 
This is a fairly good configuration with a more evenly distributed receivers 
and CPUs although there's one  considerable smaller node in terms of CPU 
assignment.
 
We can observe that the current resource assignment policy results in less than 
ideal and in particular random assignments that have a strong impact on the job 
execution and performance. Given that CPU allocation is by executor (and not by 
job), makes total memory for the job variable as it can get 2 to 4 executors 
assigned. It's also weird and unexpected to observe less than max CPU 
allocations.
Here's a performance chart of the same job jumping from one config to another 
(*):  3 nodes (left of the spike)  and  2 nodes (right):
 
!https://lh3.googleusercontent.com/Z1C71OKoQzGA13uNJ8Yvf_xz_glRUqU_IGGvLsfkPvUPK2lahrEatweiWl-PDDfysjXtbs1Sl_k=w1682-h689!
(chart line: processing time in ms, load is fairly constant, higher is worst. 
Note how the job performance is degraded)

(*) for some reason we didn't find yet, Mesos often kills the job. When 
Marathon relaunches it, it results in a different resource assignment.


was (Author: gmaas):
Hi Tim,

We are indeed using Coarse Grain mode. I'm not sure fine-grained mode makes 
much sense for Spark Streaming.

Here're few examples of resource allocation. They are taken from several runs 
of the same job with identical configuration:
Job config:
spark.cores.max = 18
spark.mesos.coarse = true
spark.executor.memory = 4g

The job logic will start 6 Kafka receivers.

#1
--
|| Node || Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers ||
| 1 | 4 |  4GB | 3  | 2  |
| 2 | 6 |  4GB | 2  | 1  | 
| 3 | 7 | 4GB  | 3  | 2  |
| 4 | 1 | 4GB | 1 | 1 |

Total mem: 16 GB
Total CPUs: 18

Observations: 
Node#4 with only 1 CPU and 1 Kafka receiver does not have capacity to process 
the received data, so all data received needs to be sent to other node for 
non-local processing  (not sure how replication helps or not in this case, the 
blocks of data are processed on other nodes). Also the nodes with 2 streaming 
receivers have higher load that the node with 1 receiver.

#2
--
|| Node || Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers ||
| 1 | 7 |  4GB | 7  | 4  |
| 2 | 2 |  4GB | 2  | 2  | 

Total mem: 8 GB
Total CPUs: 9

Observations: 
This is the worst configuration of the day. Totally unbalanced (4 vs 2 
receivers) and for some reason, the job didn't get all the resources assigned 
in the configuration. The job processing time is also slower as there're less 
cores to handle the data and less overall memory.

#3
--
|| Node || Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers ||
| 1 | 3 |  4GB | 3  | 2  |
| 2 | 8 |  4GB | 2  | 2  | 
| 3 | 7 | 4GB  | 3  | 2  |

Total mem: 12GB
Total CPU: 18

Observations: 
This is a fairly good configuration with a more evenly distributed receivers 
and CPUs although there's one  considerable smaller node in terms of CPU 
assignment.
 
We can observe that the current resource assignment policy results in less than 
ideal 

[jira] [Comment Edited] (SPARK-4940) Support more evenly distributing cores for Mesos mode

2015-01-07 Thread Gerard Maas (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14268440#comment-14268440
 ] 

Gerard Maas edited comment on SPARK-4940 at 1/7/15 10:53 PM:
-

Hi Tim,

We are indeed using Coarse Grain mode. I'm not sure fine-grained mode makes 
much sense for Spark Streaming.

Here're few examples of resource allocation. They are taken from several runs 
of the same job with identical configuration:
Job config:
spark.cores.max = 18
spark.mesos.coarse = true
spark.executor.memory = 4g

The job logic will start 6 Kafka receivers.

#1
--
|| Node || Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers ||
| 1 | 4 |  4GB | 3  | 2  |
| 2 | 6 |  4GB | 2  | 1  | 
| 3 | 7 | 4GB  | 3  | 2  |
| 4 | 1 | 4GB | 1 | 1 |

Total mem: 16 GB
Total CPUs: 18

Observations: 
Node#4 with only 1 CPU and 1 Kafka receiver does not have capacity to process 
the received data, so all data received needs to be sent to other node for 
non-local processing  (not sure how replication helps or not in this case, the 
blocks of data are processed on other nodes). Also the nodes with 2 streaming 
receivers have higher load that the node with 1 receiver.

#2
--
|| Node || Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers ||
| 1 | 7 |  4GB | 7  | 4  |
| 2 | 2 |  4GB | 2  | 2  | 

Total mem: 8 GB
Total CPUs: 9

Observations: 
This is the worst configuration of the day. Totally unbalanced (4 vs 2 
receivers) and for some reason, the job didn't get all the resources assigned 
in the configuration. The job processing time is also slower as there're less 
cores to handle the data and less overall memory.

#3
--
|| Node || Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers ||
| 1 | 3 |  4GB | 3  | 2  |
| 2 | 8 |  4GB | 2  | 2  | 
| 3 | 7 | 4GB  | 3  | 2  |

Total mem: 12GB
Total CPU: 18

Observations: 
This is a fairly good configuration with a more evenly distributed receivers 
and CPUs although there's one  considerable smaller node in terms of CPU 
assignment.
 
We can observe that the current resource assignment policy results in less than 
ideal and in particular random assignments that have a strong impact on the job 
execution and performance. Given that CPU allocation is by executor (and not by 
job), makes total memory for the job variable as it can get 2 to 4 executors 
assigned. It's also weird and unexpected to observe less than max CPU 
allocations.
Here's a performance chart of the same job jumping from one config to another 
(*), one with 3 (left) nodes and one with 2 (right): 
!https://lh3.googleusercontent.com/Z1C71OKoQzGA13uNJ8Yvf_xz_glRUqU_IGGvLsfkPvUPK2lahrEatweiWl-PDDfysjXtbs1Sl_k=w1682-h689!
(chart line: processing time in ms, load is fairly constant)

(*) for some reason we didn't find yet, Mesos often kills the job. When 
Marathon relaunches it, it results in a different resource assignment.


was (Author: gmaas):
Hi Tim,

We are indeed using Coarse Grain mode. I'm not sure fine-grained mode makes 
much sense for Spark Streaming.

Here're few examples of resource allocation. They are taken from several runs 
of the same job with identical configuration:
Job config:
spark.cores.max = 18
spark.mesos.coarse = true
spark.executor.memory = 4g

The job logic will start 6 Kafka receivers.

#1
--
|| Node || Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers ||
| 1 | 4 |  4GB | 3  | 2  |
| 2 | 6 |  4GB | 2  | 1  | 
| 3 | 7 | 4GB  | 3  | 2  |
| 4 | 1 | 4GB | 1 | 1 |

Total mem: 16 GB
Total CPUs: 18

Observations: 
Node#4 with only 1 CPU and 1 Kafka receiver does not have capacity to process 
the received data, so all data received needs to be sent to other node for 
non-local processing  (not sure how replication helps or not in this case, the 
blocks of data are processed on other nodes). Also the nodes with 2 streaming 
receivers have higher load that the node with 1 receiver.

#2
--
|| Node || Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers ||
| 1 | 7 |  4GB | 7  | 4  |
| 2 | 2 |  4GB | 2  | 2  | 

Total mem: 8 GB
Total CPUs: 9

Observations: 
This is the worst configuration of the day. Totally unbalanced (4 vs 2 
receivers) and for some reason, the job didn't get all the resources assigned 
in the configuration. The job processing time is also slower as there're less 
cores to handle the data and less overall memory.

#3
--
|| Node || Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers ||
| 1 | 3 |  4GB | 3  | 2  |
| 2 | 8 |  4GB | 2  | 2  | 
| 3 | 7 | 4GB  | 3  | 2  |

Total mem: 12GB
Total CPU: 18

Observations: 
This is a fairly good configuration with a more evenly distributed receivers 
and CPUs although there's one  considerable smaller node in terms of CPU 
assignment.
 
We can observe that the current resource assignment policy results in less than 
ideal and in particular random assignments that have a strong impact on 

[jira] [Comment Edited] (SPARK-4940) Support more evenly distributing cores for Mesos mode

2015-01-07 Thread Gerard Maas (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14268440#comment-14268440
 ] 

Gerard Maas edited comment on SPARK-4940 at 1/7/15 10:52 PM:
-

Hi Tim,

We are indeed using Coarse Grain mode. I'm not sure fine-grained mode makes 
much sense for Spark Streaming.

Here're few examples of resource allocation. They are taken from several runs 
of the same job with identical configuration:
Job config:
spark.cores.max = 18
spark.mesos.coarse = true
spark.executor.memory = 4g

The job logic will start 6 Kafka receivers.

#1
--
|| Node || Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers ||
| 1 | 4 |  4GB | 3  | 2  |
| 2 | 6 |  4GB | 2  | 1  | 
| 3 | 7 | 4GB  | 3  | 2  |
| 4 | 1 | 4GB | 1 | 1 |

Total mem: 16 GB
Total CPUs: 18

Observations: 
Node#4 with only 1 CPU and 1 Kafka receiver does not have capacity to process 
the received data, so all data received needs to be sent to other node for 
non-local processing  (not sure how replication helps or not in this case, the 
blocks of data are processed on other nodes). Also the nodes with 2 streaming 
receivers have higher load that the node with 1 receiver.

#2
--
|| Node || Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers ||
| 1 | 7 |  4GB | 7  | 4  |
| 2 | 2 |  4GB | 2  | 2  | 

Total mem: 8 GB
Total CPUs: 9

Observations: 
This is the worst configuration of the day. Totally unbalanced (4 vs 2 
receivers) and for some reason, the job didn't get all the resources assigned 
in the configuration. The job processing time is also slower as there're less 
cores to handle the data and less overall memory.

#3
--
|| Node || Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers ||
| 1 | 3 |  4GB | 3  | 2  |
| 2 | 8 |  4GB | 2  | 2  | 
| 3 | 7 | 4GB  | 3  | 2  |

Total mem: 12GB
Total CPU: 18

Observations: 
This is a fairly good configuration with a more evenly distributed receivers 
and CPUs although there's one  considerable smaller node in terms of CPU 
assignment.
 
We can observe that the current resource assignment policy results in less than 
ideal and in particular random assignments that have a strong impact on the job 
execution and performance. Given that CPU allocation is by executor (and not by 
job), makes total memory for the job variable as it can get 2 to 4 executors 
assigned. It's also weird and unexpected to observe less than max CPU 
allocations.
Here's a performance chart of the same job jumping from one config to another 
(*), one with 3 (left) nodes and one with 2 (right): 
!https://lh3.googleusercontent.com/Z1C71OKoQzGA13uNJ8Yvf_xz_glRUqU_IGGvLsfkPvUPK2lahrEatweiWl-PDDfysjXtbs1Sl_k=w1682-h689!
(chart line: processing time in ms, load is fairly constant)

(*) for some reason we didn't find yet, Mesos often kills the job. When 
Marathon relaunches it, it results in a different resource assignment.


[jira] [Commented] (SPARK-4940) Document or Support more evenly distributing cores for Mesos mode

2015-01-05 Thread Gerard Maas (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14264579#comment-14264579
 ] 

Gerard Maas commented on SPARK-4940:


From the perspective of evenly allocating Spark Streaming consumers 
(network-bound), the ideal solution would be to explicitly set the number of 
hosts.

With the current resource allocation policy, we can get e.g. (4),(1),(1) 
consumers over 3 hosts instead of the ideal (2),(2),(2). Given that resource 
allocation is dynamic at job startup time, this results in variable performance 
characteristics for the job being submitted. In practice, we have been 
restarting the job (using Marathon) until we get a favorable resource 
allocation.

I'm not sure how well a requirement for a fixed number of executors would fit 
with the node transparency offered by Mesos; I'm just trying to elaborate on 
the requirements from the Spark Streaming job perspective.
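For illustration only (a sketch, not something we rely on today, with ssc being 
the StreamingContext): Spark's Receiver API already exposes a preferredLocation 
hint, so a custom receiver could pin itself to a host and spread the consumers 
over a known host list. Placement is best-effort and it gives up part of the 
transparency Mesos offers, but it shows the kind of control we're after:
{code:scala}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Illustrative custom receiver pinned (best-effort) to a given host.
// HostPinnedReceiver and the host names below are made-up for this sketch.
class HostPinnedReceiver(host: String)
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  override def preferredLocation: Option[String] = Some(host)

  def onStart(): Unit = {
    // start the actual consumer thread here and push records with store(...)
  }
  def onStop(): Unit = { /* stop the consumer */ }
}

// Spread 6 receivers over 3 known hosts: (2),(2),(2) instead of a random split.
val hosts = Seq("host-1", "host-2", "host-3")
val receivers = (0 until 6).map(i => ssc.receiverStream(new HostPinnedReceiver(hosts(i % 3))))
val unified = ssc.union(receivers)
{code}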

 Document or Support more evenly distributing cores for Mesos mode
 -

 Key: SPARK-4940
 URL: https://issues.apache.org/jira/browse/SPARK-4940
 Project: Spark
  Issue Type: Improvement
  Components: Mesos
Reporter: Timothy Chen

 Currently in coarse-grained mode the Spark scheduler simply takes all the 
 resources it can on each node, which can cause an uneven distribution based on 
 the resources available on each slave.






[jira] [Commented] (SPARK-4537) Add 'processing delay' and 'totalDelay' to the metrics reported by the Spark Streaming subsystem

2014-11-26 Thread Gerard Maas (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14225945#comment-14225945
 ] 

Gerard Maas commented on SPARK-4537:


I was waiting for internal clearance to put a PR out, but this is probably much 
faster. Thanks.

 Add 'processing delay' and 'totalDelay' to the metrics reported by the Spark 
 Streaming subsystem
 

 Key: SPARK-4537
 URL: https://issues.apache.org/jira/browse/SPARK-4537
 Project: Spark
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 1.1.0
Reporter: Gerard Maas
  Labels: metrics

 As the Spark Streaming tuning guide indicates, the key indicators of a 
 healthy streaming job are:
 - Processing Time
 - Total Delay
 The Spark UI page for the Streaming job [1] shows these two indicators but 
 the metrics source for Spark Streaming (StreamingSource.scala)  [2] does not.
 Adding these metrics will allow external monitoring systems that consume the 
 Spark metrics interface to track these two critical pieces of information on 
 a streaming job's performance.
 [1] 
 https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala#L127
 [2] 
 https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
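For reference, the kind of change meant here would look roughly like this (a 
sketch only; the gauge names and the exact wiring into StreamingSource are 
assumptions, not the actual patch). The Spark metrics system consumes Codahale 
gauges, so the two indicators just need to be registered against the metric 
registry and backed by the streaming listener's last completed batch:
{code:scala}
import com.codahale.metrics.{Gauge, MetricRegistry}

// Sketch: expose the two batch-level indicators as Codahale gauges.
// The two functions would be backed by the listener's last completed batch
// (processing delay / total delay), returning e.g. -1 when no batch has completed yet.
def registerStreamingDelayGauges(registry: MetricRegistry,
                                 processingDelayMs: () => Long,
                                 totalDelayMs: () => Long): Unit = {
  registry.register(MetricRegistry.name("streaming", "lastCompletedBatch_processingDelay"),
    new Gauge[Long] { override def getValue: Long = processingDelayMs() })
  registry.register(MetricRegistry.name("streaming", "lastCompletedBatch_totalDelay"),
    new Gauge[Long] { override def getValue: Long = totalDelayMs() })
}
{code}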






[jira] [Created] (SPARK-4537) Add 'processing delay' and 'totalDelay' to the metrics reported by the Spark Streaming subsystem

2014-11-21 Thread Gerard Maas (JIRA)
Gerard Maas created SPARK-4537:
--

 Summary: Add 'processing delay' and 'totalDelay' to the metrics 
reported by the Spark Streaming subsystem
 Key: SPARK-4537
 URL: https://issues.apache.org/jira/browse/SPARK-4537
 Project: Spark
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 1.1.0
Reporter: Gerard Maas


As the Spark Streaming tuning guide indicates, the key indicators of a healthy 
streaming job are:
- Processing Time
- Total Delay

The Spark UI page for the Streaming job [1] shows these two indicators but the 
metrics source for Spark Streaming (StreamingSource.scala)  [2] does not.

Adding these metrics will allow external monitoring systems that consume the 
Spark metrics interface to track these two critical pieces of information on a 
streaming job's performance.


[1] 
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala#L127

[2] 
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala






[jira] [Created] (SPARK-2620) case class cannot be used as key for reduce

2014-07-22 Thread Gerard Maas (JIRA)
Gerard Maas created SPARK-2620:
--

 Summary: case class cannot be used as key for reduce
 Key: SPARK-2620
 URL: https://issues.apache.org/jira/browse/SPARK-2620
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.0.0
 Environment: reproduced on spark-shell local[4]
Reporter: Gerard Maas
Priority: Critical


Using a case class as a key doesn't seem to work properly on Spark 1.0.0

A minimal example:

case class P(name: String)
val ps = Array(P("alice"), P("bob"), P("charly"), P("bob"))
sc.parallelize(ps).map(x => (x, 1)).reduceByKey((x, y) => x + y).collect
[Spark shell local mode] res: Array[(P, Int)] = Array((P(bob),1), (P(bob),1), 
(P(abe),1), (P(charly),1))

This contrasts with the expected behavior, which should be equivalent to:
sc.parallelize(ps).map(x => (x.name, 1)).reduceByKey((x, y) => x + y).collect
Array[(String, Int)] = Array((charly,1), (abe,1), (bob,2))

groupByKey and distinct also present the same behavior.
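Since the report above is from spark-shell, the REPL's class wrapping may be a 
factor (just a guess). A standalone version of the same check, compiled outside 
the shell, would look like this (object and app names are illustrative):
{code:scala}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._  // pair-RDD implicits (needed on 1.0.x)

case class P(name: String)

object CaseClassKeyCheck {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("case-class-key-check").setMaster("local[4]"))
    val ps = Array(P("alice"), P("bob"), P("charly"), P("bob"))
    val counts = sc.parallelize(ps).map(x => (x, 1)).reduceByKey(_ + _).collect()
    counts.foreach(println)  // expected: bob counted twice, the others once
    sc.stop()
  }
}
{code}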





[jira] [Commented] (SPARK-2620) case class cannot be used as key for reduce

2014-07-22 Thread Gerard Maas (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14070443#comment-14070443
 ] 

Gerard Maas commented on SPARK-2620:


[~sowen] No, doesn't look like it is.

 case class cannot be used as key for reduce
 ---

 Key: SPARK-2620
 URL: https://issues.apache.org/jira/browse/SPARK-2620
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.0.0
 Environment: reproduced on spark-shell local[4]
Reporter: Gerard Maas
Priority: Critical
  Labels: case-class, core

 Using a case class as a key doesn't seem to work properly on Spark 1.0.0
 A minimal example:
 case class P(name: String)
 val ps = Array(P("alice"), P("bob"), P("charly"), P("bob"))
 sc.parallelize(ps).map(x => (x, 1)).reduceByKey((x, y) => x + y).collect
 [Spark shell local mode] res: Array[(P, Int)] = Array((P(bob),1), 
 (P(bob),1), (P(abe),1), (P(charly),1))
 This contrasts with the expected behavior, which should be equivalent to:
 sc.parallelize(ps).map(x => (x.name, 1)).reduceByKey((x, y) => x + y).collect
 Array[(String, Int)] = Array((charly,1), (abe,1), (bob,2))
 groupByKey and distinct also present the same behavior.





[jira] [Created] (SPARK-1985) SPARK_HOME shouldn't be required when spark.executor.uri is provided

2014-06-01 Thread Gerard Maas (JIRA)
Gerard Maas created SPARK-1985:
--

 Summary: SPARK_HOME shouldn't be required when spark.executor.uri 
is provided
 Key: SPARK-1985
 URL: https://issues.apache.org/jira/browse/SPARK-1985
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.0.0
 Environment: MESOS
Reporter: Gerard Maas


When trying to run that simple example [1] on a Mesos installation, I get an 
error that SPARK_HOME is not set. A local Spark installation should not be 
required to run a job on Mesos; all that's needed is the executor package, 
i.e. the assembly .tar.gz at a reachable location (HDFS/S3/HTTP).

I went looking into the code and indeed there's a check on SPARK_HOME [2] 
regardless of whether the assembly is present, even though SPARK_HOME is 
actually only used when the assembly is not provided (as a kind of best-effort 
recovery strategy).

Current flow:

if (!SPARK_HOME) fail("No SPARK_HOME")
else if (assembly) { use assembly }
else { try to use SPARK_HOME to build the spark executor }

Should be:

sparkExecutor = if (assembly) { assembly }
                else if (SPARK_HOME) { try to use SPARK_HOME to build the spark executor }
                else { fail("No executor found. Please provide spark.executor.uri (preferred) or spark.home") }
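Expressed as code, the proposed precedence would look roughly like this (a 
sketch with illustrative names, not the actual MesosSchedulerBackend code):
{code:scala}
// Prefer the executor URI, fall back to SPARK_HOME, fail only when neither is set.
sealed trait ExecutorSource
case class FromUri(uri: String) extends ExecutorSource
case class FromSparkHome(home: String) extends ExecutorSource

def resolveExecutor(executorUri: Option[String], sparkHome: Option[String]): ExecutorSource =
  (executorUri, sparkHome) match {
    case (Some(uri), _)     => FromUri(uri)
    case (None, Some(home)) => FromSparkHome(home)
    case (None, None)       => throw new IllegalArgumentException(
      "No executor found. Please provide spark.executor.uri (preferred) or spark.home")
  }
{code}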


[1] 
http://apache-spark-user-list.1001560.n3.nabble.com/ClassNotFoundException-with-Spark-Mesos-spark-shell-works-fine-td6165.html

[2] 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala#L89


