[GitHub] [spark] cloud-fan commented on a diff in pull request #36704: [SPARK-39346][SQL] Convert asserts/illegal state exception to internal errors on each phase

2022-06-07 Thread GitBox


cloud-fan commented on code in PR #36704:
URL: https://github.com/apache/spark/pull/36704#discussion_r891876268


##
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala:
##
@@ -666,9 +667,10 @@ abstract class KafkaMicroBatchSourceSuiteBase extends 
KafkaSourceSuiteBase {
 testUtils.sendMessages(topic2, Array("6"))
   },
   StartStream(),
-  ExpectFailure[IllegalStateException](e => {
+  ExpectFailure[SparkException](e => {
+assert(e.asInstanceOf[SparkThrowable].getErrorClass === 
"INTERNAL_ERROR")
 // The offset of `topic2` should be changed from 2 to 1
-assert(e.getMessage.contains("was changed from 2 to 1"))
+assert(e.getCause.getMessage.contains("was changed from 2 to 1"))

Review Comment:
   The error is simply wrapped and the original error is kept in causedBy, so 
this is not a big problem. The bigger problem is how to define internal errors. 
We picked some common java exceptions that can be treated as internal errors: 
when end users see these errors from Spark, it indicates a bug.
   
   For third-party Spark plugins, I think all errors from them are internal 
errors? end-users can do nothing to work around these errors. cc @srielau 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a diff in pull request #36704: [SPARK-39346][SQL] Convert asserts/illegal state exception to internal errors on each phase

2022-06-07 Thread GitBox


cloud-fan commented on code in PR #36704:
URL: https://github.com/apache/spark/pull/36704#discussion_r891840233


##
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala:
##
@@ -666,9 +667,10 @@ abstract class KafkaMicroBatchSourceSuiteBase extends 
KafkaSourceSuiteBase {
 testUtils.sendMessages(topic2, Array("6"))
   },
   StartStream(),
-  ExpectFailure[IllegalStateException](e => {
+  ExpectFailure[SparkException](e => {
+assert(e.asInstanceOf[SparkThrowable].getErrorClass === 
"INTERNAL_ERROR")
 // The offset of `topic2` should be changed from 2 to 1
-assert(e.getMessage.contains("was changed from 2 to 1"))
+assert(e.getCause.getMessage.contains("was changed from 2 to 1"))

Review Comment:
   If Spark is a simple Java library, then this is fine. The caller should 
handle these errors property when building a product.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a diff in pull request #36704: [SPARK-39346][SQL] Convert asserts/illegal state exception to internal errors on each phase

2022-06-07 Thread GitBox


cloud-fan commented on code in PR #36704:
URL: https://github.com/apache/spark/pull/36704#discussion_r891839935


##
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala:
##
@@ -666,9 +667,10 @@ abstract class KafkaMicroBatchSourceSuiteBase extends 
KafkaSourceSuiteBase {
 testUtils.sendMessages(topic2, Array("6"))
   },
   StartStream(),
-  ExpectFailure[IllegalStateException](e => {
+  ExpectFailure[SparkException](e => {
+assert(e.asInstanceOf[SparkThrowable].getErrorClass === 
"INTERNAL_ERROR")
 // The offset of `topic2` should be changed from 2 to 1
-assert(e.getMessage.contains("was changed from 2 to 1"))
+assert(e.getCause.getMessage.contains("was changed from 2 to 1"))

Review Comment:
   > Signals that a method has been invoked at an illegal or inappropriate time.
   
   This means a bug in any product, right? We should not expose these low-level 
errors to end-users, same to NPE, index out of bounds error, etc.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a diff in pull request #36704: [SPARK-39346][SQL] Convert asserts/illegal state exception to internal errors on each phase

2022-06-07 Thread GitBox


cloud-fan commented on code in PR #36704:
URL: https://github.com/apache/spark/pull/36704#discussion_r891224222


##
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala:
##
@@ -666,9 +667,10 @@ abstract class KafkaMicroBatchSourceSuiteBase extends 
KafkaSourceSuiteBase {
 testUtils.sendMessages(topic2, Array("6"))
   },
   StartStream(),
-  ExpectFailure[IllegalStateException](e => {
+  ExpectFailure[SparkException](e => {
+assert(e.asInstanceOf[SparkThrowable].getErrorClass === 
"INTERNAL_ERROR")
 // The offset of `topic2` should be changed from 2 to 1
-assert(e.getMessage.contains("was changed from 2 to 1"))
+assert(e.getCause.getMessage.contains("was changed from 2 to 1"))

Review Comment:
   It's a convention in Spark (and many other Java programs) that 
`IllegalStateException` means a bug. If this is not a bug, I think streaming 
code should not throw `IllegalStateException` in this case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a diff in pull request #36704: [SPARK-39346][SQL] Convert asserts/illegal state exception to internal errors on each phase

2022-06-07 Thread GitBox


cloud-fan commented on code in PR #36704:
URL: https://github.com/apache/spark/pull/36704#discussion_r891099418


##
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala:
##
@@ -666,9 +667,10 @@ abstract class KafkaMicroBatchSourceSuiteBase extends 
KafkaSourceSuiteBase {
 testUtils.sendMessages(topic2, Array("6"))
   },
   StartStream(),
-  ExpectFailure[IllegalStateException](e => {
+  ExpectFailure[SparkException](e => {
+assert(e.asInstanceOf[SparkThrowable].getErrorClass === 
"INTERNAL_ERROR")
 // The offset of `topic2` should be changed from 2 to 1
-assert(e.getMessage.contains("was changed from 2 to 1"))
+assert(e.getCause.getMessage.contains("was changed from 2 to 1"))

Review Comment:
   I'm looking into this test and feels like this should be a user-facing error 
as users can trigger it with valid queries. @HeartSaVioR what do you think?



##
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala:
##
@@ -666,9 +667,10 @@ abstract class KafkaMicroBatchSourceSuiteBase extends 
KafkaSourceSuiteBase {
 testUtils.sendMessages(topic2, Array("6"))
   },
   StartStream(),
-  ExpectFailure[IllegalStateException](e => {
+  ExpectFailure[SparkException](e => {
+assert(e.asInstanceOf[SparkThrowable].getErrorClass === 
"INTERNAL_ERROR")
 // The offset of `topic2` should be changed from 2 to 1
-assert(e.getMessage.contains("was changed from 2 to 1"))
+assert(e.getCause.getMessage.contains("was changed from 2 to 1"))

Review Comment:
   I'm looking into this test and feel like this should be a user-facing error 
as users can trigger it with valid queries. @HeartSaVioR what do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a diff in pull request #36704: [SPARK-39346][SQL] Convert asserts/illegal state exception to internal errors on each phase

2022-06-01 Thread GitBox


cloud-fan commented on code in PR #36704:
URL: https://github.com/apache/spark/pull/36704#discussion_r886731669


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala:
##
@@ -319,7 +320,8 @@ abstract class StreamExecution(
 // This is a workaround for HADOOP-12074: `Shell.runCommand` converts 
`InterruptedException`
 // to `new IOException(ie.toString())` before Hadoop 2.8.
 updateStatusMessage("Stopped")
-  case e: Throwable =>
+  case t: Throwable =>
+val e = QueryExecution.toInternalError(msg = s"Execution of the stream 
$name failed.", t)

Review Comment:
   do you know why v2 streaming sink doesn't need this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a diff in pull request #36704: [SPARK-39346][SQL] Convert asserts/illegal state exception to internal errors on each phase

2022-05-31 Thread GitBox


cloud-fan commented on code in PR #36704:
URL: https://github.com/apache/spark/pull/36704#discussion_r886278462


##
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala:
##
@@ -666,10 +669,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends 
KafkaSourceSuiteBase {
 testUtils.sendMessages(topic2, Array("6"))
   },
   StartStream(),
-  ExpectFailure[IllegalStateException](e => {
-// The offset of `topic2` should be changed from 2 to 1
-assert(e.getMessage.contains("was changed from 2 to 1"))
-  })
+  expectOffsetChange()

Review Comment:
   well, if we build a product based on Spark, this is still an internal error 
of the product.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a diff in pull request #36704: [SPARK-39346][SQL] Convert asserts/illegal state exception to internal errors on each phase

2022-05-31 Thread GitBox


cloud-fan commented on code in PR #36704:
URL: https://github.com/apache/spark/pull/36704#discussion_r885677872


##
sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala:
##
@@ -486,4 +489,22 @@ object QueryExecution {
 val preparationRules = preparations(session, 
Option(InsertAdaptiveSparkPlan(context)), true)
 prepareForExecution(preparationRules, sparkPlan.clone())
   }
+
+  /**
+   * Catches asserts and illegal state exceptions, and converts them to 
internal errors.
+   */
+  private[sql] def withInternalError[T](msg: String)(block: => T): T = {
+try {
+  block
+} catch {
+  case e: SparkThrowable => throw e
+  case e @ (_: java.lang.IllegalStateException | _: 
java.lang.AssertionError) =>

Review Comment:
   shall we catch a bit more exceptions? I have one in my mind: 
`NullPointerException`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a diff in pull request #36704: [SPARK-39346][SQL] Convert asserts/illegal state exception to internal errors on each phase

2022-05-31 Thread GitBox


cloud-fan commented on code in PR #36704:
URL: https://github.com/apache/spark/pull/36704#discussion_r885675714


##
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala:
##
@@ -1376,6 +1376,13 @@ class KafkaMicroBatchV1SourceSuite extends 
KafkaMicroBatchSourceSuiteBase {
   classOf[KafkaSourceProvider].getCanonicalName)
   }
 
+  override def expectOffsetChange(): ExpectFailure[_] = {
+ExpectFailure[IllegalStateException](e => {

Review Comment:
   can we look into the stacktrace and see if we can catch the internal error 
for streaming v1 as well?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a diff in pull request #36704: [SPARK-39346][SQL] Convert asserts/illegal state exception to internal errors on each phase

2022-05-31 Thread GitBox


cloud-fan commented on code in PR #36704:
URL: https://github.com/apache/spark/pull/36704#discussion_r885674491


##
sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala:
##
@@ -486,4 +489,22 @@ object QueryExecution {
 val preparationRules = preparations(session, 
Option(InsertAdaptiveSparkPlan(context)), true)
 prepareForExecution(preparationRules, sparkPlan.clone())
   }
+
+  /**
+   * Catches asserts and illegal state exceptions, and converts them to 
internal errors.
+   */
+  private[sql] def withInternalError[T](msg: String)(block: => T): T = {
+try {
+  block
+} catch {
+  case e: SparkThrowable => throw e

Review Comment:
   do we need this line?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a diff in pull request #36704: [SPARK-39346][SQL] Convert asserts/illegal state exception to internal errors on each phase

2022-05-31 Thread GitBox


cloud-fan commented on code in PR #36704:
URL: https://github.com/apache/spark/pull/36704#discussion_r885671871


##
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala:
##
@@ -1401,6 +1408,14 @@ class KafkaMicroBatchV1SourceSuite extends 
KafkaMicroBatchSourceSuiteBase {
 
 class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
 
+  override def expectOffsetChange(): ExpectFailure[_] = {
+ExpectFailure[SparkException](e => {

Review Comment:
   ```suggestion
   ExpectFailure[SparkThrowable](e => {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org