[GitHub] spark pull request #17958: [SPARK-20716][SS] StateStore.abort() should not t...

2017-05-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/17958


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17958: [SPARK-20716][SS] StateStore.abort() should not t...

2017-05-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/17958#discussion_r116296620
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala ---
@@ -202,13 +203,22 @@ private[state] class HDFSBackedStateStoreProvider(
     /** Abort all the updates made on this store. This store will not be usable any more. */
     override def abort(): Unit = {
       verify(state == UPDATING || state == ABORTED, "Cannot abort after already committed")
+      try {
+        state = ABORTED
+        if (tempDeltaFileStream != null) {
+          tempDeltaFileStream.close()
+        }
+        if (tempDeltaFile != null) {
+          fs.delete(tempDeltaFile, true)
+        }
+      } catch {
+        case c: ClosedChannelException =>
+          // This can happen when underlying file output stream has been closed before the
+          // compression stream.
+          logDebug(s"Error aborting version $newVersion into $this", c)

-      state = ABORTED
-      if (tempDeltaFileStream != null) {
-        tempDeltaFileStream.close()
-      }
-      if (tempDeltaFile != null) {
-        fs.delete(tempDeltaFile, true)
+        case e: Exception =>
+          logWarning(s"Error aborting version $newVersion into $this")
--- End diff --

Dumb mistake.





[GitHub] spark pull request #17958: [SPARK-20716][SS] StateStore.abort() should not t...

2017-05-12 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/17958#discussion_r116292013
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala ---
@@ -202,13 +203,22 @@ private[state] class HDFSBackedStateStoreProvider(
     /** Abort all the updates made on this store. This store will not be usable any more. */
     override def abort(): Unit = {
       verify(state == UPDATING || state == ABORTED, "Cannot abort after already committed")
+      try {
+        state = ABORTED
+        if (tempDeltaFileStream != null) {
+          tempDeltaFileStream.close()
+        }
+        if (tempDeltaFile != null) {
+          fs.delete(tempDeltaFile, true)
+        }
+      } catch {
+        case c: ClosedChannelException =>
--- End diff --

Gotcha!





[GitHub] spark pull request #17958: [SPARK-20716][SS] StateStore.abort() should not t...

2017-05-12 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/17958#discussion_r116289648
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala ---
@@ -202,13 +203,22 @@ private[state] class HDFSBackedStateStoreProvider(
     /** Abort all the updates made on this store. This store will not be usable any more. */
     override def abort(): Unit = {
       verify(state == UPDATING || state == ABORTED, "Cannot abort after already committed")
+      try {
+        state = ABORTED
+        if (tempDeltaFileStream != null) {
+          tempDeltaFileStream.close()
+        }
+        if (tempDeltaFile != null) {
+          fs.delete(tempDeltaFile, true)
+        }
+      } catch {
+        case c: ClosedChannelException =>
--- End diff --

Maybe it should be a `warning`? In this case the task will fail anyway, so it hurts nothing to output a warning, and it will be helpful when we hit other issues.





[GitHub] spark pull request #17958: [SPARK-20716][SS] StateStore.abort() should not t...

2017-05-12 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/17958#discussion_r116288688
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala ---
@@ -202,13 +203,22 @@ private[state] class HDFSBackedStateStoreProvider(
     /** Abort all the updates made on this store. This store will not be usable any more. */
     override def abort(): Unit = {
       verify(state == UPDATING || state == ABORTED, "Cannot abort after already committed")
+      try {
+        state = ABORTED
+        if (tempDeltaFileStream != null) {
+          tempDeltaFileStream.close()
+        }
+        if (tempDeltaFile != null) {
+          fs.delete(tempDeltaFile, true)
+        }
+      } catch {
+        case c: ClosedChannelException =>
--- End diff --

It's debug, though, for the expected case.





[GitHub] spark pull request #17958: [SPARK-20716][SS] StateStore.abort() should not t...

2017-05-12 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/17958#discussion_r116288183
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala ---
@@ -202,13 +203,22 @@ private[state] class HDFSBackedStateStoreProvider(
     /** Abort all the updates made on this store. This store will not be usable any more. */
     override def abort(): Unit = {
       verify(state == UPDATING || state == ABORTED, "Cannot abort after already committed")
+      try {
+        state = ABORTED
+        if (tempDeltaFileStream != null) {
+          tempDeltaFileStream.close()
+        }
+        if (tempDeltaFile != null) {
+          fs.delete(tempDeltaFile, true)
+        }
+      } catch {
+        case c: ClosedChannelException =>
--- End diff --

Why do we need two `case`s? The error message is the same, and the exception is also in the log.
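The point of keeping two `case`s is only to pick different log levels: `ClosedChannelException` is the expected outcome of aborting after the underlying stream was closed, so it goes to debug, while anything else is unexpected and deserves a warning. A minimal sketch of that discrimination (the helper name is hypothetical, not from the PR):

```scala
import java.nio.channels.ClosedChannelException

// Hypothetical helper: choose a log level based on whether the failure is the
// expected one (stream already closed) or something genuinely unexpected.
object AbortLogLevel {
  def levelFor(e: Exception): String = e match {
    case _: ClosedChannelException => "DEBUG" // expected during best-effort abort
    case _                         => "WARN"  // unexpected; attach the exception to the log
  }
}
```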





[GitHub] spark pull request #17958: [SPARK-20716][SS] StateStore.abort() should not t...

2017-05-12 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/17958#discussion_r116283013
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala ---
@@ -202,13 +203,22 @@ private[state] class HDFSBackedStateStoreProvider(
     /** Abort all the updates made on this store. This store will not be usable any more. */
     override def abort(): Unit = {
       verify(state == UPDATING || state == ABORTED, "Cannot abort after already committed")
+      try {
+        state = ABORTED
+        if (tempDeltaFileStream != null) {
+          tempDeltaFileStream.close()
+        }
+        if (tempDeltaFile != null) {
+          fs.delete(tempDeltaFile, true)
+        }
+      } catch {
+        case c: ClosedChannelException =>
+          // This can happen when underlying file output stream has been closed before the
+          // compression stream.
+          logDebug(s"Error aborting version $newVersion into $this", c)

-      state = ABORTED
-      if (tempDeltaFileStream != null) {
-        tempDeltaFileStream.close()
-      }
-      if (tempDeltaFile != null) {
-        fs.delete(tempDeltaFile, true)
+        case e: Exception =>
+          logWarning(s"Error aborting version $newVersion into $this")
--- End diff --

Include the exception.





[GitHub] spark pull request #17958: [SPARK-20716][SS] StateStore.abort() should not t...

2017-05-11 Thread tdas
GitHub user tdas opened a pull request:

https://github.com/apache/spark/pull/17958

[SPARK-20716][SS] StateStore.abort() should not throw exceptions

## What changes were proposed in this pull request?

StateStore.abort() should make a best-effort attempt to clean up temporary resources. It should not throw errors, especially since it is called from a TaskCompletionListener, where a thrown error could hide earlier, real errors in the task.
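The best-effort pattern described above can be sketched in isolation. This is a simplified, hypothetical model (not the actual HDFSBackedStateStoreProvider code): `close` and `delete` stand in for the real stream and file cleanup, and every failure is logged rather than rethrown, so an abort() running in a completion listener cannot mask the task's original error.

```scala
import java.nio.channels.ClosedChannelException

// Hypothetical sketch of best-effort abort: cleanup failures are caught and
// logged instead of propagated. `close`/`delete` model the real cleanup steps
// (e.g. tempDeltaFileStream.close() and fs.delete(tempDeltaFile, true)).
object BestEffortAbort {
  def abort(close: () => Unit, delete: () => Unit, log: String => Unit): Unit = {
    try {
      close()
      delete()
    } catch {
      case _: ClosedChannelException =>
        // Expected: the underlying file stream was closed before the compression stream.
        log("debug: temp delta stream already closed")
      case e: Exception =>
        // Unexpected: log a warning with the exception, but still do not rethrow.
        log(s"warn: error aborting state store: ${e.getMessage}")
    }
  }
}
```

Whatever the cleanup throws, the caller of abort() never sees an exception; only a log line differs between the expected and unexpected paths.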

## How was this patch tested?
No unit test.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tdas/spark SPARK-20716

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17958.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17958


commit e10101eafe2329031d079977ab3f3e0aaee98908
Author: Tathagata Das 
Date:   2017-05-12T03:21:54Z

Ignored exceptions



