[GitHub] spark pull request #18329: [SPARK-19909][SS] Disabling the usage of a tempor...

2018-07-16 Thread mgaido91
Github user mgaido91 closed the pull request at:

https://github.com/apache/spark/pull/18329


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18329: [SPARK-19909][SS] Disabling the usage of a tempor...

2017-06-23 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/18329#discussion_r123705179
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala 
---
@@ -235,6 +239,21 @@ final class DataStreamWriter[T] private[sql](ds: 
Dataset[T]) {
 "write files of Hive data source directly.")
 }
 
+val hadoopConf = df.sparkSession.sessionState.newHadoopConf()
+val defaultFS = FileSystem.getDefaultUri(hadoopConf).getScheme
+val tmpFS = new URI(System.getProperty("java.io.tmpdir")).getScheme
--- End diff --

The change looks fine to me :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---




[GitHub] spark pull request #18329: [SPARK-19909][SS] Disabling the usage of a tempor...

2017-06-23 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/18329#discussion_r123702984
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala 
---
@@ -235,6 +239,21 @@ final class DataStreamWriter[T] private[sql](ds: 
Dataset[T]) {
 "write files of Hive data source directly.")
 }
 
+val hadoopConf = df.sparkSession.sessionState.newHadoopConf()
+val defaultFS = FileSystem.getDefaultUri(hadoopConf).getScheme
+val tmpFS = new URI(System.getProperty("java.io.tmpdir")).getScheme
--- End diff --

@jerryshao thanks for the hints, I made all the changes you pointed out.





[GitHub] spark pull request #18329: [SPARK-19909][SS] Disabling the usage of a tempor...

2017-06-23 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/18329#discussion_r123698342
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala 
---
@@ -235,6 +239,21 @@ final class DataStreamWriter[T] private[sql](ds: 
Dataset[T]) {
 "write files of Hive data source directly.")
 }
 
+val hadoopConf = df.sparkSession.sessionState.newHadoopConf()
+val defaultFS = FileSystem.getDefaultUri(hadoopConf).getScheme
+val tmpFS = new URI(System.getProperty("java.io.tmpdir")).getScheme
--- End diff --

Can you please use `Utils.resolveURI()` instead?
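For context, a hedged sketch of why this matters, using only `java.net.URI` (the `resolveScheme` helper is illustrative, not Spark's actual `Utils.resolveURI`): a bare local path has no URI scheme, so `new URI(...).getScheme` returns `null`, whereas a resolver that falls back to the local filesystem yields `file`.

```scala
import java.io.File
import java.net.URI

// Illustrative helper (hypothetical name) mimicking the idea behind
// Utils.resolveURI: scheme-less paths are treated as local files.
def resolveScheme(path: String): String = {
  val uri = new URI(path)
  if (uri.getScheme != null) uri.getScheme
  else new File(path).toURI.getScheme // "file"
}

// A raw URI built from a bare path has no scheme at all:
assert(new URI("/tmp").getScheme == null)
assert(resolveScheme("/tmp") == "file")
assert(resolveScheme("hdfs:///tmp") == "hdfs")
```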





[GitHub] spark pull request #18329: [SPARK-19909][SS] Disabling the usage of a tempor...

2017-06-23 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/18329#discussion_r123698373
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala 
---
@@ -17,17 +17,21 @@
 
 package org.apache.spark.sql.streaming
 
+import java.net.URI
 import java.util.Locale
 
 import scala.collection.JavaConverters._
 
+import org.apache.hadoop.fs.FileSystem
+
 import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.sql.{AnalysisException, Dataset, ForeachWriter}
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.streaming.{ForeachSink, MemoryPlan, 
MemorySink}
 
+
--- End diff --

Remove this blank line.





[GitHub] spark pull request #18329: [SPARK-19909][SS] Disabling the usage of a tempor...

2017-06-23 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/18329#discussion_r123698894
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -553,6 +554,32 @@ class StreamSuite extends StreamTest {
 }
   }
 
+  test("SPARK-19909: if the checkpoint location is not set and the default 
filesystem " +
+"is different from the java.io.tmp one an AnalysisException should be 
thrown") {
+
+val defaultFS = spark.conf.getOption(FileSystem.FS_DEFAULT_NAME_KEY)
+spark.conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs:///")
+
+val input = MemoryStream[Int]
+var streamingQuery: StreamingQuery = null
+input.addData(1)
+try {
+  intercept[AnalysisException](
+streamingQuery = input.toDF().writeStream.format("console").start()
+  )
+} finally {
+  if (streamingQuery ne null) {
--- End diff --

Can you please change this to `streamingQuery != null`? Spark seldom uses `ne`; we'd better follow the convention.
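As a quick runnable aside (plain Scala, nothing Spark-specific): for null checks on references, `ne` (reference inequality) and `!=` (null-safe, `equals`-based inequality) behave identically, so the suggestion is purely about following the codebase's convention.

```scala
val absent: String = null
val present: String = "query"

// Both operators agree on null checks; `!=` is simply the conventional form.
assert((absent ne null) == (absent != null))   // both false
assert((present ne null) && (present != null)) // both true
```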





[GitHub] spark pull request #18329: [SPARK-19909][SS] Disabling the usage of a tempor...

2017-06-22 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/18329#discussion_r123458885
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala 
---
@@ -235,6 +239,21 @@ final class DataStreamWriter[T] private[sql](ds: 
Dataset[T]) {
 "write files of Hive data source directly.")
 }
 
+val hadoopConf = df.sparkSession.sessionState.newHadoopConf()
+val defaultFS = FileSystem.getDefaultUri(hadoopConf).getScheme
+val tmpFS = new URI(System.getProperty("java.io.tmpdir")).getScheme
+
+val isTempCheckpointLocationAvailable = tmpFS match {
+  case null | "file" =>
+if (defaultFS == null || defaultFS.equals("file")) {
+  true
+} else {
+  false
+}
+  case defaultFS => true
--- End diff --

I think this situation is unlikely, but if it happens I guess it is not bad to handle it this way, i.e. leave everything as it was before the change.





[GitHub] spark pull request #18329: [SPARK-19909][SS] Disabling the usage of a tempor...

2017-06-22 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/18329#discussion_r123455655
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala 
---
@@ -264,12 +281,12 @@ final class DataStreamWriter[T] private[sql](ds: 
Dataset[T]) {
 df,
 sink,
 outputMode,
-useTempCheckpointLocation = true,
+useTempCheckpointLocation = isTempCheckpointLocationAvailable,
 trigger = trigger)
 } else {
   val (useTempCheckpointLocation, recoverFromCheckpointLocation) =
 if (source == "console") {
-  (true, false)
+  (isTempCheckpointLocationAvailable, false)
--- End diff --

Yes, you are right. If they are different, you currently get the hard-to-understand `PermissionDenied` exception, which gives no hint about how to solve the problem.





[GitHub] spark pull request #18329: [SPARK-19909][SS] Disabling the usage of a tempor...

2017-06-22 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/18329#discussion_r123454797
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala 
---
@@ -235,6 +239,21 @@ final class DataStreamWriter[T] private[sql](ds: 
Dataset[T]) {
 "write files of Hive data source directly.")
 }
 
+val hadoopConf = df.sparkSession.sessionState.newHadoopConf()
+val defaultFS = FileSystem.getDefaultUri(hadoopConf).getScheme
+val tmpFS = new URI(System.getProperty("java.io.tmpdir")).getScheme
+
+val isTempCheckpointLocationAvailable = tmpFS match {
+  case null | "file" =>
+if (defaultFS == null || defaultFS.equals("file")) {
+  true
+} else {
+  false
+}
+  case defaultFS => true
--- End diff --

Then shall we rule this out?





[GitHub] spark pull request #18329: [SPARK-19909][SS] Disabling the usage of a tempor...

2017-06-22 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/18329#discussion_r123453429
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala 
---
@@ -264,12 +281,12 @@ final class DataStreamWriter[T] private[sql](ds: 
Dataset[T]) {
 df,
 sink,
 outputMode,
-useTempCheckpointLocation = true,
+useTempCheckpointLocation = isTempCheckpointLocationAvailable,
 trigger = trigger)
 } else {
   val (useTempCheckpointLocation, recoverFromCheckpointLocation) =
 if (source == "console") {
-  (true, false)
+  (isTempCheckpointLocationAvailable, false)
--- End diff --

So your proposal is that a `TempCheckpointLocation` is meaningful only when `defaultFs` and `tmpFs` are both the local FS, am I understanding right?





[GitHub] spark pull request #18329: [SPARK-19909][SS] Disabling the usage of a tempor...

2017-06-22 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/18329#discussion_r123451639
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala 
---
@@ -235,6 +239,21 @@ final class DataStreamWriter[T] private[sql](ds: 
Dataset[T]) {
 "write files of Hive data source directly.")
 }
 
+val hadoopConf = df.sparkSession.sessionState.newHadoopConf()
+val defaultFS = FileSystem.getDefaultUri(hadoopConf).getScheme
+val tmpFS = new URI(System.getProperty("java.io.tmpdir")).getScheme
+
+val isTempCheckpointLocationAvailable = tmpFS match {
+  case null | "file" =>
+if (defaultFS == null || defaultFS.equals("file")) {
+  true
+} else {
+  false
+}
+  case defaultFS => true
--- End diff --

Yes, but I think this situation is unlikely. Most likely `tmpFs` will always be the local filesystem.





[GitHub] spark pull request #18329: [SPARK-19909][SS] Disabling the usage of a tempor...

2017-06-22 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/18329#discussion_r123448787
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala 
---
@@ -235,6 +239,21 @@ final class DataStreamWriter[T] private[sql](ds: 
Dataset[T]) {
 "write files of Hive data source directly.")
 }
 
+val hadoopConf = df.sparkSession.sessionState.newHadoopConf()
+val defaultFS = FileSystem.getDefaultUri(hadoopConf).getScheme
+val tmpFS = new URI(System.getProperty("java.io.tmpdir")).getScheme
+
+val isTempCheckpointLocationAvailable = tmpFS match {
+  case null | "file" =>
+if (defaultFS == null || defaultFS.equals("file")) {
+  true
+} else {
+  false
+}
+  case defaultFS => true
--- End diff --

So @mgaido91, based on the logic here, if `tmpFs` and `defaultFs` are both 
HDFS, this will return `true`, right?





[GitHub] spark pull request #18329: [SPARK-19909][SS] Disabling the usage of a tempor...

2017-06-22 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/18329#discussion_r123445514
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala 
---
@@ -264,12 +281,12 @@ final class DataStreamWriter[T] private[sql](ds: 
Dataset[T]) {
 df,
 sink,
 outputMode,
-useTempCheckpointLocation = true,
+useTempCheckpointLocation = isTempCheckpointLocationAvailable,
 trigger = trigger)
 } else {
   val (useTempCheckpointLocation, recoverFromCheckpointLocation) =
 if (source == "console") {
-  (true, false)
+  (isTempCheckpointLocationAvailable, false)
--- End diff --

Well, actually I don't think `java.io.tmpdir` will ever be on a filesystem different from the local one. But the other PR forces the metadata to be written on the local filesystem even when the default one is different (for instance, HDFS).
This means that in a distributed environment, which should be fault tolerant, with that patch we lose the metadata if a node fails. Since one of the sinks involved is the `foreach` one, which can be used to write the data somewhere (for example HBase or Kafka), I think that forcing the user to specify a `checkpointLocation` that is created on the `defaultFs` (in this case HDFS) would be a better option.





[GitHub] spark pull request #18329: [SPARK-19909][SS] Disabling the usage of a tempor...

2017-06-22 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/18329#discussion_r123442164
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala 
---
@@ -264,12 +281,12 @@ final class DataStreamWriter[T] private[sql](ds: 
Dataset[T]) {
 df,
 sink,
 outputMode,
-useTempCheckpointLocation = true,
+useTempCheckpointLocation = isTempCheckpointLocationAvailable,
 trigger = trigger)
 } else {
   val (useTempCheckpointLocation, recoverFromCheckpointLocation) =
 if (source == "console") {
-  (true, false)
+  (isTempCheckpointLocationAvailable, false)
--- End diff --

Thanks for the clarification. I saw there's another PR that fixed this issue by forcing the tmp dir to be a local dir. But in your case the tmp dir can also be an HDFS folder, since it is defined by "java.io.tmpdir"; do you mean that the user will override this system property?





[GitHub] spark pull request #18329: [SPARK-19909][SS] Disabling the usage of a tempor...

2017-06-22 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/18329#discussion_r123440321
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala 
---
@@ -17,8 +17,10 @@
 
 package org.apache.spark.sql.streaming
 
+import java.net.URI
 import java.util.Locale
 
+import org.apache.hadoop.fs.FileSystem
--- End diff --

@jerryshao thanks for your comment, my IDE showed it fine, I don't know 
why. I'm fixing it, thanks.





[GitHub] spark pull request #18329: [SPARK-19909][SS] Disabling the usage of a tempor...

2017-06-22 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/18329#discussion_r123440074
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala 
---
@@ -264,12 +281,12 @@ final class DataStreamWriter[T] private[sql](ds: 
Dataset[T]) {
 df,
 sink,
 outputMode,
-useTempCheckpointLocation = true,
+useTempCheckpointLocation = isTempCheckpointLocationAvailable,
 trigger = trigger)
 } else {
   val (useTempCheckpointLocation, recoverFromCheckpointLocation) =
 if (source == "console") {
-  (true, false)
+  (isTempCheckpointLocationAvailable, false)
--- End diff --

@jerryshao whether `useTempCheckpointLocation` is `true` or `false` is still based on the type of `Sink`, but there is an additional constraint: the `defaultFs` must be the same as the `tmpFs`. This is the reason for this patch. Otherwise, Spark tries to create the metadata, whose path is based on the checkpoint directory, on the `defaultFs`.
If that is different from the `tmpFs`, you get a `PermissionDenied` exception, which is actually caused by the fact that the temp dir is created on the `tmpFs` and not on the `defaultFs`. With this PR, instead, you get an `AnalysisException` telling you to set a `checkpointLocation`, which is a much more meaningful exception, easy to understand and to fix.
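A hedged sketch of the rule being described (an illustrative standalone function, not the exact patch). Note that in the quoted diff, `case defaultFS => true` binds a fresh pattern variable that shadows the outer `defaultFS`, so it matches any non-local `tmpFS`; the logic below mirrors that behavior explicitly:

```scala
// A temp checkpoint dir is usable when tmpFS is local only if defaultFS is
// also local; a non-local tmpFS (unlikely in practice) keeps the old behavior.
def isTempCheckpointLocationAvailable(defaultFS: String, tmpFS: String): Boolean = {
  val tmpIsLocal = tmpFS == null || tmpFS == "file"
  val defaultIsLocal = defaultFS == null || defaultFS == "file"
  !tmpIsLocal || defaultIsLocal
}

assert(isTempCheckpointLocationAvailable(null, null))      // everything local
assert(!isTempCheckpointLocationAvailable("hdfs", "file")) // mismatch: disable
assert(isTempCheckpointLocationAvailable("hdfs", "hdfs"))  // old behavior kept
```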





[GitHub] spark pull request #18329: [SPARK-19909][SS] Disabling the usage of a tempor...

2017-06-21 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/18329#discussion_r123418793
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala 
---
@@ -264,12 +281,12 @@ final class DataStreamWriter[T] private[sql](ds: 
Dataset[T]) {
 df,
 sink,
 outputMode,
-useTempCheckpointLocation = true,
+useTempCheckpointLocation = isTempCheckpointLocationAvailable,
 trigger = trigger)
 } else {
   val (useTempCheckpointLocation, recoverFromCheckpointLocation) =
 if (source == "console") {
-  (true, false)
+  (isTempCheckpointLocationAvailable, false)
--- End diff --

@mgaido91 AFAIK whether `useTempCheckpointLocation` is `true` or `false` is based on the type of `Sink`; with your change, the semantics now also depend on whether `tmpFs` equals `defaultFs` or `defaultFs` is the local FS. So it looks like the semantics are different now; I'm not sure if it is a valid fix.

@zsxwing would you please help to review this patch? Thanks!





[GitHub] spark pull request #18329: [SPARK-19909][SS] Disabling the usage of a tempor...

2017-06-21 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/18329#discussion_r123417606
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala 
---
@@ -235,6 +237,21 @@ final class DataStreamWriter[T] private[sql](ds: 
Dataset[T]) {
 "write files of Hive data source directly.")
 }
 
+val hadoopConf = df.sparkSession.sessionState.newHadoopConf()
+val defaultFS = FileSystem.getDefaultUri(hadoopConf).getScheme
+val tmpFS = new URI(System.getProperty("java.io.tmpdir")).getScheme
+
+val isTempCheckpointLocationAvailable = tmpFS match {
+  case null | "file" =>
+if (defaultFS == null || defaultFS.equals("file")) {
+  true
+} else {
+  false
+}
+  case defaultFS => true
--- End diff --

@mgaido91, if `defaultFS` is HDFS, in which scenario would `tmpFS` ("java.io.tmpdir") be the same as `defaultFS`? From my understanding, unless we change the "java.io.tmpdir" property, `tmpFs` will always be the local FS.





[GitHub] spark pull request #18329: [SPARK-19909][SS] Disabling the usage of a tempor...

2017-06-21 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/18329#discussion_r123416837
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala 
---
@@ -17,8 +17,10 @@
 
 package org.apache.spark.sql.streaming
 
+import java.net.URI
 import java.util.Locale
 
+import org.apache.hadoop.fs.FileSystem
--- End diff --

The import ordering is not correct: third-party packages should be placed after the Scala packages. You can verify the style locally.





[GitHub] spark pull request #18329: [SPARK-19909][SS] Disabling the usage of a tempor...

2017-06-16 Thread mgaido91
GitHub user mgaido91 opened a pull request:

https://github.com/apache/spark/pull/18329

[SPARK-19909][SS] Disabling the usage of a temporary directory for the 
checkpoint location if the temporary directory is on a filesystem different 
from the default one.

## What changes were proposed in this pull request?

As stated in [SPARK-19909](https://issues.apache.org/jira/browse/SPARK-19909), if the `checkpointLocation` is not set, for some formats Spark falls back to using a directory under `java.io.tmpdir` to store the metadata. This fails with a confusing exception (permission denied) if the default filesystem is different from the temp one. This is the case if, for instance, the default filesystem is HDFS and the temp directory is on the local filesystem.

The change proposed in this PR disables the usage of the temp directory in this case. Thus a meaningful `AnalysisException` is thrown, suggesting the proper fix to the user (setting a value for `checkpointLocation`), instead of a `PermissionDenied` exception that is hard to understand and fix.

This behavior is consistent with what happens in all the cases in which the 
`checkpointLocation` needs to be set.
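To illustrate the fix from the user's side (a sketch; the HDFS path and query setup are hypothetical, and an existing streaming DataFrame `df` is assumed), explicitly setting a `checkpointLocation` on the default filesystem avoids the temp-dir fallback entirely:

```scala
// Sketch only: assumes a streaming DataFrame `df`; the path is illustrative.
val query = df.writeStream
  .format("console")
  .option("checkpointLocation", "hdfs:///user/alice/checkpoints/myQuery")
  .start()
```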

## How was this patch tested?

A unit test was added to check the patch and all the unit tests have been 
run.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mgaido91/spark master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/18329.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #18329


commit 1fc4a954c08296ed24ca90b7dde0f1cfaedf572f
Author: Marco Gaido 
Date:   2017-06-16T14:22:25Z

[SPARK-19909][SS] Disabling the usage of a temporary directory for the 
checkpoint location if the temporary directory is on a filesystem different 
from the default one.



