HDFS-11273. Move TransferFsImage#doGetUrl function to a Util class. Contributed 
by Hanisha Koneru.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7ec609b2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7ec609b2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7ec609b2

Branch: refs/heads/YARN-5734
Commit: 7ec609b28989303fe0cc36812f225028b0251b32
Parents: 511d39e
Author: Jing Zhao <ji...@apache.org>
Authored: Mon Jan 9 18:00:14 2017 -0800
Committer: Jing Zhao <ji...@apache.org>
Committed: Mon Jan 9 18:05:33 2017 -0800

----------------------------------------------------------------------
 .../server/common/HttpGetFailedException.java   |  39 +++
 .../server/common/HttpPutFailedException.java   |  37 +++
 .../apache/hadoop/hdfs/server/common/Util.java  | 258 +++++++++++++++-
 .../server/namenode/EditLogFileInputStream.java |   3 +-
 .../hdfs/server/namenode/ImageServlet.java      |  15 +-
 .../hdfs/server/namenode/NameNodeRpcServer.java |   4 +-
 .../hdfs/server/namenode/TransferFsImage.java   | 295 ++-----------------
 7 files changed, 366 insertions(+), 285 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ec609b2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HttpGetFailedException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HttpGetFailedException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HttpGetFailedException.java
new file mode 100644
index 0000000..f36aca3
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HttpGetFailedException.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.common;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import java.io.IOException;
+import java.net.HttpURLConnection;
+
+@InterfaceAudience.Private
+public class HttpGetFailedException extends IOException {
+  private static final long serialVersionUID = 1L;
+  private final int responseCode;
+
+  public HttpGetFailedException(String msg, HttpURLConnection connection)
+      throws IOException {
+    super(msg);
+    this.responseCode = connection.getResponseCode();
+  }
+
+  public int getResponseCode() {
+    return responseCode;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ec609b2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HttpPutFailedException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HttpPutFailedException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HttpPutFailedException.java
new file mode 100644
index 0000000..5fad77d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HttpPutFailedException.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.common;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import java.io.IOException;
+
+@InterfaceAudience.Private
+public class HttpPutFailedException extends IOException {
+  private static final long serialVersionUID = 1L;
+  private final int responseCode;
+
+  public HttpPutFailedException(String msg, int responseCode) throws IOException {
+    super(msg);
+    this.responseCode = responseCode;
+  }
+
+  public int getResponseCode() {
+    return responseCode;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ec609b2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Util.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Util.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Util.java
index 6ec686f..f08c3fa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Util.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Util.java
@@ -18,30 +18,66 @@
 package org.apache.hadoop.hdfs.server.common;
 
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.net.URL;
+import java.security.DigestInputStream;
+import java.security.MessageDigest;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
+import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.server.namenode.ImageServlet;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
+import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.hdfs.web.URLConnectionFactory;
+
 
 @InterfaceAudience.Private
 public final class Util {
   private final static Log LOG = LogFactory.getLog(Util.class.getName());
 
+  public final static String FILE_LENGTH = "File-Length";
+  public final static String CONTENT_LENGTH = "Content-Length";
+  public final static String MD5_HEADER = "X-MD5-Digest";
+  public final static String CONTENT_TYPE = "Content-Type";
+  public final static String CONTENT_TRANSFER_ENCODING = "Content-Transfer-Encoding";
+
+  public final static int IO_FILE_BUFFER_SIZE;
+  private static final boolean isSpnegoEnabled;
+  public static final URLConnectionFactory connectionFactory;
+
+  static {
+    Configuration conf = new Configuration();
+    connectionFactory = URLConnectionFactory
+        .newDefaultURLConnectionFactory(conf);
+    isSpnegoEnabled = UserGroupInformation.isSecurityEnabled();
+    IO_FILE_BUFFER_SIZE = DFSUtilClient.getIoFileBufferSize(conf);
+  }
+
   /**
    * Interprets the passed string as a URI. In case of error it 
    * assumes the specified string is a file.
    *
    * @param s the string to interpret
-   * @return the resulting URI 
-   * @throws IOException 
+   * @return the resulting URI
    */
-  public static URI stringAsURI(String s) throws IOException {
+  static URI stringAsURI(String s) throws IOException {
     URI u = null;
     // try to make a URI
     try {
@@ -67,7 +103,6 @@ public final class Util {
    * 
    * @param f the file to convert
    * @return the resulting URI
-   * @throws IOException
    */
   public static URI fileAsURI(File f) throws IOException {
     URI u = f.getCanonicalFile().toURI();
@@ -92,7 +127,7 @@ public final class Util {
    */
   public static List<URI> stringCollectionAsURIs(
                                   Collection<String> names) {
-    List<URI> uris = new ArrayList<URI>(names.size());
+    List<URI> uris = new ArrayList<>(names.size());
     for(String name : names) {
       try {
         uris.add(stringAsURI(name));
@@ -102,4 +137,217 @@ public final class Util {
     }
     return uris;
   }
+
+  /**
+   * Downloads the files at the specified url location into destination
+   * storage.
+   */
+  public static MD5Hash doGetUrl(URL url, List<File> localPaths,
+      Storage dstStorage, boolean getChecksum, int timeout) throws IOException {
+    HttpURLConnection connection;
+    try {
+      connection = (HttpURLConnection)
+          connectionFactory.openConnection(url, isSpnegoEnabled);
+    } catch (AuthenticationException e) {
+      throw new IOException(e);
+    }
+
+    setTimeout(connection, timeout);
+
+    if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
+      throw new HttpGetFailedException("Image transfer servlet at " + url +
+              " failed with status code " + connection.getResponseCode() +
+              "\nResponse message:\n" + connection.getResponseMessage(),
+          connection);
+    }
+
+    long advertisedSize;
+    String contentLength = connection.getHeaderField(CONTENT_LENGTH);
+    if (contentLength != null) {
+      advertisedSize = Long.parseLong(contentLength);
+    } else {
+      throw new IOException(CONTENT_LENGTH + " header is not provided " +
+          "by the namenode when trying to fetch " + url);
+    }
+    MD5Hash advertisedDigest = parseMD5Header(connection);
+    String fsImageName = connection
+        .getHeaderField(ImageServlet.HADOOP_IMAGE_EDITS_HEADER);
+    InputStream stream = connection.getInputStream();
+
+    return receiveFile(url.toExternalForm(), localPaths, dstStorage,
+        getChecksum, advertisedSize, advertisedDigest, fsImageName, stream,
+        null);
+  }
+
+  /**
+   * Receives file at the url location from the input stream and puts them in
+   * the specified destination storage location.
+   */
+  public static MD5Hash receiveFile(String url, List<File> localPaths,
+      Storage dstStorage, boolean getChecksum, long advertisedSize,
+      MD5Hash advertisedDigest, String fsImageName, InputStream stream,
+      DataTransferThrottler throttler) throws
+      IOException {
+    long startTime = Time.monotonicNow();
+    Map<FileOutputStream, File> streamPathMap = new HashMap<>();
+    StringBuilder xferStats = new StringBuilder();
+    double xferCombined = 0;
+    if (localPaths != null) {
+      // If the local paths refer to directories, use the server-provided header
+      // as the filename within that directory
+      List<File> newLocalPaths = new ArrayList<>();
+      for (File localPath : localPaths) {
+        if (localPath.isDirectory()) {
+          if (fsImageName == null) {
+            throw new IOException("No filename header provided by server");
+          }
+          newLocalPaths.add(new File(localPath, fsImageName));
+        } else {
+          newLocalPaths.add(localPath);
+        }
+      }
+      localPaths = newLocalPaths;
+    }
+
+
+    long received = 0;
+    MessageDigest digester = null;
+    if (getChecksum) {
+      digester = MD5Hash.getDigester();
+      stream = new DigestInputStream(stream, digester);
+    }
+    boolean finishedReceiving = false;
+
+    List<FileOutputStream> outputStreams = Lists.newArrayList();
+
+    try {
+      if (localPaths != null) {
+        for (File f : localPaths) {
+          try {
+            if (f.exists()) {
+              LOG.warn("Overwriting existing file " + f
+                  + " with file downloaded from " + url);
+            }
+            FileOutputStream fos = new FileOutputStream(f);
+            outputStreams.add(fos);
+            streamPathMap.put(fos, f);
+          } catch (IOException ioe) {
+            LOG.warn("Unable to download file " + f, ioe);
+            // This will be null if we're downloading the fsimage to a file
+            // outside of an NNStorage directory.
+            if (dstStorage != null &&
+                (dstStorage instanceof StorageErrorReporter)) {
+              ((StorageErrorReporter)dstStorage).reportErrorOnFile(f);
+            }
+          }
+        }
+
+        if (outputStreams.isEmpty()) {
+          throw new IOException(
+              "Unable to download to any storage directory");
+        }
+      }
+
+      int num = 1;
+      byte[] buf = new byte[IO_FILE_BUFFER_SIZE];
+      while (num > 0) {
+        num = stream.read(buf);
+        if (num > 0) {
+          received += num;
+          for (FileOutputStream fos : outputStreams) {
+            fos.write(buf, 0, num);
+          }
+          if (throttler != null) {
+            throttler.throttle(num);
+          }
+        }
+      }
+      finishedReceiving = true;
+      double xferSec = Math.max(
+          ((float)(Time.monotonicNow() - startTime)) / 1000.0, 0.001);
+      long xferKb = received / 1024;
+      xferCombined += xferSec;
+      xferStats.append(
+          String.format(" The fsimage download took %.2fs at %.2f KB/s.",
+              xferSec, xferKb / xferSec));
+    } finally {
+      stream.close();
+      for (FileOutputStream fos : outputStreams) {
+        long flushStartTime = Time.monotonicNow();
+        fos.getChannel().force(true);
+        fos.close();
+        double writeSec = Math.max(((float)
+            (flushStartTime - Time.monotonicNow())) / 1000.0, 0.001);
+        xferCombined += writeSec;
+        xferStats.append(String
+            .format(" Synchronous (fsync) write to disk of " +
+                streamPathMap.get(fos).getAbsolutePath() +
+                " took %.2fs.", writeSec));
+      }
+
+      // Something went wrong and did not finish reading.
+      // Remove the temporary files.
+      if (!finishedReceiving) {
+        deleteTmpFiles(localPaths);
+      }
+
+      if (finishedReceiving && received != advertisedSize) {
+        // only throw this exception if we think we read all of it on our end
+        // -- otherwise a client-side IOException would be masked by this
+        // exception that makes it look like a server-side problem!
+        deleteTmpFiles(localPaths);
+        throw new IOException("File " + url + " received length " + received +
+            " is not of the advertised size " +
+            advertisedSize);
+      }
+    }
+    xferStats.insert(0, String.format("Combined time for fsimage download and" +
+        " fsync to all disks took %.2fs.", xferCombined));
+    LOG.info(xferStats.toString());
+
+    if (digester != null) {
+      MD5Hash computedDigest = new MD5Hash(digester.digest());
+
+      if (advertisedDigest != null &&
+          !computedDigest.equals(advertisedDigest)) {
+        deleteTmpFiles(localPaths);
+        throw new IOException("File " + url + " computed digest " +
+            computedDigest + " does not match advertised digest " +
+            advertisedDigest);
+      }
+      return computedDigest;
+    } else {
+      return null;
+    }
+  }
+
+  private static void deleteTmpFiles(List<File> files) {
+    if (files == null) {
+      return;
+    }
+
+    LOG.info("Deleting temporary files: " + files);
+    for (File file : files) {
+      if (!file.delete()) {
+        LOG.warn("Deleting " + file + " has failed");
+      }
+    }
+  }
+
+  /**
+   * Sets a timeout value in milliseconds for the Http connection.
+   * @param connection the Http connection for which timeout needs to be set
+   * @param timeout value to be set as timeout in milliseconds
+   */
+  public static void setTimeout(HttpURLConnection connection, int timeout) {
+    if (timeout > 0) {
+      connection.setConnectTimeout(timeout);
+      connection.setReadTimeout(timeout);
+    }
+  }
+
+  private static MD5Hash parseMD5Header(HttpURLConnection connection) {
+    String header = connection.getHeaderField(MD5_HEADER);
+    return (header != null) ? new MD5Hash(header) : null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ec609b2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
index 48df8d6..36c2232 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
@@ -36,9 +36,8 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.LayoutFlags;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.common.HttpGetFailedException;
 import org.apache.hadoop.hdfs.server.common.Storage;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation;
-import org.apache.hadoop.hdfs.server.namenode.TransferFsImage.HttpGetFailedException;
 import org.apache.hadoop.hdfs.web.URLConnectionFactory;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.SecurityUtil;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ec609b2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java
index 4e4028d..7a26df9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import org.apache.hadoop.hdfs.server.common.Util;
 import static org.apache.hadoop.util.Time.monotonicNow;
 
 import java.net.HttpURLConnection;
@@ -304,11 +305,12 @@ public class ImageServlet extends HttpServlet {
    */
   public static void setVerificationHeadersForGet(HttpServletResponse response,
       File file) throws IOException {
-    response.setHeader(TransferFsImage.CONTENT_LENGTH,
+    response.setHeader(
+        Util.CONTENT_LENGTH,
         String.valueOf(file.length()));
     MD5Hash hash = MD5FileUtils.readStoredMd5ForFile(file);
     if (hash != null) {
-      response.setHeader(TransferFsImage.MD5_HEADER, hash.toString());
+      response.setHeader(Util.MD5_HEADER, hash.toString());
     }
   }
   
@@ -437,12 +439,13 @@ public class ImageServlet extends HttpServlet {
    */
   static void setVerificationHeadersForPut(HttpURLConnection connection,
       File file) throws IOException {
-    connection.setRequestProperty(TransferFsImage.CONTENT_LENGTH,
+    connection.setRequestProperty(
+        Util.CONTENT_LENGTH,
         String.valueOf(file.length()));
     MD5Hash hash = MD5FileUtils.readStoredMd5ForFile(file);
     if (hash != null) {
       connection
-          .setRequestProperty(TransferFsImage.MD5_HEADER, hash.toString());
+          .setRequestProperty(Util.MD5_HEADER, hash.toString());
     }
   }
 
@@ -462,7 +465,7 @@ public class ImageServlet extends HttpServlet {
     params.put(STORAGEINFO_PARAM, storage.toColonSeparatedString());
     // setting the length of the file to be uploaded in separate property as
     // Content-Length only supports up to 2GB
-    params.put(TransferFsImage.FILE_LENGTH, Long.toString(imageFileSize));
+    params.put(Util.FILE_LENGTH, Long.toString(imageFileSize));
     params.put(IMAGE_FILE_TYPE, nnf.name());
     return params;
   }
@@ -586,7 +589,7 @@ public class ImageServlet extends HttpServlet {
       txId = ServletUtil.parseLongParam(request, TXID_PARAM);
       storageInfoString = ServletUtil.getParameter(request, STORAGEINFO_PARAM);
       fileSize = ServletUtil.parseLongParam(request,
-          TransferFsImage.FILE_LENGTH);
+          Util.FILE_LENGTH);
       String imageType = ServletUtil.getParameter(request, IMAGE_FILE_TYPE);
       nnf = imageType == null ? NameNodeFile.IMAGE : NameNodeFile
           .valueOf(imageType);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ec609b2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 0fc3e60..735b2c0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -28,6 +28,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_DEPTH;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_LENGTH;
+
 import static org.apache.hadoop.util.Time.now;
 
 import java.io.FileNotFoundException;
@@ -136,6 +137,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerFaultInjector;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
+import org.apache.hadoop.hdfs.server.common.HttpGetFailedException;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
@@ -2104,7 +2106,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     } catch (FileNotFoundException e) {
       LOG.debug("Tried to read from deleted or moved edit log segment", e);
       return null;
-    } catch (TransferFsImage.HttpGetFailedException e) {
+    } catch (HttpGetFailedException e) {
       LOG.debug("Tried to read from deleted edit log segment", e);
       return null;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ec609b2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
index e4b95ee..5821353 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
@@ -19,18 +19,12 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.HttpURLConnection;
 import java.net.URISyntaxException;
 import java.net.URL;
-import java.security.DigestInputStream;
-import java.security.MessageDigest;
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -44,17 +38,16 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.common.HttpPutFailedException;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
-import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
+import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.hdfs.util.Canceler;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
-import org.apache.hadoop.hdfs.web.URLConnectionFactory;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -66,6 +59,9 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import org.eclipse.jetty.io.EofException;
 
+import static org.apache.hadoop.hdfs.server.common.Util.IO_FILE_BUFFER_SIZE;
+import static org.apache.hadoop.hdfs.server.common.Util.connectionFactory;
+
 /**
  * This class provides fetching a specified file from the NameNode.
  */
@@ -82,43 +78,23 @@ public class TransferFsImage {
     private final int response;
     private final boolean shouldReThrowException;
 
-    private TransferResult(int response, boolean rethrow) {
+    TransferResult(int response, boolean rethrow) {
       this.response = response;
       this.shouldReThrowException = rethrow;
     }
 
     public static TransferResult getResultForCode(int code){
-      TransferResult ret = UNEXPECTED_FAILURE;
       for(TransferResult result:TransferResult.values()){
         if(result.response == code){
           return result;
         }
       }
-      return ret;
+      return UNEXPECTED_FAILURE;
     }
   }
 
-  public final static String CONTENT_LENGTH = "Content-Length";
-  public final static String FILE_LENGTH = "File-Length";
-  public final static String MD5_HEADER = "X-MD5-Digest";
-
-  private final static String CONTENT_TYPE = "Content-Type";
-  private final static String CONTENT_TRANSFER_ENCODING = "Content-Transfer-Encoding";
-  private final static int IO_FILE_BUFFER_SIZE;
-
   @VisibleForTesting
   static int timeout = 0;
-  private static final URLConnectionFactory connectionFactory;
-  private static final boolean isSpnegoEnabled;
-
-  static {
-    Configuration conf = new Configuration();
-    connectionFactory = URLConnectionFactory
-        .newDefaultURLConnectionFactory(conf);
-    isSpnegoEnabled = UserGroupInformation.isSecurityEnabled();
-    IO_FILE_BUFFER_SIZE = DFSUtilClient.getIoFileBufferSize(conf);
-  }
-
   private static final Log LOG = LogFactory.getLog(TransferFsImage.class);
   
   public static void downloadMostRecentImageToDirectory(URL infoServer,
@@ -159,7 +135,7 @@ public class TransferFsImage {
     }
 
     MD5Hash advertisedDigest = parseMD5Header(request);
-    MD5Hash hash = receiveFile(fileName, dstFiles, dstStorage, true,
+    MD5Hash hash = Util.receiveFile(fileName, dstFiles, dstStorage, true,
         advertisedSize, advertisedDigest, fileName, stream, throttler);
     LOG.info("Downloaded file " + dstFiles.get(0).getName() + " size "
         + dstFiles.get(0).length() + " bytes.");
@@ -226,8 +202,9 @@ public class TransferFsImage {
    * @param txid the transaction ID of the image to be uploaded
    * @throws IOException if there is an I/O error
    */
-  public static TransferResult uploadImageFromStorage(URL fsName, Configuration conf,
-      NNStorage storage, NameNodeFile nnf, long txid) throws IOException {
+  static TransferResult uploadImageFromStorage(URL fsName,
+      Configuration conf, NNStorage storage, NameNodeFile nnf, long txid)
+      throws IOException {
     return uploadImageFromStorage(fsName, conf, storage, nnf, txid, null);
   }
 
@@ -323,9 +300,7 @@ public class TransferFsImage {
             responseCode, urlWithParams, connection.getResponseMessage()),
             responseCode);
       }
-    } catch (AuthenticationException e) {
-      throw new IOException(e);
-    } catch (URISyntaxException e) {
+    } catch (AuthenticationException | URISyntaxException e) {
       throw new IOException(e);
     } finally {
       if (connection != null) {
@@ -336,9 +311,9 @@ public class TransferFsImage {
 
   private static void writeFileToPutRequest(Configuration conf,
       HttpURLConnection connection, File imageFile, Canceler canceler)
-      throws FileNotFoundException, IOException {
-    connection.setRequestProperty(CONTENT_TYPE, "application/octet-stream");
-    connection.setRequestProperty(CONTENT_TRANSFER_ENCODING, "binary");
+      throws IOException {
+    connection.setRequestProperty(Util.CONTENT_TYPE, "application/octet-stream");
+    connection.setRequestProperty(Util.CONTENT_TRANSFER_ENCODING, "binary");
     OutputStream output = connection.getOutputStream();
     FileInputStream input = new FileInputStream(imageFile);
     try {
@@ -414,7 +389,7 @@ public class TransferFsImage {
    * Copies the response from the URL to a list of local files.
    * @param dstStorage if an error occurs writing to one of the files,
    *                   this storage object will be notified. 
-   * @Return a digest of the received file if getChecksum is true
+   * @return a digest of the received file if getChecksum is true
    */
   static MD5Hash getFileClient(URL infoServer,
       String queryString, List<File> localPaths,
@@ -426,40 +401,12 @@ public class TransferFsImage {
   
   public static MD5Hash doGetUrl(URL url, List<File> localPaths,
       Storage dstStorage, boolean getChecksum) throws IOException {
-    HttpURLConnection connection;
-    try {
-      connection = (HttpURLConnection)
-        connectionFactory.openConnection(url, isSpnegoEnabled);
-    } catch (AuthenticationException e) {
-      throw new IOException(e);
-    }
-
-    setTimeout(connection);
+    return Util.doGetUrl(url, localPaths, dstStorage, getChecksum, timeout);
+  }
 
-    if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
-      throw new HttpGetFailedException(
-          "Image transfer servlet at " + url +
-          " failed with status code " + connection.getResponseCode() +
-          "\nResponse message:\n" + connection.getResponseMessage(),
-          connection);
-    }
-    
-    long advertisedSize;
-    String contentLength = connection.getHeaderField(CONTENT_LENGTH);
-    if (contentLength != null) {
-      advertisedSize = Long.parseLong(contentLength);
-    } else {
-      throw new IOException(CONTENT_LENGTH + " header is not provided " +
-                            "by the namenode when trying to fetch " + url);
-    }
-    MD5Hash advertisedDigest = parseMD5Header(connection);
-    String fsImageName = connection
-        .getHeaderField(ImageServlet.HADOOP_IMAGE_EDITS_HEADER);
-    InputStream stream = connection.getInputStream();
-
-    return receiveFile(url.toExternalForm(), localPaths, dstStorage,
-        getChecksum, advertisedSize, advertisedDigest, fsImageName, stream,
-        null);
+  private static MD5Hash parseMD5Header(HttpServletRequest request) {
+    String header = request.getHeader(Util.MD5_HEADER);
+    return (header != null) ? new MD5Hash(header) : null;
   }
 
   private static void setTimeout(HttpURLConnection connection) {
@@ -467,204 +414,10 @@ public class TransferFsImage {
       Configuration conf = new HdfsConfiguration();
       timeout = conf.getInt(DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_KEY,
           DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT);
-      LOG.info("Image Transfer timeout configured to " + timeout
-          + " milliseconds");
-    }
-
-    if (timeout > 0) {
-      connection.setConnectTimeout(timeout);
-      connection.setReadTimeout(timeout);
-    }
-  }
-
-  private static MD5Hash receiveFile(String url, List<File> localPaths,
-      Storage dstStorage, boolean getChecksum, long advertisedSize,
-      MD5Hash advertisedDigest, String fsImageName, InputStream stream,
-      DataTransferThrottler throttler) throws IOException {
-    long startTime = Time.monotonicNow();
-    Map<FileOutputStream, File> streamPathMap = new HashMap<>();
-    StringBuilder xferStats = new StringBuilder();
-    double xferCombined = 0;
-    if (localPaths != null) {
-      // If the local paths refer to directories, use the server-provided header
-      // as the filename within that directory
-      List<File> newLocalPaths = new ArrayList<File>();
-      for (File localPath : localPaths) {
-        if (localPath.isDirectory()) {
-          if (fsImageName == null) {
-            throw new IOException("No filename header provided by server");
-          }
-          newLocalPaths.add(new File(localPath, fsImageName));
-        } else {
-          newLocalPaths.add(localPath);
-        }
-      }
-      localPaths = newLocalPaths;
-    }
-    
-
-    long received = 0;
-    MessageDigest digester = null;
-    if (getChecksum) {
-      digester = MD5Hash.getDigester();
-      stream = new DigestInputStream(stream, digester);
-    }
-    boolean finishedReceiving = false;
-
-    List<FileOutputStream> outputStreams = Lists.newArrayList();
-
-    try {
-      if (localPaths != null) {
-        for (File f : localPaths) {
-          try {
-            if (f.exists()) {
-              LOG.warn("Overwriting existing file " + f
-                  + " with file downloaded from " + url);
-            }
-            FileOutputStream fos = new FileOutputStream(f);
-            outputStreams.add(fos);
-            streamPathMap.put(fos, f);
-          } catch (IOException ioe) {
-            LOG.warn("Unable to download file " + f, ioe);
-            // This will be null if we're downloading the fsimage to a file
-            // outside of an NNStorage directory.
-            if (dstStorage != null &&
-                (dstStorage instanceof StorageErrorReporter)) {
-              ((StorageErrorReporter)dstStorage).reportErrorOnFile(f);
-            }
-          }
-        }
-        
-        if (outputStreams.isEmpty()) {
-          throw new IOException(
-              "Unable to download to any storage directory");
-        }
-      }
-      
-      int num = 1;
-      byte[] buf = new byte[IO_FILE_BUFFER_SIZE];
-      while (num > 0) {
-        num = stream.read(buf);
-        if (num > 0) {
-          received += num;
-          for (FileOutputStream fos : outputStreams) {
-            fos.write(buf, 0, num);
-          }
-          if (throttler != null) {
-            throttler.throttle(num);
-          }
-        }
-      }
-      finishedReceiving = true;
-      double xferSec = Math.max(
-                 ((float)(Time.monotonicNow() - startTime)) / 1000.0, 0.001);
-      long xferKb = received / 1024;
-      xferCombined += xferSec;
-      xferStats.append(
-          String.format(" The fsimage download took %.2fs at %.2f KB/s.",
-              xferSec, xferKb / xferSec));
-    } finally {
-      stream.close();
-      for (FileOutputStream fos : outputStreams) {
-        long flushStartTime = Time.monotonicNow();
-        fos.getChannel().force(true);
-        fos.close();
-        double writeSec = Math.max(((float)
-               (flushStartTime - Time.monotonicNow())) / 1000.0, 0.001);
-        xferCombined += writeSec;
-        xferStats.append(String
-                .format(" Synchronous (fsync) write to disk of " +
-                 streamPathMap.get(fos).getAbsolutePath() +
-                " took %.2fs.", writeSec));
-      }
-
-      // Something went wrong and did not finish reading.
-      // Remove the temporary files.
-      if (!finishedReceiving) {
-        deleteTmpFiles(localPaths);
-      }
-
-      if (finishedReceiving && received != advertisedSize) {
-        // only throw this exception if we think we read all of it on our end
-        // -- otherwise a client-side IOException would be masked by this
-        // exception that makes it look like a server-side problem!
-        deleteTmpFiles(localPaths);
-        throw new IOException("File " + url + " received length " + received +
-                              " is not of the advertised size " +
-                              advertisedSize);
-      }
-    }
-    xferStats.insert(
-        0, String.format(
-            "Combined time for fsimage download and fsync " +
-            "to all disks took %.2fs.", xferCombined));
-    LOG.info(xferStats.toString());
-
-    if (digester != null) {
-      MD5Hash computedDigest = new MD5Hash(digester.digest());
-      
-      if (advertisedDigest != null &&
-          !computedDigest.equals(advertisedDigest)) {
-        deleteTmpFiles(localPaths);
-        throw new IOException("File " + url + " computed digest " +
-            computedDigest + " does not match advertised digest " + 
-            advertisedDigest);
-      }
-      return computedDigest;
-    } else {
-      return null;
-    }    
-  }
-
-  private static void deleteTmpFiles(List<File> files) {
-    if (files == null) {
-      return;
-    }
-
-    LOG.info("Deleting temporary files: " + files);
-    for (File file : files) {
-      if (!file.delete()) {
-        LOG.warn("Deleting " + file + " has failed");
-      }
+      LOG.info("Image Transfer timeout configured to " + timeout +
+          " milliseconds");
     }
-  }
 
-  private static MD5Hash parseMD5Header(HttpURLConnection connection) {
-    String header = connection.getHeaderField(MD5_HEADER);
-    return (header != null) ? new MD5Hash(header) : null;
-  }
-
-  private static MD5Hash parseMD5Header(HttpServletRequest request) {
-    String header = request.getHeader(MD5_HEADER);
-    return (header != null) ? new MD5Hash(header) : null;
+    Util.setTimeout(connection, timeout);
   }
-
-  public static class HttpGetFailedException extends IOException {
-    private static final long serialVersionUID = 1L;
-    private final int responseCode;
-
-    HttpGetFailedException(String msg, HttpURLConnection connection) throws IOException {
-      super(msg);
-      this.responseCode = connection.getResponseCode();
-    }
-    
-    public int getResponseCode() {
-      return responseCode;
-    }
-  }
-
-  public static class HttpPutFailedException extends IOException {
-    private static final long serialVersionUID = 1L;
-    private final int responseCode;
-
-    HttpPutFailedException(String msg, int responseCode) throws IOException {
-      super(msg);
-      this.responseCode = responseCode;
-    }
-
-    public int getResponseCode() {
-      return responseCode;
-    }
-  }
-
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to