[jira] [Created] (SPARK-44517) first operator should respect the nullability of child expression as well as ignoreNulls option

2023-07-23 Thread Nan Zhu (Jira)
Nan Zhu created SPARK-44517:
---

 Summary: first operator should respect the nullability of child 
expression as well as ignoreNulls option
 Key: SPARK-44517
 URL: https://issues.apache.org/jira/browse/SPARK-44517
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.4.1, 3.4.0, 3.3.2, 3.2.4, 3.2.3, 3.3.1, 3.2.2, 3.3.0, 
3.2.1, 3.2.0
Reporter: Nan Zhu


I found the following problem when using Spark recently:

 
{code:java}
import spark.implicits._
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

val s = Seq((1.2, "s", 2.2)).toDF("v1", "v2", "v3")

val schema = StructType(Seq(
  StructField("v1", DoubleType, nullable = false),
  StructField("v2", StringType, nullable = true),
  StructField("v3", DoubleType, nullable = false)))

val df = spark.createDataFrame(s.rdd, schema)

val inputDF = df.dropDuplicates("v3")

spark.sql("CREATE TABLE local.db.table (\n v1 DOUBLE NOT NULL,\n v2 STRING, v3 DOUBLE NOT NULL)")

inputDF.write.mode("overwrite").format("iceberg").save("local.db.table") {code}
 

 

When I use the above code to write to Iceberg (I guess Delta Lake will have the 
same problem), I got a very confusing exception:


{code:java}
Exception in thread "main" java.lang.IllegalArgumentException: Cannot write incompatible dataset to table with schema:

table {
  1: v1: required double
  2: v2: optional string
  3: v3: required double
}

Provided schema:

table {
  1: v1: optional double
  2: v2: optional string
  3: v3: required double
} {code}

Basically it complains that v1 is a nullable column in our `inputDF` above, which is 
not allowed since we created the table with v1 as not nullable. The confusion comes 
from the fact that, if we check the schema of inputDF with printSchema(), v1 is not 
nullable:
{noformat}
root 
|-- v1: double (nullable = false) 
|-- v2: string (nullable = true) 
|-- v3: double (nullable = false){noformat}
Clearly, something changed v1's nullability unexpectedly!

 

After some debugging, I found that the key is the dropDuplicates("v3") call. In the 
optimization phase, ReplaceDeduplicateWithAggregate replaces the Deduplicate node 
with an aggregate on v3 that runs first() over all other columns. However, the 
first() operator hard codes its nullability to always be "true", which is the source 
of the changed nullability of v1, as the short sketch below shows.
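
The rewrite is easy to reproduce by hand with groupBy/first, which makes the flipped nullability visible. A minimal sketch, assuming the same `df` built above in a spark-shell session:

{code}
// Emulate what ReplaceDeduplicateWithAggregate does, reusing the non-nullable `df`
// from the snippet above (assumes a spark-shell/SparkSession context).
import org.apache.spark.sql.functions.first

val manualDedup = df.groupBy("v3").agg(first("v1").as("v1"), first("v2").as("v2"))
manualDedup.printSchema()
// v1 is now reported as nullable = true, because First hard codes nullable = true
// instead of deriving it from child.nullable and ignoreNulls.
{code}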

 

This is very confusing behavior in Spark, and probably no one really noticed it 
before, because nullability mattered little until the newer table formats such as 
Delta Lake and Iceberg, which enforce nullability checks correctly. Now that users 
adopt these formats more and more, the problem has surfaced.

 

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-33940) allow configuring the max column name length in csv writer

2020-12-29 Thread Nan Zhu (Jira)
Nan Zhu created SPARK-33940:
---

 Summary: allow configuring the max column name length in csv writer
 Key: SPARK-33940
 URL: https://issues.apache.org/jira/browse/SPARK-33940
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.1.0
Reporter: Nan Zhu


The CSV writer actually has an implicit limit on column name length due to 
univocity-parsers.

 

When we initialize a writer 
([https://github.com/uniVocity/univocity-parsers/blob/e09114c6879fa6c2c15e7365abc02cda3e193ff7/src/main/java/com/univocity/parsers/common/AbstractWriter.java#L211]), 
it calls toIdentifierGroupArray, which eventually calls valueOf in NormalizedString.java 
([https://github.com/uniVocity/univocity-parsers/blob/e09114c6879fa6c2c15e7365abc02cda3e193ff7/src/main/java/com/univocity/parsers/common/NormalizedString.java#L205-L209]).

 

Inside that stringCache.get call, there is a maxStringLength cap 
([https://github.com/uniVocity/univocity-parsers/blob/e09114c6879fa6c2c15e7365abc02cda3e193ff7/src/main/java/com/univocity/parsers/common/StringCache.java#L104]), 
which is 1024 by default.

 

We do not expose this as a configurable option, which leads to an NPE when a column 
name is longer than 1024 characters:

 

```
[info]   Cause: java.lang.NullPointerException:
[info]   at com.univocity.parsers.common.AbstractWriter.submitRow(AbstractWriter.java:349)
[info]   at com.univocity.parsers.common.AbstractWriter.writeHeaders(AbstractWriter.java:444)
[info]   at com.univocity.parsers.common.AbstractWriter.writeHeaders(AbstractWriter.java:410)
[info]   at org.apache.spark.sql.catalyst.csv.UnivocityGenerator.writeHeaders(UnivocityGenerator.scala:87)
[info]   at org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter$.writeHeaders(CsvOutputWriter.scala:58)
[info]   at org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.(CsvOutputWriter.scala:44)
[info]   at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anon$1.newInstance(CSVFileFormat.scala:86)
[info]   at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:126)
[info]   at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.(FileFormatDataWriter.scala:111)
[info]   at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:269)
[info]   at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:210)
```

 

It can be reproduced by a simple unit test:

 

```
// Assumes a SparkSession named `spark` and a temporary output directory `dataPath`;
// "maxColumnNameLength" is the option this JIRA proposes to expose.
import org.apache.spark.sql.Row
import spark.implicits._

val row1 = Row("a")
val superLongHeader = (0 until 1025).map(_ => "c").mkString("")
val df = Seq(s"${row1.getString(0)}").toDF(superLongHeader)
df.repartition(1)
  .write
  .option("header", "true")
  .option("maxColumnNameLength", 1025)
  .csv(dataPath)
```
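
Until the cap is configurable, a possible workaround on the user side (not part of the proposed fix) is to shorten over-long column names before writing; a hedged sketch, where the 1024 limit mirrors univocity's default cap:

```
// Workaround sketch only: rename columns longer than univocity's default 1024-char cap
// before writing CSV. Note truncation could collide if two names share a long prefix.
import org.apache.spark.sql.DataFrame

def truncateColumnNames(df: DataFrame, maxLen: Int = 1024): DataFrame =
  df.columns.foldLeft(df) { (acc, name) =>
    if (name.length > maxLen) acc.withColumnRenamed(name, name.take(maxLen)) else acc
  }

// usage: truncateColumnNames(df).write.option("header", "true").csv(dataPath)
```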

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-32351) Partially pushed partition filters are not explained

2020-10-19 Thread Nan Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17217238#comment-17217238
 ] 

Nan Zhu commented on SPARK-32351:
-

[~hyukjin.kwon] nit: could you reassign this to me? [~codingcat] ;)

> Partially pushed partition filters are not explained
> 
>
> Key: SPARK-32351
> URL: https://issues.apache.org/jira/browse/SPARK-32351
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.0
>Reporter: Yuming Wang
>Assignee: pavithra ramachandran
>Priority: Major
> Fix For: 3.1.0
>
>
> How to reproduce this issue:
> {code:scala}
> spark.sql(
>   s"""
>  |CREATE TABLE t(i INT, p STRING)
>  |USING parquet
>  |PARTITIONED BY (p)""".stripMargin)
> spark.range(0, 1000).selectExpr("id as col").createOrReplaceTempView("temp")
> for (part <- Seq(1, 2, 3, 4)) {
>   sql(s"""
>  |INSERT OVERWRITE TABLE t PARTITION (p='$part')
>  |SELECT col FROM temp""".stripMargin)
> }
> spark.sql("SELECT * FROM t WHERE  WHERE (p = '1' AND i = 1) OR (p = '2' and i 
> = 2)").explain
> {code}
> We have pushed down {{p = '1' or p = '2'}} since SPARK-28169, but this pushed 
> down filter not in explain
> {noformat}
> == Physical Plan ==
> *(1) Filter (((p#21 = 1) AND (i#20 = 1)) OR ((p#21 = 2) AND (i#20 = 2)))
> +- *(1) ColumnarToRow
>+- FileScan parquet default.t[i#20,p#21] Batched: true, DataFilters: [], 
> Format: Parquet, Location: 
> InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-32289/sql/core/spark-warehouse/org.apache.spark...,
>  PartitionFilters: [], PushedFilters: [], ReadSchema: struct
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-26862) assertion failed in ParquetRowConverter

2019-02-12 Thread Nan Zhu (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nan Zhu resolved SPARK-26862.
-
Resolution: Invalid

> assertion failed in ParquetRowConverter
> ---
>
> Key: SPARK-26862
> URL: https://issues.apache.org/jira/browse/SPARK-26862
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Nan Zhu
>Priority: Major
>
> When I run the following  query over a internal table (A and B are typed in 
> string, C is Array[String])
> ```
> spark.read.table("table1")
>  .select(col("A"), col("B"), col("C"))
>  .withColumn("row_num", 
> row_number().over(Window.partitionBy(col("A")).orderBy(col("B").desc)))
> ```
> I received error as 
>  
> ```
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 730 
> in stage 12.0 failed 4 times, most recent failure: Lost task 730.3 in stage 
> 12.0 (TID 2468, hadoopworker650-dca1.prod.uber.internal, executor 88): 
> java.lang.AssertionError: assertion failed
> at scala.Predef$.assert(Predef.scala:156)
> at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$ParquetArrayConverter.(ParquetRowConverter.scala:514)
> at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.org$apache$spark$sql$execution$datasources$parquet$ParquetRowConverter$$newConverter(ParquetRowConverter.scala:318)
> at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$$anonfun$7.apply(ParquetRowConverter.scala:190)
> at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$$anonfun$7.apply(ParquetRowConverter.scala:185)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.(ParquetRowConverter.scala:185)
> at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.org$apache$spark$sql$execution$datasources$parquet$ParquetRowConverter$$newConverter(ParquetRowConverter.scala:324)
> at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$$anonfun$7.apply(ParquetRowConverter.scala:190)
> at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$$anonfun$7.apply(ParquetRowConverter.scala:185)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.(ParquetRowConverter.scala:185)
> at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetRecordMaterializer.(ParquetRecordMaterializer.scala:43)
> at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport.prepareForRead(ParquetReadSupport.scala:126)
> at 
> org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:204)
> at 
> org.apache.parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:182)
> at 
> org.apache.parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:140)
> at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReaderWithPartitionValues$1.apply(ParquetFileFormat.scala:452)
> at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReaderWithPartitionValues$1.apply(ParquetFileFormat.scala:364)
> at 
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:124)
> at 
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
> at 
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
> at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
>  Source)
> at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at 
> 

[jira] [Commented] (SPARK-26862) assertion failed in ParquetRowConverter

2019-02-12 Thread Nan Zhu (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26862?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16766355#comment-16766355
 ] 

Nan Zhu commented on SPARK-26862:
-

[~srowen] I don't think so, as the same parquet files can be accessed by 2.3.2. I 
checked the execution plan of the query, and the bottom part is the same... I am 
confused about how a windowing operation would make a difference in data source 
access.

 

> assertion failed in ParquetRowConverter
> ---
>
> Key: SPARK-26862
> URL: https://issues.apache.org/jira/browse/SPARK-26862
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Nan Zhu
>Priority: Major
>
> When I run the following  query over a internal table (A and B are typed in 
> string, C is Array[String])
> ```
> spark.read.table("table1")
>  .select(col("A"), col("B"), col("C"))
>  .withColumn("row_num", 
> row_number().over(Window.partitionBy(col("A")).orderBy(col("B").desc)))
> ```
> I received error as 
>  
> ```
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 730 
> in stage 12.0 failed 4 times, most recent failure: Lost task 730.3 in stage 
> 12.0 (TID 2468, hadoopworker650-dca1.prod.uber.internal, executor 88): 
> java.lang.AssertionError: assertion failed
> at scala.Predef$.assert(Predef.scala:156)
> at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$ParquetArrayConverter.(ParquetRowConverter.scala:514)
> at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.org$apache$spark$sql$execution$datasources$parquet$ParquetRowConverter$$newConverter(ParquetRowConverter.scala:318)
> at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$$anonfun$7.apply(ParquetRowConverter.scala:190)
> at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$$anonfun$7.apply(ParquetRowConverter.scala:185)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.(ParquetRowConverter.scala:185)
> at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.org$apache$spark$sql$execution$datasources$parquet$ParquetRowConverter$$newConverter(ParquetRowConverter.scala:324)
> at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$$anonfun$7.apply(ParquetRowConverter.scala:190)
> at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$$anonfun$7.apply(ParquetRowConverter.scala:185)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.(ParquetRowConverter.scala:185)
> at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetRecordMaterializer.(ParquetRecordMaterializer.scala:43)
> at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport.prepareForRead(ParquetReadSupport.scala:126)
> at 
> org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:204)
> at 
> org.apache.parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:182)
> at 
> org.apache.parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:140)
> at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReaderWithPartitionValues$1.apply(ParquetFileFormat.scala:452)
> at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReaderWithPartitionValues$1.apply(ParquetFileFormat.scala:364)
> at 
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:124)
> at 
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
> at 
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
> at 
> 

[jira] [Updated] (SPARK-26862) assertion failed in ParquetRowConverter

2019-02-12 Thread Nan Zhu (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nan Zhu updated SPARK-26862:

Description: 
When I run the following query over an internal table (A and B are typed as string, 
C is Array[String]):

```

spark.read.table("table1")
 .select(col("A"), col("B"), col("C"))
 .withColumn("row_num", 
row_number().over(Window.partitionBy(col("A")).orderBy(col("B").desc)))

```

I received the following error:

 

```

org.apache.spark.SparkException: Job aborted due to stage failure: Task 730 in 
stage 12.0 failed 4 times, most recent failure: Lost task 730.3 in stage 12.0 
(TID 2468, hadoopworker650-dca1.prod.uber.internal, executor 88): 
java.lang.AssertionError: assertion failed

at scala.Predef$.assert(Predef.scala:156)

at 
org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$ParquetArrayConverter.(ParquetRowConverter.scala:514)

at 
org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.org$apache$spark$sql$execution$datasources$parquet$ParquetRowConverter$$newConverter(ParquetRowConverter.scala:318)

at 
org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$$anonfun$7.apply(ParquetRowConverter.scala:190)

at 
org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$$anonfun$7.apply(ParquetRowConverter.scala:185)

at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)

at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)

at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)

at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)

at scala.collection.AbstractTraversable.map(Traversable.scala:104)

at 
org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.(ParquetRowConverter.scala:185)

at 
org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.org$apache$spark$sql$execution$datasources$parquet$ParquetRowConverter$$newConverter(ParquetRowConverter.scala:324)

at 
org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$$anonfun$7.apply(ParquetRowConverter.scala:190)

at 
org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$$anonfun$7.apply(ParquetRowConverter.scala:185)

at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)

at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)

at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)

at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)

at scala.collection.AbstractTraversable.map(Traversable.scala:104)

at 
org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.(ParquetRowConverter.scala:185)

at 
org.apache.spark.sql.execution.datasources.parquet.ParquetRecordMaterializer.(ParquetRecordMaterializer.scala:43)

at 
org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport.prepareForRead(ParquetReadSupport.scala:126)

at 
org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:204)

at 
org.apache.parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:182)

at 
org.apache.parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:140)

at 
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReaderWithPartitionValues$1.apply(ParquetFileFormat.scala:452)

at 
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReaderWithPartitionValues$1.apply(ParquetFileFormat.scala:364)

at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:124)

at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)

at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)

at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source)

at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)

at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:622)

at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)

at 
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)

at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)

at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)

at org.apache.spark.scheduler.Task.run(Task.scala:121)

at 

[jira] [Commented] (SPARK-26862) assertion failed in ParquetRowConverter

2019-02-12 Thread Nan Zhu (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26862?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16766327#comment-16766327
 ] 

Nan Zhu commented on SPARK-26862:
-

[~felixcheung]

> assertion failed in ParquetRowConverter
> ---
>
> Key: SPARK-26862
> URL: https://issues.apache.org/jira/browse/SPARK-26862
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Nan Zhu
>Priority: Major
>
> When I run the following  query over a internal table (A and B are typed in 
> string, C is Array[String])
> ```
> spark.read.table("table1")
>  .select(col("A"), col("B"), col("C"))
>  .withColumn("row_num", 
> row_number().over(Window.partitionBy(col("A")).orderBy(col("B").desc)))
> ```
> I received error as 
>  
> ```
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 730 
> in stage 12.0 failed 4 times, most recent failure: Lost task 730.3 in stage 
> 12.0 (TID 2468, hadoopworker650-dca1.prod.uber.internal, executor 88): 
> java.lang.AssertionError: assertion failed
>  at scala.Predef$.assert(Predef.scala:156)
>  at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$ParquetArrayConverter.(ParquetRowConverter.scala:514)
>  at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.org$apache$spark$sql$execution$datasources$parquet$ParquetRowConverter$$newConverter(ParquetRowConverter.scala:318)
>  at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$$anonfun$7.apply(ParquetRowConverter.scala:190)
>  at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$$anonfun$7.apply(ParquetRowConverter.scala:185)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>  at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.(ParquetRowConverter.scala:185)
>  at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.org$apache$spark$sql$execution$datasources$parquet$ParquetRowConverter$$newConverter(ParquetRowConverter.scala:324)
>  at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$$anonfun$7.apply(ParquetRowConverter.scala:190)
>  at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$$anonfun$7.apply(ParquetRowConverter.scala:185)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>  at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.(ParquetRowConverter.scala:185)
>  at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetRecordMaterializer.(ParquetRecordMaterializer.scala:43)
>  at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport.prepareForRead(ParquetReadSupport.scala:126)
>  at 
> org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:204)
>  at 
> org.apache.parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:182)
>  at 
> org.apache.parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:140)
>  at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReaderWithPartitionValues$1.apply(ParquetFileFormat.scala:452)
>  at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReaderWithPartitionValues$1.apply(ParquetFileFormat.scala:364)
>  at 
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:124)
>  at 
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
>  at 
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
>  at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
>  Source)
>  at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>  at 
> 

[jira] [Created] (SPARK-26862) assertion failed in ParquetRowConverter

2019-02-12 Thread Nan Zhu (JIRA)
Nan Zhu created SPARK-26862:
---

 Summary: assertion failed in ParquetRowConverter
 Key: SPARK-26862
 URL: https://issues.apache.org/jira/browse/SPARK-26862
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.4.0
Reporter: Nan Zhu


When I run the following query over an internal table (A and B are typed as string, 
C is Array[String]):

```

spark.read.table("table1")
 .select(col("A"), col("B"), col("C"))
 .withColumn("row_num", 
row_number().over(Window.partitionBy(col("A")).orderBy(col("B").desc)))

```

I received the following error:

 

```

org.apache.spark.SparkException: Job aborted due to stage failure: Task 730 in 
stage 12.0 failed 4 times, most recent failure: Lost task 730.3 in stage 12.0 
(TID 2468, hadoopworker650-dca1.prod.uber.internal, executor 88): 
java.lang.AssertionError: assertion failed

 at scala.Predef$.assert(Predef.scala:156)

 at 
org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$ParquetArrayConverter.(ParquetRowConverter.scala:514)

 at 
org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.org$apache$spark$sql$execution$datasources$parquet$ParquetRowConverter$$newConverter(ParquetRowConverter.scala:318)

 at 
org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$$anonfun$7.apply(ParquetRowConverter.scala:190)

 at 
org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$$anonfun$7.apply(ParquetRowConverter.scala:185)

 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)

 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)

 at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)

 at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)

 at scala.collection.AbstractTraversable.map(Traversable.scala:104)

 at 
org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.(ParquetRowConverter.scala:185)

 at 
org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.org$apache$spark$sql$execution$datasources$parquet$ParquetRowConverter$$newConverter(ParquetRowConverter.scala:324)

 at 
org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$$anonfun$7.apply(ParquetRowConverter.scala:190)

 at 
org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$$anonfun$7.apply(ParquetRowConverter.scala:185)

 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)

 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)

 at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)

 at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)

 at scala.collection.AbstractTraversable.map(Traversable.scala:104)

 at 
org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter.(ParquetRowConverter.scala:185)

 at 
org.apache.spark.sql.execution.datasources.parquet.ParquetRecordMaterializer.(ParquetRecordMaterializer.scala:43)

 at 
org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport.prepareForRead(ParquetReadSupport.scala:126)

 at 
org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:204)

 at 
org.apache.parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:182)

 at 
org.apache.parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:140)

 at 
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReaderWithPartitionValues$1.apply(ParquetFileFormat.scala:452)

 at 
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReaderWithPartitionValues$1.apply(ParquetFileFormat.scala:364)

 at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:124)

 at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)

 at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)

 at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source)

 at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)

 at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:622)

 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)

 at 
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)

 at 

[jira] [Resolved] (SPARK-24797) Analyzer should respect spark.sql.hive.convertMetastoreOrc/Parquet when build the data source table

2018-07-13 Thread Nan Zhu (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24797?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nan Zhu resolved SPARK-24797.
-
Resolution: Won't Fix

> Analyzer should respect spark.sql.hive.convertMetastoreOrc/Parquet when build 
> the data source table
> ---
>
> Key: SPARK-24797
> URL: https://issues.apache.org/jira/browse/SPARK-24797
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0, 2.3.1
>Reporter: Nan Zhu
>Priority: Major
>
> the current code path ignore the value of 
> spark.sql.hive.convertMetastoreParquet when building data source table 
>  
> [https://github.com/apache/spark/blob/e0559f238009e02c40f65678fec691c07904e8c0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala#L263]
>  
> as a result, even I turned off spark.sql.hive.convertMetastoreParquet, Spark 
> SQL still uses its own parquet reader to access table instead of delegate to 
> serder



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24797) Analyzer should respect spark.sql.hive.convertMetastoreOrc/Parquet when build the data source table

2018-07-12 Thread Nan Zhu (JIRA)
Nan Zhu created SPARK-24797:
---

 Summary: Analyzer should respect 
spark.sql.hive.convertMetastoreOrc/Parquet when build the data source table
 Key: SPARK-24797
 URL: https://issues.apache.org/jira/browse/SPARK-24797
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.3.1, 2.3.0
Reporter: Nan Zhu


The current code path ignores the value of 
spark.sql.hive.convertMetastoreParquet when building the data source table:

[https://github.com/apache/spark/blob/e0559f238009e02c40f65678fec691c07904e8c0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala#L263]

As a result, even when I turn off spark.sql.hive.convertMetastoreParquet, Spark 
SQL still uses its own Parquet reader to access the table instead of delegating to 
the Hive SerDe.
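
For reference, a minimal sketch of the setup that exhibits the behavior; the table name below is illustrative, not from my environment:

{code}
// Minimal sketch: disable the conversion and read a Hive Parquet table.
// `my_hive_parquet_table` is an illustrative name.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .enableHiveSupport()
  .config("spark.sql.hive.convertMetastoreParquet", "false")
  .getOrCreate()

// Expected: the read goes through the Hive SerDe.
// Observed: the analyzer still builds Spark's own Parquet (data source) relation.
spark.table("my_hive_parquet_table").show()
{code}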



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22599) Avoid extra reading for cached table

2017-12-24 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16303022#comment-16303022
 ] 

Nan Zhu commented on SPARK-22599:
-

[~rajesh.balamohan] no, it means that SPARK-22599 and master are running with 
in-memory data.

"parquet" means that it reads parquet files from the storage system directly.

> Avoid extra reading for cached table
> 
>
> Key: SPARK-22599
> URL: https://issues.apache.org/jira/browse/SPARK-22599
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Nan Zhu
>
> In the current implementation of Spark, InMemoryTableExec read all data in a 
> cached table, filter CachedBatch according to stats and pass data to the 
> downstream operators. This implementation makes it inefficient to reside the 
> whole table in memory to serve various queries against different partitions 
> of the table, which occupies a certain portion of our users' scenarios.
> The following is an example of such a use case:
> store_sales is a 1TB-sized table in cloud storage, which is partitioned by 
> 'location'. The first query, Q1, wants to output several metrics A, B, C for 
> all stores in all locations. After that, a small team of 3 data scientists 
> wants to do some causal analysis for the sales in different locations. To 
> avoid unnecessary I/O and parquet/orc parsing overhead, they want to cache 
> the whole table in memory in Q1.
> With the current implementation, even any one of the data scientists is only 
> interested in one out of three locations, the queries they submit to Spark 
> cluster is still reading 1TB data completely.
> The reason behind the extra reading operation is that we implement 
> CachedBatch as
> {code}
> case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: 
> InternalRow)
> {code}
> where "stats" is a part of every CachedBatch, so we can only filter batches 
> for output of InMemoryTableExec operator by reading all data in in-memory 
> table as input. The extra reading would be even more unacceptable when some 
> of the table's data is evicted to disks.
> We propose to introduce a new type of block, metadata block, for the 
> partitions of RDD representing data in the cached table. Every metadata block 
> contains stats info for all columns in a partition and is saved to 
> BlockManager when executing compute() method for the partition. To minimize 
> the number of bytes to read,
> More details can be found in design 
> doc:https://docs.google.com/document/d/1DSiP3ej7Wd2cWUPVrgqAtvxbSlu5_1ZZB6m_2t8_95Q/edit?usp=sharing
> performance test results:
> Environment: 6 Executors, each of which has 16 cores 90G memory
> dataset: 1T TPCDS data
> queries: tested 4 queries (Q19, Q46, Q34, Q27) in 
> https://github.com/databricks/spark-sql-perf/blob/c2224f37e50628c5c8691be69414ec7f5a3d919a/src/main/scala/com/databricks/spark/sql/perf/tpcds/ImpalaKitQueries.scala
> results: 
> https://docs.google.com/spreadsheets/d/1A20LxqZzAxMjW7ptAJZF4hMBaHxKGk3TBEQoAJXfzCI/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22765) Create a new executor allocation scheme based on that of MR

2017-12-19 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297453#comment-16297453
 ] 

Nan Zhu commented on SPARK-22765:
-

I took a look at the code; one of the possibilities is as follows:

we add the new executor id 
https://github.com/apache/spark/blob/a233fac0b8bf8229d938a24f2ede2d9d8861c284/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L598

and immediately filter idle executors

https://github.com/apache/spark/blob/a233fac0b8bf8229d938a24f2ede2d9d8861c284/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L419

Right after this step, since executorIdToTaskIds does not yet contain the executor id, 
onExecutorIdle is applied to the newly added executor.

In the method onExecutorIdle: 
https://github.com/apache/spark/blob/a233fac0b8bf8229d938a24f2ede2d9d8861c284/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L479

the newly added executor is also vulnerable to being assigned a removeTimes 
value, since removeTimes and executorsPendingToRemove do not contain the 
newly added executor ID. Based on the calculation method, the removeTime would 
be 60s later, so we will remove the executor after 60s 
(https://github.com/apache/spark/blob/a233fac0b8bf8229d938a24f2ede2d9d8861c284/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L266).


The potential fix is to update executorIdToTaskIds before we filter idle 
executors (but SPARK-21656 should prevent this from happening... still trying to 
figure out why SPARK-21656 is not working).


> Create a new executor allocation scheme based on that of MR
> ---
>
> Key: SPARK-22765
> URL: https://issues.apache.org/jira/browse/SPARK-22765
> Project: Spark
>  Issue Type: Improvement
>  Components: Scheduler
>Affects Versions: 1.6.0
>Reporter: Xuefu Zhang
>
> Many users migrating their workload from MR to Spark find a significant 
> resource consumption hike (i.e, SPARK-22683). While this might not be a 
> concern for users that are more performance centric, for others conscious 
> about cost, such hike creates a migration obstacle. This situation can get 
> worse as more users are moving to cloud.
> Dynamic allocation make it possible for Spark to be deployed in multi-tenant 
> environment. With its performance-centric design, its inefficiency has also 
> unfortunately shown up, especially when compared with MR. Thus, it's believed 
> that MR-styled scheduler still has its merit. Based on our research, the 
> inefficiency associated with dynamic allocation comes in many aspects such as 
> executor idling out, bigger executors, many stages (rather than 2 stages only 
> in MR) in a spark job, etc.
> Rather than fine tuning dynamic allocation for efficiency, the proposal here 
> is to add a new, efficiency-centric  scheduling scheme based on that of MR. 
> Such a MR-based scheme can be further enhanced and be more adapted to Spark 
> execution model. This alternative is expected to offer good performance 
> improvement (compared to MR) still with similar to or even better efficiency 
> than MR.
> Inputs are greatly welcome!



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22765) Create a new executor allocation scheme based on that of MR

2017-12-18 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295958#comment-16295958
 ] 

Nan Zhu commented on SPARK-22765:
-

[~xuefuz] Regarding this: "The symptom is that newly allocated executors are 
idled out before completing a single tasks! I suspected that this is caused by 
a busy scheduler. As a result, we have to keep 60s as a minimum."

Did you try to tune parameters such as the following?

1. spark.driver.cores, to give the driver more cores/threads;

2. "spark.locality.wait.process", "spark.locality.wait.node", and 
"spark.locality.wait.rack", to let executors get filled at a faster pace (see the 
sketch below).

> Create a new executor allocation scheme based on that of MR
> ---
>
> Key: SPARK-22765
> URL: https://issues.apache.org/jira/browse/SPARK-22765
> Project: Spark
>  Issue Type: Improvement
>  Components: Scheduler
>Affects Versions: 1.6.0
>Reporter: Xuefu Zhang
>
> Many users migrating their workload from MR to Spark find a significant 
> resource consumption hike (i.e, SPARK-22683). While this might not be a 
> concern for users that are more performance centric, for others conscious 
> about cost, such hike creates a migration obstacle. This situation can get 
> worse as more users are moving to cloud.
> Dynamic allocation make it possible for Spark to be deployed in multi-tenant 
> environment. With its performance-centric design, its inefficiency has also 
> unfortunately shown up, especially when compared with MR. Thus, it's believed 
> that MR-styled scheduler still has its merit. Based on our research, the 
> inefficiency associated with dynamic allocation comes in many aspects such as 
> executor idling out, bigger executors, many stages (rather than 2 stages only 
> in MR) in a spark job, etc.
> Rather than fine tuning dynamic allocation for efficiency, the proposal here 
> is to add a new, efficiency-centric  scheduling scheme based on that of MR. 
> Such a MR-based scheme can be further enhanced and be more adapted to Spark 
> execution model. This alternative is expected to offer good performance 
> improvement (compared to MR) still with similar to or even better efficiency 
> than MR.
> Inputs are greatly welcome!



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21656) spark dynamic allocation should not idle timeout executors when there are enough tasks to run on them

2017-12-18 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295282#comment-16295282
 ] 

Nan Zhu commented on SPARK-21656:
-

NOTE: the issue fixed by https://github.com/apache/spark/pull/18874

> spark dynamic allocation should not idle timeout executors when there are 
> enough tasks to run on them
> -
>
> Key: SPARK-21656
> URL: https://issues.apache.org/jira/browse/SPARK-21656
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.1
>Reporter: Jong Yoon Lee
>Assignee: Jong Yoon Lee
> Fix For: 2.2.1, 2.3.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Right now with dynamic allocation spark starts by getting the number of 
> executors it needs to run all the tasks in parallel (or the configured 
> maximum) for that stage.  After it gets that number it will never reacquire 
> more unless either an executor dies, is explicitly killed by yarn or it goes 
> to the next stage.  The dynamic allocation manager has the concept of idle 
> timeout. Currently this says if a task hasn't been scheduled on that executor 
> for a configurable amount of time (60 seconds by default), then let that 
> executor go.  Note when it lets that executor go due to the idle timeout it 
> never goes back to see if it should reacquire more.
> This is a problem for multiple reasons:
> 1 . Things can happen in the system that are not expected that can cause 
> delays. Spark should be resilient to these. If the driver is GC'ing, you have 
> network delays, etc we could idle timeout executors even though there are 
> tasks to run on them its just the scheduler hasn't had time to start those 
> tasks.  Note that in the worst case this allows the number of executors to go 
> to 0 and we have a deadlock.
> 2. Internal Spark components have opposing requirements. The scheduler has a 
> requirement to try to get locality, the dynamic allocation doesn't know about 
> this and if it lets the executors go it hurts the scheduler from doing what 
> it was designed to do.  For example the scheduler first tries to schedule 
> node local, during this time it can skip scheduling on some executors.  After 
> a while though the scheduler falls back from node local to scheduler on rack 
> local, and then eventually on any node.  So during when the scheduler is 
> doing node local scheduling, the other executors can idle timeout.  This 
> means that when the scheduler does fall back to rack or any locality where it 
> would have used those executors, we have already let them go and it can't 
> scheduler all the tasks it could which can have a huge negative impact on job 
> run time.
>  
> In both of these cases when the executors idle timeout we never go back to 
> check to see if we need more executors (until the next stage starts).  In the 
> worst case you end up with 0 and deadlock, but generally this shows itself by 
> just going down to very few executors when you could have 10's of thousands 
> of tasks to run on them, which causes the job to take way more time (in my 
> case I've seen it should take minutes and it takes hours due to only been 
> left a few executors).  
> We should handle these situations in Spark.   The most straight forward 
> approach would be to not allow the executors to idle timeout when there are 
> tasks that could run on those executors. This would allow the scheduler to do 
> its job with locality scheduling.  In doing this it also fixes number 1 above 
> because you never can go into a deadlock as it will keep enough executors to 
> run all the tasks on. 
> There are other approaches to fix this, like explicitly prevent it from going 
> to 0 executors, that prevents a deadlock but can still cause the job to 
> slowdown greatly.  We could also change it at some point to just re-check to 
> see if we should get more executors, but this adds extra logic, we would have 
> to decide when to check, its also just overhead in letting them go and then 
> re-acquiring them again and this would cause some slowdown in the job as the 
> executors aren't immediately there for the scheduler to place things on. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-22790) add a configurable factor to describe HadoopFsRelation's size

2017-12-14 Thread Nan Zhu (JIRA)
Nan Zhu created SPARK-22790:
---

 Summary: add a configurable factor to describe HadoopFsRelation's 
size
 Key: SPARK-22790
 URL: https://issues.apache.org/jira/browse/SPARK-22790
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.2.0
Reporter: Nan Zhu


As per the discussion in 
https://github.com/apache/spark/pull/19864#discussion_r156847927

the current HadoopFsRelation size estimate is purely based on the underlying file 
size, which is not accurate and makes the execution vulnerable to errors like OOM.

Users can enable CBO with the functionality in 
https://github.com/apache/spark/pull/19864 to avoid this issue.

This JIRA proposes to add a configurable factor to the sizeInBytes method of the 
HadoopFsRelation class so that users can mitigate this problem without CBO.
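
To illustrate the intent, a minimal sketch of the proposed scaling; the object, method, and factor value below are illustrative only, not the final API or config key:

{code}
// Illustration only: scale the raw on-disk size by a configurable factor when
// estimating the relation size, so compressed formats are not underestimated.
object SizeEstimateSketch {
  def estimatedSizeInBytes(rawFileSizeInBytes: Long, compressionFactor: Double): Long =
    (rawFileSizeInBytes * compressionFactor).toLong

  def main(args: Array[String]): Unit = {
    // e.g. a 128 MB snappy Parquet file that expands roughly 4x when decoded
    println(estimatedSizeInBytes(128L * 1024 * 1024, compressionFactor = 4.0))
  }
}
{code}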



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22790) add a configurable factor to describe HadoopFsRelation's size

2017-12-14 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291985#comment-16291985
 ] 

Nan Zhu commented on SPARK-22790:
-

created per discussion in https://github.com/apache/spark/pull/19864 (will file 
a PR soon)

> add a configurable factor to describe HadoopFsRelation's size
> -
>
> Key: SPARK-22790
> URL: https://issues.apache.org/jira/browse/SPARK-22790
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Nan Zhu
>
> as per discussion in 
> https://github.com/apache/spark/pull/19864#discussion_r156847927
> the current HadoopFsRelation is purely based on the underlying file size 
> which is not accurate and makes the execution vulnerable to errors like OOM
> Users can enable CBO with the functionalities in 
> https://github.com/apache/spark/pull/19864 to avoid this issue
> This JIRA proposes to add a configurable factor to sizeInBytes method in 
> HadoopFsRelation class so that users can mitigate this problem without CBO



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22680) SparkSQL scan all partitions when the specified partitions are not exists in parquet formatted table

2017-12-07 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16282248#comment-16282248
 ] 

Nan Zhu commented on SPARK-22680:
-

How did you observe that Spark scans all partitions? I tried to reproduce but had no 
luck.

Table structure:
{code}
zhunan@sparktest:~/testdata1$ ls
count1=1  count1=2  count1=3  _SUCCESS
{code}

query: select * from table1 where count1 = 4

I can see that the log shows:

{code}
17/12/07 17:57:51 INFO PrunedInMemoryFileIndex: Selected 0 partitions out of 0, pruned 0 partitions.
17/12/07 17:57:51 TRACE PrunedInMemoryFileIndex: Selected files after partition pruning:
{code}

No file was selected.
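
For completeness, a sketch of how such a partitioned table can be set up (assuming a spark-shell session; the table name and values are illustrative):

{code}
// Sketch of the repro setup: a parquet table partitioned by count1 = 1, 2, 3,
// then a query on a partition value that does not exist.
spark.range(0, 9)
  .selectExpr("id AS value", "cast(id % 3 + 1 AS int) AS count1")
  .write.mode("overwrite").partitionBy("count1").format("parquet").saveAsTable("table1")

spark.sql("select * from table1 where count1 = 4").show()
// With partition pruning working, 0 partitions are selected and no files are read.
{code}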


> SparkSQL scan all partitions when the specified partitions are not exists in 
> parquet formatted table
> 
>
> Key: SPARK-22680
> URL: https://issues.apache.org/jira/browse/SPARK-22680
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.2, 2.2.0
> Environment: spark2.0.2 spark2.2.0
>Reporter: Xiaochen Ouyang
>
> 1. spark-sql --master local[2]
> 2. create external table test (id int,name string) partitioned by (country 
> string,province string, day string,hour int) stored as parquet localtion 
> '/warehouse/test';
> 3.produce data into table test
> 4. select count(1) from test where country = '185' and province = '021' and 
> day = '2017-11-12' and hour = 10; if the 4 filter conditions are not exists 
> in HDFS and MetaStore[mysql] , this sql will scan all partitions in table test



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-22673) InMemoryRelation should utilize on-disk table stats whenever possible

2017-12-01 Thread Nan Zhu (JIRA)
Nan Zhu created SPARK-22673:
---

 Summary: InMemoryRelation should utilize on-disk table stats 
whenever possible
 Key: SPARK-22673
 URL: https://issues.apache.org/jira/browse/SPARK-22673
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.2.0
Reporter: Nan Zhu


The current implementation of InMemoryRelation always uses the most expensive 
execution plan when writing the cache.

With CBO enabled, we can actually have a more exact estimate of the 
underlying table size...
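
For context, the on-disk stats meant here are the ones collected via ANALYZE TABLE with CBO enabled; a minimal sketch of the scenario (assuming a spark-shell session; the table name is illustrative):

{code}
// Sketch of the scenario: with ANALYZE'd stats available, building the cache for
// `events` could reuse that size estimate instead of the most expensive plan's one.
spark.conf.set("spark.sql.cbo.enabled", "true")
spark.sql("ANALYZE TABLE events COMPUTE STATISTICS")
spark.table("events").cache()    // InMemoryRelation is created here
spark.table("events").count()    // materializes the cache
{code}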



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-22599) Avoid extra reading for cached table

2017-11-29 Thread Nan Zhu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nan Zhu updated SPARK-22599:

Description: 
In the current implementation of Spark, InMemoryTableExec reads all data in a 
cached table, filters CachedBatches according to their stats, and passes data to the 
downstream operators. This implementation makes it inefficient to keep the 
whole table in memory to serve various queries against different partitions of 
the table, which covers a fair portion of our users' scenarios.

The following is an example of such a use case:

store_sales is a 1TB-sized table in cloud storage, which is partitioned by 
'location'. The first query, Q1, wants to output several metrics A, B, C for 
all stores in all locations. After that, a small team of 3 data scientists 
wants to do some causal analysis for the sales in different locations. To avoid 
unnecessary I/O and parquet/orc parsing overhead, they want to cache the whole 
table in memory in Q1.

With the current implementation, even if any one of the data scientists is only 
interested in one out of the three locations, the queries they submit to the Spark 
cluster still read the full 1TB of data.

The reason behind the extra reading operation is that we implement CachedBatch 
as

{code}
case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: 
InternalRow)
{code}

where "stats" is a part of every CachedBatch, so we can only filter batches for 
output of InMemoryTableExec operator by reading all data in in-memory table as 
input. The extra reading would be even more unacceptable when some of the 
table's data is evicted to disks.

We propose to introduce a new type of block, metadata block, for the partitions 
of RDD representing data in the cached table. Every metadata block contains 
stats info for all columns in a partition and is saved to BlockManager when 
executing compute() method for the partition. To minimize the number of bytes 
to read,

More details can be found in design 
doc:https://docs.google.com/document/d/1DSiP3ej7Wd2cWUPVrgqAtvxbSlu5_1ZZB6m_2t8_95Q/edit?usp=sharing

performance test results:

Environment: 6 Executors, each of which has 16 cores 90G memory

dataset: 1T TPCDS data

queries: tested 4 queries (Q19, Q46, Q34, Q27) in 
https://github.com/databricks/spark-sql-perf/blob/c2224f37e50628c5c8691be69414ec7f5a3d919a/src/main/scala/com/databricks/spark/sql/perf/tpcds/ImpalaKitQueries.scala

results: 
https://docs.google.com/spreadsheets/d/1A20LxqZzAxMjW7ptAJZF4hMBaHxKGk3TBEQoAJXfzCI/edit?usp=sharing
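
To make the proposal concrete, a hypothetical sketch of the per-partition metadata block; the names and fields are illustrative only, and the authoritative layout is in the design doc above:

{code}
// Hypothetical sketch, not the actual design: one metadata block per cached partition,
// carrying the per-column stats needed to decide whether its CachedBatches must be read.
import org.apache.spark.sql.catalyst.InternalRow

case class CachedPartitionMetadata(
    partitionId: Int,
    numRows: Long,
    columnStats: InternalRow)  // e.g. min/max/nullCount per column, like CachedBatch.stats
{code}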

  was:
In the current implementation of Spark, InMemoryTableExec read all data in a 
cached table, filter CachedBatch according to stats and pass data to the 
downstream operators. This implementation makes it inefficient to reside the 
whole table in memory to serve various queries against different partitions of 
the table, which occupies a certain portion of our users' scenarios.

The following is an example of such a use case:

store_sales is a 1TB-sized table in cloud storage, which is partitioned by 
'location'. The first query, Q1, wants to output several metrics A, B, C for 
all stores in all locations. After that, a small team of 3 data scientists 
wants to do some causal analysis for the sales in different locations. To avoid 
unnecessary I/O and parquet/orc parsing overhead, they want to cache the whole 
table in memory in Q1.

With the current implementation, even any one of the data scientists is only 
interested in one out of three locations, the queries they submit to Spark 
cluster is still reading 1TB data completely.

The reason behind the extra reading operation is that we implement CachedBatch 
as

{code}
case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: 
InternalRow)
{code}

where "stats" is a part of every CachedBatch, so we can only filter batches for 
output of InMemoryTableExec operator by reading all data in in-memory table as 
input. The extra reading would be even more unacceptable when some of the 
table's data is evicted to disks.

We propose to introduce a new type of block, metadata block, for the partitions 
of RDD representing data in the cached table. Every metadata block contains 
stats info for all columns in a partition and is saved to BlockManager when 
executing compute() method for the partition. To minimize the number of bytes 
to read,

More details can be found in design 
doc:https://docs.google.com/document/d/1DSiP3ej7Wd2cWUPVrgqAtvxbSlu5_1ZZB6m_2t8_95Q/edit?usp=sharing




> Avoid extra reading for cached table
> 
>
> Key: SPARK-22599
> URL: https://issues.apache.org/jira/browse/SPARK-22599
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Nan Zhu
>
> In the current implementation of Spark, InMemoryTableExec read 

[jira] [Updated] (SPARK-22599) Avoid extra reading for cached table

2017-11-23 Thread Nan Zhu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nan Zhu updated SPARK-22599:

Description: 
In the current implementation of Spark, InMemoryTableExec read all data in a 
cached table, filter CachedBatch according to stats and pass data to the 
downstream operators. This implementation makes it inefficient to reside the 
whole table in memory to serve various queries against different partitions of 
the table, which occupies a certain portion of our users' scenarios.

The following is an example of such a use case:

store_sales is a 1TB-sized table in cloud storage, which is partitioned by 
'location'. The first query, Q1, wants to output several metrics A, B, C for 
all stores in all locations. After that, a small team of 3 data scientists 
wants to do some causal analysis for the sales in different locations. To avoid 
unnecessary I/O and parquet/orc parsing overhead, they want to cache the whole 
table in memory in Q1.

With the current implementation, even any one of the data scientists is only 
interested in one out of three locations, the queries they submit to Spark 
cluster is still reading 1TB data completely.

The reason behind the extra reading operation is that we implement CachedBatch 
as

{code}
case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: 
InternalRow)
{code}

where "stats" is a part of every CachedBatch, so we can only filter batches for 
output of InMemoryTableExec operator by reading all data in in-memory table as 
input. The extra reading would be even more unacceptable when some of the 
table's data is evicted to disks.

We propose to introduce a new type of block, metadata block, for the partitions 
of RDD representing data in the cached table. Every metadata block contains 
stats info for all columns in a partition and is saved to BlockManager when 
executing compute() method for the partition. To minimize the number of bytes 
to read,

More details can be found in design 
doc:https://docs.google.com/document/d/1DSiP3ej7Wd2cWUPVrgqAtvxbSlu5_1ZZB6m_2t8_95Q/edit?usp=sharing



  was:
In the current implementation of Spark, InMemoryTableExec read all data in a 
cached table, filter CachedBatch according to stats and pass data to the 
downstream operators. This implementation makes it inefficient to reside the 
whole table in memory to serve various queries against different partitions of 
the table, which occupies a certain portion of our users' scenarios.

The following is an example of such a use case:

store_sales is a 1TB-sized table in cloud storage, which is partitioned by 
'location'. The first query, Q1, wants to output several metrics A, B, C for 
all stores in all locations. After that, a small team of 3 data scientists 
wants to do some causal analysis for the sales in different locations. To avoid 
unnecessary I/O and parquet/orc parsing overhead, they want to cache the whole 
table in memory in Q1.

With the current implementation, even any one of the data scientists is only 
interested in one out of three locations, the queries they submit to Spark 
cluster is still reading 1TB data completely.

The reason behind the extra reading operation is that we implement CachedBatch 
as

{code:scala}
case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: 
InternalRow)
{code}

where "stats" is a part of every CachedBatch, so we can only filter batches for 
output of InMemoryTableExec operator by reading all data in in-memory table as 
input. The extra reading would be even more unacceptable when some of the 
table's data is evicted to disks.

We propose to introduce a new type of block, metadata block, for the partitions 
of RDD representing data in the cached table. Every metadata block contains 
stats info for all columns in a partition and is saved to BlockManager when 
executing compute() method for the partition. To minimize the number of bytes 
to read,

More details can be found in design 
doc:https://docs.google.com/document/d/1DSiP3ej7Wd2cWUPVrgqAtvxbSlu5_1ZZB6m_2t8_95Q/edit?usp=sharing




> Avoid extra reading for cached table
> 
>
> Key: SPARK-22599
> URL: https://issues.apache.org/jira/browse/SPARK-22599
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Nan Zhu
>
> In the current implementation of Spark, InMemoryTableExec read all data in a 
> cached table, filter CachedBatch according to stats and pass data to the 
> downstream operators. This implementation makes it inefficient to reside the 
> whole table in memory to serve various queries against different partitions 
> of the table, which occupies a certain portion of our users' scenarios.
> The following is an example of such a use case:
> store_sales is a 1TB-sized table in cloud storage, which is 

[jira] [Created] (SPARK-22599) Avoid extra reading for cached table

2017-11-23 Thread Nan Zhu (JIRA)
Nan Zhu created SPARK-22599:
---

 Summary: Avoid extra reading for cached table
 Key: SPARK-22599
 URL: https://issues.apache.org/jira/browse/SPARK-22599
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.2.0
Reporter: Nan Zhu


In the current implementation of Spark, InMemoryTableExec read all data in a 
cached table, filter CachedBatch according to stats and pass data to the 
downstream operators. This implementation makes it inefficient to reside the 
whole table in memory to serve various queries against different partitions of 
the table, which occupies a certain portion of our users' scenarios.

The following is an example of such a use case:

store_sales is a 1TB-sized table in cloud storage, which is partitioned by 
'location'. The first query, Q1, wants to output several metrics A, B, C for 
all stores in all locations. After that, a small team of 3 data scientists 
wants to do some causal analysis for the sales in different locations. To avoid 
unnecessary I/O and parquet/orc parsing overhead, they want to cache the whole 
table in memory in Q1.

With the current implementation, even any one of the data scientists is only 
interested in one out of three locations, the queries they submit to Spark 
cluster is still reading 1TB data completely.

The reason behind the extra reading operation is that we implement CachedBatch 
as

{code:scala}
case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: 
InternalRow)
{code}

where "stats" is a part of every CachedBatch, so we can only filter batches for 
output of InMemoryTableExec operator by reading all data in in-memory table as 
input. The extra reading would be even more unacceptable when some of the 
table's data is evicted to disks.

We propose to introduce a new type of block, metadata block, for the partitions 
of RDD representing data in the cached table. Every metadata block contains 
stats info for all columns in a partition and is saved to BlockManager when 
executing compute() method for the partition. To minimize the number of bytes 
to read,

More details can be found in design 
doc:https://docs.google.com/document/d/1DSiP3ej7Wd2cWUPVrgqAtvxbSlu5_1ZZB6m_2t8_95Q/edit?usp=sharing





--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Closed] (SPARK-21197) Tricky use case makes dead application struggle for a long duration

2017-06-24 Thread Nan Zhu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21197?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nan Zhu closed SPARK-21197.
---
Resolution: Won't Fix

> Tricky use case makes dead application struggle for a long duration
> ---
>
> Key: SPARK-21197
> URL: https://issues.apache.org/jira/browse/SPARK-21197
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams, Spark Core
>Affects Versions: 2.0.2, 2.1.1
>Reporter: Nan Zhu
>
> The use case is in Spark Streaming while the root cause is in DAGScheduler, 
> so I said the component as both of DStreams and Core
> Use case: 
> the user has a thread periodically triggering Spark jobs, and in the same 
> application, they retrieve data through Spark Streaming from somewherein 
> the Streaming logic, an exception is thrown so that the whole application is 
> supposed to be shutdown and let YARN restart it...
> The user observed that after the exception is propagated to Spark core and 
> SparkContext.stop() is called, after 18 hours, the application is still 
> running...
> The root cause is that when we call DAGScheduler.stop(), we will wait for 
> eventLoop's thread to finish 
> (https://github.com/apache/spark/blob/03eb6117affcca21798be25706a39e0d5a2f7288/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1704
>  and 
> https://github.com/apache/spark/blob/03eb6117affcca21798be25706a39e0d5a2f7288/core/src/main/scala/org/apache/spark/util/EventLoop.scala#L40)
> Since there is a thread periodically push events to DAGScheduler's event 
> queue, it will never finish
> a potential solution is that in EventLoop, we should allow interrupt the 
> thread directly for some cases, e.g. this one, and simultaneously allow 
> graceful shutdown for other cases, e.g. ListenerBus one, 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21197) Tricky use case makes dead application struggle for a long duration

2017-06-24 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21197?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16062162#comment-16062162
 ] 

Nan Zhu commented on SPARK-21197:
-

yeah, after rethinking about the solution, I think a daemon thread would be a 
better solution.

> Tricky use case makes dead application struggle for a long duration
> ---
>
> Key: SPARK-21197
> URL: https://issues.apache.org/jira/browse/SPARK-21197
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams, Spark Core
>Affects Versions: 2.0.2, 2.1.1
>Reporter: Nan Zhu
>
> The use case is in Spark Streaming while the root cause is in DAGScheduler, 
> so I said the component as both of DStreams and Core
> Use case: 
> the user has a thread periodically triggering Spark jobs, and in the same 
> application, they retrieve data through Spark Streaming from somewherein 
> the Streaming logic, an exception is thrown so that the whole application is 
> supposed to be shutdown and let YARN restart it...
> The user observed that after the exception is propagated to Spark core and 
> SparkContext.stop() is called, after 18 hours, the application is still 
> running...
> The root cause is that when we call DAGScheduler.stop(), we will wait for 
> eventLoop's thread to finish 
> (https://github.com/apache/spark/blob/03eb6117affcca21798be25706a39e0d5a2f7288/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1704
>  and 
> https://github.com/apache/spark/blob/03eb6117affcca21798be25706a39e0d5a2f7288/core/src/main/scala/org/apache/spark/util/EventLoop.scala#L40)
> Since there is a thread periodically push events to DAGScheduler's event 
> queue, it will never finish
> a potential solution is that in EventLoop, we should allow interrupt the 
> thread directly for some cases, e.g. this one, and simultaneously allow 
> graceful shutdown for other cases, e.g. ListenerBus one, 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-21197) Tricky use case makes dead application struggle for a long duration

2017-06-23 Thread Nan Zhu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21197?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nan Zhu updated SPARK-21197:

Summary: Tricky use case makes dead application struggle for a long 
duration  (was: Tricky use cases makes dead application struggle for a long 
duration)

> Tricky use case makes dead application struggle for a long duration
> ---
>
> Key: SPARK-21197
> URL: https://issues.apache.org/jira/browse/SPARK-21197
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams, Spark Core
>Affects Versions: 2.0.2, 2.1.1
>Reporter: Nan Zhu
>
> The use case is in Spark Streaming while the root cause is in DAGScheduler, 
> so I said the component as both of DStreams and Core
> Use case: 
> the user has a thread periodically triggering Spark jobs, and in the same 
> application, they retrieve data through Spark Streaming from somewherein 
> the Streaming logic, an exception is thrown so that the whole application is 
> supposed to be shutdown and let YARN restart it...
> The user observed that after the exception is propagated to Spark core and 
> SparkContext.stop() is called, after 18 hours, the application is still 
> running...
> The root cause is that when we call DAGScheduler.stop(), we will wait for 
> eventLoop's thread to finish 
> (https://github.com/apache/spark/blob/03eb6117affcca21798be25706a39e0d5a2f7288/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1704
>  and 
> https://github.com/apache/spark/blob/03eb6117affcca21798be25706a39e0d5a2f7288/core/src/main/scala/org/apache/spark/util/EventLoop.scala#L40)
> Since there is a thread periodically push events to DAGScheduler's event 
> queue, it will never finish
> a potential solution is that in EventLoop, we should allow interrupt the 
> thread directly for some cases, e.g. this one, and simultaneously allow 
> graceful shutdown for other cases, e.g. ListenerBus one, 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-21197) Tricky use cases makes dead application struggle for a long duration

2017-06-23 Thread Nan Zhu (JIRA)
Nan Zhu created SPARK-21197:
---

 Summary: Tricky use cases makes dead application struggle for a 
long duration
 Key: SPARK-21197
 URL: https://issues.apache.org/jira/browse/SPARK-21197
 Project: Spark
  Issue Type: Bug
  Components: DStreams, Spark Core
Affects Versions: 2.1.1, 2.0.2
Reporter: Nan Zhu


The use case is in Spark Streaming while the root cause is in DAGScheduler, so 
I set the components to both DStreams and Core.

Use case: 

the user has a thread periodically triggering Spark jobs, and in the same 
application, they retrieve data through Spark Streaming from somewhere... In 
the Streaming logic, an exception is thrown so that the whole application is 
supposed to be shut down and restarted by YARN...

The user observed that after the exception was propagated to Spark core and 
SparkContext.stop() was called, the application was still running 18 hours 
later...

The root cause is that when we call DAGScheduler.stop(), we will wait for 
eventLoop's thread to finish 
(https://github.com/apache/spark/blob/03eb6117affcca21798be25706a39e0d5a2f7288/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1704
 and 
https://github.com/apache/spark/blob/03eb6117affcca21798be25706a39e0d5a2f7288/core/src/main/scala/org/apache/spark/util/EventLoop.scala#L40)

Since there is a thread periodically pushing events to the DAGScheduler's event 
queue, it will never finish.

A potential solution is that, in EventLoop, we should allow interrupting the 
thread directly in some cases, e.g. this one, while still allowing graceful 
shutdown in other cases, e.g. the ListenerBus one.
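
A hedged sketch of that idea on a simplified stand-in for Spark's EventLoop (this is 
not the real class, just an illustration of forced vs. graceful shutdown):

{code:scala}
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

abstract class SimpleEventLoop[E](name: String) {
  private val queue = new LinkedBlockingQueue[E]()
  @volatile private var stopped = false

  private val thread = new Thread(name) {
    override def run(): Unit =
      try {
        while (!stopped) {
          val event = queue.poll(100, TimeUnit.MILLISECONDS)
          if (event != null) onReceive(event)
        }
      } catch {
        case _: InterruptedException => () // forced-shutdown path
      }
  }

  def onReceive(event: E): Unit
  def post(event: E): Unit = queue.put(event)
  def start(): Unit = thread.start()

  // interrupt = true: forced stop (e.g. during application shutdown), so a
  // producer that keeps posting events cannot keep the loop alive forever.
  // interrupt = false: graceful stop (e.g. a listener-bus style loop) that lets
  // the event currently being processed finish before the thread exits.
  def stop(interrupt: Boolean): Unit = {
    stopped = true
    if (interrupt) thread.interrupt()
    thread.join()
  }
}
{code}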





--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20928) Continuous Processing Mode for Structured Streaming

2017-06-03 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16036157#comment-16036157
 ] 

Nan Zhu commented on SPARK-20928:
-

If I understand correctly, the tasks will be "long-term" tasks, just like the 
receiver tasks in the receiver-based InputDStream in Spark Streaming?

> Continuous Processing Mode for Structured Streaming
> ---
>
> Key: SPARK-20928
> URL: https://issues.apache.org/jira/browse/SPARK-20928
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Michael Armbrust
>
> Given the current Source API, the minimum possible latency for any record is 
> bounded by the amount of time that it takes to launch a task.  This 
> limitation is a result of the fact that {{getBatch}} requires us to know both 
> the starting and the ending offset, before any tasks are launched.  In the 
> worst case, the end-to-end latency is actually closer to the average batch 
> time + task launching time.
> For applications where latency is more important than exactly-once output 
> however, it would be useful if processing could happen continuously.  This 
> would allow us to achieve fully pipelined reading and writing from sources 
> such as Kafka.  This kind of architecture would make it possible to process 
> records with end-to-end latencies on the order of 1 ms, rather than the 
> 10-100ms that is possible today.
> One possible architecture here would be to change the Source API to look like 
> the following rough sketch:
> {code}
>   trait Epoch {
> def data: DataFrame
> /** The exclusive starting position for `data`. */
> def startOffset: Offset
> /** The inclusive ending position for `data`.  Incrementally updated 
> during processing, but not complete until execution of the query plan in 
> `data` is finished. */
> def endOffset: Offset
>   }
>   def getBatch(startOffset: Option[Offset], endOffset: Option[Offset], 
> limits: Limits): Epoch
> {code}
> The above would allow us to build an alternative implementation of 
> {{StreamExecution}} that processes continuously with much lower latency and 
> only stops processing when needing to reconfigure the stream (either due to a 
> failure or a user requested change in parallelism.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20928) Continuous Processing Mode for Structured Streaming

2017-05-30 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16030379#comment-16030379
 ] 

Nan Zhu commented on SPARK-20928:
-

Hi, is there any description of what this means?

> Continuous Processing Mode for Structured Streaming
> ---
>
> Key: SPARK-20928
> URL: https://issues.apache.org/jira/browse/SPARK-20928
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Michael Armbrust
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-4921) TaskSetManager mistakenly returns PROCESS_LOCAL for NO_PREF tasks

2017-05-23 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16021417#comment-16021417
 ] 

Nan Zhu commented on SPARK-4921:


I forgot most of the details...but the final conclusion was that "it's a typo 
resulting in no performance issue, but for some reason, we cannot fully prove 
that changing this will not hurt anything...so won't fix"

> TaskSetManager mistakenly returns PROCESS_LOCAL for NO_PREF tasks
> -
>
> Key: SPARK-4921
> URL: https://issues.apache.org/jira/browse/SPARK-4921
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.2.0
>Reporter: Xuefu Zhang
> Attachments: NO_PREF.patch
>
>
> During research for HIVE-9153, we found that TaskSetManager returns 
> PROCESS_LOCAL for NO_PREF tasks, which may caused performance degradation. 
> Changing the return value to NO_PREF, as demonstrated in the attached patch, 
> seemingly improves the performance.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20811) GBT Classifier failed with mysterious StackOverflowError

2017-05-19 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16018246#comment-16018246
 ] 

Nan Zhu commented on SPARK-20811:
-

thanks, let me try it

> GBT Classifier failed with mysterious StackOverflowError
> 
>
> Key: SPARK-20811
> URL: https://issues.apache.org/jira/browse/SPARK-20811
> Project: Spark
>  Issue Type: Bug
>  Components: ML
>Affects Versions: 2.1.0
>Reporter: Nan Zhu
>
> I am running GBT Classifier over airline dataset (combining 2005-2008) and in 
> total it's around 22M examples as training data
> code is simple
> {code:title=Bar.scala|borderStyle=solid}
> val gradientBoostedTrees = new GBTClassifier()
>   gradientBoostedTrees.setMaxBins(1000)
>   gradientBoostedTrees.setMaxIter(500)
>   gradientBoostedTrees.setMaxDepth(6)
>   gradientBoostedTrees.setStepSize(1.0)
>   transformedTrainingSet.cache().foreach(_ => Unit)
>   val startTime = System.nanoTime()
>   val model = gradientBoostedTrees.fit(transformedTrainingSet)
>   println(s"===training time cost: ${(System.nanoTime() - startTime) / 
> 1000.0 / 1000.0} ms")
>   val resultDF = model.transform(transformedTestset)
>   val binaryClassificationEvaluator = new BinaryClassificationEvaluator()
>   
> binaryClassificationEvaluator.setRawPredictionCol("prediction").setLabelCol("label")
>   println(s"=test AUC: 
> ${binaryClassificationEvaluator.evaluate(resultDF)}==")
> {code}
> my training job always failed with 
> {quote}
> 17/05/19 13:41:29 WARN TaskSetManager: Lost task 18.0 in stage 3907.0 (TID 
> 137506, 10.0.0.13, executor 3): java.lang.StackOverflowError
>   at 
> java.io.ObjectInputStream$BlockDataInputStream.read(ObjectInputStream.java:3037)
>   at 
> java.io.ObjectInputStream$BlockDataInputStream.readFully(ObjectInputStream.java:3061)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2234)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
>   at 
> scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
>   at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
> {quote}
> the above pattern repeated for many times
> Is it a bug or did I make something wrong when using GBTClassifier in ML?



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-20811) GBT Classifier failed with mysterious StackOverflowException

2017-05-19 Thread Nan Zhu (JIRA)
Nan Zhu created SPARK-20811:
---

 Summary: GBT Classifier failed with mysterious 
StackOverflowException 
 Key: SPARK-20811
 URL: https://issues.apache.org/jira/browse/SPARK-20811
 Project: Spark
  Issue Type: Bug
  Components: ML
Affects Versions: 2.1.0
Reporter: Nan Zhu


I am running the GBT Classifier over the airline dataset (combining 2005-2008); in 
total it's around 22M training examples.

The code is simple:

{code:title=Bar.scala|borderStyle=solid}
import org.apache.spark.ml.classification.GBTClassifier
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

val gradientBoostedTrees = new GBTClassifier()
gradientBoostedTrees.setMaxBins(1000)
gradientBoostedTrees.setMaxIter(500)
gradientBoostedTrees.setMaxDepth(6)
gradientBoostedTrees.setStepSize(1.0)

// materialize the cached training set before timing the fit
transformedTrainingSet.cache().foreach(_ => Unit)

val startTime = System.nanoTime()
val model = gradientBoostedTrees.fit(transformedTrainingSet)
println(s"===training time cost: ${(System.nanoTime() - startTime) / 1000.0 / 1000.0} ms")

val resultDF = model.transform(transformedTestset)
val binaryClassificationEvaluator = new BinaryClassificationEvaluator()
binaryClassificationEvaluator.setRawPredictionCol("prediction").setLabelCol("label")
println(s"=test AUC: ${binaryClassificationEvaluator.evaluate(resultDF)}==")
{code}


My training job always failed with:

{quote}
17/05/19 13:41:29 WARN TaskSetManager: Lost task 18.0 in stage 3907.0 (TID 137506, 10.0.0.13, executor 3): java.lang.StackOverflowError
  at java.io.ObjectInputStream$BlockDataInputStream.read(ObjectInputStream.java:3037)
  at java.io.ObjectInputStream$BlockDataInputStream.readFully(ObjectInputStream.java:3061)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2234)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
  at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
  at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
{quote}

The above pattern repeated many times.

Is this a bug, or did I do something wrong when using GBTClassifier in ML?
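
Not a confirmed fix for this report, but two mitigations commonly tried for 
deep-lineage StackOverflowErrors in iterative ML training, sketched here with 
illustrative paths and sizes:

{code:scala}
import org.apache.spark.ml.classification.GBTClassifier

// 1. Periodic checkpointing truncates the lineage of the intermediate RDDs
//    every few iterations (requires a checkpoint directory to be set).
spark.sparkContext.setCheckpointDir("hdfs:///tmp/gbt-checkpoints")

val gbt = new GBTClassifier()
  .setMaxBins(1000)
  .setMaxIter(500)
  .setMaxDepth(6)
  .setStepSize(1.0)
  .setCheckpointInterval(10) // truncate lineage every 10 iterations

// 2. If the overflow happens while (de)serializing a very long lineage, a larger
//    JVM thread stack can also help, e.g. when submitting the job:
//    --conf spark.executor.extraJavaOptions=-Xss16m
//    --conf spark.driver.extraJavaOptions=-Xss16m
{code}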



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-20811) GBT Classifier failed with mysterious StackOverflowError

2017-05-19 Thread Nan Zhu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20811?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nan Zhu updated SPARK-20811:

Summary: GBT Classifier failed with mysterious StackOverflowError  (was: 
GBT Classifier failed with mysterious StackOverflowException )

> GBT Classifier failed with mysterious StackOverflowError
> 
>
> Key: SPARK-20811
> URL: https://issues.apache.org/jira/browse/SPARK-20811
> Project: Spark
>  Issue Type: Bug
>  Components: ML
>Affects Versions: 2.1.0
>Reporter: Nan Zhu
>
> I am running GBT Classifier over airline dataset (combining 2005-2008) and in 
> total it's around 22M examples as training data
> code is simple
> {code:title=Bar.scala|borderStyle=solid}
> val gradientBoostedTrees = new GBTClassifier()
>   gradientBoostedTrees.setMaxBins(1000)
>   gradientBoostedTrees.setMaxIter(500)
>   gradientBoostedTrees.setMaxDepth(6)
>   gradientBoostedTrees.setStepSize(1.0)
>   transformedTrainingSet.cache().foreach(_ => Unit)
>   val startTime = System.nanoTime()
>   val model = gradientBoostedTrees.fit(transformedTrainingSet)
>   println(s"===training time cost: ${(System.nanoTime() - startTime) / 
> 1000.0 / 1000.0} ms")
>   val resultDF = model.transform(transformedTestset)
>   val binaryClassificationEvaluator = new BinaryClassificationEvaluator()
>   
> binaryClassificationEvaluator.setRawPredictionCol("prediction").setLabelCol("label")
>   println(s"=test AUC: 
> ${binaryClassificationEvaluator.evaluate(resultDF)}==")
> {code}
> my training job always failed with 
> {quote}
> 17/05/19 13:41:29 WARN TaskSetManager: Lost task 18.0 in stage 3907.0 (TID 
> 137506, 10.0.0.13, executor 3): java.lang.StackOverflowError
>   at 
> java.io.ObjectInputStream$BlockDataInputStream.read(ObjectInputStream.java:3037)
>   at 
> java.io.ObjectInputStream$BlockDataInputStream.readFully(ObjectInputStream.java:3061)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2234)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
>   at 
> scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
>   at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
> {quote}
> the above pattern repeated for many times
> Is it a bug or did I make something wrong when using GBTClassifier in ML?



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20251) Spark streaming skips batches in a case of failure

2017-04-20 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977589#comment-15977589
 ] 

Nan Zhu commented on SPARK-20251:
-

ignore my previous comments...the Spark Streaming app moving on is due to a leaked 
user thread (not set as a daemon) that blocked the whole app from quitting...the 
StreamingContext itself is shut down
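
A minimal sketch of that leak, with all names made up: a non-daemon user thread keeps 
the JVM alive even though the StreamingContext has already been shut down.

{code:scala}
val poller = new Thread(new Runnable {
  override def run(): Unit = {
    while (true) {
      pollExternalSystem() // hypothetical user code driving the loop
      Thread.sleep(30 * 1000L)
    }
  }
})
poller.setDaemon(true) // without this, the "dead" application keeps running
poller.start()
{code}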

> Spark streaming skips batches in a case of failure
> --
>
> Key: SPARK-20251
> URL: https://issues.apache.org/jira/browse/SPARK-20251
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0
>Reporter: Roman Studenikin
>
> We are experiencing strange behaviour of spark streaming application. 
> Sometimes it just skips batch in a case of job failure and starts working on 
> the next one.
> We expect it to attempt to reprocess batch, but not to skip it. Is it a bug 
> or we are missing any important configuration params?
> Screenshots from spark UI:
> http://pasteboard.co/1oRW0GDUX.png
> http://pasteboard.co/1oSjdFpbc.png



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-20251) Spark streaming skips batches in a case of failure

2017-04-09 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15962318#comment-15962318
 ] 

Nan Zhu edited comment on SPARK-20251 at 4/10/17 12:16 AM:
---

more details here: it is expected that the compute() method for the next batch 
is executed before the app is shut down; however, the app should eventually be 
shut down since we have signalled the awaiting condition set in 
awaitTermination().

However, this "eventual shutdown" did not happen... (this issue did not happen 
consistently)


was (Author: codingcat):
more details here, by "be proceeding", I mean it is expected that the compute() 
method for the next batch was executed before the app is shutdown, however, the 
app should be eventually shutdown since we have signalled the awaiting 
condition set in awaitTermination()

however, this "eventual shutdown" was not happened...(this issue did not 
consistently happen)

> Spark streaming skips batches in a case of failure
> --
>
> Key: SPARK-20251
> URL: https://issues.apache.org/jira/browse/SPARK-20251
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0
>Reporter: Roman Studenikin
>
> We are experiencing strange behaviour of spark streaming application. 
> Sometimes it just skips batch in a case of job failure and starts working on 
> the next one.
> We expect it to attempt to reprocess batch, but not to skip it. Is it a bug 
> or we are missing any important configuration params?
> Screenshots from spark UI:
> http://pasteboard.co/1oRW0GDUX.png
> http://pasteboard.co/1oSjdFpbc.png



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20251) Spark streaming skips batches in a case of failure

2017-04-09 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15962318#comment-15962318
 ] 

Nan Zhu commented on SPARK-20251:
-

more details here, by "be proceeding", I mean it is expected that the compute() 
method for the next batch was executed before the app is shutdown, however, the 
app should be eventually shutdown since we have signalled the awaiting 
condition set in awaitTermination()

however, this "eventual shutdown" was not happened...(this issue did not 
consistently happen)

> Spark streaming skips batches in a case of failure
> --
>
> Key: SPARK-20251
> URL: https://issues.apache.org/jira/browse/SPARK-20251
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0
>Reporter: Roman Studenikin
>
> We are experiencing strange behaviour of spark streaming application. 
> Sometimes it just skips batch in a case of job failure and starts working on 
> the next one.
> We expect it to attempt to reprocess batch, but not to skip it. Is it a bug 
> or we are missing any important configuration params?
> Screenshots from spark UI:
> http://pasteboard.co/1oRW0GDUX.png
> http://pasteboard.co/1oSjdFpbc.png



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-20251) Spark streaming skips batches in a case of failure

2017-04-09 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15962313#comment-15962313
 ] 

Nan Zhu edited comment on SPARK-20251 at 4/9/17 11:57 PM:
--

Why is this an invalid report? I have been observing the same behavior recently 
since I upgraded to Spark 2.1.

The basic idea (on my side) is that an exception thrown from the DStream.compute() 
method should close the app instead of it proceeding (as the error handling in 
Spark Streaming is to release the await lock set in awaitTermination()).

I am still looking at the threads within Spark Streaming to see what was 
happening.

Can we change it back to a valid case and give me more time to investigate?


was (Author: codingcat):
why this is an invalid report? I have been observing the same behavior recently 
when I upgrade to Spark 2.1 

The basic idea (in my side), an exception thrown from DStream.compute() method 
should close the app instead of proceeding (as the error handling in Spark 
Streaming is to release the await lock set in awaitTermination)

I am still looking at those threads within Spark Streaming to see what was 
happening, 

can we change it back to a valid case and give me more time to investigate?

> Spark streaming skips batches in a case of failure
> --
>
> Key: SPARK-20251
> URL: https://issues.apache.org/jira/browse/SPARK-20251
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0
>Reporter: Roman Studenikin
>
> We are experiencing strange behaviour of spark streaming application. 
> Sometimes it just skips batch in a case of job failure and starts working on 
> the next one.
> We expect it to attempt to reprocess batch, but not to skip it. Is it a bug 
> or we are missing any important configuration params?
> Screenshots from spark UI:
> http://pasteboard.co/1oRW0GDUX.png
> http://pasteboard.co/1oSjdFpbc.png



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20251) Spark streaming skips batches in a case of failure

2017-04-09 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15962313#comment-15962313
 ] 

Nan Zhu commented on SPARK-20251:
-

Why is this an invalid report? I have been observing the same behavior recently 
since I upgraded to Spark 2.1.

The basic idea (on my side) is that an exception thrown from the DStream.compute() 
method should close the app instead of it proceeding (as the error handling in 
Spark Streaming is to release the await lock set in awaitTermination()).

I am still looking at the threads within Spark Streaming to see what was 
happening.

Can we change it back to a valid case and give me more time to investigate?
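
For reference, a sketch of the driver-side pattern this expectation is about, assuming 
ssc is the application's StreamingContext (nothing here is new API): awaitTermination() 
rethrows the failure reported by the streaming scheduler, so the exception from 
compute() should surface here and end the application.

{code:scala}
try {
  ssc.start()
  ssc.awaitTermination() // should rethrow the failure from the streaming side
} finally {
  ssc.stop(stopSparkContext = true, stopGracefully = false)
}
{code}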

> Spark streaming skips batches in a case of failure
> --
>
> Key: SPARK-20251
> URL: https://issues.apache.org/jira/browse/SPARK-20251
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0
>Reporter: Roman Studenikin
>
> We are experiencing strange behaviour of spark streaming application. 
> Sometimes it just skips batch in a case of job failure and starts working on 
> the next one.
> We expect it to attempt to reprocess batch, but not to skip it. Is it a bug 
> or we are missing any important configuration params?
> Screenshots from spark UI:
> http://pasteboard.co/1oRW0GDUX.png
> http://pasteboard.co/1oSjdFpbc.png



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19789) Add the shortcut of .format("parquet").option("path", "/hdfs/path").partitionBy("col1", "col2").start()

2017-03-12 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15906782#comment-15906782
 ] 

Nan Zhu commented on SPARK-19789:
-

[~zsxwing] mind reviewing the PR?

> Add the shortcut of .format("parquet").option("path", 
> "/hdfs/path").partitionBy("col1", "col2").start()
> ---
>
> Key: SPARK-19789
> URL: https://issues.apache.org/jira/browse/SPARK-19789
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.1.0
>Reporter: Nan Zhu
>
> as said in the title, everytime I use parquet, I need to type a long string 
> Being consistent with DataFrameReader/Writer, DataStreamReader, add parquet 
> shortcut for this case



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-19789) Add the shortcut of .format("parquet").option("path", "/hdfs/path").partitionBy("col1", "col2").start()

2017-03-01 Thread Nan Zhu (JIRA)
Nan Zhu created SPARK-19789:
---

 Summary: Add the shortcut of .format("parquet").option("path", 
"/hdfs/path").partitionBy("col1", "col2").start()
 Key: SPARK-19789
 URL: https://issues.apache.org/jira/browse/SPARK-19789
 Project: Spark
  Issue Type: Improvement
  Components: Structured Streaming
Affects Versions: 2.1.0
Reporter: Nan Zhu


As said in the title, every time I use parquet I need to type a long string.

To be consistent with DataFrameReader/Writer and DataStreamReader, add a parquet 
shortcut for this case.
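
A sketch of the difference, where the parquet(...) method on DataStreamWriter is the 
shortcut being proposed here (not an existing API at the time of this ticket); path 
and column names are illustrative:

{code:scala}
// Today:
df.writeStream
  .format("parquet")
  .option("path", "/hdfs/path")
  .partitionBy("col1", "col2")
  .start()

// Proposed shortcut, mirroring DataFrameWriter.parquet(...):
df.writeStream
  .partitionBy("col1", "col2")
  .parquet("/hdfs/path")
{code}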




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-19788) DataStreamReader/DataStreamWriter.option shall accept user-defined type

2017-03-01 Thread Nan Zhu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nan Zhu updated SPARK-19788:

Description: 
There are many other data sources/sinks which have very different configuration 
styles from Kafka, FileSystem, etc.

The expected type of a configuration entry passed to them might be a nested 
collection type, e.g. Map[String, Map[String, String]], or even a user-defined 
type (for example, the one I am working on).

Right now, option can only accept String -> String/Boolean/Long/Double OR a 
complete Map[String, String]...my suggestion is that we accept Map[String, 
Any], and that the type of 'parameters' in SourceProvider.createSource also 
become Map[String, Any]; this will give much more flexibility to the user.
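
An illustrative sketch of the kind of signatures this proposal implies; these are not 
Spark's actual APIs, just what the change could look like:

{code:scala}
// option(...) would accept Any, so nested or user-defined values pass through.
class DataStreamReaderSketch {
  private var extraOptions = Map.empty[String, Any]

  def option(key: String, value: Any): this.type = {
    extraOptions += (key -> value)
    this
  }
}

// ...and a source provider would receive Map[String, Any] instead of
// Map[String, String] (return type simplified for the sketch).
trait SourceProviderSketch {
  def createSource(parameters: Map[String, Any]): AnyRef
}

// e.g. a nested, user-defined configuration value:
// reader.option("endpoints", Map("us" -> Map("host" -> "a", "port" -> 443)))
{code}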

The drawback is that it is a breaking change (we can mitigate this by 
deprecating the current one and progressively evolving to the new one if the 
proposal is accepted).

[~zsxwing] what do you think?


  was:
There are many other data sources/sinks which has very different configuration 
ways than Kafka, FileSystem, etc. 

The expected type of the configuration entry passed to them might be a nested 
collection type, e.g. Map[String, Map[String, String]], or even a user-defined 
type(for example, the one I am working on)

Right now, option can only accept String -> String/Boolean/Long/Double OR a 
complete Map[String, String]...my suggestion is that we can accept Map[String, 
Any], and the type of 'parameters' in SourceProvider.createSource can also be 
Map[String, Any], this will create much more flexibility to the user

The drawback is that, it is a breaking change ( we can mitigate this by 
deprecate the current one, and progressively evolve to the new one if the 
proposal is accepted)

[~zsxwing] what do you think?



> DataStreamReader/DataStreamWriter.option shall accept user-defined type
> ---
>
> Key: SPARK-19788
> URL: https://issues.apache.org/jira/browse/SPARK-19788
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.1.0
>Reporter: Nan Zhu
>
> There are many other data sources/sinks which has very different 
> configuration ways than Kafka, FileSystem, etc. 
> The expected type of the configuration entry passed to them might be a nested 
> collection type, e.g. Map[String, Map[String, String]], or even a 
> user-defined type(for example, the one I am working on)
> Right now, option can only accept String -> String/Boolean/Long/Double OR a 
> complete Map[String, String]...my suggestion is that we can accept 
> Map[String, Any], and the type of 'parameters' in SourceProvider.createSource 
> can also be Map[String, Any], this will create much more flexibility to the 
> user
> The drawback is that, it is a breaking change ( we can mitigate this by 
> deprecating the current one, and progressively evolve to the new one if the 
> proposal is accepted)
> [~zsxwing] what do you think?



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-19788) DataStreamReader/DataStreamWriter.option shall accept user-defined type

2017-03-01 Thread Nan Zhu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nan Zhu updated SPARK-19788:

Description: 
There are many other data sources/sinks which has very different configuration 
ways than Kafka, FileSystem, etc. 

The expected type of the configuration entry passed to them might be a nested 
collection type, e.g. Map[String, Map[String, String]], or even a user-defined 
type(for example, the one I am working on)

Right now, option can only accept String -> String/Boolean/Long/Double OR a 
complete Map[String, String]...my suggestion is that we can accept Map[String, 
Any], and the type of 'parameters' in SourceProvider.createSource can also be 
Map[String, Any], this will create much more flexibility to the user

The drawback is that, it is a breaking change ( we can mitigate this by 
deprecate the current one, and progressively evolve to the new one if the 
proposal is accepted)

[~zsxwing] what do you think?


  was:
There are many other data sources which has very different configuration ways 
than Kafka, FileSystem, etc. 

The expected type of the configuration entry passed to them might be a nested 
collection type, e.g. Map[String, Map[String, String]], or even a user-defined 
type(for example, the one I am working on)

Right now, option can only accept String -> String/Boolean/Long/Double OR a 
complete Map[String, String]...my suggestion is that we can accept Map[String, 
Any], and the type of 'parameters' in SourceProvider.createSource can also be 
Map[String, Any], this will create much more flexibility to the user

The drawback is that, it is a breaking change ( we can mitigate this by 
deprecate the current one, and progressively evolve to the new one if the 
proposal is accepted)

[~zsxwing] what do you think?



> DataStreamReader/DataStreamWriter.option shall accept user-defined type
> ---
>
> Key: SPARK-19788
> URL: https://issues.apache.org/jira/browse/SPARK-19788
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.1.0
>Reporter: Nan Zhu
>
> There are many other data sources/sinks which has very different 
> configuration ways than Kafka, FileSystem, etc. 
> The expected type of the configuration entry passed to them might be a nested 
> collection type, e.g. Map[String, Map[String, String]], or even a 
> user-defined type(for example, the one I am working on)
> Right now, option can only accept String -> String/Boolean/Long/Double OR a 
> complete Map[String, String]...my suggestion is that we can accept 
> Map[String, Any], and the type of 'parameters' in SourceProvider.createSource 
> can also be Map[String, Any], this will create much more flexibility to the 
> user
> The drawback is that, it is a breaking change ( we can mitigate this by 
> deprecate the current one, and progressively evolve to the new one if the 
> proposal is accepted)
> [~zsxwing] what do you think?



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-19788) DataStreamReader/DataStreamWriter.option shall accept user-defined type

2017-03-01 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15890522#comment-15890522
 ] 

Nan Zhu edited comment on SPARK-19788 at 3/1/17 4:45 PM:
-

another drawback is that it might look incompatible with 
DataFrameReader/DataFrameWriter (we could also change that?)


was (Author: codingcat):
another drawback is that it might look like incompatible with DataFrameReader 
(we can also change that?)

> DataStreamReader/DataStreamWriter.option shall accept user-defined type
> ---
>
> Key: SPARK-19788
> URL: https://issues.apache.org/jira/browse/SPARK-19788
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.1.0
>Reporter: Nan Zhu
>
> There are many other data sources/sinks which has very different 
> configuration ways than Kafka, FileSystem, etc. 
> The expected type of the configuration entry passed to them might be a nested 
> collection type, e.g. Map[String, Map[String, String]], or even a 
> user-defined type(for example, the one I am working on)
> Right now, option can only accept String -> String/Boolean/Long/Double OR a 
> complete Map[String, String]...my suggestion is that we can accept 
> Map[String, Any], and the type of 'parameters' in SourceProvider.createSource 
> can also be Map[String, Any], this will create much more flexibility to the 
> user
> The drawback is that, it is a breaking change ( we can mitigate this by 
> deprecate the current one, and progressively evolve to the new one if the 
> proposal is accepted)
> [~zsxwing] what do you think?



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-19788) DataStreamReader/DataStreamWriter.option shall accept user-defined type

2017-03-01 Thread Nan Zhu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nan Zhu updated SPARK-19788:

Summary: DataStreamReader/DataStreamWriter.option shall accept user-defined 
type  (was: DataStreamReader.option shall accept user-defined type)

> DataStreamReader/DataStreamWriter.option shall accept user-defined type
> ---
>
> Key: SPARK-19788
> URL: https://issues.apache.org/jira/browse/SPARK-19788
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.1.0
>Reporter: Nan Zhu
>
> There are many other data sources which has very different configuration ways 
> than Kafka, FileSystem, etc. 
> The expected type of the configuration entry passed to them might be a nested 
> collection type, e.g. Map[String, Map[String, String]], or even a 
> user-defined type(for example, the one I am working on)
> Right now, option can only accept String -> String/Boolean/Long/Double OR a 
> complete Map[String, String]...my suggestion is that we can accept 
> Map[String, Any], and the type of 'parameters' in SourceProvider.createSource 
> can also be Map[String, Any], this will create much more flexibility to the 
> user
> The drawback is that, it is a breaking change ( we can mitigate this by 
> deprecate the current one, and progressively evolve to the new one if the 
> proposal is accepted)
> [~zsxwing] what do you think?



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19788) DataStreamReader.option shall accept user-defined type

2017-03-01 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15890522#comment-15890522
 ] 

Nan Zhu commented on SPARK-19788:
-

another drawback is that it might look incompatible with DataFrameReader 
(we could also change that?)

> DataStreamReader.option shall accept user-defined type
> --
>
> Key: SPARK-19788
> URL: https://issues.apache.org/jira/browse/SPARK-19788
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.1.0
>Reporter: Nan Zhu
>
> There are many other data sources which has very different configuration ways 
> than Kafka, FileSystem, etc. 
> The expected type of the configuration entry passed to them might be a nested 
> collection type, e.g. Map[String, Map[String, String]], or even a 
> user-defined type(for example, the one I am working on)
> Right now, option can only accept String -> String/Boolean/Long/Double OR a 
> complete Map[String, String]...my suggestion is that we can accept 
> Map[String, Any], and the type of 'parameters' in SourceProvider.createSource 
> can also be Map[String, Any], this will create much more flexibility to the 
> user
> The drawback is that, it is a breaking change ( we can mitigate this by 
> deprecate the current one, and progressively evolve to the new one if the 
> proposal is accepted)
> [~zsxwing] what do you think?



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-19788) DataStreamReader.option shall accept user-defined type

2017-03-01 Thread Nan Zhu (JIRA)
Nan Zhu created SPARK-19788:
---

 Summary: DataStreamReader.option shall accept user-defined type
 Key: SPARK-19788
 URL: https://issues.apache.org/jira/browse/SPARK-19788
 Project: Spark
  Issue Type: Improvement
  Components: Structured Streaming
Affects Versions: 2.1.0
Reporter: Nan Zhu


There are many other data sources which has very different configuration ways 
than Kafka, FileSystem, etc. 

The expected type of the configuration entry passed to them might be a nested 
collection type, e.g. Map[String, Map[String, String]], or even a user-defined 
type(for example, the one I am working on)

Right now, option can only accept String -> String/Boolean/Long/Double OR a 
complete Map[String, String]...my suggestion is that we can accept Map[String, 
Any], and the type of 'parameters' in SourceProvider.createSource can also be 
Map[String, Any], this will create much more flexibility to the user

The drawback is that, it is a breaking change ( we can mitigate this by 
deprecate the current one, and progressively evolve to the new one if the 
proposal is accepted)

[~zsxwing] what do you think?




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19280) Failed Recovery from checkpoint caused by the multi-threads issue in Spark Streaming scheduler

2017-02-27 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15886999#comment-15886999
 ] 

Nan Zhu commented on SPARK-19280:
-

[~zsxwing] please let me know if we agree that option 2 is something we like.

I plan to work on this.

> Failed Recovery from checkpoint caused by the multi-threads issue in Spark 
> Streaming scheduler
> --
>
> Key: SPARK-19280
> URL: https://issues.apache.org/jira/browse/SPARK-19280
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 1.6.3, 2.0.0, 2.0.1, 2.0.2, 2.1.0
>Reporter: Nan Zhu
>Priority: Critical
>
> In one of our applications, we found the following issue, the application 
> recovering from a checkpoint file named "checkpoint-***16670" but with 
> the timestamp ***16650 will recover from the very beginning of the stream 
> and because our application relies on the external & periodically-cleaned 
> data (syncing with checkpoint cleanup), the recovery just failed
> We identified a potential issue in Spark Streaming checkpoint and will 
> describe it with the following example. We will propose a fix in the end of 
> this JIRA.
> 1. The application properties: Batch Duration: 2, Functionality: Single 
> Stream calling ReduceByKeyAndWindow and print, Window Size: 6, 
> SlideDuration, 2
> 2. RDD at 16650 is generated and the corresponding job is submitted to 
> the execution ThreadPool. Meanwhile, a DoCheckpoint message is sent to the 
> queue of JobGenerator
> 3. Job at 16650 is finished and JobCompleted message is sent to 
> JobScheduler's queue, and meanwhile, Job at 16652 is submitted to the 
> execution ThreadPool and similarly, a DoCheckpoint is sent to the queue of 
> JobGenerator
> 4. JobScheduler's message processing thread (I will use JS-EventLoop to 
> identify it) is not scheduled by the operating system for a long time, and 
> during this period, Jobs generated from 16652 - 16670 are generated 
> and completed.
> 5. the message processing thread of JobGenerator (JG-EventLoop) is scheduled 
> and processed all DoCheckpoint messages for jobs ranging from 16652 - 
> 16670 and checkpoint files are successfully written. CRITICAL: at this 
> moment, the lastCheckpointTime would be 16670.
> 6. JS-EventLoop is scheduled, and process all JobCompleted messages for jobs 
> ranging from 16652 - 16670. CRITICAL: a ClearMetadata message is 
> pushed to JobGenerator's message queue for EACH JobCompleted.
> 7. The current message queue contains 20 ClearMetadata messages and 
> JG-EventLoop is scheduled to process them. CRITICAL: ClearMetadata will 
> remove all RDDs out of rememberDuration window. In our case, 
> ReduceyKeyAndWindow will set rememberDuration to 10 (rememberDuration of 
> ReducedWindowDStream (4) + windowSize) resulting that only RDDs <- 
> (16660, 16670] are kept. And ClearMetaData processing logic will push 
> a DoCheckpoint to JobGenerator's thread
> 8. JG-EventLoop is scheduled again to process DoCheckpoint for 1665, VERY 
> CRITICAL: at this step, RDD no later than 16660 has been removed, and 
> checkpoint data is updated as  
> https://github.com/apache/spark/blob/a81e336f1eddc2c6245d807aae2c81ddc60eabf9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L53
>  and 
> https://github.com/apache/spark/blob/a81e336f1eddc2c6245d807aae2c81ddc60eabf9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L59.
> 9. After 8, a checkpoint named /path/checkpoint-16670 is created but with 
> the timestamp 16650. and at this moment, Application crashed
> 10. Application recovers from /path/checkpoint-16670 and try to get RDD 
> with validTime 16650. Of course it will not find it and has to recompute. 
> In the case of ReduceByKeyAndWindow, it needs to recursively slice RDDs until 
> to the start of the stream. When the stream depends on the external data, it 
> will not successfully recover. In the case of Kafka, the recovered RDDs would 
> not be the same as the original one, as the currentOffsets has been updated 
> to the value at the moment of 16670
> The proposed fix:
> 0. a hot-fix would be setting timestamp Checkpoint File to lastCheckpointTime 
> instead of using the timestamp of Checkpoint instance (any side-effect?)
> 1. ClearMetadata shall be ClearMedataAndCheckpoint 
> 2. a long-term fix would be merge JobScheduler and JobGenerator, I didn't see 
> any necessary to have two threads here



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

[jira] [Updated] (SPARK-19499) Add more notes in the comments of Sink.addBatch()

2017-02-07 Thread Nan Zhu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nan Zhu updated SPARK-19499:

Description: 
addBatch method in Sink trait is supposed to be a synchronous method to 
coordinate with the fault-tolerance design in StreamingExecution (being 
different with the compute() method in DStream)

We need to add more notes in the comments of this method to remind the 
developers

  was:
addBatch method in Sink trait is supposed to be a synchronous method to 
coordinate with the fault-tolerance design in StreamingExecution (being 
different with the compute() method in DStream)

We need to add more notes in the method of this method to remind the developers


> Add more notes in the comments of Sink.addBatch() 
> --
>
> Key: SPARK-19499
> URL: https://issues.apache.org/jira/browse/SPARK-19499
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.0.0, 2.0.1, 2.0.2, 2.1.0
>Reporter: Nan Zhu
>Priority: Minor
>
> addBatch method in Sink trait is supposed to be a synchronous method to 
> coordinate with the fault-tolerance design in StreamingExecution (being 
> different with the compute() method in DStream)
> We need to add more notes in the comments of this method to remind the 
> developers



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-19499) Add more notes in the comments of Sink.addBatch()

2017-02-07 Thread Nan Zhu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nan Zhu updated SPARK-19499:

Summary: Add more notes in the comments of Sink.addBatch()   (was: Add more 
description in the comments of Sink.addBatch() )

> Add more notes in the comments of Sink.addBatch() 
> --
>
> Key: SPARK-19499
> URL: https://issues.apache.org/jira/browse/SPARK-19499
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.0.0, 2.0.1, 2.0.2, 2.1.0
>Reporter: Nan Zhu
>Priority: Minor
>
> addBatch method in Sink trait is supposed to be a synchronous method to 
> coordinate with the fault-tolerance design in StreamingExecution (being 
> different with the compute() method in DStream)
> We need to add more notes in the method of this method to remind the 
> developers



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-19499) Add more description in the comments of Sink.addBatch()

2017-02-07 Thread Nan Zhu (JIRA)
Nan Zhu created SPARK-19499:
---

 Summary: Add more description in the comments of Sink.addBatch() 
 Key: SPARK-19499
 URL: https://issues.apache.org/jira/browse/SPARK-19499
 Project: Spark
  Issue Type: Improvement
  Components: Structured Streaming
Affects Versions: 2.1.0, 2.0.2, 2.0.1, 2.0.0
Reporter: Nan Zhu
Priority: Minor


The addBatch method in the Sink trait is supposed to be a synchronous method, to 
coordinate with the fault-tolerance design in StreamingExecution (being 
different from the compute() method in DStream).

We need to add more notes in the comments of this method to remind the 
developers. (An illustrative sketch of such a note is below.)
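
As an illustration of the kind of note being proposed. This is only a sketch 
trait that loosely mirrors the addBatch signature; it is not the actual Spark 
source, and the wording is just an example.

{code:scala}
// Illustration only: a trait that loosely mirrors Sink.addBatch, showing the
// kind of note proposed for the real comments; not the actual Spark source.
import org.apache.spark.sql.DataFrame

trait SketchSink {
  /**
   * Adds a batch of data to this sink.
   *
   * Proposed note: this method is expected to be synchronous, i.e. it should
   * only return after the batch has been durably committed, because the
   * streaming query execution relies on that for its fault-tolerance
   * guarantees. This is different from DStream.compute(), which merely
   * defines a lazy RDD.
   */
  def addBatch(batchId: Long, data: DataFrame): Unit
}
{code}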



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19233) Inconsistent Behaviour of Spark Streaming Checkpoint

2017-02-03 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15851787#comment-15851787
 ] 

Nan Zhu commented on SPARK-19233:
-

ping

> Inconsistent Behaviour of Spark Streaming Checkpoint
> 
>
> Key: SPARK-19233
> URL: https://issues.apache.org/jira/browse/SPARK-19233
> Project: Spark
>  Issue Type: Improvement
>  Components: DStreams
>Affects Versions: 2.0.0, 2.0.1, 2.0.2, 2.1.0
>Reporter: Nan Zhu
>
> When checking one of our application logs, we found the following behavior 
> (simplified)
> 1. Spark application recovers from checkpoint constructed at timestamp 1000ms
> 2. The log shows that Spark application can recover RDDs generated at 
> timestamp 2000, 3000
> The root cause is that generateJobs event is pushed to the queue by a 
> separate thread (RecurTimer), before doCheckpoint event is pushed to the 
> queue, there might have been multiple generatedJobs being processed. As a 
> result, when doCheckpoint for timestamp 1000 is processed, the generatedRDDs 
> data structure containing RDDs generated at 2000, 3000 is serialized as part 
> of checkpoint of 1000.
> It brings overhead for debugging and coordinate our offset management 
> strategy with Spark Streaming's checkpoint strategy when we are developing a 
> new type of DStream which integrates Spark Streaming with an internal message 
> middleware.
> The proposed fix is to filter generatedRDDs according to checkpoint timestamp 
> when serializing it.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19280) Failed Recovery from checkpoint caused by the multi-threads issue in Spark Streaming scheduler

2017-02-03 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15851786#comment-15851786
 ] 

Nan Zhu commented on SPARK-19280:
-

ping

> Failed Recovery from checkpoint caused by the multi-threads issue in Spark 
> Streaming scheduler
> --
>
> Key: SPARK-19280
> URL: https://issues.apache.org/jira/browse/SPARK-19280
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 1.6.3, 2.0.0, 2.0.1, 2.0.2, 2.1.0
>Reporter: Nan Zhu
>Priority: Critical
>
> In one of our applications, we found the following issue, the application 
> recovering from a checkpoint file named "checkpoint-***16670" but with 
> the timestamp ***16650 will recover from the very beginning of the stream 
> and because our application relies on the external & periodically-cleaned 
> data (syncing with checkpoint cleanup), the recovery just failed
> We identified a potential issue in Spark Streaming checkpoint and will 
> describe it with the following example. We will propose a fix in the end of 
> this JIRA.
> 1. The application properties: Batch Duration: 2, Functionality: Single 
> Stream calling ReduceByKeyAndWindow and print, Window Size: 6, 
> SlideDuration, 2
> 2. RDD at 16650 is generated and the corresponding job is submitted to 
> the execution ThreadPool. Meanwhile, a DoCheckpoint message is sent to the 
> queue of JobGenerator
> 3. Job at 16650 is finished and JobCompleted message is sent to 
> JobScheduler's queue, and meanwhile, Job at 16652 is submitted to the 
> execution ThreadPool and similarly, a DoCheckpoint is sent to the queue of 
> JobGenerator
> 4. JobScheduler's message processing thread (I will use JS-EventLoop to 
> identify it) is not scheduled by the operating system for a long time, and 
> during this period, Jobs generated from 16652 - 16670 are generated 
> and completed.
> 5. the message processing thread of JobGenerator (JG-EventLoop) is scheduled 
> and processed all DoCheckpoint messages for jobs ranging from 16652 - 
> 16670 and checkpoint files are successfully written. CRITICAL: at this 
> moment, the lastCheckpointTime would be 16670.
> 6. JS-EventLoop is scheduled, and process all JobCompleted messages for jobs 
> ranging from 16652 - 16670. CRITICAL: a ClearMetadata message is 
> pushed to JobGenerator's message queue for EACH JobCompleted.
> 7. The current message queue contains 20 ClearMetadata messages and 
> JG-EventLoop is scheduled to process them. CRITICAL: ClearMetadata will 
> remove all RDDs out of rememberDuration window. In our case, 
> ReduceyKeyAndWindow will set rememberDuration to 10 (rememberDuration of 
> ReducedWindowDStream (4) + windowSize) resulting that only RDDs <- 
> (16660, 16670] are kept. And ClearMetaData processing logic will push 
> a DoCheckpoint to JobGenerator's thread
> 8. JG-EventLoop is scheduled again to process DoCheckpoint for 1665, VERY 
> CRITICAL: at this step, RDD no later than 16660 has been removed, and 
> checkpoint data is updated as  
> https://github.com/apache/spark/blob/a81e336f1eddc2c6245d807aae2c81ddc60eabf9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L53
>  and 
> https://github.com/apache/spark/blob/a81e336f1eddc2c6245d807aae2c81ddc60eabf9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L59.
> 9. After 8, a checkpoint named /path/checkpoint-16670 is created but with 
> the timestamp 16650. and at this moment, Application crashed
> 10. Application recovers from /path/checkpoint-16670 and try to get RDD 
> with validTime 16650. Of course it will not find it and has to recompute. 
> In the case of ReduceByKeyAndWindow, it needs to recursively slice RDDs until 
> to the start of the stream. When the stream depends on the external data, it 
> will not successfully recover. In the case of Kafka, the recovered RDDs would 
> not be the same as the original one, as the currentOffsets has been updated 
> to the value at the moment of 16670
> The proposed fix:
> 0. a hot-fix would be setting timestamp Checkpoint File to lastCheckpointTime 
> instead of using the timestamp of Checkpoint instance (any side-effect?)
> 1. ClearMetadata shall be ClearMedataAndCheckpoint 
> 2. a long-term fix would be merge JobScheduler and JobGenerator, I didn't see 
> any necessary to have two threads here



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] (SPARK-19233) Inconsistent Behaviour of Spark Streaming Checkpoint

2017-01-29 Thread Nan Zhu (JIRA)
Nan Zhu commented on SPARK-19233:
-

update?



--
This message was sent by Atlassian JIRA (v6.3.15#6346-sha1:dbc023d)



[jira] (SPARK-19280) Failed Recovery from checkpoint caused by the multi-threads issue in Spark Streaming scheduler

2017-01-29 Thread Nan Zhu (JIRA)
Nan Zhu commented on SPARK-19280:
-

update?



--
This message was sent by Atlassian JIRA (v6.3.15#6346-sha1:dbc023d)



[jira] [Created] (SPARK-19358) LiveListenerBus shall log the event name when dropping them due to a fully filled queue

2017-01-24 Thread Nan Zhu (JIRA)
Nan Zhu created SPARK-19358:
---

 Summary: LiveListenerBus shall log the event name when dropping 
them due to a fully filled queue
 Key: SPARK-19358
 URL: https://issues.apache.org/jira/browse/SPARK-19358
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.1.0, 2.0.2, 2.0.1, 2.0.0, 1.6.3
Reporter: Nan Zhu
Priority: Trivial


Some dropped events will make the whole application behave unexpectedly, e.g. 
some UI problems... We shall log the name of each dropped event to facilitate 
debugging. (A minimal sketch of the idea is below.)
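
A minimal sketch of the proposed change, assuming a listener-bus-like class 
with a bounded queue; BoundedEventBus is a made-up name for illustration, not 
the actual LiveListenerBus code.

{code:scala}
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicLong

// Illustrative sketch only; not the actual LiveListenerBus implementation.
final class BoundedEventBus[E](capacity: Int) {
  private val queue = new LinkedBlockingQueue[E](capacity)
  private val droppedCount = new AtomicLong(0)

  def post(event: E): Unit = {
    if (!queue.offer(event)) {
      droppedCount.incrementAndGet()
      // Proposed improvement: include the event's class name so that a missing
      // UI update (or similar symptom) can be traced back to the specific
      // dropped event type.
      System.err.println(
        s"Dropping event ${event.getClass.getSimpleName} because the queue is full " +
          s"(${droppedCount.get()} events dropped so far)")
    }
  }
}
{code}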



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-19280) Failed Recovery from checkpoint caused by the multi-threads issue in Spark Streaming scheduler

2017-01-20 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15831209#comment-15831209
 ] 

Nan Zhu edited comment on SPARK-19280 at 1/20/17 1:24 PM:
--

[~zsxwing] Thanks for the reply

0) 

I do not think the content of the checkpoint file (except that timestamp) has any 
problem. A checkpoint request for an earlier time is just for reducing the 
future checkpoint size (because a checkpoint request for an earlier timestamp 
must follow a ClearMetadata for the same timestamp). 

Based on this, we should not recover from that "just-for-reducing-size" 
moment; we shall recover from the latestCheckpointTime. I don't see how 
recovering from this moment would lose data (I may be missing something...)

1) can you elaborate? my original idea is just to avoid a checkpoint with an 
earlier timestamp in the case of ClearMetadata (if I missed any data loss case in 
0, then forget about this one...)

2) is a long-term fix and brings benefits, including for SPARK-19233; it is the 
second issue caused by multiple threads here. I am volunteering to work on it and 
will be happy if you or TD can be the Shepherd  


was (Author: codingcat):
[~zsxwing] Thanks for reply

0) 

I do not think the content in checkpoint file (except that timestamp) has some 
problem. A checkpoint request for an earlier time is just for reducing the 
future checkpoint size (because a checkpoint request for an earlier timestamp 
but follow a ClearMetadata for the same timestamp). 

based on this, we should not recover from that "just-for-reduceing-size" 
moment, we shall recover from the latestCheckpointTime. I didn't see how 
recovering from this moment will lose data (I may miss something..)

1) can you elaborate it? my original idea is just to avoid a checkpoint with an 
earlier timestamp (if I miss any data loss case in 0, then forget about this 
one...)

2) long-term fix and brings benefits, including SPARK-19233, it is the second 
one caused by multi threads here. I am volunteering to work on it and will be 
happy if you or TD can be the Shepherd  

> Failed Recovery from checkpoint caused by the multi-threads issue in Spark 
> Streaming scheduler
> --
>
> Key: SPARK-19280
> URL: https://issues.apache.org/jira/browse/SPARK-19280
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 1.6.3, 2.0.0, 2.0.1, 2.0.2, 2.1.0
>Reporter: Nan Zhu
>Priority: Critical
>
> In one of our applications, we found the following issue, the application 
> recovering from a checkpoint file named "checkpoint-***16670" but with 
> the timestamp ***16650 will recover from the very beginning of the stream 
> and because our application relies on the external & periodically-cleaned 
> data (syncing with checkpoint cleanup), the recovery just failed
> We identified a potential issue in Spark Streaming checkpoint and will 
> describe it with the following example. We will propose a fix in the end of 
> this JIRA.
> 1. The application properties: Batch Duration: 2, Functionality: Single 
> Stream calling ReduceByKeyAndWindow and print, Window Size: 6, 
> SlideDuration, 2
> 2. RDD at 16650 is generated and the corresponding job is submitted to 
> the execution ThreadPool. Meanwhile, a DoCheckpoint message is sent to the 
> queue of JobGenerator
> 3. Job at 16650 is finished and JobCompleted message is sent to 
> JobScheduler's queue, and meanwhile, Job at 16652 is submitted to the 
> execution ThreadPool and similarly, a DoCheckpoint is sent to the queue of 
> JobGenerator
> 4. JobScheduler's message processing thread (I will use JS-EventLoop to 
> identify it) is not scheduled by the operating system for a long time, and 
> during this period, Jobs generated from 16652 - 16670 are generated 
> and completed.
> 5. the message processing thread of JobGenerator (JG-EventLoop) is scheduled 
> and processed all DoCheckpoint messages for jobs ranging from 16652 - 
> 16670 and checkpoint files are successfully written. CRITICAL: at this 
> moment, the lastCheckpointTime would be 16670.
> 6. JS-EventLoop is scheduled, and process all JobCompleted messages for jobs 
> ranging from 16652 - 16670. CRITICAL: a ClearMetadata message is 
> pushed to JobGenerator's message queue for EACH JobCompleted.
> 7. The current message queue contains 20 ClearMetadata messages and 
> JG-EventLoop is scheduled to process them. CRITICAL: ClearMetadata will 
> remove all RDDs out of rememberDuration window. In our case, 
> ReduceyKeyAndWindow will set rememberDuration to 10 (rememberDuration of 
> ReducedWindowDStream (4) + windowSize) resulting that only RDDs <- 
> (16660, 16670] are kept. And ClearMetaData processing logic will push 
> a DoCheckpoint to 

[jira] [Commented] (SPARK-19280) Failed Recovery from checkpoint caused by the multi-threads issue in Spark Streaming scheduler

2017-01-19 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15831217#comment-15831217
 ] 

Nan Zhu commented on SPARK-19280:
-

BTW, do I need to file the KafkaDStream issue (which is used as an example 
here) as a separate JIRA?

> Failed Recovery from checkpoint caused by the multi-threads issue in Spark 
> Streaming scheduler
> --
>
> Key: SPARK-19280
> URL: https://issues.apache.org/jira/browse/SPARK-19280
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 1.6.3, 2.0.0, 2.0.1, 2.0.2, 2.1.0
>Reporter: Nan Zhu
>Priority: Critical
>
> In one of our applications, we found the following issue, the application 
> recovering from a checkpoint file named "checkpoint-***16670" but with 
> the timestamp ***16650 will recover from the very beginning of the stream 
> and because our application relies on the external & periodically-cleaned 
> data (syncing with checkpoint cleanup), the recovery just failed
> We identified a potential issue in Spark Streaming checkpoint and will 
> describe it with the following example. We will propose a fix in the end of 
> this JIRA.
> 1. The application properties: Batch Duration: 2, Functionality: Single 
> Stream calling ReduceByKeyAndWindow and print, Window Size: 6, 
> SlideDuration, 2
> 2. RDD at 16650 is generated and the corresponding job is submitted to 
> the execution ThreadPool. Meanwhile, a DoCheckpoint message is sent to the 
> queue of JobGenerator
> 3. Job at 16650 is finished and JobCompleted message is sent to 
> JobScheduler's queue, and meanwhile, Job at 16652 is submitted to the 
> execution ThreadPool and similarly, a DoCheckpoint is sent to the queue of 
> JobGenerator
> 4. JobScheduler's message processing thread (I will use JS-EventLoop to 
> identify it) is not scheduled by the operating system for a long time, and 
> during this period, Jobs generated from 16652 - 16670 are generated 
> and completed.
> 5. the message processing thread of JobGenerator (JG-EventLoop) is scheduled 
> and processed all DoCheckpoint messages for jobs ranging from 16652 - 
> 16670 and checkpoint files are successfully written. CRITICAL: at this 
> moment, the lastCheckpointTime would be 16670.
> 6. JS-EventLoop is scheduled, and process all JobCompleted messages for jobs 
> ranging from 16652 - 16670. CRITICAL: a ClearMetadata message is 
> pushed to JobGenerator's message queue for EACH JobCompleted.
> 7. The current message queue contains 20 ClearMetadata messages and 
> JG-EventLoop is scheduled to process them. CRITICAL: ClearMetadata will 
> remove all RDDs out of rememberDuration window. In our case, 
> ReduceyKeyAndWindow will set rememberDuration to 10 (rememberDuration of 
> ReducedWindowDStream (4) + windowSize) resulting that only RDDs <- 
> (16660, 16670] are kept. And ClearMetaData processing logic will push 
> a DoCheckpoint to JobGenerator's thread
> 8. JG-EventLoop is scheduled again to process DoCheckpoint for 1665, VERY 
> CRITICAL: at this step, RDD no later than 16660 has been removed, and 
> checkpoint data is updated as  
> https://github.com/apache/spark/blob/a81e336f1eddc2c6245d807aae2c81ddc60eabf9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L53
>  and 
> https://github.com/apache/spark/blob/a81e336f1eddc2c6245d807aae2c81ddc60eabf9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L59.
> 9. After 8, a checkpoint named /path/checkpoint-16670 is created but with 
> the timestamp 16650. and at this moment, Application crashed
> 10. Application recovers from /path/checkpoint-16670 and try to get RDD 
> with validTime 16650. Of course it will not find it and has to recompute. 
> In the case of ReduceByKeyAndWindow, it needs to recursively slice RDDs until 
> to the start of the stream. When the stream depends on the external data, it 
> will not successfully recover. In the case of Kafka, the recovered RDDs would 
> not be the same as the original one, as the currentOffsets has been updated 
> to the value at the moment of 16670
> The proposed fix:
> 0. a hot-fix would be setting timestamp Checkpoint File to lastCheckpointTime 
> instead of using the timestamp of Checkpoint instance (any side-effect?)
> 1. ClearMetadata shall be ClearMedataAndCheckpoint 
> 2. a long-term fix would be merge JobScheduler and JobGenerator, I didn't see 
> any necessary to have two threads here



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

[jira] [Commented] (SPARK-19280) Failed Recovery from checkpoint caused by the multi-threads issue in Spark Streaming scheduler

2017-01-19 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15831209#comment-15831209
 ] 

Nan Zhu commented on SPARK-19280:
-

[~zsxwing] Thanks for reply

0) 

I do not think the content in checkpoint file (except that timestamp) has some 
problem. A checkpoint request for an earlier time is just for reducing the 
future checkpoint size (because a checkpoint request for an earlier timestamp 
but follow a ClearMetadata for the same timestamp). 

based on this, we should not recover from that "just-for-reduceing-size" 
moment, we shall recover from the latestCheckpointTime. I didn't see how 
recovering from this moment will lose data (I may miss something..)

1) can you elaborate it? my original idea is just to avoid a checkpoint with an 
earlier timestamp (if I miss any data loss case in 0, then forget about this 
one...)

2) long-term fix and brings benefits, including SPARK-19233, it is the second 
one caused by multi threads here. I am volunteering to work on it and will be 
happy if you or TD can be the Shepherd  

> Failed Recovery from checkpoint caused by the multi-threads issue in Spark 
> Streaming scheduler
> --
>
> Key: SPARK-19280
> URL: https://issues.apache.org/jira/browse/SPARK-19280
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 1.6.3, 2.0.0, 2.0.1, 2.0.2, 2.1.0
>Reporter: Nan Zhu
>Priority: Critical
>
> In one of our applications, we found the following issue, the application 
> recovering from a checkpoint file named "checkpoint-***16670" but with 
> the timestamp ***16650 will recover from the very beginning of the stream 
> and because our application relies on the external & periodically-cleaned 
> data (syncing with checkpoint cleanup), the recovery just failed
> We identified a potential issue in Spark Streaming checkpoint and will 
> describe it with the following example. We will propose a fix in the end of 
> this JIRA.
> 1. The application properties: Batch Duration: 2, Functionality: Single 
> Stream calling ReduceByKeyAndWindow and print, Window Size: 6, 
> SlideDuration, 2
> 2. RDD at 16650 is generated and the corresponding job is submitted to 
> the execution ThreadPool. Meanwhile, a DoCheckpoint message is sent to the 
> queue of JobGenerator
> 3. Job at 16650 is finished and JobCompleted message is sent to 
> JobScheduler's queue, and meanwhile, Job at 16652 is submitted to the 
> execution ThreadPool and similarly, a DoCheckpoint is sent to the queue of 
> JobGenerator
> 4. JobScheduler's message processing thread (I will use JS-EventLoop to 
> identify it) is not scheduled by the operating system for a long time, and 
> during this period, Jobs generated from 16652 - 16670 are generated 
> and completed.
> 5. the message processing thread of JobGenerator (JG-EventLoop) is scheduled 
> and processed all DoCheckpoint messages for jobs ranging from 16652 - 
> 16670 and checkpoint files are successfully written. CRITICAL: at this 
> moment, the lastCheckpointTime would be 16670.
> 6. JS-EventLoop is scheduled, and process all JobCompleted messages for jobs 
> ranging from 16652 - 16670. CRITICAL: a ClearMetadata message is 
> pushed to JobGenerator's message queue for EACH JobCompleted.
> 7. The current message queue contains 20 ClearMetadata messages and 
> JG-EventLoop is scheduled to process them. CRITICAL: ClearMetadata will 
> remove all RDDs out of rememberDuration window. In our case, 
> ReduceyKeyAndWindow will set rememberDuration to 10 (rememberDuration of 
> ReducedWindowDStream (4) + windowSize) resulting that only RDDs <- 
> (16660, 16670] are kept. And ClearMetaData processing logic will push 
> a DoCheckpoint to JobGenerator's thread
> 8. JG-EventLoop is scheduled again to process DoCheckpoint for 1665, VERY 
> CRITICAL: at this step, RDD no later than 16660 has been removed, and 
> checkpoint data is updated as  
> https://github.com/apache/spark/blob/a81e336f1eddc2c6245d807aae2c81ddc60eabf9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L53
>  and 
> https://github.com/apache/spark/blob/a81e336f1eddc2c6245d807aae2c81ddc60eabf9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L59.
> 9. After 8, a checkpoint named /path/checkpoint-16670 is created but with 
> the timestamp 16650. and at this moment, Application crashed
> 10. Application recovers from /path/checkpoint-16670 and try to get RDD 
> with validTime 16650. Of course it will not find it and has to recompute. 
> In the case of ReduceByKeyAndWindow, it needs to recursively slice RDDs until 
> to the start of the stream. When the stream depends on the 

[jira] [Updated] (SPARK-19280) Failed Recovery from checkpoint caused by the multi-threads issue in Spark Streaming scheduler

2017-01-19 Thread Nan Zhu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19280?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nan Zhu updated SPARK-19280:

Description: 
In one of our applications, we found the following issue: an application 
recovering from a checkpoint file named "checkpoint-***16670" but carrying the 
timestamp ***16650 will recover from the very beginning of the stream, and 
because our application relies on external & periodically-cleaned data 
(synced with checkpoint cleanup), the recovery simply failed.

We identified a potential issue in Spark Streaming checkpoint and will describe 
it with the following example. We will propose a fix in the end of this JIRA.

1. The application properties: Batch Duration: 2, Functionality: Single 
Stream calling ReduceByKeyAndWindow and print, Window Size: 6, 
SlideDuration, 2

2. RDD at 16650 is generated and the corresponding job is submitted to the 
execution ThreadPool. Meanwhile, a DoCheckpoint message is sent to the queue of 
JobGenerator

3. Job at 16650 is finished and JobCompleted message is sent to 
JobScheduler's queue, and meanwhile, Job at 16652 is submitted to the 
execution ThreadPool and similarly, a DoCheckpoint is sent to the queue of 
JobGenerator

4. JobScheduler's message processing thread (I will use JS-EventLoop to 
identify it) is not scheduled by the operating system for a long time, and 
during this period, Jobs generated from 16652 - 16670 are generated and 
completed.

5. the message processing thread of JobGenerator (JG-EventLoop) is scheduled 
and processes all DoCheckpoint messages for jobs ranging from 16652 - 
16670, and checkpoint files are successfully written. CRITICAL: at this 
moment, the lastCheckpointTime would be 16670.

6. JS-EventLoop is scheduled, and processes all JobCompleted messages for jobs 
ranging from 16652 - 16670. CRITICAL: a ClearMetadata message is pushed 
to JobGenerator's message queue for EACH JobCompleted.

7. The current message queue contains 20 ClearMetadata messages and 
JG-EventLoop is scheduled to process them. CRITICAL: ClearMetadata will remove 
all RDDs outside the rememberDuration window. In our case, ReduceByKeyAndWindow 
will set rememberDuration to 10 (rememberDuration of ReducedWindowDStream 
(4) + windowSize), resulting in only the RDDs in (16660, 16670] being 
kept. And the ClearMetadata processing logic will push a DoCheckpoint to 
JobGenerator's thread

8. JG-EventLoop is scheduled again to process DoCheckpoint for 1665, VERY 
CRITICAL: at this step, RDD no later than 16660 has been removed, and 
checkpoint data is updated as  
https://github.com/apache/spark/blob/a81e336f1eddc2c6245d807aae2c81ddc60eabf9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L53
 and 
https://github.com/apache/spark/blob/a81e336f1eddc2c6245d807aae2c81ddc60eabf9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L59.

9. After 8, a checkpoint named /path/checkpoint-16670 is created but with 
the timestamp 16650. and at this moment, Application crashed

10. The application recovers from /path/checkpoint-16670 and tries to get the 
RDD with validTime 16650. Of course it will not find it and has to recompute. 
In the case of ReduceByKeyAndWindow, it needs to recursively slice RDDs back to 
the start of the stream. When the stream depends on external data, it 
will not successfully recover. In the case of Kafka, the recovered RDDs would 
not be the same as the original ones, as currentOffsets has been updated to 
the value at the moment of 16670


The proposed fix:

0. a hot-fix would be setting the timestamp of the Checkpoint file to 
lastCheckpointTime instead of using the timestamp of the Checkpoint instance 
(any side-effect?)
1. ClearMetadata shall become ClearMetadataAndCheckpoint 
2. a long-term fix would be to merge JobScheduler and JobGenerator; I don't see 
any necessity to have two threads here


  was:
In one of our applications, we found the following issue, the application 
recovering from a checkpoint file named "checkpoint-***16670" but with the 
timestamp ***16650 will recover from the very beginning of the stream and 
because our application relies on the external & periodically-cleaned data 
(syncing with checkpoint cleanup), the recovery just failed

We identified a potential issue in Spark Streaming checkpoint and will describe 
it with the following example. We will propose a fix in the end of this JIRA.

1. The application properties: Batch Duration: 2, Functionality: Single 
Stream calling ReduceByKeyAndWindow and print, Window Size: 6, 
SlideDuration, 2

2. RDD at 16650 is generated and the corresponding job is submitted to the 
execution ThreadPool. Meanwhile, a DoCheckpoint message is sent to the queue of 
JobGenerator

3. Job at 16650 is finished and JobCompleted message is 

[jira] [Commented] (SPARK-19233) Inconsistent Behaviour of Spark Streaming Checkpoint

2017-01-19 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15831098#comment-15831098
 ] 

Nan Zhu commented on SPARK-19233:
-

By "filtering generatedRDDs" I may have caused some confusion here; what I propose is:

1. Add a latestCheckpointTime to DStreamCheckpointData

2. when update() is called, only put RDDs with the timestamp <= 
latestCheckpointTime into currentCheckpointFiles 
(https://github.com/CodingCat/spark/blob/65623f4408ab6152719046c55093c70435da82c8/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L53)

What do you think? (A rough sketch of the idea is below.)


Regarding correctness, so far I haven't seen applications die because of 
this issue. But when I analyzed the logs with some simple script/regex to see 
whether (some lines of) DStream.compute() was called, I was always confused... 
since I assumed checkpoint-1000 would always call compute(time > 1000)  
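
A rough sketch of the proposed filtering, using simplified stand-in types 
(timestamps as Long, checkpoint files as plain paths); CheckpointDataSketch is a 
made-up class for illustration, not the actual DStreamCheckpointData code.

{code:scala}
import scala.collection.mutable

// Simplified stand-ins: timestamps as Long millis, checkpoint files as paths.
// Only a sketch of the proposed filtering, not DStreamCheckpointData itself.
final class CheckpointDataSketch {
  // timestamp -> checkpoint file path of the corresponding RDD
  private val currentCheckpointFiles = mutable.HashMap[Long, String]()
  private var latestCheckpointTime: Long = Long.MinValue

  /** Called when a DoCheckpoint for `checkpointTime` is being processed. */
  def update(checkpointTime: Long, checkpointedRDDFiles: Map[Long, String]): Unit = {
    if (checkpointTime > latestCheckpointTime) {
      latestCheckpointTime = checkpointTime
    }
    // Proposal: only keep RDDs whose timestamp is <= latestCheckpointTime, so a
    // checkpoint written for time T never carries RDDs generated after T.
    currentCheckpointFiles ++= checkpointedRDDFiles.filter {
      case (time, _) => time <= latestCheckpointTime
    }
  }

  def files: Map[Long, String] = currentCheckpointFiles.toMap
}
{code}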

> Inconsistent Behaviour of Spark Streaming Checkpoint
> 
>
> Key: SPARK-19233
> URL: https://issues.apache.org/jira/browse/SPARK-19233
> Project: Spark
>  Issue Type: Improvement
>  Components: DStreams
>Affects Versions: 2.0.0, 2.0.1, 2.0.2, 2.1.0
>Reporter: Nan Zhu
>
> When checking one of our application logs, we found the following behavior 
> (simplified)
> 1. Spark application recovers from checkpoint constructed at timestamp 1000ms
> 2. The log shows that Spark application can recover RDDs generated at 
> timestamp 2000, 3000
> The root cause is that generateJobs event is pushed to the queue by a 
> separate thread (RecurTimer), before doCheckpoint event is pushed to the 
> queue, there might have been multiple generatedJobs being processed. As a 
> result, when doCheckpoint for timestamp 1000 is processed, the generatedRDDs 
> data structure containing RDDs generated at 2000, 3000 is serialized as part 
> of checkpoint of 1000.
> It brings overhead for debugging and coordinate our offset management 
> strategy with Spark Streaming's checkpoint strategy when we are developing a 
> new type of DStream which integrates Spark Streaming with an internal message 
> middleware.
> The proposed fix is to filter generatedRDDs according to checkpoint timestamp 
> when serializing it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-19280) Failed Recovery from checkpoint caused by the multi-threads issue in Spark Streaming scheduler

2017-01-18 Thread Nan Zhu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19280?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nan Zhu updated SPARK-19280:

Description: 
In one of our applications, we found the following issue: an application 
recovering from a checkpoint file named "checkpoint-***16670" but carrying the 
timestamp ***16650 will recover from the very beginning of the stream, and 
because our application relies on external & periodically-cleaned data 
(synced with checkpoint cleanup), the recovery simply failed.

We identified a potential issue in Spark Streaming checkpoint and will describe 
it with the following example. We will propose a fix in the end of this JIRA.

1. The application properties: Batch Duration: 2, Functionality: Single 
Stream calling ReduceByKeyAndWindow and print, Window Size: 6, 
SlideDuration, 2

2. RDD at 16650 is generated and the corresponding job is submitted to the 
execution ThreadPool. Meanwhile, a DoCheckpoint message is sent to the queue of 
JobGenerator

3. Job at 16650 is finished and JobCompleted message is sent to 
JobScheduler's queue, and meanwhile, Job at 16652 is submitted to the 
execution ThreadPool and similarly, a DoCheckpoint is sent to the queue of 
JobGenerator

4. JobScheduler's message processing thread (I will use JS-EventLoop to 
identify it) is not scheduled by the operating system for a long time, and 
during this period, Jobs generated from 16652 - 16670 are generated and 
completed.

5. the message processing thread of JobGenerator (JG-EventLoop) is scheduled 
and processes all DoCheckpoint messages for jobs ranging from 16652 - 
16670, and checkpoint files are successfully written. CRITICAL: at this 
moment, the lastCheckpointTime would be 16670.

6. JS-EventLoop is scheduled, and processes all JobCompleted messages for jobs 
ranging from 16652 - 16670. CRITICAL: a ClearMetadata message is pushed 
to JobGenerator's message queue for EACH JobCompleted.

7. The current message queue contains 20 ClearMetadata messages and 
JG-EventLoop is scheduled to process them. CRITICAL: ClearMetadata will remove 
all RDDs outside the rememberDuration window. In our case, ReduceByKeyAndWindow 
will set rememberDuration to 10 (rememberDuration of ReducedWindowDStream 
(4) + windowSize), resulting in only the RDDs in (16660, 16670] being 
kept. And the ClearMetadata processing logic will push a DoCheckpoint to 
JobGenerator's thread

8. JG-EventLoop is scheduled again to process DoCheckpoint for 1665, VERY 
CRITICAL: at this step, RDD no later than 16670 has been removed, and 
checkpoint data is updated as  
https://github.com/apache/spark/blob/a81e336f1eddc2c6245d807aae2c81ddc60eabf9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L53
 and 
https://github.com/apache/spark/blob/a81e336f1eddc2c6245d807aae2c81ddc60eabf9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L59.

9. After 8, a checkpoint named /path/checkpoint-16670 is created but with 
the timestamp 16650. and at this moment, Application crashed

10. The application recovers from /path/checkpoint-16670 and tries to get the 
RDD with validTime 16650. Of course it will not find it and has to recompute. 
In the case of ReduceByKeyAndWindow, it needs to recursively slice RDDs back to 
the start of the stream. When the stream depends on external data, it 
will not successfully recover. In the case of Kafka, the recovered RDDs would 
not be the same as the original ones, as currentOffsets has been updated to 
the value at the moment of 16670


The proposed fix:

0. a hot-fix would be setting the timestamp of the Checkpoint file to 
lastCheckpointTime instead of using the timestamp of the Checkpoint instance 
(any side-effect?)
1. ClearMetadata shall become ClearMetadataAndCheckpoint 
2. a long-term fix would be to merge JobScheduler and JobGenerator; I don't see 
any necessity to have two threads here


  was:
In one of our applications, we found the following issue, the application 
recovering from a checkpoint file named "checkpoint-***16670" but with the 
timestamp ***16650 will recover from the very beginning of the stream and 
because our application relies on the external & periodically-cleaned data 
(syncing with checkpoint cleanup), the recovery just failed

We identified a potential issue in Spark Streaming checkpoint and will describe 
it with the following example. We will propose a fix in the end of this JIRA.

1. The application properties: Batch Duration: 2, Functionality: Single 
Stream calling ReduceByKeyAndWindow and print, Window Size: 6, 
SlideDuration, 2

2. RDD at 16650 is generated and the corresponding job is submitted to the 
execution ThreadPool. Meanwhile, a DoCheckpoint message is sent to the queue of 
JobGenerator

3. Job at 16650 is finished and JobCompleted message is 

[jira] [Commented] (SPARK-19278) Failed Recovery from checkpoint caused by the multi-threads issue in Spark Streaming scheduler

2017-01-18 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15828621#comment-15828621
 ] 

Nan Zhu commented on SPARK-19278:
-

Would anyone help to close this one? It is a duplicate of SPARK-19280... I 
don't know how it came to be created.

[~srowen] would you mind helping?

> Failed Recovery from checkpoint caused by the multi-threads issue in Spark 
> Streaming scheduler
> --
>
> Key: SPARK-19278
> URL: https://issues.apache.org/jira/browse/SPARK-19278
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 1.6.3, 2.0.0, 2.0.1, 2.0.2, 2.1.0
>Reporter: Nan Zhu
>
> In one of our applications, we found the following issue, the application 
> recovering from a checkpoint file named "checkpoint-***16670" but with 
> the timestamp ***16650 will recover from the very beginning of the stream 
> and because our application relies on the external & periodically-cleaned 
> data (syncing with checkpoint cleanup), the recovery just failed
> We identified a potential issue in Spark Streaming checkpoint and will 
> describe it with the following example. We will propose a fix in the end of 
> this JIRA.
> 1. The application properties: Batch Duration: 2, Functionality: Single 
> Stream calling ReduceByKeyAndWindow and print, Window Size: 6, 
> SlideDuration, 2
> 2. RDD at 16650 is generated and the corresponding job is submitted to 
> the execution ThreadPool. Meanwhile, a DoCheckpoint message is sent to the 
> queue of JobGenerator
> 3. Job at 16650 is finished and JobCompleted message is sent to 
> JobScheduler's queue, and meanwhile, Job at 16652 is submitted to the 
> execution ThreadPool and similarly, a DoCheckpoint is sent to the queue of 
> JobGenerator
> 4. JobScheduler's message processing thread (I will use JS-EventLoop to 
> identify it) is not scheduled by the operating system for a long time, and 
> during this period, Jobs generated from 16652 - 16670 are generated 
> and completed.
> 5. the message processing thread of JobGenerator (JG-EventLoop) is scheduled 
> and processed all DoCheckpoint messages for jobs ranging from 16652 - 
> 16670 and checkpoint files are successfully written. CRITICAL: at this 
> moment, the lastCheckpointTime would be 16670.
> 6. JS-EventLoop is scheduled, and process all JobCompleted messages for jobs 
> ranging from 16652 - 16670. CRITICAL: a ClearMetadata message is 
> pushed to JobGenerator's message queue for EACH JobCompleted.
> 7. The current message queue contains 20 ClearMetadata messages and 
> JG-EventLoop is scheduled to process them. CRITICAL: ClearMetadata will 
> remove all RDDs out of rememberDuration window. In our case, 
> ReduceyKeyAndWindow will set rememberDuration to 10 (rememberDuration of 
> ReducedWindowDStream (4) + windowSize) resulting that only RDDs <- 
> (16660, 16670] are kept. And ClearMetaData processing logic will push 
> a DoCheckpoint to JobGenerator's thread
> 8. JG-EventLoop is scheduled again to process DoCheckpoint for 1665, VERY 
> CRITICAL: at this step, RDD no later than 16670 has been removed, and 
> checkpoint data is updated as  
> https://github.com/apache/spark/blob/a81e336f1eddc2c6245d807aae2c81ddc60eabf9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L53
>  and 
> https://github.com/apache/spark/blob/a81e336f1eddc2c6245d807aae2c81ddc60eabf9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L59.
> 9. After 8, a checkpoint named /path/checkpoint-16670 is created but with 
> the timestamp 16650. and at this moment, Application crashed
> 10. Application recovers from /path/checkpoint-16670 and try to get RDD 
> with validTime 16650. Of course it will not find it and has to recompute. 
> In the case of ReduceByKeyAndWindow, it needs to recursively slice RDDs until 
> to the start of the stream. When the stream depends on the external data, it 
> will not successfully recover. In the case of Kafka, the recovered RDDs would 
> not be the same as the original one, as the currentOffsets has been updated 
> to the value at the moment of 16670
> The proposed fix:
> 0. a hot-fix would be setting timestamp Checkpoint File to lastCheckpointTime 
> instead of using the timestamp of Checkpoint instance (any side-effect?)
> 1. a long-term fix would be merge JobScheduler and JobGenerator, I didn't see 
> any necessary to have two threads here



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19280) Failed Recovery from checkpoint caused by the multi-threads issue in Spark Streaming scheduler

2017-01-18 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15828602#comment-15828602
 ] 

Nan Zhu commented on SPARK-19280:
-

[~zsxwing] would you mind confirming this? It would be great if we could 
discuss it

> Failed Recovery from checkpoint caused by the multi-threads issue in Spark 
> Streaming scheduler
> --
>
> Key: SPARK-19280
> URL: https://issues.apache.org/jira/browse/SPARK-19280
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 1.6.3, 2.0.0, 2.0.1, 2.0.2, 2.1.0
>Reporter: Nan Zhu
>
> In one of our applications, we found the following issue, the application 
> recovering from a checkpoint file named "checkpoint-***16670" but with 
> the timestamp ***16650 will recover from the very beginning of the stream 
> and because our application relies on the external & periodically-cleaned 
> data (syncing with checkpoint cleanup), the recovery just failed
> We identified a potential issue in Spark Streaming checkpoint and will 
> describe it with the following example. We will propose a fix in the end of 
> this JIRA.
> 1. The application properties: Batch Duration: 2, Functionality: Single 
> Stream calling ReduceByKeyAndWindow and print, Window Size: 6, 
> SlideDuration, 2
> 2. RDD at 16650 is generated and the corresponding job is submitted to 
> the execution ThreadPool. Meanwhile, a DoCheckpoint message is sent to the 
> queue of JobGenerator
> 3. Job at 16650 is finished and JobCompleted message is sent to 
> JobScheduler's queue, and meanwhile, Job at 16652 is submitted to the 
> execution ThreadPool and similarly, a DoCheckpoint is sent to the queue of 
> JobGenerator
> 4. JobScheduler's message processing thread (I will use JS-EventLoop to 
> identify it) is not scheduled by the operating system for a long time, and 
> during this period, Jobs generated from 16652 - 16670 are generated 
> and completed.
> 5. the message processing thread of JobGenerator (JG-EventLoop) is scheduled 
> and processed all DoCheckpoint messages for jobs ranging from 16652 - 
> 16670 and checkpoint files are successfully written. CRITICAL: at this 
> moment, the lastCheckpointTime would be 16670.
> 6. JS-EventLoop is scheduled, and process all JobCompleted messages for jobs 
> ranging from 16652 - 16670. CRITICAL: a ClearMetadata message is 
> pushed to JobGenerator's message queue for EACH JobCompleted.
> 7. The current message queue contains 20 ClearMetadata messages and 
> JG-EventLoop is scheduled to process them. CRITICAL: ClearMetadata will 
> remove all RDDs out of rememberDuration window. In our case, 
> ReduceyKeyAndWindow will set rememberDuration to 10 (rememberDuration of 
> ReducedWindowDStream (4) + windowSize) resulting that only RDDs <- 
> (16660, 16670] are kept. And ClearMetaData processing logic will push 
> a DoCheckpoint to JobGenerator's thread
> 8. JG-EventLoop is scheduled again to process DoCheckpoint for 1665, VERY 
> CRITICAL: at this step, RDD no later than 16670 has been removed, and 
> checkpoint data is updated as  
> https://github.com/apache/spark/blob/a81e336f1eddc2c6245d807aae2c81ddc60eabf9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L53
>  and 
> https://github.com/apache/spark/blob/a81e336f1eddc2c6245d807aae2c81ddc60eabf9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L59.
> 9. After 8, a checkpoint named /path/checkpoint-16670 is created but with 
> the timestamp 16650. and at this moment, Application crashed
> 10. Application recovers from /path/checkpoint-16670 and try to get RDD 
> with validTime 16650. Of course it will not find it and has to recompute. 
> In the case of ReduceByKeyAndWindow, it needs to recursively slice RDDs until 
> to the start of the stream. When the stream depends on the external data, it 
> will not successfully recover. In the case of Kafka, the recovered RDDs would 
> not be the same as the original one, as the currentOffsets has been updated 
> to the value at the moment of 16670
> The proposed fix:
> 0. a hot-fix would be setting timestamp Checkpoint File to lastCheckpointTime 
> instead of using the timestamp of Checkpoint instance (any side-effect?)
> 1. a long-term fix would be merge JobScheduler and JobGenerator, I didn't see 
> any necessary to have two threads here



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-19280) Failed Recovery from checkpoint caused by the multi-threads issue in Spark Streaming scheduler

2017-01-18 Thread Nan Zhu (JIRA)
Nan Zhu created SPARK-19280:
---

 Summary: Failed Recovery from checkpoint caused by the 
multi-threads issue in Spark Streaming scheduler
 Key: SPARK-19280
 URL: https://issues.apache.org/jira/browse/SPARK-19280
 Project: Spark
  Issue Type: Bug
Affects Versions: 2.1.0, 2.0.2, 2.0.1, 2.0.0, 1.6.3
Reporter: Nan Zhu


In one of our applications, we found the following issue: an application 
recovering from a checkpoint file named "checkpoint-***16670" but carrying the 
timestamp ***16650 will recover from the very beginning of the stream, and 
because our application relies on external & periodically-cleaned data 
(synced with checkpoint cleanup), the recovery simply failed.

We identified a potential issue in Spark Streaming checkpoint and will describe 
it with the following example. We will propose a fix in the end of this JIRA.

1. The application properties: Batch Duration: 2, Functionality: Single 
Stream calling ReduceByKeyAndWindow and print, Window Size: 6, 
SlideDuration, 2

2. RDD at 16650 is generated and the corresponding job is submitted to the 
execution ThreadPool. Meanwhile, a DoCheckpoint message is sent to the queue of 
JobGenerator

3. Job at 16650 is finished and JobCompleted message is sent to 
JobScheduler's queue, and meanwhile, Job at 16652 is submitted to the 
execution ThreadPool and similarly, a DoCheckpoint is sent to the queue of 
JobGenerator

4. JobScheduler's message processing thread (I will use JS-EventLoop to 
identify it) is not scheduled by the operating system for a long time, and 
during this period, Jobs generated from 16652 - 16670 are generated and 
completed.

5. the message processing thread of JobGenerator (JG-EventLoop) is scheduled 
and processes all DoCheckpoint messages for jobs ranging from 16652 - 
16670, and checkpoint files are successfully written. CRITICAL: at this 
moment, the lastCheckpointTime would be 16670.

6. JS-EventLoop is scheduled, and processes all JobCompleted messages for jobs 
ranging from 16652 - 16670. CRITICAL: a ClearMetadata message is pushed 
to JobGenerator's message queue for EACH JobCompleted.

7. The current message queue contains 20 ClearMetadata messages and 
JG-EventLoop is scheduled to process them. CRITICAL: ClearMetadata will remove 
all RDDs outside the rememberDuration window. In our case, ReduceByKeyAndWindow 
will set rememberDuration to 10 (rememberDuration of ReducedWindowDStream 
(4) + windowSize), resulting in only the RDDs in (16660, 16670] being 
kept. And the ClearMetadata processing logic will push a DoCheckpoint to 
JobGenerator's thread

8. JG-EventLoop is scheduled again to process DoCheckpoint for 1665, VERY 
CRITICAL: at this step, RDD no later than 16670 has been removed, and 
checkpoint data is updated as  
https://github.com/apache/spark/blob/a81e336f1eddc2c6245d807aae2c81ddc60eabf9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L53
 and 
https://github.com/apache/spark/blob/a81e336f1eddc2c6245d807aae2c81ddc60eabf9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L59.

9. After step 8, a checkpoint named /path/checkpoint-16670 is created but with the timestamp 16650, and at this moment the application crashed.

10. The application recovers from /path/checkpoint-16670 and tries to get the RDD with validTime 16650. Of course it will not find it and has to recompute. In the case of ReduceByKeyAndWindow, it needs to recursively slice RDDs back to the start of the stream. When the stream depends on external data, it will not recover successfully. In the case of Kafka, the recovered RDDs would not be the same as the original ones, as currentOffsets has already been updated to its value at the moment of 16670.
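
For reference, a minimal sketch of the kind of application described in step 1 (batch duration 2s, a single stream with reduceByKeyAndWindow over a 6s window sliding every 2s, recovering via checkpoint); the socket source and the checkpoint path are placeholders, not taken from our actual job:

{code:scala}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WindowedWordCount {
  def main(args: Array[String]): Unit = {
    val checkpointDir = "/path/to/checkpoint" // placeholder

    def createContext(): StreamingContext = {
      val conf = new SparkConf().setAppName("reduce-by-key-and-window-example")
      val ssc = new StreamingContext(conf, Seconds(2)) // batch duration: 2s
      ssc.checkpoint(checkpointDir)

      // placeholder source standing in for the external, periodically-cleaned input
      val lines = ssc.socketTextStream("localhost", 9999)
      val counts = lines
        .map(word => (word, 1L))
        .reduceByKeyAndWindow(_ + _, Seconds(6), Seconds(2)) // window: 6s, slide: 2s
      counts.print()
      ssc
    }

    // recover from the checkpoint if one exists, otherwise build a fresh context
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
{code}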


The proposed fix:

0. a hot fix would be to set the timestamp of the checkpoint file to lastCheckpointTime instead of using the timestamp of the Checkpoint instance (any side effect?)
1. a long-term fix would be to merge JobScheduler and JobGenerator; I don't see any necessity in having two threads here




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-19278) Failed Recovery from checkpoint caused by the multi-threads issue in Spark Streaming scheduler

2017-01-18 Thread Nan Zhu (JIRA)
Nan Zhu created SPARK-19278:
---

 Summary: Failed Recovery from checkpoint caused by the 
multi-threads issue in Spark Streaming scheduler
 Key: SPARK-19278
 URL: https://issues.apache.org/jira/browse/SPARK-19278
 Project: Spark
  Issue Type: Bug
Affects Versions: 2.1.0, 2.0.2, 2.0.1, 2.0.0, 1.6.3
Reporter: Nan Zhu


In one of our applications, we found the following issue: an application recovering from a checkpoint file named "checkpoint-***16670" but carrying the timestamp ***16650 will recover from the very beginning of the stream. Because our application relies on external, periodically-cleaned data (synced with checkpoint cleanup), the recovery simply failed.

We identified a potential issue in Spark Streaming checkpointing and describe it with the following example. We propose a fix at the end of this JIRA.

1. The application properties: Batch Duration: 2, Functionality: a single stream calling ReduceByKeyAndWindow and print, Window Size: 6, Slide Duration: 2

2. RDD at 16650 is generated and the corresponding job is submitted to the 
execution ThreadPool. Meanwhile, a DoCheckpoint message is sent to the queue of 
JobGenerator

3. Job at 16650 is finished and JobCompleted message is sent to 
JobScheduler's queue, and meanwhile, Job at 16652 is submitted to the 
execution ThreadPool and similarly, a DoCheckpoint is sent to the queue of 
JobGenerator

4. JobScheduler's message processing thread (I will use JS-EventLoop to identify it) is not scheduled by the operating system for a long time, and during this period the jobs for 16652 - 16670 are generated and completed.

5. The message processing thread of JobGenerator (JG-EventLoop) is scheduled and processes all DoCheckpoint messages for jobs ranging from 16652 - 16670, and checkpoint files are successfully written. CRITICAL: at this moment, the lastCheckpointTime would be 16670.

6. JS-EventLoop is scheduled and processes all JobCompleted messages for jobs ranging from 16652 - 16670. CRITICAL: a ClearMetadata message is pushed to JobGenerator's message queue for EACH JobCompleted.

7. The message queue now contains 20 ClearMetadata messages and JG-EventLoop is scheduled to process them. CRITICAL: ClearMetadata removes all RDDs that fall out of the rememberDuration window. In our case, ReduceByKeyAndWindow sets rememberDuration to 10 (rememberDuration of ReducedWindowDStream (4) + windowSize), so only the RDDs in (16660, 16670] are kept. The ClearMetadata processing logic then pushes a DoCheckpoint to JobGenerator's queue.

8. JG-EventLoop is scheduled again to process the DoCheckpoint for 16650. VERY CRITICAL: at this step, the RDDs no later than 16670 have already been removed, and the checkpoint data is updated as in 
https://github.com/apache/spark/blob/a81e336f1eddc2c6245d807aae2c81ddc60eabf9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L53
 and 
https://github.com/apache/spark/blob/a81e336f1eddc2c6245d807aae2c81ddc60eabf9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L59.

9. After step 8, a checkpoint named /path/checkpoint-16670 is created but with the timestamp 16650, and at this moment the application crashed.

10. The application recovers from /path/checkpoint-16670 and tries to get the RDD with validTime 16650. Of course it will not find it and has to recompute. In the case of ReduceByKeyAndWindow, it needs to recursively slice RDDs back to the start of the stream. When the stream depends on external data, it will not recover successfully. In the case of Kafka, the recovered RDDs would not be the same as the original ones, as currentOffsets has already been updated to its value at the moment of 16670.


The proposed fix:

0. a hot fix would be to set the timestamp of the checkpoint file to lastCheckpointTime instead of using the timestamp of the Checkpoint instance (any side effect?); see the sketch below
1. a long-term fix would be to merge JobScheduler and JobGenerator; I don't see any necessity in having two threads here
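
To make fix 0 concrete, here is a purely illustrative sketch; the names are hypothetical and this is not Spark's actual CheckpointWriter code, only the naming idea:

{code:scala}
// Hypothetical, simplified model of fix 0: name the checkpoint file after the
// latest checkpointed batch time (lastCheckpointTime), not after the timestamp
// carried by the Checkpoint instance, which can be stale because its DoCheckpoint
// message was queued long before it was processed.
case class SimplifiedCheckpoint(timestampMs: Long)

def checkpointFileName(cp: SimplifiedCheckpoint, lastCheckpointTimeMs: Long): String = {
  // before the fix (problematic): s"checkpoint-${cp.timestampMs}"
  s"checkpoint-$lastCheckpointTimeMs"
}
{code}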




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19233) Inconsistent Behaviour of Spark Streaming Checkpoint

2017-01-15 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823364#comment-15823364
 ] 

Nan Zhu commented on SPARK-19233:
-

[~zsxwing] so, here is another potential issue I found in Spark Streaming recently; if you agree on this... I will file a PR

> Inconsistent Behaviour of Spark Streaming Checkpoint
> 
>
> Key: SPARK-19233
> URL: https://issues.apache.org/jira/browse/SPARK-19233
> Project: Spark
>  Issue Type: Improvement
>  Components: DStreams
>Affects Versions: 2.0.0, 2.0.1, 2.0.2, 2.1.0
>Reporter: Nan Zhu
>
> When checking one of our application logs, we found the following behavior 
> (simplified)
> 1. Spark application recovers from checkpoint constructed at timestamp 1000ms
> 2. The log shows that Spark application can recover RDDs generated at 
> timestamp 2000, 3000
> The root cause is that generateJobs event is pushed to the queue by a 
> separate thread (RecurTimer), before doCheckpoint event is pushed to the 
> queue, there might have been multiple generatedJobs being processed. As a 
> result, when doCheckpoint for timestamp 1000 is processed, the generatedRDDs 
> data structure containing RDDs generated at 2000, 3000 is serialized as part 
> of checkpoint of 1000.
> It brings overhead for debugging and coordinate our offset management 
> strategy with Spark Streaming's checkpoint strategy when we are developing a 
> new type of DStream which integrates Spark Streaming with an internal message 
> middleware.
> The proposed fix is to filter generatedRDDs according to checkpoint timestamp 
> when serializing it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19233) Inconsistent Behaviour of Spark Streaming Checkpoint

2017-01-15 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823359#comment-15823359
 ] 

Nan Zhu commented on SPARK-19233:
-

The category of this issue is Improvement, which is subject to revision

> Inconsistent Behaviour of Spark Streaming Checkpoint
> 
>
> Key: SPARK-19233
> URL: https://issues.apache.org/jira/browse/SPARK-19233
> Project: Spark
>  Issue Type: Improvement
>  Components: DStreams
>Affects Versions: 2.0.0, 2.0.1, 2.0.2, 2.1.0
>Reporter: Nan Zhu
>
> When checking one of our application logs, we found the following behavior 
> (simplified)
> 1. Spark application recovers from checkpoint constructed at timestamp 1000ms
> 2. The log shows that Spark application can recover RDDs generated at 
> timestamp 2000, 3000
> The root cause is that generateJobs event is pushed to the queue by a 
> separate thread (RecurTimer), before doCheckpoint event is pushed to the 
> queue, there might have been multiple generatedJobs being processed. As a 
> result, when doCheckpoint for timestamp 1000 is processed, the generatedRDDs 
> data structure containing RDDs generated at 2000, 3000 is serialized as part 
> of checkpoint of 1000.
> It brings overhead for debugging and coordinate our offset management 
> strategy with Spark Streaming's checkpoint strategy when we are developing a 
> new type of DStream which integrates Spark Streaming with an internal message 
> middleware.
> The proposed fix is to filter generatedRDDs according to checkpoint timestamp 
> when serializing it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-19233) Inconsistent Behaviour of Spark Streaming Checkpoint

2017-01-15 Thread Nan Zhu (JIRA)
Nan Zhu created SPARK-19233:
---

 Summary: Inconsistent Behaviour of Spark Streaming Checkpoint
 Key: SPARK-19233
 URL: https://issues.apache.org/jira/browse/SPARK-19233
 Project: Spark
  Issue Type: Improvement
  Components: DStreams
Affects Versions: 2.1.0, 2.0.2, 2.0.1, 2.0.0
Reporter: Nan Zhu


When checking one of our application logs, we found the following behavior 
(simplified)

1. Spark application recovers from checkpoint constructed at timestamp 1000ms

2. The log shows that Spark application can recover RDDs generated at timestamp 
2000, 3000

The root cause is that the generateJobs event is pushed to the queue by a separate thread (RecurringTimer); before the doCheckpoint event is pushed to the queue, multiple generateJobs events may already have been processed. As a result, when doCheckpoint for timestamp 1000 is processed, the generatedRDDs data structure, which by then contains RDDs generated at 2000 and 3000, is serialized as part of the checkpoint for 1000.

It brings overhead for debugging and for coordinating our offset management strategy with Spark Streaming's checkpoint strategy while we are developing a new type of DStream that integrates Spark Streaming with an internal message middleware.

The proposed fix is to filter generatedRDDs according to the checkpoint timestamp when serializing it, as sketched below.
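
A rough sketch of that idea; the names are illustrative, the map below stands in for DStream's internal generatedRDDs, and this is not the actual DStreamCheckpointData code:

{code:scala}
// Illustrative only: before serializing checkpoint data for checkpointTimeMs,
// drop every entry generated after that time, so a checkpoint for time 1000
// never carries RDD metadata for 2000, 3000, ...
def entriesToCheckpoint[V](
    generatedRDDs: Map[Long, V],
    checkpointTimeMs: Long): Map[Long, V] = {
  generatedRDDs.filter { case (timeMs, _) => timeMs <= checkpointTimeMs }
}

// entriesToCheckpoint(Map(1000L -> "rdd@1000", 2000L -> "rdd@2000", 3000L -> "rdd@3000"), 1000L)
// => Map(1000 -> "rdd@1000")
{code}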



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18905) Potential Issue of Semantics of BatchCompleted

2017-01-09 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15813632#comment-15813632
 ] 

Nan Zhu commented on SPARK-18905:
-

[~zsxwing] If you agree on the conclusion above, I will file a PR

> Potential Issue of Semantics of BatchCompleted
> --
>
> Key: SPARK-18905
> URL: https://issues.apache.org/jira/browse/SPARK-18905
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.0.0, 2.0.1, 2.0.2
>Reporter: Nan Zhu
>
> the current implementation of Spark streaming considers a batch is completed 
> no matter the results of the jobs 
> (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L203)
> Let's consider the following case:
> A micro batch contains 2 jobs and they read from two different kafka topics 
> respectively. One of these jobs is failed due to some problem in the user 
> defined logic, after the other one is finished successfully. 
> 1. The main thread in the Spark streaming application will execute the line 
> mentioned above, 
> 2. and another thread (checkpoint writer) will make a checkpoint file 
> immediately after this line is executed. 
> 3. Then due to the current error handling mechanism in Spark Streaming, 
> StreamingContext will be closed 
> (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L214)
> the user recovers from the checkpoint file, and because the JobSet containing 
> the failed job has been removed (taken as completed) before the checkpoint is 
> constructed, the data being processed by the failed job would never be 
> reprocessed?
> I might have missed something in the checkpoint thread or this 
> handleJobCompletion()or it is a potential bug 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18905) Potential Issue of Semantics of BatchCompleted

2017-01-09 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15813560#comment-15813560
 ] 

Nan Zhu commented on SPARK-18905:
-

eat my words...

when we have queued up batches, we do need pendingTime, 

and yes, the original description in the JIRA still holds, 

> Potential Issue of Semantics of BatchCompleted
> --
>
> Key: SPARK-18905
> URL: https://issues.apache.org/jira/browse/SPARK-18905
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.0.0, 2.0.1, 2.0.2
>Reporter: Nan Zhu
>
> the current implementation of Spark streaming considers a batch is completed 
> no matter the results of the jobs 
> (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L203)
> Let's consider the following case:
> A micro batch contains 2 jobs and they read from two different kafka topics 
> respectively. One of these jobs is failed due to some problem in the user 
> defined logic, after the other one is finished successfully. 
> 1. The main thread in the Spark streaming application will execute the line 
> mentioned above, 
> 2. and another thread (checkpoint writer) will make a checkpoint file 
> immediately after this line is executed. 
> 3. Then due to the current error handling mechanism in Spark Streaming, 
> StreamingContext will be closed 
> (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L214)
> the user recovers from the checkpoint file, and because the JobSet containing 
> the failed job has been removed (taken as completed) before the checkpoint is 
> constructed, the data being processed by the failed job would never be 
> reprocessed?
> I might have missed something in the checkpoint thread or this 
> handleJobCompletion()or it is a potential bug 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-18905) Potential Issue of Semantics of BatchCompleted

2017-01-09 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15813459#comment-15813459
 ] 

Nan Zhu edited comment on SPARK-18905 at 1/10/17 1:16 AM:
--

yeah, but the downTime includes all batches from "checkpoint time" to "restart 
time"

the jobs that have been generated but not completed shall be the first batch in 
downTime...no?


was (Author: codingcat):
yeah, but the downTime including all batches from "checkpoint time" to "restart 
time"

the jobs that have been generated but not completed shall be the first batch in 
downTime...no?

> Potential Issue of Semantics of BatchCompleted
> --
>
> Key: SPARK-18905
> URL: https://issues.apache.org/jira/browse/SPARK-18905
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.0.0, 2.0.1, 2.0.2
>Reporter: Nan Zhu
>
> the current implementation of Spark streaming considers a batch is completed 
> no matter the results of the jobs 
> (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L203)
> Let's consider the following case:
> A micro batch contains 2 jobs and they read from two different kafka topics 
> respectively. One of these jobs is failed due to some problem in the user 
> defined logic, after the other one is finished successfully. 
> 1. The main thread in the Spark streaming application will execute the line 
> mentioned above, 
> 2. and another thread (checkpoint writer) will make a checkpoint file 
> immediately after this line is executed. 
> 3. Then due to the current error handling mechanism in Spark Streaming, 
> StreamingContext will be closed 
> (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L214)
> the user recovers from the checkpoint file, and because the JobSet containing 
> the failed job has been removed (taken as completed) before the checkpoint is 
> constructed, the data being processed by the failed job would never be 
> reprocessed?
> I might have missed something in the checkpoint thread or this 
> handleJobCompletion()or it is a potential bug 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18905) Potential Issue of Semantics of BatchCompleted

2017-01-09 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15813459#comment-15813459
 ] 

Nan Zhu commented on SPARK-18905:
-

yeah, but the downTime includes all batches from "checkpoint time" to "restart time"

the jobs that have been generated but not completed shall be the first batch in 
downTime...no?
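
To spell out what I mean by "the first batch in downTime", a toy calculation with plain numbers (illustrative only, not the JobGenerator code; the batch duration and timestamps are made up):

{code:scala}
val batchDurationMs = 2000L
val checkpointTimeMs = 10000L // hypothetical last checkpointed batch
val restartTimeMs = 20000L    // hypothetical restart time

// batches that fall into the down time, stepping by the batch duration
val downTimeBatches = (checkpointTimeMs + batchDurationMs) to restartTimeMs by batchDurationMs
// => 12000, 14000, 16000, 18000, 20000; the head (12000) is the batch that may have
//    been generated but not completed when the application went down
{code}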

> Potential Issue of Semantics of BatchCompleted
> --
>
> Key: SPARK-18905
> URL: https://issues.apache.org/jira/browse/SPARK-18905
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.0.0, 2.0.1, 2.0.2
>Reporter: Nan Zhu
>
> the current implementation of Spark streaming considers a batch is completed 
> no matter the results of the jobs 
> (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L203)
> Let's consider the following case:
> A micro batch contains 2 jobs and they read from two different kafka topics 
> respectively. One of these jobs is failed due to some problem in the user 
> defined logic, after the other one is finished successfully. 
> 1. The main thread in the Spark streaming application will execute the line 
> mentioned above, 
> 2. and another thread (checkpoint writer) will make a checkpoint file 
> immediately after this line is executed. 
> 3. Then due to the current error handling mechanism in Spark Streaming, 
> StreamingContext will be closed 
> (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L214)
> the user recovers from the checkpoint file, and because the JobSet containing 
> the failed job has been removed (taken as completed) before the checkpoint is 
> constructed, the data being processed by the failed job would never be 
> reprocessed?
> I might have missed something in the checkpoint thread or this 
> handleJobCompletion()or it is a potential bug 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-18905) Potential Issue of Semantics of BatchCompleted

2017-01-09 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15813434#comment-15813434
 ] 

Nan Zhu edited comment on SPARK-18905 at 1/10/17 1:05 AM:
--

Hi, [~zsxwing]

Thanks for the reply, 

After testing in our environment a few more times, I feel that this is not a problem anymore. The failed job would be recovered at https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala#L216, as it is counted in the downTime

The question right now is why we need to have pendingTime + downTime in the above method


was (Author: codingcat):
Hi, [~zsxwing]

Thanks for the reply, 

After testing in our environment for more times, I feel that this is not a 
problem anymore. The failed job would be recovered 
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala#L216,
 as the downTime

The question right now is, why we need to have pendingTime + downTime in the 
above method, 

> Potential Issue of Semantics of BatchCompleted
> --
>
> Key: SPARK-18905
> URL: https://issues.apache.org/jira/browse/SPARK-18905
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.0.0, 2.0.1, 2.0.2
>Reporter: Nan Zhu
>
> the current implementation of Spark streaming considers a batch is completed 
> no matter the results of the jobs 
> (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L203)
> Let's consider the following case:
> A micro batch contains 2 jobs and they read from two different kafka topics 
> respectively. One of these jobs is failed due to some problem in the user 
> defined logic, after the other one is finished successfully. 
> 1. The main thread in the Spark streaming application will execute the line 
> mentioned above, 
> 2. and another thread (checkpoint writer) will make a checkpoint file 
> immediately after this line is executed. 
> 3. Then due to the current error handling mechanism in Spark Streaming, 
> StreamingContext will be closed 
> (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L214)
> the user recovers from the checkpoint file, and because the JobSet containing 
> the failed job has been removed (taken as completed) before the checkpoint is 
> constructed, the data being processed by the failed job would never be 
> reprocessed?
> I might have missed something in the checkpoint thread or this 
> handleJobCompletion()or it is a potential bug 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18905) Potential Issue of Semantics of BatchCompleted

2017-01-09 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15813434#comment-15813434
 ] 

Nan Zhu commented on SPARK-18905:
-

Hi, [~zsxwing]

Thanks for the reply, 

After testing in our environment a few more times, I feel that this is not a problem anymore. The failed job would be recovered at https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala#L216, as part of the downTime

The question right now is why we need to have pendingTime + downTime in the above method

> Potential Issue of Semantics of BatchCompleted
> --
>
> Key: SPARK-18905
> URL: https://issues.apache.org/jira/browse/SPARK-18905
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.0.0, 2.0.1, 2.0.2
>Reporter: Nan Zhu
>
> the current implementation of Spark streaming considers a batch is completed 
> no matter the results of the jobs 
> (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L203)
> Let's consider the following case:
> A micro batch contains 2 jobs and they read from two different kafka topics 
> respectively. One of these jobs is failed due to some problem in the user 
> defined logic, after the other one is finished successfully. 
> 1. The main thread in the Spark streaming application will execute the line 
> mentioned above, 
> 2. and another thread (checkpoint writer) will make a checkpoint file 
> immediately after this line is executed. 
> 3. Then due to the current error handling mechanism in Spark Streaming, 
> StreamingContext will be closed 
> (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L214)
> the user recovers from the checkpoint file, and because the JobSet containing 
> the failed job has been removed (taken as completed) before the checkpoint is 
> constructed, the data being processed by the failed job would never be 
> reprocessed?
> I might have missed something in the checkpoint thread or this 
> handleJobCompletion()or it is a potential bug 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18905) Potential Issue of Semantics of BatchCompleted

2016-12-16 Thread Nan Zhu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nan Zhu updated SPARK-18905:

Description: 
the current implementation of Spark streaming considers a batch is completed no 
matter the results of the jobs 
(https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L203)

Let's consider the following case:

A micro batch contains 2 jobs and they read from two different kafka topics 
respectively. One of these jobs is failed due to some problem in the user 
defined logic, after the other one is finished successfully. 

1. The main thread in the Spark streaming application will execute the line 
mentioned above, 

2. and another thread (checkpoint writer) will make a checkpoint file 
immediately after this line is executed. 

3. Then due to the current error handling mechanism in Spark Streaming, 
StreamingContext will be closed 
(https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L214)

the user recovers from the checkpoint file, and because the JobSet containing 
the failed job has been removed (taken as completed) before the checkpoint is 
constructed, the data being processed by the failed job would never be 
reprocessed?


I might have missed something in the checkpoint thread or this 
handleJobCompletion()or it is a potential bug 

  was:
the current implementation of Spark streaming considers a batch is completed no 
matter the results of the jobs 
(https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L203)

Let's consider the following case:

A micro batch contains 2 jobs and they read from two different kafka topics 
respectively. One of these jobs is failed due to some problem in the user 
defined logic. 

1. The main thread in the Spark streaming application will execute the line 
mentioned above, 

2. and another thread (checkpoint writer) will make a checkpoint file 
immediately after this line is executed. 

3. Then due to the current error handling mechanism in Spark Streaming, 
StreamingContext will be closed 
(https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L214)

the user recovers from the checkpoint file, and because the JobSet containing 
the failed job has been removed (taken as completed) before the checkpoint is 
constructed, the data being processed by the failed job would never be 
reprocessed?


I might have missed something in the checkpoint thread or this 
handleJobCompletion()or it is a potential bug 


> Potential Issue of Semantics of BatchCompleted
> --
>
> Key: SPARK-18905
> URL: https://issues.apache.org/jira/browse/SPARK-18905
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.0.0, 2.0.1, 2.0.2
>Reporter: Nan Zhu
>
> the current implementation of Spark streaming considers a batch is completed 
> no matter the results of the jobs 
> (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L203)
> Let's consider the following case:
> A micro batch contains 2 jobs and they read from two different kafka topics 
> respectively. One of these jobs is failed due to some problem in the user 
> defined logic, after the other one is finished successfully. 
> 1. The main thread in the Spark streaming application will execute the line 
> mentioned above, 
> 2. and another thread (checkpoint writer) will make a checkpoint file 
> immediately after this line is executed. 
> 3. Then due to the current error handling mechanism in Spark Streaming, 
> StreamingContext will be closed 
> (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L214)
> the user recovers from the checkpoint file, and because the JobSet containing 
> the failed job has been removed (taken as completed) before the checkpoint is 
> constructed, the data being processed by the failed job would never be 
> reprocessed?
> I might have missed something in the checkpoint thread or this 
> handleJobCompletion()or it is a potential bug 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18905) Potential Issue of Semantics of BatchCompleted

2016-12-16 Thread Nan Zhu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nan Zhu updated SPARK-18905:

Description: 
the current implementation of Spark streaming considers a batch is completed no 
matter the results of the jobs 
(https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L203)

Let's consider the following case:

A micro batch contains 2 jobs and they read from two different kafka topics 
respectively. One of these jobs is failed due to some problem in the user 
defined logic. 

1. The main thread in the Spark streaming application will execute the line 
mentioned above, 

2. and another thread (checkpoint writer) will make a checkpoint file 
immediately after this line is executed. 

3. Then due to the current error handling mechanism in Spark Streaming, 
StreamingContext will be closed 
(https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L214)

the user recovers from the checkpoint file, and because the JobSet containing 
the failed job has been removed (taken as completed) before the checkpoint is 
constructed, the data being processed by the failed job would never be 
reprocessed?


I might have missed something in the checkpoint thread or this 
handleJobCompletion()or it is a potential bug 

  was:
the current implementation of Spark streaming considers a batch is completed no 
matter the results of the jobs 
(https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L203)

Let's consider the following case:

A micro batch contains 2 jobs and they read from two different kafka topics 
respectively. One of this job is failed due to some problem in the user defined 
logic. 

1. The main thread in the Spark streaming application will execute the line 
mentioned above, 

2. and another thread (checkpoint writer) will make a checkpoint file 
immediately after this line is executed. 

3. Then due to the current error handling mechanism in Spark Streaming, 
StreamingContext will be closed 
(https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L214)

the user recovers from the checkpoint file, and because the JobSet containing 
the failed job has been removed (taken as completed) before the checkpoint is 
constructed, the data being processed by the failed job would never be 
reprocessed?


I might have missed something in the checkpoint thread or this 
handleJobCompletion()or it is a potential bug 


> Potential Issue of Semantics of BatchCompleted
> --
>
> Key: SPARK-18905
> URL: https://issues.apache.org/jira/browse/SPARK-18905
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.0.0, 2.0.1, 2.0.2
>Reporter: Nan Zhu
>
> the current implementation of Spark streaming considers a batch is completed 
> no matter the results of the jobs 
> (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L203)
> Let's consider the following case:
> A micro batch contains 2 jobs and they read from two different kafka topics 
> respectively. One of these jobs is failed due to some problem in the user 
> defined logic. 
> 1. The main thread in the Spark streaming application will execute the line 
> mentioned above, 
> 2. and another thread (checkpoint writer) will make a checkpoint file 
> immediately after this line is executed. 
> 3. Then due to the current error handling mechanism in Spark Streaming, 
> StreamingContext will be closed 
> (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L214)
> the user recovers from the checkpoint file, and because the JobSet containing 
> the failed job has been removed (taken as completed) before the checkpoint is 
> constructed, the data being processed by the failed job would never be 
> reprocessed?
> I might have missed something in the checkpoint thread or this 
> handleJobCompletion()or it is a potential bug 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18905) Potential Issue of Semantics of BatchCompleted

2016-12-16 Thread Nan Zhu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nan Zhu updated SPARK-18905:

Description: 
the current implementation of Spark streaming considers a batch is completed no 
matter the results of the jobs 
(https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L203)

Let's consider the following case:

A micro batch contains 2 jobs and they read from two different kafka topics 
respectively. One of this job is failed due to some problem in the user defined 
logic. 

1. The main thread in the Spark streaming application will execute the line 
mentioned above, 

2. and another thread (checkpoint writer) will make a checkpoint file 
immediately after this line is executed. 

3. Then due to the current error handling mechanism in Spark Streaming, 
StreamingContext will be closed 
(https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L214)

the user recovers from the checkpoint file, and because the JobSet containing 
the failed job has been removed (taken as completed) before the checkpoint is 
constructed, the data being processed by the failed job would never be 
reprocessed?


I might have missed something in the checkpoint thread or this 
handleJobCompletion()or it is a potential bug 

  was:
the current implementation of Spark streaming considers a batch is completed no 
matter the result of the jobs 
(https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L203)

Let's consider the following case:

A micro batch contains 2 jobs and they read from two different kafka topics 
respectively. One of this job is failed due to some problem in the user defined 
logic. 

1. The main thread in the Spark streaming application will execute the line 
mentioned above, 

2. and another thread (checkpoint writer) will make a checkpoint file 
immediately after this line is executed. 

3. Then due to the current error handling mechanism in Spark Streaming, 
StreamingContext will be closed 
(https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L214)

the user recovers from the checkpoint file, and because the JobSet containing 
the failed job has been removed (taken as completed) before the checkpoint is 
constructed, the data being processed by the failed job would never be 
reprocessed?


I might have missed something in the checkpoint thread or this 
handleJobCompletion()or it is a potential bug 


> Potential Issue of Semantics of BatchCompleted
> --
>
> Key: SPARK-18905
> URL: https://issues.apache.org/jira/browse/SPARK-18905
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.0.0, 2.0.1, 2.0.2
>Reporter: Nan Zhu
>
> the current implementation of Spark streaming considers a batch is completed 
> no matter the results of the jobs 
> (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L203)
> Let's consider the following case:
> A micro batch contains 2 jobs and they read from two different kafka topics 
> respectively. One of this job is failed due to some problem in the user 
> defined logic. 
> 1. The main thread in the Spark streaming application will execute the line 
> mentioned above, 
> 2. and another thread (checkpoint writer) will make a checkpoint file 
> immediately after this line is executed. 
> 3. Then due to the current error handling mechanism in Spark Streaming, 
> StreamingContext will be closed 
> (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L214)
> the user recovers from the checkpoint file, and because the JobSet containing 
> the failed job has been removed (taken as completed) before the checkpoint is 
> constructed, the data being processed by the failed job would never be 
> reprocessed?
> I might have missed something in the checkpoint thread or this 
> handleJobCompletion()or it is a potential bug 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-18905) Potential Issue of Semantics of BatchCompleted

2016-12-16 Thread Nan Zhu (JIRA)
Nan Zhu created SPARK-18905:
---

 Summary: Potential Issue of Semantics of BatchCompleted
 Key: SPARK-18905
 URL: https://issues.apache.org/jira/browse/SPARK-18905
 Project: Spark
  Issue Type: Bug
  Components: DStreams
Affects Versions: 2.0.2, 2.0.1, 2.0.0
Reporter: Nan Zhu


the current implementation of Spark Streaming considers a batch completed regardless of the result of its jobs 
(https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L203)

Let's consider the following case:

A micro batch contains 2 jobs that read from two different Kafka topics respectively. One of these jobs fails due to some problem in the user-defined logic.

1. The main thread in the Spark streaming application will execute the line 
mentioned above, 

2. and another thread (checkpoint writer) will make a checkpoint file 
immediately after this line is executed. 

3. Then due to the current error handling mechanism in Spark Streaming, 
StreamingContext will be closed 
(https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L214)

the user recovers from the checkpoint file, and because the JobSet containing 
the failed job has been removed (taken as completed) before the checkpoint is 
constructed, the data being processed by the failed job would never be 
reprocessed?


I might have missed something in the checkpoint thread or this handleJobCompletion(), or it is a potential bug. A sketch of the scenario is below.
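
For illustration, a minimal sketch of the scenario; the socket sources stand in for the two Kafka topics and the failure condition is a placeholder:

{code:scala}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("two-jobs-per-batch-example")
val ssc = new StreamingContext(conf, Seconds(2))
ssc.checkpoint("/path/to/checkpoint") // placeholder

// two independent inputs, standing in for two different Kafka topics
val topicA = ssc.socketTextStream("localhost", 9998)
val topicB = ssc.socketTextStream("localhost", 9999)

// output operation 1 -> job 1 of each batch: succeeds
topicA.foreachRDD { rdd => rdd.foreach(record => println(record)) }

// output operation 2 -> job 2 of each batch: fails in the user-defined logic
topicB.foreachRDD { rdd =>
  rdd.foreach { record =>
    if (record.startsWith("bad")) { // placeholder failure condition
      throw new RuntimeException(s"user logic failed on: $record")
    }
  }
}

// per the description above, the batch is taken as completed even if job 2 fails,
// so a checkpoint written right after that no longer contains the failed JobSet
ssc.start()
ssc.awaitTermination()
{code}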



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-17347) Encoder in Dataset example has incorrect type

2016-08-31 Thread Nan Zhu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nan Zhu updated SPARK-17347:

Summary: Encoder in Dataset example has incorrect type  (was: Encoder in 
Dataset example is incorrect on type)

> Encoder in Dataset example has incorrect type
> -
>
> Key: SPARK-17347
> URL: https://issues.apache.org/jira/browse/SPARK-17347
> Project: Spark
>  Issue Type: Bug
>  Components: Examples
>Affects Versions: 2.0.0
>Reporter: Nan Zhu
>Priority: Minor
>
> https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala#L206
> The type of this alternative encoder shall be Encoder[Map[String, Any]] 
> instead of Encoder[Map[String, Int]]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-17347) Encoder in Dataset example is incorrect on type

2016-08-31 Thread Nan Zhu (JIRA)
Nan Zhu created SPARK-17347:
---

 Summary: Encoder in Dataset example is incorrect on type
 Key: SPARK-17347
 URL: https://issues.apache.org/jira/browse/SPARK-17347
 Project: Spark
  Issue Type: Bug
  Components: Examples
Affects Versions: 2.0.0
Reporter: Nan Zhu
Priority: Minor


https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala#L206

The type of this alternative encoder should be Encoder[Map[String, Any]] instead of Encoder[Map[String, Int]].
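
For context, a sketch of the encoder with the element type this issue argues for, assuming the usual kryo-based fallback encoder used in that example:

{code:scala}
import org.apache.spark.sql.{Encoder, Encoders}

// Row.getValuesMap[Any] yields a Map[String, Any], so the implicit encoder used to
// map over the DataFrame should be typed accordingly:
implicit val mapEncoder: Encoder[Map[String, Any]] = Encoders.kryo[Map[String, Any]]

// e.g. (hypothetical df): df.map(row => row.getValuesMap[Any](List("name", "age"))).collect()
{code}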



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Closed] (SPARK-14247) Spark does not compile with CDH-5.4.x due to the possible bug of ivy.....

2016-03-29 Thread Nan Zhu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-14247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nan Zhu closed SPARK-14247.
---
Resolution: Not A Problem

> Spark does not compile with CDH-5.4.x due to the possible bug of ivy.
> -
>
> Key: SPARK-14247
> URL: https://issues.apache.org/jira/browse/SPARK-14247
> Project: Spark
>  Issue Type: Bug
>  Components: Build
>Affects Versions: 1.6.0
>Reporter: Nan Zhu
>Priority: Minor
>
> I recently tried to compile Spark with CDH 5.4.x by the following command:
> sbt -Phadoop-2.6 -Dhadoop.version=2.6.0-mr1-cdh5.4.5 -DskipTests assembly
> I cannot finish the building due to an error saying that [error] impossible 
> to get artifacts when data has not been loaded. IvyNode = 
> org.slf4j#slf4j-api;1.6.1
> It seems that CDH depends on slf4j 1.6.x while Spark has upgraded to 1.7.x 
> long time ago and during the compilation, slf4j 1.6.x is evicted unexpectedly 
> (see the related discussion in https://github.com/sbt/sbt/issues/1598)
> I currently work around this by downgrade to slf4j 1.6.1
> What surprises me is that I was upgrading from Spark 1.5.0 to 1.6.x, with 
> 1.5.0, I can successfully compile Spark with the same version of CDH
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-14247) Spark does not compile with CDH-5.4.x due to the possible bug of ivy.....

2016-03-29 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15216719#comment-15216719
 ] 

Nan Zhu edited comment on SPARK-14247 at 3/29/16 7:39 PM:
--

thanks [~sowen], it seems that changing the hadoop.version name solves the problem 




was (Author: codingcat):
thanks [~sowen], it seems that change the hadoop.version name solves the 
problem 

but...

the command "sbt -Phadoop-2.6 -Dhadoop.version=2.6.0-cdh5.4.5 -DskipTests clean 
assembly"

gives me an assembly jar with the suffix 2.2.0 (testing in trunk)?

/Users/nanzhu/code/spark/assembly/target/scala-2.11/spark-assembly-2.0.0-SNAPSHOT-hadoop2.2.0.jar

seems it involves the issue belonging to another JIRA?


> Spark does not compile with CDH-5.4.x due to the possible bug of ivy.
> -
>
> Key: SPARK-14247
> URL: https://issues.apache.org/jira/browse/SPARK-14247
> Project: Spark
>  Issue Type: Bug
>  Components: Build
>Affects Versions: 1.6.0
>Reporter: Nan Zhu
>Priority: Minor
>
> I recently tried to compile Spark with CDH 5.4.x by the following command:
> sbt -Phadoop-2.6 -Dhadoop.version=2.6.0-mr1-cdh5.4.5 -DskipTests assembly
> I cannot finish the building due to an error saying that [error] impossible 
> to get artifacts when data has not been loaded. IvyNode = 
> org.slf4j#slf4j-api;1.6.1
> It seems that CDH depends on slf4j 1.6.x while Spark has upgraded to 1.7.x 
> long time ago and during the compilation, slf4j 1.6.x is evicted unexpectedly 
> (see the related discussion in https://github.com/sbt/sbt/issues/1598)
> I currently work around this by downgrade to slf4j 1.6.1
> What surprises me is that I was upgrading from Spark 1.5.0 to 1.6.x, with 
> 1.5.0, I can successfully compile Spark with the same version of CDH
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-14247) Spark does not compile with CDH-5.4.x due to the possible bug of ivy.....

2016-03-29 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15216719#comment-15216719
 ] 

Nan Zhu commented on SPARK-14247:
-

thanks [~sowen], it seems that changing the hadoop.version name solves the problem 

but...

the command "sbt -Phadoop-2.6 -Dhadoop.version=2.6.0-cdh5.4.5 -DskipTests clean 
assembly"

gives me an assembly jar with the suffix 2.2.0 (testing in trunk)?

/Users/nanzhu/code/spark/assembly/target/scala-2.11/spark-assembly-2.0.0-SNAPSHOT-hadoop2.2.0.jar

it seems to involve an issue belonging to another JIRA?


> Spark does not compile with CDH-5.4.x due to the possible bug of ivy.
> -
>
> Key: SPARK-14247
> URL: https://issues.apache.org/jira/browse/SPARK-14247
> Project: Spark
>  Issue Type: Bug
>  Components: Build
>Affects Versions: 1.6.0
>Reporter: Nan Zhu
>Priority: Minor
>
> I recently tried to compile Spark with CDH 5.4.x by the following command:
> sbt -Phadoop-2.6 -Dhadoop.version=2.6.0-mr1-cdh5.4.5 -DskipTests assembly
> I cannot finish the building due to an error saying that [error] impossible 
> to get artifacts when data has not been loaded. IvyNode = 
> org.slf4j#slf4j-api;1.6.1
> It seems that CDH depends on slf4j 1.6.x while Spark has upgraded to 1.7.x 
> long time ago and during the compilation, slf4j 1.6.x is evicted unexpectedly 
> (see the related discussion in https://github.com/sbt/sbt/issues/1598)
> I currently work around this by downgrade to slf4j 1.6.1
> What surprises me is that I was upgrading from Spark 1.5.0 to 1.6.x, with 
> 1.5.0, I can successfully compile Spark with the same version of CDH
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-14247) Spark does not compile with CDH-5.4.x due to the possible bug of ivy.....

2016-03-29 Thread Nan Zhu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-14247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nan Zhu updated SPARK-14247:

Priority: Minor  (was: Major)

> Spark does not compile with CDH-5.4.x due to the possible bug of ivy.
> -
>
> Key: SPARK-14247
> URL: https://issues.apache.org/jira/browse/SPARK-14247
> Project: Spark
>  Issue Type: Bug
>  Components: Build
>Affects Versions: 1.6.0
>Reporter: Nan Zhu
>Priority: Minor
>
> I recently tried to compile Spark with CDH 5.4.x by the following command:
> sbt -Phadoop-2.6 -Dhadoop.version=2.6.0-mr1-cdh5.4.5 -DskipTests assembly
> I cannot finish the building due to an error saying that [error] impossible 
> to get artifacts when data has not been loaded. IvyNode = 
> org.slf4j#slf4j-api;1.6.1
> It seems that CDH depends on slf4j 1.6.x while Spark has upgraded to 1.7.x 
> long time ago and during the compilation, slf4j 1.6.x is evicted unexpectedly 
> (see the related discussion in https://github.com/sbt/sbt/issues/1598)
> I currently work around this by downgrade to slf4j 1.6.1
> What surprises me is that I was upgrading from Spark 1.5.0 to 1.6.x, with 
> 1.5.0, I can successfully compile Spark with the same version of CDH
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-14247) Spark does not compile with CDH-5.4.x due to the possible bug of ivy.....

2016-03-29 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15216661#comment-15216661
 ] 

Nan Zhu commented on SPARK-14247:
-

[~srowen] I always blindly copied the "CDH.*" string from the Spark building doc, since I only use HDFS/HBase and the Cloudera Manager contained in CDH... I think it is just fine... and I have been working with it since 1.5 (but what's the name of the normal packages? https://repository.cloudera.com/artifactory/public/org/apache/hadoop/hadoop-core/ )

yeah, it might be a problem of the compilation tools, but I'm not 100% sure about it... maybe someone who is more familiar with how maven/sbt decides which package gets evicted can give some explanation and a solution

I do not mind closing it... just keeping a record here in case someone has a similar problem and wants to know what happened







> Spark does not compile with CDH-5.4.x due to the possible bug of ivy.
> -
>
> Key: SPARK-14247
> URL: https://issues.apache.org/jira/browse/SPARK-14247
> Project: Spark
>  Issue Type: Bug
>  Components: Build
>Affects Versions: 1.6.0
>Reporter: Nan Zhu
>
> I recently tried to compile Spark with CDH 5.4.x by the following command:
> sbt -Phadoop-2.6 -Dhadoop.version=2.6.0-mr1-cdh5.4.5 -DskipTests assembly
> I cannot finish the building due to an error saying that [error] impossible 
> to get artifacts when data has not been loaded. IvyNode = 
> org.slf4j#slf4j-api;1.6.1
> It seems that CDH depends on slf4j 1.6.x while Spark has upgraded to 1.7.x 
> long time ago and during the compilation, slf4j 1.6.x is evicted unexpectedly 
> (see the related discussion in https://github.com/sbt/sbt/issues/1598)
> I currently work around this by downgrade to slf4j 1.6.1
> What surprises me is that I was upgrading from Spark 1.5.0 to 1.6.x, with 
> 1.5.0, I can successfully compile Spark with the same version of CDH
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-14247) Spark does not compile with CDH-5.4.x due to the possible bug of ivy.....

2016-03-29 Thread Nan Zhu (JIRA)
Nan Zhu created SPARK-14247:
---

 Summary: Spark does not compile with CDH-5.4.x due to the possible 
bug of ivy.
 Key: SPARK-14247
 URL: https://issues.apache.org/jira/browse/SPARK-14247
 Project: Spark
  Issue Type: Bug
  Components: Build
Affects Versions: 1.6.0
Reporter: Nan Zhu


I recently tried to compile Spark with CDH 5.4.x by the following command:

sbt -Phadoop-2.6 -Dhadoop.version=2.6.0-mr1-cdh5.4.5 -DskipTests assembly

I cannot finish the build due to an error saying: [error] impossible to get artifacts when data has not been loaded. IvyNode = org.slf4j#slf4j-api;1.6.1

It seems that CDH depends on slf4j 1.6.x while Spark upgraded to 1.7.x a long time ago, and during the compilation slf4j 1.6.x is evicted unexpectedly (see the related discussion in https://github.com/sbt/sbt/issues/1598)

I currently work around this by downgrading to slf4j 1.6.1; one way to express this is sketched below

What surprises me is that I was upgrading from Spark 1.5.0 to 1.6.x; with 1.5.0, I could successfully compile Spark with the same version of CDH
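
One way to express such a downgrade in an sbt build, as a sketch only; whether this maps cleanly onto Spark's own Maven/sbt build is a separate question:

{code:scala}
// build.sbt sketch: pin slf4j-api so the version expected by the CDH artifacts is
// not evicted during dependency resolution
dependencyOverrides += "org.slf4j" % "slf4j-api" % "1.6.1"
{code}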

 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-8547) xgboost exploration

2016-03-15 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15195682#comment-15195682
 ] 

Nan Zhu commented on SPARK-8547:


FYI, we released a solution to integrate XGBoost with Spark directly 

http://dmlc.ml/2016/03/14/xgboost4j-portable-distributed-xgboost-in-spark-flink-and-dataflow.html

> xgboost exploration
> ---
>
> Key: SPARK-8547
> URL: https://issues.apache.org/jira/browse/SPARK-8547
> Project: Spark
>  Issue Type: New Feature
>  Components: ML, MLlib
>Reporter: Joseph K. Bradley
>
> There has been quite a bit of excitement around xgboost: 
> [https://github.com/dmlc/xgboost]
> It improves the parallelism of boosting by mixing boosting and bagging (where 
> bagging makes the algorithm more parallel).
> It would worth exploring implementing this within MLlib (probably as a new 
> algorithm).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-13868) Random forest accuracy exploration

2016-03-15 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15195686#comment-15195686
 ] 

Nan Zhu commented on SPARK-13868:
-

FYI, we released a solution to integrate XGBoost with Spark directly 

http://dmlc.ml/2016/03/14/xgboost4j-portable-distributed-xgboost-in-spark-flink-and-dataflow.html

> Random forest accuracy exploration
> --
>
> Key: SPARK-13868
> URL: https://issues.apache.org/jira/browse/SPARK-13868
> Project: Spark
>  Issue Type: Improvement
>  Components: ML
>Reporter: Joseph K. Bradley
>
> This is a JIRA for exploring accuracy improvements for Random Forests.
> h2. Background
> Initial exploration was based on reports of poor accuracy from 
> [http://datascience.la/benchmarking-random-forest-implementations/]
> Essentially, Spark 1.2 showed poor performance relative to other libraries 
> for training set sizes of 1M and 10M.
> h3.  Initial improvements
> The biggest issue was that the metric being used was AUC and Spark 1.2 was 
> using hard predictions, not class probabilities.  This was fixed in 
> [SPARK-9528], and that brought Spark up to performance parity with 
> scikit-learn, Vowpal Wabbit, and R for the training set size of 1M.
> h3.  Remaining issues
> For training set size 10M, Spark does not yet match the AUC of the other 2 
> libraries benchmarked (H2O and xgboost).
> Note that, on 1M instances, these 2 libraries also show better results than 
> scikit-learn, VW, and R.  I'm not too familiar with the H2O implementation 
> and how it differs, but xgboost is a very different algorithm, so it's not 
> surprising it has different behavior.
> h2. My explorations
> I've run Spark on the test set of 10M instances.  (Note that the benchmark 
> linked above used somewhat different settings for the different algorithms, 
> but those settings are actually not that important for this problem.  This 
> included gini vs. entropy impurity and limits on splitting nodes.)
> I've tried adjusting:
> * maxDepth: Past depth 20, going deeper does not seem to matter
> * maxBins: I've gone up to 500, but this too does not seem to matter.  
> However, this is a hard thing to verify since slight differences in 
> discretization could become significant in a large tree.
> h2. Current questions
> * H2O: It would be good to understand how this implementation differs from 
> standard RF implementations (in R, VW, scikit-learn, and Spark).
> * xgboost: There's a JIRA for it: [SPARK-8547].  It would be great to see the 
> Spark package linked from that JIRA tested vs. MLlib on the benchmark data 
> (or other data).  From what I've heard/read, xgboost is sometimes better, 
> sometimes worse in accuracy (but of course faster with more localized 
> training).
> * Based on the above explorations, are there changes we should make to Spark 
> RFs?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-13227) Risky apply() in OpenHashMap

2016-02-06 Thread Nan Zhu (JIRA)
Nan Zhu created SPARK-13227:
---

 Summary: Risky apply() in OpenHashMap
 Key: SPARK-13227
 URL: https://issues.apache.org/jira/browse/SPARK-13227
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.6.0
Reporter: Nan Zhu
Priority: Minor


It might confuse future developers when they use OpenHashMap.apply() with a 
numeric value type.

null.asInstanceOf[Int], null.asInstanceOf[Long], null.asInstanceOf[Float] and 
null.asInstanceOf[Double] return 0, 0L, 0.0f and 0.0 respectively, which might 
confuse developers if the value set contains 0/0L/0.0f/0.0 for an existing key
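
A minimal sketch of the confusion (note that OpenHashMap is private[spark], so 
this snippet only compiles from code placed in an org.apache.spark package):

{code}
import org.apache.spark.util.collection.OpenHashMap

val m = new OpenHashMap[String, Int]()
m("present") = 0

// apply() returns null.asInstanceOf[Int], i.e. 0, for a missing key,
// so the two lookups below are indistinguishable
println(m("present"))  // 0 -- explicitly stored
println(m("missing"))  // 0 -- key was never inserted
{code}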







--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12786) Actor demo does not demonstrate usable code

2016-01-13 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12786?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15096187#comment-15096187
 ] 

Nan Zhu commented on SPARK-12786:
-

The only place it relies on AkkaUtils is to create an ActorSystem, which I 
thought was fine since you can easily replace that part with your own 
system-creation logic...did I miss anything?
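
For example, a minimal sketch of that replacement (assuming akka-actor and 
typesafe-config are on the classpath; the system name is arbitrary):

{code}
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

// create the ActorSystem directly instead of going through the
// private[spark] AkkaUtils helper, then pass it to the receiver setup
val system = ActorSystem("WordCountActorSystem", ConfigFactory.load())
{code}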

> Actor demo does not demonstrate usable code
> ---
>
> Key: SPARK-12786
> URL: https://issues.apache.org/jira/browse/SPARK-12786
> Project: Spark
>  Issue Type: Documentation
>  Components: Streaming
>Affects Versions: 1.5.2, 1.6.0
>Reporter: Brian London
>Priority: Minor
>
> The ActorWordCount demo doesn't show how to set up an actor based dstream in 
> a way that can be used.  
> The demo relies on the {{AkkaUtils}} object, which is marked private[spark].  
> Thus the code presented will not compile unless users declare their code to 
> be in the org.apache.spark package. 
> Demo is located at 
> https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12713) UI Executor page should keep links around to executors that died

2016-01-08 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15089884#comment-15089884
 ] 

Nan Zhu commented on SPARK-12713:
-

I attached a PR and two duplicate JIRAs that address the same issue

> UI Executor page should keep links around to executors that died
> 
>
> Key: SPARK-12713
> URL: https://issues.apache.org/jira/browse/SPARK-12713
> Project: Spark
>  Issue Type: Improvement
>  Components: Web UI
>Affects Versions: 1.5.2
>Reporter: Thomas Graves
>
> When an executor dies, the web UI no longer shows it on the executors page, 
> which makes getting to the logs to see what happened very difficult.  I'm 
> running on YARN, so I'm not sure if the behavior is different in standalone mode.
> We should figure out a way to keep links around to the ones that died so we 
> can show stats and log links.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12469) Consistent Accumulators for Spark

2015-12-25 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15071799#comment-15071799
 ] 

Nan Zhu commented on SPARK-12469:
-

Just to bring the previous discussion about this topic here: 
https://github.com/apache/spark/pull/2524

I originally wanted to fix exactly the same issue in that patch, but we later 
had to shrink the scope to result tasks only.

There, I wanted to use StageId + partitionId to identify the accumulator 
uniquely, but [~matei] pointed out the counterexample that 

"
A shuffle stage may be resubmitted once the old one is garbage-collected (if 
periodic cleanup is on)

If you use an accumulator in a pipelined transformation like a map(), and then 
you make a new RDD built on top of that (e.g. apply another map() to it), it 
won't count as the same stage so you'll still get the updates twice

"

I'm not sure if the proposed solution can fully resolve this issue
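
A minimal sketch of that second case (a hypothetical snippet using the pre-2.0 
accumulator API and assuming a SparkContext sc; since nothing is cached, the 
same pipelined map() closure runs in two different stages, so de-duplicating by 
StageId + partitionId would still count it twice):

{code}
val acc = sc.accumulator(0L)
val base = sc.textFile("input.txt").map { line => acc += 1L; line }

// each action triggers a different stage that re-executes the pipelined
// map() above, so acc receives the same per-line updates twice
base.map(_.length).count()
base.filter(_.nonEmpty).count()
{code}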

> Consistent Accumulators for Spark
> -
>
> Key: SPARK-12469
> URL: https://issues.apache.org/jira/browse/SPARK-12469
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Reporter: holdenk
>
> Tasks executed on Spark workers are unable to modify values from the driver, 
> and accumulators are the one exception for this. Accumulators in Spark are 
> implemented in such a way that when a stage is recomputed (say for cache 
> eviction) the accumulator will be updated a second time. This makes 
> accumulators inside of transformations more difficult to use for things like 
> counting invalid records (one of the primary potential use cases of 
> collecting side information during a transformation). However in some cases 
> this counting during re-evaluation is exactly the behaviour we want (say in 
> tracking total execution time for a particular function). Spark would benefit 
> from a version of accumulators which did not double count even if stages were 
> re-executed.
> Motivating example:
> {code}
> val parseTime = sc.accumulator(0L)
> val parseFailures = sc.accumulator(0L)
> val parsedData = sc.textFile(...).flatMap { line =>
>   val start = System.currentTimeMillis()
>   val parsed = Try(parse(line))
>   if (parsed.isFailure) parseFailures += 1
>   parseTime += System.currentTimeMillis() - start
>   parsed.toOption
> }
> parsedData.cache()
> val resultA = parsedData.map(...).filter(...).count()
> // some intervening code.  Almost anything could happen here -- some of 
> parsedData may
> // get kicked out of the cache, or an executor where data was cached might 
> get lost
> val resultB = parsedData.filter(...).map(...).flatMap(...).count()
> // now we look at the accumulators
> {code}
> Here we would want parseFailures to only have been added to once for every 
> line which failed to parse.  Unfortunately, the current Spark accumulator API 
> doesn’t support the current parseFailures use case, since if some data has 
> been evicted it's possible that it will be double counted.
> See the full design document at 
> https://docs.google.com/document/d/1lR_l1g3zMVctZXrcVjFusq2iQVpr4XvRK_UUDsDr6nk/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-12469) Consistent Accumulators for Spark

2015-12-25 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15071799#comment-15071799
 ] 

Nan Zhu edited comment on SPARK-12469 at 12/26/15 2:44 AM:
---

Just to bring the previous discussion about this topic here: 
https://github.com/apache/spark/pull/2524

I originally wanted to fix exactly the same issue in that patch, but we later 
had to shrink the scope to result tasks only.

There, I wanted to use StageId + partitionId to identify the accumulator 
uniquely, but [~matei] raised the counterexample that 

"
A shuffle stage may be resubmitted once the old one is garbage-collected (if 
periodic cleanup is on)

If you use an accumulator in a pipelined transformation like a map(), and then 
you make a new RDD built on top of that (e.g. apply another map() to it), it 
won't count as the same stage so you'll still get the updates twice

"

I'm not sure if the proposed solution can fully resolve this issue


was (Author: codingcat):
Just to bring the previous discussions about the topic here, 
https://github.com/apache/spark/pull/2524

I originally would like to fix  exactly the same issue in the patch, but later 
we have to shrink our range to result task, 

There, I wanted to use StageId + partitionId to identify the accumulator 
uniquely, but [~matei] indicated the counter example that 

"
A shuffle stage may be resubmitted once the old one is garbage-collected (if 
periodic cleanup is on)

If you use an accumulator in a pipelined transformation like a map(), and then 
you make a new RDD built on top of that (e.g. apply another map() to it), it 
won't count as the same stage so you'll still get the updates twice

"

I'm not sure if the proposed solution can fully resolve this issue

> Consistent Accumulators for Spark
> -
>
> Key: SPARK-12469
> URL: https://issues.apache.org/jira/browse/SPARK-12469
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Reporter: holdenk
>
> Tasks executed on Spark workers are unable to modify values from the driver, 
> and accumulators are the one exception for this. Accumulators in Spark are 
> implemented in such a way that when a stage is recomputed (say for cache 
> eviction) the accumulator will be updated a second time. This makes 
> accumulators inside of transformations more difficult to use for things like 
> counting invalid records (one of the primary potential use cases of 
> collecting side information during a transformation). However in some cases 
> this counting during re-evaluation is exactly the behaviour we want (say in 
> tracking total execution time for a particular function). Spark would benefit 
> from a version of accumulators which did not double count even if stages were 
> re-executed.
> Motivating example:
> {code}
> val parseTime = sc.accumulator(0L)
> val parseFailures = sc.accumulator(0L)
> val parsedData = sc.textFile(...).flatMap { line =>
>   val start = System.currentTimeMillis()
>   val parsed = Try(parse(line))
>   if (parsed.isFailure) parseFailures += 1
>   parseTime += System.currentTimeMillis() - start
>   parsed.toOption
> }
> parsedData.cache()
> val resultA = parsedData.map(...).filter(...).count()
> // some intervening code.  Almost anything could happen here -- some of 
> parsedData may
> // get kicked out of the cache, or an executor where data was cached might 
> get lost
> val resultB = parsedData.filter(...).map(...).flatMap(...).count()
> // now we look at the accumulators
> {code}
> Here we would want parseFailures to only have been added to once for every 
> line which failed to parse.  Unfortunately, the current Spark accumulator API 
> doesn’t support the current parseFailures use case, since if some data has 
> been evicted it's possible that it will be double counted.
> See the full design document at 
> https://docs.google.com/document/d/1lR_l1g3zMVctZXrcVjFusq2iQVpr4XvRK_UUDsDr6nk/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12237) Unsupported message RpcMessage causes message retries

2015-12-10 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15051499#comment-15051499
 ] 

Nan Zhu commented on SPARK-12237:
-

If that's the case, I don't think it would happen in the real world,

as the Executor does not directly communicate with the Master

> Unsupported message RpcMessage causes message retries
> -
>
> Key: SPARK-12237
> URL: https://issues.apache.org/jira/browse/SPARK-12237
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.6.0
>Reporter: Jacek Laskowski
>
> When an unsupported message is sent to an endpoint, Spark throws 
> {{org.apache.spark.SparkException}} and retries sending the message. It 
> should *not* since the message is unsupported.
> {code}
> WARN NettyRpcEndpointRef: Error sending message [message = 
> RetrieveSparkProps] in 1 attempts
> org.apache.spark.SparkException: Unsupported message 
> RpcMessage(localhost:51137,RetrieveSparkProps,org.apache.spark.rpc.netty.RemoteNettyRpcCallContext@c0a6275)
>  from localhost:51137
>   at 
> org.apache.spark.rpc.netty.Inbox$$anonfun$process$1$$anonfun$apply$mcV$sp$1.apply(Inbox.scala:105)
>   at 
> org.apache.spark.rpc.netty.Inbox$$anonfun$process$1$$anonfun$apply$mcV$sp$1.apply(Inbox.scala:104)
>   at 
> org.apache.spark.deploy.master.Master$$anonfun$receiveAndReply$1.applyOrElse(Master.scala:373)
>   at 
> org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:104)
>   at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204)
>   at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
>   at 
> org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> WARN NettyRpcEndpointRef: Error sending message [message = 
> RetrieveSparkProps] in 2 attempts
> org.apache.spark.SparkException: Unsupported message 
> RpcMessage(localhost:51137,RetrieveSparkProps,org.apache.spark.rpc.netty.RemoteNettyRpcCallContext@73a76a5a)
>  from localhost:51137
>   at 
> org.apache.spark.rpc.netty.Inbox$$anonfun$process$1$$anonfun$apply$mcV$sp$1.apply(Inbox.scala:105)
>   at 
> org.apache.spark.rpc.netty.Inbox$$anonfun$process$1$$anonfun$apply$mcV$sp$1.apply(Inbox.scala:104)
>   at 
> org.apache.spark.deploy.master.Master$$anonfun$receiveAndReply$1.applyOrElse(Master.scala:373)
>   at 
> org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:104)
>   at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204)
>   at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
>   at 
> org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> WARN NettyRpcEndpointRef: Error sending message [message = 
> RetrieveSparkProps] in 3 attempts
> org.apache.spark.SparkException: Unsupported message 
> RpcMessage(localhost:51137,RetrieveSparkProps,org.apache.spark.rpc.netty.RemoteNettyRpcCallContext@670bfda7)
>  from localhost:51137
>   at 
> org.apache.spark.rpc.netty.Inbox$$anonfun$process$1$$anonfun$apply$mcV$sp$1.apply(Inbox.scala:105)
>   at 
> org.apache.spark.rpc.netty.Inbox$$anonfun$process$1$$anonfun$apply$mcV$sp$1.apply(Inbox.scala:104)
>   at 
> org.apache.spark.deploy.master.Master$$anonfun$receiveAndReply$1.applyOrElse(Master.scala:373)
>   at 
> org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:104)
>   at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204)
>   at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
>   at 
> org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1672)
>   at 
> org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:68)
>   at 
> org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:151)
>   at 
> 

[jira] [Commented] (SPARK-12237) Unsupported message RpcMessage causes message retries

2015-12-09 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15048651#comment-15048651
 ] 

Nan Zhu commented on SPARK-12237:
-

May I ask how you found this issue?

It seems that the Master received a RetrieveSparkProps message...which is supposed 
to be transmitted only between the Executor and the Driver

> Unsupported message RpcMessage causes message retries
> -
>
> Key: SPARK-12237
> URL: https://issues.apache.org/jira/browse/SPARK-12237
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.6.0
>Reporter: Jacek Laskowski
>
> When an unsupported message is sent to an endpoint, Spark throws 
> {{org.apache.spark.SparkException}} and retries sending the message. It 
> should *not* since the message is unsupported.
> {code}
> WARN NettyRpcEndpointRef: Error sending message [message = 
> RetrieveSparkProps] in 1 attempts
> org.apache.spark.SparkException: Unsupported message 
> RpcMessage(localhost:51137,RetrieveSparkProps,org.apache.spark.rpc.netty.RemoteNettyRpcCallContext@c0a6275)
>  from localhost:51137
>   at 
> org.apache.spark.rpc.netty.Inbox$$anonfun$process$1$$anonfun$apply$mcV$sp$1.apply(Inbox.scala:105)
>   at 
> org.apache.spark.rpc.netty.Inbox$$anonfun$process$1$$anonfun$apply$mcV$sp$1.apply(Inbox.scala:104)
>   at 
> org.apache.spark.deploy.master.Master$$anonfun$receiveAndReply$1.applyOrElse(Master.scala:373)
>   at 
> org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:104)
>   at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204)
>   at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
>   at 
> org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> WARN NettyRpcEndpointRef: Error sending message [message = 
> RetrieveSparkProps] in 2 attempts
> org.apache.spark.SparkException: Unsupported message 
> RpcMessage(localhost:51137,RetrieveSparkProps,org.apache.spark.rpc.netty.RemoteNettyRpcCallContext@73a76a5a)
>  from localhost:51137
>   at 
> org.apache.spark.rpc.netty.Inbox$$anonfun$process$1$$anonfun$apply$mcV$sp$1.apply(Inbox.scala:105)
>   at 
> org.apache.spark.rpc.netty.Inbox$$anonfun$process$1$$anonfun$apply$mcV$sp$1.apply(Inbox.scala:104)
>   at 
> org.apache.spark.deploy.master.Master$$anonfun$receiveAndReply$1.applyOrElse(Master.scala:373)
>   at 
> org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:104)
>   at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204)
>   at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
>   at 
> org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> WARN NettyRpcEndpointRef: Error sending message [message = 
> RetrieveSparkProps] in 3 attempts
> org.apache.spark.SparkException: Unsupported message 
> RpcMessage(localhost:51137,RetrieveSparkProps,org.apache.spark.rpc.netty.RemoteNettyRpcCallContext@670bfda7)
>  from localhost:51137
>   at 
> org.apache.spark.rpc.netty.Inbox$$anonfun$process$1$$anonfun$apply$mcV$sp$1.apply(Inbox.scala:105)
>   at 
> org.apache.spark.rpc.netty.Inbox$$anonfun$process$1$$anonfun$apply$mcV$sp$1.apply(Inbox.scala:104)
>   at 
> org.apache.spark.deploy.master.Master$$anonfun$receiveAndReply$1.applyOrElse(Master.scala:373)
>   at 
> org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:104)
>   at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204)
>   at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
>   at 
> org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1672)
>   at 
> org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:68)
>   at 
> org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:151)
>   at 
> 

[jira] [Commented] (SPARK-12229) How to Perform spark submit of application written in scala from Node js

2015-12-09 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15048663#comment-15048663
 ] 

Nan Zhu commented on SPARK-12229:
-

https://github.com/spark-jobserver/spark-jobserver might be a good reference

BTW, I think you can forward this question to the user list to get more feedback. 
Most of the discussions here are just for bug/feature tracking, etc. If you are 
comfortable with this, would you mind closing the issue?

> How to Perform spark submit of application written in scala from Node js
> 
>
> Key: SPARK-12229
> URL: https://issues.apache.org/jira/browse/SPARK-12229
> Project: Spark
>  Issue Type: Question
>  Components: Spark Core, Spark Submit
>Affects Versions: 1.3.1
> Environment: Linux 14.4
>Reporter: himanshu singhal
>  Labels: newbie
>   Original Estimate: 1,344h
>  Remaining Estimate: 1,344h
>
> I have a Spark Core application written in Scala, and now I am developing the 
> front end in Node.js. My question is: how can I perform a spark-submit to run 
> the application from Node.js?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-12021) Fishy test of "don't call ssc.stop in listener"

2015-11-26 Thread Nan Zhu (JIRA)
Nan Zhu created SPARK-12021:
---

 Summary: Fishy test of  "don't call ssc.stop in listener"
 Key: SPARK-12021
 URL: https://issues.apache.org/jira/browse/SPARK-12021
 Project: Spark
  Issue Type: Bug
  Components: Streaming, Tests
Affects Versions: 1.6.0
Reporter: Nan Zhu


I just noticed that some of the test runs are blocked at the case "don't call ssc.stop 
in listener" in StreamingListenerSuite

https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/46766/console

https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/46776/console


https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/46774/console


The PRs corresponding to these test runs are about different things...I think something 
fishy is hidden in this test case...



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-11402) Allow to define a custom driver runner and executor runner

2015-10-29 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-11402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14980584#comment-14980584
 ] 

Nan Zhu commented on SPARK-11402:
-

I'm curious: what kind of functionality do you need from a customized 
ExecutorRunner/DriverRunner?



> Allow to define a custom driver runner and executor runner
> --
>
> Key: SPARK-11402
> URL: https://issues.apache.org/jira/browse/SPARK-11402
> Project: Spark
>  Issue Type: Improvement
>  Components: Deploy, Spark Core
>Reporter: Jacek Lewandowski
>Priority: Minor
>
> {{DriverRunner}} and {{ExecutorRunner}} are used by Spark Worker in 
> standalone mode to spawn driver and executor processes respectively. When 
> integrating Spark with some environments, it would be useful to allow 
> providing a custom implementation of those components.
> The idea is simple - provide factory class names for driver and executor 
> runners in Worker configuration. By default, the current implementations are 
> used. 
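
A minimal, self-contained sketch of how the quoted proposal could look (all 
names and the configuration key below are hypothetical, not an existing Spark 
API):

{code}
trait RunnerFactory {
  def createDriverRunner(driverId: String): Runnable
  def createExecutorRunner(executorId: String): Runnable
}

object RunnerFactoryLoader {
  // hypothetical config key; when it is unset, the Worker keeps the built-in runners
  def load(conf: Map[String, String], default: RunnerFactory): RunnerFactory =
    conf.get("spark.worker.runnerFactory") match {
      case Some(className) =>
        Class.forName(className).getConstructor().newInstance().asInstanceOf[RunnerFactory]
      case None => default
    }
}
{code}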



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-10315) remove document on spark.akka.failure-detector.threshold

2015-08-27 Thread Nan Zhu (JIRA)
Nan Zhu created SPARK-10315:
---

 Summary: remove document on spark.akka.failure-detector.threshold
 Key: SPARK-10315
 URL: https://issues.apache.org/jira/browse/SPARK-10315
 Project: Spark
  Issue Type: Bug
  Components: Documentation
Reporter: Nan Zhu


This parameter is no longer used, and there is a mistake in the current 
documentation: the key should be 'akka.remote.watch-failure-detector.threshold'



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-9602) Remove 'Actor' from the comments

2015-08-04 Thread Nan Zhu (JIRA)
Nan Zhu created SPARK-9602:
--

 Summary: Remove 'Actor' from the comments
 Key: SPARK-9602
 URL: https://issues.apache.org/jira/browse/SPARK-9602
 Project: Spark
  Issue Type: Improvement
  Components: Build, PySpark, Spark Core, Streaming
Reporter: Nan Zhu


Although we have hidden Akka behind the RPC interface, I found that 
Akka/Actor-related comments are still spread everywhere. To keep things 
consistent, we should remove actor/akka wording from the comments...



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-9514) Add EventHubsReceiver to support Spark Streaming using Azure EventHubs

2015-08-01 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14650308#comment-14650308
 ] 

Nan Zhu commented on SPARK-9514:


I think the best way to do it is to add a new component in the external directory, 
if we can ensure that the code is maintained in the long term...

> Add EventHubsReceiver to support Spark Streaming using Azure EventHubs
> --
>
> Key: SPARK-9514
> URL: https://issues.apache.org/jira/browse/SPARK-9514
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.4.1
>Reporter: shanyu zhao
> Fix For: 1.5.0
>
>
> We need to add an EventHubsReceiver implementation to support Spark Streaming 
> applications that receive data from Azure EventHubs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-9514) Add EventHubsReceiver to support Spark Streaming using Azure EventHubs

2015-08-01 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14650568#comment-14650568
 ] 

Nan Zhu commented on SPARK-9514:


Hi [~shanyu], in Spark we usually submit patches via GitHub 
(https://github.com/apache/spark/pulls), so just move the patch over there to get 
more eyes on this

> Add EventHubsReceiver to support Spark Streaming using Azure EventHubs
> --
>
> Key: SPARK-9514
> URL: https://issues.apache.org/jira/browse/SPARK-9514
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.4.1
>Reporter: shanyu zhao
> Fix For: 1.5.0
>
> Attachments: SPARK-9514.patch
>
>
> We need to add an EventHubsReceiver implementation to support Spark Streaming 
> applications that receive data from Azure EventHubs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org


