[ 
https://issues.apache.org/jira/browse/BEAM-4861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16593406#comment-16593406
 ] 

Tim Robertson edited comment on BEAM-4861 at 8/27/18 10:03 AM:
---------------------------------------------------------------

The {{HadoopFileSystem}} has the following methods:
{code:java}
  @Override
  protected void copy(List<HadoopResourceId> srcResourceIds, List<HadoopResourceId> destResourceIds)
      throws IOException {
    for (int i = 0; i < srcResourceIds.size(); ++i) {
      // Unfortunately HDFS FileSystems don't support a native copy operation so we are forced
      // to use the inefficient implementation found in FileUtil which copies all the bytes through
      // the local machine.
      //
      // HDFS FileSystem does define a concat method but could only find the DFSFileSystem
      // implementing it. The DFSFileSystem implemented concat by deleting the srcs after which
      // is not what we want. Also, all the other FileSystem implementations I saw threw
      // UnsupportedOperationException within concat.
      FileUtil.copy(
          fileSystem,
          srcResourceIds.get(i).toPath(),
          fileSystem,
          destResourceIds.get(i).toPath(),
          false,
          true,
          fileSystem.getConf());
    }
  }

  @Override
  protected void rename(
      List<HadoopResourceId> srcResourceIds, List<HadoopResourceId> destResourceIds)
      throws IOException {
    for (int i = 0; i < srcResourceIds.size(); ++i) {
      fileSystem.rename(srcResourceIds.get(i).toPath(), destResourceIds.get(i).toPath());
    }
  }

  @Override
  protected void delete(Collection<HadoopResourceId> resourceIds) throws IOException {
    for (HadoopResourceId resourceId : resourceIds) {
      fileSystem.delete(resourceId.toPath(), false);
    }
  }
{code}
{{FileUtil.copy}}, {{fileSystem.rename}} and {{fileSystem.delete}} can all return false, indicating that the operation was not performed.

 

*1. Informing the user of the unsuccessful operation*

We could either:
 # Change the Beam {{FileSystem}} API to propagate this, although the [rules for HDFS rename()|https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/filesystem/filesystem.html#boolean_renamePath_src_Path_d] are not trivial to document and this might prove invasive in many places.
 # Throw an {{IOException}} when the result is false, to signal that the operation was not successful.

I tend towards leaving the API returning void, but throwing an exception on the first error encountered within the loops - thoughts?
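
For illustration only, here is an untested sketch of that approach for {{rename()}} (the same check-and-throw pattern would apply to the {{copy()}} and {{delete()}} loops):
{code:java}
  @Override
  protected void rename(
      List<HadoopResourceId> srcResourceIds, List<HadoopResourceId> destResourceIds)
      throws IOException {
    for (int i = 0; i < srcResourceIds.size(); ++i) {
      // fileSystem.rename returns false rather than throwing for several failure
      // modes (e.g. a missing destination directory), so convert a false result
      // into an IOException on the first failure.
      if (!fileSystem.rename(srcResourceIds.get(i).toPath(), destResourceIds.get(i).toPath())) {
        throw new IOException(
            String.format(
                "Unable to rename %s to %s", srcResourceIds.get(i), destResourceIds.get(i)));
      }
    }
  }
{code}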

 

*2. rename() in HDFS*

What do we believe are the expectations for {{rename()}} on HDFS?

Currently, if an attempt is made to rename a file into a non-existent directory, nothing happens and the user is not informed. This is obviously bad.

We could change the behaviour to one of:
 # Throw an exception if the directory does not exist.
 # Create the directory where necessary, letting existing files be overwritten (the equivalent of e.g. {{S3FileSystem}}).
 # Verify that the directory does not exist, and only then create it and proceed, otherwise failing with an exception (the usual behaviour of a MapReduce {{FileOutputFormat}} at job startup, where it fails fast with "directory already exists").

Note that {{S3FileSystem}} and {{GcsFileSystem}} treat a rename as a {{copy()}} 
and {{delete()}} operation internally.

I tend towards creating the directory where necessary and allowing overwriting - thoughts?
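
An untested sketch of that option (create the missing destination directory and allow overwrite, still surfacing any remaining failure as an exception):
{code:java}
  @Override
  protected void rename(
      List<HadoopResourceId> srcResourceIds, List<HadoopResourceId> destResourceIds)
      throws IOException {
    for (int i = 0; i < srcResourceIds.size(); ++i) {
      Path src = srcResourceIds.get(i).toPath();
      Path dest = destResourceIds.get(i).toPath();
      // HDFS rename silently returns false if the destination directory is
      // missing, so create it first (mirroring the S3FileSystem behaviour).
      if (dest.getParent() != null && !fileSystem.exists(dest.getParent())) {
        fileSystem.mkdirs(dest.getParent());
      }
      // Delete any existing file at the destination so the rename overwrites it.
      if (fileSystem.exists(dest)) {
        fileSystem.delete(dest, false);
      }
      if (!fileSystem.rename(src, dest)) {
        throw new IOException(String.format("Unable to rename %s to %s", src, dest));
      }
    }
  }
{code}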

CC [~reuvenlax] as relates to BEAM-5036 as well.



> Hadoop Filesystem silently fails
> --------------------------------
>
>                 Key: BEAM-4861
>                 URL: https://issues.apache.org/jira/browse/BEAM-4861
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-hadoop
>            Reporter: Jozef Vilcek
>            Assignee: Chamikara Jayalath
>            Priority: Major
>
> Hi,
> beam Filesystem operations copy, rename and delete are void in the SDK. The Hadoop
> native filesystem operations are not and return a boolean. The current implementation
> in Beam ignores the result and passes as long as an exception is not thrown.
> I got burned by this when using 'rename' to do a 'move' operation on HDFS. If the
> target directory does not exist, the operation returns false and does not touch
> the file.
> [https://github.com/apache/beam/blob/master/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java#L148]


