[jira] [Commented] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache

2022-12-13 Thread huangtengfei (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17646658#comment-17646658
 ] 

huangtengfei commented on SPARK-41497:
--

I also think that option 3/4 (including the improved proposal [~mridulm80] 
suggested) would be promising; the issue could be resolved either from the 
accumulator side or from the rdd cache side.
Option 4 seems more straightforward since it is a complement to the existing 
cache mechanism, and making the decision based on task status could be a 
feasible solution. As mentioned above, the downside is that it may be overkill; 
if such cases are low-probability events, that may be acceptable.

> Accumulator undercounting in the case of retry task with rdd cache
> --
>
> Key: SPARK-41497
> URL: https://issues.apache.org/jira/browse/SPARK-41497
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.2, 3.3.1
>Reporter: wuyi
>Priority: Major
>
> An accumulator could be undercounted when a retried task has an rdd cache. See 
> the example below; you can also find the complete and reproducible example at 
> [https://github.com/apache/spark/compare/master...Ngone51:spark:fix-acc]
>   
> {code:scala}
> test("SPARK-XXX") {
>   // Set up a cluster with 2 executors
>   val conf = new SparkConf()
>     .setMaster("local-cluster[2, 1, 1024]").setAppName("TaskSchedulerImplSuite")
>   sc = new SparkContext(conf)
>   // Set up a custom task scheduler. The scheduler will fail the first task
>   // attempt of the job submitted below. In particular, the failed first task
>   // attempt succeeds in its computation (accumulator accounting, result
>   // caching) but fails to report its success status due to the concurrent
>   // executor loss. The second task attempt will succeed.
>   taskScheduler = setupSchedulerWithCustomStatusUpdate(sc)
>   val myAcc = sc.longAccumulator("myAcc")
>   // Initiate an rdd with only one partition so there's only one task, and
>   // specify the storage level as MEMORY_ONLY_2 so that the rdd result will be
>   // cached on both executors.
>   val rdd = sc.parallelize(0 until 10, 1).mapPartitions { iter =>
>     myAcc.add(100)
>     iter.map(x => x + 1)
>   }.persist(StorageLevel.MEMORY_ONLY_2)
>   // This will pass since the second task attempt will succeed
>   assert(rdd.count() === 10)
>   // This will fail because `myAcc.add(100)` won't be executed during the
>   // second task attempt's execution: the second attempt loads the rdd cache
>   // directly instead of executing the task function, so `myAcc.add(100)` is
>   // skipped.
>   assert(myAcc.value === 100)
> } {code}
>  
> We could also hit this issue with decommission even if the rdd has only one 
> copy. For example, decommission could migrate the rdd cache block to another 
> executor (the result is effectively the same as having 2 copies), and the 
> decommissioned executor could be lost before the task reports its success 
> status to the driver. 
>  
> And the issue is more complicated to fix than expected. I have tried several 
> fixes, but none of them is ideal:
> Option 1: Clean up any rdd cache related to the failed task: in practice, 
> this option can already fix the issue in most cases. However, theoretically, 
> the rdd cache could be reported to the driver right after the driver cleans up 
> the failed task's caches, due to asynchronous communication. So this option 
> can't resolve the issue thoroughly;
> Option 2: Disallow rdd cache reuse across task attempts for the same task: 
> this option can 100% fix the issue. The problem is that it also affects cases 
> where the rdd cache could safely be reused across attempts (e.g., when there 
> is no accumulator operation in the task), which can cause a perf regression;
> Option 3: Introduce an accumulator cache: first, this requires a new framework 
> for supporting an accumulator cache; second, the driver has to improve its 
> logic to distinguish whether the cached accumulator value should be reported 
> to the user, to avoid overcounting. For example, in the case of task retry, 
> the value should be reported. However, in the case of rdd cache reuse, the 
> value shouldn't be reported (should it?);
> Option 4: Do task success validation when a task tries to load the rdd 
> cache: this defines an rdd cache as valid/accessible only if the producing 
> task has succeeded. This could be either overkill or a bit complex (currently 
> Spark cleans up the task state once a task finishes, so we would need to 
> maintain a structure to know whether a task once succeeded or not).
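
For illustration, here is a minimal sketch of the option 4 bookkeeping, assuming 
a hypothetical driver-side structure (CacheValidation, recordSuccess and 
isCacheValid are invented names here, not Spark APIs): a cached rdd block is 
treated as readable only if the attempt that produced it was reported successful.
{code:scala}
import scala.collection.mutable

object CacheValidation {
  // (rddId, partitionId) -> attempt ids of tasks whose success reached the driver.
  private val succeeded = mutable.Map.empty[(Int, Int), mutable.Set[Long]]

  def recordSuccess(rddId: Int, partition: Int, attemptId: Long): Unit =
    synchronized {
      succeeded.getOrElseUpdate((rddId, partition), mutable.Set.empty) += attemptId
    }

  // A retried task may load the cached block only when the producing attempt
  // succeeded; otherwise it recomputes the partition, which re-applies the
  // accumulator updates and avoids the undercount.
  def isCacheValid(rddId: Int, partition: Int, producerAttemptId: Long): Boolean =
    synchronized {
      succeeded.get((rddId, partition)).exists(_.contains(producerAttemptId))
    }
}
{code}
This is exactly the structure the option 4 caveat refers to: Spark cleans up 
task state once a task finishes, so a map like this would have to outlive it.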



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

[jira] [Created] (SPARK-39853) Support stage level schedule for standalone cluster when dynamic allocation is disabled

2022-07-24 Thread huangtengfei (Jira)
huangtengfei created SPARK-39853:


 Summary: Support stage level schedule for standalone cluster when 
dynamic allocation is disabled
 Key: SPARK-39853
 URL: https://issues.apache.org/jira/browse/SPARK-39853
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 3.3.0
Reporter: huangtengfei


[SPARK-39062|https://issues.apache.org/jira/browse/SPARK-39062] added stage 
level schedule support for standalone clusters when dynamic allocation is 
enabled: Spark requests executors for the different resource profiles.
When dynamic allocation is disabled, we can also leverage stage level schedule 
to schedule tasks, based on the resource profile (task resource requests), onto 
executors with the default resource profile, as sketched below.
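
For reference, the user-facing side would be the existing stage level scheduling 
API; a minimal sketch, assuming an active SparkContext sc (the 4-cpus-per-task 
request is just an example value):
{code:scala}
import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}

// Request 4 cpus per task for this stage. With dynamic allocation disabled, the
// proposal is to place these tasks onto the already-started executors that use
// the default resource profile, instead of requesting new executors.
val taskReqs = new TaskResourceRequests().cpus(4)
val profile = new ResourceProfileBuilder().require(taskReqs).build()
val result = sc.parallelize(1 to 1000, 8)
  .withResources(profile)
  .map(_ * 2)
  .collect()
{code}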



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-39062) Add Standalone backend support for Stage Level Scheduling

2022-04-28 Thread huangtengfei (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-39062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17529717#comment-17529717
 ] 

huangtengfei commented on SPARK-39062:
--

I am working on this. Thanks [~jiangxb1987]

> Add Standalone backend support for Stage Level Scheduling
> -
>
> Key: SPARK-39062
> URL: https://issues.apache.org/jira/browse/SPARK-39062
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.2.0, 3.3.0
>Reporter: Xingbo Jiang
>Priority: Major
>
> We should add Standalone backend support for Stage Level Scheduling:
> * The Master should be able to generate executors for multiple 
> ResourceProfiles; currently it only considers available CPUs;
> * The Worker needs to let the executor know about its ResourceProfile.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-38471) Use error classes in org.apache.spark.rdd

2022-04-19 Thread huangtengfei (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-38471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17524688#comment-17524688
 ] 

huangtengfei commented on SPARK-38471:
--

I am working on this.

> Use error classes in org.apache.spark.rdd
> -
>
> Key: SPARK-38471
> URL: https://issues.apache.org/jira/browse/SPARK-38471
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Affects Versions: 3.3.0
>Reporter: Bo Zhang
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.7#820007)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-38462) Use error classes in org.apache.spark.executor

2022-04-14 Thread huangtengfei (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-38462?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17522331#comment-17522331
 ] 

huangtengfei commented on SPARK-38462:
--

I am working on this. Thanks [~bozhang]

> Use error classes in org.apache.spark.executor
> --
>
> Key: SPARK-38462
> URL: https://issues.apache.org/jira/browse/SPARK-38462
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Affects Versions: 3.3.0
>Reporter: Bo Zhang
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-38689) Use error classes in the compilation errors of not allowed DESC PARTITION

2022-04-11 Thread huangtengfei (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-38689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17520541#comment-17520541
 ] 

huangtengfei commented on SPARK-38689:
--

I am working on this. Thanks [~maxgekk]

> Use error classes in the compilation errors of not allowed DESC PARTITION
> -
>
> Key: SPARK-38689
> URL: https://issues.apache.org/jira/browse/SPARK-38689
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.4.0
>Reporter: Max Gekk
>Priority: Major
>
> Migrate the following errors in QueryCompilationErrors:
> * descPartitionNotAllowedOnTempView
> * descPartitionNotAllowedOnView
> * descPartitionNotAllowedOnViewError
> to use error classes. Throw an implementation of SparkThrowable. Also write 
> a test for every error in QueryCompilationErrorsSuite.
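
As a hedged sketch of the migration shape (the error class name and parameters 
below are illustrative, not necessarily the final ones), the error factory in 
QueryCompilationErrors would construct an AnalysisException, which implements 
SparkThrowable, from an error class instead of a hand-written message:
{code:scala}
def descPartitionNotAllowedOnTempView(table: String): Throwable = {
  new AnalysisException(
    errorClass = "FORBIDDEN_OPERATION",
    messageParameters = Array("DESC PARTITION", "the temporary view", table))
}
{code}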



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-38108) Use error classes in the compilation errors of UDF/UDAF

2022-03-16 Thread huangtengfei (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-38108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17507528#comment-17507528
 ] 

huangtengfei commented on SPARK-38108:
--

I am working on this. Thanks [~maxgekk]

> Use error classes in the compilation errors of UDF/UDAF
> ---
>
> Key: SPARK-38108
> URL: https://issues.apache.org/jira/browse/SPARK-38108
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: Max Gekk
>Priority: Major
>
> Migrate the following errors in QueryCompilationErrors:
> * noHandlerForUDAFError
> * unexpectedEvalTypesForUDFsError
> * usingUntypedScalaUDFError
> * udfClassDoesNotImplementAnyUDFInterfaceError
> * udfClassNotAllowedToImplementMultiUDFInterfacesError
> * udfClassWithTooManyTypeArgumentsError
> to use error classes. Throw an implementation of SparkThrowable. Also write 
> a test for every error in QueryCompilationErrorsSuite.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-38106) Use error classes in the parsing errors of functions

2022-03-08 Thread huangtengfei (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-38106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17502884#comment-17502884
 ] 

huangtengfei commented on SPARK-38106:
--

I am working on this. Thanks [~maxgekk]

> Use error classes in the parsing errors of functions
> 
>
> Key: SPARK-38106
> URL: https://issues.apache.org/jira/browse/SPARK-38106
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: Max Gekk
>Priority: Major
>
> Migrate the following errors in QueryParsingErrors:
> * functionNameUnsupportedError
> * showFunctionsUnsupportedError
> * showFunctionsInvalidPatternError
> * createFuncWithBothIfNotExistsAndReplaceError
> * defineTempFuncWithIfNotExistsError
> * unsupportedFunctionNameError
> * specifyingDBInCreateTempFuncError
> * invalidNameForDropTempFunc
> to use error classes. Throw an implementation of SparkThrowable. Also write 
> a test for every error in QueryParsingErrorsSuite.
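
And a hedged sketch of the per-error test shape in QueryParsingErrorsSuite (the 
error class and SQL below are illustrative, and the suite's usual imports are 
assumed):
{code:scala}
test("INVALID_SQL_SYNTAX: create temporary function with IF NOT EXISTS") {
  val e = intercept[ParseException] {
    sql("CREATE TEMPORARY FUNCTION IF NOT EXISTS f AS 'f'")
  }
  // SparkThrowable exposes the error class for assertions.
  assert(e.getErrorClass === "INVALID_SQL_SYNTAX")
}
{code}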



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-38434) Correct semantic of CheckAnalysis.getDataTypesAreCompatibleFn method

2022-03-07 Thread huangtengfei (Jira)
huangtengfei created SPARK-38434:


 Summary: Correct semantic of 
CheckAnalysis.getDataTypesAreCompatibleFn method
 Key: SPARK-38434
 URL: https://issues.apache.org/jira/browse/SPARK-38434
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.2.1
Reporter: huangtengfei


Currently, in `CheckAnalysis`, the method 
[getDataTypesAreCompatibleFn|https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala#L606]
 is implemented as:

{code:java}
  private def getDataTypesAreCompatibleFn(plan: LogicalPlan): (DataType, DataType) => Boolean = {
    val isUnion = plan.isInstanceOf[Union]
    if (isUnion) {
      (dt1: DataType, dt2: DataType) =>
        !DataType.equalsStructurally(dt1, dt2, true)
    } else {
      // SPARK-18058: we shall not care about the nullability of columns
      (dt1: DataType, dt2: DataType) =>
        TypeCoercion.findWiderTypeForTwo(dt1.asNullable, dt2.asNullable).isEmpty
    }
  }
{code}

It returns false when the data types are compatible and true otherwise, which 
is pretty confusing given the method's name.
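
One possible cleanup, sketched below with the polarity flipped so that the name 
matches the result (callers would be updated accordingly):
{code:java}
  // Return true exactly when the two data types are compatible for this plan.
  private def getDataTypesAreCompatibleFn(plan: LogicalPlan): (DataType, DataType) => Boolean = {
    if (plan.isInstanceOf[Union]) {
      (dt1: DataType, dt2: DataType) =>
        DataType.equalsStructurally(dt1, dt2, true)
    } else {
      // SPARK-18058: we shall not care about the nullability of columns
      (dt1: DataType, dt2: DataType) =>
        TypeCoercion.findWiderTypeForTwo(dt1.asNullable, dt2.asNullable).nonEmpty
    }
  }
{code}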




--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-38112) Use error classes in the execution errors of date/timestamp handling

2022-02-15 Thread huangtengfei (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-38112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17492514#comment-17492514
 ] 

huangtengfei commented on SPARK-38112:
--

I will work on this. Thanks [~maxgekk]

> Use error classes in the execution errors of date/timestamp handling
> 
>
> Key: SPARK-38112
> URL: https://issues.apache.org/jira/browse/SPARK-38112
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: Max Gekk
>Priority: Major
>
> Migrate the following errors in QueryExecutionErrors:
> * sparkUpgradeInReadingDatesError
> * sparkUpgradeInWritingDatesError
> * timeZoneIdNotSpecifiedForTimestampTypeError
> * cannotConvertOrcTimestampToTimestampNTZError
> to use error classes. Throw an implementation of SparkThrowable. Also write 
> a test for every error in QueryExecutionErrorsSuite.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-38113) Use error classes in the execution errors of pivoting

2022-02-06 Thread huangtengfei (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-38113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17487833#comment-17487833
 ] 

huangtengfei commented on SPARK-38113:
--

I will work on this. Thanks [~maxgekk]

> Use error classes in the execution errors of pivoting
> -
>
> Key: SPARK-38113
> URL: https://issues.apache.org/jira/browse/SPARK-38113
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: Max Gekk
>Priority: Major
>
> Migrate the following errors in QueryExecutionErrors:
> * repeatedPivotsUnsupportedError
> * pivotNotAfterGroupByUnsupportedError
> to use error classes. Throw an implementation of SparkThrowable. Also write 
> a test for every error in QueryExecutionErrorsSuite.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-38105) Use error classes in the parsing errors of joins

2022-02-05 Thread huangtengfei (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-38105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17487619#comment-17487619
 ] 

huangtengfei commented on SPARK-38105:
--

I will work on this. Thanks [~maxgekk]

> Use error classes in the parsing errors of joins
> 
>
> Key: SPARK-38105
> URL: https://issues.apache.org/jira/browse/SPARK-38105
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: Max Gekk
>Priority: Major
>
> Migrate the following errors in QueryParsingErrors:
> * joinCriteriaUnimplementedError
> * naturalCrossJoinUnsupportedError
> to use error classes. Throw an implementation of SparkThrowable. Also write 
> a test for every error in QueryParsingErrorsSuite.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-37941) Use error classes in the compilation errors of casting

2022-01-17 Thread huangtengfei (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-37941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17477582#comment-17477582
 ] 

huangtengfei commented on SPARK-37941:
--

I will work on this. Thanks [~maxgekk]

> Use error classes in the compilation errors of casting
> --
>
> Key: SPARK-37941
> URL: https://issues.apache.org/jira/browse/SPARK-37941
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: Max Gekk
>Priority: Major
>
> Migrate the following errors in QueryCompilationErrors:
> * upCastFailureError
> * unsupportedAbstractDataTypeForUpCastError
> * cannotUpCastAsAttributeError
> to use error classes. Throw an implementation of SparkThrowable. Also write 
> a test for every error in QueryCompilationErrorsSuite.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-36954) Fast fail with explicit err msg when calling withWatermark on non-streaming dataset

2021-10-12 Thread huangtengfei (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huangtengfei resolved SPARK-36954.
--
Resolution: Not A Problem

> Fast fail with explicit err msg when calling withWatermark on non-streaming 
> dataset
> ---
>
> Key: SPARK-36954
> URL: https://issues.apache.org/jira/browse/SPARK-36954
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL, Structured Streaming
>Affects Versions: 3.1.2
>Reporter: huangtengfei
>Priority: Minor
>
> [Dataset.withWatermark|https://github.com/apache/spark/blob/v3.2.0-rc7/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L740]
>  is a function specific to Structured Streaming (SS).
> Currently it can be triggered on a batch dataset, and a specific rule is added 
> to eliminate it in the analyze phase. A user can call this API and nothing 
> happens, which may be a little confusing.
> If that usage is not expected, maybe we can just fast fail with an explicit 
> message; then we also do not have to keep an extra rule to do this.
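
A minimal sketch of the proposed fast fail, assuming the guard lives inside the 
org.apache.spark.sql package (where AnalysisException can be constructed) and 
runs at the entry of Dataset.withWatermark (the helper name is hypothetical):
{code:scala}
private def assertStreaming(ds: Dataset[_]): Unit = {
  if (!ds.isStreaming) {
    // Reject batch Datasets up front instead of silently eliminating the
    // EventTimeWatermark node later in the analyze phase.
    throw new AnalysisException(
      "withWatermark can only be applied on a streaming Dataset/DataFrame")
  }
}
{code}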



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-36954) Fast fail with explicit err msg when calling withWatermark on non-streaming dataset

2021-10-08 Thread huangtengfei (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huangtengfei updated SPARK-36954:
-
Description: 
[Dataset.withWatermark|https://github.com/apache/spark/blob/v3.2.0-rc7/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L740]
 is a function specific to Structured Streaming (SS).
Currently it can be triggered on a batch dataset, and a specific rule is added 
to eliminate it in the analyze phase. A user can call this API and nothing 
happens, which may be a little confusing.
If that usage is not expected, maybe we can just fast fail with an explicit 
message; then we also do not have to keep an extra rule to do this.

> Fast fail with explicit err msg when calling withWatermark on non-streaming 
> dataset
> ---
>
> Key: SPARK-36954
> URL: https://issues.apache.org/jira/browse/SPARK-36954
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL, Structured Streaming
>Affects Versions: 3.1.2
>Reporter: huangtengfei
>Priority: Minor
>
> [Dataset.withWatermark|https://github.com/apache/spark/blob/v3.2.0-rc7/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L740]
>  is a function specific to Structured Streaming (SS).
> Currently it can be triggered on a batch dataset, and a specific rule is added 
> to eliminate it in the analyze phase. A user can call this API and nothing 
> happens, which may be a little confusing.
> If that usage is not expected, maybe we can just fast fail with an explicit 
> message; then we also do not have to keep an extra rule to do this.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-36954) Fast fail with explicit err msg when calling withWatermark on non-streaming dataset

2021-10-08 Thread huangtengfei (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huangtengfei updated SPARK-36954:
-
Environment: (was: 
[Dataset.withWatermark|https://github.com/apache/spark/blob/v3.2.0-rc7/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L740]
 is a function specific for SS.
Now it could be triggered on a batch dataset, and add a specific rule to 
eliminate in analyze phase. User can call this API and nothing happens, it may 
be a little bit confused.
If the usage is not as expected, maybe we can just fast fail it with explicit 
message, and also we do not have to keep on extra rule to do this.)

> Fast fail with explicit err msg when calling withWatermark on non-streaming 
> dataset
> ---
>
> Key: SPARK-36954
> URL: https://issues.apache.org/jira/browse/SPARK-36954
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL, Structured Streaming
>Affects Versions: 3.1.2
>Reporter: huangtengfei
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-36954) Fast fail with explicit err msg when calling withWatermark on non-streaming dataset

2021-10-08 Thread huangtengfei (Jira)
huangtengfei created SPARK-36954:


 Summary: Fast fail with explicit err msg when calling 
withWatermark on non-streaming dataset
 Key: SPARK-36954
 URL: https://issues.apache.org/jira/browse/SPARK-36954
 Project: Spark
  Issue Type: Improvement
  Components: SQL, Structured Streaming
Affects Versions: 3.1.2
 Environment: 
[Dataset.withWatermark|https://github.com/apache/spark/blob/v3.2.0-rc7/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L740]
 is a function specific to Structured Streaming (SS).
Currently it can be triggered on a batch dataset, and a specific rule is added 
to eliminate it in the analyze phase. A user can call this API and nothing 
happens, which may be a little confusing.
If that usage is not expected, maybe we can just fast fail with an explicit 
message; then we also do not have to keep an extra rule to do this.
Reporter: huangtengfei






--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-36658) Expose executionId to QueryExecutionListener

2021-09-02 Thread huangtengfei (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17409210#comment-17409210
 ] 

huangtengfei edited comment on SPARK-36658 at 9/3/21, 2:36 AM:
---

cc [~cloud_fan] could you share thoughts about this?


was (Author: ivoson):
cc [~cloud_fan] could you share any thoughts about this?

> Expose executionId to QueryExecutionListener
> 
>
> Key: SPARK-36658
> URL: https://issues.apache.org/jira/browse/SPARK-36658
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.2
>Reporter: huangtengfei
>Priority: Minor
>
> Now in 
> [QueryExecutionListener|https://github.com/apache/spark/blob/v3.2.0-rc2/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala#L38]
>  we have exposed API to get the query execution information:
> def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit
> def onFailure(funcName: String, qe: QueryExecution, exception: Exception): 
> Unit
>  
> But we cannot clearly tell which query this is. In Spark SQL, the executionId 
> is the direct identifier of a query execution, so I think it makes sense to 
> expose executionId to the QueryExecutionListener; people could then easily 
> find the exact query in the UI or history server to track more information 
> about the query execution. And there is no easy way to find the relevant 
> executionId from a QueryExecution object. 
>  
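
For context, a sketch of the kind of listener this would help; the executionId 
mentioned in the comments is the proposed addition, not an existing parameter:
{code:scala}
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

class AuditListener extends QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
    // Today there is no executionId available here; with the proposal it could
    // be logged and used to locate the exact query in the UI or history server.
    println(s"$funcName finished in ${durationNs / 1e6} ms")
  }
  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
    println(s"$funcName failed: ${exception.getMessage}")
  }
}
// Registered via spark.listenerManager.register(new AuditListener)
{code}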



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-36658) Expose executionId to QueryExecutionListener

2021-09-02 Thread huangtengfei (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17409228#comment-17409228
 ] 

huangtengfei commented on SPARK-36658:
--

Will create a PR for this.

> Expose executionId to QueryExecutionListener
> 
>
> Key: SPARK-36658
> URL: https://issues.apache.org/jira/browse/SPARK-36658
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.2
>Reporter: huangtengfei
>Priority: Minor
>
> Now in 
> [QueryExecutionListener|https://github.com/apache/spark/blob/v3.2.0-rc2/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala#L38]
>  we have exposed API to get the query execution information:
> def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit
> def onFailure(funcName: String, qe: QueryExecution, exception: Exception): 
> Unit
>  
> But we cannot clearly tell which query this is. In Spark SQL, the executionId 
> is the direct identifier of a query execution, so I think it makes sense to 
> expose executionId to the QueryExecutionListener; people could then easily 
> find the exact query in the UI or history server to track more information 
> about the query execution. And there is no easy way to find the relevant 
> executionId from a QueryExecution object. 
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-36658) Expose executionId to QueryExecutionListener

2021-09-02 Thread huangtengfei (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huangtengfei updated SPARK-36658:
-
Description: 
Now in 
[QueryExecutionListener|https://github.com/apache/spark/blob/v3.2.0-rc2/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala#L38]
 we have exposed API to get the query execution information:

def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit

def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit

 

But we cannot clearly tell which query this is. In Spark SQL, the executionId is 
the direct identifier of a query execution, so I think it makes sense to expose 
executionId to the QueryExecutionListener; people could then easily find the 
exact query in the UI or history server to track more information about the 
query execution. And there is no easy way to find the relevant executionId from 
a QueryExecution object. 

 

  was:
Now in 
[QueryExecutionListener|https://github.com/apache/spark/blob/v3.2.0-rc2/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala#L38]
 we have exposed API to get the query execution information:

def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit

def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit

 

But we can not get a clear information that which query is this. In Spark SQL, 
I think that executionId is the direct identifier of a query execution. So I 
think it make sense to expose executionId to the QueryExecutionListener, so 
that people can easily find the exact query in UI or history server to track 
more information of the query execution.

 


> Expose executionId to QueryExecutionListener
> 
>
> Key: SPARK-36658
> URL: https://issues.apache.org/jira/browse/SPARK-36658
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.2
>Reporter: huangtengfei
>Priority: Minor
>
> Now in 
> [QueryExecutionListener|https://github.com/apache/spark/blob/v3.2.0-rc2/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala#L38]
>  we have exposed API to get the query execution information:
> def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit
> def onFailure(funcName: String, qe: QueryExecution, exception: Exception): 
> Unit
>  
> But we cannot clearly tell which query this is. In Spark SQL, the executionId 
> is the direct identifier of a query execution, so I think it makes sense to 
> expose executionId to the QueryExecutionListener; people could then easily 
> find the exact query in the UI or history server to track more information 
> about the query execution. And there is no easy way to find the relevant 
> executionId from a QueryExecution object. 
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-36658) Expose executionId to QueryExecutionListener

2021-09-02 Thread huangtengfei (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17409210#comment-17409210
 ] 

huangtengfei commented on SPARK-36658:
--

cc [~cloud_fan] could you share any thoughts about this?

> Expose executionId to QueryExecutionListener
> 
>
> Key: SPARK-36658
> URL: https://issues.apache.org/jira/browse/SPARK-36658
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.2
>Reporter: huangtengfei
>Priority: Minor
>
> Now in 
> [QueryExecutionListener|https://github.com/apache/spark/blob/v3.2.0-rc2/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala#L38]
>  we have exposed API to get the query execution information:
> def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit
> def onFailure(funcName: String, qe: QueryExecution, exception: Exception): 
> Unit
>  
> But we cannot clearly tell which query this is. In Spark SQL, the executionId 
> is the direct identifier of a query execution, so I think it makes sense to 
> expose executionId to the QueryExecutionListener; people could then easily 
> find the exact query in the UI or history server to track more information 
> about the query execution.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-36658) Expose executionId to QueryExecutionListener

2021-09-02 Thread huangtengfei (Jira)
huangtengfei created SPARK-36658:


 Summary: Expose executionId to QueryExecutionListener
 Key: SPARK-36658
 URL: https://issues.apache.org/jira/browse/SPARK-36658
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.1.2
Reporter: huangtengfei


Now in 
[QueryExecutionListener|https://github.com/apache/spark/blob/v3.2.0-rc2/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala#L38]
 we have exposed API to get the query execution information:

def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit

def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit

 

But we cannot clearly tell which query this is. In Spark SQL, the executionId is 
the direct identifier of a query execution, so I think it makes sense to expose 
executionId to the QueryExecutionListener; people could then easily find the 
exact query in the UI or history server to track more information about the 
query execution.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-35411) Essential information missing in TreeNode json string

2021-05-17 Thread huangtengfei (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-35411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huangtengfei updated SPARK-35411:
-
Description: 
TreeNode can be serialized to a json string with the method toJSON() or 
prettyJson(). To avoid OOM issues, 
[SPARK-17426|https://issues.apache.org/jira/browse/SPARK-17426] only keeps the 
part of Seq data that can be written out to the result json string.
 Essential data like 
[cteRelations|https://github.com/apache/spark/blob/v3.1.1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala#L497]
 in the With node and 
[branches|https://github.com/apache/spark/blob/v3.1.1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala#L123]
 in CaseWhen will be skipped and written out as null.

  was:
TreeNode can be serialized to json string with the method toJSON() or 
prettyJson(). To avoid OOM issues, 
[SPARK-17426|https//issues.apache.org/jira/browse/SPARK-17426] only keep part 
of Seq data that can be written out to result json string.
 Essential data like 
[cteRelations|https://github.com/apache/spark/blob/v3.1.1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala#L497]
 in node With, 
[branches|https://github.com/apache/spark/blob/v3.1.1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala#L123]
 in CaseWhen will be skipped and written out as null.


> Essential information missing in TreeNode json string
> -
>
> Key: SPARK-35411
> URL: https://issues.apache.org/jira/browse/SPARK-35411
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.1
>Reporter: huangtengfei
>Priority: Minor
>
> TreeNode can be serialized to a json string with the method toJSON() or 
> prettyJson(). To avoid OOM issues, 
> [SPARK-17426|https://issues.apache.org/jira/browse/SPARK-17426] only keeps the 
> part of Seq data that can be written out to the result json string.
>  Essential data like 
> [cteRelations|https://github.com/apache/spark/blob/v3.1.1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala#L497]
>  in the With node and 
> [branches|https://github.com/apache/spark/blob/v3.1.1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala#L123]
>  in CaseWhen will be skipped and written out as null.
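
To illustrate the claim with CaseWhen (a spark-shell style sketch; the exact 
JSON shape depends on the version):
{code:scala}
import org.apache.spark.sql.catalyst.expressions.{CaseWhen, Literal}

// branches is a Seq[(Expression, Expression)]; the JSON writer only keeps Seq
// elements it knows how to render, so this field is written out as null.
val expr = CaseWhen(Seq((Literal(true), Literal(1))), Some(Literal(0)))
println(expr.toJSON)
{code}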



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-35411) Essential information missing in TreeNode json string

2021-05-17 Thread huangtengfei (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-35411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huangtengfei updated SPARK-35411:
-
Description: 
TreeNode can be serialized to json string with the method toJSON() or 
prettyJson(). To avoid OOM issues, 
[SPARK-17426|https//issues.apache.org/jira/browse/SPARK-17426] only keep part 
of Seq data that can be written out to result json string.
 Essential data like 
[cteRelations|https://github.com/apache/spark/blob/v3.1.1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala#L497]
 in node With, 
[branches|https://github.com/apache/spark/blob/v3.1.1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala#L123]
 in CaseWhen will be skipped and written out as null.

  was:
TreeNode can be serialized to json string with the method toJSON() or 
prettyJson(). To avoid OOM issues, 
[SPARK-17426|http://example.comhttps//issues.apache.org/jira/browse/SPARK-17426]
 only keep part of Seq data that can be written out to result json string.
 Essential data like 
[cteRelations|https://github.com/apache/spark/blob/v3.1.1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala#L497]
 in node With, 
[branches|https://github.com/apache/spark/blob/v3.1.1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala#L123]
 in CaseWhen will be skipped and written out as null.


> Essential information missing in TreeNode json string
> -
>
> Key: SPARK-35411
> URL: https://issues.apache.org/jira/browse/SPARK-35411
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.1
>Reporter: huangtengfei
>Priority: Minor
>
> TreeNode can be serialized to json string with the method toJSON() or 
> prettyJson(). To avoid OOM issues, 
> [SPARK-17426|https//issues.apache.org/jira/browse/SPARK-17426] only keep part 
> of Seq data that can be written out to result json string.
>  Essential data like 
> [cteRelations|https://github.com/apache/spark/blob/v3.1.1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala#L497]
>  in node With, 
> [branches|https://github.com/apache/spark/blob/v3.1.1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala#L123]
>  in CaseWhen will be skipped and written out as null.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-35411) Essential information missing in TreeNode json string

2021-05-15 Thread huangtengfei (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17345060#comment-17345060
 ] 

huangtengfei edited comment on SPARK-35411 at 5/15/21, 3:10 PM:


Maybe we can write out Seq of  product objects which contain expressions or 
plans to avoid such cases. 


was (Author: ivoson):
Maybe we can write out Seq of  product objects which contain expressions or 
plan to avoid such cases. 

> Essential information missing in TreeNode json string
> -
>
> Key: SPARK-35411
> URL: https://issues.apache.org/jira/browse/SPARK-35411
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.1
>Reporter: huangtengfei
>Priority: Minor
>
> TreeNode can be serialized to json string with the method toJSON() or 
> prettyJson(). To avoid OOM issues, 
> [SPARK-17426|http://example.comhttps//issues.apache.org/jira/browse/SPARK-17426]
>  only keep part of Seq data that can be written out to result json string.
>  Essential data like 
> [cteRelations|https://github.com/apache/spark/blob/v3.1.1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala#L497]
>  in node With, 
> [branches|https://github.com/apache/spark/blob/v3.1.1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala#L123]
>  in CaseWhen will be skipped and written out as null.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-35411) Essential information missing in TreeNode json string

2021-05-15 Thread huangtengfei (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17345060#comment-17345060
 ] 

huangtengfei edited comment on SPARK-35411 at 5/15/21, 3:10 PM:


Maybe we can write out Seq of  product objects which contain expressions or 
plan to avoid such cases. 


was (Author: ivoson):
Maybe we can write out product objects which contain expressions or plan to 
avoid such cases. 

> Essential information missing in TreeNode json string
> -
>
> Key: SPARK-35411
> URL: https://issues.apache.org/jira/browse/SPARK-35411
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.1
>Reporter: huangtengfei
>Priority: Minor
>
> TreeNode can be serialized to json string with the method toJSON() or 
> prettyJson(). To avoid OOM issues, 
> [SPARK-17426|http://example.comhttps//issues.apache.org/jira/browse/SPARK-17426]
>  only keep part of Seq data that can be written out to result json string.
>  Essential data like 
> [cteRelations|https://github.com/apache/spark/blob/v3.1.1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala#L497]
>  in node With, 
> [branches|https://github.com/apache/spark/blob/v3.1.1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala#L123]
>  in CaseWhen will be skipped and written out as null.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-35411) Essential information missing in TreeNode json string

2021-05-15 Thread huangtengfei (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17345060#comment-17345060
 ] 

huangtengfei commented on SPARK-35411:
--

Maybe we can write out product objects which contain expressions or plan to 
avoid such cases. 

> Essential information missing in TreeNode json string
> -
>
> Key: SPARK-35411
> URL: https://issues.apache.org/jira/browse/SPARK-35411
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.1
>Reporter: huangtengfei
>Priority: Minor
>
> TreeNode can be serialized to json string with the method toJSON() or 
> prettyJson(). To avoid OOM issues, 
> [SPARK-17426|http://example.comhttps//issues.apache.org/jira/browse/SPARK-17426]
>  only keep part of Seq data that can be written out to result json string.
>  Essential data like 
> [cteRelations|https://github.com/apache/spark/blob/v3.1.1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala#L497]
>  in node With, 
> [branches|https://github.com/apache/spark/blob/v3.1.1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala#L123]
>  in CaseWhen will be skipped and written out as null.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-35411) Essential information missing in TreeNode json string

2021-05-15 Thread huangtengfei (Jira)
huangtengfei created SPARK-35411:


 Summary: Essential information missing in TreeNode json string
 Key: SPARK-35411
 URL: https://issues.apache.org/jira/browse/SPARK-35411
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.1.1
Reporter: huangtengfei


TreeNode can be serialized to json string with the method toJSON() or 
prettyJson(). To avoid OOM issues, 
[SPARK-17426|http://example.comhttps//issues.apache.org/jira/browse/SPARK-17426]
 only keep part of Seq data that can be written out to result json string.
 Essential data like 
[cteRelations|https://github.com/apache/spark/blob/v3.1.1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala#L497]
 in node With, 
[branches|https://github.com/apache/spark/blob/v3.1.1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala#L123]
 in CaseWhen will be skipped and written out as null.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-10816) EventTime based sessionization

2018-11-25 Thread huangtengfei (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-10816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698433#comment-16698433
 ] 

huangtengfei commented on SPARK-10816:
--

I ran the benchmark [~kabhwan] mentioned above last week and found the key 
performance issue in the original [Baidu's 
patch|https://github.com/apache/spark/pull/22583]. With a [fix 
patch|https://github.com/apache/spark/pull/22583/commits/672bccb64e75b009179e00fe6ede9bf34b5b4dbb]
 applied, I ran the benchmark again and got the results below (cc [~XuanYuan]).

Ran with local[3], 35G driver memory (CPU at 2.3GHz).

A. Plenty Of Rows In Session / Append Mode (rate: 50,000)
||batchId||input rows||input rows per second||processed rows per second||
|2|83300|16410.55949566588|14933.667981355324|
|3|178500|31949.167710757116|19739.02465995798|
|4|414200|45752.78913067491|22497.420020639835|
|5|95|51571.57591878834|25623.044557125904|
|6|185|49885.39840906027|26061.84405156019|
|7|355|50003.5213747447|25037.55633450175|

B. Plenty Of Keys / Append Mode (rate: 5,000,000)
||batchId||input rows||input rows per second||processed rows per second||
|2|825|1601638.4778012685|1626331.967213115|
|3|17857125|3477531.64556962|2343762.304764405|
|4|31428550|4118536.2337832525|2693337.046876339|
|5|6000|5137426.149499101|3087372.6458783573|
|6|9500|4885574.697865775|3265278.064205678|
|7|14500|4981790.69607641|2888561.297262839|
|8|25500|5078871.89292543|2521157.9530174797|

> EventTime based sessionization
> --
>
> Key: SPARK-10816
> URL: https://issues.apache.org/jira/browse/SPARK-10816
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Reporter: Reynold Xin
>Priority: Major
> Attachments: SPARK-10816 Support session window natively.pdf, Session 
> Window Support For Structure Streaming.pdf
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-25261) Update configuration.md, correct the default units of spark.driver|executor.memory

2018-08-28 Thread huangtengfei (JIRA)
huangtengfei created SPARK-25261:


 Summary: Update configuration.md, correct the default units of 
spark.driver|executor.memory
 Key: SPARK-25261
 URL: https://issues.apache.org/jira/browse/SPARK-25261
 Project: Spark
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 2.3.0
Reporter: huangtengfei


From 
[SparkContext|https://github.com/ivoson/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L464]
 and 
[SparkSubmitCommandBuilder|https://github.com/ivoson/spark/blob/master/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java#L265],
 we can see that spark.driver.memory and spark.executor.memory are parsed as 
bytes if no unit is specified. But in the doc they are described as MiB by 
default, which may lead to misunderstanding.
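
In other words, the unambiguous practice is to always give an explicit unit 
suffix (illustrative values):
{code:scala}
import org.apache.spark.SparkConf

val conf = new SparkConf()
  // A bare number would be parsed as bytes per the code linked above, so
  // always specify a unit suffix explicitly.
  .set("spark.driver.memory", "4g")
  .set("spark.executor.memory", "2g")
{code}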



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24351) offsetLog/commitLog purge thresholdBatchId should be computed with current committed epoch but not currentBatchId in CP mode

2018-05-22 Thread huangtengfei (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huangtengfei updated SPARK-24351:
-
Description: 
In structured streaming, there is a conf spark.sql.streaming.minBatchesToRetain 
which is set to specify 'The minimum number of batches that must be retained 
and made recoverable' as described in 
[SQLConf|https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L802].
 In continuous processing, the metadata purge is triggered when an epoch is 
committed in 
[ContinuousExecution|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala#L306].
 
 Since currentBatchId increases independently in cp mode, the current committed 
epoch may be far behind currentBatchId if some task hangs for some time. It is 
not safe to discard the metadata with thresholdBatchId computed based on 
currentBatchId because we may clean all the metadata in the checkpoint 
directory.

  was:
In structured streaming, there is a conf spark.sql.streaming.minBatchesToRetain 
which is set to specify 'The minimum number of batches that must be retained 
and made recoverable' as described in 
[SQLConf](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L802).
 In continuous processing, the metadata purge is triggered when an epoch is 
committed in 
[ContinuousExecution](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala#L306).
 
Since currentBatchId increases independently in cp mode, the current committed 
epoch may be far behind currentBatchId if some task hangs for some time. It is 
not safe to discard the metadata with thresholdBatchId computed based on 
currentBatchId because we may clean all the metadata in the checkpoint 
directory.


> offsetLog/commitLog purge thresholdBatchId should be computed with current 
> committed epoch but not currentBatchId in CP mode
> 
>
> Key: SPARK-24351
> URL: https://issues.apache.org/jira/browse/SPARK-24351
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.0
>Reporter: huangtengfei
>Priority: Major
>
> In structured streaming, there is a conf 
> spark.sql.streaming.minBatchesToRetain which is set to specify 'The minimum 
> number of batches that must be retained and made recoverable' as described in 
> [SQLConf|https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L802].
>  In continuous processing, the metadata purge is triggered when an epoch is 
> committed in 
> [ContinuousExecution|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala#L306].
>  
>  Since currentBatchId increases independently in cp mode, the current 
> committed epoch may be far behind currentBatchId if some task hangs for some 
> time. It is not safe to discard the metadata with thresholdBatchId computed 
> based on currentBatchId because we may clean all the metadata in the 
> checkpoint directory.
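
A hedged sketch of the proposed computation (names approximate; the point is 
that the purge threshold derives from the committed epoch, not currentBatchId):
{code:scala}
def purgeCommittedMetadata(
    committedEpoch: Long,
    minBatchesToRetain: Int,
    offsetLog: HDFSMetadataLog[_],
    commitLog: HDFSMetadataLog[_]): Unit = {
  val thresholdBatchId = committedEpoch - minBatchesToRetain + 1
  if (thresholdBatchId > 0) {
    // A hanging task can delay the committed epoch, but it can no longer cause
    // all retained metadata in the checkpoint directory to be discarded.
    offsetLog.purge(thresholdBatchId)
    commitLog.purge(thresholdBatchId)
  }
}
{code}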



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24351) offsetLog/commitLog purge thresholdBatchId should be computed with current committed epoch but not currentBatchId in CP mode

2018-05-22 Thread huangtengfei (JIRA)
huangtengfei created SPARK-24351:


 Summary: offsetLog/commitLog purge thresholdBatchId should be 
computed with current committed epoch but not currentBatchId in CP mode
 Key: SPARK-24351
 URL: https://issues.apache.org/jira/browse/SPARK-24351
 Project: Spark
  Issue Type: Bug
  Components: Structured Streaming
Affects Versions: 2.3.0
Reporter: huangtengfei


In structured streaming, there is a conf spark.sql.streaming.minBatchesToRetain 
which is set to specify 'The minimum number of batches that must be retained 
and made recoverable' as described in 
[SQLConf](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L802).
 In continuous processing, the metadata purge is triggered when an epoch is 
committed in 
[ContinuousExecution](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala#L306).
 
Since currentBatchId increases independently in cp mode, the current committed 
epoch may be far behind currentBatchId if some task hangs for some time. It is 
not safe to discard the metadata with thresholdBatchId computed based on 
currentBatchId because we may clean all the metadata in the checkpoint 
directory.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23053) taskBinarySerialization and task partitions calculate in DagScheduler.submitMissingTasks should keep the same RDD checkpoint status

2018-02-08 Thread huangtengfei (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16357952#comment-16357952
 ] 

huangtengfei commented on SPARK-23053:
--

The following is a repro case, for clarity:
{code:java}
/** Wrapped rdd partition. */
class WrappedPartition(val partition: Partition) extends Partition {
  def index: Int = partition.index
}

/**
 * An RDD with a custom-defined Partition type, WrappedPartition.
 * The compute method casts the split to WrappedPartition; this cast operation
 * is what this test suite exercises.
 */
class WrappedRDD(parent: RDD[Int]) extends RDD[Int](parent) {
  protected def getPartitions: Array[Partition] = {
parent.partitions.map(p => new WrappedPartition(p))
  }

  def compute(split: Partition, context: TaskContext): Iterator[Int] = {
parent.compute(split.asInstanceOf[WrappedPartition].partition, context)
  }
}
{code}
{code:java}
/**
 * In this repro, we simulate two concurrent jobs using the same rdd, which is
 * marked for checkpointing:
 * Job one has already finished its spark job and starts the doCheckpoint process;
 * Job two is submitted, and submitMissingTasks is called.
 * In submitMissingTasks, if task serialization is done before doCheckpoint
 * finishes, while the partitions are computed from stage.rdd.partitions after
 * doCheckpoint finishes, we may get a ClassCastException when executing the
 * task, because some rdds cast their Partition type in compute().
 *
 * This test case indicates that task serialization and partition computation
 * in submitMissingTasks should see the same rdd checkpoint status.
 */
repro("SPARK-23053: avoid ClassCastException in concurrent execution with 
checkpoint") {
  // set checkpointDir.
  val checkpointDir = Utils.createTempDir()
  sc.setCheckpointDir(checkpointDir.toString)

  // Semaphores to control the process sequence for the two threads below.
  val doCheckpointStarted = new Semaphore(0)
  val taskBinaryBytesFinished = new Semaphore(0)
  val checkpointStateUpdated = new Semaphore(0)

  val rdd = new WrappedRDD(sc.makeRDD(1 to 100, 4))
  rdd.checkpoint()

  val checkpointRunnable = new Runnable {
override def run() = {
  // Simulate what RDD.doCheckpoint() does here.
  rdd.doCheckpointCalled = true
  val checkpointData = rdd.checkpointData.get
  RDDCheckpointData.synchronized {
if (checkpointData.cpState == CheckpointState.Initialized) {
  checkpointData.cpState = CheckpointState.CheckpointingInProgress
}
  }

  val newRDD = checkpointData.doCheckpoint()

  // Release doCheckpointStarted after job triggered in checkpoint 
finished, so
  // that taskBinary serialization can start.
  doCheckpointStarted.release()
  // Wait until taskBinary serialization finished in 
submitMissingTasksThread.
  taskBinaryBytesFinished.acquire()

  // Update our state and truncate the RDD lineage.
  RDDCheckpointData.synchronized {
checkpointData.cpRDD = Some(newRDD)
checkpointData.cpState = CheckpointState.Checkpointed
rdd.markCheckpointed()
  }
  checkpointStateUpdated.release()
}
  }

  val submitMissingTasksRunnable = new Runnable {
override def run() = {
  // Simulate the process of submitMissingTasks.
  // Wait until doCheckpoint job running finished, but checkpoint status 
not changed.
  doCheckpointStarted.acquire()

  val ser = SparkEnv.get.closureSerializer.newInstance()

  // Simulate task serialization while submitMissingTasks.
  // Task serialized with rdd checkpoint not finished.
  val cleanedFunc = sc.clean(Utils.getIteratorSize _)
  val func = (ctx: TaskContext, it: Iterator[Int]) => cleanedFunc(it)
  val taskBinaryBytes = JavaUtils.bufferToArray(
ser.serialize((rdd, func): AnyRef))
  // Because partition calculate is in a synchronized block, so in the 
fixed code
  // partition is calculated here.
  val correctPart = rdd.partitions(0)

  // Release taskBinaryBytesFinished so changing checkpoint status to 
Checkpointed will
  // be done in checkpointThread.
  taskBinaryBytesFinished.release()
  // Wait until checkpoint status changed to Checkpointed in 
checkpointThread.
  checkpointStateUpdated.acquire()

  // Now we're done simulating the interleaving that might happen within 
the scheduler,
  // we'll check to make sure the final state is OK by simulating a couple 
steps that
  // normally happen on the executor.
  // Part calculated with rdd checkpoint already finished.
  val errPart = rdd.partitions(0)

  // TaskBinary will be deserialized when run task in executor.
  val (taskRdd, taskFunc) = ser.deserialize[(RDD[Int], (TaskContext, 
Iterator[Int]) => Unit)](
ByteBuffer.wrap(taskBinaryBytes), 
Thread.currentThread.getContextClassLoader)

  val taskContext = 

[jira] [Commented] (SPARK-23053) taskBinarySerialization and task partitions calculate in DagScheduler.submitMissingTasks should keep the same RDD checkpoint status

2018-02-06 Thread huangtengfei (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16353761#comment-16353761
 ] 

huangtengfei commented on SPARK-23053:
--

Here is the stack trace of the exception:

java.lang.ClassCastException: org.apache.spark.rdd.CheckpointRDDPartition cannot be cast to org.apache.spark.streaming.rdd.MapWithStateRDDPartition
  at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:152)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
  at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
  at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
  at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:957)
  at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:948)
  at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:888)
  at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:948)
  at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:694)
  at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)

> taskBinarySerialization and task partitions calculate in 
> DagScheduler.submitMissingTasks should keep the same RDD checkpoint status
> ---
>
> Key: SPARK-23053
> URL: https://issues.apache.org/jira/browse/SPARK-23053
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler, Spark Core
>Affects Versions: 2.1.0
>Reporter: huangtengfei
>Priority: Major
>
> When we run concurrent jobs using the same rdd, which is marked for 
> checkpointing, one job may have finished and started the process of 
> RDD.doCheckpoint while another job is submitted, so submitStage and 
> submitMissingTasks are called. 
> [submitMissingTasks|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L961]
>  serializes taskBinaryBytes and calculates the task partitions, both of which 
> are affected by the checkpoint status. If the former is computed before 
> doCheckpoint finishes while the latter is computed after it finishes, then 
> when the task runs, rdd.compute is called; for RDDs with a particular 
> partition type, such as 
> [MapWithStateRDD|https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/rdd/MapWithStateRDD.scala],
>  which cast the partition type, this yields a ClassCastException because the 
> part parameter is actually a CheckpointRDDPartition.
> This error occurs because rdd.doCheckpoint runs in the same thread that 
> called sc.runJob, while the task serialization happens in the DAGScheduler's 
> event loop.
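
The fix this analysis points toward is to take a consistent snapshot: 
serialize taskBinaryBytes and read stage.rdd.partitions under the same 
RDDCheckpointData lock inside submitMissingTasks. A simplified sketch of that 
idea follows; the match shape is an assumption about DAGScheduler internals, 
not the verbatim patch.

{code:scala}
// Sketch: take both reads under one lock so the serialized closure and the
// computed partitions observe the same checkpoint state.
var taskBinaryBytes: Array[Byte] = null
var partitions: Array[Partition] = null
RDDCheckpointData.synchronized {
  taskBinaryBytes = stage match {
    case stage: ShuffleMapStage =>
      JavaUtils.bufferToArray(
        closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
    case stage: ResultStage =>
      JavaUtils.bufferToArray(
        closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
  }
  partitions = stage.rdd.partitions
}
{code}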






[jira] [Comment Edited] (SPARK-23053) taskBinarySerialization and task partitions calculate in DagScheduler.submitMissingTasks should keep the same RDD checkpoint status

2018-02-06 Thread huangtengfei (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16353761#comment-16353761
 ] 

huangtengfei edited comment on SPARK-23053 at 2/6/18 11:48 AM:
---

Here is the stack trace of the exception:

{code:java}
java.lang.ClassCastException: org.apache.spark.rdd.CheckpointRDDPartition cannot be cast to org.apache.spark.streaming.rdd.MapWithStateRDDPartition
  at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:152)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
  at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
  at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
  at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:957)
  at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:948)
  at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:888)
  at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:948)
  at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:694)
  at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
{code}










[jira] [Updated] (SPARK-23053) taskBinarySerialization and task partitions calculate in DagScheduler.submitMissingTasks should keep the same RDD checkpoint status

2018-02-06 Thread huangtengfei (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huangtengfei updated SPARK-23053:
-
Description: 
When we run concurrent jobs using the same rdd, which is marked for 
checkpointing, one job may have finished and started the process of 
RDD.doCheckpoint while another job is submitted, so submitStage and 
submitMissingTasks are called. 
[submitMissingTasks|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L961]
 serializes taskBinaryBytes and calculates the task partitions, both of which 
are affected by the checkpoint status. If the former is computed before 
doCheckpoint finishes while the latter is computed after it finishes, then 
when the task runs, rdd.compute is called; for RDDs with a particular 
partition type, such as 
[MapWithStateRDD|https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/rdd/MapWithStateRDD.scala],
 which cast the partition type, this yields a ClassCastException because the 
part parameter is actually a CheckpointRDDPartition.
This error occurs because rdd.doCheckpoint runs in the same thread that 
called sc.runJob, while the task serialization happens in the DAGScheduler's 
event loop.









[jira] [Created] (SPARK-23053) taskBinarySerialization and task partitions calculate in DagScheduler.submitMissingTasks should keep the same RDD checkpoint status

2018-01-11 Thread huangtengfei (JIRA)
huangtengfei created SPARK-23053:


 Summary: taskBinarySerialization and task partitions calculate in 
DagScheduler.submitMissingTasks should keep the same RDD checkpoint status
 Key: SPARK-23053
 URL: https://issues.apache.org/jira/browse/SPARK-23053
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.1.0
Reporter: huangtengfei


When we run concurrent jobs using the same rdd, which is marked for 
checkpointing, one job may have finished and started the process of 
RDD.doCheckpoint while another job is submitted, so submitStage and 
submitMissingTasks are called. 
[submitMissingTasks|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L961]
 serializes taskBinaryBytes and calculates the task partitions, both of which 
are affected by the checkpoint status. If the former is computed before 
doCheckpoint finishes while the latter is computed after it finishes, then 
when the task runs, rdd.compute is called; for RDDs with a particular 
partition type, such as 
[MapWithStateRDD|https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/rdd/MapWithStateRDD.scala],
 which cast the partition type, this yields a ClassCastException because the 
part parameter is actually a CheckpointRDDPartition.


