[
https://issues.apache.org/jira/browse/TAJO-1206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14236704#comment-14236704
]
ASF GitHub Bot commented on TAJO-1206:
--------------------------------------
Github user babokim commented on a diff in the pull request:
https://github.com/apache/tajo/pull/268#discussion_r21415699
--- Diff:
tajo-thrift-server/src/main/java/org/apache/tajo/thrift/client/TajoThriftClient.java
---
@@ -0,0 +1,500 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.thrift.client;
+
+import com.google.protobuf.ServiceException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.TajoProtos.QueryState;
+import org.apache.tajo.annotation.Nullable;
+import org.apache.tajo.annotation.ThreadSafe;
+import org.apache.tajo.client.*;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.ClientProtos.*;
+import org.apache.tajo.thrift.TajoThriftUtil;
+import org.apache.tajo.thrift.ThriftServerConstants;
+import org.apache.tajo.thrift.generated.*;
+import org.apache.tajo.thrift.generated.TajoThriftService.Client;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.sql.ResultSet;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+@ThreadSafe
+public class TajoThriftClient {
+ /** Logger shared by all instances of this client. */
+ private static final Log LOG = LogFactory.getLog(TajoThriftClient.class);
+
+ /** Configuration handed to the constructor; exposed through getConf(). */
+ protected final TajoConf tajoConf;
+
+ /** Thrift server address; makeConnection() parses it as "host:port". */
+ protected String thriftServer;
+
+ /** Thrift client currently in use; created by makeConnection() when it is null. */
+ protected Client currentClient;
+
+ /** Base database name given to the constructor (may be null). */
+ private final String baseDatabase;
+
+ /** User information captured from UserGroupInformation.getCurrentUser() at construction. */
+ private final UserGroupInformation userInfo;
+
+ /** Session id passed to the Thrift calls issued by this class. */
+ volatile String sessionId;
+
+ // NOTE(review): presumably flipped when the client is closed -- the code using it is outside this hunk.
+ private AtomicBoolean closed = new AtomicBoolean(false);
+
+ // Thrift client is thread unsafe. So every call should be synchronized with callMonitor.
+ // Declared final so the lock object can never be swapped while another thread holds it.
+ private final Object callMonitor = new Object();
+
+ /**
+  * Connect to ThriftServer without naming a base database; delegates to the
+  * three-argument constructor with a null base database.
+  *
+  * @param tajoConf TajoConf
+  * @param thriftServer ThriftServer
+  * @throws java.io.IOException if the delegated constructor fails
+  */
+ public TajoThriftClient(TajoConf tajoConf, String thriftServer) throws IOException {
+   this(tajoConf, thriftServer, null);
+ }
+
+ /**
+  * Connect to ThriftServer.
+  *
+  * <p>Captures the current user via UserGroupInformation, stores the arguments, and
+  * opens the connection while holding callMonitor.
+  *
+  * @param tajoConf TajoConf
+  * @param thriftServer ThriftServer
+  * @param baseDatabase The base database name. It is case sensitive. If it is null,
+  *                     the 'default' database will be used.
+  * @throws java.io.IOException if the current user cannot be determined or the
+  *                             connection cannot be made
+  */
+ public TajoThriftClient(TajoConf tajoConf, String thriftServer, @Nullable String baseDatabase)
+     throws IOException {
+   this.userInfo = UserGroupInformation.getCurrentUser();
+
+   this.tajoConf = tajoConf;
+   this.thriftServer = thriftServer;
+   this.baseDatabase = baseDatabase;
+
+   // makeConnection() expects its caller to hold callMonitor.
+   synchronized (callMonitor) {
+     makeConnection();
+   }
+ }
+
+ /**
+  * @return the TajoConf this client was constructed with
+  */
+ public TajoConf getConf() {
+   return this.tajoConf;
+ }
+
+ /**
+  * @return the user information captured when this client was constructed
+  */
+ public UserGroupInformation getUserInfo() {
+   return this.userInfo;
+ }
+
+ /**
+  * Opens a Thrift connection to {@code thriftServer} if no client exists yet.
+  *
+  * <p>The address must have the form {@code host:port}. A missing or malformed address
+  * is reported as an IOException instead of leaking a NullPointerException,
+  * ArrayIndexOutOfBoundsException or NumberFormatException to the caller.
+  *
+  * @throws IOException if the address is malformed or the transport cannot be opened
+  */
+ protected void makeConnection() throws IOException {
+   // Should be synchronized with callMonitor
+   if (currentClient != null) {
+     return;
+   }
+
+   if (thriftServer == null) {
+     throw new IOException("Thrift server address is not set (expected host:port)");
+   }
+   String[] tokens = thriftServer.split(":");
+   if (tokens.length < 2) {
+     throw new IOException("Invalid thrift server address (expected host:port): " + thriftServer);
+   }
+
+   int port;
+   try {
+     port = Integer.parseInt(tokens[1]);
+   } catch (NumberFormatException e) {
+     throw new IOException("Invalid port in thrift server address: " + thriftServer, e);
+   }
+
+   TTransport transport = new TSocket(tokens[0], port);
+   try {
+     transport.open();
+   } catch (Exception e) {
+     LOG.error("Can not make protocol: " + thriftServer + ", " + e.getMessage(), e);
+     throw new IOException("Can not make protocol", e);
+   }
+   currentClient = new TajoThriftService.Client(new TBinaryProtocol(transport));
+ }
+
+ /**
+  * Calls {@code createDatabase} on the Thrift service for the current session.
+  * The call is issued through ReconnectThriftServerCallable#withRetries().
+  *
+  * @param databaseName name forwarded to the service
+  * @return the boolean result reported by the service
+  * @throws Exception whatever the session check, the call, or the retry wrapper raises
+  */
+ public boolean createDatabase(final String databaseName) throws Exception {
+   ReconnectThriftServerCallable<Boolean> request =
+       new ReconnectThriftServerCallable<Boolean>(currentClient) {
+         public Boolean syncCall(Client thriftClient) throws Exception {
+           checkSessionAndGet(thriftClient);
+           return thriftClient.createDatabase(sessionId, databaseName);
+         }
+       };
+   return request.withRetries();
+ }
+
+ /**
+  * Calls {@code existDatabase} on the Thrift service for the current session.
+  * The call is issued through ReconnectThriftServerCallable#withRetries().
+  *
+  * @param databaseName name forwarded to the service
+  * @return the boolean result reported by the service
+  * @throws Exception whatever the session check, the call, or the retry wrapper raises
+  */
+ public boolean existDatabase(final String databaseName) throws Exception {
+   ReconnectThriftServerCallable<Boolean> request =
+       new ReconnectThriftServerCallable<Boolean>(currentClient) {
+         public Boolean syncCall(Client thriftClient) throws Exception {
+           checkSessionAndGet(thriftClient);
+           return thriftClient.existDatabase(sessionId, databaseName);
+         }
+       };
+   return request.withRetries();
+ }
+
+ /**
+  * Calls {@code dropDatabase} on the Thrift service for the current session.
+  * The call is issued through ReconnectThriftServerCallable#withRetries().
+  *
+  * @param databaseName name forwarded to the service
+  * @return the boolean result reported by the service
+  * @throws Exception whatever the session check, the call, or the retry wrapper raises
+  */
+ public boolean dropDatabase(final String databaseName) throws Exception {
+   ReconnectThriftServerCallable<Boolean> request =
+       new ReconnectThriftServerCallable<Boolean>(currentClient) {
+         public Boolean syncCall(Client thriftClient) throws Exception {
+           checkSessionAndGet(thriftClient);
+           return thriftClient.dropDatabase(sessionId, databaseName);
+         }
+       };
+   return request.withRetries();
+ }
+
+ /**
+  * Calls {@code getAllDatabases} on the Thrift service for the current session.
+  * The call is issued through ReconnectThriftServerCallable#withRetries().
+  *
+  * @return the list of names returned by the service
+  * @throws Exception whatever the session check, the call, or the retry wrapper raises
+  */
+ public List<String> getAllDatabaseNames() throws Exception {
+   ReconnectThriftServerCallable<List<String>> request =
+       new ReconnectThriftServerCallable<List<String>>(currentClient) {
+         public List<String> syncCall(Client thriftClient) throws Exception {
+           checkSessionAndGet(thriftClient);
+           return thriftClient.getAllDatabases(sessionId);
+         }
+       };
+   return request.withRetries();
+ }
+
+ /**
+  * Calls {@code existTable} on the Thrift service for the current session.
+  * Unlike its sibling methods, any exception raised inside the call triggers
+  * {@code abort()} on the callable before being rethrown.
+  *
+  * @param tableName name forwarded to the service
+  * @return the boolean result reported by the service
+  * @throws Exception whatever the session check, the call, or the retry wrapper raises
+  */
+ public boolean existTable(final String tableName) throws Exception {
+   ReconnectThriftServerCallable<Boolean> request =
+       new ReconnectThriftServerCallable<Boolean>(currentClient) {
+         public Boolean syncCall(Client thriftClient) throws Exception {
+           try {
+             checkSessionAndGet(thriftClient);
+             return thriftClient.existTable(sessionId, tableName);
+           } catch (Exception failure) {
+             abort();
+             throw failure;
+           }
+         }
+       };
+   return request.withRetries();
+ }
+
+ /**
+  * Drops a table without purging; shorthand for {@code dropTable(tableName, false)}.
+  *
+  * @param tableName name forwarded to the service
+  * @return the boolean result reported by the service
+  * @throws Exception whatever the two-argument overload raises
+  */
+ public boolean dropTable(final String tableName) throws Exception {
+   final boolean purge = false;
+   return dropTable(tableName, purge);
+ }
+
+ /**
+  * Calls {@code dropTable} on the Thrift service for the current session.
+  * The call is issued through ReconnectThriftServerCallable#withRetries().
+  *
+  * @param tableName name forwarded to the service
+  * @param purge     purge flag forwarded to the service unchanged
+  * @return the boolean result reported by the service
+  * @throws Exception whatever the session check, the call, or the retry wrapper raises
+  */
+ public boolean dropTable(final String tableName, final boolean purge) throws Exception {
+   ReconnectThriftServerCallable<Boolean> request =
+       new ReconnectThriftServerCallable<Boolean>(currentClient) {
+         public Boolean syncCall(Client thriftClient) throws Exception {
+           checkSessionAndGet(thriftClient);
+           return thriftClient.dropTable(sessionId, tableName, purge);
+         }
+       };
+   return request.withRetries();
+ }
+
+ /**
+  * Calls {@code getTableList} on the Thrift service for the current session.
+  * The call is issued through ReconnectThriftServerCallable#withRetries().
+  *
+  * @param databaseName database name forwarded to the service; may be null
+  *                     (how the server interprets null is not visible here)
+  * @return the list of names returned by the service
+  * @throws Exception whatever the session check, the call, or the retry wrapper raises
+  */
+ public List<String> getTableList(@Nullable final String databaseName) throws Exception {
+   ReconnectThriftServerCallable<List<String>> request =
+       new ReconnectThriftServerCallable<List<String>>(currentClient) {
+         public List<String> syncCall(Client thriftClient) throws Exception {
+           checkSessionAndGet(thriftClient);
+           return thriftClient.getTableList(sessionId, databaseName);
+         }
+       };
+   return request.withRetries();
+ }
+
+ /**
+  * Calls {@code getTableDesc} on the Thrift service for the current session.
+  * The call is issued through ReconnectThriftServerCallable#withRetries().
+  *
+  * @param tableName name forwarded to the service
+  * @return the table description returned by the service
+  * @throws Exception whatever the session check, the call, or the retry wrapper raises
+  */
+ public TTableDesc getTableDesc(final String tableName) throws Exception {
+   ReconnectThriftServerCallable<TTableDesc> request =
+       new ReconnectThriftServerCallable<TTableDesc>(currentClient) {
+         public TTableDesc syncCall(Client thriftClient) throws Exception {
+           checkSessionAndGet(thriftClient);
+           return thriftClient.getTableDesc(sessionId, tableName);
+         }
+       };
+   return request.withRetries();
+ }
+
+ /**
+  * Asks the Thrift service to close the given query. This is best-effort: any
+  * failure is logged at WARN level and not propagated.
+  *
+  * <p>The Thrift client is not thread-safe, so the call is made while holding
+  * callMonitor, as required for every use of the client.
+  *
+  * @param queryId id of the query to close
+  */
+ public void closeQuery(String queryId) {
+   try {
+     // Java monitors are reentrant, so this is safe even if the helper locks callMonitor too.
+     synchronized (callMonitor) {
+       checkSessionAndGet(currentClient);
+
+       currentClient.closeQuery(sessionId, queryId);
+     }
+   } catch (Exception e) {
+     LOG.warn("Fail to close query (qid=" + queryId + ", msg=" + e.getMessage() + ")", e);
+   }
+ }
+
+ /**
+  * Submits a SQL statement via {@code submitQuery} on the Thrift service and
+  * returns the service's status response as-is.
+  * The call is issued through ReconnectThriftServerCallable#withRetries().
+  *
+  * @param sql statement text forwarded to the service
+  * @return the status response returned by {@code submitQuery}
+  * @throws Exception whatever the session check, the call, or the retry wrapper raises
+  */
+ public TGetQueryStatusResponse executeQuery(final String sql) throws Exception {
+   ReconnectThriftServerCallable<TGetQueryStatusResponse> request =
+       new ReconnectThriftServerCallable<TGetQueryStatusResponse>(currentClient) {
+         public TGetQueryStatusResponse syncCall(Client thriftClient) throws Exception {
+           checkSessionAndGet(thriftClient);
+           // NOTE(review): the meaning of the third argument (false) is defined by the Thrift IDL -- confirm.
+           return thriftClient.submitQuery(sessionId, sql, false);
+         }
+       };
+   return request.withRetries();
+ }
+
+ /**
+  * Calls {@code updateQuery} on the Thrift service and returns the boolean
+  * result carried by its response.
+  * The call is issued through ReconnectThriftServerCallable#withRetries().
+  *
+  * @param sql statement text forwarded to the service
+  * @return {@code isBoolResult()} of the service response
+  * @throws Exception whatever the session check, the call, or the retry wrapper raises
+  */
+ public boolean updateQuery(final String sql) throws Exception {
+   ReconnectThriftServerCallable<Boolean> request =
+       new ReconnectThriftServerCallable<Boolean>(currentClient) {
+         public Boolean syncCall(Client thriftClient) throws Exception {
+           checkSessionAndGet(thriftClient);
+           return thriftClient.updateQuery(sessionId, sql).isBoolResult();
+         }
+       };
+   return request.withRetries();
+ }
+
+ public ResultSet executeQueryAndGetResult(final String sql) throws
ServiceException, IOException {
+ try {
+ TGetQueryStatusResponse response = new
ReconnectThriftServerCallable<TGetQueryStatusResponse>(currentClient) {
+ public TGetQueryStatusResponse syncCall(Client client) throws
Exception {
+ checkSessionAndGet(client);
+ TGetQueryStatusResponse response = null;
+ try {
+ response = client.submitQuery(sessionId, sql, false);
+ } catch (TServiceException e) {
+ abort();
+ throw new IOException(e.getMessage(), e);
+ } catch (Throwable t) {
+ throw new IOException(t.getMessage(), t);
+ }
+ if (!ResultCode.OK.name().equals(response.getResultCode()) ||
response.getErrorMessage() != null) {
+ abort();
+ throw new IOException(response.getErrorMessage());
+ }
+ return response;
+ }
+ }.withRetries();
+
+ if (response != null && response.getQueryId() != null) {
+ return this.getQueryResultAndWait(response.getQueryId(), response);
+ } else {
+ return
createNullResultSet(QueryIdFactory.NULL_QUERY_ID.toString());
+ }
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ throw new IOException(e.getMessage(), e);
+ }
+ }
+
+ /**
+  * Builds a result set with no rows for the given query id.
+  *
+  * <p>The previous version also built a TGetQueryStatusResponse that was never used;
+  * that dead local has been removed.
+  *
+  * @param queryId query id to attach to the result set
+  * @return a TajoThriftResultSet backed by a TQueryResult whose row list is empty
+  * @throws IOException declared for callers; nothing in this method throws it directly
+  */
+ public ResultSet createNullResultSet(String queryId) throws IOException {
+   TQueryResult emptyQueryResult = new TQueryResult();
+   emptyQueryResult.setRows(Collections.<ByteBuffer>emptyList());
+
+   return new TajoThriftResultSet(this, queryId, emptyQueryResult);
+ }
+
+ public ResultSet getQueryResultAndWait(String queryId,
TGetQueryStatusResponse queryResponse) throws Exception {
+ if (queryResponse.getQueryResult() != null) {
+ //select 1+1
--- End diff --
This comment is not for debugging; it shows an example.
> Implements Thrift proxy server.
> -------------------------------
>
> Key: TAJO-1206
> URL: https://issues.apache.org/jira/browse/TAJO-1206
> Project: Tajo
> Issue Type: New Feature
> Reporter: Hyoungjun Kim
> Assignee: Hyoungjun Kim
> Attachments: Tajo-ThriftServer.png
>
>
> Currently, Tajo supports only a Java client. To support other languages such as
> Python, Thrift is a very useful tool. Hive also supports multiple languages
> through a Thrift server.
> A Thrift server may also be useful for separating users from the Tajo cluster for
> security reasons.
> In this issue, the following functions will be implemented.
> - Thrift server for Tajo
> - Web UI for monitoring Thrift server
> - CLI for Thrift server
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)