Repository: samza Updated Branches: refs/heads/master 1c7e4d7aa -> bf9bc2c37
SAMZA-1079: Add timeouts for reads from HttpFileSystem. Add tests. * Wrote a unit/integration test to simulate a stuck connection when reading binaries for the job. Other misc. changes: - Moved some debug log messages to be info for better debugging. Author: vjagadish1989 <jvenk...@linkedin.com> Reviewers: jmakes,nickpan47 Closes #42 from vjagadish/http-fs Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/bf9bc2c3 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/bf9bc2c3 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/bf9bc2c3 Branch: refs/heads/master Commit: bf9bc2c373f3456159fdc610f268992e8d9a476c Parents: 1c7e4d7 Author: vjagadish1989 <jvenk...@linkedin.com> Authored: Mon Jan 30 15:10:06 2017 -0800 Committer: vjagadish1989 <jvenk...@linkedin.com> Committed: Mon Jan 30 15:10:06 2017 -0800 ---------------------------------------------------------------------- .../samza/util/hadoop/HttpFileSystem.scala | 15 +- .../yarn/util/hadoop/TestHttpFileSystem.java | 161 +++++++++++++++++++ 2 files changed, 173 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/bf9bc2c3/samza-yarn/src/main/scala/org/apache/samza/util/hadoop/HttpFileSystem.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/util/hadoop/HttpFileSystem.scala b/samza-yarn/src/main/scala/org/apache/samza/util/hadoop/HttpFileSystem.scala index 7dff90e..fa65a4e 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/util/hadoop/HttpFileSystem.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/util/hadoop/HttpFileSystem.scala @@ -37,20 +37,29 @@ import org.apache.hadoop.util.Progressable import org.apache.samza.util.Logging class HttpFileSystem extends FileSystem with Logging { - val DEFAULT_BLOCK_SIZE = 4 * 1024; + val DEFAULT_BLOCK_SIZE = 4 * 1024 var uri: URI = null + var connectionTimeoutMs = 5 * 60 * 1000 + var socketReadTimeoutMs = 5 * 60 * 1000 + + def setConnectionTimeoutMs(timeout: Int): Unit = connectionTimeoutMs = timeout + + def setSocketReadTimeoutMs(timeout: Int): Unit = socketReadTimeoutMs = timeout override def initialize(uri: URI, conf: Configuration) { super.initialize(uri, conf) - debug("init uri %s" format (uri)) + info("init uri %s" format (uri)) this.uri = uri } override def getUri = uri override def open(f: Path, bufferSize: Int): FSDataInputStream = { - debug("open http file %s" format (f)) + info("open http file %s" format (f)) val client = new HttpClient + client.getHttpConnectionManager.getParams.setConnectionTimeout(connectionTimeoutMs) + client.getHttpConnectionManager.getParams.setSoTimeout(socketReadTimeoutMs) + val method = new GetMethod(f.toUri.toString) val statusCode = client.executeMethod(method) http://git-wip-us.apache.org/repos/asf/samza/blob/bf9bc2c3/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/hadoop/TestHttpFileSystem.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/hadoop/TestHttpFileSystem.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/hadoop/TestHttpFileSystem.java new file mode 100644 index 0000000..6f42856 --- /dev/null +++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/hadoop/TestHttpFileSystem.java @@ -0,0 +1,161 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package org.apache.samza.job.yarn.util.hadoop; + +import junit.framework.Assert; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.Path; +import org.apache.samza.coordinator.server.HttpServer; +import org.apache.samza.util.hadoop.HttpFileSystem; +import org.eclipse.jetty.servlet.DefaultServlet; +import org.eclipse.jetty.servlet.ServletHolder; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.SocketTimeoutException; +import java.net.URI; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * Test the behavior of {@link HttpFileSystem} + */ +public class TestHttpFileSystem { + + private static final Logger LOG = LoggerFactory.getLogger(TestHttpFileSystem.class); + /** + * Number of bytes the server should stream before hanging the TCP connection. + */ + private static final int THRESHOLD_BYTES = 5; + private static final String RESPONSE_STR = "HELLO WORLD"; + + private final CountDownLatch serverWaitLatch = new CountDownLatch(1); + + private Exception clientException; + private Exception serverException; + + /** + * A {@link HttpServlet} implementation that streams its response to the client one byte at a time. + */ + private class PartialFileFetchServlet extends HttpServlet { + @Override + public void doGet(HttpServletRequest request, HttpServletResponse response) { + + //Mimic download of the package tar-ball + response.setContentType("application/gzip"); + response.setStatus(HttpServletResponse.SC_OK); + + try { + int currByteWritten = -1; + int numBytesWritten = 0; + //Begin streaming a response. + InputStream in = new ByteArrayInputStream(RESPONSE_STR.getBytes()); + OutputStream out = response.getOutputStream(); + + while ((currByteWritten = in.read()) != -1) { + out.write(currByteWritten); + out.flush(); + numBytesWritten++; + + //Hang the connection until the read timeout expires on the client side. + if (numBytesWritten >= THRESHOLD_BYTES) { + if(!serverWaitLatch.await(5, TimeUnit.SECONDS)) { + throw new IOException("Timed out waiting for latch"); + } + break; + } + } + } catch(Exception e) { + //Record any exception that may have occurred + LOG.error("{}", e); + serverException = e; + } + } + } + + class FileSystemClientThread extends Thread { + + private static final int TIMEOUT_MS = 1000; + private final URI resourceURI; + private int totalBytesRead = 0; + + FileSystemClientThread(URI resourceURI) { + this.resourceURI = resourceURI; + } + + public int getTotalBytesRead() { + return totalBytesRead; + } + + @Override + public void run() { + try { + Path resource = new Path(resourceURI); + Configuration conf = new Configuration(); + HttpFileSystem fs = new HttpFileSystem(); + fs.setSocketReadTimeoutMs(TIMEOUT_MS); + fs.setConnectionTimeoutMs(TIMEOUT_MS); + fs.setConf(conf); + fs.initialize(resourceURI, conf); + + //Read from the socket one byte at a time. + FSDataInputStream in = fs.open(resource); + while (in.read() >= 0) { + totalBytesRead++; + } + } catch(SocketTimeoutException e) { + //Expect the socket to timeout after THRESHOLD bytes have been read. + serverWaitLatch.countDown(); + } catch(Exception e) { + //Record any exception that may have occurred. + LOG.error("{}", e); + clientException = e; + } + } + } + + @Test + public void testHttpFileSystemReadTimeouts() throws Exception { + HttpServer server = new HttpServer("/", 0, null, new ServletHolder(DefaultServlet.class)); + try { + server.addServlet("/download", new PartialFileFetchServlet()); + server.start(); + String serverUrl = server.getUrl().toString() + "download"; + FileSystemClientThread fileSystemClientThread = new FileSystemClientThread(new URI(serverUrl)); + fileSystemClientThread.start(); + fileSystemClientThread.join(); + Assert.assertEquals(fileSystemClientThread.getTotalBytesRead(), THRESHOLD_BYTES); + Assert.assertNull(clientException); + Assert.assertNull(serverException); + } finally { + server.stop(); + } + } +} +