Merge branch '1.6' into 1.7
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f85717cb Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f85717cb Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f85717cb Branch: refs/heads/master Commit: f85717cbdbedf9d705bc2c246a2fbe684a63d336 Parents: afa475d 4eb8085 Author: Josh Elser <els...@apache.org> Authored: Fri Dec 4 16:24:11 2015 -0500 Committer: Josh Elser <els...@apache.org> Committed: Fri Dec 4 16:24:11 2015 -0500 ---------------------------------------------------------------------- .../accumulo/core/rpc/TTimeoutTransport.java | 21 ++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/f85717cb/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java ---------------------------------------------------------------------- diff --cc core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java index 8c23555,0000000..400d90a mode 100644,000000..100644 --- a/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java +++ b/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java @@@ -1,60 -1,0 +1,77 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.rpc; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.lang.reflect.Method; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketAddress; +import java.nio.channels.spi.SelectorProvider; + +import org.apache.hadoop.net.NetUtils; +import org.apache.thrift.transport.TIOStreamTransport; +import org.apache.thrift.transport.TTransport; + +import com.google.common.net.HostAndPort; + +public class TTimeoutTransport { + ++ private static volatile Method GET_INPUT_STREAM_METHOD = null; ++ ++ private static Method getNetUtilsInputStreamMethod() { ++ if (null == GET_INPUT_STREAM_METHOD) { ++ synchronized (TTimeoutTransport.class) { ++ if (null == GET_INPUT_STREAM_METHOD) { ++ try { ++ GET_INPUT_STREAM_METHOD = NetUtils.class.getMethod("getInputStream", Socket.class, Long.TYPE); ++ } catch (Exception e) { ++ throw new RuntimeException(e); ++ } ++ } ++ } ++ } ++ ++ return GET_INPUT_STREAM_METHOD; ++ } ++ + private static InputStream getInputStream(Socket socket, long timeout) { + try { - Method m = NetUtils.class.getMethod("getInputStream", Socket.class, Long.TYPE); - return (InputStream) m.invoke(null, socket, timeout); ++ return (InputStream) getNetUtilsInputStreamMethod().invoke(null, socket, timeout); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public static TTransport create(HostAndPort addr, long timeoutMillis) throws IOException { + return create(new InetSocketAddress(addr.getHostText(), addr.getPort()), timeoutMillis); + } + + public static TTransport create(SocketAddress addr, long timeoutMillis) throws IOException { + Socket socket = SelectorProvider.provider().openSocketChannel().socket(); + socket.setSoLinger(false, 0); + socket.setTcpNoDelay(true); + socket.connect(addr); + InputStream input = new BufferedInputStream(getInputStream(socket, timeoutMillis), 1024 * 10); + OutputStream output = new BufferedOutputStream(NetUtils.getOutputStream(socket, timeoutMillis), 1024 * 10); + return new TIOStreamTransport(input, output); + } +}