Merge branch '1.6' into 1.7

Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f85717cb
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f85717cb
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f85717cb

Branch: refs/heads/master
Commit: f85717cbdbedf9d705bc2c246a2fbe684a63d336
Parents: afa475d 4eb8085
Author: Josh Elser <els...@apache.org>
Authored: Fri Dec 4 16:24:11 2015 -0500
Committer: Josh Elser <els...@apache.org>
Committed: Fri Dec 4 16:24:11 2015 -0500

----------------------------------------------------------------------
 .../accumulo/core/rpc/TTimeoutTransport.java    | 21 ++++++++++++++++++--
 1 file changed, 19 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/f85717cb/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java
index 8c23555,0000000..400d90a
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java
@@@ -1,60 -1,0 +1,77 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.rpc;
 +
 +import java.io.BufferedInputStream;
 +import java.io.BufferedOutputStream;
 +import java.io.IOException;
 +import java.io.InputStream;
 +import java.io.OutputStream;
 +import java.lang.reflect.Method;
 +import java.net.InetSocketAddress;
 +import java.net.Socket;
 +import java.net.SocketAddress;
 +import java.nio.channels.spi.SelectorProvider;
 +
 +import org.apache.hadoop.net.NetUtils;
 +import org.apache.thrift.transport.TIOStreamTransport;
 +import org.apache.thrift.transport.TTransport;
 +
 +import com.google.common.net.HostAndPort;
 +
 +public class TTimeoutTransport {
 +
++  private static volatile Method GET_INPUT_STREAM_METHOD = null;
++
++  private static Method getNetUtilsInputStreamMethod() {
++    if (null == GET_INPUT_STREAM_METHOD) {
++      synchronized (TTimeoutTransport.class) {
++        if (null == GET_INPUT_STREAM_METHOD) {
++          try {
++            GET_INPUT_STREAM_METHOD = NetUtils.class.getMethod("getInputStream", Socket.class, Long.TYPE);
++          } catch (Exception e) {
++            throw new RuntimeException(e);
++          }
++        }
++      }
++    }
++
++    return GET_INPUT_STREAM_METHOD;
++  }
++
 +  private static InputStream getInputStream(Socket socket, long timeout) {
 +    try {
-       Method m = NetUtils.class.getMethod("getInputStream", Socket.class, Long.TYPE);
-       return (InputStream) m.invoke(null, socket, timeout);
++      return (InputStream) getNetUtilsInputStreamMethod().invoke(null, socket, timeout);
 +    } catch (Exception e) {
 +      throw new RuntimeException(e);
 +    }
 +  }
 +
 +  public static TTransport create(HostAndPort addr, long timeoutMillis) throws IOException {
 +    return create(new InetSocketAddress(addr.getHostText(), addr.getPort()), timeoutMillis);
 +  }
 +
 +  public static TTransport create(SocketAddress addr, long timeoutMillis) throws IOException {
 +    Socket socket = SelectorProvider.provider().openSocketChannel().socket();
 +    socket.setSoLinger(false, 0);
 +    socket.setTcpNoDelay(true);
 +    socket.connect(addr);
 +    InputStream input = new BufferedInputStream(getInputStream(socket, timeoutMillis), 1024 * 10);
 +    OutputStream output = new BufferedOutputStream(NetUtils.getOutputStream(socket, timeoutMillis), 1024 * 10);
 +    return new TIOStreamTransport(input, output);
 +  }
 +}

Reply via email to