[
https://issues.apache.org/jira/browse/TAJO-1206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14236702#comment-14236702
]
ASF GitHub Bot commented on TAJO-1206:
--------------------------------------
Github user babokim commented on a diff in the pull request:
https://github.com/apache/tajo/pull/268#discussion_r21415697
--- Diff:
tajo-thrift-server/src/main/java/org/apache/tajo/thrift/TajoThriftServiceImpl.java
---
@@ -0,0 +1,1013 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.thrift;
+
+import com.google.protobuf.ByteString;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.tajo.*;
+import org.apache.tajo.TajoIdProtos.SessionIdProto;
+import org.apache.tajo.TajoProtos.QueryState;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.client.QueryStatus;
+import org.apache.tajo.client.TajoClient;
+import org.apache.tajo.client.TajoClientImpl;
+import org.apache.tajo.client.TajoClientUtil;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.ClientProtos.*;
+import org.apache.tajo.jdbc.FetchResultSet;
+import org.apache.tajo.jdbc.TajoMemoryResultSet;
+import org.apache.tajo.jdbc.TajoResultSet;
+import org.apache.tajo.jdbc.TajoResultSetBase;
+import org.apache.tajo.storage.RowStoreUtil;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.thrift.generated.*;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.util.TajoIdUtils;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class TajoThriftServiceImpl implements TajoThriftService.Iface {
+ private static final Log LOG =
LogFactory.getLog(TajoThriftServiceImpl.class);
+
+ private Map<TajoIdProtos.SessionIdProto, TajoClientHolder> tajoClientMap
=
+ new ConcurrentHashMap<SessionIdProto, TajoClientHolder>();
+
+ private Map<String, ResultSetHolder> queryResultSets = new
HashMap<String, ResultSetHolder>();
+ private Map<String, QuerySubmitTask> querySubmitTasks = new
HashMap<String, QuerySubmitTask>();
+ private ExecutorService executorService;
+ private ResultSetAndTaskCleaner resultSetAndTaskCleaner;
+
+ private int maxSession;
+ private TajoConf tajoConf;
+
+ public TajoThriftServiceImpl(TajoConf tajoConf) {
+ this.tajoConf = tajoConf;
+ this.maxSession =
tajoConf.getInt(ThriftServerConstants.MAX_SESSION_CONF_KEY, 100);
+ int maxTaskRunner =
tajoConf.getInt(ThriftServerConstants.MAX_TASK_RUNNER_CONF_KEY, 200);
+
+ this.executorService = Executors.newFixedThreadPool(maxTaskRunner);
+ this.resultSetAndTaskCleaner = new ResultSetAndTaskCleaner();
+
+ this.resultSetAndTaskCleaner.start();
+ }
+
+ public void stop() {
+ if (executorService != null) {
+ executorService.shutdownNow();
+ }
+
+ if (resultSetAndTaskCleaner != null) {
+ resultSetAndTaskCleaner.interrupt();
+ }
+
+ synchronized (tajoClientMap) {
+ for (TajoClientHolder eachClient : tajoClientMap.values()) {
+ eachClient.tajoClient.close();
+ }
+ }
+ }
+
+ public TajoClient getTajoClient(SessionIdProto sessionId) throws
TServiceException {
+ if (sessionId == null || !sessionId.hasId()) {
+ throw new TServiceException("No sessionId", "");
+ }
+
+ synchronized (tajoClientMap) {
+ if (tajoClientMap.size() >= maxSession) {
+ throw new TServiceException("exceed max session [" + maxSession +
"]", "");
+ }
+ TajoClientHolder tajoClientHolder = tajoClientMap.get(sessionId);
+
+ //if there is multiple proxy server, TajoProxyClient call randomly.
So certain proxy server hasn't session.
+ if (tajoClientHolder == null) {
+ //throw new ServiceException("No session info:" +
sessionId.getId());
+ try {
+ TajoClient tajoClient = new TajoClientImpl(tajoConf);
+ tajoClient.setSessionId(sessionId);
+ tajoClientHolder = new TajoClientHolder();
+ tajoClientHolder.tajoClient = tajoClient;
+ tajoClientHolder.lastTouchTime = System.currentTimeMillis();
+
+ tajoClientMap.put(sessionId, tajoClientHolder);
+ return tajoClient;
+ } catch (Exception e) {
+ throw new TServiceException(e.getMessage(),
StringUtils.stringifyException(e));
+ }
+ } else {
+ tajoClientHolder.lastTouchTime = System.currentTimeMillis();
+ return tajoClientHolder.tajoClient;
+ }
+ }
+ }
+
+ private TServerResponse makeErrorServerResponse(Throwable t) {
+ LOG.error(t.getMessage(), t);
+ TServerResponse response = new TServerResponse();
+ response.setErrorMessage(t.getMessage());
+ response.setResultCode(ResultCode.ERROR.name());
+ response.setBoolResult(false);
+ return response;
+ }
+
+ private TGetQueryStatusResponse makeErrorQueryStatusResponse(String
errorMessage) {
+ LOG.error("Query error:" + errorMessage);
+
+ TGetQueryStatusResponse response = new TGetQueryStatusResponse();
+ response.setErrorMessage(errorMessage);
+ response.setState(TajoProtos.QueryState.QUERY_ERROR.name());
+ response.setResultCode(ResultCode.ERROR.name());
+ QueryId queryId = QueryIdFactory.newQueryId(0, 0); //DUMMY
+ response.setQueryId(queryId.toString());
+
+ return response;
+ }
+
+ @Override
+ public TGetQueryStatusResponse submitQuery(String sessionIdStr, String
query, boolean isJson) throws TException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Run Query:" + query);
+ }
+ SessionIdProto sessionId = TajoThriftUtil.makeSessionId(sessionIdStr);
+ TGetQueryStatusResponse queryStatus = new TGetQueryStatusResponse();
+
+ try {
+ TajoClient tajoClient = getTajoClient(sessionId);
+
+ SubmitQueryResponse clientResponse = tajoClient.executeQuery(query);
+
+ if (clientResponse.hasErrorMessage()) {
+ return
makeErrorQueryStatusResponse(clientResponse.getErrorMessage());
+ }
+
+ if (clientResponse.getIsForwarded()) {
+ QuerySubmitTask querySubmitTask = new QuerySubmitTask(sessionId);
+ querySubmitTask.queryProgressInfo.queryId = new
QueryId(clientResponse.getQueryId());
+
+ QueryStatus clientQueryStatus =
tajoClient.getQueryStatus(querySubmitTask.queryProgressInfo.queryId);
+ querySubmitTask.queryProgressInfo.lastTouchTime =
System.currentTimeMillis();
+
+ queryStatus.setQueryId(clientQueryStatus.getQueryId().toString());
+ queryStatus.setResultCode(ResultCode.OK.name());
+ queryStatus.setState(clientQueryStatus.getState().name());
+ queryStatus.setProgress(queryStatus.getProgress());
+ queryStatus.setSubmitTime(queryStatus.getSubmitTime());
+ queryStatus.setFinishTime(queryStatus.getFinishTime());
+ queryStatus.setHasResult(clientQueryStatus.hasResult());
+
+ if (queryStatus.getErrorMessage() != null) {
+ queryStatus.setErrorMessage(clientQueryStatus.getErrorMessage());
+ }
+
+ if (queryStatus.getQueryMasterHost() != null) {
+
queryStatus.setQueryMasterHost(clientQueryStatus.getQueryMasterHost());
+
queryStatus.setQueryMasterPort(clientQueryStatus.getQueryMasterPort());
+ }
+
+ querySubmitTask.queryProgressInfo.queryStatus = queryStatus;
+ querySubmitTask.queryProgressInfo.query = query;
+
+ synchronized (querySubmitTasks) {
+ LOG.info(querySubmitTask.getKey() + " query started");
+ querySubmitTasks.put(querySubmitTask.getKey(), querySubmitTask);
+ }
+ executorService.submit(querySubmitTask);
+ return querySubmitTask.queryProgressInfo.queryStatus;
+ } else {
+ QueryId queryId = new QueryId(clientResponse.getQueryId());
+
+ queryStatus.setQueryId(queryId.toString());
+ queryStatus.setResultCode(ResultCode.OK.name());
+ queryStatus.setState(TajoProtos.QueryState.QUERY_SUCCEEDED.name());
+ queryStatus.setProgress(1.0f);
+ queryStatus.setSubmitTime(System.currentTimeMillis());
+ queryStatus.setFinishTime(System.currentTimeMillis());
+ queryStatus.setHasResult(true);
+
+ //select * from table limit 100 or select 1+1
--- End diff --
This comment is not for debugging. It shows a example.
> Implements Thrift proxy server.
> -------------------------------
>
> Key: TAJO-1206
> URL: https://issues.apache.org/jira/browse/TAJO-1206
> Project: Tajo
> Issue Type: New Feature
> Reporter: Hyoungjun Kim
> Assignee: Hyoungjun Kim
> Attachments: Tajo-ThriftServer.png
>
>
> Currently Tajo supports only Java client. To support multiple languages like
> Python, Thrift is very useful tool. Hive also supports multiple languages
> using Thrift server.
> Thrift Server may be useful to separate a user from Tajo cluster for security
> issues.
> In this issue, The following functions will be implemented.
> - Thrift server for Tajo
> - Web UI for monitoring Thrift server
> - CLI for Thrift server
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)