[jira] [Created] (SPARK-49189) pyspark auto batching serialization leads to job crash
Nan Zhu created SPARK-49189:
---

Summary: pyspark auto batching serialization leads to job crash
Key: SPARK-49189
URL: https://issues.apache.org/jira/browse/SPARK-49189
Project: Spark
Issue Type: Bug
Components: PySpark
Affects Versions: 3.5.1, 3.5.0, 3.4.1, 3.4.0, 3.3.0, 3.2.0
Reporter: Nan Zhu

The auto-batching mechanism of serialization can lead to a job crash in PySpark.

The logic increases the batch size whenever it can: [https://github.com/apache/spark/blob/master/python/pyspark/serializers.py#L269-L285]

However, this logic is vulnerable to the situation where the total size of the objects in a single serialized batch is larger than 2G, and as a result we are hit by the following error:

```
  File "/databricks/spark/python/pyspark/worker.py", line 1876, in main
    process()
  File "/databricks/spark/python/pyspark/worker.py", line 1868, in process
    serializer.dump_stream(out_iter, outfile)
  File "/databricks/spark/python/pyspark/serializers.py", line 308, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/databricks/spark/python/pyspark/serializers.py", line 158, in dump_stream
    self._write_with_length(obj, stream)
  File "/databricks/spark/python/pyspark/serializers.py", line 172, in _write_with_length
    raise ValueError("can not serialize object larger than 2G")
ValueError: can not serialize object larger than 2G
```

-- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-44517) first operator should respect the nullability of child expression as well as ignoreNulls option
Nan Zhu created SPARK-44517:
---

Summary: first operator should respect the nullability of child expression as well as ignoreNulls option
Key: SPARK-44517
URL: https://issues.apache.org/jira/browse/SPARK-44517
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 3.4.1, 3.4.0, 3.3.2, 3.2.4, 3.2.3, 3.3.1, 3.2.2, 3.3.0, 3.2.1, 3.2.0
Reporter: Nan Zhu

I found the following problem when using Spark recently:

{code:java}
import spark.implicits._
import org.apache.spark.sql.types._

val s = Seq((1.2, "s", 2.2)).toDF("v1", "v2", "v3")
val schema = StructType(Seq(
  StructField("v1", DoubleType, nullable = false),
  StructField("v2", StringType, nullable = true),
  StructField("v3", DoubleType, nullable = false)))
val df = spark.createDataFrame(s.rdd, schema)
val inputDF = df.dropDuplicates("v3")

spark.sql("CREATE TABLE local.db.table (\n v1 DOUBLE NOT NULL,\n v2 STRING, v3 DOUBLE NOT NULL)")
inputDF.write.mode("overwrite").format("iceberg").save("local.db.table")
{code}

When I use the above code to write to Iceberg (I guess Delta Lake will have the same problem), I get a very confusing exception:

{code:java}
Exception in thread "main" java.lang.IllegalArgumentException: Cannot write incompatible dataset to table with schema:
table {
  1: v1: required double
  2: v2: optional string
  3: v3: required double
}
Provided schema:
table {
  1: v1: optional double
  2: v2: optional string
  3: v3: required double
}
{code}

Basically it complains that v1 is a nullable column in our `inputDF` above, which is not allowed since we created the table with v1 as not nullable. The confusing part is that if we check the schema of inputDF with printSchema(), v1 is not nullable:

{noformat}
root
 |-- v1: double (nullable = false)
 |-- v2: string (nullable = true)
 |-- v3: double (nullable = false)
{noformat}

Clearly, something changed v1's nullability unexpectedly! After some debugging I found that the key is dropDuplicates("v3"). In the optimization phase, ReplaceDeduplicateWithAggregate replaces the Deduplicate with an aggregate on v3 that runs first() over all other columns. However, the first() operator hard-codes nullable to always be "true", which is the source of the changed nullability of v1.

This is a very confusing behavior of Spark, and probably no one really noticed it before because nullability mattered little without the new table formats like Delta Lake and Iceberg, which enforce the nullability check correctly. Now that users adopt them more and more, the issue has surfaced.

-- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
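For illustration, a minimal self-contained sketch of how the nullability change can be observed without writing to a table: the analyzed schema still reports v1 as non-nullable, while the optimized plan (after ReplaceDeduplicateWithAggregate rewrites dropDuplicates into first()) reports it as nullable on affected versions. The local SparkSession setup is only an assumption for a standalone run.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

object FirstNullabilityRepro {
  def main(args: Array[String]): Unit = {
    // Local session only for a standalone repro; any SparkSession works.
    val spark = SparkSession.builder().master("local[*]").appName("SPARK-44517").getOrCreate()
    import spark.implicits._

    val s = Seq((1.2, "s", 2.2)).toDF("v1", "v2", "v3")
    val schema = StructType(Seq(
      StructField("v1", DoubleType, nullable = false),
      StructField("v2", StringType, nullable = true),
      StructField("v3", DoubleType, nullable = false)))
    val inputDF = spark.createDataFrame(s.rdd, schema).dropDuplicates("v3")

    // Analyzed schema: v1 is still reported as nullable = false.
    inputDF.printSchema()

    // Optimized plan schema: on affected versions v1 flips to nullable = true,
    // because first(v1) is hard-coded as nullable.
    inputDF.queryExecution.optimizedPlan.schema.printTreeString()

    spark.stop()
  }
}
```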
[jira] [Created] (SPARK-33940) allow configuring the max column name length in csv writer
Nan Zhu created SPARK-33940:
---

Summary: allow configuring the max column name length in csv writer
Key: SPARK-33940
URL: https://issues.apache.org/jira/browse/SPARK-33940
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 3.1.0
Reporter: Nan Zhu

The CSV writer actually has an implicit limit on column name length due to univocity-parser. When we initialize a writer ([https://github.com/uniVocity/univocity-parsers/blob/e09114c6879fa6c2c15e7365abc02cda3e193ff7/src/main/java/com/univocity/parsers/common/AbstractWriter.java#L211]), it calls toIdentifierGroupArray, which eventually calls valueOf in NormalizedString.java ([https://github.com/uniVocity/univocity-parsers/blob/e09114c6879fa6c2c15e7365abc02cda3e193ff7/src/main/java/com/univocity/parsers/common/NormalizedString.java#L205-L209]). In that stringCache.get there is a maxStringLength cap ([https://github.com/uniVocity/univocity-parsers/blob/e09114c6879fa6c2c15e7365abc02cda3e193ff7/src/main/java/com/univocity/parsers/common/StringCache.java#L104]), which is 1024 by default.

We do not expose this as a configurable option, which leads to an NPE when a column name is longer than 1024 characters:

```
[info] Cause: java.lang.NullPointerException:
[info] at com.univocity.parsers.common.AbstractWriter.submitRow(AbstractWriter.java:349)
[info] at com.univocity.parsers.common.AbstractWriter.writeHeaders(AbstractWriter.java:444)
[info] at com.univocity.parsers.common.AbstractWriter.writeHeaders(AbstractWriter.java:410)
[info] at org.apache.spark.sql.catalyst.csv.UnivocityGenerator.writeHeaders(UnivocityGenerator.scala:87)
[info] at org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter$.writeHeaders(CsvOutputWriter.scala:58)
[info] at org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.<init>(CsvOutputWriter.scala:44)
[info] at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anon$1.newInstance(CSVFileFormat.scala:86)
[info] at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:126)
[info] at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:111)
[info] at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:269)
[info] at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:210)
```

It can be reproduced by a simple unit test (written against a test suite where spark.implicits._ is in scope and dataPath is a temporary output directory):

```
val row1 = Row("a")
val superLongHeader = (0 until 1025).map(_ => "c").mkString("")
val df = Seq(s"${row1.getString(0)}").toDF(superLongHeader)
df.repartition(1)
  .write
  .option("header", "true")
  .option("maxColumnNameLength", 1025)
  .csv(dataPath)
```

-- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-32351) Partially pushed partition filters are not explained
[ https://issues.apache.org/jira/browse/SPARK-32351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17217238#comment-17217238 ] Nan Zhu commented on SPARK-32351: - [~hyukjin.kwon] nit: could you reassign this to me? [~codingcat] ;) > Partially pushed partition filters are not explained > > > Key: SPARK-32351 > URL: https://issues.apache.org/jira/browse/SPARK-32351 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.1.0 >Reporter: Yuming Wang >Assignee: pavithra ramachandran >Priority: Major > Fix For: 3.1.0 > > > How to reproduce this issue: > {code:scala} > spark.sql( > s""" > |CREATE TABLE t(i INT, p STRING) > |USING parquet > |PARTITIONED BY (p)""".stripMargin) > spark.range(0, 1000).selectExpr("id as col").createOrReplaceTempView("temp") > for (part <- Seq(1, 2, 3, 4)) { > sql(s""" > |INSERT OVERWRITE TABLE t PARTITION (p='$part') > |SELECT col FROM temp""".stripMargin) > } > spark.sql("SELECT * FROM t WHERE WHERE (p = '1' AND i = 1) OR (p = '2' and i > = 2)").explain > {code} > We have pushed down {{p = '1' or p = '2'}} since SPARK-28169, but this pushed > down filter not in explain > {noformat} > == Physical Plan == > *(1) Filter (((p#21 = 1) AND (i#20 = 1)) OR ((p#21 = 2) AND (i#20 = 2))) > +- *(1) ColumnarToRow >+- FileScan parquet default.t[i#20,p#21] Batched: true, DataFilters: [], > Format: Parquet, Location: > InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-32289/sql/core/spark-warehouse/org.apache.spark..., > PartitionFilters: [], PushedFilters: [], ReadSchema: struct > {noformat} -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-26862) assertion failed in ParquetRowConverter
[ https://issues.apache.org/jira/browse/SPARK-26862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nan Zhu resolved SPARK-26862. - Resolution: Invalid > assertion failed in ParquetRowConverter > --- > > Key: SPARK-26862 > URL: https://issues.apache.org/jira/browse/SPARK-26862 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.0 >Reporter: Nan Zhu >Priority: Major > > When I run the following query over a internal table (A and B are typed in > string, C is Array[String]) > ``` > spark.read.table("table1") > .select(col("A"), col("B"), col("C")) > .withColumn("row_num", > row_number().over(Window.partitionBy(col("A")).orderBy(col("B").desc))) > ``` > I received error as > > ``` > org.apache.spark.SparkException: Job aborted due to stage failure: Task 730 > in stage 12.0 failed 4 times, most recent failure: Lost task 730.3 in stage > 12.0 (TID 2468, hadoopworker650-dca1.prod.uber.internal, executor 88): > java.lang.AssertionError: assertion failed > at scala.Predef$.assert(Predef.scala:156) > at > org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$ParquetArrayConverter.(ParquetRowConverter.scala:514) > at > org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.org$apache$spark$sql$execution$datasources$parquet$ParquetRowConverter$$newConverter(ParquetRowConverter.scala:318) > at > org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$$anonfun$7.apply(ParquetRowConverter.scala:190) > at > org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$$anonfun$7.apply(ParquetRowConverter.scala:185) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.(ParquetRowConverter.scala:185) > at > org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.org$apache$spark$sql$execution$datasources$parquet$ParquetRowConverter$$newConverter(ParquetRowConverter.scala:324) > at > org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$$anonfun$7.apply(ParquetRowConverter.scala:190) > at > org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$$anonfun$7.apply(ParquetRowConverter.scala:185) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.(ParquetRowConverter.scala:185) > at > org.apache.spark.sql.execution.datasources.parquet.ParquetRecordMaterializer.(ParquetRecordMaterializer.scala:43) > at > org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport.prepareForRead(ParquetReadSupport.scala:126) > at > 
org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:204) > at > org.apache.parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:182) > at > org.apache.parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:140) > at > org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReaderWithPartitionValues$1.apply(ParquetFileFormat.scala:452) > at > org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReaderWithPartitionValues$1.apply(ParquetFileFormat.scala:364) > at > org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:124) > at > org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177) > at > org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1
[jira] [Commented] (SPARK-26862) assertion failed in ParquetRowConverter
[ https://issues.apache.org/jira/browse/SPARK-26862?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16766355#comment-16766355 ] Nan Zhu commented on SPARK-26862: - [~srowen] I don't think so, as the same parquet files can be accessed by 2.3.2, I checked the execution plan of query, the bottom part is the same ...confused on how a windowing operation would make difference in data source access > assertion failed in ParquetRowConverter > --- > > Key: SPARK-26862 > URL: https://issues.apache.org/jira/browse/SPARK-26862 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.0 >Reporter: Nan Zhu >Priority: Major > > When I run the following query over a internal table (A and B are typed in > string, C is Array[String]) > ``` > spark.read.table("table1") > .select(col("A"), col("B"), col("C")) > .withColumn("row_num", > row_number().over(Window.partitionBy(col("A")).orderBy(col("B").desc))) > ``` > I received error as > > ``` > org.apache.spark.SparkException: Job aborted due to stage failure: Task 730 > in stage 12.0 failed 4 times, most recent failure: Lost task 730.3 in stage > 12.0 (TID 2468, hadoopworker650-dca1.prod.uber.internal, executor 88): > java.lang.AssertionError: assertion failed > at scala.Predef$.assert(Predef.scala:156) > at > org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$ParquetArrayConverter.(ParquetRowConverter.scala:514) > at > org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.org$apache$spark$sql$execution$datasources$parquet$ParquetRowConverter$$newConverter(ParquetRowConverter.scala:318) > at > org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$$anonfun$7.apply(ParquetRowConverter.scala:190) > at > org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$$anonfun$7.apply(ParquetRowConverter.scala:185) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.(ParquetRowConverter.scala:185) > at > org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.org$apache$spark$sql$execution$datasources$parquet$ParquetRowConverter$$newConverter(ParquetRowConverter.scala:324) > at > org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$$anonfun$7.apply(ParquetRowConverter.scala:190) > at > org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$$anonfun$7.apply(ParquetRowConverter.scala:185) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.(ParquetRowConverter.scala:185) > at > 
org.apache.spark.sql.execution.datasources.parquet.ParquetRecordMaterializer.(ParquetRecordMaterializer.scala:43) > at > org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport.prepareForRead(ParquetReadSupport.scala:126) > at > org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:204) > at > org.apache.parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:182) > at > org.apache.parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:140) > at > org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReaderWithPartitionValues$1.apply(ParquetFileFormat.scala:452) > at > org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReaderWithPartitionValues$1.apply(ParquetFileFormat.scala:364) > at > org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:124) > at > org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177) > at > org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101) > at > org.apache.spark.sql.catalyst
[jira] [Updated] (SPARK-26862) assertion failed in ParquetRowConverter
[ https://issues.apache.org/jira/browse/SPARK-26862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nan Zhu updated SPARK-26862: Description: When I run the following query over a internal table (A and B are typed in string, C is Array[String]) ``` spark.read.table("table1") .select(col("A"), col("B"), col("C")) .withColumn("row_num", row_number().over(Window.partitionBy(col("A")).orderBy(col("B").desc))) ``` I received error as ``` org.apache.spark.SparkException: Job aborted due to stage failure: Task 730 in stage 12.0 failed 4 times, most recent failure: Lost task 730.3 in stage 12.0 (TID 2468, hadoopworker650-dca1.prod.uber.internal, executor 88): java.lang.AssertionError: assertion failed at scala.Predef$.assert(Predef.scala:156) at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$ParquetArrayConverter.(ParquetRowConverter.scala:514) at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.org$apache$spark$sql$execution$datasources$parquet$ParquetRowConverter$$newConverter(ParquetRowConverter.scala:318) at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$$anonfun$7.apply(ParquetRowConverter.scala:190) at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$$anonfun$7.apply(ParquetRowConverter.scala:185) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.(ParquetRowConverter.scala:185) at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.org$apache$spark$sql$execution$datasources$parquet$ParquetRowConverter$$newConverter(ParquetRowConverter.scala:324) at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$$anonfun$7.apply(ParquetRowConverter.scala:190) at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$$anonfun$7.apply(ParquetRowConverter.scala:185) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.(ParquetRowConverter.scala:185) at org.apache.spark.sql.execution.datasources.parquet.ParquetRecordMaterializer.(ParquetRecordMaterializer.scala:43) at org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport.prepareForRead(ParquetReadSupport.scala:126) at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:204) at org.apache.parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:182) at org.apache.parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:140) at 
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReaderWithPartitionValues$1.apply(ParquetFileFormat.scala:452) at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReaderWithPartitionValues$1.apply(ParquetFileFormat.scala:364) at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:124) at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177) at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:622) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55) at org.apache.spark.scheduler.Task.run(Task.scala:121) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$1
[jira] [Commented] (SPARK-26862) assertion failed in ParquetRowConverter
[ https://issues.apache.org/jira/browse/SPARK-26862?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16766327#comment-16766327 ] Nan Zhu commented on SPARK-26862: - [~felixcheung] > assertion failed in ParquetRowConverter > --- > > Key: SPARK-26862 > URL: https://issues.apache.org/jira/browse/SPARK-26862 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.0 >Reporter: Nan Zhu >Priority: Major > > When I run the following query over a internal table (A and B are typed in > string, C is Array[String]) > ``` > spark.read.table("table1") > .select(col("A"), col("B"), col("C")) > .withColumn("row_num", > row_number().over(Window.partitionBy(col("A")).orderBy(col("B").desc))) > ``` > I received error as > > ``` > org.apache.spark.SparkException: Job aborted due to stage failure: Task 730 > in stage 12.0 failed 4 times, most recent failure: Lost task 730.3 in stage > 12.0 (TID 2468, hadoopworker650-dca1.prod.uber.internal, executor 88): > java.lang.AssertionError: assertion failed > at scala.Predef$.assert(Predef.scala:156) > at > org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$ParquetArrayConverter.(ParquetRowConverter.scala:514) > at > org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.org$apache$spark$sql$execution$datasources$parquet$ParquetRowConverter$$newConverter(ParquetRowConverter.scala:318) > at > org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$$anonfun$7.apply(ParquetRowConverter.scala:190) > at > org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$$anonfun$7.apply(ParquetRowConverter.scala:185) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.(ParquetRowConverter.scala:185) > at > org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.org$apache$spark$sql$execution$datasources$parquet$ParquetRowConverter$$newConverter(ParquetRowConverter.scala:324) > at > org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$$anonfun$7.apply(ParquetRowConverter.scala:190) > at > org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$$anonfun$7.apply(ParquetRowConverter.scala:185) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.(ParquetRowConverter.scala:185) > at > org.apache.spark.sql.execution.datasources.parquet.ParquetRecordMaterializer.(ParquetRecordMaterializer.scala:43) > at > org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport.prepareForRead(ParquetReadSupport.scala:126) > at > 
org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:204) > at > org.apache.parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:182) > at > org.apache.parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:140) > at > org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReaderWithPartitionValues$1.apply(ParquetFileFormat.scala:452) > at > org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReaderWithPartitionValues$1.apply(ParquetFileFormat.scala:364) > at > org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:124) > at > org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177) > at > org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
[jira] [Created] (SPARK-26862) assertion failed in ParquetRowConverter
Nan Zhu created SPARK-26862: --- Summary: assertion failed in ParquetRowConverter Key: SPARK-26862 URL: https://issues.apache.org/jira/browse/SPARK-26862 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.4.0 Reporter: Nan Zhu When I run the following query over a internal table (A and B are typed in string, C is Array[String]) ``` spark.read.table("table1") .select(col("A"), col("B"), col("C")) .withColumn("row_num", row_number().over(Window.partitionBy(col("A")).orderBy(col("B").desc))) ``` I received error as ``` org.apache.spark.SparkException: Job aborted due to stage failure: Task 730 in stage 12.0 failed 4 times, most recent failure: Lost task 730.3 in stage 12.0 (TID 2468, hadoopworker650-dca1.prod.uber.internal, executor 88): java.lang.AssertionError: assertion failed at scala.Predef$.assert(Predef.scala:156) at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$ParquetArrayConverter.(ParquetRowConverter.scala:514) at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.org$apache$spark$sql$execution$datasources$parquet$ParquetRowConverter$$newConverter(ParquetRowConverter.scala:318) at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$$anonfun$7.apply(ParquetRowConverter.scala:190) at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$$anonfun$7.apply(ParquetRowConverter.scala:185) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.(ParquetRowConverter.scala:185) at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.org$apache$spark$sql$execution$datasources$parquet$ParquetRowConverter$$newConverter(ParquetRowConverter.scala:324) at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$$anonfun$7.apply(ParquetRowConverter.scala:190) at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$$anonfun$7.apply(ParquetRowConverter.scala:185) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.(ParquetRowConverter.scala:185) at org.apache.spark.sql.execution.datasources.parquet.ParquetRecordMaterializer.(ParquetRecordMaterializer.scala:43) at org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport.prepareForRead(ParquetReadSupport.scala:126) at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:204) at org.apache.parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:182) at org.apache.parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:140) at 
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReaderWithPartitionValues$1.apply(ParquetFileFormat.scala:452) at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReaderWithPartitionValues$1.apply(ParquetFileFormat.scala:364) at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:124) at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177) at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:622) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scal
[jira] [Resolved] (SPARK-24797) Analyzer should respect spark.sql.hive.convertMetastoreOrc/Parquet when build the data source table
[ https://issues.apache.org/jira/browse/SPARK-24797?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nan Zhu resolved SPARK-24797. - Resolution: Won't Fix > Analyzer should respect spark.sql.hive.convertMetastoreOrc/Parquet when build > the data source table > --- > > Key: SPARK-24797 > URL: https://issues.apache.org/jira/browse/SPARK-24797 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0, 2.3.1 >Reporter: Nan Zhu >Priority: Major > > the current code path ignore the value of > spark.sql.hive.convertMetastoreParquet when building data source table > > [https://github.com/apache/spark/blob/e0559f238009e02c40f65678fec691c07904e8c0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala#L263] > > as a result, even I turned off spark.sql.hive.convertMetastoreParquet, Spark > SQL still uses its own parquet reader to access table instead of delegate to > serder -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-24797) Analyzer should respect spark.sql.hive.convertMetastoreOrc/Parquet when build the data source table
Nan Zhu created SPARK-24797:
---

Summary: Analyzer should respect spark.sql.hive.convertMetastoreOrc/Parquet when build the data source table
Key: SPARK-24797
URL: https://issues.apache.org/jira/browse/SPARK-24797
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 2.3.1, 2.3.0
Reporter: Nan Zhu

The current code path ignores the value of spark.sql.hive.convertMetastoreParquet when building the data source table: [https://github.com/apache/spark/blob/e0559f238009e02c40f65678fec691c07904e8c0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala#L263]

As a result, even if I turn off spark.sql.hive.convertMetastoreParquet, Spark SQL still uses its own Parquet reader to access the table instead of delegating to the SerDe.

-- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-22599) Avoid extra reading for cached table
[ https://issues.apache.org/jira/browse/SPARK-22599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16303022#comment-16303022 ] Nan Zhu commented on SPARK-22599: - [~rajesh.balamohan] no, it means that SPARK-22599 and master are running with in-memory data parquet means that it reads parquet files from the storage system directly > Avoid extra reading for cached table > > > Key: SPARK-22599 > URL: https://issues.apache.org/jira/browse/SPARK-22599 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.2.0 >Reporter: Nan Zhu > > In the current implementation of Spark, InMemoryTableExec read all data in a > cached table, filter CachedBatch according to stats and pass data to the > downstream operators. This implementation makes it inefficient to reside the > whole table in memory to serve various queries against different partitions > of the table, which occupies a certain portion of our users' scenarios. > The following is an example of such a use case: > store_sales is a 1TB-sized table in cloud storage, which is partitioned by > 'location'. The first query, Q1, wants to output several metrics A, B, C for > all stores in all locations. After that, a small team of 3 data scientists > wants to do some causal analysis for the sales in different locations. To > avoid unnecessary I/O and parquet/orc parsing overhead, they want to cache > the whole table in memory in Q1. > With the current implementation, even any one of the data scientists is only > interested in one out of three locations, the queries they submit to Spark > cluster is still reading 1TB data completely. > The reason behind the extra reading operation is that we implement > CachedBatch as > {code} > case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: > InternalRow) > {code} > where "stats" is a part of every CachedBatch, so we can only filter batches > for output of InMemoryTableExec operator by reading all data in in-memory > table as input. The extra reading would be even more unacceptable when some > of the table's data is evicted to disks. > We propose to introduce a new type of block, metadata block, for the > partitions of RDD representing data in the cached table. Every metadata block > contains stats info for all columns in a partition and is saved to > BlockManager when executing compute() method for the partition. To minimize > the number of bytes to read, > More details can be found in design > doc:https://docs.google.com/document/d/1DSiP3ej7Wd2cWUPVrgqAtvxbSlu5_1ZZB6m_2t8_95Q/edit?usp=sharing > performance test results: > Environment: 6 Executors, each of which has 16 cores 90G memory > dataset: 1T TPCDS data > queries: tested 4 queries (Q19, Q46, Q34, Q27) in > https://github.com/databricks/spark-sql-perf/blob/c2224f37e50628c5c8691be69414ec7f5a3d919a/src/main/scala/com/databricks/spark/sql/perf/tpcds/ImpalaKitQueries.scala > results: > https://docs.google.com/spreadsheets/d/1A20LxqZzAxMjW7ptAJZF4hMBaHxKGk3TBEQoAJXfzCI/edit?usp=sharing -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
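To make the proposal quoted above more concrete, here is a rough sketch (type and field names are illustrative, not taken from the design doc) of separating per-partition statistics from the data blocks, so that pruning can be done by reading only small metadata blocks:

```scala
import org.apache.spark.sql.catalyst.InternalRow

// Today: stats travel together with the (potentially very large) serialized
// column buffers, so filtering by stats requires reading the data itself.
case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)

// Proposed direction: a lightweight per-partition metadata block, stored in the
// BlockManager when the partition is computed, holding one stats row per batch.
// The in-memory scan could fetch only these blocks to decide which partitions
// (and batches) actually need to be read.
case class CachedPartitionMetadata(
    partitionId: Int,
    numBatches: Int,
    batchStats: Seq[InternalRow])
```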
[jira] [Commented] (SPARK-22765) Create a new executor allocation scheme based on that of MR
[ https://issues.apache.org/jira/browse/SPARK-22765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16297453#comment-16297453 ] Nan Zhu commented on SPARK-22765: -

I took a look at the code; one possibility is the following. We add the new executor id (https://github.com/apache/spark/blob/a233fac0b8bf8229d938a24f2ede2d9d8861c284/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L598) and immediately filter idle executors (https://github.com/apache/spark/blob/a233fac0b8bf8229d938a24f2ede2d9d8861c284/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L419).

Right after this step, since executorIdToTaskIds does not yet contain the executor id, the newly added executor goes through onExecutorIdle (https://github.com/apache/spark/blob/a233fac0b8bf8229d938a24f2ede2d9d8861c284/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L479). Inside onExecutorIdle, the newly added executor is also vulnerable to being assigned a removeTimes value, since neither removeTimes nor executorsPendingToRemove contains the newly added executor id. Based on the calculation method, the removeTime ends up being 60s later, so we remove the executor after 60s (https://github.com/apache/spark/blob/a233fac0b8bf8229d938a24f2ede2d9d8861c284/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L266).

The potential fix is to update executorIdToTaskIds before we filter idle executors (but SPARK-21656 should prevent this from happening... still trying to figure out why SPARK-21656 is not working here). A simplified, illustrative sketch of this sequence follows the quoted description below.

> Create a new executor allocation scheme based on that of MR > --- > > Key: SPARK-22765 > URL: https://issues.apache.org/jira/browse/SPARK-22765 > Project: Spark > Issue Type: Improvement > Components: Scheduler >Affects Versions: 1.6.0 >Reporter: Xuefu Zhang > > Many users migrating their workload from MR to Spark find a significant > resource consumption hike (i.e, SPARK-22683). While this might not be a > concern for users that are more performance centric, for others conscious > about cost, such hike creates a migration obstacle. This situation can get > worse as more users are moving to cloud. > Dynamic allocation make it possible for Spark to be deployed in multi-tenant > environment. With its performance-centric design, its inefficiency has also > unfortunately shown up, especially when compared with MR. Thus, it's believed > that MR-styled scheduler still has its merit. Based on our research, the > inefficiency associated with dynamic allocation comes in many aspects such as > executor idling out, bigger executors, many stages (rather than 2 stages only > in MR) in a spark job, etc. > Rather than fine tuning dynamic allocation for efficiency, the proposal here > is to add a new, efficiency-centric scheduling scheme based on that of MR. > Such a MR-based scheme can be further enhanced and be more adapted to Spark > execution model. This alternative is expected to offer good performance > improvement (compared to MR) still with similar to or even better efficiency > than MR. > Inputs are greatly welcome! -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
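The following is a simplified, self-contained sketch of the sequence described in the comment above. It is not the real ExecutorAllocationManager code; the field and method names merely mirror it to show how a just-registered executor, absent from executorIdToTaskIds, can immediately get a removal time scheduled 60s out:

```scala
import scala.collection.mutable

object IdleTimeoutRace {
  private val idleTimeoutMs = 60 * 1000L
  private val executorIdToTaskIds = mutable.Map.empty[String, mutable.Set[Long]]
  private val removeTimes = mutable.Map.empty[String, Long]
  private val executorsPendingToRemove = mutable.Set.empty[String]

  def onExecutorAdded(executorId: String): Unit = {
    // Registration happens first; the idle filter runs right afterwards,
    // before any task for this executor is recorded in executorIdToTaskIds.
    onExecutorIdle(executorId)
  }

  def onExecutorIdle(executorId: String): Unit = {
    // No tasks known and not already scheduled for removal -> schedule removal.
    if (!executorIdToTaskIds.contains(executorId) &&
        !removeTimes.contains(executorId) &&
        !executorsPendingToRemove.contains(executorId)) {
      removeTimes(executorId) = System.currentTimeMillis() + idleTimeoutMs
    }
  }

  def main(args: Array[String]): Unit = {
    onExecutorAdded("exec-1")
    // The brand-new executor already has a removal deadline 60s in the future.
    println(s"exec-1 removal scheduled at: ${removeTimes.get("exec-1")}")
  }
}
```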
[jira] [Commented] (SPARK-22765) Create a new executor allocation scheme based on that of MR
[ https://issues.apache.org/jira/browse/SPARK-22765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16295958#comment-16295958 ] Nan Zhu commented on SPARK-22765: - [~xuefuz] Regarding this, "The symptom is that newly allocated executors are idled out before completing a single tasks! I suspected that this is caused by a busy scheduler. As a result, we have to keep 60s as a minimum." did you try to tune the parameters like 1. spark.driver.cores to assign more threads to the driver, 2. "spark.locality.wait.process", "spark.locality.wait.node", "spark.locality.wait.rack" to let executors get filled in a faster pace? > Create a new executor allocation scheme based on that of MR > --- > > Key: SPARK-22765 > URL: https://issues.apache.org/jira/browse/SPARK-22765 > Project: Spark > Issue Type: Improvement > Components: Scheduler >Affects Versions: 1.6.0 >Reporter: Xuefu Zhang > > Many users migrating their workload from MR to Spark find a significant > resource consumption hike (i.e, SPARK-22683). While this might not be a > concern for users that are more performance centric, for others conscious > about cost, such hike creates a migration obstacle. This situation can get > worse as more users are moving to cloud. > Dynamic allocation make it possible for Spark to be deployed in multi-tenant > environment. With its performance-centric design, its inefficiency has also > unfortunately shown up, especially when compared with MR. Thus, it's believed > that MR-styled scheduler still has its merit. Based on our research, the > inefficiency associated with dynamic allocation comes in many aspects such as > executor idling out, bigger executors, many stages (rather than 2 stages only > in MR) in a spark job, etc. > Rather than fine tuning dynamic allocation for efficiency, the proposal here > is to add a new, efficiency-centric scheduling scheme based on that of MR. > Such a MR-based scheme can be further enhanced and be more adapted to Spark > execution model. This alternative is expected to offer good performance > improvement (compared to MR) still with similar to or even better efficiency > than MR. > Inputs are greatly welcome! -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-21656) spark dynamic allocation should not idle timeout executors when there are enough tasks to run on them
[ https://issues.apache.org/jira/browse/SPARK-21656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16295282#comment-16295282 ] Nan Zhu commented on SPARK-21656: - NOTE: the issue fixed by https://github.com/apache/spark/pull/18874 > spark dynamic allocation should not idle timeout executors when there are > enough tasks to run on them > - > > Key: SPARK-21656 > URL: https://issues.apache.org/jira/browse/SPARK-21656 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.1 >Reporter: Jong Yoon Lee >Assignee: Jong Yoon Lee > Fix For: 2.2.1, 2.3.0 > > Original Estimate: 24h > Remaining Estimate: 24h > > Right now with dynamic allocation spark starts by getting the number of > executors it needs to run all the tasks in parallel (or the configured > maximum) for that stage. After it gets that number it will never reacquire > more unless either an executor dies, is explicitly killed by yarn or it goes > to the next stage. The dynamic allocation manager has the concept of idle > timeout. Currently this says if a task hasn't been scheduled on that executor > for a configurable amount of time (60 seconds by default), then let that > executor go. Note when it lets that executor go due to the idle timeout it > never goes back to see if it should reacquire more. > This is a problem for multiple reasons: > 1 . Things can happen in the system that are not expected that can cause > delays. Spark should be resilient to these. If the driver is GC'ing, you have > network delays, etc we could idle timeout executors even though there are > tasks to run on them its just the scheduler hasn't had time to start those > tasks. Note that in the worst case this allows the number of executors to go > to 0 and we have a deadlock. > 2. Internal Spark components have opposing requirements. The scheduler has a > requirement to try to get locality, the dynamic allocation doesn't know about > this and if it lets the executors go it hurts the scheduler from doing what > it was designed to do. For example the scheduler first tries to schedule > node local, during this time it can skip scheduling on some executors. After > a while though the scheduler falls back from node local to scheduler on rack > local, and then eventually on any node. So during when the scheduler is > doing node local scheduling, the other executors can idle timeout. This > means that when the scheduler does fall back to rack or any locality where it > would have used those executors, we have already let them go and it can't > scheduler all the tasks it could which can have a huge negative impact on job > run time. > > In both of these cases when the executors idle timeout we never go back to > check to see if we need more executors (until the next stage starts). In the > worst case you end up with 0 and deadlock, but generally this shows itself by > just going down to very few executors when you could have 10's of thousands > of tasks to run on them, which causes the job to take way more time (in my > case I've seen it should take minutes and it takes hours due to only been > left a few executors). > We should handle these situations in Spark. The most straight forward > approach would be to not allow the executors to idle timeout when there are > tasks that could run on those executors. This would allow the scheduler to do > its job with locality scheduling. 
In doing this it also fixes number 1 above > because you never can go into a deadlock as it will keep enough executors to > run all the tasks on. > There are other approaches to fix this, like explicitly prevent it from going > to 0 executors, that prevents a deadlock but can still cause the job to > slowdown greatly. We could also change it at some point to just re-check to > see if we should get more executors, but this adds extra logic, we would have > to decide when to check, its also just overhead in letting them go and then > re-acquiring them again and this would cause some slowdown in the job as the > executors aren't immediately there for the scheduler to place things on. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-22790) add a configurable factor to describe HadoopFsRelation's size
Nan Zhu created SPARK-22790:
---

Summary: add a configurable factor to describe HadoopFsRelation's size
Key: SPARK-22790
URL: https://issues.apache.org/jira/browse/SPARK-22790
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 2.2.0
Reporter: Nan Zhu

As per the discussion in https://github.com/apache/spark/pull/19864#discussion_r156847927, the current HadoopFsRelation size estimate is purely based on the underlying file size, which is not accurate and makes the execution vulnerable to errors like OOM.

Users can enable CBO with the functionality in https://github.com/apache/spark/pull/19864 to avoid this issue.

This JIRA proposes to add a configurable factor to the sizeInBytes method in the HadoopFsRelation class so that users can mitigate this problem without CBO.

-- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
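A rough sketch of the idea (class shape, wiring, and names are illustrative rather than the eventual implementation): scale the raw size reported by the FileIndex by a user-configurable factor before it feeds into planning decisions such as broadcast-join selection.

```scala
import org.apache.spark.sql.execution.datasources.FileIndex

// Illustrative only: the real HadoopFsRelation reads its configuration from the
// session state; here the factor is passed in directly to keep the sketch small.
class SizeEstimateWithFactor(location: FileIndex, sizeFactor: Double) {
  // location.sizeInBytes is the on-disk (often compressed/columnar) size, which
  // can badly underestimate the in-memory footprint of the relation.
  def sizeInBytes: Long = (location.sizeInBytes * sizeFactor).toLong
}
```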
[jira] [Commented] (SPARK-22790) add a configurable factor to describe HadoopFsRelation's size
[ https://issues.apache.org/jira/browse/SPARK-22790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291985#comment-16291985 ] Nan Zhu commented on SPARK-22790: - created per discussion in https://github.com/apache/spark/pull/19864 (will file a PR soon) > add a configurable factor to describe HadoopFsRelation's size > - > > Key: SPARK-22790 > URL: https://issues.apache.org/jira/browse/SPARK-22790 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.2.0 >Reporter: Nan Zhu > > as per discussion in > https://github.com/apache/spark/pull/19864#discussion_r156847927 > the current HadoopFsRelation is purely based on the underlying file size > which is not accurate and makes the execution vulnerable to errors like OOM > Users can enable CBO with the functionalities in > https://github.com/apache/spark/pull/19864 to avoid this issue > This JIRA proposes to add a configurable factor to sizeInBytes method in > HadoopFsRelation class so that users can mitigate this problem without CBO -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-22680) SparkSQL scan all partitions when the specified partitions are not exists in parquet formatted table
[ https://issues.apache.org/jira/browse/SPARK-22680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16282248#comment-16282248 ] Nan Zhu commented on SPARK-22680: -

How did you observe that Spark scans all partitions? I tried to reproduce this but had no luck.

Table structure:
{code}
zhunan@sparktest:~/testdata1$ ls
count1=1  count1=2  count1=3  _SUCCESS
{code}

Query: select * from table1 where count1 = 4

I can see the log shows:
{code}
17/12/07 17:57:51 INFO PrunedInMemoryFileIndex: Selected 0 partitions out of 0, pruned 0 partitions.
17/12/07 17:57:51 TRACE PrunedInMemoryFileIndex: Selected files after partition pruning: No file was selected
{code}

> SparkSQL scan all partitions when the specified partitions are not exists in > parquet formatted table > > > Key: SPARK-22680 > URL: https://issues.apache.org/jira/browse/SPARK-22680 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.2, 2.2.0 > Environment: spark2.0.2 spark2.2.0 >Reporter: Xiaochen Ouyang > > 1. spark-sql --master local[2] > 2. create external table test (id int,name string) partitioned by (country > string,province string, day string,hour int) stored as parquet localtion > '/warehouse/test'; > 3.produce data into table test > 4. select count(1) from test where country = '185' and province = '021' and > day = '2017-11-12' and hour = 10; if the 4 filter conditions are not exists > in HDFS and MetaStore[mysql] , this sql will scan all partitions in table test -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-22673) InMemoryRelation should utilize on-disk table stats whenever possible
Nan Zhu created SPARK-22673:
---

Summary: InMemoryRelation should utilize on-disk table stats whenever possible
Key: SPARK-22673
URL: https://issues.apache.org/jira/browse/SPARK-22673
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 2.2.0
Reporter: Nan Zhu

The current implementation of InMemoryRelation always uses the most expensive execution plan when writing the cache.

With CBO enabled, we can actually have a more exact estimation of the underlying table size...

-- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
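For context, a minimal example (the table name is illustrative, and a `spark` session is assumed to be in scope as in the other snippets here) of collecting the on-disk statistics that this ticket proposes InMemoryRelation should take advantage of:

```scala
// Collect table-level statistics; ANALYZE TABLE is standard Spark SQL and the
// resulting size/row-count estimates are what CBO works from.
spark.sql("ANALYZE TABLE store_sales COMPUTE STATISTICS")

// The collected statistics are visible in the catalog output; a stats-aware
// InMemoryRelation could reuse them instead of a worst-case estimate.
spark.sql("DESCRIBE EXTENDED store_sales").show(truncate = false)
```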
[jira] [Updated] (SPARK-22599) Avoid extra reading for cached table
[ https://issues.apache.org/jira/browse/SPARK-22599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nan Zhu updated SPARK-22599: Description: In the current implementation of Spark, InMemoryTableExec reads all data in a cached table, filters CachedBatches according to their stats, and passes the data to the downstream operators. This implementation makes it inefficient to keep the whole table in memory in order to serve various queries against different partitions of the table, which covers a fair portion of our users' scenarios. The following is an example of such a use case: store_sales is a 1TB table in cloud storage, partitioned by 'location'. The first query, Q1, outputs several metrics A, B, C for all stores in all locations. After that, a small team of 3 data scientists wants to do some causal analysis of the sales in different locations. To avoid unnecessary I/O and parquet/orc parsing overhead, they cache the whole table in memory in Q1. With the current implementation, even if any one of the data scientists is only interested in one out of three locations, the queries they submit to the Spark cluster still read the full 1TB of data. The reason for the extra reading is that we implement CachedBatch as {code} case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow) {code} where "stats" is a part of every CachedBatch, so we can only filter batches for the output of the InMemoryTableExec operator by reading all of the data in the in-memory table as input. The extra reading becomes even more unacceptable when some of the table's data has been evicted to disk. We propose to introduce a new type of block, a metadata block, for the partitions of the RDD representing the data in the cached table. Every metadata block contains the stats for all columns in a partition and is saved to the BlockManager when the partition's compute() method is executed, so that batch filtering can consult only the metadata blocks and thereby minimize the number of bytes to read. More details can be found in the design doc: https://docs.google.com/document/d/1DSiP3ej7Wd2cWUPVrgqAtvxbSlu5_1ZZB6m_2t8_95Q/edit?usp=sharing Performance test results: Environment: 6 executors, each with 16 cores and 90G memory; dataset: 1TB TPC-DS data; queries: tested 4 queries (Q19, Q46, Q34, Q27) from https://github.com/databricks/spark-sql-perf/blob/c2224f37e50628c5c8691be69414ec7f5a3d919a/src/main/scala/com/databricks/spark/sql/perf/tpcds/ImpalaKitQueries.scala results: https://docs.google.com/spreadsheets/d/1A20LxqZzAxMjW7ptAJZF4hMBaHxKGk3TBEQoAJXfzCI/edit?usp=sharing was: In the current implementation of Spark, InMemoryTableExec read all data in a cached table, filter CachedBatch according to stats and pass data to the downstream operators. This implementation makes it inefficient to reside the whole table in memory to serve various queries against different partitions of the table, which occupies a certain portion of our users' scenarios. The following is an example of such a use case: store_sales is a 1TB-sized table in cloud storage, which is partitioned by 'location'. The first query, Q1, wants to output several metrics A, B, C for all stores in all locations. After that, a small team of 3 data scientists wants to do some causal analysis for the sales in different locations. To avoid unnecessary I/O and parquet/orc parsing overhead, they want to cache the whole table in memory in Q1. With the current implementation, even any one of the data scientists is only interested in one out of three locations, the queries they submit to Spark cluster is still reading 1TB data completely. The reason behind the extra reading operation is that we implement CachedBatch as {code} case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow) {code} where "stats" is a part of every CachedBatch, so we can only filter batches for output of InMemoryTableExec operator by reading all data in in-memory table as input. The extra reading would be even more unacceptable when some of the table's data is evicted to disks. We propose to introduce a new type of block, metadata block, for the partitions of RDD representing data in the cached table. Every metadata block contains stats info for all columns in a partition and is saved to BlockManager when executing compute() method for the partition. To minimize the number of bytes to read, More details can be found in design doc:https://docs.google.com/document/d/1DSiP3ej7Wd2cWUPVrgqAtvxbSlu5_1ZZB6m_2t8_95Q/edit?usp=sharing > Avoid extra reading for cached table > > > Key: SPARK-22599 > URL: https://issues.apache.org/jira/browse/SPARK-22599 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.2.0 >Reporter: Nan Zhu > > In the current implementation of Spark, InMemoryTableExec read all
[jira] [Updated] (SPARK-22599) Avoid extra reading for cached table
[ https://issues.apache.org/jira/browse/SPARK-22599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nan Zhu updated SPARK-22599: Description: In the current implementation of Spark, InMemoryTableExec reads all data in a cached table, filters CachedBatches according to their stats, and passes the data to the downstream operators. This implementation makes it inefficient to keep the whole table in memory in order to serve various queries against different partitions of the table, which covers a fair portion of our users' scenarios. The following is an example of such a use case: store_sales is a 1TB table in cloud storage, partitioned by 'location'. The first query, Q1, outputs several metrics A, B, C for all stores in all locations. After that, a small team of 3 data scientists wants to do some causal analysis of the sales in different locations. To avoid unnecessary I/O and parquet/orc parsing overhead, they cache the whole table in memory in Q1. With the current implementation, even if any one of the data scientists is only interested in one out of three locations, the queries they submit to the Spark cluster still read the full 1TB of data. The reason for the extra reading is that we implement CachedBatch as {code} case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow) {code} where "stats" is a part of every CachedBatch, so we can only filter batches for the output of the InMemoryTableExec operator by reading all of the data in the in-memory table as input. The extra reading becomes even more unacceptable when some of the table's data has been evicted to disk. We propose to introduce a new type of block, a metadata block, for the partitions of the RDD representing the data in the cached table. Every metadata block contains the stats for all columns in a partition and is saved to the BlockManager when the partition's compute() method is executed, so that batch filtering can consult only the metadata blocks and thereby minimize the number of bytes to read. More details can be found in the design doc: https://docs.google.com/document/d/1DSiP3ej7Wd2cWUPVrgqAtvxbSlu5_1ZZB6m_2t8_95Q/edit?usp=sharing was: In the current implementation of Spark, InMemoryTableExec read all data in a cached table, filter CachedBatch according to stats and pass data to the downstream operators. This implementation makes it inefficient to reside the whole table in memory to serve various queries against different partitions of the table, which occupies a certain portion of our users' scenarios. The following is an example of such a use case: store_sales is a 1TB-sized table in cloud storage, which is partitioned by 'location'. The first query, Q1, wants to output several metrics A, B, C for all stores in all locations. After that, a small team of 3 data scientists wants to do some causal analysis for the sales in different locations. To avoid unnecessary I/O and parquet/orc parsing overhead, they want to cache the whole table in memory in Q1. With the current implementation, even any one of the data scientists is only interested in one out of three locations, the queries they submit to Spark cluster is still reading 1TB data completely. The reason behind the extra reading operation is that we implement CachedBatch as {code:scala} case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow) {code} where "stats" is a part of every CachedBatch, so we can only filter batches for output of InMemoryTableExec operator by reading all data in in-memory table as input. The extra reading would be even more unacceptable when some of the table's data is evicted to disks. We propose to introduce a new type of block, metadata block, for the partitions of RDD representing data in the cached table. Every metadata block contains stats info for all columns in a partition and is saved to BlockManager when executing compute() method for the partition. To minimize the number of bytes to read, More details can be found in design doc:https://docs.google.com/document/d/1DSiP3ej7Wd2cWUPVrgqAtvxbSlu5_1ZZB6m_2t8_95Q/edit?usp=sharing > Avoid extra reading for cached table > > > Key: SPARK-22599 > URL: https://issues.apache.org/jira/browse/SPARK-22599 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.2.0 >Reporter: Nan Zhu > > In the current implementation of Spark, InMemoryTableExec read all data in a > cached table, filter CachedBatch according to stats and pass data to the > downstream operators. This implementation makes it inefficient to reside the > whole table in memory to serve various queries against different partitions > of the table, which occupies a certain portion of our users' scenarios. > The following is an example of such a use case: > store_sales is a 1TB-sized table in cloud storage, which is part
[jira] [Created] (SPARK-22599) Avoid extra reading for cached table
Nan Zhu created SPARK-22599: --- Summary: Avoid extra reading for cached table Key: SPARK-22599 URL: https://issues.apache.org/jira/browse/SPARK-22599 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.2.0 Reporter: Nan Zhu In the current implementation of Spark, InMemoryTableExec reads all data in a cached table, filters CachedBatches according to their stats, and passes the data to the downstream operators. This implementation makes it inefficient to keep the whole table in memory in order to serve various queries against different partitions of the table, which covers a fair portion of our users' scenarios. The following is an example of such a use case: store_sales is a 1TB table in cloud storage, partitioned by 'location'. The first query, Q1, outputs several metrics A, B, C for all stores in all locations. After that, a small team of 3 data scientists wants to do some causal analysis of the sales in different locations. To avoid unnecessary I/O and parquet/orc parsing overhead, they cache the whole table in memory in Q1. With the current implementation, even if any one of the data scientists is only interested in one out of three locations, the queries they submit to the Spark cluster still read the full 1TB of data. The reason for the extra reading is that we implement CachedBatch as {code:scala} case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow) {code} where "stats" is a part of every CachedBatch, so we can only filter batches for the output of the InMemoryTableExec operator by reading all of the data in the in-memory table as input. The extra reading becomes even more unacceptable when some of the table's data has been evicted to disk. We propose to introduce a new type of block, a metadata block, for the partitions of the RDD representing the data in the cached table. Every metadata block contains the stats for all columns in a partition and is saved to the BlockManager when the partition's compute() method is executed, so that batch filtering can consult only the metadata blocks and thereby minimize the number of bytes to read. More details can be found in the design doc: https://docs.google.com/document/d/1DSiP3ej7Wd2cWUPVrgqAtvxbSlu5_1ZZB6m_2t8_95Q/edit?usp=sharing -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
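To make the proposal above concrete, here is a small, self-contained Scala sketch of the metadata-block idea; all names below (ColumnStats, PartitionMetadataBlock, prunePartitions) are hypothetical illustrations rather than Spark's actual internal API.
{code:scala}
// Today, stats travel inside every CachedBatch, so pruning has to materialize the batches.
case class ColumnStats(min: Long, max: Long)
case class CachedBatchSketch(numRows: Int, buffers: Array[Array[Byte]], stats: Map[String, ColumnStats])

// Proposed: keep a small per-partition metadata block separately (e.g. in the BlockManager),
// so that partition/batch pruning only needs to read the metadata blocks.
case class PartitionMetadataBlock(partitionId: Int, stats: Map[String, ColumnStats])

// Return only the partitions whose stats for `column` overlap [lower, upper];
// partitions with no stats for the column are kept conservatively.
def prunePartitions(
    metadata: Seq[PartitionMetadataBlock],
    column: String,
    lower: Long,
    upper: Long): Seq[Int] =
  metadata
    .filter(block => block.stats.get(column).forall(s => s.max >= lower && s.min <= upper))
    .map(_.partitionId)
{code}
With such metadata blocks, a query interested in a single 'location' would touch only the matching partitions' data blocks instead of scanning the full 1TB cache.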
[jira] [Closed] (SPARK-21197) Tricky use case makes dead application struggle for a long duration
[ https://issues.apache.org/jira/browse/SPARK-21197?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nan Zhu closed SPARK-21197. --- Resolution: Won't Fix > Tricky use case makes dead application struggle for a long duration > --- > > Key: SPARK-21197 > URL: https://issues.apache.org/jira/browse/SPARK-21197 > Project: Spark > Issue Type: Bug > Components: DStreams, Spark Core >Affects Versions: 2.0.2, 2.1.1 >Reporter: Nan Zhu > > The use case is in Spark Streaming while the root cause is in DAGScheduler, > so I said the component as both of DStreams and Core > Use case: > the user has a thread periodically triggering Spark jobs, and in the same > application, they retrieve data through Spark Streaming from somewherein > the Streaming logic, an exception is thrown so that the whole application is > supposed to be shutdown and let YARN restart it... > The user observed that after the exception is propagated to Spark core and > SparkContext.stop() is called, after 18 hours, the application is still > running... > The root cause is that when we call DAGScheduler.stop(), we will wait for > eventLoop's thread to finish > (https://github.com/apache/spark/blob/03eb6117affcca21798be25706a39e0d5a2f7288/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1704 > and > https://github.com/apache/spark/blob/03eb6117affcca21798be25706a39e0d5a2f7288/core/src/main/scala/org/apache/spark/util/EventLoop.scala#L40) > Since there is a thread periodically push events to DAGScheduler's event > queue, it will never finish > a potential solution is that in EventLoop, we should allow interrupt the > thread directly for some cases, e.g. this one, and simultaneously allow > graceful shutdown for other cases, e.g. ListenerBus one, -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-21197) Tricky use case makes dead application struggle for a long duration
[ https://issues.apache.org/jira/browse/SPARK-21197?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16062162#comment-16062162 ] Nan Zhu commented on SPARK-21197: - yeah, after rethinking about the solution, I think daemon thread would be a better solution, > Tricky use case makes dead application struggle for a long duration > --- > > Key: SPARK-21197 > URL: https://issues.apache.org/jira/browse/SPARK-21197 > Project: Spark > Issue Type: Bug > Components: DStreams, Spark Core >Affects Versions: 2.0.2, 2.1.1 >Reporter: Nan Zhu > > The use case is in Spark Streaming while the root cause is in DAGScheduler, > so I said the component as both of DStreams and Core > Use case: > the user has a thread periodically triggering Spark jobs, and in the same > application, they retrieve data through Spark Streaming from somewherein > the Streaming logic, an exception is thrown so that the whole application is > supposed to be shutdown and let YARN restart it... > The user observed that after the exception is propagated to Spark core and > SparkContext.stop() is called, after 18 hours, the application is still > running... > The root cause is that when we call DAGScheduler.stop(), we will wait for > eventLoop's thread to finish > (https://github.com/apache/spark/blob/03eb6117affcca21798be25706a39e0d5a2f7288/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1704 > and > https://github.com/apache/spark/blob/03eb6117affcca21798be25706a39e0d5a2f7288/core/src/main/scala/org/apache/spark/util/EventLoop.scala#L40) > Since there is a thread periodically push events to DAGScheduler's event > queue, it will never finish > a potential solution is that in EventLoop, we should allow interrupt the > thread directly for some cases, e.g. this one, and simultaneously allow > graceful shutdown for other cases, e.g. ListenerBus one, -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-21197) Tricky use case makes dead application struggle for a long duration
[ https://issues.apache.org/jira/browse/SPARK-21197?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nan Zhu updated SPARK-21197: Summary: Tricky use case makes dead application struggle for a long duration (was: Tricky use cases makes dead application struggle for a long duration) > Tricky use case makes dead application struggle for a long duration > --- > > Key: SPARK-21197 > URL: https://issues.apache.org/jira/browse/SPARK-21197 > Project: Spark > Issue Type: Bug > Components: DStreams, Spark Core >Affects Versions: 2.0.2, 2.1.1 >Reporter: Nan Zhu > > The use case is in Spark Streaming while the root cause is in DAGScheduler, > so I said the component as both of DStreams and Core > Use case: > the user has a thread periodically triggering Spark jobs, and in the same > application, they retrieve data through Spark Streaming from somewherein > the Streaming logic, an exception is thrown so that the whole application is > supposed to be shutdown and let YARN restart it... > The user observed that after the exception is propagated to Spark core and > SparkContext.stop() is called, after 18 hours, the application is still > running... > The root cause is that when we call DAGScheduler.stop(), we will wait for > eventLoop's thread to finish > (https://github.com/apache/spark/blob/03eb6117affcca21798be25706a39e0d5a2f7288/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1704 > and > https://github.com/apache/spark/blob/03eb6117affcca21798be25706a39e0d5a2f7288/core/src/main/scala/org/apache/spark/util/EventLoop.scala#L40) > Since there is a thread periodically push events to DAGScheduler's event > queue, it will never finish > a potential solution is that in EventLoop, we should allow interrupt the > thread directly for some cases, e.g. this one, and simultaneously allow > graceful shutdown for other cases, e.g. ListenerBus one, -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-21197) Tricky use cases makes dead application struggle for a long duration
Nan Zhu created SPARK-21197: --- Summary: Tricky use cases makes dead application struggle for a long duration Key: SPARK-21197 URL: https://issues.apache.org/jira/browse/SPARK-21197 Project: Spark Issue Type: Bug Components: DStreams, Spark Core Affects Versions: 2.1.1, 2.0.2 Reporter: Nan Zhu The use case is in Spark Streaming while the root cause is in DAGScheduler, so I marked the components as both DStreams and Core. Use case: the user has a thread that periodically triggers Spark jobs, and in the same application they retrieve data through Spark Streaming from somewhere. In the Streaming logic, an exception is thrown so that the whole application is supposed to shut down and let YARN restart it... The user observed that after the exception was propagated to Spark core and SparkContext.stop() was called, the application was still running 18 hours later... The root cause is that when we call DAGScheduler.stop(), we wait for the eventLoop's thread to finish (https://github.com/apache/spark/blob/03eb6117affcca21798be25706a39e0d5a2f7288/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1704 and https://github.com/apache/spark/blob/03eb6117affcca21798be25706a39e0d5a2f7288/core/src/main/scala/org/apache/spark/util/EventLoop.scala#L40). Since there is a thread periodically pushing events to DAGScheduler's event queue, it never finishes. A potential solution is that EventLoop should allow interrupting the thread directly for some cases, e.g. this one, while still allowing graceful shutdown for other cases, e.g. the ListenerBus one. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
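To illustrate the proposed direction, here is a standalone Scala sketch of an event loop whose stop() can either drain gracefully or interrupt immediately. It is a hedged illustration of the idea only, not Spark's actual EventLoop code.
{code:scala}
import java.util.concurrent.{LinkedBlockingDeque, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

abstract class SimpleEventLoop[E](name: String) {
  private val queue = new LinkedBlockingDeque[E]()
  private val stopped = new AtomicBoolean(false)

  private val thread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        // Keep processing until stop() has been requested and the queue has drained.
        while (!stopped.get || !queue.isEmpty) {
          val event = queue.poll(100, TimeUnit.MILLISECONDS)
          if (event != null) onReceive(event)
        }
      } catch {
        case _: InterruptedException => // stop(interrupt = true) was requested; exit immediately
      }
    }
  }

  def start(): Unit = thread.start()
  def post(event: E): Unit = queue.put(event)

  // interrupt = true: do not wait for producers that keep posting events (the case in this JIRA);
  // interrupt = false: drain already-queued events, as a listener-bus-style loop might prefer.
  def stop(interrupt: Boolean): Unit = {
    if (stopped.compareAndSet(false, true)) {
      if (interrupt) thread.interrupt()
      thread.join()
    }
  }

  protected def onReceive(event: E): Unit
}
{code}
A DAGScheduler-style caller would then use stop(interrupt = true), while a listener-bus-style loop could keep the graceful stop(interrupt = false).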
[jira] [Commented] (SPARK-20928) Continuous Processing Mode for Structured Streaming
[ https://issues.apache.org/jira/browse/SPARK-20928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16036157#comment-16036157 ] Nan Zhu commented on SPARK-20928: - if I understand correctly the tasks will be "long-term" tasks just like the receiver tasks in receiver-based InputDStream in Spark Streaming? > Continuous Processing Mode for Structured Streaming > --- > > Key: SPARK-20928 > URL: https://issues.apache.org/jira/browse/SPARK-20928 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Michael Armbrust > > Given the current Source API, the minimum possible latency for any record is > bounded by the amount of time that it takes to launch a task. This > limitation is a result of the fact that {{getBatch}} requires us to know both > the starting and the ending offset, before any tasks are launched. In the > worst case, the end-to-end latency is actually closer to the average batch > time + task launching time. > For applications where latency is more important than exactly-once output > however, it would be useful if processing could happen continuously. This > would allow us to achieve fully pipelined reading and writing from sources > such as Kafka. This kind of architecture would make it possible to process > records with end-to-end latencies on the order of 1 ms, rather than the > 10-100ms that is possible today. > One possible architecture here would be to change the Source API to look like > the following rough sketch: > {code} > trait Epoch { > def data: DataFrame > /** The exclusive starting position for `data`. */ > def startOffset: Offset > /** The inclusive ending position for `data`. Incrementally updated > during processing, but not complete until execution of the query plan in > `data` is finished. */ > def endOffset: Offset > } > def getBatch(startOffset: Option[Offset], endOffset: Option[Offset], > limits: Limits): Epoch > {code} > The above would allow us to build an alternative implementation of > {{StreamExecution}} that processes continuously with much lower latency and > only stops processing when needing to reconfigure the stream (either due to a > failure or a user requested change in parallelism. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20928) Continuous Processing Mode for Structured Streaming
[ https://issues.apache.org/jira/browse/SPARK-20928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16030379#comment-16030379 ] Nan Zhu commented on SPARK-20928: - Hi, is there any description of what this means? > Continuous Processing Mode for Structured Streaming > --- > > Key: SPARK-20928 > URL: https://issues.apache.org/jira/browse/SPARK-20928 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Michael Armbrust > -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-4921) TaskSetManager mistakenly returns PROCESS_LOCAL for NO_PREF tasks
[ https://issues.apache.org/jira/browse/SPARK-4921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16021417#comment-16021417 ] Nan Zhu commented on SPARK-4921: I forgot most of the details... but the final conclusion was that it's a typo that causes no performance issue; however, since for some reason we could not fully prove that changing it would not hurt anything, it was closed as Won't Fix. > TaskSetManager mistakenly returns PROCESS_LOCAL for NO_PREF tasks > - > > Key: SPARK-4921 > URL: https://issues.apache.org/jira/browse/SPARK-4921 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 1.2.0 >Reporter: Xuefu Zhang > Attachments: NO_PREF.patch > > > During research for HIVE-9153, we found that TaskSetManager returns > PROCESS_LOCAL for NO_PREF tasks, which may caused performance degradation. > Changing the return value to NO_PREF, as demonstrated in the attached patch, > seemingly improves the performance. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20811) GBT Classifier failed with mysterious StackOverflowError
[ https://issues.apache.org/jira/browse/SPARK-20811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16018246#comment-16018246 ] Nan Zhu commented on SPARK-20811: - thanks, let me try it > GBT Classifier failed with mysterious StackOverflowError > > > Key: SPARK-20811 > URL: https://issues.apache.org/jira/browse/SPARK-20811 > Project: Spark > Issue Type: Bug > Components: ML >Affects Versions: 2.1.0 >Reporter: Nan Zhu > > I am running GBT Classifier over airline dataset (combining 2005-2008) and in > total it's around 22M examples as training data > code is simple > {code:title=Bar.scala|borderStyle=solid} > val gradientBoostedTrees = new GBTClassifier() > gradientBoostedTrees.setMaxBins(1000) > gradientBoostedTrees.setMaxIter(500) > gradientBoostedTrees.setMaxDepth(6) > gradientBoostedTrees.setStepSize(1.0) > transformedTrainingSet.cache().foreach(_ => Unit) > val startTime = System.nanoTime() > val model = gradientBoostedTrees.fit(transformedTrainingSet) > println(s"===training time cost: ${(System.nanoTime() - startTime) / > 1000.0 / 1000.0} ms") > val resultDF = model.transform(transformedTestset) > val binaryClassificationEvaluator = new BinaryClassificationEvaluator() > > binaryClassificationEvaluator.setRawPredictionCol("prediction").setLabelCol("label") > println(s"=test AUC: > ${binaryClassificationEvaluator.evaluate(resultDF)}==") > {code} > my training job always failed with > {quote} > 17/05/19 13:41:29 WARN TaskSetManager: Lost task 18.0 in stage 3907.0 (TID > 137506, 10.0.0.13, executor 3): java.lang.StackOverflowError > at > java.io.ObjectInputStream$BlockDataInputStream.read(ObjectInputStream.java:3037) > at > java.io.ObjectInputStream$BlockDataInputStream.readFully(ObjectInputStream.java:3061) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2234) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422) > at > scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479) > at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) > {quote} > the above pattern repeated for many times > Is it a bug or did I make something wrong when using GBTClassifier in ML? -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-20811) GBT Classifier failed with mysterious StackOverflowException
Nan Zhu created SPARK-20811: --- Summary: GBT Classifier failed with mysterious StackOverflowException Key: SPARK-20811 URL: https://issues.apache.org/jira/browse/SPARK-20811 Project: Spark Issue Type: Bug Components: ML Affects Versions: 2.1.0 Reporter: Nan Zhu I am running GBT Classifier over airline dataset (combining 2005-2008) and in total it's around 22M examples as training data code is simple {code:title=Bar.scala|borderStyle=solid} val gradientBoostedTrees = new GBTClassifier() gradientBoostedTrees.setMaxBins(1000) gradientBoostedTrees.setMaxIter(500) gradientBoostedTrees.setMaxDepth(6) gradientBoostedTrees.setStepSize(1.0) transformedTrainingSet.cache().foreach(_ => Unit) val startTime = System.nanoTime() val model = gradientBoostedTrees.fit(transformedTrainingSet) println(s"===training time cost: ${(System.nanoTime() - startTime) / 1000.0 / 1000.0} ms") val resultDF = model.transform(transformedTestset) val binaryClassificationEvaluator = new BinaryClassificationEvaluator() binaryClassificationEvaluator.setRawPredictionCol("prediction").setLabelCol("label") println(s"=test AUC: ${binaryClassificationEvaluator.evaluate(resultDF)}==") {code} my training job always failed with {quote} 17/05/19 13:41:29 WARN TaskSetManager: Lost task 18.0 in stage 3907.0 (TID 137506, 10.0.0.13, executor 3): java.lang.StackOverflowError at java.io.ObjectInputStream$BlockDataInputStream.read(ObjectInputStream.java:3037) at java.io.ObjectInputStream$BlockDataInputStream.readFully(ObjectInputStream.java:3061) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2234) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422) at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479) at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) {quote} the above pattern repeated for many times Is it a bug or did I make something wrong when using GBTClassifier in ML? -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
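Not part of this thread's conclusion, but for readers hitting similar lineage-related StackOverflowErrors in iterative ML training, a commonly suggested mitigation is to enable periodic checkpointing so the RDD/DAG lineage is truncated between iterations. The sketch below assumes the usual SparkSession named `spark`; the checkpoint directory and interval are placeholders.
{code:scala}
import org.apache.spark.ml.classification.GBTClassifier

// Hedged sketch: periodic checkpointing to truncate lineage during long GBT training runs.
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints") // placeholder path

val gradientBoostedTrees = new GBTClassifier()
  .setMaxBins(1000)
  .setMaxIter(500)
  .setMaxDepth(6)
  .setStepSize(1.0)
  .setCheckpointInterval(10) // checkpoint intermediate state every 10 iterations
{code}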
[jira] [Updated] (SPARK-20811) GBT Classifier failed with mysterious StackOverflowError
[ https://issues.apache.org/jira/browse/SPARK-20811?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nan Zhu updated SPARK-20811: Summary: GBT Classifier failed with mysterious StackOverflowError (was: GBT Classifier failed with mysterious StackOverflowException ) > GBT Classifier failed with mysterious StackOverflowError > > > Key: SPARK-20811 > URL: https://issues.apache.org/jira/browse/SPARK-20811 > Project: Spark > Issue Type: Bug > Components: ML >Affects Versions: 2.1.0 >Reporter: Nan Zhu > > I am running GBT Classifier over airline dataset (combining 2005-2008) and in > total it's around 22M examples as training data > code is simple > {code:title=Bar.scala|borderStyle=solid} > val gradientBoostedTrees = new GBTClassifier() > gradientBoostedTrees.setMaxBins(1000) > gradientBoostedTrees.setMaxIter(500) > gradientBoostedTrees.setMaxDepth(6) > gradientBoostedTrees.setStepSize(1.0) > transformedTrainingSet.cache().foreach(_ => Unit) > val startTime = System.nanoTime() > val model = gradientBoostedTrees.fit(transformedTrainingSet) > println(s"===training time cost: ${(System.nanoTime() - startTime) / > 1000.0 / 1000.0} ms") > val resultDF = model.transform(transformedTestset) > val binaryClassificationEvaluator = new BinaryClassificationEvaluator() > > binaryClassificationEvaluator.setRawPredictionCol("prediction").setLabelCol("label") > println(s"=test AUC: > ${binaryClassificationEvaluator.evaluate(resultDF)}==") > {code} > my training job always failed with > {quote} > 17/05/19 13:41:29 WARN TaskSetManager: Lost task 18.0 in stage 3907.0 (TID > 137506, 10.0.0.13, executor 3): java.lang.StackOverflowError > at > java.io.ObjectInputStream$BlockDataInputStream.read(ObjectInputStream.java:3037) > at > java.io.ObjectInputStream$BlockDataInputStream.readFully(ObjectInputStream.java:3061) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2234) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422) > at > scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479) > at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) > {quote} > the above pattern repeated for many times > Is it a bug or did I make something wrong when using GBTClassifier in ML? -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20251) Spark streaming skips batches in a case of failure
[ https://issues.apache.org/jira/browse/SPARK-20251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15977589#comment-15977589 ] Nan Zhu commented on SPARK-20251: - ignore my previous comments... the apparent progress in Spark Streaming was caused by a leaked user thread (not set as a daemon) that blocked the whole app from quitting... the StreamingContext itself was shut down > Spark streaming skips batches in a case of failure > -- > > Key: SPARK-20251 > URL: https://issues.apache.org/jira/browse/SPARK-20251 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.0 >Reporter: Roman Studenikin > > We are experiencing strange behaviour of spark streaming application. > Sometimes it just skips batch in a case of job failure and starts working on > the next one. > We expect it to attempt to reprocess batch, but not to skip it. Is it a bug > or we are missing any important configuration params? > Screenshots from spark UI: > http://pasteboard.co/1oRW0GDUX.png > http://pasteboard.co/1oSjdFpbc.png -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
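A minimal sketch of the leak described in the comment above (illustrative only): a user-created, non-daemon thread keeps the JVM alive even after the StreamingContext has been shut down.
{code:scala}
// Illustrative sketch of the leak: a periodic user thread that is not a daemon.
val worker = new Thread(new Runnable {
  override def run(): Unit = {
    while (true) {
      Thread.sleep(60000) // periodic user work
    }
  }
})
worker.setDaemon(true) // without this line, the JVM cannot exit once the streaming app stops
worker.start()
{code}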
[jira] [Comment Edited] (SPARK-20251) Spark streaming skips batches in a case of failure
[ https://issues.apache.org/jira/browse/SPARK-20251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15962318#comment-15962318 ] Nan Zhu edited comment on SPARK-20251 at 4/10/17 12:16 AM: --- more details here, it is expected that the compute() method for the next batch was executed before the app is shutdown, however, the app should be eventually shutdown since we have signalled the awaiting condition set in awaitTermination() however, this "eventual shutdown" was not happened...(this issue did not consistently happen) was (Author: codingcat): more details here, by "be proceeding", I mean it is expected that the compute() method for the next batch was executed before the app is shutdown, however, the app should be eventually shutdown since we have signalled the awaiting condition set in awaitTermination() however, this "eventual shutdown" was not happened...(this issue did not consistently happen) > Spark streaming skips batches in a case of failure > -- > > Key: SPARK-20251 > URL: https://issues.apache.org/jira/browse/SPARK-20251 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.0 >Reporter: Roman Studenikin > > We are experiencing strange behaviour of spark streaming application. > Sometimes it just skips batch in a case of job failure and starts working on > the next one. > We expect it to attempt to reprocess batch, but not to skip it. Is it a bug > or we are missing any important configuration params? > Screenshots from spark UI: > http://pasteboard.co/1oRW0GDUX.png > http://pasteboard.co/1oSjdFpbc.png -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20251) Spark streaming skips batches in a case of failure
[ https://issues.apache.org/jira/browse/SPARK-20251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15962318#comment-15962318 ] Nan Zhu commented on SPARK-20251: - more details here, by "be proceeding", I mean it is expected that the compute() method for the next batch was executed before the app is shutdown, however, the app should be eventually shutdown since we have signalled the awaiting condition set in awaitTermination() however, this "eventual shutdown" was not happened...(this issue did not consistently happen) > Spark streaming skips batches in a case of failure > -- > > Key: SPARK-20251 > URL: https://issues.apache.org/jira/browse/SPARK-20251 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.0 >Reporter: Roman Studenikin > > We are experiencing strange behaviour of spark streaming application. > Sometimes it just skips batch in a case of job failure and starts working on > the next one. > We expect it to attempt to reprocess batch, but not to skip it. Is it a bug > or we are missing any important configuration params? > Screenshots from spark UI: > http://pasteboard.co/1oRW0GDUX.png > http://pasteboard.co/1oSjdFpbc.png -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-20251) Spark streaming skips batches in a case of failure
[ https://issues.apache.org/jira/browse/SPARK-20251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15962313#comment-15962313 ] Nan Zhu edited comment on SPARK-20251 at 4/9/17 11:57 PM: -- why this is an invalid report? I have been observing the same behavior recently when I upgrade to Spark 2.1 The basic idea (in my side) is that an exception thrown from DStream.compute() method should close the app instead of be proceeding (as the error handling in Spark Streaming is to release the await lock set in awaitTermination) I am still looking at those threads within Spark Streaming to see what was happening, can we change it back to a valid case and give me more time to investigate? was (Author: codingcat): why this is an invalid report? I have been observing the same behavior recently when I upgrade to Spark 2.1 The basic idea (in my side), an exception thrown from DStream.compute() method should close the app instead of proceeding (as the error handling in Spark Streaming is to release the await lock set in awaitTermination) I am still looking at those threads within Spark Streaming to see what was happening, can we change it back to a valid case and give me more time to investigate? > Spark streaming skips batches in a case of failure > -- > > Key: SPARK-20251 > URL: https://issues.apache.org/jira/browse/SPARK-20251 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.0 >Reporter: Roman Studenikin > > We are experiencing strange behaviour of spark streaming application. > Sometimes it just skips batch in a case of job failure and starts working on > the next one. > We expect it to attempt to reprocess batch, but not to skip it. Is it a bug > or we are missing any important configuration params? > Screenshots from spark UI: > http://pasteboard.co/1oRW0GDUX.png > http://pasteboard.co/1oSjdFpbc.png -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20251) Spark streaming skips batches in a case of failure
[ https://issues.apache.org/jira/browse/SPARK-20251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15962313#comment-15962313 ] Nan Zhu commented on SPARK-20251: - why is this an invalid report? I have been observing the same behavior recently since I upgraded to Spark 2.1. The basic idea (on my side) is that an exception thrown from the DStream.compute() method should close the app instead of proceeding (as the error handling in Spark Streaming is to release the await lock set in awaitTermination). I am still looking at the threads within Spark Streaming to see what was happening; can we change it back to a valid case and give me more time to investigate? > Spark streaming skips batches in a case of failure > -- > > Key: SPARK-20251 > URL: https://issues.apache.org/jira/browse/SPARK-20251 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.0 >Reporter: Roman Studenikin > > We are experiencing strange behaviour of spark streaming application. > Sometimes it just skips batch in a case of job failure and starts working on > the next one. > We expect it to attempt to reprocess batch, but not to skip it. Is it a bug > or we are missing any important configuration params? > Screenshots from spark UI: > http://pasteboard.co/1oRW0GDUX.png > http://pasteboard.co/1oSjdFpbc.png -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
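For context, here is a minimal sketch of the driver-side shutdown pattern the comments above refer to (the master, app name, and batch interval are placeholders): the driver blocks in awaitTermination(), and an error surfaced by the streaming scheduler is expected to release that wait so the application can exit.
{code:scala}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("sketch").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(2))

// ... build DStreams and output operations here ...

ssc.start()
try {
  // Blocks until the context stops; an error reported from the scheduler releases this wait.
  ssc.awaitTermination()
} finally {
  ssc.stop(stopSparkContext = true, stopGracefully = false)
}
{code}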
[jira] [Commented] (SPARK-19789) Add the shortcut of .format("parquet").option("path", "/hdfs/path").partitionBy("col1", "col2").start()
[ https://issues.apache.org/jira/browse/SPARK-19789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15906782#comment-15906782 ] Nan Zhu commented on SPARK-19789: - [~zsxwing] mind reviewing the PR? > Add the shortcut of .format("parquet").option("path", > "/hdfs/path").partitionBy("col1", "col2").start() > --- > > Key: SPARK-19789 > URL: https://issues.apache.org/jira/browse/SPARK-19789 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.1.0 >Reporter: Nan Zhu > > as said in the title, everytime I use parquet, I need to type a long string > Being consistent with DataFrameReader/Writer, DataStreamReader, add parquet > shortcut for this case -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-19789) Add the shortcut of .format("parquet").option("path", "/hdfs/path").partitionBy("col1", "col2").start()
Nan Zhu created SPARK-19789: --- Summary: Add the shortcut of .format("parquet").option("path", "/hdfs/path").partitionBy("col1", "col2").start() Key: SPARK-19789 URL: https://issues.apache.org/jira/browse/SPARK-19789 Project: Spark Issue Type: Improvement Components: Structured Streaming Affects Versions: 2.1.0 Reporter: Nan Zhu As said in the title, every time I use parquet I need to type a long string. To be consistent with DataFrameReader/Writer and DataStreamReader, we should add a parquet shortcut for this case. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
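For concreteness, a hedged sketch of the verbose form today versus the proposed shortcut; the path, checkpoint location, and column names are placeholders, and the shortcut signature is hypothetical.
{code:scala}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.StreamingQuery

// Today: the long form the JIRA wants to shorten (df is any streaming DataFrame with col1/col2).
def startParquetSink(df: DataFrame): StreamingQuery =
  df.writeStream
    .format("parquet")
    .option("path", "/hdfs/path")
    .option("checkpointLocation", "/hdfs/checkpoints")
    .partitionBy("col1", "col2")
    .start()

// Proposed shortcut, mirroring DataFrameWriter.parquet(path) -- hypothetical API:
// df.writeStream.partitionBy("col1", "col2").parquet("/hdfs/path")
{code}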
[jira] [Updated] (SPARK-19788) DataStreamReader/DataStreamWriter.option shall accept user-defined type
[ https://issues.apache.org/jira/browse/SPARK-19788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nan Zhu updated SPARK-19788: Description: There are many other data sources/sinks which has very different configuration ways than Kafka, FileSystem, etc. The expected type of the configuration entry passed to them might be a nested collection type, e.g. Map[String, Map[String, String]], or even a user-defined type(for example, the one I am working on) Right now, option can only accept String -> String/Boolean/Long/Double OR a complete Map[String, String]...my suggestion is that we can accept Map[String, Any], and the type of 'parameters' in SourceProvider.createSource can also be Map[String, Any], this will create much more flexibility to the user The drawback is that, it is a breaking change ( we can mitigate this by deprecating the current one, and progressively evolve to the new one if the proposal is accepted) [~zsxwing] what do you think? was: There are many other data sources/sinks which has very different configuration ways than Kafka, FileSystem, etc. The expected type of the configuration entry passed to them might be a nested collection type, e.g. Map[String, Map[String, String]], or even a user-defined type(for example, the one I am working on) Right now, option can only accept String -> String/Boolean/Long/Double OR a complete Map[String, String]...my suggestion is that we can accept Map[String, Any], and the type of 'parameters' in SourceProvider.createSource can also be Map[String, Any], this will create much more flexibility to the user The drawback is that, it is a breaking change ( we can mitigate this by deprecate the current one, and progressively evolve to the new one if the proposal is accepted) [~zsxwing] what do you think? > DataStreamReader/DataStreamWriter.option shall accept user-defined type > --- > > Key: SPARK-19788 > URL: https://issues.apache.org/jira/browse/SPARK-19788 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.1.0 >Reporter: Nan Zhu > > There are many other data sources/sinks which has very different > configuration ways than Kafka, FileSystem, etc. > The expected type of the configuration entry passed to them might be a nested > collection type, e.g. Map[String, Map[String, String]], or even a > user-defined type(for example, the one I am working on) > Right now, option can only accept String -> String/Boolean/Long/Double OR a > complete Map[String, String]...my suggestion is that we can accept > Map[String, Any], and the type of 'parameters' in SourceProvider.createSource > can also be Map[String, Any], this will create much more flexibility to the > user > The drawback is that, it is a breaking change ( we can mitigate this by > deprecating the current one, and progressively evolve to the new one if the > proposal is accepted) > [~zsxwing] what do you think? -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-19788) DataStreamReader/DataStreamWriter.option shall accept user-defined type
[ https://issues.apache.org/jira/browse/SPARK-19788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nan Zhu updated SPARK-19788: Description: There are many other data sources/sinks which has very different configuration ways than Kafka, FileSystem, etc. The expected type of the configuration entry passed to them might be a nested collection type, e.g. Map[String, Map[String, String]], or even a user-defined type(for example, the one I am working on) Right now, option can only accept String -> String/Boolean/Long/Double OR a complete Map[String, String]...my suggestion is that we can accept Map[String, Any], and the type of 'parameters' in SourceProvider.createSource can also be Map[String, Any], this will create much more flexibility to the user The drawback is that, it is a breaking change ( we can mitigate this by deprecate the current one, and progressively evolve to the new one if the proposal is accepted) [~zsxwing] what do you think? was: There are many other data sources which has very different configuration ways than Kafka, FileSystem, etc. The expected type of the configuration entry passed to them might be a nested collection type, e.g. Map[String, Map[String, String]], or even a user-defined type(for example, the one I am working on) Right now, option can only accept String -> String/Boolean/Long/Double OR a complete Map[String, String]...my suggestion is that we can accept Map[String, Any], and the type of 'parameters' in SourceProvider.createSource can also be Map[String, Any], this will create much more flexibility to the user The drawback is that, it is a breaking change ( we can mitigate this by deprecate the current one, and progressively evolve to the new one if the proposal is accepted) [~zsxwing] what do you think? > DataStreamReader/DataStreamWriter.option shall accept user-defined type > --- > > Key: SPARK-19788 > URL: https://issues.apache.org/jira/browse/SPARK-19788 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.1.0 >Reporter: Nan Zhu > > There are many other data sources/sinks which has very different > configuration ways than Kafka, FileSystem, etc. > The expected type of the configuration entry passed to them might be a nested > collection type, e.g. Map[String, Map[String, String]], or even a > user-defined type(for example, the one I am working on) > Right now, option can only accept String -> String/Boolean/Long/Double OR a > complete Map[String, String]...my suggestion is that we can accept > Map[String, Any], and the type of 'parameters' in SourceProvider.createSource > can also be Map[String, Any], this will create much more flexibility to the > user > The drawback is that, it is a breaking change ( we can mitigate this by > deprecate the current one, and progressively evolve to the new one if the > proposal is accepted) > [~zsxwing] what do you think? -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-19788) DataStreamReader/DataStreamWriter.option shall accept user-defined type
[ https://issues.apache.org/jira/browse/SPARK-19788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15890522#comment-15890522 ] Nan Zhu edited comment on SPARK-19788 at 3/1/17 4:45 PM: - another drawback is that it might look like incompatible with DataFrameReader/DataFrameWriter (we can also change that?) was (Author: codingcat): another drawback is that it might look like incompatible with DataFrameReader (we can also change that?) > DataStreamReader/DataStreamWriter.option shall accept user-defined type > --- > > Key: SPARK-19788 > URL: https://issues.apache.org/jira/browse/SPARK-19788 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.1.0 >Reporter: Nan Zhu > > There are many other data sources/sinks which has very different > configuration ways than Kafka, FileSystem, etc. > The expected type of the configuration entry passed to them might be a nested > collection type, e.g. Map[String, Map[String, String]], or even a > user-defined type(for example, the one I am working on) > Right now, option can only accept String -> String/Boolean/Long/Double OR a > complete Map[String, String]...my suggestion is that we can accept > Map[String, Any], and the type of 'parameters' in SourceProvider.createSource > can also be Map[String, Any], this will create much more flexibility to the > user > The drawback is that, it is a breaking change ( we can mitigate this by > deprecate the current one, and progressively evolve to the new one if the > proposal is accepted) > [~zsxwing] what do you think? -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-19788) DataStreamReader/DataStreamWriter.option shall accept user-defined type
[ https://issues.apache.org/jira/browse/SPARK-19788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nan Zhu updated SPARK-19788: Summary: DataStreamReader/DataStreamWriter.option shall accept user-defined type (was: DataStreamReader.option shall accept user-defined type) > DataStreamReader/DataStreamWriter.option shall accept user-defined type > --- > > Key: SPARK-19788 > URL: https://issues.apache.org/jira/browse/SPARK-19788 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.1.0 >Reporter: Nan Zhu > > There are many other data sources which has very different configuration ways > than Kafka, FileSystem, etc. > The expected type of the configuration entry passed to them might be a nested > collection type, e.g. Map[String, Map[String, String]], or even a > user-defined type(for example, the one I am working on) > Right now, option can only accept String -> String/Boolean/Long/Double OR a > complete Map[String, String]...my suggestion is that we can accept > Map[String, Any], and the type of 'parameters' in SourceProvider.createSource > can also be Map[String, Any], this will create much more flexibility to the > user > The drawback is that, it is a breaking change ( we can mitigate this by > deprecate the current one, and progressively evolve to the new one if the > proposal is accepted) > [~zsxwing] what do you think? -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19788) DataStreamReader.option shall accept user-defined type
[ https://issues.apache.org/jira/browse/SPARK-19788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15890522#comment-15890522 ] Nan Zhu commented on SPARK-19788: - another drawback is that it might look like incompatible with DataFrameReader (we can also change that?) > DataStreamReader.option shall accept user-defined type > -- > > Key: SPARK-19788 > URL: https://issues.apache.org/jira/browse/SPARK-19788 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.1.0 >Reporter: Nan Zhu > > There are many other data sources which has very different configuration ways > than Kafka, FileSystem, etc. > The expected type of the configuration entry passed to them might be a nested > collection type, e.g. Map[String, Map[String, String]], or even a > user-defined type(for example, the one I am working on) > Right now, option can only accept String -> String/Boolean/Long/Double OR a > complete Map[String, String]...my suggestion is that we can accept > Map[String, Any], and the type of 'parameters' in SourceProvider.createSource > can also be Map[String, Any], this will create much more flexibility to the > user > The drawback is that, it is a breaking change ( we can mitigate this by > deprecate the current one, and progressively evolve to the new one if the > proposal is accepted) > [~zsxwing] what do you think? -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-19788) DataStreamReader.option shall accept user-defined type
Nan Zhu created SPARK-19788: --- Summary: DataStreamReader.option shall accept user-defined type Key: SPARK-19788 URL: https://issues.apache.org/jira/browse/SPARK-19788 Project: Spark Issue Type: Improvement Components: Structured Streaming Affects Versions: 2.1.0 Reporter: Nan Zhu There are many other data sources that have very different configuration mechanisms from Kafka, FileSystem, etc. The expected type of a configuration entry passed to them might be a nested collection type, e.g. Map[String, Map[String, String]], or even a user-defined type (for example, the one I am working on). Right now, option can only accept String -> String/Boolean/Long/Double OR a complete Map[String, String]... my suggestion is that we accept Map[String, Any], and that the type of 'parameters' in SourceProvider.createSource also be Map[String, Any]; this would give much more flexibility to the user. The drawback is that it is a breaking change (we can mitigate this by deprecating the current one and progressively evolving to the new one if the proposal is accepted). [~zsxwing] what do you think? -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
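A hedged sketch of what the widened API could look like; the class and method signatures below are hypothetical illustrations, not Spark's actual DataStreamReader.
{code:scala}
// Hypothetical sketch: widen option values from String-like types to Any so that nested or
// user-defined configuration can be passed through to a custom source.
class WideOptionReader {
  private var extraOptions: Map[String, Any] = Map.empty

  def option(key: String, value: Any): this.type = {
    extraOptions += (key -> value)
    this
  }

  def options(opts: Map[String, Any]): this.type = {
    extraOptions ++= opts
    this
  }

  // A SourceProvider.createSource counterpart would then receive Map[String, Any] as `parameters`.
  def buildParameters: Map[String, Any] = extraOptions
}

// Example: nested, per-region endpoint configuration for a custom source.
val reader = new WideOptionReader()
  .option("maxOffsetsPerTrigger", 10000L)
  .option("endpoints", Map("us" -> Map("host" -> "broker-us", "port" -> 9092)))
{code}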
[jira] [Commented] (SPARK-19280) Failed Recovery from checkpoint caused by the multi-threads issue in Spark Streaming scheduler
[ https://issues.apache.org/jira/browse/SPARK-19280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15886999#comment-15886999 ] Nan Zhu commented on SPARK-19280: - [~zsxwing] please let me know if we agree on that 2 is something we like I plan to work on this > Failed Recovery from checkpoint caused by the multi-threads issue in Spark > Streaming scheduler > -- > > Key: SPARK-19280 > URL: https://issues.apache.org/jira/browse/SPARK-19280 > Project: Spark > Issue Type: Bug >Affects Versions: 1.6.3, 2.0.0, 2.0.1, 2.0.2, 2.1.0 >Reporter: Nan Zhu >Priority: Critical > > In one of our applications, we found the following issue, the application > recovering from a checkpoint file named "checkpoint-***16670" but with > the timestamp ***16650 will recover from the very beginning of the stream > and because our application relies on the external & periodically-cleaned > data (syncing with checkpoint cleanup), the recovery just failed > We identified a potential issue in Spark Streaming checkpoint and will > describe it with the following example. We will propose a fix in the end of > this JIRA. > 1. The application properties: Batch Duration: 2, Functionality: Single > Stream calling ReduceByKeyAndWindow and print, Window Size: 6, > SlideDuration, 2 > 2. RDD at 16650 is generated and the corresponding job is submitted to > the execution ThreadPool. Meanwhile, a DoCheckpoint message is sent to the > queue of JobGenerator > 3. Job at 16650 is finished and JobCompleted message is sent to > JobScheduler's queue, and meanwhile, Job at 16652 is submitted to the > execution ThreadPool and similarly, a DoCheckpoint is sent to the queue of > JobGenerator > 4. JobScheduler's message processing thread (I will use JS-EventLoop to > identify it) is not scheduled by the operating system for a long time, and > during this period, Jobs generated from 16652 - 16670 are generated > and completed. > 5. the message processing thread of JobGenerator (JG-EventLoop) is scheduled > and processed all DoCheckpoint messages for jobs ranging from 16652 - > 16670 and checkpoint files are successfully written. CRITICAL: at this > moment, the lastCheckpointTime would be 16670. > 6. JS-EventLoop is scheduled, and process all JobCompleted messages for jobs > ranging from 16652 - 16670. CRITICAL: a ClearMetadata message is > pushed to JobGenerator's message queue for EACH JobCompleted. > 7. The current message queue contains 20 ClearMetadata messages and > JG-EventLoop is scheduled to process them. CRITICAL: ClearMetadata will > remove all RDDs out of rememberDuration window. In our case, > ReduceyKeyAndWindow will set rememberDuration to 10 (rememberDuration of > ReducedWindowDStream (4) + windowSize) resulting that only RDDs <- > (16660, 16670] are kept. And ClearMetaData processing logic will push > a DoCheckpoint to JobGenerator's thread > 8. JG-EventLoop is scheduled again to process DoCheckpoint for 1665, VERY > CRITICAL: at this step, RDD no later than 16660 has been removed, and > checkpoint data is updated as > https://github.com/apache/spark/blob/a81e336f1eddc2c6245d807aae2c81ddc60eabf9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L53 > and > https://github.com/apache/spark/blob/a81e336f1eddc2c6245d807aae2c81ddc60eabf9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L59. > 9. After 8, a checkpoint named /path/checkpoint-16670 is created but with > the timestamp 16650. 
and at this moment, Application crashed > 10. Application recovers from /path/checkpoint-16670 and try to get RDD > with validTime 16650. Of course it will not find it and has to recompute. > In the case of ReduceByKeyAndWindow, it needs to recursively slice RDDs until > to the start of the stream. When the stream depends on the external data, it > will not successfully recover. In the case of Kafka, the recovered RDDs would > not be the same as the original one, as the currentOffsets has been updated > to the value at the moment of 16670 > The proposed fix: > 0. a hot-fix would be setting timestamp Checkpoint File to lastCheckpointTime > instead of using the timestamp of Checkpoint instance (any side-effect?) > 1. ClearMetadata shall be ClearMedataAndCheckpoint > 2. a long-term fix would be merge JobScheduler and JobGenerator, I didn't see > any necessary to have two threads here -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-
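To make proposed fix 0 above concrete, here is a minimal, self-contained sketch of the idea of stamping the checkpoint file with lastCheckpointTime instead of the (possibly older) time carried by a ClearMetadata-triggered DoCheckpoint. All names below (Time, lastCheckpointTime, checkpointFileName) are invented for illustration; this is not the real Spark Streaming Checkpoint/CheckpointWriter code, only the naming rule the ticket proposes.

{code:scala}
// Toy model of the proposed hot-fix: never stamp a checkpoint file with a time
// older than the last regular checkpoint that was already written.
object CheckpointNaming {
  final case class Time(ms: Long)

  // The timestamp the generator remembers for the most recent regular checkpoint.
  var lastCheckpointTime: Time = Time(0)

  // A DoCheckpoint triggered by ClearMetadata carries the (older) batch time that
  // was just cleared, e.g. 16650 while lastCheckpointTime is already 16670.
  def checkpointFileName(requested: Time): String = {
    val effective =
      if (requested.ms < lastCheckpointTime.ms) lastCheckpointTime else requested
    s"checkpoint-${effective.ms}"
  }

  def main(args: Array[String]): Unit = {
    lastCheckpointTime = Time(16670)
    // Without the fix the file would be stamped 16650 and recovery would start there.
    println(checkpointFileName(Time(16650))) // prints: checkpoint-16670
  }
}
{code}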
[jira] [Updated] (SPARK-19499) Add more notes in the comments of Sink.addBatch()
[ https://issues.apache.org/jira/browse/SPARK-19499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nan Zhu updated SPARK-19499: Description: addBatch method in Sink trait is supposed to be a synchronous method to coordinate with the fault-tolerance design in StreamingExecution (different from the compute() method in DStream) We need to add more notes in the comments of this method to remind the developers was: addBatch method in Sink trait is supposed to be a synchronous method to coordinate with the fault-tolerance design in StreamingExecution (being different with the compute() method in DStream) We need to add more notes in the method of this method to remind the developers > Add more notes in the comments of Sink.addBatch() > -- > > Key: SPARK-19499 > URL: https://issues.apache.org/jira/browse/SPARK-19499 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.0.0, 2.0.1, 2.0.2, 2.1.0 >Reporter: Nan Zhu >Priority: Minor > > addBatch method in Sink trait is supposed to be a synchronous method to > coordinate with the fault-tolerance design in StreamingExecution (different > from the compute() method in DStream) > We need to add more notes in the comments of this method to remind the > developers -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
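Since the note this ticket asks for is about addBatch() being synchronous, a small hedged example may help: a custom Sink that finishes its write before returning. Only the Sink trait itself (org.apache.spark.sql.execution.streaming.Sink in Spark 2.x, with addBatch(batchId: Long, data: DataFrame)) is taken from Spark; BlockingConsoleSink and its bookkeeping are made up for illustration.

{code:scala}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.Sink

// Illustrative sketch: a Sink whose addBatch() returns only after the batch is
// fully written out, matching the synchronous contract this ticket wants documented.
class BlockingConsoleSink extends Sink {
  @volatile private var lastCommittedBatchId = -1L

  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    // Re-delivery of an already committed batch can happen after recovery; skip it.
    if (batchId <= lastCommittedBatchId) return

    // Do the write *before* returning: the streaming engine treats the batch as
    // durable once addBatch() comes back, so kicking off an asynchronous write
    // here would break the fault-tolerance design the comment should explain.
    data.collect().foreach(row => println(s"batch $batchId: $row"))

    lastCommittedBatchId = batchId
  }
}
{code}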
[jira] [Updated] (SPARK-19499) Add more notes in the comments of Sink.addBatch()
[ https://issues.apache.org/jira/browse/SPARK-19499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nan Zhu updated SPARK-19499: Summary: Add more notes in the comments of Sink.addBatch() (was: Add more description in the comments of Sink.addBatch() ) > Add more notes in the comments of Sink.addBatch() > -- > > Key: SPARK-19499 > URL: https://issues.apache.org/jira/browse/SPARK-19499 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.0.0, 2.0.1, 2.0.2, 2.1.0 >Reporter: Nan Zhu >Priority: Minor > > addBatch method in Sink trait is supposed to be a synchronous method to > coordinate with the fault-tolerance design in StreamingExecution (different > from the compute() method in DStream) > We need to add more notes in the method of this method to remind the > developers -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-19499) Add more description in the comments of Sink.addBatch()
Nan Zhu created SPARK-19499: --- Summary: Add more description in the comments of Sink.addBatch() Key: SPARK-19499 URL: https://issues.apache.org/jira/browse/SPARK-19499 Project: Spark Issue Type: Improvement Components: Structured Streaming Affects Versions: 2.1.0, 2.0.2, 2.0.1, 2.0.0 Reporter: Nan Zhu Priority: Minor addBatch method in Sink trait is supposed to be a synchronous method to coordinate with the fault-tolerance design in StreamingExecution (different from the compute() method in DStream) We need to add more notes in the method of this method to remind the developers -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19233) Inconsistent Behaviour of Spark Streaming Checkpoint
[ https://issues.apache.org/jira/browse/SPARK-19233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15851787#comment-15851787 ] Nan Zhu commented on SPARK-19233: - ping > Inconsistent Behaviour of Spark Streaming Checkpoint > > > Key: SPARK-19233 > URL: https://issues.apache.org/jira/browse/SPARK-19233 > Project: Spark > Issue Type: Improvement > Components: DStreams >Affects Versions: 2.0.0, 2.0.1, 2.0.2, 2.1.0 >Reporter: Nan Zhu > > When checking one of our application logs, we found the following behavior > (simplified): > 1. The Spark application recovers from a checkpoint constructed at timestamp 1000 ms > 2. The log shows that the Spark application can recover RDDs generated at > timestamps 2000 and 3000 > The root cause is that the generateJobs event is pushed to the queue by a > separate thread (RecurringTimer); before the doCheckpoint event is pushed to the > queue, multiple generateJobs events may already have been processed. As a > result, when doCheckpoint for timestamp 1000 is processed, the generatedRDDs > data structure containing RDDs generated at 2000 and 3000 is serialized as part > of the checkpoint for 1000. > This adds overhead for debugging and for coordinating our offset management > strategy with Spark Streaming's checkpoint strategy while we are developing a > new type of DStream that integrates Spark Streaming with an internal message > middleware. > The proposed fix is to filter generatedRDDs according to the checkpoint timestamp > when serializing it. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19280) Failed Recovery from checkpoint caused by the multi-threads issue in Spark Streaming scheduler
[ https://issues.apache.org/jira/browse/SPARK-19280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15851786#comment-15851786 ] Nan Zhu commented on SPARK-19280: - ping > Failed Recovery from checkpoint caused by the multi-threads issue in Spark > Streaming scheduler > -- > > Key: SPARK-19280 > URL: https://issues.apache.org/jira/browse/SPARK-19280 > Project: Spark > Issue Type: Bug >Affects Versions: 1.6.3, 2.0.0, 2.0.1, 2.0.2, 2.1.0 >Reporter: Nan Zhu >Priority: Critical > > In one of our applications, we found the following issue, the application > recovering from a checkpoint file named "checkpoint-***16670" but with > the timestamp ***16650 will recover from the very beginning of the stream > and because our application relies on the external & periodically-cleaned > data (syncing with checkpoint cleanup), the recovery just failed > We identified a potential issue in Spark Streaming checkpoint and will > describe it with the following example. We will propose a fix in the end of > this JIRA. > 1. The application properties: Batch Duration: 2, Functionality: Single > Stream calling ReduceByKeyAndWindow and print, Window Size: 6, > SlideDuration, 2 > 2. RDD at 16650 is generated and the corresponding job is submitted to > the execution ThreadPool. Meanwhile, a DoCheckpoint message is sent to the > queue of JobGenerator > 3. Job at 16650 is finished and JobCompleted message is sent to > JobScheduler's queue, and meanwhile, Job at 16652 is submitted to the > execution ThreadPool and similarly, a DoCheckpoint is sent to the queue of > JobGenerator > 4. JobScheduler's message processing thread (I will use JS-EventLoop to > identify it) is not scheduled by the operating system for a long time, and > during this period, Jobs generated from 16652 - 16670 are generated > and completed. > 5. the message processing thread of JobGenerator (JG-EventLoop) is scheduled > and processed all DoCheckpoint messages for jobs ranging from 16652 - > 16670 and checkpoint files are successfully written. CRITICAL: at this > moment, the lastCheckpointTime would be 16670. > 6. JS-EventLoop is scheduled, and process all JobCompleted messages for jobs > ranging from 16652 - 16670. CRITICAL: a ClearMetadata message is > pushed to JobGenerator's message queue for EACH JobCompleted. > 7. The current message queue contains 20 ClearMetadata messages and > JG-EventLoop is scheduled to process them. CRITICAL: ClearMetadata will > remove all RDDs out of rememberDuration window. In our case, > ReduceyKeyAndWindow will set rememberDuration to 10 (rememberDuration of > ReducedWindowDStream (4) + windowSize) resulting that only RDDs <- > (16660, 16670] are kept. And ClearMetaData processing logic will push > a DoCheckpoint to JobGenerator's thread > 8. JG-EventLoop is scheduled again to process DoCheckpoint for 1665, VERY > CRITICAL: at this step, RDD no later than 16660 has been removed, and > checkpoint data is updated as > https://github.com/apache/spark/blob/a81e336f1eddc2c6245d807aae2c81ddc60eabf9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L53 > and > https://github.com/apache/spark/blob/a81e336f1eddc2c6245d807aae2c81ddc60eabf9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L59. > 9. After 8, a checkpoint named /path/checkpoint-16670 is created but with > the timestamp 16650. and at this moment, Application crashed > 10. 
Application recovers from /path/checkpoint-16670 and try to get RDD > with validTime 16650. Of course it will not find it and has to recompute. > In the case of ReduceByKeyAndWindow, it needs to recursively slice RDDs until > to the start of the stream. When the stream depends on the external data, it > will not successfully recover. In the case of Kafka, the recovered RDDs would > not be the same as the original one, as the currentOffsets has been updated > to the value at the moment of 16670 > The proposed fix: > 0. a hot-fix would be setting timestamp Checkpoint File to lastCheckpointTime > instead of using the timestamp of Checkpoint instance (any side-effect?) > 1. ClearMetadata shall be ClearMedataAndCheckpoint > 2. a long-term fix would be merge JobScheduler and JobGenerator, I didn't see > any necessary to have two threads here -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] (SPARK-19233) Inconsistent Behaviour of Spark Streaming Checkpoint
Nan Zhu commented on SPARK-19233 (Re: Inconsistent Behaviour of Spark Streaming Checkpoint): update? -- This message was sent by Atlassian JIRA (v6.3.15#6346-sha1:dbc023d)
[jira] (SPARK-19280) Failed Recovery from checkpoint caused by the multi-threads issue in Spark Streaming scheduler
Nan Zhu commented on SPARK-19280 (Re: Failed Recovery from checkpoint caused by the multi-threads issue in Spark Streaming scheduler): update? -- This message was sent by Atlassian JIRA (v6.3.15#6346-sha1:dbc023d)
[jira] [Created] (SPARK-19358) LiveListenerBus shall log the event name when dropping them due to a fully filled queue
Nan Zhu created SPARK-19358: --- Summary: LiveListenerBus shall log the event name when dropping them due to a fully filled queue Key: SPARK-19358 URL: https://issues.apache.org/jira/browse/SPARK-19358 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 2.1.0, 2.0.2, 2.0.1, 2.0.0, 1.6.3 Reporter: Nan Zhu Priority: Trivial A dropped event can make the whole application behave unexpectedly, e.g. cause UI problems...we should log the name of the dropped event to facilitate debugging -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
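As a rough illustration of the behaviour this ticket asks for, not the actual LiveListenerBus implementation: a bounded queue that names the event class when it has to drop one. BoundedEventQueue and its System.err logging are invented for the example; only SparkListenerEvent is a real Spark type.

{code:scala}
import java.util.concurrent.LinkedBlockingQueue

import org.apache.spark.scheduler.SparkListenerEvent

// Minimal sketch: when the listener queue is full and an event must be dropped,
// log *which* event type was dropped instead of only counting drops.
class BoundedEventQueue(capacity: Int) {
  private val queue = new LinkedBlockingQueue[SparkListenerEvent](capacity)

  def post(event: SparkListenerEvent): Boolean = {
    val accepted = queue.offer(event)
    if (!accepted) {
      // Naming the event class makes it much easier to see which UI/metrics
      // information went missing when debugging a "weird UI" report.
      System.err.println(
        s"Dropping event ${event.getClass.getSimpleName}: listener queue is full")
    }
    accepted
  }
}
{code}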
[jira] [Comment Edited] (SPARK-19280) Failed Recovery from checkpoint caused by the multi-threads issue in Spark Streaming scheduler
[ https://issues.apache.org/jira/browse/SPARK-19280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15831209#comment-15831209 ] Nan Zhu edited comment on SPARK-19280 at 1/20/17 1:24 PM: -- [~zsxwing] Thanks for reply 0) I do not think the content in checkpoint file (except that timestamp) has some problem. A checkpoint request for an earlier time is just for reducing the future checkpoint size (because a checkpoint request for an earlier timestamp must follow a ClearMetadata for the same timestamp). based on this, we should not recover from that "just-for-reduceing-size" moment, we shall recover from the latestCheckpointTime. I didn't see how recovering from this moment will lose data (I may miss something..) 1) can you elaborate it? my original idea is just to avoid a checkpoint with an earlier timestamp in the case of ClearMetadata (if I miss any data loss case in 0, then forget about this one...) 2) long-term fix and brings benefits, including SPARK-19233, it is the second one caused by multi threads here. I am volunteering to work on it and will be happy if you or TD can be the Shepherd was (Author: codingcat): [~zsxwing] Thanks for reply 0) I do not think the content in checkpoint file (except that timestamp) has some problem. A checkpoint request for an earlier time is just for reducing the future checkpoint size (because a checkpoint request for an earlier timestamp but follow a ClearMetadata for the same timestamp). based on this, we should not recover from that "just-for-reduceing-size" moment, we shall recover from the latestCheckpointTime. I didn't see how recovering from this moment will lose data (I may miss something..) 1) can you elaborate it? my original idea is just to avoid a checkpoint with an earlier timestamp (if I miss any data loss case in 0, then forget about this one...) 2) long-term fix and brings benefits, including SPARK-19233, it is the second one caused by multi threads here. I am volunteering to work on it and will be happy if you or TD can be the Shepherd > Failed Recovery from checkpoint caused by the multi-threads issue in Spark > Streaming scheduler > -- > > Key: SPARK-19280 > URL: https://issues.apache.org/jira/browse/SPARK-19280 > Project: Spark > Issue Type: Bug >Affects Versions: 1.6.3, 2.0.0, 2.0.1, 2.0.2, 2.1.0 >Reporter: Nan Zhu >Priority: Critical > > In one of our applications, we found the following issue, the application > recovering from a checkpoint file named "checkpoint-***16670" but with > the timestamp ***16650 will recover from the very beginning of the stream > and because our application relies on the external & periodically-cleaned > data (syncing with checkpoint cleanup), the recovery just failed > We identified a potential issue in Spark Streaming checkpoint and will > describe it with the following example. We will propose a fix in the end of > this JIRA. > 1. The application properties: Batch Duration: 2, Functionality: Single > Stream calling ReduceByKeyAndWindow and print, Window Size: 6, > SlideDuration, 2 > 2. RDD at 16650 is generated and the corresponding job is submitted to > the execution ThreadPool. Meanwhile, a DoCheckpoint message is sent to the > queue of JobGenerator > 3. Job at 16650 is finished and JobCompleted message is sent to > JobScheduler's queue, and meanwhile, Job at 16652 is submitted to the > execution ThreadPool and similarly, a DoCheckpoint is sent to the queue of > JobGenerator > 4. 
JobScheduler's message processing thread (I will use JS-EventLoop to > identify it) is not scheduled by the operating system for a long time, and > during this period, Jobs generated from 16652 - 16670 are generated > and completed. > 5. the message processing thread of JobGenerator (JG-EventLoop) is scheduled > and processed all DoCheckpoint messages for jobs ranging from 16652 - > 16670 and checkpoint files are successfully written. CRITICAL: at this > moment, the lastCheckpointTime would be 16670. > 6. JS-EventLoop is scheduled, and process all JobCompleted messages for jobs > ranging from 16652 - 16670. CRITICAL: a ClearMetadata message is > pushed to JobGenerator's message queue for EACH JobCompleted. > 7. The current message queue contains 20 ClearMetadata messages and > JG-EventLoop is scheduled to process them. CRITICAL: ClearMetadata will > remove all RDDs out of rememberDuration window. In our case, > ReduceyKeyAndWindow will set rememberDuration to 10 (rememberDuration of > ReducedWindowDStream (4) + windowSize) resulting that only RDDs <- > (16660, 16670] are kept. And ClearMetaData processing logic will push > a DoC
[jira] [Commented] (SPARK-19280) Failed Recovery from checkpoint caused by the multi-threads issue in Spark Streaming scheduler
[ https://issues.apache.org/jira/browse/SPARK-19280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15831217#comment-15831217 ] Nan Zhu commented on SPARK-19280: - BTW, do I need to highlight the KafkaDStream issue as another JIRA, (which is taken as an example here)? > Failed Recovery from checkpoint caused by the multi-threads issue in Spark > Streaming scheduler > -- > > Key: SPARK-19280 > URL: https://issues.apache.org/jira/browse/SPARK-19280 > Project: Spark > Issue Type: Bug >Affects Versions: 1.6.3, 2.0.0, 2.0.1, 2.0.2, 2.1.0 >Reporter: Nan Zhu >Priority: Critical > > In one of our applications, we found the following issue, the application > recovering from a checkpoint file named "checkpoint-***16670" but with > the timestamp ***16650 will recover from the very beginning of the stream > and because our application relies on the external & periodically-cleaned > data (syncing with checkpoint cleanup), the recovery just failed > We identified a potential issue in Spark Streaming checkpoint and will > describe it with the following example. We will propose a fix in the end of > this JIRA. > 1. The application properties: Batch Duration: 2, Functionality: Single > Stream calling ReduceByKeyAndWindow and print, Window Size: 6, > SlideDuration, 2 > 2. RDD at 16650 is generated and the corresponding job is submitted to > the execution ThreadPool. Meanwhile, a DoCheckpoint message is sent to the > queue of JobGenerator > 3. Job at 16650 is finished and JobCompleted message is sent to > JobScheduler's queue, and meanwhile, Job at 16652 is submitted to the > execution ThreadPool and similarly, a DoCheckpoint is sent to the queue of > JobGenerator > 4. JobScheduler's message processing thread (I will use JS-EventLoop to > identify it) is not scheduled by the operating system for a long time, and > during this period, Jobs generated from 16652 - 16670 are generated > and completed. > 5. the message processing thread of JobGenerator (JG-EventLoop) is scheduled > and processed all DoCheckpoint messages for jobs ranging from 16652 - > 16670 and checkpoint files are successfully written. CRITICAL: at this > moment, the lastCheckpointTime would be 16670. > 6. JS-EventLoop is scheduled, and process all JobCompleted messages for jobs > ranging from 16652 - 16670. CRITICAL: a ClearMetadata message is > pushed to JobGenerator's message queue for EACH JobCompleted. > 7. The current message queue contains 20 ClearMetadata messages and > JG-EventLoop is scheduled to process them. CRITICAL: ClearMetadata will > remove all RDDs out of rememberDuration window. In our case, > ReduceyKeyAndWindow will set rememberDuration to 10 (rememberDuration of > ReducedWindowDStream (4) + windowSize) resulting that only RDDs <- > (16660, 16670] are kept. And ClearMetaData processing logic will push > a DoCheckpoint to JobGenerator's thread > 8. JG-EventLoop is scheduled again to process DoCheckpoint for 1665, VERY > CRITICAL: at this step, RDD no later than 16660 has been removed, and > checkpoint data is updated as > https://github.com/apache/spark/blob/a81e336f1eddc2c6245d807aae2c81ddc60eabf9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L53 > and > https://github.com/apache/spark/blob/a81e336f1eddc2c6245d807aae2c81ddc60eabf9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L59. > 9. After 8, a checkpoint named /path/checkpoint-16670 is created but with > the timestamp 16650. 
and at this moment, Application crashed > 10. Application recovers from /path/checkpoint-16670 and try to get RDD > with validTime 16650. Of course it will not find it and has to recompute. > In the case of ReduceByKeyAndWindow, it needs to recursively slice RDDs until > to the start of the stream. When the stream depends on the external data, it > will not successfully recover. In the case of Kafka, the recovered RDDs would > not be the same as the original one, as the currentOffsets has been updated > to the value at the moment of 16670 > The proposed fix: > 0. a hot-fix would be setting timestamp Checkpoint File to lastCheckpointTime > instead of using the timestamp of Checkpoint instance (any side-effect?) > 1. ClearMetadata shall be ClearMedataAndCheckpoint > 2. a long-term fix would be merge JobScheduler and JobGenerator, I didn't see > any necessary to have two threads here -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional comma
[jira] [Commented] (SPARK-19280) Failed Recovery from checkpoint caused by the multi-threads issue in Spark Streaming scheduler
[ https://issues.apache.org/jira/browse/SPARK-19280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15831209#comment-15831209 ] Nan Zhu commented on SPARK-19280: - [~zsxwing] Thanks for reply 0) I do not think the content in checkpoint file (except that timestamp) has some problem. A checkpoint request for an earlier time is just for reducing the future checkpoint size (because a checkpoint request for an earlier timestamp but follow a ClearMetadata for the same timestamp). based on this, we should not recover from that "just-for-reduceing-size" moment, we shall recover from the latestCheckpointTime. I didn't see how recovering from this moment will lose data (I may miss something..) 1) can you elaborate it? my original idea is just to avoid a checkpoint with an earlier timestamp (if I miss any data loss case in 0, then forget about this one...) 2) long-term fix and brings benefits, including SPARK-19233, it is the second one caused by multi threads here. I am volunteering to work on it and will be happy if you or TD can be the Shepherd > Failed Recovery from checkpoint caused by the multi-threads issue in Spark > Streaming scheduler > -- > > Key: SPARK-19280 > URL: https://issues.apache.org/jira/browse/SPARK-19280 > Project: Spark > Issue Type: Bug >Affects Versions: 1.6.3, 2.0.0, 2.0.1, 2.0.2, 2.1.0 >Reporter: Nan Zhu >Priority: Critical > > In one of our applications, we found the following issue, the application > recovering from a checkpoint file named "checkpoint-***16670" but with > the timestamp ***16650 will recover from the very beginning of the stream > and because our application relies on the external & periodically-cleaned > data (syncing with checkpoint cleanup), the recovery just failed > We identified a potential issue in Spark Streaming checkpoint and will > describe it with the following example. We will propose a fix in the end of > this JIRA. > 1. The application properties: Batch Duration: 2, Functionality: Single > Stream calling ReduceByKeyAndWindow and print, Window Size: 6, > SlideDuration, 2 > 2. RDD at 16650 is generated and the corresponding job is submitted to > the execution ThreadPool. Meanwhile, a DoCheckpoint message is sent to the > queue of JobGenerator > 3. Job at 16650 is finished and JobCompleted message is sent to > JobScheduler's queue, and meanwhile, Job at 16652 is submitted to the > execution ThreadPool and similarly, a DoCheckpoint is sent to the queue of > JobGenerator > 4. JobScheduler's message processing thread (I will use JS-EventLoop to > identify it) is not scheduled by the operating system for a long time, and > during this period, Jobs generated from 16652 - 16670 are generated > and completed. > 5. the message processing thread of JobGenerator (JG-EventLoop) is scheduled > and processed all DoCheckpoint messages for jobs ranging from 16652 - > 16670 and checkpoint files are successfully written. CRITICAL: at this > moment, the lastCheckpointTime would be 16670. > 6. JS-EventLoop is scheduled, and process all JobCompleted messages for jobs > ranging from 16652 - 16670. CRITICAL: a ClearMetadata message is > pushed to JobGenerator's message queue for EACH JobCompleted. > 7. The current message queue contains 20 ClearMetadata messages and > JG-EventLoop is scheduled to process them. CRITICAL: ClearMetadata will > remove all RDDs out of rememberDuration window. 
In our case, > ReduceyKeyAndWindow will set rememberDuration to 10 (rememberDuration of > ReducedWindowDStream (4) + windowSize) resulting that only RDDs <- > (16660, 16670] are kept. And ClearMetaData processing logic will push > a DoCheckpoint to JobGenerator's thread > 8. JG-EventLoop is scheduled again to process DoCheckpoint for 1665, VERY > CRITICAL: at this step, RDD no later than 16660 has been removed, and > checkpoint data is updated as > https://github.com/apache/spark/blob/a81e336f1eddc2c6245d807aae2c81ddc60eabf9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L53 > and > https://github.com/apache/spark/blob/a81e336f1eddc2c6245d807aae2c81ddc60eabf9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L59. > 9. After 8, a checkpoint named /path/checkpoint-16670 is created but with > the timestamp 16650. and at this moment, Application crashed > 10. Application recovers from /path/checkpoint-16670 and try to get RDD > with validTime 16650. Of course it will not find it and has to recompute. > In the case of ReduceByKeyAndWindow, it needs to recursively slice RDDs until > to the start of the stream. When the strea
[jira] [Updated] (SPARK-19280) Failed Recovery from checkpoint caused by the multi-threads issue in Spark Streaming scheduler
[ https://issues.apache.org/jira/browse/SPARK-19280?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nan Zhu updated SPARK-19280: Description: In one of our applications, we found the following issue, the application recovering from a checkpoint file named "checkpoint-***16670" but with the timestamp ***16650 will recover from the very beginning of the stream and because our application relies on the external & periodically-cleaned data (syncing with checkpoint cleanup), the recovery just failed We identified a potential issue in Spark Streaming checkpoint and will describe it with the following example. We will propose a fix in the end of this JIRA. 1. The application properties: Batch Duration: 2, Functionality: Single Stream calling ReduceByKeyAndWindow and print, Window Size: 6, SlideDuration, 2 2. RDD at 16650 is generated and the corresponding job is submitted to the execution ThreadPool. Meanwhile, a DoCheckpoint message is sent to the queue of JobGenerator 3. Job at 16650 is finished and JobCompleted message is sent to JobScheduler's queue, and meanwhile, Job at 16652 is submitted to the execution ThreadPool and similarly, a DoCheckpoint is sent to the queue of JobGenerator 4. JobScheduler's message processing thread (I will use JS-EventLoop to identify it) is not scheduled by the operating system for a long time, and during this period, Jobs generated from 16652 - 16670 are generated and completed. 5. the message processing thread of JobGenerator (JG-EventLoop) is scheduled and processed all DoCheckpoint messages for jobs ranging from 16652 - 16670 and checkpoint files are successfully written. CRITICAL: at this moment, the lastCheckpointTime would be 16670. 6. JS-EventLoop is scheduled, and process all JobCompleted messages for jobs ranging from 16652 - 16670. CRITICAL: a ClearMetadata message is pushed to JobGenerator's message queue for EACH JobCompleted. 7. The current message queue contains 20 ClearMetadata messages and JG-EventLoop is scheduled to process them. CRITICAL: ClearMetadata will remove all RDDs out of rememberDuration window. In our case, ReduceyKeyAndWindow will set rememberDuration to 10 (rememberDuration of ReducedWindowDStream (4) + windowSize) resulting that only RDDs <- (16660, 16670] are kept. And ClearMetaData processing logic will push a DoCheckpoint to JobGenerator's thread 8. JG-EventLoop is scheduled again to process DoCheckpoint for 1665, VERY CRITICAL: at this step, RDD no later than 16660 has been removed, and checkpoint data is updated as https://github.com/apache/spark/blob/a81e336f1eddc2c6245d807aae2c81ddc60eabf9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L53 and https://github.com/apache/spark/blob/a81e336f1eddc2c6245d807aae2c81ddc60eabf9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L59. 9. After 8, a checkpoint named /path/checkpoint-16670 is created but with the timestamp 16650. and at this moment, Application crashed 10. Application recovers from /path/checkpoint-16670 and try to get RDD with validTime 16650. Of course it will not find it and has to recompute. In the case of ReduceByKeyAndWindow, it needs to recursively slice RDDs until to the start of the stream. When the stream depends on the external data, it will not successfully recover. In the case of Kafka, the recovered RDDs would not be the same as the original one, as the currentOffsets has been updated to the value at the moment of 16670 The proposed fix: 0. 
a hot-fix would be setting timestamp Checkpoint File to lastCheckpointTime instead of using the timestamp of Checkpoint instance (any side-effect?) 1. ClearMetadata shall be ClearMedataAndCheckpoint 2. a long-term fix would be merge JobScheduler and JobGenerator, I didn't see any necessary to have two threads here was: In one of our applications, we found the following issue, the application recovering from a checkpoint file named "checkpoint-***16670" but with the timestamp ***16650 will recover from the very beginning of the stream and because our application relies on the external & periodically-cleaned data (syncing with checkpoint cleanup), the recovery just failed We identified a potential issue in Spark Streaming checkpoint and will describe it with the following example. We will propose a fix in the end of this JIRA. 1. The application properties: Batch Duration: 2, Functionality: Single Stream calling ReduceByKeyAndWindow and print, Window Size: 6, SlideDuration, 2 2. RDD at 16650 is generated and the corresponding job is submitted to the execution ThreadPool. Meanwhile, a DoCheckpoint message is sent to the queue of JobGenerator 3. Job at 16650 is finished and JobCompleted message is se
[jira] [Commented] (SPARK-19233) Inconsistent Behaviour of Spark Streaming Checkpoint
[ https://issues.apache.org/jira/browse/SPARK-19233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15831098#comment-15831098 ] Nan Zhu commented on SPARK-19233: - By "filtering generatedRDDs" I may have caused some confusion here; what I propose is 1. Add a latestCheckpointTime to DStreamCheckpointData 2. When update() is called, only put RDDs with a timestamp <= latestCheckpointTime into currentCheckpointFiles (https://github.com/CodingCat/spark/blob/65623f4408ab6152719046c55093c70435da82c8/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L53) What do you think? Regarding correctness, so far I haven't seen applications die because of this issue. But when I analyzed the logs with a simple script/regex to see whether (some lines of) DStream.compute() were called, I was always confused...since I assumed a recovery from checkpoint-1000 would only call compute(time > 1000) > Inconsistent Behaviour of Spark Streaming Checkpoint > > > Key: SPARK-19233 > URL: https://issues.apache.org/jira/browse/SPARK-19233 > Project: Spark > Issue Type: Improvement > Components: DStreams >Affects Versions: 2.0.0, 2.0.1, 2.0.2, 2.1.0 >Reporter: Nan Zhu > > When checking one of our application logs, we found the following behavior > (simplified): > 1. The Spark application recovers from a checkpoint constructed at timestamp 1000 ms > 2. The log shows that the Spark application can recover RDDs generated at > timestamps 2000 and 3000 > The root cause is that the generateJobs event is pushed to the queue by a > separate thread (RecurringTimer); before the doCheckpoint event is pushed to the > queue, multiple generateJobs events may already have been processed. As a > result, when doCheckpoint for timestamp 1000 is processed, the generatedRDDs > data structure containing RDDs generated at 2000 and 3000 is serialized as part > of the checkpoint for 1000. > This adds overhead for debugging and for coordinating our offset management > strategy with Spark Streaming's checkpoint strategy while we are developing a > new type of DStream that integrates Spark Streaming with an internal message > middleware. > The proposed fix is to filter generatedRDDs according to the checkpoint timestamp > when serializing it. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
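A toy sketch of the two-step proposal in the comment above, using simplified stand-ins rather than the real DStreamCheckpointData (the maps and Long timestamps are invented for the example): keep a latestCheckpointTime and let update() copy only checkpoint files whose batch time is <= that value, so a checkpoint stamped 1000 never drags in RDDs from 2000 or 3000.

{code:scala}
import scala.collection.mutable

// Simplified model of the proposed filter; not the real Spark Streaming class.
class SimplifiedDStreamCheckpointData {
  private var latestCheckpointTime: Long = Long.MinValue

  // batch time -> checkpoint file written for the RDD of that batch (toy data)
  val checkpointedRDDFiles = mutable.HashMap[Long, String]()
  val currentCheckpointFiles = mutable.HashMap[Long, String]()

  def update(checkpointTime: Long): Unit = {
    if (checkpointTime > latestCheckpointTime) latestCheckpointTime = checkpointTime
    currentCheckpointFiles.clear()
    // The filter is the proposed change: ignore RDDs generated after the time
    // this checkpoint is supposed to represent.
    currentCheckpointFiles ++=
      checkpointedRDDFiles.filter { case (time, _) => time <= latestCheckpointTime }
  }
}
{code}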
[jira] [Updated] (SPARK-19280) Failed Recovery from checkpoint caused by the multi-threads issue in Spark Streaming scheduler
[ https://issues.apache.org/jira/browse/SPARK-19280?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nan Zhu updated SPARK-19280: Description: In one of our applications, we found the following issue, the application recovering from a checkpoint file named "checkpoint-***16670" but with the timestamp ***16650 will recover from the very beginning of the stream and because our application relies on the external & periodically-cleaned data (syncing with checkpoint cleanup), the recovery just failed We identified a potential issue in Spark Streaming checkpoint and will describe it with the following example. We will propose a fix in the end of this JIRA. 1. The application properties: Batch Duration: 2, Functionality: Single Stream calling ReduceByKeyAndWindow and print, Window Size: 6, SlideDuration, 2 2. RDD at 16650 is generated and the corresponding job is submitted to the execution ThreadPool. Meanwhile, a DoCheckpoint message is sent to the queue of JobGenerator 3. Job at 16650 is finished and JobCompleted message is sent to JobScheduler's queue, and meanwhile, Job at 16652 is submitted to the execution ThreadPool and similarly, a DoCheckpoint is sent to the queue of JobGenerator 4. JobScheduler's message processing thread (I will use JS-EventLoop to identify it) is not scheduled by the operating system for a long time, and during this period, Jobs generated from 16652 - 16670 are generated and completed. 5. the message processing thread of JobGenerator (JG-EventLoop) is scheduled and processed all DoCheckpoint messages for jobs ranging from 16652 - 16670 and checkpoint files are successfully written. CRITICAL: at this moment, the lastCheckpointTime would be 16670. 6. JS-EventLoop is scheduled, and process all JobCompleted messages for jobs ranging from 16652 - 16670. CRITICAL: a ClearMetadata message is pushed to JobGenerator's message queue for EACH JobCompleted. 7. The current message queue contains 20 ClearMetadata messages and JG-EventLoop is scheduled to process them. CRITICAL: ClearMetadata will remove all RDDs out of rememberDuration window. In our case, ReduceyKeyAndWindow will set rememberDuration to 10 (rememberDuration of ReducedWindowDStream (4) + windowSize) resulting that only RDDs <- (16660, 16670] are kept. And ClearMetaData processing logic will push a DoCheckpoint to JobGenerator's thread 8. JG-EventLoop is scheduled again to process DoCheckpoint for 1665, VERY CRITICAL: at this step, RDD no later than 16670 has been removed, and checkpoint data is updated as https://github.com/apache/spark/blob/a81e336f1eddc2c6245d807aae2c81ddc60eabf9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L53 and https://github.com/apache/spark/blob/a81e336f1eddc2c6245d807aae2c81ddc60eabf9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L59. 9. After 8, a checkpoint named /path/checkpoint-16670 is created but with the timestamp 16650. and at this moment, Application crashed 10. Application recovers from /path/checkpoint-16670 and try to get RDD with validTime 16650. Of course it will not find it and has to recompute. In the case of ReduceByKeyAndWindow, it needs to recursively slice RDDs until to the start of the stream. When the stream depends on the external data, it will not successfully recover. In the case of Kafka, the recovered RDDs would not be the same as the original one, as the currentOffsets has been updated to the value at the moment of 16670 The proposed fix: 0. 
a hot-fix would be setting timestamp Checkpoint File to lastCheckpointTime instead of using the timestamp of Checkpoint instance (any side-effect?) 1. ClearMetadata shall be ClearMedataAndCheckpoint 2. a long-term fix would be merge JobScheduler and JobGenerator, I didn't see any necessary to have two threads here was: In one of our applications, we found the following issue, the application recovering from a checkpoint file named "checkpoint-***16670" but with the timestamp ***16650 will recover from the very beginning of the stream and because our application relies on the external & periodically-cleaned data (syncing with checkpoint cleanup), the recovery just failed We identified a potential issue in Spark Streaming checkpoint and will describe it with the following example. We will propose a fix in the end of this JIRA. 1. The application properties: Batch Duration: 2, Functionality: Single Stream calling ReduceByKeyAndWindow and print, Window Size: 6, SlideDuration, 2 2. RDD at 16650 is generated and the corresponding job is submitted to the execution ThreadPool. Meanwhile, a DoCheckpoint message is sent to the queue of JobGenerator 3. Job at 16650 is finished and JobCompleted message is se
[jira] [Commented] (SPARK-19278) Failed Recovery from checkpoint caused by the multi-threads issue in Spark Streaming scheduler
[ https://issues.apache.org/jira/browse/SPARK-19278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15828621#comment-15828621 ] Nan Zhu commented on SPARK-19278: - any one would help to close this one? as it is a duplication of 19280I don't know how it comes [~srowen] would you mind helping? > Failed Recovery from checkpoint caused by the multi-threads issue in Spark > Streaming scheduler > -- > > Key: SPARK-19278 > URL: https://issues.apache.org/jira/browse/SPARK-19278 > Project: Spark > Issue Type: Bug >Affects Versions: 1.6.3, 2.0.0, 2.0.1, 2.0.2, 2.1.0 >Reporter: Nan Zhu > > In one of our applications, we found the following issue, the application > recovering from a checkpoint file named "checkpoint-***16670" but with > the timestamp ***16650 will recover from the very beginning of the stream > and because our application relies on the external & periodically-cleaned > data (syncing with checkpoint cleanup), the recovery just failed > We identified a potential issue in Spark Streaming checkpoint and will > describe it with the following example. We will propose a fix in the end of > this JIRA. > 1. The application properties: Batch Duration: 2, Functionality: Single > Stream calling ReduceByKeyAndWindow and print, Window Size: 6, > SlideDuration, 2 > 2. RDD at 16650 is generated and the corresponding job is submitted to > the execution ThreadPool. Meanwhile, a DoCheckpoint message is sent to the > queue of JobGenerator > 3. Job at 16650 is finished and JobCompleted message is sent to > JobScheduler's queue, and meanwhile, Job at 16652 is submitted to the > execution ThreadPool and similarly, a DoCheckpoint is sent to the queue of > JobGenerator > 4. JobScheduler's message processing thread (I will use JS-EventLoop to > identify it) is not scheduled by the operating system for a long time, and > during this period, Jobs generated from 16652 - 16670 are generated > and completed. > 5. the message processing thread of JobGenerator (JG-EventLoop) is scheduled > and processed all DoCheckpoint messages for jobs ranging from 16652 - > 16670 and checkpoint files are successfully written. CRITICAL: at this > moment, the lastCheckpointTime would be 16670. > 6. JS-EventLoop is scheduled, and process all JobCompleted messages for jobs > ranging from 16652 - 16670. CRITICAL: a ClearMetadata message is > pushed to JobGenerator's message queue for EACH JobCompleted. > 7. The current message queue contains 20 ClearMetadata messages and > JG-EventLoop is scheduled to process them. CRITICAL: ClearMetadata will > remove all RDDs out of rememberDuration window. In our case, > ReduceyKeyAndWindow will set rememberDuration to 10 (rememberDuration of > ReducedWindowDStream (4) + windowSize) resulting that only RDDs <- > (16660, 16670] are kept. And ClearMetaData processing logic will push > a DoCheckpoint to JobGenerator's thread > 8. JG-EventLoop is scheduled again to process DoCheckpoint for 1665, VERY > CRITICAL: at this step, RDD no later than 16670 has been removed, and > checkpoint data is updated as > https://github.com/apache/spark/blob/a81e336f1eddc2c6245d807aae2c81ddc60eabf9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L53 > and > https://github.com/apache/spark/blob/a81e336f1eddc2c6245d807aae2c81ddc60eabf9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L59. > 9. After 8, a checkpoint named /path/checkpoint-16670 is created but with > the timestamp 16650. 
and at this moment, Application crashed > 10. Application recovers from /path/checkpoint-16670 and try to get RDD > with validTime 16650. Of course it will not find it and has to recompute. > In the case of ReduceByKeyAndWindow, it needs to recursively slice RDDs until > to the start of the stream. When the stream depends on the external data, it > will not successfully recover. In the case of Kafka, the recovered RDDs would > not be the same as the original one, as the currentOffsets has been updated > to the value at the moment of 16670 > The proposed fix: > 0. a hot-fix would be setting timestamp Checkpoint File to lastCheckpointTime > instead of using the timestamp of Checkpoint instance (any side-effect?) > 1. a long-term fix would be merge JobScheduler and JobGenerator, I didn't see > any necessary to have two threads here -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19280) Failed Recovery from checkpoint caused by the multi-threads issue in Spark Streaming scheduler
[ https://issues.apache.org/jira/browse/SPARK-19280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15828602#comment-15828602 ] Nan Zhu commented on SPARK-19280: - [~zsxwing] would you mind confirming about this? it would be great if we can discuss about it > Failed Recovery from checkpoint caused by the multi-threads issue in Spark > Streaming scheduler > -- > > Key: SPARK-19280 > URL: https://issues.apache.org/jira/browse/SPARK-19280 > Project: Spark > Issue Type: Bug >Affects Versions: 1.6.3, 2.0.0, 2.0.1, 2.0.2, 2.1.0 >Reporter: Nan Zhu > > In one of our applications, we found the following issue, the application > recovering from a checkpoint file named "checkpoint-***16670" but with > the timestamp ***16650 will recover from the very beginning of the stream > and because our application relies on the external & periodically-cleaned > data (syncing with checkpoint cleanup), the recovery just failed > We identified a potential issue in Spark Streaming checkpoint and will > describe it with the following example. We will propose a fix in the end of > this JIRA. > 1. The application properties: Batch Duration: 2, Functionality: Single > Stream calling ReduceByKeyAndWindow and print, Window Size: 6, > SlideDuration, 2 > 2. RDD at 16650 is generated and the corresponding job is submitted to > the execution ThreadPool. Meanwhile, a DoCheckpoint message is sent to the > queue of JobGenerator > 3. Job at 16650 is finished and JobCompleted message is sent to > JobScheduler's queue, and meanwhile, Job at 16652 is submitted to the > execution ThreadPool and similarly, a DoCheckpoint is sent to the queue of > JobGenerator > 4. JobScheduler's message processing thread (I will use JS-EventLoop to > identify it) is not scheduled by the operating system for a long time, and > during this period, Jobs generated from 16652 - 16670 are generated > and completed. > 5. the message processing thread of JobGenerator (JG-EventLoop) is scheduled > and processed all DoCheckpoint messages for jobs ranging from 16652 - > 16670 and checkpoint files are successfully written. CRITICAL: at this > moment, the lastCheckpointTime would be 16670. > 6. JS-EventLoop is scheduled, and process all JobCompleted messages for jobs > ranging from 16652 - 16670. CRITICAL: a ClearMetadata message is > pushed to JobGenerator's message queue for EACH JobCompleted. > 7. The current message queue contains 20 ClearMetadata messages and > JG-EventLoop is scheduled to process them. CRITICAL: ClearMetadata will > remove all RDDs out of rememberDuration window. In our case, > ReduceyKeyAndWindow will set rememberDuration to 10 (rememberDuration of > ReducedWindowDStream (4) + windowSize) resulting that only RDDs <- > (16660, 16670] are kept. And ClearMetaData processing logic will push > a DoCheckpoint to JobGenerator's thread > 8. JG-EventLoop is scheduled again to process DoCheckpoint for 1665, VERY > CRITICAL: at this step, RDD no later than 16670 has been removed, and > checkpoint data is updated as > https://github.com/apache/spark/blob/a81e336f1eddc2c6245d807aae2c81ddc60eabf9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L53 > and > https://github.com/apache/spark/blob/a81e336f1eddc2c6245d807aae2c81ddc60eabf9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L59. > 9. After 8, a checkpoint named /path/checkpoint-16670 is created but with > the timestamp 16650. and at this moment, Application crashed > 10. 
Application recovers from /path/checkpoint-16670 and try to get RDD > with validTime 16650. Of course it will not find it and has to recompute. > In the case of ReduceByKeyAndWindow, it needs to recursively slice RDDs until > to the start of the stream. When the stream depends on the external data, it > will not successfully recover. In the case of Kafka, the recovered RDDs would > not be the same as the original one, as the currentOffsets has been updated > to the value at the moment of 16670 > The proposed fix: > 0. a hot-fix would be setting timestamp Checkpoint File to lastCheckpointTime > instead of using the timestamp of Checkpoint instance (any side-effect?) > 1. a long-term fix would be merge JobScheduler and JobGenerator, I didn't see > any necessary to have two threads here -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-19280) Failed Recovery from checkpoint caused by the multi-threads issue in Spark Streaming scheduler
Nan Zhu created SPARK-19280: --- Summary: Failed Recovery from checkpoint caused by the multi-threads issue in Spark Streaming scheduler Key: SPARK-19280 URL: https://issues.apache.org/jira/browse/SPARK-19280 Project: Spark Issue Type: Bug Affects Versions: 2.1.0, 2.0.2, 2.0.1, 2.0.0, 1.6.3 Reporter: Nan Zhu In one of our applications, we found the following issue, the application recovering from a checkpoint file named "checkpoint-***16670" but with the timestamp ***16650 will recover from the very beginning of the stream and because our application relies on the external & periodically-cleaned data (syncing with checkpoint cleanup), the recovery just failed We identified a potential issue in Spark Streaming checkpoint and will describe it with the following example. We will propose a fix in the end of this JIRA. 1. The application properties: Batch Duration: 2, Functionality: Single Stream calling ReduceByKeyAndWindow and print, Window Size: 6, SlideDuration, 2 2. RDD at 16650 is generated and the corresponding job is submitted to the execution ThreadPool. Meanwhile, a DoCheckpoint message is sent to the queue of JobGenerator 3. Job at 16650 is finished and JobCompleted message is sent to JobScheduler's queue, and meanwhile, Job at 16652 is submitted to the execution ThreadPool and similarly, a DoCheckpoint is sent to the queue of JobGenerator 4. JobScheduler's message processing thread (I will use JS-EventLoop to identify it) is not scheduled by the operating system for a long time, and during this period, Jobs generated from 16652 - 16670 are generated and completed. 5. the message processing thread of JobGenerator (JG-EventLoop) is scheduled and processed all DoCheckpoint messages for jobs ranging from 16652 - 16670 and checkpoint files are successfully written. CRITICAL: at this moment, the lastCheckpointTime would be 16670. 6. JS-EventLoop is scheduled, and process all JobCompleted messages for jobs ranging from 16652 - 16670. CRITICAL: a ClearMetadata message is pushed to JobGenerator's message queue for EACH JobCompleted. 7. The current message queue contains 20 ClearMetadata messages and JG-EventLoop is scheduled to process them. CRITICAL: ClearMetadata will remove all RDDs out of rememberDuration window. In our case, ReduceyKeyAndWindow will set rememberDuration to 10 (rememberDuration of ReducedWindowDStream (4) + windowSize) resulting that only RDDs <- (16660, 16670] are kept. And ClearMetaData processing logic will push a DoCheckpoint to JobGenerator's thread 8. JG-EventLoop is scheduled again to process DoCheckpoint for 1665, VERY CRITICAL: at this step, RDD no later than 16670 has been removed, and checkpoint data is updated as https://github.com/apache/spark/blob/a81e336f1eddc2c6245d807aae2c81ddc60eabf9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L53 and https://github.com/apache/spark/blob/a81e336f1eddc2c6245d807aae2c81ddc60eabf9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L59. 9. After 8, a checkpoint named /path/checkpoint-16670 is created but with the timestamp 16650. and at this moment, Application crashed 10. Application recovers from /path/checkpoint-16670 and try to get RDD with validTime 16650. Of course it will not find it and has to recompute. In the case of ReduceByKeyAndWindow, it needs to recursively slice RDDs until to the start of the stream. When the stream depends on the external data, it will not successfully recover. 
In the case of Kafka, the recovered RDDs would not be the same as the original one, as the currentOffsets has been updated to the value at the moment of 16670 The proposed fix: 0. a hot-fix would be setting timestamp Checkpoint File to lastCheckpointTime instead of using the timestamp of Checkpoint instance (any side-effect?) 1. a long-term fix would be merge JobScheduler and JobGenerator, I didn't see any necessary to have two threads here -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-19278) Failed Recovery from checkpoint caused by the multi-threads issue in Spark Streaming scheduler
Nan Zhu created SPARK-19278: --- Summary: Failed Recovery from checkpoint caused by the multi-threads issue in Spark Streaming scheduler Key: SPARK-19278 URL: https://issues.apache.org/jira/browse/SPARK-19278 Project: Spark Issue Type: Bug Affects Versions: 2.1.0, 2.0.2, 2.0.1, 2.0.0, 1.6.3 Reporter: Nan Zhu In one of our applications, we found the following issue, the application recovering from a checkpoint file named "checkpoint-***16670" but with the timestamp ***16650 will recover from the very beginning of the stream and because our application relies on the external & periodically-cleaned data (syncing with checkpoint cleanup), the recovery just failed We identified a potential issue in Spark Streaming checkpoint and will describe it with the following example. We will propose a fix in the end of this JIRA. 1. The application properties: Batch Duration: 2, Functionality: Single Stream calling ReduceByKeyAndWindow and print, Window Size: 6, SlideDuration, 2 2. RDD at 16650 is generated and the corresponding job is submitted to the execution ThreadPool. Meanwhile, a DoCheckpoint message is sent to the queue of JobGenerator 3. Job at 16650 is finished and JobCompleted message is sent to JobScheduler's queue, and meanwhile, Job at 16652 is submitted to the execution ThreadPool and similarly, a DoCheckpoint is sent to the queue of JobGenerator 4. JobScheduler's message processing thread (I will use JS-EventLoop to identify it) is not scheduled by the operating system for a long time, and during this period, Jobs generated from 16652 - 16670 are generated and completed. 5. the message processing thread of JobGenerator (JG-EventLoop) is scheduled and processed all DoCheckpoint messages for jobs ranging from 16652 - 16670 and checkpoint files are successfully written. CRITICAL: at this moment, the lastCheckpointTime would be 16670. 6. JS-EventLoop is scheduled, and process all JobCompleted messages for jobs ranging from 16652 - 16670. CRITICAL: a ClearMetadata message is pushed to JobGenerator's message queue for EACH JobCompleted. 7. The current message queue contains 20 ClearMetadata messages and JG-EventLoop is scheduled to process them. CRITICAL: ClearMetadata will remove all RDDs out of rememberDuration window. In our case, ReduceyKeyAndWindow will set rememberDuration to 10 (rememberDuration of ReducedWindowDStream (4) + windowSize) resulting that only RDDs <- (16660, 16670] are kept. And ClearMetaData processing logic will push a DoCheckpoint to JobGenerator's thread 8. JG-EventLoop is scheduled again to process DoCheckpoint for 1665, VERY CRITICAL: at this step, RDD no later than 16670 has been removed, and checkpoint data is updated as https://github.com/apache/spark/blob/a81e336f1eddc2c6245d807aae2c81ddc60eabf9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L53 and https://github.com/apache/spark/blob/a81e336f1eddc2c6245d807aae2c81ddc60eabf9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L59. 9. After 8, a checkpoint named /path/checkpoint-16670 is created but with the timestamp 16650. and at this moment, Application crashed 10. Application recovers from /path/checkpoint-16670 and try to get RDD with validTime 16650. Of course it will not find it and has to recompute. In the case of ReduceByKeyAndWindow, it needs to recursively slice RDDs until to the start of the stream. When the stream depends on the external data, it will not successfully recover. 
In the case of Kafka, the recovered RDDs would not be the same as the original one, as the currentOffsets has been updated to the value at the moment of 16670 The proposed fix: 0. a hot-fix would be setting timestamp Checkpoint File to lastCheckpointTime instead of using the timestamp of Checkpoint instance (any side-effect?) 1. a long-term fix would be merge JobScheduler and JobGenerator, I didn't see any necessary to have two threads here -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19233) Inconsistent Behaviour of Spark Streaming Checkpoint
[ https://issues.apache.org/jira/browse/SPARK-19233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15823364#comment-15823364 ] Nan Zhu commented on SPARK-19233: - [~zsxwing] so, here is another potential issue I found in Spark Streaming recently; if you agree on this...I will file a PR > Inconsistent Behaviour of Spark Streaming Checkpoint > > > Key: SPARK-19233 > URL: https://issues.apache.org/jira/browse/SPARK-19233 > Project: Spark > Issue Type: Improvement > Components: DStreams >Affects Versions: 2.0.0, 2.0.1, 2.0.2, 2.1.0 >Reporter: Nan Zhu > > When checking one of our application logs, we found the following behavior > (simplified): > 1. The Spark application recovers from a checkpoint constructed at timestamp 1000 ms > 2. The log shows that the Spark application can recover RDDs generated at > timestamps 2000 and 3000 > The root cause is that the generateJobs event is pushed to the queue by a > separate thread (RecurringTimer); before the doCheckpoint event is pushed to the > queue, multiple generateJobs events may already have been processed. As a > result, when doCheckpoint for timestamp 1000 is processed, the generatedRDDs > data structure containing RDDs generated at 2000 and 3000 is serialized as part > of the checkpoint for 1000. > This adds overhead for debugging and for coordinating our offset management > strategy with Spark Streaming's checkpoint strategy while we are developing a > new type of DStream that integrates Spark Streaming with an internal message > middleware. > The proposed fix is to filter generatedRDDs according to the checkpoint timestamp > when serializing it. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19233) Inconsistent Behaviour of Spark Streaming Checkpoint
[ https://issues.apache.org/jira/browse/SPARK-19233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15823359#comment-15823359 ] Nan Zhu commented on SPARK-19233: - The category of this issue is set to Improvement for now, which is subject to revision > Inconsistent Behaviour of Spark Streaming Checkpoint > > > Key: SPARK-19233 > URL: https://issues.apache.org/jira/browse/SPARK-19233 > Project: Spark > Issue Type: Improvement > Components: DStreams >Affects Versions: 2.0.0, 2.0.1, 2.0.2, 2.1.0 >Reporter: Nan Zhu > > When checking one of our application logs, we found the following behavior > (simplified) > 1. Spark application recovers from checkpoint constructed at timestamp 1000ms > 2. The log shows that Spark application can recover RDDs generated at > timestamp 2000, 3000 > The root cause is that generateJobs event is pushed to the queue by a > separate thread (RecurTimer), before doCheckpoint event is pushed to the > queue, there might have been multiple generatedJobs being processed. As a > result, when doCheckpoint for timestamp 1000 is processed, the generatedRDDs > data structure containing RDDs generated at 2000, 3000 is serialized as part > of checkpoint of 1000. > It brings overhead for debugging and coordinate our offset management > strategy with Spark Streaming's checkpoint strategy when we are developing a > new type of DStream which integrates Spark Streaming with an internal message > middleware. > The proposed fix is to filter generatedRDDs according to checkpoint timestamp > when serializing it. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-19233) Inconsistent Behaviour of Spark Streaming Checkpoint
Nan Zhu created SPARK-19233: --- Summary: Inconsistent Behaviour of Spark Streaming Checkpoint Key: SPARK-19233 URL: https://issues.apache.org/jira/browse/SPARK-19233 Project: Spark Issue Type: Improvement Components: DStreams Affects Versions: 2.1.0, 2.0.2, 2.0.1, 2.0.0 Reporter: Nan Zhu

When checking one of our application logs, we found the following behavior (simplified):

1. The Spark application recovers from a checkpoint constructed at timestamp 1000ms

2. The log shows that the Spark application can recover RDDs generated at timestamps 2000, 3000

The root cause is that the generateJobs event is pushed to the queue by a separate thread (RecurringTimer); before the doCheckpoint event is pushed to the queue, multiple generateJobs events may already have been processed. As a result, when the doCheckpoint for timestamp 1000 is processed, the generatedRDDs data structure containing the RDDs generated at 2000 and 3000 is serialized as part of the checkpoint for 1000.

This adds overhead when debugging, and when coordinating our offset management strategy with Spark Streaming's checkpoint strategy as we develop a new type of DStream that integrates Spark Streaming with an internal message middleware.

The proposed fix is to filter generatedRDDs according to the checkpoint timestamp when serializing it.

-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
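A hedged sketch of the proposed fix above, using stand-in types rather than Spark's actual DStream internals: when the checkpoint for a given batch time is serialized, only entries of the generatedRDDs map at or before that time are included, so recovered state cannot contain RDDs newer than the checkpoint itself.

{code:scala}
import scala.collection.mutable

// Stand-in types; not Spark's DStream internals.
final case class BatchTime(millis: Long)
final case class RDDInfo(id: Int)

// generatedRDDs keeps one entry per generated batch, as in the description above.
val generatedRDDs = mutable.HashMap[BatchTime, RDDInfo]()

// Proposed idea: when serializing the checkpoint for `checkpointTime`, drop every
// entry that was generated after it.
def checkpointSnapshot(checkpointTime: BatchTime): Map[BatchTime, RDDInfo] =
  generatedRDDs.filter { case (batchTime, _) => batchTime.millis <= checkpointTime.millis }.toMap
{code}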
[jira] [Commented] (SPARK-18905) Potential Issue of Semantics of BatchCompleted
[ https://issues.apache.org/jira/browse/SPARK-18905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15813632#comment-15813632 ] Nan Zhu commented on SPARK-18905: - [~zsxwing] If you agree on the conclusion above, I will file a PR > Potential Issue of Semantics of BatchCompleted > -- > > Key: SPARK-18905 > URL: https://issues.apache.org/jira/browse/SPARK-18905 > Project: Spark > Issue Type: Bug > Components: DStreams >Affects Versions: 2.0.0, 2.0.1, 2.0.2 >Reporter: Nan Zhu > > the current implementation of Spark streaming considers a batch is completed > no matter the results of the jobs > (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L203) > Let's consider the following case: > A micro batch contains 2 jobs and they read from two different kafka topics > respectively. One of these jobs is failed due to some problem in the user > defined logic, after the other one is finished successfully. > 1. The main thread in the Spark streaming application will execute the line > mentioned above, > 2. and another thread (checkpoint writer) will make a checkpoint file > immediately after this line is executed. > 3. Then due to the current error handling mechanism in Spark Streaming, > StreamingContext will be closed > (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L214) > the user recovers from the checkpoint file, and because the JobSet containing > the failed job has been removed (taken as completed) before the checkpoint is > constructed, the data being processed by the failed job would never be > reprocessed? > I might have missed something in the checkpoint thread or this > handleJobCompletion()or it is a potential bug -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-18905) Potential Issue of Semantics of BatchCompleted
[ https://issues.apache.org/jira/browse/SPARK-18905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15813560#comment-15813560 ] Nan Zhu commented on SPARK-18905: - eat my words... when we have queued up batches, we do need pendingTime, and yes, the original description in the JIRA still holds, > Potential Issue of Semantics of BatchCompleted > -- > > Key: SPARK-18905 > URL: https://issues.apache.org/jira/browse/SPARK-18905 > Project: Spark > Issue Type: Bug > Components: DStreams >Affects Versions: 2.0.0, 2.0.1, 2.0.2 >Reporter: Nan Zhu > > the current implementation of Spark streaming considers a batch is completed > no matter the results of the jobs > (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L203) > Let's consider the following case: > A micro batch contains 2 jobs and they read from two different kafka topics > respectively. One of these jobs is failed due to some problem in the user > defined logic, after the other one is finished successfully. > 1. The main thread in the Spark streaming application will execute the line > mentioned above, > 2. and another thread (checkpoint writer) will make a checkpoint file > immediately after this line is executed. > 3. Then due to the current error handling mechanism in Spark Streaming, > StreamingContext will be closed > (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L214) > the user recovers from the checkpoint file, and because the JobSet containing > the failed job has been removed (taken as completed) before the checkpoint is > constructed, the data being processed by the failed job would never be > reprocessed? > I might have missed something in the checkpoint thread or this > handleJobCompletion()or it is a potential bug -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-18905) Potential Issue of Semantics of BatchCompleted
[ https://issues.apache.org/jira/browse/SPARK-18905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15813459#comment-15813459 ] Nan Zhu edited comment on SPARK-18905 at 1/10/17 1:16 AM: -- yeah, but the downTime includes all batches from "checkpoint time" to "restart time" the jobs that have been generated but not completed shall be the first batch in downTime...no? was (Author: codingcat): yeah, but the downTime including all batches from "checkpoint time" to "restart time" the jobs that have been generated but not completed shall be the first batch in downTime...no? > Potential Issue of Semantics of BatchCompleted > -- > > Key: SPARK-18905 > URL: https://issues.apache.org/jira/browse/SPARK-18905 > Project: Spark > Issue Type: Bug > Components: DStreams >Affects Versions: 2.0.0, 2.0.1, 2.0.2 >Reporter: Nan Zhu > > the current implementation of Spark streaming considers a batch is completed > no matter the results of the jobs > (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L203) > Let's consider the following case: > A micro batch contains 2 jobs and they read from two different kafka topics > respectively. One of these jobs is failed due to some problem in the user > defined logic, after the other one is finished successfully. > 1. The main thread in the Spark streaming application will execute the line > mentioned above, > 2. and another thread (checkpoint writer) will make a checkpoint file > immediately after this line is executed. > 3. Then due to the current error handling mechanism in Spark Streaming, > StreamingContext will be closed > (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L214) > the user recovers from the checkpoint file, and because the JobSet containing > the failed job has been removed (taken as completed) before the checkpoint is > constructed, the data being processed by the failed job would never be > reprocessed? > I might have missed something in the checkpoint thread or this > handleJobCompletion()or it is a potential bug -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-18905) Potential Issue of Semantics of BatchCompleted
[ https://issues.apache.org/jira/browse/SPARK-18905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15813459#comment-15813459 ] Nan Zhu commented on SPARK-18905: - yeah, but the downTime including all batches from "checkpoint time" to "restart time" the jobs that have been generated but not completed shall be the first batch in downTime...no? > Potential Issue of Semantics of BatchCompleted > -- > > Key: SPARK-18905 > URL: https://issues.apache.org/jira/browse/SPARK-18905 > Project: Spark > Issue Type: Bug > Components: DStreams >Affects Versions: 2.0.0, 2.0.1, 2.0.2 >Reporter: Nan Zhu > > the current implementation of Spark streaming considers a batch is completed > no matter the results of the jobs > (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L203) > Let's consider the following case: > A micro batch contains 2 jobs and they read from two different kafka topics > respectively. One of these jobs is failed due to some problem in the user > defined logic, after the other one is finished successfully. > 1. The main thread in the Spark streaming application will execute the line > mentioned above, > 2. and another thread (checkpoint writer) will make a checkpoint file > immediately after this line is executed. > 3. Then due to the current error handling mechanism in Spark Streaming, > StreamingContext will be closed > (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L214) > the user recovers from the checkpoint file, and because the JobSet containing > the failed job has been removed (taken as completed) before the checkpoint is > constructed, the data being processed by the failed job would never be > reprocessed? > I might have missed something in the checkpoint thread or this > handleJobCompletion()or it is a potential bug -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-18905) Potential Issue of Semantics of BatchCompleted
[ https://issues.apache.org/jira/browse/SPARK-18905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15813434#comment-15813434 ] Nan Zhu edited comment on SPARK-18905 at 1/10/17 1:05 AM: -- Hi, [~zsxwing] Thanks for the reply, After testing in our environment for more times, I feel that this is not a problem anymore. The failed job would be recovered https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala#L216, as counted in the downTime The question right now is, why we need to have pendingTime + downTime in the above method, was (Author: codingcat): Hi, [~zsxwing] Thanks for the reply, After testing in our environment for more times, I feel that this is not a problem anymore. The failed job would be recovered https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala#L216, as the downTime The question right now is, why we need to have pendingTime + downTime in the above method, > Potential Issue of Semantics of BatchCompleted > -- > > Key: SPARK-18905 > URL: https://issues.apache.org/jira/browse/SPARK-18905 > Project: Spark > Issue Type: Bug > Components: DStreams >Affects Versions: 2.0.0, 2.0.1, 2.0.2 >Reporter: Nan Zhu > > the current implementation of Spark streaming considers a batch is completed > no matter the results of the jobs > (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L203) > Let's consider the following case: > A micro batch contains 2 jobs and they read from two different kafka topics > respectively. One of these jobs is failed due to some problem in the user > defined logic, after the other one is finished successfully. > 1. The main thread in the Spark streaming application will execute the line > mentioned above, > 2. and another thread (checkpoint writer) will make a checkpoint file > immediately after this line is executed. > 3. Then due to the current error handling mechanism in Spark Streaming, > StreamingContext will be closed > (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L214) > the user recovers from the checkpoint file, and because the JobSet containing > the failed job has been removed (taken as completed) before the checkpoint is > constructed, the data being processed by the failed job would never be > reprocessed? > I might have missed something in the checkpoint thread or this > handleJobCompletion()or it is a potential bug -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-18905) Potential Issue of Semantics of BatchCompleted
[ https://issues.apache.org/jira/browse/SPARK-18905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15813434#comment-15813434 ] Nan Zhu commented on SPARK-18905: - Hi, [~zsxwing] Thanks for the reply, After testing in our environment for more times, I feel that this is not a problem anymore. The failed job would be recovered https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala#L216, as the downTime The question right now is, why we need to have pendingTime + downTime in the above method, > Potential Issue of Semantics of BatchCompleted > -- > > Key: SPARK-18905 > URL: https://issues.apache.org/jira/browse/SPARK-18905 > Project: Spark > Issue Type: Bug > Components: DStreams >Affects Versions: 2.0.0, 2.0.1, 2.0.2 >Reporter: Nan Zhu > > the current implementation of Spark streaming considers a batch is completed > no matter the results of the jobs > (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L203) > Let's consider the following case: > A micro batch contains 2 jobs and they read from two different kafka topics > respectively. One of these jobs is failed due to some problem in the user > defined logic, after the other one is finished successfully. > 1. The main thread in the Spark streaming application will execute the line > mentioned above, > 2. and another thread (checkpoint writer) will make a checkpoint file > immediately after this line is executed. > 3. Then due to the current error handling mechanism in Spark Streaming, > StreamingContext will be closed > (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L214) > the user recovers from the checkpoint file, and because the JobSet containing > the failed job has been removed (taken as completed) before the checkpoint is > constructed, the data being processed by the failed job would never be > reprocessed? > I might have missed something in the checkpoint thread or this > handleJobCompletion()or it is a potential bug -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
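For readers following the pendingTime/downTime discussion in the comments above, here is a simplified, hypothetical sketch of why both sets can matter on restart; the names and logic are stand-ins, not Spark's actual JobGenerator.restart() implementation. Batches that were generated but not completed before the failure (pending) and batches whose scheduled time fell inside the downtime window are not necessarily the same set, so the union of the two is rescheduled.

{code:scala}
// Stand-in sketch; not Spark's actual JobGenerator.restart() logic.
final case class BatchTime(ms: Long)

def batchesToReschedule(pendingTimes: Seq[BatchTime],   // generated but not completed before the failure
                        checkpointTime: BatchTime,      // time of the checkpoint we recovered from
                        restartTime: BatchTime,         // first batch time after the restart
                        batchDurationMs: Long): Seq[BatchTime] = {
  // Batches whose scheduled time fell inside the downtime window.
  val downTimes =
    ((checkpointTime.ms + batchDurationMs) to restartTime.ms by batchDurationMs).map(BatchTime(_))
  // Queued-up (pending) batches are not necessarily inside that window, so both sets are needed.
  (pendingTimes ++ downTimes).distinct.sortBy(_.ms)
}
{code}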
[jira] [Updated] (SPARK-18905) Potential Issue of Semantics of BatchCompleted
[ https://issues.apache.org/jira/browse/SPARK-18905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nan Zhu updated SPARK-18905: Description: the current implementation of Spark streaming considers a batch is completed no matter the results of the jobs (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L203) Let's consider the following case: A micro batch contains 2 jobs and they read from two different kafka topics respectively. One of these jobs is failed due to some problem in the user defined logic, after the other one is finished successfully. 1. The main thread in the Spark streaming application will execute the line mentioned above, 2. and another thread (checkpoint writer) will make a checkpoint file immediately after this line is executed. 3. Then due to the current error handling mechanism in Spark Streaming, StreamingContext will be closed (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L214) the user recovers from the checkpoint file, and because the JobSet containing the failed job has been removed (taken as completed) before the checkpoint is constructed, the data being processed by the failed job would never be reprocessed? I might have missed something in the checkpoint thread or this handleJobCompletion()or it is a potential bug was: the current implementation of Spark streaming considers a batch is completed no matter the results of the jobs (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L203) Let's consider the following case: A micro batch contains 2 jobs and they read from two different kafka topics respectively. One of these jobs is failed due to some problem in the user defined logic. 1. The main thread in the Spark streaming application will execute the line mentioned above, 2. and another thread (checkpoint writer) will make a checkpoint file immediately after this line is executed. 3. Then due to the current error handling mechanism in Spark Streaming, StreamingContext will be closed (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L214) the user recovers from the checkpoint file, and because the JobSet containing the failed job has been removed (taken as completed) before the checkpoint is constructed, the data being processed by the failed job would never be reprocessed? I might have missed something in the checkpoint thread or this handleJobCompletion()or it is a potential bug > Potential Issue of Semantics of BatchCompleted > -- > > Key: SPARK-18905 > URL: https://issues.apache.org/jira/browse/SPARK-18905 > Project: Spark > Issue Type: Bug > Components: DStreams >Affects Versions: 2.0.0, 2.0.1, 2.0.2 >Reporter: Nan Zhu > > the current implementation of Spark streaming considers a batch is completed > no matter the results of the jobs > (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L203) > Let's consider the following case: > A micro batch contains 2 jobs and they read from two different kafka topics > respectively. 
One of these jobs is failed due to some problem in the user > defined logic, after the other one is finished successfully. > 1. The main thread in the Spark streaming application will execute the line > mentioned above, > 2. and another thread (checkpoint writer) will make a checkpoint file > immediately after this line is executed. > 3. Then due to the current error handling mechanism in Spark Streaming, > StreamingContext will be closed > (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L214) > the user recovers from the checkpoint file, and because the JobSet containing > the failed job has been removed (taken as completed) before the checkpoint is > constructed, the data being processed by the failed job would never be > reprocessed? > I might have missed something in the checkpoint thread or this > handleJobCompletion()or it is a potential bug -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-18905) Potential Issue of Semantics of BatchCompleted
[ https://issues.apache.org/jira/browse/SPARK-18905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nan Zhu updated SPARK-18905: Description: the current implementation of Spark streaming considers a batch is completed no matter the results of the jobs (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L203) Let's consider the following case: A micro batch contains 2 jobs and they read from two different kafka topics respectively. One of these jobs is failed due to some problem in the user defined logic. 1. The main thread in the Spark streaming application will execute the line mentioned above, 2. and another thread (checkpoint writer) will make a checkpoint file immediately after this line is executed. 3. Then due to the current error handling mechanism in Spark Streaming, StreamingContext will be closed (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L214) the user recovers from the checkpoint file, and because the JobSet containing the failed job has been removed (taken as completed) before the checkpoint is constructed, the data being processed by the failed job would never be reprocessed? I might have missed something in the checkpoint thread or this handleJobCompletion()or it is a potential bug was: the current implementation of Spark streaming considers a batch is completed no matter the results of the jobs (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L203) Let's consider the following case: A micro batch contains 2 jobs and they read from two different kafka topics respectively. One of this job is failed due to some problem in the user defined logic. 1. The main thread in the Spark streaming application will execute the line mentioned above, 2. and another thread (checkpoint writer) will make a checkpoint file immediately after this line is executed. 3. Then due to the current error handling mechanism in Spark Streaming, StreamingContext will be closed (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L214) the user recovers from the checkpoint file, and because the JobSet containing the failed job has been removed (taken as completed) before the checkpoint is constructed, the data being processed by the failed job would never be reprocessed? I might have missed something in the checkpoint thread or this handleJobCompletion()or it is a potential bug > Potential Issue of Semantics of BatchCompleted > -- > > Key: SPARK-18905 > URL: https://issues.apache.org/jira/browse/SPARK-18905 > Project: Spark > Issue Type: Bug > Components: DStreams >Affects Versions: 2.0.0, 2.0.1, 2.0.2 >Reporter: Nan Zhu > > the current implementation of Spark streaming considers a batch is completed > no matter the results of the jobs > (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L203) > Let's consider the following case: > A micro batch contains 2 jobs and they read from two different kafka topics > respectively. One of these jobs is failed due to some problem in the user > defined logic. > 1. 
The main thread in the Spark streaming application will execute the line > mentioned above, > 2. and another thread (checkpoint writer) will make a checkpoint file > immediately after this line is executed. > 3. Then due to the current error handling mechanism in Spark Streaming, > StreamingContext will be closed > (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L214) > the user recovers from the checkpoint file, and because the JobSet containing > the failed job has been removed (taken as completed) before the checkpoint is > constructed, the data being processed by the failed job would never be > reprocessed? > I might have missed something in the checkpoint thread or this > handleJobCompletion()or it is a potential bug -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-18905) Potential Issue of Semantics of BatchCompleted
[ https://issues.apache.org/jira/browse/SPARK-18905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nan Zhu updated SPARK-18905: Description: the current implementation of Spark streaming considers a batch is completed no matter the results of the jobs (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L203) Let's consider the following case: A micro batch contains 2 jobs and they read from two different kafka topics respectively. One of this job is failed due to some problem in the user defined logic. 1. The main thread in the Spark streaming application will execute the line mentioned above, 2. and another thread (checkpoint writer) will make a checkpoint file immediately after this line is executed. 3. Then due to the current error handling mechanism in Spark Streaming, StreamingContext will be closed (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L214) the user recovers from the checkpoint file, and because the JobSet containing the failed job has been removed (taken as completed) before the checkpoint is constructed, the data being processed by the failed job would never be reprocessed? I might have missed something in the checkpoint thread or this handleJobCompletion()or it is a potential bug was: the current implementation of Spark streaming considers a batch is completed no matter the result of the jobs (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L203) Let's consider the following case: A micro batch contains 2 jobs and they read from two different kafka topics respectively. One of this job is failed due to some problem in the user defined logic. 1. The main thread in the Spark streaming application will execute the line mentioned above, 2. and another thread (checkpoint writer) will make a checkpoint file immediately after this line is executed. 3. Then due to the current error handling mechanism in Spark Streaming, StreamingContext will be closed (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L214) the user recovers from the checkpoint file, and because the JobSet containing the failed job has been removed (taken as completed) before the checkpoint is constructed, the data being processed by the failed job would never be reprocessed? I might have missed something in the checkpoint thread or this handleJobCompletion()or it is a potential bug > Potential Issue of Semantics of BatchCompleted > -- > > Key: SPARK-18905 > URL: https://issues.apache.org/jira/browse/SPARK-18905 > Project: Spark > Issue Type: Bug > Components: DStreams >Affects Versions: 2.0.0, 2.0.1, 2.0.2 >Reporter: Nan Zhu > > the current implementation of Spark streaming considers a batch is completed > no matter the results of the jobs > (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L203) > Let's consider the following case: > A micro batch contains 2 jobs and they read from two different kafka topics > respectively. One of this job is failed due to some problem in the user > defined logic. > 1. 
The main thread in the Spark streaming application will execute the line > mentioned above, > 2. and another thread (checkpoint writer) will make a checkpoint file > immediately after this line is executed. > 3. Then due to the current error handling mechanism in Spark Streaming, > StreamingContext will be closed > (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L214) > the user recovers from the checkpoint file, and because the JobSet containing > the failed job has been removed (taken as completed) before the checkpoint is > constructed, the data being processed by the failed job would never be > reprocessed? > I might have missed something in the checkpoint thread or this > handleJobCompletion()or it is a potential bug -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-18905) Potential Issue of Semantics of BatchCompleted
Nan Zhu created SPARK-18905: --- Summary: Potential Issue of Semantics of BatchCompleted Key: SPARK-18905 URL: https://issues.apache.org/jira/browse/SPARK-18905 Project: Spark Issue Type: Bug Components: DStreams Affects Versions: 2.0.2, 2.0.1, 2.0.0 Reporter: Nan Zhu the current implementation of Spark streaming considers a batch is completed no matter the result of the jobs (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L203) Let's consider the following case: A micro batch contains 2 jobs and they read from two different kafka topics respectively. One of this job is failed due to some problem in the user defined logic. 1. The main thread in the Spark streaming application will execute the line mentioned above, 2. and another thread (checkpoint writer) will make a checkpoint file immediately after this line is executed. 3. Then due to the current error handling mechanism in Spark Streaming, StreamingContext will be closed (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L214) the user recovers from the checkpoint file, and because the JobSet containing the failed job has been removed (taken as completed) before the checkpoint is constructed, the data being processed by the failed job would never be reprocessed? I might have missed something in the checkpoint thread or this handleJobCompletion()or it is a potential bug -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
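A hypothetical sketch of the hazard described in this report, using stand-in types rather than Spark's JobScheduler/JobSet classes: if the JobSet is dropped from the pending bookkeeping as soon as its last job finishes, regardless of failures, a checkpoint written immediately afterwards no longer covers the failed batch. The "keep on failure" branch below is only one possible alternative, not a fix proposed in the JIRA.

{code:scala}
// Stand-in types; not Spark's JobScheduler/JobSet classes.
final case class Job(id: Int, failed: Boolean)
final case class JobSet(batchTime: Long, jobs: Seq[Job]) {
  def anyFailed: Boolean = jobs.exists(_.failed)
}

// The hazard: if the JobSet is always dropped here, a checkpoint written right after
// this call no longer covers the batch, even though one of its jobs failed.
def handleJobSetCompleted(pending: Map[Long, JobSet], js: JobSet): Map[Long, JobSet] =
  if (js.anyFailed) pending          // one option: keep the failed batch so the next checkpoint still includes it
  else pending - js.batchTime        // drop it only when every job succeeded
{code}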
[jira] [Updated] (SPARK-17347) Encoder in Dataset example has incorrect type
[ https://issues.apache.org/jira/browse/SPARK-17347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nan Zhu updated SPARK-17347: Summary: Encoder in Dataset example has incorrect type (was: Encoder in Dataset example is incorrect on type) > Encoder in Dataset example has incorrect type > - > > Key: SPARK-17347 > URL: https://issues.apache.org/jira/browse/SPARK-17347 > Project: Spark > Issue Type: Bug > Components: Examples >Affects Versions: 2.0.0 >Reporter: Nan Zhu >Priority: Minor > > https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala#L206 > The type of this alternative encoder shall be Encoder[Map[String, Any]] > instead of Encoder[Map[String, Int]] -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-17347) Encoder in Dataset example is incorrect on type
Nan Zhu created SPARK-17347: --- Summary: Encoder in Dataset example is incorrect on type Key: SPARK-17347 URL: https://issues.apache.org/jira/browse/SPARK-17347 Project: Spark Issue Type: Bug Components: Examples Affects Versions: 2.0.0 Reporter: Nan Zhu Priority: Minor https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala#L206 The type of this alternative encoder shall be Encoder[Map[String, Any]] instead of Encoder[Map[String, Int]] -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
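A minimal illustration of the type fix described above; Encoders.kryo is Spark SQL's generic Kryo-based encoder factory, and the variable name is only illustrative:

{code:scala}
import org.apache.spark.sql.{Encoder, Encoders}

// getValuesMap[Any] on a Row yields a Map[String, Any], so the Kryo-based encoder
// in the example needs that element type rather than Map[String, Int].
implicit val mapEncoder: Encoder[Map[String, Any]] = Encoders.kryo[Map[String, Any]]
{code}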
[jira] [Closed] (SPARK-14247) Spark does not compile with CDH-5.4.x due to the possible bug of ivy.....
[ https://issues.apache.org/jira/browse/SPARK-14247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nan Zhu closed SPARK-14247. --- Resolution: Not A Problem > Spark does not compile with CDH-5.4.x due to the possible bug of ivy. > - > > Key: SPARK-14247 > URL: https://issues.apache.org/jira/browse/SPARK-14247 > Project: Spark > Issue Type: Bug > Components: Build >Affects Versions: 1.6.0 >Reporter: Nan Zhu >Priority: Minor > > I recently tried to compile Spark with CDH 5.4.x by the following command: > sbt -Phadoop-2.6 -Dhadoop.version=2.6.0-mr1-cdh5.4.5 -DskipTests assembly > I cannot finish the building due to an error saying that [error] impossible > to get artifacts when data has not been loaded. IvyNode = > org.slf4j#slf4j-api;1.6.1 > It seems that CDH depends on slf4j 1.6.x while Spark has upgraded to 1.7.x > long time ago and during the compilation, slf4j 1.6.x is evicted unexpectedly > (see the related discussion in https://github.com/sbt/sbt/issues/1598) > I currently work around this by downgrade to slf4j 1.6.1 > What surprises me is that I was upgrading from Spark 1.5.0 to 1.6.x, with > 1.5.0, I can successfully compile Spark with the same version of CDH > -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-14247) Spark does not compile with CDH-5.4.x due to the possible bug of ivy.....
[ https://issues.apache.org/jira/browse/SPARK-14247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15216719#comment-15216719 ] Nan Zhu edited comment on SPARK-14247 at 3/29/16 7:39 PM: -- thanks [~sowen], it seems that change the hadoop.version name solves the problem was (Author: codingcat): thanks [~sowen], it seems that change the hadoop.version name solves the problem but... the command "sbt -Phadoop-2.6 -Dhadoop.version=2.6.0-cdh5.4.5 -DskipTests clean assembly" gives me an assembly jar with the suffix 2.2.0 (testing in trunk)? /Users/nanzhu/code/spark/assembly/target/scala-2.11/spark-assembly-2.0.0-SNAPSHOT-hadoop2.2.0.jar seems it involves the issue belonging to another JIRA? > Spark does not compile with CDH-5.4.x due to the possible bug of ivy. > - > > Key: SPARK-14247 > URL: https://issues.apache.org/jira/browse/SPARK-14247 > Project: Spark > Issue Type: Bug > Components: Build >Affects Versions: 1.6.0 >Reporter: Nan Zhu >Priority: Minor > > I recently tried to compile Spark with CDH 5.4.x by the following command: > sbt -Phadoop-2.6 -Dhadoop.version=2.6.0-mr1-cdh5.4.5 -DskipTests assembly > I cannot finish the building due to an error saying that [error] impossible > to get artifacts when data has not been loaded. IvyNode = > org.slf4j#slf4j-api;1.6.1 > It seems that CDH depends on slf4j 1.6.x while Spark has upgraded to 1.7.x > long time ago and during the compilation, slf4j 1.6.x is evicted unexpectedly > (see the related discussion in https://github.com/sbt/sbt/issues/1598) > I currently work around this by downgrade to slf4j 1.6.1 > What surprises me is that I was upgrading from Spark 1.5.0 to 1.6.x, with > 1.5.0, I can successfully compile Spark with the same version of CDH > -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-14247) Spark does not compile with CDH-5.4.x due to the possible bug of ivy.....
[ https://issues.apache.org/jira/browse/SPARK-14247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15216719#comment-15216719 ] Nan Zhu commented on SPARK-14247: - thanks [~sowen], it seems that change the hadoop.version name solves the problem but... the command "sbt -Phadoop-2.6 -Dhadoop.version=2.6.0-cdh5.4.5 -DskipTests clean assembly" gives me an assembly jar with the suffix 2.2.0 (testing in trunk)? /Users/nanzhu/code/spark/assembly/target/scala-2.11/spark-assembly-2.0.0-SNAPSHOT-hadoop2.2.0.jar seems it involves the issue belonging to another JIRA? > Spark does not compile with CDH-5.4.x due to the possible bug of ivy. > - > > Key: SPARK-14247 > URL: https://issues.apache.org/jira/browse/SPARK-14247 > Project: Spark > Issue Type: Bug > Components: Build >Affects Versions: 1.6.0 >Reporter: Nan Zhu >Priority: Minor > > I recently tried to compile Spark with CDH 5.4.x by the following command: > sbt -Phadoop-2.6 -Dhadoop.version=2.6.0-mr1-cdh5.4.5 -DskipTests assembly > I cannot finish the building due to an error saying that [error] impossible > to get artifacts when data has not been loaded. IvyNode = > org.slf4j#slf4j-api;1.6.1 > It seems that CDH depends on slf4j 1.6.x while Spark has upgraded to 1.7.x > long time ago and during the compilation, slf4j 1.6.x is evicted unexpectedly > (see the related discussion in https://github.com/sbt/sbt/issues/1598) > I currently work around this by downgrade to slf4j 1.6.1 > What surprises me is that I was upgrading from Spark 1.5.0 to 1.6.x, with > 1.5.0, I can successfully compile Spark with the same version of CDH > -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-14247) Spark does not compile with CDH-5.4.x due to the possible bug of ivy.....
[ https://issues.apache.org/jira/browse/SPARK-14247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nan Zhu updated SPARK-14247: Priority: Minor (was: Major) > Spark does not compile with CDH-5.4.x due to the possible bug of ivy. > - > > Key: SPARK-14247 > URL: https://issues.apache.org/jira/browse/SPARK-14247 > Project: Spark > Issue Type: Bug > Components: Build >Affects Versions: 1.6.0 >Reporter: Nan Zhu >Priority: Minor > > I recently tried to compile Spark with CDH 5.4.x by the following command: > sbt -Phadoop-2.6 -Dhadoop.version=2.6.0-mr1-cdh5.4.5 -DskipTests assembly > I cannot finish the building due to an error saying that [error] impossible > to get artifacts when data has not been loaded. IvyNode = > org.slf4j#slf4j-api;1.6.1 > It seems that CDH depends on slf4j 1.6.x while Spark has upgraded to 1.7.x > long time ago and during the compilation, slf4j 1.6.x is evicted unexpectedly > (see the related discussion in https://github.com/sbt/sbt/issues/1598) > I currently work around this by downgrade to slf4j 1.6.1 > What surprises me is that I was upgrading from Spark 1.5.0 to 1.6.x, with > 1.5.0, I can successfully compile Spark with the same version of CDH > -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-14247) Spark does not compile with CDH-5.4.x due to the possible bug of ivy.....
[ https://issues.apache.org/jira/browse/SPARK-14247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15216661#comment-15216661 ] Nan Zhu commented on SPARK-14247: - [~srowen] I always blindly copied the "CDH.*" string from the Spark building doc since I only use the HDFS/HBase and Cloudera Manager contained in CDH...I think it is just fine...and I have been working with it since 1.5 (but what's the name of the normal packages? https://repository.cloudera.com/artifactory/public/org/apache/hadoop/hadoop-core/ ) yeah, it might be a problem of the compilation tools, but I'm not 100% sure about it...maybe someone who is more familiar with how maven/sbt decides which package to evict can give some explanation and a solution. I do not mind closing it...just keeping a record here in case someone hits a similar problem and wants to know what happened > Spark does not compile with CDH-5.4.x due to the possible bug of ivy. > - > > Key: SPARK-14247 > URL: https://issues.apache.org/jira/browse/SPARK-14247 > Project: Spark > Issue Type: Bug > Components: Build >Affects Versions: 1.6.0 >Reporter: Nan Zhu > > I recently tried to compile Spark with CDH 5.4.x by the following command: > sbt -Phadoop-2.6 -Dhadoop.version=2.6.0-mr1-cdh5.4.5 -DskipTests assembly > I cannot finish the building due to an error saying that [error] impossible > to get artifacts when data has not been loaded. IvyNode = > org.slf4j#slf4j-api;1.6.1 > It seems that CDH depends on slf4j 1.6.x while Spark has upgraded to 1.7.x > long time ago and during the compilation, slf4j 1.6.x is evicted unexpectedly > (see the related discussion in https://github.com/sbt/sbt/issues/1598) > I currently work around this by downgrade to slf4j 1.6.1 > What surprises me is that I was upgrading from Spark 1.5.0 to 1.6.x, with > 1.5.0, I can successfully compile Spark with the same version of CDH > -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-14247) Spark does not compile with CDH-5.4.x due to the possible bug of ivy.....
Nan Zhu created SPARK-14247: --- Summary: Spark does not compile with CDH-5.4.x due to the possible bug of ivy. Key: SPARK-14247 URL: https://issues.apache.org/jira/browse/SPARK-14247 Project: Spark Issue Type: Bug Components: Build Affects Versions: 1.6.0 Reporter: Nan Zhu

I recently tried to compile Spark with CDH 5.4.x using the following command:

sbt -Phadoop-2.6 -Dhadoop.version=2.6.0-mr1-cdh5.4.5 -DskipTests assembly

I cannot finish the build due to an error saying that [error] impossible to get artifacts when data has not been loaded. IvyNode = org.slf4j#slf4j-api;1.6.1

It seems that CDH depends on slf4j 1.6.x while Spark upgraded to 1.7.x a long time ago, and during compilation slf4j 1.6.x is evicted unexpectedly (see the related discussion in https://github.com/sbt/sbt/issues/1598).

I currently work around this by downgrading to slf4j 1.6.1.

What surprises me is that I was upgrading from Spark 1.5.0 to 1.6.x; with 1.5.0, I could successfully compile Spark with the same version of CDH.

-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
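A hedged build.sbt fragment showing one way to express the workaround mentioned above (pinning slf4j-api so it is not evicted during resolution); the pinned version is an assumption and should match whatever the CDH artifacts in use actually require:

{code:scala}
// Hypothetical build.sbt fragment; the pinned version is an assumption and should
// match the slf4j version your CDH distribution actually ships.
dependencyOverrides += "org.slf4j" % "slf4j-api" % "1.6.1"
{code}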
[jira] [Commented] (SPARK-8547) xgboost exploration
[ https://issues.apache.org/jira/browse/SPARK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15195682#comment-15195682 ] Nan Zhu commented on SPARK-8547: FYI, we released a solution to integrate XGBoost with Spark directly http://dmlc.ml/2016/03/14/xgboost4j-portable-distributed-xgboost-in-spark-flink-and-dataflow.html > xgboost exploration > --- > > Key: SPARK-8547 > URL: https://issues.apache.org/jira/browse/SPARK-8547 > Project: Spark > Issue Type: New Feature > Components: ML, MLlib >Reporter: Joseph K. Bradley > > There has been quite a bit of excitement around xgboost: > [https://github.com/dmlc/xgboost] > It improves the parallelism of boosting by mixing boosting and bagging (where > bagging makes the algorithm more parallel). > It would worth exploring implementing this within MLlib (probably as a new > algorithm). -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-13868) Random forest accuracy exploration
[ https://issues.apache.org/jira/browse/SPARK-13868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15195686#comment-15195686 ] Nan Zhu commented on SPARK-13868: - FYI, we released a solution to integrate XGBoost with Spark directly http://dmlc.ml/2016/03/14/xgboost4j-portable-distributed-xgboost-in-spark-flink-and-dataflow.html > Random forest accuracy exploration > -- > > Key: SPARK-13868 > URL: https://issues.apache.org/jira/browse/SPARK-13868 > Project: Spark > Issue Type: Improvement > Components: ML >Reporter: Joseph K. Bradley > > This is a JIRA for exploring accuracy improvements for Random Forests. > h2. Background > Initial exploration was based on reports of poor accuracy from > [http://datascience.la/benchmarking-random-forest-implementations/] > Essentially, Spark 1.2 showed poor performance relative to other libraries > for training set sizes of 1M and 10M. > h3. Initial improvements > The biggest issue was that the metric being used was AUC and Spark 1.2 was > using hard predictions, not class probabilities. This was fixed in > [SPARK-9528], and that brought Spark up to performance parity with > scikit-learn, Vowpal Wabbit, and R for the training set size of 1M. > h3. Remaining issues > For training set size 10M, Spark does not yet match the AUC of the other 2 > libraries benchmarked (H2O and xgboost). > Note that, on 1M instances, these 2 libraries also show better results than > scikit-learn, VW, and R. I'm not too familiar with the H2O implementation > and how it differs, but xgboost is a very different algorithm, so it's not > surprising it has different behavior. > h2. My explorations > I've run Spark on the test set of 10M instances. (Note that the benchmark > linked above used somewhat different settings for the different algorithms, > but those settings are actually not that important for this problem. This > included gini vs. entropy impurity and limits on splitting nodes.) > I've tried adjusting: > * maxDepth: Past depth 20, going deeper does not seem to matter > * maxBins: I've gone up to 500, but this too does not seem to matter. > However, this is a hard thing to verify since slight differences in > discretization could become significant in a large tree. > h2. Current questions > * H2O: It would be good to understand how this implementation differs from > standard RF implementations (in R, VW, scikit-learn, and Spark). > * xgboost: There's a JIRA for it: [SPARK-8547]. It would be great to see the > Spark package linked from that JIRA tested vs. MLlib on the benchmark data > (or other data). From what I've heard/read, xgboost is sometimes better, > sometimes worse in accuracy (but of course faster with more localized > training). > * Based on the above explorations, are there changes we should make to Spark > RFs? -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-13227) Risky apply() in OpenHashMap
Nan Zhu created SPARK-13227: --- Summary: Risky apply() in OpenHashMap Key: SPARK-13227 URL: https://issues.apache.org/jira/browse/SPARK-13227 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.6.0 Reporter: Nan Zhu Priority: Minor

It might confuse future developers when they use OpenHashMap.apply() with a numeric value type: null.asInstanceOf[Int], null.asInstanceOf[Long], null.asInstanceOf[Float] and null.asInstanceOf[Double] return 0/0L/0.0f/0.0, which might confuse the developer when the value set contains 0/0L/0.0f/0.0 stored under an existing key.

-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
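A minimal illustration of the confusion described above, using a stand-in class rather than Spark's actual OpenHashMap: with a primitive value type, a "missing" entry and a genuinely stored zero are indistinguishable through apply().

{code:scala}
import scala.collection.mutable

// Stand-in with the same risky fallback as described above; NOT Spark's OpenHashMap.
class OpenHashMapLike[V] {
  private val data = mutable.HashMap[String, V]()
  def update(k: String, v: V): Unit = data(k) = v
  // A missing key falls back to null.asInstanceOf[V], which for Int/Long/Float/Double
  // silently becomes 0 / 0L / 0.0f / 0.0 instead of signalling "not found".
  def apply(k: String): V = data.getOrElse(k, null.asInstanceOf[V])
}

val m = new OpenHashMapLike[Int]
m("present") = 0
val stored  = m("present")   // 0, genuinely stored under an existing key
val missing = m("missing")   // also 0, although the key was never inserted
{code}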
[jira] [Commented] (SPARK-12786) Actor demo does not demonstrate usable code
[ https://issues.apache.org/jira/browse/SPARK-12786?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15096187#comment-15096187 ] Nan Zhu commented on SPARK-12786: - the only place it relies on AkkaUtils is to create an ActorSystem, which I thought was fine since you can easily replace that part with your own ActorSystem creation logic...did I miss anything? > Actor demo does not demonstrate usable code > --- > > Key: SPARK-12786 > URL: https://issues.apache.org/jira/browse/SPARK-12786 > Project: Spark > Issue Type: Documentation > Components: Streaming >Affects Versions: 1.5.2, 1.6.0 >Reporter: Brian London >Priority: Minor > > The ActorWordCount demo doesn't show how to set up an actor based dstream in > a way that can be used. > The demo relies on the {{AkkaUtils}} object, which is marked private[spark]. > Thus the code presented will not compile unless users declare their code to > be in the org.apache.spark package. > Demo is located at > https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-12713) UI Executor page should keep links around to executors that died
[ https://issues.apache.org/jira/browse/SPARK-12713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15089884#comment-15089884 ] Nan Zhu commented on SPARK-12713: - I attached a PR and two duplicate JIRAs which are addressing the same issue > UI Executor page should keep links around to executors that died > > > Key: SPARK-12713 > URL: https://issues.apache.org/jira/browse/SPARK-12713 > Project: Spark > Issue Type: Improvement > Components: Web UI >Affects Versions: 1.5.2 >Reporter: Thomas Graves > > When an executor dies the web ui no longer shows it in the executors page > which makes getting to the logs to see what happened very difficult. I'm > running on yarn so not sure if behavior is different in standalone mode. > We should figure out a way to keep links around to the ones that died so we > can show stats and log links. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-12469) Consistent Accumulators for Spark
[ https://issues.apache.org/jira/browse/SPARK-12469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15071799#comment-15071799 ] Nan Zhu edited comment on SPARK-12469 at 12/26/15 2:44 AM: --- Just to bring the previous discussions about the topic here, https://github.com/apache/spark/pull/2524 I originally would like to fix exactly the same issue in the patch, but later we have to shrink our range to result task, There, I wanted to use StageId + partitionId to identify the accumulator uniquely, but [~matei] raised up the counter example that " A shuffle stage may be resubmitted once the old one is garbage-collected (if periodic cleanup is on) If you use an accumulator in a pipelined transformation like a map(), and then you make a new RDD built on top of that (e.g. apply another map() to it), it won't count as the same stage so you'll still get the updates twice " I'm not sure if the proposed solution can fully resolve this issue was (Author: codingcat): Just to bring the previous discussions about the topic here, https://github.com/apache/spark/pull/2524 I originally would like to fix exactly the same issue in the patch, but later we have to shrink our range to result task, There, I wanted to use StageId + partitionId to identify the accumulator uniquely, but [~matei] indicated the counter example that " A shuffle stage may be resubmitted once the old one is garbage-collected (if periodic cleanup is on) If you use an accumulator in a pipelined transformation like a map(), and then you make a new RDD built on top of that (e.g. apply another map() to it), it won't count as the same stage so you'll still get the updates twice " I'm not sure if the proposed solution can fully resolve this issue > Consistent Accumulators for Spark > - > > Key: SPARK-12469 > URL: https://issues.apache.org/jira/browse/SPARK-12469 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Reporter: holdenk > > Tasks executed on Spark workers are unable to modify values from the driver, > and accumulators are the one exception for this. Accumulators in Spark are > implemented in such a way that when a stage is recomputed (say for cache > eviction) the accumulator will be updated a second time. This makes > accumulators inside of transformations more difficult to use for things like > counting invalid records (one of the primary potential use cases of > collecting side information during a transformation). However in some cases > this counting during re-evaluation is exactly the behaviour we want (say in > tracking total execution time for a particular function). Spark would benefit > from a version of accumulators which did not double count even if stages were > re-executed. > Motivating example: > {code} > val parseTime = sc.accumulator(0L) > val parseFailures = sc.accumulator(0L) > val parsedData = sc.textFile(...).flatMap { line => > val start = System.currentTimeMillis() > val parsed = Try(parse(line)) > if (parsed.isFailure) parseFailures += 1 > parseTime += System.currentTimeMillis() - start > parsed.toOption > } > parsedData.cache() > val resultA = parsedData.map(...).filter(...).count() > // some intervening code. 
Almost anything could happen here -- some of > parsedData may > // get kicked out of the cache, or an executor where data was cached might > get lost > val resultB = parsedData.filter(...).map(...).flatMap(...).count() > // now we look at the accumulators > {code} > Here we would want parseFailures to only have been added to once for every > line which failed to parse. Unfortunately, the current Spark accumulator API > doesn’t support the current parseFailures use case since if some data had > been evicted its possible that it will be double counted. > See the full design document at > https://docs.google.com/document/d/1lR_l1g3zMVctZXrcVjFusq2iQVpr4XvRK_UUDsDr6nk/edit?usp=sharing -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-12469) Consistent Accumulators for Spark
[ https://issues.apache.org/jira/browse/SPARK-12469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15071799#comment-15071799 ] Nan Zhu commented on SPARK-12469: - Just to bring the previous discussion about this topic here: https://github.com/apache/spark/pull/2524. I originally wanted to fix exactly the same issue in that patch, but we later had to narrow the scope to result tasks. There, I wanted to use StageId + partitionId to identify the accumulator uniquely, but [~matei] indicated the counterexample that "A shuffle stage may be resubmitted once the old one is garbage-collected (if periodic cleanup is on). If you use an accumulator in a pipelined transformation like a map(), and then you make a new RDD built on top of that (e.g. apply another map() to it), it won't count as the same stage, so you'll still get the updates twice." I'm not sure the proposed solution can fully resolve this issue. > Consistent Accumulators for Spark > - > > Key: SPARK-12469 > URL: https://issues.apache.org/jira/browse/SPARK-12469 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Reporter: holdenk > > Tasks executed on Spark workers are unable to modify values from the driver, > and accumulators are the one exception to this. Accumulators in Spark are > implemented in such a way that when a stage is recomputed (say for cache > eviction) the accumulator will be updated a second time. This makes > accumulators inside of transformations more difficult to use for things like > counting invalid records (one of the primary potential use cases of > collecting side information during a transformation). However, in some cases > this counting during re-evaluation is exactly the behaviour we want (say in > tracking total execution time for a particular function). Spark would benefit > from a version of accumulators which did not double count even if stages were > re-executed. > Motivating example: > {code} > val parseTime = sc.accumulator(0L) > val parseFailures = sc.accumulator(0L) > val parsedData = sc.textFile(...).flatMap { line => > val start = System.currentTimeMillis() > val parsed = Try(parse(line)) > if (parsed.isFailure) parseFailures += 1 > parseTime += System.currentTimeMillis() - start > parsed.toOption > } > parsedData.cache() > val resultA = parsedData.map(...).filter(...).count() > // some intervening code. Almost anything could happen here -- some of > parsedData may > // get kicked out of the cache, or an executor where data was cached might > get lost > val resultB = parsedData.filter(...).map(...).flatMap(...).count() > // now we look at the accumulators > {code} > Here we would want parseFailures to have been added to only once for every > line that failed to parse. Unfortunately, the current Spark accumulator API > doesn’t support this parseFailures use case, since if some data had > been evicted it's possible that it will be double counted. > See the full design document at > https://docs.google.com/document/d/1lR_l1g3zMVctZXrcVjFusq2iQVpr4XvRK_UUDsDr6nk/edit?usp=sharing -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
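To make the double-counting behaviour described above concrete, here is a minimal, self-contained sketch. It uses the same (now-deprecated) sc.accumulator API as the motivating example, the unpersist() call simply stands in for cache eviction, and all names are illustrative:
{code}
import org.apache.spark.{SparkConf, SparkContext}

object AccumulatorDoubleCount {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("acc-demo"))

    val parseCount = sc.accumulator(0L, "records-seen")

    // The problematic pattern: updating an accumulator inside a transformation.
    val data = sc.parallelize(1 to 100).map { x => parseCount += 1L; x }
    data.cache()

    data.count()
    println(s"after first count:  ${parseCount.value}")  // 100

    data.unpersist(blocking = true)                      // stand-in for cache eviction
    data.count()                                         // the map() runs again
    println(s"after second count: ${parseCount.value}")  // 200 -- the updates were applied twice

    sc.stop()
  }
}
{code}
A consistent accumulator, as proposed here, would still report 100 after the second count by de-duplicating updates per partition no matter how many times the stage is re-executed.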
[jira] [Commented] (SPARK-12237) Unsupported message RpcMessage causes message retries
[ https://issues.apache.org/jira/browse/SPARK-12237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15051499#comment-15051499 ] Nan Zhu commented on SPARK-12237: - If that's the case, I don't think it would happen in the real world, as the Executor will not directly communicate with the Master. > Unsupported message RpcMessage causes message retries > - > > Key: SPARK-12237 > URL: https://issues.apache.org/jira/browse/SPARK-12237 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 1.6.0 >Reporter: Jacek Laskowski > > When an unsupported message is sent to an endpoint, Spark throws > {{org.apache.spark.SparkException}} and retries sending the message. It > should *not*, since the message is unsupported. > {code} > WARN NettyRpcEndpointRef: Error sending message [message = > RetrieveSparkProps] in 1 attempts > org.apache.spark.SparkException: Unsupported message > RpcMessage(localhost:51137,RetrieveSparkProps,org.apache.spark.rpc.netty.RemoteNettyRpcCallContext@c0a6275) > from localhost:51137 > at > org.apache.spark.rpc.netty.Inbox$$anonfun$process$1$$anonfun$apply$mcV$sp$1.apply(Inbox.scala:105) > at > org.apache.spark.rpc.netty.Inbox$$anonfun$process$1$$anonfun$apply$mcV$sp$1.apply(Inbox.scala:104) > at > org.apache.spark.deploy.master.Master$$anonfun$receiveAndReply$1.applyOrElse(Master.scala:373) > at > org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:104) > at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204) > at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100) > at > org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > WARN NettyRpcEndpointRef: Error sending message [message = > RetrieveSparkProps] in 2 attempts > org.apache.spark.SparkException: Unsupported message > RpcMessage(localhost:51137,RetrieveSparkProps,org.apache.spark.rpc.netty.RemoteNettyRpcCallContext@73a76a5a) > from localhost:51137 > at > org.apache.spark.rpc.netty.Inbox$$anonfun$process$1$$anonfun$apply$mcV$sp$1.apply(Inbox.scala:105) > at > org.apache.spark.rpc.netty.Inbox$$anonfun$process$1$$anonfun$apply$mcV$sp$1.apply(Inbox.scala:104) > at > org.apache.spark.deploy.master.Master$$anonfun$receiveAndReply$1.applyOrElse(Master.scala:373) > at > org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:104) > at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204) > at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100) > at > org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > WARN NettyRpcEndpointRef: Error sending message [message = > RetrieveSparkProps] in 3 attempts > org.apache.spark.SparkException: Unsupported message > RpcMessage(localhost:51137,RetrieveSparkProps,org.apache.spark.rpc.netty.RemoteNettyRpcCallContext@670bfda7) > from localhost:51137 > at > org.apache.spark.rpc.netty.Inbox$$anonfun$process$1$$anonfun$apply$mcV$sp$1.apply(Inbox.scala:105) > at > org.apache.spark.rpc.netty.Inbox$$anonfun$process$1$$anonfun$apply$mcV$sp$1.apply(Inbox.scala:104) > at >
org.apache.spark.deploy.master.Master$$anonfun$receiveAndReply$1.applyOrElse(Master.scala:373) > at > org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:104) > at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204) > at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100) > at > org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Exception in thread "main" java.lang.reflect.UndeclaredThrowableException > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1672) > at > org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:68) > at > org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:151) > at > org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutor
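A conceptual sketch of the behaviour under discussion: if an endpoint answers a request it does not understand by sending a failure back instead of throwing, the caller receives a definitive error right away rather than timing out and retrying. The trait and class below are illustrative stand-ins, not Spark's internal (private[spark]) RPC classes:
{code}
// Illustrative stand-in for an RPC reply context.
trait CallContext {
  def reply(response: Any): Unit
  def sendFailure(e: Throwable): Unit
}

case object RetrieveSparkProps

// Illustrative endpoint: known requests get a reply, unknown ones get an explicit failure.
class ExampleEndpoint(sparkProps: Seq[(String, String)]) {
  def receiveAndReply(context: CallContext): PartialFunction[Any, Unit] = {
    case RetrieveSparkProps =>
      context.reply(sparkProps)
    case unknown =>
      // Send the error back instead of throwing, so the sender fails fast with no retries.
      context.sendFailure(new UnsupportedOperationException(s"Unsupported message $unknown"))
  }
}
{code}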
[jira] [Commented] (SPARK-12229) How to Perform spark submit of application written in scala from Node js
[ https://issues.apache.org/jira/browse/SPARK-12229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15048663#comment-15048663 ] Nan Zhu commented on SPARK-12229: - https://github.com/spark-jobserver/spark-jobserver might be a good reference. BTW, I think you can forward this question to the user list to get more feedback; most of the discussions here are just for bug/feature tracking, etc. If you are comfortable with this, would you mind closing the issue? > How to Perform spark submit of application written in scala from Node js > > > Key: SPARK-12229 > URL: https://issues.apache.org/jira/browse/SPARK-12229 > Project: Spark > Issue Type: Question > Components: Spark Core, Spark Submit >Affects Versions: 1.3.1 > Environment: Linux 14.4 >Reporter: himanshu singhal > Labels: newbie > Original Estimate: 1,344h > Remaining Estimate: 1,344h > > I have a Spark Core application written in Scala, and I am now > developing the front end in Node.js. My question is: how can I invoke > spark-submit to run the application from Node.js? -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
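Besides a REST layer such as spark-jobserver, a small JVM-side service can submit the Scala application programmatically through org.apache.spark.launcher.SparkLauncher (available since Spark 1.4), and a Node.js front end can then call that service over HTTP or spawn it as a child process. A minimal sketch; the paths, class name and master URL are placeholders:
{code}
import org.apache.spark.launcher.SparkLauncher

object SubmitApp {
  def main(args: Array[String]): Unit = {
    val sparkProcess = new SparkLauncher()
      .setSparkHome("/opt/spark")                   // placeholder
      .setAppResource("/path/to/my-spark-app.jar")  // placeholder
      .setMainClass("com.example.MySparkApp")       // placeholder
      .setMaster("spark://master:7077")             // placeholder
      .setConf(SparkLauncher.DRIVER_MEMORY, "2g")
      .launch()                                     // spawns a spark-submit child process

    val exitCode = sparkProcess.waitFor()           // block until the application finishes
    println(s"spark-submit exited with code $exitCode")
  }
}
{code}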
[jira] [Commented] (SPARK-12237) Unsupported message RpcMessage causes message retries
[ https://issues.apache.org/jira/browse/SPARK-12237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15048651#comment-15048651 ] Nan Zhu commented on SPARK-12237: - May I ask how you found this issue? It seems that the Master received a RetrieveSparkProps message, which is supposed to be transmitted only between the Executor and the Driver. > Unsupported message RpcMessage causes message retries > - > > Key: SPARK-12237 > URL: https://issues.apache.org/jira/browse/SPARK-12237 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 1.6.0 >Reporter: Jacek Laskowski > > When an unsupported message is sent to an endpoint, Spark throws > {{org.apache.spark.SparkException}} and retries sending the message. It > should *not*, since the message is unsupported. > {code} > WARN NettyRpcEndpointRef: Error sending message [message = > RetrieveSparkProps] in 1 attempts > org.apache.spark.SparkException: Unsupported message > RpcMessage(localhost:51137,RetrieveSparkProps,org.apache.spark.rpc.netty.RemoteNettyRpcCallContext@c0a6275) > from localhost:51137 > at > org.apache.spark.rpc.netty.Inbox$$anonfun$process$1$$anonfun$apply$mcV$sp$1.apply(Inbox.scala:105) > at > org.apache.spark.rpc.netty.Inbox$$anonfun$process$1$$anonfun$apply$mcV$sp$1.apply(Inbox.scala:104) > at > org.apache.spark.deploy.master.Master$$anonfun$receiveAndReply$1.applyOrElse(Master.scala:373) > at > org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:104) > at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204) > at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100) > at > org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > WARN NettyRpcEndpointRef: Error sending message [message = > RetrieveSparkProps] in 2 attempts > org.apache.spark.SparkException: Unsupported message > RpcMessage(localhost:51137,RetrieveSparkProps,org.apache.spark.rpc.netty.RemoteNettyRpcCallContext@73a76a5a) > from localhost:51137 > at > org.apache.spark.rpc.netty.Inbox$$anonfun$process$1$$anonfun$apply$mcV$sp$1.apply(Inbox.scala:105) > at > org.apache.spark.rpc.netty.Inbox$$anonfun$process$1$$anonfun$apply$mcV$sp$1.apply(Inbox.scala:104) > at > org.apache.spark.deploy.master.Master$$anonfun$receiveAndReply$1.applyOrElse(Master.scala:373) > at > org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:104) > at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204) > at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100) > at > org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > WARN NettyRpcEndpointRef: Error sending message [message = > RetrieveSparkProps] in 3 attempts > org.apache.spark.SparkException: Unsupported message > RpcMessage(localhost:51137,RetrieveSparkProps,org.apache.spark.rpc.netty.RemoteNettyRpcCallContext@670bfda7) > from localhost:51137 > at > org.apache.spark.rpc.netty.Inbox$$anonfun$process$1$$anonfun$apply$mcV$sp$1.apply(Inbox.scala:105) > at > org.apache.spark.rpc.netty.Inbox$$anonfun$process$1$$anonfun$apply$mcV$sp$1.apply(Inbox.scala:104) > at
> org.apache.spark.deploy.master.Master$$anonfun$receiveAndReply$1.applyOrElse(Master.scala:373) > at > org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:104) > at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204) > at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100) > at > org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Exception in thread "main" java.lang.reflect.UndeclaredThrowableException > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1672) > at > org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:68) > at > org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:151) > at > org.apache.spark.executor.CoarseGrainedEx
[jira] [Created] (SPARK-12021) Fishy test of "don't call ssc.stop in listener"
Nan Zhu created SPARK-12021: --- Summary: Fishy test of "don't call ssc.stop in listener" Key: SPARK-12021 URL: https://issues.apache.org/jira/browse/SPARK-12021 Project: Spark Issue Type: Bug Components: Streaming, Tests Affects Versions: 1.6.0 Reporter: Nan Zhu I just noticed that several test runs are blocked at the case "don't call ssc.stop in listener" in StreamingListenerSuite: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/46766/console https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/46776/console https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/46774/console The PRs corresponding to these runs touch different things... I think something fishy is hidden in this test case... -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
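For context, the test name refers to the pattern of stopping the StreamingContext from inside a listener callback, which can deadlock because stop() waits for the listener bus thread that is executing the callback. A sketch of that pattern, with illustrative names (the stop function would typically wrap ssc.stop()):
{code}
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Anti-pattern: stopping the streaming context from inside a listener callback.
// The callback runs on the listener bus thread, which stop() itself waits on.
class StopInListener(stopContext: () => Unit) extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    stopContext()  // e.g. () => ssc.stop(), which risks hanging here
  }
}
{code}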
[jira] [Commented] (SPARK-11402) Allow to define a custom driver runner and executor runner
[ https://issues.apache.org/jira/browse/SPARK-11402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14980584#comment-14980584 ] Nan Zhu commented on SPARK-11402: - I'm curious: what kind of functionality do you need from a customized ExecutorRunner/DriverRunner? > Allow to define a custom driver runner and executor runner > -- > > Key: SPARK-11402 > URL: https://issues.apache.org/jira/browse/SPARK-11402 > Project: Spark > Issue Type: Improvement > Components: Deploy, Spark Core >Reporter: Jacek Lewandowski >Priority: Minor > > {{DriverRunner}} and {{ExecutorRunner}} are used by the Spark Worker in > standalone mode to spawn driver and executor processes respectively. When > integrating Spark with some environments, it would be useful to allow > providing a custom implementation of those components. > The idea is simple - provide factory class names for driver and executor > runners in the Worker configuration. By default, the current implementations are > used. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
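A rough sketch of what the proposal could look like; none of the traits, classes, or configuration keys below exist in Spark, they are purely illustrative:
{code}
// Hypothetical extension point: the Worker would read a factory class name from its
// configuration (an illustrative key such as "spark.worker.executorRunnerFactory")
// and fall back to the stock runner when nothing is configured.
trait ExecutorRunnerFactory {
  def create(appId: String, execId: Int, command: Seq[String]): Runnable
}

class ProcessExecutorRunnerFactory extends ExecutorRunnerFactory {
  override def create(appId: String, execId: Int, command: Seq[String]): Runnable =
    new Runnable {
      override def run(): Unit = {
        // Spawn the executor process; a custom factory could instead launch it inside a
        // container, a cgroup, or some other environment-specific sandbox.
        val exit = new ProcessBuilder(command: _*).inheritIO().start().waitFor()
        println(s"executor $execId of $appId exited with code $exit")
      }
    }
}

object RunnerFactoryLoader {
  def load(className: String): ExecutorRunnerFactory =
    Class.forName(className).getDeclaredConstructor().newInstance()
      .asInstanceOf[ExecutorRunnerFactory]
}
{code}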
[jira] [Created] (SPARK-10315) remove document on spark.akka.failure-detector.threshold
Nan Zhu created SPARK-10315: --- Summary: remove document on spark.akka.failure-detector.threshold Key: SPARK-10315 URL: https://issues.apache.org/jira/browse/SPARK-10315 Project: Spark Issue Type: Bug Components: Documentation Reporter: Nan Zhu this parameter is no longer used, and there is also a mistake in the current documentation: the key should be 'akka.remote.watch-failure-detector.threshold' -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-9602) Remove 'Actor' from the comments
Nan Zhu created SPARK-9602: -- Summary: Remove 'Actor' from the comments Key: SPARK-9602 URL: https://issues.apache.org/jira/browse/SPARK-9602 Project: Spark Issue Type: Improvement Components: Build, PySpark, Spark Core, Streaming Reporter: Nan Zhu Although we have hidden Akka behind the RPC interface, I found that Akka/Actor-related comments are still spread throughout the code base. To keep things consistent, we should remove the "actor"/"akka" wording from the comments... -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-9514) Add EventHubsReceiver to support Spark Streaming using Azure EventHubs
[ https://issues.apache.org/jira/browse/SPARK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14650568#comment-14650568 ] Nan Zhu commented on SPARK-9514: Hi [~shanyu], in Spark we usually submit patches via GitHub, https://github.com/apache/spark/pulls, so please open the patch as a pull request there to get more eyes on it. > Add EventHubsReceiver to support Spark Streaming using Azure EventHubs > -- > > Key: SPARK-9514 > URL: https://issues.apache.org/jira/browse/SPARK-9514 > Project: Spark > Issue Type: Bug > Components: Streaming >Affects Versions: 1.4.1 >Reporter: shanyu zhao > Fix For: 1.5.0 > > Attachments: SPARK-9514.patch > > > We need to add an EventHubsReceiver implementation to support Spark Streaming > applications that receive data from Azure EventHubs. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
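For reference, a custom receiver such as the proposed EventHubsReceiver plugs into Spark Streaming by extending org.apache.spark.streaming.receiver.Receiver. A generic skeleton follows; the polling loop stands in for real EventHubs client calls, and the class name and endpoint parameter are illustrative:
{code}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Generic skeleton of a push-based custom receiver; register it with
// ssc.receiverStream(new ExampleReceiver("my-endpoint")).
class ExampleReceiver(endpoint: String)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  override def onStart(): Unit = {
    // onStart() must return quickly, so receive on a background thread.
    new Thread("example-receiver") {
      override def run(): Unit = receiveLoop()
    }.start()
  }

  override def onStop(): Unit = {
    // Nothing to clean up; the loop below exits once isStopped() becomes true.
  }

  private def receiveLoop(): Unit = {
    while (!isStopped()) {
      // A real EventHubs receiver would fetch a batch of events from the hub here.
      store(s"event from $endpoint at ${System.currentTimeMillis()}")
      Thread.sleep(1000)
    }
  }
}
{code}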