[GitHub] spark pull request #17958: [SPARK-20716][SS] StateStore.abort() should not t...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/17958 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17958#discussion_r116296620

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala ---
    @@ -202,13 +203,22 @@ private[state] class HDFSBackedStateStoreProvider(

         /** Abort all the updates made on this store. This store will not be usable any more. */
         override def abort(): Unit = {
           verify(state == UPDATING || state == ABORTED, "Cannot abort after already committed")
    +      try {
    +        state = ABORTED
    +        if (tempDeltaFileStream != null) {
    +          tempDeltaFileStream.close()
    +        }
    +        if (tempDeltaFile != null) {
    +          fs.delete(tempDeltaFile, true)
    +        }
    +      } catch {
    +        case c: ClosedChannelException =>
    +          // This can happen when underlying file output stream has been closed before the
    +          // compression stream.
    +          logDebug(s"Error aborting version $newVersion into $this", c)
    -      state = ABORTED
    -      if (tempDeltaFileStream != null) {
    -        tempDeltaFileStream.close()
    -      }
    -      if (tempDeltaFile != null) {
    -        fs.delete(tempDeltaFile, true)
    +        case e: Exception =>
    +          logWarning(s"Error aborting version $newVersion into $this")
    --- End diff --

    Dumb mistake.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17958#discussion_r116292013

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala ---
    @@ -202,13 +203,22 @@ private[state] class HDFSBackedStateStoreProvider(

         /** Abort all the updates made on this store. This store will not be usable any more. */
         override def abort(): Unit = {
           verify(state == UPDATING || state == ABORTED, "Cannot abort after already committed")
    +      try {
    +        state = ABORTED
    +        if (tempDeltaFileStream != null) {
    +          tempDeltaFileStream.close()
    +        }
    +        if (tempDeltaFile != null) {
    +          fs.delete(tempDeltaFile, true)
    +        }
    +      } catch {
    +        case c: ClosedChannelException =>
    --- End diff --

    Gotcha!
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17958#discussion_r116289648

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala ---
    @@ -202,13 +203,22 @@ private[state] class HDFSBackedStateStoreProvider(

         /** Abort all the updates made on this store. This store will not be usable any more. */
         override def abort(): Unit = {
           verify(state == UPDATING || state == ABORTED, "Cannot abort after already committed")
    +      try {
    +        state = ABORTED
    +        if (tempDeltaFileStream != null) {
    +          tempDeltaFileStream.close()
    +        }
    +        if (tempDeltaFile != null) {
    +          fs.delete(tempDeltaFile, true)
    +        }
    +      } catch {
    +        case c: ClosedChannelException =>
    --- End diff --

    Maybe it should be a `warning`? In this case the task will fail anyway, so outputting a warning hurts nothing, and it will be helpful when we hit other issues.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17958#discussion_r116288688

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala ---
    @@ -202,13 +203,22 @@ private[state] class HDFSBackedStateStoreProvider(

         /** Abort all the updates made on this store. This store will not be usable any more. */
         override def abort(): Unit = {
           verify(state == UPDATING || state == ABORTED, "Cannot abort after already committed")
    +      try {
    +        state = ABORTED
    +        if (tempDeltaFileStream != null) {
    +          tempDeltaFileStream.close()
    +        }
    +        if (tempDeltaFile != null) {
    +          fs.delete(tempDeltaFile, true)
    +        }
    +      } catch {
    +        case c: ClosedChannelException =>
    --- End diff --

    It's debug, though, for the expected case.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17958#discussion_r116288183

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala ---
    @@ -202,13 +203,22 @@ private[state] class HDFSBackedStateStoreProvider(

         /** Abort all the updates made on this store. This store will not be usable any more. */
         override def abort(): Unit = {
           verify(state == UPDATING || state == ABORTED, "Cannot abort after already committed")
    +      try {
    +        state = ABORTED
    +        if (tempDeltaFileStream != null) {
    +          tempDeltaFileStream.close()
    +        }
    +        if (tempDeltaFile != null) {
    +          fs.delete(tempDeltaFile, true)
    +        }
    +      } catch {
    +        case c: ClosedChannelException =>
    --- End diff --

    Why do we need two `case`s? The error message is the same, and the exception is also in the log.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17958#discussion_r116283013

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala ---
    @@ -202,13 +203,22 @@ private[state] class HDFSBackedStateStoreProvider(

         /** Abort all the updates made on this store. This store will not be usable any more. */
         override def abort(): Unit = {
           verify(state == UPDATING || state == ABORTED, "Cannot abort after already committed")
    +      try {
    +        state = ABORTED
    +        if (tempDeltaFileStream != null) {
    +          tempDeltaFileStream.close()
    +        }
    +        if (tempDeltaFile != null) {
    +          fs.delete(tempDeltaFile, true)
    +        }
    +      } catch {
    +        case c: ClosedChannelException =>
    +          // This can happen when underlying file output stream has been closed before the
    +          // compression stream.
    +          logDebug(s"Error aborting version $newVersion into $this", c)
    -      state = ABORTED
    -      if (tempDeltaFileStream != null) {
    -        tempDeltaFileStream.close()
    -      }
    -      if (tempDeltaFile != null) {
    -        fs.delete(tempDeltaFile, true)
    +        case e: Exception =>
    +          logWarning(s"Error aborting version $newVersion into $this")
    --- End diff --

    Include the exception.
GitHub user tdas opened a pull request:

    https://github.com/apache/spark/pull/17958

    [SPARK-20716][SS] StateStore.abort() should not throw exceptions

    ## What changes were proposed in this pull request?

    StateStore.abort() should make a best-effort attempt to clean up temporary resources. It should not throw errors, especially because it's called in a TaskCompletionListener, where an error could hide earlier, real errors in the task.

    ## How was this patch tested?

    No unit test.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tdas/spark SPARK-20716

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/17958.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #17958

commit e10101eafe2329031d079977ab3f3e0aaee98908
Author: Tathagata Das
Date: 2017-05-12T03:21:54Z

    Ignored exceptions
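The pattern the review converges on — do the cleanup inside a `try`, log at debug level for the expected `ClosedChannelException`, log a warning (with the exception) for anything else, and never rethrow — can be sketched outside Spark roughly as follows. This is a hedged, self-contained sketch: `Store`, `failOnClose`, and `cleanup()` are hypothetical stand-ins for illustration, not Spark's actual `HDFSBackedStateStoreProvider` API.

```scala
import java.nio.channels.ClosedChannelException

object BestEffortAbort {
  sealed trait State
  case object Updating extends State
  case object Aborted extends State

  // Hypothetical store: `failOnClose` simulates the underlying file stream
  // having already been closed before the wrapping compression stream.
  final class Store(failOnClose: Boolean) {
    var state: State = Updating

    // Stand-in for closing tempDeltaFileStream / deleting tempDeltaFile.
    private def cleanup(): Unit =
      if (failOnClose) throw new ClosedChannelException()

    def abort(): Unit = {
      try {
        state = Aborted
        cleanup()
      } catch {
        case c: ClosedChannelException =>
          // Expected case: the underlying stream was closed first.
          // A debug-level message suffices; swallow the exception.
          ()
        case e: Exception =>
          // Unexpected case: log a warning including the exception itself,
          // but still do not rethrow — abort() is best-effort and must not
          // mask an earlier task failure in a TaskCompletionListener.
          System.err.println(s"Error aborting store: $e")
      }
    }
  }
}
```

Even when cleanup fails, `abort()` leaves the store in the `Aborted` state and returns normally, which is exactly the behavior the PR description asks for.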