HBASE-21652 Refactor ThriftServer making thrift2 server inherited from thrift1 server
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e4b6b4af Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e4b6b4af Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e4b6b4af Branch: refs/heads/HBASE-21512 Commit: e4b6b4afb933a961f543537875f87a2dc62d3757 Parents: f0b50a8 Author: Allan Yang <allan...@apache.org> Authored: Wed Jan 2 16:13:17 2019 +0800 Committer: Allan Yang <allan...@apache.org> Committed: Wed Jan 2 16:13:57 2019 +0800 ---------------------------------------------------------------------- .../apache/hadoop/hbase/thrift/Constants.java | 151 ++ .../hbase/thrift/HBaseServiceHandler.java | 90 + .../hbase/thrift/HbaseHandlerMetricsProxy.java | 20 +- .../apache/hadoop/hbase/thrift/ImplType.java | 143 ++ .../hadoop/hbase/thrift/IncrementCoalescer.java | 6 +- .../hbase/thrift/ThriftHBaseServiceHandler.java | 1347 ++++++++++++ .../hadoop/hbase/thrift/ThriftHttpServlet.java | 12 +- .../hadoop/hbase/thrift/ThriftServer.java | 698 +++++- .../hadoop/hbase/thrift/ThriftServerRunner.java | 2031 ------------------ .../thrift2/ThriftHBaseServiceHandler.java | 69 +- .../hadoop/hbase/thrift2/ThriftServer.java | 594 +---- .../resources/hbase-webapps/thrift/thrift.jsp | 2 +- .../hbase/thrift/TestThriftHttpServer.java | 28 +- .../hadoop/hbase/thrift/TestThriftServer.java | 58 +- .../hbase/thrift/TestThriftServerCmdLine.java | 48 +- .../thrift/TestThriftSpnegoHttpServer.java | 21 +- .../hbase/thrift2/TestThrift2HttpServer.java | 90 + .../hbase/thrift2/TestThrift2ServerCmdLine.java | 99 + .../thrift2/TestThriftHBaseServiceHandler.java | 15 +- 19 files changed, 2711 insertions(+), 2811 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/e4b6b4af/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/Constants.java ---------------------------------------------------------------------- diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/Constants.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/Constants.java new file mode 100644 index 0000000..8e3d004 --- /dev/null +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/Constants.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.thrift; + +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Thrift related constants + */ +@InterfaceAudience.Private +public final class Constants { + private Constants(){} + + public static final int DEFAULT_HTTP_MAX_HEADER_SIZE = 64 * 1024; // 64k + + public static final String SERVER_TYPE_CONF_KEY = + "hbase.regionserver.thrift.server.type"; + + public static final String COMPACT_CONF_KEY = "hbase.regionserver.thrift.compact"; + public static final boolean COMPACT_CONF_DEFAULT = false; + + public static final String FRAMED_CONF_KEY = "hbase.regionserver.thrift.framed"; + public static final boolean FRAMED_CONF_DEFAULT = false; + + public static final String MAX_FRAME_SIZE_CONF_KEY = + "hbase.regionserver.thrift.framed.max_frame_size_in_mb"; + public static final int MAX_FRAME_SIZE_CONF_DEFAULT = 2; + + public static final String COALESCE_INC_KEY = "hbase.regionserver.thrift.coalesceIncrement"; + public static final String USE_HTTP_CONF_KEY = "hbase.regionserver.thrift.http"; + + public static final String HTTP_MIN_THREADS_KEY = "hbase.thrift.http_threads.min"; + public static final int HTTP_MIN_THREADS_KEY_DEFAULT = 2; + + public static final String HTTP_MAX_THREADS_KEY = "hbase.thrift.http_threads.max"; + public static final int HTTP_MAX_THREADS_KEY_DEFAULT = 100; + + // ssl related configs + public static final String THRIFT_SSL_ENABLED_KEY = "hbase.thrift.ssl.enabled"; + public static final String THRIFT_SSL_KEYSTORE_STORE_KEY = "hbase.thrift.ssl.keystore.store"; + public static final String THRIFT_SSL_KEYSTORE_PASSWORD_KEY = + "hbase.thrift.ssl.keystore.password"; + public static final String THRIFT_SSL_KEYSTORE_KEYPASSWORD_KEY + = "hbase.thrift.ssl.keystore.keypassword"; + public static final String THRIFT_SSL_EXCLUDE_CIPHER_SUITES_KEY = + "hbase.thrift.ssl.exclude.cipher.suites"; + public static final String THRIFT_SSL_INCLUDE_CIPHER_SUITES_KEY = + "hbase.thrift.ssl.include.cipher.suites"; + public static final String THRIFT_SSL_EXCLUDE_PROTOCOLS_KEY = + "hbase.thrift.ssl.exclude.protocols"; + public static final String THRIFT_SSL_INCLUDE_PROTOCOLS_KEY = + "hbase.thrift.ssl.include.protocols"; + + + public static final String THRIFT_SUPPORT_PROXYUSER_KEY = "hbase.thrift.support.proxyuser"; + + //kerberos related configs + public static final String THRIFT_DNS_INTERFACE_KEY = "hbase.thrift.dns.interface"; + public static final String THRIFT_DNS_NAMESERVER_KEY = "hbase.thrift.dns.nameserver"; + public static final String THRIFT_KERBEROS_PRINCIPAL_KEY = "hbase.thrift.kerberos.principal"; + public static final String THRIFT_KEYTAB_FILE_KEY = "hbase.thrift.keytab.file"; + public static final String THRIFT_SPNEGO_PRINCIPAL_KEY = "hbase.thrift.spnego.principal"; + public static final String THRIFT_SPNEGO_KEYTAB_FILE_KEY = "hbase.thrift.spnego.keytab.file"; + + /** + * Amount of time in milliseconds before a server thread will timeout + * waiting for client to send data on a connected socket. Currently, + * applies only to TBoundedThreadPoolServer + */ + public static final String THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY = + "hbase.thrift.server.socket.read.timeout"; + public static final int THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT = 60000; + + + /** + * Thrift quality of protection configuration key. Valid values can be: + * auth-conf: authentication, integrity and confidentiality checking + * auth-int: authentication and integrity checking + * auth: authentication only + * + * This is used to authenticate the callers and support impersonation. + * The thrift server and the HBase cluster must run in secure mode. + */ + public static final String THRIFT_QOP_KEY = "hbase.thrift.security.qop"; + + public static final String BACKLOG_CONF_KEY = "hbase.regionserver.thrift.backlog"; + public static final int BACKLOG_CONF_DEAFULT = 0; + + public static final String BIND_CONF_KEY = "hbase.regionserver.thrift.ipaddress"; + public static final String DEFAULT_BIND_ADDR = "0.0.0.0"; + + public static final String PORT_CONF_KEY = "hbase.regionserver.thrift.port"; + public static final int DEFAULT_LISTEN_PORT = 9090; + + public static final String THRIFT_HTTP_ALLOW_OPTIONS_METHOD = + "hbase.thrift.http.allow.options.method"; + public static final boolean THRIFT_HTTP_ALLOW_OPTIONS_METHOD_DEFAULT = false; + + public static final String THRIFT_INFO_SERVER_PORT = "hbase.thrift.info.port"; + public static final int THRIFT_INFO_SERVER_PORT_DEFAULT = 9095; + + public static final String THRIFT_INFO_SERVER_BINDING_ADDRESS = "hbase.thrift.info.bindAddress"; + public static final String THRIFT_INFO_SERVER_BINDING_ADDRESS_DEFAULT = "0.0.0.0"; + + public static final String THRIFT_QUEUE_SIZE = "hbase.thrift.queue.size"; + public static final int THRIFT_QUEUE_SIZE_DEFAULT = Integer.MAX_VALUE; + + public static final String THRIFT_SELECTOR_NUM = "hbase.thrift.selector.num"; + + public static final String THRIFT_FILTERS = "hbase.thrift.filters"; + + // Command line options + + public static final String READ_TIMEOUT_OPTION = "readTimeout"; + public static final String MIN_WORKERS_OPTION = "minWorkers"; + public static final String MAX_WORKERS_OPTION = "workers"; + public static final String MAX_QUEUE_SIZE_OPTION = "queue"; + public static final String SELECTOR_NUM_OPTION = "selectors"; + public static final String KEEP_ALIVE_SEC_OPTION = "keepAliveSec"; + public static final String BIND_OPTION = "bind"; + public static final String COMPACT_OPTION = "compact"; + public static final String FRAMED_OPTION = "framed"; + public static final String PORT_OPTION = "port"; + public static final String INFOPORT_OPTION = "infoport"; + + //for thrift2 server + public static final String READONLY_OPTION ="readonly"; + + public static final String THRIFT_READONLY_ENABLED = "hbase.thrift.readonly"; + public static final boolean THRIFT_READONLY_ENABLED_DEFAULT = false; + + + + + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/e4b6b4af/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/HBaseServiceHandler.java ---------------------------------------------------------------------- diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/HBaseServiceHandler.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/HBaseServiceHandler.java new file mode 100644 index 0000000..7990871 --- /dev/null +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/HBaseServiceHandler.java @@ -0,0 +1,90 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.thrift; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ConnectionCache; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * abstract class for HBase handler + * providing a Connection cache and get table/admin method + */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) +public abstract class HBaseServiceHandler { + public static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval"; + public static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime"; + + protected Configuration conf; + + protected final ConnectionCache connectionCache; + + public HBaseServiceHandler(final Configuration c, + final UserProvider userProvider) throws IOException { + this.conf = c; + int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000); + int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000); + connectionCache = new ConnectionCache( + conf, userProvider, cleanInterval, maxIdleTime); + } + + protected ThriftMetrics metrics = null; + + public void initMetrics(ThriftMetrics metrics) { + this.metrics = metrics; + } + + public void setEffectiveUser(String effectiveUser) { + connectionCache.setEffectiveUser(effectiveUser); + } + + /** + * Obtain HBaseAdmin. Creates the instance if it is not already created. + */ + protected Admin getAdmin() throws IOException { + return connectionCache.getAdmin(); + } + + /** + * Creates and returns a Table instance from a given table name. + * + * @param tableName + * name of table + * @return Table object + * @throws IOException if getting the table fails + */ + protected Table getTable(final byte[] tableName) throws IOException { + String table = Bytes.toString(tableName); + return connectionCache.getTable(table); + } + + protected Table getTable(final ByteBuffer tableName) throws IOException { + return getTable(Bytes.getBytes(tableName)); + } + + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/e4b6b4af/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/HbaseHandlerMetricsProxy.java ---------------------------------------------------------------------- diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/HbaseHandlerMetricsProxy.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/HbaseHandlerMetricsProxy.java index 5a6e436..1402f86 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/HbaseHandlerMetricsProxy.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/HbaseHandlerMetricsProxy.java @@ -25,9 +25,8 @@ import java.lang.reflect.Proxy; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.thrift.generated.Hbase; +import org.apache.hadoop.hbase.thrift2.generated.THBaseService; import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Converts a Hbase.Iface using InvocationHandler so that it reports process @@ -36,10 +35,7 @@ import org.slf4j.LoggerFactory; @InterfaceAudience.Private public final class HbaseHandlerMetricsProxy implements InvocationHandler { - private static final Logger LOG = LoggerFactory.getLogger( - HbaseHandlerMetricsProxy.class); - - private final Hbase.Iface handler; + private final Object handler; private final ThriftMetrics metrics; public static Hbase.Iface newInstance(Hbase.Iface handler, @@ -51,8 +47,18 @@ public final class HbaseHandlerMetricsProxy implements InvocationHandler { new HbaseHandlerMetricsProxy(handler, metrics, conf)); } + // for thrift 2 + public static THBaseService.Iface newInstance(THBaseService.Iface handler, + ThriftMetrics metrics, + Configuration conf) { + return (THBaseService.Iface) Proxy.newProxyInstance( + handler.getClass().getClassLoader(), + new Class[]{THBaseService.Iface.class}, + new HbaseHandlerMetricsProxy(handler, metrics, conf)); + } + private HbaseHandlerMetricsProxy( - Hbase.Iface handler, ThriftMetrics metrics, Configuration conf) { + Object handler, ThriftMetrics metrics, Configuration conf) { this.handler = handler; this.metrics = metrics; } http://git-wip-us.apache.org/repos/asf/hbase/blob/e4b6b4af/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ImplType.java ---------------------------------------------------------------------- diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ImplType.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ImplType.java new file mode 100644 index 0000000..7108115 --- /dev/null +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ImplType.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.thrift; + +import static org.apache.hadoop.hbase.thrift.Constants.SERVER_TYPE_CONF_KEY; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.thrift.server.THsHaServer; +import org.apache.thrift.server.TNonblockingServer; +import org.apache.thrift.server.TServer; +import org.apache.thrift.server.TThreadedSelectorServer; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; +import org.apache.hbase.thirdparty.org.apache.commons.cli.Option; +import org.apache.hbase.thirdparty.org.apache.commons.cli.OptionGroup; + +/** An enum of server implementation selections */ +@InterfaceAudience.Private +public enum ImplType { + HS_HA("hsha", true, THsHaServer.class, true), + NONBLOCKING("nonblocking", true, TNonblockingServer.class, true), + THREAD_POOL("threadpool", false, TBoundedThreadPoolServer.class, true), + THREADED_SELECTOR("threadedselector", true, TThreadedSelectorServer.class, true); + + private static final Logger LOG = LoggerFactory.getLogger(ImplType.class); + public static final ImplType DEFAULT = THREAD_POOL; + + + final String option; + final boolean isAlwaysFramed; + final Class<? extends TServer> serverClass; + final boolean canSpecifyBindIP; + + private ImplType(String option, boolean isAlwaysFramed, + Class<? extends TServer> serverClass, boolean canSpecifyBindIP) { + this.option = option; + this.isAlwaysFramed = isAlwaysFramed; + this.serverClass = serverClass; + this.canSpecifyBindIP = canSpecifyBindIP; + } + + /** + * @return <code>-option</code> + */ + @Override + public String toString() { + return "-" + option; + } + + public String getOption() { + return option; + } + + public boolean isAlwaysFramed() { + return isAlwaysFramed; + } + + public String getDescription() { + StringBuilder sb = new StringBuilder("Use the " + + serverClass.getSimpleName()); + if (isAlwaysFramed) { + sb.append(" This implies the framed transport."); + } + if (this == DEFAULT) { + sb.append("This is the default."); + } + return sb.toString(); + } + + static OptionGroup createOptionGroup() { + OptionGroup group = new OptionGroup(); + for (ImplType t : values()) { + group.addOption(new Option(t.option, t.getDescription())); + } + return group; + } + + public static ImplType getServerImpl(Configuration conf) { + String confType = conf.get(SERVER_TYPE_CONF_KEY, THREAD_POOL.option); + for (ImplType t : values()) { + if (confType.equals(t.option)) { + return t; + } + } + throw new AssertionError("Unknown server ImplType.option:" + confType); + } + + static void setServerImpl(CommandLine cmd, Configuration conf) { + ImplType chosenType = null; + int numChosen = 0; + for (ImplType t : values()) { + if (cmd.hasOption(t.option)) { + chosenType = t; + ++numChosen; + } + } + if (numChosen < 1) { + LOG.info("Using default thrift server type"); + chosenType = DEFAULT; + } else if (numChosen > 1) { + throw new AssertionError("Exactly one option out of " + + Arrays.toString(values()) + " has to be specified"); + } + LOG.info("Using thrift server type " + chosenType.option); + conf.set(SERVER_TYPE_CONF_KEY, chosenType.option); + } + + public String simpleClassName() { + return serverClass.getSimpleName(); + } + + public static List<String> serversThatCannotSpecifyBindIP() { + List<String> l = new ArrayList<>(); + for (ImplType t : values()) { + if (!t.canSpecifyBindIP) { + l.add(t.simpleClassName()); + } + } + return l; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/e4b6b4af/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java ---------------------------------------------------------------------- diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java index e36d639..971cd17 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java @@ -33,7 +33,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.LongAdder; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.thrift.ThriftServerRunner.HBaseHandler; import org.apache.hadoop.hbase.thrift.generated.TIncrement; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; @@ -180,7 +179,7 @@ public class IncrementCoalescer implements IncrementCoalescerMBean { private final ConcurrentMap<FullyQualifiedRow, Long> countersMap = new ConcurrentHashMap<>(100000, 0.75f, 1500); private final ThreadPoolExecutor pool; - private final HBaseHandler handler; + private final ThriftHBaseServiceHandler handler; private int maxQueueSize = 500000; private static final int CORE_POOL_SIZE = 1; @@ -188,7 +187,7 @@ public class IncrementCoalescer implements IncrementCoalescerMBean { private static final Logger LOG = LoggerFactory.getLogger(FullyQualifiedRow.class); @SuppressWarnings("deprecation") - public IncrementCoalescer(HBaseHandler hand) { + public IncrementCoalescer(ThriftHBaseServiceHandler hand) { this.handler = hand; LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(); pool = @@ -230,6 +229,7 @@ public class IncrementCoalescer implements IncrementCoalescerMBean { inc.getAmmount()); } + @SuppressWarnings("FutureReturnValueIgnored") private boolean internalQueueIncrement(byte[] tableName, byte[] rowKey, byte[] fam, byte[] qual, long ammount) throws TException { int countersMapSize = countersMap.size(); http://git-wip-us.apache.org/repos/asf/hbase/blob/e4b6b4af/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftHBaseServiceHandler.java ---------------------------------------------------------------------- diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftHBaseServiceHandler.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftHBaseServiceHandler.java new file mode 100644 index 0000000..34bf5e8 --- /dev/null +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftHBaseServiceHandler.java @@ -0,0 +1,1347 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.thrift; + +import static org.apache.hadoop.hbase.thrift.Constants.COALESCE_INC_KEY; +import static org.apache.hadoop.hbase.util.Bytes.getBytes; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellBuilder; +import org.apache.hadoop.hbase.CellBuilderFactory; +import org.apache.hadoop.hbase.CellBuilderType; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.OperationWithAttributes; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.ParseFilter; +import org.apache.hadoop.hbase.filter.PrefixFilter; +import org.apache.hadoop.hbase.filter.WhileMatchFilter; +import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.thrift.generated.AlreadyExists; +import org.apache.hadoop.hbase.thrift.generated.BatchMutation; +import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor; +import org.apache.hadoop.hbase.thrift.generated.Hbase; +import org.apache.hadoop.hbase.thrift.generated.IOError; +import org.apache.hadoop.hbase.thrift.generated.IllegalArgument; +import org.apache.hadoop.hbase.thrift.generated.Mutation; +import org.apache.hadoop.hbase.thrift.generated.TAppend; +import org.apache.hadoop.hbase.thrift.generated.TCell; +import org.apache.hadoop.hbase.thrift.generated.TIncrement; +import org.apache.hadoop.hbase.thrift.generated.TRegionInfo; +import org.apache.hadoop.hbase.thrift.generated.TRowResult; +import org.apache.hadoop.hbase.thrift.generated.TScan; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.thrift.TException; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.base.Throwables; + +/** + * The HBaseServiceHandler is a glue object that connects Thrift RPC calls to the + * HBase client API primarily defined in the Admin and Table objects. + */ +@InterfaceAudience.Private +@SuppressWarnings("deprecation") +public class ThriftHBaseServiceHandler extends HBaseServiceHandler implements Hbase.Iface { + private static final Logger LOG = LoggerFactory.getLogger(ThriftHBaseServiceHandler.class); + + public static final int HREGION_VERSION = 1; + + // nextScannerId and scannerMap are used to manage scanner state + private int nextScannerId = 0; + private HashMap<Integer, ResultScannerWrapper> scannerMap; + IncrementCoalescer coalescer; + + /** + * Returns a list of all the column families for a given Table. + * + * @param table table + * @throws IOException + */ + byte[][] getAllColumns(Table table) throws IOException { + HColumnDescriptor[] cds = table.getTableDescriptor().getColumnFamilies(); + byte[][] columns = new byte[cds.length][]; + for (int i = 0; i < cds.length; i++) { + columns[i] = Bytes.add(cds[i].getName(), + KeyValue.COLUMN_FAMILY_DELIM_ARRAY); + } + return columns; + } + + + /** + * Assigns a unique ID to the scanner and adds the mapping to an internal + * hash-map. + * + * @param scanner the {@link ResultScanner} to add + * @return integer scanner id + */ + protected synchronized int addScanner(ResultScanner scanner, boolean sortColumns) { + int id = nextScannerId++; + ResultScannerWrapper resultScannerWrapper = + new ResultScannerWrapper(scanner, sortColumns); + scannerMap.put(id, resultScannerWrapper); + return id; + } + + /** + * Returns the scanner associated with the specified ID. + * + * @param id the ID of the scanner to get + * @return a Scanner, or null if ID was invalid. + */ + private synchronized ResultScannerWrapper getScanner(int id) { + return scannerMap.get(id); + } + + /** + * Removes the scanner associated with the specified ID from the internal + * id->scanner hash-map. + * + * @param id the ID of the scanner to remove + * @return a Scanner, or null if ID was invalid. + */ + private synchronized ResultScannerWrapper removeScanner(int id) { + return scannerMap.remove(id); + } + + protected ThriftHBaseServiceHandler(final Configuration c, + final UserProvider userProvider) throws IOException { + super(c, userProvider); + scannerMap = new HashMap<>(); + this.coalescer = new IncrementCoalescer(this); + } + + + @Override + public void enableTable(ByteBuffer tableName) throws IOError { + try{ + getAdmin().enableTable(getTableName(tableName)); + } catch (IOException e) { + LOG.warn(e.getMessage(), e); + throw getIOError(e); + } + } + + @Override + public void disableTable(ByteBuffer tableName) throws IOError{ + try{ + getAdmin().disableTable(getTableName(tableName)); + } catch (IOException e) { + LOG.warn(e.getMessage(), e); + throw getIOError(e); + } + } + + @Override + public boolean isTableEnabled(ByteBuffer tableName) throws IOError { + try { + return this.connectionCache.getAdmin().isTableEnabled(getTableName(tableName)); + } catch (IOException e) { + LOG.warn(e.getMessage(), e); + throw getIOError(e); + } + } + + // ThriftServerRunner.compact should be deprecated and replaced with methods specific to + // table and region. + @Override + public void compact(ByteBuffer tableNameOrRegionName) throws IOError { + try { + try { + getAdmin().compactRegion(getBytes(tableNameOrRegionName)); + } catch (IllegalArgumentException e) { + // Invalid region, try table + getAdmin().compact(TableName.valueOf(getBytes(tableNameOrRegionName))); + } + } catch (IOException e) { + LOG.warn(e.getMessage(), e); + throw getIOError(e); + } + } + + // ThriftServerRunner.majorCompact should be deprecated and replaced with methods specific + // to table and region. + @Override + public void majorCompact(ByteBuffer tableNameOrRegionName) throws IOError { + try { + try { + getAdmin().compactRegion(getBytes(tableNameOrRegionName)); + } catch (IllegalArgumentException e) { + // Invalid region, try table + getAdmin().compact(TableName.valueOf(getBytes(tableNameOrRegionName))); + } + } catch (IOException e) { + LOG.warn(e.getMessage(), e); + throw getIOError(e); + } + } + + @Override + public List<ByteBuffer> getTableNames() throws IOError { + try { + TableName[] tableNames = this.getAdmin().listTableNames(); + ArrayList<ByteBuffer> list = new ArrayList<>(tableNames.length); + for (TableName tableName : tableNames) { + list.add(ByteBuffer.wrap(tableName.getName())); + } + return list; + } catch (IOException e) { + LOG.warn(e.getMessage(), e); + throw getIOError(e); + } + } + + /** + * @return the list of regions in the given table, or an empty list if the table does not exist + */ + @Override + public List<TRegionInfo> getTableRegions(ByteBuffer tableName) throws IOError { + try (RegionLocator locator = connectionCache.getRegionLocator(getBytes(tableName))) { + List<HRegionLocation> regionLocations = locator.getAllRegionLocations(); + List<TRegionInfo> results = new ArrayList<>(regionLocations.size()); + for (HRegionLocation regionLocation : regionLocations) { + RegionInfo info = regionLocation.getRegionInfo(); + ServerName serverName = regionLocation.getServerName(); + TRegionInfo region = new TRegionInfo(); + region.serverName = ByteBuffer.wrap( + Bytes.toBytes(serverName.getHostname())); + region.port = serverName.getPort(); + region.startKey = ByteBuffer.wrap(info.getStartKey()); + region.endKey = ByteBuffer.wrap(info.getEndKey()); + region.id = info.getRegionId(); + region.name = ByteBuffer.wrap(info.getRegionName()); + region.version = HREGION_VERSION; // HRegion now not versioned, PB encoding used + results.add(region); + } + return results; + } catch (TableNotFoundException e) { + // Return empty list for non-existing table + return Collections.emptyList(); + } catch (IOException e){ + LOG.warn(e.getMessage(), e); + throw getIOError(e); + } + } + + @Override + public List<TCell> get( + ByteBuffer tableName, ByteBuffer row, ByteBuffer column, + Map<ByteBuffer, ByteBuffer> attributes) + throws IOError { + byte [][] famAndQf = CellUtil.parseColumn(getBytes(column)); + if (famAndQf.length == 1) { + return get(tableName, row, famAndQf[0], null, attributes); + } + if (famAndQf.length == 2) { + return get(tableName, row, famAndQf[0], famAndQf[1], attributes); + } + throw new IllegalArgumentException("Invalid familyAndQualifier provided."); + } + + /** + * Note: this internal interface is slightly different from public APIs in regard to handling + * of the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather, + * we respect qual == null as a request for the entire column family. The caller ( + * {@link #get(ByteBuffer, ByteBuffer, ByteBuffer, Map)}) interface IS consistent in that the + * column is parse like normal. + */ + protected List<TCell> get(ByteBuffer tableName, + ByteBuffer row, + byte[] family, + byte[] qualifier, + Map<ByteBuffer, ByteBuffer> attributes) throws IOError { + Table table = null; + try { + table = getTable(tableName); + Get get = new Get(getBytes(row)); + addAttributes(get, attributes); + if (qualifier == null) { + get.addFamily(family); + } else { + get.addColumn(family, qualifier); + } + Result result = table.get(get); + return ThriftUtilities.cellFromHBase(result.rawCells()); + } catch (IOException e) { + LOG.warn(e.getMessage(), e); + throw getIOError(e); + } finally { + closeTable(table); + } + } + + @Override + public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, ByteBuffer column, + int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError { + byte [][] famAndQf = CellUtil.parseColumn(getBytes(column)); + if(famAndQf.length == 1) { + return getVer(tableName, row, famAndQf[0], null, numVersions, attributes); + } + if (famAndQf.length == 2) { + return getVer(tableName, row, famAndQf[0], famAndQf[1], numVersions, attributes); + } + throw new IllegalArgumentException("Invalid familyAndQualifier provided."); + + } + + /** + * Note: this public interface is slightly different from public Java APIs in regard to + * handling of the qualifier. Here we differ from the public Java API in that null != byte[0]. + * Rather, we respect qual == null as a request for the entire column family. If you want to + * access the entire column family, use + * {@link #getVer(ByteBuffer, ByteBuffer, ByteBuffer, int, Map)} with a {@code column} value + * that lacks a {@code ':'}. + */ + public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, byte[] family, + byte[] qualifier, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError { + + Table table = null; + try { + table = getTable(tableName); + Get get = new Get(getBytes(row)); + addAttributes(get, attributes); + if (null == qualifier) { + get.addFamily(family); + } else { + get.addColumn(family, qualifier); + } + get.setMaxVersions(numVersions); + Result result = table.get(get); + return ThriftUtilities.cellFromHBase(result.rawCells()); + } catch (IOException e) { + LOG.warn(e.getMessage(), e); + throw getIOError(e); + } finally{ + closeTable(table); + } + } + + @Override + public List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, ByteBuffer column, + long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError { + byte [][] famAndQf = CellUtil.parseColumn(getBytes(column)); + if (famAndQf.length == 1) { + return getVerTs(tableName, row, famAndQf[0], null, timestamp, numVersions, attributes); + } + if (famAndQf.length == 2) { + return getVerTs(tableName, row, famAndQf[0], famAndQf[1], timestamp, numVersions, + attributes); + } + throw new IllegalArgumentException("Invalid familyAndQualifier provided."); + } + + /** + * Note: this internal interface is slightly different from public APIs in regard to handling + * of the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather, + * we respect qual == null as a request for the entire column family. The caller ( + * {@link #getVerTs(ByteBuffer, ByteBuffer, ByteBuffer, long, int, Map)}) interface IS + * consistent in that the column is parse like normal. + */ + protected List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, byte[] family, + byte[] qualifier, long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) + throws IOError { + + Table table = null; + try { + table = getTable(tableName); + Get get = new Get(getBytes(row)); + addAttributes(get, attributes); + if (null == qualifier) { + get.addFamily(family); + } else { + get.addColumn(family, qualifier); + } + get.setTimeRange(0, timestamp); + get.setMaxVersions(numVersions); + Result result = table.get(get); + return ThriftUtilities.cellFromHBase(result.rawCells()); + } catch (IOException e) { + LOG.warn(e.getMessage(), e); + throw getIOError(e); + } finally{ + closeTable(table); + } + } + + @Override + public List<TRowResult> getRow(ByteBuffer tableName, ByteBuffer row, + Map<ByteBuffer, ByteBuffer> attributes) throws IOError { + return getRowWithColumnsTs(tableName, row, null, + HConstants.LATEST_TIMESTAMP, + attributes); + } + + @Override + public List<TRowResult> getRowWithColumns(ByteBuffer tableName, + ByteBuffer row, + List<ByteBuffer> columns, + Map<ByteBuffer, ByteBuffer> attributes) throws IOError { + return getRowWithColumnsTs(tableName, row, columns, + HConstants.LATEST_TIMESTAMP, + attributes); + } + + @Override + public List<TRowResult> getRowTs(ByteBuffer tableName, ByteBuffer row, + long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError { + return getRowWithColumnsTs(tableName, row, null, + timestamp, attributes); + } + + @Override + public List<TRowResult> getRowWithColumnsTs( + ByteBuffer tableName, ByteBuffer row, List<ByteBuffer> columns, + long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError { + + Table table = null; + try { + table = getTable(tableName); + if (columns == null) { + Get get = new Get(getBytes(row)); + addAttributes(get, attributes); + get.setTimeRange(0, timestamp); + Result result = table.get(get); + return ThriftUtilities.rowResultFromHBase(result); + } + Get get = new Get(getBytes(row)); + addAttributes(get, attributes); + for(ByteBuffer column : columns) { + byte [][] famAndQf = CellUtil.parseColumn(getBytes(column)); + if (famAndQf.length == 1) { + get.addFamily(famAndQf[0]); + } else { + get.addColumn(famAndQf[0], famAndQf[1]); + } + } + get.setTimeRange(0, timestamp); + Result result = table.get(get); + return ThriftUtilities.rowResultFromHBase(result); + } catch (IOException e) { + LOG.warn(e.getMessage(), e); + throw getIOError(e); + } finally{ + closeTable(table); + } + } + + @Override + public List<TRowResult> getRows(ByteBuffer tableName, + List<ByteBuffer> rows, + Map<ByteBuffer, ByteBuffer> attributes) + throws IOError { + return getRowsWithColumnsTs(tableName, rows, null, + HConstants.LATEST_TIMESTAMP, + attributes); + } + + @Override + public List<TRowResult> getRowsWithColumns(ByteBuffer tableName, + List<ByteBuffer> rows, + List<ByteBuffer> columns, + Map<ByteBuffer, ByteBuffer> attributes) throws IOError { + return getRowsWithColumnsTs(tableName, rows, columns, + HConstants.LATEST_TIMESTAMP, + attributes); + } + + @Override + public List<TRowResult> getRowsTs(ByteBuffer tableName, + List<ByteBuffer> rows, + long timestamp, + Map<ByteBuffer, ByteBuffer> attributes) throws IOError { + return getRowsWithColumnsTs(tableName, rows, null, + timestamp, attributes); + } + + @Override + public List<TRowResult> getRowsWithColumnsTs(ByteBuffer tableName, + List<ByteBuffer> rows, + List<ByteBuffer> columns, long timestamp, + Map<ByteBuffer, ByteBuffer> attributes) throws IOError { + + Table table= null; + try { + List<Get> gets = new ArrayList<>(rows.size()); + table = getTable(tableName); + if (metrics != null) { + metrics.incNumRowKeysInBatchGet(rows.size()); + } + for (ByteBuffer row : rows) { + Get get = new Get(getBytes(row)); + addAttributes(get, attributes); + if (columns != null) { + + for(ByteBuffer column : columns) { + byte [][] famAndQf = CellUtil.parseColumn(getBytes(column)); + if (famAndQf.length == 1) { + get.addFamily(famAndQf[0]); + } else { + get.addColumn(famAndQf[0], famAndQf[1]); + } + } + } + get.setTimeRange(0, timestamp); + gets.add(get); + } + Result[] result = table.get(gets); + return ThriftUtilities.rowResultFromHBase(result); + } catch (IOException e) { + LOG.warn(e.getMessage(), e); + throw getIOError(e); + } finally{ + closeTable(table); + } + } + + @Override + public void deleteAll( + ByteBuffer tableName, ByteBuffer row, ByteBuffer column, + Map<ByteBuffer, ByteBuffer> attributes) + throws IOError { + deleteAllTs(tableName, row, column, HConstants.LATEST_TIMESTAMP, + attributes); + } + + @Override + public void deleteAllTs(ByteBuffer tableName, + ByteBuffer row, + ByteBuffer column, + long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError { + Table table = null; + try { + table = getTable(tableName); + Delete delete = new Delete(getBytes(row)); + addAttributes(delete, attributes); + byte [][] famAndQf = CellUtil.parseColumn(getBytes(column)); + if (famAndQf.length == 1) { + delete.addFamily(famAndQf[0], timestamp); + } else { + delete.addColumns(famAndQf[0], famAndQf[1], timestamp); + } + table.delete(delete); + + } catch (IOException e) { + LOG.warn(e.getMessage(), e); + throw getIOError(e); + } finally { + closeTable(table); + } + } + + @Override + public void deleteAllRow( + ByteBuffer tableName, ByteBuffer row, + Map<ByteBuffer, ByteBuffer> attributes) throws IOError { + deleteAllRowTs(tableName, row, HConstants.LATEST_TIMESTAMP, attributes); + } + + @Override + public void deleteAllRowTs( + ByteBuffer tableName, ByteBuffer row, long timestamp, + Map<ByteBuffer, ByteBuffer> attributes) throws IOError { + Table table = null; + try { + table = getTable(tableName); + Delete delete = new Delete(getBytes(row), timestamp); + addAttributes(delete, attributes); + table.delete(delete); + } catch (IOException e) { + LOG.warn(e.getMessage(), e); + throw getIOError(e); + } finally { + closeTable(table); + } + } + + @Override + public void createTable(ByteBuffer in_tableName, + List<ColumnDescriptor> columnFamilies) throws IOError, IllegalArgument, AlreadyExists { + TableName tableName = getTableName(in_tableName); + try { + if (getAdmin().tableExists(tableName)) { + throw new AlreadyExists("table name already in use"); + } + HTableDescriptor desc = new HTableDescriptor(tableName); + for (ColumnDescriptor col : columnFamilies) { + HColumnDescriptor colDesc = ThriftUtilities.colDescFromThrift(col); + desc.addFamily(colDesc); + } + getAdmin().createTable(desc); + } catch (IOException e) { + LOG.warn(e.getMessage(), e); + throw getIOError(e); + } catch (IllegalArgumentException e) { + LOG.warn(e.getMessage(), e); + throw new IllegalArgument(Throwables.getStackTraceAsString(e)); + } + } + + private static TableName getTableName(ByteBuffer buffer) { + return TableName.valueOf(getBytes(buffer)); + } + + @Override + public void deleteTable(ByteBuffer in_tableName) throws IOError { + TableName tableName = getTableName(in_tableName); + if (LOG.isDebugEnabled()) { + LOG.debug("deleteTable: table={}", tableName); + } + try { + if (!getAdmin().tableExists(tableName)) { + throw new IOException("table does not exist"); + } + getAdmin().deleteTable(tableName); + } catch (IOException e) { + LOG.warn(e.getMessage(), e); + throw getIOError(e); + } + } + + @Override + public void mutateRow(ByteBuffer tableName, ByteBuffer row, + List<Mutation> mutations, Map<ByteBuffer, ByteBuffer> attributes) + throws IOError, IllegalArgument { + mutateRowTs(tableName, row, mutations, HConstants.LATEST_TIMESTAMP, attributes); + } + + @Override + public void mutateRowTs(ByteBuffer tableName, ByteBuffer row, + List<Mutation> mutations, long timestamp, + Map<ByteBuffer, ByteBuffer> attributes) + throws IOError, IllegalArgument { + Table table = null; + try { + table = getTable(tableName); + Put put = new Put(getBytes(row), timestamp); + addAttributes(put, attributes); + + Delete delete = new Delete(getBytes(row)); + addAttributes(delete, attributes); + if (metrics != null) { + metrics.incNumRowKeysInBatchMutate(mutations.size()); + } + + // I apologize for all this mess :) + CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY); + for (Mutation m : mutations) { + byte[][] famAndQf = CellUtil.parseColumn(getBytes(m.column)); + if (m.isDelete) { + if (famAndQf.length == 1) { + delete.addFamily(famAndQf[0], timestamp); + } else { + delete.addColumns(famAndQf[0], famAndQf[1], timestamp); + } + delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); + } else { + if(famAndQf.length == 1) { + LOG.warn("No column qualifier specified. Delete is the only mutation supported " + + "over the whole column family."); + } else { + put.add(builder.clear() + .setRow(put.getRow()) + .setFamily(famAndQf[0]) + .setQualifier(famAndQf[1]) + .setTimestamp(put.getTimestamp()) + .setType(Cell.Type.Put) + .setValue(m.value != null ? getBytes(m.value) + : HConstants.EMPTY_BYTE_ARRAY) + .build()); + } + put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); + } + } + if (!delete.isEmpty()) { + table.delete(delete); + } + if (!put.isEmpty()) { + table.put(put); + } + } catch (IOException e) { + LOG.warn(e.getMessage(), e); + throw getIOError(e); + } catch (IllegalArgumentException e) { + LOG.warn(e.getMessage(), e); + throw new IllegalArgument(Throwables.getStackTraceAsString(e)); + } finally{ + closeTable(table); + } + } + + @Override + public void mutateRows(ByteBuffer tableName, List<BatchMutation> rowBatches, + Map<ByteBuffer, ByteBuffer> attributes) + throws IOError, IllegalArgument, TException { + mutateRowsTs(tableName, rowBatches, HConstants.LATEST_TIMESTAMP, attributes); + } + + @Override + public void mutateRowsTs( + ByteBuffer tableName, List<BatchMutation> rowBatches, long timestamp, + Map<ByteBuffer, ByteBuffer> attributes) + throws IOError, IllegalArgument, TException { + List<Put> puts = new ArrayList<>(); + List<Delete> deletes = new ArrayList<>(); + CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY); + for (BatchMutation batch : rowBatches) { + byte[] row = getBytes(batch.row); + List<Mutation> mutations = batch.mutations; + Delete delete = new Delete(row); + addAttributes(delete, attributes); + Put put = new Put(row, timestamp); + addAttributes(put, attributes); + for (Mutation m : mutations) { + byte[][] famAndQf = CellUtil.parseColumn(getBytes(m.column)); + if (m.isDelete) { + // no qualifier, family only. + if (famAndQf.length == 1) { + delete.addFamily(famAndQf[0], timestamp); + } else { + delete.addColumns(famAndQf[0], famAndQf[1], timestamp); + } + delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL + : Durability.SKIP_WAL); + } else { + if (famAndQf.length == 1) { + LOG.warn("No column qualifier specified. Delete is the only mutation supported " + + "over the whole column family."); + } + if (famAndQf.length == 2) { + try { + put.add(builder.clear() + .setRow(put.getRow()) + .setFamily(famAndQf[0]) + .setQualifier(famAndQf[1]) + .setTimestamp(put.getTimestamp()) + .setType(Cell.Type.Put) + .setValue(m.value != null ? getBytes(m.value) + : HConstants.EMPTY_BYTE_ARRAY) + .build()); + } catch (IOException e) { + throw new IllegalArgumentException(e); + } + } else { + throw new IllegalArgumentException("Invalid famAndQf provided."); + } + put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); + } + } + if (!delete.isEmpty()) { + deletes.add(delete); + } + if (!put.isEmpty()) { + puts.add(put); + } + } + + Table table = null; + try { + table = getTable(tableName); + if (!puts.isEmpty()) { + table.put(puts); + } + if (!deletes.isEmpty()) { + table.delete(deletes); + } + } catch (IOException e) { + LOG.warn(e.getMessage(), e); + throw getIOError(e); + } catch (IllegalArgumentException e) { + LOG.warn(e.getMessage(), e); + throw new IllegalArgument(Throwables.getStackTraceAsString(e)); + } finally{ + closeTable(table); + } + } + + @Override + public long atomicIncrement( + ByteBuffer tableName, ByteBuffer row, ByteBuffer column, long amount) + throws IOError, IllegalArgument, TException { + byte [][] famAndQf = CellUtil.parseColumn(getBytes(column)); + if(famAndQf.length == 1) { + return atomicIncrement(tableName, row, famAndQf[0], HConstants.EMPTY_BYTE_ARRAY, amount); + } + return atomicIncrement(tableName, row, famAndQf[0], famAndQf[1], amount); + } + + protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row, + byte [] family, byte [] qualifier, long amount) + throws IOError, IllegalArgument, TException { + Table table = null; + try { + table = getTable(tableName); + return table.incrementColumnValue( + getBytes(row), family, qualifier, amount); + } catch (IOException e) { + LOG.warn(e.getMessage(), e); + throw getIOError(e); + } finally { + closeTable(table); + } + } + + @Override + public void scannerClose(int id) throws IOError, IllegalArgument { + LOG.debug("scannerClose: id={}", id); + ResultScannerWrapper resultScannerWrapper = getScanner(id); + if (resultScannerWrapper == null) { + LOG.warn("scanner ID is invalid"); + throw new IllegalArgument("scanner ID is invalid"); + } + resultScannerWrapper.getScanner().close(); + removeScanner(id); + } + + @Override + public List<TRowResult> scannerGetList(int id,int nbRows) + throws IllegalArgument, IOError { + LOG.debug("scannerGetList: id={}", id); + ResultScannerWrapper resultScannerWrapper = getScanner(id); + if (null == resultScannerWrapper) { + String message = "scanner ID is invalid"; + LOG.warn(message); + throw new IllegalArgument("scanner ID is invalid"); + } + + Result [] results; + try { + results = resultScannerWrapper.getScanner().next(nbRows); + if (null == results) { + return new ArrayList<>(); + } + } catch (IOException e) { + LOG.warn(e.getMessage(), e); + throw getIOError(e); + } + return ThriftUtilities.rowResultFromHBase(results, resultScannerWrapper.isColumnSorted()); + } + + @Override + public List<TRowResult> scannerGet(int id) throws IllegalArgument, IOError { + return scannerGetList(id,1); + } + + @Override + public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan, + Map<ByteBuffer, ByteBuffer> attributes) + throws IOError { + + Table table = null; + try { + table = getTable(tableName); + Scan scan = new Scan(); + addAttributes(scan, attributes); + if (tScan.isSetStartRow()) { + scan.setStartRow(tScan.getStartRow()); + } + if (tScan.isSetStopRow()) { + scan.setStopRow(tScan.getStopRow()); + } + if (tScan.isSetTimestamp()) { + scan.setTimeRange(0, tScan.getTimestamp()); + } + if (tScan.isSetCaching()) { + scan.setCaching(tScan.getCaching()); + } + if (tScan.isSetBatchSize()) { + scan.setBatch(tScan.getBatchSize()); + } + if (tScan.isSetColumns() && !tScan.getColumns().isEmpty()) { + for(ByteBuffer column : tScan.getColumns()) { + byte [][] famQf = CellUtil.parseColumn(getBytes(column)); + if(famQf.length == 1) { + scan.addFamily(famQf[0]); + } else { + scan.addColumn(famQf[0], famQf[1]); + } + } + } + if (tScan.isSetFilterString()) { + ParseFilter parseFilter = new ParseFilter(); + scan.setFilter( + parseFilter.parseFilterString(tScan.getFilterString())); + } + if (tScan.isSetReversed()) { + scan.setReversed(tScan.isReversed()); + } + if (tScan.isSetCacheBlocks()) { + scan.setCacheBlocks(tScan.isCacheBlocks()); + } + return addScanner(table.getScanner(scan), tScan.sortColumns); + } catch (IOException e) { + LOG.warn(e.getMessage(), e); + throw getIOError(e); + } finally{ + closeTable(table); + } + } + + @Override + public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow, + List<ByteBuffer> columns, + Map<ByteBuffer, ByteBuffer> attributes) throws IOError { + + Table table = null; + try { + table = getTable(tableName); + Scan scan = new Scan(getBytes(startRow)); + addAttributes(scan, attributes); + if(columns != null && !columns.isEmpty()) { + for(ByteBuffer column : columns) { + byte [][] famQf = CellUtil.parseColumn(getBytes(column)); + if(famQf.length == 1) { + scan.addFamily(famQf[0]); + } else { + scan.addColumn(famQf[0], famQf[1]); + } + } + } + return addScanner(table.getScanner(scan), false); + } catch (IOException e) { + LOG.warn(e.getMessage(), e); + throw getIOError(e); + } finally{ + closeTable(table); + } + } + + @Override + public int scannerOpenWithStop(ByteBuffer tableName, ByteBuffer startRow, + ByteBuffer stopRow, List<ByteBuffer> columns, + Map<ByteBuffer, ByteBuffer> attributes) + throws IOError, TException { + + Table table = null; + try { + table = getTable(tableName); + Scan scan = new Scan(getBytes(startRow), getBytes(stopRow)); + addAttributes(scan, attributes); + if(columns != null && !columns.isEmpty()) { + for(ByteBuffer column : columns) { + byte [][] famQf = CellUtil.parseColumn(getBytes(column)); + if(famQf.length == 1) { + scan.addFamily(famQf[0]); + } else { + scan.addColumn(famQf[0], famQf[1]); + } + } + } + return addScanner(table.getScanner(scan), false); + } catch (IOException e) { + LOG.warn(e.getMessage(), e); + throw getIOError(e); + } finally{ + closeTable(table); + } + } + + @Override + public int scannerOpenWithPrefix(ByteBuffer tableName, + ByteBuffer startAndPrefix, + List<ByteBuffer> columns, + Map<ByteBuffer, ByteBuffer> attributes) + throws IOError, TException { + + Table table = null; + try { + table = getTable(tableName); + Scan scan = new Scan(getBytes(startAndPrefix)); + addAttributes(scan, attributes); + Filter f = new WhileMatchFilter( + new PrefixFilter(getBytes(startAndPrefix))); + scan.setFilter(f); + if (columns != null && !columns.isEmpty()) { + for(ByteBuffer column : columns) { + byte [][] famQf = CellUtil.parseColumn(getBytes(column)); + if(famQf.length == 1) { + scan.addFamily(famQf[0]); + } else { + scan.addColumn(famQf[0], famQf[1]); + } + } + } + return addScanner(table.getScanner(scan), false); + } catch (IOException e) { + LOG.warn(e.getMessage(), e); + throw getIOError(e); + } finally{ + closeTable(table); + } + } + + @Override + public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow, + List<ByteBuffer> columns, long timestamp, + Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException { + + Table table = null; + try { + table = getTable(tableName); + Scan scan = new Scan(getBytes(startRow)); + addAttributes(scan, attributes); + scan.setTimeRange(0, timestamp); + if (columns != null && !columns.isEmpty()) { + for (ByteBuffer column : columns) { + byte [][] famQf = CellUtil.parseColumn(getBytes(column)); + if(famQf.length == 1) { + scan.addFamily(famQf[0]); + } else { + scan.addColumn(famQf[0], famQf[1]); + } + } + } + return addScanner(table.getScanner(scan), false); + } catch (IOException e) { + LOG.warn(e.getMessage(), e); + throw getIOError(e); + } finally{ + closeTable(table); + } + } + + @Override + public int scannerOpenWithStopTs(ByteBuffer tableName, ByteBuffer startRow, + ByteBuffer stopRow, List<ByteBuffer> columns, long timestamp, + Map<ByteBuffer, ByteBuffer> attributes) + throws IOError, TException { + + Table table = null; + try { + table = getTable(tableName); + Scan scan = new Scan(getBytes(startRow), getBytes(stopRow)); + addAttributes(scan, attributes); + scan.setTimeRange(0, timestamp); + if (columns != null && !columns.isEmpty()) { + for (ByteBuffer column : columns) { + byte [][] famQf = CellUtil.parseColumn(getBytes(column)); + if(famQf.length == 1) { + scan.addFamily(famQf[0]); + } else { + scan.addColumn(famQf[0], famQf[1]); + } + } + } + scan.setTimeRange(0, timestamp); + return addScanner(table.getScanner(scan), false); + } catch (IOException e) { + LOG.warn(e.getMessage(), e); + throw getIOError(e); + } finally{ + closeTable(table); + } + } + + @Override + public Map<ByteBuffer, ColumnDescriptor> getColumnDescriptors( + ByteBuffer tableName) throws IOError, TException { + + Table table = null; + try { + TreeMap<ByteBuffer, ColumnDescriptor> columns = new TreeMap<>(); + + table = getTable(tableName); + HTableDescriptor desc = table.getTableDescriptor(); + + for (HColumnDescriptor e : desc.getFamilies()) { + ColumnDescriptor col = ThriftUtilities.colDescFromHbase(e); + columns.put(col.name, col); + } + return columns; + } catch (IOException e) { + LOG.warn(e.getMessage(), e); + throw getIOError(e); + } finally { + closeTable(table); + } + } + + private void closeTable(Table table) throws IOError { + try{ + if(table != null){ + table.close(); + } + } catch (IOException e){ + LOG.error(e.getMessage(), e); + throw getIOError(e); + } + } + + @Override + public TRegionInfo getRegionInfo(ByteBuffer searchRow) throws IOError { + try { + byte[] row = getBytes(searchRow); + Result startRowResult = getReverseScanResult(TableName.META_TABLE_NAME.getName(), row, + HConstants.CATALOG_FAMILY); + + if (startRowResult == null) { + throw new IOException("Cannot find row in "+ TableName.META_TABLE_NAME+", row=" + + Bytes.toStringBinary(row)); + } + + // find region start and end keys + RegionInfo regionInfo = MetaTableAccessor.getRegionInfo(startRowResult); + if (regionInfo == null) { + throw new IOException("RegionInfo REGIONINFO was null or " + + " empty in Meta for row=" + + Bytes.toStringBinary(row)); + } + TRegionInfo region = new TRegionInfo(); + region.setStartKey(regionInfo.getStartKey()); + region.setEndKey(regionInfo.getEndKey()); + region.id = regionInfo.getRegionId(); + region.setName(regionInfo.getRegionName()); + region.version = HREGION_VERSION; // version not used anymore, PB encoding used. + + // find region assignment to server + ServerName serverName = MetaTableAccessor.getServerName(startRowResult, 0); + if (serverName != null) { + region.setServerName(Bytes.toBytes(serverName.getHostname())); + region.port = serverName.getPort(); + } + return region; + } catch (IOException e) { + LOG.warn(e.getMessage(), e); + throw getIOError(e); + } + } + + private Result getReverseScanResult(byte[] tableName, byte[] row, byte[] family) + throws IOException { + Scan scan = new Scan(row); + scan.setReversed(true); + scan.addFamily(family); + scan.setStartRow(row); + try (Table table = getTable(tableName); + ResultScanner scanner = table.getScanner(scan)) { + return scanner.next(); + } + } + + @Override + public void increment(TIncrement tincrement) throws IOError, TException { + + if (tincrement.getRow().length == 0 || tincrement.getTable().length == 0) { + throw new TException("Must supply a table and a row key; can't increment"); + } + + if (conf.getBoolean(COALESCE_INC_KEY, false)) { + this.coalescer.queueIncrement(tincrement); + return; + } + + Table table = null; + try { + table = getTable(tincrement.getTable()); + Increment inc = ThriftUtilities.incrementFromThrift(tincrement); + table.increment(inc); + } catch (IOException e) { + LOG.warn(e.getMessage(), e); + throw getIOError(e); + } finally{ + closeTable(table); + } + } + + @Override + public void incrementRows(List<TIncrement> tincrements) throws IOError, TException { + if (conf.getBoolean(COALESCE_INC_KEY, false)) { + this.coalescer.queueIncrements(tincrements); + return; + } + for (TIncrement tinc : tincrements) { + increment(tinc); + } + } + + @Override + public List<TCell> append(TAppend tappend) throws IOError, TException { + if (tappend.getRow().length == 0 || tappend.getTable().length == 0) { + throw new TException("Must supply a table and a row key; can't append"); + } + + Table table = null; + try { + table = getTable(tappend.getTable()); + Append append = ThriftUtilities.appendFromThrift(tappend); + Result result = table.append(append); + return ThriftUtilities.cellFromHBase(result.rawCells()); + } catch (IOException e) { + LOG.warn(e.getMessage(), e); + throw getIOError(e); + } finally{ + closeTable(table); + } + } + + @Override + public boolean checkAndPut(ByteBuffer tableName, ByteBuffer row, ByteBuffer column, + ByteBuffer value, Mutation mput, Map<ByteBuffer, ByteBuffer> attributes) throws IOError, + IllegalArgument, TException { + Put put; + try { + put = new Put(getBytes(row), HConstants.LATEST_TIMESTAMP); + addAttributes(put, attributes); + + byte[][] famAndQf = CellUtil.parseColumn(getBytes(mput.column)); + put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY) + .setRow(put.getRow()) + .setFamily(famAndQf[0]) + .setQualifier(famAndQf[1]) + .setTimestamp(put.getTimestamp()) + .setType(Cell.Type.Put) + .setValue(mput.value != null ? getBytes(mput.value) + : HConstants.EMPTY_BYTE_ARRAY) + .build()); + put.setDurability(mput.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); + } catch (IOException | IllegalArgumentException e) { + LOG.warn(e.getMessage(), e); + throw new IllegalArgument(Throwables.getStackTraceAsString(e)); + } + + Table table = null; + try { + table = getTable(tableName); + byte[][] famAndQf = CellUtil.parseColumn(getBytes(column)); + Table.CheckAndMutateBuilder mutateBuilder = + table.checkAndMutate(getBytes(row), famAndQf[0]).qualifier(famAndQf[1]); + if (value != null) { + return mutateBuilder.ifEquals(getBytes(value)).thenPut(put); + } else { + return mutateBuilder.ifNotExists().thenPut(put); + } + } catch (IOException e) { + LOG.warn(e.getMessage(), e); + throw getIOError(e); + } catch (IllegalArgumentException e) { + LOG.warn(e.getMessage(), e); + throw new IllegalArgument(Throwables.getStackTraceAsString(e)); + } finally { + closeTable(table); + } + } + + private static IOError getIOError(Throwable throwable) { + IOError error = new IOErrorWithCause(throwable); + error.setMessage(Throwables.getStackTraceAsString(throwable)); + return error; + } + + /** + * Adds all the attributes into the Operation object + */ + private static void addAttributes(OperationWithAttributes op, + Map<ByteBuffer, ByteBuffer> attributes) { + if (attributes == null || attributes.isEmpty()) { + return; + } + for (Map.Entry<ByteBuffer, ByteBuffer> entry : attributes.entrySet()) { + String name = Bytes.toStringBinary(getBytes(entry.getKey())); + byte[] value = getBytes(entry.getValue()); + op.setAttribute(name, value); + } + } + + protected static class ResultScannerWrapper { + + private final ResultScanner scanner; + private final boolean sortColumns; + public ResultScannerWrapper(ResultScanner resultScanner, + boolean sortResultColumns) { + scanner = resultScanner; + sortColumns = sortResultColumns; + } + + public ResultScanner getScanner() { + return scanner; + } + + public boolean isColumnSorted() { + return sortColumns; + } + } + + public static class IOErrorWithCause extends IOError { + private final Throwable cause; + public IOErrorWithCause(Throwable cause) { + this.cause = cause; + } + + @Override + public synchronized Throwable getCause() { + return cause; + } + + @Override + public boolean equals(Object other) { + if (super.equals(other) && + other instanceof IOErrorWithCause) { + Throwable otherCause = ((IOErrorWithCause) other).getCause(); + if (this.getCause() != null) { + return otherCause != null && this.getCause().equals(otherCause); + } else { + return otherCause == null; + } + } + return false; + } + + @Override + public int hashCode() { + int result = super.hashCode(); + result = 31 * result + (cause != null ? cause.hashCode() : 0); + return result; + } + } + + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/e4b6b4af/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftHttpServlet.java ---------------------------------------------------------------------- diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftHttpServlet.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftHttpServlet.java index 4c9a35b..7f85152 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftHttpServlet.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftHttpServlet.java @@ -18,8 +18,8 @@ package org.apache.hadoop.hbase.thrift; -import static org.apache.hadoop.hbase.thrift.ThriftServerRunner.THRIFT_SPNEGO_KEYTAB_FILE_KEY; -import static org.apache.hadoop.hbase.thrift.ThriftServerRunner.THRIFT_SPNEGO_PRINCIPAL_KEY; +import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SPNEGO_KEYTAB_FILE_KEY; +import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SPNEGO_PRINCIPAL_KEY; import java.io.IOException; import java.security.PrivilegedExceptionAction; @@ -58,7 +58,7 @@ public class ThriftHttpServlet extends TServlet { private static final Logger LOG = LoggerFactory.getLogger(ThriftHttpServlet.class.getName()); private final transient UserGroupInformation serviceUGI; private final transient UserGroupInformation httpUGI; - private final transient ThriftServerRunner.HBaseHandler hbaseHandler; + private final transient HBaseServiceHandler handler; private final boolean doAsEnabled; private final boolean securityEnabled; @@ -67,11 +67,11 @@ public class ThriftHttpServlet extends TServlet { public ThriftHttpServlet(TProcessor processor, TProtocolFactory protocolFactory, UserGroupInformation serviceUGI, Configuration conf, - ThriftServerRunner.HBaseHandler hbaseHandler, boolean securityEnabled, boolean doAsEnabled) + HBaseServiceHandler handler, boolean securityEnabled, boolean doAsEnabled) throws IOException { super(processor, protocolFactory); this.serviceUGI = serviceUGI; - this.hbaseHandler = hbaseHandler; + this.handler = handler; this.securityEnabled = securityEnabled; this.doAsEnabled = doAsEnabled; @@ -146,7 +146,7 @@ public class ThriftHttpServlet extends TServlet { } effectiveUser = doAsUserFromQuery; } - hbaseHandler.setEffectiveUser(effectiveUser); + handler.setEffectiveUser(effectiveUser); super.doPost(request, response); }