[GitHub] [spark] cloud-fan commented on a diff in pull request #36704: [SPARK-39346][SQL] Convert asserts/illegal state exception to internal errors on each phase
cloud-fan commented on code in PR #36704:
URL: https://github.com/apache/spark/pull/36704#discussion_r891876268

## connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala: ##
@@ -666,9 +667,10 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
         testUtils.sendMessages(topic2, Array("6"))
       },
       StartStream(),
-      ExpectFailure[IllegalStateException](e => {
+      ExpectFailure[SparkException](e => {
+        assert(e.asInstanceOf[SparkThrowable].getErrorClass === "INTERNAL_ERROR")
         // The offset of `topic2` should be changed from 2 to 1
-        assert(e.getMessage.contains("was changed from 2 to 1"))
+        assert(e.getCause.getMessage.contains("was changed from 2 to 1"))

Review Comment:
The error is simply wrapped, and the original error is kept as the cause (the "Caused by" in the stack trace), so this is not a big problem. The bigger problem is how to define internal errors. We picked some common Java exceptions that can be treated as internal errors: when end users see these errors from Spark, it indicates a bug. For third-party Spark plugins, should all errors from them count as internal errors? I think so: end users can do nothing to work around these errors.

cc @srielau

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
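To make the first point of this comment concrete: wrapping keeps the original exception reachable through `getCause`, which is why the updated test switches from `e.getMessage` to `e.getCause.getMessage`. The Scala sketch below is illustrative only; `HasErrorClass` (a stand-in for `SparkThrowable`), `InternalErrorException`, and `failOnOffsetChange` are hypothetical names, not Spark APIs, and the exception message is made up apart from the "was changed from 2 to 1" fragment quoted in the test.

```scala
// Hypothetical stand-in for Spark's SparkThrowable; only the error-class accessor is modeled.
trait HasErrorClass { def getErrorClass: String }

// Hypothetical internal-error wrapper: the original exception becomes the JVM cause.
final class InternalErrorException(msg: String, cause: Throwable)
    extends RuntimeException(msg, cause) with HasErrorClass {
  override def getErrorClass: String = "INTERNAL_ERROR"
}

// Simulates a source that detects an offset moving backwards (message text is illustrative).
def failOnOffsetChange(): Nothing =
  throw new IllegalStateException("The offset of topic2 was changed from 2 to 1")

// Wrap the low-level exception instead of rethrowing it as-is.
val wrapped: Throwable =
  try failOnOffsetChange()
  catch { case e: IllegalStateException => new InternalErrorException("Stream execution failed.", e) }

// The wrapper carries the error class; the original message is still reachable via getCause.
println(wrapped.asInstanceOf[HasErrorClass].getErrorClass) // INTERNAL_ERROR
println(wrapped.getCause.getMessage)                       // The offset of topic2 was changed from 2 to 1
```

Nothing is lost by wrapping; only the place where a test or a caller has to look for the original message changes.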
cloud-fan commented on code in PR #36704:
URL: https://github.com/apache/spark/pull/36704#discussion_r891840233

## connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala: ##
@@ -666,9 +667,10 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
         testUtils.sendMessages(topic2, Array("6"))
       },
       StartStream(),
-      ExpectFailure[IllegalStateException](e => {
+      ExpectFailure[SparkException](e => {
+        assert(e.asInstanceOf[SparkThrowable].getErrorClass === "INTERNAL_ERROR")
         // The offset of `topic2` should be changed from 2 to 1
-        assert(e.getMessage.contains("was changed from 2 to 1"))
+        assert(e.getCause.getMessage.contains("was changed from 2 to 1"))

Review Comment:
If Spark is a simple Java library, then this is fine. The caller should handle these errors properly when building a product.
cloud-fan commented on code in PR #36704:
URL: https://github.com/apache/spark/pull/36704#discussion_r891839935

## connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala: ##
@@ -666,9 +667,10 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
         testUtils.sendMessages(topic2, Array("6"))
       },
       StartStream(),
-      ExpectFailure[IllegalStateException](e => {
+      ExpectFailure[SparkException](e => {
+        assert(e.asInstanceOf[SparkThrowable].getErrorClass === "INTERNAL_ERROR")
         // The offset of `topic2` should be changed from 2 to 1
-        assert(e.getMessage.contains("was changed from 2 to 1"))
+        assert(e.getCause.getMessage.contains("was changed from 2 to 1"))

Review Comment:
> Signals that a method has been invoked at an illegal or inappropriate time.

This means a bug in any product, right? We should not expose these low-level errors to end users; the same goes for NPE, index-out-of-bounds errors, etc.
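A minimal sketch of the classification this comment argues for, assuming the rule is simply "these JVM exception types signal a bug". `looksLikeBug` is a hypothetical helper, and the exact set of types is an assumption drawn from the exceptions named in this thread (`IllegalStateException`, `AssertionError`, NPE, index out of bounds); `IllegalArgumentException` is left out on the assumption that it often reflects bad input rather than a bug.

```scala
// Hypothetical classifier: JVM exception types that, by convention, indicate a bug
// in the code that threw them rather than a problem the end user can fix.
def looksLikeBug(t: Throwable): Boolean = t match {
  case _: IllegalStateException     => true // "method invoked at an illegal or inappropriate time"
  case _: AssertionError            => true // a violated internal invariant
  case _: NullPointerException      => true
  case _: IndexOutOfBoundsException => true // also covers ArrayIndexOutOfBoundsException
  case _                            => false
}

val samples: Seq[Throwable] = Seq(
  new IllegalStateException("called at the wrong time"),
  new ArrayIndexOutOfBoundsException(3),
  new NullPointerException(),
  new IllegalArgumentException("bad option value"))

// One line per sample: the exception's simple class name and whether it is treated as a bug.
samples.foreach(t => println(s"${t.getClass.getSimpleName}: ${looksLikeBug(t)}"))
```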
cloud-fan commented on code in PR #36704:
URL: https://github.com/apache/spark/pull/36704#discussion_r891224222

## connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala: ##
@@ -666,9 +667,10 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
         testUtils.sendMessages(topic2, Array("6"))
       },
       StartStream(),
-      ExpectFailure[IllegalStateException](e => {
+      ExpectFailure[SparkException](e => {
+        assert(e.asInstanceOf[SparkThrowable].getErrorClass === "INTERNAL_ERROR")
         // The offset of `topic2` should be changed from 2 to 1
-        assert(e.getMessage.contains("was changed from 2 to 1"))
+        assert(e.getCause.getMessage.contains("was changed from 2 to 1"))

Review Comment:
It's a convention in Spark (and many other Java programs) that `IllegalStateException` means a bug. If this is not a bug, I think the streaming code should not throw `IllegalStateException` in this case.
cloud-fan commented on code in PR #36704:
URL: https://github.com/apache/spark/pull/36704#discussion_r891099418

## connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala: ##
@@ -666,9 +667,10 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
         testUtils.sendMessages(topic2, Array("6"))
       },
       StartStream(),
-      ExpectFailure[IllegalStateException](e => {
+      ExpectFailure[SparkException](e => {
+        assert(e.asInstanceOf[SparkThrowable].getErrorClass === "INTERNAL_ERROR")
         // The offset of `topic2` should be changed from 2 to 1
-        assert(e.getMessage.contains("was changed from 2 to 1"))
+        assert(e.getCause.getMessage.contains("was changed from 2 to 1"))

Review Comment:
I'm looking into this test and feel like this should be a user-facing error, as users can trigger it with valid queries. @HeartSaVioR what do you think?
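If the offset change is something users can hit with valid queries, the alternative to `IllegalStateException` is an error with its own error class. This is a hypothetical sketch: `OffsetChangedException`, `checkOffset`, and the `KAFKA_OFFSET_CHANGED` error class are invented for illustration and are not part of Spark; `HasErrorClass` stands in for `SparkThrowable`.

```scala
// Hypothetical stand-in for Spark's SparkThrowable.
trait HasErrorClass { def getErrorClass: String }

// Hypothetical user-facing error; "KAFKA_OFFSET_CHANGED" is an invented error class name.
final class OffsetChangedException(topicPartition: String, from: Long, to: Long)
    extends RuntimeException(s"The offset of $topicPartition was changed from $from to $to.")
    with HasErrorClass {
  override def getErrorClass: String = "KAFKA_OFFSET_CHANGED"
}

// A condition that valid queries can reach is reported with a dedicated error class,
// rather than as IllegalStateException, which by convention means a bug.
def checkOffset(topicPartition: String, previous: Long, current: Long): Unit =
  if (current < previous) throw new OffsetChangedException(topicPartition, previous, current)

checkOffset("topic2-0", 1L, 2L) // offsets moving forward pass silently
```

The design point is the question raised in the comment: "can a user trigger this with a valid query?" decides between a user-facing error class and an internal error, independently of which JVM exception type happens to be convenient at the throw site.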
cloud-fan commented on code in PR #36704:
URL: https://github.com/apache/spark/pull/36704#discussion_r886731669

## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala: ##
@@ -319,7 +320,8 @@ abstract class StreamExecution(
         // This is a workaround for HADOOP-12074: `Shell.runCommand` converts `InterruptedException`
         // to `new IOException(ie.toString())` before Hadoop 2.8.
         updateStatusMessage("Stopped")
-      case e: Throwable =>
+      case t: Throwable =>
+        val e = QueryExecution.toInternalError(msg = s"Execution of the stream $name failed.", t)

Review Comment:
Do you know why the v2 streaming sink doesn't need this?
cloud-fan commented on code in PR #36704:
URL: https://github.com/apache/spark/pull/36704#discussion_r886278462

## connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala: ##
@@ -666,10 +669,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
         testUtils.sendMessages(topic2, Array("6"))
       },
       StartStream(),
-      ExpectFailure[IllegalStateException](e => {
-        // The offset of `topic2` should be changed from 2 to 1
-        assert(e.getMessage.contains("was changed from 2 to 1"))
-      })
+      expectOffsetChange()

Review Comment:
Well, if we build a product based on Spark, this is still an internal error of the product.
cloud-fan commented on code in PR #36704:
URL: https://github.com/apache/spark/pull/36704#discussion_r885677872

## sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala: ##
@@ -486,4 +489,22 @@ object QueryExecution {
     val preparationRules = preparations(session, Option(InsertAdaptiveSparkPlan(context)), true)
     prepareForExecution(preparationRules, sparkPlan.clone())
   }
+
+  /**
+   * Catches asserts and illegal state exceptions, and converts them to internal errors.
+   */
+  private[sql] def withInternalError[T](msg: String)(block: => T): T = {
+    try {
+      block
+    } catch {
+      case e: SparkThrowable => throw e
+      case e @ (_: java.lang.IllegalStateException | _: java.lang.AssertionError) =>

Review Comment:
Shall we catch a few more exception types? I have one in mind: `NullPointerException`.
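A minimal sketch of `withInternalError` with `NullPointerException` added to the caught types, assuming the rest of the helper follows the diff above. `HasErrorClass` and `InternalErrorException` are hypothetical stand-ins for `SparkThrowable` and for whatever internal-error exception Spark actually constructs; the diff is cut off before that part, so the wrapping shown here is an assumption.

```scala
// Hypothetical stand-in for Spark's SparkThrowable.
trait HasErrorClass { def getErrorClass: String }

// Hypothetical internal-error wrapper that keeps the original exception as its cause.
final class InternalErrorException(msg: String, cause: Throwable)
    extends RuntimeException(msg, cause) with HasErrorClass {
  override def getErrorClass: String = "INTERNAL_ERROR"
}

// Sketch of the helper under review, with NullPointerException added to the caught types.
def withInternalError[T](msg: String)(block: => T): T =
  try block
  catch {
    case e: HasErrorClass => throw e // already classified: rethrow unchanged
    case e @ (_: IllegalStateException | _: AssertionError | _: NullPointerException) =>
      throw new InternalErrorException(msg, e)
    // Anything else (e.g. IllegalArgumentException) propagates untouched.
  }

// A null dereference inside the block now surfaces as an internal error.
val maybeNull: String = null
val result: Throwable =
  try { withInternalError("Query planning failed.")(maybeNull.length); null }
  catch { case t: Throwable => t }
println(result.asInstanceOf[HasErrorClass].getErrorClass) // INTERNAL_ERROR
println(result.getCause.getClass.getSimpleName)           // NullPointerException
```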
cloud-fan commented on code in PR #36704:
URL: https://github.com/apache/spark/pull/36704#discussion_r885675714

## connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala: ##
@@ -1376,6 +1376,13 @@ class KafkaMicroBatchV1SourceSuite extends KafkaMicroBatchSourceSuiteBase {
         classOf[KafkaSourceProvider].getCanonicalName)
   }
+  override def expectOffsetChange(): ExpectFailure[_] = {
+    ExpectFailure[IllegalStateException](e => {

Review Comment:
Can we look into the stack trace and see if we can catch the internal error for streaming v1 as well?
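One way to do that investigation from a test is to walk the cause chain and print where the first `IllegalStateException` was raised, which shows whether the v1 path ever passes through a wrapping helper. Both `findInCauseChain` and `thrownAt` are hypothetical helpers written for this sketch; they use only the standard `Throwable.getCause` and `Throwable.getStackTrace` methods.

```scala
// Hypothetical test helper: the first throwable in the cause chain (starting at `t`
// itself) that satisfies `p`. Cause chains are short, so plain recursion is fine.
def findInCauseChain(t: Throwable)(p: Throwable => Boolean): Option[Throwable] =
  if (t == null) None
  else if (p(t)) Some(t)
  else findInCauseChain(t.getCause)(p)

// Hypothetical helper: the top stack frame of `t`, formatted as "Class.method".
def thrownAt(t: Throwable): Option[String] =
  t.getStackTrace.headOption.map(f => s"${f.getClassName}.${f.getMethodName}")

// Simulated failure: an IllegalStateException buried under a generic wrapper.
val root = new IllegalStateException("offset was changed from 2 to 1")
val failure = new RuntimeException("Query terminated with exception", root)

// Prints the class and method in which `root` was created.
findInCauseChain(failure)(_.isInstanceOf[IllegalStateException])
  .flatMap(thrownAt)
  .foreach(println)
```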
cloud-fan commented on code in PR #36704:
URL: https://github.com/apache/spark/pull/36704#discussion_r885674491

## sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala: ##
@@ -486,4 +489,22 @@ object QueryExecution {
     val preparationRules = preparations(session, Option(InsertAdaptiveSparkPlan(context)), true)
     prepareForExecution(preparationRules, sparkPlan.clone())
   }
+
+  /**
+   * Catches asserts and illegal state exceptions, and converts them to internal errors.
+   */
+  private[sql] def withInternalError[T](msg: String)(block: => T): T = {
+    try {
+      block
+    } catch {
+      case e: SparkThrowable => throw e

Review Comment:
Do we need this line?
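Whether that line is needed depends on whether any `SparkThrowable` can also be an `IllegalStateException` or `AssertionError`. If one can, the first case stops an already-classified error from being re-wrapped as an internal error and losing its error class; if none can, the line is redundant. The sketch below toggles the case with a flag to compare both behaviors; `HasErrorClass` (standing in for `SparkThrowable`), `ClassifiedStateException`, its error class, `InternalErrorException`, and `convert` are all hypothetical.

```scala
// Hypothetical stand-in for Spark's SparkThrowable.
trait HasErrorClass { def getErrorClass: String }

// Hypothetical exception that is already classified AND is an IllegalStateException.
final class ClassifiedStateException(msg: String)
    extends IllegalStateException(msg) with HasErrorClass {
  override def getErrorClass: String = "SOME_ERROR_CLASS" // invented error class
}

// Hypothetical internal-error wrapper.
final class InternalErrorException(msg: String, cause: Throwable)
    extends RuntimeException(msg, cause) with HasErrorClass {
  override def getErrorClass: String = "INTERNAL_ERROR"
}

// `keepClassified` toggles the line under discussion so both behaviors can be compared.
def convert(t: Throwable, keepClassified: Boolean): Throwable = t match {
  case e: HasErrorClass if keepClassified => e
  case e @ (_: IllegalStateException | _: AssertionError) => new InternalErrorException("failed", e)
  case other => other
}

val classified = new ClassifiedStateException("already has an error class")
println(convert(classified, keepClassified = true).asInstanceOf[HasErrorClass].getErrorClass)  // SOME_ERROR_CLASS
println(convert(classified, keepClassified = false).asInstanceOf[HasErrorClass].getErrorClass) // INTERNAL_ERROR
```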
cloud-fan commented on code in PR #36704:
URL: https://github.com/apache/spark/pull/36704#discussion_r885671871

## connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala: ##
@@ -1401,6 +1408,14 @@ class KafkaMicroBatchV1SourceSuite extends KafkaMicroBatchSourceSuiteBase {
 class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
+  override def expectOffsetChange(): ExpectFailure[_] = {
+    ExpectFailure[SparkException](e => {

Review Comment:
```suggestion
    ExpectFailure[SparkThrowable](e => {
```
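The suggestion asserts on the interface rather than on one concrete class. A ClassTag-based runtime check shows the difference: checking for a concrete class rejects other error-classed exceptions, while checking for the trait accepts any implementation. `failureMatches` is a hypothetical helper (not Spark's `ExpectFailure`), and `HasErrorClass`, `CheckedFlavor`, and `RuntimeFlavor` are invented stand-ins for `SparkThrowable` and two unrelated classes implementing it.

```scala
import scala.reflect.ClassTag

// Hypothetical stand-in for Spark's SparkThrowable.
trait HasErrorClass { def getErrorClass: String }

// Two hypothetical exception types that both carry an error class but share no
// concrete superclass other than Exception.
final class CheckedFlavor(msg: String) extends Exception(msg) with HasErrorClass {
  override def getErrorClass: String = "INTERNAL_ERROR"
}
final class RuntimeFlavor(msg: String) extends RuntimeException(msg) with HasErrorClass {
  override def getErrorClass: String = "INTERNAL_ERROR"
}

// Hypothetical runtime type check driven by the ClassTag of the expected failure type.
def failureMatches[T: ClassTag](t: Throwable): Boolean =
  implicitly[ClassTag[T]].runtimeClass.isInstance(t)

val failures: Seq[Throwable] = Seq(new CheckedFlavor("a"), new RuntimeFlavor("b"))
println(failures.map(t => failureMatches[CheckedFlavor](t))) // List(true, false)
println(failures.map(t => failureMatches[HasErrorClass](t))) // List(true, true)
```

Asserting on the trait keeps the test valid if the failure is later raised by a different error-classed exception type.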