[ https://issues.apache.org/jira/browse/SPARK-16091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sean Owen updated SPARK-16091:
------------------------------
    Priority: Minor  (was: Blocker)

[~rgiot] have a look at https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark first. Don't set Blocker; this doesn't seem nearly that important. You need to write an hdfs:// URI if you intend to reference an HDFS path.
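For example, a minimal sketch of the fully-qualified write, assuming the NameNode is reachable at namenode.example.com:8020 (a placeholder host and port; substitute your cluster's fs.defaultFS value):

{code}
// "hdfs://namenode.example.com:8020" is hypothetical; use the actual
// NameNode address. Everything else matches the reporter's code below.
ds.write
  .option("header", true)
  .mode("overwrite")
  .partitionBy("A", "B")
  .csv("hdfs://namenode.example.com:8020/tmp/test2.csv")
{code}

With an explicit scheme there is no ambiguity about whether /tmp/test2.csv refers to HDFS or to the local filesystem.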
> Dataset.partitionBy.csv raises a java.io.FileNotFoundException when launched on a Hadoop cluster
> ------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-16091
>                 URL: https://issues.apache.org/jira/browse/SPARK-16091
>             Project: Spark
>          Issue Type: Bug
>          Components: Input/Output
>    Affects Versions: 2.0.0
>        Environment: Hadoop version: 2.5.1
>            Reporter: Romain Giot
>            Priority: Minor
>
> When writing a Dataset to a CSV file, a java.io.FileNotFoundException is raised *after* the write has finished successfully.
> This behaviour does not happen when the Spark application is launched locally; it is probably related to how HDFS paths are handled.
> Here is a test case:
> {code}
> import org.apache.spark.SparkContext
> import org.apache.hadoop.fs.FileSystem
> import org.apache.hadoop.fs.{Path, PathFilter}
> import org.apache.spark.sql.SQLContext
>
> case class Test(A: String, B: String, C: String)
>
> object WriteTest {
>   val sc: SparkContext = new SparkContext()
>   val fs: FileSystem = FileSystem.get(sc.hadoopConfiguration)
>   val sqlContext: SQLContext = new SQLContext(sc)
>
>   import sqlContext.implicits._
>
>   def main(args: Array[String]): Unit = {
>     val ds = Seq(
>       Test("abc", "abc", "abc"),
>       Test("abc", "abc", "def"),
>       Test("abc", "abc", "ghi"),
>       Test("abc", "xyz", "abc"),
>       Test("xyz", "xyz", "abc")
>     ).toDS()
>
>     // works: plain CSV write
>     ds.write
>       .option("header", true)
>       .mode("overwrite")
>       .csv("/tmp/test1.csv")
>
>     // fails: the partitioned write raises FileNotFoundException
>     // after the files have been written
>     ds.write
>       .option("header", true)
>       .mode("overwrite")
>       .partitionBy("A", "B")
>       .csv("/tmp/test2.csv")
>   }
> }
> {code}
> and here is the exception stack:
> {code}
> java.io.FileNotFoundException: Path is not a file: /tmp/test2.csv/A=abc
>   at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:68)
>   at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:54)
>   at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsUpdateTimes(FSNamesystem.java:1795)
>   at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1738)
>   at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1718)
>   at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1690)
>   at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:519)
>   at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:337)
>   at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>   at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
>   at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928)
>   at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2013)
>   at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2009)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1614)
>   at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2007)
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:408)
>   at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
>   at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
>   at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1222)
>   at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1210)
>   at org.apache.hadoop.hdfs.DFSClient.getBlockLocations(DFSClient.java:1260)
>   at org.apache.hadoop.hdfs.DistributedFileSystem$1.doCall(DistributedFileSystem.java:220)
>   at org.apache.hadoop.hdfs.DistributedFileSystem$1.doCall(DistributedFileSystem.java:216)
>   at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>   at org.apache.hadoop.hdfs.DistributedFileSystem.getFileBlockLocations(DistributedFileSystem.java:216)
>   at org.apache.hadoop.hdfs.DistributedFileSystem.getFileBlockLocations(DistributedFileSystem.java:208)
>   at org.apache.spark.sql.execution.datasources.ListingFileCatalog$$anonfun$1$$anonfun$apply$2.apply(ListingFileCatalog.scala:104)
>   at org.apache.spark.sql.execution.datasources.ListingFileCatalog$$anonfun$1$$anonfun$apply$2.apply(ListingFileCatalog.scala:92)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
>   at org.apache.spark.sql.execution.datasources.ListingFileCatalog$$anonfun$1.apply(ListingFileCatalog.scala:92)
>   at org.apache.spark.sql.execution.datasources.ListingFileCatalog$$anonfun$1.apply(ListingFileCatalog.scala:80)
>   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>   at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
>   at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
>   at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>   at org.apache.spark.sql.execution.datasources.ListingFileCatalog.listLeafFiles(ListingFileCatalog.scala:80)
>   at org.apache.spark.sql.execution.datasources.ListingFileCatalog.refresh(ListingFileCatalog.scala:69)
>   at org.apache.spark.sql.execution.datasources.ListingFileCatalog.<init>(ListingFileCatalog.scala:50)
>   at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:307)
>   at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:424)
>   at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:252)
>   at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:234)
>   at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:697)
>   at WriteTest$.main(write.scala:42)
>   at WriteTest.main(write.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:483)
>   at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:610)
> {code}
> And here is the result of hdfs dfs -ls /tmp/test2.csv:
> {code}
> Found 3 items
> drwxr-xr-x   - hadoop supergroup          0 2016-06-21 14:59 /tmp/test2.csv/A=abc
> drwxr-xr-x   - hadoop supergroup          0 2016-06-21 14:59 /tmp/test2.csv/A=xyz
> -rw-r--r--   3 hadoop supergroup          0 2016-06-21 14:59 /tmp/test2.csv/_SUCCESS
> {code}
> I have no idea whether the bug comes from the HDFS implementation or from the way Spark uses it.
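For completeness, a minimal sketch of the same check done programmatically with the Hadoop FileSystem API (the same API the test code already uses). It confirms that the A=... entries under /tmp/test2.csv are directories, which is what the "Path is not a file" message refers to:

{code}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object ListOutput {
  def main(args: Array[String]): Unit = {
    // Resolves against fs.defaultFS: HDFS on the cluster, the local
    // filesystem otherwise.
    val fs = FileSystem.get(new Configuration())
    // Print a "d" marker for directories, mirroring hdfs dfs -ls.
    fs.listStatus(new Path("/tmp/test2.csv")).foreach { status =>
      val kind = if (status.isDirectory) "d" else "-"
      println(s"$kind ${status.getPath}")
    }
  }
}
{code}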