Ohad Raviv created SPARK-22910:
----------------------------------

             Summary: Wrong results in Spark Job because failed to move to Trash
                 Key: SPARK-22910
                 URL: https://issues.apache.org/jira/browse/SPARK-22910
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.2.0, 2.1.0
            Reporter: Ohad Raviv


Our Spark job has completed with status successful although the data save was 
corrupted.

What happened is that we have a monthly job. each run overwrites the output of 
the previous run. we happened to change the sql.shuffle.partitions number 
between the runs from 2000 to 1000, and what happened was that the new run had 
Warn failure of moving the old data to the user's .Trash because it was full. 
because it was only a warning the process continued and overwritten the new 
1000 files - while leaving most of the old remaining 1000 files in their place. 
this resulted that in the final output we had a folder with mix of old and new 
data and that caused corruption in the process.

the post mortem is relatively easy to understand.
{code}
hadoop fs -ls /the/folder
-rwxr-xr-x 3 spark_user spark_user 209012005 2017-12-10 14:20 
/the/folder/part-00000.gz 
.
.
-rwxr-xr-x 3 spark_user spark_user 111134899 2017-11-17 06:39 
/the/folder/part-01990.gz 
{code}
and in the driver's log:
{code}
17/12/10 15:10:00 WARN Hive: Directory hdfs:///the/folder cannot be removed: 
java.io.IOException: Failed to move to trash: hdfs:///the/folder/part-00000.gz
java.io.IOException: Failed to move to trash: hdfs:///the/folder/part-00000.gz
        at 
org.apache.hadoop.fs.TrashPolicyDefault.moveToTrash(TrashPolicyDefault.java:160)
        at org.apache.hadoop.fs.Trash.moveToTrash(Trash.java:109)
        at org.apache.hadoop.fs.Trash.moveToAppropriateTrash(Trash.java:90)
        at 
org.apache.hadoop.hive.shims.Hadoop23Shims.moveToAppropriateTrash(Hadoop23Shims.java:272)
        at 
org.apache.hadoop.hive.common.FileUtils.moveToTrash(FileUtils.java:603)
        at 
org.apache.hadoop.hive.common.FileUtils.trashFilesUnderDir(FileUtils.java:586)
        at org.apache.hadoop.hive.ql.metadata.Hive.replaceFiles(Hive.java:2851)
        at org.apache.hadoop.hive.ql.metadata.Hive.loadTable(Hive.java:1640)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.spark.sql.hive.client.Shim_v0_14.loadTable(HiveShim.scala:716)
        at 
org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadTable$1.apply$mcV$sp(HiveClientImpl.scala:672)
        at 
org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadTable$1.apply(HiveClientImpl.scala:672)
        at 
org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadTable$1.apply(HiveClientImpl.scala:672)
        at 
org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply(HiveClientImpl.scala:283)
        at 
org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:230)
        at 
org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:229)
        at 
org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:272)
        at 
org.apache.spark.sql.hive.client.HiveClientImpl.loadTable(HiveClientImpl.scala:671)
        at 
org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadTable$1.apply$mcV$sp(HiveExternalCatalog.scala:741)
        at 
org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadTable$1.apply(HiveExternalCatalog.scala:739)
        at 
org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadTable$1.apply(HiveExternalCatalog.scala:739)
        at 
org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:95)
        at 
org.apache.spark.sql.hive.HiveExternalCatalog.loadTable(HiveExternalCatalog.scala:739)
        at 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult$lzycompute(InsertIntoHiveTable.scala:323)
        at 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult(InsertIntoHiveTable.scala:170)
        at 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable.doExecute(InsertIntoHiveTable.scala:347)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
        at 
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:87)
        at 
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87)
        at 
org.apache.spark.sql.hive.execution.CreateHiveTableAsSelectCommand.run(CreateHiveTableAsSelectCommand.scala:92)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
        at 
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:87)
        at 
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:185)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:64)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:592)

        at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)

        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 

        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:637)
Caused by: org.apache.hadoop.hdfs.protocol.DSQuotaExceededException: The 
DiskSpace quota of /user/spark_user is exceeded: quota = 3298534883328 B = 3 TB 
but diskspace consumed = 3298551987606 B = 3.00 TB
        at 
org.apache.hadoop.hdfs.server.namenode.DirectoryWithQuotaFeature.verifyStoragespaceQuota(DirectoryWithQuotaFeature.java:211)
        at 
org.apache.hadoop.hdfs.server.namenode.DirectoryWithQuotaFeature.verifyQuota(DirectoryWithQuotaFeature.java:239)
        at 
org.apache.hadoop.hdfs.server.namenode.FSDirectory.verifyQuota(FSDirectory.java:945)
        at 
org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.verifyQuotaForRename(FSDirRenameOp.java:102)
        at 
org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.unprotectedRenameTo(FSDirRenameOp.java:190)
        at 
org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.renameTo(FSDirRenameOp.java:474)
        at 
org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.renameToInt(FSDirRenameOp.java:73)
        at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.renameTo(FSNamesystem.java:3768)
        at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.rename(NameNodeRpcServer.java:986)
        at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.rename(ClientNamenodeProtocolServerSideTranslatorPB.java:583)
        at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
        at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2313)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2309)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1724)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2307)

        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at 
org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
        at 
org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
        at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1855)
        at 
org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:575)
        at 
org.apache.hadoop.fs.TrashPolicyDefault.moveToTrash(TrashPolicyDefault.java:154)
        ... 70 more
Caused by: 
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.DSQuotaExceededException):
 The DiskSpace quota of /user/spark_user is exceeded: quota = 3298534883328 B = 
3 TB but diskspace consumed = 3298551987606 B = 3.00 TB
        at 
org.apache.hadoop.hdfs.server.namenode.DirectoryWithQuotaFeature.verifyStoragespaceQuota(DirectoryWithQuotaFeature.java:211)
        at 
org.apache.hadoop.hdfs.server.namenode.DirectoryWithQuotaFeature.verifyQuota(DirectoryWithQuotaFeature.java:239)
        at 
org.apache.hadoop.hdfs.server.namenode.FSDirectory.verifyQuota(FSDirectory.java:945)
        at 
org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.verifyQuotaForRename(FSDirRenameOp.java:102)
        at 
org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.unprotectedRenameTo(FSDirRenameOp.java:190)
        at 
org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.renameTo(FSDirRenameOp.java:474)
        at 
org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.renameToInt(FSDirRenameOp.java:73)
        at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.renameTo(FSNamesystem.java:3768)
        at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.rename(NameNodeRpcServer.java:986)
        at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.rename(ClientNamenodeProtocolServerSideTranslatorPB.java:583)
        at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
        at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2313)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2309)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1724)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2307)

        at org.apache.hadoop.ipc.Client.call(Client.java:1469)
        at org.apache.hadoop.ipc.Client.call(Client.java:1400)
        at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
        at com.sun.proxy.$Proxy9.rename(Unknown Source)
        at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:468)
        at sun.reflect.GeneratedMethodAccessor70.invoke(Unknown Source)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
        at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
        at com.sun.proxy.$Proxy10.rename(Unknown Source)
        at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1853)
{code}

it looks like the problem is with Spark's Hive version here:
https://github.com/apache/hive/blob/spark2/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java

where the code actually says:
{code}
    //swallow the exception
    LOG.warn("Directory " + oldPath.toString() + " canot be removed:" + 
StringUtils.stringifyException(e));
{code}
and that is also the same as Hive release-1.2.1, However, it seems like Hive 
solved it in Hive 2.0.0:
https://issues.apache.org/jira/browse/HIVE-12505

with this patch:
https://github.com/apache/hive/commit/f9791bebaa9d71ff62cecedb0243d09e99abfb38

So I guess I'm trying to add urgency to: SPARK-20202




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to