[ 
https://issues.apache.org/jira/browse/TAJO-1206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14236702#comment-14236702
 ] 

ASF GitHub Bot commented on TAJO-1206:
--------------------------------------

Github user babokim commented on a diff in the pull request:

    https://github.com/apache/tajo/pull/268#discussion_r21415697
  
    --- Diff: 
tajo-thrift-server/src/main/java/org/apache/tajo/thrift/TajoThriftServiceImpl.java
 ---
    @@ -0,0 +1,1013 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.tajo.thrift;
    +
    +import com.google.protobuf.ByteString;
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +import org.apache.hadoop.util.StringUtils;
    +import org.apache.tajo.*;
    +import org.apache.tajo.TajoIdProtos.SessionIdProto;
    +import org.apache.tajo.TajoProtos.QueryState;
    +import org.apache.tajo.catalog.Schema;
    +import org.apache.tajo.catalog.TableDesc;
    +import org.apache.tajo.client.QueryStatus;
    +import org.apache.tajo.client.TajoClient;
    +import org.apache.tajo.client.TajoClientImpl;
    +import org.apache.tajo.client.TajoClientUtil;
    +import org.apache.tajo.conf.TajoConf;
    +import org.apache.tajo.ipc.ClientProtos.*;
    +import org.apache.tajo.jdbc.FetchResultSet;
    +import org.apache.tajo.jdbc.TajoMemoryResultSet;
    +import org.apache.tajo.jdbc.TajoResultSet;
    +import org.apache.tajo.jdbc.TajoResultSetBase;
    +import org.apache.tajo.storage.RowStoreUtil;
    +import org.apache.tajo.storage.Tuple;
    +import org.apache.tajo.thrift.generated.*;
    +import org.apache.tajo.util.TUtil;
    +import org.apache.tajo.util.TajoIdUtils;
    +import org.apache.thrift.TException;
    +
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.sql.ResultSet;
    +import java.sql.SQLException;
    +import java.util.*;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +
    +public class TajoThriftServiceImpl implements TajoThriftService.Iface {
    +  private static final Log LOG = 
LogFactory.getLog(TajoThriftServiceImpl.class);
    +
    +  private Map<TajoIdProtos.SessionIdProto, TajoClientHolder> tajoClientMap 
=
    +      new ConcurrentHashMap<SessionIdProto, TajoClientHolder>();
    +
    +  private Map<String, ResultSetHolder> queryResultSets = new 
HashMap<String, ResultSetHolder>();
    +  private Map<String, QuerySubmitTask> querySubmitTasks = new 
HashMap<String, QuerySubmitTask>();
    +  private ExecutorService executorService;
    +  private ResultSetAndTaskCleaner resultSetAndTaskCleaner;
    +
    +  private int maxSession;
    +  private TajoConf tajoConf;
    +
    +  public TajoThriftServiceImpl(TajoConf tajoConf) {
    +    this.tajoConf = tajoConf;
    +    this.maxSession = 
tajoConf.getInt(ThriftServerConstants.MAX_SESSION_CONF_KEY, 100);
    +    int maxTaskRunner = 
tajoConf.getInt(ThriftServerConstants.MAX_TASK_RUNNER_CONF_KEY, 200);
    +
    +    this.executorService = Executors.newFixedThreadPool(maxTaskRunner);
    +    this.resultSetAndTaskCleaner = new ResultSetAndTaskCleaner();
    +
    +    this.resultSetAndTaskCleaner.start();
    +  }
    +
    +  public void stop() {
    +    if (executorService != null) {
    +      executorService.shutdownNow();
    +    }
    +
    +    if (resultSetAndTaskCleaner != null) {
    +      resultSetAndTaskCleaner.interrupt();
    +    }
    +
    +    synchronized (tajoClientMap) {
    +      for (TajoClientHolder eachClient : tajoClientMap.values()) {
    +        eachClient.tajoClient.close();
    +      }
    +    }
    +  }
    +
    +  public TajoClient getTajoClient(SessionIdProto sessionId) throws 
TServiceException {
    +    if (sessionId == null || !sessionId.hasId()) {
    +      throw new TServiceException("No sessionId", "");
    +    }
    +
    +    synchronized (tajoClientMap) {
    +      if (tajoClientMap.size() >= maxSession) {
    +        throw new TServiceException("exceed max session [" + maxSession + 
"]", "");
    +      }
    +      TajoClientHolder tajoClientHolder = tajoClientMap.get(sessionId);
    +
    +      //if there is multiple proxy server, TajoProxyClient call randomly. 
So certain proxy server hasn't session.
    +      if (tajoClientHolder == null) {
    +        //throw new ServiceException("No session info:" + 
sessionId.getId());
    +        try {
    +          TajoClient tajoClient = new TajoClientImpl(tajoConf);
    +          tajoClient.setSessionId(sessionId);
    +          tajoClientHolder = new TajoClientHolder();
    +          tajoClientHolder.tajoClient = tajoClient;
    +          tajoClientHolder.lastTouchTime = System.currentTimeMillis();
    +
    +          tajoClientMap.put(sessionId, tajoClientHolder);
    +          return tajoClient;
    +        } catch (Exception e) {
    +          throw new TServiceException(e.getMessage(), 
StringUtils.stringifyException(e));
    +        }
    +      } else {
    +        tajoClientHolder.lastTouchTime = System.currentTimeMillis();
    +        return tajoClientHolder.tajoClient;
    +      }
    +    }
    +  }
    +
    +  private TServerResponse makeErrorServerResponse(Throwable t) {
    +    LOG.error(t.getMessage(), t);
    +    TServerResponse response = new TServerResponse();
    +    response.setErrorMessage(t.getMessage());
    +    response.setResultCode(ResultCode.ERROR.name());
    +    response.setBoolResult(false);
    +    return response;
    +  }
    +
    +  private TGetQueryStatusResponse makeErrorQueryStatusResponse(String 
errorMessage) {
    +    LOG.error("Query error:" + errorMessage);
    +
    +    TGetQueryStatusResponse response = new TGetQueryStatusResponse();
    +    response.setErrorMessage(errorMessage);
    +    response.setState(TajoProtos.QueryState.QUERY_ERROR.name());
    +    response.setResultCode(ResultCode.ERROR.name());
    +    QueryId queryId = QueryIdFactory.newQueryId(0, 0);  //DUMMY
    +    response.setQueryId(queryId.toString());
    +
    +    return response;
    +  }
    +
    +  @Override
    +  public TGetQueryStatusResponse submitQuery(String sessionIdStr, String 
query, boolean isJson) throws TException {
    +    if (LOG.isDebugEnabled()) {
    +      LOG.debug("Run Query:" + query);
    +    }
    +    SessionIdProto sessionId = TajoThriftUtil.makeSessionId(sessionIdStr);
    +    TGetQueryStatusResponse queryStatus = new TGetQueryStatusResponse();
    +
    +    try {
    +      TajoClient tajoClient = getTajoClient(sessionId);
    +
    +      SubmitQueryResponse clientResponse = tajoClient.executeQuery(query);
    +
    +      if (clientResponse.hasErrorMessage()) {
    +        return 
makeErrorQueryStatusResponse(clientResponse.getErrorMessage());
    +      }
    +
    +      if (clientResponse.getIsForwarded()) {
    +        QuerySubmitTask querySubmitTask = new QuerySubmitTask(sessionId);
    +        querySubmitTask.queryProgressInfo.queryId = new 
QueryId(clientResponse.getQueryId());
    +
    +        QueryStatus clientQueryStatus = 
tajoClient.getQueryStatus(querySubmitTask.queryProgressInfo.queryId);
    +        querySubmitTask.queryProgressInfo.lastTouchTime = 
System.currentTimeMillis();
    +
    +        queryStatus.setQueryId(clientQueryStatus.getQueryId().toString());
    +        queryStatus.setResultCode(ResultCode.OK.name());
    +        queryStatus.setState(clientQueryStatus.getState().name());
    +        queryStatus.setProgress(queryStatus.getProgress());
    +        queryStatus.setSubmitTime(queryStatus.getSubmitTime());
    +        queryStatus.setFinishTime(queryStatus.getFinishTime());
    +        queryStatus.setHasResult(clientQueryStatus.hasResult());
    +
    +        if (queryStatus.getErrorMessage() != null) {
    +          queryStatus.setErrorMessage(clientQueryStatus.getErrorMessage());
    +        }
    +
    +        if (queryStatus.getQueryMasterHost() != null) {
    +          
queryStatus.setQueryMasterHost(clientQueryStatus.getQueryMasterHost());
    +          
queryStatus.setQueryMasterPort(clientQueryStatus.getQueryMasterPort());
    +        }
    +
    +        querySubmitTask.queryProgressInfo.queryStatus = queryStatus;
    +        querySubmitTask.queryProgressInfo.query = query;
    +
    +        synchronized (querySubmitTasks) {
    +          LOG.info(querySubmitTask.getKey() + " query started");
    +          querySubmitTasks.put(querySubmitTask.getKey(), querySubmitTask);
    +        }
    +        executorService.submit(querySubmitTask);
    +        return querySubmitTask.queryProgressInfo.queryStatus;
    +      } else {
    +        QueryId queryId = new QueryId(clientResponse.getQueryId());
    +
    +        queryStatus.setQueryId(queryId.toString());
    +        queryStatus.setResultCode(ResultCode.OK.name());
    +        queryStatus.setState(TajoProtos.QueryState.QUERY_SUCCEEDED.name());
    +        queryStatus.setProgress(1.0f);
    +        queryStatus.setSubmitTime(System.currentTimeMillis());
    +        queryStatus.setFinishTime(System.currentTimeMillis());
    +        queryStatus.setHasResult(true);
    +
    +        //select * from table limit 100 or select 1+1
    --- End diff --
    
    This comment is not for debugging. It shows a example.


> Implements Thrift proxy server.
> -------------------------------
>
>                 Key: TAJO-1206
>                 URL: https://issues.apache.org/jira/browse/TAJO-1206
>             Project: Tajo
>          Issue Type: New Feature
>            Reporter: Hyoungjun Kim
>            Assignee: Hyoungjun Kim
>         Attachments: Tajo-ThriftServer.png
>
>
> Currently Tajo supports only Java client. To support multiple languages like 
> Python, Thrift is very useful tool. Hive also supports multiple languages 
> using Thrift server.
> Thrift Server may be useful to separate a user from Tajo cluster for security 
> issues.
> In this issue, The following functions will be implemented.
> - Thrift server for Tajo
> - Web UI for monitoring Thrift server
> - CLI for Thrift server



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to