[jira] [Commented] (SPARK-28025) HDFSBackedStateStoreProvider should not leak .crc files
[ https://issues.apache.org/jira/browse/SPARK-28025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16861991#comment-16861991 ]

Gerard Maas commented on SPARK-28025:
-------------------------------------

I reproduced the issue in a spark-shell session:
{code:java}
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.3
      /_/

scala> import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming._

scala> val hadoopConf = spark.sparkContext.hadoopConfiguration

scala> import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf

scala> SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key
res1: String = spark.sql.streaming.checkpointFileManagerClass

scala> hadoopConf.get(SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key)
res2: String = null

// mount point for the shared PVC: /storage
scala> val glusterCpfm = new org.apache.hadoop.fs.Path("/storage/crc-store")
glusterCpfm: org.apache.hadoop.fs.Path = /storage/crc-store

scala> val glusterfm = CheckpointFileManager.create(glusterCpfm, hadoopConf)
glusterfm: org.apache.spark.sql.execution.streaming.CheckpointFileManager = org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager@28d00f54

scala> glusterfm.isLocal
res17: Boolean = true

scala> glusterfm.mkdirs(glusterCpfm)

scala> val atomicFile = glusterfm.createAtomic(new org.apache.hadoop.fs.Path("/storage/crc-store/file.log"), false)
atomicFile: org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream = org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream@1c6e065

scala> atomicFile.writeChars("Hello, World")

scala> atomicFile.close

/**
 * Inspect the file system
 *
 * $ cat file.log
 * Hello, World
 * $ ls -al
 * total 5
 * drwxr-sr-x. 2 jboss 2000   85 Jun 12 09:44 .
 * drwxrwsr-x. 8 root  2000 4096 Jun 12 09:42 ..
 * -rw-r--r--. 1 jboss 2000   12 Jun 12 09:44 ..file.log.c6f90863-77d2-494e-b1cc-0d0ed1344f74.tmp.crc
 * -rw-r--r--. 1 jboss 2000   24 Jun 12 09:44 file.log
 **/

// Delete the file -- simulate the operation done by HDFSBackedStateStoreProvider#cleanup
scala> glusterfm.delete(new org.apache.hadoop.fs.Path("/storage/crc-store/file.log"))

/**
 * Inspect the file system -> .crc file left behind
 * $ ls -al
 * total 9
 * drwxr-sr-x. 2 jboss 2000 4096 Jun 12 09:46 .
 * drwxrwsr-x. 8 root  2000 4096 Jun 12 09:42 ..
 * -rw-r--r--. 1 jboss 2000   12 Jun 12 09:44 ..file.log.c6f90863-77d2-494e-b1cc-0d0ed1344f74.tmp.crc
 **/
{code}

> HDFSBackedStateStoreProvider should not leak .crc files
>
>                 Key: SPARK-28025
>                 URL: https://issues.apache.org/jira/browse/SPARK-28025
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.3
>         Environment: Spark 2.4.3
> Kubernetes 1.11(?) (OpenShift)
> StateStore storage on a mounted PVC. Viewed as a local filesystem by the
> `FileContextBasedCheckpointFileManager`:
> {noformat}
> scala> glusterfm.isLocal
> res17: Boolean = true{noformat}
>            Reporter: Gerard Maas
>            Priority: Major
>
> The HDFSBackedStateStoreProvider when using the default CheckpointFileManager
> is leaving '.crc' files behind. There's a .crc file created for each
> `atomicFile` operation of the CheckpointFileManager.
> Over time, the number of files becomes very large. It makes the state store
> file system constantly increase in size and, in our case, deteriorates the
> file system performance.
> Here's a sample of one of our spark storage volumes after 2 days of execution
> (4 stateful streaming jobs, each on a different sub-dir):
> {noformat}
> # Total files in PVC (used for checkpoints and state store)
> $find . | wc -l
> 431796
> # .crc files
> $find . -name "*.crc" | wc -l
> 418053{noformat}
> With each .crc file taking one storage block, the used storage runs into the
> GBs of data.
> These jobs are running on Kubernetes. Our shared storage provider, GlusterFS,
> shows serious performance deterioration with this large number of files:
> {noformat}
> DEBUG HDFSBackedStateStoreProvider: fetchFiles() took 29164ms{noformat}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
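Until the provider stops leaving checksum files behind, the leak can only be contained with an out-of-band sweep. Below is a minimal sketch of such a sweep (not part of Spark; `OrphanCrcSweep` and its method name are hypothetical, and it assumes the local ChecksumFileSystem convention of storing the checksum of a file `name` as `.name.crc` in the same directory):

{code:java}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical maintenance sweep: remove ".crc" checksum files whose data file
// no longer exists in a state store / checkpoint directory.
object OrphanCrcSweep {
  def sweep(dir: Path, conf: Configuration): Unit = {
    val fs = FileSystem.get(dir.toUri, conf)
    fs.listStatus(dir).map(_.getPath).filter(_.getName.endsWith(".crc")).foreach { crc =>
      // The local ChecksumFileSystem stores the checksum of "name" as ".name.crc"
      val dataName = crc.getName.stripPrefix(".").stripSuffix(".crc")
      val dataFile = new Path(dir, dataName)
      if (!fs.exists(dataFile)) {
        // Orphaned checksum left behind by a delete or rename of its data file
        fs.delete(crc, false)
      }
    }
  }
}

// e.g. OrphanCrcSweep.sweep(new Path("/storage/crc-store"), spark.sparkContext.hadoopConfiguration)
{code}

In the reproduction above this would remove the leftover `..file.log.<uuid>.tmp.crc`, since the temporary file it belongs to was renamed away by the atomic write.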
[jira] [Updated] (SPARK-28025) HDFSBackedStateStoreProvider should not leak .crc files
[ https://issues.apache.org/jira/browse/SPARK-28025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gerard Maas updated SPARK-28025: Summary: HDFSBackedStateStoreProvider should not leak .crc files (was: HDFSBackedStateStoreProvider leaks .crc files ) > HDFSBackedStateStoreProvider should not leak .crc files > > > Key: SPARK-28025 > URL: https://issues.apache.org/jira/browse/SPARK-28025 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.4.3 > Environment: Spark 2.4.3 > Kubernetes 1.11(?) (OpenShift) > StateStore storage on a mounted PVC. Viewed as a local filesystem by the > `FileContextBasedCheckpointFileManager` : > {noformat} > scala> glusterfm.isLocal > res17: Boolean = true{noformat} >Reporter: Gerard Maas >Priority: Major > > The HDFSBackedStateStoreProvider when using the default CheckpointFileManager > is leaving '.crc' files behind. There's a .crc file created for each > `atomicFile` operation of the CheckpointFileManager. > Over time, the number of files becomes very large. It makes the state store > file system constantly increase in size and, in our case, deteriorates the > file system performance. > Here's a sample of one of our spark storage volumes after 2 days of execution > (4 stateful streaming jobs, each on a different sub-dir): > # > {noformat} > Total files in PVC (used for checkpoints and state store) > $find . | wc -l > 431796 > # .crc files > $find . -name "*.crc" | wc -l > 418053{noformat} > With each .crc file taking one storage block, the used storage runs into the > GBs of data. > These jobs are running on Kubernetes. Our shared storage provider, GlusterFS, > shows serious performance deterioration with this large number of files: > {noformat} > DEBUG HDFSBackedStateStoreProvider: fetchFiles() took 29164ms{noformat} > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-28025) HDFSBackedStateStoreProvider leaks .crc files
[ https://issues.apache.org/jira/browse/SPARK-28025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16861983#comment-16861983 ] Gerard Maas commented on SPARK-28025: - Same problem. A different part of the code. > HDFSBackedStateStoreProvider leaks .crc files > -- > > Key: SPARK-28025 > URL: https://issues.apache.org/jira/browse/SPARK-28025 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.4.3 > Environment: Spark 2.4.3 > Kubernetes 1.11(?) (OpenShift) > StateStore storage on a mounted PVC. Viewed as a local filesystem by the > `FileContextBasedCheckpointFileManager` : > {noformat} > scala> glusterfm.isLocal > res17: Boolean = true{noformat} >Reporter: Gerard Maas >Priority: Major > > The HDFSBackedStateStoreProvider when using the default CheckpointFileManager > is leaving '.crc' files behind. There's a .crc file created for each > `atomicFile` operation of the CheckpointFileManager. > Over time, the number of files becomes very large. It makes the state store > file system constantly increase in size and, in our case, deteriorates the > file system performance. > Here's a sample of one of our spark storage volumes after 2 days of execution > (4 stateful streaming jobs, each on a different sub-dir): > # > {noformat} > Total files in PVC (used for checkpoints and state store) > $find . | wc -l > 431796 > # .crc files > $find . -name "*.crc" | wc -l > 418053{noformat} > With each .crc file taking one storage block, the used storage runs into the > GBs of data. > These jobs are running on Kubernetes. Our shared storage provider, GlusterFS, > shows serious performance deterioration with this large number of files: > {noformat} > DEBUG HDFSBackedStateStoreProvider: fetchFiles() took 29164ms{noformat} > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-28025) HDFSBackedStateStoreProvider leaks .crc files
Gerard Maas created SPARK-28025: --- Summary: HDFSBackedStateStoreProvider leaks .crc files Key: SPARK-28025 URL: https://issues.apache.org/jira/browse/SPARK-28025 Project: Spark Issue Type: Bug Components: Structured Streaming Affects Versions: 2.4.3 Environment: Spark 2.4.3 Kubernetes 1.11(?) (OpenShift) StateStore storage on a mounted PVC. Viewed as a local filesystem by the `FileContextBasedCheckpointFileManager` : {noformat} scala> glusterfm.isLocal res17: Boolean = true{noformat} Reporter: Gerard Maas The HDFSBackedStateStoreProvider when using the default CheckpointFileManager is leaving '.crc' files behind. There's a .crc file created for each `atomicFile` operation of the CheckpointFileManager. Over time, the number of files becomes very large. It makes the state store file system constantly increase in size and, in our case, deteriorates the file system performance. Here's a sample of one of our spark storage volumes after 2 days of execution (4 stateful streaming jobs, each on a different sub-dir): # {noformat} Total files in PVC (used for checkpoints and state store) $find . | wc -l 431796 # .crc files $find . -name "*.crc" | wc -l 418053{noformat} With each .crc file taking one storage block, the used storage runs into the GBs of data. These jobs are running on Kubernetes. Our shared storage provider, GlusterFS, shows serious performance deterioration with this large number of files: {noformat} DEBUG HDFSBackedStateStoreProvider: fetchFiles() took 29164ms{noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24264) [Structured Streaming] Remove 'mergeSchema' option from Parquet source configuration
[ https://issues.apache.org/jira/browse/SPARK-24264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492046#comment-16492046 ]

Gerard Maas commented on SPARK-24264:
-------------------------------------

This scenario demonstrates the claims in this ticket:

Given PickUp and CashierOrder:

{{case class PickUp(orderId: String, waitDuration: Long)}}
{{case class CashierOrder(orderId: String, totalPrice: Float)}}

I create 2 parquet files with some random data following that schema.

When I read the two files at the same time, using the batch API, we get:

{{val batchMerged = session.read.option("mergeSchema","true").parquet("/tmp/data/batch/")}}
{{batchMerged.schema}}
{{>StructType(StructField(orderId,StringType,true), StructField(waitDuration,LongType,true), StructField(totalPrice,FloatType,true))}}

Data in the resulting dataframe is null depending on the source file:
{code:java}
>batchMerged.show
+--------+------------+----------+
| orderId|waitDuration|totalPrice|
+--------+------------+----------+
| order_1|         498|      null|
| order_2|         819|      null|
| order_3|         576|      null|
| order_4|         741|      null|
{code}
The schema is merged as expected; the resulting dataset will contain nulls when the source does not contain the field, e.g.:
{code:java}
batchMerged.where(!$"totalPrice".isNull).show
+--------+------------+----------+
| orderId|waitDuration|totalPrice|
+--------+------------+----------+
| order_1|        null|  66.91808|
| order_2|        null| 21.761215|
| order_3|        null|    1.8776|
| order_4|        null| 45.613895|
| order_5|        null| 62.664383|
| order_6|        null|  78.24584|
{code}
In streaming mode, we must provide the schema at creation time. There's no schema inference:

{{val streamingMerged = session.readStream.option("mergeSchema","true").schema(schema).parquet("/tmp/data/stream")}}

When I materialize this stream to a table, I get:
{code:java}
val query = streamingMerged.writeStream.format("memory").queryName("parquet_merged").start()
val queryResultMerged = session.sql("select * from parquet_merged")
queryResultMerged.show
+--------+------------+----------+
| orderId|waitDuration|totalPrice|
+--------+------------+----------+
| order_1|         498|      null|
| order_2|         819|      null|
| order_3|         576|      null|
| order_4|         741|      null|
| order_5|         844|      null|
{code}
I put that flag to 'off':
{code:java}
val streamingNotMerged = session.readStream.option("mergeSchema","false").schema(schema).parquet("/tmp/data/stream")
... same materialization process ...
+--------+------------+----------+
| orderId|waitDuration|totalPrice|
+--------+------------+----------+
| order_1|         498|      null|
| order_2|         819|      null|
| order_3|         576|      null|
| order_4|         741|      null|
{code}
When compared using subtraction:
{code:java}
val leftDiff = queryResultsNotMerged.except(queryResultMerged)
leftDiff.count
res58: Long = 0

val rightDiff = queryResultMerged.except(queryResultsNotMerged)
rightDiff.count
res61: Long = 0
{code}
There is no visible difference, except that Spark needs to do the schema merge when this flag is set to true, which I understood has an additional expense.

> [Structured Streaming] Remove 'mergeSchema' option from Parquet source
> configuration
>
>                 Key: SPARK-24264
>                 URL: https://issues.apache.org/jira/browse/SPARK-24264
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.3.0
>            Reporter: Gerard Maas
>            Priority: Major
>              Labels: features, usability
>
> Looking into the Parquet format support for the File source in Structured
> Streaming, the docs mention the use of the option 'mergeSchema' to merge the
> schemas of the part files found.[1]
>
> There seems to be no practical use of that configuration in a streaming
> context.
>
> In its batch counterpart, `mergeSchemas` would infer the schema superset of
> the part-files found.
> > When using the File source + parquet format in streaming mode, we must > provide a schema to the readStream.schema(...) builder and that schema is > fixed for the duration of the stream. > > My current understanding is that: > > - Files containing a subset of the fields declared in the schema will render > null values for the non-existing fields. > - For files containing a superset of the fields, the additional data fields > will be lost. > - Files not matching the schema set on the streaming source will render all > fields null for each record in the file. > > It looks like 'mergeSchema' has no practical effect, although enabling it > might lead to additional processing to actually merge the Parquet schema of > the input files. >
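For completeness, here is a minimal sketch of the setup behind the demonstration in the comment above. The paths, sample values, and the use of two separate sub-directories are assumptions (the original used randomly generated data written under a single folder):

{code:java}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

case class PickUp(orderId: String, waitDuration: Long)
case class CashierOrder(orderId: String, totalPrice: Float)

val session = SparkSession.builder().master("local[*]").appName("mergeSchema-demo").getOrCreate()
import session.implicits._

// Two Parquet datasets with different schemas, standing in for the random data
Seq(PickUp("order_1", 498L), PickUp("order_2", 819L))
  .toDS().write.mode("overwrite").parquet("/tmp/data/batch/pickups")
Seq(CashierOrder("order_1", 66.91808f), CashierOrder("order_2", 21.761215f))
  .toDS().write.mode("overwrite").parquet("/tmp/data/batch/cashier")

// Batch read: mergeSchema infers the superset of the two schemas
val batchMerged = session.read
  .option("mergeSchema", "true")
  .parquet("/tmp/data/batch/pickups", "/tmp/data/batch/cashier")
batchMerged.printSchema()

// Streaming read: the schema is fixed up front, so there is nothing left for mergeSchema to infer
val schema = StructType(Seq(
  StructField("orderId", StringType),
  StructField("waitDuration", LongType),
  StructField("totalPrice", FloatType)))
val streamingMerged = session.readStream
  .option("mergeSchema", "true")
  .schema(schema)
  .parquet("/tmp/data/stream")
{code}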
[jira] [Updated] (SPARK-24264) [Structured Streaming] Remove 'mergeSchema' option from Parquet source configuration
[ https://issues.apache.org/jira/browse/SPARK-24264?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gerard Maas updated SPARK-24264: Issue Type: Bug (was: Improvement) > [Structured Streaming] Remove 'mergeSchema' option from Parquet source > configuration > > > Key: SPARK-24264 > URL: https://issues.apache.org/jira/browse/SPARK-24264 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.3.0 >Reporter: Gerard Maas >Priority: Major > Labels: features, usability > > Looking into the Parquet format support for the File source in Structured > Streaming, the docs mention the use of the option 'mergeSchema' to merge the > schemas of the part files found.[1] > > There seems to be no practical use of that configuration in a streaming > context. > > In its batch counterpart, `mergeSchemas` would infer the schema superset of > the part-files found. > > When using the File source + parquet format in streaming mode, we must > provide a schema to the readStream.schema(...) builder and that schema is > fixed for the duration of the stream. > > My current understanding is that: > > - Files containing a subset of the fields declared in the schema will render > null values for the non-existing fields. > - For files containing a superset of the fields, the additional data fields > will be lost. > - Files not matching the schema set on the streaming source will render all > fields null for each record in the file. > > It looks like 'mergeSchema' has no practical effect, although enabling it > might lead to additional processing to actually merge the Parquet schema of > the input files. > > I inquired on the dev+user mailing lists about any other behavior but I got > no responses. > > From the user perspective, they may think that this option would help their > job cope with schema evolution at runtime, but that is also not the case. > > Looks like removing this option and leaving the value always set to false is > the reasonable thing to do. > > [1] > [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala#L376] -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-24264) [Structured Streaming] Remove 'mergeSchema' option from Parquet source configuration
Gerard Maas created SPARK-24264: --- Summary: [Structured Streaming] Remove 'mergeSchema' option from Parquet source configuration Key: SPARK-24264 URL: https://issues.apache.org/jira/browse/SPARK-24264 Project: Spark Issue Type: Improvement Components: Structured Streaming Affects Versions: 2.3.0 Reporter: Gerard Maas Looking into the Parquet format support for the File source in Structured Streaming, the docs mention the use of the option 'mergeSchema' to merge the schemas of the part files found.[1] There seems to be no practical use of that configuration in a streaming context. In its batch counterpart, `mergeSchemas` would infer the schema superset of the part-files found. When using the File source + parquet format in streaming mode, we must provide a schema to the readStream.schema(...) builder and that schema is fixed for the duration of the stream. My current understanding is that: - Files containing a subset of the fields declared in the schema will render null values for the non-existing fields. - For files containing a superset of the fields, the additional data fields will be lost. - Files not matching the schema set on the streaming source will render all fields null for each record in the file. It looks like 'mergeSchema' has no practical effect, although enabling it might lead to additional processing to actually merge the Parquet schema of the input files. I inquired on the dev+user mailing lists about any other behavior but I got no responses. >From the user perspective, they may think that this option would help their >job cope with schema evolution at runtime, but that is also not the case. Looks like removing this option and leaving the value always set to false is the reasonable thing to do. [1] [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala#L376] -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-24202) Separate SQLContext dependency from SparkSession.implicits
[ https://issues.apache.org/jira/browse/SPARK-24202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gerard Maas updated SPARK-24202: Description: The current implementation of the implicits in SparkSession passes the current active SQLContext to the SQLImplicits class. This implies that all usage of these (extremely helpful) implicits require the prior creation of a Spark Session instance. Usage is typically done as follows: {code:java} val sparkSession = SparkSession.builder() getOrCreate() import sparkSession.implicits._ {code} This is OK in user code, but it burdens the creation of library code that uses Spark, where static imports for _Encoder_ support is required. A simple example would be: {code:java} class SparkTransformation[In: Encoder, Out: Encoder] { def transform(ds: Dataset[In]): Dataset[Out] } {code} Attempting to compile such code would result in the following exception: {code:java} Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.{code} The usage of the _SQLContext_ instance in _SQLImplicits_ is limited to two utilities to transform _RDD_ and local collections into a _Dataset_. These are 2 methods of the 46 implicit conversions offered by this class. The request is to separate the two implicit methods that depend on the SQLContext instance creation into a separate class: {code:java} SQLImplicits#214-229 /** * Creates a [[Dataset]] from an RDD. * * @since 1.6.0 */ implicit def rddToDatasetHolder[T : Encoder](rdd: RDD[T]): DatasetHolder[T] = { DatasetHolder(_sqlContext.createDataset(rdd)) } /** * Creates a [[Dataset]] from a local Seq. * @since 1.6.0 */ implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]): DatasetHolder[T] = { DatasetHolder(_sqlContext.createDataset(s)) }{code} By separating the static methods from these two methods that depend on _sqlContext_ into separate classes, we could provide static imports for all the other functionality and only require the instance-bound implicits for the RDD and collection support (Which is an uncommon use case these days) As this is potentially breaking the current interface, this might be a candidate for Spark 3.0. Although there's nothing stopping us from creating a separate hierarchy for the static encoders already. was: The current implementation of the implicits in SparkSession passes the current active SQLContext to the SQLImplicits class. This implies that all usage of these (extremely helpful) implicits require the prior creation of a Spark Session instance. Usage is typically done as follows: {code:java} val sparkSession = SparkSession.builder() getOrCreate() import sparkSession.implicits._ {code} This is OK in user code, but it burdens the creation of library code that uses Spark, where static imports for _Encoder_ support is required. A simple example would be: {code:java} class SparkTransformation[In: Encoder, Out: Encoder] { def transform(ds: Dataset[In]): Dataset[Out] } {code} Attempting to compile such code would result in the following exception: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases. 
The usage of the _SQLContext_ instance in _SQLImplicits_ is limited to two utilities to transform _RDD_ and local collections into a _Dataset_. These are 2 methods of the 46 implicit conversions offered by this class. The request is to separate the two implicit methods that depend on the instance creation into a separate class: {code:java} SQLImplicits#214-229 /** * Creates a [[Dataset]] from an RDD. * * @since 1.6.0 */ implicit def rddToDatasetHolder[T : Encoder](rdd: RDD[T]): DatasetHolder[T] = { DatasetHolder(_sqlContext.createDataset(rdd)) } /** * Creates a [[Dataset]] from a local Seq. * @since 1.6.0 */ implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]): DatasetHolder[T] = { DatasetHolder(_sqlContext.createDataset(s)) }{code} By separating the static methods from these two methods that depend on _sqlContext_ into separate classes, we could provide static imports for all the other functionality and only require the instance-bound implicits for the RDD and collection support (Which is an uncommon use case these days) As this is potentially breaking the current interface, this might be a candidate for Spark 3.0. Although there's nothing stopping us from creating a separate hierarchy for the static encoders already. > Separate SQLContext dependency from SparkSession.implicits > -- > > Key: SPARK-24202 >
[jira] [Updated] (SPARK-24202) Separate SQLContext dependency from SparkSession.implicits
[ https://issues.apache.org/jira/browse/SPARK-24202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gerard Maas updated SPARK-24202: Summary: Separate SQLContext dependency from SparkSession.implicits (was: Separate SQLContext dependencies from SparkSession.implicits) > Separate SQLContext dependency from SparkSession.implicits > -- > > Key: SPARK-24202 > URL: https://issues.apache.org/jira/browse/SPARK-24202 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.0 >Reporter: Gerard Maas >Priority: Major > > The current implementation of the implicits in SparkSession passes the > current active SQLContext to the SQLImplicits class. This implies that all > usage of these (extremely helpful) implicits require the prior creation of a > Spark Session instance. > Usage is typically done as follows: > > {code:java} > val sparkSession = SparkSession.builder() > getOrCreate() > import sparkSession.implicits._ > {code} > > This is OK in user code, but it burdens the creation of library code that > uses Spark, where static imports for _Encoder_ support is required. > A simple example would be: > > {code:java} > class SparkTransformation[In: Encoder, Out: Encoder] { > def transform(ds: Dataset[In]): Dataset[Out] > } > {code} > > Attempting to compile such code would result in the following exception: > Unable to find encoder for type stored in a Dataset. Primitive types (Int, > String, etc) and Product types (case classes) are supported by importing > spark.implicits._ Support for serializing other types will be added in > future releases. > The usage of the _SQLContext_ instance in _SQLImplicits_ is limited to two > utilities to transform _RDD_ and local collections into a _Dataset_. > These are 2 methods of the 46 implicit conversions offered by this class. > The request is to separate the two implicit methods that depend on the > instance creation into a separate class: > {code:java} > SQLImplicits#214-229 > /** > * Creates a [[Dataset]] from an RDD. > * > * @since 1.6.0 > */ > implicit def rddToDatasetHolder[T : Encoder](rdd: RDD[T]): DatasetHolder[T] = > { > DatasetHolder(_sqlContext.createDataset(rdd)) > } > /** > * Creates a [[Dataset]] from a local Seq. > * @since 1.6.0 > */ > implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]): > DatasetHolder[T] = { > DatasetHolder(_sqlContext.createDataset(s)) > }{code} > By separating the static methods from these two methods that depend on > _sqlContext_ into separate classes, we could provide static imports for all > the other functionality and only require the instance-bound implicits for > the RDD and collection support (Which is an uncommon use case these days) > As this is potentially breaking the current interface, this might be a > candidate for Spark 3.0. Although there's nothing stopping us from creating a > separate hierarchy for the static encoders already. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-24202) Separate SQLContext dependencies from SparkSession.implicits
[ https://issues.apache.org/jira/browse/SPARK-24202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gerard Maas updated SPARK-24202: Description: The current implementation of the implicits in SparkSession passes the current active SQLContext to the SQLImplicits class. This implies that all usage of these (extremely helpful) implicits require the prior creation of a Spark Session instance. Usage is typically done as follows: {code:java} val sparkSession = SparkSession.builder() build() import sparkSession.implicits._ {code} This is OK in user code, but it burdens the creation of library code that uses Spark, where static imports for _Encoder_ support is required. A simple example would be: {code:java} class SparkTransformation[In: Encoder, Out: Encoder] { def transform(ds: Dataset[In]): Dataset[Out] } {code} Attempting to compile such code would result in the following exception: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases. The usage of the _SQLContext_ instance in _SQLImplicits_ is limited to two utilities to transform _RDD_ and local collections into a _Dataset_. These are 2 methods of the 46 implicit conversions offered by this class. The request is to separate the two implicit methods that depend on the instance creation into a separate class: {code:java} SQLImplicits#214-229 /** * Creates a [[Dataset]] from an RDD. * * @since 1.6.0 */ implicit def rddToDatasetHolder[T : Encoder](rdd: RDD[T]): DatasetHolder[T] = { DatasetHolder(_sqlContext.createDataset(rdd)) } /** * Creates a [[Dataset]] from a local Seq. * @since 1.6.0 */ implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]): DatasetHolder[T] = { DatasetHolder(_sqlContext.createDataset(s)) }{code} By separating the static methods from these two methods that depend on _sqlContext_ into separate classes, we could provide static imports for all the other functionality and only require the instance-bound implicits for the RDD and collection support (Which is an uncommon use case these days) As this is potentially breaking the current interface, this might be a candidate for Spark 3.0. Although there's nothing stopping us from creating a separate hierarchy for the static encoders already. was: The current implementation of the implicits in SparkSession passes the current active SQLContext to the SQLImplicits class. This implies that all usage of these (extremely helpful) implicits require the prior creation of a Spark Session instance. Usage is typically done as follows: {code:java} val sparkSession = SessionBuilderbuild() import sparkSession.implicits._ {code} This is OK in user code, but it burdens the creation of library code that uses Spark, where static imports for _Encoder_ support is required. A simple example would be: {code:java} class SparkTransformation[In: Encoder, Out: Encoder] { def transform(ds: Dataset[In]): Dataset[Out] } {code} Attempting to compile such code would result in the following exception: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases. The usage of the _SQLContext_ instance in _SQLImplicits_ is limited to two utilities to transform _RDD_ and local collections into a _Dataset_. 
These are 2 methods of the 46 implicit conversions offered by this class. The request is to separate the two implicit methods that depend on the instance creation into a separate class: {code:java} SQLImplicits#214-229 /** * Creates a [[Dataset]] from an RDD. * * @since 1.6.0 */ implicit def rddToDatasetHolder[T : Encoder](rdd: RDD[T]): DatasetHolder[T] = { DatasetHolder(_sqlContext.createDataset(rdd)) } /** * Creates a [[Dataset]] from a local Seq. * @since 1.6.0 */ implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]): DatasetHolder[T] = { DatasetHolder(_sqlContext.createDataset(s)) }{code} By separating the static methods from these two methods that depend on _sqlContext_ into separate classes, we could provide static imports for all the other functionality and only require the instance-bound implicits for the RDD and collection support (Which is an uncommon use case these days) As this is potentially breaking the current interface, this might be a candidate for Spark 3.0. Although there's nothing stopping us from creating a separate hierarchy for the static encoders already. > Separate SQLContext dependencies from SparkSession.implicits > > > Key: SPARK-24202 > URL:
[jira] [Updated] (SPARK-24202) Separate SQLContext dependencies from SparkSession.implicits
[ https://issues.apache.org/jira/browse/SPARK-24202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gerard Maas updated SPARK-24202: Description: The current implementation of the implicits in SparkSession passes the current active SQLContext to the SQLImplicits class. This implies that all usage of these (extremely helpful) implicits require the prior creation of a Spark Session instance. Usage is typically done as follows: {code:java} val sparkSession = SparkSession.builder() getOrCreate() import sparkSession.implicits._ {code} This is OK in user code, but it burdens the creation of library code that uses Spark, where static imports for _Encoder_ support is required. A simple example would be: {code:java} class SparkTransformation[In: Encoder, Out: Encoder] { def transform(ds: Dataset[In]): Dataset[Out] } {code} Attempting to compile such code would result in the following exception: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases. The usage of the _SQLContext_ instance in _SQLImplicits_ is limited to two utilities to transform _RDD_ and local collections into a _Dataset_. These are 2 methods of the 46 implicit conversions offered by this class. The request is to separate the two implicit methods that depend on the instance creation into a separate class: {code:java} SQLImplicits#214-229 /** * Creates a [[Dataset]] from an RDD. * * @since 1.6.0 */ implicit def rddToDatasetHolder[T : Encoder](rdd: RDD[T]): DatasetHolder[T] = { DatasetHolder(_sqlContext.createDataset(rdd)) } /** * Creates a [[Dataset]] from a local Seq. * @since 1.6.0 */ implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]): DatasetHolder[T] = { DatasetHolder(_sqlContext.createDataset(s)) }{code} By separating the static methods from these two methods that depend on _sqlContext_ into separate classes, we could provide static imports for all the other functionality and only require the instance-bound implicits for the RDD and collection support (Which is an uncommon use case these days) As this is potentially breaking the current interface, this might be a candidate for Spark 3.0. Although there's nothing stopping us from creating a separate hierarchy for the static encoders already. was: The current implementation of the implicits in SparkSession passes the current active SQLContext to the SQLImplicits class. This implies that all usage of these (extremely helpful) implicits require the prior creation of a Spark Session instance. Usage is typically done as follows: {code:java} val sparkSession = SparkSession.builder() build() import sparkSession.implicits._ {code} This is OK in user code, but it burdens the creation of library code that uses Spark, where static imports for _Encoder_ support is required. A simple example would be: {code:java} class SparkTransformation[In: Encoder, Out: Encoder] { def transform(ds: Dataset[In]): Dataset[Out] } {code} Attempting to compile such code would result in the following exception: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases. The usage of the _SQLContext_ instance in _SQLImplicits_ is limited to two utilities to transform _RDD_ and local collections into a _Dataset_. 
These are 2 methods of the 46 implicit conversions offered by this class. The request is to separate the two implicit methods that depend on the instance creation into a separate class: {code:java} SQLImplicits#214-229 /** * Creates a [[Dataset]] from an RDD. * * @since 1.6.0 */ implicit def rddToDatasetHolder[T : Encoder](rdd: RDD[T]): DatasetHolder[T] = { DatasetHolder(_sqlContext.createDataset(rdd)) } /** * Creates a [[Dataset]] from a local Seq. * @since 1.6.0 */ implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]): DatasetHolder[T] = { DatasetHolder(_sqlContext.createDataset(s)) }{code} By separating the static methods from these two methods that depend on _sqlContext_ into separate classes, we could provide static imports for all the other functionality and only require the instance-bound implicits for the RDD and collection support (Which is an uncommon use case these days) As this is potentially breaking the current interface, this might be a candidate for Spark 3.0. Although there's nothing stopping us from creating a separate hierarchy for the static encoders already. > Separate SQLContext dependencies from SparkSession.implicits > > > Key: SPARK-24202 > URL:
[jira] [Created] (SPARK-24202) Separate SQLContext dependencies from SparkSession.implicits
Gerard Maas created SPARK-24202: --- Summary: Separate SQLContext dependencies from SparkSession.implicits Key: SPARK-24202 URL: https://issues.apache.org/jira/browse/SPARK-24202 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.3.0 Reporter: Gerard Maas The current implementation of the implicits in SparkSession passes the current active SQLContext to the SQLImplicits class. This implies that all usage of these (extremely helpful) implicits require the prior creation of a Spark Session instance. Usage is typically done as follows: {code:java} val sparkSession = SessionBuilderbuild() import sparkSession.implicits._ {code} This is OK in user code, but it burdens the creation of library code that uses Spark, where static imports for _Encoder_ support is required. A simple example would be: {code:java} class SparkTransformation[In: Encoder, Out: Encoder] { def transform(ds: Dataset[In]): Dataset[Out] } {code} Attempting to compile such code would result in the following exception: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases. The usage of the _SQLContext_ instance in _SQLImplicits_ is limited to two utilities to transform _RDD_ and local collections into a _Dataset_. These are 2 methods of the 46 implicit conversions offered by this class. The request is to separate the two implicit methods that depend on the instance creation into a separate class: {code:java} SQLImplicits#214-229 /** * Creates a [[Dataset]] from an RDD. * * @since 1.6.0 */ implicit def rddToDatasetHolder[T : Encoder](rdd: RDD[T]): DatasetHolder[T] = { DatasetHolder(_sqlContext.createDataset(rdd)) } /** * Creates a [[Dataset]] from a local Seq. * @since 1.6.0 */ implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]): DatasetHolder[T] = { DatasetHolder(_sqlContext.createDataset(s)) }{code} By separating the static methods from these two methods that depend on _sqlContext_ into separate classes, we could provide static imports for all the other functionality and only require the instance-bound implicits for the RDD and collection support (Which is an uncommon use case these days) As this is potentially breaking the current interface, this might be a candidate for Spark 3.0. Although there's nothing stopping us from creating a separate hierarchy for the static encoders already. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
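To make the proposal concrete, here is a sketch of what session-free encoder implicits could look like. `StaticImplicits` is a hypothetical name, not existing Spark API; it only illustrates that encoder derivation via `org.apache.spark.sql.Encoders` needs no SQLContext instance, unlike the two RDD/Seq conversions quoted above:

{code:java}
import scala.reflect.runtime.universe.TypeTag
import org.apache.spark.sql.{Dataset, Encoder, Encoders}

// Hypothetical static implicits: encoder derivation without a SparkSession/SQLContext in scope
object StaticImplicits {
  implicit def productEncoder[T <: Product : TypeTag]: Encoder[T] = Encoders.product[T]
  implicit val intEncoder: Encoder[Int] = Encoders.scalaInt
  implicit val stringEncoder: Encoder[String] = Encoders.STRING
}

import StaticImplicits._

case class In(id: String)
case class Out(id: String, length: Int)

// Library code compiles without ever touching sparkSession.implicits._
class SparkTransformation[I: Encoder, O: Encoder](f: Dataset[I] => Dataset[O]) {
  def transform(ds: Dataset[I]): Dataset[O] = f(ds)
}

val toOut = new SparkTransformation[In, Out](_.map(in => Out(in.id, in.id.length)))
{code}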
[jira] [Commented] (SPARK-24046) Rate Source doesn't gradually increase rate when rampUpTime>=RowsPerSecond
[ https://issues.apache.org/jira/browse/SPARK-24046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16447349#comment-16447349 ] Gerard Maas commented on SPARK-24046: - In graphical form, this is what is happening when rampUpTime >= rowsPerSecond (rampUpTime=10,rowsPerSecond=10) in this case !image-2018-04-22-22-03-03-945.png! (rampUpTime=60, rowsPerSecond=10) !image-2018-04-22-22-06-49-202.png! > Rate Source doesn't gradually increase rate when rampUpTime>=RowsPerSecond > -- > > Key: SPARK-24046 > URL: https://issues.apache.org/jira/browse/SPARK-24046 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.3.0 > Environment: Spark 2.3.0 using Spark Shell on Ubuntu 17.4 > (Environment is not important, the issue lies in the rate calculation) >Reporter: Gerard Maas >Priority: Major > Labels: RateSource > Attachments: image-2018-04-22-22-03-03-945.png, > image-2018-04-22-22-06-49-202.png > > > When using the rate source in Structured streaming, the `rampUpTime` feature > fails to gradually increase the stream rate when the `rampUpTime` option is > equal or greater than `rowsPerSecond`. > When rampUpTime >= rowsPerSecond` all batches at `time < rampUpTime` contain > 0 values. The rate jumps to `rowsPerSecond` when `time>rampUpTime`. > The following scenario, executed in the `spark-shell` demonstrates this issue: > {code:java} > // Using rampUpTime(10) > rowsPerSecond(5) > {code} > {code:java} > val stream = spark.readStream > .format("rate") > .option("rowsPerSecond", 5) > .option("rampUpTime", 10) > .load() > val query = stream.writeStream.format("console").start() > // Exiting paste mode, now interpreting. > stream: org.apache.spark.sql.DataFrame = [timestamp: timestamp, value: bigint] > query: org.apache.spark.sql.streaming.StreamingQuery = > org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@cf82c58 > --- > Batch: 0 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 1 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 2 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 3 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 4 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 5 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 6 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 7 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 8 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 9 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 10 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 11 > --- > ++-+ > | timestamp|value| > ++-+ > |2018-04-22 17:08:...| 0| > |2018-04-22 17:08:...| 1| > |2018-04-22 17:08:...| 2| > |2018-04-22 17:08:...| 3| > |2018-04-22 17:08:...| 4| > ++-+ > --- > Batch: 12 > --- > ++-+ > | timestamp|value| > ++-+ > |2018-04-22 17:08:...| 5| > |2018-04-22 17:08:...| 6| > |2018-04-22 17:08:...| 7| > |2018-04-22 17:08:...| 8| > |2018-04-22 17:08:...| 9| > ++-+ > {code} > > This scenario shows rowsPerSecond == rampUpTime, which also fails > {code:java} > val stream
[jira] [Updated] (SPARK-24046) Rate Source doesn't gradually increase rate when rampUpTime>=RowsPerSecond
[ https://issues.apache.org/jira/browse/SPARK-24046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gerard Maas updated SPARK-24046: Attachment: image-2018-04-22-22-06-49-202.png > Rate Source doesn't gradually increase rate when rampUpTime>=RowsPerSecond > -- > > Key: SPARK-24046 > URL: https://issues.apache.org/jira/browse/SPARK-24046 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.3.0 > Environment: Spark 2.3.0 using Spark Shell on Ubuntu 17.4 > (Environment is not important, the issue lies in the rate calculation) >Reporter: Gerard Maas >Priority: Major > Labels: RateSource > Attachments: image-2018-04-22-22-03-03-945.png, > image-2018-04-22-22-06-49-202.png > > > When using the rate source in Structured streaming, the `rampUpTime` feature > fails to gradually increase the stream rate when the `rampUpTime` option is > equal or greater than `rowsPerSecond`. > When rampUpTime >= rowsPerSecond` all batches at `time < rampUpTime` contain > 0 values. The rate jumps to `rowsPerSecond` when `time>rampUpTime`. > The following scenario, executed in the `spark-shell` demonstrates this issue: > {code:java} > // Using rampUpTime(10) > rowsPerSecond(5) > {code} > {code:java} > val stream = spark.readStream > .format("rate") > .option("rowsPerSecond", 5) > .option("rampUpTime", 10) > .load() > val query = stream.writeStream.format("console").start() > // Exiting paste mode, now interpreting. > stream: org.apache.spark.sql.DataFrame = [timestamp: timestamp, value: bigint] > query: org.apache.spark.sql.streaming.StreamingQuery = > org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@cf82c58 > --- > Batch: 0 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 1 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 2 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 3 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 4 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 5 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 6 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 7 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 8 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 9 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 10 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 11 > --- > ++-+ > | timestamp|value| > ++-+ > |2018-04-22 17:08:...| 0| > |2018-04-22 17:08:...| 1| > |2018-04-22 17:08:...| 2| > |2018-04-22 17:08:...| 3| > |2018-04-22 17:08:...| 4| > ++-+ > --- > Batch: 12 > --- > ++-+ > | timestamp|value| > ++-+ > |2018-04-22 17:08:...| 5| > |2018-04-22 17:08:...| 6| > |2018-04-22 17:08:...| 7| > |2018-04-22 17:08:...| 8| > |2018-04-22 17:08:...| 9| > ++-+ > {code} > > This scenario shows rowsPerSecond == rampUpTime, which also fails > {code:java} > val stream = spark.readStream > .format("rate") > .option("rowsPerSecond", 10) > .option("rampUpTime", 10) > .load() > val query = stream.writeStream.format("console").start() > // Exiting paste mode, now interpreting. > stream:
[jira] [Updated] (SPARK-24046) Rate Source doesn't gradually increase rate when rampUpTime>=RowsPerSecond
[ https://issues.apache.org/jira/browse/SPARK-24046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gerard Maas updated SPARK-24046: Attachment: image-2018-04-22-22-03-03-945.png > Rate Source doesn't gradually increase rate when rampUpTime>=RowsPerSecond > -- > > Key: SPARK-24046 > URL: https://issues.apache.org/jira/browse/SPARK-24046 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.3.0 > Environment: Spark 2.3.0 using Spark Shell on Ubuntu 17.4 > (Environment is not important, the issue lies in the rate calculation) >Reporter: Gerard Maas >Priority: Major > Labels: RateSource > Attachments: image-2018-04-22-22-03-03-945.png > > > When using the rate source in Structured streaming, the `rampUpTime` feature > fails to gradually increase the stream rate when the `rampUpTime` option is > equal or greater than `rowsPerSecond`. > When rampUpTime >= rowsPerSecond` all batches at `time < rampUpTime` contain > 0 values. The rate jumps to `rowsPerSecond` when `time>rampUpTime`. > The following scenario, executed in the `spark-shell` demonstrates this issue: > {code:java} > // Using rampUpTime(10) > rowsPerSecond(5) > {code} > {code:java} > val stream = spark.readStream > .format("rate") > .option("rowsPerSecond", 5) > .option("rampUpTime", 10) > .load() > val query = stream.writeStream.format("console").start() > // Exiting paste mode, now interpreting. > stream: org.apache.spark.sql.DataFrame = [timestamp: timestamp, value: bigint] > query: org.apache.spark.sql.streaming.StreamingQuery = > org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@cf82c58 > --- > Batch: 0 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 1 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 2 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 3 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 4 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 5 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 6 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 7 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 8 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 9 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 10 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 11 > --- > ++-+ > | timestamp|value| > ++-+ > |2018-04-22 17:08:...| 0| > |2018-04-22 17:08:...| 1| > |2018-04-22 17:08:...| 2| > |2018-04-22 17:08:...| 3| > |2018-04-22 17:08:...| 4| > ++-+ > --- > Batch: 12 > --- > ++-+ > | timestamp|value| > ++-+ > |2018-04-22 17:08:...| 5| > |2018-04-22 17:08:...| 6| > |2018-04-22 17:08:...| 7| > |2018-04-22 17:08:...| 8| > |2018-04-22 17:08:...| 9| > ++-+ > {code} > > This scenario shows rowsPerSecond == rampUpTime, which also fails > {code:java} > val stream = spark.readStream > .format("rate") > .option("rowsPerSecond", 10) > .option("rampUpTime", 10) > .load() > val query = stream.writeStream.format("console").start() > // Exiting paste mode, now interpreting. > stream: org.apache.spark.sql.DataFrame = [timestamp:
[jira] [Commented] (SPARK-24046) Rate Source doesn't gradually increase rate when rampUpTime>=RowsPerSecond
[ https://issues.apache.org/jira/browse/SPARK-24046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16447347#comment-16447347 ] Gerard Maas commented on SPARK-24046: - The problem seems to come from this integer division: https://github.com/apache/spark/blame/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala#L110 > Rate Source doesn't gradually increase rate when rampUpTime>=RowsPerSecond > -- > > Key: SPARK-24046 > URL: https://issues.apache.org/jira/browse/SPARK-24046 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.3.0 > Environment: Spark 2.3.0 using Spark Shell on Ubuntu 17.4 > (Environment is not important, the issue lies in the rate calculation) >Reporter: Gerard Maas >Priority: Major > Labels: RateSource > > When using the rate source in Structured streaming, the `rampUpTime` feature > fails to gradually increase the stream rate when the `rampUpTime` option is > equal or greater than `rowsPerSecond`. > When rampUpTime >= rowsPerSecond` all batches at `time < rampUpTime` contain > 0 values. The rate jumps to `rowsPerSecond` when `time>rampUpTime`. > The following scenario, executed in the `spark-shell` demonstrates this issue: > {code:java} > // Using rampUpTime(10) > rowsPerSecond(5) > {code} > {code:java} > val stream = spark.readStream > .format("rate") > .option("rowsPerSecond", 5) > .option("rampUpTime", 10) > .load() > val query = stream.writeStream.format("console").start() > // Exiting paste mode, now interpreting. > stream: org.apache.spark.sql.DataFrame = [timestamp: timestamp, value: bigint] > query: org.apache.spark.sql.streaming.StreamingQuery = > org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@cf82c58 > --- > Batch: 0 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 1 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 2 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 3 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 4 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 5 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 6 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 7 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 8 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 9 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 10 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 11 > --- > ++-+ > | timestamp|value| > ++-+ > |2018-04-22 17:08:...| 0| > |2018-04-22 17:08:...| 1| > |2018-04-22 17:08:...| 2| > |2018-04-22 17:08:...| 3| > |2018-04-22 17:08:...| 4| > ++-+ > --- > Batch: 12 > --- > ++-+ > | timestamp|value| > ++-+ > |2018-04-22 17:08:...| 5| > |2018-04-22 17:08:...| 6| > |2018-04-22 17:08:...| 7| > |2018-04-22 17:08:...| 8| > |2018-04-22 17:08:...| 9| > ++-+ > {code} > > This scenario shows rowsPerSecond == rampUpTime, which also fails > {code:java} > val stream = spark.readStream > .format("rate") > .option("rowsPerSecond", 10) > .option("rampUpTime", 10) > .load() > val query =
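The effect of that division can be illustrated with a simplified model of the rate calculation (a sketch of the truncation, not the exact Spark source; the "fractional" variant below is only one possible fix, not a merged patch):

{code:java}
// Truncating model: with rowsPerSecond = 5 and rampUpTime = 10 the per-second
// increment becomes 0, so every batch during ramp-up is empty.
def valueAtSecondTruncating(seconds: Long, rowsPerSecond: Long, rampUpTimeSeconds: Long): Long = {
  val speedDeltaPerSecond = rowsPerSecond / (rampUpTimeSeconds + 1) // integer division -> 0
  if (seconds <= rampUpTimeSeconds) speedDeltaPerSecond * seconds * (seconds + 1) / 2
  else {
    val rampUpPart = speedDeltaPerSecond * rampUpTimeSeconds * (rampUpTimeSeconds + 1) / 2
    rampUpPart + (seconds - rampUpTimeSeconds) * rowsPerSecond
  }
}

// One possible fix (an assumption): keep the ramp in fractional form and
// round only when producing the cumulative value.
def valueAtSecondFractional(seconds: Long, rowsPerSecond: Long, rampUpTimeSeconds: Long): Long = {
  val delta = rowsPerSecond.toDouble / (rampUpTimeSeconds + 1)
  if (seconds <= rampUpTimeSeconds) math.round(delta * seconds * (seconds + 1) / 2)
  else {
    val rampUpPart = math.round(delta * rampUpTimeSeconds * (rampUpTimeSeconds + 1) / 2)
    rampUpPart + (seconds - rampUpTimeSeconds) * rowsPerSecond
  }
}

// valueAtSecondTruncating(5, 5, 10) == 0  (all ramp-up batches empty)
// valueAtSecondFractional(5, 5, 10) == 7  (the rate ramps up gradually)
{code}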
[jira] [Updated] (SPARK-24046) Rate Source doesn't gradually increase rate when rampUpTime>=RowsPerSecond
[ https://issues.apache.org/jira/browse/SPARK-24046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gerard Maas updated SPARK-24046: Summary: Rate Source doesn't gradually increase rate when rampUpTime>=RowsPerSecond (was: Rate Source does gradually increase rate when rampUpTime>=RowsPerSecond) > Rate Source doesn't gradually increase rate when rampUpTime>=RowsPerSecond > -- > > Key: SPARK-24046 > URL: https://issues.apache.org/jira/browse/SPARK-24046 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.3.0 > Environment: Spark 2.3.0 using Spark Shell on Ubuntu 17.4 > (Environment is not important, the issue lies in the rate calculation) >Reporter: Gerard Maas >Priority: Major > Labels: RateSource > > When using the rate source in Structured streaming, the `rampUpTime` feature > fails to gradually increase the stream rate when the `rampUpTime` option is > equal or greater than `rowsPerSecond`. > When rampUpTime >= rowsPerSecond` all batches at `time < rampUpTime` contain > 0 values. The rate jumps to `rowsPerSecond` when `time>rampUpTime`. > The following scenario, executed in the `spark-shell` demonstrates this issue: > {code:java} > // Using rampUpTime(10) > rowsPerSecond(5) > {code} > {code:java} > val stream = spark.readStream > .format("rate") > .option("rowsPerSecond", 5) > .option("rampUpTime", 10) > .load() > val query = stream.writeStream.format("console").start() > // Exiting paste mode, now interpreting. > stream: org.apache.spark.sql.DataFrame = [timestamp: timestamp, value: bigint] > query: org.apache.spark.sql.streaming.StreamingQuery = > org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@cf82c58 > --- > Batch: 0 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 1 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 2 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 3 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 4 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 5 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 6 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 7 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 8 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 9 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 10 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 11 > --- > ++-+ > | timestamp|value| > ++-+ > |2018-04-22 17:08:...| 0| > |2018-04-22 17:08:...| 1| > |2018-04-22 17:08:...| 2| > |2018-04-22 17:08:...| 3| > |2018-04-22 17:08:...| 4| > ++-+ > --- > Batch: 12 > --- > ++-+ > | timestamp|value| > ++-+ > |2018-04-22 17:08:...| 5| > |2018-04-22 17:08:...| 6| > |2018-04-22 17:08:...| 7| > |2018-04-22 17:08:...| 8| > |2018-04-22 17:08:...| 9| > ++-+ > {code} > > This scenario shows rowsPerSecond == rampUpTime, which also fails > {code:java} > val stream = spark.readStream > .format("rate") > .option("rowsPerSecond", 10) > .option("rampUpTime", 10) > .load() > val query = stream.writeStream.format("console").start() > // Exiting paste mode, now interpreting. >
[jira] [Updated] (SPARK-24046) Rate Source does gradually increase rate when rampUpTime>=RowsPerSecond
[ https://issues.apache.org/jira/browse/SPARK-24046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gerard Maas updated SPARK-24046: Summary: Rate Source does gradually increase rate when rampUpTime>=RowsPerSecond (was: rampUpTime in Rate Source does not work for rampUpTime>=RowsPerSecond) > Rate Source does gradually increase rate when rampUpTime>=RowsPerSecond > --- > > Key: SPARK-24046 > URL: https://issues.apache.org/jira/browse/SPARK-24046 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.3.0 > Environment: Spark 2.3.0 using Spark Shell on Ubuntu 17.4 > (Environment is not important, the issue lies in the rate calculation) >Reporter: Gerard Maas >Priority: Major > Labels: RateSource > > When using the rate source in Structured streaming, the `rampUpTime` feature > fails to gradually increase the stream rate when the `rampUpTime` option is > equal or greater than `rowsPerSecond`. > When rampUpTime >= rowsPerSecond` all batches at `time < rampUpTime` contain > 0 values. The rate jumps to `rowsPerSecond` when `time>rampUpTime`. > The following scenario, executed in the `spark-shell` demonstrates this issue: > {code:java} > // Using rampUpTime(10) > rowsPerSecond(5) > {code} > {code:java} > val stream = spark.readStream > .format("rate") > .option("rowsPerSecond", 5) > .option("rampUpTime", 10) > .load() > val query = stream.writeStream.format("console").start() > // Exiting paste mode, now interpreting. > stream: org.apache.spark.sql.DataFrame = [timestamp: timestamp, value: bigint] > query: org.apache.spark.sql.streaming.StreamingQuery = > org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@cf82c58 > --- > Batch: 0 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 1 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 2 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 3 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 4 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 5 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 6 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 7 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 8 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 9 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 10 > --- > +-+-+ > |timestamp|value| > +-+-+ > +-+-+ > --- > Batch: 11 > --- > ++-+ > | timestamp|value| > ++-+ > |2018-04-22 17:08:...| 0| > |2018-04-22 17:08:...| 1| > |2018-04-22 17:08:...| 2| > |2018-04-22 17:08:...| 3| > |2018-04-22 17:08:...| 4| > ++-+ > --- > Batch: 12 > --- > ++-+ > | timestamp|value| > ++-+ > |2018-04-22 17:08:...| 5| > |2018-04-22 17:08:...| 6| > |2018-04-22 17:08:...| 7| > |2018-04-22 17:08:...| 8| > |2018-04-22 17:08:...| 9| > ++-+ > {code} > > This scenario shows rowsPerSecond == rampUpTime, which also fails > {code:java} > val stream = spark.readStream > .format("rate") > .option("rowsPerSecond", 10) > .option("rampUpTime", 10) > .load() > val query = stream.writeStream.format("console").start() > // Exiting paste mode, now interpreting. > stream:
[jira] [Created] (SPARK-24046) rampUpTime in Rate Source does not work for rampUpTime>=RowsPerSecond
Gerard Maas created SPARK-24046: --- Summary: rampUpTime in Rate Source does not work for rampUpTime>=RowsPerSecond Key: SPARK-24046 URL: https://issues.apache.org/jira/browse/SPARK-24046 Project: Spark Issue Type: Bug Components: Structured Streaming Affects Versions: 2.3.0 Environment: Spark 2.3.0 using Spark Shell on Ubuntu 17.4 (Environment is not important, the issue lies in the rate calculation) Reporter: Gerard Maas When using the rate source in Structured streaming, the `rampUpTime` feature fails to gradually increase the stream rate when the `rampUpTime` option is equal or greater than `rowsPerSecond`. When rampUpTime >= rowsPerSecond` all batches at `time < rampUpTime` contain 0 values. The rate jumps to `rowsPerSecond` when `time>rampUpTime`. The following scenario, executed in the `spark-shell` demonstrates this issue: {code:java} // Using rampUpTime(10) > rowsPerSecond(5) {code} {code:java} val stream = spark.readStream .format("rate") .option("rowsPerSecond", 5) .option("rampUpTime", 10) .load() val query = stream.writeStream.format("console").start() // Exiting paste mode, now interpreting. stream: org.apache.spark.sql.DataFrame = [timestamp: timestamp, value: bigint] query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@cf82c58 --- Batch: 0 --- +-+-+ |timestamp|value| +-+-+ +-+-+ --- Batch: 1 --- +-+-+ |timestamp|value| +-+-+ +-+-+ --- Batch: 2 --- +-+-+ |timestamp|value| +-+-+ +-+-+ --- Batch: 3 --- +-+-+ |timestamp|value| +-+-+ +-+-+ --- Batch: 4 --- +-+-+ |timestamp|value| +-+-+ +-+-+ --- Batch: 5 --- +-+-+ |timestamp|value| +-+-+ +-+-+ --- Batch: 6 --- +-+-+ |timestamp|value| +-+-+ +-+-+ --- Batch: 7 --- +-+-+ |timestamp|value| +-+-+ +-+-+ --- Batch: 8 --- +-+-+ |timestamp|value| +-+-+ +-+-+ --- Batch: 9 --- +-+-+ |timestamp|value| +-+-+ +-+-+ --- Batch: 10 --- +-+-+ |timestamp|value| +-+-+ +-+-+ --- Batch: 11 --- ++-+ | timestamp|value| ++-+ |2018-04-22 17:08:...| 0| |2018-04-22 17:08:...| 1| |2018-04-22 17:08:...| 2| |2018-04-22 17:08:...| 3| |2018-04-22 17:08:...| 4| ++-+ --- Batch: 12 --- ++-+ | timestamp|value| ++-+ |2018-04-22 17:08:...| 5| |2018-04-22 17:08:...| 6| |2018-04-22 17:08:...| 7| |2018-04-22 17:08:...| 8| |2018-04-22 17:08:...| 9| ++-+ {code} This scenario shows rowsPerSecond == rampUpTime, which also fails {code:java} val stream = spark.readStream .format("rate") .option("rowsPerSecond", 10) .option("rampUpTime", 10) .load() val query = stream.writeStream.format("console").start() // Exiting paste mode, now interpreting. stream: org.apache.spark.sql.DataFrame = [timestamp: timestamp, value: bigint] query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@149ef64a scala> --- Batch: 0 --- +-+-+ |timestamp|value| +-+-+ +-+-+ --- Batch: 1 --- +-+-+ |timestamp|value| +-+-+ +-+-+ --- Batch: 2 --- +-+-+
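For illustration of where the truncation can come from, the following is a simplified, standalone sketch of a ramp-up calculation of the kind the rate source appears to use (an assumption for illustration, not a verbatim copy of RateSourceProvider): the per-second speed increment is derived with integer division, so it truncates to 0 whenever rampUpTime >= rowsPerSecond, reproducing the empty batches shown above.

{code:scala}
// Simplified sketch (assumed, not the actual Spark source) of a ramped-up value counter.
def valueAtSecond(seconds: Long, rowsPerSecond: Long, rampUpTimeSeconds: Long): Long = {
  // Integer division: 5 / (10 + 1) == 0, so the increment vanishes when rampUpTime >= rowsPerSecond.
  val speedDeltaPerSecond = rowsPerSecond / (rampUpTimeSeconds + 1)
  if (seconds <= rampUpTimeSeconds) {
    speedDeltaPerSecond * seconds * (seconds + 1) / 2 // stays 0 for the whole ramp-up window
  } else {
    valueAtSecond(rampUpTimeSeconds, rowsPerSecond, rampUpTimeSeconds) +
      (seconds - rampUpTimeSeconds) * rowsPerSecond
  }
}

// rowsPerSecond = 5, rampUpTime = 10: no rows until t > 10, then the rate jumps straight to 5.
(0L to 12L).map(s => valueAtSecond(s, 5, 10))
// Vector(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 5, 10)
{code}

Under this reading, a gradual ramp-up would require computing the increment without truncation (for example with fractional arithmetic), which matches the behaviour the scenarios above fail to show.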
[jira] [Commented] (SPARK-21710) ConsoleSink causes OOM crashes with large inputs.
[ https://issues.apache.org/jira/browse/SPARK-21710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16123485#comment-16123485 ] Gerard Maas commented on SPARK-21710: - PR: https://github.com/apache/spark/pull/18923 > ConsoleSink causes OOM crashes with large inputs. > - > > Key: SPARK-21710 > URL: https://issues.apache.org/jira/browse/SPARK-21710 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.2.0 > Environment: affects all environments >Reporter: Gerard Maas > Labels: easyfix > > ConsoleSink does a full collect of the streaming dataset in order to show few > lines on screen. This is problematic with large inputs, like a kafka backlog > or a file source with files larger than the driver's memory. > Here's an example: > {code:java} > import spark.implicits._ > import org.apache.spark.sql.functions > import org.apache.spark.sql.types.StructType > import org.apache.spark.sql.types._ > val schema = StructType(StructField("text", StringType, true) :: Nil) > val lines = spark > .readStream > .format("text") > .option("path", "/tmp/data") > .schema(schema) > .load() > val base = lines.writeStream > .outputMode("append") > .format("console") > .start() > {code} > When a large file larger than the available driver memory is fed through this > streaming job, we get: > {code:java} > --- > Batch: 0 > --- > [Stage 0:>(0 + 8) / > 111]17/08/11 15:10:45 ERROR Executor: Exception in task 6.0 in stage 0.0 (TID > 6) > java.lang.OutOfMemoryError: Java heap space > at java.util.Arrays.copyOf(Arrays.java:3236) > at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118) > at > java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) > at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153) > at > net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:205) > at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:158) > at java.io.DataOutputStream.write(DataOutputStream.java:107) > at > org.apache.spark.sql.catalyst.expressions.UnsafeRow.writeToStream(UnsafeRow.java:554) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:237) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827) > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) > at org.apache.spark.scheduler.Task.run(Task.scala:108) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:748) > 17/08/11 15:10:45 ERROR SparkUncaughtExceptionHandler: Uncaught exception in > thread Thread[Executor task launch worker for task 6,5,main] > java.lang.OutOfMemoryError: Java heap space > {code} > This issue can be traced back to a `collect` on the source `DataFrame`: > https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala#L52 > A fairly simple 
solution would be to do a `take(numRows)` instead of the collect. (PR in progress) -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
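For reference, a minimal sketch of what the proposed change amounts to; `data` and `numRowsToShow` are illustrative names rather than the exact identifiers in console.scala:

{code:scala}
import org.apache.spark.sql.DataFrame

// Illustrative sketch only: bound the rows brought back to the driver before printing.
def showBatch(data: DataFrame, numRowsToShow: Int): Unit = {
  // data.collect() would materialize the entire micro-batch on the driver (the OOM above);
  // take(n) fetches at most n rows instead.
  val rows = data.take(numRowsToShow)
  rows.foreach(println)
}
{code}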
[jira] [Updated] (SPARK-21710) ConsoleSink causes OOM crashes with large inputs.
[ https://issues.apache.org/jira/browse/SPARK-21710?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gerard Maas updated SPARK-21710: Description: ConsoleSink does a full collect of the streaming dataset in order to show few lines on screen. This is problematic with large inputs, like a kafka backlog or a file source with files larger than the driver's memory. Here's an example: {code:java} import spark.implicits._ import org.apache.spark.sql.functions import org.apache.spark.sql.types.StructType import org.apache.spark.sql.types._ val schema = StructType(StructField("text", StringType, true) :: Nil) val lines = spark .readStream .format("text") .option("path", "/tmp/data") .schema(schema) .load() val base = lines.writeStream .outputMode("append") .format("console") .start() {code} When a large file larger than the available driver memory is fed through this streaming job, we get: {code:java} --- Batch: 0 --- [Stage 0:>(0 + 8) / 111]17/08/11 15:10:45 ERROR Executor: Exception in task 6.0 in stage 0.0 (TID 6) java.lang.OutOfMemoryError: Java heap space at java.util.Arrays.copyOf(Arrays.java:3236) at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118) at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153) at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:205) at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:158) at java.io.DataOutputStream.write(DataOutputStream.java:107) at org.apache.spark.sql.catalyst.expressions.UnsafeRow.writeToStream(UnsafeRow.java:554) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:237) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:108) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748) 17/08/11 15:10:45 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker for task 6,5,main] java.lang.OutOfMemoryError: Java heap space {code} This issue can be traced back to a `collect` on the source `DataFrame`: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala#L52 A fairly simple solution would be to do a `take(numRows)` instead of the collect. (PR in progress) was: ConsoleSink does a full collect of the streaming dataset in order to show few lines on screen. This is problematic with large inputs, like a kafka backlog or a file source with files larger than the driver's memory. 
Here's an example: {code:scala} import spark.implicits._ import org.apache.spark.sql.functions import org.apache.spark.sql.types.StructType import org.apache.spark.sql.types._ val schema = StructType(StructField("text", StringType, true) :: Nil) val lines = spark .readStream .format("text") .option("path", "/tmp/data") .schema(schema) .load() val base = lines.writeStream .outputMode("append") .format("console") .start() {code} When a large file larger than the available driver memory is fed through this streaming job, we get: {code:java} --- Batch: 0 --- [Stage 0:>(0 + 8) / 111]17/08/11 15:10:45 ERROR Executor: Exception in task 6.0 in stage 0.0 (TID 6) java.lang.OutOfMemoryError: Java heap space at java.util.Arrays.copyOf(Arrays.java:3236) at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118) at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153) at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:205) at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:158) at java.io.DataOutputStream.write(DataOutputStream.java:107) at
[jira] [Created] (SPARK-21710) ConsoleSink causes OOM crashes with large inputs.
Gerard Maas created SPARK-21710: --- Summary: ConsoleSink causes OOM crashes with large inputs. Key: SPARK-21710 URL: https://issues.apache.org/jira/browse/SPARK-21710 Project: Spark Issue Type: Bug Components: Structured Streaming Affects Versions: 2.2.0 Environment: affects all environments Reporter: Gerard Maas ConsoleSink does a full collect of the streaming dataset in order to show few lines on screen. This is problematic with large inputs, like a kafka backlog or a file source with files larger than the driver's memory. Here's an example: {code:scala} import spark.implicits._ import org.apache.spark.sql.functions import org.apache.spark.sql.types.StructType import org.apache.spark.sql.types._ val schema = StructType(StructField("text", StringType, true) :: Nil) val lines = spark .readStream .format("text") .option("path", "/tmp/data") .schema(schema) .load() val base = lines.writeStream .outputMode("append") .format("console") .start() {code} When a large file larger than the available driver memory is fed through this streaming job, we get: {code:java} --- Batch: 0 --- [Stage 0:>(0 + 8) / 111]17/08/11 15:10:45 ERROR Executor: Exception in task 6.0 in stage 0.0 (TID 6) java.lang.OutOfMemoryError: Java heap space at java.util.Arrays.copyOf(Arrays.java:3236) at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118) at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153) at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:205) at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:158) at java.io.DataOutputStream.write(DataOutputStream.java:107) at org.apache.spark.sql.catalyst.expressions.UnsafeRow.writeToStream(UnsafeRow.java:554) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:237) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:108) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748) 17/08/11 15:10:45 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker for task 6,5,main] java.lang.OutOfMemoryError: Java heap space {code} This issue can be traced back to a `collect` on the source `DataFrame`: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala#L52 A fairly simple solution would be to do a `take(numRows)` instead of the collect. (PR in progress) -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-3958) Possible stream-corruption issues in TorrentBroadcast
[ https://issues.apache.org/jira/browse/SPARK-3958?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15114906#comment-15114906 ] Gerard Maas commented on SPARK-3958: [~pwendell] [~joshrosen] We just hit this bug in one of our production jobs using Spark Streaming 1.4.1. Each task spawned by the streaming job fails down the road. This jobs has been working fine for months, so I'm not clear on whether we can narrow down the conditions to reproduce it. Here's the exception: {code} [Stage 16049:(0 + 0) / 24][Stage 16056:(0 + 0) / 24][Stage 16058:(0 + 0) / 24]Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 23 in stage 17478.0 failed 6 times, most recent failure: Lost task 23.5 in stage 17478.0 (TID 172352, dnode-6.hdfs.private): java.io.IOException: PARSING_ERROR(2) at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84) at org.xerial.snappy.SnappyNative.uncompressedLength(Native Method) at org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:594) at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:358) at org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:387) at java.io.ObjectInputStream$PeekInputStream.peek(ObjectInputStream.java:2296) at java.io.ObjectInputStream$BlockDataInputStream.peek(ObjectInputStream.java:2589) at java.io.ObjectInputStream$BlockDataInputStream.peekByte(ObjectInputStream.java:2599) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1319) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69) at org.apache.spark.serializer.DeserializationStream.readKey(Serializer.scala:169) at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:200) at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:197) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:127) at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:91) at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:44) at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:70) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Driver stacktrace: at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457) at
[jira] [Created] (SPARK-8009) [Mesos] Allow provisioning of executor logging configuration
Gerard Maas created SPARK-8009: -- Summary: [Mesos] Allow provisioning of executor logging configuration Key: SPARK-8009 URL: https://issues.apache.org/jira/browse/SPARK-8009 Project: Spark Issue Type: Improvement Components: Mesos Affects Versions: 1.3.1 Environment: Mesos executor Reporter: Gerard Maas It's currently not possible to provide a custom logging configuration for the Mesos executors. Upon startup of the executor JVM, it loads a default config file from the Spark assembly, visible by this line in stderr: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties That line comes from Logging.scala [1] where a default config is loaded if none is found in the classpath upon the startup of the Spark Mesos executor in the Mesos sandbox. At that point in time, none of the application-specific resources have been shipped yet, as the executor JVM is just starting up. To load a custom configuration file we should have it already on the sandbox before the executor JVM starts and add it to the classpath on the startup command. For the classpath customization, It looks like it should be possible to pass a -Dlog4j.configuration property by using the 'spark.executor.extraClassPath' that will be picked up at [2] and that should be added to the command that starts the executor JVM, but the resource must be already on the host before we can do that. Therefore we need some means of 'shipping' the log4j.configuration file to the allocated executor. This all boils down to the need of shipping extra files to the sandbox. There's a workaround: open up the Spark assembly, replace the log4j-default.properties and pack it up again. That would work, although kind of rudimentary as people may use the same assembly for many jobs. Probably, accessing the log4j API programmatically should also work (we didn't try that yet) [1] https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Logging.scala#L128 [2] https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala#L77 -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
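For illustration, a hedged sketch of the configuration this would enable once the file can be shipped. Both keys are standard Spark settings; the path is hypothetical, and the log4j.properties file would have to be present on the executor host before the executor JVM starts, which is exactly the missing piece described above. A system property is typically passed via spark.executor.extraJavaOptions alongside the classpath entry.

{code:scala}
import org.apache.spark.SparkConf

// Hypothetical path: assumes /etc/spark-conf/log4j.properties already exists in the
// Mesos sandbox (or on the host) before the executor JVM starts.
val conf = new SparkConf()
  .set("spark.executor.extraClassPath", "/etc/spark-conf")
  .set("spark.executor.extraJavaOptions",
    "-Dlog4j.configuration=file:/etc/spark-conf/log4j.properties")
{code}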
[jira] [Commented] (SPARK-5095) Support launching multiple mesos executors in coarse grained mesos mode
[ https://issues.apache.org/jira/browse/SPARK-5095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14276934#comment-14276934 ] Gerard Maas commented on SPARK-5095: Great. I left you some comments on the docs. I'll try to give it a spin. Probably on Friday. Support launching multiple mesos executors in coarse grained mesos mode --- Key: SPARK-5095 URL: https://issues.apache.org/jira/browse/SPARK-5095 Project: Spark Issue Type: Improvement Components: Mesos Reporter: Timothy Chen Currently in coarse grained mesos mode, it's expected that we only launch one Mesos executor that launches one JVM process to launch multiple spark executors. However, this becomes a problem when the JVM process launched is larger than an ideal size (30gb is the recommended value from Databricks), which causes GC problems reported on the mailing list. We should support launching multiple executors when large enough resources are available for spark to use, and these resources are still under the configured limit. This is also applicable when users want to specify the number of executors to be launched on each node -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-5095) Support launching multiple mesos executors in coarse grained mesos mode
[ https://issues.apache.org/jira/browse/SPARK-5095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14271508#comment-14271508 ] Gerard Maas commented on SPARK-5095: If I understand correctly, setting `spark.mesos.coarse.executors.max = 1` will restrict allocation to a single executor per node, effectively forcing an allocation of spark.cores.max/spark.mesos.coarse.cores.max nodes. This indeed improves https://issues.apache.org/jira/browse/SPARK-4940 Support launching multiple mesos executors in coarse grained mesos mode --- Key: SPARK-5095 URL: https://issues.apache.org/jira/browse/SPARK-5095 Project: Spark Issue Type: Improvement Components: Mesos Reporter: Timothy Chen Currently in coarse grained mesos mode, it's expected that we only launch one Mesos executor that launches one JVM process to launch multiple spark executors. However, this becomes a problem when the JVM process launched is larger than an ideal size (30gb is the recommended value from Databricks), which causes GC problems reported on the mailing list. We should support launching multiple executors when large enough resources are available for spark to use, and these resources are still under the configured limit. This is also applicable when users want to specify the number of executors to be launched on each node -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
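A worked example of the allocation arithmetic in the comment above, using the job configuration discussed in SPARK-4940 and the keys proposed in the PR under review (proposed names, not released Spark settings):

{code:scala}
// Assumed/proposed settings, for illustration only:
//   spark.cores.max                  = 18
//   spark.mesos.coarse.cores.max     = 3   // cores per executor (proposed key)
//   spark.mesos.coarse.executors.max = 1   // at most one executor per node (proposed key)
// Expected allocation: 18 / 3 = 6 executors on 6 distinct nodes, each with a 3-core executor.
val expectedNodes = 18 / 3 // == 6
{code}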
[jira] [Comment Edited] (SPARK-4940) Support more evenly distributing cores for Mesos mode
[ https://issues.apache.org/jira/browse/SPARK-4940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14268440#comment-14268440 ] Gerard Maas edited comment on SPARK-4940 at 1/8/15 9:21 AM: Hi Tim, We are indeed using Coarse Grain mode. I'm not sure fine-grained mode makes much sense for Spark Streaming. Here're few examples of resource allocation. They are taken from several runs of the same job with identical configuration: Job config: spark.cores.max = 18 spark.mesos.coarse = true spark.executor.memory = 4g The job logic will start 6 Kafka receivers. #1 -- || Node || Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers || | 1 | 4 | 4GB | 3 | 2 | | 2 | 6 | 4GB | 2 | 1 | | 3 | 7 | 4GB | 3 | 2 | | 4 | 1 | 4GB | 1 | 1 | Total mem: 16 GB Total CPUs: 18 Observations: Node#4 with only 1 CPU and 1 Kafka receiver does not have capacity to process the received data, so all data received needs to be sent to other node for non-local processing (not sure how replication helps or not in this case, the blocks of data are processed on other nodes). Also the nodes with 2 streaming receivers have higher load that the node with 1 receiver. #2 -- || Node || Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers || | 1 | 7 | 4GB | 7 | 4 | | 2 | 2 | 4GB | 2 | 2 | Total mem: 8 GB Total CPUs: 9 Observations: This is the worst configuration of the day. Totally unbalanced (4 vs 2 receivers) and for some reason, the job didn't get all the resources assigned in the configuration. The job processing time is also slower as there're less cores to handle the data and less overall memory. #3 -- || Node || Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers || | 1 | 3 | 4GB | 3 | 2 | | 2 | 8 | 4GB | 2 | 2 | | 3 | 7 | 4GB | 3 | 2 | Total mem: 12GB Total CPU: 18 Observations: This is a fairly good configuration with a more evenly distributed receivers and CPUs although there's one considerable smaller node in terms of CPU assignment. We can observe that the current resource assignment policy results in less than ideal and in particular random assignments that have a strong impact on the job execution and performance. Given that CPU allocation is by executor (and not by job), makes total memory for the job variable as it can get 2 to 4 executors assigned. It's also weird and unexpected to observe less than max CPU allocations. Here's a performance chart of the same job jumping from one config to another (*): 3 nodes (left of the spike) and 2 nodes (right): !mesos-config-difference-3nodes-vs-2nodes.png! (chart line: processing time in ms, load is fairly constant, higher is worst. Note how the job performance is degraded) (*) for some reason we didn't find yet, Mesos often kills the job. When Marathon relaunches it, it results in a different resource assignment. was (Author: gmaas): Hi Tim, We are indeed using Coarse Grain mode. I'm not sure fine-grained mode makes much sense for Spark Streaming. Here're few examples of resource allocation. They are taken from several runs of the same job with identical configuration: Job config: spark.cores.max = 18 spark.mesos.coarse = true spark.executor.memory = 4g The job logic will start 6 Kafka receivers. 
#1 -- || Node || Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers || | 1 | 4 | 4GB | 3 | 2 | | 2 | 6 | 4GB | 2 | 1 | | 3 | 7 | 4GB | 3 | 2 | | 4 | 1 | 4GB | 1 | 1 | Total mem: 16 GB Total CPUs: 18 Observations: Node#4 with only 1 CPU and 1 Kafka receiver does not have capacity to process the received data, so all data received needs to be sent to other node for non-local processing (not sure how replication helps or not in this case, the blocks of data are processed on other nodes). Also the nodes with 2 streaming receivers have higher load that the node with 1 receiver. #2 -- || Node || Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers || | 1 | 7 | 4GB | 7 | 4 | | 2 | 2 | 4GB | 2 | 2 | Total mem: 8 GB Total CPUs: 9 Observations: This is the worst configuration of the day. Totally unbalanced (4 vs 2 receivers) and for some reason, the job didn't get all the resources assigned in the configuration. The job processing time is also slower as there're less cores to handle the data and less overall memory. #3 -- || Node || Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers || | 1 | 3 | 4GB | 3 | 2 | | 2 | 8 | 4GB | 2 | 2 | | 3 | 7 | 4GB | 3 | 2 | Total mem: 12GB Total CPU: 18 Observations: This is a fairly good configuration with a more evenly distributed receivers and CPUs although there's one considerable smaller node in terms of CPU assignment. We can observe that the current resource assignment policy results in less than ideal and in particular random assignments that have a strong impact on the job
[jira] [Comment Edited] (SPARK-4940) Support more evenly distributing cores for Mesos mode
[ https://issues.apache.org/jira/browse/SPARK-4940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14269038#comment-14269038 ] Gerard Maas edited comment on SPARK-4940 at 1/8/15 9:28 AM: I forgot to mention that, in the previous example, the ideal resource assignment would be: || Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers || | 3 | 4GB | 2 | 1 | | 3 | 4GB | 2 | 1 | | 3 | 4GB | 2 | 1 | | 3 | 4GB | 2 | 1 | | 3 | 4GB | 2 | 1 | | 3 | 4GB | 2 | 1 | Total resources: 18 CPUs, 24GB This is an evenly distributed number CPUs and receivers over physical nodes to maximize the spread of network receivers over nodes. was (Author: gmaas): I forgot to mention that in the previous example, the ideal resource assignment would be: || Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers || | 3 | 4GB | 2 | 1 | | 3 | 4GB | 2 | 1 | | 3 | 4GB | 2 | 1 | | 3 | 4GB | 2 | 1 | | 3 | 4GB | 2 | 1 | | 3 | 4GB | 2 | 1 | Total resources: 18 CPUs, 24GB This is an evenly distributed number CPUs and receivers over physical nodes to maximize the spread of network receivers over nodes. Support more evenly distributing cores for Mesos mode - Key: SPARK-4940 URL: https://issues.apache.org/jira/browse/SPARK-4940 Project: Spark Issue Type: Improvement Components: Mesos Reporter: Timothy Chen Attachments: mesos-config-difference-3nodes-vs-2nodes.png Currently in Coarse grain mode the spark scheduler simply takes all the resources it can on each node, but can cause uneven distribution based on resources available on each slave. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-4940) Support more evenly distributing cores for Mesos mode
[ https://issues.apache.org/jira/browse/SPARK-4940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14269038#comment-14269038 ] Gerard Maas commented on SPARK-4940: I forgot to mention that in the previous example, the ideal resource assignment would be: || Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers || | 3 | 4GB | 2 | 1 | | 3 | 4GB | 2 | 1 | | 3 | 4GB | 2 | 1 | | 3 | 4GB | 2 | 1 | | 3 | 4GB | 2 | 1 | | 3 | 4GB | 2 | 1 | Total resources: 18 CPUs, 24GB This is an evenly distributed number CPUs and receivers over physical nodes to maximize the spread of network receivers over nodes. Support more evenly distributing cores for Mesos mode - Key: SPARK-4940 URL: https://issues.apache.org/jira/browse/SPARK-4940 Project: Spark Issue Type: Improvement Components: Mesos Reporter: Timothy Chen Attachments: mesos-config-difference-3nodes-vs-2nodes.png Currently in Coarse grain mode the spark scheduler simply takes all the resources it can on each node, but can cause uneven distribution based on resources available on each slave. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-4940) Support more evenly distributing cores for Mesos mode
[ https://issues.apache.org/jira/browse/SPARK-4940?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gerard Maas updated SPARK-4940: --- Attachment: mesos-config-difference-3nodes-vs-2nodes.png Difference of job performance due to changing Mesos resource allocation. Support more evenly distributing cores for Mesos mode - Key: SPARK-4940 URL: https://issues.apache.org/jira/browse/SPARK-4940 Project: Spark Issue Type: Improvement Components: Mesos Reporter: Timothy Chen Attachments: mesos-config-difference-3nodes-vs-2nodes.png Currently in Coarse grain mode the spark scheduler simply takes all the resources it can on each node, but can cause uneven distribution based on resources available on each slave. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-4940) Support more evenly distributing cores for Mesos mode
[ https://issues.apache.org/jira/browse/SPARK-4940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14268440#comment-14268440 ] Gerard Maas commented on SPARK-4940: Hi Tim, We are indeed using Coarse Grain mode. I'm not sure fine-grained mode makes much sense for Spark Streaming. Here're few examples of resource allocation. They are taken from several runs of the same job with identical configuration: Job config: spark.cores.max = 18 spark.mesos.coarse = true spark.executor.memory = 4g The job logic will start 6 Kafka receivers. #1 -- || Node || Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers || | 1 | 4 | 4GB | 3 | 2 | | 2 | 6 | 4GB | 2 | 1 | | 3 | 7 | 4GB | 3 | 2 | | 4 | 1 | 4GB | 1 | 1 | Total mem: 16 GB Total CPUs: 18 Observations: Node#4 with only 1 CPU and 1 Kafka receiver does not have capacity to process the received data, so all data received needs to be sent to other node for non-local processing (not sure how replication helps or not in this case, the blocks of data are processed on other nodes). Also the nodes with 2 streaming receivers have higher load that the node with 1 receiver. #2 -- || Node || Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers || | 1 | 7 | 4GB | 7 | 4 | | 2 | 2 | 4GB | 2 | 2 | Total mem: 8 GB Total CPUs: 9 Observations: This is the worst configuration of the day. Totally unbalanced (4 vs 2 receivers) and for some reason, the job didn't get all the resources assigned in the configuration. The job processing time is also slower as there're less cores to handle the data and less overall memory. #3 -- || Node || Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers || | 1 | 3 | 4GB | 3 | 2 | | 2 | 8 | 4GB | 2 | 2 | | 3 | 7 | 4GB | 3 | 2 | Total mem: 12GB Total CPU: 18 Observations: This is a fairly good configuration with a more evenly distributed receivers and CPUs although there's one considerable smaller node in terms of CPU assignment. We can observe that the current resource assignment policy results in less than ideal and in particular random assignments that have a strong impact on the job execution and performance. Given that CPU allocation is by executor (and not by job), makes total memory for the job variable as it can get 2 to 4 executors assigned. It's also weird and unexpected to observe less than max CPU allocations. Here's a performance chart of the same job across two configurations, one with 3 (left) nodes and one with 2 (right): !https://lh3.googleusercontent.com/Z1C71OKoQzGA13uNJ8Yvf_xz_glRUqU_IGGvLsfkPvUPK2lahrEatweiWl-PDDfysjXtbs1Sl_k=w1682-h689! (chart line: processing time in ms, load is fairly constant) Support more evenly distributing cores for Mesos mode - Key: SPARK-4940 URL: https://issues.apache.org/jira/browse/SPARK-4940 Project: Spark Issue Type: Improvement Components: Mesos Reporter: Timothy Chen Currently in Coarse grain mode the spark scheduler simply takes all the resources it can on each node, but can cause uneven distribution based on resources available on each slave. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-4940) Support more evenly distributing cores for Mesos mode
[ https://issues.apache.org/jira/browse/SPARK-4940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14268440#comment-14268440 ] Gerard Maas edited comment on SPARK-4940 at 1/7/15 10:54 PM: - Hi Tim, We are indeed using Coarse Grain mode. I'm not sure fine-grained mode makes much sense for Spark Streaming. Here're few examples of resource allocation. They are taken from several runs of the same job with identical configuration: Job config: spark.cores.max = 18 spark.mesos.coarse = true spark.executor.memory = 4g The job logic will start 6 Kafka receivers. #1 -- || Node || Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers || | 1 | 4 | 4GB | 3 | 2 | | 2 | 6 | 4GB | 2 | 1 | | 3 | 7 | 4GB | 3 | 2 | | 4 | 1 | 4GB | 1 | 1 | Total mem: 16 GB Total CPUs: 18 Observations: Node#4 with only 1 CPU and 1 Kafka receiver does not have capacity to process the received data, so all data received needs to be sent to other node for non-local processing (not sure how replication helps or not in this case, the blocks of data are processed on other nodes). Also the nodes with 2 streaming receivers have higher load that the node with 1 receiver. #2 -- || Node || Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers || | 1 | 7 | 4GB | 7 | 4 | | 2 | 2 | 4GB | 2 | 2 | Total mem: 8 GB Total CPUs: 9 Observations: This is the worst configuration of the day. Totally unbalanced (4 vs 2 receivers) and for some reason, the job didn't get all the resources assigned in the configuration. The job processing time is also slower as there're less cores to handle the data and less overall memory. #3 -- || Node || Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers || | 1 | 3 | 4GB | 3 | 2 | | 2 | 8 | 4GB | 2 | 2 | | 3 | 7 | 4GB | 3 | 2 | Total mem: 12GB Total CPU: 18 Observations: This is a fairly good configuration with a more evenly distributed receivers and CPUs although there's one considerable smaller node in terms of CPU assignment. We can observe that the current resource assignment policy results in less than ideal and in particular random assignments that have a strong impact on the job execution and performance. Given that CPU allocation is by executor (and not by job), makes total memory for the job variable as it can get 2 to 4 executors assigned. It's also weird and unexpected to observe less than max CPU allocations. Here's a performance chart of the same job jumping from one config to another (*): 3 nodes (left of the spike) and 2 nodes (right): !https://lh3.googleusercontent.com/Z1C71OKoQzGA13uNJ8Yvf_xz_glRUqU_IGGvLsfkPvUPK2lahrEatweiWl-PDDfysjXtbs1Sl_k=w1682-h689! (chart line: processing time in ms, load is fairly constant, higher is worst. Note how the job performance is degraded) (*) for some reason we didn't find yet, Mesos often kills the job. When Marathon relaunches it, it results in a different resource assignment. was (Author: gmaas): Hi Tim, We are indeed using Coarse Grain mode. I'm not sure fine-grained mode makes much sense for Spark Streaming. Here're few examples of resource allocation. They are taken from several runs of the same job with identical configuration: Job config: spark.cores.max = 18 spark.mesos.coarse = true spark.executor.memory = 4g The job logic will start 6 Kafka receivers. 
#1 -- || Node || Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers || | 1 | 4 | 4GB | 3 | 2 | | 2 | 6 | 4GB | 2 | 1 | | 3 | 7 | 4GB | 3 | 2 | | 4 | 1 | 4GB | 1 | 1 | Total mem: 16 GB Total CPUs: 18 Observations: Node#4 with only 1 CPU and 1 Kafka receiver does not have capacity to process the received data, so all data received needs to be sent to other node for non-local processing (not sure how replication helps or not in this case, the blocks of data are processed on other nodes). Also the nodes with 2 streaming receivers have higher load that the node with 1 receiver. #2 -- || Node || Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers || | 1 | 7 | 4GB | 7 | 4 | | 2 | 2 | 4GB | 2 | 2 | Total mem: 8 GB Total CPUs: 9 Observations: This is the worst configuration of the day. Totally unbalanced (4 vs 2 receivers) and for some reason, the job didn't get all the resources assigned in the configuration. The job processing time is also slower as there're less cores to handle the data and less overall memory. #3 -- || Node || Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers || | 1 | 3 | 4GB | 3 | 2 | | 2 | 8 | 4GB | 2 | 2 | | 3 | 7 | 4GB | 3 | 2 | Total mem: 12GB Total CPU: 18 Observations: This is a fairly good configuration with a more evenly distributed receivers and CPUs although there's one considerable smaller node in terms of CPU assignment. We can observe that the current resource assignment policy results in less than ideal
[jira] [Comment Edited] (SPARK-4940) Support more evenly distributing cores for Mesos mode
[ https://issues.apache.org/jira/browse/SPARK-4940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14268440#comment-14268440 ] Gerard Maas edited comment on SPARK-4940 at 1/7/15 10:53 PM: - Hi Tim, We are indeed using Coarse Grain mode. I'm not sure fine-grained mode makes much sense for Spark Streaming. Here're few examples of resource allocation. They are taken from several runs of the same job with identical configuration: Job config: spark.cores.max = 18 spark.mesos.coarse = true spark.executor.memory = 4g The job logic will start 6 Kafka receivers. #1 -- || Node || Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers || | 1 | 4 | 4GB | 3 | 2 | | 2 | 6 | 4GB | 2 | 1 | | 3 | 7 | 4GB | 3 | 2 | | 4 | 1 | 4GB | 1 | 1 | Total mem: 16 GB Total CPUs: 18 Observations: Node#4 with only 1 CPU and 1 Kafka receiver does not have capacity to process the received data, so all data received needs to be sent to other node for non-local processing (not sure how replication helps or not in this case, the blocks of data are processed on other nodes). Also the nodes with 2 streaming receivers have higher load that the node with 1 receiver. #2 -- || Node || Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers || | 1 | 7 | 4GB | 7 | 4 | | 2 | 2 | 4GB | 2 | 2 | Total mem: 8 GB Total CPUs: 9 Observations: This is the worst configuration of the day. Totally unbalanced (4 vs 2 receivers) and for some reason, the job didn't get all the resources assigned in the configuration. The job processing time is also slower as there're less cores to handle the data and less overall memory. #3 -- || Node || Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers || | 1 | 3 | 4GB | 3 | 2 | | 2 | 8 | 4GB | 2 | 2 | | 3 | 7 | 4GB | 3 | 2 | Total mem: 12GB Total CPU: 18 Observations: This is a fairly good configuration with a more evenly distributed receivers and CPUs although there's one considerable smaller node in terms of CPU assignment. We can observe that the current resource assignment policy results in less than ideal and in particular random assignments that have a strong impact on the job execution and performance. Given that CPU allocation is by executor (and not by job), makes total memory for the job variable as it can get 2 to 4 executors assigned. It's also weird and unexpected to observe less than max CPU allocations. Here's a performance chart of the same job jumping from one config to another (*), one with 3 (left) nodes and one with 2 (right): !https://lh3.googleusercontent.com/Z1C71OKoQzGA13uNJ8Yvf_xz_glRUqU_IGGvLsfkPvUPK2lahrEatweiWl-PDDfysjXtbs1Sl_k=w1682-h689! (chart line: processing time in ms, load is fairly constant) (*) for some reason we didn't find yet, Mesos often kills the job. When Marathon relaunches it, it results in a different resource assignment. was (Author: gmaas): Hi Tim, We are indeed using Coarse Grain mode. I'm not sure fine-grained mode makes much sense for Spark Streaming. Here're few examples of resource allocation. They are taken from several runs of the same job with identical configuration: Job config: spark.cores.max = 18 spark.mesos.coarse = true spark.executor.memory = 4g The job logic will start 6 Kafka receivers. 
#1 -- || Node || Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers || | 1 | 4 | 4GB | 3 | 2 | | 2 | 6 | 4GB | 2 | 1 | | 3 | 7 | 4GB | 3 | 2 | | 4 | 1 | 4GB | 1 | 1 | Total mem: 16 GB Total CPUs: 18 Observations: Node#4 with only 1 CPU and 1 Kafka receiver does not have capacity to process the received data, so all data received needs to be sent to other node for non-local processing (not sure how replication helps or not in this case, the blocks of data are processed on other nodes). Also the nodes with 2 streaming receivers have higher load that the node with 1 receiver. #2 -- || Node || Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers || | 1 | 7 | 4GB | 7 | 4 | | 2 | 2 | 4GB | 2 | 2 | Total mem: 8 GB Total CPUs: 9 Observations: This is the worst configuration of the day. Totally unbalanced (4 vs 2 receivers) and for some reason, the job didn't get all the resources assigned in the configuration. The job processing time is also slower as there're less cores to handle the data and less overall memory. #3 -- || Node || Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers || | 1 | 3 | 4GB | 3 | 2 | | 2 | 8 | 4GB | 2 | 2 | | 3 | 7 | 4GB | 3 | 2 | Total mem: 12GB Total CPU: 18 Observations: This is a fairly good configuration with a more evenly distributed receivers and CPUs although there's one considerable smaller node in terms of CPU assignment. We can observe that the current resource assignment policy results in less than ideal and in particular random assignments that have a strong impact on
[jira] [Comment Edited] (SPARK-4940) Support more evenly distributing cores for Mesos mode
[ https://issues.apache.org/jira/browse/SPARK-4940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14268440#comment-14268440 ] Gerard Maas edited comment on SPARK-4940 at 1/7/15 10:52 PM: - Hi Tim, We are indeed using Coarse Grain mode. I'm not sure fine-grained mode makes much sense for Spark Streaming. Here're few examples of resource allocation. They are taken from several runs of the same job with identical configuration: Job config: spark.cores.max = 18 spark.mesos.coarse = true spark.executor.memory = 4g The job logic will start 6 Kafka receivers. #1 -- || Node || Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers || | 1 | 4 | 4GB | 3 | 2 | | 2 | 6 | 4GB | 2 | 1 | | 3 | 7 | 4GB | 3 | 2 | | 4 | 1 | 4GB | 1 | 1 | Total mem: 16 GB Total CPUs: 18 Observations: Node#4 with only 1 CPU and 1 Kafka receiver does not have capacity to process the received data, so all data received needs to be sent to other node for non-local processing (not sure how replication helps or not in this case, the blocks of data are processed on other nodes). Also the nodes with 2 streaming receivers have higher load that the node with 1 receiver. #2 -- || Node || Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers || | 1 | 7 | 4GB | 7 | 4 | | 2 | 2 | 4GB | 2 | 2 | Total mem: 8 GB Total CPUs: 9 Observations: This is the worst configuration of the day. Totally unbalanced (4 vs 2 receivers) and for some reason, the job didn't get all the resources assigned in the configuration. The job processing time is also slower as there're less cores to handle the data and less overall memory. #3 -- || Node || Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers || | 1 | 3 | 4GB | 3 | 2 | | 2 | 8 | 4GB | 2 | 2 | | 3 | 7 | 4GB | 3 | 2 | Total mem: 12GB Total CPU: 18 Observations: This is a fairly good configuration with a more evenly distributed receivers and CPUs although there's one considerable smaller node in terms of CPU assignment. We can observe that the current resource assignment policy results in less than ideal and in particular random assignments that have a strong impact on the job execution and performance. Given that CPU allocation is by executor (and not by job), makes total memory for the job variable as it can get 2 to 4 executors assigned. It's also weird and unexpected to observe less than max CPU allocations. Here's a performance chart of the same job jumping from one config to another (*), one with 3 (left) nodes and one with 2 (right): !https://lh3.googleusercontent.com/Z1C71OKoQzGA13uNJ8Yvf_xz_glRUqU_IGGvLsfkPvUPK2lahrEatweiWl-PDDfysjXtbs1Sl_k=w1682-h689! (chart line: processing time in ms, load is fairly constant) (*) for some reason we didn't find yet, Mesos often kills the job. When Marathon relaunches it, it results in a different resource assignment. was (Author: gmaas): Hi Tim, We are indeed using Coarse Grain mode. I'm not sure fine-grained mode makes much sense for Spark Streaming. Here're few examples of resource allocation. They are taken from several runs of the same job with identical configuration: Job config: spark.cores.max = 18 spark.mesos.coarse = true spark.executor.memory = 4g The job logic will start 6 Kafka receivers. 
#1 -- || Node || Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers || | 1 | 4 | 4GB | 3 | 2 | | 2 | 6 | 4GB | 2 | 1 | | 3 | 7 | 4GB | 3 | 2 | | 4 | 1 | 4GB | 1 | 1 | Total mem: 16 GB Total CPUs: 18 Observations: Node#4 with only 1 CPU and 1 Kafka receiver does not have capacity to process the received data, so all data received needs to be sent to other node for non-local processing (not sure how replication helps or not in this case, the blocks of data are processed on other nodes). Also the nodes with 2 streaming receivers have higher load that the node with 1 receiver. #2 -- || Node || Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers || | 1 | 7 | 4GB | 7 | 4 | | 2 | 2 | 4GB | 2 | 2 | Total mem: 8 GB Total CPUs: 9 Observations: This is the worst configuration of the day. Totally unbalanced (4 vs 2 receivers) and for some reason, the job didn't get all the resources assigned in the configuration. The job processing time is also slower as there're less cores to handle the data and less overall memory. #3 -- || Node || Mesos CPU || Mesos Mem || Spark tasks || Streaming receivers || | 1 | 3 | 4GB | 3 | 2 | | 2 | 8 | 4GB | 2 | 2 | | 3 | 7 | 4GB | 3 | 2 | Total mem: 12GB Total CPU: 18 Observations: This is a fairly good configuration with a more evenly distributed receivers and CPUs although there's one considerable smaller node in terms of CPU assignment. We can observe that the current resource assignment policy results in less than ideal and in particular random assignments that have a strong impact on
[jira] [Commented] (SPARK-4940) Document or Support more evenly distributing cores for Mesos mode
[ https://issues.apache.org/jira/browse/SPARK-4940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14264579#comment-14264579 ] Gerard Maas commented on SPARK-4940: From the perspective of evenly allocating Spark Streaming consumers (network-bound), the ideal solution would be to explicitly set the number of hosts. With the current resource allocation policy, we can have e.g. (4),(1),(1) consumers over 3 hosts, instead of the ideal (2),(2),(2). Given that the resource allocation is dynamic at job startup time, this results in variable performance characteristics for the job being submitted. In practice, we have been restarting the job (using Marathon) until we get a favorable resource allocation. Not sure how well the requirement of a fixed number of executors would fit with the node transparency offered by Mesos. I'm just trying to elaborate on the requirements from the Spark Streaming job perspective. Document or Support more evenly distributing cores for Mesos mode - Key: SPARK-4940 URL: https://issues.apache.org/jira/browse/SPARK-4940 Project: Spark Issue Type: Improvement Components: Mesos Reporter: Timothy Chen Currently in Coarse grain mode the spark scheduler simply takes all the resources it can on each node, but can cause uneven distribution based on resources available on each slave. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-4537) Add 'processing delay' and 'totalDelay' to the metrics reported by the Spark Streaming subsystem
[ https://issues.apache.org/jira/browse/SPARK-4537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14225945#comment-14225945 ] Gerard Maas commented on SPARK-4537: I was waiting for internal clearance to put a PR out, but this is probably much faster. Thanks. Add 'processing delay' and 'totalDelay' to the metrics reported by the Spark Streaming subsystem Key: SPARK-4537 URL: https://issues.apache.org/jira/browse/SPARK-4537 Project: Spark Issue Type: Improvement Components: Streaming Affects Versions: 1.1.0 Reporter: Gerard Maas Labels: metrics As the Spark Streaming tuning guide indicates, the key indicators of a healthy streaming job are: - Processing Time - Total Delay The Spark UI page for the Streaming job [1] shows these two indicators but the metrics source for Spark Streaming (StreamingSource.scala) [2] does not. Adding these metrics will allow external monitoring systems that consume the Spark metrics interface to track these two critical pieces of information on a streaming job performance. [1] https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala#L127 [2] https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-4537) Add 'processing delay' and 'totalDelay' to the metrics reported by the Spark Streaming subsystem
Gerard Maas created SPARK-4537: -- Summary: Add 'processing delay' and 'totalDelay' to the metrics reported by the Spark Streaming subsystem Key: SPARK-4537 URL: https://issues.apache.org/jira/browse/SPARK-4537 Project: Spark Issue Type: Improvement Components: Streaming Affects Versions: 1.1.0 Reporter: Gerard Maas As the Spark Streaming tuning guide indicates, the key indicators of a healthy streaming job are: - Processing Time - Total Delay The Spark UI page for the Streaming job [1] shows these two indicators but the metrics source for Spark Streaming (StreamingSource.scala) [2] does not. Adding these metrics will allow external monitoring systems that consume the Spark metrics interface to track these two critical pieces of information on a streaming job performance. [1] https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala#L127 [2] https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
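A hedged sketch of how the two indicators could be exposed, following the gauge style already used by Spark's metric sources; the listener accessor and metric names are illustrative (the streaming listener is package-private, so an actual change would live inside StreamingSource itself):

{code:scala}
import com.codahale.metrics.{Gauge, MetricRegistry}
import org.apache.spark.streaming.ui.StreamingJobProgressListener

// Illustrative sketch: report the last completed batch's processing delay and total delay,
// falling back to -1 while no batch has completed yet.
def registerDelayGauges(registry: MetricRegistry, listener: StreamingJobProgressListener): Unit = {
  registry.register(MetricRegistry.name("streaming", "lastCompletedBatch_processingDelay"),
    new Gauge[Long] {
      override def getValue: Long =
        listener.lastCompletedBatch.flatMap(_.processingDelay).getOrElse(-1L)
    })
  registry.register(MetricRegistry.name("streaming", "lastCompletedBatch_totalDelay"),
    new Gauge[Long] {
      override def getValue: Long =
        listener.lastCompletedBatch.flatMap(_.totalDelay).getOrElse(-1L)
    })
}
{code}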
[jira] [Created] (SPARK-2620) case class cannot be used as key for reduce
Gerard Maas created SPARK-2620: -- Summary: case class cannot be used as key for reduce Key: SPARK-2620 URL: https://issues.apache.org/jira/browse/SPARK-2620 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.0.0 Environment: reproduced on spark-shell local[4] Reporter: Gerard Maas Priority: Critical Using a case class as a key doesn't seem to work properly on Spark 1.0.0 A minimal example: case class P(name:String) val ps = Array(P("alice"), P("bob"), P("charly"), P("bob")) sc.parallelize(ps).map(x => (x,1)).reduceByKey((x,y) => x+y).collect [Spark shell local mode] res : Array[(P, Int)] = Array((P(bob),1), (P(bob),1), (P(abe),1), (P(charly),1)) In contrast to the expected behavior, that should be equivalent to: sc.parallelize(ps).map(x => (x.name,1)).reduceByKey((x,y) => x+y).collect Array[(String, Int)] = Array((charly,1), (abe,1), (bob,2)) groupByKey and distinct also present the same behavior. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-2620) case class cannot be used as key for reduce
[ https://issues.apache.org/jira/browse/SPARK-2620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14070443#comment-14070443 ] Gerard Maas commented on SPARK-2620: [~sowen] No, doesn't look like it is. case class cannot be used as key for reduce --- Key: SPARK-2620 URL: https://issues.apache.org/jira/browse/SPARK-2620 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.0.0 Environment: reproduced on spark-shell local[4] Reporter: Gerard Maas Priority: Critical Labels: case-class, core Using a case class as a key doesn't seem to work properly on Spark 1.0.0 A minimal example: case class P(name:String) val ps = Array(P("alice"), P("bob"), P("charly"), P("bob")) sc.parallelize(ps).map(x => (x,1)).reduceByKey((x,y) => x+y).collect [Spark shell local mode] res : Array[(P, Int)] = Array((P(bob),1), (P(bob),1), (P(abe),1), (P(charly),1)) In contrast to the expected behavior, that should be equivalent to: sc.parallelize(ps).map(x => (x.name,1)).reduceByKey((x,y) => x+y).collect Array[(String, Int)] = Array((charly,1), (abe,1), (bob,2)) groupByKey and distinct also present the same behavior. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Created] (SPARK-1985) SPARK_HOME shouldn't be required when spark.executor.uri is provided
Gerard Maas created SPARK-1985: -- Summary: SPARK_HOME shouldn't be required when spark.executor.uri is provided Key: SPARK-1985 URL: https://issues.apache.org/jira/browse/SPARK-1985 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.0.0 Environment: MESOS Reporter: Gerard Maas When trying to run that simple example on a Mesos installation, I get an error that SPARK_HOME is not set. A local spark installation should not be required to run a job on Mesos. All that's needed is the executor package, being the assembly.tar.gz on a reachable location (HDFS/S3/HTTP). I went looking into the code and indeed there's a check on SPARK_HOME [2] regardless of the presence of the assembly but it's actually only used if the assembly is not provided (which is a kind-of best-effort recovery strategy). Current flow: if (!SPARK_HOME) fail(No SPARK_HOME) else if (assembly) { use assembly) } else { try use SPARK_HOME to build spark_executor } Should be: sparkExecutor = if (assembly) {assembly} else if (SPARK_HOME) {try use SPARK_HOME to build spark_executor} else { fail(No executor found. Please provide spark.executor.uri (preferred) or spark.home) [1] http://apache-spark-user-list.1001560.n3.nabble.com/ClassNotFoundException-with-Spark-Mesos-spark-shell-works-fine-td6165.html [2] https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala#L89 -- This message was sent by Atlassian JIRA (v6.2#6252)
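A hedged Scala sketch of the "Should be" flow above (illustrative only, not the MesosSchedulerBackend code): prefer the assembly from spark.executor.uri, fall back to a local installation, and fail only when neither is available.

{code:scala}
// Illustrative resolution order; the actual executor command construction is elided.
def resolveExecutor(executorUri: Option[String], sparkHome: Option[String]): String =
  (executorUri, sparkHome) match {
    case (Some(uri), _)     => s"use the assembly at $uri"
    case (None, Some(home)) => s"build the executor command from the local installation at $home"
    case (None, None)       => sys.error(
      "No executor found. Please provide spark.executor.uri (preferred) or spark.home")
  }
{code}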