[ 
https://issues.apache.org/jira/browse/TAJO-1206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14236704#comment-14236704
 ] 

ASF GitHub Bot commented on TAJO-1206:
--------------------------------------

Github user babokim commented on a diff in the pull request:

    https://github.com/apache/tajo/pull/268#discussion_r21415699
  
    --- Diff: 
tajo-thrift-server/src/main/java/org/apache/tajo/thrift/client/TajoThriftClient.java
 ---
    @@ -0,0 +1,500 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.tajo.thrift.client;
    +
    +import com.google.protobuf.ServiceException;
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +import org.apache.hadoop.security.UserGroupInformation;
    +import org.apache.tajo.QueryIdFactory;
    +import org.apache.tajo.TajoProtos.QueryState;
    +import org.apache.tajo.annotation.Nullable;
    +import org.apache.tajo.annotation.ThreadSafe;
    +import org.apache.tajo.client.*;
    +import org.apache.tajo.conf.TajoConf;
    +import org.apache.tajo.ipc.ClientProtos.*;
    +import org.apache.tajo.thrift.TajoThriftUtil;
    +import org.apache.tajo.thrift.ThriftServerConstants;
    +import org.apache.tajo.thrift.generated.*;
    +import org.apache.tajo.thrift.generated.TajoThriftService.Client;
    +import org.apache.thrift.protocol.TBinaryProtocol;
    +import org.apache.thrift.transport.TSocket;
    +import org.apache.thrift.transport.TTransport;
    +
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.sql.ResultSet;
    +import java.util.*;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +
    +@ThreadSafe
    +public class TajoThriftClient {
    +  private final Log LOG = LogFactory.getLog(TajoThriftClient.class);
    +
    +  protected final TajoConf tajoConf;
    +
    +  protected String thriftServer;
    +
    +  protected Client currentClient;
    +
    +  private final String baseDatabase;
    +
    +  private final UserGroupInformation userInfo;
    +
    +  volatile String sessionId;
    +
    +  private AtomicBoolean closed = new AtomicBoolean(false);
    +
    +  // Thrift client is thread unsafe. So every call should be synchronized 
with callMonitor.
    +  private Object callMonitor = new Object();
    +
    +  public TajoThriftClient(TajoConf tajoConf, String thriftServer) throws 
IOException {
    +    this(tajoConf, thriftServer, null);
    +
    +  }
    +
    +  /**
    +   * Connect to ThriftServer
    +   *
    +   * @param tajoConf     TajoConf
    +   * @param thriftServer ThriftServer
    +   * @param baseDatabase The base database name. It is case sensitive. If 
it is null,
    +   *                     the 'default' database will be used.
    +   * @throws java.io.IOException
    +   */
    +  public TajoThriftClient(TajoConf tajoConf, String thriftServer, 
@Nullable String baseDatabase) throws IOException {
    +    this.tajoConf = tajoConf;
    +    this.thriftServer = thriftServer;
    +    this.baseDatabase = baseDatabase;
    +
    +    this.userInfo = UserGroupInformation.getCurrentUser();
    +
    +    synchronized (callMonitor) {
    +      makeConnection();
    +    }
    +  }
    +
    +  public TajoConf getConf() {
    +    return tajoConf;
    +  }
    +
    +  public UserGroupInformation getUserInfo() {
    +    return userInfo;
    +  }
    +
    +  protected void makeConnection() throws IOException {
    +    // Should be synchronized with callMonitor
    +    if (currentClient == null) {
    +      String[] tokens = thriftServer.split(":");
    +      TTransport transport = new TSocket(tokens[0], 
Integer.parseInt(tokens[1]));
    +      try {
    +        transport.open();
    +      } catch (Exception e) {
    +        LOG.error("Can not make protocol: " + thriftServer + ", " + 
e.getMessage(), e);
    +        throw new IOException("Can not make protocol", e);
    +      }
    +      currentClient = new TajoThriftService.Client(new 
TBinaryProtocol(transport));
    +    }
    +  }
    +
    +  public boolean createDatabase(final String databaseName) throws 
Exception {
    +    return new ReconnectThriftServerCallable<Boolean>(currentClient) {
    +      public Boolean syncCall(Client client) throws Exception {
    +        checkSessionAndGet(client);
    +        return client.createDatabase(sessionId, databaseName);
    +      }
    +    }.withRetries();
    +  }
    +
    +  public boolean existDatabase(final String databaseName) throws Exception 
{
    +    return new ReconnectThriftServerCallable<Boolean>(currentClient) {
    +      public Boolean syncCall(Client client) throws Exception {
    +        checkSessionAndGet(client);
    +        return client.existDatabase(sessionId, databaseName);
    +      }
    +    }.withRetries();
    +  }
    +
    +  public boolean dropDatabase(final String databaseName) throws Exception {
    +    return new ReconnectThriftServerCallable<Boolean>(currentClient) {
    +      public Boolean syncCall(Client client) throws Exception {
    +        checkSessionAndGet(client);
    +        return client.dropDatabase(sessionId, databaseName);
    +      }
    +    }.withRetries();
    +  }
    +
    +  public List<String> getAllDatabaseNames() throws Exception {
    +    return new ReconnectThriftServerCallable<List<String>>(currentClient) {
    +      public List<String> syncCall(Client client) throws Exception {
    +        checkSessionAndGet(client);
    +        return client.getAllDatabases(sessionId);
    +      }
    +    }.withRetries();
    +  }
    +
    +  public boolean existTable(final String tableName) throws Exception {
    +    return new ReconnectThriftServerCallable<Boolean>(currentClient) {
    +      public Boolean syncCall(Client client) throws Exception {
    +        try {
    +          checkSessionAndGet(client);
    +          return client.existTable(sessionId, tableName);
    +        } catch (Exception e) {
    +          abort();
    +          throw e;
    +        }
    +      }
    +    }.withRetries();
    +  }
    +
    +  public boolean dropTable(final String tableName) throws Exception {
    +    return dropTable(tableName, false);
    +  }
    +
    +  public boolean dropTable(final String tableName, final boolean purge) 
throws Exception {
    +    return new ReconnectThriftServerCallable<Boolean>(currentClient) {
    +      public Boolean syncCall(Client client) throws Exception {
    +        checkSessionAndGet(client);
    +        return client.dropTable(sessionId, tableName, purge);
    +      }
    +    }.withRetries();
    +  }
    +
    +  public List<String> getTableList(@Nullable final String databaseName) 
throws Exception {
    +    return new ReconnectThriftServerCallable<List<String>>(currentClient) {
    +      public List<String> syncCall(Client client) throws Exception {
    +        checkSessionAndGet(client);
    +        return client.getTableList(sessionId, databaseName);
    +      }
    +    }.withRetries();
    +  }
    +
    +  public TTableDesc getTableDesc(final String tableName) throws Exception {
    +    return new ReconnectThriftServerCallable<TTableDesc>(currentClient) {
    +      public TTableDesc syncCall(Client client) throws Exception {
    +        checkSessionAndGet(client);
    +        return client.getTableDesc(sessionId, tableName);
    +      }
    +    }.withRetries();
    +  }
    +
    +  public void closeQuery(String queryId) {
    +    try {
    +      checkSessionAndGet(currentClient);
    +
    +      currentClient.closeQuery(sessionId, queryId);
    +    } catch (Exception e) {
    +      LOG.warn("Fail to close query (qid=" + queryId + ", msg=" + 
e.getMessage() + ")", e);
    +    }
    +  }
    +
    +  public TGetQueryStatusResponse executeQuery(final String sql) throws 
Exception {
    +    return new 
ReconnectThriftServerCallable<TGetQueryStatusResponse>(currentClient) {
    +      public TGetQueryStatusResponse syncCall(Client client) throws 
Exception {
    +        checkSessionAndGet(client);
    +        return client.submitQuery(sessionId, sql, false);
    +      }
    +    }.withRetries();
    +  }
    +
    +  public boolean updateQuery(final String sql) throws Exception {
    +    return new ReconnectThriftServerCallable<Boolean>(currentClient) {
    +      public Boolean syncCall(Client client) throws Exception {
    +        checkSessionAndGet(client);
    +        return client.updateQuery(sessionId, sql).isBoolResult();
    +      }
    +    }.withRetries();
    +  }
    +
    +  public ResultSet executeQueryAndGetResult(final String sql) throws 
ServiceException, IOException {
    +    try {
    +      TGetQueryStatusResponse response = new 
ReconnectThriftServerCallable<TGetQueryStatusResponse>(currentClient) {
    +        public TGetQueryStatusResponse syncCall(Client client) throws 
Exception {
    +          checkSessionAndGet(client);
    +          TGetQueryStatusResponse response = null;
    +          try {
    +            response = client.submitQuery(sessionId, sql, false);
    +          } catch (TServiceException e) {
    +            abort();
    +            throw new IOException(e.getMessage(), e);
    +          } catch (Throwable t) {
    +            throw new IOException(t.getMessage(), t);
    +          }
    +          if (!ResultCode.OK.name().equals(response.getResultCode()) || 
response.getErrorMessage() != null) {
    +            abort();
    +            throw new IOException(response.getErrorMessage());
    +          }
    +          return response;
    +        }
    +      }.withRetries();
    +
    +      if (response != null && response.getQueryId() != null) {
    +        return this.getQueryResultAndWait(response.getQueryId(), response);
    +      } else {
    +        return 
createNullResultSet(QueryIdFactory.NULL_QUERY_ID.toString());
    +      }
    +    } catch (Exception e) {
    +      LOG.error(e.getMessage(), e);
    +      throw new IOException(e.getMessage(), e);
    +    }
    +  }
    +
    +  public ResultSet createNullResultSet(String queryId) throws IOException {
    +    TGetQueryStatusResponse emptyQueryStatus = new 
TGetQueryStatusResponse();
    +    emptyQueryStatus.setResultCode(ResultCode.OK.name());
    +    emptyQueryStatus.setState(QueryState.QUERY_SUCCEEDED.name());
    +    emptyQueryStatus.setQueryId(queryId);
    +
    +    TQueryResult emptyQueryResult = new TQueryResult();
    +
    +    emptyQueryResult.setRows(Collections.<ByteBuffer>emptyList());
    +    return new TajoThriftResultSet(this, queryId, emptyQueryResult);
    +  }
    +
    +  public ResultSet getQueryResultAndWait(String queryId, 
TGetQueryStatusResponse queryResponse) throws Exception {
    +    if (queryResponse.getQueryResult() != null) {
    +      //select 1+1
    --- End diff --
    
    This comment is not for debugging. It shows an example.


> Implements Thrift proxy server.
> -------------------------------
>
>                 Key: TAJO-1206
>                 URL: https://issues.apache.org/jira/browse/TAJO-1206
>             Project: Tajo
>          Issue Type: New Feature
>            Reporter: Hyoungjun Kim
>            Assignee: Hyoungjun Kim
>         Attachments: Tajo-ThriftServer.png
>
>
> Currently, Tajo supports only a Java client. To support multiple languages 
> such as Python, Thrift is a very useful tool. Hive also supports multiple 
> languages using a Thrift server.
> A Thrift server may also be useful for separating users from the Tajo 
> cluster for security reasons.
> In this issue, the following functions will be implemented:
> - Thrift server for Tajo
> - Web UI for monitoring Thrift server
> - CLI for Thrift server



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to