[ https://issues.apache.org/jira/browse/SPARK-10109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15932154#comment-15932154 ]
Hyukjin Kwon commented on SPARK-10109:
--------------------------------------

Then, I think this should be resolvable when we upgrade Parquet to 1.9.0. The writer's close() should now be idempotent per PARQUET-544.
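For context, a rough sketch of what an idempotent close() buys here (hypothetical names, not the actual Parquet source): the second close() that Spark issues from the abort/cleanup path after a failed commit becomes a no-op instead of dereferencing already-released state.

{quote}
// Hypothetical sketch only -- illustrates the idempotence guard, not real Parquet code.
class RecordWriterSketch {
  private var buffer: StringBuilder = new StringBuilder // stand-in for the buffered row group
  private var closed: Boolean = false

  def write(record: String): Unit = {
    require(!closed, "writer already closed")
    buffer.append(record).append('\n')
  }

  def close(): Unit = {
    if (!closed) {           // guard: a second close() returns immediately
      flushRowGroupToStore()
      buffer = null          // release buffers; without the guard, re-closing would NPE here
      closed = true
    }
  }

  private def flushRowGroupToStore(): Unit =
    println(s"flushing ${buffer.length} buffered chars") // placeholder for the real flush
}
{quote}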
> NPE when saving Parquet To HDFS
> -------------------------------
>
>                 Key: SPARK-10109
>                 URL: https://issues.apache.org/jira/browse/SPARK-10109
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.4.1
>         Environment: Sparc-ec2, standalone cluster on amazon
>            Reporter: Virgil Palanciuc
>
> Very simple code, trying to save a dataframe
> I get this in the driver
> {quote}
> 15/08/19 11:21:41 INFO TaskSetManager: Lost task 9.2 in stage 217.0 (TID 4748) on executor 172.xx.xx.xx: java.lang.NullPointerException (null)
> and (not for that task):
> 15/08/19 11:21:46 WARN TaskSetManager: Lost task 5.0 in stage 543.0 (TID 5607, 172.yy.yy.yy): java.lang.NullPointerException
>   at parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:146)
>   at parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:112)
>   at parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:73)
>   at org.apache.spark.sql.parquet.ParquetOutputWriter.close(newParquet.scala:88)
>   at org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$clearOutputWriters$1.apply(commands.scala:536)
>   at org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$clearOutputWriters$1.apply(commands.scala:536)
>   at scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:107)
>   at scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:107)
>   at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>   at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>   at scala.collection.mutable.HashMap$$anon$2.foreach(HashMap.scala:107)
>   at org.apache.spark.sql.sources.DynamicPartitionWriterContainer.clearOutputWriters(commands.scala:536)
>   at org.apache.spark.sql.sources.DynamicPartitionWriterContainer.abortTask(commands.scala:552)
>   at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$2(commands.scala:269)
>   at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
>   at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>   at org.apache.spark.scheduler.Task.run(Task.scala:70)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
> {quote}
> I get this in the executor log:
> {quote}
> 15/08/19 11:21:41 WARN DFSClient: DataStreamer Exception
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /gglogs/2015-07-27/_temporary/_attempt_201508191119_0217_m_000009_2/dpid=18432/pid=1109/part-r-00009-46ac3a79-a95c-4d9c-a2f1-b3ee76f6a46c.snappy.parquet File does not exist. Holder DFSClient_NONMAPREDUCE_1730998114_63 does not have any open files.
>   at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2396)
>   at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2387)
>   at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2183)
>   at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:481)
>   at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:297)
>   at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:44080)
>   at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:453)
>   at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1002)
>   at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1695)
>   at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1691)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:415)
>   at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
>   at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1689)
>   at org.apache.hadoop.ipc.Client.call(Client.java:1225)
>   at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
>   at com.sun.proxy.$Proxy14.addBlock(Unknown Source)
>   at sun.reflect.GeneratedMethodAccessor33.invoke(Unknown Source)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:164)
>   at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:83)
>   at com.sun.proxy.$Proxy14.addBlock(Unknown Source)
>   at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:290)
>   at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1150)
>   at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1003)
>   at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:463)
> 15/08/19 11:21:41 ERROR InsertIntoHadoopFsRelation: Aborting task.
> java.lang.RuntimeException: Failed to commit task
>   at org.apache.spark.sql.sources.DynamicPartitionWriterContainer.commitTask(commands.scala:546)
>   at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$2(commands.scala:266)
>   at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
>   at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>   at org.apache.spark.scheduler.Task.run(Task.scala:70)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /gglogs/2015-07-27/_temporary/_attempt_201508191119_0217_m_000009_2/dpid=18432/pid=1109/part-r-00009-46ac3a79-a95c-4d9c-a2f1-b3ee76f6a46c.snappy.parquet File does not exist. Holder DFSClient_NONMAPREDUCE_1730998114_63 does not have any open files.
>   at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2396)
>   at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2387)
>   at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2183)
>   at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:481)
>   at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:297)
>   at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:44080)
>   at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:453)
>   at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1002)
>   at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1695)
>   at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1691)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:415)
>   at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
>   at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1689)
>   at org.apache.hadoop.ipc.Client.call(Client.java:1225)
>   at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
>   at com.sun.proxy.$Proxy14.addBlock(Unknown Source)
>   at sun.reflect.GeneratedMethodAccessor33.invoke(Unknown Source)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:164)
>   at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:83)
>   at com.sun.proxy.$Proxy14.addBlock(Unknown Source)
>   at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:290)
>   at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1150)
>   at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1003)
>   at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:463)
> 15/08/19 11:21:41 ERROR DynamicPartitionWriterContainer: Task attempt attempt_201508191119_0217_m_000009_2 aborted.
> 15/08/19 11:21:41 ERROR Executor: Exception in task 9.2 in stage 217.0 (TID 4748)
> java.lang.NullPointerException
>   at parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:146)
>   at parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:112)
>   at parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:73)
>   at org.apache.spark.sql.parquet.ParquetOutputWriter.close(newParquet.scala:88)
>   at org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$clearOutputWriters$1.apply(commands.scala:536)
>   at org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$clearOutputWriters$1.apply(commands.scala:536)
>   at scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:107)
>   at scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:107)
>   at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>   at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>   at scala.collection.mutable.HashMap$$anon$2.foreach(HashMap.scala:107)
>   at org.apache.spark.sql.sources.DynamicPartitionWriterContainer.clearOutputWriters(commands.scala:536)
>   at org.apache.spark.sql.sources.DynamicPartitionWriterContainer.abortTask(commands.scala:552)
>   at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$2(commands.scala:269)
>   at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
>   at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>   at org.apache.spark.scheduler.Task.run(Task.scala:70)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
> {quote}
> this is the code that I'm using:
> {quote}
> val ppRDD =
>   sparkContext
>     .sequenceFile[Array[Byte], String](inputPath)
>     .values
>     .repartition(numPartitions)
>     .filter(<criteria>)
>     .flatMap(line => parseGGLogLine(<params; including 2 broadcasted variables, one a "Set" and one a "Map">))
> if (ppRDD.isEmpty())
>   logInfo(s"<message>")
> else
>   ppRDD.toDF().write
>     .partitionBy("dpid", "pid")
>     .mode(SaveMode.Append)
>     .parquet(outputPath)
> {quote}
> possibly relevant configuration:
> {quote}
> "spark.sql.parquet" {
>   cacheMetadata = "true",
>   compression.codec = "snappy"
> }
>
> "spark.serializer" = "org.apache.spark.serializer.KryoSerializer",
> {quote}
> I didn't modify the speculation setting, so I'm assuming it's disabled.
> Input path is s3n://<path>
> Output path is hdfs:///<path> (i.e. ephemeral hdfs)
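For reference, a self-contained sketch of the same write pattern (hypothetical object, paths, and in-memory data standing in for the real sequence-file input, filter criteria, and parseGGLogLine; Spark 1.4-era API):

{quote}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode}

object ParquetAppendSketch {
  // Hypothetical record type; the real job produces rows with dpid/pid partition columns.
  case class Event(dpid: Int, pid: Int, payload: String)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("parquet-append-sketch")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.sql.parquet.compression.codec", "snappy")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Stand-in for the real hdfs:/// output path from the report.
    val outputPath = "hdfs:///tmp/parquet-append-sketch"

    // Stand-in for sequenceFile(...).values.filter(...).flatMap(parseGGLogLine(...)).
    val ppRDD = sc.parallelize(Seq(
      Event(18432, 1109, "line-1"),
      Event(18432, 1110, "line-2")
    )).repartition(4)

    if (ppRDD.isEmpty()) {
      println("nothing to write")
    } else {
      // Same shape as the report: dynamic partitioning on two columns, append mode, Parquet output.
      ppRDD.toDF().write
        .partitionBy("dpid", "pid")
        .mode(SaveMode.Append)
        .parquet(outputPath)
    }
  }
}
{quote}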