[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user sujith71955 commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r216771752 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -303,94 +303,44 @@ case class LoadDataCommand( s"partitioned, but a partition spec was provided.") } } - -val loadPath = +val loadPath = { if (isLocal) { -val uri = Utils.resolveURI(path) -val file = new File(uri.getPath) -val exists = if (file.getAbsolutePath.contains("*")) { - val fileSystem = FileSystems.getDefault - val dir = file.getParentFile.getAbsolutePath - if (dir.contains("*")) { -throw new AnalysisException( - s"LOAD DATA input path allows only filename wildcard: $path") - } - - // Note that special characters such as "*" on Windows are not allowed as a path. - // Calling `WindowsFileSystem.getPath` throws an exception if there are in the path. - val dirPath = fileSystem.getPath(dir) - val pathPattern = new File(dirPath.toAbsolutePath.toString, file.getName).toURI.getPath - val safePathPattern = if (Utils.isWindows) { -// On Windows, the pattern should not start with slashes for absolute file paths. 
-pathPattern.stripPrefix("/") - } else { -pathPattern - } - val files = new File(dir).listFiles() - if (files == null) { -false - } else { -val matcher = fileSystem.getPathMatcher("glob:" + safePathPattern) -files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath))) - } -} else { - new File(file.getAbsolutePath).exists() -} -if (!exists) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") -} -uri +val localFS = FileContext.getLocalFSFileContext() +makeQualified(FsConstants.LOCAL_FS_URI, localFS.getWorkingDirectory(), new Path(path)) } else { -val uri = new URI(path) -val hdfsUri = if (uri.getScheme() != null && uri.getAuthority() != null) { - uri -} else { - // Follow Hive's behavior: - // If no schema or authority is provided with non-local inpath, - // we will use hadoop configuration "fs.defaultFS". - val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS") - val defaultFS = if (defaultFSConf == null) { -new URI("") - } else { -new URI(defaultFSConf) - } - - val scheme = if (uri.getScheme() != null) { -uri.getScheme() - } else { -defaultFS.getScheme() - } - val authority = if (uri.getAuthority() != null) { -uri.getAuthority() - } else { -defaultFS.getAuthority() - } - - if (scheme == null) { -throw new AnalysisException( - s"LOAD DATA: URI scheme is required for non-local input paths: '$path'") - } - - // Follow Hive's behavior: - // If LOCAL is not specified, and the path is relative, - // then the path is interpreted relative to "/user/" - val uriPath = uri.getPath() - val absolutePath = if (uriPath != null && uriPath.startsWith("/")) { -uriPath - } else { -s"/user/${System.getProperty("user.name")}/$uriPath" - } - new URI(scheme, authority, absolutePath, uri.getQuery(), uri.getFragment()) -} -val hadoopConf = sparkSession.sessionState.newHadoopConf() -val srcPath = new Path(hdfsUri) -val fs = srcPath.getFileSystem(hadoopConf) -if (!fs.exists(srcPath)) { - throw new AnalysisException(s"LOAD 
DATA input path does not exist: $path") -} -hdfsUri +val loadPath = new Path(path) --- End diff -- sure Sean. Thanks --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r216732765 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -303,94 +303,44 @@ case class LoadDataCommand( s"partitioned, but a partition spec was provided.") } } - -val loadPath = +val loadPath = { if (isLocal) { -val uri = Utils.resolveURI(path) -val file = new File(uri.getPath) -val exists = if (file.getAbsolutePath.contains("*")) { - val fileSystem = FileSystems.getDefault - val dir = file.getParentFile.getAbsolutePath - if (dir.contains("*")) { -throw new AnalysisException( - s"LOAD DATA input path allows only filename wildcard: $path") - } - - // Note that special characters such as "*" on Windows are not allowed as a path. - // Calling `WindowsFileSystem.getPath` throws an exception if there are in the path. - val dirPath = fileSystem.getPath(dir) - val pathPattern = new File(dirPath.toAbsolutePath.toString, file.getName).toURI.getPath - val safePathPattern = if (Utils.isWindows) { -// On Windows, the pattern should not start with slashes for absolute file paths. -pathPattern.stripPrefix("/") - } else { -pathPattern - } - val files = new File(dir).listFiles() - if (files == null) { -false - } else { -val matcher = fileSystem.getPathMatcher("glob:" + safePathPattern) -files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath))) - } -} else { - new File(file.getAbsolutePath).exists() -} -if (!exists) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") -} -uri +val localFS = FileContext.getLocalFSFileContext() +makeQualified(FsConstants.LOCAL_FS_URI, localFS.getWorkingDirectory(), new Path(path)) } else { -val uri = new URI(path) -val hdfsUri = if (uri.getScheme() != null && uri.getAuthority() != null) { - uri -} else { - // Follow Hive's behavior: - // If no schema or authority is provided with non-local inpath, - // we will use hadoop configuration "fs.defaultFS". 
- val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS") - val defaultFS = if (defaultFSConf == null) { -new URI("") - } else { -new URI(defaultFSConf) - } - - val scheme = if (uri.getScheme() != null) { -uri.getScheme() - } else { -defaultFS.getScheme() - } - val authority = if (uri.getAuthority() != null) { -uri.getAuthority() - } else { -defaultFS.getAuthority() - } - - if (scheme == null) { -throw new AnalysisException( - s"LOAD DATA: URI scheme is required for non-local input paths: '$path'") - } - - // Follow Hive's behavior: - // If LOCAL is not specified, and the path is relative, - // then the path is interpreted relative to "/user/" - val uriPath = uri.getPath() - val absolutePath = if (uriPath != null && uriPath.startsWith("/")) { -uriPath - } else { -s"/user/${System.getProperty("user.name")}/$uriPath" - } - new URI(scheme, authority, absolutePath, uri.getQuery(), uri.getFragment()) -} -val hadoopConf = sparkSession.sessionState.newHadoopConf() -val srcPath = new Path(hdfsUri) -val fs = srcPath.getFileSystem(hadoopConf) -if (!fs.exists(srcPath)) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") -} -hdfsUri +val loadPath = new Path(path) --- End diff -- OK. If so then there isn't a case here that we know of where something worked, and now doesn't. That's good. This change in behavior is a positive consequence of the wildcard change then. A summary of this change too should be in the release notes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user sujith71955 commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r216728326 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -303,94 +303,44 @@ case class LoadDataCommand( s"partitioned, but a partition spec was provided.") } } - -val loadPath = +val loadPath = { if (isLocal) { -val uri = Utils.resolveURI(path) -val file = new File(uri.getPath) -val exists = if (file.getAbsolutePath.contains("*")) { - val fileSystem = FileSystems.getDefault - val dir = file.getParentFile.getAbsolutePath - if (dir.contains("*")) { -throw new AnalysisException( - s"LOAD DATA input path allows only filename wildcard: $path") - } - - // Note that special characters such as "*" on Windows are not allowed as a path. - // Calling `WindowsFileSystem.getPath` throws an exception if there are in the path. - val dirPath = fileSystem.getPath(dir) - val pathPattern = new File(dirPath.toAbsolutePath.toString, file.getName).toURI.getPath - val safePathPattern = if (Utils.isWindows) { -// On Windows, the pattern should not start with slashes for absolute file paths. 
-pathPattern.stripPrefix("/") - } else { -pathPattern - } - val files = new File(dir).listFiles() - if (files == null) { -false - } else { -val matcher = fileSystem.getPathMatcher("glob:" + safePathPattern) -files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath))) - } -} else { - new File(file.getAbsolutePath).exists() -} -if (!exists) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") -} -uri +val localFS = FileContext.getLocalFSFileContext() +makeQualified(FsConstants.LOCAL_FS_URI, localFS.getWorkingDirectory(), new Path(path)) } else { -val uri = new URI(path) -val hdfsUri = if (uri.getScheme() != null && uri.getAuthority() != null) { - uri -} else { - // Follow Hive's behavior: - // If no schema or authority is provided with non-local inpath, - // we will use hadoop configuration "fs.defaultFS". - val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS") - val defaultFS = if (defaultFSConf == null) { -new URI("") - } else { -new URI(defaultFSConf) - } - - val scheme = if (uri.getScheme() != null) { -uri.getScheme() - } else { -defaultFS.getScheme() - } - val authority = if (uri.getAuthority() != null) { -uri.getAuthority() - } else { -defaultFS.getAuthority() - } - - if (scheme == null) { -throw new AnalysisException( - s"LOAD DATA: URI scheme is required for non-local input paths: '$path'") - } - - // Follow Hive's behavior: - // If LOCAL is not specified, and the path is relative, - // then the path is interpreted relative to "/user/" - val uriPath = uri.getPath() - val absolutePath = if (uriPath != null && uriPath.startsWith("/")) { -uriPath - } else { -s"/user/${System.getProperty("user.name")}/$uriPath" - } - new URI(scheme, authority, absolutePath, uri.getQuery(), uri.getFragment()) -} -val hadoopConf = sparkSession.sessionState.newHadoopConf() -val srcPath = new Path(hdfsUri) -val fs = srcPath.getFileSystem(hadoopConf) -if (!fs.exists(srcPath)) { - throw new AnalysisException(s"LOAD 
DATA input path does not exist: $path") -} -hdfsUri +val loadPath = new Path(path) --- End diff -- Spark before change: "test file.csv" does not work - yes "test%20file.csv" works? or fails to load? I see two statements here: the load command is successful, but no data is shown while querying, which means the use-case is not working properly. Spark after change: "test file.csv" works - Yes "test%20file.csv" does not work - Yes Hive behaviour "test file.csv" works - Yes "test%20file.csv" does not work - Yes Our new changes are in line with Hive. --- ---
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r216703781 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -303,94 +303,44 @@ case class LoadDataCommand( s"partitioned, but a partition spec was provided.") } } - -val loadPath = +val loadPath = { if (isLocal) { -val uri = Utils.resolveURI(path) -val file = new File(uri.getPath) -val exists = if (file.getAbsolutePath.contains("*")) { - val fileSystem = FileSystems.getDefault - val dir = file.getParentFile.getAbsolutePath - if (dir.contains("*")) { -throw new AnalysisException( - s"LOAD DATA input path allows only filename wildcard: $path") - } - - // Note that special characters such as "*" on Windows are not allowed as a path. - // Calling `WindowsFileSystem.getPath` throws an exception if there are in the path. - val dirPath = fileSystem.getPath(dir) - val pathPattern = new File(dirPath.toAbsolutePath.toString, file.getName).toURI.getPath - val safePathPattern = if (Utils.isWindows) { -// On Windows, the pattern should not start with slashes for absolute file paths. -pathPattern.stripPrefix("/") - } else { -pathPattern - } - val files = new File(dir).listFiles() - if (files == null) { -false - } else { -val matcher = fileSystem.getPathMatcher("glob:" + safePathPattern) -files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath))) - } -} else { - new File(file.getAbsolutePath).exists() -} -if (!exists) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") -} -uri +val localFS = FileContext.getLocalFSFileContext() +makeQualified(FsConstants.LOCAL_FS_URI, localFS.getWorkingDirectory(), new Path(path)) } else { -val uri = new URI(path) -val hdfsUri = if (uri.getScheme() != null && uri.getAuthority() != null) { - uri -} else { - // Follow Hive's behavior: - // If no schema or authority is provided with non-local inpath, - // we will use hadoop configuration "fs.defaultFS". 
- val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS") - val defaultFS = if (defaultFSConf == null) { -new URI("") - } else { -new URI(defaultFSConf) - } - - val scheme = if (uri.getScheme() != null) { -uri.getScheme() - } else { -defaultFS.getScheme() - } - val authority = if (uri.getAuthority() != null) { -uri.getAuthority() - } else { -defaultFS.getAuthority() - } - - if (scheme == null) { -throw new AnalysisException( - s"LOAD DATA: URI scheme is required for non-local input paths: '$path'") - } - - // Follow Hive's behavior: - // If LOCAL is not specified, and the path is relative, - // then the path is interpreted relative to "/user/" - val uriPath = uri.getPath() - val absolutePath = if (uriPath != null && uriPath.startsWith("/")) { -uriPath - } else { -s"/user/${System.getProperty("user.name")}/$uriPath" - } - new URI(scheme, authority, absolutePath, uri.getQuery(), uri.getFragment()) -} -val hadoopConf = sparkSession.sessionState.newHadoopConf() -val srcPath = new Path(hdfsUri) -val fs = srcPath.getFileSystem(hadoopConf) -if (!fs.exists(srcPath)) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") -} -hdfsUri +val loadPath = new Path(path) --- End diff -- Let's break this down: Spark before change: - "test\ file.csv" does not work - "test%20file.csv" works? or fails to load? I see two statements here Spark after change: - "test\ file.csv" works - "test%20file.csv" does not work What is Hive's behavior? I assume it's the latter? If so, then I think this change is properly viewed as a bug fix. But if "test%20file.csv" succeeded before, it's more complicated: inconsistent with Hive, but worked. --- --
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user sujith71955 commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r216694466 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -303,94 +303,44 @@ case class LoadDataCommand( s"partitioned, but a partition spec was provided.") } } - -val loadPath = +val loadPath = { if (isLocal) { -val uri = Utils.resolveURI(path) -val file = new File(uri.getPath) -val exists = if (file.getAbsolutePath.contains("*")) { - val fileSystem = FileSystems.getDefault - val dir = file.getParentFile.getAbsolutePath - if (dir.contains("*")) { -throw new AnalysisException( - s"LOAD DATA input path allows only filename wildcard: $path") - } - - // Note that special characters such as "*" on Windows are not allowed as a path. - // Calling `WindowsFileSystem.getPath` throws an exception if there are in the path. - val dirPath = fileSystem.getPath(dir) - val pathPattern = new File(dirPath.toAbsolutePath.toString, file.getName).toURI.getPath - val safePathPattern = if (Utils.isWindows) { -// On Windows, the pattern should not start with slashes for absolute file paths. 
-pathPattern.stripPrefix("/") - } else { -pathPattern - } - val files = new File(dir).listFiles() - if (files == null) { -false - } else { -val matcher = fileSystem.getPathMatcher("glob:" + safePathPattern) -files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath))) - } -} else { - new File(file.getAbsolutePath).exists() -} -if (!exists) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") -} -uri +val localFS = FileContext.getLocalFSFileContext() +makeQualified(FsConstants.LOCAL_FS_URI, localFS.getWorkingDirectory(), new Path(path)) } else { -val uri = new URI(path) -val hdfsUri = if (uri.getScheme() != null && uri.getAuthority() != null) { - uri -} else { - // Follow Hive's behavior: - // If no schema or authority is provided with non-local inpath, - // we will use hadoop configuration "fs.defaultFS". - val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS") - val defaultFS = if (defaultFSConf == null) { -new URI("") - } else { -new URI(defaultFSConf) - } - - val scheme = if (uri.getScheme() != null) { -uri.getScheme() - } else { -defaultFS.getScheme() - } - val authority = if (uri.getAuthority() != null) { -uri.getAuthority() - } else { -defaultFS.getAuthority() - } - - if (scheme == null) { -throw new AnalysisException( - s"LOAD DATA: URI scheme is required for non-local input paths: '$path'") - } - - // Follow Hive's behavior: - // If LOCAL is not specified, and the path is relative, - // then the path is interpreted relative to "/user/" - val uriPath = uri.getPath() - val absolutePath = if (uriPath != null && uriPath.startsWith("/")) { -uriPath - } else { -s"/user/${System.getProperty("user.name")}/$uriPath" - } - new URI(scheme, authority, absolutePath, uri.getQuery(), uri.getFragment()) -} -val hadoopConf = sparkSession.sessionState.newHadoopConf() -val srcPath = new Path(hdfsUri) -val fs = srcPath.getFileSystem(hadoopConf) -if (!fs.exists(srcPath)) { - throw new AnalysisException(s"LOAD 
DATA input path does not exist: $path") -} -hdfsUri +val loadPath = new Path(path) --- End diff -- gatorsmile is asking for this change to be mentioned in the Migration guide --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user sujith71955 commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r216693375 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -303,94 +303,44 @@ case class LoadDataCommand( s"partitioned, but a partition spec was provided.") } } - -val loadPath = +val loadPath = { if (isLocal) { -val uri = Utils.resolveURI(path) -val file = new File(uri.getPath) -val exists = if (file.getAbsolutePath.contains("*")) { - val fileSystem = FileSystems.getDefault - val dir = file.getParentFile.getAbsolutePath - if (dir.contains("*")) { -throw new AnalysisException( - s"LOAD DATA input path allows only filename wildcard: $path") - } - - // Note that special characters such as "*" on Windows are not allowed as a path. - // Calling `WindowsFileSystem.getPath` throws an exception if there are in the path. - val dirPath = fileSystem.getPath(dir) - val pathPattern = new File(dirPath.toAbsolutePath.toString, file.getName).toURI.getPath - val safePathPattern = if (Utils.isWindows) { -// On Windows, the pattern should not start with slashes for absolute file paths. 
-pathPattern.stripPrefix("/") - } else { -pathPattern - } - val files = new File(dir).listFiles() - if (files == null) { -false - } else { -val matcher = fileSystem.getPathMatcher("glob:" + safePathPattern) -files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath))) - } -} else { - new File(file.getAbsolutePath).exists() -} -if (!exists) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") -} -uri +val localFS = FileContext.getLocalFSFileContext() +makeQualified(FsConstants.LOCAL_FS_URI, localFS.getWorkingDirectory(), new Path(path)) } else { -val uri = new URI(path) -val hdfsUri = if (uri.getScheme() != null && uri.getAuthority() != null) { - uri -} else { - // Follow Hive's behavior: - // If no schema or authority is provided with non-local inpath, - // we will use hadoop configuration "fs.defaultFS". - val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS") - val defaultFS = if (defaultFSConf == null) { -new URI("") - } else { -new URI(defaultFSConf) - } - - val scheme = if (uri.getScheme() != null) { -uri.getScheme() - } else { -defaultFS.getScheme() - } - val authority = if (uri.getAuthority() != null) { -uri.getAuthority() - } else { -defaultFS.getAuthority() - } - - if (scheme == null) { -throw new AnalysisException( - s"LOAD DATA: URI scheme is required for non-local input paths: '$path'") - } - - // Follow Hive's behavior: - // If LOCAL is not specified, and the path is relative, - // then the path is interpreted relative to "/user/" - val uriPath = uri.getPath() - val absolutePath = if (uriPath != null && uriPath.startsWith("/")) { -uriPath - } else { -s"/user/${System.getProperty("user.name")}/$uriPath" - } - new URI(scheme, authority, absolutePath, uri.getQuery(), uri.getFragment()) -} -val hadoopConf = sparkSession.sessionState.newHadoopConf() -val srcPath = new Path(hdfsUri) -val fs = srcPath.getFileSystem(hadoopConf) -if (!fs.exists(srcPath)) { - throw new AnalysisException(s"LOAD 
DATA input path does not exist: $path") -} -hdfsUri +val loadPath = new Path(path) --- End diff -- I mean in older versions compared to the latest --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user sujith71955 commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r216693154 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -303,94 +303,44 @@ case class LoadDataCommand( s"partitioned, but a partition spec was provided.") } } - -val loadPath = +val loadPath = { if (isLocal) { -val uri = Utils.resolveURI(path) -val file = new File(uri.getPath) -val exists = if (file.getAbsolutePath.contains("*")) { - val fileSystem = FileSystems.getDefault - val dir = file.getParentFile.getAbsolutePath - if (dir.contains("*")) { -throw new AnalysisException( - s"LOAD DATA input path allows only filename wildcard: $path") - } - - // Note that special characters such as "*" on Windows are not allowed as a path. - // Calling `WindowsFileSystem.getPath` throws an exception if there are in the path. - val dirPath = fileSystem.getPath(dir) - val pathPattern = new File(dirPath.toAbsolutePath.toString, file.getName).toURI.getPath - val safePathPattern = if (Utils.isWindows) { -// On Windows, the pattern should not start with slashes for absolute file paths. 
-pathPattern.stripPrefix("/") - } else { -pathPattern - } - val files = new File(dir).listFiles() - if (files == null) { -false - } else { -val matcher = fileSystem.getPathMatcher("glob:" + safePathPattern) -files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath))) - } -} else { - new File(file.getAbsolutePath).exists() -} -if (!exists) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") -} -uri +val localFS = FileContext.getLocalFSFileContext() +makeQualified(FsConstants.LOCAL_FS_URI, localFS.getWorkingDirectory(), new Path(path)) } else { -val uri = new URI(path) -val hdfsUri = if (uri.getScheme() != null && uri.getAuthority() != null) { - uri -} else { - // Follow Hive's behavior: - // If no schema or authority is provided with non-local inpath, - // we will use hadoop configuration "fs.defaultFS". - val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS") - val defaultFS = if (defaultFSConf == null) { -new URI("") - } else { -new URI(defaultFSConf) - } - - val scheme = if (uri.getScheme() != null) { -uri.getScheme() - } else { -defaultFS.getScheme() - } - val authority = if (uri.getAuthority() != null) { -uri.getAuthority() - } else { -defaultFS.getAuthority() - } - - if (scheme == null) { -throw new AnalysisException( - s"LOAD DATA: URI scheme is required for non-local input paths: '$path'") - } - - // Follow Hive's behavior: - // If LOCAL is not specified, and the path is relative, - // then the path is interpreted relative to "/user/" - val uriPath = uri.getPath() - val absolutePath = if (uriPath != null && uriPath.startsWith("/")) { -uriPath - } else { -s"/user/${System.getProperty("user.name")}/$uriPath" - } - new URI(scheme, authority, absolutePath, uri.getQuery(), uri.getFragment()) -} -val hadoopConf = sparkSession.sessionState.newHadoopConf() -val srcPath = new Path(hdfsUri) -val fs = srcPath.getFileSystem(hadoopConf) -if (!fs.exists(srcPath)) { - throw new AnalysisException(s"LOAD 
DATA input path does not exist: $path") -} -hdfsUri +val loadPath = new Path(path) --- End diff -- If we give %20 it will work fine, but this is the expected behavior as per our current code. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user sujith71955 commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r216692677 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -303,94 +303,44 @@ case class LoadDataCommand( s"partitioned, but a partition spec was provided.") } } - -val loadPath = +val loadPath = { if (isLocal) { -val uri = Utils.resolveURI(path) -val file = new File(uri.getPath) -val exists = if (file.getAbsolutePath.contains("*")) { - val fileSystem = FileSystems.getDefault - val dir = file.getParentFile.getAbsolutePath - if (dir.contains("*")) { -throw new AnalysisException( - s"LOAD DATA input path allows only filename wildcard: $path") - } - - // Note that special characters such as "*" on Windows are not allowed as a path. - // Calling `WindowsFileSystem.getPath` throws an exception if there are in the path. - val dirPath = fileSystem.getPath(dir) - val pathPattern = new File(dirPath.toAbsolutePath.toString, file.getName).toURI.getPath - val safePathPattern = if (Utils.isWindows) { -// On Windows, the pattern should not start with slashes for absolute file paths. 
-pathPattern.stripPrefix("/") - } else { -pathPattern - } - val files = new File(dir).listFiles() - if (files == null) { -false - } else { -val matcher = fileSystem.getPathMatcher("glob:" + safePathPattern) -files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath))) - } -} else { - new File(file.getAbsolutePath).exists() -} -if (!exists) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") -} -uri +val localFS = FileContext.getLocalFSFileContext() +makeQualified(FsConstants.LOCAL_FS_URI, localFS.getWorkingDirectory(), new Path(path)) } else { -val uri = new URI(path) -val hdfsUri = if (uri.getScheme() != null && uri.getAuthority() != null) { - uri -} else { - // Follow Hive's behavior: - // If no schema or authority is provided with non-local inpath, - // we will use hadoop configuration "fs.defaultFS". - val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS") - val defaultFS = if (defaultFSConf == null) { -new URI("") - } else { -new URI(defaultFSConf) - } - - val scheme = if (uri.getScheme() != null) { -uri.getScheme() - } else { -defaultFS.getScheme() - } - val authority = if (uri.getAuthority() != null) { -uri.getAuthority() - } else { -defaultFS.getAuthority() - } - - if (scheme == null) { -throw new AnalysisException( - s"LOAD DATA: URI scheme is required for non-local input paths: '$path'") - } - - // Follow Hive's behavior: - // If LOCAL is not specified, and the path is relative, - // then the path is interpreted relative to "/user/" - val uriPath = uri.getPath() - val absolutePath = if (uriPath != null && uriPath.startsWith("/")) { -uriPath - } else { -s"/user/${System.getProperty("user.name")}/$uriPath" - } - new URI(scheme, authority, absolutePath, uri.getQuery(), uri.getFragment()) -} -val hadoopConf = sparkSession.sessionState.newHadoopConf() -val srcPath = new Path(hdfsUri) -val fs = srcPath.getFileSystem(hadoopConf) -if (!fs.exists(srcPath)) { - throw new AnalysisException(s"LOAD 
DATA input path does not exist: $path") -} -hdfsUri +val loadPath = new Path(path) --- End diff -- Nope, we need to give %20 as the space representation in older versions ![image](https://user-images.githubusercontent.com/12999161/45367012-e057bd80-b5fd-11e8-8956-896e31c2bb78.png) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r216690734 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -303,94 +303,44 @@ case class LoadDataCommand( s"partitioned, but a partition spec was provided.") } } - -val loadPath = +val loadPath = { if (isLocal) { -val uri = Utils.resolveURI(path) -val file = new File(uri.getPath) -val exists = if (file.getAbsolutePath.contains("*")) { - val fileSystem = FileSystems.getDefault - val dir = file.getParentFile.getAbsolutePath - if (dir.contains("*")) { -throw new AnalysisException( - s"LOAD DATA input path allows only filename wildcard: $path") - } - - // Note that special characters such as "*" on Windows are not allowed as a path. - // Calling `WindowsFileSystem.getPath` throws an exception if there are in the path. - val dirPath = fileSystem.getPath(dir) - val pathPattern = new File(dirPath.toAbsolutePath.toString, file.getName).toURI.getPath - val safePathPattern = if (Utils.isWindows) { -// On Windows, the pattern should not start with slashes for absolute file paths. -pathPattern.stripPrefix("/") - } else { -pathPattern - } - val files = new File(dir).listFiles() - if (files == null) { -false - } else { -val matcher = fileSystem.getPathMatcher("glob:" + safePathPattern) -files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath))) - } -} else { - new File(file.getAbsolutePath).exists() -} -if (!exists) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") -} -uri +val localFS = FileContext.getLocalFSFileContext() +makeQualified(FsConstants.LOCAL_FS_URI, localFS.getWorkingDirectory(), new Path(path)) } else { -val uri = new URI(path) -val hdfsUri = if (uri.getScheme() != null && uri.getAuthority() != null) { - uri -} else { - // Follow Hive's behavior: - // If no schema or authority is provided with non-local inpath, - // we will use hadoop configuration "fs.defaultFS". 
- val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS") - val defaultFS = if (defaultFSConf == null) { -new URI("") - } else { -new URI(defaultFSConf) - } - - val scheme = if (uri.getScheme() != null) { -uri.getScheme() - } else { -defaultFS.getScheme() - } - val authority = if (uri.getAuthority() != null) { -uri.getAuthority() - } else { -defaultFS.getAuthority() - } - - if (scheme == null) { -throw new AnalysisException( - s"LOAD DATA: URI scheme is required for non-local input paths: '$path'") - } - - // Follow Hive's behavior: - // If LOCAL is not specified, and the path is relative, - // then the path is interpreted relative to "/user/" - val uriPath = uri.getPath() - val absolutePath = if (uriPath != null && uriPath.startsWith("/")) { -uriPath - } else { -s"/user/${System.getProperty("user.name")}/$uriPath" - } - new URI(scheme, authority, absolutePath, uri.getQuery(), uri.getFragment()) -} -val hadoopConf = sparkSession.sessionState.newHadoopConf() -val srcPath = new Path(hdfsUri) -val fs = srcPath.getFileSystem(hadoopConf) -if (!fs.exists(srcPath)) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") -} -hdfsUri +val loadPath = new Path(path) --- End diff -- Would loading '/tmp/dir1/test\ file.csv' succeed before this change? What about adding a quick test for a case like this too? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user sujith71955 commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r216685077 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -303,94 +303,44 @@ case class LoadDataCommand( s"partitioned, but a partition spec was provided.") } } - -val loadPath = +val loadPath = { if (isLocal) { -val uri = Utils.resolveURI(path) -val file = new File(uri.getPath) -val exists = if (file.getAbsolutePath.contains("*")) { - val fileSystem = FileSystems.getDefault - val dir = file.getParentFile.getAbsolutePath - if (dir.contains("*")) { -throw new AnalysisException( - s"LOAD DATA input path allows only filename wildcard: $path") - } - - // Note that special characters such as "*" on Windows are not allowed as a path. - // Calling `WindowsFileSystem.getPath` throws an exception if there are in the path. - val dirPath = fileSystem.getPath(dir) - val pathPattern = new File(dirPath.toAbsolutePath.toString, file.getName).toURI.getPath - val safePathPattern = if (Utils.isWindows) { -// On Windows, the pattern should not start with slashes for absolute file paths. 
-pathPattern.stripPrefix("/") - } else { -pathPattern - } - val files = new File(dir).listFiles() - if (files == null) { -false - } else { -val matcher = fileSystem.getPathMatcher("glob:" + safePathPattern) -files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath))) - } -} else { - new File(file.getAbsolutePath).exists() -} -if (!exists) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") -} -uri +val localFS = FileContext.getLocalFSFileContext() +makeQualified(FsConstants.LOCAL_FS_URI, localFS.getWorkingDirectory(), new Path(path)) } else { -val uri = new URI(path) -val hdfsUri = if (uri.getScheme() != null && uri.getAuthority() != null) { - uri -} else { - // Follow Hive's behavior: - // If no schema or authority is provided with non-local inpath, - // we will use hadoop configuration "fs.defaultFS". - val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS") - val defaultFS = if (defaultFSConf == null) { -new URI("") - } else { -new URI(defaultFSConf) - } - - val scheme = if (uri.getScheme() != null) { -uri.getScheme() - } else { -defaultFS.getScheme() - } - val authority = if (uri.getAuthority() != null) { -uri.getAuthority() - } else { -defaultFS.getAuthority() - } - - if (scheme == null) { -throw new AnalysisException( - s"LOAD DATA: URI scheme is required for non-local input paths: '$path'") - } - - // Follow Hive's behavior: - // If LOCAL is not specified, and the path is relative, - // then the path is interpreted relative to "/user/" - val uriPath = uri.getPath() - val absolutePath = if (uriPath != null && uriPath.startsWith("/")) { -uriPath - } else { -s"/user/${System.getProperty("user.name")}/$uriPath" - } - new URI(scheme, authority, absolutePath, uri.getQuery(), uri.getFragment()) -} -val hadoopConf = sparkSession.sessionState.newHadoopConf() -val srcPath = new Path(hdfsUri) -val fs = srcPath.getFileSystem(hadoopConf) -if (!fs.exists(srcPath)) { - throw new AnalysisException(s"LOAD 
DATA input path does not exist: $path") -} -hdfsUri +val loadPath = new Path(path) --- End diff -- Did a small update to the summary at https://issues.apache.org/jira/browse/SPARK-23425; let me know if you have any suggestions. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user sujith71955 commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r216638992 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -303,94 +303,44 @@ case class LoadDataCommand( s"partitioned, but a partition spec was provided.") } } - -val loadPath = +val loadPath = { if (isLocal) { -val uri = Utils.resolveURI(path) -val file = new File(uri.getPath) -val exists = if (file.getAbsolutePath.contains("*")) { - val fileSystem = FileSystems.getDefault - val dir = file.getParentFile.getAbsolutePath - if (dir.contains("*")) { -throw new AnalysisException( - s"LOAD DATA input path allows only filename wildcard: $path") - } - - // Note that special characters such as "*" on Windows are not allowed as a path. - // Calling `WindowsFileSystem.getPath` throws an exception if there are in the path. - val dirPath = fileSystem.getPath(dir) - val pathPattern = new File(dirPath.toAbsolutePath.toString, file.getName).toURI.getPath - val safePathPattern = if (Utils.isWindows) { -// On Windows, the pattern should not start with slashes for absolute file paths. 
-pathPattern.stripPrefix("/") - } else { -pathPattern - } - val files = new File(dir).listFiles() - if (files == null) { -false - } else { -val matcher = fileSystem.getPathMatcher("glob:" + safePathPattern) -files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath))) - } -} else { - new File(file.getAbsolutePath).exists() -} -if (!exists) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") -} -uri +val localFS = FileContext.getLocalFSFileContext() +makeQualified(FsConstants.LOCAL_FS_URI, localFS.getWorkingDirectory(), new Path(path)) } else { -val uri = new URI(path) -val hdfsUri = if (uri.getScheme() != null && uri.getAuthority() != null) { - uri -} else { - // Follow Hive's behavior: - // If no schema or authority is provided with non-local inpath, - // we will use hadoop configuration "fs.defaultFS". - val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS") - val defaultFS = if (defaultFSConf == null) { -new URI("") - } else { -new URI(defaultFSConf) - } - - val scheme = if (uri.getScheme() != null) { -uri.getScheme() - } else { -defaultFS.getScheme() - } - val authority = if (uri.getAuthority() != null) { -uri.getAuthority() - } else { -defaultFS.getAuthority() - } - - if (scheme == null) { -throw new AnalysisException( - s"LOAD DATA: URI scheme is required for non-local input paths: '$path'") - } - - // Follow Hive's behavior: - // If LOCAL is not specified, and the path is relative, - // then the path is interpreted relative to "/user/" - val uriPath = uri.getPath() - val absolutePath = if (uriPath != null && uriPath.startsWith("/")) { -uriPath - } else { -s"/user/${System.getProperty("user.name")}/$uriPath" - } - new URI(scheme, authority, absolutePath, uri.getQuery(), uri.getFragment()) -} -val hadoopConf = sparkSession.sessionState.newHadoopConf() -val srcPath = new Path(hdfsUri) -val fs = srcPath.getFileSystem(hadoopConf) -if (!fs.exists(srcPath)) { - throw new AnalysisException(s"LOAD 
DATA input path does not exist: $path") -} -hdfsUri +val loadPath = new Path(path) --- End diff -- I will update the same in the Migration guide. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user sujith71955 commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r216638725 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -303,94 +303,44 @@ case class LoadDataCommand( s"partitioned, but a partition spec was provided.") } } - -val loadPath = +val loadPath = { if (isLocal) { -val uri = Utils.resolveURI(path) -val file = new File(uri.getPath) -val exists = if (file.getAbsolutePath.contains("*")) { - val fileSystem = FileSystems.getDefault - val dir = file.getParentFile.getAbsolutePath - if (dir.contains("*")) { -throw new AnalysisException( - s"LOAD DATA input path allows only filename wildcard: $path") - } - - // Note that special characters such as "*" on Windows are not allowed as a path. - // Calling `WindowsFileSystem.getPath` throws an exception if there are in the path. - val dirPath = fileSystem.getPath(dir) - val pathPattern = new File(dirPath.toAbsolutePath.toString, file.getName).toURI.getPath - val safePathPattern = if (Utils.isWindows) { -// On Windows, the pattern should not start with slashes for absolute file paths. 
-pathPattern.stripPrefix("/") - } else { -pathPattern - } - val files = new File(dir).listFiles() - if (files == null) { -false - } else { -val matcher = fileSystem.getPathMatcher("glob:" + safePathPattern) -files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath))) - } -} else { - new File(file.getAbsolutePath).exists() -} -if (!exists) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") -} -uri +val localFS = FileContext.getLocalFSFileContext() +makeQualified(FsConstants.LOCAL_FS_URI, localFS.getWorkingDirectory(), new Path(path)) } else { -val uri = new URI(path) -val hdfsUri = if (uri.getScheme() != null && uri.getAuthority() != null) { - uri -} else { - // Follow Hive's behavior: - // If no schema or authority is provided with non-local inpath, - // we will use hadoop configuration "fs.defaultFS". - val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS") - val defaultFS = if (defaultFSConf == null) { -new URI("") - } else { -new URI(defaultFSConf) - } - - val scheme = if (uri.getScheme() != null) { -uri.getScheme() - } else { -defaultFS.getScheme() - } - val authority = if (uri.getAuthority() != null) { -uri.getAuthority() - } else { -defaultFS.getAuthority() - } - - if (scheme == null) { -throw new AnalysisException( - s"LOAD DATA: URI scheme is required for non-local input paths: '$path'") - } - - // Follow Hive's behavior: - // If LOCAL is not specified, and the path is relative, - // then the path is interpreted relative to "/user/" - val uriPath = uri.getPath() - val absolutePath = if (uriPath != null && uriPath.startsWith("/")) { -uriPath - } else { -s"/user/${System.getProperty("user.name")}/$uriPath" - } - new URI(scheme, authority, absolutePath, uri.getQuery(), uri.getFragment()) -} -val hadoopConf = sparkSession.sessionState.newHadoopConf() -val srcPath = new Path(hdfsUri) -val fs = srcPath.getFileSystem(hadoopConf) -if (!fs.exists(srcPath)) { - throw new AnalysisException(s"LOAD 
DATA input path does not exist: $path") -} -hdfsUri +val loadPath = new Path(path) --- End diff -- @gatorsmile @srowen I tested the scenario by providing spaces in the path; attaching the snapshots and observations: ![image](https://user-images.githubusercontent.com/12999161/45349239-5f350200-b5ce-11e8-817b-10c4809ad23e.png) ![image](https://user-images.githubusercontent.com/12999161/45358119-0bceae00-b5e6-11e8-8ef5-10814987ea02.png) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r216429821 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -303,94 +303,44 @@ case class LoadDataCommand( s"partitioned, but a partition spec was provided.") } } - -val loadPath = +val loadPath = { if (isLocal) { -val uri = Utils.resolveURI(path) -val file = new File(uri.getPath) -val exists = if (file.getAbsolutePath.contains("*")) { - val fileSystem = FileSystems.getDefault - val dir = file.getParentFile.getAbsolutePath - if (dir.contains("*")) { -throw new AnalysisException( - s"LOAD DATA input path allows only filename wildcard: $path") - } - - // Note that special characters such as "*" on Windows are not allowed as a path. - // Calling `WindowsFileSystem.getPath` throws an exception if there are in the path. - val dirPath = fileSystem.getPath(dir) - val pathPattern = new File(dirPath.toAbsolutePath.toString, file.getName).toURI.getPath - val safePathPattern = if (Utils.isWindows) { -// On Windows, the pattern should not start with slashes for absolute file paths. -pathPattern.stripPrefix("/") - } else { -pathPattern - } - val files = new File(dir).listFiles() - if (files == null) { -false - } else { -val matcher = fileSystem.getPathMatcher("glob:" + safePathPattern) -files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath))) - } -} else { - new File(file.getAbsolutePath).exists() -} -if (!exists) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") -} -uri +val localFS = FileContext.getLocalFSFileContext() +makeQualified(FsConstants.LOCAL_FS_URI, localFS.getWorkingDirectory(), new Path(path)) } else { -val uri = new URI(path) -val hdfsUri = if (uri.getScheme() != null && uri.getAuthority() != null) { - uri -} else { - // Follow Hive's behavior: - // If no schema or authority is provided with non-local inpath, - // we will use hadoop configuration "fs.defaultFS". 
- val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS") - val defaultFS = if (defaultFSConf == null) { -new URI("") - } else { -new URI(defaultFSConf) - } - - val scheme = if (uri.getScheme() != null) { -uri.getScheme() - } else { -defaultFS.getScheme() - } - val authority = if (uri.getAuthority() != null) { -uri.getAuthority() - } else { -defaultFS.getAuthority() - } - - if (scheme == null) { -throw new AnalysisException( - s"LOAD DATA: URI scheme is required for non-local input paths: '$path'") - } - - // Follow Hive's behavior: - // If LOCAL is not specified, and the path is relative, - // then the path is interpreted relative to "/user/" - val uriPath = uri.getPath() - val absolutePath = if (uriPath != null && uriPath.startsWith("/")) { -uriPath - } else { -s"/user/${System.getProperty("user.name")}/$uriPath" - } - new URI(scheme, authority, absolutePath, uri.getQuery(), uri.getFragment()) -} -val hadoopConf = sparkSession.sessionState.newHadoopConf() -val srcPath = new Path(hdfsUri) -val fs = srcPath.getFileSystem(hadoopConf) -if (!fs.exists(srcPath)) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") -} -hdfsUri +val loadPath = new Path(path) --- End diff -- Ah right. I think it's necessary to not parse it as a URI because it could contain "?" and that's a reserved char in URIs. We don't want any other behavior to change though; paths with a space in it should still work for example. @sujith71955 do you have maybe a quick test for a case or two like that? and/or @gatorsmile do you know of a case where user-visible behavior has changed (aside from wildcards)? --- - To unsubscribe, e-mail: reviews-unsubscr...@sp
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r216426579 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -303,94 +303,44 @@ case class LoadDataCommand( s"partitioned, but a partition spec was provided.") } } - -val loadPath = +val loadPath = { if (isLocal) { -val uri = Utils.resolveURI(path) -val file = new File(uri.getPath) -val exists = if (file.getAbsolutePath.contains("*")) { - val fileSystem = FileSystems.getDefault - val dir = file.getParentFile.getAbsolutePath - if (dir.contains("*")) { -throw new AnalysisException( - s"LOAD DATA input path allows only filename wildcard: $path") - } - - // Note that special characters such as "*" on Windows are not allowed as a path. - // Calling `WindowsFileSystem.getPath` throws an exception if there are in the path. - val dirPath = fileSystem.getPath(dir) - val pathPattern = new File(dirPath.toAbsolutePath.toString, file.getName).toURI.getPath - val safePathPattern = if (Utils.isWindows) { -// On Windows, the pattern should not start with slashes for absolute file paths. 
-pathPattern.stripPrefix("/") - } else { -pathPattern - } - val files = new File(dir).listFiles() - if (files == null) { -false - } else { -val matcher = fileSystem.getPathMatcher("glob:" + safePathPattern) -files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath))) - } -} else { - new File(file.getAbsolutePath).exists() -} -if (!exists) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") -} -uri +val localFS = FileContext.getLocalFSFileContext() +makeQualified(FsConstants.LOCAL_FS_URI, localFS.getWorkingDirectory(), new Path(path)) } else { -val uri = new URI(path) -val hdfsUri = if (uri.getScheme() != null && uri.getAuthority() != null) { - uri -} else { - // Follow Hive's behavior: - // If no schema or authority is provided with non-local inpath, - // we will use hadoop configuration "fs.defaultFS". - val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS") - val defaultFS = if (defaultFSConf == null) { -new URI("") - } else { -new URI(defaultFSConf) - } - - val scheme = if (uri.getScheme() != null) { -uri.getScheme() - } else { -defaultFS.getScheme() - } - val authority = if (uri.getAuthority() != null) { -uri.getAuthority() - } else { -defaultFS.getAuthority() - } - - if (scheme == null) { -throw new AnalysisException( - s"LOAD DATA: URI scheme is required for non-local input paths: '$path'") - } - - // Follow Hive's behavior: - // If LOCAL is not specified, and the path is relative, - // then the path is interpreted relative to "/user/" - val uriPath = uri.getPath() - val absolutePath = if (uriPath != null && uriPath.startsWith("/")) { -uriPath - } else { -s"/user/${System.getProperty("user.name")}/$uriPath" - } - new URI(scheme, authority, absolutePath, uri.getQuery(), uri.getFragment()) -} -val hadoopConf = sparkSession.sessionState.newHadoopConf() -val srcPath = new Path(hdfsUri) -val fs = srcPath.getFileSystem(hadoopConf) -if (!fs.exists(srcPath)) { - throw new AnalysisException(s"LOAD 
DATA input path does not exist: $path") -} -hdfsUri +val loadPath = new Path(path) --- End diff -- ```Scala val p = new Path("a b") val p2 = new Path(new URI("a%20b")) p2 == p ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user sujith71955 commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r216425911 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -303,94 +303,44 @@ case class LoadDataCommand( s"partitioned, but a partition spec was provided.") } } - -val loadPath = +val loadPath = { if (isLocal) { -val uri = Utils.resolveURI(path) -val file = new File(uri.getPath) -val exists = if (file.getAbsolutePath.contains("*")) { - val fileSystem = FileSystems.getDefault - val dir = file.getParentFile.getAbsolutePath - if (dir.contains("*")) { -throw new AnalysisException( - s"LOAD DATA input path allows only filename wildcard: $path") - } - - // Note that special characters such as "*" on Windows are not allowed as a path. - // Calling `WindowsFileSystem.getPath` throws an exception if there are in the path. - val dirPath = fileSystem.getPath(dir) - val pathPattern = new File(dirPath.toAbsolutePath.toString, file.getName).toURI.getPath - val safePathPattern = if (Utils.isWindows) { -// On Windows, the pattern should not start with slashes for absolute file paths. 
-pathPattern.stripPrefix("/") - } else { -pathPattern - } - val files = new File(dir).listFiles() - if (files == null) { -false - } else { -val matcher = fileSystem.getPathMatcher("glob:" + safePathPattern) -files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath))) - } -} else { - new File(file.getAbsolutePath).exists() -} -if (!exists) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") -} -uri +val localFS = FileContext.getLocalFSFileContext() +makeQualified(FsConstants.LOCAL_FS_URI, localFS.getWorkingDirectory(), new Path(path)) } else { -val uri = new URI(path) -val hdfsUri = if (uri.getScheme() != null && uri.getAuthority() != null) { - uri -} else { - // Follow Hive's behavior: - // If no schema or authority is provided with non-local inpath, - // we will use hadoop configuration "fs.defaultFS". - val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS") - val defaultFS = if (defaultFSConf == null) { -new URI("") - } else { -new URI(defaultFSConf) - } - - val scheme = if (uri.getScheme() != null) { -uri.getScheme() - } else { -defaultFS.getScheme() - } - val authority = if (uri.getAuthority() != null) { -uri.getAuthority() - } else { -defaultFS.getAuthority() - } - - if (scheme == null) { -throw new AnalysisException( - s"LOAD DATA: URI scheme is required for non-local input paths: '$path'") - } - - // Follow Hive's behavior: - // If LOCAL is not specified, and the path is relative, - // then the path is interpreted relative to "/user/" - val uriPath = uri.getPath() - val absolutePath = if (uriPath != null && uriPath.startsWith("/")) { -uriPath - } else { -s"/user/${System.getProperty("user.name")}/$uriPath" - } - new URI(scheme, authority, absolutePath, uri.getQuery(), uri.getFragment()) -} -val hadoopConf = sparkSession.sessionState.newHadoopConf() -val srcPath = new Path(hdfsUri) -val fs = srcPath.getFileSystem(hadoopConf) -if (!fs.exists(srcPath)) { - throw new AnalysisException(s"LOAD 
DATA input path does not exist: $path") -} -hdfsUri +val loadPath = new Path(path) --- End diff -- This looks fine. Do we need to mention anything related to folder-level wildcard support for local file systems? Previously this usage was restricted for the user. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r216423457 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -303,94 +303,44 @@ case class LoadDataCommand( s"partitioned, but a partition spec was provided.") } } - -val loadPath = +val loadPath = { if (isLocal) { -val uri = Utils.resolveURI(path) -val file = new File(uri.getPath) -val exists = if (file.getAbsolutePath.contains("*")) { - val fileSystem = FileSystems.getDefault - val dir = file.getParentFile.getAbsolutePath - if (dir.contains("*")) { -throw new AnalysisException( - s"LOAD DATA input path allows only filename wildcard: $path") - } - - // Note that special characters such as "*" on Windows are not allowed as a path. - // Calling `WindowsFileSystem.getPath` throws an exception if there are in the path. - val dirPath = fileSystem.getPath(dir) - val pathPattern = new File(dirPath.toAbsolutePath.toString, file.getName).toURI.getPath - val safePathPattern = if (Utils.isWindows) { -// On Windows, the pattern should not start with slashes for absolute file paths. -pathPattern.stripPrefix("/") - } else { -pathPattern - } - val files = new File(dir).listFiles() - if (files == null) { -false - } else { -val matcher = fileSystem.getPathMatcher("glob:" + safePathPattern) -files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath))) - } -} else { - new File(file.getAbsolutePath).exists() -} -if (!exists) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") -} -uri +val localFS = FileContext.getLocalFSFileContext() +makeQualified(FsConstants.LOCAL_FS_URI, localFS.getWorkingDirectory(), new Path(path)) } else { -val uri = new URI(path) -val hdfsUri = if (uri.getScheme() != null && uri.getAuthority() != null) { - uri -} else { - // Follow Hive's behavior: - // If no schema or authority is provided with non-local inpath, - // we will use hadoop configuration "fs.defaultFS". 
- val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS") - val defaultFS = if (defaultFSConf == null) { -new URI("") - } else { -new URI(defaultFSConf) - } - - val scheme = if (uri.getScheme() != null) { -uri.getScheme() - } else { -defaultFS.getScheme() - } - val authority = if (uri.getAuthority() != null) { -uri.getAuthority() - } else { -defaultFS.getAuthority() - } - - if (scheme == null) { -throw new AnalysisException( - s"LOAD DATA: URI scheme is required for non-local input paths: '$path'") - } - - // Follow Hive's behavior: - // If LOCAL is not specified, and the path is relative, - // then the path is interpreted relative to "/user/" - val uriPath = uri.getPath() - val absolutePath = if (uriPath != null && uriPath.startsWith("/")) { -uriPath - } else { -s"/user/${System.getProperty("user.name")}/$uriPath" - } - new URI(scheme, authority, absolutePath, uri.getQuery(), uri.getFragment()) -} -val hadoopConf = sparkSession.sessionState.newHadoopConf() -val srcPath = new Path(hdfsUri) -val fs = srcPath.getFileSystem(hadoopConf) -if (!fs.exists(srcPath)) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") -} -hdfsUri +val loadPath = new Path(path) --- End diff -- I added a quick summary of the behavior change at https://issues.apache.org/jira/browse/SPARK-23425 in the Docs Text field. Feel free to add more. Is it more than the fact that certain characters are now wildcards? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user sujith71955 commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r216417629 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -303,94 +303,44 @@ case class LoadDataCommand( s"partitioned, but a partition spec was provided.") } } - -val loadPath = +val loadPath = { if (isLocal) { -val uri = Utils.resolveURI(path) -val file = new File(uri.getPath) -val exists = if (file.getAbsolutePath.contains("*")) { - val fileSystem = FileSystems.getDefault - val dir = file.getParentFile.getAbsolutePath - if (dir.contains("*")) { -throw new AnalysisException( - s"LOAD DATA input path allows only filename wildcard: $path") - } - - // Note that special characters such as "*" on Windows are not allowed as a path. - // Calling `WindowsFileSystem.getPath` throws an exception if there are in the path. - val dirPath = fileSystem.getPath(dir) - val pathPattern = new File(dirPath.toAbsolutePath.toString, file.getName).toURI.getPath - val safePathPattern = if (Utils.isWindows) { -// On Windows, the pattern should not start with slashes for absolute file paths. 
-pathPattern.stripPrefix("/") - } else { -pathPattern - } - val files = new File(dir).listFiles() - if (files == null) { -false - } else { -val matcher = fileSystem.getPathMatcher("glob:" + safePathPattern) -files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath))) - } -} else { - new File(file.getAbsolutePath).exists() -} -if (!exists) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") -} -uri +val localFS = FileContext.getLocalFSFileContext() +makeQualified(FsConstants.LOCAL_FS_URI, localFS.getWorkingDirectory(), new Path(path)) } else { -val uri = new URI(path) -val hdfsUri = if (uri.getScheme() != null && uri.getAuthority() != null) { - uri -} else { - // Follow Hive's behavior: - // If no schema or authority is provided with non-local inpath, - // we will use hadoop configuration "fs.defaultFS". - val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS") - val defaultFS = if (defaultFSConf == null) { -new URI("") - } else { -new URI(defaultFSConf) - } - - val scheme = if (uri.getScheme() != null) { -uri.getScheme() - } else { -defaultFS.getScheme() - } - val authority = if (uri.getAuthority() != null) { -uri.getAuthority() - } else { -defaultFS.getAuthority() - } - - if (scheme == null) { -throw new AnalysisException( - s"LOAD DATA: URI scheme is required for non-local input paths: '$path'") - } - - // Follow Hive's behavior: - // If LOCAL is not specified, and the path is relative, - // then the path is interpreted relative to "/user/" - val uriPath = uri.getPath() - val absolutePath = if (uriPath != null && uriPath.startsWith("/")) { -uriPath - } else { -s"/user/${System.getProperty("user.name")}/$uriPath" - } - new URI(scheme, authority, absolutePath, uri.getQuery(), uri.getFragment()) -} -val hadoopConf = sparkSession.sessionState.newHadoopConf() -val srcPath = new Path(hdfsUri) -val fs = srcPath.getFileSystem(hadoopConf) -if (!fs.exists(srcPath)) { - throw new AnalysisException(s"LOAD 
DATA input path does not exist: $path") -} -hdfsUri +val loadPath = new Path(path) --- End diff -- You mean the exception we are throwing now is different, and we need to mention that in the migration guide as well? Actually, now the user can also apply a wildcard in the local folder path — do we need to add this to the guide? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r216413405 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -303,94 +303,44 @@ case class LoadDataCommand( s"partitioned, but a partition spec was provided.") } } - -val loadPath = +val loadPath = { if (isLocal) { -val uri = Utils.resolveURI(path) -val file = new File(uri.getPath) -val exists = if (file.getAbsolutePath.contains("*")) { - val fileSystem = FileSystems.getDefault - val dir = file.getParentFile.getAbsolutePath - if (dir.contains("*")) { -throw new AnalysisException( - s"LOAD DATA input path allows only filename wildcard: $path") - } - - // Note that special characters such as "*" on Windows are not allowed as a path. - // Calling `WindowsFileSystem.getPath` throws an exception if there are in the path. - val dirPath = fileSystem.getPath(dir) - val pathPattern = new File(dirPath.toAbsolutePath.toString, file.getName).toURI.getPath - val safePathPattern = if (Utils.isWindows) { -// On Windows, the pattern should not start with slashes for absolute file paths. 
-pathPattern.stripPrefix("/") - } else { -pathPattern - } - val files = new File(dir).listFiles() - if (files == null) { -false - } else { -val matcher = fileSystem.getPathMatcher("glob:" + safePathPattern) -files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath))) - } -} else { - new File(file.getAbsolutePath).exists() -} -if (!exists) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") -} -uri +val localFS = FileContext.getLocalFSFileContext() +makeQualified(FsConstants.LOCAL_FS_URI, localFS.getWorkingDirectory(), new Path(path)) } else { -val uri = new URI(path) -val hdfsUri = if (uri.getScheme() != null && uri.getAuthority() != null) { - uri -} else { - // Follow Hive's behavior: - // If no schema or authority is provided with non-local inpath, - // we will use hadoop configuration "fs.defaultFS". - val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS") - val defaultFS = if (defaultFSConf == null) { -new URI("") - } else { -new URI(defaultFSConf) - } - - val scheme = if (uri.getScheme() != null) { -uri.getScheme() - } else { -defaultFS.getScheme() - } - val authority = if (uri.getAuthority() != null) { -uri.getAuthority() - } else { -defaultFS.getAuthority() - } - - if (scheme == null) { -throw new AnalysisException( - s"LOAD DATA: URI scheme is required for non-local input paths: '$path'") - } - - // Follow Hive's behavior: - // If LOCAL is not specified, and the path is relative, - // then the path is interpreted relative to "/user/" - val uriPath = uri.getPath() - val absolutePath = if (uriPath != null && uriPath.startsWith("/")) { -uriPath - } else { -s"/user/${System.getProperty("user.name")}/$uriPath" - } - new URI(scheme, authority, absolutePath, uri.getQuery(), uri.getFragment()) -} -val hadoopConf = sparkSession.sessionState.newHadoopConf() -val srcPath = new Path(hdfsUri) -val fs = srcPath.getFileSystem(hadoopConf) -if (!fs.exists(srcPath)) { - throw new AnalysisException(s"LOAD 
DATA input path does not exist: $path") -} -hdfsUri +val loadPath = new Path(path) --- End diff -- The change of this line is a behavior change. @sujith71955 We need to add this to the migration guide. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/20611 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r211607975 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -413,6 +363,38 @@ case class LoadDataCommand( CommandUtils.updateTableStats(sparkSession, targetTable) Seq.empty[Row] } + + /** + * Returns a qualified path object. Method ported from org.apache.hadoop.fs.Path class. + * + * @param defaultUri default uri corresponding to the filesystem provided. + * @param workingDir the working directory for the particular child path wd-relative names. + * @param path Path instance based on the path string specified by the user. + * @return qualified path object + */ + private def makeQualified(defaultUri: URI, workingDir: Path, path: Path): Path = { +val pathUri = { + if (!path.isAbsolute()) new Path(workingDir, path).toUri() else path.toUri() +} +if (pathUri.getScheme == null || pathUri.getAuthority == null && + defaultUri.getAuthority != null) { --- End diff -- This should indent two more spaces to line up with the start of the condition above. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r211607830 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -413,6 +363,38 @@ case class LoadDataCommand( CommandUtils.updateTableStats(sparkSession, targetTable) Seq.empty[Row] } + + /** + * Returns a qualified path object. Method ported from org.apache.hadoop.fs.Path class. + * + * @param defaultUri default uri corresponding to the filesystem provided. + * @param workingDir the working directory for the particular child path wd-relative names. + * @param path Path instance based on the path string specified by the user. + * @return qualified path object + */ + private def makeQualified(defaultUri: URI, workingDir: Path, path: Path): Path = { +val pathUri = { --- End diff -- We're close now, but these braces are still unneeded. I also tend to advise flipping an "if" condition so it's not negated when the clauses are equally important. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r211106063 --- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala --- @@ -1976,6 +1976,40 @@ private[spark] object Utils extends Logging { } } + /** + * Returns a qualified path object, API ported from org.apache.hadoop.fs.Path class + * @param defaultUri default uri corresponding to the filesystem provided. + * @param workingDir the working directory for the particular child path wd-relative names. + * @param path Path instance based on the path string specified by the user. + * @return Returns a qualified path object + */ + def makeQualified(defaultUri: URI, workingDir: Path, path: Path): Path = { +val pathUri = if (!path.isAbsolute()) new Path(workingDir, path).toUri() else path.toUri() +val newPaths = { + if (pathUri.getScheme == null || pathUri.getAuthority == null && + defaultUri.getAuthority != null) { +val scheme = if (pathUri.getScheme == null) defaultUri.getScheme else pathUri.getScheme +val authority = { + if (pathUri.getAuthority == null) { +if (defaultUri.getAuthority == null) "" else defaultUri.getAuthority + } else { +pathUri.getAuthority + } +} +try { + var newUri = new URI(scheme, authority, pathUri.getPath, null.asInstanceOf[String], +pathUri.getFragment) + new Path(newUri) +} catch { + case exception: URISyntaxException => +throw new IllegalArgumentException(exception) +} + } + else path --- End diff -- ``` } else { path } ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r211106039 --- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala --- @@ -1976,6 +1976,56 @@ private[spark] object Utils extends Logging { } } + /** + * Returns a qualified path object. + * @param defaultUri default uri corresponding to the filesystem provided. + * @param workingDir the working directory for the particular child path wd-relative names. + * @param path Path instance based on the path string specified by the user. + * @return Returns a qualified path object + */ + def makeQualified(defaultUri: URI, workingDir: Path, path: Path): Path = { --- End diff -- Make this method `private` -- can be right? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r211106263 --- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala --- @@ -1976,6 +1976,40 @@ private[spark] object Utils extends Logging { } } + /** + * Returns a qualified path object, API ported from org.apache.hadoop.fs.Path class --- End diff -- Specifically, it was ported from `org.apache.hadoop.fs.Path(URI, Path)`. I think that's worth mentioning. Add a blank line after this and end the two sentence fragments here with a period. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r211106172 --- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala --- @@ -1976,6 +1976,40 @@ private[spark] object Utils extends Logging { } } + /** + * Returns a qualified path object, API ported from org.apache.hadoop.fs.Path class + * @param defaultUri default uri corresponding to the filesystem provided. + * @param workingDir the working directory for the particular child path wd-relative names. + * @param path Path instance based on the path string specified by the user. + * @return Returns a qualified path object + */ + def makeQualified(defaultUri: URI, workingDir: Path, path: Path): Path = { +val pathUri = if (!path.isAbsolute()) new Path(workingDir, path).toUri() else path.toUri() +val newPaths = { + if (pathUri.getScheme == null || pathUri.getAuthority == null && + defaultUri.getAuthority != null) { +val scheme = if (pathUri.getScheme == null) defaultUri.getScheme else pathUri.getScheme +val authority = { --- End diff -- This brace isn't needed either --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r211106213 --- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala --- @@ -1976,6 +1976,40 @@ private[spark] object Utils extends Logging { } } + /** + * Returns a qualified path object, API ported from org.apache.hadoop.fs.Path class + * @param defaultUri default uri corresponding to the filesystem provided. + * @param workingDir the working directory for the particular child path wd-relative names. + * @param path Path instance based on the path string specified by the user. + * @return Returns a qualified path object + */ + def makeQualified(defaultUri: URI, workingDir: Path, path: Path): Path = { +val pathUri = if (!path.isAbsolute()) new Path(workingDir, path).toUri() else path.toUri() +val newPaths = { + if (pathUri.getScheme == null || pathUri.getAuthority == null && + defaultUri.getAuthority != null) { +val scheme = if (pathUri.getScheme == null) defaultUri.getScheme else pathUri.getScheme +val authority = { + if (pathUri.getAuthority == null) { +if (defaultUri.getAuthority == null) "" else defaultUri.getAuthority + } else { +pathUri.getAuthority + } +} +try { + var newUri = new URI(scheme, authority, pathUri.getPath, null.asInstanceOf[String], --- End diff -- Is this really ambiguous such that it needs a cast? I only see one 5-arg constructor for URI. Also doesn't need to be a `var`; could be inlined too --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r211106057 --- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala --- @@ -1976,6 +1976,40 @@ private[spark] object Utils extends Logging { } } + /** + * Returns a qualified path object, API ported from org.apache.hadoop.fs.Path class + * @param defaultUri default uri corresponding to the filesystem provided. + * @param workingDir the working directory for the particular child path wd-relative names. + * @param path Path instance based on the path string specified by the user. + * @return Returns a qualified path object + */ + def makeQualified(defaultUri: URI, workingDir: Path, path: Path): Path = { +val pathUri = if (!path.isAbsolute()) new Path(workingDir, path).toUri() else path.toUri() +val newPaths = { + if (pathUri.getScheme == null || pathUri.getAuthority == null && + defaultUri.getAuthority != null) { +val scheme = if (pathUri.getScheme == null) defaultUri.getScheme else pathUri.getScheme +val authority = { + if (pathUri.getAuthority == null) { +if (defaultUri.getAuthority == null) "" else defaultUri.getAuthority + } else { +pathUri.getAuthority + } +} +try { + var newUri = new URI(scheme, authority, pathUri.getPath, null.asInstanceOf[String], +pathUri.getFragment) + new Path(newUri) +} catch { + case exception: URISyntaxException => --- End diff -- The code usually just writes `e` for exceptions --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r211106268 --- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala --- @@ -1976,6 +1976,40 @@ private[spark] object Utils extends Logging { } } + /** + * Returns a qualified path object, API ported from org.apache.hadoop.fs.Path class + * @param defaultUri default uri corresponding to the filesystem provided. + * @param workingDir the working directory for the particular child path wd-relative names. + * @param path Path instance based on the path string specified by the user. + * @return Returns a qualified path object --- End diff -- Nit: remove "Returns" as redundant --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r211106052 --- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala --- @@ -1976,6 +1976,40 @@ private[spark] object Utils extends Logging { } } + /** + * Returns a qualified path object, API ported from org.apache.hadoop.fs.Path class + * @param defaultUri default uri corresponding to the filesystem provided. + * @param workingDir the working directory for the particular child path wd-relative names. + * @param path Path instance based on the path string specified by the user. + * @return Returns a qualified path object + */ + def makeQualified(defaultUri: URI, workingDir: Path, path: Path): Path = { +val pathUri = if (!path.isAbsolute()) new Path(workingDir, path).toUri() else path.toUri() +val newPaths = { --- End diff -- I think this brace isn't needed? In fact the var `newPaths` isn't needed? would be `newPath` I think anyway. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r209646629 --- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala --- @@ -1976,6 +1976,56 @@ private[spark] object Utils extends Logging { } } + /** + * Returns a qualified path object. + * @param defaultUri default uri corresponding to the filesystem provided. + * @param workingDir the working directory for the particular child path wd-relative names. + * @param path Path instance based on the path string specified by the user. + * @return Returns a qualified path object + */ + def makeQualified(defaultUri: URI, workingDir: Path, path: Path): Path = { +val pathUri = { + if (!path.isAbsolute()) { +new Path(workingDir, path).toUri() + } + else { +path.toUri() + } +} +val newPaths = { + if (pathUri.getScheme == null || pathUri.getAuthority == null && +defaultUri.getAuthority != null) { +val scheme = { + if (pathUri.getScheme == null) defaultUri.getScheme else pathUri.getScheme +} +val authority = { + if (pathUri.getAuthority == null) { +if (null == defaultUri.getAuthority) { --- End diff -- Use `== null`; there's no possibility of accidentally writing `foo = null` as it does not evaluate to a boolean --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r209646842 --- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala --- @@ -1976,6 +1976,56 @@ private[spark] object Utils extends Logging { } } + /** + * Returns a qualified path object. + * @param defaultUri default uri corresponding to the filesystem provided. + * @param workingDir the working directory for the particular child path wd-relative names. + * @param path Path instance based on the path string specified by the user. + * @return Returns a qualified path object + */ + def makeQualified(defaultUri: URI, workingDir: Path, path: Path): Path = { +val pathUri = { + if (!path.isAbsolute()) { +new Path(workingDir, path).toUri() + } + else { +path.toUri() + } +} +val newPaths = { + if (pathUri.getScheme == null || pathUri.getAuthority == null && +defaultUri.getAuthority != null) { +val scheme = { + if (pathUri.getScheme == null) defaultUri.getScheme else pathUri.getScheme +} +val authority = { + if (pathUri.getAuthority == null) { +if (null == defaultUri.getAuthority) { + "" +} +else { + defaultUri.getAuthority +} + } + else { +pathUri.getAuthority + } +} +try { + var newUri = new URI(scheme, authority, pathUri.getPath, null.asInstanceOf[String], +pathUri.getFragment) + new Path(newUri) +} +catch { --- End diff -- Same, pull this up onto the previous line --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r209646292 --- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala --- @@ -1976,6 +1976,56 @@ private[spark] object Utils extends Logging { } } + /** + * Returns a qualified path object. + * @param defaultUri default uri corresponding to the filesystem provided. + * @param workingDir the working directory for the particular child path wd-relative names. + * @param path Path instance based on the path string specified by the user. + * @return Returns a qualified path object + */ + def makeQualified(defaultUri: URI, workingDir: Path, path: Path): Path = { +val pathUri = { + if (!path.isAbsolute()) { +new Path(workingDir, path).toUri() + } + else { --- End diff -- Nit: pull this up onto the previous line. I personally would suggest inverting the if condition for a little more clarity, while you're at it, but that's just a matter of taste. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r209646119 --- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala --- @@ -1976,6 +1976,56 @@ private[spark] object Utils extends Logging { } } + /** + * Returns a qualified path object. + * @param defaultUri default uri corresponding to the filesystem provided. + * @param workingDir the working directory for the particular child path wd-relative names. + * @param path Path instance based on the path string specified by the user. + * @return Returns a qualified path object + */ + def makeQualified(defaultUri: URI, workingDir: Path, path: Path): Path = { --- End diff -- If this is 'ported' from a Hadoop API method, it's good to note that inside the method as a comment, at least. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r209646401 --- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala --- @@ -1976,6 +1976,56 @@ private[spark] object Utils extends Logging { } } + /** + * Returns a qualified path object. + * @param defaultUri default uri corresponding to the filesystem provided. + * @param workingDir the working directory for the particular child path wd-relative names. + * @param path Path instance based on the path string specified by the user. + * @return Returns a qualified path object + */ + def makeQualified(defaultUri: URI, workingDir: Path, path: Path): Path = { +val pathUri = { + if (!path.isAbsolute()) { +new Path(workingDir, path).toUri() + } + else { +path.toUri() + } +} +val newPaths = { + if (pathUri.getScheme == null || pathUri.getAuthority == null && +defaultUri.getAuthority != null) { +val scheme = { --- End diff -- No need for braces --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r209646365 --- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala --- @@ -1976,6 +1976,56 @@ private[spark] object Utils extends Logging { } } + /** + * Returns a qualified path object. + * @param defaultUri default uri corresponding to the filesystem provided. + * @param workingDir the working directory for the particular child path wd-relative names. + * @param path Path instance based on the path string specified by the user. + * @return Returns a qualified path object + */ + def makeQualified(defaultUri: URI, workingDir: Path, path: Path): Path = { +val pathUri = { + if (!path.isAbsolute()) { +new Path(workingDir, path).toUri() + } + else { +path.toUri() + } +} +val newPaths = { + if (pathUri.getScheme == null || pathUri.getAuthority == null && +defaultUri.getAuthority != null) { --- End diff -- Indent this two more spaces --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r208412882 --- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala --- @@ -1976,6 +1976,49 @@ private[spark] object Utils extends Logging { } } + def makeQualified(defaultUri: URI, workingDir: Path, path: Path): Path = { --- End diff -- This is a utility function. We normally need to add function comments and param description. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user sujith71955 commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r207320333 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -303,94 +303,44 @@ case class LoadDataCommand( s"partitioned, but a partition spec was provided.") } } - -val loadPath = +val loadPath = { if (isLocal) { -val uri = Utils.resolveURI(path) -val file = new File(uri.getPath) -val exists = if (file.getAbsolutePath.contains("*")) { - val fileSystem = FileSystems.getDefault - val dir = file.getParentFile.getAbsolutePath - if (dir.contains("*")) { -throw new AnalysisException( - s"LOAD DATA input path allows only filename wildcard: $path") - } - - // Note that special characters such as "*" on Windows are not allowed as a path. - // Calling `WindowsFileSystem.getPath` throws an exception if there are in the path. - val dirPath = fileSystem.getPath(dir) - val pathPattern = new File(dirPath.toAbsolutePath.toString, file.getName).toURI.getPath - val safePathPattern = if (Utils.isWindows) { -// On Windows, the pattern should not start with slashes for absolute file paths. -pathPattern.stripPrefix("/") - } else { -pathPattern - } - val files = new File(dir).listFiles() - if (files == null) { -false - } else { -val matcher = fileSystem.getPathMatcher("glob:" + safePathPattern) -files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath))) - } -} else { - new File(file.getAbsolutePath).exists() -} -if (!exists) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") -} -uri +val localFS = FileContext.getLocalFSFileContext() +localFS.makeQualified(new Path(path)) } else { -val uri = new URI(path) -val hdfsUri = if (uri.getScheme() != null && uri.getAuthority() != null) { - uri -} else { - // Follow Hive's behavior: - // If no schema or authority is provided with non-local inpath, - // we will use hadoop configuration "fs.defaultFS". 
- val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS") - val defaultFS = if (defaultFSConf == null) { -new URI("") - } else { -new URI(defaultFSConf) - } - - val scheme = if (uri.getScheme() != null) { -uri.getScheme() - } else { -defaultFS.getScheme() - } - val authority = if (uri.getAuthority() != null) { -uri.getAuthority() - } else { -defaultFS.getAuthority() - } - - if (scheme == null) { -throw new AnalysisException( - s"LOAD DATA: URI scheme is required for non-local input paths: '$path'") - } - - // Follow Hive's behavior: - // If LOCAL is not specified, and the path is relative, - // then the path is interpreted relative to "/user/" - val uriPath = uri.getPath() - val absolutePath = if (uriPath != null && uriPath.startsWith("/")) { -uriPath - } else { -s"/user/${System.getProperty("user.name")}/$uriPath" - } - new URI(scheme, authority, absolutePath, uri.getQuery(), uri.getFragment()) -} -val hadoopConf = sparkSession.sessionState.newHadoopConf() -val srcPath = new Path(hdfsUri) -val fs = srcPath.getFileSystem(hadoopConf) -if (!fs.exists(srcPath)) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") -} -hdfsUri +val loadPath = new Path(path) +// Follow Hive's behavior: +// If no schema or authority is provided with non-local inpath, +// we will use hadoop configuration "fs.defaultFS". +val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS") +val defaultFS = if (defaultFSConf == null) new URI("") else new URI(defaultFSConf) +// Follow Hive's behavior: +// If LOCAL is not specified, and the path is relative, +// then the path is interpreted relative to "/user/" +val uriPath = new Path(s"/user/${System.getPro
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r207258477 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -303,94 +303,44 @@ case class LoadDataCommand( s"partitioned, but a partition spec was provided.") } } - -val loadPath = +val loadPath = { if (isLocal) { -val uri = Utils.resolveURI(path) -val file = new File(uri.getPath) -val exists = if (file.getAbsolutePath.contains("*")) { - val fileSystem = FileSystems.getDefault - val dir = file.getParentFile.getAbsolutePath - if (dir.contains("*")) { -throw new AnalysisException( - s"LOAD DATA input path allows only filename wildcard: $path") - } - - // Note that special characters such as "*" on Windows are not allowed as a path. - // Calling `WindowsFileSystem.getPath` throws an exception if there are in the path. - val dirPath = fileSystem.getPath(dir) - val pathPattern = new File(dirPath.toAbsolutePath.toString, file.getName).toURI.getPath - val safePathPattern = if (Utils.isWindows) { -// On Windows, the pattern should not start with slashes for absolute file paths. -pathPattern.stripPrefix("/") - } else { -pathPattern - } - val files = new File(dir).listFiles() - if (files == null) { -false - } else { -val matcher = fileSystem.getPathMatcher("glob:" + safePathPattern) -files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath))) - } -} else { - new File(file.getAbsolutePath).exists() -} -if (!exists) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") -} -uri +val localFS = FileContext.getLocalFSFileContext() +localFS.makeQualified(new Path(path)) } else { -val uri = new URI(path) -val hdfsUri = if (uri.getScheme() != null && uri.getAuthority() != null) { - uri -} else { - // Follow Hive's behavior: - // If no schema or authority is provided with non-local inpath, - // we will use hadoop configuration "fs.defaultFS". 
- val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS") - val defaultFS = if (defaultFSConf == null) { -new URI("") - } else { -new URI(defaultFSConf) - } - - val scheme = if (uri.getScheme() != null) { -uri.getScheme() - } else { -defaultFS.getScheme() - } - val authority = if (uri.getAuthority() != null) { -uri.getAuthority() - } else { -defaultFS.getAuthority() - } - - if (scheme == null) { -throw new AnalysisException( - s"LOAD DATA: URI scheme is required for non-local input paths: '$path'") - } - - // Follow Hive's behavior: - // If LOCAL is not specified, and the path is relative, - // then the path is interpreted relative to "/user/" - val uriPath = uri.getPath() - val absolutePath = if (uriPath != null && uriPath.startsWith("/")) { -uriPath - } else { -s"/user/${System.getProperty("user.name")}/$uriPath" - } - new URI(scheme, authority, absolutePath, uri.getQuery(), uri.getFragment()) -} -val hadoopConf = sparkSession.sessionState.newHadoopConf() -val srcPath = new Path(hdfsUri) -val fs = srcPath.getFileSystem(hadoopConf) -if (!fs.exists(srcPath)) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") -} -hdfsUri +val loadPath = new Path(path) +// Follow Hive's behavior: +// If no schema or authority is provided with non-local inpath, +// we will use hadoop configuration "fs.defaultFS". +val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS") +val defaultFS = if (defaultFSConf == null) new URI("") else new URI(defaultFSConf) +// Follow Hive's behavior: +// If LOCAL is not specified, and the path is relative, +// then the path is interpreted relative to "/user/" +val uriPath = new Path(s"/user/${System.getProperty
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user sujith71955 commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r206961528 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -303,94 +303,44 @@ case class LoadDataCommand( s"partitioned, but a partition spec was provided.") } } - -val loadPath = +val loadPath = { if (isLocal) { -val uri = Utils.resolveURI(path) -val file = new File(uri.getPath) -val exists = if (file.getAbsolutePath.contains("*")) { - val fileSystem = FileSystems.getDefault - val dir = file.getParentFile.getAbsolutePath - if (dir.contains("*")) { -throw new AnalysisException( - s"LOAD DATA input path allows only filename wildcard: $path") - } - - // Note that special characters such as "*" on Windows are not allowed as a path. - // Calling `WindowsFileSystem.getPath` throws an exception if there are in the path. - val dirPath = fileSystem.getPath(dir) - val pathPattern = new File(dirPath.toAbsolutePath.toString, file.getName).toURI.getPath - val safePathPattern = if (Utils.isWindows) { -// On Windows, the pattern should not start with slashes for absolute file paths. -pathPattern.stripPrefix("/") - } else { -pathPattern - } - val files = new File(dir).listFiles() - if (files == null) { -false - } else { -val matcher = fileSystem.getPathMatcher("glob:" + safePathPattern) -files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath))) - } -} else { - new File(file.getAbsolutePath).exists() -} -if (!exists) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") -} -uri +val localFS = FileContext.getLocalFSFileContext() +localFS.makeQualified(new Path(path)) } else { -val uri = new URI(path) -val hdfsUri = if (uri.getScheme() != null && uri.getAuthority() != null) { - uri -} else { - // Follow Hive's behavior: - // If no schema or authority is provided with non-local inpath, - // we will use hadoop configuration "fs.defaultFS". 
- val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS") - val defaultFS = if (defaultFSConf == null) { -new URI("") - } else { -new URI(defaultFSConf) - } - - val scheme = if (uri.getScheme() != null) { -uri.getScheme() - } else { -defaultFS.getScheme() - } - val authority = if (uri.getAuthority() != null) { -uri.getAuthority() - } else { -defaultFS.getAuthority() - } - - if (scheme == null) { -throw new AnalysisException( - s"LOAD DATA: URI scheme is required for non-local input paths: '$path'") - } - - // Follow Hive's behavior: - // If LOCAL is not specified, and the path is relative, - // then the path is interpreted relative to "/user/" - val uriPath = uri.getPath() - val absolutePath = if (uriPath != null && uriPath.startsWith("/")) { -uriPath - } else { -s"/user/${System.getProperty("user.name")}/$uriPath" - } - new URI(scheme, authority, absolutePath, uri.getQuery(), uri.getFragment()) -} -val hadoopConf = sparkSession.sessionState.newHadoopConf() -val srcPath = new Path(hdfsUri) -val fs = srcPath.getFileSystem(hadoopConf) -if (!fs.exists(srcPath)) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") -} -hdfsUri +val loadPath = new Path(path) +// Follow Hive's behavior: +// If no schema or authority is provided with non-local inpath, --- End diff -- Yes previously we do check. Now since we are calling makeQualified APIthis logic will be taken care in this API. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r206582602 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -303,94 +303,44 @@ case class LoadDataCommand( s"partitioned, but a partition spec was provided.") } } - -val loadPath = +val loadPath = { if (isLocal) { -val uri = Utils.resolveURI(path) -val file = new File(uri.getPath) -val exists = if (file.getAbsolutePath.contains("*")) { - val fileSystem = FileSystems.getDefault - val dir = file.getParentFile.getAbsolutePath - if (dir.contains("*")) { -throw new AnalysisException( - s"LOAD DATA input path allows only filename wildcard: $path") - } - - // Note that special characters such as "*" on Windows are not allowed as a path. - // Calling `WindowsFileSystem.getPath` throws an exception if there are in the path. - val dirPath = fileSystem.getPath(dir) - val pathPattern = new File(dirPath.toAbsolutePath.toString, file.getName).toURI.getPath - val safePathPattern = if (Utils.isWindows) { -// On Windows, the pattern should not start with slashes for absolute file paths. -pathPattern.stripPrefix("/") - } else { -pathPattern - } - val files = new File(dir).listFiles() - if (files == null) { -false - } else { -val matcher = fileSystem.getPathMatcher("glob:" + safePathPattern) -files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath))) - } -} else { - new File(file.getAbsolutePath).exists() -} -if (!exists) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") -} -uri +val localFS = FileContext.getLocalFSFileContext() +localFS.makeQualified(new Path(path)) } else { -val uri = new URI(path) -val hdfsUri = if (uri.getScheme() != null && uri.getAuthority() != null) { - uri -} else { - // Follow Hive's behavior: - // If no schema or authority is provided with non-local inpath, - // we will use hadoop configuration "fs.defaultFS". 
- val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS") - val defaultFS = if (defaultFSConf == null) { -new URI("") - } else { -new URI(defaultFSConf) - } - - val scheme = if (uri.getScheme() != null) { -uri.getScheme() - } else { -defaultFS.getScheme() - } - val authority = if (uri.getAuthority() != null) { -uri.getAuthority() - } else { -defaultFS.getAuthority() - } - - if (scheme == null) { -throw new AnalysisException( - s"LOAD DATA: URI scheme is required for non-local input paths: '$path'") - } - - // Follow Hive's behavior: - // If LOCAL is not specified, and the path is relative, - // then the path is interpreted relative to "/user/" - val uriPath = uri.getPath() - val absolutePath = if (uriPath != null && uriPath.startsWith("/")) { -uriPath - } else { -s"/user/${System.getProperty("user.name")}/$uriPath" - } - new URI(scheme, authority, absolutePath, uri.getQuery(), uri.getFragment()) -} -val hadoopConf = sparkSession.sessionState.newHadoopConf() -val srcPath = new Path(hdfsUri) -val fs = srcPath.getFileSystem(hadoopConf) -if (!fs.exists(srcPath)) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") -} -hdfsUri +val loadPath = new Path(path) +// Follow Hive's behavior: +// If no schema or authority is provided with non-local inpath, --- End diff -- We checked the scheme and authority? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user sujith71955 commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r206411960 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -303,94 +303,44 @@ case class LoadDataCommand( s"partitioned, but a partition spec was provided.") } } - -val loadPath = +val loadPath = { if (isLocal) { -val uri = Utils.resolveURI(path) -val file = new File(uri.getPath) -val exists = if (file.getAbsolutePath.contains("*")) { - val fileSystem = FileSystems.getDefault - val dir = file.getParentFile.getAbsolutePath - if (dir.contains("*")) { -throw new AnalysisException( - s"LOAD DATA input path allows only filename wildcard: $path") - } - - // Note that special characters such as "*" on Windows are not allowed as a path. - // Calling `WindowsFileSystem.getPath` throws an exception if there are in the path. - val dirPath = fileSystem.getPath(dir) - val pathPattern = new File(dirPath.toAbsolutePath.toString, file.getName).toURI.getPath - val safePathPattern = if (Utils.isWindows) { -// On Windows, the pattern should not start with slashes for absolute file paths. -pathPattern.stripPrefix("/") - } else { -pathPattern - } - val files = new File(dir).listFiles() - if (files == null) { -false - } else { -val matcher = fileSystem.getPathMatcher("glob:" + safePathPattern) -files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath))) - } -} else { - new File(file.getAbsolutePath).exists() -} -if (!exists) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") -} -uri +val localFS = FileContext.getLocalFSFileContext() +localFS.makeQualified(new Path(path)) } else { -val uri = new URI(path) -val hdfsUri = if (uri.getScheme() != null && uri.getAuthority() != null) { - uri -} else { - // Follow Hive's behavior: - // If no schema or authority is provided with non-local inpath, - // we will use hadoop configuration "fs.defaultFS". 
- val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS") - val defaultFS = if (defaultFSConf == null) { -new URI("") - } else { -new URI(defaultFSConf) - } - - val scheme = if (uri.getScheme() != null) { --- End diff -- Got your point, Sean. The code is not exactly similar; the key difference here is that makeQualified() will pass null as the query parameter during URI construction. Because of this, the string values after '?' will not be discarded and the load path will remain the same. This is the reason why I used this API. As you suggested, we can extract the logic into a private API since makeQualified() is LimitedPrivate. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r206119389 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -303,94 +303,44 @@ case class LoadDataCommand( s"partitioned, but a partition spec was provided.") } } - -val loadPath = +val loadPath = { if (isLocal) { -val uri = Utils.resolveURI(path) -val file = new File(uri.getPath) -val exists = if (file.getAbsolutePath.contains("*")) { - val fileSystem = FileSystems.getDefault - val dir = file.getParentFile.getAbsolutePath - if (dir.contains("*")) { -throw new AnalysisException( - s"LOAD DATA input path allows only filename wildcard: $path") - } - - // Note that special characters such as "*" on Windows are not allowed as a path. - // Calling `WindowsFileSystem.getPath` throws an exception if there are in the path. - val dirPath = fileSystem.getPath(dir) - val pathPattern = new File(dirPath.toAbsolutePath.toString, file.getName).toURI.getPath - val safePathPattern = if (Utils.isWindows) { -// On Windows, the pattern should not start with slashes for absolute file paths. -pathPattern.stripPrefix("/") - } else { -pathPattern - } - val files = new File(dir).listFiles() - if (files == null) { -false - } else { -val matcher = fileSystem.getPathMatcher("glob:" + safePathPattern) -files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath))) - } -} else { - new File(file.getAbsolutePath).exists() -} -if (!exists) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") -} -uri +val localFS = FileContext.getLocalFSFileContext() +localFS.makeQualified(new Path(path)) } else { -val uri = new URI(path) -val hdfsUri = if (uri.getScheme() != null && uri.getAuthority() != null) { - uri -} else { - // Follow Hive's behavior: - // If no schema or authority is provided with non-local inpath, - // we will use hadoop configuration "fs.defaultFS". 
- val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS") - val defaultFS = if (defaultFSConf == null) { -new URI("") - } else { -new URI(defaultFSConf) - } - - val scheme = if (uri.getScheme() != null) { --- End diff -- I mean, the Hadoop method you are calling is marked as one that we are not supposed to call. The code you are replacing looks similar to the implementation of the method you call now. I think this isn't a coincidence. Maybe the implementation was based on an older implementation of `makeQualified`, on purpose? how about just extracting the logic of `makeQualified` here? I think that's already what the code kind of is. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user sujith71955 commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r206037303 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -303,94 +303,44 @@ case class LoadDataCommand( s"partitioned, but a partition spec was provided.") } } - -val loadPath = +val loadPath = { if (isLocal) { -val uri = Utils.resolveURI(path) -val file = new File(uri.getPath) -val exists = if (file.getAbsolutePath.contains("*")) { - val fileSystem = FileSystems.getDefault - val dir = file.getParentFile.getAbsolutePath - if (dir.contains("*")) { -throw new AnalysisException( - s"LOAD DATA input path allows only filename wildcard: $path") - } - - // Note that special characters such as "*" on Windows are not allowed as a path. - // Calling `WindowsFileSystem.getPath` throws an exception if there are in the path. - val dirPath = fileSystem.getPath(dir) - val pathPattern = new File(dirPath.toAbsolutePath.toString, file.getName).toURI.getPath - val safePathPattern = if (Utils.isWindows) { -// On Windows, the pattern should not start with slashes for absolute file paths. -pathPattern.stripPrefix("/") - } else { -pathPattern - } - val files = new File(dir).listFiles() - if (files == null) { -false - } else { -val matcher = fileSystem.getPathMatcher("glob:" + safePathPattern) -files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath))) - } -} else { - new File(file.getAbsolutePath).exists() -} -if (!exists) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") -} -uri +val localFS = FileContext.getLocalFSFileContext() +localFS.makeQualified(new Path(path)) } else { -val uri = new URI(path) -val hdfsUri = if (uri.getScheme() != null && uri.getAuthority() != null) { - uri -} else { - // Follow Hive's behavior: - // If no schema or authority is provided with non-local inpath, - // we will use hadoop configuration "fs.defaultFS". 
- val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS") - val defaultFS = if (defaultFSConf == null) { -new URI("") - } else { -new URI(defaultFSConf) - } - - val scheme = if (uri.getScheme() != null) { --- End diff -- @srowen you mean let's have our own private API similar to the makeQualified API in the Hadoop file system? Is this for having better control over the API logic? I am a bit confused. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r202528097 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -303,94 +303,44 @@ case class LoadDataCommand( s"partitioned, but a partition spec was provided.") } } - -val loadPath = +val loadPath = { if (isLocal) { -val uri = Utils.resolveURI(path) -val file = new File(uri.getPath) -val exists = if (file.getAbsolutePath.contains("*")) { - val fileSystem = FileSystems.getDefault - val dir = file.getParentFile.getAbsolutePath - if (dir.contains("*")) { -throw new AnalysisException( - s"LOAD DATA input path allows only filename wildcard: $path") - } - - // Note that special characters such as "*" on Windows are not allowed as a path. - // Calling `WindowsFileSystem.getPath` throws an exception if there are in the path. - val dirPath = fileSystem.getPath(dir) - val pathPattern = new File(dirPath.toAbsolutePath.toString, file.getName).toURI.getPath - val safePathPattern = if (Utils.isWindows) { -// On Windows, the pattern should not start with slashes for absolute file paths. -pathPattern.stripPrefix("/") - } else { -pathPattern - } - val files = new File(dir).listFiles() - if (files == null) { -false - } else { -val matcher = fileSystem.getPathMatcher("glob:" + safePathPattern) -files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath))) - } -} else { - new File(file.getAbsolutePath).exists() -} -if (!exists) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") -} -uri +val localFS = FileContext.getLocalFSFileContext() +localFS.makeQualified(new Path(path)) } else { -val uri = new URI(path) -val hdfsUri = if (uri.getScheme() != null && uri.getAuthority() != null) { - uri -} else { - // Follow Hive's behavior: - // If no schema or authority is provided with non-local inpath, - // we will use hadoop configuration "fs.defaultFS". 
- val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS") - val defaultFS = if (defaultFSConf == null) { -new URI("") - } else { -new URI(defaultFSConf) - } - - val scheme = if (uri.getScheme() != null) { --- End diff -- My point is really about using a quasi-private method here. The implementation of that method actually looks like the old code here, although not entirely the same. I'm wondering whether it was therefore based on the implementation of the method you're trying to call already? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user sujith71955 commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r202429058 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -303,94 +303,44 @@ case class LoadDataCommand( s"partitioned, but a partition spec was provided.") } } - -val loadPath = +val loadPath = { if (isLocal) { -val uri = Utils.resolveURI(path) -val file = new File(uri.getPath) -val exists = if (file.getAbsolutePath.contains("*")) { - val fileSystem = FileSystems.getDefault - val dir = file.getParentFile.getAbsolutePath - if (dir.contains("*")) { -throw new AnalysisException( - s"LOAD DATA input path allows only filename wildcard: $path") - } - - // Note that special characters such as "*" on Windows are not allowed as a path. - // Calling `WindowsFileSystem.getPath` throws an exception if there are in the path. - val dirPath = fileSystem.getPath(dir) - val pathPattern = new File(dirPath.toAbsolutePath.toString, file.getName).toURI.getPath - val safePathPattern = if (Utils.isWindows) { -// On Windows, the pattern should not start with slashes for absolute file paths. -pathPattern.stripPrefix("/") - } else { -pathPattern - } - val files = new File(dir).listFiles() - if (files == null) { -false - } else { -val matcher = fileSystem.getPathMatcher("glob:" + safePathPattern) -files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath))) - } -} else { - new File(file.getAbsolutePath).exists() -} -if (!exists) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") -} -uri +val localFS = FileContext.getLocalFSFileContext() +localFS.makeQualified(new Path(path)) } else { -val uri = new URI(path) -val hdfsUri = if (uri.getScheme() != null && uri.getAuthority() != null) { - uri -} else { - // Follow Hive's behavior: - // If no schema or authority is provided with non-local inpath, - // we will use hadoop configuration "fs.defaultFS". 
- val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS") - val defaultFS = if (defaultFSConf == null) { -new URI("") - } else { -new URI(defaultFSConf) - } - - val scheme = if (uri.getScheme() != null) { --- End diff -- Suppose the user provides a path like "load data inpath 'hdfs://hacluster/user/su?ith.txt' into table t1". The meaning of this wildcard usage is: after 'su', any single character, followed by 'ith'. As per the old logic, when we form a URI the characters after '?' will be ignored, since it interprets the string after '?' as a query parameter (in this scenario there is no significance for the query param). makeQualified() will ignore the query parameter part while creating a Path, so the entire path string will be considered while making a Path instance. I had discussed the usage of this API with a couple of Hadoop PMC members (Mr. Vinay Kumar and Brahma Reddy); they said there is no harm with the usage. If we revert this change then wildcard '?' usage in paths will lead to incorrect path validation in this context, whereas Hive supports the usage, so this will become a limitation. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r202363783 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -303,94 +303,44 @@ case class LoadDataCommand( s"partitioned, but a partition spec was provided.") } } - -val loadPath = +val loadPath = { if (isLocal) { -val uri = Utils.resolveURI(path) -val file = new File(uri.getPath) -val exists = if (file.getAbsolutePath.contains("*")) { - val fileSystem = FileSystems.getDefault - val dir = file.getParentFile.getAbsolutePath - if (dir.contains("*")) { -throw new AnalysisException( - s"LOAD DATA input path allows only filename wildcard: $path") - } - - // Note that special characters such as "*" on Windows are not allowed as a path. - // Calling `WindowsFileSystem.getPath` throws an exception if there are in the path. - val dirPath = fileSystem.getPath(dir) - val pathPattern = new File(dirPath.toAbsolutePath.toString, file.getName).toURI.getPath - val safePathPattern = if (Utils.isWindows) { -// On Windows, the pattern should not start with slashes for absolute file paths. -pathPattern.stripPrefix("/") - } else { -pathPattern - } - val files = new File(dir).listFiles() - if (files == null) { -false - } else { -val matcher = fileSystem.getPathMatcher("glob:" + safePathPattern) -files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath))) - } -} else { - new File(file.getAbsolutePath).exists() -} -if (!exists) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") -} -uri +val localFS = FileContext.getLocalFSFileContext() +localFS.makeQualified(new Path(path)) } else { -val uri = new URI(path) -val hdfsUri = if (uri.getScheme() != null && uri.getAuthority() != null) { - uri -} else { - // Follow Hive's behavior: - // If no schema or authority is provided with non-local inpath, - // we will use hadoop configuration "fs.defaultFS". 
- val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS") - val defaultFS = if (defaultFSConf == null) { -new URI("") - } else { -new URI(defaultFSConf) - } - - val scheme = if (uri.getScheme() != null) { --- End diff -- Hm. I was trying to understand where this logic went. I see that's sort of in the call to `makeQualified`. I couldn't find the docs for that method overload though because it's actually "LimitedPrivate" in Hadoop. I think we shouldn't call this method? can we instead just restore this logic? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r202360482 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -303,94 +303,44 @@ case class LoadDataCommand( s"partitioned, but a partition spec was provided.") } } - -val loadPath = +val loadPath = { if (isLocal) { -val uri = Utils.resolveURI(path) -val file = new File(uri.getPath) -val exists = if (file.getAbsolutePath.contains("*")) { - val fileSystem = FileSystems.getDefault - val dir = file.getParentFile.getAbsolutePath - if (dir.contains("*")) { -throw new AnalysisException( - s"LOAD DATA input path allows only filename wildcard: $path") - } - - // Note that special characters such as "*" on Windows are not allowed as a path. - // Calling `WindowsFileSystem.getPath` throws an exception if there are in the path. - val dirPath = fileSystem.getPath(dir) - val pathPattern = new File(dirPath.toAbsolutePath.toString, file.getName).toURI.getPath - val safePathPattern = if (Utils.isWindows) { -// On Windows, the pattern should not start with slashes for absolute file paths. -pathPattern.stripPrefix("/") - } else { -pathPattern - } - val files = new File(dir).listFiles() - if (files == null) { -false - } else { -val matcher = fileSystem.getPathMatcher("glob:" + safePathPattern) -files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath))) - } -} else { - new File(file.getAbsolutePath).exists() -} -if (!exists) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") -} -uri +val localFS = FileContext.getLocalFSFileContext() +localFS.makeQualified(new Path(path)) } else { -val uri = new URI(path) -val hdfsUri = if (uri.getScheme() != null && uri.getAuthority() != null) { - uri -} else { - // Follow Hive's behavior: - // If no schema or authority is provided with non-local inpath, - // we will use hadoop configuration "fs.defaultFS". 
- val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS") - val defaultFS = if (defaultFSConf == null) { -new URI("") - } else { -new URI(defaultFSConf) - } - - val scheme = if (uri.getScheme() != null) { -uri.getScheme() - } else { -defaultFS.getScheme() - } - val authority = if (uri.getAuthority() != null) { -uri.getAuthority() - } else { -defaultFS.getAuthority() - } - - if (scheme == null) { -throw new AnalysisException( - s"LOAD DATA: URI scheme is required for non-local input paths: '$path'") - } - - // Follow Hive's behavior: - // If LOCAL is not specified, and the path is relative, - // then the path is interpreted relative to "/user/" - val uriPath = uri.getPath() - val absolutePath = if (uriPath != null && uriPath.startsWith("/")) { -uriPath - } else { -s"/user/${System.getProperty("user.name")}/$uriPath" - } - new URI(scheme, authority, absolutePath, uri.getQuery(), uri.getFragment()) -} -val hadoopConf = sparkSession.sessionState.newHadoopConf() -val srcPath = new Path(hdfsUri) -val fs = srcPath.getFileSystem(hadoopConf) -if (!fs.exists(srcPath)) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") -} -hdfsUri +val loadPath = new Path(path) +// Follow Hive's behavior: +// If no schema or authority is provided with non-local inpath, +// we will use hadoop configuration "fs.defaultFS". +val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS") +val defaultFS = if (defaultFSConf == null) new URI("") else new URI(defaultFSConf) +// Follow Hive's behavior: +// If LOCAL is not specified, and the path is relative, +// then the path is interpreted relative to "/user/" +val uriPath = new Path(s"/user/${System.getProperty
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r202360294 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -303,94 +303,44 @@ case class LoadDataCommand( s"partitioned, but a partition spec was provided.") } } - -val loadPath = +val loadPath = { if (isLocal) { -val uri = Utils.resolveURI(path) -val file = new File(uri.getPath) -val exists = if (file.getAbsolutePath.contains("*")) { - val fileSystem = FileSystems.getDefault - val dir = file.getParentFile.getAbsolutePath - if (dir.contains("*")) { -throw new AnalysisException( - s"LOAD DATA input path allows only filename wildcard: $path") - } - - // Note that special characters such as "*" on Windows are not allowed as a path. - // Calling `WindowsFileSystem.getPath` throws an exception if there are in the path. - val dirPath = fileSystem.getPath(dir) - val pathPattern = new File(dirPath.toAbsolutePath.toString, file.getName).toURI.getPath - val safePathPattern = if (Utils.isWindows) { -// On Windows, the pattern should not start with slashes for absolute file paths. -pathPattern.stripPrefix("/") - } else { -pathPattern - } - val files = new File(dir).listFiles() - if (files == null) { -false - } else { -val matcher = fileSystem.getPathMatcher("glob:" + safePathPattern) -files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath))) - } -} else { - new File(file.getAbsolutePath).exists() -} -if (!exists) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") -} -uri +val localFS = FileContext.getLocalFSFileContext() +localFS.makeQualified(new Path(path)) } else { -val uri = new URI(path) -val hdfsUri = if (uri.getScheme() != null && uri.getAuthority() != null) { - uri -} else { - // Follow Hive's behavior: - // If no schema or authority is provided with non-local inpath, - // we will use hadoop configuration "fs.defaultFS". 
- val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS") - val defaultFS = if (defaultFSConf == null) { -new URI("") - } else { -new URI(defaultFSConf) - } - - val scheme = if (uri.getScheme() != null) { -uri.getScheme() - } else { -defaultFS.getScheme() - } - val authority = if (uri.getAuthority() != null) { -uri.getAuthority() - } else { -defaultFS.getAuthority() - } - - if (scheme == null) { -throw new AnalysisException( - s"LOAD DATA: URI scheme is required for non-local input paths: '$path'") - } - - // Follow Hive's behavior: - // If LOCAL is not specified, and the path is relative, - // then the path is interpreted relative to "/user/" - val uriPath = uri.getPath() - val absolutePath = if (uriPath != null && uriPath.startsWith("/")) { -uriPath - } else { -s"/user/${System.getProperty("user.name")}/$uriPath" - } - new URI(scheme, authority, absolutePath, uri.getQuery(), uri.getFragment()) -} -val hadoopConf = sparkSession.sessionState.newHadoopConf() -val srcPath = new Path(hdfsUri) -val fs = srcPath.getFileSystem(hadoopConf) -if (!fs.exists(srcPath)) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") -} -hdfsUri +val loadPath = new Path(path) +// Follow Hive's behavior: +// If no schema or authority is provided with non-local inpath, +// we will use hadoop configuration "fs.defaultFS". +val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS") +val defaultFS = if (defaultFSConf == null) new URI("") else new URI(defaultFSConf) +// Follow Hive's behavior: +// If LOCAL is not specified, and the path is relative, +// then the path is interpreted relative to "/user/" +val uriPath = new Path(s"/user/${System.getProperty
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r202359876 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala --- @@ -1912,11 +1912,59 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { sql("LOAD DATA LOCAL INPATH '/non-exist-folder/*part*' INTO TABLE load_t") }.getMessage assert(m.contains("LOAD DATA input path does not exist")) + } +} + } -val m2 = intercept[AnalysisException] { - sql(s"LOAD DATA LOCAL INPATH '$path*/*part*' INTO TABLE load_t") + test("Support wildcard character in folderlevel for LOAD DATA LOCAL INPATH") { +withTempDir { dir => + val path = dir.toURI.toString.stripSuffix("/") + val dirPath = dir.getAbsoluteFile + for (i <- 1 to 3) { +Files.write(s"$i", new File(dirPath, s"part-r-$i"), StandardCharsets.UTF_8) + } + withTable("load_t_folder_wildcard") { +sql("CREATE TABLE load_t (a STRING)") +sql(s"LOAD DATA LOCAL INPATH '${ + path.substring(0, path.length - 1) +.concat("*") +}/' INTO TABLE load_t") +checkAnswer(sql("SELECT * FROM load_t"), Seq(Row("1"), Row("2"), Row("3"))) +val m = intercept[AnalysisException] { + sql(s"LOAD DATA LOCAL INPATH '${ +path.substring(0, path.length - 1).concat("_invalid_dir") concat ("*") + }/' INTO TABLE load_t") }.getMessage -assert(m2.contains("LOAD DATA input path allows only filename wildcard")) +assert(m.contains("LOAD DATA input path does not exist")) + } +} + } --- End diff -- Still need a space here. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user sujith71955 commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r202255494 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -303,94 +303,49 @@ case class LoadDataCommand( s"partitioned, but a partition spec was provided.") } } - -val loadPath = +val loadPath = { if (isLocal) { -val uri = Utils.resolveURI(path) -val file = new File(uri.getPath) -val exists = if (file.getAbsolutePath.contains("*")) { - val fileSystem = FileSystems.getDefault - val dir = file.getParentFile.getAbsolutePath - if (dir.contains("*")) { -throw new AnalysisException( - s"LOAD DATA input path allows only filename wildcard: $path") - } - - // Note that special characters such as "*" on Windows are not allowed as a path. - // Calling `WindowsFileSystem.getPath` throws an exception if there are in the path. - val dirPath = fileSystem.getPath(dir) - val pathPattern = new File(dirPath.toAbsolutePath.toString, file.getName).toURI.getPath - val safePathPattern = if (Utils.isWindows) { -// On Windows, the pattern should not start with slashes for absolute file paths. -pathPattern.stripPrefix("/") - } else { -pathPattern - } - val files = new File(dir).listFiles() - if (files == null) { -false - } else { -val matcher = fileSystem.getPathMatcher("glob:" + safePathPattern) -files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath))) - } -} else { - new File(file.getAbsolutePath).exists() -} -if (!exists) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") -} -uri +val localFS = FileContext.getLocalFSFileContext() +localFS.makeQualified(new Path(path)) } else { -val uri = new URI(path) -val hdfsUri = if (uri.getScheme() != null && uri.getAuthority() != null) { - uri +val loadPath = new Path(path) +// Follow Hive's behavior: +// If no schema or authority is provided with non-local inpath, +// we will use hadoop configuration "fs.defaultFS". 
+val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS") +val defaultFS = if (defaultFSConf == null) { + new URI("") } else { - // Follow Hive's behavior: - // If no schema or authority is provided with non-local inpath, - // we will use hadoop configuration "fs.defaultFS". - val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS") - val defaultFS = if (defaultFSConf == null) { -new URI("") - } else { -new URI(defaultFSConf) - } - - val scheme = if (uri.getScheme() != null) { -uri.getScheme() - } else { -defaultFS.getScheme() - } - val authority = if (uri.getAuthority() != null) { -uri.getAuthority() - } else { -defaultFS.getAuthority() - } - - if (scheme == null) { -throw new AnalysisException( - s"LOAD DATA: URI scheme is required for non-local input paths: '$path'") - } - - // Follow Hive's behavior: - // If LOCAL is not specified, and the path is relative, - // then the path is interpreted relative to "/user/" - val uriPath = uri.getPath() - val absolutePath = if (uriPath != null && uriPath.startsWith("/")) { -uriPath - } else { -s"/user/${System.getProperty("user.name")}/$uriPath" - } - new URI(scheme, authority, absolutePath, uri.getQuery(), uri.getFragment()) -} -val hadoopConf = sparkSession.sessionState.newHadoopConf() -val srcPath = new Path(hdfsUri) -val fs = srcPath.getFileSystem(hadoopConf) -if (!fs.exists(srcPath)) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") + new URI(defaultFSConf) } -hdfsUri - } +// Follow Hive's behavior: +// If LOCAL is not specified, and the path is relative, +// then the path is interpreted relative to "/user/" +va
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user sujith71955 commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r202255324 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -303,94 +303,49 @@ case class LoadDataCommand( s"partitioned, but a partition spec was provided.") } } - -val loadPath = +val loadPath = { if (isLocal) { -val uri = Utils.resolveURI(path) -val file = new File(uri.getPath) -val exists = if (file.getAbsolutePath.contains("*")) { - val fileSystem = FileSystems.getDefault - val dir = file.getParentFile.getAbsolutePath - if (dir.contains("*")) { -throw new AnalysisException( - s"LOAD DATA input path allows only filename wildcard: $path") - } - - // Note that special characters such as "*" on Windows are not allowed as a path. - // Calling `WindowsFileSystem.getPath` throws an exception if there are in the path. - val dirPath = fileSystem.getPath(dir) - val pathPattern = new File(dirPath.toAbsolutePath.toString, file.getName).toURI.getPath - val safePathPattern = if (Utils.isWindows) { -// On Windows, the pattern should not start with slashes for absolute file paths. -pathPattern.stripPrefix("/") - } else { -pathPattern - } - val files = new File(dir).listFiles() - if (files == null) { -false - } else { -val matcher = fileSystem.getPathMatcher("glob:" + safePathPattern) -files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath))) - } -} else { - new File(file.getAbsolutePath).exists() -} -if (!exists) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") -} -uri +val localFS = FileContext.getLocalFSFileContext() +localFS.makeQualified(new Path(path)) } else { -val uri = new URI(path) -val hdfsUri = if (uri.getScheme() != null && uri.getAuthority() != null) { - uri +val loadPath = new Path(path) +// Follow Hive's behavior: +// If no schema or authority is provided with non-local inpath, +// we will use hadoop configuration "fs.defaultFS". 
+val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS") +val defaultFS = if (defaultFSConf == null) { + new URI("") } else { - // Follow Hive's behavior: - // If no schema or authority is provided with non-local inpath, - // we will use hadoop configuration "fs.defaultFS". - val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS") - val defaultFS = if (defaultFSConf == null) { -new URI("") - } else { -new URI(defaultFSConf) - } - - val scheme = if (uri.getScheme() != null) { -uri.getScheme() - } else { -defaultFS.getScheme() - } - val authority = if (uri.getAuthority() != null) { -uri.getAuthority() - } else { -defaultFS.getAuthority() - } - - if (scheme == null) { -throw new AnalysisException( - s"LOAD DATA: URI scheme is required for non-local input paths: '$path'") - } - - // Follow Hive's behavior: - // If LOCAL is not specified, and the path is relative, - // then the path is interpreted relative to "/user/" - val uriPath = uri.getPath() - val absolutePath = if (uriPath != null && uriPath.startsWith("/")) { -uriPath - } else { -s"/user/${System.getProperty("user.name")}/$uriPath" - } - new URI(scheme, authority, absolutePath, uri.getQuery(), uri.getFragment()) -} -val hadoopConf = sparkSession.sessionState.newHadoopConf() -val srcPath = new Path(hdfsUri) -val fs = srcPath.getFileSystem(hadoopConf) -if (!fs.exists(srcPath)) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") + new URI(defaultFSConf) } -hdfsUri - } +// Follow Hive's behavior: +// If LOCAL is not specified, and the path is relative, +// then the path is interpreted relative to "/user/" +va
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user sujith71955 commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r202255185 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala --- @@ -1912,11 +1912,58 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { sql("LOAD DATA LOCAL INPATH '/non-exist-folder/*part*' INTO TABLE load_t") }.getMessage assert(m.contains("LOAD DATA input path does not exist")) - -val m2 = intercept[AnalysisException] { - sql(s"LOAD DATA LOCAL INPATH '$path*/*part*' INTO TABLE load_t") + } +} + } + test("Support wildcard character in folderlevel for LOAD DATA LOCAL INPATH") { --- End diff -- done --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user sujith71955 commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r202255230 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -303,94 +303,49 @@ case class LoadDataCommand( s"partitioned, but a partition spec was provided.") } } - -val loadPath = +val loadPath = { if (isLocal) { -val uri = Utils.resolveURI(path) -val file = new File(uri.getPath) -val exists = if (file.getAbsolutePath.contains("*")) { - val fileSystem = FileSystems.getDefault - val dir = file.getParentFile.getAbsolutePath - if (dir.contains("*")) { -throw new AnalysisException( - s"LOAD DATA input path allows only filename wildcard: $path") - } - - // Note that special characters such as "*" on Windows are not allowed as a path. - // Calling `WindowsFileSystem.getPath` throws an exception if there are in the path. - val dirPath = fileSystem.getPath(dir) - val pathPattern = new File(dirPath.toAbsolutePath.toString, file.getName).toURI.getPath - val safePathPattern = if (Utils.isWindows) { -// On Windows, the pattern should not start with slashes for absolute file paths. -pathPattern.stripPrefix("/") - } else { -pathPattern - } - val files = new File(dir).listFiles() - if (files == null) { -false - } else { -val matcher = fileSystem.getPathMatcher("glob:" + safePathPattern) -files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath))) - } -} else { - new File(file.getAbsolutePath).exists() -} -if (!exists) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") -} -uri +val localFS = FileContext.getLocalFSFileContext() +localFS.makeQualified(new Path(path)) } else { -val uri = new URI(path) -val hdfsUri = if (uri.getScheme() != null && uri.getAuthority() != null) { - uri +val loadPath = new Path(path) +// Follow Hive's behavior: +// If no schema or authority is provided with non-local inpath, +// we will use hadoop configuration "fs.defaultFS". 
+val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS") +val defaultFS = if (defaultFSConf == null) { + new URI("") } else { - // Follow Hive's behavior: - // If no schema or authority is provided with non-local inpath, - // we will use hadoop configuration "fs.defaultFS". - val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS") - val defaultFS = if (defaultFSConf == null) { -new URI("") - } else { -new URI(defaultFSConf) - } - - val scheme = if (uri.getScheme() != null) { -uri.getScheme() - } else { -defaultFS.getScheme() - } - val authority = if (uri.getAuthority() != null) { -uri.getAuthority() - } else { -defaultFS.getAuthority() - } - - if (scheme == null) { -throw new AnalysisException( - s"LOAD DATA: URI scheme is required for non-local input paths: '$path'") - } - - // Follow Hive's behavior: - // If LOCAL is not specified, and the path is relative, - // then the path is interpreted relative to "/user/" - val uriPath = uri.getPath() - val absolutePath = if (uriPath != null && uriPath.startsWith("/")) { -uriPath - } else { -s"/user/${System.getProperty("user.name")}/$uriPath" - } - new URI(scheme, authority, absolutePath, uri.getQuery(), uri.getFragment()) -} -val hadoopConf = sparkSession.sessionState.newHadoopConf() -val srcPath = new Path(hdfsUri) -val fs = srcPath.getFileSystem(hadoopConf) -if (!fs.exists(srcPath)) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") + new URI(defaultFSConf) } -hdfsUri - } +// Follow Hive's behavior: +// If LOCAL is not specified, and the path is relative, +// then the path is interpreted relative to "/user/" +va
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r202110305 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -303,94 +303,49 @@ case class LoadDataCommand( s"partitioned, but a partition spec was provided.") } } - -val loadPath = +val loadPath = { if (isLocal) { -val uri = Utils.resolveURI(path) -val file = new File(uri.getPath) -val exists = if (file.getAbsolutePath.contains("*")) { - val fileSystem = FileSystems.getDefault - val dir = file.getParentFile.getAbsolutePath - if (dir.contains("*")) { -throw new AnalysisException( - s"LOAD DATA input path allows only filename wildcard: $path") - } - - // Note that special characters such as "*" on Windows are not allowed as a path. - // Calling `WindowsFileSystem.getPath` throws an exception if there are in the path. - val dirPath = fileSystem.getPath(dir) - val pathPattern = new File(dirPath.toAbsolutePath.toString, file.getName).toURI.getPath - val safePathPattern = if (Utils.isWindows) { -// On Windows, the pattern should not start with slashes for absolute file paths. -pathPattern.stripPrefix("/") - } else { -pathPattern - } - val files = new File(dir).listFiles() - if (files == null) { -false - } else { -val matcher = fileSystem.getPathMatcher("glob:" + safePathPattern) -files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath))) - } -} else { - new File(file.getAbsolutePath).exists() -} -if (!exists) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") -} -uri +val localFS = FileContext.getLocalFSFileContext() +localFS.makeQualified(new Path(path)) } else { -val uri = new URI(path) -val hdfsUri = if (uri.getScheme() != null && uri.getAuthority() != null) { - uri +val loadPath = new Path(path) +// Follow Hive's behavior: +// If no schema or authority is provided with non-local inpath, +// we will use hadoop configuration "fs.defaultFS". 
+val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS") +val defaultFS = if (defaultFSConf == null) { + new URI("") } else { - // Follow Hive's behavior: - // If no schema or authority is provided with non-local inpath, - // we will use hadoop configuration "fs.defaultFS". - val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS") - val defaultFS = if (defaultFSConf == null) { -new URI("") - } else { -new URI(defaultFSConf) - } - - val scheme = if (uri.getScheme() != null) { -uri.getScheme() - } else { -defaultFS.getScheme() - } - val authority = if (uri.getAuthority() != null) { -uri.getAuthority() - } else { -defaultFS.getAuthority() - } - - if (scheme == null) { -throw new AnalysisException( - s"LOAD DATA: URI scheme is required for non-local input paths: '$path'") - } - - // Follow Hive's behavior: - // If LOCAL is not specified, and the path is relative, - // then the path is interpreted relative to "/user/" - val uriPath = uri.getPath() - val absolutePath = if (uriPath != null && uriPath.startsWith("/")) { -uriPath - } else { -s"/user/${System.getProperty("user.name")}/$uriPath" - } - new URI(scheme, authority, absolutePath, uri.getQuery(), uri.getFragment()) -} -val hadoopConf = sparkSession.sessionState.newHadoopConf() -val srcPath = new Path(hdfsUri) -val fs = srcPath.getFileSystem(hadoopConf) -if (!fs.exists(srcPath)) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") + new URI(defaultFSConf) } -hdfsUri - } +// Follow Hive's behavior: +// If LOCAL is not specified, and the path is relative, +// then the path is interpreted relative to "/user/" +val uri
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r202110481 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -303,94 +303,49 @@ case class LoadDataCommand( s"partitioned, but a partition spec was provided.") } } - -val loadPath = +val loadPath = { if (isLocal) { -val uri = Utils.resolveURI(path) -val file = new File(uri.getPath) -val exists = if (file.getAbsolutePath.contains("*")) { - val fileSystem = FileSystems.getDefault - val dir = file.getParentFile.getAbsolutePath - if (dir.contains("*")) { -throw new AnalysisException( - s"LOAD DATA input path allows only filename wildcard: $path") - } - - // Note that special characters such as "*" on Windows are not allowed as a path. - // Calling `WindowsFileSystem.getPath` throws an exception if there are in the path. - val dirPath = fileSystem.getPath(dir) - val pathPattern = new File(dirPath.toAbsolutePath.toString, file.getName).toURI.getPath - val safePathPattern = if (Utils.isWindows) { -// On Windows, the pattern should not start with slashes for absolute file paths. -pathPattern.stripPrefix("/") - } else { -pathPattern - } - val files = new File(dir).listFiles() - if (files == null) { -false - } else { -val matcher = fileSystem.getPathMatcher("glob:" + safePathPattern) -files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath))) - } -} else { - new File(file.getAbsolutePath).exists() -} -if (!exists) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") -} -uri +val localFS = FileContext.getLocalFSFileContext() +localFS.makeQualified(new Path(path)) } else { -val uri = new URI(path) -val hdfsUri = if (uri.getScheme() != null && uri.getAuthority() != null) { - uri +val loadPath = new Path(path) +// Follow Hive's behavior: +// If no schema or authority is provided with non-local inpath, +// we will use hadoop configuration "fs.defaultFS". 
+val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS") +val defaultFS = if (defaultFSConf == null) { + new URI("") } else { - // Follow Hive's behavior: - // If no schema or authority is provided with non-local inpath, - // we will use hadoop configuration "fs.defaultFS". - val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS") - val defaultFS = if (defaultFSConf == null) { -new URI("") - } else { -new URI(defaultFSConf) - } - - val scheme = if (uri.getScheme() != null) { -uri.getScheme() - } else { -defaultFS.getScheme() - } - val authority = if (uri.getAuthority() != null) { -uri.getAuthority() - } else { -defaultFS.getAuthority() - } - - if (scheme == null) { -throw new AnalysisException( - s"LOAD DATA: URI scheme is required for non-local input paths: '$path'") - } - - // Follow Hive's behavior: - // If LOCAL is not specified, and the path is relative, - // then the path is interpreted relative to "/user/" - val uriPath = uri.getPath() - val absolutePath = if (uriPath != null && uriPath.startsWith("/")) { -uriPath - } else { -s"/user/${System.getProperty("user.name")}/$uriPath" - } - new URI(scheme, authority, absolutePath, uri.getQuery(), uri.getFragment()) -} -val hadoopConf = sparkSession.sessionState.newHadoopConf() -val srcPath = new Path(hdfsUri) -val fs = srcPath.getFileSystem(hadoopConf) -if (!fs.exists(srcPath)) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") + new URI(defaultFSConf) } -hdfsUri - } +// Follow Hive's behavior: +// If LOCAL is not specified, and the path is relative, +// then the path is interpreted relative to "/user/" +val uri
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r202110382 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -303,94 +303,49 @@ case class LoadDataCommand( s"partitioned, but a partition spec was provided.") } } - -val loadPath = +val loadPath = { if (isLocal) { -val uri = Utils.resolveURI(path) -val file = new File(uri.getPath) -val exists = if (file.getAbsolutePath.contains("*")) { - val fileSystem = FileSystems.getDefault - val dir = file.getParentFile.getAbsolutePath - if (dir.contains("*")) { -throw new AnalysisException( - s"LOAD DATA input path allows only filename wildcard: $path") - } - - // Note that special characters such as "*" on Windows are not allowed as a path. - // Calling `WindowsFileSystem.getPath` throws an exception if there are in the path. - val dirPath = fileSystem.getPath(dir) - val pathPattern = new File(dirPath.toAbsolutePath.toString, file.getName).toURI.getPath - val safePathPattern = if (Utils.isWindows) { -// On Windows, the pattern should not start with slashes for absolute file paths. -pathPattern.stripPrefix("/") - } else { -pathPattern - } - val files = new File(dir).listFiles() - if (files == null) { -false - } else { -val matcher = fileSystem.getPathMatcher("glob:" + safePathPattern) -files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath))) - } -} else { - new File(file.getAbsolutePath).exists() -} -if (!exists) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") -} -uri +val localFS = FileContext.getLocalFSFileContext() +localFS.makeQualified(new Path(path)) } else { -val uri = new URI(path) -val hdfsUri = if (uri.getScheme() != null && uri.getAuthority() != null) { - uri +val loadPath = new Path(path) +// Follow Hive's behavior: +// If no schema or authority is provided with non-local inpath, +// we will use hadoop configuration "fs.defaultFS". 
+val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS") +val defaultFS = if (defaultFSConf == null) { + new URI("") } else { - // Follow Hive's behavior: - // If no schema or authority is provided with non-local inpath, - // we will use hadoop configuration "fs.defaultFS". - val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS") - val defaultFS = if (defaultFSConf == null) { -new URI("") - } else { -new URI(defaultFSConf) - } - - val scheme = if (uri.getScheme() != null) { -uri.getScheme() - } else { -defaultFS.getScheme() - } - val authority = if (uri.getAuthority() != null) { -uri.getAuthority() - } else { -defaultFS.getAuthority() - } - - if (scheme == null) { -throw new AnalysisException( - s"LOAD DATA: URI scheme is required for non-local input paths: '$path'") - } - - // Follow Hive's behavior: - // If LOCAL is not specified, and the path is relative, - // then the path is interpreted relative to "/user/" - val uriPath = uri.getPath() - val absolutePath = if (uriPath != null && uriPath.startsWith("/")) { -uriPath - } else { -s"/user/${System.getProperty("user.name")}/$uriPath" - } - new URI(scheme, authority, absolutePath, uri.getQuery(), uri.getFragment()) -} -val hadoopConf = sparkSession.sessionState.newHadoopConf() -val srcPath = new Path(hdfsUri) -val fs = srcPath.getFileSystem(hadoopConf) -if (!fs.exists(srcPath)) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") + new URI(defaultFSConf) } -hdfsUri - } +// Follow Hive's behavior: +// If LOCAL is not specified, and the path is relative, +// then the path is interpreted relative to "/user/" +val uri
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r202110816 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -303,94 +303,49 @@ case class LoadDataCommand( s"partitioned, but a partition spec was provided.") } } - -val loadPath = +val loadPath = { if (isLocal) { -val uri = Utils.resolveURI(path) -val file = new File(uri.getPath) -val exists = if (file.getAbsolutePath.contains("*")) { - val fileSystem = FileSystems.getDefault - val dir = file.getParentFile.getAbsolutePath - if (dir.contains("*")) { -throw new AnalysisException( - s"LOAD DATA input path allows only filename wildcard: $path") - } - - // Note that special characters such as "*" on Windows are not allowed as a path. - // Calling `WindowsFileSystem.getPath` throws an exception if there are in the path. - val dirPath = fileSystem.getPath(dir) - val pathPattern = new File(dirPath.toAbsolutePath.toString, file.getName).toURI.getPath - val safePathPattern = if (Utils.isWindows) { -// On Windows, the pattern should not start with slashes for absolute file paths. -pathPattern.stripPrefix("/") - } else { -pathPattern - } - val files = new File(dir).listFiles() - if (files == null) { -false - } else { -val matcher = fileSystem.getPathMatcher("glob:" + safePathPattern) -files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath))) - } -} else { - new File(file.getAbsolutePath).exists() -} -if (!exists) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") -} -uri +val localFS = FileContext.getLocalFSFileContext() +localFS.makeQualified(new Path(path)) } else { -val uri = new URI(path) -val hdfsUri = if (uri.getScheme() != null && uri.getAuthority() != null) { - uri +val loadPath = new Path(path) +// Follow Hive's behavior: +// If no schema or authority is provided with non-local inpath, +// we will use hadoop configuration "fs.defaultFS". 
+val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS") +val defaultFS = if (defaultFSConf == null) { --- End diff -- The indent is a little distracting; I'd just say `= if () new URI("") else new URI(defaultFSConf)` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r202110252 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala --- @@ -1912,11 +1912,58 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { sql("LOAD DATA LOCAL INPATH '/non-exist-folder/*part*' INTO TABLE load_t") }.getMessage assert(m.contains("LOAD DATA input path does not exist")) - -val m2 = intercept[AnalysisException] { - sql(s"LOAD DATA LOCAL INPATH '$path*/*part*' INTO TABLE load_t") + } +} + } + test("Support wildcard character in folderlevel for LOAD DATA LOCAL INPATH") { --- End diff -- Nit: space before tests --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user sujith71955 commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r196670750 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -303,94 +303,50 @@ case class LoadDataCommand( s"partitioned, but a partition spec was provided.") } } - -val loadPath = +val loadPath = { if (isLocal) { -val uri = Utils.resolveURI(path) -val file = new File(uri.getPath) -val exists = if (file.getAbsolutePath.contains("*")) { - val fileSystem = FileSystems.getDefault - val dir = file.getParentFile.getAbsolutePath - if (dir.contains("*")) { -throw new AnalysisException( - s"LOAD DATA input path allows only filename wildcard: $path") - } - - // Note that special characters such as "*" on Windows are not allowed as a path. - // Calling `WindowsFileSystem.getPath` throws an exception if there are in the path. - val dirPath = fileSystem.getPath(dir) - val pathPattern = new File(dirPath.toAbsolutePath.toString, file.getName).toURI.getPath - val safePathPattern = if (Utils.isWindows) { -// On Windows, the pattern should not start with slashes for absolute file paths. -pathPattern.stripPrefix("/") - } else { -pathPattern - } - val files = new File(dir).listFiles() - if (files == null) { -false - } else { -val matcher = fileSystem.getPathMatcher("glob:" + safePathPattern) -files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath))) - } -} else { - new File(file.getAbsolutePath).exists() -} -if (!exists) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") -} -uri - } else { -val uri = new URI(path) -val hdfsUri = if (uri.getScheme() != null && uri.getAuthority() != null) { - uri +val localFS = FileContext.getLocalFSFileContext() +localFS.makeQualified(new Path(path)) + } + else { --- End diff -- OK --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r196365562 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -303,94 +303,50 @@ case class LoadDataCommand( s"partitioned, but a partition spec was provided.") } } - -val loadPath = +val loadPath = { if (isLocal) { -val uri = Utils.resolveURI(path) -val file = new File(uri.getPath) -val exists = if (file.getAbsolutePath.contains("*")) { - val fileSystem = FileSystems.getDefault - val dir = file.getParentFile.getAbsolutePath - if (dir.contains("*")) { -throw new AnalysisException( - s"LOAD DATA input path allows only filename wildcard: $path") - } - - // Note that special characters such as "*" on Windows are not allowed as a path. - // Calling `WindowsFileSystem.getPath` throws an exception if there are in the path. - val dirPath = fileSystem.getPath(dir) - val pathPattern = new File(dirPath.toAbsolutePath.toString, file.getName).toURI.getPath - val safePathPattern = if (Utils.isWindows) { -// On Windows, the pattern should not start with slashes for absolute file paths. -pathPattern.stripPrefix("/") - } else { -pathPattern - } - val files = new File(dir).listFiles() - if (files == null) { -false - } else { -val matcher = fileSystem.getPathMatcher("glob:" + safePathPattern) -files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath))) - } -} else { - new File(file.getAbsolutePath).exists() -} -if (!exists) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") -} -uri - } else { -val uri = new URI(path) -val hdfsUri = if (uri.getScheme() != null && uri.getAuthority() != null) { - uri +val localFS = FileContext.getLocalFSFileContext() +localFS.makeQualified(new Path(path)) + } + else { --- End diff -- nit: `} else {` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r181552961 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -304,45 +304,14 @@ case class LoadDataCommand( } } -val loadPath = +val loadPath = { if (isLocal) { val uri = Utils.resolveURI(path) -val file = new File(uri.getPath) -val exists = if (file.getAbsolutePath.contains("*")) { - val fileSystem = FileSystems.getDefault - val dir = file.getParentFile.getAbsolutePath - if (dir.contains("*")) { -throw new AnalysisException( - s"LOAD DATA input path allows only filename wildcard: $path") - } - - // Note that special characters such as "*" on Windows are not allowed as a path. --- End diff -- OK, I am fine. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user sujith71955 commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r181543985 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -304,45 +304,14 @@ case class LoadDataCommand( } } -val loadPath = +val loadPath = { if (isLocal) { val uri = Utils.resolveURI(path) -val file = new File(uri.getPath) -val exists = if (file.getAbsolutePath.contains("*")) { - val fileSystem = FileSystems.getDefault - val dir = file.getParentFile.getAbsolutePath - if (dir.contains("*")) { -throw new AnalysisException( - s"LOAD DATA input path allows only filename wildcard: $path") - } - - // Note that special characters such as "*" on Windows are not allowed as a path. --- End diff -- @wzhfy All test-cases related to windows query suite are passing i think in previous code, for reading the files based on wildcard char, we are trying to read the files parent directory first, and we were listing all files inside that folder and then we are trying to match the pattern for each file inside directory, so i think for getting the parent path we need to explicitly check that it should not have any unsupported characters like '*' ,But now we are directly passing the path with wildchar to globStatus() API of hdfs and this should able to pattern match irrespective of directory/files, in globStatus API i could see they have special handling for windows path, i will look into more details regarding this. ![image](https://user-images.githubusercontent.com/12999161/38765114-20a47d7e-3fd9-11e8-9863-59d179c4a2d8.png) Thanks all for the valuable feedbacks . --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r181538566 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -304,45 +304,14 @@ case class LoadDataCommand( } } -val loadPath = +val loadPath = { if (isLocal) { val uri = Utils.resolveURI(path) -val file = new File(uri.getPath) -val exists = if (file.getAbsolutePath.contains("*")) { - val fileSystem = FileSystems.getDefault - val dir = file.getParentFile.getAbsolutePath - if (dir.contains("*")) { -throw new AnalysisException( - s"LOAD DATA input path allows only filename wildcard: $path") - } - - // Note that special characters such as "*" on Windows are not allowed as a path. --- End diff -- yeah, this is what I was worried about. We need to be careful to change this. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r181000918 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -304,45 +304,14 @@ case class LoadDataCommand( } } -val loadPath = +val loadPath = { if (isLocal) { val uri = Utils.resolveURI(path) -val file = new File(uri.getPath) -val exists = if (file.getAbsolutePath.contains("*")) { - val fileSystem = FileSystems.getDefault - val dir = file.getParentFile.getAbsolutePath - if (dir.contains("*")) { -throw new AnalysisException( - s"LOAD DATA input path allows only filename wildcard: $path") - } - - // Note that special characters such as "*" on Windows are not allowed as a path. --- End diff -- But I wonder if it's safe to get rid of this whole logic. Just realised that I wrote this. I believe this one is related with: ``` org.apache.spark.sql.hive.execution.SQLQuerySuite org.apache.spark.sql.hive.execution.WindowQuerySuite ``` on Windows. I can trigger tests on Windows. Let's see if they pass: Build started: [SQL] `org.apache.spark.sql.hive.execution.SQLQuerySuite` [![PR-20611](https://ci.appveyor.com/api/projects/status/github/spark-test/spark?branch=45A91D8E-AFF8-4615-A5D2-09DFD224440B&svg=true)](https://ci.appveyor.com/project/spark-test/spark/branch/45A91D8E-AFF8-4615-A5D2-09DFD224440B) Build started: [SQL] `org.apache.spark.sql.hive.execution.WindowQuerySuite` [![PR-20611](https://ci.appveyor.com/api/projects/status/github/spark-test/spark?branch=C8762134-C19A-422E-A716-4E072B71120A&svg=true)](https://ci.appveyor.com/project/spark-test/spark/branch/C8762134-C19A-422E-A716-4E072B71120A) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user vinodkc commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r180993462 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -370,26 +339,35 @@ case class LoadDataCommand( throw new AnalysisException( s"LOAD DATA: URI scheme is required for non-local input paths: '$path'") } - // Follow Hive's behavior: // If LOCAL is not specified, and the path is relative, // then the path is interpreted relative to "/user/" val uriPath = uri.getPath() val absolutePath = if (uriPath != null && uriPath.startsWith("/")) { uriPath } else { -s"/user/${System.getProperty("user.name")}/$uriPath" +s"/user/${ System.getProperty("user.name") }/$uriPath" } new URI(scheme, authority, absolutePath, uri.getQuery(), uri.getFragment()) } -val hadoopConf = sparkSession.sessionState.newHadoopConf() -val srcPath = new Path(hdfsUri) -val fs = srcPath.getFileSystem(hadoopConf) -if (!fs.exists(srcPath)) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") -} -hdfsUri } +} +val srcPath = new Path(loadPath) +val fs = srcPath.getFileSystem(sparkSession.sessionState.newHadoopConf()) +// This handling is because while reoslving the invalid urls starting with file:/// +// system throws IllegalArgumentException from globStatus api,so inorder to handle +// such scenarios this code is added in try catch block and after catching the +// run time exception a generic error will be displayed to the user. +try { + if (null == fs.globStatus(srcPath) || fs.globStatus(srcPath).isEmpty) { +throw new AnalysisException(s"LOAD DATA input path does not exist: $path") + } +} +catch { + case e: Exception => --- End diff -- Avoid catching generic exception, catch IllegalArgumentException --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user vinodkc commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r180993068 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -370,26 +339,35 @@ case class LoadDataCommand( throw new AnalysisException( s"LOAD DATA: URI scheme is required for non-local input paths: '$path'") } - // Follow Hive's behavior: // If LOCAL is not specified, and the path is relative, // then the path is interpreted relative to "/user/" val uriPath = uri.getPath() val absolutePath = if (uriPath != null && uriPath.startsWith("/")) { uriPath } else { -s"/user/${System.getProperty("user.name")}/$uriPath" +s"/user/${ System.getProperty("user.name") }/$uriPath" --- End diff -- nit: Please remove space --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user sujith71955 commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r179030611 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -385,7 +385,9 @@ case class LoadDataCommand( val hadoopConf = sparkSession.sessionState.newHadoopConf() val srcPath = new Path(hdfsUri) val fs = srcPath.getFileSystem(hadoopConf) -if (!fs.exists(srcPath)) { +// Check if the path exists or there are matched paths if it's a path with wildcard. +// For HDFS path, we support wildcard in directory name and file name. +if (null == fs.globStatus(srcPath) || fs.globStatus(srcPath).isEmpty) { --- End diff -- I will update the PR so that we can use the fs.globStatus() API in both local and HDFS file path scenarios. Thanks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user sujith71955 commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r179030399 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -385,7 +385,9 @@ case class LoadDataCommand( val hadoopConf = sparkSession.sessionState.newHadoopConf() val srcPath = new Path(hdfsUri) val fs = srcPath.getFileSystem(hadoopConf) -if (!fs.exists(srcPath)) { +// Check if the path exists or there are matched paths if it's a path with wildcard. +// For HDFS path, we support wildcard in directory name and file name. +if (null == fs.globStatus(srcPath) || fs.globStatus(srcPath).isEmpty) { --- End diff -- @wzhfy @HyukjinKwon @dongjoon-hyun I verified the scenario by updating the code to use the fs.globStatus() API for both local and HDFS paths. For the local path, it's working fine. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r178801061 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -385,7 +385,9 @@ case class LoadDataCommand( val hadoopConf = sparkSession.sessionState.newHadoopConf() val srcPath = new Path(hdfsUri) val fs = srcPath.getFileSystem(hadoopConf) -if (!fs.exists(srcPath)) { +// Check if the path exists or there are matched paths if it's a path with wildcard. +// For HDFS path, we support wildcard in directory name and file name. +if (null == fs.globStatus(srcPath) || fs.globStatus(srcPath).isEmpty) { --- End diff -- Up to my knowledge, it's fine to use `fs.globStatus` for both hdfs and local path. I think that Hadoop's interface should guarantee it at least. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r178736198 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -385,7 +385,9 @@ case class LoadDataCommand( val hadoopConf = sparkSession.sessionState.newHadoopConf() val srcPath = new Path(hdfsUri) val fs = srcPath.getFileSystem(hadoopConf) -if (!fs.exists(srcPath)) { +// Check if the path exists or there are matched paths if it's a path with wildcard. +// For HDFS path, we support wildcard in directory name and file name. +if (null == fs.globStatus(srcPath) || fs.globStatus(srcPath).isEmpty) { --- End diff -- @HyukjinKwon @dongjoon-hyun Is it possible to use `fs.globStatus` for both local path and hdfs path? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user sujith71955 commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r174868215 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -385,8 +385,12 @@ case class LoadDataCommand( val hadoopConf = sparkSession.sessionState.newHadoopConf() val srcPath = new Path(hdfsUri) val fs = srcPath.getFileSystem(hadoopConf) -if (!fs.exists(srcPath)) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") +// A validaton logic is been added for non local files, Error will be thrown +// If hdfs path doest not exist or if no files matches the wild card defined +// in load path --- End diff -- Yeah, I updated the comment; the above comment is more precise. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user sujith71955 commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r174791662 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -385,8 +385,12 @@ case class LoadDataCommand( val hadoopConf = sparkSession.sessionState.newHadoopConf() val srcPath = new Path(hdfsUri) val fs = srcPath.getFileSystem(hadoopConf) -if (!fs.exists(srcPath)) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") +// A validaton logic is been added for non local files, Error will be thrown +// If hdfs path doest not exist or if no files matches the wild card defined +// in load path +if (null == fs.globStatus(srcPath) || fs.globStatus(srcPath).isEmpty) { + throw new AnalysisException(s"LOAD DATA input path does not exist " + +s"or no files are matching the wildcard string: $path") --- End diff -- Fine, I think you are right; even the above message will be able to convey the issue. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r174771588 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -385,8 +385,12 @@ case class LoadDataCommand( val hadoopConf = sparkSession.sessionState.newHadoopConf() val srcPath = new Path(hdfsUri) val fs = srcPath.getFileSystem(hadoopConf) -if (!fs.exists(srcPath)) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") +// A validaton logic is been added for non local files, Error will be thrown +// If hdfs path doest not exist or if no files matches the wild card defined +// in load path +if (null == fs.globStatus(srcPath) || fs.globStatus(srcPath).isEmpty) { + throw new AnalysisException(s"LOAD DATA input path does not exist " + +s"or no files are matching the wildcard string: $path") --- End diff -- I think the previous message ("LOAD DATA input path does not exist: $path") is fine, it covers the case no path matches the wildcard, like the above case for local path. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r174773206 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -385,8 +385,12 @@ case class LoadDataCommand( val hadoopConf = sparkSession.sessionState.newHadoopConf() val srcPath = new Path(hdfsUri) val fs = srcPath.getFileSystem(hadoopConf) -if (!fs.exists(srcPath)) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") +// A validaton logic is been added for non local files, Error will be thrown +// If hdfs path doest not exist or if no files matches the wild card defined +// in load path --- End diff -- Check if the path exists or there is matched path. For HDFS path, we support wildcard in directory name and file name. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org